diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob3.java index 9e2cac7577..3d526ec695 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob3.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob3.java @@ -4,10 +4,8 @@ package eu.dnetlib.dhp.countrypropagation; import static eu.dnetlib.dhp.PropagationConstant.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; +import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; @@ -17,6 +15,7 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -27,6 +26,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.Country; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Result; @@ -71,6 +71,7 @@ public class SparkCountryPropagationJob3 { Class resultClazz = (Class) Class.forName(resultClassName); SparkConf conf = new SparkConf(); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); runWithSparkSession( conf, @@ -96,50 +97,51 @@ public class SparkCountryPropagationJob3 { if (saveGraph) { // updateResultTable(spark, potentialUpdates, inputPath, resultClazz, outputPath); log.info("Reading Graph table from: {}", inputPath); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - JavaPairRDD results = sc - .textFile(inputPath) - .map(r -> OBJECT_MAPPER.readValue(r, resultClazz)) - .mapToPair(r -> new Tuple2<>(r.getId(), r)); - JavaPairRDD tmp = results.reduceByKey((r1, r2) -> { - if (r1 == null) { - return r2; - } - if (r2 == null) { - return r1; - } - if (Optional.ofNullable(r1.getCollectedfrom()).isPresent()) { - r1.setCountry(getUnionCountries(r1.getCountry(), r2.getCountry())); - return r1; - } - if (Optional.ofNullable(r2.getCollectedfrom()).isPresent()) { - r2.setCountry(getUnionCountries(r1.getCountry(), r2.getCountry())); - return r2; - } - r1.setCountry(getUnionCountries(r1.getCountry(), r2.getCountry())); - return r1; - }); - - tmp - .map(c -> c._2()) - .map(r -> OBJECT_MAPPER.writeValueAsString(r)) - .saveAsTextFile(outputPath, GzipCodec.class); + spark + .read() + .json(inputPath) + .as(Encoders.kryo(resultClazz)) + .groupByKey((MapFunction) result1 -> result1.getId(), Encoders.STRING()) + .mapGroups(getCountryMergeFn(resultClazz), Encoders.bean(resultClazz)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath); } } - private static List getUnionCountries(List country, List country1) { - HashSet countries = country - .stream() - .map(c -> c.getClassid()) - .collect(Collectors.toCollection(HashSet::new)); - country - .addAll( - country1 - .stream() - .filter(c -> !(countries.contains(c.getClassid()))) - .collect(Collectors.toList())); - return country; + private static MapGroupsFunction getCountryMergeFn(Class resultClazz) { + return (MapGroupsFunction) (key, values) -> { + R res = resultClazz.newInstance(); + List countries = new ArrayList<>(); + values.forEachRemaining(r -> { + res.mergeFrom(r); + countries.addAll(r.getCountry()); + }); + res + .setCountry( + countries + .stream() + .collect( + Collectors + .toMap( + Country::getClassid, + Function.identity(), + (c1, c2) -> { + if (Optional + .ofNullable( + c1.getDataInfo().getInferenceprovenance()) + .isPresent()) { + return c2; + } + return c1; + })) + .values() + .stream() + .collect(Collectors.toList())); + return res; + }; } } diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml index 469aa25623..ac0fff2c0b 100644 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml @@ -24,6 +24,7 @@ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + @@ -158,6 +159,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=300 --sourcePath${sourcePath} --whitelist${whitelist} @@ -191,11 +193,10 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.speculation=false --conf spark.hadoop.mapreduce.map.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false + --conf spark.sql.shuffle.partitions=3840 --sourcePath${workingDir}/publication --hive_metastore_uris${hive_metastore_uris} @@ -221,11 +222,10 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.speculation=false --conf spark.hadoop.mapreduce.map.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false + --conf spark.sql.shuffle.partitions=3840 --sourcePath${workingDir}/dataset --hive_metastore_uris${hive_metastore_uris} @@ -251,11 +251,10 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.speculation=false --conf spark.hadoop.mapreduce.map.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false + --conf spark.sql.shuffle.partitions=3840 --sourcePath${workingDir}/otherresearchproduct --hive_metastore_uris${hive_metastore_uris} @@ -281,11 +280,10 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.speculation=false --conf spark.hadoop.mapreduce.map.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false + --conf spark.sql.shuffle.partitions=3840 --sourcePath${workingDir}/software --hive_metastore_uris${hive_metastore_uris} @@ -313,15 +311,13 @@ eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob3 dhp-propagation-${projectVersion}.jar - - + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.dynamicAllocation.enabled=true - --conf spark.speculation=false --conf spark.hadoop.mapreduce.map.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false @@ -351,11 +347,10 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.speculation=false --conf spark.hadoop.mapreduce.map.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false + --conf spark.sql.shuffle.partitions=3840 --sourcePath${workingDir}/dataset --saveGraph${saveGraph} @@ -381,11 +376,10 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.speculation=false --conf spark.hadoop.mapreduce.map.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false + --conf spark.sql.shuffle.partitions=3840 --sourcePath${workingDir}/otherresearchproduct --saveGraph${saveGraph} @@ -411,11 +405,10 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.speculation=false --conf spark.hadoop.mapreduce.map.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false + --conf spark.sql.shuffle.partitions=3840 --sourcePath${workingDir}/software --saveGraph${saveGraph} @@ -425,6 +418,9 @@ - + + + + \ No newline at end of file