From 5606014b178bc6fd72b1121652de816b5be717ef Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 12 Oct 2021 08:11:53 +0200 Subject: [PATCH] code refactor see ticket #7065 --- .../eu/dnetllib/dhp/sx}/bio/BioDBToOAF.scala | 77 +++++++++--------- .../bio/SparkTransformBioDatabaseToOAF.scala | 15 ++-- .../ebi/SparkCreateBaselineDataFrame.scala | 62 +++++++------- .../sx/bio}/ebi/SparkDownloadEBILinks.scala | 39 ++++----- .../dhp/sx/bio}/ebi/SparkEBILinksToOaf.scala | 21 +++-- .../dhp/sx}/bio/pubmed/PMArticle.java | 2 +- .../dnetllib/dhp/sx}/bio/pubmed/PMAuthor.java | 2 +- .../dnetllib/dhp/sx}/bio/pubmed/PMGrant.java | 2 +- .../dhp/sx}/bio/pubmed/PMJournal.java | 2 +- .../dhp/sx}/bio/pubmed/PMParser.scala | 2 +- .../dhp/sx}/bio/pubmed/PMSubject.java | 2 +- .../dhp/sx}/bio/pubmed/PubMedToOaf.scala | 23 +++--- .../sx/bio}/ebi/baseline_to_oaf_params.json | 0 .../dhp/sx/bio}/ebi/ebi_download_update.json | 0 .../dhp/sx/bio}/ebi/ebi_to_df_params.json | 0 .../bio}/pubmed/oozie_app/config-default.xml | 0 .../dhp/sx/bio}/pubmed/oozie_app/workflow.xml | 13 ++- .../dnetllib/dhp/sx/bio}/BioScholixTest.scala | 15 ++-- .../dnetlib/dhp/sx/graph/bio/crossref_links | 0 .../eu/dnetlib/dhp/sx/graph/bio/ebi_links.gz | Bin .../eu/dnetlib/dhp/sx/graph/bio}/ls_result | 0 .../eu/dnetlib/dhp/sx/graph/bio/pdb_dump | 0 .../eu/dnetlib/dhp/sx/graph/bio}/pubmed.xml | 0 .../eu/dnetlib/dhp/sx/graph/bio}/pubmed_dump | 0 .../dnetlib/dhp/sx/graph/bio/scholix_resolved | 0 .../eu/dnetlib/dhp/sx/graph/bio/uniprot_dump | 0 .../eu/dnetlib/dhp/PropagationConstant.java | 11 ++- .../SparkOrcidToResultFromSemRelJob.java | 7 +- ...kResultToCommunityFromOrganizationJob.java | 4 +- ...parkResultToCommunityThroughSemRelJob.java | 4 +- .../SparkGeneratePanagaeaDataset.scala | 1 - .../provision/IndexRecordTransformerTest.java | 11 +-- 32 files changed, 155 insertions(+), 160 deletions(-) rename dhp-workflows/{dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx}/bio/BioDBToOAF.scala (83%) rename dhp-workflows/{dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx}/bio/SparkTransformBioDatabaseToOAF.scala (86%) rename dhp-workflows/{dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio}/ebi/SparkCreateBaselineDataFrame.scala (76%) rename dhp-workflows/{dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio}/ebi/SparkDownloadEBILinks.scala (70%) rename dhp-workflows/{dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio}/ebi/SparkEBILinksToOaf.scala (67%) rename dhp-workflows/{dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx}/bio/pubmed/PMArticle.java (97%) rename dhp-workflows/{dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx}/bio/pubmed/PMAuthor.java (92%) rename dhp-workflows/{dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx}/bio/pubmed/PMGrant.java (93%) rename dhp-workflows/{dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx}/bio/pubmed/PMJournal.java (94%) rename dhp-workflows/{dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx}/bio/pubmed/PMParser.scala (99%) rename dhp-workflows/{dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx}/bio/pubmed/PMSubject.java (94%) rename dhp-workflows/{dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx}/bio/pubmed/PubMedToOaf.scala (93%) rename dhp-workflows/{dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio}/ebi/baseline_to_oaf_params.json (100%) rename dhp-workflows/{dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio}/ebi/ebi_download_update.json (100%) rename dhp-workflows/{dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio}/ebi/ebi_to_df_params.json (100%) rename dhp-workflows/{dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio}/pubmed/oozie_app/config-default.xml (100%) rename dhp-workflows/{dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio}/pubmed/oozie_app/workflow.xml (78%) rename dhp-workflows/{dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/bio/pubmed => dhp-aggregation/src/test/java/eu/dnetllib/dhp/sx/bio}/BioScholixTest.scala (92%) rename dhp-workflows/{dhp-graph-mapper => dhp-aggregation}/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/crossref_links (100%) rename dhp-workflows/{dhp-graph-mapper => dhp-aggregation}/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/ebi_links.gz (100%) rename dhp-workflows/{dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed => dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio}/ls_result (100%) rename dhp-workflows/{dhp-graph-mapper => dhp-aggregation}/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pdb_dump (100%) rename dhp-workflows/{dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed => dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio}/pubmed.xml (100%) rename dhp-workflows/{dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed => dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio}/pubmed_dump (100%) rename dhp-workflows/{dhp-graph-mapper => dhp-aggregation}/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/scholix_resolved (100%) rename dhp-workflows/{dhp-graph-mapper => dhp-aggregation}/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/uniprot_dump (100%) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/BioDBToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/BioDBToOAF.scala similarity index 83% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/BioDBToOAF.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/BioDBToOAF.scala index 90b65c8f7..dffc88c6c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/BioDBToOAF.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/BioDBToOAF.scala @@ -1,14 +1,12 @@ -package eu.dnetlib.dhp.sx.graph.bio +package eu.dnetllib.dhp.sx.bio import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, OafMapperUtils} -import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, Instance, KeyValue, Oaf, Relation, StructuredProperty} +import eu.dnetlib.dhp.schema.oaf._ import org.json4s.DefaultFormats import org.json4s.JsonAST.{JField, JObject, JString} import org.json4s.jackson.JsonMethods.{compact, parse, render} - -import scala.collection.JavaConverters._ - +import collection.JavaConverters._ object BioDBToOAF { case class EBILinkItem(id: Long, links: String) {} @@ -17,23 +15,23 @@ object BioDBToOAF { case class UniprotDate(date: String, date_info: String) {} - case class ScholixResolved(pid:String, pidType:String, typology:String, tilte:List[String], datasource:List[String], date:List[String], authors:List[String]){} + case class ScholixResolved(pid: String, pidType: String, typology: String, tilte: List[String], datasource: List[String], date: List[String], authors: List[String]) {} val DATA_INFO: DataInfo = OafMapperUtils.dataInfo(false, null, false, false, ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER, "0.9") val SUBJ_CLASS = "Keywords" val DATE_RELATION_KEY = "RelationDate" - val resolvedURL:Map[String,String] = Map( - "genbank"-> "https://www.ncbi.nlm.nih.gov/nuccore/", - "ncbi-n" -> "https://www.ncbi.nlm.nih.gov/nuccore/", - "ncbi-wgs" -> "https://www.ncbi.nlm.nih.gov/nuccore/", - "ncbi-p" -> "https://www.ncbi.nlm.nih.gov/protein/", - "ena" -> "https://www.ebi.ac.uk/ena/browser/view/", - "clinicaltrials.gov"-> "https://clinicaltrials.gov/ct2/show/", - "onim"-> "https://omim.org/entry/", - "refseq"-> "https://www.ncbi.nlm.nih.gov/nuccore/", - "geo"-> "https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi?acc=" + val resolvedURL: Map[String, String] = Map( + "genbank" -> "https://www.ncbi.nlm.nih.gov/nuccore/", + "ncbi-n" -> "https://www.ncbi.nlm.nih.gov/nuccore/", + "ncbi-wgs" -> "https://www.ncbi.nlm.nih.gov/nuccore/", + "ncbi-p" -> "https://www.ncbi.nlm.nih.gov/protein/", + "ena" -> "https://www.ebi.ac.uk/ena/browser/view/", + "clinicaltrials.gov" -> "https://clinicaltrials.gov/ct2/show/", + "onim" -> "https://omim.org/entry/", + "refseq" -> "https://www.ncbi.nlm.nih.gov/nuccore/", + "geo" -> "https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi?acc=" ) @@ -45,7 +43,7 @@ object BioDBToOAF { val ElsevierCollectedFrom: KeyValue = OafMapperUtils.keyValue("10|openaire____::8f87e10869299a5fe80b315695296b88", "Elsevier") val springerNatureCollectedFrom: KeyValue = OafMapperUtils.keyValue("10|openaire____::6e380d9cf51138baec8480f5a0ce3a2e", "Springer Nature") val EBICollectedFrom: KeyValue = OafMapperUtils.keyValue("10|opendoar____::83e60e09c222f206c725385f53d7e567c", "EMBL-EBIs Protein Data Bank in Europe (PDBe)") - val pubmedCollectedFrom:KeyValue = OafMapperUtils.keyValue(ModelConstants.EUROPE_PUBMED_CENTRAL_ID, "Europe PubMed Central") + val pubmedCollectedFrom: KeyValue = OafMapperUtils.keyValue(ModelConstants.EUROPE_PUBMED_CENTRAL_ID, "Europe PubMed Central") UNIPROTCollectedFrom.setDataInfo(DATA_INFO) PDBCollectedFrom.setDataInfo(DATA_INFO) @@ -58,9 +56,9 @@ object BioDBToOAF { Map( "uniprot" -> UNIPROTCollectedFrom, - "pdb"-> PDBCollectedFrom, - "elsevier" ->ElsevierCollectedFrom, - "ebi" ->EBICollectedFrom, + "pdb" -> PDBCollectedFrom, + "elsevier" -> ElsevierCollectedFrom, + "ebi" -> EBICollectedFrom, "Springer Nature" -> springerNatureCollectedFrom, "NCBI Nucleotide" -> ncbiCollectedFrom, "European Nucleotide Archive" -> enaCollectedFrom, @@ -68,7 +66,7 @@ object BioDBToOAF { ) } - def crossrefLinksToOaf(input:String):Oaf = { + def crossrefLinksToOaf(input: String): Oaf = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json = parse(input) val source_pid = (json \ "Source" \ "Identifier" \ "ID").extract[String].toLowerCase @@ -77,16 +75,16 @@ object BioDBToOAF { val target_pid = (json \ "Target" \ "Identifier" \ "ID").extract[String].toLowerCase val target_pid_type = (json \ "Target" \ "Identifier" \ "IDScheme").extract[String].toLowerCase - val relation_semantic= (json \ "RelationshipType" \ "Name").extract[String] + val relation_semantic = (json \ "RelationshipType" \ "Name").extract[String] val date = GraphCleaningFunctions.cleanDate((json \ "LinkedPublicationDate").extract[String]) - createRelation(target_pid, target_pid_type, generate_unresolved_id(source_pid, source_pid_type),collectedFromMap("elsevier"),"relationship", relation_semantic, date) + createRelation(target_pid, target_pid_type, generate_unresolved_id(source_pid, source_pid_type), collectedFromMap("elsevier"), "relationship", relation_semantic, date) } - def scholixResolvedToOAF(input:ScholixResolved):Oaf = { + def scholixResolvedToOAF(input: ScholixResolved): Oaf = { val d = new Dataset @@ -127,18 +125,18 @@ object BioDBToOAF { d.setInstance(List(i).asJava) if (input.authors != null && input.authors.nonEmpty) { - val authors = input.authors.map(a =>{ + val authors = input.authors.map(a => { val authorOAF = new Author authorOAF.setFullname(a) authorOAF }) d.setAuthor(authors.asJava) } - if (input.date!= null && input.date.nonEmpty) { - val dt = input.date.head - i.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(dt), DATA_INFO)) - d.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(dt), DATA_INFO)) - } + if (input.date != null && input.date.nonEmpty) { + val dt = input.date.head + i.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(dt), DATA_INFO)) + d.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(dt), DATA_INFO)) + } d } @@ -190,7 +188,7 @@ object BioDBToOAF { OafMapperUtils.structuredProperty(s, SUBJ_CLASS, SUBJ_CLASS, ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES, null) ).asJava) } - var i_date:Option[UniprotDate] = None + var i_date: Option[UniprotDate] = None if (dates.nonEmpty) { i_date = dates.find(d => d.date_info.contains("entry version")) @@ -218,12 +216,12 @@ object BioDBToOAF { if (references_pmid != null && references_pmid.nonEmpty) { - val rel = createRelation(references_pmid.head, "pmid", d.getId, collectedFromMap("uniprot"), ModelConstants.RELATIONSHIP, ModelConstants.IS_RELATED_TO, if (i_date.isDefined) i_date.get.date else null) + val rel = createRelation(references_pmid.head, "pmid", d.getId, collectedFromMap("uniprot"), ModelConstants.RELATIONSHIP, ModelConstants.IS_RELATED_TO, if (i_date.isDefined) i_date.get.date else null) rel.getCollectedfrom List(d, rel) } else if (references_doi != null && references_doi.nonEmpty) { - val rel = createRelation(references_doi.head, "doi", d.getId, collectedFromMap("uniprot"), ModelConstants.RELATIONSHIP, ModelConstants.IS_RELATED_TO, if (i_date.isDefined) i_date.get.date else null) + val rel = createRelation(references_doi.head, "doi", d.getId, collectedFromMap("uniprot"), ModelConstants.RELATIONSHIP, ModelConstants.IS_RELATED_TO, if (i_date.isDefined) i_date.get.date else null) List(d, rel) } else @@ -231,13 +229,12 @@ object BioDBToOAF { } - - def generate_unresolved_id(pid:String, pidType:String) :String = { + def generate_unresolved_id(pid: String, pidType: String): String = { s"unresolved::$pid::$pidType" } - def createRelation(pid: String, pidType: String, sourceId: String, collectedFrom: KeyValue, subRelType:String, relClass:String, date:String):Relation = { + def createRelation(pid: String, pidType: String, sourceId: String, collectedFrom: KeyValue, subRelType: String, relClass: String, date: String): Relation = { val rel = new Relation rel.setCollectedfrom(List(collectedFromMap("pdb")).asJava) @@ -251,7 +248,7 @@ object BioDBToOAF { rel.setTarget(s"unresolved::$pid::$pidType") - val dateProps:KeyValue = OafMapperUtils.keyValue(DATE_RELATION_KEY, date) + val dateProps: KeyValue = OafMapperUtils.keyValue(DATE_RELATION_KEY, date) rel.setProperties(List(dateProps).asJava) @@ -262,8 +259,8 @@ object BioDBToOAF { } - def createSupplementaryRelation(pid: String, pidType: String, sourceId: String, collectedFrom: KeyValue, date:String): Relation = { - createRelation(pid,pidType,sourceId,collectedFrom, ModelConstants.SUPPLEMENT, ModelConstants.IS_SUPPLEMENT_TO, date) + def createSupplementaryRelation(pid: String, pidType: String, sourceId: String, collectedFrom: KeyValue, date: String): Relation = { + createRelation(pid, pidType, sourceId, collectedFrom, ModelConstants.SUPPLEMENT, ModelConstants.IS_SUPPLEMENT_TO, date) } @@ -338,7 +335,7 @@ object BioDBToOAF { def EBITargetLinksFilter(input: EBILinks): Boolean = { - input.targetPidType.equalsIgnoreCase("ena") || input.targetPidType.equalsIgnoreCase("pdb") || input.targetPidType.equalsIgnoreCase("uniprot") + input.targetPidType.equalsIgnoreCase("ena") || input.targetPidType.equalsIgnoreCase("pdb") || input.targetPidType.equalsIgnoreCase("uniprot") } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/SparkTransformBioDatabaseToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala similarity index 86% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/SparkTransformBioDatabaseToOAF.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala index d66cc84ec..16d2b25a6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/SparkTransformBioDatabaseToOAF.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala @@ -1,8 +1,8 @@ -package eu.dnetlib.dhp.sx.graph.bio +package eu.dnetllib.dhp.sx.bio import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} -import BioDBToOAF.ScholixResolved +import eu.dnetlib.dhp.schema.oaf.Oaf +import eu.dnetllib.dhp.sx.bio.BioDBToOAF.ScholixResolved import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} @@ -31,17 +31,16 @@ object SparkTransformBioDatabaseToOAF { .master(parser.get("master")).getOrCreate() val sc = spark.sparkContext - implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) - import spark.implicits._ - + implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) + import spark.implicits._ database.toUpperCase() match { case "UNIPROT" => spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).write.mode(SaveMode.Overwrite).save(targetPath) - case "PDB"=> + case "PDB" => spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath) case "SCHOLIX" => spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).write.mode(SaveMode.Overwrite).save(targetPath) - case "CROSSREF_LINKS"=> + case "CROSSREF_LINKS" => spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath) } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala similarity index 76% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkCreateBaselineDataFrame.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala index ee76cf8ad..97b3cdc99 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkCreateBaselineDataFrame.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala @@ -1,10 +1,10 @@ -package eu.dnetlib.dhp.sx.graph.ebi +package eu.dnetllib.dhp.sx.bio.ebi import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.schema.oaf.Result -import eu.dnetlib.dhp.sx.graph.bio.pubmed.{PMArticle, PMAuthor, PMJournal, PMParser, PubMedToOaf} import eu.dnetlib.dhp.utils.ISLookupClientFactory +import eu.dnetllib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal, PMParser, PubMedToOaf} import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path} @@ -24,24 +24,24 @@ import scala.xml.pull.XMLEventReader object SparkCreateBaselineDataFrame { - def requestBaseLineUpdatePage(maxFile:String):List[(String,String)] = { - val data =requestPage("https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/") + def requestBaseLineUpdatePage(maxFile: String): List[(String, String)] = { + val data = requestPage("https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/") - val result =data.lines.filter(l => l.startsWith("") val start = l.indexOf("= 0 && end >start) - l.substring(start+9, (end-start)) + if (start >= 0 && end > start) + l.substring(start + 9, (end - start)) else "" - }.filter(s =>s.endsWith(".gz") ).filter(s => s > maxFile).map(s => (s,s"https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/$s")).toList + }.filter(s => s.endsWith(".gz")).filter(s => s > maxFile).map(s => (s, s"https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/$s")).toList result } - def downloadBaselinePart(url:String):InputStream = { + def downloadBaselinePart(url: String): InputStream = { val r = new HttpGet(url) val timeout = 60; // seconds val config = RequestConfig.custom() @@ -55,7 +55,7 @@ object SparkCreateBaselineDataFrame { } - def requestPage(url:String):String = { + def requestPage(url: String): String = { val r = new HttpGet(url) val timeout = 60; // seconds val config = RequestConfig.custom() @@ -90,25 +90,21 @@ object SparkCreateBaselineDataFrame { } - - - - - def downloadBaseLineUpdate(baselinePath:String, hdfsServerUri:String ):Unit = { + def downloadBaseLineUpdate(baselinePath: String, hdfsServerUri: String): Unit = { val conf = new Configuration conf.set("fs.defaultFS", hdfsServerUri) - val fs = FileSystem.get(conf) + val fs = FileSystem.get(conf) val p = new Path(baselinePath) - val files = fs.listFiles(p,false) + val files = fs.listFiles(p, false) var max_file = "" while (files.hasNext) { val c = files.next() val data = c.getPath.toString - val fileName = data.substring(data.lastIndexOf("/")+1) + val fileName = data.substring(data.lastIndexOf("/") + 1) - if (fileName> max_file) + if (fileName > max_file) max_file = fileName } @@ -119,7 +115,7 @@ object SparkCreateBaselineDataFrame { val fsDataOutputStream: FSDataOutputStream = fs.create(hdfsWritePath, true) val i = downloadBaselinePart(u._2) val buffer = Array.fill[Byte](1024)(0) - while(i.read(buffer)>0) { + while (i.read(buffer) > 0) { fsDataOutputStream.write(buffer) } i.close() @@ -134,11 +130,11 @@ object SparkCreateBaselineDataFrame { override def zero: PMArticle = new PMArticle override def reduce(b: PMArticle, a: (String, PMArticle)): PMArticle = { - if (b != null && b.getPmid!= null) b else a._2 + if (b != null && b.getPmid != null) b else a._2 } override def merge(b1: PMArticle, b2: PMArticle): PMArticle = { - if (b1 != null && b1.getPmid!= null) b1 else b2 + if (b1 != null && b1.getPmid != null) b1 else b2 } @@ -153,7 +149,7 @@ object SparkCreateBaselineDataFrame { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() val log: Logger = LoggerFactory.getLogger(getClass) - val parser = new ArgumentApplicationParser(IOUtils.toString(SparkEBILinksToOaf.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/ebi/baseline_to_oaf_params.json"))) + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkEBILinksToOaf.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json"))) parser.parseArgument(args) val isLookupUrl: String = parser.get("isLookupUrl") log.info("isLookupUrl: {}", isLookupUrl) @@ -175,32 +171,32 @@ object SparkCreateBaselineDataFrame { .config(conf) .appName(SparkEBILinksToOaf.getClass.getSimpleName) .master(parser.get("master")).getOrCreate() - import spark.implicits._ val sc = spark.sparkContext + import spark.implicits._ - implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle]) - implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal]) - implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor]) - implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) + implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle]) + implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal]) + implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor]) + implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) downloadBaseLineUpdate(s"$workingPath/baseline", hdfsServerUri) - val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline",2000) - val ds:Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i =>{ + val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline_ftp", 2000) + val ds: Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i => { val xml = new XMLEventReader(Source.fromBytes(i._2.getBytes())) new PMParser(xml) - } )) + })) - ds.map(p => (p.getPmid,p))(Encoders.tuple(Encoders.STRING, PMEncoder)).groupByKey(_._1) + ds.map(p => (p.getPmid, p))(Encoders.tuple(Encoders.STRING, PMEncoder)).groupByKey(_._1) .agg(pmArticleAggregator.toColumn) .map(p => p._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset") val exported_dataset = spark.read.load(s"$workingPath/baseline_dataset").as[PMArticle] exported_dataset .map(a => PubMedToOaf.convert(a, vocabularies)).as[Result] - .filter(p => p!= null) + .filter(p => p != null) .write.mode(SaveMode.Overwrite).save(targetPath) } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkDownloadEBILinks.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala similarity index 70% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkDownloadEBILinks.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala index e940fdff0..578db1ea9 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkDownloadEBILinks.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala @@ -1,8 +1,8 @@ -package eu.dnetlib.dhp.sx.graph.ebi +package eu.dnetllib.dhp.sx.bio.ebi import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF.EBILinkItem -import eu.dnetlib.dhp.sx.graph.bio.pubmed.{PMArticle, PMAuthor, PMJournal} +import eu.dnetllib.dhp.sx.bio.BioDBToOAF.EBILinkItem +import eu.dnetllib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal} import org.apache.commons.io.IOUtils import org.apache.http.client.config.RequestConfig import org.apache.http.client.methods.HttpGet @@ -14,15 +14,15 @@ import org.slf4j.{Logger, LoggerFactory} object SparkDownloadEBILinks { - def createEBILinks(pmid:Long):EBILinkItem = { + def createEBILinks(pmid: Long): EBILinkItem = { val res = requestLinks(pmid) - if (res!=null) + if (res != null) return EBILinkItem(pmid, res) null } - def requestPage(url:String):String = { + def requestPage(url: String): String = { val r = new HttpGet(url) val timeout = 60; // seconds val config = RequestConfig.custom() @@ -56,10 +56,11 @@ object SparkDownloadEBILinks { } } - def requestLinks(PMID:Long):String = { + def requestLinks(PMID: Long): String = { requestPage(s"https://www.ebi.ac.uk/europepmc/webservices/rest/MED/$PMID/datalinks?format=json") } + def main(args: Array[String]): Unit = { val log: Logger = LoggerFactory.getLogger(getClass) @@ -76,9 +77,9 @@ object SparkDownloadEBILinks { import spark.implicits._ - implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle]) - implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal]) - implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor]) + implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle]) + implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal]) + implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor]) val sourcePath = parser.get("sourcePath") log.info(s"sourcePath -> $sourcePath") @@ -86,29 +87,29 @@ object SparkDownloadEBILinks { log.info(s"workingPath -> $workingPath") log.info("Getting max pubmedId where the links have been requested") - val links:Dataset[EBILinkItem] = spark.read.load(s"$sourcePath/ebi_links_dataset").as[EBILinkItem] - val lastPMIDRequested =links.map(l => l.id).select(max("value")).first.getLong(0) + val links: Dataset[EBILinkItem] = spark.read.load(s"$sourcePath/ebi_links_dataset").as[EBILinkItem] + val lastPMIDRequested = links.map(l => l.id).select(max("value")).first.getLong(0) log.info("Retrieving PMID to request links") val pubmed = spark.read.load(s"$sourcePath/baseline_dataset").as[PMArticle] pubmed.map(p => p.getPmid.toLong).where(s"value > $lastPMIDRequested").write.mode(SaveMode.Overwrite).save(s"$workingPath/id_to_request") - val pmidToReq:Dataset[Long] = spark.read.load(s"$workingPath/id_to_request").as[Long] + val pmidToReq: Dataset[Long] = spark.read.load(s"$workingPath/id_to_request").as[Long] val total = pmidToReq.count() - spark.createDataset(pmidToReq.rdd.repartition((total/MAX_ITEM_PER_PARTITION).toInt).map(pmid =>createEBILinks(pmid)).filter(l => l!= null)).write.mode(SaveMode.Overwrite).save(s"$workingPath/links_update") + spark.createDataset(pmidToReq.rdd.repartition((total / MAX_ITEM_PER_PARTITION).toInt).map(pmid => createEBILinks(pmid)).filter(l => l != null)).write.mode(SaveMode.Overwrite).save(s"$workingPath/links_update") - val updates:Dataset[EBILinkItem] =spark.read.load(s"$workingPath/links_update").as[EBILinkItem] + val updates: Dataset[EBILinkItem] = spark.read.load(s"$workingPath/links_update").as[EBILinkItem] links.union(updates).groupByKey(_.id) - .reduceGroups{(x,y) => - if (x == null || x.links ==null) + .reduceGroups { (x, y) => + if (x == null || x.links == null) y - if (y ==null || y.links ==null) + if (y == null || y.links == null) x if (x.links.length > y.links.length) - x + x else y }.map(_._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/links_final") diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala similarity index 67% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala index 1924d919e..0db469769 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala @@ -1,15 +1,14 @@ -package eu.dnetlib.dhp.sx.graph.ebi +package eu.dnetllib.dhp.sx.bio.ebi import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.Oaf -import eu.dnetlib.dhp.sx.graph.bio -import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF -import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF.EBILinkItem +import eu.dnetllib.dhp.sx.bio.BioDBToOAF +import eu.dnetllib.dhp.sx.bio.BioDBToOAF.EBILinkItem import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} + object SparkEBILinksToOaf { def main(args: Array[String]): Unit = { @@ -24,17 +23,17 @@ object SparkEBILinksToOaf { .appName(SparkEBILinksToOaf.getClass.getSimpleName) .master(parser.get("master")).getOrCreate() + + import spark.implicits._ val sourcePath = parser.get("sourcePath") log.info(s"sourcePath -> $sourcePath") val targetPath = parser.get("targetPath") log.info(s"targetPath -> $targetPath") + implicit val PMEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) - import spark.implicits._ - implicit val PMEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) + val ebLinks: Dataset[EBILinkItem] = spark.read.load(s"${sourcePath}_dataset").as[EBILinkItem].filter(l => l.links != null) - val ebLinks:Dataset[EBILinkItem] = spark.read.load(s"${sourcePath}_dataset").as[EBILinkItem].filter(l => l.links!= null) - - ebLinks.flatMap(j =>BioDBToOAF.parse_ebi_links(j.links)) + ebLinks.flatMap(j => BioDBToOAF.parse_ebi_links(j.links)) .filter(p => BioDBToOAF.EBITargetLinksFilter(p)) .flatMap(p => BioDBToOAF.convertEBILinksToOaf(p)) .write.mode(SaveMode.Overwrite).save(targetPath) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMArticle.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMArticle.java similarity index 97% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMArticle.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMArticle.java index 211cbcffb..305bb89be 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMArticle.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMArticle.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.sx.graph.bio.pubmed; +package eu.dnetllib.dhp.sx.bio.pubmed; import java.io.Serializable; import java.util.ArrayList; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMAuthor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMAuthor.java similarity index 92% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMAuthor.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMAuthor.java index ba69998c5..c89929981 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMAuthor.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMAuthor.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.sx.graph.bio.pubmed; +package eu.dnetllib.dhp.sx.bio.pubmed; import java.io.Serializable; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMGrant.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMGrant.java similarity index 93% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMGrant.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMGrant.java index 0c3fd4601..7df5dd5f2 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMGrant.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMGrant.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.sx.graph.bio.pubmed; +package eu.dnetllib.dhp.sx.bio.pubmed; public class PMGrant { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMJournal.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMJournal.java similarity index 94% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMJournal.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMJournal.java index d251354d4..6065416f8 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMJournal.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMJournal.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.sx.graph.bio.pubmed; +package eu.dnetllib.dhp.sx.bio.pubmed; import java.io.Serializable; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMParser.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMParser.scala similarity index 99% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMParser.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMParser.scala index 8744bdfb4..8fa226b7d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMParser.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMParser.scala @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.sx.graph.bio.pubmed +package eu.dnetllib.dhp.sx.bio.pubmed import scala.xml.MetaData import scala.xml.pull.{EvElemEnd, EvElemStart, EvText, XMLEventReader} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMSubject.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMSubject.java similarity index 94% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMSubject.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMSubject.java index 354b2cbe5..e6ab61b87 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PMSubject.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMSubject.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.sx.graph.bio.pubmed; +package eu.dnetllib.dhp.sx.bio.pubmed; public class PMSubject { private String value; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PubMedToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PubMedToOaf.scala similarity index 93% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PubMedToOaf.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PubMedToOaf.scala index 202eb7b14..a1777a230 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PubMedToOaf.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PubMedToOaf.scala @@ -1,11 +1,12 @@ -package eu.dnetlib.dhp.sx.graph.bio.pubmed +package eu.dnetllib.dhp.sx.bio.pubmed + import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.schema.common.ModelConstants -import eu.dnetlib.dhp.schema.oaf._ import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, IdentifierFactory, OafMapperUtils, PidType} +import eu.dnetlib.dhp.schema.oaf._ +import scala.collection.JavaConverters._ import java.util.regex.Pattern -import scala.collection.JavaConverters._ object PubMedToOaf { @@ -15,7 +16,7 @@ object PubMedToOaf { "doi" -> "https://dx.doi.org/" ) - def cleanDoi(doi:String):String = { + def cleanDoi(doi: String): String = { val regex = "^10.\\d{4,9}\\/[\\[\\]\\-\\<\\>._;()\\/:A-Z0-9]+$" @@ -71,14 +72,14 @@ object PubMedToOaf { if (article.getPublicationTypes == null) return null val i = new Instance - var pidList: List[StructuredProperty] = List(OafMapperUtils.structuredProperty(article.getPmid, PidType.pmid.toString, PidType.pmid.toString, ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, dataInfo)) + val pidList: List[StructuredProperty] = List(OafMapperUtils.structuredProperty(article.getPmid, PidType.pmid.toString, PidType.pmid.toString, ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, dataInfo)) if (pidList == null) return null - var alternateIdentifier :StructuredProperty = null + var alternateIdentifier: StructuredProperty = null if (article.getDoi != null) { val normalizedPid = cleanDoi(article.getDoi) - if (normalizedPid!= null) + if (normalizedPid != null) alternateIdentifier = OafMapperUtils.structuredProperty(normalizedPid, PidType.doi.toString, PidType.doi.toString, ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, dataInfo) } @@ -102,10 +103,10 @@ object PubMedToOaf { return result result.setDataInfo(dataInfo) i.setPid(pidList.asJava) - if (alternateIdentifier!= null) + if (alternateIdentifier != null) i.setAlternateIdentifier(List(alternateIdentifier).asJava) result.setInstance(List(i).asJava) - i.getPid.asScala.filter(p => "pmid".equalsIgnoreCase(p.getQualifier.getClassid)).map(p => p.getValue)(collection breakOut) + i.getPid.asScala.filter(p => "pmid".equalsIgnoreCase(p.getQualifier.getClassid)).map(p => p.getValue)(collection.breakOut) val urlLists: List[String] = pidList .map(s => (urlMap.getOrElse(s.getQualifier.getClassid, ""), s.getValue)) .filter(t => t._1.nonEmpty) @@ -136,7 +137,7 @@ object PubMedToOaf { } - val subjects: List[StructuredProperty] = article.getSubjects.asScala.map(s => OafMapperUtils.structuredProperty(s.getValue, SUBJ_CLASS, SUBJ_CLASS, ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES, dataInfo))(collection breakOut) + val subjects: List[StructuredProperty] = article.getSubjects.asScala.map(s => OafMapperUtils.structuredProperty(s.getValue, SUBJ_CLASS, SUBJ_CLASS, ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES, dataInfo))(collection.breakOut) if (subjects != null) result.setSubject(subjects.asJava) @@ -148,7 +149,7 @@ object PubMedToOaf { author.setFullname(a.getFullName) author.setRank(index + 1) author - }(collection breakOut) + }(collection.breakOut) if (authors != null && authors.nonEmpty) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/baseline_to_oaf_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/baseline_to_oaf_params.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/ebi_download_update.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_download_update.json similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/ebi_download_update.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_download_update.json diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/ebi_to_df_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_to_df_params.json similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/ebi_to_df_params.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/ebi_to_df_params.json diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/pubmed/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/pubmed/oozie_app/config-default.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/pubmed/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml similarity index 78% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/pubmed/oozie_app/workflow.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml index 914d1c2c7..f5a98ba5e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/pubmed/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml @@ -1,13 +1,9 @@ - + baselineWorkingPath the Baseline Working Path - - targetPath - the Target Path - isLookupUrl The IS lookUp service endopoint @@ -24,8 +20,8 @@ yarn cluster - Convert Baseline to Dataset - eu.dnetlib.dhp.sx.graph.ebi.SparkCreateBaselineDataFrame + Convert Baseline to OAF Dataset + eu.dnetllib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame dhp-graph-mapper-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -38,9 +34,10 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --workingPath${baselineWorkingPath} - --targetPath${targetPath} + --targetPath${baselineWorkingPath}/transformed --masteryarn --isLookupUrl${isLookupUrl} + --hdfsServerUri${nameNode} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/BioScholixTest.scala b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetllib/dhp/sx/bio/BioScholixTest.scala similarity index 92% rename from dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/BioScholixTest.scala rename to dhp-workflows/dhp-aggregation/src/test/java/eu/dnetllib/dhp/sx/bio/BioScholixTest.scala index 87279eb21..c072f149c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/BioScholixTest.scala +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetllib/dhp/sx/bio/BioScholixTest.scala @@ -1,13 +1,10 @@ -package eu.dnetlib.dhp.sx.graph.bio.pubmed +package eu.dnetllib.dhp.sx.bio import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature} -import eu.dnetlib.dhp.schema.common.ModelConstants -import eu.dnetlib.dhp.schema.oaf.utils.{CleaningFunctions, OafMapperUtils, PidType} +import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result} -import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF.ScholixResolved -import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF -import eu.dnetlib.dhp.sx.graph.bio.pubmed.PubMedToOaf.dataInfo -import eu.dnetlib.dhp.sx.graph.ebi.SparkDownloadEBILinks +import eu.dnetllib.dhp.sx.bio.BioDBToOAF.ScholixResolved +import eu.dnetllib.dhp.sx.bio.pubmed.{PMArticle, PMParser, PubMedToOaf} import org.json4s.DefaultFormats import org.json4s.JsonAST.{JField, JObject, JString} import org.json4s.jackson.JsonMethods.parse @@ -55,7 +52,7 @@ class BioScholixTest extends AbstractVocabularyTest{ @Test def testEBIData() = { - val inputXML = Source.fromInputStream(getClass.getResourceAsStream("pubmed.xml")).mkString + val inputXML = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/pubmed.xml")).mkString val xml = new XMLEventReader(Source.fromBytes(inputXML.getBytes())) new PMParser(xml).foreach(s =>println(mapper.writeValueAsString(s))) } @@ -65,7 +62,7 @@ class BioScholixTest extends AbstractVocabularyTest{ def testPubmedToOaf(): Unit = { assertNotNull(vocabularies) assertTrue(vocabularies.vocabularyExists("dnet:publication_resource")) - val records:String =Source.fromInputStream(getClass.getResourceAsStream("pubmed_dump")).mkString + val records:String =Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/pubmed_dump")).mkString val r:List[Oaf] = records.lines.toList.map(s=>mapper.readValue(s, classOf[PMArticle])).map(a => PubMedToOaf.convert(a, vocabularies)) assertEquals(10, r.size) assertTrue(r.map(p => p.asInstanceOf[Result]).flatMap(p => p.getInstance().asScala.map(i => i.getInstancetype.getClassid)).exists(p => "0037".equalsIgnoreCase(p))) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/crossref_links b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/crossref_links similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/crossref_links rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/crossref_links diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/ebi_links.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/ebi_links.gz similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/ebi_links.gz rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/ebi_links.gz diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed/ls_result b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/ls_result similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed/ls_result rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/ls_result diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pdb_dump b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pdb_dump similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pdb_dump rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pdb_dump diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed/pubmed.xml b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed/pubmed.xml rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed/pubmed_dump b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed_dump similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed/pubmed_dump rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/pubmed_dump diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/scholix_resolved b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/scholix_resolved similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/scholix_resolved rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/scholix_resolved diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/uniprot_dump b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/uniprot_dump similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/uniprot_dump rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/sx/graph/bio/uniprot_dump diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java index 0d7c74475..23e97a97a 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java @@ -69,7 +69,7 @@ public class PropagationConstant { PROPAGATION_DATA_INFO_TYPE, PROPAGATION_COUNTRY_INSTREPO_CLASS_ID, PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS)); + ModelConstants.DNET_PROVENANCE_ACTIONS)); return nc; } @@ -84,7 +84,8 @@ public class PropagationConstant { return di; } - public static Qualifier getQualifier(String inference_class_id, String inference_class_name, String qualifierSchema) { + public static Qualifier getQualifier(String inference_class_id, String inference_class_name, + String qualifierSchema) { Qualifier pa = new Qualifier(); pa.setClassid(inference_class_id); pa.setClassname(inference_class_name); @@ -108,7 +109,11 @@ public class PropagationConstant { r.setRelClass(rel_class); r.setRelType(rel_type); r.setSubRelType(subrel_type); - r.setDataInfo(getDataInfo(inference_provenance, inference_class_id, inference_class_name, ModelConstants.DNET_PROVENANCE_ACTIONS)); + r + .setDataInfo( + getDataInfo( + inference_provenance, inference_class_id, inference_class_name, + ModelConstants.DNET_PROVENANCE_ACTIONS)); return r; } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java index 68949b900..a38b4da2e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java @@ -173,14 +173,17 @@ public class SparkOrcidToResultFromSemRelJob { if (toaddpid) { StructuredProperty p = new StructuredProperty(); p.setValue(autoritative_author.getOrcid()); - p.setQualifier(getQualifier(ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES)); + p + .setQualifier( + getQualifier( + ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES)); p .setDataInfo( getDataInfo( PROPAGATION_DATA_INFO_TYPE, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS)); + ModelConstants.DNET_PROVENANCE_ACTIONS)); Optional> authorPid = Optional.ofNullable(author.getPid()); if (authorPid.isPresent()) { diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java index 1289ff644..50df08f8c 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java @@ -10,7 +10,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.common.ModelConstants; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -22,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Context; import eu.dnetlib.dhp.schema.oaf.Result; import scala.Tuple2; @@ -130,7 +130,7 @@ public class SparkResultToCommunityFromOrganizationJob { PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS))); + ModelConstants.DNET_PROVENANCE_ACTIONS))); propagatedContexts.add(newContext); } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java index 7f76ead94..f31a26230 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java @@ -7,7 +7,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import java.util.*; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.common.ModelConstants; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -20,6 +19,7 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; @@ -126,7 +126,7 @@ public class SparkResultToCommunityThroughSemRelJob { PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS))); + ModelConstants.DNET_PROVENANCE_ACTIONS))); return newContext; } return null; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/pangaea/SparkGeneratePanagaeaDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/pangaea/SparkGeneratePanagaeaDataset.scala index bf726cf59..79c75d6df 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/pangaea/SparkGeneratePanagaeaDataset.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/pangaea/SparkGeneratePanagaeaDataset.scala @@ -1,7 +1,6 @@ package eu.dnetlib.dhp.sx.graph.pangaea import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.sx.graph.ebi.SparkEBILinksToOaf import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java index 1c7dce3f2..64935e79d 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java @@ -84,13 +84,15 @@ public class IndexRecordTransformerTest { @Test public void testForEOSCFutureTraining() throws IOException, TransformerException { - final String record = IOUtils.toString(getClass().getResourceAsStream("eosc-future/training-notebooks-seadatanet.xml")); + final String record = IOUtils + .toString(getClass().getResourceAsStream("eosc-future/training-notebooks-seadatanet.xml")); testRecordTransformation(record); } @Test public void testForEOSCFutureAirQualityCopernicus() throws IOException, TransformerException { - final String record = IOUtils.toString(getClass().getResourceAsStream("eosc-future/air-quality-copernicus.xml")); + final String record = IOUtils + .toString(getClass().getResourceAsStream("eosc-future/air-quality-copernicus.xml")); testRecordTransformation(record); } @@ -102,12 +104,11 @@ public class IndexRecordTransformerTest { @Test public void testForEOSCFutureB2SharePlotRelatedORP() throws IOException, TransformerException { - final String record = IOUtils.toString(getClass().getResourceAsStream("eosc-future/b2share-plot-related-orp.xml")); + final String record = IOUtils + .toString(getClass().getResourceAsStream("eosc-future/b2share-plot-related-orp.xml")); testRecordTransformation(record); } - - private void testRecordTransformation(final String record) throws IOException, TransformerException { final String fields = IOUtils.toString(getClass().getResourceAsStream("fields.xml")); final String xslt = IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl"));