SparkCreateSimRels:
- Create dedup blocks from the complete queue of records matching cluster key instead of truncating the results - Clean titles once before clustering and similarity comparisons - Added support for filtered fields in model - Added support for sorting List fields in model - Added new JSONListClustering and numAuthorsTitleSuffixPrefixChain clustering functions - Added new maxLengthMatch comparator function - Use reduced complexity Levenshtein with threshold in levensteinTitle - Use reduced complexity AuthorsMatch with threshold early-quit - Use incremental Connected Component to decrease comparisons in similarity match in BlockProcessor - Use new clusterings configuration in Dedup tests SparkWhitelistSimRels: use left semi join for clarity and performance SparkCreateMergeRels: - Use new connected component algorithm that converge faster than Spark GraphX provided algorithm - Refactored to use Windowing sorting rather than groupBy to reduce memory pressure - Use historical pivot table to generate singleton rels, merged rels and keep continuity with dedupIds used in the past - Comparator for pivot record selection now uses "tomorrow" as filler for missing or incorrect date instead of "2000-01-01" - Changed generation of ids of type dedup_wf_001 to avoid collisions DedupRecordFactory: use reduceGroups instead of mapGroups to decrease memory pressure
This commit is contained in:
parent
7c3041b276
commit
b0fc113749
|
@ -14,9 +14,9 @@ import eu.dnetlib.pace.config.Config;
|
|||
|
||||
public abstract class AbstractClusteringFunction extends AbstractPaceFunctions implements ClusteringFunction {
|
||||
|
||||
protected Map<String, Integer> params;
|
||||
protected Map<String, Object> params;
|
||||
|
||||
public AbstractClusteringFunction(final Map<String, Integer> params) {
|
||||
public AbstractClusteringFunction(final Map<String, Object> params) {
|
||||
this.params = params;
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,7 @@ public abstract class AbstractClusteringFunction extends AbstractPaceFunctions i
|
|||
return fields
|
||||
.stream()
|
||||
.filter(f -> !f.isEmpty())
|
||||
.map(this::normalize)
|
||||
.map(s -> normalize(s))
|
||||
.map(s -> filterAllStopWords(s))
|
||||
.map(s -> doApply(conf, s))
|
||||
.map(c -> filterBlacklisted(c, ngramBlacklist))
|
||||
|
@ -36,11 +36,24 @@ public abstract class AbstractClusteringFunction extends AbstractPaceFunctions i
|
|||
.collect(Collectors.toCollection(HashSet::new));
|
||||
}
|
||||
|
||||
public Map<String, Integer> getParams() {
|
||||
public Map<String, Object> getParams() {
|
||||
return params;
|
||||
}
|
||||
|
||||
protected Integer param(String name) {
|
||||
return params.get(name);
|
||||
Object val = params.get(name);
|
||||
if (val == null)
|
||||
return null;
|
||||
if (val instanceof Number) {
|
||||
return ((Number) val).intValue();
|
||||
}
|
||||
return Integer.parseInt(val.toString());
|
||||
}
|
||||
|
||||
protected int paramOrDefault(String name, int i) {
|
||||
Integer res = param(name);
|
||||
if (res == null)
|
||||
res = i;
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,7 +13,7 @@ import eu.dnetlib.pace.config.Config;
|
|||
@ClusteringClass("acronyms")
|
||||
public class Acronyms extends AbstractClusteringFunction {
|
||||
|
||||
public Acronyms(Map<String, Integer> params) {
|
||||
public Acronyms(Map<String, Object> params) {
|
||||
super(params);
|
||||
}
|
||||
|
||||
|
|
|
@ -11,6 +11,6 @@ public interface ClusteringFunction {
|
|||
|
||||
public Collection<String> apply(Config config, List<String> fields);
|
||||
|
||||
public Map<String, Integer> getParams();
|
||||
public Map<String, Object> getParams();
|
||||
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ import eu.dnetlib.pace.config.Config;
|
|||
@ClusteringClass("immutablefieldvalue")
|
||||
public class ImmutableFieldValue extends AbstractClusteringFunction {
|
||||
|
||||
public ImmutableFieldValue(final Map<String, Integer> params) {
|
||||
public ImmutableFieldValue(final Map<String, Object> params) {
|
||||
super(params);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
|
||||
package eu.dnetlib.pace.clustering;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import com.jayway.jsonpath.Configuration;
|
||||
import com.jayway.jsonpath.DocumentContext;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.jayway.jsonpath.Option;
|
||||
|
||||
import eu.dnetlib.pace.common.AbstractPaceFunctions;
|
||||
import eu.dnetlib.pace.config.Config;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
|
||||
@ClusteringClass("jsonlistclustering")
|
||||
public class JSONListClustering extends AbstractPaceFunctions implements ClusteringFunction {
|
||||
|
||||
private Map<String, Object> params;
|
||||
|
||||
public JSONListClustering(Map<String, Object> params) {
|
||||
this.params = params;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getParams() {
|
||||
return params;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<String> apply(Config conf, List<String> fields) {
|
||||
return fields
|
||||
.stream()
|
||||
.filter(f -> !f.isEmpty())
|
||||
.map(s -> doApply(conf, s))
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.collect(Collectors.toCollection(HashSet::new));
|
||||
}
|
||||
|
||||
private String doApply(Config conf, String json) {
|
||||
StringBuilder st = new StringBuilder(); // to build the string used for comparisons basing on the jpath into
|
||||
// parameters
|
||||
final DocumentContext documentContext = JsonPath
|
||||
.using(Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS))
|
||||
.parse(json);
|
||||
|
||||
// for each path in the param list
|
||||
for (String key : params.keySet().stream().filter(k -> k.contains("jpath")).collect(Collectors.toList())) {
|
||||
String path = params.get(key).toString();
|
||||
String value = MapDocumentUtil.getJPathString(path, documentContext);
|
||||
if (value == null || value.isEmpty())
|
||||
value = "";
|
||||
st.append(value);
|
||||
st.append(" ");
|
||||
}
|
||||
|
||||
st.setLength(st.length() - 1);
|
||||
|
||||
if (StringUtils.isBlank(st)) {
|
||||
return "1";
|
||||
}
|
||||
return st.toString();
|
||||
}
|
||||
}
|
|
@ -11,7 +11,7 @@ import eu.dnetlib.pace.config.Config;
|
|||
@ClusteringClass("keywordsclustering")
|
||||
public class KeywordsClustering extends AbstractClusteringFunction {
|
||||
|
||||
public KeywordsClustering(Map<String, Integer> params) {
|
||||
public KeywordsClustering(Map<String, Object> params) {
|
||||
super(params);
|
||||
}
|
||||
|
||||
|
@ -19,8 +19,8 @@ public class KeywordsClustering extends AbstractClusteringFunction {
|
|||
protected Collection<String> doApply(final Config conf, String s) {
|
||||
|
||||
// takes city codes and keywords codes without duplicates
|
||||
Set<String> keywords = getKeywords(s, conf.translationMap(), params.getOrDefault("windowSize", 4));
|
||||
Set<String> cities = getCities(s, params.getOrDefault("windowSize", 4));
|
||||
Set<String> keywords = getKeywords(s, conf.translationMap(), paramOrDefault("windowSize", 4));
|
||||
Set<String> cities = getCities(s, paramOrDefault("windowSize", 4));
|
||||
|
||||
// list of combination to return as result
|
||||
final Collection<String> combinations = new LinkedHashSet<String>();
|
||||
|
@ -28,7 +28,7 @@ public class KeywordsClustering extends AbstractClusteringFunction {
|
|||
for (String keyword : keywordsToCodes(keywords, conf.translationMap())) {
|
||||
for (String city : citiesToCodes(cities)) {
|
||||
combinations.add(keyword + "-" + city);
|
||||
if (combinations.size() >= params.getOrDefault("max", 2)) {
|
||||
if (combinations.size() >= paramOrDefault("max", 2)) {
|
||||
return combinations;
|
||||
}
|
||||
}
|
||||
|
@ -42,8 +42,8 @@ public class KeywordsClustering extends AbstractClusteringFunction {
|
|||
return fields
|
||||
.stream()
|
||||
.filter(f -> !f.isEmpty())
|
||||
.map(this::cleanup)
|
||||
.map(this::normalize)
|
||||
.map(KeywordsClustering::cleanup)
|
||||
.map(KeywordsClustering::normalize)
|
||||
.map(s -> filterAllStopWords(s))
|
||||
.map(s -> doApply(conf, s))
|
||||
.map(c -> filterBlacklisted(c, ngramBlacklist))
|
||||
|
|
|
@ -16,7 +16,7 @@ public class LastNameFirstInitial extends AbstractClusteringFunction {
|
|||
|
||||
private boolean DEFAULT_AGGRESSIVE = true;
|
||||
|
||||
public LastNameFirstInitial(final Map<String, Integer> params) {
|
||||
public LastNameFirstInitial(final Map<String, Object> params) {
|
||||
super(params);
|
||||
}
|
||||
|
||||
|
@ -25,7 +25,7 @@ public class LastNameFirstInitial extends AbstractClusteringFunction {
|
|||
return fields
|
||||
.stream()
|
||||
.filter(f -> !f.isEmpty())
|
||||
.map(this::normalize)
|
||||
.map(LastNameFirstInitial::normalize)
|
||||
.map(s -> doApply(conf, s))
|
||||
.map(c -> filterBlacklisted(c, ngramBlacklist))
|
||||
.flatMap(c -> c.stream())
|
||||
|
@ -33,8 +33,7 @@ public class LastNameFirstInitial extends AbstractClusteringFunction {
|
|||
.collect(Collectors.toCollection(HashSet::new));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String normalize(final String s) {
|
||||
public static String normalize(final String s) {
|
||||
return fixAliases(transliterate(nfd(unicodeNormalization(s))))
|
||||
// do not compact the regexes in a single expression, would cause StackOverflowError in case of large input
|
||||
// strings
|
||||
|
|
|
@ -15,7 +15,7 @@ import eu.dnetlib.pace.config.Config;
|
|||
@ClusteringClass("lowercase")
|
||||
public class LowercaseClustering extends AbstractClusteringFunction {
|
||||
|
||||
public LowercaseClustering(final Map<String, Integer> params) {
|
||||
public LowercaseClustering(final Map<String, Object> params) {
|
||||
super(params);
|
||||
}
|
||||
|
||||
|
|
|
@ -12,11 +12,11 @@ import eu.dnetlib.pace.config.Config;
|
|||
@ClusteringClass("ngrampairs")
|
||||
public class NgramPairs extends Ngrams {
|
||||
|
||||
public NgramPairs(Map<String, Integer> params) {
|
||||
public NgramPairs(Map<String, Object> params) {
|
||||
super(params, false);
|
||||
}
|
||||
|
||||
public NgramPairs(Map<String, Integer> params, boolean sorted) {
|
||||
public NgramPairs(Map<String, Object> params, boolean sorted) {
|
||||
super(params, sorted);
|
||||
}
|
||||
|
||||
|
|
|
@ -10,11 +10,11 @@ public class Ngrams extends AbstractClusteringFunction {
|
|||
|
||||
private final boolean sorted;
|
||||
|
||||
public Ngrams(Map<String, Integer> params) {
|
||||
public Ngrams(Map<String, Object> params) {
|
||||
this(params, false);
|
||||
}
|
||||
|
||||
public Ngrams(Map<String, Integer> params, boolean sorted) {
|
||||
public Ngrams(Map<String, Object> params, boolean sorted) {
|
||||
super(params);
|
||||
this.sorted = sorted;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
|
||||
package eu.dnetlib.pace.clustering;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import eu.dnetlib.pace.config.Config;
|
||||
|
||||
@ClusteringClass("numAuthorsTitleSuffixPrefixChain")
|
||||
public class NumAuthorsTitleSuffixPrefixChain extends AbstractClusteringFunction {
|
||||
|
||||
public NumAuthorsTitleSuffixPrefixChain(Map<String, Object> params) {
|
||||
super(params);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<String> apply(Config conf, List<String> fields) {
|
||||
|
||||
try {
|
||||
int num_authors = Math.min(Integer.parseInt(fields.get(0)), 21); // SIZE threshold is 20, +1
|
||||
|
||||
if (num_authors > 0) {
|
||||
return super.apply(conf, fields.subList(1, fields.size()))
|
||||
.stream()
|
||||
.map(s -> num_authors + "-" + s)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
// missing or null authors array
|
||||
}
|
||||
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<String> doApply(Config conf, String s) {
|
||||
return suffixPrefixChain(cleanup(s), param("mod"));
|
||||
}
|
||||
|
||||
private Collection<String> suffixPrefixChain(String s, int mod) {
|
||||
// create the list of words from the string (remove short words)
|
||||
List<String> wordsList = Arrays
|
||||
.stream(s.split(" "))
|
||||
.filter(si -> si.length() > 3)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
final int words = wordsList.size();
|
||||
final int letters = s.length();
|
||||
|
||||
// create the prefix: number of words + number of letters/mod
|
||||
String prefix = words / mod + "-";
|
||||
|
||||
return doSuffixPrefixChain(wordsList, prefix);
|
||||
|
||||
}
|
||||
|
||||
private Collection<String> doSuffixPrefixChain(List<String> wordsList, String prefix) {
|
||||
|
||||
Set<String> set = Sets.newLinkedHashSet();
|
||||
switch (wordsList.size()) {
|
||||
case 0:
|
||||
break;
|
||||
case 1:
|
||||
set.add(wordsList.get(0));
|
||||
break;
|
||||
case 2:
|
||||
set
|
||||
.add(
|
||||
prefix +
|
||||
suffix(wordsList.get(0), 3) +
|
||||
prefix(wordsList.get(1), 3));
|
||||
|
||||
set
|
||||
.add(
|
||||
prefix +
|
||||
prefix(wordsList.get(0), 3) +
|
||||
suffix(wordsList.get(1), 3));
|
||||
|
||||
break;
|
||||
default:
|
||||
set
|
||||
.add(
|
||||
prefix +
|
||||
suffix(wordsList.get(0), 3) +
|
||||
prefix(wordsList.get(1), 3) +
|
||||
suffix(wordsList.get(2), 3));
|
||||
|
||||
set
|
||||
.add(
|
||||
prefix +
|
||||
prefix(wordsList.get(0), 3) +
|
||||
suffix(wordsList.get(1), 3) +
|
||||
prefix(wordsList.get(2), 3));
|
||||
break;
|
||||
}
|
||||
|
||||
return set;
|
||||
|
||||
}
|
||||
|
||||
private String suffix(String s, int len) {
|
||||
return s.substring(s.length() - len);
|
||||
}
|
||||
|
||||
private String prefix(String s, int len) {
|
||||
return s.substring(0, len);
|
||||
}
|
||||
|
||||
}
|
|
@ -17,11 +17,11 @@ import eu.dnetlib.pace.model.Person;
|
|||
@ClusteringClass("personClustering")
|
||||
public class PersonClustering extends AbstractPaceFunctions implements ClusteringFunction {
|
||||
|
||||
private Map<String, Integer> params;
|
||||
private Map<String, Object> params;
|
||||
|
||||
private static final int MAX_TOKENS = 5;
|
||||
|
||||
public PersonClustering(final Map<String, Integer> params) {
|
||||
public PersonClustering(final Map<String, Object> params) {
|
||||
this.params = params;
|
||||
}
|
||||
|
||||
|
@ -77,7 +77,7 @@ public class PersonClustering extends AbstractPaceFunctions implements Clusterin
|
|||
// }
|
||||
|
||||
@Override
|
||||
public Map<String, Integer> getParams() {
|
||||
public Map<String, Object> getParams() {
|
||||
return params;
|
||||
}
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ public class PersonHash extends AbstractClusteringFunction {
|
|||
|
||||
private boolean DEFAULT_AGGRESSIVE = false;
|
||||
|
||||
public PersonHash(final Map<String, Integer> params) {
|
||||
public PersonHash(final Map<String, Object> params) {
|
||||
super(params);
|
||||
}
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ import eu.dnetlib.pace.config.Config;
|
|||
|
||||
public class RandomClusteringFunction extends AbstractClusteringFunction {
|
||||
|
||||
public RandomClusteringFunction(Map<String, Integer> params) {
|
||||
public RandomClusteringFunction(Map<String, Object> params) {
|
||||
super(params);
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
|
||||
package eu.dnetlib.pace.clustering;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Splitter;
|
||||
|
@ -12,7 +15,7 @@ import eu.dnetlib.pace.config.Config;
|
|||
@ClusteringClass("sortedngrampairs")
|
||||
public class SortedNgramPairs extends NgramPairs {
|
||||
|
||||
public SortedNgramPairs(Map<String, Integer> params) {
|
||||
public SortedNgramPairs(Map<String, Object> params) {
|
||||
super(params, false);
|
||||
}
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ import eu.dnetlib.pace.config.Config;
|
|||
@ClusteringClass("spacetrimmingfieldvalue")
|
||||
public class SpaceTrimmingFieldValue extends AbstractClusteringFunction {
|
||||
|
||||
public SpaceTrimmingFieldValue(final Map<String, Integer> params) {
|
||||
public SpaceTrimmingFieldValue(final Map<String, Object> params) {
|
||||
super(params);
|
||||
}
|
||||
|
||||
|
@ -25,7 +25,7 @@ public class SpaceTrimmingFieldValue extends AbstractClusteringFunction {
|
|||
|
||||
res
|
||||
.add(
|
||||
StringUtils.isBlank(s) ? RandomStringUtils.random(getParams().get("randomLength"))
|
||||
StringUtils.isBlank(s) ? RandomStringUtils.random(param("randomLength"))
|
||||
: s.toLowerCase().replaceAll("\\s+", ""));
|
||||
|
||||
return res;
|
||||
|
|
|
@ -12,7 +12,7 @@ import eu.dnetlib.pace.config.Config;
|
|||
@ClusteringClass("suffixprefix")
|
||||
public class SuffixPrefix extends AbstractClusteringFunction {
|
||||
|
||||
public SuffixPrefix(Map<String, Integer> params) {
|
||||
public SuffixPrefix(Map<String, Object> params) {
|
||||
super(params);
|
||||
}
|
||||
|
||||
|
|
|
@ -15,12 +15,17 @@ import eu.dnetlib.pace.config.Config;
|
|||
@ClusteringClass("urlclustering")
|
||||
public class UrlClustering extends AbstractPaceFunctions implements ClusteringFunction {
|
||||
|
||||
protected Map<String, Integer> params;
|
||||
protected Map<String, Object> params;
|
||||
|
||||
public UrlClustering(final Map<String, Integer> params) {
|
||||
public UrlClustering(final Map<String, Object> params) {
|
||||
this.params = params;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getParams() {
|
||||
return params;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<String> apply(final Config conf, List<String> fields) {
|
||||
try {
|
||||
|
@ -35,11 +40,6 @@ public class UrlClustering extends AbstractPaceFunctions implements ClusteringFu
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Integer> getParams() {
|
||||
return null;
|
||||
}
|
||||
|
||||
private URL asUrl(String value) {
|
||||
try {
|
||||
return new URL(value);
|
||||
|
|
|
@ -11,7 +11,7 @@ import eu.dnetlib.pace.config.Config;
|
|||
@ClusteringClass("wordsStatsSuffixPrefixChain")
|
||||
public class WordsStatsSuffixPrefixChain extends AbstractClusteringFunction {
|
||||
|
||||
public WordsStatsSuffixPrefixChain(Map<String, Integer> params) {
|
||||
public WordsStatsSuffixPrefixChain(Map<String, Object> params) {
|
||||
super(params);
|
||||
}
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ import eu.dnetlib.pace.config.Config;
|
|||
@ClusteringClass("wordssuffixprefix")
|
||||
public class WordsSuffixPrefix extends AbstractClusteringFunction {
|
||||
|
||||
public WordsSuffixPrefix(Map<String, Integer> params) {
|
||||
public WordsSuffixPrefix(Map<String, Object> params) {
|
||||
super(params);
|
||||
}
|
||||
|
||||
|
|
|
@ -16,7 +16,6 @@ import org.apache.commons.lang3.StringUtils;
|
|||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.ibm.icu.text.Transliterator;
|
||||
|
||||
|
@ -27,7 +26,7 @@ import eu.dnetlib.pace.clustering.NGramUtils;
|
|||
*
|
||||
* @author claudio
|
||||
*/
|
||||
public abstract class AbstractPaceFunctions {
|
||||
public class AbstractPaceFunctions {
|
||||
|
||||
// city map to be used when translating the city names into codes
|
||||
private static Map<String, String> cityMap = AbstractPaceFunctions
|
||||
|
@ -62,11 +61,14 @@ public abstract class AbstractPaceFunctions {
|
|||
|
||||
private static Pattern hexUnicodePattern = Pattern.compile("\\\\u(\\p{XDigit}{4})");
|
||||
|
||||
protected String concat(final List<String> l) {
|
||||
private static Pattern romanNumberPattern = Pattern
|
||||
.compile("^M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})$");
|
||||
|
||||
protected static String concat(final List<String> l) {
|
||||
return Joiner.on(" ").skipNulls().join(l);
|
||||
}
|
||||
|
||||
protected String cleanup(final String s) {
|
||||
public static String cleanup(final String s) {
|
||||
final String s1 = HTML_REGEX.matcher(s).replaceAll("");
|
||||
final String s2 = unicodeNormalization(s1.toLowerCase());
|
||||
final String s3 = nfd(s2);
|
||||
|
@ -82,7 +84,7 @@ public abstract class AbstractPaceFunctions {
|
|||
return s12;
|
||||
}
|
||||
|
||||
protected String fixXML(final String a) {
|
||||
protected static String fixXML(final String a) {
|
||||
|
||||
return a
|
||||
.replaceAll("–", " ")
|
||||
|
@ -91,7 +93,7 @@ public abstract class AbstractPaceFunctions {
|
|||
.replaceAll("−", " ");
|
||||
}
|
||||
|
||||
protected boolean checkNumbers(final String a, final String b) {
|
||||
protected static boolean checkNumbers(final String a, final String b) {
|
||||
final String numbersA = getNumbers(a);
|
||||
final String numbersB = getNumbers(b);
|
||||
final String romansA = getRomans(a);
|
||||
|
@ -99,7 +101,7 @@ public abstract class AbstractPaceFunctions {
|
|||
return !numbersA.equals(numbersB) || !romansA.equals(romansB);
|
||||
}
|
||||
|
||||
protected String getRomans(final String s) {
|
||||
protected static String getRomans(final String s) {
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
for (final String t : s.split(" ")) {
|
||||
sb.append(isRoman(t) ? t : "");
|
||||
|
@ -107,13 +109,12 @@ public abstract class AbstractPaceFunctions {
|
|||
return sb.toString();
|
||||
}
|
||||
|
||||
protected boolean isRoman(final String s) {
|
||||
return s
|
||||
.replaceAll("^M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})$", "qwertyuiop")
|
||||
.equals("qwertyuiop");
|
||||
protected static boolean isRoman(final String s) {
|
||||
Matcher m = romanNumberPattern.matcher(s);
|
||||
return m.matches() && m.hitEnd();
|
||||
}
|
||||
|
||||
protected String getNumbers(final String s) {
|
||||
protected static String getNumbers(final String s) {
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
for (final String t : s.split(" ")) {
|
||||
sb.append(isNumber(t) ? t : "");
|
||||
|
@ -121,7 +122,7 @@ public abstract class AbstractPaceFunctions {
|
|||
return sb.toString();
|
||||
}
|
||||
|
||||
public boolean isNumber(String strNum) {
|
||||
public static boolean isNumber(String strNum) {
|
||||
if (strNum == null) {
|
||||
return false;
|
||||
}
|
||||
|
@ -147,7 +148,7 @@ public abstract class AbstractPaceFunctions {
|
|||
}
|
||||
}
|
||||
|
||||
protected String removeSymbols(final String s) {
|
||||
protected static String removeSymbols(final String s) {
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
|
||||
s.chars().forEach(ch -> {
|
||||
|
@ -157,11 +158,11 @@ public abstract class AbstractPaceFunctions {
|
|||
return sb.toString().replaceAll("\\s+", " ");
|
||||
}
|
||||
|
||||
protected boolean notNull(final String s) {
|
||||
protected static boolean notNull(final String s) {
|
||||
return s != null;
|
||||
}
|
||||
|
||||
protected String normalize(final String s) {
|
||||
public static String normalize(final String s) {
|
||||
return fixAliases(transliterate(nfd(unicodeNormalization(s))))
|
||||
.toLowerCase()
|
||||
// do not compact the regexes in a single expression, would cause StackOverflowError in case of large input
|
||||
|
@ -174,16 +175,16 @@ public abstract class AbstractPaceFunctions {
|
|||
.trim();
|
||||
}
|
||||
|
||||
public String nfd(final String s) {
|
||||
public static String nfd(final String s) {
|
||||
return Normalizer.normalize(s, Normalizer.Form.NFD);
|
||||
}
|
||||
|
||||
public String utf8(final String s) {
|
||||
public static String utf8(final String s) {
|
||||
byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
|
||||
return new String(bytes, StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
public String unicodeNormalization(final String s) {
|
||||
public static String unicodeNormalization(final String s) {
|
||||
|
||||
Matcher m = hexUnicodePattern.matcher(s);
|
||||
StringBuffer buf = new StringBuffer(s.length());
|
||||
|
@ -195,7 +196,7 @@ public abstract class AbstractPaceFunctions {
|
|||
return buf.toString();
|
||||
}
|
||||
|
||||
protected String filterStopWords(final String s, final Set<String> stopwords) {
|
||||
protected static String filterStopWords(final String s, final Set<String> stopwords) {
|
||||
final StringTokenizer st = new StringTokenizer(s);
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
while (st.hasMoreTokens()) {
|
||||
|
@ -208,7 +209,7 @@ public abstract class AbstractPaceFunctions {
|
|||
return sb.toString().trim();
|
||||
}
|
||||
|
||||
public String filterAllStopWords(String s) {
|
||||
public static String filterAllStopWords(String s) {
|
||||
|
||||
s = filterStopWords(s, stopwords_en);
|
||||
s = filterStopWords(s, stopwords_de);
|
||||
|
@ -221,7 +222,8 @@ public abstract class AbstractPaceFunctions {
|
|||
return s;
|
||||
}
|
||||
|
||||
protected Collection<String> filterBlacklisted(final Collection<String> set, final Set<String> ngramBlacklist) {
|
||||
protected static Collection<String> filterBlacklisted(final Collection<String> set,
|
||||
final Set<String> ngramBlacklist) {
|
||||
final Set<String> newset = Sets.newLinkedHashSet();
|
||||
for (final String s : set) {
|
||||
if (!ngramBlacklist.contains(s)) {
|
||||
|
@ -268,7 +270,7 @@ public abstract class AbstractPaceFunctions {
|
|||
return m;
|
||||
}
|
||||
|
||||
public String removeKeywords(String s, Set<String> keywords) {
|
||||
public static String removeKeywords(String s, Set<String> keywords) {
|
||||
|
||||
s = " " + s + " ";
|
||||
for (String k : keywords) {
|
||||
|
@ -278,39 +280,39 @@ public abstract class AbstractPaceFunctions {
|
|||
return s.trim();
|
||||
}
|
||||
|
||||
public double commonElementsPercentage(Set<String> s1, Set<String> s2) {
|
||||
public static double commonElementsPercentage(Set<String> s1, Set<String> s2) {
|
||||
|
||||
double longer = Math.max(s1.size(), s2.size());
|
||||
return (double) s1.stream().filter(s2::contains).count() / longer;
|
||||
}
|
||||
|
||||
// convert the set of keywords to codes
|
||||
public Set<String> toCodes(Set<String> keywords, Map<String, String> translationMap) {
|
||||
public static Set<String> toCodes(Set<String> keywords, Map<String, String> translationMap) {
|
||||
return keywords.stream().map(s -> translationMap.get(s)).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
public Set<String> keywordsToCodes(Set<String> keywords, Map<String, String> translationMap) {
|
||||
public static Set<String> keywordsToCodes(Set<String> keywords, Map<String, String> translationMap) {
|
||||
return toCodes(keywords, translationMap);
|
||||
}
|
||||
|
||||
public Set<String> citiesToCodes(Set<String> keywords) {
|
||||
public static Set<String> citiesToCodes(Set<String> keywords) {
|
||||
return toCodes(keywords, cityMap);
|
||||
}
|
||||
|
||||
protected String firstLC(final String s) {
|
||||
protected static String firstLC(final String s) {
|
||||
return StringUtils.substring(s, 0, 1).toLowerCase();
|
||||
}
|
||||
|
||||
protected Iterable<String> tokens(final String s, final int maxTokens) {
|
||||
protected static Iterable<String> tokens(final String s, final int maxTokens) {
|
||||
return Iterables.limit(Splitter.on(" ").omitEmptyStrings().trimResults().split(s), maxTokens);
|
||||
}
|
||||
|
||||
public String normalizePid(String pid) {
|
||||
public static String normalizePid(String pid) {
|
||||
return DOI_PREFIX.matcher(pid.toLowerCase()).replaceAll("");
|
||||
}
|
||||
|
||||
// get the list of keywords into the input string
|
||||
public Set<String> getKeywords(String s1, Map<String, String> translationMap, int windowSize) {
|
||||
public static Set<String> getKeywords(String s1, Map<String, String> translationMap, int windowSize) {
|
||||
|
||||
String s = s1;
|
||||
|
||||
|
@ -340,7 +342,7 @@ public abstract class AbstractPaceFunctions {
|
|||
return codes;
|
||||
}
|
||||
|
||||
public Set<String> getCities(String s1, int windowSize) {
|
||||
public static Set<String> getCities(String s1, int windowSize) {
|
||||
return getKeywords(s1, cityMap, windowSize);
|
||||
}
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ public class ClusteringDef implements Serializable {
|
|||
|
||||
private List<String> fields;
|
||||
|
||||
private Map<String, Integer> params;
|
||||
private Map<String, Object> params;
|
||||
|
||||
public ClusteringDef() {
|
||||
}
|
||||
|
@ -43,11 +43,11 @@ public class ClusteringDef implements Serializable {
|
|||
this.fields = fields;
|
||||
}
|
||||
|
||||
public Map<String, Integer> getParams() {
|
||||
public Map<String, Object> getParams() {
|
||||
return params;
|
||||
}
|
||||
|
||||
public void setParams(final Map<String, Integer> params) {
|
||||
public void setParams(final Map<String, Object> params) {
|
||||
this.params = params;
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
package eu.dnetlib.pace.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
|
@ -36,6 +37,16 @@ public class FieldDef implements Serializable {
|
|||
*/
|
||||
private int length = -1;
|
||||
|
||||
private HashSet<String> filter;
|
||||
|
||||
private boolean sorted;
|
||||
|
||||
public boolean isSorted() {
|
||||
return sorted;
|
||||
}
|
||||
|
||||
private String clean;
|
||||
|
||||
public FieldDef() {
|
||||
}
|
||||
|
||||
|
@ -91,6 +102,30 @@ public class FieldDef implements Serializable {
|
|||
this.path = path;
|
||||
}
|
||||
|
||||
public HashSet<String> getFilter() {
|
||||
return filter;
|
||||
}
|
||||
|
||||
public void setFilter(HashSet<String> filter) {
|
||||
this.filter = filter;
|
||||
}
|
||||
|
||||
public boolean getSorted() {
|
||||
return sorted;
|
||||
}
|
||||
|
||||
public void setSorted(boolean sorted) {
|
||||
this.sorted = sorted;
|
||||
}
|
||||
|
||||
public String getClean() {
|
||||
return clean;
|
||||
}
|
||||
|
||||
public void setClean(String clean) {
|
||||
this.clean = clean;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
try {
|
||||
|
|
|
@ -5,9 +5,9 @@ import eu.dnetlib.pace.util.{BlockProcessor, SparkReporter}
|
|||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.sql.catalyst.expressions.Literal
|
||||
import org.apache.spark.sql.expressions._
|
||||
import org.apache.spark.sql.functions.{col, lit, udf}
|
||||
import org.apache.spark.sql.functions.{col, desc, expr, lit, udf}
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.sql.{Column, Dataset, Row, functions}
|
||||
import org.apache.spark.sql.{Column, Dataset, Row, SaveMode, functions}
|
||||
|
||||
import java.util.function.Predicate
|
||||
import java.util.stream.Collectors
|
||||
|
@ -80,6 +80,8 @@ case class SparkDeduper(conf: DedupConfig) extends Serializable {
|
|||
.withColumn("key", functions.explode(clusterValuesUDF(cd).apply(functions.array(inputColumns: _*))))
|
||||
// Add position column having the position of the row within the set of rows having the same key value ordered by the sorting value
|
||||
.withColumn("position", functions.row_number().over(Window.partitionBy("key").orderBy(col(model.orderingFieldName), col(model.identifierFieldName))))
|
||||
// .withColumn("count", functions.max("position").over(Window.partitionBy("key").orderBy(col(model.orderingFieldName), col(model.identifierFieldName)).rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing) ))
|
||||
// .filter("count > 1")
|
||||
|
||||
if (df_with_clustering_keys == null)
|
||||
df_with_clustering_keys = ds
|
||||
|
@ -88,20 +90,44 @@ case class SparkDeduper(conf: DedupConfig) extends Serializable {
|
|||
}
|
||||
|
||||
//TODO: analytics
|
||||
/*df_with_clustering_keys.groupBy(col("clustering"), col("key"))
|
||||
.agg(expr("max(count) AS size"))
|
||||
.orderBy(desc("size"))
|
||||
.show*/
|
||||
|
||||
val df_with_blocks = df_with_clustering_keys
|
||||
// filter out rows with position exceeding the maxqueuesize parameter
|
||||
.filter(col("position").leq(conf.getWf.getQueueMaxSize))
|
||||
.groupBy("clustering", "key")
|
||||
// split the clustering block into smaller blocks of queuemaxsize
|
||||
.groupBy(col("clustering"), col("key"), functions.floor(col("position").divide(lit(conf.getWf.getQueueMaxSize))))
|
||||
.agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block"))
|
||||
.filter(functions.size(new Column("block")).gt(1))
|
||||
.union(
|
||||
//adjacency blocks
|
||||
df_with_clustering_keys
|
||||
// filter out leading and trailing elements
|
||||
.filter(col("position").gt(conf.getWf.getSlidingWindowSize/2))
|
||||
//.filter(col("position").lt(col("count").minus(conf.getWf.getSlidingWindowSize/2)))
|
||||
// create small blocks of records on "the border" of maxqueuesize: getSlidingWindowSize/2 elements before and after
|
||||
.filter(
|
||||
col("position").mod(conf.getWf.getQueueMaxSize).lt(conf.getWf.getSlidingWindowSize/2) // slice of the start of block
|
||||
|| col("position").mod(conf.getWf.getQueueMaxSize).gt(conf.getWf.getQueueMaxSize - (conf.getWf.getSlidingWindowSize/2)) //slice of the end of the block
|
||||
)
|
||||
.groupBy(col("clustering"), col("key"), functions.floor((col("position") + lit(conf.getWf.getSlidingWindowSize/2)).divide(lit(conf.getWf.getQueueMaxSize))))
|
||||
.agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block"))
|
||||
.filter(functions.size(new Column("block")).gt(1))
|
||||
)
|
||||
|
||||
df_with_blocks
|
||||
}
|
||||
|
||||
def clusterValuesUDF(cd: ClusteringDef) = {
|
||||
udf[mutable.WrappedArray[String], mutable.WrappedArray[Any]](values => {
|
||||
values.flatMap(f => cd.clusteringFunction().apply(conf, Seq(f.toString).asJava).asScala)
|
||||
val valueList = values.flatMap {
|
||||
case a: mutable.WrappedArray[Any] => a.map(_.toString)
|
||||
case s: Any => Seq(s.toString)
|
||||
}.asJava;
|
||||
|
||||
mutable.WrappedArray.make(cd.clusteringFunction().apply(conf, valueList).toArray())
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -1,13 +1,16 @@
|
|||
package eu.dnetlib.pace.model
|
||||
|
||||
import com.jayway.jsonpath.{Configuration, JsonPath}
|
||||
import eu.dnetlib.pace.common.AbstractPaceFunctions
|
||||
import eu.dnetlib.pace.config.{DedupConfig, Type}
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil
|
||||
import org.apache.commons.lang3.StringUtils
|
||||
import org.apache.spark.sql.catalyst.encoders.RowEncoder
|
||||
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
|
||||
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
|
||||
import org.apache.spark.sql.{Dataset, Row}
|
||||
|
||||
import java.util.Locale
|
||||
import java.util.regex.Pattern
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
|
@ -60,7 +63,7 @@ case class SparkModel(conf: DedupConfig) {
|
|||
values(identityFieldPosition) = MapDocumentUtil.getJPathString(conf.getWf.getIdPath, documentContext)
|
||||
|
||||
schema.fieldNames.zipWithIndex.foldLeft(values) {
|
||||
case ((res, (fname, index))) => {
|
||||
case ((res, (fname, index))) =>
|
||||
val fdef = conf.getPace.getModelMap.get(fname)
|
||||
|
||||
if (fdef != null) {
|
||||
|
@ -96,13 +99,52 @@ case class SparkModel(conf: DedupConfig) {
|
|||
case Type.DoubleArray =>
|
||||
MapDocumentUtil.getJPathArray(fdef.getPath, json)
|
||||
}
|
||||
|
||||
val filter = fdef.getFilter
|
||||
|
||||
if (StringUtils.isNotBlank(fdef.getClean)) {
|
||||
res(index) = res(index) match {
|
||||
case x: Seq[String] => x.map(clean(_, fdef.getClean)).toSeq
|
||||
case _ => clean(res(index).toString, fdef.getClean)
|
||||
}
|
||||
}
|
||||
|
||||
if (filter != null && !filter.isEmpty) {
|
||||
res(index) = res(index) match {
|
||||
case x: String if filter.contains(x.toLowerCase(Locale.ROOT)) => null
|
||||
case x: Seq[String] => x.filter(s => !filter.contains(s.toLowerCase(Locale.ROOT))).toSeq
|
||||
case _ => res(index)
|
||||
}
|
||||
}
|
||||
|
||||
if (fdef.getSorted) {
|
||||
res(index) = res(index) match {
|
||||
case x: Seq[String] => x.sorted.toSeq
|
||||
case _ => res(index)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
new GenericRowWithSchema(values, schema)
|
||||
}
|
||||
|
||||
def clean(value: String, cleantype: String) : String = {
|
||||
val res = cleantype match {
|
||||
case "title" => AbstractPaceFunctions.cleanup(value)
|
||||
case _ => value
|
||||
}
|
||||
|
||||
// if (!res.equals(AbstractPaceFunctions.normalize(value))) {
|
||||
// println(res)
|
||||
// println(AbstractPaceFunctions.normalize(value))
|
||||
// println()
|
||||
// }
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,6 @@ public class AuthorsMatch extends AbstractListComparator {
|
|||
private String MODE; // full or surname
|
||||
private int SIZE_THRESHOLD;
|
||||
private String TYPE; // count or percentage
|
||||
private int common;
|
||||
|
||||
public AuthorsMatch(Map<String, String> params) {
|
||||
super(params, new com.wcohen.ss.JaroWinkler());
|
||||
|
@ -35,7 +34,6 @@ public class AuthorsMatch extends AbstractListComparator {
|
|||
FULLNAME_THRESHOLD = Double.parseDouble(params.getOrDefault("fullname_th", "0.9"));
|
||||
SIZE_THRESHOLD = Integer.parseInt(params.getOrDefault("size_th", "20"));
|
||||
TYPE = params.getOrDefault("type", "percentage");
|
||||
common = 0;
|
||||
}
|
||||
|
||||
protected AuthorsMatch(double w, AbstractStringDistance ssalgo) {
|
||||
|
@ -44,22 +42,27 @@ public class AuthorsMatch extends AbstractListComparator {
|
|||
|
||||
@Override
|
||||
public double compare(final List<String> a, final List<String> b, final Config conf) {
|
||||
|
||||
if (a.isEmpty() || b.isEmpty())
|
||||
return -1;
|
||||
|
||||
if (a.size() > SIZE_THRESHOLD || b.size() > SIZE_THRESHOLD)
|
||||
return 1.0;
|
||||
|
||||
List<Person> aList = a.stream().map(author -> new Person(author, false)).collect(Collectors.toList());
|
||||
int maxMiss = Integer.MAX_VALUE;
|
||||
List<Person> bList = b.stream().map(author -> new Person(author, false)).collect(Collectors.toList());
|
||||
|
||||
common = 0;
|
||||
Double threshold = getDoubleParam("threshold");
|
||||
|
||||
if (threshold != null && threshold >= 0.0 && threshold <= 1.0 && a.size() == b.size()) {
|
||||
maxMiss = (int) Math.floor((1 - threshold) * Math.max(a.size(), b.size()));
|
||||
}
|
||||
|
||||
int common = 0;
|
||||
// compare each element of List1 with each element of List2
|
||||
for (Person p1 : aList)
|
||||
for (int i = 0; i < a.size(); i++) {
|
||||
Person p1 = new Person(a.get(i), false);
|
||||
|
||||
for (Person p2 : bList) {
|
||||
|
||||
// both persons are inaccurate
|
||||
if (!p1.isAccurate() && !p2.isAccurate()) {
|
||||
// compare just normalized fullnames
|
||||
|
@ -118,11 +121,15 @@ public class AuthorsMatch extends AbstractListComparator {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (i - common > maxMiss) {
|
||||
return 0.0;
|
||||
}
|
||||
}
|
||||
|
||||
// normalization factor to compute the score
|
||||
int normFactor = aList.size() == bList.size() ? aList.size() : (aList.size() + bList.size() - common);
|
||||
int normFactor = a.size() == b.size() ? a.size() : (a.size() + b.size() - common);
|
||||
|
||||
if (TYPE.equals("percentage")) {
|
||||
return (double) common / normFactor;
|
||||
|
|
|
@ -25,6 +25,7 @@ public class InstanceTypeMatch extends AbstractListComparator {
|
|||
translationMap.put("Conference object", "*");
|
||||
translationMap.put("Other literature type", "*");
|
||||
translationMap.put("Unknown", "*");
|
||||
translationMap.put("UNKNOWN", "*");
|
||||
|
||||
// article types
|
||||
translationMap.put("Article", "Article");
|
||||
|
@ -76,5 +77,4 @@ public class InstanceTypeMatch extends AbstractListComparator {
|
|||
protected double normalize(final double d) {
|
||||
return d;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package eu.dnetlib.pace.tree;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
@ -30,16 +31,25 @@ public class LevensteinTitle extends AbstractStringComparator {
|
|||
}
|
||||
|
||||
@Override
|
||||
public double distance(final String a, final String b, final Config conf) {
|
||||
final String ca = cleanup(a);
|
||||
final String cb = cleanup(b);
|
||||
|
||||
public double distance(final String ca, final String cb, final Config conf) {
|
||||
final boolean check = checkNumbers(ca, cb);
|
||||
|
||||
if (check)
|
||||
return 0.5;
|
||||
|
||||
return normalize(ssalgo.score(ca, cb), ca.length(), cb.length());
|
||||
Double threshold = getDoubleParam("threshold");
|
||||
|
||||
// reduce Levenshtein algo complexity when target threshold is known
|
||||
if (threshold != null && threshold >= 0.0 && threshold <= 1.0) {
|
||||
int maxdistance = (int) Math.floor((1 - threshold) * Math.max(ca.length(), cb.length()));
|
||||
int score = StringUtils.getLevenshteinDistance(ca, cb, maxdistance);
|
||||
if (score == -1) {
|
||||
return 0;
|
||||
}
|
||||
return normalize(score, ca.length(), cb.length());
|
||||
} else {
|
||||
return normalize(StringUtils.getLevenshteinDistance(ca, cb), ca.length(), cb.length());
|
||||
}
|
||||
}
|
||||
|
||||
private double normalize(final double score, final int la, final int lb) {
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
|
||||
package eu.dnetlib.pace.tree;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import eu.dnetlib.pace.config.Config;
|
||||
import eu.dnetlib.pace.tree.support.AbstractStringComparator;
|
||||
import eu.dnetlib.pace.tree.support.ComparatorClass;
|
||||
|
||||
@ComparatorClass("maxLengthMatch")
|
||||
public class MaxLengthMatch extends AbstractStringComparator {
|
||||
|
||||
private final int limit;
|
||||
|
||||
public MaxLengthMatch(Map<String, String> params) {
|
||||
super(params);
|
||||
|
||||
limit = Integer.parseInt(params.getOrDefault("limit", "200"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public double compare(String a, String b, final Config conf) {
|
||||
return a.length() < limit && b.length() < limit ? 1.0 : -1.0;
|
||||
}
|
||||
|
||||
protected String toString(final Object object) {
|
||||
return toFirstString(object);
|
||||
}
|
||||
}
|
|
@ -127,4 +127,14 @@ public abstract class AbstractComparator<T> extends AbstractPaceFunctions implem
|
|||
return this.weight;
|
||||
}
|
||||
|
||||
public Double getDoubleParam(String name) {
|
||||
String svalue = params.get(name);
|
||||
|
||||
try {
|
||||
return Double.parseDouble(svalue);
|
||||
} catch (Throwable t) {
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,8 +67,10 @@ public class BlockProcessor {
|
|||
|
||||
private void processRows(final List<Row> queue, final Reporter context) {
|
||||
|
||||
for (int pivotPos = 0; pivotPos < queue.size(); pivotPos++) {
|
||||
final Row pivot = queue.get(pivotPos);
|
||||
IncrementalConnectedComponents icc = new IncrementalConnectedComponents(queue.size());
|
||||
|
||||
for (int i = 0; i < queue.size(); i++) {
|
||||
final Row pivot = queue.get(i);
|
||||
|
||||
final String idPivot = pivot.getString(identifierFieldPos); // identifier
|
||||
final Object fieldsPivot = getJavaValue(pivot, orderFieldPos);
|
||||
|
@ -76,9 +78,9 @@ public class BlockProcessor {
|
|||
final WfConfig wf = dedupConf.getWf();
|
||||
|
||||
if (fieldPivot != null) {
|
||||
int i = 0;
|
||||
for (int windowPos = pivotPos + 1; windowPos < queue.size(); windowPos++) {
|
||||
final Row curr = queue.get(windowPos);
|
||||
for (int j = icc.nextUnconnected(i, i + 1); j >= 0
|
||||
&& j < queue.size(); j = icc.nextUnconnected(i, j + 1)) {
|
||||
final Row curr = queue.get(j);
|
||||
final String idCurr = curr.getString(identifierFieldPos); // identifier
|
||||
|
||||
if (mustSkip(idCurr)) {
|
||||
|
@ -86,7 +88,7 @@ public class BlockProcessor {
|
|||
break;
|
||||
}
|
||||
|
||||
if (++i > wf.getSlidingWindowSize()) {
|
||||
if (wf.getSlidingWindowSize() > 0 && (j - i) > wf.getSlidingWindowSize()) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -97,7 +99,9 @@ public class BlockProcessor {
|
|||
|
||||
final TreeProcessor treeProcessor = new TreeProcessor(dedupConf);
|
||||
|
||||
emitOutput(treeProcessor.compare(pivot, curr), idPivot, idCurr, context);
|
||||
if (emitOutput(treeProcessor.compare(pivot, curr), idPivot, idCurr, context)) {
|
||||
icc.connect(i, j);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -115,7 +119,8 @@ public class BlockProcessor {
|
|||
return null;
|
||||
}
|
||||
|
||||
private void emitOutput(final boolean result, final String idPivot, final String idCurr, final Reporter context) {
|
||||
private boolean emitOutput(final boolean result, final String idPivot, final String idCurr,
|
||||
final Reporter context) {
|
||||
|
||||
if (result) {
|
||||
if (idPivot.compareTo(idCurr) <= 0) {
|
||||
|
@ -127,6 +132,8 @@ public class BlockProcessor {
|
|||
} else {
|
||||
context.incrementCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold(), 1);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private boolean mustSkip(final String idPivot) {
|
||||
|
@ -142,5 +149,4 @@ public class BlockProcessor {
|
|||
|
||||
context.emit(type, from, to);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
|
||||
package eu.dnetlib.pace.util;
|
||||
|
||||
import java.util.BitSet;
|
||||
|
||||
public class IncrementalConnectedComponents {
|
||||
final private int size;
|
||||
|
||||
final private BitSet[] indexes;
|
||||
|
||||
IncrementalConnectedComponents(int size) {
|
||||
this.size = size;
|
||||
this.indexes = new BitSet[size];
|
||||
}
|
||||
|
||||
public void connect(int i, int j) {
|
||||
if (indexes[i] == null) {
|
||||
if (indexes[j] == null) {
|
||||
indexes[i] = new BitSet(size);
|
||||
} else {
|
||||
indexes[i] = indexes[j];
|
||||
}
|
||||
} else {
|
||||
if (indexes[j] != null && indexes[i] != indexes[j]) {
|
||||
// merge adjacency lists for i and j
|
||||
indexes[i].or(indexes[j]);
|
||||
}
|
||||
}
|
||||
|
||||
indexes[i].set(i);
|
||||
indexes[i].set(j);
|
||||
indexes[j] = indexes[i];
|
||||
}
|
||||
|
||||
public int nextUnconnected(int i, int j) {
|
||||
if (indexes[i] == null) {
|
||||
return j;
|
||||
}
|
||||
int result = indexes[i].nextClearBit(j);
|
||||
|
||||
return (result >= size) ? -1 : result;
|
||||
}
|
||||
|
||||
public BitSet getConnections(int i) {
|
||||
if (indexes[i] == null) {
|
||||
return null;
|
||||
}
|
||||
return indexes[i];
|
||||
}
|
||||
}
|
|
@ -97,6 +97,8 @@ public class MapDocumentUtil {
|
|||
Object o = json.read(jsonPath);
|
||||
if (o instanceof String)
|
||||
return (String) o;
|
||||
if (o instanceof Number)
|
||||
return (String) o.toString();
|
||||
if (o instanceof JSONArray && ((JSONArray) o).size() > 0)
|
||||
return (String) ((JSONArray) o).get(0);
|
||||
return "";
|
||||
|
|
|
@ -40,7 +40,7 @@ public class PaceResolver implements Serializable {
|
|||
Collectors.toMap(cl -> cl.getAnnotation(ComparatorClass.class).value(), cl -> (Class<Comparator>) cl));
|
||||
}
|
||||
|
||||
public ClusteringFunction getClusteringFunction(String name, Map<String, Integer> params) throws PaceException {
|
||||
public ClusteringFunction getClusteringFunction(String name, Map<String, Object> params) throws PaceException {
|
||||
try {
|
||||
return clusteringFunctions.get(name).getDeclaredConstructor(Map.class).newInstance(params);
|
||||
} catch (InstantiationException | IllegalAccessException | InvocationTargetException
|
||||
|
|
|
@ -15,7 +15,7 @@ import eu.dnetlib.pace.config.DedupConfig;
|
|||
|
||||
public class ClusteringFunctionTest extends AbstractPaceTest {
|
||||
|
||||
private static Map<String, Integer> params;
|
||||
private static Map<String, Object> params;
|
||||
private static DedupConfig conf;
|
||||
|
||||
@BeforeAll
|
||||
|
@ -40,10 +40,10 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
|
|||
|
||||
@Test
|
||||
public void testNgram() {
|
||||
params.put("ngramLen", 3);
|
||||
params.put("max", 8);
|
||||
params.put("maxPerToken", 2);
|
||||
params.put("minNgramLen", 1);
|
||||
params.put("ngramLen", "3");
|
||||
params.put("max", "8");
|
||||
params.put("maxPerToken", "2");
|
||||
params.put("minNgramLen", "1");
|
||||
|
||||
final ClusteringFunction ngram = new Ngrams(params);
|
||||
|
||||
|
@ -54,8 +54,8 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
|
|||
|
||||
@Test
|
||||
public void testNgramPairs() {
|
||||
params.put("ngramLen", 3);
|
||||
params.put("max", 2);
|
||||
params.put("ngramLen", "3");
|
||||
params.put("max", "2");
|
||||
|
||||
final ClusteringFunction np = new NgramPairs(params);
|
||||
|
||||
|
@ -66,8 +66,8 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
|
|||
|
||||
@Test
|
||||
public void testSortedNgramPairs() {
|
||||
params.put("ngramLen", 3);
|
||||
params.put("max", 2);
|
||||
params.put("ngramLen", "3");
|
||||
params.put("max", "2");
|
||||
|
||||
final ClusteringFunction np = new SortedNgramPairs(params);
|
||||
|
||||
|
@ -87,9 +87,9 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
|
|||
|
||||
@Test
|
||||
public void testAcronym() {
|
||||
params.put("max", 4);
|
||||
params.put("minLen", 1);
|
||||
params.put("maxLen", 3);
|
||||
params.put("max", "4");
|
||||
params.put("minLen", "1");
|
||||
params.put("maxLen", "3");
|
||||
|
||||
final ClusteringFunction acro = new Acronyms(params);
|
||||
|
||||
|
@ -100,8 +100,8 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
|
|||
|
||||
@Test
|
||||
public void testSuffixPrefix() {
|
||||
params.put("len", 3);
|
||||
params.put("max", 4);
|
||||
params.put("len", "3");
|
||||
params.put("max", "4");
|
||||
|
||||
final ClusteringFunction sp = new SuffixPrefix(params);
|
||||
|
||||
|
@ -109,8 +109,8 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
|
|||
System.out.println(s);
|
||||
System.out.println(sp.apply(conf, Lists.newArrayList(s)));
|
||||
|
||||
params.put("len", 3);
|
||||
params.put("max", 1);
|
||||
params.put("len", "3");
|
||||
params.put("max", "1");
|
||||
|
||||
System.out.println(sp.apply(conf, Lists.newArrayList("Framework for general-purpose deduplication")));
|
||||
}
|
||||
|
@ -118,8 +118,8 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
|
|||
@Test
|
||||
public void testWordsSuffixPrefix() {
|
||||
|
||||
params.put("len", 3);
|
||||
params.put("max", 4);
|
||||
params.put("len", "3");
|
||||
params.put("max", "4");
|
||||
|
||||
final ClusteringFunction sp = new WordsSuffixPrefix(params);
|
||||
|
||||
|
@ -130,7 +130,7 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
|
|||
|
||||
@Test
|
||||
public void testWordsStatsSuffixPrefix() {
|
||||
params.put("mod", 10);
|
||||
params.put("mod", "10");
|
||||
|
||||
final ClusteringFunction sp = new WordsStatsSuffixPrefixChain(params);
|
||||
|
||||
|
@ -167,7 +167,7 @@ public class ClusteringFunctionTest extends AbstractPaceTest {
|
|||
@Test
|
||||
public void testFieldValue() {
|
||||
|
||||
params.put("randomLength", 5);
|
||||
params.put("randomLength", "5");
|
||||
|
||||
final ClusteringFunction sp = new SpaceTrimmingFieldValue(params);
|
||||
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
|
||||
package eu.dnetlib.pace.util;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class IncrementalConnectedComponentsTest {
|
||||
|
||||
@Test
|
||||
public void transitiveClosureTest() {
|
||||
IncrementalConnectedComponents icc = new IncrementalConnectedComponents(10);
|
||||
|
||||
icc.connect(0, 1);
|
||||
icc.connect(0, 2);
|
||||
icc.connect(0, 3);
|
||||
|
||||
icc.connect(1, 2);
|
||||
icc.connect(1, 4);
|
||||
icc.connect(1, 5);
|
||||
|
||||
icc.connect(6, 7);
|
||||
icc.connect(6, 9);
|
||||
|
||||
assertEquals(icc.getConnections(0).toString(), "{0, 1, 2, 3, 4, 5}");
|
||||
assertEquals(icc.getConnections(1).toString(), "{0, 1, 2, 3, 4, 5}");
|
||||
assertEquals(icc.getConnections(2).toString(), "{0, 1, 2, 3, 4, 5}");
|
||||
assertEquals(icc.getConnections(3).toString(), "{0, 1, 2, 3, 4, 5}");
|
||||
assertEquals(icc.getConnections(4).toString(), "{0, 1, 2, 3, 4, 5}");
|
||||
assertEquals(icc.getConnections(5).toString(), "{0, 1, 2, 3, 4, 5}");
|
||||
|
||||
assertEquals(icc.getConnections(6).toString(), "{6, 7, 9}");
|
||||
assertEquals(icc.getConnections(7).toString(), "{6, 7, 9}");
|
||||
assertEquals(icc.getConnections(9).toString(), "{6, 7, 9}");
|
||||
|
||||
assertNull(icc.getConnections(8));
|
||||
}
|
||||
|
||||
}
|
|
@ -101,6 +101,10 @@ abstract class AbstractSparkAction implements Serializable {
|
|||
return SparkSession.builder().config(conf).getOrCreate();
|
||||
}
|
||||
|
||||
protected static SparkSession getSparkWithHiveSession(SparkConf conf) {
|
||||
return SparkSession.builder().enableHiveSupport().config(conf).getOrCreate();
|
||||
}
|
||||
|
||||
protected static <T> void save(Dataset<T> dataset, String outPath, SaveMode mode) {
|
||||
dataset.write().option("compression", "gzip").mode(mode).json(outPath);
|
||||
}
|
||||
|
|
|
@ -2,20 +2,19 @@
|
|||
package eu.dnetlib.dhp.oa.dedup;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.beanutils.BeanUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||
import org.apache.spark.api.java.function.ReduceFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
|
||||
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
|
||||
|
@ -41,88 +40,91 @@ public class DedupRecordFactory {
|
|||
long ts = System.currentTimeMillis();
|
||||
|
||||
// <id, json_entity>
|
||||
Dataset<Tuple2<String, T>> entities = spark
|
||||
Dataset<Row> entities = spark
|
||||
.read()
|
||||
.textFile(entitiesInputPath)
|
||||
.schema(Encoders.bean(clazz).schema())
|
||||
.json(entitiesInputPath)
|
||||
.as(Encoders.bean(clazz))
|
||||
.map(
|
||||
(MapFunction<String, Tuple2<String, T>>) it -> {
|
||||
T entity = OBJECT_MAPPER.readValue(it, clazz);
|
||||
(MapFunction<T, Tuple2<String, T>>) entity -> {
|
||||
return new Tuple2<>(entity.getId(), entity);
|
||||
},
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)))
|
||||
.selectExpr("_1 AS id", "_2 AS kryoObject");
|
||||
|
||||
// <source, target>: source is the dedup_id, target is the id of the mergedIn
|
||||
Dataset<Tuple2<String, String>> mergeRels = spark
|
||||
Dataset<Row> mergeRels = spark
|
||||
.read()
|
||||
.load(mergeRelsInputPath)
|
||||
.as(Encoders.bean(Relation.class))
|
||||
.where("relClass == 'merges'")
|
||||
.map(
|
||||
(MapFunction<Relation, Tuple2<String, String>>) r -> new Tuple2<>(r.getSource(), r.getTarget()),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
|
||||
.selectExpr("source as dedupId", "target as id");
|
||||
|
||||
return mergeRels
|
||||
.joinWith(entities, mergeRels.col("_2").equalTo(entities.col("_1")), "inner")
|
||||
.join(entities, "id")
|
||||
.select("dedupId", "kryoObject")
|
||||
.as(Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)))
|
||||
.groupByKey((MapFunction<Tuple2<String, T>, String>) Tuple2::_1, Encoders.STRING())
|
||||
.reduceGroups(
|
||||
(ReduceFunction<Tuple2<String, T>>) (t1, t2) -> new Tuple2<>(t1._1(),
|
||||
reduceEntity(t1._1(), t1._2(), t2._2(), clazz)))
|
||||
.map(
|
||||
(MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, T>>, Tuple2<String, T>>) value -> new Tuple2<>(
|
||||
value._1()._1(), value._2()._2()),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)))
|
||||
.groupByKey(
|
||||
(MapFunction<Tuple2<String, T>, String>) Tuple2::_1, Encoders.STRING())
|
||||
.mapGroups(
|
||||
(MapGroupsFunction<String, Tuple2<String, T>, T>) (key,
|
||||
values) -> entityMerger(key, values, ts, dataInfo, clazz),
|
||||
(MapFunction<Tuple2<String, Tuple2<String, T>>, T>) t -> {
|
||||
T res = t._2()._2();
|
||||
res.setDataInfo(dataInfo);
|
||||
res.setLastupdatetimestamp(ts);
|
||||
return res;
|
||||
},
|
||||
Encoders.bean(clazz));
|
||||
}
|
||||
|
||||
public static <T extends OafEntity> T reduceEntity(
|
||||
String id, T entity, T duplicate, Class<T> clazz) {
|
||||
|
||||
int compare = new IdentifierComparator()
|
||||
.compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate));
|
||||
|
||||
if (compare > 0) {
|
||||
T swap = duplicate;
|
||||
duplicate = entity;
|
||||
entity = swap;
|
||||
}
|
||||
|
||||
entity.mergeFrom(duplicate);
|
||||
entity.setId(id);
|
||||
|
||||
if (ModelSupport.isSubClass(duplicate, Result.class)) {
|
||||
Result re = (Result) entity;
|
||||
Result rd = (Result) duplicate;
|
||||
|
||||
List<List<Author>> authors = new ArrayList<>();
|
||||
if (re.getAuthor() != null) {
|
||||
authors.add(re.getAuthor());
|
||||
}
|
||||
if (rd.getAuthor() != null) {
|
||||
authors.add(rd.getAuthor());
|
||||
}
|
||||
|
||||
re.setAuthor(AuthorMerger.merge(authors));
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
public static <T extends OafEntity> T entityMerger(
|
||||
String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz)
|
||||
throws IllegalAccessException, InstantiationException, InvocationTargetException {
|
||||
T base = entities.next()._2();
|
||||
|
||||
final Comparator<Identifier<T>> idComparator = new IdentifierComparator<>();
|
||||
|
||||
final LinkedList<T> entityList = Lists
|
||||
.newArrayList(entities)
|
||||
.stream()
|
||||
.map(t -> Identifier.newInstance(t._2()))
|
||||
.sorted(idComparator)
|
||||
.map(Identifier::getEntity)
|
||||
.collect(Collectors.toCollection(LinkedList::new));
|
||||
|
||||
final T entity = clazz.newInstance();
|
||||
final T first = entityList.removeFirst();
|
||||
|
||||
BeanUtils.copyProperties(entity, first);
|
||||
|
||||
final List<List<Author>> authors = Lists.newArrayList();
|
||||
|
||||
entityList
|
||||
.forEach(
|
||||
duplicate -> {
|
||||
entity.mergeFrom(duplicate);
|
||||
if (ModelSupport.isSubClass(duplicate, Result.class)) {
|
||||
Result r1 = (Result) duplicate;
|
||||
Optional
|
||||
.ofNullable(r1.getAuthor())
|
||||
.ifPresent(a -> authors.add(a));
|
||||
}
|
||||
});
|
||||
|
||||
// set authors and date
|
||||
if (ModelSupport.isSubClass(entity, Result.class)) {
|
||||
Optional
|
||||
.ofNullable(((Result) entity).getAuthor())
|
||||
.ifPresent(a -> authors.add(a));
|
||||
|
||||
((Result) entity).setAuthor(AuthorMerger.merge(authors));
|
||||
while (entities.hasNext()) {
|
||||
T duplicate = entities.next()._2();
|
||||
if (duplicate != null)
|
||||
base = reduceEntity(id, base, duplicate, clazz);
|
||||
}
|
||||
|
||||
entity.setId(id);
|
||||
base.setDataInfo(dataInfo);
|
||||
base.setLastupdatetimestamp(ts);
|
||||
|
||||
entity.setLastupdatetimestamp(ts);
|
||||
entity.setDataInfo(dataInfo);
|
||||
|
||||
return entity;
|
||||
return base;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.dedup;
|
||||
|
||||
import static eu.dnetlib.dhp.utils.DHPUtils.md5;
|
||||
import static org.apache.commons.lang3.StringUtils.substringAfter;
|
||||
import static org.apache.commons.lang3.StringUtils.substringBefore;
|
||||
|
||||
|
@ -14,33 +15,36 @@ import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
|||
public class IdGenerator implements Serializable {
|
||||
|
||||
// pick the best pid from the list (consider date and pidtype)
|
||||
public static <T extends OafEntity> String generate(List<Identifier<T>> pids, String defaultID) {
|
||||
public static <T extends OafEntity> String generate(List<? extends Identifier> pids, String defaultID) {
|
||||
if (pids == null || pids.isEmpty())
|
||||
return defaultID;
|
||||
|
||||
return generateId(pids);
|
||||
}
|
||||
|
||||
private static <T extends OafEntity> String generateId(List<Identifier<T>> pids) {
|
||||
Identifier<T> bp = pids
|
||||
private static String generateId(List<? extends Identifier> pids) {
|
||||
Identifier bp = pids
|
||||
.stream()
|
||||
.min(Identifier::compareTo)
|
||||
.orElseThrow(() -> new IllegalStateException("unable to generate id"));
|
||||
|
||||
String prefix = substringBefore(bp.getOriginalID(), "|");
|
||||
String ns = substringBefore(substringAfter(bp.getOriginalID(), "|"), "::");
|
||||
String suffix = substringAfter(bp.getOriginalID(), "::");
|
||||
return generate(bp.getOriginalID());
|
||||
}
|
||||
|
||||
public static String generate(String originalId) {
|
||||
String prefix = substringBefore(originalId, "|");
|
||||
String ns = substringBefore(substringAfter(originalId, "|"), "::");
|
||||
String suffix = substringAfter(originalId, "::");
|
||||
|
||||
final String pidType = substringBefore(ns, "_");
|
||||
if (PidType.isValid(pidType)) {
|
||||
return prefix + "|" + dedupify(ns) + "::" + suffix;
|
||||
} else {
|
||||
return prefix + "|dedup_wf_001::" + suffix;
|
||||
return prefix + "|dedup_wf_001::" + md5(originalId); // hash the whole originalId to avoid collisions
|
||||
}
|
||||
}
|
||||
|
||||
private static String dedupify(String ns) {
|
||||
|
||||
StringBuilder prefix;
|
||||
if (PidType.valueOf(substringBefore(ns, "_")) == PidType.openorgs) {
|
||||
prefix = new StringBuilder(substringBefore(ns, "_"));
|
||||
|
@ -53,5 +57,4 @@ public class IdGenerator implements Serializable {
|
|||
}
|
||||
return prefix.substring(0, 12);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,51 +1,47 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.dedup;
|
||||
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||
import org.apache.spark.graphx.Edge;
|
||||
import org.apache.spark.rdd.RDD;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.dom4j.DocumentException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.xml.sax.SAXException;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.hash.Hashing;
|
||||
|
||||
import com.kwartile.lib.cc.ConnectedComponent;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent;
|
||||
import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor;
|
||||
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
|
||||
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import scala.Tuple2;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
|
||||
import org.apache.spark.sql.expressions.UserDefinedFunction;
|
||||
import org.apache.spark.sql.expressions.Window;
|
||||
import org.apache.spark.sql.expressions.WindowSpec;
|
||||
import org.apache.spark.sql.types.DataTypes;
|
||||
import org.apache.spark.sql.types.StructType;
|
||||
import org.dom4j.DocumentException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.xml.sax.SAXException;
|
||||
import scala.Tuple3;
|
||||
import scala.collection.JavaConversions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDate;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
|
||||
import static org.apache.spark.sql.functions.*;
|
||||
|
||||
public class SparkCreateMergeRels extends AbstractSparkAction {
|
||||
|
||||
|
@ -68,10 +64,12 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
|
|||
log.info("isLookupUrl {}", isLookUpUrl);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"));
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||
|
||||
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
||||
|
||||
new SparkCreateMergeRels(parser, getSparkSession(conf))
|
||||
new SparkCreateMergeRels(parser, getSparkWithHiveSession(conf))
|
||||
.run(ISLookupClientFactory.getLookUpService(isLookUpUrl));
|
||||
}
|
||||
|
||||
|
@ -87,14 +85,15 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
|
|||
.ofNullable(parser.get("cutConnectedComponent"))
|
||||
.map(Integer::valueOf)
|
||||
.orElse(0);
|
||||
|
||||
final String pivotHistoryDatabase = parser.get("pivotHistoryDatabase");
|
||||
|
||||
log.info("connected component cut: '{}'", cut);
|
||||
log.info("graphBasePath: '{}'", graphBasePath);
|
||||
log.info("isLookUpUrl: '{}'", isLookUpUrl);
|
||||
log.info("actionSetId: '{}'", actionSetId);
|
||||
log.info("workingPath: '{}'", workingPath);
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
|
||||
final String subEntity = dedupConf.getWf().getSubEntityValue();
|
||||
final Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
|
||||
|
@ -106,113 +105,170 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
|
|||
|
||||
final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
|
||||
|
||||
// <hash(id), id>
|
||||
JavaPairRDD<Object, String> vertexes = createVertexes(sc, graphBasePath, subEntity, dedupConf);
|
||||
|
||||
final RDD<Edge<String>> edgeRdd = spark
|
||||
final Dataset<Row> simRels = spark
|
||||
.read()
|
||||
.load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
|
||||
.as(Encoders.bean(Relation.class))
|
||||
.javaRDD()
|
||||
.map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
|
||||
.rdd();
|
||||
.select("source", "target");
|
||||
|
||||
Dataset<Tuple2<String, String>> rawMergeRels = spark
|
||||
.createDataset(
|
||||
GraphProcessor
|
||||
.findCCs(vertexes.rdd(), edgeRdd, maxIterations, cut)
|
||||
.toJavaRDD()
|
||||
.filter(k -> k.getIds().size() > 1)
|
||||
.flatMap(this::ccToRels)
|
||||
.rdd(),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
|
||||
UserDefinedFunction hashUDF = functions
|
||||
.udf(
|
||||
(String s) -> hash(s), DataTypes.LongType);
|
||||
|
||||
Dataset<Tuple2<String, OafEntity>> entities = spark
|
||||
// <hash(id), id>
|
||||
Dataset<Row> vertexIdMap = simRels
|
||||
.selectExpr("source as id")
|
||||
.union(simRels.selectExpr("target as id"))
|
||||
.distinct()
|
||||
.withColumn("vertexId", hashUDF.apply(functions.col("id")));
|
||||
|
||||
final Dataset<Row> edges = spark
|
||||
.read()
|
||||
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
|
||||
.map(
|
||||
(MapFunction<String, Tuple2<String, OafEntity>>) it -> {
|
||||
OafEntity entity = OBJECT_MAPPER.readValue(it, clazz);
|
||||
return new Tuple2<>(entity.getId(), entity);
|
||||
},
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
|
||||
.load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
|
||||
.select("source", "target")
|
||||
.withColumn("source", hashUDF.apply(functions.col("source")))
|
||||
.withColumn("target", hashUDF.apply(functions.col("target")));
|
||||
|
||||
Dataset<Relation> mergeRels = rawMergeRels
|
||||
.joinWith(entities, rawMergeRels.col("_2").equalTo(entities.col("_1")), "inner")
|
||||
// <tmp_source,target>,<target,entity>
|
||||
.map(
|
||||
(MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, OafEntity>>, Tuple2<String, OafEntity>>) value -> new Tuple2<>(
|
||||
value._1()._1(), value._2()._2()),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)))
|
||||
// <tmp_source,entity>
|
||||
.groupByKey(
|
||||
(MapFunction<Tuple2<String, OafEntity>, String>) Tuple2::_1, Encoders.STRING())
|
||||
.mapGroups(
|
||||
(MapGroupsFunction<String, Tuple2<String, OafEntity>, ConnectedComponent>) this::generateID,
|
||||
Encoders.bean(ConnectedComponent.class))
|
||||
// <root_id, list(target)>
|
||||
Dataset<Row> cliques = ConnectedComponent
|
||||
.runOnPairs(edges, 50, spark);
|
||||
|
||||
Dataset<Row> rawMergeRels = cliques
|
||||
.join(vertexIdMap, JavaConversions.asScalaBuffer(Collections.singletonList("vertexId")), "inner")
|
||||
.drop("vertexId")
|
||||
.distinct();
|
||||
|
||||
Dataset<Row> pivotHistory = spark
|
||||
.createDataset(
|
||||
Collections.emptyList(),
|
||||
RowEncoder
|
||||
.apply(StructType.fromDDL("id STRING, firstUsage STRING, lastUsage STRING, dedupId STRING")));
|
||||
|
||||
if (StringUtils.isNotBlank(pivotHistoryDatabase)) {
|
||||
pivotHistory = spark
|
||||
.read()
|
||||
.table(pivotHistoryDatabase + "." + subEntity)
|
||||
.selectExpr("id", "lastUsage", "dedupId");
|
||||
}
|
||||
|
||||
String collectedfromExpr = "false AS collectedfrom";
|
||||
String dateExpr = "'' AS date";
|
||||
|
||||
if (Result.class.isAssignableFrom(clazz)) {
|
||||
if (Publication.class.isAssignableFrom(clazz)) {
|
||||
collectedfromExpr = "array_contains(collectedfrom.key, '" + ModelConstants.CROSSREF_ID
|
||||
+ "') AS collectedfrom";
|
||||
} else if (eu.dnetlib.dhp.schema.oaf.Dataset.class.isAssignableFrom(clazz)) {
|
||||
collectedfromExpr = "array_contains(collectedfrom.key, '" + ModelConstants.DATACITE_ID
|
||||
+ "') AS collectedfrom";
|
||||
}
|
||||
|
||||
dateExpr = "dateofacceptance.value AS date";
|
||||
}
|
||||
|
||||
UserDefinedFunction mapPid = udf(
|
||||
(String s) -> Math.min(PidType.tryValueOf(s).ordinal(), PidType.w3id.ordinal()), DataTypes.IntegerType);
|
||||
UserDefinedFunction validDate = udf((String date) -> {
|
||||
if (StringUtils.isNotBlank(date)
|
||||
&& date.matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(date)) {
|
||||
return date;
|
||||
}
|
||||
return LocalDate.now().plusWeeks(1).toString();
|
||||
}, DataTypes.StringType);
|
||||
|
||||
Dataset<Row> pivotingData = spark
|
||||
.read()
|
||||
.schema(Encoders.bean(clazz).schema())
|
||||
.json(DedupUtility.createEntityPath(graphBasePath, subEntity))
|
||||
.selectExpr(
|
||||
"id",
|
||||
"regexp_extract(id, '^\\\\d+\\\\|([^_]+).*::', 1) AS pidType",
|
||||
collectedfromExpr,
|
||||
dateExpr)
|
||||
.withColumn("pidType", mapPid.apply(col("pidType"))) // ordinal of pid type
|
||||
.withColumn("date", validDate.apply(col("date")));
|
||||
|
||||
UserDefinedFunction generateDedupId = udf((String s) -> IdGenerator.generate(s), DataTypes.StringType);
|
||||
|
||||
// ordering to selected pivot id
|
||||
WindowSpec w = Window
|
||||
.partitionBy("groupId")
|
||||
.orderBy(
|
||||
col("lastUsage").desc_nulls_last(),
|
||||
col("pidType").asc_nulls_last(),
|
||||
col("collectedfrom").desc_nulls_last(),
|
||||
col("date").asc_nulls_last(),
|
||||
col("id").asc_nulls_last());
|
||||
|
||||
Dataset<Relation> output = rawMergeRels
|
||||
.join(pivotHistory, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "full")
|
||||
.join(pivotingData, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
|
||||
.withColumn("pivot", functions.first("id").over(w))
|
||||
.withColumn("pivotDedupId", functions.first("dedupId").over(w))
|
||||
.withColumn("position", functions.row_number().over(w))
|
||||
.filter(cut > 0 ? col("position").lt(lit(cut)) : lit(true))
|
||||
// .select("id", "groupId", "collectedfrom", "pivot", "dedupId", "pivotDedupId")
|
||||
// .distinct()
|
||||
.flatMap(
|
||||
(FlatMapFunction<ConnectedComponent, Relation>) cc -> ccToMergeRel(cc, dedupConf),
|
||||
Encoders.bean(Relation.class));
|
||||
(FlatMapFunction<Row, Tuple3<String, String, String>>) (Row r) -> {
|
||||
String id = r.getAs("id");
|
||||
String pivot = r.getAs("pivot");
|
||||
String pivotDedupId = r.getAs("pivotDedupId"); // dedupId associated with the pivot
|
||||
String dedupId = r.getAs("dedupId"); // dedupId associated with this id if it was a pivot
|
||||
|
||||
saveParquet(mergeRels, mergeRelPath, SaveMode.Overwrite);
|
||||
// filter out id == pivotDedupId
|
||||
// those are caused by claim expressed on pivotDedupId
|
||||
// information will be merged after creating deduprecord
|
||||
if (id.equals(pivotDedupId)) {
|
||||
return Collections.emptyIterator();
|
||||
}
|
||||
|
||||
ArrayList<Tuple3<String, String, String>> res = new ArrayList<>();
|
||||
|
||||
// singleton pivots have null groupId as they do not match rawMergeRels
|
||||
if (r.isNullAt(r.fieldIndex("groupId"))) {
|
||||
// the record is existing if it matches pivotingData
|
||||
if (!r.isNullAt(r.fieldIndex("collectedfrom"))) {
|
||||
// create relation with old dedup id
|
||||
res.add(new Tuple3<>(id, dedupId, null));
|
||||
}
|
||||
return res.iterator();
|
||||
}
|
||||
|
||||
// new pivot, assign pivotDedupId with current IdGenerator
|
||||
if (StringUtils.isBlank(pivotDedupId)) {
|
||||
pivotDedupId = IdGenerator.generate(pivot);
|
||||
}
|
||||
|
||||
// this was a pivot in a preceding graph but it has been merged into a new group with different
|
||||
// pivot
|
||||
if (StringUtils.isNotBlank(dedupId) && !pivot.equals(id) && !dedupId.equals(pivotDedupId)) {
|
||||
// materialize the previous dedup record as a merge relation with the new one
|
||||
res.add(new Tuple3<>(dedupId, pivotDedupId, null));
|
||||
}
|
||||
|
||||
// add merge relations
|
||||
res.add(new Tuple3<>(id, pivotDedupId, pivot));
|
||||
|
||||
return res.iterator();
|
||||
}, Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()))
|
||||
.distinct()
|
||||
.flatMap(
|
||||
(FlatMapFunction<Tuple3<String, String, String>, Relation>) (Tuple3<String, String, String> r) -> {
|
||||
String id = r._1();
|
||||
String dedupId = r._2();
|
||||
String pivot = r._3();
|
||||
|
||||
ArrayList<Relation> res = new ArrayList<>();
|
||||
res.add(rel(pivot, dedupId, id, ModelConstants.MERGES, dedupConf));
|
||||
res.add(rel(pivot, id, dedupId, ModelConstants.IS_MERGED_IN, dedupConf));
|
||||
|
||||
return res.iterator();
|
||||
}, Encoders.bean(Relation.class));
|
||||
|
||||
saveParquet(output, mergeRelPath, SaveMode.Overwrite);
|
||||
}
|
||||
}
|
||||
|
||||
private <T extends OafEntity> ConnectedComponent generateID(String key, Iterator<Tuple2<String, T>> values) {
|
||||
|
||||
List<Identifier<T>> identifiers = Lists
|
||||
.newArrayList(values)
|
||||
.stream()
|
||||
.map(v -> Identifier.newInstance(v._2()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
String rootID = IdGenerator.generate(identifiers, key);
|
||||
|
||||
if (Objects.equals(rootID, key))
|
||||
throw new IllegalStateException("generated default ID: " + rootID);
|
||||
|
||||
return new ConnectedComponent(rootID,
|
||||
identifiers.stream().map(i -> i.getEntity().getId()).collect(Collectors.toSet()));
|
||||
}
|
||||
|
||||
private JavaPairRDD<Object, String> createVertexes(JavaSparkContext sc, String graphBasePath, String subEntity,
|
||||
DedupConfig dedupConf) {
|
||||
|
||||
return sc
|
||||
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
|
||||
.mapToPair(json -> {
|
||||
String id = MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), json);
|
||||
return new Tuple2<>(hash(id), id);
|
||||
});
|
||||
}
|
||||
|
||||
private Iterator<Tuple2<String, String>> ccToRels(ConnectedComponent cc) {
|
||||
return cc
|
||||
.getIds()
|
||||
.stream()
|
||||
.map(id -> new Tuple2<>(cc.getCcId(), id))
|
||||
.iterator();
|
||||
}
|
||||
|
||||
private Iterator<Relation> ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf) {
|
||||
return cc
|
||||
.getIds()
|
||||
.stream()
|
||||
.flatMap(
|
||||
id -> {
|
||||
List<Relation> tmp = new ArrayList<>();
|
||||
|
||||
tmp.add(rel(cc.getCcId(), id, ModelConstants.MERGES, dedupConf));
|
||||
tmp.add(rel(id, cc.getCcId(), ModelConstants.IS_MERGED_IN, dedupConf));
|
||||
|
||||
return tmp.stream();
|
||||
})
|
||||
.iterator();
|
||||
}
|
||||
|
||||
private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) {
|
||||
private static Relation rel(String pivot, String source, String target, String relClass, DedupConfig dedupConf) {
|
||||
|
||||
String entityType = dedupConf.getWf().getEntityType();
|
||||
|
||||
|
@ -238,6 +294,14 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
|
|||
// TODO calculate the trust value based on the similarity score of the elements in the CC
|
||||
|
||||
r.setDataInfo(info);
|
||||
|
||||
if (pivot != null) {
|
||||
KeyValue pivotKV = new KeyValue();
|
||||
pivotKV.setKey("pivot");
|
||||
pivotKV.setValue(pivot);
|
||||
|
||||
r.setProperties(Arrays.asList(pivotKV));
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
|
|
|
@ -91,18 +91,12 @@ public class SparkWhitelistSimRels extends AbstractSparkAction {
|
|||
Dataset<Row> entities = spark
|
||||
.read()
|
||||
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
|
||||
.repartition(numPartitions)
|
||||
.withColumn("id", functions.get_json_object(new Column("value"), dedupConf.getWf().getIdPath()));
|
||||
.select(functions.get_json_object(new Column("value"), dedupConf.getWf().getIdPath()).as("id"))
|
||||
.distinct();
|
||||
|
||||
Dataset<Row> whiteListRels1 = whiteListRels
|
||||
.join(entities, entities.col("id").equalTo(whiteListRels.col("from")), "inner")
|
||||
.select("from", "to");
|
||||
|
||||
Dataset<Row> whiteListRels2 = whiteListRels1
|
||||
.join(entities, whiteListRels1.col("to").equalTo(entities.col("id")), "inner")
|
||||
.select("from", "to");
|
||||
|
||||
Dataset<Relation> whiteListSimRels = whiteListRels2
|
||||
Dataset<Relation> whiteListSimRels = whiteListRels
|
||||
.join(entities, entities.col("id").equalTo(whiteListRels.col("from")), "leftsemi")
|
||||
.join(entities, functions.col("to").equalTo(entities.col("id")), "leftsemi")
|
||||
.map(
|
||||
(MapFunction<Row, Relation>) r -> DedupUtility
|
||||
.createSimRel(r.getString(0), r.getString(1), entity),
|
||||
|
|
|
@ -1,100 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.dedup.graph;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.codehaus.jackson.annotate.JsonIgnore;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
import eu.dnetlib.pace.util.PaceException;
|
||||
|
||||
public class ConnectedComponent implements Serializable {
|
||||
|
||||
private String ccId;
|
||||
private Set<String> ids;
|
||||
|
||||
private static final String CONNECTED_COMPONENT_ID_PREFIX = "connect_comp";
|
||||
|
||||
public ConnectedComponent(Set<String> ids, final int cut) {
|
||||
this.ids = ids;
|
||||
|
||||
this.ccId = createDefaultID();
|
||||
|
||||
if (cut > 0 && ids.size() > cut) {
|
||||
this.ids = ids
|
||||
.stream()
|
||||
.filter(id -> !ccId.equalsIgnoreCase(id))
|
||||
.limit(cut - 1)
|
||||
.collect(Collectors.toSet());
|
||||
// this.ids.add(ccId); ??
|
||||
}
|
||||
}
|
||||
|
||||
public ConnectedComponent(String ccId, Set<String> ids) {
|
||||
this.ccId = ccId;
|
||||
this.ids = ids;
|
||||
}
|
||||
|
||||
public String createDefaultID() {
|
||||
if (ids.size() > 1) {
|
||||
final String s = getMin();
|
||||
String prefix = s.split("\\|")[0];
|
||||
ccId = prefix + "|" + CONNECTED_COMPONENT_ID_PREFIX + "::" + DHPUtils.md5(s);
|
||||
return ccId;
|
||||
} else {
|
||||
return ids.iterator().next();
|
||||
}
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public String getMin() {
|
||||
|
||||
final StringBuilder min = new StringBuilder();
|
||||
|
||||
ids
|
||||
.forEach(
|
||||
id -> {
|
||||
if (StringUtils.isBlank(min.toString())) {
|
||||
min.append(id);
|
||||
} else {
|
||||
if (min.toString().compareTo(id) > 0) {
|
||||
min.setLength(0);
|
||||
min.append(id);
|
||||
}
|
||||
}
|
||||
});
|
||||
return min.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
try {
|
||||
return mapper.writeValueAsString(this);
|
||||
} catch (IOException e) {
|
||||
throw new PaceException("Failed to create Json: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
public Set<String> getIds() {
|
||||
return ids;
|
||||
}
|
||||
|
||||
public void setIds(Set<String> ids) {
|
||||
this.ids = ids;
|
||||
}
|
||||
|
||||
public String getCcId() {
|
||||
return ccId;
|
||||
}
|
||||
|
||||
public void setCcId(String ccId) {
|
||||
this.ccId = ccId;
|
||||
}
|
||||
}
|
|
@ -1,37 +0,0 @@
|
|||
package eu.dnetlib.dhp.oa.dedup.graph
|
||||
|
||||
import org.apache.spark.graphx._
|
||||
import org.apache.spark.rdd.RDD
|
||||
|
||||
import scala.collection.JavaConversions;
|
||||
|
||||
object GraphProcessor {
|
||||
|
||||
def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = {
|
||||
val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
|
||||
val cc = graph.connectedComponents(maxIterations).vertices
|
||||
|
||||
val joinResult = vertexes.leftOuterJoin(cc).map {
|
||||
case (id, (openaireId, cc)) => {
|
||||
if (cc.isEmpty) {
|
||||
(id, openaireId)
|
||||
}
|
||||
else {
|
||||
(cc.get, openaireId)
|
||||
}
|
||||
}
|
||||
}
|
||||
val connectedComponents = joinResult.groupByKey()
|
||||
.map[ConnectedComponent](cc => asConnectedComponent(cc, cut))
|
||||
connectedComponents
|
||||
}
|
||||
|
||||
|
||||
|
||||
def asConnectedComponent(group: (VertexId, Iterable[String]), cut:Int): ConnectedComponent = {
|
||||
val docs = group._2.toSet[String]
|
||||
val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), cut);
|
||||
connectedComponent
|
||||
}
|
||||
|
||||
}
|
|
@ -3,21 +3,21 @@ package eu.dnetlib.dhp.oa.dedup.model;
|
|||
|
||||
import java.io.Serializable;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.time.LocalDate;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import eu.dnetlib.dhp.oa.dedup.DatePicker;
|
||||
import eu.dnetlib.dhp.oa.dedup.IdentifierComparator;
|
||||
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.PidComparator;
|
||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
||||
|
||||
public class Identifier<T extends OafEntity> implements Serializable, Comparable<Identifier<T>> {
|
||||
|
@ -50,7 +50,7 @@ public class Identifier<T extends OafEntity> implements Serializable, Comparable
|
|||
if (Objects.nonNull(date)) {
|
||||
return date;
|
||||
} else {
|
||||
String sDate = BASE_DATE;
|
||||
String sDate = LocalDate.now().plusDays(1).toString();
|
||||
if (ModelSupport.isSubClass(getEntity(), Result.class)) {
|
||||
Result result = (Result) getEntity();
|
||||
if (isWellformed(result.getDateofacceptance())) {
|
||||
|
|
|
@ -28,5 +28,17 @@
|
|||
"paramLongName": "workingPath",
|
||||
"paramDescription": "path for the working directory",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName":"h",
|
||||
"paramLongName":"hiveMetastoreUris",
|
||||
"paramDescription": "the hive metastore uris",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "p",
|
||||
"paramLongName": "pivotHistoryDatabase",
|
||||
"paramDescription": "Pivot history database",
|
||||
"paramRequired": false
|
||||
}
|
||||
]
|
|
@ -15,4 +15,8 @@
|
|||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hiveMetastoreUris</name>
|
||||
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -188,6 +188,8 @@
|
|||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
|
||||
<arg>--cutConnectedComponent</arg><arg>${cutConnectedComponent}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
<arg>--pivotHistoryDatabase</arg><arg>${pivotHistoryDatabase}</arg>
|
||||
</spark>
|
||||
<ok to="CreateDedupRecord"/>
|
||||
<error to="Kill"/>
|
||||
|
|
|
@ -0,0 +1,335 @@
|
|||
/** Copyright (c) 2017 Kwartile, Inc., http://www.kwartile.com
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
* of this software and associated documentation files (the "Software"), to deal
|
||||
* in the Software without restriction, including without limitation the rights
|
||||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the Software is
|
||||
* furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in all
|
||||
* copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
* SOFTWARE.
|
||||
*/
|
||||
|
||||
/** Map-reduce implementation of Connected Component
|
||||
* Given lists of subgraphs, returns all the nodes that are connected.
|
||||
*/
|
||||
|
||||
package com.kwartile.lib.cc
|
||||
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.{Dataset, Row, SparkSession}
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.mutable
|
||||
|
||||
object ConnectedComponent extends Serializable {
|
||||
|
||||
/** Applies Small Star operation on RDD of nodePairs
|
||||
*
|
||||
* @param nodePairs on which to apply Small Star operations
|
||||
* @return new nodePairs after the operation and conncectivy change count
|
||||
*/
|
||||
private def smallStar(nodePairs: RDD[(Long, Long)]): (RDD[(Long, Long)], Long) = {
|
||||
|
||||
/** generate RDD of (self, List(neighbors)) where self > neighbors
|
||||
* E.g.: nodePairs (1, 4), (6, 1), (3, 2), (6, 5)
|
||||
* will result into (4, List(1)), (6, List(1)), (3, List(2)), (6, List(5))
|
||||
*/
|
||||
val neighbors = nodePairs.map(x => {
|
||||
val (self, neighbor) = (x._1, x._2)
|
||||
if (self > neighbor)
|
||||
(self, neighbor)
|
||||
else
|
||||
(neighbor, self)
|
||||
})
|
||||
|
||||
/** reduce on self to get list of all its neighbors.
|
||||
* E.g: (4, List(1)), (6, List(1)), (3, List(2)), (6, List(5))
|
||||
* will result into (4, List(1)), (6, List(1, 5)), (3, List(2))
|
||||
* Note:
|
||||
* (1) you may need to tweak number of partitions.
|
||||
* (2) also, watch out for data skew. In that case, consider using rangePartitioner
|
||||
*/
|
||||
val empty = mutable.HashSet[Long]()
|
||||
val allNeighbors = neighbors.aggregateByKey(empty)(
|
||||
(lb, v) => lb += v,
|
||||
(lb1, lb2) => lb1 ++ lb2
|
||||
)
|
||||
|
||||
/** Apply Small Star operation on (self, List(neighbor)) to get newNodePairs and count the change in connectivity
|
||||
*/
|
||||
|
||||
val newNodePairsWithChangeCount = allNeighbors
|
||||
.map(x => {
|
||||
val self = x._1
|
||||
val neighbors = x._2.toList
|
||||
val minNode = argMin(self :: neighbors)
|
||||
val newNodePairs = (self :: neighbors)
|
||||
.map(neighbor => {
|
||||
(neighbor, minNode)
|
||||
})
|
||||
.filter(x => {
|
||||
val neighbor = x._1
|
||||
val minNode = x._2
|
||||
(neighbor <= self && neighbor != minNode) || (self == neighbor)
|
||||
})
|
||||
val uniqueNewNodePairs = newNodePairs.toSet.toList
|
||||
|
||||
/** We count the change by taking a diff of the new node pairs with the old node pairs
|
||||
*/
|
||||
val connectivityChangeCount = (uniqueNewNodePairs diff neighbors.map((self, _))).length
|
||||
(uniqueNewNodePairs, connectivityChangeCount)
|
||||
})
|
||||
.persist(StorageLevel.MEMORY_AND_DISK_SER)
|
||||
|
||||
/** Sum all the changeCounts
|
||||
*/
|
||||
val totalConnectivityCountChange = newNodePairsWithChangeCount
|
||||
.mapPartitions(iter => {
|
||||
val (v, l) = iter.toSeq.unzip
|
||||
val sum = l.sum
|
||||
Iterator(sum)
|
||||
})
|
||||
.sum
|
||||
.toLong
|
||||
|
||||
val newNodePairs = newNodePairsWithChangeCount.map(x => x._1).flatMap(x => x)
|
||||
newNodePairsWithChangeCount.unpersist(false)
|
||||
(newNodePairs, totalConnectivityCountChange)
|
||||
}
|
||||
|
||||
/** Apply Large Star operation on a RDD of nodePairs
|
||||
*
|
||||
* @param nodePairs on which to apply Large Star operations
|
||||
* @return new nodePairs after the operation and conncectivy change count
|
||||
*/
|
||||
private def largeStar(nodePairs: RDD[(Long, Long)]): (RDD[(Long, Long)], Long) = {
|
||||
|
||||
/** generate RDD of (self, List(neighbors))
|
||||
* E.g.: nodePairs (1, 4), (6, 1), (3, 2), (6, 5)
|
||||
* will result into (4, List(1)), (1, List(4)), (6, List(1)), (1, List(6)), (3, List(2)), (2, List(3)), (6, List(5)), (5, List(6))
|
||||
*/
|
||||
|
||||
val neighbors = nodePairs.flatMap(x => {
|
||||
val (self, neighbor) = (x._1, x._2)
|
||||
if (self == neighbor)
|
||||
List((self, neighbor))
|
||||
else
|
||||
List((self, neighbor), (neighbor, self))
|
||||
})
|
||||
|
||||
/** reduce on self to get list of all its neighbors.
|
||||
* E.g: (4, List(1)), (1, List(4)), (6, List(1)), (1, List(6)), (3, List(2)), (2, List(3)), (6, List(5)), (5, List(6))
|
||||
* will result into (4, List(1)), (1, List(4, 6)), (6, List(1, 5)), (3, List(2)), (2, List(3)), (5, List(6))
|
||||
* Note:
|
||||
* (1) you may need to tweak number of partitions.
|
||||
* (2) also, watch out for data skew. In that case, consider using rangePartitioner
|
||||
*/
|
||||
|
||||
val localAdd = (s: mutable.HashSet[Long], v: Long) => s += v
|
||||
val partitionAdd = (s1: mutable.HashSet[Long], s2: mutable.HashSet[Long]) => s1 ++= s2
|
||||
val allNeighbors =
|
||||
neighbors.aggregateByKey(mutable.HashSet.empty[Long] /*, rangePartitioner*/ )(localAdd, partitionAdd)
|
||||
|
||||
/** Apply Large Star operation on (self, List(neighbor)) to get newNodePairs and count the change in connectivity
|
||||
*/
|
||||
|
||||
val newNodePairsWithChangeCount = allNeighbors
|
||||
.map(x => {
|
||||
val self = x._1
|
||||
val neighbors = x._2.toList
|
||||
val minNode = argMin(self :: neighbors)
|
||||
val newNodePairs = (self :: neighbors)
|
||||
.map(neighbor => {
|
||||
(neighbor, minNode)
|
||||
})
|
||||
.filter(x => {
|
||||
val neighbor = x._1
|
||||
val minNode = x._2
|
||||
neighbor > self || neighbor == minNode
|
||||
})
|
||||
|
||||
val uniqueNewNodePairs = newNodePairs.toSet.toList
|
||||
val connectivityChangeCount = (uniqueNewNodePairs diff neighbors.map((self, _))).length
|
||||
(uniqueNewNodePairs, connectivityChangeCount)
|
||||
})
|
||||
.persist(StorageLevel.MEMORY_AND_DISK_SER)
|
||||
|
||||
val totalConnectivityCountChange = newNodePairsWithChangeCount
|
||||
.mapPartitions(iter => {
|
||||
val (v, l) = iter.toSeq.unzip
|
||||
val sum = l.sum
|
||||
Iterator(sum)
|
||||
})
|
||||
.sum
|
||||
.toLong
|
||||
|
||||
/** Sum all the changeCounts
|
||||
*/
|
||||
val newNodePairs = newNodePairsWithChangeCount.map(x => x._1).flatMap(x => x)
|
||||
newNodePairsWithChangeCount.unpersist(false)
|
||||
(newNodePairs, totalConnectivityCountChange)
|
||||
}
|
||||
|
||||
private def argMin(nodes: List[Long]): Long = {
|
||||
nodes.min(Ordering.by((node: Long) => node))
|
||||
}
|
||||
|
||||
/** Build nodePairs given a list of nodes. A list of nodes represents a subgraph.
|
||||
*
|
||||
* @param nodes that are part of a subgraph
|
||||
* @return nodePairs for a subgraph
|
||||
*/
|
||||
private def buildPairs(nodes: List[Long]): List[(Long, Long)] = {
|
||||
buildPairs(nodes.head, nodes.tail, null.asInstanceOf[List[(Long, Long)]])
|
||||
}
|
||||
|
||||
@tailrec
|
||||
private def buildPairs(node: Long, neighbors: List[Long], partialPairs: List[(Long, Long)]): List[(Long, Long)] = {
|
||||
if (neighbors.isEmpty) {
|
||||
if (partialPairs != null)
|
||||
List((node, node)) ::: partialPairs
|
||||
else
|
||||
List((node, node))
|
||||
} else if (neighbors.length == 1) {
|
||||
val neighbor = neighbors(0)
|
||||
if (node > neighbor)
|
||||
if (partialPairs != null) List((node, neighbor)) ::: partialPairs else List((node, neighbor))
|
||||
else if (partialPairs != null) List((neighbor, node)) ::: partialPairs
|
||||
else List((neighbor, node))
|
||||
} else {
|
||||
val newPartialPairs = neighbors
|
||||
.map(neighbor => {
|
||||
if (node > neighbor)
|
||||
List((node, neighbor))
|
||||
else
|
||||
List((neighbor, node))
|
||||
})
|
||||
.flatMap(x => x)
|
||||
|
||||
if (partialPairs != null)
|
||||
buildPairs(neighbors.head, neighbors.tail, newPartialPairs ::: partialPairs)
|
||||
else
|
||||
buildPairs(neighbors.head, neighbors.tail, newPartialPairs)
|
||||
}
|
||||
}
|
||||
|
||||
/** Implements alternatingAlgo. Converges when the changeCount is either 0 or does not change from the previous iteration
|
||||
*
|
||||
* @param nodePairs for a graph
|
||||
* @param largeStarConnectivityChangeCount change count that resulted from the previous iteration
|
||||
* @param smallStarConnectivityChangeCount change count that resulted from the previous iteration
|
||||
* @param didConverge flag to indicate the alorigth converged
|
||||
* @param currIterationCount counter to capture number of iterations
|
||||
* @param maxIterationCount maximum number iterations to try before giving up
|
||||
* @return RDD of nodePairs
|
||||
*/
|
||||
|
||||
@tailrec
|
||||
private def alternatingAlgo(
|
||||
nodePairs: RDD[(Long, Long)],
|
||||
largeStarConnectivityChangeCount: Long,
|
||||
smallStarConnectivityChangeCount: Long,
|
||||
didConverge: Boolean,
|
||||
currIterationCount: Int,
|
||||
maxIterationCount: Int
|
||||
): (RDD[(Long, Long)], Boolean, Long) = {
|
||||
|
||||
val iterationCount = currIterationCount + 1
|
||||
if (didConverge)
|
||||
(nodePairs, true, currIterationCount)
|
||||
else if (currIterationCount >= maxIterationCount) {
|
||||
(nodePairs, false, currIterationCount)
|
||||
} else {
|
||||
|
||||
val (nodePairsLargeStar, currLargeStarConnectivityChangeCount) = largeStar(nodePairs)
|
||||
val (nodePairsSmallStar, currSmallStarConnectivityChangeCount) = smallStar(nodePairsLargeStar)
|
||||
|
||||
if (
|
||||
(currLargeStarConnectivityChangeCount == largeStarConnectivityChangeCount &&
|
||||
currSmallStarConnectivityChangeCount == smallStarConnectivityChangeCount) ||
|
||||
(currSmallStarConnectivityChangeCount == 0 && currLargeStarConnectivityChangeCount == 0)
|
||||
) {
|
||||
alternatingAlgo(
|
||||
nodePairsSmallStar,
|
||||
currLargeStarConnectivityChangeCount,
|
||||
currSmallStarConnectivityChangeCount,
|
||||
true,
|
||||
iterationCount,
|
||||
maxIterationCount
|
||||
)
|
||||
} else {
|
||||
alternatingAlgo(
|
||||
nodePairsSmallStar,
|
||||
currLargeStarConnectivityChangeCount,
|
||||
currSmallStarConnectivityChangeCount,
|
||||
false,
|
||||
iterationCount,
|
||||
maxIterationCount
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Driver function
|
||||
*
|
||||
* @param cliques list of nodes representing subgraphs (or cliques)
|
||||
* @param maxIterationCount maximum number iterations to try before giving up
|
||||
* @return Connected Components as nodePairs where second member of the nodePair is the minimum node in the component
|
||||
*/
|
||||
def run(cliques: RDD[List[Long]], maxIterationCount: Int): (RDD[(Long, Long)], Boolean, Long) = {
|
||||
|
||||
val nodePairs = cliques
|
||||
.map(aClique => {
|
||||
buildPairs(aClique)
|
||||
})
|
||||
.flatMap(x => x)
|
||||
|
||||
val (cc, didConverge, iterCount) = alternatingAlgo(nodePairs, 9999999L, 9999999L, false, 0, maxIterationCount)
|
||||
|
||||
if (didConverge) {
|
||||
(cc, didConverge, iterCount)
|
||||
} else {
|
||||
(null.asInstanceOf[RDD[(Long, Long)]], didConverge, iterCount)
|
||||
}
|
||||
}
|
||||
|
||||
def runOnPairs(nodePairs: RDD[(Long, Long)], maxIterationCount: Int): (RDD[(Long, Long)], Boolean, Long) = {
|
||||
val (cc, didConverge, iterCount) = alternatingAlgo(nodePairs, 9999999L, 9999999L, false, 0, maxIterationCount)
|
||||
|
||||
if (didConverge) {
|
||||
(cc, didConverge, iterCount)
|
||||
} else {
|
||||
(null.asInstanceOf[RDD[(Long, Long)]], didConverge, iterCount)
|
||||
}
|
||||
}
|
||||
|
||||
def runOnPairs(nodePairs: Dataset[Row], maxIterationCount: Int)(implicit spark: SparkSession): Dataset[Row] = {
|
||||
import spark.implicits._
|
||||
|
||||
val (cc, didConverge, iterCount) = alternatingAlgo(
|
||||
nodePairs.map(e => (e.getLong(0), e.getLong(1))).rdd,
|
||||
9999999L,
|
||||
9999999L,
|
||||
false,
|
||||
0,
|
||||
maxIterationCount
|
||||
)
|
||||
|
||||
if (didConverge) {
|
||||
cc.toDF("vertexId", "groupId")
|
||||
} else {
|
||||
null.asInstanceOf[Dataset[Row]]
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -41,9 +41,13 @@ import com.google.common.collect.Sets;
|
|||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
import eu.dnetlib.dhp.schema.sx.OafUtils;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import scala.Tuple2;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||
|
@ -97,6 +101,7 @@ public class SparkDedupTest implements Serializable {
|
|||
|
||||
final SparkConf conf = new SparkConf();
|
||||
conf.set("spark.sql.shuffle.partitions", "200");
|
||||
conf.set("spark.sql.warehouse.dir", testOutputBasePath + "/spark-warehouse");
|
||||
spark = SparkSession
|
||||
.builder()
|
||||
.appName(SparkDedupTest.class.getSimpleName())
|
||||
|
@ -186,11 +191,11 @@ public class SparkDedupTest implements Serializable {
|
|||
System.out.println("ds_simrel = " + ds_simrel);
|
||||
System.out.println("orp_simrel = " + orp_simrel);
|
||||
|
||||
assertEquals(1538, orgs_simrel);
|
||||
assertEquals(3523, pubs_simrel);
|
||||
assertEquals(168, sw_simrel);
|
||||
assertEquals(221, ds_simrel);
|
||||
assertEquals(3392, orp_simrel);
|
||||
assertEquals(751, orgs_simrel);
|
||||
assertEquals(546, pubs_simrel);
|
||||
assertEquals(113, sw_simrel);
|
||||
assertEquals(148, ds_simrel);
|
||||
assertEquals(280, orp_simrel);
|
||||
|
||||
}
|
||||
|
||||
|
@ -235,10 +240,10 @@ public class SparkDedupTest implements Serializable {
|
|||
.count();
|
||||
|
||||
// entities simrels supposed to be equal to the number of previous step (no rels in whitelist)
|
||||
assertEquals(1538, orgs_simrel);
|
||||
assertEquals(3523, pubs_simrel);
|
||||
assertEquals(221, ds_simrel);
|
||||
assertEquals(3392, orp_simrel);
|
||||
assertEquals(751, orgs_simrel);
|
||||
assertEquals(546, pubs_simrel);
|
||||
assertEquals(148, ds_simrel);
|
||||
assertEquals(280, orp_simrel);
|
||||
// System.out.println("orgs_simrel = " + orgs_simrel);
|
||||
// System.out.println("pubs_simrel = " + pubs_simrel);
|
||||
// System.out.println("ds_simrel = " + ds_simrel);
|
||||
|
@ -268,7 +273,7 @@ public class SparkDedupTest implements Serializable {
|
|||
&& rel.getTarget().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[1]))
|
||||
.count() > 0);
|
||||
|
||||
assertEquals(170, sw_simrel.count());
|
||||
assertEquals(115, sw_simrel.count());
|
||||
// System.out.println("sw_simrel = " + sw_simrel.count());
|
||||
|
||||
}
|
||||
|
@ -292,7 +297,9 @@ public class SparkDedupTest implements Serializable {
|
|||
"-w",
|
||||
testOutputBasePath,
|
||||
"-cc",
|
||||
"3"
|
||||
"3",
|
||||
"-h",
|
||||
""
|
||||
});
|
||||
|
||||
new SparkCreateMergeRels(parser, spark).run(isLookUpService);
|
||||
|
@ -365,6 +372,113 @@ public class SparkDedupTest implements Serializable {
|
|||
.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(3)
|
||||
void createMergeRelsWithPivotHistoryTest() throws Exception {
|
||||
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"));
|
||||
|
||||
spark.sql("CREATE DATABASE IF NOT EXISTS pivot_history_test");
|
||||
ModelSupport.oafTypes.keySet().forEach(entityType -> {
|
||||
try {
|
||||
spark
|
||||
.read()
|
||||
.json(
|
||||
Paths
|
||||
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/pivot_history").toURI())
|
||||
.toFile()
|
||||
.getAbsolutePath())
|
||||
.write()
|
||||
.mode("overwrite")
|
||||
.saveAsTable("pivot_history_test." + entityType);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
parser
|
||||
.parseArgument(
|
||||
new String[] {
|
||||
"-i",
|
||||
testGraphBasePath,
|
||||
"-asi",
|
||||
testActionSetId,
|
||||
"-la",
|
||||
"lookupurl",
|
||||
"-w",
|
||||
testOutputBasePath,
|
||||
"-h",
|
||||
"",
|
||||
"-pivotHistoryDatabase",
|
||||
"pivot_history_test"
|
||||
|
||||
});
|
||||
|
||||
new SparkCreateMergeRels(parser, spark).run(isLookUpService);
|
||||
|
||||
long orgs_mergerel = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
|
||||
.count();
|
||||
final Dataset<Relation> pubs = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
|
||||
.as(Encoders.bean(Relation.class));
|
||||
long sw_mergerel = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
|
||||
.count();
|
||||
long ds_mergerel = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
|
||||
.count();
|
||||
|
||||
long orp_mergerel = spark
|
||||
.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
|
||||
.count();
|
||||
|
||||
final List<Relation> merges = pubs
|
||||
.filter("source == '50|arXiv_dedup_::c93aeb433eb90ed7a86e29be00791b7c'")
|
||||
.collectAsList();
|
||||
assertEquals(3, merges.size());
|
||||
Set<String> dups = Sets
|
||||
.newHashSet(
|
||||
"50|doi_________::3b1d0d8e8f930826665df9d6b82fbb73",
|
||||
"50|doi_________::d5021b53204e4fdeab6ff5d5bc468032",
|
||||
"50|arXiv_______::c93aeb433eb90ed7a86e29be00791b7c");
|
||||
merges.forEach(r -> {
|
||||
assertEquals(ModelConstants.RESULT_RESULT, r.getRelType());
|
||||
assertEquals(ModelConstants.DEDUP, r.getSubRelType());
|
||||
assertEquals(ModelConstants.MERGES, r.getRelClass());
|
||||
assertTrue(dups.contains(r.getTarget()));
|
||||
});
|
||||
|
||||
final List<Relation> mergedIn = pubs
|
||||
.filter("target == '50|arXiv_dedup_::c93aeb433eb90ed7a86e29be00791b7c'")
|
||||
.collectAsList();
|
||||
assertEquals(3, mergedIn.size());
|
||||
mergedIn.forEach(r -> {
|
||||
assertEquals(ModelConstants.RESULT_RESULT, r.getRelType());
|
||||
assertEquals(ModelConstants.DEDUP, r.getSubRelType());
|
||||
assertEquals(ModelConstants.IS_MERGED_IN, r.getRelClass());
|
||||
assertTrue(dups.contains(r.getSource()));
|
||||
});
|
||||
|
||||
assertEquals(1268, orgs_mergerel);
|
||||
assertEquals(1112, pubs.count());
|
||||
assertEquals(292, sw_mergerel);
|
||||
assertEquals(476, ds_mergerel);
|
||||
assertEquals(742, orp_mergerel);
|
||||
// System.out.println("orgs_mergerel = " + orgs_mergerel);
|
||||
// System.out.println("pubs_mergerel = " + pubs_mergerel);
|
||||
// System.out.println("sw_mergerel = " + sw_mergerel);
|
||||
// System.out.println("ds_mergerel = " + ds_mergerel);
|
||||
// System.out.println("orp_mergerel = " + orp_mergerel);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(4)
|
||||
void createMergeRelsTest() throws Exception {
|
||||
|
@ -382,7 +496,9 @@ public class SparkDedupTest implements Serializable {
|
|||
"-la",
|
||||
"lookupurl",
|
||||
"-w",
|
||||
testOutputBasePath
|
||||
testOutputBasePath,
|
||||
"-h",
|
||||
""
|
||||
});
|
||||
|
||||
new SparkCreateMergeRels(parser, spark).run(isLookUpService);
|
||||
|
@ -437,10 +553,10 @@ public class SparkDedupTest implements Serializable {
|
|||
});
|
||||
|
||||
assertEquals(1268, orgs_mergerel);
|
||||
assertEquals(1450, pubs.count());
|
||||
assertEquals(286, sw_mergerel);
|
||||
assertEquals(472, ds_mergerel);
|
||||
assertEquals(738, orp_mergerel);
|
||||
assertEquals(1112, pubs.count());
|
||||
assertEquals(292, sw_mergerel);
|
||||
assertEquals(476, ds_mergerel);
|
||||
assertEquals(742, orp_mergerel);
|
||||
// System.out.println("orgs_mergerel = " + orgs_mergerel);
|
||||
// System.out.println("pubs_mergerel = " + pubs_mergerel);
|
||||
// System.out.println("sw_mergerel = " + sw_mergerel);
|
||||
|
@ -492,10 +608,10 @@ public class SparkDedupTest implements Serializable {
|
|||
.count();
|
||||
|
||||
assertEquals(86, orgs_deduprecord);
|
||||
assertEquals(68, pubs.count());
|
||||
assertEquals(49, sw_deduprecord);
|
||||
assertEquals(91, pubs.count());
|
||||
assertEquals(47, sw_deduprecord);
|
||||
assertEquals(97, ds_deduprecord);
|
||||
assertEquals(92, orp_deduprecord);
|
||||
assertEquals(93, orp_deduprecord);
|
||||
|
||||
verifyRoot_1(mapper, pubs);
|
||||
|
||||
|
@ -629,13 +745,13 @@ public class SparkDedupTest implements Serializable {
|
|||
.distinct()
|
||||
.count();
|
||||
|
||||
assertEquals(902, publications);
|
||||
assertEquals(925, publications);
|
||||
assertEquals(839, organizations);
|
||||
assertEquals(100, projects);
|
||||
assertEquals(100, datasource);
|
||||
assertEquals(198, softwares);
|
||||
assertEquals(196, softwares);
|
||||
assertEquals(389, dataset);
|
||||
assertEquals(520, otherresearchproduct);
|
||||
assertEquals(521, otherresearchproduct);
|
||||
|
||||
// System.out.println("publications = " + publications);
|
||||
// System.out.println("organizations = " + organizations);
|
||||
|
|
|
@ -101,7 +101,8 @@
|
|||
"type" : "String",
|
||||
"path" : "$.title[?(@.qualifier.classid == 'main title')].value",
|
||||
"length" : 250,
|
||||
"size" : 5
|
||||
"size" : 5,
|
||||
"clean": "title"
|
||||
},
|
||||
{
|
||||
"name" : "authors",
|
||||
|
|
|
@ -101,7 +101,8 @@
|
|||
"type" : "String",
|
||||
"path" : "$.title[?(@.qualifier.classid == 'main title')].value",
|
||||
"length" : 250,
|
||||
"size" : 5
|
||||
"size" : 5,
|
||||
"clean": "title"
|
||||
},
|
||||
{
|
||||
"name" : "authors",
|
||||
|
|
|
@ -29,9 +29,8 @@
|
|||
},
|
||||
"pace": {
|
||||
"clustering" : [
|
||||
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
||||
{ "name" : "numAuthorsTitleSuffixPrefixChain", "fields" : [ "num_authors", "title" ], "params" : { "mod" : "10" } },
|
||||
{ "name" : "jsonlistclustering", "fields" : [ "pid" ], "params" : { "jpath_value": "$.value", "jpath_classid": "$.qualifier.classid"} }
|
||||
],
|
||||
"decisionTree": {
|
||||
"start": {
|
||||
|
@ -79,13 +78,37 @@
|
|||
"ignoreUndefined": "false"
|
||||
},
|
||||
"layer3": {
|
||||
"fields": [
|
||||
{
|
||||
"field": "authors",
|
||||
"comparator": "authorsMatch",
|
||||
"weight": 1.0,
|
||||
"countIfUndefined": "false",
|
||||
"params": {
|
||||
"surname_th": 0.75,
|
||||
"fullname_th": 0.75,
|
||||
"threshold": 0.6,
|
||||
"mode": "full"
|
||||
}
|
||||
}
|
||||
],
|
||||
"threshold": 0.6,
|
||||
"aggregation": "MAX",
|
||||
"positive": "layer4",
|
||||
"negative": "NO_MATCH",
|
||||
"undefined": "MATCH",
|
||||
"ignoreUndefined": "true"
|
||||
},
|
||||
"layer4": {
|
||||
"fields": [
|
||||
{
|
||||
"field": "title",
|
||||
"comparator": "levensteinTitle",
|
||||
"weight": 1.0,
|
||||
"countIfUndefined": "true",
|
||||
"params": {}
|
||||
"params": {
|
||||
"threshold": "0.99"
|
||||
}
|
||||
}
|
||||
],
|
||||
"threshold": 0.99,
|
||||
|
@ -97,23 +120,25 @@
|
|||
}
|
||||
},
|
||||
"model": [
|
||||
{
|
||||
"name": "doi",
|
||||
"type": "String",
|
||||
"path": "$.pid[?(@.qualifier.classid == 'doi')].value"
|
||||
},
|
||||
{
|
||||
"name": "pid",
|
||||
"type": "JSON",
|
||||
"path": "$.pid",
|
||||
"overrideMatch": "true"
|
||||
},
|
||||
{
|
||||
"name": "alternateid",
|
||||
"type": "JSON",
|
||||
"path": "$.instance[*].alternateIdentifier[*]",
|
||||
"overrideMatch": "true"
|
||||
},
|
||||
{
|
||||
"name": "title",
|
||||
"type": "String",
|
||||
"path": "$.title[?(@.qualifier.classid == 'main title')].value",
|
||||
"length": 250,
|
||||
"size": 5
|
||||
"size": 5,
|
||||
"clean": "title"
|
||||
},
|
||||
{
|
||||
"name": "authors",
|
||||
|
@ -122,9 +147,9 @@
|
|||
"size": 200
|
||||
},
|
||||
{
|
||||
"name": "resulttype",
|
||||
"name": "num_authors",
|
||||
"type": "String",
|
||||
"path": "$.resulttype.classid"
|
||||
"path": "$.author.length()"
|
||||
}
|
||||
],
|
||||
"blacklists": {
|
||||
|
|
|
@ -75,7 +75,8 @@
|
|||
"type" : "String",
|
||||
"path" : "$.title[?(@.qualifier.classid == 'main title')].value",
|
||||
"length" : 250,
|
||||
"size" : 5
|
||||
"size" : 5,
|
||||
"clean": "title"
|
||||
},
|
||||
{
|
||||
"name" : "url",
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
{"id": "50|arXiv_______::c93aeb433eb90ed7a86e29be00791b7c", "firstUsage": "2022-01-01", "lastUsage": "2022-01-01", "dedupId": "50|arXiv_dedup_::c93aeb433eb90ed7a86e29be00791b7c" }
|
20
pom.xml
20
pom.xml
|
@ -931,5 +931,25 @@
|
|||
-->
|
||||
</properties>
|
||||
</profile>
|
||||
|
||||
<!-- Activate ARM-compatible snappy dependency on new Silicon Macs -->
|
||||
<profile>
|
||||
<id>arm-silicon-mac</id>
|
||||
<activation>
|
||||
<os>
|
||||
<arch>aarch64</arch>
|
||||
<family>mac</family>
|
||||
</os>
|
||||
</activation>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.xerial.snappy</groupId>
|
||||
<artifactId>snappy-java</artifactId>
|
||||
<version>1.1.8.4</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
</profile>
|
||||
</profiles>
|
||||
</project>
|
Loading…
Reference in New Issue