diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java
index b723de9554..5e37938c7e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java
@@ -1,11 +1,15 @@
package eu.dnetlib.dhp.oa.graph.merge;
+import static eu.dnetlib.dhp.common.GraphSupport.deleteGraphTable;
+import static eu.dnetlib.dhp.common.GraphSupport.saveGraphTable;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Objects;
import java.util.Optional;
+import eu.dnetlib.dhp.common.GraphFormat;
+import eu.dnetlib.dhp.common.GraphSupport;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@@ -20,7 +24,6 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
@@ -35,8 +38,6 @@ public class MergeGraphSparkJob {
private static final Logger log = LoggerFactory.getLogger(MergeGraphSparkJob.class);
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
private static final String PRIORITY_DEFAULT = "BETA"; // BETA | PROD
public static void main(String[] args) throws Exception {
@@ -60,19 +61,31 @@ public class MergeGraphSparkJob {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
- String betaInputPath = parser.get("betaInputPath");
- log.info("betaInputPath: {}", betaInputPath);
+ String betaInputGraph = parser.get("betaInputGraph");
+ log.info("betaInputGraph: {}", betaInputGraph);
- String prodInputPath = parser.get("prodInputPath");
- log.info("prodInputPath: {}", prodInputPath);
+ String prodInputGraph = parser.get("prodInputGraph");
+ log.info("prodInputGraph: {}", prodInputGraph);
- String outputPath = parser.get("outputPath");
- log.info("outputPath: {}", outputPath);
+ String outputGraph = parser.get("outputGraph");
+ log.info("outputGraph: {}", outputGraph);
+
+ GraphFormat inputGraphFormat = Optional
+ .ofNullable(parser.get("inputGraphFormat"))
+ .map(GraphFormat::valueOf)
+ .orElse(GraphFormat.DEFAULT);
+ log.info("inputGraphFormat: {}", inputGraphFormat);
+
+ GraphFormat outputGraphFormat = Optional
+ .ofNullable(parser.get("outputGraphFormat"))
+ .map(GraphFormat::valueOf)
+ .orElse(GraphFormat.DEFAULT);
+ log.info("outputGraphFormat: {}", outputGraphFormat);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
- Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
+ Class<? extends Oaf> clazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
@@ -82,41 +95,39 @@ public class MergeGraphSparkJob {
conf,
isSparkSessionManaged,
spark -> {
- removeOutputDir(spark, outputPath);
- mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath);
+ deleteGraphTable(spark, clazz, outputGraph, outputGraphFormat);
+ mergeGraphTable(spark, priority, betaInputGraph, prodInputGraph, clazz, clazz, outputGraph, inputGraphFormat, outputGraphFormat);
});
}
private static <P extends Oaf, B extends Oaf> void mergeGraphTable(
- SparkSession spark,
- String priority,
- String betaInputPath,
- String prodInputPath,
- Class<P> p_clazz,
- Class<B> b_clazz,
- String outputPath) {
+ SparkSession spark,
+ String priority,
+ String betaInputGraph,
+ String prodInputGraph,
+ Class<P> p_clazz,
+ Class<B> b_clazz,
+ String outputGraph, GraphFormat inputGraphFormat, GraphFormat outputGraphFormat) {
- Dataset<Tuple2<String, B>> beta = readTableFromPath(spark, betaInputPath, b_clazz);
- Dataset<Tuple2<String, P>> prod = readTableFromPath(spark, prodInputPath, p_clazz);
+ Dataset<Tuple2<String, B>> beta = readGraph(spark, betaInputGraph, b_clazz, inputGraphFormat);
+ Dataset<Tuple2<String, P>> prod = readGraph(spark, prodInputGraph, p_clazz, inputGraphFormat);
- prod
- .joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
- .map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
- Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
- Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2);
- switch (priority) {
- default:
- case "BETA":
- return mergeWithPriorityToBETA(p, b);
- case "PROD":
- return mergeWithPriorityToPROD(p, b);
- }
- }, Encoders.bean(p_clazz))
- .filter((FilterFunction<P>) Objects::nonNull)
- .write()
- .mode(SaveMode.Overwrite)
- .option("compression", "gzip")
- .json(outputPath);
+ Dataset<P> merged = prod
+ .joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
+ .map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
+ Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
+ Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2);
+ switch (priority) {
+ default:
+ case "BETA":
+ return mergeWithPriorityToBETA(p, b);
+ case "PROD":
+ return mergeWithPriorityToPROD(p, b);
+ }
+ }, Encoders.bean(p_clazz))
+ .filter((FilterFunction<P>) Objects::nonNull);
+
+ saveGraphTable(merged, p_clazz, outputGraph, outputGraphFormat);
}
private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> p, Optional<B> b) {
@@ -139,24 +150,15 @@ public class MergeGraphSparkJob {
return null;
}
- private static <T extends Oaf> Dataset<Tuple2<String, T>> readTableFromPath(
- SparkSession spark, String inputEntityPath, Class<T> clazz) {
+ private static <T extends Oaf> Dataset<Tuple2<String, T>> readGraph(
+ SparkSession spark, String inputGraph, Class<T> clazz, GraphFormat inputGraphFormat) {
- log.info("Reading Graph table from: {}", inputEntityPath);
- return spark
- .read()
- .textFile(inputEntityPath)
- .map(
- (MapFunction<String, Tuple2<String, T>>) value -> {
- final T t = OBJECT_MAPPER.readValue(value, clazz);
+ log.info("Reading Graph table from: {}", inputGraph);
+ return GraphSupport.readGraph(spark, inputGraph, clazz, inputGraphFormat)
+ .map((MapFunction<T, Tuple2<String, T>>) t -> {
final String id = ModelSupport.idFn().apply(t);
return new Tuple2<>(id, t);
- },
- Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
- }
-
- private static void removeOutputDir(SparkSession spark, String path) {
- HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }, Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
}
}
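
Note: GraphFormat and the GraphSupport helpers (readGraph, saveGraphTable, deleteGraphTable) are imported from dhp-common and are not part of this diff. A minimal sketch of the API the job appears to rely on, inferred from the call sites alone; the naming scheme and the HIVE/JSON branches are assumptions, not the actual dhp-common implementation:

    // GraphFormat.java -- assumed shape
    package eu.dnetlib.dhp.common;

    public enum GraphFormat {
        JSON, HIVE;

        public static final GraphFormat DEFAULT = JSON; // assumption: JSON as default
    }

    // GraphSupport.java -- assumed shape
    package eu.dnetlib.dhp.common;

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    import com.fasterxml.jackson.databind.ObjectMapper;

    import eu.dnetlib.dhp.schema.oaf.Oaf;

    public class GraphSupport {

        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        // Hypothetical naming scheme: HIVE graphs resolve to <dbName>.<entity>,
        // JSON graphs to <rootPath>/<entity>.
        private static <T extends Oaf> String location(String graph, Class<T> clazz, String sep) {
            return graph + sep + clazz.getSimpleName().toLowerCase();
        }

        public static <T extends Oaf> Dataset<T> readGraph(
            SparkSession spark, String graph, Class<T> clazz, GraphFormat format) {
            switch (format) {
                case JSON: // graph is an HDFS root path holding gzipped JSON text files
                    return spark
                        .read()
                        .textFile(location(graph, clazz, "/"))
                        .map((MapFunction<String, T>) s -> OBJECT_MAPPER.readValue(s, clazz), Encoders.bean(clazz));
                case HIVE: // graph is a Hive database name
                default:
                    return spark.table(location(graph, clazz, ".")).as(Encoders.bean(clazz));
            }
        }

        public static <T extends Oaf> void saveGraphTable(
            Dataset<T> dataset, Class<T> clazz, String graph, GraphFormat format) {
            switch (format) {
                case JSON:
                    dataset.write().mode(SaveMode.Overwrite).option("compression", "gzip").json(location(graph, clazz, "/"));
                    break;
                case HIVE:
                default:
                    dataset.write().mode(SaveMode.Overwrite).saveAsTable(location(graph, clazz, "."));
                    break;
            }
        }

        public static <T extends Oaf> void deleteGraphTable(
            SparkSession spark, Class<T> clazz, String graph, GraphFormat format) {
            switch (format) {
                case JSON: // JSON graphs live on HDFS, so remove the directory
                    HdfsSupport.remove(location(graph, clazz, "/"), spark.sparkContext().hadoopConfiguration());
                    break;
                case HIVE:
                default:
                    spark.sql("DROP TABLE IF EXISTS " + location(graph, clazz, "."));
                    break;
            }
        }
    }

Whatever the real implementation looks like, the behavioural point of the change is visible above: deleteGraphTable replaces the old removeOutputDir, and saveGraphTable replaces the hard-coded gzipped-JSON write, so the job can address either an HDFS path or a Hive database through the same parameters.
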
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml
index 07a125fb64..dabafb7f88 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml
@@ -2,21 +2,31 @@
<parameters>
<property>
- <name>betaInputGgraphPath</name>
- <description>the beta graph root path</description>
+ <name>betaInputGraph</name>
+ <description>the beta graph name (or path)</description>
</property>
<property>
- <name>prodInputGgraphPath</name>
- <description>the production graph root path</description>
+ <name>prodInputGraph</name>
+ <description>the production graph name (or path)</description>
</property>
<property>
- <name>graphOutputPath</name>
- <description>the output merged graph root path</description>
+ <name>outputGraph</name>
+ <description>the merged graph name (or path)</description>
</property>
<property>
<name>priority</name>
<description>decides from which infrastructure the content must win in case of ID clash</description>
</property>
+ <property>
+ <name>inputGraphFormat</name>
+ <value>HIVE</value>
+ <description>the input graph data format</description>
+ </property>
+ <property>
+ <name>outputGraphFormat</name>
+ <value>HIVE</value>
+ <description>the output graph data format</description>
+ </property>
<property>
<name>sparkDriverMemory</name>
@@ -88,9 +98,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
- <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/publication</arg>
- <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/publication</arg>
- <arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
+ <arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
+ <arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
+ <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+ <arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+ <arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--priority</arg><arg>${priority}</arg>
@@ -115,9 +127,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
- <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/dataset</arg>
- <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/dataset</arg>
- <arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
+ <arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
+ <arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
+ <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+ <arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+ <arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--priority</arg><arg>${priority}</arg>
@@ -142,9 +156,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
- <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/otherresearchproduct</arg>
- <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/otherresearchproduct</arg>
- <arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
+ <arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
+ <arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
+ <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+ <arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+ <arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--priority</arg><arg>${priority}</arg>
@@ -169,9 +185,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
- <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/software</arg>
- <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/software</arg>
- <arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
+ <arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
+ <arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
+ <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+ <arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+ <arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--priority</arg><arg>${priority}</arg>
@@ -196,9 +214,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
- <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/datasource</arg>
- <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/datasource</arg>
- <arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
+ <arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
+ <arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
+ <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+ <arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+ <arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--priority</arg><arg>${priority}</arg>
@@ -223,9 +243,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
- <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/organization</arg>
- <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/organization</arg>
- <arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
+ <arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
+ <arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
+ <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+ <arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+ <arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--priority</arg><arg>${priority}</arg>
@@ -250,9 +272,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
- <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/project</arg>
- <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/project</arg>
- <arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
+ <arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
+ <arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
+ <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+ <arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+ <arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--priority</arg><arg>${priority}</arg>
@@ -277,9 +301,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
- <arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/relation</arg>
- <arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/relation</arg>
- <arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
+ <arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
+ <arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
+ <arg>--outputGraph</arg><arg>${outputGraph}</arg>
+ <arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+ <arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--priority</arg><arg>${priority}</arg>
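
With the per-entity paths gone, the eight spark actions now pass identical graph-level arguments and differ only in --graphTableClassName. A submission configuration can therefore be reduced to a handful of values; a hypothetical job.properties sketch (database/graph names are placeholders):

    betaInputGraph=openaire_beta
    prodInputGraph=openaire_prod
    outputGraph=openaire_merged
    inputGraphFormat=HIVE
    outputGraphFormat=HIVE
    priority=BETA

Since inputGraphFormat and outputGraphFormat default to HIVE in the parameters section, the two format lines are only needed to override that default.
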
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json
index 1a612807b7..67fc7d42c6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json
@@ -7,20 +7,32 @@
},
{
"paramName": "bin",
- "paramLongName": "betaInputPath",
- "paramDescription": "the beta graph root path",
+ "paramLongName": "betaInputGraph",
+ "paramDescription": "the beta graph name (or path)",
"paramRequired": true
},
{
"paramName": "pin",
- "paramLongName": "prodInputPath",
- "paramDescription": "the production graph root path",
+ "paramLongName": "prodInputGraph",
+ "paramDescription": "the production graph name (or path)",
"paramRequired": true
},
{
"paramName": "out",
- "paramLongName": "outputPath",
- "paramDescription": "the output merged graph root path",
+ "paramLongName": "outputGraph",
+ "paramDescription": "the output merged graph (or path)",
+ "paramRequired": true
+ },
+ {
+ "paramName": "igf",
+ "paramLongName": "inputGraphFormat",
+ "paramDescription": "the format of the input graphs",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ogf",
+ "paramLongName": "outputGraphFormat",
+ "paramDescription": "the format of the output merged graph",
"paramRequired": true
},
{
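
Both new parameters are declared required, which matches the workflow since every action now passes them explicitly. For reference, a driver invocation equivalent to the workflow's publication action might look like the following sketch (graph names are placeholders; --isSparkSessionManaged is assumed to be the file's untouched leading parameter):

    // Hypothetical invocation mirroring the publication action's arguments.
    // GraphFormat.valueOf is case-sensitive, so the format values must match
    // the enum constants exactly (e.g. HIVE).
    MergeGraphSparkJob.main(new String[] {
        "--isSparkSessionManaged", "false",
        "--betaInputGraph", "openaire_beta",
        "--prodInputGraph", "openaire_prod",
        "--outputGraph", "openaire_merged",
        "--inputGraphFormat", "HIVE",
        "--outputGraphFormat", "HIVE",
        "--graphTableClassName", "eu.dnetlib.dhp.schema.oaf.Publication",
        "--priority", "BETA"
    });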