diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java index 3d68db27b..8daf7bc6a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java @@ -6,12 +6,13 @@ import static eu.dnetlib.dhp.actionmanager.createunresolvedentities.Constants.UP import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.hdfs.client.HdfsUtils; + import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -29,6 +30,7 @@ import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.BipScore; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Instance; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Measure; import eu.dnetlib.dhp.schema.oaf.Result; @@ -40,7 +42,7 @@ public class PrepareBipFinder implements Serializable { private static final Logger log = LoggerFactory.getLogger(PrepareBipFinder.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(String[] args) throws Exception { + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( @@ -76,7 +78,7 @@ public class PrepareBipFinder implements Serializable { }); } - private static void prepareResults(SparkSession spark, String inputPath, String outputPath) { + private static void prepareResults(SparkSession spark, String inputPath, String outputPath) { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -93,9 +95,10 @@ public class PrepareBipFinder implements Serializable { }).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class)) .map((MapFunction) v -> { Result r = new Result(); - r.setId(DHPUtils.generateUnresolvedIdentifier(v.getId(), DOI)); - r.setMeasures(getMeasure(v)); + Instance inst = new Instance(); + inst.setMeasures(getMeasure(v)); + r.setInstance(Arrays.asList(inst)); return r; }, Encoders.bean(Result.class)) .write() diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java index 5ae2f8c88..ba36a2a1f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java @@ -55,13 +55,13 @@ public class PrepareFOSSparkJob implements Serializable { runWithSparkSession( conf, isSparkSessionManaged, - spark -> { + spark -> distributeFOSdois( spark, sourcePath, - outputPath); - }); + outputPath) + ); } private static void distributeFOSdois(SparkSession spark, String sourcePath, String outputPath) { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java index 62b813602..719aa9deb 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java @@ -20,13 +20,13 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Result; public class SparkSaveUnresolved implements Serializable { - private static final Logger log = LoggerFactory.getLogger(PrepareFOSSparkJob.class); + private static final Logger log = LoggerFactory.getLogger(SparkSaveUnresolved.class); public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( - PrepareFOSSparkJob.class + SparkSaveUnresolved.class .getResourceAsStream( "/eu/dnetlib/dhp/actionmanager/createunresolvedentities/produce_unresolved_parameters.json")); @@ -47,13 +47,13 @@ public class SparkSaveUnresolved implements Serializable { runWithSparkSession( conf, isSparkSessionManaged, - spark -> { + spark -> saveUnresolved( spark, sourcePath, - outputPath); - }); + outputPath) + ); } private static void saveUnresolved(SparkSession spark, String sourcePath, String outputPath) { @@ -64,7 +64,7 @@ public class SparkSaveUnresolved implements Serializable { .map( (MapFunction) l -> OBJECT_MAPPER.readValue(l, Result.class), Encoders.bean(Result.class)) - .groupByKey((MapFunction) r -> r.getId(), Encoders.STRING()) + .groupByKey((MapFunction)Result::getId, Encoders.STRING()) .mapGroups((MapGroupsFunction) (k, it) -> { Result ret = it.next(); it.forEachRemaining(r -> ret.mergeFrom(r)); diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java index c48ccc8c2..df3be7c57 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java @@ -96,13 +96,16 @@ public class PrepareTest { String doi1 = "unresolved::10.0000/096020199389707::doi"; Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).count()); - Assertions.assertEquals(3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getMeasures().size()); + Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().size()); + Assertions.assertEquals(3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().get(0).getMeasures().size()); Assertions .assertEquals( "6.34596412687e-09", tmp .filter(r -> r.getId().equals(doi1)) .collect() .get(0) + .getInstance() + .get(0) .getMeasures() .stream() .filter(sl -> sl.getId().equals("influence")) @@ -117,6 +120,8 @@ public class PrepareTest { .filter(r -> r.getId().equals(doi1)) .collect() .get(0) + .getInstance() + .get(0) .getMeasures() .stream() .filter(sl -> sl.getId().equals("popularity_alt")) @@ -131,6 +136,8 @@ public class PrepareTest { .filter(r -> r.getId().equals(doi1)) .collect() .get(0) + .getInstance() + .get(0) .getMeasures() .stream() .filter(sl -> sl.getId().equals("popularity")) diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java index b77b5bb36..19726aaf2 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java @@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; -public class ProduceTest { +class ProduceTest { private static final Logger log = LoggerFactory.getLogger(ProduceTest.class); private static Path workingDir; @@ -126,6 +126,8 @@ public class ProduceTest { .filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi")) .collect() .get(0) + .getInstance() + .get(0) .getMeasures() .size()); @@ -179,7 +181,8 @@ public class ProduceTest { List measures = tmp .filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi")) - .flatMap(row -> row.getMeasures().iterator()) + .flatMap(row -> row.getInstance().iterator()) + .flatMap(inst -> inst.getMeasures().iterator()) .collect(); Assertions .assertEquals( @@ -226,7 +229,7 @@ public class ProduceTest { 85, tmp .filter(row -> !row.getId().equals("unresolved::10.3390/s18072310::doi")) - .filter(r -> r.getMeasures() != null) + .filter(r -> r.getInstance() != null) .count()); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java index 49468540d..1beea5cb4 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java @@ -43,15 +43,6 @@ public class ResultMapper implements Serializable { try { addTypeSpecificInformation(out, input, ort); - Optional> mes = Optional.ofNullable(input.getMeasures()); - if (mes.isPresent()) { - List measure = new ArrayList<>(); - mes - .get() - .forEach( - m -> m.getUnit().forEach(u -> measure.add(KeyValue.newInstance(m.getId(), u.getValue())))); - out.setMeasures(measure); - } Optional .ofNullable(input.getAuthor()) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java index 602aaf6e6..07f663896 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java @@ -171,24 +171,6 @@ public class DumpJobTest { GraphResult gr = verificationDataset.first(); - Assertions.assertEquals(2, gr.getMeasures().size()); - Assertions - .assertTrue( - gr - .getMeasures() - .stream() - .anyMatch( - m -> m.getKey().equals("influence") - && m.getValue().equals("1.62759106106e-08"))); - Assertions - .assertTrue( - gr - .getMeasures() - .stream() - .anyMatch( - m -> m.getKey().equals("popularity") - && m.getValue().equals("0.22519296"))); - Assertions.assertEquals(6, gr.getAuthor().size()); Assertions .assertTrue( diff --git a/pom.xml b/pom.xml index 71c55d1f0..3acbdee9e 100644 --- a/pom.xml +++ b/pom.xml @@ -753,7 +753,7 @@ 3.3.3 3.4.2 [2.12,3.0) - [2.8.22] + [2.9.23] [4.0.3] [6.0.5] [3.1.6]