From b0b6c6bd4730ac9b5c2dd7d182edd085d5e15de9 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 28 Jul 2020 14:59:14 +0200 Subject: [PATCH] WIP dhp-workflows-common --- dhp-workflows/dhp-workflows-common/pom.xml | 32 ++++ .../dnetlib/dhp/common/CleaningRuleMap.java | 43 ++++++ .../eu/dnetlib/dhp/common/GraphSupport.java | 32 ++++ .../eu/dnetlib/dhp/common/OafCleaner.java | 82 ++++++++++ .../java/eu/dnetlib/dhp/common/SaveMode.java | 7 + .../eu/dnetlib/dhp/common/Vocabulary.java | 84 ++++++++++ .../dnetlib/dhp/common/VocabularyGroup.java | 143 ++++++++++++++++++ .../eu/dnetlib/dhp/common/VocabularyTerm.java | 24 +++ 8 files changed, 447 insertions(+) create mode 100644 dhp-workflows/dhp-workflows-common/pom.xml create mode 100644 dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/CleaningRuleMap.java create mode 100644 dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java create mode 100644 dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/OafCleaner.java create mode 100644 dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/SaveMode.java create mode 100644 dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/Vocabulary.java create mode 100644 dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/VocabularyGroup.java create mode 100644 dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/VocabularyTerm.java diff --git a/dhp-workflows/dhp-workflows-common/pom.xml b/dhp-workflows/dhp-workflows-common/pom.xml new file mode 100644 index 000000000..bd01d2c9f --- /dev/null +++ b/dhp-workflows/dhp-workflows-common/pom.xml @@ -0,0 +1,32 @@ + + + + dhp-workflows + eu.dnetlib.dhp + 1.2.4-SNAPSHOT + + 4.0.0 + dhp-workflows-common + + + org.apache.spark + spark-core_2.11 + + + org.apache.spark + spark-sql_2.11 + + + + eu.dnetlib.dhp + dhp-common + ${project.version} + + + eu.dnetlib.dhp + dhp-schemas + ${project.version} + + 
+ + diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/CleaningRuleMap.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/CleaningRuleMap.java new file mode 100644 index 000000000..630f8be5b --- /dev/null +++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/CleaningRuleMap.java @@ -0,0 +1,43 @@ + +package eu.dnetlib.dhp.common; + +import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer; +import eu.dnetlib.dhp.common.VocabularyGroup; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Country; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import org.apache.commons.lang3.StringUtils; + +import java.io.Serializable; +import java.util.HashMap; + +public class CleaningRuleMap extends HashMap> implements Serializable { + + /** + * Creates the mapping for the Oaf types subject to cleaning + * + * @param vocabularies + */ + public static CleaningRuleMap create(VocabularyGroup vocabularies) { + CleaningRuleMap mapping = new CleaningRuleMap(); + mapping.put(Qualifier.class, o -> cleanQualifier(vocabularies, (Qualifier) o)); + mapping.put(Country.class, o -> { + final Country c = (Country) o; + if (StringUtils.isBlank(c.getSchemeid())) { + c.setSchemeid(ModelConstants.DNET_COUNTRY_TYPE); + c.setSchemename(ModelConstants.DNET_COUNTRY_TYPE); + } + cleanQualifier(vocabularies, c); + }); + return mapping; + } + + private static void cleanQualifier(VocabularyGroup vocabularies, Q q) { + if (vocabularies.vocabularyExists(q.getSchemeid())) { + Qualifier newValue = vocabularies.lookup(q.getSchemeid(), q.getClassid()); + q.setClassid(newValue.getClassid()); + q.setClassname(newValue.getClassname()); + } + } + +} diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java new file mode 100644 index
000000000..66c0db4ac --- /dev/null +++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java @@ -0,0 +1,32 @@ +package eu.dnetlib.dhp.common; + +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import org.apache.spark.sql.DataFrameWriter; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SaveMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GraphSupport { + + private static final Logger log = LoggerFactory.getLogger(GraphSupport.class); + + private static void saveGraphTable(Dataset dataset, Class clazz, String outputGraph, eu.dnetlib.dhp.common.SaveMode saveMode) { + + log.info("saving graph in {} mode to {}", saveMode.toString(), outputGraph); + + final DataFrameWriter writer = dataset.write().mode(SaveMode.Overwrite); + switch (saveMode) { + case JSON: + writer.option("compression", "gzip").json(outputGraph); + break; + case PARQUET: + final String db_table = ModelSupport.tableIdentifier(outputGraph, clazz); + writer.saveAsTable(db_table); + break; + } + + } + +} diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/OafCleaner.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/OafCleaner.java new file mode 100644 index 000000000..e7e3570d7 --- /dev/null +++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/OafCleaner.java @@ -0,0 +1,82 @@ + +package eu.dnetlib.dhp.common; + +import eu.dnetlib.dhp.schema.oaf.Oaf; + +import java.io.Serializable; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; + +public class OafCleaner implements Serializable { + + public static E apply(E oaf, CleaningRuleMap mapping) { + try { + navigate(oaf, mapping); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + return oaf; + } + + private static void navigate(Object o,
CleaningRuleMap mapping) throws IllegalAccessException { + if (isPrimitive(o)) { + return; + } else if (isIterable(o.getClass())) { + for (final Object elem : (Iterable) o) { + navigate(elem, mapping); + } + } else if (hasMapping(o, mapping)) { + mapping.get(o.getClass()).accept(o); + } else { + for (final Field f : getAllFields(o.getClass())) { + f.setAccessible(true); + final Object val = f.get(o); + if (!isPrimitive(val) && hasMapping(val, mapping)) { + mapping.get(val.getClass()).accept(val); + } else { + navigate(f.get(o), mapping); + } + } + } + } + + private static boolean hasMapping(Object o, CleaningRuleMap mapping) { + return mapping.containsKey(o.getClass()); + } + + private static boolean isIterable(final Class cl) { + return Iterable.class.isAssignableFrom(cl); + } + + private static boolean isPrimitive(Object o) { + return Objects.isNull(o) + || o.getClass().isPrimitive() + || o instanceof Class + || o instanceof Integer + || o instanceof Double + || o instanceof Float + || o instanceof Long + || o instanceof Boolean + || o instanceof String + || o instanceof Byte; + } + + private static List getAllFields(Class clazz) { + return getAllFields(new LinkedList<>(), clazz); + } + + private static List getAllFields(List fields, Class clazz) { + fields.addAll(Arrays.asList(clazz.getDeclaredFields())); + + final Class superclass = clazz.getSuperclass(); + if (Objects.nonNull(superclass) && superclass.getPackage().equals(Oaf.class.getPackage())) { + getAllFields(fields, superclass); + } + + return fields; + } + +} diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/SaveMode.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/SaveMode.java new file mode 100644 index 000000000..d368827df --- /dev/null +++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/SaveMode.java @@ -0,0 +1,7 @@ +package eu.dnetlib.dhp.common; + +public enum SaveMode { + + JSON, PARQUET + +} diff --git 
a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/Vocabulary.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/Vocabulary.java new file mode 100644 index 000000000..4cedb7455 --- /dev/null +++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/Vocabulary.java @@ -0,0 +1,84 @@ + +package eu.dnetlib.dhp.common; + +import com.google.common.collect.Maps; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import org.apache.commons.lang3.StringUtils; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class Vocabulary implements Serializable { + + private final String id; + private final String name; + + /** + * Code to Term mappings for this Vocabulary. + */ + private final Map terms = new HashMap<>(); + + /** + * Synonym to Code mappings for this Vocabulary. + */ + private final Map synonyms = Maps.newHashMap(); + + public Vocabulary(final String id, final String name) { + this.id = id; + this.name = name; + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } + + protected Map getTerms() { + return terms; + } + + public VocabularyTerm getTerm(final String id) { + return Optional.ofNullable(id).map(s -> s.toLowerCase()).map(s -> terms.get(s)).orElse(null); + } + + protected void addTerm(final String id, final String name) { + terms.put(id.toLowerCase(), new VocabularyTerm(id, name)); + } + + protected boolean termExists(final String id) { + return terms.containsKey(id.toLowerCase()); + } + + protected void addSynonym(final String syn, final String termCode) { + synonyms.put(syn, termCode.toLowerCase()); + } + + public VocabularyTerm getTermBySynonym(final String syn) { + return getTerm(synonyms.get(syn.toLowerCase())); + } + + public Qualifier getTermAsQualifier(final String termId) { + if (StringUtils.isBlank(termId)) { + return OafMapperUtils.unknown(getId(), getName()); + } else if 
(termExists(termId)) { + final VocabularyTerm t = getTerm(termId); + return OafMapperUtils.qualifier(t.getId(), t.getName(), getId(), getName()); + } else { + return OafMapperUtils.qualifier(termId, termId, getId(), getName()); + } + } + + public Qualifier getSynonymAsQualifier(final String syn) { + return Optional + .ofNullable(getTermBySynonym(syn)) + .map(term -> getTermAsQualifier(term.getId())) + .orElse(null); + // .orElse(OafMapperUtils.unknown(getId(), getName())); + } + +} diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/VocabularyGroup.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/VocabularyGroup.java new file mode 100644 index 000000000..68c8427a7 --- /dev/null +++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/VocabularyGroup.java @@ -0,0 +1,143 @@ + +package eu.dnetlib.dhp.common; + +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import org.apache.commons.lang3.StringUtils; + +import java.io.Serializable; +import java.util.*; +import java.util.stream.Collectors; + +public class VocabularyGroup implements Serializable { + + public static final String VOCABULARIES_XQUERY = "for $x in collection('/db/DRIVER/VocabularyDSResources/VocabularyDSResourceType') \n" + + + "let $vocid := $x//VOCABULARY_NAME/@code\n" + + "let $vocname := $x//VOCABULARY_NAME/text()\n" + + "for $term in ($x//TERM)\n" + + "return concat($vocid,' @=@ ',$vocname,' @=@ ',$term/@code,' @=@ ',$term/@english_name)"; + + public static final String VOCABULARY_SYNONYMS_XQUERY = "for $x in collection('/db/DRIVER/VocabularyDSResources/VocabularyDSResourceType')\n" + + + "let $vocid := $x//VOCABULARY_NAME/@code\n" + + "let $vocname := $x//VOCABULARY_NAME/text()\n" + + "for $term in ($x//TERM)\n" + + "for $syn in ($term//SYNONYM/@term)\n" + + "return concat($vocid,' @=@ ',$term/@code,' @=@ 
', $syn)\n"; + + public static VocabularyGroup loadVocsFromIS(ISLookUpService isLookUpService) throws ISLookUpException { + + final VocabularyGroup vocs = new VocabularyGroup(); + + for (final String s : isLookUpService.quickSearchProfile(VOCABULARIES_XQUERY)) { + final String[] arr = s.split("@=@"); + if (arr.length == 4) { + final String vocId = arr[0].trim(); + final String vocName = arr[1].trim(); + final String termId = arr[2].trim(); + final String termName = arr[3].trim(); + + if (!vocs.vocabularyExists(vocId)) { + vocs.addVocabulary(vocId, vocName); + } + + vocs.addTerm(vocId, termId, termName); + // vocs.addSynonyms(vocId, termId, termId); + } + } + + for (final String s : isLookUpService.quickSearchProfile(VOCABULARY_SYNONYMS_XQUERY)) { + final String[] arr = s.split("@=@"); + if (arr.length == 3) { + final String vocId = arr[0].trim(); + final String termId = arr[1].trim(); + final String syn = arr[2].trim(); + + vocs.addSynonyms(vocId, termId, syn); + // vocs.addSynonyms(vocId, termId, termId); + } + } + + return vocs; + } + + private final Map vocs = new HashMap<>(); + + public void addVocabulary(final String id, final String name) { + vocs.put(id.toLowerCase(), new Vocabulary(id, name)); + } + + public void addTerm(final String vocId, final String id, final String name) { + if (vocabularyExists(vocId)) { + vocs.get(vocId.toLowerCase()).addTerm(id, name); + } + } + + public VocabularyTerm getTerm(final String vocId, final String id) { + if (termExists(vocId, id)) { + return vocs.get(vocId.toLowerCase()).getTerm(id); + } else { + return new VocabularyTerm(id, id); + } + } + + public Set getTerms(String vocId) { + if (!vocabularyExists(vocId)) { + return new HashSet<>(); + } + return vocs + .get(vocId.toLowerCase()) + .getTerms() + .values() + .stream() + .map(t -> t.getId()) + .collect(Collectors.toCollection(HashSet::new)); + } + + public Qualifier lookup(String vocId, String id) { + return Optional + .ofNullable(getSynonymAsQualifier(vocId, id)) + 
.orElse(getTermAsQualifier(vocId, id)); + } + + public Qualifier getTermAsQualifier(final String vocId, final String id) { + if (vocabularyExists(vocId)) { + return vocs.get(vocId.toLowerCase()).getTermAsQualifier(id); + } + return OafMapperUtils.qualifier(id, id, "", ""); + } + + public Qualifier getSynonymAsQualifier(final String vocId, final String syn) { + if (StringUtils.isBlank(vocId)) { + return OafMapperUtils.unknown("", ""); + } + return vocs.get(vocId.toLowerCase()).getSynonymAsQualifier(syn); + } + + public boolean termExists(final String vocId, final String id) { + return vocabularyExists(vocId) && vocs.get(vocId.toLowerCase()).termExists(id); + } + + public boolean vocabularyExists(final String vocId) { + return Optional + .ofNullable(vocId) + .map(String::toLowerCase) + .map(id -> vocs.containsKey(id)) + .orElse(false); + } + + private void addSynonyms(final String vocId, final String termId, final String syn) { + String id = Optional + .ofNullable(vocId) + .map(s -> s.toLowerCase()) + .orElseThrow( + () -> new IllegalArgumentException(String.format("empty vocabulary id for [term:%s, synonym:%s]", termId, syn))); + Optional + .ofNullable(vocs.get(id)) + .orElseThrow(() -> new IllegalArgumentException("missing vocabulary id: " + vocId)) + .addSynonym(syn.toLowerCase(), termId); + } + +} diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/VocabularyTerm.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/VocabularyTerm.java new file mode 100644 index 000000000..775b93fb1 --- /dev/null +++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/VocabularyTerm.java @@ -0,0 +1,24 @@ + +package eu.dnetlib.dhp.common; + +import java.io.Serializable; + +public class VocabularyTerm implements Serializable { + + private final String id; + private final String name; + + public VocabularyTerm(final String id, final String name) { + this.id = id; + this.name = name; + } + + public String getId() {
return id; + } + + public String getName() { + return name; + } + +}