[Create Unresolved Entities] Moving Measure at the level of the Instance #160
|
@ -12,7 +12,7 @@ import java.util.Optional;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.hdfs.client.HdfsUtils;
|
||||
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
|
@ -42,7 +42,7 @@ public class PrepareBipFinder implements Serializable {
|
|||
private static final Logger log = LoggerFactory.getLogger(PrepareBipFinder.class);
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static <I extends Result> void main(String[] args) throws Exception {
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
|
@ -78,7 +78,7 @@ public class PrepareBipFinder implements Serializable {
|
|||
});
|
||||
}
|
||||
|
||||
private static <I extends Result> void prepareResults(SparkSession spark, String inputPath, String outputPath) {
|
||||
private static void prepareResults(SparkSession spark, String inputPath, String outputPath) {
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
|
|
|
@ -55,13 +55,13 @@ public class PrepareFOSSparkJob implements Serializable {
|
|||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
spark ->
|
||||
distributeFOSdois(
|
||||
spark,
|
||||
sourcePath,
|
||||
|
||||
outputPath);
|
||||
});
|
||||
outputPath)
|
||||
);
|
||||
}
|
||||
|
||||
private static void distributeFOSdois(SparkSession spark, String sourcePath, String outputPath) {
|
||||
|
|
|
@ -20,13 +20,13 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
public class SparkSaveUnresolved implements Serializable {
|
||||
private static final Logger log = LoggerFactory.getLogger(PrepareFOSSparkJob.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkSaveUnresolved.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
PrepareFOSSparkJob.class
|
||||
SparkSaveUnresolved.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/produce_unresolved_parameters.json"));
|
||||
|
||||
|
@ -47,13 +47,13 @@ public class SparkSaveUnresolved implements Serializable {
|
|||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
spark ->
|
||||
saveUnresolved(
|
||||
spark,
|
||||
sourcePath,
|
||||
|
||||
outputPath);
|
||||
});
|
||||
outputPath)
|
||||
);
|
||||
}
|
||||
|
||||
private static void saveUnresolved(SparkSession spark, String sourcePath, String outputPath) {
|
||||
|
@ -64,7 +64,7 @@ public class SparkSaveUnresolved implements Serializable {
|
|||
.map(
|
||||
(MapFunction<String, Result>) l -> OBJECT_MAPPER.readValue(l, Result.class),
|
||||
Encoders.bean(Result.class))
|
||||
.groupByKey((MapFunction<Result, String>) r -> r.getId(), Encoders.STRING())
|
||||
.groupByKey((MapFunction<Result,String>)Result::getId, Encoders.STRING())
|
||||
.mapGroups((MapGroupsFunction<String, Result, Result>) (k, it) -> {
|
||||
Result ret = it.next();
|
||||
it.forEachRemaining(r -> ret.mergeFrom(r));
|
||||
|
|
|
@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
|
||||
public class ProduceTest {
|
||||
class ProduceTest {
|
||||
private static final Logger log = LoggerFactory.getLogger(ProduceTest.class);
|
||||
|
||||
private static Path workingDir;
|
||||
|
|
Loading…
Reference in New Issue