From 3920d68992b1d7f8a67afec5cf8dd3b03037cd29 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 21 Dec 2021 11:41:49 +0100 Subject: [PATCH] Fixed workflow generation of delta in datacite --- .../dhp/sx/graph/scholix/ScholixUtils.scala | 22 ++- .../datacite/oozie_app/config-default.xml | 19 +++ .../sx/graph/datacite/oozie_app/workflow.xml | 62 +++++++ .../graph/retrieve_datacite_delta_params.json | 41 +++++ .../sx/graph/SparkRetrieveDataciteDelta.scala | 154 +++++++++-------- .../PrepareInfo.java | 21 +-- .../SparkResultToOrganizationFromSemRel.java | 5 - .../StepActions.java | 6 +- .../PrepareInfoJobTest.java | 43 ++--- .../sx/graph/convert_object_json_params.json | 9 +- .../graph/finalGraph/oozie_app/workflow.xml | 62 ------- .../eu/dnetlib/dhp/sx/graph/relations.json | 158 ------------------ .../oozie_app/config-default.xml | 10 ++ .../serializeGraph/oozie_app/workflow.xml | 83 +++++++++ .../sx/graph/SparkConvertObjectToJson.scala | 6 +- 15 files changed, 364 insertions(+), 337 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/relations.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/workflow.xml diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala index f8010bbd5..f35af0905 100644 --- a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala +++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala @@ -12,7 +12,7 @@ import org.json4s.jackson.JsonMethods.parse import scala.collection.JavaConverters._ import scala.io.Source -object ScholixUtils { +object ScholixUtils extends Serializable { val DNET_IDENTIFIER_SCHEMA: String = "DNET Identifier" @@ -24,7 +24,7 @@ object ScholixUtils { case class RelatedEntities(id: String, relatedDataset: Long, relatedPublication: Long) {} val relations: Map[String, RelationVocabulary] = { - val input = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/relations.json")).mkString + val input = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/scholexplorer/relation/relations.json")).mkString implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json: json4s.JValue = parse(input) @@ -62,6 +62,11 @@ object ScholixUtils { } + def generateScholixResourceFromResult(r:Result) :ScholixResource = { + generateScholixResourceFromSummary(ScholixUtils.resultToSummary(r)) + } + + val statsAggregator: Aggregator[(String, String, Long), RelatedEntities, RelatedEntities] = new Aggregator[(String, String, Long), RelatedEntities, RelatedEntities] with Serializable { override def zero: RelatedEntities = null @@ -184,6 +189,19 @@ object ScholixUtils { } + def generateCompleteScholix(scholix: Scholix, target: ScholixResource): Scholix = { + val s = new Scholix + s.setPublicationDate(scholix.getPublicationDate) + s.setPublisher(scholix.getPublisher) + s.setLinkprovider(scholix.getLinkprovider) + s.setRelationship(scholix.getRelationship) + s.setSource(scholix.getSource) + s.setTarget(target) + s.setIdentifier(DHPUtils.md5(s"${s.getSource.getIdentifier}::${s.getRelationship.getName}::${s.getTarget.getIdentifier}")) + s + } + + def generateScholixResourceFromSummary(summaryObject: ScholixSummary): ScholixResource = { val r = new ScholixResource r.setIdentifier(summaryObject.getLocalIdentifier) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/config-default.xml new file mode 100644 index 000000000..bdd48b0ab --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/config-default.xml @@ -0,0 +1,19 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/workflow.xml new file mode 100644 index 000000000..751b124cf --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/workflow.xml @@ -0,0 +1,62 @@ + + + + sourcePath + the source path of scholix graph + + + datacitePath + the datacite native path + + + workingSupportPath + the working Support path + + + isLookupUrl + The IS lookUp service endopoint + + + updateDS + false + The transformation Rule to apply + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + New Update from Datacite to Scholix + eu.dnetlib.dhp.sx.graph.SparkRetrieveDataciteDelta + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=6000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${sourcePath} + --datacitePath${datacitePath} + --masteryarn + --workingSupportPath${workingSupportPath} + --isLookupUrl${isLookupUrl} + --updateDS${updateDS} + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json new file mode 100644 index 000000000..78777ffff --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json @@ -0,0 +1,41 @@ +[ + { + "paramName": "s", + "paramLongName": "sourcePath", + "paramDescription": "the source mdstore path", + "paramRequired": true + }, + + { + "paramName": "d", + "paramLongName": "datacitePath", + "paramDescription": "the datacite native path", + "paramRequired": true + }, + + { + "paramName": "w", + "paramLongName": "workingSupportPath", + "paramDescription": "the working Support path", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "isLookupUrl", + "paramDescription": "the isLookup URL", + "paramRequired": true + }, + { + "paramName": "m", + "paramLongName": "master", + "paramDescription": "the master name", + "paramRequired": true + }, + { + "paramName": "u", + "paramLongName": "updateDS", + "paramDescription": "Need to regenerate all support Dataset", + "paramRequired": false + } + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala index 7f37829ee..45a6cfc89 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala @@ -9,9 +9,10 @@ import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource} import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils import eu.dnetlib.dhp.utils.{DHPUtils, ISLookupClientFactory} +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.functions.max import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} -import org.slf4j.Logger +import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ import java.text.SimpleDateFormat @@ -58,14 +59,15 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L def retrieveLastCollectedFrom(spark:SparkSession, entitiesPath:String):Long = { log.info("Retrieve last entities collected From") - implicit val oafEncoder:Encoder[Result] = Encoders.kryo[Result] + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result] import spark.implicits._ - val entitiesDS = spark.read.load(s"$entitiesPath/*").as[Result] + val entitiesDS = spark.read.load(s"$entitiesPath/*").as[Oaf].filter(o =>o.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]) val date = entitiesDS.filter(r => r.getDateofcollection!= null).map(_.getDateofcollection).select(max("value")).first.getString(0) - ISO8601toEpochMillis(date) + ISO8601toEpochMillis(date) / 1000 } @@ -73,7 +75,7 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L * The method of update Datacite relationships on Scholexplorer * needs some utilities data structures * One is the scholixResource DS that stores all the nodes in the Scholix Graph - * in format (dnetID, ScholixResource ) + * in format ScholixResource * @param summaryPath the path of the summary in Scholix * @param workingPath the working path * @param spark the spark session @@ -84,11 +86,37 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L log.info("Convert All summary to ScholixResource") spark.read.load(summaryPath).as[ScholixSummary] - .map(ScholixUtils.generateScholixResourceFromSummary) + .map(ScholixUtils.generateScholixResourceFromSummary)(scholixResourceEncoder) .filter(r => r.getIdentifier!= null && r.getIdentifier.size>0) - .map(r=> (r.getIdentifier,r))(Encoders.tuple(Encoders.STRING, scholixResourceEncoder)) - .write.mode(SaveMode.Overwrite).save(scholixResourcePath(workingPath)) + .write.mode(SaveMode.Overwrite).save(s"${scholixResourcePath(workingPath)}_native") + } + /** + * This method convert the new Datacite Resource into Scholix Resource + * Needed to fill the source and the type of Scholix Relationships + * @param workingPath the Working Path + * @param spark The spark Session + */ + def addMissingScholixResource(workingPath:String, spark:SparkSession ) :Unit = { + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] + implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result] + import spark.implicits._ + + spark.read.load(dataciteOAFPath(workingPath)).as[Oaf] + .filter(_.isInstanceOf[Result]) + .map(_.asInstanceOf[Result]) + .map(ScholixUtils.generateScholixResourceFromResult) + .filter(r => r.getIdentifier!= null && r.getIdentifier.size>0) + .write.mode(SaveMode.Overwrite).save(s"${scholixResourcePath(workingPath)}_update") + + val update = spark.read.load(s"${scholixResourcePath(workingPath)}_update").as[ScholixResource] + val native = spark.read.load(s"${scholixResourcePath(workingPath)}_native").as[ScholixResource] + val graph = update.union(native) + .groupByKey(_.getDnetIdentifier) + .reduceGroups((a,b) => if (a!= null && a.getDnetIdentifier!= null) a else b) + .map(_._2) + graph.write.mode(SaveMode.Overwrite).save(s"${scholixResourcePath(workingPath)}_graph") } @@ -102,29 +130,20 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L * @param vocabularies Vocabularies needed for transformation */ - def getDataciteUpdate(datacitePath:String, timestamp:Long, workingPath:String, spark:SparkSession,vocabularies: VocabularyGroup): Unit = { + def getDataciteUpdate(datacitePath:String, timestamp:Long, workingPath:String, spark:SparkSession,vocabularies: VocabularyGroup): Long = { import spark.implicits._ val ds = spark.read.load(datacitePath).as[DataciteType] implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] - ds.filter(_.timestamp>=timestamp) - .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks = true)) - .flatMap(i => fixRelations(i)).filter(i => i != null) - .write.mode(SaveMode.Overwrite).save(dataciteOAFPath(workingPath)) - + val total = ds.filter(_.timestamp>=timestamp).count() + if (total >0) { + ds.filter(_.timestamp >= timestamp) + .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks = true)) + .flatMap(i => fixRelations(i)).filter(i => i != null) + .write.mode(SaveMode.Overwrite).save(dataciteOAFPath(workingPath)) + } + total } - - /** - * This method convert an Instance of OAF Result into - * Scholix Resource - * @param r The input Result - * @return The Scholix Resource - */ - def resultToScholixResource(r:Result):ScholixResource = { - ScholixUtils.generateScholixResourceFromSummary(ScholixUtils.resultToSummary(r)) - } - - /** * After added the new ScholixResource, we need to update the scholix Pid Map * to intersected with the new Datacite Relations @@ -135,11 +154,11 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L def generatePidMap(workingPath:String, spark:SparkSession ) :Unit = { implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] import spark.implicits._ - spark.read.load(scholixResourcePath(workingPath)).as[(String,ScholixResource)] + spark.read.load(s"${scholixResourcePath(workingPath)}_graph").as[ScholixResource] .flatMap(r=> - r._2.getIdentifier.asScala + r.getIdentifier.asScala .map(i =>DHPUtils.generateUnresolvedIdentifier(i.getIdentifier, i.getSchema)) - .map((_, r._1)) + .map(t =>(t, r.getDnetIdentifier)) )(Encoders.tuple(Encoders.STRING, Encoders.STRING)) .groupByKey(_._1) .reduceGroups((a,b) => if (a!= null && a._2!= null) a else b) @@ -147,27 +166,6 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L .write.mode(SaveMode.Overwrite).save(pidMapPath(workingPath)) } - /** - * This method convert the new Datacite Resource into Scholix Resource - * Needed to fill the source and the type of Scholix Relationships - * @param workingPath the Working Path - * @param spark The spark Session - */ - def addMissingScholixResource(workingPath:String, spark:SparkSession ) :Unit = { - implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] - implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] - implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result] - - spark.read.load(dataciteOAFPath(workingPath)).as[Oaf] - .filter(_.isInstanceOf[Result]) - .map(_.asInstanceOf[Result]) - .map(resultToScholixResource) - .filter(r => r.getIdentifier!= null && r.getIdentifier.size>0) - .map(r=> (r.getIdentifier,r))(Encoders.tuple(Encoders.STRING, scholixResourceEncoder)) - .write.mode(SaveMode.Append).save(scholixResourcePath(workingPath)) - } - - /** * This method resolve the datacite relation and filter the resolved * relation @@ -222,25 +220,26 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L implicit val scholixEncoder:Encoder[Scholix] = Encoders.kryo[Scholix] implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource] implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation] - import spark.implicits._ + implicit val intermediateEncoder :Encoder[(String,Scholix)] = Encoders.tuple(Encoders.STRING, scholixEncoder) - val relationss:Dataset[(String, Relation)] = spark.read.load(s"$workingPath/ResolvedRelation").as[Relation].map(r =>(r.getSource,r))(Encoders.tuple(Encoders.STRING, relationEncoder)) - val id_summary:Dataset[(String,ScholixResource)] = spark.read.load(scholixResourcePath(workingPath)).as[(String,ScholixResource)] + val relations:Dataset[(String, Relation)] = spark.read.load(resolvedRelationPath(workingPath)).as[Relation].map(r =>(r.getSource,r))(Encoders.tuple(Encoders.STRING, relationEncoder)) - relationss.joinWith(id_summary, relationss("_1").equalTo(id_summary("_1")),"inner") - .map(t => (t._1._2.getTarget,ScholixUtils.scholixFromSource(t._1._2, t._2._2)))(Encoders.tuple(Encoders.STRING, scholixEncoder)) - .write.mode(SaveMode.Overwrite) - .save(s"$workingPath/scholix_one_verse") + val id_summary:Dataset[(String,ScholixResource)] = spark.read.load(s"${scholixResourcePath(workingPath)}_graph").as[ScholixResource].map(r => (r.getDnetIdentifier,r))(Encoders.tuple(Encoders.STRING, scholixResourceEncoder)) - val source_scholix:Dataset[(String,Scholix)] = spark.read.load(s"$workingPath/scholix_one_verse").as[(String,Scholix)] + id_summary.cache() + + relations.joinWith(id_summary, relations("_1").equalTo(id_summary("_1")),"inner") + .map(t => (t._1._2.getTarget,ScholixUtils.scholixFromSource(t._1._2, t._2._2))) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/scholix_one_verse") + + val source_scholix:Dataset[(String, Scholix)] =spark.read.load(s"$workingPath/scholix_one_verse").as[(String,Scholix)] source_scholix.joinWith(id_summary, source_scholix("_1").equalTo(id_summary("_1")),"inner") .map(t => { - val target = t._2._2 - val scholix = t._1._2 - scholix.setTarget(target) - scholix + val target:ScholixResource =t._2._2 + val scholix:Scholix = t._1._2 + ScholixUtils.generateCompleteScholix(scholix,target) })(scholixEncoder).write.mode(SaveMode.Overwrite).save(s"$workingPath/scholix") } @@ -259,7 +258,7 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L val datacitePath = parser.get("datacitePath") log.info(s"DatacitePath is '$datacitePath'") - val workingPath = parser.get("workingPath") + val workingPath = parser.get("workingSupportPath") log.info(s"workingPath is '$workingPath'") val isLookupUrl: String = parser.get("isLookupUrl") @@ -279,13 +278,28 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L log.info("Retrieve last entities collected From starting from scholix Graph") lastCollectionDate = retrieveLastCollectedFrom(spark, s"$sourcePath/entities") } + else { + val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) + fs.delete(new Path(s"${scholixResourcePath(workingPath)}_native"), true) + fs.rename(new Path(s"${scholixResourcePath(workingPath)}_graph"), new Path(s"${scholixResourcePath(workingPath)}_native")) + lastCollectionDate = retrieveLastCollectedFrom(spark, dataciteOAFPath(workingPath)) + } - getDataciteUpdate(datacitePath, lastCollectionDate, workingPath, spark, vocabularies) - addMissingScholixResource(workingPath,spark) - generatePidMap(workingPath, spark) - resolveUpdateRelation(workingPath,spark) - generateScholixUpdate(workingPath, spark) - - + val numRecords = getDataciteUpdate(datacitePath, lastCollectionDate, workingPath, spark, vocabularies) + if (numRecords>0) { + addMissingScholixResource(workingPath,spark) + generatePidMap(workingPath, spark) + resolveUpdateRelation(workingPath,spark) + generateScholixUpdate(workingPath, spark) + } + } +} + + +object SparkRetrieveDataciteDelta { + val log: Logger = LoggerFactory.getLogger(SparkRetrieveDataciteDelta.getClass) + + def main(args: Array[String]): Unit = { + new SparkRetrieveDataciteDelta("/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json", args, log).initialize().run() } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java index 707462f24..23909fd9a 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java @@ -33,13 +33,14 @@ public class PrepareInfo implements Serializable { private static final Logger log = LoggerFactory.getLogger(PrepareInfo.class); // associate orgs with all their parent - private static final String ORGANIZATION_ORGANIZATION_QUERY = "SELECT target key, collect_set(source) as valueSet " + + private static final String ORGANIZATION_ORGANIZATION_QUERY = "SELECT target key, collect_set(source) as valueSet " + + "FROM relation " + "WHERE lower(relclass) = '" + ModelConstants.IS_PARENT_OF.toLowerCase() + "' and datainfo.deletedbyinference = false " + "GROUP BY target"; - //associates results with all the orgs they are affiliated to + // associates results with all the orgs they are affiliated to private static final String RESULT_ORGANIZATION_QUERY = "SELECT source key, collect_set(target) as valueSet " + "FROM relation " + "WHERE lower(relclass) = '" + ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase() + @@ -88,7 +89,7 @@ public class PrepareInfo implements Serializable { childParentPath, leavesPath, resultOrganizationPath, - relationPath)); + relationPath)); } private static void prepareInfo(SparkSession spark, String inputPath, String childParentOrganizationPath, @@ -113,13 +114,13 @@ public class PrepareInfo implements Serializable { .json(resultOrganizationPath); relation - .filter( - (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && - r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION)) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(relationPath); + .filter( + (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && + r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(relationPath); Dataset children = spark .sql( diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java index e26276f53..9ceebf222 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java @@ -2,7 +2,6 @@ package eu.dnetlib.dhp.resulttoorganizationfromsemrel; import static eu.dnetlib.dhp.PropagationConstant.*; - import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import java.io.Serializable; @@ -22,13 +21,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.KeyValueSet; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Relation; - public class SparkResultToOrganizationFromSemRel implements Serializable { private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromSemRel.class); private static final int MAX_ITERATION = 5; @@ -201,6 +198,4 @@ public class SparkResultToOrganizationFromSemRel implements Serializable { .json(outputPath); } - - } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java index 02444cb15..1adbbe60e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java @@ -75,8 +75,8 @@ public class StepActions implements Serializable { ret.setValueSet(orgs); return ret; }, Encoders.bean(KeyValueSet.class)) - .write() - .mode(SaveMode.Overwrite) + .write() + .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath); } @@ -179,7 +179,6 @@ public class StepActions implements Serializable { "GROUP BY resId") .as(Encoders.bean(KeyValueSet.class)); - // create new relations from result to organization for each result linked to a leaf return resultParent .flatMap( @@ -200,7 +199,6 @@ public class StepActions implements Serializable { .iterator(), Encoders.bean(Relation.class)); - } } diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java index 21d99321b..2d2668db3 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java @@ -84,7 +84,7 @@ public class PrepareInfoJobTest { "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", - "-relationPath", workingDir.toString() + "/relation" + "-relationPath", workingDir.toString() + "/relation" }); @@ -229,7 +229,7 @@ public class PrepareInfoJobTest { "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", - "-relationPath", workingDir.toString() + "/relation" + "-relationPath", workingDir.toString() + "/relation" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -335,34 +335,35 @@ public class PrepareInfoJobTest { } @Test - public void relationTest()throws Exception { + public void relationTest() throws Exception { PrepareInfo - .main( - new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-graphPath", getClass() - .getResource( - "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest") - .getPath(), - "-hive_metastore_uris", "", - "-leavesPath", workingDir.toString() + "/currentIteration/", - "-resultOrgPath", workingDir.toString() + "/resultOrganization/", - "-childParentPath", workingDir.toString() + "/childParentOrg/", - "-relationPath", workingDir.toString() + "/relation" + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-graphPath", getClass() + .getResource( + "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest") + .getPath(), + "-hive_metastore_uris", "", + "-leavesPath", workingDir.toString() + "/currentIteration/", + "-resultOrgPath", workingDir.toString() + "/resultOrganization/", + "-childParentPath", workingDir.toString() + "/childParentOrg/", + "-relationPath", workingDir.toString() + "/relation" - }); + }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/relation") - .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); + .textFile(workingDir.toString() + "/relation") + .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); Dataset verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); Assertions.assertEquals(7, verificationDs.count()); } + @Test public void resultOrganizationTest1() throws Exception { @@ -378,7 +379,7 @@ public class PrepareInfoJobTest { "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", - "-relationPath", workingDir.toString() + "/relation" + "-relationPath", workingDir.toString() + "/relation" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -512,7 +513,7 @@ public class PrepareInfoJobTest { "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", - "-relationPath", workingDir.toString() + "/relation" + "-relationPath", workingDir.toString() + "/relation" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -539,7 +540,7 @@ public class PrepareInfoJobTest { "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", - "-relationPath", workingDir.toString() + "/relation" + "-relationPath", workingDir.toString() + "/relation" }); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json index 4b15da623..890570a0b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json @@ -1,6 +1,7 @@ [ - {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true}, - {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true}, - {"paramName":"o", "paramLongName":"objectType", "paramDescription": "should be scholix or Summary", "paramRequired": true} + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true}, + {"paramName":"su", "paramLongName":"scholixUpdatePath", "paramDescription": "the scholix updated Path", "paramRequired": false}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true}, + {"paramName":"o", "paramLongName":"objectType", "paramDescription": "should be scholix or Summary", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml index 85c0d486d..e46e59cc0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml @@ -90,68 +90,6 @@ --relationPath${targetPath}/relation - - - - - - - - - - - - - - - - yarn - cluster - Serialize scholix to JSON - eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=6000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn - --sourcePath${targetPath}/provision/scholix/scholix - --targetPath${targetPath}/index/scholix_json - --objectTypescholix - - - - - - - - - yarn - cluster - Serialize summary to JSON - eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=6000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn - --sourcePath${targetPath}/provision/summaries_filtered - --targetPath${targetPath}/index/summaries_json - --objectTypesummary - diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/relations.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/relations.json deleted file mode 100644 index 98e8daa18..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/relations.json +++ /dev/null @@ -1,158 +0,0 @@ -{ - "cites":{ - "original":"Cites", - "inverse":"IsCitedBy" - }, - "compiles":{ - "original":"Compiles", - "inverse":"IsCompiledBy" - }, - "continues":{ - "original":"Continues", - "inverse":"IsContinuedBy" - }, - "derives":{ - "original":"IsSourceOf", - "inverse":"IsDerivedFrom" - }, - "describes":{ - "original":"Describes", - "inverse":"IsDescribedBy" - }, - "documents":{ - "original":"Documents", - "inverse":"IsDocumentedBy" - }, - "hasmetadata":{ - "original":"HasMetadata", - "inverse":"IsMetadataOf" - }, - "hasassociationwith":{ - "original":"HasAssociationWith", - "inverse":"HasAssociationWith" - }, - "haspart":{ - "original":"HasPart", - "inverse":"IsPartOf" - }, - "hasversion":{ - "original":"HasVersion", - "inverse":"IsVersionOf" - }, - "iscitedby":{ - "original":"IsCitedBy", - "inverse":"Cites" - }, - "iscompiledby":{ - "original":"IsCompiledBy", - "inverse":"Compiles" - }, - "iscontinuedby":{ - "original":"IsContinuedBy", - "inverse":"Continues" - }, - "isderivedfrom":{ - "original":"IsDerivedFrom", - "inverse":"IsSourceOf" - }, - "isdescribedby":{ - "original":"IsDescribedBy", - "inverse":"Describes" - }, - "isdocumentedby":{ - "original":"IsDocumentedBy", - "inverse":"Documents" - }, - "isidenticalto":{ - "original":"IsIdenticalTo", - "inverse":"IsIdenticalTo" - }, - "ismetadatafor":{ - "original":"IsMetadataFor", - "inverse":"IsMetadataOf" - }, - "ismetadataof":{ - "original":"IsMetadataOf", - "inverse":"IsMetadataFor" - }, - "isnewversionof":{ - "original":"IsNewVersionOf", - "inverse":"IsPreviousVersionOf" - }, - "isobsoletedby":{ - "original":"IsObsoletedBy", - "inverse":"Obsoletes" - }, - "isoriginalformof":{ - "original":"IsOriginalFormOf", - "inverse":"IsVariantFormOf" - }, - "ispartof":{ - "original":"IsPartOf", - "inverse":"HasPart" - }, - "ispreviousversionof":{ - "original":"IsPreviousVersionOf", - "inverse":"IsNewVersionOf" - }, - "isreferencedby":{ - "original":"IsReferencedBy", - "inverse":"References" - }, - "isrelatedto":{ - "original":"IsRelatedTo", - "inverse":"IsRelatedTo" - }, - "isrequiredby":{ - "original":"IsRequiredBy", - "inverse":"Requires" - }, - "isreviewedby":{ - "original":"IsReviewedBy", - "inverse":"Reviews" - }, - "issourceof":{ - "original":"IsSourceOf", - "inverse":"IsDerivedFrom" - }, - "issupplementedby":{ - "original":"IsSupplementedBy", - "inverse":"IsSupplementTo" - }, - "issupplementto":{ - "original":"IsSupplementTo", - "inverse":"IsSupplementedBy" - }, - "isvariantformof":{ - "original":"IsVariantFormOf", - "inverse":"IsOriginalFormOf" - }, - "isversionof":{ - "original":"IsVersionOf", - "inverse":"HasVersion" - }, - "obsoletes":{ - "original":"Obsoletes", - "inverse":"IsObsoletedBy" - }, - "references":{ - "original":"References", - "inverse":"IsReferencedBy" - }, - "requires":{ - "original":"Requires", - "inverse":"IsRequiredBy" - }, - "related":{ - "original":"IsRelatedTo", - "inverse":"IsRelatedTo" - }, - "reviews":{ - "original":"Reviews", - "inverse":"IsReviewedBy" - }, - "unknown":{ - "original":"Unknown", - "inverse":"Unknown" - } -} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/config-default.xml new file mode 100644 index 000000000..6fb2a1253 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/config-default.xml @@ -0,0 +1,10 @@ + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/workflow.xml new file mode 100644 index 000000000..2844d7baa --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/workflow.xml @@ -0,0 +1,83 @@ + + + + scholixUpdatePath + the working dir base path of the scholix updated + + + targetPath + the final graph path + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + + yarn + cluster + Serialize scholix to JSON + eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=6000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --sourcePath${targetPath}/provision/scholix/scholix + --targetPath${targetPath}/index/scholix_json + --scholixUpdatePath${scholixUpdatePath} + --objectTypescholix + + + + + + + + + yarn + cluster + Serialize summary to JSON + eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=6000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --sourcePath${targetPath}/provision/summaries_filtered + --targetPath${targetPath}/index/summaries_json + --objectTypesummary + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala index cc1b97fd6..0c54de7c8 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala @@ -30,6 +30,9 @@ object SparkConvertObjectToJson { log.info(s"targetPath -> $targetPath") val objectType = parser.get("objectType") log.info(s"objectType -> $objectType") + val scholixUpdatePath = parser.get("scholixUpdatePath") + log.info(s"scholixUpdatePath -> $scholixUpdatePath") + implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix] @@ -42,7 +45,8 @@ object SparkConvertObjectToJson { case "scholix" => log.info("Serialize Scholix") val d: Dataset[Scholix] = spark.read.load(sourcePath).as[Scholix] - d.map(s => mapper.writeValueAsString(s))(Encoders.STRING).rdd.repartition(6000).saveAsTextFile(targetPath, classOf[GzipCodec]) + val u :Dataset[Scholix]= spark.read.load(s"$scholixUpdatePath/scholix").as[Scholix] + d.union(u).repartition(8000).map(s => mapper.writeValueAsString(s))(Encoders.STRING).rdd.saveAsTextFile(targetPath, classOf[GzipCodec]) case "summary" => log.info("Serialize Summary") val d: Dataset[ScholixSummary] = spark.read.load(sourcePath).as[ScholixSummary]