From d0ac7514b225d08a89159fa59e2ad3a79bacaf42 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 18 Jun 2020 19:37:25 +0200 Subject: [PATCH] cleaning workflow to include cleaning of default values --- .../oa/graph/clean/CleanGraphSparkJob.java | 77 +++++++++++++++++++ .../dhp/oa/graph/clean/CleaningRuleMap.java | 34 ++++---- .../oa/graph/raw/common/VocabularyGroup.java | 6 +- .../oa/graph/clean/CleaningFunctionTest.java | 7 ++ .../eu/dnetlib/dhp/oa/graph/clean/result.json | 6 ++ 5 files changed, 114 insertions(+), 16 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index b2c7152d5f..c908988142 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -3,9 +3,13 @@ package eu.dnetlib.dhp.oa.graph.clean; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.io.BufferedInputStream; +import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; @@ -19,7 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -84,12 +90,83 @@ public class CleanGraphSparkJob { readTableFromPath(spark, inputPath, clazz) .map((MapFunction) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz)) + .map((MapFunction) value -> fixDefaults(value), Encoders.bean(clazz)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath); } + private static T fixDefaults(T value) { + if (value instanceof Datasource) { + // nothing to clean here + } else if (value instanceof Project) { + // nothing to clean here + } else if (value instanceof Organization) { + Organization o = (Organization) value; + if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) { + o.setCountry(qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_COUNTRY_TYPE)); + } + } else if (value instanceof Relation) { + // nothing to clean here + } else if (value instanceof Result) { + + Result r = (Result) value; + if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) { + r + .setLanguage( + qualifier("und", "Undetermined", ModelConstants.DNET_LANGUAGES)); + } + if (Objects.nonNull(r.getSubject())) { + r + .setSubject( + r + .getSubject() + .stream() + .filter(Objects::nonNull) + .filter(sp -> StringUtils.isNotBlank(sp.getValue())) + .filter(sp -> Objects.nonNull(sp.getQualifier())) + .filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid())) + .collect(Collectors.toList())); + } + if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) { + r + .setResourcetype( + qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE)); + } + if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) { + r + .setBestaccessright( + qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); + } + if (Objects.nonNull(r.getInstance())) { + for (Instance i : r.getInstance()) { + if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) { + i.setAccessright(qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); + } + } + } + + if (value instanceof Publication) { + + } else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) { + + } else if (value instanceof OtherResearchProduct) { + + } else if (value instanceof Software) { + + } + } + + return value; + } + + private static Qualifier qualifier(String classid, String classname, String scheme) { + return OafMapperUtils + .qualifier( + classid, classname, scheme, scheme); + } + private static Dataset readTableFromPath( SparkSession spark, String inputEntityPath, Class clazz) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java index 8006f73009..d2d4e118fd 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java @@ -4,10 +4,13 @@ package eu.dnetlib.dhp.oa.graph.clean; import java.io.Serializable; import java.util.HashMap; +import org.apache.commons.lang3.StringUtils; + import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Country; import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; public class CleaningRuleMap extends HashMap> implements Serializable { @@ -18,23 +21,24 @@ public class CleaningRuleMap extends HashMap */ public static CleaningRuleMap create(VocabularyGroup vocabularies) { CleaningRuleMap mapping = new CleaningRuleMap(); - mapping.put(Qualifier.class, o -> { - Qualifier q = (Qualifier) o; - if (vocabularies.vocabularyExists(q.getSchemeid())) { - Qualifier newValue = vocabularies.lookup(q.getSchemeid(), q.getClassid()); - q.setClassid(newValue.getClassid()); - q.setClassname(newValue.getClassname()); + mapping.put(Qualifier.class, o -> cleanQualifier(vocabularies, (Qualifier) o)); + mapping.put(Country.class, o -> { + final Country c = (Country) o; + if (StringUtils.isBlank(c.getSchemeid())) { + c.setSchemeid(ModelConstants.DNET_COUNTRY_TYPE); + c.setSchemename(ModelConstants.DNET_COUNTRY_TYPE); } - }); - mapping.put(StructuredProperty.class, o -> { - StructuredProperty sp = (StructuredProperty) o; - // TODO implement a policy - /* - * if (StringUtils.isBlank(sp.getValue())) { sp.setValue(null); sp.setQualifier(null); sp.setDataInfo(null); - * } - */ + cleanQualifier(vocabularies, c); }); return mapping; } + private static void cleanQualifier(VocabularyGroup vocabularies, Q q) { + if (vocabularies.vocabularyExists(q.getSchemeid())) { + Qualifier newValue = vocabularies.lookup(q.getSchemeid(), q.getClassid()); + q.setClassid(newValue.getClassid()); + q.setClassname(newValue.getClassname()); + } + } + } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java index d9ff62596a..334339d3be 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java @@ -122,7 +122,11 @@ public class VocabularyGroup implements Serializable { } public boolean vocabularyExists(final String vocId) { - return vocs.containsKey(vocId.toLowerCase()); + return Optional + .ofNullable(vocId) + .map(String::toLowerCase) + .map(id -> vocs.containsKey(id)) + .orElse(false); } private void addSynonyms(final String vocId, final String termId, final String syn) { diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java index 1b21ce2d37..4783aa81f6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -56,6 +57,9 @@ public class CleaningFunctionTest { String json = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result.json")); Publication p_in = MAPPER.readValue(json, Publication.class); + assertTrue(p_in instanceof Result); + assertTrue(p_in instanceof Publication); + Publication p_out = OafCleaner.apply(p_in, mapping); assertNotNull(p_out); @@ -63,6 +67,9 @@ public class CleaningFunctionTest { assertEquals("und", p_out.getLanguage().getClassid()); assertEquals("Undetermined", p_out.getLanguage().getClassname()); + assertEquals("DE", p_out.getCountry().get(0).getClassid()); + assertEquals("Germany", p_out.getCountry().get(0).getClassname()); + assertEquals("0018", p_out.getInstance().get(0).getInstancetype().getClassid()); assertEquals("Annotation", p_out.getInstance().get(0).getInstancetype().getClassname()); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json index b63a12f619..2c1d5017d6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json @@ -202,6 +202,12 @@ "contributor": [ ], "country": [ + { + "classid": "DE", + "classname": "DE", + "schemeid": "dnet:countries", + "schemename": "dnet:countries" + } ], "coverage": [ ],