[BipFinder] create unresolved entities with measures at the level of the instance

This commit is contained in:
Miriam Baglioni 2021-12-22 16:03:41 +01:00
parent 0807fdb65a
commit 2c126ed014
4 changed files with 20 additions and 8 deletions

View File

@ -6,10 +6,12 @@ import static eu.dnetlib.dhp.actionmanager.bipmodel.Constants.UPDATE_CLASS_NAME;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.oaf.Instance;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@ -39,7 +41,7 @@ public class PrepareBipFinder implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PrepareBipFinder.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static <I extends Result> void main(String[] args) throws Exception {
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
@ -75,7 +77,7 @@ public class PrepareBipFinder implements Serializable {
});
}
private static <I extends Result> void prepareResults(SparkSession spark, String inputPath, String outputPath) {
private static void prepareResults(SparkSession spark, String inputPath, String outputPath) {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -95,7 +97,9 @@ public class PrepareBipFinder implements Serializable {
Result r = new Result();
r.setId(DHPUtils.generateUnresolvedIdentifier(v.getId(), DOI));
r.setMeasures(getMeasure(v));
Instance inst = new Instance();
inst.setMeasures(getMeasure(v));
r.setInstance(Arrays.asList(inst));
return r;
}, Encoders.bean(Result.class))
.write()

View File

@ -20,7 +20,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Result;
public class SparkSaveUnresolved implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PrepareFOSSparkJob.class);
private static final Logger log = LoggerFactory.getLogger(SparkSaveUnresolved.class);
public static void main(String[] args) throws Exception {
@ -64,7 +64,7 @@ public class SparkSaveUnresolved implements Serializable {
.map(
(MapFunction<String, Result>) l -> OBJECT_MAPPER.readValue(l, Result.class),
Encoders.bean(Result.class))
.groupByKey((MapFunction<Result, String>) r -> r.getId(), Encoders.STRING())
.groupByKey((MapFunction<Result, String>) Result::getId, Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Result, Result>) (k, it) -> {
Result ret = it.next();
it.forEachRemaining(r -> ret.mergeFrom(r));

View File

@ -96,13 +96,15 @@ public class PrepareTest {
String doi1 = "unresolved::10.0000/096020199389707::doi";
Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).count());
Assertions.assertEquals(3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getMeasures().size());
Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().size());
Assertions.assertEquals(3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().get(0).getMeasures().size());
Assertions
.assertEquals(
"6.34596412687e-09", tmp
.filter(r -> r.getId().equals(doi1))
.collect()
.get(0)
.getInstance().get(0)
.getMeasures()
.stream()
.filter(sl -> sl.getId().equals("influence"))
@ -117,6 +119,8 @@ public class PrepareTest {
.filter(r -> r.getId().equals(doi1))
.collect()
.get(0)
.getInstance()
.get(0)
.getMeasures()
.stream()
.filter(sl -> sl.getId().equals("popularity_alt"))
@ -131,6 +135,8 @@ public class PrepareTest {
.filter(r -> r.getId().equals(doi1))
.collect()
.get(0)
.getInstance()
.get(0)
.getMeasures()
.stream()
.filter(sl -> sl.getId().equals("popularity"))

View File

@ -126,6 +126,7 @@ public class ProduceTest {
.filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi"))
.collect()
.get(0)
.getInstance().get(0)
.getMeasures()
.size());
@ -179,7 +180,8 @@ public class ProduceTest {
List<Measure> measures = tmp
.filter(row -> row.getId().equals("unresolved::10.3390/s18072310::doi"))
.flatMap(row -> row.getMeasures().iterator())
.flatMap(row -> row.getInstance().iterator())
.flatMap(inst -> inst.getMeasures().iterator())
.collect();
Assertions
.assertEquals(
@ -226,7 +228,7 @@ public class ProduceTest {
85,
tmp
.filter(row -> !row.getId().equals("unresolved::10.3390/s18072310::doi"))
.filter(r -> r.getMeasures() != null)
.filter(r -> r.getInstance() != null)
.count());
}