diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java index 0225a5063..eadca231e 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java @@ -1,24 +1,6 @@ package eu.dnetlib.dhp.oa.merge; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import static org.apache.spark.sql.functions.col; -import static org.apache.spark.sql.functions.when; - -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; -import java.util.stream.Collectors; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.ReduceFunction; -import org.apache.spark.sql.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; @@ -26,169 +8,186 @@ import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions; -import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.ReduceFunction; +import org.apache.spark.sql.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.stream.Collectors; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static org.apache.spark.sql.functions.col; +import static org.apache.spark.sql.functions.when; + /** * Groups the graph content by entity identifier to ensure ID uniqueness */ public class GroupEntitiesSparkJob { - private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); + private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); - private static final Encoder OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class); + private static final Encoder OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class); - private ArgumentApplicationParser parser; + private ArgumentApplicationParser parser; - public GroupEntitiesSparkJob(ArgumentApplicationParser parser) { - this.parser = parser; - } + public GroupEntitiesSparkJob(ArgumentApplicationParser parser) { + this.parser = parser; + } - public static void main(String[] args) throws Exception { + public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - GroupEntitiesSparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + String jsonConfiguration = IOUtils + .toString( + GroupEntitiesSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String isLookupUrl = parser.get("isLookupUrl"); - log.info("isLookupUrl: {}", isLookupUrl); + final String isLookupUrl = parser.get("isLookupUrl"); + log.info("isLookupUrl: {}", isLookupUrl); - final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl); + final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl); - new GroupEntitiesSparkJob(parser).run(isSparkSessionManaged, isLookupService); - } + new GroupEntitiesSparkJob(parser).run(isSparkSessionManaged, isLookupService); + } - public void run(Boolean isSparkSessionManaged, ISLookUpService isLookUpService) - throws ISLookUpException { + public void run(Boolean isSparkSessionManaged, ISLookUpService isLookUpService) + throws ISLookUpException { - String graphInputPath = parser.get("graphInputPath"); - log.info("graphInputPath: {}", graphInputPath); + String graphInputPath = parser.get("graphInputPath"); + log.info("graphInputPath: {}", graphInputPath); - String checkpointPath = parser.get("checkpointPath"); - log.info("checkpointPath: {}", checkpointPath); + String checkpointPath = parser.get("checkpointPath"); + log.info("checkpointPath: {}", checkpointPath); - String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible")); - log.info("filterInvisible: {}", filterInvisible); + boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible")); + log.info("filterInvisible: {}", filterInvisible); - SparkConf conf = new SparkConf(); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookUpService); + final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookUpService); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - HdfsSupport.remove(checkpointPath, spark.sparkContext().hadoopConfiguration()); - groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible, vocs); - }); - } + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + HdfsSupport.remove(checkpointPath, spark.sparkContext().hadoopConfiguration()); + groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible, vocs); + }); + } - private static void groupEntities( - SparkSession spark, - String inputPath, - String checkpointPath, - String outputPath, - boolean filterInvisible, VocabularyGroup vocs) { + private static void groupEntities( + SparkSession spark, + String inputPath, + String checkpointPath, + String outputPath, + boolean filterInvisible, VocabularyGroup vocs) { - Dataset allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC); + Dataset allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC); - for (Map.Entry e : ModelSupport.entityTypes.entrySet()) { - String entity = e.getKey().name(); - Class entityClass = e.getValue(); - String entityInputPath = inputPath + "/" + entity; + for (Map.Entry e : ModelSupport.entityTypes.entrySet()) { + String entity = e.getKey().name(); + Class entityClass = e.getValue(); + String entityInputPath = inputPath + "/" + entity; - if (!HdfsSupport.exists(entityInputPath, spark.sparkContext().hadoopConfiguration())) { - continue; - } + if (!HdfsSupport.exists(entityInputPath, spark.sparkContext().hadoopConfiguration())) { + continue; + } - allEntities = allEntities - .union( - ((Dataset) spark - .read() - .schema(Encoders.bean(entityClass).schema()) - .json(entityInputPath) - .filter("length(id) > 0") - .as(Encoders.bean(entityClass))) - .map((MapFunction) r -> r, OAFENTITY_KRYO_ENC)); - } + allEntities = allEntities + .union( + ((Dataset) spark + .read() + .schema(Encoders.bean(entityClass).schema()) + .json(entityInputPath) + .filter("length(id) > 0") + .as(Encoders.bean(entityClass))) + .map((MapFunction) r -> r, OAFENTITY_KRYO_ENC)); + } - Dataset groupedEntities = allEntities - .map( - (MapFunction) entity -> GraphCleaningFunctions - .applyCoarVocabularies(entity, vocs), - OAFENTITY_KRYO_ENC) - .groupByKey((MapFunction) OafEntity::getId, Encoders.STRING()) - .reduceGroups((ReduceFunction) OafMapperUtils::mergeEntities) - .map( - (MapFunction, Tuple2>) t -> new Tuple2<>( - t._2().getClass().getName(), t._2()), - Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC)); + Dataset groupedEntities = allEntities + .map( + (MapFunction) entity -> GraphCleaningFunctions + .applyCoarVocabularies(entity, vocs), + OAFENTITY_KRYO_ENC) + .groupByKey((MapFunction) OafEntity::getId, Encoders.STRING()) + .reduceGroups((ReduceFunction) MergeUtils::checkedMerge) + .map( + (MapFunction, Tuple2>) t -> new Tuple2<>( + t._2().getClass().getName(), t._2()), + Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC)); - // pivot on "_1" (classname of the entity) - // created columns containing only entities of the same class - for (Map.Entry e : ModelSupport.entityTypes.entrySet()) { - String entity = e.getKey().name(); - Class entityClass = e.getValue(); + // pivot on "_1" (classname of the entity) + // created columns containing only entities of the same class + for (Map.Entry e : ModelSupport.entityTypes.entrySet()) { + String entity = e.getKey().name(); + Class entityClass = e.getValue(); - groupedEntities = groupedEntities - .withColumn( - entity, - when(col("_1").equalTo(entityClass.getName()), col("_2"))); - } + groupedEntities = groupedEntities + .withColumn( + entity, + when(col("_1").equalTo(entityClass.getName()), col("_2"))); + } - groupedEntities - .drop("_1", "_2") - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .save(checkpointPath); + groupedEntities + .drop("_1", "_2") + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .save(checkpointPath); - ForkJoinPool parPool = new ForkJoinPool(ModelSupport.entityTypes.size()); + ForkJoinPool parPool = new ForkJoinPool(ModelSupport.entityTypes.size()); - ModelSupport.entityTypes - .entrySet() - .stream() - .map(e -> parPool.submit(() -> { - String entity = e.getKey().name(); - Class entityClass = e.getValue(); + ModelSupport.entityTypes + .entrySet() + .stream() + .map(e -> parPool.submit(() -> { + String entity = e.getKey().name(); + Class entityClass = e.getValue(); - spark - .read() - .load(checkpointPath) - .select(col(entity).as("value")) - .filter("value IS NOT NULL") - .as(OAFENTITY_KRYO_ENC) - .map((MapFunction) r -> r, (Encoder) Encoders.bean(entityClass)) - .filter(filterInvisible ? "dataInfo.invisible != TRUE" : "TRUE") - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/" + entity); - })) - .collect(Collectors.toList()) - .forEach(t -> { - try { - t.get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - }); - } + spark + .read() + .load(checkpointPath) + .select(col(entity).as("value")) + .filter("value IS NOT NULL") + .as(OAFENTITY_KRYO_ENC) + .map((MapFunction) r -> r, (Encoder) Encoders.bean(entityClass)) + .filter(filterInvisible ? "dataInfo.invisible != TRUE" : "TRUE") + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/" + entity); + })) + .collect(Collectors.toList()) + .forEach(t -> { + try { + t.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } } diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeComparator.java new file mode 100644 index 000000000..ffb0e600a --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeComparator.java @@ -0,0 +1,79 @@ +package eu.dnetlib.dhp.schema.oaf.utils; + +// +// Source code recreated from a .class file by IntelliJ IDEA +// (powered by FernFlower decompiler) +// + + +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Result; + +import java.util.Comparator; +import java.util.HashSet; +import java.util.Optional; +import java.util.stream.Collectors; + +public class MergeComparator implements Comparator { + public MergeComparator() { + } + + public int compare(Oaf left, Oaf right) { + // nulls at the end + if (left == null && right == null) { + return 0; + } else if (left == null) { + return -1; + } else if (right == null) { + return 1; + } + + // invisible + if (left.getDataInfo() != null && left.getDataInfo().getInvisible() == true) { + if (right.getDataInfo() != null && right.getDataInfo().getInvisible() == false) { + return -1; + } + } + + // collectedfrom + HashSet lCf = getCollectedFromIds(left); + HashSet rCf = getCollectedFromIds(right); + if (lCf.contains("10|openaire____::081b82f96300b6a6e3d282bad31cb6e2") && !rCf.contains("10|openaire____::081b82f96300b6a6e3d282bad31cb6e2")) { + return -1; + } else if (!lCf.contains("10|openaire____::081b82f96300b6a6e3d282bad31cb6e2") && rCf.contains("10|openaire____::081b82f96300b6a6e3d282bad31cb6e2")) { + return 1; + } + + + + SubEntityType lClass = SubEntityType.fromClass(left.getClass()); + SubEntityType rClass = SubEntityType.fromClass(right.getClass()); + return lClass.ordinal() - rClass.ordinal(); + + } + + protected HashSet getCollectedFromIds(Oaf left) { + return (HashSet) Optional.ofNullable(left.getCollectedfrom()).map((cf) -> { + return (HashSet) cf.stream().map(KeyValue::getKey).collect(Collectors.toCollection(HashSet::new)); + }).orElse(new HashSet()); + } + + enum SubEntityType { + publication, dataset, software, otherresearchproduct, datasource, organization, project; + + /** + * Resolves the EntityType, given the relative class name + * + * @param clazz the given class name + * @param actual OafEntity subclass + * @return the EntityType associated to the given class + */ + public static SubEntityType fromClass(Class clazz) { + return valueOf(clazz.getSimpleName().toLowerCase()); + } + } + +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java new file mode 100644 index 000000000..072a86bf1 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java @@ -0,0 +1,707 @@ + +package eu.dnetlib.dhp.schema.oaf.utils; + +import eu.dnetlib.dhp.schema.common.AccessRightComparator; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; + +import java.text.ParseException; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static com.google.common.base.Objects.firstNonNull; +import static com.google.common.base.Preconditions.checkArgument; + +public class MergeUtils { + + public static T checkedMerge(final T left, final T right) { + return (T) merge(left, right, false); + } + + public static Oaf merge(final Oaf left, final Oaf right) { + return merge(left, right, false); + } + + public static Oaf merge(final Oaf left, final Oaf right, boolean checkDelegatedAuthority) { + if (sameClass(left, right, OafEntity.class)) { + return mergeEntities(left, right, checkDelegatedAuthority); + } else if (sameClass(left, right, Relation.class)) { + return mergeRelation((Relation) left, (Relation) right); + } else { + throw new RuntimeException( + String + .format( + "MERGE_FROM_AND_GET incompatible types: %s, %s", + left.getClass().getCanonicalName(), right.getClass().getCanonicalName())); + } + } + + private static boolean sameClass(Object left, Object right, Class cls) { + return cls.isAssignableFrom(left.getClass()) && cls.isAssignableFrom(right.getClass()); + } + + private static Oaf mergeEntities(Oaf left, Oaf right, boolean checkDelegatedAuthority) { + + if (sameClass(left, right, Result.class)) { + if (!left.getClass().equals(right.getClass()) || checkDelegatedAuthority) { + return mergeResultsOfDifferentTypes((Result)left, (Result) right); + } + + if (sameClass(left, right, Publication.class)) { + return mergePublication((Publication) left, (Publication) right); + } + if (sameClass(left, right, Dataset.class)) { + return mergeDataset((Dataset) left, (Dataset) right); + } + if (sameClass(left, right, OtherResearchProduct.class)) { + return mergeORP((OtherResearchProduct) left, (OtherResearchProduct) right); + } + if (sameClass(left, right, Software.class)) { + return mergeSoftware((Software) left, (Software) right); + } + + return mergeResult((Result) left, (Result) right); + } else if (sameClass(left, right, Datasource.class)) { + // TODO + final int trust = compareTrust(left, right); + return mergeOafEntityFields((Datasource) left, (Datasource) right, trust); + } else if (sameClass(left, right, Organization.class)) { + return mergeOrganization((Organization) left, (Organization) right); + } else if (sameClass(left, right, Project.class)) { + return mergeProject((Project) left, (Project) right); + } else { + throw new RuntimeException( + String + .format( + "MERGE_FROM_AND_GET incompatible types: %s, %s", + left.getClass().getCanonicalName(), right.getClass().getCanonicalName())); + } + } + + /** + * This method is used in the global result grouping phase. It checks if one of the two is from a delegated authority + * https://graph.openaire.eu/docs/data-model/pids-and-identifiers#delegated-authorities and in that case it prefers + * such version. + *

+ * Otherwise, it considers a resulttype priority order implemented in {@link ResultTypeComparator} + * and proceeds with the canonical property merging. + * + * @param left + * @param right + * @return + */ + private static T mergeResultsOfDifferentTypes(T left, T right) { + + final boolean leftFromDelegatedAuthority = isFromDelegatedAuthority(left); + final boolean rightFromDelegatedAuthority = isFromDelegatedAuthority(right); + + if (leftFromDelegatedAuthority && !rightFromDelegatedAuthority) { + return left; + } + if (!leftFromDelegatedAuthority && rightFromDelegatedAuthority) { + return right; + } + //TODO: raise trust to have preferred fields from one or the other?? + if (new ResultTypeComparator().compare(left, right) < 0) { + return mergeResult(left, right); + } else { + return mergeResult(right, left); + } + } + + private static DataInfo chooseDataInfo(DataInfo left, DataInfo right, int trust) { + if (trust > 0) { + return left; + } else if (trust == 0) { + if (left == null || (left.getInvisible() != null && left.getInvisible().equals(Boolean.TRUE))) { + return right; + } else { + return left; + } + } else { + return right; + } + } + + private static String chooseString(String left, String right, int trust) { + if (trust > 0) { + return left; + } else if (trust == 0) { + return StringUtils.isNotBlank(left) ? left : right; + } else { + return right; + } + } + + private static T chooseReference(T left, T right, int trust) { + if (trust > 0) { + return left; + } else if (trust == 0) { + return left != null ? left : right; + } else { + return right; + } + } + + private static Long max(Long left, Long right) { + if (left == null) + return right; + if (right == null) + return left; + + return Math.max(left, right); + } + + // trust ?? + private static Boolean booleanOR(Boolean a, Boolean b) { + if (a == null) { + return b; + } else if (b == null) { + return a; + } + + return a || b; + } + + + private static List unionDistinctLists(final List left, final List right, int trust) { + if (left == null) { + return right; + } else if (right == null) { + return left; + } + + List h = trust >= 0 ? left : right; + List l = trust >= 0 ? right : left; + + return Stream.concat(h.stream(), l.stream()) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); + } + + private static List unionDistinctListOfString(final List l, final List r) { + if (l == null) { + return r; + } else if (r == null) { + return l; + } + + return Stream.concat(l.stream(), r.stream()) + .filter(StringUtils::isNotBlank) + .distinct() + .collect(Collectors.toList()); + } + + //TODO review + private static List mergeKeyValue(List left, List right, int trust) { + if (trust < 0) { + List s = left; + left = right; + right = s; + } + + HashMap values = new HashMap<>(); + left.forEach(kv -> values.put(kv.getKey(), kv)); + right.forEach(kv -> values.putIfAbsent(kv.getKey(), kv)); + + return new ArrayList<>(values.values()); + } + + private static List unionTitle(List left, List right, int trust) { + if (left == null) { + return right; + } else if (right == null) { + return left; + } + + List h = trust >= 0 ? left : right; + List l = trust >= 0 ? right : left; + + return Stream.concat(h.stream(), l.stream()) + .filter(Objects::isNull) + .distinct() + .collect(Collectors.toList()); + } + + /** + * Internal utility that merges the common OafEntity fields + * + * @param merged + * @param enrich + * @param + * @return + */ + private static T mergeOafFields(T merged, T enrich, int trust) { + + //TODO: union of all values, but what does it mean with KeyValue pairs??? + merged.setCollectedfrom(mergeKeyValue(merged.getCollectedfrom(), enrich.getCollectedfrom(), trust)); + merged.setDataInfo(chooseDataInfo(merged.getDataInfo(), enrich.getDataInfo(), trust)); + merged.setLastupdatetimestamp(max(merged.getLastupdatetimestamp(), enrich.getLastupdatetimestamp())); + + return merged; + } + + /** + * Internal utility that merges the common OafEntity fields + * + * @param original + * @param enrich + * @param + * @return + */ + private static T mergeOafEntityFields(T original, T enrich, int trust) { + final T merged = mergeOafFields(original, enrich, trust); + + merged.setOriginalId(unionDistinctListOfString(merged.getOriginalId(), enrich.getOriginalId())); + merged.setPid(unionDistinctLists(merged.getPid(), enrich.getPid(), trust)); + // dateofcollection mettere today quando si fa merge + merged.setDateofcollection(chooseString(merged.getDateofcollection(), enrich.getDateofcollection(), trust)); + // setDateoftransformation mettere vuota in dedup, nota per Claudio + merged.setDateoftransformation(chooseString(merged.getDateoftransformation(), enrich.getDateoftransformation(), trust)); + // TODO: was missing in OafEntity.merge + merged.setExtraInfo(unionDistinctLists(merged.getExtraInfo(), enrich.getExtraInfo(), trust)); + //oaiprovenanze da mettere a null quando si genera merge + merged.setOaiprovenance(chooseReference(merged.getOaiprovenance(), enrich.getOaiprovenance(), trust)); + merged.setMeasures(unionDistinctLists(merged.getMeasures(), enrich.getMeasures(), trust)); + + return merged; + } + + + public static T mergeRelation(T original, T enrich) { + int trust = compareTrust(original, enrich); + T merge = mergeOafFields(original, enrich, trust); + + checkArgument(Objects.equals(merge.getSource(), enrich.getSource()), "source ids must be equal"); + checkArgument(Objects.equals(merge.getTarget(), enrich.getTarget()), "target ids must be equal"); + checkArgument(Objects.equals(merge.getRelType(), enrich.getRelType()), "relType(s) must be equal"); + checkArgument( + Objects.equals(merge.getSubRelType(), enrich.getSubRelType()), "subRelType(s) must be equal"); + checkArgument(Objects.equals(merge.getRelClass(), enrich.getRelClass()), "relClass(es) must be equal"); + + //merge.setProvenance(mergeLists(merge.getProvenance(), enrich.getProvenance())); + + //TODO: trust ?? + merge.setValidated(booleanOR(merge.getValidated(), enrich.getValidated())); + try { + merge.setValidationDate(ModelSupport.oldest(merge.getValidationDate(), enrich.getValidationDate())); + } catch (ParseException e) { + throw new IllegalArgumentException(String + .format( + "invalid validation date format in relation [s:%s, t:%s]: %s", merge.getSource(), + merge.getTarget(), + merge.getValidationDate())); + } + + // TODO keyvalue merge + merge.setProperties(mergeKeyValue(merge.getProperties(), enrich.getProperties(), trust)); + + return merge; + } + + public static T mergeResult(T original, T enrich) { + final int trust = compareTrust(original, enrich); + T merge = mergeOafEntityFields(original, enrich, trust); + + if (merge.getProcessingchargeamount() == null || StringUtils.isBlank(merge.getProcessingchargeamount().getValue())) { + merge.setProcessingchargeamount(enrich.getProcessingchargeamount()); + merge.setProcessingchargecurrency(enrich.getProcessingchargecurrency()); + } + + // author = usare la stessa logica che in dedup + merge.setAuthor(chooseReference(merge.getAuthor(), enrich.getAuthor(), trust)); + // il primo che mi arriva secondo l'ordinamento per priorita' + merge.setResulttype(chooseReference(merge.getResulttype(), enrich.getResulttype(), trust)); + // gestito come il resulttype perche' e' un subtype + merge.setMetaResourceType(chooseReference(merge.getMetaResourceType(), enrich.getMetaResourceType(), trust)); + // spostiamo nell'instance e qui prendo il primo che arriva + merge.setLanguage(chooseReference(merge.getLanguage(), enrich.getLanguage(), trust)); + // country lasicamo,o cosi' -> parentesi sul datainfo + merge.setCountry(unionDistinctLists(merge.getCountry(), enrich.getCountry(), trust)); + //ok + merge.setSubject(unionDistinctLists(merge.getSubject(), enrich.getSubject(), trust)); + // union per priority quindi vanno in append + merge.setTitle(unionTitle(merge.getTitle(), enrich.getTitle(), trust)); + //ok + merge.setRelevantdate(unionDistinctLists(merge.getRelevantdate(), enrich.getRelevantdate(), trust)); + // prima trust e poi longest list + merge.setDescription(longestLists(merge.getDescription(), enrich.getDescription())); + // trust piu' alto e poi piu' vecchia + merge.setDateofacceptance(chooseReference(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust)); + // ok, ma publisher va messo ripetibile + merge.setPublisher(chooseReference(merge.getPublisher(), enrich.getPublisher(), trust)); + // ok + merge.setEmbargoenddate(chooseReference(merge.getEmbargoenddate(), enrich.getEmbargoenddate(), trust)); + // ok + merge.setSource(unionDistinctLists(merge.getSource(), enrich.getSource(), trust)); + // ok + merge.setFulltext(unionDistinctLists(merge.getFulltext(), enrich.getFulltext(), trust)); + // ok + merge.setFormat(unionDistinctLists(merge.getFormat(), enrich.getFormat(), trust)); + // ok + merge.setContributor(unionDistinctLists(merge.getContributor(), enrich.getContributor(), trust)); + + // prima prendo l'higher trust, su questo prendo il valore migliore nelle istanze TODO + // trust maggiore ma a parita' di trust il piu' specifico (base del vocabolario) + // vedi note + merge.setResourcetype(firstNonNull(merge.getResourcetype(), enrich.getResourcetype())); + + // ok + merge.setCoverage(unionDistinctLists(merge.getCoverage(), enrich.getCoverage(), trust)); + + // most open ok + if (enrich.getBestaccessright() != null + && new AccessRightComparator<>() + .compare(enrich.getBestaccessright(), merge.getBestaccessright()) < 0) { + merge.setBestaccessright(enrich.getBestaccessright()); + } + + // TODO merge of datainfo given same id + merge.setContext(unionDistinctLists(merge.getContext(), enrich.getContext(), trust)); + + //ok + merge.setExternalReference(unionDistinctLists(merge.getExternalReference(), enrich.getExternalReference(), trust)); + + //instance enrichment or union + // review instance equals => add pid to comparision + if (!isAnEnrichment(merge) && !isAnEnrichment(enrich)) + merge.setInstance(unionDistinctLists(merge.getInstance(), enrich.getInstance(), trust)); + else { + final List enrichmentInstances = isAnEnrichment(merge) ? merge.getInstance() + : enrich.getInstance(); + final List enrichedInstances = isAnEnrichment(merge) ? enrich.getInstance() + : merge.getInstance(); + if (isAnEnrichment(merge)) + merge.setDataInfo(enrich.getDataInfo()); + merge.setInstance(enrichInstances(enrichedInstances, enrichmentInstances)); + } + + merge.setEoscifguidelines(unionDistinctLists(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust)); + merge.setIsGreen(booleanOR(merge.getIsGreen(), enrich.getIsGreen())); + // OK but should be list of values + merge.setOpenAccessColor(chooseReference(merge.getOpenAccessColor(), enrich.getOpenAccessColor(), trust)); + merge.setIsInDiamondJournal(booleanOR(merge.getIsInDiamondJournal(), enrich.getIsInDiamondJournal())); + merge.setPubliclyFunded(booleanOR(merge.getPubliclyFunded(), enrich.getPubliclyFunded())); + + return merge; + } + + private static T mergeORP(T original, T enrich) { + int trust = compareTrust(original, enrich); + final T merge = mergeResult(original, enrich); + + merge.setContactperson(unionDistinctLists(merge.getContactperson(), enrich.getContactperson(), trust)); + merge.setContactgroup(unionDistinctLists(merge.getContactgroup(), enrich.getContactgroup(), trust)); + merge.setTool(unionDistinctLists(merge.getTool(), enrich.getTool(), trust)); + + return merge; + } + + private static T mergeSoftware(T original, T enrich) { + int trust = compareTrust(original, enrich); + final T merge = mergeResult(original, enrich); + + merge.setDocumentationUrl(unionDistinctLists(merge.getDocumentationUrl(), enrich.getDocumentationUrl(), trust)); + merge.setLicense(unionDistinctLists(merge.getLicense(), enrich.getLicense(), trust)); + merge.setCodeRepositoryUrl(chooseReference(merge.getCodeRepositoryUrl(), enrich.getCodeRepositoryUrl(), trust)); + merge.setProgrammingLanguage(chooseReference(merge.getProgrammingLanguage(), enrich.getProgrammingLanguage(), trust)); + + return merge; + } + + private static T mergeDataset(T original, T enrich) { + int trust = compareTrust(original, enrich); + T merge = mergeResult(original, enrich); + + merge.setStoragedate(chooseReference(merge.getStoragedate(), enrich.getStoragedate(), trust)); + merge.setDevice(chooseReference(merge.getDevice(), enrich.getDevice(), trust)); + merge.setSize(chooseReference(merge.getSize(), enrich.getSize(), trust)); + merge.setVersion(chooseReference(merge.getVersion(), enrich.getVersion(), trust)); + merge.setLastmetadataupdate(chooseReference(merge.getLastmetadataupdate(), enrich.getLastmetadataupdate(), trust)); + merge.setMetadataversionnumber(chooseReference(merge.getMetadataversionnumber(), enrich.getMetadataversionnumber(), trust)); + merge.setGeolocation(unionDistinctLists(merge.getGeolocation(), enrich.getGeolocation(), trust)); + + return merge; + } + + public static T mergePublication(T original, T enrich) { + final int trust = compareTrust(original, enrich); + T merged = mergeResult(original, enrich); + + merged.setJournal(chooseReference(merged.getJournal(), enrich.getJournal(), trust)); + + return merged; + } + + private static T mergeOrganization(T left, T enrich) { + int trust = compareTrust(left, enrich); + T merged = mergeOafEntityFields(left, enrich, trust); + + merged.setLegalshortname(chooseReference(merged.getLegalshortname(), enrich.getLegalshortname(), trust)); + merged.setLegalname(chooseReference(merged.getLegalname(), enrich.getLegalname(), trust)); + merged.setAlternativeNames(unionDistinctLists(enrich.getAlternativeNames(), merged.getAlternativeNames(), trust)); + merged.setWebsiteurl(chooseReference(merged.getWebsiteurl(), enrich.getWebsiteurl(), trust)); + merged.setLogourl(chooseReference(merged.getLogourl(), enrich.getLogourl(), trust)); + merged.setEclegalbody(chooseReference(merged.getEclegalbody(), enrich.getEclegalbody(), trust)); + merged.setEclegalperson(chooseReference(merged.getEclegalperson(), enrich.getEclegalperson(), trust)); + merged.setEcnonprofit(chooseReference(merged.getEcnonprofit(), enrich.getEcnonprofit(), trust)); + merged.setEcresearchorganization(chooseReference(merged.getEcresearchorganization(), enrich.getEcresearchorganization(), trust)); + merged.setEchighereducation(chooseReference(merged.getEchighereducation(), enrich.getEchighereducation(), trust)); + merged.setEcinternationalorganizationeurinterests(chooseReference(merged.getEcinternationalorganizationeurinterests(), enrich.getEcinternationalorganizationeurinterests(), trust)); + merged.setEcinternationalorganization(chooseReference(merged.getEcinternationalorganization(), enrich.getEcinternationalorganization(), trust)); + merged.setEcenterprise(chooseReference(merged.getEcenterprise(), enrich.getEcenterprise(), trust)); + merged.setEcsmevalidated(chooseReference(merged.getEcsmevalidated(), enrich.getEcsmevalidated(), trust)); + merged.setEcnutscode(chooseReference(merged.getEcnutscode(), enrich.getEcnutscode(), trust)); + merged.setCountry(chooseReference(merged.getCountry(), enrich.getCountry(), trust)); + + return merged; + } + + public static T mergeProject(T original, T enrich) { + int trust = compareTrust(original, enrich); + T merged = mergeOafEntityFields(original, enrich, trust); + + merged.setWebsiteurl(chooseReference(merged.getWebsiteurl(), enrich.getWebsiteurl(), trust)); + merged.setCode(chooseReference(merged.getCode(), enrich.getCode(), trust)); + merged.setAcronym(chooseReference(merged.getAcronym(), enrich.getAcronym(), trust)); + merged.setTitle(chooseReference(merged.getTitle(), enrich.getTitle(), trust)); + merged.setStartdate(chooseReference(merged.getStartdate(), enrich.getStartdate(), trust)); + merged.setEnddate(chooseReference(merged.getEnddate(), enrich.getEnddate(), trust)); + merged.setCallidentifier(chooseReference(merged.getCallidentifier(), enrich.getCallidentifier(), trust)); + merged.setKeywords(chooseReference(merged.getKeywords(), enrich.getKeywords(), trust)); + merged.setDuration(chooseReference(merged.getDuration(), enrich.getDuration(), trust)); + merged.setEcsc39(chooseReference(merged.getEcsc39(), enrich.getEcsc39(), trust)); + merged.setOamandatepublications(chooseReference(merged.getOamandatepublications(), enrich.getOamandatepublications(), trust)); + merged.setEcarticle29_3(chooseReference(merged.getEcarticle29_3(), enrich.getEcarticle29_3(), trust)); + merged.setSubjects(unionDistinctLists(merged.getSubjects(), enrich.getSubjects(), trust)); + merged.setFundingtree(unionDistinctLists(merged.getFundingtree(), enrich.getFundingtree(), trust)); + merged.setContracttype(chooseReference(merged.getContracttype(), enrich.getContracttype(), trust)); + merged.setOptional1(chooseReference(merged.getOptional1(), enrich.getOptional1(), trust)); + merged.setOptional2(chooseReference(merged.getOptional2(), enrich.getOptional2(), trust)); + merged.setJsonextrainfo(chooseReference(merged.getJsonextrainfo(), enrich.getJsonextrainfo(), trust)); + merged.setContactfullname(chooseReference(merged.getContactfullname(), enrich.getContactfullname(), trust)); + merged.setContactfax(chooseReference(merged.getContactfax(), enrich.getContactfax(), trust)); + merged.setContactphone(chooseReference(merged.getContactphone(), enrich.getContactphone(), trust)); + merged.setContactemail(chooseReference(merged.getContactemail(), enrich.getContactemail(), trust)); + merged.setSummary(chooseReference(merged.getSummary(), enrich.getSummary(), trust)); + merged.setCurrency(chooseReference(merged.getCurrency(), enrich.getCurrency(), trust)); + + //missin in Project.merge + merged.setTotalcost(chooseReference(merged.getTotalcost(), enrich.getTotalcost(), trust)); + merged.setFundedamount(chooseReference(merged.getFundedamount(), enrich.getFundedamount(), trust)); + + // trust ?? + if (enrich.getH2020topiccode() != null && StringUtils.isEmpty(merged.getH2020topiccode())) { + merged.setH2020topiccode(enrich.getH2020topiccode()); + merged.setH2020topicdescription(enrich.getH2020topicdescription()); + } + + merged.setH2020classification(unionDistinctLists(merged.getH2020classification(), enrich.getH2020classification(), trust)); + + return merged; + } + + + /** + * Longest lists list. + * + * @param a the a + * @param b the b + * @return the list + */ + public static List> longestLists(List> a, List> b) { + if (a == null || b == null) + return a == null ? b : a; + + return a.size() >= b.size() ? a : b; + } + + /** + * This main method apply the enrichment of the instances + * + * @param toEnrichInstances the instances that could be enriched + * @param enrichmentInstances the enrichment instances + * @return list of instances possibly enriched + */ + private static List enrichInstances(final List toEnrichInstances, + final List enrichmentInstances) { + final List enrichmentResult = new ArrayList<>(); + + if (toEnrichInstances == null) { + return enrichmentResult; + } + if (enrichmentInstances == null) { + return enrichmentResult; + } + Map ri = toInstanceMap(enrichmentInstances); + + toEnrichInstances.forEach(i -> { + final List e = findEnrichmentsByPID(i.getPid(), ri); + if (e != null && e.size() > 0) { + e.forEach(enr -> applyEnrichment(i, enr)); + } else { + final List a = findEnrichmentsByPID(i.getAlternateIdentifier(), ri); + if (a != null && a.size() > 0) { + a.forEach(enr -> applyEnrichment(i, enr)); + } + } + enrichmentResult.add(i); + }); + return enrichmentResult; + } + + /** + * This method converts the list of instance enrichments + * into a Map where the key is the normalized identifier + * and the value is the instance itself + * + * @param ri the list of enrichment instances + * @return the result map + */ + private static Map toInstanceMap(final List ri) { + return ri + .stream() + .filter(i -> i.getPid() != null || i.getAlternateIdentifier() != null) + .flatMap(i -> { + final List> result = new ArrayList<>(); + if (i.getPid() != null) + i + .getPid() + .stream() + .filter(MergeUtils::validPid) + .forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i))); + if (i.getAlternateIdentifier() != null) + i + .getAlternateIdentifier() + .stream() + .filter(MergeUtils::validPid) + .forEach(p -> result.add(new ImmutablePair<>(extractKeyFromPid(p), i))); + return result.stream(); + }) + .collect( + Collectors + .toMap( + Pair::getLeft, + Pair::getRight, + (a, b) -> a)); + } + + private static boolean isFromDelegatedAuthority(Result r) { + return Optional + .ofNullable(r.getInstance()) + .map( + instance -> instance + .stream() + .filter(i -> Objects.nonNull(i.getCollectedfrom())) + .map(i -> i.getCollectedfrom().getKey()) + .anyMatch(cfId -> IdentifierFactory.delegatedAuthorityDatasourceIds().contains(cfId))) + .orElse(false); + } + + /** + * Valid pid boolean. + * + * @param p the p + * @return the boolean + */ + private static boolean validPid(final StructuredProperty p) { + return p.getValue() != null && p.getQualifier() != null && p.getQualifier().getClassid() != null; + } + + /** + * Normalize pid string. + * + * @param pid the pid + * @return the string + */ + private static String extractKeyFromPid(final StructuredProperty pid) { + if (pid == null) + return null; + final StructuredProperty normalizedPid = CleaningFunctions.normalizePidValue(pid); + + return String.format("%s::%s", normalizedPid.getQualifier().getClassid(), normalizedPid.getValue()); + } + + /** + * This utility method finds the list of enrichment instances + * that match one or more PIDs in the input list + * + * @param pids the list of PIDs + * @param enrichments the List of enrichment instances having the same pid + * @return the list + */ + private static List findEnrichmentsByPID(final List pids, + final Map enrichments) { + if (pids == null || enrichments == null) + return null; + return pids + .stream() + .map(MergeUtils::extractKeyFromPid) + .map(enrichments::get) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + /** + * Is an enrichment boolean. + * + * @param e the e + * @return the boolean + */ + private static boolean isAnEnrichment(OafEntity e) { + return e.getDataInfo() != null && + e.getDataInfo().getProvenanceaction() != null + && ModelConstants.PROVENANCE_ENRICH.equalsIgnoreCase(e.getDataInfo().getProvenanceaction().getClassid()); + } + + /** + * This method apply enrichment on a single instance + * The enrichment consists of replacing values on + * single attribute only if in the current instance is missing + * The only repeatable field enriched is measures + * + * @param merge the current instance + * @param enrichment the enrichment instance + */ + private static void applyEnrichment(final Instance merge, final Instance enrichment) { + if (merge == null || enrichment == null) + return; + + merge.setLicense(firstNonNull(merge.getLicense(), enrichment.getLicense())); + merge.setAccessright(firstNonNull(merge.getAccessright(), enrichment.getAccessright())); + merge.setInstancetype(firstNonNull(merge.getInstancetype(), enrichment.getInstancetype())); + merge.setInstanceTypeMapping(firstNonNull(merge.getInstanceTypeMapping(), enrichment.getInstanceTypeMapping())); + merge.setHostedby(firstNonNull(merge.getHostedby(), enrichment.getHostedby())); + merge.setUrl(unionDistinctLists(merge.getUrl(), enrichment.getUrl(), 0)); + merge.setDistributionlocation(firstNonNull(merge.getDistributionlocation(), enrichment.getDistributionlocation())); + merge.setCollectedfrom(firstNonNull(merge.getCollectedfrom(), enrichment.getCollectedfrom())); + // pid and alternateId are used for matching + merge.setDateofacceptance(firstNonNull(merge.getDateofacceptance(), enrichment.getDateofacceptance())); + merge.setProcessingchargeamount(firstNonNull(merge.getProcessingchargeamount(), enrichment.getProcessingchargeamount())); + merge.setProcessingchargecurrency(firstNonNull(merge.getProcessingchargecurrency(), enrichment.getProcessingchargecurrency())); + merge.setRefereed(firstNonNull(merge.getRefereed(), enrichment.getRefereed())); + merge.setMeasures(unionDistinctLists(merge.getMeasures(), enrichment.getMeasures(), 0)); + merge.setFulltext(firstNonNull(merge.getFulltext(), enrichment.getFulltext())); + } + + private static int compareTrust(Oaf a, Oaf b) { + String left = Optional + .ofNullable(a.getDataInfo()) + .map(DataInfo::getTrust) + .orElse("0.0"); + + String right = Optional + .ofNullable(b.getDataInfo()) + .map(DataInfo::getTrust) + .orElse("0.0"); + + return left.compareTo(right); + } + +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java index 4cecd0895..417516b0f 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java @@ -14,7 +14,6 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import eu.dnetlib.dhp.schema.common.AccessRightComparator; -import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; public class OafMapperUtils { @@ -22,65 +21,6 @@ public class OafMapperUtils { private OafMapperUtils() { } - public static Oaf merge(final Oaf left, final Oaf right) { - if (ModelSupport.isSubClass(left, OafEntity.class)) { - return mergeEntities((OafEntity) left, (OafEntity) right); - } else if (ModelSupport.isSubClass(left, Relation.class)) { - ((Relation) left).mergeFrom((Relation) right); - } else { - throw new IllegalArgumentException("invalid Oaf type:" + left.getClass().getCanonicalName()); - } - return left; - } - - public static OafEntity mergeEntities(OafEntity left, OafEntity right) { - if (ModelSupport.isSubClass(left, Result.class)) { - return mergeResults((Result) left, (Result) right); - } else if (ModelSupport.isSubClass(left, Datasource.class)) { - left.mergeFrom(right); - } else if (ModelSupport.isSubClass(left, Organization.class)) { - left.mergeFrom(right); - } else if (ModelSupport.isSubClass(left, Project.class)) { - left.mergeFrom(right); - } else { - throw new IllegalArgumentException("invalid OafEntity subtype:" + left.getClass().getCanonicalName()); - } - return left; - } - - public static Result mergeResults(Result left, Result right) { - - final boolean leftFromDelegatedAuthority = isFromDelegatedAuthority(left); - final boolean rightFromDelegatedAuthority = isFromDelegatedAuthority(right); - - if (leftFromDelegatedAuthority && !rightFromDelegatedAuthority) { - return left; - } - if (!leftFromDelegatedAuthority && rightFromDelegatedAuthority) { - return right; - } - - if (new ResultTypeComparator().compare(left, right) < 0) { - left.mergeFrom(right); - return left; - } else { - right.mergeFrom(left); - return right; - } - } - - private static boolean isFromDelegatedAuthority(Result r) { - return Optional - .ofNullable(r.getInstance()) - .map( - instance -> instance - .stream() - .filter(i -> Objects.nonNull(i.getCollectedfrom())) - .map(i -> i.getCollectedfrom().getKey()) - .anyMatch(cfId -> IdentifierFactory.delegatedAuthorityDatasourceIds().contains(cfId))) - .orElse(false); - } - public static KeyValue keyValue(final String k, final String v) { final KeyValue kv = new KeyValue(); kv.setKey(k); diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtilsTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtilsTest.java new file mode 100644 index 000000000..ef35b50fb --- /dev/null +++ b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtilsTest.java @@ -0,0 +1,111 @@ + +package eu.dnetlib.dhp.schema.oaf.utils; + +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.stream.Collectors; + +import eu.dnetlib.dhp.schema.common.ModelSupport; +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Result; + +public class MergeUtilsTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + @Test + void testMergePubs() throws IOException { + Publication p1 = read("publication_1.json", Publication.class); + Publication p2 = read("publication_2.json", Publication.class); + Dataset d1 = read("dataset_1.json", Dataset.class); + Dataset d2 = read("dataset_2.json", Dataset.class); + + assertEquals(1, p1.getCollectedfrom().size()); + assertEquals(ModelConstants.CROSSREF_ID, p1.getCollectedfrom().get(0).getKey()); + assertEquals(1, d2.getCollectedfrom().size()); + assertFalse(cfId(d2.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID)); + + assertEquals(1, p2.getCollectedfrom().size()); + assertFalse(cfId(p2.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID)); + assertEquals(1, d1.getCollectedfrom().size()); + assertTrue(cfId(d1.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID)); + + final Result p1d2 = MergeUtils.checkedMerge(p1, d2); + assertEquals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID, p1d2.getResulttype().getClassid()); + assertTrue(p1d2 instanceof Publication); + assertEquals(p1.getId(), p1d2.getId()); + } + + @Test + void testMergePubs_1() throws IOException { + Publication p2 = read("publication_2.json", Publication.class); + Dataset d1 = read("dataset_1.json", Dataset.class); + + final Result p2d1 = MergeUtils.checkedMerge(p2, d1); + assertEquals((ModelConstants.DATASET_RESULTTYPE_CLASSID), p2d1.getResulttype().getClassid()); + assertTrue(p2d1 instanceof Dataset); + assertEquals(d1.getId(), p2d1.getId()); + assertEquals(2, p2d1.getCollectedfrom().size()); + } + + @Test + void testMergePubs_2() throws IOException { + Publication p1 = read("publication_1.json", Publication.class); + Publication p2 = read("publication_2.json", Publication.class); + + Result p1p2 = MergeUtils.checkedMerge(p1, p2); + assertTrue(p1p2 instanceof Publication); + assertEquals(p1.getId(), p1p2.getId()); + assertEquals(2, p1p2.getCollectedfrom().size()); + } + + @Test + void testDelegatedAuthority_1() throws IOException { + Dataset d1 = read("dataset_2.json", Dataset.class); + Dataset d2 = read("dataset_delegated.json", Dataset.class); + + assertEquals(1, d2.getCollectedfrom().size()); + assertTrue(cfId(d2.getCollectedfrom()).contains(ModelConstants.ZENODO_OD_ID)); + + Result res = (Result) MergeUtils.merge(d1, d2, true); + + assertEquals(d2, res); + } + + @Test + void testDelegatedAuthority_2() throws IOException { + Dataset p1 = read("publication_1.json", Dataset.class); + Dataset d2 = read("dataset_delegated.json", Dataset.class); + + assertEquals(1, d2.getCollectedfrom().size()); + assertTrue(cfId(d2.getCollectedfrom()).contains(ModelConstants.ZENODO_OD_ID)); + + Result res = (Result) MergeUtils.merge(p1, d2, true); + + assertEquals(d2, res); + } + + protected HashSet cfId(List collectedfrom) { + return collectedfrom.stream().map(KeyValue::getKey).collect(Collectors.toCollection(HashSet::new)); + } + + protected T read(String filename, Class clazz) throws IOException { + final String json = IOUtils.toString(getClass().getResourceAsStream(filename)); + return OBJECT_MAPPER.readValue(json, clazz); + } + +} diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java index 9111ac2df..2bbc3bc3e 100644 --- a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java +++ b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java @@ -149,7 +149,7 @@ class OafMapperUtilsTest { void testDate() { final String date = GraphCleaningFunctions.cleanDate("23-FEB-1998"); assertNotNull(date); - System.out.println(date); + assertEquals("1998-02-23", date); } @Test @@ -166,8 +166,8 @@ class OafMapperUtilsTest { assertEquals( ModelConstants.PUBLICATION_RESULTTYPE_CLASSID, - OafMapperUtils - .mergeResults(p1, d2) + MergeUtils + .mergeResult(p1, d2) .getResulttype() .getClassid()); @@ -178,8 +178,8 @@ class OafMapperUtilsTest { assertEquals( ModelConstants.DATASET_RESULTTYPE_CLASSID, - OafMapperUtils - .mergeResults(p2, d1) + MergeUtils + .mergeResult(p2, d1) .getResulttype() .getClassid()); } @@ -192,7 +192,7 @@ class OafMapperUtilsTest { assertEquals(1, d2.getCollectedfrom().size()); assertTrue(cfId(d2.getCollectedfrom()).contains(ModelConstants.ZENODO_OD_ID)); - Result res = OafMapperUtils.mergeResults(d1, d2); + Result res = MergeUtils.mergeResult(d1, d2); assertEquals(d2, res); diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGet.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGet.java index eccfa445c..6de631173 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGet.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGet.java @@ -1,14 +1,13 @@ package eu.dnetlib.dhp.actionmanager.promote; -import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass; +import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils; import java.util.function.BiFunction; -import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.OafEntity; -import eu.dnetlib.dhp.schema.oaf.Relation; +import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass; /** OAF model merging support. */ public class MergeAndGet { @@ -46,20 +45,7 @@ public class MergeAndGet { } private static G mergeFromAndGet(G x, A y) { - if (isSubClass(x, Relation.class) && isSubClass(y, Relation.class)) { - ((Relation) x).mergeFrom((Relation) y); - return x; - } else if (isSubClass(x, OafEntity.class) - && isSubClass(y, OafEntity.class) - && isSubClass(x, y)) { - ((OafEntity) x).mergeFrom((OafEntity) y); - return x; - } - throw new RuntimeException( - String - .format( - "MERGE_FROM_AND_GET incompatible types: %s, %s", - x.getClass().getCanonicalName(), y.getClass().getCanonicalName())); + return (G) MergeUtils.merge(x, y); } @SuppressWarnings("unchecked") diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java index 680e12504..a41540564 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java @@ -34,6 +34,11 @@ public class BipProjectModel { String totalCitationCount; + public String getProjectId() { + return projectId; + } + + // each project bip measure has exactly one value, hence one key-value pair private Measure createMeasure(String measureId, String measureValue) { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java index a32d04e1f..21f3044a5 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java @@ -1,12 +1,20 @@ package eu.dnetlib.dhp.actionmanager.project; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.util.Arrays; -import java.util.Objects; -import java.util.Optional; - +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProgramme; +import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProject; +import eu.dnetlib.dhp.actionmanager.project.utils.model.JsonTopic; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.H2020Classification; +import eu.dnetlib.dhp.schema.oaf.H2020Programme; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils; +import eu.dnetlib.dhp.utils.DHPUtils; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -18,24 +26,14 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProgramme; -import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProject; -import eu.dnetlib.dhp.actionmanager.project.utils.model.EXCELTopic; -import eu.dnetlib.dhp.actionmanager.project.utils.model.JsonTopic; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.action.AtomicAction; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.H2020Classification; -import eu.dnetlib.dhp.schema.oaf.H2020Programme; -import eu.dnetlib.dhp.schema.oaf.OafEntity; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.utils.DHPUtils; import scala.Tuple2; +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + /** * Class that makes the ActionSet. To prepare the AS two joins are needed * @@ -160,9 +158,11 @@ public class SparkAtomicActionJob { (MapFunction) OafEntity::getId, Encoders.STRING()) .mapGroups((MapGroupsFunction) (s, it) -> { - Project first = it.next(); - it.forEachRemaining(first::mergeFrom); - return first; + Project merge = it.next(); + while (it.hasNext()) { + merge = MergeUtils.mergeProject(merge, it.next()); + } + return merge; }, Encoders.bean(Project.class)) .toJavaRDD() .map(p -> new AtomicAction(Project.class, p)) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index d5b106c81..35fa61b94 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.dedup; import java.util.*; import java.util.stream.Stream; +import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.api.java.function.FlatMapFunction; @@ -14,6 +15,7 @@ import org.apache.spark.sql.*; import eu.dnetlib.dhp.oa.dedup.model.Identifier; import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils; import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.OafEntity; @@ -23,190 +25,190 @@ import scala.Tuple3; import scala.collection.JavaConversions; public class DedupRecordFactory { - public static final class DedupRecordReduceState { - public final String dedupId; + public static final class DedupRecordReduceState { + public final String dedupId; - public final ArrayList aliases = new ArrayList<>(); + public final ArrayList aliases = new ArrayList<>(); - public final HashSet acceptanceDate = new HashSet<>(); + public final HashSet acceptanceDate = new HashSet<>(); - public OafEntity entity; + public OafEntity entity; - public DedupRecordReduceState(String dedupId, String id, OafEntity entity) { - this.dedupId = dedupId; - this.entity = entity; - if (entity == null) { - aliases.add(id); - } else { - if (Result.class.isAssignableFrom(entity.getClass())) { - Result result = (Result) entity; - if (result.getDateofacceptance() != null - && StringUtils.isNotBlank(result.getDateofacceptance().getValue())) { - acceptanceDate.add(result.getDateofacceptance().getValue()); - } - } - } - } + public DedupRecordReduceState(String dedupId, String id, OafEntity entity) { + this.dedupId = dedupId; + this.entity = entity; + if (entity == null) { + aliases.add(id); + } else { + if (Result.class.isAssignableFrom(entity.getClass())) { + Result result = (Result) entity; + if (result.getDateofacceptance() != null + && StringUtils.isNotBlank(result.getDateofacceptance().getValue())) { + acceptanceDate.add(result.getDateofacceptance().getValue()); + } + } + } + } - public String getDedupId() { - return dedupId; - } - } + public String getDedupId() { + return dedupId; + } + } - private static final int MAX_ACCEPTANCE_DATE = 20; + private static final int MAX_ACCEPTANCE_DATE = 20; - private DedupRecordFactory() { - } + private DedupRecordFactory() { + } - public static Dataset createDedupRecord( - final SparkSession spark, - final DataInfo dataInfo, - final String mergeRelsInputPath, - final String entitiesInputPath, - final Class clazz) { + public static Dataset createDedupRecord( + final SparkSession spark, + final DataInfo dataInfo, + final String mergeRelsInputPath, + final String entitiesInputPath, + final Class clazz) { - final long ts = System.currentTimeMillis(); - final Encoder beanEncoder = Encoders.bean(clazz); - final Encoder kryoEncoder = Encoders.kryo(clazz); + final long ts = System.currentTimeMillis(); + final Encoder beanEncoder = Encoders.bean(clazz); + final Encoder kryoEncoder = Encoders.kryo(clazz); - // - Dataset entities = spark - .read() - .schema(Encoders.bean(clazz).schema()) - .json(entitiesInputPath) - .as(beanEncoder) - .map( - (MapFunction>) entity -> { - return new Tuple2<>(entity.getId(), entity); - }, - Encoders.tuple(Encoders.STRING(), kryoEncoder)) - .selectExpr("_1 AS id", "_2 AS kryoObject"); + // + Dataset entities = spark + .read() + .schema(Encoders.bean(clazz).schema()) + .json(entitiesInputPath) + .as(beanEncoder) + .map( + (MapFunction>) entity -> { + return new Tuple2<>(entity.getId(), entity); + }, + Encoders.tuple(Encoders.STRING(), kryoEncoder)) + .selectExpr("_1 AS id", "_2 AS kryoObject"); - // : source is the dedup_id, target is the id of the mergedIn - Dataset mergeRels = spark - .read() - .load(mergeRelsInputPath) - .where("relClass == 'merges'") - .selectExpr("source as dedupId", "target as id"); + // : source is the dedup_id, target is the id of the mergedIn + Dataset mergeRels = spark + .read() + .load(mergeRelsInputPath) + .where("relClass == 'merges'") + .selectExpr("source as dedupId", "target as id"); - return mergeRels - .join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") - .select("dedupId", "id", "kryoObject") - .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder)) - .map( - (MapFunction, DedupRecordReduceState>) t -> new DedupRecordReduceState( - t._1(), t._2(), t._3()), - Encoders.kryo(DedupRecordReduceState.class)) - .groupByKey( - (MapFunction) DedupRecordReduceState::getDedupId, Encoders.STRING()) - .reduceGroups( - (ReduceFunction) (t1, t2) -> { - if (t1.entity == null) { - t2.aliases.addAll(t1.aliases); - return t2; - } - if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) { - t1.acceptanceDate.addAll(t2.acceptanceDate); - } - t1.aliases.addAll(t2.aliases); - t1.entity = reduceEntity(t1.entity, t2.entity); + return mergeRels + .join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") + .select("dedupId", "id", "kryoObject") + .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder)) + .map( + (MapFunction, DedupRecordReduceState>) t -> new DedupRecordReduceState( + t._1(), t._2(), t._3()), + Encoders.kryo(DedupRecordReduceState.class)) + .groupByKey( + (MapFunction) DedupRecordReduceState::getDedupId, Encoders.STRING()) + .reduceGroups( + (ReduceFunction) (t1, t2) -> { + if (t1.entity == null) { + t2.aliases.addAll(t1.aliases); + return t2; + } + if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) { + t1.acceptanceDate.addAll(t2.acceptanceDate); + } + t1.aliases.addAll(t2.aliases); + t1.entity = reduceEntity(t1.entity, t2.entity); - return t1; - }) - .flatMap((FlatMapFunction, OafEntity>) t -> { - String dedupId = t._1(); - DedupRecordReduceState agg = t._2(); + return t1; + }) + .flatMap((FlatMapFunction, OafEntity>) t -> { + String dedupId = t._1(); + DedupRecordReduceState agg = t._2(); - if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) { - return Collections.emptyIterator(); - } + if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) { + return Collections.emptyIterator(); + } - return Stream - .concat( - Stream - .of(agg.getDedupId()) - .map(id -> createDedupOafEntity(id, agg.entity, dataInfo, ts)), - agg.aliases - .stream() - .map(id -> createMergedDedupAliasOafEntity(id, agg.entity, dataInfo, ts))) - .iterator(); - }, beanEncoder); - } + return Stream + .concat( + Stream + .of(agg.getDedupId()) + .map(id -> createDedupOafEntity(id, agg.entity, dataInfo, ts)), + agg.aliases + .stream() + .map(id -> createMergedDedupAliasOafEntity(id, agg.entity, dataInfo, ts))) + .iterator(); + }, beanEncoder); + } - private static OafEntity createDedupOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) { - try { - OafEntity res = (OafEntity) BeanUtils.cloneBean(base); - res.setId(id); - res.setDataInfo(dataInfo); - res.setLastupdatetimestamp(ts); - return res; - } catch (Exception e) { - throw new RuntimeException(e); - } - } + private static OafEntity createDedupOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) { + try { + OafEntity res = (OafEntity) BeanUtils.cloneBean(base); + res.setId(id); + res.setDataInfo(dataInfo); + res.setLastupdatetimestamp(ts); + return res; + } catch (Exception e) { + throw new RuntimeException(e); + } + } - private static OafEntity createMergedDedupAliasOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) { - try { - OafEntity res = createDedupOafEntity(id, base, dataInfo, ts); - DataInfo ds = (DataInfo) BeanUtils.cloneBean(dataInfo); - ds.setDeletedbyinference(true); - res.setDataInfo(ds); - return res; - } catch (Exception e) { - throw new RuntimeException(e); - } - } + private static OafEntity createMergedDedupAliasOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) { + try { + OafEntity res = createDedupOafEntity(id, base, dataInfo, ts); + DataInfo ds = (DataInfo) BeanUtils.cloneBean(dataInfo); + ds.setDeletedbyinference(true); + res.setDataInfo(ds); + return res; + } catch (Exception e) { + throw new RuntimeException(e); + } + } - private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) { + private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) { - if (duplicate == null) { - return entity; - } + if (duplicate == null) { + return entity; + } - int compare = new IdentifierComparator<>() - .compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate)); + int compare = new IdentifierComparator<>() + .compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate)); - if (compare > 0) { - OafEntity swap = duplicate; - duplicate = entity; - entity = swap; - } + if (compare > 0) { + OafEntity swap = duplicate; + duplicate = entity; + entity = swap; + } - entity.mergeFrom(duplicate); + entity = MergeUtils.checkedMerge(entity, duplicate); - if (ModelSupport.isSubClass(duplicate, Result.class)) { - Result re = (Result) entity; - Result rd = (Result) duplicate; + if (ModelSupport.isSubClass(duplicate, Result.class)) { + Result re = (Result) entity; + Result rd = (Result) duplicate; - List> authors = new ArrayList<>(); - if (re.getAuthor() != null) { - authors.add(re.getAuthor()); - } - if (rd.getAuthor() != null) { - authors.add(rd.getAuthor()); - } + List> authors = new ArrayList<>(); + if (re.getAuthor() != null) { + authors.add(re.getAuthor()); + } + if (rd.getAuthor() != null) { + authors.add(rd.getAuthor()); + } - re.setAuthor(AuthorMerger.merge(authors)); - } + re.setAuthor(AuthorMerger.merge(authors)); + } - return entity; - } + return entity; + } - public static T entityMerger( - String id, Iterator> entities, long ts, DataInfo dataInfo, Class clazz) { - T base = entities.next()._2(); + public static T entityMerger( + String id, Iterator> entities, long ts, DataInfo dataInfo, Class clazz) { + T base = entities.next()._2(); - while (entities.hasNext()) { - T duplicate = entities.next()._2(); - if (duplicate != null) - base = (T) reduceEntity(base, duplicate); - } + while (entities.hasNext()) { + T duplicate = entities.next()._2(); + if (duplicate != null) + base = (T) reduceEntity(base, duplicate); + } - base.setId(id); - base.setDataInfo(dataInfo); - base.setLastupdatetimestamp(ts); + base.setId(id); + base.setDataInfo(dataInfo); + base.setLastupdatetimestamp(ts); - return base; - } + return base; + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index cb1c70059..77e696501 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.dedup; import static org.apache.spark.sql.functions.col; +import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -127,10 +128,8 @@ public class SparkPropagateRelation extends AbstractSparkAction { (MapFunction) r -> String .join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()), Encoders.STRING()) - .reduceGroups((ReduceFunction) (b, a) -> { - b.mergeFrom(a); - return b; - }) + .reduceGroups((ReduceFunction) MergeUtils::mergeRelation + ) .map((MapFunction, Relation>) Tuple2::_2, REL_BEAN_ENC); final String outputRelationPath = graphOutputPath + "/relation"; diff --git a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala index 9ffaeeeef..42332b02e 100644 --- a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala @@ -3,17 +3,18 @@ package eu.dnetlib.doiboost import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.oa.merge.AuthorMerger import eu.dnetlib.dhp.schema.common.ModelConstants +import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils import eu.dnetlib.dhp.schema.oaf.{Organization, Publication, Relation, Dataset => OafDataset} import eu.dnetlib.doiboost.mag.ConversionUtil import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf -import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.functions.col import org.apache.spark.sql._ import org.json4s.DefaultFormats import org.json4s.JsonAST.{JField, JObject, JString} import org.json4s.jackson.JsonMethods.parse import org.slf4j.{Logger, LoggerFactory} + import scala.collection.JavaConverters._ object SparkGenerateDoiBoost { @@ -78,8 +79,10 @@ object SparkGenerateDoiBoost { if (item._2 != null) { val otherPub = item._2._2 if (otherPub != null) { - crossrefPub.mergeFrom(otherPub) - crossrefPub.setAuthor(AuthorMerger.mergeAuthor(crossrefPub.getAuthor, otherPub.getAuthor)) + val mergedAuthor = AuthorMerger.mergeAuthor(crossrefPub.getAuthor, otherPub.getAuthor) + val res = MergeUtils.mergePublication(crossrefPub, otherPub) + res.setAuthor(mergedAuthor); + return res } } crossrefPub @@ -130,14 +133,13 @@ object SparkGenerateDoiBoost { // So we have to merge val b1 = left._2 val b2 = right._2 - b1.mergeFrom(b2) - b1.mergeOAFDataInfo(b2) val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor) - b1.setAuthor(authors) + val merged = MergeUtils.mergePublication(b1, b2); + merged.setAuthor(authors) if (b2.getId != null && b2.getId.nonEmpty) - b1.setId(b2.getId) + merged.setId(b2.getId) //Return publication Merged - (b1.getId, b1) + (merged.getId, merged) } } else { // Left is Null so we return right diff --git a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/MagDataModel.scala index 9a0b0d845..185381f8f 100644 --- a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/MagDataModel.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/MagDataModel.scala @@ -1,8 +1,8 @@ package eu.dnetlib.doiboost.mag import eu.dnetlib.dhp.schema.common.ModelConstants -import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory -import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication, StructuredProperty, Subject} +import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, MergeUtils} +import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication, Subject} import eu.dnetlib.doiboost.DoiBoostMappingUtil import eu.dnetlib.doiboost.DoiBoostMappingUtil._ import org.json4s @@ -142,8 +142,7 @@ case object ConversionUtil { def mergePublication(a: Publication, b: Publication): Publication = { if ((a != null) && (b != null)) { - a.mergeFrom(b) - a + MergeUtils.mergePublication(a, b) } else { if (a == null) b else a } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java index 4f755266a..f84a10d1c 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java @@ -149,7 +149,7 @@ public class SparkResultToCommunityFromOrganizationJob { } } // res.setContext(propagatedContexts); - // ret.mergeFrom(res); + // return MergeUtils.mergeResult(ret, res); } return ret; }; diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java index f9c36d7ca..f791825a7 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java @@ -13,6 +13,7 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -24,8 +25,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList; -import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.Context; @@ -162,7 +161,7 @@ public class SparkResultToCommunityFromProject implements Serializable { } } res.setContext(propagatedContexts); - ret.mergeFrom(res); + return MergeUtils.checkedMerge(ret, res); } return ret; }; diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java index 3cf2f73c3..d2bfd75c1 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java @@ -25,118 +25,118 @@ import scala.Tuple2; public class SparkResultToCommunityThroughSemRelJob { - private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityThroughSemRelJob.class); + private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityThroughSemRelJob.class); - public static void main(String[] args) throws Exception { + public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkResultToCommunityThroughSemRelJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/input_communitytoresult_parameters.json")); + String jsonConfiguration = IOUtils + .toString( + SparkResultToCommunityThroughSemRelJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/input_communitytoresult_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + parser.parseArgument(args); - Boolean isSparkSessionManaged = isSparkSessionManaged(parser); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); + String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - final String preparedInfoPath = parser.get("preparedInfoPath"); - log.info("preparedInfoPath: {}", preparedInfoPath); + final String preparedInfoPath = parser.get("preparedInfoPath"); + log.info("preparedInfoPath: {}", preparedInfoPath); - SparkConf conf = new SparkConf(); - conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final String resultClassName = parser.get("resultTableName"); - log.info("resultTableName: {}", resultClassName); + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); - final Boolean saveGraph = Optional - .ofNullable(parser.get("saveGraph")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("saveGraph: {}", saveGraph); + final Boolean saveGraph = Optional + .ofNullable(parser.get("saveGraph")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("saveGraph: {}", saveGraph); - @SuppressWarnings("unchecked") - Class resultClazz = (Class) Class.forName(resultClassName); + @SuppressWarnings("unchecked") + Class resultClazz = (Class) Class.forName(resultClassName); - runWithSparkHiveSession( - conf, - isSparkSessionManaged, - spark -> { - if (isTest(parser)) { - removeOutputDir(spark, outputPath); - } - if (saveGraph) { - execPropagation( - spark, inputPath, outputPath, preparedInfoPath, resultClazz); - } - }); - } + runWithSparkHiveSession( + conf, + isSparkSessionManaged, + spark -> { + if (isTest(parser)) { + removeOutputDir(spark, outputPath); + } + if (saveGraph) { + execPropagation( + spark, inputPath, outputPath, preparedInfoPath, resultClazz); + } + }); + } - private static void execPropagation( - SparkSession spark, - String inputPath, - String outputPath, - String preparedInfoPath, - Class resultClazz) { + private static void execPropagation( + SparkSession spark, + String inputPath, + String outputPath, + String preparedInfoPath, + Class resultClazz) { - Dataset possibleUpdates = readPath(spark, preparedInfoPath, ResultCommunityList.class); - Dataset result = readPath(spark, inputPath, resultClazz); + Dataset possibleUpdates = readPath(spark, preparedInfoPath, ResultCommunityList.class); + Dataset result = readPath(spark, inputPath, resultClazz); - result - .joinWith( - possibleUpdates, - result.col("id").equalTo(possibleUpdates.col("resultId")), - "left_outer") - .map(contextUpdaterFn(), Encoders.bean(resultClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); + result + .joinWith( + possibleUpdates, + result.col("id").equalTo(possibleUpdates.col("resultId")), + "left_outer") + .map(contextUpdaterFn(), Encoders.bean(resultClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); - } + } - private static MapFunction, R> contextUpdaterFn() { - return value -> { - R ret = value._1(); - Optional rcl = Optional.ofNullable(value._2()); - if (rcl.isPresent()) { - Set contexts = new HashSet<>(); - ret.getContext().forEach(c -> contexts.add(c.getId())); - rcl - .get() - .getCommunityList() - .stream() - .forEach( - c -> { - if (!contexts.contains(c)) { - Context newContext = new Context(); - newContext.setId(c); - newContext - .setDataInfo( - Arrays - .asList( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS))); - ret.getContext().add(newContext); - } + private static MapFunction, R> contextUpdaterFn() { + return value -> { + R ret = value._1(); + Optional rcl = Optional.ofNullable(value._2()); + if (rcl.isPresent()) { + Set contexts = new HashSet<>(); + ret.getContext().forEach(c -> contexts.add(c.getId())); + rcl + .get() + .getCommunityList() + .stream() + .forEach( + c -> { + if (!contexts.contains(c)) { + Context newContext = new Context(); + newContext.setId(c); + newContext + .setDataInfo( + Arrays + .asList( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS))); + ret.getContext().add(newContext); + } - }); + }); - } + } - return ret; - }; - } + return ret; + }; + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java index d4f1d9f55..66a178c70 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java @@ -7,6 +7,7 @@ import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; +import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -24,7 +25,6 @@ import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import scala.Tuple2; /** @@ -248,11 +248,11 @@ public class MergeGraphTableSparkJob { private T mergeAndGet(T b, T a) { if (Objects.nonNull(a) && Objects.nonNull(b)) { if (ModelSupport.isSubClass(a, OafEntity.class) && ModelSupport.isSubClass(b, OafEntity.class)) { - return (T) OafMapperUtils.mergeEntities((OafEntity) b, (OafEntity) a); + return (T) MergeUtils.merge(b, a); } if (a instanceof Relation && b instanceof Relation) { ((Relation) a).mergeFrom(b); - return a; + return (T) MergeUtils.mergeRelation((Relation)a, (Relation) b); } } return Objects.isNull(a) ? b : a; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index 79ee5c5b6..9e1f3c78c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -1,14 +1,16 @@ package eu.dnetlib.dhp.oa.graph.raw; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.stream.Collectors; - +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.Text; @@ -16,27 +18,18 @@ import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; -import org.dom4j.DocumentException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; -import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import scala.Tuple2; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + public class GenerateEntitiesApplication extends AbstractMigrationApplication { private static final Logger log = LoggerFactory.getLogger(GenerateEntitiesApplication.class); @@ -137,7 +130,7 @@ public class GenerateEntitiesApplication extends AbstractMigrationApplication { save( inputRdd .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf)) - .reduceByKey(OafMapperUtils::merge) + .reduceByKey(MergeUtils::merge) .map(Tuple2::_2), targetPath); break; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala index f5a13e72b..0c9f0d7ba 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.graph.resolution import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.common.EntityType +import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils import eu.dnetlib.dhp.schema.oaf.{Dataset => OafDataset, _} import org.apache.commons.io.IOUtils import org.apache.hadoop.fs.{FileSystem, Path} @@ -118,15 +119,12 @@ object SparkResolveEntities { currentEntityDataset .joinWith(re, currentEntityDataset("_1").equalTo(re("_1")), "left") .map(k => { - val a = k._1 val b = k._2 if (b == null) a._2 - else { - a._2.mergeFrom(b._2) - a._2 - } + else + MergeUtils.mergeResult(a._2, b._2) }) .map(r => mapper.writeValueAsString(r))(Encoders.STRING) .write diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala index 704c9ab5c..ecf612cc7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.sx.graph import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils import eu.dnetlib.dhp.schema.oaf.{Dataset => OafDataset, _} import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf @@ -130,10 +131,7 @@ object SparkCreateInputGraph { val ds: Dataset[T] = spark.read.load(sourcePath).as[T] ds.groupByKey(_.getId) - .reduceGroups { (x: T, y: T) => - x.mergeFrom(y) - x - } + .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] } .map(_._2) .write .mode(SaveMode.Overwrite) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java index 6d6b2ffbd..bf275814c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java @@ -1,13 +1,12 @@ package eu.dnetlib.dhp.oa.graph.raw; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.lenient; - -import java.io.IOException; -import java.util.List; - +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import org.apache.commons.io.IOUtils; import org.dom4j.DocumentException; import org.junit.jupiter.api.BeforeEach; @@ -16,13 +15,12 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; -import eu.dnetlib.dhp.oa.graph.clean.GraphCleaningFunctionsTest; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import java.io.IOException; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.lenient; @ExtendWith(MockitoExtension.class) class GenerateEntitiesApplicationTest { @@ -72,7 +70,7 @@ class GenerateEntitiesApplicationTest { protected void verifyMerge(Result publication, Result dataset, Class clazz, String resultType) { - final Result merge = OafMapperUtils.mergeResults(publication, dataset); + final Result merge = MergeUtils.mergeResult(publication, dataset); assertTrue(clazz.isAssignableFrom(merge.getClass())); assertEquals(resultType, merge.getResulttype().getClassid()); }