From 97c8111847a148fb738c593136d16934c6be15cf Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Mon, 23 Nov 2020 09:49:22 +0100 Subject: [PATCH] action to convert lambda file in seq file; spark action to download updated authors --- .../doiboost/orcid/OrcidDownloader.java | 185 +++++++++--------- .../orcid/SparkDownloadOrcidAuthors.java | 166 ++++++++++++++++ .../orcid/SparkGenLastModifiedSeq.java | 99 ++++++++++ .../orcid/SparkOrcidGenerateAuthors.java | 165 ---------------- .../orcid/SparkPartitionLambdaFile.java | 50 ----- .../orcid/model/DownloadedRecordData.java | 14 +- .../gen_orcid_authors_parameters.json | 4 - .../oozie_app/config-default.xml | 22 --- .../orcid_gen_authors/oozie_app/workflow.xml | 83 -------- .../oozie_app/workflow.xml | 122 +++++++++++- .../doiboost/orcid/OrcidClientTest.java | 139 +++++++++++-- .../0000-0001-6645-509X.compressed.base64 | 1 - .../0000-0003-3028-6161.compressed.base64 | 1 + 13 files changed, 608 insertions(+), 443 deletions(-) create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenLastModifiedSeq.java delete mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkOrcidGenerateAuthors.java delete mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPartitionLambdaFile.java delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_orcid_authors_parameters.json delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/orcid/0000-0001-6645-509X.compressed.base64 create mode 100644 dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/orcid/0000-0003-3028-6161.compressed.base64 diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java index 762d8aecd8..be727ab9fe 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java @@ -1,14 +1,15 @@ package eu.dnetlib.doiboost.orcid; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; +import java.io.*; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.List; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -16,6 +17,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; @@ -27,10 +29,10 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; public class OrcidDownloader extends OrcidDSManager { static final int REQ_LIMIT = 24; -// static final int REQ_MAX_TEST = 100; - static final int RECORD_PARSED_COUNTER_LOG_INTERVAL = 10000; + static final int REQ_MAX_TEST = -1; + static final int RECORD_PARSED_COUNTER_LOG_INTERVAL = 500; static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; - static final String lastUpdate = "2019-09-30 00:00:00"; + static final String lastUpdate = "2020-09-29 00:00:00"; private String lambdaFileName; private String outputPath; private String token; @@ -41,7 +43,7 @@ public class OrcidDownloader extends OrcidDSManager { orcidDownloader.parseLambdaFile(); } - private String downloadRecord(String orcidId) { + private String downloadRecord(String orcidId) throws IOException { try (CloseableHttpClient client = HttpClients.createDefault()) { HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record"); httpGet.addHeader("Accept", "application/vnd.orcid+xml"); @@ -49,17 +51,23 @@ public class OrcidDownloader extends OrcidDSManager { CloseableHttpResponse response = client.execute(httpGet); if (response.getStatusLine().getStatusCode() != 200) { Log - .warn( + .info( "Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode()); return new String(""); } - return IOUtils.toString(response.getEntity().getContent()); - - } catch (Throwable e) { - Log.warn("Downloading " + orcidId, e.getMessage()); - +// return IOUtils.toString(response.getEntity().getContent()); + return xmlStreamToString(response.getEntity().getContent()); } - return new String(""); + } + + private String xmlStreamToString(InputStream xmlStream) throws IOException { + BufferedReader br = new BufferedReader(new InputStreamReader(xmlStream)); + String line; + StringBuffer buffer = new StringBuffer(); + while ((line = br.readLine()) != null) { + buffer.append(line); + } + return buffer.toString(); } public void parseLambdaFile() throws Exception { @@ -76,90 +84,87 @@ public class OrcidDownloader extends OrcidDSManager { hdfsServerUri .concat(workingPath) .concat(outputPath) - .concat("orcid_records.seq")); - - try (SequenceFile.Writer writer = SequenceFile - .createWriter( - conf, - SequenceFile.Writer.file(hdfsoutputPath), - SequenceFile.Writer.keyClass(Text.class), - SequenceFile.Writer.valueClass(Text.class))) { - - try (BufferedReader br = new BufferedReader(new InputStreamReader(lambdaFileStream))) { - String line; - int nReqTmp = 0; + .concat("updated_xml_authors.seq")); + try (TarArchiveInputStream tais = new TarArchiveInputStream( + new GzipCompressorInputStream(lambdaFileStream))) { + TarArchiveEntry entry = null; + StringBuilder sb = new StringBuilder(); + try (SequenceFile.Writer writer = SequenceFile + .createWriter( + conf, + SequenceFile.Writer.file(hdfsoutputPath), + SequenceFile.Writer.keyClass(Text.class), + SequenceFile.Writer.valueClass(Text.class), + SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()))) { startDownload = System.currentTimeMillis(); - long startReqTmp = System.currentTimeMillis(); - while ((line = br.readLine()) != null) { - parsedRecordsCounter++; - // skip headers line - if (parsedRecordsCounter == 1) { - continue; - } - String[] values = line.split(","); - List recordInfo = Arrays.asList(values); - String orcidId = recordInfo.get(0); - if (isModified(orcidId, recordInfo.get(3))) { - String record = downloadRecord(orcidId); - downloadedRecordsCounter++; - if (!record.isEmpty()) { - String compressRecord = ArgumentApplicationParser.compressArgument(record); - final Text key = new Text(recordInfo.get(0)); - final Text value = new Text(compressRecord); - - try { + while ((entry = tais.getNextTarEntry()) != null) { + BufferedReader br = new BufferedReader(new InputStreamReader(tais)); // Read directly from tarInput + String line; + while ((line = br.readLine()) != null) { + String[] values = line.split(","); + List recordInfo = Arrays.asList(values); + int nReqTmp = 0; + long startReqTmp = System.currentTimeMillis(); + // skip headers line + if (parsedRecordsCounter == 0) { + parsedRecordsCounter++; + continue; + } + parsedRecordsCounter++; + String orcidId = recordInfo.get(0); + if (isModified(orcidId, recordInfo.get(3))) { + String record = downloadRecord(orcidId); + downloadedRecordsCounter++; + if (!record.isEmpty()) { +// String compressRecord = ArgumentApplicationParser.compressArgument(record); + final Text key = new Text(recordInfo.get(0)); + final Text value = new Text(record); writer.append(key, value); savedRecordsCounter++; - } catch (IOException e) { - Log.warn("Writing to sequence file: " + e.getMessage()); - Log.warn(e); - throw new RuntimeException(e); + } + } else { + break; + } + long endReq = System.currentTimeMillis(); + nReqTmp++; + if (nReqTmp == REQ_LIMIT) { + long reqSessionDuration = endReq - startReqTmp; + if (reqSessionDuration <= 1000) { + Log + .info( + "\nreqSessionDuration: " + + reqSessionDuration + + " nReqTmp: " + + nReqTmp + + " wait ...."); + Thread.sleep(1000 - reqSessionDuration); + } else { + nReqTmp = 0; + startReqTmp = System.currentTimeMillis(); + } + } + if ((parsedRecordsCounter % RECORD_PARSED_COUNTER_LOG_INTERVAL) == 0) { + Log + .info( + "Current parsed: " + + parsedRecordsCounter + + " downloaded: " + + downloadedRecordsCounter + + " saved: " + + savedRecordsCounter); + if (REQ_MAX_TEST != -1 && parsedRecordsCounter > REQ_MAX_TEST) { + break; } } } - long endReq = System.currentTimeMillis(); - nReqTmp++; - if (nReqTmp == REQ_LIMIT) { - long reqSessionDuration = endReq - startReqTmp; - if (reqSessionDuration <= 1000) { - Log - .warn( - "\nreqSessionDuration: " - + reqSessionDuration - + " nReqTmp: " - + nReqTmp - + " wait ...."); - Thread.sleep(1000 - reqSessionDuration); - } else { - nReqTmp = 0; - startReqTmp = System.currentTimeMillis(); - } - } - -// if (parsedRecordsCounter > REQ_MAX_TEST) { -// break; -// } - if ((parsedRecordsCounter % RECORD_PARSED_COUNTER_LOG_INTERVAL) == 0) { - Log - .info( - "Current parsed: " - + parsedRecordsCounter - + " downloaded: " - + downloadedRecordsCounter - + " saved: " - + savedRecordsCounter); -// if (parsedRecordsCounter > REQ_MAX_TEST) { -// break; -// } - } + long endDownload = System.currentTimeMillis(); + long downloadTime = endDownload - startDownload; + Log.info("Download time: " + ((downloadTime / 1000) / 60) + " minutes"); } - long endDownload = System.currentTimeMillis(); - long downloadTime = endDownload - startDownload; - Log.info("Download time: " + ((downloadTime / 1000) / 60) + " minutes"); } } - lambdaFileStream.close(); Log.info("Download started at: " + new Date(startDownload).toString()); + Log.info("Download ended at: " + new Date(System.currentTimeMillis()).toString()); Log.info("Parsed Records Counter: " + parsedRecordsCounter); Log.info("Downloaded Records Counter: " + downloadedRecordsCounter); Log.info("Saved Records Counter: " + savedRecordsCounter); @@ -185,7 +190,7 @@ public class OrcidDownloader extends OrcidDSManager { token = parser.get("token"); } - private boolean isModified(String orcidId, String modifiedDate) { + public boolean isModified(String orcidId, String modifiedDate) { Date modifiedDateDt = null; Date lastUpdateDt = null; try { @@ -195,7 +200,7 @@ public class OrcidDownloader extends OrcidDSManager { modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate); lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate); } catch (Exception e) { - Log.warn("[" + orcidId + "] Parsing date: ", e.getMessage()); + Log.info("[" + orcidId + "] Parsing date: ", e.getMessage()); return true; } return modifiedDateDt.after(lastUpdateDt); diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java new file mode 100644 index 0000000000..850a654d4a --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java @@ -0,0 +1,166 @@ + +package eu.dnetlib.doiboost.orcid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.util.LongAccumulator; +import org.mortbay.log.Log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.doiboost.orcid.model.DownloadedRecordData; +import scala.Tuple2; + +public class SparkDownloadOrcidAuthors { + + static Logger logger = LoggerFactory.getLogger(SparkDownloadOrcidAuthors.class); + static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; + static final String lastUpdate = "2020-09-29 00:00:00"; + + public static void main(String[] args) throws IOException, Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkDownloadOrcidAuthors.class + .getResourceAsStream( + "/eu/dnetlib/dhp/doiboost/download_orcid_data.json"))); + parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + logger.info("isSparkSessionManaged: {}", isSparkSessionManaged); + final String workingPath = parser.get("workingPath"); + logger.info("workingPath: ", workingPath); + final String outputPath = parser.get("outputPath"); + logger.info("outputPath: ", outputPath); + final String token = parser.get("token"); + final String lambdaFileName = parser.get("lambdaFileName"); + logger.info("lambdaFileName: ", lambdaFileName); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + LongAccumulator parsedRecordsAcc = spark.sparkContext().longAccumulator("parsedRecords"); + LongAccumulator modifiedRecordsAcc = spark.sparkContext().longAccumulator("modifiedRecords"); + LongAccumulator downloadedRecordsAcc = spark.sparkContext().longAccumulator("downloadedRecords"); + + logger.info("Retrieving data from lamda sequence file"); + JavaPairRDD lamdaFileRDD = sc + .sequenceFile(workingPath + lambdaFileName, Text.class, Text.class); + logger.info("Data retrieved: " + lamdaFileRDD.count()); + + Function, Boolean> isModifiedAfterFilter = data -> { + String orcidId = data._1().toString(); + String lastModifiedDate = data._2().toString(); + parsedRecordsAcc.add(1); + if (isModified(orcidId, lastModifiedDate)) { + modifiedRecordsAcc.add(1); + return true; + } + return false; + }; + + Function, Tuple2> downloadRecordFunction = data -> { + String orcidId = data._1().toString(); + String lastModifiedDate = data._2().toString(); + final DownloadedRecordData downloaded = new DownloadedRecordData(); + downloaded.setOrcidId(orcidId); + downloaded.setLastModifiedDate(lastModifiedDate); + try (CloseableHttpClient client = HttpClients.createDefault()) { + HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record"); + httpGet.addHeader("Accept", "application/vnd.orcid+xml"); + httpGet.addHeader("Authorization", String.format("Bearer %s", token)); + CloseableHttpResponse response = client.execute(httpGet); + int statusCode = response.getStatusLine().getStatusCode(); + downloaded.setStatusCode(statusCode); + if (statusCode != 200) { + logger + .info( + "Downloading " + orcidId + " status code: " + + response.getStatusLine().getStatusCode()); + return downloaded.toTuple2(); + } + downloadedRecordsAcc.add(1); + long currentDownloaded = downloadedRecordsAcc.value(); + if ((currentDownloaded % 10000) == 0) { + logger.info("Current downloaded: " + currentDownloaded); + } + downloaded + .setCompressedData( + ArgumentApplicationParser + .compressArgument(IOUtils.toString(response.getEntity().getContent()))); + } catch (Throwable e) { + logger.info("Downloading " + orcidId, e.getMessage()); + downloaded.setErrorMessage(e.getMessage()); + return downloaded.toTuple2(); + } + return downloaded.toTuple2(); + }; + + sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true"); + + logger.info("Start execution ..."); +// List> sampleList = lamdaFileRDD.take(500); +// JavaRDD> sampleRDD = sc.parallelize(sampleList); +// sampleRDD + JavaPairRDD authorsModifiedRDD = lamdaFileRDD + .filter(isModifiedAfterFilter); + logger.info("Authors modified count: " + authorsModifiedRDD.count()); + logger.info("Start downloading ..."); + authorsModifiedRDD + .map(downloadRecordFunction) + .mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2()))) + .saveAsNewAPIHadoopFile( + workingPath.concat(outputPath), + Text.class, + Text.class, + SequenceFileOutputFormat.class, + sc.hadoopConfiguration()); + logger.info("parsedRecordsAcc: " + parsedRecordsAcc.value().toString()); + logger.info("modifiedRecordsAcc: " + modifiedRecordsAcc.value().toString()); + logger.info("downloadedRecordsAcc: " + downloadedRecordsAcc.value().toString()); + }); + + } + + private static boolean isModified(String orcidId, String modifiedDate) { + Date modifiedDateDt = null; + Date lastUpdateDt = null; + try { + if (modifiedDate.length() != 19) { + modifiedDate = modifiedDate.substring(0, 19); + } + modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate); + lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate); + } catch (Exception e) { + logger.info("[" + orcidId + "] Parsing date: ", e.getMessage()); + return true; + } + return modifiedDateDt.after(lastUpdateDt); + } +} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenLastModifiedSeq.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenLastModifiedSeq.java new file mode 100644 index 0000000000..f710635abe --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenLastModifiedSeq.java @@ -0,0 +1,99 @@ + +package eu.dnetlib.doiboost.orcid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.SparkConf; +import org.mortbay.log.Log; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +public class SparkGenLastModifiedSeq { + private static String hdfsServerUri; + private static String workingPath; + private static String outputPath; + private static String lambdaFileName; + + public static void main(String[] args) throws IOException, Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkGenLastModifiedSeq.class + .getResourceAsStream( + "/eu/dnetlib/dhp/doiboost/download_orcid_data.json"))); + parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + hdfsServerUri = parser.get("hdfsServerUri"); + workingPath = parser.get("workingPath"); + outputPath = parser.get("outputPath"); + lambdaFileName = parser.get("lambdaFileName"); + String lambdaFileUri = hdfsServerUri.concat(workingPath).concat(lambdaFileName); + + SparkConf sparkConf = new SparkConf(); + runWithSparkSession( + sparkConf, + isSparkSessionManaged, + spark -> { + int rowsNum = 0; + Path output = new Path( + hdfsServerUri + .concat(workingPath) + .concat(outputPath)); + Path hdfsreadpath = new Path(lambdaFileUri); + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsServerUri.concat(workingPath)); + conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); + FileSystem fs = FileSystem.get(URI.create(hdfsServerUri.concat(workingPath)), conf); + FSDataInputStream lambdaFileStream = fs.open(hdfsreadpath); + try (TarArchiveInputStream tais = new TarArchiveInputStream( + new GzipCompressorInputStream(lambdaFileStream))) { + TarArchiveEntry entry = null; + try (SequenceFile.Writer writer = SequenceFile + .createWriter( + conf, + SequenceFile.Writer.file(output), + SequenceFile.Writer.keyClass(Text.class), + SequenceFile.Writer.valueClass(Text.class), + SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()))) { + while ((entry = tais.getNextTarEntry()) != null) { + BufferedReader br = new BufferedReader(new InputStreamReader(tais)); + String line; + while ((line = br.readLine()) != null) { + String[] values = line.split(","); + List recordInfo = Arrays.asList(values); + String orcidId = recordInfo.get(0); + final Text key = new Text(orcidId); + final Text value = new Text(recordInfo.get(3)); + writer.append(key, value); + rowsNum++; + } + } + } + } + Log.info("Saved rows from lamda csv tar file: " + rowsNum); + }); + } +} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkOrcidGenerateAuthors.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkOrcidGenerateAuthors.java deleted file mode 100644 index 4e18ab8401..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkOrcidGenerateAuthors.java +++ /dev/null @@ -1,165 +0,0 @@ - -package eu.dnetlib.doiboost.orcid; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.List; -import java.util.Optional; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.Text; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.util.LongAccumulator; -import org.mortbay.log.Log; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.doiboost.orcid.model.DownloadedRecordData; -import scala.Tuple2; - -public class SparkOrcidGenerateAuthors { - - static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; - static final String lastUpdate = "2019-09-30 00:00:00"; - - public static void main(String[] args) throws IOException, Exception { - Logger logger = LoggerFactory.getLogger(SparkOrcidGenerateAuthors.class); - logger.info("[ SparkOrcidGenerateAuthors STARTED]"); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkOrcidGenerateAuthors.class - .getResourceAsStream( - "/eu/dnetlib/dhp/doiboost/gen_orcid_authors_parameters.json"))); - parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - logger.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String workingPath = parser.get("workingPath"); - logger.info("workingPath: ", workingPath); - final String outputAuthorsPath = parser.get("outputAuthorsPath"); - logger.info("outputAuthorsPath: ", outputAuthorsPath); - final String token = parser.get("token"); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - - LongAccumulator parsedRecordsAcc = sc.sc().longAccumulator("parsedRecords"); - LongAccumulator modifiedRecordsAcc = sc.sc().longAccumulator("modifiedRecords"); - LongAccumulator downloadedRecordsAcc = sc.sc().longAccumulator("downloadedRecords"); - LongAccumulator alreadyDownloadedRecords = sc.sc().longAccumulator("alreadyDownloadedRecords"); - JavaRDD lamdaFileRDD = sc.textFile(workingPath + "lamdafiles"); - - JavaRDD downloadedRDD = sc.textFile(workingPath + "downloaded"); - Function getOrcidIdFunction = line -> { - try { - String[] values = line.split(","); - return values[0].substring(1); - } catch (Exception e) { - return new String(""); - } - }; - List downloadedRecords = downloadedRDD.map(getOrcidIdFunction).collect(); - - Function isModifiedAfterFilter = line -> { - String[] values = line.split(","); - String orcidId = values[0]; - parsedRecordsAcc.add(1); - if (isModified(orcidId, values[3])) { - modifiedRecordsAcc.add(1); - return true; - } - return false; - }; - Function isNotDownloadedFilter = line -> { - String[] values = line.split(","); - String orcidId = values[0]; - if (downloadedRecords.contains(orcidId)) { - alreadyDownloadedRecords.add(1); - return false; - } - return true; - }; - Function> downloadRecordFunction = line -> { - String[] values = line.split(","); - String orcidId = values[0]; - String modifiedDate = values[3]; - return downloadRecord(orcidId, modifiedDate, token, downloadedRecordsAcc); - }; - - lamdaFileRDD - .filter(isModifiedAfterFilter) - .filter(isNotDownloadedFilter) - .map(downloadRecordFunction) - .rdd() - .saveAsTextFile(workingPath.concat(outputAuthorsPath)); - }); - - } - - private static boolean isModified(String orcidId, String modifiedDate) { - Date modifiedDateDt = null; - Date lastUpdateDt = null; - try { - if (modifiedDate.length() != 19) { - modifiedDate = modifiedDate.substring(0, 19); - } - modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate); - lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate); - } catch (Exception e) { - Log.warn("[" + orcidId + "] Parsing date: ", e.getMessage()); - return true; - } - return modifiedDateDt.after(lastUpdateDt); - } - - private static Tuple2 downloadRecord(String orcidId, String modifiedDate, String token, - LongAccumulator downloadedRecordsAcc) { - final DownloadedRecordData data = new DownloadedRecordData(); - data.setOrcidId(orcidId); - data.setModifiedDate(modifiedDate); - try (CloseableHttpClient client = HttpClients.createDefault()) { - HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record"); - httpGet.addHeader("Accept", "application/vnd.orcid+xml"); - httpGet.addHeader("Authorization", String.format("Bearer %s", token)); - CloseableHttpResponse response = client.execute(httpGet); - int statusCode = response.getStatusLine().getStatusCode(); - data.setStatusCode(statusCode); - if (statusCode != 200) { - Log - .warn( - "Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode()); - return data.toTuple2(); - } - downloadedRecordsAcc.add(1); - data - .setCompressedData( - ArgumentApplicationParser.compressArgument(IOUtils.toString(response.getEntity().getContent()))); - } catch (Throwable e) { - Log.warn("Downloading " + orcidId, e.getMessage()); - data.setErrorMessage(e.getMessage()); - return data.toTuple2(); - } - return data.toTuple2(); - } -} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPartitionLambdaFile.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPartitionLambdaFile.java deleted file mode 100644 index ca6f0f6c4c..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPartitionLambdaFile.java +++ /dev/null @@ -1,50 +0,0 @@ - -package eu.dnetlib.doiboost.orcid; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.IOException; -import java.util.Optional; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; - -public class SparkPartitionLambdaFile { - - public static void main(String[] args) throws IOException, Exception { - Logger logger = LoggerFactory.getLogger(SparkOrcidGenerateAuthors.class); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkOrcidGenerateAuthors.class - .getResourceAsStream( - "/eu/dnetlib/dhp/doiboost/gen_orcid_authors_parameters.json"))); - parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - final String workingPath = parser.get("workingPath"); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD lamdaFileRDD = sc.textFile(workingPath + "last_modified.csv"); - - lamdaFileRDD - .repartition(20) - .saveAsTextFile(workingPath.concat("lamdafiles")); - }); - } - -} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/DownloadedRecordData.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/DownloadedRecordData.java index f66ef82a2f..da1a79b199 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/DownloadedRecordData.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/DownloadedRecordData.java @@ -3,8 +3,6 @@ package eu.dnetlib.doiboost.orcid.model; import java.io.Serializable; -import org.apache.hadoop.io.Text; - import com.google.gson.JsonObject; import scala.Tuple2; @@ -12,7 +10,7 @@ import scala.Tuple2; public class DownloadedRecordData implements Serializable { private String orcidId; - private String modifiedDate; + private String lastModifiedDate; private String statusCode; private String compressedData; private String errorMessage; @@ -20,7 +18,7 @@ public class DownloadedRecordData implements Serializable { public Tuple2 toTuple2() { JsonObject data = new JsonObject(); data.addProperty("statusCode", getStatusCode()); - data.addProperty("modifiedDate", getModifiedDate()); + data.addProperty("lastModifiedDate", getLastModifiedDate()); if (getCompressedData() != null) { data.addProperty("compressedData", getCompressedData()); } @@ -66,11 +64,11 @@ public class DownloadedRecordData implements Serializable { this.compressedData = compressedData; } - public String getModifiedDate() { - return modifiedDate; + public String getLastModifiedDate() { + return lastModifiedDate; } - public void setModifiedDate(String modifiedDate) { - this.modifiedDate = modifiedDate; + public void setLastModifiedDate(String lastModifiedDate) { + this.lastModifiedDate = lastModifiedDate; } } diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_orcid_authors_parameters.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_orcid_authors_parameters.json deleted file mode 100644 index 35bfe1b415..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_orcid_authors_parameters.json +++ /dev/null @@ -1,4 +0,0 @@ -[{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working path", "paramRequired": true}, - {"paramName":"t", "paramLongName":"token", "paramDescription": "token to grant access", "paramRequired": true}, - {"paramName":"o", "paramLongName":"outputAuthorsPath", "paramDescription": "the relative folder of the sequencial file to write the authors data", "paramRequired": true} -] \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/config-default.xml deleted file mode 100644 index a720e75923..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/config-default.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - jobTracker - hadoop-rm3.garr-pa1.d4science.org:8032 - - - nameNode - hdfs://hadoop-rm1.garr-pa1.d4science.org:8020 - - - queueName - default - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml deleted file mode 100644 index 7ebc5f0a0b..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml +++ /dev/null @@ -1,83 +0,0 @@ - - - - workingPath - the working dir base path - - - token - access token - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - outputPath - the working dir base path - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - yarn - cluster - Split_Lambda_File - eu.dnetlib.doiboost.orcid.SparkPartitionLambdaFile - dhp-doiboost-1.2.1-SNAPSHOT.jar - --num-executors 24 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} - - -w${workingPath}/ - -oauthors/ - -t${token} - - - - - - - - ${jobTracker} - ${nameNode} - yarn - cluster - Gen_Orcid_Authors - eu.dnetlib.doiboost.orcid.SparkOrcidGenerateAuthors - dhp-doiboost-1.2.1-SNAPSHOT.jar - --num-executors 20 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} - - -w${workingPath}/ - -oauthors/ - -t${token} - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml index a3daab1162..5f728d35ba 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml @@ -14,9 +14,63 @@ the shell command that downloads the lambda file from orcid containing last orcid update informations + + sparkExecutorNumber + 20 + + + sparkDriverMemory + 7G + memory for driver process + + + sparkExecutorMemory + 2G + memory for individual executor + + + sparkExecutorCores + 1 + number of cores used by single executor + + + spark2MaxExecutors + 20 + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + - - + + + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + @@ -26,6 +80,7 @@ + @@ -41,24 +96,77 @@ ${shell_cmd} - + - + ${jobTracker} ${nameNode} eu.dnetlib.doiboost.orcid.OrcidDownloader - -d${workingPathOrcid}/ + -w${workingPath}/ -n${nameNode} - -flast_modified.csv - -odownload/ + -flast_modified.csv.tar + -odownloads/ -t${token} + + + + yarn-cluster + cluster + GenLastModifiedSeq + eu.dnetlib.doiboost.orcid.SparkGenLastModifiedSeq + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + + -w${workingPath}/ + -n${nameNode} + -flast_modified.csv.tar + -olast_modified.seq + -t- + + + + + + + + yarn-cluster + cluster + DownloadOrcidAuthors + eu.dnetlib.doiboost.orcid.SparkDownloadOrcidAuthors + dhp-doiboost-${projectVersion}.jar + + --num-executors=${sparkExecutorNumber} + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + + -w${workingPath}/ + -n${nameNode} + -flast_modified.seq + -odownloads/updated_authors + -t${token} + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java index 774475626d..d6ce99f1c6 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java @@ -5,17 +5,24 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.*; import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.List; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.commons.io.IOUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull; import org.junit.jupiter.api.Test; import eu.dnetlib.dhp.application.ArgumentApplicationParser; @@ -37,12 +44,49 @@ public class OrcidClientTest { // 'https://api.orcid.org/v3.0/0000-0001-7291-3210/record' @Test - public void downloadTest() throws Exception { - String record = testDownloadRecord("0000-0001-6163-2042"); - File f = new File("/tmp/downloaded_0000-0001-6163-2042.xml"); + private void multipleDownloadTest() throws Exception { + int toDownload = 1; + long start = System.currentTimeMillis(); + OrcidDownloader downloader = new OrcidDownloader(); + TarArchiveInputStream input = new TarArchiveInputStream( + new GzipCompressorInputStream(new FileInputStream("/tmp/last_modified.csv.tar"))); + TarArchiveEntry entry = input.getNextTarEntry(); + BufferedReader br = null; + StringBuilder sb = new StringBuilder(); + int rowNum = 0; + int entryNum = 0; + int modified = 0; + while (entry != null) { + br = new BufferedReader(new InputStreamReader(input)); // Read directly from tarInput + String line; + while ((line = br.readLine()) != null) { + String[] values = line.toString().split(","); + List recordInfo = Arrays.asList(values); + String orcidId = recordInfo.get(0); + if (downloader.isModified(orcidId, recordInfo.get(3))) { + downloadTest(orcidId); + modified++; + } + rowNum++; + if (modified > toDownload) { + break; + } + } + entryNum++; + entry = input.getNextTarEntry(); + } + long end = System.currentTimeMillis(); + logToFile("start test: " + new Date(start).toString()); + logToFile("end test: " + new Date(end).toString()); + } + + @Test + private void downloadTest(String orcid) throws Exception { + String record = testDownloadRecord(orcid); + String filename = "/tmp/downloaded_".concat(orcid).concat(".xml"); + File f = new File(filename); OutputStream outStream = new FileOutputStream(f); IOUtils.write(record.getBytes(), outStream); - System.out.println("saved to tmp"); } private String testDownloadRecord(String orcidId) throws Exception { @@ -50,7 +94,9 @@ public class OrcidClientTest { HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record"); httpGet.addHeader("Accept", "application/vnd.orcid+xml"); httpGet.addHeader("Authorization", "Bearer 78fdb232-7105-4086-8570-e153f4198e3d"); + logToFile("start connection: " + new Date(System.currentTimeMillis()).toString()); CloseableHttpResponse response = client.execute(httpGet); + logToFile("end connection: " + new Date(System.currentTimeMillis()).toString()); if (response.getStatusLine().getStatusCode() != 200) { System.out .println("Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode()); @@ -62,7 +108,7 @@ public class OrcidClientTest { return new String(""); } -// @Test + // @Test private void testLambdaFileParser() throws Exception { try (BufferedReader br = new BufferedReader( new InputStreamReader(this.getClass().getResourceAsStream("last_modified.csv")))) { @@ -108,7 +154,7 @@ public class OrcidClientTest { } } -// @Test + // @Test private void getRecordDatestamp() throws ParseException { Date toRetrieveDateDt = new SimpleDateFormat(DATE_FORMAT).parse(toRetrieveDate); Date toNotRetrieveDateDt = new SimpleDateFormat(DATE_FORMAT).parse(toNotRetrieveDate); @@ -126,7 +172,7 @@ public class OrcidClientTest { System.out.println(valueDt.toString()); } -// @Test + // @Test @Ignore private void testModifiedDate() throws ParseException { testDate(toRetrieveDate); @@ -134,14 +180,81 @@ public class OrcidClientTest { testDate(shortDate); } -// @Test - @Ignore - private void testReadBase64CompressedRecord() throws Exception { + @Test + public void testReadBase64CompressedRecord() throws Exception { final String base64CompressedRecord = IOUtils - .toString(getClass().getResourceAsStream("0000-0001-6645-509X.compressed.base64")); + .toString(getClass().getResourceAsStream("0000-0003-3028-6161.compressed.base64")); final String recordFromSeqFile = ArgumentApplicationParser.decompressValue(base64CompressedRecord); - System.out.println(recordFromSeqFile); - final String downloadedRecord = testDownloadRecord("0000-0001-6645-509X"); + logToFile("\n\ndownloaded \n\n" + recordFromSeqFile); + final String downloadedRecord = testDownloadRecord("0000-0003-3028-6161"); assertTrue(recordFromSeqFile.equals(downloadedRecord)); } + + @Test + private void lambdaFileReaderTest() throws Exception { + TarArchiveInputStream input = new TarArchiveInputStream( + new GzipCompressorInputStream(new FileInputStream("/develop/last_modified.csv.tar"))); + TarArchiveEntry entry = input.getNextTarEntry(); + BufferedReader br = null; + StringBuilder sb = new StringBuilder(); + int rowNum = 0; + int entryNum = 0; + while (entry != null) { + br = new BufferedReader(new InputStreamReader(input)); // Read directly from tarInput + String line; + while ((line = br.readLine()) != null) { + String[] values = line.toString().split(","); + List recordInfo = Arrays.asList(values); + assertTrue(recordInfo.size() == 4); + + rowNum++; + if (rowNum == 1) { + assertTrue(recordInfo.get(3).equals("last_modified")); + } else if (rowNum == 2) { + assertTrue(recordInfo.get(0).equals("0000-0002-0499-7333")); + } + } + entryNum++; + assertTrue(entryNum == 1); + entry = input.getNextTarEntry(); + } + } + + @Test + private void lambdaFileCounterTest() throws Exception { + final String lastUpdate = "2020-09-29 00:00:00"; + OrcidDownloader downloader = new OrcidDownloader(); + TarArchiveInputStream input = new TarArchiveInputStream( + new GzipCompressorInputStream(new FileInputStream("/tmp/last_modified.csv.tar"))); + TarArchiveEntry entry = input.getNextTarEntry(); + BufferedReader br = null; + StringBuilder sb = new StringBuilder(); + int rowNum = 0; + int entryNum = 0; + int modified = 0; + while (entry != null) { + br = new BufferedReader(new InputStreamReader(input)); // Read directly from tarInput + String line; + while ((line = br.readLine()) != null) { + String[] values = line.toString().split(","); + List recordInfo = Arrays.asList(values); + String orcidId = recordInfo.get(0); + if (downloader.isModified(orcidId, recordInfo.get(3))) { + modified++; + } + rowNum++; + } + entryNum++; + entry = input.getNextTarEntry(); + } + logToFile("rowNum: " + rowNum); + logToFile("modified: " + modified); + } + + private void logToFile(String log) + throws IOException { + log = log.concat("\n"); + Path path = Paths.get("/tmp/orcid_log.txt"); + Files.write(path, log.getBytes(), StandardOpenOption.APPEND); + } } diff --git a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/orcid/0000-0001-6645-509X.compressed.base64 b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/orcid/0000-0001-6645-509X.compressed.base64 deleted file mode 100644 index 1b088e061c..0000000000 --- a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/orcid/0000-0001-6645-509X.compressed.base64 +++ /dev/null @@ -1 +0,0 @@ -H4sIAAAAAAAAAO1a227bOBB9z1cIepd18SW24aho0wTbAgEWjRdY9I2RaJtbSdSSkhP165eURIm6kHa2SbCLNkBiWDxzhhxyZg7tbN49xZFxhIQinFyZ7sQxDZgEOETJ/sr8Y3trLU2DZiAJQYQTeGUWkJrv/IsNgQEm4bp6MVKQHa5M22E/Fvt1rcViNrfmzupP02AOErpGSQZJAqIr85Bl6dq2Hx8fJ5gEKGR/93ZCbYEQFjDMA5CV01KZNBBhEyKaoSTQW0mgxg6mbCUgg6HGrMEIK5wdILESEEO1VYsRVjGMH1i8DyhVW7WYJhqEYKKJBB8W2ADHsS4A1bhAV1uoRlfjAp2yaWG2S1YIM4AiqrbrIwXDN1g8ah3WgGblMbPWrJwPN9in6gxZKIRJhnYI6mI2BAueXZ5UGaCyrQFNVAjcQcISB+oC0oKEHQhDAqnGpga0WXRE7ABaKaZIf8j7SMHAIvtNbcVHBfLA0gSTQg2uAe0+pREuYhZK3WYJjLD6OwcRC/2pTO/AhC2F5IgCTfLVgO7ZPXVim71hFYLFEOm2tMW02UQhIAFP+pxojm0X186QvSfwiOCjbpoNSNg95JFmV/lof36MgOKc6KI3gJr+hcF+NlX9WJdgKXmqURmRE+RzdsroW+qRLrGxJYsBDe8uvs6qBAzMDphmfuO2AZePq4XY2pVspISVM1zyJCMiHIAI+jDZ2COPa4dayk2dUSL1JEdiJCCwTAErhtkBh/5d2SiskonAcGOrgEMqmj/EiPK+b4Wsq/me464sZ2l53tadrmeLtXc58ZbLry1n32IQ8QjQzIqZeGBBDAWrx7Ztbrnu1puu59P11JksPfdrE/sRm5FlRwDFMPQzkkNpjfXTIZ4Jmoqv7A49s96gxjolKAak0LN0QfU+j+7kpiowdR3SiCZRieSTVplyIWEcEUUPKEIZK85p/hChwKzJxgRYSyJvVXk+2k0abv187rWb1EGP8o1u/QlW3dZLi24lxHqPjjAp1RT1twgkRb4Z6IwO6ATfDsQoKkqs/xmBETIZ0e6GLW2H9LgVe5I2pLqNlmCmLTF120Ovq2gZe9AOa3lEK0Gl5ag0lWxZ6xAhWPSLEqJFJqhFnVB/WnuB6c59qNbG5J5+XSN44aTZ0+qlftg2eEkPWDSPecprY9Aqg2fUyZnlTLfObD2brZ3pZHm5OLNOStOUbjfaWMi47la3XM39Sh/VBqXkaWTfiWPXwFRMte7W0giMiqMvjbVkA7CKtb2yafkkmIpJ0ndaKhmn4uroZi1bF6niG2jCs2pRi1bx1kpdyyYwKg5+edESlABFP3zplOxPbk9wnnaHX9u9zC9VPjpEKZDjQAXYyooU+iFGzfwGg8+iO4Ioh77rTFzXWdnvr69v7u8nPCYTb7X0PNcZ9VNZPctRgknMjv53GBoZAQlF5Q2Wiz2zcQ8Cdu7oafct1/PmwDp1c1FiISyvSc9dOud4llMCoyrZWTHyKYx2o7Qd1PjJGTEbOYkjqJGjuOFJWqZy22XzzApwyG6qly67kCxWjnkqy+0WOSaWWe9LI1BYKAnhE1PNpj4lelqZp+XUmjpbz1szYTt3JjP38hyt3Od9raSXfVR19/TBqHBWEPHjr8192Wr8gl+RSJuzWi5nlrtyp+P3fJ2H3t1/yNS9++uoTn4eMGpsPztAvZCWd4Rrgillt/Q+XfcCoXGsAJXZkqEsOmOLK9g9K1CR9ZFdnBN+kzdu2WnNCTTuQEbQk3HNMp3VvlIXGnflZwfGDhPjI6y+FDC+wBQyJnbHMm7Ze0iMO3yElba7JTg2biIYZATzzzXSA4jwnoDYuEd7lvK0WZRmyhv71KLOb2oK9Hnn5YWam4ryVRqcytlbNznVPF690akcv1SzK/nPangq5An99W8jpIxKXSP4Gf2LlRI+CUAyFERQZJry+DZFuOyb1eeJ6pYjWxRM95fNrJlf+UQfpPPcVOsRS6nKxKebmxvjfXl+60V1x0fUyEBn9LS7rRfvP6rt64/GVlt3vnYXa8ebLJz5T6jt53ObB8OeLl2m2WZvJurP8fviav4cpz+BjF+4znzqzd3TMr5FvryMP5GBPyjjXyC/ZR+/ZPwvGd+Rzh8IQIl1jWOWVkyDf+L/PLMDATSuDyBJYGTdQ67DuYq/ZxUwg/vC+AAoq4fsyXuWtwVF1MA74+bIA/GFlwc2+BHSIgkOBCfoe1kvjC1OuYRPD4WBSi78DRq/szGu+H/p+ddqaiovb9bYVBN4veam8vj/l+6q0PwnNbu7OkOzy3bslxf3ZWNWPThpF4LC91or/va17gefq3e83v0GQZQdAkCgcZPsUQIhQcn+DW4NnbHyqwjxxaP2S0b/YmN3/tnSv/gH9+klwrUpAAA= \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/orcid/0000-0003-3028-6161.compressed.base64 b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/orcid/0000-0003-3028-6161.compressed.base64 new file mode 100644 index 0000000000..8dc3d32ad8 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/orcid/0000-0003-3028-6161.compressed.base64 @@ -0,0 +1 @@ +H4sIAAAAAAAAAO1dW5fbthF+z6/A2XPal5biRaREqmvlrG+JG6/t4900bd+4JLSCQxIqSa1X+fUFeIUkAiIlkpFs5jRxTQ1mgAEw881gQF7/+Ox74AmGEcLBiyt1pFwBGDjYRcHji6tf799K5hWIYjtwbQ8H8MXVBkZXP85/uA6hg0N3lv4BVna8fHElK+Qfifw7lsaKZkoTdaJeASIgiGYoiGEY2N6Lq2Ucr2ay/PXr1xEOHeSS/z7KQSTnFHkL6K4dO066xWtSkORtXBTFKHDErRiioh1ckZHYMXQFzQqavBWOlzCUAtuH/FYlTd7Kh/4D0fcSrfitSppCG2GIQ4Em6M85rYN9X6SA9PecOp1CPnX6e069It3CZJYkF8Y28iJ+u13KnMPvcPNVKDAjKEbuk9aCkdOfC9rndA1JyIVBjBYIinS2T5zzWayDdAfw2mYEhVZCuIAh2ThQpJCSKG9nu24II0GbjKDcRU+ILEBphSMkXuS7lDkHotnf+a3orznlkmwTHG74xBlBOU8rD298okrRZOU0eav/rW2PqP7QTt8iy9tGMHxCjmDzZQTba/fQii3mhlgIokMkmtKSptxNEbRDh276dShYttt0ZQ/J30P4hOBXUTcLorzdw9oTzCr9dbd/hEGE16FIe3ukV/MfAPnnOrUfs4SY2TzpryzFOkRzyj0i7EvWFV7iWmZa7LGh3mUuapUQ7DVb4iieF2IL4uRxOhBZOJJrZsOyO5yRxFJ42LE9OIfBtVzxOBMoZHmd7ah86zGC8l+cECZbQPJhvMTu/DZxFFLCKYTutcwj3GcVrR98FFG/L7nEq801RdUlxZK08b2mzDR9NlZHlmX9t+S522JP454dxZJPwANRoptz1RRJVSV1eq+NZwZhrIx0TflvofuKNhXD9mzkQ3ceh2vIjDF7uk9PAE3KL/EOO812fhS0XoXIt8ONmMs2UTbPlTN5nRqYzA4JQFNuiWpqWDUlZSqpk3vVnCnaTLNGxsSqqeGsSxSggCcUoQfkoZgY/dX6wUPOVdbJKmBXMmE7mKw7pmsTSdEl1Ugm35ypxshUpmXXtqgr+VUPWMxVNGBm0CU0mT2iJxgkKC2avwwJ2sV0F4uoDjBc2D7yNgnt/PWacIwr+LFE5YzIzJQwj0sgyeDOSLSIGLIrmeG07Xp2PJaQ4w7pFtdk+adgTcgjxWtsywzj5GBIPKgcELEMMsCYI0th+5xmu+/7SLAKSorHVUHP2SNtb+ImYwCrdSyR+I74fVUxjYkyuRLs+9ojlQtmJLpaefZGQoELn4nl2NGByFaINcC3FV3rluWfIqH93/dpJMdDRD9ES9XUbItqoJQyKOZAkwzL1CTTMsfVeInHfQs/VXHZxk88Ngfx1F5DuZFCdtSX2L87B6/WEZDAGy+iiDfc5bltJavY2cSkhAkUwiF6RPQP5/g5qQ1ea03GYTDb/mQ00QdXh4naM08JcgcnJN7fUfKBLZULZ+yNFG9WxaK4WRNkG4J3rwtOe5S1eD7Z3hrO9SmZBFXVp4pSyS+lqsWQ+MY5E1RFSXdHhJBE5V/t0JXtpOevUxgwIuQ/pk/evX7BdOOvtr/6x8oO4wDSX24/mPcfbz7fVfaOiqzVtxB6SVxAc0vzCHqLSnZbVNt+psr8VzkaFtHU9a9FlMTi5OxhGWozkbkUrX0KvoWIoYzRj49Y1Jrwku0mk2cUIgeWbhsYlbyKTKcYgxRUTZAHO1zdmmnaSB2bDZAHOzOLBcERaeD5GOL1qqGjPrErnEUfyRVkha5K3ZarqcBI+tTSLGMP1ahigJQzlPPmFQhLbHB3oREbmVsUwChjvS406kPrrAwRRNqnO+SO2RYtu2SW9YlumWXV2DUnjeVGWqnCShx3fBgoHXLErEAXUo9EM7gpx1dL6BP7FW4KLrsUQnYh9qAUo9iD80/L0pEzj8VLLSaiBEuSpd2Q0JVupXJKkycH25F/6dIwi2bpg4PtXHsz14xSLfbmkPoPDKawbIFoF1YN2TxqyKp2zJDVJkMWD6VMND/aAfojMamHO5Esul8DlBxqxhuAF+C3DfZRUG5F/rpkGWWphnqb3iGi5u/t0PYRLO0yfVireQgf6eB++0+5BdIn9YTjdUC24PzXEhzmjw4bnIPDLGYXRbb/gB7Xia+pNyn12rOwUdfVSbmCajVpKj1x9amt+/zuw08/fXz/ukoiQ3ZYi02Vw5w9iEivZQFO2UXm9YFYm5htC5uY5H8j3TD+dMymVWSiGmA2rWXMtq+XEzFbC1pnZQyYbcBsR2C2l7azhB4OI+pl7xxEDzMBCsC+hA4RnaL3ieiUc0B0ynGITjGOQnTm+SG6e/hsR8COwc2aJk86R3Y7YhoDu/t/f/fATtXGY2VAdiVZ68hui67MBXIyikUp1oHj2oLs2JxiwxTqbkfbzCke0RWW/0F8WiiLUW8FQlXHmmFYk8Z5xYKnXHI4FaYeUE+LqcUjlc/KGGDqdwRTPy03EXKIlw9ccEPAJA6w30KakejKxU6MQ9sDn7OCFsI/wg4xMrAPwKpax6E3/Rj0pqgdANaT8dvH3z17iX27c+D2AYe+fQJw+/jLmQG3+vBC5IzaABglMqhGGFmNvBBe5DS8c8/dMnYhsz1iHtfyeoWQH0PG48TUYgtZsXQ8Xls17kJu25Q8fnv127Vq+0pqHt+sql7ILafh8aAXDYQMEoJG9XMWrQlVjHuFwoyZYY0svUn9HNO5o7Kgp4ln+bMo80DeoKQ8bDv3imNcjBpUzojYpbUxqjJSVd2StZGiqKqlGF9g1KSURiQhoGbcQ39AF8QEgkQouWZCK7Kv6sstmTQTntfzUGxIlJYgw9pCKytzRNJOrdApeFc0q/ITtVbdNd2Pya6tCGeMiaqa1tgSBi+0rVxSVtWc1igdZZW2m28X78BT8+2n729WRsNApkYpKduqVjmpKlmmqUuqpdYsJ2UlbMU0VZx6jmka1NqyQcurEEdRCBdtRzTJbhEED2x/UrI77NkheLW0w0di4z5DFz+Dtx7+Cl7aMdmYmxm4AfckEoI++LTEMYYedEhI5NBEu0MimZvVKsS2syz6Jwpd5EP9q++KONT1pr4ll8Rj2a5b4knpzTXxOtCde+JJbMtFJfxruSke5QGA1Fg1XHVkW5quwy9k51PpJFRGjgfz3cRdooXNTNxcGli1ny8oLW8tMel48qGkpuCf6d+S85UlBG92TMwddhCMN9l4t5tW4Io9xCCIOQ+UKBwFyLfvfOiTkTYdnzcgh8htFZBrUkQWuWQaim7qk4nFrKMO4XhNqceC8X7hcXuT20m0pVjyp3/dvRqR5T4dmePJZKKbvYVcRPjqKXIOC7+MqW4jEiLrnRgefXw4EiopO4iExKbv1EjodMPKyjiDSKjxxTpWwomX61hWf2YkdMJFO5Zl08t2bNvv4Qju2MuIXVzA2+pYy+FsEqM+YS+2kQNCGK2IfYYUWAZ2TDrreRuAHWcdhih4BBEFmDhw1wTx0b+uvQVyIfBRAAlpCc6HkLYL388T1a//5/Xizwk1Ob05cgkch+t5LNvE9jwZPeF7nviLyy9wb/qmL09LrviSVfBiX3eHLvnWSUgQ25m9SEha2SvqujvPSNSpP20hI0FNC3j35s0boOvxEmx5lbsVdBBZYVEMXhUK+DugNhHQdpebpzD1885TdBHKKupE/jKKsOfbMfUoxkixRophHGPVjvFmjeRfRkDb3oR3lZjSLd0YK5apTPtMTB2QehmT22K2wqidraguN20/W2G2d7e9BfPKyhiyFUO2YshWNNbrhWcrwC8wIlYWxRA44SaKh6REDyiOJ613JMfryJCa6BLd8WT0hPB44r/51ESpuzZSE/3XSpQvj+g0M5HWa70hqnncgFubegfiFZK7KlkpF/Sol7jUHMSk5iuad/lfcg6CTalPRtOpQZxHL5HpbjpdLPwyAtT2prq7shhlbFhT09T7mOOaUi9jclvMPmi1sw/V72RpP/vAmr72sw9NDSsrY8g+DNmHIfvQWK8Xnn0YaiXarJU4CdjxRPUL7ni9GBISXQI+noyeQB9P/DefkCh1d6G1Ej1lJMoiCPA5/SojcSHxEqb1E5zSiYipnbjcZMW0hXeEX2IEq1uqppCtofZSJlFT6mVEsO1NbtuZqMxGus8j9sba7qmGTkDFSNG0Pia+sfxTLtpXMvwOLtGrE0U3FKvG1ZGSsoN0iJFcc58mxnU607XRpP43mFi+1abboKZb05OX1hozXR0Z4/7SIUN4nTSWG2mlKv/R7dvAmgfQb+2HMMNMFPO8cu8hiZ2JEcIusMPQ3kQAP8EQePRyPXkCbYKLCPJ4XNK9RncssXrZhXoX5m8PAgscArhYIPpa2xhEyakOTA98CIrMvpk9BNz9+y+e/N59GK8jx/kxHrfLjfCS90G7iAR2cRLoZU/k7DRYXiEk3ymWNlU03VR1RZko2rhRONf/+bLRTzRX73wZtBSzlb0QH9botQ9r9L4utpqdXmzt7zNIw2HNbuPhsOYw67NBk9/LYc2ANc8Ja353yHI44ukyI8qT0VNWlCf+cgOA2jWnue4us+b0rGKCSznG2csFTvXJeR/j2OEzeuoi1083y5cHJ4XwOIgphtfMqazJ6niijhZrzxut3MXflrHvtZ/wl2AYEvixZ9nq2SnaNgkS56bCvGueed6Ajw+jyH6E81d2EOAYFH0E5TdhQDJWkMzF7CT9bUlsZDD3lPd9HHNwenEGZ2YJBpvqMl0EtyrBXvQGg97L6+5Y0f4B0cO52NHnYmbtczGz83MxazZWR4bZ3ldyWvCFrIzhXGw4F6Nkb+zQ24Abj+y97JsSeAFIJ+kHGglSXDNo8mfsQ6IeO0IR/X7jz+uAlpfGdDAognYEwS3ZjUN9aaeuhyeoT/fD68O3cKTF6c2Ri+nYaIDHtNeIgNeJE6OCLbYnRgb7vM4wOkg6eVqEwJuKb2HHnZ5DEkyc/RCR1enEDNY4x+RQmQTu6+XtLxH28GPyznbmQ8yXmhWyWvhY56XFsuOpKmdTNVrhANJPFZjj8eQYU38Momgkf4hqj4xqVUX8vdcyqk0ou41qVbLh9JFptvii8NP3MitjiGqHqJaSJfEsMVyv8cqm1x7T8w8YYhKtktD1Fsb2A/ZQ5NNA9pY8pef19BLlTbTxVzH2iRFwwOeJrIFb5JQffB0i2o4cEE9a706I15EBae8U6xLN/0V7K9T9eWJttQbWZtv56YeVy5o2n/9h5RYw+qf3+A58/PDmYsG4qZ35N027PaL1wq1Y1NBkS1anujac0NbKwTRV33BAKxzkpQW1qqXLdA3cKupUnfb3napMrC8QO4SwR4ewNa4ElJQdh7DqTJuOpnqLVwJO94CsjCGEHULYpJAPPpNlv8jeu5Acy5LpCOj+osHs78gN4AY8evgBxyGycbTxHBiSaNdNo11vE2Gp+mcS89IS9Q3wh9i2Oz/EE9KXL+LJ/xYiWU5vzvaUtruggNeHb/aQtpsAIenjcEbb4Rktd94u5Ii2Ttqo3SPa92iFXPAZRkSes+whH7T1G2WRTfHW8/L/lgKus0sbs/SP+Q//BxvQAv4zvAAA \ No newline at end of file