Integrated filter applied when merging the BETA & PROD graphs to rule out records from Datacite

This commit is contained in:
Claudio Atzori 2021-03-19 11:34:44 +01:00
parent 3256b9c836
commit a4e82a65aa
1 changed file with 60 additions and 52 deletions

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.graph.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.*; import java.util.*;
import java.util.stream.Collectors;
import javax.xml.crypto.Data; import javax.xml.crypto.Data;
@ -52,22 +53,22 @@ public class MergeGraphTableSparkJob {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
.toString( .toString(
CleanGraphSparkJob.class CleanGraphSparkJob.class
.getResourceAsStream( .getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json")); "/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args); parser.parseArgument(args);
String priority = Optional String priority = Optional
.ofNullable(parser.get("priority")) .ofNullable(parser.get("priority"))
.orElse(PRIORITY_DEFAULT); .orElse(PRIORITY_DEFAULT);
log.info("priority: {}", priority); log.info("priority: {}", priority);
Boolean isSparkSessionManaged = Optional Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged")) .ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf) .map(Boolean::valueOf)
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String betaInputPath = parser.get("betaInputPath"); String betaInputPath = parser.get("betaInputPath");
@ -89,48 +90,55 @@ public class MergeGraphTableSparkJob {
conf.registerKryoClasses(ModelSupport.getOafModelClasses()); conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath); mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath);
}); });
} }
private static <P extends Oaf, B extends Oaf> void mergeGraphTable( private static <P extends Oaf, B extends Oaf> void mergeGraphTable(
SparkSession spark, SparkSession spark,
String priority, String priority,
String betaInputPath, String betaInputPath,
String prodInputPath, String prodInputPath,
Class<P> p_clazz, Class<P> p_clazz,
Class<B> b_clazz, Class<B> b_clazz,
String outputPath) { String outputPath) {
Dataset<Tuple2<String, B>> beta = readTableFromPath(spark, betaInputPath, b_clazz); Dataset<Tuple2<String, B>> beta = readTableFromPath(spark, betaInputPath, b_clazz);
Dataset<Tuple2<String, P>> prod = readTableFromPath(spark, prodInputPath, p_clazz); Dataset<Tuple2<String, P>> prod = readTableFromPath(spark, prodInputPath, p_clazz);
prod prod
.joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer") .joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
.map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> { .map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2); Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2); Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2);
if (p.orElse((P) b.orElse((B) DATASOURCE)) instanceof Datasource) { if (p.orElse((P) b.orElse((B) DATASOURCE)) instanceof Datasource) {
return mergeDatasource(p, b); return mergeDatasource(p, b);
} }
switch (priority) { switch (priority) {
default: default:
case "BETA": case "BETA":
return mergeWithPriorityToBETA(p, b); return mergeWithPriorityToBETA(p, b);
case "PROD": case "PROD":
return mergeWithPriorityToPROD(p, b); return mergeWithPriorityToPROD(p, b);
} }
}, Encoders.bean(p_clazz)) }, Encoders.bean(p_clazz))
.filter((FilterFunction<P>) Objects::nonNull) .filter((FilterFunction<P>) Objects::nonNull)
.write() .filter((FilterFunction<P>) o -> {
.mode(SaveMode.Overwrite) HashSet<String> collectedFromNames = Optional
.option("compression", "gzip") .ofNullable(o.getCollectedfrom())
.json(outputPath); .map(c -> c.stream().map(KeyValue::getValue).collect(Collectors.toCollection(HashSet::new)))
.orElse(new HashSet<String>());
return !collectedFromNames.contains("Datacite");
})
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
} }
/** /**
@ -184,19 +192,19 @@ public class MergeGraphTableSparkJob {
} }
private static <T extends Oaf> Dataset<Tuple2<String, T>> readTableFromPath( private static <T extends Oaf> Dataset<Tuple2<String, T>> readTableFromPath(
SparkSession spark, String inputEntityPath, Class<T> clazz) { SparkSession spark, String inputEntityPath, Class<T> clazz) {
log.info("Reading Graph table from: {}", inputEntityPath); log.info("Reading Graph table from: {}", inputEntityPath);
return spark return spark
.read() .read()
.textFile(inputEntityPath) .textFile(inputEntityPath)
.map( .map(
(MapFunction<String, Tuple2<String, T>>) value -> { (MapFunction<String, Tuple2<String, T>>) value -> {
final T t = OBJECT_MAPPER.readValue(value, clazz); final T t = OBJECT_MAPPER.readValue(value, clazz);
final String id = ModelSupport.idFn().apply(t); final String id = ModelSupport.idFn().apply(t);
return new Tuple2<>(id, t); return new Tuple2<>(id, t);
}, },
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))); Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
} }
private static void removeOutputDir(SparkSession spark, String path) { private static void removeOutputDir(SparkSession spark, String path) {