Merge pull request 'Refinements to PR #404: refactoring the Oaf records merge utilities into dhp-common' (#422) from revised_merge_logic into beta

Reviewed-on: #422
This commit is contained in:
Claudio Atzori 2024-04-17 11:58:26 +02:00
commit 0db7e4ae9a
10 changed files with 313 additions and 133 deletions

View File

@ -14,7 +14,7 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -135,10 +135,10 @@ public class GroupEntitiesSparkJob {
.applyCoarVocabularies(entity, vocs), .applyCoarVocabularies(entity, vocs),
OAFENTITY_KRYO_ENC) OAFENTITY_KRYO_ENC)
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING()) .groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
.reduceGroups((ReduceFunction<OafEntity>) MergeUtils::checkedMerge) .mapGroups((MapGroupsFunction<String, OafEntity, OafEntity>) MergeUtils::mergeGroup, OAFENTITY_KRYO_ENC)
.map( .map(
(MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2<>( (MapFunction<OafEntity, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
t._2().getClass().getName(), t._2()), t.getClass().getName(), t),
Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC)); Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
// pivot on "_1" (classname of the entity) // pivot on "_1" (classname of the entity)

View File

@ -5,7 +5,11 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.ObjectUtils.firstNonNull; import static org.apache.commons.lang3.ObjectUtils.firstNonNull;
import java.text.ParseException; import java.text.ParseException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.*; import java.util.*;
import java.util.function.BinaryOperator; import java.util.function.BinaryOperator;
import java.util.function.Function; import java.util.function.Function;
@ -19,6 +23,7 @@ import org.apache.commons.lang3.tuple.Pair;
import com.github.sisyphsu.dateparser.DateParserUtils; import com.github.sisyphsu.dateparser.DateParserUtils;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.AccessRightComparator; import eu.dnetlib.dhp.schema.common.AccessRightComparator;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
@ -26,6 +31,37 @@ import eu.dnetlib.dhp.schema.oaf.*;
public class MergeUtils { public class MergeUtils {
/**
 * Merges all the records of a group into a single record.
 *
 * The records are ordered by descending priority, i.e. by their {@code DataInfo} trust when both
 * records declare one and, on a tie, by {@link ResultTypeComparator} when both records are
 * {@link Result}s. They are then folded, in that order, with
 * {@link #checkedMerge(Oaf, Oaf)}. Records with the same priority keep their arrival order.
 * A group made of a single record is returned as is, without merging it with itself.
 *
 * @param s the key shared by the records of the group (only used for error reporting)
 * @param oafEntityIterator the records to merge, it is fully consumed
 * @param <T> the type of the records
 * @return the merged record
 * @throws NoSuchElementException when the iterator provides no records
 */
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator) {
	// A list is used on purpose instead of a sorted set: a sorted set discards every element that
	// compares equal to one already present, which silently dropped from the merge all the
	// records sharing the same trust and result type.
	final List<T> sortedEntities = new ArrayList<>();
	oafEntityIterator.forEachRemaining(sortedEntities::add);

	if (sortedEntities.isEmpty()) {
		throw new NoSuchElementException("cannot merge an empty group of records, key: " + s);
	}

	final Comparator<T> byPriority = (o1, o2) -> {
		int res = 0;
		// the trust is compared only when available on both sides, a missing trust is a tie
		if (o1.getDataInfo() != null && o2.getDataInfo() != null
			&& o1.getDataInfo().getTrust() != null && o2.getDataInfo().getTrust() != null) {
			res = o1.getDataInfo().getTrust().compareTo(o2.getDataInfo().getTrust());
		}

		if (res == 0 && o1 instanceof Result && o2 instanceof Result) {
			return ResultTypeComparator.INSTANCE.compare((Result) o1, (Result) o2);
		}

		return res;
	};

	// List.sort is stable, hence equally ranked records are merged in their arrival order
	sortedEntities.sort(byPriority.reversed());

	final Iterator<T> it = sortedEntities.iterator();
	// the first record is the starting point of the merge and must not be merged with itself
	T merged = it.next();
	while (it.hasNext()) {
		merged = checkedMerge(merged, it.next());
	}

	return merged;
}
public static <T extends Oaf> T checkedMerge(final T left, final T right) { public static <T extends Oaf> T checkedMerge(final T left, final T right) {
return (T) merge(left, right, false); return (T) merge(left, right, false);
} }
@ -34,7 +70,7 @@ public class MergeUtils {
return merge(left, right, false); return merge(left, right, false);
} }
public static Oaf merge(final Oaf left, final Oaf right, boolean checkDelegatedAuthority) { static Oaf merge(final Oaf left, final Oaf right, boolean checkDelegatedAuthority) {
if (sameClass(left, right, OafEntity.class)) { if (sameClass(left, right, OafEntity.class)) {
return mergeEntities(left, right, checkDelegatedAuthority); return mergeEntities(left, right, checkDelegatedAuthority);
} else if (sameClass(left, right, Relation.class)) { } else if (sameClass(left, right, Relation.class)) {
@ -190,7 +226,7 @@ public class MergeUtils {
.concat(h.stream(), l.stream()) .concat(h.stream(), l.stream())
.filter(Objects::nonNull) .filter(Objects::nonNull)
.distinct() .distinct()
.collect(Collectors.toMap(keyExtractor, v -> v, merger)) .collect(Collectors.toMap(keyExtractor, v -> v, merger, LinkedHashMap::new))
.values()); .values());
} }
@ -226,7 +262,7 @@ public class MergeUtils {
} }
// TODO review // TODO review
private static List<KeyValue> mergeKeyValue(List<KeyValue> left, List<KeyValue> right, int trust) { private static List<KeyValue> mergeByKey(List<KeyValue> left, List<KeyValue> right, int trust) {
if (trust < 0) { if (trust < 0) {
List<KeyValue> s = left; List<KeyValue> s = left;
left = right; left = right;
@ -268,8 +304,7 @@ public class MergeUtils {
*/ */
private static <T extends Oaf> T mergeOafFields(T merged, T enrich, int trust) { private static <T extends Oaf> T mergeOafFields(T merged, T enrich, int trust) {
// TODO: union of all values, but what does it mean with KeyValue pairs??? merged.setCollectedfrom(mergeByKey(merged.getCollectedfrom(), enrich.getCollectedfrom(), trust));
merged.setCollectedfrom(mergeKeyValue(merged.getCollectedfrom(), enrich.getCollectedfrom(), trust));
merged.setDataInfo(chooseDataInfo(merged.getDataInfo(), enrich.getDataInfo(), trust)); merged.setDataInfo(chooseDataInfo(merged.getDataInfo(), enrich.getDataInfo(), trust));
merged.setLastupdatetimestamp(max(merged.getLastupdatetimestamp(), enrich.getLastupdatetimestamp())); merged.setLastupdatetimestamp(max(merged.getLastupdatetimestamp(), enrich.getLastupdatetimestamp()));
@ -289,16 +324,13 @@ public class MergeUtils {
merged.setOriginalId(unionDistinctListOfString(merged.getOriginalId(), enrich.getOriginalId())); merged.setOriginalId(unionDistinctListOfString(merged.getOriginalId(), enrich.getOriginalId()));
merged.setPid(unionDistinctLists(merged.getPid(), enrich.getPid(), trust)); merged.setPid(unionDistinctLists(merged.getPid(), enrich.getPid(), trust));
// dateofcollection mettere today quando si fa merge merged.setDateofcollection(LocalDateTime.now().toString());
merged.setDateofcollection(chooseString(merged.getDateofcollection(), enrich.getDateofcollection(), trust));
// setDateoftransformation mettere vuota in dedup, nota per Claudio
merged merged
.setDateoftransformation( .setDateoftransformation(
chooseString(merged.getDateoftransformation(), enrich.getDateoftransformation(), trust)); chooseString(merged.getDateoftransformation(), enrich.getDateoftransformation(), trust));
// TODO: was missing in OafEntity.merge
merged.setExtraInfo(unionDistinctLists(merged.getExtraInfo(), enrich.getExtraInfo(), trust)); merged.setExtraInfo(unionDistinctLists(merged.getExtraInfo(), enrich.getExtraInfo(), trust));
// oaiprovenanze da mettere a null quando si genera merge // When merging records OAI provenance becomes null
merged.setOaiprovenance(chooseReference(merged.getOaiprovenance(), enrich.getOaiprovenance(), trust)); merged.setOaiprovenance(null);
merged.setMeasures(unionDistinctLists(merged.getMeasures(), enrich.getMeasures(), trust)); merged.setMeasures(unionDistinctLists(merged.getMeasures(), enrich.getMeasures(), trust));
return merged; return merged;
@ -330,7 +362,7 @@ public class MergeUtils {
} }
// TODO keyvalue merge // TODO keyvalue merge
merge.setProperties(mergeKeyValue(merge.getProperties(), enrich.getProperties(), trust)); merge.setProperties(mergeByKey(merge.getProperties(), enrich.getProperties(), trust));
return merge; return merge;
} }
@ -345,73 +377,70 @@ public class MergeUtils {
merge.setProcessingchargecurrency(enrich.getProcessingchargecurrency()); merge.setProcessingchargecurrency(enrich.getProcessingchargecurrency());
} }
// author = usare la stessa logica che in dedup merge.setAuthor(mergeAuthors(merge.getAuthor(), enrich.getAuthor(), trust));
merge.setAuthor(chooseReference(merge.getAuthor(), enrich.getAuthor(), trust));
// il primo che mi arriva secondo l'ordinamento per priorita' // keep merge value if present
merge.setResulttype(chooseReference(merge.getResulttype(), enrich.getResulttype(), trust)); if (merge.getResulttype() == null) {
// gestito come il resulttype perche' e' un subtype merge.setResulttype(enrich.getResulttype());
merge.setMetaResourceType(chooseReference(merge.getMetaResourceType(), enrich.getMetaResourceType(), trust)); merge.setMetaResourceType(enrich.getMetaResourceType());
// spostiamo nell'instance e qui prendo il primo che arriva }
merge.setLanguage(chooseReference(merge.getLanguage(), enrich.getLanguage(), trust));
// country lasicamo,o cosi' -> parentesi sul datainfo // should be an instance attribute, get the first non-null value
merge.setCountry(unionDistinctLists(merge.getCountry(), enrich.getCountry(), trust)); merge.setLanguage(coalesce(merge.getLanguage(), enrich.getLanguage()));
// ok
merge.setSubject(unionDistinctLists(merge.getSubject(), enrich.getSubject(), trust)); // distinct countries, do not manage datainfo
// union per priority quindi vanno in append merge.setCountry(mergeQualifiers(merge.getCountry(), enrich.getCountry(), trust));
merge.setTitle(unionTitle(merge.getTitle(), enrich.getTitle(), trust));
// ok // distinct subjects
merge.setRelevantdate(unionDistinctLists(merge.getRelevantdate(), enrich.getRelevantdate(), trust)); merge.setSubject(mergeStructuredProperties(merge.getSubject(), enrich.getSubject(), trust));
// prima trust e poi longest list
// distinct titles
merge.setTitle(mergeStructuredProperties(merge.getTitle(), enrich.getTitle(), trust));
merge.setRelevantdate(mergeStructuredProperties(merge.getRelevantdate(), enrich.getRelevantdate(), trust));
if (merge.getDescription() == null || merge.getDescription().isEmpty() || trust == 0) {
merge.setDescription(longestLists(merge.getDescription(), enrich.getDescription())); merge.setDescription(longestLists(merge.getDescription(), enrich.getDescription()));
// trust piu' alto e poi piu' vecchia }
merge.setDateofacceptance(chooseReference(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust));
// ok, ma publisher va messo ripetibile merge
merge.setPublisher(chooseReference(merge.getPublisher(), enrich.getPublisher(), trust)); .setDateofacceptance(
// ok mergeDateOfAcceptance(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust));
merge.setEmbargoenddate(chooseReference(merge.getEmbargoenddate(), enrich.getEmbargoenddate(), trust));
// ok merge.setPublisher(coalesce(merge.getPublisher(), enrich.getPublisher()));
merge.setEmbargoenddate(coalesce(merge.getEmbargoenddate(), enrich.getEmbargoenddate()));
merge.setSource(unionDistinctLists(merge.getSource(), enrich.getSource(), trust)); merge.setSource(unionDistinctLists(merge.getSource(), enrich.getSource(), trust));
// ok
merge.setFulltext(unionDistinctLists(merge.getFulltext(), enrich.getFulltext(), trust)); merge.setFulltext(unionDistinctLists(merge.getFulltext(), enrich.getFulltext(), trust));
// ok
merge.setFormat(unionDistinctLists(merge.getFormat(), enrich.getFormat(), trust)); merge.setFormat(unionDistinctLists(merge.getFormat(), enrich.getFormat(), trust));
// ok
merge.setContributor(unionDistinctLists(merge.getContributor(), enrich.getContributor(), trust)); merge.setContributor(unionDistinctLists(merge.getContributor(), enrich.getContributor(), trust));
// prima prendo l'higher trust, su questo prendo il valore migliore nelle istanze TODO // this field might contain the original type from the raw metadata, no strategy yet to merge it
// trust maggiore ma a parita' di trust il piu' specifico (base del vocabolario) merge.setResourcetype(coalesce(merge.getResourcetype(), enrich.getResourcetype()));
// vedi note
// cannot use com.google.common.base.Objects.firstNonNull as it throws NPE when both terms are null
merge.setResourcetype(firstNonNull(merge.getResourcetype(), enrich.getResourcetype()));
// ok
merge.setCoverage(unionDistinctLists(merge.getCoverage(), enrich.getCoverage(), trust)); merge.setCoverage(unionDistinctLists(merge.getCoverage(), enrich.getCoverage(), trust));
// most open ok
if (enrich.getBestaccessright() != null if (enrich.getBestaccessright() != null
&& new AccessRightComparator<>() && new AccessRightComparator<>()
.compare(enrich.getBestaccessright(), merge.getBestaccessright()) < 0) { .compare(enrich.getBestaccessright(), merge.getBestaccessright()) < 0) {
merge.setBestaccessright(enrich.getBestaccessright()); merge.setBestaccessright(enrich.getBestaccessright());
} }
// TODO merge of datainfo given same id // merge datainfo for same context id
merge.setContext(unionDistinctLists(merge.getContext(), enrich.getContext(), trust)); merge.setContext(mergeLists(merge.getContext(), enrich.getContext(), trust, Context::getId, (r, l) -> {
r.getDataInfo().addAll(l.getDataInfo());
return r;
}));
// ok // ok
merge merge
.setExternalReference( .setExternalReference(
unionDistinctLists(merge.getExternalReference(), enrich.getExternalReference(), trust)); mergeExternalReference(merge.getExternalReference(), enrich.getExternalReference(), trust));
// instance enrichment or union // instance enrichment or union
// review instance equals => add pid to comparision // review instance equals => add pid to comparision
if (!isAnEnrichment(merge) && !isAnEnrichment(enrich)) if (!isAnEnrichment(merge) && !isAnEnrichment(enrich)) {
merge merge.setInstance(mergeInstances(merge.getInstance(), enrich.getInstance(), trust));
.setInstance( } else {
mergeLists(
merge.getInstance(), enrich.getInstance(), trust,
MergeUtils::instanceKeyExtractor,
MergeUtils::instanceMerger));
else {
final List<Instance> enrichmentInstances = isAnEnrichment(merge) ? merge.getInstance() final List<Instance> enrichmentInstances = isAnEnrichment(merge) ? merge.getInstance()
: enrich.getInstance(); : enrich.getInstance();
final List<Instance> enrichedInstances = isAnEnrichment(merge) ? enrich.getInstance() final List<Instance> enrichedInstances = isAnEnrichment(merge) ? enrich.getInstance()
@ -421,16 +450,123 @@ public class MergeUtils {
merge.setInstance(enrichInstances(enrichedInstances, enrichmentInstances)); merge.setInstance(enrichInstances(enrichedInstances, enrichmentInstances));
} }
merge.setEoscifguidelines(unionDistinctLists(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust)); merge
.setEoscifguidelines(
mergeEosciifguidelines(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust));
merge.setIsGreen(booleanOR(merge.getIsGreen(), enrich.getIsGreen())); merge.setIsGreen(booleanOR(merge.getIsGreen(), enrich.getIsGreen()));
// OK but should be list of values // OK but should be list of values
merge.setOpenAccessColor(chooseReference(merge.getOpenAccessColor(), enrich.getOpenAccessColor(), trust)); merge.setOpenAccessColor(coalesce(merge.getOpenAccessColor(), enrich.getOpenAccessColor()));
merge.setIsInDiamondJournal(booleanOR(merge.getIsInDiamondJournal(), enrich.getIsInDiamondJournal())); merge.setIsInDiamondJournal(booleanOR(merge.getIsInDiamondJournal(), enrich.getIsInDiamondJournal()));
merge.setPubliclyFunded(booleanOR(merge.getPubliclyFunded(), enrich.getPubliclyFunded())); merge.setPubliclyFunded(booleanOR(merge.getPubliclyFunded(), enrich.getPubliclyFunded()));
return merge; return merge;
} }
/**
 * Chooses the date of acceptance to keep when merging two records.
 *
 * The current value is kept unless it is missing or, when the two records have the same
 * trust ({@code trust == 0}), the enrichment carries an older, plausible date (year after 1300).
 * Values that cannot be parsed as ISO dates are treated as follows: when the current value is
 * not parseable the enrichment wins, when only the enrichment is not parseable the current
 * value is kept.
 *
 * @param merge the date of acceptance of the record being built, may be null
 * @param enrich the date of acceptance of the record being merged in, may be null
 * @param trust the outcome of the trust comparison between the two records, 0 means same trust
 * @return the selected field, null only when both the arguments are null
 */
private static Field<String> mergeDateOfAcceptance(Field<String> merge, Field<String> enrich, int trust) {
	if (enrich == null) {
		return merge;
	}
	if (merge == null) {
		return enrich;
	}
	if (trust != 0) {
		// the current value comes from the record that prevails, keep it
		return merge;
	}

	// same trust: prefer the oldest date
	final LocalDate mergeDate = parseIsoDateOrNull(merge.getValue());
	if (mergeDate == null) {
		return enrich;
	}

	final LocalDate enrichDate = parseIsoDateOrNull(enrich.getValue());
	if (enrichDate == null) {
		return merge;
	}

	final boolean enrichIsPlausible = enrichDate.getYear() > 1300;
	if (enrichIsPlausible && (mergeDate.getYear() < 1300 || mergeDate.isAfter(enrichDate))) {
		return enrich;
	}

	return merge;
}

/**
 * Parses a date in ISO format, returning null when the value is missing or malformed.
 */
private static LocalDate parseIsoDateOrNull(String value) {
	if (value == null) {
		return null;
	}
	try {
		return LocalDate.parse(value, DateTimeFormatter.ISO_DATE);
	} catch (DateTimeParseException ignored) {
		// a malformed date is handled by the caller as a missing date
		return null;
	}
}
/**
 * Merges two lists of instances: the instances are keyed by {@code instanceKeyExtractor} and
 * those sharing the same key are combined by {@code instanceMerger}.
 *
 * @param v1 the instances of the record being built
 * @param v2 the instances of the record being merged in
 * @param trust the outcome of the trust comparison between the two records
 * @return the merged list of instances
 */
private static List<Instance> mergeInstances(List<Instance> v1, List<Instance> v2, int trust) {
	return mergeLists(
		v1,
		v2,
		trust,
		instance -> instanceKeyExtractor(instance),
		(first, second) -> instanceMerger(first, second));
}
/**
 * Merges two lists of EOSC interoperability guidelines. Two guidelines are the same entry when
 * code, label, url and semantic relation coincide; among the entries with the same key, the
 * first one handed over by {@code mergeLists} is kept.
 *
 * @param v1 the guidelines of the record being built
 * @param v2 the guidelines of the record being merged in
 * @param trust the outcome of the trust comparison between the two records
 * @return the merged list of guidelines
 */
private static List<EoscIfGuidelines> mergeEosciifguidelines(List<EoscIfGuidelines> v1, List<EoscIfGuidelines> v2,
	int trust) {
	return mergeLists(v1, v2, trust, guideline -> {
		final Joiner keyJoiner = Joiner.on("||").useForNull("");
		return keyJoiner
			.join(
				guideline.getCode(), guideline.getLabel(), guideline.getUrl(),
				guideline.getSemanticRelation());
	}, (kept, duplicate) -> kept);
}
/**
 * Merges two lists of external references. The key of a reference is made of site name, label,
 * url, qualifier, reference identifier, query and data info; among the references with the same
 * key, the first one handed over by {@code mergeLists} is kept.
 *
 * @param v1 the external references of the record being built
 * @param v2 the external references of the record being merged in
 * @param trust the outcome of the trust comparison between the two records
 * @return the merged list of external references
 */
private static List<ExternalReference> mergeExternalReference(List<ExternalReference> v1,
	List<ExternalReference> v2, int trust) {
	return mergeLists(v1, v2, trust, reference -> {
		final Joiner keyJoiner = Joiner.on(',').useForNull("");
		return keyJoiner
			.join(
				reference.getSitename(),
				reference.getLabel(),
				reference.getUrl(),
				toString(reference.getQualifier()),
				reference.getRefidentifier(),
				reference.getQuery(),
				toString(reference.getDataInfo()));
	}, (kept, duplicate) -> kept);
}
/**
 * Builds the comma separated key used to compare {@link DataInfo} objects when merging lists.
 * Null attributes contribute an empty token.
 *
 * @param di the data info, may be null
 * @return the key, an empty string when {@code di} is null
 */
private static String toString(DataInfo di) {
	// the key is meant to be null tolerant (see useForNull), so a missing data info must not fail
	if (di == null) {
		return "";
	}
	return Joiner
		.on(',')
		.useForNull("")
		.join(
			di.getInvisible(), di.getInferred(), di.getDeletedbyinference(), di.getTrust(),
			di.getInferenceprovenance(),
			di.getProvenanceaction() == null ? "" : toString(di.getProvenanceaction()));
}
/**
 * Builds the comma separated key used to compare {@link Qualifier} objects when merging lists.
 * Null attributes contribute an empty token.
 *
 * @param q the qualifier, may be null
 * @return the key, an empty string when {@code q} is null
 */
private static String toString(Qualifier q) {
	// callers pass optional attributes (e.g. the qualifier of an external reference)
	if (q == null) {
		return "";
	}
	return Joiner
		.on(',')
		.useForNull("")
		.join(q.getClassid(), q.getClassname(), q.getSchemeid(), q.getSchemename());
}
/**
 * Builds the comma separated key used to compare {@link StructuredProperty} objects when merging
 * lists: the key of the qualifier followed by the value.
 *
 * @param sp the structured property, may be null
 * @return the key, an empty string when {@code sp} is null
 */
private static String toString(StructuredProperty sp) {
	if (sp == null) {
		return "";
	}
	return Joiner
		.on(',')
		.useForNull("")
		// a property without qualifier is still comparable by its value
		.join(sp.getQualifier() == null ? "" : toString(sp.getQualifier()), sp.getValue());
}
/**
 * Merges two lists of structured properties, keyed by qualifier and value; among the properties
 * with the same key, the first one handed over by {@code mergeLists} is kept.
 *
 * @param v1 the properties of the record being built
 * @param v2 the properties of the record being merged in
 * @param trust the outcome of the trust comparison between the two records
 * @return the merged list of properties
 */
private static <T extends StructuredProperty> List<T> mergeStructuredProperties(List<T> v1, List<T> v2, int trust) {
	return mergeLists(
		v1,
		v2,
		trust,
		property -> MergeUtils.toString(property),
		(kept, duplicate) -> kept);
}
/**
 * Merges two lists of qualifiers, keyed by class id, class name, scheme id and scheme name;
 * among the qualifiers with the same key, the first one handed over by {@code mergeLists} is
 * kept.
 *
 * @param v1 the qualifiers of the record being built
 * @param v2 the qualifiers of the record being merged in
 * @param trust the outcome of the trust comparison between the two records
 * @return the merged list of qualifiers
 */
private static <T extends Qualifier> List<T> mergeQualifiers(List<T> v1, List<T> v2, int trust) {
	return mergeLists(
		v1,
		v2,
		trust,
		qualifier -> MergeUtils.toString(qualifier),
		(kept, duplicate) -> kept);
}
/**
 * Returns the first argument when it is not null, the second one otherwise.
 *
 * @param m the preferred value
 * @param e the fallback value
 * @return {@code m} if not null, otherwise {@code e} (which may be null as well)
 */
private static <T> T coalesce(T m, T e) {
	if (m == null) {
		return e;
	}
	return m;
}
/**
 * Merges the author lists of two records by delegating to {@code AuthorMerger.merge}; only the
 * lists that are not null are handed over, in the given order.
 *
 * @param author the authors of the record being built, may be null
 * @param author1 the authors of the record being merged in, may be null
 * @param trust the outcome of the trust comparison between the two records (not used here)
 * @return the outcome of {@code AuthorMerger.merge}
 */
private static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) {
	final List<List<Author>> candidates = new ArrayList<>();
	Optional.ofNullable(author).ifPresent(candidates::add);
	Optional.ofNullable(author1).ifPresent(candidates::add);
	return AuthorMerger.merge(candidates);
}
private static String instanceKeyExtractor(Instance i) { private static String instanceKeyExtractor(Instance i) {
return String return String
.join( .join(
@ -472,9 +608,9 @@ public class MergeUtils {
MergeUtils::instanceTypeMappingKeyExtractor, (itm1, itm2) -> itm1)); MergeUtils::instanceTypeMappingKeyExtractor, (itm1, itm2) -> itm1));
i.setFulltext(selectFulltext(i1.getFulltext(), i2.getFulltext())); i.setFulltext(selectFulltext(i1.getFulltext(), i2.getFulltext()));
i.setDateofacceptance(selectOldestDate(i1.getDateofacceptance(), i2.getDateofacceptance())); i.setDateofacceptance(selectOldestDate(i1.getDateofacceptance(), i2.getDateofacceptance()));
i.setLicense(firstNonNull(i1.getLicense(), i2.getLicense())); i.setLicense(coalesce(i1.getLicense(), i2.getLicense()));
i.setProcessingchargeamount(firstNonNull(i1.getProcessingchargeamount(), i2.getProcessingchargeamount())); i.setProcessingchargeamount(coalesce(i1.getProcessingchargeamount(), i2.getProcessingchargeamount()));
i.setProcessingchargecurrency(firstNonNull(i1.getProcessingchargecurrency(), i2.getProcessingchargecurrency())); i.setProcessingchargecurrency(coalesce(i1.getProcessingchargecurrency(), i2.getProcessingchargecurrency()));
i i
.setMeasures( .setMeasures(
mergeLists(i1.getMeasures(), i2.getMeasures(), 0, MergeUtils::measureKeyExtractor, (m1, m2) -> m1)); mergeLists(i1.getMeasures(), i2.getMeasures(), 0, MergeUtils::measureKeyExtractor, (m1, m2) -> m1));
@ -698,7 +834,7 @@ public class MergeUtils {
* @param b the b * @param b the b
* @return the list * @return the list
*/ */
public static List<Field<String>> longestLists(List<Field<String>> a, List<Field<String>> b) { private static List<Field<String>> longestLists(List<Field<String>> a, List<Field<String>> b) {
if (a == null || b == null) if (a == null || b == null)
return a == null ? b : a; return a == null ? b : a;

View File

@ -9,10 +9,18 @@ public class OrganizationPidComparator implements Comparator<StructuredProperty>
@Override @Override
public int compare(StructuredProperty left, StructuredProperty right) { public int compare(StructuredProperty left, StructuredProperty right) {
if (left == null) {
return right == null ? 0 : -1;
} else if (right == null) {
return 1;
}
PidType lClass = PidType.tryValueOf(left.getQualifier().getClassid()); PidType lClass = PidType.tryValueOf(left.getQualifier().getClassid());
PidType rClass = PidType.tryValueOf(right.getQualifier().getClassid()); PidType rClass = PidType.tryValueOf(right.getQualifier().getClassid());
if (lClass.equals(rClass))
return 0;
if (lClass.equals(PidType.openorgs)) if (lClass.equals(PidType.openorgs))
return -1; return -1;
if (rClass.equals(PidType.openorgs)) if (rClass.equals(PidType.openorgs))

View File

@ -4,7 +4,6 @@ package eu.dnetlib.dhp.schema.oaf.utils;
import java.util.Comparator; import java.util.Comparator;
import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
/** /**
* Comparator for sorting the values from the dnet:review_levels vocabulary, implements the following ordering * Comparator for sorting the values from the dnet:review_levels vocabulary, implements the following ordering
@ -15,10 +14,18 @@ public class RefereedComparator implements Comparator<Qualifier> {
@Override @Override
public int compare(Qualifier left, Qualifier right) { public int compare(Qualifier left, Qualifier right) {
if (left == null || left.getClassid() == null) {
return (right == null || right.getClassid() == null) ? 0 : -1;
} else if (right == null || right.getClassid() == null) {
return 1;
}
String lClass = left.getClassid(); String lClass = left.getClassid();
String rClass = right.getClassid(); String rClass = right.getClassid();
if (lClass.equals(rClass))
return 0;
if ("0001".equals(lClass)) if ("0001".equals(lClass))
return -1; return -1;
if ("0001".equals(rClass)) if ("0001".equals(rClass))

View File

@ -13,6 +13,9 @@ public class ResultPidComparator implements Comparator<StructuredProperty> {
PidType lClass = PidType.tryValueOf(left.getQualifier().getClassid()); PidType lClass = PidType.tryValueOf(left.getQualifier().getClassid());
PidType rClass = PidType.tryValueOf(right.getQualifier().getClassid()); PidType rClass = PidType.tryValueOf(right.getQualifier().getClassid());
if (lClass.equals(rClass))
return 0;
if (lClass.equals(PidType.doi)) if (lClass.equals(PidType.doi))
return -1; return -1;
if (rClass.equals(PidType.doi)) if (rClass.equals(PidType.doi))

View File

@ -14,6 +14,8 @@ import eu.dnetlib.dhp.schema.oaf.Result;
public class ResultTypeComparator implements Comparator<Result> { public class ResultTypeComparator implements Comparator<Result> {
public static final ResultTypeComparator INSTANCE = new ResultTypeComparator();
@Override @Override
public int compare(Result left, Result right) { public int compare(Result left, Result right) {
@ -37,9 +39,7 @@ public class ResultTypeComparator implements Comparator<Result> {
String lClass = left.getResulttype().getClassid(); String lClass = left.getResulttype().getClassid();
String rClass = right.getResulttype().getClassid(); String rClass = right.getResulttype().getClassid();
if (lClass.equals(rClass)) if (!lClass.equals(rClass)) {
return 0;
if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID)) if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
return -1; return -1;
if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID)) if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
@ -59,6 +59,7 @@ public class ResultTypeComparator implements Comparator<Result> {
return -1; return -1;
if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID)) if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
return 1; return 1;
}
// Else (but unlikely), lexicographical ordering will do. // Else (but unlikely), lexicographical ordering will do.
return lClass.compareTo(rClass); return lClass.compareTo(rClass);

View File

@ -20,7 +20,7 @@ public class WordsStatsSuffixPrefixChain extends AbstractClusteringFunction {
return suffixPrefixChain(s, param("mod")); return suffixPrefixChain(s, param("mod"));
} }
private Collection<String> suffixPrefixChain(String s, int mod) { static Collection<String> suffixPrefixChain(String s, int mod) {
// create the list of words from the string (remove short words) // create the list of words from the string (remove short words)
List<String> wordsList = Arrays List<String> wordsList = Arrays
@ -38,7 +38,7 @@ public class WordsStatsSuffixPrefixChain extends AbstractClusteringFunction {
} }
private Collection<String> doSuffixPrefixChain(List<String> wordsList, String prefix) { static private Collection<String> doSuffixPrefixChain(List<String> wordsList, String prefix) {
Set<String> set = Sets.newLinkedHashSet(); Set<String> set = Sets.newLinkedHashSet();
switch (wordsList.size()) { switch (wordsList.size()) {
@ -80,12 +80,16 @@ public class WordsStatsSuffixPrefixChain extends AbstractClusteringFunction {
} }
private String suffix(String s, int len) { private static String suffix(String s, int len) {
return s.substring(s.length() - len); return s.substring(s.length() - len);
} }
private String prefix(String s, int len) { private static String prefix(String s, int len) {
return s.substring(0, len); return s.substring(0, len);
} }
/**
 * Manual check: prints the keys produced by {@code suffixPrefixChain} for a sample title.
 *
 * @param args not used
 */
public static void main(String[] args) {
	final String sampleTitle = "MY LIFE AS A BOSON: THE STORY OF \"THE HIGGS\"".toLowerCase();
	final Collection<String> keys = suffixPrefixChain(sampleTitle, 10);
	System.out.println(keys);
}
} }

View File

@ -7,6 +7,7 @@ import java.util.stream.Stream;
import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
@ -94,43 +95,58 @@ public class DedupRecordFactory {
.join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") .join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
.select("dedupId", "id", "kryoObject") .select("dedupId", "id", "kryoObject")
.as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder)) .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
.map( .groupByKey((MapFunction<Tuple3<String, String, OafEntity>, String>) Tuple3::_1, Encoders.STRING())
(MapFunction<Tuple3<String, String, OafEntity>, DedupRecordReduceState>) t -> new DedupRecordReduceState( .flatMapGroups(
t._1(), t._2(), t._3()), (FlatMapGroupsFunction<String, Tuple3<String, String, OafEntity>, OafEntity>) (dedupId, it) -> {
Encoders.kryo(DedupRecordReduceState.class)) if (!it.hasNext())
.groupByKey( return Collections.emptyIterator();
(MapFunction<DedupRecordReduceState, String>) DedupRecordReduceState::getDedupId, Encoders.STRING())
.reduceGroups(
(ReduceFunction<DedupRecordReduceState>) (t1, t2) -> {
if (t1.entity == null) {
t2.aliases.addAll(t1.aliases);
return t2;
}
if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
t1.acceptanceDate.addAll(t2.acceptanceDate);
}
t1.aliases.addAll(t2.aliases);
t1.entity = reduceEntity(t1.entity, t2.entity);
return t1; final ArrayList<OafEntity> cliques = new ArrayList<>();
})
.flatMap((FlatMapFunction<Tuple2<String, DedupRecordReduceState>, OafEntity>) t -> {
String dedupId = t._1();
DedupRecordReduceState agg = t._2();
if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) { final ArrayList<String> aliases = new ArrayList<>();
final HashSet<String> acceptanceDate = new HashSet<>();
while (it.hasNext()) {
Tuple3<String, String, OafEntity> t = it.next();
OafEntity entity = t._3();
if (entity == null) {
aliases.add(t._2());
} else {
cliques.add(entity);
if (acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
if (Result.class.isAssignableFrom(entity.getClass())) {
Result result = (Result) entity;
if (result.getDateofacceptance() != null
&& StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
acceptanceDate.add(result.getDateofacceptance().getValue());
}
}
}
}
}
if (acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques.isEmpty()) {
return Collections.emptyIterator(); return Collections.emptyIterator();
} }
OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator());
// dedup records do not have date of transformation attribute
mergedEntity.setDateoftransformation(null);
return Stream return Stream
.concat( .concat(
Stream Stream
.of(agg.getDedupId()) .of(dedupId)
.map(id -> createDedupOafEntity(id, agg.entity, dataInfo, ts)), .map(id -> createDedupOafEntity(id, mergedEntity, dataInfo, ts)),
agg.aliases aliases
.stream() .stream()
.map(id -> createMergedDedupAliasOafEntity(id, agg.entity, dataInfo, ts))) .map(id -> createMergedDedupAliasOafEntity(id, mergedEntity, dataInfo, ts)))
.iterator(); .iterator();
}, beanEncoder); }, beanEncoder);
} }

View File

@ -130,10 +130,12 @@ public class GenerateEntitiesApplication extends AbstractMigrationApplication {
switch (mode) { switch (mode) {
case claim: case claim:
save( save(
inputRdd inputRdd.keyBy(oaf -> ModelSupport.idFn().apply(oaf))
.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf)) .groupByKey()
.reduceByKey(MergeUtils::merge) .map(t -> MergeUtils.mergeGroup(t._1, t._2.iterator())),
.map(Tuple2::_2), //.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
//.reduceByKey(MergeUtils::merge)
//.map(Tuple2::_2),
targetPath); targetPath);
break; break;
case graph: case graph:

View File

@ -8,6 +8,8 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql._ import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
object SparkCreateInputGraph { object SparkCreateInputGraph {
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
@ -131,8 +133,9 @@ object SparkCreateInputGraph {
val ds: Dataset[T] = spark.read.load(sourcePath).as[T] val ds: Dataset[T] = spark.read.load(sourcePath).as[T]
ds.groupByKey(_.getId) ds.groupByKey(_.getId)
.reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] } .mapGroups { (id, it) => MergeUtils.mergeGroup(id, it.asJava).asInstanceOf[T] }
.map(_._2) // .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] }
// .map(_)
.write .write
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.save(targetPath) .save(targetPath)