enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
2 changed files with 229 additions and 82 deletions
Showing only changes of commit 1681de672d - Show all commits

View File

@ -4,9 +4,12 @@ import java.time.LocalDateTime
import java.time.format.DateTimeFormatter import java.time.format.DateTimeFormatter
import eu.dnetlib.dhp.common.PacePerson import eu.dnetlib.dhp.common.PacePerson
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, Field, Instance, KeyValue, Publication, Qualifier, Relation, StructuredProperty} import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, StructuredProperty}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation} import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.StringUtils
import org.codehaus.jackson.map.ObjectMapper
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
@ -77,6 +80,76 @@ object DLIToOAF {
) )
val rel_inverse: Map[String, String] = Map(
"isRelatedTo" -> "isRelatedTo",
"IsSupplementedBy" -> "isSupplementTo",
"cites" -> "IsCitedBy",
"IsCitedBy" -> "cites",
"reviews" -> "IsReviewedBy"
)
val PidTypeMap: Map[String, String] = Map(
"pbmid" -> "pmid",
"pmcid" -> "pmc",
"pmid" -> "pmid",
"pubmedid" -> "pmid",
"DOI" -> "doi",
"doi" -> "doi"
)
def toActionSet(item: Oaf): (String, String) = {
val mapper = new ObjectMapper()
item match {
case dataset: Dataset =>
val a: AtomicAction[Dataset] = new AtomicAction[Dataset]
a.setClazz(classOf[Dataset])
a.setPayload(dataset)
(dataset.getClass.getCanonicalName, mapper.writeValueAsString(a))
case publication: Publication =>
val a: AtomicAction[Publication] = new AtomicAction[Publication]
a.setClazz(classOf[Publication])
a.setPayload(publication)
(publication.getClass.getCanonicalName, mapper.writeValueAsString(a))
case relation: Relation =>
val a: AtomicAction[Relation] = new AtomicAction[Relation]
a.setClazz(classOf[Relation])
a.setPayload(relation)
(relation.getClass.getCanonicalName, mapper.writeValueAsString(a))
case _ =>
null
}
}
def convertClinicalTrial(dataset: DLIDataset): (String, String) = {
val currentId = generateId(dataset.getId)
val pids = dataset.getPid.asScala.filter(p => "clinicaltrials.gov".equalsIgnoreCase(p.getQualifier.getClassname)).map(p => s"50|r3111dacbab5::${DHPUtils.md5(p.getValue.toLowerCase())}")
if (pids.isEmpty)
null
else
(currentId, pids.head)
}
def insertExternalRefs(publication: Publication, externalReferences: List[DLIExternalReference]): Publication = {
val eRefs = externalReferences.map(e => {
val result = new ExternalReference()
result.setSitename(e.sitename)
result.setLabel(e.label)
result.setUrl(e.url)
result.setRefidentifier(e.pid)
result.setDataInfo(generateDataInfo())
result.setQualifier(createQualifier(e.classId, "dnet:externalReference_typologies"))
result
})
publication.setExternalReference(eRefs.asJava)
publication
}
def filterPid(p: StructuredProperty): Boolean = { def filterPid(p: StructuredProperty): Boolean = {
if (expectecdPidType.contains(p.getQualifier.getClassname) && p.getQualifier.getClassname.equalsIgnoreCase("url")) if (expectecdPidType.contains(p.getQualifier.getClassname) && p.getQualifier.getClassname.equalsIgnoreCase("url"))
if (filteredURL.exists(u => p.getValue.contains(u))) if (filteredURL.exists(u => p.getValue.contains(u)))
@ -97,7 +170,6 @@ object DLIToOAF {
} }
def convertDLIDatasetToExternalReference(dataset: DLIDataset): DLIExternalReference = { def convertDLIDatasetToExternalReference(dataset: DLIDataset): DLIExternalReference = {
val currentId = generateId(dataset.getId)
val pids = dataset.getPid.asScala.filter(filterPid) val pids = dataset.getPid.asScala.filter(filterPid)
if (pids == null || pids.isEmpty) if (pids == null || pids.isEmpty)
@ -126,43 +198,50 @@ object DLIToOAF {
} }
def convertDLIPublicationToOAF(p: DLIPublication): Publication = { def convertDLIPublicationToOAF(inputPublication: DLIPublication): Publication = {
val result = new Publication val result = new Publication
result.setId(generateId(p.getId)) val cleanedPids = inputPublication.getPid.asScala.filter(p => PidTypeMap.contains(p.getQualifier.getClassid))
.map(p => {
p.setQualifier(createQualifier(PidTypeMap(p.getQualifier.getClassid), p.getQualifier.getSchemeid))
p
})
if (cleanedPids.isEmpty)
return null
result.setId(generateId(inputPublication.getId))
result.setDataInfo(generateDataInfo(invisibile = true)) result.setDataInfo(generateDataInfo(invisibile = true))
if (p.getCollectedfrom == null || p.getCollectedfrom.size() == 0 || (p.getCollectedfrom.size() == 1 && p.getCollectedfrom.get(0) == null)) if (inputPublication.getCollectedfrom == null || inputPublication.getCollectedfrom.size() == 0 || (inputPublication.getCollectedfrom.size() == 1 && inputPublication.getCollectedfrom.get(0) == null))
return null return null
result.setCollectedfrom(inputPublication.getCollectedfrom.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
result.setCollectedfrom(p.getCollectedfrom.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).asJava) if(result.getCollectedfrom.isEmpty)
result.setPid(p.getPid) return null
result.setDateofcollection(p.getDateofcollection) result.setPid(cleanedPids.asJava)
result.setOriginalId(p.getPid.asScala.map(p => p.getValue).asJava) result.setDateofcollection(inputPublication.getDateofcollection)
result.setOriginalId(inputPublication.getPid.asScala.map(p => p.getValue).asJava)
result.setDateoftransformation(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'"))) result.setDateoftransformation(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")))
if (p.getAuthor == null || p.getAuthor.isEmpty) if (inputPublication.getAuthor == null || inputPublication.getAuthor.isEmpty)
return null return null
result.setAuthor(p.getAuthor.asScala.map(convertAuthor).asJava) result.setAuthor(inputPublication.getAuthor.asScala.map(convertAuthor).asJava)
result.setResulttype(createQualifier(p.getResulttype.getClassid, p.getResulttype.getClassname, "dnet:result_typologies", "dnet:result_typologies")) result.setResulttype(createQualifier(inputPublication.getResulttype.getClassid, inputPublication.getResulttype.getClassname, "dnet:result_typologies", "dnet:result_typologies"))
if (p.getSubject != null) if (inputPublication.getSubject != null)
result.setSubject(p.getSubject.asScala.map(convertSubject).asJava) result.setSubject(inputPublication.getSubject.asScala.map(convertSubject).asJava)
if (p.getTitle == null || p.getTitle.isEmpty) if (inputPublication.getTitle == null || inputPublication.getTitle.isEmpty)
return null return null
result.setTitle(List(patchTitle(p.getTitle.get(0))).asJava) result.setTitle(List(patchTitle(inputPublication.getTitle.get(0))).asJava)
if (p.getRelevantdate == null || p.getRelevantdate.size() == 0) if (inputPublication.getRelevantdate == null || inputPublication.getRelevantdate.size() == 0)
return null return null
result.setRelevantdate(p.getRelevantdate.asScala.map(patchRelevantDate).asJava) result.setRelevantdate(inputPublication.getRelevantdate.asScala.map(patchRelevantDate).asJava)
result.setDescription(p.getDescription) result.setDescription(inputPublication.getDescription)
result.setDateofacceptance(asField(p.getRelevantdate.get(0).getValue)) result.setDateofacceptance(asField(inputPublication.getRelevantdate.get(0).getValue))
result.setPublisher(p.getPublisher) result.setPublisher(inputPublication.getPublisher)
result.setSource(p.getSource) result.setSource(inputPublication.getSource)
result.setBestaccessright(createQualifier("UNKNOWN", "not available", "dnet:access_modes", "dnet:access_modes")) result.setBestaccessright(createQualifier("UNKNOWN", "not available", "dnet:access_modes", "dnet:access_modes"))
val dois = result.getPid.asScala.filter(p => "doi".equalsIgnoreCase(p.getQualifier.getClassname)).map(p => p.getValue) val dois = result.getPid.asScala.filter(p => "doi".equalsIgnoreCase(p.getQualifier.getClassname)).map(p => p.getValue)
@ -170,7 +249,7 @@ object DLIToOAF {
return null return null
val i: Instance = createInstance(s"https://dx.doi.org/${dois.head}", firstInstanceOrNull(p.getInstance()), result.getDateofacceptance) val i: Instance = createInstance(s"https://dx.doi.org/${dois.head}", firstInstanceOrNull(inputPublication.getInstance()), result.getDateofacceptance)
if (i != null) if (i != null)
result.setInstance(List(i).asJava) result.setInstance(List(i).asJava)
@ -211,7 +290,9 @@ object DLIToOAF {
val result: Dataset = new Dataset val result: Dataset = new Dataset
result.setId(generateId(d.getId)) result.setId(generateId(d.getId))
result.setDataInfo(generateDataInfo()) result.setDataInfo(generateDataInfo())
result.setCollectedfrom(d.getCollectedfrom.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).asJava) result.setCollectedfrom(d.getCollectedfrom.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
if(result.getCollectedfrom.isEmpty)
return null
result.setPid(d.getPid) result.setPid(d.getPid)
@ -280,7 +361,7 @@ object DLIToOAF {
if (dataset) if (dataset)
i.setInstancetype(createQualifier("0021", "Dataset", "dnet:publication_resource", "dnet:publication_resource")) i.setInstancetype(createQualifier("0021", "Dataset", "dnet:publication_resource", "dnet:publication_resource"))
else else
i.setInstancetype(createQualifier("0000", "UNKNOWN", "dnet:publication_resource", "dnet:publication_resource")) i.setInstancetype(createQualifier("0000", "Unknown", "dnet:publication_resource", "dnet:publication_resource"))
if (originalInstance != null && originalInstance.getHostedby != null) if (originalInstance != null && originalInstance.getHostedby != null)
i.setHostedby(originalInstance.getHostedby) i.setHostedby(originalInstance.getHostedby)

View File

@ -4,10 +4,16 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset} import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation} import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.{SparkConf, SparkContext}
import org.codehaus.jackson.map.ObjectMapper import org.codehaus.jackson.map.ObjectMapper
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
@ -36,57 +42,66 @@ object SparkExportContentForOpenAire {
implicit val dliRelEncoder: Encoder[DLIRelation] = Encoders.bean(classOf[DLIRelation]) implicit val dliRelEncoder: Encoder[DLIRelation] = Encoders.bean(classOf[DLIRelation])
import spark.implicits._ import spark.implicits._
//
// val relRDD:RDD[Relation] = sc.textFile(s"$workingPath/relation_j") val relRDD:RDD[Relation] = sc.textFile(s"$workingPath/relation_j")
// .map(s => new ObjectMapper().readValue(s, classOf[DLIRelation])) .map(s => new ObjectMapper().readValue(s, classOf[DLIRelation]))
// .filter(p => p.getDataInfo.getDeletedbyinference == false) .filter(p => p.getDataInfo.getDeletedbyinference == false)
// .map(DLIToOAF.convertDLIRelation).filter(p=>p!= null) .map(DLIToOAF.convertDLIRelation).filter(p=>p!= null)
// spark.createDataset(relRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS") spark.createDataset(relRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS")
//
// val datRDD:RDD[OafDataset] = sc.textFile(s"$workingPath/dataset") val datRDD:RDD[OafDataset] = sc.textFile(s"$workingPath/dataset")
// .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset])) .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
// .filter(p => p.getDataInfo.getDeletedbyinference == false) .filter(p => p.getDataInfo.getDeletedbyinference == false)
// .map(DLIToOAF.convertDLIDatasetTOOAF).filter(p=>p!= null) .map(DLIToOAF.convertDLIDatasetTOOAF).filter(p=>p!= null)
// spark.createDataset(datRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetDS") spark.createDataset(datRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetDS")
//
//
// val pubRDD:RDD[Publication] = sc.textFile(s"$workingPath/publication") val pubRDD:RDD[Publication] = sc.textFile(s"$workingPath/publication")
// .map(s => new ObjectMapper().readValue(s, classOf[DLIPublication])) .map(s => new ObjectMapper().readValue(s, classOf[DLIPublication]))
// .filter(p => p.getDataInfo.getDeletedbyinference == false) .filter(p => p.getDataInfo.getDeletedbyinference == false)
// .map(DLIToOAF.convertDLIPublicationToOAF).filter(p=>p!= null) .map(DLIToOAF.convertDLIPublicationToOAF).filter(p=>p!= null)
// spark.createDataset(pubRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS") spark.createDataset(pubRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS")
//
//
//
// val pubs:Dataset[Publication] = spark.read.load(s"$workingPath/publicationDS").as[Publication] val pubs:Dataset[Publication] = spark.read.load(s"$workingPath/publicationDS").as[Publication]
// val dats :Dataset[OafDataset] = spark.read.load(s"$workingPath/datasetDS").as[OafDataset] val dats :Dataset[OafDataset] = spark.read.load(s"$workingPath/datasetDS").as[OafDataset]
var relDS :Dataset[Relation] = spark.read.load(s"$workingPath/relationDS").as[Relation] val relDS1 :Dataset[Relation] = spark.read.load(s"$workingPath/relationDS").as[Relation]
//
//
// pubs.joinWith(relDS, pubs("id").equalTo(relDS("source"))).map(k => k._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_f1") val pub_id = pubs.select("id").distinct()
// val dat_id = dats.select("id").distinct()
// relDS= spark.read.load(s"$workingPath/relationDS_f1").as[Relation]
//
// relDS.joinWith(dats, relDS("target").equalTo(dats("id"))).map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_filtered") pub_id.joinWith(relDS1, pub_id("id").equalTo(relDS1("source"))).map(k => k._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_f1")
//
// val relDS2= spark.read.load(s"$workingPath/relationDS_f1").as[Relation]
// val r_source = relDS.select(relDS("source")).distinct()
// val r_target = relDS.select(relDS("source")).distinct() relDS2.joinWith(dat_id, relDS2("target").equalTo(dats("id"))).map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_filtered")
//
//
// pubs.joinWith(r_source, pubs("id").equalTo(r_source("source")), "inner").map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS_filtered") val r_source = relDS2.select(relDS2("source")).distinct()
// val r_target = relDS2.select(relDS2("target")).distinct()
// dats.joinWith(r_target, dats("id").equalTo(r_target("target")), "inner").map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetDS_filtered")
//
// spark.createDataset(sc.textFile(s"$workingPath/dataset") val w2 = Window.partitionBy("id").orderBy("lastupdatetimestamp")
// .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
// .map(DLIToOAF.convertDLIDatasetToExternalReference) pubs.joinWith(r_source, pubs("id").equalTo(r_source("source")), "inner").map(k => k._1)
// .filter(p => p != null)).as[DLIExternalReference].write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference") .withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row")
// .write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS_filtered")
dats.joinWith(r_target, dats("id").equalTo(r_target("target")), "inner").map(k => k._1)
.withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row")
.write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetAS")
spark.createDataset(sc.textFile(s"$workingPath/dataset")
.map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
.map(DLIToOAF.convertDLIDatasetToExternalReference)
.filter(p => p != null)).as[DLIExternalReference].write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference")
val pf = spark.read.load(s"$workingPath/publicationDS_filtered").select("id") val pf = spark.read.load(s"$workingPath/publicationDS_filtered").select("id")
relDS = spark.read.load(s"$workingPath/relationDS").as[Relation] val relDS3 = spark.read.load(s"$workingPath/relationDS").as[Relation]
val relationTo = pf.joinWith(relDS, pf("id").equalTo(relDS("source")),"inner").map(t =>t._2) val relationTo = pf.joinWith(relDS3, pf("id").equalTo(relDS3("source")),"inner").map(t =>t._2)
val extRef = spark.read.load(s"$workingPath/externalReference").as[DLIExternalReference] val extRef = spark.read.load(s"$workingPath/externalReference").as[DLIExternalReference]
@ -100,11 +115,64 @@ object SparkExportContentForOpenAire {
(f._1, dli_ext) (f._1, dli_ext)
})).write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference_grouped") })).write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference_grouped")
val pubf :Dataset[Publication] = spark.read.load(s"$workingPath/publicationDS_filtered").as[Publication]
val groupedERf:Dataset[(String, List[DLIExternalReference])]= spark.read.load(s"$workingPath/externalReference_grouped").as[(String, List[DLIExternalReference])]
groupedERf.joinWith(pubf,pubf("id").equalTo(groupedERf("_1"))).map(t =>
{
val publication = t._2
if (t._1 != null) {
val eRefs = t._1._2
DLIToOAF.insertExternalRefs(publication, eRefs)
} else
publication
}
).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationAS")
spark.createDataset(sc.textFile(s"$workingPath/dataset")
.map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
.map(DLIToOAF.convertClinicalTrial)
.filter(p => p != null))
.write.mode(SaveMode.Overwrite).save(s"$workingPath/clinicalTrials")
val ct:Dataset[(String,String)] = spark.read.load(s"$workingPath/clinicalTrials").as[(String,String)]
val relDS= spark.read.load(s"$workingPath/relationDS_f1").as[Relation]
relDS.joinWith(ct, relDS("target").equalTo(ct("_1")), "inner")
.map(k =>{
val currentRel = k._1
currentRel.setTarget(k._2._2)
currentRel
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/clinicalTrialsRels")
val clRels:Dataset[Relation] = spark.read.load(s"$workingPath/clinicalTrialsRels").as[Relation]
val rels:Dataset[Relation] = spark.read.load(s"$workingPath/relationDS_filtered").as[Relation]
rels.union(clRels).flatMap(r => {
val inverseRel = new Relation
inverseRel.setSource(r.getTarget)
inverseRel.setTarget(r.getSource)
inverseRel.setDataInfo(r.getDataInfo)
inverseRel.setCollectedfrom(r.getCollectedfrom)
inverseRel.setRelType(r.getRelType)
inverseRel.setSubRelType(r.getSubRelType)
inverseRel.setRelClass(DLIToOAF.rel_inverse(r.getRelClass))
List(r, inverseRel)
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationAS")
val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationAS").as[Relation].map(DLIToOAF.toActionSet)
val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/publicationAS").as[Publication].map(DLIToOAF.toActionSet)
val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/datasetAS").as[OafDataset].map(DLIToOAF.toActionSet)
fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
}
@ -113,6 +181,4 @@ object SparkExportContentForOpenAire {
}
} }