fix publication dataset creation

This commit is contained in:
Enrico Ottonello 2020-07-30 13:38:33 +02:00
parent c82b15b5f4
commit 196f36c6ed
3 changed files with 128 additions and 39 deletions

View File

@@ -17,10 +17,12 @@ import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement; import com.google.gson.JsonElement;
import com.google.gson.JsonParser; import com.google.gson.JsonParser;
@@ -93,17 +95,48 @@ public class SparkGenEnrichedOrcidWorks {
.toJavaRDD(); .toJavaRDD();
enrichedWorksRDD.saveAsTextFile(workingPath + outputEnrichedWorksPath); enrichedWorksRDD.saveAsTextFile(workingPath + outputEnrichedWorksPath);
logger.info("Works enriched data saved"); logger.info("Works enriched data saved");
JavaRDD<Publication> oafPublicationRDD = enrichedWorksRDD.map(e -> {
JsonElement j = new JsonParser().parse(e._2()); final LongAccumulator parsedPublications = spark.sparkContext().longAccumulator("parsedPublications");
return (Publication) PublicationToOaf final LongAccumulator enrichedPublications = spark
.generatePublicationActionsFromDump(j.getAsJsonObject()); .sparkContext()
}).filter(p -> p != null); .longAccumulator("enrichedPublications");
final LongAccumulator errorsGeneric = spark.sparkContext().longAccumulator("errorsGeneric");
final LongAccumulator errorsInvalidTitle = spark.sparkContext().longAccumulator("errorsInvalidTitle");
final LongAccumulator errorsNotFoundAuthors = spark
.sparkContext()
.longAccumulator("errorsNotFoundAuthors");
final LongAccumulator errorsInvalidType = spark.sparkContext().longAccumulator("errorsInvalidType");
final PublicationToOaf publicationToOaf = new PublicationToOaf(
parsedPublications,
enrichedPublications,
errorsGeneric,
errorsInvalidTitle,
errorsNotFoundAuthors,
errorsInvalidType);
JavaRDD<Publication> oafPublicationRDD = enrichedWorksRDD
.map(
e -> {
return (Publication) publicationToOaf
.generatePublicationActionsFromJson(e._2());
})
.filter(p -> p != null);
Dataset<Publication> publicationDataset = spark Dataset<Publication> publicationDataset = spark
.createDataset( .createDataset(
oafPublicationRDD.repartition(1).rdd(), oafPublicationRDD.rdd(),
Encoders.bean(Publication.class)); Encoders.bean(Publication.class));
publicationDataset.write().mode(SaveMode.Overwrite).save(workingPath + "no_doi_dataset/output"); publicationDataset
.write()
.format("parquet")
.mode(SaveMode.Overwrite)
.save(workingPath + "no_doi_dataset/output");
logger.info("parsedPublications: " + parsedPublications.value().toString());
logger.info("enrichedPublications: " + enrichedPublications.value().toString());
logger.info("errorsGeneric: " + errorsGeneric.value().toString());
logger.info("errorsInvalidTitle: " + errorsInvalidTitle.value().toString());
logger.info("errorsNotFoundAuthors: " + errorsNotFoundAuthors.value().toString());
logger.info("errorsInvalidType: " + errorsInvalidType.value().toString());
}); });
} }

View File

@@ -3,18 +3,17 @@ package eu.dnetlib.doiboost.orcidnodoi.oaf;
import static eu.dnetlib.doiboost.orcidnodoi.util.DumpToActionsUtility.*; import static eu.dnetlib.doiboost.orcidnodoi.util.DumpToActionsUtility.*;
import java.io.Serializable;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.gson.Gson; import com.google.gson.*;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import eu.dnetlib.dhp.common.PacePerson; import eu.dnetlib.dhp.common.PacePerson;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
@@ -22,7 +21,7 @@ import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.doiboost.orcidnodoi.util.DumpToActionsUtility; import eu.dnetlib.doiboost.orcidnodoi.util.DumpToActionsUtility;
import eu.dnetlib.doiboost.orcidnodoi.util.Pair; import eu.dnetlib.doiboost.orcidnodoi.util.Pair;
public class PublicationToOaf { public class PublicationToOaf implements Serializable {
static Logger logger = LoggerFactory.getLogger(PublicationToOaf.class); static Logger logger = LoggerFactory.getLogger(PublicationToOaf.class);
@@ -31,6 +30,37 @@ public class PublicationToOaf {
public static final String OPENAIRE_PREFIX = "openaire____"; public static final String OPENAIRE_PREFIX = "openaire____";
public static final String SEPARATOR = "::"; public static final String SEPARATOR = "::";
private final LongAccumulator parsedPublications;
private final LongAccumulator enrichedPublications;
private final LongAccumulator errorsGeneric;
private final LongAccumulator errorsInvalidTitle;
private final LongAccumulator errorsNotFoundAuthors;
private final LongAccumulator errorsInvalidType;
public PublicationToOaf(
LongAccumulator parsedPublications,
LongAccumulator enrichedPublications,
LongAccumulator errorsGeneric,
LongAccumulator errorsInvalidTitle,
LongAccumulator errorsNotFoundAuthors,
LongAccumulator errorsInvalidType) {
this.parsedPublications = parsedPublications;
this.enrichedPublications = enrichedPublications;
this.errorsGeneric = errorsGeneric;
this.errorsInvalidTitle = errorsInvalidTitle;
this.errorsNotFoundAuthors = errorsNotFoundAuthors;
this.errorsInvalidType = errorsInvalidType;
}
public PublicationToOaf() {
this.parsedPublications = null;
this.enrichedPublications = null;
this.errorsGeneric = null;
this.errorsInvalidTitle = null;
this.errorsNotFoundAuthors = null;
this.errorsInvalidType = null;
}
private static Map<String, Pair<String, String>> datasources = new HashMap<String, Pair<String, String>>() { private static Map<String, Pair<String, String>> datasources = new HashMap<String, Pair<String, String>>() {
{ {
@@ -69,11 +99,27 @@ public class PublicationToOaf {
public static final String PID_TYPES = "dnet:pid_types"; public static final String PID_TYPES = "dnet:pid_types";
public static Oaf generatePublicationActionsFromDump(final JsonObject rootElement) { public Oaf generatePublicationActionsFromJson(final String json) {
try {
if (parsedPublications != null) {
parsedPublications.add(1);
}
JsonElement jElement = new JsonParser().parse(json);
JsonObject jObject = jElement.getAsJsonObject();
return generatePublicationActionsFromDump(jObject);
} catch (Throwable t) {
logger.error("creating publication: " + t.getMessage());
if (errorsGeneric != null) {
errorsGeneric.add(1);
}
return null;
}
}
public Oaf generatePublicationActionsFromDump(final JsonObject rootElement) {
logger.debug("generatePublicationActionsFromDump ..."); logger.debug("generatePublicationActionsFromDump ...");
if (!isValid(rootElement/* , context */)) { if (!isValid(rootElement)) {
logger.error("publication not valid");
return null; return null;
} }
@@ -122,8 +168,9 @@
// Adding titles // Adding titles
final List<String> titles = createRepeatedField(rootElement, "titles"); final List<String> titles = createRepeatedField(rootElement, "titles");
if (titles == null || titles.isEmpty()) { if (titles == null || titles.isEmpty()) {
logger.error("titles not found"); if (errorsInvalidTitle != null) {
// context.incrementCounter("filtered", "title_not_found", 1); errorsInvalidTitle.add(1);
}
return null; return null;
} }
Qualifier q = mapQualifier("main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title"); Qualifier q = mapQualifier("main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title");
@@ -189,8 +236,9 @@
publication.setInstance(Arrays.asList(instance)); publication.setInstance(Arrays.asList(instance));
} else { } else {
logger.error("type not found"); if (errorsInvalidType != null) {
// context.incrementCounter("filtered", "type_not_found", 1); errorsInvalidType.add(1);
}
return null; return null;
} }
@@ -199,17 +247,21 @@
if (authors != null && authors.size() > 0) { if (authors != null && authors.size() > 0) {
publication.setAuthor(authors); publication.setAuthor(authors);
} else { } else {
logger.error("authors not found"); if (errorsNotFoundAuthors != null) {
// context.incrementCounter("filtered", "author_not_found", 1); errorsNotFoundAuthors.add(1);
}
return null; return null;
} }
String classValue = getDefaultResulttype(cobjValue); String classValue = getDefaultResulttype(cobjValue);
publication publication
.setResulttype(mapQualifier(classValue, classValue, "dnet:result_typologies", "dnet:result_typologies")); .setResulttype(mapQualifier(classValue, classValue, "dnet:result_typologies", "dnet:result_typologies"));
if (enrichedPublications != null) {
enrichedPublications.add(1);
}
return publication; return publication;
} }
public static List<Author> createAuthors(final JsonObject root) { public List<Author> createAuthors(final JsonObject root) {
final String authorsJSONFieldName = "contributors"; final String authorsJSONFieldName = "contributors";
@@ -273,7 +325,7 @@
return null; return null;
} }
private static List<String> createRepeatedField(final JsonObject rootElement, final String fieldName) { private List<String> createRepeatedField(final JsonObject rootElement, final String fieldName) {
if (!rootElement.has(fieldName)) { if (!rootElement.has(fieldName)) {
return null; return null;
} }
@@ -291,14 +343,14 @@
} }
} }
private static String cleanField(String value) { private String cleanField(String value) {
if (value != null && !value.isEmpty() && value.charAt(0) == '"' && value.charAt(value.length() - 1) == '"') { if (value != null && !value.isEmpty() && value.charAt(0) == '"' && value.charAt(value.length() - 1) == '"') {
value = value.substring(1, value.length() - 1); value = value.substring(1, value.length() - 1);
} }
return value; return value;
} }
private static void settingRelevantDate(final JsonObject rootElement, private void settingRelevantDate(final JsonObject rootElement,
final Publication publication, final Publication publication,
final String jsonKey, final String jsonKey,
final String dictionaryKey, final String dictionaryKey,
@@ -322,7 +374,7 @@
} }
} }
private static String getPublicationDate(final JsonObject rootElement, private String getPublicationDate(final JsonObject rootElement,
final String jsonKey) { final String jsonKey) {
JsonObject pubDateJson = null; JsonObject pubDateJson = null;
@@ -358,24 +410,27 @@
return null; return null;
} }
protected static boolean isValid(final JsonObject rootElement/* , final Reporter context */) { protected boolean isValid(final JsonObject rootElement/* , final Reporter context */) {
final String type = getStringValue(rootElement, "type"); final String type = getStringValue(rootElement, "type");
if (!typologiesMapping.containsKey(type)) { if (!typologiesMapping.containsKey(type)) {
logger.error("unknowntype_" + type); logger.error("unknowntype_" + type);
// context.incrementCounter("filtered", "unknowntype_" + type, 1); if (errorsInvalidType != null) {
errorsInvalidType.add(1);
}
return false; return false;
} }
if (!isValidJsonArray(rootElement, "titles")) { if (!isValidJsonArray(rootElement, "titles")) {
logger.error("invalid_title"); if (errorsInvalidTitle != null) {
// context.incrementCounter("filtered", "invalid_title", 1); errorsInvalidTitle.add(1);
}
return false; return false;
} }
return true; return true;
} }
private static boolean isValidJsonArray(final JsonObject rootElement, final String fieldName) { private boolean isValidJsonArray(final JsonObject rootElement, final String fieldName) {
if (!rootElement.has(fieldName)) { if (!rootElement.has(fieldName)) {
return false; return false;
} }
@@ -395,7 +450,7 @@
return true; return true;
} }
private static Qualifier mapQualifier(String classId, String className, String schemeId, String schemeName) { private Qualifier mapQualifier(String classId, String className, String schemeId, String schemeName) {
final Qualifier qualifier = new Qualifier(); final Qualifier qualifier = new Qualifier();
qualifier.setClassid(classId); qualifier.setClassid(classId);
qualifier.setClassname(className); qualifier.setClassname(className);
@@ -404,7 +459,7 @@
return qualifier; return qualifier;
} }
private static ExternalReference convertExtRef(String extId, String classId, String className, String schemeId, private ExternalReference convertExtRef(String extId, String classId, String className, String schemeId,
String schemeName) { String schemeName) {
ExternalReference ex = new ExternalReference(); ExternalReference ex = new ExternalReference();
ex.setRefidentifier(extId); ex.setRefidentifier(extId);
@@ -412,7 +467,7 @@
return ex; return ex;
} }
private static StructuredProperty mapStructuredProperty(String value, Qualifier qualifier, DataInfo dataInfo) { private StructuredProperty mapStructuredProperty(String value, Qualifier qualifier, DataInfo dataInfo) {
if (value == null | StringUtils.isBlank(value)) { if (value == null | StringUtils.isBlank(value)) {
return null; return null;
} }
@@ -424,7 +479,7 @@
return structuredProperty; return structuredProperty;
} }
private static Field<String> mapStringField(String value, DataInfo dataInfo) { private Field<String> mapStringField(String value, DataInfo dataInfo) {
if (value == null || StringUtils.isBlank(value)) { if (value == null || StringUtils.isBlank(value)) {
return null; return null;
} }
@@ -435,21 +490,21 @@
return stringField; return stringField;
} }
private static KeyValue createCollectedFrom() { private KeyValue createCollectedFrom() {
KeyValue cf = new KeyValue(); KeyValue cf = new KeyValue();
cf.setValue(ORCID); cf.setValue(ORCID);
cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + "806360c771262b4d6770e7cdf04b5c5a"); cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + "806360c771262b4d6770e7cdf04b5c5a");
return cf; return cf;
} }
private static KeyValue createHostedBy() { private KeyValue createHostedBy() {
KeyValue hb = new KeyValue(); KeyValue hb = new KeyValue();
hb.setValue("Unknown Repository"); hb.setValue("Unknown Repository");
hb.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + "55045bd2a65019fd8e6741a755395c8c"); hb.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + "55045bd2a65019fd8e6741a755395c8c");
return hb; return hb;
} }
private static StructuredProperty mapAuthorId(String orcidId) { private StructuredProperty mapAuthorId(String orcidId) {
final StructuredProperty sp = new StructuredProperty(); final StructuredProperty sp = new StructuredProperty();
sp.setValue(orcidId); sp.setValue(orcidId);
final Qualifier q = new Qualifier(); final Qualifier q = new Qualifier();

View File

@@ -27,7 +27,8 @@ public class PublicationToOafTest {
PublicationToOafTest.class.getResourceAsStream("publication.json")); PublicationToOafTest.class.getResourceAsStream("publication.json"));
JsonElement j = new JsonParser().parse(jsonPublication); JsonElement j = new JsonParser().parse(jsonPublication);
logger.info("json publication loaded: " + j.toString()); logger.info("json publication loaded: " + j.toString());
Publication oafPublication = (Publication) PublicationToOaf PublicationToOaf publicationToOaf = new PublicationToOaf();
Publication oafPublication = (Publication) publicationToOaf
.generatePublicationActionsFromDump(j.getAsJsonObject()); .generatePublicationActionsFromDump(j.getAsJsonObject());
assertNotNull(oafPublication.getId()); assertNotNull(oafPublication.getId());
assertNotNull(oafPublication.getOriginalId()); assertNotNull(oafPublication.getOriginalId());