diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NumAuthorsTitleSuffixPrefixChain.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NumAuthorsTitleSuffixPrefixChain.java index f1d1e17b90..4e6d8231fc 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NumAuthorsTitleSuffixPrefixChain.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NumAuthorsTitleSuffixPrefixChain.java @@ -38,7 +38,7 @@ public class NumAuthorsTitleSuffixPrefixChain extends AbstractClusteringFunction @Override protected Collection doApply(Config conf, String s) { - return suffixPrefixChain(cleanup(s), param("mod")); + return suffixPrefixChain(cleanup(s), paramOrDefault("mod", 10)); } private Collection suffixPrefixChain(String s, int mod) { diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/FieldDef.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/FieldDef.java index b0dc116566..2e329f6901 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/FieldDef.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/FieldDef.java @@ -54,6 +54,22 @@ public class FieldDef implements Serializable { public FieldDef() { } + public FieldDef clone() { + FieldDef fieldDef = new FieldDef(); + fieldDef.setName(this.name); + fieldDef.setPath(this.path); + fieldDef.setType(this.type); + fieldDef.setOverrideMatch(this.overrideMatch); + fieldDef.setSize(this.size); + fieldDef.setLength(this.length); + fieldDef.setFilter(this.filter); + fieldDef.setSorted(this.sorted); + fieldDef.setClean(this.clean); + fieldDef.setInfer(this.infer); + fieldDef.setInferenceFrom(this.inferenceFrom); + return fieldDef; + } + public String getInferenceFrom() { return inferenceFrom; } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDeduper.scala b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDeduper.scala index bc702b9e2d..a3eb3cba8e 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDeduper.scala +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDeduper.scala @@ -19,48 +19,10 @@ case class SparkDeduper(conf: DedupConfig) extends Serializable { val model: SparkModel = SparkModel(conf) val dedup: (Dataset[Row] => Dataset[Row]) = df => { - df.transform(filterAndCleanup) - .transform(generateClustersWithCollect) + df.transform(generateClustersWithCollect) .transform(processBlocks) } - - val filterAndCleanup: (Dataset[Row] => Dataset[Row]) = df => { - val df_with_filters = conf.getPace.getModel.asScala.foldLeft(df)((res, fdef) => { - if (conf.blacklists.containsKey(fdef.getName)) { - res.withColumn( - fdef.getName + "_filtered", - filterColumnUDF(fdef).apply(new Column(fdef.getName)) - ) - } else { - res - } - }) - - df_with_filters - } - - def filterColumnUDF(fdef: FieldDef): UserDefinedFunction = { - val blacklist: Predicate[String] = conf.blacklists().get(fdef.getName) - - if (blacklist == null) { - throw new IllegalArgumentException("Column: " + fdef.getName + " does not have any filter") - } else { - fdef.getType match { - case Type.List | Type.JSON => - udf[Array[String], Array[String]](values => { - values.filter((v: String) => !blacklist.test(v)) - }) - - case _ => - udf[String, String](v => { - if (blacklist.test(v)) "" - else v - }) - } - } - } - val generateClustersWithCollect: (Dataset[Row] => Dataset[Row]) = df_with_filters => { var df_with_clustering_keys: Dataset[Row] = null diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala index c6db623398..580a88b7ef 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala @@ -5,12 +5,12 @@ import eu.dnetlib.pace.common.AbstractPaceFunctions import eu.dnetlib.pace.config.{DedupConfig, Type} import eu.dnetlib.pace.util.{MapDocumentUtil, SparkCompatUtils} import org.apache.commons.lang3.StringUtils -import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} import org.apache.spark.sql.{Dataset, Row} import java.util.Locale +import java.util.function.Predicate import java.util.regex.Pattern import scala.collection.JavaConverters._ @@ -29,8 +29,20 @@ case class SparkModel(conf: DedupConfig) { identifier.setName(identifierFieldName) identifier.setType(Type.String) + // create fields for blacklist + val filtered = conf.getPace.getModel.asScala.flatMap(fdef => { + if (conf.blacklists().containsKey(fdef.getName)) { + val fdef_filtered = fdef.clone() + fdef_filtered.setName(fdef.getName + "_filtered") + Seq(fdef, fdef_filtered) + } + else { + Seq(fdef) + } + }) + // Construct a Spark StructType representing the schema of the model - (Seq(identifier) ++ conf.getPace.getModel.asScala) + (Seq(identifier) ++ filtered) .foldLeft( new StructType() )((resType, fieldDef) => { @@ -44,7 +56,6 @@ case class SparkModel(conf: DedupConfig) { }) }) - } val identityFieldPosition: Int = schema.fieldIndex(identifierFieldName) @@ -52,7 +63,8 @@ case class SparkModel(conf: DedupConfig) { val orderingFieldPosition: Int = schema.fieldIndex(orderingFieldName) val parseJsonDataset: (Dataset[String] => Dataset[Row]) = df => { - df.map(r => rowFromJson(r))(SparkCompatUtils.encoderFor(schema)) + df + .map(r => rowFromJson(r))(SparkCompatUtils.encoderFor(schema)) } def rowFromJson(json: String): Row = { @@ -64,41 +76,63 @@ case class SparkModel(conf: DedupConfig) { schema.fieldNames.zipWithIndex.foldLeft(values) { case ((res, (fname, index))) => - val fdef = conf.getPace.getModelMap.get(fname) + + val fdef = conf.getPace.getModelMap.get(fname.split("_filtered")(0)) if (fdef != null) { - res(index) = fdef.getType match { - case Type.String | Type.Int => - MapDocumentUtil.truncateValue( - MapDocumentUtil.getJPathString(fdef.getPath, documentContext), - fdef.getLength - ) + if (!fname.contains("_filtered")) { //process fields with no blacklist + res(index) = fdef.getType match { + case Type.String | Type.Int => + MapDocumentUtil.truncateValue( + MapDocumentUtil.getJPathString(fdef.getPath, documentContext), + fdef.getLength + ) - case Type.URL => - var uv = MapDocumentUtil.getJPathString(fdef.getPath, documentContext) - if (!URL_REGEX.matcher(uv).matches) - uv = "" - uv + case Type.URL => + var uv = MapDocumentUtil.getJPathString(fdef.getPath, documentContext) + if (!URL_REGEX.matcher(uv).matches) + uv = "" + uv - case Type.List | Type.JSON => - MapDocumentUtil.truncateList( - MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType), - fdef.getSize - ).asScala + case Type.List | Type.JSON => + MapDocumentUtil.truncateList( + MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType), + fdef.getSize + ).asScala - case Type.StringConcat => - val jpaths = CONCAT_REGEX.split(fdef.getPath) + case Type.StringConcat => + val jpaths = CONCAT_REGEX.split(fdef.getPath) - MapDocumentUtil.truncateValue( - jpaths - .map(jpath => MapDocumentUtil.getJPathString(jpath, documentContext)) - .mkString(" "), - fdef.getLength - ) + MapDocumentUtil.truncateValue( + jpaths + .map(jpath => MapDocumentUtil.getJPathString(jpath, documentContext)) + .mkString(" "), + fdef.getLength + ) - case Type.DoubleArray => - MapDocumentUtil.getJPathArray(fdef.getPath, json) + case Type.DoubleArray => + MapDocumentUtil.getJPathArray(fdef.getPath, json) + } } + else { //process fields with blacklist + val blacklist: Predicate[String] = conf.blacklists().get(fdef.getName) + + res(index) = fdef.getType match { + case Type.List | Type.JSON => + MapDocumentUtil.truncateList( + MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType), + fdef.getSize + ).asScala.filter((v: String) => !blacklist.test(v)) + + case _ => + val value: String = MapDocumentUtil.truncateValue( + MapDocumentUtil.getJPathString(fdef.getPath, documentContext), + fdef.getLength + ) + if (blacklist.test(value)) "" else value + } + } + val filter = fdef.getFilter @@ -125,13 +159,12 @@ case class SparkModel(conf: DedupConfig) { } if (StringUtils.isNotBlank(fdef.getInfer)) { - val inferFrom : String = if (StringUtils.isNotBlank(fdef.getInferenceFrom)) fdef.getInferenceFrom else fdef.getPath + val inferFrom: String = if (StringUtils.isNotBlank(fdef.getInferenceFrom)) fdef.getInferenceFrom else fdef.getPath res(index) = res(index) match { case x: Seq[String] => x.map(inference(_, MapDocumentUtil.getJPathString(inferFrom, documentContext), fdef.getInfer)) case _ => inference(res(index).toString, MapDocumentUtil.getJPathString(inferFrom, documentContext), fdef.getInfer) } } - } res @@ -139,6 +172,7 @@ case class SparkModel(conf: DedupConfig) { } new GenericRowWithSchema(values, schema) + } def clean(value: String, cleantype: String) : String = { diff --git a/dhp-pace-core/src/test/java/eu/dnetlib/pace/clustering/ClusteringFunctionTest.java b/dhp-pace-core/src/test/java/eu/dnetlib/pace/clustering/ClusteringFunctionTest.java index e62f742f8a..236f17ecaf 100644 --- a/dhp-pace-core/src/test/java/eu/dnetlib/pace/clustering/ClusteringFunctionTest.java +++ b/dhp-pace-core/src/test/java/eu/dnetlib/pace/clustering/ClusteringFunctionTest.java @@ -227,4 +227,17 @@ public class ClusteringFunctionTest extends AbstractPaceTest { System.out.println(cf.apply(conf, Lists.newArrayList(s))); } + @Test + public void testNumAuthorsTitleSuffixPrefixChain() { + + final ClusteringFunction cf = new NumAuthorsTitleSuffixPrefixChain(params); + params.put("mod", 10); + + final String title = "PARP-2 Regulates SIRT1 Expression and Whole-Body Energy Expenditure"; + final String num_authors = "10"; + System.out.println("title = " + title); + System.out.println("num_authors = " + num_authors); + System.out.println(cf.apply(conf, Lists.newArrayList(num_authors, title))); + } + } diff --git a/dhp-pace-core/src/test/java/eu/dnetlib/pace/comparators/ComparatorTest.java b/dhp-pace-core/src/test/java/eu/dnetlib/pace/comparators/ComparatorTest.java index c008902c4d..d2e83e6955 100644 --- a/dhp-pace-core/src/test/java/eu/dnetlib/pace/comparators/ComparatorTest.java +++ b/dhp-pace-core/src/test/java/eu/dnetlib/pace/comparators/ComparatorTest.java @@ -327,4 +327,16 @@ public class ComparatorTest extends AbstractPaceTest { } + @Test + public void titleVersionMatchTest() { + + TitleVersionMatch titleVersionMatch = new TitleVersionMatch(params); + + double result = titleVersionMatch + .compare( + "parp 2 regulates sirt 1 expression and whole body energy expenditure", + "parp 2 regulates sirt 1 expression and whole body energy expenditure", conf); + assertEquals(1.0, result); + } + } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java index 3e5215d420..612a1cb193 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java @@ -91,7 +91,6 @@ public class SparkBlockStats extends AbstractSparkAction { .read() .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) .transform(deduper.model().parseJsonDataset()) - .transform(deduper.filterAndCleanup()) .transform(deduper.generateClustersWithCollect()) .filter(functions.size(new Column("block")).geq(1)); diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json index c3a769874a..c5ff1c1fa5 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json @@ -96,7 +96,7 @@ "aggregation": "MAX", "positive": "layer4", "negative": "NO_MATCH", - "undefined": "MATCH", + "undefined": "layer4", "ignoreUndefined": "true" }, "layer4": {