fixed missing parameter on download update

This commit is contained in:
sandro.labruzzo 2024-01-12 16:18:20 +01:00
parent 859babf722
commit e328bc0ade
5 changed files with 259 additions and 259 deletions

View File

@ -1,6 +1,14 @@
package eu.dnetlib.dhp.collection.orcid; package eu.dnetlib.dhp.collection.orcid;
import eu.dnetlib.dhp.common.collection.HttpClientParams; import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.BlockingQueue;
import javax.swing.*;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile;
@ -10,18 +18,13 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.swing.*; import eu.dnetlib.dhp.common.collection.HttpClientParams;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.BlockingQueue;
public class ORCIDWorker extends Thread { public class ORCIDWorker extends Thread {
final static Logger log = LoggerFactory.getLogger(ORCIDWorker.class); final static Logger log = LoggerFactory.getLogger(ORCIDWorker.class);
public static String JOB_COMPLETE="JOB_COMPLETE"; public static String JOB_COMPLETE = "JOB_COMPLETE";
private static final String userAgent = "Mozilla/5.0 (compatible; OAI; +http://www.openaire.eu)"; private static final String userAgent = "Mozilla/5.0 (compatible; OAI; +http://www.openaire.eu)";
@ -42,8 +45,9 @@ public class ORCIDWorker extends Thread {
return new ORCIDWorkerBuilder(); return new ORCIDWorkerBuilder();
} }
public ORCIDWorker(String id, BlockingQueue<String> myqueue, SequenceFile.Writer employments, SequenceFile.Writer summary, SequenceFile.Writer works, String token) { public ORCIDWorker(String id, BlockingQueue<String> myqueue, SequenceFile.Writer employments,
this.id= id; SequenceFile.Writer summary, SequenceFile.Writer works, String token) {
this.id = id;
this.queue = myqueue; this.queue = myqueue;
this.employments = employments; this.employments = employments;
this.summary = summary; this.summary = summary;
@ -51,18 +55,20 @@ public class ORCIDWorker extends Thread {
this.token = token; this.token = token;
} }
public static String retrieveURL(final String id, final String apiUrl, String token) {
public static String retrieveURL(final String id,final String apiUrl, String token) {
try { try {
final HttpURLConnection urlConn = getHttpURLConnection(apiUrl, token); final HttpURLConnection urlConn = getHttpURLConnection(apiUrl, token);
if (urlConn.getResponseCode()>199 && urlConn.getResponseCode()<300) { if (urlConn.getResponseCode() > 199 && urlConn.getResponseCode() < 300) {
InputStream input = urlConn.getInputStream(); InputStream input = urlConn.getInputStream();
return IOUtils.toString(input); return IOUtils.toString(input);
} else { } else {
log.error("Thread {} UNABLE TO DOWNLOAD FROM THIS URL {} , status code {}",id, apiUrl,urlConn.getResponseCode()); log
.error(
"Thread {} UNABLE TO DOWNLOAD FROM THIS URL {} , status code {}", id, apiUrl,
urlConn.getResponseCode());
} }
} catch (Exception e) { } catch (Exception e) {
log.error("Thread {} Error on retrieving URL {} {}",id,apiUrl, e); log.error("Thread {} Error on retrieving URL {} {}", id, apiUrl, e);
} }
return null; return null;
} }
@ -86,39 +92,38 @@ public class ORCIDWorker extends Thread {
private static String generateWorksURL(final String orcidId) { private static String generateWorksURL(final String orcidId) {
return "https://api.orcid.org/v3.0/" + orcidId + "/works"; return "https://api.orcid.org/v3.0/" + orcidId + "/works";
} }
private static String generateEmploymentsURL(final String orcidId) { private static String generateEmploymentsURL(final String orcidId) {
return "https://api.orcid.org/v3.0/" + orcidId + "/employments"; return "https://api.orcid.org/v3.0/" + orcidId + "/employments";
} }
private static void writeResultToSequenceFile(String id, String url, String token, String orcidId,
SequenceFile.Writer file) throws IOException {
final String response = retrieveURL(id, url, token);
if (response != null) {
private static void writeResultToSequenceFile(String id, String url, String token, String orcidId, SequenceFile.Writer file) throws IOException { if (orcidId == null || response == null) {
final String response = retrieveURL(id, url,token); log.error("Thread {} {} {}", id, orcidId, response);
if (response!= null) {
if(orcidId==null || response ==null){
log.error("Thread {} {} {}",id, orcidId, response);
throw new RuntimeException("null items "); throw new RuntimeException("null items ");
} }
if(file==null) { if (file == null) {
log.error("Thread {} file is null for {} URL:{}",id, url, orcidId); log.error("Thread {} file is null for {} URL:{}", id, url, orcidId);
} } else
else file.append(new Text(orcidId), new Text(response));
file.append(new Text(orcidId),new Text(response));
} }
} }
@Override @Override
public void run() { public void run() {
final Text key = new Text(); final Text key = new Text();
final Text value = new Text(); final Text value = new Text();
long start; long start;
long total_time; long total_time;
String orcidId=""; String orcidId = "";
int requests =0; int requests = 0;
if(summary==null || employments==null || works == null) if (summary == null || employments == null || works == null)
throw new RuntimeException("Null files"); throw new RuntimeException("Null files");
while (!hasComplete) { while (!hasComplete) {
@ -131,42 +136,43 @@ public class ORCIDWorker extends Thread {
hasComplete = true; hasComplete = true;
} else { } else {
start = System.currentTimeMillis(); start = System.currentTimeMillis();
writeResultToSequenceFile(id, generateSummaryURL(orcidId), token,orcidId, summary); writeResultToSequenceFile(id, generateSummaryURL(orcidId), token, orcidId, summary);
total_time = System.currentTimeMillis() - start; total_time = System.currentTimeMillis() - start;
requests++; requests++;
if (total_time < 1000) { if (total_time < 1000) {
//I know making a sleep on a thread is bad, but we need to stay to 24 requests per seconds, hence // I know making a sleep on a thread is bad, but we need to stay to 24 requests per seconds,
//the time between two http request in a thread must be 1 second // hence
// the time between two http request in a thread must be 1 second
Thread.sleep(1000L - total_time); Thread.sleep(1000L - total_time);
} }
start = System.currentTimeMillis(); start = System.currentTimeMillis();
writeResultToSequenceFile(id, generateWorksURL(orcidId),token,orcidId, works); writeResultToSequenceFile(id, generateWorksURL(orcidId), token, orcidId, works);
total_time = System.currentTimeMillis() - start; total_time = System.currentTimeMillis() - start;
requests++; requests++;
if (total_time < 1000) { if (total_time < 1000) {
//I know making a sleep on a thread is bad, but we need to stay to 24 requests per seconds, hence // I know making a sleep on a thread is bad, but we need to stay to 24 requests per seconds,
//the time between two http request in a thread must be 1 second // hence
// the time between two http request in a thread must be 1 second
Thread.sleep(1000L - total_time); Thread.sleep(1000L - total_time);
} }
start = System.currentTimeMillis(); start = System.currentTimeMillis();
writeResultToSequenceFile(id, generateEmploymentsURL(orcidId),token,orcidId, employments); writeResultToSequenceFile(id, generateEmploymentsURL(orcidId), token, orcidId, employments);
total_time = System.currentTimeMillis() - start; total_time = System.currentTimeMillis() - start;
requests++; requests++;
if (total_time < 1000) { if (total_time < 1000) {
//I know making a sleep on a thread is bad, but we need to stay to 24 requests per seconds, hence // I know making a sleep on a thread is bad, but we need to stay to 24 requests per seconds,
//the time between two http request in a thread must be 1 second // hence
// the time between two http request in a thread must be 1 second
Thread.sleep(1000L - total_time); Thread.sleep(1000L - total_time);
} }
if (requests %30 ==0) if (requests % 30 == 0) {
{ log.info("Thread {} Downloaded {}", id, requests);
log.info("Thread {} Downloaded {}",id, requests);
} }
} }
} catch (Throwable e) { } catch (Throwable e) {
log.error("Thread {} Unable to save ORICD: {} item error",id, orcidId,e); log.error("Thread {} Unable to save ORICD: {} item error", id, orcidId, e);
} }
@ -179,13 +185,11 @@ public class ORCIDWorker extends Thread {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
log.info("Thread {} COMPLETE ", id); log.info("Thread {} COMPLETE ", id);
log.info("Thread {} Downloaded {}", id, requests); log.info("Thread {} Downloaded {}", id, requests);
} }
public static class ORCIDWorkerBuilder { public static class ORCIDWorkerBuilder {
private String id; private String id;
@ -197,7 +201,7 @@ public class ORCIDWorker extends Thread {
private String token; private String token;
public ORCIDWorkerBuilder withId(final String id) { public ORCIDWorkerBuilder withId(final String id) {
this.id =id; this.id = id;
return this; return this;
} }
@ -206,7 +210,6 @@ public class ORCIDWorker extends Thread {
return this; return this;
} }
public ORCIDWorkerBuilder withSummary(final SequenceFile.Writer sequenceFile) { public ORCIDWorkerBuilder withSummary(final SequenceFile.Writer sequenceFile) {
this.summary = sequenceFile; this.summary = sequenceFile;
return this; return this;
@ -227,16 +230,13 @@ public class ORCIDWorker extends Thread {
return this; return this;
} }
public ORCIDWorker build() { public ORCIDWorker build() {
if (this.summary== null || this.works==null || this.employments == null || StringUtils.isEmpty(token) || queue == null) if (this.summary == null || this.works == null || this.employments == null || StringUtils.isEmpty(token)
|| queue == null)
throw new RuntimeException("Unable to build missing required params"); throw new RuntimeException("Unable to build missing required params");
return new ORCIDWorker(id, queue,employments,summary,works,token); return new ORCIDWorker(id, queue, employments, summary, works, token);
} }
} }
} }

View File

@ -1,15 +1,17 @@
package eu.dnetlib.dhp.collection.orcid; package eu.dnetlib.dhp.collection.orcid;
import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;
import java.io.*; import java.io.*;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
@ -23,17 +25,25 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;
public class OrcidGetUpdatesFile { public class OrcidGetUpdatesFile {
private static Logger log = LoggerFactory.getLogger(OrcidGetUpdatesFile.class); private static Logger log = LoggerFactory.getLogger(OrcidGetUpdatesFile.class);
public static void main(String[] args) throws IOException { public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
"/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json"); IOUtils
.toString(
Objects
.requireNonNull(
OrcidGetUpdatesFile.class
.getResourceAsStream(
"/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json")))
);
parser.parseArgument(args);
final String namenode = parser.get("namenode"); final String namenode = parser.get("namenode");
log.info("got variable namenode: {}", namenode); log.info("got variable namenode: {}", namenode);
@ -41,17 +51,17 @@ public class OrcidGetUpdatesFile {
final String targetPath = parser.get("targetPath"); final String targetPath = parser.get("targetPath");
log.info("got variable targetPath: {}", targetPath); log.info("got variable targetPath: {}", targetPath);
final String apiURL = parser.get("apiURL"); final String apiURL = parser.get("apiURL");
log.info("got variable apiURL: {}", apiURL); log.info("got variable apiURL: {}", apiURL);
final String accessToken = parser.get("accessToken"); final String accessToken = parser.get("accessToken");
log.info("got variable accessToken: {}", accessToken); log.info("got variable accessToken: {}", accessToken);
System.out.println("namenode = " + namenode);
final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(namenode)); final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(namenode));
new OrcidGetUpdatesFile().readTar(fileSystem, accessToken, apiURL, targetPath, "2023-09-30");
} }
@ -64,9 +74,10 @@ public class OrcidGetUpdatesFile {
SequenceFile.Writer.valueClass(Text.class)); SequenceFile.Writer.valueClass(Text.class));
} }
private ORCIDWorker createWorker(final String id, final String targetPath, final BlockingQueue<String> queue,
private ORCIDWorker createWorker(final String id, final String targetPath, final BlockingQueue<String> queue, final String accessToken, FileSystem fileSystem) throws Exception { final String accessToken, FileSystem fileSystem) throws Exception {
return ORCIDWorker.builder() return ORCIDWorker
.builder()
.withId(id) .withId(id)
.withEmployments(createFile(new Path(String.format("%s/employments_%s", targetPath, id)), fileSystem)) .withEmployments(createFile(new Path(String.format("%s/employments_%s", targetPath, id)), fileSystem))
.withSummary(createFile(new Path(String.format("%s/summary_%s", targetPath, id)), fileSystem)) .withSummary(createFile(new Path(String.format("%s/summary_%s", targetPath, id)), fileSystem))
@ -76,15 +87,14 @@ public class OrcidGetUpdatesFile {
.build(); .build();
} }
public void readTar(FileSystem fileSystem, final String accessToken, final String apiURL, final String targetPath,
final String startDate) throws Exception {
public void readTar(FileSystem fileSystem, final String accessToken, final String apiURL, final String targetPath, final String startDate ) throws Exception {
final HttpURLConnection urlConn = (HttpURLConnection) new URL(apiURL).openConnection(); final HttpURLConnection urlConn = (HttpURLConnection) new URL(apiURL).openConnection();
final HttpClientParams clientParams = new HttpClientParams(); final HttpClientParams clientParams = new HttpClientParams();
urlConn.setInstanceFollowRedirects(false); urlConn.setInstanceFollowRedirects(false);
urlConn.setReadTimeout(clientParams.getReadTimeOut() * 1000); urlConn.setReadTimeout(clientParams.getReadTimeOut() * 1000);
urlConn.setConnectTimeout(clientParams.getConnectTimeOut() * 1000); urlConn.setConnectTimeout(clientParams.getConnectTimeOut() * 1000);
if (urlConn.getResponseCode()>199 && urlConn.getResponseCode()<300) { if (urlConn.getResponseCode() > 199 && urlConn.getResponseCode() < 300) {
InputStream input = urlConn.getInputStream(); InputStream input = urlConn.getInputStream();
TarArchiveInputStream tais = new TarArchiveInputStream(new GzipCompressorInputStream( TarArchiveInputStream tais = new TarArchiveInputStream(new GzipCompressorInputStream(
new BufferedInputStream( new BufferedInputStream(
@ -93,19 +103,23 @@ public class OrcidGetUpdatesFile {
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3000); BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3000);
final List<ORCIDWorker> workers = new ArrayList<>(); final List<ORCIDWorker> workers = new ArrayList<>();
for (int i = 0; i <20; i++) { for (int i = 0; i < 22; i++) {
workers.add(createWorker(""+i,targetPath,queue,accessToken, fileSystem)); workers.add(createWorker("" + i, targetPath, queue, accessToken, fileSystem));
} }
workers.forEach(Thread::start); workers.forEach(Thread::start);
while ((entry = tais.getNextTarEntry()) != null) { while ((entry = tais.getNextTarEntry()) != null) {
if (entry.isFile()) { if (entry.isFile()) {
BufferedReader br = new BufferedReader(new InputStreamReader(tais)); BufferedReader br = new BufferedReader(new InputStreamReader(tais));
System.out.println(br.readLine()); System.out.println(br.readLine());
br.lines().map(l -> l.split(",")).filter(s -> StringUtils.compare(s[3].substring(0, 10), startDate) > 0).map(s->s[0]).limit(200).forEach(s -> { br
.lines()
.map(l -> l.split(","))
.filter(s -> StringUtils.compare(s[3].substring(0, 10), startDate) > 0)
.map(s -> s[0])
.forEach(s -> {
try { try {
log.info("Adding item "); log.info("Adding item ");
queue.put(s); queue.put(s);
@ -122,12 +136,7 @@ public class OrcidGetUpdatesFile {
worker.join(); worker.join();
} }
} }
} }
} }

View File

@ -1,5 +1,4 @@
[ [ {
{
"paramName": "n", "paramName": "n",
"paramLongName": "namenode", "paramLongName": "namenode",
"paramDescription": "the Name Node URI", "paramDescription": "the Name Node URI",
@ -23,5 +22,4 @@
"paramDescription": "the accessToken to contact API", "paramDescription": "the accessToken to contact API",
"paramRequired": true "paramRequired": true
} }
] ]

View File

@ -6,12 +6,12 @@
</property> </property>
<property> <property>
<name>apiURL</name> <name>apiURL</name>
<description>The URL of the update CSV list </description>
<value>http://74804fb637bd8e2fba5b-e0a029c2f87486cddec3b416996a6057.r3.cf1.rackcdn.com/last_modified.csv.tar</value> <value>http://74804fb637bd8e2fba5b-e0a029c2f87486cddec3b416996a6057.r3.cf1.rackcdn.com/last_modified.csv.tar</value>
<description>The URL of the update CSV list </description>
</property> </property>
<property> <property>
<name>accessToken</name> <name>accessToken</name>
<description>The access tocken</description> <description>The access token</description>
</property> </property>
</parameters> </parameters>

View File

@ -124,16 +124,9 @@ public class DownloadORCIDTest {
// @Test // @Test
// public void testReadTar() throws Exception { // public void testReadTar() throws Exception {
//// new OrcidGetUpdatesFile().readTar(); // OrcidGetUpdatesFile.main(new String[] {
// // "--namenode", "puppa"
// Configuration conf = new Configuration(); // });
// FileSystem fs = FileSystem.get(URI.create("file:///"), conf);
// final String token ="78fdb232-7105-4086-8570-e153f4198e3d";
//
// new OrcidGetUpdatesFile().readTar(fs,token, "http://74804fb637bd8e2fba5b-e0a029c2f87486cddec3b416996a6057.r3.cf1.rackcdn.com/last_modified.csv.tar", "file:///Users/sandro/orcid","2023-09-30");
//
//
//
// //
// } // }