forked from D-Net/dnet-hadoop

commit 135cf81151
Merge remote-tracking branch 'origin/beta' into beta
@@ -211,7 +211,7 @@ public class GraphCleaningFunctions extends CleaningFunctions {
                         .orElse(s.getValue()),
                     Function.identity(),
                     (s1, s2) -> Collections
-                        .min(Lists.newArrayList(s1, s1), new SubjectProvenanceComparator())))
+                        .min(Lists.newArrayList(s1, s2), new SubjectProvenanceComparator())))
             .values());
         r.setSubject(subjects);
     }
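Why the one-character change above matters: with (s1, s1) the toMap merge function fed Collections.min two copies of the same element, so the SubjectProvenanceComparator could never prefer the other duplicate. A self-contained sketch of the pattern, with String[] pairs and a trust comparator as stand-ins for the project's Subject and SubjectProvenanceComparator:

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch of the bug: a toMap merge function that hands Collections.min two
// copies of the SAME element silently ignores the second duplicate.
public class MergeFunctionDemo {

    public static void main(String[] args) {
        List<String[]> subjects = List.of(
            new String[] { "fos", "0.9" },
            new String[] { "fos", "0.5" });

        Comparator<String[]> byTrust = Comparator.comparing(s -> s[1]);

        Map<String, String[]> deduplicated = subjects
            .stream()
            .collect(
                Collectors.toMap(
                    s -> s[0], // key: the subject value
                    Function.identity(),
                    // buggy variant: Collections.min(List.of(s1, s1), byTrust)
                    // always returns the first argument, whatever the comparator says
                    (s1, s2) -> Collections.min(List.of(s1, s2), byTrust)));

        System.out.println(deduplicated.get("fos")[1]); // "0.5": the comparator saw both duplicates
    }
}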
@@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.graph.clean;
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang3.SerializationUtils;
@@ -10,6 +11,7 @@ import org.apache.commons.lang3.StringUtils;
 
 import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer;
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyTerm;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
 
@@ -31,29 +33,30 @@ public class CleaningRuleMap extends HashMap<Class<?>, SerializableConsumer<Obje
     }
 
     private static void cleanSubject(VocabularyGroup vocabularies, Subject subject) {
-        if (cleanSubjectForVocabulary(ModelConstants.DNET_SUBJECT_FOS_CLASSID, vocabularies, subject)) {
-            return;
-        } else {
-            // TODO cleaning based on different subject vocabs can be added here
-        }
+        cleanSubjectForVocabulary(ModelConstants.DNET_SUBJECT_FOS_CLASSID, vocabularies, subject);
+        // TODO cleaning based on different subject vocabs can be added here
     }
 
-    private static boolean cleanSubjectForVocabulary(String vocabularyId, VocabularyGroup vocabularies,
+    private static void cleanSubjectForVocabulary(String vocabularyId, VocabularyGroup vocabularies,
         Subject subject) {
-        AtomicReference<Boolean> modified = new AtomicReference<>(false);
         vocabularies.find(vocabularyId).ifPresent(vocabulary -> {
-            if (!ModelConstants.DNET_SUBJECT_KEYWORD.equalsIgnoreCase(subject.getQualifier().getClassid())) {
-                return;
-            }
-            Qualifier newValue = vocabulary.lookup(subject.getValue());
-            if (!ModelConstants.UNKNOWN.equals(newValue.getClassid())) {
-                subject.setValue(newValue.getClassid());
-                subject.getQualifier().setClassid(vocabularyId);
-                subject.getQualifier().setClassname(vocabulary.getName());
-                modified.set(true);
+            if (ModelConstants.DNET_SUBJECT_KEYWORD.equalsIgnoreCase(subject.getQualifier().getClassid())) {
+                Qualifier newValue = vocabulary.lookup(subject.getValue());
+                if (!ModelConstants.UNKNOWN.equals(newValue.getClassid())) {
+                    subject.setValue(newValue.getClassid());
+                    subject.getQualifier().setClassid(vocabularyId);
+                    subject.getQualifier().setClassname(vocabulary.getName());
+                }
+            } else if (vocabularyId.equals(subject.getQualifier().getClassid())) {
+                Qualifier syn = vocabulary.getSynonymAsQualifier(subject.getValue());
+                VocabularyTerm term = vocabulary.getTerm(subject.getValue());
+                if (Objects.isNull(syn) && Objects.isNull(term)) {
+                    subject.getQualifier().setClassid(ModelConstants.DNET_SUBJECT_KEYWORD);
+                    subject.getQualifier().setClassname(ModelConstants.DNET_SUBJECT_KEYWORD);
+                }
             }
         });
-        return modified.get();
     }
 
     private static void cleanRelation(VocabularyGroup vocabularies, Relation r) {
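The restructured cleanSubjectForVocabulary now works in both directions: a keyword-labelled subject whose value resolves in the vocabulary is promoted into it, and a subject already claiming the vocabulary id is demoted back to keyword when the vocabulary knows neither the term nor a synonym. A simplified stand-in sketch of that rule, assuming a plain term-to-label map in place of the VocabularyGroup API (the real code also consults getSynonymAsQualifier and sets the classname):

import java.util.Map;
import java.util.Optional;

// Stand-in model of the promote/demote rule; Subject here carries only the
// fields the rule touches, nothing from the dnet-hadoop schema.
public class SubjectCleaningSketch {

    static final String KEYWORD = "keyword"; // stands in for ModelConstants.DNET_SUBJECT_KEYWORD

    static class Subject {
        String classid;
        String value;

        Subject(String classid, String value) {
            this.classid = classid;
            this.value = value;
        }
    }

    static void clean(String vocabularyId, Map<String, String> vocab, Subject s) {
        if (KEYWORD.equalsIgnoreCase(s.classid)) {
            // promote: a free keyword whose value is a known term joins the vocabulary
            Optional.ofNullable(vocab.get(s.value)).ifPresent(label -> {
                s.value = label;
                s.classid = vocabularyId;
            });
        } else if (vocabularyId.equals(s.classid)) {
            // demote: a subject claiming the vocabulary but unknown to it becomes a keyword again
            if (!vocab.containsKey(s.value)) {
                s.classid = KEYWORD;
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> fos = Map
            .of("0102 computer and information sciences", "0102 computer and information sciences");

        Subject bogus = new Subject("FOS", "In Situ Hybridization");
        clean("FOS", fos, bogus);
        System.out.println(bogus.classid); // keyword: demoted, mirroring the new test assertions
    }
}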
@@ -43,7 +43,7 @@ public class CleanCountrySparkJob implements Serializable {
 
         String jsonConfiguration = IOUtils
             .toString(
-                CleanContextSparkJob.class
+                CleanCountrySparkJob.class
                     .getResourceAsStream(
                         "/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json"));
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
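The fix above replaces a copy-pasted class literal. Because the resource path starts with "/", getResourceAsStream resolves it absolutely, so CleanContextSparkJob.class happened to work; loading a class's configuration through its own literal is the self-documenting form and survives refactorings that split the classes across classloaders. A runnable sketch of the pattern (the class here is a stand-in; the resource name is taken from the diff):

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// With a leading "/" any class whose classloader can see the resource works;
// tying the lookup to the owning class keeps intent and robustness aligned.
public class ResourceLoadingSketch {

    public static void main(String[] args) throws IOException {
        try (InputStream in = ResourceLoadingSketch.class
            .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json")) {
            if (in == null) {
                System.err.println("resource not visible to this classloader");
                return;
            }
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
    }
}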
@@ -471,7 +471,7 @@
             <spark xmlns="uri:oozie:spark-action:0.2">
                 <master>yarn</master>
                 <mode>cluster</mode>
-                <name>Clean publications counmtry</name>
+                <name>Clean publication country</name>
                 <class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
                 <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
                 <spark-opts>
@@ -489,7 +489,7 @@
                 <arg>--workingPath</arg><arg>${workingDir}/working/publication</arg>
                 <arg>--country</arg><arg>${country}</arg>
                 <arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
-                <arg>--datasourcePath</arg><arg>${workingDir}/working/hostedby</arg>
+                <arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
                 <arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
             </spark>
             <ok to="wait_clean_country"/>
@@ -500,7 +500,7 @@
             <spark xmlns="uri:oozie:spark-action:0.2">
                 <master>yarn</master>
                 <mode>cluster</mode>
-                <name>Clean datasets Country</name>
+                <name>Clean dataset country</name>
                 <class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
                 <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
                 <spark-opts>
@@ -518,7 +518,7 @@
                 <arg>--workingPath</arg><arg>${workingDir}/working/dataset</arg>
                 <arg>--country</arg><arg>${country}</arg>
                 <arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
-                <arg>--datasourcePath</arg><arg>${workingDir}/working/hostedby</arg>
+                <arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
                 <arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
             </spark>
             <ok to="wait_clean_country"/>
@@ -529,7 +529,7 @@
             <spark xmlns="uri:oozie:spark-action:0.2">
                 <master>yarn</master>
                 <mode>cluster</mode>
-                <name>Clean otherresearchproducts country</name>
+                <name>Clean otherresearchproduct country</name>
                 <class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
                 <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
                 <spark-opts>
@@ -547,7 +547,7 @@
                 <arg>--workingPath</arg><arg>${workingDir}/working/otherresearchproduct</arg>
                 <arg>--country</arg><arg>${country}</arg>
                 <arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
-                <arg>--datasourcePath</arg><arg>${workingDir}/working/hostedby</arg>
+                <arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
                 <arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
             </spark>
             <ok to="wait_clean_country"/>
@@ -558,7 +558,7 @@
             <spark xmlns="uri:oozie:spark-action:0.2">
                 <master>yarn</master>
                 <mode>cluster</mode>
-                <name>Clean softwares country</name>
+                <name>Clean software country</name>
                 <class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
                 <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
                 <spark-opts>
@@ -576,7 +576,7 @@
                 <arg>--workingPath</arg><arg>${workingDir}/working/software</arg>
                 <arg>--country</arg><arg>${country}</arg>
                 <arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
-                <arg>--datasourcePath</arg><arg>${workingDir}/working/hostedby</arg>
+                <arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
                 <arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
             </spark>
             <ok to="wait_clean_country"/>
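The --datasourcePath to --hostedBy rename in all four actions only holds together if the job side expects the same parameter name (declared in input_clean_country_parameters.json and read by CleanCountrySparkJob). A stand-in sketch of that coupling, using a plain argv loop instead of the project's ArgumentApplicationParser; all values are placeholders:

import java.util.HashMap;
import java.util.Map;

// The Oozie <arg> pairs reach the job as a flat argv: whatever name the
// workflow passes must match what the job looks up.
public class ArgCouplingSketch {

    static Map<String, String> parse(String[] argv) {
        Map<String, String> parsed = new HashMap<>();
        for (int i = 0; i + 1 < argv.length; i += 2) {
            parsed.put(argv[i].replaceFirst("^--", ""), argv[i + 1]);
        }
        return parsed;
    }

    public static void main(String[] args) {
        Map<String, String> parsed = parse(new String[] {
            "--workingPath", "/working/publication",
            "--country", "XX",
            "--verifyParam", "someValue",
            "--hostedBy", "/working/hostedby",
            "--collectedfrom", "someDatasource"
        });

        // prints the path; had the workflow still passed --datasourcePath,
        // a lookup under the new name would come back null
        System.out.println(parsed.get("hostedBy"));
    }
}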
@@ -278,6 +278,16 @@ public class GraphCleaningFunctionsTest {
                     s -> "0102 computer and information sciences".equals(s.getValue()) &
                         ModelConstants.DNET_SUBJECT_FOS_CLASSID.equals(s.getQualifier().getClassid())));
 
+        List<Subject> s1 = p_cleaned
+            .getSubject()
+            .stream()
+            .filter(s -> s.getValue().equals("In Situ Hybridization"))
+            .collect(Collectors.toList());
+        assertNotNull(s1);
+        assertEquals(1, s1.size());
+        assertEquals(ModelConstants.DNET_SUBJECT_KEYWORD, s1.get(0).getQualifier().getClassid());
+        assertEquals(ModelConstants.DNET_SUBJECT_KEYWORD, s1.get(0).getQualifier().getClassname());
+
         // TODO add more assertions to verity the cleaned values
         System.out.println(MAPPER.writeValueAsString(p_cleaned));
     }
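The new assertions above are driven by the fixture addition below: "In Situ Hybridization" arrives labelled FOS but is not a FOS term, so the cleaning is expected to demote it to a keyword. A sketch of how such a JSON fixture typically becomes the test's input object; the resource path and the mapper configuration are assumptions, not the actual test code:

import java.io.IOException;
import java.io.InputStream;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.schema.oaf.Publication;

// Loads a record fixture and inspects the subject the assertions target.
public class FixtureLoadingSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    public static void main(String[] args) throws IOException {
        // hypothetical fixture location
        try (InputStream in = FixtureLoadingSketch.class
            .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result.json")) {
            Publication p = MAPPER.readValue(in, Publication.class);
            p
                .getSubject()
                .stream()
                .filter(s -> "In Situ Hybridization".equals(s.getValue()))
                // before cleaning this prints FOS; after cleaning, the keyword classid
                .forEach(s -> System.out.println(s.getQualifier().getClassid()));
        }
    }
}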
@@ -706,6 +706,28 @@
     "source": [
     ],
     "subject": [
+        {
+            "dataInfo": {
+                "provenanceaction": {
+                    "classid": "sysimport:crosswalk:repository",
+                    "classname": "sysimport:crosswalk:repository",
+                    "schemeid": "dnet:provenanceActions",
+                    "schemename": "dnet:provenanceActions"
+                },
+                "deletedbyinference": false,
+                "inferred": false,
+                "inferenceprovenance": "",
+                "invisible": false,
+                "trust": "0.9"
+            },
+            "qualifier": {
+                "classid": "FOS",
+                "classname": "Fields of Science and Technology classification",
+                "schemeid": "dnet:result_subject",
+                "schemename": "dnet:result_subject"
+            },
+            "value": "In Situ Hybridization"
+        },
         {
             "dataInfo": {
                 "deletedbyinference": false,