From 1b1e9ea67ccef6e7f04184664a2aed707ef4b416 Mon Sep 17 00:00:00 2001
From: Enrico Ottonello
Date: Wed, 2 Dec 2020 23:20:16 +0100
Subject: [PATCH] wf to generate doi_author_list for doiboost; wf to download
 updated works

---
 .../orcid/SparkDownloadOrcidWorks.java        | 224 ++++++++++++++++++
 .../orcid/SparkGenerateDoiAuthorList.java     |  39 ++-
 .../doiboost/orcid/xml/XMLRecordParser.java   |  60 ++++-
 .../SparkGenEnrichedOrcidWorks.java           |   2 +-
 .../doiboost/orcidnodoi/json/JsonWriter.java  |   4 +
 .../orcidnodoi/oaf/PublicationToOaf.java      |  29 +--
 .../gen_doi_author_list_orcid_parameters.json |   2 +
 .../oozie_app/config-default.xml              |  18 --
 .../oozie_app/workflow.xml                    | 148 ++++++++----
 .../doiboost/orcid/OrcidClientTest.java       | 100 ++++++--
 .../orcid/xml/XMLRecordParserTest.java        |  18 +-
 11 files changed, 523 insertions(+), 121 deletions(-)
 create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidWorks.java
 delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/config-default.xml

diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidWorks.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidWorks.java
new file mode 100644
index 000000000..ce111570a
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkDownloadOrcidWorks.java
@@ -0,0 +1,224 @@
+
+package eu.dnetlib.doiboost.orcid;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.util.LongAccumulator;
+import org.mortbay.log.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.ximpleware.NavException;
+import com.ximpleware.ParseException;
+import com.ximpleware.XPathEvalException;
+import com.ximpleware.XPathParseException;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.doiboost.orcid.model.DownloadedRecordData;
+import eu.dnetlib.doiboost.orcid.model.WorkData;
+import eu.dnetlib.doiboost.orcid.xml.XMLRecordParser;
+import scala.Tuple2;
+
+public class SparkDownloadOrcidWorks {
+
+	static Logger logger = LoggerFactory.getLogger(SparkDownloadOrcidWorks.class);
+	static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+	static final String lastUpdate = "2020-09-29 00:00:00";
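+	// works whose last-modified date falls after this threshold are considered
+	// updated and are queued for download (presumably the date of the last harvest)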
+
+	public static void main(String[] args) throws IOException, Exception {
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					SparkDownloadOrcidWorks.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/doiboost/download_orcid_data.json")));
+		parser.parseArgument(args);
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		logger.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+		final String workingPath = parser.get("workingPath");
+		logger.info("workingPath: {}", workingPath);
+//		final String outputPath = parser.get("outputPath");
+		final String outputPath = "downloads/updated_works";
+		logger.info("outputPath: {}", outputPath);
+		final String token = parser.get("token");
+//		final String lambdaFileName = parser.get("lambdaFileName");
+//		logger.info("lambdaFileName: {}", lambdaFileName);
+
+		SparkConf conf = new SparkConf();
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+				LongAccumulator parsedRecordsAcc = spark.sparkContext().longAccumulator("parsed_records");
+				LongAccumulator modifiedRecordsAcc = spark.sparkContext().longAccumulator("to_download_records");
+				LongAccumulator downloadedRecordsAcc = spark.sparkContext().longAccumulator("downloaded_records");
+				LongAccumulator errorHTTP403Acc = spark.sparkContext().longAccumulator("error_HTTP_403");
+				LongAccumulator errorHTTP409Acc = spark.sparkContext().longAccumulator("error_HTTP_409");
+				LongAccumulator errorHTTP503Acc = spark.sparkContext().longAccumulator("error_HTTP_503");
+				LongAccumulator errorHTTP525Acc = spark.sparkContext().longAccumulator("error_HTTP_525");
+				LongAccumulator errorHTTPGenericAcc = spark.sparkContext().longAccumulator("error_HTTP_Generic");
+
+				logger.info("Retrieving updated authors");
+				JavaPairRDD<Text, Text> updatedAuthorsRDD = sc
+					.sequenceFile(workingPath + "downloads/updated_authors/*", Text.class, Text.class);
+				logger.info("Updated authors retrieved: " + updatedAuthorsRDD.count());
+
+				Function<Tuple2<Text, Text>, Iterator<String>> retrieveWorkUrlFunction = data -> {
+					String orcidId = data._1().toString();
+					String jsonData = data._2().toString();
+					List<String> orcidIdWorkId = Lists.newArrayList();
+					Map<String, String> workIdLastModifiedDate = retrieveWorkIdLastModifiedDate(jsonData);
+					workIdLastModifiedDate.forEach((k, v) -> {
+						if (isModified(orcidId, v)) {
+							orcidIdWorkId.add(orcidId.concat("/work/").concat(k));
+						}
+					});
+					return orcidIdWorkId.iterator();
+				};
+
+				List<Iterator<String>> toDownloadWorksRDD = updatedAuthorsRDD
+					.map(retrieveWorkUrlFunction)
+					.take(1000);
+				sc.parallelize(toDownloadWorksRDD).saveAsTextFile(workingPath.concat("downloads/updated_works_test/"));
+
+				Function<Tuple2<String, String>, Tuple2<String, String>> downloadRecordFunction = data -> {
+					String orcidId = data._1().toString();
+					String lastModifiedDate = data._2().toString();
+					final DownloadedRecordData downloaded = new DownloadedRecordData();
+					downloaded.setOrcidId(orcidId);
+					downloaded.setLastModifiedDate(lastModifiedDate);
+					try (CloseableHttpClient client = HttpClients.createDefault()) {
+						HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/work");
+						httpGet.addHeader("Accept", "application/vnd.orcid+xml");
+						httpGet.addHeader("Authorization", String.format("Bearer %s", token));
+						long startReq = System.currentTimeMillis();
+						CloseableHttpResponse response = client.execute(httpGet);
+						long endReq = System.currentTimeMillis();
+						long reqTime = endReq - startReq;
+						if (reqTime < 1000) {
+							Thread.sleep(1000 - reqTime);
+						}
+						int statusCode = response.getStatusLine().getStatusCode();
+						downloaded.setStatusCode(statusCode);
+						if (statusCode != 200) {
+							switch (statusCode) {
+								case 403:
+									errorHTTP403Acc.add(1);
+									break;
+								case 409:
+									errorHTTP409Acc.add(1);
+									break;
+								case 503:
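+									// NOTE: ORCID signals its request rate limit with HTTP 503;
+									// failing fast here lets the task be retried later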
+									errorHTTP503Acc.add(1);
+									throw new RuntimeException("Orcid request rate limit reached (HTTP 503)");
+								case 525:
+									errorHTTP525Acc.add(1);
+									break;
+								default:
+									errorHTTPGenericAcc.add(1);
+									logger
+										.info(
+											"Downloading " + orcidId + " status code: "
+												+ response.getStatusLine().getStatusCode());
+							}
+							return downloaded.toTuple2();
+						}
+						downloadedRecordsAcc.add(1);
+						downloaded
+							.setCompressedData(
+								ArgumentApplicationParser
+									.compressArgument(IOUtils.toString(response.getEntity().getContent())));
+					} catch (Throwable e) {
+						logger.info("Downloading {}: {}", orcidId, e.getMessage());
+						downloaded.setErrorMessage(e.getMessage());
+						return downloaded.toTuple2();
+					}
+					return downloaded.toTuple2();
+				};
+
+//				sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
+
+//				logger.info("Start downloading ...");
+//				updatedAuthorsRDD
+//					.map(downloadRecordFunction)
+//					.mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2())))
+//					.saveAsNewAPIHadoopFile(
+//						workingPath.concat(outputPath),
+//						Text.class,
+//						Text.class,
+//						SequenceFileOutputFormat.class,
+//						sc.hadoopConfiguration());
+//				logger.info("parsedRecordsAcc: " + parsedRecordsAcc.value().toString());
+//				logger.info("modifiedRecordsAcc: " + modifiedRecordsAcc.value().toString());
+//				logger.info("downloadedRecordsAcc: " + downloadedRecordsAcc.value().toString());
+//				logger.info("errorHTTP403Acc: " + errorHTTP403Acc.value().toString());
+//				logger.info("errorHTTP409Acc: " + errorHTTP409Acc.value().toString());
+//				logger.info("errorHTTP503Acc: " + errorHTTP503Acc.value().toString());
+//				logger.info("errorHTTP525Acc: " + errorHTTP525Acc.value().toString());
+//				logger.info("errorHTTPGenericAcc: " + errorHTTPGenericAcc.value().toString());
+			});
+
+	}
+
+	private static boolean isModified(String orcidId, String modifiedDate) {
+		Date modifiedDateDt = null;
+		Date lastUpdateDt = null;
+		try {
+			if (modifiedDate.length() != 19) {
+				modifiedDate = modifiedDate.substring(0, 19);
+			}
+			modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate);
+			lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate);
+		} catch (Exception e) {
+			logger.info("[{}] Parsing date: {}", orcidId, e.getMessage());
+			return true;
+		}
+		return modifiedDateDt.after(lastUpdateDt);
+	}
+
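+	// the input JSON wraps a previously downloaded author summary: a statusCode
+	// plus the XML payload stored in compressed form under 'compressedData'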
+	private static Map<String, String> retrieveWorkIdLastModifiedDate(String json)
+		throws XPathEvalException, NavException, XPathParseException, ParseException {
+		JsonElement jElement = new JsonParser().parse(json);
+		String statusCode = getJsonValue(jElement, "statusCode");
+		if (statusCode.equals("200")) {
+			String compressedData = getJsonValue(jElement, "compressedData");
+			String authorSummary = ArgumentApplicationParser.decompressValue(compressedData);
+			return XMLRecordParser.retrieveWorkIdLastModifiedDate(authorSummary.getBytes());
+		}
+		return new HashMap<>();
+	}
+
+	private static String getJsonValue(JsonElement jElement, String property) {
+		if (jElement.getAsJsonObject().has(property)) {
+			JsonElement name = jElement.getAsJsonObject().get(property);
+			if (name != null && !name.isJsonNull()) {
+				return name.getAsString();
+			}
+		}
+		return "";
+	}
+}
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenerateDoiAuthorList.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenerateDoiAuthorList.java
index 011c153ec..4201ffb07 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenerateDoiAuthorList.java
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenerateDoiAuthorList.java
@@ -3,11 +3,11 @@ package eu.dnetlib.doiboost.orcid;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
+import java.io.BufferedReader;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -19,6 +19,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.slf4j.Logger;
@@ -28,10 +29,14 @@ import com.esotericsoftware.minlog.Log;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
+import com.ximpleware.ParseException;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.parser.utility.VtdException;
 import eu.dnetlib.dhp.schema.orcid.AuthorData;
 import eu.dnetlib.doiboost.orcid.model.WorkData;
+import eu.dnetlib.doiboost.orcid.xml.XMLRecordParser;
+import eu.dnetlib.doiboost.orcidnodoi.json.JsonWriter;
 import scala.Tuple2;
 
 public class SparkGenerateDoiAuthorList {
@@ -56,6 +61,10 @@ public class SparkGenerateDoiAuthorList {
 		logger.info("workingPath: ", workingPath);
 		final String outputDoiAuthorListPath = parser.get("outputDoiAuthorListPath");
 		logger.info("outputDoiAuthorListPath: ", outputDoiAuthorListPath);
+		final String authorsPath = parser.get("authorsPath");
+		logger.info("authorsPath: {}", authorsPath);
+		final String xmlWorksPath = parser.get("xmlWorksPath");
+		logger.info("xmlWorksPath: {}", xmlWorksPath);
 
 		SparkConf conf = new SparkConf();
 		runWithSparkSession(
@@ -65,17 +74,21 @@ public class SparkGenerateDoiAuthorList {
 				JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
 				JavaPairRDD<Text, Text> summariesRDD = sc
-					.sequenceFile(workingPath + "../orcid_summaries/output/authors.seq", Text.class, Text.class);
+					.sequenceFile(workingPath.concat(authorsPath), Text.class, Text.class);
 				Dataset<AuthorData> summariesDataset = spark
 					.createDataset(
 						summariesRDD.map(seq -> loadAuthorFromJson(seq._1(), seq._2())).rdd(),
 						Encoders.bean(AuthorData.class));
 
-				JavaPairRDD<Text, Text> activitiesRDD = sc
-					.sequenceFile(workingPath + "/output/*.seq", Text.class, Text.class);
+				JavaPairRDD<Text, Text> xmlWorksRDD = sc
+					.sequenceFile(workingPath.concat(xmlWorksPath), Text.class, Text.class);
+
 				Dataset<WorkData> activitiesDataset = spark
 					.createDataset(
-						activitiesRDD.map(seq -> loadWorkFromJson(seq._1(), seq._2())).rdd(),
+						xmlWorksRDD
+							.map(seq -> XMLRecordParser.VTDParseWorkData(seq._2().toString().getBytes()))
+							.filter(work -> work != null && work.getErrorCode() == null && work.isDoiFound())
+							.rdd(),
 						Encoders.bean(WorkData.class));
 
 				Function<Tuple2<AuthorData, WorkData>, Tuple2<String, List<AuthorData>>> toAuthorListFunction = data -> {
@@ -135,12 +148,16 @@ public class SparkGenerateDoiAuthorList {
 					}
 					return null;
 				})
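+				// de-duplicate the author list of each DOI: keep only the first
+				// AuthorData encountered for each ORCID id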
+				.mapToPair(s -> {
+					List<AuthorData> authorList = s._2();
+					Set<String> oidsAlreadySeen = new HashSet<>();
+					authorList.removeIf(a -> !oidsAlreadySeen.add(a.getOid()));
+					return new Tuple2<>(s._1(), authorList);
+				})
 				.mapToPair(
 					s -> {
-						ObjectMapper mapper = new ObjectMapper();
-						return new Tuple2<>(s._1(), mapper.writeValueAsString(s._2()));
+						return new Tuple2<>(s._1(), JsonWriter.create(s._2()));
 					})
-				.repartition(10)
 				.saveAsTextFile(workingPath + outputDoiAuthorListPath);
 			});
 
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParser.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParser.java
index cc9abb621..5accb561d 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParser.java
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParser.java
@@ -1,23 +1,17 @@
 
 package eu.dnetlib.doiboost.orcid.xml;
 
-import java.util.Arrays;
-import java.util.List;
+import java.util.*;
 
 import org.mortbay.log.Log;
 
-import com.ximpleware.AutoPilot;
-import com.ximpleware.EOFException;
-import com.ximpleware.EncodingException;
-import com.ximpleware.EntityException;
-import com.ximpleware.ParseException;
-import com.ximpleware.VTDGen;
-import com.ximpleware.VTDNav;
+import com.ximpleware.*;
 
 import eu.dnetlib.dhp.parser.utility.VtdException;
 import eu.dnetlib.dhp.parser.utility.VtdUtilityParser;
 import eu.dnetlib.dhp.schema.orcid.AuthorData;
 import eu.dnetlib.doiboost.orcid.model.WorkData;
+import eu.dnetlib.doiboost.orcidnodoi.model.Contributor;
 
 public class XMLRecordParser {
 
@@ -32,7 +26,8 @@ public class XMLRecordParser {
 	private static final String NS_RECORD_URL = "http://www.orcid.org/ns/record";
 	private static final String NS_RECORD = "record";
 	private static final String NS_ERROR_URL = "http://www.orcid.org/ns/error";
-
+	private static final String NS_ACTIVITIES = "activities";
+	private static final String NS_ACTIVITIES_URL = "http://www.orcid.org/ns/activities";
 	private static final String NS_WORK = "work";
 	private static final String NS_WORK_URL = "http://www.orcid.org/ns/work";
 
@@ -139,6 +134,12 @@ public class XMLRecordParser {
 		return retrieveOrcidId(bytes, defaultValue, NS_WORK, NS_WORK_URL, "//work:work", "put-code");
 	}
 
+	public static String retrieveWorkIdFromSummary(byte[] bytes, String defaultValue)
+		throws VtdException, ParseException {
+		return retrieveOrcidId(
+			bytes, defaultValue, NS_ACTIVITIES, NS_ACTIVITIES_URL, "//work:work-summary", "put-code");
+	}
+
 	private static String retrieveOrcidId(byte[] bytes, String defaultValue, String ns, String nsUrl, String xpath,
 		String idAttributeName)
 		throws VtdException, ParseException {
@@ -148,6 +149,7 @@ public class XMLRecordParser {
 		final VTDNav vn = vg.getNav();
 		final AutoPilot ap = new AutoPilot(vn);
 		ap.declareXPathNameSpace(ns, nsUrl);
+		ap.declareXPathNameSpace(NS_WORK, NS_WORK_URL);
 		List<VtdUtilityParser.Node> recordNodes = VtdUtilityParser
 			.getTextValuesWithAttributes(
 				ap, vn, xpath, Arrays.asList(idAttributeName));
@@ -157,4 +159,42 @@ public class XMLRecordParser {
 		Log.info("id not found - default: " + defaultValue);
 		return defaultValue;
 	}
+
+	public static Map<String, String> retrieveWorkIdLastModifiedDate(byte[] bytes)
+		throws ParseException, XPathParseException, NavException, XPathEvalException {
+		final VTDGen vg = new VTDGen();
+		vg.setDoc(bytes);
+		vg.parse(true);
+		final VTDNav vn = vg.getNav();
+		final AutoPilot ap = new AutoPilot(vn);
+		ap.declareXPathNameSpace(NS_COMMON, NS_COMMON_URL);
+		ap.declareXPathNameSpace(NS_PERSON, NS_PERSON_URL);
+		ap.declareXPathNameSpace(NS_DETAILS, NS_DETAILS_URL);
+		ap.declareXPathNameSpace(NS_OTHER, NS_OTHER_URL);
+		ap.declareXPathNameSpace(NS_RECORD, NS_RECORD_URL);
+		ap.declareXPathNameSpace(NS_ERROR, NS_ERROR_URL);
+		ap.declareXPathNameSpace(NS_WORK, NS_WORK_URL);
+		ap.declareXPathNameSpace(NS_ACTIVITIES, NS_ACTIVITIES_URL);
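+
+		// one entry per work:work-summary element: the put-code attribute mapped to
+		// the text of its common:last-modified-date child (empty if the date is missing)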
+		Map<String, String> workIdLastModifiedDate = new HashMap<>();
+		ap.selectXPath("//work:work-summary");
+
+		while (ap.evalXPath() != -1) {
+			String workId = "";
+			String lastModifiedDate = "";
+			int attr = vn.getAttrVal("put-code");
+			if (attr > -1) {
+				workId = vn.toNormalizedString(attr);
+				workIdLastModifiedDate.put(workId, "");
+			}
+			if (vn.toElement(VTDNav.FIRST_CHILD, "common:last-modified-date")) {
+				int val = vn.getText();
+				if (val != -1) {
+					lastModifiedDate = vn.toNormalizedString(val);
+					workIdLastModifiedDate.put(workId, lastModifiedDate);
+				}
+				vn.toElement(VTDNav.PARENT);
+			}
+		}
+		return workIdLastModifiedDate;
+	}
 }
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java
index a92d534d8..2d26adce6 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java
@@ -37,7 +37,7 @@ import eu.dnetlib.doiboost.orcidnodoi.similarity.AuthorMatcher;
 import scala.Tuple2;
 
 /**
- * This spark job generates one parquet file, containing orcid publications dataset
+ * This Spark job generates the dataset of ORCID publications without a DOI
  */
 public class SparkGenEnrichedOrcidWorks {
 
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/json/JsonWriter.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/json/JsonWriter.java
index 982fb6316..a89bbc279 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/json/JsonWriter.java
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/json/JsonWriter.java
@@ -22,6 +22,10 @@ public class JsonWriter {
 		return OBJECT_MAPPER.writeValueAsString(authorData);
 	}
 
+	public static String create(Object obj) throws JsonProcessingException {
+		return OBJECT_MAPPER.writeValueAsString(obj);
+	}
+
 	public static String create(WorkData workData) {
 		JsonObject work = new JsonObject();
 		work.addProperty("oid", workData.getOid());
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/oaf/PublicationToOaf.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/oaf/PublicationToOaf.java
index 18fecc6c2..fca00c71c 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/oaf/PublicationToOaf.java
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/oaf/PublicationToOaf.java
@@ -31,6 +31,7 @@ public class PublicationToOaf implements Serializable {
 	static Logger logger = LoggerFactory.getLogger(PublicationToOaf.class);
 
 	public static final String ORCID = "ORCID";
+	public static final String ORCID_PID_TYPE_CLASSNAME = "Open Researcher and Contributor ID";
 	public final static String orcidPREFIX = "orcid_______";
 	public static final String OPENAIRE_PREFIX = "openaire____";
 	public static final String SEPARATOR = "::";
@@ -79,10 +80,10 @@ public class PublicationToOaf implements Serializable {
 		{
 			put("ark".toLowerCase(), new Pair<>("ark", "ark"));
-			put("arxiv".toLowerCase(), new Pair<>("arxiv", "arXiv"));
-			put("pmc".toLowerCase(), new Pair<>("pmc", "pmc"));
-			put("pmid".toLowerCase(), new Pair<>("pmid", "pmid"));
-			put("source-work-id".toLowerCase(), new Pair<>("orcidworkid", "orcidworkid"));
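+			// (classid, classname) pairs: classname now carries the human-readable
+			// label (assumed to match the dnet:pid_types vocabulary entries)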
"arXiv")); + put("pmc".toLowerCase(), new Pair<>("pmc", "PubMed Central ID")); + put("pmid".toLowerCase(), new Pair<>("pmid", "PubMed ID")); + put("source-work-id".toLowerCase(), new Pair<>("orcidworkid", "orcid workid")); put("urn".toLowerCase(), new Pair<>("urn", "urn")); } }; @@ -152,8 +153,8 @@ public class PublicationToOaf implements Serializable { .keySet() .stream() .forEach(jsonExtId -> { - final String classid = externalIds.get(jsonExtId.toLowerCase()).getValue(); - final String classname = externalIds.get(jsonExtId.toLowerCase()).getKey(); + final String classid = externalIds.get(jsonExtId.toLowerCase()).getKey(); + final String classname = externalIds.get(jsonExtId.toLowerCase()).getValue(); final String extId = getStringValue(rootElement, jsonExtId); if (StringUtils.isNotBlank(extId)) { publication @@ -522,21 +523,21 @@ public class PublicationToOaf implements Serializable { sp.setValue(orcidId); final Qualifier q = new Qualifier(); q.setClassid(ORCID.toLowerCase()); - q.setClassname(ORCID.toLowerCase()); + q.setClassname(ORCID_PID_TYPE_CLASSNAME); q.setSchemeid(ModelConstants.DNET_PID_TYPES); q.setSchemename(ModelConstants.DNET_PID_TYPES); sp.setQualifier(q); final DataInfo dataInfo = new DataInfo(); dataInfo.setDeletedbyinference(false); dataInfo.setInferred(false); - dataInfo.setTrust("0.9"); + dataInfo.setTrust("0.91"); dataInfo - .setProvenanceaction( - mapQualifier( - "sysimport:crosswalk:entityregistry", - "Harvested", - "dnet:provenanceActions", - "dnet:provenanceActions")); + .setProvenanceaction( + mapQualifier( + "sysimport:crosswalk:entityregistry", + "Harvested", + "dnet:provenanceActions", + "dnet:provenanceActions")); sp.setDataInfo(dataInfo); return sp; } diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_doi_author_list_orcid_parameters.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_doi_author_list_orcid_parameters.json index b894177b3..41c1a2a7d 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_doi_author_list_orcid_parameters.json +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_doi_author_list_orcid_parameters.json @@ -1,3 +1,5 @@ [{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working path", "paramRequired": true}, + {"paramName":"a", "paramLongName":"authorsPath", "paramDescription": "the path of the authors seq file", "paramRequired": true}, + {"paramName":"xw", "paramLongName":"xmlWorksPath", "paramDescription": "the path of the works xml seq file", "paramRequired": true}, {"paramName":"o", "paramLongName":"outputDoiAuthorListPath", "paramDescription": "the relative folder of the sequencial file to write the data", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/config-default.xml deleted file mode 100644 index 3726022cb..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/config-default.xml +++ /dev/null @@ -1,18 +0,0 @@ - - - jobTracker - hadoop-rm3.garr-pa1.d4science.org:8032 - - - nameNode - hdfs://hadoop-rm1.garr-pa1.d4science.org:8020 - - - queueName - default - - - oozie.action.sharelib.for.spark - spark2 - - \ No newline at end of file diff --git 
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/workflow.xml
index 21d092a83..a466db7f6 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/workflow.xml
@@ -1,55 +1,99 @@
-<workflow-app name="Gen_Doi_Author_List" xmlns="uri:oozie:workflow:0.5">
-    <parameters>
+<workflow-app name="gen_orcid_doi_author_list" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+        <property>
+            <name>spark2MaxExecutors</name>
+            <value>40</value>
+        </property>
+        <property>
+            <name>oozieActionShareLibForSpark2</name>
+            <description>oozie action sharelib for spark 2.*</description>
+        </property>
+        <property>
+            <name>spark2ExtraListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+            <description>spark 2.* extra listeners classname</description>
+        </property>
+        <property>
+            <name>spark2SqlQueryExecutionListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+            <description>spark 2.* sql query execution listeners classname</description>
+        </property>
+        <property>
+            <name>spark2YarnHistoryServerAddress</name>
+            <description>spark 2.* yarn history server address</description>
+        </property>
+        <property>
+            <name>spark2EventLogDir</name>
+            <description>spark 2.* event log dir location</description>
+        </property>
+        <property>
+            <name>workingPath</name>
+            <description>the working dir base path</description>
+        </property>
+    </parameters>
+
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>oozie.action.sharelib.for.spark</name>
+                <value>${oozieActionShareLibForSpark2}</value>
+            </property>
+        </configuration>
+    </global>
-        <property>
-            <name>workingPath</name>
-            <description>the working dir base path</description>
-        </property>
-        <property>
-            <name>sparkDriverMemory</name>
-            <description>memory for driver process</description>
-        </property>
-        <property>
-            <name>sparkExecutorMemory</name>
-            <description>memory for individual executor</description>
-        </property>
-        <property>
-            <name>sparkExecutorCores</name>
-            <description>number of cores used by single executor</description>
-        </property>
-    </parameters>
-
-    <start to="Gen_Doi_Author_List"/>
-
-    <kill name="Kill">
-        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
-    </kill>
-
-    <action name="Gen_Doi_Author_List">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Gen_Doi_Author_List</name>
-            <class>eu.dnetlib.doiboost.orcid.SparkGenerateDoiAuthorList</class>
-            <jar>dhp-doiboost-1.2.1-SNAPSHOT.jar</jar>
-            <spark-opts>--num-executors 10 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory}</spark-opts>
-            <arg>-w</arg><arg>${workingPath}/</arg>
-            <arg>-o</arg><arg>doi_author_list/</arg>
-        </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
-    </action>
-
-    <end name="End"/>
-</workflow-app>
+
+    <start to="GenDoiAuthorList"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="GenDoiAuthorList">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>GenDoiAuthorList</name>
+            <class>eu.dnetlib.doiboost.orcid.SparkGenerateDoiAuthorList</class>
+            <jar>dhp-doiboost-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.dynamicAllocation.enabled=true
+                --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
+            </spark-opts>
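+            <!-- the relative paths passed below are resolved against workingPath
+                 by SparkGenerateDoiAuthorList (cf. gen_doi_author_list_orcid_parameters.json) -->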
+            <arg>-w</arg><arg>${workingPath}/</arg>
+            <arg>-a</arg><arg>authors/authors.seq</arg>
+            <arg>-xw</arg><arg>xml/works/*.seq</arg>
+            <arg>-o</arg><arg>doi_author_list/</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java
index 66a7badb7..fc18132a1 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/OrcidClientTest.java
@@ -13,13 +13,13 @@ import java.text.SimpleDateFormat;
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.temporal.TemporalUnit;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
+import java.util.stream.Collectors;
 
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.compress.utils.Lists;
 import org.apache.commons.io.IOUtils;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
@@ -30,6 +30,7 @@ import org.junit.jupiter.api.Test;
 import org.mortbay.log.Log;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.orcid.AuthorData;
 import jdk.nashorn.internal.ir.annotations.Ignore;
 
 public class OrcidClientTest {
@@ -42,6 +43,9 @@ public class OrcidClientTest {
 	String toNotRetrieveDate = "2019-09-29 23:59:59.000000";
 	String lastUpdate = "2019-09-30 00:00:00";
 	String shortDate = "2020-05-06 16:06:11";
+	final String REQUEST_TYPE_RECORD = "record";
+	final String REQUEST_TYPE_WORK = "work/47652866";
+	final String REQUEST_TYPE_WORKS = "works";
 
 	// curl -i -H "Accept: application/vnd.orcid+xml"
 	// -H 'Authorization: Bearer 78fdb232-7105-4086-8570-e153f4198e3d'
@@ -86,25 +90,25 @@ public class OrcidClientTest {
 
 	@Test
 	private void downloadTest(String orcid) throws Exception {
-		String record = testDownloadRecord(orcid);
-		String filename = "/tmp/downloaded_".concat(orcid).concat(".xml");
+		String record = testDownloadRecord(orcid, REQUEST_TYPE_RECORD);
+		String filename = "/tmp/downloaded_record_".concat(orcid).concat(".xml");
 		File f = new File(filename);
 		OutputStream outStream = new FileOutputStream(f);
 		IOUtils.write(record.getBytes(), outStream);
 	}
 
-	private String testDownloadRecord(String orcidId) throws Exception {
+	private String testDownloadRecord(String orcidId, String dataType) throws Exception {
 		try (CloseableHttpClient client = HttpClients.createDefault()) {
-			HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record");
+			HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/" + dataType);
 			httpGet.addHeader("Accept", "application/vnd.orcid+xml");
 			httpGet.addHeader("Authorization", "Bearer 78fdb232-7105-4086-8570-e153f4198e3d");
-			logToFile("start connection: " + new Date(System.currentTimeMillis()).toString());
+			long start = System.currentTimeMillis();
 			CloseableHttpResponse response = client.execute(httpGet);
-			logToFile("end connection: " + new Date(System.currentTimeMillis()).toString());
+			long end = System.currentTimeMillis();
 			if (response.getStatusLine().getStatusCode() != 200) {
-				System.out
-					.println("Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode());
+				logToFile("Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode());
 			}
+			logToFile(orcidId + " " + dataType + " " + (end - start) / 1000 + " seconds");
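+			// the raw response body is returned as-is; the download tests below
+			// persist it under /tmp for manual inspection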
 			return IOUtils.toString(response.getEntity().getContent());
 		} catch (Throwable e) {
 			e.printStackTrace();
@@ -129,7 +133,7 @@ public class OrcidClientTest {
 			}
 			String[] values = line.split(",");
 			List<String> recordInfo = Arrays.asList(values);
-			testDownloadRecord(recordInfo.get(0));
+			testDownloadRecord(recordInfo.get(0), REQUEST_TYPE_RECORD);
 			long endReq = System.currentTimeMillis();
 			nReqTmp++;
 			if (nReqTmp == REQ_LIMIT) {
@@ -190,7 +194,7 @@ public class OrcidClientTest {
 			.toString(getClass().getResourceAsStream("0000-0003-3028-6161.compressed.base64"));
 		final String recordFromSeqFile = ArgumentApplicationParser.decompressValue(base64CompressedRecord);
 		logToFile("\n\ndownloaded \n\n" + recordFromSeqFile);
-		final String downloadedRecord = testDownloadRecord("0000-0003-3028-6161");
+		final String downloadedRecord = testDownloadRecord("0000-0003-3028-6161", REQUEST_TYPE_RECORD);
 		assertTrue(recordFromSeqFile.equals(downloadedRecord));
 	}
 
@@ -255,7 +259,7 @@ public class OrcidClientTest {
 		logToFile("modified: " + modified);
 	}
 
-	private void logToFile(String log)
+	public static void logToFile(String log)
 		throws IOException {
 		log = log.concat("\n");
 		Path path = Paths.get("/tmp/orcid_log.txt");
@@ -298,4 +302,72 @@ public class OrcidClientTest {
 		}
 		return new String("");
 	}
+
+	@Test
+	private void downloadWorkTest() throws Exception {
+		String orcid = "0000-0003-0015-1952";
+		String record = testDownloadRecord(orcid, REQUEST_TYPE_WORK);
+		String filename = "/tmp/downloaded_work_".concat(orcid).concat(".xml");
+		File f = new File(filename);
+		OutputStream outStream = new FileOutputStream(f);
+		IOUtils.write(record.getBytes(), outStream);
+	}
+
+	@Test
+	private void downloadRecordTest() throws Exception {
+		String orcid = "0000-0001-5004-5918";
+		String record = testDownloadRecord(orcid, REQUEST_TYPE_RECORD);
+		String filename = "/tmp/downloaded_record_".concat(orcid).concat(".xml");
+		File f = new File(filename);
+		OutputStream outStream = new FileOutputStream(f);
+		IOUtils.write(record.getBytes(), outStream);
+	}
+
+	@Test
+	private void downloadWorksTest() throws Exception {
+		String orcid = "0000-0001-5004-5918";
+		String record = testDownloadRecord(orcid, REQUEST_TYPE_WORKS);
+		String filename = "/tmp/downloaded_works_".concat(orcid).concat(".xml");
+		File f = new File(filename);
+		OutputStream outStream = new FileOutputStream(f);
+		IOUtils.write(record.getBytes(), outStream);
+	}
+
+	@Test
+	private void downloadSingleWorkTest() throws Exception {
+		String orcid = "0000-0001-5004-5918";
+		String record = testDownloadRecord(orcid, REQUEST_TYPE_WORK);
+		String filename = "/tmp/downloaded_work_47652866_".concat(orcid).concat(".xml");
+		File f = new File(filename);
+		OutputStream outStream = new FileOutputStream(f);
+		IOUtils.write(record.getBytes(), outStream);
+	}
+
+	@Test
+	public void cleanAuthorListTest() throws Exception {
+		AuthorData a1 = new AuthorData();
+		a1.setOid("1");
+		a1.setName("n1");
+		a1.setSurname("s1");
+		a1.setCreditName("c1");
+		AuthorData a2 = new AuthorData();
+		a2.setOid("1");
+		a2.setName("n1");
+		a2.setSurname("s1");
+		a2.setCreditName("c1");
+		AuthorData a3 = new AuthorData();
+		a3.setOid("3");
+		a3.setName("n3");
+		a3.setSurname("s3");
+		a3.setCreditName("c3");
+		List<AuthorData> list = Lists.newArrayList();
+		list.add(a1);
+		list.add(a2);
+		list.add(a3);
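+
+		// mirrors the de-duplication applied in SparkGenerateDoiAuthorList:
+		// keep only the first AuthorData per ORCID id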
+		Set<String> namesAlreadySeen = new HashSet<>();
+		assertTrue(list.size() == 3);
+		list.removeIf(a -> !namesAlreadySeen.add(a.getOid()));
+		assertTrue(list.size() == 2);
+	}
 }
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParserTest.java b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParserTest.java
index b7be5e5cd..7dc42deb8 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParserTest.java
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParserTest.java
@@ -4,10 +4,13 @@ package eu.dnetlib.doiboost.orcid.xml;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Map;
+
 import org.apache.commons.io.IOUtils;
 import org.junit.jupiter.api.Test;
 
 import eu.dnetlib.dhp.schema.orcid.AuthorData;
+import eu.dnetlib.doiboost.orcid.OrcidClientTest;
 import eu.dnetlib.doiboost.orcid.model.WorkData;
 import eu.dnetlib.doiboost.orcidnodoi.json.JsonWriter;
 
@@ -59,7 +62,7 @@ public class XMLRecordParserTest {
 	}
 
 	@Test
-	public void testOrcidOtherNamesXMLParser() throws Exception {
+	private void testOrcidOtherNamesXMLParser() throws Exception {
 
 		String xml = IOUtils
 			.toString(
@@ -74,4 +77,17 @@ public class XMLRecordParserTest {
 		String jsonData = JsonWriter.create(authorData);
 		assertNotNull(jsonData);
 	}
+
+	@Test
+	public void testWorkIdLastModifiedDateXMLParser() throws Exception {
+
+		String xml = IOUtils
+			.toString(
+				this.getClass().getResourceAsStream("record_8888-8888-8888-8880.xml"));
+//		Map<String, String> workIdLastModifiedDate = XMLRecordParser.retrieveWorkIdLastModifiedDate(xml.getBytes());
+//		String lastModifiedDate = workIdLastModifiedDate.get(0);
+//		OrcidClientTest.logToFile(lastModifiedDate + " -- " + workIdLastModifiedDate.get(lastModifiedDate));
+		String result = XMLRecordParser.retrieveWorkIdFromSummary(xml.getBytes(), "empty");
+		OrcidClientTest.logToFile(result);
+	}
 }