Compare commits
1 Commits
main
...
dedup_incr
Author | SHA1 | Date |
---|---|---|
Giambattista Bloisi | 6b23b5336d |
|
@ -7,7 +7,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal
|
|||
import org.apache.spark.sql.expressions._
|
||||
import org.apache.spark.sql.functions.{col, lit, udf}
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.sql.{Column, Dataset, Row, functions}
|
||||
import org.apache.spark.sql.{Column, Dataset, Row, SaveMode, functions}
|
||||
|
||||
import java.util.function.Predicate
|
||||
import java.util.stream.Collectors
|
||||
|
@ -91,10 +91,19 @@ case class SparkDeduper(conf: DedupConfig) extends Serializable {
|
|||
|
||||
val df_with_blocks = df_with_clustering_keys
|
||||
// filter out rows with position exceeding the maxqueuesize parameter
|
||||
.filter(col("position").leq(conf.getWf.getQueueMaxSize))
|
||||
.groupBy("clustering", "key")
|
||||
//.filter(col("position").leq(conf.getWf.getQueueMaxSize))
|
||||
.groupBy(col("clustering"), col("key"), functions.floor(col("position").divide(lit(conf.getWf.getQueueMaxSize))))
|
||||
.agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block"))
|
||||
.filter(functions.size(new Column("block")).gt(1))
|
||||
.union(
|
||||
df_with_clustering_keys
|
||||
// create small blocks of records on "the border" of maxqueuesize: getSlidingWindowSize/2 elements before and after
|
||||
.filter(col("position").mod(conf.getWf.getQueueMaxSize).lt(conf.getWf.getSlidingWindowSize/2) ||
|
||||
col("position").mod(conf.getWf.getQueueMaxSize).gt(conf.getWf.getQueueMaxSize - (conf.getWf.getSlidingWindowSize/2)))
|
||||
.groupBy(col("clustering"), col("key"), functions.floor((col("position") + lit(conf.getWf.getSlidingWindowSize/2)).divide(lit(conf.getWf.getQueueMaxSize))))
|
||||
.agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block"))
|
||||
.filter(functions.size(new Column("block")).gt(1))
|
||||
)
|
||||
|
||||
df_with_blocks
|
||||
}
|
||||
|
|
|
@ -93,18 +93,13 @@ public class SparkWhitelistSimRels extends AbstractSparkAction {
|
|||
Dataset<Row> entities = spark
|
||||
.read()
|
||||
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
|
||||
.repartition(numPartitions)
|
||||
.withColumn("id", functions.get_json_object(new Column("value"), dedupConf.getWf().getIdPath()));
|
||||
.select(functions.get_json_object(new Column("value"), dedupConf.getWf().getIdPath()).as("id") )
|
||||
.distinct();
|
||||
|
||||
Dataset<Row> whiteListRels1 = whiteListRels
|
||||
.join(entities, entities.col("id").equalTo(whiteListRels.col("from")), "inner")
|
||||
.select("from", "to");
|
||||
|
||||
Dataset<Row> whiteListRels2 = whiteListRels1
|
||||
.join(entities, whiteListRels1.col("to").equalTo(entities.col("id")), "inner")
|
||||
.select("from", "to");
|
||||
|
||||
Dataset<Relation> whiteListSimRels = whiteListRels2
|
||||
Dataset<Relation> whiteListSimRels = whiteListRels
|
||||
.join(entities, entities.col("id").equalTo(whiteListRels.col("from")), "semi")
|
||||
.join(entities, functions.col("to").equalTo(entities.col("id")), "semi")
|
||||
.map(
|
||||
(MapFunction<Row, Relation>) r -> DedupUtility
|
||||
.createSimRel(r.getString(0), r.getString(1), entity),
|
||||
|
|
|
@ -8,8 +8,12 @@ import scala.collection.JavaConversions;
|
|||
object GraphProcessor {
|
||||
|
||||
def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = {
|
||||
val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
|
||||
val cc = graph.connectedComponents(maxIterations).vertices
|
||||
|
||||
val (cc, didConverge, iterations) = com.kwartile.lib.cc.ConnectedComponent
|
||||
.run(edges.map{ e => Seq(e.srcId, e.dstId).toList}, 50)
|
||||
|
||||
// val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
|
||||
// val cc = graph.connectedComponents(maxIterations).vertices
|
||||
|
||||
val joinResult = vertexes.leftOuterJoin(cc).map {
|
||||
case (id, (openaireId, cc)) => {
|
||||
|
|
|
@ -127,6 +127,7 @@
|
|||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=15000
|
||||
--conf spark.network.timeout=600
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
|
@ -154,6 +155,7 @@
|
|||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=15000
|
||||
--conf spark.network.timeout=600
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
|
@ -182,6 +184,7 @@
|
|||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=15000
|
||||
--conf spark.network.timeout=600
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
|
@ -209,6 +212,7 @@
|
|||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=15000
|
||||
--conf spark.network.timeout=600
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
|
@ -236,6 +240,7 @@
|
|||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=15000
|
||||
--conf spark.network.timeout=600
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
|
@ -263,6 +268,7 @@
|
|||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
--conf spark.network.timeout=600
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
|
@ -289,6 +295,7 @@
|
|||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=10000
|
||||
--conf spark.network.timeout=600
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
|
@ -315,6 +322,7 @@
|
|||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=10000
|
||||
--conf spark.network.timeout=600
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
|
|
Loading…
Reference in New Issue