Compare commits

...

1 Commits

Author SHA1 Message Date
Giambattista Bloisi 6b23b5336d Increase the number of blocks used in dedup 2023-10-02 09:25:12 +02:00
4 changed files with 31 additions and 15 deletions

View File

@ -7,7 +7,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.expressions._ import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions.{col, lit, udf} import org.apache.spark.sql.functions.{col, lit, udf}
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, Dataset, Row, functions} import org.apache.spark.sql.{Column, Dataset, Row, SaveMode, functions}
import java.util.function.Predicate import java.util.function.Predicate
import java.util.stream.Collectors import java.util.stream.Collectors
@ -91,10 +91,19 @@ case class SparkDeduper(conf: DedupConfig) extends Serializable {
val df_with_blocks = df_with_clustering_keys val df_with_blocks = df_with_clustering_keys
// filter out rows with position exceeding the maxqueuesize parameter // filter out rows with position exceeding the maxqueuesize parameter
.filter(col("position").leq(conf.getWf.getQueueMaxSize)) //.filter(col("position").leq(conf.getWf.getQueueMaxSize))
.groupBy("clustering", "key") .groupBy(col("clustering"), col("key"), functions.floor(col("position").divide(lit(conf.getWf.getQueueMaxSize))))
.agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block")) .agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block"))
.filter(functions.size(new Column("block")).gt(1)) .filter(functions.size(new Column("block")).gt(1))
.union(
df_with_clustering_keys
// create small blocks of records on "the border" of maxqueuesize: getSlidingWindowSize/2 elements before and after
.filter(col("position").mod(conf.getWf.getQueueMaxSize).lt(conf.getWf.getSlidingWindowSize/2) ||
col("position").mod(conf.getWf.getQueueMaxSize).gt(conf.getWf.getQueueMaxSize - (conf.getWf.getSlidingWindowSize/2)))
.groupBy(col("clustering"), col("key"), functions.floor((col("position") + lit(conf.getWf.getSlidingWindowSize/2)).divide(lit(conf.getWf.getQueueMaxSize))))
.agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block"))
.filter(functions.size(new Column("block")).gt(1))
)
df_with_blocks df_with_blocks
} }

View File

@ -93,18 +93,13 @@ public class SparkWhitelistSimRels extends AbstractSparkAction {
Dataset<Row> entities = spark Dataset<Row> entities = spark
.read() .read()
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
.repartition(numPartitions) .select(functions.get_json_object(new Column("value"), dedupConf.getWf().getIdPath()).as("id") )
.withColumn("id", functions.get_json_object(new Column("value"), dedupConf.getWf().getIdPath())); .distinct();
Dataset<Row> whiteListRels1 = whiteListRels
.join(entities, entities.col("id").equalTo(whiteListRels.col("from")), "inner")
.select("from", "to");
Dataset<Row> whiteListRels2 = whiteListRels1 Dataset<Relation> whiteListSimRels = whiteListRels
.join(entities, whiteListRels1.col("to").equalTo(entities.col("id")), "inner") .join(entities, entities.col("id").equalTo(whiteListRels.col("from")), "semi")
.select("from", "to"); .join(entities, functions.col("to").equalTo(entities.col("id")), "semi")
Dataset<Relation> whiteListSimRels = whiteListRels2
.map( .map(
(MapFunction<Row, Relation>) r -> DedupUtility (MapFunction<Row, Relation>) r -> DedupUtility
.createSimRel(r.getString(0), r.getString(1), entity), .createSimRel(r.getString(0), r.getString(1), entity),

View File

@ -8,8 +8,12 @@ import scala.collection.JavaConversions;
object GraphProcessor { object GraphProcessor {
def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = { def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = {
val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
val cc = graph.connectedComponents(maxIterations).vertices val (cc, didConverge, iterations) = com.kwartile.lib.cc.ConnectedComponent
.run(edges.map{ e => Seq(e.srcId, e.dstId).toList}, 50)
// val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
// val cc = graph.connectedComponents(maxIterations).vertices
val joinResult = vertexes.leftOuterJoin(cc).map { val joinResult = vertexes.leftOuterJoin(cc).map {
case (id, (openaireId, cc)) => { case (id, (openaireId, cc)) => {

View File

@ -127,6 +127,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=600
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg> <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
@ -154,6 +155,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=600
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg> <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
@ -182,6 +184,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=600
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
@ -209,6 +212,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=600
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
@ -236,6 +240,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=600
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
@ -263,6 +268,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=600
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
@ -289,6 +295,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=10000 --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=600
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
@ -315,6 +322,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=10000 --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=600
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>