dump #50

Merged
claudio.atzori merged 98 commits from miriam.baglioni/dnet-hadoop:dump into master 2020-11-04 18:07:01 +01:00
8 changed files with 285 additions and 174 deletions
Showing only changes of commit fcaedac980 - Show all commits

View File

@ -341,13 +341,7 @@ object DoiBoostMappingUtil {
def generateIdentifier (oaf: Result, doi: String): String = {
val id = DHPUtils.md5 (doi.toLowerCase)
return s"50|${
doiBoostNSPREFIX
}${
SEPARATOR
}${
id
}"
s"50|${doiBoostNSPREFIX}${SEPARATOR}${id}"
}

View File

@ -93,7 +93,7 @@ case object Crossref2Oaf {
result.setOriginalId(tmp.filter(id => id != null).asJava)
//Set identifier as {50|60} | doiboost____::md5(DOI)
//Set identifier as 50 | doiboost____::md5(DOI)
result.setId(generateIdentifier(result, doi))
// Add DataInfo
@ -267,7 +267,7 @@ case object Crossref2Oaf {
val r = new Relation
r.setSource(sourceId)
r.setTarget(s"$nsPrefix::$targetId")
r.setTarget(s"40|$nsPrefix::$targetId")
r.setRelType("resultProject")
r.setRelClass("isProducedBy")
r.setSubRelType("outcome")

View File

@ -0,0 +1,54 @@
package eu.dnetlib.dhp.doiboost
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, StructuredProperty, Dataset => OafDataset}
import org.apache.spark.sql.functions.{col, sum}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import scala.::
import scala.collection.JavaConverters._
class QueryTest {
def extractLicense(p:Publication):Tuple2[String,String] = {
val tmp = p.getInstance().asScala.map(i => i.getLicense.getValue).distinct.mkString(",")
(p.getId,tmp)
}
def hasDOI(publication: Publication, doi:String):Boolean = {
val s = publication.getOriginalId.asScala.filter(i => i.equalsIgnoreCase(doi))
s.nonEmpty
}
def hasNullHostedBy(publication: Publication):Boolean = {
publication.getInstance().asScala.exists(i => i.getHostedby == null || i.getHostedby.getValue == null)
}
def myQuery(spark:SparkSession): Unit = {
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
implicit val mapEncoderDat: Encoder[OafDataset] = Encoders.kryo[OafDataset]
implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation]
val doiboostPubs:Dataset[Publication] = spark.read.load("/data/doiboost/process/doiBoostPublicationFiltered").as[Publication]
val relFunder: Dataset[Relation] = spark.read.format("org.apache.spark.sql.parquet").load("/data/doiboost/process/crossrefRelation").as[Relation]
doiboostPubs.filter(p => p.getDateofacceptance != null && p.getDateofacceptance.getValue!= null && p.getDateofacceptance.getValue.length > 0 )
doiboostPubs.filter(p=>hasDOI(p, "10.1016/j.is.2020.101522")).collect()(0).getDescription.get(0).getValue
doiboostPubs.filter(p=> hasNullHostedBy(p)).count()
doiboostPubs.map(p=> (p.getId, p.getBestaccessright.getClassname))(Encoders.tuple(Encoders.STRING,Encoders.STRING))
}
}

View File

@ -90,169 +90,15 @@ public class CleanGraphSparkJob {
final CleaningRuleMap mapping = CleaningRuleMap.create(vocs);
readTableFromPath(spark, inputPath, clazz)
.map((MapFunction<T, T>) value -> fixVocabularyNames(value), Encoders.bean(clazz))
.map((MapFunction<T, T>) value -> CleaningFunctions.fixVocabularyNames(value), Encoders.bean(clazz))
.map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
.map((MapFunction<T, T>) value -> fixDefaults(value), Encoders.bean(clazz))
.map((MapFunction<T, T>) value -> CleaningFunctions.fixDefaults(value), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
protected static <T extends Oaf> T fixVocabularyNames(T value) {
if (value instanceof Datasource) {
// nothing to clean here
} else if (value instanceof Project) {
// nothing to clean here
} else if (value instanceof Organization) {
Organization o = (Organization) value;
if (Objects.nonNull(o.getCountry())) {
fixVocabName(o.getCountry(), ModelConstants.DNET_COUNTRY_TYPE);
}
} else if (value instanceof Relation) {
// nothing to clean here
} else if (value instanceof Result) {
Result r = (Result) value;
fixVocabName(r.getLanguage(), ModelConstants.DNET_LANGUAGES);
fixVocabName(r.getResourcetype(), ModelConstants.DNET_DATA_CITE_RESOURCE);
fixVocabName(r.getBestaccessright(), ModelConstants.DNET_ACCESS_MODES);
if (Objects.nonNull(r.getSubject())) {
r.getSubject().forEach(s -> fixVocabName(s.getQualifier(), ModelConstants.DNET_SUBJECT_TYPOLOGIES));
}
if (Objects.nonNull(r.getInstance())) {
for (Instance i : r.getInstance()) {
fixVocabName(i.getAccessright(), ModelConstants.DNET_ACCESS_MODES);
fixVocabName(i.getRefereed(), ModelConstants.DNET_REVIEW_LEVELS);
}
}
if (Objects.nonNull(r.getAuthor())) {
r.getAuthor().forEach(a -> {
if (Objects.nonNull(a.getPid())) {
a.getPid().forEach(p -> {
fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES);
});
}
});
}
if (value instanceof Publication) {
} else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
} else if (value instanceof OtherResearchProduct) {
} else if (value instanceof Software) {
}
}
return value;
}
private static void fixVocabName(Qualifier q, String vocabularyName) {
if (Objects.nonNull(q) && StringUtils.isBlank(q.getSchemeid())) {
q.setSchemeid(vocabularyName);
q.setSchemename(vocabularyName);
}
}
protected static <T extends Oaf> T fixDefaults(T value) {
if (value instanceof Datasource) {
// nothing to clean here
} else if (value instanceof Project) {
// nothing to clean here
} else if (value instanceof Organization) {
Organization o = (Organization) value;
if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) {
o.setCountry(qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_COUNTRY_TYPE));
}
} else if (value instanceof Relation) {
// nothing to clean here
} else if (value instanceof Result) {
Result r = (Result) value;
if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) {
r.setPublisher(null);
}
if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) {
r
.setLanguage(
qualifier("und", "Undetermined", ModelConstants.DNET_LANGUAGES));
}
if (Objects.nonNull(r.getSubject())) {
r
.setSubject(
r
.getSubject()
.stream()
.filter(Objects::nonNull)
.filter(sp -> StringUtils.isNotBlank(sp.getValue()))
.filter(sp -> Objects.nonNull(sp.getQualifier()))
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
.collect(Collectors.toList()));
}
if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) {
r
.setResourcetype(
qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE));
}
if (Objects.nonNull(r.getInstance())) {
for (Instance i : r.getInstance()) {
if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) {
i.setAccessright(qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
}
if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) {
i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY);
}
if (Objects.isNull(i.getRefereed())) {
i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS));
}
}
}
if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance());
if (Objects.isNull(bestaccessrights)) {
r
.setBestaccessright(
qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
} else {
r.setBestaccessright(bestaccessrights);
}
}
if (Objects.nonNull(r.getAuthor())) {
boolean nullRank = r
.getAuthor()
.stream()
.anyMatch(a -> Objects.isNull(a.getRank()));
if (nullRank) {
int i = 1;
for (Author author : r.getAuthor()) {
author.setRank(i++);
}
}
}
if (value instanceof Publication) {
} else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
} else if (value instanceof OtherResearchProduct) {
} else if (value instanceof Software) {
}
}
return value;
}
private static Qualifier qualifier(String classid, String classname, String scheme) {
return OafMapperUtils
.qualifier(
classid, classname, scheme, scheme);
}
private static <T extends Oaf> Dataset<T> readTableFromPath(
SparkSession spark, String inputEntityPath, Class<T> clazz) {

View File

@ -0,0 +1,196 @@
package eu.dnetlib.dhp.oa.graph.clean;
import java.util.LinkedHashMap;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.clearspring.analytics.util.Lists;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
public class CleaningFunctions {
public static final String ORCID_PREFIX_REGEX = "^http(s?):\\/\\/orcid\\.org\\/";
public static <T extends Oaf> T fixVocabularyNames(T value) {
if (value instanceof Datasource) {
// nothing to clean here
} else if (value instanceof Project) {
// nothing to clean here
} else if (value instanceof Organization) {
Organization o = (Organization) value;
if (Objects.nonNull(o.getCountry())) {
fixVocabName(o.getCountry(), ModelConstants.DNET_COUNTRY_TYPE);
}
} else if (value instanceof Relation) {
// nothing to clean here
} else if (value instanceof Result) {
Result r = (Result) value;
fixVocabName(r.getLanguage(), ModelConstants.DNET_LANGUAGES);
fixVocabName(r.getResourcetype(), ModelConstants.DNET_DATA_CITE_RESOURCE);
fixVocabName(r.getBestaccessright(), ModelConstants.DNET_ACCESS_MODES);
if (Objects.nonNull(r.getSubject())) {
r.getSubject().forEach(s -> fixVocabName(s.getQualifier(), ModelConstants.DNET_SUBJECT_TYPOLOGIES));
}
if (Objects.nonNull(r.getInstance())) {
for (Instance i : r.getInstance()) {
fixVocabName(i.getAccessright(), ModelConstants.DNET_ACCESS_MODES);
fixVocabName(i.getRefereed(), ModelConstants.DNET_REVIEW_LEVELS);
}
}
if (Objects.nonNull(r.getAuthor())) {
r.getAuthor().forEach(a -> {
if (Objects.nonNull(a.getPid())) {
a.getPid().forEach(p -> {
fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES);
});
}
});
}
if (value instanceof Publication) {
} else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
} else if (value instanceof OtherResearchProduct) {
} else if (value instanceof Software) {
}
}
return value;
}
protected static <T extends Oaf> T fixDefaults(T value) {
if (value instanceof Datasource) {
// nothing to clean here
} else if (value instanceof Project) {
// nothing to clean here
} else if (value instanceof Organization) {
Organization o = (Organization) value;
if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) {
o.setCountry(qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_COUNTRY_TYPE));
}
} else if (value instanceof Relation) {
// nothing to clean here
} else if (value instanceof Result) {
Result r = (Result) value;
if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) {
r.setPublisher(null);
}
if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) {
r
.setLanguage(
qualifier("und", "Undetermined", ModelConstants.DNET_LANGUAGES));
}
if (Objects.nonNull(r.getSubject())) {
r
.setSubject(
r
.getSubject()
.stream()
.filter(Objects::nonNull)
.filter(sp -> StringUtils.isNotBlank(sp.getValue()))
.filter(sp -> Objects.nonNull(sp.getQualifier()))
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
.collect(Collectors.toList()));
}
if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) {
r
.setResourcetype(
qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE));
}
if (Objects.nonNull(r.getInstance())) {
for (Instance i : r.getInstance()) {
if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) {
i.setAccessright(qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
}
if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) {
i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY);
}
if (Objects.isNull(i.getRefereed())) {
i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS));
}
}
}
if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance());
if (Objects.isNull(bestaccessrights)) {
r
.setBestaccessright(
qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
} else {
r.setBestaccessright(bestaccessrights);
}
}
if (Objects.nonNull(r.getAuthor())) {
boolean nullRank = r
.getAuthor()
.stream()
.anyMatch(a -> Objects.isNull(a.getRank()));
if (nullRank) {
int i = 1;
for (Author author : r.getAuthor()) {
author.setRank(i++);
}
}
for(Author a : r.getAuthor()) {
if (Objects.isNull(a.getPid())) {
a.setPid(Lists.newArrayList());
} else {
a.setPid(
a.getPid().stream()
.filter(p -> Objects.nonNull(p.getQualifier()))
.filter(p -> StringUtils.isNotBlank(p.getValue()))
.map(p -> {
p.setValue(p.getValue().trim().replaceAll(ORCID_PREFIX_REGEX, ""));
return p;
})
.collect(Collectors.toMap(StructuredProperty::getValue, Function.identity(), (p1, p2) -> p1, LinkedHashMap::new))
.values()
.stream()
.collect(Collectors.toList()));
}
}
}
if (value instanceof Publication) {
} else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
} else if (value instanceof OtherResearchProduct) {
} else if (value instanceof Software) {
}
}
return value;
}
// HELPERS
private static void fixVocabName(Qualifier q, String vocabularyName) {
if (Objects.nonNull(q) && StringUtils.isBlank(q.getSchemeid())) {
q.setSchemeid(vocabularyName);
q.setSchemename(vocabularyName);
}
}
private static Qualifier qualifier(String classid, String classname, String scheme) {
return OafMapperUtils
.qualifier(
classid, classname, scheme, scheme);
}
}

View File

@ -1,10 +1,10 @@
DROP VIEW IF EXISTS ${hiveDbName}.result;
CREATE VIEW IF NOT EXISTS result as
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.publication p
CREATE VIEW IF NOT EXISTS ${hiveDbName}.result as
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.publication p
union all
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.dataset d
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.dataset d
union all
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.software s
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.software s
union all
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.otherresearchproduct o;
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.otherresearchproduct o;

View File

@ -62,7 +62,7 @@ public class CleaningFunctionTest {
assertTrue(p_in instanceof Result);
assertTrue(p_in instanceof Publication);
Publication p_out = OafCleaner.apply(CleanGraphSparkJob.fixVocabularyNames(p_in), mapping);
Publication p_out = OafCleaner.apply(CleaningFunctions.fixVocabularyNames(p_in), mapping);
assertNotNull(p_out);
@ -88,7 +88,7 @@ public class CleaningFunctionTest {
.map(p -> p.getQualifier())
.allMatch(q -> pidTerms.contains(q.getClassid())));
Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out);
Publication p_defaults = CleaningFunctions.fixDefaults(p_out);
assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid());
assertNull(p_out.getPublisher());

View File

@ -27,6 +27,28 @@
"schemename": "dnet:pid_types"
},
"value": "0000-0001-9613-6639"
},
{
"dataInfo": {
"deletedbyinference": false,
"inferenceprovenance": "",
"inferred": false,
"invisible": false,
"provenanceaction": {
"classid": "sysimport:crosswalk:datasetarchive",
"classname": "sysimport:crosswalk:datasetarchive",
"schemeid": "dnet:provenanceActions",
"schemename": "dnet:provenanceActions"
},
"trust": "0.9"
},
"qualifier": {
"classid": "ORCID12",
"classname": "ORCID12",
"schemeid": "dnet:pid_types",
"schemename": "dnet:pid_types"
},
"value": "https://orcid.org/0000-0001-9613-6639"
}
],
"rank": 1,
@ -91,8 +113,7 @@
],
"fullname": "Barry, Peter S.",
"name": "Peter S.",
"pid": [
],
"pid": null,
"rank": 3,
"surname": "Barry"
},