From dd2e698a724de08c381171b67f55499f20fa71e0 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Tue, 5 May 2020 17:03:43 +0200
Subject: [PATCH] added a sequentialization step on the spark job. Added new
 parameter

---
 .../PrepareResultCountrySet.java              |   4 +
 .../SparkCountryPropagationJob2.java          | 160 +++++++++++++-----
 .../countrypropagation/oozie_app/workflow.xml |   4 +
 .../oozie_app/workflow.xml                    |  12 ++
 4 files changed, 133 insertions(+), 47 deletions(-)
 create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java

diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java
new file mode 100644
index 000000000..9a8df9d3e
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java
@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.countrypropagation;
+
+public class PrepareResultCountrySet {
+}
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java
index 15f9e9b60..58bc741de 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java
@@ -3,8 +3,10 @@ package eu.dnetlib.dhp.countrypropagation;
 
 import static eu.dnetlib.dhp.PropagationConstant.*;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+import static jdk.nashorn.internal.objects.NativeDebug.map;
 
 import java.util.*;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
@@ -52,6 +54,11 @@ public class SparkCountryPropagationJob2 {
         final String datasourcecountrypath = parser.get("preparedInfoPath");
         log.info("preparedInfoPath: {}", datasourcecountrypath);
 
+        final String possibleUpdatesPath = datasourcecountrypath
+            .substring(0, datasourcecountrypath.lastIndexOf("/") + 1)
+            + "possibleUpdates";
+        log.info("possibleUpdatesPath: {}", possibleUpdatesPath);
+
         final String resultClassName = parser.get("resultTableName");
         log.info("resultTableName: {}", resultClassName);
 
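Note on the hunk above: possibleUpdatesPath is not read from a new parser argument; it is derived from preparedInfoPath by keeping everything up to and including the last "/" and appending "possibleUpdates", so the intermediate output lands in a sibling directory of the prepared info. A minimal standalone illustration of that string arithmetic; the input value "/workingDir/country/preparedInfo" is a made-up example:

    public class PossibleUpdatesPathExample {
        public static void main(String[] args) {
            // made-up example value for --preparedInfoPath
            String preparedInfoPath = "/workingDir/country/preparedInfo";
            // keep up to and including the last "/", then append the fixed name
            String possibleUpdatesPath = preparedInfoPath
                .substring(0, preparedInfoPath.lastIndexOf("/") + 1)
                + "possibleUpdates";
            // prints /workingDir/country/possibleUpdates
            System.out.println(possibleUpdatesPath);
        }
    }

One review remark on the imports hunk: the added static import jdk.nashorn.internal.objects.NativeDebug.map is unused by the patch and points at a JDK-internal class, so it looks like an accidental IDE auto-import and is worth dropping before merge.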
@@ -70,13 +77,14 @@ public class SparkCountryPropagationJob2 {
             conf,
             isSparkSessionManaged,
             spark -> {
+                removeOutputDir(spark, possibleUpdatesPath);
                 execPropagation(
                     spark,
                     datasourcecountrypath,
                     inputPath,
                     outputPath,
                     resultClazz,
-                    saveGraph);
+                    saveGraph, possibleUpdatesPath);
             });
     }
 
@@ -86,19 +94,30 @@ public class SparkCountryPropagationJob2 {
     private static <R extends Result> void execPropagation(
         SparkSession spark,
         String datasourcecountrypath,
         String inputPath,
         String outputPath,
         Class<R> resultClazz,
-        boolean saveGraph) {
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        boolean saveGraph, String possilbeUpdatesPath) {
+        // final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
         // Load file with preprocessed association datasource - country
         Dataset<DatasourceCountry> datasourcecountryassoc = readAssocDatasourceCountry(spark, datasourcecountrypath);
         // broadcasting the result of the preparation step
-        Broadcast<Dataset<DatasourceCountry>> broadcast_datasourcecountryassoc = sc.broadcast(datasourcecountryassoc);
+        // Broadcast<Dataset<DatasourceCountry>> broadcast_datasourcecountryassoc =
+        // sc.broadcast(datasourcecountryassoc);
         Dataset<ResultCountrySet> potentialUpdates = getPotentialResultToUpdate(
-            spark, inputPath, resultClazz, broadcast_datasourcecountryassoc)
+            spark, inputPath, resultClazz, datasourcecountryassoc)
                 .as(Encoders.bean(ResultCountrySet.class));
+        potentialUpdates.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(possilbeUpdatesPath);
+
         if (saveGraph) {
+            // updateResultTable(spark, potentialUpdates, inputPath, resultClazz, outputPath);
+            potentialUpdates = spark
+                .read()
+                .textFile(possilbeUpdatesPath)
+                .map(
+                    (MapFunction<String, ResultCountrySet>) value -> OBJECT_MAPPER
+                        .readValue(value, ResultCountrySet.class),
+                    Encoders.bean(ResultCountrySet.class));
             updateResultTable(spark, potentialUpdates, inputPath, resultClazz, outputPath);
         }
     }
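This hunk is the sequentialization step announced in the subject line: potentialUpdates is first materialized to HDFS as gzipped JSON and, only when saveGraph is set, read back from the saved files before the join with the graph table. Because the downstream join starts from disk, the preparation query runs exactly once and the two Spark jobs execute strictly one after the other instead of the association plan being recomputed inside the join. (The new parameter is spelled possilbeUpdatesPath throughout, so it compiles, but the typo deserves a fix before merge.) Below is a minimal sketch of the same write-then-read pattern, assuming a Jackson ObjectMapper like the OBJECT_MAPPER constant this class already uses; the class and method names are illustrative, not part of the patch:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class CheckpointByWrite {
        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        // Materialize a Dataset as gzipped JSON and re-read it, so every
        // downstream operation starts from the saved files instead of
        // re-running the upstream plan.
        static <T> Dataset<T> checkpointAsJson(
            SparkSession spark, Dataset<T> ds, String path, Class<T> clazz) {
            ds.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(path);
            return spark
                .read()
                .textFile(path)
                .map(
                    (MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
                    Encoders.bean(clazz));
        }
    }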
@@ -113,69 +132,116 @@ public class SparkCountryPropagationJob2 {
         log.info("Reading Graph table from: {}", inputPath);
         Dataset<R> result = readPathEntity(spark, inputPath, resultClazz);
 
-        Dataset<Tuple2<String, R>> result_pair = result
-            .map(
-                r -> new Tuple2<>(r.getId(), r),
-                Encoders.tuple(Encoders.STRING(), Encoders.bean(resultClazz)));
-
-        Dataset<R> new_table = result_pair
+        Dataset<R> new_table = result
             .joinWith(
-                potentialUpdates,
-                result_pair.col("_1").equalTo(potentialUpdates.col("resultId")),
+                potentialUpdates, result
+                    .col("id")
+                    .equalTo(potentialUpdates.col("resultId")),
                 "left_outer")
-            .map(
-                (MapFunction<Tuple2<Tuple2<String, R>, ResultCountrySet>, R>) value -> {
-                    R r = value._1()._2();
-                    Optional<ResultCountrySet> potentialNewCountries = Optional.ofNullable(value._2());
-                    if (potentialNewCountries.isPresent()) {
-                        HashSet<String> countries = new HashSet<>();
-                        for (Qualifier country : r.getCountry()) {
-                            countries.add(country.getClassid());
-                        }
-                        Result res = new Result();
-                        res.setId(r.getId());
-                        List<Country> countryList = new ArrayList<>();
-                        for (CountrySbs country : potentialNewCountries
-                            .get()
-                            .getCountrySet()) {
-                            if (!countries.contains(country.getClassid())) {
-                                countryList
-                                    .add(
-                                        getCountry(
-                                            country.getClassid(),
-                                            country.getClassname()));
-                            }
-                        }
-                        res.setCountry(countryList);
-                        r.mergeFrom(res);
-                    }
-                    return r;
-                },
-                Encoders.bean(resultClazz));
+            .map((MapFunction<Tuple2<R, ResultCountrySet>, R>) value -> {
+                R r = value._1();
+                Optional<ResultCountrySet> potentialNewCountries = Optional.ofNullable(value._2());
+                if (potentialNewCountries.isPresent()) {
+                    HashSet<String> countries = r
+                        .getCountry()
+                        .stream()
+                        .map(c -> c.getClassid())
+                        .collect(Collectors.toCollection(HashSet::new));
+
+                    r
+                        .getCountry()
+                        .addAll(
+                            potentialNewCountries
+                                .get()
+                                .getCountrySet()
+                                .stream()
+                                .filter(c -> !countries.contains(c.getClassid()))
+                                .map(c -> getCountry(c.getClassid(), c.getClassname()))
+                                .collect(Collectors.toList()));
+
+//                    Result res = new Result();
+//                    res.setId(r.getId());
+//                    List<Country> countryList = new ArrayList<>();
+//                    for (CountrySbs country : potentialNewCountries
+//                        .get()
+//                        .getCountrySet()) {
+//                        if (!countries.contains(country.getClassid())) {
+//                            countryList
+//                                .add(
+//                                    getCountry(
+//                                        country.getClassid(),
+//                                        country.getClassname()));
+//                        }
+//                    }
+//                    res.setCountry(countryList);
+//                    r.mergeFrom(res);
+                }
+                return r;
+            }, Encoders.bean(resultClazz));
+//        Dataset<Tuple2<String, R>> result_pair = result
+//            .map(
+//                r -> new Tuple2<>(r.getId(), r),
+//                Encoders.tuple(Encoders.STRING(), Encoders.bean(resultClazz)));
+//
+//        Dataset<R> new_table = result_pair
+//            .joinWith(
+//                potentialUpdates,
+//                result_pair.col("_1").equalTo(potentialUpdates.col("resultId")),
+//                "left_outer")
+//            .map(
+//                (MapFunction<Tuple2<Tuple2<String, R>, ResultCountrySet>, R>) value -> {
+//                    R r = value._1()._2();
+//                    Optional<ResultCountrySet> potentialNewCountries = Optional.ofNullable(value._2());
+//                    if (potentialNewCountries.isPresent()) {
+//                        HashSet<String> countries = new HashSet<>();
+//                        for (Qualifier country : r.getCountry()) {
+//                            countries.add(country.getClassid());
+//                        }
+//                        Result res = new Result();
+//                        res.setId(r.getId());
+//                        List<Country> countryList = new ArrayList<>();
+//                        for (CountrySbs country : potentialNewCountries
+//                            .get()
+//                            .getCountrySet()) {
+//                            if (!countries.contains(country.getClassid())) {
+//                                countryList
+//                                    .add(
+//                                        getCountry(
+//                                            country.getClassid(),
+//                                            country.getClassname()));
+//                            }
+//                        }
+//                        res.setCountry(countryList);
+//                        r.mergeFrom(res);
+//                    }
+//                    return r;
+//                },
+//                Encoders.bean(resultClazz));
 
         log.info("Saving graph table to path: {}", outputPath);
         // log.info("number of saved recordsa: {}", new_table.count());
-        new_table.toJSON().write().option("compression", "gzip").text(outputPath);
+        new_table.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(outputPath);
+
     }
 
     private static <R extends Result> Dataset<Row> getPotentialResultToUpdate(
         SparkSession spark,
         String inputPath,
         Class<R> resultClazz,
-        Broadcast<Dataset<DatasourceCountry>> broadcast_datasourcecountryassoc) {
+        Dataset<DatasourceCountry> datasourcecountryassoc) {
         Dataset<R> result = readPathEntity(spark, inputPath, resultClazz);
         result.createOrReplaceTempView("result");
         // log.info("number of results: {}", result.count());
         createCfHbforresult(spark);
-        return countryPropagationAssoc(spark, broadcast_datasourcecountryassoc);
+        return countryPropagationAssoc(spark, datasourcecountryassoc);
     }
 
     private static Dataset<Row> countryPropagationAssoc(
         SparkSession spark,
-        Broadcast<Dataset<DatasourceCountry>> broadcast_datasourcecountryassoc) {
+        Dataset<DatasourceCountry> datasource_country) {
 
-        Dataset<DatasourceCountry> datasource_country = broadcast_datasourcecountryassoc.value();
+        // Dataset<DatasourceCountry> datasource_country = broadcast_datasourcecountryassoc.value();
         datasource_country.createOrReplaceTempView("datasource_country");
         log.info("datasource_country number : {}", datasource_country.count());
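The hunk above also drops the explicit Broadcast wrapper: getPotentialResultToUpdate and countryPropagationAssoc now take the DatasourceCountry Dataset directly and expose it to SQL through the datasource_country temp view. A Broadcast variable cannot be referenced from a SQL query anyway, and for a small association table Spark remains free to choose a broadcast hash join on its own; note too that log.info(..., datasource_country.count()) now triggers a separate action over the un-cached dataset. The join in updateResultTable is simplified in the same spirit, joining the result table on its id column directly and merging countries with a stream pipeline. Below is a hypothetical sketch of the temp-view pattern; the real query text inside countryPropagationAssoc is not part of this hunk, so the SQL, the "cfhb" view name, and the column names are assumptions for illustration:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class TempViewJoinSketch {
        // datasourceCountry plays the role of the prepared association table.
        static Dataset<Row> countrySetPerResult(
            SparkSession spark, Dataset<Row> datasourceCountry) {
            datasourceCountry.createOrReplaceTempView("datasource_country");
            // "cfhb" stands in for the view created by createCfHbforresult;
            // its name and columns are assumed, not taken from the patch.
            return spark
                .sql(
                    "SELECT cfhb.resultId, collect_set(dc.country) AS countrySet "
                        + "FROM cfhb "
                        + "JOIN datasource_country dc ON cfhb.datasourceId = dc.dataSourceId "
                        + "GROUP BY cfhb.resultId");
        }
    }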
diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml
index d5fb199cd..70ac77434 100644
--- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml
@@ -140,6 +140,7 @@
                             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                             --conf spark.dynamicAllocation.enabled=true
                             --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
+                            --conf spark.speculation=false
                         </spark-opts>
                         <arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
                         <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
@@ -169,6 +170,7 @@
                             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                             --conf spark.dynamicAllocation.enabled=true
                             --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
+                            --conf spark.speculation=false
                         </spark-opts>
                         <arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
                         <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
@@ -198,6 +200,7 @@
                             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                             --conf spark.dynamicAllocation.enabled=true
                             --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
+                            --conf spark.speculation=false
                         </spark-opts>
                         <arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
                         <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
@@ -227,6 +230,7 @@
                             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                             --conf spark.dynamicAllocation.enabled=true
                             --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
+                            --conf spark.speculation=false
                         </spark-opts>
                         <arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
                         <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml
index ac25b6728..798717c3f 100644
--- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml
@@ -261,6 +261,9 @@
                             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                             --conf spark.dynamicAllocation.enabled=true
                             --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
+                            --conf spark.speculation=false
+                            --conf spark.hadoop.mapreduce.map.speculative=false
+                            --conf spark.hadoop.mapreduce.reduce.speculative=false
                         </spark-opts>
                         <arg>--possibleUpdatesPath</arg><arg>${workingDir}/preparedInfo/mergedOrcidAssoc</arg>
                         <arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
@@ -289,6 +292,9 @@
                             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                             --conf spark.dynamicAllocation.enabled=true
                             --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
+                            --conf spark.speculation=false
+                            --conf spark.hadoop.mapreduce.map.speculative=false
+                            --conf spark.hadoop.mapreduce.reduce.speculative=false
                         </spark-opts>
                         <arg>--possibleUpdatesPath</arg><arg>${workingDir}/preparedInfo/mergedOrcidAssoc</arg>
                         <arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
@@ -317,6 +323,9 @@
                             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                             --conf spark.dynamicAllocation.enabled=true
                             --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
+                            --conf spark.speculation=false
+                            --conf spark.hadoop.mapreduce.map.speculative=false
+                            --conf spark.hadoop.mapreduce.reduce.speculative=false
                         </spark-opts>
                         <arg>--possibleUpdatesPath</arg><arg>${workingDir}/preparedInfo/mergedOrcidAssoc</arg>
                         <arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
@@ -345,6 +354,9 @@
                             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                             --conf spark.dynamicAllocation.enabled=true
                             --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
+                            --conf spark.speculation=false
+                            --conf spark.hadoop.mapreduce.map.speculative=false
+                            --conf spark.hadoop.mapreduce.reduce.speculative=false
                         </spark-opts>
                         <arg>--possibleUpdatesPath</arg><arg>${workingDir}/preparedInfo/mergedOrcidAssoc</arg>
                         <arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
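Both workflows above disable speculative execution for these Spark actions. The job now writes twice per run (the possibleUpdates checkpoint and the final graph table), and a speculative duplicate of a task attempt racing the original on the same output files is exactly the failure mode this invites; the ORCID propagation workflow additionally pins the Hadoop map/reduce speculative flags consulted at the output-committer level. For illustration only, the same settings applied programmatically on a SparkConf (the workflows pass them as --conf options on the spark action instead):

    import org.apache.spark.SparkConf;

    public class SpeculationOff {
        // Mirror of the workflow-level --conf settings.
        static SparkConf disableSpeculation(SparkConf conf) {
            return conf
                .set("spark.speculation", "false")
                .set("spark.hadoop.mapreduce.map.speculative", "false")
                .set("spark.hadoop.mapreduce.reduce.speculative", "false");
        }
    }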