1
0
Fork 0

max concurrent executors set to 10, according to ORCID Director of Technology mail request

This commit is contained in:
Enrico Ottonello 2020-11-24 17:49:32 +01:00
parent 5c17e768b2
commit 99a086f0c6
3 changed files with 57 additions and 13 deletions

View File

@ -100,7 +100,13 @@ public class SparkDownloadOrcidAuthors {
HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record"); HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record");
httpGet.addHeader("Accept", "application/vnd.orcid+xml"); httpGet.addHeader("Accept", "application/vnd.orcid+xml");
httpGet.addHeader("Authorization", String.format("Bearer %s", token)); httpGet.addHeader("Authorization", String.format("Bearer %s", token));
long startReq = System.currentTimeMillis();
CloseableHttpResponse response = client.execute(httpGet); CloseableHttpResponse response = client.execute(httpGet);
long endReq = System.currentTimeMillis();
long reqTime = endReq - startReq;
if (reqTime < 1000) {
Thread.sleep(1000 - reqTime);
}
int statusCode = response.getStatusLine().getStatusCode(); int statusCode = response.getStatusLine().getStatusCode();
downloaded.setStatusCode(statusCode); downloaded.setStatusCode(statusCode);
if (statusCode != 200) { if (statusCode != 200) {
@ -111,15 +117,16 @@ public class SparkDownloadOrcidAuthors {
errorHTTP409Acc.add(1); errorHTTP409Acc.add(1);
case 503: case 503:
errorHTTP503Acc.add(1); errorHTTP503Acc.add(1);
throw new RuntimeException("Orcid request rate limit reached (HTTP 503)");
case 525: case 525:
errorHTTP525Acc.add(1); errorHTTP525Acc.add(1);
default: default:
errorHTTPGenericAcc.add(1); errorHTTPGenericAcc.add(1);
logger
.info(
"Downloading " + orcidId + " status code: "
+ response.getStatusLine().getStatusCode());
} }
logger
.info(
"Downloading " + orcidId + " status code: "
+ response.getStatusLine().getStatusCode());
return downloaded.toTuple2(); return downloaded.toTuple2();
} }
downloadedRecordsAcc.add(1); downloadedRecordsAcc.add(1);
@ -142,7 +149,7 @@ public class SparkDownloadOrcidAuthors {
logger.info("Authors modified count: " + authorsModifiedRDD.count()); logger.info("Authors modified count: " + authorsModifiedRDD.count());
logger.info("Start downloading ..."); logger.info("Start downloading ...");
authorsModifiedRDD authorsModifiedRDD
.repartition(20) .repartition(10)
.map(downloadRecordFunction) .map(downloadRecordFunction)
.mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2()))) .mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2())))
.saveAsNewAPIHadoopFile( .saveAsNewAPIHadoopFile(

View File

@ -14,10 +14,6 @@
</value> </value>
<description>the shell command that downloads the lambda file from orcid containing last orcid update informations</description> <description>the shell command that downloads the lambda file from orcid containing last orcid update informations</description>
</property> </property>
<property>
<name>sparkExecutorNumber</name>
<value>20</value>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<value>7G</value> <value>7G</value>
@ -35,7 +31,7 @@
</property> </property>
<property> <property>
<name>spark2MaxExecutors</name> <name>spark2MaxExecutors</name>
<value>20</value> <value>10</value>
</property> </property>
<property> <property>
<name>oozieActionShareLibForSpark2</name> <name>oozieActionShareLibForSpark2</name>

View File

@ -10,6 +10,9 @@ import java.nio.file.Paths;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.TemporalUnit;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -24,6 +27,7 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.client.HttpClients;
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull; import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mortbay.log.Log;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import jdk.nashorn.internal.ir.annotations.Ignore; import jdk.nashorn.internal.ir.annotations.Ignore;
@ -45,7 +49,7 @@ public class OrcidClientTest {
@Test @Test
private void multipleDownloadTest() throws Exception { private void multipleDownloadTest() throws Exception {
int toDownload = 1; int toDownload = 10;
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
OrcidDownloader downloader = new OrcidDownloader(); OrcidDownloader downloader = new OrcidDownloader();
TarArchiveInputStream input = new TarArchiveInputStream( TarArchiveInputStream input = new TarArchiveInputStream(
@ -64,7 +68,7 @@ public class OrcidClientTest {
List<String> recordInfo = Arrays.asList(values); List<String> recordInfo = Arrays.asList(values);
String orcidId = recordInfo.get(0); String orcidId = recordInfo.get(0);
if (downloader.isModified(orcidId, recordInfo.get(3))) { if (downloader.isModified(orcidId, recordInfo.get(3))) {
downloadTest(orcidId); slowedDownDownload(orcidId);
modified++; modified++;
} }
rowNum++; rowNum++;
@ -181,7 +185,7 @@ public class OrcidClientTest {
} }
@Test @Test
public void testReadBase64CompressedRecord() throws Exception { private void testReadBase64CompressedRecord() throws Exception {
final String base64CompressedRecord = IOUtils final String base64CompressedRecord = IOUtils
.toString(getClass().getResourceAsStream("0000-0003-3028-6161.compressed.base64")); .toString(getClass().getResourceAsStream("0000-0003-3028-6161.compressed.base64"));
final String recordFromSeqFile = ArgumentApplicationParser.decompressValue(base64CompressedRecord); final String recordFromSeqFile = ArgumentApplicationParser.decompressValue(base64CompressedRecord);
@ -257,4 +261,41 @@ public class OrcidClientTest {
Path path = Paths.get("/tmp/orcid_log.txt"); Path path = Paths.get("/tmp/orcid_log.txt");
Files.write(path, log.getBytes(), StandardOpenOption.APPEND); Files.write(path, log.getBytes(), StandardOpenOption.APPEND);
} }
@Test
private void slowedDownDownloadTest() throws Exception {
String orcid = "0000-0001-5496-1243";
String record = slowedDownDownload(orcid);
String filename = "/tmp/downloaded_".concat(orcid).concat(".xml");
File f = new File(filename);
OutputStream outStream = new FileOutputStream(f);
IOUtils.write(record.getBytes(), outStream);
}
private String slowedDownDownload(String orcidId) throws Exception {
try (CloseableHttpClient client = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record");
httpGet.addHeader("Accept", "application/vnd.orcid+xml");
httpGet.addHeader("Authorization", "Bearer 78fdb232-7105-4086-8570-e153f4198e3d");
long start = System.currentTimeMillis();
CloseableHttpResponse response = client.execute(httpGet);
long endReq = System.currentTimeMillis();
long reqSessionDuration = endReq - start;
logToFile("req time (millisec): " + reqSessionDuration);
if (reqSessionDuration < 1000) {
logToFile("wait ....");
Thread.sleep(1000 - reqSessionDuration);
}
long end = System.currentTimeMillis();
long total = end - start;
logToFile("total time (millisec): " + total);
if (response.getStatusLine().getStatusCode() != 200) {
logToFile("Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode());
}
return IOUtils.toString(response.getEntity().getContent());
} catch (Throwable e) {
e.printStackTrace();
}
return new String("");
}
} }