diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java index d33b9e8468..8baa469bfe 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java @@ -1,11 +1,11 @@ + package eu.dnetlib.dhp.oa.graph.merge; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Objects; +import java.util.Optional; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -16,13 +16,16 @@ import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; -import java.util.Objects; -import java.util.Optional; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - /** * Combines the content from two aggregator graph tables of the same type, entities (or relationships) with the same ids * are picked preferring those from the BETA aggregator rather then from PROD. The identity of a relationship is defined @@ -30,101 +33,102 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; */ public class MergeGraphSparkJob { - private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class); + private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(String[] args) throws Exception { + public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - CleanGraphSparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + String jsonConfiguration = IOUtils + .toString( + CleanGraphSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - String betaInputPath = parser.get("betaInputPath"); - log.info("betaInputPath: {}", betaInputPath); + String betaInputPath = parser.get("betaInputPath"); + log.info("betaInputPath: {}", betaInputPath); - String prodInputPath = parser.get("prodInputPath"); - log.info("prodInputPath: {}", prodInputPath); + String prodInputPath = parser.get("prodInputPath"); + log.info("prodInputPath: {}", prodInputPath); - String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - String graphTableClassName = parser.get("graphTableClassName"); - log.info("graphTableClassName: {}", graphTableClassName); + String graphTableClassName = parser.get("graphTableClassName"); + log.info("graphTableClassName: {}", graphTableClassName); - Class entityClazz = (Class) Class.forName(graphTableClassName); + Class entityClazz = (Class) Class.forName(graphTableClassName); - SparkConf conf = new SparkConf(); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - removeOutputDir(spark, outputPath); - mergeGraphTable(spark, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath); - }); - } + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + mergeGraphTable(spark, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath); + }); + } - private static

void mergeGraphTable( - SparkSession spark, - String betaInputPath, - String prodInputPath, - Class

p_clazz, - Class b_clazz, - String outputPath) { + private static

void mergeGraphTable( + SparkSession spark, + String betaInputPath, + String prodInputPath, + Class

p_clazz, + Class b_clazz, + String outputPath) { - Dataset> beta = readTableFromPath(spark, betaInputPath, b_clazz); - Dataset> prod = readTableFromPath(spark, prodInputPath, p_clazz); + Dataset> beta = readTableFromPath(spark, betaInputPath, b_clazz); + Dataset> prod = readTableFromPath(spark, prodInputPath, p_clazz); - prod.joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer") - .map((MapFunction, Tuple2>, P>) value -> { - Optional

p = Optional.ofNullable(value._1()).map(Tuple2::_2); - Optional b = Optional.ofNullable(value._2()).map(Tuple2::_2); - if (p.isPresent() & !b.isPresent()) { - return p.get(); - } - if (b.isPresent()) { - return (P) b.get(); - } - return null; - }, Encoders.bean(p_clazz)) - .filter((FilterFunction

) Objects::nonNull) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); - } + prod + .joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer") + .map((MapFunction, Tuple2>, P>) value -> { + Optional

p = Optional.ofNullable(value._1()).map(Tuple2::_2); + Optional b = Optional.ofNullable(value._2()).map(Tuple2::_2); + if (p.isPresent() & !b.isPresent()) { + return p.get(); + } + if (b.isPresent()) { + return (P) b.get(); + } + return null; + }, Encoders.bean(p_clazz)) + .filter((FilterFunction

) Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + } - private static Dataset> readTableFromPath( - SparkSession spark, String inputEntityPath, Class clazz) { + private static Dataset> readTableFromPath( + SparkSession spark, String inputEntityPath, Class clazz) { - log.info("Reading Graph table from: {}", inputEntityPath); - return spark - .read() - .textFile(inputEntityPath) - .map( - (MapFunction>) value -> { - final T t = OBJECT_MAPPER.readValue(value, clazz); - final String id = ModelSupport.idFn().apply(t); - return new Tuple2<>(id, t); - }, - Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))); - } + log.info("Reading Graph table from: {}", inputEntityPath); + return spark + .read() + .textFile(inputEntityPath) + .map( + (MapFunction>) value -> { + final T t = OBJECT_MAPPER.readValue(value, clazz); + final String id = ModelSupport.idFn().apply(t); + return new Tuple2<>(id, t); + }, + Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))); + } - private static void removeOutputDir(SparkSession spark, String path) { - HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); - } + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } }