1
0
Fork 0

Merge pull request 'Refactor Dedup using Spark Dataframe API, initial support for scala 2.12 and Spark 3.4' (#324) from dedup-with-dataframe-2 into beta

Reviewed-on: D-Net/dnet-hadoop#324
This commit is contained in:
Claudio Atzori 2023-07-25 10:17:17 +02:00
commit 8c63e4a864
129 changed files with 1760 additions and 2928 deletions

View File

@ -52,6 +52,8 @@
</execution>
</executions>
<configuration>
<failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
<scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
@ -81,11 +83,11 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
@ -159,7 +161,7 @@
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
<artifactId>${dhp-schemas.artifact}</artifactId>
</dependency>
<dependency>

View File

@ -20,7 +20,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.0.1</version>
<version>${net.alchim31.maven.version}</version>
<executions>
<execution>
<id>scala-compile-first</id>
@ -39,8 +39,9 @@
</execution>
</executions>
<configuration>
<failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
<scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
<scalaVersion>${scala.version}</scalaVersion>
<addScalacArgs>-target:jvm-1.8</addScalacArgs>
</configuration>
</plugin>
</plugins>
@ -68,7 +69,6 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>stringtemplate</artifactId>
@ -89,17 +89,22 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -11,7 +11,6 @@ import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.pace.common.AbstractPaceFunctions;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
public abstract class AbstractClusteringFunction extends AbstractPaceFunctions implements ClusteringFunction {
@ -24,11 +23,10 @@ public abstract class AbstractClusteringFunction extends AbstractPaceFunctions i
protected abstract Collection<String> doApply(Config conf, String s);
@Override
public Collection<String> apply(Config conf, List<Field> fields) {
public Collection<String> apply(Config conf, List<String> fields) {
return fields
.stream()
.filter(f -> !f.isEmpty())
.map(Field::stringValue)
.map(this::normalize)
.map(s -> filterAllStopWords(s))
.map(s -> doApply(conf, s))

View File

@ -1,60 +0,0 @@
package eu.dnetlib.pace.clustering;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Pattern;
import com.google.common.collect.Maps;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Document;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldListImpl;
import eu.dnetlib.pace.model.MapDocument;
public class BlacklistAwareClusteringCombiner extends ClusteringCombiner {
public static Collection<String> filterAndCombine(final MapDocument a, final Config conf) {
Document filtered = filter(a, conf.blacklists());
return combine(filtered, conf);
}
private static MapDocument filter(final MapDocument a, final Map<String, List<Pattern>> blacklists) {
if (blacklists == null || blacklists.isEmpty()) {
return a;
}
final Map<String, Field> filtered = Maps.newHashMap(a.getFieldMap());
for (final Entry<String, List<Pattern>> e : blacklists.entrySet()) {
Field fields = a.getFieldMap().get(e.getKey());
if (fields != null) {
final FieldListImpl fl = new FieldListImpl();
for (Field f : fields) {
if (!isBlackListed(f.stringValue(), e.getValue())) {
fl.add(f);
}
}
filtered.put(e.getKey(), fl);
}
}
return new MapDocument(a.getIdentifier(), filtered);
}
private static boolean isBlackListed(String value, List<Pattern> blacklist) {
for (Pattern pattern : blacklist) {
if (pattern.matcher(value).matches()) {
return true;
}
}
return false;
}
}

View File

@ -1,64 +0,0 @@
package eu.dnetlib.pace.clustering;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Sets;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.ClusteringDef;
import eu.dnetlib.pace.model.Document;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldValueImpl;
public class ClusteringCombiner {
private static String SEPARATOR = ":";
private static String COLLAPSE_ON = "collapseOn";
public static Collection<String> combine(final Document a, final Config conf) {
final Collection<String> res = Sets.newLinkedHashSet();
for (final ClusteringDef cd : conf.clusterings()) {
for (final String fieldName : cd.getFields()) {
String prefix = getPrefix(cd, fieldName);
Field values = a.values(fieldName);
List<Field> fields = new ArrayList<>();
if (values instanceof FieldValueImpl) {
fields.add(values);
} else {
fields.addAll((List<Field>) values);
}
res
.addAll(
cd
.clusteringFunction()
.apply(conf, fields)
.stream()
.map(k -> prefix + SEPARATOR + k)
.collect(Collectors.toList()));
}
}
return res;
}
private static String getPrefix(ClusteringDef cd, String fieldName) {
return cd.getName() + SEPARATOR +
cd
.getParams()
.keySet()
.stream()
.filter(k -> k.contains(COLLAPSE_ON))
.findFirst()
.map(k -> StringUtils.substringAfter(k, SEPARATOR))
.orElse(fieldName);
}
}

View File

@ -6,11 +6,10 @@ import java.util.List;
import java.util.Map;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
public interface ClusteringFunction {
public Collection<String> apply(Config config, List<Field> fields);
public Collection<String> apply(Config config, List<String> fields);
public Map<String, Integer> getParams();

View File

@ -6,9 +6,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.pace.common.AbstractPaceFunctions;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
@ClusteringClass("keywordsclustering")
public class KeywordsClustering extends AbstractClusteringFunction {
@ -40,11 +38,10 @@ public class KeywordsClustering extends AbstractClusteringFunction {
}
@Override
public Collection<String> apply(final Config conf, List<Field> fields) {
public Collection<String> apply(final Config conf, List<String> fields) {
return fields
.stream()
.filter(f -> !f.isEmpty())
.map(Field::stringValue)
.map(this::cleanup)
.map(this::normalize)
.map(s -> filterAllStopWords(s))

View File

@ -9,7 +9,6 @@ import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Lists;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.Person;
@ClusteringClass("lnfi")
@ -22,11 +21,10 @@ public class LastNameFirstInitial extends AbstractClusteringFunction {
}
@Override
public Collection<String> apply(Config conf, List<Field> fields) {
public Collection<String> apply(Config conf, List<String> fields) {
return fields
.stream()
.filter(f -> !f.isEmpty())
.map(Field::stringValue)
.map(this::normalize)
.map(s -> doApply(conf, s))
.map(c -> filterBlacklisted(c, ngramBlacklist))

View File

@ -11,7 +11,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
@ClusteringClass("lowercase")
public class LowercaseClustering extends AbstractClusteringFunction {
@ -21,10 +20,10 @@ public class LowercaseClustering extends AbstractClusteringFunction {
}
@Override
public Collection<String> apply(Config conf, List<Field> fields) {
public Collection<String> apply(Config conf, List<String> fields) {
Collection<String> c = Sets.newLinkedHashSet();
for (Field f : fields) {
c.addAll(doApply(conf, f.stringValue()));
for (String f : fields) {
c.addAll(doApply(conf, f));
}
return c;
}

View File

@ -8,15 +8,15 @@ import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.pace.common.AbstractPaceFunctions;
public class NGramUtils extends AbstractPaceFunctions {
static private final NGramUtils NGRAMUTILS = new NGramUtils();
private static final int SIZE = 100;
private static Set<String> stopwords = AbstractPaceFunctions
private static final Set<String> stopwords = AbstractPaceFunctions
.loadFromClasspath("/eu/dnetlib/pace/config/stopwords_en.txt");
public static String cleanupForOrdering(String s) {
NGramUtils utils = new NGramUtils();
return (utils.filterStopWords(utils.normalize(s), stopwords) + StringUtils.repeat(" ", SIZE))
return (NGRAMUTILS.filterStopWords(NGRAMUTILS.normalize(s), stopwords) + StringUtils.repeat(" ", SIZE))
.substring(0, SIZE)
.replaceAll(" ", "");
}

View File

@ -2,7 +2,6 @@
package eu.dnetlib.pace.clustering;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -14,7 +13,11 @@ import eu.dnetlib.pace.config.Config;
public class NgramPairs extends Ngrams {
public NgramPairs(Map<String, Integer> params) {
super(params);
super(params, false);
}
public NgramPairs(Map<String, Integer> params, boolean sorted) {
super(params, sorted);
}
@Override

View File

@ -8,8 +8,15 @@ import eu.dnetlib.pace.config.Config;
@ClusteringClass("ngrams")
public class Ngrams extends AbstractClusteringFunction {
private final boolean sorted;
public Ngrams(Map<String, Integer> params) {
this(params, false);
}
public Ngrams(Map<String, Integer> params, boolean sorted) {
super(params);
this.sorted = sorted;
}
@Override
@ -19,20 +26,21 @@ public class Ngrams extends AbstractClusteringFunction {
protected Collection<String> getNgrams(String s, int ngramLen, int max, int maxPerToken, int minNgramLen) {
final Collection<String> ngrams = new LinkedHashSet<String>();
final Collection<String> ngrams = sorted ? new TreeSet<>() : new LinkedHashSet<String>();
final StringTokenizer st = new StringTokenizer(s);
while (st.hasMoreTokens()) {
final String token = st.nextToken();
if (!token.isEmpty()) {
for (int i = 0; i < maxPerToken && ngramLen + i <= token.length(); i++) {
String ngram = (token + " ").substring(i, ngramLen + i).trim();
String ngram = token.substring(i, Math.min(ngramLen + i, token.length())).trim();
if (ngram.length() >= minNgramLen) {
ngrams.add(ngram);
if (ngrams.size() >= max) {
return ngrams;
}
if (ngram.length() >= minNgramLen) {
ngrams.add(ngram);
}
}
}

View File

@ -12,7 +12,6 @@ import com.google.common.collect.Sets;
import eu.dnetlib.pace.common.AbstractPaceFunctions;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.Person;
@ClusteringClass("personClustering")
@ -27,19 +26,19 @@ public class PersonClustering extends AbstractPaceFunctions implements Clusterin
}
@Override
public Collection<String> apply(final Config conf, final List<Field> fields) {
public Collection<String> apply(final Config conf, final List<String> fields) {
final Set<String> hashes = Sets.newHashSet();
for (final Field f : fields) {
for (final String f : fields) {
final Person person = new Person(f.stringValue(), false);
final Person person = new Person(f, false);
if (StringUtils.isNotBlank(person.getNormalisedFirstName())
&& StringUtils.isNotBlank(person.getNormalisedSurname())) {
hashes.add(firstLC(person.getNormalisedFirstName()) + person.getNormalisedSurname().toLowerCase());
} else {
for (final String token1 : tokens(f.stringValue(), MAX_TOKENS)) {
for (final String token2 : tokens(f.stringValue(), MAX_TOKENS)) {
for (final String token1 : tokens(f, MAX_TOKENS)) {
for (final String token2 : tokens(f, MAX_TOKENS)) {
if (!token1.equals(token2)) {
hashes.add(firstLC(token1) + token2);
}

View File

@ -13,7 +13,7 @@ import eu.dnetlib.pace.config.Config;
public class SortedNgramPairs extends NgramPairs {
public SortedNgramPairs(Map<String, Integer> params) {
super(params);
super(params, false);
}
@Override

View File

@ -11,7 +11,6 @@ import java.util.stream.Collectors;
import eu.dnetlib.pace.common.AbstractPaceFunctions;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
@ClusteringClass("urlclustering")
public class UrlClustering extends AbstractPaceFunctions implements ClusteringFunction {
@ -23,12 +22,11 @@ public class UrlClustering extends AbstractPaceFunctions implements ClusteringFu
}
@Override
public Collection<String> apply(final Config conf, List<Field> fields) {
public Collection<String> apply(final Config conf, List<String> fields) {
try {
return fields
.stream()
.filter(f -> !f.isEmpty())
.map(Field::stringValue)
.map(this::asUrl)
.map(URL::getHost)
.collect(Collectors.toCollection(HashSet::new));

View File

@ -16,13 +16,11 @@ import org.apache.commons.lang3.StringUtils;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.ibm.icu.text.Transliterator;
import eu.dnetlib.pace.clustering.NGramUtils;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldList;
import eu.dnetlib.pace.model.FieldListImpl;
/**
* Set of common functions for the framework
@ -51,28 +49,25 @@ public abstract class AbstractPaceFunctions {
protected static Set<String> ngramBlacklist = loadFromClasspath("/eu/dnetlib/pace/config/ngram_blacklist.txt");
// html regex for normalization
public final String HTML_REGEX = "<[^>]*>";
public static final Pattern HTML_REGEX = Pattern.compile("<[^>]*>");
private static final String alpha = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 ";
private static final String aliases_from = "⁰¹²³⁴⁵⁶⁷⁸⁹⁺⁻⁼⁽⁾ⁿ₀₁₂₃₄₅₆₇₈₉₊₋₌₍₎àáâäæãåāèéêëēėęəîïíīįìôöòóœøōõûüùúūßśšłžźżçćčñń";
private static final String aliases_to = "0123456789+-=()n0123456789+-=()aaaaaaaaeeeeeeeeiiiiiioooooooouuuuussslzzzcccnn";
// doi prefix for normalization
public final String DOI_PREFIX = "(https?:\\/\\/dx\\.doi\\.org\\/)|(doi:)";
public static final Pattern DOI_PREFIX = Pattern.compile("(https?:\\/\\/dx\\.doi\\.org\\/)|(doi:)");
private Pattern numberPattern = Pattern.compile("-?\\d+(\\.\\d+)?");
private static Pattern numberPattern = Pattern.compile("-?\\d+(\\.\\d+)?");
private Pattern hexUnicodePattern = Pattern.compile("\\\\u(\\p{XDigit}{4})");
protected final static FieldList EMPTY_FIELD = new FieldListImpl();
private static Pattern hexUnicodePattern = Pattern.compile("\\\\u(\\p{XDigit}{4})");
protected String concat(final List<String> l) {
return Joiner.on(" ").skipNulls().join(l);
}
protected String cleanup(final String s) {
final String s1 = s.replaceAll(HTML_REGEX, "");
final String s1 = HTML_REGEX.matcher(s).replaceAll("");
final String s2 = unicodeNormalization(s1.toLowerCase());
final String s3 = nfd(s2);
final String s4 = fixXML(s3);
@ -162,11 +157,6 @@ public abstract class AbstractPaceFunctions {
return sb.toString().replaceAll("\\s+", " ");
}
protected String getFirstValue(final Field values) {
return (values != null) && !Iterables.isEmpty(values) ? Iterables.getFirst(values, EMPTY_FIELD).stringValue()
: "";
}
protected boolean notNull(final String s) {
return s != null;
}
@ -316,7 +306,7 @@ public abstract class AbstractPaceFunctions {
}
public String normalizePid(String pid) {
return pid.toLowerCase().replaceAll(DOI_PREFIX, "");
return DOI_PREFIX.matcher(pid.toLowerCase()).replaceAll("");
}
// get the list of keywords into the input string

View File

@ -3,7 +3,7 @@ package eu.dnetlib.pace.config;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.function.Predicate;
import eu.dnetlib.pace.model.ClusteringDef;
import eu.dnetlib.pace.model.FieldDef;
@ -30,13 +30,6 @@ public interface Config {
*/
public Map<String, TreeNodeDef> decisionTree();
/**
* Field configuration definitions.
*
* @return the list of definitions
*/
public Map<String, FieldDef> modelMap();
/**
* Clusterings.
*
@ -49,7 +42,7 @@ public interface Config {
*
* @return the map
*/
public Map<String, List<Pattern>> blacklists();
public Map<String, Predicate<String>> blacklists();
/**
* Translation map.

View File

@ -4,18 +4,19 @@ package eu.dnetlib.pace.config;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
import org.antlr.stringtemplate.StringTemplate;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -27,9 +28,6 @@ import eu.dnetlib.pace.tree.support.TreeNodeDef;
import eu.dnetlib.pace.util.PaceException;
public class DedupConfig implements Config, Serializable {
private static final Log log = LogFactory.getLog(DedupConfig.class);
private static String CONFIG_TEMPLATE = "dedupConfig.st";
private PaceConfig pace;
@ -37,7 +35,7 @@ public class DedupConfig implements Config, Serializable {
private WfConfig wf;
@JsonIgnore
private Map<String, List<Pattern>> blacklists;
private Map<String, Predicate<String>> blacklists;
private static Map<String, String> defaults = Maps.newHashMap();
@ -72,19 +70,29 @@ public class DedupConfig implements Config, Serializable {
.getBlacklists()
.entrySet()
.stream()
.collect(
Collectors
.toMap(
e -> e.getKey(),
e -> e
.map(
e -> new AbstractMap.SimpleEntry<String, List<Pattern>>(e.getKey(),
e
.getValue()
.stream()
.filter(s -> !StringUtils.isBlank(s))
.map(Pattern::compile)
.collect(Collectors.toList())));
.collect(Collectors.toList())))
.collect(
Collectors
.toMap(
e -> e.getKey(),
e -> (Predicate<String> & Serializable) s -> e
.getValue()
.stream()
.filter(p -> p.matcher(s).matches())
.findFirst()
.isPresent()))
;
return config;
} catch (IOException e) {
} catch (IOException | PatternSyntaxException e) {
throw new PaceException("Error in parsing configuration json", e);
}
@ -152,18 +160,13 @@ public class DedupConfig implements Config, Serializable {
return getPace().getModel();
}
@Override
public Map<String, FieldDef> modelMap() {
return getPace().getModelMap();
}
@Override
public List<ClusteringDef> clusterings() {
return getPace().getClustering();
}
@Override
public Map<String, List<Pattern>> blacklists() {
public Map<String, Predicate<String>> blacklists() {
return blacklists;
}

View File

@ -28,6 +28,10 @@ public class PaceConfig extends AbstractPaceFunctions implements Serializable {
@JsonIgnore
private Map<String, String> translationMap;
public Map<String, FieldDef> getModelMap() {
return modelMap;
}
@JsonIgnore
private Map<String, FieldDef> modelMap;
@ -101,13 +105,4 @@ public class PaceConfig extends AbstractPaceFunctions implements Serializable {
public void setSynonyms(Map<String, List<String>> synonyms) {
this.synonyms = synonyms;
}
public Map<String, FieldDef> getModelMap() {
return modelMap;
}
public void setModelMap(final Map<String, FieldDef> modelMap) {
this.modelMap = modelMap;
}
}

View File

@ -1,72 +0,0 @@
package eu.dnetlib.pace.model;
import eu.dnetlib.pace.config.Type;
/**
* The Class AbstractField.
*/
public abstract class AbstractField implements Field {
/** The type. */
protected Type type = Type.String;
/** The name. */
protected String name;
/**
* Instantiates a new abstract field.
*/
protected AbstractField() {
}
/**
* Instantiates a new abstract field.
*
* @param type
* the type
* @param name
* the name
*/
protected AbstractField(final Type type, final String name) {
this.type = type;
this.name = name;
}
/*
* (non-Javadoc)
* @see eu.dnetlib.pace.model.Field#getName()
*/
@Override
public String getName() {
return name;
}
/*
* (non-Javadoc)
* @see eu.dnetlib.pace.model.Field#getType()
*/
@Override
public Type getType() {
return type;
}
/*
* (non-Javadoc)
* @see eu.dnetlib.pace.model.Field#setName(java.lang.String)
*/
@Override
public void setName(final String name) {
this.name = name;
}
/*
* (non-Javadoc)
* @see eu.dnetlib.pace.model.Field#setType(eu.dnetlib.pace.config.Type)
*/
@Override
public void setType(final Type type) {
this.type = type;
}
}

View File

@ -1,40 +0,0 @@
package eu.dnetlib.pace.model;
import java.util.Set;
/**
* The Interface Document. Models the common operations available on a Pace Document.
*/
public interface Document {
/**
* Gets the identifier.
*
* @return the identifier
*/
String getIdentifier();
/**
* Fields.
*
* @return the iterable
*/
Iterable<Field> fields();
/**
* Values.
*
* @param name
* the name
* @return the field list
*/
Field values(String name);
/**
* Field names.
*
* @return the sets the
*/
Set<String> fieldNames();
}

View File

@ -1,57 +0,0 @@
package eu.dnetlib.pace.model;
import java.io.Serializable;
import eu.dnetlib.pace.config.Type;
/**
* The Interface Field.
*/
public interface Field extends Iterable<Field>, Serializable {
/**
* Gets the name.
*
* @return the name
*/
public String getName();
/**
* Sets the name.
*
* @param name
* the new name
*/
public void setName(String name);
/**
* Gets the type.
*
* @return the type
*/
public Type getType();
/**
* Sets the type.
*
* @param type
* the new type
*/
public void setType(Type type);
/**
* Checks if is empty.
*
* @return true, if is empty
*/
public boolean isEmpty();
/**
* String value.
*
* @return the string
*/
public String stringValue();
}

View File

@ -39,20 +39,6 @@ public class FieldDef implements Serializable {
public FieldDef() {
}
// def apply(s: String): Field[A]
public Field apply(final Type type, final String s) {
switch (type) {
case Int:
return new FieldValueImpl(type, name, Integer.parseInt(s));
case String:
return new FieldValueImpl(type, name, s);
case List:
return new FieldListImpl(name, type);
default:
throw new IllegalArgumentException("Casting not implemented for type " + type);
}
}
public String getName() {
return name;
}

View File

@ -1,25 +0,0 @@
package eu.dnetlib.pace.model;
import java.util.List;
/**
* The Interface FieldList.
*/
public interface FieldList extends List<Field>, Field {
/**
* String list.
*
* @return the list
*/
public List<String> stringList();
/**
* Double[] Array
*
* @return the double[] array
*/
public double[] doubleArray();
}

View File

@ -1,315 +0,0 @@
package eu.dnetlib.pace.model;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import eu.dnetlib.pace.config.Type;
/**
* The Class FieldListImpl.
*/
public class FieldListImpl extends AbstractField implements FieldList {
/** The fields. */
private List<Field> fields;
/**
* Instantiates a new field list impl.
*/
public FieldListImpl() {
fields = Lists.newArrayList();
}
/**
* Instantiates a new field list impl.
*
* @param name
* the name
*/
public FieldListImpl(final String name, final Type type) {
super(type, name);
fields = Lists.newArrayList();
}
/*
* (non-Javadoc)
* @see java.util.List#add(java.lang.Object)
*/
@Override
public boolean add(final Field f) {
return fields.add(f);
}
/*
* (non-Javadoc)
* @see java.util.List#add(int, java.lang.Object)
*/
@Override
public void add(final int i, final Field f) {
fields.add(i, f);
}
/*
* (non-Javadoc)
* @see java.util.List#addAll(java.util.Collection)
*/
@Override
public boolean addAll(final Collection<? extends Field> f) {
return fields.addAll(f);
}
/*
* (non-Javadoc)
* @see java.util.List#addAll(int, java.util.Collection)
*/
@Override
public boolean addAll(final int i, final Collection<? extends Field> f) {
return fields.addAll(i, f);
}
/*
* (non-Javadoc)
* @see java.util.List#clear()
*/
@Override
public void clear() {
fields.clear();
}
/*
* (non-Javadoc)
* @see java.util.List#contains(java.lang.Object)
*/
@Override
public boolean contains(final Object o) {
return fields.contains(o);
}
/*
* (non-Javadoc)
* @see java.util.List#containsAll(java.util.Collection)
*/
@Override
public boolean containsAll(final Collection<?> f) {
return fields.containsAll(f);
}
/*
* (non-Javadoc)
* @see java.util.List#get(int)
*/
@Override
public Field get(final int i) {
return fields.get(i);
}
/*
* (non-Javadoc)
* @see java.util.List#indexOf(java.lang.Object)
*/
@Override
public int indexOf(final Object o) {
return fields.indexOf(o);
}
/*
* (non-Javadoc)
* @see eu.dnetlib.pace.model.Field#isEmpty()
*/
@Override
public boolean isEmpty() {
return Iterables.all(fields, f -> f.isEmpty());
}
/*
* (non-Javadoc)
* @see java.lang.Iterable#iterator()
*/
@Override
public Iterator<Field> iterator() {
return fields.iterator();
}
/*
* (non-Javadoc)
* @see java.util.List#lastIndexOf(java.lang.Object)
*/
@Override
public int lastIndexOf(final Object o) {
return fields.lastIndexOf(o);
}
/*
* (non-Javadoc)
* @see java.util.List#listIterator()
*/
@Override
public ListIterator<Field> listIterator() {
return fields.listIterator();
}
/*
* (non-Javadoc)
* @see java.util.List#listIterator(int)
*/
@Override
public ListIterator<Field> listIterator(final int i) {
return fields.listIterator(i);
}
/*
* (non-Javadoc)
* @see java.util.List#remove(java.lang.Object)
*/
@Override
public boolean remove(final Object o) {
return fields.remove(o);
}
/*
* (non-Javadoc)
* @see java.util.List#remove(int)
*/
@Override
public Field remove(final int i) {
return fields.remove(i);
}
/*
* (non-Javadoc)
* @see java.util.List#removeAll(java.util.Collection)
*/
@Override
public boolean removeAll(final Collection<?> f) {
return fields.removeAll(f);
}
/*
* (non-Javadoc)
* @see java.util.List#retainAll(java.util.Collection)
*/
@Override
public boolean retainAll(final Collection<?> f) {
return fields.retainAll(f);
}
/*
* (non-Javadoc)
* @see java.util.List#set(int, java.lang.Object)
*/
@Override
public Field set(final int i, final Field f) {
return fields.set(i, f);
}
/*
* (non-Javadoc)
* @see java.util.List#size()
*/
@Override
public int size() {
return fields.size();
}
/*
* (non-Javadoc)
* @see java.util.List#subList(int, int)
*/
@Override
public List<Field> subList(final int from, final int to) {
return fields.subList(from, to);
}
/*
* (non-Javadoc)
* @see java.util.List#toArray()
*/
@Override
public Object[] toArray() {
return fields.toArray();
}
/*
* (non-Javadoc)
* @see java.util.List#toArray(java.lang.Object[])
*/
@Override
public <T> T[] toArray(final T[] t) {
return fields.toArray(t);
}
/*
* (non-Javadoc)
* @see eu.dnetlib.pace.model.Field#stringValue()
*/
@Override
public String stringValue() {
switch (getType()) {
case List:
case Int:
case String:
return Joiner.on(" ").join(stringList());
case JSON:
String json;
try {
json = new ObjectMapper().writeValueAsString(this);
} catch (JsonProcessingException e) {
json = null;
}
return json;
default:
throw new IllegalArgumentException("Unknown type: " + getType().toString());
}
}
/*
* (non-Javadoc)
* @see eu.dnetlib.pace.model.FieldList#stringList()
*/
@Override
public List<String> stringList() {
return Lists.newArrayList(Iterables.transform(fields, getValuesTransformer()));
}
private Function<Field, String> getValuesTransformer() {
return new Function<Field, String>() {
@Override
public String apply(final Field f) {
return f.stringValue();
}
};
}
@Override
public double[] doubleArray() {
return Lists.newArrayList(Iterables.transform(fields, getDouble())).stream().mapToDouble(d -> d).toArray();
}
private Function<Field, Double> getDouble() {
return new Function<Field, Double>() {
@Override
public Double apply(final Field f) {
return Double.parseDouble(f.stringValue());
}
};
}
@Override
public String toString() {
return stringList().toString();
}
}

View File

@ -1,26 +0,0 @@
package eu.dnetlib.pace.model;
/**
* The Interface FieldValue.
*/
public interface FieldValue extends Field {
/**
* Gets the value.
*
* @return the value
*/
public Object getValue();
/**
* Sets the value.
*
* @param value
* the new value
*/
public void setValue(final Object value);
public double[] doubleArrayValue();
}

View File

@ -1,135 +0,0 @@
package eu.dnetlib.pace.model;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.pace.config.Type;
/**
* The Class FieldValueImpl.
*/
public class FieldValueImpl extends AbstractField implements FieldValue {
/** The value. */
private Object value = null;
/**
* Instantiates a new field value impl.
*/
public FieldValueImpl() {
}
/**
* Instantiates a new field value impl.
*
* @param type
* the type
* @param name
* the name
* @param value
* the value
*/
public FieldValueImpl(final Type type, final String name, final Object value) {
super(type, name);
this.value = value;
}
/*
* (non-Javadoc)
* @see eu.dnetlib.pace.model.Field#isEmpty()
*/
@Override
public boolean isEmpty() {
if (value == null)
return false;
switch (type) {
case String:
case JSON:
return value.toString().isEmpty();
case List:
try {
List<?> list = (List<?>) value;
return list.isEmpty() || ((FieldValueImpl) list.get(0)).isEmpty();
} catch (Exception e) {
throw new RuntimeException(value.toString());
}
case URL:
String str = value.toString();
return StringUtils.isBlank(str) || !isValidURL(str);
case DoubleArray:
return doubleArrayValue().length == 0;
default:
return true;
}
}
private boolean isValidURL(final String s) {
try {
new URL(s);
return true;
} catch (MalformedURLException e) {
return false;
}
}
/*
* (non-Javadoc)
* @see eu.dnetlib.pace.model.FieldValue#getValue()
*/
@Override
public Object getValue() {
return value;
}
/*
* (non-Javadoc)
* @see eu.dnetlib.pace.model.FieldValue#setValue(java.lang.Object)
*/
@Override
public void setValue(final Object value) {
this.value = value;
}
/*
* (non-Javadoc)
* @see eu.dnetlib.pace.model.Field#stringValue()
*/
@Override
// @SuppressWarnings("unchecked")
public String stringValue() {
return String.valueOf(getValue());
// switch (getType()) {
//
// case Int:
// return String.valueOf(getValue());
// case List:
// return Joiner.on(" ").join((List<String>) getValue());
// case String:
// return (String) getValue();
// default:
// throw new IllegalArgumentException("Unknown type: " + getType().toString());
// }
}
public double[] doubleArrayValue() {
return (double[]) getValue();
}
/*
* (non-Javadoc)
* @see java.lang.Iterable#iterator()
*/
@Override
@SuppressWarnings("unchecked")
public Iterator<Field> iterator() {
return Collections.singleton((Field) this).iterator();
}
}

View File

@ -1,143 +0,0 @@
package eu.dnetlib.pace.model;
import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
* The Class MapDocument.
*/
public class MapDocument implements Document, Serializable {
/** The identifier. */
private String identifier;
/** The field map. */
private Map<String, Field> fieldMap;
/**
* Instantiates a new map document.
*/
public MapDocument() {
identifier = null;
fieldMap = Maps.newHashMap();
}
/**
* Instantiates a new map document.
*
* @param identifier
* the identifier
* @param fieldMap
* the field map
*/
public MapDocument(final String identifier, final Map<String, Field> fieldMap) {
this.setIdentifier(identifier);
this.fieldMap = fieldMap;
}
/**
* Instantiates a new map document.
*
* @param identifier
* the identifier
* @param data
* the data
*/
public MapDocument(final String identifier, final byte[] data) {
final MapDocument doc = MapDocumentSerializer.decode(data);
this.fieldMap = doc.fieldMap;
this.identifier = doc.identifier;
}
/*
* (non-Javadoc)
* @see eu.dnetlib.pace.model.document.Document#fields()
*/
@Override
public Iterable<Field> fields() {
return Lists.newArrayList(Iterables.concat(fieldMap.values()));
}
/*
* (non-Javadoc)
* @see eu.dnetlib.pace.model.document.Document#values(java.lang.String)
*/
@Override
public Field values(final String name) {
return fieldMap.get(name);
}
/*
* (non-Javadoc)
* @see eu.dnetlib.pace.model.document.Document#fieldNames()
*/
@Override
public Set<String> fieldNames() {
return fieldMap.keySet();
}
/*
* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return MapDocumentSerializer.toString(this);
// return String.format("Document(%s)", fieldMap.toString());
}
/**
* To byte array.
*
* @return the byte[]
*/
public byte[] toByteArray() {
return MapDocumentSerializer.toByteArray(this);
}
/*
* (non-Javadoc)
* @see eu.dnetlib.pace.model.document.Document#getIdentifier()
*/
@Override
public String getIdentifier() {
return identifier;
}
/**
* Sets the identifier.
*
* @param identifier
* the new identifier
*/
public void setIdentifier(final String identifier) {
this.identifier = identifier;
}
/**
* Gets the field map.
*
* @return the field map
*/
public Map<String, Field> getFieldMap() {
return fieldMap;
}
/**
* Sets the field map.
*
* @param fieldMap
* the field map
*/
public void setFieldMap(final Map<String, Field> fieldMap) {
this.fieldMap = fieldMap;
}
}

View File

@ -1,52 +0,0 @@
package eu.dnetlib.pace.model;
import java.util.Comparator;
import com.google.common.collect.Iterables;
import eu.dnetlib.pace.clustering.NGramUtils;
/**
* The Class MapDocumentComparator.
*/
public class MapDocumentComparator implements Comparator<Document> {
/** The comparator field. */
private String comparatorField;
private final FieldList emptyField = new FieldListImpl();
/**
* Instantiates a new map document comparator.
*
* @param comparatorField
* the comparator field
*/
public MapDocumentComparator(final String comparatorField) {
this.comparatorField = comparatorField;
}
/*
* (non-Javadoc)
* @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
*/
@Override
public int compare(final Document d1, final Document d2) {
if (d1.values(comparatorField).isEmpty() || d2.values(comparatorField).isEmpty())
return 0;
final String o1 = Iterables.getFirst(d1.values(comparatorField), emptyField).stringValue();
final String o2 = Iterables.getFirst(d2.values(comparatorField), emptyField).stringValue();
if ((o1 == null) || (o2 == null))
return 0;
final String to1 = NGramUtils.cleanupForOrdering(o1);
final String to2 = NGramUtils.cleanupForOrdering(o2);
return to1.compareTo(to2);
}
}

View File

@ -1,103 +0,0 @@
package eu.dnetlib.pace.model;
import java.lang.reflect.Type;
import com.google.gson.GsonBuilder;
import com.google.gson.InstanceCreator;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
/**
* The Class MapDocumentSerializer.
*/
public class MapDocumentSerializer implements InstanceCreator<MapDocument> {
@Override
public MapDocument createInstance(final Type type) {
return new MapDocument();
}
/**
* Decode.
*
* @param s
* the String
* @return the map document
*/
public static MapDocument decode(final String s) {
final GsonBuilder gson = new GsonBuilder();
gson.registerTypeAdapter(Field.class, new JsonDeserializer<Field>() {
@Override
public Field deserialize(final JsonElement json, final Type typeOfT,
final JsonDeserializationContext context) throws JsonParseException {
final FieldListImpl fl = new FieldListImpl();
if (json.isJsonObject()) {
fl.add(handleJsonObject(json.getAsJsonObject()));
} else if (json.isJsonArray()) {
for (final JsonElement e : json.getAsJsonArray()) {
if (e.isJsonObject()) {
fl.add(handleJsonObject(e.getAsJsonObject()));
}
}
}
return fl;
}
private Field handleJsonObject(final JsonObject o) {
final FieldListImpl fl = new FieldListImpl();
final String name = o.get("name").getAsString();
final String type = o.get("type").getAsString();
final String value = o.get("value").getAsString();
fl.add(new FieldValueImpl(eu.dnetlib.pace.config.Type.valueOf(type), name, value));
return fl;
}
});
return gson.create().fromJson(s, MapDocument.class);
}
/**
* Decode.
*
* @param bytes
* the bytes
* @return the map document
*/
public static MapDocument decode(final byte[] bytes) {
return decode(new String(bytes));
}
/**
* To string.
*
* @param doc
* the doc
* @return the string
*/
public static String toString(final MapDocument doc) {
final GsonBuilder b = new GsonBuilder();
return b.setPrettyPrinting().create().toJson(doc);
}
/**
* To byte array.
*
* @param doc
* the doc
* @return the byte[]
*/
public static byte[] toByteArray(final MapDocument doc) {
return toString(doc).getBytes();
}
}

View File

@ -0,0 +1,65 @@
package eu.dnetlib.pace.model;
import java.util.Comparator;
import org.apache.spark.sql.Row;
import eu.dnetlib.pace.clustering.NGramUtils;
/**
* The Class MapDocumentComparator.
*/
public class RowDataOrderingComparator implements Comparator<Row> {
/** The comparator field. */
private final int comparatorField;
private final int identityFieldPosition;
/**
* Instantiates a new map document comparator.
*
* @param comparatorField
* the comparator field
*/
public RowDataOrderingComparator(final int comparatorField, int identityFieldPosition) {
this.comparatorField = comparatorField;
this.identityFieldPosition = identityFieldPosition;
}
/*
* (non-Javadoc)
* @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
*/
@Override
public int compare(final Row d1, final Row d2) {
if (d1 == null)
return d2 == null ? 0 : -1;
else if (d2 == null) {
return 1;
}
final String o1 = d1.getString(comparatorField);
final String o2 = d2.getString(comparatorField);
if (o1 == null)
return o2 == null ? 0 : -1;
else if (o2 == null) {
return 1;
}
final String to1 = NGramUtils.cleanupForOrdering(o1);
final String to2 = NGramUtils.cleanupForOrdering(o2);
int res = to1.compareTo(to2);
if (res == 0) {
res = o1.compareTo(o2);
if (res == 0) {
return d1.getString(identityFieldPosition).compareTo(d2.getString(identityFieldPosition));
}
}
return res;
}
}

View File

@ -0,0 +1,131 @@
package eu.dnetlib.pace.model
import eu.dnetlib.pace.config.{DedupConfig, Type}
import eu.dnetlib.pace.util.{BlockProcessor, SparkReporter}
import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions.{col, lit, udf}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, Dataset, Row, functions}
import java.util.function.Predicate
import java.util.stream.Collectors
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
case class SparkDeduper(conf: DedupConfig) extends Serializable {
val model: SparkModel = SparkModel(conf)
val dedup: (Dataset[Row] => Dataset[Row]) = df => {
df.transform(filterAndCleanup)
.transform(generateClustersWithCollect)
.transform(processBlocks)
}
val filterAndCleanup: (Dataset[Row] => Dataset[Row]) = df => {
val df_with_filters = conf.getPace.getModel.asScala.foldLeft(df)((res, fdef) => {
if (conf.blacklists.containsKey(fdef.getName)) {
res.withColumn(
fdef.getName + "_filtered",
filterColumnUDF(fdef).apply(new Column(fdef.getName))
)
} else {
res
}
})
df_with_filters
}
def filterColumnUDF(fdef: FieldDef): UserDefinedFunction = {
val blacklist: Predicate[String] = conf.blacklists().get(fdef.getName)
if (blacklist == null) {
throw new IllegalArgumentException("Column: " + fdef.getName + " does not have any filter")
} else {
fdef.getType match {
case Type.List | Type.JSON =>
udf[Array[String], Array[String]](values => {
values.filter((v: String) => !blacklist.test(v))
})
case _ =>
udf[String, String](v => {
if (blacklist.test(v)) ""
else v
})
}
}
}
val generateClustersWithCollect: (Dataset[Row] => Dataset[Row]) = df_with_filters => {
var df_with_clustering_keys: Dataset[Row] = null
for ((cd, idx) <- conf.clusterings().zipWithIndex) {
val inputColumns = cd.getFields().foldLeft(Seq[Column]())((acc, fName) => {
val column = if (conf.blacklists.containsKey(fName))
Seq(col(fName + "_filtered"))
else
Seq(col(fName))
acc ++ column
})
// Add 'key' column with the value generated by the given clustering definition
val ds: Dataset[Row] = df_with_filters
.withColumn("clustering", lit(cd.getName + "::" + idx))
.withColumn("key", functions.explode(clusterValuesUDF(cd).apply(functions.array(inputColumns: _*))))
// Add position column having the position of the row within the set of rows having the same key value ordered by the sorting value
.withColumn("position", functions.row_number().over(Window.partitionBy("key").orderBy(col(model.orderingFieldName), col(model.identifierFieldName))))
if (df_with_clustering_keys == null)
df_with_clustering_keys = ds
else
df_with_clustering_keys = df_with_clustering_keys.union(ds)
}
//TODO: analytics
val df_with_blocks = df_with_clustering_keys
// filter out rows with position exceeding the maxqueuesize parameter
.filter(col("position").leq(conf.getWf.getQueueMaxSize))
.groupBy("clustering", "key")
.agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block"))
.filter(functions.size(new Column("block")).gt(1))
df_with_blocks
}
def clusterValuesUDF(cd: ClusteringDef) = {
udf[mutable.WrappedArray[String], mutable.WrappedArray[Any]](values => {
values.flatMap(f => cd.clusteringFunction().apply(conf, Seq(f.toString).asJava).asScala)
})
}
val processBlocks: (Dataset[Row] => Dataset[Row]) = df => {
df.filter(functions.size(new Column("block")).geq(new Literal(2, DataTypes.IntegerType)))
.withColumn("relations", processBlock(df.sqlContext.sparkContext).apply(new Column("block")))
.select(functions.explode(new Column("relations")).as("relation"))
}
def processBlock(implicit sc: SparkContext) = {
val accumulators = SparkReporter.constructAccumulator(conf, sc)
udf[Array[(String, String)], mutable.WrappedArray[Row]](block => {
val reporter = new SparkReporter(accumulators)
val mapDocuments = block.asJava.stream()
.sorted(new RowDataOrderingComparator(model.orderingFieldPosition, model.identityFieldPosition))
.limit(conf.getWf.getQueueMaxSize)
.collect(Collectors.toList[Row]())
new BlockProcessor(conf, model.identityFieldPosition, model.orderingFieldPosition).processSortedRows(mapDocuments, reporter)
reporter.getRelations.asScala.toArray
}).asNondeterministic()
}
}

View File

@ -0,0 +1,108 @@
package eu.dnetlib.pace.model
import com.jayway.jsonpath.{Configuration, JsonPath}
import eu.dnetlib.pace.config.{DedupConfig, Type}
import eu.dnetlib.pace.util.MapDocumentUtil
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import org.apache.spark.sql.{Dataset, Row}
import java.util.regex.Pattern
import scala.collection.JavaConverters._
case class SparkModel(conf: DedupConfig) {
private val URL_REGEX: Pattern = Pattern.compile("^\\s*(http|https|ftp)\\://.*")
private val CONCAT_REGEX: Pattern = Pattern.compile("\\|\\|\\|")
val identifierFieldName = "identifier"
val orderingFieldName = if (!conf.getWf.getOrderField.isEmpty) conf.getWf.getOrderField else identifierFieldName
val schema: StructType = {
// create an implicit identifier field
val identifier = new FieldDef()
identifier.setName(identifierFieldName)
identifier.setType(Type.String)
// Construct a Spark StructType representing the schema of the model
(Seq(identifier) ++ conf.getPace.getModel.asScala)
.foldLeft(
new StructType()
)((resType, fieldDef) => {
resType.add(fieldDef.getType match {
case Type.List | Type.JSON =>
StructField(fieldDef.getName, DataTypes.createArrayType(DataTypes.StringType), true, Metadata.empty)
case Type.DoubleArray =>
StructField(fieldDef.getName, DataTypes.createArrayType(DataTypes.DoubleType), true, Metadata.empty)
case _ =>
StructField(fieldDef.getName, DataTypes.StringType, true, Metadata.empty)
})
})
}
val identityFieldPosition: Int = schema.fieldIndex(identifierFieldName)
val orderingFieldPosition: Int = schema.fieldIndex(orderingFieldName)
val parseJsonDataset: (Dataset[String] => Dataset[Row]) = df => {
df.map(r => rowFromJson(r))(RowEncoder(schema))
}
def rowFromJson(json: String): Row = {
val documentContext =
JsonPath.using(Configuration.defaultConfiguration.addOptions(com.jayway.jsonpath.Option.SUPPRESS_EXCEPTIONS)).parse(json)
val values = new Array[Any](schema.size)
values(identityFieldPosition) = MapDocumentUtil.getJPathString(conf.getWf.getIdPath, documentContext)
schema.fieldNames.zipWithIndex.foldLeft(values) {
case ((res, (fname, index))) => {
val fdef = conf.getPace.getModelMap.get(fname)
if (fdef != null) {
res(index) = fdef.getType match {
case Type.String | Type.Int =>
MapDocumentUtil.truncateValue(
MapDocumentUtil.getJPathString(fdef.getPath, documentContext),
fdef.getLength
)
case Type.URL =>
var uv = MapDocumentUtil.getJPathString(fdef.getPath, documentContext)
if (!URL_REGEX.matcher(uv).matches)
uv = ""
uv
case Type.List | Type.JSON =>
MapDocumentUtil.truncateList(
MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType),
fdef.getSize
).toArray
case Type.StringConcat =>
val jpaths = CONCAT_REGEX.split(fdef.getPath)
MapDocumentUtil.truncateValue(
jpaths
.map(jpath => MapDocumentUtil.getJPathString(jpath, documentContext))
.mkString(" "),
fdef.getLength
)
case Type.DoubleArray =>
MapDocumentUtil.getJPathArray(fdef.getPath, json)
}
}
res
}
}
new GenericRowWithSchema(values, schema)
}
}

View File

@ -6,12 +6,11 @@ import java.util.Map;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("alwaysMatch")
public class AlwaysMatch extends AbstractComparator {
public class AlwaysMatch<T> extends AbstractComparator<T> {
public AlwaysMatch(final Map<String, String> params) {
super(params, new com.wcohen.ss.JaroWinkler());
@ -26,7 +25,7 @@ public class AlwaysMatch extends AbstractComparator {
}
@Override
public double compare(final Field a, final Field b, final Config conf) {
public double compare(final Object a, final Object b, final Config conf) {
return 1.0;
}

View File

@ -1,25 +1,19 @@
package eu.dnetlib.pace.tree;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.collect.Iterables;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldList;
import eu.dnetlib.pace.model.Person;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractListComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("authorsMatch")
public class AuthorsMatch extends AbstractComparator {
public class AuthorsMatch extends AbstractListComparator {
Map<String, String> params;
@ -49,24 +43,16 @@ public class AuthorsMatch extends AbstractComparator {
}
@Override
public double compare(final Field a, final Field b, final Config conf) {
public double compare(final List<String> a, final List<String> b, final Config conf) {
if (a.isEmpty() || b.isEmpty())
return -1;
if (((FieldList) a).size() > SIZE_THRESHOLD || ((FieldList) b).size() > SIZE_THRESHOLD)
if (a.size() > SIZE_THRESHOLD || b.size() > SIZE_THRESHOLD)
return 1.0;
List<Person> aList = ((FieldList) a)
.stringList()
.stream()
.map(author -> new Person(author, false))
.collect(Collectors.toList());
List<Person> bList = ((FieldList) b)
.stringList()
.stream()
.map(author -> new Person(author, false))
.collect(Collectors.toList());
List<Person> aList = a.stream().map(author -> new Person(author, false)).collect(Collectors.toList());
List<Person> bList = b.stream().map(author -> new Person(author, false)).collect(Collectors.toList());
common = 0;
// compare each element of List1 with each element of List2

View File

@ -5,11 +5,11 @@ import java.util.Map;
import java.util.Set;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("cityMatch")
public class CityMatch extends AbstractComparator {
public class CityMatch extends AbstractStringComparator {
private Map<String, String> params;

View File

@ -1,21 +1,14 @@
package eu.dnetlib.pace.tree;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldList;
import eu.dnetlib.pace.model.FieldValueImpl;
import eu.dnetlib.pace.model.Person;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("cosineSimilarity")
public class CosineSimilarity extends AbstractComparator {
public class CosineSimilarity extends AbstractComparator<double[]> {
Map<String, String> params;
@ -24,15 +17,16 @@ public class CosineSimilarity extends AbstractComparator {
}
@Override
public double compare(final Field a, final Field b, final Config conf) {
public double compare(Object a, Object b, Config config) {
return compare((double[]) a, (double[]) b, config);
}
if (a.isEmpty() || b.isEmpty())
public double compare(final double[] a, final double[] b, final Config conf) {
if (a.length == 0 || b.length == 0)
return -1;
double[] aVector = ((FieldValueImpl) a).doubleArrayValue();
double[] bVector = ((FieldValueImpl) b).doubleArrayValue();
return cosineSimilarity(aVector, bVector);
return cosineSimilarity(a, b);
}
double cosineSimilarity(double[] a, double[] b) {

View File

@ -3,7 +3,6 @@ package eu.dnetlib.pace.tree;
import java.util.Map;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.tree.support.ComparatorClass;
/**
@ -21,8 +20,8 @@ public class DoiExactMatch extends ExactMatchIgnoreCase {
}
@Override
protected String getValue(final Field f) {
return super.getValue(f).replaceAll(PREFIX, "");
protected String toString(final Object f) {
return super.toString(f).replaceAll(PREFIX, "");
}
}

View File

@ -5,7 +5,6 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("domainExactMatch")
@ -16,10 +15,10 @@ public class DomainExactMatch extends ExactMatchIgnoreCase {
}
@Override
protected String getValue(final Field f) {
protected String toString(final Object f) {
try {
return asUrl(super.getValue(f)).getHost();
return asUrl(super.toString(f)).getHost();
} catch (MalformedURLException e) {
return "";
}

View File

@ -6,11 +6,11 @@ import java.util.Map;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("exactMatch")
public class ExactMatch extends AbstractComparator {
public class ExactMatch extends AbstractStringComparator {
public ExactMatch(Map<String, String> params) {
super(params, new com.wcohen.ss.JaroWinkler());

View File

@ -4,30 +4,26 @@ package eu.dnetlib.pace.tree;
import java.util.Map;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("exactMatchIgnoreCase")
public class ExactMatchIgnoreCase extends AbstractComparator {
public class ExactMatchIgnoreCase extends AbstractStringComparator {
public ExactMatchIgnoreCase(Map<String, String> params) {
super(params);
}
@Override
public double compare(Field a, Field b, final Config conf) {
public double compare(String a, String b, final Config conf) {
final String fa = getValue(a);
final String fb = getValue(b);
if (fa.isEmpty() || fb.isEmpty())
if (a.isEmpty() || b.isEmpty())
return -1;
return fa.equalsIgnoreCase(fb) ? 1 : 0;
return a.equalsIgnoreCase(b) ? 1 : 0;
}
protected String getValue(final Field f) {
return getFirstValue(f);
protected String toString(final Object object) {
return toFirstString(object);
}
}

View File

@ -10,13 +10,11 @@ import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldList;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractListComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("instanceTypeMatch")
public class InstanceTypeMatch extends AbstractComparator {
public class InstanceTypeMatch extends AbstractListComparator {
final Map<String, String> translationMap = new HashMap<>();
@ -42,21 +40,18 @@ public class InstanceTypeMatch extends AbstractComparator {
}
@Override
public double compare(final Field a, final Field b, final Config conf) {
public double compare(final List<String> a, final List<String> b, final Config conf) {
if (a == null || b == null) {
return -1;
}
final List<String> sa = ((FieldList) a).stringList();
final List<String> sb = ((FieldList) b).stringList();
if (sa.isEmpty() || sb.isEmpty()) {
if (a.isEmpty() || b.isEmpty()) {
return -1;
}
final Set<String> ca = sa.stream().map(this::translate).collect(Collectors.toSet());
final Set<String> cb = sb.stream().map(this::translate).collect(Collectors.toSet());
final Set<String> ca = a.stream().map(this::translate).collect(Collectors.toSet());
final Set<String> cb = b.stream().map(this::translate).collect(Collectors.toSet());
// if at least one is a jolly type, it must produce a match
if (ca.contains("*") || cb.contains("*"))

View File

@ -6,12 +6,12 @@ import java.util.Map;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
//case class JaroWinkler(w: Double) extends SecondStringDistanceAlgo(w, new com.wcohen.ss.JaroWinkler())
@ComparatorClass("jaroWinkler")
public class JaroWinkler extends AbstractComparator {
public class JaroWinkler extends AbstractStringComparator {
public JaroWinkler(Map<String, String> params) {
super(params, new com.wcohen.ss.JaroWinkler());

View File

@ -7,11 +7,11 @@ import java.util.Set;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("jaroWinklerNormalizedName")
public class JaroWinklerNormalizedName extends AbstractComparator {
public class JaroWinklerNormalizedName extends AbstractStringComparator {
private Map<String, String> params;

View File

@ -6,12 +6,12 @@ import java.util.Map;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
//case class JaroWinkler(w: Double) extends SecondStringDistanceAlgo(w, new com.wcohen.ss.JaroWinkler())
@ComparatorClass("jaroWinklerTitle")
public class JaroWinklerTitle extends AbstractComparator {
public class JaroWinklerTitle extends AbstractStringComparator {
public JaroWinklerTitle(Map<String, String> params) {
super(params, new com.wcohen.ss.JaroWinkler());

View File

@ -10,16 +10,18 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.collect.Sets;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldList;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractListComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
import eu.dnetlib.pace.util.MapDocumentUtil;
@ComparatorClass("jsonListMatch")
public class JsonListMatch extends AbstractComparator {
public class JsonListMatch extends AbstractListComparator {
private static final Log log = LogFactory.getLog(JsonListMatch.class);
private Map<String, String> params;
@ -34,11 +36,7 @@ public class JsonListMatch extends AbstractComparator {
}
@Override
public double compare(final Field a, final Field b, final Config conf) {
final List<String> sa = ((FieldList) a).stringList();
final List<String> sb = ((FieldList) b).stringList();
public double compare(final List<String> sa, final List<String> sb, final Config conf) {
if (sa.isEmpty() || sb.isEmpty()) {
return -1;
}
@ -65,14 +63,17 @@ public class JsonListMatch extends AbstractComparator {
StringBuilder st = new StringBuilder(); // to build the string used for comparisons basing on the jpath into
// parameters
final DocumentContext documentContext = JsonPath
.using(Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS))
.parse(json);
// for each path in the param list
for (String key : params.keySet().stream().filter(k -> k.contains("jpath")).collect(Collectors.toList())) {
String path = params.get(key);
String value = MapDocumentUtil.getJPathString(path, json);
String value = MapDocumentUtil.getJPathString(path, documentContext);
if (value == null || value.isEmpty())
value = "";
st.append(value + "::");
st.append(value);
st.append("::");
}
st.setLength(st.length() - 2);

View File

@ -5,11 +5,11 @@ import java.util.Map;
import java.util.Set;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("keywordMatch")
public class KeywordMatch extends AbstractComparator {
public class KeywordMatch extends AbstractStringComparator {
Map<String, String> params;

View File

@ -5,11 +5,11 @@ import java.util.Map;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("level2JaroWinkler")
public class Level2JaroWinkler extends AbstractComparator {
public class Level2JaroWinkler extends AbstractStringComparator {
public Level2JaroWinkler(Map<String, String> params) {
super(params, new com.wcohen.ss.Level2JaroWinkler());

View File

@ -6,11 +6,11 @@ import java.util.Map;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("level2JaroWinklerTitle")
public class Level2JaroWinklerTitle extends AbstractComparator {
public class Level2JaroWinklerTitle extends AbstractStringComparator {
public Level2JaroWinklerTitle(Map<String, String> params) {
super(params, new com.wcohen.ss.Level2JaroWinkler());

View File

@ -5,11 +5,11 @@ import java.util.Map;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("level2Levenstein")
public class Level2Levenstein extends AbstractComparator {
public class Level2Levenstein extends AbstractStringComparator {
public Level2Levenstein(Map<String, String> params) {
super(params, new com.wcohen.ss.Level2Levenstein());

View File

@ -5,11 +5,11 @@ import java.util.Map;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("levenstein")
public class Levenstein extends AbstractComparator {
public class Levenstein extends AbstractStringComparator {
public Levenstein(Map<String, String> params) {
super(params, new com.wcohen.ss.Levenstein());

View File

@ -9,11 +9,11 @@ import org.apache.commons.logging.LogFactory;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("levensteinTitle")
public class LevensteinTitle extends AbstractComparator {
public class LevensteinTitle extends AbstractStringComparator {
private static final Log log = LogFactory.getLog(LevensteinTitle.class);

View File

@ -6,14 +6,14 @@ import java.util.Map;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
/**
* Compared compare between two titles, ignoring version numbers. Suitable for Software entities.
*/
@ComparatorClass("levensteinTitleIgnoreVersion")
public class LevensteinTitleIgnoreVersion extends AbstractComparator {
public class LevensteinTitleIgnoreVersion extends AbstractStringComparator {
public LevensteinTitleIgnoreVersion(Map<String, String> params) {
super(params, new com.wcohen.ss.Levenstein());

View File

@ -3,15 +3,10 @@ package eu.dnetlib.pace.tree;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldList;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractListComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
/**
@ -20,7 +15,7 @@ import eu.dnetlib.pace.tree.support.ComparatorClass;
* @author miconis
* */
@ComparatorClass("listContainsMatch")
public class ListContainsMatch extends AbstractComparator {
public class ListContainsMatch extends AbstractListComparator {
private Map<String, String> params;
private boolean CASE_SENSITIVE;
@ -38,11 +33,7 @@ public class ListContainsMatch extends AbstractComparator {
}
@Override
public double compare(final Field a, final Field b, final Config conf) {
List<String> sa = ((FieldList) a).stringList();
List<String> sb = ((FieldList) b).stringList();
public double compare(List<String> sa, List<String> sb, Config conf) {
if (sa.isEmpty() || sb.isEmpty()) {
return -1;
}

View File

@ -6,11 +6,11 @@ import java.util.Map;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("mustBeDifferent")
public class MustBeDifferent extends AbstractComparator {
public class MustBeDifferent extends AbstractStringComparator {
public MustBeDifferent(Map<String, String> params) {
super(params, new com.wcohen.ss.Levenstein());

View File

@ -4,7 +4,6 @@ package eu.dnetlib.pace.tree;
import java.util.Map;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.tree.support.Comparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ -13,13 +12,13 @@ import eu.dnetlib.pace.tree.support.ComparatorClass;
* NullDistanceAlgo.
*/
@ComparatorClass("null")
public class NullDistanceAlgo implements Comparator {
public class NullDistanceAlgo<T> implements Comparator<T> {
public NullDistanceAlgo(Map<String, String> params) {
}
@Override
public double compare(Field a, Field b, Config config) {
public double compare(Object a, Object b, Config config) {
return 0;
}
}

View File

@ -4,11 +4,11 @@ package eu.dnetlib.pace.tree;
import java.util.Map;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("numbersComparator")
public class NumbersComparator extends AbstractComparator {
public class NumbersComparator extends AbstractStringComparator {
Map<String, String> params;

View File

@ -4,11 +4,11 @@ package eu.dnetlib.pace.tree;
import java.util.Map;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("numbersMatch")
public class NumbersMatch extends AbstractComparator {
public class NumbersMatch extends AbstractStringComparator {
public NumbersMatch(Map<String, String> params) {
super(params);

View File

@ -4,11 +4,11 @@ package eu.dnetlib.pace.tree;
import java.util.Map;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("romansMatch")
public class RomansMatch extends AbstractComparator {
public class RomansMatch extends AbstractStringComparator {
public RomansMatch(Map<String, String> params) {
super(params);

View File

@ -4,11 +4,8 @@ package eu.dnetlib.pace.tree;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Iterables;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractListComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
/**
@ -17,7 +14,7 @@ import eu.dnetlib.pace.tree.support.ComparatorClass;
* @author claudio
*/
@ComparatorClass("sizeMatch")
public class SizeMatch extends AbstractComparator {
public class SizeMatch extends AbstractListComparator {
/**
* Instantiates a new size match.
@ -30,23 +27,12 @@ public class SizeMatch extends AbstractComparator {
}
@Override
public double compare(final Field a, final Field b, final Config conf) {
public double compare(final List<String> a, final List<String> b, final Config conf) {
if (a.isEmpty() || b.isEmpty())
return -1;
return -1.0;
return Iterables.size(a) == Iterables.size(b) ? 1 : 0;
}
/**
* Checks if is empty.
*
* @param a
* the a
* @return true, if is empty
*/
protected boolean isEmpty(final Iterable<?> a) {
return (a == null) || Iterables.isEmpty(a);
return a.size() == b.size() ? 1.0 : 0.0;
}
}

View File

@ -4,7 +4,7 @@ package eu.dnetlib.pace.tree;
import java.util.Map;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
/**
@ -13,7 +13,7 @@ import eu.dnetlib.pace.tree.support.ComparatorClass;
* @author miconis
* */
@ComparatorClass("stringContainsMatch")
public class StringContainsMatch extends AbstractComparator {
public class StringContainsMatch extends AbstractStringComparator {
private Map<String, String> params;

View File

@ -2,6 +2,7 @@
package eu.dnetlib.pace.tree;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -11,13 +12,11 @@ import org.apache.commons.logging.LogFactory;
import com.google.common.collect.Sets;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldList;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractListComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("stringListMatch")
public class StringListMatch extends AbstractComparator {
public class StringListMatch extends AbstractListComparator {
private static final Log log = LogFactory.getLog(StringListMatch.class);
private Map<String, String> params;
@ -32,10 +31,10 @@ public class StringListMatch extends AbstractComparator {
}
@Override
public double compare(final Field a, final Field b, final Config conf) {
public double compare(final List<String> a, final List<String> b, final Config conf) {
final Set<String> pa = new HashSet<>(((FieldList) a).stringList());
final Set<String> pb = new HashSet<>(((FieldList) b).stringList());
final Set<String> pa = new HashSet<>(a);
final Set<String> pb = new HashSet<>(b);
if (pa.isEmpty() || pb.isEmpty()) {
return -1; // return undefined if one of the two lists is empty

View File

@ -8,25 +8,24 @@ import org.apache.commons.lang3.StringUtils;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.config.Type;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
/**
* The Class SubStringLevenstein.
*/
@ComparatorClass("subStringLevenstein")
public class SubStringLevenstein extends AbstractComparator {
public class SubStringLevenstein extends AbstractStringComparator {
/** The limit. */
/**
* The limit.
*/
protected int limit;
/**
* Instantiates a new sub string levenstein.
*
* @param w
* the w
* @param w the w
*/
public SubStringLevenstein(final double w) {
super(w, new com.wcohen.ss.Levenstein());
@ -40,10 +39,8 @@ public class SubStringLevenstein extends AbstractComparator {
/**
* Instantiates a new sub string levenstein.
*
* @param w
* the w
* @param limit
* the limit
* @param w the w
* @param limit the limit
*/
public SubStringLevenstein(final double w, final int limit) {
super(w, new com.wcohen.ss.Levenstein());
@ -53,12 +50,9 @@ public class SubStringLevenstein extends AbstractComparator {
/**
* Instantiates a new sub string levenstein.
*
* @param w
* the w
* @param limit
* the limit
* @param ssalgo
* the ssalgo
* @param w the w
* @param limit the limit
* @param ssalgo the ssalgo
*/
protected SubStringLevenstein(final double w, final int limit, final AbstractStringDistance ssalgo) {
super(w, ssalgo);
@ -71,11 +65,8 @@ public class SubStringLevenstein extends AbstractComparator {
* eu.dnetlib.pace.model.Field)
*/
@Override
public double distance(final Field a, final Field b, final Config conf) {
if (a.getType().equals(Type.String) && b.getType().equals(Type.String))
return distance(StringUtils.left(a.stringValue(), limit), StringUtils.left(b.stringValue(), limit), conf);
throw new IllegalArgumentException("invalid types\n- A: " + a.toString() + "\n- B: " + b.toString());
public double distance(final String a, final String b, final Config conf) {
return distance(StringUtils.left(a, limit), StringUtils.left(b, limit), conf);
}
/*

View File

@ -1,12 +1,10 @@
package eu.dnetlib.pace.tree;
import java.util.List;
import java.util.Map;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
/**
@ -16,17 +14,14 @@ import eu.dnetlib.pace.tree.support.ComparatorClass;
*
*/
@ComparatorClass("titleVersionMatch")
public class TitleVersionMatch extends AbstractComparator {
public class TitleVersionMatch extends AbstractStringComparator {
public TitleVersionMatch(final Map<String, String> params) {
super(params);
}
@Override
public double compare(final Field a, final Field b, final Config conf) {
final String valueA = getFirstValue(a);
final String valueB = getFirstValue(b);
public double compare(final String valueA, final String valueB, final Config conf) {
if (valueA.isEmpty() || valueB.isEmpty())
return -1;
@ -38,4 +33,7 @@ public class TitleVersionMatch extends AbstractComparator {
return getClass().getSimpleName() + ":" + super.toString();
}
protected String toString(final Object object) {
return toFirstString(object);
}
}

View File

@ -8,7 +8,6 @@ import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.tree.support.ComparatorClass;
@ComparatorClass("urlMatcher")
@ -31,9 +30,9 @@ public class UrlMatcher extends Levenstein {
}
@Override
public double distance(Field a, Field b, final Config conf) {
final URL urlA = asUrl(getFirstValue(a));
final URL urlB = asUrl(getFirstValue(b));
public double distance(String a, String b, final Config conf) {
final URL urlA = asUrl(a);
final URL urlB = asUrl(b);
if (!urlA.getHost().equalsIgnoreCase(urlB.getHost())) {
return 0.0;
@ -58,4 +57,7 @@ public class UrlMatcher extends Levenstein {
}
}
protected String toString(final Object object) {
return toFirstString(object);
}
}

View File

@ -6,8 +6,7 @@ import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.tree.support.AbstractComparator;
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
import eu.dnetlib.pace.tree.support.ComparatorClass;
/**
@ -16,7 +15,7 @@ import eu.dnetlib.pace.tree.support.ComparatorClass;
* @author claudio
*/
@ComparatorClass("yearMatch")
public class YearMatch extends AbstractComparator {
public class YearMatch extends AbstractStringComparator {
private int limit = 4;
@ -25,7 +24,7 @@ public class YearMatch extends AbstractComparator {
}
@Override
public double compare(final Field a, final Field b, final Config conf) {
public double compare(final String a, final String b, final Config conf) {
final String valueA = getNumbers(getFirstValue(a));
final String valueB = getNumbers(getFirstValue(b));
@ -42,8 +41,8 @@ public class YearMatch extends AbstractComparator {
return s.length() == limit;
}
protected String getFirstValue(final Field value) {
return (value != null) && !value.isEmpty() ? StringUtils.left(value.stringValue(), limit) : "";
protected String getFirstValue(final String value) {
return (value != null) && !value.isEmpty() ? StringUtils.left(value, limit) : "";
}
@Override

View File

@ -4,15 +4,14 @@ package eu.dnetlib.pace.tree.support;
import java.util.List;
import java.util.Map;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.common.AbstractPaceFunctions;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.config.Type;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldList;
public abstract class AbstractComparator extends AbstractPaceFunctions implements Comparator {
public abstract class AbstractComparator<T> extends AbstractPaceFunctions implements Comparator<T> {
/** The ssalgo. */
protected AbstractStringDistance ssalgo;
@ -69,8 +68,8 @@ public abstract class AbstractComparator extends AbstractPaceFunctions implement
* the b
* @return the double
*/
public double distance(final String a, final String b, final Config conf) {
protected double distance(final String a, final String b, final Config conf) {
if (a.isEmpty() || b.isEmpty()) {
return -1; // return -1 if a field is missing
}
@ -78,49 +77,50 @@ public abstract class AbstractComparator extends AbstractPaceFunctions implement
return normalize(score);
}
/**
* Distance.
*
* @param a
* the a
* @param b
* the b
* @return the double
*/
protected double distance(final List<String> a, final List<String> b, final Config conf) {
return distance(concat(a), concat(b), conf);
}
public double distance(final Field a, final Field b, final Config conf) {
if (a.getType().equals(Type.String) && b.getType().equals(Type.String))
return distance(a.stringValue(), b.stringValue(), conf);
if (a.getType().equals(Type.List) && b.getType().equals(Type.List))
return distance(toList(a), toList(b), conf);
throw new IllegalArgumentException("invalid types\n- A: " + a.toString() + "\n- B: " + b.toString());
}
@Override
public double compare(final Field a, final Field b, final Config conf) {
protected double compare(final String a, final String b, final Config conf) {
if (a.isEmpty() || b.isEmpty())
return -1;
if (a.getType().equals(Type.String) && b.getType().equals(Type.String))
return distance(a.stringValue(), b.stringValue(), conf);
if (a.getType().equals(Type.List) && b.getType().equals(Type.List))
return distance(toList(a), toList(b), conf);
throw new IllegalArgumentException("invalid types\n- A: " + a.toString() + "\n- B: " + b.toString());
return distance(a, b, conf);
}
/**
* To list.
* Convert the given argument to a List of Strings
*
* @param list
* the list
* @param object
* function argument
* @return the list
*/
protected List<String> toList(final Field list) {
return ((FieldList) list).stringList();
protected List<String> toList(final Object object) {
if (object instanceof List) {
return (List<String>) object;
}
return Lists.newArrayList(object.toString());
}
/**
* Convert the given argument to a String
*
* @param object
* function argument
* @return the list
*/
protected String toString(final Object object) {
if (object instanceof List) {
List<String> l = (List<String>) object;
return Joiner.on(" ").join(l);
}
return object.toString();
}
protected String toFirstString(final Object object) {
if (object instanceof List) {
List<String> l = (List<String>) object;
return l.isEmpty() ? "" : l.get(0);
}
return object.toString();
}
public double getWeight() {

View File

@ -0,0 +1,39 @@
package eu.dnetlib.pace.tree.support;
import java.util.List;
import java.util.Map;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.config.Config;
abstract public class AbstractListComparator extends AbstractComparator<List<String>> {
protected AbstractListComparator(Map<String, String> params) {
super(params);
}
protected AbstractListComparator(Map<String, String> params, AbstractStringDistance ssalgo) {
super(params, ssalgo);
}
protected AbstractListComparator(double weight, AbstractStringDistance ssalgo) {
super(weight, ssalgo);
}
protected AbstractListComparator(AbstractStringDistance ssalgo) {
super(ssalgo);
}
@Override
public double compare(Object a, Object b, Config conf) {
return compare(toList(a), toList(b), conf);
}
public double compare(final List<String> a, final List<String> b, final Config conf) {
if (a.isEmpty() || b.isEmpty())
return -1;
return distance(concat(a), concat(b), conf);
}
}

View File

@ -8,10 +8,7 @@ import java.util.Map;
import com.google.common.collect.Lists;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldList;
public abstract class AbstractSortedComparator extends AbstractComparator {
public abstract class AbstractSortedComparator extends AbstractListComparator {
/**
* Instantiates a new sorted second string compare algo.
@ -30,11 +27,14 @@ public abstract class AbstractSortedComparator extends AbstractComparator {
}
@Override
protected List<String> toList(final Field list) {
FieldList fl = (FieldList) list;
List<String> values = Lists.newArrayList(fl.stringList());
protected List<String> toList(final Object object) {
if (object instanceof List) {
List<String> fl = (List<String>) object;
List<String> values = Lists.newArrayList(fl);
Collections.sort(values);
return values;
}
return Lists.newArrayList(object.toString());
}
}

View File

@ -0,0 +1,46 @@
package eu.dnetlib.pace.tree.support;
import java.util.Map;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.config.Config;
public abstract class AbstractStringComparator extends AbstractComparator<String> {
protected AbstractStringComparator(Map<String, String> params) {
super(params);
}
protected AbstractStringComparator(Map<String, String> params, AbstractStringDistance ssalgo) {
super(params, ssalgo);
}
protected AbstractStringComparator(double weight, AbstractStringDistance ssalgo) {
super(weight, ssalgo);
}
protected AbstractStringComparator(AbstractStringDistance ssalgo) {
super(ssalgo);
}
public double distance(final String a, final String b, final Config conf) {
if (a.isEmpty() || b.isEmpty()) {
return -1; // return -1 if a field is missing
}
double score = ssalgo.score(a, b);
return normalize(score);
}
@Override
public double compare(Object a, Object b, Config conf) {
return compare(toString(a), toString(b), conf);
}
public double compare(final String a, final String b, final Config conf) {
if (a.isEmpty() || b.isEmpty())
return -1;
return distance(a, b, conf);
}
}

View File

@ -2,13 +2,11 @@
package eu.dnetlib.pace.tree.support;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.Field;
public interface Comparator {
public interface Comparator<T> {
/*
* return : -1 -> can't decide (i.e. missing field) >0 -> similarity degree (depends on the algorithm)
*/
public double compare(Field a, Field b, Config conf);
public double compare(Object a, Object b, Config conf);
}

View File

@ -6,7 +6,6 @@ import java.io.Serializable;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.util.PaceException;
/**
@ -17,12 +16,12 @@ public class FieldStats implements Serializable {
private double weight; // weight for the field (to be used in the aggregation)
private double threshold; // threshold for the field (to be used in some kind of aggregations)
private double result; // the result of the comparison
private Field a;
private Field b;
private Object a;
private Object b;
private boolean countIfUndefined;
public FieldStats(double weight, double threshold, double result, boolean countIfUndefined, Field a, Field b) {
public FieldStats(double weight, double threshold, double result, boolean countIfUndefined, Object a, Object b) {
this.weight = weight;
this.threshold = threshold;
this.result = result;
@ -63,19 +62,19 @@ public class FieldStats implements Serializable {
this.countIfUndefined = countIfUndefined;
}
public Field getA() {
public Object getA() {
return a;
}
public void setA(Field a) {
public void setA(Object a) {
this.a = a;
}
public Field getB() {
public Object getB() {
return b;
}
public void setB(Field b) {
public void setB(Object b) {
this.b = b;
}

View File

@ -7,10 +7,19 @@ public enum MatchType {
public static MatchType parse(String value) {
try {
return MatchType.valueOf(value);
} catch (IllegalArgumentException e) {
return MatchType.UNDEFINED; // return UNDEFINED if the enum is not parsable
}
if (MATCH.name().equals(value)) {
return MATCH;
} else if (NO_MATCH.name().equals(value)) {
return NO_MATCH;
} else {
return UNDEFINED;
}
// try {
// return MatchType.valueOf(value);
// }
// catch (IllegalArgumentException e) {
// return MatchType.UNDEFINED; //return UNDEFINED if the enum is not parsable
// }
}
}

View File

@ -3,17 +3,17 @@ package eu.dnetlib.pace.tree.support;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringWriter;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.config.PaceConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.PaceException;
public class TreeNodeDef implements Serializable {
@ -46,31 +46,27 @@ public class TreeNodeDef implements Serializable {
}
// function for the evaluation of the node
public TreeNodeStats evaluate(MapDocument doc1, MapDocument doc2, Config conf) {
public TreeNodeStats evaluate(Row doc1, Row doc2, Config conf) {
TreeNodeStats stats = new TreeNodeStats();
// for each field in the node, it computes the
for (FieldConf fieldConf : fields) {
double weight = fieldConf.getWeight();
double result;
Object value1 = getJavaValue(doc1, fieldConf.getField());
Object value2 = getJavaValue(doc2, fieldConf.getField());
// if the param specifies a cross comparison (i.e. compare elements from different fields), compute the
// result for both sides and return the maximum
if (fieldConf.getParams().keySet().stream().anyMatch(k -> k.contains(CROSS_COMPARE))) {
String crossField = fieldConf.getParams().get(CROSS_COMPARE);
double result1 = comparator(fieldConf)
.compare(doc1.getFieldMap().get(fieldConf.getField()), doc2.getFieldMap().get(crossField), conf);
double result2 = comparator(fieldConf)
.compare(doc1.getFieldMap().get(crossField), doc2.getFieldMap().get(fieldConf.getField()), conf);
if (crossField != null) {
double result1 = comparator(fieldConf).compare(value1, getJavaValue(doc2, crossField), conf);
double result2 = comparator(fieldConf).compare(getJavaValue(doc1, crossField), value2, conf);
result = Math.max(result1, result2);
} else {
result = comparator(fieldConf)
.compare(
doc1.getFieldMap().get(fieldConf.getField()), doc2.getFieldMap().get(fieldConf.getField()),
conf);
result = comparator(fieldConf).compare(value1, value2, conf);
}
stats
@ -81,13 +77,27 @@ public class TreeNodeDef implements Serializable {
Double.parseDouble(fieldConf.getParams().getOrDefault("threshold", "1.0")),
result,
fieldConf.isCountIfUndefined(),
doc1.getFieldMap().get(fieldConf.getField()),
doc2.getFieldMap().get(fieldConf.getField())));
value1,
value2));
}
return stats;
}
public Object getJavaValue(Row row, String name) {
int pos = row.fieldIndex(name);
if (pos >= 0) {
DataType dt = row.schema().fields()[pos].dataType();
if (dt instanceof StringType) {
return row.getString(pos);
} else if (dt instanceof ArrayType) {
return row.getList(pos);
}
}
return null;
}
private Comparator comparator(final FieldConf field) {
return PaceConfig.resolver.getComparator(field.getComparator(), field.getParams());

View File

@ -3,9 +3,9 @@ package eu.dnetlib.pace.tree.support;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.sql.Row;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.PaceException;
/**
@ -21,72 +21,72 @@ public class TreeProcessor {
this.config = config;
}
public boolean compare(final MapDocument a, final MapDocument b) {
// row based copies
public boolean compare(final Row a, final Row b) {
// evaluate the decision tree
return evaluateTree(a, b).getResult() == MatchType.MATCH;
}
public TreeStats evaluateTree(final MapDocument doc1, final MapDocument doc2) {
public TreeStats evaluateTree(final Row doc1, final Row doc2) {
TreeStats treeStats = new TreeStats();
String current = "start";
String nextNodeName = "start";
while (MatchType.parse(current) == MatchType.UNDEFINED) {
do {
TreeNodeDef currentNode = config.decisionTree().get(current);
TreeNodeDef currentNode = config.decisionTree().get(nextNodeName);
// throw an exception if the node doesn't exist
if (currentNode == null)
throw new PaceException("Missing tree node: " + current);
throw new PaceException("Missing tree node: " + nextNodeName);
TreeNodeStats stats = currentNode.evaluate(doc1, doc2, config);
treeStats.addNodeStats(current, stats);
treeStats.addNodeStats(nextNodeName, stats);
// if ignoreUndefined=false the miss is considered as undefined
if (!currentNode.isIgnoreUndefined() && stats.undefinedCount() > 0) {
current = currentNode.getUndefined();
nextNodeName = currentNode.getUndefined();
}
// if ignoreUndefined=true the miss is ignored and the score computed anyway
else if (stats.getFinalScore(currentNode.getAggregation()) >= currentNode.getThreshold()) {
current = currentNode.getPositive();
nextNodeName = currentNode.getPositive();
} else {
current = currentNode.getNegative();
nextNodeName = currentNode.getNegative();
}
}
} while (MatchType.parse(nextNodeName) == MatchType.UNDEFINED);
treeStats.setResult(MatchType.parse(current));
treeStats.setResult(MatchType.parse(nextNodeName));
return treeStats;
}
public double computeScore(final MapDocument doc1, final MapDocument doc2) {
String current = "start";
public double computeScore(final Row doc1, final Row doc2) {
String nextNodeName = "start";
double score = 0.0;
while (MatchType.parse(current) == MatchType.UNDEFINED) {
do {
TreeNodeDef currentNode = config.decisionTree().get(current);
TreeNodeDef currentNode = config.decisionTree().get(nextNodeName);
// throw an exception if the node doesn't exist
if (currentNode == null)
throw new PaceException("The Tree Node doesn't exist: " + current);
throw new PaceException("The Tree Node doesn't exist: " + nextNodeName);
TreeNodeStats stats = currentNode.evaluate(doc1, doc2, config);
score = stats.getFinalScore(currentNode.getAggregation());
// if ignoreUndefined=false the miss is considered as undefined
if (!currentNode.isIgnoreUndefined() && stats.undefinedCount() > 0) {
current = currentNode.getUndefined();
nextNodeName = currentNode.getUndefined();
}
// if ignoreUndefined=true the miss is ignored and the score computed anyway
else if (stats.getFinalScore(currentNode.getAggregation()) >= currentNode.getThreshold()) {
current = currentNode.getPositive();
nextNodeName = currentNode.getPositive();
} else {
current = currentNode.getNegative();
}
nextNodeName = currentNode.getNegative();
}
} while (MatchType.parse(nextNodeName) == MatchType.UNDEFINED);
return score;
}
}

View File

@ -1,20 +1,19 @@
package eu.dnetlib.pace.util;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType;
import com.google.common.collect.Lists;
import eu.dnetlib.pace.clustering.NGramUtils;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.config.WfConfig;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.MapDocumentComparator;
import eu.dnetlib.pace.tree.support.TreeProcessor;
public class BlockProcessor {
@ -25,6 +24,9 @@ public class BlockProcessor {
private DedupConfig dedupConf;
private final int identifierFieldPos;
private final int orderFieldPos;
public static void constructAccumulator(final DedupConfig dedupConf) {
accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "records per hash key = 1"));
accumulators
@ -47,152 +49,80 @@ public class BlockProcessor {
.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()));
}
public BlockProcessor(DedupConfig dedupConf) {
public BlockProcessor(DedupConfig dedupConf, int identifierFieldPos, int orderFieldPos) {
this.dedupConf = dedupConf;
this.identifierFieldPos = identifierFieldPos;
this.orderFieldPos = orderFieldPos;
}
public void processSortedBlock(final String key, final List<MapDocument> documents, final Reporter context) {
public void processSortedRows(final List<Row> documents, final Reporter context) {
if (documents.size() > 1) {
// log.info("reducing key: '" + key + "' records: " + q.size());
process(prepare(documents), context);
processRows(documents, context);
} else {
context.incrementCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1", 1);
}
}
public void process(final String key, final Iterable<MapDocument> documents, final Reporter context) {
private void processRows(final List<Row> queue, final Reporter context) {
final Queue<MapDocument> q = prepare(documents);
for (int pivotPos = 0; pivotPos < queue.size(); pivotPos++) {
final Row pivot = queue.get(pivotPos);
if (q.size() > 1) {
// log.info("reducing key: '" + key + "' records: " + q.size());
process(simplifyQueue(q, key, context), context);
} else {
context.incrementCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1", 1);
}
}
private Queue<MapDocument> prepare(final Iterable<MapDocument> documents) {
final Queue<MapDocument> queue = new PriorityQueue<>(100,
new MapDocumentComparator(dedupConf.getWf().getOrderField()));
final Set<String> seen = new HashSet<String>();
final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
documents.forEach(doc -> {
if (queue.size() <= queueMaxSize) {
final String id = doc.getIdentifier();
if (!seen.contains(id)) {
seen.add(id);
queue.add(doc);
}
}
});
return queue;
}
private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram,
final Reporter context) {
final Queue<MapDocument> q = new LinkedList<>();
String fieldRef = "";
final List<MapDocument> tempResults = Lists.newArrayList();
while (!queue.isEmpty()) {
final MapDocument result = queue.remove();
final String orderFieldName = dedupConf.getWf().getOrderField();
final Field orderFieldValue = result.values(orderFieldName);
if (!orderFieldValue.isEmpty()) {
final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
if (field.equals(fieldRef)) {
tempResults.add(result);
} else {
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
tempResults.clear();
tempResults.add(result);
fieldRef = field;
}
} else {
context
.incrementCounter(
dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField(), 1);
}
}
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
return q;
}
private void populateSimplifiedQueue(final Queue<MapDocument> q,
final List<MapDocument> tempResults,
final Reporter context,
final String fieldRef,
final String ngram) {
WfConfig wf = dedupConf.getWf();
if (tempResults.size() < wf.getGroupMaxSize()) {
q.addAll(tempResults);
} else {
context
.incrementCounter(
wf.getEntityType(),
String.format("Skipped records for count(%s) >= %s", wf.getOrderField(), wf.getGroupMaxSize()),
tempResults.size());
// log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
}
}
private void process(final Queue<MapDocument> queue, final Reporter context) {
while (!queue.isEmpty()) {
final MapDocument pivot = queue.remove();
final String idPivot = pivot.getIdentifier();
WfConfig wf = dedupConf.getWf();
final Field fieldsPivot = pivot.values(wf.getOrderField());
final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? "" : fieldsPivot.stringValue();
final String idPivot = pivot.getString(identifierFieldPos); // identifier
final Object fieldsPivot = getJavaValue(pivot, orderFieldPos);
final String fieldPivot = (fieldsPivot == null) ? "" : fieldsPivot.toString();
final WfConfig wf = dedupConf.getWf();
if (fieldPivot != null) {
int i = 0;
for (final MapDocument curr : queue) {
final String idCurr = curr.getIdentifier();
for (int windowPos = pivotPos + 1; windowPos < queue.size(); windowPos++) {
final Row curr = queue.get(windowPos);
final String idCurr = curr.getString(identifierFieldPos); // identifier
if (mustSkip(idCurr)) {
context.incrementCounter(wf.getEntityType(), "skip list", 1);
break;
}
if (i > wf.getSlidingWindowSize()) {
if (++i > wf.getSlidingWindowSize()) {
break;
}
final Field fieldsCurr = curr.values(wf.getOrderField());
final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null
: fieldsCurr.stringValue();
final Object fieldsCurr = getJavaValue(curr, orderFieldPos);
final String fieldCurr = (fieldsCurr == null) ? null : fieldsCurr.toString();
if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
final TreeProcessor treeProcessor = new TreeProcessor(dedupConf);
emitOutput(treeProcessor.compare(pivot, curr), idPivot, idCurr, context);
}
}
}
}
}
public Object getJavaValue(Row row, int pos) {
DataType dt = row.schema().fields()[pos].dataType();
if (dt instanceof StringType) {
return row.getString(pos);
} else if (dt instanceof ArrayType) {
return row.getList(pos);
}
}
}
}
return null;
}
private void emitOutput(final boolean result, final String idPivot, final String idCurr, final Reporter context) {
if (result) {
if (idPivot.compareTo(idCurr) <= 0) {
writeSimilarity(context, idPivot, idCurr);
} else {
writeSimilarity(context, idCurr, idPivot);
}
context.incrementCounter(dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)", 1);
} else {
context.incrementCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold(), 1);
@ -211,7 +141,6 @@ public class BlockProcessor {
final String type = dedupConf.getWf().getEntityType();
context.emit(type, from, to);
context.emit(type, to, from);
}
}

View File

@ -1,276 +0,0 @@
package eu.dnetlib.pace.util;
import java.util.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.collect.Lists;
import eu.dnetlib.pace.clustering.NGramUtils;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.config.WfConfig;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.MapDocumentComparator;
import eu.dnetlib.pace.tree.*;
import eu.dnetlib.pace.tree.support.TreeProcessor;
public class BlockProcessorForTesting {
public static final List<String> accumulators = new ArrayList<>();
private static final Log log = LogFactory.getLog(eu.dnetlib.pace.util.BlockProcessorForTesting.class);
private DedupConfig dedupConf;
public static void constructAccumulator(final DedupConfig dedupConf) {
accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "records per hash key = 1"));
accumulators
.add(
String
.format(
"%s::%s", dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()));
accumulators
.add(
String
.format(
"%s::%s", dedupConf.getWf().getEntityType(),
String
.format(
"Skipped records for count(%s) >= %s", dedupConf.getWf().getOrderField(),
dedupConf.getWf().getGroupMaxSize())));
accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "skip list"));
accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)"));
accumulators
.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()));
}
public BlockProcessorForTesting(DedupConfig dedupConf) {
this.dedupConf = dedupConf;
}
public void processSortedBlock(final String key, final List<MapDocument> documents, final Reporter context,
boolean useTree, boolean noMatch) {
if (documents.size() > 1) {
// log.info("reducing key: '" + key + "' records: " + q.size());
process(prepare(documents), context, useTree, noMatch);
} else {
context.incrementCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1", 1);
}
}
public void process(final String key, final Iterable<MapDocument> documents, final Reporter context,
boolean useTree, boolean noMatch) {
final Queue<MapDocument> q = prepare(documents);
if (q.size() > 1) {
// log.info("reducing key: '" + key + "' records: " + q.size());
process(simplifyQueue(q, key, context), context, useTree, noMatch);
} else {
context.incrementCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1", 1);
}
}
private Queue<MapDocument> prepare(final Iterable<MapDocument> documents) {
final Queue<MapDocument> queue = new PriorityQueue<>(100,
new MapDocumentComparator(dedupConf.getWf().getOrderField()));
final Set<String> seen = new HashSet<String>();
final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
documents.forEach(doc -> {
if (queue.size() <= queueMaxSize) {
final String id = doc.getIdentifier();
if (!seen.contains(id)) {
seen.add(id);
queue.add(doc);
}
}
});
return queue;
}
private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram,
final Reporter context) {
final Queue<MapDocument> q = new LinkedList<>();
String fieldRef = "";
final List<MapDocument> tempResults = Lists.newArrayList();
while (!queue.isEmpty()) {
final MapDocument result = queue.remove();
final String orderFieldName = dedupConf.getWf().getOrderField();
final Field orderFieldValue = result.values(orderFieldName);
if (!orderFieldValue.isEmpty()) {
final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
if (field.equals(fieldRef)) {
tempResults.add(result);
} else {
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
tempResults.clear();
tempResults.add(result);
fieldRef = field;
}
} else {
context
.incrementCounter(
dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField(), 1);
}
}
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
return q;
}
private void populateSimplifiedQueue(final Queue<MapDocument> q,
final List<MapDocument> tempResults,
final Reporter context,
final String fieldRef,
final String ngram) {
WfConfig wf = dedupConf.getWf();
if (tempResults.size() < wf.getGroupMaxSize()) {
q.addAll(tempResults);
} else {
context
.incrementCounter(
wf.getEntityType(),
String.format("Skipped records for count(%s) >= %s", wf.getOrderField(), wf.getGroupMaxSize()),
tempResults.size());
// log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
}
}
private void process(final Queue<MapDocument> queue, final Reporter context, boolean useTree, boolean noMatch) {
while (!queue.isEmpty()) {
final MapDocument pivot = queue.remove();
final String idPivot = pivot.getIdentifier();
WfConfig wf = dedupConf.getWf();
final Field fieldsPivot = pivot.values(wf.getOrderField());
final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? "" : fieldsPivot.stringValue();
if (fieldPivot != null) {
int i = 0;
for (final MapDocument curr : queue) {
final String idCurr = curr.getIdentifier();
if (mustSkip(idCurr)) {
context.incrementCounter(wf.getEntityType(), "skip list", 1);
break;
}
if (i > wf.getSlidingWindowSize()) {
break;
}
final Field fieldsCurr = curr.values(wf.getOrderField());
final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null
: fieldsCurr.stringValue();
if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
// draws no match relations (test purpose)
if (noMatch) {
emitOutput(!new TreeProcessor(dedupConf).compare(pivot, curr), idPivot, idCurr, context);
} else {
// use the decision tree implementation or the "normal" implementation of the similarity
// score (valid only for publications)
if (useTree)
emitOutput(new TreeProcessor(dedupConf).compare(pivot, curr), idPivot, idCurr, context);
else
emitOutput(publicationCompare(pivot, curr, dedupConf), idPivot, idCurr, context);
}
// if(new TreeProcessor(dedupConf).compare(pivot, curr) != publicationCompare(pivot, curr, dedupConf)) {
// emitOutput(true, idPivot, idCurr, context);
// }
}
}
}
}
}
protected static boolean compareInstanceType(MapDocument a, MapDocument b, DedupConfig conf) {
Map<String, String> params = new HashMap<>();
InstanceTypeMatch instanceTypeMatch = new InstanceTypeMatch(params);
double compare = instanceTypeMatch
.compare(a.getFieldMap().get("instance"), b.getFieldMap().get("instance"), conf);
return compare >= 1.0;
}
private boolean publicationCompare(MapDocument a, MapDocument b, DedupConfig config) {
// if the score gives 1, the publications are equivalent
Map<String, String> params = new HashMap<>();
params.put("jpath_value", "$.value");
params.put("jpath_classid", "$.qualifier.classid");
params.put("mode", "count");
double score = 0.0;
// levenstein title
LevensteinTitle levensteinTitle = new LevensteinTitle(params);
if (levensteinTitle.compare(a.getFieldMap().get("title"), b.getFieldMap().get("title"), config) >= 0.9) {
score += 0.2;
}
// pid
JsonListMatch jsonListMatch = new JsonListMatch(params);
if (jsonListMatch.compare(a.getFieldMap().get("pid"), b.getFieldMap().get("pid"), config) >= 1.0) {
score += 0.5;
}
// title version
TitleVersionMatch titleVersionMatch = new TitleVersionMatch(params);
double result1 = titleVersionMatch.compare(a.getFieldMap().get("title"), b.getFieldMap().get("title"), config);
if (result1 < 0 || result1 >= 1.0) {
score += 0.1;
}
// authors match
params.remove("mode");
AuthorsMatch authorsMatch = new AuthorsMatch(params);
double result2 = authorsMatch.compare(a.getFieldMap().get("authors"), b.getFieldMap().get("authors"), config);
if (result2 < 0 || result2 >= 0.6) {
score += 0.2;
}
return score >= 0.5;
}
private void emitOutput(final boolean result, final String idPivot, final String idCurr, final Reporter context) {
if (result) {
writeSimilarity(context, idPivot, idCurr);
context.incrementCounter(dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)", 1);
} else {
context.incrementCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold(), 1);
}
}
private boolean mustSkip(final String idPivot) {
return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
}
private String getNsPrefix(final String id) {
return StringUtils.substringBetween(id, "|", "::");
}
private void writeSimilarity(final Reporter context, final String from, final String to) {
final String type = dedupConf.getWf().getEntityType();
context.emit(type, from, to);
}
}

View File

@ -18,6 +18,7 @@ package eu.dnetlib.pace.util;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

View File

@ -2,19 +2,20 @@
package eu.dnetlib.pace.util;
import java.math.BigDecimal;
import java.util.*;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.spi.cache.Cache;
import com.jayway.jsonpath.spi.cache.CacheProvider;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.config.Type;
import eu.dnetlib.pace.model.*;
import net.minidev.json.JSONArray;
public class MapDocumentUtil {
@ -22,103 +23,20 @@ public class MapDocumentUtil {
public static final String URL_REGEX = "^(http|https|ftp)\\://.*";
public static Predicate<String> urlFilter = s -> s.trim().matches(URL_REGEX);
public static MapDocument asMapDocumentWithJPath(DedupConfig conf, final String json) {
MapDocument m = new MapDocument();
m.setIdentifier(getJPathString(conf.getWf().getIdPath(), json));
Map<String, Field> stringField = new HashMap<>();
conf.getPace().getModel().forEach(fdef -> {
switch (fdef.getType()) {
case String:
case Int:
stringField
.put(
fdef.getName(), new FieldValueImpl(fdef.getType(), fdef.getName(),
truncateValue(getJPathString(fdef.getPath(), json), fdef.getLength())));
break;
case URL:
String uv = getJPathString(fdef.getPath(), json);
if (!urlFilter.test(uv))
uv = "";
stringField.put(fdef.getName(), new FieldValueImpl(fdef.getType(), fdef.getName(), uv));
break;
case List:
case JSON:
FieldListImpl fi = new FieldListImpl(fdef.getName(), fdef.getType());
truncateList(getJPathList(fdef.getPath(), json, fdef.getType()), fdef.getSize())
.stream()
.map(item -> new FieldValueImpl(Type.String, fdef.getName(), item))
.forEach(fi::add);
stringField.put(fdef.getName(), fi);
break;
case DoubleArray:
stringField
.put(
fdef.getName(),
new FieldValueImpl(Type.DoubleArray,
fdef.getName(),
getJPathArray(fdef.getPath(), json)));
break;
case StringConcat:
String[] jpaths = fdef.getPath().split("\\|\\|\\|");
stringField
.put(
fdef.getName(),
new FieldValueImpl(Type.String,
fdef.getName(),
truncateValue(
Arrays
.stream(jpaths)
.map(jpath -> getJPathString(jpath, json))
.collect(Collectors.joining(" ")),
fdef.getLength())));
break;
static {
CacheProvider.setCache(new Cache() {
private final ConcurrentHashMap<String, JsonPath> jsonPathCache = new ConcurrentHashMap();
@Override
public JsonPath get(String key) {
return jsonPathCache.get(key);
}
@Override
public void put(String key, JsonPath value) {
jsonPathCache.put(key, value);
}
});
m.setFieldMap(stringField);
return m;
}
public static List<String> getJPathList(String path, String json, Type type) {
if (type == Type.List)
return JsonPath
.using(
Configuration
.defaultConfiguration()
.addOptions(Option.ALWAYS_RETURN_LIST, Option.SUPPRESS_EXCEPTIONS))
.parse(json)
.read(path);
Object jresult;
List<String> result = new ArrayList<>();
try {
jresult = JsonPath.read(json, path);
} catch (Throwable e) {
return result;
}
if (jresult instanceof JSONArray) {
((JSONArray) jresult).forEach(it -> {
try {
result.add(new ObjectMapper().writeValueAsString(it));
} catch (JsonProcessingException e) {
}
});
return result;
}
if (jresult instanceof LinkedHashMap) {
try {
result.add(new ObjectMapper().writeValueAsString(jresult));
} catch (JsonProcessingException e) {
}
return result;
}
if (jresult instanceof String) {
result.add((String) jresult);
}
return result;
}
public static String getJPathString(final String jsonPath, final String json) {
@ -174,4 +92,54 @@ public class MapDocumentUtil {
return list.subList(0, size);
}
public static String getJPathString(final String jsonPath, final DocumentContext json) {
try {
Object o = json.read(jsonPath);
if (o instanceof String)
return (String) o;
if (o instanceof JSONArray && ((JSONArray) o).size() > 0)
return (String) ((JSONArray) o).get(0);
return "";
} catch (Exception e) {
return "";
}
}
public static List<String> getJPathList(String path, DocumentContext json, Type type) {
// if (type == Type.List)
// return JsonPath.using(Configuration.defaultConfiguration().addOptions(Option.ALWAYS_RETURN_LIST,
// Option.SUPPRESS_EXCEPTIONS)).parse(json).read(path);
Object jresult;
List<String> result = new ArrayList<>();
try {
jresult = json.read(path);
} catch (Throwable e) {
return result;
}
if (jresult instanceof JSONArray) {
((JSONArray) jresult).forEach(it -> {
try {
result.add(new ObjectMapper().writeValueAsString(it));
} catch (JsonProcessingException e) {
}
});
return result;
}
if (jresult instanceof LinkedHashMap) {
try {
result.add(new ObjectMapper().writeValueAsString(jresult));
} catch (JsonProcessingException e) {
}
return result;
}
if (jresult instanceof String) {
result.add((String) jresult);
}
return result;
}
}

View File

@ -0,0 +1,85 @@
package eu.dnetlib.pace.util;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkContext;
import org.apache.spark.util.LongAccumulator;
import eu.dnetlib.pace.config.DedupConfig;
import scala.Serializable;
import scala.Tuple2;
public class SparkReporter implements Serializable, Reporter {
private final List<Tuple2<String, String>> relations = new ArrayList<>();
private final Map<String, LongAccumulator> accumulators;
public SparkReporter(Map<String, LongAccumulator> accumulators) {
this.accumulators = accumulators;
}
public void incrementCounter(
String counterGroup,
String counterName,
long delta,
Map<String, LongAccumulator> accumulators) {
final String accumulatorName = String.format("%s::%s", counterGroup, counterName);
if (accumulators.containsKey(accumulatorName)) {
accumulators.get(accumulatorName).add(delta);
}
}
@Override
public void incrementCounter(String counterGroup, String counterName, long delta) {
incrementCounter(counterGroup, counterName, delta, accumulators);
}
@Override
public void emit(String type, String from, String to) {
relations.add(new Tuple2<>(from, to));
}
public List<Tuple2<String, String>> getRelations() {
return relations;
}
public static Map<String, LongAccumulator> constructAccumulator(
final DedupConfig dedupConf, final SparkContext context) {
Map<String, LongAccumulator> accumulators = new HashMap<>();
String acc1 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "records per hash key = 1");
accumulators.put(acc1, context.longAccumulator(acc1));
String acc2 = String
.format(
"%s::%s",
dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField());
accumulators.put(acc2, context.longAccumulator(acc2));
String acc3 = String
.format(
"%s::%s",
dedupConf.getWf().getEntityType(),
String
.format(
"Skipped records for count(%s) >= %s",
dedupConf.getWf().getOrderField(), dedupConf.getWf().getGroupMaxSize()));
accumulators.put(acc3, context.longAccumulator(acc3));
String acc4 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "skip list");
accumulators.put(acc4, context.longAccumulator(acc4));
String acc5 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)");
accumulators.put(acc5, context.longAccumulator(acc5));
String acc6 = String
.format(
"%s::%s", dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold());
accumulators.put(acc6, context.longAccumulator(acc6));
return accumulators;
}
}

View File

@ -3,57 +3,42 @@ package eu.dnetlib.pace;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import eu.dnetlib.pace.common.AbstractPaceFunctions;
import eu.dnetlib.pace.config.Type;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldListImpl;
import eu.dnetlib.pace.model.FieldValueImpl;
public abstract class AbstractPaceTest extends AbstractPaceFunctions {
protected String readFromClasspath(final String filename) {
final StringWriter sw = new StringWriter();
try {
IOUtils.copy(getClass().getResourceAsStream(filename), sw, StandardCharsets.UTF_8);
IOUtils.copy(getClass().getResourceAsStream(filename), sw);
return sw.toString();
} catch (final IOException e) {
throw new RuntimeException("cannot load resource from classpath: " + filename);
}
}
protected Field title(final String s) {
return new FieldValueImpl(Type.String, "title", s);
protected String title(final String s) {
return s;
}
protected Field person(final String s) {
return new FieldValueImpl(Type.JSON, "person", s);
protected String person(final String s) {
return s;
}
protected Field url(final String s) {
return new FieldValueImpl(Type.URL, "url", s);
protected String url(final String s) {
return s;
}
protected Field array(final double[] a) {
return new FieldValueImpl(Type.DoubleArray, "array", a);
}
protected Field createFieldList(List<String> strings, String fieldName) {
List<FieldValueImpl> fieldValueStream = strings
.stream()
.map(s -> new FieldValueImpl(Type.String, fieldName, s))
.collect(Collectors.toList());
FieldListImpl a = new FieldListImpl();
a.addAll(fieldValueStream);
protected double[] array(final double[] a) {
return a;
}
protected List<String> createFieldList(List<String> strings, String fieldName) {
return strings;
}
}

View File

@ -2,14 +2,12 @@
package eu.dnetlib.pace.clustering;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.pace.AbstractPaceTest;
import eu.dnetlib.pace.common.AbstractPaceFunctions;
@ -37,7 +35,7 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
final String s = "http://www.test.it/path/to/resource";
System.out.println(s);
System.out.println(urlClustering.apply(conf, Lists.newArrayList(url(s))));
System.out.println(urlClustering.apply(conf, Lists.newArrayList(s)));
}
@Test
@ -51,7 +49,7 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
final String s = "Search for the Standard Model Higgs Boson";
System.out.println(s);
System.out.println(ngram.apply(conf, Lists.newArrayList(title(s))));
System.out.println(ngram.apply(conf, Lists.newArrayList(s)));
}
@Test
@ -63,7 +61,7 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
final String s = "Search for the Standard Model Higgs Boson";
System.out.println(s);
System.out.println(np.apply(conf, Lists.newArrayList(title(s))));
System.out.println(np.apply(conf, Lists.newArrayList(s)));
}
@Test
@ -75,15 +73,15 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
final String s1 = "University of Pisa";
System.out.println(s1);
System.out.println(np.apply(conf, Lists.newArrayList(title(s1))));
System.out.println(np.apply(conf, Lists.newArrayList(s1)));
final String s2 = "Pisa University";
System.out.println(s2);
System.out.println(np.apply(conf, Lists.newArrayList(title(s2))));
System.out.println(np.apply(conf, Lists.newArrayList(s2)));
final String s3 = "Parco Tecnologico Agroalimentare Umbria";
System.out.println(s3);
System.out.println(np.apply(conf, Lists.newArrayList(title(s3))));
System.out.println(np.apply(conf, Lists.newArrayList(s3)));
}
@ -97,7 +95,7 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
final String s = "Search for the Standard Model Higgs Boson";
System.out.println(s);
System.out.println(acro.apply(conf, Lists.newArrayList(title(s))));
System.out.println(acro.apply(conf, Lists.newArrayList(s)));
}
@Test
@ -109,12 +107,12 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
final String s = "Search for the Standard Model Higgs Boson";
System.out.println(s);
System.out.println(sp.apply(conf, Lists.newArrayList(title(s))));
System.out.println(sp.apply(conf, Lists.newArrayList(s)));
params.put("len", 3);
params.put("max", 1);
System.out.println(sp.apply(conf, Lists.newArrayList(title("Framework for general-purpose deduplication"))));
System.out.println(sp.apply(conf, Lists.newArrayList("Framework for general-purpose deduplication")));
}
@Test
@ -127,7 +125,7 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
final String s = "Search for the Standard Model Higgs Boson";
System.out.println(s);
System.out.println(sp.apply(conf, Lists.newArrayList(title(s))));
System.out.println(sp.apply(conf, Lists.newArrayList(s)));
}
@Test
@ -138,31 +136,31 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
String s = "Search for the Standard Model Higgs Boson";
System.out.println(s);
System.out.println(sp.apply(conf, Lists.newArrayList(title(s))));
System.out.println(sp.apply(conf, Lists.newArrayList(s)));
s = "A Physical Education Teacher Is Like...: Examining Turkish Students Perceptions of Physical Education Teachers Through Metaphor Analysis";
System.out.println(s);
System.out.println(sp.apply(conf, Lists.newArrayList(title(s))));
System.out.println(sp.apply(conf, Lists.newArrayList(s)));
s = "Structure of a Eukaryotic Nonribosomal Peptide Synthetase Adenylation Domain That Activates a Large Hydroxamate Amino Acid in Siderophore Biosynthesis";
System.out.println(s);
System.out.println(sp.apply(conf, Lists.newArrayList(title(s))));
System.out.println(sp.apply(conf, Lists.newArrayList(s)));
s = "Performance Evaluation";
System.out.println(s);
System.out.println(sp.apply(conf, Lists.newArrayList(title(s))));
System.out.println(sp.apply(conf, Lists.newArrayList(s)));
s = "JRC Open Power Plants Database (JRC-PPDB-OPEN)";
System.out.println(s);
System.out.println(sp.apply(conf, Lists.newArrayList(title(s))));
System.out.println(sp.apply(conf, Lists.newArrayList(s)));
s = "JRC Open Power Plants Database";
System.out.println(s);
System.out.println(sp.apply(conf, Lists.newArrayList(title(s))));
System.out.println(sp.apply(conf, Lists.newArrayList(s)));
s = "niivue/niivue: 0.21.1";
System.out.println(s);
System.out.println(sp.apply(conf, Lists.newArrayList(title(s))));
System.out.println(sp.apply(conf, Lists.newArrayList(s)));
}
@ -175,7 +173,7 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
final String s = "Search for the Standard Model Higgs Boson";
System.out.println(s);
System.out.println(sp.apply(conf, Lists.newArrayList(title(s))));
System.out.println(sp.apply(conf, Lists.newArrayList(s)));
}
@Test
@ -184,35 +182,35 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
final ClusteringFunction cf = new KeywordsClustering(params);
final String s = "Polytechnic University of Turin";
System.out.println(s);
System.out.println(cf.apply(conf, Lists.newArrayList(title(s))));
System.out.println(cf.apply(conf, Lists.newArrayList(s)));
final String s1 = "POLITECNICO DI TORINO";
System.out.println(s1);
System.out.println(cf.apply(conf, Lists.newArrayList(title(s1))));
System.out.println(cf.apply(conf, Lists.newArrayList(s1)));
final String s2 = "Universita farmaceutica culturale di milano bergamo";
System.out.println("s2 = " + s2);
System.out.println(cf.apply(conf, Lists.newArrayList(title(s2))));
System.out.println(cf.apply(conf, Lists.newArrayList(s2)));
final String s3 = "universita universita milano milano";
System.out.println("s3 = " + s3);
System.out.println(cf.apply(conf, Lists.newArrayList(title(s3))));
System.out.println(cf.apply(conf, Lists.newArrayList(s3)));
final String s4 = "Politechniki Warszawskiej (Warsaw University of Technology)";
System.out.println("s4 = " + s4);
System.out.println(cf.apply(conf, Lists.newArrayList(title(s4))));
System.out.println(cf.apply(conf, Lists.newArrayList(s4)));
final String s5 = "İstanbul Ticarət Universiteti";
System.out.println("s5 = " + s5);
System.out.println(cf.apply(conf, Lists.newArrayList(title(s5))));
System.out.println(cf.apply(conf, Lists.newArrayList(s5)));
final String s6 = "National and Kapodistrian University of Athens";
System.out.println("s6 = " + s6);
System.out.println(cf.apply(conf, Lists.newArrayList(title(s6))));
System.out.println(cf.apply(conf, Lists.newArrayList(s6)));
final String s7 = "Εθνικό και Καποδιστριακό Πανεπιστήμιο Αθηνών";
System.out.println("s7 = " + s7);
System.out.println(cf.apply(conf, Lists.newArrayList(title(s7))));
System.out.println(cf.apply(conf, Lists.newArrayList(s7)));
}
@ -222,11 +220,11 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
final ClusteringFunction cf = new PersonClustering(params);
final String s = "Abd-Alla, Abo-el-nour N.";
System.out.println("s = " + s);
System.out.println(cf.apply(conf, Lists.newArrayList(title(s))));
System.out.println(cf.apply(conf, Lists.newArrayList(s)));
final String s1 = "Manghi, Paolo";
System.out.println("s1 = " + s1);
System.out.println(cf.apply(conf, Lists.newArrayList(title(s1))));
System.out.println(cf.apply(conf, Lists.newArrayList(s1)));
}
@ -236,11 +234,11 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
final ClusteringFunction cf = new PersonHash(params);
final String s = "Manghi, Paolo";
System.out.println("s = " + s);
System.out.println(cf.apply(conf, Lists.newArrayList(title(s))));
System.out.println(cf.apply(conf, Lists.newArrayList(s)));
final String s1 = "Manghi, P.";
System.out.println("s = " + s1);
System.out.println(cf.apply(conf, Lists.newArrayList(title(s1))));
System.out.println(cf.apply(conf, Lists.newArrayList(s1)));
}
@ -250,7 +248,7 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
final ClusteringFunction cf = new LastNameFirstInitial(params);
final String s = "LI Yonghong";
System.out.println("s = " + s);
System.out.println(cf.apply(conf, Lists.newArrayList(title(s))));
System.out.println(cf.apply(conf, Lists.newArrayList(s)));
}
}

View File

@ -3,19 +3,16 @@ package eu.dnetlib.pace.comparators;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import eu.dnetlib.pace.AbstractPaceTest;
import eu.dnetlib.pace.clustering.NGramUtils;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.config.Type;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldValueImpl;
import eu.dnetlib.pace.tree.*;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ -99,8 +96,8 @@ public class ComparatorTest extends AbstractPaceTest {
@Test
public void listContainsMatchTest() {
Field a = createFieldList(Arrays.asList("Article", "Publication", "ORP"), "instanceType");
Field b = createFieldList(Arrays.asList("Publication", "Article", "ORP"), "instanceType");
List<String> a = createFieldList(Arrays.asList("Article", "Publication", "ORP"), "instanceType");
List<String> b = createFieldList(Arrays.asList("Publication", "Article", "ORP"), "instanceType");
params.put("string", "Article");
params.put("bool", "XOR");
@ -214,31 +211,32 @@ public class ComparatorTest extends AbstractPaceTest {
final InstanceTypeMatch instanceTypeMatch = new InstanceTypeMatch(params);
Field a = createFieldList(Arrays.asList("Article", "Article", "Article"), "instanceType");
Field b = createFieldList(Arrays.asList("Article", "Article", "Article"), "instanceType");
List<String> a = createFieldList(Arrays.asList("Article", "Article", "Article"), "instanceType");
List<String> b = createFieldList(Arrays.asList("Article", "Article", "Article"), "instanceType");
double result = instanceTypeMatch.compare(a, b, conf);
assertEquals(1.0, result);
Field c = createFieldList(
List<String> c = createFieldList(
Arrays.asList("Conference object", "Conference object", "Conference object"), "instanceType");
result = instanceTypeMatch.compare(c, b, conf);
assertEquals(1.0, result);
Field d = createFieldList(Arrays.asList("Master thesis", "Master thesis", "Master thesis"), "instanceType");
Field e = createFieldList(
List<String> d = createFieldList(
Arrays.asList("Master thesis", "Master thesis", "Master thesis"), "instanceType");
List<String> e = createFieldList(
Arrays.asList("Bachelor thesis", "Bachelor thesis", "Bachelor thesis"), "instanceType");
result = instanceTypeMatch.compare(d, e, conf);
assertEquals(1.0, result);
Field g = createFieldList(Arrays.asList("Software Paper", "Software Paper"), "instanceType");
List<String> g = createFieldList(Arrays.asList("Software Paper", "Software Paper"), "instanceType");
result = instanceTypeMatch.compare(e, g, conf);
assertEquals(0.0, result);
Field h = createFieldList(Arrays.asList("Other literature type", "Article"), "instanceType");
List<String> h = createFieldList(Arrays.asList("Other literature type", "Article"), "instanceType");
result = instanceTypeMatch.compare(a, h, conf);
assertEquals(1.0, result);
@ -249,15 +247,15 @@ public class ComparatorTest extends AbstractPaceTest {
AuthorsMatch authorsMatch = new AuthorsMatch(params);
Field a = createFieldList(
List<String> a = createFieldList(
Arrays.asList("La Bruzzo, Sandro", "Atzori, Claudio", "De Bonis, Michele"), "authors");
Field b = createFieldList(Arrays.asList("Atzori, C.", "La Bruzzo, S.", "De Bonis, M."), "authors");
List<String> b = createFieldList(Arrays.asList("Atzori, C.", "La Bruzzo, S.", "De Bonis, M."), "authors");
double result = authorsMatch.compare(a, b, conf);
assertEquals(1.0, result);
Field c = createFieldList(Arrays.asList("Manghi, Paolo"), "authors");
Field d = createFieldList(Arrays.asList("Manghi, Pasquale"), "authors");
List<String> c = createFieldList(Arrays.asList("Manghi, Paolo"), "authors");
List<String> d = createFieldList(Arrays.asList("Manghi, Pasquale"), "authors");
result = authorsMatch.compare(c, d, conf);
assertEquals(0.0, result);
@ -268,12 +266,12 @@ public class ComparatorTest extends AbstractPaceTest {
assertEquals(1.0, result);
Field e = createFieldList(Arrays.asList("Manghi, Paolo", "Atzori, Claudio"), "authors");
List<String> e = createFieldList(Arrays.asList("Manghi, Paolo", "Atzori, Claudio"), "authors");
result = authorsMatch.compare(a, e, conf);
assertEquals(0.25, result);
Field f = createFieldList(new ArrayList<>(), "authors");
List<String> f = createFieldList(new ArrayList<>(), "authors");
result = authorsMatch.compare(f, f, conf);
System.out.println("result = " + result);
@ -284,12 +282,12 @@ public class ComparatorTest extends AbstractPaceTest {
JsonListMatch jsonListMatch = new JsonListMatch(params);
Field a = createFieldList(
List<String> a = createFieldList(
Arrays
.asList(
"{\"datainfo\":{\"deletedbyinference\":false,\"inferenceprovenance\":null,\"inferred\":false,\"invisible\":false,\"provenanceaction\":{\"classid\":\"sysimport:actionset\",\"classname\":\"Harvested\",\"schemeid\":\"dnet:provenanceActions\",\"schemename\":\"dnet:provenanceActions\"},\"trust\":\"0.9\"},\"qualifier\":{\"classid\":\"doi\",\"classname\":\"Digital Object Identifier\",\"schemeid\":\"dnet:pid_types\",\"schemename\":\"dnet:pid_types\"},\"value\":\"10.1111/pbi.12655\"}"),
"authors");
Field b = createFieldList(
List<String> b = createFieldList(
Arrays
.asList(
"{\"datainfo\":{\"deletedbyinference\":false,\"inferenceprovenance\":\"\",\"inferred\":false,\"invisible\":false,\"provenanceaction\":{\"classid\":\"sysimport:crosswalk:repository\",\"classname\":\"Harvested\",\"schemeid\":\"dnet:provenanceActions\",\"schemename\":\"dnet:provenanceActions\"},\"trust\":\"0.9\"},\"qualifier\":{\"classid\":\"pmc\",\"classname\":\"PubMed Central ID\",\"schemeid\":\"dnet:pid_types\",\"schemename\":\"dnet:pid_types\"},\"value\":\"PMC5399005\"}",
@ -313,8 +311,8 @@ public class ComparatorTest extends AbstractPaceTest {
public void domainExactMatch() {
DomainExactMatch domainExactMatch = new DomainExactMatch(params);
Field a = url("http://www.flowrepository.org");
Field b = url("http://flowrepository.org/");
String a = url("http://www.flowrepository.org");
String b = url("http://flowrepository.org/");
double compare = domainExactMatch.compare(a, b, conf);
System.out.println("compare = " + compare);
@ -326,12 +324,12 @@ public class ComparatorTest extends AbstractPaceTest {
CosineSimilarity cosineSimilarity = new CosineSimilarity(params);
Field a = new FieldValueImpl(Type.DoubleArray, "array", new double[] {
double[] a = new double[] {
1, 2, 3
});
Field b = new FieldValueImpl(Type.DoubleArray, "array", new double[] {
};
double[] b = new double[] {
1, 2, 3
});
};
double compare = cosineSimilarity.compare(a, b, conf);

View File

@ -3,26 +3,14 @@ package eu.dnetlib.pace.config;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.*;
import java.util.stream.Collectors;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import eu.dnetlib.pace.AbstractPaceTest;
import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
import eu.dnetlib.pace.clustering.ClusteringClass;
import eu.dnetlib.pace.clustering.ClusteringCombiner;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldList;
import eu.dnetlib.pace.model.FieldValue;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.tree.JsonListMatch;
import eu.dnetlib.pace.tree.support.AggType;
import eu.dnetlib.pace.tree.support.FieldConf;
import eu.dnetlib.pace.tree.support.TreeNodeDef;
import eu.dnetlib.pace.tree.support.TreeNodeStats;
import eu.dnetlib.pace.util.MapDocumentUtil;
public class ConfigTest extends AbstractPaceTest {
@ -82,41 +70,6 @@ public class ConfigTest extends AbstractPaceTest {
assertEquals(0, load.getPace().translationMap().keySet().size());
}
@Test
public void asMapDocumentTest1() {
DedupConfig dedupConf = DedupConfig.load(readFromClasspath("publication.current.conf.json"));
final String json = readFromClasspath("publication.json");
final MapDocument mapDocument = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, json);
// System.out.println("mapDocument = " + mapDocument.getFieldMap());
// JsonListMatch jsonListMatch = new JsonListMatch(params);
//
// jsonListMatch.compare(mapDocument.getFieldMap().get("pid"), mapDocument.getFieldMap().get("pid"), null);
System.out.println("mapDocument = " + mapDocument.getFieldMap().get("title").stringValue());
}
@Test
public void authorAsMapDocument() {
DedupConfig dedupConf = DedupConfig.load(readFromClasspath("author.fdup.conf.json"));
final String json = readFromClasspath("author.json");
final MapDocument mapDocument = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, json);
System.out
.println(
"mapDocument = "
+ Arrays.toString(((FieldValue) mapDocument.getFieldMap().get("topics")).doubleArrayValue()));
}
@Test
public void testJPath() {
final String json = readFromClasspath("organization.json");
@ -126,53 +79,4 @@ public class ConfigTest extends AbstractPaceTest {
System.out.println("result = " + MapDocumentUtil.getJPathString(jpath, json));
}
@Test
public void clusteringCombinerTest() {
DedupConfig dedupConf = DedupConfig.load(readFromClasspath("publication.current.conf.json"));
final String json = readFromClasspath("publication.json");
final MapDocument mapDocument = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, json);
String[] combine = ClusteringCombiner.combine(mapDocument, dedupConf).toArray(new String[3]);
assertEquals("test", combine[0].split(":")[1]);
assertEquals("title", combine[1].split(":")[1]);
assertEquals("doi", combine[2].split(":")[1]);
}
@Test
public void filterAndCombineTest() {
DedupConfig dedupConf = DedupConfig.load(readFromClasspath("pub.prod.conf.json"));
final String json = readFromClasspath("publication.example.json");
final MapDocument mapDocument = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, json);
Collection<String> strings = BlacklistAwareClusteringCombiner.filterAndCombine(mapDocument, dedupConf);
for (String s : strings) {
System.out.println("s = " + s);
}
}
@Test
public void crossCompareTest() {
DedupConfig dedupConf = DedupConfig.load(readFromClasspath("organization.cross.compare.conf.json"));
TreeNodeDef treeNode = dedupConf.decisionTree().get("start");
final String json = readFromClasspath("organization.json");
final MapDocument doc = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, json);
TreeNodeStats nodeStats = treeNode.evaluate(doc, doc, dedupConf);
assertTrue(nodeStats.getFinalScore(AggType.MAX) > 0.7);
}
}

View File

@ -6,9 +6,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import eu.dnetlib.pace.model.Person;
import jdk.nashorn.internal.ir.annotations.Ignore;
public class UtilTest {
@ -20,6 +22,7 @@ public class UtilTest {
}
@Test
@Ignore
public void paceResolverTest() {
PaceResolver paceResolver = new PaceResolver();
paceResolver.getComparator("keywordMatch", params);

View File

@ -11,12 +11,12 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>

View File

@ -38,6 +38,8 @@
</execution>
</executions>
<configuration>
<failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
<scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
@ -54,11 +56,11 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
@ -75,6 +77,11 @@
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-xml_${scala.binary.version}</artifactId>
<version>${scala-xml.version}</version>
</dependency>
<dependency>
<groupId>xml-apis</groupId>

View File

@ -7,8 +7,8 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
import org.apache.poi.openxml4j.opc.OPCPackage;
import org.apache.poi.ss.usermodel.Cell;

View File

@ -1,12 +1,13 @@
package eu.dnetlib.dhp.actionmanager.project.utils;
import java.io.*;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
@ -66,7 +67,7 @@ public class ReadProjects implements Serializable {
FSDataInputStream inputStream = fs.open(hdfsreadpath);
ArrayList<Project> projects = OBJECT_MAPPER
List<Project> projects = OBJECT_MAPPER
.readValue(
IOUtils.toString(inputStream, "UTF-8"),
new TypeReference<List<Project>>() {

View File

@ -6,7 +6,6 @@ import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
@ -23,7 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.PrepareProjects;
import eu.dnetlib.dhp.actionmanager.project.utils.model.JsonTopic;
import eu.dnetlib.dhp.actionmanager.project.utils.model.Project;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
/**
@ -68,7 +66,7 @@ public class ReadTopics implements Serializable {
FSDataInputStream inputStream = fs.open(hdfsreadpath);
ArrayList<JsonTopic> topics = OBJECT_MAPPER
List<JsonTopic> topics = OBJECT_MAPPER
.readValue(
IOUtils.toString(inputStream, "UTF-8"),
new TypeReference<List<JsonTopic>>() {

View File

@ -9,7 +9,7 @@ import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;

View File

@ -16,11 +16,11 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
</dependencies>

View File

@ -18,11 +18,11 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>

View File

@ -2,7 +2,7 @@
package eu.dnetlib.dhp.broker.oa;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util;
import java.io.IOException;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -10,9 +11,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.SparkDeduper;
import eu.dnetlib.pace.tree.support.TreeProcessor;
import eu.dnetlib.pace.util.MapDocumentUtil;
public class TrustUtils {
@ -20,13 +20,18 @@ public class TrustUtils {
private static DedupConfig dedupConfig;
private static SparkDeduper deduper;
private static final ObjectMapper mapper;
static {
final ObjectMapper mapper = new ObjectMapper();
mapper = new ObjectMapper();
try {
dedupConfig = mapper
.readValue(
DedupConfig.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"),
DedupConfig.class);
deduper = new SparkDeduper(dedupConfig);
} catch (final IOException e) {
log.error("Error loading dedupConfig, e");
}
@ -42,11 +47,8 @@ public class TrustUtils {
}
try {
final ObjectMapper objectMapper = new ObjectMapper();
final MapDocument doc1 = MapDocumentUtil
.asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r1));
final MapDocument doc2 = MapDocumentUtil
.asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r2));
final Row doc1 = deduper.model().rowFromJson(mapper.writeValueAsString(r1));
final Row doc2 = deduper.model().rowFromJson(mapper.writeValueAsString(r2));
final double score = new TreeProcessor(dedupConfig).computeScore(doc1, doc2);

View File

@ -1,7 +1,7 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.stats;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;

View File

@ -12,6 +12,7 @@ import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
@ -82,8 +83,8 @@ public class SimpleVariableJobTest {
final long n = spark
.createDataset(inputList, Encoders.STRING())
.filter(s -> filter(map.get(s)))
.map((MapFunction<String, String>) s -> s.toLowerCase(), Encoders.STRING())
.filter((FilterFunction<String>) s -> filter(map.get(s)))
.map((MapFunction<String, String>) String::toLowerCase, Encoders.STRING())
.count();
System.out.println(n);
@ -96,8 +97,8 @@ public class SimpleVariableJobTest {
final long n = spark
.createDataset(inputList, Encoders.STRING())
.filter(s -> filter(staticMap.get(s)))
.map((MapFunction<String, String>) s -> s.toLowerCase(), Encoders.STRING())
.filter((FilterFunction<String>) s -> filter(staticMap.get(s)))
.map((MapFunction<String, String>) String::toLowerCase, Encoders.STRING())
.count();
System.out.println(n);

View File

@ -13,7 +13,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.0.1</version>
<version>${net.alchim31.maven.version}</version>
<executions>
<execution>
<id>scala-compile-first</id>
@ -32,6 +32,8 @@
</execution>
</executions>
<configuration>
<failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
<scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
@ -53,30 +55,35 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-java8-compat_2.11</artifactId>
<artifactId>scala-java8-compat_${scala.binary.version}</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-collection-compat_2.11</artifactId>
<version>2.8.0</version>
<artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<artifactId>spark-graphx_${scala.binary.version}</artifactId>
</dependency>
<dependency>

View File

@ -88,9 +88,7 @@ abstract class AbstractSparkAction implements Serializable {
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
configProfileId));
DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
dedupConfig.getPace().initModel();
dedupConfig.getPace().initTranslationMap();
DedupConfig dedupConfig = DedupConfig.load(conf);
dedupConfig.getWf().setConfigurationId(actionSetId);
return dedupConfig;

View File

@ -5,14 +5,14 @@ import static java.util.Collections.reverseOrder;
import static java.util.Map.Entry.comparingByValue;
import static java.util.stream.Collectors.toMap;
import static org.apache.commons.lang.StringUtils.endsWith;
import static org.apache.commons.lang.StringUtils.substringBefore;
import static org.apache.commons.lang3.StringUtils.endsWith;
import static org.apache.commons.lang3.StringUtils.substringBefore;
import java.time.Year;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.oaf.Field;

Some files were not shown because too many files have changed in this diff Show More