Implemented generation of a miniDUMP starting from a JSON file with schema (pid, pidType).

This commit is contained in:
sandro.labruzzo 2024-12-18 09:48:04 +01:00
parent e6e84d2f1d
commit b7357e18b2
2 changed files with 30 additions and 22 deletions

View File

@ -31,26 +31,10 @@ class SparkCreateMiniDumpGraph(propertyPath: String, args: Array[String], log: L
add("pid", ArrayType(Encoders.bean(classOf[StructuredProperty]).schema)) add("pid", ArrayType(Encoders.bean(classOf[StructuredProperty]).schema))
val typologies = List("publication", "dataset", "software", "otherresearchproduct", "project", "organization") val typologies = List("publication", "dataset", "software", "otherresearchproduct", "project", "organization")
val emptyDataset = spark.emptyDataFrame
val pidList = spark.read.schema(pidSchema).json(pidListPath) val pidList = spark.read.schema(pidSchema).json(pidListPath)
val filteredIds = typologies.foldLeft(emptyDataset)((res, item) => {
println(s"adding $item")
res.union(
spark.read
.schema(idWithPidSchema)
.json(s"$sourcePath/$item")
.selectExpr("explode(pid) as pids", "id")
.selectExpr("id", "pids.value as pid", "pids.qualifier.classid as pidType")
.distinct()
)
}).distinct()
.createOrReplaceTempView("filteredIds")
typologies.foreach(t => { typologies.foreach(t => {
println(s"filtering $t") println(s"filtering $t")
val entity = spark.read.schema(idWithPidSchema).json(s"$sourcePath/$t") val entity = spark.read.schema(idWithPidSchema).json(s"$sourcePath/$t")
@ -72,6 +56,30 @@ class SparkCreateMiniDumpGraph(propertyPath: String, args: Array[String], log: L
}) })
val emptyDataset = spark.createDataset(Seq.empty[String]).toDF("id")
typologies.foldLeft(emptyDataset)((res, item) => {
println(s"adding $item")
res.union(
spark.read
.schema(idSchema)
.json(s"$sourcePath/$item")
.selectExpr("id")
.distinct()
)
}).distinct()
.write.mode(SaveMode.Overwrite).save(s"$targetPath/usedIds")
val filteredIds = spark.read.load(s"$targetPath/usedIds")
val relations = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(s"$sourcePath/relation")
val filteredRelations = relations.join(filteredIds, relations("source") === filteredIds("id") || relations("target") === filteredIds("id"))
filteredRelations
.write
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(s"$targetPath/relation")
} }
} }

View File

@ -7,11 +7,11 @@ class MiniDumpTest {
@Test @Test
def testMiniDumpGeneration(): Unit = { def testMiniDumpGeneration(): Unit = {
// val spark = SparkSession.builder().appName("MiniDumpTest").master("local[*]").getOrCreate() val spark = SparkSession.builder().appName("MiniDumpTest").master("local[*]").getOrCreate()
//
// val sparkCreateMiniDumpGraph = new SparkCreateMiniDumpGraph("src/test/resources/application.properties", Array(), null) val sparkCreateMiniDumpGraph = new SparkCreateMiniDumpGraph("src/test/resources/application.properties", Array(), null)
//
// sparkCreateMiniDumpGraph.generateMiniDump(spark, "/home/sandro/OGraph/05_graph_inferred", "/home/sandro/OGraph/pid_json", "/home/sandro/OGraph/minidump") sparkCreateMiniDumpGraph.generateMiniDump(spark, "/home/sandro/OGraph/05_graph_inferred", "/home/sandro/OGraph/pid_json", "/home/sandro/OGraph/minidump")
} }