fix last update read/write from file on hdfs

This commit is contained in:
Enrico Ottonello 2021-02-09 23:24:57 +01:00
parent c238561001
commit ee4ba7298b
7 changed files with 205 additions and 158 deletions

View File

@ -8,6 +8,7 @@ import java.util.Date;
import java.util.Optional; import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.CloseableHttpResponse;
@ -31,7 +32,6 @@ public class SparkDownloadOrcidAuthors {
static Logger logger = LoggerFactory.getLogger(SparkDownloadOrcidAuthors.class); static Logger logger = LoggerFactory.getLogger(SparkDownloadOrcidAuthors.class);
static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
static String lastUpdate;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@ -54,14 +54,18 @@ public class SparkDownloadOrcidAuthors {
final String token = parser.get("token"); final String token = parser.get("token");
final String lambdaFileName = parser.get("lambdaFileName"); final String lambdaFileName = parser.get("lambdaFileName");
logger.info("lambdaFileName: {}", lambdaFileName); logger.info("lambdaFileName: {}", lambdaFileName);
final String hdfsServerUri = parser.get("hdfsServerUri");
lastUpdate = HDFSUtil.readFromTextFile(workingPath.concat("last_update.txt"));
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
String lastUpdate = HDFSUtil.readFromTextFile(hdfsServerUri, workingPath, "last_update.txt");
logger.info("lastUpdate: ", lastUpdate);
if (StringUtils.isBlank(lastUpdate)) {
throw new RuntimeException("last update info not found");
}
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
LongAccumulator parsedRecordsAcc = spark.sparkContext().longAccumulator("parsed_records"); LongAccumulator parsedRecordsAcc = spark.sparkContext().longAccumulator("parsed_records");
@ -77,13 +81,14 @@ public class SparkDownloadOrcidAuthors {
logger.info("Retrieving data from lamda sequence file"); logger.info("Retrieving data from lamda sequence file");
JavaPairRDD<Text, Text> lamdaFileRDD = sc JavaPairRDD<Text, Text> lamdaFileRDD = sc
.sequenceFile(workingPath + lambdaFileName, Text.class, Text.class); .sequenceFile(workingPath + lambdaFileName, Text.class, Text.class);
logger.info("Data retrieved: " + lamdaFileRDD.count()); final long lamdaFileRDDCount = lamdaFileRDD.count();
logger.info("Data retrieved: " + lamdaFileRDDCount);
Function<Tuple2<Text, Text>, Boolean> isModifiedAfterFilter = data -> { Function<Tuple2<Text, Text>, Boolean> isModifiedAfterFilter = data -> {
String orcidId = data._1().toString(); String orcidId = data._1().toString();
String lastModifiedDate = data._2().toString(); String lastModifiedDate = data._2().toString();
parsedRecordsAcc.add(1); parsedRecordsAcc.add(1);
if (isModified(orcidId, lastModifiedDate)) { if (isModified(orcidId, lastModifiedDate, lastUpdate)) {
modifiedRecordsAcc.add(1); modifiedRecordsAcc.add(1);
return true; return true;
} }
@ -96,51 +101,42 @@ public class SparkDownloadOrcidAuthors {
final DownloadedRecordData downloaded = new DownloadedRecordData(); final DownloadedRecordData downloaded = new DownloadedRecordData();
downloaded.setOrcidId(orcidId); downloaded.setOrcidId(orcidId);
downloaded.setLastModifiedDate(lastModifiedDate); downloaded.setLastModifiedDate(lastModifiedDate);
try (CloseableHttpClient client = HttpClients.createDefault()) { CloseableHttpClient client = HttpClients.createDefault();
HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record"); HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record");
httpGet.addHeader("Accept", "application/vnd.orcid+xml"); httpGet.addHeader("Accept", "application/vnd.orcid+xml");
httpGet.addHeader("Authorization", String.format("Bearer %s", token)); httpGet.addHeader("Authorization", String.format("Bearer %s", token));
long startReq = System.currentTimeMillis(); long startReq = System.currentTimeMillis();
CloseableHttpResponse response = client.execute(httpGet); CloseableHttpResponse response = client.execute(httpGet);
long endReq = System.currentTimeMillis(); long endReq = System.currentTimeMillis();
long reqTime = endReq - startReq; long reqTime = endReq - startReq;
if (reqTime < 1000) { if (reqTime < 1000) {
Thread.sleep(1000 - reqTime); Thread.sleep(1000 - reqTime);
}
int statusCode = response.getStatusLine().getStatusCode();
downloaded.setStatusCode(statusCode);
if (statusCode != 200) {
switch (statusCode) {
case 403:
errorHTTP403Acc.add(1);
case 404:
errorHTTP404Acc.add(1);
case 409:
errorHTTP409Acc.add(1);
case 503:
errorHTTP503Acc.add(1);
case 525:
errorHTTP525Acc.add(1);
default:
errorHTTPGenericAcc.add(1);
} }
int statusCode = response.getStatusLine().getStatusCode();
downloaded.setStatusCode(statusCode);
if (statusCode != 200) {
switch (statusCode) {
case 403:
errorHTTP403Acc.add(1);
case 404:
errorHTTP404Acc.add(1);
case 409:
errorHTTP409Acc.add(1);
case 503:
errorHTTP503Acc.add(1);
throw new RuntimeException("Orcid request rate limit reached (HTTP 503)");
case 525:
errorHTTP525Acc.add(1);
default:
errorHTTPGenericAcc.add(1);
logger
.info(
"Downloading " + orcidId + " status code: "
+ response.getStatusLine().getStatusCode());
}
return downloaded.toTuple2();
}
downloadedRecordsAcc.add(1);
downloaded
.setCompressedData(
ArgumentApplicationParser
.compressArgument(IOUtils.toString(response.getEntity().getContent())));
} catch (Throwable e) {
logger.info("Downloading " + orcidId, e.getMessage());
downloaded.setErrorMessage(e.getMessage());
return downloaded.toTuple2(); return downloaded.toTuple2();
} }
downloadedRecordsAcc.add(1);
downloaded
.setCompressedData(
ArgumentApplicationParser
.compressArgument(IOUtils.toString(response.getEntity().getContent())));
client.close();
return downloaded.toTuple2(); return downloaded.toTuple2();
}; };
@ -148,7 +144,9 @@ public class SparkDownloadOrcidAuthors {
logger.info("Start execution ..."); logger.info("Start execution ...");
JavaPairRDD<Text, Text> authorsModifiedRDD = lamdaFileRDD.filter(isModifiedAfterFilter); JavaPairRDD<Text, Text> authorsModifiedRDD = lamdaFileRDD.filter(isModifiedAfterFilter);
logger.info("Authors modified count: " + authorsModifiedRDD.count()); long authorsModifiedCount = authorsModifiedRDD.count();
logger.info("Authors modified count: " + authorsModifiedCount);
logger.info("Start downloading ..."); logger.info("Start downloading ...");
authorsModifiedRDD authorsModifiedRDD
.repartition(100) .repartition(100)
@ -174,21 +172,27 @@ public class SparkDownloadOrcidAuthors {
} }
private static boolean isModified(String orcidId, String modifiedDate) { public static boolean isModified(String orcidId, String modifiedDate, String lastUpdate) {
Date modifiedDateDt; Date modifiedDateDt;
Date lastUpdateDt; Date lastUpdateDt;
String lastUpdateRedux = "";
try { try {
if (modifiedDate.equals("last_modified")) {
return false;
}
if (modifiedDate.length() != 19) { if (modifiedDate.length() != 19) {
modifiedDate = modifiedDate.substring(0, 19); modifiedDate = modifiedDate.substring(0, 19);
} }
if (lastUpdate.length() != 19) { if (lastUpdate.length() != 19) {
lastUpdate = lastUpdate.substring(0, 19); lastUpdateRedux = lastUpdate.substring(0, 19);
} else {
lastUpdateRedux = lastUpdate;
} }
modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate); modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate);
lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate); lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdateRedux);
} catch (Exception e) { } catch (Exception e) {
logger.info("[" + orcidId + "] Parsing date: ", e.getMessage()); throw new RuntimeException("[" + orcidId + "] modifiedDate <" + modifiedDate + "> lastUpdate <" + lastUpdate
return true; + "> Parsing date: " + e.getMessage());
} }
return modifiedDateDt.after(lastUpdateDt); return modifiedDateDt.after(lastUpdateDt);
} }

View File

@ -4,6 +4,7 @@ package eu.dnetlib.doiboost.orcid;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException; import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
@ -44,7 +45,6 @@ public class SparkDownloadOrcidWorks {
public static final String ORCID_XML_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; public static final String ORCID_XML_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
public static final DateTimeFormatter ORCID_XML_DATETIMEFORMATTER = DateTimeFormatter public static final DateTimeFormatter ORCID_XML_DATETIMEFORMATTER = DateTimeFormatter
.ofPattern(ORCID_XML_DATETIME_FORMAT); .ofPattern(ORCID_XML_DATETIME_FORMAT);
public static String lastUpdateValue;
public static void main(String[] args) throws IOException, Exception { public static void main(String[] args) throws IOException, Exception {
@ -64,17 +64,16 @@ public class SparkDownloadOrcidWorks {
logger.info("workingPath: ", workingPath); logger.info("workingPath: ", workingPath);
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
final String token = parser.get("token"); final String token = parser.get("token");
final String hdfsServerUri = parser.get("hdfsServerUri");
lastUpdateValue = HDFSUtil.readFromTextFile(workingPath.concat("last_update.txt"));
if (lastUpdateValue.length() != 19) {
lastUpdateValue = lastUpdateValue.substring(0, 19);
}
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
final String lastUpdateValue = HDFSUtil.readFromTextFile(hdfsServerUri, workingPath, "last_update.txt");
logger.info("lastUpdateValue: ", lastUpdateValue);
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
LongAccumulator updatedAuthorsAcc = spark.sparkContext().longAccumulator("updated_authors"); LongAccumulator updatedAuthorsAcc = spark.sparkContext().longAccumulator("updated_authors");
LongAccumulator parsedAuthorsAcc = spark.sparkContext().longAccumulator("parsed_authors"); LongAccumulator parsedAuthorsAcc = spark.sparkContext().longAccumulator("parsed_authors");
@ -136,7 +135,7 @@ public class SparkDownloadOrcidWorks {
parsedAuthorsAcc.add(1); parsedAuthorsAcc.add(1);
workIdLastModifiedDate.forEach((k, v) -> { workIdLastModifiedDate.forEach((k, v) -> {
parsedWorksAcc.add(1); parsedWorksAcc.add(1);
if (isModified(orcidId, v)) { if (isModified(orcidId, v, lastUpdateValue)) {
modifiedWorksAcc.add(1); modifiedWorksAcc.add(1);
workIds.add(orcidId.concat("/work/").concat(k)); workIds.add(orcidId.concat("/work/").concat(k));
} }
@ -153,51 +152,46 @@ public class SparkDownloadOrcidWorks {
final DownloadedRecordData downloaded = new DownloadedRecordData(); final DownloadedRecordData downloaded = new DownloadedRecordData();
downloaded.setOrcidId(orcidId); downloaded.setOrcidId(orcidId);
downloaded.setLastModifiedDate(lastUpdateValue); downloaded.setLastModifiedDate(lastUpdateValue);
try (CloseableHttpClient client = HttpClients.createDefault()) { CloseableHttpClient client = HttpClients.createDefault();
HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + relativeWorkUrl); HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + relativeWorkUrl);
httpGet.addHeader("Accept", "application/vnd.orcid+xml"); httpGet.addHeader("Accept", "application/vnd.orcid+xml");
httpGet.addHeader("Authorization", String.format("Bearer %s", token)); httpGet.addHeader("Authorization", String.format("Bearer %s", token));
long startReq = System.currentTimeMillis(); long startReq = System.currentTimeMillis();
CloseableHttpResponse response = client.execute(httpGet); CloseableHttpResponse response = client.execute(httpGet);
long endReq = System.currentTimeMillis(); long endReq = System.currentTimeMillis();
long reqTime = endReq - startReq; long reqTime = endReq - startReq;
if (reqTime < 1000) { if (reqTime < 1000) {
Thread.sleep(1000 - reqTime); Thread.sleep(1000 - reqTime);
}
int statusCode = response.getStatusLine().getStatusCode();
downloaded.setStatusCode(statusCode);
if (statusCode != 200) {
switch (statusCode) {
case 403:
errorHTTP403Acc.add(1);
case 404:
errorHTTP404Acc.add(1);
case 409:
errorHTTP409Acc.add(1);
case 503:
errorHTTP503Acc.add(1);
case 525:
errorHTTP525Acc.add(1);
default:
errorHTTPGenericAcc.add(1);
logger
.info(
"Downloading " + orcidId + " status code: "
+ response.getStatusLine().getStatusCode());
} }
int statusCode = response.getStatusLine().getStatusCode();
downloaded.setStatusCode(statusCode);
if (statusCode != 200) {
switch (statusCode) {
case 403:
errorHTTP403Acc.add(1);
case 404:
errorHTTP404Acc.add(1);
case 409:
errorHTTP409Acc.add(1);
case 503:
errorHTTP503Acc.add(1);
throw new RuntimeException("Orcid request rate limit reached (HTTP 503)");
case 525:
errorHTTP525Acc.add(1);
default:
errorHTTPGenericAcc.add(1);
logger
.info(
"Downloading " + orcidId + " status code: "
+ response.getStatusLine().getStatusCode());
}
return downloaded.toTuple2();
}
downloadedRecordsAcc.add(1);
downloaded
.setCompressedData(
ArgumentApplicationParser
.compressArgument(IOUtils.toString(response.getEntity().getContent())));
} catch (Throwable e) {
logger.info("Downloading " + orcidId, e.getMessage());
downloaded.setErrorMessage(e.getMessage());
return downloaded.toTuple2(); return downloaded.toTuple2();
} }
downloadedRecordsAcc.add(1);
downloaded
.setCompressedData(
ArgumentApplicationParser
.compressArgument(IOUtils.toString(response.getEntity().getContent())));
client.close();
return downloaded.toTuple2(); return downloaded.toTuple2();
}; };
@ -227,12 +221,20 @@ public class SparkDownloadOrcidWorks {
} }
public static boolean isModified(String orcidId, String modifiedDateValue) { public static boolean isModified(String orcidId, String modifiedDateValue, String lastUpdateValue) {
LocalDate modifiedDate = null; LocalDate modifiedDate = null;
LocalDate lastUpdate = null; LocalDate lastUpdate = null;
modifiedDate = LocalDate.parse(modifiedDateValue, SparkDownloadOrcidWorks.ORCID_XML_DATETIMEFORMATTER); try {
lastUpdate = LocalDate modifiedDate = LocalDate.parse(modifiedDateValue, SparkDownloadOrcidWorks.ORCID_XML_DATETIMEFORMATTER);
.parse(SparkDownloadOrcidWorks.lastUpdateValue, SparkDownloadOrcidWorks.LAMBDA_FILE_DATE_FORMATTER); if (lastUpdateValue.length() != 19) {
lastUpdateValue = lastUpdateValue.substring(0, 19);
}
lastUpdate = LocalDate
.parse(lastUpdateValue, SparkDownloadOrcidWorks.LAMBDA_FILE_DATE_FORMATTER);
} catch (Exception e) {
logger.info("[" + orcidId + "] Parsing date: ", e.getMessage());
throw new RuntimeException("[" + orcidId + "] Parsing date: " + e.getMessage());
}
return modifiedDate.isAfter(lastUpdate); return modifiedDate.isAfter(lastUpdate);
} }

View File

@ -50,9 +50,7 @@ public class SparkGenLastModifiedSeq {
outputPath = parser.get("outputPath"); outputPath = parser.get("outputPath");
lambdaFileName = parser.get("lambdaFileName"); lambdaFileName = parser.get("lambdaFileName");
String lambdaFileUri = hdfsServerUri.concat(workingPath).concat(lambdaFileName); String lambdaFileUri = hdfsServerUri.concat(workingPath).concat(lambdaFileName);
String lastModifiedDateFromLambdaFileUri = hdfsServerUri String lastModifiedDateFromLambdaFileUri = "last_modified_date_from_lambda_file.txt";
.concat(workingPath)
.concat("last_modified_date_from_lambda_file.txt");
SparkConf sparkConf = new SparkConf(); SparkConf sparkConf = new SparkConf();
runWithSparkSession( runWithSparkSession(
@ -101,7 +99,9 @@ public class SparkGenLastModifiedSeq {
} }
} }
} }
HDFSUtil.writeToTextFile(lastModifiedDateFromLambdaFileUri, lastModifiedAuthorDate); HDFSUtil
.writeToTextFile(
hdfsServerUri, workingPath, lastModifiedDateFromLambdaFileUri, lastModifiedAuthorDate);
Log.info("Saved rows from lamda csv tar file: " + rowsNum); Log.info("Saved rows from lamda csv tar file: " + rowsNum);
}); });
} }

View File

@ -50,7 +50,7 @@ public class SparkUpdateOrcidWorks {
.map(Boolean::valueOf) .map(Boolean::valueOf)
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
// final String outputPath = parser.get("outputPath"); final String hdfsServerUri = parser.get("hdfsServerUri");
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
@ -167,8 +167,8 @@ public class SparkUpdateOrcidWorks {
logger.info("errorParsingXMLWorksFoundAcc: " + errorParsingWorksXMLFoundAcc.value().toString()); logger.info("errorParsingXMLWorksFoundAcc: " + errorParsingWorksXMLFoundAcc.value().toString());
String lastModifiedDateFromLambdaFile = HDFSUtil String lastModifiedDateFromLambdaFile = HDFSUtil
.readFromTextFile(workingPath.concat("last_modified_date_from_lambda_file.txt")); .readFromTextFile(hdfsServerUri, workingPath, "last_modified_date_from_lambda_file.txt");
HDFSUtil.writeToTextFile(workingPath.concat("last_update.txt"), lastModifiedDateFromLambdaFile); HDFSUtil.writeToTextFile(hdfsServerUri, workingPath, "last_update.txt", lastModifiedDateFromLambdaFile);
logger.info("last_update file updated"); logger.info("last_update file updated");
}); });
} }

View File

@ -1,9 +1,8 @@
package eu.dnetlib.doiboost.orcid.util; package eu.dnetlib.doiboost.orcid.util;
import java.io.BufferedWriter; import java.io.*;
import java.io.IOException; import java.net.URI;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -12,27 +11,57 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.doiboost.orcid.SparkDownloadOrcidAuthors;
public class HDFSUtil { public class HDFSUtil {
public static String readFromTextFile(String path) throws IOException { static Logger logger = LoggerFactory.getLogger(HDFSUtil.class);
private static FileSystem getFileSystem(String hdfsServerUri) throws IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsServerUri);
FileSystem fileSystem = FileSystem.get(conf); FileSystem fileSystem = FileSystem.get(conf);
FSDataInputStream inputStream = new FSDataInputStream(fileSystem.open(new Path(path))); return fileSystem;
return IOUtils.toString(inputStream, StandardCharsets.UTF_8.name());
} }
public static void writeToTextFile(String pathValue, String text) throws IOException { public static String readFromTextFile(String hdfsServerUri, String workingPath, String path) throws IOException {
Configuration conf = new Configuration(); FileSystem fileSystem = getFileSystem(hdfsServerUri);
FileSystem fileSystem = FileSystem.get(conf); Path toReadPath = new Path(workingPath.concat(path));
Path path = new Path(pathValue); if (!fileSystem.exists(toReadPath)) {
if (fileSystem.exists(path)) { throw new RuntimeException("File not exist: " + path);
fileSystem.delete(path, true);
} }
FSDataOutputStream os = fileSystem.create(path); logger.info("Last_update_path " + toReadPath.toString());
FSDataInputStream inputStream = new FSDataInputStream(fileSystem.open(toReadPath));
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
StringBuffer sb = new StringBuffer();
try {
String line;
while ((line = br.readLine()) != null) {
sb.append(line);
}
} finally {
br.close();
}
String buffer = sb.toString();
logger.info("Last_update: " + buffer);
return buffer;
}
public static void writeToTextFile(String hdfsServerUri, String workingPath, String path, String text)
throws IOException {
FileSystem fileSystem = getFileSystem(hdfsServerUri);
Path toWritePath = new Path(workingPath.concat(path));
if (fileSystem.exists(toWritePath)) {
fileSystem.delete(toWritePath, true);
}
FSDataOutputStream os = fileSystem.create(toWritePath);
BufferedWriter br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8")); BufferedWriter br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8"));
br.write(text); br.write(text);
br.close(); br.close();
fileSystem.close();
} }
} }

View File

@ -10,11 +10,7 @@ import java.nio.file.Paths;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.TemporalUnit;
import java.util.*; import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
@ -25,9 +21,7 @@ import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.client.HttpClients;
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mortbay.log.Log;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.orcid.AuthorData; import eu.dnetlib.dhp.schema.orcid.AuthorData;
@ -162,14 +156,17 @@ public class OrcidClientTest {
} }
@Test @Test
private void lambdaFileReaderTest() throws Exception { public void lambdaFileReaderTest() throws Exception {
String last_update = "2021-01-12 00:00:06.685137";
TarArchiveInputStream input = new TarArchiveInputStream( TarArchiveInputStream input = new TarArchiveInputStream(
new GzipCompressorInputStream(new FileInputStream("/develop/last_modified.csv.tar"))); new GzipCompressorInputStream(new FileInputStream("/tmp/last_modified.csv.tar")));
TarArchiveEntry entry = input.getNextTarEntry(); TarArchiveEntry entry = input.getNextTarEntry();
BufferedReader br = null; BufferedReader br = null;
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
int rowNum = 0; int rowNum = 1;
int modifiedNum = 1;
int entryNum = 0; int entryNum = 0;
boolean firstNotModifiedFound = false;
while (entry != null) { while (entry != null) {
br = new BufferedReader(new InputStreamReader(input)); // Read directly from tarInput br = new BufferedReader(new InputStreamReader(input)); // Read directly from tarInput
String line; String line;
@ -177,18 +174,31 @@ public class OrcidClientTest {
String[] values = line.toString().split(","); String[] values = line.toString().split(",");
List<String> recordInfo = Arrays.asList(values); List<String> recordInfo = Arrays.asList(values);
assertTrue(recordInfo.size() == 4); assertTrue(recordInfo.size() == 4);
String orcid = recordInfo.get(0);
String modifiedDate = recordInfo.get(3);
rowNum++; rowNum++;
if (rowNum == 1) { if (rowNum == 2) {
assertTrue(recordInfo.get(3).equals("last_modified")); assertTrue(recordInfo.get(3).equals("last_modified"));
} else if (rowNum == 2) { } else {
assertTrue(recordInfo.get(0).equals("0000-0002-0499-7333")); // SparkDownloadOrcidAuthors.lastUpdate = last_update;
// boolean isModified = SparkDownloadOrcidAuthors.isModified(orcid, modifiedDate);
// if (isModified) {
// modifiedNum++;
// } else {
// if (!firstNotModifiedFound) {
// firstNotModifiedFound = true;
// logToFile(orcid + " - " + modifiedDate + " > " + isModified);
// }
// }
} }
} }
entryNum++; entryNum++;
assertTrue(entryNum == 1); assertTrue(entryNum == 1);
entry = input.getNextTarEntry(); entry = input.getNextTarEntry();
} }
logToFile("modifiedNum : " + modifiedNum + " / " + rowNum);
} }
public static void logToFile(String log) public static void logToFile(String log)
@ -304,7 +314,8 @@ public class OrcidClientTest {
} }
@Test @Test
public void testUpdatedRecord() throws Exception { @Ignore
private void testUpdatedRecord() throws Exception {
final String base64CompressedRecord = IOUtils final String base64CompressedRecord = IOUtils
.toString(getClass().getResourceAsStream("0000-0003-3028-6161.compressed.base64")); .toString(getClass().getResourceAsStream("0000-0003-3028-6161.compressed.base64"));
final String record = ArgumentApplicationParser.decompressValue(base64CompressedRecord); final String record = ArgumentApplicationParser.decompressValue(base64CompressedRecord);
@ -312,7 +323,8 @@ public class OrcidClientTest {
} }
@Test @Test
public void testUpdatedWork() throws Exception { @Ignore
private void testUpdatedWork() throws Exception {
final String base64CompressedWork = "H4sIAAAAAAAAAM1XS2/jNhC+51cQOuxJsiXZSR03Vmq0G6Bo013E6R56oyXaZiOJWpKy4y783zvUg5Ksh5uiCJogisX5Zjj85sHx3f1rFKI94YKyeGE4I9tAJPZZQOPtwvj9+cGaGUhIHAc4ZDFZGEcijHvv6u7A+MtcPVCSSgsUQObYzuzaccBEguVuYYxt+LHgbwKP6a11M3WnY6UzrpB7KuiahlQeF0aSrkPqGwhcisWcxpLwGIcLYydlMh+PD4fDiHGfBvDcjmMxLhGlBglSH8vsIH0qGlLqBFRIGvvDWjWQ1iMJJ2CKBANqGlNqMbkj3IpxRPq1KkypFZFoDRHa0aRfq8JoNjhnfIAJJS6xPouiIQJyeYmGQzE+cO5cXqITcItBlKyASExD0a93jiwtvJDjYXDDAqBPHoH2wMmVWGNf8xyyaEBiSTeUDHHWBpd2Nmmc10yfbgHQrHCyIRxKjQwRUoFKPRwEnIgBnQJQVdGeQgJaCRN0OMnPkaUFVbD9WkpaIndQJowf+8EFoIpTErJjBFQOBavElFpfUxwC9ZcqvQErdQXhe+oPFF8BaObupYzVsYEOARzSoZBWmKqaBMHcV0Wf8oG0beIqD+Gdkz0lhyE3NajUW6fhQFSV9Nw/MCBYyofYa0EN7wrBz13eP+Y+J6obWgE8Pdd2JpYD94P77Ezmjj13b0bu5PqPu3EXumEnxEJaEVxSUIHammsra+53z44zt2/m1/bItaeVtQ6dhs3c4XytvW75IYUchMKvEHVUyqmnWBFAS0VJrqSvQde6vp251ux2NtFuKcVOi+oK9YY0M0Cn6o4J6WkvtEK2XJ1vfPGAZxSoK8lb+SxJBbLQx1CohOLndjJUywQWUFmqEi3G6Zaqf/7buOyYJd5IYpfmf0XipfP18pDR9cQCeEuJQI/Lx36bFbVnpBeL2UwmqQw7ApAvf4GeGGQdEbENgolui/wdpjHaYCmPCIPPAmGBIsxfoLUhyRCB0SeCakEBJRKBtfJ+UBbI15TG4PaGBAhWthx8DmFYtHZQujv1CWbLLdzmmUKmHEOWCe1/zdu78bn/+YH+hCOqOzcXfFwuP6OVT/P710crwqGXFrpNaM2GT3MXarw01i15TIi3pmtJXgtbTVGf3h6HKfF+wBAnPyTfdCChudlm5gZaoG//F9pPZsGQcqqbyZN5hBau5OoIJ3PPwjTKDuG4s5MZp2rMzF5PZoK34IT6PIFOPrk+mTiVO5aJH2C+JJRjE/06eoRfpJxa4VgyYaLlaJUv/EhCfATMU/76gEOfmehL/qbJNNHjaFna+CQYB8wvo9PpPFJ5MOrJ1Ix7USBZqBl7KRNOx1d3jex7SG6zuijqCMWRusBsncjZSrM2u82UJmqzpGhvUJN2t6caIM9QQgO9c0t40UROnWsJd2Rbs+nsxpna9u30ttNkjechmzHjEST+X5CkkuNY0GzQkzyFseAf7lSZuLwdh1xSXKvvQJ4g4abTYgPV7uMt3rskohlJmMa82kQkshtyBEIYqQ+YB8X3oRHg7iFKi/bZP+Ao+T6BJhIT/vNPi8ffZs+flk+r2v0WNroZiyWn6xRmadHqTJXsjLJczElAZX6TnJdoWTM1SI2gfutv3rjeBt5t06rVvNuWup29246tlvluO+u2/G92bK9DXheL6uFd/Q3EaRDZqBIAAA=="; final String base64CompressedWork = "H4sIAAAAAAAAAM1XS2/jNhC+51cQOuxJsiXZSR03Vmq0G6Bo013E6R56oyXaZiOJWpKy4y783zvUg5Ksh5uiCJogisX5Zjj85sHx3f1rFKI94YKyeGE4I9tAJPZZQOPtwvj9+cGaGUhIHAc4ZDFZGEcijHvv6u7A+MtcPVCSSgsUQObYzuzaccBEguVuYYxt+LHgbwKP6a11M3WnY6UzrpB7KuiahlQeF0aSrkPqGwhcisWcxpLwGIcLYydlMh+PD4fDiHGfBvDcjmMxLhGlBglSH8vsIH0qGlLqBFRIGvvDWjWQ1iMJJ2CKBANqGlNqMbkj3IpxRPq1KkypFZFoDRHa0aRfq8JoNjhnfIAJJS6xPouiIQJyeYmGQzE+cO5cXqITcItBlKyASExD0a93jiwtvJDjYXDDAqBPHoH2wMmVWGNf8xyyaEBiSTeUDHHWBpd2Nmmc10yfbgHQrHCyIRxKjQwRUoFKPRwEnIgBnQJQVdGeQgJaCRN0OMnPkaUFVbD9WkpaIndQJowf+8EFoIpTErJjBFQOBavElFpfUxwC9ZcqvQErdQXhe+oPFF8BaObupYzVsYEOARzSoZBWmKqaBMHcV0Wf8oG0beIqD+Gdkz0lhyE3NajUW6fhQFSV9Nw/MCBYyofYa0EN7wrBz13eP+Y+J6obWgE8Pdd2JpYD94P77Ezmjj13b0bu5PqPu3EXumEnxEJaEVxSUIHammsra+53z44zt2/m1/bItaeVtQ6dhs3c4XytvW75IYUchMKvEHVUyqmnWBFAS0VJrqSvQde6vp251ux2NtFuKcVOi+oK9YY0M0Cn6o4J6WkvtEK2XJ1vfPGAZxSoK8lb+SxJBbLQx1CohOLndjJUywQWUFmqEi3G6Zaqf/7buOyYJd5IYpfmf0XipfP18pDR9cQCeEuJQI/Lx36bFbVnpBeL2UwmqQw7ApAvf4GeGGQdEbENgolui/wdpjHaYCmPCIPPAmGBIsxfoLUhyRCB0SeCakEBJRKBtfJ+UBbI15TG4PaGBAhWthx8DmFYtHZQujv1CWbLLdzmmUKmHEOWCe1/zdu78bn/+YH+hCOqOzcXfFwuP6OVT/P710crwqGXFrpNaM2GT3MXarw01i15TIi3pmtJXgtbTVGf3h6HKfF+wBAnPyTfdCChudlm5gZaoG//F9pPZsGQcqqbyZN5hBau5OoIJ3PPwjTKDuG4s5MZp2rMzF5PZoK34IT6PIFOPrk+mTiVO5aJH2C+JJRjE/06eoRfpJxa4VgyYaLlaJUv/EhCfATMU/76gEOfmehL/qbJNNHjaFna+CQYB8wvo9PpPFJ5MOrJ1Ix7USBZqBl7KRNOx1d3jex7SG6zuijqCMWRusBsncjZSrM2u82UJmqzpGhvUJN2t6caIM9QQgO9c0t40UROnWsJd2Rbs+nsxpna9u30ttNkjechmzHjEST+X5CkkuNY0GzQkzyFseAf7lSZuLwdh1xSXKvvQJ4g4abTYgPV7uMt3rskohlJmMa82kQkshtyBEIYqQ+YB8X3oRHg7iFKi/bZP+Ao+T6BJhIT/vNPi8ffZs+flk+r2v0WNroZiyWn6xRmadHqTJXsjLJczElAZX6TnJdoWTM1SI2gfutv3rjeBt5t06rVvNuWup29246tlvluO+u2/G92bK9DXheL6uFd/Q3EaRDZqBIAAA==";
final String work = ArgumentApplicationParser.decompressValue(base64CompressedWork); final String work = ArgumentApplicationParser.decompressValue(base64CompressedWork);
logToFile("\n\nwork updated \n\n" + work); logToFile("\n\nwork updated \n\n" + work);

View File

@ -90,22 +90,22 @@ public class XMLRecordParserTest {
assertNotNull(jsonData); assertNotNull(jsonData);
} }
@Test // @Test
private void testWorkIdLastModifiedDateXMLParser() throws Exception { // private void testWorkIdLastModifiedDateXMLParser() throws Exception {
String xml = IOUtils // String xml = IOUtils
.toString( // .toString(
this.getClass().getResourceAsStream("record_0000-0001-5004-5918.xml")); // this.getClass().getResourceAsStream("record_0000-0001-5004-5918.xml"));
Map<String, String> workIdLastModifiedDate = XMLRecordParser.retrieveWorkIdLastModifiedDate(xml.getBytes()); // Map<String, String> workIdLastModifiedDate = XMLRecordParser.retrieveWorkIdLastModifiedDate(xml.getBytes());
workIdLastModifiedDate.forEach((k, v) -> { // workIdLastModifiedDate.forEach((k, v) -> {
try { // try {
OrcidClientTest // OrcidClientTest
.logToFile( // .logToFile(
k + " " + v + " isModified after " + SparkDownloadOrcidWorks.lastUpdateValue + ": " // k + " " + v + " isModified after " + SparkDownloadOrcidWorks.lastUpdateValue + ": "
+ SparkDownloadOrcidWorks.isModified("0000-0001-5004-5918", v)); // + SparkDownloadOrcidWorks.isModified("0000-0001-5004-5918", v));
} catch (IOException e) { // } catch (IOException e) {
} // }
}); // });
} // }
@Test @Test
public void testAuthorSummaryXMLParser() throws Exception { public void testAuthorSummaryXMLParser() throws Exception {