forked from D-Net/dnet-hadoop
Merge branch 'master' into nsprefix_blacklist
This commit is contained in:
commit
652b13abb6
|
@ -7,6 +7,7 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||||
|
|
||||||
public class ModelConstants {
|
public class ModelConstants {
|
||||||
|
|
||||||
|
public static final String DNET_SUBJECT_TYPOLOGIES = "dnet:subject_classification_typologies";
|
||||||
public static final String DNET_RESULT_TYPOLOGIES = "dnet:result_typologies";
|
public static final String DNET_RESULT_TYPOLOGIES = "dnet:result_typologies";
|
||||||
public static final String DNET_PUBLICATION_RESOURCE = "dnet:publication_resource";
|
public static final String DNET_PUBLICATION_RESOURCE = "dnet:publication_resource";
|
||||||
public static final String DNET_ACCESS_MODES = "dnet:access_modes";
|
public static final String DNET_ACCESS_MODES = "dnet:access_modes";
|
||||||
|
|
|
@ -90,6 +90,7 @@ public class CleanGraphSparkJob {
|
||||||
final CleaningRuleMap mapping = CleaningRuleMap.create(vocs);
|
final CleaningRuleMap mapping = CleaningRuleMap.create(vocs);
|
||||||
|
|
||||||
readTableFromPath(spark, inputPath, clazz)
|
readTableFromPath(spark, inputPath, clazz)
|
||||||
|
.map((MapFunction<T, T>) value -> fixVocabularyNames(value), Encoders.bean(clazz))
|
||||||
.map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
|
.map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
|
||||||
.map((MapFunction<T, T>) value -> fixDefaults(value), Encoders.bean(clazz))
|
.map((MapFunction<T, T>) value -> fixDefaults(value), Encoders.bean(clazz))
|
||||||
.write()
|
.write()
|
||||||
|
@ -98,6 +99,65 @@ public class CleanGraphSparkJob {
|
||||||
.json(outputPath);
|
.json(outputPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected static <T extends Oaf> T fixVocabularyNames(T value) {
|
||||||
|
if (value instanceof Datasource) {
|
||||||
|
// nothing to clean here
|
||||||
|
} else if (value instanceof Project) {
|
||||||
|
// nothing to clean here
|
||||||
|
} else if (value instanceof Organization) {
|
||||||
|
Organization o = (Organization) value;
|
||||||
|
if (Objects.nonNull(o.getCountry())) {
|
||||||
|
fixVocabName(o.getCountry(), ModelConstants.DNET_COUNTRY_TYPE);
|
||||||
|
}
|
||||||
|
} else if (value instanceof Relation) {
|
||||||
|
// nothing to clean here
|
||||||
|
} else if (value instanceof Result) {
|
||||||
|
|
||||||
|
Result r = (Result) value;
|
||||||
|
|
||||||
|
fixVocabName(r.getLanguage(), ModelConstants.DNET_LANGUAGES);
|
||||||
|
fixVocabName(r.getResourcetype(), ModelConstants.DNET_DATA_CITE_RESOURCE);
|
||||||
|
fixVocabName(r.getBestaccessright(), ModelConstants.DNET_ACCESS_MODES);
|
||||||
|
|
||||||
|
if (Objects.nonNull(r.getSubject())) {
|
||||||
|
r.getSubject().forEach(s -> fixVocabName(s.getQualifier(), ModelConstants.DNET_SUBJECT_TYPOLOGIES));
|
||||||
|
}
|
||||||
|
if (Objects.nonNull(r.getInstance())) {
|
||||||
|
for (Instance i : r.getInstance()) {
|
||||||
|
fixVocabName(i.getAccessright(), ModelConstants.DNET_ACCESS_MODES);
|
||||||
|
fixVocabName(i.getRefereed(), ModelConstants.DNET_REVIEW_LEVELS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (Objects.nonNull(r.getAuthor())) {
|
||||||
|
r.getAuthor().forEach(a -> {
|
||||||
|
if (Objects.nonNull(a.getPid())) {
|
||||||
|
a.getPid().forEach(p -> {
|
||||||
|
fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if (value instanceof Publication) {
|
||||||
|
|
||||||
|
} else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
|
||||||
|
|
||||||
|
} else if (value instanceof OtherResearchProduct) {
|
||||||
|
|
||||||
|
} else if (value instanceof Software) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void fixVocabName(Qualifier q, String vocabularyName) {
|
||||||
|
if (Objects.nonNull(q) && StringUtils.isBlank(q.getSchemeid())) {
|
||||||
|
q.setSchemeid(vocabularyName);
|
||||||
|
q.setSchemename(vocabularyName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected static <T extends Oaf> T fixDefaults(T value) {
|
protected static <T extends Oaf> T fixDefaults(T value) {
|
||||||
if (value instanceof Datasource) {
|
if (value instanceof Datasource) {
|
||||||
// nothing to clean here
|
// nothing to clean here
|
||||||
|
@ -113,6 +173,9 @@ public class CleanGraphSparkJob {
|
||||||
} else if (value instanceof Result) {
|
} else if (value instanceof Result) {
|
||||||
|
|
||||||
Result r = (Result) value;
|
Result r = (Result) value;
|
||||||
|
if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) {
|
||||||
|
r.setPublisher(null);
|
||||||
|
}
|
||||||
if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) {
|
if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) {
|
||||||
r
|
r
|
||||||
.setLanguage(
|
.setLanguage(
|
||||||
|
|
|
@ -41,28 +41,37 @@ object SparkSplitOafTODLIEntities {
|
||||||
.getOrCreate()
|
.getOrCreate()
|
||||||
|
|
||||||
|
|
||||||
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/OAFDataset").as[Oaf]
|
|
||||||
|
|
||||||
|
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
|
||||||
|
|
||||||
|
val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset]
|
||||||
|
val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication]
|
||||||
|
val ebi_relation:Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[DLIRelation]
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
OAFDataset
|
OAFDataset
|
||||||
.filter(s => s != null && s.isInstanceOf[DLIPublication])
|
.filter(s => s != null && s.isInstanceOf[DLIPublication])
|
||||||
.map(s =>s.asInstanceOf[DLIPublication])
|
.map(s =>s.asInstanceOf[DLIPublication])
|
||||||
|
.union(ebi_publication)
|
||||||
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
|
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
|
||||||
.groupByKey(_._1)(Encoders.STRING)
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
|
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
|
||||||
.map(p => p._2)
|
.map(p => p._2)
|
||||||
.repartition(1000)
|
.repartition(1000)
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/publication")
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication")
|
||||||
|
|
||||||
OAFDataset
|
OAFDataset
|
||||||
.filter(s => s != null && s.isInstanceOf[DLIDataset])
|
.filter(s => s != null && s.isInstanceOf[DLIDataset])
|
||||||
.map(s =>s.asInstanceOf[DLIDataset])
|
.map(s =>s.asInstanceOf[DLIDataset])
|
||||||
|
.union(ebi_dataset)
|
||||||
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder))
|
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder))
|
||||||
.groupByKey(_._1)(Encoders.STRING)
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
|
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
|
||||||
.map(p => p._2)
|
.map(p => p._2)
|
||||||
.repartition(1000)
|
.repartition(1000)
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset")
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset")
|
||||||
|
|
||||||
|
|
||||||
OAFDataset
|
OAFDataset
|
||||||
|
@ -73,18 +82,19 @@ object SparkSplitOafTODLIEntities {
|
||||||
.agg(EBIAggregator.getDLIUnknownAggregator().toColumn)
|
.agg(EBIAggregator.getDLIUnknownAggregator().toColumn)
|
||||||
.map(p => p._2)
|
.map(p => p._2)
|
||||||
.repartition(1000)
|
.repartition(1000)
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/unknown")
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown")
|
||||||
|
|
||||||
|
|
||||||
OAFDataset
|
OAFDataset
|
||||||
.filter(s => s != null && s.isInstanceOf[DLIRelation])
|
.filter(s => s != null && s.isInstanceOf[DLIRelation])
|
||||||
.map(s =>s.asInstanceOf[DLIRelation])
|
.map(s =>s.asInstanceOf[DLIRelation])
|
||||||
|
.union(ebi_relation)
|
||||||
.map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder))
|
.map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder))
|
||||||
.groupByKey(_._1)(Encoders.STRING)
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
.agg(EBIAggregator.getDLIRelationAggregator().toColumn)
|
.agg(EBIAggregator.getDLIRelationAggregator().toColumn)
|
||||||
.map(p => p._2)
|
.map(p => p._2)
|
||||||
.repartition(1000)
|
.repartition(1000)
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/relation")
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,8 @@ import static org.mockito.Mockito.lenient;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
@ -19,9 +21,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
|
||||||
|
@ -62,10 +62,12 @@ public class CleaningFunctionTest {
|
||||||
assertTrue(p_in instanceof Result);
|
assertTrue(p_in instanceof Result);
|
||||||
assertTrue(p_in instanceof Publication);
|
assertTrue(p_in instanceof Publication);
|
||||||
|
|
||||||
Publication p_out = OafCleaner.apply(p_in, mapping);
|
Publication p_out = OafCleaner.apply(CleanGraphSparkJob.fixVocabularyNames(p_in), mapping);
|
||||||
|
|
||||||
assertNotNull(p_out);
|
assertNotNull(p_out);
|
||||||
|
|
||||||
|
assertNotNull(p_out.getPublisher());
|
||||||
|
assertNull(p_out.getPublisher().getValue());
|
||||||
assertEquals("und", p_out.getLanguage().getClassid());
|
assertEquals("und", p_out.getLanguage().getClassid());
|
||||||
assertEquals("Undetermined", p_out.getLanguage().getClassname());
|
assertEquals("Undetermined", p_out.getLanguage().getClassname());
|
||||||
|
|
||||||
|
@ -88,6 +90,16 @@ public class CleaningFunctionTest {
|
||||||
|
|
||||||
Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out);
|
Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out);
|
||||||
assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid());
|
assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid());
|
||||||
|
assertNull(p_out.getPublisher());
|
||||||
|
|
||||||
|
getAuthorPids(p_defaults).forEach(pid -> {
|
||||||
|
System.out
|
||||||
|
.println(
|
||||||
|
String
|
||||||
|
.format(
|
||||||
|
"%s [%s - %s]", pid.getValue(), pid.getQualifier().getClassid(),
|
||||||
|
pid.getQualifier().getClassname()));
|
||||||
|
});
|
||||||
|
|
||||||
// TODO add more assertions to verity the cleaned values
|
// TODO add more assertions to verity the cleaned values
|
||||||
System.out.println(MAPPER.writeValueAsString(p_out));
|
System.out.println(MAPPER.writeValueAsString(p_out));
|
||||||
|
@ -97,7 +109,7 @@ public class CleaningFunctionTest {
|
||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
private Stream<Qualifier> getAuthorPidTypes(Publication pub) {
|
private Stream<Qualifier> getAuthorPidTypes(Result pub) {
|
||||||
return pub
|
return pub
|
||||||
.getAuthor()
|
.getAuthor()
|
||||||
.stream()
|
.stream()
|
||||||
|
@ -106,6 +118,14 @@ public class CleaningFunctionTest {
|
||||||
.map(s -> s.getQualifier());
|
.map(s -> s.getQualifier());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Stream<StructuredProperty> getAuthorPids(Result pub) {
|
||||||
|
return pub
|
||||||
|
.getAuthor()
|
||||||
|
.stream()
|
||||||
|
.map(a -> a.getPid())
|
||||||
|
.flatMap(p -> p.stream());
|
||||||
|
}
|
||||||
|
|
||||||
private List<String> vocs() throws IOException {
|
private List<String> vocs() throws IOException {
|
||||||
return IOUtils
|
return IOUtils
|
||||||
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
|
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
|
||||||
|
|
|
@ -59,6 +59,28 @@
|
||||||
"schemename": "dnet:pid_types"
|
"schemename": "dnet:pid_types"
|
||||||
},
|
},
|
||||||
"value": "qwerty"
|
"value": "qwerty"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"dataInfo": {
|
||||||
|
"deletedbyinference": false,
|
||||||
|
"inferenceprovenance": "",
|
||||||
|
"inferred": false,
|
||||||
|
"invisible": false,
|
||||||
|
"provenanceaction": {
|
||||||
|
"classid": "sysimport:crosswalk:datasetarchive",
|
||||||
|
"classname": "sysimport:crosswalk:datasetarchive",
|
||||||
|
"schemeid": "dnet:provenanceActions",
|
||||||
|
"schemename": "dnet:provenanceActions"
|
||||||
|
},
|
||||||
|
"trust": "0.9"
|
||||||
|
},
|
||||||
|
"qualifier": {
|
||||||
|
"classid": "ORCID",
|
||||||
|
"classname": "ORCID",
|
||||||
|
"schemeid": "",
|
||||||
|
"schemename": ""
|
||||||
|
},
|
||||||
|
"value": "asdasd"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"rank": 2,
|
"rank": 2,
|
||||||
|
@ -186,6 +208,9 @@
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"bestaccessright": null,
|
"bestaccessright": null,
|
||||||
|
"publisher": {
|
||||||
|
"value": null
|
||||||
|
},
|
||||||
"collectedfrom": [
|
"collectedfrom": [
|
||||||
{
|
{
|
||||||
"key": "10|CSC_________::a2b9ce8435390bcbfc05f3cae3948747",
|
"key": "10|CSC_________::a2b9ce8435390bcbfc05f3cae3948747",
|
||||||
|
|
Loading…
Reference in New Issue