1
0
Fork 0

different date format in lambda file parsing

This commit is contained in:
Enrico Ottonello 2020-05-11 14:41:11 +02:00
parent b90609848b
commit 7990894454
1 changed files with 164 additions and 143 deletions

View File

@ -1,14 +1,14 @@
package eu.dnetlib.doiboost.orcid; package eu.dnetlib.doiboost.orcid;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
@ -22,13 +22,15 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.client.HttpClients;
import org.mortbay.log.Log; import org.mortbay.log.Log;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class OrcidDownloader extends OrcidDSManager { public class OrcidDownloader extends OrcidDSManager {
static final int REQ_LIMIT = 24; static final int REQ_LIMIT = 24;
static final int REQ_MAX_TEST = 100; static final int REQ_MAX_TEST = 100;
static final int RECORD_PARSED_COUNTER_LOG_INTERVAL = 50000; static final int RECORD_PARSED_COUNTER_LOG_INTERVAL = 10;
static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss."; static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
static final String lastUpdate = "2019-09-30 00:00:00.000000"; static final String lastUpdate = "2019-09-30 00:00:00";
private String lambdaFileName; private String lambdaFileName;
private String outputPath; private String outputPath;
private String token; private String token;
@ -39,21 +41,22 @@ public class OrcidDownloader extends OrcidDSManager {
orcidDownloader.parseLambdaFile(); orcidDownloader.parseLambdaFile();
} }
private String downloadRecord(String orcidId) throws Exception { private String downloadRecord(String orcidId) {
try (CloseableHttpClient client = HttpClients.createDefault()) { try (CloseableHttpClient client = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record"); HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record");
httpGet.addHeader("Accept", "application/vnd.orcid+xml"); httpGet.addHeader("Accept", "application/vnd.orcid+xml");
httpGet.addHeader("Authorization", String.format("Bearer %s", token)); httpGet.addHeader("Authorization", String.format("Bearer %s", token));
CloseableHttpResponse response = client.execute(httpGet); CloseableHttpResponse response = client.execute(httpGet);
if (response.getStatusLine().getStatusCode() != 200) { if (response.getStatusLine().getStatusCode() != 200) {
Log.warn( Log
.warn(
"Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode()); "Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode());
return new String(""); return new String("");
} }
return IOUtils.toString(response.getEntity().getContent()); return IOUtils.toString(response.getEntity().getContent());
} catch (Throwable e) { } catch (Throwable e) {
Log.warn("Downloading " + orcidId, e); Log.warn("Downloading " + orcidId, e.getMessage());
} }
return new String(""); return new String("");
} }
@ -68,15 +71,14 @@ public class OrcidDownloader extends OrcidDSManager {
String lambdaFileUri = hdfsServerUri.concat(hdfsOrcidDefaultPath).concat(lambdaFileName); String lambdaFileUri = hdfsServerUri.concat(hdfsOrcidDefaultPath).concat(lambdaFileName);
Path hdfsreadpath = new Path(lambdaFileUri); Path hdfsreadpath = new Path(lambdaFileUri);
FSDataInputStream lambdaFileStream = fs.open(hdfsreadpath); FSDataInputStream lambdaFileStream = fs.open(hdfsreadpath);
Path hdfsoutputPath = Path hdfsoutputPath = new Path(
new Path(
hdfsServerUri hdfsServerUri
.concat(hdfsOrcidDefaultPath) .concat(hdfsOrcidDefaultPath)
.concat(outputPath) .concat(outputPath)
.concat("orcid_records.seq")); .concat("orcid_records.seq"));
try (SequenceFile.Writer writer = try (SequenceFile.Writer writer = SequenceFile
SequenceFile.createWriter( .createWriter(
conf, conf,
SequenceFile.Writer.file(hdfsoutputPath), SequenceFile.Writer.file(hdfsoutputPath),
SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.keyClass(Text.class),
@ -95,8 +97,9 @@ public class OrcidDownloader extends OrcidDSManager {
} }
String[] values = line.split(","); String[] values = line.split(",");
List<String> recordInfo = Arrays.asList(values); List<String> recordInfo = Arrays.asList(values);
if (isModified(recordInfo.get(3))) { String orcidId = recordInfo.get(0);
String record = downloadRecord(recordInfo.get(0)); if (isModified(orcidId, recordInfo.get(3))) {
String record = downloadRecord(orcidId);
downloadedRecordsCounter++; downloadedRecordsCounter++;
if (!record.isEmpty()) { if (!record.isEmpty()) {
String compressRecord = ArgumentApplicationParser.compressArgument(record); String compressRecord = ArgumentApplicationParser.compressArgument(record);
@ -107,8 +110,8 @@ public class OrcidDownloader extends OrcidDSManager {
writer.append(key, value); writer.append(key, value);
savedRecordsCounter++; savedRecordsCounter++;
} catch (IOException e) { } catch (IOException e) {
Log.debug("Writing to sequence file: " + e.getMessage()); Log.warn("Writing to sequence file: " + e.getMessage());
Log.debug(e); Log.warn(e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
@ -118,7 +121,8 @@ public class OrcidDownloader extends OrcidDSManager {
if (nReqTmp == REQ_LIMIT) { if (nReqTmp == REQ_LIMIT) {
long reqSessionDuration = endReq - startReqTmp; long reqSessionDuration = endReq - startReqTmp;
if (reqSessionDuration <= 1000) { if (reqSessionDuration <= 1000) {
Log.warn( Log
.warn(
"\nreqSessionDuration: " "\nreqSessionDuration: "
+ reqSessionDuration + reqSessionDuration
+ " nReqTmp: " + " nReqTmp: "
@ -131,13 +135,18 @@ public class OrcidDownloader extends OrcidDSManager {
} }
} }
// if (parsedRecordsCounter>REQ_MAX_TEST) { if (parsedRecordsCounter > REQ_MAX_TEST) {
// break; break;
// } }
if ((parsedRecordsCounter % RECORD_PARSED_COUNTER_LOG_INTERVAL) == 0) { if ((parsedRecordsCounter % RECORD_PARSED_COUNTER_LOG_INTERVAL) == 0) {
Log.info("Current record parsed: " + parsedRecordsCounter); Log
Log.info("Current record downloaded: " + downloadedRecordsCounter); .info(
Log.info("Current record saved: " + savedRecordsCounter); "Current parsed: "
+ parsedRecordsCounter
+ " downloaded: "
+ downloadedRecordsCounter
+ " saved: "
+ savedRecordsCounter);
} }
} }
long endDownload = System.currentTimeMillis(); long endDownload = System.currentTimeMillis();
@ -153,10 +162,11 @@ public class OrcidDownloader extends OrcidDSManager {
} }
private void loadArgs(String[] args) throws IOException, Exception { private void loadArgs(String[] args) throws IOException, Exception {
final ArgumentApplicationParser parser = final ArgumentApplicationParser parser = new ArgumentApplicationParser(
new ArgumentApplicationParser( IOUtils
IOUtils.toString( .toString(
OrcidDownloader.class.getResourceAsStream( OrcidDownloader.class
.getResourceAsStream(
"/eu/dnetlib/dhp/doiboost/download_orcid_data.json"))); "/eu/dnetlib/dhp/doiboost/download_orcid_data.json")));
parser.parseArgument(args); parser.parseArgument(args);
@ -171,9 +181,20 @@ public class OrcidDownloader extends OrcidDSManager {
token = parser.get("token"); token = parser.get("token");
} }
private boolean isModified(String modifiedDate) throws ParseException { private boolean isModified(String orcidId, String modifiedDate) {
Date modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate); Date modifiedDateDt = null;
Date lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate); Date lastUpdateDt = null;
try {
if (modifiedDate.length() != 19) {
modifiedDate = modifiedDate.substring(0, 19);
}
modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate);
lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate);
} catch (Exception e) {
Log.warn("[" + orcidId + "] Parsing date: ", e.getMessage());
return true;
}
return modifiedDateDt.after(lastUpdateDt); return modifiedDateDt.after(lastUpdateDt);
} }
} }