diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/PatchRelationsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/PatchRelationsApplication.java
new file mode 100644
index 000000000..c2bcf69f0
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/PatchRelationsApplication.java
@@ -0,0 +1,115 @@
+package eu.dnetlib.dhp.oa.graph.raw;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.oa.graph.raw.common.RelationIdMapping;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.FileNotFoundException;
+import java.util.Objects;
+import java.util.Optional;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+public class PatchRelationsApplication {
+
+    private static final Logger log = LoggerFactory.getLogger(PatchRelationsApplication.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static void main(final String[] args) throws Exception {
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    Optional.ofNullable(
+                        PatchRelationsApplication.class
+                            .getResourceAsStream(
+                                "/eu/dnetlib/dhp/oa/graph/patch_relations_parameters.json"))
+                        .orElseThrow(FileNotFoundException::new)
+                ));
+        parser.parseArgument(args);
+
+        final Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String graphBasePath = parser.get("graphBasePath");
+        log.info("graphBasePath: {}", graphBasePath);
+
+        final String workingDir = parser.get("workingDir");
+        log.info("workingDir: {}", workingDir);
+
+        final String idMappingPath = parser.get("idMappingPath");
+        log.info("idMappingPath: {}", idMappingPath);
+
+        final SparkConf conf = new SparkConf();
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> patchRelations(spark, graphBasePath, workingDir, idMappingPath));
+    }
+
+    /**
+     * Substitutes the source and target identifiers of the relations available under graphBasePath, according to the
+     * mapping provided by the dataset stored at idMappingPath, using workingDir as intermediate storage location.
+     *
+     * @param spark         the SparkSession
+     * @param graphBasePath base graph path providing the set of relations to patch
+     * @param workingDir    intermediate storage location
+     * @param idMappingPath dataset providing the old -> new identifier mapping
+     */
+    private static void patchRelations(final SparkSession spark, final String graphBasePath,
+        final String workingDir, final String idMappingPath) {
+
+        final String relationPath = graphBasePath + "/relation";
+
+        final Dataset<Relation> rels = Utils.readPath(spark, relationPath, Relation.class);
+        final Dataset<RelationIdMapping> idMapping = Utils.readPath(spark, idMappingPath, RelationIdMapping.class);
+
+        // 1st pass: patch the relation sources; the left join keeps relations not included in the mapping
+        final Dataset<Relation> bySource = rels
+            .joinWith(idMapping, rels.col("source").equalTo(idMapping.col("oldId")), "left")
+            .map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> {
+                final Relation r = t._1();
+                Optional.ofNullable(t._2())
+                    .map(RelationIdMapping::getNewId)
+                    .ifPresent(r::setSource);
+                return r;
+            }, Encoders.bean(Relation.class));
+
+        // 2nd pass: patch the relation targets, resolving the join column against the dataset produced above
+        bySource
+            .joinWith(idMapping, bySource.col("target").equalTo(idMapping.col("oldId")), "left")
+            .map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> {
+                final Relation r = t._1();
+                Optional.ofNullable(t._2())
+                    .map(RelationIdMapping::getNewId)
+                    .ifPresent(r::setTarget);
+                return r;
+            }, Encoders.bean(Relation.class))
+            .map(
+                (MapFunction<Relation, String>) OBJECT_MAPPER::writeValueAsString,
+                Encoders.STRING())
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .text(workingDir);
+
+        // Spark cannot overwrite its own input: materialise the patched relations in workingDir first,
+        // then copy them back onto the original relation path
+        spark.read().textFile(workingDir)
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .text(relationPath);
+    }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/RelationIdMapping.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/RelationIdMapping.java
new file mode 100644
index 000000000..f251da8c3
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/RelationIdMapping.java
@@ -0,0 +1,24 @@
+package eu.dnetlib.dhp.oa.graph.raw.common;
+
+public class RelationIdMapping {
+
+    private String oldId;
+
+    private String newId;
+
+    public String getOldId() {
+        return oldId;
+    }
+
+    public void setOldId(final String oldId) {
+        this.oldId = oldId;
+    }
+
+    public String getNewId() {
+        return newId;
+    }
+
+    public void setNewId(final String newId) {
+        this.newId = newId;
+    }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/patch_relations_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/patch_relations_parameters.json
new file mode 100644
index 000000000..178c2d69b
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/patch_relations_parameters.json
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName": "issm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "when true will stop SparkSession after job execution",
+    "paramRequired": false
+  },
+  {
+    "paramName": "g",
+    "paramLongName": "graphBasePath",
+    "paramDescription": "base graph path providing the set of relations to patch",
+    "paramRequired": true
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "workingDir",
+    "paramDescription": "intermediate storage location",
+    "paramRequired": true
+  },
+  {
+    "paramName": "i",
+    "paramLongName": "idMappingPath",
+    "paramDescription": "dataset providing the old -> new identifier mapping",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
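Note: the mapping dataset read from idMappingPath is expected to use the same JSON-lines encoding as the rest of the graph, i.e. one RelationIdMapping record per line, with the field names declared by the bean above. A minimal sketch of such a record, with made-up identifiers for illustration:

    {"oldId": "50|somedatasource::0001", "newId": "50|somedatasource::0002"}

Relations whose source (first pass) or target (second pass) matches an oldId are rewritten to point at the corresponding newId; the left joins let relations with no entry in the mapping pass through unchanged.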
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
index 7f1ecb39f..e7320de3b 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
@@ -100,6 +100,16 @@
             <description>a blacklist of nsprefixes (comma separeted)</description>
         </property>
+        <property>
+            <name>shouldPatchRelations</name>
+            <value>false</value>
+            <description>activates the relation patching phase, driven by the content in ${idMappingPath}</description>
+        </property>
+        <property>
+            <name>idMappingPath</name>
+            <value></value>
+            <description>path pointing to the relations identifiers mapping dataset</description>
+        </property>
         <property>
             <name>sparkDriverMemory</name>
             <description>memory for driver process</description>
@@ -538,7 +548,42 @@
             </spark>
-            <ok to="End"/>
+            <ok to="decisionPatchRelations"/>
             <error to="Kill"/>
         </action>
 
+    <decision name="decisionPatchRelations">
+        <switch>
+            <case to="patchRelations">
+                ${(shouldPatchRelations eq "true") and
+                (fs:exists(concat(concat(wf:conf('nameNode'),'/'),wf:conf('idMappingPath'))) eq "true")}
+            </case>
+            <default to="End"/>
+        </switch>
+    </decision>
+
+    <action name="patchRelations">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>PatchRelations</name>
+            <class>eu.dnetlib.dhp.oa.graph.raw.PatchRelationsApplication</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory ${sparkExecutorMemory}
+                --executor-cores ${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=7680
+            </spark-opts>
+            <arg>--graphBasePath</arg><arg>${workingDir}/graph_raw</arg>
+            <arg>--workingDir</arg><arg>${workingDir}/patch_relations</arg>
+            <arg>--idMappingPath</arg><arg>${idMappingPath}</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
     <end name="End"/>
 </workflow-app>
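The patching phase only runs when both conditions of the decision node hold: shouldPatchRelations evaluates to "true" and idMappingPath points to a location that exists on HDFS. A hypothetical job configuration enabling it (the property names come from the workflow parameters above, the path value is illustrative only):

    shouldPatchRelations=true
    idMappingPath=/data/patching/relation_id_mapping

When either condition fails, the workflow takes the default transition straight to End and the relation dataset is left untouched.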