diff --git a/dhp-common/src/main/resources/eu/dnetlib/scholexplorer/relation/relations.json b/dhp-common/src/main/resources/eu/dnetlib/scholexplorer/relation/relations.json index 98e8daa18c..4f0cee53d7 100644 --- a/dhp-common/src/main/resources/eu/dnetlib/scholexplorer/relation/relations.json +++ b/dhp-common/src/main/resources/eu/dnetlib/scholexplorer/relation/relations.json @@ -154,5 +154,13 @@ "unknown":{ "original":"Unknown", "inverse":"Unknown" + }, + "isamongtopnsimilardocuments": { + "original": "IsAmongTopNSimilarDocuments", + "inverse": "HasAmongTopNSimilarDocuments" + }, + "hasamongtopnsimilardocuments": { + "original": "HasAmongTopNSimilarDocuments", + "inverse": "IsAmongTopNSimilarDocuments" } } \ No newline at end of file diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala index a995016a8d..f256ca1a12 100644 --- a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala +++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala @@ -65,7 +65,11 @@ object ScholixUtils extends Serializable { } def generateScholixResourceFromResult(r: Result): ScholixResource = { - generateScholixResourceFromSummary(ScholixUtils.resultToSummary(r)) + val sum = ScholixUtils.resultToSummary(r) + if (sum != null) + generateScholixResourceFromSummary(ScholixUtils.resultToSummary(r)) + else + null } val statsAggregator: Aggregator[(String, String, Long), RelatedEntities, RelatedEntities] = @@ -153,6 +157,14 @@ object ScholixUtils extends Serializable { } + def invRel(rel: String): String = { + val semanticRelation = relations.getOrElse(rel.toLowerCase, null) + if (semanticRelation != null) + semanticRelation.inverse + else + null + } + def extractCollectedFrom(summary: ScholixResource): List[ScholixEntityId] = { if (summary.getCollectedFrom != null && !summary.getCollectedFrom.isEmpty) { val l: List[ScholixEntityId] = summary.getCollectedFrom.asScala.map { d => @@ -377,10 +389,7 @@ object ScholixUtils extends Serializable { if (persistentIdentifiers.isEmpty) return null s.setLocalIdentifier(persistentIdentifiers.asJava) - if (r.isInstanceOf[Publication]) - s.setTypology(Typology.publication) - else - s.setTypology(Typology.dataset) + s.setTypology(r.getResulttype.getClassid) s.setSubType(r.getInstance().get(0).getInstancetype.getClassname) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/create_scholix_dump_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/create_scholix_dump_params.json new file mode 100644 index 0000000000..fead58ab1c --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/create_scholix_dump_params.json @@ -0,0 +1,5 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the scholix dump", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/relation/relations.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/relation/relations.json new file mode 100644 index 0000000000..4f0cee53d7 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/relation/relations.json @@ -0,0 +1,166 @@ +{ + "cites":{ + "original":"Cites", + "inverse":"IsCitedBy" + }, + "compiles":{ + "original":"Compiles", + "inverse":"IsCompiledBy" + }, + "continues":{ + "original":"Continues", + "inverse":"IsContinuedBy" + }, + "derives":{ + "original":"IsSourceOf", + "inverse":"IsDerivedFrom" + }, + "describes":{ + "original":"Describes", + "inverse":"IsDescribedBy" + }, + "documents":{ + "original":"Documents", + "inverse":"IsDocumentedBy" + }, + "hasmetadata":{ + "original":"HasMetadata", + "inverse":"IsMetadataOf" + }, + "hasassociationwith":{ + "original":"HasAssociationWith", + "inverse":"HasAssociationWith" + }, + "haspart":{ + "original":"HasPart", + "inverse":"IsPartOf" + }, + "hasversion":{ + "original":"HasVersion", + "inverse":"IsVersionOf" + }, + "iscitedby":{ + "original":"IsCitedBy", + "inverse":"Cites" + }, + "iscompiledby":{ + "original":"IsCompiledBy", + "inverse":"Compiles" + }, + "iscontinuedby":{ + "original":"IsContinuedBy", + "inverse":"Continues" + }, + "isderivedfrom":{ + "original":"IsDerivedFrom", + "inverse":"IsSourceOf" + }, + "isdescribedby":{ + "original":"IsDescribedBy", + "inverse":"Describes" + }, + "isdocumentedby":{ + "original":"IsDocumentedBy", + "inverse":"Documents" + }, + "isidenticalto":{ + "original":"IsIdenticalTo", + "inverse":"IsIdenticalTo" + }, + "ismetadatafor":{ + "original":"IsMetadataFor", + "inverse":"IsMetadataOf" + }, + "ismetadataof":{ + "original":"IsMetadataOf", + "inverse":"IsMetadataFor" + }, + "isnewversionof":{ + "original":"IsNewVersionOf", + "inverse":"IsPreviousVersionOf" + }, + "isobsoletedby":{ + "original":"IsObsoletedBy", + "inverse":"Obsoletes" + }, + "isoriginalformof":{ + "original":"IsOriginalFormOf", + "inverse":"IsVariantFormOf" + }, + "ispartof":{ + "original":"IsPartOf", + "inverse":"HasPart" + }, + "ispreviousversionof":{ + "original":"IsPreviousVersionOf", + "inverse":"IsNewVersionOf" + }, + "isreferencedby":{ + "original":"IsReferencedBy", + "inverse":"References" + }, + "isrelatedto":{ + "original":"IsRelatedTo", + "inverse":"IsRelatedTo" + }, + "isrequiredby":{ + "original":"IsRequiredBy", + "inverse":"Requires" + }, + "isreviewedby":{ + "original":"IsReviewedBy", + "inverse":"Reviews" + }, + "issourceof":{ + "original":"IsSourceOf", + "inverse":"IsDerivedFrom" + }, + "issupplementedby":{ + "original":"IsSupplementedBy", + "inverse":"IsSupplementTo" + }, + "issupplementto":{ + "original":"IsSupplementTo", + "inverse":"IsSupplementedBy" + }, + "isvariantformof":{ + "original":"IsVariantFormOf", + "inverse":"IsOriginalFormOf" + }, + "isversionof":{ + "original":"IsVersionOf", + "inverse":"HasVersion" + }, + "obsoletes":{ + "original":"Obsoletes", + "inverse":"IsObsoletedBy" + }, + "references":{ + "original":"References", + "inverse":"IsReferencedBy" + }, + "requires":{ + "original":"Requires", + "inverse":"IsRequiredBy" + }, + "related":{ + "original":"IsRelatedTo", + "inverse":"IsRelatedTo" + }, + "reviews":{ + "original":"Reviews", + "inverse":"IsReviewedBy" + }, + "unknown":{ + "original":"Unknown", + "inverse":"Unknown" + }, + "isamongtopnsimilardocuments": { + "original": "IsAmongTopNSimilarDocuments", + "inverse": "HasAmongTopNSimilarDocuments" + }, + "hasamongtopnsimilardocuments": { + "original": "HasAmongTopNSimilarDocuments", + "inverse": "IsAmongTopNSimilarDocuments" + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala new file mode 100644 index 0000000000..95564d5236 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala @@ -0,0 +1,256 @@ +package eu.dnetlib.dhp.sx.graph + +import eu.dnetlib.dhp.schema.oaf.{KeyValue, Result, StructuredProperty} +import eu.dnetlib.dhp.schema.sx.scholix.{ + Scholix, + ScholixCollectedFrom, + ScholixEntityId, + ScholixIdentifier, + ScholixRelationship, + ScholixResource +} +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.jackson.JsonMethods.parse + +import scala.collection.JavaConverters._ +import scala.io.Source + +case class RelationInfo( + source: String, + target: String, + relclass: String, + id: String, + collectedfrom: Seq[RelKeyValue] +) {} +case class RelKeyValue(key: String, value: String) {} + +object ScholexplorerUtils { + + val OPENAIRE_IDENTIFIER_SCHEMA: String = "OpenAIRE Identifier" + + case class RelationVocabulary(original: String, inverse: String) {} + + val relations: Map[String, RelationVocabulary] = { + val input = Source + .fromInputStream( + getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/relation/relations.json") + ) + .mkString + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + + lazy val json: json4s.JValue = parse(input) + + json.extract[Map[String, RelationVocabulary]] + } + + def invRel(rel: String): String = { + val semanticRelation = relations.getOrElse(rel.toLowerCase, null) + if (semanticRelation != null) + semanticRelation.inverse + else + null + } + + def generateDatasourceOpenAIREURLS(id: String): String = { + if (id != null && id.length > 12) + s"https://explore.openaire.eu/search/dataprovider?datasourceId=${id.substring(3)}" + else + null + } + + def findURLForPID( + pidValue: List[StructuredProperty], + urls: List[String] + ): List[(StructuredProperty, String)] = { + pidValue.map { p => + val pv = p.getValue + + val r = urls.find(u => u.toLowerCase.contains(pv.toLowerCase)) + (p, r.orNull) + } + } + + def extractTypedIdentifierFromInstance(r: Result): List[ScholixIdentifier] = { + if (r.getInstance() == null || r.getInstance().isEmpty) + return List() + r.getInstance() + .asScala + .filter(i => i.getUrl != null && !i.getUrl.isEmpty) + .filter(i => i.getPid != null && i.getUrl != null) + .flatMap(i => findURLForPID(i.getPid.asScala.toList, i.getUrl.asScala.toList)) + .map(i => new ScholixIdentifier(i._1.getValue, i._1.getQualifier.getClassid, i._2)) + .distinct + .toList + } + + def generateScholixResourceFromResult(result: Result): ScholixResource = { + + if (result.getInstance() == null || result.getInstance().size() == 0) + return null + + if (result.getPid == null || result.getPid.isEmpty) + return null + + val r = new ScholixResource + r.setDnetIdentifier(result.getId) + + val persistentIdentifiers: List[ScholixIdentifier] = extractTypedIdentifierFromInstance(result) + if (persistentIdentifiers.isEmpty) + return null + + r.setIdentifier(persistentIdentifiers.asJava) + + r.setObjectType(result.getResulttype.getClassid) + + r.setObjectSubType( + result + .getInstance() + .asScala + .filter(i => i != null && i.getInstancetype != null) + .map(i => i.getInstancetype.getClassname) + .distinct + .head + ) + + if (result.getTitle != null && result.getTitle.asScala.nonEmpty) { + val titles: List[String] = result.getTitle.asScala.map(t => t.getValue).toList + if (titles.nonEmpty) + r.setTitle(titles.head) + else + return null + } + if (result.getAuthor != null && !result.getAuthor.isEmpty) { + val authors: List[ScholixEntityId] = + result.getAuthor.asScala + .map(a => { + val entity = new ScholixEntityId() + entity.setName(a.getFullname) + if (a.getPid != null && a.getPid.size() > 0) + entity.setIdentifiers( + a.getPid.asScala + .map(sp => { + val id = new ScholixIdentifier() + id.setIdentifier(sp.getValue) + id.setSchema(sp.getQualifier.getClassid) + id + }) + .take(3) + .toList + .asJava + ) + entity + }) + .toList + if (authors.nonEmpty) + r.setCreator(authors.asJava) + + } + + val dt: List[String] = result + .getInstance() + .asScala + .filter(i => i.getDateofacceptance != null) + .map(i => i.getDateofacceptance.getValue) + .toList + if (dt.nonEmpty) + r.setPublicationDate(dt.distinct.head) + + r.setPublisher( + result + .getInstance() + .asScala + .map(i => i.getHostedby) + .filter(h => !"unknown".equalsIgnoreCase(h.getValue)) + .map(h => { + val eid = new ScholixEntityId() + eid.setName(h.getValue) + val id = new ScholixIdentifier() + id.setIdentifier(h.getKey) + id.setSchema(OPENAIRE_IDENTIFIER_SCHEMA) + id.setUrl(generateDatasourceOpenAIREURLS(h.getKey)) + eid.setIdentifiers(List(id).asJava) + eid + }) + .distinct + .asJava + ) + + r.setCollectedFrom( + result.getCollectedfrom.asScala + .map(cf => { + val scf = new ScholixCollectedFrom() + scf.setProvisionMode("collected") + scf.setCompletionStatus("complete") + val eid = new ScholixEntityId() + eid.setName(cf.getValue) + val id = new ScholixIdentifier() + id.setIdentifier(cf.getKey) + id.setSchema(OPENAIRE_IDENTIFIER_SCHEMA) + id.setUrl(generateDatasourceOpenAIREURLS(cf.getKey)) + eid.setIdentifiers(List(id).asJava) + scf.setProvider(eid) + scf + }) + .asJava + ) + + r + } + + def generateScholix(relation: RelationInfo, source: ScholixResource): Scholix = { + val s: Scholix = new Scholix + s.setSource(source) + if (relation.collectedfrom != null && relation.collectedfrom.nonEmpty) + s.setLinkprovider( + relation.collectedfrom + .map(cf => { + val eid = new ScholixEntityId() + eid.setName(cf.value) + val id = new ScholixIdentifier() + id.setIdentifier(cf.key) + id.setSchema(OPENAIRE_IDENTIFIER_SCHEMA) + id.setUrl(generateDatasourceOpenAIREURLS(cf.key)) + eid.setIdentifiers(List(id).asJava) + eid + }) + .toList + .asJava + ) + else { + val eid = new ScholixEntityId() + eid.setName("OpenAIRE") + val id = new ScholixIdentifier() + id.setIdentifier("10|infrastruct_::f66f1bd369679b5b077dcdf006089556") + id.setSchema(OPENAIRE_IDENTIFIER_SCHEMA) + id.setUrl(generateDatasourceOpenAIREURLS(id.getIdentifier)) + eid.setIdentifiers(List(id).asJava) + s.setLinkprovider(List(eid).asJava) + } + s.setIdentifier(relation.id) + val semanticRelation = relations.getOrElse(relation.relclass.toLowerCase, null) + if (semanticRelation == null) + return null + s.setRelationship( + new ScholixRelationship(semanticRelation.original, "datacite", semanticRelation.inverse) + ) + s.setPublicationDate(source.getPublicationDate) + s.setPublisher(source.getPublisher) + val mockTarget = new ScholixResource + mockTarget.setDnetIdentifier(relation.target) + s.setTarget(mockTarget) + s + } + + def updateTarget(s: Scholix, t: ScholixResource): Scholix = { + + s.setTarget(t) + val spublishers: Seq[ScholixEntityId] = + if (s.getPublisher != null && !s.getPublisher.isEmpty) s.getPublisher.asScala else List() + val tpublishers: Seq[ScholixEntityId] = + if (t.getPublisher != null && !t.getPublisher.isEmpty) t.getPublisher.asScala else List() + val mergedPublishers = spublishers.union(tpublishers).distinct.take(10).toList + s.setPublisher(mergedPublishers.asJava) + s + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala new file mode 100644 index 0000000000..9334fc6e03 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala @@ -0,0 +1,130 @@ +package eu.dnetlib.dhp.sx.graph + +import eu.dnetlib.dhp.application.AbstractScalaApplication +import eu.dnetlib.dhp.schema.oaf.{ + KeyValue, + OtherResearchProduct, + Publication, + Relation, + Result, + Software, + Dataset => OafDataset +} +import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource} +import org.apache.spark.sql.functions.{col, concat, expr, md5} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql._ +import org.slf4j.{Logger, LoggerFactory} + +class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], log: Logger) + extends AbstractScalaApplication(propertyPath, args, log: Logger) { + + /** Here all the spark applications runs this method + * where the whole logic of the spark node is defined + */ + override def run(): Unit = { + val sourcePath = parser.get("sourcePath") + log.info("sourcePath: {}", sourcePath) + val targetPath = parser.get("targetPath") + log.info("targetPath: {}", targetPath) + generateBidirectionalRelations(sourcePath, targetPath, spark) + generateScholixResource(sourcePath, targetPath, spark) + generateScholix(targetPath, spark) + } + + def generateScholixResource(inputPath: String, outputPath: String, spark: SparkSession): Unit = { + val entityMap: Map[String, StructType] = Map( + "publication" -> Encoders.bean(classOf[Publication]).schema, + "dataset" -> Encoders.bean(classOf[OafDataset]).schema, + "software" -> Encoders.bean(classOf[Software]).schema, + "otherresearchproduct" -> Encoders.bean(classOf[OtherResearchProduct]).schema + ) + + implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource]) + implicit val resultEncoder: Encoder[Result] = Encoders.bean(classOf[Result]) + + val resDs = spark.emptyDataset[ScholixResource] + val scholixResourceDS = entityMap.foldLeft[Dataset[ScholixResource]](resDs)((res, item) => { + println(s"adding ${item._1}") + res.union( + spark.read + .schema(item._2) + .json(s"$inputPath/${item._1}") + .as[Result] + .map(r => ScholexplorerUtils.generateScholixResourceFromResult(r)) + .filter(s => s != null) + ) + }) + scholixResourceDS.write.mode(SaveMode.Overwrite).save(s"$outputPath/resource") + } + + def generateBidirectionalRelations(inputPath: String, otuputPath: String, spark: SparkSession): Unit = { + val relSchema = Encoders.bean(classOf[Relation]).schema + + val relDF = spark.read + .schema(relSchema) + .json(s"$inputPath/relation") + .where( + "datainfo.deletedbyinference is false and source like '50%' and target like '50%' " + + "and relClass <> 'merges' and relClass <> 'isMergedIn'" + ) + .select("source", "target", "collectedfrom", "relClass") + + def invRel: String => String = { s => + ScholexplorerUtils.invRel(s) + } + + import org.apache.spark.sql.functions.udf + val inverseRelationUDF = udf(invRel) + val inverseRelation = relDF.select( + col("target").alias("source"), + col("source").alias("target"), + col("collectedfrom"), + inverseRelationUDF(col("relClass")).alias("relClass") + ) + + val bidRel = inverseRelation + .union(relDF) + .withColumn("id", md5(concat(col("source"), col("relClass"), col("target")))) + .withColumn("cf", expr("transform(collectedfrom, x -> struct(x.key, x.value))")) + .drop("collectedfrom") + .withColumnRenamed("cf", "collectedfrom") + .distinct() + + bidRel.write.mode(SaveMode.Overwrite).save(s"$otuputPath/relation") + + } + + def generateScholix(outputPath: String, spark: SparkSession): Unit = { + implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource]) + implicit val scholixEncoder: Encoder[Scholix] = Encoders.bean(classOf[Scholix]) + + import spark.implicits._ + val relations = spark.read.load(s"$outputPath/relation").as[RelationInfo] + val resource = spark.read.load(s"$outputPath/resource").as[ScholixResource] + + val scholix_one_verse = relations + .joinWith(resource, relations("source") === resource("dnetIdentifier"), "inner") + .map(res => ScholexplorerUtils.generateScholix(res._1, res._2)) + + scholix_one_verse + .joinWith(resource, scholix_one_verse("target.dnetIdentifier") === resource("dnetIdentifier"), "inner") + .map(k => ScholexplorerUtils.updateTarget(k._1, k._2)) + .write + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(s"$outputPath/scholix") + } +} + +object SparkCreateScholexplorerDump { + val logger: Logger = LoggerFactory.getLogger(SparkCreateScholexplorerDump.getClass) + + def main(args: Array[String]): Unit = { + new SparkCreateScholexplorerDump( + log = logger, + args = args, + propertyPath = "/eu/dnetlib/dhp/sx/create_scholix_dump_params.json" + ).initialize().run() + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGenerationTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGenerationTest.scala new file mode 100644 index 0000000000..0a2872cb48 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGenerationTest.scala @@ -0,0 +1,17 @@ +package eu.dnetlib.dhp.sx.graph.scholix + +import eu.dnetlib.dhp.sx.graph.SparkCreateScholexplorerDump +import org.apache.spark.sql.SparkSession +import org.junit.jupiter.api.Test + +class ScholixGenerationTest { + + @Test + def generateScholix(): Unit = { + val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() + val app = new SparkCreateScholexplorerDump(null, null, null) +// app.generateScholixResource("/home/sandro/Downloads/scholix_sample/", "/home/sandro/Downloads/scholix/", spark) +// app.generateBidirectionalRelations("/home/sandro/Downloads/scholix_sample/", "/home/sandro/Downloads/scholix/", spark) + app.generateScholix("/home/sandro/Downloads/scholix/", spark) + } +} diff --git a/pom.xml b/pom.xml index d3db1d3d47..9f6f1f2a90 100644 --- a/pom.xml +++ b/pom.xml @@ -960,7 +960,7 @@ 1.1.3 1.7 1.0.7 - [6.1.1] + [6.1.2-SNAPSHOT] cdh5.9.2 3.5 11.0.2