>, P>) value -> {
Optional p = Optional.ofNullable(value._1()).map(Tuple2::_2);
Optional b = Optional.ofNullable(value._2()).map(Tuple2::_2);
+
+ if (p.orElse((P) b.orElse((B) DATASOURCE)) instanceof Datasource) {
+ return mergeDatasource(p, b);
+ }
switch (priority) {
default:
case "BETA":
@@ -119,6 +133,36 @@ public class MergeGraphSparkJob {
.json(outputPath);
}
+ /**
+ * Datasources involved in the merge operation doesn't obey to the infra precedence policy, but relies on a custom
+ * behaviour that, given two datasources from beta and prod returns the one from prod with the highest
+ * compatibility among the two.
+ *
+ * @param p datasource from PROD
+ * @param b datasource from BETA
+ * @param
Datasource class type from PROD
+ * @param Datasource class type from BETA
+ * @return the datasource from PROD with the highest compatibility level.
+ */
+ protected static
P mergeDatasource(Optional
p, Optional b) {
+ if (p.isPresent() & !b.isPresent()) {
+ return p.get();
+ }
+ if (b.isPresent() & !p.isPresent()) {
+ return (P) b.get();
+ }
+ if (!b.isPresent() & !p.isPresent()) {
+ return null; // unlikely, at least one should be produced by the join operation
+ }
+
+ Datasource dp = (Datasource) p.get();
+ Datasource db = (Datasource) b.get();
+
+ List list = Arrays.asList(dp.getOpenairecompatibility(), db.getOpenairecompatibility());
+ dp.setOpenairecompatibility(Collections.min(list, new DatasourceCompatibilityComparator()));
+ return (P) dp;
+ }
+
private static P mergeWithPriorityToPROD(Optional
p, Optional b) {
if (b.isPresent() & !p.isPresent()) {
return (P) b.get();
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala
index 90d665e0c1..ee2dbadfda 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala
@@ -1,5 +1,5 @@
package eu.dnetlib.dhp.sx.ebi
-import eu.dnetlib.dhp.oa.dedup.AuthorMerger
+import eu.dnetlib.dhp.oa.merge.AuthorMerger
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
import org.apache.spark.sql.{Encoder, Encoders}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java
new file mode 100644
index 0000000000..28e8e5abce
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java
@@ -0,0 +1,84 @@
+
+package eu.dnetlib.dhp.oa.graph.merge;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.schema.oaf.Datasource;
+
+public class MergeGraphSparkJobTest {
+
+ private ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp() {
+ mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
+ @Test
+ public void testMergeDatasources() throws IOException {
+ assertEquals(
+ "openaire-cris_1.1",
+ MergeGraphSparkJob
+ .mergeDatasource(
+ d("datasource_cris.json"),
+ d("datasource_UNKNOWN.json"))
+ .getOpenairecompatibility()
+ .getClassid());
+ assertEquals(
+ "openaire-cris_1.1",
+ MergeGraphSparkJob
+ .mergeDatasource(
+ d("datasource_UNKNOWN.json"),
+ d("datasource_cris.json"))
+ .getOpenairecompatibility()
+ .getClassid());
+ assertEquals(
+ "driver-openaire2.0",
+ MergeGraphSparkJob
+ .mergeDatasource(
+ d("datasource_native.json"),
+ d("datasource_driver-openaire2.0.json"))
+ .getOpenairecompatibility()
+ .getClassid());
+ assertEquals(
+ "driver-openaire2.0",
+ MergeGraphSparkJob
+ .mergeDatasource(
+ d("datasource_driver-openaire2.0.json"),
+ d("datasource_native.json"))
+ .getOpenairecompatibility()
+ .getClassid());
+ assertEquals(
+ "openaire4.0",
+ MergeGraphSparkJob
+ .mergeDatasource(
+ d("datasource_notCompatible.json"),
+ d("datasource_openaire4.0.json"))
+ .getOpenairecompatibility()
+ .getClassid());
+ assertEquals(
+ "notCompatible",
+ MergeGraphSparkJob
+ .mergeDatasource(
+ d("datasource_notCompatible.json"),
+ d("datasource_UNKNOWN.json"))
+ .getOpenairecompatibility()
+ .getClassid());
+ }
+
+ private Optional d(String file) throws IOException {
+ String json = IOUtils.toString(getClass().getResourceAsStream(file));
+ return Optional.of(mapper.readValue(json, Datasource.class));
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerAggregationTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerAggregationTest.scala
new file mode 100644
index 0000000000..4d83057f2e
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerAggregationTest.scala
@@ -0,0 +1,54 @@
+package eu.dnetlib.dhp.sx.graph
+
+import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
+import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication
+import eu.dnetlib.dhp.sx.ebi.EBIAggregator
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.io.Source
+
+class SparkScholexplorerAggregationTest {
+
+
+ @Test
+ def testFunderRelationshipsMapping(): Unit = {
+ val publications = Source.fromInputStream(getClass.getResourceAsStream("publication.json")).mkString
+
+ var s: List[DLIPublication] = List[DLIPublication]()
+
+ val m: ObjectMapper = new ObjectMapper()
+
+ m.enable(SerializationFeature.INDENT_OUTPUT)
+
+ for (line <- publications.lines) {
+ s = m.readValue(line, classOf[DLIPublication]) :: s
+
+
+ }
+
+
+ implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
+ val spark: SparkSession = SparkSession.builder().appName("Test").master("local[*]").getOrCreate()
+
+
+ val ds: Dataset[DLIPublication] = spark.createDataset(spark.sparkContext.parallelize(s)).as[DLIPublication]
+
+ val unique = ds.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
+ .groupByKey(_._1)(Encoders.STRING)
+ .agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
+ .map(p => p._2)
+
+ val uniquePubs: DLIPublication = unique.first()
+
+ s.foreach(pp => assertFalse(pp.getAuthor.isEmpty))
+
+
+ assertNotNull(uniquePubs.getAuthor)
+ assertFalse(uniquePubs.getAuthor.isEmpty)
+
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_UNKNOWN.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_UNKNOWN.json
new file mode 100644
index 0000000000..a01085c8f4
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_UNKNOWN.json
@@ -0,0 +1 @@
+{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "UNKNOWN" }}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_cris.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_cris.json
new file mode 100644
index 0000000000..6f2b7aa7d1
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_cris.json
@@ -0,0 +1 @@
+{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire-cris_1.1" }}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_driver-openaire2.0.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_driver-openaire2.0.json
new file mode 100644
index 0000000000..d2e375f552
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_driver-openaire2.0.json
@@ -0,0 +1 @@
+{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "driver-openaire2.0" }}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_hostedby.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_hostedby.json
new file mode 100644
index 0000000000..03db887f52
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_hostedby.json
@@ -0,0 +1 @@
+{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "hostedBy" }}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_native.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_native.json
new file mode 100644
index 0000000000..7831a3fc37
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_native.json
@@ -0,0 +1 @@
+{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "native" }}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_notCompatible.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_notCompatible.json
new file mode 100644
index 0000000000..8dabe5d2c0
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_notCompatible.json
@@ -0,0 +1 @@
+{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "notCompatible" }}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0.json
new file mode 100644
index 0000000000..e2db479432
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0.json
@@ -0,0 +1 @@
+{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0" }}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0_data.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0_data.json
new file mode 100644
index 0000000000..b8480faf07
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0_data.json
@@ -0,0 +1 @@
+{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0_data" }}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire3.0.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire3.0.json
new file mode 100644
index 0000000000..43bb0a7a4b
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire3.0.json
@@ -0,0 +1 @@
+{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire3.0" }}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire4.0.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire4.0.json
new file mode 100644
index 0000000000..7cdba6a4e3
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire4.0.json
@@ -0,0 +1 @@
+{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire4.0" }}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala
index abac41b89a..705160a2bb 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala
@@ -47,6 +47,7 @@ object DLIToOAF {
"References" -> ("isRelatedTo", "relationship"),
"IsRelatedTo" -> ("isRelatedTo", "relationship"),
"IsSupplementedBy" -> ("isSupplementedBy", "supplement"),
+ "Documents"-> ("isRelatedTo", "relationship"),
"Cites" -> ("cites", "citation"),
"Unknown" -> ("isRelatedTo", "relationship"),
"IsSourceOf" -> ("isRelatedTo", "relationship"),
@@ -83,7 +84,7 @@ object DLIToOAF {
val rel_inverse: Map[String, String] = Map(
"isRelatedTo" -> "isRelatedTo",
- "IsSupplementedBy" -> "isSupplementTo",
+ "isSupplementedBy" -> "isSupplementTo",
"cites" -> "IsCitedBy",
"IsCitedBy" -> "cites",
"reviews" -> "IsReviewedBy"
@@ -273,29 +274,18 @@ object DLIToOAF {
}
-// def convertDLIRelation(r: DLIRelation): Relation = {
-//
-// val result = new Relation
-// if (!relationTypeMapping.contains(r.getRelType))
-// return null
-//
-// if (r.getProperties == null || r.getProperties.size() == 0 || (r.getProperties.size() == 1 && r.getProperties.get(0) == null))
-// return null
-// val t = relationTypeMapping.get(r.getRelType)
-//
-// result.setRelType("resultResult")
-// result.setRelClass(t.get._1)
-// result.setSubRelType(t.get._2)
-// result.setCollectedfrom(r.getProperties.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
-// result.setSource(generateId(r.getSource))
-// result.setTarget(generateId(r.getTarget))
-//
-// if (result.getSource.equals(result.getTarget))
-// return null
-// result.setDataInfo(generateDataInfo())
-//
-// result
-// }
+ def convertDLIRelation(r: Relation): Relation = {
+
+ val rt = r.getRelType
+ if (!relationTypeMapping.contains(rt))
+ return null
+ r.setRelType("resultResult")
+ r.setRelClass(relationTypeMapping(rt)._1)
+ r.setSubRelType(relationTypeMapping(rt)._2)
+ r.setSource(generateId(r.getSource))
+ r.setTarget(generateId(r.getTarget))
+ r
+ }
def convertDLIDatasetTOOAF(d: DLIDataset): Dataset = {
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala
index 6c6e2c8356..f1e374f954 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala
@@ -15,11 +15,13 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.codehaus.jackson.map.ObjectMapper
import scala.collection.mutable.ArrayBuffer
-
+import scala.collection.JavaConverters._
object SparkExportContentForOpenAire {
+
+
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkExportContentForOpenAire.getClass.getResourceAsStream("input_export_content_parameters.json")))
@@ -42,9 +44,11 @@ object SparkExportContentForOpenAire {
import spark.implicits._
-
val dsRel = spark.read.load(s"$workingPath/relation_b").as[Relation]
- dsRel.filter(r => r.getDataInfo==null || r.getDataInfo.getDeletedbyinference ==false).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS")
+ dsRel.filter(r => r.getDataInfo==null || r.getDataInfo.getDeletedbyinference ==false)
+ .map(DLIToOAF.convertDLIRelation)
+ .filter(r => r!= null)
+ .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS")
val dsPubs = spark.read.load(s"$workingPath/publication").as[DLIPublication]
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala
index 0bd746cffc..cb04cf9e95 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala
@@ -5,9 +5,7 @@ import java.time.format.DateTimeFormatter
import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
-import org.apache.spark.SparkConf
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
+
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.junit.jupiter.api.Test
@@ -23,6 +21,19 @@ class ExportDLITOOAFTest {
}
+
+ @Test
+ def testMappingRele():Unit = {
+
+ val r:Relation = new Relation
+ r.setSource("60|fbff1d424e045eecf24151a5fe3aa738")
+ r.setTarget("50|dedup_wf_001::ec409f09e63347d4e834087fe1483877")
+
+ val r1 =DLIToOAF.convertDLIRelation(r)
+ println(r1.getSource, r1.getTarget)
+
+ }
+
@Test
def testPublicationMapping():Unit = {