From e5bff64f2e34355a7e9ce2ba1c3b49b517e8211e Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 14 Dec 2021 09:56:55 +0100 Subject: [PATCH 1/4] [scholexplorer] - Minor fix on SparkConvertRDDtoDataset -first implementation of retrieve datacite dump --- .../sx/graph/SparkRetrieveDataciteDelta.scala | 54 +++++++++++++++++++ .../graph/finalGraph/oozie_app/workflow.xml | 2 +- .../sx/graph/SparkConvertRDDtoDataset.scala | 28 +++++----- 3 files changed, 69 insertions(+), 15 deletions(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala new file mode 100644 index 000000000..62f53e4ad --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala @@ -0,0 +1,54 @@ +package eu.dnetlib.dhp.sx.graph + +import eu.dnetlib.dhp.application.AbstractScalaApplication +import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} +import org.apache.spark.sql.functions.max +import org.slf4j.Logger + +class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:Logger) extends AbstractScalaApplication(propertyPath, args, log:Logger) { + + + def retrieveLastCollectedFrom(spark:SparkSession, entitiesPath:String):String = { + log.info("Retrieve last entities collected From") + + implicit val oafEncoder:Encoder[Result] = Encoders.kryo[Result] + import spark.implicits._ + + val entitiesDS = spark.read.load(s"$entitiesPath/*").as[Result] + + entitiesDS.filter(r => r.getDateofcollection!= null).map(_.getDateofcollection).select(max("value")).first.getString(0) + + + + } + + + /** + * Here all the spark applications runs this method + * where the whole logic of the spark node is defined + */ + override def run(): Unit = { + val sourcePath = parser.get("sourcePath") + log.info(s"SourcePath is '$sourcePath'") + + val datacitePath = parser.get("datacitePath") + log.info(s"DatacitePath is '$datacitePath'") + + + log.info("Retrieve last entities collected From") + + implicit val oafEncoder:Encoder[Result] = Encoders.kryo[Result] + + val lastCollectionDate = retrieveLastCollectedFrom(spark, s"$sourcePath/entities") + + + + + + + + + + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml index 17996c82c..85c0d486d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml @@ -79,7 +79,7 @@ --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=20000 + --conf spark.sql.shuffle.partitions=30000 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala 
b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala index 23f039c70..2115df1fd 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala @@ -2,12 +2,11 @@ package eu.dnetlib.dhp.sx.graph import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Software,Dataset => OafDataset} +import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset} import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} - object SparkConvertRDDtoDataset { def main(args: Array[String]): Unit = { @@ -32,39 +31,40 @@ object SparkConvertRDDtoDataset { val entityPath = s"$t/entities" val relPath = s"$t/relation" val mapper = new ObjectMapper() - implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset]) - implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication]) - implicit val relationEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) - implicit val orpEncoder: Encoder[OtherResearchProduct] = Encoders.kryo(classOf[OtherResearchProduct]) - implicit val softwareEncoder: Encoder[Software] = Encoders.kryo(classOf[Software]) + implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset]) + implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication]) + implicit val relationEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) + implicit val orpEncoder: Encoder[OtherResearchProduct] = Encoders.kryo(classOf[OtherResearchProduct]) + implicit val softwareEncoder: Encoder[Software] = Encoders.kryo(classOf[Software]) log.info("Converting dataset") - val rddDataset = spark.sparkContext.textFile(s"$sourcePath/dataset").map(s => mapper.readValue(s, classOf[OafDataset])) + val rddDataset =spark.sparkContext.textFile(s"$sourcePath/dataset").map(s => mapper.readValue(s, classOf[OafDataset])).filter(r=> r.getDataInfo!= null && r.getDataInfo.getDeletedbyinference == false) spark.createDataset(rddDataset).as[OafDataset].write.mode(SaveMode.Overwrite).save(s"$entityPath/dataset") log.info("Converting publication") - val rddPublication = spark.sparkContext.textFile(s"$sourcePath/publication").map(s => mapper.readValue(s, classOf[Publication])) + val rddPublication =spark.sparkContext.textFile(s"$sourcePath/publication").map(s => mapper.readValue(s, classOf[Publication])).filter(r=> r.getDataInfo!= null && r.getDataInfo.getDeletedbyinference == false) spark.createDataset(rddPublication).as[Publication].write.mode(SaveMode.Overwrite).save(s"$entityPath/publication") log.info("Converting software") - val rddSoftware = spark.sparkContext.textFile(s"$sourcePath/software").map(s => mapper.readValue(s, classOf[Software])) + val rddSoftware =spark.sparkContext.textFile(s"$sourcePath/software").map(s => mapper.readValue(s, classOf[Software])).filter(r=> r.getDataInfo!= null && r.getDataInfo.getDeletedbyinference == false) spark.createDataset(rddSoftware).as[Software].write.mode(SaveMode.Overwrite).save(s"$entityPath/software") log.info("Converting otherresearchproduct") - val 
rddOtherResearchProduct = spark.sparkContext.textFile(s"$sourcePath/otherresearchproduct").map(s => mapper.readValue(s, classOf[OtherResearchProduct])) + val rddOtherResearchProduct =spark.sparkContext.textFile(s"$sourcePath/otherresearchproduct").map(s => mapper.readValue(s, classOf[OtherResearchProduct])).filter(r=> r.getDataInfo!= null && r.getDataInfo.getDeletedbyinference == false) spark.createDataset(rddOtherResearchProduct).as[OtherResearchProduct].write.mode(SaveMode.Overwrite).save(s"$entityPath/otherresearchproduct") log.info("Converting Relation") - val relationSemanticFilter = List("cites", "iscitedby", "merges", "ismergedin") + val relationSemanticFilter = List("cites", "iscitedby","merges", "ismergedin", "HasAmongTopNSimilarDocuments","IsAmongTopNSimilarDocuments" ) - val rddRelation = spark.sparkContext.textFile(s"$sourcePath/relation") + val rddRelation =spark.sparkContext.textFile(s"$sourcePath/relation") .map(s => mapper.readValue(s, classOf[Relation])) - .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50")) + .filter(r=> r.getDataInfo!= null && r.getDataInfo.getDeletedbyinference == false) + .filter(r=> r.getSource.startsWith("50") && r.getTarget.startsWith("50")) .filter(r => !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass))) spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath") From 63952018c09dd5f1c43f5a4323b2d3fdb2a7ee17 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 14 Dec 2021 09:58:50 +0100 Subject: [PATCH 2/4] [scholexplorer] -moved SparkRetrieveDataciteDelta in scala folder --- .../eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename dhp-workflows/dhp-graph-mapper/src/main/{java => scala}/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala (96%) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala similarity index 96% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala rename to dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala index 62f53e4ad..8db271941 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala @@ -1,9 +1,9 @@ package eu.dnetlib.dhp.sx.graph import eu.dnetlib.dhp.application.AbstractScalaApplication -import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} -import org.apache.spark.sql.{Encoder, Encoders, SparkSession} +import eu.dnetlib.dhp.schema.oaf.Result import org.apache.spark.sql.functions.max +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.slf4j.Logger class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:Logger) extends AbstractScalaApplication(propertyPath, args, log:Logger) { From b881ee5ef8b0f12dfaa25e448e34b1f0e6853c54 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 15 Dec 2021 11:25:23 +0100 Subject: [PATCH 3/4] [scholexplorer] - implemented generation of scholix of delta update of datacite --- .../dhp/sx/graph/scholix/ScholixUtils.scala | 57 +++- .../sx/graph/SparkRetrieveDataciteDelta.scala | 291 ++++++++++++++++++ .../sx/graph/SparkRetrieveDataciteDelta.scala | 54 ---- 
.../sx/graph/RetrieveDataciteDeltaTest.scala | 26 ++ 4 files changed, 364 insertions(+), 64 deletions(-) rename {dhp-workflows/dhp-graph-mapper => dhp-common}/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala (88%) create mode 100644 dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/RetrieveDataciteDeltaTest.scala diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala similarity index 88% rename from dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala rename to dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala index bf81a26d4..f8010bbd5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala +++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala @@ -53,8 +53,6 @@ object ScholixUtils { else { summary.getDate.get(0) } - - } def inverseRelationShip(rel: ScholixRelationship): ScholixRelationship = { @@ -68,7 +66,6 @@ object ScholixUtils { override def zero: RelatedEntities = null override def reduce(b: RelatedEntities, a: (String, String, Long)): RelatedEntities = { - val id = a._1 val relatedDataset = if ("dataset".equalsIgnoreCase(a._2)) a._3 else 0 val relatedPublication = if ("publication".equalsIgnoreCase(a._2)) a._3 else 0 @@ -142,6 +139,14 @@ object ScholixUtils { } + def extractCollectedFrom(summary: ScholixResource): List[ScholixEntityId] = { + if (summary.getCollectedFrom != null && !summary.getCollectedFrom.isEmpty) { + val l: List[ScholixEntityId] = summary.getCollectedFrom.asScala.map { + d => new ScholixEntityId(d.getProvider.getName, d.getProvider.getIdentifiers) + }(collection.breakOut) + l + } else List() + } def extractCollectedFrom(summary: ScholixSummary): List[ScholixEntityId] = { if (summary.getDatasources != null && !summary.getDatasources.isEmpty) { @@ -160,7 +165,7 @@ object ScholixUtils { c => new ScholixEntityId(c.getValue, List(new ScholixIdentifier(c.getKey, DNET_IDENTIFIER_SCHEMA, null)).asJava) - }(collection breakOut) + }.toList l } else List() } @@ -222,6 +227,38 @@ object ScholixUtils { } + + def scholixFromSource(relation: Relation, source: ScholixResource):Scholix = { + if (relation == null || source == null) + return null + val s = new Scholix + var l: List[ScholixEntityId] = extractCollectedFrom(relation) + if (l.isEmpty) + l = extractCollectedFrom(source) + if (l.isEmpty) + return null + s.setLinkprovider(l.asJava) + var d = extractRelationDate(relation) + if (d == null) + d = source.getPublicationDate + + s.setPublicationDate(d) + + + if (source.getPublisher != null && !source.getPublisher.isEmpty) { + s.setPublisher(source.getPublisher) + } + + val semanticRelation = relations.getOrElse(relation.getRelClass.toLowerCase, null) + if (semanticRelation == null) + return null + s.setRelationship(new ScholixRelationship(semanticRelation.original, "datacite", semanticRelation.inverse)) + s.setSource(source) + + s + } + + def scholixFromSource(relation: Relation, source: ScholixSummary): Scholix = { if (relation == null || source == null) @@ -303,7 +340,7 @@ object ScholixUtils { 
s.setSubType(r.getInstance().get(0).getInstancetype.getClassname) if (r.getTitle != null && r.getTitle.asScala.nonEmpty) { - val titles: List[String] = r.getTitle.asScala.map(t => t.getValue)(collection breakOut) + val titles: List[String] = r.getTitle.asScala.map(t => t.getValue).toList if (titles.nonEmpty) s.setTitle(titles.asJava) else @@ -311,12 +348,12 @@ object ScholixUtils { } if (r.getAuthor != null && !r.getAuthor.isEmpty) { - val authors: List[String] = r.getAuthor.asScala.map(a => a.getFullname)(collection breakOut) - if (authors nonEmpty) + val authors: List[String] = r.getAuthor.asScala.map(a => a.getFullname).toList + if (authors.nonEmpty) s.setAuthor(authors.asJava) } if (r.getInstance() != null) { - val dt: List[String] = r.getInstance().asScala.filter(i => i.getDateofacceptance != null).map(i => i.getDateofacceptance.getValue)(collection.breakOut) + val dt: List[String] = r.getInstance().asScala.filter(i => i.getDateofacceptance != null).map(i => i.getDateofacceptance.getValue).toList if (dt.nonEmpty) s.setDate(dt.distinct.asJava) } @@ -327,7 +364,7 @@ object ScholixUtils { } if (r.getSubject != null && !r.getSubject.isEmpty) { - val subjects: List[SchemeValue] = r.getSubject.asScala.map(s => new SchemeValue(s.getQualifier.getClassname, s.getValue))(collection breakOut) + val subjects: List[SchemeValue] = r.getSubject.asScala.map(s => new SchemeValue(s.getQualifier.getClassname, s.getValue)).toList if (subjects.nonEmpty) s.setSubject(subjects.asJava) } @@ -336,7 +373,7 @@ object ScholixUtils { s.setPublisher(List(r.getPublisher.getValue).asJava) if (r.getCollectedfrom != null && !r.getCollectedfrom.isEmpty) { - val cf: List[CollectedFromType] = r.getCollectedfrom.asScala.map(c => new CollectedFromType(c.getValue, c.getKey, "complete"))(collection breakOut) + val cf: List[CollectedFromType] = r.getCollectedfrom.asScala.map(c => new CollectedFromType(c.getValue, c.getKey, "complete")).toList if (cf.nonEmpty) s.setDatasources(cf.distinct.asJava) } diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala new file mode 100644 index 000000000..7f37829ee --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala @@ -0,0 +1,291 @@ +package eu.dnetlib.dhp.sx.graph + +import eu.dnetlib.dhp.application.AbstractScalaApplication +import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup +import eu.dnetlib.dhp.datacite.{DataciteToOAFTransformation, DataciteType} +import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result} +import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource} +import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary +import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils +import eu.dnetlib.dhp.utils.{DHPUtils, ISLookupClientFactory} +import org.apache.spark.sql.functions.max +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.Logger + +import scala.collection.JavaConverters._ +import java.text.SimpleDateFormat + +class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:Logger) extends AbstractScalaApplication(propertyPath, args, log:Logger) { + + val ISO_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ssZ" + val simpleFormatter = new SimpleDateFormat(ISO_DATE_PATTERN) + + val SCHOLIX_RESOURCE_PATH_NAME = "scholixResource" + val 
DATACITE_OAF_PATH_NAME = "dataciteOAFUpdate" + val PID_MAP_PATH_NAME = "pidMap" + val RESOLVED_REL_PATH_NAME ="resolvedRelation" + val SCHOLIX_PATH_NAME = "scholix" + + + def scholixResourcePath(workingPath:String) = s"$workingPath/$SCHOLIX_RESOURCE_PATH_NAME" + def dataciteOAFPath(workingPath:String) = s"$workingPath/$DATACITE_OAF_PATH_NAME" + def pidMapPath(workingPath:String) = s"$workingPath/$PID_MAP_PATH_NAME" + def resolvedRelationPath(workingPath:String) = s"$workingPath/$RESOLVED_REL_PATH_NAME" + def scholixPath(workingPath:String) = s"$workingPath/$SCHOLIX_PATH_NAME" + + + /** + * Utility to parse Date in ISO8601 to epochMillis + * @param inputDate The String represents an input date in ISO8601 + * @return The relative epochMillis of parsed date + */ + def ISO8601toEpochMillis(inputDate:String):Long = { + simpleFormatter.parse(inputDate).getTime + } + + + /** + * This method tries to retrieve the last collection date from all datacite + * records in HDFS. + * This method should be called before indexing scholexplorer to retrieve + * the delta of Datacite record to download, since from the generation of + * raw graph to the generation of Scholexplorer sometimes it takes 20 days + * @param spark + * @param entitiesPath + * @return the last collection date from the current scholexplorer Graph of the datacite records + */ + def retrieveLastCollectedFrom(spark:SparkSession, entitiesPath:String):Long = { + log.info("Retrieve last entities collected From") + + implicit val oafEncoder:Encoder[Result] = Encoders.kryo[Result] + import spark.implicits._ + + val entitiesDS = spark.read.load(s"$entitiesPath/*").as[Result] + + val date = entitiesDS.filter(r => r.getDateofcollection!= null).map(_.getDateofcollection).select(max("value")).first.getString(0) + + ISO8601toEpochMillis(date) + } + + + /** + * The method of update Datacite relationships on Scholexplorer + * needs some utilities data structures + * One is the scholixResource DS that stores all the nodes in the Scholix Graph + * in format (dnetID, ScholixResource ) + * @param summaryPath the path of the summary in Scholix + * @param workingPath the working path + * @param spark the spark session + */ + def generateScholixResource(summaryPath:String, workingPath: String, spark:SparkSession) :Unit = { + implicit val summaryEncoder:Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary] + implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] + + log.info("Convert All summary to ScholixResource") + spark.read.load(summaryPath).as[ScholixSummary] + .map(ScholixUtils.generateScholixResourceFromSummary) + .filter(r => r.getIdentifier!= null && r.getIdentifier.size>0) + .map(r=> (r.getIdentifier,r))(Encoders.tuple(Encoders.STRING, scholixResourceEncoder)) + .write.mode(SaveMode.Overwrite).save(scholixResourcePath(workingPath)) + + } + + + /** + * This method get and Transform only datacite records with + * timestamp greater than timestamp + * @param datacitePath the datacite input Path + * @param timestamp the timestamp + * @param workingPath the working path where save the generated Dataset + * @param spark SparkSession + * @param vocabularies Vocabularies needed for transformation + */ + + def getDataciteUpdate(datacitePath:String, timestamp:Long, workingPath:String, spark:SparkSession,vocabularies: VocabularyGroup): Unit = { + import spark.implicits._ + val ds = spark.read.load(datacitePath).as[DataciteType] + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] + ds.filter(_.timestamp>=timestamp) 
+ .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks = true)) + .flatMap(i => fixRelations(i)).filter(i => i != null) + .write.mode(SaveMode.Overwrite).save(dataciteOAFPath(workingPath)) + + } + + + /** + * This method convert an Instance of OAF Result into + * Scholix Resource + * @param r The input Result + * @return The Scholix Resource + */ + def resultToScholixResource(r:Result):ScholixResource = { + ScholixUtils.generateScholixResourceFromSummary(ScholixUtils.resultToSummary(r)) + } + + + /** + * After added the new ScholixResource, we need to update the scholix Pid Map + * to intersected with the new Datacite Relations + + * @param workingPath The working Path starting from save the new Map + * @param spark the spark session + */ + def generatePidMap(workingPath:String, spark:SparkSession ) :Unit = { + implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] + import spark.implicits._ + spark.read.load(scholixResourcePath(workingPath)).as[(String,ScholixResource)] + .flatMap(r=> + r._2.getIdentifier.asScala + .map(i =>DHPUtils.generateUnresolvedIdentifier(i.getIdentifier, i.getSchema)) + .map((_, r._1)) + )(Encoders.tuple(Encoders.STRING, Encoders.STRING)) + .groupByKey(_._1) + .reduceGroups((a,b) => if (a!= null && a._2!= null) a else b) + .map(_._2)(Encoders.tuple(Encoders.STRING, Encoders.STRING)) + .write.mode(SaveMode.Overwrite).save(pidMapPath(workingPath)) + } + + /** + * This method convert the new Datacite Resource into Scholix Resource + * Needed to fill the source and the type of Scholix Relationships + * @param workingPath the Working Path + * @param spark The spark Session + */ + def addMissingScholixResource(workingPath:String, spark:SparkSession ) :Unit = { + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] + implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result] + + spark.read.load(dataciteOAFPath(workingPath)).as[Oaf] + .filter(_.isInstanceOf[Result]) + .map(_.asInstanceOf[Result]) + .map(resultToScholixResource) + .filter(r => r.getIdentifier!= null && r.getIdentifier.size>0) + .map(r=> (r.getIdentifier,r))(Encoders.tuple(Encoders.STRING, scholixResourceEncoder)) + .write.mode(SaveMode.Append).save(scholixResourcePath(workingPath)) + } + + + /** + * This method resolve the datacite relation and filter the resolved + * relation + * @param workingPath the working path + * @param spark the spark session + */ + + def resolveUpdateRelation(workingPath:String, spark:SparkSession) :Unit = { + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation] + import spark.implicits._ + + val pidMap = spark.read.load(pidMapPath(workingPath)).as[(String,String)] + + val unresolvedRelations:Dataset[(String,Relation)] = spark.read.load(dataciteOAFPath(workingPath)).as[Oaf] + .filter(_.isInstanceOf[Relation]) + .map(_.asInstanceOf[Relation]) + .map { r => + if (r.getSource.startsWith("unresolved")) + (r.getSource, r) + else + (r.getTarget, r) + }(Encoders.tuple(Encoders.STRING, relationEncoder)) + + unresolvedRelations + .joinWith(pidMap, unresolvedRelations("_1").equalTo(pidMap("_1"))) + .map(t => { + val r =t._1._2 + val resolvedIdentifier = t._2._2 + if (r.getSource.startsWith("unresolved")) + r.setSource(resolvedIdentifier) + else + r.setTarget(resolvedIdentifier) + r + })(relationEncoder) + 
.filter(r => !(r.getSource.startsWith("unresolved") || r.getTarget.startsWith("unresolved") )) + .write.mode(SaveMode.Overwrite) + .save(resolvedRelationPath(workingPath)) + } + + + + /** + * This method generate scholix starting from resolved relation + * + * + * @param workingPath + * @param spark + */ + def generateScholixUpdate(workingPath:String, spark:SparkSession) :Unit = { + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val scholixEncoder:Encoder[Scholix] = Encoders.kryo[Scholix] + implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] + implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation] + import spark.implicits._ + + val relationss:Dataset[(String, Relation)] = spark.read.load(s"$workingPath/ResolvedRelation").as[Relation].map(r =>(r.getSource,r))(Encoders.tuple(Encoders.STRING, relationEncoder)) + + val id_summary:Dataset[(String,ScholixResource)] = spark.read.load(scholixResourcePath(workingPath)).as[(String,ScholixResource)] + + relationss.joinWith(id_summary, relationss("_1").equalTo(id_summary("_1")),"inner") + .map(t => (t._1._2.getTarget,ScholixUtils.scholixFromSource(t._1._2, t._2._2)))(Encoders.tuple(Encoders.STRING, scholixEncoder)) + .write.mode(SaveMode.Overwrite) + .save(s"$workingPath/scholix_one_verse") + + val source_scholix:Dataset[(String,Scholix)] = spark.read.load(s"$workingPath/scholix_one_verse").as[(String,Scholix)] + + source_scholix.joinWith(id_summary, source_scholix("_1").equalTo(id_summary("_1")),"inner") + .map(t => { + val target = t._2._2 + val scholix = t._1._2 + scholix.setTarget(target) + scholix + })(scholixEncoder).write.mode(SaveMode.Overwrite).save(s"$workingPath/scholix") + } + + + + + + /** + * Here all the spark applications runs this method + * where the whole logic of the spark node is defined + */ + override def run(): Unit = { + val sourcePath = parser.get("sourcePath") + log.info(s"SourcePath is '$sourcePath'") + + val datacitePath = parser.get("datacitePath") + log.info(s"DatacitePath is '$datacitePath'") + + val workingPath = parser.get("workingPath") + log.info(s"workingPath is '$workingPath'") + + val isLookupUrl: String = parser.get("isLookupUrl") + log.info("isLookupUrl: {}", isLookupUrl) + + val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl) + val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService) + require(vocabularies != null) + + + val updateDS:Boolean = "true".equalsIgnoreCase(parser.get("updateDS")) + log.info(s"updateDS is '$updateDS'") + + var lastCollectionDate = 0L + if (updateDS) { + generateScholixResource(s"$sourcePath/provision/summaries", workingPath, spark) + log.info("Retrieve last entities collected From starting from scholix Graph") + lastCollectionDate = retrieveLastCollectedFrom(spark, s"$sourcePath/entities") + } + + getDataciteUpdate(datacitePath, lastCollectionDate, workingPath, spark, vocabularies) + addMissingScholixResource(workingPath,spark) + generatePidMap(workingPath, spark) + resolveUpdateRelation(workingPath,spark) + generateScholixUpdate(workingPath, spark) + + + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala deleted file mode 100644 index 8db271941..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala +++ /dev/null @@ -1,54 +0,0 @@ 
-package eu.dnetlib.dhp.sx.graph - -import eu.dnetlib.dhp.application.AbstractScalaApplication -import eu.dnetlib.dhp.schema.oaf.Result -import org.apache.spark.sql.functions.max -import org.apache.spark.sql.{Encoder, Encoders, SparkSession} -import org.slf4j.Logger - -class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:Logger) extends AbstractScalaApplication(propertyPath, args, log:Logger) { - - - def retrieveLastCollectedFrom(spark:SparkSession, entitiesPath:String):String = { - log.info("Retrieve last entities collected From") - - implicit val oafEncoder:Encoder[Result] = Encoders.kryo[Result] - import spark.implicits._ - - val entitiesDS = spark.read.load(s"$entitiesPath/*").as[Result] - - entitiesDS.filter(r => r.getDateofcollection!= null).map(_.getDateofcollection).select(max("value")).first.getString(0) - - - - } - - - /** - * Here all the spark applications runs this method - * where the whole logic of the spark node is defined - */ - override def run(): Unit = { - val sourcePath = parser.get("sourcePath") - log.info(s"SourcePath is '$sourcePath'") - - val datacitePath = parser.get("datacitePath") - log.info(s"DatacitePath is '$datacitePath'") - - - log.info("Retrieve last entities collected From") - - implicit val oafEncoder:Encoder[Result] = Encoders.kryo[Result] - - val lastCollectionDate = retrieveLastCollectedFrom(spark, s"$sourcePath/entities") - - - - - - - - - - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/RetrieveDataciteDeltaTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/RetrieveDataciteDeltaTest.scala new file mode 100644 index 000000000..c277b0aa1 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/RetrieveDataciteDeltaTest.scala @@ -0,0 +1,26 @@ +package eu.dnetlib.dhp.sx.graph +import org.junit.jupiter.api.Test + +import java.text.SimpleDateFormat + + + +class RetrieveDataciteDeltaTest { + + @Test + def testParsingDate(): Unit = { + + + val inputDate = "2021-12-02T11:17:36+0000" + + val t = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").parse(inputDate).getTime + + + println(t) + + + + } + + +} From 3920d68992b1d7f8a67afec5cf8dd3b03037cd29 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 21 Dec 2021 11:41:49 +0100 Subject: [PATCH 4/4] Fixed workflow generation of delta in datacite --- .../dhp/sx/graph/scholix/ScholixUtils.scala | 22 ++- .../datacite/oozie_app/config-default.xml | 19 +++ .../sx/graph/datacite/oozie_app/workflow.xml | 62 +++++++ .../graph/retrieve_datacite_delta_params.json | 41 +++++ .../sx/graph/SparkRetrieveDataciteDelta.scala | 154 +++++++++-------- .../PrepareInfo.java | 21 +-- .../SparkResultToOrganizationFromSemRel.java | 5 - .../StepActions.java | 6 +- .../PrepareInfoJobTest.java | 43 ++--- .../sx/graph/convert_object_json_params.json | 9 +- .../graph/finalGraph/oozie_app/workflow.xml | 62 ------- .../eu/dnetlib/dhp/sx/graph/relations.json | 158 ------------------ .../oozie_app/config-default.xml | 10 ++ .../serializeGraph/oozie_app/workflow.xml | 83 +++++++++ .../sx/graph/SparkConvertObjectToJson.scala | 6 +- 15 files changed, 364 insertions(+), 337 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/workflow.xml create mode 100644 
dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/relations.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/workflow.xml diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala index f8010bbd5..f35af0905 100644 --- a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala +++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala @@ -12,7 +12,7 @@ import org.json4s.jackson.JsonMethods.parse import scala.collection.JavaConverters._ import scala.io.Source -object ScholixUtils { +object ScholixUtils extends Serializable { val DNET_IDENTIFIER_SCHEMA: String = "DNET Identifier" @@ -24,7 +24,7 @@ object ScholixUtils { case class RelatedEntities(id: String, relatedDataset: Long, relatedPublication: Long) {} val relations: Map[String, RelationVocabulary] = { - val input = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/relations.json")).mkString + val input = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/scholexplorer/relation/relations.json")).mkString implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json: json4s.JValue = parse(input) @@ -62,6 +62,11 @@ object ScholixUtils { } + def generateScholixResourceFromResult(r:Result) :ScholixResource = { + generateScholixResourceFromSummary(ScholixUtils.resultToSummary(r)) + } + + val statsAggregator: Aggregator[(String, String, Long), RelatedEntities, RelatedEntities] = new Aggregator[(String, String, Long), RelatedEntities, RelatedEntities] with Serializable { override def zero: RelatedEntities = null @@ -184,6 +189,19 @@ object ScholixUtils { } + def generateCompleteScholix(scholix: Scholix, target: ScholixResource): Scholix = { + val s = new Scholix + s.setPublicationDate(scholix.getPublicationDate) + s.setPublisher(scholix.getPublisher) + s.setLinkprovider(scholix.getLinkprovider) + s.setRelationship(scholix.getRelationship) + s.setSource(scholix.getSource) + s.setTarget(target) + s.setIdentifier(DHPUtils.md5(s"${s.getSource.getIdentifier}::${s.getRelationship.getName}::${s.getTarget.getIdentifier}")) + s + } + + def generateScholixResourceFromSummary(summaryObject: ScholixSummary): ScholixResource = { val r = new ScholixResource r.setIdentifier(summaryObject.getLocalIdentifier) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/config-default.xml new file mode 100644 index 000000000..bdd48b0ab --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/config-default.xml @@ -0,0 +1,19 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/workflow.xml 
b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/workflow.xml new file mode 100644 index 000000000..751b124cf --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/workflow.xml @@ -0,0 +1,62 @@ + + + + sourcePath + the source path of scholix graph + + + datacitePath + the datacite native path + + + workingSupportPath + the working Support path + + + isLookupUrl + The IS lookUp service endopoint + + + updateDS + false + The transformation Rule to apply + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + New Update from Datacite to Scholix + eu.dnetlib.dhp.sx.graph.SparkRetrieveDataciteDelta + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=6000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${sourcePath} + --datacitePath${datacitePath} + --masteryarn + --workingSupportPath${workingSupportPath} + --isLookupUrl${isLookupUrl} + --updateDS${updateDS} + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json new file mode 100644 index 000000000..78777ffff --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json @@ -0,0 +1,41 @@ +[ + { + "paramName": "s", + "paramLongName": "sourcePath", + "paramDescription": "the source mdstore path", + "paramRequired": true + }, + + { + "paramName": "d", + "paramLongName": "datacitePath", + "paramDescription": "the datacite native path", + "paramRequired": true + }, + + { + "paramName": "w", + "paramLongName": "workingSupportPath", + "paramDescription": "the working Support path", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "isLookupUrl", + "paramDescription": "the isLookup URL", + "paramRequired": true + }, + { + "paramName": "m", + "paramLongName": "master", + "paramDescription": "the master name", + "paramRequired": true + }, + { + "paramName": "u", + "paramLongName": "updateDS", + "paramDescription": "Need to regenerate all support Dataset", + "paramRequired": false + } + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala index 7f37829ee..45a6cfc89 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala @@ -9,9 +9,10 @@ import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource} import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils import eu.dnetlib.dhp.utils.{DHPUtils, ISLookupClientFactory} +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.functions.max 
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} -import org.slf4j.Logger +import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ import java.text.SimpleDateFormat @@ -58,14 +59,15 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L def retrieveLastCollectedFrom(spark:SparkSession, entitiesPath:String):Long = { log.info("Retrieve last entities collected From") - implicit val oafEncoder:Encoder[Result] = Encoders.kryo[Result] + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result] import spark.implicits._ - val entitiesDS = spark.read.load(s"$entitiesPath/*").as[Result] + val entitiesDS = spark.read.load(s"$entitiesPath/*").as[Oaf].filter(o =>o.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]) val date = entitiesDS.filter(r => r.getDateofcollection!= null).map(_.getDateofcollection).select(max("value")).first.getString(0) - ISO8601toEpochMillis(date) + ISO8601toEpochMillis(date) / 1000 } @@ -73,7 +75,7 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L * The method of update Datacite relationships on Scholexplorer * needs some utilities data structures * One is the scholixResource DS that stores all the nodes in the Scholix Graph - * in format (dnetID, ScholixResource ) + * in format ScholixResource * @param summaryPath the path of the summary in Scholix * @param workingPath the working path * @param spark the spark session @@ -84,11 +86,37 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L log.info("Convert All summary to ScholixResource") spark.read.load(summaryPath).as[ScholixSummary] - .map(ScholixUtils.generateScholixResourceFromSummary) + .map(ScholixUtils.generateScholixResourceFromSummary)(scholixResourceEncoder) .filter(r => r.getIdentifier!= null && r.getIdentifier.size>0) - .map(r=> (r.getIdentifier,r))(Encoders.tuple(Encoders.STRING, scholixResourceEncoder)) - .write.mode(SaveMode.Overwrite).save(scholixResourcePath(workingPath)) + .write.mode(SaveMode.Overwrite).save(s"${scholixResourcePath(workingPath)}_native") + } + /** + * This method convert the new Datacite Resource into Scholix Resource + * Needed to fill the source and the type of Scholix Relationships + * @param workingPath the Working Path + * @param spark The spark Session + */ + def addMissingScholixResource(workingPath:String, spark:SparkSession ) :Unit = { + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] + implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result] + import spark.implicits._ + + spark.read.load(dataciteOAFPath(workingPath)).as[Oaf] + .filter(_.isInstanceOf[Result]) + .map(_.asInstanceOf[Result]) + .map(ScholixUtils.generateScholixResourceFromResult) + .filter(r => r.getIdentifier!= null && r.getIdentifier.size>0) + .write.mode(SaveMode.Overwrite).save(s"${scholixResourcePath(workingPath)}_update") + + val update = spark.read.load(s"${scholixResourcePath(workingPath)}_update").as[ScholixResource] + val native = spark.read.load(s"${scholixResourcePath(workingPath)}_native").as[ScholixResource] + val graph = update.union(native) + .groupByKey(_.getDnetIdentifier) + .reduceGroups((a,b) => if (a!= null && a.getDnetIdentifier!= null) a else b) + .map(_._2) + graph.write.mode(SaveMode.Overwrite).save(s"${scholixResourcePath(workingPath)}_graph") } @@ -102,29 
+130,20 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L * @param vocabularies Vocabularies needed for transformation */ - def getDataciteUpdate(datacitePath:String, timestamp:Long, workingPath:String, spark:SparkSession,vocabularies: VocabularyGroup): Unit = { + def getDataciteUpdate(datacitePath:String, timestamp:Long, workingPath:String, spark:SparkSession,vocabularies: VocabularyGroup): Long = { import spark.implicits._ val ds = spark.read.load(datacitePath).as[DataciteType] implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] - ds.filter(_.timestamp>=timestamp) - .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks = true)) - .flatMap(i => fixRelations(i)).filter(i => i != null) - .write.mode(SaveMode.Overwrite).save(dataciteOAFPath(workingPath)) - + val total = ds.filter(_.timestamp>=timestamp).count() + if (total >0) { + ds.filter(_.timestamp >= timestamp) + .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks = true)) + .flatMap(i => fixRelations(i)).filter(i => i != null) + .write.mode(SaveMode.Overwrite).save(dataciteOAFPath(workingPath)) + } + total } - - /** - * This method convert an Instance of OAF Result into - * Scholix Resource - * @param r The input Result - * @return The Scholix Resource - */ - def resultToScholixResource(r:Result):ScholixResource = { - ScholixUtils.generateScholixResourceFromSummary(ScholixUtils.resultToSummary(r)) - } - - /** * After added the new ScholixResource, we need to update the scholix Pid Map * to intersected with the new Datacite Relations @@ -135,11 +154,11 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L def generatePidMap(workingPath:String, spark:SparkSession ) :Unit = { implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] import spark.implicits._ - spark.read.load(scholixResourcePath(workingPath)).as[(String,ScholixResource)] + spark.read.load(s"${scholixResourcePath(workingPath)}_graph").as[ScholixResource] .flatMap(r=> - r._2.getIdentifier.asScala + r.getIdentifier.asScala .map(i =>DHPUtils.generateUnresolvedIdentifier(i.getIdentifier, i.getSchema)) - .map((_, r._1)) + .map(t =>(t, r.getDnetIdentifier)) )(Encoders.tuple(Encoders.STRING, Encoders.STRING)) .groupByKey(_._1) .reduceGroups((a,b) => if (a!= null && a._2!= null) a else b) @@ -147,27 +166,6 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L .write.mode(SaveMode.Overwrite).save(pidMapPath(workingPath)) } - /** - * This method convert the new Datacite Resource into Scholix Resource - * Needed to fill the source and the type of Scholix Relationships - * @param workingPath the Working Path - * @param spark The spark Session - */ - def addMissingScholixResource(workingPath:String, spark:SparkSession ) :Unit = { - implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] - implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] - implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result] - - spark.read.load(dataciteOAFPath(workingPath)).as[Oaf] - .filter(_.isInstanceOf[Result]) - .map(_.asInstanceOf[Result]) - .map(resultToScholixResource) - .filter(r => r.getIdentifier!= null && r.getIdentifier.size>0) - .map(r=> (r.getIdentifier,r))(Encoders.tuple(Encoders.STRING, scholixResourceEncoder)) - .write.mode(SaveMode.Append).save(scholixResourcePath(workingPath)) - } - - /** 
* This method resolve the datacite relation and filter the resolved * relation @@ -222,25 +220,26 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L implicit val scholixEncoder:Encoder[Scholix] = Encoders.kryo[Scholix] implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation] - import spark.implicits._ + implicit val intermediateEncoder :Encoder[(String,Scholix)] = Encoders.tuple(Encoders.STRING, scholixEncoder) - val relationss:Dataset[(String, Relation)] = spark.read.load(s"$workingPath/ResolvedRelation").as[Relation].map(r =>(r.getSource,r))(Encoders.tuple(Encoders.STRING, relationEncoder)) - val id_summary:Dataset[(String,ScholixResource)] = spark.read.load(scholixResourcePath(workingPath)).as[(String,ScholixResource)] + val relations:Dataset[(String, Relation)] = spark.read.load(resolvedRelationPath(workingPath)).as[Relation].map(r =>(r.getSource,r))(Encoders.tuple(Encoders.STRING, relationEncoder)) - relationss.joinWith(id_summary, relationss("_1").equalTo(id_summary("_1")),"inner") - .map(t => (t._1._2.getTarget,ScholixUtils.scholixFromSource(t._1._2, t._2._2)))(Encoders.tuple(Encoders.STRING, scholixEncoder)) - .write.mode(SaveMode.Overwrite) - .save(s"$workingPath/scholix_one_verse") + val id_summary:Dataset[(String,ScholixResource)] = spark.read.load(s"${scholixResourcePath(workingPath)}_graph").as[ScholixResource].map(r => (r.getDnetIdentifier,r))(Encoders.tuple(Encoders.STRING, scholixResourceEncoder)) - val source_scholix:Dataset[(String,Scholix)] = spark.read.load(s"$workingPath/scholix_one_verse").as[(String,Scholix)] + id_summary.cache() + + relations.joinWith(id_summary, relations("_1").equalTo(id_summary("_1")),"inner") + .map(t => (t._1._2.getTarget,ScholixUtils.scholixFromSource(t._1._2, t._2._2))) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/scholix_one_verse") + + val source_scholix:Dataset[(String, Scholix)] =spark.read.load(s"$workingPath/scholix_one_verse").as[(String,Scholix)] source_scholix.joinWith(id_summary, source_scholix("_1").equalTo(id_summary("_1")),"inner") .map(t => { - val target = t._2._2 - val scholix = t._1._2 - scholix.setTarget(target) - scholix + val target:ScholixResource =t._2._2 + val scholix:Scholix = t._1._2 + ScholixUtils.generateCompleteScholix(scholix,target) })(scholixEncoder).write.mode(SaveMode.Overwrite).save(s"$workingPath/scholix") } @@ -259,7 +258,7 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L val datacitePath = parser.get("datacitePath") log.info(s"DatacitePath is '$datacitePath'") - val workingPath = parser.get("workingPath") + val workingPath = parser.get("workingSupportPath") log.info(s"workingPath is '$workingPath'") val isLookupUrl: String = parser.get("isLookupUrl") @@ -279,13 +278,28 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L log.info("Retrieve last entities collected From starting from scholix Graph") lastCollectionDate = retrieveLastCollectedFrom(spark, s"$sourcePath/entities") } + else { + val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) + fs.delete(new Path(s"${scholixResourcePath(workingPath)}_native"), true) + fs.rename(new Path(s"${scholixResourcePath(workingPath)}_graph"), new Path(s"${scholixResourcePath(workingPath)}_native")) + lastCollectionDate = retrieveLastCollectedFrom(spark, dataciteOAFPath(workingPath)) + } - getDataciteUpdate(datacitePath, lastCollectionDate, 
workingPath, spark, vocabularies) - addMissingScholixResource(workingPath,spark) - generatePidMap(workingPath, spark) - resolveUpdateRelation(workingPath,spark) - generateScholixUpdate(workingPath, spark) - - + val numRecords = getDataciteUpdate(datacitePath, lastCollectionDate, workingPath, spark, vocabularies) + if (numRecords>0) { + addMissingScholixResource(workingPath,spark) + generatePidMap(workingPath, spark) + resolveUpdateRelation(workingPath,spark) + generateScholixUpdate(workingPath, spark) + } + } +} + + +object SparkRetrieveDataciteDelta { + val log: Logger = LoggerFactory.getLogger(SparkRetrieveDataciteDelta.getClass) + + def main(args: Array[String]): Unit = { + new SparkRetrieveDataciteDelta("/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json", args, log).initialize().run() } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java index 707462f24..23909fd9a 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java @@ -33,13 +33,14 @@ public class PrepareInfo implements Serializable { private static final Logger log = LoggerFactory.getLogger(PrepareInfo.class); // associate orgs with all their parent - private static final String ORGANIZATION_ORGANIZATION_QUERY = "SELECT target key, collect_set(source) as valueSet " + + private static final String ORGANIZATION_ORGANIZATION_QUERY = "SELECT target key, collect_set(source) as valueSet " + + "FROM relation " + "WHERE lower(relclass) = '" + ModelConstants.IS_PARENT_OF.toLowerCase() + "' and datainfo.deletedbyinference = false " + "GROUP BY target"; - //associates results with all the orgs they are affiliated to + // associates results with all the orgs they are affiliated to private static final String RESULT_ORGANIZATION_QUERY = "SELECT source key, collect_set(target) as valueSet " + "FROM relation " + "WHERE lower(relclass) = '" + ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase() + @@ -88,7 +89,7 @@ public class PrepareInfo implements Serializable { childParentPath, leavesPath, resultOrganizationPath, - relationPath)); + relationPath)); } private static void prepareInfo(SparkSession spark, String inputPath, String childParentOrganizationPath, @@ -113,13 +114,13 @@ public class PrepareInfo implements Serializable { .json(resultOrganizationPath); relation - .filter( - (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && - r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION)) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(relationPath); + .filter( + (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && + r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(relationPath); Dataset children = spark .sql( diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java index e26276f53..9ceebf222 100644 --- 
a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java @@ -2,7 +2,6 @@ package eu.dnetlib.dhp.resulttoorganizationfromsemrel; import static eu.dnetlib.dhp.PropagationConstant.*; - import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import java.io.Serializable; @@ -22,13 +21,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.KeyValueSet; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Relation; - public class SparkResultToOrganizationFromSemRel implements Serializable { private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromSemRel.class); private static final int MAX_ITERATION = 5; @@ -201,6 +198,4 @@ public class SparkResultToOrganizationFromSemRel implements Serializable { .json(outputPath); } - - } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java index 02444cb15..1adbbe60e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java @@ -75,8 +75,8 @@ public class StepActions implements Serializable { ret.setValueSet(orgs); return ret; }, Encoders.bean(KeyValueSet.class)) - .write() - .mode(SaveMode.Overwrite) + .write() + .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath); } @@ -179,7 +179,6 @@ public class StepActions implements Serializable { "GROUP BY resId") .as(Encoders.bean(KeyValueSet.class)); - // create new relations from result to organization for each result linked to a leaf return resultParent .flatMap( @@ -200,7 +199,6 @@ public class StepActions implements Serializable { .iterator(), Encoders.bean(Relation.class)); - } } diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java index 21d99321b..2d2668db3 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java @@ -84,7 +84,7 @@ public class PrepareInfoJobTest { "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", - "-relationPath", workingDir.toString() + "/relation" + "-relationPath", workingDir.toString() + "/relation" }); @@ -229,7 +229,7 @@ public class PrepareInfoJobTest { "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", - "-relationPath", workingDir.toString() + "/relation" + "-relationPath", workingDir.toString() + 
"/relation" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -335,34 +335,35 @@ public class PrepareInfoJobTest { } @Test - public void relationTest()throws Exception { + public void relationTest() throws Exception { PrepareInfo - .main( - new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-graphPath", getClass() - .getResource( - "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest") - .getPath(), - "-hive_metastore_uris", "", - "-leavesPath", workingDir.toString() + "/currentIteration/", - "-resultOrgPath", workingDir.toString() + "/resultOrganization/", - "-childParentPath", workingDir.toString() + "/childParentOrg/", - "-relationPath", workingDir.toString() + "/relation" + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-graphPath", getClass() + .getResource( + "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest") + .getPath(), + "-hive_metastore_uris", "", + "-leavesPath", workingDir.toString() + "/currentIteration/", + "-resultOrgPath", workingDir.toString() + "/resultOrganization/", + "-childParentPath", workingDir.toString() + "/childParentOrg/", + "-relationPath", workingDir.toString() + "/relation" - }); + }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/relation") - .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); + .textFile(workingDir.toString() + "/relation") + .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); Dataset verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); Assertions.assertEquals(7, verificationDs.count()); } + @Test public void resultOrganizationTest1() throws Exception { @@ -378,7 +379,7 @@ public class PrepareInfoJobTest { "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", - "-relationPath", workingDir.toString() + "/relation" + "-relationPath", workingDir.toString() + "/relation" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -512,7 +513,7 @@ public class PrepareInfoJobTest { "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", - "-relationPath", workingDir.toString() + "/relation" + "-relationPath", workingDir.toString() + "/relation" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -539,7 +540,7 @@ public class PrepareInfoJobTest { "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", - "-relationPath", workingDir.toString() + "/relation" + "-relationPath", workingDir.toString() + "/relation" }); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json index 4b15da623..890570a0b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json @@ -1,6 +1,7 @@ [ 
- {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true}, - {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true}, - {"paramName":"o", "paramLongName":"objectType", "paramDescription": "should be scholix or Summary", "paramRequired": true} + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true}, + {"paramName":"su", "paramLongName":"scholixUpdatePath", "paramDescription": "the scholix updated Path", "paramRequired": false}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true}, + {"paramName":"o", "paramLongName":"objectType", "paramDescription": "should be scholix or Summary", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml index 85c0d486d..e46e59cc0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml @@ -90,68 +90,6 @@ --relationPath${targetPath}/relation - - - - - - - - - - - - - - - - yarn - cluster - Serialize scholix to JSON - eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=6000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn - --sourcePath${targetPath}/provision/scholix/scholix - --targetPath${targetPath}/index/scholix_json - --objectTypescholix - - - - - - - - - yarn - cluster - Serialize summary to JSON - eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=6000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn - --sourcePath${targetPath}/provision/summaries_filtered - --targetPath${targetPath}/index/summaries_json - --objectTypesummary - diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/relations.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/relations.json deleted file mode 100644 index 98e8daa18..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/relations.json +++ /dev/null @@ -1,158 +0,0 @@ -{ - "cites":{ - 
"original":"Cites", - "inverse":"IsCitedBy" - }, - "compiles":{ - "original":"Compiles", - "inverse":"IsCompiledBy" - }, - "continues":{ - "original":"Continues", - "inverse":"IsContinuedBy" - }, - "derives":{ - "original":"IsSourceOf", - "inverse":"IsDerivedFrom" - }, - "describes":{ - "original":"Describes", - "inverse":"IsDescribedBy" - }, - "documents":{ - "original":"Documents", - "inverse":"IsDocumentedBy" - }, - "hasmetadata":{ - "original":"HasMetadata", - "inverse":"IsMetadataOf" - }, - "hasassociationwith":{ - "original":"HasAssociationWith", - "inverse":"HasAssociationWith" - }, - "haspart":{ - "original":"HasPart", - "inverse":"IsPartOf" - }, - "hasversion":{ - "original":"HasVersion", - "inverse":"IsVersionOf" - }, - "iscitedby":{ - "original":"IsCitedBy", - "inverse":"Cites" - }, - "iscompiledby":{ - "original":"IsCompiledBy", - "inverse":"Compiles" - }, - "iscontinuedby":{ - "original":"IsContinuedBy", - "inverse":"Continues" - }, - "isderivedfrom":{ - "original":"IsDerivedFrom", - "inverse":"IsSourceOf" - }, - "isdescribedby":{ - "original":"IsDescribedBy", - "inverse":"Describes" - }, - "isdocumentedby":{ - "original":"IsDocumentedBy", - "inverse":"Documents" - }, - "isidenticalto":{ - "original":"IsIdenticalTo", - "inverse":"IsIdenticalTo" - }, - "ismetadatafor":{ - "original":"IsMetadataFor", - "inverse":"IsMetadataOf" - }, - "ismetadataof":{ - "original":"IsMetadataOf", - "inverse":"IsMetadataFor" - }, - "isnewversionof":{ - "original":"IsNewVersionOf", - "inverse":"IsPreviousVersionOf" - }, - "isobsoletedby":{ - "original":"IsObsoletedBy", - "inverse":"Obsoletes" - }, - "isoriginalformof":{ - "original":"IsOriginalFormOf", - "inverse":"IsVariantFormOf" - }, - "ispartof":{ - "original":"IsPartOf", - "inverse":"HasPart" - }, - "ispreviousversionof":{ - "original":"IsPreviousVersionOf", - "inverse":"IsNewVersionOf" - }, - "isreferencedby":{ - "original":"IsReferencedBy", - "inverse":"References" - }, - "isrelatedto":{ - "original":"IsRelatedTo", - "inverse":"IsRelatedTo" - }, - "isrequiredby":{ - "original":"IsRequiredBy", - "inverse":"Requires" - }, - "isreviewedby":{ - "original":"IsReviewedBy", - "inverse":"Reviews" - }, - "issourceof":{ - "original":"IsSourceOf", - "inverse":"IsDerivedFrom" - }, - "issupplementedby":{ - "original":"IsSupplementedBy", - "inverse":"IsSupplementTo" - }, - "issupplementto":{ - "original":"IsSupplementTo", - "inverse":"IsSupplementedBy" - }, - "isvariantformof":{ - "original":"IsVariantFormOf", - "inverse":"IsOriginalFormOf" - }, - "isversionof":{ - "original":"IsVersionOf", - "inverse":"HasVersion" - }, - "obsoletes":{ - "original":"Obsoletes", - "inverse":"IsObsoletedBy" - }, - "references":{ - "original":"References", - "inverse":"IsReferencedBy" - }, - "requires":{ - "original":"Requires", - "inverse":"IsRequiredBy" - }, - "related":{ - "original":"IsRelatedTo", - "inverse":"IsRelatedTo" - }, - "reviews":{ - "original":"Reviews", - "inverse":"IsReviewedBy" - }, - "unknown":{ - "original":"Unknown", - "inverse":"Unknown" - } -} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/config-default.xml new file mode 100644 index 000000000..6fb2a1253 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/config-default.xml @@ -0,0 +1,10 @@ + + + oozie.use.system.libpath + 
true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/workflow.xml new file mode 100644 index 000000000..2844d7baa --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/workflow.xml @@ -0,0 +1,83 @@ + + + + scholixUpdatePath + the working dir base path of the scholix updated + + + targetPath + the final graph path + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + + yarn + cluster + Serialize scholix to JSON + eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=6000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --sourcePath${targetPath}/provision/scholix/scholix + --targetPath${targetPath}/index/scholix_json + --scholixUpdatePath${scholixUpdatePath} + --objectTypescholix + + + + + + + + + yarn + cluster + Serialize summary to JSON + eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=6000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --sourcePath${targetPath}/provision/summaries_filtered + --targetPath${targetPath}/index/summaries_json + --objectTypesummary + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala index cc1b97fd6..0c54de7c8 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala @@ -30,6 +30,9 @@ object SparkConvertObjectToJson { log.info(s"targetPath -> $targetPath") val objectType = parser.get("objectType") log.info(s"objectType -> $objectType") + val scholixUpdatePath = parser.get("scholixUpdatePath") + log.info(s"scholixUpdatePath -> $scholixUpdatePath") + implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix] @@ -42,7 +45,8 @@ object SparkConvertObjectToJson { case "scholix" => log.info("Serialize Scholix") val d: Dataset[Scholix] = spark.read.load(sourcePath).as[Scholix] - d.map(s => mapper.writeValueAsString(s))(Encoders.STRING).rdd.repartition(6000).saveAsTextFile(targetPath, classOf[GzipCodec]) + val u :Dataset[Scholix]= spark.read.load(s"$scholixUpdatePath/scholix").as[Scholix] + d.union(u).repartition(8000).map(s => 
mapper.writeValueAsString(s))(Encoders.STRING).rdd.saveAsTextFile(targetPath, classOf[GzipCodec])
       case "summary" =>
         log.info("Serialize Summary")
         val d: Dataset[ScholixSummary] = spark.read.load(sourcePath).as[ScholixSummary]
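
Note on the "scholix" branch above: a minimal, self-contained sketch of the new serialization step follows. The object name (ScholixJsonSerializationSketch) is hypothetical, the import path for Scholix is assumed from the dhp-schemas module, and the three paths correspond to the --sourcePath, --scholixUpdatePath and --targetPath arguments wired in the serializeGraph workflow; this is an illustration of the pattern in the diff, not the project's actual class.

    package eu.dnetlib.dhp.sx.graph

    import com.fasterxml.jackson.databind.ObjectMapper
    import eu.dnetlib.dhp.schema.sx.scholix.Scholix
    import org.apache.hadoop.io.compress.GzipCodec
    import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

    // Hypothetical helper mirroring the "scholix" case of SparkConvertObjectToJson
    object ScholixJsonSerializationSketch {

      def serializeScholix(spark: SparkSession,
                           sourcePath: String,
                           scholixUpdatePath: String,
                           targetPath: String): Unit = {
        implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
        val mapper = new ObjectMapper()

        // baseline scholix dump produced by the provision step
        val baseline: Dataset[Scholix] = spark.read.load(sourcePath).as[Scholix]
        // updated scholix records written under $scholixUpdatePath/scholix
        val updates: Dataset[Scholix] = spark.read.load(s"$scholixUpdatePath/scholix").as[Scholix]

        baseline
          .union(updates)
          .repartition(8000)                                        // spread the merged records before serialization
          .map(s => mapper.writeValueAsString(s))(Encoders.STRING)  // one JSON string per Scholix record
          .rdd
          .saveAsTextFile(targetPath, classOf[GzipCodec])           // gzipped text output
      }
    }

Repartitioning the unioned Dataset before the map (rather than repartitioning the RDD after serialization, as in the removed line) should keep the 8000 gzipped part files roughly balanced between baseline and update records.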