diff --git a/dhp-workflows/dhp-workflows-common/pom.xml b/dhp-workflows/dhp-workflows-common/pom.xml
new file mode 100644
index 000000000..bd01d2c9f
--- /dev/null
+++ b/dhp-workflows/dhp-workflows-common/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>dhp-workflows</artifactId>
+        <groupId>eu.dnetlib.dhp</groupId>
+        <version>1.2.4-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>dhp-workflows-common</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-schemas</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/CleaningRuleMap.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/CleaningRuleMap.java
new file mode 100644
index 000000000..630f8be5b
--- /dev/null
+++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/CleaningRuleMap.java
@@ -0,0 +1,50 @@
+
+package eu.dnetlib.dhp.common;
+
+import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.Country;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+/**
+ * Maps each Oaf model type subject to cleaning to the {@link SerializableConsumer} implementing its cleaning rule.
+ * NOTE(review): VocabularyGroup is resolved from this same package; the former import from
+ * eu.dnetlib.dhp.oa.graph.raw.common would not resolve against the dependencies declared in this module's pom.
+ */
+public class CleaningRuleMap extends HashMap<Class<?>, SerializableConsumer<Object>> implements Serializable {
+
+	/**
+	 * Creates the mapping for the Oaf types subject to cleaning.
+	 *
+	 * @param vocabularies the vocabularies used to resolve term codes and synonyms
+	 * @return the populated rule map
+	 */
+	public static CleaningRuleMap create(VocabularyGroup vocabularies) {
+		CleaningRuleMap mapping = new CleaningRuleMap();
+		mapping.put(Qualifier.class, o -> cleanQualifier(vocabularies, (Qualifier) o));
+		mapping.put(Country.class, o -> {
+			final Country c = (Country) o;
+			// default the scheme to the d-net country type when missing, then clean it as a plain qualifier
+			if (StringUtils.isBlank(c.getSchemeid())) {
+				c.setSchemeid(ModelConstants.DNET_COUNTRY_TYPE);
+				c.setSchemename(ModelConstants.DNET_COUNTRY_TYPE);
+			}
+			cleanQualifier(vocabularies, c);
+		});
+		return mapping;
+	}
+
+	// replaces classid/classname with the official term looked up via the qualifier's scheme, when the scheme is a known vocabulary
+	private static <Q extends Qualifier> void cleanQualifier(VocabularyGroup vocabularies, Q q) {
+		if (vocabularies.vocabularyExists(q.getSchemeid())) {
+			Qualifier newValue = vocabularies.lookup(q.getSchemeid(), q.getClassid());
+			q.setClassid(newValue.getClassid());
+			q.setClassname(newValue.getClassname());
+		}
+	}
+
+}
diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java
new file mode 100644
index 000000000..66c0db4ac
--- /dev/null
+++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java
@@ -0,0 +1,42 @@
+package eu.dnetlib.dhp.common;
+
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SaveMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Support methods to persist graph tables. */
+public class GraphSupport {
+
+	private static final Logger log = LoggerFactory.getLogger(GraphSupport.class);
+
+	/**
+	 * Saves a graph table, overwriting any existing output.
+	 *
+	 * @param dataset     the typed dataset to persist
+	 * @param clazz       element type, used to derive the table identifier in PARQUET mode
+	 * @param outputGraph output location: a filesystem path (JSON) or a database name (PARQUET)
+	 * @param saveMode    the output format
+	 */
+	private static <T extends Oaf> void saveGraphTable(Dataset<T> dataset, Class<T> clazz, String outputGraph,
+		eu.dnetlib.dhp.common.SaveMode saveMode) {
+
+		// fixed: the arguments were swapped with respect to the message placeholders
+		log.info("saving graph in {} mode to {}", saveMode, outputGraph);
+
+		final DataFrameWriter<T> writer = dataset.write().mode(SaveMode.Overwrite);
+		switch (saveMode) {
+			case JSON:
+				writer.option("compression", "gzip").json(outputGraph);
+				break;
+			case PARQUET:
+				final String db_table = ModelSupport.tableIdentifier(outputGraph, clazz);
+				writer.saveAsTable(db_table);
+				break;
+		}
+	}
+
+}
diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/OafCleaner.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/OafCleaner.java
new file mode 100644
index 000000000..e7e3570d7
--- /dev/null
+++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/OafCleaner.java
@@ -0,0 +1,95 @@
+
+package eu.dnetlib.dhp.common;
+
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Applies a {@link CleaningRuleMap} to an Oaf instance by reflectively walking its object graph.
+ */
+public class OafCleaner implements Serializable {
+
+	/**
+	 * Cleans the given Oaf instance in place and returns it.
+	 *
+	 * @param oaf     the entity to clean
+	 * @param mapping the cleaning rules, keyed by the concrete type they apply to
+	 * @return the same (mutated) instance
+	 */
+	public static <E extends Oaf> E apply(E oaf, CleaningRuleMap mapping) {
+		try {
+			navigate(oaf, mapping);
+		} catch (IllegalAccessException e) {
+			throw new RuntimeException(e);
+		}
+		return oaf;
+	}
+
+	// depth-first traversal: recurse into iterables and object fields, applying a rule wherever a type matches
+	private static void navigate(Object o, CleaningRuleMap mapping) throws IllegalAccessException {
+		if (isPrimitive(o)) {
+			return;
+		} else if (isIterable(o.getClass())) {
+			for (final Object elem : (Iterable<?>) o) {
+				navigate(elem, mapping);
+			}
+		} else if (hasMapping(o, mapping)) {
+			mapping.get(o.getClass()).accept(o);
+		} else {
+			for (final Field f : getAllFields(o.getClass())) {
+				f.setAccessible(true);
+				final Object val = f.get(o);
+				if (!isPrimitive(val) && hasMapping(val, mapping)) {
+					mapping.get(val.getClass()).accept(val);
+				} else {
+					navigate(f.get(o), mapping);
+				}
+			}
+		}
+	}
+
+	private static boolean hasMapping(Object o, CleaningRuleMap mapping) {
+		return mapping.containsKey(o.getClass());
+	}
+
+	private static boolean isIterable(final Class<?> cl) {
+		return Iterable.class.isAssignableFrom(cl);
+	}
+
+	// nulls, boxed primitives and Strings terminate the recursion
+	private static boolean isPrimitive(Object o) {
+		return Objects.isNull(o)
+			|| o.getClass().isPrimitive()
+			|| o instanceof Class
+			|| o instanceof Integer
+			|| o instanceof Double
+			|| o instanceof Float
+			|| o instanceof Long
+			|| o instanceof Boolean
+			|| o instanceof String
+			|| o instanceof Byte;
+	}
+
+	private static List<Field> getAllFields(Class<?> clazz) {
+		return getAllFields(new LinkedList<>(), clazz);
+	}
+
+	// collects declared fields up the hierarchy, but only while the superclass stays within the Oaf model package
+	private static List<Field> getAllFields(List<Field> fields, Class<?> clazz) {
+		fields.addAll(Arrays.asList(clazz.getDeclaredFields()));
+
+		final Class<?> superclass = clazz.getSuperclass();
+		if (Objects.nonNull(superclass) && superclass.getPackage().equals(Oaf.class.getPackage())) {
+			getAllFields(fields, superclass);
+		}
+
+		return fields;
+	}
+
+}
diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/SaveMode.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/SaveMode.java
new file mode 100644
index 000000000..d368827df
--- /dev/null
+++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/SaveMode.java
@@ -0,0 +1,12 @@
+package eu.dnetlib.dhp.common;
+
+/** Output formats supported when persisting graph tables. */
+public enum SaveMode {
+
+	/** gzip-compressed JSON text files */
+	JSON,
+
+	/** table-based storage (see GraphSupport, which derives a table identifier) */
+	PARQUET
+
+}
diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/Vocabulary.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/Vocabulary.java
new file mode 100644
index 000000000..4cedb7455
--- /dev/null
+++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/Vocabulary.java
@@ -0,0 +1,93 @@
+
+package eu.dnetlib.dhp.common;
+
+import com.google.common.collect.Maps;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A controlled vocabulary: a set of terms indexed by lowercased code, plus synonym-to-code mappings.
+ * NOTE(review): OafMapperUtils is referenced without an import; confirm it is available from this package.
+ */
+public class Vocabulary implements Serializable {
+
+	private final String id;
+	private final String name;
+
+	/**
+	 * Code to Term mappings for this Vocabulary.
+	 */
+	private final Map<String, VocabularyTerm> terms = new HashMap<>();
+
+	/**
+	 * Synonym to Code mappings for this Vocabulary.
+	 */
+	private final Map<String, String> synonyms = Maps.newHashMap();
+
+	public Vocabulary(final String id, final String name) {
+		this.id = id;
+		this.name = name;
+	}
+
+	public String getId() {
+		return id;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	protected Map<String, VocabularyTerm> getTerms() {
+		return terms;
+	}
+
+	/** Looks up a term by code, case-insensitively; returns null when absent. */
+	public VocabularyTerm getTerm(final String id) {
+		return Optional.ofNullable(id).map(s -> s.toLowerCase()).map(s -> terms.get(s)).orElse(null);
+	}
+
+	protected void addTerm(final String id, final String name) {
+		terms.put(id.toLowerCase(), new VocabularyTerm(id, name));
+	}
+
+	protected boolean termExists(final String id) {
+		return terms.containsKey(id.toLowerCase());
+	}
+
+	protected void addSynonym(final String syn, final String termCode) {
+		synonyms.put(syn, termCode.toLowerCase());
+	}
+
+	public VocabularyTerm getTermBySynonym(final String syn) {
+		return getTerm(synonyms.get(syn.toLowerCase()));
+	}
+
+	/**
+	 * Maps a term code to a Qualifier: unknown when blank, the official term when it exists,
+	 * otherwise a qualifier echoing the given code.
+	 */
+	public Qualifier getTermAsQualifier(final String termId) {
+		if (StringUtils.isBlank(termId)) {
+			return OafMapperUtils.unknown(getId(), getName());
+		} else if (termExists(termId)) {
+			final VocabularyTerm t = getTerm(termId);
+			return OafMapperUtils.qualifier(t.getId(), t.getName(), getId(), getName());
+		} else {
+			return OafMapperUtils.qualifier(termId, termId, getId(), getName());
+		}
+	}
+
+	/** Resolves a synonym to its term's Qualifier; returns null when the synonym is unknown. */
+	public Qualifier getSynonymAsQualifier(final String syn) {
+		return Optional
+			.ofNullable(getTermBySynonym(syn))
+			.map(term -> getTermAsQualifier(term.getId()))
+			.orElse(null);
+	}
+
+}
diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/VocabularyGroup.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/VocabularyGroup.java
new file mode 100644
index 000000000..68c8427a7
--- /dev/null
+++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/VocabularyGroup.java
@@ -0,0 +1,158 @@
+
+package eu.dnetlib.dhp.common;
+
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Indexes the controlled vocabularies (terms and synonyms) loaded from the IS lookup service.
+ */
+public class VocabularyGroup implements Serializable {
+
+	public static final String VOCABULARIES_XQUERY = "for $x in collection('/db/DRIVER/VocabularyDSResources/VocabularyDSResourceType') \n"
+		+
+		"let $vocid := $x//VOCABULARY_NAME/@code\n" +
+		"let $vocname := $x//VOCABULARY_NAME/text()\n" +
+		"for $term in ($x//TERM)\n" +
+		"return concat($vocid,' @=@ ',$vocname,' @=@ ',$term/@code,' @=@ ',$term/@english_name)";
+
+	public static final String VOCABULARY_SYNONYMS_XQUERY = "for $x in collection('/db/DRIVER/VocabularyDSResources/VocabularyDSResourceType')\n"
+		+
+		"let $vocid := $x//VOCABULARY_NAME/@code\n" +
+		"let $vocname := $x//VOCABULARY_NAME/text()\n" +
+		"for $term in ($x//TERM)\n" +
+		"for $syn in ($term//SYNONYM/@term)\n" +
+		"return concat($vocid,' @=@ ',$term/@code,' @=@ ', $syn)\n";
+
+	/**
+	 * Builds a VocabularyGroup by querying vocabularies, terms and synonyms from the given lookup service.
+	 *
+	 * @param isLookUpService the service to query
+	 * @throws ISLookUpException when the profile queries fail
+	 */
+	public static VocabularyGroup loadVocsFromIS(ISLookUpService isLookUpService) throws ISLookUpException {
+
+		final VocabularyGroup vocs = new VocabularyGroup();
+
+		for (final String s : isLookUpService.quickSearchProfile(VOCABULARIES_XQUERY)) {
+			final String[] arr = s.split("@=@");
+			if (arr.length == 4) {
+				final String vocId = arr[0].trim();
+				final String vocName = arr[1].trim();
+				final String termId = arr[2].trim();
+				final String termName = arr[3].trim();
+
+				if (!vocs.vocabularyExists(vocId)) {
+					vocs.addVocabulary(vocId, vocName);
+				}
+
+				vocs.addTerm(vocId, termId, termName);
+			}
+		}
+
+		for (final String s : isLookUpService.quickSearchProfile(VOCABULARY_SYNONYMS_XQUERY)) {
+			final String[] arr = s.split("@=@");
+			if (arr.length == 3) {
+				final String vocId = arr[0].trim();
+				final String termId = arr[1].trim();
+				final String syn = arr[2].trim();
+
+				vocs.addSynonyms(vocId, termId, syn);
+			}
+		}
+
+		return vocs;
+	}
+
+	private final Map<String, Vocabulary> vocs = new HashMap<>();
+
+	public void addVocabulary(final String id, final String name) {
+		vocs.put(id.toLowerCase(), new Vocabulary(id, name));
+	}
+
+	public void addTerm(final String vocId, final String id, final String name) {
+		if (vocabularyExists(vocId)) {
+			vocs.get(vocId.toLowerCase()).addTerm(id, name);
+		}
+	}
+
+	/** Returns the term, or a self-describing placeholder term when unknown. */
+	public VocabularyTerm getTerm(final String vocId, final String id) {
+		if (termExists(vocId, id)) {
+			return vocs.get(vocId.toLowerCase()).getTerm(id);
+		} else {
+			return new VocabularyTerm(id, id);
+		}
+	}
+
+	public Set<String> getTerms(String vocId) {
+		if (!vocabularyExists(vocId)) {
+			return new HashSet<>();
+		}
+		return vocs
+			.get(vocId.toLowerCase())
+			.getTerms()
+			.values()
+			.stream()
+			.map(t -> t.getId())
+			.collect(Collectors.toCollection(HashSet::new));
+	}
+
+	/** Resolves an id first as a synonym, then as a term code. */
+	public Qualifier lookup(String vocId, String id) {
+		return Optional
+			.ofNullable(getSynonymAsQualifier(vocId, id))
+			.orElse(getTermAsQualifier(vocId, id));
+	}
+
+	public Qualifier getTermAsQualifier(final String vocId, final String id) {
+		if (vocabularyExists(vocId)) {
+			return vocs.get(vocId.toLowerCase()).getTermAsQualifier(id);
+		}
+		return OafMapperUtils.qualifier(id, id, "", "");
+	}
+
+	public Qualifier getSynonymAsQualifier(final String vocId, final String syn) {
+		if (StringUtils.isBlank(vocId)) {
+			return OafMapperUtils.unknown("", "");
+		}
+		// fixed: guard against unknown vocabularies; the map lookup could return null and throw a NPE
+		if (!vocabularyExists(vocId)) {
+			return null;
+		}
+		return vocs.get(vocId.toLowerCase()).getSynonymAsQualifier(syn);
+	}
+
+	public boolean termExists(final String vocId, final String id) {
+		return vocabularyExists(vocId) && vocs.get(vocId.toLowerCase()).termExists(id);
+	}
+
+	public boolean vocabularyExists(final String vocId) {
+		return Optional
+			.ofNullable(vocId)
+			.map(String::toLowerCase)
+			.map(id -> vocs.containsKey(id))
+			.orElse(false);
+	}
+
+	private void addSynonyms(final String vocId, final String termId, final String syn) {
+		String id = Optional
+			.ofNullable(vocId)
+			.map(s -> s.toLowerCase())
+			.orElseThrow(
+				// fixed: the format string had placeholders but no arguments
+				() -> new IllegalArgumentException(
+					String.format("empty vocabulary id for [term:%s, synonym:%s]", termId, syn)));
+		Optional
+			.ofNullable(vocs.get(id))
+			.orElseThrow(() -> new IllegalArgumentException("missing vocabulary id: " + vocId))
+			.addSynonym(syn.toLowerCase(), termId);
+	}
+
+}
diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/VocabularyTerm.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/VocabularyTerm.java
new file mode 100644
index 000000000..775b93fb1
--- /dev/null
+++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/VocabularyTerm.java
@@ -0,0 +1,27 @@
+
+package eu.dnetlib.dhp.common;
+
+import java.io.Serializable;
+
+/** An immutable vocabulary entry, pairing a term code with its (english) name. */
+public class VocabularyTerm implements Serializable {
+
+	private final String id;
+	private final String name;
+
+	public VocabularyTerm(final String id, final String name) {
+		this.id = id;
+		this.name = name;
+	}
+
+	/** @return the term code */
+	public String getId() {
+		return id;
+	}
+
+	/** @return the term name */
+	public String getName() {
+		return name;
+	}
+
+}