Compare commits

...

1 Commits

Author SHA1 Message Date
Claudio Atzori 0bd306b196 WIP investigating failures in the duplicateScan phase 2023-05-22 10:36:24 +02:00
2 changed files with 26 additions and 18 deletions

View File

@ -7,6 +7,7 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
@ -73,6 +74,8 @@ public class SparkCreateSimRels extends AbstractSparkAction {
log.info("actionSetId: '{}'", actionSetId); log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath); log.info("workingPath: '{}'", workingPath);
spark.sparkContext().setCheckpointDir(workingPath + "/checkpoint");
// for each dedup configuration // for each dedup configuration
for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) { for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
@ -85,8 +88,10 @@ public class SparkCreateSimRels extends AbstractSparkAction {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaPairRDD<String, MapDocument> mapDocuments = sc final JavaRDD<String> rdd = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity));
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) rdd.checkpoint();
JavaPairRDD<String, MapDocument> mapDocuments = rdd
.repartition(numPartitions) .repartition(numPartitions)
.mapToPair( .mapToPair(
(PairFunction<String, String, MapDocument>) s -> { (PairFunction<String, String, MapDocument>) s -> {
@ -95,15 +100,18 @@ public class SparkCreateSimRels extends AbstractSparkAction {
}); });
// create blocks for deduplication // create blocks for deduplication
JavaPairRDD<String, Block> blocks = Deduper final JavaPairRDD<String, Block> sortedBlocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);
.createSortedBlocks(mapDocuments, dedupConf) sortedBlocks.checkpoint();
.repartition(numPartitions); JavaPairRDD<String, Block> blocks = sortedBlocks.repartition(numPartitions);
final JavaRDD<Relation> simRelsRdd = Deduper
.computeRelations(sc, blocks, dedupConf)
.map(t -> DedupUtility.createSimRel(t._1(), t._2(), entity));
simRelsRdd.checkpoint();
Dataset<Relation> simRels = spark Dataset<Relation> simRels = spark
.createDataset( .createDataset(
Deduper simRelsRdd
.computeRelations(sc, blocks, dedupConf)
.map(t -> DedupUtility.createSimRel(t._1(), t._2(), entity))
.repartition(numPartitions) .repartition(numPartitions)
.rdd(), .rdd(),
Encoders.bean(Relation.class)); Encoders.bean(Relation.class));

View File

@ -126,13 +126,13 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg> <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg> <arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--numPartitions</arg><arg>15000</arg> <arg>--numPartitions</arg><arg>8000</arg>
</spark> </spark>
<ok to="WhitelistSimRels"/> <ok to="WhitelistSimRels"/>
<error to="Kill"/> <error to="Kill"/>
@ -153,14 +153,14 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg> <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg> <arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--whiteListPath</arg><arg>${whiteListPath}</arg> <arg>--whiteListPath</arg><arg>${whiteListPath}</arg>
<arg>--numPartitions</arg><arg>15000</arg> <arg>--numPartitions</arg><arg>8000</arg>
</spark> </spark>
<ok to="CreateMergeRel"/> <ok to="CreateMergeRel"/>
<error to="Kill"/> <error to="Kill"/>
@ -181,7 +181,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
@ -208,7 +208,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
@ -235,13 +235,13 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg> <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg> <arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
<arg>--numPartitions</arg><arg>15000</arg> <arg>--numPartitions</arg><arg>8000</arg>
</spark> </spark>
<ok to="CreateOrgsDedupRecord"/> <ok to="CreateOrgsDedupRecord"/>
<error to="Kill"/> <error to="Kill"/>
@ -288,7 +288,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=10000 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
@ -314,7 +314,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=10000 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>