From 6b23b5336dda1066d55c74415ff83c1fd8850d26 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Mon, 2 Oct 2023 09:25:12 +0200 Subject: [PATCH] Increase the number of blocks used in dedup --- .../java/eu/dnetlib/pace/model/SparkDeduper.scala | 15 ++++++++++++--- .../dhp/oa/dedup/SparkWhitelistSimRels.java | 15 +++++---------- .../dhp/oa/dedup/graph/GraphProcessor.scala | 8 ++++++-- .../dhp/oa/dedup/scan/oozie_app/workflow.xml | 8 ++++++++ 4 files changed, 31 insertions(+), 15 deletions(-) diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDeduper.scala b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDeduper.scala index b3f56bcdb..f09420d13 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDeduper.scala +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDeduper.scala @@ -7,7 +7,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.expressions._ import org.apache.spark.sql.functions.{col, lit, udf} import org.apache.spark.sql.types._ -import org.apache.spark.sql.{Column, Dataset, Row, functions} +import org.apache.spark.sql.{Column, Dataset, Row, SaveMode, functions} import java.util.function.Predicate import java.util.stream.Collectors @@ -91,10 +91,19 @@ case class SparkDeduper(conf: DedupConfig) extends Serializable { val df_with_blocks = df_with_clustering_keys // filter out rows with position exceeding the maxqueuesize parameter - .filter(col("position").leq(conf.getWf.getQueueMaxSize)) - .groupBy("clustering", "key") + //.filter(col("position").leq(conf.getWf.getQueueMaxSize)) + .groupBy(col("clustering"), col("key"), functions.floor(col("position").divide(lit(conf.getWf.getQueueMaxSize)))) .agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block")) .filter(functions.size(new Column("block")).gt(1)) + .union( + df_with_clustering_keys + // create small blocks of records on "the border" of maxqueuesize: getSlidingWindowSize/2 elements before and after + .filter(col("position").mod(conf.getWf.getQueueMaxSize).lt(conf.getWf.getSlidingWindowSize/2) || + col("position").mod(conf.getWf.getQueueMaxSize).gt(conf.getWf.getQueueMaxSize - (conf.getWf.getSlidingWindowSize/2))) + .groupBy(col("clustering"), col("key"), functions.floor((col("position") + lit(conf.getWf.getSlidingWindowSize/2)).divide(lit(conf.getWf.getQueueMaxSize)))) + .agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block")) + .filter(functions.size(new Column("block")).gt(1)) + ) df_with_blocks } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java index 94a09ed05..d2d1a9264 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java @@ -93,18 +93,13 @@ public class SparkWhitelistSimRels extends AbstractSparkAction { Dataset entities = spark .read() .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) - .repartition(numPartitions) - .withColumn("id", functions.get_json_object(new Column("value"), dedupConf.getWf().getIdPath())); + .select(functions.get_json_object(new Column("value"), dedupConf.getWf().getIdPath()).as("id") ) + .distinct(); - Dataset whiteListRels1 = whiteListRels - .join(entities, entities.col("id").equalTo(whiteListRels.col("from")), "inner") - .select("from", "to"); - Dataset whiteListRels2 = whiteListRels1 - .join(entities, whiteListRels1.col("to").equalTo(entities.col("id")), "inner") - .select("from", "to"); - - Dataset whiteListSimRels = whiteListRels2 + Dataset whiteListSimRels = whiteListRels + .join(entities, entities.col("id").equalTo(whiteListRels.col("from")), "semi") + .join(entities, functions.col("to").equalTo(entities.col("id")), "semi") .map( (MapFunction) r -> DedupUtility .createSimRel(r.getString(0), r.getString(1), entity), diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala index f4dd85d75..e82b647cc 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala @@ -8,8 +8,12 @@ import scala.collection.JavaConversions; object GraphProcessor { def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = { - val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby - val cc = graph.connectedComponents(maxIterations).vertices + + val (cc, didConverge, iterations) = com.kwartile.lib.cc.ConnectedComponent + .run(edges.map{ e => Seq(e.srcId, e.dstId).toList}, 50) + + // val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby + // val cc = graph.connectedComponents(maxIterations).vertices val joinResult = vertexes.leftOuterJoin(cc).map { case (id, (openaireId, cc)) => { diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml index ba2270c8a..ea5b91b15 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml @@ -127,6 +127,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=15000 + --conf spark.network.timeout=600 --graphBasePath${graphBasePath} --isLookUpUrl${isLookUpUrl} @@ -154,6 +155,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=15000 + --conf spark.network.timeout=600 --graphBasePath${graphBasePath} --isLookUpUrl${isLookUpUrl} @@ -182,6 +184,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=15000 + --conf spark.network.timeout=600 --graphBasePath${graphBasePath} --workingPath${workingPath} @@ -209,6 +212,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=15000 + --conf spark.network.timeout=600 --graphBasePath${graphBasePath} --workingPath${workingPath} @@ -236,6 +240,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=15000 + --conf spark.network.timeout=600 --graphBasePath${graphBasePath} --workingPath${workingPath} @@ -263,6 +268,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=600 --graphBasePath${graphBasePath} --workingPath${workingPath} @@ -289,6 +295,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=10000 + --conf spark.network.timeout=600 --graphBasePath${graphBasePath} --workingPath${workingPath} @@ -315,6 +322,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=10000 + --conf spark.network.timeout=600 --graphBasePath${graphBasePath} --workingPath${workingPath}