From fc80e8c7de7916c164e251a083d6b59ce9e99ded Mon Sep 17 00:00:00 2001
From: Enrico Ottonello
Date: Mon, 18 May 2020 19:51:29 +0200
Subject: [PATCH] added accumulator; last modified date of the record is added
 to saved data; lambda file is partitioned into 20 parts before starting
 downloading

---
 .../orcid/SparkOrcidGenerateAuthors.java      | 41 +++++++++++++--
 .../orcid/SparkPartitionLambdaFile.java       | 50 +++++++++++++++++++
 .../orcid/model/DownloadedRecordData.java     | 15 +++++-
 .../orcid_gen_authors/oozie_app/workflow.xml  | 25 ++++++++--
 4 files changed, 124 insertions(+), 7 deletions(-)
 create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPartitionLambdaFile.java

diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkOrcidGenerateAuthors.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkOrcidGenerateAuthors.java
index 6a41616953..4e18ab8401 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkOrcidGenerateAuthors.java
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkOrcidGenerateAuthors.java
@@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.List;
 import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
@@ -20,6 +21,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
+import org.apache.spark.util.LongAccumulator;
 import org.mortbay.log.Log;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,23 +63,53 @@ public class SparkOrcidGenerateAuthors {
             isSparkSessionManaged,
             spark -> {
                 JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-                JavaRDD<String> lamdaFileRDD = sc.textFile(workingPath + "last_modified.csv");
+
+                LongAccumulator parsedRecordsAcc = sc.sc().longAccumulator("parsedRecords");
+                LongAccumulator modifiedRecordsAcc = sc.sc().longAccumulator("modifiedRecords");
+                LongAccumulator downloadedRecordsAcc = sc.sc().longAccumulator("downloadedRecords");
+                LongAccumulator alreadyDownloadedRecords = sc.sc().longAccumulator("alreadyDownloadedRecords");
+                JavaRDD<String> lamdaFileRDD = sc.textFile(workingPath + "lamdafiles");
+
+                JavaRDD<String> downloadedRDD = sc.textFile(workingPath + "downloaded");
+                Function<String, String> getOrcidIdFunction = line -> {
+                    try {
+                        String[] values = line.split(",");
+                        return values[0].substring(1);
+                    } catch (Exception e) {
+                        return new String("");
+                    }
+                };
+                List<String> downloadedRecords = downloadedRDD.map(getOrcidIdFunction).collect();
+
                 Function<String, Boolean> isModifiedAfterFilter = line -> {
                     String[] values = line.split(",");
                     String orcidId = values[0];
+                    parsedRecordsAcc.add(1);
                     if (isModified(orcidId, values[3])) {
+                        modifiedRecordsAcc.add(1);
                         return true;
                     }
                     return false;
                 };
+                Function<String, Boolean> isNotDownloadedFilter = line -> {
+                    String[] values = line.split(",");
+                    String orcidId = values[0];
+                    if (downloadedRecords.contains(orcidId)) {
+                        alreadyDownloadedRecords.add(1);
+                        return false;
+                    }
+                    return true;
+                };
                 Function<String, Tuple2<String, String>> downloadRecordFunction = line -> {
                     String[] values = line.split(",");
                     String orcidId = values[0];
-                    return downloadRecord(orcidId, token);
+                    String modifiedDate = values[3];
+                    return downloadRecord(orcidId, modifiedDate, token, downloadedRecordsAcc);
                 };
 
                 lamdaFileRDD
                     .filter(isModifiedAfterFilter)
+                    .filter(isNotDownloadedFilter)
                     .map(downloadRecordFunction)
                     .rdd()
                     .saveAsTextFile(workingPath.concat(outputAuthorsPath));
@@ -101,9 +133,11 @@ public class SparkOrcidGenerateAuthors {
         return modifiedDateDt.after(lastUpdateDt);
     }
 
-    private static Tuple2<String, String> downloadRecord(String orcidId, String token) {
+    private static Tuple2<String, String> downloadRecord(String orcidId, String modifiedDate, String token,
+        LongAccumulator downloadedRecordsAcc) {
         final DownloadedRecordData data = new DownloadedRecordData();
         data.setOrcidId(orcidId);
+        data.setModifiedDate(modifiedDate);
         try (CloseableHttpClient client = HttpClients.createDefault()) {
             HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record");
             httpGet.addHeader("Accept", "application/vnd.orcid+xml");
@@ -117,6 +151,7 @@ public class SparkOrcidGenerateAuthors {
                         "Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode());
                 return data.toTuple2();
             }
+            downloadedRecordsAcc.add(1);
             data
                 .setCompressedData(
                     ArgumentApplicationParser.compressArgument(IOUtils.toString(response.getEntity().getContent())));
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPartitionLambdaFile.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPartitionLambdaFile.java
new file mode 100644
index 0000000000..ca6f0f6c4c
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPartitionLambdaFile.java
@@ -0,0 +1,50 @@
+
+package eu.dnetlib.doiboost.orcid;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class SparkPartitionLambdaFile {
+
+    public static void main(String[] args) throws IOException, Exception {
+        Logger logger = LoggerFactory.getLogger(SparkOrcidGenerateAuthors.class);
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    SparkOrcidGenerateAuthors.class
+                        .getResourceAsStream(
+                            "/eu/dnetlib/dhp/doiboost/gen_orcid_authors_parameters.json")));
+        parser.parseArgument(args);
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        final String workingPath = parser.get("workingPath");
+
+        SparkConf conf = new SparkConf();
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+                JavaRDD<String> lamdaFileRDD = sc.textFile(workingPath + "last_modified.csv");
+
+                lamdaFileRDD
+                    .repartition(20)
+                    .saveAsTextFile(workingPath.concat("lamdafiles"));
+            });
+    }
+
+}
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/DownloadedRecordData.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/DownloadedRecordData.java
index fdc28013e7..f66ef82a2f 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/DownloadedRecordData.java
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/model/DownloadedRecordData.java
@@ -12,6 +12,7 @@ import scala.Tuple2;
 public class DownloadedRecordData implements Serializable {
 
     private String orcidId;
+    private String modifiedDate;
     private String statusCode;
     private String compressedData;
     private String errorMessage;
@@ -19,6 +20,7 @@ public class DownloadedRecordData implements Serializable {
     public Tuple2<String, String> toTuple2() {
         JsonObject data = new JsonObject();
         data.addProperty("statusCode", getStatusCode());
+        data.addProperty("modifiedDate", getModifiedDate());
         if (getCompressedData() != null) {
             data.addProperty("compressedData", getCompressedData());
         }
@@ -45,7 +47,11 @@
     }
 
     public int getStatusCode() {
-        return Integer.parseInt(statusCode);
+        try {
+            return Integer.parseInt(statusCode);
+        } catch (Exception e) {
+            return -2;
+        }
     }
 
     public void setStatusCode(int statusCode) {
@@ -60,4 +66,11 @@
         this.compressedData = compressedData;
     }
 
+    public String getModifiedDate() {
+        return modifiedDate;
+    }
+
+    public void setModifiedDate(String modifiedDate) {
+        this.modifiedDate = modifiedDate;
+    }
 }
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml
index 479a970062..6ab2b89724 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml
@@ -37,14 +37,14 @@
-    <action name="Gen_Orcid_Authors">
+    <action name="Split_Lambda_File">
         <spark xmlns="uri:oozie:spark-action:0.2">
             <job-tracker>${jobTracker}</job-tracker>
             <name-node>${nameNode}</name-node>
             <master>yarn</master>
             <mode>cluster</mode>
-            <name>Gen_Orcid_Authors</name>
-            <class>eu.dnetlib.doiboost.orcid.SparkOrcidGenerateAuthors</class>
+            <name>Split_Lambda_File</name>
+            <class>eu.dnetlib.doiboost.orcid.SparkPartitionLambdaFile</class>
             <jar>dhp-doiboost-1.2.1-SNAPSHOT.jar</jar>
             <spark-opts>--num-executors 24 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory}
             </spark-opts>
@@ -56,5 +56,24 @@
     </action>
 
+    <action name="Gen_Orcid_Authors">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Gen_Orcid_Authors</name>
+            <class>eu.dnetlib.doiboost.orcid.SparkOrcidGenerateAuthors</class>
+            <jar>dhp-doiboost-1.2.1-SNAPSHOT.jar</jar>
+            <spark-opts>--num-executors 20 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory}
+            </spark-opts>
+            <arg>-w</arg><arg>${workingPath}/</arg>
+            <arg>-o</arg><arg>authors/</arg>
+            <arg>-t</arg><arg>${token}</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
     <end name="End"/>
 </workflow-app>
\ No newline at end of file
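
Not part of the patch above: a minimal, self-contained sketch of the Spark LongAccumulator pattern the commit introduces (counters registered on the driver, incremented inside the executor-side functions, read back on the driver after an action). The class name AccumulatorSketch, the local[*] master and the sample input lines are illustrative assumptions; only the sc.sc().longAccumulator(...) / add(1) / value() calls mirror what SparkOrcidGenerateAuthors does.

    package eu.dnetlib.doiboost.orcid;

    import java.util.Arrays;

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.util.LongAccumulator;

    // Hypothetical example class, not present in the repository.
    public class AccumulatorSketch {

        public static void main(String[] args) {
            SparkSession spark = SparkSession
                .builder()
                .appName("accumulator-sketch")
                .master("local[*]") // local master only for this sketch
                .getOrCreate();
            JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

            // Register the counter on the driver, as the patch does for parsedRecordsAcc etc.
            LongAccumulator parsedRecordsAcc = sc.sc().longAccumulator("parsedRecords");

            sc
                .parallelize(Arrays.asList(
                    "0000-0001-0002-0003,x,y,2020-05-18",
                    "0000-0004-0005-0006,x,y,2019-01-01"))
                .filter(line -> {
                    parsedRecordsAcc.add(1); // incremented on the executors for every line seen
                    return true;
                })
                .count(); // an action triggers the evaluation

            // Accumulator totals are only reliable on the driver after an action has completed.
            System.out.println("parsed records: " + parsedRecordsAcc.value());

            spark.stop();
        }
    }

The patched job follows the same lifecycle: the four counters are bumped inside the filter and download functions, and their totals become meaningful on the driver once saveAsTextFile has finished.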