diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/BioDBToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/BioDBToOAF.scala
similarity index 83%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/BioDBToOAF.scala
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/BioDBToOAF.scala
index 90b65c8f70..dffc88c6ca 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/BioDBToOAF.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/BioDBToOAF.scala
@@ -1,14 +1,12 @@
-package eu.dnetlib.dhp.sx.graph.bio
+package eu.dnetllib.dhp.sx.bio
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, OafMapperUtils}
-import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, Instance, KeyValue, Oaf, Relation, StructuredProperty}
+import eu.dnetlib.dhp.schema.oaf._
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.{compact, parse, render}
-
-import scala.collection.JavaConverters._
-
+import collection.JavaConverters._
object BioDBToOAF {
case class EBILinkItem(id: Long, links: String) {}
@@ -17,23 +15,23 @@ object BioDBToOAF {
case class UniprotDate(date: String, date_info: String) {}
- case class ScholixResolved(pid:String, pidType:String, typology:String, tilte:List[String], datasource:List[String], date:List[String], authors:List[String]){}
+ case class ScholixResolved(pid: String, pidType: String, typology: String, tilte: List[String], datasource: List[String], date: List[String], authors: List[String]) {}
val DATA_INFO: DataInfo = OafMapperUtils.dataInfo(false, null, false, false, ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER, "0.9")
val SUBJ_CLASS = "Keywords"
val DATE_RELATION_KEY = "RelationDate"
- val resolvedURL:Map[String,String] = Map(
- "genbank"-> "https://www.ncbi.nlm.nih.gov/nuccore/",
- "ncbi-n" -> "https://www.ncbi.nlm.nih.gov/nuccore/",
- "ncbi-wgs" -> "https://www.ncbi.nlm.nih.gov/nuccore/",
- "ncbi-p" -> "https://www.ncbi.nlm.nih.gov/protein/",
- "ena" -> "https://www.ebi.ac.uk/ena/browser/view/",
- "clinicaltrials.gov"-> "https://clinicaltrials.gov/ct2/show/",
- "onim"-> "https://omim.org/entry/",
- "refseq"-> "https://www.ncbi.nlm.nih.gov/nuccore/",
- "geo"-> "https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi?acc="
+ val resolvedURL: Map[String, String] = Map(
+ "genbank" -> "https://www.ncbi.nlm.nih.gov/nuccore/",
+ "ncbi-n" -> "https://www.ncbi.nlm.nih.gov/nuccore/",
+ "ncbi-wgs" -> "https://www.ncbi.nlm.nih.gov/nuccore/",
+ "ncbi-p" -> "https://www.ncbi.nlm.nih.gov/protein/",
+ "ena" -> "https://www.ebi.ac.uk/ena/browser/view/",
+ "clinicaltrials.gov" -> "https://clinicaltrials.gov/ct2/show/",
+ "onim" -> "https://omim.org/entry/",
+ "refseq" -> "https://www.ncbi.nlm.nih.gov/nuccore/",
+ "geo" -> "https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi?acc="
)
@@ -45,7 +43,7 @@ object BioDBToOAF {
val ElsevierCollectedFrom: KeyValue = OafMapperUtils.keyValue("10|openaire____::8f87e10869299a5fe80b315695296b88", "Elsevier")
val springerNatureCollectedFrom: KeyValue = OafMapperUtils.keyValue("10|openaire____::6e380d9cf51138baec8480f5a0ce3a2e", "Springer Nature")
val EBICollectedFrom: KeyValue = OafMapperUtils.keyValue("10|opendoar____::83e60e09c222f206c725385f53d7e567c", "EMBL-EBIs Protein Data Bank in Europe (PDBe)")
- val pubmedCollectedFrom:KeyValue = OafMapperUtils.keyValue(ModelConstants.EUROPE_PUBMED_CENTRAL_ID, "Europe PubMed Central")
+ val pubmedCollectedFrom: KeyValue = OafMapperUtils.keyValue(ModelConstants.EUROPE_PUBMED_CENTRAL_ID, "Europe PubMed Central")
UNIPROTCollectedFrom.setDataInfo(DATA_INFO)
PDBCollectedFrom.setDataInfo(DATA_INFO)
@@ -58,9 +56,9 @@ object BioDBToOAF {
Map(
"uniprot" -> UNIPROTCollectedFrom,
- "pdb"-> PDBCollectedFrom,
- "elsevier" ->ElsevierCollectedFrom,
- "ebi" ->EBICollectedFrom,
+ "pdb" -> PDBCollectedFrom,
+ "elsevier" -> ElsevierCollectedFrom,
+ "ebi" -> EBICollectedFrom,
"Springer Nature" -> springerNatureCollectedFrom,
"NCBI Nucleotide" -> ncbiCollectedFrom,
"European Nucleotide Archive" -> enaCollectedFrom,
@@ -68,7 +66,7 @@ object BioDBToOAF {
)
}
- def crossrefLinksToOaf(input:String):Oaf = {
+ def crossrefLinksToOaf(input: String): Oaf = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json = parse(input)
val source_pid = (json \ "Source" \ "Identifier" \ "ID").extract[String].toLowerCase
@@ -77,16 +75,16 @@ object BioDBToOAF {
val target_pid = (json \ "Target" \ "Identifier" \ "ID").extract[String].toLowerCase
val target_pid_type = (json \ "Target" \ "Identifier" \ "IDScheme").extract[String].toLowerCase
- val relation_semantic= (json \ "RelationshipType" \ "Name").extract[String]
+ val relation_semantic = (json \ "RelationshipType" \ "Name").extract[String]
val date = GraphCleaningFunctions.cleanDate((json \ "LinkedPublicationDate").extract[String])
- createRelation(target_pid, target_pid_type, generate_unresolved_id(source_pid, source_pid_type),collectedFromMap("elsevier"),"relationship", relation_semantic, date)
+ createRelation(target_pid, target_pid_type, generate_unresolved_id(source_pid, source_pid_type), collectedFromMap("elsevier"), "relationship", relation_semantic, date)
}
- def scholixResolvedToOAF(input:ScholixResolved):Oaf = {
+ def scholixResolvedToOAF(input: ScholixResolved): Oaf = {
val d = new Dataset
@@ -127,18 +125,18 @@ object BioDBToOAF {
d.setInstance(List(i).asJava)
if (input.authors != null && input.authors.nonEmpty) {
- val authors = input.authors.map(a =>{
+ val authors = input.authors.map(a => {
val authorOAF = new Author
authorOAF.setFullname(a)
authorOAF
})
d.setAuthor(authors.asJava)
}
- if (input.date!= null && input.date.nonEmpty) {
- val dt = input.date.head
- i.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(dt), DATA_INFO))
- d.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(dt), DATA_INFO))
- }
+ if (input.date != null && input.date.nonEmpty) {
+ val dt = input.date.head
+ i.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(dt), DATA_INFO))
+ d.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(dt), DATA_INFO))
+ }
d
}
@@ -190,7 +188,7 @@ object BioDBToOAF {
OafMapperUtils.structuredProperty(s, SUBJ_CLASS, SUBJ_CLASS, ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES, null)
).asJava)
}
- var i_date:Option[UniprotDate] = None
+ var i_date: Option[UniprotDate] = None
if (dates.nonEmpty) {
i_date = dates.find(d => d.date_info.contains("entry version"))
@@ -218,12 +216,12 @@ object BioDBToOAF {
if (references_pmid != null && references_pmid.nonEmpty) {
- val rel = createRelation(references_pmid.head, "pmid", d.getId, collectedFromMap("uniprot"), ModelConstants.RELATIONSHIP, ModelConstants.IS_RELATED_TO, if (i_date.isDefined) i_date.get.date else null)
+ val rel = createRelation(references_pmid.head, "pmid", d.getId, collectedFromMap("uniprot"), ModelConstants.RELATIONSHIP, ModelConstants.IS_RELATED_TO, if (i_date.isDefined) i_date.get.date else null)
rel.getCollectedfrom
List(d, rel)
}
else if (references_doi != null && references_doi.nonEmpty) {
- val rel = createRelation(references_doi.head, "doi", d.getId, collectedFromMap("uniprot"), ModelConstants.RELATIONSHIP, ModelConstants.IS_RELATED_TO, if (i_date.isDefined) i_date.get.date else null)
+ val rel = createRelation(references_doi.head, "doi", d.getId, collectedFromMap("uniprot"), ModelConstants.RELATIONSHIP, ModelConstants.IS_RELATED_TO, if (i_date.isDefined) i_date.get.date else null)
List(d, rel)
}
else
@@ -231,13 +229,12 @@ object BioDBToOAF {
}
-
- def generate_unresolved_id(pid:String, pidType:String) :String = {
+ def generate_unresolved_id(pid: String, pidType: String): String = {
s"unresolved::$pid::$pidType"
}
- def createRelation(pid: String, pidType: String, sourceId: String, collectedFrom: KeyValue, subRelType:String, relClass:String, date:String):Relation = {
+ def createRelation(pid: String, pidType: String, sourceId: String, collectedFrom: KeyValue, subRelType: String, relClass: String, date: String): Relation = {
val rel = new Relation
rel.setCollectedfrom(List(collectedFromMap("pdb")).asJava)
@@ -251,7 +248,7 @@ object BioDBToOAF {
rel.setTarget(s"unresolved::$pid::$pidType")
- val dateProps:KeyValue = OafMapperUtils.keyValue(DATE_RELATION_KEY, date)
+ val dateProps: KeyValue = OafMapperUtils.keyValue(DATE_RELATION_KEY, date)
rel.setProperties(List(dateProps).asJava)
@@ -262,8 +259,8 @@ object BioDBToOAF {
}
- def createSupplementaryRelation(pid: String, pidType: String, sourceId: String, collectedFrom: KeyValue, date:String): Relation = {
- createRelation(pid,pidType,sourceId,collectedFrom, ModelConstants.SUPPLEMENT, ModelConstants.IS_SUPPLEMENT_TO, date)
+ def createSupplementaryRelation(pid: String, pidType: String, sourceId: String, collectedFrom: KeyValue, date: String): Relation = {
+ createRelation(pid, pidType, sourceId, collectedFrom, ModelConstants.SUPPLEMENT, ModelConstants.IS_SUPPLEMENT_TO, date)
}
@@ -338,7 +335,7 @@ object BioDBToOAF {
def EBITargetLinksFilter(input: EBILinks): Boolean = {
- input.targetPidType.equalsIgnoreCase("ena") || input.targetPidType.equalsIgnoreCase("pdb") || input.targetPidType.equalsIgnoreCase("uniprot")
+ input.targetPidType.equalsIgnoreCase("ena") || input.targetPidType.equalsIgnoreCase("pdb") || input.targetPidType.equalsIgnoreCase("uniprot")
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/SparkTransformBioDatabaseToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
similarity index 86%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/SparkTransformBioDatabaseToOAF.scala
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
index d66cc84eca..16d2b25a62 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/SparkTransformBioDatabaseToOAF.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
@@ -1,8 +1,8 @@
-package eu.dnetlib.dhp.sx.graph.bio
+package eu.dnetllib.dhp.sx.bio
import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
-import BioDBToOAF.ScholixResolved
+import eu.dnetlib.dhp.schema.oaf.Oaf
+import eu.dnetllib.dhp.sx.bio.BioDBToOAF.ScholixResolved
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
@@ -31,17 +31,16 @@ object SparkTransformBioDatabaseToOAF {
.master(parser.get("master")).getOrCreate()
val sc = spark.sparkContext
- implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
- import spark.implicits._
-
+ implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
+ import spark.implicits._
database.toUpperCase() match {
case "UNIPROT" =>
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).write.mode(SaveMode.Overwrite).save(targetPath)
- case "PDB"=>
+ case "PDB" =>
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath)
case "SCHOLIX" =>
spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).write.mode(SaveMode.Overwrite).save(targetPath)
- case "CROSSREF_LINKS"=>
+ case "CROSSREF_LINKS" =>
spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath)
}
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
similarity index 76%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkCreateBaselineDataFrame.scala
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
index ee76cf8ad1..97b3cdc99a 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkCreateBaselineDataFrame.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
@@ -1,10 +1,10 @@
-package eu.dnetlib.dhp.sx.graph.ebi
+package eu.dnetllib.dhp.sx.bio.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.oaf.Result
-import eu.dnetlib.dhp.sx.graph.bio.pubmed.{PMArticle, PMAuthor, PMJournal, PMParser, PubMedToOaf}
import eu.dnetlib.dhp.utils.ISLookupClientFactory
+import eu.dnetllib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal, PMParser, PubMedToOaf}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
@@ -24,24 +24,24 @@ import scala.xml.pull.XMLEventReader
object SparkCreateBaselineDataFrame {
- def requestBaseLineUpdatePage(maxFile:String):List[(String,String)] = {
- val data =requestPage("https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/")
+ def requestBaseLineUpdatePage(maxFile: String): List[(String, String)] = {
+ val data = requestPage("https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/")
- val result =data.lines.filter(l => l.startsWith("")
val start = l.indexOf("= 0 && end >start)
- l.substring(start+9, (end-start))
+ if (start >= 0 && end > start)
+ l.substring(start + 9, (end - start))
else
""
- }.filter(s =>s.endsWith(".gz") ).filter(s => s > maxFile).map(s => (s,s"https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/$s")).toList
+ }.filter(s => s.endsWith(".gz")).filter(s => s > maxFile).map(s => (s, s"https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/$s")).toList
result
}
- def downloadBaselinePart(url:String):InputStream = {
+ def downloadBaselinePart(url: String): InputStream = {
val r = new HttpGet(url)
val timeout = 60; // seconds
val config = RequestConfig.custom()
@@ -55,7 +55,7 @@ object SparkCreateBaselineDataFrame {
}
- def requestPage(url:String):String = {
+ def requestPage(url: String): String = {
val r = new HttpGet(url)
val timeout = 60; // seconds
val config = RequestConfig.custom()
@@ -90,25 +90,21 @@ object SparkCreateBaselineDataFrame {
}
-
-
-
-
- def downloadBaseLineUpdate(baselinePath:String, hdfsServerUri:String ):Unit = {
+ def downloadBaseLineUpdate(baselinePath: String, hdfsServerUri: String): Unit = {
val conf = new Configuration
conf.set("fs.defaultFS", hdfsServerUri)
- val fs = FileSystem.get(conf)
+ val fs = FileSystem.get(conf)
val p = new Path(baselinePath)
- val files = fs.listFiles(p,false)
+ val files = fs.listFiles(p, false)
var max_file = ""
while (files.hasNext) {
val c = files.next()
val data = c.getPath.toString
- val fileName = data.substring(data.lastIndexOf("/")+1)
+ val fileName = data.substring(data.lastIndexOf("/") + 1)
- if (fileName> max_file)
+ if (fileName > max_file)
max_file = fileName
}
@@ -119,7 +115,7 @@ object SparkCreateBaselineDataFrame {
val fsDataOutputStream: FSDataOutputStream = fs.create(hdfsWritePath, true)
val i = downloadBaselinePart(u._2)
val buffer = Array.fill[Byte](1024)(0)
- while(i.read(buffer)>0) {
+ while (i.read(buffer) > 0) {
fsDataOutputStream.write(buffer)
}
i.close()
@@ -134,11 +130,11 @@ object SparkCreateBaselineDataFrame {
override def zero: PMArticle = new PMArticle
override def reduce(b: PMArticle, a: (String, PMArticle)): PMArticle = {
- if (b != null && b.getPmid!= null) b else a._2
+ if (b != null && b.getPmid != null) b else a._2
}
override def merge(b1: PMArticle, b2: PMArticle): PMArticle = {
- if (b1 != null && b1.getPmid!= null) b1 else b2
+ if (b1 != null && b1.getPmid != null) b1 else b2
}
@@ -153,7 +149,7 @@ object SparkCreateBaselineDataFrame {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val log: Logger = LoggerFactory.getLogger(getClass)
- val parser = new ArgumentApplicationParser(IOUtils.toString(SparkEBILinksToOaf.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/ebi/baseline_to_oaf_params.json")))
+ val parser = new ArgumentApplicationParser(IOUtils.toString(SparkEBILinksToOaf.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json")))
parser.parseArgument(args)
val isLookupUrl: String = parser.get("isLookupUrl")
log.info("isLookupUrl: {}", isLookupUrl)
@@ -175,32 +171,32 @@ object SparkCreateBaselineDataFrame {
.config(conf)
.appName(SparkEBILinksToOaf.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
- import spark.implicits._
val sc = spark.sparkContext
+ import spark.implicits._
- implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
- implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
- implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
- implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
+ implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
+ implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
+ implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
+ implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
downloadBaseLineUpdate(s"$workingPath/baseline", hdfsServerUri)
- val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline",2000)
- val ds:Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i =>{
+ val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline_ftp", 2000)
+ val ds: Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i => {
val xml = new XMLEventReader(Source.fromBytes(i._2.getBytes()))
new PMParser(xml)
- } ))
+ }))
- ds.map(p => (p.getPmid,p))(Encoders.tuple(Encoders.STRING, PMEncoder)).groupByKey(_._1)
+ ds.map(p => (p.getPmid, p))(Encoders.tuple(Encoders.STRING, PMEncoder)).groupByKey(_._1)
.agg(pmArticleAggregator.toColumn)
.map(p => p._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset")
val exported_dataset = spark.read.load(s"$workingPath/baseline_dataset").as[PMArticle]
exported_dataset
.map(a => PubMedToOaf.convert(a, vocabularies)).as[Result]
- .filter(p => p!= null)
+ .filter(p => p != null)
.write.mode(SaveMode.Overwrite).save(targetPath)
}
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkDownloadEBILinks.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala
similarity index 70%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkDownloadEBILinks.scala
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala
index e940fdff08..578db1ea94 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkDownloadEBILinks.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala
@@ -1,8 +1,8 @@
-package eu.dnetlib.dhp.sx.graph.ebi
+package eu.dnetllib.dhp.sx.bio.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF.EBILinkItem
-import eu.dnetlib.dhp.sx.graph.bio.pubmed.{PMArticle, PMAuthor, PMJournal}
+import eu.dnetllib.dhp.sx.bio.BioDBToOAF.EBILinkItem
+import eu.dnetllib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal}
import org.apache.commons.io.IOUtils
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.HttpGet
@@ -14,15 +14,15 @@ import org.slf4j.{Logger, LoggerFactory}
object SparkDownloadEBILinks {
- def createEBILinks(pmid:Long):EBILinkItem = {
+ def createEBILinks(pmid: Long): EBILinkItem = {
val res = requestLinks(pmid)
- if (res!=null)
+ if (res != null)
return EBILinkItem(pmid, res)
null
}
- def requestPage(url:String):String = {
+ def requestPage(url: String): String = {
val r = new HttpGet(url)
val timeout = 60; // seconds
val config = RequestConfig.custom()
@@ -56,10 +56,11 @@ object SparkDownloadEBILinks {
}
}
- def requestLinks(PMID:Long):String = {
+ def requestLinks(PMID: Long): String = {
requestPage(s"https://www.ebi.ac.uk/europepmc/webservices/rest/MED/$PMID/datalinks?format=json")
}
+
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
@@ -76,9 +77,9 @@ object SparkDownloadEBILinks {
import spark.implicits._
- implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
- implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
- implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
+ implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
+ implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
+ implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
@@ -86,29 +87,29 @@ object SparkDownloadEBILinks {
log.info(s"workingPath -> $workingPath")
log.info("Getting max pubmedId where the links have been requested")
- val links:Dataset[EBILinkItem] = spark.read.load(s"$sourcePath/ebi_links_dataset").as[EBILinkItem]
- val lastPMIDRequested =links.map(l => l.id).select(max("value")).first.getLong(0)
+ val links: Dataset[EBILinkItem] = spark.read.load(s"$sourcePath/ebi_links_dataset").as[EBILinkItem]
+ val lastPMIDRequested = links.map(l => l.id).select(max("value")).first.getLong(0)
log.info("Retrieving PMID to request links")
val pubmed = spark.read.load(s"$sourcePath/baseline_dataset").as[PMArticle]
pubmed.map(p => p.getPmid.toLong).where(s"value > $lastPMIDRequested").write.mode(SaveMode.Overwrite).save(s"$workingPath/id_to_request")
- val pmidToReq:Dataset[Long] = spark.read.load(s"$workingPath/id_to_request").as[Long]
+ val pmidToReq: Dataset[Long] = spark.read.load(s"$workingPath/id_to_request").as[Long]
val total = pmidToReq.count()
- spark.createDataset(pmidToReq.rdd.repartition((total/MAX_ITEM_PER_PARTITION).toInt).map(pmid =>createEBILinks(pmid)).filter(l => l!= null)).write.mode(SaveMode.Overwrite).save(s"$workingPath/links_update")
+ spark.createDataset(pmidToReq.rdd.repartition((total / MAX_ITEM_PER_PARTITION).toInt).map(pmid => createEBILinks(pmid)).filter(l => l != null)).write.mode(SaveMode.Overwrite).save(s"$workingPath/links_update")
- val updates:Dataset[EBILinkItem] =spark.read.load(s"$workingPath/links_update").as[EBILinkItem]
+ val updates: Dataset[EBILinkItem] = spark.read.load(s"$workingPath/links_update").as[EBILinkItem]
links.union(updates).groupByKey(_.id)
- .reduceGroups{(x,y) =>
- if (x == null || x.links ==null)
+ .reduceGroups { (x, y) =>
+ if (x == null || x.links == null)
y
- if (y ==null || y.links ==null)
+ if (y == null || y.links == null)
x
if (x.links.length > y.links.length)
- x
+ x
else
y
}.map(_._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/links_final")
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
similarity index 67%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
index 1924d919e5..0db469769c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
@@ -1,15 +1,14 @@
-package eu.dnetlib.dhp.sx.graph.ebi
+package eu.dnetllib.dhp.sx.bio.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Oaf
-import eu.dnetlib.dhp.sx.graph.bio
-import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF
-import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF.EBILinkItem
+import eu.dnetllib.dhp.sx.bio.BioDBToOAF
+import eu.dnetllib.dhp.sx.bio.BioDBToOAF.EBILinkItem
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
+
object SparkEBILinksToOaf {
def main(args: Array[String]): Unit = {
@@ -24,17 +23,17 @@ object SparkEBILinksToOaf {
.appName(SparkEBILinksToOaf.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
+
+ import spark.implicits._
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
+ implicit val PMEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
- import spark.implicits._
- implicit val PMEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
+ val ebLinks: Dataset[EBILinkItem] = spark.read.load(s"${sourcePath}_dataset").as[EBILinkItem].filter(l => l.links != null)
- val ebLinks:Dataset[EBILinkItem] = spark.read.load(s"${sourcePath}_dataset").as[EBILinkItem].filter(l => l.links!= null)
-
- ebLinks.flatMap(j =>BioDBToOAF.parse_ebi_links(j.links))
+ ebLinks.flatMap(j => BioDBToOAF.parse_ebi_links(j.links))
.filter(p => BioDBToOAF.EBITargetLinksFilter(p))
.flatMap(p => BioDBToOAF.convertEBILinksToOaf(p))
.write.mode(SaveMode.Overwrite).save(targetPath)
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMArticle.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMArticle.java
similarity index 97%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMArticle.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMArticle.java
index 211cbcffb4..305bb89be0 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMArticle.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMArticle.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed;
+package eu.dnetllib.dhp.sx.bio.pubmed;
import java.io.Serializable;
import java.util.ArrayList;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMAuthor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMAuthor.java
similarity index 92%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMAuthor.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMAuthor.java
index ba69998c5d..c89929981b 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMAuthor.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMAuthor.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed;
+package eu.dnetllib.dhp.sx.bio.pubmed;
import java.io.Serializable;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMGrant.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMGrant.java
similarity index 93%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMGrant.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMGrant.java
index 0c3fd46010..7df5dd5f2f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMGrant.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMGrant.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed;
+package eu.dnetllib.dhp.sx.bio.pubmed;
public class PMGrant {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMJournal.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMJournal.java
similarity index 94%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMJournal.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMJournal.java
index d251354d47..6065416f8d 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMJournal.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMJournal.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed;
+package eu.dnetllib.dhp.sx.bio.pubmed;
import java.io.Serializable;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMParser.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMParser.scala
similarity index 99%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMParser.scala
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMParser.scala
index 8744bdfb4c..8fa226b7d5 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMParser.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMParser.scala
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed
+package eu.dnetllib.dhp.sx.bio.pubmed
import scala.xml.MetaData
import scala.xml.pull.{EvElemEnd, EvElemStart, EvText, XMLEventReader}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMSubject.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMSubject.java
similarity index 94%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMSubject.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMSubject.java
index 354b2cbe5d..e6ab61b875 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMSubject.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMSubject.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed;
+package eu.dnetllib.dhp.sx.bio.pubmed;
public class PMSubject {
private String value;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PubMedToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PubMedToOaf.scala
similarity index 93%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PubMedToOaf.scala
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PubMedToOaf.scala
index 202eb7b14d..a1777a230c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PubMedToOaf.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PubMedToOaf.scala
@@ -1,11 +1,12 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed
+package eu.dnetllib.dhp.sx.bio.pubmed
+
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.common.ModelConstants
-import eu.dnetlib.dhp.schema.oaf._
import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, IdentifierFactory, OafMapperUtils, PidType}
+import eu.dnetlib.dhp.schema.oaf._
+import scala.collection.JavaConverters._
import java.util.regex.Pattern
-import scala.collection.JavaConverters._
object PubMedToOaf {
@@ -15,7 +16,7 @@ object PubMedToOaf {
"doi" -> "https://dx.doi.org/"
)
- def cleanDoi(doi:String):String = {
+ def cleanDoi(doi: String): String = {
val regex = "^10.\\d{4,9}\\/[\\[\\]\\-\\<\\>._;()\\/:A-Z0-9]+$"
@@ -71,14 +72,14 @@ object PubMedToOaf {
if (article.getPublicationTypes == null)
return null
val i = new Instance
- var pidList: List[StructuredProperty] = List(OafMapperUtils.structuredProperty(article.getPmid, PidType.pmid.toString, PidType.pmid.toString, ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, dataInfo))
+ val pidList: List[StructuredProperty] = List(OafMapperUtils.structuredProperty(article.getPmid, PidType.pmid.toString, PidType.pmid.toString, ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, dataInfo))
if (pidList == null)
return null
- var alternateIdentifier :StructuredProperty = null
+ var alternateIdentifier: StructuredProperty = null
if (article.getDoi != null) {
val normalizedPid = cleanDoi(article.getDoi)
- if (normalizedPid!= null)
+ if (normalizedPid != null)
alternateIdentifier = OafMapperUtils.structuredProperty(normalizedPid, PidType.doi.toString, PidType.doi.toString, ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, dataInfo)
}
@@ -102,10 +103,10 @@ object PubMedToOaf {
return result
result.setDataInfo(dataInfo)
i.setPid(pidList.asJava)
- if (alternateIdentifier!= null)
+ if (alternateIdentifier != null)
i.setAlternateIdentifier(List(alternateIdentifier).asJava)
result.setInstance(List(i).asJava)
- i.getPid.asScala.filter(p => "pmid".equalsIgnoreCase(p.getQualifier.getClassid)).map(p => p.getValue)(collection breakOut)
+ i.getPid.asScala.filter(p => "pmid".equalsIgnoreCase(p.getQualifier.getClassid)).map(p => p.getValue)(collection.breakOut)
val urlLists: List[String] = pidList
.map(s => (urlMap.getOrElse(s.getQualifier.getClassid, ""), s.getValue))
.filter(t => t._1.nonEmpty)
@@ -136,7 +137,7 @@ object PubMedToOaf {
}
- val subjects: List[StructuredProperty] = article.getSubjects.asScala.map(s => OafMapperUtils.structuredProperty(s.getValue, SUBJ_CLASS, SUBJ_CLASS, ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES, dataInfo))(collection breakOut)
+ val subjects: List[StructuredProperty] = article.getSubjects.asScala.map(s => OafMapperUtils.structuredProperty(s.getValue, SUBJ_CLASS, SUBJ_CLASS, ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES, dataInfo))(collection.breakOut)
if (subjects != null)
result.setSubject(subjects.asJava)
@@ -148,7 +149,7 @@ object PubMedToOaf {
author.setFullname(a.getFullName)
author.setRank(index + 1)
author
- }(collection breakOut)
+ }(collection.breakOut)
if (authors != null && authors.nonEmpty)
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/baseline_to_oaf_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/baseline_to_oaf_params.json
rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/ebi_download_update.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_download_update.json
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/ebi_download_update.json
rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_download_update.json
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/ebi_to_df_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_to_df_params.json
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/ebi_to_df_params.json
rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_to_df_params.json
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/pubmed/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/pubmed/oozie_app/config-default.xml
rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/pubmed/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
similarity index 78%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/pubmed/oozie_app/workflow.xml
rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
index 914d1c2c78..f5a98ba5ef 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/pubmed/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
@@ -1,13 +1,9 @@
-
+
baselineWorkingPath
the Baseline Working Path
-
- targetPath
- the Target Path
-
isLookupUrl
The IS lookUp service endopoint
@@ -24,8 +20,8 @@
yarn
cluster
- Convert Baseline to Dataset
- eu.dnetlib.dhp.sx.graph.ebi.SparkCreateBaselineDataFrame
+ Convert Baseline to OAF Dataset
+ eu.dnetllib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame
dhp-graph-mapper-${projectVersion}.jar
--executor-memory=${sparkExecutorMemory}
@@ -38,9 +34,10 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--workingPath${baselineWorkingPath}
- --targetPath${targetPath}
+ --targetPath${baselineWorkingPath}/transformed
--masteryarn
--isLookupUrl${isLookupUrl}
+ --hdfsServerUri${nameNode}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/BioScholixTest.scala b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetllib/dhp/sx/bio/BioScholixTest.scala
similarity index 92%
rename from dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/BioScholixTest.scala
rename to dhp-workflows/dhp-aggregation/src/test/java/eu/dnetllib/dhp/sx/bio/BioScholixTest.scala
index 87279eb210..c072f149ce 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/BioScholixTest.scala
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetllib/dhp/sx/bio/BioScholixTest.scala
@@ -1,13 +1,10 @@
-package eu.dnetlib.dhp.sx.graph.bio.pubmed
+package eu.dnetllib.dhp.sx.bio
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
-import eu.dnetlib.dhp.schema.common.ModelConstants
-import eu.dnetlib.dhp.schema.oaf.utils.{CleaningFunctions, OafMapperUtils, PidType}
+import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
-import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF.ScholixResolved
-import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF
-import eu.dnetlib.dhp.sx.graph.bio.pubmed.PubMedToOaf.dataInfo
-import eu.dnetlib.dhp.sx.graph.ebi.SparkDownloadEBILinks
+import eu.dnetllib.dhp.sx.bio.BioDBToOAF.ScholixResolved
+import eu.dnetllib.dhp.sx.bio.pubmed.{PMArticle, PMParser, PubMedToOaf}
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.parse
@@ -55,7 +52,7 @@ class BioScholixTest extends AbstractVocabularyTest{
@Test
def testEBIData() = {
- val inputXML = Source.fromInputStream(getClass.getResourceAsStream("pubmed.xml")).mkString
+ val inputXML = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/pubmed.xml")).mkString
val xml = new XMLEventReader(Source.fromBytes(inputXML.getBytes()))
new PMParser(xml).foreach(s =>println(mapper.writeValueAsString(s)))
}
@@ -65,7 +62,7 @@ class BioScholixTest extends AbstractVocabularyTest{
def testPubmedToOaf(): Unit = {
assertNotNull(vocabularies)
assertTrue(vocabularies.vocabularyExists("dnet:publication_resource"))
- val records:String =Source.fromInputStream(getClass.getResourceAsStream("pubmed_dump")).mkString
+ val records:String =Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/pubmed_dump")).mkString
val r:List[Oaf] = records.lines.toList.map(s=>mapper.readValue(s, classOf[PMArticle])).map(a => PubMedToOaf.convert(a, vocabularies))
assertEquals(10, r.size)
assertTrue(r.map(p => p.asInstanceOf[Result]).flatMap(p => p.getInstance().asScala.map(i => i.getInstancetype.getClassid)).exists(p => "0037".equalsIgnoreCase(p)))
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/crossref_links b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/crossref_links
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/crossref_links
rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/crossref_links
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/ebi_links.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/ebi_links.gz
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/ebi_links.gz
rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/ebi_links.gz
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed/ls_result b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/ls_result
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed/ls_result
rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/ls_result
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pdb_dump b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pdb_dump
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pdb_dump
rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pdb_dump
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed/pubmed.xml b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed/pubmed.xml
rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed/pubmed_dump b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed_dump
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed/pubmed_dump
rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed_dump
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/scholix_resolved b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/scholix_resolved
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/scholix_resolved
rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/scholix_resolved
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/uniprot_dump b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/uniprot_dump
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/uniprot_dump
rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/uniprot_dump
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java
index 0d7c74475e..23e97a97a8 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java
@@ -69,7 +69,7 @@ public class PropagationConstant {
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_COUNTRY_INSTREPO_CLASS_ID,
PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME,
- ModelConstants.DNET_PROVENANCE_ACTIONS));
+ ModelConstants.DNET_PROVENANCE_ACTIONS));
return nc;
}
@@ -84,7 +84,8 @@ public class PropagationConstant {
return di;
}
- public static Qualifier getQualifier(String inference_class_id, String inference_class_name, String qualifierSchema) {
+ public static Qualifier getQualifier(String inference_class_id, String inference_class_name,
+ String qualifierSchema) {
Qualifier pa = new Qualifier();
pa.setClassid(inference_class_id);
pa.setClassname(inference_class_name);
@@ -108,7 +109,11 @@ public class PropagationConstant {
r.setRelClass(rel_class);
r.setRelType(rel_type);
r.setSubRelType(subrel_type);
- r.setDataInfo(getDataInfo(inference_provenance, inference_class_id, inference_class_name, ModelConstants.DNET_PROVENANCE_ACTIONS));
+ r
+ .setDataInfo(
+ getDataInfo(
+ inference_provenance, inference_class_id, inference_class_name,
+ ModelConstants.DNET_PROVENANCE_ACTIONS));
return r;
}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java
index 68949b9004..a38b4da2e8 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java
@@ -173,14 +173,17 @@ public class SparkOrcidToResultFromSemRelJob {
if (toaddpid) {
StructuredProperty p = new StructuredProperty();
p.setValue(autoritative_author.getOrcid());
- p.setQualifier(getQualifier(ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES));
+ p
+ .setQualifier(
+ getQualifier(
+ ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES));
p
.setDataInfo(
getDataInfo(
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID,
PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME,
- ModelConstants.DNET_PROVENANCE_ACTIONS));
+ ModelConstants.DNET_PROVENANCE_ACTIONS));
Optional> authorPid = Optional.ofNullable(author.getPid());
if (authorPid.isPresent()) {
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java
index 1289ff644f..50df08f8c8 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java
@@ -10,7 +10,6 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
@@ -22,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;
@@ -130,7 +130,7 @@ public class SparkResultToCommunityFromOrganizationJob {
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME,
- ModelConstants.DNET_PROVENANCE_ACTIONS)));
+ ModelConstants.DNET_PROVENANCE_ACTIONS)));
propagatedContexts.add(newContext);
}
}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java
index 7f76ead94b..f31a262307 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java
@@ -7,7 +7,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.*;
import java.util.stream.Collectors;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
@@ -20,6 +19,7 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
@@ -126,7 +126,7 @@ public class SparkResultToCommunityThroughSemRelJob {
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
- ModelConstants.DNET_PROVENANCE_ACTIONS)));
+ ModelConstants.DNET_PROVENANCE_ACTIONS)));
return newContext;
}
return null;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/pangaea/SparkGeneratePanagaeaDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/pangaea/SparkGeneratePanagaeaDataset.scala
index bf726cf595..79c75d6df7 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/pangaea/SparkGeneratePanagaeaDataset.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/pangaea/SparkGeneratePanagaeaDataset.scala
@@ -1,7 +1,6 @@
package eu.dnetlib.dhp.sx.graph.pangaea
import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.sx.graph.ebi.SparkEBILinksToOaf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java
index 1c7dce3f29..64935e79d0 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java
@@ -84,13 +84,15 @@ public class IndexRecordTransformerTest {
@Test
public void testForEOSCFutureTraining() throws IOException, TransformerException {
- final String record = IOUtils.toString(getClass().getResourceAsStream("eosc-future/training-notebooks-seadatanet.xml"));
+ final String record = IOUtils
+ .toString(getClass().getResourceAsStream("eosc-future/training-notebooks-seadatanet.xml"));
testRecordTransformation(record);
}
@Test
public void testForEOSCFutureAirQualityCopernicus() throws IOException, TransformerException {
- final String record = IOUtils.toString(getClass().getResourceAsStream("eosc-future/air-quality-copernicus.xml"));
+ final String record = IOUtils
+ .toString(getClass().getResourceAsStream("eosc-future/air-quality-copernicus.xml"));
testRecordTransformation(record);
}
@@ -102,12 +104,11 @@ public class IndexRecordTransformerTest {
@Test
public void testForEOSCFutureB2SharePlotRelatedORP() throws IOException, TransformerException {
- final String record = IOUtils.toString(getClass().getResourceAsStream("eosc-future/b2share-plot-related-orp.xml"));
+ final String record = IOUtils
+ .toString(getClass().getResourceAsStream("eosc-future/b2share-plot-related-orp.xml"));
testRecordTransformation(record);
}
-
-
private void testRecordTransformation(final String record) throws IOException, TransformerException {
final String fields = IOUtils.toString(getClass().getResourceAsStream("fields.xml"));
final String xslt = IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl"));