diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/RowDataOrderingComparator.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/RowDataOrderingComparator.java
index 42c226f87..e5739b972 100644
--- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/RowDataOrderingComparator.java
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/RowDataOrderingComparator.java
@@ -49,7 +49,12 @@ public class RowDataOrderingComparator implements Comparator {
 		final String to1 = NGramUtils.cleanupForOrdering(o1);
 		final String to2 = NGramUtils.cleanupForOrdering(o2);
 
-		return to1.compareTo(to2);
+		int res = to1.compareTo(to2);
+		if (res == 0) {
+			return o1.compareTo(o2);
+		}
+
+		return res;
 	}
 }
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDedupConfig.scala b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDedupConfig.scala
index 9a89de57f..22a382812 100644
--- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDedupConfig.scala
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDedupConfig.scala
@@ -20,6 +20,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.sql.functions.{col, lit, udf}
+import java.util.Collections
 import java.util.stream.Collectors
 
 case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Serializable {
@@ -31,11 +32,15 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
   private val urlFilter = (s: String) => URL_REGEX.matcher(s).matches
 
   val modelExtractor: (Dataset[String] => Dataset[Row]) = df => {
+
     df.withColumn("mapDocument", rowFromJsonUDF.apply(df.col(df.columns(0))))
       .withColumn("identifier", new Column("mapDocument.identifier"))
-      .repartition(new Column("identifier"))
+      //.repartition(new Column("identifier"))
       .dropDuplicates("identifier")
       .select("mapDocument.*")
+
+    df.map(r => rowFromJson(r))(RowEncoder(rowDataType))
+      .dropDuplicates("identifier")
   }
 
   val generateClusters: (Dataset[Row] => Dataset[Row]) = df => {
@@ -81,12 +86,15 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
 
     // Using SQL because GROUPING SETS are not available through Scala/Java DSL
     df_with_keys.sqlContext.sql(
-      ("SELECT coalesce(" + keys + ") as key, collect_sort_slice(" + fields + ") as block FROM " + tempTable + " WHERE coalesce(" + keys + ") IS NOT NULL GROUP BY GROUPING SETS (" + keys + ") HAVING size(block) > 1")
+      ("SELECT coalesce(" + keys + ") as key, sort_array(collect_sort_slice(" + fields + ")) as block FROM " + tempTable + " WHERE coalesce(" + keys + ") IS NOT NULL GROUP BY GROUPING SETS (" + keys + ") HAVING size(block) > 1")
     )
   }
 
   val generateClustersWithDFAPI: (Dataset[Row] => Dataset[Row]) = df => {
+
+    System.out.println(conf.getWf.getEntityType + "::" + conf.getWf.getSubEntityType)
+
     val df_with_filters = conf.getPace.getModel.asScala.foldLeft(df)((res, fdef) => {
       if (conf.blacklists.containsKey(fdef.getName)) {
         res.withColumn(
@@ -112,15 +120,74 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
           columns.add(new Column(fName))
       }
 
-      val ds: Dataset[Row] = df_with_filters.withColumn("key", functions.explode(clusterValuesUDF(cd).apply(functions.array(columns.asScala: _*))))
-        .select((Seq(rowDataType.fieldNames: _*) ++ Seq("key")).map(col): _*)
-        .groupByKey(r => r.getAs[String]("key"))(Encoders.STRING)
+      val tmp: Dataset[Row] = df_with_filters.withColumn("key", functions.explode(clusterValuesUDF(cd).apply(functions.array(columns.asScala: _*))))
+
+
+
+      /*.select((Seq(rowDataType.fieldNames: _*) ++ Seq("key")).map(col): _*)
+      .groupByKey(r => r.getAs[String]("key"))(Encoders.STRING)
         .agg(collectSortSliceAggregator.toColumn)
         .toDF("key", "block")
-        .select(col("block.block").as("block"))
+        .select(col("block.block").as("block"))*/
 
-      /*.groupBy("key")
-        .agg(collectSortSliceUDAF(rowDataType.fieldNames.map(col): _*).as("block"))*/
+      System.out.println(cd.getName)
+
+      val ds = tmp.groupBy("key")
+        // .agg(functions.sort_array(collectSortSliceUDAF(rowDataType.fieldNames.map(col): _*)).as("block"))
+        .agg(functions.collect_set(functions.struct(rowDataType.fieldNames.map(col): _*)).as("block"))
+        //.filter(functions.size(new Column("block")).geq(new Literal(2, DataTypes.IntegerType)))
+
+
+      //df_with_filters.printSchema()
+      //ds.printSchema()
+
+      if (relBlocks == null) relBlocks = ds
+      else relBlocks = relBlocks.union(ds)
+    }
+
+    // System.out.println()
+
+    relBlocks
+  }
+
+  val generateClustersWithWindows: (Dataset[Row] => Dataset[Row]) = df => {
+
+    val df_with_filters = conf.getPace.getModel.asScala.foldLeft(df)((res, fdef) => {
+      if (conf.blacklists.containsKey(fdef.getName)) {
+        res.withColumn(
+          fdef.getName + "_filtered",
+          filterColumnUDF(fdef).apply(new Column(fdef.getName))
+        )
+      } else {
+        res
+      }
+    })
+
+    var relBlocks: Dataset[Row] = null
+
+    import scala.collection.JavaConversions._
+
+    for (cd <- conf.clusterings()) {
+      System.out.println(conf.getWf.getEntityType + "::" + conf.getWf.getSubEntityType + ": " + cd.getName + " " + cd.toString)
+
+      val columns: util.List[Column] = new util.ArrayList[Column](cd.getFields().size)
+
+      for (fName <- cd.getFields()) {
+        if (conf.blacklists.containsKey(fName))
+          columns.add(new Column(fName + "_filtered"))
+        else
+          columns.add(new Column(fName))
+      }
+
+      // Add 'key' column with the value generated by the given clustering definition
+      val ds: Dataset[Row] = df_with_filters.withColumn("key", functions.explode(clusterValuesUDF(cd).apply(functions.array(columns.asScala: _*))))
+        // Add position column having the position of the row within the set of rows having the same key value ordered by the sorting value
+        .withColumn("position", functions.row_number().over(Window.partitionBy("key").orderBy(col(conf.getWf.getOrderField))))
+        // filter out rows with position exceeding the maxqueuesize parameter
+        .filter(col("position").leq(conf.getWf.getQueueMaxSize))
+        .groupBy("key")
+        .agg(functions.collect_set(functions.struct(rowDataType.fieldNames.map(col): _*)).as("block"))
         .filter(functions.size(new Column("block")).geq(new Literal(2, DataTypes.IntegerType)))
 
       if (relBlocks == null) relBlocks = ds
@@ -224,6 +291,67 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
     relBlocks
   }
 
+  val printAnalytics: (Dataset[Row] => Dataset[Row]) = df => {
+
+    val df_with_filters = conf.getPace.getModel.asScala.foldLeft(df)((res, fdef) => {
+      if (conf.blacklists.containsKey(fdef.getName)) {
+        res.withColumn(
+          fdef.getName + "_filtered",
+          filterColumnUDF(fdef).apply(new Column(fdef.getName))
+        )
+      } else {
+        res
+      }
+    })
+
+    var relBlocks: Dataset[Row] = null
+
+    import scala.collection.JavaConversions._
+
+    for (cd <- conf.clusterings()) {
+      val columns: util.List[Column] = new util.ArrayList[Column](cd.getFields().size)
+
+      for (fName <- cd.getFields()) {
+        if (conf.blacklists.containsKey(fName))
+          columns.add(new Column(fName + "_filtered"))
+        else
+          columns.add(new Column(fName))
+      }
+
+      // Add 'key' column with the value generated by the given clustering definition
+      val ds: Dataset[Row] = df_with_filters.withColumn("key", functions.explode(clusterValuesUDF(cd).apply(functions.array(columns.asScala: _*))))
+        // Add position column having the position of the row within the set of rows having the same key value ordered by the sorting value
+        .withColumn("position", functions.row_number().over(Window.partitionBy("key").orderBy(conf.getWf.getOrderField)))
+        // filter out rows with position exceeding the maxqueuesize parameter
+        .filter(col("position").lt(conf.getWf.getQueueMaxSize))
+
+
+      // inner join to compute all combination of rows to compare
+      // note the condition on position to obtain 'windowing': given a row this is compared at most with the next
+      // SlidingWindowSize rows following the sort order
+      val dsWithMatch = ds.as("l").join(ds.as("r"),
+        col("l.key").equalTo(col("r.key")),
+        "inner"
+      )
+        .filter((col("l.position").lt(col("r.position")))
+          && (col("r.position").lt(col("l.position").plus(lit(conf.getWf.getSlidingWindowSize)))))
+      // Add match column with the result of comparison
+      // dsWithMatch.show(false)
+
+      if (relBlocks == null)
+        relBlocks = dsWithMatch
+      else
+        relBlocks = relBlocks.union(dsWithMatch)
+    }
+
+    System.out.println(conf.getWf.getEntityType + "::" + conf.getWf.getSubEntityType)
+    System.out.println("Total number of comparations: " + relBlocks.count())
+
+
+    df
+  }
+
+
   val generateAndProcessClustersWithJoins: (Dataset[Row] => Dataset[Row]) = df => {
 
     val df_with_filters = conf.getPace.getModel.asScala.foldLeft(df)((res, fdef) => {
@@ -287,7 +415,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
 
     val res = relBlocks
     //.select(col("l.identifier").as("from"), col("r.identifier").as("to"))
-      .repartition()
+      //.repartition()
       .distinct()
 
     // res.show(false)
@@ -301,29 +429,48 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
     df.filter(functions.size(new Column("block")).geq(new Literal(2, DataTypes.IntegerType)))
       .withColumn("relations", processBlock(df.sqlContext.sparkContext).apply(new Column("block")))
      .select(functions.explode(new Column("relations")).as("relation"))
-      .repartition(new Column("relation"))
+      //.repartition(new Column("relation"))
       .dropDuplicates("relation")
   }
 
   val rowDataType: StructType = {
-    val unordered = conf.getPace.getModel.asScala.foldLeft(
-      new StructType()
-    )((resType, fdef) => {
-      resType.add(fdef.getType match {
-        case Type.List | Type.JSON =>
-          StructField(fdef.getName, DataTypes.createArrayType(DataTypes.StringType), true, Metadata.empty)
-        case Type.DoubleArray =>
-          StructField(fdef.getName, DataTypes.createArrayType(DataTypes.DoubleType), true, Metadata.empty)
-        case _ =>
-          StructField(fdef.getName, DataTypes.StringType, true, Metadata.empty)
-      })
-    })
+//    val unordered = conf.getPace.getModel.asScala.foldLeft(
+//      new StructType()
+//    )((resType, fdef) => {
+//      resType.add(fdef.getType match {
+//        case Type.List | Type.JSON =>
+//          StructField(fdef.getName, DataTypes.createArrayType(DataTypes.StringType), true, Metadata.empty)
+//        case Type.DoubleArray =>
+//          StructField(fdef.getName, DataTypes.createArrayType(DataTypes.DoubleType), true, Metadata.empty)
+//        case _ =>
+//          StructField(fdef.getName, DataTypes.StringType, true, Metadata.empty)
+//      })
+//    })
+//
+//    conf.getPace.getModel.asScala.filterNot(_.getName.equals(conf.getWf.getOrderField)).foldLeft(
+//      new StructType()
+//        .add(unordered(conf.getWf.getOrderField))
+//        .add(StructField("identifier", DataTypes.StringType, false, Metadata.empty))
+//    )((resType, fdef) => resType.add(unordered(fdef.getName)))
+
+    val identifier = new FieldDef()
+    identifier.setName("identifier")
+    identifier.setType(Type.String)
+
+    (conf.getPace.getModel.asScala ++ Seq(identifier)).sortBy(_.getName)
+      .foldLeft(
+        new StructType()
+      )((resType, fdef) => {
+        resType.add(fdef.getType match {
+          case Type.List | Type.JSON =>
+            StructField(fdef.getName, DataTypes.createArrayType(DataTypes.StringType), true, Metadata.empty)
+          case Type.DoubleArray =>
+            StructField(fdef.getName, DataTypes.createArrayType(DataTypes.DoubleType), true, Metadata.empty)
+          case _ =>
+            StructField(fdef.getName, DataTypes.StringType, true, Metadata.empty)
+        })
+      })
 
-    conf.getPace.getModel.asScala.filterNot(_.getName.equals(conf.getWf.getOrderField)).foldLeft(
-      new StructType()
-        .add(unordered(conf.getWf.getOrderField))
-        .add(StructField("identifier", DataTypes.StringType, false, Metadata.empty))
-    )((resType, fdef) => resType.add(unordered(fdef.getName)))
   }
 
@@ -332,7 +479,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
 
   val orderingFieldPosition: Int = rowDataType.fieldIndex(conf.getWf.getOrderField)
 
-  val rowFromJson = (json: String) => {
+  def rowFromJson(json: String) : Row = {
     val documentContext = JsonPath.using(Configuration.defaultConfiguration.addOptions(Option.SUPPRESS_EXCEPTIONS)).parse(json)
     val values = new Array[Any](rowDataType.size)
 
@@ -360,7 +507,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
           MapDocumentUtil.truncateList(
             MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType),
             fdef.getSize
-          )
+          ).toArray
 
         case Type.StringConcat =>
           val jpaths = CONCAT_REGEX.split(fdef.getPath)
@@ -384,7 +531,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
     new GenericRowWithSchema(values, rowDataType)
   }
 
-  val rowFromJsonUDF = udf(rowFromJson, rowDataType)
+  val rowFromJsonUDF = udf(rowFromJson(_), rowDataType)
 
   def filterColumnUDF(fdef: FieldDef): UserDefinedFunction = {
 
@@ -428,7 +575,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria
       new BlockProcessor(conf, identityFieldPosition, orderingFieldPosition).processSortedRows(mapDocuments, reporter)
 
       reporter.getRelations.asScala.toArray
-    })
+    }).asNondeterministic()
   }
 
   val collectSortSliceAggregator : Aggregator[Row,Seq[Row], Row] = new Aggregator[Row, Seq[Row], Row] () {
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DatePicker.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DatePicker.java
index 9d767c4d2..8ed3c68b2 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DatePicker.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DatePicker.java
@@ -5,14 +5,14 @@ import static java.util.Collections.reverseOrder;
 import static java.util.Map.Entry.comparingByValue;
 import static java.util.stream.Collectors.toMap;
 
-import static org.apache.commons.lang.StringUtils.endsWith;
-import static org.apache.commons.lang.StringUtils.substringBefore;
+import static org.apache.commons.lang3.StringUtils.endsWith;
+import static org.apache.commons.lang3.StringUtils.substringBefore;
 
 import java.time.Year;
 import java.util.*;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import eu.dnetlib.dhp.schema.oaf.Field;
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java
index 719bebfb6..9d8014dc9 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java
@@ -10,6 +10,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.*;
 import org.apache.spark.sql.catalyst.expressions.Literal;
 import org.apache.spark.sql.types.DataTypes;
@@ -99,7 +100,7 @@ public class SparkBlockStats extends AbstractSparkAction {
 			.transform(sparkConfig.generateClusters())
 			.filter(functions.size(new Column("block")).geq(new Literal(1, DataTypes.IntegerType)));
 
-		simRels.map(b -> {
+		simRels.map((MapFunction) b -> {
 			Collection documents = b.getList(1);
 
 			List mapDocuments = documents
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index cd914e2df..5bc658212 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -93,7 +93,7 @@ public class SparkCreateSimRels extends AbstractSparkAction {
 				.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
 				.transform(sparkConfig.modelExtractor()) // Extract fields from input json column according to model
 															// definition
-				.transform(sparkConfig.generateClustersWithDFAPIMerged()) // generate pairs according to
+				.transform(sparkConfig.generateClustersWithWindows()) // generate pairs according to
 															// filters, clusters, and model
 															// definition
 				.transform(sparkConfig.processClusters()) // process blocks and emits pairs of found
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
index 3e564052e..4a39a175d 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
@@ -6,7 +6,7 @@ import java.io.Serializable;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.codehaus.jackson.annotate.JsonIgnore;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java
index 461a1f947..c80d4d048 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java
@@ -22,7 +22,7 @@ class JsonPathTest {
 
 		final String org = IOUtils.toString(getClass().getResourceAsStream("organization.json"));
 
-		Row row = SparkDedupConfig.apply(conf, 1).rowFromJson().apply(org);
+		Row row = SparkDedupConfig.apply(conf, 1).rowFromJson(org);
 
 		Assertions.assertNotNull(row);
 		Assertions.assertTrue(StringUtils.isNotBlank(row.getAs("identifier")));
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala
index 9d57e5869..704c9ab5c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala
@@ -130,7 +130,7 @@ object SparkCreateInputGraph {
     val ds: Dataset[T] = spark.read.load(sourcePath).as[T]
 
     ds.groupByKey(_.getId)
-      .reduceGroups { (x, y) =>
+      .reduceGroups { (x: T, y: T) =>
         x.mergeFrom(y)
         x
       }
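
The SparkCreateSimRels change switches the pipeline from generateClustersWithDFAPIMerged() to the new generateClustersWithWindows(), which builds blocks with a window function (row_number partitioned by the clustering key, ordered by the configured order field, truncated at queueMaxSize), while printAnalytics counts candidate pairs through a position-bounded self-join. The following is a minimal, self-contained sketch of that sliding-window technique, not the patched class: the column names id, key and title and the two limits are invented for the example.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object SlidingWindowBlockingSketch {

  // Hypothetical limits standing in for wf.getQueueMaxSize and wf.getSlidingWindowSize
  val queueMaxSize = 100
  val slidingWindowSize = 20

  /** Rank rows inside each blocking key by the ordering field and keep at most queueMaxSize of them. */
  def rankWithinBlocks(df: DataFrame, orderField: String): DataFrame =
    df.withColumn("position", row_number().over(Window.partitionBy("key").orderBy(col(orderField))))
      .filter(col("position") <= queueMaxSize)

  /** Self-join on the blocking key, comparing each row only with the next slidingWindowSize rows. */
  def candidatePairs(ranked: DataFrame): DataFrame =
    ranked.as("l")
      .join(ranked.as("r"), col("l.key") === col("r.key"), "inner")
      .filter(
        col("l.position") < col("r.position") &&
          col("r.position") < col("l.position") + lit(slidingWindowSize)
      )
      .select(col("l.id").as("from"), col("r.id").as("to"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sliding-window-sketch").getOrCreate()
    import spark.implicits._

    // Toy input: "key" plays the role of the clustering value, "title" of the ordering field
    val docs = Seq(
      ("id1", "k1", "alpha"), ("id2", "k1", "beta"), ("id3", "k1", "gamma"), ("id4", "k2", "delta")
    ).toDF("id", "key", "title")

    candidatePairs(rankWithinBlocks(docs, "title")).show(false)
    spark.stop()
  }
}

Compared with collecting whole blocks via groupBy/collect_set, the self-join variant stays in DataFrame operations end to end and avoids materialising large blocks on single executors, which appears to be what the printAnalytics probe is measuring.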
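The rewritten rowDataType no longer builds an "unordered" schema and then reshuffles it around the order field; it appends a synthetic identifier field definition and sorts all fields by name, so the schema is deterministic regardless of how the configuration lists its fields, presumably so that RowEncoder(rowDataType) and the new rowFromJson agree on field positions. A minimal sketch of that idea, using a simplified stand-in for FieldDef rather than the real pace-core classes:

import org.apache.spark.sql.types._

object DeterministicSchemaSketch {

  // Simplified stand-ins for the pace-core FieldDef / Type, for illustration only
  sealed trait FieldKind
  case object PlainString extends FieldKind
  case object StringList extends FieldKind
  case object DoubleList extends FieldKind

  final case class SimpleFieldDef(name: String, kind: FieldKind)

  /** Build a schema whose field order depends only on field names, with "identifier" always present. */
  def rowSchema(model: Seq[SimpleFieldDef]): StructType =
    (model :+ SimpleFieldDef("identifier", PlainString))
      .sortBy(_.name)
      .foldLeft(new StructType()) { (schema, fdef) =>
        schema.add(fdef.kind match {
          case StringList  => StructField(fdef.name, ArrayType(StringType), nullable = true)
          case DoubleList  => StructField(fdef.name, ArrayType(DoubleType), nullable = true)
          case PlainString => StructField(fdef.name, StringType, nullable = true)
        })
      }

  def main(args: Array[String]): Unit = {
    val schema = rowSchema(Seq(SimpleFieldDef("title", PlainString), SimpleFieldDef("authors", StringList)))
    // Prints: authors, identifier, title -- the same ordering no matter how the model lists its fields
    println(schema.fieldNames.mkString(", "))
  }
}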