From 196f36c6edd10203ff304e7cd122b3b679593618 Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Thu, 30 Jul 2020 13:38:33 +0200 Subject: [PATCH] fix publication dataset creation --- .../SparkGenEnrichedOrcidWorks.java | 47 +++++-- .../orcidnodoi/oaf/PublicationToOaf.java | 117 +++++++++++++----- .../orcidnodoi/PublicationToOafTest.java | 3 +- 3 files changed, 128 insertions(+), 39 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java index b24e71615..cae5a168f 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java @@ -17,10 +17,12 @@ import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; +import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.google.gson.JsonElement; import com.google.gson.JsonParser; @@ -93,17 +95,48 @@ public class SparkGenEnrichedOrcidWorks { .toJavaRDD(); enrichedWorksRDD.saveAsTextFile(workingPath + outputEnrichedWorksPath); logger.info("Works enriched data saved"); - JavaRDD oafPublicationRDD = enrichedWorksRDD.map(e -> { - JsonElement j = new JsonParser().parse(e._2()); - return (Publication) PublicationToOaf - .generatePublicationActionsFromDump(j.getAsJsonObject()); - }).filter(p -> p != null); + + final LongAccumulator parsedPublications = spark.sparkContext().longAccumulator("parsedPublications"); + final LongAccumulator enrichedPublications = spark + .sparkContext() + .longAccumulator("enrichedPublications"); + final LongAccumulator errorsGeneric = spark.sparkContext().longAccumulator("errorsGeneric"); + final LongAccumulator errorsInvalidTitle = spark.sparkContext().longAccumulator("errorsInvalidTitle"); + final LongAccumulator errorsNotFoundAuthors = spark + .sparkContext() + .longAccumulator("errorsNotFoundAuthors"); + final LongAccumulator errorsInvalidType = spark.sparkContext().longAccumulator("errorsInvalidType"); + final PublicationToOaf publicationToOaf = new PublicationToOaf( + parsedPublications, + enrichedPublications, + errorsGeneric, + errorsInvalidTitle, + errorsNotFoundAuthors, + errorsInvalidType); + JavaRDD oafPublicationRDD = enrichedWorksRDD + .map( + e -> { + return (Publication) publicationToOaf + .generatePublicationActionsFromJson(e._2()); + }) + .filter(p -> p != null); Dataset publicationDataset = spark .createDataset( - oafPublicationRDD.repartition(1).rdd(), + oafPublicationRDD.rdd(), Encoders.bean(Publication.class)); - publicationDataset.write().mode(SaveMode.Overwrite).save(workingPath + "no_doi_dataset/output"); + publicationDataset + .write() + .format("parquet") + .mode(SaveMode.Overwrite) + .save(workingPath + "no_doi_dataset/output"); + + logger.info("parsedPublications: " + parsedPublications.value().toString()); + logger.info("enrichedPublications: " + enrichedPublications.value().toString()); + logger.info("errorsGeneric: " + errorsGeneric.value().toString()); + logger.info("errorsInvalidTitle: " + errorsInvalidTitle.value().toString()); + logger.info("errorsNotFoundAuthors: " + errorsNotFoundAuthors.value().toString()); + logger.info("errorsInvalidType: " + errorsInvalidType.value().toString()); }); } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/oaf/PublicationToOaf.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/oaf/PublicationToOaf.java index 19bfe0f30..448fa9a74 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/oaf/PublicationToOaf.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/oaf/PublicationToOaf.java @@ -3,18 +3,17 @@ package eu.dnetlib.doiboost.orcidnodoi.oaf; import static eu.dnetlib.doiboost.orcidnodoi.util.DumpToActionsUtility.*; +import java.io.Serializable; import java.util.*; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.Gson; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; +import com.google.gson.*; import eu.dnetlib.dhp.common.PacePerson; import eu.dnetlib.dhp.schema.oaf.*; @@ -22,7 +21,7 @@ import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.doiboost.orcidnodoi.util.DumpToActionsUtility; import eu.dnetlib.doiboost.orcidnodoi.util.Pair; -public class PublicationToOaf { +public class PublicationToOaf implements Serializable { static Logger logger = LoggerFactory.getLogger(PublicationToOaf.class); @@ -31,6 +30,37 @@ public class PublicationToOaf { public static final String OPENAIRE_PREFIX = "openaire____"; public static final String SEPARATOR = "::"; + private final LongAccumulator parsedPublications; + private final LongAccumulator enrichedPublications; + private final LongAccumulator errorsGeneric; + private final LongAccumulator errorsInvalidTitle; + private final LongAccumulator errorsNotFoundAuthors; + private final LongAccumulator errorsInvalidType; + + public PublicationToOaf( + LongAccumulator parsedPublications, + LongAccumulator enrichedPublications, + LongAccumulator errorsGeneric, + LongAccumulator errorsInvalidTitle, + LongAccumulator errorsNotFoundAuthors, + LongAccumulator errorsInvalidType) { + this.parsedPublications = parsedPublications; + this.enrichedPublications = enrichedPublications; + this.errorsGeneric = errorsGeneric; + this.errorsInvalidTitle = errorsInvalidTitle; + this.errorsNotFoundAuthors = errorsNotFoundAuthors; + this.errorsInvalidType = errorsInvalidType; + } + + public PublicationToOaf() { + this.parsedPublications = null; + this.enrichedPublications = null; + this.errorsGeneric = null; + this.errorsInvalidTitle = null; + this.errorsNotFoundAuthors = null; + this.errorsInvalidType = null; + } + private static Map> datasources = new HashMap>() { { @@ -69,11 +99,27 @@ public class PublicationToOaf { public static final String PID_TYPES = "dnet:pid_types"; - public static Oaf generatePublicationActionsFromDump(final JsonObject rootElement) { + public Oaf generatePublicationActionsFromJson(final String json) { + try { + if (parsedPublications != null) { + parsedPublications.add(1); + } + JsonElement jElement = new JsonParser().parse(json); + JsonObject jObject = jElement.getAsJsonObject(); + return generatePublicationActionsFromDump(jObject); + } catch (Throwable t) { + logger.error("creating publication: " + t.getMessage()); + if (errorsGeneric != null) { + errorsGeneric.add(1); + } + return null; + } + } + + public Oaf generatePublicationActionsFromDump(final JsonObject rootElement) { logger.debug("generatePublicationActionsFromDump ..."); - if (!isValid(rootElement/* , context */)) { - logger.error("publication not valid"); + if (!isValid(rootElement)) { return null; } @@ -122,8 +168,9 @@ public class PublicationToOaf { // Adding titles final List titles = createRepeatedField(rootElement, "titles"); if (titles == null || titles.isEmpty()) { - logger.error("titles not found"); -// context.incrementCounter("filtered", "title_not_found", 1); + if (errorsInvalidTitle != null) { + errorsInvalidTitle.add(1); + } return null; } Qualifier q = mapQualifier("main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title"); @@ -189,8 +236,9 @@ public class PublicationToOaf { publication.setInstance(Arrays.asList(instance)); } else { - logger.error("type not found"); -// context.incrementCounter("filtered", "type_not_found", 1); + if (errorsInvalidType != null) { + errorsInvalidType.add(1); + } return null; } @@ -199,17 +247,21 @@ public class PublicationToOaf { if (authors != null && authors.size() > 0) { publication.setAuthor(authors); } else { - logger.error("authors not found"); -// context.incrementCounter("filtered", "author_not_found", 1); + if (errorsNotFoundAuthors != null) { + errorsNotFoundAuthors.add(1); + } return null; } String classValue = getDefaultResulttype(cobjValue); publication .setResulttype(mapQualifier(classValue, classValue, "dnet:result_typologies", "dnet:result_typologies")); + if (enrichedPublications != null) { + enrichedPublications.add(1); + } return publication; } - public static List createAuthors(final JsonObject root) { + public List createAuthors(final JsonObject root) { final String authorsJSONFieldName = "contributors"; @@ -273,7 +325,7 @@ public class PublicationToOaf { return null; } - private static List createRepeatedField(final JsonObject rootElement, final String fieldName) { + private List createRepeatedField(final JsonObject rootElement, final String fieldName) { if (!rootElement.has(fieldName)) { return null; } @@ -291,14 +343,14 @@ public class PublicationToOaf { } } - private static String cleanField(String value) { + private String cleanField(String value) { if (value != null && !value.isEmpty() && value.charAt(0) == '"' && value.charAt(value.length() - 1) == '"') { value = value.substring(1, value.length() - 1); } return value; } - private static void settingRelevantDate(final JsonObject rootElement, + private void settingRelevantDate(final JsonObject rootElement, final Publication publication, final String jsonKey, final String dictionaryKey, @@ -322,7 +374,7 @@ public class PublicationToOaf { } } - private static String getPublicationDate(final JsonObject rootElement, + private String getPublicationDate(final JsonObject rootElement, final String jsonKey) { JsonObject pubDateJson = null; @@ -358,24 +410,27 @@ public class PublicationToOaf { return null; } - protected static boolean isValid(final JsonObject rootElement/* , final Reporter context */) { + protected boolean isValid(final JsonObject rootElement/* , final Reporter context */) { final String type = getStringValue(rootElement, "type"); if (!typologiesMapping.containsKey(type)) { logger.error("unknowntype_" + type); -// context.incrementCounter("filtered", "unknowntype_" + type, 1); + if (errorsInvalidType != null) { + errorsInvalidType.add(1); + } return false; } if (!isValidJsonArray(rootElement, "titles")) { - logger.error("invalid_title"); -// context.incrementCounter("filtered", "invalid_title", 1); + if (errorsInvalidTitle != null) { + errorsInvalidTitle.add(1); + } return false; } return true; } - private static boolean isValidJsonArray(final JsonObject rootElement, final String fieldName) { + private boolean isValidJsonArray(final JsonObject rootElement, final String fieldName) { if (!rootElement.has(fieldName)) { return false; } @@ -395,7 +450,7 @@ public class PublicationToOaf { return true; } - private static Qualifier mapQualifier(String classId, String className, String schemeId, String schemeName) { + private Qualifier mapQualifier(String classId, String className, String schemeId, String schemeName) { final Qualifier qualifier = new Qualifier(); qualifier.setClassid(classId); qualifier.setClassname(className); @@ -404,7 +459,7 @@ public class PublicationToOaf { return qualifier; } - private static ExternalReference convertExtRef(String extId, String classId, String className, String schemeId, + private ExternalReference convertExtRef(String extId, String classId, String className, String schemeId, String schemeName) { ExternalReference ex = new ExternalReference(); ex.setRefidentifier(extId); @@ -412,7 +467,7 @@ public class PublicationToOaf { return ex; } - private static StructuredProperty mapStructuredProperty(String value, Qualifier qualifier, DataInfo dataInfo) { + private StructuredProperty mapStructuredProperty(String value, Qualifier qualifier, DataInfo dataInfo) { if (value == null | StringUtils.isBlank(value)) { return null; } @@ -424,7 +479,7 @@ public class PublicationToOaf { return structuredProperty; } - private static Field mapStringField(String value, DataInfo dataInfo) { + private Field mapStringField(String value, DataInfo dataInfo) { if (value == null || StringUtils.isBlank(value)) { return null; } @@ -435,21 +490,21 @@ public class PublicationToOaf { return stringField; } - private static KeyValue createCollectedFrom() { + private KeyValue createCollectedFrom() { KeyValue cf = new KeyValue(); cf.setValue(ORCID); cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + "806360c771262b4d6770e7cdf04b5c5a"); return cf; } - private static KeyValue createHostedBy() { + private KeyValue createHostedBy() { KeyValue hb = new KeyValue(); hb.setValue("Unknown Repository"); hb.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + "55045bd2a65019fd8e6741a755395c8c"); return hb; } - private static StructuredProperty mapAuthorId(String orcidId) { + private StructuredProperty mapAuthorId(String orcidId) { final StructuredProperty sp = new StructuredProperty(); sp.setValue(orcidId); final Qualifier q = new Qualifier(); diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcidnodoi/PublicationToOafTest.java b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcidnodoi/PublicationToOafTest.java index 39f78522f..01e26dcb4 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcidnodoi/PublicationToOafTest.java +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcidnodoi/PublicationToOafTest.java @@ -27,7 +27,8 @@ public class PublicationToOafTest { PublicationToOafTest.class.getResourceAsStream("publication.json")); JsonElement j = new JsonParser().parse(jsonPublication); logger.info("json publication loaded: " + j.toString()); - Publication oafPublication = (Publication) PublicationToOaf + PublicationToOaf publicationToOaf = new PublicationToOaf(); + Publication oafPublication = (Publication) publicationToOaf .generatePublicationActionsFromDump(j.getAsJsonObject()); assertNotNull(oafPublication.getId()); assertNotNull(oafPublication.getOriginalId());