forked from antonis.lempesis/dnet-hadoop
WIP dhp-workflows-common
This commit is contained in:
parent
f680eb3e12
commit
b0b6c6bd47
|
@ -0,0 +1,32 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>dhp-workflows</artifactId>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<version>1.2.4-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>dhp-workflows-common</artifactId>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-core_2.11</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-sql_2.11</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-common</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-schemas</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,43 @@
|
|||
|
||||
package eu.dnetlib.dhp.common;
|
||||
|
||||
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Country;
|
||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
|
||||
public class CleaningRuleMap extends HashMap<Class, SerializableConsumer<Object>> implements Serializable {
|
||||
|
||||
/**
|
||||
* Creates the mapping for the Oaf types subject to cleaning
|
||||
*
|
||||
* @param vocabularies
|
||||
*/
|
||||
public static CleaningRuleMap create(VocabularyGroup vocabularies) {
|
||||
CleaningRuleMap mapping = new CleaningRuleMap();
|
||||
mapping.put(Qualifier.class, o -> cleanQualifier(vocabularies, (Qualifier) o));
|
||||
mapping.put(Country.class, o -> {
|
||||
final Country c = (Country) o;
|
||||
if (StringUtils.isBlank(c.getSchemeid())) {
|
||||
c.setSchemeid(ModelConstants.DNET_COUNTRY_TYPE);
|
||||
c.setSchemename(ModelConstants.DNET_COUNTRY_TYPE);
|
||||
}
|
||||
cleanQualifier(vocabularies, c);
|
||||
});
|
||||
return mapping;
|
||||
}
|
||||
|
||||
private static <Q extends Qualifier> void cleanQualifier(VocabularyGroup vocabularies, Q q) {
|
||||
if (vocabularies.vocabularyExists(q.getSchemeid())) {
|
||||
Qualifier newValue = vocabularies.lookup(q.getSchemeid(), q.getClassid());
|
||||
q.setClassid(newValue.getClassid());
|
||||
q.setClassname(newValue.getClassname());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package eu.dnetlib.dhp.common;
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import org.apache.spark.sql.DataFrameWriter;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class GraphSupport {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GraphSupport.class);
|
||||
|
||||
private static <T extends Oaf> void saveGraphTable(Dataset<T> dataset, Class<T> clazz, String outputGraph, eu.dnetlib.dhp.common.SaveMode saveMode) {
|
||||
|
||||
log.info("saving graph in {} mode to {}", outputGraph, saveMode.toString());
|
||||
|
||||
final DataFrameWriter<T> writer = dataset.write().mode(SaveMode.Overwrite);
|
||||
switch (saveMode) {
|
||||
case JSON:
|
||||
writer.option("compression", "gzip").json(outputGraph);
|
||||
break;
|
||||
case PARQUET:
|
||||
final String db_table = ModelSupport.tableIdentifier(outputGraph, clazz);
|
||||
writer.saveAsTable(db_table);
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,82 @@
|
|||
|
||||
package eu.dnetlib.dhp.common;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class OafCleaner implements Serializable {
|
||||
|
||||
public static <E extends Oaf> E apply(E oaf, CleaningRuleMap mapping) {
|
||||
try {
|
||||
navigate(oaf, mapping);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return oaf;
|
||||
}
|
||||
|
||||
private static void navigate(Object o, CleaningRuleMap mapping) throws IllegalAccessException {
|
||||
if (isPrimitive(o)) {
|
||||
return;
|
||||
} else if (isIterable(o.getClass())) {
|
||||
for (final Object elem : (Iterable<?>) o) {
|
||||
navigate(elem, mapping);
|
||||
}
|
||||
} else if (hasMapping(o, mapping)) {
|
||||
mapping.get(o.getClass()).accept(o);
|
||||
} else {
|
||||
for (final Field f : getAllFields(o.getClass())) {
|
||||
f.setAccessible(true);
|
||||
final Object val = f.get(o);
|
||||
if (!isPrimitive(val) && hasMapping(val, mapping)) {
|
||||
mapping.get(val.getClass()).accept(val);
|
||||
} else {
|
||||
navigate(f.get(o), mapping);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean hasMapping(Object o, CleaningRuleMap mapping) {
|
||||
return mapping.containsKey(o.getClass());
|
||||
}
|
||||
|
||||
private static boolean isIterable(final Class<?> cl) {
|
||||
return Iterable.class.isAssignableFrom(cl);
|
||||
}
|
||||
|
||||
private static boolean isPrimitive(Object o) {
|
||||
return Objects.isNull(o)
|
||||
|| o.getClass().isPrimitive()
|
||||
|| o instanceof Class
|
||||
|| o instanceof Integer
|
||||
|| o instanceof Double
|
||||
|| o instanceof Float
|
||||
|| o instanceof Long
|
||||
|| o instanceof Boolean
|
||||
|| o instanceof String
|
||||
|| o instanceof Byte;
|
||||
}
|
||||
|
||||
private static List<Field> getAllFields(Class<?> clazz) {
|
||||
return getAllFields(new LinkedList<>(), clazz);
|
||||
}
|
||||
|
||||
private static List<Field> getAllFields(List<Field> fields, Class<?> clazz) {
|
||||
fields.addAll(Arrays.asList(clazz.getDeclaredFields()));
|
||||
|
||||
final Class<?> superclass = clazz.getSuperclass();
|
||||
if (Objects.nonNull(superclass) && superclass.getPackage().equals(Oaf.class.getPackage())) {
|
||||
getAllFields(fields, superclass);
|
||||
}
|
||||
|
||||
return fields;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
package eu.dnetlib.dhp.common;
|
||||
|
||||
public enum SaveMode {
|
||||
|
||||
JSON, PARQUET
|
||||
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
|
||||
package eu.dnetlib.dhp.common;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
public class Vocabulary implements Serializable {
|
||||
|
||||
private final String id;
|
||||
private final String name;
|
||||
|
||||
/**
|
||||
* Code to Term mappings for this Vocabulary.
|
||||
*/
|
||||
private final Map<String, VocabularyTerm> terms = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Synonym to Code mappings for this Vocabulary.
|
||||
*/
|
||||
private final Map<String, String> synonyms = Maps.newHashMap();
|
||||
|
||||
public Vocabulary(final String id, final String name) {
|
||||
this.id = id;
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
protected Map<String, VocabularyTerm> getTerms() {
|
||||
return terms;
|
||||
}
|
||||
|
||||
public VocabularyTerm getTerm(final String id) {
|
||||
return Optional.ofNullable(id).map(s -> s.toLowerCase()).map(s -> terms.get(s)).orElse(null);
|
||||
}
|
||||
|
||||
protected void addTerm(final String id, final String name) {
|
||||
terms.put(id.toLowerCase(), new VocabularyTerm(id, name));
|
||||
}
|
||||
|
||||
protected boolean termExists(final String id) {
|
||||
return terms.containsKey(id.toLowerCase());
|
||||
}
|
||||
|
||||
protected void addSynonym(final String syn, final String termCode) {
|
||||
synonyms.put(syn, termCode.toLowerCase());
|
||||
}
|
||||
|
||||
public VocabularyTerm getTermBySynonym(final String syn) {
|
||||
return getTerm(synonyms.get(syn.toLowerCase()));
|
||||
}
|
||||
|
||||
public Qualifier getTermAsQualifier(final String termId) {
|
||||
if (StringUtils.isBlank(termId)) {
|
||||
return OafMapperUtils.unknown(getId(), getName());
|
||||
} else if (termExists(termId)) {
|
||||
final VocabularyTerm t = getTerm(termId);
|
||||
return OafMapperUtils.qualifier(t.getId(), t.getName(), getId(), getName());
|
||||
} else {
|
||||
return OafMapperUtils.qualifier(termId, termId, getId(), getName());
|
||||
}
|
||||
}
|
||||
|
||||
public Qualifier getSynonymAsQualifier(final String syn) {
|
||||
return Optional
|
||||
.ofNullable(getTermBySynonym(syn))
|
||||
.map(term -> getTermAsQualifier(term.getId()))
|
||||
.orElse(null);
|
||||
// .orElse(OafMapperUtils.unknown(getId(), getName()));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,143 @@
|
|||
|
||||
package eu.dnetlib.dhp.common;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class VocabularyGroup implements Serializable {
|
||||
|
||||
public static final String VOCABULARIES_XQUERY = "for $x in collection('/db/DRIVER/VocabularyDSResources/VocabularyDSResourceType') \n"
|
||||
+
|
||||
"let $vocid := $x//VOCABULARY_NAME/@code\n" +
|
||||
"let $vocname := $x//VOCABULARY_NAME/text()\n" +
|
||||
"for $term in ($x//TERM)\n" +
|
||||
"return concat($vocid,' @=@ ',$vocname,' @=@ ',$term/@code,' @=@ ',$term/@english_name)";
|
||||
|
||||
public static final String VOCABULARY_SYNONYMS_XQUERY = "for $x in collection('/db/DRIVER/VocabularyDSResources/VocabularyDSResourceType')\n"
|
||||
+
|
||||
"let $vocid := $x//VOCABULARY_NAME/@code\n" +
|
||||
"let $vocname := $x//VOCABULARY_NAME/text()\n" +
|
||||
"for $term in ($x//TERM)\n" +
|
||||
"for $syn in ($term//SYNONYM/@term)\n" +
|
||||
"return concat($vocid,' @=@ ',$term/@code,' @=@ ', $syn)\n";
|
||||
|
||||
public static VocabularyGroup loadVocsFromIS(ISLookUpService isLookUpService) throws ISLookUpException {
|
||||
|
||||
final VocabularyGroup vocs = new VocabularyGroup();
|
||||
|
||||
for (final String s : isLookUpService.quickSearchProfile(VOCABULARIES_XQUERY)) {
|
||||
final String[] arr = s.split("@=@");
|
||||
if (arr.length == 4) {
|
||||
final String vocId = arr[0].trim();
|
||||
final String vocName = arr[1].trim();
|
||||
final String termId = arr[2].trim();
|
||||
final String termName = arr[3].trim();
|
||||
|
||||
if (!vocs.vocabularyExists(vocId)) {
|
||||
vocs.addVocabulary(vocId, vocName);
|
||||
}
|
||||
|
||||
vocs.addTerm(vocId, termId, termName);
|
||||
// vocs.addSynonyms(vocId, termId, termId);
|
||||
}
|
||||
}
|
||||
|
||||
for (final String s : isLookUpService.quickSearchProfile(VOCABULARY_SYNONYMS_XQUERY)) {
|
||||
final String[] arr = s.split("@=@");
|
||||
if (arr.length == 3) {
|
||||
final String vocId = arr[0].trim();
|
||||
final String termId = arr[1].trim();
|
||||
final String syn = arr[2].trim();
|
||||
|
||||
vocs.addSynonyms(vocId, termId, syn);
|
||||
// vocs.addSynonyms(vocId, termId, termId);
|
||||
}
|
||||
}
|
||||
|
||||
return vocs;
|
||||
}
|
||||
|
||||
private final Map<String, Vocabulary> vocs = new HashMap<>();
|
||||
|
||||
public void addVocabulary(final String id, final String name) {
|
||||
vocs.put(id.toLowerCase(), new Vocabulary(id, name));
|
||||
}
|
||||
|
||||
public void addTerm(final String vocId, final String id, final String name) {
|
||||
if (vocabularyExists(vocId)) {
|
||||
vocs.get(vocId.toLowerCase()).addTerm(id, name);
|
||||
}
|
||||
}
|
||||
|
||||
public VocabularyTerm getTerm(final String vocId, final String id) {
|
||||
if (termExists(vocId, id)) {
|
||||
return vocs.get(vocId.toLowerCase()).getTerm(id);
|
||||
} else {
|
||||
return new VocabularyTerm(id, id);
|
||||
}
|
||||
}
|
||||
|
||||
public Set<String> getTerms(String vocId) {
|
||||
if (!vocabularyExists(vocId)) {
|
||||
return new HashSet<>();
|
||||
}
|
||||
return vocs
|
||||
.get(vocId.toLowerCase())
|
||||
.getTerms()
|
||||
.values()
|
||||
.stream()
|
||||
.map(t -> t.getId())
|
||||
.collect(Collectors.toCollection(HashSet::new));
|
||||
}
|
||||
|
||||
public Qualifier lookup(String vocId, String id) {
|
||||
return Optional
|
||||
.ofNullable(getSynonymAsQualifier(vocId, id))
|
||||
.orElse(getTermAsQualifier(vocId, id));
|
||||
}
|
||||
|
||||
public Qualifier getTermAsQualifier(final String vocId, final String id) {
|
||||
if (vocabularyExists(vocId)) {
|
||||
return vocs.get(vocId.toLowerCase()).getTermAsQualifier(id);
|
||||
}
|
||||
return OafMapperUtils.qualifier(id, id, "", "");
|
||||
}
|
||||
|
||||
public Qualifier getSynonymAsQualifier(final String vocId, final String syn) {
|
||||
if (StringUtils.isBlank(vocId)) {
|
||||
return OafMapperUtils.unknown("", "");
|
||||
}
|
||||
return vocs.get(vocId.toLowerCase()).getSynonymAsQualifier(syn);
|
||||
}
|
||||
|
||||
public boolean termExists(final String vocId, final String id) {
|
||||
return vocabularyExists(vocId) && vocs.get(vocId.toLowerCase()).termExists(id);
|
||||
}
|
||||
|
||||
public boolean vocabularyExists(final String vocId) {
|
||||
return Optional
|
||||
.ofNullable(vocId)
|
||||
.map(String::toLowerCase)
|
||||
.map(id -> vocs.containsKey(id))
|
||||
.orElse(false);
|
||||
}
|
||||
|
||||
private void addSynonyms(final String vocId, final String termId, final String syn) {
|
||||
String id = Optional
|
||||
.ofNullable(vocId)
|
||||
.map(s -> s.toLowerCase())
|
||||
.orElseThrow(
|
||||
() -> new IllegalArgumentException(String.format("empty vocabulary id for [term:%s, synonym:%s]")));
|
||||
Optional
|
||||
.ofNullable(vocs.get(id))
|
||||
.orElseThrow(() -> new IllegalArgumentException("missing vocabulary id: " + vocId))
|
||||
.addSynonym(syn.toLowerCase(), termId);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
|
||||
package eu.dnetlib.dhp.common;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class VocabularyTerm implements Serializable {
|
||||
|
||||
private final String id;
|
||||
private final String name;
|
||||
|
||||
public VocabularyTerm(final String id, final String name) {
|
||||
this.id = id;
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue