forked from D-Net/dnet-hadoop
merged JqMapping branch into tree2
This commit is contained in:
commit
5c01ae4c92
|
@ -38,10 +38,6 @@
|
|||
<groupId>commons-collections</groupId>
|
||||
<artifactId>commons-collections</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.googlecode.protobuf-java-format</groupId>
|
||||
<artifactId>protobuf-java-format</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.antlr</groupId>
|
||||
<artifactId>stringtemplate</artifactId>
|
||||
|
@ -59,22 +55,22 @@
|
|||
<groupId>org.reflections</groupId>
|
||||
<artifactId>reflections</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-mapper-asl</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-math3</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.jayway.jsonpath</groupId>
|
||||
<artifactId>json-path</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -1,25 +1,25 @@
|
|||
package eu.dnetlib.pace.config;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Maps;
|
||||
import eu.dnetlib.pace.model.ClusteringDef;
|
||||
import eu.dnetlib.pace.model.FieldDef;
|
||||
import eu.dnetlib.pace.util.PaceException;
|
||||
import org.antlr.stringtemplate.StringTemplate;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
|
||||
import eu.dnetlib.pace.tree.support.TreeNodeDef;
|
||||
import eu.dnetlib.pace.util.PaceException;
|
||||
import org.antlr.stringtemplate.StringTemplate;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import eu.dnetlib.pace.model.ClusteringDef;
|
||||
import eu.dnetlib.pace.model.FieldDef;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
public class DedupConfig implements Config, Serializable {
|
||||
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
package eu.dnetlib.pace.config;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.google.common.collect.Maps;
|
||||
import eu.dnetlib.pace.common.AbstractPaceFunctions;
|
||||
import eu.dnetlib.pace.model.ClusteringDef;
|
||||
import eu.dnetlib.pace.model.FieldDef;
|
||||
import eu.dnetlib.pace.tree.support.TreeNodeDef;
|
||||
import eu.dnetlib.pace.util.PaceResolver;
|
||||
import org.codehaus.jackson.annotate.JsonIgnore;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
|
|
@ -1,17 +1,17 @@
|
|||
package eu.dnetlib.pace.config;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import eu.dnetlib.pace.util.PaceException;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import eu.dnetlib.pace.util.PaceException;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
public class WfConfig implements Serializable {
|
||||
|
||||
|
@ -76,12 +76,17 @@ public class WfConfig implements Serializable {
|
|||
/** Maximum number of allowed children. */
|
||||
private int maxChildren = MAX_CHILDREN;
|
||||
|
||||
|
||||
/** Default maximum number of iterations. */
|
||||
private final static int MAX_ITERATIONS = 20;
|
||||
|
||||
/** Maximum number of iterations */
|
||||
private int maxIterations = MAX_ITERATIONS;
|
||||
|
||||
/** The Jquery path to retrieve the identifier */
|
||||
private String idPath = "$.id";
|
||||
|
||||
|
||||
public WfConfig() {}
|
||||
|
||||
/**
|
||||
|
@ -252,6 +257,7 @@ public class WfConfig implements Serializable {
|
|||
this.maxChildren = maxChildren;
|
||||
}
|
||||
|
||||
|
||||
public int getMaxIterations() {
|
||||
return maxIterations;
|
||||
}
|
||||
|
@ -260,6 +266,15 @@ public class WfConfig implements Serializable {
|
|||
this.maxIterations = maxIterations;
|
||||
}
|
||||
|
||||
public String getIdPath() {
|
||||
return idPath;
|
||||
}
|
||||
|
||||
public void setIdPath(String idPath) {
|
||||
this.idPath = idPath;
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
|
|
|
@ -1,19 +1,15 @@
|
|||
package eu.dnetlib.pace.model;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.pace.clustering.ClusteringFunction;
|
||||
import eu.dnetlib.pace.config.PaceConfig;
|
||||
import eu.dnetlib.pace.util.PaceException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import eu.dnetlib.pace.clustering.*;
|
||||
import eu.dnetlib.pace.config.PaceConfig;
|
||||
import eu.dnetlib.pace.util.PaceException;
|
||||
import eu.dnetlib.pace.util.PaceResolver;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
public class ClusteringDef implements Serializable {
|
||||
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
package eu.dnetlib.pace.tree.support;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.pace.util.PaceException;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
package eu.dnetlib.pace.tree.support;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.pace.model.Field;
|
||||
import eu.dnetlib.pace.util.PaceException;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
package eu.dnetlib.pace.tree.support;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.pace.config.Config;
|
||||
import eu.dnetlib.pace.config.PaceConfig;
|
||||
import eu.dnetlib.pace.model.MapDocument;
|
||||
import eu.dnetlib.pace.util.PaceException;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package eu.dnetlib.pace.tree.support;
|
||||
|
||||
import eu.dnetlib.pace.util.PaceException;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
|
|
|
@ -36,6 +36,18 @@ public class BlockProcessor {
|
|||
this.dedupConf = dedupConf;
|
||||
}
|
||||
|
||||
|
||||
public void processSortedBlock(final String key, final List<MapDocument> documents, final Reporter context) {
|
||||
if (documents.size() > 1) {
|
||||
// log.info("reducing key: '" + key + "' records: " + q.size());
|
||||
//process(q, context);
|
||||
process(prepare(documents), context);
|
||||
|
||||
} else {
|
||||
context.incrementCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1", 1);
|
||||
}
|
||||
}
|
||||
|
||||
public void process(final String key, final Iterable<MapDocument> documents, final Reporter context) {
|
||||
|
||||
final Queue<MapDocument> q = prepare(documents);
|
||||
|
|
|
@ -0,0 +1,109 @@
|
|||
package eu.dnetlib.pace.util;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.config.Type;
|
||||
import eu.dnetlib.pace.model.Field;
|
||||
import eu.dnetlib.pace.model.FieldListImpl;
|
||||
import eu.dnetlib.pace.model.FieldValueImpl;
|
||||
import eu.dnetlib.pace.model.MapDocument;
|
||||
import net.minidev.json.JSONArray;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class MapDocumentUtil {
|
||||
|
||||
|
||||
private static final ObjectMapper mapper = new ObjectMapper();
|
||||
public static final String URL_REGEX = "^(http|https|ftp)\\://.*";
|
||||
public static Predicate<String> urlFilter = s -> s.trim().matches(URL_REGEX);
|
||||
|
||||
|
||||
|
||||
public static MapDocument asMapDocumentWithJPath(DedupConfig conf, final String json) {
|
||||
MapDocument m = new MapDocument();
|
||||
m.setIdentifier(getJPathString(conf.getWf().getIdPath(), json));
|
||||
Map<String, Field> stringField = new HashMap<>();
|
||||
conf.getPace().getModel().forEach(fdef -> {
|
||||
switch (fdef.getType()) {
|
||||
case String:
|
||||
case Int:
|
||||
stringField.put(fdef.getName(), new FieldValueImpl(fdef.getType(), fdef.getName(), getJPathString(fdef.getPath(), json)));
|
||||
break;
|
||||
case URL:
|
||||
String uv = getJPathString(fdef.getPath(), json);
|
||||
if (!urlFilter.test(uv)) uv = "";
|
||||
stringField.put(fdef.getName(), new FieldValueImpl(fdef.getType(), fdef.getName(), uv));
|
||||
break;
|
||||
case List:
|
||||
case JSON:
|
||||
FieldListImpl fi = new FieldListImpl(fdef.getName(), fdef.getType());
|
||||
getJPathList(fdef.getPath(), json, fdef.getType())
|
||||
.stream()
|
||||
.map(item -> new FieldValueImpl(fdef.getType(), fdef.getName(), item))
|
||||
.forEach(fi::add);
|
||||
stringField.put(fdef.getName(), fi);
|
||||
break;
|
||||
}
|
||||
});
|
||||
m.setFieldMap(stringField);
|
||||
return m;
|
||||
}
|
||||
|
||||
public static List<String> getJPathList(String path, String json, Type type) {
|
||||
if (type == Type.List)
|
||||
return JsonPath.read(json, path);
|
||||
Object jresult;
|
||||
List<String> result = new ArrayList<>();
|
||||
try {
|
||||
jresult = JsonPath.read(json, path);
|
||||
} catch (Throwable e) {
|
||||
return result;
|
||||
}
|
||||
if (jresult instanceof JSONArray) {
|
||||
|
||||
((JSONArray) jresult).forEach(it -> {
|
||||
|
||||
try {
|
||||
result.add(new ObjectMapper().writeValueAsString(it));
|
||||
} catch (JsonProcessingException e) {
|
||||
|
||||
}
|
||||
}
|
||||
);
|
||||
return result;
|
||||
}
|
||||
|
||||
if (jresult instanceof LinkedHashMap) {
|
||||
try {
|
||||
result.add(new ObjectMapper().writeValueAsString(jresult));
|
||||
} catch (JsonProcessingException e) {
|
||||
|
||||
}
|
||||
return result;
|
||||
}
|
||||
if (jresult instanceof String) {
|
||||
result.add((String) jresult);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
public static String getJPathString(final String jsonPath, final String json) {
|
||||
try {
|
||||
Object o = JsonPath.read(json, jsonPath);
|
||||
if (o instanceof String)
|
||||
return (String)o;
|
||||
if (o instanceof JSONArray && ((JSONArray)o).size()>0)
|
||||
return (String)((JSONArray)o).get(0);
|
||||
return "";
|
||||
} catch (Exception e) {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -1,6 +1,10 @@
|
|||
package eu.dnetlib.pace.config;
|
||||
|
||||
|
||||
import eu.dnetlib.pace.AbstractPaceTest;
|
||||
import eu.dnetlib.pace.model.MapDocument;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Map;
|
||||
|
@ -57,4 +61,27 @@ public class ConfigTest extends AbstractPaceTest {
|
|||
assertEquals(0, load.getPace().translationMap().keySet().size());
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testAsMapDocumentJPath() throws Exception {
|
||||
|
||||
DedupConfig load = DedupConfig.load(readFromClasspath("result.pace.conf_jpath.json"));
|
||||
|
||||
|
||||
System.out.println(load.getWf().getIdPath());
|
||||
|
||||
final String result =IOUtils.toString(this.getClass().getResourceAsStream("result.json"));
|
||||
|
||||
System.out.println(result);
|
||||
final MapDocument mapDocument = MapDocumentUtil.asMapDocumentWithJPath(load, result);
|
||||
|
||||
System.out.println(mapDocument.getFieldMap());
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -0,0 +1,48 @@
|
|||
{
|
||||
"wf" : {
|
||||
"threshold" : "0.99",
|
||||
"dedupRun" : "001",
|
||||
"entityType" : "result",
|
||||
"orderField" : "title",
|
||||
"queueMaxSize" : "2000",
|
||||
"groupMaxSize" : "10",
|
||||
"slidingWindowSize" : "200",
|
||||
"idPath": "$.entity.id",
|
||||
"rootBuilder" : [ "result" ],
|
||||
"includeChildren" : "true"
|
||||
},
|
||||
"pace" : {
|
||||
"clustering" : [
|
||||
{ "name" : "acronyms", "fields" : [ "title" ], "params" : { "max" : "1", "minLen" : "2", "maxLen" : "4"} },
|
||||
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } }
|
||||
],
|
||||
"decisionTree": {},
|
||||
"model" : [
|
||||
{ "name" : "pid", "type" : "JSON", "path" : "$.entity.pid"},
|
||||
{ "name" : "dateofacceptance", "type" : "String", "path" : "$.entity.result.metadata.dateofacceptance.value"},
|
||||
{ "name" : "title", "type" : "String","path" : "$.entity.result.metadata.title[?(@.qualifier.classid ==\"main title\")].value" },
|
||||
{ "name" : "authors", "type" : "List", "path" : "$.entity.result.metadata.author[*].fullname" }
|
||||
],
|
||||
"blacklists" : {
|
||||
"title" : [
|
||||
"^(Corpus Oral Dialectal \\(COD\\)\\.).*$",
|
||||
"^(Kiri Karl Morgensternile).*$",
|
||||
"^(\\[Eksliibris Aleksandr).*\\]$",
|
||||
"^(\\[Eksliibris Aleksandr).*$",
|
||||
"^(Eksliibris Aleksandr).*$",
|
||||
"^(Kiri A\\. de Vignolles).*$",
|
||||
"^(2 kirja Karl Morgensternile).*$",
|
||||
"^(Pirita kloostri idaosa arheoloogilised).*$",
|
||||
"^(Kiri tundmatule).*$",
|
||||
"^(Kiri Jenaer Allgemeine Literaturzeitung toimetusele).*$",
|
||||
"^(Eksliibris Nikolai Birukovile).*$",
|
||||
"^(Eksliibris Nikolai Issakovile).*$",
|
||||
"^(WHP Cruise Summary Information of section).*$",
|
||||
"^(Measurement of the top quark\\-pair production cross section with ATLAS in pp collisions at).*$",
|
||||
"^(Measurement of the spin\\-dependent structure function).*"
|
||||
] } ,
|
||||
"synonyms": {}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue