implemented mapping from uniprot, pdb and ebi links

This commit is contained in:
Sandro La Bruzzo 2021-06-24 17:20:00 +02:00
parent 080a280bea
commit 80e15cc455
19 changed files with 595 additions and 181 deletions

View File

@ -577,6 +577,8 @@ val REL_TYPE_VALUE:String = "resultResult"
rel.setSource(id)
rel.setTarget(s"unresolved::${r.relatedIdentifier}::${r.relatedIdentifierType}")
rel.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava)
rel.getCollectedfrom.asScala.map(c => c.getValue)(collection.breakOut)
rel
})(collection breakOut)
}

View File

@ -142,6 +142,7 @@ case object Crossref2Oaf {
result.setDateofacceptance(asField(issuedDate))
}
else {
// TODO: take the oldest date between publishedPrint and publishedOnline
result.setDateofacceptance(asField(createdDate.getValue))
}
result.setRelevantdate(List(createdDate, postedDate, acceptedDate, publishedOnlineDate, publishedPrintDate).filter(p => p != null).asJava)

View File

@ -0,0 +1,272 @@
package eu.dnetlib.dhp.sx.bio
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, Instance, KeyValue, Oaf, Relation, StructuredProperty}
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import scala.collection.JavaConverters._
object BioDBToOAF {
case class EBILinkItem(id: Long, links: String) {}
case class EBILinks(relType: String, date: String, title: String, pmid: String, targetPid: String, targetPidType: String, targetUrl: String) {}
val dataInfo: DataInfo = OafMapperUtils.dataInfo(false, null, false, false, ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER, "0.9")
val PDB_COLLECTED_FROM: KeyValue = OafMapperUtils.keyValue("10|opendoar____::d1c373ab1570cfb9a7dbb53c186b37a2", "Protein Data Bank")
val UNIPROT_COLLECTED_FROM: KeyValue = OafMapperUtils.keyValue("10|re3data_____::296e1abaf1302897a6838d3588cd0310", "UniProtKB/Swiss-Prot")
val SUBJ_CLASS = "Keywords"
UNIPROT_COLLECTED_FROM.setDataInfo(dataInfo)
PDB_COLLECTED_FROM.setDataInfo(dataInfo)
val EBI_COLLECTED_FROM: KeyValue = OafMapperUtils.keyValue("10|opendoar____::83e60e09c222f206c725385f53d7e567c", "EMBL-EBIs Protein Data Bank in Europe (PDBe)")
case class UniprotDate(date: String, date_info: String) {}
def uniprotToOAF(input: String): List[Oaf] = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json = parse(input)
val pid = (json \ "pid").extract[String]
val d = new Dataset
d.setPid(
List(
OafMapperUtils.structuredProperty(pid, "uniprot", "uniprot", ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, dataInfo)
).asJava
)
d.setDataInfo(dataInfo)
d.setId(OafMapperUtils.createOpenaireId(50, s"uniprot_____::$pid", true))
d.setCollectedfrom(List(UNIPROT_COLLECTED_FROM).asJava)
val title: String = (json \ "title").extractOrElse[String](null)
if (title != null)
d.setTitle(List(OafMapperUtils.structuredProperty(title, ModelConstants.MAIN_TITLE_QUALIFIER, dataInfo)).asJava)
d.setOriginalId(List(pid).asJava)
val i = new Instance
i.setPid(d.getPid)
i.setUrl(List(s"https://www.uniprot.org/uniprot/$pid").asJava)
i.setInstancetype(OafMapperUtils.qualifier("0046", "Bioentity", ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE))
i.setCollectedfrom(UNIPROT_COLLECTED_FROM)
d.setInstance(List(i).asJava)
val dates: List[UniprotDate] = for {
JObject(dateOBJ) <- json \ "dates"
JField("date", JString(date)) <- dateOBJ
JField("date_info", JString(date_info)) <- dateOBJ
} yield UniprotDate(date, date_info)
val subjects: List[String] = (json \\ "subjects").extractOrElse[List[String]](null)
if (subjects != null) {
d.setSubject(
subjects.map(s =>
OafMapperUtils.structuredProperty(s, SUBJ_CLASS, SUBJ_CLASS, ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES, null)
).asJava)
}
if (dates.nonEmpty) {
val i_date = dates.find(d => d.date_info.contains("entry version"))
if (i_date.isDefined) {
i.setDateofacceptance(OafMapperUtils.field(i_date.get.date, dataInfo))
d.setDateofacceptance(OafMapperUtils.field(i_date.get.date, dataInfo))
}
val relevant_dates: List[StructuredProperty] = dates.filter(d => !d.date_info.contains("entry version"))
.map(date => OafMapperUtils.structuredProperty(date.date, "UNKNOWN", "UNKNOWN", ModelConstants.DNET_DATACITE_DATE, ModelConstants.DNET_DATACITE_DATE, dataInfo))
if (relevant_dates != null && relevant_dates.nonEmpty)
d.setRelevantdate(relevant_dates.asJava)
d.setDateofacceptance(OafMapperUtils.field(i_date.get.date, dataInfo))
}
val references_pmid: List[String] = for {
JObject(reference) <- json \ "references"
JField("PubMed", JString(pid)) <- reference
} yield pid
val references_doi: List[String] = for {
JObject(reference) <- json \ "references"
JField(" DOI", JString(pid)) <- reference
} yield pid
if (references_pmid != null && references_pmid.nonEmpty) {
val rel = createRelation(references_pmid.head, "pmid", d.getId, UNIPROT_COLLECTED_FROM, "relationship", "isRelatedTo")
rel.getCollectedfrom
List(d, rel)
}
else if (references_doi != null && references_doi.nonEmpty) {
val rel = createRelation(references_doi.head, "doi", d.getId, UNIPROT_COLLECTED_FROM, "relationship", "isRelatedTo")
List(d, rel)
}
else
List(d)
}
def createRelation(pid: String, pidType: String, sourceId: String, collectedFrom: KeyValue, subRelType:String, relClass:String):Relation = {
val rel = new Relation
rel.setCollectedfrom(List(PDB_COLLECTED_FROM).asJava)
rel.setDataInfo(dataInfo)
rel.setRelType("resultResult")
rel.setSubRelType(subRelType)
rel.setRelClass(relClass)
rel.setSource(sourceId)
rel.setTarget(s"unresolved::$pid::$pidType")
rel.getTarget.startsWith("unresolved")
rel.setCollectedfrom(List(collectedFrom).asJava)
rel
}
def createSupplementaryRelation(pid: String, pidType: String, sourceId: String, collectedFrom: KeyValue): Relation = {
createRelation(pid,pidType,sourceId,collectedFrom, "supplement","IsSupplementTo")
}
def pdbTOOaf(input: String): List[Oaf] = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json = parse(input)
val pdb = (json \ "pdb").extract[String].toLowerCase
if (pdb.isEmpty)
return List()
val d = new Dataset
d.setPid(
List(
OafMapperUtils.structuredProperty(pdb, "pdb", "Protein Data Bank Identifier", ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, dataInfo)
).asJava
)
d.setCollectedfrom(List(PDB_COLLECTED_FROM).asJava)
d.setDataInfo(dataInfo)
d.setId(OafMapperUtils.createOpenaireId(50, s"pdb_________::$pdb", true))
d.setOriginalId(List(pdb).asJava)
val title = (json \ "title").extractOrElse[String](null)
if (title == null)
return List()
d.setTitle(List(OafMapperUtils.structuredProperty(title, ModelConstants.MAIN_TITLE_QUALIFIER, dataInfo)).asJava)
val authors: List[String] = (json \ "authors").extractOrElse[List[String]](null)
if (authors != null) {
val convertedAuthors = authors.zipWithIndex.map { a =>
val res = new Author
res.setFullname(a._1)
res.setRank(a._2 + 1)
res
}
d.setAuthor(convertedAuthors.asJava)
}
val i = new Instance
i.setPid(d.getPid)
i.setUrl(List(s"https://www.rcsb.org/structure/$pdb").asJava)
i.setInstancetype(OafMapperUtils.qualifier("0046", "Bioentity", ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE))
i.setCollectedfrom(PDB_COLLECTED_FROM)
d.setInstance(List(i).asJava)
val pmid = (json \ "pmid").extractOrElse[String](null)
if (pmid != null)
List(d, createSupplementaryRelation(pmid, "pmid", d.getId, PDB_COLLECTED_FROM))
else
List(d)
}
def extractEBILinksFromDump(input: String): EBILinkItem = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json = parse(input)
val pmid = (json \ "publication" \ "pmid").extract[String]
val links = (json \ "links").extract[JObject]
EBILinkItem(pmid.toLong, compact(render(links)))
}
def EBITargetLinksFilter(input: EBILinks): Boolean = {
input.targetPidType.equalsIgnoreCase("ena") || input.targetPidType.equalsIgnoreCase("pdb") || input.targetPidType.equalsIgnoreCase("uniprot")
}
def parse_ebi_links(input: String): List[EBILinks] = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json = parse(input)
val pmid = (json \ "request" \ "id").extract[String]
for {
JObject(link) <- json \\ "Link"
JField("Target", JObject(target)) <- link
JField("RelationshipType", JObject(relType)) <- link
JField("Name", JString(relation)) <- relType
JField("PublicationDate", JString(publicationDate)) <- link
JField("Title", JString(title)) <- target
JField("Identifier", JObject(identifier)) <- target
JField("IDScheme", JString(idScheme)) <- identifier
JField("IDURL", JString(idUrl)) <- identifier
JField("ID", JString(id)) <- identifier
} yield EBILinks(relation, publicationDate, title, pmid, id, idScheme, idUrl)
}
def convertEBILinksToOaf(input: EBILinks): List[Oaf] = {
val d = new Dataset
d.setCollectedfrom(List(EBI_COLLECTED_FROM).asJava)
d.setDataInfo(dataInfo)
d.setTitle(List(OafMapperUtils.structuredProperty(input.title, ModelConstants.MAIN_TITLE_QUALIFIER, dataInfo)).asJava)
val nsPrefix = input.targetPidType.toLowerCase.padTo(12, '_')
d.setId(OafMapperUtils.createOpenaireId(50, s"$nsPrefix::${input.targetPid.toLowerCase}", true))
d.setOriginalId(List(input.targetPid.toLowerCase).asJava)
d.setPid(
List(
OafMapperUtils.structuredProperty(input.targetPid.toLowerCase, input.targetPidType.toLowerCase, "Protein Data Bank Identifier", ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, dataInfo)
).asJava
)
val i = new Instance
i.setPid(d.getPid)
i.setUrl(List(input.targetUrl).asJava)
i.setInstancetype(OafMapperUtils.qualifier("0046", "Bioentity", ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE))
i.setCollectedfrom(EBI_COLLECTED_FROM)
d.setInstance(List(i).asJava)
i.setDateofacceptance(OafMapperUtils.field(input.date, dataInfo))
d.setDateofacceptance(OafMapperUtils.field(input.date, dataInfo))
List(d, createRelation(input.pmid, "pmid", d.getId, EBI_COLLECTED_FROM,"relationship", "isRelatedTo"))
}
}

View File

@ -1,84 +0,0 @@
package eu.dnetlib.dhp.sx.bio
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, Instance, KeyValue, Oaf, Relation}
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import scala.collection.JavaConverters._
object PDBToOAF {
val dataInfo: DataInfo = OafMapperUtils.dataInfo(false, null, false, false, ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER, "0.9")
val collectedFrom: KeyValue = OafMapperUtils.keyValue("10|opendoar____::d1c373ab1570cfb9a7dbb53c186b37a2", "Protein Data Bank")
def convert(input:String):List[Oaf]= {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json = parse(input)
val pdb = (json \ "pdb").extract[String].toLowerCase
if (pdb.isEmpty)
return List()
val d = new Dataset
d.setPid(
List(
OafMapperUtils.structuredProperty(pdb, "pdb", "pdb", ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES,dataInfo)
).asJava
)
d.setCollectedfrom(List(collectedFrom).asJava)
d.setDataInfo(dataInfo)
d.setId(OafMapperUtils.createOpenaireId(50,s"pdb_________::$pdb", true))
d.setOriginalId(List(pdb).asJava)
val title = (json \ "title").extractOrElse[String](null)
if (title== null)
return List()
d.setTitle(List(OafMapperUtils.structuredProperty(title, ModelConstants.MAIN_TITLE_QUALIFIER, dataInfo)).asJava)
val authors:List[String] = (json \ "authors").extractOrElse[List[String]](null)
if (authors!= null)
{
val convertedAuthors = authors.zipWithIndex.map{a =>
val res = new Author
res.setFullname(a._1)
res.setRank(a._2+1)
res
}
d.setAuthor(convertedAuthors.asJava)
}
val i = new Instance
i.setPid(d.getPid)
i.setUrl(List(s"https://www.rcsb.org/structure/$pdb").asJava)
i.setInstancetype(OafMapperUtils.qualifier("0046", "Bioentity", ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE))
i.setCollectedfrom(collectedFrom)
d.setInstance(List(i).asJava)
val pmid = (json \ "pmid").extractOrElse[String](null)
if (pmid != null) {
val rel = new Relation
rel.setCollectedfrom(List(collectedFrom).asJava)
rel.setDataInfo(dataInfo)
rel.setRelType("resultResult")
rel.setSubRelType("supplement")
rel.setRelClass("IsSupplementTo")
rel.setSource(d.getId)
rel.setTarget(s"unresolved::$pmid::pmid")
return List(d,rel)
}
List(d)
}
}

View File

@ -0,0 +1,44 @@
package eu.dnetlib.dhp.sx.bio
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
object SparkTransformBioDatabaseToOAF {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val log: Logger = LoggerFactory.getLogger(getClass)
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/bio/bio_to_oaf_params.json")))
parser.parseArgument(args)
val database: String = parser.get("database")
log.info("database: {}", database)
val dbPath: String = parser.get("dbPath")
log.info("dbPath: {}", database)
val targetPath: String = parser.get("targetPath")
log.info("targetPath: {}", database)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sc = spark.sparkContext
implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
database.toUpperCase() match {
case "UNIPROT" =>
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).write.mode(SaveMode.Overwrite).save(targetPath)
case "PDB"=>
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath)
}
}
}

View File

@ -165,13 +165,13 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json")))
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkEBILinksToOaf.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(SparkCreateEBIDataFrame.getClass.getSimpleName)
.appName(SparkEBILinksToOaf.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()

View File

@ -41,7 +41,7 @@ object SparkCreateBaselineDataFrame {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val log: Logger = LoggerFactory.getLogger(getClass)
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json")))
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkEBILinksToOaf.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/baseline_to_oaf_params.json")))
parser.parseArgument(args)
val isLookupUrl: String = parser.get("isLookupUrl")
log.info("isLookupUrl: {}", isLookupUrl)
@ -52,7 +52,7 @@ object SparkCreateBaselineDataFrame {
SparkSession
.builder()
.config(conf)
.appName(SparkCreateEBIDataFrame.getClass.getSimpleName)
.appName(SparkEBILinksToOaf.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
import spark.implicits._

View File

@ -1,86 +0,0 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
import eu.dnetlib.scholexplorer.relation.RelationMapper
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
object SparkCreateEBIDataFrame {
def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(SparkCreateEBIDataFrame.getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(SparkCreateEBIDataFrame.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sc = spark.sparkContext
val workingPath = parser.get("workingPath")
val relationMapper = RelationMapper.load
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
implicit val datasetEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
// logger.info("Extract Publication and relation from publication_xml")
// val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
// {
// new ObjectMapper().readValue(s, classOf[String])
// }).flatMap(s => {
// val d = new PublicationScholexplorerParser
// d.parseObject(s, relationMapper).asScala.iterator})
//
// val mapper = new ObjectMapper()
// mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
// spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf")
//
// logger.info("Extract Publication and relation from dataset_xml")
// val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s =>
// {
// new ObjectMapper().readValue(s, classOf[String])
// }).flatMap(s => {
// val d = new DatasetScholexplorerParser
// d.parseObject(s, relationMapper).asScala.iterator})
// spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")
val dataset: Dataset[DLIDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIDataset]).map(d => d.asInstanceOf[DLIDataset])
val publication: Dataset[DLIPublication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIPublication]).map(d => d.asInstanceOf[DLIPublication])
val relations: Dataset[Relation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Relation]).map(d => d.asInstanceOf[Relation])
publication.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/publication")
dataset.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datasetEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset")
relations.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getRelationAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/relation")
}
}

View File

@ -0,0 +1,40 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.sx.bio.BioDBToOAF
import eu.dnetlib.dhp.sx.bio.BioDBToOAF.EBILinkItem
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
object SparkEBILinksToOaf {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(SparkEBILinksToOaf.getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkEBILinksToOaf.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(SparkEBILinksToOaf.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
import spark.implicits._
implicit val PMEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
val ebLinks:Dataset[EBILinkItem] = spark.read.load(sourcePath).as[EBILinkItem].filter(l => l.links!= null)
ebLinks.flatMap(j =>BioDBToOAF.parse_ebi_links(j.links))
.repartition(4000)
.filter(p => BioDBToOAF.EBITargetLinksFilter(p))
.flatMap(p => BioDBToOAF.convertEBILinksToOaf(p))
.write.mode(SaveMode.Overwrite).save(targetPath)
}
}

View File

@ -1,7 +1,7 @@
package eu.dnetlib.sx.pangaea
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame
import eu.dnetlib.dhp.sx.ebi.SparkEBILinksToOaf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}

View File

@ -0,0 +1,6 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"db", "paramLongName":"database", "paramDescription": "should be PDB or UNIPROT", "paramRequired": true},
{"paramName":"p", "paramLongName":"dbPath", "paramDescription": "the path of the database to transform", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the OAF target path ", "paramRequired": true}
]

View File

@ -0,0 +1,19 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -0,0 +1,111 @@
<workflow-app name="Transform_Pubmed_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>PDBPath</name>
<description>the PDB Database Working Path</description>
</property>
<property>
<name>UNIPROTDBPath</name>
<description>the UNIPROT Database Working Path</description>
</property>
<property>
<name>EBIDataset</name>
<description>the EBI Links Dataset Path</description>
</property>
<property>
<name>targetPath</name>
<description>the Target Working dir path</description>
</property>
</parameters>
<start to="ConvertPDB"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ConvertPDB">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Convert PDB to OAF Dataset</name>
<class>eu.dnetlib.dhp.sx.bio.SparkTransformBioDatabaseToOAF</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=2000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--dbPath</arg><arg>${PDBPath}</arg>
<arg>--database</arg><arg>PDB</arg>
<arg>--targetPath</arg><arg>${targetPath}/pdb_OAF</arg>
</spark>
<ok to="ConvertUNIPROT"/>
<error to="Kill"/>
</action>
<action name="ConvertUNIPROT">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Convert UNIPROT to OAF Dataset</name>
<class>eu.dnetlib.dhp.sx.bio.SparkTransformBioDatabaseToOAF</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=2000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--dbPath</arg><arg>${UNIPROTDBPath}</arg>
<arg>--database</arg><arg>UNIPROT</arg>
<arg>--targetPath</arg><arg>${targetPath}/uniprot_OAF</arg>
</spark>
<ok to="ConvertEBILinks"/>
<error to="Kill"/>
</action>
<action name="ConvertEBILinks">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Convert EBI Links to OAF Dataset</name>
<class>eu.dnetlib.dhp.sx.ebi.SparkEBILinksToOaf</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=2000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--sourcePath</arg><arg>${EBIDataset}</arg>
<arg>--targetPath</arg><arg>${targetPath}/ebi_OAF</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,5 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"i", "paramLongName":"isLookupUrl","paramDescription": "isLookupUrl", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath","paramDescription": "the path of the sequencial file to read", "paramRequired": true}
]

View File

@ -1,5 +1,5 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"i", "paramLongName":"isLookupUrl","paramDescription": "isLookupUrl", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath","paramDescription": "the path of the sequencial file to read", "paramRequired": true}
{"paramName":"s", "paramLongName":"sourcePath","paramDescription": "the source Path", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath","paramDescription": "the oaf path ", "paramRequired": true}
]

View File

@ -54,7 +54,7 @@
<mode>cluster</mode>
<name>Create EBI DataSet</name>
<class>eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame</class>
<class>eu.dnetlib.dhp.sx.ebi.SparkEBILinksToOaf</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}

View File

@ -2,16 +2,21 @@ package eu.dnetlib.dhp.sx.bio.pubmed
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
import eu.dnetlib.dhp.sx.bio.PDBToOAF
import eu.dnetlib.dhp.sx.bio.BioDBToOAF
import eu.dnetlib.dhp.sx.ebi.SparkEBILinksToOaf
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.junit.jupiter.MockitoExtension
import java.io.{BufferedReader, FileInputStream, InputStream, InputStreamReader}
import java.util.zip.GZIPInputStream
import scala.collection.JavaConverters._
import scala.io.Source
import scala.xml.pull.XMLEventReader
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.parse
@ExtendWith(Array(classOf[MockitoExtension]))
class BioScholixTest extends AbstractVocabularyTest{
@ -23,6 +28,21 @@ class BioScholixTest extends AbstractVocabularyTest{
super.setUpVocabulary()
}
class BufferedReaderIterator(reader: BufferedReader) extends Iterator[String] {
override def hasNext() = reader.ready
override def next() = reader.readLine()
}
object GzFileIterator {
def apply(is: InputStream, encoding: String) = {
new BufferedReaderIterator(
new BufferedReader(
new InputStreamReader(
new GZIPInputStream(
is), encoding)))
}
}
@Test
def testEBIData() = {
@ -60,7 +80,7 @@ class BioScholixTest extends AbstractVocabularyTest{
val records:String =Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/bio/pdb_dump")).mkString
records.lines.foreach(s => assertTrue(s.nonEmpty))
val result:List[Oaf]= records.lines.toList.flatMap(o => PDBToOAF.convert(o))
val result:List[Oaf]= records.lines.toList.flatMap(o => BioDBToOAF.pdbTOOaf(o))
@ -72,4 +92,62 @@ class BioScholixTest extends AbstractVocabularyTest{
}
@Test
def testUNIprotToOAF():Unit = {
assertNotNull(vocabularies)
assertTrue(vocabularies.vocabularyExists("dnet:publication_resource"))
val mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false)
val records:String =Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/bio/uniprot_dump")).mkString
records.lines.foreach(s => assertTrue(s.nonEmpty))
val result:List[Oaf]= records.lines.toList.flatMap(o => BioDBToOAF.uniprotToOAF(o))
assertTrue(result.nonEmpty)
result.foreach(r => assertNotNull(r))
println(result.count(o => o.isInstanceOf[Relation]))
println(mapper.writeValueAsString(result.head))
}
case class EBILinks(relType:String, date:String, title:String, pmid:String, targetPid:String, targetPidType:String) {}
def parse_ebi_links(input:String):List[EBILinks] ={
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json = parse(input)
val pmid = (json \ "publication" \"pmid").extract[String]
for {
JObject(link) <- json \\ "Link"
JField("Target",JObject(target)) <- link
JField("RelationshipType",JObject(relType)) <- link
JField("Name", JString(relation)) <- relType
JField("PublicationDate",JString(publicationDate)) <- link
JField("Title", JString(title)) <- target
JField("Identifier",JObject(identifier)) <- target
JField("IDScheme", JString(idScheme)) <- identifier
JField("ID", JString(id)) <- identifier
} yield EBILinks(relation, publicationDate, title, pmid, id, idScheme)
}
@Test
def testEBILinksToOAF():Unit = {
val iterator = GzFileIterator(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/bio/ebi_links.gz"), "UTF-8")
val data = iterator.next()
val mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false)
val res = BioDBToOAF.parse_ebi_links(BioDBToOAF.extractEBILinksFromDump(data).links).filter(BioDBToOAF.EBITargetLinksFilter).flatMap(BioDBToOAF.convertEBILinksToOaf)
print(res.length)
}
}

View File

@ -0,0 +1,6 @@
{"pid": "Q6GZX4", "dates": [{"date": "28-JUN-2011", "date_info": " integrated into UniProtKB/Swiss-Prot."}, {"date": "19-JUL-2004", "date_info": " sequence version 1."}, {"date": "12-AUG-2020", "date_info": " entry version 41."}], "title": "Putative transcription factor 001R;", "organism_species": "Frog virus 3 (isolate Goorha) (FV-3).", "subjects": ["Viruses", "Varidnaviria", "Bamfordvirae", "Nucleocytoviricota", "Megaviricetes", "Pimascovirales", "Iridoviridae", "Alphairidovirinae", "Ranavirus."], "references": [{"PubMed": "15165820"}, {" DOI": "10.1016/j.virol.2004.02.019"}]}
{"pid": "Q6GZX3", "dates": [{"date": "28-JUN-2011", "date_info": " integrated into UniProtKB/Swiss-Prot."}, {"date": "19-JUL-2004", "date_info": " sequence version 1."}, {"date": "12-AUG-2020", "date_info": " entry version 42."}], "title": "Uncharacterized protein 002L;", "organism_species": "Frog virus 3 (isolate Goorha) (FV-3).", "subjects": ["Viruses", "Varidnaviria", "Bamfordvirae", "Nucleocytoviricota", "Megaviricetes", "Pimascovirales", "Iridoviridae", "Alphairidovirinae", "Ranavirus."], "references": [{"PubMed": "15165820"}, {" DOI": "10.1016/j.virol.2004.02.019"}]}
{"pid": "Q197F8", "dates": [{"date": "16-JUN-2009", "date_info": " integrated into UniProtKB/Swiss-Prot."}, {"date": "11-JUL-2006", "date_info": " sequence version 1."}, {"date": "12-AUG-2020", "date_info": " entry version 27."}], "title": "Uncharacterized protein 002R;", "organism_species": "Invertebrate iridescent virus 3 (IIV-3) (Mosquito iridescent virus).", "subjects": ["Viruses", "Varidnaviria", "Bamfordvirae", "Nucleocytoviricota", "Megaviricetes", "Pimascovirales", "Iridoviridae", "Betairidovirinae", "Chloriridovirus."], "references": [{"PubMed": "16912294"}, {" DOI": "10.1128/jvi.00464-06"}]}
{"pid": "Q197F7", "dates": [{"date": "16-JUN-2009", "date_info": " integrated into UniProtKB/Swiss-Prot."}, {"date": "11-JUL-2006", "date_info": " sequence version 1."}, {"date": "12-AUG-2020", "date_info": " entry version 23."}], "title": "Uncharacterized protein 003L;", "organism_species": "Invertebrate iridescent virus 3 (IIV-3) (Mosquito iridescent virus).", "subjects": ["Viruses", "Varidnaviria", "Bamfordvirae", "Nucleocytoviricota", "Megaviricetes", "Pimascovirales", "Iridoviridae", "Betairidovirinae", "Chloriridovirus."], "references": [{"PubMed": "16912294"}, {" DOI": "10.1128/jvi.00464-06"}]}
{"pid": "Q6GZX2", "dates": [{"date": "28-JUN-2011", "date_info": " integrated into UniProtKB/Swiss-Prot."}, {"date": "19-JUL-2004", "date_info": " sequence version 1."}, {"date": "12-AUG-2020", "date_info": " entry version 36."}], "title": "Uncharacterized protein 3R;", "organism_species": "Frog virus 3 (isolate Goorha) (FV-3).", "subjects": ["Viruses", "Varidnaviria", "Bamfordvirae", "Nucleocytoviricota", "Megaviricetes", "Pimascovirales", "Iridoviridae", "Alphairidovirinae", "Ranavirus."], "references": [{"PubMed": "15165820"}, {" DOI": "10.1016/j.virol.2004.02.019"}]}
{"pid": "Q6GZX1", "dates": [{"date": "28-JUN-2011", "date_info": " integrated into UniProtKB/Swiss-Prot."}, {"date": "19-JUL-2004", "date_info": " sequence version 1."}, {"date": "12-AUG-2020", "date_info": " entry version 34."}], "title": "Uncharacterized protein 004R;", "organism_species": "Frog virus 3 (isolate Goorha) (FV-3).", "subjects": ["Viruses", "Varidnaviria", "Bamfordvirae", "Nucleocytoviricota", "Megaviricetes", "Pimascovirales", "Iridoviridae", "Alphairidovirinae", "Ranavirus."], "references": [{"PubMed": "15165820"}, {" DOI": "10.1016/j.virol.2004.02.019"}]}