wip: first implementation of minidump generation

sandro.labruzzo 2024-12-13 16:55:43 +01:00
parent 82f92f23dd
commit e6e84d2f1d
2 changed files with 62 additions and 19 deletions

SparkCreateMiniDumpGraph.scala

@@ -1,9 +1,9 @@
 package eu.dnetlib.dhp.oa.graph.minidump
 import eu.dnetlib.dhp.application.AbstractScalaApplication
-import eu.dnetlib.dhp.schema.oaf.Relation
+import eu.dnetlib.dhp.schema.oaf.{Relation, StructuredProperty}
 import org.apache.spark.sql.expressions.Window
-import org.apache.spark.sql.functions.{col, from_json, monotonically_increasing_id, row_number}
+import org.apache.spark.sql.functions.{col, from_json, monotonically_increasing_id, row_number, size}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Encoders, SaveMode, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
@@ -19,31 +19,56 @@ class SparkCreateMiniDumpGraph(propertyPath: String, args: Array[String], log: L
     log.info("sourcePath: {}", sourcePath)
     val targetPath = parser.get("targetPath")
     log.info("targetPath: {}", targetPath)
-    generateMiniDump(spark, sourcePath, targetPath)
+    // generateMiniDump(spark, sourcePath, targetPath)
   }
-  private def generateMiniDump(spark: SparkSession, sourcePath: String, targetPath: String): Unit = {
+  def generateMiniDump(spark: SparkSession, sourcePath: String, pidListPath: String, targetPath: String): Unit = {
     import spark.implicits._
     val relation = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(s"$sourcePath/relation")
     val strategy = Window.partitionBy("relClass").orderBy(monotonically_increasing_id())
     var relationSample = relation.withColumn("row_number", row_number().over(strategy))
       .where("row_number <= 1000")
       .drop("row_number")
     relationSample.write.mode(SaveMode.Overwrite).option("compression", "gzip").json(s"$targetPath/relation")
-    val typology = List("publication", "dataset", "software", "otherresearchproduct", "project", "organization")
-    relationSample = spark.read.json(s"$targetPath/relation")
-    val relId = relationSample.selectExpr("source as id").union(relationSample.selectExpr("target as id")).distinct()
+    val pidSchema = new StructType().add("pid", StringType).add("pidType", StringType).add("type", StringType)
     val idSchema = new StructType().add("id", StringType)
-    typology.foreach(item => {
-      val entity = spark.read.json(s"$sourcePath/$item")
-      val resultWithId = entity.withColumn("jsonData", from_json(col("value"), idSchema)).selectExpr("jsonData.id as id", "value")
-      resultWithId.join(relId, resultWithId("id") === relId("id"), "leftSemi")
+    val idWithPidSchema = new StructType()
+      .add("id", StringType)
+      .add("pid", ArrayType(Encoders.bean(classOf[StructuredProperty]).schema))
+    val typologies = List("publication", "dataset", "software", "otherresearchproduct", "project", "organization")
+    // spark.emptyDataFrame has no columns and cannot be unioned with the three-column
+    // frames built below, so seed the fold with a schema-compatible empty frame
+    val emptyDataset = spark.emptyDataset[(String, String, String)].toDF("id", "pid", "pidType")
+    val pidList = spark.read.schema(pidSchema).json(pidListPath)
+    val filteredIds = typologies.foldLeft(emptyDataset)((res, item) => {
+      println(s"adding $item")
+      res.union(
+        spark.read
+          .schema(idWithPidSchema)
+          .json(s"$sourcePath/$item")
+          .selectExpr("explode(pid) as pids", "id")
+          .selectExpr("id", "pids.value as pid", "pids.qualifier.classid as pidType")
+          .distinct()
+      )
+    }).distinct()
+    filteredIds.createOrReplaceTempView("filteredIds")
+    typologies.foreach(t => {
+      println(s"filtering $t")
+      val entity = spark.read.schema(idWithPidSchema).json(s"$sourcePath/$t")
+        .selectExpr("explode(pid) as pids", "id")
+        .selectExpr("id", "pids.value as pid", "pids.qualifier.classid as pidType")
+        .distinct()
+      val filteredId = entity.join(pidList, pidList("pid") === entity("pid"))
+        .select("id").distinct()
+      val currentEntity = spark.read.text(s"$sourcePath/$t")
+      val resultWithId = currentEntity.withColumn("jsonData", from_json(col("value"), idSchema)).selectExpr("jsonData.id as id", "value")
+      resultWithId.join(filteredId, resultWithId("id") === filteredId("id"))
         .select("value")
-        .repartition(20)
+        .repartition(10)
         .write
         .mode(SaveMode.Overwrite)
         .option("compression", "gzip")
-        .text(s"$targetPath/$item")
+        .text(s"$targetPath/$t")
     })
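
For reference, the sampling step kept from the previous version takes at most 1000 relations per relClass by numbering the rows of each class inside a window and dropping the rest. A minimal self-contained sketch of the same technique; the toy data, object name, and local session are assumptions for illustration, not part of the commit:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

object RelationSamplingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sampling-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // toy relations; the real job reads $sourcePath/relation with the Relation bean schema
    val relations = Seq(
      ("50|a", "40|x", "isProducedBy"),
      ("50|b", "40|y", "isProducedBy"),
      ("50|c", "20|z", "hasAuthorInstitution")
    ).toDF("source", "target", "relClass")

    // number the rows of each relClass in an arbitrary order, then keep the
    // first N of every class (N = 2 here, 1000 in the job above)
    val strategy = Window.partitionBy("relClass").orderBy(monotonically_increasing_id())
    val sample = relations
      .withColumn("row_number", row_number().over(strategy))
      .where("row_number <= 2")
      .drop("row_number")

    sample.show(truncate = false)
    spark.stop()
  }
}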

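The new pid-driven filter follows the same shape end to end: explode each entity's pid array, keep the ids whose pid value appears in the supplied list, then select the untouched JSON rows for the surviving ids. A sketch with toy fixtures, assuming a pid-list layout matching pidSchema above; all names and data here are illustrative, not part of the commit:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

object PidFilterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("pid-filter-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // raw entity rows, as spark.read.text would deliver them (single column: value)
    val entities = Seq(
      """{"id":"50|doi_1","pid":[{"value":"10.1/abc","qualifier":{"classid":"doi"}}]}""",
      """{"id":"50|doi_2","pid":[{"value":"10.1/zzz","qualifier":{"classid":"doi"}}]}"""
    ).toDF("value")

    // pid list shaped like pidSchema (the "type" field is omitted here)
    val pidList = Seq(("10.1/abc", "doi")).toDF("pid", "pidType")

    // parse only what the filter needs: id plus each pid's value and qualifier.classid
    val pidEntry = new StructType()
      .add("value", StringType)
      .add("qualifier", new StructType().add("classid", StringType))
    val idWithPidSchema = new StructType()
      .add("id", StringType)
      .add("pid", ArrayType(pidEntry))

    val entityPids = entities
      .withColumn("jsonData", from_json(col("value"), idWithPidSchema))
      .selectExpr("jsonData.id as id", "explode(jsonData.pid) as pids")
      .selectExpr("id", "pids.value as pid", "pids.qualifier.classid as pidType")

    // ids whose pid occurs in the list, then back to the original JSON rows
    val keptIds = entityPids.join(pidList, Seq("pid")).select("id").distinct()
    val idSchema = new StructType().add("id", StringType)
    entities
      .withColumn("jsonData", from_json(col("value"), idSchema))
      .selectExpr("jsonData.id as id", "value")
      .join(keptIds, Seq("id"))
      .select("value")
      .show(truncate = false)

    spark.stop()
  }
}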
MiniDumpTest.scala

@@ -0,0 +1,18 @@
+package eu.dnetlib.dhp.oa.graph.minidump
+
+import org.apache.spark.sql.SparkSession
+import org.junit.jupiter.api.Test
+
+class MiniDumpTest {
+
+  @Test
+  def testMiniDumpGeneration(): Unit = {
+//    val spark = SparkSession.builder().appName("MiniDumpTest").master("local[*]").getOrCreate()
+//
+//    val sparkCreateMiniDumpGraph = new SparkCreateMiniDumpGraph("src/test/resources/application.properties", Array(), null)
+//
+//    sparkCreateMiniDumpGraph.generateMiniDump(spark, "/home/sandro/OGraph/05_graph_inferred", "/home/sandro/OGraph/pid_json", "/home/sandro/OGraph/minidump")
+  }
+}
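
Once generateMiniDump stabilises, the commented-out test could become a local round-trip along these lines; the class name, fixture paths, and assertion are placeholders, not part of the commit:

package eu.dnetlib.dhp.oa.graph.minidump

import org.apache.spark.sql.SparkSession
import org.junit.jupiter.api.Test

class MiniDumpRoundTripSketch {

  @Test
  def testMiniDumpGeneration(): Unit = {
    val spark = SparkSession.builder().appName("MiniDumpTest").master("local[*]").getOrCreate()
    try {
      // hypothetical fixture layout: $source/<typology> as JSON-lines files, plus a
      // pid list of {"pid": ..., "pidType": ..., "type": ...} records (see pidSchema)
      val source = "src/test/resources/minidump/graph"
      val pidListPath = "src/test/resources/minidump/pid_json"
      val target = "target/minidump"

      new SparkCreateMiniDumpGraph("src/test/resources/application.properties", Array(), null)
        .generateMiniDump(spark, source, pidListPath, target)

      // a real assertion would re-read $target/<typology> and verify that only
      // entities carrying a listed pid survived the filter
    } finally {
      spark.stop()
    }
  }
}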