From 8d85a2fced371aa00f909a5d22f829409076abec Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 7 Oct 2020 16:28:52 +0200 Subject: [PATCH 1/6] [BETA wf only] datasources involved in the merge operation don't obey the infra precedence policy, but rely on a custom behaviour that, given two datasources from beta and prod, returns the one from prod with the highest compatibility among the two --- .../DatasourceCompatibilityComparator.java | 97 +++++++++++++++++++ .../oa/graph/merge/MergeGraphSparkJob.java | 37 ++++++- .../graph/merge/MergeGraphSparkJobTest.java | 84 ++++++++++++++++ .../oa/graph/merge/datasource_UNKNOWN.json | 1 + .../dhp/oa/graph/merge/datasource_cris.json | 1 + .../merge/datasource_driver-openaire2.0.json | 1 + .../oa/graph/merge/datasource_hostedby.json | 1 + .../dhp/oa/graph/merge/datasource_native.json | 1 + .../graph/merge/datasource_notCompatible.json | 1 + .../graph/merge/datasource_openaire2.0.json | 1 + .../merge/datasource_openaire2.0_data.json | 1 + .../graph/merge/datasource_openaire3.0.json | 1 + .../graph/merge/datasource_openaire4.0.json | 1 + 13 files changed, 226 insertions(+), 2 deletions(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/DatasourceCompatibilityComparator.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_UNKNOWN.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_cris.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_driver-openaire2.0.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_hostedby.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_native.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_notCompatible.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0_data.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire3.0.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire4.0.json diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/DatasourceCompatibilityComparator.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/DatasourceCompatibilityComparator.java new file mode 100644 index 000000000..59bdb3914 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/DatasourceCompatibilityComparator.java @@ -0,0 +1,97 @@ + +package eu.dnetlib.dhp.oa.graph.merge; + +import java.util.Comparator; + +import eu.dnetlib.dhp.schema.oaf.Qualifier; + +public class DatasourceCompatibilityComparator implements Comparator<Qualifier> { + @Override + public int compare(Qualifier left, Qualifier right) { + + String lClass = left.getClassid(); + String rClass = right.getClassid(); + + if (lClass.equals(rClass)) + return 0; + + if (lClass.equals("openaire-cris_1.1")) + return 
-1; + if (rClass.equals("openaire-cris_1.1")) + return 1; + + if (lClass.equals("openaire4.0")) + return -1; + if (rClass.equals("openaire4.0")) + return 1; + + if (lClass.equals("driver-openaire2.0")) + return -1; + if (rClass.equals("driver-openaire2.0")) + return 1; + + if (lClass.equals("driver")) + return -1; + if (rClass.equals("driver")) + return 1; + + if (lClass.equals("openaire2.0")) + return -1; + if (rClass.equals("openaire2.0")) + return 1; + + if (lClass.equals("openaire3.0")) + return -1; + if (rClass.equals("openaire3.0")) + return 1; + + if (lClass.equals("openaire2.0_data")) + return -1; + if (rClass.equals("openaire2.0_data")) + return 1; + + if (lClass.equals("native")) + return -1; + if (rClass.equals("native")) + return 1; + + if (lClass.equals("hostedBy")) + return -1; + if (rClass.equals("hostedBy")) + return 1; + + if (lClass.equals("notCompatible")) + return -1; + if (rClass.equals("notCompatible")) + return 1; + + if (lClass.equals("UNKNOWN")) + return -1; + if (rClass.equals("UNKNOWN")) + return 1; + + // Else (but unlikely), lexicographical ordering will do. + return lClass.compareTo(rClass); + } + + /* + * CASE WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY + * ['openaire-cris_1.1']) THEN 'openaire-cris_1.1@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT + * COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['openaire4.0']) THEN + * 'openaire4.0@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, + * a.compatibility):: TEXT) @> ARRAY ['driver', 'openaire2.0']) THEN + * 'driver-openaire2.0@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE + * (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['driver']) THEN + * 'driver@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, + * a.compatibility) :: TEXT) @> ARRAY ['openaire2.0']) THEN 'openaire2.0@@@dnet:datasourceCompatibilityLevel' WHEN + * (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['openaire3.0']) THEN + * 'openaire3.0@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, + * a.compatibility) :: TEXT) @> ARRAY ['openaire2.0_data']) THEN + * 'openaire2.0_data@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE + * (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['native']) THEN + * 'native@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, + * a.compatibility) :: TEXT) @> ARRAY ['hostedBy']) THEN 'hostedBy@@@dnet:datasourceCompatibilityLevel' WHEN + * (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['notCompatible']) + * THEN 'notCompatible@@@dnet:datasourceCompatibilityLevel' ELSE 'UNKNOWN@@@dnet:datasourceCompatibilityLevel' END + */ +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java index b723de955..fa18820ae 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java @@ -3,8 +3,9 @@ package eu.dnetlib.dhp.oa.graph.merge; import static 
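// Sketch, not part of the patch: how DatasourceCompatibilityComparator orders levels.
// Assumes only the eu.dnetlib.dhp.schema.oaf.Qualifier accessors used above
// (requires java.util.Arrays and java.util.Collections).
Qualifier prod = new Qualifier();
prod.setClassid("UNKNOWN");
Qualifier beta = new Qualifier();
beta.setClassid("openaire4.0");
// Stronger compatibility levels compare as "smaller" (the comparator returns -1
// for them first), so Collections.min picks the highest level of the pair:
Qualifier best = Collections.min(Arrays.asList(prod, beta), new DatasourceCompatibilityComparator());
// best.getClassid() -> "openaire4.0"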
eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.util.Objects; -import java.util.Optional; +import java.util.*; + +import javax.xml.crypto.Data; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -14,6 +15,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +41,14 @@ public class MergeGraphSparkJob { private static final String PRIORITY_DEFAULT = "BETA"; // BETA | PROD + private static final Datasource DATASOURCE = new Datasource(); + + static { + Qualifier compatibility = new Qualifier(); + compatibility.setClassid("UNKNOWN"); + DATASOURCE.setOpenairecompatibility(compatibility); + } + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils @@ -104,6 +114,10 @@ public class MergeGraphSparkJob { .map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> { Optional
<P>
p = Optional.ofNullable(value._1()).map(Tuple2::_2); Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2); + + if (p.orElse((P) b.orElse((B) DATASOURCE)) instanceof Datasource) { + return mergeDatasource(p, b); + } switch (priority) { default: case "BETA": @@ -119,6 +133,25 @@ public class MergeGraphSparkJob { .json(outputPath); } + protected static
<P extends Oaf, B extends Oaf>
P mergeDatasource(Optional
<P>
p, Optional<B> b) { + if (p.isPresent() & !b.isPresent()) { + return p.get(); + } + if (b.isPresent() & !p.isPresent()) { + return (P) b.get(); + } + if (!b.isPresent() & !p.isPresent()) { + return null; // unlikely, at least one should be produced by the join operation + } + + Datasource dp = (Datasource) p.get(); + Datasource db = (Datasource) b.get(); + + List<Qualifier> list = Arrays.asList(dp.getOpenairecompatibility(), db.getOpenairecompatibility()); + dp.setOpenairecompatibility(Collections.min(list, new DatasourceCompatibilityComparator())); + return (P) dp; + } + private static
<P extends Oaf, B extends Oaf>
P mergeWithPriorityToPROD(Optional
<P>
p, Optional b) { if (b.isPresent() & !p.isPresent()) { return (P) b.get(); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java new file mode 100644 index 000000000..28e8e5abc --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java @@ -0,0 +1,84 @@ + +package eu.dnetlib.dhp.oa.graph.merge; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.schema.oaf.Datasource; + +public class MergeGraphSparkJobTest { + + private ObjectMapper mapper; + + @BeforeEach + public void setUp() { + mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + @Test + public void testMergeDatasources() throws IOException { + assertEquals( + "openaire-cris_1.1", + MergeGraphSparkJob + .mergeDatasource( + d("datasource_cris.json"), + d("datasource_UNKNOWN.json")) + .getOpenairecompatibility() + .getClassid()); + assertEquals( + "openaire-cris_1.1", + MergeGraphSparkJob + .mergeDatasource( + d("datasource_UNKNOWN.json"), + d("datasource_cris.json")) + .getOpenairecompatibility() + .getClassid()); + assertEquals( + "driver-openaire2.0", + MergeGraphSparkJob + .mergeDatasource( + d("datasource_native.json"), + d("datasource_driver-openaire2.0.json")) + .getOpenairecompatibility() + .getClassid()); + assertEquals( + "driver-openaire2.0", + MergeGraphSparkJob + .mergeDatasource( + d("datasource_driver-openaire2.0.json"), + d("datasource_native.json")) + .getOpenairecompatibility() + .getClassid()); + assertEquals( + "openaire4.0", + MergeGraphSparkJob + .mergeDatasource( + d("datasource_notCompatible.json"), + d("datasource_openaire4.0.json")) + .getOpenairecompatibility() + .getClassid()); + assertEquals( + "notCompatible", + MergeGraphSparkJob + .mergeDatasource( + d("datasource_notCompatible.json"), + d("datasource_UNKNOWN.json")) + .getOpenairecompatibility() + .getClassid()); + } + + private Optional d(String file) throws IOException { + String json = IOUtils.toString(getClass().getResourceAsStream(file)); + return Optional.of(mapper.readValue(json, Datasource.class)); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_UNKNOWN.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_UNKNOWN.json new file mode 100644 index 000000000..a01085c8f --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_UNKNOWN.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "UNKNOWN" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_cris.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_cris.json new file mode 100644 index 000000000..6f2b7aa7d --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_cris.json @@ -0,0 +1 @@ +{ 
"id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire-cris_1.1" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_driver-openaire2.0.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_driver-openaire2.0.json new file mode 100644 index 000000000..d2e375f55 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_driver-openaire2.0.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "driver-openaire2.0" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_hostedby.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_hostedby.json new file mode 100644 index 000000000..03db887f5 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_hostedby.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "hostedBy" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_native.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_native.json new file mode 100644 index 000000000..7831a3fc3 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_native.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "native" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_notCompatible.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_notCompatible.json new file mode 100644 index 000000000..8dabe5d2c --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_notCompatible.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "notCompatible" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0.json new file mode 100644 index 000000000..e2db47943 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0_data.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0_data.json new file mode 100644 index 000000000..b8480faf0 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire2.0_data.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0_data" }} \ No newline 
at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire3.0.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire3.0.json new file mode 100644 index 000000000..43bb0a7a4 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire3.0.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire3.0" }} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire4.0.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire4.0.json new file mode 100644 index 000000000..7cdba6a4e --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/merge/datasource_openaire4.0.json @@ -0,0 +1 @@ +{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire4.0" }} \ No newline at end of file From a3f37a9414823d2743e0830a8f21b7289d62a572 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 7 Oct 2020 16:44:22 +0200 Subject: [PATCH 2/6] javadoc --- .../dhp/oa/graph/merge/MergeGraphSparkJob.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java index fa18820ae..037683604 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java @@ -133,6 +133,17 @@ public class MergeGraphSparkJob { .json(outputPath); } + /** + * Datasources involved in the merge operation don't obey the infra precedence policy, but rely on a custom + * behaviour that, given two datasources from beta and prod, returns the one from prod with the highest + * compatibility among the two. + * + * @param p datasource from PROD + * @param b datasource from BETA + * @param
<P>
Datasource class type from PROD + * @param <B> Datasource class type from BETA + * @return the datasource from PROD with the highest compatibility level. + */ protected static
<P extends Oaf, B extends Oaf>
P mergeDatasource(Optional
<P>
p, Optional b) { if (p.isPresent() & !b.isPresent()) { return p.get(); From fe0a7870e67f3f19ad55e715df2fe132770e54ca Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 8 Oct 2020 10:33:12 +0200 Subject: [PATCH 3/6] Added test to check if merge authors works --- .../SparkScholexplorerAggregationTest.scala | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerAggregationTest.scala diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerAggregationTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerAggregationTest.scala new file mode 100644 index 000000000..4d83057f2 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerAggregationTest.scala @@ -0,0 +1,54 @@ +package eu.dnetlib.dhp.sx.graph + +import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature} +import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication +import eu.dnetlib.dhp.sx.ebi.EBIAggregator +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +import scala.io.Source + +class SparkScholexplorerAggregationTest { + + + @Test + def testFunderRelationshipsMapping(): Unit = { + val publications = Source.fromInputStream(getClass.getResourceAsStream("publication.json")).mkString + + var s: List[DLIPublication] = List[DLIPublication]() + + val m: ObjectMapper = new ObjectMapper() + + m.enable(SerializationFeature.INDENT_OUTPUT) + + for (line <- publications.lines) { + s = m.readValue(line, classOf[DLIPublication]) :: s + + + } + + + implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication] + val spark: SparkSession = SparkSession.builder().appName("Test").master("local[*]").getOrCreate() + + + val ds: Dataset[DLIPublication] = spark.createDataset(spark.sparkContext.parallelize(s)).as[DLIPublication] + + val unique = ds.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getDLIPublicationAggregator().toColumn) + .map(p => p._2) + + val uniquePubs: DLIPublication = unique.first() + + s.foreach(pp => assertFalse(pp.getAuthor.isEmpty)) + + + assertNotNull(uniquePubs.getAuthor) + assertFalse(uniquePubs.getAuthor.isEmpty) + + + } + +} From eec418cd26d8724887f6f36cf7a98af6a8711c0a Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 8 Oct 2020 10:33:55 +0200 Subject: [PATCH 4/6] moved AuthoreMerger into dhp-common --- dhp-common/pom.xml | 11 ++ .../eu/dnetlib/dhp/oa/merge/AuthorMerger.java | 168 +++++++++++++++++ .../eu/dnetlib/dhp/oa/dedup/AuthorMerger.java | 170 ------------------ .../dhp/oa/dedup/DedupRecordFactory.java | 3 +- .../dhp/oa/dedup/EntityMergerTest.java | 1 + dhp-workflows/dhp-graph-mapper/pom.xml | 7 - .../eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala | 2 +- 7 files changed, 183 insertions(+), 179 deletions(-) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java delete mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AuthorMerger.java diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml index 1dc3208b5..b295bc1f1 100644 --- a/dhp-common/pom.xml +++ b/dhp-common/pom.xml @@ -92,6 +92,17 @@ com.squareup.okhttp3 okhttp + + + eu.dnetlib + dnet-pace-core + + + + eu.dnetlib.dhp + dhp-schemas + 
${project.version} + diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java new file mode 100644 index 000000000..bc86a0245 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java @@ -0,0 +1,168 @@ +package eu.dnetlib.dhp.oa.merge; +import java.text.Normalizer; +import java.util.*; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; + +import com.wcohen.ss.JaroWinkler; + +import eu.dnetlib.dhp.schema.oaf.Author; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.pace.model.Person; +import scala.Tuple2; + +public class AuthorMerger { + + private static final Double THRESHOLD = 0.95; + + public static List merge(List> authors) { + + authors.sort((o1, o2) -> -Integer.compare(countAuthorsPids(o1), countAuthorsPids(o2))); + + List author = new ArrayList<>(); + + for (List a : authors) { + author = mergeAuthor(author, a); + } + + return author; + + } + + public static List mergeAuthor(final List a, final List b) { + int pa = countAuthorsPids(a); + int pb = countAuthorsPids(b); + List base, enrich; + int sa = authorsSize(a); + int sb = authorsSize(b); + + if (pa == pb) { + base = sa > sb ? a : b; + enrich = sa > sb ? b : a; + } else { + base = pa > pb ? a : b; + enrich = pa > pb ? b : a; + } + enrichPidFromList(base, enrich); + return base; + } + + private static void enrichPidFromList(List base, List enrich) { + if (base == null || enrich == null) + return; + final Map basePidAuthorMap = base + .stream() + .filter(a -> a.getPid() != null && a.getPid().size() > 0) + .flatMap( + a -> a + .getPid() + .stream() + .map(p -> new Tuple2<>(pidToComparableString(p), a))) + .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1)); + + final List> pidToEnrich = enrich + .stream() + .filter(a -> a.getPid() != null && a.getPid().size() > 0) + .flatMap( + a -> a + .getPid() + .stream() + .filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p))) + .map(p -> new Tuple2<>(p, a))) + .collect(Collectors.toList()); + + pidToEnrich + .forEach( + a -> { + Optional> simAuthor = base + .stream() + .map(ba -> new Tuple2<>(sim(ba, a._2()), ba)) + .max(Comparator.comparing(Tuple2::_1)); + + if (simAuthor.isPresent()) { + double th = THRESHOLD; + // increase the threshold if the surname is too short + if (simAuthor.get()._2().getSurname() != null + && simAuthor.get()._2().getSurname().length() <= 3) + th = 0.99; + + if (simAuthor.get()._1() > th) { + Author r = simAuthor.get()._2(); + if (r.getPid() == null) { + r.setPid(new ArrayList<>()); + } + r.getPid().add(a._1()); + } + } + }); + } + + public static String pidToComparableString(StructuredProperty pid) { + return (pid.getQualifier() != null + ? pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase() : "" + : "") + + (pid.getValue() != null ? pid.getValue().toLowerCase() : ""); + } + + public static int countAuthorsPids(List authors) { + if (authors == null) + return 0; + + return (int) authors.stream().filter(AuthorMerger::hasPid).count(); + } + + private static int authorsSize(List authors) { + if (authors == null) + return 0; + return authors.size(); + } + + private static Double sim(Author a, Author b) { + + final Person pa = parse(a); + final Person pb = parse(b); + + // if both are accurate (e.g. 
they have name and surname) + if (pa.isAccurate() & pb.isAccurate()) { + return new JaroWinkler().score(normalize(pa.getSurnameString()), normalize(pb.getSurnameString())) * 0.5 + + new JaroWinkler().score(normalize(pa.getNameString()), normalize(pb.getNameString())) * 0.5; + } else { + return new JaroWinkler() + .score(normalize(pa.getNormalisedFullname()), normalize(pb.getNormalisedFullname())); + } + } + + private static boolean hasPid(Author a) { + if (a == null || a.getPid() == null || a.getPid().size() == 0) + return false; + return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue())); + } + + private static Person parse(Author author) { + if (StringUtils.isNotBlank(author.getSurname())) { + return new Person(author.getSurname() + ", " + author.getName(), false); + } else { + return new Person(author.getFullname(), false); + } + } + + private static String normalize(final String s) { + return nfd(s) + .toLowerCase() + // do not compact the regexes in a single expression, would cause StackOverflowError + // in case + // of large input strings + .replaceAll("(\\W)+", " ") + .replaceAll("(\\p{InCombiningDiacriticalMarks})+", " ") + .replaceAll("(\\p{Punct})+", " ") + .replaceAll("(\\d)+", " ") + .replaceAll("(\\n)+", " ") + .trim(); + } + + private static String nfd(final String s) { + return Normalizer.normalize(s, Normalizer.Form.NFD); + } + +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AuthorMerger.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AuthorMerger.java deleted file mode 100644 index ee5fd5165..000000000 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AuthorMerger.java +++ /dev/null @@ -1,170 +0,0 @@ - -package eu.dnetlib.dhp.oa.dedup; - -import java.text.Normalizer; -import java.util.*; -import java.util.stream.Collectors; - -import org.apache.commons.lang3.StringUtils; - -import com.wcohen.ss.JaroWinkler; - -import eu.dnetlib.dhp.schema.oaf.Author; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -import eu.dnetlib.pace.model.Person; -import scala.Tuple2; - -public class AuthorMerger { - - private static final Double THRESHOLD = 0.95; - - public static List merge(List> authors) { - - authors.sort((o1, o2) -> -Integer.compare(countAuthorsPids(o1), countAuthorsPids(o2))); - - List author = new ArrayList<>(); - - for (List a : authors) { - author = mergeAuthor(author, a); - } - - return author; - - } - - public static List mergeAuthor(final List a, final List b) { - int pa = countAuthorsPids(a); - int pb = countAuthorsPids(b); - List base, enrich; - int sa = authorsSize(a); - int sb = authorsSize(b); - - if (pa == pb) { - base = sa > sb ? a : b; - enrich = sa > sb ? b : a; - } else { - base = pa > pb ? a : b; - enrich = pa > pb ? 
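// Sketch, not part of the patch: typical use of the relocated AuthorMerger.
// Assumes the Author/StructuredProperty beans from eu.dnetlib.dhp.schema.oaf;
// the names and the ORCID value below are made-up placeholders.
Author withPid = new Author();
withPid.setFullname("Rossi, Mario");
StructuredProperty orcid = new StructuredProperty();
orcid.setValue("0000-0000-0000-0000");
withPid.setPid(Collections.singletonList(orcid));

Author withoutPid = new Author();
withoutPid.setFullname("Rossi, Mario");

// The list carrying more pids becomes the base; the other side only enriches it
// with pids attached to authors matched by name similarity (JaroWinkler over
// normalized names, threshold 0.95):
List<Author> merged = AuthorMerger.mergeAuthor(
    Collections.singletonList(withPid), Collections.singletonList(withoutPid));
// merged keeps the pid-bearing author list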
b : a; - } - enrichPidFromList(base, enrich); - return base; - } - - private static void enrichPidFromList(List base, List enrich) { - if (base == null || enrich == null) - return; - final Map basePidAuthorMap = base - .stream() - .filter(a -> a.getPid() != null && a.getPid().size() > 0) - .flatMap( - a -> a - .getPid() - .stream() - .map(p -> new Tuple2<>(pidToComparableString(p), a))) - .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1)); - - final List> pidToEnrich = enrich - .stream() - .filter(a -> a.getPid() != null && a.getPid().size() > 0) - .flatMap( - a -> a - .getPid() - .stream() - .filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p))) - .map(p -> new Tuple2<>(p, a))) - .collect(Collectors.toList()); - - pidToEnrich - .forEach( - a -> { - Optional> simAuthor = base - .stream() - .map(ba -> new Tuple2<>(sim(ba, a._2()), ba)) - .max(Comparator.comparing(Tuple2::_1)); - - if (simAuthor.isPresent()) { - double th = THRESHOLD; - // increase the threshold if the surname is too short - if (simAuthor.get()._2().getSurname() != null - && simAuthor.get()._2().getSurname().length() <= 3) - th = 0.99; - - if (simAuthor.get()._1() > th) { - Author r = simAuthor.get()._2(); - if (r.getPid() == null) { - r.setPid(new ArrayList<>()); - } - r.getPid().add(a._1()); - } - } - }); - } - - public static String pidToComparableString(StructuredProperty pid) { - return (pid.getQualifier() != null - ? pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase() : "" - : "") - + (pid.getValue() != null ? pid.getValue().toLowerCase() : ""); - } - - public static int countAuthorsPids(List authors) { - if (authors == null) - return 0; - - return (int) authors.stream().filter(AuthorMerger::hasPid).count(); - } - - private static int authorsSize(List authors) { - if (authors == null) - return 0; - return authors.size(); - } - - private static Double sim(Author a, Author b) { - - final Person pa = parse(a); - final Person pb = parse(b); - - // if both are accurate (e.g. 
they have name and surname) - if (pa.isAccurate() & pb.isAccurate()) { - return new JaroWinkler().score(normalize(pa.getSurnameString()), normalize(pb.getSurnameString())) * 0.5 - + new JaroWinkler().score(normalize(pa.getNameString()), normalize(pb.getNameString())) * 0.5; - } else { - return new JaroWinkler() - .score(normalize(pa.getNormalisedFullname()), normalize(pb.getNormalisedFullname())); - } - } - - private static boolean hasPid(Author a) { - if (a == null || a.getPid() == null || a.getPid().size() == 0) - return false; - return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue())); - } - - private static Person parse(Author author) { - if (StringUtils.isNotBlank(author.getSurname())) { - return new Person(author.getSurname() + ", " + author.getName(), false); - } else { - return new Person(author.getFullname(), false); - } - } - - private static String normalize(final String s) { - return nfd(s) - .toLowerCase() - // do not compact the regexes in a single expression, would cause StackOverflowError - // in case - // of large input strings - .replaceAll("(\\W)+", " ") - .replaceAll("(\\p{InCombiningDiacriticalMarks})+", " ") - .replaceAll("(\\p{Punct})+", " ") - .replaceAll("(\\d)+", " ") - .replaceAll("(\\n)+", " ") - .trim(); - } - - private static String nfd(final String s) { - return Normalizer.normalize(s, Normalizer.Form.NFD); - } - -} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index 8028d5a94..6a030f376 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -1,11 +1,12 @@ package eu.dnetlib.dhp.oa.dedup; -import java.io.Serializable; + import java.util.Collection; import java.util.Iterator; import java.util.List; +import eu.dnetlib.dhp.oa.merge.AuthorMerger; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java index 4fbd7c223..e00f6ac2a 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java @@ -10,6 +10,7 @@ import java.io.Serializable; import java.nio.file.Paths; import java.util.*; +import eu.dnetlib.dhp.oa.merge.AuthorMerger; import org.codehaus.jackson.map.ObjectMapper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml index 38c5c8af7..3e1d84c01 100644 --- a/dhp-workflows/dhp-graph-mapper/pom.xml +++ b/dhp-workflows/dhp-graph-mapper/pom.xml @@ -83,13 +83,6 @@ dhp-schemas ${project.version} - - - eu.dnetlib.dhp - dhp-dedup-openaire - ${project.version} - - com.jayway.jsonpath json-path diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala index 90d665e0c..ee2dbadfd 100644 --- 
a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala @@ -1,5 +1,5 @@ package eu.dnetlib.dhp.sx.ebi -import eu.dnetlib.dhp.oa.dedup.AuthorMerger +import eu.dnetlib.dhp.oa.merge.AuthorMerger import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset} import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown} import org.apache.spark.sql.{Encoder, Encoders} From 734934e2eb428b902f385edde5ec1264c27a5d73 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 8 Oct 2020 17:29:29 +0200 Subject: [PATCH 5/6] fixed error on empty intersection with publication and relation on export to OAF --- .../eu/dnetlib/dhp/oa/merge/AuthorMerger.java | 254 +++++++++--------- .../dhp/oa/dedup/DedupRecordFactory.java | 3 +- .../dhp/oa/dedup/EntityMergerTest.java | 2 +- .../java/eu/dnetlib/dhp/export/DLIToOAF.scala | 29 +- .../SparkExportContentForOpenAire.scala | 2 +- 5 files changed, 136 insertions(+), 154 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java index bc86a0245..3fa5fcbab 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java @@ -1,4 +1,6 @@ + package eu.dnetlib.dhp.oa.merge; + import java.text.Normalizer; import java.util.*; import java.util.stream.Collectors; @@ -14,155 +16,155 @@ import scala.Tuple2; public class AuthorMerger { - private static final Double THRESHOLD = 0.95; + private static final Double THRESHOLD = 0.95; - public static List merge(List> authors) { + public static List merge(List> authors) { - authors.sort((o1, o2) -> -Integer.compare(countAuthorsPids(o1), countAuthorsPids(o2))); + authors.sort((o1, o2) -> -Integer.compare(countAuthorsPids(o1), countAuthorsPids(o2))); - List author = new ArrayList<>(); + List author = new ArrayList<>(); - for (List a : authors) { - author = mergeAuthor(author, a); - } + for (List a : authors) { + author = mergeAuthor(author, a); + } - return author; + return author; - } + } - public static List mergeAuthor(final List a, final List b) { - int pa = countAuthorsPids(a); - int pb = countAuthorsPids(b); - List base, enrich; - int sa = authorsSize(a); - int sb = authorsSize(b); + public static List mergeAuthor(final List a, final List b) { + int pa = countAuthorsPids(a); + int pb = countAuthorsPids(b); + List base, enrich; + int sa = authorsSize(a); + int sb = authorsSize(b); - if (pa == pb) { - base = sa > sb ? a : b; - enrich = sa > sb ? b : a; - } else { - base = pa > pb ? a : b; - enrich = pa > pb ? b : a; - } - enrichPidFromList(base, enrich); - return base; - } + if (pa == pb) { + base = sa > sb ? a : b; + enrich = sa > sb ? b : a; + } else { + base = pa > pb ? a : b; + enrich = pa > pb ? 
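// base = the author list with more pids (on a tie, the longer list); enrich = the other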
b : a; + } + enrichPidFromList(base, enrich); + return base; + } - private static void enrichPidFromList(List base, List enrich) { - if (base == null || enrich == null) - return; - final Map basePidAuthorMap = base - .stream() - .filter(a -> a.getPid() != null && a.getPid().size() > 0) - .flatMap( - a -> a - .getPid() - .stream() - .map(p -> new Tuple2<>(pidToComparableString(p), a))) - .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1)); + private static void enrichPidFromList(List base, List enrich) { + if (base == null || enrich == null) + return; + final Map basePidAuthorMap = base + .stream() + .filter(a -> a.getPid() != null && a.getPid().size() > 0) + .flatMap( + a -> a + .getPid() + .stream() + .map(p -> new Tuple2<>(pidToComparableString(p), a))) + .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1)); - final List> pidToEnrich = enrich - .stream() - .filter(a -> a.getPid() != null && a.getPid().size() > 0) - .flatMap( - a -> a - .getPid() - .stream() - .filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p))) - .map(p -> new Tuple2<>(p, a))) - .collect(Collectors.toList()); + final List> pidToEnrich = enrich + .stream() + .filter(a -> a.getPid() != null && a.getPid().size() > 0) + .flatMap( + a -> a + .getPid() + .stream() + .filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p))) + .map(p -> new Tuple2<>(p, a))) + .collect(Collectors.toList()); - pidToEnrich - .forEach( - a -> { - Optional> simAuthor = base - .stream() - .map(ba -> new Tuple2<>(sim(ba, a._2()), ba)) - .max(Comparator.comparing(Tuple2::_1)); + pidToEnrich + .forEach( + a -> { + Optional> simAuthor = base + .stream() + .map(ba -> new Tuple2<>(sim(ba, a._2()), ba)) + .max(Comparator.comparing(Tuple2::_1)); - if (simAuthor.isPresent()) { - double th = THRESHOLD; - // increase the threshold if the surname is too short - if (simAuthor.get()._2().getSurname() != null - && simAuthor.get()._2().getSurname().length() <= 3) - th = 0.99; + if (simAuthor.isPresent()) { + double th = THRESHOLD; + // increase the threshold if the surname is too short + if (simAuthor.get()._2().getSurname() != null + && simAuthor.get()._2().getSurname().length() <= 3) + th = 0.99; - if (simAuthor.get()._1() > th) { - Author r = simAuthor.get()._2(); - if (r.getPid() == null) { - r.setPid(new ArrayList<>()); - } - r.getPid().add(a._1()); - } - } - }); - } + if (simAuthor.get()._1() > th) { + Author r = simAuthor.get()._2(); + if (r.getPid() == null) { + r.setPid(new ArrayList<>()); + } + r.getPid().add(a._1()); + } + } + }); + } - public static String pidToComparableString(StructuredProperty pid) { - return (pid.getQualifier() != null - ? pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase() : "" - : "") - + (pid.getValue() != null ? pid.getValue().toLowerCase() : ""); - } + public static String pidToComparableString(StructuredProperty pid) { + return (pid.getQualifier() != null + ? pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase() : "" + : "") + + (pid.getValue() != null ? 
pid.getValue().toLowerCase() : ""); + } - public static int countAuthorsPids(List authors) { - if (authors == null) - return 0; + public static int countAuthorsPids(List authors) { + if (authors == null) + return 0; - return (int) authors.stream().filter(AuthorMerger::hasPid).count(); - } + return (int) authors.stream().filter(AuthorMerger::hasPid).count(); + } - private static int authorsSize(List authors) { - if (authors == null) - return 0; - return authors.size(); - } + private static int authorsSize(List authors) { + if (authors == null) + return 0; + return authors.size(); + } - private static Double sim(Author a, Author b) { + private static Double sim(Author a, Author b) { - final Person pa = parse(a); - final Person pb = parse(b); + final Person pa = parse(a); + final Person pb = parse(b); - // if both are accurate (e.g. they have name and surname) - if (pa.isAccurate() & pb.isAccurate()) { - return new JaroWinkler().score(normalize(pa.getSurnameString()), normalize(pb.getSurnameString())) * 0.5 - + new JaroWinkler().score(normalize(pa.getNameString()), normalize(pb.getNameString())) * 0.5; - } else { - return new JaroWinkler() - .score(normalize(pa.getNormalisedFullname()), normalize(pb.getNormalisedFullname())); - } - } + // if both are accurate (e.g. they have name and surname) + if (pa.isAccurate() & pb.isAccurate()) { + return new JaroWinkler().score(normalize(pa.getSurnameString()), normalize(pb.getSurnameString())) * 0.5 + + new JaroWinkler().score(normalize(pa.getNameString()), normalize(pb.getNameString())) * 0.5; + } else { + return new JaroWinkler() + .score(normalize(pa.getNormalisedFullname()), normalize(pb.getNormalisedFullname())); + } + } - private static boolean hasPid(Author a) { - if (a == null || a.getPid() == null || a.getPid().size() == 0) - return false; - return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue())); - } + private static boolean hasPid(Author a) { + if (a == null || a.getPid() == null || a.getPid().size() == 0) + return false; + return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue())); + } - private static Person parse(Author author) { - if (StringUtils.isNotBlank(author.getSurname())) { - return new Person(author.getSurname() + ", " + author.getName(), false); - } else { - return new Person(author.getFullname(), false); - } - } + private static Person parse(Author author) { + if (StringUtils.isNotBlank(author.getSurname())) { + return new Person(author.getSurname() + ", " + author.getName(), false); + } else { + return new Person(author.getFullname(), false); + } + } - private static String normalize(final String s) { - return nfd(s) - .toLowerCase() - // do not compact the regexes in a single expression, would cause StackOverflowError - // in case - // of large input strings - .replaceAll("(\\W)+", " ") - .replaceAll("(\\p{InCombiningDiacriticalMarks})+", " ") - .replaceAll("(\\p{Punct})+", " ") - .replaceAll("(\\d)+", " ") - .replaceAll("(\\n)+", " ") - .trim(); - } + private static String normalize(final String s) { + return nfd(s) + .toLowerCase() + // do not compact the regexes in a single expression, would cause StackOverflowError + // in case + // of large input strings + .replaceAll("(\\W)+", " ") + .replaceAll("(\\p{InCombiningDiacriticalMarks})+", " ") + .replaceAll("(\\p{Punct})+", " ") + .replaceAll("(\\d)+", " ") + .replaceAll("(\\n)+", " ") + .trim(); + } - private static String nfd(final String s) { - return Normalizer.normalize(s, Normalizer.Form.NFD); - } + 
private static String nfd(final String s) { + return Normalizer.normalize(s, Normalizer.Form.NFD); + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index 6a030f376..f7e2e9ae0 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -1,12 +1,10 @@ package eu.dnetlib.dhp.oa.dedup; - import java.util.Collection; import java.util.Iterator; import java.util.List; -import eu.dnetlib.dhp.oa.merge.AuthorMerger; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; @@ -19,6 +17,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java index e00f6ac2a..30cfebe79 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java @@ -10,11 +10,11 @@ import java.io.Serializable; import java.nio.file.Paths; import java.util.*; -import eu.dnetlib.dhp.oa.merge.AuthorMerger; import org.codehaus.jackson.map.ObjectMapper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.pace.util.MapDocumentUtil; import scala.Tuple2; diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala index abac41b89..625ccdde9 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala @@ -272,30 +272,11 @@ object DLIToOAF { result } - -// def convertDLIRelation(r: DLIRelation): Relation = { -// -// val result = new Relation -// if (!relationTypeMapping.contains(r.getRelType)) -// return null -// -// if (r.getProperties == null || r.getProperties.size() == 0 || (r.getProperties.size() == 1 && r.getProperties.get(0) == null)) -// return null -// val t = relationTypeMapping.get(r.getRelType) -// -// result.setRelType("resultResult") -// result.setRelClass(t.get._1) -// result.setSubRelType(t.get._2) -// result.setCollectedfrom(r.getProperties.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava) -// result.setSource(generateId(r.getSource)) -// result.setTarget(generateId(r.getTarget)) -// -// if (result.getSource.equals(result.getTarget)) -// return null -// result.setDataInfo(generateDataInfo()) -// -// result -// } + def convertDLIRelation(r: Relation): Relation = { + r.setSource(r.getSource.replaceFirst("50|","50|scholix_____::" ).replaceFirst("60|", "60|scholix_____::")) + 
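// Note on the line above: String.replaceFirst takes a regex, and the unescaped "|"
// makes "50|" match just "50" (or the empty string), so the literal prefix would
// need escaping ("50\\|"); patch 6/6 below reworks convertDLIRelation to use
// generateId(...) instead.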
r.setTarget(r.getTarget.replaceFirst("50|","50|scholix_____::" ).replaceFirst("60|", "60|scholix_____::")) + r + } def convertDLIDatasetTOOAF(d: DLIDataset): Dataset = { diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala index 6c6e2c835..6a6140d15 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala @@ -44,7 +44,7 @@ object SparkExportContentForOpenAire { val dsRel = spark.read.load(s"$workingPath/relation_b").as[Relation] - dsRel.filter(r => r.getDataInfo==null || r.getDataInfo.getDeletedbyinference ==false).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS") + dsRel.filter(r => r.getDataInfo==null || r.getDataInfo.getDeletedbyinference ==false).map(DLIToOAF.convertDLIRelation).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS") val dsPubs = spark.read.load(s"$workingPath/publication").as[DLIPublication] From 34bf64c94fb37c1f947147d5d09149b734a39474 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 13 Oct 2020 08:47:58 +0200 Subject: [PATCH 6/6] fixed export Scholexplorer to OpenAire --- .../eu/dnetlib/dhp/doiboost/QueryTest.scala | 37 ++++++++++++------- .../java/eu/dnetlib/dhp/export/DLIToOAF.scala | 15 ++++++-- .../SparkExportContentForOpenAire.scala | 10 +++-- .../dhp/export/ExportDLITOOAFTest.scala | 17 +++++++-- 4 files changed, 56 insertions(+), 23 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala index f23996420..243719549 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala @@ -1,17 +1,13 @@ package eu.dnetlib.dhp.doiboost -import eu.dnetlib.dhp.schema.oaf.Project + +import eu.dnetlib.dhp.schema.oaf.Publication import org.apache.spark.SparkContext -import org.apache.spark.sql.functions.{col, sum} -import org.apache.hadoop.io.Text -import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession} -import org.codehaus.jackson.map.ObjectMapper -import org.json4s.DefaultFormats +import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig} import org.json4s import org.json4s.DefaultFormats -import org.json4s.JsonAST._ import org.json4s.jackson.JsonMethods._ -import scala.:: + import scala.collection.JavaConverters._ class QueryTest { @@ -27,19 +23,32 @@ class QueryTest { } + def hasInstanceWithUrl(p:Publication):Boolean = { + val c = p.getInstance.asScala.map(i => i.getUrl!= null && !i.getUrl.isEmpty).size + !(!p.getInstance.isEmpty && c == p.getInstance().size) + } + + + def hasNullAccessRights(p:Publication):Boolean = { + val c = p.getInstance.asScala.map(i => i.getAccessright!= null && i.getAccessright.getClassname.nonEmpty).size + !p.getInstance.isEmpty && c == p.getInstance().size() + } + def myQuery(spark:SparkSession, sc:SparkContext): Unit = { - implicit val mapEncoderPub: Encoder[Project] = Encoders.kryo[Project] + implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] + + val mapper = new 
ObjectMapper() + mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT) -// val ds:Dataset[Project] = spark.createDataset(sc.sequenceFile("", classOf[Text], classOf[Text]) -// .map(_._2.toString) -// .map(s => new ObjectMapper().readValue(s, classOf[Project]))) -// -// ds.write.saveAsTable() + val ds:Dataset[Publication] = spark.read.load("/tmp/p").as[Publication] + ds.filter(p =>p.getBestaccessright!= null && p.getBestaccessright.getClassname.nonEmpty).count() + + } } diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala index 625ccdde9..705160a2b 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala @@ -47,6 +47,7 @@ object DLIToOAF { "References" -> ("isRelatedTo", "relationship"), "IsRelatedTo" -> ("isRelatedTo", "relationship"), "IsSupplementedBy" -> ("isSupplementedBy", "supplement"), + "Documents"-> ("isRelatedTo", "relationship"), "Cites" -> ("cites", "citation"), "Unknown" -> ("isRelatedTo", "relationship"), "IsSourceOf" -> ("isRelatedTo", "relationship"), @@ -83,7 +84,7 @@ object DLIToOAF { val rel_inverse: Map[String, String] = Map( "isRelatedTo" -> "isRelatedTo", - "IsSupplementedBy" -> "isSupplementTo", + "isSupplementedBy" -> "isSupplementTo", "cites" -> "IsCitedBy", "IsCitedBy" -> "cites", "reviews" -> "IsReviewedBy" @@ -272,9 +273,17 @@ object DLIToOAF { result } + def convertDLIRelation(r: Relation): Relation = { - r.setSource(r.getSource.replaceFirst("50|","50|scholix_____::" ).replaceFirst("60|", "60|scholix_____::")) - r.setTarget(r.getTarget.replaceFirst("50|","50|scholix_____::" ).replaceFirst("60|", "60|scholix_____::")) + + val rt = r.getRelType + if (!relationTypeMapping.contains(rt)) + return null + r.setRelType("resultResult") + r.setRelClass(relationTypeMapping(rt)._1) + r.setSubRelType(relationTypeMapping(rt)._2) + r.setSource(generateId(r.getSource)) + r.setTarget(generateId(r.getTarget)) r } diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala index 6a6140d15..f1e374f95 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala @@ -15,11 +15,13 @@ import org.apache.spark.{SparkConf, SparkContext} import org.codehaus.jackson.map.ObjectMapper import scala.collection.mutable.ArrayBuffer - +import scala.collection.JavaConverters._ object SparkExportContentForOpenAire { + + def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() val parser = new ArgumentApplicationParser(IOUtils.toString(SparkExportContentForOpenAire.getClass.getResourceAsStream("input_export_content_parameters.json"))) @@ -42,9 +44,11 @@ object SparkExportContentForOpenAire { import spark.implicits._ - val dsRel = spark.read.load(s"$workingPath/relation_b").as[Relation] - dsRel.filter(r => r.getDataInfo==null || r.getDataInfo.getDeletedbyinference 
==false).map(DLIToOAF.convertDLIRelation).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS") + dsRel.filter(r => r.getDataInfo==null || r.getDataInfo.getDeletedbyinference ==false) + .map(DLIToOAF.convertDLIRelation) + .filter(r => r!= null) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS") val dsPubs = spark.read.load(s"$workingPath/publication").as[DLIPublication] diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala index 0bd746cff..cb04cf9e9 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala @@ -5,9 +5,7 @@ import java.time.format.DateTimeFormatter import eu.dnetlib.dhp.schema.oaf.Relation import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication} -import org.apache.spark.SparkConf -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SparkSession + import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig} import org.junit.jupiter.api.Test @@ -23,6 +21,19 @@ class ExportDLITOOAFTest { } + + @Test + def testMappingRele():Unit = { + + val r:Relation = new Relation + r.setSource("60|fbff1d424e045eecf24151a5fe3aa738") + r.setTarget("50|dedup_wf_001::ec409f09e63347d4e834087fe1483877") + + val r1 =DLIToOAF.convertDLIRelation(r) + println(r1.getSource, r1.getTarget) + + } + @Test def testPublicationMapping():Unit = {