From d9e3b8993739c649b622aaa6c6c401710f8b2a31 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Fri, 23 Jul 2021 16:38:32 +0200
Subject: [PATCH] implemented last part of workflows to generate scholixGraph

---
 .../sx/graph/SparkConvertRDDtoDataset.scala   | 43 ++++++++++++++++
 .../dhp/sx/graph/SparkResolveRelation.scala   | 22 ++++----
 .../oozie_app/config-default.xml              |  0
 .../oozie_app/workflow.xml                    | 51 ++++---------------
 .../resolverelation/oozie_app/workflow.xml    | 12 ++++-
 5 files changed, 75 insertions(+), 53 deletions(-)
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
 rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/{step1 => finalGraph}/oozie_app/config-default.xml (100%)
 rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/{step1 => finalGraph}/oozie_app/workflow.xml (77%)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
new file mode 100644
index 000000000..2cd176dee
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
@@ -0,0 +1,43 @@
+package eu.dnetlib.dhp.sx.graph
+import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset}
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+object SparkConvertRDDtoDataset {
+
+  def main(args: Array[String]): Unit = {
+    val entities = List(
+      ("dataset", classOf[OafDataset]),
+      ("otherresearchproduct", classOf[OtherResearchProduct]),
+      ("publication", classOf[Publication]),
+      ("software", classOf[Software])
+    )
+
+    val log: Logger = LoggerFactory.getLogger(getClass)
+    val conf: SparkConf = new SparkConf()
+    val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json")))
+    parser.parseArgument(args)
+    val spark: SparkSession =
+      SparkSession
+        .builder()
+        .config(conf)
+        .appName(getClass.getSimpleName)
+        .master(parser.get("master")).getOrCreate()
+
+    val sourcePath = parser.get("sourcePath")
+    log.info(s"sourcePath -> $sourcePath")
+    val targetPath = parser.get("targetPath")
+    log.info(s"targetPath -> $targetPath")
+    val mapper = new ObjectMapper()
+    implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
+
+    entities.foreach{
+      e =>
+        val rdd =spark.sparkContext.textFile(s"$sourcePath/${e._1}").map(s => mapper.readValue(s, e._2))
+        spark.createDataset(rdd).as[Result].write.mode(SaveMode.Overwrite).save(s"$targetPath/${e._1}")
+    }
+  }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala
index aa09311e3..0d0dc4159 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala
@@ -30,7 +30,7 @@ object SparkResolveRelation {
     val relationPath = parser.get("relationPath")
log.info(s"sourcePath -> $relationPath") val entityPath = parser.get("entityPath") - log.info(s"targetPath -> $entityPath") + log.info(s"entityPath -> $entityPath") val workingPath = parser.get("workingPath") log.info(s"workingPath -> $workingPath") @@ -48,8 +48,8 @@ object SparkResolveRelation { m => val sourceResolved = m._2 val currentRelation = m._1._2 - if (sourceResolved!=null && sourceResolved._2!=null && sourceResolved._2.nonEmpty) - currentRelation.setSource(sourceResolved._2) + if (sourceResolved!=null && sourceResolved._1!=null && sourceResolved._1.nonEmpty) + currentRelation.setSource(sourceResolved._1) currentRelation }.write .mode(SaveMode.Overwrite) @@ -61,13 +61,13 @@ object SparkResolveRelation { m => val targetResolved = m._2 val currentRelation = m._1._2 - if (targetResolved!=null && targetResolved._2.nonEmpty) - currentRelation.setTarget(targetResolved._2) + if (targetResolved!=null && targetResolved._1.nonEmpty) + currentRelation.setTarget(targetResolved._1) currentRelation }.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50")) .write .mode(SaveMode.Overwrite) - .save(s"$workingPath/relation") + .save(s"$workingPath/relation_resolved") } @@ -89,16 +89,16 @@ object SparkResolveRelation { val d: RDD[(String,String)] = spark.sparkContext.textFile(s"$entityPath/*") .map(i => extractPidsFromRecord(i)) - .filter(s => s != null && s._2!=null && s._2.nonEmpty) + .filter(s => s != null && s._1!= null && s._2!=null && s._2.nonEmpty) .flatMap{ p => p._2.map(pid => - (p._1,convertPidToDNETIdentifier(pid._1, pid._2)) + (p._1, convertPidToDNETIdentifier(pid._1, pid._2)) ) - } + }.filter(r =>r._1 != null || r._2 != null) spark.createDataset(d) - .groupByKey(_._1) - .reduceGroups((x, y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y) + .groupByKey(_._2) + .reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y) .map(s => s._2) .write .mode(SaveMode.Overwrite) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/config-default.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml similarity index 77% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml index 3ea4e9d30..4e601bff3 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + sourcePath @@ -6,48 +6,22 @@ targetPath - the graph Raw base path + the final graph path - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - Extract entities in raw graph - eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph - dhp-graph-mapper-${projectVersion}.jar - - 
-            --executor-memory=${sparkExecutorMemory}
-            --executor-cores=${sparkExecutorCores}
-            --driver-memory=${sparkDriverMemory}
-            --conf spark.extraListeners=${spark2ExtraListeners}
-            --conf spark.sql.shuffle.partitions=2000
-            --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-            --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-            --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-
-            --master yarn
-            --sourcePath ${sourcePath}
-            --targetPath ${targetPath}
-
-
-
-
-
-
-
             yarn
             cluster
-            Resolve Relations in raw graph
-            eu.dnetlib.dhp.sx.graph.SparkResolveRelation
+            Import JSONRDD to Dataset kryo
+            eu.dnetlib.dhp.sx.graph.SparkConvertRDDtoDataset
             dhp-graph-mapper-${projectVersion}.jar
             --executor-memory=${sparkExecutorMemory}
             --executor-cores=${sparkExecutorCores}
             --driver-memory=${sparkDriverMemory}
             --conf spark.extraListeners=${spark2ExtraListeners}
             --conf spark.sql.shuffle.partitions=2000
             --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
             --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             --master yarn
-            --relationPath ${targetPath}/extracted/relation
-            --workingPath ${targetPath}/resolved/
-            --entityPath ${targetPath}/dedup
+            --sourcePath ${sourcePath}
+            --targetPath ${targetPath}/entities
@@ -87,7 +60,7 @@
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             --master yarn
-            --sourcePath ${targetPath}/dedup
+            --sourcePath ${targetPath}/entities
             --targetPath ${targetPath}/provision/summaries
@@ -114,7 +87,7 @@
             --master yarn
             --summaryPath ${targetPath}/provision/summaries
             --targetPath ${targetPath}/provision/scholix
-            --relationPath ${targetPath}/resolved/resolvedRelation
+            --relationPath ${sourcePath}/relation_resolved
@@ -182,9 +155,5 @@
-
-
-
-
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml
index e73b61a74..7683ff94c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml
@@ -15,14 +15,24 @@
-
+
         Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
+
+
+
             yarn
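
For context, below is a minimal sketch of the RDD-to-Dataset conversion pattern the new SparkConvertRDDtoDataset job applies: read JSON lines, deserialize each line with Jackson, then wrap the resulting RDD in a kryo-encoded Dataset and overwrite the target path. It assumes an input directory of JSON-serialized entities carrying an "id" field; Record, the local master, and the /tmp paths are hypothetical stand-ins for illustration, not part of the patch.

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}

// Hypothetical stand-in for the OAF entity classes (Publication, Software, ...).
case class Record(id: String)

object RddToDatasetSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RddToDatasetSketch")
      .master("local[*]") // the real job takes --master from its CLI arguments
      .getOrCreate()

    // Kryo encoding, mirroring the Encoders.kryo(classOf[Result]) used in the patch.
    implicit val recordEncoder: Encoder[Record] = Encoders.kryo(classOf[Record])

    // Parse each JSON line with Jackson, one record per line.
    val mapper = new ObjectMapper()
    val rdd = spark.sparkContext
      .textFile("/tmp/raw/publication") // hypothetical input path
      .map(line => Record(mapper.readTree(line).get("id").asText()))

    // Wrap the RDD in a Dataset and overwrite the output, one path per entity type.
    spark.createDataset(rdd)
      .write.mode(SaveMode.Overwrite)
      .save("/tmp/entities/publication") // hypothetical output path
  }
}

The actual job repeats this once per entity type (dataset, otherresearchproduct, publication, software), encoding everything as the common Result superclass so the downstream summary and scholix steps can read all entity Datasets uniformly.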