diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/DatasourceCompatibilityComparator.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/DatasourceCompatibilityComparator.java new file mode 100644 index 000000000..59bdb3914 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/DatasourceCompatibilityComparator.java @@ -0,0 +1,97 @@ + +package eu.dnetlib.dhp.oa.graph.merge; + +import java.util.Comparator; + +import eu.dnetlib.dhp.schema.oaf.Qualifier; + +public class DatasourceCompatibilityComparator implements Comparator { + @Override + public int compare(Qualifier left, Qualifier right) { + + String lClass = left.getClassid(); + String rClass = right.getClassid(); + + if (lClass.equals(rClass)) + return 0; + + if (lClass.equals("openaire-cris_1.1")) + return -1; + if (rClass.equals("openaire-cris_1.1")) + return 1; + + if (lClass.equals("openaire4.0")) + return -1; + if (rClass.equals("openaire4.0")) + return 1; + + if (lClass.equals("driver-openaire2.0")) + return -1; + if (rClass.equals("driver-openaire2.0")) + return 1; + + if (lClass.equals("driver")) + return -1; + if (rClass.equals("driver")) + return 1; + + if (lClass.equals("openaire2.0")) + return -1; + if (rClass.equals("openaire2.0")) + return 1; + + if (lClass.equals("openaire3.0")) + return -1; + if (rClass.equals("openaire3.0")) + return 1; + + if (lClass.equals("openaire2.0_data")) + return -1; + if (rClass.equals("openaire2.0_data")) + return 1; + + if (lClass.equals("native")) + return -1; + if (rClass.equals("native")) + return 1; + + if (lClass.equals("hostedBy")) + return -1; + if (rClass.equals("hostedBy")) + return 1; + + if (lClass.equals("notCompatible")) + return -1; + if (rClass.equals("notCompatible")) + return 1; + + if (lClass.equals("UNKNOWN")) + return -1; + if (rClass.equals("UNKNOWN")) + return 1; + + // Else (but unlikely), lexicographical ordering will do. + return lClass.compareTo(rClass); + } + + /* + * CASE WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY + * ['openaire-cris_1.1']) THEN 'openaire-cris_1.1@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT + * COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['openaire4.0']) THEN + * 'openaire4.0@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, + * a.compatibility):: TEXT) @> ARRAY ['driver', 'openaire2.0']) THEN + * 'driver-openaire2.0@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE + * (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['driver']) THEN + * 'driver@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, + * a.compatibility) :: TEXT) @> ARRAY ['openaire2.0']) THEN 'openaire2.0@@@dnet:datasourceCompatibilityLevel' WHEN + * (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['openaire3.0']) THEN + * 'openaire3.0@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, + * a.compatibility) :: TEXT) @> ARRAY ['openaire2.0_data']) THEN + * 'openaire2.0_data@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE + * (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['native']) THEN + * 'native@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, + * a.compatibility) :: TEXT) @> ARRAY ['hostedBy']) THEN 'hostedBy@@@dnet:datasourceCompatibilityLevel' WHEN + * (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['notCompatible']) + * THEN 'notCompatible@@@dnet:datasourceCompatibilityLevel' ELSE 'UNKNOWN@@@dnet:datasourceCompatibilityLevel' END + */ +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java index b723de955..037683604 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java @@ -3,8 +3,9 @@ package eu.dnetlib.dhp.oa.graph.merge; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.util.Objects; -import java.util.Optional; +import java.util.*; + +import javax.xml.crypto.Data; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -14,6 +15,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +41,14 @@ public class MergeGraphSparkJob { private static final String PRIORITY_DEFAULT = "BETA"; // BETA | PROD + private static final Datasource DATASOURCE = new Datasource(); + + static { + Qualifier compatibility = new Qualifier(); + compatibility.setClassid("UNKNOWN"); + DATASOURCE.setOpenairecompatibility(compatibility); + } + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils @@ -104,6 +114,10 @@ public class MergeGraphSparkJob { .map((MapFunction, Tuple2>, P>) value -> { Optional

p = Optional.ofNullable(value._1()).map(Tuple2::_2); Optional b = Optional.ofNullable(value._2()).map(Tuple2::_2); + + if (p.orElse((P) b.orElse((B) DATASOURCE)) instanceof Datasource) { + return mergeDatasource(p, b); + } switch (priority) { default: case "BETA": @@ -119,6 +133,36 @@ public class MergeGraphSparkJob { .json(outputPath); } + /** + * Datasources involved in the merge operation doesn't obey to the infra precedence policy, but relies on a custom + * behaviour that, given two datasources from beta and prod returns the one from prod with the highest + * compatibility among the two. + * + * @param p datasource from PROD + * @param b datasource from BETA + * @param

Datasource class type from PROD + * @param Datasource class type from BETA + * @return the datasource from PROD with the highest compatibility level. + */ + protected static

P mergeDatasource(Optional

p, Optional b) { + if (p.isPresent() & !b.isPresent()) { + return p.get(); + } + if (b.isPresent() & !p.isPresent()) { + return (P) b.get(); + } + if (!b.isPresent() & !p.isPresent()) { + return null; // unlikely, at least one should be produced by the join operation + } + + Datasource dp = (Datasource) p.get(); + Datasource db = (Datasource) b.get(); + + List list = Arrays.asList(dp.getOpenairecompatibility(), db.getOpenairecompatibility()); + dp.setOpenairecompatibility(Collections.min(list, new DatasourceCompatibilityComparator())); + return (P) dp; + } + private static

P mergeWithPriorityToPROD(Optional

p, Optional b) { if (b.isPresent() & !p.isPresent()) { return (P) b.get(); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java new file mode 100644 index 000000000..28e8e5abc --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java @@ -0,0 +1,84 @@ + +package eu.dnetlib.dhp.oa.graph.merge; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.schema.oaf.Datasource; + +public class MergeGraphSparkJobTest { + + private ObjectMapper mapper; + + @BeforeEach + public void setUp() { + mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + @Test + public void testMergeDatasources() throws IOException { + assertEquals( + "openaire-cris_1.1", + MergeGraphSparkJob + .mergeDatasource( + d("datasource_cris.json"), + d("datasource_UNKNOWN.json")) + .getOpenairecompatibility() + .getClassid()); + assertEquals( + "openaire-cris_1.1", + MergeGraphSparkJob + .mergeDatasource( + d("datasource_UNKNOWN.json"), + d("datasource_cris.json")) + .getOpenairecompatibility() + .getClassid()); + assertEquals( + "driver-openaire2.0", + MergeGraphSparkJob + .mergeDatasource( + d("datasource_native.json"), + d("datasource_driver-openaire2.0.json")) + .getOpenairecompatibility() + .getClassid()); + assertEquals( + "driver-openaire2.0", + MergeGraphSparkJob + .mergeDatasource( + d("datasource_driver-openaire2.0.json"), + d("datasource_native.json")) + .getOpenairecompatibility() + .getClassid()); + assertEquals( + "openaire4.0", + MergeGraphSparkJob + .mergeDatasource( + d("datasource_notCompatible.json"), + d("datasource_openaire4.0.json")) + .getOpenairecompatibility() + .getClassid()); + assertEquals( + "notCompatible", + MergeGraphSparkJob + .mergeDatasource( + d("datasource_notCompatible.json"), + d("datasource_UNKNOWN.json")) + .getOpenairecompatibility() + .getClassid()); + } + + private Optional d(String file) throws IOException { + String json = IOUtils.toString(getClass().getResourceAsStream(file)); + return Optional.of(mapper.readValue(json, Datasource.class)); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_UNKNOWN.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_UNKNOWN.json new file mode 100644 index 000000000..a01085c8f --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_UNKNOWN.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "UNKNOWN" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_cris.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_cris.json new file mode 100644 index 000000000..6f2b7aa7d --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_cris.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire-cris_1.1" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_driver-openaire2.0.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_driver-openaire2.0.json new file mode 100644 index 000000000..d2e375f55 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_driver-openaire2.0.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "driver-openaire2.0" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_hostedby.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_hostedby.json new file mode 100644 index 000000000..03db887f5 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_hostedby.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "hostedBy" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_native.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_native.json new file mode 100644 index 000000000..7831a3fc3 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_native.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "native" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_notCompatible.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_notCompatible.json new file mode 100644 index 000000000..8dabe5d2c --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_notCompatible.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "notCompatible" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0.json new file mode 100644 index 000000000..e2db47943 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0_data.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0_data.json new file mode 100644 index 000000000..b8480faf0 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0_data.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0_data" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire3.0.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire3.0.json new file mode 100644 index 000000000..43bb0a7a4 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire3.0.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire3.0" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire4.0.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire4.0.json new file mode 100644 index 000000000..7cdba6a4e --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire4.0.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire4.0" }} \ No newline at end of file