diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java index 474944260b..5f2cd68082 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java @@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.*; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -136,7 +137,7 @@ public class MergeGraphTableSparkJob { /** * Datasources involved in the merge operation doesn't obey to the infra precedence policy, but relies on a custom * behaviour that, given two datasources from beta and prod returns the one from prod with the highest - * compatibility among the two. + * compatibility among the two. Furthermore, the procedure merges the collectedfrom, originalId, and pid lists. * * @param p datasource from PROD * @param b datasource from BETA @@ -160,9 +161,37 @@ public class MergeGraphTableSparkJob { List list = Arrays.asList(dp.getOpenairecompatibility(), db.getOpenairecompatibility()); dp.setOpenairecompatibility(Collections.min(list, new DatasourceCompatibilityComparator())); + dp + .setCollectedfrom( + Stream + .concat( + Optional + .ofNullable(dp.getCollectedfrom()) + .map(Collection::stream) + .orElse(Stream.empty()), + Optional + .ofNullable(db.getCollectedfrom()) + .map(Collection::stream) + .orElse(Stream.empty())) + .distinct() // relies on KeyValue.equals + .collect(Collectors.toList())); + + dp.setOriginalId(mergeLists(dp.getOriginalId(), db.getOriginalId())); + dp.setPid(mergeLists(dp.getPid(), db.getPid())); + return (P) dp; } + private static final List mergeLists(final List... lists) { + return Arrays + .stream(lists) + .filter(Objects::nonNull) + .flatMap(List::stream) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); + } + private static

P mergeWithPriorityToPROD(Optional

p, Optional b) { if (b.isPresent() & !p.isPresent()) { return (P) b.get(); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java index 2d28ee3054..7dcb4bca43 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java @@ -2,6 +2,7 @@ package eu.dnetlib.dhp.oa.graph.merge; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import java.io.IOException; import java.util.Optional; @@ -25,7 +26,23 @@ class MergeGraphTableSparkJobTest { } @Test - void testMergeDatasources() throws IOException { + void testMerge() throws IOException { + Datasource d = MergeGraphTableSparkJob + .mergeDatasource( + d("datasource_cris.json"), + d("datasource_openaire2.0.json")); + + assertEquals("10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", d.getId()); + assertNotNull(d.getOriginalId()); + assertEquals(2, d.getOriginalId().size()); + assertNotNull(d.getCollectedfrom()); + assertEquals(2, d.getCollectedfrom().size()); + assertNotNull(d.getPid()); + assertEquals(1, d.getPid().size()); + } + + @Test + void testMergeCompatibility() throws IOException { assertEquals( "openaire-cris_1.1", MergeGraphTableSparkJob diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_cris.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_cris.json index 6f2b7aa7d1..64627dd2bb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_cris.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_cris.json @@ -1 +1,5 @@ -{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire-cris_1.1" }} \ No newline at end of file +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire-cris_1.1" }, + "originalId": ["eurocrisdris::1234"], + "collectedfrom": [{"key": "eurocrisdris::2b29d08e383ff4cd8a2b6b226ce37e38", "value": "Directory of Research Information System (DRIS)"}], + "pid": [{"value": "10.1010.xyx", "qualifier": {"classid": "doi"}}] +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0.json index e2db479432..cb2916a27c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0.json @@ -1 +1,4 @@ -{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0" }} \ No newline at end of file +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0" }, + "originalId": ["opendoar____::1234"], + "collectedfrom": [{"key": "openaire____::47ce9e9f4fad46e732cff06419ecaabb", "value": "OpenDOAR"}] +} \ No newline at end of file