diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java index 2e96b35632..a66ab431c7 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java @@ -2,6 +2,7 @@ package eu.dnetlib.dhp.oa.dedup; import java.io.IOException; +import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -34,7 +35,7 @@ public class SparkCreateSimRels extends AbstractSparkAction { private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class); - public static final int NUM_PARTITIONS = 10000; + public static final int NUM_PARTITIONS = 1000; public SparkCreateSimRels(ArgumentApplicationParser parser, SparkSession spark) { super(parser, spark); @@ -63,7 +64,12 @@ public class SparkCreateSimRels extends AbstractSparkAction { final String isLookUpUrl = parser.get("isLookUpUrl"); final String actionSetId = parser.get("actionSetId"); final String workingPath = parser.get("workingPath"); + final int numPartitions = Optional + .ofNullable(parser.get("numPartitions")) + .map(Integer::valueOf) + .orElse(NUM_PARTITIONS); + log.info("numPartitions: '{}'", numPartitions); log.info("graphBasePath: '{}'", graphBasePath); log.info("isLookUpUrl: '{}'", isLookUpUrl); log.info("actionSetId: '{}'", actionSetId); @@ -83,7 +89,7 @@ public class SparkCreateSimRels extends AbstractSparkAction { JavaPairRDD mapDocuments = sc .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) - .repartition(NUM_PARTITIONS) + .repartition(numPartitions) .mapToPair( (PairFunction) s -> { MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); @@ -93,13 +99,13 @@ public class SparkCreateSimRels extends AbstractSparkAction { // create blocks for deduplication JavaPairRDD blocks = Deduper .createSortedBlocks(mapDocuments, dedupConf) - .repartition(NUM_PARTITIONS); + .repartition(numPartitions); // create relations by comparing only elements in the same group Deduper .computeRelations(sc, blocks, dedupConf) .map(t -> createSimRel(t._1(), t._2(), entity)) - .repartition(NUM_PARTITIONS) + .repartition(numPartitions) .map(r -> OBJECT_MAPPER.writeValueAsString(r)) .saveAsTextFile(outputPath); } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json index ce38dc6f00..09f4365d34 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json @@ -22,5 +22,11 @@ "paramLongName": "workingPath", "paramDescription": "path of the working directory", "paramRequired": true + }, + { + "paramName": "np", + "paramLongName": "numPartitions", + "paramDescription": "number of partitions for the similarity relations intermediate phases", + "paramRequired": false } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml index 298a248e3b..c42ce12639 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml @@ -20,6 +20,10 @@ dedupGraphPath path for the output graph + + cutConnectedComponent + max number of elements in a connected component + sparkDriverMemory memory for driver process @@ -106,10 +110,11 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --i${graphBasePath} - --la${isLookUpUrl} - --asi${actionSetId} - --w${workingPath} + --graphBasePath${graphBasePath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetId} + --workingPath${workingPath} + --numPartitions8000 @@ -132,10 +137,11 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --i${graphBasePath} - --w${workingPath} - --la${isLookUpUrl} - --asi${actionSetId} + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetId} + --cutConnectedComponent${cutConnectedComponent} @@ -158,10 +164,10 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --i${graphBasePath} - --w${workingPath} - --la${isLookUpUrl} - --asi${actionSetId} + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetId} @@ -184,9 +190,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --i${graphBasePath} - --w${workingPath} - --o${dedupGraphPath} + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --dedupGraphPath${dedupGraphPath} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json index c91f3c04bc..6a2a48746b 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json @@ -1,17 +1,17 @@ [ -{ - "paramName": "i", - "paramLongName": "graphBasePath", - "paramDescription": "the base path of raw graph", - "paramRequired": true -}, -{ - "paramName": "w", - "paramLongName": "workingPath", - "paramDescription": "the working directory path", - "paramRequired": true -}, -{ + { + "paramName": "i", + "paramLongName": "graphBasePath", + "paramDescription": "the base path of raw graph", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workingPath", + "paramDescription": "the working directory path", + "paramRequired": true + }, + { "paramName": "o", "paramLongName": "dedupGraphPath", "paramDescription": "the path of the dedup graph",