1
0
Fork 0

imported methods from CleaningFunctions, defined in GraphCleaningFunctions

This commit is contained in:
Claudio Atzori 2021-05-10 16:43:39 +02:00
parent 3797543600
commit d1cbee8413
7 changed files with 377 additions and 23 deletions

View File

@ -0,0 +1,348 @@
package eu.dnetlib.dhp.schema.oaf.utils;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
public class GraphCleaningFunctions extends CleaningFunctions {
public static final String CLEANING_REGEX = "(?:\\n|\\r|\\t)";
public static final String ORCID_PREFIX_REGEX = "^http(s?):\\/\\/orcid\\.org\\/";
public static final String INVALID_AUTHOR_REGEX = ".*deactivated.*";
public static final String TITLE_FILTER_REGEX = "[.*test.*\\W\\d]";
public static final int TITLE_FILTER_RESIDUAL_LENGTH = 10;
public static <T extends Oaf> T fixVocabularyNames(T value) {
if (value instanceof Datasource) {
// nothing to clean here
} else if (value instanceof Project) {
// nothing to clean here
} else if (value instanceof Organization) {
Organization o = (Organization) value;
if (Objects.nonNull(o.getCountry())) {
fixVocabName(o.getCountry(), ModelConstants.DNET_COUNTRY_TYPE);
}
} else if (value instanceof Relation) {
// nothing to clean here
} else if (value instanceof Result) {
Result r = (Result) value;
fixVocabName(r.getLanguage(), ModelConstants.DNET_LANGUAGES);
fixVocabName(r.getResourcetype(), ModelConstants.DNET_DATA_CITE_RESOURCE);
fixVocabName(r.getBestaccessright(), ModelConstants.DNET_ACCESS_MODES);
if (Objects.nonNull(r.getSubject())) {
r.getSubject().forEach(s -> fixVocabName(s.getQualifier(), ModelConstants.DNET_SUBJECT_TYPOLOGIES));
}
if (Objects.nonNull(r.getInstance())) {
for (Instance i : r.getInstance()) {
fixVocabName(i.getAccessright(), ModelConstants.DNET_ACCESS_MODES);
fixVocabName(i.getRefereed(), ModelConstants.DNET_REVIEW_LEVELS);
}
}
if (Objects.nonNull(r.getAuthor())) {
r.getAuthor().stream().filter(Objects::nonNull).forEach(a -> {
if (Objects.nonNull(a.getPid())) {
a.getPid().stream().filter(Objects::nonNull).forEach(p -> {
fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES);
});
}
});
}
if (value instanceof Publication) {
} else if (value instanceof Dataset) {
} else if (value instanceof OtherResearchProduct) {
} else if (value instanceof Software) {
}
}
return value;
}
public static <T extends Oaf> boolean filter(T value) {
if (value instanceof Datasource) {
// nothing to evaluate here
} else if (value instanceof Project) {
// nothing to evaluate here
} else if (value instanceof Organization) {
// nothing to evaluate here
} else if (value instanceof Relation) {
// nothing to clean here
} else if (value instanceof Result) {
Result r = (Result) value;
if (Objects.nonNull(r.getTitle()) && r.getTitle().isEmpty()) {
return false;
}
if (value instanceof Publication) {
} else if (value instanceof Dataset) {
} else if (value instanceof OtherResearchProduct) {
} else if (value instanceof Software) {
}
}
return true;
}
public static <T extends Oaf> T cleanup(T value) {
if (value instanceof Datasource) {
// nothing to clean here
} else if (value instanceof Project) {
// nothing to clean here
} else if (value instanceof Organization) {
Organization o = (Organization) value;
if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) {
o.setCountry(ModelConstants.UNKNOWN_COUNTRY);
}
} else if (value instanceof Relation) {
// nothing to clean here
} else if (value instanceof Result) {
Result r = (Result) value;
if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) {
r.setPublisher(null);
}
if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) {
r
.setLanguage(
qualifier("und", "Undetermined", ModelConstants.DNET_LANGUAGES));
}
if (Objects.nonNull(r.getSubject())) {
r
.setSubject(
r
.getSubject()
.stream()
.filter(Objects::nonNull)
.filter(sp -> StringUtils.isNotBlank(sp.getValue()))
.filter(sp -> Objects.nonNull(sp.getQualifier()))
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
.map(GraphCleaningFunctions::cleanValue)
.collect(Collectors.toList()));
}
if (Objects.nonNull(r.getTitle())) {
r
.setTitle(
r
.getTitle()
.stream()
.filter(Objects::nonNull)
.filter(sp -> StringUtils.isNotBlank(sp.getValue()))
.filter(
sp -> sp
.getValue()
.toLowerCase()
.replaceAll(TITLE_FILTER_REGEX, "")
.length() > TITLE_FILTER_RESIDUAL_LENGTH)
.map(GraphCleaningFunctions::cleanValue)
.collect(Collectors.toList()));
}
if (Objects.nonNull(r.getDescription())) {
r
.setDescription(
r
.getDescription()
.stream()
.filter(Objects::nonNull)
.filter(sp -> StringUtils.isNotBlank(sp.getValue()))
.map(GraphCleaningFunctions::cleanValue)
.collect(Collectors.toList()));
}
if (Objects.nonNull(r.getPid())) {
r.setPid(processPidCleaning(r.getPid()));
}
if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) {
r
.setResourcetype(
qualifier(ModelConstants.UNKNOWN, "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE));
}
if (Objects.nonNull(r.getInstance())) {
for (Instance i : r.getInstance()) {
Optional
.ofNullable(i.getPid())
.ifPresent(pid -> {
final Set<StructuredProperty> pids = pid
.stream()
.filter(Objects::nonNull)
.filter(p -> StringUtils.isNotBlank(p.getValue()))
.collect(Collectors.toCollection(HashSet::new));
Optional
.ofNullable(i.getAlternateIdentifier())
.ifPresent(altId -> {
final Set<StructuredProperty> altIds = altId
.stream()
.filter(Objects::nonNull)
.filter(p -> StringUtils.isNotBlank(p.getValue()))
.collect(Collectors.toCollection(HashSet::new));
i.setAlternateIdentifier(Lists.newArrayList(Sets.difference(altIds, pids)));
});
});
if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) {
i
.setAccessright(
accessRight(
ModelConstants.UNKNOWN, ModelConstants.NOT_AVAILABLE,
ModelConstants.DNET_ACCESS_MODES));
}
if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) {
i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY);
}
if (Objects.isNull(i.getRefereed())) {
i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS));
}
}
}
if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
Qualifier bestaccessrights = OafMapperUtils.createBestAccessRights(r.getInstance());
if (Objects.isNull(bestaccessrights)) {
r
.setBestaccessright(
qualifier(
ModelConstants.UNKNOWN, ModelConstants.NOT_AVAILABLE,
ModelConstants.DNET_ACCESS_MODES));
} else {
r.setBestaccessright(bestaccessrights);
}
}
if (Objects.nonNull(r.getAuthor())) {
final List<Author> authors = Lists.newArrayList();
for (Author a : r.getAuthor()) {
if (Objects.isNull(a.getPid())) {
a.setPid(Lists.newArrayList());
} else {
a
.setPid(
a
.getPid()
.stream()
.filter(Objects::nonNull)
.filter(p -> Objects.nonNull(p.getQualifier()))
.filter(p -> StringUtils.isNotBlank(p.getValue()))
.map(p -> {
p.setValue(p.getValue().trim().replaceAll(ORCID_PREFIX_REGEX, ""));
return p;
})
.filter(p -> StringUtils.isNotBlank(p.getValue()))
.collect(
Collectors
.toMap(
StructuredProperty::getValue, Function.identity(), (p1, p2) -> p1,
LinkedHashMap::new))
.values()
.stream()
.collect(Collectors.toList()));
}
if (StringUtils.isBlank(a.getFullname())) {
if (StringUtils.isNotBlank(a.getName()) && StringUtils.isNotBlank(a.getSurname())) {
a.setFullname(a.getSurname() + ", " + a.getName());
}
}
if (StringUtils.isNotBlank(a.getFullname()) && isValidAuthorName(a)) {
authors.add(a);
}
}
boolean nullRank = authors
.stream()
.anyMatch(a -> Objects.isNull(a.getRank()));
if (nullRank) {
int i = 1;
for (Author author : authors) {
author.setRank(i++);
}
}
r.setAuthor(authors);
}
if (value instanceof Publication) {
} else if (value instanceof Dataset) {
} else if (value instanceof OtherResearchProduct) {
} else if (value instanceof Software) {
}
}
return value;
}
// HELPERS
private static boolean isValidAuthorName(Author a) {
return !Stream
.of(a.getFullname(), a.getName(), a.getSurname())
.filter(s -> s != null && !s.isEmpty())
.collect(Collectors.joining(""))
.toLowerCase()
.matches(INVALID_AUTHOR_REGEX);
}
private static List<StructuredProperty> processPidCleaning(List<StructuredProperty> pids) {
return pids
.stream()
.filter(Objects::nonNull)
.filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue())))
.filter(sp -> !PID_BLACKLIST.contains(sp.getValue().trim().toLowerCase()))
.filter(sp -> Objects.nonNull(sp.getQualifier()))
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
.map(CleaningFunctions::normalizePidValue)
.filter(CleaningFunctions::pidFilter)
.collect(Collectors.toList());
}
private static void fixVocabName(Qualifier q, String vocabularyName) {
if (Objects.nonNull(q) && StringUtils.isBlank(q.getSchemeid())) {
q.setSchemeid(vocabularyName);
q.setSchemename(vocabularyName);
}
}
private static AccessRight accessRight(String classid, String classname, String scheme) {
return OafMapperUtils
.accessRight(
classid, classname, scheme, scheme);
}
private static Qualifier qualifier(String classid, String classname, String scheme) {
return OafMapperUtils
.qualifier(
classid, classname, scheme, scheme);
}
protected static StructuredProperty cleanValue(StructuredProperty s) {
s.setValue(s.getValue().replaceAll(CLEANING_REGEX, " "));
return s;
}
protected static Field<String> cleanValue(Field<String> s) {
s.setValue(s.getValue().replaceAll(CLEANING_REGEX, " "));
return s;
}
}

View File

@ -2,7 +2,6 @@
package eu.dnetlib.dhp.oa.graph.clean; package eu.dnetlib.dhp.oa.graph.clean;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions.*;
import java.util.Optional; import java.util.Optional;
@ -24,6 +23,7 @@ import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -86,10 +86,10 @@ public class CleanGraphSparkJob {
final CleaningRuleMap mapping = CleaningRuleMap.create(vocs); final CleaningRuleMap mapping = CleaningRuleMap.create(vocs);
readTableFromPath(spark, inputPath, clazz) readTableFromPath(spark, inputPath, clazz)
.map((MapFunction<T, T>) value -> fixVocabularyNames(value), Encoders.bean(clazz)) .map((MapFunction<T, T>) GraphCleaningFunctions::fixVocabularyNames, Encoders.bean(clazz))
.map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz)) .map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
.map((MapFunction<T, T>) value -> cleanup(value), Encoders.bean(clazz)) .map((MapFunction<T, T>) GraphCleaningFunctions::cleanup, Encoders.bean(clazz))
.filter((FilterFunction<T>) value -> filter(value)) .filter((FilterFunction<T>) GraphCleaningFunctions::filter)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")

View File

@ -7,7 +7,6 @@ import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.*;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document; import org.dom4j.Document;
import org.dom4j.DocumentFactory; import org.dom4j.DocumentFactory;
@ -15,6 +14,7 @@ import org.dom4j.DocumentHelper;
import org.dom4j.Node; import org.dom4j.Node;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;

View File

@ -22,11 +22,12 @@ import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class CleaningFunctionTest { public class GraphCleaningFunctionsTest {
public static final ObjectMapper MAPPER = new ObjectMapper(); public static final ObjectMapper MAPPER = new ObjectMapper();
@ -62,7 +63,7 @@ public class CleaningFunctionTest {
assertTrue(p_in instanceof Result); assertTrue(p_in instanceof Result);
assertTrue(p_in instanceof Publication); assertTrue(p_in instanceof Publication);
Publication p_out = OafCleaner.apply(CleaningFunctions.fixVocabularyNames(p_in), mapping); Publication p_out = OafCleaner.apply(GraphCleaningFunctions.fixVocabularyNames(p_in), mapping);
assertNotNull(p_out); assertNotNull(p_out);
@ -121,7 +122,7 @@ public class CleaningFunctionTest {
.findFirst() .findFirst()
.isPresent()); .isPresent());
Publication p_cleaned = CleaningFunctions.cleanup(p_out); Publication p_cleaned = GraphCleaningFunctions.cleanup(p_out);
assertEquals(1, p_cleaned.getTitle().size()); assertEquals(1, p_cleaned.getTitle().size());
@ -188,11 +189,13 @@ public class CleaningFunctionTest {
private List<String> vocs() throws IOException { private List<String> vocs() throws IOException {
return IOUtils return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt")); .readLines(
GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
} }
private List<String> synonyms() throws IOException { private List<String> synonyms() throws IOException {
return IOUtils return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt")); .readLines(
GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
} }
} }

View File

@ -16,7 +16,7 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest; import eu.dnetlib.dhp.oa.graph.clean.GraphCleaningFunctionsTest;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
@ -89,12 +89,14 @@ public class GenerateEntitiesApplicationTest {
private List<String> vocs() throws IOException { private List<String> vocs() throws IOException {
return IOUtils return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt")); .readLines(
GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
} }
private List<String> synonyms() throws IOException { private List<String> synonyms() throws IOException {
return IOUtils return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt")); .readLines(
GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
} }
} }

View File

@ -8,7 +8,6 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -20,8 +19,9 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest; import eu.dnetlib.dhp.oa.graph.clean.GraphCleaningFunctionsTest;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.PidType; import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -431,7 +431,6 @@ public class MappersTest {
assertEquals(ModelConstants.DNET_PID_TYPES, i.getAlternateIdentifier().get(0).getQualifier().getSchemeid()); assertEquals(ModelConstants.DNET_PID_TYPES, i.getAlternateIdentifier().get(0).getQualifier().getSchemeid());
assertEquals(ModelConstants.DNET_PID_TYPES, i.getAlternateIdentifier().get(0).getQualifier().getSchemename()); assertEquals(ModelConstants.DNET_PID_TYPES, i.getAlternateIdentifier().get(0).getQualifier().getSchemename());
assertNotNull(i.getUrl()); assertNotNull(i.getUrl());
assertEquals(2, i.getUrl().size()); assertEquals(2, i.getUrl().size());
assertTrue(i.getUrl().contains("http://apps.who.int/trialsearch/Trial3.aspx?trialid=NCT02321059")); assertTrue(i.getUrl().contains("http://apps.who.int/trialsearch/Trial3.aspx?trialid=NCT02321059"));
@ -598,12 +597,14 @@ public class MappersTest {
private List<String> vocs() throws IOException { private List<String> vocs() throws IOException {
return IOUtils return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt")); .readLines(
GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
} }
private List<String> synonyms() throws IOException { private List<String> synonyms() throws IOException {
return IOUtils return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt")); .readLines(
GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
} }
} }

View File

@ -730,7 +730,7 @@
<mockito-core.version>3.3.3</mockito-core.version> <mockito-core.version>3.3.3</mockito-core.version>
<mongodb.driver.version>3.4.2</mongodb.driver.version> <mongodb.driver.version>3.4.2</mongodb.driver.version>
<vtd.version>[2.12,3.0)</vtd.version> <vtd.version>[2.12,3.0)</vtd.version>
<dhp-schemas.version>[2.3.6]</dhp-schemas.version> <dhp-schemas.version>[2.3.7-SNAPSHOT]</dhp-schemas.version>
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version> <dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version> <dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version> <dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>