diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala
index d171d96d9..9d99463c9 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala
@@ -2,6 +2,7 @@ package eu.dnetlib.dhp.sx.graph
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.schema.oaf.{KeyValue, Result, StructuredProperty}
+import eu.dnetlib.dhp.schema.sx.scholix.flat.ScholixFlat
 import eu.dnetlib.dhp.schema.sx.scholix.{
   Scholix,
   ScholixCollectedFrom,
@@ -26,6 +27,26 @@ case class RelationInfo(
 ) {}
 case class RelKeyValue(key: String, value: String) {}
 
+/** Flat summary of a ScholixResource, as built by ScholexplorerUtils.generateSummaryResource.
+  *
+  * @param id         dnet identifier of the resource
+  * @param typology   object type of the resource
+  * @param subType    object sub-type of the resource
+  * @param pids       distinct identifier values of the resource
+  * @param pidTypes   distinct identifier schemas of the resource
+  * @param publishers at most five distinct publisher names, sorted
+  * @param date       publication date of the resource
+  */
+case class SummaryResource(
+  id: String,
+  typology: String,
+  subType: String,
+  pids: List[String],
+  pidTypes: List[String],
+  publishers: List[String],
+  date: String
+) {}
+
 object ScholexplorerUtils {
 
   val OPENAIRE_IDENTIFIER_SCHEMA: String = "OpenAIRE Identifier"
@@ -86,6 +107,129 @@ object ScholexplorerUtils {
       .toList
   }
 
+  /** Builds the flat summary of a Scholix resource.
+    *
+    * Identifier values and identifier schemas are de-duplicated independently of each other;
+    * publisher names are de-duplicated, sorted and capped at five. A null identifier or publisher
+    * list yields an empty list.
+    *
+    * @param input the Scholix resource to summarize
+    * @return the summary of `input`
+    */
+  def generateSummaryResource(input: ScholixResource): SummaryResource = {
+    val distinctIds = if (input.getIdentifier != null) {
+      input.getIdentifier.asScala.map(i => i.getIdentifier).distinct.toList
+    } else List()
+    val distinctTypes = if (input.getIdentifier != null) {
+      input.getIdentifier.asScala.map(i => i.getSchema).distinct.toList
+    } else List()
+    val distinctPublishers = if (input.getPublisher != null) {
+      input.getPublisher.asScala.map(i => i.getName).distinct.sorted.take(5).toList
+    } else List()
+    SummaryResource(
+      id = input.getDnetIdentifier,
+      typology = input.getObjectType,
+      subType = input.getObjectSubType,
+      pids = distinctIds,
+      pidTypes = distinctTypes,
+      publishers = distinctPublishers,
+      date = input.getPublicationDate
+    )
+  }
+
+  /** Creates one half of a flat Scholix record from a relation and the summary of one of its ends.
+    *
+    * Link providers are the distinct, sorted `collectedfrom` values of the relation (at most five), or
+    * "OpenAIRE" when the relation has none.
+    *
+    * @param relation     the relation to convert
+    * @param summary      summary of the resource at the end selected by `updateSource`
+    * @param updateSource true to fill the source fields from `summary`, false to fill the target fields
+    * @return the partial record, or null when the relation class is not found in `relations` or the
+    *         summary has no pids; callers must discard null results
+    */
+  def generateScholixFlat(relation: RelationInfo, summary: SummaryResource, updateSource: Boolean): ScholixFlat = {
+    val scholix = new ScholixFlat
+    scholix.setIdentifier(relation.id)
+    if (relation.collectedfrom != null && relation.collectedfrom.nonEmpty)
+      scholix.setLinkProviders(
+        relation.collectedfrom
+          .map(cf => {
+            cf.value
+          })
+          .distinct
+          .sorted
+          .take(5)
+          .toList
+          .asJava
+      )
+    else {
+      scholix.setLinkProviders(List("OpenAIRE").asJava)
+    }
+    val semanticRelation = relations.getOrElse(relation.relclass.toLowerCase, null)
+    if (semanticRelation == null)
+      return null
+
+    scholix.setRelationType(semanticRelation.original)
+    scholix.setPublicationDate(summary.date)
+    if (updateSource) {
+      if (summary.pids.isEmpty)
+        return null
+      scholix.setSourceId(summary.id)
+      scholix.setSourcePid(summary.pids.asJava)
+      scholix.setSourcePidType(summary.pidTypes.asJava)
+      scholix.setSourceType(summary.typology)
+      scholix.setSourceSubType(summary.subType)
+      if (summary.publishers.nonEmpty) {
+        scholix.setSourcePublisher(summary.publishers.asJava)
+      }
+    } else {
+      if (summary.pids.isEmpty)
+        return null
+      scholix.setTargetId(summary.id)
+      scholix.setTargetPid(summary.pids.asJava)
+      scholix.setTargetPidType(summary.pidTypes.asJava)
+      scholix.setTargetType(summary.typology)
+      scholix.setTargetSubType(summary.subType)
+      if (summary.publishers.nonEmpty) {
+        scholix.setTargetPublisher(summary.publishers.asJava)
+      }
+    }
+    scholix
+  }
+
+  /** Merges the source half and the target half of the same relation into a single record.
+    *
+    * `source` is completed in place with the target fields of `target` and then returned.
+    *
+    * @param source record carrying the source fields
+    * @param target record carrying the target fields
+    * @return `source`, completed with the target fields of `target`
+    */
+  def mergeScholixFlat(source: ScholixFlat, target: ScholixFlat): ScholixFlat = {
+    if (source.getPublicationDate == null) {
+      source.setPublicationDate(target.getPublicationDate)
+    }
+
+    source.setTargetId(target.getTargetId)
+    source.setTargetPid(target.getTargetPid)
+    source.setTargetPidType(target.getTargetPidType)
+    source.setTargetType(target.getTargetType)
+    source.setTargetSubType(target.getTargetSubType)
+    source.setTargetPublisher(target.getTargetPublisher)
+
+    // Each half carries its own link providers: keep at most five distinct names taken from both.
+    if (source.getLinkProviders == null)
+      source.setLinkProviders(target.getLinkProviders)
+    else if (target.getLinkProviders != null) {
+      source.setLinkProviders(
+        source.getLinkProviders.asScala.union(target.getLinkProviders.asScala).sorted.distinct.take(5).asJava
+      )
+    }
+
+    source
+  }
+
   def generateScholixResourceFromResult(result: Result): ScholixResource = {
 
     if (result.getInstance() == null || result.getInstance().size() == 0)
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala
index dd420ab95..102b74c22 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala
@@ -1,15 +1,8 @@
 package eu.dnetlib.dhp.sx.graph
 
 import eu.dnetlib.dhp.application.AbstractScalaApplication
-import eu.dnetlib.dhp.schema.oaf.{
-  KeyValue,
-  OtherResearchProduct,
-  Publication,
-  Relation,
-  Result,
-  Software,
-  Dataset => OafDataset
-}
+import eu.dnetlib.dhp.schema.oaf.{KeyValue, OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset}
+import eu.dnetlib.dhp.schema.sx.scholix.flat.ScholixFlat
 import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource}
 import org.apache.spark.sql.functions.{col, concat, expr, first, md5}
 import org.apache.spark.sql.types.StructType
@@ -101,6 +94,42 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo
 
   }
 
+  /** Generates the flat Scholix dump by joining every relation with the summaries of its two ends.
+    *
+    * Reads `$outputPath/relation` and `$outputPath/resource` and writes gzip-compressed JSON to
+    * `$outputPath/scholix`, overwriting any previous content.
+    *
+    * @param outputPath base path holding the `relation` and `resource` datasets
+    * @param spark      the active Spark session
+    */
+  def generateFlatScholix(outputPath: String, spark: SparkSession): Unit = {
+    import spark.implicits._
+    implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource])
+    implicit val scholixEncoder: Encoder[ScholixFlat] = Encoders.bean(classOf[ScholixFlat])
+    val relations = spark.read.load(s"$outputPath/relation").as[RelationInfo]
+    val resource = spark.read.load(s"$outputPath/resource").as[ScholixResource]
+
+    val summaries = resource.map(s => ScholexplorerUtils.generateSummaryResource(s))
+
+    // generateScholixFlat returns null for relations it cannot convert: drop those here, because
+    // Spark's bean encoder rejects null top-level objects.
+    val scholix_source = relations
+      .joinWith(summaries, relations("source") === summaries("id"))
+      .flatMap(k => Option(ScholexplorerUtils.generateScholixFlat(k._1, k._2, updateSource = true)).toList)
+
+    val scholix_target = relations
+      .joinWith(summaries, relations("target") === summaries("id"))
+      .flatMap(k => Option(ScholexplorerUtils.generateScholixFlat(k._1, k._2, updateSource = false)).toList)
+
+    scholix_source
+      .joinWith(scholix_target, scholix_source("identifier") === scholix_target("identifier"), "inner")
+      .map(s => ScholexplorerUtils.mergeScholixFlat(s._1, s._2))
+      .write
+      .mode(SaveMode.Overwrite)
+      .option("compression", "gzip")
+      .json(s"$outputPath/scholix")
+  }
+
   def generateScholix(outputPath: String, spark: SparkSession): Unit = {
     implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource])
     implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo(classOf[Scholix])
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGenerationTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGenerationTest.scala
index 204fe9794..c451dd9db 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGenerationTest.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGenerationTest.scala
@@ -14,13 +14,15 @@ class ScholixGenerationTest {
 
     val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
     val app = new SparkCreateScholexplorerDump(null, null, null)
-//    app.generateScholixResource("/home/sandro/Downloads/scholix_sample/", "/home/sandro/Downloads/scholix/", spark)
-//    app.generateBidirectionalRelations(
-//      "/home/sandro/Downloads/scholix_sample/",
-//      "/home/sandro/Downloads/scholix/",
-//      spark
-//    )
-    app.generateScholix("/home/sandro/Downloads/scholix/", spark)
+
+    val basePath = "/Users/sandro/Downloads"
+    app.generateScholixResource(s"$basePath/scholix_sample/", s"$basePath/scholix/", spark)
+    app.generateBidirectionalRelations(
+      s"$basePath/scholix_sample/",
+      s"$basePath/scholix/",
+      spark
+    )
+    app.generateFlatScholix(s"$basePath/scholix/", spark)
 
   }
 }
diff --git a/pom.xml b/pom.xml
index cc8d509f7..79c73e8cb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -960,7 +960,7 @@
         1.1.3
         1.7
         1.0.7
-        [6.1.2]
+        [6.1.3-FLAT-SCHOLIX]
         cdh5.9.2
         3.5
         11.0.2