diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateSimRels.java index 8c0efdcad7..0fc72db1e1 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateSimRels.java @@ -43,22 +43,17 @@ public class SparkCreateSimRels implements Serializable { //read oozie parameters final String graphBasePath = parser.get("graphBasePath"); final String isLookUpUrl = parser.get("isLookUpUrl"); - final String rawSet = parser.get("rawSet"); final String actionSetId = parser.get("actionSetId"); final String workingPath = parser.get("workingPath"); System.out.println(String.format("graphBasePath: '%s'", graphBasePath)); System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl)); - System.out.println(String.format("rawSet: '%s'", rawSet)); System.out.println(String.format("actionSetId: '%s'", actionSetId)); System.out.println(String.format("workingPath: '%s'", workingPath)); try (SparkSession spark = getSparkSession(parser)) { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - //create empty sequenceFile for the accumulation - JavaRDD> simRel = sc.emptyRDD(); - //for each dedup configuration for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) { final String entity = dedupConf.getWf().getEntityType(); @@ -83,23 +78,16 @@ public class SparkCreateSimRels implements Serializable { .write() .mode("overwrite") .save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)); - - if (rawSet != null) { - //create atomic actions - JavaRDD> newSimRels = relationsRDD - .map(this::createSequenceFileRow); - - simRel = simRel.union(newSimRels); - } } - - if (rawSet != null) - simRel.mapToPair(r -> r) - .saveAsHadoopFile(rawSet, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); } - } + /** + * Utility method used to create an atomic action from a Relation object + * @param relation input relation + * @return A tuple2 with [id, json serialization of the atomic action] + * @throws JsonProcessingException + */ public Tuple2 createSequenceFileRow(Relation relation) throws JsonProcessingException { ObjectMapper mapper = new ObjectMapper(); diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/consistency/oozie_app/workflow.xml index d3121ea77a..d481a6cfb8 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/consistency/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/consistency/oozie_app/workflow.xml @@ -65,7 +65,7 @@ --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mtyarn-cluster + -mtyarn --i${graphBasePath} --w${workingPath} --o${dedupGraphPath} @@ -92,7 +92,7 @@ --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mtyarn-cluster + -mtyarn --i${graphBasePath} --o${dedupGraphPath} --w${workingPath} diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createSimRels_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createSimRels_parameters.json index 9eb08a29b3..8cffa86dc8 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createSimRels_parameters.json +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createSimRels_parameters.json @@ -23,12 +23,6 @@ "paramDescription": "the base path of the raw graph", "paramRequired": true }, - { - "paramName": "o", - "paramLongName": "rawSet", - "paramDescription": "the raw set to be saved (full path)", - "paramRequired": false - }, { "paramName": "w", "paramLongName": "workingPath", diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/scan/oozie_app/workflow.xml index abd1528573..e2c7f425b5 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/scan/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/scan/oozie_app/workflow.xml @@ -4,10 +4,6 @@ graphBasePath the raw graph base path - - rawSet - the output directory in the targetPath - isLookUpUrl the address of the lookUp service @@ -58,7 +54,6 @@ - yarn @@ -101,7 +96,7 @@ --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mtyarn-cluster + -mtyarn --i${graphBasePath} --w${workingPath} --la${isLookUpUrl} @@ -129,7 +124,7 @@ --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mtyarn-cluster + -mtyarn --i${graphBasePath} --w${workingPath} --la${isLookUpUrl}