From 0bd306b19615f6f5eb3509cc0bdd0867fb48116e Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Mon, 22 May 2023 10:36:24 +0200
Subject: [PATCH] WIP investigating failures in the duplicateScan phase

---
 .../dhp/oa/dedup/SparkCreateSimRels.java     | 24 ++++++++++++-------
 .../dhp/oa/dedup/scan/oozie_app/workflow.xml | 20 ++++++++--------
 2 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index 3aa8f241d..2d32fae41 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -7,6 +7,7 @@ import java.util.Optional;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Dataset;
@@ -73,6 +74,8 @@ public class SparkCreateSimRels extends AbstractSparkAction {
 		log.info("actionSetId: '{}'", actionSetId);
 		log.info("workingPath: '{}'", workingPath);
 
+		spark.sparkContext().setCheckpointDir(workingPath + "/checkpoint");
+
 		// for each dedup configuration
 		for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
 
@@ -85,8 +88,10 @@ public class SparkCreateSimRels extends AbstractSparkAction {
 
 			JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-			JavaPairRDD<String, MapDocument> mapDocuments = sc
-				.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+			final JavaRDD<String> rdd = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity));
+			rdd.checkpoint();
+
+			JavaPairRDD<String, MapDocument> mapDocuments = rdd
 				.repartition(numPartitions)
 				.mapToPair(
 					(PairFunction<String, String, MapDocument>) s -> {
@@ -95,15 +100,18 @@ public class SparkCreateSimRels extends AbstractSparkAction {
 					});
 
 			// create blocks for deduplication
-			JavaPairRDD<String, Block> blocks = Deduper
-				.createSortedBlocks(mapDocuments, dedupConf)
-				.repartition(numPartitions);
+			final JavaPairRDD<String, Block> sortedBlocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);
+			sortedBlocks.checkpoint();
+			JavaPairRDD<String, Block> blocks = sortedBlocks.repartition(numPartitions);
+
+			final JavaRDD<Relation> simRelsRdd = Deduper
+				.computeRelations(sc, blocks, dedupConf)
+				.map(t -> DedupUtility.createSimRel(t._1(), t._2(), entity));
+			simRelsRdd.checkpoint();
 
 			Dataset<Relation> simRels = spark
 				.createDataset(
-					Deduper
-						.computeRelations(sc, blocks, dedupConf)
-						.map(t -> DedupUtility.createSimRel(t._1(), t._2(), entity))
+					simRelsRdd
 						.repartition(numPartitions)
 						.rdd(),
 					Encoders.bean(Relation.class));
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
index ba2270c8a..02fdd8431 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
@@ -126,13 +126,13 @@
                     --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                     --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                     --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                    --conf spark.sql.shuffle.partitions=15000
+                    --conf spark.sql.shuffle.partitions=3840
                 </spark-opts>
                 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
                 <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
                 <arg>--actionSetId</arg><arg>${actionSetId}</arg>
                 <arg>--workingPath</arg><arg>${workingPath}</arg>
-                <arg>--numPartitions</arg><arg>15000</arg>
+                <arg>--numPartitions</arg><arg>8000</arg>
@@ -153,14 +153,14 @@
                     --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                     --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                     --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                    --conf spark.sql.shuffle.partitions=15000
+                    --conf spark.sql.shuffle.partitions=3840
                 </spark-opts>
                 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
                 <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
                 <arg>--actionSetId</arg><arg>${actionSetId}</arg>
                 <arg>--workingPath</arg><arg>${workingPath}</arg>
                 <arg>--whiteListPath</arg><arg>${whiteListPath}</arg>
-                <arg>--numPartitions</arg><arg>15000</arg>
+                <arg>--numPartitions</arg><arg>8000</arg>
@@ -181,7 +181,7 @@
                     --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                     --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                     --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                    --conf spark.sql.shuffle.partitions=15000
+                    --conf spark.sql.shuffle.partitions=3840
                 </spark-opts>
                 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
                 <arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -208,7 +208,7 @@
                     --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                     --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                     --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                    --conf spark.sql.shuffle.partitions=15000
+                    --conf spark.sql.shuffle.partitions=3840
                 </spark-opts>
                 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
                 <arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -235,13 +235,13 @@
                     --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                     --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                     --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                    --conf spark.sql.shuffle.partitions=15000
+                    --conf spark.sql.shuffle.partitions=3840
                 </spark-opts>
                 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
                 <arg>--workingPath</arg><arg>${workingPath}</arg>
                 <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
                 <arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
-                <arg>--numPartitions</arg><arg>15000</arg>
+                <arg>--numPartitions</arg><arg>8000</arg>
@@ -288,7 +288,7 @@
                     --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                     --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                     --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                    --conf spark.sql.shuffle.partitions=10000
+                    --conf spark.sql.shuffle.partitions=3840
                 </spark-opts>
                 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
                 <arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -314,7 +314,7 @@
                     --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                     --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                     --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                    --conf spark.sql.shuffle.partitions=10000
+                    --conf spark.sql.shuffle.partitions=3840
                 </spark-opts>
                 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
                 <arg>--workingPath</arg><arg>${workingPath}</arg>
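
Note on the change above: the patch leans on Spark's RDD checkpointing (setCheckpointDir on the context plus checkpoint() on the intermediate RDDs) to cut the lineage of the simrel computation while the duplicateScan failures are investigated. The sketch below is not part of the patch; it only shows the same pattern in isolation as a minimal standalone job, and the class name, paths and partition count are placeholders.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class CheckpointSketch {

	public static void main(String[] args) {
		SparkSession spark = SparkSession
			.builder()
			.appName("checkpoint-sketch")
			.getOrCreate();

		// Reliable checkpoints need a directory (typically on HDFS) configured up front;
		// the patch uses workingPath + "/checkpoint", this path is only an example.
		spark.sparkContext().setCheckpointDir("/tmp/checkpoint-sketch");

		JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

		// Placeholder input path, standing in for DedupUtility.createEntityPath(...).
		JavaRDD<String> entities = sc.textFile("/tmp/input/entities");

		// Mark the RDD for checkpointing before any action runs on it: the first job that
		// touches it writes its partitions to the checkpoint dir and truncates the lineage,
		// so later stage retries re-read the checkpointed data instead of recomputing from source.
		entities.checkpoint();

		long count = entities
			.repartition(100) // placeholder partition count, not the workflow's value
			.count(); // the first action materializes both the RDD and its checkpoint

		System.out.println("entities: " + count);

		spark.stop();
	}
}

A caveat worth keeping in mind while testing: checkpoint() is lazy and, unless the RDD is also persisted, a checkpointed RDD is computed twice (once for the running job and once when it is written out), so the change trades extra I/O and recomputation for shorter lineages.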