forked from antonis.lempesis/dnet-hadoop
Merge branch 'beta' of https://code-repo.d4science.org/D-Net/dnet-hadoop into beta
This commit is contained in:
commit
dc0ec88a58
|
@ -4,6 +4,7 @@ package eu.dnetlib.dhp.common.vocabulary;
|
|||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
@ -66,27 +67,39 @@ public class Vocabulary implements Serializable {
|
|||
}
|
||||
|
||||
public Qualifier getTermAsQualifier(final String termId) {
|
||||
if (StringUtils.isBlank(termId)) {
|
||||
return getTermAsQualifier(termId, false);
|
||||
}
|
||||
|
||||
public Qualifier getTermAsQualifier(final String termId, boolean strict) {
|
||||
final VocabularyTerm term = getTerm(termId);
|
||||
if (Objects.nonNull(term)) {
|
||||
return OafMapperUtils.qualifier(term.getId(), term.getName(), getId(), getName());
|
||||
} else if (Objects.isNull(term) && strict) {
|
||||
return OafMapperUtils.unknown(getId(), getName());
|
||||
} else if (termExists(termId)) {
|
||||
final VocabularyTerm t = getTerm(termId);
|
||||
return OafMapperUtils.qualifier(t.getId(), t.getName(), getId(), getName());
|
||||
} else {
|
||||
return OafMapperUtils.qualifier(termId, termId, getId(), getName());
|
||||
}
|
||||
}
|
||||
|
||||
public Qualifier getSynonymAsQualifier(final String syn) {
|
||||
return getSynonymAsQualifier(syn, false);
|
||||
}
|
||||
|
||||
public Qualifier getSynonymAsQualifier(final String syn, boolean strict) {
|
||||
return Optional
|
||||
.ofNullable(getTermBySynonym(syn))
|
||||
.map(term -> getTermAsQualifier(term.getId()))
|
||||
.map(term -> getTermAsQualifier(term.getId(), strict))
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
public Qualifier lookup(String id) {
|
||||
return lookup(id, false);
|
||||
}
|
||||
|
||||
public Qualifier lookup(String id, boolean strict) {
|
||||
return Optional
|
||||
.ofNullable(getSynonymAsQualifier(id))
|
||||
.orElse(getTermAsQualifier(id));
|
||||
.ofNullable(getSynonymAsQualifier(id, strict))
|
||||
.orElse(getTermAsQualifier(id, strict));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -333,7 +333,7 @@ public class GraphCleaningFunctions extends CleaningFunctions {
|
|||
if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) {
|
||||
i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY);
|
||||
}
|
||||
if (Objects.isNull(i.getRefereed())) {
|
||||
if (Objects.isNull(i.getRefereed()) || StringUtils.isBlank(i.getRefereed().getClassid())) {
|
||||
i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS));
|
||||
}
|
||||
if (Objects.nonNull(i.getDateofacceptance())) {
|
||||
|
|
|
@ -42,7 +42,7 @@ public class CleaningRuleMap extends HashMap<Class<?>, SerializableConsumer<Obje
|
|||
|
||||
vocabularies.find(vocabularyId).ifPresent(vocabulary -> {
|
||||
if (ModelConstants.DNET_SUBJECT_KEYWORD.equalsIgnoreCase(subject.getQualifier().getClassid())) {
|
||||
Qualifier newValue = vocabulary.lookup(subject.getValue());
|
||||
Qualifier newValue = vocabulary.lookup(subject.getValue(), true);
|
||||
if (!ModelConstants.UNKNOWN.equals(newValue.getClassid())) {
|
||||
subject.setValue(newValue.getClassid());
|
||||
subject.getQualifier().setClassid(vocabularyId);
|
||||
|
|
|
@ -13,6 +13,7 @@ import java.util.stream.Stream;
|
|||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||
|
@ -105,6 +106,7 @@ public class CleanCfHbSparkJob {
|
|||
resolved
|
||||
.joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId")))
|
||||
.map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class))
|
||||
.filter((FilterFunction<IdCfHbMapping>) m -> Objects.nonNull(m.getMasterId()))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(resolvedPath);
|
||||
|
@ -134,22 +136,50 @@ public class CleanCfHbSparkJob {
|
|||
|
||||
private static MapFunction<Tuple2<IdCfHbMapping, MasterDuplicate>, IdCfHbMapping> asIdCfHbMapping() {
|
||||
return t -> {
|
||||
t._1().setMasterId(t._2().getMasterId());
|
||||
t._1().setMasterName(t._2().getMasterName());
|
||||
return t._1();
|
||||
final IdCfHbMapping mapping = t._1();
|
||||
Optional
|
||||
.ofNullable(t._2())
|
||||
.ifPresent(t2 -> {
|
||||
mapping.setMasterId(t2.getMasterId());
|
||||
mapping.setMasterName(t2.getMasterName());
|
||||
|
||||
});
|
||||
return mapping;
|
||||
};
|
||||
}
|
||||
|
||||
private static <T extends Result> FlatMapFunction<T, IdCfHbMapping> flattenCfHbFn() {
|
||||
return r -> Stream
|
||||
.concat(
|
||||
r.getCollectedfrom().stream().map(KeyValue::getKey),
|
||||
Optional
|
||||
.ofNullable(r.getCollectedfrom())
|
||||
.map(cf -> cf.stream().map(KeyValue::getKey))
|
||||
.orElse(Stream.empty()),
|
||||
Stream
|
||||
.concat(
|
||||
r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey),
|
||||
r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey)))
|
||||
Optional
|
||||
.ofNullable(r.getInstance())
|
||||
.map(
|
||||
instances -> instances
|
||||
.stream()
|
||||
.map(i -> Optional.ofNullable(i.getHostedby()).map(KeyValue::getKey).orElse("")))
|
||||
.orElse(Stream.empty())
|
||||
.filter(StringUtils::isNotBlank),
|
||||
Optional
|
||||
.ofNullable(r.getInstance())
|
||||
.map(
|
||||
instances -> instances
|
||||
.stream()
|
||||
.map(
|
||||
i -> Optional
|
||||
.ofNullable(i.getCollectedfrom())
|
||||
.map(KeyValue::getKey)
|
||||
.orElse("")))
|
||||
.orElse(Stream.empty())
|
||||
.filter(StringUtils::isNotBlank)))
|
||||
.distinct()
|
||||
.map(s -> asIdCfHbMapping(r.getId(), s))
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.map(cfHb -> asIdCfHbMapping(r.getId(), cfHb))
|
||||
.iterator();
|
||||
}
|
||||
|
||||
|
|
|
@ -4,9 +4,12 @@ package eu.dnetlib.dhp.oa.graph.clean.country;
|
|||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.swing.text.html.Option;
|
||||
|
||||
|
@ -30,6 +33,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob;
|
||||
import eu.dnetlib.dhp.schema.oaf.Country;
|
||||
import eu.dnetlib.dhp.schema.oaf.Instance;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
||||
|
@ -110,8 +114,8 @@ public class CleanCountrySparkJob implements Serializable {
|
|||
return r;
|
||||
}
|
||||
|
||||
if (r
|
||||
.getPid()
|
||||
List<StructuredProperty> ids = getPidsAndAltIds(r).collect(Collectors.toList());
|
||||
if (ids
|
||||
.stream()
|
||||
.anyMatch(
|
||||
p -> p
|
||||
|
@ -148,6 +152,42 @@ public class CleanCountrySparkJob implements Serializable {
|
|||
.json(inputPath);
|
||||
}
|
||||
|
||||
private static <T extends Result> Stream<StructuredProperty> getPidsAndAltIds(T r) {
|
||||
final Stream<StructuredProperty> resultPids = Optional
|
||||
.ofNullable(r.getPid())
|
||||
.map(Collection::stream)
|
||||
.orElse(Stream.empty());
|
||||
|
||||
final Stream<StructuredProperty> instancePids = Optional
|
||||
.ofNullable(r.getInstance())
|
||||
.map(
|
||||
instance -> instance
|
||||
.stream()
|
||||
.flatMap(
|
||||
i -> Optional
|
||||
.ofNullable(i.getPid())
|
||||
.map(Collection::stream)
|
||||
.orElse(Stream.empty())))
|
||||
.orElse(Stream.empty());
|
||||
|
||||
final Stream<StructuredProperty> instanceAltIds = Optional
|
||||
.ofNullable(r.getInstance())
|
||||
.map(
|
||||
instance -> instance
|
||||
.stream()
|
||||
.flatMap(
|
||||
i -> Optional
|
||||
.ofNullable(i.getAlternateIdentifier())
|
||||
.map(Collection::stream)
|
||||
.orElse(Stream.empty())))
|
||||
.orElse(Stream.empty());
|
||||
|
||||
return Stream
|
||||
.concat(
|
||||
Stream.concat(resultPids, instancePids),
|
||||
instanceAltIds);
|
||||
}
|
||||
|
||||
private static boolean pidInParam(String value, String[] verifyParam) {
|
||||
for (String s : verifyParam)
|
||||
if (value.startsWith(s))
|
||||
|
|
|
@ -82,10 +82,10 @@ public class CleanContextTest {
|
|||
CleanContextSparkJob.main(new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--inputPath", workingDir.toString() + "/publication",
|
||||
"-graphTableClassName", Publication.class.getCanonicalName(),
|
||||
"-workingPath", workingDir.toString() + "/working",
|
||||
"-contextId", "sobigdata",
|
||||
"-verifyParam", "gCube "
|
||||
"--graphTableClassName", Publication.class.getCanonicalName(),
|
||||
"--workingDir", workingDir.toString() + "/working",
|
||||
"--contextId", "sobigdata",
|
||||
"--verifyParam", "gCube "
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
|
|
@ -84,12 +84,12 @@ public class CleanCountryTest {
|
|||
CleanCountrySparkJob.main(new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--inputPath", workingDir.toString() + "/publication",
|
||||
"-graphTableClassName", Publication.class.getCanonicalName(),
|
||||
"-workingPath", workingDir.toString() + "/working",
|
||||
"-country", "NL",
|
||||
"-verifyParam", "10.17632",
|
||||
"-collectedfrom", "NARCIS",
|
||||
"-hostedBy", getClass()
|
||||
"--graphTableClassName", Publication.class.getCanonicalName(),
|
||||
"--workingDir", workingDir.toString() + "/working",
|
||||
"--country", "NL",
|
||||
"--verifyParam", "10.17632",
|
||||
"--collectedfrom", "NARCIS",
|
||||
"--hostedBy", getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
|
||||
.getPath()
|
||||
});
|
||||
|
|
|
@ -7,6 +7,7 @@ import static org.mockito.Mockito.lenient;
|
|||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
@ -278,20 +279,25 @@ public class GraphCleaningFunctionsTest {
|
|||
s -> "0102 computer and information sciences".equals(s.getValue()) &
|
||||
ModelConstants.DNET_SUBJECT_FOS_CLASSID.equals(s.getQualifier().getClassid())));
|
||||
|
||||
List<Subject> s1 = p_cleaned
|
||||
.getSubject()
|
||||
.stream()
|
||||
.filter(s -> s.getValue().equals("In Situ Hybridization"))
|
||||
.collect(Collectors.toList());
|
||||
assertNotNull(s1);
|
||||
assertEquals(1, s1.size());
|
||||
assertEquals(ModelConstants.DNET_SUBJECT_KEYWORD, s1.get(0).getQualifier().getClassid());
|
||||
assertEquals(ModelConstants.DNET_SUBJECT_KEYWORD, s1.get(0).getQualifier().getClassname());
|
||||
verify_keyword(p_cleaned, "In Situ Hybridization");
|
||||
verify_keyword(p_cleaned, "Avicennia");
|
||||
|
||||
// TODO add more assertions to verity the cleaned values
|
||||
System.out.println(MAPPER.writeValueAsString(p_cleaned));
|
||||
}
|
||||
|
||||
private static void verify_keyword(Publication p_cleaned, String subject) {
|
||||
Optional<Subject> s1 = p_cleaned
|
||||
.getSubject()
|
||||
.stream()
|
||||
.filter(s -> s.getValue().equals(subject))
|
||||
.findFirst();
|
||||
|
||||
assertTrue(s1.isPresent());
|
||||
assertEquals(ModelConstants.DNET_SUBJECT_KEYWORD, s1.get().getQualifier().getClassid());
|
||||
assertEquals(ModelConstants.DNET_SUBJECT_KEYWORD, s1.get().getQualifier().getClassname());
|
||||
}
|
||||
|
||||
private Stream<Qualifier> getAuthorPidTypes(Result pub) {
|
||||
return pub
|
||||
.getAuthor()
|
||||
|
|
|
@ -143,7 +143,7 @@ public class CleanCfHbSparkJobTest {
|
|||
"--outputPath", outputPath,
|
||||
"--resolvedPath", resolvedPath + "/dataset",
|
||||
"--graphTableClassName", Dataset.class.getCanonicalName(),
|
||||
"--datasourceMasterDuplicate", dsMasterDuplicatePath
|
||||
"--masterDuplicatePath", dsMasterDuplicatePath
|
||||
});
|
||||
|
||||
assertTrue(Files.exists(Paths.get(graphOutputPath, "dataset")));
|
||||
|
|
|
@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
|
@ -238,7 +239,11 @@ class MappersTest {
|
|||
assertNotNull(i.getAccessright());
|
||||
assertEquals("OPEN", i.getAccessright().getClassid());
|
||||
});
|
||||
assertEquals("UNKNOWN", p.getInstance().get(0).getRefereed().getClassid());
|
||||
|
||||
Publication p_cleaned = cleanup(p, vocs);
|
||||
assertEquals("0000", p_cleaned.getInstance().get(0).getRefereed().getClassid());
|
||||
assertEquals("Unknown", p_cleaned.getInstance().get(0).getRefereed().getClassname());
|
||||
|
||||
assertNotNull(p.getInstance().get(0).getPid());
|
||||
assertEquals(2, p.getInstance().get(0).getPid().size());
|
||||
|
||||
|
@ -453,7 +458,10 @@ class MappersTest {
|
|||
assertNotNull(i.getAccessright());
|
||||
assertEquals("OPEN", i.getAccessright().getClassid());
|
||||
});
|
||||
assertEquals("UNKNOWN", p.getInstance().get(0).getRefereed().getClassid());
|
||||
|
||||
Publication p_cleaned = cleanup(p, vocs);
|
||||
assertEquals("0000", p_cleaned.getInstance().get(0).getRefereed().getClassid());
|
||||
assertEquals("Unknown", p_cleaned.getInstance().get(0).getRefereed().getClassname());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -570,7 +578,9 @@ class MappersTest {
|
|||
assertTrue(i.getUrl().contains("http://apps.who.int/trialsearch/Trial3.aspx?trialid=NCT02321059"));
|
||||
assertTrue(i.getUrl().contains("https://clinicaltrials.gov/ct2/show/NCT02321059"));
|
||||
|
||||
assertEquals("UNKNOWN", i.getRefereed().getClassid());
|
||||
Dataset d_cleaned = cleanup(d, vocs);
|
||||
assertEquals("0000", d_cleaned.getInstance().get(0).getRefereed().getClassid());
|
||||
assertEquals("Unknown", d_cleaned.getInstance().get(0).getRefereed().getClassname());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -871,7 +881,10 @@ class MappersTest {
|
|||
assertNotNull(i.getAccessright());
|
||||
assertEquals("UNKNOWN", i.getAccessright().getClassid());
|
||||
});
|
||||
assertEquals("UNKNOWN", p.getInstance().get(0).getRefereed().getClassid());
|
||||
|
||||
Dataset p_cleaned = cleanup(p, vocs);
|
||||
assertEquals("0000", p_cleaned.getInstance().get(0).getRefereed().getClassid());
|
||||
assertEquals("Unknown", p_cleaned.getInstance().get(0).getRefereed().getClassname());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -907,24 +907,23 @@
|
|||
{
|
||||
"dataInfo": {
|
||||
"deletedbyinference": false,
|
||||
"inferenceprovenance": "",
|
||||
"inferred": false,
|
||||
"invisible": false,
|
||||
"provenanceaction": {
|
||||
"classid": "sysimport:crosswalk:datasetarchive",
|
||||
"classname": "sysimport:crosswalk:datasetarchive",
|
||||
"classid": "sysimport:actionset",
|
||||
"classname": "Harvested",
|
||||
"schemeid": "dnet:provenanceActions",
|
||||
"schemename": "dnet:provenanceActions"
|
||||
},
|
||||
"trust": "0.9"
|
||||
},
|
||||
"qualifier": {
|
||||
"classid": "",
|
||||
"classname": "",
|
||||
"schemeid": "",
|
||||
"schemename": ""
|
||||
"classid": "FOS",
|
||||
"classname": "Fields of Science and Technology classification",
|
||||
"schemeid": "dnet:subject_classification_typologies",
|
||||
"schemename": "dnet:subject_classification_typologies"
|
||||
},
|
||||
"value": "doped silicon"
|
||||
"value": "Avicennia"
|
||||
},
|
||||
{
|
||||
"dataInfo": {
|
||||
|
|
Loading…
Reference in New Issue