updated pom version

This commit is contained in:
Sandro La Bruzzo 2020-05-11 14:35:14 +02:00
parent b90609848b
commit 0c6774e4da
3 changed files with 150 additions and 145 deletions

View File

@ -4,7 +4,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.1.7-SNAPSHOT</version> <version>1.2.1-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -1,6 +1,6 @@
package eu.dnetlib.doiboost.orcid; package eu.dnetlib.doiboost.orcid;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
@ -9,6 +9,7 @@ import java.text.SimpleDateFormat;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
@ -22,158 +23,162 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.client.HttpClients;
import org.mortbay.log.Log; import org.mortbay.log.Log;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class OrcidDownloader extends OrcidDSManager { public class OrcidDownloader extends OrcidDSManager {
static final int REQ_LIMIT = 24; static final int REQ_LIMIT = 24;
static final int REQ_MAX_TEST = 100; static final int REQ_MAX_TEST = 100;
static final int RECORD_PARSED_COUNTER_LOG_INTERVAL = 50000; static final int RECORD_PARSED_COUNTER_LOG_INTERVAL = 50000;
static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss."; static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.";
static final String lastUpdate = "2019-09-30 00:00:00.000000"; static final String lastUpdate = "2019-09-30 00:00:00.000000";
private String lambdaFileName; private String lambdaFileName;
private String outputPath; private String outputPath;
private String token; private String token;
public static void main(String[] args) throws IOException, Exception { public static void main(String[] args) throws IOException, Exception {
OrcidDownloader orcidDownloader = new OrcidDownloader(); OrcidDownloader orcidDownloader = new OrcidDownloader();
orcidDownloader.loadArgs(args); orcidDownloader.loadArgs(args);
orcidDownloader.parseLambdaFile(); orcidDownloader.parseLambdaFile();
} }
private String downloadRecord(String orcidId) throws Exception { private String downloadRecord(String orcidId) throws Exception {
try (CloseableHttpClient client = HttpClients.createDefault()) { try (CloseableHttpClient client = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record"); HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record");
httpGet.addHeader("Accept", "application/vnd.orcid+xml"); httpGet.addHeader("Accept", "application/vnd.orcid+xml");
httpGet.addHeader("Authorization", String.format("Bearer %s", token)); httpGet.addHeader("Authorization", String.format("Bearer %s", token));
CloseableHttpResponse response = client.execute(httpGet); CloseableHttpResponse response = client.execute(httpGet);
if (response.getStatusLine().getStatusCode() != 200) { if (response.getStatusLine().getStatusCode() != 200) {
Log.warn( Log
"Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode()); .warn(
return new String(""); "Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode());
} return new String("");
return IOUtils.toString(response.getEntity().getContent()); }
return IOUtils.toString(response.getEntity().getContent());
} catch (Throwable e) { } catch (Throwable e) {
Log.warn("Downloading " + orcidId, e); Log.warn("Downloading " + orcidId, e);
} }
return new String(""); return new String("");
} }
public void parseLambdaFile() throws Exception { public void parseLambdaFile() throws Exception {
int parsedRecordsCounter = 0; int parsedRecordsCounter = 0;
int downloadedRecordsCounter = 0; int downloadedRecordsCounter = 0;
int savedRecordsCounter = 0; int savedRecordsCounter = 0;
long startDownload = 0; long startDownload = 0;
Configuration conf = initConfigurationObject(); Configuration conf = initConfigurationObject();
FileSystem fs = initFileSystemObject(conf); FileSystem fs = initFileSystemObject(conf);
String lambdaFileUri = hdfsServerUri.concat(hdfsOrcidDefaultPath).concat(lambdaFileName); String lambdaFileUri = hdfsServerUri.concat(hdfsOrcidDefaultPath).concat(lambdaFileName);
Path hdfsreadpath = new Path(lambdaFileUri); Path hdfsreadpath = new Path(lambdaFileUri);
FSDataInputStream lambdaFileStream = fs.open(hdfsreadpath); FSDataInputStream lambdaFileStream = fs.open(hdfsreadpath);
Path hdfsoutputPath = Path hdfsoutputPath = new Path(
new Path( hdfsServerUri
hdfsServerUri .concat(hdfsOrcidDefaultPath)
.concat(hdfsOrcidDefaultPath) .concat(outputPath)
.concat(outputPath) .concat("orcid_records.seq"));
.concat("orcid_records.seq"));
try (SequenceFile.Writer writer = try (SequenceFile.Writer writer = SequenceFile
SequenceFile.createWriter( .createWriter(
conf, conf,
SequenceFile.Writer.file(hdfsoutputPath), SequenceFile.Writer.file(hdfsoutputPath),
SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(Text.class))) { SequenceFile.Writer.valueClass(Text.class))) {
try (BufferedReader br = new BufferedReader(new InputStreamReader(lambdaFileStream))) { try (BufferedReader br = new BufferedReader(new InputStreamReader(lambdaFileStream))) {
String line; String line;
int nReqTmp = 0; int nReqTmp = 0;
startDownload = System.currentTimeMillis(); startDownload = System.currentTimeMillis();
long startReqTmp = System.currentTimeMillis(); long startReqTmp = System.currentTimeMillis();
while ((line = br.readLine()) != null) { while ((line = br.readLine()) != null) {
parsedRecordsCounter++; parsedRecordsCounter++;
// skip headers line // skip headers line
if (parsedRecordsCounter == 1) { if (parsedRecordsCounter == 1) {
continue; continue;
} }
String[] values = line.split(","); String[] values = line.split(",");
List<String> recordInfo = Arrays.asList(values); List<String> recordInfo = Arrays.asList(values);
if (isModified(recordInfo.get(3))) { if (isModified(recordInfo.get(3))) {
String record = downloadRecord(recordInfo.get(0)); String record = downloadRecord(recordInfo.get(0));
downloadedRecordsCounter++; downloadedRecordsCounter++;
if (!record.isEmpty()) { if (!record.isEmpty()) {
String compressRecord = ArgumentApplicationParser.compressArgument(record); String compressRecord = ArgumentApplicationParser.compressArgument(record);
final Text key = new Text(recordInfo.get(0)); final Text key = new Text(recordInfo.get(0));
final Text value = new Text(compressRecord); final Text value = new Text(compressRecord);
try { try {
writer.append(key, value); writer.append(key, value);
savedRecordsCounter++; savedRecordsCounter++;
} catch (IOException e) { } catch (IOException e) {
Log.debug("Writing to sequence file: " + e.getMessage()); Log.debug("Writing to sequence file: " + e.getMessage());
Log.debug(e); Log.debug(e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
} }
long endReq = System.currentTimeMillis(); long endReq = System.currentTimeMillis();
nReqTmp++; nReqTmp++;
if (nReqTmp == REQ_LIMIT) { if (nReqTmp == REQ_LIMIT) {
long reqSessionDuration = endReq - startReqTmp; long reqSessionDuration = endReq - startReqTmp;
if (reqSessionDuration <= 1000) { if (reqSessionDuration <= 1000) {
Log.warn( Log
"\nreqSessionDuration: " .warn(
+ reqSessionDuration "\nreqSessionDuration: "
+ " nReqTmp: " + reqSessionDuration
+ nReqTmp + " nReqTmp: "
+ " wait ...."); + nReqTmp
Thread.sleep(1000 - reqSessionDuration); + " wait ....");
} else { Thread.sleep(1000 - reqSessionDuration);
nReqTmp = 0; } else {
startReqTmp = System.currentTimeMillis(); nReqTmp = 0;
} startReqTmp = System.currentTimeMillis();
} }
}
// if (parsedRecordsCounter>REQ_MAX_TEST) { // if (parsedRecordsCounter>REQ_MAX_TEST) {
// break; // break;
// } // }
if ((parsedRecordsCounter % RECORD_PARSED_COUNTER_LOG_INTERVAL) == 0) { if ((parsedRecordsCounter % RECORD_PARSED_COUNTER_LOG_INTERVAL) == 0) {
Log.info("Current record parsed: " + parsedRecordsCounter); Log.info("Current record parsed: " + parsedRecordsCounter);
Log.info("Current record downloaded: " + downloadedRecordsCounter); Log.info("Current record downloaded: " + downloadedRecordsCounter);
Log.info("Current record saved: " + savedRecordsCounter); Log.info("Current record saved: " + savedRecordsCounter);
} }
} }
long endDownload = System.currentTimeMillis(); long endDownload = System.currentTimeMillis();
long downloadTime = endDownload - startDownload; long downloadTime = endDownload - startDownload;
Log.info("Download time: " + ((downloadTime / 1000) / 60) + " minutes"); Log.info("Download time: " + ((downloadTime / 1000) / 60) + " minutes");
} }
} }
lambdaFileStream.close(); lambdaFileStream.close();
Log.info("Download started at: " + new Date(startDownload).toString()); Log.info("Download started at: " + new Date(startDownload).toString());
Log.info("Parsed Records Counter: " + parsedRecordsCounter); Log.info("Parsed Records Counter: " + parsedRecordsCounter);
Log.info("Downloaded Records Counter: " + downloadedRecordsCounter); Log.info("Downloaded Records Counter: " + downloadedRecordsCounter);
Log.info("Saved Records Counter: " + savedRecordsCounter); Log.info("Saved Records Counter: " + savedRecordsCounter);
} }
private void loadArgs(String[] args) throws IOException, Exception { private void loadArgs(String[] args) throws IOException, Exception {
final ArgumentApplicationParser parser = final ArgumentApplicationParser parser = new ArgumentApplicationParser(
new ArgumentApplicationParser( IOUtils
IOUtils.toString( .toString(
OrcidDownloader.class.getResourceAsStream( OrcidDownloader.class
"/eu/dnetlib/dhp/doiboost/download_orcid_data.json"))); .getResourceAsStream(
parser.parseArgument(args); "/eu/dnetlib/dhp/doiboost/download_orcid_data.json")));
parser.parseArgument(args);
hdfsServerUri = parser.get("hdfsServerUri"); hdfsServerUri = parser.get("hdfsServerUri");
Log.info("HDFS URI: " + hdfsServerUri); Log.info("HDFS URI: " + hdfsServerUri);
hdfsOrcidDefaultPath = parser.get("hdfsOrcidDefaultPath"); hdfsOrcidDefaultPath = parser.get("hdfsOrcidDefaultPath");
Log.info("Default Path: " + hdfsOrcidDefaultPath); Log.info("Default Path: " + hdfsOrcidDefaultPath);
lambdaFileName = parser.get("lambdaFileName"); lambdaFileName = parser.get("lambdaFileName");
Log.info("Lambda File Name: " + lambdaFileName); Log.info("Lambda File Name: " + lambdaFileName);
outputPath = parser.get("outputPath"); outputPath = parser.get("outputPath");
Log.info("Output Data: " + outputPath); Log.info("Output Data: " + outputPath);
token = parser.get("token"); token = parser.get("token");
} }
private boolean isModified(String modifiedDate) throws ParseException { private boolean isModified(String modifiedDate) throws ParseException {
Date modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate); Date modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate);
Date lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate); Date lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate);
return modifiedDateDt.after(lastUpdateDt); return modifiedDateDt.after(lastUpdateDt);
} }
} }

View File

@ -16,7 +16,7 @@ class MAGMappingTest {
val mapper = new ObjectMapper() val mapper = new ObjectMapper()
@Test //@Test
def testMAGCSV(): Unit = { def testMAGCSV(): Unit = {
val conf: SparkConf = new SparkConf() val conf: SparkConf = new SparkConf()
@ -31,7 +31,7 @@ class MAGMappingTest {
import spark.implicits._ import spark.implicits._
val d: Dataset[Papers] = spark.read.load("/data/doiboost/mag/datasets/Papers").as[Papers] val d: Dataset[Papers] = spark.read.load("/data/doiboost/mag/datasets/Papers").as[Papers]
logger.info(s"Total number of element: ${d.where(col("Doi").isNotNull).count()}") logger.info(s"Total number of element: ${d.where(col("Doi").isNotNull).count()}")
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Papers] //implicit val mapEncoder = org.apache.spark.sql.Encoders.bean[Papers]
val result: RDD[Papers] = d.where(col("Doi").isNotNull).rdd.map { p: Papers => Tuple2(p.Doi, p) }.reduceByKey {case (p1:Papers, p2:Papers) => val result: RDD[Papers] = d.where(col("Doi").isNotNull).rdd.map { p: Papers => Tuple2(p.Doi, p) }.reduceByKey {case (p1:Papers, p2:Papers) =>
var r = if (p1==null) p2 else p1 var r = if (p1==null) p2 else p1
if (p1!=null && p2!=null ) if (p1.CreatedDate.before(p2.CreatedDate)) if (p1!=null && p2!=null ) if (p1.CreatedDate.before(p2.CreatedDate))