From b7357e18b28c4566cdaf61b7bcb88f47595f02e9 Mon Sep 17 00:00:00 2001 From: "sandro.labruzzo" Date: Wed, 18 Dec 2024 09:48:04 +0100 Subject: [PATCH] implemented generation of miniDUMP starting from a jsonFile with schema pid pidtype --- .../minidump/SparkCreateMiniDumpGraph.scala | 42 +++++++++++-------- .../dhp/oa/graph/minidump/MiniDumpTest.scala | 10 ++--- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/minidump/SparkCreateMiniDumpGraph.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/minidump/SparkCreateMiniDumpGraph.scala index 441adbc4c..3d0ae15b0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/minidump/SparkCreateMiniDumpGraph.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/minidump/SparkCreateMiniDumpGraph.scala @@ -31,26 +31,10 @@ class SparkCreateMiniDumpGraph(propertyPath: String, args: Array[String], log: L add("pid", ArrayType(Encoders.bean(classOf[StructuredProperty]).schema)) val typologies = List("publication", "dataset", "software", "otherresearchproduct", "project", "organization") - val emptyDataset = spark.emptyDataFrame + val pidList = spark.read.schema(pidSchema).json(pidListPath) - val filteredIds = typologies.foldLeft(emptyDataset)((res, item) => { - println(s"adding $item") - res.union( - spark.read - .schema(idWithPidSchema) - .json(s"$sourcePath/$item") - .selectExpr("explode(pid) as pids", "id") - .selectExpr("id", "pids.value as pid", "pids.qualifier.classid as pidType") - .distinct() - ) - }).distinct() - .createOrReplaceTempView("filteredIds") - - - - typologies.foreach(t => { println(s"filtering $t") val entity = spark.read.schema(idWithPidSchema).json(s"$sourcePath/$t") @@ -72,6 +56,30 @@ class SparkCreateMiniDumpGraph(propertyPath: String, args: Array[String], log: L }) + val emptyDataset = spark.createDataset(Seq.empty[String]).toDF("id") + + typologies.foldLeft(emptyDataset)((res, item) => { + println(s"adding $item") + res.union( + spark.read + .schema(idSchema) + .json(s"$sourcePath/$item") + .selectExpr("id") + .distinct() + ) + }).distinct() + .write.mode(SaveMode.Overwrite).save(s"$targetPath/usedIds") + + + val filteredIds = spark.read.load(s"$targetPath/usedIds") + val relations = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(s"$sourcePath/relation") + val filteredRelations = relations.join(filteredIds, relations("source") === filteredIds("id") || relations("target") === filteredIds("id")) + + filteredRelations + .write + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(s"$targetPath/relation") } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/oa/graph/minidump/MiniDumpTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/oa/graph/minidump/MiniDumpTest.scala index e54e869a5..4804a646e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/oa/graph/minidump/MiniDumpTest.scala +++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/oa/graph/minidump/MiniDumpTest.scala @@ -7,11 +7,11 @@ class MiniDumpTest { @Test def testMiniDumpGeneration(): Unit = { -// val spark = SparkSession.builder().appName("MiniDumpTest").master("local[*]").getOrCreate() -// -// val sparkCreateMiniDumpGraph = new SparkCreateMiniDumpGraph("src/test/resources/application.properties", Array(), null) -// -// sparkCreateMiniDumpGraph.generateMiniDump(spark, "/home/sandro/OGraph/05_graph_inferred", "/home/sandro/OGraph/pid_json", "/home/sandro/OGraph/minidump") + val spark = SparkSession.builder().appName("MiniDumpTest").master("local[*]").getOrCreate() + + val sparkCreateMiniDumpGraph = new SparkCreateMiniDumpGraph("src/test/resources/application.properties", Array(), null) + + sparkCreateMiniDumpGraph.generateMiniDump(spark, "/home/sandro/OGraph/05_graph_inferred", "/home/sandro/OGraph/pid_json", "/home/sandro/OGraph/minidump") }