diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java
index 9b99097ce..d5c310c1b 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java
@@ -3,8 +3,12 @@ package eu.dnetlib.dhp.oa.graph.raw;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
@@ -98,14 +102,9 @@ public class MergeClaimsApplication {
 		raw
 			.joinWith(claim, raw.col("_1").equalTo(claim.col("_1")), "full_outer")
 			.map(
-				(MapFunction<Tuple2<Tuple2<String, T>, Tuple2<String, T>>, T>) value -> {
-					Optional<Tuple2<String, T>> opRaw = Optional.ofNullable(value._1());
-					Optional<Tuple2<String, T>> opClaim = Optional.ofNullable(value._2());
-
-					return opRaw.isPresent()
-						? opRaw.get()._2()
-						: opClaim.isPresent() ? opClaim.get()._2() : null;
-				},
+				(MapFunction<Tuple2<Tuple2<String, T>, Tuple2<String, T>>, T>) value -> processClaims(
+					Optional.ofNullable(value._1()),
+					Optional.ofNullable(value._2())),
 				Encoders.bean(clazz))
 			.filter(Objects::nonNull)
 			.map(
@@ -117,6 +116,101 @@ public class MergeClaimsApplication {
 			.text(outPath);
 	}
 
+	/**
+	 * Selects the record to emit for one row of the full outer join between the raw and the claim records.
+	 * <p>
+	 * When both sides are present and the claim payload is a {@link Result}, the contexts of the two results are
+	 * merged via {@link #mergeContexts(Result, Result)} and set on the raw result, which is updated in place and
+	 * returned. In every other case the raw payload is returned when present, otherwise the claim payload.
+	 *
+	 * @param opRaw the raw side of the joined row, empty when that side is missing
+	 * @param opClaim the claim side of the joined row, empty when that side is missing
+	 * @return the selected (possibly updated) payload, or {@code null} when both sides are empty
+	 */
+	private static <T extends Oaf> T processClaims(Optional<Tuple2<String, T>> opRaw,
+		Optional<Tuple2<String, T>> opClaim) {
+
+		// when both are present
+		if (opClaim.isPresent() && opRaw.isPresent()) {
+			T oafClaim = opClaim.get()._2();
+			if (oafClaim instanceof Result) {
+				T oafRaw = opRaw.get()._2();
+
+				// merge the context lists from both oaf objects ...
+				final List<Context> context = mergeContexts((Result) oafClaim, (Result) oafRaw);
+
+				// ... and set it on the result from the aggregator
+				((Result) oafRaw).setContext(context);
+				return oafRaw;
+			}
+		}
+
+		// otherwise prefer the result from the aggregator
+		return opRaw.isPresent()
+			? opRaw.get()._2()
+			: opClaim.map(Tuple2::_2).orElse(null);
+	}
+
+	/**
+	 * Builds the union of the context lists of the two results, treating a {@code null} list as empty.
+	 * <p>
+	 * Contexts are grouped by id. When the same id occurs more than once a new {@link Context} is created that
+	 * carries that id and the data info entries of both occurrences, keeping one entry per provenance action class
+	 * id (a missing provenance action or class id is mapped to the empty string). Claim contexts are streamed
+	 * before raw ones, and for each class id the entry encountered first is kept.
+	 *
+	 * @param oafClaim the result coming from the claim
+	 * @param oafRaw the raw result
+	 * @return a new list holding the merged contexts, in the iteration order of the intermediate map
+	 */
+	private static List<Context> mergeContexts(Result oafClaim, Result oafRaw) {
+		return new ArrayList<>(
+			Stream
+				.concat(
+					Optional
+						.ofNullable(oafClaim.getContext())
+						.map(List::stream)
+						.orElse(Stream.empty()),
+					Optional
+						.ofNullable(oafRaw.getContext())
+						.map(List::stream)
+						.orElse(Stream.empty()))
+				.collect(
+					Collectors
+						.toMap(
+							Context::getId,
+							c -> c,
+							(c1, c2) -> {
+								Context c = new Context();
+								c.setId(c1.getId());
+								c
+									.setDataInfo(
+										new ArrayList<>(
+											Stream
+												.concat(
+													Optional
+														.ofNullable(c1.getDataInfo())
+														.map(List::stream)
+														.orElse(Stream.empty()),
+													Optional
+														.ofNullable(c2.getDataInfo())
+														.map(List::stream)
+														.orElse(Stream.empty()))
+												.collect(
+													Collectors
+														.toMap(
+															d -> Optional
+																.ofNullable(d.getProvenanceaction())
+																.map(Qualifier::getClassid)
+																.orElse(""),
+															d -> d,
+															(d1, d2) -> d1))
+												.values()));
+								return c;
+							}))
+				.values());
+	}
+
 	private static <T extends Oaf> Dataset<T> readFromPath(
 		SparkSession spark, String path, Class<T> clazz) {
 		return spark