From ebd67b8c8f68e251e745a3474e8180d9a2a9f4ca Mon Sep 17 00:00:00 2001
From: Enrico Ottonello
Date: Thu, 25 Mar 2021 11:20:52 +0100
Subject: [PATCH] removed duplicate orcid data from the authors set

---
 .../orcid/SparkUpdateOrcidAuthors.java        | 82 +++++++++++++++++--
 .../oozie_app/workflow.xml                    |  2 +-
 2 files changed, 74 insertions(+), 10 deletions(-)

diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java
index 9d7fee053..0eb844fe2 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidAuthors.java
@@ -2,8 +2,10 @@
 package eu.dnetlib.doiboost.orcid;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static org.apache.spark.sql.functions.*;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 
@@ -12,6 +14,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.sql.Dataset;
@@ -125,7 +128,7 @@ public class SparkUpdateOrcidAuthors {
 							.map(item -> OBJECT_MAPPER.readValue(item, AuthorSummary.class))
 							.rdd(),
 						Encoders.bean(AuthorSummary.class));
-				currentAuthorSummaryDS
+				Dataset<AuthorSummary> mergedAuthorSummaryDS = currentAuthorSummaryDS
 					.joinWith(
 						downloadedAuthorSummaryDS,
 						currentAuthorSummaryDS
@@ -150,18 +153,79 @@ public class SparkUpdateOrcidAuthors {
 							return null;
 						},
 						Encoders.bean(AuthorSummary.class))
-					.filter(Objects::nonNull)
+					.filter(Objects::nonNull);
+
+				long mergedCount = mergedAuthorSummaryDS.count();
+
+				Dataset<AuthorSummary> base64DedupedDS = mergedAuthorSummaryDS.dropDuplicates("base64CompressData");
+
+				List<String> dupOids = base64DedupedDS
+					.groupBy("authorData.oid")
+					.agg(count("authorData.oid").alias("oidOccurrenceCount"))
+					.where("oidOccurrenceCount > 1")
+					.select("oid")
+					.toJavaRDD()
+					.map(row -> row.get(0).toString())
+					.collect();
+
+				JavaRDD<AuthorSummary> dupAuthors = base64DedupedDS
+					.toJavaRDD()
+					.filter(
+						authorSummary -> (Objects.nonNull(authorSummary.getAuthorData())
+							&& Objects.nonNull(authorSummary.getAuthorData().getOid())))
+					.filter(authorSummary -> dupOids.contains(authorSummary.getAuthorData().getOid()));
+
+				Dataset<AuthorSummary> dupAuthorSummaryDS = spark
+					.createDataset(
+						dupAuthors.rdd(),
+						Encoders.bean(AuthorSummary.class));
+				List<Tuple2<String, String>> lastModifiedAuthors = dupAuthorSummaryDS
+					.groupBy("authorData.oid")
+					.agg(array_max(collect_list("downloadDate")))
+					.map(
+						row -> new Tuple2<>(row.get(0).toString(), row.get(1).toString()),
+						Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
+					.toJavaRDD()
+					.collect();
+
+				JavaRDD<AuthorSummary> lastDownloadedAuthors = base64DedupedDS
+					.toJavaRDD()
+					.filter(
+						authorSummary -> (Objects.nonNull(authorSummary.getAuthorData())
+							&& Objects.nonNull(authorSummary.getAuthorData().getOid())))
+					.filter(authorSummary -> {
+						boolean oidFound = lastModifiedAuthors
+							.stream()
+							.filter(a -> a._1().equals(authorSummary.getAuthorData().getOid()))
+							.count() == 1;
+						boolean tsFound = lastModifiedAuthors
+							.stream()
+							.filter(
+								a -> a._1().equals(authorSummary.getAuthorData().getOid()) &&
+									a._2().equals(authorSummary.getDownloadDate()))
+							.count() == 1;
+						return (oidFound && tsFound) || (!oidFound);
+					});
+
+				Dataset<AuthorSummary> cleanedDS = spark
+					.createDataset(
+						lastDownloadedAuthors.rdd(),
+						Encoders.bean(AuthorSummary.class))
+					.dropDuplicates("downloadDate", "authorData");
+				cleanedDS
 					.toJavaRDD()
 					.map(authorSummary -> OBJECT_MAPPER.writeValueAsString(authorSummary))
 					.saveAsTextFile(workingPath.concat("orcid_dataset/new_authors"), GzipCodec.class);
+				long cleanedDSCount = cleanedDS.count();
 
-				logger.info("oldAuthorsFoundAcc: " + oldAuthorsFoundAcc.value().toString());
-				logger.info("newAuthorsFoundAcc: " + newAuthorsFoundAcc.value().toString());
-				logger.info("updatedAuthorsFoundAcc: " + updatedAuthorsFoundAcc.value().toString());
-				logger.info("errorCodeFoundAcc: " + errorCodeAuthorsFoundAcc.value().toString());
-				logger.info("errorLoadingJsonFoundAcc: " + errorLoadingAuthorsJsonFoundAcc.value().toString());
-				logger.info("errorParsingXMLFoundAcc: " + errorParsingAuthorsXMLFoundAcc.value().toString());
-
+				logger.info("report_oldAuthorsFoundAcc: " + oldAuthorsFoundAcc.value().toString());
+				logger.info("report_newAuthorsFoundAcc: " + newAuthorsFoundAcc.value().toString());
+				logger.info("report_updatedAuthorsFoundAcc: " + updatedAuthorsFoundAcc.value().toString());
+				logger.info("report_errorCodeFoundAcc: " + errorCodeAuthorsFoundAcc.value().toString());
+				logger.info("report_errorLoadingJsonFoundAcc: " + errorLoadingAuthorsJsonFoundAcc.value().toString());
+				logger.info("report_errorParsingXMLFoundAcc: " + errorParsingAuthorsXMLFoundAcc.value().toString());
+				logger.info("report_merged_count: " + mergedCount);
+				logger.info("report_cleaned_count: " + cleanedDSCount);
 			});
 
 	}
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml
index 9cb917251..fa161ad35 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_updates_download/oozie_app/workflow.xml
@@ -315,6 +315,6 @@
-</workflow-app>
+</workflow-app>
\ No newline at end of file
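
Companion sketch (not part of the patch itself): the selection rule the new Spark code implements is, after dropping byte-identical records with dropDuplicates("base64CompressData"), to keep for each ORCID iD (authorData.oid) only the record with the most recent downloadDate. The same rule can be illustrated outside Spark with plain Java streams. This is a minimal, hypothetical example: the Summary class and sample values below are invented stand-ins for the project's AuthorSummary bean, and it assumes downloadDate is an ISO-8601 UTC string, so lexicographic comparison matches chronological order (the same assumption the patch relies on when it takes array_max over collect_list("downloadDate")).

	import java.util.Arrays;
	import java.util.List;
	import java.util.Map;
	import java.util.stream.Collectors;

	public class KeepLatestPerOid {

		// Invented stand-in for the real AuthorSummary/AuthorData beans.
		static class Summary {
			final String oid;          // ORCID iD (authorData.oid in the patch)
			final String downloadDate; // ISO-8601 UTC timestamp

			Summary(String oid, String downloadDate) {
				this.oid = oid;
				this.downloadDate = downloadDate;
			}
		}

		public static void main(String[] args) {
			List<Summary> merged = Arrays.asList(
				new Summary("0000-0001-0000-0001", "2020-11-18T14:31:27Z"),
				new Summary("0000-0001-0000-0001", "2021-02-19T07:02:11Z"), // newer duplicate wins
				new Summary("0000-0002-0000-0002", "2021-01-05T10:00:00Z"));

			// Group by oid and keep the record with the maximum downloadDate;
			// for ISO-8601 strings, lexicographic max equals chronological max.
			Map<String, Summary> latestPerOid = merged
				.stream()
				.collect(
					Collectors.toMap(
						s -> s.oid,
						s -> s,
						(a, b) -> a.downloadDate.compareTo(b.downloadDate) >= 0 ? a : b));

			latestPerOid.values().forEach(s -> System.out.println(s.oid + " -> " + s.downloadDate));
		}
	}

As in this toy, records whose oid occurs only once pass through unchanged (that is what the (oidFound && tsFound) || (!oidFound) filter preserves); the patch additionally finishes with dropDuplicates("downloadDate", "authorData") as a final safety net against records that tie on both fields.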