From 02636e802c26c284efa1415d168815c5b23ed655 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Mon, 2 Oct 2023 09:25:12 +0200 Subject: [PATCH 1/5] SparkCreateSimRels: - Create dedup blocks from the complete queue of records matching cluster key instead of truncating the results - Clean titles once before clustering and similarity comparisons - Added support for filtered fields in model - Added support for sorting List fields in model - Added new JSONListClustering and numAuthorsTitleSuffixPrefixChain clustering functions - Added new maxLengthMatch comparator function - Use reduced complexity Levenshtein with threshold in levensteinTitle - Use reduced complexity AuthorsMatch with threshold early-quit - Use incremental Connected Component to decrease comparisons in similarity match in BlockProcessor - Use new clusterings configuration in Dedup tests SparkWhitelistSimRels: use left semi join for clarity and performance SparkCreateMergeRels: - Use new connected component algorithm that converge faster than Spark GraphX provided algorithm - Refactored to use Windowing sorting rather than groupBy to reduce memory pressure - Use historical pivot table to generate singleton rels, merged rels and keep continuity with dedupIds used in the past - Comparator for pivot record selection now uses "tomorrow" as filler for missing or incorrect date instead of "2000-01-01" - Changed generation of ids of type dedup_wf_001 to avoid collisions DedupRecordFactory: use reduceGroups instead of mapGroups to decrease memory pressure --- .../AbstractClusteringFunction.java | 23 +- .../eu/dnetlib/pace/clustering/Acronyms.java | 2 +- .../pace/clustering/ClusteringFunction.java | 2 +- .../pace/clustering/ImmutableFieldValue.java | 2 +- .../pace/clustering/JSONListClustering.java | 69 ++++ .../pace/clustering/KeywordsClustering.java | 12 +- .../pace/clustering/LastNameFirstInitial.java | 7 +- .../pace/clustering/LowercaseClustering.java | 2 +- .../dnetlib/pace/clustering/NgramPairs.java | 4 +- .../eu/dnetlib/pace/clustering/Ngrams.java | 4 +- .../NumAuthorsTitleSuffixPrefixChain.java | 113 ++++++ .../pace/clustering/PersonClustering.java | 6 +- .../dnetlib/pace/clustering/PersonHash.java | 2 +- .../clustering/RandomClusteringFunction.java | 2 +- .../pace/clustering/SortedNgramPairs.java | 7 +- .../clustering/SpaceTrimmingFieldValue.java | 4 +- .../dnetlib/pace/clustering/SuffixPrefix.java | 2 +- .../pace/clustering/UrlClustering.java | 14 +- .../WordsStatsSuffixPrefixChain.java | 2 +- .../pace/clustering/WordsSuffixPrefix.java | 2 +- .../pace/common/AbstractPaceFunctions.java | 66 ++-- .../eu/dnetlib/pace/model/ClusteringDef.java | 6 +- .../java/eu/dnetlib/pace/model/FieldDef.java | 35 ++ .../eu/dnetlib/pace/model/SparkDeduper.scala | 38 +- .../eu/dnetlib/pace/model/SparkModel.scala | 46 ++- .../eu/dnetlib/pace/tree/AuthorsMatch.java | 25 +- .../dnetlib/pace/tree/InstanceTypeMatch.java | 2 +- .../eu/dnetlib/pace/tree/LevensteinTitle.java | 20 +- .../eu/dnetlib/pace/tree/MaxLengthMatch.java | 29 ++ .../pace/tree/support/AbstractComparator.java | 10 + .../eu/dnetlib/pace/util/BlockProcessor.java | 24 +- .../util/IncrementalConnectedComponents.java | 50 +++ .../eu/dnetlib/pace/util/MapDocumentUtil.java | 2 + .../eu/dnetlib/pace/util/PaceResolver.java | 2 +- .../clustering/ClusteringFunctionTest.java | 40 +-- .../IncrementalConnectedComponentsTest.java | 40 +++ .../dhp/oa/dedup/AbstractSparkAction.java | 4 + .../dhp/oa/dedup/DedupRecordFactory.java | 134 +++---- .../eu/dnetlib/dhp/oa/dedup/IdGenerator.java | 21 +- .../dhp/oa/dedup/SparkCreateMergeRels.java | 332 ++++++++++------- .../dhp/oa/dedup/SparkWhitelistSimRels.java | 16 +- .../oa/dedup/graph/ConnectedComponent.java | 100 ------ .../dhp/oa/dedup/graph/GraphProcessor.scala | 37 -- .../dhp/oa/dedup/model/Identifier.java | 18 +- .../dhp/oa/dedup/createCC_parameters.json | 12 + .../dedup/scan/oozie_app/config-default.xml | 4 + .../dhp/oa/dedup/scan/oozie_app/workflow.xml | 2 + .../kwartile/lib/cc/ConnectedComponent.scala | 335 ++++++++++++++++++ .../dnetlib/dhp/oa/dedup/SparkDedupTest.java | 160 +++++++-- .../dnetlib/dhp/dedup/conf/ds.curr.conf.json | 3 +- .../dnetlib/dhp/dedup/conf/orp.curr.conf.json | 3 +- .../dnetlib/dhp/dedup/conf/pub.curr.conf.json | 49 ++- .../dnetlib/dhp/dedup/conf/sw.curr.conf.json | 3 +- .../dedup/pivot_history/pivot_history.json | 1 + pom.xml | 20 ++ 55 files changed, 1437 insertions(+), 533 deletions(-) create mode 100644 dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/JSONListClustering.java create mode 100644 dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NumAuthorsTitleSuffixPrefixChain.java create mode 100644 dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/MaxLengthMatch.java create mode 100644 dhp-pace-core/src/main/java/eu/dnetlib/pace/util/IncrementalConnectedComponents.java create mode 100644 dhp-pace-core/src/test/java/eu/dnetlib/pace/util/IncrementalConnectedComponentsTest.java delete mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java delete mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/scala/com/kwartile/lib/cc/ConnectedComponent.scala create mode 100644 dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/pivot_history/pivot_history.json diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/AbstractClusteringFunction.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/AbstractClusteringFunction.java index 3da8eb490..e971ec5bb 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/AbstractClusteringFunction.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/AbstractClusteringFunction.java @@ -14,9 +14,9 @@ import eu.dnetlib.pace.config.Config; public abstract class AbstractClusteringFunction extends AbstractPaceFunctions implements ClusteringFunction { - protected Map params; + protected Map params; - public AbstractClusteringFunction(final Map params) { + public AbstractClusteringFunction(final Map params) { this.params = params; } @@ -27,7 +27,7 @@ public abstract class AbstractClusteringFunction extends AbstractPaceFunctions i return fields .stream() .filter(f -> !f.isEmpty()) - .map(this::normalize) + .map(s -> normalize(s)) .map(s -> filterAllStopWords(s)) .map(s -> doApply(conf, s)) .map(c -> filterBlacklisted(c, ngramBlacklist)) @@ -36,11 +36,24 @@ public abstract class AbstractClusteringFunction extends AbstractPaceFunctions i .collect(Collectors.toCollection(HashSet::new)); } - public Map getParams() { + public Map getParams() { return params; } protected Integer param(String name) { - return params.get(name); + Object val = params.get(name); + if (val == null) + return null; + if (val instanceof Number) { + return ((Number) val).intValue(); + } + return Integer.parseInt(val.toString()); + } + + protected int paramOrDefault(String name, int i) { + Integer res = param(name); + if (res == null) + res = i; + return res; } } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/Acronyms.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/Acronyms.java index 9072fbb4b..b5db27106 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/Acronyms.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/Acronyms.java @@ -13,7 +13,7 @@ import eu.dnetlib.pace.config.Config; @ClusteringClass("acronyms") public class Acronyms extends AbstractClusteringFunction { - public Acronyms(Map params) { + public Acronyms(Map params) { super(params); } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ClusteringFunction.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ClusteringFunction.java index 8b7852418..269de867d 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ClusteringFunction.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ClusteringFunction.java @@ -11,6 +11,6 @@ public interface ClusteringFunction { public Collection apply(Config config, List fields); - public Map getParams(); + public Map getParams(); } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ImmutableFieldValue.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ImmutableFieldValue.java index bc8844aee..cbfcde266 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ImmutableFieldValue.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/ImmutableFieldValue.java @@ -12,7 +12,7 @@ import eu.dnetlib.pace.config.Config; @ClusteringClass("immutablefieldvalue") public class ImmutableFieldValue extends AbstractClusteringFunction { - public ImmutableFieldValue(final Map params) { + public ImmutableFieldValue(final Map params) { super(params); } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/JSONListClustering.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/JSONListClustering.java new file mode 100644 index 000000000..e00092bd0 --- /dev/null +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/JSONListClustering.java @@ -0,0 +1,69 @@ + +package eu.dnetlib.pace.clustering; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; + +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.Option; + +import eu.dnetlib.pace.common.AbstractPaceFunctions; +import eu.dnetlib.pace.config.Config; +import eu.dnetlib.pace.util.MapDocumentUtil; + +@ClusteringClass("jsonlistclustering") +public class JSONListClustering extends AbstractPaceFunctions implements ClusteringFunction { + + private Map params; + + public JSONListClustering(Map params) { + this.params = params; + } + + @Override + public Map getParams() { + return params; + } + + @Override + public Collection apply(Config conf, List fields) { + return fields + .stream() + .filter(f -> !f.isEmpty()) + .map(s -> doApply(conf, s)) + .filter(StringUtils::isNotBlank) + .collect(Collectors.toCollection(HashSet::new)); + } + + private String doApply(Config conf, String json) { + StringBuilder st = new StringBuilder(); // to build the string used for comparisons basing on the jpath into + // parameters + final DocumentContext documentContext = JsonPath + .using(Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS)) + .parse(json); + + // for each path in the param list + for (String key : params.keySet().stream().filter(k -> k.contains("jpath")).collect(Collectors.toList())) { + String path = params.get(key).toString(); + String value = MapDocumentUtil.getJPathString(path, documentContext); + if (value == null || value.isEmpty()) + value = ""; + st.append(value); + st.append(" "); + } + + st.setLength(st.length() - 1); + + if (StringUtils.isBlank(st)) { + return "1"; + } + return st.toString(); + } +} diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/KeywordsClustering.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/KeywordsClustering.java index 38299adb4..fdd8d1fb1 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/KeywordsClustering.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/KeywordsClustering.java @@ -11,7 +11,7 @@ import eu.dnetlib.pace.config.Config; @ClusteringClass("keywordsclustering") public class KeywordsClustering extends AbstractClusteringFunction { - public KeywordsClustering(Map params) { + public KeywordsClustering(Map params) { super(params); } @@ -19,8 +19,8 @@ public class KeywordsClustering extends AbstractClusteringFunction { protected Collection doApply(final Config conf, String s) { // takes city codes and keywords codes without duplicates - Set keywords = getKeywords(s, conf.translationMap(), params.getOrDefault("windowSize", 4)); - Set cities = getCities(s, params.getOrDefault("windowSize", 4)); + Set keywords = getKeywords(s, conf.translationMap(), paramOrDefault("windowSize", 4)); + Set cities = getCities(s, paramOrDefault("windowSize", 4)); // list of combination to return as result final Collection combinations = new LinkedHashSet(); @@ -28,7 +28,7 @@ public class KeywordsClustering extends AbstractClusteringFunction { for (String keyword : keywordsToCodes(keywords, conf.translationMap())) { for (String city : citiesToCodes(cities)) { combinations.add(keyword + "-" + city); - if (combinations.size() >= params.getOrDefault("max", 2)) { + if (combinations.size() >= paramOrDefault("max", 2)) { return combinations; } } @@ -42,8 +42,8 @@ public class KeywordsClustering extends AbstractClusteringFunction { return fields .stream() .filter(f -> !f.isEmpty()) - .map(this::cleanup) - .map(this::normalize) + .map(KeywordsClustering::cleanup) + .map(KeywordsClustering::normalize) .map(s -> filterAllStopWords(s)) .map(s -> doApply(conf, s)) .map(c -> filterBlacklisted(c, ngramBlacklist)) diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/LastNameFirstInitial.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/LastNameFirstInitial.java index 5a385961a..9692f5762 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/LastNameFirstInitial.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/LastNameFirstInitial.java @@ -16,7 +16,7 @@ public class LastNameFirstInitial extends AbstractClusteringFunction { private boolean DEFAULT_AGGRESSIVE = true; - public LastNameFirstInitial(final Map params) { + public LastNameFirstInitial(final Map params) { super(params); } @@ -25,7 +25,7 @@ public class LastNameFirstInitial extends AbstractClusteringFunction { return fields .stream() .filter(f -> !f.isEmpty()) - .map(this::normalize) + .map(LastNameFirstInitial::normalize) .map(s -> doApply(conf, s)) .map(c -> filterBlacklisted(c, ngramBlacklist)) .flatMap(c -> c.stream()) @@ -33,8 +33,7 @@ public class LastNameFirstInitial extends AbstractClusteringFunction { .collect(Collectors.toCollection(HashSet::new)); } - @Override - protected String normalize(final String s) { + public static String normalize(final String s) { return fixAliases(transliterate(nfd(unicodeNormalization(s)))) // do not compact the regexes in a single expression, would cause StackOverflowError in case of large input // strings diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/LowercaseClustering.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/LowercaseClustering.java index a3a6c4881..807f41dd5 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/LowercaseClustering.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/LowercaseClustering.java @@ -15,7 +15,7 @@ import eu.dnetlib.pace.config.Config; @ClusteringClass("lowercase") public class LowercaseClustering extends AbstractClusteringFunction { - public LowercaseClustering(final Map params) { + public LowercaseClustering(final Map params) { super(params); } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NgramPairs.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NgramPairs.java index aa06aa408..bcc9667a8 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NgramPairs.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NgramPairs.java @@ -12,11 +12,11 @@ import eu.dnetlib.pace.config.Config; @ClusteringClass("ngrampairs") public class NgramPairs extends Ngrams { - public NgramPairs(Map params) { + public NgramPairs(Map params) { super(params, false); } - public NgramPairs(Map params, boolean sorted) { + public NgramPairs(Map params, boolean sorted) { super(params, sorted); } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/Ngrams.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/Ngrams.java index 96c305a16..7b862c729 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/Ngrams.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/Ngrams.java @@ -10,11 +10,11 @@ public class Ngrams extends AbstractClusteringFunction { private final boolean sorted; - public Ngrams(Map params) { + public Ngrams(Map params) { this(params, false); } - public Ngrams(Map params, boolean sorted) { + public Ngrams(Map params, boolean sorted) { super(params); this.sorted = sorted; } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NumAuthorsTitleSuffixPrefixChain.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NumAuthorsTitleSuffixPrefixChain.java new file mode 100644 index 000000000..f1d1e17b9 --- /dev/null +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/NumAuthorsTitleSuffixPrefixChain.java @@ -0,0 +1,113 @@ + +package eu.dnetlib.pace.clustering; + +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import com.google.common.base.Splitter; +import com.google.common.collect.Sets; + +import eu.dnetlib.pace.config.Config; + +@ClusteringClass("numAuthorsTitleSuffixPrefixChain") +public class NumAuthorsTitleSuffixPrefixChain extends AbstractClusteringFunction { + + public NumAuthorsTitleSuffixPrefixChain(Map params) { + super(params); + } + + @Override + public Collection apply(Config conf, List fields) { + + try { + int num_authors = Math.min(Integer.parseInt(fields.get(0)), 21); // SIZE threshold is 20, +1 + + if (num_authors > 0) { + return super.apply(conf, fields.subList(1, fields.size())) + .stream() + .map(s -> num_authors + "-" + s) + .collect(Collectors.toList()); + } + } catch (NumberFormatException e) { + // missing or null authors array + } + + return Collections.emptyList(); + } + + @Override + protected Collection doApply(Config conf, String s) { + return suffixPrefixChain(cleanup(s), param("mod")); + } + + private Collection suffixPrefixChain(String s, int mod) { + // create the list of words from the string (remove short words) + List wordsList = Arrays + .stream(s.split(" ")) + .filter(si -> si.length() > 3) + .collect(Collectors.toList()); + + final int words = wordsList.size(); + final int letters = s.length(); + + // create the prefix: number of words + number of letters/mod + String prefix = words / mod + "-"; + + return doSuffixPrefixChain(wordsList, prefix); + + } + + private Collection doSuffixPrefixChain(List wordsList, String prefix) { + + Set set = Sets.newLinkedHashSet(); + switch (wordsList.size()) { + case 0: + break; + case 1: + set.add(wordsList.get(0)); + break; + case 2: + set + .add( + prefix + + suffix(wordsList.get(0), 3) + + prefix(wordsList.get(1), 3)); + + set + .add( + prefix + + prefix(wordsList.get(0), 3) + + suffix(wordsList.get(1), 3)); + + break; + default: + set + .add( + prefix + + suffix(wordsList.get(0), 3) + + prefix(wordsList.get(1), 3) + + suffix(wordsList.get(2), 3)); + + set + .add( + prefix + + prefix(wordsList.get(0), 3) + + suffix(wordsList.get(1), 3) + + prefix(wordsList.get(2), 3)); + break; + } + + return set; + + } + + private String suffix(String s, int len) { + return s.substring(s.length() - len); + } + + private String prefix(String s, int len) { + return s.substring(0, len); + } + +} diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/PersonClustering.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/PersonClustering.java index b4a04ce65..91b51bebb 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/PersonClustering.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/PersonClustering.java @@ -17,11 +17,11 @@ import eu.dnetlib.pace.model.Person; @ClusteringClass("personClustering") public class PersonClustering extends AbstractPaceFunctions implements ClusteringFunction { - private Map params; + private Map params; private static final int MAX_TOKENS = 5; - public PersonClustering(final Map params) { + public PersonClustering(final Map params) { this.params = params; } @@ -77,7 +77,7 @@ public class PersonClustering extends AbstractPaceFunctions implements Clusterin // } @Override - public Map getParams() { + public Map getParams() { return params; } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/PersonHash.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/PersonHash.java index a3d58a9be..09a112c37 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/PersonHash.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/PersonHash.java @@ -15,7 +15,7 @@ public class PersonHash extends AbstractClusteringFunction { private boolean DEFAULT_AGGRESSIVE = false; - public PersonHash(final Map params) { + public PersonHash(final Map params) { super(params); } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/RandomClusteringFunction.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/RandomClusteringFunction.java index 2aab926da..3733dfc74 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/RandomClusteringFunction.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/RandomClusteringFunction.java @@ -8,7 +8,7 @@ import eu.dnetlib.pace.config.Config; public class RandomClusteringFunction extends AbstractClusteringFunction { - public RandomClusteringFunction(Map params) { + public RandomClusteringFunction(Map params) { super(params); } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SortedNgramPairs.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SortedNgramPairs.java index b085ae26d..ca1b4189b 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SortedNgramPairs.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SortedNgramPairs.java @@ -1,7 +1,10 @@ package eu.dnetlib.pace.clustering; -import java.util.*; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; import com.google.common.base.Joiner; import com.google.common.base.Splitter; @@ -12,7 +15,7 @@ import eu.dnetlib.pace.config.Config; @ClusteringClass("sortedngrampairs") public class SortedNgramPairs extends NgramPairs { - public SortedNgramPairs(Map params) { + public SortedNgramPairs(Map params) { super(params, false); } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SpaceTrimmingFieldValue.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SpaceTrimmingFieldValue.java index 392aecc79..048380f7e 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SpaceTrimmingFieldValue.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SpaceTrimmingFieldValue.java @@ -15,7 +15,7 @@ import eu.dnetlib.pace.config.Config; @ClusteringClass("spacetrimmingfieldvalue") public class SpaceTrimmingFieldValue extends AbstractClusteringFunction { - public SpaceTrimmingFieldValue(final Map params) { + public SpaceTrimmingFieldValue(final Map params) { super(params); } @@ -25,7 +25,7 @@ public class SpaceTrimmingFieldValue extends AbstractClusteringFunction { res .add( - StringUtils.isBlank(s) ? RandomStringUtils.random(getParams().get("randomLength")) + StringUtils.isBlank(s) ? RandomStringUtils.random(param("randomLength")) : s.toLowerCase().replaceAll("\\s+", "")); return res; diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SuffixPrefix.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SuffixPrefix.java index 2a1c023a9..b6921e9f1 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SuffixPrefix.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/SuffixPrefix.java @@ -12,7 +12,7 @@ import eu.dnetlib.pace.config.Config; @ClusteringClass("suffixprefix") public class SuffixPrefix extends AbstractClusteringFunction { - public SuffixPrefix(Map params) { + public SuffixPrefix(Map params) { super(params); } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/UrlClustering.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/UrlClustering.java index 5b267ad10..34f41085b 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/UrlClustering.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/UrlClustering.java @@ -15,12 +15,17 @@ import eu.dnetlib.pace.config.Config; @ClusteringClass("urlclustering") public class UrlClustering extends AbstractPaceFunctions implements ClusteringFunction { - protected Map params; + protected Map params; - public UrlClustering(final Map params) { + public UrlClustering(final Map params) { this.params = params; } + @Override + public Map getParams() { + return params; + } + @Override public Collection apply(final Config conf, List fields) { try { @@ -35,11 +40,6 @@ public class UrlClustering extends AbstractPaceFunctions implements ClusteringFu } } - @Override - public Map getParams() { - return null; - } - private URL asUrl(String value) { try { return new URL(value); diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsStatsSuffixPrefixChain.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsStatsSuffixPrefixChain.java index c8e02f8f0..22351cf8f 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsStatsSuffixPrefixChain.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsStatsSuffixPrefixChain.java @@ -11,7 +11,7 @@ import eu.dnetlib.pace.config.Config; @ClusteringClass("wordsStatsSuffixPrefixChain") public class WordsStatsSuffixPrefixChain extends AbstractClusteringFunction { - public WordsStatsSuffixPrefixChain(Map params) { + public WordsStatsSuffixPrefixChain(Map params) { super(params); } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsSuffixPrefix.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsSuffixPrefix.java index e606590a5..f9fef376b 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsSuffixPrefix.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsSuffixPrefix.java @@ -12,7 +12,7 @@ import eu.dnetlib.pace.config.Config; @ClusteringClass("wordssuffixprefix") public class WordsSuffixPrefix extends AbstractClusteringFunction { - public WordsSuffixPrefix(Map params) { + public WordsSuffixPrefix(Map params) { super(params); } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java index b440686de..ba7639ada 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java @@ -16,7 +16,6 @@ import org.apache.commons.lang3.StringUtils; import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.ibm.icu.text.Transliterator; @@ -27,7 +26,7 @@ import eu.dnetlib.pace.clustering.NGramUtils; * * @author claudio */ -public abstract class AbstractPaceFunctions { +public class AbstractPaceFunctions { // city map to be used when translating the city names into codes private static Map cityMap = AbstractPaceFunctions @@ -62,11 +61,14 @@ public abstract class AbstractPaceFunctions { private static Pattern hexUnicodePattern = Pattern.compile("\\\\u(\\p{XDigit}{4})"); - protected String concat(final List l) { + private static Pattern romanNumberPattern = Pattern + .compile("^M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})$"); + + protected static String concat(final List l) { return Joiner.on(" ").skipNulls().join(l); } - protected String cleanup(final String s) { + public static String cleanup(final String s) { final String s1 = HTML_REGEX.matcher(s).replaceAll(""); final String s2 = unicodeNormalization(s1.toLowerCase()); final String s3 = nfd(s2); @@ -82,7 +84,7 @@ public abstract class AbstractPaceFunctions { return s12; } - protected String fixXML(final String a) { + protected static String fixXML(final String a) { return a .replaceAll("–", " ") @@ -91,7 +93,7 @@ public abstract class AbstractPaceFunctions { .replaceAll("−", " "); } - protected boolean checkNumbers(final String a, final String b) { + protected static boolean checkNumbers(final String a, final String b) { final String numbersA = getNumbers(a); final String numbersB = getNumbers(b); final String romansA = getRomans(a); @@ -99,7 +101,7 @@ public abstract class AbstractPaceFunctions { return !numbersA.equals(numbersB) || !romansA.equals(romansB); } - protected String getRomans(final String s) { + protected static String getRomans(final String s) { final StringBuilder sb = new StringBuilder(); for (final String t : s.split(" ")) { sb.append(isRoman(t) ? t : ""); @@ -107,13 +109,12 @@ public abstract class AbstractPaceFunctions { return sb.toString(); } - protected boolean isRoman(final String s) { - return s - .replaceAll("^M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})$", "qwertyuiop") - .equals("qwertyuiop"); + protected static boolean isRoman(final String s) { + Matcher m = romanNumberPattern.matcher(s); + return m.matches() && m.hitEnd(); } - protected String getNumbers(final String s) { + protected static String getNumbers(final String s) { final StringBuilder sb = new StringBuilder(); for (final String t : s.split(" ")) { sb.append(isNumber(t) ? t : ""); @@ -121,7 +122,7 @@ public abstract class AbstractPaceFunctions { return sb.toString(); } - public boolean isNumber(String strNum) { + public static boolean isNumber(String strNum) { if (strNum == null) { return false; } @@ -147,7 +148,7 @@ public abstract class AbstractPaceFunctions { } } - protected String removeSymbols(final String s) { + protected static String removeSymbols(final String s) { final StringBuilder sb = new StringBuilder(); s.chars().forEach(ch -> { @@ -157,11 +158,11 @@ public abstract class AbstractPaceFunctions { return sb.toString().replaceAll("\\s+", " "); } - protected boolean notNull(final String s) { + protected static boolean notNull(final String s) { return s != null; } - protected String normalize(final String s) { + public static String normalize(final String s) { return fixAliases(transliterate(nfd(unicodeNormalization(s)))) .toLowerCase() // do not compact the regexes in a single expression, would cause StackOverflowError in case of large input @@ -174,16 +175,16 @@ public abstract class AbstractPaceFunctions { .trim(); } - public String nfd(final String s) { + public static String nfd(final String s) { return Normalizer.normalize(s, Normalizer.Form.NFD); } - public String utf8(final String s) { + public static String utf8(final String s) { byte[] bytes = s.getBytes(StandardCharsets.UTF_8); return new String(bytes, StandardCharsets.UTF_8); } - public String unicodeNormalization(final String s) { + public static String unicodeNormalization(final String s) { Matcher m = hexUnicodePattern.matcher(s); StringBuffer buf = new StringBuffer(s.length()); @@ -195,7 +196,7 @@ public abstract class AbstractPaceFunctions { return buf.toString(); } - protected String filterStopWords(final String s, final Set stopwords) { + protected static String filterStopWords(final String s, final Set stopwords) { final StringTokenizer st = new StringTokenizer(s); final StringBuilder sb = new StringBuilder(); while (st.hasMoreTokens()) { @@ -208,7 +209,7 @@ public abstract class AbstractPaceFunctions { return sb.toString().trim(); } - public String filterAllStopWords(String s) { + public static String filterAllStopWords(String s) { s = filterStopWords(s, stopwords_en); s = filterStopWords(s, stopwords_de); @@ -221,7 +222,8 @@ public abstract class AbstractPaceFunctions { return s; } - protected Collection filterBlacklisted(final Collection set, final Set ngramBlacklist) { + protected static Collection filterBlacklisted(final Collection set, + final Set ngramBlacklist) { final Set newset = Sets.newLinkedHashSet(); for (final String s : set) { if (!ngramBlacklist.contains(s)) { @@ -268,7 +270,7 @@ public abstract class AbstractPaceFunctions { return m; } - public String removeKeywords(String s, Set keywords) { + public static String removeKeywords(String s, Set keywords) { s = " " + s + " "; for (String k : keywords) { @@ -278,39 +280,39 @@ public abstract class AbstractPaceFunctions { return s.trim(); } - public double commonElementsPercentage(Set s1, Set s2) { + public static double commonElementsPercentage(Set s1, Set s2) { double longer = Math.max(s1.size(), s2.size()); return (double) s1.stream().filter(s2::contains).count() / longer; } // convert the set of keywords to codes - public Set toCodes(Set keywords, Map translationMap) { + public static Set toCodes(Set keywords, Map translationMap) { return keywords.stream().map(s -> translationMap.get(s)).collect(Collectors.toSet()); } - public Set keywordsToCodes(Set keywords, Map translationMap) { + public static Set keywordsToCodes(Set keywords, Map translationMap) { return toCodes(keywords, translationMap); } - public Set citiesToCodes(Set keywords) { + public static Set citiesToCodes(Set keywords) { return toCodes(keywords, cityMap); } - protected String firstLC(final String s) { + protected static String firstLC(final String s) { return StringUtils.substring(s, 0, 1).toLowerCase(); } - protected Iterable tokens(final String s, final int maxTokens) { + protected static Iterable tokens(final String s, final int maxTokens) { return Iterables.limit(Splitter.on(" ").omitEmptyStrings().trimResults().split(s), maxTokens); } - public String normalizePid(String pid) { + public static String normalizePid(String pid) { return DOI_PREFIX.matcher(pid.toLowerCase()).replaceAll(""); } // get the list of keywords into the input string - public Set getKeywords(String s1, Map translationMap, int windowSize) { + public static Set getKeywords(String s1, Map translationMap, int windowSize) { String s = s1; @@ -340,7 +342,7 @@ public abstract class AbstractPaceFunctions { return codes; } - public Set getCities(String s1, int windowSize) { + public static Set getCities(String s1, int windowSize) { return getKeywords(s1, cityMap, windowSize); } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/ClusteringDef.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/ClusteringDef.java index d9ad81d42..5ede2c380 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/ClusteringDef.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/ClusteringDef.java @@ -18,7 +18,7 @@ public class ClusteringDef implements Serializable { private List fields; - private Map params; + private Map params; public ClusteringDef() { } @@ -43,11 +43,11 @@ public class ClusteringDef implements Serializable { this.fields = fields; } - public Map getParams() { + public Map getParams() { return params; } - public void setParams(final Map params) { + public void setParams(final Map params) { this.params = params; } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/FieldDef.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/FieldDef.java index f34545e6d..7ad9b7445 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/FieldDef.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/FieldDef.java @@ -2,6 +2,7 @@ package eu.dnetlib.pace.model; import java.io.Serializable; +import java.util.HashSet; import java.util.List; import com.fasterxml.jackson.core.JsonProcessingException; @@ -36,6 +37,16 @@ public class FieldDef implements Serializable { */ private int length = -1; + private HashSet filter; + + private boolean sorted; + + public boolean isSorted() { + return sorted; + } + + private String clean; + public FieldDef() { } @@ -91,6 +102,30 @@ public class FieldDef implements Serializable { this.path = path; } + public HashSet getFilter() { + return filter; + } + + public void setFilter(HashSet filter) { + this.filter = filter; + } + + public boolean getSorted() { + return sorted; + } + + public void setSorted(boolean sorted) { + this.sorted = sorted; + } + + public String getClean() { + return clean; + } + + public void setClean(String clean) { + this.clean = clean; + } + @Override public String toString() { try { diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDeduper.scala b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDeduper.scala index b3f56bcdb..bc702b9e2 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDeduper.scala +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDeduper.scala @@ -5,9 +5,9 @@ import eu.dnetlib.pace.util.{BlockProcessor, SparkReporter} import org.apache.spark.SparkContext import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.expressions._ -import org.apache.spark.sql.functions.{col, lit, udf} +import org.apache.spark.sql.functions.{col, desc, expr, lit, udf} import org.apache.spark.sql.types._ -import org.apache.spark.sql.{Column, Dataset, Row, functions} +import org.apache.spark.sql.{Column, Dataset, Row, SaveMode, functions} import java.util.function.Predicate import java.util.stream.Collectors @@ -80,6 +80,8 @@ case class SparkDeduper(conf: DedupConfig) extends Serializable { .withColumn("key", functions.explode(clusterValuesUDF(cd).apply(functions.array(inputColumns: _*)))) // Add position column having the position of the row within the set of rows having the same key value ordered by the sorting value .withColumn("position", functions.row_number().over(Window.partitionBy("key").orderBy(col(model.orderingFieldName), col(model.identifierFieldName)))) + // .withColumn("count", functions.max("position").over(Window.partitionBy("key").orderBy(col(model.orderingFieldName), col(model.identifierFieldName)).rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing) )) + // .filter("count > 1") if (df_with_clustering_keys == null) df_with_clustering_keys = ds @@ -88,20 +90,44 @@ case class SparkDeduper(conf: DedupConfig) extends Serializable { } //TODO: analytics + /*df_with_clustering_keys.groupBy(col("clustering"), col("key")) + .agg(expr("max(count) AS size")) + .orderBy(desc("size")) + .show*/ val df_with_blocks = df_with_clustering_keys - // filter out rows with position exceeding the maxqueuesize parameter - .filter(col("position").leq(conf.getWf.getQueueMaxSize)) - .groupBy("clustering", "key") + // split the clustering block into smaller blocks of queuemaxsize + .groupBy(col("clustering"), col("key"), functions.floor(col("position").divide(lit(conf.getWf.getQueueMaxSize)))) .agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block")) .filter(functions.size(new Column("block")).gt(1)) + .union( + //adjacency blocks + df_with_clustering_keys + // filter out leading and trailing elements + .filter(col("position").gt(conf.getWf.getSlidingWindowSize/2)) + //.filter(col("position").lt(col("count").minus(conf.getWf.getSlidingWindowSize/2))) + // create small blocks of records on "the border" of maxqueuesize: getSlidingWindowSize/2 elements before and after + .filter( + col("position").mod(conf.getWf.getQueueMaxSize).lt(conf.getWf.getSlidingWindowSize/2) // slice of the start of block + || col("position").mod(conf.getWf.getQueueMaxSize).gt(conf.getWf.getQueueMaxSize - (conf.getWf.getSlidingWindowSize/2)) //slice of the end of the block + ) + .groupBy(col("clustering"), col("key"), functions.floor((col("position") + lit(conf.getWf.getSlidingWindowSize/2)).divide(lit(conf.getWf.getQueueMaxSize)))) + .agg(functions.collect_set(functions.struct(model.schema.fieldNames.map(col): _*)).as("block")) + .filter(functions.size(new Column("block")).gt(1)) + ) df_with_blocks } def clusterValuesUDF(cd: ClusteringDef) = { udf[mutable.WrappedArray[String], mutable.WrappedArray[Any]](values => { - values.flatMap(f => cd.clusteringFunction().apply(conf, Seq(f.toString).asJava).asScala) + val valueList = values.flatMap { + case a: mutable.WrappedArray[Any] => a.map(_.toString) + case s: Any => Seq(s.toString) + }.asJava; + + mutable.WrappedArray.make(cd.clusteringFunction().apply(conf, valueList).toArray()) + }) } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala index aa997c6e9..aa04188da 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala @@ -1,13 +1,16 @@ package eu.dnetlib.pace.model import com.jayway.jsonpath.{Configuration, JsonPath} +import eu.dnetlib.pace.common.AbstractPaceFunctions import eu.dnetlib.pace.config.{DedupConfig, Type} import eu.dnetlib.pace.util.MapDocumentUtil +import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} import org.apache.spark.sql.{Dataset, Row} +import java.util.Locale import java.util.regex.Pattern import scala.collection.JavaConverters._ @@ -60,7 +63,7 @@ case class SparkModel(conf: DedupConfig) { values(identityFieldPosition) = MapDocumentUtil.getJPathString(conf.getWf.getIdPath, documentContext) schema.fieldNames.zipWithIndex.foldLeft(values) { - case ((res, (fname, index))) => { + case ((res, (fname, index))) => val fdef = conf.getPace.getModelMap.get(fname) if (fdef != null) { @@ -96,13 +99,52 @@ case class SparkModel(conf: DedupConfig) { case Type.DoubleArray => MapDocumentUtil.getJPathArray(fdef.getPath, json) } + + val filter = fdef.getFilter + + if (StringUtils.isNotBlank(fdef.getClean)) { + res(index) = res(index) match { + case x: Seq[String] => x.map(clean(_, fdef.getClean)).toSeq + case _ => clean(res(index).toString, fdef.getClean) + } + } + + if (filter != null && !filter.isEmpty) { + res(index) = res(index) match { + case x: String if filter.contains(x.toLowerCase(Locale.ROOT)) => null + case x: Seq[String] => x.filter(s => !filter.contains(s.toLowerCase(Locale.ROOT))).toSeq + case _ => res(index) + } + } + + if (fdef.getSorted) { + res(index) = res(index) match { + case x: Seq[String] => x.sorted.toSeq + case _ => res(index) + } + } } res - } } new GenericRowWithSchema(values, schema) } + + def clean(value: String, cleantype: String) : String = { + val res = cleantype match { + case "title" => AbstractPaceFunctions.cleanup(value) + case _ => value + } + +// if (!res.equals(AbstractPaceFunctions.normalize(value))) { +// println(res) +// println(AbstractPaceFunctions.normalize(value)) +// println() +// } + + res + } + } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/AuthorsMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/AuthorsMatch.java index 5c6939e60..edad0ae2e 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/AuthorsMatch.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/AuthorsMatch.java @@ -23,7 +23,6 @@ public class AuthorsMatch extends AbstractListComparator { private String MODE; // full or surname private int SIZE_THRESHOLD; private String TYPE; // count or percentage - private int common; public AuthorsMatch(Map params) { super(params, new com.wcohen.ss.JaroWinkler()); @@ -35,7 +34,6 @@ public class AuthorsMatch extends AbstractListComparator { FULLNAME_THRESHOLD = Double.parseDouble(params.getOrDefault("fullname_th", "0.9")); SIZE_THRESHOLD = Integer.parseInt(params.getOrDefault("size_th", "20")); TYPE = params.getOrDefault("type", "percentage"); - common = 0; } protected AuthorsMatch(double w, AbstractStringDistance ssalgo) { @@ -44,22 +42,27 @@ public class AuthorsMatch extends AbstractListComparator { @Override public double compare(final List a, final List b, final Config conf) { - if (a.isEmpty() || b.isEmpty()) return -1; if (a.size() > SIZE_THRESHOLD || b.size() > SIZE_THRESHOLD) return 1.0; - List aList = a.stream().map(author -> new Person(author, false)).collect(Collectors.toList()); + int maxMiss = Integer.MAX_VALUE; List bList = b.stream().map(author -> new Person(author, false)).collect(Collectors.toList()); - common = 0; + Double threshold = getDoubleParam("threshold"); + + if (threshold != null && threshold >= 0.0 && threshold <= 1.0 && a.size() == b.size()) { + maxMiss = (int) Math.floor((1 - threshold) * Math.max(a.size(), b.size())); + } + + int common = 0; // compare each element of List1 with each element of List2 - for (Person p1 : aList) + for (int i = 0; i < a.size(); i++) { + Person p1 = new Person(a.get(i), false); for (Person p2 : bList) { - // both persons are inaccurate if (!p1.isAccurate() && !p2.isAccurate()) { // compare just normalized fullnames @@ -118,11 +121,15 @@ public class AuthorsMatch extends AbstractListComparator { } } - } + if (i - common > maxMiss) { + return 0.0; + } + } + // normalization factor to compute the score - int normFactor = aList.size() == bList.size() ? aList.size() : (aList.size() + bList.size() - common); + int normFactor = a.size() == b.size() ? a.size() : (a.size() + b.size() - common); if (TYPE.equals("percentage")) { return (double) common / normFactor; diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/InstanceTypeMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/InstanceTypeMatch.java index 238cb16ce..34ebcf7a7 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/InstanceTypeMatch.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/InstanceTypeMatch.java @@ -25,6 +25,7 @@ public class InstanceTypeMatch extends AbstractListComparator { translationMap.put("Conference object", "*"); translationMap.put("Other literature type", "*"); translationMap.put("Unknown", "*"); + translationMap.put("UNKNOWN", "*"); // article types translationMap.put("Article", "Article"); @@ -76,5 +77,4 @@ public class InstanceTypeMatch extends AbstractListComparator { protected double normalize(final double d) { return d; } - } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/LevensteinTitle.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/LevensteinTitle.java index 877cb95ab..e2ee062b5 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/LevensteinTitle.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/LevensteinTitle.java @@ -3,6 +3,7 @@ package eu.dnetlib.pace.tree; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,16 +31,25 @@ public class LevensteinTitle extends AbstractStringComparator { } @Override - public double distance(final String a, final String b, final Config conf) { - final String ca = cleanup(a); - final String cb = cleanup(b); - + public double distance(final String ca, final String cb, final Config conf) { final boolean check = checkNumbers(ca, cb); if (check) return 0.5; - return normalize(ssalgo.score(ca, cb), ca.length(), cb.length()); + Double threshold = getDoubleParam("threshold"); + + // reduce Levenshtein algo complexity when target threshold is known + if (threshold != null && threshold >= 0.0 && threshold <= 1.0) { + int maxdistance = (int) Math.floor((1 - threshold) * Math.max(ca.length(), cb.length())); + int score = StringUtils.getLevenshteinDistance(ca, cb, maxdistance); + if (score == -1) { + return 0; + } + return normalize(score, ca.length(), cb.length()); + } else { + return normalize(StringUtils.getLevenshteinDistance(ca, cb), ca.length(), cb.length()); + } } private double normalize(final double score, final int la, final int lb) { diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/MaxLengthMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/MaxLengthMatch.java new file mode 100644 index 000000000..8f525c6d5 --- /dev/null +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/MaxLengthMatch.java @@ -0,0 +1,29 @@ + +package eu.dnetlib.pace.tree; + +import java.util.Map; + +import eu.dnetlib.pace.config.Config; +import eu.dnetlib.pace.tree.support.AbstractStringComparator; +import eu.dnetlib.pace.tree.support.ComparatorClass; + +@ComparatorClass("maxLengthMatch") +public class MaxLengthMatch extends AbstractStringComparator { + + private final int limit; + + public MaxLengthMatch(Map params) { + super(params); + + limit = Integer.parseInt(params.getOrDefault("limit", "200")); + } + + @Override + public double compare(String a, String b, final Config conf) { + return a.length() < limit && b.length() < limit ? 1.0 : -1.0; + } + + protected String toString(final Object object) { + return toFirstString(object); + } +} diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/support/AbstractComparator.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/support/AbstractComparator.java index 8a957c5e3..cde73fd2b 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/support/AbstractComparator.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/support/AbstractComparator.java @@ -127,4 +127,14 @@ public abstract class AbstractComparator extends AbstractPaceFunctions implem return this.weight; } + public Double getDoubleParam(String name) { + String svalue = params.get(name); + + try { + return Double.parseDouble(svalue); + } catch (Throwable t) { + } + + return null; + } } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java index c2b0ddda7..177ad73df 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java @@ -67,8 +67,10 @@ public class BlockProcessor { private void processRows(final List queue, final Reporter context) { - for (int pivotPos = 0; pivotPos < queue.size(); pivotPos++) { - final Row pivot = queue.get(pivotPos); + IncrementalConnectedComponents icc = new IncrementalConnectedComponents(queue.size()); + + for (int i = 0; i < queue.size(); i++) { + final Row pivot = queue.get(i); final String idPivot = pivot.getString(identifierFieldPos); // identifier final Object fieldsPivot = getJavaValue(pivot, orderFieldPos); @@ -76,9 +78,9 @@ public class BlockProcessor { final WfConfig wf = dedupConf.getWf(); if (fieldPivot != null) { - int i = 0; - for (int windowPos = pivotPos + 1; windowPos < queue.size(); windowPos++) { - final Row curr = queue.get(windowPos); + for (int j = icc.nextUnconnected(i, i + 1); j >= 0 + && j < queue.size(); j = icc.nextUnconnected(i, j + 1)) { + final Row curr = queue.get(j); final String idCurr = curr.getString(identifierFieldPos); // identifier if (mustSkip(idCurr)) { @@ -86,7 +88,7 @@ public class BlockProcessor { break; } - if (++i > wf.getSlidingWindowSize()) { + if (wf.getSlidingWindowSize() > 0 && (j - i) > wf.getSlidingWindowSize()) { break; } @@ -97,7 +99,9 @@ public class BlockProcessor { final TreeProcessor treeProcessor = new TreeProcessor(dedupConf); - emitOutput(treeProcessor.compare(pivot, curr), idPivot, idCurr, context); + if (emitOutput(treeProcessor.compare(pivot, curr), idPivot, idCurr, context)) { + icc.connect(i, j); + } } } } @@ -115,7 +119,8 @@ public class BlockProcessor { return null; } - private void emitOutput(final boolean result, final String idPivot, final String idCurr, final Reporter context) { + private boolean emitOutput(final boolean result, final String idPivot, final String idCurr, + final Reporter context) { if (result) { if (idPivot.compareTo(idCurr) <= 0) { @@ -127,6 +132,8 @@ public class BlockProcessor { } else { context.incrementCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold(), 1); } + + return result; } private boolean mustSkip(final String idPivot) { @@ -142,5 +149,4 @@ public class BlockProcessor { context.emit(type, from, to); } - } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/IncrementalConnectedComponents.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/IncrementalConnectedComponents.java new file mode 100644 index 000000000..ed35239a8 --- /dev/null +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/IncrementalConnectedComponents.java @@ -0,0 +1,50 @@ + +package eu.dnetlib.pace.util; + +import java.util.BitSet; + +public class IncrementalConnectedComponents { + final private int size; + + final private BitSet[] indexes; + + IncrementalConnectedComponents(int size) { + this.size = size; + this.indexes = new BitSet[size]; + } + + public void connect(int i, int j) { + if (indexes[i] == null) { + if (indexes[j] == null) { + indexes[i] = new BitSet(size); + } else { + indexes[i] = indexes[j]; + } + } else { + if (indexes[j] != null && indexes[i] != indexes[j]) { + // merge adjacency lists for i and j + indexes[i].or(indexes[j]); + } + } + + indexes[i].set(i); + indexes[i].set(j); + indexes[j] = indexes[i]; + } + + public int nextUnconnected(int i, int j) { + if (indexes[i] == null) { + return j; + } + int result = indexes[i].nextClearBit(j); + + return (result >= size) ? -1 : result; + } + + public BitSet getConnections(int i) { + if (indexes[i] == null) { + return null; + } + return indexes[i]; + } +} diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/MapDocumentUtil.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/MapDocumentUtil.java index 28244cb3b..7dc340663 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/MapDocumentUtil.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/MapDocumentUtil.java @@ -97,6 +97,8 @@ public class MapDocumentUtil { Object o = json.read(jsonPath); if (o instanceof String) return (String) o; + if (o instanceof Number) + return (String) o.toString(); if (o instanceof JSONArray && ((JSONArray) o).size() > 0) return (String) ((JSONArray) o).get(0); return ""; diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/PaceResolver.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/PaceResolver.java index 252205c79..746892f0c 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/PaceResolver.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/PaceResolver.java @@ -40,7 +40,7 @@ public class PaceResolver implements Serializable { Collectors.toMap(cl -> cl.getAnnotation(ComparatorClass.class).value(), cl -> (Class) cl)); } - public ClusteringFunction getClusteringFunction(String name, Map params) throws PaceException { + public ClusteringFunction getClusteringFunction(String name, Map params) throws PaceException { try { return clusteringFunctions.get(name).getDeclaredConstructor(Map.class).newInstance(params); } catch (InstantiationException | IllegalAccessException | InvocationTargetException diff --git a/dhp-pace-core/src/test/java/eu/dnetlib/pace/clustering/ClusteringFunctionTest.java b/dhp-pace-core/src/test/java/eu/dnetlib/pace/clustering/ClusteringFunctionTest.java index f9a1ea9e2..80e349a3f 100644 --- a/dhp-pace-core/src/test/java/eu/dnetlib/pace/clustering/ClusteringFunctionTest.java +++ b/dhp-pace-core/src/test/java/eu/dnetlib/pace/clustering/ClusteringFunctionTest.java @@ -15,7 +15,7 @@ import eu.dnetlib.pace.config.DedupConfig; public class ClusteringFunctionTest extends AbstractPaceTest { - private static Map params; + private static Map params; private static DedupConfig conf; @BeforeAll @@ -40,10 +40,10 @@ public class ClusteringFunctionTest extends AbstractPaceTest { @Test public void testNgram() { - params.put("ngramLen", 3); - params.put("max", 8); - params.put("maxPerToken", 2); - params.put("minNgramLen", 1); + params.put("ngramLen", "3"); + params.put("max", "8"); + params.put("maxPerToken", "2"); + params.put("minNgramLen", "1"); final ClusteringFunction ngram = new Ngrams(params); @@ -54,8 +54,8 @@ public class ClusteringFunctionTest extends AbstractPaceTest { @Test public void testNgramPairs() { - params.put("ngramLen", 3); - params.put("max", 2); + params.put("ngramLen", "3"); + params.put("max", "2"); final ClusteringFunction np = new NgramPairs(params); @@ -66,8 +66,8 @@ public class ClusteringFunctionTest extends AbstractPaceTest { @Test public void testSortedNgramPairs() { - params.put("ngramLen", 3); - params.put("max", 2); + params.put("ngramLen", "3"); + params.put("max", "2"); final ClusteringFunction np = new SortedNgramPairs(params); @@ -87,9 +87,9 @@ public class ClusteringFunctionTest extends AbstractPaceTest { @Test public void testAcronym() { - params.put("max", 4); - params.put("minLen", 1); - params.put("maxLen", 3); + params.put("max", "4"); + params.put("minLen", "1"); + params.put("maxLen", "3"); final ClusteringFunction acro = new Acronyms(params); @@ -100,8 +100,8 @@ public class ClusteringFunctionTest extends AbstractPaceTest { @Test public void testSuffixPrefix() { - params.put("len", 3); - params.put("max", 4); + params.put("len", "3"); + params.put("max", "4"); final ClusteringFunction sp = new SuffixPrefix(params); @@ -109,8 +109,8 @@ public class ClusteringFunctionTest extends AbstractPaceTest { System.out.println(s); System.out.println(sp.apply(conf, Lists.newArrayList(s))); - params.put("len", 3); - params.put("max", 1); + params.put("len", "3"); + params.put("max", "1"); System.out.println(sp.apply(conf, Lists.newArrayList("Framework for general-purpose deduplication"))); } @@ -118,8 +118,8 @@ public class ClusteringFunctionTest extends AbstractPaceTest { @Test public void testWordsSuffixPrefix() { - params.put("len", 3); - params.put("max", 4); + params.put("len", "3"); + params.put("max", "4"); final ClusteringFunction sp = new WordsSuffixPrefix(params); @@ -130,7 +130,7 @@ public class ClusteringFunctionTest extends AbstractPaceTest { @Test public void testWordsStatsSuffixPrefix() { - params.put("mod", 10); + params.put("mod", "10"); final ClusteringFunction sp = new WordsStatsSuffixPrefixChain(params); @@ -167,7 +167,7 @@ public class ClusteringFunctionTest extends AbstractPaceTest { @Test public void testFieldValue() { - params.put("randomLength", 5); + params.put("randomLength", "5"); final ClusteringFunction sp = new SpaceTrimmingFieldValue(params); diff --git a/dhp-pace-core/src/test/java/eu/dnetlib/pace/util/IncrementalConnectedComponentsTest.java b/dhp-pace-core/src/test/java/eu/dnetlib/pace/util/IncrementalConnectedComponentsTest.java new file mode 100644 index 000000000..b0f105d7c --- /dev/null +++ b/dhp-pace-core/src/test/java/eu/dnetlib/pace/util/IncrementalConnectedComponentsTest.java @@ -0,0 +1,40 @@ + +package eu.dnetlib.pace.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.junit.jupiter.api.Test; + +public class IncrementalConnectedComponentsTest { + + @Test + public void transitiveClosureTest() { + IncrementalConnectedComponents icc = new IncrementalConnectedComponents(10); + + icc.connect(0, 1); + icc.connect(0, 2); + icc.connect(0, 3); + + icc.connect(1, 2); + icc.connect(1, 4); + icc.connect(1, 5); + + icc.connect(6, 7); + icc.connect(6, 9); + + assertEquals(icc.getConnections(0).toString(), "{0, 1, 2, 3, 4, 5}"); + assertEquals(icc.getConnections(1).toString(), "{0, 1, 2, 3, 4, 5}"); + assertEquals(icc.getConnections(2).toString(), "{0, 1, 2, 3, 4, 5}"); + assertEquals(icc.getConnections(3).toString(), "{0, 1, 2, 3, 4, 5}"); + assertEquals(icc.getConnections(4).toString(), "{0, 1, 2, 3, 4, 5}"); + assertEquals(icc.getConnections(5).toString(), "{0, 1, 2, 3, 4, 5}"); + + assertEquals(icc.getConnections(6).toString(), "{6, 7, 9}"); + assertEquals(icc.getConnections(7).toString(), "{6, 7, 9}"); + assertEquals(icc.getConnections(9).toString(), "{6, 7, 9}"); + + assertNull(icc.getConnections(8)); + } + +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java index 68af3d699..0af7bb6d0 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java @@ -101,6 +101,10 @@ abstract class AbstractSparkAction implements Serializable { return SparkSession.builder().config(conf).getOrCreate(); } + protected static SparkSession getSparkWithHiveSession(SparkConf conf) { + return SparkSession.builder().enableHiveSupport().config(conf).getOrCreate(); + } + protected static void save(Dataset dataset, String outPath, SaveMode mode) { dataset.write().option("compression", "gzip").mode(mode).json(outPath); } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index 60669106a..d9fb24078 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -2,20 +2,19 @@ package eu.dnetlib.dhp.oa.dedup; import java.lang.reflect.InvocationTargetException; -import java.util.*; -import java.util.stream.Collectors; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; -import org.apache.commons.beanutils.BeanUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; import eu.dnetlib.dhp.oa.dedup.model.Identifier; import eu.dnetlib.dhp.oa.merge.AuthorMerger; @@ -41,88 +40,91 @@ public class DedupRecordFactory { long ts = System.currentTimeMillis(); // - Dataset> entities = spark + Dataset entities = spark .read() - .textFile(entitiesInputPath) + .schema(Encoders.bean(clazz).schema()) + .json(entitiesInputPath) + .as(Encoders.bean(clazz)) .map( - (MapFunction>) it -> { - T entity = OBJECT_MAPPER.readValue(it, clazz); + (MapFunction>) entity -> { return new Tuple2<>(entity.getId(), entity); }, - Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))); + Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))) + .selectExpr("_1 AS id", "_2 AS kryoObject"); // : source is the dedup_id, target is the id of the mergedIn - Dataset> mergeRels = spark + Dataset mergeRels = spark .read() .load(mergeRelsInputPath) - .as(Encoders.bean(Relation.class)) .where("relClass == 'merges'") - .map( - (MapFunction>) r -> new Tuple2<>(r.getSource(), r.getTarget()), - Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + .selectExpr("source as dedupId", "target as id"); return mergeRels - .joinWith(entities, mergeRels.col("_2").equalTo(entities.col("_1")), "inner") + .join(entities, "id") + .select("dedupId", "kryoObject") + .as(Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))) + .groupByKey((MapFunction, String>) Tuple2::_1, Encoders.STRING()) + .reduceGroups( + (ReduceFunction>) (t1, t2) -> new Tuple2<>(t1._1(), + reduceEntity(t1._1(), t1._2(), t2._2(), clazz))) .map( - (MapFunction, Tuple2>, Tuple2>) value -> new Tuple2<>( - value._1()._1(), value._2()._2()), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))) - .groupByKey( - (MapFunction, String>) Tuple2::_1, Encoders.STRING()) - .mapGroups( - (MapGroupsFunction, T>) (key, - values) -> entityMerger(key, values, ts, dataInfo, clazz), + (MapFunction>, T>) t -> { + T res = t._2()._2(); + res.setDataInfo(dataInfo); + res.setLastupdatetimestamp(ts); + return res; + }, Encoders.bean(clazz)); } + public static T reduceEntity( + String id, T entity, T duplicate, Class clazz) { + + int compare = new IdentifierComparator() + .compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate)); + + if (compare > 0) { + T swap = duplicate; + duplicate = entity; + entity = swap; + } + + entity.mergeFrom(duplicate); + entity.setId(id); + + if (ModelSupport.isSubClass(duplicate, Result.class)) { + Result re = (Result) entity; + Result rd = (Result) duplicate; + + List> authors = new ArrayList<>(); + if (re.getAuthor() != null) { + authors.add(re.getAuthor()); + } + if (rd.getAuthor() != null) { + authors.add(rd.getAuthor()); + } + + re.setAuthor(AuthorMerger.merge(authors)); + } + + return entity; + } + public static T entityMerger( String id, Iterator> entities, long ts, DataInfo dataInfo, Class clazz) throws IllegalAccessException, InstantiationException, InvocationTargetException { + T base = entities.next()._2(); - final Comparator> idComparator = new IdentifierComparator<>(); - - final LinkedList entityList = Lists - .newArrayList(entities) - .stream() - .map(t -> Identifier.newInstance(t._2())) - .sorted(idComparator) - .map(Identifier::getEntity) - .collect(Collectors.toCollection(LinkedList::new)); - - final T entity = clazz.newInstance(); - final T first = entityList.removeFirst(); - - BeanUtils.copyProperties(entity, first); - - final List> authors = Lists.newArrayList(); - - entityList - .forEach( - duplicate -> { - entity.mergeFrom(duplicate); - if (ModelSupport.isSubClass(duplicate, Result.class)) { - Result r1 = (Result) duplicate; - Optional - .ofNullable(r1.getAuthor()) - .ifPresent(a -> authors.add(a)); - } - }); - - // set authors and date - if (ModelSupport.isSubClass(entity, Result.class)) { - Optional - .ofNullable(((Result) entity).getAuthor()) - .ifPresent(a -> authors.add(a)); - - ((Result) entity).setAuthor(AuthorMerger.merge(authors)); + while (entities.hasNext()) { + T duplicate = entities.next()._2(); + if (duplicate != null) + base = reduceEntity(id, base, duplicate, clazz); } - entity.setId(id); + base.setDataInfo(dataInfo); + base.setLastupdatetimestamp(ts); - entity.setLastupdatetimestamp(ts); - entity.setDataInfo(dataInfo); - - return entity; + return base; } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java index 7e0d66062..37e1bfd15 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.oa.dedup; +import static eu.dnetlib.dhp.utils.DHPUtils.md5; import static org.apache.commons.lang3.StringUtils.substringAfter; import static org.apache.commons.lang3.StringUtils.substringBefore; @@ -14,33 +15,36 @@ import eu.dnetlib.dhp.schema.oaf.utils.PidType; public class IdGenerator implements Serializable { // pick the best pid from the list (consider date and pidtype) - public static String generate(List> pids, String defaultID) { + public static String generate(List pids, String defaultID) { if (pids == null || pids.isEmpty()) return defaultID; return generateId(pids); } - private static String generateId(List> pids) { - Identifier bp = pids + private static String generateId(List pids) { + Identifier bp = pids .stream() .min(Identifier::compareTo) .orElseThrow(() -> new IllegalStateException("unable to generate id")); - String prefix = substringBefore(bp.getOriginalID(), "|"); - String ns = substringBefore(substringAfter(bp.getOriginalID(), "|"), "::"); - String suffix = substringAfter(bp.getOriginalID(), "::"); + return generate(bp.getOriginalID()); + } + + public static String generate(String originalId) { + String prefix = substringBefore(originalId, "|"); + String ns = substringBefore(substringAfter(originalId, "|"), "::"); + String suffix = substringAfter(originalId, "::"); final String pidType = substringBefore(ns, "_"); if (PidType.isValid(pidType)) { return prefix + "|" + dedupify(ns) + "::" + suffix; } else { - return prefix + "|dedup_wf_001::" + suffix; + return prefix + "|dedup_wf_001::" + md5(originalId); // hash the whole originalId to avoid collisions } } private static String dedupify(String ns) { - StringBuilder prefix; if (PidType.valueOf(substringBefore(ns, "_")) == PidType.openorgs) { prefix = new StringBuilder(substringBefore(ns, "_")); @@ -53,5 +57,4 @@ public class IdGenerator implements Serializable { } return prefix.substring(0, 12); } - } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index babbaaabd..5bb132b89 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -1,51 +1,47 @@ package eu.dnetlib.dhp.oa.dedup; -import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS; -import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP; - -import java.io.IOException; -import java.util.*; -import java.util.stream.Collectors; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.MapGroupsFunction; -import org.apache.spark.graphx.Edge; -import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.dom4j.DocumentException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.xml.sax.SAXException; - -import com.google.common.collect.Lists; import com.google.common.hash.Hashing; - +import com.kwartile.lib.cc.ConnectedComponent; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent; -import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor; -import eu.dnetlib.dhp.oa.dedup.model.Identifier; import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.DataInfo; -import eu.dnetlib.dhp.schema.oaf.OafEntity; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.PidType; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.pace.config.DedupConfig; -import eu.dnetlib.pace.util.MapDocumentUtil; -import scala.Tuple2; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.*; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; +import org.apache.spark.sql.expressions.UserDefinedFunction; +import org.apache.spark.sql.expressions.Window; +import org.apache.spark.sql.expressions.WindowSpec; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; +import org.dom4j.DocumentException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xml.sax.SAXException; +import scala.Tuple3; +import scala.collection.JavaConversions; + +import java.io.IOException; +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS; +import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP; +import static org.apache.spark.sql.functions.*; public class SparkCreateMergeRels extends AbstractSparkAction { @@ -68,10 +64,12 @@ public class SparkCreateMergeRels extends AbstractSparkAction { log.info("isLookupUrl {}", isLookUpUrl); SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris")); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - new SparkCreateMergeRels(parser, getSparkSession(conf)) + new SparkCreateMergeRels(parser, getSparkWithHiveSession(conf)) .run(ISLookupClientFactory.getLookUpService(isLookUpUrl)); } @@ -87,14 +85,15 @@ public class SparkCreateMergeRels extends AbstractSparkAction { .ofNullable(parser.get("cutConnectedComponent")) .map(Integer::valueOf) .orElse(0); + + final String pivotHistoryDatabase = parser.get("pivotHistoryDatabase"); + log.info("connected component cut: '{}'", cut); log.info("graphBasePath: '{}'", graphBasePath); log.info("isLookUpUrl: '{}'", isLookUpUrl); log.info("actionSetId: '{}'", actionSetId); log.info("workingPath: '{}'", workingPath); - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) { final String subEntity = dedupConf.getWf().getSubEntityValue(); final Class clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity)); @@ -106,113 +105,170 @@ public class SparkCreateMergeRels extends AbstractSparkAction { final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity); - // - JavaPairRDD vertexes = createVertexes(sc, graphBasePath, subEntity, dedupConf); - - final RDD> edgeRdd = spark + final Dataset simRels = spark .read() .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)) - .as(Encoders.bean(Relation.class)) - .javaRDD() - .map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass())) - .rdd(); + .select("source", "target"); - Dataset> rawMergeRels = spark - .createDataset( - GraphProcessor - .findCCs(vertexes.rdd(), edgeRdd, maxIterations, cut) - .toJavaRDD() - .filter(k -> k.getIds().size() > 1) - .flatMap(this::ccToRels) - .rdd(), - Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + UserDefinedFunction hashUDF = functions + .udf( + (String s) -> hash(s), DataTypes.LongType); - Dataset> entities = spark + // + Dataset vertexIdMap = simRels + .selectExpr("source as id") + .union(simRels.selectExpr("target as id")) + .distinct() + .withColumn("vertexId", hashUDF.apply(functions.col("id"))); + + final Dataset edges = spark .read() - .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) - .map( - (MapFunction>) it -> { - OafEntity entity = OBJECT_MAPPER.readValue(it, clazz); - return new Tuple2<>(entity.getId(), entity); - }, - Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))); + .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)) + .select("source", "target") + .withColumn("source", hashUDF.apply(functions.col("source"))) + .withColumn("target", hashUDF.apply(functions.col("target"))); - Dataset mergeRels = rawMergeRels - .joinWith(entities, rawMergeRels.col("_2").equalTo(entities.col("_1")), "inner") - // , - .map( - (MapFunction, Tuple2>, Tuple2>) value -> new Tuple2<>( - value._1()._1(), value._2()._2()), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))) - // - .groupByKey( - (MapFunction, String>) Tuple2::_1, Encoders.STRING()) - .mapGroups( - (MapGroupsFunction, ConnectedComponent>) this::generateID, - Encoders.bean(ConnectedComponent.class)) - // + Dataset cliques = ConnectedComponent + .runOnPairs(edges, 50, spark); + + Dataset rawMergeRels = cliques + .join(vertexIdMap, JavaConversions.asScalaBuffer(Collections.singletonList("vertexId")), "inner") + .drop("vertexId") + .distinct(); + + Dataset pivotHistory = spark + .createDataset( + Collections.emptyList(), + RowEncoder + .apply(StructType.fromDDL("id STRING, firstUsage STRING, lastUsage STRING, dedupId STRING"))); + + if (StringUtils.isNotBlank(pivotHistoryDatabase)) { + pivotHistory = spark + .read() + .table(pivotHistoryDatabase + "." + subEntity) + .selectExpr("id", "lastUsage", "dedupId"); + } + + String collectedfromExpr = "false AS collectedfrom"; + String dateExpr = "'' AS date"; + + if (Result.class.isAssignableFrom(clazz)) { + if (Publication.class.isAssignableFrom(clazz)) { + collectedfromExpr = "array_contains(collectedfrom.key, '" + ModelConstants.CROSSREF_ID + + "') AS collectedfrom"; + } else if (eu.dnetlib.dhp.schema.oaf.Dataset.class.isAssignableFrom(clazz)) { + collectedfromExpr = "array_contains(collectedfrom.key, '" + ModelConstants.DATACITE_ID + + "') AS collectedfrom"; + } + + dateExpr = "dateofacceptance.value AS date"; + } + + UserDefinedFunction mapPid = udf( + (String s) -> Math.min(PidType.tryValueOf(s).ordinal(), PidType.w3id.ordinal()), DataTypes.IntegerType); + UserDefinedFunction validDate = udf((String date) -> { + if (StringUtils.isNotBlank(date) + && date.matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(date)) { + return date; + } + return LocalDate.now().plusWeeks(1).toString(); + }, DataTypes.StringType); + + Dataset pivotingData = spark + .read() + .schema(Encoders.bean(clazz).schema()) + .json(DedupUtility.createEntityPath(graphBasePath, subEntity)) + .selectExpr( + "id", + "regexp_extract(id, '^\\\\d+\\\\|([^_]+).*::', 1) AS pidType", + collectedfromExpr, + dateExpr) + .withColumn("pidType", mapPid.apply(col("pidType"))) // ordinal of pid type + .withColumn("date", validDate.apply(col("date"))); + + UserDefinedFunction generateDedupId = udf((String s) -> IdGenerator.generate(s), DataTypes.StringType); + + // ordering to selected pivot id + WindowSpec w = Window + .partitionBy("groupId") + .orderBy( + col("lastUsage").desc_nulls_last(), + col("pidType").asc_nulls_last(), + col("collectedfrom").desc_nulls_last(), + col("date").asc_nulls_last(), + col("id").asc_nulls_last()); + + Dataset output = rawMergeRels + .join(pivotHistory, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "full") + .join(pivotingData, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") + .withColumn("pivot", functions.first("id").over(w)) + .withColumn("pivotDedupId", functions.first("dedupId").over(w)) + .withColumn("position", functions.row_number().over(w)) + .filter(cut > 0 ? col("position").lt(lit(cut)) : lit(true)) + // .select("id", "groupId", "collectedfrom", "pivot", "dedupId", "pivotDedupId") + // .distinct() .flatMap( - (FlatMapFunction) cc -> ccToMergeRel(cc, dedupConf), - Encoders.bean(Relation.class)); + (FlatMapFunction>) (Row r) -> { + String id = r.getAs("id"); + String pivot = r.getAs("pivot"); + String pivotDedupId = r.getAs("pivotDedupId"); // dedupId associated with the pivot + String dedupId = r.getAs("dedupId"); // dedupId associated with this id if it was a pivot - saveParquet(mergeRels, mergeRelPath, SaveMode.Overwrite); + // filter out id == pivotDedupId + // those are caused by claim expressed on pivotDedupId + // information will be merged after creating deduprecord + if (id.equals(pivotDedupId)) { + return Collections.emptyIterator(); + } + ArrayList> res = new ArrayList<>(); + + // singleton pivots have null groupId as they do not match rawMergeRels + if (r.isNullAt(r.fieldIndex("groupId"))) { + // the record is existing if it matches pivotingData + if (!r.isNullAt(r.fieldIndex("collectedfrom"))) { + // create relation with old dedup id + res.add(new Tuple3<>(id, dedupId, null)); + } + return res.iterator(); + } + + // new pivot, assign pivotDedupId with current IdGenerator + if (StringUtils.isBlank(pivotDedupId)) { + pivotDedupId = IdGenerator.generate(pivot); + } + + // this was a pivot in a preceding graph but it has been merged into a new group with different + // pivot + if (StringUtils.isNotBlank(dedupId) && !pivot.equals(id) && !dedupId.equals(pivotDedupId)) { + // materialize the previous dedup record as a merge relation with the new one + res.add(new Tuple3<>(dedupId, pivotDedupId, null)); + } + + // add merge relations + res.add(new Tuple3<>(id, pivotDedupId, pivot)); + + return res.iterator(); + }, Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING())) + .distinct() + .flatMap( + (FlatMapFunction, Relation>) (Tuple3 r) -> { + String id = r._1(); + String dedupId = r._2(); + String pivot = r._3(); + + ArrayList res = new ArrayList<>(); + res.add(rel(pivot, dedupId, id, ModelConstants.MERGES, dedupConf)); + res.add(rel(pivot, id, dedupId, ModelConstants.IS_MERGED_IN, dedupConf)); + + return res.iterator(); + }, Encoders.bean(Relation.class)); + + saveParquet(output, mergeRelPath, SaveMode.Overwrite); } } - private ConnectedComponent generateID(String key, Iterator> values) { - - List> identifiers = Lists - .newArrayList(values) - .stream() - .map(v -> Identifier.newInstance(v._2())) - .collect(Collectors.toList()); - - String rootID = IdGenerator.generate(identifiers, key); - - if (Objects.equals(rootID, key)) - throw new IllegalStateException("generated default ID: " + rootID); - - return new ConnectedComponent(rootID, - identifiers.stream().map(i -> i.getEntity().getId()).collect(Collectors.toSet())); - } - - private JavaPairRDD createVertexes(JavaSparkContext sc, String graphBasePath, String subEntity, - DedupConfig dedupConf) { - - return sc - .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) - .mapToPair(json -> { - String id = MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), json); - return new Tuple2<>(hash(id), id); - }); - } - - private Iterator> ccToRels(ConnectedComponent cc) { - return cc - .getIds() - .stream() - .map(id -> new Tuple2<>(cc.getCcId(), id)) - .iterator(); - } - - private Iterator ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf) { - return cc - .getIds() - .stream() - .flatMap( - id -> { - List tmp = new ArrayList<>(); - - tmp.add(rel(cc.getCcId(), id, ModelConstants.MERGES, dedupConf)); - tmp.add(rel(id, cc.getCcId(), ModelConstants.IS_MERGED_IN, dedupConf)); - - return tmp.stream(); - }) - .iterator(); - } - - private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) { + private static Relation rel(String pivot, String source, String target, String relClass, DedupConfig dedupConf) { String entityType = dedupConf.getWf().getEntityType(); @@ -238,6 +294,14 @@ public class SparkCreateMergeRels extends AbstractSparkAction { // TODO calculate the trust value based on the similarity score of the elements in the CC r.setDataInfo(info); + + if (pivot != null) { + KeyValue pivotKV = new KeyValue(); + pivotKV.setKey("pivot"); + pivotKV.setValue(pivot); + + r.setProperties(Arrays.asList(pivotKV)); + } return r; } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java index 65ad0c327..60752a457 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java @@ -91,18 +91,12 @@ public class SparkWhitelistSimRels extends AbstractSparkAction { Dataset entities = spark .read() .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) - .repartition(numPartitions) - .withColumn("id", functions.get_json_object(new Column("value"), dedupConf.getWf().getIdPath())); + .select(functions.get_json_object(new Column("value"), dedupConf.getWf().getIdPath()).as("id")) + .distinct(); - Dataset whiteListRels1 = whiteListRels - .join(entities, entities.col("id").equalTo(whiteListRels.col("from")), "inner") - .select("from", "to"); - - Dataset whiteListRels2 = whiteListRels1 - .join(entities, whiteListRels1.col("to").equalTo(entities.col("id")), "inner") - .select("from", "to"); - - Dataset whiteListSimRels = whiteListRels2 + Dataset whiteListSimRels = whiteListRels + .join(entities, entities.col("id").equalTo(whiteListRels.col("from")), "leftsemi") + .join(entities, functions.col("to").equalTo(entities.col("id")), "leftsemi") .map( (MapFunction) r -> DedupUtility .createSimRel(r.getString(0), r.getString(1), entity), diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java deleted file mode 100644 index 4a39a175d..000000000 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java +++ /dev/null @@ -1,100 +0,0 @@ - -package eu.dnetlib.dhp.oa.dedup.graph; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.commons.lang3.StringUtils; -import org.codehaus.jackson.annotate.JsonIgnore; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.utils.DHPUtils; -import eu.dnetlib.pace.util.PaceException; - -public class ConnectedComponent implements Serializable { - - private String ccId; - private Set ids; - - private static final String CONNECTED_COMPONENT_ID_PREFIX = "connect_comp"; - - public ConnectedComponent(Set ids, final int cut) { - this.ids = ids; - - this.ccId = createDefaultID(); - - if (cut > 0 && ids.size() > cut) { - this.ids = ids - .stream() - .filter(id -> !ccId.equalsIgnoreCase(id)) - .limit(cut - 1) - .collect(Collectors.toSet()); -// this.ids.add(ccId); ?? - } - } - - public ConnectedComponent(String ccId, Set ids) { - this.ccId = ccId; - this.ids = ids; - } - - public String createDefaultID() { - if (ids.size() > 1) { - final String s = getMin(); - String prefix = s.split("\\|")[0]; - ccId = prefix + "|" + CONNECTED_COMPONENT_ID_PREFIX + "::" + DHPUtils.md5(s); - return ccId; - } else { - return ids.iterator().next(); - } - } - - @JsonIgnore - public String getMin() { - - final StringBuilder min = new StringBuilder(); - - ids - .forEach( - id -> { - if (StringUtils.isBlank(min.toString())) { - min.append(id); - } else { - if (min.toString().compareTo(id) > 0) { - min.setLength(0); - min.append(id); - } - } - }); - return min.toString(); - } - - @Override - public String toString() { - ObjectMapper mapper = new ObjectMapper(); - try { - return mapper.writeValueAsString(this); - } catch (IOException e) { - throw new PaceException("Failed to create Json: ", e); - } - } - - public Set getIds() { - return ids; - } - - public void setIds(Set ids) { - this.ids = ids; - } - - public String getCcId() { - return ccId; - } - - public void setCcId(String ccId) { - this.ccId = ccId; - } -} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala deleted file mode 100644 index f4dd85d75..000000000 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala +++ /dev/null @@ -1,37 +0,0 @@ -package eu.dnetlib.dhp.oa.dedup.graph - -import org.apache.spark.graphx._ -import org.apache.spark.rdd.RDD - -import scala.collection.JavaConversions; - -object GraphProcessor { - - def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = { - val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby - val cc = graph.connectedComponents(maxIterations).vertices - - val joinResult = vertexes.leftOuterJoin(cc).map { - case (id, (openaireId, cc)) => { - if (cc.isEmpty) { - (id, openaireId) - } - else { - (cc.get, openaireId) - } - } - } - val connectedComponents = joinResult.groupByKey() - .map[ConnectedComponent](cc => asConnectedComponent(cc, cut)) - connectedComponents - } - - - - def asConnectedComponent(group: (VertexId, Iterable[String]), cut:Int): ConnectedComponent = { - val docs = group._2.toSet[String] - val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), cut); - connectedComponent - } - -} \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java index 0cba4fc3b..e03c3bf95 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java @@ -3,21 +3,21 @@ package eu.dnetlib.dhp.oa.dedup.model; import java.io.Serializable; import java.text.SimpleDateFormat; -import java.util.*; -import java.util.stream.Collectors; +import java.time.LocalDate; +import java.util.Date; +import java.util.List; +import java.util.Objects; import org.apache.commons.lang3.StringUtils; -import com.google.common.collect.Sets; - import eu.dnetlib.dhp.oa.dedup.DatePicker; import eu.dnetlib.dhp.oa.dedup.IdentifierComparator; import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; -import eu.dnetlib.dhp.schema.oaf.utils.PidComparator; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.utils.PidType; public class Identifier implements Serializable, Comparable> { @@ -50,7 +50,7 @@ public class Identifier implements Serializable, Comparable if (Objects.nonNull(date)) { return date; } else { - String sDate = BASE_DATE; + String sDate = LocalDate.now().plusDays(1).toString(); if (ModelSupport.isSubClass(getEntity(), Result.class)) { Result result = (Result) getEntity(); if (isWellformed(result.getDateofacceptance())) { diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json index b1df08535..4f9f4b0b5 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json @@ -28,5 +28,17 @@ "paramLongName": "workingPath", "paramDescription": "path for the working directory", "paramRequired": true + }, + { + "paramName":"h", + "paramLongName":"hiveMetastoreUris", + "paramDescription": "the hive metastore uris", + "paramRequired": true + }, + { + "paramName": "p", + "paramLongName": "pivotHistoryDatabase", + "paramDescription": "Pivot history database", + "paramRequired": false } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/config-default.xml index 2e0ed9aee..cd29965e3 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/config-default.xml @@ -15,4 +15,8 @@ oozie.action.sharelib.for.spark spark2 + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml index ba2270c8a..49a331def 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml @@ -188,6 +188,8 @@ --isLookUpUrl${isLookUpUrl} --actionSetId${actionSetId} --cutConnectedComponent${cutConnectedComponent} + --hiveMetastoreUris${hiveMetastoreUris} + --pivotHistoryDatabase${pivotHistoryDatabase} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/scala/com/kwartile/lib/cc/ConnectedComponent.scala b/dhp-workflows/dhp-dedup-openaire/src/main/scala/com/kwartile/lib/cc/ConnectedComponent.scala new file mode 100644 index 000000000..4c3362235 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/scala/com/kwartile/lib/cc/ConnectedComponent.scala @@ -0,0 +1,335 @@ +/** Copyright (c) 2017 Kwartile, Inc., http://www.kwartile.com + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/** Map-reduce implementation of Connected Component + * Given lists of subgraphs, returns all the nodes that are connected. + */ + +package com.kwartile.lib.cc + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Row, SparkSession} +import org.apache.spark.storage.StorageLevel + +import scala.annotation.tailrec +import scala.collection.mutable + +object ConnectedComponent extends Serializable { + + /** Applies Small Star operation on RDD of nodePairs + * + * @param nodePairs on which to apply Small Star operations + * @return new nodePairs after the operation and conncectivy change count + */ + private def smallStar(nodePairs: RDD[(Long, Long)]): (RDD[(Long, Long)], Long) = { + + /** generate RDD of (self, List(neighbors)) where self > neighbors + * E.g.: nodePairs (1, 4), (6, 1), (3, 2), (6, 5) + * will result into (4, List(1)), (6, List(1)), (3, List(2)), (6, List(5)) + */ + val neighbors = nodePairs.map(x => { + val (self, neighbor) = (x._1, x._2) + if (self > neighbor) + (self, neighbor) + else + (neighbor, self) + }) + + /** reduce on self to get list of all its neighbors. + * E.g: (4, List(1)), (6, List(1)), (3, List(2)), (6, List(5)) + * will result into (4, List(1)), (6, List(1, 5)), (3, List(2)) + * Note: + * (1) you may need to tweak number of partitions. + * (2) also, watch out for data skew. In that case, consider using rangePartitioner + */ + val empty = mutable.HashSet[Long]() + val allNeighbors = neighbors.aggregateByKey(empty)( + (lb, v) => lb += v, + (lb1, lb2) => lb1 ++ lb2 + ) + + /** Apply Small Star operation on (self, List(neighbor)) to get newNodePairs and count the change in connectivity + */ + + val newNodePairsWithChangeCount = allNeighbors + .map(x => { + val self = x._1 + val neighbors = x._2.toList + val minNode = argMin(self :: neighbors) + val newNodePairs = (self :: neighbors) + .map(neighbor => { + (neighbor, minNode) + }) + .filter(x => { + val neighbor = x._1 + val minNode = x._2 + (neighbor <= self && neighbor != minNode) || (self == neighbor) + }) + val uniqueNewNodePairs = newNodePairs.toSet.toList + + /** We count the change by taking a diff of the new node pairs with the old node pairs + */ + val connectivityChangeCount = (uniqueNewNodePairs diff neighbors.map((self, _))).length + (uniqueNewNodePairs, connectivityChangeCount) + }) + .persist(StorageLevel.MEMORY_AND_DISK_SER) + + /** Sum all the changeCounts + */ + val totalConnectivityCountChange = newNodePairsWithChangeCount + .mapPartitions(iter => { + val (v, l) = iter.toSeq.unzip + val sum = l.sum + Iterator(sum) + }) + .sum + .toLong + + val newNodePairs = newNodePairsWithChangeCount.map(x => x._1).flatMap(x => x) + newNodePairsWithChangeCount.unpersist(false) + (newNodePairs, totalConnectivityCountChange) + } + + /** Apply Large Star operation on a RDD of nodePairs + * + * @param nodePairs on which to apply Large Star operations + * @return new nodePairs after the operation and conncectivy change count + */ + private def largeStar(nodePairs: RDD[(Long, Long)]): (RDD[(Long, Long)], Long) = { + + /** generate RDD of (self, List(neighbors)) + * E.g.: nodePairs (1, 4), (6, 1), (3, 2), (6, 5) + * will result into (4, List(1)), (1, List(4)), (6, List(1)), (1, List(6)), (3, List(2)), (2, List(3)), (6, List(5)), (5, List(6)) + */ + + val neighbors = nodePairs.flatMap(x => { + val (self, neighbor) = (x._1, x._2) + if (self == neighbor) + List((self, neighbor)) + else + List((self, neighbor), (neighbor, self)) + }) + + /** reduce on self to get list of all its neighbors. + * E.g: (4, List(1)), (1, List(4)), (6, List(1)), (1, List(6)), (3, List(2)), (2, List(3)), (6, List(5)), (5, List(6)) + * will result into (4, List(1)), (1, List(4, 6)), (6, List(1, 5)), (3, List(2)), (2, List(3)), (5, List(6)) + * Note: + * (1) you may need to tweak number of partitions. + * (2) also, watch out for data skew. In that case, consider using rangePartitioner + */ + + val localAdd = (s: mutable.HashSet[Long], v: Long) => s += v + val partitionAdd = (s1: mutable.HashSet[Long], s2: mutable.HashSet[Long]) => s1 ++= s2 + val allNeighbors = + neighbors.aggregateByKey(mutable.HashSet.empty[Long] /*, rangePartitioner*/ )(localAdd, partitionAdd) + + /** Apply Large Star operation on (self, List(neighbor)) to get newNodePairs and count the change in connectivity + */ + + val newNodePairsWithChangeCount = allNeighbors + .map(x => { + val self = x._1 + val neighbors = x._2.toList + val minNode = argMin(self :: neighbors) + val newNodePairs = (self :: neighbors) + .map(neighbor => { + (neighbor, minNode) + }) + .filter(x => { + val neighbor = x._1 + val minNode = x._2 + neighbor > self || neighbor == minNode + }) + + val uniqueNewNodePairs = newNodePairs.toSet.toList + val connectivityChangeCount = (uniqueNewNodePairs diff neighbors.map((self, _))).length + (uniqueNewNodePairs, connectivityChangeCount) + }) + .persist(StorageLevel.MEMORY_AND_DISK_SER) + + val totalConnectivityCountChange = newNodePairsWithChangeCount + .mapPartitions(iter => { + val (v, l) = iter.toSeq.unzip + val sum = l.sum + Iterator(sum) + }) + .sum + .toLong + + /** Sum all the changeCounts + */ + val newNodePairs = newNodePairsWithChangeCount.map(x => x._1).flatMap(x => x) + newNodePairsWithChangeCount.unpersist(false) + (newNodePairs, totalConnectivityCountChange) + } + + private def argMin(nodes: List[Long]): Long = { + nodes.min(Ordering.by((node: Long) => node)) + } + + /** Build nodePairs given a list of nodes. A list of nodes represents a subgraph. + * + * @param nodes that are part of a subgraph + * @return nodePairs for a subgraph + */ + private def buildPairs(nodes: List[Long]): List[(Long, Long)] = { + buildPairs(nodes.head, nodes.tail, null.asInstanceOf[List[(Long, Long)]]) + } + + @tailrec + private def buildPairs(node: Long, neighbors: List[Long], partialPairs: List[(Long, Long)]): List[(Long, Long)] = { + if (neighbors.isEmpty) { + if (partialPairs != null) + List((node, node)) ::: partialPairs + else + List((node, node)) + } else if (neighbors.length == 1) { + val neighbor = neighbors(0) + if (node > neighbor) + if (partialPairs != null) List((node, neighbor)) ::: partialPairs else List((node, neighbor)) + else if (partialPairs != null) List((neighbor, node)) ::: partialPairs + else List((neighbor, node)) + } else { + val newPartialPairs = neighbors + .map(neighbor => { + if (node > neighbor) + List((node, neighbor)) + else + List((neighbor, node)) + }) + .flatMap(x => x) + + if (partialPairs != null) + buildPairs(neighbors.head, neighbors.tail, newPartialPairs ::: partialPairs) + else + buildPairs(neighbors.head, neighbors.tail, newPartialPairs) + } + } + + /** Implements alternatingAlgo. Converges when the changeCount is either 0 or does not change from the previous iteration + * + * @param nodePairs for a graph + * @param largeStarConnectivityChangeCount change count that resulted from the previous iteration + * @param smallStarConnectivityChangeCount change count that resulted from the previous iteration + * @param didConverge flag to indicate the alorigth converged + * @param currIterationCount counter to capture number of iterations + * @param maxIterationCount maximum number iterations to try before giving up + * @return RDD of nodePairs + */ + + @tailrec + private def alternatingAlgo( + nodePairs: RDD[(Long, Long)], + largeStarConnectivityChangeCount: Long, + smallStarConnectivityChangeCount: Long, + didConverge: Boolean, + currIterationCount: Int, + maxIterationCount: Int + ): (RDD[(Long, Long)], Boolean, Long) = { + + val iterationCount = currIterationCount + 1 + if (didConverge) + (nodePairs, true, currIterationCount) + else if (currIterationCount >= maxIterationCount) { + (nodePairs, false, currIterationCount) + } else { + + val (nodePairsLargeStar, currLargeStarConnectivityChangeCount) = largeStar(nodePairs) + val (nodePairsSmallStar, currSmallStarConnectivityChangeCount) = smallStar(nodePairsLargeStar) + + if ( + (currLargeStarConnectivityChangeCount == largeStarConnectivityChangeCount && + currSmallStarConnectivityChangeCount == smallStarConnectivityChangeCount) || + (currSmallStarConnectivityChangeCount == 0 && currLargeStarConnectivityChangeCount == 0) + ) { + alternatingAlgo( + nodePairsSmallStar, + currLargeStarConnectivityChangeCount, + currSmallStarConnectivityChangeCount, + true, + iterationCount, + maxIterationCount + ) + } else { + alternatingAlgo( + nodePairsSmallStar, + currLargeStarConnectivityChangeCount, + currSmallStarConnectivityChangeCount, + false, + iterationCount, + maxIterationCount + ) + } + } + } + + /** Driver function + * + * @param cliques list of nodes representing subgraphs (or cliques) + * @param maxIterationCount maximum number iterations to try before giving up + * @return Connected Components as nodePairs where second member of the nodePair is the minimum node in the component + */ + def run(cliques: RDD[List[Long]], maxIterationCount: Int): (RDD[(Long, Long)], Boolean, Long) = { + + val nodePairs = cliques + .map(aClique => { + buildPairs(aClique) + }) + .flatMap(x => x) + + val (cc, didConverge, iterCount) = alternatingAlgo(nodePairs, 9999999L, 9999999L, false, 0, maxIterationCount) + + if (didConverge) { + (cc, didConverge, iterCount) + } else { + (null.asInstanceOf[RDD[(Long, Long)]], didConverge, iterCount) + } + } + + def runOnPairs(nodePairs: RDD[(Long, Long)], maxIterationCount: Int): (RDD[(Long, Long)], Boolean, Long) = { + val (cc, didConverge, iterCount) = alternatingAlgo(nodePairs, 9999999L, 9999999L, false, 0, maxIterationCount) + + if (didConverge) { + (cc, didConverge, iterCount) + } else { + (null.asInstanceOf[RDD[(Long, Long)]], didConverge, iterCount) + } + } + + def runOnPairs(nodePairs: Dataset[Row], maxIterationCount: Int)(implicit spark: SparkSession): Dataset[Row] = { + import spark.implicits._ + + val (cc, didConverge, iterCount) = alternatingAlgo( + nodePairs.map(e => (e.getLong(0), e.getLong(1))).rdd, + 9999999L, + 9999999L, + false, + 0, + maxIterationCount + ) + + if (didConverge) { + cc.toDF("vertexId", "groupId") + } else { + null.asInstanceOf[Dataset[Row]] + } + } + +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index 6c4935637..bd5a04e62 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -41,9 +41,13 @@ import com.google.common.collect.Sets; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import eu.dnetlib.dhp.schema.sx.OafUtils; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import scala.Tuple2; @ExtendWith(MockitoExtension.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @@ -97,6 +101,7 @@ public class SparkDedupTest implements Serializable { final SparkConf conf = new SparkConf(); conf.set("spark.sql.shuffle.partitions", "200"); + conf.set("spark.sql.warehouse.dir", testOutputBasePath + "/spark-warehouse"); spark = SparkSession .builder() .appName(SparkDedupTest.class.getSimpleName()) @@ -186,11 +191,11 @@ public class SparkDedupTest implements Serializable { System.out.println("ds_simrel = " + ds_simrel); System.out.println("orp_simrel = " + orp_simrel); - assertEquals(1538, orgs_simrel); - assertEquals(3523, pubs_simrel); - assertEquals(168, sw_simrel); - assertEquals(221, ds_simrel); - assertEquals(3392, orp_simrel); + assertEquals(751, orgs_simrel); + assertEquals(546, pubs_simrel); + assertEquals(113, sw_simrel); + assertEquals(148, ds_simrel); + assertEquals(280, orp_simrel); } @@ -235,10 +240,10 @@ public class SparkDedupTest implements Serializable { .count(); // entities simrels supposed to be equal to the number of previous step (no rels in whitelist) - assertEquals(1538, orgs_simrel); - assertEquals(3523, pubs_simrel); - assertEquals(221, ds_simrel); - assertEquals(3392, orp_simrel); + assertEquals(751, orgs_simrel); + assertEquals(546, pubs_simrel); + assertEquals(148, ds_simrel); + assertEquals(280, orp_simrel); // System.out.println("orgs_simrel = " + orgs_simrel); // System.out.println("pubs_simrel = " + pubs_simrel); // System.out.println("ds_simrel = " + ds_simrel); @@ -268,7 +273,7 @@ public class SparkDedupTest implements Serializable { && rel.getTarget().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[1])) .count() > 0); - assertEquals(170, sw_simrel.count()); + assertEquals(115, sw_simrel.count()); // System.out.println("sw_simrel = " + sw_simrel.count()); } @@ -292,7 +297,9 @@ public class SparkDedupTest implements Serializable { "-w", testOutputBasePath, "-cc", - "3" + "3", + "-h", + "" }); new SparkCreateMergeRels(parser, spark).run(isLookUpService); @@ -365,6 +372,113 @@ public class SparkDedupTest implements Serializable { .deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")); } + @Test + @Order(3) + void createMergeRelsWithPivotHistoryTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")); + + spark.sql("CREATE DATABASE IF NOT EXISTS pivot_history_test"); + ModelSupport.oafTypes.keySet().forEach(entityType -> { + try { + spark + .read() + .json( + Paths + .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/pivot_history").toURI()) + .toFile() + .getAbsolutePath()) + .write() + .mode("overwrite") + .saveAsTable("pivot_history_test." + entityType); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + }); + + parser + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath, + "-h", + "", + "-pivotHistoryDatabase", + "pivot_history_test" + + }); + + new SparkCreateMergeRels(parser, spark).run(isLookUpService); + + long orgs_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .count(); + final Dataset pubs = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") + .as(Encoders.bean(Relation.class)); + long sw_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") + .count(); + long ds_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") + .count(); + + long orp_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") + .count(); + + final List merges = pubs + .filter("source == '50|arXiv_dedup_::c93aeb433eb90ed7a86e29be00791b7c'") + .collectAsList(); + assertEquals(3, merges.size()); + Set dups = Sets + .newHashSet( + "50|doi_________::3b1d0d8e8f930826665df9d6b82fbb73", + "50|doi_________::d5021b53204e4fdeab6ff5d5bc468032", + "50|arXiv_______::c93aeb433eb90ed7a86e29be00791b7c"); + merges.forEach(r -> { + assertEquals(ModelConstants.RESULT_RESULT, r.getRelType()); + assertEquals(ModelConstants.DEDUP, r.getSubRelType()); + assertEquals(ModelConstants.MERGES, r.getRelClass()); + assertTrue(dups.contains(r.getTarget())); + }); + + final List mergedIn = pubs + .filter("target == '50|arXiv_dedup_::c93aeb433eb90ed7a86e29be00791b7c'") + .collectAsList(); + assertEquals(3, mergedIn.size()); + mergedIn.forEach(r -> { + assertEquals(ModelConstants.RESULT_RESULT, r.getRelType()); + assertEquals(ModelConstants.DEDUP, r.getSubRelType()); + assertEquals(ModelConstants.IS_MERGED_IN, r.getRelClass()); + assertTrue(dups.contains(r.getSource())); + }); + + assertEquals(1268, orgs_mergerel); + assertEquals(1112, pubs.count()); + assertEquals(292, sw_mergerel); + assertEquals(476, ds_mergerel); + assertEquals(742, orp_mergerel); +// System.out.println("orgs_mergerel = " + orgs_mergerel); +// System.out.println("pubs_mergerel = " + pubs_mergerel); +// System.out.println("sw_mergerel = " + sw_mergerel); +// System.out.println("ds_mergerel = " + ds_mergerel); +// System.out.println("orp_mergerel = " + orp_mergerel); + + } + @Test @Order(4) void createMergeRelsTest() throws Exception { @@ -382,7 +496,9 @@ public class SparkDedupTest implements Serializable { "-la", "lookupurl", "-w", - testOutputBasePath + testOutputBasePath, + "-h", + "" }); new SparkCreateMergeRels(parser, spark).run(isLookUpService); @@ -437,10 +553,10 @@ public class SparkDedupTest implements Serializable { }); assertEquals(1268, orgs_mergerel); - assertEquals(1450, pubs.count()); - assertEquals(286, sw_mergerel); - assertEquals(472, ds_mergerel); - assertEquals(738, orp_mergerel); + assertEquals(1112, pubs.count()); + assertEquals(292, sw_mergerel); + assertEquals(476, ds_mergerel); + assertEquals(742, orp_mergerel); // System.out.println("orgs_mergerel = " + orgs_mergerel); // System.out.println("pubs_mergerel = " + pubs_mergerel); // System.out.println("sw_mergerel = " + sw_mergerel); @@ -492,10 +608,10 @@ public class SparkDedupTest implements Serializable { .count(); assertEquals(86, orgs_deduprecord); - assertEquals(68, pubs.count()); - assertEquals(49, sw_deduprecord); + assertEquals(91, pubs.count()); + assertEquals(47, sw_deduprecord); assertEquals(97, ds_deduprecord); - assertEquals(92, orp_deduprecord); + assertEquals(93, orp_deduprecord); verifyRoot_1(mapper, pubs); @@ -629,13 +745,13 @@ public class SparkDedupTest implements Serializable { .distinct() .count(); - assertEquals(902, publications); + assertEquals(925, publications); assertEquals(839, organizations); assertEquals(100, projects); assertEquals(100, datasource); - assertEquals(198, softwares); + assertEquals(196, softwares); assertEquals(389, dataset); - assertEquals(520, otherresearchproduct); + assertEquals(521, otherresearchproduct); // System.out.println("publications = " + publications); // System.out.println("organizations = " + organizations); diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json index fa889d63b..ff6670f1e 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json @@ -101,7 +101,8 @@ "type" : "String", "path" : "$.title[?(@.qualifier.classid == 'main title')].value", "length" : 250, - "size" : 5 + "size" : 5, + "clean": "title" }, { "name" : "authors", diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json index b45b6ae83..a4a3761a3 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json @@ -101,7 +101,8 @@ "type" : "String", "path" : "$.title[?(@.qualifier.classid == 'main title')].value", "length" : 250, - "size" : 5 + "size" : 5, + "clean": "title" }, { "name" : "authors", diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json index 15ebc7a6a..c3a769874 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json @@ -29,9 +29,8 @@ }, "pace": { "clustering" : [ - { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} }, - { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } }, - { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } + { "name" : "numAuthorsTitleSuffixPrefixChain", "fields" : [ "num_authors", "title" ], "params" : { "mod" : "10" } }, + { "name" : "jsonlistclustering", "fields" : [ "pid" ], "params" : { "jpath_value": "$.value", "jpath_classid": "$.qualifier.classid"} } ], "decisionTree": { "start": { @@ -79,13 +78,37 @@ "ignoreUndefined": "false" }, "layer3": { + "fields": [ + { + "field": "authors", + "comparator": "authorsMatch", + "weight": 1.0, + "countIfUndefined": "false", + "params": { + "surname_th": 0.75, + "fullname_th": 0.75, + "threshold": 0.6, + "mode": "full" + } + } + ], + "threshold": 0.6, + "aggregation": "MAX", + "positive": "layer4", + "negative": "NO_MATCH", + "undefined": "MATCH", + "ignoreUndefined": "true" + }, + "layer4": { "fields": [ { "field": "title", "comparator": "levensteinTitle", "weight": 1.0, "countIfUndefined": "true", - "params": {} + "params": { + "threshold": "0.99" + } } ], "threshold": 0.99, @@ -97,23 +120,25 @@ } }, "model": [ - { - "name": "doi", - "type": "String", - "path": "$.pid[?(@.qualifier.classid == 'doi')].value" - }, { "name": "pid", "type": "JSON", "path": "$.pid", "overrideMatch": "true" }, + { + "name": "alternateid", + "type": "JSON", + "path": "$.instance[*].alternateIdentifier[*]", + "overrideMatch": "true" + }, { "name": "title", "type": "String", "path": "$.title[?(@.qualifier.classid == 'main title')].value", "length": 250, - "size": 5 + "size": 5, + "clean": "title" }, { "name": "authors", @@ -122,9 +147,9 @@ "size": 200 }, { - "name": "resulttype", + "name": "num_authors", "type": "String", - "path": "$.resulttype.classid" + "path": "$.author.length()" } ], "blacklists": { diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json index f53ff385f..3c6c8aa5f 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json @@ -75,7 +75,8 @@ "type" : "String", "path" : "$.title[?(@.qualifier.classid == 'main title')].value", "length" : 250, - "size" : 5 + "size" : 5, + "clean": "title" }, { "name" : "url", diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/pivot_history/pivot_history.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/pivot_history/pivot_history.json new file mode 100644 index 000000000..8af1a6d06 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/pivot_history/pivot_history.json @@ -0,0 +1 @@ +{"id": "50|arXiv_______::c93aeb433eb90ed7a86e29be00791b7c", "firstUsage": "2022-01-01", "lastUsage": "2022-01-01", "dedupId": "50|arXiv_dedup_::c93aeb433eb90ed7a86e29be00791b7c" } \ No newline at end of file diff --git a/pom.xml b/pom.xml index 3fd351c1d..6ef320253 100644 --- a/pom.xml +++ b/pom.xml @@ -931,5 +931,25 @@ --> + + + + arm-silicon-mac + + + aarch64 + mac + + + + + + org.xerial.snappy + snappy-java + 1.1.8.4 + + + + \ No newline at end of file From 1287315ffb546397bcbcac588fd5b80a62cab665 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Mon, 11 Dec 2023 21:26:05 +0100 Subject: [PATCH 2/5] Do no longer use dedupId information from pivotHistory Database --- .../dhp/oa/dedup/SparkCreateMergeRels.java | 85 ++++++++++--------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index 5bb132b89..46c29494e 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -1,24 +1,23 @@ package eu.dnetlib.dhp.oa.dedup; -import com.google.common.hash.Hashing; -import com.kwartile.lib.cc.ConnectedComponent; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.schema.oaf.utils.PidType; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.config.DedupConfig; +import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS; +import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP; +import static org.apache.spark.sql.functions.*; + +import java.io.IOException; +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.sql.Dataset; import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.catalyst.encoders.RowEncoder; import org.apache.spark.sql.expressions.UserDefinedFunction; import org.apache.spark.sql.expressions.Window; @@ -29,20 +28,23 @@ import org.dom4j.DocumentException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xml.sax.SAXException; + +import com.google.common.hash.Hashing; +import com.kwartile.lib.cc.ConnectedComponent; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.PidType; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.config.DedupConfig; import scala.Tuple3; import scala.collection.JavaConversions; -import java.io.IOException; -import java.time.LocalDate; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Optional; - -import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS; -import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP; -import static org.apache.spark.sql.functions.*; - public class SparkCreateMergeRels extends AbstractSparkAction { private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class); @@ -121,6 +123,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction { .distinct() .withColumn("vertexId", hashUDF.apply(functions.col("id"))); + // transform simrels into pairs of numeric ids final Dataset edges = spark .read() .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)) @@ -128,27 +131,34 @@ public class SparkCreateMergeRels extends AbstractSparkAction { .withColumn("source", hashUDF.apply(functions.col("source"))) .withColumn("target", hashUDF.apply(functions.col("target"))); + // resolve connected components + // ("vertexId", "groupId") Dataset cliques = ConnectedComponent .runOnPairs(edges, 50, spark); + // transform "vertexId" back to its original string value + // groupId is kept numeric as its string value is not used + // ("id", "groupId") Dataset rawMergeRels = cliques .join(vertexIdMap, JavaConversions.asScalaBuffer(Collections.singletonList("vertexId")), "inner") .drop("vertexId") .distinct(); + // empty dataframe if historydatabase is not used Dataset pivotHistory = spark .createDataset( Collections.emptyList(), RowEncoder - .apply(StructType.fromDDL("id STRING, firstUsage STRING, lastUsage STRING, dedupId STRING"))); + .apply(StructType.fromDDL("id STRING, lastUsage STRING"))); if (StringUtils.isNotBlank(pivotHistoryDatabase)) { pivotHistory = spark .read() .table(pivotHistoryDatabase + "." + subEntity) - .selectExpr("id", "lastUsage", "dedupId"); + .selectExpr("id", "lastUsage"); } + // depending on resulttype collectefrom and dateofacceptance are evaluated differently String collectedfromExpr = "false AS collectedfrom"; String dateExpr = "'' AS date"; @@ -164,8 +174,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction { dateExpr = "dateofacceptance.value AS date"; } + // cap pidType at w3id as from there on they are considered equal UserDefinedFunction mapPid = udf( (String s) -> Math.min(PidType.tryValueOf(s).ordinal(), PidType.w3id.ordinal()), DataTypes.IntegerType); + UserDefinedFunction validDate = udf((String date) -> { if (StringUtils.isNotBlank(date) && date.matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(date)) { @@ -186,8 +198,6 @@ public class SparkCreateMergeRels extends AbstractSparkAction { .withColumn("pidType", mapPid.apply(col("pidType"))) // ordinal of pid type .withColumn("date", validDate.apply(col("date"))); - UserDefinedFunction generateDedupId = udf((String s) -> IdGenerator.generate(s), DataTypes.StringType); - // ordering to selected pivot id WindowSpec w = Window .partitionBy("groupId") @@ -202,17 +212,15 @@ public class SparkCreateMergeRels extends AbstractSparkAction { .join(pivotHistory, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "full") .join(pivotingData, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") .withColumn("pivot", functions.first("id").over(w)) - .withColumn("pivotDedupId", functions.first("dedupId").over(w)) .withColumn("position", functions.row_number().over(w)) - .filter(cut > 0 ? col("position").lt(lit(cut)) : lit(true)) - // .select("id", "groupId", "collectedfrom", "pivot", "dedupId", "pivotDedupId") - // .distinct() + .filter(cut > 0 ? col("position").lt(lit(cut)) : lit(true)) // apply cut after choosing pivot .flatMap( (FlatMapFunction>) (Row r) -> { String id = r.getAs("id"); + String dedupId = IdGenerator.generate(id); + String pivot = r.getAs("pivot"); - String pivotDedupId = r.getAs("pivotDedupId"); // dedupId associated with the pivot - String dedupId = r.getAs("dedupId"); // dedupId associated with this id if it was a pivot + String pivotDedupId = IdGenerator.generate(pivot); // filter out id == pivotDedupId // those are caused by claim expressed on pivotDedupId @@ -233,14 +241,9 @@ public class SparkCreateMergeRels extends AbstractSparkAction { return res.iterator(); } - // new pivot, assign pivotDedupId with current IdGenerator - if (StringUtils.isBlank(pivotDedupId)) { - pivotDedupId = IdGenerator.generate(pivot); - } - - // this was a pivot in a preceding graph but it has been merged into a new group with different + // this was a pivot in a previous graph but it has been merged into a new group with different // pivot - if (StringUtils.isNotBlank(dedupId) && !pivot.equals(id) && !dedupId.equals(pivotDedupId)) { + if (!r.isNullAt(r.fieldIndex("lastUsage")) && !pivot.equals(id) && !dedupId.equals(pivotDedupId)) { // materialize the previous dedup record as a merge relation with the new one res.add(new Tuple3<>(dedupId, pivotDedupId, null)); } From 831cc1fddececffc80701931ad9dab4d9926192b Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Thu, 14 Dec 2023 11:51:02 +0100 Subject: [PATCH 3/5] Generate "merged" dedup id relations also for records that are filtered out by the cut parameters --- .../java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index 46c29494e..191870d3b 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -213,7 +213,6 @@ public class SparkCreateMergeRels extends AbstractSparkAction { .join(pivotingData, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") .withColumn("pivot", functions.first("id").over(w)) .withColumn("position", functions.row_number().over(w)) - .filter(cut > 0 ? col("position").lt(lit(cut)) : lit(true)) // apply cut after choosing pivot .flatMap( (FlatMapFunction>) (Row r) -> { String id = r.getAs("id"); @@ -249,7 +248,9 @@ public class SparkCreateMergeRels extends AbstractSparkAction { } // add merge relations - res.add(new Tuple3<>(id, pivotDedupId, pivot)); + if (cut <=0 || r.getAs("position") <= cut) { + res.add(new Tuple3<>(id, pivotDedupId, pivot)); + } return res.iterator(); }, Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING())) From 10e135db1eb26cf6383d02f2318c8e6701631553 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Fri, 22 Dec 2023 09:55:10 +0100 Subject: [PATCH 4/5] Use dedup_wf_002 in place of dedup_wf_001 to make explicit a different algorithm has been used to generate those kind of ids --- .../src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java index 37e1bfd15..1d3d4afdd 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java @@ -40,7 +40,7 @@ public class IdGenerator implements Serializable { if (PidType.isValid(pidType)) { return prefix + "|" + dedupify(ns) + "::" + suffix; } else { - return prefix + "|dedup_wf_001::" + md5(originalId); // hash the whole originalId to avoid collisions + return prefix + "|dedup_wf_002::" + md5(originalId); // hash the whole originalId to avoid collisions } } From 3c66e3bd7bd7fbe14f068b5176ae3681e941fda9 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Fri, 22 Dec 2023 09:57:30 +0100 Subject: [PATCH 5/5] Create dedup record for "merged" pivots Do not create dedup records for group that have more than 20 different acceptance date --- .../dhp/oa/dedup/DedupRecordFactory.java | 255 +++++++++++------- .../dnetlib/dhp/oa/dedup/SparkDedupTest.java | 4 +- 2 files changed, 158 insertions(+), 101 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index d9fb24078..4c12d1dc6 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -1,130 +1,187 @@ package eu.dnetlib.dhp.oa.dedup; -import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.ReduceFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.dhp.oa.dedup.model.Identifier; import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Author; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Result; +import org.apache.commons.beanutils.BeanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.ReduceFunction; +import org.apache.spark.sql.*; import scala.Tuple2; +import scala.Tuple3; +import scala.collection.JavaConversions; + +import java.util.*; +import java.util.stream.Stream; public class DedupRecordFactory { + public static final class DedupRecordReduceState { + public final String dedupId; - protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + public final ArrayList aliases = new ArrayList<>(); - private DedupRecordFactory() { - } + public final HashSet acceptanceDate = new HashSet<>(); - public static Dataset createDedupRecord( - final SparkSession spark, - final DataInfo dataInfo, - final String mergeRelsInputPath, - final String entitiesInputPath, - final Class clazz) { + public OafEntity entity; - long ts = System.currentTimeMillis(); + public DedupRecordReduceState(String dedupId, String id, OafEntity entity) { + this.dedupId = dedupId; + this.entity = entity; + if (entity == null) { + aliases.add(id); + } else { + if (Result.class.isAssignableFrom(entity.getClass())) { + Result result = (Result) entity; + if (result.getDateofacceptance() != null && StringUtils.isNotBlank(result.getDateofacceptance().getValue())) { + acceptanceDate.add(result.getDateofacceptance().getValue()); + } + } + } + } - // - Dataset entities = spark - .read() - .schema(Encoders.bean(clazz).schema()) - .json(entitiesInputPath) - .as(Encoders.bean(clazz)) - .map( - (MapFunction>) entity -> { - return new Tuple2<>(entity.getId(), entity); - }, - Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))) - .selectExpr("_1 AS id", "_2 AS kryoObject"); + public String getDedupId() { + return dedupId; + } + } + private static final int MAX_ACCEPTANCE_DATE = 20; - // : source is the dedup_id, target is the id of the mergedIn - Dataset mergeRels = spark - .read() - .load(mergeRelsInputPath) - .where("relClass == 'merges'") - .selectExpr("source as dedupId", "target as id"); + private DedupRecordFactory() { + } - return mergeRels - .join(entities, "id") - .select("dedupId", "kryoObject") - .as(Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))) - .groupByKey((MapFunction, String>) Tuple2::_1, Encoders.STRING()) - .reduceGroups( - (ReduceFunction>) (t1, t2) -> new Tuple2<>(t1._1(), - reduceEntity(t1._1(), t1._2(), t2._2(), clazz))) - .map( - (MapFunction>, T>) t -> { - T res = t._2()._2(); - res.setDataInfo(dataInfo); - res.setLastupdatetimestamp(ts); - return res; - }, - Encoders.bean(clazz)); - } + public static Dataset createDedupRecord( + final SparkSession spark, + final DataInfo dataInfo, + final String mergeRelsInputPath, + final String entitiesInputPath, + final Class clazz) { - public static T reduceEntity( - String id, T entity, T duplicate, Class clazz) { + final long ts = System.currentTimeMillis(); + final Encoder beanEncoder = Encoders.bean(clazz); + final Encoder kryoEncoder = Encoders.kryo(clazz); - int compare = new IdentifierComparator() - .compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate)); + // + Dataset entities = spark + .read() + .schema(Encoders.bean(clazz).schema()) + .json(entitiesInputPath) + .as(beanEncoder) + .map( + (MapFunction>) entity -> { + return new Tuple2<>(entity.getId(), entity); + }, + Encoders.tuple(Encoders.STRING(), kryoEncoder)) + .selectExpr("_1 AS id", "_2 AS kryoObject"); - if (compare > 0) { - T swap = duplicate; - duplicate = entity; - entity = swap; + // : source is the dedup_id, target is the id of the mergedIn + Dataset mergeRels = spark + .read() + .load(mergeRelsInputPath) + .where("relClass == 'merges'") + .selectExpr("source as dedupId", "target as id"); + + return mergeRels + .join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") + .select("dedupId", "id", "kryoObject") + .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder)) + .map((MapFunction, DedupRecordReduceState>) t -> new DedupRecordReduceState(t._1(), t._2(), t._3()), Encoders.kryo(DedupRecordReduceState.class)) + .groupByKey((MapFunction) DedupRecordReduceState::getDedupId, Encoders.STRING()) + .reduceGroups( + (ReduceFunction) (t1, t2) -> { + if (t1.entity == null) { + t2.aliases.addAll(t1.aliases); + return t2; + } + if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) { + t1.acceptanceDate.addAll(t2.acceptanceDate); + } + t1.aliases.addAll(t2.aliases); + t1.entity = reduceEntity(t1.entity, t2.entity); + + return t1; + } + ) + .flatMap + ((FlatMapFunction, OafEntity>) t -> { + String dedupId = t._1(); + DedupRecordReduceState agg = t._2(); + + if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) { + return Collections.emptyIterator(); + } + + return Stream.concat(Stream.of(agg.getDedupId()), agg.aliases.stream()) + .map(id -> { + try { + OafEntity res = (OafEntity) BeanUtils.cloneBean(agg.entity); + res.setId(id); + res.setDataInfo(dataInfo); + res.setLastupdatetimestamp(ts); + return res; + } catch (Exception e) { + throw new RuntimeException(e); + } + }).iterator(); + }, beanEncoder); + } + + private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) { + + if (duplicate == null) { + return entity; } - entity.mergeFrom(duplicate); - entity.setId(id); - if (ModelSupport.isSubClass(duplicate, Result.class)) { - Result re = (Result) entity; - Result rd = (Result) duplicate; + int compare = new IdentifierComparator<>() + .compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate)); - List> authors = new ArrayList<>(); - if (re.getAuthor() != null) { - authors.add(re.getAuthor()); - } - if (rd.getAuthor() != null) { - authors.add(rd.getAuthor()); - } + if (compare > 0) { + OafEntity swap = duplicate; + duplicate = entity; + entity = swap; + } - re.setAuthor(AuthorMerger.merge(authors)); - } + entity.mergeFrom(duplicate); - return entity; - } + if (ModelSupport.isSubClass(duplicate, Result.class)) { + Result re = (Result) entity; + Result rd = (Result) duplicate; - public static T entityMerger( - String id, Iterator> entities, long ts, DataInfo dataInfo, Class clazz) - throws IllegalAccessException, InstantiationException, InvocationTargetException { - T base = entities.next()._2(); + List> authors = new ArrayList<>(); + if (re.getAuthor() != null) { + authors.add(re.getAuthor()); + } + if (rd.getAuthor() != null) { + authors.add(rd.getAuthor()); + } - while (entities.hasNext()) { - T duplicate = entities.next()._2(); - if (duplicate != null) - base = reduceEntity(id, base, duplicate, clazz); - } + re.setAuthor(AuthorMerger.merge(authors)); + } - base.setDataInfo(dataInfo); - base.setLastupdatetimestamp(ts); + return entity; + } - return base; - } + public static T entityMerger( + String id, Iterator> entities, long ts, DataInfo dataInfo, Class clazz) { + T base = entities.next()._2(); + + while (entities.hasNext()) { + T duplicate = entities.next()._2(); + if (duplicate != null) + base = (T) reduceEntity(base, duplicate); + } + + base.setId(id); + base.setDataInfo(dataInfo); + base.setLastupdatetimestamp(ts); + + return base; + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index bd5a04e62..8b3480e60 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -611,7 +611,7 @@ public class SparkDedupTest implements Serializable { assertEquals(91, pubs.count()); assertEquals(47, sw_deduprecord); assertEquals(97, ds_deduprecord); - assertEquals(93, orp_deduprecord); + assertEquals(92, orp_deduprecord); verifyRoot_1(mapper, pubs); @@ -751,7 +751,7 @@ public class SparkDedupTest implements Serializable { assertEquals(100, datasource); assertEquals(196, softwares); assertEquals(389, dataset); - assertEquals(521, otherresearchproduct); + assertEquals(520, otherresearchproduct); // System.out.println("publications = " + publications); // System.out.println("organizations = " + organizations);