From 99a086f0c68f17489e4b39e32e9fbbb24418b21d Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Tue, 24 Nov 2020 17:49:32 +0100 Subject: [PATCH] max concurrent executors set to 10, according to ORCID Director of Technology mail request --- .../orcid/SparkDownloadOrcidAuthors.java | 17 +++++-- .../oozie_app/workflow.xml | 6 +-- .../doiboost/orcid/OrcidClientTest.java | 47 +++++++++++++++++-- 3 files changed, 57 insertions(+), 13 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java index 68f44541a8..598835a000 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java @@ -100,7 +100,13 @@ public class SparkDownloadOrcidAuthors { HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record"); httpGet.addHeader("Accept", "application/vnd.orcid+xml"); httpGet.addHeader("Authorization", String.format("Bearer %s", token)); + long startReq = System.currentTimeMillis(); CloseableHttpResponse response = client.execute(httpGet); + long endReq = System.currentTimeMillis(); + long reqTime = endReq - startReq; + if (reqTime < 1000) { + Thread.sleep(1000 - reqTime); + } int statusCode = response.getStatusLine().getStatusCode(); downloaded.setStatusCode(statusCode); if (statusCode != 200) { @@ -111,15 +117,16 @@ public class SparkDownloadOrcidAuthors { errorHTTP409Acc.add(1); case 503: errorHTTP503Acc.add(1); + throw new RuntimeException("Orcid request rate limit reached (HTTP 503)"); case 525: errorHTTP525Acc.add(1); default: errorHTTPGenericAcc.add(1); + logger + .info( + "Downloading " + orcidId + " status code: " + + response.getStatusLine().getStatusCode()); } - logger - .info( - "Downloading " + orcidId + " status code: " - + response.getStatusLine().getStatusCode()); return downloaded.toTuple2(); } downloadedRecordsAcc.add(1); @@ -142,7 +149,7 @@ public class SparkDownloadOrcidAuthors { logger.info("Authors modified count: " + authorsModifiedRDD.count()); logger.info("Start downloading ..."); authorsModifiedRDD - .repartition(20) + .repartition(10) .map(downloadRecordFunction) .mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2()))) .saveAsNewAPIHadoopFile( diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml index 1c2a7b5887..b9383558cc 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml @@ -14,10 +14,6 @@ the shell command that downloads the lambda file from orcid containing last orcid update informations - - sparkExecutorNumber - 20 - sparkDriverMemory 7G @@ -35,7 +31,7 @@ spark2MaxExecutors - 20 + 10 oozieActionShareLibForSpark2 diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java index d6ce99f1c6..66a7badb7b 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java @@ -10,6 +10,9 @@ import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.temporal.TemporalUnit; import java.util.Arrays; import java.util.Date; import java.util.List; @@ -24,6 +27,7 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull; import org.junit.jupiter.api.Test; +import org.mortbay.log.Log; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import jdk.nashorn.internal.ir.annotations.Ignore; @@ -45,7 +49,7 @@ public class OrcidClientTest { @Test private void multipleDownloadTest() throws Exception { - int toDownload = 1; + int toDownload = 10; long start = System.currentTimeMillis(); OrcidDownloader downloader = new OrcidDownloader(); TarArchiveInputStream input = new TarArchiveInputStream( @@ -64,7 +68,7 @@ public class OrcidClientTest { List recordInfo = Arrays.asList(values); String orcidId = recordInfo.get(0); if (downloader.isModified(orcidId, recordInfo.get(3))) { - downloadTest(orcidId); + slowedDownDownload(orcidId); modified++; } rowNum++; @@ -181,7 +185,7 @@ public class OrcidClientTest { } @Test - public void testReadBase64CompressedRecord() throws Exception { + private void testReadBase64CompressedRecord() throws Exception { final String base64CompressedRecord = IOUtils .toString(getClass().getResourceAsStream("0000-0003-3028-6161.compressed.base64")); final String recordFromSeqFile = ArgumentApplicationParser.decompressValue(base64CompressedRecord); @@ -257,4 +261,41 @@ public class OrcidClientTest { Path path = Paths.get("/tmp/orcid_log.txt"); Files.write(path, log.getBytes(), StandardOpenOption.APPEND); } + + @Test + private void slowedDownDownloadTest() throws Exception { + String orcid = "0000-0001-5496-1243"; + String record = slowedDownDownload(orcid); + String filename = "/tmp/downloaded_".concat(orcid).concat(".xml"); + File f = new File(filename); + OutputStream outStream = new FileOutputStream(f); + IOUtils.write(record.getBytes(), outStream); + } + + private String slowedDownDownload(String orcidId) throws Exception { + try (CloseableHttpClient client = HttpClients.createDefault()) { + HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record"); + httpGet.addHeader("Accept", "application/vnd.orcid+xml"); + httpGet.addHeader("Authorization", "Bearer 78fdb232-7105-4086-8570-e153f4198e3d"); + long start = System.currentTimeMillis(); + CloseableHttpResponse response = client.execute(httpGet); + long endReq = System.currentTimeMillis(); + long reqSessionDuration = endReq - start; + logToFile("req time (millisec): " + reqSessionDuration); + if (reqSessionDuration < 1000) { + logToFile("wait ...."); + Thread.sleep(1000 - reqSessionDuration); + } + long end = System.currentTimeMillis(); + long total = end - start; + logToFile("total time (millisec): " + total); + if (response.getStatusLine().getStatusCode() != 200) { + logToFile("Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode()); + } + return IOUtils.toString(response.getEntity().getContent()); + } catch (Throwable e) { + e.printStackTrace(); + } + return new String(""); + } }