From 5c17e768b26789df1e8bf120eeebab93854a716e Mon Sep 17 00:00:00 2001
From: Enrico Ottonello
Subject: [PATCH] set wf configuration with spark.dynamicAllocation.maxExecutors=20 over 20 input partitions

---
 .../orcid/SparkDownloadOrcidAuthors.java      | 43 ++++++++++++++------
 .../oozie_app/workflow.xml                    |  4 +-
 2 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java
index 850a654d4..68f44541a 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java
@@ -65,9 +65,14 @@ public class SparkDownloadOrcidAuthors {
 			spark -> {
 				JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-				LongAccumulator parsedRecordsAcc = spark.sparkContext().longAccumulator("parsedRecords");
-				LongAccumulator modifiedRecordsAcc = spark.sparkContext().longAccumulator("modifiedRecords");
-				LongAccumulator downloadedRecordsAcc = spark.sparkContext().longAccumulator("downloadedRecords");
+				LongAccumulator parsedRecordsAcc = spark.sparkContext().longAccumulator("parsed_records");
+				LongAccumulator modifiedRecordsAcc = spark.sparkContext().longAccumulator("to_download_records");
+				LongAccumulator downloadedRecordsAcc = spark.sparkContext().longAccumulator("downloaded_records");
+				LongAccumulator errorHTTP403Acc = spark.sparkContext().longAccumulator("error_HTTP_403");
+				LongAccumulator errorHTTP409Acc = spark.sparkContext().longAccumulator("error_HTTP_409");
+				LongAccumulator errorHTTP503Acc = spark.sparkContext().longAccumulator("error_HTTP_503");
+				LongAccumulator errorHTTP525Acc = spark.sparkContext().longAccumulator("error_HTTP_525");
+				LongAccumulator errorHTTPGenericAcc = spark.sparkContext().longAccumulator("error_HTTP_Generic");
 
 				logger.info("Retrieving data from lamda sequence file");
 				JavaPairRDD<Text, Text> lamdaFileRDD = sc
@@ -99,6 +104,22 @@
 					int statusCode = response.getStatusLine().getStatusCode();
 					downloaded.setStatusCode(statusCode);
 					if (statusCode != 200) {
+						switch (statusCode) {
+							case 403:
+								errorHTTP403Acc.add(1);
+								break;
+							case 409:
+								errorHTTP409Acc.add(1);
+								break;
+							case 503:
+								errorHTTP503Acc.add(1);
+								break;
+							case 525:
+								errorHTTP525Acc.add(1);
+								break;
+							default:
+								errorHTTPGenericAcc.add(1);
+						}
 						logger
 							.info(
 								"Downloading " + orcidId + " status code: "
@@ -106,10 +127,6 @@
 						return downloaded.toTuple2();
 					}
 					downloadedRecordsAcc.add(1);
-					long currentDownloaded = downloadedRecordsAcc.value();
-					if ((currentDownloaded % 10000) == 0) {
-						logger.info("Current downloaded: " + currentDownloaded);
-					}
 					downloaded
 						.setCompressedData(
 							ArgumentApplicationParser
@@ -125,14 +142,11 @@
 				sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
 
 				logger.info("Start execution ...");
-//				List<Tuple2<Text, Text>> sampleList = lamdaFileRDD.take(500);
-//				JavaRDD<Tuple2<Text, Text>> sampleRDD = sc.parallelize(sampleList);
-//				sampleRDD
-				JavaPairRDD<Text, Text> authorsModifiedRDD = lamdaFileRDD
-					.filter(isModifiedAfterFilter);
+				JavaPairRDD<Text, Text> authorsModifiedRDD = lamdaFileRDD.filter(isModifiedAfterFilter);
 				logger.info("Authors modified count: " + authorsModifiedRDD.count());
 				logger.info("Start downloading ...");
 				authorsModifiedRDD
+					.repartition(20)
 					.map(downloadRecordFunction)
 					.mapToPair(t -> new Tuple2<>(new Text(t._1()), new Text(t._2())))
 					.saveAsNewAPIHadoopFile(
@@ -144,6 +158,11 @@
 				logger.info("parsedRecordsAcc: " + parsedRecordsAcc.value().toString());
 				logger.info("modifiedRecordsAcc: " + modifiedRecordsAcc.value().toString());
 				logger.info("downloadedRecordsAcc: " + downloadedRecordsAcc.value().toString());
+				logger.info("errorHTTP403Acc: " + errorHTTP403Acc.value().toString());
+				logger.info("errorHTTP409Acc: " + errorHTTP409Acc.value().toString());
+				logger.info("errorHTTP503Acc: " + errorHTTP503Acc.value().toString());
+				logger.info("errorHTTP525Acc: " + errorHTTP525Acc.value().toString());
+				logger.info("errorHTTPGenericAcc: " + errorHTTPGenericAcc.value().toString());
 			});
 
 	}

diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml
index 5f728d35b..1c2a7b588 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml
@@ -149,9 +149,9 @@
 			<class>eu.dnetlib.doiboost.orcid.SparkDownloadOrcidAuthors</class>
 			<jar>dhp-doiboost-${projectVersion}.jar</jar>
 			<spark-opts>
-				--num-executors=${sparkExecutorNumber}
+				--conf spark.dynamicAllocation.enabled=true
+				--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
 				--executor-memory=${sparkExecutorMemory}
-				--executor-cores=${sparkExecutorCores}
 				--driver-memory=${sparkDriverMemory}
 				--conf spark.extraListeners=${spark2ExtraListeners}
 				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
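
Note on the accumulator pattern: the per-status accumulators added above use Spark's standard named LongAccumulator API; each worker-side add(1) is merged on the driver, and the final values are what the logger.info calls at the end of the job print. Below is a minimal, self-contained sketch of the same technique (assuming Spark 2.x and a local master; the class name and status codes are invented for illustration and stand in for real download responses):

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.util.LongAccumulator;

    public class StatusAccumulatorSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                .appName("status-accumulator-sketch")
                .master("local[*]") // local master only for this sketch
                .getOrCreate();
            JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

            // Named accumulators, visible in the Spark UI under these labels.
            LongAccumulator error403 = spark.sparkContext().longAccumulator("error_HTTP_403");
            LongAccumulator error409 = spark.sparkContext().longAccumulator("error_HTTP_409");
            LongAccumulator errorGeneric = spark.sparkContext().longAccumulator("error_HTTP_Generic");

            // Invented status codes standing in for real HTTP responses.
            sc.parallelize(Arrays.asList(200, 403, 409, 500, 200, 403))
                .foreach(statusCode -> {
                    if (statusCode != 200) {
                        switch (statusCode) {
                            case 403:
                                error403.add(1);
                                break;
                            case 409:
                                error409.add(1);
                                break;
                            default:
                                errorGeneric.add(1);
                        }
                    }
                });

            System.out.println("403: " + error403.value());       // 2
            System.out.println("409: " + error409.value());       // 1
            System.out.println("other: " + errorGeneric.value()); // 1
            spark.stop();
        }
    }

Without the break statements each case would fall through and inflate every bucket below it, which is why the switch in the patch terminates every case.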
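
Note on the executor sizing: replacing the static --num-executors with spark.dynamicAllocation.maxExecutors lets YARN scale executors up to the cap instead of holding a fixed pool; per the commit subject the cap is 20, matching repartition(20), so each partition of slow HTTP downloads can occupy its own executor. A minimal programmatic sketch of the same settings (the class and app names are hypothetical; on YARN, dynamic allocation also requires the external shuffle service, spark.shuffle.service.enabled=true, on the node managers):

    import org.apache.spark.sql.SparkSession;

    public class DynamicAllocationSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                .appName("dynamic-allocation-sketch") // hypothetical name
                // Equivalent of the spark-opts added in workflow.xml:
                .config("spark.dynamicAllocation.enabled", "true")
                .config("spark.dynamicAllocation.maxExecutors", "20")
                .getOrCreate();

            // Confirm the settings took effect.
            System.out.println(spark.conf().get("spark.dynamicAllocation.maxExecutors"));
            spark.stop();
        }
    }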