From b881ee5ef8b0f12dfaa25e448e34b1f0e6853c54 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 15 Dec 2021 11:25:23 +0100 Subject: [PATCH] [scholexplorer] - implemented generation of scholix of delta update of datacite --- .../dhp/sx/graph/scholix/ScholixUtils.scala | 57 +++- .../sx/graph/SparkRetrieveDataciteDelta.scala | 291 ++++++++++++++++++ .../sx/graph/SparkRetrieveDataciteDelta.scala | 54 ---- .../sx/graph/RetrieveDataciteDeltaTest.scala | 26 ++ 4 files changed, 364 insertions(+), 64 deletions(-) rename {dhp-workflows/dhp-graph-mapper => dhp-common}/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala (88%) create mode 100644 dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/RetrieveDataciteDeltaTest.scala diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala similarity index 88% rename from dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala rename to dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala index bf81a26d4..f8010bbd5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala +++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala @@ -53,8 +53,6 @@ object ScholixUtils { else { summary.getDate.get(0) } - - } def inverseRelationShip(rel: ScholixRelationship): ScholixRelationship = { @@ -68,7 +66,6 @@ object ScholixUtils { override def zero: RelatedEntities = null override def reduce(b: RelatedEntities, a: (String, String, Long)): RelatedEntities = { - val id = a._1 val relatedDataset = if ("dataset".equalsIgnoreCase(a._2)) a._3 else 0 val relatedPublication = if ("publication".equalsIgnoreCase(a._2)) a._3 else 0 @@ -142,6 +139,14 @@ object ScholixUtils { } + def extractCollectedFrom(summary: ScholixResource): List[ScholixEntityId] = { + if (summary.getCollectedFrom != null && !summary.getCollectedFrom.isEmpty) { + val l: List[ScholixEntityId] = summary.getCollectedFrom.asScala.map { + d => new ScholixEntityId(d.getProvider.getName, d.getProvider.getIdentifiers) + }(collection.breakOut) + l + } else List() + } def extractCollectedFrom(summary: ScholixSummary): List[ScholixEntityId] = { if (summary.getDatasources != null && !summary.getDatasources.isEmpty) { @@ -160,7 +165,7 @@ object ScholixUtils { c => new ScholixEntityId(c.getValue, List(new ScholixIdentifier(c.getKey, DNET_IDENTIFIER_SCHEMA, null)).asJava) - }(collection breakOut) + }.toList l } else List() } @@ -222,6 +227,38 @@ object ScholixUtils { } + + def scholixFromSource(relation: Relation, source: ScholixResource):Scholix = { + if (relation == null || source == null) + return null + val s = new Scholix + var l: List[ScholixEntityId] = extractCollectedFrom(relation) + if (l.isEmpty) + l = extractCollectedFrom(source) + if (l.isEmpty) + return null + s.setLinkprovider(l.asJava) + var d = extractRelationDate(relation) + if (d == null) + d = source.getPublicationDate + + s.setPublicationDate(d) + + + if (source.getPublisher != null && !source.getPublisher.isEmpty) { + s.setPublisher(source.getPublisher) + } + + val semanticRelation = relations.getOrElse(relation.getRelClass.toLowerCase, null) + if (semanticRelation == null) + return null + s.setRelationship(new ScholixRelationship(semanticRelation.original, "datacite", semanticRelation.inverse)) + s.setSource(source) + + s + } + + def scholixFromSource(relation: Relation, source: ScholixSummary): Scholix = { if (relation == null || source == null) @@ -303,7 +340,7 @@ object ScholixUtils { s.setSubType(r.getInstance().get(0).getInstancetype.getClassname) if (r.getTitle != null && r.getTitle.asScala.nonEmpty) { - val titles: List[String] = r.getTitle.asScala.map(t => t.getValue)(collection breakOut) + val titles: List[String] = r.getTitle.asScala.map(t => t.getValue).toList if (titles.nonEmpty) s.setTitle(titles.asJava) else @@ -311,12 +348,12 @@ object ScholixUtils { } if (r.getAuthor != null && !r.getAuthor.isEmpty) { - val authors: List[String] = r.getAuthor.asScala.map(a => a.getFullname)(collection breakOut) - if (authors nonEmpty) + val authors: List[String] = r.getAuthor.asScala.map(a => a.getFullname).toList + if (authors.nonEmpty) s.setAuthor(authors.asJava) } if (r.getInstance() != null) { - val dt: List[String] = r.getInstance().asScala.filter(i => i.getDateofacceptance != null).map(i => i.getDateofacceptance.getValue)(collection.breakOut) + val dt: List[String] = r.getInstance().asScala.filter(i => i.getDateofacceptance != null).map(i => i.getDateofacceptance.getValue).toList if (dt.nonEmpty) s.setDate(dt.distinct.asJava) } @@ -327,7 +364,7 @@ object ScholixUtils { } if (r.getSubject != null && !r.getSubject.isEmpty) { - val subjects: List[SchemeValue] = r.getSubject.asScala.map(s => new SchemeValue(s.getQualifier.getClassname, s.getValue))(collection breakOut) + val subjects: List[SchemeValue] = r.getSubject.asScala.map(s => new SchemeValue(s.getQualifier.getClassname, s.getValue)).toList if (subjects.nonEmpty) s.setSubject(subjects.asJava) } @@ -336,7 +373,7 @@ object ScholixUtils { s.setPublisher(List(r.getPublisher.getValue).asJava) if (r.getCollectedfrom != null && !r.getCollectedfrom.isEmpty) { - val cf: List[CollectedFromType] = r.getCollectedfrom.asScala.map(c => new CollectedFromType(c.getValue, c.getKey, "complete"))(collection breakOut) + val cf: List[CollectedFromType] = r.getCollectedfrom.asScala.map(c => new CollectedFromType(c.getValue, c.getKey, "complete")).toList if (cf.nonEmpty) s.setDatasources(cf.distinct.asJava) } diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala new file mode 100644 index 000000000..7f37829ee --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala @@ -0,0 +1,291 @@ +package eu.dnetlib.dhp.sx.graph + +import eu.dnetlib.dhp.application.AbstractScalaApplication +import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup +import eu.dnetlib.dhp.datacite.{DataciteToOAFTransformation, DataciteType} +import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result} +import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource} +import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary +import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils +import eu.dnetlib.dhp.utils.{DHPUtils, ISLookupClientFactory} +import org.apache.spark.sql.functions.max +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.Logger + +import scala.collection.JavaConverters._ +import java.text.SimpleDateFormat + +class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:Logger) extends AbstractScalaApplication(propertyPath, args, log:Logger) { + + val ISO_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ssZ" + val simpleFormatter = new SimpleDateFormat(ISO_DATE_PATTERN) + + val SCHOLIX_RESOURCE_PATH_NAME = "scholixResource" + val DATACITE_OAF_PATH_NAME = "dataciteOAFUpdate" + val PID_MAP_PATH_NAME = "pidMap" + val RESOLVED_REL_PATH_NAME ="resolvedRelation" + val SCHOLIX_PATH_NAME = "scholix" + + + def scholixResourcePath(workingPath:String) = s"$workingPath/$SCHOLIX_RESOURCE_PATH_NAME" + def dataciteOAFPath(workingPath:String) = s"$workingPath/$DATACITE_OAF_PATH_NAME" + def pidMapPath(workingPath:String) = s"$workingPath/$PID_MAP_PATH_NAME" + def resolvedRelationPath(workingPath:String) = s"$workingPath/$RESOLVED_REL_PATH_NAME" + def scholixPath(workingPath:String) = s"$workingPath/$SCHOLIX_PATH_NAME" + + + /** + * Utility to parse Date in ISO8601 to epochMillis + * @param inputDate The String represents an input date in ISO8601 + * @return The relative epochMillis of parsed date + */ + def ISO8601toEpochMillis(inputDate:String):Long = { + simpleFormatter.parse(inputDate).getTime + } + + + /** + * This method tries to retrieve the last collection date from all datacite + * records in HDFS. + * This method should be called before indexing scholexplorer to retrieve + * the delta of Datacite record to download, since from the generation of + * raw graph to the generation of Scholexplorer sometimes it takes 20 days + * @param spark + * @param entitiesPath + * @return the last collection date from the current scholexplorer Graph of the datacite records + */ + def retrieveLastCollectedFrom(spark:SparkSession, entitiesPath:String):Long = { + log.info("Retrieve last entities collected From") + + implicit val oafEncoder:Encoder[Result] = Encoders.kryo[Result] + import spark.implicits._ + + val entitiesDS = spark.read.load(s"$entitiesPath/*").as[Result] + + val date = entitiesDS.filter(r => r.getDateofcollection!= null).map(_.getDateofcollection).select(max("value")).first.getString(0) + + ISO8601toEpochMillis(date) + } + + + /** + * The method of update Datacite relationships on Scholexplorer + * needs some utilities data structures + * One is the scholixResource DS that stores all the nodes in the Scholix Graph + * in format (dnetID, ScholixResource ) + * @param summaryPath the path of the summary in Scholix + * @param workingPath the working path + * @param spark the spark session + */ + def generateScholixResource(summaryPath:String, workingPath: String, spark:SparkSession) :Unit = { + implicit val summaryEncoder:Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary] + implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] + + log.info("Convert All summary to ScholixResource") + spark.read.load(summaryPath).as[ScholixSummary] + .map(ScholixUtils.generateScholixResourceFromSummary) + .filter(r => r.getIdentifier!= null && r.getIdentifier.size>0) + .map(r=> (r.getIdentifier,r))(Encoders.tuple(Encoders.STRING, scholixResourceEncoder)) + .write.mode(SaveMode.Overwrite).save(scholixResourcePath(workingPath)) + + } + + + /** + * This method get and Transform only datacite records with + * timestamp greater than timestamp + * @param datacitePath the datacite input Path + * @param timestamp the timestamp + * @param workingPath the working path where save the generated Dataset + * @param spark SparkSession + * @param vocabularies Vocabularies needed for transformation + */ + + def getDataciteUpdate(datacitePath:String, timestamp:Long, workingPath:String, spark:SparkSession,vocabularies: VocabularyGroup): Unit = { + import spark.implicits._ + val ds = spark.read.load(datacitePath).as[DataciteType] + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] + ds.filter(_.timestamp>=timestamp) + .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks = true)) + .flatMap(i => fixRelations(i)).filter(i => i != null) + .write.mode(SaveMode.Overwrite).save(dataciteOAFPath(workingPath)) + + } + + + /** + * This method convert an Instance of OAF Result into + * Scholix Resource + * @param r The input Result + * @return The Scholix Resource + */ + def resultToScholixResource(r:Result):ScholixResource = { + ScholixUtils.generateScholixResourceFromSummary(ScholixUtils.resultToSummary(r)) + } + + + /** + * After added the new ScholixResource, we need to update the scholix Pid Map + * to intersected with the new Datacite Relations + + * @param workingPath The working Path starting from save the new Map + * @param spark the spark session + */ + def generatePidMap(workingPath:String, spark:SparkSession ) :Unit = { + implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] + import spark.implicits._ + spark.read.load(scholixResourcePath(workingPath)).as[(String,ScholixResource)] + .flatMap(r=> + r._2.getIdentifier.asScala + .map(i =>DHPUtils.generateUnresolvedIdentifier(i.getIdentifier, i.getSchema)) + .map((_, r._1)) + )(Encoders.tuple(Encoders.STRING, Encoders.STRING)) + .groupByKey(_._1) + .reduceGroups((a,b) => if (a!= null && a._2!= null) a else b) + .map(_._2)(Encoders.tuple(Encoders.STRING, Encoders.STRING)) + .write.mode(SaveMode.Overwrite).save(pidMapPath(workingPath)) + } + + /** + * This method convert the new Datacite Resource into Scholix Resource + * Needed to fill the source and the type of Scholix Relationships + * @param workingPath the Working Path + * @param spark The spark Session + */ + def addMissingScholixResource(workingPath:String, spark:SparkSession ) :Unit = { + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] + implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result] + + spark.read.load(dataciteOAFPath(workingPath)).as[Oaf] + .filter(_.isInstanceOf[Result]) + .map(_.asInstanceOf[Result]) + .map(resultToScholixResource) + .filter(r => r.getIdentifier!= null && r.getIdentifier.size>0) + .map(r=> (r.getIdentifier,r))(Encoders.tuple(Encoders.STRING, scholixResourceEncoder)) + .write.mode(SaveMode.Append).save(scholixResourcePath(workingPath)) + } + + + /** + * This method resolve the datacite relation and filter the resolved + * relation + * @param workingPath the working path + * @param spark the spark session + */ + + def resolveUpdateRelation(workingPath:String, spark:SparkSession) :Unit = { + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation] + import spark.implicits._ + + val pidMap = spark.read.load(pidMapPath(workingPath)).as[(String,String)] + + val unresolvedRelations:Dataset[(String,Relation)] = spark.read.load(dataciteOAFPath(workingPath)).as[Oaf] + .filter(_.isInstanceOf[Relation]) + .map(_.asInstanceOf[Relation]) + .map { r => + if (r.getSource.startsWith("unresolved")) + (r.getSource, r) + else + (r.getTarget, r) + }(Encoders.tuple(Encoders.STRING, relationEncoder)) + + unresolvedRelations + .joinWith(pidMap, unresolvedRelations("_1").equalTo(pidMap("_1"))) + .map(t => { + val r =t._1._2 + val resolvedIdentifier = t._2._2 + if (r.getSource.startsWith("unresolved")) + r.setSource(resolvedIdentifier) + else + r.setTarget(resolvedIdentifier) + r + })(relationEncoder) + .filter(r => !(r.getSource.startsWith("unresolved") || r.getTarget.startsWith("unresolved") )) + .write.mode(SaveMode.Overwrite) + .save(resolvedRelationPath(workingPath)) + } + + + + /** + * This method generate scholix starting from resolved relation + * + * + * @param workingPath + * @param spark + */ + def generateScholixUpdate(workingPath:String, spark:SparkSession) :Unit = { + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val scholixEncoder:Encoder[Scholix] = Encoders.kryo[Scholix] + implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] + implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation] + import spark.implicits._ + + val relationss:Dataset[(String, Relation)] = spark.read.load(s"$workingPath/ResolvedRelation").as[Relation].map(r =>(r.getSource,r))(Encoders.tuple(Encoders.STRING, relationEncoder)) + + val id_summary:Dataset[(String,ScholixResource)] = spark.read.load(scholixResourcePath(workingPath)).as[(String,ScholixResource)] + + relationss.joinWith(id_summary, relationss("_1").equalTo(id_summary("_1")),"inner") + .map(t => (t._1._2.getTarget,ScholixUtils.scholixFromSource(t._1._2, t._2._2)))(Encoders.tuple(Encoders.STRING, scholixEncoder)) + .write.mode(SaveMode.Overwrite) + .save(s"$workingPath/scholix_one_verse") + + val source_scholix:Dataset[(String,Scholix)] = spark.read.load(s"$workingPath/scholix_one_verse").as[(String,Scholix)] + + source_scholix.joinWith(id_summary, source_scholix("_1").equalTo(id_summary("_1")),"inner") + .map(t => { + val target = t._2._2 + val scholix = t._1._2 + scholix.setTarget(target) + scholix + })(scholixEncoder).write.mode(SaveMode.Overwrite).save(s"$workingPath/scholix") + } + + + + + + /** + * Here all the spark applications runs this method + * where the whole logic of the spark node is defined + */ + override def run(): Unit = { + val sourcePath = parser.get("sourcePath") + log.info(s"SourcePath is '$sourcePath'") + + val datacitePath = parser.get("datacitePath") + log.info(s"DatacitePath is '$datacitePath'") + + val workingPath = parser.get("workingPath") + log.info(s"workingPath is '$workingPath'") + + val isLookupUrl: String = parser.get("isLookupUrl") + log.info("isLookupUrl: {}", isLookupUrl) + + val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl) + val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService) + require(vocabularies != null) + + + val updateDS:Boolean = "true".equalsIgnoreCase(parser.get("updateDS")) + log.info(s"updateDS is '$updateDS'") + + var lastCollectionDate = 0L + if (updateDS) { + generateScholixResource(s"$sourcePath/provision/summaries", workingPath, spark) + log.info("Retrieve last entities collected From starting from scholix Graph") + lastCollectionDate = retrieveLastCollectedFrom(spark, s"$sourcePath/entities") + } + + getDataciteUpdate(datacitePath, lastCollectionDate, workingPath, spark, vocabularies) + addMissingScholixResource(workingPath,spark) + generatePidMap(workingPath, spark) + resolveUpdateRelation(workingPath,spark) + generateScholixUpdate(workingPath, spark) + + + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala deleted file mode 100644 index 8db271941..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala +++ /dev/null @@ -1,54 +0,0 @@ -package eu.dnetlib.dhp.sx.graph - -import eu.dnetlib.dhp.application.AbstractScalaApplication -import eu.dnetlib.dhp.schema.oaf.Result -import org.apache.spark.sql.functions.max -import org.apache.spark.sql.{Encoder, Encoders, SparkSession} -import org.slf4j.Logger - -class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:Logger) extends AbstractScalaApplication(propertyPath, args, log:Logger) { - - - def retrieveLastCollectedFrom(spark:SparkSession, entitiesPath:String):String = { - log.info("Retrieve last entities collected From") - - implicit val oafEncoder:Encoder[Result] = Encoders.kryo[Result] - import spark.implicits._ - - val entitiesDS = spark.read.load(s"$entitiesPath/*").as[Result] - - entitiesDS.filter(r => r.getDateofcollection!= null).map(_.getDateofcollection).select(max("value")).first.getString(0) - - - - } - - - /** - * Here all the spark applications runs this method - * where the whole logic of the spark node is defined - */ - override def run(): Unit = { - val sourcePath = parser.get("sourcePath") - log.info(s"SourcePath is '$sourcePath'") - - val datacitePath = parser.get("datacitePath") - log.info(s"DatacitePath is '$datacitePath'") - - - log.info("Retrieve last entities collected From") - - implicit val oafEncoder:Encoder[Result] = Encoders.kryo[Result] - - val lastCollectionDate = retrieveLastCollectedFrom(spark, s"$sourcePath/entities") - - - - - - - - - - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/RetrieveDataciteDeltaTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/RetrieveDataciteDeltaTest.scala new file mode 100644 index 000000000..c277b0aa1 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/RetrieveDataciteDeltaTest.scala @@ -0,0 +1,26 @@ +package eu.dnetlib.dhp.sx.graph +import org.junit.jupiter.api.Test + +import java.text.SimpleDateFormat + + + +class RetrieveDataciteDeltaTest { + + @Test + def testParsingDate(): Unit = { + + + val inputDate = "2021-12-02T11:17:36+0000" + + val t = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").parse(inputDate).getTime + + + println(t) + + + + } + + +}