From e265c3e125a9db77299ce90177320c124870d044 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 1 Oct 2020 10:50:15 +0200 Subject: [PATCH] cleaning functions factored out in a dedicated class --- .../oa/graph/clean/CleanGraphSparkJob.java | 158 +--------------- .../dhp/oa/graph/clean/CleaningFunctions.java | 172 ++++++++++++++++++ .../oa/graph/clean/CleaningFunctionTest.java | 4 +- 3 files changed, 176 insertions(+), 158 deletions(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index ae1b37906..e295b9503 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -90,169 +90,15 @@ public class CleanGraphSparkJob { final CleaningRuleMap mapping = CleaningRuleMap.create(vocs); readTableFromPath(spark, inputPath, clazz) - .map((MapFunction) value -> fixVocabularyNames(value), Encoders.bean(clazz)) + .map((MapFunction) value -> CleaningFunctions.fixVocabularyNames(value), Encoders.bean(clazz)) .map((MapFunction) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz)) - .map((MapFunction) value -> fixDefaults(value), Encoders.bean(clazz)) + .map((MapFunction) value -> CleaningFunctions.fixDefaults(value), Encoders.bean(clazz)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath); } - protected static T fixVocabularyNames(T value) { - if (value instanceof Datasource) { - // nothing to clean here - } else if (value instanceof Project) { - // nothing to clean here - } else if (value instanceof Organization) { - Organization o = (Organization) value; - if (Objects.nonNull(o.getCountry())) { - fixVocabName(o.getCountry(), ModelConstants.DNET_COUNTRY_TYPE); - } - } else if (value instanceof Relation) { - // nothing to clean here - } else if (value instanceof Result) { - - Result r = (Result) value; - - fixVocabName(r.getLanguage(), ModelConstants.DNET_LANGUAGES); - fixVocabName(r.getResourcetype(), ModelConstants.DNET_DATA_CITE_RESOURCE); - fixVocabName(r.getBestaccessright(), ModelConstants.DNET_ACCESS_MODES); - - if (Objects.nonNull(r.getSubject())) { - r.getSubject().forEach(s -> fixVocabName(s.getQualifier(), ModelConstants.DNET_SUBJECT_TYPOLOGIES)); - } - if (Objects.nonNull(r.getInstance())) { - for (Instance i : r.getInstance()) { - fixVocabName(i.getAccessright(), ModelConstants.DNET_ACCESS_MODES); - fixVocabName(i.getRefereed(), ModelConstants.DNET_REVIEW_LEVELS); - } - } - if (Objects.nonNull(r.getAuthor())) { - r.getAuthor().forEach(a -> { - if (Objects.nonNull(a.getPid())) { - a.getPid().forEach(p -> { - fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES); - }); - } - }); - } - if (value instanceof Publication) { - - } else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) { - - } else if (value instanceof OtherResearchProduct) { - - } else if (value instanceof Software) { - - } - } - - return value; - } - - private static void fixVocabName(Qualifier q, String vocabularyName) { - if (Objects.nonNull(q) && StringUtils.isBlank(q.getSchemeid())) { - q.setSchemeid(vocabularyName); - q.setSchemename(vocabularyName); - } - } - - protected static T fixDefaults(T value) { - if (value instanceof Datasource) { - // nothing to clean here - } else if (value instanceof Project) { - // nothing to clean here - } else if (value instanceof Organization) { - Organization o = (Organization) value; - if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) { - o.setCountry(qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_COUNTRY_TYPE)); - } - } else if (value instanceof Relation) { - // nothing to clean here - } else if (value instanceof Result) { - - Result r = (Result) value; - if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) { - r.setPublisher(null); - } - if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) { - r - .setLanguage( - qualifier("und", "Undetermined", ModelConstants.DNET_LANGUAGES)); - } - if (Objects.nonNull(r.getSubject())) { - r - .setSubject( - r - .getSubject() - .stream() - .filter(Objects::nonNull) - .filter(sp -> StringUtils.isNotBlank(sp.getValue())) - .filter(sp -> Objects.nonNull(sp.getQualifier())) - .filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid())) - .collect(Collectors.toList())); - } - if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) { - r - .setResourcetype( - qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE)); - } - if (Objects.nonNull(r.getInstance())) { - for (Instance i : r.getInstance()) { - if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) { - i.setAccessright(qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); - } - if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) { - i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY); - } - if (Objects.isNull(i.getRefereed())) { - i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS)); - } - } - } - if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) { - Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance()); - if (Objects.isNull(bestaccessrights)) { - r - .setBestaccessright( - qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); - } else { - r.setBestaccessright(bestaccessrights); - } - } - if (Objects.nonNull(r.getAuthor())) { - boolean nullRank = r - .getAuthor() - .stream() - .anyMatch(a -> Objects.isNull(a.getRank())); - if (nullRank) { - int i = 1; - for (Author author : r.getAuthor()) { - author.setRank(i++); - } - } - } - if (value instanceof Publication) { - - } else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) { - - } else if (value instanceof OtherResearchProduct) { - - } else if (value instanceof Software) { - - } - } - - return value; - } - - private static Qualifier qualifier(String classid, String classname, String scheme) { - return OafMapperUtils - .qualifier( - classid, classname, scheme, scheme); - } - private static Dataset readTableFromPath( SparkSession spark, String inputEntityPath, Class clazz) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java new file mode 100644 index 000000000..3a0eace1f --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java @@ -0,0 +1,172 @@ + +package eu.dnetlib.dhp.oa.graph.clean; + +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; + +import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper; +import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; + +public class CleaningFunctions { + + public static T fixVocabularyNames(T value) { + if (value instanceof Datasource) { + // nothing to clean here + } else if (value instanceof Project) { + // nothing to clean here + } else if (value instanceof Organization) { + Organization o = (Organization) value; + if (Objects.nonNull(o.getCountry())) { + fixVocabName(o.getCountry(), ModelConstants.DNET_COUNTRY_TYPE); + } + } else if (value instanceof Relation) { + // nothing to clean here + } else if (value instanceof Result) { + + Result r = (Result) value; + + fixVocabName(r.getLanguage(), ModelConstants.DNET_LANGUAGES); + fixVocabName(r.getResourcetype(), ModelConstants.DNET_DATA_CITE_RESOURCE); + fixVocabName(r.getBestaccessright(), ModelConstants.DNET_ACCESS_MODES); + + if (Objects.nonNull(r.getSubject())) { + r.getSubject().forEach(s -> fixVocabName(s.getQualifier(), ModelConstants.DNET_SUBJECT_TYPOLOGIES)); + } + if (Objects.nonNull(r.getInstance())) { + for (Instance i : r.getInstance()) { + fixVocabName(i.getAccessright(), ModelConstants.DNET_ACCESS_MODES); + fixVocabName(i.getRefereed(), ModelConstants.DNET_REVIEW_LEVELS); + } + } + if (Objects.nonNull(r.getAuthor())) { + r.getAuthor().forEach(a -> { + if (Objects.nonNull(a.getPid())) { + a.getPid().forEach(p -> { + fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES); + }); + } + }); + } + if (value instanceof Publication) { + + } else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) { + + } else if (value instanceof OtherResearchProduct) { + + } else if (value instanceof Software) { + + } + } + + return value; + } + + protected static T fixDefaults(T value) { + if (value instanceof Datasource) { + // nothing to clean here + } else if (value instanceof Project) { + // nothing to clean here + } else if (value instanceof Organization) { + Organization o = (Organization) value; + if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) { + o.setCountry(qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_COUNTRY_TYPE)); + } + } else if (value instanceof Relation) { + // nothing to clean here + } else if (value instanceof Result) { + + Result r = (Result) value; + if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) { + r.setPublisher(null); + } + if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) { + r + .setLanguage( + qualifier("und", "Undetermined", ModelConstants.DNET_LANGUAGES)); + } + if (Objects.nonNull(r.getSubject())) { + r + .setSubject( + r + .getSubject() + .stream() + .filter(Objects::nonNull) + .filter(sp -> StringUtils.isNotBlank(sp.getValue())) + .filter(sp -> Objects.nonNull(sp.getQualifier())) + .filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid())) + .collect(Collectors.toList())); + } + if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) { + r + .setResourcetype( + qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE)); + } + if (Objects.nonNull(r.getInstance())) { + for (Instance i : r.getInstance()) { + if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) { + i.setAccessright(qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); + } + if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) { + i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY); + } + if (Objects.isNull(i.getRefereed())) { + i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS)); + } + } + } + if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) { + Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance()); + if (Objects.isNull(bestaccessrights)) { + r + .setBestaccessright( + qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); + } else { + r.setBestaccessright(bestaccessrights); + } + } + if (Objects.nonNull(r.getAuthor())) { + boolean nullRank = r + .getAuthor() + .stream() + .anyMatch(a -> Objects.isNull(a.getRank())); + if (nullRank) { + int i = 1; + for (Author author : r.getAuthor()) { + author.setRank(i++); + } + } + } + if (value instanceof Publication) { + + } else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) { + + } else if (value instanceof OtherResearchProduct) { + + } else if (value instanceof Software) { + + } + } + + return value; + } + + // HELPERS + + private static void fixVocabName(Qualifier q, String vocabularyName) { + if (Objects.nonNull(q) && StringUtils.isBlank(q.getSchemeid())) { + q.setSchemeid(vocabularyName); + q.setSchemename(vocabularyName); + } + } + + private static Qualifier qualifier(String classid, String classname, String scheme) { + return OafMapperUtils + .qualifier( + classid, classname, scheme, scheme); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java index e1ef847c3..8a53c3a50 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java @@ -62,7 +62,7 @@ public class CleaningFunctionTest { assertTrue(p_in instanceof Result); assertTrue(p_in instanceof Publication); - Publication p_out = OafCleaner.apply(CleanGraphSparkJob.fixVocabularyNames(p_in), mapping); + Publication p_out = OafCleaner.apply(CleaningFunctions.fixVocabularyNames(p_in), mapping); assertNotNull(p_out); @@ -88,7 +88,7 @@ public class CleaningFunctionTest { .map(p -> p.getQualifier()) .allMatch(q -> pidTerms.contains(q.getClassid()))); - Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out); + Publication p_defaults = CleaningFunctions.fixDefaults(p_out); assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid()); assertNull(p_out.getPublisher());