From efe4c2a9c5b97d3634fdbe5d42b4d708b5a4861a Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Sat, 12 Dec 2020 02:06:21 +0100 Subject: [PATCH] authors and works are now updated in two separate spark actions of the wf --- .../dhp/schema/orcid/AuthorSummary.java | 4 +- .../dnetlib/dhp/schema/orcid/ExternalId.java | 4 +- .../dnetlib/dhp/schema/orcid/OrcidData.java | 4 +- .../dhp/schema/orcid/PublicationDate.java | 4 +- .../eu/dnetlib/dhp/schema/orcid/Work.java | 4 +- .../orcid/SparkUpdateOrcidAuthors.java | 178 +++++++++++++++++ .../orcid/SparkUpdateOrcidDatasets.java | 104 +++++----- .../doiboost/orcid/SparkUpdateOrcidWorks.java | 181 ++++++++++++++++++ .../orcid_update/oozie_app/workflow.xml | 79 +++++++- .../oozie_app/workflow.xml | 17 +- 10 files changed, 504 insertions(+), 75 deletions(-) create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidWorks.java diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/AuthorSummary.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/AuthorSummary.java index 1f773b6c94..813aead490 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/AuthorSummary.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/AuthorSummary.java @@ -1,7 +1,9 @@ package eu.dnetlib.dhp.schema.orcid; -public class AuthorSummary extends OrcidData { +import java.io.Serializable; + +public class AuthorSummary extends OrcidData implements Serializable { AuthorData authorData; AuthorHistory authorHistory; diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/ExternalId.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/ExternalId.java index 8bb750b2a3..d8f001aa5a 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/ExternalId.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/ExternalId.java @@ -1,11 +1,13 @@ package eu.dnetlib.dhp.schema.orcid; +import java.io.Serializable; + /** * This class models the data related to external id, that are retrieved from an orcid publication */ -public class ExternalId { +public class ExternalId implements Serializable { private String type; private String value; private String relationShip; diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidData.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidData.java index bc581df17a..606eea6a8e 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidData.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidData.java @@ -1,7 +1,9 @@ package eu.dnetlib.dhp.schema.orcid; -public class OrcidData { +import java.io.Serializable; + +public class OrcidData implements Serializable { protected String base64CompressData; protected String statusCode; protected String downloadDate; diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/PublicationDate.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/PublicationDate.java index 1d44676a32..01972ce95a 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/PublicationDate.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/PublicationDate.java @@ -1,11 +1,13 @@ package eu.dnetlib.dhp.schema.orcid; +import java.io.Serializable; + /** * This class models the data related to a publication date, that are retrieved from an orcid publication */ -public class PublicationDate { +public class PublicationDate implements Serializable { private String year; private String month; private String day; diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/Work.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/Work.java index a0953a4652..c557eb5d2f 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/Work.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/Work.java @@ -1,7 +1,9 @@ package eu.dnetlib.dhp.schema.orcid; -public class Work extends OrcidData { +import java.io.Serializable; + +public class Work extends OrcidData implements Serializable { WorkDetail workDetail; public WorkDetail getWorkDetail() { diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java new file mode 100644 index 0000000000..4dbc403013 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java @@ -0,0 +1,178 @@ + +package eu.dnetlib.doiboost.orcid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.util.LongAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.orcid.AuthorSummary; +import eu.dnetlib.doiboost.orcid.xml.XMLRecordParser; +import scala.Tuple2; + +public class SparkUpdateOrcidAuthors { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .setSerializationInclusion(JsonInclude.Include.NON_NULL); + + public static void main(String[] args) throws IOException, Exception { + Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidDatasets.class); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkUpdateOrcidDatasets.class + .getResourceAsStream( + "/eu/dnetlib/dhp/doiboost/download_orcid_data.json"))); + parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + final String workingPath = parser.get("workingPath"); +// final String outputPath = parser.get("outputPath"); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + LongAccumulator oldAuthorsFoundAcc = spark + .sparkContext() + .longAccumulator("old_authors_found"); + LongAccumulator updatedAuthorsFoundAcc = spark + .sparkContext() + .longAccumulator("updated_authors_found"); + LongAccumulator newAuthorsFoundAcc = spark + .sparkContext() + .longAccumulator("new_authors_found"); + LongAccumulator errorCodeAuthorsFoundAcc = spark + .sparkContext() + .longAccumulator("error_code_authors_found"); + LongAccumulator errorLoadingAuthorsJsonFoundAcc = spark + .sparkContext() + .longAccumulator("error_loading_authors_json_found"); + LongAccumulator errorParsingAuthorsXMLFoundAcc = spark + .sparkContext() + .longAccumulator("error_parsing_authors_xml_found"); + + Function, AuthorSummary> retrieveAuthorSummaryFunction = data -> { + AuthorSummary authorSummary = new AuthorSummary(); + String orcidId = data._1().toString(); + String jsonData = data._2().toString(); + JsonElement jElement = new JsonParser().parse(jsonData); + String statusCode = getJsonValue(jElement, "statusCode"); + String downloadDate = getJsonValue(jElement, "lastModifiedDate"); + if (statusCode.equals("200")) { + String compressedData = getJsonValue(jElement, "compressedData"); + if (StringUtils.isEmpty(compressedData)) { + errorLoadingAuthorsJsonFoundAcc.add(1); + } else { + String xmlAuthor = ArgumentApplicationParser.decompressValue(compressedData); + try { + authorSummary = XMLRecordParser + .VTDParseAuthorSummary(xmlAuthor.getBytes()); + authorSummary.setStatusCode(statusCode); + authorSummary.setDownloadDate("2020-11-18 00:00:05.644768"); + authorSummary.setBase64CompressData(compressedData); + return authorSummary; + } catch (Exception e) { + logger.error("parsing xml " + orcidId + " [" + jsonData + "]", e); + errorParsingAuthorsXMLFoundAcc.add(1); + } + } + } else { + authorSummary.setStatusCode(statusCode); + authorSummary.setDownloadDate("2020-11-18 00:00:05.644768"); + errorCodeAuthorsFoundAcc.add(1); + } + return authorSummary; + }; + + Dataset downloadedAuthorSummaryDS = spark + .createDataset( + sc + .sequenceFile(workingPath + "downloads/updated_authors/*", Text.class, Text.class) + .map(retrieveAuthorSummaryFunction) + .rdd(), + Encoders.bean(AuthorSummary.class)); + Dataset currentAuthorSummaryDS = spark + .createDataset( + sc + .textFile(workingPath.concat("orcid_dataset/authors/*")) + .map(item -> OBJECT_MAPPER.readValue(item, AuthorSummary.class)) + .rdd(), + Encoders.bean(AuthorSummary.class)); + currentAuthorSummaryDS + .joinWith( + downloadedAuthorSummaryDS, + currentAuthorSummaryDS + .col("authorData.oid") + .equalTo(downloadedAuthorSummaryDS.col("authorData.oid")), + "full_outer") + .map(value -> { + Optional opCurrent = Optional.ofNullable(value._1()); + Optional opDownloaded = Optional.ofNullable(value._2()); + if (!opCurrent.isPresent()) { + newAuthorsFoundAcc.add(1); + return opDownloaded.get(); + } + if (!opDownloaded.isPresent()) { + oldAuthorsFoundAcc.add(1); + return opCurrent.get(); + } + if (opCurrent.isPresent() && opDownloaded.isPresent()) { + updatedAuthorsFoundAcc.add(1); + return opDownloaded.get(); + } + return null; + }, + Encoders.bean(AuthorSummary.class)) + .filter(Objects::nonNull) + .toJavaRDD() + .map(authorSummary -> OBJECT_MAPPER.writeValueAsString(authorSummary)) + .saveAsTextFile(workingPath.concat("orcid_dataset/new_authors"), GzipCodec.class); + + logger.info("oldAuthorsFoundAcc: " + oldAuthorsFoundAcc.value().toString()); + logger.info("newAuthorsFoundAcc: " + newAuthorsFoundAcc.value().toString()); + logger.info("updatedAuthorsFoundAcc: " + updatedAuthorsFoundAcc.value().toString()); + logger.info("errorCodeFoundAcc: " + errorCodeAuthorsFoundAcc.value().toString()); + logger.info("errorLoadingJsonFoundAcc: " + errorLoadingAuthorsJsonFoundAcc.value().toString()); + logger.info("errorParsingXMLFoundAcc: " + errorParsingAuthorsXMLFoundAcc.value().toString()); + + }); + } + + private static String getJsonValue(JsonElement jElement, String property) { + if (jElement.getAsJsonObject().has(property)) { + JsonElement name = null; + name = jElement.getAsJsonObject().get(property); + if (name != null && !name.isJsonNull()) { + return name.getAsString(); + } + } + return ""; + } +} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidDatasets.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidDatasets.java index 8e0ddc078c..71c011ebcf 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidDatasets.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidDatasets.java @@ -4,27 +4,23 @@ package eu.dnetlib.doiboost.orcid; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.IOException; -import java.util.*; +import java.util.Objects; +import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.JsonElement; import com.google.gson.JsonParser; @@ -33,15 +29,14 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.orcid.AuthorSummary; import eu.dnetlib.dhp.schema.orcid.Work; import eu.dnetlib.dhp.schema.orcid.WorkDetail; -import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.doiboost.orcid.xml.XMLRecordParser; -import eu.dnetlib.doiboost.orcidnodoi.json.JsonWriter; import eu.dnetlib.doiboost.orcidnodoi.xml.XMLRecordParserNoDoi; import scala.Tuple2; public class SparkUpdateOrcidDatasets { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .setSerializationInclusion(JsonInclude.Include.NON_NULL); public static void main(String[] args) throws IOException, Exception { Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidDatasets.class); @@ -67,31 +62,40 @@ public class SparkUpdateOrcidDatasets { spark -> { JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + LongAccumulator oldAuthorsFoundAcc = spark + .sparkContext() + .longAccumulator("old_authors_found"); + LongAccumulator updatedAuthorsFoundAcc = spark + .sparkContext() + .longAccumulator("updated_authors_found"); + LongAccumulator newAuthorsFoundAcc = spark + .sparkContext() + .longAccumulator("new_authors_found"); LongAccumulator errorCodeAuthorsFoundAcc = spark .sparkContext() .longAccumulator("error_code_authors_found"); LongAccumulator errorLoadingAuthorsJsonFoundAcc = spark .sparkContext() .longAccumulator("error_loading_authors_json_found"); - LongAccumulator errorLoadingAuthorsXMLFoundAcc = spark - .sparkContext() - .longAccumulator("error_loading_authors_xml_found"); LongAccumulator errorParsingAuthorsXMLFoundAcc = spark .sparkContext() .longAccumulator("error_parsing_authors_xml_found"); + LongAccumulator oldWorksFoundAcc = spark + .sparkContext() + .longAccumulator("old_works_found"); LongAccumulator updatedWorksFoundAcc = spark .sparkContext() .longAccumulator("updated_works_found"); + LongAccumulator newWorksFoundAcc = spark + .sparkContext() + .longAccumulator("new_works_found"); LongAccumulator errorCodeWorksFoundAcc = spark .sparkContext() .longAccumulator("error_code_works_found"); LongAccumulator errorLoadingWorksJsonFoundAcc = spark .sparkContext() .longAccumulator("error_loading_works_json_found"); - LongAccumulator errorLoadingWorksXMLFoundAcc = spark - .sparkContext() - .longAccumulator("error_loading_works_xml_found"); LongAccumulator errorParsingWorksXMLFoundAcc = spark .sparkContext() .longAccumulator("error_parsing_works_xml_found"); @@ -138,25 +142,21 @@ public class SparkUpdateOrcidDatasets { // errorLoadingAuthorsJsonFoundAcc.add(1); // } else { // String xmlAuthor = ArgumentApplicationParser.decompressValue(compressedData); -// if (StringUtils.isEmpty(xmlAuthor)) { -// errorLoadingAuthorsXMLFoundAcc.add(1); -// } else { -// try { -// authorSummary = XMLRecordParser -// .VTDParseAuthorSummary(xmlAuthor.getBytes()); -// authorSummary.setStatusCode(statusCode); -// authorSummary.setDownloadDate(downloadDate); -// authorSummary.setBase64CompressData(compressedData); -// return authorSummary; -// } catch (Exception e) { -// logger.error("parsing xml " + orcidId + " [" + jsonData + "]", e); -// errorParsingAuthorsXMLFoundAcc.add(1); -// } +// try { +// authorSummary = XMLRecordParser +// .VTDParseAuthorSummary(xmlAuthor.getBytes()); +// authorSummary.setStatusCode(statusCode); +// authorSummary.setDownloadDate("2020-11-18 00:00:05.644768"); +// authorSummary.setBase64CompressData(compressedData); +// return authorSummary; +// } catch (Exception e) { +// logger.error("parsing xml " + orcidId + " [" + jsonData + "]", e); +// errorParsingAuthorsXMLFoundAcc.add(1); // } // } // } else { // authorSummary.setStatusCode(statusCode); -// authorSummary.setDownloadDate(downloadDate); +// authorSummary.setDownloadDate("2020-11-18 00:00:05.644768"); // errorCodeAuthorsFoundAcc.add(1); // } // return authorSummary; @@ -187,12 +187,15 @@ public class SparkUpdateOrcidDatasets { // Optional opCurrent = Optional.ofNullable(value._1()); // Optional opDownloaded = Optional.ofNullable(value._2()); // if (!opCurrent.isPresent()) { +// newAuthorsFoundAcc.add(1); // return opDownloaded.get(); // } // if (!opDownloaded.isPresent()) { +// oldAuthorsFoundAcc.add(1); // return opCurrent.get(); // } // if (opCurrent.isPresent() && opDownloaded.isPresent()) { +// updatedAuthorsFoundAcc.add(1); // return opDownloaded.get(); // } // return null; @@ -200,12 +203,14 @@ public class SparkUpdateOrcidDatasets { // Encoders.bean(AuthorSummary.class)) // .filter(Objects::nonNull) // .toJavaRDD() -// .map(authorSummary -> JsonWriter.create(authorSummary)) +// .map(authorSummary -> OBJECT_MAPPER.writeValueAsString(authorSummary)) // .saveAsTextFile(workingPath.concat("orcid_dataset/new_authors"), GzipCodec.class); // +// logger.info("oldAuthorsFoundAcc: " + oldAuthorsFoundAcc.value().toString()); +// logger.info("newAuthorsFoundAcc: " + newAuthorsFoundAcc.value().toString()); +// logger.info("updatedAuthorsFoundAcc: " + updatedAuthorsFoundAcc.value().toString()); // logger.info("errorCodeFoundAcc: " + errorCodeAuthorsFoundAcc.value().toString()); // logger.info("errorLoadingJsonFoundAcc: " + errorLoadingAuthorsJsonFoundAcc.value().toString()); -// logger.info("errorLoadingXMLFoundAcc: " + errorLoadingAuthorsXMLFoundAcc.value().toString()); // logger.info("errorParsingXMLFoundAcc: " + errorParsingAuthorsXMLFoundAcc.value().toString()); Function retrieveWorkFunction = jsonData -> { @@ -214,27 +219,22 @@ public class SparkUpdateOrcidDatasets { String statusCode = getJsonValue(jElement, "statusCode"); work.setStatusCode(statusCode); String downloadDate = getJsonValue(jElement, "lastModifiedDate"); - work.setDownloadDate(downloadDate); + work.setDownloadDate("2020-11-18 00:00:05.644768"); if (statusCode.equals("200")) { String compressedData = getJsonValue(jElement, "compressedData"); if (StringUtils.isEmpty(compressedData)) { errorLoadingWorksJsonFoundAcc.add(1); } else { String xmlWork = ArgumentApplicationParser.decompressValue(compressedData); - if (StringUtils.isEmpty(xmlWork)) { - errorLoadingWorksXMLFoundAcc.add(1); - } else { - try { - WorkDetail workDetail = XMLRecordParserNoDoi - .VTDParseWorkData(xmlWork.getBytes()); - work.setWorkDetail(workDetail); - work.setBase64CompressData(compressedData); - updatedWorksFoundAcc.add(1); - return work; - } catch (Exception e) { - logger.error("parsing xml [" + jsonData + "]", e); - errorParsingWorksXMLFoundAcc.add(1); - } + try { + WorkDetail workDetail = XMLRecordParserNoDoi + .VTDParseWorkData(xmlWork.getBytes()); + work.setWorkDetail(workDetail); + work.setBase64CompressData(compressedData); + return work; + } catch (Exception e) { + logger.error("parsing xml [" + jsonData + "]", e); + errorParsingWorksXMLFoundAcc.add(1); } } } else { @@ -275,12 +275,15 @@ public class SparkUpdateOrcidDatasets { Optional opCurrent = Optional.ofNullable(value._1()); Optional opDownloaded = Optional.ofNullable(value._2()); if (!opCurrent.isPresent()) { + newWorksFoundAcc.add(1); return opDownloaded.get(); } if (!opDownloaded.isPresent()) { + oldWorksFoundAcc.add(1); return opCurrent.get(); } if (opCurrent.isPresent() && opDownloaded.isPresent()) { + updatedWorksFoundAcc.add(1); return opDownloaded.get(); } return null; @@ -288,13 +291,14 @@ public class SparkUpdateOrcidDatasets { Encoders.bean(Work.class)) .filter(Objects::nonNull) .toJavaRDD() - .map(work -> JsonWriter.create(work)) + .map(work -> OBJECT_MAPPER.writeValueAsString(work)) .saveAsTextFile(workingPath.concat("orcid_dataset/new_works"), GzipCodec.class); + logger.info("oldWorksFoundAcc: " + oldWorksFoundAcc.value().toString()); + logger.info("newWorksFoundAcc: " + newWorksFoundAcc.value().toString()); logger.info("updatedWorksFoundAcc: " + updatedWorksFoundAcc.value().toString()); logger.info("errorCodeWorksFoundAcc: " + errorCodeWorksFoundAcc.value().toString()); logger.info("errorLoadingJsonWorksFoundAcc: " + errorLoadingWorksJsonFoundAcc.value().toString()); - logger.info("errorLoadingXMLWorksFoundAcc: " + errorLoadingWorksXMLFoundAcc.value().toString()); logger.info("errorParsingXMLWorksFoundAcc: " + errorParsingWorksXMLFoundAcc.value().toString()); }); diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidWorks.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidWorks.java new file mode 100644 index 0000000000..d06aac98a2 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidWorks.java @@ -0,0 +1,181 @@ + +package eu.dnetlib.doiboost.orcid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.util.LongAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.orcid.Work; +import eu.dnetlib.dhp.schema.orcid.WorkDetail; +import eu.dnetlib.doiboost.orcidnodoi.xml.XMLRecordParserNoDoi; + +public class SparkUpdateOrcidWorks { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .setSerializationInclusion(JsonInclude.Include.NON_NULL); + + public static void main(String[] args) throws IOException, Exception { + Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidDatasets.class); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkUpdateOrcidDatasets.class + .getResourceAsStream( + "/eu/dnetlib/dhp/doiboost/download_orcid_data.json"))); + parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + final String workingPath = parser.get("workingPath"); +// final String outputPath = parser.get("outputPath"); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + LongAccumulator oldWorksFoundAcc = spark + .sparkContext() + .longAccumulator("old_works_found"); + LongAccumulator updatedWorksFoundAcc = spark + .sparkContext() + .longAccumulator("updated_works_found"); + LongAccumulator newWorksFoundAcc = spark + .sparkContext() + .longAccumulator("new_works_found"); + LongAccumulator errorCodeWorksFoundAcc = spark + .sparkContext() + .longAccumulator("error_code_works_found"); + LongAccumulator errorLoadingWorksJsonFoundAcc = spark + .sparkContext() + .longAccumulator("error_loading_works_json_found"); + LongAccumulator errorParsingWorksXMLFoundAcc = spark + .sparkContext() + .longAccumulator("error_parsing_works_xml_found"); + + Function retrieveWorkFunction = jsonData -> { + Work work = new Work(); + JsonElement jElement = new JsonParser().parse(jsonData); + String statusCode = getJsonValue(jElement, "statusCode"); + work.setStatusCode(statusCode); + String downloadDate = getJsonValue(jElement, "lastModifiedDate"); + work.setDownloadDate("2020-11-18 00:00:05.644768"); + if (statusCode.equals("200")) { + String compressedData = getJsonValue(jElement, "compressedData"); + if (StringUtils.isEmpty(compressedData)) { + errorLoadingWorksJsonFoundAcc.add(1); + } else { + String xmlWork = ArgumentApplicationParser.decompressValue(compressedData); + try { + WorkDetail workDetail = XMLRecordParserNoDoi + .VTDParseWorkData(xmlWork.getBytes()); + work.setWorkDetail(workDetail); + work.setBase64CompressData(compressedData); + return work; + } catch (Exception e) { + logger.error("parsing xml [" + jsonData + "]", e); + errorParsingWorksXMLFoundAcc.add(1); + } + } + } else { + errorCodeWorksFoundAcc.add(1); + } + return work; + }; + + Dataset downloadedWorksDS = spark + .createDataset( + sc + .textFile(workingPath + "downloads/updated_works/*") + .map(s -> { + return s.substring(21, s.length() - 1); + }) + .map(retrieveWorkFunction) + .rdd(), + Encoders.bean(Work.class)); + Dataset currentWorksDS = spark + .createDataset( + sc + .textFile(workingPath.concat("orcid_dataset/works/*")) + .map(item -> OBJECT_MAPPER.readValue(item, Work.class)) + .rdd(), + Encoders.bean(Work.class)); + currentWorksDS + .joinWith( + downloadedWorksDS, + currentWorksDS + .col("workDetail.id") + .equalTo(downloadedWorksDS.col("workDetail.id")) + .and( + currentWorksDS + .col("workDetail.oid") + .equalTo(downloadedWorksDS.col("workDetail.oid"))), + "full_outer") + .map(value -> { + Optional opCurrent = Optional.ofNullable(value._1()); + Optional opDownloaded = Optional.ofNullable(value._2()); + if (!opCurrent.isPresent()) { + newWorksFoundAcc.add(1); + return opDownloaded.get(); + } + if (!opDownloaded.isPresent()) { + oldWorksFoundAcc.add(1); + return opCurrent.get(); + } + if (opCurrent.isPresent() && opDownloaded.isPresent()) { + updatedWorksFoundAcc.add(1); + return opDownloaded.get(); + } + return null; + }, + Encoders.bean(Work.class)) + .filter(Objects::nonNull) + .toJavaRDD() + .map(work -> OBJECT_MAPPER.writeValueAsString(work)) + .saveAsTextFile(workingPath.concat("orcid_dataset/new_works"), GzipCodec.class); + + logger.info("oldWorksFoundAcc: " + oldWorksFoundAcc.value().toString()); + logger.info("newWorksFoundAcc: " + newWorksFoundAcc.value().toString()); + logger.info("updatedWorksFoundAcc: " + updatedWorksFoundAcc.value().toString()); + logger.info("errorCodeWorksFoundAcc: " + errorCodeWorksFoundAcc.value().toString()); + logger.info("errorLoadingJsonWorksFoundAcc: " + errorLoadingWorksJsonFoundAcc.value().toString()); + logger.info("errorParsingXMLWorksFoundAcc: " + errorParsingWorksXMLFoundAcc.value().toString()); + + }); + } + + private static String getJsonValue(JsonElement jElement, String property) { + if (jElement.getAsJsonObject().has(property)) { + JsonElement name = null; + name = jElement.getAsJsonObject().get(property); + if (name != null && !name.isJsonNull()) { + return name.getAsString(); + } + } + return ""; + } +} diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_update/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_update/oozie_app/workflow.xml index 7e34f67c82..135e6a4c86 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_update/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_update/oozie_app/workflow.xml @@ -55,18 +55,54 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + + + + + + + + + + yarn-cluster cluster - UpdateOrcidDatasets - eu.dnetlib.doiboost.orcid.SparkUpdateOrcidDatasets + UpdateOrcidAuthors + eu.dnetlib.doiboost.orcid.SparkUpdateOrcidAuthors + dhp-doiboost-${projectVersion}.jar + + --conf spark.dynamicAllocation.enabled=true + --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + + -w${workingPath}/ + -n${nameNode} + -f- + -o- + -t- + + + + + + + + yarn-cluster + cluster + UpdateOrcidWorks + eu.dnetlib.doiboost.orcid.SparkUpdateOrcidWorks dhp-doiboost-${projectVersion}.jar --conf spark.dynamicAllocation.enabled=true @@ -88,5 +124,40 @@ + + + + + + + ${workingPath}/orcid_dataset/new_authors/* + ${workingPath}/orcid_dataset/authors + + + + + + + + + + + + ${workingPath}/orcid_dataset/new_works/* + ${workingPath}/orcid_dataset/works + + + + + + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml index 8844a15391..a1537387ea 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml @@ -66,7 +66,7 @@ - + @@ -96,21 +96,6 @@ - - - ${jobTracker} - ${nameNode} - eu.dnetlib.doiboost.orcid.OrcidDownloader - -w${workingPath}/ - -n${nameNode} - -flast_modified.csv.tar - -odownloads/ - -t${token} - - - - - yarn-cluster