all actions from download lambda file to merge updated data into one wf

This commit is contained in:
Enrico Ottonello 2020-12-15 10:42:55 +01:00
parent efe4c2a9c5
commit b2de598c1a
9 changed files with 146 additions and 649 deletions

View File

@ -1,208 +0,0 @@
package eu.dnetlib.doiboost.orcid;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.mortbay.log.Log;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class OrcidDownloader extends OrcidDSManager {
static final int REQ_LIMIT = 24;
static final int REQ_MAX_TEST = -1;
static final int RECORD_PARSED_COUNTER_LOG_INTERVAL = 500;
static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
static final String lastUpdate = "2020-09-29 00:00:00";
private String lambdaFileName;
private String outputPath;
private String token;
public static void main(String[] args) throws IOException, Exception {
OrcidDownloader orcidDownloader = new OrcidDownloader();
orcidDownloader.loadArgs(args);
orcidDownloader.parseLambdaFile();
}
private String downloadRecord(String orcidId) throws IOException {
try (CloseableHttpClient client = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record");
httpGet.addHeader("Accept", "application/vnd.orcid+xml");
httpGet.addHeader("Authorization", String.format("Bearer %s", token));
CloseableHttpResponse response = client.execute(httpGet);
if (response.getStatusLine().getStatusCode() != 200) {
Log
.info(
"Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode());
return new String("");
}
// return IOUtils.toString(response.getEntity().getContent());
return xmlStreamToString(response.getEntity().getContent());
}
}
private String xmlStreamToString(InputStream xmlStream) throws IOException {
BufferedReader br = new BufferedReader(new InputStreamReader(xmlStream));
String line;
StringBuffer buffer = new StringBuffer();
while ((line = br.readLine()) != null) {
buffer.append(line);
}
return buffer.toString();
}
public void parseLambdaFile() throws Exception {
int parsedRecordsCounter = 0;
int downloadedRecordsCounter = 0;
int savedRecordsCounter = 0;
long startDownload = 0;
Configuration conf = initConfigurationObject();
FileSystem fs = initFileSystemObject(conf);
String lambdaFileUri = hdfsServerUri.concat(workingPath).concat(lambdaFileName);
Path hdfsreadpath = new Path(lambdaFileUri);
FSDataInputStream lambdaFileStream = fs.open(hdfsreadpath);
Path hdfsoutputPath = new Path(
hdfsServerUri
.concat(workingPath)
.concat(outputPath)
.concat("updated_xml_authors.seq"));
try (TarArchiveInputStream tais = new TarArchiveInputStream(
new GzipCompressorInputStream(lambdaFileStream))) {
TarArchiveEntry entry = null;
StringBuilder sb = new StringBuilder();
try (SequenceFile.Writer writer = SequenceFile
.createWriter(
conf,
SequenceFile.Writer.file(hdfsoutputPath),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(Text.class),
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()))) {
startDownload = System.currentTimeMillis();
while ((entry = tais.getNextTarEntry()) != null) {
BufferedReader br = new BufferedReader(new InputStreamReader(tais)); // Read directly from tarInput
String line;
while ((line = br.readLine()) != null) {
String[] values = line.split(",");
List<String> recordInfo = Arrays.asList(values);
int nReqTmp = 0;
long startReqTmp = System.currentTimeMillis();
// skip headers line
if (parsedRecordsCounter == 0) {
parsedRecordsCounter++;
continue;
}
parsedRecordsCounter++;
String orcidId = recordInfo.get(0);
if (isModified(orcidId, recordInfo.get(3))) {
String record = downloadRecord(orcidId);
downloadedRecordsCounter++;
if (!record.isEmpty()) {
// String compressRecord = ArgumentApplicationParser.compressArgument(record);
final Text key = new Text(recordInfo.get(0));
final Text value = new Text(record);
writer.append(key, value);
savedRecordsCounter++;
}
} else {
break;
}
long endReq = System.currentTimeMillis();
nReqTmp++;
if (nReqTmp == REQ_LIMIT) {
long reqSessionDuration = endReq - startReqTmp;
if (reqSessionDuration <= 1000) {
Log
.info(
"\nreqSessionDuration: "
+ reqSessionDuration
+ " nReqTmp: "
+ nReqTmp
+ " wait ....");
Thread.sleep(1000 - reqSessionDuration);
} else {
nReqTmp = 0;
startReqTmp = System.currentTimeMillis();
}
}
if ((parsedRecordsCounter % RECORD_PARSED_COUNTER_LOG_INTERVAL) == 0) {
Log
.info(
"Current parsed: "
+ parsedRecordsCounter
+ " downloaded: "
+ downloadedRecordsCounter
+ " saved: "
+ savedRecordsCounter);
if (REQ_MAX_TEST != -1 && parsedRecordsCounter > REQ_MAX_TEST) {
break;
}
}
}
long endDownload = System.currentTimeMillis();
long downloadTime = endDownload - startDownload;
Log.info("Download time: " + ((downloadTime / 1000) / 60) + " minutes");
}
}
}
Log.info("Download started at: " + new Date(startDownload).toString());
Log.info("Download ended at: " + new Date(System.currentTimeMillis()).toString());
Log.info("Parsed Records Counter: " + parsedRecordsCounter);
Log.info("Downloaded Records Counter: " + downloadedRecordsCounter);
Log.info("Saved Records Counter: " + savedRecordsCounter);
}
private void loadArgs(String[] args) throws IOException, Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
OrcidDownloader.class
.getResourceAsStream(
"/eu/dnetlib/dhp/doiboost/download_orcid_data.json")));
parser.parseArgument(args);
hdfsServerUri = parser.get("hdfsServerUri");
Log.info("HDFS URI: " + hdfsServerUri);
workingPath = parser.get("workingPath");
Log.info("Default Path: " + workingPath);
lambdaFileName = parser.get("lambdaFileName");
Log.info("Lambda File Name: " + lambdaFileName);
outputPath = parser.get("outputPath");
Log.info("Output Data: " + outputPath);
token = parser.get("token");
}
public boolean isModified(String orcidId, String modifiedDate) {
Date modifiedDateDt = null;
Date lastUpdateDt = null;
try {
if (modifiedDate.length() != 19) {
modifiedDate = modifiedDate.substring(0, 19);
}
modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate);
lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate);
} catch (Exception e) {
Log.info("[" + orcidId + "] Parsing date: ", e.getMessage());
return true;
}
return modifiedDateDt.after(lastUpdateDt);
}
}

View File

@ -34,7 +34,7 @@ public class SparkDownloadOrcidAuthors {
static Logger logger = LoggerFactory.getLogger(SparkDownloadOrcidAuthors.class);
static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
static final String lastUpdate = "2020-09-29 00:00:00";
static final String lastUpdate = "2020-11-18 00:00:05";
public static void main(String[] args) throws IOException, Exception {
@ -69,6 +69,7 @@ public class SparkDownloadOrcidAuthors {
LongAccumulator modifiedRecordsAcc = spark.sparkContext().longAccumulator("to_download_records");
LongAccumulator downloadedRecordsAcc = spark.sparkContext().longAccumulator("downloaded_records");
LongAccumulator errorHTTP403Acc = spark.sparkContext().longAccumulator("error_HTTP_403");
LongAccumulator errorHTTP404Acc = spark.sparkContext().longAccumulator("error_HTTP_404");
LongAccumulator errorHTTP409Acc = spark.sparkContext().longAccumulator("error_HTTP_409");
LongAccumulator errorHTTP503Acc = spark.sparkContext().longAccumulator("error_HTTP_503");
LongAccumulator errorHTTP525Acc = spark.sparkContext().longAccumulator("error_HTTP_525");
@ -113,6 +114,8 @@ public class SparkDownloadOrcidAuthors {
switch (statusCode) {
case 403:
errorHTTP403Acc.add(1);
case 404:
errorHTTP404Acc.add(1);
case 409:
errorHTTP409Acc.add(1);
case 503:
@ -149,7 +152,7 @@ public class SparkDownloadOrcidAuthors {
logger.info("Authors modified count: " + authorsModifiedRDD.count());
logger.info("Start downloading ...");
authorsModifiedRDD
.repartition(10)
.repartition(100)
.map(downloadRecordFunction)
.mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2())))
.saveAsNewAPIHadoopFile(
@ -158,10 +161,12 @@ public class SparkDownloadOrcidAuthors {
Text.class,
SequenceFileOutputFormat.class,
sc.hadoopConfiguration());
logger.info("parsedRecordsAcc: " + parsedRecordsAcc.value().toString());
logger.info("modifiedRecordsAcc: " + modifiedRecordsAcc.value().toString());
logger.info("downloadedRecordsAcc: " + downloadedRecordsAcc.value().toString());
logger.info("errorHTTP403Acc: " + errorHTTP403Acc.value().toString());
logger.info("errorHTTP404Acc: " + errorHTTP404Acc.value().toString());
logger.info("errorHTTP409Acc: " + errorHTTP409Acc.value().toString());
logger.info("errorHTTP503Acc: " + errorHTTP503Acc.value().toString());
logger.info("errorHTTP525Acc: " + errorHTTP525Acc.value().toString());

View File

@ -43,7 +43,7 @@ public class SparkDownloadOrcidWorks {
public static final String ORCID_XML_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
public static final DateTimeFormatter ORCID_XML_DATETIMEFORMATTER = DateTimeFormatter
.ofPattern(ORCID_XML_DATETIME_FORMAT);
public static final String lastUpdateValue = "2020-09-29 00:00:00";
public static final String lastUpdateValue = "2020-11-18 00:00:05";
public static void main(String[] args) throws IOException, Exception {
@ -89,6 +89,7 @@ public class SparkDownloadOrcidWorks {
.longAccumulator("error_parsing_xml_found");
LongAccumulator downloadedRecordsAcc = spark.sparkContext().longAccumulator("downloaded_records");
LongAccumulator errorHTTP403Acc = spark.sparkContext().longAccumulator("error_HTTP_403");
LongAccumulator errorHTTP404Acc = spark.sparkContext().longAccumulator("error_HTTP_404");
LongAccumulator errorHTTP409Acc = spark.sparkContext().longAccumulator("error_HTTP_409");
LongAccumulator errorHTTP503Acc = spark.sparkContext().longAccumulator("error_HTTP_503");
LongAccumulator errorHTTP525Acc = spark.sparkContext().longAccumulator("error_HTTP_525");
@ -163,6 +164,8 @@ public class SparkDownloadOrcidWorks {
switch (statusCode) {
case 403:
errorHTTP403Acc.add(1);
case 404:
errorHTTP404Acc.add(1);
case 409:
errorHTTP409Acc.add(1);
case 503:
@ -186,29 +189,19 @@ public class SparkDownloadOrcidWorks {
.compressArgument(IOUtils.toString(response.getEntity().getContent())));
} catch (Throwable e) {
logger.info("Downloading " + orcidId, e.getMessage());
if (downloaded.getStatusCode() == 503) {
throw new RuntimeException("Orcid request rate limit reached (HTTP 503)");
}
downloaded.setErrorMessage(e.getMessage());
return downloaded.toTuple2();
}
return downloaded.toTuple2();
};
// sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
updatedAuthorsRDD
.flatMap(retrieveWorkUrlFunction)
.repartition(100)
.map(downloadWorkFunction)
.mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2())))
.saveAsTextFile(workingPath.concat(outputPath), GzipCodec.class);
// .saveAsNewAPIHadoopFile(
// workingPath.concat(outputPath),
// Text.class,
// Text.class,
// SequenceFileOutputFormat.class,
// sc.hadoopConfiguration());
logger.info("updatedAuthorsAcc: " + updatedAuthorsAcc.value().toString());
logger.info("parsedAuthorsAcc: " + parsedAuthorsAcc.value().toString());
logger.info("parsedWorksAcc: " + parsedWorksAcc.value().toString());

View File

@ -36,12 +36,12 @@ public class SparkUpdateOrcidAuthors {
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
public static void main(String[] args) throws IOException, Exception {
Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidDatasets.class);
Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidAuthors.class);
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkUpdateOrcidDatasets.class
SparkUpdateOrcidAuthors.class
.getResourceAsStream(
"/eu/dnetlib/dhp/doiboost/download_orcid_data.json")));
parser.parseArgument(args);
@ -95,7 +95,7 @@ public class SparkUpdateOrcidAuthors {
authorSummary = XMLRecordParser
.VTDParseAuthorSummary(xmlAuthor.getBytes());
authorSummary.setStatusCode(statusCode);
authorSummary.setDownloadDate("2020-11-18 00:00:05.644768");
authorSummary.setDownloadDate("2020-12-15 00:00:01.000000");
authorSummary.setBase64CompressData(compressedData);
return authorSummary;
} catch (Exception e) {
@ -105,7 +105,7 @@ public class SparkUpdateOrcidAuthors {
}
} else {
authorSummary.setStatusCode(statusCode);
authorSummary.setDownloadDate("2020-11-18 00:00:05.644768");
authorSummary.setDownloadDate("2020-12-15 00:00:01.000000");
errorCodeAuthorsFoundAcc.add(1);
}
return authorSummary;

View File

@ -1,317 +0,0 @@
package eu.dnetlib.doiboost.orcid;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.orcid.AuthorSummary;
import eu.dnetlib.dhp.schema.orcid.Work;
import eu.dnetlib.dhp.schema.orcid.WorkDetail;
import eu.dnetlib.doiboost.orcid.xml.XMLRecordParser;
import eu.dnetlib.doiboost.orcidnodoi.xml.XMLRecordParserNoDoi;
import scala.Tuple2;
public class SparkUpdateOrcidDatasets {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
public static void main(String[] args) throws IOException, Exception {
Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidDatasets.class);
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkUpdateOrcidDatasets.class
.getResourceAsStream(
"/eu/dnetlib/dhp/doiboost/download_orcid_data.json")));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
final String workingPath = parser.get("workingPath");
// final String outputPath = parser.get("outputPath");
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
LongAccumulator oldAuthorsFoundAcc = spark
.sparkContext()
.longAccumulator("old_authors_found");
LongAccumulator updatedAuthorsFoundAcc = spark
.sparkContext()
.longAccumulator("updated_authors_found");
LongAccumulator newAuthorsFoundAcc = spark
.sparkContext()
.longAccumulator("new_authors_found");
LongAccumulator errorCodeAuthorsFoundAcc = spark
.sparkContext()
.longAccumulator("error_code_authors_found");
LongAccumulator errorLoadingAuthorsJsonFoundAcc = spark
.sparkContext()
.longAccumulator("error_loading_authors_json_found");
LongAccumulator errorParsingAuthorsXMLFoundAcc = spark
.sparkContext()
.longAccumulator("error_parsing_authors_xml_found");
LongAccumulator oldWorksFoundAcc = spark
.sparkContext()
.longAccumulator("old_works_found");
LongAccumulator updatedWorksFoundAcc = spark
.sparkContext()
.longAccumulator("updated_works_found");
LongAccumulator newWorksFoundAcc = spark
.sparkContext()
.longAccumulator("new_works_found");
LongAccumulator errorCodeWorksFoundAcc = spark
.sparkContext()
.longAccumulator("error_code_works_found");
LongAccumulator errorLoadingWorksJsonFoundAcc = spark
.sparkContext()
.longAccumulator("error_loading_works_json_found");
LongAccumulator errorParsingWorksXMLFoundAcc = spark
.sparkContext()
.longAccumulator("error_parsing_works_xml_found");
// JavaPairRDD<Text, Text> xmlSummariesRDD = sc
// .sequenceFile(workingPath.concat("xml/authors/xml_authors.seq"), Text.class, Text.class);
// xmlSummariesRDD
// .map(seq -> {
// AuthorSummary authorSummary = XMLRecordParser
// .VTDParseAuthorSummary(seq._2().toString().getBytes());
// authorSummary
// .setBase64CompressData(ArgumentApplicationParser.compressArgument(seq._2().toString()));
// return authorSummary;
// })
// .filter(authorSummary -> authorSummary != null)
// .map(authorSummary -> JsonWriter.create(authorSummary))
// .saveAsTextFile(workingPath.concat("orcid_dataset/authors"), GzipCodec.class);
//
// JavaPairRDD<Text, Text> xmlWorksRDD = sc
// .sequenceFile(workingPath.concat("xml/works/*"), Text.class, Text.class);
//
// xmlWorksRDD
// .map(seq -> {
// WorkDetail workDetail = XMLRecordParserNoDoi.VTDParseWorkData(seq._2().toString().getBytes());
// Work work = new Work();
// work.setWorkDetail(workDetail);
// work.setBase64CompressData(ArgumentApplicationParser.compressArgument(seq._2().toString()));
// return work;
// })
// .filter(work -> work != null)
// .map(work -> JsonWriter.create(work))
// .saveAsTextFile(workingPath.concat("orcid_dataset/works"), GzipCodec.class);
// Function<Tuple2<Text, Text>, AuthorSummary> retrieveAuthorSummaryFunction = data -> {
// AuthorSummary authorSummary = new AuthorSummary();
// String orcidId = data._1().toString();
// String jsonData = data._2().toString();
// JsonElement jElement = new JsonParser().parse(jsonData);
// String statusCode = getJsonValue(jElement, "statusCode");
// String downloadDate = getJsonValue(jElement, "lastModifiedDate");
// if (statusCode.equals("200")) {
// String compressedData = getJsonValue(jElement, "compressedData");
// if (StringUtils.isEmpty(compressedData)) {
// errorLoadingAuthorsJsonFoundAcc.add(1);
// } else {
// String xmlAuthor = ArgumentApplicationParser.decompressValue(compressedData);
// try {
// authorSummary = XMLRecordParser
// .VTDParseAuthorSummary(xmlAuthor.getBytes());
// authorSummary.setStatusCode(statusCode);
// authorSummary.setDownloadDate("2020-11-18 00:00:05.644768");
// authorSummary.setBase64CompressData(compressedData);
// return authorSummary;
// } catch (Exception e) {
// logger.error("parsing xml " + orcidId + " [" + jsonData + "]", e);
// errorParsingAuthorsXMLFoundAcc.add(1);
// }
// }
// } else {
// authorSummary.setStatusCode(statusCode);
// authorSummary.setDownloadDate("2020-11-18 00:00:05.644768");
// errorCodeAuthorsFoundAcc.add(1);
// }
// return authorSummary;
// };
//
// Dataset<AuthorSummary> downloadedAuthorSummaryDS = spark
// .createDataset(
// sc
// .sequenceFile(workingPath + "downloads/updated_authors/*", Text.class, Text.class)
// .map(retrieveAuthorSummaryFunction)
// .rdd(),
// Encoders.bean(AuthorSummary.class));
// Dataset<AuthorSummary> currentAuthorSummaryDS = spark
// .createDataset(
// sc
// .textFile(workingPath.concat("orcid_dataset/authors/*"))
// .map(item -> OBJECT_MAPPER.readValue(item, AuthorSummary.class))
// .rdd(),
// Encoders.bean(AuthorSummary.class));
// currentAuthorSummaryDS
// .joinWith(
// downloadedAuthorSummaryDS,
// currentAuthorSummaryDS
// .col("authorData.oid")
// .equalTo(downloadedAuthorSummaryDS.col("authorData.oid")),
// "full_outer")
// .map(value -> {
// Optional<AuthorSummary> opCurrent = Optional.ofNullable(value._1());
// Optional<AuthorSummary> opDownloaded = Optional.ofNullable(value._2());
// if (!opCurrent.isPresent()) {
// newAuthorsFoundAcc.add(1);
// return opDownloaded.get();
// }
// if (!opDownloaded.isPresent()) {
// oldAuthorsFoundAcc.add(1);
// return opCurrent.get();
// }
// if (opCurrent.isPresent() && opDownloaded.isPresent()) {
// updatedAuthorsFoundAcc.add(1);
// return opDownloaded.get();
// }
// return null;
// },
// Encoders.bean(AuthorSummary.class))
// .filter(Objects::nonNull)
// .toJavaRDD()
// .map(authorSummary -> OBJECT_MAPPER.writeValueAsString(authorSummary))
// .saveAsTextFile(workingPath.concat("orcid_dataset/new_authors"), GzipCodec.class);
//
// logger.info("oldAuthorsFoundAcc: " + oldAuthorsFoundAcc.value().toString());
// logger.info("newAuthorsFoundAcc: " + newAuthorsFoundAcc.value().toString());
// logger.info("updatedAuthorsFoundAcc: " + updatedAuthorsFoundAcc.value().toString());
// logger.info("errorCodeFoundAcc: " + errorCodeAuthorsFoundAcc.value().toString());
// logger.info("errorLoadingJsonFoundAcc: " + errorLoadingAuthorsJsonFoundAcc.value().toString());
// logger.info("errorParsingXMLFoundAcc: " + errorParsingAuthorsXMLFoundAcc.value().toString());
Function<String, Work> retrieveWorkFunction = jsonData -> {
Work work = new Work();
JsonElement jElement = new JsonParser().parse(jsonData);
String statusCode = getJsonValue(jElement, "statusCode");
work.setStatusCode(statusCode);
String downloadDate = getJsonValue(jElement, "lastModifiedDate");
work.setDownloadDate("2020-11-18 00:00:05.644768");
if (statusCode.equals("200")) {
String compressedData = getJsonValue(jElement, "compressedData");
if (StringUtils.isEmpty(compressedData)) {
errorLoadingWorksJsonFoundAcc.add(1);
} else {
String xmlWork = ArgumentApplicationParser.decompressValue(compressedData);
try {
WorkDetail workDetail = XMLRecordParserNoDoi
.VTDParseWorkData(xmlWork.getBytes());
work.setWorkDetail(workDetail);
work.setBase64CompressData(compressedData);
return work;
} catch (Exception e) {
logger.error("parsing xml [" + jsonData + "]", e);
errorParsingWorksXMLFoundAcc.add(1);
}
}
} else {
errorCodeWorksFoundAcc.add(1);
}
return work;
};
Dataset<Work> downloadedWorksDS = spark
.createDataset(
sc
.textFile(workingPath + "downloads/updated_works/*")
.map(s -> {
return s.substring(21, s.length() - 1);
})
.map(retrieveWorkFunction)
.rdd(),
Encoders.bean(Work.class));
Dataset<Work> currentWorksDS = spark
.createDataset(
sc
.textFile(workingPath.concat("orcid_dataset/works/*"))
.map(item -> OBJECT_MAPPER.readValue(item, Work.class))
.rdd(),
Encoders.bean(Work.class));
currentWorksDS
.joinWith(
downloadedWorksDS,
currentWorksDS
.col("workDetail.id")
.equalTo(downloadedWorksDS.col("workDetail.id"))
.and(
currentWorksDS
.col("workDetail.oid")
.equalTo(downloadedWorksDS.col("workDetail.oid"))),
"full_outer")
.map(value -> {
Optional<Work> opCurrent = Optional.ofNullable(value._1());
Optional<Work> opDownloaded = Optional.ofNullable(value._2());
if (!opCurrent.isPresent()) {
newWorksFoundAcc.add(1);
return opDownloaded.get();
}
if (!opDownloaded.isPresent()) {
oldWorksFoundAcc.add(1);
return opCurrent.get();
}
if (opCurrent.isPresent() && opDownloaded.isPresent()) {
updatedWorksFoundAcc.add(1);
return opDownloaded.get();
}
return null;
},
Encoders.bean(Work.class))
.filter(Objects::nonNull)
.toJavaRDD()
.map(work -> OBJECT_MAPPER.writeValueAsString(work))
.saveAsTextFile(workingPath.concat("orcid_dataset/new_works"), GzipCodec.class);
logger.info("oldWorksFoundAcc: " + oldWorksFoundAcc.value().toString());
logger.info("newWorksFoundAcc: " + newWorksFoundAcc.value().toString());
logger.info("updatedWorksFoundAcc: " + updatedWorksFoundAcc.value().toString());
logger.info("errorCodeWorksFoundAcc: " + errorCodeWorksFoundAcc.value().toString());
logger.info("errorLoadingJsonWorksFoundAcc: " + errorLoadingWorksJsonFoundAcc.value().toString());
logger.info("errorParsingXMLWorksFoundAcc: " + errorParsingWorksXMLFoundAcc.value().toString());
});
}
private static String getJsonValue(JsonElement jElement, String property) {
if (jElement.getAsJsonObject().has(property)) {
JsonElement name = null;
name = jElement.getAsJsonObject().get(property);
if (name != null && !name.isJsonNull()) {
return name.getAsString();
}
}
return "";
}
}

View File

@ -35,12 +35,12 @@ public class SparkUpdateOrcidWorks {
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
public static void main(String[] args) throws IOException, Exception {
Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidDatasets.class);
Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidWorks.class);
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkUpdateOrcidDatasets.class
SparkUpdateOrcidWorks.class
.getResourceAsStream(
"/eu/dnetlib/dhp/doiboost/download_orcid_data.json")));
parser.parseArgument(args);
@ -83,7 +83,7 @@ public class SparkUpdateOrcidWorks {
String statusCode = getJsonValue(jElement, "statusCode");
work.setStatusCode(statusCode);
String downloadDate = getJsonValue(jElement, "lastModifiedDate");
work.setDownloadDate("2020-11-18 00:00:05.644768");
work.setDownloadDate("2020-12-15 00:00:01.000000");
if (statusCode.equals("200")) {
String compressedData = getJsonValue(jElement, "compressedData");
if (StringUtils.isEmpty(compressedData)) {

View File

@ -1,22 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.action.sharelib.for.java</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.java.opts</name>
<value>-Xmx4g</value>
</property>
</configuration>

View File

@ -1,9 +1,25 @@
<workflow-app name="Orcid Updates Download" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>spark2UpdateStepMaxExecutors</name>
<value>50</value>
</property>
<property>
<name>workingPath</name>
<description>the working dir base path</description>
</property>
<property>
<name>oozie.action.sharelib.for.java</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.java.opts</name>
<value>-Xmx4g</value>
</property>
<property>
<name>token</name>
<description>access token</description>
@ -30,7 +46,7 @@
<description>number of cores used by single executor</description>
</property>
<property>
<name>spark2MaxExecutors</name>
<name>spark2DownloadingMaxExecutors</name>
<value>10</value>
</property>
<property>
@ -58,6 +74,8 @@
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
@ -66,18 +84,16 @@
</configuration>
</global>
<start to="ResetWorkingPath"/>
<start to="ResetLambda"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetWorkingPath">
<action name="ResetLambda">
<fs>
<delete path='${workingPath}/downloads'/>
<delete path='${workingPath}/last_modified.csv.tar'/>
<mkdir path='${workingPath}/downloads'/>
<delete path='${workingPath}/last_modified.seq'/>
</fs>
<ok to="DownloadLambdaFile"/>
<error to="Kill"/>
@ -92,7 +108,7 @@
<argument>${shell_cmd}</argument>
<capture-output/>
</shell>
<ok to="DownloadUpdatedXMLAuthors"/>
<ok to="GenLastModifiedSeq"/>
<error to="Kill"/>
</action>
@ -118,7 +134,16 @@
<arg>-o</arg><arg>last_modified.seq</arg>
<arg>-t</arg><arg>-</arg>
</spark>
<ok to="End"/>
<ok to="ResetDownloads"/>
<error to="Kill"/>
</action>
<action name="ResetDownloads">
<fs>
<delete path='${workingPath}/downloads/updated_authors'/>
<delete path='${workingPath}/downloads/updated_works'/>
</fs>
<ok to="DownloadOrcidAuthors"/>
<error to="Kill"/>
</action>
@ -131,7 +156,7 @@
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.dynamicAllocation.maxExecutors=${spark2DownloadingMaxExecutors}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
@ -145,7 +170,7 @@
<arg>-o</arg><arg>downloads/updated_authors</arg>
<arg>-t</arg><arg>${token}</arg>
</spark>
<ok to="End"/>
<ok to="DownloadOrcidWorks"/>
<error to="Kill"/>
</action>
@ -158,7 +183,7 @@
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
--conf spark.dynamicAllocation.maxExecutors=${spark2DownloadingMaxExecutors}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
@ -172,6 +197,95 @@
<arg>-o</arg><arg>downloads/updated_works</arg>
<arg>-t</arg><arg>${token}</arg>
</spark>
<ok to="UpdateOrcidAuthors"/>
<error to="Kill"/>
</action>
<action name="UpdateOrcidAuthors">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>UpdateOrcidAuthors</name>
<class>eu.dnetlib.doiboost.orcid.SparkUpdateOrcidAuthors</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2UpdateStepMaxExecutors}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
</spark-opts>
<arg>-w</arg><arg>${workingPath}/</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-f</arg><arg>-</arg>
<arg>-o</arg><arg>-</arg>
<arg>-t</arg><arg>-</arg>
</spark>
<ok to="UpdateOrcidWorks"/>
<error to="Kill"/>
</action>
<action name="UpdateOrcidWorks">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>UpdateOrcidWorks</name>
<class>eu.dnetlib.doiboost.orcid.SparkUpdateOrcidWorks</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2UpdateStepMaxExecutors}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
</spark-opts>
<arg>-w</arg><arg>${workingPath}/</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-f</arg><arg>-</arg>
<arg>-o</arg><arg>-</arg>
<arg>-t</arg><arg>-</arg>
</spark>
<ok to="promoteOrcidAuthorsDataset"/>
<error to="Kill"/>
</action>
<action name="promoteOrcidAuthorsDataset">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${workingPath}/orcid_dataset/authors"/>
<mkdir path="${workingPath}/orcid_dataset/authors"/>
</prepare>
<arg>${workingPath}/orcid_dataset/new_authors/*</arg>
<arg>${workingPath}/orcid_dataset/authors</arg>
</distcp>
<ok to="promoteOrcidWorksDataset"/>
<error to="Kill"/>
</action>
<action name="promoteOrcidWorksDataset">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${workingPath}/orcid_dataset/works"/>
<mkdir path="${workingPath}/orcid_dataset/works"/>
</prepare>
<arg>${workingPath}/orcid_dataset/new_works/*</arg>
<arg>${workingPath}/orcid_dataset/works</arg>
</distcp>
<ok to="CleanWorkingPath"/>
<error to="Kill"/>
</action>
<action name="CleanWorkingPath">
<fs>
<delete path='${workingPath}/orcid_dataset/new_authors'/>
<delete path='${workingPath}/orcid_dataset/new_works'/>
</fs>
<ok to="End"/>
<error to="Kill"/>
</action>

View File

@ -51,43 +51,6 @@ public class OrcidClientTest {
// -H 'Authorization: Bearer 78fdb232-7105-4086-8570-e153f4198e3d'
// 'https://api.orcid.org/v3.0/0000-0001-7291-3210/record'
@Test
private void multipleDownloadTest() throws Exception {
int toDownload = 10;
long start = System.currentTimeMillis();
OrcidDownloader downloader = new OrcidDownloader();
TarArchiveInputStream input = new TarArchiveInputStream(
new GzipCompressorInputStream(new FileInputStream("/tmp/last_modified.csv.tar")));
TarArchiveEntry entry = input.getNextTarEntry();
BufferedReader br = null;
StringBuilder sb = new StringBuilder();
int rowNum = 0;
int entryNum = 0;
int modified = 0;
while (entry != null) {
br = new BufferedReader(new InputStreamReader(input)); // Read directly from tarInput
String line;
while ((line = br.readLine()) != null) {
String[] values = line.toString().split(",");
List<String> recordInfo = Arrays.asList(values);
String orcidId = recordInfo.get(0);
if (downloader.isModified(orcidId, recordInfo.get(3))) {
slowedDownDownload(orcidId);
modified++;
}
rowNum++;
if (modified > toDownload) {
break;
}
}
entryNum++;
entry = input.getNextTarEntry();
}
long end = System.currentTimeMillis();
logToFile("start test: " + new Date(start).toString());
logToFile("end test: " + new Date(end).toString());
}
@Test
private void downloadTest(String orcid) throws Exception {
String record = testDownloadRecord(orcid, REQUEST_TYPE_RECORD);
@ -228,37 +191,6 @@ public class OrcidClientTest {
}
}
@Test
private void lambdaFileCounterTest() throws Exception {
final String lastUpdate = "2020-09-29 00:00:00";
OrcidDownloader downloader = new OrcidDownloader();
TarArchiveInputStream input = new TarArchiveInputStream(
new GzipCompressorInputStream(new FileInputStream("/tmp/last_modified.csv.tar")));
TarArchiveEntry entry = input.getNextTarEntry();
BufferedReader br = null;
StringBuilder sb = new StringBuilder();
int rowNum = 0;
int entryNum = 0;
int modified = 0;
while (entry != null) {
br = new BufferedReader(new InputStreamReader(input)); // Read directly from tarInput
String line;
while ((line = br.readLine()) != null) {
String[] values = line.toString().split(",");
List<String> recordInfo = Arrays.asList(values);
String orcidId = recordInfo.get(0);
if (downloader.isModified(orcidId, recordInfo.get(3))) {
modified++;
}
rowNum++;
}
entryNum++;
entry = input.getNextTarEntry();
}
logToFile("rowNum: " + rowNum);
logToFile("modified: " + modified);
}
public static void logToFile(String log)
throws IOException {
log = log.concat("\n");