Merge pull request 'Unify merge logic of entities in MergeUtils.class' (#370) from mergeutils into beta

Reviewed-on: #370
mergeutils
Claudio Atzori 1 month ago
commit 7ae7e8aa06

@ -1,24 +1,6 @@
package eu.dnetlib.dhp.oa.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.when;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
@ -26,169 +8,186 @@ import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.when;
/**
* Groups the graph content by entity identifier to ensure ID uniqueness
*/
public class GroupEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
private ArgumentApplicationParser parser;
public GroupEntitiesSparkJob(ArgumentApplicationParser parser) {
this.parser = parser;
}
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
GroupEntitiesSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
new GroupEntitiesSparkJob(parser).run(isSparkSessionManaged, isLookupService);
}
public void run(Boolean isSparkSessionManaged, ISLookUpService isLookUpService)
throws ISLookUpException {
String graphInputPath = parser.get("graphInputPath");
log.info("graphInputPath: {}", graphInputPath);
String checkpointPath = parser.get("checkpointPath");
log.info("checkpointPath: {}", checkpointPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible"));
log.info("filterInvisible: {}", filterInvisible);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookUpService);
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
HdfsSupport.remove(checkpointPath, spark.sparkContext().hadoopConfiguration());
groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible, vocs);
});
}
private static void groupEntities(
SparkSession spark,
String inputPath,
String checkpointPath,
String outputPath,
boolean filterInvisible, VocabularyGroup vocs) {
Dataset<OafEntity> allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC);
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue();
String entityInputPath = inputPath + "/" + entity;
if (!HdfsSupport.exists(entityInputPath, spark.sparkContext().hadoopConfiguration())) {
continue;
}
allEntities = allEntities
.union(
((Dataset<OafEntity>) spark
.read()
.schema(Encoders.bean(entityClass).schema())
.json(entityInputPath)
.filter("length(id) > 0")
.as(Encoders.bean(entityClass)))
.map((MapFunction<OafEntity, OafEntity>) r -> r, OAFENTITY_KRYO_ENC));
}
Dataset<?> groupedEntities = allEntities
.map(
(MapFunction<OafEntity, OafEntity>) entity -> GraphCleaningFunctions
.applyCoarVocabularies(entity, vocs),
OAFENTITY_KRYO_ENC)
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
.reduceGroups((ReduceFunction<OafEntity>) OafMapperUtils::mergeEntities)
.map(
(MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
t._2().getClass().getName(), t._2()),
Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
// pivot on "_1" (classname of the entity)
// created columns containing only entities of the same class
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue();
groupedEntities = groupedEntities
.withColumn(
entity,
when(col("_1").equalTo(entityClass.getName()), col("_2")));
}
groupedEntities
.drop("_1", "_2")
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.save(checkpointPath);
ForkJoinPool parPool = new ForkJoinPool(ModelSupport.entityTypes.size());
ModelSupport.entityTypes
.entrySet()
.stream()
.map(e -> parPool.submit(() -> {
String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue();
spark
.read()
.load(checkpointPath)
.select(col(entity).as("value"))
.filter("value IS NOT NULL")
.as(OAFENTITY_KRYO_ENC)
.map((MapFunction<OafEntity, OafEntity>) r -> r, (Encoder<OafEntity>) Encoders.bean(entityClass))
.filter(filterInvisible ? "dataInfo.invisible != TRUE" : "TRUE")
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/" + entity);
}))
.collect(Collectors.toList())
.forEach(t -> {
try {
t.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
private ArgumentApplicationParser parser;
public GroupEntitiesSparkJob(ArgumentApplicationParser parser) {
this.parser = parser;
}
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
GroupEntitiesSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
new GroupEntitiesSparkJob(parser).run(isSparkSessionManaged, isLookupService);
}
public void run(Boolean isSparkSessionManaged, ISLookUpService isLookUpService)
throws ISLookUpException {
String graphInputPath = parser.get("graphInputPath");
log.info("graphInputPath: {}", graphInputPath);
String checkpointPath = parser.get("checkpointPath");
log.info("checkpointPath: {}", checkpointPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible"));
log.info("filterInvisible: {}", filterInvisible);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookUpService);
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
HdfsSupport.remove(checkpointPath, spark.sparkContext().hadoopConfiguration());
groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible, vocs);
});
}
private static void groupEntities(
SparkSession spark,
String inputPath,
String checkpointPath,
String outputPath,
boolean filterInvisible, VocabularyGroup vocs) {
Dataset<OafEntity> allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC);
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue();
String entityInputPath = inputPath + "/" + entity;
if (!HdfsSupport.exists(entityInputPath, spark.sparkContext().hadoopConfiguration())) {
continue;
}
allEntities = allEntities
.union(
((Dataset<OafEntity>) spark
.read()
.schema(Encoders.bean(entityClass).schema())
.json(entityInputPath)
.filter("length(id) > 0")
.as(Encoders.bean(entityClass)))
.map((MapFunction<OafEntity, OafEntity>) r -> r, OAFENTITY_KRYO_ENC));
}
Dataset<?> groupedEntities = allEntities
.map(
(MapFunction<OafEntity, OafEntity>) entity -> GraphCleaningFunctions
.applyCoarVocabularies(entity, vocs),
OAFENTITY_KRYO_ENC)
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
.reduceGroups((ReduceFunction<OafEntity>) MergeUtils::checkedMerge)
.map(
(MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
t._2().getClass().getName(), t._2()),
Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
// pivot on "_1" (classname of the entity)
// created columns containing only entities of the same class
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue();
groupedEntities = groupedEntities
.withColumn(
entity,
when(col("_1").equalTo(entityClass.getName()), col("_2")));
}
groupedEntities
.drop("_1", "_2")
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.save(checkpointPath);
ForkJoinPool parPool = new ForkJoinPool(ModelSupport.entityTypes.size());
ModelSupport.entityTypes
.entrySet()
.stream()
.map(e -> parPool.submit(() -> {
String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue();
spark
.read()
.load(checkpointPath)
.select(col(entity).as("value"))
.filter("value IS NOT NULL")
.as(OAFENTITY_KRYO_ENC)
.map((MapFunction<OafEntity, OafEntity>) r -> r, (Encoder<OafEntity>) Encoders.bean(entityClass))
.filter(filterInvisible ? "dataInfo.invisible != TRUE" : "TRUE")
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/" + entity);
}))
.collect(Collectors.toList())
.forEach(t -> {
try {
t.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
}

@ -0,0 +1,79 @@
package eu.dnetlib.dhp.schema.oaf.utils;
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Optional;
import java.util.stream.Collectors;
public class MergeComparator implements Comparator<Oaf> {
public MergeComparator() {
}
public int compare(Oaf left, Oaf right) {
// nulls at the end
if (left == null && right == null) {
return 0;
} else if (left == null) {
return -1;
} else if (right == null) {
return 1;
}
// invisible
if (left.getDataInfo() != null && left.getDataInfo().getInvisible() == true) {
if (right.getDataInfo() != null && right.getDataInfo().getInvisible() == false) {
return -1;
}
}
// collectedfrom
HashSet<String> lCf = getCollectedFromIds(left);
HashSet<String> rCf = getCollectedFromIds(right);
if (lCf.contains("10|openaire____::081b82f96300b6a6e3d282bad31cb6e2") && !rCf.contains("10|openaire____::081b82f96300b6a6e3d282bad31cb6e2")) {
return -1;
} else if (!lCf.contains("10|openaire____::081b82f96300b6a6e3d282bad31cb6e2") && rCf.contains("10|openaire____::081b82f96300b6a6e3d282bad31cb6e2")) {
return 1;
}
SubEntityType lClass = SubEntityType.fromClass(left.getClass());
SubEntityType rClass = SubEntityType.fromClass(right.getClass());
return lClass.ordinal() - rClass.ordinal();
}
protected HashSet<String> getCollectedFromIds(Oaf left) {
return (HashSet) Optional.ofNullable(left.getCollectedfrom()).map((cf) -> {
return (HashSet) cf.stream().map(KeyValue::getKey).collect(Collectors.toCollection(HashSet::new));
}).orElse(new HashSet());
}
enum SubEntityType {
publication, dataset, software, otherresearchproduct, datasource, organization, project;
/**
* Resolves the EntityType, given the relative class name
*
* @param clazz the given class name
* @param <T> actual OafEntity subclass
* @return the EntityType associated to the given class
*/
public static <T extends Oaf> SubEntityType fromClass(Class<T> clazz) {
return valueOf(clazz.getSimpleName().toLowerCase());
}
}
}

@ -0,0 +1,707 @@
package eu.dnetlib.dhp.schema.oaf.utils;
import eu.dnetlib.dhp.schema.common.AccessRightComparator;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import java.text.ParseException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.google.common.base.Objects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
public class MergeUtils {
public static <T extends Oaf> T checkedMerge(final T left, final T right) {
return (T) merge(left, right, false);
}
public static Oaf merge(final Oaf left, final Oaf right) {
return merge(left, right, false);
}
public static Oaf merge(final Oaf left, final Oaf right, boolean checkDelegatedAuthority) {
if (sameClass(left, right, OafEntity.class)) {
return mergeEntities(left, right, checkDelegatedAuthority);
} else if (sameClass(left, right, Relation.class)) {
return mergeRelation((Relation) left, (Relation) right);
} else {
throw new RuntimeException(
String
.format(
"MERGE_FROM_AND_GET incompatible types: %s, %s",
left.getClass().getCanonicalName(), right.getClass().getCanonicalName()));
}
}
private static <T extends Oaf> boolean sameClass(Object left, Object right, Class<T> cls) {
return cls.isAssignableFrom(left.getClass()) && cls.isAssignableFrom(right.getClass());
}
private static Oaf mergeEntities(Oaf left, Oaf right, boolean checkDelegatedAuthority) {
if (sameClass(left, right, Result.class)) {
if (!left.getClass().equals(right.getClass()) || checkDelegatedAuthority) {
return mergeResultsOfDifferentTypes((Result)left, (Result) right);
}
if (sameClass(left, right, Publication.class)) {
return mergePublication((Publication) left, (Publication) right);
}
if (sameClass(left, right, Dataset.class)) {
return mergeDataset((Dataset) left, (Dataset) right);
}
if (sameClass(left, right, OtherResearchProduct.class)) {
return mergeORP((OtherResearchProduct) left, (OtherResearchProduct) right);
}
if (sameClass(left, right, Software.class)) {
return mergeSoftware((Software) left, (Software) right);
}
return mergeResult((Result) left, (Result) right);
} else if (sameClass(left, right, Datasource.class)) {
// TODO
final int trust = compareTrust(left, right);
return mergeOafEntityFields((Datasource) left, (Datasource) right, trust);
} else if (sameClass(left, right, Organization.class)) {
return mergeOrganization((Organization) left, (Organization) right);
} else if (sameClass(left, right, Project.class)) {
return mergeProject((Project) left, (Project) right);
} else {
throw new RuntimeException(
String
.format(
"MERGE_FROM_AND_GET incompatible types: %s, %s",
left.getClass().getCanonicalName(), right.getClass().getCanonicalName()));
}
}
/**
* This method is used in the global result grouping phase. It checks if one of the two is from a delegated authority
* https://graph.openaire.eu/docs/data-model/pids-and-identifiers#delegated-authorities and in that case it prefers
* such version.
* <p>
* Otherwise, it considers a resulttype priority order implemented in {@link ResultTypeComparator}
* and proceeds with the canonical property merging.
*
* @param left
* @param right
* @return
*/
private static <T extends Result> T mergeResultsOfDifferentTypes(T left, T right) {
final boolean leftFromDelegatedAuthority = isFromDelegatedAuthority(left);
final boolean rightFromDelegatedAuthority = isFromDelegatedAuthority(right);
if (leftFromDelegatedAuthority && !rightFromDelegatedAuthority) {
return left;
}
if (!leftFromDelegatedAuthority && rightFromDelegatedAuthority) {
return right;
}
//TODO: raise trust to have preferred fields from one or the other??
if (new ResultTypeComparator().compare(left, right) < 0) {
return mergeResult(left, right);
} else {
return mergeResult(right, left);
}
}
private static DataInfo chooseDataInfo(DataInfo left, DataInfo right, int trust) {
if (trust > 0) {
return left;
} else if (trust == 0) {
if (left == null || (left.getInvisible() != null && left.getInvisible().equals(Boolean.TRUE))) {
return right;
} else {
return left;
}
} else {
return right;
}
}
private static String chooseString(String left, String right, int trust) {
if (trust > 0) {
return left;
} else if (trust == 0) {
return StringUtils.isNotBlank(left) ? left : right;
} else {
return right;
}
}
private static <T> T chooseReference(T left, T right, int trust) {
if (trust > 0) {
return left;
} else if (trust == 0) {
return left != null ? left : right;
} else {
return right;
}
}
private static Long max(Long left, Long right) {
if (left == null)
return right;
if (right == null)
return left;
return Math.max(left, right);
}
// trust ??
private static Boolean booleanOR(Boolean a, Boolean b) {
if (a == null) {
return b;
} else if (b == null) {
return a;
}
return a || b;
}
private static <T> List<T> unionDistinctLists(final List<T> left, final List<T> right, int trust) {
if (left == null) {
return right;
} else if (right == null) {
return left;
}
List<T> h = trust >= 0 ? left : right;
List<T> l = trust >= 0 ? right : left;
return Stream.concat(h.stream(), l.stream())
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());
}
private static List<String> unionDistinctListOfString(final List<String> l, final List<String> r) {
if (l == null) {
return r;
} else if (r == null) {
return l;
}
return Stream.concat(l.stream(), r.stream())
.filter(StringUtils::isNotBlank)
.distinct()
.collect(Collectors.toList());
}
//TODO review
private static List<KeyValue> mergeKeyValue(List<KeyValue> left, List<KeyValue> right, int trust) {
if (trust < 0) {
List<KeyValue> s = left;
left = right;
right = s;
}
HashMap<String, KeyValue> values = new HashMap<>();
left.forEach(kv -> values.put(kv.getKey(), kv));
right.forEach(kv -> values.putIfAbsent(kv.getKey(), kv));
return new ArrayList<>(values.values());
}
private static List<StructuredProperty> unionTitle(List<StructuredProperty> left, List<StructuredProperty> right, int trust) {
if (left == null) {
return right;
} else if (right == null) {
return left;
}
List<StructuredProperty> h = trust >= 0 ? left : right;
List<StructuredProperty> l = trust >= 0 ? right : left;
return Stream.concat(h.stream(), l.stream())
.filter(Objects::isNull)
.distinct()
.collect(Collectors.toList());
}
/**
* Internal utility that merges the common OafEntity fields
*
* @param merged
* @param enrich
* @param <T>
* @return
*/
private static <T extends Oaf> T mergeOafFields(T merged, T enrich, int trust) {
//TODO: union of all values, but what does it mean with KeyValue pairs???
merged.setCollectedfrom(mergeKeyValue(merged.getCollectedfrom(), enrich.getCollectedfrom(), trust));
merged.setDataInfo(chooseDataInfo(merged.getDataInfo(), enrich.getDataInfo(), trust));
merged.setLastupdatetimestamp(max(merged.getLastupdatetimestamp(), enrich.getLastupdatetimestamp()));
return merged;
}
/**
* Internal utility that merges the common OafEntity fields
*
* @param original
* @param enrich
* @param <T>
* @return
*/
private static <T extends OafEntity> T mergeOafEntityFields(T original, T enrich, int trust) {
final T merged = mergeOafFields(original, enrich, trust);
merged.setOriginalId(unionDistinctListOfString(merged.getOriginalId(), enrich.getOriginalId()));
merged.setPid(unionDistinctLists(merged.getPid(), enrich.getPid(), trust));
// dateofcollection mettere today quando si fa merge
merged.setDateofcollection(chooseString(merged.getDateofcollection(), enrich.getDateofcollection(), trust));
// setDateoftransformation mettere vuota in dedup, nota per Claudio
merged.setDateoftransformation(chooseString(merged.getDateoftransformation(), enrich.getDateoftransformation(), trust));
// TODO: was missing in OafEntity.merge
merged.setExtraInfo(unionDistinctLists(merged.getExtraInfo(), enrich.getExtraInfo(), trust));
//oaiprovenanze da mettere a null quando si genera merge
merged.setOaiprovenance(chooseReference(merged.getOaiprovenance(), enrich.getOaiprovenance(), trust));
merged.setMeasures(unionDistinctLists(merged.getMeasures(), enrich.getMeasures(), trust));
return merged;
}
public static <T extends Relation> T mergeRelation(T original, T enrich) {
int trust = compareTrust(original, enrich);
T merge = mergeOafFields(original, enrich, trust);
checkArgument(Objects.equals(merge.getSource(), enrich.getSource()), "source ids must be equal");
checkArgument(Objects.equals(merge.getTarget(), enrich.getTarget()), "target ids must be equal");
checkArgument(Objects.equals(merge.getRelType(), enrich.getRelType()), "relType(s) must be equal");
checkArgument(
Objects.equals(merge.getSubRelType(), enrich.getSubRelType()), "subRelType(s) must be equal");
checkArgument(Objects.equals(merge.getRelClass(), enrich.getRelClass()), "relClass(es) must be equal");
//merge.setProvenance(mergeLists(merge.getProvenance(), enrich.getProvenance()));
//TODO: trust ??
merge.setValidated(booleanOR(merge.getValidated(), enrich.getValidated()));
try {
merge.setValidationDate(ModelSupport.oldest(merge.getValidationDate(), enrich.getValidationDate()));
} catch (ParseException e) {
throw new IllegalArgumentException(String
.format(
"invalid validation date format in relation [s:%s, t:%s]: %s", merge.getSource(),
merge.getTarget(),
merge.getValidationDate()));
}
// TODO keyvalue merge
merge.setProperties(mergeKeyValue(merge.getProperties(), enrich.getProperties(), trust));
return merge;
}
public static <T extends Result> T mergeResult(T original, T enrich) {
final int trust = compareTrust(original, enrich);
T merge = mergeOafEntityFields(original, enrich, trust);
if (merge.getProcessingchargeamount() == null || StringUtils.isBlank(merge.getProcessingchargeamount().getValue())) {
merge.setProcessingchargeamount(enrich.getProcessingchargeamount());
merge.setProcessingchargecurrency(enrich.getProcessingchargecurrency());
}
// author = usare la stessa logica che in dedup
merge.setAuthor(chooseReference(merge.getAuthor(), enrich.getAuthor(), trust));
// il primo che mi arriva secondo l'ordinamento per priorita'
merge.setResulttype(chooseReference(merge.getResulttype(), enrich.getResulttype(), trust));
// gestito come il resulttype perche' e' un subtype
merge.setMetaResourceType(chooseReference(merge.getMetaResourceType(), enrich.getMetaResourceType(), trust));
// spostiamo nell'instance e qui prendo il primo che arriva
merge.setLanguage(chooseReference(merge.getLanguage(), enrich.getLanguage(), trust));
// country lasicamo,o cosi' -> parentesi sul datainfo
merge.setCountry(unionDistinctLists(merge.getCountry(), enrich.getCountry(), trust));
//ok
merge.setSubject(unionDistinctLists(merge.getSubject(), enrich.getSubject(), trust));
// union per priority quindi vanno in append
merge.setTitle(unionTitle(merge.getTitle(), enrich.getTitle(), trust));
//ok
merge.setRelevantdate(unionDistinctLists(merge.getRelevantdate(), enrich.getRelevantdate(), trust));
// prima trust e poi longest list
merge.setDescription(longestLists(merge.getDescription(), enrich.getDescription()));
// trust piu' alto e poi piu' vecchia
merge.setDateofacceptance(chooseReference(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust));
// ok, ma publisher va messo ripetibile
merge.setPublisher(chooseReference(merge.getPublisher(), enrich.getPublisher(), trust));
// ok
merge.setEmbargoenddate(chooseReference(merge.getEmbargoenddate(), enrich.getEmbargoenddate(), trust));
// ok
merge.setSource(unionDistinctLists(merge.getSource(), enrich.getSource(), trust));
// ok
merge.setFulltext(unionDistinctLists(merge.getFulltext(), enrich.getFulltext(), trust));
// ok
merge.setFormat(unionDistinctLists(merge.getFormat(), enrich.getFormat(), trust));
// ok
merge.setContributor(unionDistinctLists(merge.getContributor(), enrich.getContributor(), trust));
// prima prendo l'higher trust, su questo prendo il valore migliore nelle istanze TODO
// trust maggiore ma a parita' di trust il piu' specifico (base del vocabolario)
// vedi note
merge.setResourcetype(firstNonNull(merge.getResourcetype(), enrich.getResourcetype()));
// ok
merge.setCoverage(unionDistinctLists(merge.getCoverage(), enrich.getCoverage(), trust));
// most open ok
if (enrich.getBestaccessright() != null
&& new AccessRightComparator<>()
.compare(enrich.getBestaccessright(), merge.getBestaccessright()) < 0) {
merge.setBestaccessright(enrich.getBestaccessright());
}
// TODO merge of datainfo given same id
merge.setContext(unionDistinctLists(merge.getContext(), enrich.getContext(), trust));
//ok
merge.setExternalReference(unionDistinctLists(merge.getExternalReference(), enrich.getExternalReference(), trust));
//instance enrichment or union
// review instance equals => add pid to comparision
if (!isAnEnrichment(merge) && !isAnEnrichment(enrich))
merge.setInstance(unionDistinctLists(merge.getInstance(), enrich.getInstance(), trust));
else {
final List<Instance> enrichmentInstances = isAnEnrichment(merge) ? merge.getInstance()
: enrich.getInstance();
final List<Instance> enrichedInstances = isAnEnrichment(merge) ? enrich.getInstance()
: merge.getInstance();
if (isAnEnrichment(merge))
merge.setDataInfo(enrich.getDataInfo());
merge.setInstance(enrichInstances(enrichedInstances, enrichmentInstances));
}
merge.setEoscifguidelines(unionDistinctLists(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust));
merge.setIsGreen(booleanOR(merge.getIsGreen(), enrich.getIsGreen()));
// OK but should be list of values
merge.setOpenAccessColor(chooseReference(merge.getOpenAccessColor(), enrich.getOpenAccessColor(), trust));
merge.setIsInDiamondJournal(booleanOR(merge.getIsInDiamondJournal(), enrich.getIsInDiamondJournal()));
merge.setPubliclyFunded(booleanOR(merge.getPubliclyFunded(), enrich.getPubliclyFunded()));
return merge;
}
private static <T extends OtherResearchProduct> T mergeORP(T original, T enrich) {
int trust = compareTrust(original, enrich);
final T merge = mergeResult(original, enrich);
merge.setContactperson(unionDistinctLists(merge.getContactperson(), enrich.getContactperson(), trust));
merge.setContactgroup(unionDistinctLists(merge.getContactgroup(), enrich.getContactgroup(), trust));
merge.setTool(unionDistinctLists(merge.getTool(), enrich.getTool(), trust));
return merge;
}
private static <T extends Software> T mergeSoftware(T original, T enrich) {
int trust = compareTrust(original, enrich);
final T merge = mergeResult(original, enrich);
merge.setDocumentationUrl(unionDistinctLists(merge.getDocumentationUrl(), enrich.getDocumentationUrl(), trust));
merge.setLicense(unionDistinctLists(merge.getLicense(), enrich.getLicense(), trust));
merge.setCodeRepositoryUrl(chooseReference(merge.getCodeRepositoryUrl(), enrich.getCodeRepositoryUrl(), trust));
merge.setProgrammingLanguage(chooseReference(merge.getProgrammingLanguage(), enrich.getProgrammingLanguage(), trust));
return merge;
}
private static <T extends Dataset> T mergeDataset(T original, T enrich) {
int trust = compareTrust(original, enrich);
T merge = mergeResult(original, enrich);
merge.setStoragedate(chooseReference(merge.getStoragedate(), enrich.getStoragedate(), trust));
merge.setDevice(chooseReference(merge.getDevice(), enrich.getDevice(), trust));
merge.setSize(chooseReference(merge.getSize(), enrich.getSize(), trust));
merge.setVersion(chooseReference(merge.getVersion(), enrich.getVersion(), trust));
merge.setLastmetadataupdate(chooseReference(merge.getLastmetadataupdate(), enrich.getLastmetadataupdate(), trust));
merge.setMetadataversionnumber(chooseReference(merge.getMetadataversionnumber(), enrich.getMetadataversionnumber(), trust));
merge.setGeolocation(unionDistinctLists(merge.getGeolocation(), enrich.getGeolocation(), trust));
return merge;
}
public static <T extends Publication> T mergePublication(T original, T enrich) {
final int trust = compareTrust(original, enrich);
T merged = mergeResult(original, enrich);
merged.setJournal(chooseReference(merged.getJournal(), enrich.getJournal(), trust));
return merged;
}
private static <T extends Organization> T mergeOrganization(T left, T enrich) {
int trust = compareTrust(left, enrich);
T merged = mergeOafEntityFields(left, enrich, trust);
merged.setLegalshortname(chooseReference(merged.getLegalshortname(), enrich.getLegalshortname(), trust));
merged.setLegalname(chooseReference(merged.getLegalname(), enrich.getLegalname(), trust));
merged.setAlternativeNames(unionDistinctLists(enrich.getAlternativeNames(), merged.getAlternativeNames(), trust));
merged.setWebsiteurl(chooseReference(merged.getWebsiteurl(), enrich.getWebsiteurl(), trust));
merged.setLogourl(chooseReference(merged.getLogourl(), enrich.getLogourl(), trust));
merged.setEclegalbody(chooseReference(merged.getEclegalbody(), enrich.getEclegalbody(), trust));
merged.setEclegalperson(chooseReference(merged.getEclegalperson(), enrich.getEclegalperson(), trust));
merged.setEcnonprofit(chooseReference(merged.getEcnonprofit(), enrich.getEcnonprofit(), trust));
merged.setEcresearchorganization(chooseReference(merged.getEcresearchorganization(), enrich.getEcresearchorganization(), trust));
merged.setEchighereducation(chooseReference(merged.getEchighereducation(), enrich.getEchighereducation(), trust));
merged.setEcinternationalorganizationeurinterests(chooseReference(merged.getEcinternationalorganizationeurinterests(), enrich.getEcinternationalorganizationeurinterests(), trust));
merged.setEcinternationalorganization(chooseReference(merged.getEcinternationalorganization(), enrich.getEcinternationalorganization(), trust));
merged.setEcenterprise(chooseReference(merged.getEcenterprise(), enrich.getEcenterprise(), trust));
merged.setEcsmevalidated(chooseReference(merged.getEcsmevalidated(), enrich.getEcsmevalidated(), trust));
merged.setEcnutscode(chooseReference(merged.getEcnutscode(), enrich.getEcnutscode(), trust));
merged.setCountry(chooseReference(merged.getCountry(), enrich.getCountry(), trust));
return merged;
}
public static <T extends Project> T mergeProject(T original, T enrich) {
int trust = compareTrust(original, enrich);
T merged = mergeOafEntityFields(original, enrich, trust);
merged.setWebsiteurl(chooseReference(merged.getWebsiteurl(), enrich.getWebsiteurl(), trust));
merged.setCode(chooseReference(merged.getCode(), enrich.getCode(), trust));
merged.setAcronym(chooseReference(merged.getAcronym(), enrich.getAcronym(), trust));
merged.setTitle(chooseReference(merged.getTitle(), enrich.getTitle(), trust));
merged.setStartdate(chooseReference(merged.getStartdate(), enrich.getStartdate(), trust));
merged.setEnddate(chooseReference(merged.getEnddate(), enrich.getEnddate(), trust));
merged.setCallidentifier(chooseReference(merged.getCallidentifier(), enrich.getCallidentifier(), trust));
merged.setKeywords(chooseReference(merged.getKeywords(), enrich.getKeywords(), trust));
merged.setDuration(chooseReference(merged.getDuration(), enrich.getDuration(), trust));
merged.setEcsc39(chooseReference(merged.getEcsc39(), enrich.getEcsc39(), trust));
merged.setOamandatepublications(chooseReference(merged.getOamandatepublications(), enrich.getOamandatepublications(), trust));
merged.setEcarticle29_3(chooseReference(merged.getEcarticle29_3(), enrich.getEcarticle29_3(), trust));
merged.setSubjects(unionDistinctLists(merged.getSubjects(), enrich.getSubjects(), trust));
merged.setFundingtree(unionDistinctLists(merged.getFundingtree(), enrich.getFundingtree(), trust));
merged.setContracttype(chooseReference(merged.getContracttype(), enrich.getContracttype(), trust));
merged.setOptional1(chooseReference(merged.getOptional1(), enrich.getOptional1(), trust));
merged.setOptional2(chooseReference(merged.getOptional2(), enrich.getOptional2(), trust));
merged.setJsonextrainfo(chooseReference(merged.getJsonextrainfo(), enrich.getJsonextrainfo(), trust));
merged.setContactfullname(chooseReference(merged.getContactfullname(), enrich.getContactfullname(), trust));
merged.setContactfax(chooseReference(merged.getContactfax(), enrich.getContactfax(), trust));
merged.setContactphone(chooseReference(merged.getContactphone(), enrich.getContactphone(), trust));
merged.setContactemail(chooseReference(merged.getContactemail(), enrich.getContactemail(), trust));
merged.setSummary(chooseReference(merged.getSummary(), enrich.getSummary(), trust));
merged.setCurrency(chooseReference(merged.getCurrency(), enrich.getCurrency(), trust));
//missin in Project.merge
merged.setTotalcost(chooseReference(merged.getTotalcost(), enrich.getTotalcost(), trust));
merged.setFundedamount(chooseReference(merged.getFundedamount(), enrich.getFundedamount(), trust));
// trust ??
if (enrich.getH2020topiccode() != null && StringUtils.isEmpty(merged.getH2020topiccode())) {
merged.setH2020topiccode(enrich.getH2020topiccode());
merged.setH2020topicdescription(enrich.getH2020topicdescription());
}
merged.setH2020classification(unionDistinctLists(merged.getH2020classification(), enrich.getH2020classification(), trust));
return merged;
}
/**
* Longest lists list.
*
* @param a the a
* @param b the b
* @return the list
*/
public static List<Field<String>> longestLists(List<Field<String>> a, List<Field<String>> b) {
if (a == null || b == null)
return a == null ? b : a;
return a.size() >= b.size() ? a : b;
}
/**
* This main method apply the enrichment of the instances
*
* @param toEnrichInstances the instances that could be enriched
* @param enrichmentInstances the enrichment instances
* @return list of instances possibly enriched
*/
private static List<Instance> enrichInstances(final List<Instance> toEnrichInstances,
final List<Instance> enrichmentInstances) {
final List<Instance> enrichmentResult = new ArrayList<>();
if (toEnrichInstances == null) {
return enrichmentResult;
}
if (enrichmentInstances == null) {
return enrichmentResult;
}
Map<String, Instance> ri = toInstanceMap(enrichmentInstances);
toEnrichInstances.forEach(i -> {
final List<Instance> e = findEnrichmentsByPID(i.getPid(), ri);
if (e != null && e.size() > 0) {
e.forEach(enr -> applyEnrichment(i, enr));
} else {
final List<Instance> a = findEnrichmentsByPID(i.getAlternateIdentifier(), ri);
if (a != null && a.size() > 0) {
a.forEach(enr -> applyEnrichment(i, enr));
}
}
enrichmentResult.add(i);
});
return enrichmentResult;
}
/**
* This method converts the list of instance enrichments
* into a Map where the key is the normalized identifier
* and the value is the instance itself
*
* @param ri the list of enrichment instances
* @return the result map
*/
private static Map<String, Instance> toInstanceMap(final List<Instance> ri) {
return ri
.stream()
.filter(i -> i.getPid() != null || i.getAlternateIdentifier() != null)
.flatMap(i -> {
final List<Pair<String, Instance>> result = new ArrayList<>();
if (i.getPid() != null)
i
.getPid()
.stream()
.filter(MergeUtils::validPid)
.forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i)));
if (i.getAlternateIdentifier() != null)
i
.getAlternateIdentifier()
.stream()
.filter(MergeUtils::validPid)
.forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i)));
return result.stream();
})
.collect(
Collectors
.toMap(
Pair::getLeft,
Pair::getRight,
(a, b) -> a));
}
private static boolean isFromDelegatedAuthority(Result r) {
return Optional
.ofNullable(r.getInstance())
.map(
instance -> instance
.stream()
.filter(i -> Objects.nonNull(i.getCollectedfrom()))
.map(i -> i.getCollectedfrom().getKey())
.anyMatch(cfId -> IdentifierFactory.delegatedAuthorityDatasourceIds().contains(cfId)))
.orElse(false);
}
/**
* Valid pid boolean.
*
* @param p the p
* @return the boolean
*/
private static boolean validPid(final StructuredProperty p) {
return p.getValue() != null && p.getQualifier() != null && p.getQualifier().getClassid() != null;
}
/**
* Normalize pid string.
*
* @param pid the pid
* @return the string
*/
private static String extractKeyFromPid(final StructuredProperty pid) {
if (pid == null)
return null;
final StructuredProperty normalizedPid = CleaningFunctions.normalizePidValue(pid);
return String.format("%s::%s", normalizedPid.getQualifier().getClassid(), normalizedPid.getValue());
}
/**
* This utility method finds the list of enrichment instances
* that match one or more PIDs in the input list
*
* @param pids the list of PIDs
* @param enrichments the List of enrichment instances having the same pid
* @return the list
*/
private static List<Instance> findEnrichmentsByPID(final List<StructuredProperty> pids,
final Map<String, Instance> enrichments) {
if (pids == null || enrichments == null)
return null;
return pids
.stream()
.map(MergeUtils::extractKeyFromPid)
.map(enrichments::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
/**
* Is an enrichment boolean.
*
* @param e the e
* @return the boolean
*/
private static boolean isAnEnrichment(OafEntity e) {
return e.getDataInfo() != null &&
e.getDataInfo().getProvenanceaction() != null
&& ModelConstants.PROVENANCE_ENRICH.equalsIgnoreCase(e.getDataInfo().getProvenanceaction().getClassid());
}
/**
* This method apply enrichment on a single instance
* The enrichment consists of replacing values on
* single attribute only if in the current instance is missing
* The only repeatable field enriched is measures
*
* @param merge the current instance
* @param enrichment the enrichment instance
*/
private static void applyEnrichment(final Instance merge, final Instance enrichment) {
if (merge == null || enrichment == null)
return;
merge.setLicense(firstNonNull(merge.getLicense(), enrichment.getLicense()));
merge.setAccessright(firstNonNull(merge.getAccessright(), enrichment.getAccessright()));
merge.setInstancetype(firstNonNull(merge.getInstancetype(), enrichment.getInstancetype()));
merge.setInstanceTypeMapping(firstNonNull(merge.getInstanceTypeMapping(), enrichment.getInstanceTypeMapping()));
merge.setHostedby(firstNonNull(merge.getHostedby(), enrichment.getHostedby()));
merge.setUrl(unionDistinctLists(merge.getUrl(), enrichment.getUrl(), 0));
merge.setDistributionlocation(firstNonNull(merge.getDistributionlocation(), enrichment.getDistributionlocation()));
merge.setCollectedfrom(firstNonNull(merge.getCollectedfrom(), enrichment.getCollectedfrom()));
// pid and alternateId are used for matching
merge.setDateofacceptance(firstNonNull(merge.getDateofacceptance(), enrichment.getDateofacceptance()));
merge.setProcessingchargeamount(firstNonNull(merge.getProcessingchargeamount(), enrichment.getProcessingchargeamount()));
merge.setProcessingchargecurrency(firstNonNull(merge.getProcessingchargecurrency(), enrichment.getProcessingchargecurrency()));
merge.setRefereed(firstNonNull(merge.getRefereed(), enrichment.getRefereed()));
merge.setMeasures(unionDistinctLists(merge.getMeasures(), enrichment.getMeasures(), 0));
merge.setFulltext(firstNonNull(merge.getFulltext(), enrichment.getFulltext()));
}
private static int compareTrust(Oaf a, Oaf b) {
String left = Optional
.ofNullable(a.getDataInfo())
.map(DataInfo::getTrust)
.orElse("0.0");
String right = Optional
.ofNullable(b.getDataInfo())
.map(DataInfo::getTrust)
.orElse("0.0");
return left.compareTo(right);
}
}

@ -14,7 +14,6 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.common.AccessRightComparator;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
public class OafMapperUtils {
@ -22,65 +21,6 @@ public class OafMapperUtils {
private OafMapperUtils() {
}
public static Oaf merge(final Oaf left, final Oaf right) {
if (ModelSupport.isSubClass(left, OafEntity.class)) {
return mergeEntities((OafEntity) left, (OafEntity) right);
} else if (ModelSupport.isSubClass(left, Relation.class)) {
((Relation) left).mergeFrom((Relation) right);
} else {
throw new IllegalArgumentException("invalid Oaf type:" + left.getClass().getCanonicalName());
}
return left;
}
public static OafEntity mergeEntities(OafEntity left, OafEntity right) {
if (ModelSupport.isSubClass(left, Result.class)) {
return mergeResults((Result) left, (Result) right);
} else if (ModelSupport.isSubClass(left, Datasource.class)) {
left.mergeFrom(right);
} else if (ModelSupport.isSubClass(left, Organization.class)) {
left.mergeFrom(right);
} else if (ModelSupport.isSubClass(left, Project.class)) {
left.mergeFrom(right);
} else {
throw new IllegalArgumentException("invalid OafEntity subtype:" + left.getClass().getCanonicalName());
}
return left;
}
public static Result mergeResults(Result left, Result right) {
final boolean leftFromDelegatedAuthority = isFromDelegatedAuthority(left);
final boolean rightFromDelegatedAuthority = isFromDelegatedAuthority(right);
if (leftFromDelegatedAuthority && !rightFromDelegatedAuthority) {
return left;
}
if (!leftFromDelegatedAuthority && rightFromDelegatedAuthority) {
return right;
}
if (new ResultTypeComparator().compare(left, right) < 0) {
left.mergeFrom(right);
return left;
} else {
right.mergeFrom(left);
return right;
}
}
private static boolean isFromDelegatedAuthority(Result r) {
return Optional
.ofNullable(r.getInstance())
.map(
instance -> instance
.stream()
.filter(i -> Objects.nonNull(i.getCollectedfrom()))
.map(i -> i.getCollectedfrom().getKey())
.anyMatch(cfId -> IdentifierFactory.delegatedAuthorityDatasourceIds().contains(cfId)))
.orElse(false);
}
public static KeyValue keyValue(final String k, final String v) {
final KeyValue kv = new KeyValue();
kv.setKey(k);

@ -0,0 +1,111 @@
package eu.dnetlib.dhp.schema.oaf.utils;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
public class MergeUtilsTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@Test
void testMergePubs() throws IOException {
Publication p1 = read("publication_1.json", Publication.class);
Publication p2 = read("publication_2.json", Publication.class);
Dataset d1 = read("dataset_1.json", Dataset.class);
Dataset d2 = read("dataset_2.json", Dataset.class);
assertEquals(1, p1.getCollectedfrom().size());
assertEquals(ModelConstants.CROSSREF_ID, p1.getCollectedfrom().get(0).getKey());
assertEquals(1, d2.getCollectedfrom().size());
assertFalse(cfId(d2.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID));
assertEquals(1, p2.getCollectedfrom().size());
assertFalse(cfId(p2.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID));
assertEquals(1, d1.getCollectedfrom().size());
assertTrue(cfId(d1.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID));
final Result p1d2 = MergeUtils.checkedMerge(p1, d2);
assertEquals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID, p1d2.getResulttype().getClassid());
assertTrue(p1d2 instanceof Publication);
assertEquals(p1.getId(), p1d2.getId());
}
@Test
void testMergePubs_1() throws IOException {
Publication p2 = read("publication_2.json", Publication.class);
Dataset d1 = read("dataset_1.json", Dataset.class);
final Result p2d1 = MergeUtils.checkedMerge(p2, d1);
assertEquals((ModelConstants.DATASET_RESULTTYPE_CLASSID), p2d1.getResulttype().getClassid());
assertTrue(p2d1 instanceof Dataset);
assertEquals(d1.getId(), p2d1.getId());
assertEquals(2, p2d1.getCollectedfrom().size());
}
@Test
void testMergePubs_2() throws IOException {
Publication p1 = read("publication_1.json", Publication.class);
Publication p2 = read("publication_2.json", Publication.class);
Result p1p2 = MergeUtils.checkedMerge(p1, p2);
assertTrue(p1p2 instanceof Publication);
assertEquals(p1.getId(), p1p2.getId());
assertEquals(2, p1p2.getCollectedfrom().size());
}
@Test
void testDelegatedAuthority_1() throws IOException {
Dataset d1 = read("dataset_2.json", Dataset.class);
Dataset d2 = read("dataset_delegated.json", Dataset.class);
assertEquals(1, d2.getCollectedfrom().size());
assertTrue(cfId(d2.getCollectedfrom()).contains(ModelConstants.ZENODO_OD_ID));
Result res = (Result) MergeUtils.merge(d1, d2, true);
assertEquals(d2, res);
}
@Test
void testDelegatedAuthority_2() throws IOException {
Dataset p1 = read("publication_1.json", Dataset.class);
Dataset d2 = read("dataset_delegated.json", Dataset.class);
assertEquals(1, d2.getCollectedfrom().size());
assertTrue(cfId(d2.getCollectedfrom()).contains(ModelConstants.ZENODO_OD_ID));
Result res = (Result) MergeUtils.merge(p1, d2, true);
assertEquals(d2, res);
}
protected HashSet<String> cfId(List<KeyValue> collectedfrom) {
return collectedfrom.stream().map(KeyValue::getKey).collect(Collectors.toCollection(HashSet::new));
}
protected <T extends Result> T read(String filename, Class<T> clazz) throws IOException {
final String json = IOUtils.toString(getClass().getResourceAsStream(filename));
return OBJECT_MAPPER.readValue(json, clazz);
}
}

@ -149,7 +149,7 @@ class OafMapperUtilsTest {
void testDate() {
final String date = GraphCleaningFunctions.cleanDate("23-FEB-1998");
assertNotNull(date);
System.out.println(date);
assertEquals("1998-02-23", date);
}
@Test
@ -166,8 +166,8 @@ class OafMapperUtilsTest {
assertEquals(
ModelConstants.PUBLICATION_RESULTTYPE_CLASSID,
OafMapperUtils
.mergeResults(p1, d2)
MergeUtils
.mergeResult(p1, d2)
.getResulttype()
.getClassid());
@ -178,8 +178,8 @@ class OafMapperUtilsTest {
assertEquals(
ModelConstants.DATASET_RESULTTYPE_CLASSID,
OafMapperUtils
.mergeResults(p2, d1)
MergeUtils
.mergeResult(p2, d1)
.getResulttype()
.getClassid());
}
@ -192,7 +192,7 @@ class OafMapperUtilsTest {
assertEquals(1, d2.getCollectedfrom().size());
assertTrue(cfId(d2.getCollectedfrom()).contains(ModelConstants.ZENODO_OD_ID));
Result res = OafMapperUtils.mergeResults(d1, d2);
Result res = MergeUtils.mergeResult(d1, d2);
assertEquals(d2, res);

@ -1,14 +1,13 @@
package eu.dnetlib.dhp.actionmanager.promote;
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import java.util.function.BiFunction;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
/** OAF model merging support. */
public class MergeAndGet {
@ -46,20 +45,7 @@ public class MergeAndGet {
}
private static <G extends Oaf, A extends Oaf> G mergeFromAndGet(G x, A y) {
if (isSubClass(x, Relation.class) && isSubClass(y, Relation.class)) {
((Relation) x).mergeFrom((Relation) y);
return x;
} else if (isSubClass(x, OafEntity.class)
&& isSubClass(y, OafEntity.class)
&& isSubClass(x, y)) {
((OafEntity) x).mergeFrom((OafEntity) y);
return x;
}
throw new RuntimeException(
String
.format(
"MERGE_FROM_AND_GET incompatible types: %s, %s",
x.getClass().getCanonicalName(), y.getClass().getCanonicalName()));
return (G) MergeUtils.merge(x, y);
}
@SuppressWarnings("unchecked")

@ -34,6 +34,11 @@ public class BipProjectModel {
String totalCitationCount;
public String getProjectId() {
return projectId;
}
// each project bip measure has exactly one value, hence one key-value pair
private Measure createMeasure(String measureId, String measureValue) {

@ -1,29 +1,9 @@
package eu.dnetlib.dhp.actionmanager.project;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProgramme;
import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProject;
import eu.dnetlib.dhp.actionmanager.project.utils.model.EXCELTopic;
import eu.dnetlib.dhp.actionmanager.project.utils.model.JsonTopic;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
@ -33,9 +13,27 @@ import eu.dnetlib.dhp.schema.oaf.H2020Classification;
import eu.dnetlib.dhp.schema.oaf.H2020Programme;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/**
* Class that makes the ActionSet. To prepare the AS two joins are needed
*
@ -160,9 +158,11 @@ public class SparkAtomicActionJob {
(MapFunction<Project, String>) OafEntity::getId,
Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Project, Project>) (s, it) -> {
Project first = it.next();
it.forEachRemaining(first::mergeFrom);
return first;
Project merge = it.next();
while (it.hasNext()) {
merge = MergeUtils.mergeProject(merge, it.next());
}
return merge;
}, Encoders.bean(Project.class))
.toJavaRDD()
.map(p -> new AtomicAction(Project.class, p))

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.dedup;
import java.util.*;
import java.util.stream.Stream;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
@ -14,6 +15,7 @@ import org.apache.spark.sql.*;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
@ -23,190 +25,190 @@ import scala.Tuple3;
import scala.collection.JavaConversions;
public class DedupRecordFactory {
public static final class DedupRecordReduceState {
public final String dedupId;
public final ArrayList<String> aliases = new ArrayList<>();
public final HashSet<String> acceptanceDate = new HashSet<>();
public OafEntity entity;
public DedupRecordReduceState(String dedupId, String id, OafEntity entity) {
this.dedupId = dedupId;
this.entity = entity;
if (entity == null) {
aliases.add(id);
} else {
if (Result.class.isAssignableFrom(entity.getClass())) {
Result result = (Result) entity;
if (result.getDateofacceptance() != null
&& StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
acceptanceDate.add(result.getDateofacceptance().getValue());
}
}
}
}
public String getDedupId() {
return dedupId;
}
}
private static final int MAX_ACCEPTANCE_DATE = 20;
private DedupRecordFactory() {
}
public static Dataset<OafEntity> createDedupRecord(
final SparkSession spark,
final DataInfo dataInfo,
final String mergeRelsInputPath,
final String entitiesInputPath,
final Class<OafEntity> clazz) {
final long ts = System.currentTimeMillis();
final Encoder<OafEntity> beanEncoder = Encoders.bean(clazz);
final Encoder<OafEntity> kryoEncoder = Encoders.kryo(clazz);
// <id, json_entity>
Dataset<Row> entities = spark
.read()
.schema(Encoders.bean(clazz).schema())
.json(entitiesInputPath)
.as(beanEncoder)
.map(
(MapFunction<OafEntity, Tuple2<String, OafEntity>>) entity -> {
return new Tuple2<>(entity.getId(), entity);
},
Encoders.tuple(Encoders.STRING(), kryoEncoder))
.selectExpr("_1 AS id", "_2 AS kryoObject");
// <source, target>: source is the dedup_id, target is the id of the mergedIn
Dataset<Row> mergeRels = spark
.read()
.load(mergeRelsInputPath)
.where("relClass == 'merges'")
.selectExpr("source as dedupId", "target as id");
return mergeRels
.join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
.select("dedupId", "id", "kryoObject")
.as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
.map(
(MapFunction<Tuple3<String, String, OafEntity>, DedupRecordReduceState>) t -> new DedupRecordReduceState(
t._1(), t._2(), t._3()),
Encoders.kryo(DedupRecordReduceState.class))
.groupByKey(
(MapFunction<DedupRecordReduceState, String>) DedupRecordReduceState::getDedupId, Encoders.STRING())
.reduceGroups(
(ReduceFunction<DedupRecordReduceState>) (t1, t2) -> {
if (t1.entity == null) {
t2.aliases.addAll(t1.aliases);
return t2;
}
if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
t1.acceptanceDate.addAll(t2.acceptanceDate);
}
t1.aliases.addAll(t2.aliases);
t1.entity = reduceEntity(t1.entity, t2.entity);
return t1;
})
.flatMap((FlatMapFunction<Tuple2<String, DedupRecordReduceState>, OafEntity>) t -> {
String dedupId = t._1();
DedupRecordReduceState agg = t._2();
if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) {
return Collections.emptyIterator();
}
return Stream
.concat(
Stream
.of(agg.getDedupId())
.map(id -> createDedupOafEntity(id, agg.entity, dataInfo, ts)),
agg.aliases
.stream()
.map(id -> createMergedDedupAliasOafEntity(id, agg.entity, dataInfo, ts)))
.iterator();
}, beanEncoder);
}
private static OafEntity createDedupOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) {
try {
OafEntity res = (OafEntity) BeanUtils.cloneBean(base);
res.setId(id);
res.setDataInfo(dataInfo);
res.setLastupdatetimestamp(ts);
return res;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static OafEntity createMergedDedupAliasOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) {
try {
OafEntity res = createDedupOafEntity(id, base, dataInfo, ts);
DataInfo ds = (DataInfo) BeanUtils.cloneBean(dataInfo);
ds.setDeletedbyinference(true);
res.setDataInfo(ds);
return res;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) {
if (duplicate == null) {
return entity;
}
int compare = new IdentifierComparator<>()
.compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate));
if (compare > 0) {
OafEntity swap = duplicate;
duplicate = entity;
entity = swap;
}
entity.mergeFrom(duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result re = (Result) entity;
Result rd = (Result) duplicate;
List<List<Author>> authors = new ArrayList<>();
if (re.getAuthor() != null) {
authors.add(re.getAuthor());
}
if (rd.getAuthor() != null) {
authors.add(rd.getAuthor());
}
re.setAuthor(AuthorMerger.merge(authors));
}
return entity;
}
public static <T extends OafEntity> T entityMerger(
String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz) {
T base = entities.next()._2();
while (entities.hasNext()) {
T duplicate = entities.next()._2();
if (duplicate != null)
base = (T) reduceEntity(base, duplicate);
}
base.setId(id);
base.setDataInfo(dataInfo);
base.setLastupdatetimestamp(ts);
return base;
}
public static final class DedupRecordReduceState {
public final String dedupId;
public final ArrayList<String> aliases = new ArrayList<>();
public final HashSet<String> acceptanceDate = new HashSet<>();
public OafEntity entity;
public DedupRecordReduceState(String dedupId, String id, OafEntity entity) {
this.dedupId = dedupId;
this.entity = entity;
if (entity == null) {
aliases.add(id);
} else {
if (Result.class.isAssignableFrom(entity.getClass())) {
Result result = (Result) entity;
if (result.getDateofacceptance() != null
&& StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
acceptanceDate.add(result.getDateofacceptance().getValue());
}
}
}
}
public String getDedupId() {
return dedupId;
}
}
private static final int MAX_ACCEPTANCE_DATE = 20;
private DedupRecordFactory() {
}
public static Dataset<OafEntity> createDedupRecord(
final SparkSession spark,
final DataInfo dataInfo,
final String mergeRelsInputPath,
final String entitiesInputPath,
final Class<OafEntity> clazz) {
final long ts = System.currentTimeMillis();
final Encoder<OafEntity> beanEncoder = Encoders.bean(clazz);
final Encoder<OafEntity> kryoEncoder = Encoders.kryo(clazz);
// <id, json_entity>
Dataset<Row> entities = spark
.read()
.schema(Encoders.bean(clazz).schema())
.json(entitiesInputPath)
.as(beanEncoder)
.map(
(MapFunction<OafEntity, Tuple2<String, OafEntity>>) entity -> {
return new Tuple2<>(entity.getId(), entity);
},
Encoders.tuple(Encoders.STRING(), kryoEncoder))
.selectExpr("_1 AS id", "_2 AS kryoObject");
// <source, target>: source is the dedup_id, target is the id of the mergedIn
Dataset<Row> mergeRels = spark
.read()
.load(mergeRelsInputPath)
.where("relClass == 'merges'")
.selectExpr("source as dedupId", "target as id");
return mergeRels
.join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
.select("dedupId", "id", "kryoObject")
.as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
.map(
(MapFunction<Tuple3<String, String, OafEntity>, DedupRecordReduceState>) t -> new DedupRecordReduceState(
t._1(), t._2(), t._3()),
Encoders.kryo(DedupRecordReduceState.class))
.groupByKey(
(MapFunction<DedupRecordReduceState, String>) DedupRecordReduceState::getDedupId, Encoders.STRING())
.reduceGroups(
(ReduceFunction<DedupRecordReduceState>) (t1, t2) -> {
if (t1.entity == null) {
t2.aliases.addAll(t1.aliases);
return t2;
}
if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
t1.acceptanceDate.addAll(t2.acceptanceDate);
}
t1.aliases.addAll(t2.aliases);
t1.entity = reduceEntity(t1.entity, t2.entity);
return t1;
})
.flatMap((FlatMapFunction<Tuple2<String, DedupRecordReduceState>, OafEntity>) t -> {
String dedupId = t._1();
DedupRecordReduceState agg = t._2();
if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) {
return Collections.emptyIterator();
}
return Stream
.concat(
Stream
.of(agg.getDedupId())
.map(id -> createDedupOafEntity(id, agg.entity, dataInfo, ts)),
agg.aliases
.stream()
.map(id -> createMergedDedupAliasOafEntity(id, agg.entity, dataInfo, ts)))
.iterator();
}, beanEncoder);
}
private static OafEntity createDedupOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) {
try {
OafEntity res = (OafEntity) BeanUtils.cloneBean(base);
res.setId(id);
res.setDataInfo(dataInfo);
res.setLastupdatetimestamp(ts);
return res;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static OafEntity createMergedDedupAliasOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) {
try {
OafEntity res = createDedupOafEntity(id, base, dataInfo, ts);
DataInfo ds = (DataInfo) BeanUtils.cloneBean(dataInfo);
ds.setDeletedbyinference(true);
res.setDataInfo(ds);
return res;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) {
if (duplicate == null) {
return entity;
}
int compare = new IdentifierComparator<>()
.compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate));
if (compare > 0) {
OafEntity swap = duplicate;
duplicate = entity;
entity = swap;
}
entity = MergeUtils.checkedMerge(entity, duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result re = (Result) entity;
Result rd = (Result) duplicate;
List<List<Author>> authors = new ArrayList<>();
if (re.getAuthor() != null) {
authors.add(re.getAuthor());
}
if (rd.getAuthor() != null) {
authors.add(rd.getAuthor());
}
re.setAuthor(AuthorMerger.merge(authors));
}
return entity;
}
public static <T extends OafEntity> T entityMerger(
String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz) {
T base = entities.next()._2();
while (entities.hasNext()) {
T duplicate = entities.next()._2();
if (duplicate != null)
base = (T) reduceEntity(base, duplicate);
}
base.setId(id);
base.setDataInfo(dataInfo);
base.setLastupdatetimestamp(ts);
return base;
}
}

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.dedup;
import static org.apache.spark.sql.functions.col;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
@ -127,10 +128,8 @@ public class SparkPropagateRelation extends AbstractSparkAction {
(MapFunction<Relation, String>) r -> String
.join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
Encoders.STRING())
.reduceGroups((ReduceFunction<Relation>) (b, a) -> {
b.mergeFrom(a);
return b;
})
.reduceGroups((ReduceFunction<Relation>) MergeUtils::mergeRelation
)
.map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC);
final String outputRelationPath = graphOutputPath + "/relation";

@ -3,17 +3,18 @@ package eu.dnetlib.doiboost
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.oa.merge.AuthorMerger
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils
import eu.dnetlib.dhp.schema.oaf.{Organization, Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.doiboost.mag.ConversionUtil
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.col
import org.apache.spark.sql._
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.parse
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
object SparkGenerateDoiBoost {
@ -78,8 +79,10 @@ object SparkGenerateDoiBoost {
if (item._2 != null) {
val otherPub = item._2._2
if (otherPub != null) {
crossrefPub.mergeFrom(otherPub)
crossrefPub.setAuthor(AuthorMerger.mergeAuthor(crossrefPub.getAuthor, otherPub.getAuthor))
val mergedAuthor = AuthorMerger.mergeAuthor(crossrefPub.getAuthor, otherPub.getAuthor)
val res = MergeUtils.mergePublication(crossrefPub, otherPub)
res.setAuthor(mergedAuthor);
return res
}
}
crossrefPub
@ -130,14 +133,13 @@ object SparkGenerateDoiBoost {
// So we have to merge
val b1 = left._2
val b2 = right._2
b1.mergeFrom(b2)
b1.mergeOAFDataInfo(b2)
val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor)
b1.setAuthor(authors)
val merged = MergeUtils.mergePublication(b1, b2);
merged.setAuthor(authors)
if (b2.getId != null && b2.getId.nonEmpty)
b1.setId(b2.getId)
merged.setId(b2.getId)
//Return publication Merged
(b1.getId, b1)
(merged.getId, merged)
}
} else {
// Left is Null so we return right

@ -1,8 +1,8 @@
package eu.dnetlib.doiboost.mag
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication, StructuredProperty, Subject}
import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, MergeUtils}
import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication, Subject}
import eu.dnetlib.doiboost.DoiBoostMappingUtil
import eu.dnetlib.doiboost.DoiBoostMappingUtil._
import org.json4s
@ -142,8 +142,7 @@ case object ConversionUtil {
def mergePublication(a: Publication, b: Publication): Publication = {
if ((a != null) && (b != null)) {
a.mergeFrom(b)
a
MergeUtils.mergePublication(a, b)
} else {
if (a == null) b else a
}

@ -149,7 +149,7 @@ public class SparkResultToCommunityFromOrganizationJob {
}
}
// res.setContext(propagatedContexts);
// ret.mergeFrom(res);
// return MergeUtils.mergeResult(ret, res);
}
return ret;
};

@ -13,6 +13,7 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
@ -24,8 +25,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Context;
@ -162,7 +161,7 @@ public class SparkResultToCommunityFromProject implements Serializable {
}
}
res.setContext(propagatedContexts);
ret.mergeFrom(res);
return MergeUtils.checkedMerge(ret, res);
}
return ret;
};

@ -25,118 +25,118 @@ import scala.Tuple2;
public class SparkResultToCommunityThroughSemRelJob {
private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityThroughSemRelJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkResultToCommunityThroughSemRelJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/input_communitytoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String preparedInfoPath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", preparedInfoPath);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final Boolean saveGraph = Optional
.ofNullable(parser.get("saveGraph"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("saveGraph: {}", saveGraph);
@SuppressWarnings("unchecked")
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath);
}
if (saveGraph) {
execPropagation(
spark, inputPath, outputPath, preparedInfoPath, resultClazz);
}
});
}
private static <R extends Result> void execPropagation(
SparkSession spark,
String inputPath,
String outputPath,
String preparedInfoPath,
Class<R> resultClazz) {
Dataset<ResultCommunityList> possibleUpdates = readPath(spark, preparedInfoPath, ResultCommunityList.class);
Dataset<R> result = readPath(spark, inputPath, resultClazz);
result
.joinWith(
possibleUpdates,
result.col("id").equalTo(possibleUpdates.col("resultId")),
"left_outer")
.map(contextUpdaterFn(), Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
private static <R extends Result> MapFunction<Tuple2<R, ResultCommunityList>, R> contextUpdaterFn() {
return value -> {
R ret = value._1();
Optional<ResultCommunityList> rcl = Optional.ofNullable(value._2());
if (rcl.isPresent()) {
Set<String> contexts = new HashSet<>();
ret.getContext().forEach(c -> contexts.add(c.getId()));
rcl
.get()
.getCommunityList()
.stream()
.forEach(
c -> {
if (!contexts.contains(c)) {
Context newContext = new Context();
newContext.setId(c);
newContext
.setDataInfo(
Arrays
.asList(
getDataInfo(
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS)));
ret.getContext().add(newContext);
}
});
}
return ret;
};
}
private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityThroughSemRelJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkResultToCommunityThroughSemRelJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/input_communitytoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String preparedInfoPath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", preparedInfoPath);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final Boolean saveGraph = Optional
.ofNullable(parser.get("saveGraph"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("saveGraph: {}", saveGraph);
@SuppressWarnings("unchecked")
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath);
}
if (saveGraph) {
execPropagation(
spark, inputPath, outputPath, preparedInfoPath, resultClazz);
}
});
}
private static <R extends Result> void execPropagation(
SparkSession spark,
String inputPath,
String outputPath,
String preparedInfoPath,
Class<R> resultClazz) {
Dataset<ResultCommunityList> possibleUpdates = readPath(spark, preparedInfoPath, ResultCommunityList.class);
Dataset<R> result = readPath(spark, inputPath, resultClazz);
result
.joinWith(
possibleUpdates,
result.col("id").equalTo(possibleUpdates.col("resultId")),
"left_outer")
.map(contextUpdaterFn(), Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
private static <R extends Result> MapFunction<Tuple2<R, ResultCommunityList>, R> contextUpdaterFn() {
return value -> {
R ret = value._1();
Optional<ResultCommunityList> rcl = Optional.ofNullable(value._2());
if (rcl.isPresent()) {
Set<String> contexts = new HashSet<>();
ret.getContext().forEach(c -> contexts.add(c.getId()));
rcl
.get()
.getCommunityList()
.stream()
.forEach(
c -> {
if (!contexts.contains(c)) {
Context newContext = new Context();
newContext.setId(c);
newContext
.setDataInfo(
Arrays
.asList(
getDataInfo(
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS)));
ret.getContext().add(newContext);
}
});
}
return ret;
};
}
}

@ -7,6 +7,7 @@ import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@ -24,7 +25,6 @@ import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2;
/**
@ -248,11 +248,11 @@ public class MergeGraphTableSparkJob {
private T mergeAndGet(T b, T a) {
if (Objects.nonNull(a) && Objects.nonNull(b)) {
if (ModelSupport.isSubClass(a, OafEntity.class) && ModelSupport.isSubClass(b, OafEntity.class)) {
return (T) OafMapperUtils.mergeEntities((OafEntity) b, (OafEntity) a);
return (T) MergeUtils.merge(b, a);
}
if (a instanceof Relation && b instanceof Relation) {
((Relation) a).mergeFrom(b);
return a;
return (T) MergeUtils.mergeRelation((Relation)a, (Relation) b);
}
}
return Objects.isNull(a) ? b : a;

@ -1,14 +1,16 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
@ -16,26 +18,17 @@ import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class GenerateEntitiesApplication extends AbstractMigrationApplication {
@ -137,7 +130,7 @@ public class GenerateEntitiesApplication extends AbstractMigrationApplication {
save(
inputRdd
.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
.reduceByKey(OafMapperUtils::merge)
.reduceByKey(MergeUtils::merge)
.map(Tuple2::_2),
targetPath);
break;

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.graph.resolution
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.common.EntityType
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils
import eu.dnetlib.dhp.schema.oaf.{Dataset => OafDataset, _}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}
@ -118,15 +119,12 @@ object SparkResolveEntities {
currentEntityDataset
.joinWith(re, currentEntityDataset("_1").equalTo(re("_1")), "left")
.map(k => {
val a = k._1
val b = k._2
if (b == null)
a._2
else {
a._2.mergeFrom(b._2)
a._2
}
else
MergeUtils.mergeResult(a._2, b._2)
})
.map(r => mapper.writeValueAsString(r))(Encoders.STRING)
.write

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils
import eu.dnetlib.dhp.schema.oaf.{Dataset => OafDataset, _}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
@ -130,10 +131,7 @@ object SparkCreateInputGraph {
val ds: Dataset[T] = spark.read.load(sourcePath).as[T]
ds.groupByKey(_.getId)
.reduceGroups { (x: T, y: T) =>
x.mergeFrom(y)
x
}
.reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] }
.map(_._2)
.write
.mode(SaveMode.Overwrite)

@ -1,13 +1,12 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.lenient;
import java.io.IOException;
import java.util.List;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils;
import org.dom4j.DocumentException;
import org.junit.jupiter.api.BeforeEach;
@ -16,13 +15,12 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.clean.GraphCleaningFunctionsTest;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.IOException;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.lenient;
@ExtendWith(MockitoExtension.class)
class GenerateEntitiesApplicationTest {
@ -72,7 +70,7 @@ class GenerateEntitiesApplicationTest {
protected <T extends Result> void verifyMerge(Result publication, Result dataset, Class<T> clazz,
String resultType) {
final Result merge = OafMapperUtils.mergeResults(publication, dataset);
final Result merge = MergeUtils.mergeResult(publication, dataset);
assertTrue(clazz.isAssignableFrom(merge.getClass()));
assertEquals(resultType, merge.getResulttype().getClassid());
}

Loading…
Cancel
Save