From dfd6205b953cb9c75c26af415d31fdbd00d31e3b Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 25 Nov 2020 14:55:32 +0100
Subject: [PATCH] Consistency graph workflow merges all the entities by ID

---
 .../dhp/schema/oaf/OafMapperUtils.java        |  30 +-
 .../oa/dedup/DispatchEntitiesSparkJob.java    |  93 ++++++
 .../dhp/oa/dedup/GroupEntitiesSparkJob.java   | 202 +++++++++++++
 .../dhp/oa/dedup/SparkPropagateRelation.java  |  13 +-
 .../dedup/consistency/oozie_app/workflow.xml  | 280 ++++++++++++------
 .../dedup/dispatch_entities_parameters.json   |  26 ++
 .../group_graph_entities_parameters.json      |   0
 .../dedup/propagateRelation_parameters.json   |   2 +-
 8 files changed, 539 insertions(+), 107 deletions(-)
 create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DispatchEntitiesSparkJob.java
 create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java
 create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/dispatch_entities_parameters.json
 rename dhp-workflows/{dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph => dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup}/group_graph_entities_parameters.json (100%)

diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java
index 301afacce..6e5857505 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java
@@ -19,24 +19,24 @@ public class OafMapperUtils {
 
     public static Oaf merge(final Oaf o1, final Oaf o2) {
         if (ModelSupport.isSubClass(o1, OafEntity.class)) {
-            if (ModelSupport.isSubClass(o1, Result.class)) {
-
-                return mergeResults((Result) o1, (Result) o2);
-            } else if (ModelSupport.isSubClass(o1, Datasource.class)) {
-                ((Datasource) o1).mergeFrom((Datasource) o2);
-            } else if (ModelSupport.isSubClass(o1, Organization.class)) {
-                ((Organization) o1).mergeFrom((Organization) o2);
-            } else if (ModelSupport.isSubClass(o1, Project.class)) {
-                ((Project) o1).mergeFrom((Project) o2);
-            } else {
-                throw new RuntimeException("invalid OafEntity subtype:" + o1.getClass().getCanonicalName());
-            }
+            return mergeEntities((OafEntity) o1, (OafEntity) o2);
         } else if (ModelSupport.isSubClass(o1, Relation.class)) {
             ((Relation) o1).mergeFrom((Relation) o2);
-        } else {
-            throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName());
         }
-        return o1;
+        throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName());
+    }
+
+    public static OafEntity mergeEntities(OafEntity e1, OafEntity e2) {
+        if (ModelSupport.isSubClass(e1, Result.class)) {
+            return mergeResults((Result) e1, (Result) e2);
+        } else if (ModelSupport.isSubClass(e1, Datasource.class)) {
+            ((Datasource) e1).mergeFrom((Datasource) e2);
+        } else if (ModelSupport.isSubClass(e1, Organization.class)) {
+            ((Organization) e1).mergeFrom((Organization) e2);
+        } else if (ModelSupport.isSubClass(e1, Project.class)) {
+            ((Project) e1).mergeFrom((Project) e2);
+        }
+        throw new RuntimeException("invalid OafEntity subtype:" + e1.getClass().getCanonicalName());
     }
 
     public static Result mergeResults(Result r1, Result r2) {
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DispatchEntitiesSparkJob.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DispatchEntitiesSparkJob.java
new file mode 100644
index 000000000..5506b5470
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DispatchEntitiesSparkJob.java
@@ -0,0 +1,93 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+
+public class DispatchEntitiesSparkJob {
+
+    private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static void main(String[] args) throws Exception {
+
+        String jsonConfiguration = IOUtils
+            .toString(
+                DispatchEntitiesSparkJob.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/dedup/dispatch_entities_parameters.json"));
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        String inputPath = parser.get("inputPath");
+        log.info("inputPath: {}", inputPath);
+
+        String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        String graphTableClassName = parser.get("graphTableClassName");
+        log.info("graphTableClassName: {}", graphTableClassName);
+
+        Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
+
+        SparkConf conf = new SparkConf();
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
+                dispatchEntities(spark, inputPath, entityClazz, outputPath);
+            });
+    }
+
+    private static <T extends Oaf> void dispatchEntities(
+        SparkSession spark,
+        String inputPath,
+        Class<T> clazz,
+        String outputPath) {
+
+        spark
+            .read()
+            .textFile(inputPath)
+            .filter((FilterFunction<String>) s -> isEntityType(s, clazz))
+            .map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
+            .map(
+                (MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
+                Encoders.bean(clazz))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath);
+    }
+
+    private static <T extends Oaf> boolean isEntityType(final String s, final Class<T> clazz) {
+        return StringUtils.substringBefore(s, "|").equals(clazz.getName());
+    }
+
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java
new file mode 100644
index 000000000..5835617fb
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java
@@ -0,0 +1,202 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.expressions.Aggregator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import scala.Tuple2;
+
+/**
+ * Groups the graph content by entity identifier to ensure ID uniqueness
+ */
+public class GroupEntitiesSparkJob {
+
+    private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
+
+    private final static String ID_JPATH = "$.id";
+
+    private final static String SOURCE_JPATH = "$.source";
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static void main(String[] args) throws Exception {
+
+        String jsonConfiguration = IOUtils
+            .toString(
+                GroupEntitiesSparkJob.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/dedup/group_graph_entities_parameters.json"));
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        String graphInputPath = parser.get("graphInputPath");
+        log.info("graphInputPath: {}", graphInputPath);
+
+        String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        SparkConf conf = new SparkConf();
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
+                groupEntities(spark, graphInputPath, outputPath);
+            });
+    }
+
+    private static void groupEntities(
+        SparkSession spark,
+        String inputPath,
+        String outputPath) {
+
+        final TypedColumn<OafEntity, OafEntity> aggregator = new GroupingAggregator().toColumn();
+        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+        spark
+            .read()
+            .textFile(toSeq(listPaths(inputPath, sc)))
+            .map((MapFunction<String, OafEntity>) s -> parseOaf(s), Encoders.kryo(OafEntity.class))
+            .filter((FilterFunction<OafEntity>) e -> StringUtils.isNotBlank(ModelSupport.idFn().apply(e)))
+            .groupByKey((MapFunction<OafEntity, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING())
+            .agg(aggregator)
+            .map(
+                (MapFunction<Tuple2<String, OafEntity>, String>) t -> t._2().getClass().getName() +
+                    "|" + OBJECT_MAPPER.writeValueAsString(t._2()),
+                Encoders.STRING())
+            .write()
+            .option("compression", "gzip")
+            .mode(SaveMode.Overwrite)
+            .text(outputPath);
+    }
+
+    public static class GroupingAggregator extends Aggregator<OafEntity, OafEntity, OafEntity> {
+
+        @Override
+        public OafEntity zero() {
+            return null;
+        }
+
+        @Override
+        public OafEntity reduce(OafEntity b, OafEntity a) {
+            return mergeAndGet(b, a);
+        }
+
+        private OafEntity mergeAndGet(OafEntity b, OafEntity a) {
+            if (Objects.nonNull(a) && Objects.nonNull(b)) {
+                return OafMapperUtils.mergeEntities(b, a);
+            }
+            return Objects.isNull(a) ? b : a;
+        }
+
+        @Override
+        public OafEntity merge(OafEntity b, OafEntity a) {
+            return mergeAndGet(b, a);
+        }
+
+        @Override
+        public OafEntity finish(OafEntity j) {
+            return j;
+        }
+
+        @Override
+        public Encoder<OafEntity> bufferEncoder() {
+            return Encoders.kryo(OafEntity.class);
+        }
+
+        @Override
+        public Encoder<OafEntity> outputEncoder() {
+            return Encoders.kryo(OafEntity.class);
+        }
+
+    }
+
+    private static OafEntity parseOaf(String s) {
+
+        DocumentContext dc = JsonPath
+            .parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
+        final String id = dc.read(ID_JPATH);
+        if (StringUtils.isNotBlank(id)) {
+
+            String prefix = StringUtils.substringBefore(id, "|");
+            switch (prefix) {
+                case "10":
+                    return parse(s, Datasource.class);
+                case "20":
+                    return parse(s, Organization.class);
+                case "40":
+                    return parse(s, Project.class);
+                case "50":
+                    String resultType = dc.read("$.resulttype.classid");
+                    switch (resultType) {
+                        case "publication":
+                            return parse(s, Publication.class);
+                        case "dataset":
+                            return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class);
+                        case "software":
+                            return parse(s, Software.class);
+                        case "other":
+                            return parse(s, OtherResearchProduct.class);
+                        default:
+                            throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
+                    }
+                default:
+                    throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
+            }
+        } else {
+            throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s));
+        }
+    }
+
+    private static <T extends OafEntity> OafEntity parse(String s, Class<T> clazz) {
+        try {
+            return OBJECT_MAPPER.readValue(s, clazz);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static List<String> listPaths(String inputPath, JavaSparkContext sc) {
+        return HdfsSupport
+            .listFiles(inputPath, sc.hadoopConfiguration())
+            .stream()
+            .filter(f -> !f.equals("relation"))
+            .collect(Collectors.toList());
+    }
+
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
index 699039c99..1cd1545cd 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
@@ -3,6 +3,8 @@ package eu.dnetlib.dhp.oa.dedup;
 
 import static org.apache.spark.sql.functions.col;
 
+import java.util.Objects;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
@@ -28,7 +30,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
         SOURCE, TARGET
     }
 
-    public SparkPropagateRelation(ArgumentApplicationParser parser, SparkSession spark) throws Exception {
+    public SparkPropagateRelation(ArgumentApplicationParser parser, SparkSession spark) {
         super(parser, spark);
     }
 
@@ -55,13 +57,13 @@ public class SparkPropagateRelation extends AbstractSparkAction {
 
         final String graphBasePath = parser.get("graphBasePath");
         final String workingPath = parser.get("workingPath");
-        final String dedupGraphPath = parser.get("dedupGraphPath");
+        final String graphOutputPath = parser.get("graphOutputPath");
 
         log.info("graphBasePath: '{}'", graphBasePath);
         log.info("workingPath: '{}'", workingPath);
-        log.info("dedupGraphPath: '{}'", dedupGraphPath);
+        log.info("graphOutputPath: '{}'", graphOutputPath);
 
-        final String outputRelationPath = DedupUtility.createEntityPath(dedupGraphPath, "relation");
+        final String outputRelationPath = DedupUtility.createEntityPath(graphOutputPath, "relation");
         removeOutputDir(spark, outputRelationPath);
 
         Dataset<Relation> mergeRels = spark
@@ -101,7 +103,8 @@ public class SparkPropagateRelation extends AbstractSparkAction {
             newRels
                 .union(updated)
                 .union(mergeRels)
-                .map((MapFunction<Relation, Relation>) r -> r, Encoders.kryo(Relation.class))),
+                .map((MapFunction<Relation, Relation>) r -> r, Encoders.kryo(Relation.class)))
+                .filter((FilterFunction<Relation>) r -> !Objects.equals(r.getSource(), r.getTarget())),
             outputRelationPath, SaveMode.Overwrite);
     }
 
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
index 926287032..71690a0ed 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
@@ -2,15 +2,15 @@
     <parameters>
         <property>
            <name>graphBasePath</name>
-           <description>the raw graph base path</description>
+           <description>the input graph base path</description>
        </property>
        <property>
            <name>workingPath</name>
            <description>path of the working directory</description>
        </property>
        <property>
-           <name>dedupGraphPath</name>
-           <description>path of the dedup graph</description>
+           <name>graphOutputPath</name>
+           <description>path of the output graph</description>
        </property>
        <property>
            <name>sparkDriverMemory</name>
@@ -91,116 +91,224 @@
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                --conf spark.sql.shuffle.partitions=7680
            </spark-opts>
-           <arg>--i</arg><arg>${graphBasePath}</arg>
-           <arg>--o</arg><arg>${dedupGraphPath}</arg>
-           <arg>--w</arg><arg>${workingPath}</arg>
+           <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
+           <arg>--o</arg><arg>${graphOutputPath}</arg>
+           <arg>--workingPath</arg><arg>${workingPath}</arg>

+           <master>yarn</master>
+           <mode>cluster</mode>
+           <name>group graph entities</name>
+           <class>eu.dnetlib.dhp.oa.dedup.GroupEntitiesSparkJob</class>
+           <jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
+           <spark-opts>
+               --executor-cores=${sparkExecutorCores}
+               --executor-memory=${sparkExecutorMemory}
+               --driver-memory=${sparkDriverMemory}
+               --conf spark.extraListeners=${spark2ExtraListeners}
+               --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+               --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+               --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+               --conf spark.sql.shuffle.partitions=7680
+           </spark-opts>
+           <arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
+           <arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg>

-           <arg>-pb</arg>
-           <arg>${graphBasePath}/datasource</arg>
-           <arg>${dedupGraphPath}/datasource</arg>

+           <master>yarn</master>
+           <mode>cluster</mode>
+           <name>Dispatch publications</name>
+           <class>eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob</class>
+           <jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
+           <spark-opts>
+               --executor-cores=${sparkExecutorCores}
+               --executor-memory=${sparkExecutorMemory}
+               --driver-memory=${sparkDriverMemory}
+               --conf spark.extraListeners=${spark2ExtraListeners}
+               --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+               --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+               --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+               --conf spark.sql.shuffle.partitions=7680
+           </spark-opts>
+           <arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
+           <arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
+           <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>

-           <arg>-pb</arg>
-           <arg>${graphBasePath}/project</arg>
-           <arg>${dedupGraphPath}/project</arg>

+           <master>yarn</master>
+           <mode>cluster</mode>
+           <name>Dispatch project</name>
+           <class>eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob</class>
+           <jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
+           <spark-opts>
+               --executor-cores=${sparkExecutorCores}
+               --executor-memory=${sparkExecutorMemory}
+               --driver-memory=${sparkDriverMemory}
+               --conf spark.extraListeners=${spark2ExtraListeners}
+               --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+               --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+               --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+               --conf spark.sql.shuffle.partitions=7680
+           </spark-opts>
+           <arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
+           <arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
+           <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>

-           <arg>-pb</arg>
-           <arg>${graphBasePath}/organization</arg>
-           <arg>${dedupGraphPath}/organization</arg>

+           <master>yarn</master>
+           <mode>cluster</mode>
+           <name>Dispatch organization</name>
+           <class>eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob</class>
+           <jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
+           <spark-opts>
+               --executor-cores=${sparkExecutorCores}
+               --executor-memory=${sparkExecutorMemory}
+               --driver-memory=${sparkDriverMemory}
+               --conf spark.extraListeners=${spark2ExtraListeners}
+               --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+               --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+               --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+               --conf spark.sql.shuffle.partitions=7680
+           </spark-opts>
+           <arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
+           <arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
+           <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>

-           <arg>-pb</arg>
-           <arg>${graphBasePath}/publication</arg>
-           <arg>${dedupGraphPath}/publication</arg>

+           <master>yarn</master>
+           <mode>cluster</mode>
+           <name>Dispatch publication</name>
+           <class>eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob</class>
+           <jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
+           <spark-opts>
+               --executor-cores=${sparkExecutorCores}
+               --executor-memory=${sparkExecutorMemory}
+               --driver-memory=${sparkDriverMemory}
+               --conf spark.extraListeners=${spark2ExtraListeners}
+               --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+               --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+               --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+               --conf spark.sql.shuffle.partitions=7680
+           </spark-opts>
+           <arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
+           <arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
+           <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>

-           <arg>-pb</arg>
-           <arg>${graphBasePath}/dataset</arg>
-           <arg>${dedupGraphPath}/dataset</arg>

+           <master>yarn</master>
+           <mode>cluster</mode>
+           <name>Dispatch dataset</name>
+           <class>eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob</class>
+           <jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
+           <spark-opts>
+               --executor-cores=${sparkExecutorCores}
+               --executor-memory=${sparkExecutorMemory}
+               --driver-memory=${sparkDriverMemory}
+               --conf spark.extraListeners=${spark2ExtraListeners}
+               --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+               --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+               --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+               --conf spark.sql.shuffle.partitions=7680
+           </spark-opts>
+           <arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
+           <arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
+           <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>

-           <arg>-pb</arg>
-           <arg>${graphBasePath}/software</arg>
-           <arg>${dedupGraphPath}/software</arg>

+           <master>yarn</master>
+           <mode>cluster</mode>
+           <name>Dispatch software</name>
+           <class>eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob</class>
+           <jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
+           <spark-opts>
+               --executor-cores=${sparkExecutorCores}
+               --executor-memory=${sparkExecutorMemory}
+               --driver-memory=${sparkDriverMemory}
+               --conf spark.extraListeners=${spark2ExtraListeners}
+               --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+               --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+               --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+               --conf spark.sql.shuffle.partitions=7680
+           </spark-opts>
+           <arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
+           <arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
+           <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>

-           <arg>-pb</arg>
-           <arg>${graphBasePath}/otherresearchproduct</arg>
-           <arg>${dedupGraphPath}/otherresearchproduct</arg>

+           <master>yarn</master>
+           <mode>cluster</mode>
+           <name>Dispatch otherresearchproduct</name>
+           <class>eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob</class>
+           <jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
+           <spark-opts>
+               --executor-cores=${sparkExecutorCores}
+               --executor-memory=${sparkExecutorMemory}
+               --driver-memory=${sparkDriverMemory}
+               --conf spark.extraListeners=${spark2ExtraListeners}
+               --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+               --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+               --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+               --conf spark.sql.shuffle.partitions=7680
+           </spark-opts>
+           <arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
+           <arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
+           <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>

-</workflow-app>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/dispatch_entities_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/dispatch_entities_parameters.json
new file mode 100644
index 000000000..aa8d2a7c2
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/dispatch_entities_parameters.json
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName": "issm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "when true will stop SparkSession after job execution",
+    "paramRequired": false
+  },
+  {
+    "paramName": "i",
+    "paramLongName": "inputPath",
+    "paramDescription": "the source path",
+    "paramRequired": true
+  },
+  {
+    "paramName": "o",
+    "paramLongName": "outputPath",
+    "paramDescription": "path of the output graph",
+    "paramRequired": true
+  },
+  {
+    "paramName": "c",
+    "paramLongName": "graphTableClassName",
+    "paramDescription": "the graph entity class name",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/group_graph_entities_parameters.json
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json
rename to dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/group_graph_entities_parameters.json
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json
index 6a2a48746..e2a5281ae 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json
@@ -13,7 +13,7 @@
   },
   {
     "paramName": "o",
-    "paramLongName": "dedupGraphPath",
+    "paramLongName": "graphOutputPath",
     "paramDescription": "the path of the dedup graph",
     "paramRequired": true
   }