diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index ae1b379061..e295b95037 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -90,169 +90,15 @@ public class CleanGraphSparkJob { final CleaningRuleMap mapping = CleaningRuleMap.create(vocs); readTableFromPath(spark, inputPath, clazz) - .map((MapFunction) value -> fixVocabularyNames(value), Encoders.bean(clazz)) + .map((MapFunction) value -> CleaningFunctions.fixVocabularyNames(value), Encoders.bean(clazz)) .map((MapFunction) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz)) - .map((MapFunction) value -> fixDefaults(value), Encoders.bean(clazz)) + .map((MapFunction) value -> CleaningFunctions.fixDefaults(value), Encoders.bean(clazz)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath); } - protected static T fixVocabularyNames(T value) { - if (value instanceof Datasource) { - // nothing to clean here - } else if (value instanceof Project) { - // nothing to clean here - } else if (value instanceof Organization) { - Organization o = (Organization) value; - if (Objects.nonNull(o.getCountry())) { - fixVocabName(o.getCountry(), ModelConstants.DNET_COUNTRY_TYPE); - } - } else if (value instanceof Relation) { - // nothing to clean here - } else if (value instanceof Result) { - - Result r = (Result) value; - - fixVocabName(r.getLanguage(), ModelConstants.DNET_LANGUAGES); - fixVocabName(r.getResourcetype(), ModelConstants.DNET_DATA_CITE_RESOURCE); - fixVocabName(r.getBestaccessright(), ModelConstants.DNET_ACCESS_MODES); - - if (Objects.nonNull(r.getSubject())) { - r.getSubject().forEach(s -> fixVocabName(s.getQualifier(), ModelConstants.DNET_SUBJECT_TYPOLOGIES)); - } - if (Objects.nonNull(r.getInstance())) { - for (Instance i : r.getInstance()) { - fixVocabName(i.getAccessright(), ModelConstants.DNET_ACCESS_MODES); - fixVocabName(i.getRefereed(), ModelConstants.DNET_REVIEW_LEVELS); - } - } - if (Objects.nonNull(r.getAuthor())) { - r.getAuthor().forEach(a -> { - if (Objects.nonNull(a.getPid())) { - a.getPid().forEach(p -> { - fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES); - }); - } - }); - } - if (value instanceof Publication) { - - } else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) { - - } else if (value instanceof OtherResearchProduct) { - - } else if (value instanceof Software) { - - } - } - - return value; - } - - private static void fixVocabName(Qualifier q, String vocabularyName) { - if (Objects.nonNull(q) && StringUtils.isBlank(q.getSchemeid())) { - q.setSchemeid(vocabularyName); - q.setSchemename(vocabularyName); - } - } - - protected static T fixDefaults(T value) { - if (value instanceof Datasource) { - // nothing to clean here - } else if (value instanceof Project) { - // nothing to clean here - } else if (value instanceof Organization) { - Organization o = (Organization) value; - if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) { - o.setCountry(qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_COUNTRY_TYPE)); - } - } else if (value instanceof Relation) { - // nothing to clean here - } else if (value instanceof Result) { - - Result r = (Result) value; - if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) { - r.setPublisher(null); - } - if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) { - r - .setLanguage( - qualifier("und", "Undetermined", ModelConstants.DNET_LANGUAGES)); - } - if (Objects.nonNull(r.getSubject())) { - r - .setSubject( - r - .getSubject() - .stream() - .filter(Objects::nonNull) - .filter(sp -> StringUtils.isNotBlank(sp.getValue())) - .filter(sp -> Objects.nonNull(sp.getQualifier())) - .filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid())) - .collect(Collectors.toList())); - } - if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) { - r - .setResourcetype( - qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE)); - } - if (Objects.nonNull(r.getInstance())) { - for (Instance i : r.getInstance()) { - if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) { - i.setAccessright(qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); - } - if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) { - i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY); - } - if (Objects.isNull(i.getRefereed())) { - i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS)); - } - } - } - if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) { - Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance()); - if (Objects.isNull(bestaccessrights)) { - r - .setBestaccessright( - qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); - } else { - r.setBestaccessright(bestaccessrights); - } - } - if (Objects.nonNull(r.getAuthor())) { - boolean nullRank = r - .getAuthor() - .stream() - .anyMatch(a -> Objects.isNull(a.getRank())); - if (nullRank) { - int i = 1; - for (Author author : r.getAuthor()) { - author.setRank(i++); - } - } - } - if (value instanceof Publication) { - - } else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) { - - } else if (value instanceof OtherResearchProduct) { - - } else if (value instanceof Software) { - - } - } - - return value; - } - - private static Qualifier qualifier(String classid, String classname, String scheme) { - return OafMapperUtils - .qualifier( - classid, classname, scheme, scheme); - } - private static Dataset readTableFromPath( SparkSession spark, String inputEntityPath, Class clazz) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java new file mode 100644 index 0000000000..f615d69f2f --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java @@ -0,0 +1,196 @@ + +package eu.dnetlib.dhp.oa.graph.clean; + +import java.util.LinkedHashMap; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.clearspring.analytics.util.Lists; +import org.apache.commons.lang3.StringUtils; + +import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper; +import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; + +public class CleaningFunctions { + + public static final String ORCID_PREFIX_REGEX = "^http(s?):\\/\\/orcid\\.org\\/"; + + public static T fixVocabularyNames(T value) { + if (value instanceof Datasource) { + // nothing to clean here + } else if (value instanceof Project) { + // nothing to clean here + } else if (value instanceof Organization) { + Organization o = (Organization) value; + if (Objects.nonNull(o.getCountry())) { + fixVocabName(o.getCountry(), ModelConstants.DNET_COUNTRY_TYPE); + } + } else if (value instanceof Relation) { + // nothing to clean here + } else if (value instanceof Result) { + + Result r = (Result) value; + + fixVocabName(r.getLanguage(), ModelConstants.DNET_LANGUAGES); + fixVocabName(r.getResourcetype(), ModelConstants.DNET_DATA_CITE_RESOURCE); + fixVocabName(r.getBestaccessright(), ModelConstants.DNET_ACCESS_MODES); + + if (Objects.nonNull(r.getSubject())) { + r.getSubject().forEach(s -> fixVocabName(s.getQualifier(), ModelConstants.DNET_SUBJECT_TYPOLOGIES)); + } + if (Objects.nonNull(r.getInstance())) { + for (Instance i : r.getInstance()) { + fixVocabName(i.getAccessright(), ModelConstants.DNET_ACCESS_MODES); + fixVocabName(i.getRefereed(), ModelConstants.DNET_REVIEW_LEVELS); + } + } + if (Objects.nonNull(r.getAuthor())) { + r.getAuthor().forEach(a -> { + if (Objects.nonNull(a.getPid())) { + a.getPid().forEach(p -> { + fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES); + }); + } + }); + } + if (value instanceof Publication) { + + } else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) { + + } else if (value instanceof OtherResearchProduct) { + + } else if (value instanceof Software) { + + } + } + + return value; + } + + protected static T fixDefaults(T value) { + if (value instanceof Datasource) { + // nothing to clean here + } else if (value instanceof Project) { + // nothing to clean here + } else if (value instanceof Organization) { + Organization o = (Organization) value; + if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) { + o.setCountry(qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_COUNTRY_TYPE)); + } + } else if (value instanceof Relation) { + // nothing to clean here + } else if (value instanceof Result) { + + Result r = (Result) value; + if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) { + r.setPublisher(null); + } + if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) { + r + .setLanguage( + qualifier("und", "Undetermined", ModelConstants.DNET_LANGUAGES)); + } + if (Objects.nonNull(r.getSubject())) { + r + .setSubject( + r + .getSubject() + .stream() + .filter(Objects::nonNull) + .filter(sp -> StringUtils.isNotBlank(sp.getValue())) + .filter(sp -> Objects.nonNull(sp.getQualifier())) + .filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid())) + .collect(Collectors.toList())); + } + if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) { + r + .setResourcetype( + qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE)); + } + if (Objects.nonNull(r.getInstance())) { + for (Instance i : r.getInstance()) { + if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) { + i.setAccessright(qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); + } + if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) { + i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY); + } + if (Objects.isNull(i.getRefereed())) { + i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS)); + } + } + } + if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) { + Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance()); + if (Objects.isNull(bestaccessrights)) { + r + .setBestaccessright( + qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); + } else { + r.setBestaccessright(bestaccessrights); + } + } + if (Objects.nonNull(r.getAuthor())) { + boolean nullRank = r + .getAuthor() + .stream() + .anyMatch(a -> Objects.isNull(a.getRank())); + if (nullRank) { + int i = 1; + for (Author author : r.getAuthor()) { + author.setRank(i++); + } + } + for(Author a : r.getAuthor()) { + if (Objects.isNull(a.getPid())) { + a.setPid(Lists.newArrayList()); + } else { + a.setPid( + a.getPid().stream() + .filter(p -> Objects.nonNull(p.getQualifier())) + .filter(p -> StringUtils.isNotBlank(p.getValue())) + .map(p -> { + p.setValue(p.getValue().trim().replaceAll(ORCID_PREFIX_REGEX, "")); + return p; + }) + .collect(Collectors.toMap(StructuredProperty::getValue, Function.identity(), (p1, p2) -> p1, LinkedHashMap::new)) + .values() + .stream() + .collect(Collectors.toList())); + } + } + + } + if (value instanceof Publication) { + + } else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) { + + } else if (value instanceof OtherResearchProduct) { + + } else if (value instanceof Software) { + + } + } + + return value; + } + + // HELPERS + + private static void fixVocabName(Qualifier q, String vocabularyName) { + if (Objects.nonNull(q) && StringUtils.isBlank(q.getSchemeid())) { + q.setSchemeid(vocabularyName); + q.setSchemename(vocabularyName); + } + } + + private static Qualifier qualifier(String classid, String classname, String scheme) { + return OafMapperUtils + .qualifier( + classid, classname, scheme, scheme); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql index 7bec2fe040..778e3afd21 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql @@ -1,10 +1,10 @@ DROP VIEW IF EXISTS ${hiveDbName}.result; -CREATE VIEW IF NOT EXISTS result as - select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.publication p +CREATE VIEW IF NOT EXISTS ${hiveDbName}.result as + select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.publication p union all - select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.dataset d + select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.dataset d union all - select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.software s + select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.software s union all - select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.otherresearchproduct o; + select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.otherresearchproduct o; diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java index e1ef847c38..8a53c3a507 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java @@ -62,7 +62,7 @@ public class CleaningFunctionTest { assertTrue(p_in instanceof Result); assertTrue(p_in instanceof Publication); - Publication p_out = OafCleaner.apply(CleanGraphSparkJob.fixVocabularyNames(p_in), mapping); + Publication p_out = OafCleaner.apply(CleaningFunctions.fixVocabularyNames(p_in), mapping); assertNotNull(p_out); @@ -88,7 +88,7 @@ public class CleaningFunctionTest { .map(p -> p.getQualifier()) .allMatch(q -> pidTerms.contains(q.getClassid()))); - Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out); + Publication p_defaults = CleaningFunctions.fixDefaults(p_out); assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid()); assertNull(p_out.getPublisher()); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json index f51eed067d..5c903cd0e6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json @@ -27,6 +27,28 @@ "schemename": "dnet:pid_types" }, "value": "0000-0001-9613-6639" + }, + { + "dataInfo": { + "deletedbyinference": false, + "inferenceprovenance": "", + "inferred": false, + "invisible": false, + "provenanceaction": { + "classid": "sysimport:crosswalk:datasetarchive", + "classname": "sysimport:crosswalk:datasetarchive", + "schemeid": "dnet:provenanceActions", + "schemename": "dnet:provenanceActions" + }, + "trust": "0.9" + }, + "qualifier": { + "classid": "ORCID12", + "classname": "ORCID12", + "schemeid": "dnet:pid_types", + "schemename": "dnet:pid_types" + }, + "value": "https://orcid.org/0000-0001-9613-6639" } ], "rank": 1, @@ -91,8 +113,7 @@ ], "fullname": "Barry, Peter S.", "name": "Peter S.", - "pid": [ - ], + "pid": null, "rank": 3, "surname": "Barry" },