From 1681de672ddd22f7702b4330af79b34b6200695d Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Fri, 19 Jun 2020 15:11:46 +0200
Subject: [PATCH] updated mapping scholexplorer to OAF

---
 .../java/eu/dnetlib/dhp/export/DLIToOAF.scala | 137 +++++++++++---
 .../SparkExportContentForOpenAire.scala       | 174 ++++++++++++------
 2 files changed, 229 insertions(+), 82 deletions(-)

diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala
index 5d7c444b2..637362acf 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala
@@ -4,9 +4,12 @@ import java.time.LocalDateTime
 import java.time.format.DateTimeFormatter
 
 import eu.dnetlib.dhp.common.PacePerson
-import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, Field, Instance, KeyValue, Publication, Qualifier, Relation, StructuredProperty}
+import eu.dnetlib.dhp.schema.action.AtomicAction
+import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, StructuredProperty}
 import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
+import eu.dnetlib.dhp.utils.DHPUtils
 import org.apache.commons.lang3.StringUtils
+import org.codehaus.jackson.map.ObjectMapper
 
 import scala.collection.JavaConverters._
 
@@ -77,6 +80,76 @@ object DLIToOAF {
   )
 
+  val rel_inverse: Map[String, String] = Map(
+    "isRelatedTo" -> "isRelatedTo",
+    "IsSupplementedBy" -> "isSupplementTo",
+    "cites" -> "IsCitedBy",
+    "IsCitedBy" -> "cites",
+    "reviews" -> "IsReviewedBy"
+  )
+
+
+  val PidTypeMap: Map[String, String] = Map(
+    "pbmid" -> "pmid",
+    "pmcid" -> "pmc",
+    "pmid" -> "pmid",
+    "pubmedid" -> "pmid",
+    "DOI" -> "doi",
+    "doi" -> "doi"
+  )
+
+
+  def toActionSet(item: Oaf): (String, String) = {
+    val mapper = new ObjectMapper()
+
+    item match {
+      case dataset: Dataset =>
+        val a: AtomicAction[Dataset] = new AtomicAction[Dataset]
+        a.setClazz(classOf[Dataset])
+        a.setPayload(dataset)
+        (dataset.getClass.getCanonicalName, mapper.writeValueAsString(a))
+      case publication: Publication =>
+        val a: AtomicAction[Publication] = new AtomicAction[Publication]
+        a.setClazz(classOf[Publication])
+        a.setPayload(publication)
+        (publication.getClass.getCanonicalName, mapper.writeValueAsString(a))
+      case relation: Relation =>
+        val a: AtomicAction[Relation] = new AtomicAction[Relation]
+        a.setClazz(classOf[Relation])
+        a.setPayload(relation)
+        (relation.getClass.getCanonicalName, mapper.writeValueAsString(a))
+      case _ =>
+        null
+    }
+  }
+
+  def convertClinicalTrial(dataset: DLIDataset): (String, String) = {
+    val currentId = generateId(dataset.getId)
+    val pids = dataset.getPid.asScala.filter(p => "clinicaltrials.gov".equalsIgnoreCase(p.getQualifier.getClassname)).map(p => s"50|r3111dacbab5::${DHPUtils.md5(p.getValue.toLowerCase())}")
+    if (pids.isEmpty)
+      null
+    else
+      (currentId, pids.head)
+  }
+
+
+  def insertExternalRefs(publication: Publication, externalReferences: List[DLIExternalReference]): Publication = {
+
+    val eRefs = externalReferences.map(e => {
+      val result = new ExternalReference()
+      result.setSitename(e.sitename)
+      result.setLabel(e.label)
+      result.setUrl(e.url)
+      result.setRefidentifier(e.pid)
+      result.setDataInfo(generateDataInfo())
+      result.setQualifier(createQualifier(e.classId, "dnet:externalReference_typologies"))
+      result
+    })
+    publication.setExternalReference(eRefs.asJava)
+    publication
+
+  }
+
   def filterPid(p: StructuredProperty): Boolean = {
     if (expectecdPidType.contains(p.getQualifier.getClassname) && p.getQualifier.getClassname.equalsIgnoreCase("url"))
       if (filteredURL.exists(u => p.getValue.contains(u)))
         return true
@@ -97,7 +170,6 @@ object DLIToOAF {
   }
 
   def convertDLIDatasetToExternalReference(dataset: DLIDataset): DLIExternalReference = {
-    val currentId = generateId(dataset.getId)
     val pids = dataset.getPid.asScala.filter(filterPid)
 
     if (pids == null || pids.isEmpty)
@@ -109,7 +181,7 @@
       pid.getQualifier.getClassname match {
         case "uniprot" => DLIExternalReference(generateId(dataset.getId), s"https://www.uniprot.org/uniprot/${pid.getValue}", "UniProt", extractTitle(dataset.getTitle), pid.getValue, "accessionNumber")
         case "ena" =>
-          if(pid.getValue!= null && pid.getValue.nonEmpty && pid.getValue.length>7)
+          if (pid.getValue != null && pid.getValue.nonEmpty && pid.getValue.length > 7)
            DLIExternalReference(generateId(dataset.getId), s"https://www.ebi.ac.uk/ena/data/view/${pid.getValue.substring(0, 8)}", "European Nucleotide Archive", extractTitle(dataset.getTitle), pid.getValue, "accessionNumber")
           else
            null
@@ -126,43 +198,50 @@
   }
 
-  def convertDLIPublicationToOAF(p: DLIPublication): Publication = {
-
+  def convertDLIPublicationToOAF(inputPublication: DLIPublication): Publication = {
     val result = new Publication
-    result.setId(generateId(p.getId))
+    val cleanedPids = inputPublication.getPid.asScala.filter(p => PidTypeMap.contains(p.getQualifier.getClassid))
+      .map(p => {
+        p.setQualifier(createQualifier(PidTypeMap(p.getQualifier.getClassid), p.getQualifier.getSchemeid))
+        p
+      })
+    if (cleanedPids.isEmpty)
+      return null
+    result.setId(generateId(inputPublication.getId))
     result.setDataInfo(generateDataInfo(invisibile = true))
-    if (p.getCollectedfrom == null || p.getCollectedfrom.size() == 0 || (p.getCollectedfrom.size() == 1 && p.getCollectedfrom.get(0) == null))
+    if (inputPublication.getCollectedfrom == null || inputPublication.getCollectedfrom.size() == 0 || (inputPublication.getCollectedfrom.size() == 1 && inputPublication.getCollectedfrom.get(0) == null))
       return null
-
-    result.setCollectedfrom(p.getCollectedfrom.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).asJava)
-    result.setPid(p.getPid)
-    result.setDateofcollection(p.getDateofcollection)
-    result.setOriginalId(p.getPid.asScala.map(p => p.getValue).asJava)
+    result.setCollectedfrom(inputPublication.getCollectedfrom.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
+    if(result.getCollectedfrom.isEmpty)
+      return null
+    result.setPid(cleanedPids.asJava)
+    result.setDateofcollection(inputPublication.getDateofcollection)
+    result.setOriginalId(inputPublication.getPid.asScala.map(p => p.getValue).asJava)
     result.setDateoftransformation(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")))
-    if (p.getAuthor == null || p.getAuthor.isEmpty)
+    if (inputPublication.getAuthor == null || inputPublication.getAuthor.isEmpty)
       return null
-    result.setAuthor(p.getAuthor.asScala.map(convertAuthor).asJava)
-    result.setResulttype(createQualifier(p.getResulttype.getClassid, p.getResulttype.getClassname, "dnet:result_typologies", "dnet:result_typologies"))
+    result.setAuthor(inputPublication.getAuthor.asScala.map(convertAuthor).asJava)
+    result.setResulttype(createQualifier(inputPublication.getResulttype.getClassid, inputPublication.getResulttype.getClassname, "dnet:result_typologies", "dnet:result_typologies"))
 
-    if (p.getSubject != null)
-      result.setSubject(p.getSubject.asScala.map(convertSubject).asJava)
+    if (inputPublication.getSubject != null)
+      result.setSubject(inputPublication.getSubject.asScala.map(convertSubject).asJava)
 
-    if (p.getTitle == null || p.getTitle.isEmpty)
+    if (inputPublication.getTitle == null || inputPublication.getTitle.isEmpty)
       return null
-    result.setTitle(List(patchTitle(p.getTitle.get(0))).asJava)
+    result.setTitle(List(patchTitle(inputPublication.getTitle.get(0))).asJava)
 
-    if (p.getRelevantdate == null || p.getRelevantdate.size() == 0)
+    if (inputPublication.getRelevantdate == null || inputPublication.getRelevantdate.size() == 0)
       return null
-    result.setRelevantdate(p.getRelevantdate.asScala.map(patchRelevantDate).asJava)
+    result.setRelevantdate(inputPublication.getRelevantdate.asScala.map(patchRelevantDate).asJava)
 
-    result.setDescription(p.getDescription)
+    result.setDescription(inputPublication.getDescription)
 
-    result.setDateofacceptance(asField(p.getRelevantdate.get(0).getValue))
-    result.setPublisher(p.getPublisher)
-    result.setSource(p.getSource)
+    result.setDateofacceptance(asField(inputPublication.getRelevantdate.get(0).getValue))
+    result.setPublisher(inputPublication.getPublisher)
+    result.setSource(inputPublication.getSource)
     result.setBestaccessright(createQualifier("UNKNOWN", "not available", "dnet:access_modes", "dnet:access_modes"))
 
     val dois = result.getPid.asScala.filter(p => "doi".equalsIgnoreCase(p.getQualifier.getClassname)).map(p => p.getValue)
@@ -170,7 +249,7 @@
       return null
 
 
-    val i: Instance = createInstance(s"https://dx.doi.org/${dois.head}", firstInstanceOrNull(p.getInstance()), result.getDateofacceptance)
+    val i: Instance = createInstance(s"https://dx.doi.org/${dois.head}", firstInstanceOrNull(inputPublication.getInstance()), result.getDateofacceptance)
     if (i != null)
       result.setInstance(List(i).asJava)
@@ -211,7 +290,9 @@
     val result: Dataset = new Dataset
     result.setId(generateId(d.getId))
     result.setDataInfo(generateDataInfo())
-    result.setCollectedfrom(d.getCollectedfrom.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).asJava)
+    result.setCollectedfrom(d.getCollectedfrom.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
+    if(result.getCollectedfrom.isEmpty)
+      return null
 
     result.setPid(d.getPid)
 
@@ -280,7 +361,7 @@
     if (dataset)
       i.setInstancetype(createQualifier("0021", "Dataset", "dnet:publication_resource", "dnet:publication_resource"))
     else
-      i.setInstancetype(createQualifier("0000", "UNKNOWN", "dnet:publication_resource", "dnet:publication_resource"))
+      i.setInstancetype(createQualifier("0000", "Unknown", "dnet:publication_resource", "dnet:publication_resource"))
 
     if (originalInstance != null && originalInstance.getHostedby != null)
       i.setHostedby(originalInstance.getHostedby)
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala
index f3aa35549..edf951df4 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala
@@ -4,10 +4,16 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
 import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
 import org.apache.commons.io.IOUtils
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.hadoop.mapred.SequenceFileOutputFormat
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.{SparkConf, SparkContext}
 import org.codehaus.jackson.map.ObjectMapper
+
 import scala.collection.mutable.ArrayBuffer
 
@@ -36,57 +42,66 @@
     implicit val dliRelEncoder: Encoder[DLIRelation] = Encoders.bean(classOf[DLIRelation])
     import spark.implicits._
 
-//
-//    val relRDD:RDD[Relation] = sc.textFile(s"$workingPath/relation_j")
-//      .map(s => new ObjectMapper().readValue(s, classOf[DLIRelation]))
-//      .filter(p => p.getDataInfo.getDeletedbyinference == false)
-//      .map(DLIToOAF.convertDLIRelation).filter(p=>p!= null)
-//    spark.createDataset(relRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS")
-//
-//    val datRDD:RDD[OafDataset] = sc.textFile(s"$workingPath/dataset")
-//      .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
-//      .filter(p => p.getDataInfo.getDeletedbyinference == false)
-//      .map(DLIToOAF.convertDLIDatasetTOOAF).filter(p=>p!= null)
-//    spark.createDataset(datRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetDS")
-//
-//
-//    val pubRDD:RDD[Publication] = sc.textFile(s"$workingPath/publication")
-//      .map(s => new ObjectMapper().readValue(s, classOf[DLIPublication]))
-//      .filter(p => p.getDataInfo.getDeletedbyinference == false)
-//      .map(DLIToOAF.convertDLIPublicationToOAF).filter(p=>p!= null)
-//    spark.createDataset(pubRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS")
-//
-//
-//
-//    val pubs:Dataset[Publication] = spark.read.load(s"$workingPath/publicationDS").as[Publication]
-//    val dats :Dataset[OafDataset] = spark.read.load(s"$workingPath/datasetDS").as[OafDataset]
-    var relDS :Dataset[Relation] = spark.read.load(s"$workingPath/relationDS").as[Relation]
-//
-//
-//    pubs.joinWith(relDS, pubs("id").equalTo(relDS("source"))).map(k => k._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_f1")
-//
-//    relDS= spark.read.load(s"$workingPath/relationDS_f1").as[Relation]
-//
-//    relDS.joinWith(dats, relDS("target").equalTo(dats("id"))).map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_filtered")
-//
-//
-//    val r_source = relDS.select(relDS("source")).distinct()
-//    val r_target = relDS.select(relDS("source")).distinct()
-//
-//
-//    pubs.joinWith(r_source, pubs("id").equalTo(r_source("source")), "inner").map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS_filtered")
-//
-//    dats.joinWith(r_target, dats("id").equalTo(r_target("target")), "inner").map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetDS_filtered")
-//
-//    spark.createDataset(sc.textFile(s"$workingPath/dataset")
-//      .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
-//      .map(DLIToOAF.convertDLIDatasetToExternalReference)
-//      .filter(p => p != null)).as[DLIExternalReference].write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference")
-//
+
+    val relRDD:RDD[Relation] = sc.textFile(s"$workingPath/relation_j")
+      .map(s => new ObjectMapper().readValue(s, classOf[DLIRelation]))
+      .filter(p => p.getDataInfo.getDeletedbyinference == false)
+      .map(DLIToOAF.convertDLIRelation).filter(p=>p!= null)
+    spark.createDataset(relRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS")
+
+    val datRDD:RDD[OafDataset] = sc.textFile(s"$workingPath/dataset")
+      .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
+      .filter(p => p.getDataInfo.getDeletedbyinference == false)
+      .map(DLIToOAF.convertDLIDatasetTOOAF).filter(p=>p!= null)
+    spark.createDataset(datRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetDS")
+
+
+    val pubRDD:RDD[Publication] = sc.textFile(s"$workingPath/publication")
+      .map(s => new ObjectMapper().readValue(s, classOf[DLIPublication]))
+      .filter(p => p.getDataInfo.getDeletedbyinference == false)
+      .map(DLIToOAF.convertDLIPublicationToOAF).filter(p=>p!= null)
+    spark.createDataset(pubRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS")
+
+
+
+    val pubs:Dataset[Publication] = spark.read.load(s"$workingPath/publicationDS").as[Publication]
+    val dats :Dataset[OafDataset] = spark.read.load(s"$workingPath/datasetDS").as[OafDataset]
+    val relDS1 :Dataset[Relation] = spark.read.load(s"$workingPath/relationDS").as[Relation]
+
+
+    val pub_id = pubs.select("id").distinct()
+    val dat_id = dats.select("id").distinct()
+
+
+    pub_id.joinWith(relDS1, pub_id("id").equalTo(relDS1("source"))).map(k => k._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_f1")
+
+    val relDS2= spark.read.load(s"$workingPath/relationDS_f1").as[Relation]
+
+    relDS2.joinWith(dat_id, relDS2("target").equalTo(dats("id"))).map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_filtered")
+
+
+    val r_source = relDS2.select(relDS2("source")).distinct()
+    val r_target = relDS2.select(relDS2("target")).distinct()
+
+
+    val w2 = Window.partitionBy("id").orderBy("lastupdatetimestamp")
+
+    pubs.joinWith(r_source, pubs("id").equalTo(r_source("source")), "inner").map(k => k._1)
+      .withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row")
+      .write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS_filtered")
+
+    dats.joinWith(r_target, dats("id").equalTo(r_target("target")), "inner").map(k => k._1)
+      .withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row")
+      .write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetAS")
+
+    spark.createDataset(sc.textFile(s"$workingPath/dataset")
+      .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
+      .map(DLIToOAF.convertDLIDatasetToExternalReference)
+      .filter(p => p != null)).as[DLIExternalReference].write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference")
 
     val pf = spark.read.load(s"$workingPath/publicationDS_filtered").select("id")
-    relDS = spark.read.load(s"$workingPath/relationDS").as[Relation]
-    val relationTo = pf.joinWith(relDS, pf("id").equalTo(relDS("source")),"inner").map(t =>t._2)
+    val relDS3 = spark.read.load(s"$workingPath/relationDS").as[Relation]
+    val relationTo = pf.joinWith(relDS3, pf("id").equalTo(relDS3("source")),"inner").map(t =>t._2)
 
     val extRef = spark.read.load(s"$workingPath/externalReference").as[DLIExternalReference]
@@ -100,19 +115,70 @@
       (f._1, dli_ext)
     })).write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference_grouped")
 
+    val pubf :Dataset[Publication] = spark.read.load(s"$workingPath/publicationDS_filtered").as[Publication]
+
+    val groupedERf:Dataset[(String, List[DLIExternalReference])]= spark.read.load(s"$workingPath/externalReference_grouped").as[(String, List[DLIExternalReference])]
+
+    groupedERf.joinWith(pubf,pubf("id").equalTo(groupedERf("_1"))).map(t =>
+    {
+      val publication = t._2
+      if (t._1 != null) {
+        val eRefs = t._1._2
+        DLIToOAF.insertExternalRefs(publication, eRefs)
+
+      } else
+        publication
+    }
+    ).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationAS")
+    spark.createDataset(sc.textFile(s"$workingPath/dataset")
+      .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
+      .map(DLIToOAF.convertClinicalTrial)
+      .filter(p => p != null))
+      .write.mode(SaveMode.Overwrite).save(s"$workingPath/clinicalTrials")
+
+    val ct:Dataset[(String,String)] = spark.read.load(s"$workingPath/clinicalTrials").as[(String,String)]
+
+    val relDS= spark.read.load(s"$workingPath/relationDS_f1").as[Relation]
+
+    relDS.joinWith(ct, relDS("target").equalTo(ct("_1")), "inner")
+      .map(k =>{
+        val currentRel = k._1
+        currentRel.setTarget(k._2._2)
+        currentRel
+      }).write.mode(SaveMode.Overwrite).save(s"$workingPath/clinicalTrialsRels")
+    val clRels:Dataset[Relation] = spark.read.load(s"$workingPath/clinicalTrialsRels").as[Relation]
+    val rels:Dataset[Relation] = spark.read.load(s"$workingPath/relationDS_filtered").as[Relation]
+
+    rels.union(clRels).flatMap(r => {
+      val inverseRel = new Relation
+      inverseRel.setSource(r.getTarget)
+      inverseRel.setTarget(r.getSource)
+      inverseRel.setDataInfo(r.getDataInfo)
+      inverseRel.setCollectedfrom(r.getCollectedfrom)
+      inverseRel.setRelType(r.getRelType)
+      inverseRel.setSubRelType(r.getSubRelType)
+      inverseRel.setRelClass(DLIToOAF.rel_inverse(r.getRelClass))
+      List(r, inverseRel)
+    }).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationAS")
+    val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationAS").as[Relation].map(DLIToOAF.toActionSet)
+    val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/publicationAS").as[Publication].map(DLIToOAF.toActionSet)
+    val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/datasetAS").as[OafDataset].map(DLIToOAF.toActionSet)
 
-
-
-
-
-
+    fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
   }
+
+
+
+
+
+
+
 }