From 2c126ed014b2e0b032cb5eb9359acfbb5655765c Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 22 Dec 2021 16:03:41 +0100 Subject: [PATCH 1/4] [BipFinder] create unresolved entities with measures at the level of the instance --- .../createunresolvedentities/PrepareBipFinder.java | 10 +++++++--- .../createunresolvedentities/SparkSaveUnresolved.java | 4 ++-- .../createunresolvedentities/PrepareTest.java | 8 +++++++- .../createunresolvedentities/ProduceTest.java | 6 ++++-- 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java index c7049430e..ab740a024 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java @@ -6,10 +6,12 @@ import static eu.dnetlib.dhp.actionmanager.bipmodel.Constants.UPDATE_CLASS_NAME; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import eu.dnetlib.dhp.schema.oaf.Instance; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -39,7 +41,7 @@ public class PrepareBipFinder implements Serializable { private static final Logger log = LoggerFactory.getLogger(PrepareBipFinder.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(String[] args) throws Exception { + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( @@ -75,7 +77,7 @@ public class PrepareBipFinder implements Serializable { }); } - private static void prepareResults(SparkSession spark, String inputPath, String outputPath) { + private static void prepareResults(SparkSession spark, String inputPath, String outputPath) { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -95,7 +97,9 @@ public class PrepareBipFinder implements Serializable { Result r = new Result(); r.setId(DHPUtils.generateUnresolvedIdentifier(v.getId(), DOI)); - r.setMeasures(getMeasure(v)); + Instance inst = new Instance(); + inst.setMeasures(getMeasure(v)); + r.setInstance(Arrays.asList(inst)); return r; }, Encoders.bean(Result.class)) .write() diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java index 918f6a417..1a115e18f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java @@ -20,7 +20,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Result; public class SparkSaveUnresolved implements Serializable { - private static final Logger log = LoggerFactory.getLogger(PrepareFOSSparkJob.class); + private static final Logger log = LoggerFactory.getLogger(SparkSaveUnresolved.class); public static void main(String[] args) throws Exception { @@ -64,7 +64,7 @@ public class SparkSaveUnresolved implements Serializable { .map( (MapFunction) l -> OBJECT_MAPPER.readValue(l, Result.class), Encoders.bean(Result.class)) - .groupByKey((MapFunction) r -> r.getId(), Encoders.STRING()) + .groupByKey((MapFunction) Result::getId, Encoders.STRING()) .mapGroups((MapGroupsFunction) (k, it) -> { Result ret = it.next(); it.forEachRemaining(r -> ret.mergeFrom(r)); diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java index c48ccc8c2..eb9956d80 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java @@ -96,13 +96,15 @@ public class PrepareTest { String doi1 = "unresolved::10.0000/096020199389707::doi"; Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).count()); - Assertions.assertEquals(3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getMeasures().size()); + Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().size()); + Assertions.assertEquals(3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().get(0).getMeasures().size()); Assertions .assertEquals( "6.34596412687e-09", tmp .filter(r -> r.getId().equals(doi1)) .collect() .get(0) + .getInstance().get(0) .getMeasures() .stream() .filter(sl -> sl.getId().equals("influence")) @@ -117,6 +119,8 @@ public class PrepareTest { .filter(r -> r.getId().equals(doi1)) .collect() .get(0) + .getInstance() + .get(0) .getMeasures() .stream() .filter(sl -> sl.getId().equals("popularity_alt")) @@ -131,6 +135,8 @@ public class PrepareTest { .filter(r -> r.getId().equals(doi1)) .collect() .get(0) + .getInstance() + .get(0) .getMeasures() .stream() .filter(sl -> sl.getId().equals("popularity")) diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java index b77b5bb36..4522cbf07 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java @@ -126,6 +126,7 @@ public class ProduceTest { .filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi")) .collect() .get(0) + .getInstance().get(0) .getMeasures() .size()); @@ -179,7 +180,8 @@ public class ProduceTest { List measures = tmp .filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi")) - .flatMap(row -> row.getMeasures().iterator()) + .flatMap(row -> row.getInstance().iterator()) + .flatMap(inst -> inst.getMeasures().iterator()) .collect(); Assertions .assertEquals( @@ -226,7 +228,7 @@ public class ProduceTest { 85, tmp .filter(row -> !row.getId().equals("unresolved::10.3390/s18072310::doi")) - .filter(r -> r.getMeasures() != null) + .filter(r -> r.getInstance() != null) .count()); } From 813f856d3ffcb3d2b2d6afd9c18175ab6dd595b8 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 22 Dec 2021 16:11:12 +0100 Subject: [PATCH 2/4] [BipFinder] removing left over parameter in wf --- .../dhp/actionmanager/bipfinder/oozie_app/workflow.xml | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml index 5b97a3d1a..fdf7129d6 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml @@ -1,9 +1,5 @@ - - inputPath - the input path of the resources to be extended - bipScorePath @@ -107,8 +103,6 @@ - - - + \ No newline at end of file From 20ef1d657ff93682a376145bd66f0e0dab0ac81e Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 22 Dec 2021 16:26:36 +0100 Subject: [PATCH 3/4] refactoring --- .../createunresolvedentities/PrepareBipFinder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java index ab740a024..8d89d958d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java @@ -11,7 +11,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.oaf.Instance; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -30,6 +29,7 @@ import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Instance; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Measure; import eu.dnetlib.dhp.schema.oaf.Result; @@ -41,7 +41,7 @@ public class PrepareBipFinder implements Serializable { private static final Logger log = LoggerFactory.getLogger(PrepareBipFinder.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(String[] args) throws Exception { + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( From 34ac56565d33bfd4781036593bd649c05a5385d8 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 22 Dec 2021 16:28:11 +0100 Subject: [PATCH 4/4] refactoring --- .../bipfinder/oozie_app/workflow.xml | 2 +- .../createunresolvedentities/PrepareTest.java | 15 +++++++++------ .../createunresolvedentities/ProduceTest.java | 5 +++-- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml index fdf7129d6..a6fba853d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml @@ -103,6 +103,6 @@ - + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java index eb9956d80..125aa60fe 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java @@ -97,14 +97,17 @@ public class PrepareTest { Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).count()); Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().size()); - Assertions.assertEquals(3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().get(0).getMeasures().size()); + Assertions + .assertEquals( + 3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().get(0).getMeasures().size()); Assertions .assertEquals( "6.34596412687e-09", tmp .filter(r -> r.getId().equals(doi1)) .collect() .get(0) - .getInstance().get(0) + .getInstance() + .get(0) .getMeasures() .stream() .filter(sl -> sl.getId().equals("influence")) @@ -119,8 +122,8 @@ public class PrepareTest { .filter(r -> r.getId().equals(doi1)) .collect() .get(0) - .getInstance() - .get(0) + .getInstance() + .get(0) .getMeasures() .stream() .filter(sl -> sl.getId().equals("popularity_alt")) @@ -135,8 +138,8 @@ public class PrepareTest { .filter(r -> r.getId().equals(doi1)) .collect() .get(0) - .getInstance() - .get(0) + .getInstance() + .get(0) .getMeasures() .stream() .filter(sl -> sl.getId().equals("popularity")) diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java index 4522cbf07..36417f614 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java @@ -126,7 +126,8 @@ public class ProduceTest { .filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi")) .collect() .get(0) - .getInstance().get(0) + .getInstance() + .get(0) .getMeasures() .size()); @@ -181,7 +182,7 @@ public class ProduceTest { List measures = tmp .filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi")) .flatMap(row -> row.getInstance().iterator()) - .flatMap(inst -> inst.getMeasures().iterator()) + .flatMap(inst -> inst.getMeasures().iterator()) .collect(); Assertions .assertEquals(