[Create Unresolved Entities] Moving Measure at the level of the Instance #160
|
@ -6,12 +6,13 @@ import static eu.dnetlib.dhp.actionmanager.createunresolvedentities.Constants.UP
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsUtils;
|
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
@ -29,6 +30,7 @@ import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.BipScore;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Instance;
|
||||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Measure;
|
import eu.dnetlib.dhp.schema.oaf.Measure;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
@ -40,7 +42,7 @@ public class PrepareBipFinder implements Serializable {
|
||||||
private static final Logger log = LoggerFactory.getLogger(PrepareBipFinder.class);
|
private static final Logger log = LoggerFactory.getLogger(PrepareBipFinder.class);
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
public static <I extends Result> void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
String jsonConfiguration = IOUtils
|
String jsonConfiguration = IOUtils
|
||||||
.toString(
|
.toString(
|
||||||
|
@ -76,7 +78,7 @@ public class PrepareBipFinder implements Serializable {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <I extends Result> void prepareResults(SparkSession spark, String inputPath, String outputPath) {
|
private static void prepareResults(SparkSession spark, String inputPath, String outputPath) {
|
||||||
|
|
||||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
@ -93,9 +95,10 @@ public class PrepareBipFinder implements Serializable {
|
||||||
}).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class))
|
}).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class))
|
||||||
.map((MapFunction<BipScore, Result>) v -> {
|
.map((MapFunction<BipScore, Result>) v -> {
|
||||||
Result r = new Result();
|
Result r = new Result();
|
||||||
|
|
||||||
r.setId(DHPUtils.generateUnresolvedIdentifier(v.getId(), DOI));
|
r.setId(DHPUtils.generateUnresolvedIdentifier(v.getId(), DOI));
|
||||||
r.setMeasures(getMeasure(v));
|
Instance inst = new Instance();
|
||||||
|
inst.setMeasures(getMeasure(v));
|
||||||
|
r.setInstance(Arrays.asList(inst));
|
||||||
return r;
|
return r;
|
||||||
}, Encoders.bean(Result.class))
|
}, Encoders.bean(Result.class))
|
||||||
.write()
|
.write()
|
||||||
|
|
|
@ -55,13 +55,13 @@ public class PrepareFOSSparkJob implements Serializable {
|
||||||
runWithSparkSession(
|
runWithSparkSession(
|
||||||
conf,
|
conf,
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark ->
|
||||||
distributeFOSdois(
|
distributeFOSdois(
|
||||||
spark,
|
spark,
|
||||||
sourcePath,
|
sourcePath,
|
||||||
|
|
||||||
outputPath);
|
outputPath)
|
||||||
});
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void distributeFOSdois(SparkSession spark, String sourcePath, String outputPath) {
|
private static void distributeFOSdois(SparkSession spark, String sourcePath, String outputPath) {
|
||||||
|
|
|
@ -20,13 +20,13 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
|
||||||
public class SparkSaveUnresolved implements Serializable {
|
public class SparkSaveUnresolved implements Serializable {
|
||||||
private static final Logger log = LoggerFactory.getLogger(PrepareFOSSparkJob.class);
|
private static final Logger log = LoggerFactory.getLogger(SparkSaveUnresolved.class);
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
String jsonConfiguration = IOUtils
|
String jsonConfiguration = IOUtils
|
||||||
.toString(
|
.toString(
|
||||||
PrepareFOSSparkJob.class
|
SparkSaveUnresolved.class
|
||||||
.getResourceAsStream(
|
.getResourceAsStream(
|
||||||
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/produce_unresolved_parameters.json"));
|
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/produce_unresolved_parameters.json"));
|
||||||
|
|
||||||
|
@ -47,13 +47,13 @@ public class SparkSaveUnresolved implements Serializable {
|
||||||
runWithSparkSession(
|
runWithSparkSession(
|
||||||
conf,
|
conf,
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark ->
|
||||||
saveUnresolved(
|
saveUnresolved(
|
||||||
spark,
|
spark,
|
||||||
sourcePath,
|
sourcePath,
|
||||||
|
|
||||||
outputPath);
|
outputPath)
|
||||||
});
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void saveUnresolved(SparkSession spark, String sourcePath, String outputPath) {
|
private static void saveUnresolved(SparkSession spark, String sourcePath, String outputPath) {
|
||||||
|
@ -64,7 +64,7 @@ public class SparkSaveUnresolved implements Serializable {
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<String, Result>) l -> OBJECT_MAPPER.readValue(l, Result.class),
|
(MapFunction<String, Result>) l -> OBJECT_MAPPER.readValue(l, Result.class),
|
||||||
Encoders.bean(Result.class))
|
Encoders.bean(Result.class))
|
||||||
.groupByKey((MapFunction<Result, String>) r -> r.getId(), Encoders.STRING())
|
.groupByKey((MapFunction<Result,String>)Result::getId, Encoders.STRING())
|
||||||
.mapGroups((MapGroupsFunction<String, Result, Result>) (k, it) -> {
|
.mapGroups((MapGroupsFunction<String, Result, Result>) (k, it) -> {
|
||||||
Result ret = it.next();
|
Result ret = it.next();
|
||||||
it.forEachRemaining(r -> ret.mergeFrom(r));
|
it.forEachRemaining(r -> ret.mergeFrom(r));
|
||||||
|
|
|
@ -96,13 +96,16 @@ public class PrepareTest {
|
||||||
String doi1 = "unresolved::10.0000/096020199389707::doi";
|
String doi1 = "unresolved::10.0000/096020199389707::doi";
|
||||||
|
|
||||||
Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).count());
|
Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).count());
|
||||||
Assertions.assertEquals(3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getMeasures().size());
|
Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().size());
|
||||||
|
Assertions.assertEquals(3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().get(0).getMeasures().size());
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
"6.34596412687e-09", tmp
|
"6.34596412687e-09", tmp
|
||||||
.filter(r -> r.getId().equals(doi1))
|
.filter(r -> r.getId().equals(doi1))
|
||||||
.collect()
|
.collect()
|
||||||
.get(0)
|
.get(0)
|
||||||
|
.getInstance()
|
||||||
|
.get(0)
|
||||||
.getMeasures()
|
.getMeasures()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(sl -> sl.getId().equals("influence"))
|
.filter(sl -> sl.getId().equals("influence"))
|
||||||
|
@ -117,6 +120,8 @@ public class PrepareTest {
|
||||||
.filter(r -> r.getId().equals(doi1))
|
.filter(r -> r.getId().equals(doi1))
|
||||||
.collect()
|
.collect()
|
||||||
.get(0)
|
.get(0)
|
||||||
|
.getInstance()
|
||||||
|
.get(0)
|
||||||
.getMeasures()
|
.getMeasures()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(sl -> sl.getId().equals("popularity_alt"))
|
.filter(sl -> sl.getId().equals("popularity_alt"))
|
||||||
|
@ -131,6 +136,8 @@ public class PrepareTest {
|
||||||
.filter(r -> r.getId().equals(doi1))
|
.filter(r -> r.getId().equals(doi1))
|
||||||
.collect()
|
.collect()
|
||||||
.get(0)
|
.get(0)
|
||||||
|
.getInstance()
|
||||||
|
.get(0)
|
||||||
.getMeasures()
|
.getMeasures()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(sl -> sl.getId().equals("popularity"))
|
.filter(sl -> sl.getId().equals("popularity"))
|
||||||
|
|
|
@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
|
|
||||||
public class ProduceTest {
|
class ProduceTest {
|
||||||
private static final Logger log = LoggerFactory.getLogger(ProduceTest.class);
|
private static final Logger log = LoggerFactory.getLogger(ProduceTest.class);
|
||||||
|
|
||||||
private static Path workingDir;
|
private static Path workingDir;
|
||||||
|
@ -126,6 +126,8 @@ public class ProduceTest {
|
||||||
.filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi"))
|
.filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi"))
|
||||||
.collect()
|
.collect()
|
||||||
.get(0)
|
.get(0)
|
||||||
|
.getInstance()
|
||||||
|
.get(0)
|
||||||
.getMeasures()
|
.getMeasures()
|
||||||
.size());
|
.size());
|
||||||
|
|
||||||
|
@ -179,7 +181,8 @@ public class ProduceTest {
|
||||||
|
|
||||||
List<Measure> measures = tmp
|
List<Measure> measures = tmp
|
||||||
.filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi"))
|
.filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi"))
|
||||||
.flatMap(row -> row.getMeasures().iterator())
|
.flatMap(row -> row.getInstance().iterator())
|
||||||
|
.flatMap(inst -> inst.getMeasures().iterator())
|
||||||
.collect();
|
.collect();
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
|
@ -226,7 +229,7 @@ public class ProduceTest {
|
||||||
85,
|
85,
|
||||||
tmp
|
tmp
|
||||||
.filter(row -> !row.getId().equals("unresolved::10.3390/s18072310::doi"))
|
.filter(row -> !row.getId().equals("unresolved::10.3390/s18072310::doi"))
|
||||||
.filter(r -> r.getMeasures() != null)
|
.filter(r -> r.getInstance() != null)
|
||||||
.count());
|
.count());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,15 +43,6 @@ public class ResultMapper implements Serializable {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
addTypeSpecificInformation(out, input, ort);
|
addTypeSpecificInformation(out, input, ort);
|
||||||
Optional<List<Measure>> mes = Optional.ofNullable(input.getMeasures());
|
|
||||||
if (mes.isPresent()) {
|
|
||||||
List<KeyValue> measure = new ArrayList<>();
|
|
||||||
mes
|
|
||||||
.get()
|
|
||||||
.forEach(
|
|
||||||
m -> m.getUnit().forEach(u -> measure.add(KeyValue.newInstance(m.getId(), u.getValue()))));
|
|
||||||
out.setMeasures(measure);
|
|
||||||
}
|
|
||||||
|
|
||||||
Optional
|
Optional
|
||||||
.ofNullable(input.getAuthor())
|
.ofNullable(input.getAuthor())
|
||||||
|
|
|
@ -171,24 +171,6 @@ public class DumpJobTest {
|
||||||
|
|
||||||
GraphResult gr = verificationDataset.first();
|
GraphResult gr = verificationDataset.first();
|
||||||
|
|
||||||
Assertions.assertEquals(2, gr.getMeasures().size());
|
|
||||||
Assertions
|
|
||||||
.assertTrue(
|
|
||||||
gr
|
|
||||||
.getMeasures()
|
|
||||||
.stream()
|
|
||||||
.anyMatch(
|
|
||||||
m -> m.getKey().equals("influence")
|
|
||||||
&& m.getValue().equals("1.62759106106e-08")));
|
|
||||||
Assertions
|
|
||||||
.assertTrue(
|
|
||||||
gr
|
|
||||||
.getMeasures()
|
|
||||||
.stream()
|
|
||||||
.anyMatch(
|
|
||||||
m -> m.getKey().equals("popularity")
|
|
||||||
&& m.getValue().equals("0.22519296")));
|
|
||||||
|
|
||||||
Assertions.assertEquals(6, gr.getAuthor().size());
|
Assertions.assertEquals(6, gr.getAuthor().size());
|
||||||
Assertions
|
Assertions
|
||||||
.assertTrue(
|
.assertTrue(
|
||||||
|
|
2
pom.xml
2
pom.xml
|
@ -753,7 +753,7 @@
|
||||||
<mockito-core.version>3.3.3</mockito-core.version>
|
<mockito-core.version>3.3.3</mockito-core.version>
|
||||||
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
||||||
<vtd.version>[2.12,3.0)</vtd.version>
|
<vtd.version>[2.12,3.0)</vtd.version>
|
||||||
<dhp-schemas.version>[2.8.22]</dhp-schemas.version>
|
<dhp-schemas.version>[2.9.23]</dhp-schemas.version>
|
||||||
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
|
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
|
||||||
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
|
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
|
||||||
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
|
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
|
||||||
|
|
Loading…
Reference in New Issue