[Create Unresolved Entities] Moving Measure at the level of the Instance #160

Closed
miriam.baglioni wants to merge 4 commits from bypass_acstionset into beta
8 changed files with 32 additions and 46 deletions

PrepareBipFinder.java

@@ -6,12 +6,13 @@ import static eu.dnetlib.dhp.actionmanager.createunresolvedentities.Constants.UP
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hdfs.client.HdfsUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -29,6 +30,7 @@ import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.BipScore;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.Instance;
 import eu.dnetlib.dhp.schema.oaf.KeyValue;
 import eu.dnetlib.dhp.schema.oaf.Measure;
 import eu.dnetlib.dhp.schema.oaf.Result;
@@ -40,7 +42,7 @@ public class PrepareBipFinder implements Serializable {
     private static final Logger log = LoggerFactory.getLogger(PrepareBipFinder.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    public static <I extends Result> void main(String[] args) throws Exception {
+    public static void main(String[] args) throws Exception {
 
         String jsonConfiguration = IOUtils
             .toString(
@@ -76,7 +78,7 @@ public class PrepareBipFinder implements Serializable {
             });
     }
 
-    private static <I extends Result> void prepareResults(SparkSession spark, String inputPath, String outputPath) {
+    private static void prepareResults(SparkSession spark, String inputPath, String outputPath) {
 
         final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -93,9 +95,10 @@ public class PrepareBipFinder implements Serializable {
                 }).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class))
             .map((MapFunction<BipScore, Result>) v -> {
                 Result r = new Result();
                 r.setId(DHPUtils.generateUnresolvedIdentifier(v.getId(), DOI));
-                r.setMeasures(getMeasure(v));
+                Instance inst = new Instance();
+                inst.setMeasures(getMeasure(v));
+                r.setInstance(Arrays.asList(inst));
                 return r;
             }, Encoders.bean(Result.class))
             .write()

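Note: the substance of this file's change is that the BiP! Finder scores are no longer set on the Result directly but wrapped in a synthetic Instance inside it, hence the new Instance import and the dhp-schemas bump in the pom below. A minimal sketch of how downstream code would read the scores back after this change; the helper name is hypothetical, and it assumes each unresolved record carries exactly one synthetic instance:

    // hypothetical helper, not part of the PR
    static void printScores(Result result) {
        for (Measure m : result.getInstance().get(0).getMeasures()) {
            // a Measure carries an id (e.g. "influence") and a list of KeyValue units
            m.getUnit().forEach(u -> System.out.println(m.getId() + " = " + u.getValue()));
        }
    }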
PrepareFOSSparkJob.java

@@ -55,13 +55,13 @@ public class PrepareFOSSparkJob implements Serializable {
         runWithSparkSession(
             conf,
             isSparkSessionManaged,
-            spark -> {
+            spark ->
                 distributeFOSdois(
                     spark,
                     sourcePath,
-                    outputPath);
-            });
+                    outputPath)
+            );
     }
 
     private static void distributeFOSdois(SparkSession spark, String sourcePath, String outputPath) {

SparkSaveUnresolved.java

@@ -20,13 +20,13 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.Result;
 
 public class SparkSaveUnresolved implements Serializable {
-    private static final Logger log = LoggerFactory.getLogger(PrepareFOSSparkJob.class);
+    private static final Logger log = LoggerFactory.getLogger(SparkSaveUnresolved.class);
 
     public static void main(String[] args) throws Exception {
 
         String jsonConfiguration = IOUtils
             .toString(
-                PrepareFOSSparkJob.class
+                SparkSaveUnresolved.class
                     .getResourceAsStream(
                         "/eu/dnetlib/dhp/actionmanager/createunresolvedentities/produce_unresolved_parameters.json"));
@@ -47,13 +47,13 @@ public class SparkSaveUnresolved implements Serializable {
         runWithSparkSession(
             conf,
             isSparkSessionManaged,
-            spark -> {
+            spark ->
                 saveUnresolved(
                     spark,
                     sourcePath,
-                    outputPath);
-            });
+                    outputPath)
+            );
     }
 
     private static void saveUnresolved(SparkSession spark, String sourcePath, String outputPath) {
@@ -64,7 +64,7 @@ public class SparkSaveUnresolved implements Serializable {
             .map(
                 (MapFunction<String, Result>) l -> OBJECT_MAPPER.readValue(l, Result.class),
                 Encoders.bean(Result.class))
-            .groupByKey((MapFunction<Result, String>) r -> r.getId(), Encoders.STRING())
+            .groupByKey((MapFunction<Result, String>) Result::getId, Encoders.STRING())
             .mapGroups((MapGroupsFunction<String, Result, Result>) (k, it) -> {
                 Result ret = it.next();
                 it.forEachRemaining(r -> ret.mergeFrom(r));

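Note: besides the copy-paste fix (the logger and the resource lookup previously pointed at PrepareFOSSparkJob), the merge logic is untouched: records emitted by the preparation jobs share the same unresolved identifier, so grouping by id and folding with mergeFrom collapses them into one record per DOI carrying both the instance-level measures and the FOS subjects. A toy illustration of that fold, assuming mergeFrom unions list fields such as instance and subject:

    // toy illustration, not part of the PR (uses java.util.Iterator)
    static Result mergeById(Iterator<Result> group) {
        Result merged = group.next();              // first record for this unresolved id
        group.forEachRemaining(merged::mergeFrom); // fold the remaining records into it
        return merged;
    }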
PrepareTest.java

@@ -96,13 +96,16 @@ public class PrepareTest {
         String doi1 = "unresolved::10.0000/096020199389707::doi";
 
         Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).count());
-        Assertions.assertEquals(3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getMeasures().size());
+        Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().size());
+        Assertions.assertEquals(3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().get(0).getMeasures().size());
 
         Assertions
             .assertEquals(
                 "6.34596412687e-09", tmp
                     .filter(r -> r.getId().equals(doi1))
                     .collect()
                     .get(0)
+                    .getInstance()
+                    .get(0)
                     .getMeasures()
                     .stream()
                     .filter(sl -> sl.getId().equals("influence"))
@@ -117,6 +120,8 @@ public class PrepareTest {
                     .filter(r -> r.getId().equals(doi1))
                     .collect()
                     .get(0)
+                    .getInstance()
+                    .get(0)
                     .getMeasures()
                     .stream()
                     .filter(sl -> sl.getId().equals("popularity_alt"))
@@ -131,6 +136,8 @@ public class PrepareTest {
                     .filter(r -> r.getId().equals(doi1))
                     .collect()
                     .get(0)
+                    .getInstance()
+                    .get(0)
                     .getMeasures()
                     .stream()
                     .filter(sl -> sl.getId().equals("popularity"))

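Note: the expected score values themselves are unchanged; all three assertions simply hop through the synthetic instance (.getInstance().get(0)) before reaching .getMeasures(), matching the new shape produced by PrepareBipFinder above.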
ProduceTest.java

@@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
 
-public class ProduceTest {
+class ProduceTest {
 
     private static final Logger log = LoggerFactory.getLogger(ProduceTest.class);
     private static Path workingDir;
@@ -126,6 +126,8 @@ public class ProduceTest {
                 .filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi"))
                 .collect()
                 .get(0)
+                .getInstance()
+                .get(0)
                 .getMeasures()
                 .size());
@@ -179,7 +181,8 @@ public class ProduceTest {
         List<Measure> measures = tmp
             .filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi"))
-            .flatMap(row -> row.getMeasures().iterator())
+            .flatMap(row -> row.getInstance().iterator())
+            .flatMap(inst -> inst.getMeasures().iterator())
             .collect();
 
         Assertions
             .assertEquals(
@@ -226,7 +229,7 @@ public class ProduceTest {
                 85,
                 tmp
                     .filter(row -> !row.getId().equals("unresolved::10.3390/s18072310::doi"))
-                    .filter(r -> r.getMeasures() != null)
+                    .filter(r -> r.getInstance() != null)
                     .count());
     }

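Note: extracting the measures list now takes two flatMap steps, one over each record's instances and one over each instance's measures, and the non-null guard in the count assertion checks getInstance() instead of getMeasures().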
ResultMapper.java

@@ -43,15 +43,6 @@ public class ResultMapper implements Serializable {
         try {
             addTypeSpecificInformation(out, input, ort);
 
-            Optional<List<Measure>> mes = Optional.ofNullable(input.getMeasures());
-            if (mes.isPresent()) {
-                List<KeyValue> measure = new ArrayList<>();
-                mes
-                    .get()
-                    .forEach(
-                        m -> m.getUnit().forEach(u -> measure.add(KeyValue.newInstance(m.getId(), u.getValue()))));
-                out.setMeasures(measure);
-            }
 
             Optional
                 .ofNullable(input.getAuthor())

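Note: since measures moved from the Result to its instances, the dump mapper no longer copies result-level measures into KeyValue pairs; the block removed here is presumably dead code after this change, and the matching assertions in DumpJobTest below are dropped for the same reason.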
DumpJobTest.java

@@ -171,24 +171,6 @@ public class DumpJobTest {
         GraphResult gr = verificationDataset.first();
 
-        Assertions.assertEquals(2, gr.getMeasures().size());
-        Assertions
-            .assertTrue(
-                gr
-                    .getMeasures()
-                    .stream()
-                    .anyMatch(
-                        m -> m.getKey().equals("influence")
-                            && m.getValue().equals("1.62759106106e-08")));
-        Assertions
-            .assertTrue(
-                gr
-                    .getMeasures()
-                    .stream()
-                    .anyMatch(
-                        m -> m.getKey().equals("popularity")
-                            && m.getValue().equals("0.22519296")));
-
         Assertions.assertEquals(6, gr.getAuthor().size());
 
         Assertions
             .assertTrue(

pom.xml

@@ -753,7 +753,7 @@
         <mockito-core.version>3.3.3</mockito-core.version>
         <mongodb.driver.version>3.4.2</mongodb.driver.version>
         <vtd.version>[2.12,3.0)</vtd.version>
-        <dhp-schemas.version>[2.8.22]</dhp-schemas.version>
+        <dhp-schemas.version>[2.9.23]</dhp-schemas.version>
         <dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
         <dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
         <dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
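Note: the bracketed version ([2.9.23]) is Maven's exact-version range syntax, pinning dhp-schemas to precisely that release; the bump from 2.8.22 is presumably what brings in the schema revision where Instance carries the measures field this PR relies on.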