From 06def0c0cb7ac5f2bef1a56b9a4fad3bc9ea3415 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Mon, 13 Jul 2020 20:09:06 +0200
Subject: [PATCH] SparkBlockStats allows repartitioning the input RDD via the
 numPartitions workflow parameter

---
 .../dhp/oa/dedup/AbstractSparkAction.java     |  2 +
 .../dnetlib/dhp/oa/dedup/SparkBlockStats.java | 61 +++++++++----------
 .../dhp/oa/dedup/SparkCreateSimRels.java      |  2 -
 .../oa/dedup/createBlockStats_parameters.json |  6 ++
 .../dedup/statistics/oozie_app/workflow.xml   | 13 ++--
 5 files changed, 47 insertions(+), 37 deletions(-)

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
index 2120da0808..74cecb7b6b 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
@@ -28,6 +28,8 @@ import eu.dnetlib.pace.config.DedupConfig;
 
 abstract class AbstractSparkAction implements Serializable {
 
+	protected static final int NUM_PARTITIONS = 1000;
+
 	protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
 		.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java
index 49f8123e87..d5de309670 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java
@@ -2,6 +2,7 @@
 package eu.dnetlib.dhp.oa.dedup;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
@@ -23,49 +24,41 @@
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.config.DedupConfig;
-import eu.dnetlib.pace.model.FieldListImpl;
-import eu.dnetlib.pace.model.FieldValueImpl;
 import eu.dnetlib.pace.model.MapDocument;
 import eu.dnetlib.pace.util.MapDocumentUtil;
 
 import scala.Tuple2;
 
 public class SparkBlockStats extends AbstractSparkAction {
 
-	private static final Logger log = LoggerFactory.getLogger(SparkBlockStats.class);
+	private static final Logger log = LoggerFactory.getLogger(SparkBlockStats.class);
 
-	public SparkBlockStats(ArgumentApplicationParser parser, SparkSession spark) {
-		super(parser, spark);
-	}
+	public SparkBlockStats(ArgumentApplicationParser parser, SparkSession spark) {
+		super(parser, spark);
+	}
 
-	public static void main(String[] args) throws Exception {
-		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkBlockStats.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json")));
-		parser.parseArgument(args);
+	public static void main(String[] args) throws Exception {
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					SparkBlockStats.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json")));
+		parser.parseArgument(args);
 
-		SparkConf conf = new SparkConf();
-		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-		conf
-			.registerKryoClasses(
-				new Class[] {
-					MapDocument.class, FieldListImpl.class, FieldValueImpl.class, Block.class
-				});
+		SparkConf conf = new SparkConf();
 
-		new SparkBlockStats(parser, getSparkSession(conf))
-			.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
-	}
+		new SparkBlockStats(parser, getSparkSession(conf))
+			.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+	}
 
-	public Long computeComparisons(Long blockSize, Long slidingWindowSize){
+	public Long computeComparisons(Long blockSize, Long slidingWindowSize) {
 
-		if (slidingWindowSize >= blockSize)
-			return (slidingWindowSize * (slidingWindowSize - 1)) / 2;
-		else {
-			return (blockSize - slidingWindowSize + 1) * (slidingWindowSize * (slidingWindowSize - 1)) / 2;
-		}
-	}
+		if (slidingWindowSize >= blockSize)
+			return (slidingWindowSize * (slidingWindowSize - 1)) / 2;
+		else {
+			return (blockSize - slidingWindowSize + 1) * (slidingWindowSize * (slidingWindowSize - 1)) / 2;
+		}
+	}
 
 	@Override
 	public void run(ISLookUpService isLookUpService)
@@ -76,6 +69,10 @@ public class SparkBlockStats extends AbstractSparkAction {
 		final String isLookUpUrl = parser.get("isLookUpUrl");
 		final String actionSetId = parser.get("actionSetId");
 		final String workingPath = parser.get("workingPath");
+		final int numPartitions = Optional
+			.ofNullable(parser.get("numPartitions"))
+			.map(Integer::valueOf)
+			.orElse(NUM_PARTITIONS);
 
 		log.info("graphBasePath: '{}'", graphBasePath);
 		log.info("isLookUpUrl: '{}'", isLookUpUrl);
@@ -95,6 +92,7 @@ public class SparkBlockStats extends AbstractSparkAction {
 
 		JavaPairRDD<String, MapDocument> mapDocuments = sc
 			.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+			.repartition(numPartitions)
 			.mapToPair(
 				(PairFunction<String, String, MapDocument>) s -> {
 					MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
@@ -105,6 +103,7 @@ public class SparkBlockStats extends AbstractSparkAction {
 		JavaPairRDD<String, Block> blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);
 
 		JavaRDD<BlockStats> blockStats = blocks
+			.repartition(numPartitions)
 			.map(
 				b -> new BlockStats(
 					b._1(),
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index 3beb90e0b3..b3ee47bfc2 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -35,8 +35,6 @@ public class SparkCreateSimRels extends AbstractSparkAction {
 
 	private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class);
 
-	public static final int NUM_PARTITIONS = 1000;
-
 	public SparkCreateSimRels(ArgumentApplicationParser parser, SparkSession spark) {
 		super(parser, spark);
 	}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json
index ce38dc6f00..09f4365d34 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json
@@ -22,5 +22,11 @@
     "paramLongName": "workingPath",
     "paramDescription": "path of the working directory",
     "paramRequired": true
+  },
+  {
+    "paramName": "np",
+    "paramLongName": "numPartitions",
+    "paramDescription": "number of partitions for the similarity relations intermediate phases",
+    "paramRequired": false
   }
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/workflow.xml
index 8b75d16b3d..c0080b028f 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/workflow.xml
@@ -12,6 +12,10 @@
             <name>actionSetId</name>
             <description>id of the actionSet</description>
         </property>
+        <property>
+            <name>numPartitions</name>
+            <description>number of partitions for the similarity relations intermediate phases</description>
+        </property>
         <property>
             <name>sparkDriverMemory</name>
             <description>memory for driver process</description>
@@ -90,10 +94,11 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=3840
             </spark-opts>
-            <arg>--i</arg><arg>${graphBasePath}</arg>
-            <arg>--la</arg><arg>${isLookUpUrl}</arg>
-            <arg>--asi</arg><arg>${actionSetId}</arg>
-            <arg>--w</arg><arg>${workingDir}</arg>
+            <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
+            <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
+            <arg>--actionSetId</arg><arg>${actionSetId}</arg>
+            <arg>--workingPath</arg><arg>${workingDir}</arg>
+            <arg>--numPartitions</arg><arg>${numPartitions}</arg>
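
For reference, the comparison-count estimate introduced by this patch can be checked in isolation. The sketch below mirrors computeComparisons as it appears in SparkBlockStats above; the wrapper class and main method are illustrative only, not part of the codebase. A window that is at least as large as the block yields slidingWindowSize * (slidingWindowSize - 1) / 2 comparisons, while a window sliding over a larger block contributes that amount once for each of the (blockSize - slidingWindowSize + 1) window positions.

public class ComputeComparisonsCheck {

	// Mirrors SparkBlockStats.computeComparisons from the patch above.
	static long computeComparisons(long blockSize, long slidingWindowSize) {
		if (slidingWindowSize >= blockSize)
			return (slidingWindowSize * (slidingWindowSize - 1)) / 2;
		return (blockSize - slidingWindowSize + 1) * (slidingWindowSize * (slidingWindowSize - 1)) / 2;
	}

	public static void main(String[] args) {
		// Window not smaller than the block: 50 * 49 / 2 = 1225 comparisons.
		System.out.println(computeComparisons(40, 50));
		// Window sliding over a larger block: (200 - 100 + 1) * (100 * 99 / 2) = 499950.
		System.out.println(computeComparisons(200, 100));
	}
}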
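The numPartitions parameter is plumbed end to end: the optional workflow.xml property is forwarded by the Oozie action as --numPartitions, declared non-required in createBlockStats_parameters.json, and resolved in SparkBlockStats.run against the NUM_PARTITIONS constant (1000) hoisted into AbstractSparkAction. A minimal sketch of that resolution step, assuming only that an omitted argument surfaces as null from the parser (class and method names here are hypothetical):

import java.util.Optional;

public class NumPartitionsDefault {

	// Mirrors AbstractSparkAction.NUM_PARTITIONS introduced by the patch.
	static final int NUM_PARTITIONS = 1000;

	// Same Optional-based resolution as in SparkBlockStats.run: a missing
	// --numPartitions value falls back to the shared default instead of failing.
	static int resolveNumPartitions(String rawValue) {
		return Optional
			.ofNullable(rawValue)
			.map(Integer::valueOf)
			.orElse(NUM_PARTITIONS);
	}

	public static void main(String[] args) {
		System.out.println(resolveNumPartitions("3840")); // explicit workflow value: 3840
		System.out.println(resolveNumPartitions(null)); // parameter omitted: 1000
	}
}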