wip minidump
parent 547bae21a0
commit 82f92f23dd
@@ -2,10 +2,11 @@ package eu.dnetlib.dhp.oa.graph.minidump
 import eu.dnetlib.dhp.application.AbstractScalaApplication
 import eu.dnetlib.dhp.schema.oaf.Relation
-import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils
-import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions.{col, from_json, monotonically_increasing_id, row_number}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Encoders, SaveMode, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
-import org.apache.spark.sql.functions.{concat, first}

 class SparkCreateMiniDumpGraph(propertyPath: String, args: Array[String], log: Logger)
   extends AbstractScalaApplication(propertyPath, args, log: Logger) {

@@ -18,55 +19,35 @@ class SparkCreateMiniDumpGraph(propertyPath: String, args: Array[String], log: L
     log.info("sourcePath: {}", sourcePath)
     val targetPath = parser.get("targetPath")
     log.info("targetPath: {}", targetPath)
-    generateMiniSetOfRelationships(spark, sourcePath, targetPath)
+    generateMiniDump(spark, sourcePath, targetPath)
   }

-  private def generateMiniSetOfRelationships(spark: SparkSession, sourcePath: String, targetPath: String): Unit = {
+  private def generateMiniDump(spark: SparkSession, sourcePath: String, targetPath: String): Unit = {
     import spark.implicits._
-    implicit val relationEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
-    val relation = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(s"$sourcePath/relation").as[Relation]
-
-    val allRelTypes: List[String] = relation.select("relClass").distinct.collect().map(r => r.getString(0)).toList
-
-    val emptyDataset: Dataset[Relation] = spark.emptyDataset[Relation]
-
-    val relationDump = allRelTypes.foldLeft[Dataset[Relation]](emptyDataset)((res, relType) => {
-      res.union(relation.filter($"relClass" === relType).limit(1000))
-      res
-    })
-
-    relationDump
-      .flatMap(r => {
-        val inverseRelation = new Relation
-        inverseRelation.setSource(r.getTarget)
-        inverseRelation.setRelType(r.getRelType)
-        inverseRelation.setRelClass(ScholixUtils.invRel(r.getRelClass))
-        inverseRelation.setProperties(r.getProperties)
-        inverseRelation.setTarget(r.getSource)
-        inverseRelation.setSubRelType(r.getSubRelType)
-        inverseRelation.setCollectedfrom(r.getCollectedfrom)
-        inverseRelation.setDataInfo(r.getDataInfo)
-        inverseRelation.setLastupdatetimestamp(r.getLastupdatetimestamp)
-        inverseRelation.setValidated(r.getValidated)
-        List(r, inverseRelation)
-      })
-      .withColumn("id", concat($"source", $"relType", $"target"))
-      .groupBy("id")
-      .agg(
-        first("source").as("source"),
-        first("relType").as("relType"),
-        first("target").as("target"),
-        first("relClass").as("relClass"),
-        first("properties").as("properties"),
-        first("subRelType").as("subRelType"),
-        first("collectedfrom").as("collectedfrom"),
-        first("dataInfo").as("dataInfo"),
-        first("lastupdatetimestamp").as("lastupdatetimestamp"),
-        first("validated").as("validated")
-      )
-      .drop("id")
-      .write
-      .mode("overwrite")
-      .json(s"$targetPath/relation")
+    val relation = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(s"$sourcePath/relation")
+    val strategy = Window.partitionBy("relClass").orderBy(monotonically_increasing_id())
+    var relationSample = relation.withColumn("row_number", row_number().over(strategy))
+      .where("row_number <= 1000")
+      .drop("row_number")
+    relationSample.write.mode(SaveMode.Overwrite).option("compression", "gzip").json(s"$targetPath/relation")
+    val typology = List("publication", "dataset", "software", "otherresearchproduct", "project", "organization")
+    relationSample = spark.read.json(s"$targetPath/relation")
+    val relId = relationSample.selectExpr("source as id").union(relationSample.selectExpr("target as id")).distinct()
+    val idSchema = new StructType().add("id", StringType)
+    typology.foreach(item => {
+      val entity = spark.read.json(s"$sourcePath/$item")
+      val resultWithId = entity.withColumn("jsonData", from_json(col("value"), idSchema)).selectExpr("jsonData.id as id", "value")
+      resultWithId.join(relId, resultWithId("id") === relId("id"), "leftSemi")
+        .select("value")
+        .repartition(20)
+        .write
+        .mode(SaveMode.Overwrite)
+        .option("compression", "gzip")
+        .text(s"$targetPath/$item")
+    })
   }
 }
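
For context, the new generateMiniDump replaces the old foldLeft-per-relClass sampling with a single window function: rows are numbered inside each relClass partition and only the first 1000 are kept. The standalone sketch below shows the same pattern; samplePerRelClass and maxPerClass are illustrative names, not part of the patch.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

// Keep at most maxPerClass rows for every distinct relClass value.
def samplePerRelClass(relations: DataFrame, maxPerClass: Int = 1000): DataFrame = {
  // Number rows inside each relClass partition; monotonically_increasing_id()
  // supplies an arbitrary ordering, since only the cap matters here.
  val byRelClass = Window.partitionBy("relClass").orderBy(monotonically_increasing_id())
  relations
    .withColumn("row_number", row_number().over(byRelClass))
    .where(s"row_number <= $maxPerClass")
    .drop("row_number")
}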
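
The entity step then keeps only the records whose id appears as source or target of a sampled relation, via a left-semi join, and writes the surviving raw JSON lines back out. Note that the patch reads each entity with spark.read.json but then refers to a "value" column, which only exists when the input is read as plain text; the hedged sketch below assumes that text-based read, and filterEntitiesByRelationIds is an illustrative name.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

// Keep only the raw JSON lines whose "id" appears among the sampled relation ids.
def filterEntitiesByRelationIds(spark: SparkSession, entityPath: String, relId: DataFrame): DataFrame = {
  val idSchema = new StructType().add("id", StringType)
  val entity = spark.read.text(entityPath) // one JSON object per line, exposed as column "value"
  val resultWithId = entity
    .withColumn("jsonData", from_json(col("value"), idSchema)) // parse only the id field
    .selectExpr("jsonData.id as id", "value")
  resultWithId
    .join(relId, resultWithId("id") === relId("id"), "leftsemi") // keep rows with a matching relation id
    .select("value") // emit the original JSON line unchanged
}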