Merge pull request 'bipFinder: unresolved entities' (#174) from bipFinder into beta
Reviewed-on: #174
This commit is contained in:
commit
b352fbe453
|
@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.actionmanager.bipmodel.Constants.UPDATE_CLASS_NAME;
|
|||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -28,6 +29,7 @@ import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore;
|
|||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Instance;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.Measure;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
@ -39,7 +41,7 @@ public class PrepareBipFinder implements Serializable {
|
|||
private static final Logger log = LoggerFactory.getLogger(PrepareBipFinder.class);
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static <I extends Result> void main(String[] args) throws Exception {
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
|
@ -75,7 +77,7 @@ public class PrepareBipFinder implements Serializable {
|
|||
});
|
||||
}
|
||||
|
||||
private static <I extends Result> void prepareResults(SparkSession spark, String inputPath, String outputPath) {
|
||||
private static void prepareResults(SparkSession spark, String inputPath, String outputPath) {
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
|
@ -95,7 +97,9 @@ public class PrepareBipFinder implements Serializable {
|
|||
Result r = new Result();
|
||||
|
||||
r.setId(DHPUtils.generateUnresolvedIdentifier(v.getId(), DOI));
|
||||
r.setMeasures(getMeasure(v));
|
||||
Instance inst = new Instance();
|
||||
inst.setMeasures(getMeasure(v));
|
||||
r.setInstance(Arrays.asList(inst));
|
||||
return r;
|
||||
}, Encoders.bean(Result.class))
|
||||
.write()
|
||||
|
|
|
@ -20,7 +20,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
public class SparkSaveUnresolved implements Serializable {
|
||||
private static final Logger log = LoggerFactory.getLogger(PrepareFOSSparkJob.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkSaveUnresolved.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
|
@ -64,7 +64,7 @@ public class SparkSaveUnresolved implements Serializable {
|
|||
.map(
|
||||
(MapFunction<String, Result>) l -> OBJECT_MAPPER.readValue(l, Result.class),
|
||||
Encoders.bean(Result.class))
|
||||
.groupByKey((MapFunction<Result, String>) r -> r.getId(), Encoders.STRING())
|
||||
.groupByKey((MapFunction<Result, String>) Result::getId, Encoders.STRING())
|
||||
.mapGroups((MapGroupsFunction<String, Result, Result>) (k, it) -> {
|
||||
Result ret = it.next();
|
||||
it.forEachRemaining(r -> ret.mergeFrom(r));
|
||||
|
|
|
@ -1,9 +1,5 @@
|
|||
<workflow-app name="BipFinderScore" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>inputPath</name>
|
||||
<description>the input path of the resources to be extended</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>bipScorePath</name>
|
||||
|
@ -108,7 +104,5 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -96,13 +96,18 @@ public class PrepareTest {
|
|||
String doi1 = "unresolved::10.0000/096020199389707::doi";
|
||||
|
||||
Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).count());
|
||||
Assertions.assertEquals(3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getMeasures().size());
|
||||
Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().size());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().get(0).getMeasures().size());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"6.34596412687e-09", tmp
|
||||
.filter(r -> r.getId().equals(doi1))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getInstance()
|
||||
.get(0)
|
||||
.getMeasures()
|
||||
.stream()
|
||||
.filter(sl -> sl.getId().equals("influence"))
|
||||
|
@ -117,6 +122,8 @@ public class PrepareTest {
|
|||
.filter(r -> r.getId().equals(doi1))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getInstance()
|
||||
.get(0)
|
||||
.getMeasures()
|
||||
.stream()
|
||||
.filter(sl -> sl.getId().equals("popularity_alt"))
|
||||
|
@ -131,6 +138,8 @@ public class PrepareTest {
|
|||
.filter(r -> r.getId().equals(doi1))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getInstance()
|
||||
.get(0)
|
||||
.getMeasures()
|
||||
.stream()
|
||||
.filter(sl -> sl.getId().equals("popularity"))
|
||||
|
|
|
@ -126,6 +126,8 @@ public class ProduceTest {
|
|||
.filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getInstance()
|
||||
.get(0)
|
||||
.getMeasures()
|
||||
.size());
|
||||
|
||||
|
@ -179,7 +181,8 @@ public class ProduceTest {
|
|||
|
||||
List<Measure> measures = tmp
|
||||
.filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi"))
|
||||
.flatMap(row -> row.getMeasures().iterator())
|
||||
.flatMap(row -> row.getInstance().iterator())
|
||||
.flatMap(inst -> inst.getMeasures().iterator())
|
||||
.collect();
|
||||
Assertions
|
||||
.assertEquals(
|
||||
|
@ -226,7 +229,7 @@ public class ProduceTest {
|
|||
85,
|
||||
tmp
|
||||
.filter(row -> !row.getId().equals("unresolved::10.3390/s18072310::doi"))
|
||||
.filter(r -> r.getMeasures() != null)
|
||||
.filter(r -> r.getInstance() != null)
|
||||
.count());
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue