set wf configuration with spark.dynamicAllocation.maxExecutors 20 over 20 input partitions

This commit is contained in:
Enrico Ottonello 2020-11-23 16:01:23 +01:00
parent 5c9a727895
commit 5c17e768b2
2 changed files with 29 additions and 14 deletions

View File

@ -65,9 +65,14 @@ public class SparkDownloadOrcidAuthors {
spark -> { spark -> {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
LongAccumulator parsedRecordsAcc = spark.sparkContext().longAccumulator("parsedRecords"); LongAccumulator parsedRecordsAcc = spark.sparkContext().longAccumulator("parsed_records");
LongAccumulator modifiedRecordsAcc = spark.sparkContext().longAccumulator("modifiedRecords"); LongAccumulator modifiedRecordsAcc = spark.sparkContext().longAccumulator("to_download_records");
LongAccumulator downloadedRecordsAcc = spark.sparkContext().longAccumulator("downloadedRecords"); LongAccumulator downloadedRecordsAcc = spark.sparkContext().longAccumulator("downloaded_records");
LongAccumulator errorHTTP403Acc = spark.sparkContext().longAccumulator("error_HTTP_403");
LongAccumulator errorHTTP409Acc = spark.sparkContext().longAccumulator("error_HTTP_409");
LongAccumulator errorHTTP503Acc = spark.sparkContext().longAccumulator("error_HTTP_503");
LongAccumulator errorHTTP525Acc = spark.sparkContext().longAccumulator("error_HTTP_525");
LongAccumulator errorHTTPGenericAcc = spark.sparkContext().longAccumulator("error_HTTP_Generic");
logger.info("Retrieving data from lamda sequence file"); logger.info("Retrieving data from lamda sequence file");
JavaPairRDD<Text, Text> lamdaFileRDD = sc JavaPairRDD<Text, Text> lamdaFileRDD = sc
@ -99,6 +104,18 @@ public class SparkDownloadOrcidAuthors {
int statusCode = response.getStatusLine().getStatusCode(); int statusCode = response.getStatusLine().getStatusCode();
downloaded.setStatusCode(statusCode); downloaded.setStatusCode(statusCode);
if (statusCode != 200) { if (statusCode != 200) {
switch (statusCode) {
case 403:
errorHTTP403Acc.add(1);
case 409:
errorHTTP409Acc.add(1);
case 503:
errorHTTP503Acc.add(1);
case 525:
errorHTTP525Acc.add(1);
default:
errorHTTPGenericAcc.add(1);
}
logger logger
.info( .info(
"Downloading " + orcidId + " status code: " "Downloading " + orcidId + " status code: "
@ -106,10 +123,6 @@ public class SparkDownloadOrcidAuthors {
return downloaded.toTuple2(); return downloaded.toTuple2();
} }
downloadedRecordsAcc.add(1); downloadedRecordsAcc.add(1);
long currentDownloaded = downloadedRecordsAcc.value();
if ((currentDownloaded % 10000) == 0) {
logger.info("Current downloaded: " + currentDownloaded);
}
downloaded downloaded
.setCompressedData( .setCompressedData(
ArgumentApplicationParser ArgumentApplicationParser
@ -125,14 +138,11 @@ public class SparkDownloadOrcidAuthors {
sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true"); sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
logger.info("Start execution ..."); logger.info("Start execution ...");
// List<Tuple2<Text, Text>> sampleList = lamdaFileRDD.take(500); JavaPairRDD<Text, Text> authorsModifiedRDD = lamdaFileRDD.filter(isModifiedAfterFilter);
// JavaRDD<Tuple2<Text, Text>> sampleRDD = sc.parallelize(sampleList);
// sampleRDD
JavaPairRDD<Text, Text> authorsModifiedRDD = lamdaFileRDD
.filter(isModifiedAfterFilter);
logger.info("Authors modified count: " + authorsModifiedRDD.count()); logger.info("Authors modified count: " + authorsModifiedRDD.count());
logger.info("Start downloading ..."); logger.info("Start downloading ...");
authorsModifiedRDD authorsModifiedRDD
.repartition(20)
.map(downloadRecordFunction) .map(downloadRecordFunction)
.mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2()))) .mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2())))
.saveAsNewAPIHadoopFile( .saveAsNewAPIHadoopFile(
@ -144,6 +154,11 @@ public class SparkDownloadOrcidAuthors {
logger.info("parsedRecordsAcc: " + parsedRecordsAcc.value().toString()); logger.info("parsedRecordsAcc: " + parsedRecordsAcc.value().toString());
logger.info("modifiedRecordsAcc: " + modifiedRecordsAcc.value().toString()); logger.info("modifiedRecordsAcc: " + modifiedRecordsAcc.value().toString());
logger.info("downloadedRecordsAcc: " + downloadedRecordsAcc.value().toString()); logger.info("downloadedRecordsAcc: " + downloadedRecordsAcc.value().toString());
logger.info("errorHTTP403Acc: " + errorHTTP403Acc.value().toString());
logger.info("errorHTTP409Acc: " + errorHTTP409Acc.value().toString());
logger.info("errorHTTP503Acc: " + errorHTTP503Acc.value().toString());
logger.info("errorHTTP525Acc: " + errorHTTP525Acc.value().toString());
logger.info("errorHTTPGenericAcc: " + errorHTTPGenericAcc.value().toString());
}); });
} }

View File

@ -149,9 +149,9 @@
<class>eu.dnetlib.doiboost.orcid.SparkDownloadOrcidAuthors</class> <class>eu.dnetlib.doiboost.orcid.SparkDownloadOrcidAuthors</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar> <jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--num-executors=${sparkExecutorNumber} --conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}