blacklist filtering moved before the cleanup phase in order to have case sensitive regex #485

Merged
claudio.atzori merged 1 commits from dedup_blacklist_fix into beta 2024-10-28 09:42:52 +01:00
8 changed files with 111 additions and 75 deletions

View File

@ -38,7 +38,7 @@ public class NumAuthorsTitleSuffixPrefixChain extends AbstractClusteringFunction
@Override
protected Collection<String> doApply(Config conf, String s) {
return suffixPrefixChain(cleanup(s), param("mod"));
return suffixPrefixChain(cleanup(s), paramOrDefault("mod", 10));
}
private Collection<String> suffixPrefixChain(String s, int mod) {

View File

@ -54,6 +54,22 @@ public class FieldDef implements Serializable {
public FieldDef() {
}
public FieldDef clone() {
FieldDef fieldDef = new FieldDef();
fieldDef.setName(this.name);
fieldDef.setPath(this.path);
fieldDef.setType(this.type);
fieldDef.setOverrideMatch(this.overrideMatch);
fieldDef.setSize(this.size);
fieldDef.setLength(this.length);
fieldDef.setFilter(this.filter);
fieldDef.setSorted(this.sorted);
fieldDef.setClean(this.clean);
fieldDef.setInfer(this.infer);
fieldDef.setInferenceFrom(this.inferenceFrom);
return fieldDef;
}
public String getInferenceFrom() {
return inferenceFrom;
}

View File

@ -19,48 +19,10 @@ case class SparkDeduper(conf: DedupConfig) extends Serializable {
val model: SparkModel = SparkModel(conf)
val dedup: (Dataset[Row] => Dataset[Row]) = df => {
df.transform(filterAndCleanup)
.transform(generateClustersWithCollect)
df.transform(generateClustersWithCollect)
.transform(processBlocks)
}
val filterAndCleanup: (Dataset[Row] => Dataset[Row]) = df => {
val df_with_filters = conf.getPace.getModel.asScala.foldLeft(df)((res, fdef) => {
if (conf.blacklists.containsKey(fdef.getName)) {
res.withColumn(
fdef.getName + "_filtered",
filterColumnUDF(fdef).apply(new Column(fdef.getName))
)
} else {
res
}
})
df_with_filters
}
def filterColumnUDF(fdef: FieldDef): UserDefinedFunction = {
val blacklist: Predicate[String] = conf.blacklists().get(fdef.getName)
if (blacklist == null) {
throw new IllegalArgumentException("Column: " + fdef.getName + " does not have any filter")
} else {
fdef.getType match {
case Type.List | Type.JSON =>
udf[Array[String], Array[String]](values => {
values.filter((v: String) => !blacklist.test(v))
})
case _ =>
udf[String, String](v => {
if (blacklist.test(v)) ""
else v
})
}
}
}
val generateClustersWithCollect: (Dataset[Row] => Dataset[Row]) = df_with_filters => {
var df_with_clustering_keys: Dataset[Row] = null

View File

@ -5,12 +5,12 @@ import eu.dnetlib.pace.common.AbstractPaceFunctions
import eu.dnetlib.pace.config.{DedupConfig, Type}
import eu.dnetlib.pace.util.{MapDocumentUtil, SparkCompatUtils}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import org.apache.spark.sql.{Dataset, Row}
import java.util.Locale
import java.util.function.Predicate
import java.util.regex.Pattern
import scala.collection.JavaConverters._
@ -29,8 +29,20 @@ case class SparkModel(conf: DedupConfig) {
identifier.setName(identifierFieldName)
identifier.setType(Type.String)
// create fields for blacklist
val filtered = conf.getPace.getModel.asScala.flatMap(fdef => {
if (conf.blacklists().containsKey(fdef.getName)) {
val fdef_filtered = fdef.clone()
fdef_filtered.setName(fdef.getName + "_filtered")
Seq(fdef, fdef_filtered)
}
else {
Seq(fdef)
}
})
// Construct a Spark StructType representing the schema of the model
(Seq(identifier) ++ conf.getPace.getModel.asScala)
(Seq(identifier) ++ filtered)
.foldLeft(
new StructType()
)((resType, fieldDef) => {
@ -44,7 +56,6 @@ case class SparkModel(conf: DedupConfig) {
})
})
}
val identityFieldPosition: Int = schema.fieldIndex(identifierFieldName)
@ -52,7 +63,8 @@ case class SparkModel(conf: DedupConfig) {
val orderingFieldPosition: Int = schema.fieldIndex(orderingFieldName)
val parseJsonDataset: (Dataset[String] => Dataset[Row]) = df => {
df.map(r => rowFromJson(r))(SparkCompatUtils.encoderFor(schema))
df
.map(r => rowFromJson(r))(SparkCompatUtils.encoderFor(schema))
}
def rowFromJson(json: String): Row = {
@ -64,41 +76,63 @@ case class SparkModel(conf: DedupConfig) {
schema.fieldNames.zipWithIndex.foldLeft(values) {
case ((res, (fname, index))) =>
val fdef = conf.getPace.getModelMap.get(fname)
val fdef = conf.getPace.getModelMap.get(fname.split("_filtered")(0))
if (fdef != null) {
res(index) = fdef.getType match {
case Type.String | Type.Int =>
MapDocumentUtil.truncateValue(
MapDocumentUtil.getJPathString(fdef.getPath, documentContext),
fdef.getLength
)
if (!fname.contains("_filtered")) { //process fields with no blacklist
res(index) = fdef.getType match {
case Type.String | Type.Int =>
MapDocumentUtil.truncateValue(
MapDocumentUtil.getJPathString(fdef.getPath, documentContext),
fdef.getLength
)
case Type.URL =>
var uv = MapDocumentUtil.getJPathString(fdef.getPath, documentContext)
if (!URL_REGEX.matcher(uv).matches)
uv = ""
uv
case Type.URL =>
var uv = MapDocumentUtil.getJPathString(fdef.getPath, documentContext)
if (!URL_REGEX.matcher(uv).matches)
uv = ""
uv
case Type.List | Type.JSON =>
MapDocumentUtil.truncateList(
MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType),
fdef.getSize
).asScala
case Type.List | Type.JSON =>
MapDocumentUtil.truncateList(
MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType),
fdef.getSize
).asScala
case Type.StringConcat =>
val jpaths = CONCAT_REGEX.split(fdef.getPath)
case Type.StringConcat =>
val jpaths = CONCAT_REGEX.split(fdef.getPath)
MapDocumentUtil.truncateValue(
jpaths
.map(jpath => MapDocumentUtil.getJPathString(jpath, documentContext))
.mkString(" "),
fdef.getLength
)
MapDocumentUtil.truncateValue(
jpaths
.map(jpath => MapDocumentUtil.getJPathString(jpath, documentContext))
.mkString(" "),
fdef.getLength
)
case Type.DoubleArray =>
MapDocumentUtil.getJPathArray(fdef.getPath, json)
case Type.DoubleArray =>
MapDocumentUtil.getJPathArray(fdef.getPath, json)
}
}
else { //process fields with blacklist
val blacklist: Predicate[String] = conf.blacklists().get(fdef.getName)
res(index) = fdef.getType match {
case Type.List | Type.JSON =>
MapDocumentUtil.truncateList(
MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType),
fdef.getSize
).asScala.filter((v: String) => !blacklist.test(v))
case _ =>
val value: String = MapDocumentUtil.truncateValue(
MapDocumentUtil.getJPathString(fdef.getPath, documentContext),
fdef.getLength
)
if (blacklist.test(value)) "" else value
}
}
val filter = fdef.getFilter
@ -125,13 +159,12 @@ case class SparkModel(conf: DedupConfig) {
}
if (StringUtils.isNotBlank(fdef.getInfer)) {
val inferFrom : String = if (StringUtils.isNotBlank(fdef.getInferenceFrom)) fdef.getInferenceFrom else fdef.getPath
val inferFrom: String = if (StringUtils.isNotBlank(fdef.getInferenceFrom)) fdef.getInferenceFrom else fdef.getPath
res(index) = res(index) match {
case x: Seq[String] => x.map(inference(_, MapDocumentUtil.getJPathString(inferFrom, documentContext), fdef.getInfer))
case _ => inference(res(index).toString, MapDocumentUtil.getJPathString(inferFrom, documentContext), fdef.getInfer)
}
}
}
res
@ -139,6 +172,7 @@ case class SparkModel(conf: DedupConfig) {
}
new GenericRowWithSchema(values, schema)
}
def clean(value: String, cleantype: String) : String = {

View File

@ -227,4 +227,17 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
System.out.println(cf.apply(conf, Lists.newArrayList(s)));
}
@Test
public void testNumAuthorsTitleSuffixPrefixChain() {
final ClusteringFunction cf = new NumAuthorsTitleSuffixPrefixChain(params);
params.put("mod", 10);
final String title = "PARP-2 Regulates SIRT1 Expression and Whole-Body Energy Expenditure";
final String num_authors = "10";
System.out.println("title = " + title);
System.out.println("num_authors = " + num_authors);
System.out.println(cf.apply(conf, Lists.newArrayList(num_authors, title)));
}
}

View File

@ -327,4 +327,16 @@ public class ComparatorTest extends AbstractPaceTest {
}
@Test
public void titleVersionMatchTest() {
TitleVersionMatch titleVersionMatch = new TitleVersionMatch(params);
double result = titleVersionMatch
.compare(
"parp 2 regulates sirt 1 expression and whole body energy expenditure",
"parp 2 regulates sirt 1 expression and whole body energy expenditure", conf);
assertEquals(1.0, result);
}
}

View File

@ -91,7 +91,6 @@ public class SparkBlockStats extends AbstractSparkAction {
.read()
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
.transform(deduper.model().parseJsonDataset())
.transform(deduper.filterAndCleanup())
.transform(deduper.generateClustersWithCollect())
.filter(functions.size(new Column("block")).geq(1));

View File

@ -96,7 +96,7 @@
"aggregation": "MAX",
"positive": "layer4",
"negative": "NO_MATCH",
"undefined": "MATCH",
"undefined": "layer4",
"ignoreUndefined": "true"
},
"layer4": {