diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob3.java
new file mode 100644
index 0000000000..9e2cac7577
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob3.java
@@ -0,0 +1,145 @@
+
+package eu.dnetlib.dhp.countrypropagation;
+
+import static eu.dnetlib.dhp.PropagationConstant.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Country;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import scala.Tuple2;
+
+public class SparkCountryPropagationJob3 {
+
+	private static final Logger log = LoggerFactory.getLogger(SparkCountryPropagationJob3.class);
+
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+	public static void main(String[] args) throws Exception {
+
+		String jsonConfiguration = IOUtils
+			.toString(
+				SparkCountryPropagationJob3.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		String inputPath = parser.get("sourcePath");
+		log.info("inputPath: {}", inputPath);
+
+		final String outputPath = parser.get("outputPath");
+		log.info("outputPath: {}", outputPath);
+
+		final String resultClassName = parser.get("resultTableName");
+		log.info("resultTableName: {}", resultClassName);
+
+		final Boolean saveGraph = Optional
+			.ofNullable(parser.get("saveGraph"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("saveGraph: {}", saveGraph);
+
+		Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
+
+		SparkConf conf = new SparkConf();
+
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+
+				execPropagation(
+					spark,
+					inputPath,
+					outputPath,
+					resultClazz,
+					saveGraph);
+			});
+	}
+
+	private static <R extends Result> void execPropagation(
+		SparkSession spark,
+		String inputPath,
+		String outputPath,
+		Class<R> resultClazz,
+		boolean saveGraph) {
+
+		if (saveGraph) {
+			// updateResultTable(spark, potentialUpdates, inputPath, resultClazz, outputPath);
+			log.info("Reading Graph table from: {}", inputPath);
+			final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+			JavaPairRDD<String, R> results = sc
+				.textFile(inputPath)
+				.map(r -> OBJECT_MAPPER.readValue(r, resultClazz))
+				.mapToPair(r ->
+					new Tuple2<>(r.getId(), r));
+
+			JavaPairRDD<String, R> tmp = results.reduceByKey((r1, r2) -> {
+				if (r1 == null) {
+					return r2;
+				}
+				if (r2 == null) {
+					return r1;
+				}
+				if (Optional.ofNullable(r1.getCollectedfrom()).isPresent()) {
+					r1.setCountry(getUnionCountries(r1.getCountry(), r2.getCountry()));
+					return r1;
+				}
+				if (Optional.ofNullable(r2.getCollectedfrom()).isPresent()) {
+					r2.setCountry(getUnionCountries(r1.getCountry(), r2.getCountry()));
+					return r2;
+				}
+				r1.setCountry(getUnionCountries(r1.getCountry(), r2.getCountry()));
+				return r1;
+			});
+
+			tmp
+				.map(c -> c._2())
+				.map(r -> OBJECT_MAPPER.writeValueAsString(r))
+				.saveAsTextFile(outputPath, GzipCodec.class);
+		}
+	}
+
+	private static List<Country> getUnionCountries(List<Country> country, List<Country> country1) {
+		HashSet<String> countries = country
+			.stream()
+			.map(c -> c.getClassid())
+			.collect(Collectors.toCollection(HashSet::new));
+		country
+			.addAll(
+				country1
+					.stream()
+					.filter(c -> !(countries.contains(c.getClassid())))
+					.collect(Collectors.toList()));
+		return country;
+	}
+
+}
diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_prepareresultcountry_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_prepareresultcountry_parameters.json
new file mode 100644
index 0000000000..9956f3474f
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_prepareresultcountry_parameters.json
@@ -0,0 +1,32 @@
+[
+	{
+		"paramName":"s",
+		"paramLongName":"sourcePath",
+		"paramDescription": "the path of the sequential file to read",
+		"paramRequired": true
+	},
+	{
+		"paramName":"h",
+		"paramLongName":"hive_metastore_uris",
+		"paramDescription": "the hive metastore uris",
+		"paramRequired": true
+	},
+	{
+		"paramName":"tn",
+		"paramLongName":"resultTableName",
+		"paramDescription": "the name of the result table we are currently working on",
+		"paramRequired": true
+	},
+	{
+		"paramName": "p",
+		"paramLongName": "preparedInfoPath",
+		"paramDescription": "the path where the prepared info has been stored",
+		"paramRequired": true
+	},
+	{
+		"paramName": "ssm",
+		"paramLongName": "isSparkSessionManaged",
+		"paramDescription": "true if the spark session is managed, false otherwise",
+		"paramRequired": false
+	}
+]
\ No newline at end of file