forked from D-Net/dnet-hadoop
code refactor, see ticket #7065
This commit is contained in:
parent 8f99d2af86
commit 5606014b17
@@ -1,14 +1,12 @@
-package eu.dnetlib.dhp.sx.graph.bio
+package eu.dnetllib.dhp.sx.bio
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, OafMapperUtils}
-import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, Instance, KeyValue, Oaf, Relation, StructuredProperty}
+import eu.dnetlib.dhp.schema.oaf._
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import scala.collection.JavaConverters._
import collection.JavaConverters._
object BioDBToOAF {
case class EBILinkItem(id: Long, links: String) {}
@@ -17,23 +15,23 @@ object BioDBToOAF {
case class UniprotDate(date: String, date_info: String) {}
-case class ScholixResolved(pid:String, pidType:String, typology:String, tilte:List[String], datasource:List[String], date:List[String], authors:List[String]){}
+case class ScholixResolved(pid: String, pidType: String, typology: String, tilte: List[String], datasource: List[String], date: List[String], authors: List[String]) {}
val DATA_INFO: DataInfo = OafMapperUtils.dataInfo(false, null, false, false, ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER, "0.9")
val SUBJ_CLASS = "Keywords"
val DATE_RELATION_KEY = "RelationDate"
-val resolvedURL:Map[String,String] = Map(
-"genbank"-> "https://www.ncbi.nlm.nih.gov/nuccore/",
+val resolvedURL: Map[String, String] = Map(
+"genbank" -> "https://www.ncbi.nlm.nih.gov/nuccore/",
"ncbi-n" -> "https://www.ncbi.nlm.nih.gov/nuccore/",
"ncbi-wgs" -> "https://www.ncbi.nlm.nih.gov/nuccore/",
"ncbi-p" -> "https://www.ncbi.nlm.nih.gov/protein/",
"ena" -> "https://www.ebi.ac.uk/ena/browser/view/",
-"clinicaltrials.gov"-> "https://clinicaltrials.gov/ct2/show/",
-"onim"-> "https://omim.org/entry/",
-"refseq"-> "https://www.ncbi.nlm.nih.gov/nuccore/",
-"geo"-> "https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi?acc="
+"clinicaltrials.gov" -> "https://clinicaltrials.gov/ct2/show/",
+"onim" -> "https://omim.org/entry/",
+"refseq" -> "https://www.ncbi.nlm.nih.gov/nuccore/",
+"geo" -> "https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi?acc="
)
@@ -45,7 +43,7 @@ object BioDBToOAF {
val ElsevierCollectedFrom: KeyValue = OafMapperUtils.keyValue("10|openaire____::8f87e10869299a5fe80b315695296b88", "Elsevier")
val springerNatureCollectedFrom: KeyValue = OafMapperUtils.keyValue("10|openaire____::6e380d9cf51138baec8480f5a0ce3a2e", "Springer Nature")
val EBICollectedFrom: KeyValue = OafMapperUtils.keyValue("10|opendoar____::83e60e09c222f206c725385f53d7e567c", "EMBL-EBIs Protein Data Bank in Europe (PDBe)")
-val pubmedCollectedFrom:KeyValue = OafMapperUtils.keyValue(ModelConstants.EUROPE_PUBMED_CENTRAL_ID, "Europe PubMed Central")
+val pubmedCollectedFrom: KeyValue = OafMapperUtils.keyValue(ModelConstants.EUROPE_PUBMED_CENTRAL_ID, "Europe PubMed Central")
UNIPROTCollectedFrom.setDataInfo(DATA_INFO)
PDBCollectedFrom.setDataInfo(DATA_INFO)
@@ -58,9 +56,9 @@ object BioDBToOAF {
Map(
"uniprot" -> UNIPROTCollectedFrom,
-"pdb"-> PDBCollectedFrom,
-"elsevier" ->ElsevierCollectedFrom,
-"ebi" ->EBICollectedFrom,
+"pdb" -> PDBCollectedFrom,
+"elsevier" -> ElsevierCollectedFrom,
+"ebi" -> EBICollectedFrom,
"Springer Nature" -> springerNatureCollectedFrom,
"NCBI Nucleotide" -> ncbiCollectedFrom,
"European Nucleotide Archive" -> enaCollectedFrom,
@@ -68,7 +66,7 @@ object BioDBToOAF {
)
}
-def crossrefLinksToOaf(input:String):Oaf = {
+def crossrefLinksToOaf(input: String): Oaf = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json = parse(input)
val source_pid = (json \ "Source" \ "Identifier" \ "ID").extract[String].toLowerCase
@@ -77,16 +75,16 @@ object BioDBToOAF {
val target_pid = (json \ "Target" \ "Identifier" \ "ID").extract[String].toLowerCase
val target_pid_type = (json \ "Target" \ "Identifier" \ "IDScheme").extract[String].toLowerCase
-val relation_semantic= (json \ "RelationshipType" \ "Name").extract[String]
+val relation_semantic = (json \ "RelationshipType" \ "Name").extract[String]
val date = GraphCleaningFunctions.cleanDate((json \ "LinkedPublicationDate").extract[String])
-createRelation(target_pid, target_pid_type, generate_unresolved_id(source_pid, source_pid_type),collectedFromMap("elsevier"),"relationship", relation_semantic, date)
+createRelation(target_pid, target_pid_type, generate_unresolved_id(source_pid, source_pid_type), collectedFromMap("elsevier"), "relationship", relation_semantic, date)
}
-def scholixResolvedToOAF(input:ScholixResolved):Oaf = {
+def scholixResolvedToOAF(input: ScholixResolved): Oaf = {
val d = new Dataset
@@ -127,14 +125,14 @@ object BioDBToOAF {
d.setInstance(List(i).asJava)
if (input.authors != null && input.authors.nonEmpty) {
-val authors = input.authors.map(a =>{
+val authors = input.authors.map(a => {
val authorOAF = new Author
authorOAF.setFullname(a)
authorOAF
})
d.setAuthor(authors.asJava)
}
-if (input.date!= null && input.date.nonEmpty) {
+if (input.date != null && input.date.nonEmpty) {
val dt = input.date.head
i.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(dt), DATA_INFO))
d.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(dt), DATA_INFO))
@@ -190,7 +188,7 @@ object BioDBToOAF {
OafMapperUtils.structuredProperty(s, SUBJ_CLASS, SUBJ_CLASS, ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES, null)
).asJava)
}
-var i_date:Option[UniprotDate] = None
+var i_date: Option[UniprotDate] = None
if (dates.nonEmpty) {
i_date = dates.find(d => d.date_info.contains("entry version"))
@@ -231,13 +229,12 @@ object BioDBToOAF {
}
-def generate_unresolved_id(pid:String, pidType:String) :String = {
+def generate_unresolved_id(pid: String, pidType: String): String = {
s"unresolved::$pid::$pidType"
}
-def createRelation(pid: String, pidType: String, sourceId: String, collectedFrom: KeyValue, subRelType:String, relClass:String, date:String):Relation = {
+def createRelation(pid: String, pidType: String, sourceId: String, collectedFrom: KeyValue, subRelType: String, relClass: String, date: String): Relation = {
val rel = new Relation
rel.setCollectedfrom(List(collectedFromMap("pdb")).asJava)
@@ -251,7 +248,7 @@ object BioDBToOAF {
rel.setTarget(s"unresolved::$pid::$pidType")
-val dateProps:KeyValue = OafMapperUtils.keyValue(DATE_RELATION_KEY, date)
+val dateProps: KeyValue = OafMapperUtils.keyValue(DATE_RELATION_KEY, date)
rel.setProperties(List(dateProps).asJava)
@@ -262,8 +259,8 @@ object BioDBToOAF {
}
-def createSupplementaryRelation(pid: String, pidType: String, sourceId: String, collectedFrom: KeyValue, date:String): Relation = {
-createRelation(pid,pidType,sourceId,collectedFrom, ModelConstants.SUPPLEMENT, ModelConstants.IS_SUPPLEMENT_TO, date)
+def createSupplementaryRelation(pid: String, pidType: String, sourceId: String, collectedFrom: KeyValue, date: String): Relation = {
+createRelation(pid, pidType, sourceId, collectedFrom, ModelConstants.SUPPLEMENT, ModelConstants.IS_SUPPLEMENT_TO, date)
}

@@ -1,8 +1,8 @@
-package eu.dnetlib.dhp.sx.graph.bio
+package eu.dnetllib.dhp.sx.bio
import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
-import BioDBToOAF.ScholixResolved
+import eu.dnetlib.dhp.schema.oaf.Oaf
+import eu.dnetllib.dhp.sx.bio.BioDBToOAF.ScholixResolved
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
@@ -33,15 +33,14 @@ object SparkTransformBioDatabaseToOAF {
implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
import spark.implicits._
database.toUpperCase() match {
case "UNIPROT" =>
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).write.mode(SaveMode.Overwrite).save(targetPath)
-case "PDB"=>
+case "PDB" =>
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath)
case "SCHOLIX" =>
spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).write.mode(SaveMode.Overwrite).save(targetPath)
-case "CROSSREF_LINKS"=>
+case "CROSSREF_LINKS" =>
spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath)
}
}

@@ -1,10 +1,10 @@
-package eu.dnetlib.dhp.sx.graph.ebi
+package eu.dnetllib.dhp.sx.bio.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.oaf.Result
-import eu.dnetlib.dhp.sx.graph.bio.pubmed.{PMArticle, PMAuthor, PMJournal, PMParser, PubMedToOaf}
import eu.dnetlib.dhp.utils.ISLookupClientFactory
+import eu.dnetllib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal, PMParser, PubMedToOaf}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
@@ -24,24 +24,24 @@ import scala.xml.pull.XMLEventReader
object SparkCreateBaselineDataFrame {
-def requestBaseLineUpdatePage(maxFile:String):List[(String,String)] = {
-val data =requestPage("https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/")
+def requestBaseLineUpdatePage(maxFile: String): List[(String, String)] = {
+val data = requestPage("https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/")
-val result =data.lines.filter(l => l.startsWith("<a href=")).map{l =>
+val result = data.lines.filter(l => l.startsWith("<a href=")).map { l =>
val end = l.lastIndexOf("\">")
val start = l.indexOf("<a href=\"")
-if (start>= 0 && end >start)
-l.substring(start+9, (end-start))
+if (start >= 0 && end > start)
+l.substring(start + 9, (end - start))
else
""
-}.filter(s =>s.endsWith(".gz") ).filter(s => s > maxFile).map(s => (s,s"https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/$s")).toList
+}.filter(s => s.endsWith(".gz")).filter(s => s > maxFile).map(s => (s, s"https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/$s")).toList
result
}
-def downloadBaselinePart(url:String):InputStream = {
+def downloadBaselinePart(url: String): InputStream = {
val r = new HttpGet(url)
val timeout = 60; // seconds
val config = RequestConfig.custom()
@@ -55,7 +55,7 @@ object SparkCreateBaselineDataFrame {
}
-def requestPage(url:String):String = {
+def requestPage(url: String): String = {
val r = new HttpGet(url)
val timeout = 60; // seconds
val config = RequestConfig.custom()
@@ -90,25 +90,21 @@ object SparkCreateBaselineDataFrame {
}
-def downloadBaseLineUpdate(baselinePath:String, hdfsServerUri:String ):Unit = {
+def downloadBaseLineUpdate(baselinePath: String, hdfsServerUri: String): Unit = {
val conf = new Configuration
conf.set("fs.defaultFS", hdfsServerUri)
val fs = FileSystem.get(conf)
val p = new Path(baselinePath)
-val files = fs.listFiles(p,false)
+val files = fs.listFiles(p, false)
var max_file = ""
while (files.hasNext) {
val c = files.next()
val data = c.getPath.toString
-val fileName = data.substring(data.lastIndexOf("/")+1)
+val fileName = data.substring(data.lastIndexOf("/") + 1)
-if (fileName> max_file)
+if (fileName > max_file)
max_file = fileName
}
@@ -119,7 +115,7 @@ object SparkCreateBaselineDataFrame {
val fsDataOutputStream: FSDataOutputStream = fs.create(hdfsWritePath, true)
val i = downloadBaselinePart(u._2)
val buffer = Array.fill[Byte](1024)(0)
-while(i.read(buffer)>0) {
+while (i.read(buffer) > 0) {
fsDataOutputStream.write(buffer)
}
i.close()
@@ -134,11 +130,11 @@ object SparkCreateBaselineDataFrame {
override def zero: PMArticle = new PMArticle
override def reduce(b: PMArticle, a: (String, PMArticle)): PMArticle = {
-if (b != null && b.getPmid!= null) b else a._2
+if (b != null && b.getPmid != null) b else a._2
}
override def merge(b1: PMArticle, b2: PMArticle): PMArticle = {
-if (b1 != null && b1.getPmid!= null) b1 else b2
+if (b1 != null && b1.getPmid != null) b1 else b2
}
@@ -153,7 +149,7 @@ object SparkCreateBaselineDataFrame {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val log: Logger = LoggerFactory.getLogger(getClass)
-val parser = new ArgumentApplicationParser(IOUtils.toString(SparkEBILinksToOaf.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/ebi/baseline_to_oaf_params.json")))
+val parser = new ArgumentApplicationParser(IOUtils.toString(SparkEBILinksToOaf.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json")))
parser.parseArgument(args)
val isLookupUrl: String = parser.get("isLookupUrl")
log.info("isLookupUrl: {}", isLookupUrl)
@@ -175,9 +171,9 @@ object SparkCreateBaselineDataFrame {
.config(conf)
.appName(SparkEBILinksToOaf.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
-import spark.implicits._
val sc = spark.sparkContext
+import spark.implicits._
implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
@@ -186,21 +182,21 @@ object SparkCreateBaselineDataFrame {
downloadBaseLineUpdate(s"$workingPath/baseline", hdfsServerUri)
-val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline",2000)
-val ds:Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i =>{
+val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline_ftp", 2000)
+val ds: Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i => {
val xml = new XMLEventReader(Source.fromBytes(i._2.getBytes()))
new PMParser(xml)
-} ))
+}))
-ds.map(p => (p.getPmid,p))(Encoders.tuple(Encoders.STRING, PMEncoder)).groupByKey(_._1)
+ds.map(p => (p.getPmid, p))(Encoders.tuple(Encoders.STRING, PMEncoder)).groupByKey(_._1)
.agg(pmArticleAggregator.toColumn)
.map(p => p._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset")
val exported_dataset = spark.read.load(s"$workingPath/baseline_dataset").as[PMArticle]
exported_dataset
.map(a => PubMedToOaf.convert(a, vocabularies)).as[Result]
-.filter(p => p!= null)
+.filter(p => p != null)
.write.mode(SaveMode.Overwrite).save(targetPath)
}
}

@@ -1,8 +1,8 @@
-package eu.dnetlib.dhp.sx.graph.ebi
+package eu.dnetllib.dhp.sx.bio.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF.EBILinkItem
-import eu.dnetlib.dhp.sx.graph.bio.pubmed.{PMArticle, PMAuthor, PMJournal}
+import eu.dnetllib.dhp.sx.bio.BioDBToOAF.EBILinkItem
+import eu.dnetllib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal}
import org.apache.commons.io.IOUtils
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.HttpGet
@@ -14,15 +14,15 @@ import org.slf4j.{Logger, LoggerFactory}
object SparkDownloadEBILinks {
-def createEBILinks(pmid:Long):EBILinkItem = {
+def createEBILinks(pmid: Long): EBILinkItem = {
val res = requestLinks(pmid)
-if (res!=null)
+if (res != null)
return EBILinkItem(pmid, res)
null
}
-def requestPage(url:String):String = {
+def requestPage(url: String): String = {
val r = new HttpGet(url)
val timeout = 60; // seconds
val config = RequestConfig.custom()
@@ -56,10 +56,11 @@ object SparkDownloadEBILinks {
}
}
-def requestLinks(PMID:Long):String = {
+def requestLinks(PMID: Long): String = {
requestPage(s"https://www.ebi.ac.uk/europepmc/webservices/rest/MED/$PMID/datalinks?format=json")
}
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
@@ -86,26 +87,26 @@ object SparkDownloadEBILinks {
log.info(s"workingPath -> $workingPath")
log.info("Getting max pubmedId where the links have been requested")
-val links:Dataset[EBILinkItem] = spark.read.load(s"$sourcePath/ebi_links_dataset").as[EBILinkItem]
-val lastPMIDRequested =links.map(l => l.id).select(max("value")).first.getLong(0)
+val links: Dataset[EBILinkItem] = spark.read.load(s"$sourcePath/ebi_links_dataset").as[EBILinkItem]
+val lastPMIDRequested = links.map(l => l.id).select(max("value")).first.getLong(0)
log.info("Retrieving PMID to request links")
val pubmed = spark.read.load(s"$sourcePath/baseline_dataset").as[PMArticle]
pubmed.map(p => p.getPmid.toLong).where(s"value > $lastPMIDRequested").write.mode(SaveMode.Overwrite).save(s"$workingPath/id_to_request")
-val pmidToReq:Dataset[Long] = spark.read.load(s"$workingPath/id_to_request").as[Long]
+val pmidToReq: Dataset[Long] = spark.read.load(s"$workingPath/id_to_request").as[Long]
val total = pmidToReq.count()
-spark.createDataset(pmidToReq.rdd.repartition((total/MAX_ITEM_PER_PARTITION).toInt).map(pmid =>createEBILinks(pmid)).filter(l => l!= null)).write.mode(SaveMode.Overwrite).save(s"$workingPath/links_update")
+spark.createDataset(pmidToReq.rdd.repartition((total / MAX_ITEM_PER_PARTITION).toInt).map(pmid => createEBILinks(pmid)).filter(l => l != null)).write.mode(SaveMode.Overwrite).save(s"$workingPath/links_update")
-val updates:Dataset[EBILinkItem] =spark.read.load(s"$workingPath/links_update").as[EBILinkItem]
+val updates: Dataset[EBILinkItem] = spark.read.load(s"$workingPath/links_update").as[EBILinkItem]
links.union(updates).groupByKey(_.id)
-.reduceGroups{(x,y) =>
-if (x == null || x.links ==null)
+.reduceGroups { (x, y) =>
+if (x == null || x.links == null)
y
-if (y ==null || y.links ==null)
+if (y == null || y.links == null)
x
if (x.links.length > y.links.length)
x

@@ -1,15 +1,14 @@
-package eu.dnetlib.dhp.sx.graph.ebi
+package eu.dnetllib.dhp.sx.bio.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Oaf
-import eu.dnetlib.dhp.sx.graph.bio
-import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF
-import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF.EBILinkItem
+import eu.dnetllib.dhp.sx.bio.BioDBToOAF
+import eu.dnetllib.dhp.sx.bio.BioDBToOAF.EBILinkItem
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
object SparkEBILinksToOaf {
def main(args: Array[String]): Unit = {
@@ -24,17 +23,17 @@ object SparkEBILinksToOaf {
.appName(SparkEBILinksToOaf.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
-import spark.implicits._
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
+import spark.implicits._
implicit val PMEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
-val ebLinks:Dataset[EBILinkItem] = spark.read.load(s"${sourcePath}_dataset").as[EBILinkItem].filter(l => l.links!= null)
+val ebLinks: Dataset[EBILinkItem] = spark.read.load(s"${sourcePath}_dataset").as[EBILinkItem].filter(l => l.links != null)
-ebLinks.flatMap(j =>BioDBToOAF.parse_ebi_links(j.links))
+ebLinks.flatMap(j => BioDBToOAF.parse_ebi_links(j.links))
.filter(p => BioDBToOAF.EBITargetLinksFilter(p))
.flatMap(p => BioDBToOAF.convertEBILinksToOaf(p))
.write.mode(SaveMode.Overwrite).save(targetPath)

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed;
+package eu.dnetllib.dhp.sx.bio.pubmed;
import java.io.Serializable;
import java.util.ArrayList;

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed;
+package eu.dnetllib.dhp.sx.bio.pubmed;
import java.io.Serializable;

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed;
+package eu.dnetllib.dhp.sx.bio.pubmed;
public class PMGrant {

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed;
+package eu.dnetllib.dhp.sx.bio.pubmed;
import java.io.Serializable;

@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed
+package eu.dnetllib.dhp.sx.bio.pubmed
import scala.xml.MetaData
import scala.xml.pull.{EvElemEnd, EvElemStart, EvText, XMLEventReader}

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed;
+package eu.dnetllib.dhp.sx.bio.pubmed;
public class PMSubject {
private String value;

@@ -1,11 +1,12 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed
+package eu.dnetllib.dhp.sx.bio.pubmed
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.common.ModelConstants
-import eu.dnetlib.dhp.schema.oaf._
import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, IdentifierFactory, OafMapperUtils, PidType}
+import eu.dnetlib.dhp.schema.oaf._
-import scala.collection.JavaConverters._
+import java.util.regex.Pattern
+import scala.collection.JavaConverters._
object PubMedToOaf {
@@ -15,7 +16,7 @@ object PubMedToOaf {
"doi" -> "https://dx.doi.org/"
)
-def cleanDoi(doi:String):String = {
+def cleanDoi(doi: String): String = {
val regex = "^10.\\d{4,9}\\/[\\[\\]\\-\\<\\>._;()\\/:A-Z0-9]+$"
@@ -71,14 +72,14 @@ object PubMedToOaf {
if (article.getPublicationTypes == null)
return null
val i = new Instance
-var pidList: List[StructuredProperty] = List(OafMapperUtils.structuredProperty(article.getPmid, PidType.pmid.toString, PidType.pmid.toString, ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, dataInfo))
+val pidList: List[StructuredProperty] = List(OafMapperUtils.structuredProperty(article.getPmid, PidType.pmid.toString, PidType.pmid.toString, ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, dataInfo))
if (pidList == null)
return null
-var alternateIdentifier :StructuredProperty = null
+var alternateIdentifier: StructuredProperty = null
if (article.getDoi != null) {
val normalizedPid = cleanDoi(article.getDoi)
-if (normalizedPid!= null)
+if (normalizedPid != null)
alternateIdentifier = OafMapperUtils.structuredProperty(normalizedPid, PidType.doi.toString, PidType.doi.toString, ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, dataInfo)
}
@@ -102,10 +103,10 @@ object PubMedToOaf {
return result
result.setDataInfo(dataInfo)
i.setPid(pidList.asJava)
-if (alternateIdentifier!= null)
+if (alternateIdentifier != null)
i.setAlternateIdentifier(List(alternateIdentifier).asJava)
result.setInstance(List(i).asJava)
-i.getPid.asScala.filter(p => "pmid".equalsIgnoreCase(p.getQualifier.getClassid)).map(p => p.getValue)(collection breakOut)
+i.getPid.asScala.filter(p => "pmid".equalsIgnoreCase(p.getQualifier.getClassid)).map(p => p.getValue)(collection.breakOut)
val urlLists: List[String] = pidList
.map(s => (urlMap.getOrElse(s.getQualifier.getClassid, ""), s.getValue))
.filter(t => t._1.nonEmpty)
@@ -136,7 +137,7 @@ object PubMedToOaf {
}
-val subjects: List[StructuredProperty] = article.getSubjects.asScala.map(s => OafMapperUtils.structuredProperty(s.getValue, SUBJ_CLASS, SUBJ_CLASS, ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES, dataInfo))(collection breakOut)
+val subjects: List[StructuredProperty] = article.getSubjects.asScala.map(s => OafMapperUtils.structuredProperty(s.getValue, SUBJ_CLASS, SUBJ_CLASS, ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES, dataInfo))(collection.breakOut)
if (subjects != null)
result.setSubject(subjects.asJava)
@@ -148,7 +149,7 @@ object PubMedToOaf {
author.setFullname(a.getFullName)
author.setRank(index + 1)
author
-}(collection breakOut)
+}(collection.breakOut)
if (authors != null && authors.nonEmpty)

@@ -1,13 +1,9 @@
-<workflow-app name="Transform_Pubmed_Workflow" xmlns="uri:oozie:workflow:0.5">
+<workflow-app name="Download_Transform_Pubmed_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>baselineWorkingPath</name>
<description>the Baseline Working Path</description>
</property>
-<property>
-<name>targetPath</name>
-<description>the Target Path</description>
-</property>
<property>
<name>isLookupUrl</name>
<description>The IS lookUp service endopoint</description>
@@ -24,8 +20,8 @@
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
-<name>Convert Baseline to Dataset</name>
-<class>eu.dnetlib.dhp.sx.graph.ebi.SparkCreateBaselineDataFrame</class>
+<name>Convert Baseline to OAF Dataset</name>
+<class>eu.dnetllib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
@@ -38,9 +34,10 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--workingPath</arg><arg>${baselineWorkingPath}</arg>
-<arg>--targetPath</arg><arg>${targetPath}</arg>
+<arg>--targetPath</arg><arg>${baselineWorkingPath}/transformed</arg>
<arg>--master</arg><arg>yarn</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+<arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

@@ -1,13 +1,10 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed
+package eu.dnetllib.dhp.sx.bio
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.{CleaningFunctions, OafMapperUtils, PidType}
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
-import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF.ScholixResolved
-import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF
-import eu.dnetlib.dhp.sx.graph.bio.pubmed.PubMedToOaf.dataInfo
-import eu.dnetlib.dhp.sx.graph.ebi.SparkDownloadEBILinks
+import eu.dnetllib.dhp.sx.bio.BioDBToOAF.ScholixResolved
+import eu.dnetllib.dhp.sx.bio.pubmed.{PMArticle, PMParser, PubMedToOaf}
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.parse
@@ -55,7 +52,7 @@ class BioScholixTest extends AbstractVocabularyTest{
@Test
def testEBIData() = {
-val inputXML = Source.fromInputStream(getClass.getResourceAsStream("pubmed.xml")).mkString
+val inputXML = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/pubmed.xml")).mkString
val xml = new XMLEventReader(Source.fromBytes(inputXML.getBytes()))
new PMParser(xml).foreach(s =>println(mapper.writeValueAsString(s)))
}
@@ -65,7 +62,7 @@ class BioScholixTest extends AbstractVocabularyTest{
def testPubmedToOaf(): Unit = {
assertNotNull(vocabularies)
assertTrue(vocabularies.vocabularyExists("dnet:publication_resource"))
-val records:String =Source.fromInputStream(getClass.getResourceAsStream("pubmed_dump")).mkString
+val records:String =Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/pubmed_dump")).mkString
val r:List[Oaf] = records.lines.toList.map(s=>mapper.readValue(s, classOf[PMArticle])).map(a => PubMedToOaf.convert(a, vocabularies))
assertEquals(10, r.size)
assertTrue(r.map(p => p.asInstanceOf[Result]).flatMap(p => p.getInstance().asScala.map(i => i.getInstancetype.getClassid)).exists(p => "0037".equalsIgnoreCase(p)))

@@ -84,7 +84,8 @@ public class PropagationConstant {
return di;
}
-public static Qualifier getQualifier(String inference_class_id, String inference_class_name, String qualifierSchema) {
+public static Qualifier getQualifier(String inference_class_id, String inference_class_name,
+String qualifierSchema) {
Qualifier pa = new Qualifier();
pa.setClassid(inference_class_id);
pa.setClassname(inference_class_name);
@@ -108,7 +109,11 @@ public class PropagationConstant {
r.setRelClass(rel_class);
r.setRelType(rel_type);
r.setSubRelType(subrel_type);
-r.setDataInfo(getDataInfo(inference_provenance, inference_class_id, inference_class_name, ModelConstants.DNET_PROVENANCE_ACTIONS));
+r
+.setDataInfo(
+getDataInfo(
+inference_provenance, inference_class_id, inference_class_name,
+ModelConstants.DNET_PROVENANCE_ACTIONS));
return r;
}

@@ -173,7 +173,10 @@ public class SparkOrcidToResultFromSemRelJob {
if (toaddpid) {
StructuredProperty p = new StructuredProperty();
p.setValue(autoritative_author.getOrcid());
-p.setQualifier(getQualifier(ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES));
+p
+.setQualifier(
+getQualifier(
+ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES));
p
.setDataInfo(
getDataInfo(

@@ -10,7 +10,6 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
@@ -22,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;

@@ -7,7 +7,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.*;
import java.util.stream.Collectors;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
@@ -20,6 +19,7 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;

@@ -1,7 +1,6 @@
package eu.dnetlib.dhp.sx.graph.pangaea
import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.sx.graph.ebi.SparkEBILinksToOaf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}

@@ -84,13 +84,15 @@ public class IndexRecordTransformerTest {
@Test
public void testForEOSCFutureTraining() throws IOException, TransformerException {
-final String record = IOUtils.toString(getClass().getResourceAsStream("eosc-future/training-notebooks-seadatanet.xml"));
+final String record = IOUtils
+.toString(getClass().getResourceAsStream("eosc-future/training-notebooks-seadatanet.xml"));
testRecordTransformation(record);
}
@Test
public void testForEOSCFutureAirQualityCopernicus() throws IOException, TransformerException {
-final String record = IOUtils.toString(getClass().getResourceAsStream("eosc-future/air-quality-copernicus.xml"));
+final String record = IOUtils
+.toString(getClass().getResourceAsStream("eosc-future/air-quality-copernicus.xml"));
testRecordTransformation(record);
}
@@ -102,12 +104,11 @@ public class IndexRecordTransformerTest {
@Test
public void testForEOSCFutureB2SharePlotRelatedORP() throws IOException, TransformerException {
-final String record = IOUtils.toString(getClass().getResourceAsStream("eosc-future/b2share-plot-related-orp.xml"));
+final String record = IOUtils
+.toString(getClass().getResourceAsStream("eosc-future/b2share-plot-related-orp.xml"));
testRecordTransformation(record);
}
private void testRecordTransformation(final String record) throws IOException, TransformerException {
final String fields = IOUtils.toString(getClass().getResourceAsStream("fields.xml"));
final String xslt = IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl"));