From b2de598c1ae22beaf85c4eb5e162cc1cf6b13601 Mon Sep 17 00:00:00 2001
From: Enrico Ottonello
Date: Tue, 15 Dec 2020 10:42:55 +0100
Subject: [PATCH] all actions from downloading the lambda file to merging
 updated data into one workflow

---
 .../doiboost/orcid/OrcidDownloader.java       | 208 ------------
 .../orcid/SparkDownloadOrcidAuthors.java      |   9 +-
 .../orcid/SparkDownloadOrcidWorks.java        |  17 +-
 .../orcid/SparkUpdateOrcidAuthors.java        |   8 +-
 .../orcid/SparkUpdateOrcidDatasets.java       | 317 ------------------
 .../doiboost/orcid/SparkUpdateOrcidWorks.java |   6 +-
 .../oozie_app/config-default.xml              |  22 --
 .../oozie_app/workflow.xml                    | 140 +++++++-
 .../doiboost/orcid/OrcidClientTest.java       |  68 ----
 9 files changed, 146 insertions(+), 649 deletions(-)
 delete mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java
 delete mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidDatasets.java
 delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/config-default.xml

diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java
deleted file mode 100644
index be727ab9fe..0000000000
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/OrcidDownloader.java
+++ /dev/null
@@ -1,208 +0,0 @@
-
-package eu.dnetlib.doiboost.orcid;
-
-import java.io.*;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.mortbay.log.Log;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-
-public class OrcidDownloader extends OrcidDSManager {
-
-    static final int REQ_LIMIT = 24;
-    static final int REQ_MAX_TEST = -1;
-    static final int RECORD_PARSED_COUNTER_LOG_INTERVAL = 500;
-    static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
-    static final String lastUpdate = "2020-09-29 00:00:00";
-    private String lambdaFileName;
-    private String outputPath;
-    private String token;
-
-    public static void main(String[] args) throws IOException, Exception {
-        OrcidDownloader orcidDownloader = new OrcidDownloader();
-        orcidDownloader.loadArgs(args);
-        orcidDownloader.parseLambdaFile();
-    }
-
-    private String downloadRecord(String orcidId) throws IOException {
-        try (CloseableHttpClient client = HttpClients.createDefault()) {
-            HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record");
-            httpGet.addHeader("Accept", "application/vnd.orcid+xml");
-            httpGet.addHeader("Authorization", String.format("Bearer %s", token));
-            CloseableHttpResponse response = client.execute(httpGet);
-            if
(response.getStatusLine().getStatusCode() != 200) { - Log - .info( - "Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode()); - return new String(""); - } -// return IOUtils.toString(response.getEntity().getContent()); - return xmlStreamToString(response.getEntity().getContent()); - } - } - - private String xmlStreamToString(InputStream xmlStream) throws IOException { - BufferedReader br = new BufferedReader(new InputStreamReader(xmlStream)); - String line; - StringBuffer buffer = new StringBuffer(); - while ((line = br.readLine()) != null) { - buffer.append(line); - } - return buffer.toString(); - } - - public void parseLambdaFile() throws Exception { - int parsedRecordsCounter = 0; - int downloadedRecordsCounter = 0; - int savedRecordsCounter = 0; - long startDownload = 0; - Configuration conf = initConfigurationObject(); - FileSystem fs = initFileSystemObject(conf); - String lambdaFileUri = hdfsServerUri.concat(workingPath).concat(lambdaFileName); - Path hdfsreadpath = new Path(lambdaFileUri); - FSDataInputStream lambdaFileStream = fs.open(hdfsreadpath); - Path hdfsoutputPath = new Path( - hdfsServerUri - .concat(workingPath) - .concat(outputPath) - .concat("updated_xml_authors.seq")); - try (TarArchiveInputStream tais = new TarArchiveInputStream( - new GzipCompressorInputStream(lambdaFileStream))) { - TarArchiveEntry entry = null; - StringBuilder sb = new StringBuilder(); - try (SequenceFile.Writer writer = SequenceFile - .createWriter( - conf, - SequenceFile.Writer.file(hdfsoutputPath), - SequenceFile.Writer.keyClass(Text.class), - SequenceFile.Writer.valueClass(Text.class), - SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()))) { - startDownload = System.currentTimeMillis(); - while ((entry = tais.getNextTarEntry()) != null) { - BufferedReader br = new BufferedReader(new InputStreamReader(tais)); // Read directly from tarInput - String line; - while ((line = br.readLine()) != null) { - String[] values = line.split(","); - List recordInfo = Arrays.asList(values); - int nReqTmp = 0; - long startReqTmp = System.currentTimeMillis(); - // skip headers line - if (parsedRecordsCounter == 0) { - parsedRecordsCounter++; - continue; - } - parsedRecordsCounter++; - String orcidId = recordInfo.get(0); - if (isModified(orcidId, recordInfo.get(3))) { - String record = downloadRecord(orcidId); - downloadedRecordsCounter++; - if (!record.isEmpty()) { -// String compressRecord = ArgumentApplicationParser.compressArgument(record); - final Text key = new Text(recordInfo.get(0)); - final Text value = new Text(record); - writer.append(key, value); - savedRecordsCounter++; - } - } else { - break; - } - long endReq = System.currentTimeMillis(); - nReqTmp++; - if (nReqTmp == REQ_LIMIT) { - long reqSessionDuration = endReq - startReqTmp; - if (reqSessionDuration <= 1000) { - Log - .info( - "\nreqSessionDuration: " - + reqSessionDuration - + " nReqTmp: " - + nReqTmp - + " wait ...."); - Thread.sleep(1000 - reqSessionDuration); - } else { - nReqTmp = 0; - startReqTmp = System.currentTimeMillis(); - } - } - if ((parsedRecordsCounter % RECORD_PARSED_COUNTER_LOG_INTERVAL) == 0) { - Log - .info( - "Current parsed: " - + parsedRecordsCounter - + " downloaded: " - + downloadedRecordsCounter - + " saved: " - + savedRecordsCounter); - if (REQ_MAX_TEST != -1 && parsedRecordsCounter > REQ_MAX_TEST) { - break; - } - } - } - long endDownload = System.currentTimeMillis(); - long downloadTime = endDownload - startDownload; - Log.info("Download time: " + 
((downloadTime / 1000) / 60) + " minutes"); - } - } - } - Log.info("Download started at: " + new Date(startDownload).toString()); - Log.info("Download ended at: " + new Date(System.currentTimeMillis()).toString()); - Log.info("Parsed Records Counter: " + parsedRecordsCounter); - Log.info("Downloaded Records Counter: " + downloadedRecordsCounter); - Log.info("Saved Records Counter: " + savedRecordsCounter); - } - - private void loadArgs(String[] args) throws IOException, Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - OrcidDownloader.class - .getResourceAsStream( - "/eu/dnetlib/dhp/doiboost/download_orcid_data.json"))); - parser.parseArgument(args); - - hdfsServerUri = parser.get("hdfsServerUri"); - Log.info("HDFS URI: " + hdfsServerUri); - workingPath = parser.get("workingPath"); - Log.info("Default Path: " + workingPath); - lambdaFileName = parser.get("lambdaFileName"); - Log.info("Lambda File Name: " + lambdaFileName); - outputPath = parser.get("outputPath"); - Log.info("Output Data: " + outputPath); - token = parser.get("token"); - } - - public boolean isModified(String orcidId, String modifiedDate) { - Date modifiedDateDt = null; - Date lastUpdateDt = null; - try { - if (modifiedDate.length() != 19) { - modifiedDate = modifiedDate.substring(0, 19); - } - modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate); - lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate); - } catch (Exception e) { - Log.info("[" + orcidId + "] Parsing date: ", e.getMessage()); - return true; - } - return modifiedDateDt.after(lastUpdateDt); - } -} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java index 598835a000..71efdf28a4 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java @@ -34,7 +34,7 @@ public class SparkDownloadOrcidAuthors { static Logger logger = LoggerFactory.getLogger(SparkDownloadOrcidAuthors.class); static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; - static final String lastUpdate = "2020-09-29 00:00:00"; + static final String lastUpdate = "2020-11-18 00:00:05"; public static void main(String[] args) throws IOException, Exception { @@ -69,6 +69,7 @@ public class SparkDownloadOrcidAuthors { LongAccumulator modifiedRecordsAcc = spark.sparkContext().longAccumulator("to_download_records"); LongAccumulator downloadedRecordsAcc = spark.sparkContext().longAccumulator("downloaded_records"); LongAccumulator errorHTTP403Acc = spark.sparkContext().longAccumulator("error_HTTP_403"); + LongAccumulator errorHTTP404Acc = spark.sparkContext().longAccumulator("error_HTTP_404"); LongAccumulator errorHTTP409Acc = spark.sparkContext().longAccumulator("error_HTTP_409"); LongAccumulator errorHTTP503Acc = spark.sparkContext().longAccumulator("error_HTTP_503"); LongAccumulator errorHTTP525Acc = spark.sparkContext().longAccumulator("error_HTTP_525"); @@ -113,6 +114,8 @@ public class SparkDownloadOrcidAuthors { switch (statusCode) { case 403: errorHTTP403Acc.add(1); + case 404: + errorHTTP404Acc.add(1); case 409: errorHTTP409Acc.add(1); case 503: @@ -149,7 +152,7 @@ public class SparkDownloadOrcidAuthors { logger.info("Authors modified count: " + authorsModifiedRDD.count()); logger.info("Start downloading 
..."); authorsModifiedRDD - .repartition(10) + .repartition(100) .map(downloadRecordFunction) .mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2()))) .saveAsNewAPIHadoopFile( @@ -158,10 +161,12 @@ public class SparkDownloadOrcidAuthors { Text.class, SequenceFileOutputFormat.class, sc.hadoopConfiguration()); + logger.info("parsedRecordsAcc: " + parsedRecordsAcc.value().toString()); logger.info("modifiedRecordsAcc: " + modifiedRecordsAcc.value().toString()); logger.info("downloadedRecordsAcc: " + downloadedRecordsAcc.value().toString()); logger.info("errorHTTP403Acc: " + errorHTTP403Acc.value().toString()); + logger.info("errorHTTP404Acc: " + errorHTTP404Acc.value().toString()); logger.info("errorHTTP409Acc: " + errorHTTP409Acc.value().toString()); logger.info("errorHTTP503Acc: " + errorHTTP503Acc.value().toString()); logger.info("errorHTTP525Acc: " + errorHTTP525Acc.value().toString()); diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidWorks.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidWorks.java index f67e7e0ec4..871f2eaa7c 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidWorks.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidWorks.java @@ -43,7 +43,7 @@ public class SparkDownloadOrcidWorks { public static final String ORCID_XML_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; public static final DateTimeFormatter ORCID_XML_DATETIMEFORMATTER = DateTimeFormatter .ofPattern(ORCID_XML_DATETIME_FORMAT); - public static final String lastUpdateValue = "2020-09-29 00:00:00"; + public static final String lastUpdateValue = "2020-11-18 00:00:05"; public static void main(String[] args) throws IOException, Exception { @@ -89,6 +89,7 @@ public class SparkDownloadOrcidWorks { .longAccumulator("error_parsing_xml_found"); LongAccumulator downloadedRecordsAcc = spark.sparkContext().longAccumulator("downloaded_records"); LongAccumulator errorHTTP403Acc = spark.sparkContext().longAccumulator("error_HTTP_403"); + LongAccumulator errorHTTP404Acc = spark.sparkContext().longAccumulator("error_HTTP_404"); LongAccumulator errorHTTP409Acc = spark.sparkContext().longAccumulator("error_HTTP_409"); LongAccumulator errorHTTP503Acc = spark.sparkContext().longAccumulator("error_HTTP_503"); LongAccumulator errorHTTP525Acc = spark.sparkContext().longAccumulator("error_HTTP_525"); @@ -163,6 +164,8 @@ public class SparkDownloadOrcidWorks { switch (statusCode) { case 403: errorHTTP403Acc.add(1); + case 404: + errorHTTP404Acc.add(1); case 409: errorHTTP409Acc.add(1); case 503: @@ -186,29 +189,19 @@ public class SparkDownloadOrcidWorks { .compressArgument(IOUtils.toString(response.getEntity().getContent()))); } catch (Throwable e) { logger.info("Downloading " + orcidId, e.getMessage()); - if (downloaded.getStatusCode() == 503) { - throw new RuntimeException("Orcid request rate limit reached (HTTP 503)"); - } downloaded.setErrorMessage(e.getMessage()); return downloaded.toTuple2(); } return downloaded.toTuple2(); }; -// sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true"); - updatedAuthorsRDD .flatMap(retrieveWorkUrlFunction) .repartition(100) .map(downloadWorkFunction) .mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2()))) .saveAsTextFile(workingPath.concat(outputPath), GzipCodec.class); -// .saveAsNewAPIHadoopFile( -// workingPath.concat(outputPath), -// Text.class, -// Text.class, -// 
SequenceFileOutputFormat.class, -// sc.hadoopConfiguration()); + logger.info("updatedAuthorsAcc: " + updatedAuthorsAcc.value().toString()); logger.info("parsedAuthorsAcc: " + parsedAuthorsAcc.value().toString()); logger.info("parsedWorksAcc: " + parsedWorksAcc.value().toString()); diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java index 4dbc403013..6ed53b9226 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java @@ -36,12 +36,12 @@ public class SparkUpdateOrcidAuthors { .setSerializationInclusion(JsonInclude.Include.NON_NULL); public static void main(String[] args) throws IOException, Exception { - Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidDatasets.class); + Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidAuthors.class); final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( - SparkUpdateOrcidDatasets.class + SparkUpdateOrcidAuthors.class .getResourceAsStream( "/eu/dnetlib/dhp/doiboost/download_orcid_data.json"))); parser.parseArgument(args); @@ -95,7 +95,7 @@ public class SparkUpdateOrcidAuthors { authorSummary = XMLRecordParser .VTDParseAuthorSummary(xmlAuthor.getBytes()); authorSummary.setStatusCode(statusCode); - authorSummary.setDownloadDate("2020-11-18 00:00:05.644768"); + authorSummary.setDownloadDate("2020-12-15 00:00:01.000000"); authorSummary.setBase64CompressData(compressedData); return authorSummary; } catch (Exception e) { @@ -105,7 +105,7 @@ public class SparkUpdateOrcidAuthors { } } else { authorSummary.setStatusCode(statusCode); - authorSummary.setDownloadDate("2020-11-18 00:00:05.644768"); + authorSummary.setDownloadDate("2020-12-15 00:00:01.000000"); errorCodeAuthorsFoundAcc.add(1); } return authorSummary; diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidDatasets.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidDatasets.java deleted file mode 100644 index 71c011ebcf..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidDatasets.java +++ /dev/null @@ -1,317 +0,0 @@ - -package eu.dnetlib.doiboost.orcid; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.IOException; -import java.util.Objects; -import java.util.Optional; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.util.LongAccumulator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.orcid.AuthorSummary; -import eu.dnetlib.dhp.schema.orcid.Work; -import eu.dnetlib.dhp.schema.orcid.WorkDetail; -import 
eu.dnetlib.doiboost.orcid.xml.XMLRecordParser; -import eu.dnetlib.doiboost.orcidnodoi.xml.XMLRecordParserNoDoi; -import scala.Tuple2; - -public class SparkUpdateOrcidDatasets { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() - .setSerializationInclusion(JsonInclude.Include.NON_NULL); - - public static void main(String[] args) throws IOException, Exception { - Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidDatasets.class); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkUpdateOrcidDatasets.class - .getResourceAsStream( - "/eu/dnetlib/dhp/doiboost/download_orcid_data.json"))); - parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - final String workingPath = parser.get("workingPath"); -// final String outputPath = parser.get("outputPath"); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - - LongAccumulator oldAuthorsFoundAcc = spark - .sparkContext() - .longAccumulator("old_authors_found"); - LongAccumulator updatedAuthorsFoundAcc = spark - .sparkContext() - .longAccumulator("updated_authors_found"); - LongAccumulator newAuthorsFoundAcc = spark - .sparkContext() - .longAccumulator("new_authors_found"); - LongAccumulator errorCodeAuthorsFoundAcc = spark - .sparkContext() - .longAccumulator("error_code_authors_found"); - LongAccumulator errorLoadingAuthorsJsonFoundAcc = spark - .sparkContext() - .longAccumulator("error_loading_authors_json_found"); - LongAccumulator errorParsingAuthorsXMLFoundAcc = spark - .sparkContext() - .longAccumulator("error_parsing_authors_xml_found"); - - LongAccumulator oldWorksFoundAcc = spark - .sparkContext() - .longAccumulator("old_works_found"); - LongAccumulator updatedWorksFoundAcc = spark - .sparkContext() - .longAccumulator("updated_works_found"); - LongAccumulator newWorksFoundAcc = spark - .sparkContext() - .longAccumulator("new_works_found"); - LongAccumulator errorCodeWorksFoundAcc = spark - .sparkContext() - .longAccumulator("error_code_works_found"); - LongAccumulator errorLoadingWorksJsonFoundAcc = spark - .sparkContext() - .longAccumulator("error_loading_works_json_found"); - LongAccumulator errorParsingWorksXMLFoundAcc = spark - .sparkContext() - .longAccumulator("error_parsing_works_xml_found"); - -// JavaPairRDD xmlSummariesRDD = sc -// .sequenceFile(workingPath.concat("xml/authors/xml_authors.seq"), Text.class, Text.class); -// xmlSummariesRDD -// .map(seq -> { -// AuthorSummary authorSummary = XMLRecordParser -// .VTDParseAuthorSummary(seq._2().toString().getBytes()); -// authorSummary -// .setBase64CompressData(ArgumentApplicationParser.compressArgument(seq._2().toString())); -// return authorSummary; -// }) -// .filter(authorSummary -> authorSummary != null) -// .map(authorSummary -> JsonWriter.create(authorSummary)) -// .saveAsTextFile(workingPath.concat("orcid_dataset/authors"), GzipCodec.class); -// -// JavaPairRDD xmlWorksRDD = sc -// .sequenceFile(workingPath.concat("xml/works/*"), Text.class, Text.class); -// -// xmlWorksRDD -// .map(seq -> { -// WorkDetail workDetail = XMLRecordParserNoDoi.VTDParseWorkData(seq._2().toString().getBytes()); -// Work work = new Work(); -// work.setWorkDetail(workDetail); -// 
work.setBase64CompressData(ArgumentApplicationParser.compressArgument(seq._2().toString())); -// return work; -// }) -// .filter(work -> work != null) -// .map(work -> JsonWriter.create(work)) -// .saveAsTextFile(workingPath.concat("orcid_dataset/works"), GzipCodec.class); - -// Function, AuthorSummary> retrieveAuthorSummaryFunction = data -> { -// AuthorSummary authorSummary = new AuthorSummary(); -// String orcidId = data._1().toString(); -// String jsonData = data._2().toString(); -// JsonElement jElement = new JsonParser().parse(jsonData); -// String statusCode = getJsonValue(jElement, "statusCode"); -// String downloadDate = getJsonValue(jElement, "lastModifiedDate"); -// if (statusCode.equals("200")) { -// String compressedData = getJsonValue(jElement, "compressedData"); -// if (StringUtils.isEmpty(compressedData)) { -// errorLoadingAuthorsJsonFoundAcc.add(1); -// } else { -// String xmlAuthor = ArgumentApplicationParser.decompressValue(compressedData); -// try { -// authorSummary = XMLRecordParser -// .VTDParseAuthorSummary(xmlAuthor.getBytes()); -// authorSummary.setStatusCode(statusCode); -// authorSummary.setDownloadDate("2020-11-18 00:00:05.644768"); -// authorSummary.setBase64CompressData(compressedData); -// return authorSummary; -// } catch (Exception e) { -// logger.error("parsing xml " + orcidId + " [" + jsonData + "]", e); -// errorParsingAuthorsXMLFoundAcc.add(1); -// } -// } -// } else { -// authorSummary.setStatusCode(statusCode); -// authorSummary.setDownloadDate("2020-11-18 00:00:05.644768"); -// errorCodeAuthorsFoundAcc.add(1); -// } -// return authorSummary; -// }; -// -// Dataset downloadedAuthorSummaryDS = spark -// .createDataset( -// sc -// .sequenceFile(workingPath + "downloads/updated_authors/*", Text.class, Text.class) -// .map(retrieveAuthorSummaryFunction) -// .rdd(), -// Encoders.bean(AuthorSummary.class)); -// Dataset currentAuthorSummaryDS = spark -// .createDataset( -// sc -// .textFile(workingPath.concat("orcid_dataset/authors/*")) -// .map(item -> OBJECT_MAPPER.readValue(item, AuthorSummary.class)) -// .rdd(), -// Encoders.bean(AuthorSummary.class)); -// currentAuthorSummaryDS -// .joinWith( -// downloadedAuthorSummaryDS, -// currentAuthorSummaryDS -// .col("authorData.oid") -// .equalTo(downloadedAuthorSummaryDS.col("authorData.oid")), -// "full_outer") -// .map(value -> { -// Optional opCurrent = Optional.ofNullable(value._1()); -// Optional opDownloaded = Optional.ofNullable(value._2()); -// if (!opCurrent.isPresent()) { -// newAuthorsFoundAcc.add(1); -// return opDownloaded.get(); -// } -// if (!opDownloaded.isPresent()) { -// oldAuthorsFoundAcc.add(1); -// return opCurrent.get(); -// } -// if (opCurrent.isPresent() && opDownloaded.isPresent()) { -// updatedAuthorsFoundAcc.add(1); -// return opDownloaded.get(); -// } -// return null; -// }, -// Encoders.bean(AuthorSummary.class)) -// .filter(Objects::nonNull) -// .toJavaRDD() -// .map(authorSummary -> OBJECT_MAPPER.writeValueAsString(authorSummary)) -// .saveAsTextFile(workingPath.concat("orcid_dataset/new_authors"), GzipCodec.class); -// -// logger.info("oldAuthorsFoundAcc: " + oldAuthorsFoundAcc.value().toString()); -// logger.info("newAuthorsFoundAcc: " + newAuthorsFoundAcc.value().toString()); -// logger.info("updatedAuthorsFoundAcc: " + updatedAuthorsFoundAcc.value().toString()); -// logger.info("errorCodeFoundAcc: " + errorCodeAuthorsFoundAcc.value().toString()); -// logger.info("errorLoadingJsonFoundAcc: " + errorLoadingAuthorsJsonFoundAcc.value().toString()); -// 
logger.info("errorParsingXMLFoundAcc: " + errorParsingAuthorsXMLFoundAcc.value().toString()); - - Function retrieveWorkFunction = jsonData -> { - Work work = new Work(); - JsonElement jElement = new JsonParser().parse(jsonData); - String statusCode = getJsonValue(jElement, "statusCode"); - work.setStatusCode(statusCode); - String downloadDate = getJsonValue(jElement, "lastModifiedDate"); - work.setDownloadDate("2020-11-18 00:00:05.644768"); - if (statusCode.equals("200")) { - String compressedData = getJsonValue(jElement, "compressedData"); - if (StringUtils.isEmpty(compressedData)) { - errorLoadingWorksJsonFoundAcc.add(1); - } else { - String xmlWork = ArgumentApplicationParser.decompressValue(compressedData); - try { - WorkDetail workDetail = XMLRecordParserNoDoi - .VTDParseWorkData(xmlWork.getBytes()); - work.setWorkDetail(workDetail); - work.setBase64CompressData(compressedData); - return work; - } catch (Exception e) { - logger.error("parsing xml [" + jsonData + "]", e); - errorParsingWorksXMLFoundAcc.add(1); - } - } - } else { - errorCodeWorksFoundAcc.add(1); - } - return work; - }; - - Dataset downloadedWorksDS = spark - .createDataset( - sc - .textFile(workingPath + "downloads/updated_works/*") - .map(s -> { - return s.substring(21, s.length() - 1); - }) - .map(retrieveWorkFunction) - .rdd(), - Encoders.bean(Work.class)); - Dataset currentWorksDS = spark - .createDataset( - sc - .textFile(workingPath.concat("orcid_dataset/works/*")) - .map(item -> OBJECT_MAPPER.readValue(item, Work.class)) - .rdd(), - Encoders.bean(Work.class)); - currentWorksDS - .joinWith( - downloadedWorksDS, - currentWorksDS - .col("workDetail.id") - .equalTo(downloadedWorksDS.col("workDetail.id")) - .and( - currentWorksDS - .col("workDetail.oid") - .equalTo(downloadedWorksDS.col("workDetail.oid"))), - "full_outer") - .map(value -> { - Optional opCurrent = Optional.ofNullable(value._1()); - Optional opDownloaded = Optional.ofNullable(value._2()); - if (!opCurrent.isPresent()) { - newWorksFoundAcc.add(1); - return opDownloaded.get(); - } - if (!opDownloaded.isPresent()) { - oldWorksFoundAcc.add(1); - return opCurrent.get(); - } - if (opCurrent.isPresent() && opDownloaded.isPresent()) { - updatedWorksFoundAcc.add(1); - return opDownloaded.get(); - } - return null; - }, - Encoders.bean(Work.class)) - .filter(Objects::nonNull) - .toJavaRDD() - .map(work -> OBJECT_MAPPER.writeValueAsString(work)) - .saveAsTextFile(workingPath.concat("orcid_dataset/new_works"), GzipCodec.class); - - logger.info("oldWorksFoundAcc: " + oldWorksFoundAcc.value().toString()); - logger.info("newWorksFoundAcc: " + newWorksFoundAcc.value().toString()); - logger.info("updatedWorksFoundAcc: " + updatedWorksFoundAcc.value().toString()); - logger.info("errorCodeWorksFoundAcc: " + errorCodeWorksFoundAcc.value().toString()); - logger.info("errorLoadingJsonWorksFoundAcc: " + errorLoadingWorksJsonFoundAcc.value().toString()); - logger.info("errorParsingXMLWorksFoundAcc: " + errorParsingWorksXMLFoundAcc.value().toString()); - - }); - } - - private static String getJsonValue(JsonElement jElement, String property) { - if (jElement.getAsJsonObject().has(property)) { - JsonElement name = null; - name = jElement.getAsJsonObject().get(property); - if (name != null && !name.isJsonNull()) { - return name.getAsString(); - } - } - return ""; - } -} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidWorks.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidWorks.java index 
d06aac98a2..efdecb3b96 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidWorks.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidWorks.java @@ -35,12 +35,12 @@ public class SparkUpdateOrcidWorks { .setSerializationInclusion(JsonInclude.Include.NON_NULL); public static void main(String[] args) throws IOException, Exception { - Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidDatasets.class); + Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidWorks.class); final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( - SparkUpdateOrcidDatasets.class + SparkUpdateOrcidWorks.class .getResourceAsStream( "/eu/dnetlib/dhp/doiboost/download_orcid_data.json"))); parser.parseArgument(args); @@ -83,7 +83,7 @@ public class SparkUpdateOrcidWorks { String statusCode = getJsonValue(jElement, "statusCode"); work.setStatusCode(statusCode); String downloadDate = getJsonValue(jElement, "lastModifiedDate"); - work.setDownloadDate("2020-11-18 00:00:05.644768"); + work.setDownloadDate("2020-12-15 00:00:01.000000"); if (statusCode.equals("200")) { String compressedData = getJsonValue(jElement, "compressedData"); if (StringUtils.isEmpty(compressedData)) { diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/config-default.xml deleted file mode 100644 index 5621415d97..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/config-default.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.action.sharelib.for.java - spark2 - - - oozie.launcher.mapreduce.user.classpath.first - true - - - oozie.launcher.mapreduce.map.java.opts - -Xmx4g - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml index a1537387ea..f9c5b9af5a 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml @@ -1,9 +1,25 @@ + + spark2UpdateStepMaxExecutors + 50 + workingPath the working dir base path + + oozie.action.sharelib.for.java + spark2 + + + oozie.launcher.mapreduce.user.classpath.first + true + + + oozie.launcher.mapreduce.map.java.opts + -Xmx4g + token access token @@ -30,7 +46,7 @@ number of cores used by single executor - spark2MaxExecutors + spark2DownloadingMaxExecutors 10 @@ -58,6 +74,8 @@ + ${jobTracker} + ${nameNode} oozie.action.sharelib.for.spark @@ -66,18 +84,16 @@ - - - + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - + + - - + @@ -92,7 +108,7 @@ ${shell_cmd} - + @@ -118,7 +134,16 @@ -olast_modified.seq -t- - + + + + + + + + + + @@ -131,7 +156,7 @@ dhp-doiboost-${projectVersion}.jar --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --conf spark.dynamicAllocation.maxExecutors=${spark2DownloadingMaxExecutors} --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} --conf 
spark.extraListeners=${spark2ExtraListeners} @@ -145,7 +170,7 @@ -odownloads/updated_authors -t${token} - + @@ -158,7 +183,7 @@ dhp-doiboost-${projectVersion}.jar --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --conf spark.dynamicAllocation.maxExecutors=${spark2DownloadingMaxExecutors} --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners=${spark2ExtraListeners} @@ -172,6 +197,95 @@ -odownloads/updated_works -t${token} + + + + + + + yarn-cluster + cluster + UpdateOrcidAuthors + eu.dnetlib.doiboost.orcid.SparkUpdateOrcidAuthors + dhp-doiboost-${projectVersion}.jar + + --conf spark.dynamicAllocation.enabled=true + --conf spark.dynamicAllocation.maxExecutors=${spark2UpdateStepMaxExecutors} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + + -w${workingPath}/ + -n${nameNode} + -f- + -o- + -t- + + + + + + + + yarn-cluster + cluster + UpdateOrcidWorks + eu.dnetlib.doiboost.orcid.SparkUpdateOrcidWorks + dhp-doiboost-${projectVersion}.jar + + --conf spark.dynamicAllocation.enabled=true + --conf spark.dynamicAllocation.maxExecutors=${spark2UpdateStepMaxExecutors} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + + -w${workingPath}/ + -n${nameNode} + -f- + -o- + -t- + + + + + + + + + + + + ${workingPath}/orcid_dataset/new_authors/* + ${workingPath}/orcid_dataset/authors + + + + + + + + + + + + ${workingPath}/orcid_dataset/new_works/* + ${workingPath}/orcid_dataset/works + + + + + + + + + + diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java index dac60b1986..e25eb906cd 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java @@ -51,43 +51,6 @@ public class OrcidClientTest { // -H 'Authorization: Bearer 78fdb232-7105-4086-8570-e153f4198e3d' // 'https://api.orcid.org/v3.0/0000-0001-7291-3210/record' - @Test - private void multipleDownloadTest() throws Exception { - int toDownload = 10; - long start = System.currentTimeMillis(); - OrcidDownloader downloader = new OrcidDownloader(); - TarArchiveInputStream input = new TarArchiveInputStream( - new GzipCompressorInputStream(new FileInputStream("/tmp/last_modified.csv.tar"))); - TarArchiveEntry entry = input.getNextTarEntry(); - BufferedReader br = null; - StringBuilder sb = new StringBuilder(); - int rowNum = 0; - int entryNum = 0; - int modified = 0; - while (entry != null) { - br = new BufferedReader(new InputStreamReader(input)); // Read directly from tarInput - String line; - while ((line = br.readLine()) != null) { - String[] values = line.toString().split(","); - List recordInfo = Arrays.asList(values); - String orcidId = recordInfo.get(0); - if 
(downloader.isModified(orcidId, recordInfo.get(3))) { - slowedDownDownload(orcidId); - modified++; - } - rowNum++; - if (modified > toDownload) { - break; - } - } - entryNum++; - entry = input.getNextTarEntry(); - } - long end = System.currentTimeMillis(); - logToFile("start test: " + new Date(start).toString()); - logToFile("end test: " + new Date(end).toString()); - } - @Test private void downloadTest(String orcid) throws Exception { String record = testDownloadRecord(orcid, REQUEST_TYPE_RECORD); @@ -228,37 +191,6 @@ public class OrcidClientTest { } } - @Test - private void lambdaFileCounterTest() throws Exception { - final String lastUpdate = "2020-09-29 00:00:00"; - OrcidDownloader downloader = new OrcidDownloader(); - TarArchiveInputStream input = new TarArchiveInputStream( - new GzipCompressorInputStream(new FileInputStream("/tmp/last_modified.csv.tar"))); - TarArchiveEntry entry = input.getNextTarEntry(); - BufferedReader br = null; - StringBuilder sb = new StringBuilder(); - int rowNum = 0; - int entryNum = 0; - int modified = 0; - while (entry != null) { - br = new BufferedReader(new InputStreamReader(input)); // Read directly from tarInput - String line; - while ((line = br.readLine()) != null) { - String[] values = line.toString().split(","); - List recordInfo = Arrays.asList(values); - String orcidId = recordInfo.get(0); - if (downloader.isModified(orcidId, recordInfo.get(3))) { - modified++; - } - rowNum++; - } - entryNum++; - entry = input.getNextTarEntry(); - } - logToFile("rowNum: " + rowNum); - logToFile("modified: " + modified); - } - public static void logToFile(String log) throws IOException { log = log.concat("\n");
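Editor's note (not part of the patch): the hunks above add a new error_HTTP_404 LongAccumulator to SparkDownloadOrcidAuthors and SparkDownloadOrcidWorks, but in the visible context no break appears between the case labels, so a 403 would also increment the 404/409/503/525 counters. The following is an illustrative, self-contained sketch of per-status-code counting with explicit breaks, using the accumulator names from the diff; the class name, the generic-error accumulator, and the local master URL are invented for the example.

import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

public class StatusCodeCountersSketch {
    public static void main(String[] args) {
        // Local session only for demonstration; the workflow runs on yarn-cluster.
        SparkSession spark = SparkSession.builder().master("local[*]").appName("status-code-sketch").getOrCreate();
        LongAccumulator errorHTTP403Acc = spark.sparkContext().longAccumulator("error_HTTP_403");
        LongAccumulator errorHTTP404Acc = spark.sparkContext().longAccumulator("error_HTTP_404");
        LongAccumulator errorHTTP409Acc = spark.sparkContext().longAccumulator("error_HTTP_409");
        LongAccumulator errorHTTP503Acc = spark.sparkContext().longAccumulator("error_HTTP_503");
        LongAccumulator errorHTTP525Acc = spark.sparkContext().longAccumulator("error_HTTP_525");
        LongAccumulator errorHTTPGenericAcc = spark.sparkContext().longAccumulator("error_HTTP_generic");

        int statusCode = 404; // e.g. taken from the ORCID API response
        switch (statusCode) {
            case 403: errorHTTP403Acc.add(1); break;
            case 404: errorHTTP404Acc.add(1); break;
            case 409: errorHTTP409Acc.add(1); break;
            case 503: errorHTTP503Acc.add(1); break;
            case 525: errorHTTP525Acc.add(1); break;
            default:  errorHTTPGenericAcc.add(1); break;
        }
        spark.stop();
    }
}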