diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java index c7049430e..ab740a024 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java @@ -6,10 +6,12 @@ import static eu.dnetlib.dhp.actionmanager.bipmodel.Constants.UPDATE_CLASS_NAME; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import eu.dnetlib.dhp.schema.oaf.Instance; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -39,7 +41,7 @@ public class PrepareBipFinder implements Serializable { private static final Logger log = LoggerFactory.getLogger(PrepareBipFinder.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(String[] args) throws Exception { + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( @@ -75,7 +77,7 @@ public class PrepareBipFinder implements Serializable { }); } - private static void prepareResults(SparkSession spark, String inputPath, String outputPath) { + private static void prepareResults(SparkSession spark, String inputPath, String outputPath) { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -95,7 +97,9 @@ public class PrepareBipFinder implements Serializable { Result r = new Result(); r.setId(DHPUtils.generateUnresolvedIdentifier(v.getId(), DOI)); - r.setMeasures(getMeasure(v)); + Instance inst = new Instance(); + inst.setMeasures(getMeasure(v)); + r.setInstance(Arrays.asList(inst)); return r; }, Encoders.bean(Result.class)) .write() diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java index 918f6a417..1a115e18f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java @@ -20,7 +20,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Result; public class SparkSaveUnresolved implements Serializable { - private static final Logger log = LoggerFactory.getLogger(PrepareFOSSparkJob.class); + private static final Logger log = LoggerFactory.getLogger(SparkSaveUnresolved.class); public static void main(String[] args) throws Exception { @@ -64,7 +64,7 @@ public class SparkSaveUnresolved implements Serializable { .map( (MapFunction) l -> OBJECT_MAPPER.readValue(l, Result.class), Encoders.bean(Result.class)) - .groupByKey((MapFunction) r -> r.getId(), Encoders.STRING()) + .groupByKey((MapFunction) Result::getId, Encoders.STRING()) .mapGroups((MapGroupsFunction) (k, it) -> { Result ret = it.next(); it.forEachRemaining(r -> ret.mergeFrom(r)); diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java index c48ccc8c2..eb9956d80 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java @@ -96,13 +96,15 @@ public class PrepareTest { String doi1 = "unresolved::10.0000/096020199389707::doi"; Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).count()); - Assertions.assertEquals(3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getMeasures().size()); + Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().size()); + Assertions.assertEquals(3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().get(0).getMeasures().size()); Assertions .assertEquals( "6.34596412687e-09", tmp .filter(r -> r.getId().equals(doi1)) .collect() .get(0) + .getInstance().get(0) .getMeasures() .stream() .filter(sl -> sl.getId().equals("influence")) @@ -117,6 +119,8 @@ public class PrepareTest { .filter(r -> r.getId().equals(doi1)) .collect() .get(0) + .getInstance() + .get(0) .getMeasures() .stream() .filter(sl -> sl.getId().equals("popularity_alt")) @@ -131,6 +135,8 @@ public class PrepareTest { .filter(r -> r.getId().equals(doi1)) .collect() .get(0) + .getInstance() + .get(0) .getMeasures() .stream() .filter(sl -> sl.getId().equals("popularity")) diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java index b77b5bb36..4522cbf07 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java @@ -126,6 +126,7 @@ public class ProduceTest { .filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi")) .collect() .get(0) + .getInstance().get(0) .getMeasures() .size()); @@ -179,7 +180,8 @@ public class ProduceTest { List measures = tmp .filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi")) - .flatMap(row -> row.getMeasures().iterator()) + .flatMap(row -> row.getInstance().iterator()) + .flatMap(inst -> inst.getMeasures().iterator()) .collect(); Assertions .assertEquals( @@ -226,7 +228,7 @@ public class ProduceTest { 85, tmp .filter(row -> !row.getId().equals("unresolved::10.3390/s18072310::doi")) - .filter(r -> r.getMeasures() != null) + .filter(r -> r.getInstance() != null) .count()); }