Dataset based implementation for SparkCountryPropagationJob3

This commit is contained in:
Claudio Atzori 2020-05-07 11:15:24 +02:00
parent 128c3bf1c8
commit 73243793b2
2 changed files with 61 additions and 63 deletions

View File

@ -4,10 +4,8 @@ package eu.dnetlib.dhp.countrypropagation;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
@ -17,6 +15,7 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
@ -27,6 +26,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Country;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Result;
@ -71,6 +71,7 @@ public class SparkCountryPropagationJob3 {
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession(
conf,
@ -96,50 +97,51 @@ public class SparkCountryPropagationJob3 {
if (saveGraph) {
// updateResultTable(spark, potentialUpdates, inputPath, resultClazz, outputPath);
log.info("Reading Graph table from: {}", inputPath);
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaPairRDD<String, R> results = sc
.textFile(inputPath)
.map(r -> OBJECT_MAPPER.readValue(r, resultClazz))
.mapToPair(r -> new Tuple2<>(r.getId(), r));
JavaPairRDD<String, R> tmp = results.reduceByKey((r1, r2) -> {
if (r1 == null) {
return r2;
spark
.read()
.json(inputPath)
.as(Encoders.kryo(resultClazz))
.groupByKey((MapFunction<R, String>) result1 -> result1.getId(), Encoders.STRING())
.mapGroups(getCountryMergeFn(resultClazz), Encoders.bean(resultClazz))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
if (r2 == null) {
return r1;
}
if (Optional.ofNullable(r1.getCollectedfrom()).isPresent()) {
r1.setCountry(getUnionCountries(r1.getCountry(), r2.getCountry()));
return r1;
}
if (Optional.ofNullable(r2.getCollectedfrom()).isPresent()) {
r2.setCountry(getUnionCountries(r1.getCountry(), r2.getCountry()));
return r2;
}
r1.setCountry(getUnionCountries(r1.getCountry(), r2.getCountry()));
return r1;
private static <R extends Result> MapGroupsFunction<String, R, R> getCountryMergeFn(Class<R> resultClazz) {
return (MapGroupsFunction<String, R, R>) (key, values) -> {
R res = resultClazz.newInstance();
List<Country> countries = new ArrayList<>();
values.forEachRemaining(r -> {
res.mergeFrom(r);
countries.addAll(r.getCountry());
});
tmp
.map(c -> c._2())
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(outputPath, GzipCodec.class);
}
}
private static List<Country> getUnionCountries(List<Country> country, List<Country> country1) {
HashSet<String> countries = country
res
.setCountry(
countries
.stream()
.map(c -> c.getClassid())
.collect(Collectors.toCollection(HashSet::new));
country
.addAll(
country1
.collect(
Collectors
.toMap(
Country::getClassid,
Function.identity(),
(c1, c2) -> {
if (Optional
.ofNullable(
c1.getDataInfo().getInferenceprovenance())
.isPresent()) {
return c2;
}
return c1;
}))
.values()
.stream()
.filter(c -> !(countries.contains(c.getClassid())))
.collect(Collectors.toList()));
return country;
return res;
};
}
}

View File

@ -24,6 +24,7 @@
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="reset-outputpath">
<fs>
<delete path='${workingDir}/preparedInfo'/>
@ -158,6 +159,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=300
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
@ -191,11 +193,10 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/publication</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
@ -221,11 +222,10 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dataset</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
@ -251,11 +251,10 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
@ -281,11 +280,10 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/software</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
@ -313,15 +311,13 @@
<class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob3</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
<!-- &#45;&#45;conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}-->
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
@ -351,11 +347,10 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dataset</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
@ -381,11 +376,10 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
@ -411,11 +405,10 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/software</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
@ -425,6 +418,9 @@
<ok to="wait"/>
<error to="Kill"/>
</action>
<join name="wait" to="End"/>
<join name="wait" to="End"/>
<end name="End"/>
</workflow-app>