From 465ce39f75f8664da2199da5db44c757cfeb6fbf Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Thu, 4 Feb 2021 10:44:04 +0100 Subject: [PATCH] job execution now based on file last_update.txt on hdfs --- .../orcid/SparkDownloadOrcidAuthors.java | 8 +- .../orcid/SparkDownloadOrcidWorks.java | 8 +- .../orcid/SparkGenLastModifiedSeq.java | 15 +- .../orcid/SparkUpdateOrcidAuthors.java | 4 +- .../doiboost/orcid/SparkUpdateOrcidWorks.java | 7 +- .../dnetlib/doiboost/orcid/util/HDFSUtil.java | 38 ++++ .../orcid_update/oozie_app/workflow.xml | 163 ------------------ 7 files changed, 72 insertions(+), 171 deletions(-) create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/util/HDFSUtil.java delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_update/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java index 71efdf28a..d480f1488 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.java @@ -28,13 +28,14 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.doiboost.orcid.model.DownloadedRecordData; +import eu.dnetlib.doiboost.orcid.util.HDFSUtil; import scala.Tuple2; public class SparkDownloadOrcidAuthors { static Logger logger = LoggerFactory.getLogger(SparkDownloadOrcidAuthors.class); static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; - static final String lastUpdate = "2020-11-18 00:00:05"; + static String lastUpdate; public static void main(String[] args) throws IOException, Exception { @@ -58,6 +59,8 @@ public class SparkDownloadOrcidAuthors { final String lambdaFileName = parser.get("lambdaFileName"); logger.info("lambdaFileName: ", lambdaFileName); + lastUpdate = HDFSUtil.readFromTextFile(workingPath.concat("last_update.txt")); + SparkConf conf = new SparkConf(); runWithSparkSession( conf, @@ -182,6 +185,9 @@ public class SparkDownloadOrcidAuthors { if (modifiedDate.length() != 19) { modifiedDate = modifiedDate.substring(0, 19); } + if (lastUpdate.length() != 19) { + lastUpdate = lastUpdate.substring(0, 19); + } modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate); lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate); } catch (Exception e) { diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidWorks.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidWorks.java index 871f2eaa7..51a378e06 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidWorks.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidWorks.java @@ -31,6 +31,7 @@ import com.google.gson.JsonParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.doiboost.orcid.model.DownloadedRecordData; +import eu.dnetlib.doiboost.orcid.util.HDFSUtil; import eu.dnetlib.doiboost.orcid.xml.XMLRecordParser; import scala.Tuple2; @@ -43,7 +44,7 @@ public class SparkDownloadOrcidWorks { public static final String ORCID_XML_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; public static final DateTimeFormatter ORCID_XML_DATETIMEFORMATTER = DateTimeFormatter .ofPattern(ORCID_XML_DATETIME_FORMAT); - public static final String lastUpdateValue = "2020-11-18 00:00:05"; + public static String lastUpdateValue; public static void main(String[] args) throws IOException, Exception { @@ -64,6 +65,11 @@ public class SparkDownloadOrcidWorks { final String outputPath = parser.get("outputPath"); final String token = parser.get("token"); + lastUpdateValue = HDFSUtil.readFromTextFile(workingPath.concat("last_update.txt")); + if (lastUpdateValue.length() != 19) { + lastUpdateValue = lastUpdateValue.substring(0, 19); + } + SparkConf conf = new SparkConf(); runWithSparkSession( conf, diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenLastModifiedSeq.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenLastModifiedSeq.java index f710635ab..003509f76 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenLastModifiedSeq.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenLastModifiedSeq.java @@ -3,9 +3,7 @@ package eu.dnetlib.doiboost.orcid; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; +import java.io.*; import java.net.URI; import java.util.Arrays; import java.util.List; @@ -17,6 +15,7 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; @@ -26,6 +25,7 @@ import org.apache.spark.SparkConf; import org.mortbay.log.Log; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.doiboost.orcid.util.HDFSUtil; public class SparkGenLastModifiedSeq { private static String hdfsServerUri; @@ -50,6 +50,9 @@ public class SparkGenLastModifiedSeq { outputPath = parser.get("outputPath"); lambdaFileName = parser.get("lambdaFileName"); String lambdaFileUri = hdfsServerUri.concat(workingPath).concat(lambdaFileName); + String lastModifiedDateFromLambdaFileUri = hdfsServerUri + .concat(workingPath) + .concat("last_modified_date_from_lambda_file.txt"); SparkConf sparkConf = new SparkConf(); runWithSparkSession( @@ -57,6 +60,7 @@ public class SparkGenLastModifiedSeq { isSparkSessionManaged, spark -> { int rowsNum = 0; + String lastModifiedAuthorDate = ""; Path output = new Path( hdfsServerUri .concat(workingPath) @@ -89,10 +93,15 @@ public class SparkGenLastModifiedSeq { final Text value = new Text(recordInfo.get(3)); writer.append(key, value); rowsNum++; + if (rowsNum == 2) { + lastModifiedAuthorDate = value.toString(); + } } + } } } + HDFSUtil.writeToTextFile(lastModifiedDateFromLambdaFileUri, lastModifiedAuthorDate); Log.info("Saved rows from lamda csv tar file: " + rowsNum); }); } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java index 6ed53b922..9d7fee053 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java @@ -95,7 +95,7 @@ public class SparkUpdateOrcidAuthors { authorSummary = XMLRecordParser .VTDParseAuthorSummary(xmlAuthor.getBytes()); authorSummary.setStatusCode(statusCode); - authorSummary.setDownloadDate("2020-12-15 00:00:01.000000"); + authorSummary.setDownloadDate(Long.toString(System.currentTimeMillis())); authorSummary.setBase64CompressData(compressedData); return authorSummary; } catch (Exception e) { @@ -105,7 +105,7 @@ public class SparkUpdateOrcidAuthors { } } else { authorSummary.setStatusCode(statusCode); - authorSummary.setDownloadDate("2020-12-15 00:00:01.000000"); + authorSummary.setDownloadDate(Long.toString(System.currentTimeMillis())); errorCodeAuthorsFoundAcc.add(1); } return authorSummary; diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidWorks.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidWorks.java index efdecb3b9..a1e092ff6 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidWorks.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidWorks.java @@ -27,6 +27,7 @@ import com.google.gson.JsonParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.orcid.Work; import eu.dnetlib.dhp.schema.orcid.WorkDetail; +import eu.dnetlib.doiboost.orcid.util.HDFSUtil; import eu.dnetlib.doiboost.orcidnodoi.xml.XMLRecordParserNoDoi; public class SparkUpdateOrcidWorks { @@ -83,7 +84,7 @@ public class SparkUpdateOrcidWorks { String statusCode = getJsonValue(jElement, "statusCode"); work.setStatusCode(statusCode); String downloadDate = getJsonValue(jElement, "lastModifiedDate"); - work.setDownloadDate("2020-12-15 00:00:01.000000"); + work.setDownloadDate(Long.toString(System.currentTimeMillis())); if (statusCode.equals("200")) { String compressedData = getJsonValue(jElement, "compressedData"); if (StringUtils.isEmpty(compressedData)) { @@ -165,6 +166,10 @@ public class SparkUpdateOrcidWorks { logger.info("errorLoadingJsonWorksFoundAcc: " + errorLoadingWorksJsonFoundAcc.value().toString()); logger.info("errorParsingXMLWorksFoundAcc: " + errorParsingWorksXMLFoundAcc.value().toString()); + String lastModifiedDateFromLambdaFile = HDFSUtil + .readFromTextFile(workingPath.concat("last_modified_date_from_lambda_file.txt")); + HDFSUtil.writeToTextFile(workingPath.concat("last_update.txt"), lastModifiedDateFromLambdaFile); + logger.info("last_update file updated"); }); } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/util/HDFSUtil.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/util/HDFSUtil.java new file mode 100644 index 000000000..41e39c047 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/util/HDFSUtil.java @@ -0,0 +1,38 @@ + +package eu.dnetlib.doiboost.orcid.util; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class HDFSUtil { + + public static String readFromTextFile(String path) throws IOException { + Configuration conf = new Configuration(); + FileSystem fileSystem = FileSystem.get(conf); + FSDataInputStream inputStream = new FSDataInputStream(fileSystem.open(new Path(path))); + return IOUtils.toString(inputStream, StandardCharsets.UTF_8.name()); + } + + public static void writeToTextFile(String pathValue, String text) throws IOException { + Configuration conf = new Configuration(); + FileSystem fileSystem = FileSystem.get(conf); + Path path = new Path(pathValue); + if (fileSystem.exists(path)) { + fileSystem.delete(path, true); + } + FSDataOutputStream os = fileSystem.create(path); + BufferedWriter br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8")); + br.write(text); + br.close(); + fileSystem.close(); + } +} diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_update/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_update/oozie_app/workflow.xml deleted file mode 100644 index 135e6a4c8..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_update/oozie_app/workflow.xml +++ /dev/null @@ -1,163 +0,0 @@ - - - - spark2MaxExecutors - 50 - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - oozieActionShareLibForSpark2 - oozie action sharelib for spark 2.* - - - spark2ExtraListeners - com.cloudera.spark.lineage.NavigatorAppListener - spark 2.* extra listeners classname - - - spark2SqlQueryExecutionListeners - com.cloudera.spark.lineage.NavigatorQueryListener - spark 2.* sql query execution listeners classname - - - spark2YarnHistoryServerAddress - spark 2.* yarn history server address - - - spark2EventLogDir - spark 2.* event log dir location - - - workingPath - the working dir base path - - - - - ${jobTracker} - ${nameNode} - - - oozie.action.sharelib.for.spark - ${oozieActionShareLibForSpark2} - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - - yarn-cluster - cluster - UpdateOrcidAuthors - eu.dnetlib.doiboost.orcid.SparkUpdateOrcidAuthors - dhp-doiboost-${projectVersion}.jar - - --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - - -w${workingPath}/ - -n${nameNode} - -f- - -o- - -t- - - - - - - - - yarn-cluster - cluster - UpdateOrcidWorks - eu.dnetlib.doiboost.orcid.SparkUpdateOrcidWorks - dhp-doiboost-${projectVersion}.jar - - --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - - -w${workingPath}/ - -n${nameNode} - -f- - -o- - -t- - - - - - - - - - - - - ${workingPath}/orcid_dataset/new_authors/* - ${workingPath}/orcid_dataset/authors - - - - - - - - - - - - ${workingPath}/orcid_dataset/new_works/* - ${workingPath}/orcid_dataset/works - - - - - - - - - - - - - - - - \ No newline at end of file