removed duplicate orcid data from the authors set

Enrico Ottonello 2021-03-25 11:20:52 +01:00
parent 20c0438f11
commit ebd67b8c8f
2 changed files with 74 additions and 10 deletions

SparkUpdateOrcidAuthors.java

@@ -2,8 +2,10 @@
 package eu.dnetlib.doiboost.orcid;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static org.apache.spark.sql.functions.*;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -12,6 +14,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.sql.Dataset;
@@ -125,7 +128,7 @@ public class SparkUpdateOrcidAuthors {
                     .map(item -> OBJECT_MAPPER.readValue(item, AuthorSummary.class))
                     .rdd(),
                 Encoders.bean(AuthorSummary.class));
-            currentAuthorSummaryDS
+            Dataset<AuthorSummary> mergedAuthorSummaryDS = currentAuthorSummaryDS
                 .joinWith(
                     downloadedAuthorSummaryDS,
                     currentAuthorSummaryDS
@@ -150,18 +153,79 @@ public class SparkUpdateOrcidAuthors {
                         return null;
                     },
                     Encoders.bean(AuthorSummary.class))
-                .filter(Objects::nonNull)
+                .filter(Objects::nonNull);
+
+            long mergedCount = mergedAuthorSummaryDS.count();
+
+            Dataset<AuthorSummary> base64DedupedDS = mergedAuthorSummaryDS.dropDuplicates("base64CompressData");
+
+            List<String> dupOids = base64DedupedDS
+                .groupBy("authorData.oid")
+                .agg(count("authorData.oid").alias("oidOccurrenceCount"))
+                .where("oidOccurrenceCount > 1")
+                .select("oid")
+                .toJavaRDD()
+                .map(row -> row.get(0).toString())
+                .collect();
+
+            JavaRDD<AuthorSummary> dupAuthors = base64DedupedDS
+                .toJavaRDD()
+                .filter(
+                    authorSummary -> (Objects.nonNull(authorSummary.getAuthorData())
+                        && Objects.nonNull(authorSummary.getAuthorData().getOid())))
+                .filter(authorSummary -> dupOids.contains(authorSummary.getAuthorData().getOid()));
+
+            Dataset<AuthorSummary> dupAuthorSummaryDS = spark
+                .createDataset(
+                    dupAuthors.rdd(),
+                    Encoders.bean(AuthorSummary.class));
+
+            List<Tuple2<String, String>> lastModifiedAuthors = dupAuthorSummaryDS
+                .groupBy("authorData.oid")
+                .agg(array_max(collect_list("downloadDate")))
+                .map(
+                    row -> new Tuple2<>(row.get(0).toString(), row.get(1).toString()),
+                    Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
+                .toJavaRDD()
+                .collect();
+
+            JavaRDD<AuthorSummary> lastDownloadedAuthors = base64DedupedDS
+                .toJavaRDD()
+                .filter(
+                    authorSummary -> (Objects.nonNull(authorSummary.getAuthorData())
+                        && Objects.nonNull(authorSummary.getAuthorData().getOid())))
+                .filter(authorSummary -> {
+                    boolean oidFound = lastModifiedAuthors
+                        .stream()
+                        .filter(a -> a._1().equals(authorSummary.getAuthorData().getOid()))
+                        .count() == 1;
+                    boolean tsFound = lastModifiedAuthors
+                        .stream()
+                        .filter(
+                            a -> a._1().equals(authorSummary.getAuthorData().getOid()) &&
+                                a._2().equals(authorSummary.getDownloadDate()))
+                        .count() == 1;
+                    return (oidFound && tsFound) || (!oidFound);
+                });
+
+            Dataset<AuthorSummary> cleanedDS = spark
+                .createDataset(
+                    lastDownloadedAuthors.rdd(),
+                    Encoders.bean(AuthorSummary.class))
+                .dropDuplicates("downloadDate", "authorData");
+
+            cleanedDS
                 .toJavaRDD()
                 .map(authorSummary -> OBJECT_MAPPER.writeValueAsString(authorSummary))
                 .saveAsTextFile(workingPath.concat("orcid_dataset/new_authors"), GzipCodec.class);
+            long cleanedDSCount = cleanedDS.count();
 
-            logger.info("oldAuthorsFoundAcc: " + oldAuthorsFoundAcc.value().toString());
-            logger.info("newAuthorsFoundAcc: " + newAuthorsFoundAcc.value().toString());
-            logger.info("updatedAuthorsFoundAcc: " + updatedAuthorsFoundAcc.value().toString());
-            logger.info("errorCodeFoundAcc: " + errorCodeAuthorsFoundAcc.value().toString());
-            logger.info("errorLoadingJsonFoundAcc: " + errorLoadingAuthorsJsonFoundAcc.value().toString());
-            logger.info("errorParsingXMLFoundAcc: " + errorParsingAuthorsXMLFoundAcc.value().toString());
+            logger.info("report_oldAuthorsFoundAcc: " + oldAuthorsFoundAcc.value().toString());
+            logger.info("report_newAuthorsFoundAcc: " + newAuthorsFoundAcc.value().toString());
+            logger.info("report_updatedAuthorsFoundAcc: " + updatedAuthorsFoundAcc.value().toString());
+            logger.info("report_errorCodeFoundAcc: " + errorCodeAuthorsFoundAcc.value().toString());
+            logger.info("report_errorLoadingJsonFoundAcc: " + errorLoadingAuthorsJsonFoundAcc.value().toString());
+            logger.info("report_errorParsingXMLFoundAcc: " + errorParsingAuthorsXMLFoundAcc.value().toString());
+            logger.info("report_merged_count: " + mergedCount);
+            logger.info("report_cleaned_count: " + cleanedDSCount);
         });
     }
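The new logic above deduplicates the merged author set in two passes: exact copies are dropped on base64CompressData, and for any ORCID iD (oid) that still occurs more than once only the record with the latest downloadDate survives before the result is written to orcid_dataset/new_authors. Below is a minimal, stand-alone sketch of that keep-latest-per-oid step in plain Spark; it deliberately uses a window function instead of the commit's collect-and-filter implementation, and the KeepLatestPerOidSketch class, the Rec bean and the sample values are hypothetical.

// Sketch only: keep, for each oid, the record with the newest downloadDate.
// Not the project code; class, bean and data are invented for illustration.
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

public class KeepLatestPerOidSketch {

    // Hypothetical stand-in for AuthorSummary: an ORCID iD plus its download timestamp.
    public static class Rec implements Serializable {
        private String oid;
        private String downloadDate;

        public Rec() {
        }

        public Rec(String oid, String downloadDate) {
            this.oid = oid;
            this.downloadDate = downloadDate;
        }

        public String getOid() {
            return oid;
        }

        public void setOid(String oid) {
            this.oid = oid;
        }

        public String getDownloadDate() {
            return downloadDate;
        }

        public void setDownloadDate(String downloadDate) {
            this.downloadDate = downloadDate;
        }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("keep-latest-per-oid-sketch")
            .master("local[*]")
            .getOrCreate();

        Dataset<Rec> recs = spark
            .createDataset(
                Arrays.asList(
                    new Rec("0000-0001", "2021-03-01 10:00:00"),
                    new Rec("0000-0001", "2021-03-20 09:30:00"), // newer download of the same author
                    new Rec("0000-0002", "2021-02-11 18:45:00")),
                Encoders.bean(Rec.class));

        // Rank each oid's records by downloadDate, newest first
        // (ISO-like timestamps compare correctly as strings).
        WindowSpec latestFirst = Window.partitionBy("oid").orderBy(col("downloadDate").desc());

        // Keep only the newest record per oid, then return to the typed view.
        Dataset<Rec> deduped = recs
            .withColumn("rn", row_number().over(latestFirst))
            .where(col("rn").equalTo(1))
            .drop("rn")
            .as(Encoders.bean(Rec.class));

        deduped.show(false);
        spark.stop();
    }
}

On a tiny dataset the driver-side collect used in the commit is fine; the window-function form shown here keeps the whole operation distributed, which matters once the duplicate set no longer fits comfortably on the driver.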