1
0
Fork 0

merged conflicts on beta

This commit is contained in:
sandro.labruzzo 2024-11-13 09:43:16 +01:00
commit b0478c380e
12 changed files with 331 additions and 239 deletions

View File

@ -2,8 +2,7 @@
package eu.dnetlib.dhp.oa.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.when;
import static org.apache.spark.sql.functions.*;
import java.util.Map;
import java.util.Optional;
@ -135,7 +134,9 @@ public class GroupEntitiesSparkJob {
.applyCoarVocabularies(entity, vocs),
OAFENTITY_KRYO_ENC)
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
.mapGroups((MapGroupsFunction<String, OafEntity, OafEntity>) MergeUtils::mergeById, OAFENTITY_KRYO_ENC)
.mapGroups(
(MapGroupsFunction<String, OafEntity, OafEntity>) (key, group) -> MergeUtils.mergeById(group, vocs),
OAFENTITY_KRYO_ENC)
.map(
(MapFunction<OafEntity, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
t.getClass().getName(), t),

View File

@ -2,7 +2,6 @@
package eu.dnetlib.dhp.schema.oaf.utils;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.OPENAIRE_META_RESOURCE_TYPE;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.getProvenance;
import java.net.MalformedURLException;
@ -696,6 +695,7 @@ public class GraphCleaningFunctions extends CleaningFunctions {
}
}
// set ORCID_PENDING to all orcid values that are not coming from ORCID provenance
for (Author a : r.getAuthor()) {
if (Objects.isNull(a.getPid())) {
a.setPid(Lists.newArrayList());
@ -752,6 +752,40 @@ public class GraphCleaningFunctions extends CleaningFunctions {
.collect(Collectors.toList()));
}
}
// Identify clashing ORCIDS:that is same ORCID associated to multiple authors in this result
Map<String, Integer> clashing_orcid = new HashMap<>();
for (Author a : r.getAuthor()) {
a
.getPid()
.stream()
.filter(
p -> StringUtils
.contains(StringUtils.lowerCase(p.getQualifier().getClassid()), ORCID_PENDING))
.map(StructuredProperty::getValue)
.distinct()
.forEach(orcid -> clashing_orcid.compute(orcid, (k, v) -> (v == null) ? 1 : v + 1));
}
Set<String> clashing = clashing_orcid
.entrySet()
.stream()
.filter(ee -> ee.getValue() > 1)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
// filter out clashing orcids
for (Author a : r.getAuthor()) {
a
.setPid(
a
.getPid()
.stream()
.filter(p -> !clashing.contains(p.getValue()))
.collect(Collectors.toList()));
}
}
if (value instanceof Publication) {

View File

@ -16,6 +16,8 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.EntityType;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@ -31,16 +33,20 @@ import eu.dnetlib.dhp.schema.oaf.*;
public class MergeUtils {
public static <T extends Oaf> T mergeById(String s, Iterator<T> oafEntityIterator) {
return mergeGroup(s, oafEntityIterator, true);
public static <T extends Oaf> T mergeById(Iterator<T> oafEntityIterator, VocabularyGroup vocs) {
return mergeGroup(oafEntityIterator, true, vocs);
}
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator) {
return mergeGroup(s, oafEntityIterator, false);
public static <T extends Oaf> T mergeGroup(Iterator<T> oafEntityIterator) {
return mergeGroup(oafEntityIterator, false);
}
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator,
boolean checkDelegateAuthority) {
public static <T extends Oaf> T mergeGroup(Iterator<T> oafEntityIterator, boolean checkDelegateAuthority) {
return mergeGroup(oafEntityIterator, checkDelegateAuthority, null);
}
public static <T extends Oaf> T mergeGroup(Iterator<T> oafEntityIterator,
boolean checkDelegateAuthority, VocabularyGroup vocs) {
ArrayList<T> sortedEntities = new ArrayList<>();
oafEntityIterator.forEachRemaining(sortedEntities::add);
@ -49,13 +55,54 @@ public class MergeUtils {
Iterator<T> it = sortedEntities.iterator();
T merged = it.next();
while (it.hasNext()) {
merged = checkedMerge(merged, it.next(), checkDelegateAuthority);
if (!it.hasNext() && merged instanceof Result && vocs != null) {
return enforceResultType(vocs, (Result) merged);
} else {
while (it.hasNext()) {
merged = checkedMerge(merged, it.next(), checkDelegateAuthority);
}
}
return merged;
}
private static <T extends Oaf> T enforceResultType(VocabularyGroup vocs, Result mergedResult) {
if (Optional.ofNullable(mergedResult.getInstance()).map(List::isEmpty).orElse(true)) {
return (T) mergedResult;
} else {
final Instance i = mergedResult.getInstance().get(0);
if (!vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
return (T) mergedResult;
} else {
final Qualifier expectedResultType = vocs.lookupTermBySynonym(
ModelConstants.DNET_RESULT_TYPOLOGIES,
i.getInstancetype().getClassid());
if (Objects.isNull(expectedResultType)) {
throw new IllegalArgumentException(
"instance type not bound to any result type in dnet:result_typologies: " +
i.getInstancetype().getClassid());
}
// there is a clash among the result types
if (!expectedResultType.getClassid().equals(mergedResult.getResulttype().getClassid())) {
try {
String resulttype = expectedResultType.getClassid();
if (EntityType.otherresearchproduct.toString().equals(resulttype)) {
resulttype = "other";
}
Result result = (Result) ModelSupport.oafTypes.get(resulttype).newInstance();
return (T) mergeResultFields(result, mergedResult);
} catch (InstantiationException | IllegalAccessException e) {
throw new IllegalStateException(e);
}
} else {
return (T) mergedResult;
}
}
}
}
public static <T extends Oaf> T checkedMerge(final T left, final T right, boolean checkDelegateAuthority) {
return (T) merge(left, right, checkDelegateAuthority);
}
@ -75,10 +122,10 @@ public class MergeUtils {
return mergeRelation((Relation) left, (Relation) right);
} else {
throw new RuntimeException(
String
.format(
"MERGE_FROM_AND_GET incompatible types: %s, %s",
left.getClass().getCanonicalName(), right.getClass().getCanonicalName()));
String
.format(
"MERGE_FROM_AND_GET incompatible types: %s, %s",
left.getClass().getCanonicalName(), right.getClass().getCanonicalName()));
}
}
@ -106,7 +153,7 @@ public class MergeUtils {
return mergeSoftware((Software) left, (Software) right);
}
return mergeResultFields((Result) left, (Result) right);
return left;
} else if (sameClass(left, right, Datasource.class)) {
// TODO
final int trust = compareTrust(left, right);
@ -117,10 +164,10 @@ public class MergeUtils {
return mergeProject((Project) left, (Project) right);
} else {
throw new RuntimeException(
String
.format(
"MERGE_FROM_AND_GET incompatible types: %s, %s",
left.getClass().getCanonicalName(), right.getClass().getCanonicalName()));
String
.format(
"MERGE_FROM_AND_GET incompatible types: %s, %s",
left.getClass().getCanonicalName(), right.getClass().getCanonicalName()));
}
}
@ -211,7 +258,7 @@ public class MergeUtils {
}
private static <T, K> List<T> mergeLists(final List<T> left, final List<T> right, int trust,
Function<T, K> keyExtractor, BinaryOperator<T> merger) {
Function<T, K> keyExtractor, BinaryOperator<T> merger) {
if (left == null || left.isEmpty()) {
return right != null ? right : new ArrayList<>();
} else if (right == null || right.isEmpty()) {
@ -222,11 +269,11 @@ public class MergeUtils {
List<T> l = trust >= 0 ? right : left;
return new ArrayList<>(Stream
.concat(h.stream(), l.stream())
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toMap(keyExtractor, v -> v, merger, LinkedHashMap::new))
.values());
.concat(h.stream(), l.stream())
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toMap(keyExtractor, v -> v, merger, LinkedHashMap::new))
.values());
}
private static <T, K> List<T> unionDistinctLists(final List<T> left, final List<T> right, int trust) {
@ -240,10 +287,10 @@ public class MergeUtils {
List<T> l = trust >= 0 ? right : left;
return Stream
.concat(h.stream(), l.stream())
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());
.concat(h.stream(), l.stream())
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());
}
private static List<String> unionDistinctListOfString(final List<String> l, final List<String> r) {
@ -254,10 +301,10 @@ public class MergeUtils {
}
return Stream
.concat(l.stream(), r.stream())
.filter(StringUtils::isNotBlank)
.distinct()
.collect(Collectors.toList());
.concat(l.stream(), r.stream())
.filter(StringUtils::isNotBlank)
.distinct()
.collect(Collectors.toList());
}
// TODO review
@ -283,7 +330,7 @@ public class MergeUtils {
}
private static List<StructuredProperty> unionTitle(List<StructuredProperty> left, List<StructuredProperty> right,
int trust) {
int trust) {
if (left == null) {
return right;
} else if (right == null) {
@ -294,10 +341,10 @@ public class MergeUtils {
List<StructuredProperty> l = trust >= 0 ? right : left;
return Stream
.concat(h.stream(), l.stream())
.filter(Objects::isNull)
.distinct()
.collect(Collectors.toList());
.concat(h.stream(), l.stream())
.filter(Objects::isNull)
.distinct()
.collect(Collectors.toList());
}
/**
@ -332,8 +379,8 @@ public class MergeUtils {
merged.setPid(mergeLists(merged.getPid(), enrich.getPid(), trust, MergeUtils::spKeyExtractor, (p1, p2) -> p1));
merged.setDateofcollection(LocalDateTime.now().toString());
merged
.setDateoftransformation(
chooseString(merged.getDateoftransformation(), enrich.getDateoftransformation(), trust));
.setDateoftransformation(
chooseString(merged.getDateoftransformation(), enrich.getDateoftransformation(), trust));
merged.setExtraInfo(unionDistinctLists(merged.getExtraInfo(), enrich.getExtraInfo(), trust));
// When merging records OAI provenance becomes null
merged.setOaiprovenance(null);
@ -350,7 +397,7 @@ public class MergeUtils {
checkArgument(Objects.equals(merge.getTarget(), enrich.getTarget()), "target ids must be equal");
checkArgument(Objects.equals(merge.getRelType(), enrich.getRelType()), "relType(s) must be equal");
checkArgument(
Objects.equals(merge.getSubRelType(), enrich.getSubRelType()), "subRelType(s) must be equal");
Objects.equals(merge.getSubRelType(), enrich.getSubRelType()), "subRelType(s) must be equal");
checkArgument(Objects.equals(merge.getRelClass(), enrich.getRelClass()), "relClass(es) must be equal");
// merge.setProvenance(mergeLists(merge.getProvenance(), enrich.getProvenance()));
@ -361,10 +408,10 @@ public class MergeUtils {
merge.setValidationDate(ModelSupport.oldest(merge.getValidationDate(), enrich.getValidationDate()));
} catch (ParseException e) {
throw new IllegalArgumentException(String
.format(
"invalid validation date format in relation [s:%s, t:%s]: %s", merge.getSource(),
merge.getTarget(),
merge.getValidationDate()));
.format(
"invalid validation date format in relation [s:%s, t:%s]: %s", merge.getSource(),
merge.getTarget(),
merge.getValidationDate()));
}
// TODO keyvalue merge
@ -378,7 +425,7 @@ public class MergeUtils {
T merge = mergeOafEntityFields(original, enrich, trust);
if (merge.getProcessingchargeamount() == null
|| StringUtils.isBlank(merge.getProcessingchargeamount().getValue())) {
|| StringUtils.isBlank(merge.getProcessingchargeamount().getValue())) {
merge.setProcessingchargeamount(enrich.getProcessingchargeamount());
merge.setProcessingchargecurrency(enrich.getProcessingchargecurrency());
}
@ -410,8 +457,8 @@ public class MergeUtils {
}
merge
.setDateofacceptance(
mergeDateOfAcceptance(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust));
.setDateofacceptance(
mergeDateOfAcceptance(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust));
merge.setPublisher(coalesce(merge.getPublisher(), enrich.getPublisher()));
merge.setEmbargoenddate(coalesce(merge.getEmbargoenddate(), enrich.getEmbargoenddate()));
@ -426,7 +473,7 @@ public class MergeUtils {
merge.setCoverage(unionDistinctLists(merge.getCoverage(), enrich.getCoverage(), trust));
if (enrich.getBestaccessright() != null
&& new AccessRightComparator<>()
&& new AccessRightComparator<>()
.compare(enrich.getBestaccessright(), merge.getBestaccessright()) < 0) {
merge.setBestaccessright(enrich.getBestaccessright());
}
@ -439,8 +486,8 @@ public class MergeUtils {
// ok
merge
.setExternalReference(
mergeExternalReference(merge.getExternalReference(), enrich.getExternalReference(), trust));
.setExternalReference(
mergeExternalReference(merge.getExternalReference(), enrich.getExternalReference(), trust));
// instance enrichment or union
// review instance equals => add pid to comparision
@ -448,17 +495,17 @@ public class MergeUtils {
merge.setInstance(mergeInstances(merge.getInstance(), enrich.getInstance(), trust));
} else {
final List<Instance> enrichmentInstances = isAnEnrichment(merge) ? merge.getInstance()
: enrich.getInstance();
: enrich.getInstance();
final List<Instance> enrichedInstances = isAnEnrichment(merge) ? enrich.getInstance()
: merge.getInstance();
: merge.getInstance();
if (isAnEnrichment(merge))
merge.setDataInfo(enrich.getDataInfo());
merge.setInstance(enrichInstances(enrichedInstances, enrichmentInstances));
}
merge
.setEoscifguidelines(
mergeEosciifguidelines(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust));
.setEoscifguidelines(
mergeEosciifguidelines(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust));
merge.setIsGreen(booleanOR(merge.getIsGreen(), enrich.getIsGreen()));
// OK but should be list of values
merge.setOpenAccessColor(coalesce(merge.getOpenAccessColor(), enrich.getOpenAccessColor()));
@ -484,7 +531,7 @@ public class MergeUtils {
LocalDate enrich_date = LocalDate.parse(enrich.getValue(), DateTimeFormatter.ISO_DATE);
if (enrich_date.getYear() > 1300
&& (merge_date.getYear() < 1300 || merge_date.isAfter(enrich_date))) {
&& (merge_date.getYear() < 1300 || merge_date.isAfter(enrich_date))) {
return enrich;
}
} catch (NullPointerException | DateTimeParseException e) {
@ -502,56 +549,56 @@ public class MergeUtils {
private static List<Instance> mergeInstances(List<Instance> v1, List<Instance> v2, int trust) {
return mergeLists(
v1, v2, trust,
MergeUtils::instanceKeyExtractor,
MergeUtils::instanceMerger);
v1, v2, trust,
MergeUtils::instanceKeyExtractor,
MergeUtils::instanceMerger);
}
private static List<EoscIfGuidelines> mergeEosciifguidelines(List<EoscIfGuidelines> v1, List<EoscIfGuidelines> v2,
int trust) {
int trust) {
return mergeLists(
v1, v2, trust, er -> Joiner
.on("||")
.useForNull("")
.join(er.getCode(), er.getLabel(), er.getUrl(), er.getSemanticRelation()),
(r, l) -> r);
v1, v2, trust, er -> Joiner
.on("||")
.useForNull("")
.join(er.getCode(), er.getLabel(), er.getUrl(), er.getSemanticRelation()),
(r, l) -> r);
}
private static List<ExternalReference> mergeExternalReference(List<ExternalReference> v1,
List<ExternalReference> v2, int trust) {
List<ExternalReference> v2, int trust) {
return mergeLists(
v1, v2, trust, er -> Joiner
.on(',')
.useForNull("")
.join(
er.getSitename(), er.getLabel(),
er.getUrl(), toString(er.getQualifier()), er.getRefidentifier(),
er.getQuery(), toString(er.getDataInfo())),
(r, l) -> r);
v1, v2, trust, er -> Joiner
.on(',')
.useForNull("")
.join(
er.getSitename(), er.getLabel(),
er.getUrl(), toString(er.getQualifier()), er.getRefidentifier(),
er.getQuery(), toString(er.getDataInfo())),
(r, l) -> r);
}
private static String toString(DataInfo di) {
return Joiner
.on(',')
.useForNull("")
.join(
di.getInvisible(), di.getInferred(), di.getDeletedbyinference(), di.getTrust(),
di.getInferenceprovenance(), toString(di.getProvenanceaction()));
.on(',')
.useForNull("")
.join(
di.getInvisible(), di.getInferred(), di.getDeletedbyinference(), di.getTrust(),
di.getInferenceprovenance(), toString(di.getProvenanceaction()));
}
private static String toString(Qualifier q) {
return Joiner
.on(',')
.useForNull("")
.join(q.getClassid(), q.getClassname(), q.getSchemeid(), q.getSchemename());
.on(',')
.useForNull("")
.join(q.getClassid(), q.getClassname(), q.getSchemeid(), q.getSchemename());
}
private static String toString(StructuredProperty sp) {
return Joiner
.on(',')
.useForNull("")
.join(toString(sp.getQualifier()), sp.getValue());
.on(',')
.useForNull("")
.join(toString(sp.getQualifier()), sp.getValue());
}
private static <T extends StructuredProperty> List<T> mergeStructuredProperties(List<T> v1, List<T> v2, int trust) {
@ -590,17 +637,17 @@ public class MergeUtils {
// 2. @@
// 3. ||
return String
.join(
"::",
kvKeyExtractor(i.getHostedby()),
kvKeyExtractor(i.getCollectedfrom()),
qualifierKeyExtractor(i.getAccessright()),
qualifierKeyExtractor(i.getInstancetype()),
Optional.ofNullable(i.getUrl()).map(u -> String.join("@@", u)).orElse(null),
Optional
.ofNullable(i.getPid())
.map(pp -> pp.stream().map(MergeUtils::spKeyExtractor).collect(Collectors.joining("@@")))
.orElse(null));
.join(
"::",
kvKeyExtractor(i.getHostedby()),
kvKeyExtractor(i.getCollectedfrom()),
qualifierKeyExtractor(i.getAccessright()),
qualifierKeyExtractor(i.getInstancetype()),
Optional.ofNullable(i.getUrl()).map(u -> String.join("@@", u)).orElse(null),
Optional
.ofNullable(i.getPid())
.map(pp -> pp.stream().map(MergeUtils::spKeyExtractor).collect(Collectors.joining("@@")))
.orElse(null));
}
private static Instance instanceMerger(Instance i1, Instance i2) {
@ -611,30 +658,30 @@ public class MergeUtils {
i.setInstancetype(i1.getInstancetype());
i.setPid(mergeLists(i1.getPid(), i2.getPid(), 0, MergeUtils::spKeyExtractor, (sp1, sp2) -> sp1));
i
.setAlternateIdentifier(
mergeLists(
i1.getAlternateIdentifier(), i2.getAlternateIdentifier(), 0, MergeUtils::spKeyExtractor,
(sp1, sp2) -> sp1));
.setAlternateIdentifier(
mergeLists(
i1.getAlternateIdentifier(), i2.getAlternateIdentifier(), 0, MergeUtils::spKeyExtractor,
(sp1, sp2) -> sp1));
i
.setRefereed(
Collections
.min(
Stream.of(i1.getRefereed(), i2.getRefereed()).collect(Collectors.toList()),
new RefereedComparator()));
.setRefereed(
Collections
.min(
Stream.of(i1.getRefereed(), i2.getRefereed()).collect(Collectors.toList()),
new RefereedComparator()));
i
.setInstanceTypeMapping(
mergeLists(
i1.getInstanceTypeMapping(), i2.getInstanceTypeMapping(), 0,
MergeUtils::instanceTypeMappingKeyExtractor, (itm1, itm2) -> itm1));
.setInstanceTypeMapping(
mergeLists(
i1.getInstanceTypeMapping(), i2.getInstanceTypeMapping(), 0,
MergeUtils::instanceTypeMappingKeyExtractor, (itm1, itm2) -> itm1));
i.setFulltext(selectFulltext(i1.getFulltext(), i2.getFulltext()));
i.setDateofacceptance(selectOldestDate(i1.getDateofacceptance(), i2.getDateofacceptance()));
i.setLicense(coalesce(i1.getLicense(), i2.getLicense()));
i.setProcessingchargeamount(coalesce(i1.getProcessingchargeamount(), i2.getProcessingchargeamount()));
i.setProcessingchargecurrency(coalesce(i1.getProcessingchargecurrency(), i2.getProcessingchargecurrency()));
i
.setMeasures(
mergeLists(i1.getMeasures(), i2.getMeasures(), 0, MergeUtils::measureKeyExtractor, (m1, m2) -> m1));
.setMeasures(
mergeLists(i1.getMeasures(), i2.getMeasures(), 0, MergeUtils::measureKeyExtractor, (m1, m2) -> m1));
i.setUrl(unionDistinctListOfString(i1.getUrl(), i2.getUrl()));
@ -643,14 +690,14 @@ public class MergeUtils {
private static String measureKeyExtractor(Measure m) {
return String
.join(
"::",
m.getId(),
m
.getUnit()
.stream()
.map(KeyValue::getKey)
.collect(Collectors.joining("::")));
.join(
"::",
m.getId(),
m
.getUnit()
.stream()
.map(KeyValue::getKey)
.collect(Collectors.joining("::")));
}
private static Field<String> selectOldestDate(Field<String> d1, Field<String> d2) {
@ -661,16 +708,16 @@ public class MergeUtils {
}
return Stream
.of(d1, d2)
.min(
Comparator
.comparing(
f -> DateParserUtils
.parseDate(f.getValue())
.toInstant()
.atZone(ZoneId.systemDefault())
.toLocalDate()))
.orElse(d1);
.of(d1, d2)
.min(
Comparator
.comparing(
f -> DateParserUtils
.parseDate(f.getValue())
.toInstant()
.atZone(ZoneId.systemDefault())
.toLocalDate()))
.orElse(d1);
}
private static String selectFulltext(String ft1, String ft2) {
@ -685,12 +732,12 @@ public class MergeUtils {
private static String instanceTypeMappingKeyExtractor(InstanceTypeMapping itm) {
return String
.join(
"::",
itm.getOriginalType(),
itm.getTypeCode(),
itm.getTypeLabel(),
itm.getVocabularyName());
.join(
"::",
itm.getOriginalType(),
itm.getTypeCode(),
itm.getTypeLabel(),
itm.getVocabularyName());
}
private static String kvKeyExtractor(KeyValue kv) {
@ -707,13 +754,13 @@ public class MergeUtils {
private static String spKeyExtractor(StructuredProperty sp) {
return Optional
.ofNullable(sp)
.map(
s -> Joiner
.on("||")
.useForNull("")
.join(qualifierKeyExtractor(s.getQualifier()), s.getValue()))
.orElse(null);
.ofNullable(sp)
.map(
s -> Joiner
.on("||")
.useForNull("")
.join(qualifierKeyExtractor(s.getQualifier()), s.getValue()))
.orElse(null);
}
private static <T extends OtherResearchProduct> T mergeORP(T original, T enrich) {
@ -735,8 +782,8 @@ public class MergeUtils {
merge.setLicense(unionDistinctLists(merge.getLicense(), enrich.getLicense(), trust));
merge.setCodeRepositoryUrl(chooseReference(merge.getCodeRepositoryUrl(), enrich.getCodeRepositoryUrl(), trust));
merge
.setProgrammingLanguage(
chooseReference(merge.getProgrammingLanguage(), enrich.getProgrammingLanguage(), trust));
.setProgrammingLanguage(
chooseReference(merge.getProgrammingLanguage(), enrich.getProgrammingLanguage(), trust));
return merge;
}
@ -750,11 +797,11 @@ public class MergeUtils {
merge.setSize(chooseReference(merge.getSize(), enrich.getSize(), trust));
merge.setVersion(chooseReference(merge.getVersion(), enrich.getVersion(), trust));
merge
.setLastmetadataupdate(
chooseReference(merge.getLastmetadataupdate(), enrich.getLastmetadataupdate(), trust));
.setLastmetadataupdate(
chooseReference(merge.getLastmetadataupdate(), enrich.getLastmetadataupdate(), trust));
merge
.setMetadataversionnumber(
chooseReference(merge.getMetadataversionnumber(), enrich.getMetadataversionnumber(), trust));
.setMetadataversionnumber(
chooseReference(merge.getMetadataversionnumber(), enrich.getMetadataversionnumber(), trust));
merge.setGeolocation(unionDistinctLists(merge.getGeolocation(), enrich.getGeolocation(), trust));
return merge;
@ -776,26 +823,26 @@ public class MergeUtils {
merged.setLegalshortname(chooseReference(merged.getLegalshortname(), enrich.getLegalshortname(), trust));
merged.setLegalname(chooseReference(merged.getLegalname(), enrich.getLegalname(), trust));
merged
.setAlternativeNames(unionDistinctLists(enrich.getAlternativeNames(), merged.getAlternativeNames(), trust));
.setAlternativeNames(unionDistinctLists(enrich.getAlternativeNames(), merged.getAlternativeNames(), trust));
merged.setWebsiteurl(chooseReference(merged.getWebsiteurl(), enrich.getWebsiteurl(), trust));
merged.setLogourl(chooseReference(merged.getLogourl(), enrich.getLogourl(), trust));
merged.setEclegalbody(chooseReference(merged.getEclegalbody(), enrich.getEclegalbody(), trust));
merged.setEclegalperson(chooseReference(merged.getEclegalperson(), enrich.getEclegalperson(), trust));
merged.setEcnonprofit(chooseReference(merged.getEcnonprofit(), enrich.getEcnonprofit(), trust));
merged
.setEcresearchorganization(
chooseReference(merged.getEcresearchorganization(), enrich.getEcresearchorganization(), trust));
.setEcresearchorganization(
chooseReference(merged.getEcresearchorganization(), enrich.getEcresearchorganization(), trust));
merged
.setEchighereducation(chooseReference(merged.getEchighereducation(), enrich.getEchighereducation(), trust));
.setEchighereducation(chooseReference(merged.getEchighereducation(), enrich.getEchighereducation(), trust));
merged
.setEcinternationalorganizationeurinterests(
chooseReference(
merged.getEcinternationalorganizationeurinterests(),
enrich.getEcinternationalorganizationeurinterests(), trust));
.setEcinternationalorganizationeurinterests(
chooseReference(
merged.getEcinternationalorganizationeurinterests(),
enrich.getEcinternationalorganizationeurinterests(), trust));
merged
.setEcinternationalorganization(
chooseReference(
merged.getEcinternationalorganization(), enrich.getEcinternationalorganization(), trust));
.setEcinternationalorganization(
chooseReference(
merged.getEcinternationalorganization(), enrich.getEcinternationalorganization(), trust));
merged.setEcenterprise(chooseReference(merged.getEcenterprise(), enrich.getEcenterprise(), trust));
merged.setEcsmevalidated(chooseReference(merged.getEcsmevalidated(), enrich.getEcsmevalidated(), trust));
merged.setEcnutscode(chooseReference(merged.getEcnutscode(), enrich.getEcnutscode(), trust));
@ -819,8 +866,8 @@ public class MergeUtils {
merged.setDuration(chooseReference(merged.getDuration(), enrich.getDuration(), trust));
merged.setEcsc39(chooseReference(merged.getEcsc39(), enrich.getEcsc39(), trust));
merged
.setOamandatepublications(
chooseReference(merged.getOamandatepublications(), enrich.getOamandatepublications(), trust));
.setOamandatepublications(
chooseReference(merged.getOamandatepublications(), enrich.getOamandatepublications(), trust));
merged.setEcarticle29_3(chooseReference(merged.getEcarticle29_3(), enrich.getEcarticle29_3(), trust));
merged.setSubjects(unionDistinctLists(merged.getSubjects(), enrich.getSubjects(), trust));
merged.setFundingtree(unionDistinctLists(merged.getFundingtree(), enrich.getFundingtree(), trust));
@ -846,8 +893,8 @@ public class MergeUtils {
}
merged
.setH2020classification(
unionDistinctLists(merged.getH2020classification(), enrich.getH2020classification(), trust));
.setH2020classification(
unionDistinctLists(merged.getH2020classification(), enrich.getH2020classification(), trust));
return merged;
}
@ -874,7 +921,7 @@ public class MergeUtils {
* @return list of instances possibly enriched
*/
private static List<Instance> enrichInstances(final List<Instance> toEnrichInstances,
final List<Instance> enrichmentInstances) {
final List<Instance> enrichmentInstances) {
final List<Instance> enrichmentResult = new ArrayList<>();
if (toEnrichInstances == null) {
@ -912,42 +959,42 @@ public class MergeUtils {
*/
private static Map<String, Instance> toInstanceMap(final List<Instance> ri) {
return ri
.stream()
.filter(i -> i.getPid() != null || i.getAlternateIdentifier() != null)
.flatMap(i -> {
final List<Pair<String, Instance>> result = new ArrayList<>();
if (i.getPid() != null)
i
.getPid()
.stream()
.filter(MergeUtils::validPid)
.forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i)));
if (i.getAlternateIdentifier() != null)
i
.getAlternateIdentifier()
.stream()
.filter(MergeUtils::validPid)
.forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i)));
return result.stream();
})
.collect(
Collectors
.toMap(
Pair::getLeft,
Pair::getRight,
(a, b) -> a));
.stream()
.filter(i -> i.getPid() != null || i.getAlternateIdentifier() != null)
.flatMap(i -> {
final List<Pair<String, Instance>> result = new ArrayList<>();
if (i.getPid() != null)
i
.getPid()
.stream()
.filter(MergeUtils::validPid)
.forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i)));
if (i.getAlternateIdentifier() != null)
i
.getAlternateIdentifier()
.stream()
.filter(MergeUtils::validPid)
.forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i)));
return result.stream();
})
.collect(
Collectors
.toMap(
Pair::getLeft,
Pair::getRight,
(a, b) -> a));
}
private static boolean isFromDelegatedAuthority(Result r) {
return Optional
.ofNullable(r.getInstance())
.map(
instance -> instance
.stream()
.filter(i -> Objects.nonNull(i.getCollectedfrom()))
.map(i -> i.getCollectedfrom().getKey())
.anyMatch(cfId -> IdentifierFactory.delegatedAuthorityDatasourceIds().contains(cfId)))
.orElse(false);
.ofNullable(r.getInstance())
.map(
instance -> instance
.stream()
.filter(i -> Objects.nonNull(i.getCollectedfrom()))
.map(i -> i.getCollectedfrom().getKey())
.anyMatch(cfId -> IdentifierFactory.delegatedAuthorityDatasourceIds().contains(cfId)))
.orElse(false);
}
/**
@ -983,15 +1030,15 @@ public class MergeUtils {
* @return the list
*/
private static List<Instance> findEnrichmentsByPID(final List<StructuredProperty> pids,
final Map<String, Instance> enrichments) {
final Map<String, Instance> enrichments) {
if (pids == null || enrichments == null)
return null;
return pids
.stream()
.map(MergeUtils::extractKeyFromPid)
.map(enrichments::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
.stream()
.map(MergeUtils::extractKeyFromPid)
.map(enrichments::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
/**
@ -1002,8 +1049,8 @@ public class MergeUtils {
*/
private static boolean isAnEnrichment(OafEntity e) {
return e.getDataInfo() != null &&
e.getDataInfo().getProvenanceaction() != null
&& ModelConstants.PROVENANCE_ENRICH.equalsIgnoreCase(e.getDataInfo().getProvenanceaction().getClassid());
e.getDataInfo().getProvenanceaction() != null
&& ModelConstants.PROVENANCE_ENRICH.equalsIgnoreCase(e.getDataInfo().getProvenanceaction().getClassid());
}
/**
@ -1026,17 +1073,17 @@ public class MergeUtils {
merge.setHostedby(firstNonNull(merge.getHostedby(), enrichment.getHostedby()));
merge.setUrl(unionDistinctLists(merge.getUrl(), enrichment.getUrl(), 0));
merge
.setDistributionlocation(
firstNonNull(merge.getDistributionlocation(), enrichment.getDistributionlocation()));
.setDistributionlocation(
firstNonNull(merge.getDistributionlocation(), enrichment.getDistributionlocation()));
merge.setCollectedfrom(firstNonNull(merge.getCollectedfrom(), enrichment.getCollectedfrom()));
// pid and alternateId are used for matching
merge.setDateofacceptance(firstNonNull(merge.getDateofacceptance(), enrichment.getDateofacceptance()));
merge
.setProcessingchargeamount(
firstNonNull(merge.getProcessingchargeamount(), enrichment.getProcessingchargeamount()));
.setProcessingchargeamount(
firstNonNull(merge.getProcessingchargeamount(), enrichment.getProcessingchargeamount()));
merge
.setProcessingchargecurrency(
firstNonNull(merge.getProcessingchargecurrency(), enrichment.getProcessingchargecurrency()));
.setProcessingchargecurrency(
firstNonNull(merge.getProcessingchargecurrency(), enrichment.getProcessingchargecurrency()));
merge.setRefereed(firstNonNull(merge.getRefereed(), enrichment.getRefereed()));
merge.setMeasures(unionDistinctLists(merge.getMeasures(), enrichment.getMeasures(), 0));
merge.setFulltext(firstNonNull(merge.getFulltext(), enrichment.getFulltext()));
@ -1044,14 +1091,14 @@ public class MergeUtils {
private static int compareTrust(Oaf a, Oaf b) {
String left = Optional
.ofNullable(a.getDataInfo())
.map(DataInfo::getTrust)
.orElse("0.0");
.ofNullable(a.getDataInfo())
.map(DataInfo::getTrust)
.orElse("0.0");
String right = Optional
.ofNullable(b.getDataInfo())
.map(DataInfo::getTrust)
.orElse("0.0");
.ofNullable(b.getDataInfo())
.map(DataInfo::getTrust)
.orElse("0.0");
return left.compareTo(right);
}

View File

@ -104,22 +104,22 @@ public class PrepareAffiliationRelations implements Serializable {
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelationsNewModel(
spark, crossrefInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::crossref");
spark, crossrefInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":crossref");
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
spark, pubmedInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::pubmed");
spark, pubmedInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":pubmed");
JavaPairRDD<Text, Text> openAPCRelations = prepareAffiliationRelationsNewModel(
spark, openapcInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::openapc");
spark, openapcInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":openapc");
JavaPairRDD<Text, Text> dataciteRelations = prepareAffiliationRelationsNewModel(
spark, dataciteInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::datacite");
spark, dataciteInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":datacite");
JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelationsNewModel(
spark, webcrawlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::rawaff");
spark, webcrawlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":rawaff");
JavaPairRDD<Text, Text> publisherRelations = prepareAffiliationRelationFromPublisherNewModel(
spark, publisherlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::webcrawl");
spark, publisherlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":webcrawl");
crossrefRelations
.union(pubmedRelations)

View File

@ -193,8 +193,8 @@ public class ExtractPerson implements Serializable {
private static Relation getProjectRelation(String project, String orcid, String role) {
String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
String target = PROJECT_ID_PREFIX + project.substring(0, 14)
+ IdentifierFactory.md5(project.substring(15));
String target = PROJECT_ID_PREFIX + StringUtils.substringBefore(project, "::") + "::"
+ IdentifierFactory.md5(StringUtils.substringAfter(project, "::"));
List<KeyValue> properties = new ArrayList<>();
Relation relation = OafMapperUtils
@ -345,7 +345,16 @@ public class ExtractPerson implements Serializable {
OafMapperUtils
.structuredProperty(
op.getOrcid(), ModelConstants.ORCID, ModelConstants.ORCID_CLASSNAME,
ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, null));
ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES,
OafMapperUtils.dataInfo(false,
null,
false,
false,
OafMapperUtils.qualifier(ModelConstants.SYSIMPORT_CROSSWALK_ENTITYREGISTRY,
ModelConstants.SYSIMPORT_CROSSWALK_ENTITYREGISTRY,
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES),
"0.91")));
person.setDateofcollection(op.getLastModifiedDate());
person.setOriginalId(Arrays.asList(op.getOrcid()));
person.setDataInfo(ORCIDDATAINFO);

View File

@ -135,7 +135,7 @@ public class DedupRecordFactory {
return Collections.emptyIterator();
}
OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator());
OafEntity mergedEntity = MergeUtils.mergeGroup(cliques.iterator());
// dedup records do not have date of transformation attribute
mergedEntity.setDateoftransformation(null);
mergedEntity

View File

@ -69,6 +69,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
Dataset<Relation> mergeRels = spark
.read()
.schema(REL_BEAN_ENC.schema())
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
.as(REL_BEAN_ENC);

View File

@ -46,8 +46,8 @@ class DatasetMergerTest implements Serializable {
}
@Test
void datasetMergerTest() throws InstantiationException, IllegalAccessException, InvocationTargetException {
Dataset pub_merged = MergeUtils.mergeGroup(dedupId, datasets.stream().map(Tuple2::_2).iterator());
void datasetMergerTest() {
Dataset pub_merged = MergeUtils.mergeGroup(datasets.stream().map(Tuple2::_2).iterator());
// verify id
assertEquals(dedupId, pub_merged.getId());

View File

@ -155,7 +155,7 @@ public abstract class AbstractMdRecordToOafMapper {
final List<Instance> instances = prepareInstances(doc, entityInfo, collectedFrom, hostedBy);
final String type = getResultType(doc, instances);
final String type = getResultType(instances);
return createOafs(doc, type, instances, collectedFrom, entityInfo, lastUpdateTimestamp);
} catch (final DocumentException e) {
@ -164,10 +164,9 @@ public abstract class AbstractMdRecordToOafMapper {
}
}
protected String getResultType(final Document doc, final List<Instance> instances) {
final String type = doc.valueOf("//dr:CobjCategory/@type");
protected String getResultType(final List<Instance> instances) {
if (StringUtils.isBlank(type) && this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
if (this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
final String instanceType = instances
.stream()
.map(i -> i.getInstancetype().getClassid())
@ -178,9 +177,9 @@ public abstract class AbstractMdRecordToOafMapper {
.ofNullable(this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType))
.map(Qualifier::getClassid)
.orElse("0000");
} else {
throw new IllegalStateException("Missing vocabulary: " + ModelConstants.DNET_RESULT_TYPOLOGIES);
}
return type;
}
private KeyValue getProvenanceDatasource(final Document doc, final String xpathId, final String xpathName) {

View File

@ -133,7 +133,7 @@ public class GenerateEntitiesApplication extends AbstractMigrationApplication {
inputRdd
.keyBy(oaf -> ModelSupport.idFn().apply(oaf))
.groupByKey()
.map(t -> MergeUtils.mergeGroup(t._1, t._2.iterator())),
.map(t -> MergeUtils.mergeGroup(t._2.iterator())),
// .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
// .reduceByKey(MergeUtils::merge)
// .map(Tuple2::_2),

View File

@ -51,6 +51,7 @@
<arg>--orcidPath</arg><arg>${orcidPath}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
<arg>--graphPath</arg><arg>${graphPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
<arg>--master</arg><arg>yarn</arg>
</spark>
<ok to="reset_outputpath"/>

View File

@ -133,7 +133,7 @@ object SparkCreateInputGraph {
val ds: Dataset[T] = spark.read.load(sourcePath).as[T]
ds.groupByKey(_.getId)
.mapGroups { (id, it) => MergeUtils.mergeGroup(id, it.asJava).asInstanceOf[T] }
.mapGroups { (id, it) => MergeUtils.mergeGroup(it.asJava).asInstanceOf[T] }
// .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] }
// .map(_)
.write