Improved deduplication

This commit is contained in:
Sandro La Bruzzo 2019-12-05 14:14:25 +01:00
parent 075f741d28
commit 16c670a5d5
14 changed files with 392 additions and 48 deletions

View File

@ -6,7 +6,7 @@
<parent>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-dedup</artifactId>
<version>3.0.16-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@ -38,10 +38,6 @@
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
<groupId>com.googlecode.protobuf-java-format</groupId>
<artifactId>protobuf-java-format</artifactId>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>stringtemplate</artifactId>
@ -65,16 +61,23 @@
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
<dependency>
<groupId>com.arakelian</groupId>
<artifactId>java-jq</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,25 +1,23 @@
package eu.dnetlib.pace.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import eu.dnetlib.pace.condition.ConditionAlgo;
import eu.dnetlib.pace.model.ClusteringDef;
import eu.dnetlib.pace.model.FieldDef;
import eu.dnetlib.pace.util.PaceException;
import org.antlr.stringtemplate.StringTemplate;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.BiFunction;
import eu.dnetlib.pace.util.PaceException;
import org.antlr.stringtemplate.StringTemplate;
import org.apache.commons.io.IOUtils;
import com.google.common.collect.Maps;
import eu.dnetlib.pace.condition.ConditionAlgo;
import eu.dnetlib.pace.model.ClusteringDef;
import eu.dnetlib.pace.model.FieldDef;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.map.ObjectMapper;
public class DedupConfig implements Config, Serializable {

View File

@ -1,15 +1,14 @@
package eu.dnetlib.pace.config;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import eu.dnetlib.pace.common.AbstractPaceFunctions;
import eu.dnetlib.pace.condition.ConditionAlgo;
import eu.dnetlib.pace.model.ClusteringDef;
import eu.dnetlib.pace.model.CondDef;
import eu.dnetlib.pace.model.FieldDef;
import eu.dnetlib.pace.util.PaceResolver;
import org.apache.commons.collections.CollectionUtils;
import org.codehaus.jackson.annotate.JsonIgnore;
import java.io.Serializable;
import java.text.Normalizer;

View File

@ -1,17 +1,17 @@
package eu.dnetlib.pace.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import eu.dnetlib.pace.util.PaceException;
import org.apache.commons.lang.StringUtils;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.GsonBuilder;
import eu.dnetlib.pace.util.PaceException;
import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
public class WfConfig implements Serializable {
@ -76,6 +76,9 @@ public class WfConfig implements Serializable {
/** Maximum number of allowed children. */
private int maxChildren = MAX_CHILDREN;
/** The jq (or JsonPath) expression used to retrieve the identifier */
private String idPath = ".id";
public WfConfig() {}
/**
@ -245,6 +248,14 @@ public class WfConfig implements Serializable {
this.maxChildren = maxChildren;
}
/**
 * Returns the path expression used to extract the identifier from a record.
 *
 * @return the configured identifier path
 */
public String getIdPath() {
    return this.idPath;
}
/**
 * Sets the path expression used to extract the identifier from a record.
 *
 * @param idPath the identifier path to store
 */
public void setIdPath(final String idPath) {
    this.idPath = idPath;
}
/*
* (non-Javadoc)
*

View File

@ -1,7 +1,8 @@
package eu.dnetlib.pace.distance.eval;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.pace.util.PaceException;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;

View File

@ -1,19 +1,15 @@
package eu.dnetlib.pace.model;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.pace.clustering.ClusteringFunction;
import eu.dnetlib.pace.config.PaceConfig;
import eu.dnetlib.pace.util.PaceException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import eu.dnetlib.pace.clustering.*;
import eu.dnetlib.pace.config.PaceConfig;
import eu.dnetlib.pace.util.PaceException;
import eu.dnetlib.pace.util.PaceResolver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.map.ObjectMapper;
public class ClusteringDef implements Serializable {

View File

@ -4,11 +4,12 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.pace.condition.*;
import eu.dnetlib.pace.config.PaceConfig;
import eu.dnetlib.pace.util.PaceException;
import eu.dnetlib.pace.util.PaceResolver;
import org.codehaus.jackson.map.ObjectMapper;
public class CondDef implements Serializable {

View File

@ -1,7 +1,8 @@
package eu.dnetlib.pace.model;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.pace.util.PaceException;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;

View File

@ -36,6 +36,18 @@ public class BlockProcessor {
this.dedupConf = dedupConf;
}
/**
 * Processes the documents collected under a single key.
 *
 * A block holding at most one document has nothing to be compared with, so it is only
 * counted; any larger block is handed to {@code prepare} and then to {@code process}.
 *
 * @param key       the key the documents were grouped by (not used by this method)
 * @param documents the documents of the block
 * @param context   the reporter receiving the counters
 */
public void processSortedBlock(final String key, final List<MapDocument> documents, final Reporter context) {
    final boolean comparable = documents.size() > 1;
    if (!comparable) {
        context.incrementCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1", 1);
        return;
    }
    process(prepare(documents), context);
}
public void process(final String key, final Iterable<MapDocument> documents, final Reporter context) {
final Queue<MapDocument> q = prepare(documents);

View File

@ -0,0 +1,188 @@
package eu.dnetlib.pace.util;
import com.arakelian.jq.ImmutableJqLibrary;
import com.arakelian.jq.ImmutableJqRequest;
import com.arakelian.jq.JqLibrary;
import com.arakelian.jq.JqResponse;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.config.Type;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldListImpl;
import eu.dnetlib.pace.model.FieldValueImpl;
import eu.dnetlib.pace.model.MapDocument;
import net.minidev.json.JSONArray;
import org.apache.commons.lang.StringUtils;
import java.io.IOException;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public class MapDocumentUtil {
private static final JqLibrary library = ImmutableJqLibrary.of();
private static final ObjectMapper mapper = new ObjectMapper();
public static final String URL_REGEX = "^(http|https|ftp)\\://.*";
public static Predicate<String> urlFilter = s -> s.trim().matches(URL_REGEX);
public static MapDocument asMapDocument(DedupConfig conf, final String json) {
MapDocument m = new MapDocument();
final ImmutableJqRequest.Builder requestBuilder = ImmutableJqRequest.builder() //
.lib(library) //
.input(json);
m.setIdentifier(getStringValue(conf.getWf().getIdPath(), requestBuilder));
Map<String, Field> stringField = new HashMap<>();
conf.getPace().getModel().forEach(fdef -> {
switch (fdef.getType()) {
case String:
case Int:
stringField.put(fdef.getName(), new FieldValueImpl(fdef.getType(), fdef.getName(), getStringValue(fdef.getPath(), requestBuilder)));
break;
case URL:
String uv = getStringValue(fdef.getPath(), requestBuilder);
if (!urlFilter.test(uv)) uv = "";
stringField.put(fdef.getName(), new FieldValueImpl(fdef.getType(), fdef.getName(), uv));
break;
case List:
case JSON:
FieldListImpl fi = new FieldListImpl(fdef.getName(), fdef.getType());
getListValue(fdef.getPath(), requestBuilder)
.stream()
.map(item -> new FieldValueImpl(fdef.getType(), fdef.getName(), item))
.forEach(fi::add);
stringField.put(fdef.getName(), fi);
break;
}
});
m.setFieldMap(stringField);
return m;
}
public static MapDocument asMapDocumentWithJPath(DedupConfig conf, final String json) {
MapDocument m = new MapDocument();
m.setIdentifier(getJPathString(conf.getWf().getIdPath(), json));
Map<String, Field> stringField = new HashMap<>();
conf.getPace().getModel().forEach(fdef -> {
switch (fdef.getType()) {
case String:
case Int:
stringField.put(fdef.getName(), new FieldValueImpl(fdef.getType(), fdef.getName(), getJPathString(fdef.getPath(), json)));
break;
case URL:
String uv = getJPathString(fdef.getPath(), json);
if (!urlFilter.test(uv)) uv = "";
stringField.put(fdef.getName(), new FieldValueImpl(fdef.getType(), fdef.getName(), uv));
break;
case List:
case JSON:
FieldListImpl fi = new FieldListImpl(fdef.getName(), fdef.getType());
getJPathList(fdef.getPath(), json, fdef.getType())
.stream()
.map(item -> new FieldValueImpl(fdef.getType(), fdef.getName(), item))
.forEach(fi::add);
stringField.put(fdef.getName(), fi);
break;
}
});
m.setFieldMap(stringField);
return m;
}
private static List<String> getJPathList(String path, String json, Type type) {
if (type == Type.List)
return JsonPath.read(json, path);
Object jresult;
List<String> result = new ArrayList<>();
try {
jresult = JsonPath.read(json, path);
} catch (Throwable e) {
return result;
}
if (jresult instanceof JSONArray) {
((JSONArray) jresult).forEach(it -> {
try {
result.add(new ObjectMapper().writeValueAsString(it));
} catch (JsonProcessingException e) {
}
}
);
return result;
}
if (jresult instanceof LinkedHashMap) {
try {
result.add(new ObjectMapper().writeValueAsString(jresult));
} catch (JsonProcessingException e) {
}
return result;
}
if (jresult instanceof String) {
result.add((String) jresult);
}
return result;
}
private static String getJPathString(final String jsonPath, final String json) {
Object o = JsonPath.read(json, jsonPath);
if (o instanceof String)
return (String)o;
if (o instanceof JSONArray && ((JSONArray)o).size()>0)
return (String)((JSONArray)o).get(0);
return "";
}
private static String getStringValue(final String jqPath, final ImmutableJqRequest.Builder requestBuilder) {
final JqResponse response = requestBuilder
.filter(jqPath)
.build()
.execute();
String output = response.getOutput();
if (StringUtils.isNotBlank(output)) {
output = output.replaceAll("\"", "");
}
return output;
}
private static List<String> getListValue(final String jqPath, final ImmutableJqRequest.Builder requestBuilder) {
final JqResponse response = requestBuilder
.filter(jqPath)
.build()
.execute();
// if (response.hasErrors())
// throw new PaceException(String.format("Error on getting jqPath, xpath:%s, error : %s", jqPath, response.getErrors().toString()));
List<String> result = new ArrayList<>();
final JsonNode root;
try {
root = mapper.readTree(response.getOutput());
} catch (IOException e) {
throw new PaceException("Error on parsing json", e);
}
final Iterator<JsonNode> elements = root.elements();
while (elements.hasNext()) {
result.add(elements.next().toString());
}
return result;
}
}

View File

@ -1,11 +1,21 @@
package eu.dnetlib.pace.config;
import com.arakelian.jq.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.pace.AbstractPaceTest;
import eu.dnetlib.pace.model.FieldList;
import eu.dnetlib.pace.model.FieldListImpl;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
import java.util.Iterator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class ConfigTest extends AbstractPaceTest {
@ -54,4 +64,70 @@ public class ConfigTest extends AbstractPaceTest {
assertEquals(0, load.getPace().translationMap().keySet().size());
}
/**
 * Loads "result.pace.conf.json", converts the "result.json" sample through
 * {@link MapDocumentUtil#asMapDocument} and checks that the "dateofacceptance"
 * field is present in the resulting document.
 */
@Test
public void testAsMapDocument() throws Exception {
    DedupConfig load = DedupConfig.load(readFromClasspath("result.pace.conf.json"));
    assertNotNull(load);
    System.out.println(load.getWf().getIdPath());

    final String result = IOUtils.toString(this.getClass().getResourceAsStream("result.json"));
    System.out.println(result);

    final MapDocument mapDocument = MapDocumentUtil.asMapDocument(load, result);
    // Explicit checks, so that a failure is reported as an assertion rather than a NullPointerException.
    assertNotNull(mapDocument);
    assertNotNull(mapDocument.getFieldMap());
    assertNotNull(mapDocument.getFieldMap().get("dateofacceptance"));

    System.out.println(mapDocument.getFieldMap().get("dateofacceptance").stringValue());
}
/**
 * Loads "result.pace.conf_jpath.json", converts the "result.json" sample through
 * {@link MapDocumentUtil#asMapDocumentWithJPath} and checks that a document with a
 * field map is produced.
 */
@Test
public void testAsMapDocumentJPath() throws Exception {
    DedupConfig load = DedupConfig.load(readFromClasspath("result.pace.conf_jpath.json"));
    assertNotNull(load);
    System.out.println(load.getWf().getIdPath());

    final String result = IOUtils.toString(this.getClass().getResourceAsStream("result.json"));
    System.out.println(result);

    final MapDocument mapDocument = MapDocumentUtil.asMapDocumentWithJPath(load, result);
    // Without these checks the test could only fail by throwing.
    assertNotNull(mapDocument);
    assertNotNull(mapDocument.getFieldMap());

    System.out.println(mapDocument.getFieldMap());
}
/**
 * Runs a jq filter that wraps the authors of the "result.json" sample in an array,
 * then prints the raw output, the parsed tree and each of its child elements.
 */
@Test
public void testJQ() throws Exception {
    final String json = IOUtils.toString(this.getClass().getResourceAsStream("result.json"));
    System.out.println(json);

    final JqRequest jqRequest = ImmutableJqRequest.builder()
            .lib(ImmutableJqLibrary.of())
            .input(json)
            .filter("[.entity.result.metadata.author[]]")
            .build();

    final String output = jqRequest.execute().getOutput();
    System.out.println(output);

    final JsonNode root = new ObjectMapper().readTree(output);
    System.out.println("root"+root);

    // Iterating a JsonNode walks its child elements.
    for (final JsonNode child : root) {
        System.out.println(child.toString());
    }
}
}

File diff suppressed because one or more lines are too long

View File

@ -7,6 +7,7 @@
"queueMaxSize" : "2000",
"groupMaxSize" : "10",
"slidingWindowSize" : "200",
"idPath": ".entity.id",
"rootBuilder" : [ "result" ],
"includeChildren" : "true"
},
@ -25,10 +26,10 @@
{ "name" : "sizeMatch", "fields" : [ "authors" ] }
],
"model" : [
{ "name" : "pid", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : "pid[qualifier#classid = {doi}]/value", "overrideMatch" : "true" },
{ "name" : "title", "algo" : "JaroWinkler", "type" : "String", "weight" : "1.0", "ignoreMissing" : "false", "path" : "result/metadata/title[qualifier#classid = {main title}]/value" },
{ "name" : "dateofacceptance", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : "result/metadata/dateofacceptance/value" } ,
{ "name" : "authors", "algo" : "Null", "type" : "List", "weight" : "0.0", "ignoreMissing" : "true", "path" : "result/author/metadata/fullname/value" }
{ "name" : "pid", "algo" : "Null", "type" : "JSON", "weight" : "0.0", "ignoreMissing" : "true", "path" : ".entity.pid", "overrideMatch" : "true" },
{ "name" : "title", "algo" : "JaroWinkler", "type" : "String", "weight" : "1.0", "ignoreMissing" : "false", "path" : ".entity.result.metadata.title[] | select(.qualifier.classid==\"main title\") | .value" },
{ "name" : "dateofacceptance", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : ".entity.result.metadata.dateofacceptance.value" } ,
{ "name" : "authors", "algo" : "Null", "type" : "List", "weight" : "0.0", "ignoreMissing" : "true", "path" : "[.entity.result.metadata.author[].fullname]" }
],
"blacklists" : {
"title" : [
@ -47,7 +48,8 @@
"^(WHP Cruise Summary Information of section).*$",
"^(Measurement of the top quark\\-pair production cross section with ATLAS in pp collisions at).*$",
"^(Measurement of the spin\\-dependent structure function).*"
] }
] } ,
"synonyms": {}
}
}

View File

@ -0,0 +1,55 @@
{
"wf" : {
"threshold" : "0.99",
"dedupRun" : "001",
"entityType" : "result",
"orderField" : "title",
"queueMaxSize" : "2000",
"groupMaxSize" : "10",
"slidingWindowSize" : "200",
"idPath": "$.entity.id",
"rootBuilder" : [ "result" ],
"includeChildren" : "true"
},
"pace" : {
"clustering" : [
{ "name" : "acronyms", "fields" : [ "title" ], "params" : { "max" : "1", "minLen" : "2", "maxLen" : "4"} },
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } }
],
"strictConditions" : [
{ "name" : "exactMatch", "fields" : [ "pid" ] }
],
"conditions" : [
{ "name" : "yearMatch", "fields" : [ "dateofacceptance" ] },
{ "name" : "titleVersionMatch", "fields" : [ "title" ] },
{ "name" : "sizeMatch", "fields" : [ "authors" ] }
],
"model" : [
{ "name" : "pid", "algo" : "Null", "type" : "JSON", "weight" : "0.0", "ignoreMissing" : "true", "path" : "$.entity.pid", "overrideMatch" : "true" },
{ "name" : "dateofacceptance", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : "$.entity.result.metadata.dateofacceptance.value", "overrideMatch" : "true" },
{ "name" : "title", "algo" : "JaroWinkler", "type" : "String", "weight" : "1.0", "ignoreMissing" : "false", "path" : "$.entity.result.metadata.title[?(@.qualifier.classid ==\"main title\")].value" },
{ "name" : "authors", "algo" : "Null", "type" : "List", "weight" : "0.0", "ignoreMissing" : "true", "path" : "$.entity.result.metadata.author[*].fullname" }
],
"blacklists" : {
"title" : [
"^(Corpus Oral Dialectal \\(COD\\)\\.).*$",
"^(Kiri Karl Morgensternile).*$",
"^(\\[Eksliibris Aleksandr).*\\]$",
"^(\\[Eksliibris Aleksandr).*$",
"^(Eksliibris Aleksandr).*$",
"^(Kiri A\\. de Vignolles).*$",
"^(2 kirja Karl Morgensternile).*$",
"^(Pirita kloostri idaosa arheoloogilised).*$",
"^(Kiri tundmatule).*$",
"^(Kiri Jenaer Allgemeine Literaturzeitung toimetusele).*$",
"^(Eksliibris Nikolai Birukovile).*$",
"^(Eksliibris Nikolai Issakovile).*$",
"^(WHP Cruise Summary Information of section).*$",
"^(Measurement of the top quark\\-pair production cross section with ATLAS in pp collisions at).*$",
"^(Measurement of the spin\\-dependent structure function).*"
] } ,
"synonyms": {}
}
}