WIP: factoring out utilities into dhp-workflows-common

This commit is contained in:
Claudio Atzori 2020-07-29 13:08:20 +02:00
parent 91811ab43a
commit 2dbac631c9
29 changed files with 84 additions and 480 deletions

View File

@@ -195,10 +195,10 @@ public class SparkDedupTest implements Serializable {
 			.count();
 
 		assertEquals(3432, orgs_simrel);
-		assertEquals(7152, pubs_simrel);
-		assertEquals(344, sw_simrel);
+		assertEquals(6944, pubs_simrel);
+		assertEquals(318, sw_simrel);
 		assertEquals(458, ds_simrel);
-		assertEquals(6750, orp_simrel);
+		assertEquals(6746, orp_simrel);
 	}
 
 	@Test
@@ -344,10 +344,10 @@ public class SparkDedupTest implements Serializable {
 			.count();
 
 		assertEquals(1276, orgs_mergerel);
-		assertEquals(1442, pubs_mergerel);
-		assertEquals(288, sw_mergerel);
+		assertEquals(1418, pubs_mergerel);
+		assertEquals(276, sw_mergerel);
 		assertEquals(472, ds_mergerel);
-		assertEquals(718, orp_mergerel);
+		assertEquals(716, orp_mergerel);
 	}
 
 	@Test
@@ -391,8 +391,8 @@ public class SparkDedupTest implements Serializable {
 			.count();
 
 		assertEquals(82, orgs_deduprecord);
-		assertEquals(66, pubs_deduprecord);
-		assertEquals(51, sw_deduprecord);
+		assertEquals(65, pubs_deduprecord);
+		assertEquals(50, sw_deduprecord);
 		assertEquals(96, ds_deduprecord);
 		assertEquals(89, orp_deduprecord);
 	}
@@ -473,11 +473,11 @@ public class SparkDedupTest implements Serializable {
 			.distinct()
 			.count();
 
-		assertEquals(897, publications);
+		assertEquals(896, publications);
 		assertEquals(835, organizations);
 		assertEquals(100, projects);
 		assertEquals(100, datasource);
-		assertEquals(200, softwares);
+		assertEquals(199, softwares);
 		assertEquals(388, dataset);
 		assertEquals(517, otherresearchproduct);
@@ -533,7 +533,7 @@ public class SparkDedupTest implements Serializable {
 		long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
 
-		assertEquals(4866, relations);
+		assertEquals(4828, relations);
 
 		// check deletedbyinference
 		final Dataset<Relation> mergeRels = spark

View File

@@ -168,10 +168,10 @@ public class SparkStatsTest implements Serializable {
 			.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_blockstats")
 			.count();
 
-		assertEquals(121, orgs_blocks);
-		assertEquals(110, pubs_blocks);
-		assertEquals(21, sw_blocks);
-		assertEquals(67, ds_blocks);
+		assertEquals(549, orgs_blocks);
+		assertEquals(868, pubs_blocks);
+		assertEquals(473, sw_blocks);
+		assertEquals(523, ds_blocks);
 		assertEquals(55, orp_blocks);
 	}
 }

View File

@@ -17,8 +17,7 @@
 },
 "pace" : {
 "clustering" : [
-{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
-{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
+{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
 { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
 ],
 "decisionTree" : {

View File

@@ -17,8 +17,7 @@
 },
 "pace" : {
 "clustering" : [
-{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
-{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
+{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
 { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
 ],
 "decisionTree" : {

View File

@@ -29,8 +29,7 @@
 },
 "pace": {
 "clustering" : [
-{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
-{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
+{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
 { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
 ],
 "decisionTree": {

View File

@@ -17,8 +17,7 @@
 },
 "pace" : {
 "clustering" : [
-{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
-{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
+{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
 { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
 ],
 "decisionTree": {

View File

@@ -71,6 +71,11 @@
 			<artifactId>dhp-schemas</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>eu.dnetlib.dhp</groupId>
+			<artifactId>dhp-workflows-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 		<dependency>
 			<groupId>com.jayway.jsonpath</groupId>

View File

@@ -3,7 +3,6 @@ package eu.dnetlib.dhp.oa.graph.clean;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
-import java.io.BufferedInputStream;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -22,10 +21,8 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.common.*;
 import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
-import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;

View File

@@ -1,44 +0,0 @@
-package eu.dnetlib.dhp.oa.graph.clean;
-
-import java.io.Serializable;
-import java.util.HashMap;
-
-import org.apache.commons.lang3.StringUtils;
-
-import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.Country;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-
-public class CleaningRuleMap extends HashMap<Class, SerializableConsumer<Object>> implements Serializable {
-
-	/**
-	 * Creates the mapping for the Oaf types subject to cleaning
-	 *
-	 * @param vocabularies
-	 */
-	public static CleaningRuleMap create(VocabularyGroup vocabularies) {
-		CleaningRuleMap mapping = new CleaningRuleMap();
-		mapping.put(Qualifier.class, o -> cleanQualifier(vocabularies, (Qualifier) o));
-		mapping.put(Country.class, o -> {
-			final Country c = (Country) o;
-			if (StringUtils.isBlank(c.getSchemeid())) {
-				c.setSchemeid(ModelConstants.DNET_COUNTRY_TYPE);
-				c.setSchemename(ModelConstants.DNET_COUNTRY_TYPE);
-			}
-			cleanQualifier(vocabularies, c);
-		});
-		return mapping;
-	}
-
-	private static <Q extends Qualifier> void cleanQualifier(VocabularyGroup vocabularies, Q q) {
-		if (vocabularies.vocabularyExists(q.getSchemeid())) {
-			Qualifier newValue = vocabularies.lookup(q.getSchemeid(), q.getClassid());
-			q.setClassid(newValue.getClassid());
-			q.setClassname(newValue.getClassname());
-		}
-	}
-}

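The class above is re-created as-is under eu.dnetlib.dhp.common (see the dhp-workflows-common hunks further down). A minimal sketch of how the relocated pair is meant to be driven together, based only on the signatures visible in this diff; the helper class and the VocabularyGroup wiring are assumed for illustration and are not part of this commit:

import eu.dnetlib.dhp.common.CleaningRuleMap;
import eu.dnetlib.dhp.common.OafCleaner;
import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Publication;

public class CleaningSketch {

	// hypothetical helper: vocabularies would normally come from VocabularyGroup.loadVocsFromIS(...)
	public static Publication clean(Publication p, VocabularyGroup vocabularies) {
		// build the per-type rules once, then let OafCleaner walk the object graph reflectively
		CleaningRuleMap mapping = CleaningRuleMap.create(vocabularies);
		return OafCleaner.apply(p, mapping);
	}
}
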
View File

@@ -1,82 +0,0 @@
-package eu.dnetlib.dhp.oa.graph.clean;
-
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-
-public class OafCleaner implements Serializable {
-
-	public static <E extends Oaf> E apply(E oaf, CleaningRuleMap mapping) {
-		try {
-			navigate(oaf, mapping);
-		} catch (IllegalAccessException e) {
-			throw new RuntimeException(e);
-		}
-		return oaf;
-	}
-
-	private static void navigate(Object o, CleaningRuleMap mapping) throws IllegalAccessException {
-		if (isPrimitive(o)) {
-			return;
-		} else if (isIterable(o.getClass())) {
-			for (final Object elem : (Iterable<?>) o) {
-				navigate(elem, mapping);
-			}
-		} else if (hasMapping(o, mapping)) {
-			mapping.get(o.getClass()).accept(o);
-		} else {
-			for (final Field f : getAllFields(o.getClass())) {
-				f.setAccessible(true);
-				final Object val = f.get(o);
-				if (!isPrimitive(val) && hasMapping(val, mapping)) {
-					mapping.get(val.getClass()).accept(val);
-				} else {
-					navigate(f.get(o), mapping);
-				}
-			}
-		}
-	}
-
-	private static boolean hasMapping(Object o, CleaningRuleMap mapping) {
-		return mapping.containsKey(o.getClass());
-	}
-
-	private static boolean isIterable(final Class<?> cl) {
-		return Iterable.class.isAssignableFrom(cl);
-	}
-
-	private static boolean isPrimitive(Object o) {
-		return Objects.isNull(o)
-			|| o.getClass().isPrimitive()
-			|| o instanceof Class
-			|| o instanceof Integer
-			|| o instanceof Double
-			|| o instanceof Float
-			|| o instanceof Long
-			|| o instanceof Boolean
-			|| o instanceof String
-			|| o instanceof Byte;
-	}
-
-	private static List<Field> getAllFields(Class<?> clazz) {
-		return getAllFields(new LinkedList<>(), clazz);
-	}
-
-	private static List<Field> getAllFields(List<Field> fields, Class<?> clazz) {
-		fields.addAll(Arrays.asList(clazz.getDeclaredFields()));
-		final Class<?> superclass = clazz.getSuperclass();
-		if (Objects.nonNull(superclass) && superclass.getPackage().equals(Oaf.class.getPackage())) {
-			getAllFields(fields, superclass);
-		}
-		return fields;
-	}
-}

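The walk above dispatches on each field's runtime class via the rule map. A hedged sketch of registering an additional rule against the relocated eu.dnetlib.dhp.common.CleaningRuleMap (hypothetical, not in this commit; assumes StructuredProperty exposes getValue/setValue):

import eu.dnetlib.dhp.common.CleaningRuleMap;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;

public class ExtraRuleSketch {

	public static CleaningRuleMap withTrimRule(CleaningRuleMap mapping) {
		// trim whitespace on every StructuredProperty encountered during the reflective walk
		mapping.put(StructuredProperty.class, o -> {
			final StructuredProperty sp = (StructuredProperty) o;
			if (sp.getValue() != null) {
				sp.setValue(sp.getValue().trim());
			}
		});
		return mapping;
	}
}
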
View File

@@ -1,7 +1,7 @@
 package eu.dnetlib.dhp.oa.graph.raw;
 
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
+import static eu.dnetlib.dhp.common.OafMapperUtils.*;
 import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
 
 import java.util.*;
@@ -12,7 +12,7 @@ import org.dom4j.DocumentFactory;
 import org.dom4j.DocumentHelper;
 import org.dom4j.Node;
 
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.common.LicenseComparator;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;

View File

@@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.Dataset;
 import eu.dnetlib.dhp.schema.oaf.Datasource;

View File

@@ -1,15 +1,7 @@
 package eu.dnetlib.dhp.oa.graph.raw;
 
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
+import static eu.dnetlib.dhp.common.OafMapperUtils.*;
 import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
 import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION;
 import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
@@ -52,8 +44,8 @@ import org.slf4j.LoggerFactory;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.DbClient;
+import eu.dnetlib.dhp.common.VocabularyGroup;
 import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Context;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Dataset;

View File

@@ -1,9 +1,7 @@
 package eu.dnetlib.dhp.oa.graph.raw;
 
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
+import static eu.dnetlib.dhp.common.OafMapperUtils.*;
 import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
 
 import java.util.ArrayList;
@@ -18,7 +16,7 @@ import org.dom4j.Node;
 import com.google.common.collect.Lists;
 
 import eu.dnetlib.dhp.common.PacePerson;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Author;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Field;

View File

@@ -1,9 +1,7 @@
 package eu.dnetlib.dhp.oa.graph.raw;
 
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
+import static eu.dnetlib.dhp.common.OafMapperUtils.*;
 import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
 
 import java.util.ArrayList;
@@ -17,7 +15,7 @@ import org.dom4j.Document;
 import org.dom4j.Node;
 
 import eu.dnetlib.dhp.common.PacePerson;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Author;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Field;

View File

@@ -1,86 +0,0 @@
-package eu.dnetlib.dhp.oa.graph.raw.common;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.commons.lang3.StringUtils;
-
-import com.google.common.collect.Maps;
-
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-
-public class Vocabulary implements Serializable {
-
-	private final String id;
-	private final String name;
-
-	/**
-	 * Code to Term mappings for this Vocabulary.
-	 */
-	private final Map<String, VocabularyTerm> terms = new HashMap<>();
-
-	/**
-	 * Synonym to Code mappings for this Vocabulary.
-	 */
-	private final Map<String, String> synonyms = Maps.newHashMap();
-
-	public Vocabulary(final String id, final String name) {
-		this.id = id;
-		this.name = name;
-	}
-
-	public String getId() {
-		return id;
-	}
-
-	public String getName() {
-		return name;
-	}
-
-	protected Map<String, VocabularyTerm> getTerms() {
-		return terms;
-	}
-
-	public VocabularyTerm getTerm(final String id) {
-		return Optional.ofNullable(id).map(s -> s.toLowerCase()).map(s -> terms.get(s)).orElse(null);
-	}
-
-	protected void addTerm(final String id, final String name) {
-		terms.put(id.toLowerCase(), new VocabularyTerm(id, name));
-	}
-
-	protected boolean termExists(final String id) {
-		return terms.containsKey(id.toLowerCase());
-	}
-
-	protected void addSynonym(final String syn, final String termCode) {
-		synonyms.put(syn, termCode.toLowerCase());
-	}
-
-	public VocabularyTerm getTermBySynonym(final String syn) {
-		return getTerm(synonyms.get(syn.toLowerCase()));
-	}
-
-	public Qualifier getTermAsQualifier(final String termId) {
-		if (StringUtils.isBlank(termId)) {
-			return OafMapperUtils.unknown(getId(), getName());
-		} else if (termExists(termId)) {
-			final VocabularyTerm t = getTerm(termId);
-			return OafMapperUtils.qualifier(t.getId(), t.getName(), getId(), getName());
-		} else {
-			return OafMapperUtils.qualifier(termId, termId, getId(), getName());
-		}
-	}
-
-	public Qualifier getSynonymAsQualifier(final String syn) {
-		return Optional
-			.ofNullable(getTermBySynonym(syn))
-			.map(term -> getTermAsQualifier(term.getId()))
-			.orElse(null);
-		// .orElse(OafMapperUtils.unknown(getId(), getName()));
-	}
-}

View File

@@ -1,144 +0,0 @@
-package eu.dnetlib.dhp.oa.graph.raw.common;
-
-import java.io.Serializable;
-import java.util.*;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.StringUtils;
-
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-
-public class VocabularyGroup implements Serializable {
-
-	public static final String VOCABULARIES_XQUERY = "for $x in collection('/db/DRIVER/VocabularyDSResources/VocabularyDSResourceType') \n"
-		+
-		"let $vocid := $x//VOCABULARY_NAME/@code\n" +
-		"let $vocname := $x//VOCABULARY_NAME/text()\n" +
-		"for $term in ($x//TERM)\n" +
-		"return concat($vocid,' @=@ ',$vocname,' @=@ ',$term/@code,' @=@ ',$term/@english_name)";
-
-	public static final String VOCABULARY_SYNONYMS_XQUERY = "for $x in collection('/db/DRIVER/VocabularyDSResources/VocabularyDSResourceType')\n"
-		+
-		"let $vocid := $x//VOCABULARY_NAME/@code\n" +
-		"let $vocname := $x//VOCABULARY_NAME/text()\n" +
-		"for $term in ($x//TERM)\n" +
-		"for $syn in ($term//SYNONYM/@term)\n" +
-		"return concat($vocid,' @=@ ',$term/@code,' @=@ ', $syn)\n";
-
-	public static VocabularyGroup loadVocsFromIS(ISLookUpService isLookUpService) throws ISLookUpException {
-		final VocabularyGroup vocs = new VocabularyGroup();
-
-		for (final String s : isLookUpService.quickSearchProfile(VOCABULARIES_XQUERY)) {
-			final String[] arr = s.split("@=@");
-			if (arr.length == 4) {
-				final String vocId = arr[0].trim();
-				final String vocName = arr[1].trim();
-				final String termId = arr[2].trim();
-				final String termName = arr[3].trim();
-
-				if (!vocs.vocabularyExists(vocId)) {
-					vocs.addVocabulary(vocId, vocName);
-				}
-
-				vocs.addTerm(vocId, termId, termName);
-				// vocs.addSynonyms(vocId, termId, termId);
-			}
-		}
-
-		for (final String s : isLookUpService.quickSearchProfile(VOCABULARY_SYNONYMS_XQUERY)) {
-			final String[] arr = s.split("@=@");
-			if (arr.length == 3) {
-				final String vocId = arr[0].trim();
-				final String termId = arr[1].trim();
-				final String syn = arr[2].trim();
-
-				vocs.addSynonyms(vocId, termId, syn);
-				// vocs.addSynonyms(vocId, termId, termId);
-			}
-		}
-
-		return vocs;
-	}
-
-	private final Map<String, Vocabulary> vocs = new HashMap<>();
-
-	public void addVocabulary(final String id, final String name) {
-		vocs.put(id.toLowerCase(), new Vocabulary(id, name));
-	}
-
-	public void addTerm(final String vocId, final String id, final String name) {
-		if (vocabularyExists(vocId)) {
-			vocs.get(vocId.toLowerCase()).addTerm(id, name);
-		}
-	}
-
-	public VocabularyTerm getTerm(final String vocId, final String id) {
-		if (termExists(vocId, id)) {
-			return vocs.get(vocId.toLowerCase()).getTerm(id);
-		} else {
-			return new VocabularyTerm(id, id);
-		}
-	}
-
-	public Set<String> getTerms(String vocId) {
-		if (!vocabularyExists(vocId)) {
-			return new HashSet<>();
-		}
-		return vocs
-			.get(vocId.toLowerCase())
-			.getTerms()
-			.values()
-			.stream()
-			.map(t -> t.getId())
-			.collect(Collectors.toCollection(HashSet::new));
-	}
-
-	public Qualifier lookup(String vocId, String id) {
-		return Optional
-			.ofNullable(getSynonymAsQualifier(vocId, id))
-			.orElse(getTermAsQualifier(vocId, id));
-	}
-
-	public Qualifier getTermAsQualifier(final String vocId, final String id) {
-		if (vocabularyExists(vocId)) {
-			return vocs.get(vocId.toLowerCase()).getTermAsQualifier(id);
-		}
-		return OafMapperUtils.qualifier(id, id, "", "");
-	}
-
-	public Qualifier getSynonymAsQualifier(final String vocId, final String syn) {
-		if (StringUtils.isBlank(vocId)) {
-			return OafMapperUtils.unknown("", "");
-		}
-		return vocs.get(vocId.toLowerCase()).getSynonymAsQualifier(syn);
-	}
-
-	public boolean termExists(final String vocId, final String id) {
-		return vocabularyExists(vocId) && vocs.get(vocId.toLowerCase()).termExists(id);
-	}
-
-	public boolean vocabularyExists(final String vocId) {
-		return Optional
-			.ofNullable(vocId)
-			.map(String::toLowerCase)
-			.map(id -> vocs.containsKey(id))
-			.orElse(false);
-	}
-
-	private void addSynonyms(final String vocId, final String termId, final String syn) {
-		String id = Optional
-			.ofNullable(vocId)
-			.map(s -> s.toLowerCase())
-			.orElseThrow(
-				() -> new IllegalArgumentException(String.format("empty vocabulary id for [term:%s, synonym:%s]")));
-		Optional
-			.ofNullable(vocs.get(id))
-			.orElseThrow(() -> new IllegalArgumentException("missing vocabulary id: " + vocId))
-			.addSynonym(syn.toLowerCase(), termId);
-	}
-}

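This class also moves to eu.dnetlib.dhp.common unchanged. A small sketch of the lookup behaviour using only the public API shown above; the vocabulary id and term are made up for illustration:

import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Qualifier;

public class VocabularySketch {

	public static void main(String[] args) {
		// assumed vocabulary content, normally loaded via loadVocsFromIS(isLookUpService)
		VocabularyGroup vocs = new VocabularyGroup();
		vocs.addVocabulary("dnet:languages", "Languages");
		vocs.addTerm("dnet:languages", "eng", "English");

		// lookup tries the synonym mapping first, then falls back to the term code
		Qualifier q = vocs.lookup("dnet:languages", "eng");
		System.out.println(q.getClassid() + " / " + q.getClassname()); // eng / English
	}
}
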
View File

@@ -1,24 +0,0 @@
-package eu.dnetlib.dhp.oa.graph.raw.common;
-
-import java.io.Serializable;
-
-public class VocabularyTerm implements Serializable {
-
-	private final String id;
-	private final String name;
-
-	public VocabularyTerm(final String id, final String name) {
-		this.id = id;
-		this.name = name;
-	}
-
-	public String getId() {
-		return id;
-	}
-
-	public String getName() {
-		return name;
-	}
-}

View File

@@ -18,7 +18,9 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.CleaningRuleMap;
+import eu.dnetlib.dhp.common.OafCleaner;
+import eu.dnetlib.dhp.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Publication;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.dhp.schema.oaf.Result;

View File

@@ -21,8 +21,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import eu.dnetlib.dhp.common.VocabularyGroup;
 import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.Author;
 import eu.dnetlib.dhp.schema.oaf.Dataset;

View File

@@ -27,8 +27,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.OafMapperUtils;
+import eu.dnetlib.dhp.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Datasource;
 import eu.dnetlib.dhp.schema.oaf.Oaf;
 import eu.dnetlib.dhp.schema.oaf.Organization;

View File

@@ -1,15 +1,15 @@
 package eu.dnetlib.dhp.common;
 
+import java.io.Serializable;
+import java.util.HashMap;
+
+import org.apache.commons.lang3.StringUtils;
+
 import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.Country;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.Serializable;
-import java.util.HashMap;
 
 public class CleaningRuleMap extends HashMap<Class, SerializableConsumer<Object>> implements Serializable {

View File

@@ -1,32 +1,35 @@
 package eu.dnetlib.dhp.common;
 
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
 import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.SaveMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+
 public class GraphSupport {
 
 	private static final Logger log = LoggerFactory.getLogger(GraphSupport.class);
 
-	private static <T extends Oaf> void saveGraphTable(Dataset<T> dataset, Class<T> clazz, String outputGraph, eu.dnetlib.dhp.common.SaveMode saveMode) {
+	private static <T extends Oaf> void saveGraphTable(Dataset<T> dataset, Class<T> clazz, String outputGraph,
+		eu.dnetlib.dhp.common.SaveMode saveMode) {
 
 		log.info("saving graph in {} mode to {}", outputGraph, saveMode.toString());
 
 		final DataFrameWriter<T> writer = dataset.write().mode(SaveMode.Overwrite);
 		switch (saveMode) {
 			case JSON:
 				writer.option("compression", "gzip").json(outputGraph);
 				break;
 			case PARQUET:
 				final String db_table = ModelSupport.tableIdentifier(outputGraph, clazz);
 				writer.saveAsTable(db_table);
 				break;
 		}
 	}
 }

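saveGraphTable is private in the class above; a standalone sketch of the two write paths it switches between, using only calls visible in this diff (the class and method names of the sketch itself are hypothetical):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;

import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Publication;

public class SaveSketch {

	static void save(Dataset<Publication> ds, String outputGraph, eu.dnetlib.dhp.common.SaveMode mode) {
		switch (mode) {
			case JSON: // gzip-compressed JSON part files under the output path
				ds.write().mode(SaveMode.Overwrite).option("compression", "gzip").json(outputGraph);
				break;
			case PARQUET: // Hive table identified by the graph DB name plus the entity table name
				ds
					.write()
					.mode(SaveMode.Overwrite)
					.saveAsTable(ModelSupport.tableIdentifier(outputGraph, Publication.class));
				break;
		}
	}
}
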
View File

@@ -1,8 +1,6 @@
 package eu.dnetlib.dhp.common;
 
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.Arrays;
@@ -10,6 +8,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+
 public class OafCleaner implements Serializable {
 
 	public static <E extends Oaf> E apply(E oaf, CleaningRuleMap mapping) {

View File

@@ -1,11 +1,7 @@
-package eu.dnetlib.dhp.oa.graph.raw.common;
+package eu.dnetlib.dhp.common;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -13,15 +9,7 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
-import eu.dnetlib.dhp.schema.oaf.Field;
-import eu.dnetlib.dhp.schema.oaf.Journal;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
-import eu.dnetlib.dhp.schema.oaf.OriginDescription;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.dhp.utils.DHPUtils;
 
 public class OafMapperUtils {

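The wildcard imports above pull in the full helper set (qualifier, field, structuredProperty, ...) that the raw mappers use via static import. A hedged sketch of building schema objects with the relocated class; only the 4-arg qualifier signature is visible in this diff (in Vocabulary.getTermAsQualifier), the 3-arg structuredProperty overload is an assumption:

import static eu.dnetlib.dhp.common.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.common.OafMapperUtils.structuredProperty;

import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;

public class MapperSketch {

	public static StructuredProperty mainTitle(String title) {
		// qualifier(classid, classname, schemeid, schemename); scheme values are illustrative
		Qualifier q = qualifier("main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title");
		return structuredProperty(title, q, null); // null DataInfo for brevity (assumed overload)
	}
}
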
View File

@@ -1,7 +1,8 @@
 package eu.dnetlib.dhp.common;
 
 public enum SaveMode {
-JSON, PARQUET
+	JSON, PARQUET
 }

View File

@@ -1,15 +1,17 @@
 package eu.dnetlib.dhp.common;
 
-import com.google.common.collect.Maps;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import org.apache.commons.lang3.StringUtils;
-
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.collect.Maps;
+
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+
 public class Vocabulary implements Serializable {
 
 	private final String id;

View File

@@ -1,15 +1,16 @@
 package eu.dnetlib.dhp.common;
 
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import org.apache.commons.lang3.StringUtils;
-
 import java.io.Serializable;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
+
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
 public class VocabularyGroup implements Serializable {
 
 	public static final String VOCABULARIES_XQUERY = "for $x in collection('/db/DRIVER/VocabularyDSResources/VocabularyDSResourceType') \n"

View File

@@ -17,6 +17,7 @@
 	<modules>
 		<module>dhp-workflow-profiles</module>
+		<module>dhp-workflows-common</module>
 		<module>dhp-aggregation</module>
 		<module>dhp-distcp</module>
 		<module>dhp-actionmanager</module>