From 2ca0a436ad60f8110c09379b7c365cdfcfcf0480 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 11 Nov 2021 10:25:42 +0100 Subject: [PATCH] added SparkResolveEntities node to the oozie wf --- .../resolution/SparkResolveEntities.scala | 17 ++++++++-- .../graph/resolution/oozie_app/workflow.xml | 32 +++++++++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala index 60df58e45f..afd195ed04 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala @@ -2,9 +2,11 @@ package eu.dnetlib.dhp.oa.graph.resolution import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.common.HdfsSupport import eu.dnetlib.dhp.schema.common.EntityType import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset} import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.SparkConf import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} @@ -34,13 +36,22 @@ object SparkResolveEntities { val unresolvedPath = parser.get("unresolvedPath") log.info(s"unresolvedPath -> $unresolvedPath") + val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) + fs.mkdirs(new Path(workingPath)) resolveEntities(spark, workingPath, unresolvedPath) - - generateResolvedEntities(spark, workingPath, graphBasePath) - } + // TO BE conservative we keep the original entities in the working dir + // and save the resolved entities on the graphBasePath + //In future these lines of code should be removed + entities.foreach { + e => + fs.rename(new Path(s"$graphBasePath/$e"), new Path(s"$workingPath/${e}_old")) + fs.rename(new Path(s"$workingPath/resolvedGraph/$e"), new Path(s"$graphBasePath/$e")) + } + +} def resolveEntities(spark: SparkSession, workingPath: String, unresolvedPath: String) = { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml index 52f0e6e9d0..4773fc87c6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml @@ -4,6 +4,10 @@ graphBasePath the path of the graph + + unresolvedPath + the path of the unresolved Entities + @@ -36,5 +40,33 @@ + + + + yarn + cluster + Resolve Relations in raw graph + eu.dnetlib.dhp.oa.graph.resolution.SparkResolveEntities + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=10000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --graphBasePath${graphBasePath} + --unresolvedPath${unresolvedPath} + --workingPath${workingDir} + + + + + + \ No newline at end of file