forked from D-Net/dnet-hadoop

commit 0b29bb7e3b (parent 12756f9d41)

Spark job to download ORCID records modified after a fixed date.

Files changed: OrcidDownloader.java, SparkOrcidGenerateAuthors.java, DownloadedRecordData.java (new), the job parameters JSON, the Oozie workflow.xml, ElasticSearchTest.java, OrcidClientTest.java (new), and a compressed test resource (new).
OrcidDownloader.java:

@@ -27,8 +27,8 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 public class OrcidDownloader extends OrcidDSManager {
 
     static final int REQ_LIMIT = 24;
-    static final int REQ_MAX_TEST = 100;
-    static final int RECORD_PARSED_COUNTER_LOG_INTERVAL = 10;
+    // static final int REQ_MAX_TEST = 100;
+    static final int RECORD_PARSED_COUNTER_LOG_INTERVAL = 10000;
     static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
     static final String lastUpdate = "2019-09-30 00:00:00";
     private String lambdaFileName;
@@ -136,9 +136,9 @@ public class OrcidDownloader extends OrcidDSManager {
                 }
             }
 
-            if (parsedRecordsCounter > REQ_MAX_TEST) {
-                break;
-            }
+            // if (parsedRecordsCounter > REQ_MAX_TEST) {
+            // break;
+            // }
             if ((parsedRecordsCounter % RECORD_PARSED_COUNTER_LOG_INTERVAL) == 0) {
                 Log
                     .info(
@@ -148,9 +148,9 @@ public class OrcidDownloader extends OrcidDSManager {
                         + downloadedRecordsCounter
                         + " saved: "
                         + savedRecordsCounter);
-            if (parsedRecordsCounter > REQ_MAX_TEST) {
-                break;
-            }
+            // if (parsedRecordsCounter > REQ_MAX_TEST) {
+            // break;
+            // }
         }
     }
     long endDownload = System.currentTimeMillis();
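Side note on REQ_LIMIT, which stays at 24 while the test cap above is commented out: elsewhere in this commit the same constant paces requests so that each batch of 24 calls takes at least one second. A minimal, self-contained sketch of that pacing; the doRequest helper is a hypothetical stand-in for the ORCID API call:

import java.io.IOException;

// Sketch of the pacing implied by REQ_LIMIT = 24: after every 24 requests,
// sleep out the remainder of the current second (~24 requests/second cap).
public class RatePacingSketch {
    static final int REQ_LIMIT = 24;

    public static void main(String[] args) throws InterruptedException {
        int nReqTmp = 0;
        long startReqTmp = System.currentTimeMillis();
        for (int i = 0; i < 100; i++) {
            doRequest(i); // hypothetical stand-in for one ORCID API call
            nReqTmp++;
            if (nReqTmp == REQ_LIMIT) {
                long reqSessionDuration = System.currentTimeMillis() - startReqTmp;
                if (reqSessionDuration <= 1000) {
                    Thread.sleep(1000 - reqSessionDuration); // wait out the rest of the second
                }
                nReqTmp = 0;
                startReqTmp = System.currentTimeMillis();
            }
        }
    }

    static void doRequest(int i) {
        // placeholder: the real code issues an HTTP GET to api.orcid.org
    }
}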
SparkOrcidGenerateAuthors.java:

@@ -3,23 +3,39 @@ package eu.dnetlib.doiboost.orcid;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
 import org.apache.spark.SparkConf;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.mortbay.log.Log;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.doiboost.orcid.model.DownloadedRecordData;
+import scala.Tuple2;
 
 public class SparkOrcidGenerateAuthors {
 
-    public static void main(String[] args) {
+    static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    static final String lastUpdate = "2019-09-30 00:00:00";
+
+    public static void main(String[] args) throws IOException, Exception {
         Logger logger = LoggerFactory.getLogger(SparkOrcidGenerateAuthors.class);
         logger.info("[ SparkOrcidGenerateAuthors STARTED]");
-        try {
 
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(
             IOUtils
@@ -33,23 +49,82 @@ public class SparkOrcidGenerateAuthors {
             .map(Boolean::valueOf)
             .orElse(Boolean.TRUE);
         logger.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-        final String workingDirPath = parser.get("workingPath_orcid");
-        logger.info("workingDirPath: ", workingDirPath);
+        final String workingPath = parser.get("workingPath");
+        logger.info("workingPath: ", workingPath);
+        final String outputAuthorsPath = parser.get("outputAuthorsPath");
+        logger.info("outputAuthorsPath: ", outputAuthorsPath);
+        final String token = parser.get("token");
 
         SparkConf conf = new SparkConf();
         runWithSparkSession(
             conf,
             isSparkSessionManaged,
             spark -> {
-                Dataset<Row> lambda = spark.read().load(workingDirPath + "last_modified.csv");
-                logger.info("lambda file loaded.");
+                JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+                JavaRDD<String> lamdaFileRDD = sc.textFile(workingPath + "last_modified.csv");
+
+                Function<String, Boolean> isModifiedAfterFilter = line -> {
+                    String[] values = line.split(",");
+                    String orcidId = values[0];
+                    if (isModified(orcidId, values[3])) {
+                        return true;
+                    }
+                    return false;
+                };
+                Function<String, Tuple2<String, String>> downloadRecordFunction = line -> {
+                    String[] values = line.split(",");
+                    String orcidId = values[0];
+                    return downloadRecord(orcidId, token);
+                };
+
+                lamdaFileRDD
+                    .filter(isModifiedAfterFilter)
+                    .map(downloadRecordFunction)
+                    .rdd()
+                    .saveAsTextFile(workingPath.concat(outputAuthorsPath));
             });
 
+    }
+
+    private static boolean isModified(String orcidId, String modifiedDate) {
+        Date modifiedDateDt = null;
+        Date lastUpdateDt = null;
+        try {
+            if (modifiedDate.length() != 19) {
+                modifiedDate = modifiedDate.substring(0, 19);
+            }
+            modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate);
+            lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate);
         } catch (Exception e) {
-            logger.info("****************************** " + e.getMessage());
+            Log.warn("[" + orcidId + "] Parsing date: ", e.getMessage());
+            return true;
+        }
+        return modifiedDateDt.after(lastUpdateDt);
     }
 
+    private static Tuple2<String, String> downloadRecord(String orcidId, String token) {
+        final DownloadedRecordData data = new DownloadedRecordData();
+        data.setOrcidId(orcidId);
+        try (CloseableHttpClient client = HttpClients.createDefault()) {
+            HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record");
+            httpGet.addHeader("Accept", "application/vnd.orcid+xml");
+            httpGet.addHeader("Authorization", String.format("Bearer %s", token));
+            CloseableHttpResponse response = client.execute(httpGet);
+            int statusCode = response.getStatusLine().getStatusCode();
+            data.setStatusCode(statusCode);
+            if (statusCode != 200) {
+                Log
+                    .warn(
+                        "Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode());
+                return data.toTuple2();
+            }
+            data
+                .setCompressedData(
+                    ArgumentApplicationParser.compressArgument(IOUtils.toString(response.getEntity().getContent())));
+        } catch (Throwable e) {
+            Log.warn("Downloading " + orcidId, e.getMessage());
+            data.setErrorMessage(e.getMessage());
+            return data.toTuple2();
+        }
+        return data.toTuple2();
+    }
 
 }
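The job assumes each last_modified.csv line holds the ORCID iD in column 0 and the modification timestamp in column 3, truncating timestamps longer than 19 characters before parsing. A self-contained sketch of that comparison, using a hypothetical CSV line and the constants from the diff:

import java.text.SimpleDateFormat;
import java.util.Date;

// Standalone sketch of the isModified(...) check above: keep a record only if
// its lambda-file timestamp is newer than the fixed lastUpdate date.
public class IsModifiedSketch {
    static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    static final String lastUpdate = "2019-09-30 00:00:00";

    public static void main(String[] args) throws Exception {
        // hypothetical CSV line: ORCID iD in column 0, modification timestamp in column 3
        String line = "0000-0001-7291-3210,x,y,2020-05-06 23:59:46.031145";
        String[] values = line.split(",");
        System.out.println(values[0] + " modified after lastUpdate: " + isModified(values[3]));
    }

    static boolean isModified(String modifiedDate) throws Exception {
        if (modifiedDate.length() != 19) {
            modifiedDate = modifiedDate.substring(0, 19); // drop fractional seconds
        }
        Date modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate);
        Date lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate);
        return modifiedDateDt.after(lastUpdateDt);
    }
}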
DownloadedRecordData.java (new file):

@@ -0,0 +1,63 @@
+
+package eu.dnetlib.doiboost.orcid.model;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.io.Text;
+
+import com.google.gson.JsonObject;
+
+import scala.Tuple2;
+
+public class DownloadedRecordData implements Serializable {
+
+    private String orcidId;
+    private String statusCode;
+    private String compressedData;
+    private String errorMessage;
+
+    public Tuple2<String, String> toTuple2() {
+        JsonObject data = new JsonObject();
+        data.addProperty("statusCode", getStatusCode());
+        if (getCompressedData() != null) {
+            data.addProperty("compressedData", getCompressedData());
+        }
+        if (getErrorMessage() != null) {
+            data.addProperty("errorMessage", getErrorMessage());
+        }
+        return new Tuple2<>(orcidId, data.toString());
+    }
+
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    public void setErrorMessage(String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+
+    public String getOrcidId() {
+        return orcidId;
+    }
+
+    public void setOrcidId(String orcidId) {
+        this.orcidId = orcidId;
+    }
+
+    public int getStatusCode() {
+        return Integer.parseInt(statusCode);
+    }
+
+    public void setStatusCode(int statusCode) {
+        this.statusCode = Integer.toString(statusCode);
+    }
+
+    public String getCompressedData() {
+        return compressedData;
+    }
+
+    public void setCompressedData(String compressedData) {
+        this.compressedData = compressedData;
+    }
+
+}
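For reference, toTuple2() pairs the ORCID iD with a small JSON payload: statusCode always, compressedData and errorMessage only when present. A minimal usage sketch with hypothetical values:

import eu.dnetlib.doiboost.orcid.model.DownloadedRecordData;
import scala.Tuple2;

// Sketch of the tuple produced for each downloaded record; values are hypothetical.
public class DownloadedRecordDataSketch {
    public static void main(String[] args) {
        DownloadedRecordData data = new DownloadedRecordData();
        data.setOrcidId("0000-0001-7291-3210");
        data.setStatusCode(200);
        data.setCompressedData("H4sIAAA..."); // base64 gzip of the record XML (truncated placeholder)
        Tuple2<String, String> t = data.toTuple2();
        // t._1() -> "0000-0001-7291-3210"
        // t._2() -> {"statusCode":200,"compressedData":"H4sIAAA..."}
        System.out.println(t._1() + " -> " + t._2());
    }
}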
job parameters JSON:

@@ -1,4 +1,4 @@
-[{"paramName": "mt","paramLongName": "master","paramDescription": "should be local or yarn","paramRequired": true},
-{"paramName":"d", "paramLongName":"workingPath_orcid", "paramDescription": "the default work path", "paramRequired": true},
-{"paramName":"o", "paramLongName":"outputAuthorsPath", "paramDescription": "the relative folder of the sequencial file to write", "paramRequired": true}
+[{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working path", "paramRequired": true},
+{"paramName":"t", "paramLongName":"token", "paramDescription": "token to grant access", "paramRequired": true},
+{"paramName":"o", "paramLongName":"outputAuthorsPath", "paramDescription": "the relative folder of the sequencial file to write the authors data", "paramRequired": true}
 ]
Oozie workflow.xml:

@@ -1,9 +1,13 @@
 <workflow-app name="Gen Orcid Authors" xmlns="uri:oozie:workflow:0.5">
     <parameters>
         <property>
-            <name>workingPath_activities</name>
+            <name>workingPath</name>
             <description>the working dir base path</description>
         </property>
+        <property>
+            <name>token</name>
+            <description>access token</description>
+        </property>
         <property>
             <name>sparkDriverMemory</name>
             <description>memory for driver process</description>
@@ -28,7 +32,6 @@
     <action name="ResetWorkingPath">
         <fs>
             <delete path='${workingPath_activities}/authors'/>
-            <mkdir path='${workingPath_activities}/authors'/>
         </fs>
         <ok to="Gen_Orcid_Authors"/>
         <error to="Kill"/>
@@ -43,10 +46,11 @@
             <name>Gen_Orcid_Authors</name>
             <class>eu.dnetlib.doiboost.orcid.SparkOrcidGenerateAuthors</class>
             <jar>dhp-doiboost-1.2.1-SNAPSHOT.jar</jar>
-            <spark-opts>--num-executors 50 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory}
+            <spark-opts>--num-executors 24 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory}
             </spark-opts>
-            <arg>-mt</arg> <arg>yarn</arg>
-            <arg>--workingPath_orcid</arg><arg>${workingPath_activities}/</arg>
+            <arg>-w</arg><arg>${workingPath}/</arg>
+            <arg>-o</arg><arg>authors/</arg>
+            <arg>-t</arg><arg>${token}</arg>
         </spark>
         <ok to="End"/>
         <error to="Kill"/>
ElasticSearchTest.java:

@@ -2,15 +2,6 @@
 package eu.dnetlib.doiboost.orcid;
 
 import java.io.IOException;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.junit.jupiter.api.Test;
-
 import java.net.ConnectException;
 import java.util.Arrays;
 import java.util.List;
@@ -19,7 +10,15 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -120,19 +119,24 @@ public class ElasticSearchTest {
             String orcidId = String.format("%s", i);
             String record = String.format(recordTemplate, orcidId);
             countAll++;
-            if(partial == nTasks) {
-                System.out.println("Waiting for tasks to complete before resubmitting to executor (countAll = "+countAll+") . . . ");
+            if (partial == nTasks) {
+                System.out
+                    .println(
+                        "Waiting for tasks to complete before resubmitting to executor (countAll = " + countAll
+                            + ") . . . ");
                 System.out.println("Getting replies");
                 long startWait = System.currentTimeMillis();
-                for(Future<Integer> res : resList){
-                    if(res.get() == 200) countOk++;
+                for (Future<Integer> res : resList) {
+                    if (res.get() == 200)
+                        countOk++;
                 }
                 resList.clear();
                 partial = 0;
-                System.out.println(". . . Ready to submit again after "+(System.currentTimeMillis() - startWait)+" ms" );
+                System.out
+                    .println(". . . Ready to submit again after " + (System.currentTimeMillis() - startWait) + " ms");
             }
             partial++;
-            Future<Integer> res = executorService.submit( () -> {
+            Future<Integer> res = executorService.submit(() -> {
                 CloseableHttpResponse responsePPOST = null;
                 try {
 
@@ -149,20 +153,22 @@ public class ElasticSearchTest {
                     case 201:
                         return statusCode;
                     default:
-                        System.out.println(responsePPOST.getStatusLine().getStatusCode() + ": " + responsePPOST.getStatusLine().getReasonPhrase());
+                        System.out
+                            .println(
+                                responsePPOST.getStatusLine().getStatusCode() + ": "
+                                    + responsePPOST.getStatusLine().getReasonPhrase());
                         System.out.println("Source record causing error: " + record);
                         errors.merge(statusCode, 1, Integer::sum);
                         return statusCode;
                     }
                 } catch (ConnectException ce) {
                     throw ce;
-                }
-                catch (IOException e) {
+                } catch (IOException e) {
                     e.printStackTrace();
                     errors.merge(-1, 1, Integer::sum);
-                }
-                finally{
-                    if(responsePPOST != null) responsePPOST.close();
+                } finally {
+                    if (responsePPOST != null)
+                        responsePPOST.close();
                 }
                 return -1;
             });
@@ -170,16 +176,17 @@ public class ElasticSearchTest {
         }
         executorService.shutdown();
 
-        //now let's wait for the results. We can block ourselves here: we have nothing else to do
+        // now let's wait for the results. We can block ourselves here: we have nothing else to do
         System.out.println("Waiting for responses");
-        for(Future<Integer> res : resList){
-            if(res.get() == 200) countOk++;
+        for (Future<Integer> res : resList) {
+            if (res.get() == 200)
+                countOk++;
         }
         client.close();
         cm.shutdown();
 
-        System.out.println("countOk: "+countOk);
-        System.out.println("countAll: "+countAll);
-        System.out.println("errors count: "+errors.size());
+        System.out.println("countOk: " + countOk);
+        System.out.println("countAll: " + countAll);
+        System.out.println("errors count: " + errors.size());
     }
 }
OrcidClientTest.java (new file):

@@ -0,0 +1,136 @@
+
+package eu.dnetlib.doiboost.orcid;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.junit.jupiter.api.Test;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class OrcidClientTest {
+    final String orcidId = "0000-0001-7291-3210";
+    final int REQ_LIMIT = 24;
+    final int REQ_MAX_TEST = 100;
+    final int RECORD_DOWNLOADED_COUNTER_LOG_INTERVAL = 10;
+    final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    final String toRetrieveDate = "2020-05-06 23:59:46.031145";
+    String toNotRetrieveDate = "2019-09-29 23:59:59.000000";
+    String lastUpdate = "2019-09-30 00:00:00";
+    String shortDate = "2020-05-06 16:06:11";
+
+    // curl -i -H "Accept: application/vnd.orcid+xml"
+    // -H 'Authorization: Bearer 78fdb232-7105-4086-8570-e153f4198e3d'
+    // 'https://api.orcid.org/v3.0/0000-0001-7291-3210/record'
+
+    public String testDownloadRecord(String orcidId) throws Exception {
+        try (CloseableHttpClient client = HttpClients.createDefault()) {
+            HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record");
+            httpGet.addHeader("Accept", "application/vnd.orcid+xml");
+            httpGet.addHeader("Authorization", "Bearer 78fdb232-7105-4086-8570-e153f4198e3d");
+            CloseableHttpResponse response = client.execute(httpGet);
+            if (response.getStatusLine().getStatusCode() != 200) {
+                System.out
+                    .println("Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode());
+            }
+            return IOUtils.toString(response.getEntity().getContent());
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+        return new String("");
+    }
+
+    // @Test
+    public void testLambdaFileParser() throws Exception {
+        try (BufferedReader br = new BufferedReader(
+            new InputStreamReader(this.getClass().getResourceAsStream("last_modified.csv")))) {
+            String line;
+            int counter = 0;
+            int nReqTmp = 0;
+            long startDownload = System.currentTimeMillis();
+            long startReqTmp = System.currentTimeMillis();
+            while ((line = br.readLine()) != null) {
+                counter++;
+                // skip headers line
+                if (counter == 1) {
+                    continue;
+                }
+                String[] values = line.split(",");
+                List<String> recordInfo = Arrays.asList(values);
+                testDownloadRecord(recordInfo.get(0));
+                long endReq = System.currentTimeMillis();
+                nReqTmp++;
+                if (nReqTmp == REQ_LIMIT) {
+                    long reqSessionDuration = endReq - startReqTmp;
+                    if (reqSessionDuration <= 1000) {
+                        System.out
+                            .println(
+                                "\nreqSessionDuration: " + reqSessionDuration + " nReqTmp: " + nReqTmp + " wait ....");
+                        Thread.sleep(1000 - reqSessionDuration);
+                    } else {
+                        nReqTmp = 0;
+                        startReqTmp = System.currentTimeMillis();
+                    }
+                }
+
+                if (counter > REQ_MAX_TEST) {
+                    break;
+                }
+                if ((counter % RECORD_DOWNLOADED_COUNTER_LOG_INTERVAL) == 0) {
+                    System.out.println("Current record downloaded: " + counter);
+                }
+            }
+            long endDownload = System.currentTimeMillis();
+            long downloadTime = endDownload - startDownload;
+            System.out.println("Download time: " + ((downloadTime / 1000) / 60) + " minutes");
+        }
+    }
+
+    // @Test
+    public void getRecordDatestamp() throws ParseException {
+        Date toRetrieveDateDt = new SimpleDateFormat(DATE_FORMAT).parse(toRetrieveDate);
+        Date toNotRetrieveDateDt = new SimpleDateFormat(DATE_FORMAT).parse(toNotRetrieveDate);
+        Date lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate);
+        assertTrue(toRetrieveDateDt.after(lastUpdateDt));
+        assertTrue(!toNotRetrieveDateDt.after(lastUpdateDt));
+    }
+
+    public void testDate(String value) throws ParseException {
+        System.out.println(value.toString());
+        if (value.length() != 19) {
+            value = value.substring(0, 19);
+        }
+        Date valueDt = new SimpleDateFormat(DATE_FORMAT).parse(value);
+        System.out.println(valueDt.toString());
+    }
+
+    // @Test
+    public void testModifiedDate() throws ParseException {
+        testDate(toRetrieveDate);
+        testDate(toNotRetrieveDate);
+        testDate(shortDate);
+    }
+
+    // @Test
+    public void testReadBase64CompressedRecord() throws Exception {
+        final String base64CompressedRecord = IOUtils
+            .toString(getClass().getResourceAsStream("0000-0001-6645-509X.compressed.base64"));
+        final String recordFromSeqFile = ArgumentApplicationParser.decompressValue(base64CompressedRecord);
+        System.out.println(recordFromSeqFile);
+        final String downloadedRecord = testDownloadRecord("0000-0001-6645-509X");
+        assertTrue(recordFromSeqFile.equals(downloadedRecord));
+    }
+}
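testReadBase64CompressedRecord above relies on ArgumentApplicationParser.compressArgument and decompressValue being inverses (both are used in this commit). A minimal round-trip sketch with a hypothetical payload:

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

// Round-trip sketch for the compressed record payloads used in this commit:
// compressArgument(...) produces the base64 string stored with each record,
// decompressValue(...) restores the original XML.
public class CompressRoundTripSketch {
    public static void main(String[] args) throws Exception {
        String recordXml = "<record><orcid-id>0000-0001-6645-509X</orcid-id></record>"; // hypothetical payload
        String compressed = ArgumentApplicationParser.compressArgument(recordXml);
        String restored = ArgumentApplicationParser.decompressValue(compressed);
        System.out.println(recordXml.equals(restored)); // expected: true
    }
}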
test resource 0000-0001-6645-509X.compressed.base64 (new file):

@@ -0,0 +1 @@
+H4sIAAAAAAAAAO1a227bOBB9z1cIepd18SW24aho0wTbAgEWjRdY9I2RaJtbSdSSkhP165eURIm6kHa2SbCLNkBiWDxzhhxyZg7tbN49xZFxhIQinFyZ7sQxDZgEOETJ/sr8Y3trLU2DZiAJQYQTeGUWkJrv/IsNgQEm4bp6MVKQHa5M22E/Fvt1rcViNrfmzupP02AOErpGSQZJAqIr85Bl6dq2Hx8fJ5gEKGR/93ZCbYEQFjDMA5CV01KZNBBhEyKaoSTQW0mgxg6mbCUgg6HGrMEIK5wdILESEEO1VYsRVjGMH1i8DyhVW7WYJhqEYKKJBB8W2ADHsS4A1bhAV1uoRlfjAp2yaWG2S1YIM4AiqrbrIwXDN1g8ah3WgGblMbPWrJwPN9in6gxZKIRJhnYI6mI2BAueXZ5UGaCyrQFNVAjcQcISB+oC0oKEHQhDAqnGpga0WXRE7ABaKaZIf8j7SMHAIvtNbcVHBfLA0gSTQg2uAe0+pREuYhZK3WYJjLD6OwcRC/2pTO/AhC2F5IgCTfLVgO7ZPXVim71hFYLFEOm2tMW02UQhIAFP+pxojm0X186QvSfwiOCjbpoNSNg95JFmV/lof36MgOKc6KI3gJr+hcF+NlX9WJdgKXmqURmRE+RzdsroW+qRLrGxJYsBDe8uvs6qBAzMDphmfuO2AZePq4XY2pVspISVM1zyJCMiHIAI+jDZ2COPa4dayk2dUSL1JEdiJCCwTAErhtkBh/5d2SiskonAcGOrgEMqmj/EiPK+b4Wsq/me464sZ2l53tadrmeLtXc58ZbLry1n32IQ8QjQzIqZeGBBDAWrx7Ztbrnu1puu59P11JksPfdrE/sRm5FlRwDFMPQzkkNpjfXTIZ4Jmoqv7A49s96gxjolKAak0LN0QfU+j+7kpiowdR3SiCZRieSTVplyIWEcEUUPKEIZK85p/hChwKzJxgRYSyJvVXk+2k0abv187rWb1EGP8o1u/QlW3dZLi24lxHqPjjAp1RT1twgkRb4Z6IwO6ATfDsQoKkqs/xmBETIZ0e6GLW2H9LgVe5I2pLqNlmCmLTF120Ovq2gZe9AOa3lEK0Gl5ag0lWxZ6xAhWPSLEqJFJqhFnVB/WnuB6c59qNbG5J5+XSN44aTZ0+qlftg2eEkPWDSPecprY9Aqg2fUyZnlTLfObD2brZ3pZHm5OLNOStOUbjfaWMi47la3XM39Sh/VBqXkaWTfiWPXwFRMte7W0giMiqMvjbVkA7CKtb2yafkkmIpJ0ndaKhmn4uroZi1bF6niG2jCs2pRi1bx1kpdyyYwKg5+edESlABFP3zplOxPbk9wnnaHX9u9zC9VPjpEKZDjQAXYyooU+iFGzfwGg8+iO4Ioh77rTFzXWdnvr69v7u8nPCYTb7X0PNcZ9VNZPctRgknMjv53GBoZAQlF5Q2Wiz2zcQ8Cdu7oafct1/PmwDp1c1FiISyvSc9dOud4llMCoyrZWTHyKYx2o7Qd1PjJGTEbOYkjqJGjuOFJWqZy22XzzApwyG6qly67kCxWjnkqy+0WOSaWWe9LI1BYKAnhE1PNpj4lelqZp+XUmjpbz1szYTt3JjP38hyt3Od9raSXfVR19/TBqHBWEPHjr8192Wr8gl+RSJuzWi5nlrtyp+P3fJ2H3t1/yNS9++uoTn4eMGpsPztAvZCWd4Rrgillt/Q+XfcCoXGsAJXZkqEsOmOLK9g9K1CR9ZFdnBN+kzdu2WnNCTTuQEbQk3HNMp3VvlIXGnflZwfGDhPjI6y+FDC+wBQyJnbHMm7Ze0iMO3yElba7JTg2biIYZATzzzXSA4jwnoDYuEd7lvK0WZRmyhv71KLOb2oK9Hnn5YWam4ryVRqcytlbNznVPF690akcv1SzK/nPangq5An99W8jpIxKXSP4Gf2LlRI+CUAyFERQZJry+DZFuOyb1eeJ6pYjWxRM95fNrJlf+UQfpPPcVOsRS6nKxKebmxvjfXl+60V1x0fUyEBn9LS7rRfvP6rt64/GVlt3vnYXa8ebLJz5T6jt53ObB8OeLl2m2WZvJurP8fviav4cpz+BjF+4znzqzd3TMr5FvryMP5GBPyjjXyC/ZR+/ZPwvGd+Rzh8IQIl1jWOWVkyDf+L/PLMDATSuDyBJYGTdQ67DuYq/ZxUwg/vC+AAoq4fsyXuWtwVF1MA74+bIA/GFlwc2+BHSIgkOBCfoe1kvjC1OuYRPD4WBSi78DRq/szGu+H/p+ddqaiovb9bYVBN4veam8vj/l+6q0PwnNbu7OkOzy3bslxf3ZWNWPThpF4LC91or/va17gefq3e83v0GQZQdAkCgcZPsUQIhQcn+DW4NnbHyqwjxxaP2S0b/YmN3/tnSv/gH9+klwrUpAAA=