1
0
Fork 0

other task executions go ahead if UnknownHostException happens on a single task

This commit is contained in:
Enrico Ottonello 2021-09-14 17:26:15 +02:00
parent aed29156c7
commit aefa36c54b
2 changed files with 32 additions and 2 deletions

View File

@ -4,8 +4,11 @@ package eu.dnetlib.doiboost.orcid;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -18,6 +21,7 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
@ -78,6 +82,7 @@ public class SparkDownloadOrcidAuthors {
LongAccumulator errorHTTP503Acc = spark.sparkContext().longAccumulator("error_HTTP_503"); LongAccumulator errorHTTP503Acc = spark.sparkContext().longAccumulator("error_HTTP_503");
LongAccumulator errorHTTP525Acc = spark.sparkContext().longAccumulator("error_HTTP_525"); LongAccumulator errorHTTP525Acc = spark.sparkContext().longAccumulator("error_HTTP_525");
LongAccumulator errorHTTPGenericAcc = spark.sparkContext().longAccumulator("error_HTTP_Generic"); LongAccumulator errorHTTPGenericAcc = spark.sparkContext().longAccumulator("error_HTTP_Generic");
LongAccumulator unknowHostAcc = spark.sparkContext().longAccumulator("error_unknowHost");
logger.info("Retrieving data from lamda sequence file"); logger.info("Retrieving data from lamda sequence file");
JavaPairRDD<Text, Text> lamdaFileRDD = sc JavaPairRDD<Text, Text> lamdaFileRDD = sc
@ -107,7 +112,17 @@ public class SparkDownloadOrcidAuthors {
httpGet.addHeader("Accept", "application/vnd.orcid+xml"); httpGet.addHeader("Accept", "application/vnd.orcid+xml");
httpGet.addHeader("Authorization", String.format("Bearer %s", token)); httpGet.addHeader("Authorization", String.format("Bearer %s", token));
long startReq = System.currentTimeMillis(); long startReq = System.currentTimeMillis();
CloseableHttpResponse response = client.execute(httpGet); CloseableHttpResponse response = null;
try {
response = client.execute(httpGet);
} catch (UnknownHostException u) {
downloaded.setStatusCode(-1);
unknowHostAcc.add(1);
if (client != null) {
client.close();
}
return downloaded.toTuple2();
}
long endReq = System.currentTimeMillis(); long endReq = System.currentTimeMillis();
long reqTime = endReq - startReq; long reqTime = endReq - startReq;
if (reqTime < 1000) { if (reqTime < 1000) {
@ -171,6 +186,7 @@ public class SparkDownloadOrcidAuthors {
logger.info("errorHTTP503Acc: {}", errorHTTP503Acc.value()); logger.info("errorHTTP503Acc: {}", errorHTTP503Acc.value());
logger.info("errorHTTP525Acc: {}", errorHTTP525Acc.value()); logger.info("errorHTTP525Acc: {}", errorHTTP525Acc.value());
logger.info("errorHTTPGenericAcc: {}", errorHTTPGenericAcc.value()); logger.info("errorHTTPGenericAcc: {}", errorHTTPGenericAcc.value());
logger.info("unknowHostAcc: {}", unknowHostAcc.value());
}); });
} }

View File

@ -3,6 +3,8 @@ package eu.dnetlib.doiboost.orcid;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
@ -96,6 +98,7 @@ public class SparkDownloadOrcidWorks {
LongAccumulator errorHTTP503Acc = spark.sparkContext().longAccumulator("error_HTTP_503"); LongAccumulator errorHTTP503Acc = spark.sparkContext().longAccumulator("error_HTTP_503");
LongAccumulator errorHTTP525Acc = spark.sparkContext().longAccumulator("error_HTTP_525"); LongAccumulator errorHTTP525Acc = spark.sparkContext().longAccumulator("error_HTTP_525");
LongAccumulator errorHTTPGenericAcc = spark.sparkContext().longAccumulator("error_HTTP_Generic"); LongAccumulator errorHTTPGenericAcc = spark.sparkContext().longAccumulator("error_HTTP_Generic");
LongAccumulator unknowHostAcc = spark.sparkContext().longAccumulator("error_unknowHost");
JavaPairRDD<Text, Text> updatedAuthorsRDD = sc JavaPairRDD<Text, Text> updatedAuthorsRDD = sc
.sequenceFile(workingPath + "downloads/updated_authors/*", Text.class, Text.class); .sequenceFile(workingPath + "downloads/updated_authors/*", Text.class, Text.class);
@ -154,7 +157,17 @@ public class SparkDownloadOrcidWorks {
httpGet.addHeader("Accept", "application/vnd.orcid+xml"); httpGet.addHeader("Accept", "application/vnd.orcid+xml");
httpGet.addHeader("Authorization", String.format("Bearer %s", token)); httpGet.addHeader("Authorization", String.format("Bearer %s", token));
long startReq = System.currentTimeMillis(); long startReq = System.currentTimeMillis();
CloseableHttpResponse response = client.execute(httpGet); CloseableHttpResponse response = null;
try {
response = client.execute(httpGet);
} catch (UnknownHostException u) {
downloaded.setStatusCode(-1);
unknowHostAcc.add(1);
if (client != null) {
client.close();
}
return downloaded.toTuple2();
}
long endReq = System.currentTimeMillis(); long endReq = System.currentTimeMillis();
long reqTime = endReq - startReq; long reqTime = endReq - startReq;
if (reqTime < 1000) { if (reqTime < 1000) {
@ -219,6 +232,7 @@ public class SparkDownloadOrcidWorks {
logger.info("errorHTTP503Acc: {}", errorHTTP503Acc.value()); logger.info("errorHTTP503Acc: {}", errorHTTP503Acc.value());
logger.info("errorHTTP525Acc: {}", errorHTTP525Acc.value()); logger.info("errorHTTP525Acc: {}", errorHTTP525Acc.value());
logger.info("errorHTTPGenericAcc: {}", errorHTTPGenericAcc.value()); logger.info("errorHTTPGenericAcc: {}", errorHTTPGenericAcc.value());
logger.info("unknowHostAcc: {}", unknowHostAcc.value());
}); });
} }