diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java index 2a31b264c4..a85afaf258 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java @@ -14,7 +14,7 @@ import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.ReduceFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,10 +135,10 @@ public class GroupEntitiesSparkJob { .applyCoarVocabularies(entity, vocs), OAFENTITY_KRYO_ENC) .groupByKey((MapFunction) OafEntity::getId, Encoders.STRING()) - .reduceGroups((ReduceFunction) MergeUtils::checkedMerge) + .mapGroups((MapGroupsFunction) MergeUtils::mergeGroup, OAFENTITY_KRYO_ENC) .map( - (MapFunction, Tuple2>) t -> new Tuple2<>( - t._2().getClass().getName(), t._2()), + (MapFunction>) t -> new Tuple2<>( + t.getClass().getName(), t), Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC)); // pivot on "_1" (classname of the entity) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java index 316891faf5..c95c31c512 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java @@ -5,7 +5,11 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.commons.lang3.ObjectUtils.firstNonNull; import java.text.ParseException; +import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; import java.util.*; import java.util.function.BinaryOperator; import java.util.function.Function; @@ -19,6 +23,7 @@ import org.apache.commons.lang3.tuple.Pair; import com.github.sisyphsu.dateparser.DateParserUtils; import com.google.common.base.Joiner; +import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.schema.common.AccessRightComparator; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; @@ -26,6 +31,37 @@ import eu.dnetlib.dhp.schema.oaf.*; public class MergeUtils { + public static T mergeGroup(String s, Iterator oafEntityIterator) { + TreeSet sortedEntities = new TreeSet<>((o1, o2) -> { + int res = 0; + + if (o1.getDataInfo() != null && o2.getDataInfo() != null) { + res = o1.getDataInfo().getTrust().compareTo(o2.getDataInfo().getTrust()); + } + + if (res == 0) { + if (o1 instanceof Result && o2 instanceof Result) { + return ResultTypeComparator.INSTANCE.compare((Result) o1, (Result) o2); + } + } + + return res; + }); + + while (oafEntityIterator.hasNext()) { + sortedEntities.add(oafEntityIterator.next()); + } + + T merged = sortedEntities.descendingIterator().next(); + + Iterator it = sortedEntities.descendingIterator(); + while (it.hasNext()) { + merged = checkedMerge(merged, it.next()); + } + + return merged; + } + public static T checkedMerge(final T left, final T right) { return (T) merge(left, right, false); } @@ -34,7 +70,7 @@ public class MergeUtils { return merge(left, right, false); } - public static Oaf merge(final Oaf left, final Oaf right, boolean checkDelegatedAuthority) { + static Oaf merge(final Oaf left, final Oaf right, boolean checkDelegatedAuthority) { if (sameClass(left, right, OafEntity.class)) { return mergeEntities(left, right, checkDelegatedAuthority); } else if (sameClass(left, right, Relation.class)) { @@ -190,7 +226,7 @@ public class MergeUtils { .concat(h.stream(), l.stream()) .filter(Objects::nonNull) .distinct() - .collect(Collectors.toMap(keyExtractor, v -> v, merger)) + .collect(Collectors.toMap(keyExtractor, v -> v, merger, LinkedHashMap::new)) .values()); } @@ -226,7 +262,7 @@ public class MergeUtils { } // TODO review - private static List mergeKeyValue(List left, List right, int trust) { + private static List mergeByKey(List left, List right, int trust) { if (trust < 0) { List s = left; left = right; @@ -268,8 +304,7 @@ public class MergeUtils { */ private static T mergeOafFields(T merged, T enrich, int trust) { - // TODO: union of all values, but what does it mean with KeyValue pairs??? - merged.setCollectedfrom(mergeKeyValue(merged.getCollectedfrom(), enrich.getCollectedfrom(), trust)); + merged.setCollectedfrom(mergeByKey(merged.getCollectedfrom(), enrich.getCollectedfrom(), trust)); merged.setDataInfo(chooseDataInfo(merged.getDataInfo(), enrich.getDataInfo(), trust)); merged.setLastupdatetimestamp(max(merged.getLastupdatetimestamp(), enrich.getLastupdatetimestamp())); @@ -289,16 +324,13 @@ public class MergeUtils { merged.setOriginalId(unionDistinctListOfString(merged.getOriginalId(), enrich.getOriginalId())); merged.setPid(unionDistinctLists(merged.getPid(), enrich.getPid(), trust)); - // dateofcollection mettere today quando si fa merge - merged.setDateofcollection(chooseString(merged.getDateofcollection(), enrich.getDateofcollection(), trust)); - // setDateoftransformation mettere vuota in dedup, nota per Claudio + merged.setDateofcollection(LocalDateTime.now().toString()); merged .setDateoftransformation( chooseString(merged.getDateoftransformation(), enrich.getDateoftransformation(), trust)); - // TODO: was missing in OafEntity.merge merged.setExtraInfo(unionDistinctLists(merged.getExtraInfo(), enrich.getExtraInfo(), trust)); - // oaiprovenanze da mettere a null quando si genera merge - merged.setOaiprovenance(chooseReference(merged.getOaiprovenance(), enrich.getOaiprovenance(), trust)); + // When merging records OAI provenance becomes null + merged.setOaiprovenance(null); merged.setMeasures(unionDistinctLists(merged.getMeasures(), enrich.getMeasures(), trust)); return merged; @@ -330,7 +362,7 @@ public class MergeUtils { } // TODO keyvalue merge - merge.setProperties(mergeKeyValue(merge.getProperties(), enrich.getProperties(), trust)); + merge.setProperties(mergeByKey(merge.getProperties(), enrich.getProperties(), trust)); return merge; } @@ -345,73 +377,70 @@ public class MergeUtils { merge.setProcessingchargecurrency(enrich.getProcessingchargecurrency()); } - // author = usare la stessa logica che in dedup - merge.setAuthor(chooseReference(merge.getAuthor(), enrich.getAuthor(), trust)); - // il primo che mi arriva secondo l'ordinamento per priorita' - merge.setResulttype(chooseReference(merge.getResulttype(), enrich.getResulttype(), trust)); - // gestito come il resulttype perche' e' un subtype - merge.setMetaResourceType(chooseReference(merge.getMetaResourceType(), enrich.getMetaResourceType(), trust)); - // spostiamo nell'instance e qui prendo il primo che arriva - merge.setLanguage(chooseReference(merge.getLanguage(), enrich.getLanguage(), trust)); - // country lasicamo,o cosi' -> parentesi sul datainfo - merge.setCountry(unionDistinctLists(merge.getCountry(), enrich.getCountry(), trust)); - // ok - merge.setSubject(unionDistinctLists(merge.getSubject(), enrich.getSubject(), trust)); - // union per priority quindi vanno in append - merge.setTitle(unionTitle(merge.getTitle(), enrich.getTitle(), trust)); - // ok - merge.setRelevantdate(unionDistinctLists(merge.getRelevantdate(), enrich.getRelevantdate(), trust)); - // prima trust e poi longest list - merge.setDescription(longestLists(merge.getDescription(), enrich.getDescription())); - // trust piu' alto e poi piu' vecchia - merge.setDateofacceptance(chooseReference(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust)); - // ok, ma publisher va messo ripetibile - merge.setPublisher(chooseReference(merge.getPublisher(), enrich.getPublisher(), trust)); - // ok - merge.setEmbargoenddate(chooseReference(merge.getEmbargoenddate(), enrich.getEmbargoenddate(), trust)); - // ok + merge.setAuthor(mergeAuthors(merge.getAuthor(), enrich.getAuthor(), trust)); + + // keep merge value if present + if (merge.getResulttype() == null) { + merge.setResulttype(enrich.getResulttype()); + merge.setMetaResourceType(enrich.getMetaResourceType()); + } + + // should be an instance attribute, get the first non-null value + merge.setLanguage(coalesce(merge.getLanguage(), enrich.getLanguage())); + + // distinct countries, do not manage datainfo + merge.setCountry(mergeQualifiers(merge.getCountry(), enrich.getCountry(), trust)); + + // distinct subjects + merge.setSubject(mergeStructuredProperties(merge.getSubject(), enrich.getSubject(), trust)); + + // distinct titles + merge.setTitle(mergeStructuredProperties(merge.getTitle(), enrich.getTitle(), trust)); + + merge.setRelevantdate(mergeStructuredProperties(merge.getRelevantdate(), enrich.getRelevantdate(), trust)); + + if (merge.getDescription() == null || merge.getDescription().isEmpty() || trust == 0) { + merge.setDescription(longestLists(merge.getDescription(), enrich.getDescription())); + } + + merge + .setDateofacceptance( + mergeDateOfAcceptance(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust)); + + merge.setPublisher(coalesce(merge.getPublisher(), enrich.getPublisher())); + merge.setEmbargoenddate(coalesce(merge.getEmbargoenddate(), enrich.getEmbargoenddate())); merge.setSource(unionDistinctLists(merge.getSource(), enrich.getSource(), trust)); - // ok merge.setFulltext(unionDistinctLists(merge.getFulltext(), enrich.getFulltext(), trust)); - // ok merge.setFormat(unionDistinctLists(merge.getFormat(), enrich.getFormat(), trust)); - // ok merge.setContributor(unionDistinctLists(merge.getContributor(), enrich.getContributor(), trust)); - // prima prendo l'higher trust, su questo prendo il valore migliore nelle istanze TODO - // trust maggiore ma a parita' di trust il piu' specifico (base del vocabolario) - // vedi note - // cannot use com.google.common.base.Objects.firstNonNull as it throws NPE when both terms are null - merge.setResourcetype(firstNonNull(merge.getResourcetype(), enrich.getResourcetype())); + // this field might contain the original type from the raw metadata, no strategy yet to merge it + merge.setResourcetype(coalesce(merge.getResourcetype(), enrich.getResourcetype())); - // ok merge.setCoverage(unionDistinctLists(merge.getCoverage(), enrich.getCoverage(), trust)); - // most open ok if (enrich.getBestaccessright() != null && new AccessRightComparator<>() .compare(enrich.getBestaccessright(), merge.getBestaccessright()) < 0) { merge.setBestaccessright(enrich.getBestaccessright()); } - // TODO merge of datainfo given same id - merge.setContext(unionDistinctLists(merge.getContext(), enrich.getContext(), trust)); + // merge datainfo for same context id + merge.setContext(mergeLists(merge.getContext(), enrich.getContext(), trust, Context::getId, (r, l) -> { + r.getDataInfo().addAll(l.getDataInfo()); + return r; + })); // ok merge .setExternalReference( - unionDistinctLists(merge.getExternalReference(), enrich.getExternalReference(), trust)); + mergeExternalReference(merge.getExternalReference(), enrich.getExternalReference(), trust)); // instance enrichment or union // review instance equals => add pid to comparision - if (!isAnEnrichment(merge) && !isAnEnrichment(enrich)) - merge - .setInstance( - mergeLists( - merge.getInstance(), enrich.getInstance(), trust, - MergeUtils::instanceKeyExtractor, - MergeUtils::instanceMerger)); - else { + if (!isAnEnrichment(merge) && !isAnEnrichment(enrich)) { + merge.setInstance(mergeInstances(merge.getInstance(), enrich.getInstance(), trust)); + } else { final List enrichmentInstances = isAnEnrichment(merge) ? merge.getInstance() : enrich.getInstance(); final List enrichedInstances = isAnEnrichment(merge) ? enrich.getInstance() @@ -421,16 +450,123 @@ public class MergeUtils { merge.setInstance(enrichInstances(enrichedInstances, enrichmentInstances)); } - merge.setEoscifguidelines(unionDistinctLists(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust)); + merge + .setEoscifguidelines( + mergeEosciifguidelines(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust)); merge.setIsGreen(booleanOR(merge.getIsGreen(), enrich.getIsGreen())); // OK but should be list of values - merge.setOpenAccessColor(chooseReference(merge.getOpenAccessColor(), enrich.getOpenAccessColor(), trust)); + merge.setOpenAccessColor(coalesce(merge.getOpenAccessColor(), enrich.getOpenAccessColor())); merge.setIsInDiamondJournal(booleanOR(merge.getIsInDiamondJournal(), enrich.getIsInDiamondJournal())); merge.setPubliclyFunded(booleanOR(merge.getPubliclyFunded(), enrich.getPubliclyFunded())); return merge; } + private static Field mergeDateOfAcceptance(Field merge, Field enrich, int trust) { + // higher trust then oldest date + if ((merge == null || trust == 0) && enrich != null) { + if (merge == null) { + return enrich; + } else { + try { + LocalDate merge_date = LocalDate.parse(merge.getValue(), DateTimeFormatter.ISO_DATE); + try { + LocalDate enrich_date = LocalDate.parse(enrich.getValue(), DateTimeFormatter.ISO_DATE); + + if (enrich_date.getYear() > 1300 + && (merge_date.getYear() < 1300 || merge_date.isAfter(enrich_date))) { + return enrich; + } + } catch (NullPointerException | DateTimeParseException e) { + return merge; + } + } catch (NullPointerException | DateTimeParseException e) { + return enrich; + } + } + } + + // keep value + return merge; + } + + private static List mergeInstances(List v1, List v2, int trust) { + return mergeLists( + v1, v2, trust, + MergeUtils::instanceKeyExtractor, + MergeUtils::instanceMerger); + } + + private static List mergeEosciifguidelines(List v1, List v2, + int trust) { + return mergeLists( + v1, v2, trust, er -> Joiner + .on("||") + .useForNull("") + .join(er.getCode(), er.getLabel(), er.getUrl(), er.getSemanticRelation()), + (r, l) -> r); + + } + + private static List mergeExternalReference(List v1, + List v2, int trust) { + return mergeLists( + v1, v2, trust, er -> Joiner + .on(',') + .useForNull("") + .join( + er.getSitename(), er.getLabel(), + er.getUrl(), toString(er.getQualifier()), er.getRefidentifier(), + er.getQuery(), toString(er.getDataInfo())), + (r, l) -> r); + } + + private static String toString(DataInfo di) { + return Joiner + .on(',') + .useForNull("") + .join( + di.getInvisible(), di.getInferred(), di.getDeletedbyinference(), di.getTrust(), + di.getInferenceprovenance(), toString(di.getProvenanceaction())); + } + + private static String toString(Qualifier q) { + return Joiner + .on(',') + .useForNull("") + .join(q.getClassid(), q.getClassname(), q.getSchemeid(), q.getSchemename()); + } + + private static String toString(StructuredProperty sp) { + return Joiner + .on(',') + .useForNull("") + .join(toString(sp.getQualifier()), sp.getValue()); + } + + private static List mergeStructuredProperties(List v1, List v2, int trust) { + return mergeLists(v1, v2, trust, MergeUtils::toString, (r, l) -> r); + } + + private static List mergeQualifiers(List v1, List v2, int trust) { + return mergeLists(v1, v2, trust, MergeUtils::toString, (r, l) -> r); + } + + private static T coalesce(T m, T e) { + return m != null ? m : e; + } + + private static List mergeAuthors(List author, List author1, int trust) { + List> authors = new ArrayList<>(); + if (author != null) { + authors.add(author); + } + if (author1 != null) { + authors.add(author1); + } + return AuthorMerger.merge(authors); + } + private static String instanceKeyExtractor(Instance i) { return String .join( @@ -472,9 +608,9 @@ public class MergeUtils { MergeUtils::instanceTypeMappingKeyExtractor, (itm1, itm2) -> itm1)); i.setFulltext(selectFulltext(i1.getFulltext(), i2.getFulltext())); i.setDateofacceptance(selectOldestDate(i1.getDateofacceptance(), i2.getDateofacceptance())); - i.setLicense(firstNonNull(i1.getLicense(), i2.getLicense())); - i.setProcessingchargeamount(firstNonNull(i1.getProcessingchargeamount(), i2.getProcessingchargeamount())); - i.setProcessingchargecurrency(firstNonNull(i1.getProcessingchargecurrency(), i2.getProcessingchargecurrency())); + i.setLicense(coalesce(i1.getLicense(), i2.getLicense())); + i.setProcessingchargeamount(coalesce(i1.getProcessingchargeamount(), i2.getProcessingchargeamount())); + i.setProcessingchargecurrency(coalesce(i1.getProcessingchargecurrency(), i2.getProcessingchargecurrency())); i .setMeasures( mergeLists(i1.getMeasures(), i2.getMeasures(), 0, MergeUtils::measureKeyExtractor, (m1, m2) -> m1)); @@ -698,7 +834,7 @@ public class MergeUtils { * @param b the b * @return the list */ - public static List> longestLists(List> a, List> b) { + private static List> longestLists(List> a, List> b) { if (a == null || b == null) return a == null ? b : a; @@ -849,8 +985,8 @@ public class MergeUtils { * single attribute only if in the current instance is missing * The only repeatable field enriched is measures * - * @param merge the current instance - * @param enrichment the enrichment instance + * @param merge the current instance + * @param enrichment the enrichment instance */ private static void applyEnrichment(final Instance merge, final Instance enrichment) { if (merge == null || enrichment == null) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OrganizationPidComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OrganizationPidComparator.java index 3a6df2924c..7649c3fec1 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OrganizationPidComparator.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OrganizationPidComparator.java @@ -9,10 +9,18 @@ public class OrganizationPidComparator implements Comparator @Override public int compare(StructuredProperty left, StructuredProperty right) { + if (left == null) { + return right == null ? 0 : -1; + } else if (right == null) { + return 1; + } PidType lClass = PidType.tryValueOf(left.getQualifier().getClassid()); PidType rClass = PidType.tryValueOf(right.getQualifier().getClassid()); + if (lClass.equals(rClass)) + return 0; + if (lClass.equals(PidType.openorgs)) return -1; if (rClass.equals(PidType.openorgs)) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/RefereedComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/RefereedComparator.java index 91c7b68451..8669336c75 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/RefereedComparator.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/RefereedComparator.java @@ -4,7 +4,6 @@ package eu.dnetlib.dhp.schema.oaf.utils; import java.util.Comparator; import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; /** * Comparator for sorting the values from the dnet:review_levels vocabulary, implements the following ordering @@ -15,10 +14,18 @@ public class RefereedComparator implements Comparator { @Override public int compare(Qualifier left, Qualifier right) { + if (left == null || left.getClassid() == null) { + return (right == null || right.getClassid() == null) ? 0 : -1; + } else if (right == null || right.getClassid() == null) { + return 1; + } String lClass = left.getClassid(); String rClass = right.getClassid(); + if (lClass.equals(rClass)) + return 0; + if ("0001".equals(lClass)) return -1; if ("0001".equals(rClass)) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultPidComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultPidComparator.java index e51c4801f5..00ad3bdc0c 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultPidComparator.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultPidComparator.java @@ -13,6 +13,9 @@ public class ResultPidComparator implements Comparator { PidType lClass = PidType.tryValueOf(left.getQualifier().getClassid()); PidType rClass = PidType.tryValueOf(right.getQualifier().getClassid()); + if (lClass.equals(rClass)) + return 0; + if (lClass.equals(PidType.doi)) return -1; if (rClass.equals(PidType.doi)) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultTypeComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultTypeComparator.java index 04d855d1c0..ba55621e55 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultTypeComparator.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultTypeComparator.java @@ -14,6 +14,8 @@ import eu.dnetlib.dhp.schema.oaf.Result; public class ResultTypeComparator implements Comparator { + public static final ResultTypeComparator INSTANCE = new ResultTypeComparator(); + @Override public int compare(Result left, Result right) { @@ -37,28 +39,27 @@ public class ResultTypeComparator implements Comparator { String lClass = left.getResulttype().getClassid(); String rClass = right.getResulttype().getClassid(); - if (lClass.equals(rClass)) - return 0; + if (!lClass.equals(rClass)) { + if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID)) + return -1; + if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID)) + return 1; - if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID)) - return -1; - if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID)) - return 1; + if (lClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID)) + return -1; + if (rClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID)) + return 1; - if (lClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID)) - return -1; - if (rClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID)) - return 1; + if (lClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID)) + return -1; + if (rClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID)) + return 1; - if (lClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID)) - return -1; - if (rClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID)) - return 1; - - if (lClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID)) - return -1; - if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID)) - return 1; + if (lClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID)) + return -1; + if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID)) + return 1; + } // Else (but unlikely), lexicographical ordering will do. return lClass.compareTo(rClass); diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsStatsSuffixPrefixChain.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsStatsSuffixPrefixChain.java index 22351cf8ff..1e897182db 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsStatsSuffixPrefixChain.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsStatsSuffixPrefixChain.java @@ -20,7 +20,7 @@ public class WordsStatsSuffixPrefixChain extends AbstractClusteringFunction { return suffixPrefixChain(s, param("mod")); } - private Collection suffixPrefixChain(String s, int mod) { + static Collection suffixPrefixChain(String s, int mod) { // create the list of words from the string (remove short words) List wordsList = Arrays @@ -38,7 +38,7 @@ public class WordsStatsSuffixPrefixChain extends AbstractClusteringFunction { } - private Collection doSuffixPrefixChain(List wordsList, String prefix) { + static private Collection doSuffixPrefixChain(List wordsList, String prefix) { Set set = Sets.newLinkedHashSet(); switch (wordsList.size()) { @@ -80,12 +80,16 @@ public class WordsStatsSuffixPrefixChain extends AbstractClusteringFunction { } - private String suffix(String s, int len) { + private static String suffix(String s, int len) { return s.substring(s.length() - len); } - private String prefix(String s, int len) { + private static String prefix(String s, int len) { return s.substring(0, len); } + static public void main(String[] args) { + String title = "MY LIFE AS A BOSON: THE STORY OF \"THE HIGGS\"".toLowerCase(); + System.out.println(suffixPrefixChain(title, 10)); + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index 574b13b263..cf8c9ac3bd 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -7,6 +7,7 @@ import java.util.stream.Stream; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.FlatMapGroupsFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.sql.*; @@ -94,44 +95,59 @@ public class DedupRecordFactory { .join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") .select("dedupId", "id", "kryoObject") .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder)) - .map( - (MapFunction, DedupRecordReduceState>) t -> new DedupRecordReduceState( - t._1(), t._2(), t._3()), - Encoders.kryo(DedupRecordReduceState.class)) - .groupByKey( - (MapFunction) DedupRecordReduceState::getDedupId, Encoders.STRING()) - .reduceGroups( - (ReduceFunction) (t1, t2) -> { - if (t1.entity == null) { - t2.aliases.addAll(t1.aliases); - return t2; + .groupByKey((MapFunction, String>) Tuple3::_1, Encoders.STRING()) + .flatMapGroups( + (FlatMapGroupsFunction, OafEntity>) (dedupId, it) -> { + if (!it.hasNext()) + return Collections.emptyIterator(); + + final ArrayList cliques = new ArrayList<>(); + + final ArrayList aliases = new ArrayList<>(); + + final HashSet acceptanceDate = new HashSet<>(); + + while (it.hasNext()) { + Tuple3 t = it.next(); + OafEntity entity = t._3(); + + if (entity == null) { + aliases.add(t._2()); + } else { + cliques.add(entity); + + if (acceptanceDate.size() < MAX_ACCEPTANCE_DATE) { + if (Result.class.isAssignableFrom(entity.getClass())) { + Result result = (Result) entity; + if (result.getDateofacceptance() != null + && StringUtils.isNotBlank(result.getDateofacceptance().getValue())) { + acceptanceDate.add(result.getDateofacceptance().getValue()); + } + } + } + } + } - if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) { - t1.acceptanceDate.addAll(t2.acceptanceDate); + + if (acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques.isEmpty()) { + return Collections.emptyIterator(); } - t1.aliases.addAll(t2.aliases); - t1.entity = reduceEntity(t1.entity, t2.entity); - return t1; - }) - .flatMap((FlatMapFunction, OafEntity>) t -> { - String dedupId = t._1(); - DedupRecordReduceState agg = t._2(); + OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator()); + // dedup records do not have date of transformation attribute + mergedEntity.setDateoftransformation(null); - if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) { - return Collections.emptyIterator(); - } + return Stream + .concat( + Stream + .of(dedupId) + .map(id -> createDedupOafEntity(id, mergedEntity, dataInfo, ts)), + aliases + .stream() + .map(id -> createMergedDedupAliasOafEntity(id, mergedEntity, dataInfo, ts))) + .iterator(); - return Stream - .concat( - Stream - .of(agg.getDedupId()) - .map(id -> createDedupOafEntity(id, agg.entity, dataInfo, ts)), - agg.aliases - .stream() - .map(id -> createMergedDedupAliasOafEntity(id, agg.entity, dataInfo, ts))) - .iterator(); - }, beanEncoder); + }, beanEncoder); } private static OafEntity createDedupOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index df87288511..21d06692fe 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -130,10 +130,12 @@ public class GenerateEntitiesApplication extends AbstractMigrationApplication { switch (mode) { case claim: save( - inputRdd - .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf)) - .reduceByKey(MergeUtils::merge) - .map(Tuple2::_2), + inputRdd.keyBy(oaf -> ModelSupport.idFn().apply(oaf)) + .groupByKey() + .map(t -> MergeUtils.mergeGroup(t._1, t._2.iterator())), + //.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf)) + //.reduceByKey(MergeUtils::merge) + //.map(Tuple2::_2), targetPath); break; case graph: diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala index ecf612cc74..d94a23947a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala @@ -8,6 +8,8 @@ import org.apache.spark.SparkConf import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} +import scala.collection.JavaConverters._ + object SparkCreateInputGraph { def main(args: Array[String]): Unit = { @@ -131,8 +133,9 @@ object SparkCreateInputGraph { val ds: Dataset[T] = spark.read.load(sourcePath).as[T] ds.groupByKey(_.getId) - .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] } - .map(_._2) + .mapGroups { (id, it) => MergeUtils.mergeGroup(id, it.asJava).asInstanceOf[T] } +// .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] } +// .map(_) .write .mode(SaveMode.Overwrite) .save(targetPath)