code formatted

This commit is contained in:
sandro.labruzzo 2024-12-18 11:17:42 +01:00
parent b7357e18b2
commit d8124b947e
2 changed files with 48 additions and 35 deletions

View File

@ -2,8 +2,7 @@ package eu.dnetlib.dhp.oa.graph.minidump
import eu.dnetlib.dhp.application.AbstractScalaApplication import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.schema.oaf.{Relation, StructuredProperty} import eu.dnetlib.dhp.schema.oaf.{Relation, StructuredProperty}
import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.functions.{col, from_json, monotonically_increasing_id, row_number, size}
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
import org.apache.spark.sql.{Encoders, SaveMode, SparkSession} import org.apache.spark.sql.{Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
@ -22,31 +21,35 @@ class SparkCreateMiniDumpGraph(propertyPath: String, args: Array[String], log: L
// generateMiniDump(spark, sourcePath, targetPath) // generateMiniDump(spark, sourcePath, targetPath)
} }
def generateMiniDump(spark: SparkSession, sourcePath: String, pidListPath: String, targetPath:String): Unit = { def generateMiniDump(spark: SparkSession, sourcePath: String, pidListPath: String, targetPath: String): Unit = {
import spark.implicits._ import spark.implicits._
val pidSchema = new StructType().add("pid", StringType).add("pidType", StringType).add("type", StringType) val pidSchema = new StructType().add("pid", StringType).add("pidType", StringType).add("type", StringType)
val idSchema = new StructType().add("id", StringType) val idSchema = new StructType().add("id", StringType)
val idWithPidSchema = new StructType() val idWithPidSchema = new StructType()
.add("id", StringType). .add("id", StringType)
add("pid", ArrayType(Encoders.bean(classOf[StructuredProperty]).schema)) .add("pid", ArrayType(Encoders.bean(classOf[StructuredProperty]).schema))
val typologies = List("publication", "dataset", "software", "otherresearchproduct", "project", "organization") val typologies = List("publication", "dataset", "software", "otherresearchproduct", "project", "organization")
val pidList = spark.read.schema(pidSchema).json(pidListPath) val pidList = spark.read.schema(pidSchema).json(pidListPath)
typologies.foreach(t => {
typologies.foreach(t => {
println(s"filtering $t") println(s"filtering $t")
val entity = spark.read.schema(idWithPidSchema).json(s"$sourcePath/$t") val entity = spark.read
.selectExpr("explode(pid) as pids", "id") .schema(idWithPidSchema)
.selectExpr("id", "pids.value as pid", "pids.qualifier.classid as pidType") .json(s"$sourcePath/$t")
.distinct() .selectExpr("explode(pid) as pids", "id")
val filerId = entity.join(pidList, pidList("pid") === entity("pid")) .selectExpr("id", "pids.value as pid", "pids.qualifier.classid as pidType")
.select("id").distinct() .distinct()
val filerId = entity
.join(pidList, pidList("pid") === entity("pid"))
.select("id")
.distinct()
val currentEntity = spark.read.text(s"$sourcePath/$t") val currentEntity = spark.read.text(s"$sourcePath/$t")
val resultWithId=currentEntity.withColumn("jsonData",from_json(col("value"),idSchema)).selectExpr("jsonData.id as id", "value") val resultWithId =
resultWithId.join(filerId, resultWithId("id") === filerId("id")) currentEntity.withColumn("jsonData", from_json(col("value"), idSchema)).selectExpr("jsonData.id as id", "value")
resultWithId
.join(filerId, resultWithId("id") === filerId("id"))
.select("value") .select("value")
.repartition(10) .repartition(10)
.write .write
@ -55,28 +58,32 @@ class SparkCreateMiniDumpGraph(propertyPath: String, args: Array[String], log: L
.text(s"$targetPath/$t") .text(s"$targetPath/$t")
}) })
val emptyDataset = spark.createDataset(Seq.empty[String]).toDF("id")
val emptyDataset = spark.createDataset(Seq.empty[String]).toDF("id") typologies
.foldLeft(emptyDataset)((res, item) => {
typologies.foldLeft(emptyDataset)((res, item) => { println(s"adding $item")
println(s"adding $item") res.union(
res.union( spark.read
spark.read .schema(idSchema)
.schema(idSchema) .json(s"$sourcePath/$item")
.json(s"$sourcePath/$item") .selectExpr("id")
.selectExpr("id") .distinct()
.distinct() )
) })
}).distinct() .distinct()
.write.mode(SaveMode.Overwrite).save(s"$targetPath/usedIds") .write
.mode(SaveMode.Overwrite)
.save(s"$targetPath/usedIds")
val filteredIds = spark.read.load(s"$targetPath/usedIds") val filteredIds = spark.read.load(s"$targetPath/usedIds")
val relations = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(s"$sourcePath/relation") val relations = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(s"$sourcePath/relation")
val filteredRelations = relations.join(filteredIds, relations("source") === filteredIds("id") || relations("target") === filteredIds("id")) val filteredRelations = relations.join(
filteredIds,
relations("source") === filteredIds("id") || relations("target") === filteredIds("id")
)
filteredRelations filteredRelations.write
.write
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(s"$targetPath/relation") .json(s"$targetPath/relation")

View File

@ -9,9 +9,15 @@ class MiniDumpTest {
def testMiniDumpGeneration(): Unit = { def testMiniDumpGeneration(): Unit = {
val spark = SparkSession.builder().appName("MiniDumpTest").master("local[*]").getOrCreate() val spark = SparkSession.builder().appName("MiniDumpTest").master("local[*]").getOrCreate()
val sparkCreateMiniDumpGraph = new SparkCreateMiniDumpGraph("src/test/resources/application.properties", Array(), null) val sparkCreateMiniDumpGraph =
new SparkCreateMiniDumpGraph("src/test/resources/application.properties", Array(), null)
sparkCreateMiniDumpGraph.generateMiniDump(spark, "/home/sandro/OGraph/05_graph_inferred", "/home/sandro/OGraph/pid_json", "/home/sandro/OGraph/minidump") sparkCreateMiniDumpGraph.generateMiniDump(
spark,
"/home/sandro/OGraph/05_graph_inferred",
"/home/sandro/OGraph/pid_json",
"/home/sandro/OGraph/minidump"
)
} }