forked from antonis.lempesis/dnet-hadoop
Merge pull request 'Merging different compatibility levels (pinocchio operator)' (#47) from merge_graph into master
This commit is contained in:
commit
8775a64bc1
|
@ -0,0 +1,97 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.merge;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||
|
||||
public class DatasourceCompatibilityComparator implements Comparator<Qualifier> {
|
||||
@Override
|
||||
public int compare(Qualifier left, Qualifier right) {
|
||||
|
||||
String lClass = left.getClassid();
|
||||
String rClass = right.getClassid();
|
||||
|
||||
if (lClass.equals(rClass))
|
||||
return 0;
|
||||
|
||||
if (lClass.equals("openaire-cris_1.1"))
|
||||
return -1;
|
||||
if (rClass.equals("openaire-cris_1.1"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("openaire4.0"))
|
||||
return -1;
|
||||
if (rClass.equals("openaire4.0"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("driver-openaire2.0"))
|
||||
return -1;
|
||||
if (rClass.equals("driver-openaire2.0"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("driver"))
|
||||
return -1;
|
||||
if (rClass.equals("driver"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("openaire2.0"))
|
||||
return -1;
|
||||
if (rClass.equals("openaire2.0"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("openaire3.0"))
|
||||
return -1;
|
||||
if (rClass.equals("openaire3.0"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("openaire2.0_data"))
|
||||
return -1;
|
||||
if (rClass.equals("openaire2.0_data"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("native"))
|
||||
return -1;
|
||||
if (rClass.equals("native"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("hostedBy"))
|
||||
return -1;
|
||||
if (rClass.equals("hostedBy"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("notCompatible"))
|
||||
return -1;
|
||||
if (rClass.equals("notCompatible"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("UNKNOWN"))
|
||||
return -1;
|
||||
if (rClass.equals("UNKNOWN"))
|
||||
return 1;
|
||||
|
||||
// Else (but unlikely), lexicographical ordering will do.
|
||||
return lClass.compareTo(rClass);
|
||||
}
|
||||
|
||||
/*
|
||||
* CASE WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY
|
||||
* ['openaire-cris_1.1']) THEN 'openaire-cris_1.1@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT
|
||||
* COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['openaire4.0']) THEN
|
||||
* 'openaire4.0@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override,
|
||||
* a.compatibility):: TEXT) @> ARRAY ['driver', 'openaire2.0']) THEN
|
||||
* 'driver-openaire2.0@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE
|
||||
* (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['driver']) THEN
|
||||
* 'driver@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override,
|
||||
* a.compatibility) :: TEXT) @> ARRAY ['openaire2.0']) THEN 'openaire2.0@@@dnet:datasourceCompatibilityLevel' WHEN
|
||||
* (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['openaire3.0']) THEN
|
||||
* 'openaire3.0@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override,
|
||||
* a.compatibility) :: TEXT) @> ARRAY ['openaire2.0_data']) THEN
|
||||
* 'openaire2.0_data@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE
|
||||
* (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['native']) THEN
|
||||
* 'native@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override,
|
||||
* a.compatibility) :: TEXT) @> ARRAY ['hostedBy']) THEN 'hostedBy@@@dnet:datasourceCompatibilityLevel' WHEN
|
||||
* (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['notCompatible'])
|
||||
* THEN 'notCompatible@@@dnet:datasourceCompatibilityLevel' ELSE 'UNKNOWN@@@dnet:datasourceCompatibilityLevel' END
|
||||
*/
|
||||
}
|
|
@ -3,8 +3,9 @@ package eu.dnetlib.dhp.oa.graph.merge;
|
|||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.*;
|
||||
|
||||
import javax.xml.crypto.Data;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
|
@ -14,6 +15,7 @@ import org.apache.spark.sql.Dataset;
|
|||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -39,6 +41,14 @@ public class MergeGraphSparkJob {
|
|||
|
||||
private static final String PRIORITY_DEFAULT = "BETA"; // BETA | PROD
|
||||
|
||||
private static final Datasource DATASOURCE = new Datasource();
|
||||
|
||||
static {
|
||||
Qualifier compatibility = new Qualifier();
|
||||
compatibility.setClassid("UNKNOWN");
|
||||
DATASOURCE.setOpenairecompatibility(compatibility);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
|
@ -104,6 +114,10 @@ public class MergeGraphSparkJob {
|
|||
.map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
|
||||
Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
|
||||
Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2);
|
||||
|
||||
if (p.orElse((P) b.orElse((B) DATASOURCE)) instanceof Datasource) {
|
||||
return mergeDatasource(p, b);
|
||||
}
|
||||
switch (priority) {
|
||||
default:
|
||||
case "BETA":
|
||||
|
@ -119,6 +133,36 @@ public class MergeGraphSparkJob {
|
|||
.json(outputPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Datasources involved in the merge operation doesn't obey to the infra precedence policy, but relies on a custom
|
||||
* behaviour that, given two datasources from beta and prod returns the one from prod with the highest
|
||||
* compatibility among the two.
|
||||
*
|
||||
* @param p datasource from PROD
|
||||
* @param b datasource from BETA
|
||||
* @param <P> Datasource class type from PROD
|
||||
* @param <B> Datasource class type from BETA
|
||||
* @return the datasource from PROD with the highest compatibility level.
|
||||
*/
|
||||
protected static <P extends Oaf, B extends Oaf> P mergeDatasource(Optional<P> p, Optional<B> b) {
|
||||
if (p.isPresent() & !b.isPresent()) {
|
||||
return p.get();
|
||||
}
|
||||
if (b.isPresent() & !p.isPresent()) {
|
||||
return (P) b.get();
|
||||
}
|
||||
if (!b.isPresent() & !p.isPresent()) {
|
||||
return null; // unlikely, at least one should be produced by the join operation
|
||||
}
|
||||
|
||||
Datasource dp = (Datasource) p.get();
|
||||
Datasource db = (Datasource) b.get();
|
||||
|
||||
List<Qualifier> list = Arrays.asList(dp.getOpenairecompatibility(), db.getOpenairecompatibility());
|
||||
dp.setOpenairecompatibility(Collections.min(list, new DatasourceCompatibilityComparator()));
|
||||
return (P) dp;
|
||||
}
|
||||
|
||||
private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> p, Optional<B> b) {
|
||||
if (b.isPresent() & !p.isPresent()) {
|
||||
return (P) b.get();
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.merge;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
|
||||
public class MergeGraphSparkJobTest {
|
||||
|
||||
private ObjectMapper mapper;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeDatasources() throws IOException {
|
||||
assertEquals(
|
||||
"openaire-cris_1.1",
|
||||
MergeGraphSparkJob
|
||||
.mergeDatasource(
|
||||
d("datasource_cris.json"),
|
||||
d("datasource_UNKNOWN.json"))
|
||||
.getOpenairecompatibility()
|
||||
.getClassid());
|
||||
assertEquals(
|
||||
"openaire-cris_1.1",
|
||||
MergeGraphSparkJob
|
||||
.mergeDatasource(
|
||||
d("datasource_UNKNOWN.json"),
|
||||
d("datasource_cris.json"))
|
||||
.getOpenairecompatibility()
|
||||
.getClassid());
|
||||
assertEquals(
|
||||
"driver-openaire2.0",
|
||||
MergeGraphSparkJob
|
||||
.mergeDatasource(
|
||||
d("datasource_native.json"),
|
||||
d("datasource_driver-openaire2.0.json"))
|
||||
.getOpenairecompatibility()
|
||||
.getClassid());
|
||||
assertEquals(
|
||||
"driver-openaire2.0",
|
||||
MergeGraphSparkJob
|
||||
.mergeDatasource(
|
||||
d("datasource_driver-openaire2.0.json"),
|
||||
d("datasource_native.json"))
|
||||
.getOpenairecompatibility()
|
||||
.getClassid());
|
||||
assertEquals(
|
||||
"openaire4.0",
|
||||
MergeGraphSparkJob
|
||||
.mergeDatasource(
|
||||
d("datasource_notCompatible.json"),
|
||||
d("datasource_openaire4.0.json"))
|
||||
.getOpenairecompatibility()
|
||||
.getClassid());
|
||||
assertEquals(
|
||||
"notCompatible",
|
||||
MergeGraphSparkJob
|
||||
.mergeDatasource(
|
||||
d("datasource_notCompatible.json"),
|
||||
d("datasource_UNKNOWN.json"))
|
||||
.getOpenairecompatibility()
|
||||
.getClassid());
|
||||
}
|
||||
|
||||
private Optional<Datasource> d(String file) throws IOException {
|
||||
String json = IOUtils.toString(getClass().getResourceAsStream(file));
|
||||
return Optional.of(mapper.readValue(json, Datasource.class));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "UNKNOWN" }}
|
|
@ -0,0 +1 @@
|
|||
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire-cris_1.1" }}
|
|
@ -0,0 +1 @@
|
|||
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "driver-openaire2.0" }}
|
|
@ -0,0 +1 @@
|
|||
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "hostedBy" }}
|
|
@ -0,0 +1 @@
|
|||
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "native" }}
|
|
@ -0,0 +1 @@
|
|||
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "notCompatible" }}
|
|
@ -0,0 +1 @@
|
|||
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0" }}
|
|
@ -0,0 +1 @@
|
|||
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0_data" }}
|
|
@ -0,0 +1 @@
|
|||
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire3.0" }}
|
|
@ -0,0 +1 @@
|
|||
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire4.0" }}
|
Loading…
Reference in New Issue