WIP investigating failures in the duplicateScan phase

Claudio Atzori 2023-05-22 10:36:24 +02:00
parent a235d2a24a
commit 0bd306b196
2 changed files with 26 additions and 18 deletions


@@ -7,6 +7,7 @@ import java.util.Optional;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Dataset;
@@ -73,6 +74,8 @@ public class SparkCreateSimRels extends AbstractSparkAction {
 log.info("actionSetId: '{}'", actionSetId);
 log.info("workingPath: '{}'", workingPath);
+spark.sparkContext().setCheckpointDir(workingPath + "/checkpoint");
 // for each dedup configuration
 for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
@@ -85,8 +88,10 @@ public class SparkCreateSimRels extends AbstractSparkAction {
 JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-JavaPairRDD<String, MapDocument> mapDocuments = sc
-.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+final JavaRDD<String> rdd = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity));
+rdd.checkpoint();
+JavaPairRDD<String, MapDocument> mapDocuments = rdd
 .repartition(numPartitions)
 .mapToPair(
 (PairFunction<String, String, MapDocument>) s -> {
@@ -95,15 +100,18 @@ public class SparkCreateSimRels extends AbstractSparkAction {
 });
 // create blocks for deduplication
-JavaPairRDD<String, Block> blocks = Deduper
-.createSortedBlocks(mapDocuments, dedupConf)
-.repartition(numPartitions);
+final JavaPairRDD<String, Block> sortedBlocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);
+sortedBlocks.checkpoint();
+JavaPairRDD<String, Block> blocks = sortedBlocks.repartition(numPartitions);
+final JavaRDD<Relation> simRelsRdd = Deduper
+.computeRelations(sc, blocks, dedupConf)
+.map(t -> DedupUtility.createSimRel(t._1(), t._2(), entity));
+simRelsRdd.checkpoint();
 Dataset<Relation> simRels = spark
 .createDataset(
-Deduper
-.computeRelations(sc, blocks, dedupConf)
-.map(t -> DedupUtility.createSimRel(t._1(), t._2(), entity))
+simRelsRdd
 .repartition(numPartitions)
 .rdd(),
 Encoders.bean(Relation.class));
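
The Java change above enables reliable checkpointing in the similarity-relation job: a checkpoint directory is set on the SparkContext, and the intermediate RDDs (the raw entity text, the sorted blocks, and the computed similarity relations) are marked for checkpointing, which materialises them to that directory and truncates their lineage, presumably to make retries of the failing duplicateScan stages cheaper. Below is a minimal standalone sketch of the same setCheckpointDir/checkpoint pattern; it is an illustration, not the project code, and the local master, the placeholder checkpoint path and the toy data are assumptions.

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class CheckpointingSketch {

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder()
			.appName("checkpointing-sketch")
			.master("local[*]")
			.getOrCreate();
		JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

		// a reliable checkpoint dir must be configured before checkpoint() is called;
		// the path here is a placeholder, the job above uses workingPath + "/checkpoint"
		sc.setCheckpointDir("/tmp/spark-checkpoint");

		JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "b", "c"));

		// checkpoint() only marks the RDD; the data is materialised (and the lineage
		// truncated) when the first action runs on it
		rdd.checkpoint();
		long count = rdd.count();

		System.out.println("records: " + count + ", checkpointed: " + rdd.isCheckpointed());
		spark.stop();
	}
}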


@@ -126,13 +126,13 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
---conf spark.sql.shuffle.partitions=15000
+--conf spark.sql.shuffle.partitions=3840
 </spark-opts>
 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
 <arg>--actionSetId</arg><arg>${actionSetId}</arg>
 <arg>--workingPath</arg><arg>${workingPath}</arg>
-<arg>--numPartitions</arg><arg>15000</arg>
+<arg>--numPartitions</arg><arg>8000</arg>
 </spark>
 <ok to="WhitelistSimRels"/>
 <error to="Kill"/>
@@ -153,14 +153,14 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
---conf spark.sql.shuffle.partitions=15000
+--conf spark.sql.shuffle.partitions=3840
 </spark-opts>
 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
 <arg>--actionSetId</arg><arg>${actionSetId}</arg>
 <arg>--workingPath</arg><arg>${workingPath}</arg>
 <arg>--whiteListPath</arg><arg>${whiteListPath}</arg>
-<arg>--numPartitions</arg><arg>15000</arg>
+<arg>--numPartitions</arg><arg>8000</arg>
 </spark>
 <ok to="CreateMergeRel"/>
 <error to="Kill"/>
@@ -181,7 +181,7 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
---conf spark.sql.shuffle.partitions=15000
+--conf spark.sql.shuffle.partitions=3840
 </spark-opts>
 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 <arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -208,7 +208,7 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
---conf spark.sql.shuffle.partitions=15000
+--conf spark.sql.shuffle.partitions=3840
 </spark-opts>
 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 <arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -235,13 +235,13 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
---conf spark.sql.shuffle.partitions=15000
+--conf spark.sql.shuffle.partitions=3840
 </spark-opts>
 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 <arg>--workingPath</arg><arg>${workingPath}</arg>
 <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
 <arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
-<arg>--numPartitions</arg><arg>15000</arg>
+<arg>--numPartitions</arg><arg>8000</arg>
 </spark>
 <ok to="CreateOrgsDedupRecord"/>
 <error to="Kill"/>
@@ -288,7 +288,7 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
---conf spark.sql.shuffle.partitions=10000
+--conf spark.sql.shuffle.partitions=3840
 </spark-opts>
 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 <arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -314,7 +314,7 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
---conf spark.sql.shuffle.partitions=10000
+--conf spark.sql.shuffle.partitions=3840
 </spark-opts>
 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 <arg>--workingPath</arg><arg>${workingPath}</arg>
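
The workflow edits all move in the same direction: the Dataset-level shuffle parallelism (spark.sql.shuffle.partitions) is lowered from 15000/10000 to 3840 in every dedup action, and the --numPartitions argument that feeds the explicit repartition(numPartitions) calls visible in the Java diff is lowered from 15000 to 8000. The sketch below only illustrates how those two knobs act at different levels; the class name and the small local values are assumptions chosen so the example stays quick to run, not the workflow's settings.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PartitioningSketch {

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder()
			.appName("partitioning-sketch")
			.master("local[*]")
			// corresponds to --conf spark.sql.shuffle.partitions=... in <spark-opts>;
			// a small value is used here so the local example stays quick
			.config("spark.sql.shuffle.partitions", "8")
			.getOrCreate();

		// stands in for the <arg>--numPartitions</arg> job argument
		int numPartitions = 16;

		Dataset<Row> df = spark.range(1000).toDF("id");

		// Dataset/SQL shuffles (joins, aggregations, distinct) are sized by
		// spark.sql.shuffle.partitions, unless adaptive execution overrides it
		System.out.println("after distinct: " + df.distinct().rdd().getNumPartitions());

		// explicit repartition(n) uses the job argument directly, independently of that conf
		System.out.println("after repartition: " + df.repartition(numPartitions).rdd().getNumPartitions());

		spark.stop();
	}
}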