forked from D-Net/dnet-hadoop
Merge remote-tracking branch 'origin/master' into merge_record_to_common
This commit is contained in:
commit
fed711da80
|
@ -0,0 +1,97 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.graph.merge;
|
||||||
|
|
||||||
|
import java.util.Comparator;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||||
|
|
||||||
|
public class DatasourceCompatibilityComparator implements Comparator<Qualifier> {
|
||||||
|
@Override
|
||||||
|
public int compare(Qualifier left, Qualifier right) {
|
||||||
|
|
||||||
|
String lClass = left.getClassid();
|
||||||
|
String rClass = right.getClassid();
|
||||||
|
|
||||||
|
if (lClass.equals(rClass))
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
if (lClass.equals("openaire-cris_1.1"))
|
||||||
|
return -1;
|
||||||
|
if (rClass.equals("openaire-cris_1.1"))
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
if (lClass.equals("openaire4.0"))
|
||||||
|
return -1;
|
||||||
|
if (rClass.equals("openaire4.0"))
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
if (lClass.equals("driver-openaire2.0"))
|
||||||
|
return -1;
|
||||||
|
if (rClass.equals("driver-openaire2.0"))
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
if (lClass.equals("driver"))
|
||||||
|
return -1;
|
||||||
|
if (rClass.equals("driver"))
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
if (lClass.equals("openaire2.0"))
|
||||||
|
return -1;
|
||||||
|
if (rClass.equals("openaire2.0"))
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
if (lClass.equals("openaire3.0"))
|
||||||
|
return -1;
|
||||||
|
if (rClass.equals("openaire3.0"))
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
if (lClass.equals("openaire2.0_data"))
|
||||||
|
return -1;
|
||||||
|
if (rClass.equals("openaire2.0_data"))
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
if (lClass.equals("native"))
|
||||||
|
return -1;
|
||||||
|
if (rClass.equals("native"))
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
if (lClass.equals("hostedBy"))
|
||||||
|
return -1;
|
||||||
|
if (rClass.equals("hostedBy"))
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
if (lClass.equals("notCompatible"))
|
||||||
|
return -1;
|
||||||
|
if (rClass.equals("notCompatible"))
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
if (lClass.equals("UNKNOWN"))
|
||||||
|
return -1;
|
||||||
|
if (rClass.equals("UNKNOWN"))
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
// Else (but unlikely), lexicographical ordering will do.
|
||||||
|
return lClass.compareTo(rClass);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CASE WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY
|
||||||
|
* ['openaire-cris_1.1']) THEN 'openaire-cris_1.1@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT
|
||||||
|
* COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['openaire4.0']) THEN
|
||||||
|
* 'openaire4.0@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override,
|
||||||
|
* a.compatibility):: TEXT) @> ARRAY ['driver', 'openaire2.0']) THEN
|
||||||
|
* 'driver-openaire2.0@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE
|
||||||
|
* (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['driver']) THEN
|
||||||
|
* 'driver@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override,
|
||||||
|
* a.compatibility) :: TEXT) @> ARRAY ['openaire2.0']) THEN 'openaire2.0@@@dnet:datasourceCompatibilityLevel' WHEN
|
||||||
|
* (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['openaire3.0']) THEN
|
||||||
|
* 'openaire3.0@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override,
|
||||||
|
* a.compatibility) :: TEXT) @> ARRAY ['openaire2.0_data']) THEN
|
||||||
|
* 'openaire2.0_data@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE
|
||||||
|
* (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['native']) THEN
|
||||||
|
* 'native@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override,
|
||||||
|
* a.compatibility) :: TEXT) @> ARRAY ['hostedBy']) THEN 'hostedBy@@@dnet:datasourceCompatibilityLevel' WHEN
|
||||||
|
* (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['notCompatible'])
|
||||||
|
* THEN 'notCompatible@@@dnet:datasourceCompatibilityLevel' ELSE 'UNKNOWN@@@dnet:datasourceCompatibilityLevel' END
|
||||||
|
*/
|
||||||
|
}
|
|
@ -3,8 +3,9 @@ package eu.dnetlib.dhp.oa.graph.merge;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
import java.util.Objects;
|
import java.util.*;
|
||||||
import java.util.Optional;
|
|
||||||
|
import javax.xml.crypto.Data;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
@ -14,6 +15,7 @@ import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SaveMode;
|
import org.apache.spark.sql.SaveMode;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -39,6 +41,14 @@ public class MergeGraphSparkJob {
|
||||||
|
|
||||||
private static final String PRIORITY_DEFAULT = "BETA"; // BETA | PROD
|
private static final String PRIORITY_DEFAULT = "BETA"; // BETA | PROD
|
||||||
|
|
||||||
|
private static final Datasource DATASOURCE = new Datasource();
|
||||||
|
|
||||||
|
static {
|
||||||
|
Qualifier compatibility = new Qualifier();
|
||||||
|
compatibility.setClassid("UNKNOWN");
|
||||||
|
DATASOURCE.setOpenairecompatibility(compatibility);
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
String jsonConfiguration = IOUtils
|
String jsonConfiguration = IOUtils
|
||||||
|
@ -104,6 +114,10 @@ public class MergeGraphSparkJob {
|
||||||
.map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
|
.map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
|
||||||
Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
|
Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
|
||||||
Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2);
|
Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2);
|
||||||
|
|
||||||
|
if (p.orElse((P) b.orElse((B) DATASOURCE)) instanceof Datasource) {
|
||||||
|
return mergeDatasource(p, b);
|
||||||
|
}
|
||||||
switch (priority) {
|
switch (priority) {
|
||||||
default:
|
default:
|
||||||
case "BETA":
|
case "BETA":
|
||||||
|
@ -119,6 +133,36 @@ public class MergeGraphSparkJob {
|
||||||
.json(outputPath);
|
.json(outputPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Datasources involved in the merge operation doesn't obey to the infra precedence policy, but relies on a custom
|
||||||
|
* behaviour that, given two datasources from beta and prod returns the one from prod with the highest
|
||||||
|
* compatibility among the two.
|
||||||
|
*
|
||||||
|
* @param p datasource from PROD
|
||||||
|
* @param b datasource from BETA
|
||||||
|
* @param <P> Datasource class type from PROD
|
||||||
|
* @param <B> Datasource class type from BETA
|
||||||
|
* @return the datasource from PROD with the highest compatibility level.
|
||||||
|
*/
|
||||||
|
protected static <P extends Oaf, B extends Oaf> P mergeDatasource(Optional<P> p, Optional<B> b) {
|
||||||
|
if (p.isPresent() & !b.isPresent()) {
|
||||||
|
return p.get();
|
||||||
|
}
|
||||||
|
if (b.isPresent() & !p.isPresent()) {
|
||||||
|
return (P) b.get();
|
||||||
|
}
|
||||||
|
if (!b.isPresent() & !p.isPresent()) {
|
||||||
|
return null; // unlikely, at least one should be produced by the join operation
|
||||||
|
}
|
||||||
|
|
||||||
|
Datasource dp = (Datasource) p.get();
|
||||||
|
Datasource db = (Datasource) b.get();
|
||||||
|
|
||||||
|
List<Qualifier> list = Arrays.asList(dp.getOpenairecompatibility(), db.getOpenairecompatibility());
|
||||||
|
dp.setOpenairecompatibility(Collections.min(list, new DatasourceCompatibilityComparator()));
|
||||||
|
return (P) dp;
|
||||||
|
}
|
||||||
|
|
||||||
private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> p, Optional<B> b) {
|
private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> p, Optional<B> b) {
|
||||||
if (b.isPresent() & !p.isPresent()) {
|
if (b.isPresent() & !p.isPresent()) {
|
||||||
return (P) b.get();
|
return (P) b.get();
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
DROP VIEW IF EXISTS ${hiveDbName}.result;
|
DROP VIEW IF EXISTS ${hiveDbName}.result;
|
||||||
|
|
||||||
CREATE VIEW IF NOT EXISTS ${hiveDbName}.result as
|
CREATE VIEW IF NOT EXISTS ${hiveDbName}.result as
|
||||||
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.publication p
|
select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.publication p
|
||||||
union all
|
union all
|
||||||
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.dataset d
|
select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.dataset d
|
||||||
union all
|
union all
|
||||||
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.software s
|
select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.software s
|
||||||
union all
|
union all
|
||||||
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.otherresearchproduct o;
|
select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.otherresearchproduct o;
|
||||||
|
|
|
@ -0,0 +1,84 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.graph.merge;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||||
|
|
||||||
|
public class MergeGraphSparkJobTest {
|
||||||
|
|
||||||
|
private ObjectMapper mapper;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() {
|
||||||
|
mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMergeDatasources() throws IOException {
|
||||||
|
assertEquals(
|
||||||
|
"openaire-cris_1.1",
|
||||||
|
MergeGraphSparkJob
|
||||||
|
.mergeDatasource(
|
||||||
|
d("datasource_cris.json"),
|
||||||
|
d("datasource_UNKNOWN.json"))
|
||||||
|
.getOpenairecompatibility()
|
||||||
|
.getClassid());
|
||||||
|
assertEquals(
|
||||||
|
"openaire-cris_1.1",
|
||||||
|
MergeGraphSparkJob
|
||||||
|
.mergeDatasource(
|
||||||
|
d("datasource_UNKNOWN.json"),
|
||||||
|
d("datasource_cris.json"))
|
||||||
|
.getOpenairecompatibility()
|
||||||
|
.getClassid());
|
||||||
|
assertEquals(
|
||||||
|
"driver-openaire2.0",
|
||||||
|
MergeGraphSparkJob
|
||||||
|
.mergeDatasource(
|
||||||
|
d("datasource_native.json"),
|
||||||
|
d("datasource_driver-openaire2.0.json"))
|
||||||
|
.getOpenairecompatibility()
|
||||||
|
.getClassid());
|
||||||
|
assertEquals(
|
||||||
|
"driver-openaire2.0",
|
||||||
|
MergeGraphSparkJob
|
||||||
|
.mergeDatasource(
|
||||||
|
d("datasource_driver-openaire2.0.json"),
|
||||||
|
d("datasource_native.json"))
|
||||||
|
.getOpenairecompatibility()
|
||||||
|
.getClassid());
|
||||||
|
assertEquals(
|
||||||
|
"openaire4.0",
|
||||||
|
MergeGraphSparkJob
|
||||||
|
.mergeDatasource(
|
||||||
|
d("datasource_notCompatible.json"),
|
||||||
|
d("datasource_openaire4.0.json"))
|
||||||
|
.getOpenairecompatibility()
|
||||||
|
.getClassid());
|
||||||
|
assertEquals(
|
||||||
|
"notCompatible",
|
||||||
|
MergeGraphSparkJob
|
||||||
|
.mergeDatasource(
|
||||||
|
d("datasource_notCompatible.json"),
|
||||||
|
d("datasource_UNKNOWN.json"))
|
||||||
|
.getOpenairecompatibility()
|
||||||
|
.getClassid());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Optional<Datasource> d(String file) throws IOException {
|
||||||
|
String json = IOUtils.toString(getClass().getResourceAsStream(file));
|
||||||
|
return Optional.of(mapper.readValue(json, Datasource.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1 @@
|
||||||
|
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "UNKNOWN" }}
|
|
@ -0,0 +1 @@
|
||||||
|
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire-cris_1.1" }}
|
|
@ -0,0 +1 @@
|
||||||
|
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "driver-openaire2.0" }}
|
|
@ -0,0 +1 @@
|
||||||
|
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "hostedBy" }}
|
|
@ -0,0 +1 @@
|
||||||
|
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "native" }}
|
|
@ -0,0 +1 @@
|
||||||
|
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "notCompatible" }}
|
|
@ -0,0 +1 @@
|
||||||
|
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0" }}
|
|
@ -0,0 +1 @@
|
||||||
|
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0_data" }}
|
|
@ -0,0 +1 @@
|
||||||
|
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire3.0" }}
|
|
@ -0,0 +1 @@
|
||||||
|
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire4.0" }}
|
Loading…
Reference in New Issue