From 9d812788e47e7d9ab891d730fa4ea52afa0cb4cb Mon Sep 17 00:00:00 2001
From: Enrico Ottonello
Date: Fri, 8 May 2020 14:49:39 +0200
Subject: [PATCH 1/2] Add a job to download from ORCID the records modified
 after a fixed date; the info is taken from last_modified.csv on HDFS

---
 .../doiboost/orcid/OrcidDownloader.java       | 183 ++++++++++++++++++
 .../dhp/doiboost/download_orcid_data.json     |   7 +
 .../oozie_app/config-default.xml              |  22 +++
 .../orcid_download/oozie_app/workflow.xml     |  45 +++++
 4 files changed, 257 insertions(+)
 create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java
 create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/download_orcid_data.json
 create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_download/oozie_app/config-default.xml
 create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_download/oozie_app/workflow.xml

diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java
new file mode 100644
index 000000000..445291f89
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java
@@ -0,0 +1,183 @@
+package eu.dnetlib.doiboost.orcid;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.mortbay.log.Log;
+
+public class OrcidDownloader extends OrcidDSManager {
+
+    static final int REQ_LIMIT = 24;
+    static final int REQ_MAX_TEST = 100;
+    static final int RECORD_PARSED_COUNTER_LOG_INTERVAL = 50000;
+    static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.";
+    static final String lastUpdate = "2019-09-30 00:00:00.000000";
+    private String lambdaFileName;
+    private String outputPath;
+    private String token;
+
+    public static void main(String[] args) throws IOException, Exception {
+        OrcidDownloader orcidDownloader = new OrcidDownloader();
+        orcidDownloader.loadArgs(args);
+        orcidDownloader.parseLambdaFile();
+    }
+
+    private String downloadRecord(String orcidId) throws Exception {
+        try (CloseableHttpClient client = HttpClients.createDefault()) {
+            HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record");
+            httpGet.addHeader("Accept", "application/vnd.orcid+xml");
+            httpGet.addHeader("Authorization", String.format("Bearer %s", token));
+            CloseableHttpResponse response = client.execute(httpGet);
+            if (response.getStatusLine().getStatusCode() != 200) {
+                Log.warn(
+                        "Downloading "
+                                + orcidId
+                                + " status code: "
+                                + response.getStatusLine().getStatusCode());
+                return new String("");
+            }
+            return IOUtils.toString(response.getEntity().getContent());
+
+        } catch (Throwable e) {
+            Log.warn("Downloading " + orcidId, e);
+        }
+        return new String("");
+    }
+
+    public void parseLambdaFile() throws Exception {
+        int parsedRecordsCounter = 0;
+        int downloadedRecordsCounter = 0;
+        int savedRecordsCounter = 0;
+        long startDownload = 0;
+        Configuration conf = initConfigurationObject();
+        FileSystem fs = initFileSystemObject(conf);
+        String lambdaFileUri = hdfsServerUri.concat(hdfsOrcidDefaultPath).concat(lambdaFileName);
+        Path hdfsreadpath = new Path(lambdaFileUri);
+        FSDataInputStream lambdaFileStream = fs.open(hdfsreadpath);
+        Path hdfsoutputPath =
+                new Path(
+                        hdfsServerUri
+                                .concat(hdfsOrcidDefaultPath)
+                                .concat(outputPath)
+                                .concat("orcid_records.seq"));
+
+        try (SequenceFile.Writer writer =
+                SequenceFile.createWriter(
+                        conf,
+                        SequenceFile.Writer.file(hdfsoutputPath),
+                        SequenceFile.Writer.keyClass(Text.class),
+                        SequenceFile.Writer.valueClass(Text.class))) {
+
+            try (BufferedReader br = new BufferedReader(new InputStreamReader(lambdaFileStream))) {
+                String line;
+                int nReqTmp = 0;
+                startDownload = System.currentTimeMillis();
+                long startReqTmp = System.currentTimeMillis();
+                while ((line = br.readLine()) != null) {
+                    parsedRecordsCounter++;
+                    // skip headers line
+                    if (parsedRecordsCounter == 1) {
+                        continue;
+                    }
+                    String[] values = line.split(",");
+                    List<String> recordInfo = Arrays.asList(values);
+                    if (isModified(recordInfo.get(3))) {
+                        String record = downloadRecord(recordInfo.get(0));
+                        downloadedRecordsCounter++;
+                        if (!record.isEmpty()) {
+                            String compressRecord =
+                                    ArgumentApplicationParser.compressArgument(record);
+                            final Text key = new Text(recordInfo.get(0));
+                            final Text value = new Text(compressRecord);
+
+                            try {
+                                writer.append(key, value);
+                                savedRecordsCounter++;
+                            } catch (IOException e) {
+                                Log.debug("Writing to sequence file: " + e.getMessage());
+                                Log.debug(e);
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    }
+                    long endReq = System.currentTimeMillis();
+                    nReqTmp++;
+                    if (nReqTmp == REQ_LIMIT) {
+                        long reqSessionDuration = endReq - startReqTmp;
+                        if (reqSessionDuration <= 1000) {
+                            Log.warn(
+                                    "\nreqSessionDuration: "
+                                            + reqSessionDuration
+                                            + " nReqTmp: "
+                                            + nReqTmp
+                                            + " wait ....");
+                            Thread.sleep(1000 - reqSessionDuration);
+                        } else {
+                            nReqTmp = 0;
+                            startReqTmp = System.currentTimeMillis();
+                        }
+                    }
+
+                    // if (parsedRecordsCounter>REQ_MAX_TEST) {
+                    // break;
+                    // }
+                    if ((parsedRecordsCounter % RECORD_PARSED_COUNTER_LOG_INTERVAL) == 0) {
+                        Log.info("Current record parsed: " + parsedRecordsCounter);
+                        Log.info("Current record downloaded: " + downloadedRecordsCounter);
+                        Log.info("Current record saved: " + savedRecordsCounter);
+                    }
+                }
+                long endDownload = System.currentTimeMillis();
+                long downloadTime = endDownload - startDownload;
+                Log.info("Download time: " + ((downloadTime / 1000) / 60) + " minutes");
+            }
+        }
+        lambdaFileStream.close();
+        Log.info("Download started at: " + new Date(startDownload).toString());
+        Log.info("Parsed Records Counter: " + parsedRecordsCounter);
+        Log.info("Downloaded Records Counter: " + downloadedRecordsCounter);
+        Log.info("Saved Records Counter: " + savedRecordsCounter);
+    }
+
+    private void loadArgs(String[] args) throws IOException, Exception {
+        final ArgumentApplicationParser parser =
+                new ArgumentApplicationParser(
+                        IOUtils.toString(
+                                OrcidDownloader.class.getResourceAsStream(
+                                        "/eu/dnetlib/dhp/doiboost/download_orcid_data.json")));
+        parser.parseArgument(args);
+
+        hdfsServerUri = parser.get("hdfsServerUri");
+        Log.info("HDFS URI: " + hdfsServerUri);
+        hdfsOrcidDefaultPath = parser.get("hdfsOrcidDefaultPath");
+        Log.info("Default Path: " + hdfsOrcidDefaultPath);
+        lambdaFileName = parser.get("lambdaFileName");
+        Log.info("Lambda File Name: " + lambdaFileName);
+        outputPath = parser.get("outputPath");
+        Log.info("Output Data: " + outputPath);
+        token = parser.get("token");
+    }
+
+    private boolean isModified(String modifiedDate) throws ParseException {
+        Date modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate);
+        Date lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate);
+        return modifiedDateDt.after(lastUpdateDt);
+    }
+}
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/download_orcid_data.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/download_orcid_data.json
new file mode 100644
index 000000000..444e487f7
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/download_orcid_data.json
@@ -0,0 +1,7 @@
+[
+    {"paramName":"n", "paramLongName":"hdfsServerUri", "paramDescription": "the server uri", "paramRequired": true},
+    {"paramName":"d", "paramLongName":"hdfsOrcidDefaultPath", "paramDescription": "the default work path", "paramRequired": true},
+    {"paramName":"f", "paramLongName":"lambdaFileName", "paramDescription": "the name of the lambda file", "paramRequired": true},
+    {"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the relative folder of the sequence file to write", "paramRequired": true},
+    {"paramName":"t", "paramLongName":"token", "paramDescription": "token to grant access", "paramRequired": true}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_download/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_download/oozie_app/config-default.xml
new file mode 100644
index 000000000..5621415d9
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_download/oozie_app/config-default.xml
@@ -0,0 +1,22 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.java</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.launcher.mapreduce.map.java.opts</name>
+        <value>-Xmx4g</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_download/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_download/oozie_app/workflow.xml
new file mode 100644
index 000000000..1f9adeb4d
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_download/oozie_app/workflow.xml
@@ -0,0 +1,45 @@
+<workflow-app name="Orcid Download" xmlns="uri:oozie:workflow:0.5">
+
+    <parameters>
+        <property>
+            <name>workingPathOrcid</name>
+            <description>the working dir base path</description>
+        </property>
+        <property>
+            <name>token</name>
+            <description>access token</description>
+        </property>
+    </parameters>
+
+    <start to="ResetWorkingPath"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="ResetWorkingPath">
+        <fs>
+            <delete path='${workingPathOrcid}/download'/>
+            <mkdir path='${workingPathOrcid}/download'/>
+        </fs>
+        <ok to="DownloadOrcidData"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="DownloadOrcidData">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <main-class>eu.dnetlib.doiboost.orcid.OrcidDownloader</main-class>
+            <arg>-d</arg><arg>${workingPathOrcid}/</arg>
+            <arg>-n</arg><arg>${nameNode}</arg>
+            <arg>-f</arg><arg>last_modified.csv</arg>
+            <arg>-o</arg><arg>download/</arg>
+            <arg>-t</arg><arg>${token}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file

From b9d126dd1ffa739327a36c3dee0f1a3f4c897ce0 Mon Sep 17 00:00:00 2001
From: Enrico Ottonello
Date: Fri, 8 May 2020 14:54:37 +0200
Subject: [PATCH 2/2] Apply code formatting after commit

---
 .../doiboost/orcid/OrcidDownloader.java       | 280 +++++++++---------
 1 file changed, 138 insertions(+), 142 deletions(-)

diff --git 
a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java index 445291f89..7163257f6 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java @@ -24,160 +24,156 @@ import org.mortbay.log.Log; public class OrcidDownloader extends OrcidDSManager { - static final int REQ_LIMIT = 24; - static final int REQ_MAX_TEST = 100; - static final int RECORD_PARSED_COUNTER_LOG_INTERVAL = 50000; - static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss."; - static final String lastUpdate = "2019-09-30 00:00:00.000000"; - private String lambdaFileName; - private String outputPath; - private String token; + static final int REQ_LIMIT = 24; + static final int REQ_MAX_TEST = 100; + static final int RECORD_PARSED_COUNTER_LOG_INTERVAL = 50000; + static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss."; + static final String lastUpdate = "2019-09-30 00:00:00.000000"; + private String lambdaFileName; + private String outputPath; + private String token; - public static void main(String[] args) throws IOException, Exception { - OrcidDownloader orcidDownloader = new OrcidDownloader(); - orcidDownloader.loadArgs(args); - orcidDownloader.parseLambdaFile(); - } + public static void main(String[] args) throws IOException, Exception { + OrcidDownloader orcidDownloader = new OrcidDownloader(); + orcidDownloader.loadArgs(args); + orcidDownloader.parseLambdaFile(); + } - private String downloadRecord(String orcidId) throws Exception { - try (CloseableHttpClient client = HttpClients.createDefault()) { - HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record"); - httpGet.addHeader("Accept", "application/vnd.orcid+xml"); - httpGet.addHeader("Authorization", String.format("Bearer %s", token)); - CloseableHttpResponse response = client.execute(httpGet); - if (response.getStatusLine().getStatusCode() != 200) { - Log.warn( - "Downloading " - + orcidId - + " status code: " - + response.getStatusLine().getStatusCode()); - return new String(""); - } - return IOUtils.toString(response.getEntity().getContent()); - - } catch (Throwable e) { - Log.warn("Downloading " + orcidId, e); - } + private String downloadRecord(String orcidId) throws Exception { + try (CloseableHttpClient client = HttpClients.createDefault()) { + HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record"); + httpGet.addHeader("Accept", "application/vnd.orcid+xml"); + httpGet.addHeader("Authorization", String.format("Bearer %s", token)); + CloseableHttpResponse response = client.execute(httpGet); + if (response.getStatusLine().getStatusCode() != 200) { + Log.warn( + "Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode()); return new String(""); + } + return IOUtils.toString(response.getEntity().getContent()); + + } catch (Throwable e) { + Log.warn("Downloading " + orcidId, e); } + return new String(""); + } - public void parseLambdaFile() throws Exception { - int parsedRecordsCounter = 0; - int downloadedRecordsCounter = 0; - int savedRecordsCounter = 0; - long startDownload = 0; - Configuration conf = initConfigurationObject(); - FileSystem fs = initFileSystemObject(conf); - String lambdaFileUri = hdfsServerUri.concat(hdfsOrcidDefaultPath).concat(lambdaFileName); - Path hdfsreadpath = new Path(lambdaFileUri); - FSDataInputStream 
lambdaFileStream = fs.open(hdfsreadpath);
-        Path hdfsoutputPath =
-                new Path(
-                        hdfsServerUri
-                                .concat(hdfsOrcidDefaultPath)
-                                .concat(outputPath)
-                                .concat("orcid_records.seq"));
+	public void parseLambdaFile() throws Exception {
+		int parsedRecordsCounter = 0;
+		int downloadedRecordsCounter = 0;
+		int savedRecordsCounter = 0;
+		long startDownload = 0;
+		Configuration conf = initConfigurationObject();
+		FileSystem fs = initFileSystemObject(conf);
+		String lambdaFileUri = hdfsServerUri.concat(hdfsOrcidDefaultPath).concat(lambdaFileName);
+		Path hdfsreadpath = new Path(lambdaFileUri);
+		FSDataInputStream lambdaFileStream = fs.open(hdfsreadpath);
+		Path hdfsoutputPath =
+			new Path(
+				hdfsServerUri
+					.concat(hdfsOrcidDefaultPath)
+					.concat(outputPath)
+					.concat("orcid_records.seq"));

-        try (SequenceFile.Writer writer =
-                SequenceFile.createWriter(
-                        conf,
-                        SequenceFile.Writer.file(hdfsoutputPath),
-                        SequenceFile.Writer.keyClass(Text.class),
-                        SequenceFile.Writer.valueClass(Text.class))) {
+		try (SequenceFile.Writer writer =
+			SequenceFile.createWriter(
+				conf,
+				SequenceFile.Writer.file(hdfsoutputPath),
+				SequenceFile.Writer.keyClass(Text.class),
+				SequenceFile.Writer.valueClass(Text.class))) {

-            try (BufferedReader br = new BufferedReader(new InputStreamReader(lambdaFileStream))) {
-                String line;
-                int nReqTmp = 0;
-                startDownload = System.currentTimeMillis();
-                long startReqTmp = System.currentTimeMillis();
-                while ((line = br.readLine()) != null) {
-                    parsedRecordsCounter++;
-                    // skip headers line
-                    if (parsedRecordsCounter == 1) {
-                        continue;
-                    }
-                    String[] values = line.split(",");
-                    List<String> recordInfo = Arrays.asList(values);
-                    if (isModified(recordInfo.get(3))) {
-                        String record = downloadRecord(recordInfo.get(0));
-                        downloadedRecordsCounter++;
-                        if (!record.isEmpty()) {
-                            String compressRecord =
-                                    ArgumentApplicationParser.compressArgument(record);
-                            final Text key = new Text(recordInfo.get(0));
-                            final Text value = new Text(compressRecord);
+			try (BufferedReader br = new BufferedReader(new InputStreamReader(lambdaFileStream))) {
+				String line;
+				int nReqTmp = 0;
+				startDownload = System.currentTimeMillis();
+				long startReqTmp = System.currentTimeMillis();
+				while ((line = br.readLine()) != null) {
+					parsedRecordsCounter++;
+					// skip headers line
+					if (parsedRecordsCounter == 1) {
+						continue;
+					}
+					String[] values = line.split(",");
+					List<String> recordInfo = Arrays.asList(values);
+					if (isModified(recordInfo.get(3))) {
+						String record = downloadRecord(recordInfo.get(0));
+						downloadedRecordsCounter++;
+						if (!record.isEmpty()) {
+							String compressRecord = ArgumentApplicationParser.compressArgument(record);
+							final Text key = new Text(recordInfo.get(0));
+							final Text value = new Text(compressRecord);

-                            try {
-                                writer.append(key, value);
-                                savedRecordsCounter++;
-                            } catch (IOException e) {
-                                Log.debug("Writing to sequence file: " + e.getMessage());
-                                Log.debug(e);
-                                throw new RuntimeException(e);
-                            }
-                        }
-                    }
-                    long endReq = System.currentTimeMillis();
-                    nReqTmp++;
-                    if (nReqTmp == REQ_LIMIT) {
-                        long reqSessionDuration = endReq - startReqTmp;
-                        if (reqSessionDuration <= 1000) {
-                            Log.warn(
-                                    "\nreqSessionDuration: "
-                                            + reqSessionDuration
-                                            + " nReqTmp: "
-                                            + nReqTmp
-                                            + " wait ....");
-                            Thread.sleep(1000 - reqSessionDuration);
-                        } else {
-                            nReqTmp = 0;
-                            startReqTmp = System.currentTimeMillis();
-                        }
-                    }
-
-                    // if (parsedRecordsCounter>REQ_MAX_TEST) {
-                    // break;
-                    // }
-                    if ((parsedRecordsCounter % RECORD_PARSED_COUNTER_LOG_INTERVAL) == 0) {
-                        Log.info("Current record parsed: " + 
parsedRecordsCounter); - Log.info("Current record downloaded: " + downloadedRecordsCounter); - Log.info("Current record saved: " + savedRecordsCounter); - } - } - long endDownload = System.currentTimeMillis(); - long downloadTime = endDownload - startDownload; - Log.info("Download time: " + ((downloadTime / 1000) / 60) + " minutes"); + try { + writer.append(key, value); + savedRecordsCounter++; + } catch (IOException e) { + Log.debug("Writing to sequence file: " + e.getMessage()); + Log.debug(e); + throw new RuntimeException(e); + } } + } + long endReq = System.currentTimeMillis(); + nReqTmp++; + if (nReqTmp == REQ_LIMIT) { + long reqSessionDuration = endReq - startReqTmp; + if (reqSessionDuration <= 1000) { + Log.warn( + "\nreqSessionDuration: " + + reqSessionDuration + + " nReqTmp: " + + nReqTmp + + " wait ...."); + Thread.sleep(1000 - reqSessionDuration); + } else { + nReqTmp = 0; + startReqTmp = System.currentTimeMillis(); + } + } + + // if (parsedRecordsCounter>REQ_MAX_TEST) { + // break; + // } + if ((parsedRecordsCounter % RECORD_PARSED_COUNTER_LOG_INTERVAL) == 0) { + Log.info("Current record parsed: " + parsedRecordsCounter); + Log.info("Current record downloaded: " + downloadedRecordsCounter); + Log.info("Current record saved: " + savedRecordsCounter); + } } - lambdaFileStream.close(); - Log.info("Download started at: " + new Date(startDownload).toString()); - Log.info("Parsed Records Counter: " + parsedRecordsCounter); - Log.info("Downloaded Records Counter: " + downloadedRecordsCounter); - Log.info("Saved Records Counter: " + savedRecordsCounter); + long endDownload = System.currentTimeMillis(); + long downloadTime = endDownload - startDownload; + Log.info("Download time: " + ((downloadTime / 1000) / 60) + " minutes"); + } } + lambdaFileStream.close(); + Log.info("Download started at: " + new Date(startDownload).toString()); + Log.info("Parsed Records Counter: " + parsedRecordsCounter); + Log.info("Downloaded Records Counter: " + downloadedRecordsCounter); + Log.info("Saved Records Counter: " + savedRecordsCounter); + } - private void loadArgs(String[] args) throws IOException, Exception { - final ArgumentApplicationParser parser = - new ArgumentApplicationParser( - IOUtils.toString( - OrcidDownloader.class.getResourceAsStream( - "/eu/dnetlib/dhp/doiboost/download_orcid_data.json"))); - parser.parseArgument(args); + private void loadArgs(String[] args) throws IOException, Exception { + final ArgumentApplicationParser parser = + new ArgumentApplicationParser( + IOUtils.toString( + OrcidDownloader.class.getResourceAsStream( + "/eu/dnetlib/dhp/doiboost/download_orcid_data.json"))); + parser.parseArgument(args); - hdfsServerUri = parser.get("hdfsServerUri"); - Log.info("HDFS URI: " + hdfsServerUri); - hdfsOrcidDefaultPath = parser.get("hdfsOrcidDefaultPath"); - Log.info("Default Path: " + hdfsOrcidDefaultPath); - lambdaFileName = parser.get("lambdaFileName"); - Log.info("Lambda File Name: " + lambdaFileName); - outputPath = parser.get("outputPath"); - Log.info("Output Data: " + outputPath); - token = parser.get("token"); - } + hdfsServerUri = parser.get("hdfsServerUri"); + Log.info("HDFS URI: " + hdfsServerUri); + hdfsOrcidDefaultPath = parser.get("hdfsOrcidDefaultPath"); + Log.info("Default Path: " + hdfsOrcidDefaultPath); + lambdaFileName = parser.get("lambdaFileName"); + Log.info("Lambda File Name: " + lambdaFileName); + outputPath = parser.get("outputPath"); + Log.info("Output Data: " + outputPath); + token = parser.get("token"); + } - private boolean isModified(String 
modifiedDate) throws ParseException { - Date modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate); - Date lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate); - return modifiedDateDt.after(lastUpdateDt); - } + private boolean isModified(String modifiedDate) throws ParseException { + Date modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate); + Date lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate); + return modifiedDateDt.after(lastUpdateDt); + } }
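
The record filter in the patch comes down to isModified(): column 3 of each last_modified.csv line is parsed with DATE_FORMAT and compared against the fixed lastUpdate cut-off ("2019-09-30 00:00:00.000000"). The trailing '.' in the pattern makes SimpleDateFormat stop at the seconds field, and parse() ignores the remaining microseconds. A minimal, self-contained sketch of that check; the CSV line below is hypothetical, for illustration only:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class IsModifiedSketch {
    // Same constants as OrcidDownloader; the trailing '.' truncates parsing
    // before the microseconds, which parse() then ignores.
    static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.";
    static final String LAST_UPDATE = "2019-09-30 00:00:00.000000";

    public static void main(String[] args) throws ParseException {
        // Hypothetical lambda-file line: ORCID iD first, last-modified date in column 3
        String line = "0000-0001-2345-6789,foo,bar,2019-10-15 11:22:33.000000";
        String[] values = line.split(",");
        Date modified = new SimpleDateFormat(DATE_FORMAT).parse(values[3]);
        Date lastUpdate = new SimpleDateFormat(DATE_FORMAT).parse(LAST_UPDATE);
        // Prints "modified after cut-off: true", so this record would be downloaded
        System.out.println("modified after cut-off: " + modified.after(lastUpdate));
    }
}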
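The download loop paces itself with REQ_LIMIT: after every 24 requests it measures the elapsed window and sleeps away the remainder of one second. Note that in the patch nReqTmp and startReqTmp are reset only in the else branch, so once the sleep branch is taken the counter climbs past REQ_LIMIT and the throttle never engages again. The sketch below is a stand-alone approximation of the intended pacing, not the patch's exact behavior: it resets the window in both branches, and doWork() is a stand-in for downloadRecord():

public class RatePacingSketch {
    static final int REQ_LIMIT = 24; // max requests per one-second window

    public static void main(String[] args) throws InterruptedException {
        int nReqTmp = 0;
        long startReqTmp = System.currentTimeMillis();
        for (int i = 0; i < 100; i++) {
            doWork(); // stand-in for the real HTTP call
            nReqTmp++;
            if (nReqTmp == REQ_LIMIT) {
                long reqSessionDuration = System.currentTimeMillis() - startReqTmp;
                if (reqSessionDuration <= 1000) {
                    // burn the rest of the one-second window
                    Thread.sleep(1000 - reqSessionDuration);
                }
                // reset the window unconditionally, unlike the patch
                nReqTmp = 0;
                startReqTmp = System.currentTimeMillis();
            }
        }
    }

    private static void doWork() {
        // placeholder for downloadRecord(orcidId)
    }
}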
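The workflow's <arg> elements line up with the parameter spec in download_orcid_data.json: -n is the server URI, -d the default work path, -f the lambda file name, -o the relative output folder, -t the access token. A sketch of an equivalent direct invocation; every value here is a hypothetical placeholder, since in production Oozie supplies them from ${workingPathOrcid}, ${nameNode}, and ${token}:

public class InvokeDownloaderSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical values mirroring the workflow's <arg> pairs
        String[] downloaderArgs = {
            "-n", "hdfs://nameservice1",
            "-d", "/data/orcid_activities/",
            "-f", "last_modified.csv",
            "-o", "download/",
            "-t", "hypothetical-access-token"
        };
        eu.dnetlib.doiboost.orcid.OrcidDownloader.main(downloaderArgs);
    }
}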