Refinements to PR #404: refactoring the Oaf records merge utilities into dhp-common

This commit is contained in:
Giambattista Bloisi 2024-04-11 15:49:29 +02:00
parent 589bce3520
commit 8ac167e420
10 changed files with 313 additions and 133 deletions

View File

@ -14,7 +14,7 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -135,10 +135,10 @@ public class GroupEntitiesSparkJob {
.applyCoarVocabularies(entity, vocs),
OAFENTITY_KRYO_ENC)
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
.reduceGroups((ReduceFunction<OafEntity>) MergeUtils::checkedMerge)
.mapGroups((MapGroupsFunction<String, OafEntity, OafEntity>) MergeUtils::mergeGroup, OAFENTITY_KRYO_ENC)
.map(
(MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
t._2().getClass().getName(), t._2()),
(MapFunction<OafEntity, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
t.getClass().getName(), t),
Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
// pivot on "_1" (classname of the entity)

View File

@ -5,7 +5,11 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.ObjectUtils.firstNonNull;
import java.text.ParseException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.*;
import java.util.function.BinaryOperator;
import java.util.function.Function;
@ -19,6 +23,7 @@ import org.apache.commons.lang3.tuple.Pair;
import com.github.sisyphsu.dateparser.DateParserUtils;
import com.google.common.base.Joiner;
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.AccessRightComparator;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
@ -26,6 +31,37 @@ import eu.dnetlib.dhp.schema.oaf.*;
public class MergeUtils {
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator) {
TreeSet<T> sortedEntities = new TreeSet<>((o1, o2) -> {
int res = 0;
if (o1.getDataInfo() != null && o2.getDataInfo() != null) {
res = o1.getDataInfo().getTrust().compareTo(o2.getDataInfo().getTrust());
}
if (res == 0) {
if (o1 instanceof Result && o2 instanceof Result) {
return ResultTypeComparator.INSTANCE.compare((Result) o1, (Result) o2);
}
}
return res;
});
while (oafEntityIterator.hasNext()) {
sortedEntities.add(oafEntityIterator.next());
}
T merged = sortedEntities.descendingIterator().next();
Iterator<T> it = sortedEntities.descendingIterator();
while (it.hasNext()) {
merged = checkedMerge(merged, it.next());
}
return merged;
}
public static <T extends Oaf> T checkedMerge(final T left, final T right) {
return (T) merge(left, right, false);
}
@ -34,7 +70,7 @@ public class MergeUtils {
return merge(left, right, false);
}
public static Oaf merge(final Oaf left, final Oaf right, boolean checkDelegatedAuthority) {
static Oaf merge(final Oaf left, final Oaf right, boolean checkDelegatedAuthority) {
if (sameClass(left, right, OafEntity.class)) {
return mergeEntities(left, right, checkDelegatedAuthority);
} else if (sameClass(left, right, Relation.class)) {
@ -190,7 +226,7 @@ public class MergeUtils {
.concat(h.stream(), l.stream())
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toMap(keyExtractor, v -> v, merger))
.collect(Collectors.toMap(keyExtractor, v -> v, merger, LinkedHashMap::new))
.values());
}
@ -226,7 +262,7 @@ public class MergeUtils {
}
// TODO review
private static List<KeyValue> mergeKeyValue(List<KeyValue> left, List<KeyValue> right, int trust) {
private static List<KeyValue> mergeByKey(List<KeyValue> left, List<KeyValue> right, int trust) {
if (trust < 0) {
List<KeyValue> s = left;
left = right;
@ -268,8 +304,7 @@ public class MergeUtils {
*/
private static <T extends Oaf> T mergeOafFields(T merged, T enrich, int trust) {
// TODO: union of all values, but what does it mean with KeyValue pairs???
merged.setCollectedfrom(mergeKeyValue(merged.getCollectedfrom(), enrich.getCollectedfrom(), trust));
merged.setCollectedfrom(mergeByKey(merged.getCollectedfrom(), enrich.getCollectedfrom(), trust));
merged.setDataInfo(chooseDataInfo(merged.getDataInfo(), enrich.getDataInfo(), trust));
merged.setLastupdatetimestamp(max(merged.getLastupdatetimestamp(), enrich.getLastupdatetimestamp()));
@ -289,16 +324,13 @@ public class MergeUtils {
merged.setOriginalId(unionDistinctListOfString(merged.getOriginalId(), enrich.getOriginalId()));
merged.setPid(unionDistinctLists(merged.getPid(), enrich.getPid(), trust));
// dateofcollection mettere today quando si fa merge
merged.setDateofcollection(chooseString(merged.getDateofcollection(), enrich.getDateofcollection(), trust));
// setDateoftransformation mettere vuota in dedup, nota per Claudio
merged.setDateofcollection(LocalDateTime.now().toString());
merged
.setDateoftransformation(
chooseString(merged.getDateoftransformation(), enrich.getDateoftransformation(), trust));
// TODO: was missing in OafEntity.merge
merged.setExtraInfo(unionDistinctLists(merged.getExtraInfo(), enrich.getExtraInfo(), trust));
// oaiprovenanze da mettere a null quando si genera merge
merged.setOaiprovenance(chooseReference(merged.getOaiprovenance(), enrich.getOaiprovenance(), trust));
// When merging records OAI provenance becomes null
merged.setOaiprovenance(null);
merged.setMeasures(unionDistinctLists(merged.getMeasures(), enrich.getMeasures(), trust));
return merged;
@ -330,7 +362,7 @@ public class MergeUtils {
}
// TODO keyvalue merge
merge.setProperties(mergeKeyValue(merge.getProperties(), enrich.getProperties(), trust));
merge.setProperties(mergeByKey(merge.getProperties(), enrich.getProperties(), trust));
return merge;
}
@ -345,73 +377,70 @@ public class MergeUtils {
merge.setProcessingchargecurrency(enrich.getProcessingchargecurrency());
}
// author = usare la stessa logica che in dedup
merge.setAuthor(chooseReference(merge.getAuthor(), enrich.getAuthor(), trust));
// il primo che mi arriva secondo l'ordinamento per priorita'
merge.setResulttype(chooseReference(merge.getResulttype(), enrich.getResulttype(), trust));
// gestito come il resulttype perche' e' un subtype
merge.setMetaResourceType(chooseReference(merge.getMetaResourceType(), enrich.getMetaResourceType(), trust));
// spostiamo nell'instance e qui prendo il primo che arriva
merge.setLanguage(chooseReference(merge.getLanguage(), enrich.getLanguage(), trust));
// country lasicamo,o cosi' -> parentesi sul datainfo
merge.setCountry(unionDistinctLists(merge.getCountry(), enrich.getCountry(), trust));
// ok
merge.setSubject(unionDistinctLists(merge.getSubject(), enrich.getSubject(), trust));
// union per priority quindi vanno in append
merge.setTitle(unionTitle(merge.getTitle(), enrich.getTitle(), trust));
// ok
merge.setRelevantdate(unionDistinctLists(merge.getRelevantdate(), enrich.getRelevantdate(), trust));
// prima trust e poi longest list
merge.setDescription(longestLists(merge.getDescription(), enrich.getDescription()));
// trust piu' alto e poi piu' vecchia
merge.setDateofacceptance(chooseReference(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust));
// ok, ma publisher va messo ripetibile
merge.setPublisher(chooseReference(merge.getPublisher(), enrich.getPublisher(), trust));
// ok
merge.setEmbargoenddate(chooseReference(merge.getEmbargoenddate(), enrich.getEmbargoenddate(), trust));
// ok
merge.setAuthor(mergeAuthors(merge.getAuthor(), enrich.getAuthor(), trust));
// keep merge value if present
if (merge.getResulttype() == null) {
merge.setResulttype(enrich.getResulttype());
merge.setMetaResourceType(enrich.getMetaResourceType());
}
// should be an instance attribute, get the first non-null value
merge.setLanguage(coalesce(merge.getLanguage(), enrich.getLanguage()));
// distinct countries, do not manage datainfo
merge.setCountry(mergeQualifiers(merge.getCountry(), enrich.getCountry(), trust));
// distinct subjects
merge.setSubject(mergeStructuredProperties(merge.getSubject(), enrich.getSubject(), trust));
// distinct titles
merge.setTitle(mergeStructuredProperties(merge.getTitle(), enrich.getTitle(), trust));
merge.setRelevantdate(mergeStructuredProperties(merge.getRelevantdate(), enrich.getRelevantdate(), trust));
if (merge.getDescription() == null || merge.getDescription().isEmpty() || trust == 0) {
merge.setDescription(longestLists(merge.getDescription(), enrich.getDescription()));
}
merge
.setDateofacceptance(
mergeDateOfAcceptance(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust));
merge.setPublisher(coalesce(merge.getPublisher(), enrich.getPublisher()));
merge.setEmbargoenddate(coalesce(merge.getEmbargoenddate(), enrich.getEmbargoenddate()));
merge.setSource(unionDistinctLists(merge.getSource(), enrich.getSource(), trust));
// ok
merge.setFulltext(unionDistinctLists(merge.getFulltext(), enrich.getFulltext(), trust));
// ok
merge.setFormat(unionDistinctLists(merge.getFormat(), enrich.getFormat(), trust));
// ok
merge.setContributor(unionDistinctLists(merge.getContributor(), enrich.getContributor(), trust));
// prima prendo l'higher trust, su questo prendo il valore migliore nelle istanze TODO
// trust maggiore ma a parita' di trust il piu' specifico (base del vocabolario)
// vedi note
// cannot use com.google.common.base.Objects.firstNonNull as it throws NPE when both terms are null
merge.setResourcetype(firstNonNull(merge.getResourcetype(), enrich.getResourcetype()));
// this field might contain the original type from the raw metadata, no strategy yet to merge it
merge.setResourcetype(coalesce(merge.getResourcetype(), enrich.getResourcetype()));
// ok
merge.setCoverage(unionDistinctLists(merge.getCoverage(), enrich.getCoverage(), trust));
// most open ok
if (enrich.getBestaccessright() != null
&& new AccessRightComparator<>()
.compare(enrich.getBestaccessright(), merge.getBestaccessright()) < 0) {
merge.setBestaccessright(enrich.getBestaccessright());
}
// TODO merge of datainfo given same id
merge.setContext(unionDistinctLists(merge.getContext(), enrich.getContext(), trust));
// merge datainfo for same context id
merge.setContext(mergeLists(merge.getContext(), enrich.getContext(), trust, Context::getId, (r, l) -> {
r.getDataInfo().addAll(l.getDataInfo());
return r;
}));
// ok
merge
.setExternalReference(
unionDistinctLists(merge.getExternalReference(), enrich.getExternalReference(), trust));
mergeExternalReference(merge.getExternalReference(), enrich.getExternalReference(), trust));
// instance enrichment or union
// review instance equals => add pid to comparision
if (!isAnEnrichment(merge) && !isAnEnrichment(enrich))
merge
.setInstance(
mergeLists(
merge.getInstance(), enrich.getInstance(), trust,
MergeUtils::instanceKeyExtractor,
MergeUtils::instanceMerger));
else {
if (!isAnEnrichment(merge) && !isAnEnrichment(enrich)) {
merge.setInstance(mergeInstances(merge.getInstance(), enrich.getInstance(), trust));
} else {
final List<Instance> enrichmentInstances = isAnEnrichment(merge) ? merge.getInstance()
: enrich.getInstance();
final List<Instance> enrichedInstances = isAnEnrichment(merge) ? enrich.getInstance()
@ -421,16 +450,123 @@ public class MergeUtils {
merge.setInstance(enrichInstances(enrichedInstances, enrichmentInstances));
}
merge.setEoscifguidelines(unionDistinctLists(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust));
merge
.setEoscifguidelines(
mergeEosciifguidelines(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust));
merge.setIsGreen(booleanOR(merge.getIsGreen(), enrich.getIsGreen()));
// OK but should be list of values
merge.setOpenAccessColor(chooseReference(merge.getOpenAccessColor(), enrich.getOpenAccessColor(), trust));
merge.setOpenAccessColor(coalesce(merge.getOpenAccessColor(), enrich.getOpenAccessColor()));
merge.setIsInDiamondJournal(booleanOR(merge.getIsInDiamondJournal(), enrich.getIsInDiamondJournal()));
merge.setPubliclyFunded(booleanOR(merge.getPubliclyFunded(), enrich.getPubliclyFunded()));
return merge;
}
private static Field<String> mergeDateOfAcceptance(Field<String> merge, Field<String> enrich, int trust) {
// higher trust then oldest date
if ((merge == null || trust == 0) && enrich != null) {
if (merge == null) {
return enrich;
} else {
try {
LocalDate merge_date = LocalDate.parse(merge.getValue(), DateTimeFormatter.ISO_DATE);
try {
LocalDate enrich_date = LocalDate.parse(enrich.getValue(), DateTimeFormatter.ISO_DATE);
if (enrich_date.getYear() > 1300
&& (merge_date.getYear() < 1300 || merge_date.isAfter(enrich_date))) {
return enrich;
}
} catch (NullPointerException | DateTimeParseException e) {
return merge;
}
} catch (NullPointerException | DateTimeParseException e) {
return enrich;
}
}
}
// keep value
return merge;
}
private static List<Instance> mergeInstances(List<Instance> v1, List<Instance> v2, int trust) {
return mergeLists(
v1, v2, trust,
MergeUtils::instanceKeyExtractor,
MergeUtils::instanceMerger);
}
private static List<EoscIfGuidelines> mergeEosciifguidelines(List<EoscIfGuidelines> v1, List<EoscIfGuidelines> v2,
int trust) {
return mergeLists(
v1, v2, trust, er -> Joiner
.on("||")
.useForNull("")
.join(er.getCode(), er.getLabel(), er.getUrl(), er.getSemanticRelation()),
(r, l) -> r);
}
private static List<ExternalReference> mergeExternalReference(List<ExternalReference> v1,
List<ExternalReference> v2, int trust) {
return mergeLists(
v1, v2, trust, er -> Joiner
.on(',')
.useForNull("")
.join(
er.getSitename(), er.getLabel(),
er.getUrl(), toString(er.getQualifier()), er.getRefidentifier(),
er.getQuery(), toString(er.getDataInfo())),
(r, l) -> r);
}
private static String toString(DataInfo di) {
return Joiner
.on(',')
.useForNull("")
.join(
di.getInvisible(), di.getInferred(), di.getDeletedbyinference(), di.getTrust(),
di.getInferenceprovenance(), toString(di.getProvenanceaction()));
}
private static String toString(Qualifier q) {
return Joiner
.on(',')
.useForNull("")
.join(q.getClassid(), q.getClassname(), q.getSchemeid(), q.getSchemename());
}
private static String toString(StructuredProperty sp) {
return Joiner
.on(',')
.useForNull("")
.join(toString(sp.getQualifier()), sp.getValue());
}
private static <T extends StructuredProperty> List<T> mergeStructuredProperties(List<T> v1, List<T> v2, int trust) {
return mergeLists(v1, v2, trust, MergeUtils::toString, (r, l) -> r);
}
private static <T extends Qualifier> List<T> mergeQualifiers(List<T> v1, List<T> v2, int trust) {
return mergeLists(v1, v2, trust, MergeUtils::toString, (r, l) -> r);
}
private static <T> T coalesce(T m, T e) {
return m != null ? m : e;
}
private static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) {
List<List<Author>> authors = new ArrayList<>();
if (author != null) {
authors.add(author);
}
if (author1 != null) {
authors.add(author1);
}
return AuthorMerger.merge(authors);
}
private static String instanceKeyExtractor(Instance i) {
return String
.join(
@ -472,9 +608,9 @@ public class MergeUtils {
MergeUtils::instanceTypeMappingKeyExtractor, (itm1, itm2) -> itm1));
i.setFulltext(selectFulltext(i1.getFulltext(), i2.getFulltext()));
i.setDateofacceptance(selectOldestDate(i1.getDateofacceptance(), i2.getDateofacceptance()));
i.setLicense(firstNonNull(i1.getLicense(), i2.getLicense()));
i.setProcessingchargeamount(firstNonNull(i1.getProcessingchargeamount(), i2.getProcessingchargeamount()));
i.setProcessingchargecurrency(firstNonNull(i1.getProcessingchargecurrency(), i2.getProcessingchargecurrency()));
i.setLicense(coalesce(i1.getLicense(), i2.getLicense()));
i.setProcessingchargeamount(coalesce(i1.getProcessingchargeamount(), i2.getProcessingchargeamount()));
i.setProcessingchargecurrency(coalesce(i1.getProcessingchargecurrency(), i2.getProcessingchargecurrency()));
i
.setMeasures(
mergeLists(i1.getMeasures(), i2.getMeasures(), 0, MergeUtils::measureKeyExtractor, (m1, m2) -> m1));
@ -698,7 +834,7 @@ public class MergeUtils {
* @param b the b
* @return the list
*/
public static List<Field<String>> longestLists(List<Field<String>> a, List<Field<String>> b) {
private static List<Field<String>> longestLists(List<Field<String>> a, List<Field<String>> b) {
if (a == null || b == null)
return a == null ? b : a;
@ -849,8 +985,8 @@ public class MergeUtils {
* single attribute only if in the current instance is missing
* The only repeatable field enriched is measures
*
* @param merge the current instance
* @param enrichment the enrichment instance
* @param merge the current instance
* @param enrichment the enrichment instance
*/
private static void applyEnrichment(final Instance merge, final Instance enrichment) {
if (merge == null || enrichment == null)

View File

@ -9,10 +9,18 @@ public class OrganizationPidComparator implements Comparator<StructuredProperty>
@Override
public int compare(StructuredProperty left, StructuredProperty right) {
if (left == null) {
return right == null ? 0 : -1;
} else if (right == null) {
return 1;
}
PidType lClass = PidType.tryValueOf(left.getQualifier().getClassid());
PidType rClass = PidType.tryValueOf(right.getQualifier().getClassid());
if (lClass.equals(rClass))
return 0;
if (lClass.equals(PidType.openorgs))
return -1;
if (rClass.equals(PidType.openorgs))

View File

@ -4,7 +4,6 @@ package eu.dnetlib.dhp.schema.oaf.utils;
import java.util.Comparator;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
/**
* Comparator for sorting the values from the dnet:review_levels vocabulary, implements the following ordering
@ -15,10 +14,18 @@ public class RefereedComparator implements Comparator<Qualifier> {
@Override
public int compare(Qualifier left, Qualifier right) {
if (left == null || left.getClassid() == null) {
return (right == null || right.getClassid() == null) ? 0 : -1;
} else if (right == null || right.getClassid() == null) {
return 1;
}
String lClass = left.getClassid();
String rClass = right.getClassid();
if (lClass.equals(rClass))
return 0;
if ("0001".equals(lClass))
return -1;
if ("0001".equals(rClass))

View File

@ -13,6 +13,9 @@ public class ResultPidComparator implements Comparator<StructuredProperty> {
PidType lClass = PidType.tryValueOf(left.getQualifier().getClassid());
PidType rClass = PidType.tryValueOf(right.getQualifier().getClassid());
if (lClass.equals(rClass))
return 0;
if (lClass.equals(PidType.doi))
return -1;
if (rClass.equals(PidType.doi))

View File

@ -14,6 +14,8 @@ import eu.dnetlib.dhp.schema.oaf.Result;
public class ResultTypeComparator implements Comparator<Result> {
public static final ResultTypeComparator INSTANCE = new ResultTypeComparator();
@Override
public int compare(Result left, Result right) {
@ -37,28 +39,27 @@ public class ResultTypeComparator implements Comparator<Result> {
String lClass = left.getResulttype().getClassid();
String rClass = right.getResulttype().getClassid();
if (lClass.equals(rClass))
return 0;
if (!lClass.equals(rClass)) {
if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
return 1;
}
// Else (but unlikely), lexicographical ordering will do.
return lClass.compareTo(rClass);

View File

@ -20,7 +20,7 @@ public class WordsStatsSuffixPrefixChain extends AbstractClusteringFunction {
return suffixPrefixChain(s, param("mod"));
}
private Collection<String> suffixPrefixChain(String s, int mod) {
static Collection<String> suffixPrefixChain(String s, int mod) {
// create the list of words from the string (remove short words)
List<String> wordsList = Arrays
@ -38,7 +38,7 @@ public class WordsStatsSuffixPrefixChain extends AbstractClusteringFunction {
}
private Collection<String> doSuffixPrefixChain(List<String> wordsList, String prefix) {
static private Collection<String> doSuffixPrefixChain(List<String> wordsList, String prefix) {
Set<String> set = Sets.newLinkedHashSet();
switch (wordsList.size()) {
@ -80,12 +80,16 @@ public class WordsStatsSuffixPrefixChain extends AbstractClusteringFunction {
}
private String suffix(String s, int len) {
private static String suffix(String s, int len) {
return s.substring(s.length() - len);
}
private String prefix(String s, int len) {
private static String prefix(String s, int len) {
return s.substring(0, len);
}
static public void main(String[] args) {
String title = "MY LIFE AS A BOSON: THE STORY OF \"THE HIGGS\"".toLowerCase();
System.out.println(suffixPrefixChain(title, 10));
}
}

View File

@ -7,6 +7,7 @@ import java.util.stream.Stream;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
@ -94,44 +95,59 @@ public class DedupRecordFactory {
.join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
.select("dedupId", "id", "kryoObject")
.as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
.map(
(MapFunction<Tuple3<String, String, OafEntity>, DedupRecordReduceState>) t -> new DedupRecordReduceState(
t._1(), t._2(), t._3()),
Encoders.kryo(DedupRecordReduceState.class))
.groupByKey(
(MapFunction<DedupRecordReduceState, String>) DedupRecordReduceState::getDedupId, Encoders.STRING())
.reduceGroups(
(ReduceFunction<DedupRecordReduceState>) (t1, t2) -> {
if (t1.entity == null) {
t2.aliases.addAll(t1.aliases);
return t2;
.groupByKey((MapFunction<Tuple3<String, String, OafEntity>, String>) Tuple3::_1, Encoders.STRING())
.flatMapGroups(
(FlatMapGroupsFunction<String, Tuple3<String, String, OafEntity>, OafEntity>) (dedupId, it) -> {
if (!it.hasNext())
return Collections.emptyIterator();
final ArrayList<OafEntity> cliques = new ArrayList<>();
final ArrayList<String> aliases = new ArrayList<>();
final HashSet<String> acceptanceDate = new HashSet<>();
while (it.hasNext()) {
Tuple3<String, String, OafEntity> t = it.next();
OafEntity entity = t._3();
if (entity == null) {
aliases.add(t._2());
} else {
cliques.add(entity);
if (acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
if (Result.class.isAssignableFrom(entity.getClass())) {
Result result = (Result) entity;
if (result.getDateofacceptance() != null
&& StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
acceptanceDate.add(result.getDateofacceptance().getValue());
}
}
}
}
}
if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
t1.acceptanceDate.addAll(t2.acceptanceDate);
if (acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques.isEmpty()) {
return Collections.emptyIterator();
}
t1.aliases.addAll(t2.aliases);
t1.entity = reduceEntity(t1.entity, t2.entity);
return t1;
})
.flatMap((FlatMapFunction<Tuple2<String, DedupRecordReduceState>, OafEntity>) t -> {
String dedupId = t._1();
DedupRecordReduceState agg = t._2();
OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator());
// dedup records do not have date of transformation attribute
mergedEntity.setDateoftransformation(null);
if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) {
return Collections.emptyIterator();
}
return Stream
.concat(
Stream
.of(dedupId)
.map(id -> createDedupOafEntity(id, mergedEntity, dataInfo, ts)),
aliases
.stream()
.map(id -> createMergedDedupAliasOafEntity(id, mergedEntity, dataInfo, ts)))
.iterator();
return Stream
.concat(
Stream
.of(agg.getDedupId())
.map(id -> createDedupOafEntity(id, agg.entity, dataInfo, ts)),
agg.aliases
.stream()
.map(id -> createMergedDedupAliasOafEntity(id, agg.entity, dataInfo, ts)))
.iterator();
}, beanEncoder);
}, beanEncoder);
}
private static OafEntity createDedupOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) {

View File

@ -130,10 +130,12 @@ public class GenerateEntitiesApplication extends AbstractMigrationApplication {
switch (mode) {
case claim:
save(
inputRdd
.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
.reduceByKey(MergeUtils::merge)
.map(Tuple2::_2),
inputRdd.keyBy(oaf -> ModelSupport.idFn().apply(oaf))
.groupByKey()
.map(t -> MergeUtils.mergeGroup(t._1, t._2.iterator())),
//.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
//.reduceByKey(MergeUtils::merge)
//.map(Tuple2::_2),
targetPath);
break;
case graph:

View File

@ -8,6 +8,8 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
object SparkCreateInputGraph {
def main(args: Array[String]): Unit = {
@ -131,8 +133,9 @@ object SparkCreateInputGraph {
val ds: Dataset[T] = spark.read.load(sourcePath).as[T]
ds.groupByKey(_.getId)
.reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] }
.map(_._2)
.mapGroups { (id, it) => MergeUtils.mergeGroup(id, it.asJava).asInstanceOf[T] }
// .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] }
// .map(_)
.write
.mode(SaveMode.Overwrite)
.save(targetPath)