|
|
|
@ -20,6 +20,7 @@ import scala.collection.JavaConverters._
|
|
|
|
|
import scala.collection.mutable
|
|
|
|
|
import org.apache.spark.sql.functions.{col, lit, udf}
|
|
|
|
|
|
|
|
|
|
import java.util.Collections
|
|
|
|
|
import java.util.stream.Collectors
|
|
|
|
|
|
|
|
|
|
case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Serializable {
|
|
|
|
@ -31,11 +32,15 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
|
|
|
|
|
private val urlFilter = (s: String) => URL_REGEX.matcher(s).matches
|
|
|
|
|
|
|
|
|
|
val modelExtractor: (Dataset[String] => Dataset[Row]) = df => {
|
|
|
|
|
|
|
|
|
|
df.withColumn("mapDocument", rowFromJsonUDF.apply(df.col(df.columns(0))))
|
|
|
|
|
.withColumn("identifier", new Column("mapDocument.identifier"))
|
|
|
|
|
.repartition(new Column("identifier"))
|
|
|
|
|
//.repartition(new Column("identifier"))
|
|
|
|
|
.dropDuplicates("identifier")
|
|
|
|
|
.select("mapDocument.*")
|
|
|
|
|
|
|
|
|
|
df.map(r => rowFromJson(r))(RowEncoder(rowDataType))
|
|
|
|
|
.dropDuplicates("identifier")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val generateClusters: (Dataset[Row] => Dataset[Row]) = df => {
|
|
|
|
@ -81,12 +86,15 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
|
|
|
|
|
|
|
|
|
|
// Using SQL because GROUPING SETS are not available through Scala/Java DSL
|
|
|
|
|
df_with_keys.sqlContext.sql(
|
|
|
|
|
("SELECT coalesce(" + keys + ") as key, collect_sort_slice(" + fields + ") as block FROM " + tempTable + " WHERE coalesce(" + keys + ") IS NOT NULL GROUP BY GROUPING SETS (" + keys + ") HAVING size(block) > 1")
|
|
|
|
|
("SELECT coalesce(" + keys + ") as key, sort_array(collect_sort_slice(" + fields + ")) as block FROM " + tempTable + " WHERE coalesce(" + keys + ") IS NOT NULL GROUP BY GROUPING SETS (" + keys + ") HAVING size(block) > 1")
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val generateClustersWithDFAPI: (Dataset[Row] => Dataset[Row]) = df => {
|
|
|
|
|
|
|
|
|
|
System.out.println(conf.getWf.getEntityType + "::" +conf.getWf.getSubEntityType)
|
|
|
|
|
|
|
|
|
|
val df_with_filters = conf.getPace.getModel.asScala.foldLeft(df)((res, fdef) => {
|
|
|
|
|
if (conf.blacklists.containsKey(fdef.getName)) {
|
|
|
|
|
res.withColumn(
|
|
|
|
@ -112,15 +120,74 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
|
|
|
|
|
columns.add(new Column(fName))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val ds: Dataset[Row] = df_with_filters.withColumn("key", functions.explode(clusterValuesUDF(cd).apply(functions.array(columns.asScala: _*))))
|
|
|
|
|
.select((Seq(rowDataType.fieldNames: _*) ++ Seq("key")).map(col): _*)
|
|
|
|
|
val tmp: Dataset[Row] = df_with_filters.withColumn("key", functions.explode(clusterValuesUDF(cd).apply(functions.array(columns.asScala: _*))))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*.select((Seq(rowDataType.fieldNames: _*) ++ Seq("key")).map(col): _*)
|
|
|
|
|
.groupByKey(r => r.getAs[String]("key"))(Encoders.STRING)
|
|
|
|
|
.agg(collectSortSliceAggregator.toColumn)
|
|
|
|
|
.toDF("key", "block")
|
|
|
|
|
.select(col("block.block").as("block"))
|
|
|
|
|
.select(col("block.block").as("block"))*/
|
|
|
|
|
|
|
|
|
|
/*.groupBy("key")
|
|
|
|
|
.agg(collectSortSliceUDAF(rowDataType.fieldNames.map(col): _*).as("block"))*/
|
|
|
|
|
System.out.println(cd.getName)
|
|
|
|
|
|
|
|
|
|
val ds = tmp.groupBy("key")
|
|
|
|
|
// .agg(functions.sort_array(collectSortSliceUDAF(rowDataType.fieldNames.map(col): _*)).as("block"))
|
|
|
|
|
.agg(functions.collect_set(functions.struct(rowDataType.fieldNames.map(col): _*)).as("block"))
|
|
|
|
|
//.filter(functions.size(new Column("block")).geq(new Literal(2, DataTypes.IntegerType)))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//df_with_filters.printSchema()
|
|
|
|
|
//ds.printSchema()
|
|
|
|
|
|
|
|
|
|
if (relBlocks == null) relBlocks = ds
|
|
|
|
|
else relBlocks = relBlocks.union(ds)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// System.out.println()
|
|
|
|
|
|
|
|
|
|
relBlocks
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val generateClustersWithWindows: (Dataset[Row] => Dataset[Row]) = df => {
|
|
|
|
|
|
|
|
|
|
val df_with_filters = conf.getPace.getModel.asScala.foldLeft(df)((res, fdef) => {
|
|
|
|
|
if (conf.blacklists.containsKey(fdef.getName)) {
|
|
|
|
|
res.withColumn(
|
|
|
|
|
fdef.getName + "_filtered",
|
|
|
|
|
filterColumnUDF(fdef).apply(new Column(fdef.getName))
|
|
|
|
|
)
|
|
|
|
|
} else {
|
|
|
|
|
res
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
var relBlocks: Dataset[Row] = null
|
|
|
|
|
|
|
|
|
|
import scala.collection.JavaConversions._
|
|
|
|
|
|
|
|
|
|
for (cd <- conf.clusterings()) {
|
|
|
|
|
System.out.println(conf.getWf.getEntityType + "::" + conf.getWf.getSubEntityType+ ": " + cd.getName + " " + cd.toString)
|
|
|
|
|
|
|
|
|
|
val columns: util.List[Column] = new util.ArrayList[Column](cd.getFields().size)
|
|
|
|
|
|
|
|
|
|
for (fName <- cd.getFields()) {
|
|
|
|
|
if (conf.blacklists.containsKey(fName))
|
|
|
|
|
columns.add(new Column(fName + "_filtered"))
|
|
|
|
|
else
|
|
|
|
|
columns.add(new Column(fName))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Add 'key' column with the value generated by the given clustering definition
|
|
|
|
|
val ds: Dataset[Row] = df_with_filters.withColumn("key", functions.explode(clusterValuesUDF(cd).apply(functions.array(columns.asScala: _*))))
|
|
|
|
|
// Add position column having the position of the row within the set of rows having the same key value ordered by the sorting value
|
|
|
|
|
.withColumn("position", functions.row_number().over(Window.partitionBy("key").orderBy(col(conf.getWf.getOrderField))))
|
|
|
|
|
// filter out rows with position exceeding the maxqueuesize parameter
|
|
|
|
|
.filter(col("position").leq(conf.getWf.getQueueMaxSize))
|
|
|
|
|
.groupBy("key")
|
|
|
|
|
.agg(functions.collect_set(functions.struct(rowDataType.fieldNames.map(col): _*)).as("block"))
|
|
|
|
|
.filter(functions.size(new Column("block")).geq(new Literal(2, DataTypes.IntegerType)))
|
|
|
|
|
|
|
|
|
|
if (relBlocks == null) relBlocks = ds
|
|
|
|
@ -224,6 +291,67 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
|
|
|
|
|
relBlocks
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val printAnalytics: (Dataset[Row] => Dataset[Row]) = df => {
|
|
|
|
|
|
|
|
|
|
val df_with_filters = conf.getPace.getModel.asScala.foldLeft(df)((res, fdef) => {
|
|
|
|
|
if (conf.blacklists.containsKey(fdef.getName)) {
|
|
|
|
|
res.withColumn(
|
|
|
|
|
fdef.getName + "_filtered",
|
|
|
|
|
filterColumnUDF(fdef).apply(new Column(fdef.getName))
|
|
|
|
|
)
|
|
|
|
|
} else {
|
|
|
|
|
res
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
var relBlocks: Dataset[Row] = null
|
|
|
|
|
|
|
|
|
|
import scala.collection.JavaConversions._
|
|
|
|
|
|
|
|
|
|
for (cd <- conf.clusterings()) {
|
|
|
|
|
val columns: util.List[Column] = new util.ArrayList[Column](cd.getFields().size)
|
|
|
|
|
|
|
|
|
|
for (fName <- cd.getFields()) {
|
|
|
|
|
if (conf.blacklists.containsKey(fName))
|
|
|
|
|
columns.add(new Column(fName + "_filtered"))
|
|
|
|
|
else
|
|
|
|
|
columns.add(new Column(fName))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Add 'key' column with the value generated by the given clustering definition
|
|
|
|
|
val ds: Dataset[Row] = df_with_filters.withColumn("key", functions.explode(clusterValuesUDF(cd).apply(functions.array(columns.asScala: _*))))
|
|
|
|
|
// Add position column having the position of the row within the set of rows having the same key value ordered by the sorting value
|
|
|
|
|
.withColumn("position", functions.row_number().over(Window.partitionBy("key").orderBy(conf.getWf.getOrderField)))
|
|
|
|
|
// filter out rows with position exceeding the maxqueuesize parameter
|
|
|
|
|
.filter(col("position").lt(conf.getWf.getQueueMaxSize))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// inner join to compute all combination of rows to compare
|
|
|
|
|
// note the condition on position to obtain 'windowing': given a row this is compared at most with the next
|
|
|
|
|
// SlidingWindowSize rows following the sort order
|
|
|
|
|
val dsWithMatch = ds.as("l").join(ds.as("r"),
|
|
|
|
|
col("l.key").equalTo(col("r.key")),
|
|
|
|
|
"inner"
|
|
|
|
|
)
|
|
|
|
|
.filter((col("l.position").lt(col("r.position")))
|
|
|
|
|
&& (col("r.position").lt(col("l.position").plus(lit(conf.getWf.getSlidingWindowSize)))))
|
|
|
|
|
// Add match column with the result of comparison
|
|
|
|
|
// dsWithMatch.show(false)
|
|
|
|
|
|
|
|
|
|
if (relBlocks == null)
|
|
|
|
|
relBlocks = dsWithMatch
|
|
|
|
|
else
|
|
|
|
|
relBlocks = relBlocks.union(dsWithMatch)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
System.out.println(conf.getWf.getEntityType + "::" + conf.getWf.getSubEntityType)
|
|
|
|
|
System.out.println("Total number of comparations: " + relBlocks.count())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
df
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
val generateAndProcessClustersWithJoins: (Dataset[Row] => Dataset[Row]) = df => {
|
|
|
|
|
|
|
|
|
|
val df_with_filters = conf.getPace.getModel.asScala.foldLeft(df)((res, fdef) => {
|
|
|
|
@ -287,7 +415,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
|
|
|
|
|
|
|
|
|
|
val res = relBlocks
|
|
|
|
|
//.select(col("l.identifier").as("from"), col("r.identifier").as("to"))
|
|
|
|
|
.repartition()
|
|
|
|
|
//.repartition()
|
|
|
|
|
.distinct()
|
|
|
|
|
|
|
|
|
|
// res.show(false)
|
|
|
|
@ -301,12 +429,36 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
|
|
|
|
|
df.filter(functions.size(new Column("block")).geq(new Literal(2, DataTypes.IntegerType)))
|
|
|
|
|
.withColumn("relations", processBlock(df.sqlContext.sparkContext).apply(new Column("block")))
|
|
|
|
|
.select(functions.explode(new Column("relations")).as("relation"))
|
|
|
|
|
.repartition(new Column("relation"))
|
|
|
|
|
//.repartition(new Column("relation"))
|
|
|
|
|
.dropDuplicates("relation")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val rowDataType: StructType = {
|
|
|
|
|
val unordered = conf.getPace.getModel.asScala.foldLeft(
|
|
|
|
|
// val unordered = conf.getPace.getModel.asScala.foldLeft(
|
|
|
|
|
// new StructType()
|
|
|
|
|
// )((resType, fdef) => {
|
|
|
|
|
// resType.add(fdef.getType match {
|
|
|
|
|
// case Type.List | Type.JSON =>
|
|
|
|
|
// StructField(fdef.getName, DataTypes.createArrayType(DataTypes.StringType), true, Metadata.empty)
|
|
|
|
|
// case Type.DoubleArray =>
|
|
|
|
|
// StructField(fdef.getName, DataTypes.createArrayType(DataTypes.DoubleType), true, Metadata.empty)
|
|
|
|
|
// case _ =>
|
|
|
|
|
// StructField(fdef.getName, DataTypes.StringType, true, Metadata.empty)
|
|
|
|
|
// })
|
|
|
|
|
// })
|
|
|
|
|
//
|
|
|
|
|
// conf.getPace.getModel.asScala.filterNot(_.getName.equals(conf.getWf.getOrderField)).foldLeft(
|
|
|
|
|
// new StructType()
|
|
|
|
|
// .add(unordered(conf.getWf.getOrderField))
|
|
|
|
|
// .add(StructField("identifier", DataTypes.StringType, false, Metadata.empty))
|
|
|
|
|
// )((resType, fdef) => resType.add(unordered(fdef.getName)))
|
|
|
|
|
|
|
|
|
|
val identifier = new FieldDef()
|
|
|
|
|
identifier.setName("identifier")
|
|
|
|
|
identifier.setType(Type.String)
|
|
|
|
|
|
|
|
|
|
(conf.getPace.getModel.asScala ++ Seq(identifier)).sortBy(_.getName)
|
|
|
|
|
.foldLeft(
|
|
|
|
|
new StructType()
|
|
|
|
|
)((resType, fdef) => {
|
|
|
|
|
resType.add(fdef.getType match {
|
|
|
|
@ -319,11 +471,6 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
|
|
|
|
|
})
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
conf.getPace.getModel.asScala.filterNot(_.getName.equals(conf.getWf.getOrderField)).foldLeft(
|
|
|
|
|
new StructType()
|
|
|
|
|
.add(unordered(conf.getWf.getOrderField))
|
|
|
|
|
.add(StructField("identifier", DataTypes.StringType, false, Metadata.empty))
|
|
|
|
|
)((resType, fdef) => resType.add(unordered(fdef.getName)))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
@ -332,7 +479,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
|
|
|
|
|
|
|
|
|
|
val orderingFieldPosition: Int = rowDataType.fieldIndex(conf.getWf.getOrderField)
|
|
|
|
|
|
|
|
|
|
val rowFromJson = (json: String) => {
|
|
|
|
|
def rowFromJson(json: String) : Row = {
|
|
|
|
|
val documentContext =
|
|
|
|
|
JsonPath.using(Configuration.defaultConfiguration.addOptions(Option.SUPPRESS_EXCEPTIONS)).parse(json)
|
|
|
|
|
val values = new Array[Any](rowDataType.size)
|
|
|
|
@ -360,7 +507,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
|
|
|
|
|
MapDocumentUtil.truncateList(
|
|
|
|
|
MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType),
|
|
|
|
|
fdef.getSize
|
|
|
|
|
)
|
|
|
|
|
).toArray
|
|
|
|
|
|
|
|
|
|
case Type.StringConcat =>
|
|
|
|
|
val jpaths = CONCAT_REGEX.split(fdef.getPath)
|
|
|
|
@ -384,7 +531,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
|
|
|
|
|
new GenericRowWithSchema(values, rowDataType)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val rowFromJsonUDF = udf(rowFromJson, rowDataType)
|
|
|
|
|
val rowFromJsonUDF = udf(rowFromJson(_), rowDataType)
|
|
|
|
|
|
|
|
|
|
def filterColumnUDF(fdef: FieldDef): UserDefinedFunction = {
|
|
|
|
|
|
|
|
|
@ -428,7 +575,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
|
|
|
|
|
new BlockProcessor(conf, identityFieldPosition, orderingFieldPosition).processSortedRows(mapDocuments, reporter)
|
|
|
|
|
|
|
|
|
|
reporter.getRelations.asScala.toArray
|
|
|
|
|
})
|
|
|
|
|
}).asNondeterministic()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val collectSortSliceAggregator : Aggregator[Row,Seq[Row], Row] = new Aggregator[Row, Seq[Row], Row] () {
|
|
|
|
|