forked from D-Net/dnet-hadoop
cleaning workflow to include cleaning of default values
This commit is contained in:
parent
52f62d5d8c
commit
d0ac7514b2
|
@ -3,9 +3,13 @@ package eu.dnetlib.dhp.oa.graph.clean;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.io.BufferedInputStream;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
|
@ -19,7 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
|
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
|
||||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
@ -84,12 +90,83 @@ public class CleanGraphSparkJob {
|
||||||
|
|
||||||
readTableFromPath(spark, inputPath, clazz)
|
readTableFromPath(spark, inputPath, clazz)
|
||||||
.map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
|
.map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
|
||||||
|
.map((MapFunction<T, T>) value -> fixDefaults(value), Encoders.bean(clazz))
|
||||||
.write()
|
.write()
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.json(outputPath);
|
.json(outputPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static <T extends Oaf> T fixDefaults(T value) {
|
||||||
|
if (value instanceof Datasource) {
|
||||||
|
// nothing to clean here
|
||||||
|
} else if (value instanceof Project) {
|
||||||
|
// nothing to clean here
|
||||||
|
} else if (value instanceof Organization) {
|
||||||
|
Organization o = (Organization) value;
|
||||||
|
if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) {
|
||||||
|
o.setCountry(qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_COUNTRY_TYPE));
|
||||||
|
}
|
||||||
|
} else if (value instanceof Relation) {
|
||||||
|
// nothing to clean here
|
||||||
|
} else if (value instanceof Result) {
|
||||||
|
|
||||||
|
Result r = (Result) value;
|
||||||
|
if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) {
|
||||||
|
r
|
||||||
|
.setLanguage(
|
||||||
|
qualifier("und", "Undetermined", ModelConstants.DNET_LANGUAGES));
|
||||||
|
}
|
||||||
|
if (Objects.nonNull(r.getSubject())) {
|
||||||
|
r
|
||||||
|
.setSubject(
|
||||||
|
r
|
||||||
|
.getSubject()
|
||||||
|
.stream()
|
||||||
|
.filter(Objects::nonNull)
|
||||||
|
.filter(sp -> StringUtils.isNotBlank(sp.getValue()))
|
||||||
|
.filter(sp -> Objects.nonNull(sp.getQualifier()))
|
||||||
|
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) {
|
||||||
|
r
|
||||||
|
.setResourcetype(
|
||||||
|
qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE));
|
||||||
|
}
|
||||||
|
if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
|
||||||
|
r
|
||||||
|
.setBestaccessright(
|
||||||
|
qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
|
||||||
|
}
|
||||||
|
if (Objects.nonNull(r.getInstance())) {
|
||||||
|
for (Instance i : r.getInstance()) {
|
||||||
|
if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) {
|
||||||
|
i.setAccessright(qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value instanceof Publication) {
|
||||||
|
|
||||||
|
} else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
|
||||||
|
|
||||||
|
} else if (value instanceof OtherResearchProduct) {
|
||||||
|
|
||||||
|
} else if (value instanceof Software) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Qualifier qualifier(String classid, String classname, String scheme) {
|
||||||
|
return OafMapperUtils
|
||||||
|
.qualifier(
|
||||||
|
classid, classname, scheme, scheme);
|
||||||
|
}
|
||||||
|
|
||||||
private static <T extends Oaf> Dataset<T> readTableFromPath(
|
private static <T extends Oaf> Dataset<T> readTableFromPath(
|
||||||
SparkSession spark, String inputEntityPath, Class<T> clazz) {
|
SparkSession spark, String inputEntityPath, Class<T> clazz) {
|
||||||
|
|
||||||
|
|
|
@ -4,10 +4,13 @@ package eu.dnetlib.dhp.oa.graph.clean;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer;
|
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer;
|
||||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Country;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
|
||||||
|
|
||||||
public class CleaningRuleMap extends HashMap<Class, SerializableConsumer<Object>> implements Serializable {
|
public class CleaningRuleMap extends HashMap<Class, SerializableConsumer<Object>> implements Serializable {
|
||||||
|
|
||||||
|
@ -18,23 +21,24 @@ public class CleaningRuleMap extends HashMap<Class, SerializableConsumer<Object>
|
||||||
*/
|
*/
|
||||||
public static CleaningRuleMap create(VocabularyGroup vocabularies) {
|
public static CleaningRuleMap create(VocabularyGroup vocabularies) {
|
||||||
CleaningRuleMap mapping = new CleaningRuleMap();
|
CleaningRuleMap mapping = new CleaningRuleMap();
|
||||||
mapping.put(Qualifier.class, o -> {
|
mapping.put(Qualifier.class, o -> cleanQualifier(vocabularies, (Qualifier) o));
|
||||||
Qualifier q = (Qualifier) o;
|
mapping.put(Country.class, o -> {
|
||||||
if (vocabularies.vocabularyExists(q.getSchemeid())) {
|
final Country c = (Country) o;
|
||||||
Qualifier newValue = vocabularies.lookup(q.getSchemeid(), q.getClassid());
|
if (StringUtils.isBlank(c.getSchemeid())) {
|
||||||
q.setClassid(newValue.getClassid());
|
c.setSchemeid(ModelConstants.DNET_COUNTRY_TYPE);
|
||||||
q.setClassname(newValue.getClassname());
|
c.setSchemename(ModelConstants.DNET_COUNTRY_TYPE);
|
||||||
}
|
}
|
||||||
});
|
cleanQualifier(vocabularies, c);
|
||||||
mapping.put(StructuredProperty.class, o -> {
|
|
||||||
StructuredProperty sp = (StructuredProperty) o;
|
|
||||||
// TODO implement a policy
|
|
||||||
/*
|
|
||||||
* if (StringUtils.isBlank(sp.getValue())) { sp.setValue(null); sp.setQualifier(null); sp.setDataInfo(null);
|
|
||||||
* }
|
|
||||||
*/
|
|
||||||
});
|
});
|
||||||
return mapping;
|
return mapping;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static <Q extends Qualifier> void cleanQualifier(VocabularyGroup vocabularies, Q q) {
|
||||||
|
if (vocabularies.vocabularyExists(q.getSchemeid())) {
|
||||||
|
Qualifier newValue = vocabularies.lookup(q.getSchemeid(), q.getClassid());
|
||||||
|
q.setClassid(newValue.getClassid());
|
||||||
|
q.setClassname(newValue.getClassname());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,7 +122,11 @@ public class VocabularyGroup implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean vocabularyExists(final String vocId) {
|
public boolean vocabularyExists(final String vocId) {
|
||||||
return vocs.containsKey(vocId.toLowerCase());
|
return Optional
|
||||||
|
.ofNullable(vocId)
|
||||||
|
.map(String::toLowerCase)
|
||||||
|
.map(id -> vocs.containsKey(id))
|
||||||
|
.orElse(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addSynonyms(final String vocId, final String termId, final String syn) {
|
private void addSynonyms(final String vocId, final String termId, final String syn) {
|
||||||
|
|
|
@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
|
||||||
|
@ -56,6 +57,9 @@ public class CleaningFunctionTest {
|
||||||
String json = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result.json"));
|
String json = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result.json"));
|
||||||
Publication p_in = MAPPER.readValue(json, Publication.class);
|
Publication p_in = MAPPER.readValue(json, Publication.class);
|
||||||
|
|
||||||
|
assertTrue(p_in instanceof Result);
|
||||||
|
assertTrue(p_in instanceof Publication);
|
||||||
|
|
||||||
Publication p_out = OafCleaner.apply(p_in, mapping);
|
Publication p_out = OafCleaner.apply(p_in, mapping);
|
||||||
|
|
||||||
assertNotNull(p_out);
|
assertNotNull(p_out);
|
||||||
|
@ -63,6 +67,9 @@ public class CleaningFunctionTest {
|
||||||
assertEquals("und", p_out.getLanguage().getClassid());
|
assertEquals("und", p_out.getLanguage().getClassid());
|
||||||
assertEquals("Undetermined", p_out.getLanguage().getClassname());
|
assertEquals("Undetermined", p_out.getLanguage().getClassname());
|
||||||
|
|
||||||
|
assertEquals("DE", p_out.getCountry().get(0).getClassid());
|
||||||
|
assertEquals("Germany", p_out.getCountry().get(0).getClassname());
|
||||||
|
|
||||||
assertEquals("0018", p_out.getInstance().get(0).getInstancetype().getClassid());
|
assertEquals("0018", p_out.getInstance().get(0).getInstancetype().getClassid());
|
||||||
assertEquals("Annotation", p_out.getInstance().get(0).getInstancetype().getClassname());
|
assertEquals("Annotation", p_out.getInstance().get(0).getInstancetype().getClassname());
|
||||||
|
|
||||||
|
|
|
@ -202,6 +202,12 @@
|
||||||
"contributor": [
|
"contributor": [
|
||||||
],
|
],
|
||||||
"country": [
|
"country": [
|
||||||
|
{
|
||||||
|
"classid": "DE",
|
||||||
|
"classname": "DE",
|
||||||
|
"schemeid": "dnet:countries",
|
||||||
|
"schemename": "dnet:countries"
|
||||||
|
}
|
||||||
],
|
],
|
||||||
"coverage": [
|
"coverage": [
|
||||||
],
|
],
|
||||||
|
|
Loading…
Reference in New Issue