Increase the number of blocks used in dedup

Giambattista Bloisi 2023-10-02 09:25:12 +02:00
parent 0935d7757c
commit 6b23b5336d
4 changed files with 31 additions and 15 deletions

View File

@@ -7,7 +7,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions.{col, lit, udf}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, Dataset, Row, functions}
import org.apache.spark.sql.{Column, Dataset, Row, SaveMode, functions}
import java.util.function.Predicate
import java.util.stream.Collectors
@@ -91,10 +91,19 @@ case class SparkDeduper(conf: DedupConfig) extends Serializable {
val df_with_blocks = df_with_clustering_keys
// filter out rows with position exceeding the maxqueuesize parameter
.filter(col("position").leq(conf.getWf.getQueueMaxSize))
.groupBy("clustering", "key")
//.filter(col("position").leq(conf.getWf.getQueueMaxSize))
.groupBy(col("clustering"), col("key"), functions.floor(col("position").divide(lit(conf.getWf.getQueueMaxSize))))
.agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block"))
.filter(functions.size(new Column("block")).gt(1))
.union(
df_with_clustering_keys
// create small blocks of records on "the border" of maxqueuesize: getSlidingWindowSize/2 elements before and after
.filter(col("position").mod(conf.getWf.getQueueMaxSize).lt(conf.getWf.getSlidingWindowSize/2) ||
col("position").mod(conf.getWf.getQueueMaxSize).gt(conf.getWf.getQueueMaxSize - (conf.getWf.getSlidingWindowSize/2)))
.groupBy(col("clustering"), col("key"), functions.floor((col("position") + lit(conf.getWf.getSlidingWindowSize/2)).divide(lit(conf.getWf.getQueueMaxSize))))
.agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block"))
.filter(functions.size(new Column("block")).gt(1))
)
df_with_blocks
}
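
The hunk above changes the blocking step: instead of dropping records whose position exceeds queueMaxSize, each (clustering, key) group is now cut into chunks of at most queueMaxSize records, and extra "border" blocks of slidingWindowSize/2 records on either side of each cut are unioned in so that pairs near a boundary are still compared. Below is a minimal, self-contained sketch of that idea, not the project code; the toy sizes, column names and record ids are assumptions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_set, floor, lit, size, struct}

object BlockSplitSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("block-split-sketch").getOrCreate()
    import spark.implicits._

    val queueMaxSize = 5      // stands in for conf.getWf.getQueueMaxSize (toy value)
    val slidingWindowSize = 4 // stands in for conf.getWf.getSlidingWindowSize (toy value)
    val half = slidingWindowSize / 2

    // toy input: one clustering key with records at positions 0..11
    val df = (0 until 12)
      .map(p => ("ngram", "key1", p, s"rec$p"))
      .toDF("clustering", "key", "position", "id")

    // main blocks: every queueMaxSize consecutive positions form one block
    val mainBlocks = df
      .groupBy(col("clustering"), col("key"),
        floor(col("position").divide(lit(queueMaxSize))).as("chunk"))
      .agg(collect_set(struct(col("id"), col("position"))).as("block"))
      .filter(size(col("block")).gt(1))

    // border blocks: records close to a chunk boundary are regrouped with a shifted
    // chunk index, so pairs straddling two main blocks are still compared
    val borderBlocks = df
      .filter(col("position").mod(queueMaxSize).lt(half)
        .or(col("position").mod(queueMaxSize).gt(queueMaxSize - half)))
      .groupBy(col("clustering"), col("key"),
        floor((col("position") + lit(half)).divide(lit(queueMaxSize))).as("chunk"))
      .agg(collect_set(struct(col("id"), col("position"))).as("block"))
      .filter(size(col("block")).gt(1))

    mainBlocks.union(borderBlocks).show(false)
    spark.stop()
  }
}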

View File

@@ -93,18 +93,13 @@ public class SparkWhitelistSimRels extends AbstractSparkAction {
Dataset<Row> entities = spark
.read()
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
.repartition(numPartitions)
.withColumn("id", functions.get_json_object(new Column("value"), dedupConf.getWf().getIdPath()));
.select(functions.get_json_object(new Column("value"), dedupConf.getWf().getIdPath()).as("id") )
.distinct();
Dataset<Row> whiteListRels1 = whiteListRels
.join(entities, entities.col("id").equalTo(whiteListRels.col("from")), "inner")
.select("from", "to");
Dataset<Row> whiteListRels2 = whiteListRels1
.join(entities, whiteListRels1.col("to").equalTo(entities.col("id")), "inner")
.select("from", "to");
Dataset<Relation> whiteListSimRels = whiteListRels2
Dataset<Relation> whiteListSimRels = whiteListRels
.join(entities, entities.col("id").equalTo(whiteListRels.col("from")), "semi")
.join(entities, functions.col("to").equalTo(entities.col("id")), "semi")
.map(
(MapFunction<Row, Relation>) r -> DedupUtility
.createSimRel(r.getString(0), r.getString(1), entity),
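
The whitelist filtering above drops the repartition and the two inner-join/select round trips: entity ids are now read once (distinct) and a whitelist pair is kept only if both endpoints appear among them, via two left-semi joins. A minimal sketch of the same pattern, not the project code; the sample ids are made up.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object SemiJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("semi-join-sketch").getOrCreate()
    import spark.implicits._

    // stand-ins for the entity ids extracted with get_json_object and for the whitelist pairs
    val entities = Seq("id1", "id2", "id3").toDF("id").distinct()
    val whiteListRels = Seq(("id1", "id2"), ("id1", "id9"), ("id9", "id3")).toDF("from", "to")

    // a semi join only filters the left side and never adds columns from the right,
    // so no trailing select("from", "to") is needed and the "id" column cannot clash
    val kept = whiteListRels
      .join(entities, whiteListRels("from") === entities("id"), "semi")
      .join(entities, col("to") === entities("id"), "semi")

    kept.show() // only ("id1", "id2") survives: "id9" is not a known entity
    spark.stop()
  }
}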

View File

@@ -8,8 +8,12 @@ import scala.collection.JavaConversions;
object GraphProcessor {
def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = {
val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
val cc = graph.connectedComponents(maxIterations).vertices
val (cc, didConverge, iterations) = com.kwartile.lib.cc.ConnectedComponent
.run(edges.map{ e => Seq(e.srcId, e.dstId).toList}, 50)
// val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
// val cc = graph.connectedComponents(maxIterations).vertices
val joinResult = vertexes.leftOuterJoin(cc).map {
case (id, (openaireId, cc)) => {
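
Here the connected-components computation switches from GraphX's connectedComponents to com.kwartile.lib.cc.ConnectedComponent run directly on the edge list, and the resulting (vertex, component) pairs are joined back onto the vertex RDD. Below is a minimal sketch of that leftOuterJoin step only, not the project code; the sample ids and the fallback for isolated vertices are assumptions based on the visible hunk.

import org.apache.spark.sql.SparkSession

object CcJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cc-join-sketch").getOrCreate()
    val sc = spark.sparkContext

    // vertex id -> openaire id; vertex 4L does not appear in any edge
    val vertexes = sc.parallelize(Seq(1L -> "50|a", 2L -> "50|b", 3L -> "50|c", 4L -> "50|d"))
    // vertex id -> component id, as a connected-components pass would return it
    val cc = sc.parallelize(Seq(1L -> 1L, 2L -> 1L, 3L -> 3L))

    val components = vertexes.leftOuterJoin(cc).map {
      case (id, (openaireId, maybeComponent)) =>
        // isolated vertices fall back to a singleton component rooted at themselves
        (maybeComponent.getOrElse(id), openaireId)
    }

    components.groupByKey().collect().foreach { case (root, ids) =>
      println(s"component $root -> ${ids.mkString(", ")}")
    }
    spark.stop()
  }
}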

View File

@@ -127,6 +127,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=600
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
@@ -154,6 +155,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=600
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
@@ -182,6 +184,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=600
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -209,6 +212,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=600
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -236,6 +240,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=600
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -263,6 +268,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=600
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -289,6 +295,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=600
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -315,6 +322,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=600
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
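
Every spark-opts block in the workflow gains --conf spark.network.timeout=600, presumably so that the longer shuffles produced by the larger blocking stage do not abort on Spark's 120s default. A minimal sketch of setting the same values programmatically, e.g. in a local test outside the Oozie workflow; the app name and local master are assumptions.

import org.apache.spark.sql.SparkSession

object TimeoutConfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("timeout-conf-sketch")
      .config("spark.network.timeout", "600s")          // default is 120s; raised here for long shuffles
      .config("spark.sql.shuffle.partitions", "15000")  // mirrors the dedup actions above
      .getOrCreate()

    println(spark.conf.get("spark.network.timeout"))
    spark.stop()
  }
}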