forked from D-Net/dnet-hadoop
Merge pull request 'blacklist filtering moved before the cleanup phase in order to have case sensitive regex' (#485) from dedup_blacklist_fix into beta
Reviewed-on: D-Net/dnet-hadoop#485
This commit is contained in:
commit
67e37f41fb
|
@ -38,7 +38,7 @@ public class NumAuthorsTitleSuffixPrefixChain extends AbstractClusteringFunction
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Collection<String> doApply(Config conf, String s) {
|
protected Collection<String> doApply(Config conf, String s) {
|
||||||
return suffixPrefixChain(cleanup(s), param("mod"));
|
return suffixPrefixChain(cleanup(s), paramOrDefault("mod", 10));
|
||||||
}
|
}
|
||||||
|
|
||||||
private Collection<String> suffixPrefixChain(String s, int mod) {
|
private Collection<String> suffixPrefixChain(String s, int mod) {
|
||||||
|
|
|
@ -54,6 +54,22 @@ public class FieldDef implements Serializable {
|
||||||
public FieldDef() {
|
public FieldDef() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public FieldDef clone() {
|
||||||
|
FieldDef fieldDef = new FieldDef();
|
||||||
|
fieldDef.setName(this.name);
|
||||||
|
fieldDef.setPath(this.path);
|
||||||
|
fieldDef.setType(this.type);
|
||||||
|
fieldDef.setOverrideMatch(this.overrideMatch);
|
||||||
|
fieldDef.setSize(this.size);
|
||||||
|
fieldDef.setLength(this.length);
|
||||||
|
fieldDef.setFilter(this.filter);
|
||||||
|
fieldDef.setSorted(this.sorted);
|
||||||
|
fieldDef.setClean(this.clean);
|
||||||
|
fieldDef.setInfer(this.infer);
|
||||||
|
fieldDef.setInferenceFrom(this.inferenceFrom);
|
||||||
|
return fieldDef;
|
||||||
|
}
|
||||||
|
|
||||||
public String getInferenceFrom() {
|
public String getInferenceFrom() {
|
||||||
return inferenceFrom;
|
return inferenceFrom;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,48 +19,10 @@ case class SparkDeduper(conf: DedupConfig) extends Serializable {
|
||||||
val model: SparkModel = SparkModel(conf)
|
val model: SparkModel = SparkModel(conf)
|
||||||
|
|
||||||
val dedup: (Dataset[Row] => Dataset[Row]) = df => {
|
val dedup: (Dataset[Row] => Dataset[Row]) = df => {
|
||||||
df.transform(filterAndCleanup)
|
df.transform(generateClustersWithCollect)
|
||||||
.transform(generateClustersWithCollect)
|
|
||||||
.transform(processBlocks)
|
.transform(processBlocks)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
val filterAndCleanup: (Dataset[Row] => Dataset[Row]) = df => {
|
|
||||||
val df_with_filters = conf.getPace.getModel.asScala.foldLeft(df)((res, fdef) => {
|
|
||||||
if (conf.blacklists.containsKey(fdef.getName)) {
|
|
||||||
res.withColumn(
|
|
||||||
fdef.getName + "_filtered",
|
|
||||||
filterColumnUDF(fdef).apply(new Column(fdef.getName))
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
res
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
df_with_filters
|
|
||||||
}
|
|
||||||
|
|
||||||
def filterColumnUDF(fdef: FieldDef): UserDefinedFunction = {
|
|
||||||
val blacklist: Predicate[String] = conf.blacklists().get(fdef.getName)
|
|
||||||
|
|
||||||
if (blacklist == null) {
|
|
||||||
throw new IllegalArgumentException("Column: " + fdef.getName + " does not have any filter")
|
|
||||||
} else {
|
|
||||||
fdef.getType match {
|
|
||||||
case Type.List | Type.JSON =>
|
|
||||||
udf[Array[String], Array[String]](values => {
|
|
||||||
values.filter((v: String) => !blacklist.test(v))
|
|
||||||
})
|
|
||||||
|
|
||||||
case _ =>
|
|
||||||
udf[String, String](v => {
|
|
||||||
if (blacklist.test(v)) ""
|
|
||||||
else v
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
val generateClustersWithCollect: (Dataset[Row] => Dataset[Row]) = df_with_filters => {
|
val generateClustersWithCollect: (Dataset[Row] => Dataset[Row]) = df_with_filters => {
|
||||||
var df_with_clustering_keys: Dataset[Row] = null
|
var df_with_clustering_keys: Dataset[Row] = null
|
||||||
|
|
||||||
|
|
|
@ -5,12 +5,12 @@ import eu.dnetlib.pace.common.AbstractPaceFunctions
|
||||||
import eu.dnetlib.pace.config.{DedupConfig, Type}
|
import eu.dnetlib.pace.config.{DedupConfig, Type}
|
||||||
import eu.dnetlib.pace.util.{MapDocumentUtil, SparkCompatUtils}
|
import eu.dnetlib.pace.util.{MapDocumentUtil, SparkCompatUtils}
|
||||||
import org.apache.commons.lang3.StringUtils
|
import org.apache.commons.lang3.StringUtils
|
||||||
import org.apache.spark.sql.catalyst.encoders.RowEncoder
|
|
||||||
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
|
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
|
||||||
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
|
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
|
||||||
import org.apache.spark.sql.{Dataset, Row}
|
import org.apache.spark.sql.{Dataset, Row}
|
||||||
|
|
||||||
import java.util.Locale
|
import java.util.Locale
|
||||||
|
import java.util.function.Predicate
|
||||||
import java.util.regex.Pattern
|
import java.util.regex.Pattern
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
|
@ -29,8 +29,20 @@ case class SparkModel(conf: DedupConfig) {
|
||||||
identifier.setName(identifierFieldName)
|
identifier.setName(identifierFieldName)
|
||||||
identifier.setType(Type.String)
|
identifier.setType(Type.String)
|
||||||
|
|
||||||
|
// create fields for blacklist
|
||||||
|
val filtered = conf.getPace.getModel.asScala.flatMap(fdef => {
|
||||||
|
if (conf.blacklists().containsKey(fdef.getName)) {
|
||||||
|
val fdef_filtered = fdef.clone()
|
||||||
|
fdef_filtered.setName(fdef.getName + "_filtered")
|
||||||
|
Seq(fdef, fdef_filtered)
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Seq(fdef)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
// Construct a Spark StructType representing the schema of the model
|
// Construct a Spark StructType representing the schema of the model
|
||||||
(Seq(identifier) ++ conf.getPace.getModel.asScala)
|
(Seq(identifier) ++ filtered)
|
||||||
.foldLeft(
|
.foldLeft(
|
||||||
new StructType()
|
new StructType()
|
||||||
)((resType, fieldDef) => {
|
)((resType, fieldDef) => {
|
||||||
|
@ -44,7 +56,6 @@ case class SparkModel(conf: DedupConfig) {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
val identityFieldPosition: Int = schema.fieldIndex(identifierFieldName)
|
val identityFieldPosition: Int = schema.fieldIndex(identifierFieldName)
|
||||||
|
@ -52,7 +63,8 @@ case class SparkModel(conf: DedupConfig) {
|
||||||
val orderingFieldPosition: Int = schema.fieldIndex(orderingFieldName)
|
val orderingFieldPosition: Int = schema.fieldIndex(orderingFieldName)
|
||||||
|
|
||||||
val parseJsonDataset: (Dataset[String] => Dataset[Row]) = df => {
|
val parseJsonDataset: (Dataset[String] => Dataset[Row]) = df => {
|
||||||
df.map(r => rowFromJson(r))(SparkCompatUtils.encoderFor(schema))
|
df
|
||||||
|
.map(r => rowFromJson(r))(SparkCompatUtils.encoderFor(schema))
|
||||||
}
|
}
|
||||||
|
|
||||||
def rowFromJson(json: String): Row = {
|
def rowFromJson(json: String): Row = {
|
||||||
|
@ -64,9 +76,11 @@ case class SparkModel(conf: DedupConfig) {
|
||||||
|
|
||||||
schema.fieldNames.zipWithIndex.foldLeft(values) {
|
schema.fieldNames.zipWithIndex.foldLeft(values) {
|
||||||
case ((res, (fname, index))) =>
|
case ((res, (fname, index))) =>
|
||||||
val fdef = conf.getPace.getModelMap.get(fname)
|
|
||||||
|
val fdef = conf.getPace.getModelMap.get(fname.split("_filtered")(0))
|
||||||
|
|
||||||
if (fdef != null) {
|
if (fdef != null) {
|
||||||
|
if (!fname.contains("_filtered")) { //process fields with no blacklist
|
||||||
res(index) = fdef.getType match {
|
res(index) = fdef.getType match {
|
||||||
case Type.String | Type.Int =>
|
case Type.String | Type.Int =>
|
||||||
MapDocumentUtil.truncateValue(
|
MapDocumentUtil.truncateValue(
|
||||||
|
@ -99,6 +113,26 @@ case class SparkModel(conf: DedupConfig) {
|
||||||
case Type.DoubleArray =>
|
case Type.DoubleArray =>
|
||||||
MapDocumentUtil.getJPathArray(fdef.getPath, json)
|
MapDocumentUtil.getJPathArray(fdef.getPath, json)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
else { //process fields with blacklist
|
||||||
|
val blacklist: Predicate[String] = conf.blacklists().get(fdef.getName)
|
||||||
|
|
||||||
|
res(index) = fdef.getType match {
|
||||||
|
case Type.List | Type.JSON =>
|
||||||
|
MapDocumentUtil.truncateList(
|
||||||
|
MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType),
|
||||||
|
fdef.getSize
|
||||||
|
).asScala.filter((v: String) => !blacklist.test(v))
|
||||||
|
|
||||||
|
case _ =>
|
||||||
|
val value: String = MapDocumentUtil.truncateValue(
|
||||||
|
MapDocumentUtil.getJPathString(fdef.getPath, documentContext),
|
||||||
|
fdef.getLength
|
||||||
|
)
|
||||||
|
if (blacklist.test(value)) "" else value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
val filter = fdef.getFilter
|
val filter = fdef.getFilter
|
||||||
|
|
||||||
|
@ -131,7 +165,6 @@ case class SparkModel(conf: DedupConfig) {
|
||||||
case _ => inference(res(index).toString, MapDocumentUtil.getJPathString(inferFrom, documentContext), fdef.getInfer)
|
case _ => inference(res(index).toString, MapDocumentUtil.getJPathString(inferFrom, documentContext), fdef.getInfer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
res
|
res
|
||||||
|
@ -139,6 +172,7 @@ case class SparkModel(conf: DedupConfig) {
|
||||||
}
|
}
|
||||||
|
|
||||||
new GenericRowWithSchema(values, schema)
|
new GenericRowWithSchema(values, schema)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def clean(value: String, cleantype: String) : String = {
|
def clean(value: String, cleantype: String) : String = {
|
||||||
|
|
|
@ -227,4 +227,17 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
|
||||||
System.out.println(cf.apply(conf, Lists.newArrayList(s)));
|
System.out.println(cf.apply(conf, Lists.newArrayList(s)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNumAuthorsTitleSuffixPrefixChain() {
|
||||||
|
|
||||||
|
final ClusteringFunction cf = new NumAuthorsTitleSuffixPrefixChain(params);
|
||||||
|
params.put("mod", 10);
|
||||||
|
|
||||||
|
final String title = "PARP-2 Regulates SIRT1 Expression and Whole-Body Energy Expenditure";
|
||||||
|
final String num_authors = "10";
|
||||||
|
System.out.println("title = " + title);
|
||||||
|
System.out.println("num_authors = " + num_authors);
|
||||||
|
System.out.println(cf.apply(conf, Lists.newArrayList(num_authors, title)));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -327,4 +327,16 @@ public class ComparatorTest extends AbstractPaceTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void titleVersionMatchTest() {
|
||||||
|
|
||||||
|
TitleVersionMatch titleVersionMatch = new TitleVersionMatch(params);
|
||||||
|
|
||||||
|
double result = titleVersionMatch
|
||||||
|
.compare(
|
||||||
|
"parp 2 regulates sirt 1 expression and whole body energy expenditure",
|
||||||
|
"parp 2 regulates sirt 1 expression and whole body energy expenditure", conf);
|
||||||
|
assertEquals(1.0, result);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,7 +91,6 @@ public class SparkBlockStats extends AbstractSparkAction {
|
||||||
.read()
|
.read()
|
||||||
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
|
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
|
||||||
.transform(deduper.model().parseJsonDataset())
|
.transform(deduper.model().parseJsonDataset())
|
||||||
.transform(deduper.filterAndCleanup())
|
|
||||||
.transform(deduper.generateClustersWithCollect())
|
.transform(deduper.generateClustersWithCollect())
|
||||||
.filter(functions.size(new Column("block")).geq(1));
|
.filter(functions.size(new Column("block")).geq(1));
|
||||||
|
|
||||||
|
|
|
@ -96,7 +96,7 @@
|
||||||
"aggregation": "MAX",
|
"aggregation": "MAX",
|
||||||
"positive": "layer4",
|
"positive": "layer4",
|
||||||
"negative": "NO_MATCH",
|
"negative": "NO_MATCH",
|
||||||
"undefined": "MATCH",
|
"undefined": "layer4",
|
||||||
"ignoreUndefined": "true"
|
"ignoreUndefined": "true"
|
||||||
},
|
},
|
||||||
"layer4": {
|
"layer4": {
|
||||||
|
|
Loading…
Reference in New Issue