diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/ORCIDWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/ORCIDWorker.java new file mode 100644 index 0000000000..9560c36a2f --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/ORCIDWorker.java @@ -0,0 +1,236 @@ +package eu.dnetlib.dhp.collection.orcid; + +import eu.dnetlib.dhp.common.collection.HttpClientParams; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.http.HttpHeaders; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.swing.*; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.concurrent.BlockingQueue; + +public class ORCIDWorker extends Thread { + + final static Logger log = LoggerFactory.getLogger(ORCIDWorker.class); + + public static String JOB_COMPLETE="JOB_COMPLETE"; + + private static final String userAgent = "Mozilla/5.0 (compatible; OAI; +http://www.openaire.eu)"; + + private final BlockingQueue queue; + + private boolean hasComplete = false; + + private final SequenceFile.Writer employments; + + private final SequenceFile.Writer summary; + private final SequenceFile.Writer works; + + private final String token; + + private final String id; + + public static ORCIDWorkerBuilder builder() { + return new ORCIDWorkerBuilder(); + } + + public ORCIDWorker(String id, BlockingQueue myqueue, SequenceFile.Writer employments, SequenceFile.Writer summary, SequenceFile.Writer works, String token) { + this.id= id; + this.queue = myqueue; + this.employments = employments; + this.summary = summary; + this.works = works; + this.token = token; + } + + + public static String retrieveURL(final String id,final String apiUrl, String token) { + try { + final HttpURLConnection urlConn = getHttpURLConnection(apiUrl, token); + if (urlConn.getResponseCode()>199 && urlConn.getResponseCode()<300) { + InputStream input = urlConn.getInputStream(); + return IOUtils.toString(input); + } else { + log.error("Thread {} UNABLE TO DOWNLOAD FROM THIS URL {} , status code {}",id, apiUrl,urlConn.getResponseCode()); + } + } catch (Exception e) { + log.error("Thread {} Error on retrieving URL {} {}",id,apiUrl, e); + } + return null; + } + + @NotNull + private static HttpURLConnection getHttpURLConnection(String apiUrl, String token) throws IOException { + final HttpURLConnection urlConn = (HttpURLConnection) new URL(apiUrl).openConnection(); + final HttpClientParams clientParams = new HttpClientParams(); + urlConn.setInstanceFollowRedirects(false); + urlConn.setReadTimeout(clientParams.getReadTimeOut() * 1000); + urlConn.setConnectTimeout(clientParams.getConnectTimeOut() * 1000); + urlConn.addRequestProperty(HttpHeaders.USER_AGENT, userAgent); + urlConn.addRequestProperty(HttpHeaders.AUTHORIZATION, String.format("Bearer %s", token)); + return urlConn; + } + + private static String generateSummaryURL(final String orcidId) { + return "https://api.orcid.org/v3.0/" + orcidId + "/record"; + } + + private static String generateWorksURL(final String orcidId) { + return "https://api.orcid.org/v3.0/" + orcidId + "/works"; + } + private static String generateEmploymentsURL(final String orcidId) { + return "https://api.orcid.org/v3.0/" + orcidId + "/employments"; + } + + + private static void writeResultToSequenceFile(String id, String url, String token, String orcidId, SequenceFile.Writer file) throws IOException { + final String response = retrieveURL(id, url,token); + if (response!= null) { + + if(orcidId==null || response ==null){ + log.error("Thread {} {} {}",id, orcidId, response); + throw new RuntimeException("null items "); + } + + if(file==null) { + log.error("Thread {} file is null for {} URL:{}",id, url, orcidId); + } + else + file.append(new Text(orcidId),new Text(response)); + } + + } + + + @Override + public void run() { + final Text key = new Text(); + final Text value = new Text(); + long start; + long total_time; + String orcidId=""; + int requests =0; + if(summary==null || employments==null || works == null) + throw new RuntimeException("Null files"); + + while (!hasComplete) { + try { + + orcidId = queue.take(); + + if (orcidId.equalsIgnoreCase(JOB_COMPLETE)) { + queue.put(orcidId); + hasComplete = true; + } else { + start = System.currentTimeMillis(); + writeResultToSequenceFile(id, generateSummaryURL(orcidId), token,orcidId, summary); + total_time = System.currentTimeMillis() - start; + requests++; + if (total_time < 1000) { + Thread.sleep(1000L - total_time); + } + start = System.currentTimeMillis(); + writeResultToSequenceFile(id, generateWorksURL(orcidId),token,orcidId, works); + total_time = System.currentTimeMillis() - start; + requests++; + if (total_time < 1000) { + Thread.sleep(1000L - total_time); + } + start = System.currentTimeMillis(); + writeResultToSequenceFile(id, generateEmploymentsURL(orcidId),token,orcidId, employments); + total_time = System.currentTimeMillis() - start; + requests++; + if (total_time < 1000) { + Thread.sleep(1000L - total_time); + } + if (requests %30 ==0) + { + log.info("Thread {} Downloaded {}",id, requests); + } + } + + + } catch (Throwable e) { + + log.error("Thread {} Unable to save ORICD: {} item error",id, orcidId,e); + + } + + } + try { + works.close(); + summary.close(); + employments.close(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + + + log.info("Thread {} COMPLETE ", id); + log.info("Thread {} Downloaded {}", id, requests); + + } + + + public static class ORCIDWorkerBuilder { + + private String id; + private SequenceFile.Writer employments; + private SequenceFile.Writer summary; + private SequenceFile.Writer works; + private BlockingQueue queue; + + private String token; + + public ORCIDWorkerBuilder withId(final String id) { + this.id =id; + return this; + } + + public ORCIDWorkerBuilder withEmployments(final SequenceFile.Writer sequenceFile) { + this.employments = sequenceFile; + return this; + } + + + public ORCIDWorkerBuilder withSummary(final SequenceFile.Writer sequenceFile) { + this.summary = sequenceFile; + return this; + } + + public ORCIDWorkerBuilder withWorks(final SequenceFile.Writer sequenceFile) { + this.works = sequenceFile; + return this; + } + + public ORCIDWorkerBuilder withAccessToken(final String accessToken) { + this.token = accessToken; + return this; + } + + public ORCIDWorkerBuilder withBlockingQueue(final BlockingQueue queue) { + this.queue = queue; + return this; + } + + + public ORCIDWorker build() { + if (this.summary== null || this.works==null || this.employments == null || StringUtils.isEmpty(token) || queue == null) + throw new RuntimeException("Unable to build missing required params"); + return new ORCIDWorker(id, queue,employments,summary,works,token); + } + + + + } + + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java new file mode 100644 index 0000000000..3bfe72328c --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.java @@ -0,0 +1,133 @@ + +package eu.dnetlib.dhp.collection.orcid; + +import java.io.*; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import eu.dnetlib.dhp.common.collection.HttpClientParams; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration; + +public class OrcidGetUpdatesFile { + + private static Logger log = LoggerFactory.getLogger(OrcidGetUpdatesFile.class); + + public static void main(String[] args) throws IOException { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + "/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json"); + + final String namenode = parser.get("namenode"); + log.info("got variable namenode: {}", namenode); + + final String targetPath = parser.get("targetPath"); + log.info("got variable targetPath: {}", targetPath); + + //http://74804fb637bd8e2fba5b-e0a029c2f87486cddec3b416996a6057.r3.cf1.rackcdn.com/last_modified.csv.tar + final String apiURL = parser.get("apiURL"); + log.info("got variable apiURL: {}", apiURL); + + final String accessToken = parser.get("accessToken"); + log.info("got variable accessToken: {}", accessToken); + + + + final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(namenode)); + + + } + + private SequenceFile.Writer createFile(Path aPath, FileSystem fileSystem) throws IOException { + return SequenceFile + .createWriter( + fileSystem.getConf(), + SequenceFile.Writer.file(aPath), + SequenceFile.Writer.keyClass(Text.class), + SequenceFile.Writer.valueClass(Text.class)); + } + + + private ORCIDWorker createWorker(final String id, final String targetPath, final BlockingQueue queue, final String accessToken, FileSystem fileSystem) throws Exception { + return ORCIDWorker.builder() + .withId(id) + .withEmployments(createFile(new Path(String.format("%s/employments_%s", targetPath, id)), fileSystem)) + .withSummary(createFile(new Path(String.format("%s/summary_%s", targetPath, id)), fileSystem)) + .withWorks(createFile(new Path(String.format("%s/works_%s", targetPath, id)), fileSystem)) + .withAccessToken(accessToken) + .withBlockingQueue(queue) + .build(); + } + + + + public void readTar(FileSystem fileSystem, final String accessToken, final String apiURL, final String targetPath, final String startDate ) throws Exception { + final HttpURLConnection urlConn = (HttpURLConnection) new URL(apiURL).openConnection(); + final HttpClientParams clientParams = new HttpClientParams(); + urlConn.setInstanceFollowRedirects(false); + urlConn.setReadTimeout(clientParams.getReadTimeOut() * 1000); + urlConn.setConnectTimeout(clientParams.getConnectTimeOut() * 1000); + if (urlConn.getResponseCode()>199 && urlConn.getResponseCode()<300) { + InputStream input = urlConn.getInputStream(); + TarArchiveInputStream tais = new TarArchiveInputStream(new GzipCompressorInputStream( + new BufferedInputStream( + input))); + TarArchiveEntry entry; + + BlockingQueue queue = new ArrayBlockingQueue(3000); + final List workers = new ArrayList<>(); + for (int i = 0; i <20; i++) { + workers.add(createWorker(""+i,targetPath,queue,accessToken, fileSystem)); + } + workers.forEach(Thread::start); + + + while ((entry = tais.getNextTarEntry()) != null) { + + if (entry.isFile()) { + + BufferedReader br = new BufferedReader(new InputStreamReader(tais)); + System.out.println(br.readLine()); + br.lines().map(l -> l.split(",")).filter(s -> StringUtils.compare(s[3].substring(0, 10), startDate) > 0).map(s->s[0]).limit(200).forEach(s -> { + try { + log.info("Adding item "); + queue.put(s); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + queue.put(ORCIDWorker.JOB_COMPLETE); + + } + } + + for (ORCIDWorker worker : workers) { + worker.join(); + } + + + } + + + + + + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json new file mode 100644 index 0000000000..dbadf86cbb --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json @@ -0,0 +1,27 @@ +[ + { + "paramName": "n", + "paramLongName": "namenode", + "paramDescription": "the Name Node URI", + "paramRequired": true + }, + { + "paramName": "t", + "paramLongName": "targetPath", + "paramDescription": "the target PATH where download the files", + "paramRequired": true + }, + { + "paramName": "a", + "paramLongName": "apiURL", + "paramDescription": "the URL to download the tar file", + "paramRequired": true + }, + { + "paramName": "at", + "paramLongName": "accessToken", + "paramDescription": "the accessToken to contact API", + "paramRequired": true + } + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/orcid/DownloadORCIDTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/orcid/DownloadORCIDTest.java index 868f4e92d0..dc0393dc50 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/orcid/DownloadORCIDTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/orcid/DownloadORCIDTest.java @@ -2,6 +2,7 @@ package eu.dnetlib.dhp.collection.orcid; import java.io.IOException; +import java.net.URI; import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -9,7 +10,12 @@ import java.util.Objects; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Encoders; @@ -116,4 +122,19 @@ public class DownloadORCIDTest { }); } +// @Test +// public void testReadTar() throws Exception { +//// new OrcidGetUpdatesFile().readTar(); +// +// Configuration conf = new Configuration(); +// FileSystem fs = FileSystem.get(URI.create("file:///"), conf); +// final String token ="78fdb232-7105-4086-8570-e153f4198e3d"; +// +// new OrcidGetUpdatesFile().readTar(fs,token, "http://74804fb637bd8e2fba5b-e0a029c2f87486cddec3b416996a6057.r3.cf1.rackcdn.com/last_modified.csv.tar", "file:///Users/sandro/orcid","2023-09-30"); +// +// +// +// +// } + } diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java index e07ba1b4ea..e728830554 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java @@ -82,7 +82,7 @@ public class IndexRecordTransformerTest { void testPeerReviewed() throws IOException, TransformerException { final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, - XmlConverterJob.schemaLocation); + XmlConverterJob.schemaLocation); final Publication p = load("publication.json", Publication.class);