Compare commits
1 commit
main...dedup_incr

Author | SHA1 | Date |
---|---|---|
Giambattista Bloisi | 6b23b5336d | |
@@ -7,7 +7,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.expressions._
 import org.apache.spark.sql.functions.{col, lit, udf}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Column, Dataset, Row, functions}
+import org.apache.spark.sql.{Column, Dataset, Row, SaveMode, functions}

 import java.util.function.Predicate
 import java.util.stream.Collectors
@@ -91,10 +91,19 @@ case class SparkDeduper(conf: DedupConfig) extends Serializable {

     val df_with_blocks = df_with_clustering_keys
       // filter out rows with position exceeding the maxqueuesize parameter
-      .filter(col("position").leq(conf.getWf.getQueueMaxSize))
-      .groupBy("clustering", "key")
+      //.filter(col("position").leq(conf.getWf.getQueueMaxSize))
+      .groupBy(col("clustering"), col("key"), functions.floor(col("position").divide(lit(conf.getWf.getQueueMaxSize))))
       .agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block"))
       .filter(functions.size(new Column("block")).gt(1))
+      .union(
+        df_with_clustering_keys
+          // create small blocks of records on "the border" of maxqueuesize: getSlidingWindowSize/2 elements before and after
+          .filter(col("position").mod(conf.getWf.getQueueMaxSize).lt(conf.getWf.getSlidingWindowSize/2) ||
+            col("position").mod(conf.getWf.getQueueMaxSize).gt(conf.getWf.getQueueMaxSize - (conf.getWf.getSlidingWindowSize/2)))
+          .groupBy(col("clustering"), col("key"), functions.floor((col("position") + lit(conf.getWf.getSlidingWindowSize/2)).divide(lit(conf.getWf.getQueueMaxSize))))
+          .agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block"))
+          .filter(functions.size(new Column("block")).gt(1))
+      )

     df_with_blocks
   }
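In the blocking step above, records are no longer cut off at getQueueMaxSize. Each (clustering, key) queue is split into fixed-size blocks by floor(position / queueMaxSize), and a second pass builds small "border" blocks around every boundary so that records sitting just before and just after a cut are still compared with each other. A minimal sketch of the bucketing arithmetic (plain Scala, with assumed values for the two workflow parameters):

```scala
object BlockingSketch {
  def main(args: Array[String]): Unit = {
    // Assumed example values; the real ones come from conf.getWf.getQueueMaxSize
    // and conf.getWf.getSlidingWindowSize in the dedup configuration.
    val queueMaxSize = 200
    val slidingWindowSize = 100

    // Main block id: positions 0..199 -> 0, 200..399 -> 1, ... (floor(position / queueMaxSize)).
    def mainBlock(position: Long): Long = position / queueMaxSize

    // Border block id: only records within slidingWindowSize/2 of a block boundary get one;
    // shifting by slidingWindowSize/2 before dividing centres these blocks on the boundaries.
    def borderBlock(position: Long): Option[Long] = {
      val offset = position % queueMaxSize
      if (offset < slidingWindowSize / 2 || offset > queueMaxSize - slidingWindowSize / 2)
        Some((position + slidingWindowSize / 2) / queueMaxSize)
      else
        None
    }

    // Positions around the first boundary: 190..199 land in main block 0, 200..209 in main
    // block 1, but all of them share border block 1, so cross-boundary pairs still meet.
    (190L to 209L).foreach { p =>
      println(s"position=$p main=${mainBlock(p)} border=${borderBlock(p)}")
    }
  }
}
```

With queueMaxSize = 200 and slidingWindowSize = 100, positions 190..209 fall into different main blocks but the same border block, which is what the added .union(...) branch in the diff produces.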
@@ -93,18 +93,13 @@ public class SparkWhitelistSimRels extends AbstractSparkAction {
   Dataset<Row> entities = spark
     .read()
     .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
-    .repartition(numPartitions)
-    .withColumn("id", functions.get_json_object(new Column("value"), dedupConf.getWf().getIdPath()));
+    .select(functions.get_json_object(new Column("value"), dedupConf.getWf().getIdPath()).as("id"))
+    .distinct();

-  Dataset<Row> whiteListRels1 = whiteListRels
-    .join(entities, entities.col("id").equalTo(whiteListRels.col("from")), "inner")
-    .select("from", "to");
-
-  Dataset<Row> whiteListRels2 = whiteListRels1
-    .join(entities, whiteListRels1.col("to").equalTo(entities.col("id")), "inner")
-    .select("from", "to");
+  Dataset<Relation> whiteListSimRels = whiteListRels
+    .join(entities, entities.col("id").equalTo(whiteListRels.col("from")), "semi")
+    .join(entities, functions.col("to").equalTo(entities.col("id")), "semi")

-  Dataset<Relation> whiteListSimRels = whiteListRels2
     .map(
       (MapFunction<Row, Relation>) r -> DedupUtility
         .createSimRel(r.getString(0), r.getString(1), entity),
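The whitelist filtering above now reads only the entity ids (deduplicated with distinct()) and keeps a whitelist relation when both its "from" and "to" endpoints are known ids, using two left-semi joins instead of two inner joins each followed by a select("from", "to"). A semi join filters the left side without adding columns from the right, so the projection step disappears. A minimal Scala sketch of the same pattern on made-up data (the column names follow the diff; everything else is illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object SemiJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("semi-join-sketch").getOrCreate()
    import spark.implicits._

    // Known entity ids (in the real job these are extracted from the graph with get_json_object).
    val entities = Seq("id1", "id2", "id3").toDF("id")
    // Candidate whitelist relations; "id9" is not a known entity.
    val whiteListRels = Seq(("id1", "id2"), ("id1", "id9"), ("id9", "id2")).toDF("from", "to")

    // A left-semi join keeps only left-side rows that have a match on the right and adds no
    // columns from the right, so the schema stays ("from", "to") and no select() is needed.
    val kept = whiteListRels
      .join(entities, col("from") === entities("id"), "semi")
      .join(entities, col("to") === entities("id"), "semi")

    kept.show() // only (id1, id2) survives: both endpoints are known entity ids
    spark.stop()
  }
}
```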
@@ -8,8 +8,12 @@ import scala.collection.JavaConversions;
 object GraphProcessor {

   def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = {
-    val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
-    val cc = graph.connectedComponents(maxIterations).vertices
+    val (cc, didConverge, iterations) = com.kwartile.lib.cc.ConnectedComponent
+      .run(edges.map{ e => Seq(e.srcId, e.dstId).toList}, 50)
+
+    // val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
+    // val cc = graph.connectedComponents(maxIterations).vertices
+

     val joinResult = vertexes.leftOuterJoin(cc).map {
       case (id, (openaireId, cc)) => {
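The change above replaces GraphX's connectedComponents with a map-reduce connected-components implementation (com.kwartile.lib.cc.ConnectedComponent), which is fed the plain edge list and also reports whether it converged and after how many iterations. The result is still a vertexId-to-componentId mapping that gets joined back onto the vertex RDD. A small self-contained sketch of that join-back step on made-up ids (the real case body is not shown in the diff, so the mapping here is illustrative):

```scala
import org.apache.spark.sql.SparkSession

object CcJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cc-join-sketch").getOrCreate()
    val sc = spark.sparkContext

    // vertexId -> original identifier (e.g. an OpenAIRE id)
    val vertexes = sc.parallelize(Seq((1L, "id::a"), (2L, "id::b"), (3L, "id::c")))
    // vertexId -> component id, as a connected-components run would produce it; vertex 3 has
    // no edges, so it gets no entry here.
    val cc = sc.parallelize(Seq((1L, 1L), (2L, 1L)))

    // Attach the component id to every vertex; isolated vertices fall back to their own id,
    // so they end up as singleton components.
    val joinResult = vertexes.leftOuterJoin(cc).map {
      case (_, (openaireId, Some(componentId))) => (componentId, openaireId)
      case (vertexId, (openaireId, None))       => (vertexId, openaireId)
    }

    joinResult.collect().sorted.foreach(println) // (1,id::a), (1,id::b), (3,id::c)
    spark.stop()
  }
}
```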
@@ -127,6 +127,7 @@
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 --conf spark.sql.shuffle.partitions=15000
+--conf spark.network.timeout=600
 </spark-opts>
 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
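The same --conf spark.network.timeout=600 line is added to every Spark action in the workflow hunks below. It raises Spark's default network timeout (120s), so that slow responses during the heavier dedup shuffles are less likely to be treated as executor failures. A hedged sketch of the equivalent programmatic setting (nothing here comes from the workflow itself beyond the property name and value):

```scala
import org.apache.spark.sql.SparkSession

object TimeoutSketch {
  def main(args: Array[String]): Unit = {
    // Equivalent of passing --conf spark.network.timeout=600 in <spark-opts>: a plain number
    // is read as seconds, so "600" and "600s" mean the same thing here.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("timeout-sketch")
      .config("spark.network.timeout", "600s")
      .getOrCreate()

    println(spark.conf.get("spark.network.timeout")) // prints 600s
    spark.stop()
  }
}
```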
@@ -154,6 +155,7 @@
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 --conf spark.sql.shuffle.partitions=15000
+--conf spark.network.timeout=600
 </spark-opts>
 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
@@ -182,6 +184,7 @@
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 --conf spark.sql.shuffle.partitions=15000
+--conf spark.network.timeout=600
 </spark-opts>
 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 <arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -209,6 +212,7 @@
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 --conf spark.sql.shuffle.partitions=15000
+--conf spark.network.timeout=600
 </spark-opts>
 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 <arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -236,6 +240,7 @@
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 --conf spark.sql.shuffle.partitions=15000
+--conf spark.network.timeout=600
 </spark-opts>
 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 <arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -263,6 +268,7 @@
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 --conf spark.sql.shuffle.partitions=3840
+--conf spark.network.timeout=600
 </spark-opts>
 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 <arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -289,6 +295,7 @@
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 --conf spark.sql.shuffle.partitions=10000
+--conf spark.network.timeout=600
 </spark-opts>
 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 <arg>--workingPath</arg><arg>${workingPath}</arg>
@@ -315,6 +322,7 @@
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 --conf spark.sql.shuffle.partitions=10000
+--conf spark.network.timeout=600
 </spark-opts>
 <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 <arg>--workingPath</arg><arg>${workingPath}</arg>