From 3010a362bc1142520d083dba09317e1296dc5f77 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Thu, 30 Jul 2020 09:25:56 +0200
Subject: [PATCH] updated the provision workflow in the aggregation phase;
 removed the JSON RDD serialization and switched to typed Spark Datasets

---
 .../eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala | 105 ++++++++++++++++
 .../dhp/sx/ebi/SparkAddLinkUpdates.scala      | 112 +++++++++++++++++-
 .../dhp/sx/ebi/SparkCreateEBIDataFrame.scala  |  63 +++++-----
 .../eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java |   3 +-
 .../sx/graph/SparkSplitOafTODLIEntities.scala |  91 ++++++++++++++
 .../dhp/sx/graph/SparkXMLToOAFDataset.scala   |  68 +++++++++++
 .../parser/DatasetScholexplorerParser.java    |   9 ++
 .../dhp/sx/ebi/oozie_app/config-default.xml   |  36 +++---
 .../dnetlib/dhp/sx/ebi/oozie_app/workflow.xml |  44 +++----
 .../input_extract_entities_parameters.json    |   7 +-
 .../dhp/sx/graph/step1/oozie_app/workflow.xml |  13 +-
 .../dhp/sx/graph/step2/oozie_app/workflow.xml |  44 ++-----
 12 files changed, 476 insertions(+), 119 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala
index 41fcd26362..be92b60ebe 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala
@@ -1,5 +1,6 @@
 package eu.dnetlib.dhp.sx.ebi
 import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown}
 import org.apache.spark.sql.{Encoder, Encoders}
 import org.apache.spark.sql.expressions.Aggregator
@@ -35,6 +36,88 @@ object EBIAggregator {
   }
 
+
+  def getDLIUnknownAggregator(): Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown] = new Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown]{
+
+    override def zero: DLIUnknown = new DLIUnknown()
+
+    override def reduce(b: DLIUnknown, a: (String, DLIUnknown)): DLIUnknown = {
+      b.mergeFrom(a._2)
+      if (b.getId == null)
+        b.setId(a._2.getId)
+      b
+    }
+
+    override def merge(wx: DLIUnknown, wy: DLIUnknown): DLIUnknown = {
+      wx.mergeFrom(wy)
+      if (wx.getId == null && wy.getId.nonEmpty)
+        wx.setId(wy.getId)
+      wx
+    }
+    override def finish(reduction: DLIUnknown): DLIUnknown = reduction
+
+    override def bufferEncoder: Encoder[DLIUnknown] =
+      Encoders.kryo(classOf[DLIUnknown])
+
+    override def outputEncoder: Encoder[DLIUnknown] =
+      Encoders.kryo(classOf[DLIUnknown])
+  }
+
+  def getDLIDatasetAggregator(): Aggregator[(String, DLIDataset), DLIDataset, DLIDataset] = new Aggregator[(String, DLIDataset), DLIDataset, DLIDataset]{
+
+    override def zero: DLIDataset = new DLIDataset()
+
+    override def reduce(b: DLIDataset, a: (String, DLIDataset)): DLIDataset = {
+      b.mergeFrom(a._2)
+      if (b.getId == null)
+        b.setId(a._2.getId)
+      b
+    }
+
+    override def merge(wx: DLIDataset, wy: DLIDataset): DLIDataset = {
+      wx.mergeFrom(wy)
+      if (wx.getId == null && wy.getId.nonEmpty)
+        wx.setId(wy.getId)
+      wx
+    }
+    override def finish(reduction: DLIDataset): DLIDataset = reduction
+
+    override def bufferEncoder: Encoder[DLIDataset] =
+      Encoders.kryo(classOf[DLIDataset])
+
+    override def outputEncoder: Encoder[DLIDataset] =
+      Encoders.kryo(classOf[DLIDataset])
+  }
+
+
+  def getDLIPublicationAggregator(): Aggregator[(String, DLIPublication), DLIPublication, DLIPublication] = new Aggregator[(String, DLIPublication), DLIPublication, DLIPublication]{
+
+    override def zero: DLIPublication = new DLIPublication()
+
+    override def reduce(b: DLIPublication, a: (String, DLIPublication)): DLIPublication = {
+      b.mergeFrom(a._2)
+      if (b.getId == null)
+        b.setId(a._2.getId)
+      b
+    }
+
+
+    override def merge(wx: DLIPublication, wy: DLIPublication): DLIPublication = {
+      wx.mergeFrom(wy)
+      if (wx.getId == null && wy.getId.nonEmpty)
+        wx.setId(wy.getId)
+      wx
+    }
+    override def finish(reduction: DLIPublication): DLIPublication = reduction
+
+    override def bufferEncoder: Encoder[DLIPublication] =
+      Encoders.kryo(classOf[DLIPublication])
+
+    override def outputEncoder: Encoder[DLIPublication] =
+      Encoders.kryo(classOf[DLIPublication])
+  }
+
+
   def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication]{
 
     override def zero: Publication = new Publication()
@@ -85,5 +168,27 @@ object EBIAggregator {
   }
 
+  def getDLIRelationAggregator(): Aggregator[(String, DLIRelation), DLIRelation, DLIRelation] = new Aggregator[(String, DLIRelation), DLIRelation, DLIRelation]{
+
+    override def zero: DLIRelation = new DLIRelation()
+
+    override def reduce(b: DLIRelation, a: (String, DLIRelation)): DLIRelation = {
+      a._2
+    }
+
+
+    override def merge(a: DLIRelation, b: DLIRelation): DLIRelation = {
+      if (b != null) b else a
+    }
+    override def finish(reduction: DLIRelation): DLIRelation = reduction
+
+    override def bufferEncoder: Encoder[DLIRelation] =
+      Encoders.kryo(classOf[DLIRelation])
+
+    override def outputEncoder: Encoder[DLIRelation] =
+      Encoders.kryo(classOf[DLIRelation])
+  }
+
+
 }
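Usage note: each aggregator above is meant to be plugged into a typed Dataset through groupByKey/agg, which is exactly how the rest of this patch consumes them. A minimal sketch, assuming an already-loaded Dataset[DLIPublication] named publications (that name is an assumption here; the encoder setup mirrors SparkCreateEBIDataFrame below):

    import org.apache.spark.sql.{Dataset, Encoder, Encoders}
    import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication
    import eu.dnetlib.dhp.sx.ebi.EBIAggregator

    // Collapse duplicate publications: records sharing an id are merged via mergeFrom.
    implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
    val merged: Dataset[DLIPublication] = publications
      .map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, pubEncoder))
      .groupByKey(_._1)(Encoders.STRING)
      .agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
      .map(_._2)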
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala
index 897bbd5407..2d3f75b918 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala
@@ -1,8 +1,9 @@
 package eu.dnetlib.dhp.sx.ebi
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{Instance, KeyValue, Oaf}
+import eu.dnetlib.dhp.schema.oaf.{Author, Instance, Journal, KeyValue, Oaf, Publication, Dataset => OafDataset}
 import eu.dnetlib.dhp.schema.scholexplorer.OafUtils.createQualifier
-import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIRelation, OafUtils, ProvenaceInfo}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, OafUtils, ProvenaceInfo}
+import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal}
 import eu.dnetlib.dhp.utils.DHPUtils
 import eu.dnetlib.scholexplorer.relation.RelationMapper
 import org.apache.commons.io.IOUtils
@@ -12,6 +13,7 @@ import org.json4s
 import org.json4s.DefaultFormats
 import org.json4s.JsonAST.{JField, JObject, JString}
 import org.json4s.jackson.JsonMethods.parse
+import org.apache.spark.sql.functions._
 
 import scala.collection.JavaConverters._
 
@@ -28,6 +30,64 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
 }
 
+
+  def journalToOAF(pj:PMJournal): Journal = {
+    val j = new Journal
+    j.setIssnPrinted(pj.getIssn)
+    j.setVol(pj.getVolume)
+    j.setName(pj.getTitle)
+    j.setIss(pj.getIssue)
+    j.setDataInfo(OafUtils.generateDataInfo())
+    j
+  }
+
+
+  def pubmedTOPublication(input:PMArticle):DLIPublication = {
+
+    val dnetPublicationId = s"50|${DHPUtils.md5(s"${input.getPmid}::pmid")}"
+
+    val p = new DLIPublication
+    p.setId(dnetPublicationId)
+    p.setDataInfo(OafUtils.generateDataInfo())
+    p.setPid(List(OafUtils.createSP(input.getPmid.toLowerCase.trim, "pmid", "dnet:pid_types")).asJava)
+    p.setCompletionStatus("complete")
+    val pi = new ProvenaceInfo
+    pi.setId("dli_________::europe_pmc__")
+    pi.setName("Europe PMC")
+    pi.setCompletionStatus("complete")
+    pi.setCollectionMode("collected")
+    p.setDlicollectedfrom(List(pi).asJava)
+    p.setCollectedfrom(List(generatePubmedDLICollectedFrom()).asJava)
+
+    if (input.getAuthors != null && input.getAuthors.size() > 0) {
+      var aths: List[Author] = List()
+      input.getAuthors.asScala.filter(a => a != null).foreach(a => {
+        val c = new Author
+        c.setFullname(a.getFullName)
+        c.setName(a.getForeName)
+        c.setSurname(a.getLastName)
+        aths = aths ::: List(c)
+      })
+      if (aths.nonEmpty)
+        p.setAuthor(aths.asJava)
+    }
+
+
+    if (input.getJournal != null)
+      p.setJournal(journalToOAF(input.getJournal))
+    p.setTitle(List(OafUtils.createSP(input.getTitle, "main title", "dnet:dataCite_title")).asJava)
+    p.setDateofacceptance(OafUtils.asField(input.getDate))
+    val i = new Instance
+    i.setCollectedfrom(generatePubmedDLICollectedFrom())
+    i.setDateofacceptance(p.getDateofacceptance)
+    i.setUrl(List(s"https://pubmed.ncbi.nlm.nih.gov/${input.getPmid}").asJava)
+    i.setInstancetype(createQualifier("0001", "Article", "dnet:publication_resource", "dnet:publication_resource"))
+    p.setInstance(List(i).asJava)
+    p
+  }
+
+
 def ebiLinksToOaf(input:(String, String)):List[Oaf] = {
   val pmid :String = input._1
   val input_json :String = input._2
@@ -116,8 +176,16 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
     val workingPath = parser.get("workingPath")
 
     implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
+    implicit val oafpubEncoder: Encoder[Publication] = Encoders.kryo[Publication]
     implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation])
     implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
+    implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
+    implicit val atEncoder: Encoder[Author] = Encoders.kryo(classOf[Author])
+    implicit val strEncoder: Encoder[String] = Encoders.STRING
+    implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
+    implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
+    implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
+
     val ds:Dataset[(String,String)] = spark.read.load(s"$workingPath/baseline_links_updates").as[(String,String)](Encoders.tuple(Encoders.STRING, Encoders.STRING))
 
@@ -133,6 +201,46 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
 
     oDataset.filter(p => p.isInstanceOf[DLIDataset]).map(p => p.asInstanceOf[DLIDataset]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_dataset")
 
+    val idPublicationSolved:Dataset[String] = spark.read.load(s"$workingPath/baseline_links_updates").where(col("links").isNotNull).select("pmid").as[String]
+    val baseline:Dataset[(String, PMArticle)] = spark.read.load(s"$workingPath/baseline_dataset").as[PMArticle].map(p => (p.getPmid, p))(Encoders.tuple(strEncoder,PMEncoder))
+    idPublicationSolved.joinWith(baseline, idPublicationSolved("pmid").equalTo(baseline("_1"))).map(k => pubmedTOPublication(k._2._2)).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_publication")
+
+
+    val pmaDatasets = spark.read.load("/user/sandro.labruzzo/scholix/EBI/ebi_garr/baseline_dataset").as[PMArticle]
+
+    pmaDatasets.map(p => pubmedTOPublication(p)).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_publication_all")
+
+    val pubs: Dataset[(String,Publication)] = spark.read.load("/user/sandro.labruzzo/scholix/EBI/publication").as[Publication].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING,oafpubEncoder))
+    val pubdate:Dataset[(String,DLIPublication)] = spark.read.load(s"$workingPath/baseline_publication_all").as[DLIPublication].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING,pubEncoder))
+
+
+    pubs.joinWith(pubdate, pubs("_1").equalTo(pubdate("_1"))).map(k => k._2._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_publication_ebi")
+
+
+    val dt : Dataset[DLIDataset] = spark.read.load(s"$workingPath/dataset").as[DLIDataset]
+    val update : Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi_garr/baseline_links_updates_dataset").as[DLIDataset]
+
+
+    dt.union(update).map(d => (d.getId,d))(Encoders.tuple(Encoders.STRING, datEncoder))
+      .groupByKey(_._1)(Encoders.STRING)
+      .agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
+      .map(p => p._2)
+      .write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset_ebi")
+
+
+    val rel: Dataset[DLIRelation] = spark.read.load(s"$workingPath/relation").as[DLIRelation]
+    val relupdate : Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi_garr/baseline_links_updates_relation").as[DLIRelation]
+
+
+    rel.union(relupdate)
+      .map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
+      .groupByKey(_._1)(Encoders.STRING)
+      .agg(EBIAggregator.getDLIRelationAggregator().toColumn)
+      .map(p => p._2)
+      .write.mode(SaveMode.Overwrite)
+      .save(s"$workingPath/baseline_relation_ebi")
   }
 }
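For reference, a rough sketch of what pubmedTOPublication above emits for a single record. The PMArticle setters are assumed to be the plain bean setters matching the getters used in the patch, and all values are made up:

    val article = new PMArticle
    article.setPmid("28767126")            // hypothetical pmid
    article.setTitle("An example title")   // hypothetical title
    article.setDate("2017-08-01")          // hypothetical date
    val pub = pubmedTOPublication(article)
    // pub.getId == "50|" + md5("28767126::pmid"), the dnet publication id
    // pub.getPid holds one structured property of type "pmid"
    // the single Instance URL is https://pubmed.ncbi.nlm.nih.gov/28767126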
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala
index 60857f0fcf..008e3c99b4 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala
@@ -2,6 +2,7 @@ package eu.dnetlib.dhp.sx.ebi
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
 import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
 import eu.dnetlib.scholexplorer.relation.RelationMapper
 import org.apache.commons.io.IOUtils
@@ -10,6 +11,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
 import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
 import org.slf4j.{Logger, LoggerFactory}
+
 import scala.collection.JavaConverters._
 
 object SparkCreateEBIDataFrame {
@@ -34,54 +36,51 @@ object SparkCreateEBIDataFrame {
     val relationMapper = RelationMapper.load
 
     implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
-    implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])
-    implicit val pubEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])
-    implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
+    implicit val datasetEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
+    implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
+    implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation])
 
-    logger.info("Extract Publication and relation from publication_xml")
-    val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
-      {
-        new ObjectMapper().readValue(s, classOf[String])
-      }).flatMap(s => {
-      val d = new PublicationScholexplorerParser
-      d.parseObject(s, relationMapper).asScala.iterator})
+//    logger.info("Extract Publication and relation from publication_xml")
+//    val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
+//      {
+//        new ObjectMapper().readValue(s, classOf[String])
+//      }).flatMap(s => {
+//      val d = new PublicationScholexplorerParser
+//      d.parseObject(s, relationMapper).asScala.iterator})
+//
+//    val mapper = new ObjectMapper()
+//    mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
+//    spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf")
+//
+//    logger.info("Extract Publication and relation from dataset_xml")
+//    val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s =>
+//      {
+//        new ObjectMapper().readValue(s, classOf[String])
+//      }).flatMap(s => {
+//      val d = new DatasetScholexplorerParser
+//      d.parseObject(s, relationMapper).asScala.iterator})
 
-    val mapper = new ObjectMapper()
-    mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
-    spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf")
-
-    logger.info("Extract Publication and relation from dataset_xml")
-    val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s =>
-      {
-        new ObjectMapper().readValue(s, classOf[String])
-      }).flatMap(s => {
-      val d = new DatasetScholexplorerParser
-      d.parseObject(s, relationMapper).asScala.iterator})
-
-    spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")
-    val dataset: Dataset[OafDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[OafDataset]).map(d => d.asInstanceOf[OafDataset])
-    val publication: Dataset[Publication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Publication]).map(d => d.asInstanceOf[Publication])
-    val relations: Dataset[Relation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Relation]).map(d => d.asInstanceOf[Relation])
+//    spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")
+    val dataset: Dataset[DLIDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIDataset]).map(d => d.asInstanceOf[DLIDataset])
+    val publication: Dataset[DLIPublication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIPublication]).map(d => d.asInstanceOf[DLIPublication])
+    val relations: Dataset[DLIRelation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIRelation]).map(d => d.asInstanceOf[DLIRelation])
+
     publication.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
       .groupByKey(_._1)(Encoders.STRING)
-      .agg(EBIAggregator.getPublicationAggregator().toColumn)
+      .agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
       .map(p => p._2)
       .write.mode(SaveMode.Overwrite).save(s"$workingPath/publication")
 
     dataset.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datasetEncoder))
      .groupByKey(_._1)(Encoders.STRING)
-      .agg(EBIAggregator.getDatasetAggregator().toColumn)
+      .agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
       .map(p => p._2)
       .write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset")
 
     relations.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
       .groupByKey(_._1)(Encoders.STRING)
-      .agg(EBIAggregator.getRelationAggregator().toColumn)
+      .agg(EBIAggregator.getDLIRelationAggregator().toColumn)
       .map(p => p._2)
       .write.mode(SaveMode.Overwrite).save(s"$workingPath/relation")
-
-
-    relations.map(r => (r.getSource, r.getTarget))(Encoders.tuple(Encoders.STRING,Encoders.STRING))
   }
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java
index 4a21985424..e27c9adaad 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java
@@ -25,7 +25,8 @@ public class PMAuthor implements Serializable {
 	}
 
 	public String getFullName() {
-		return String.format("%s, %s", this.foreName, this.lastName);
+		return String
+			.format("%s, %s", this.foreName != null ? this.foreName : "", this.lastName != null ? this.lastName : "");
 	}
 
 }
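The PMAuthor change above guards against null name components when building the full name. Behaviourally, sketched in Scala for brevity (the name is made up):

    def fullName(foreName: String, lastName: String): String =
      String.format("%s, %s",
        if (foreName != null) foreName else "",
        if (lastName != null) lastName else "")

    // before the patch: fullName(null, "Rossi") -> "null, Rossi"
    // after the patch:  fullName(null, "Rossi") -> ", Rossi"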
spark.read.load(s"$workingPath/OAFDataset").as[Oaf] + + + OAFDataset + .filter(s => s != null && s.isInstanceOf[DLIPublication]) + .map(s =>s.asInstanceOf[DLIPublication]) + .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getDLIPublicationAggregator().toColumn) + .map(p => p._2) + .repartition(1000) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/publication") + + OAFDataset + .filter(s => s != null && s.isInstanceOf[DLIDataset]) + .map(s =>s.asInstanceOf[DLIDataset]) + .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getDLIDatasetAggregator().toColumn) + .map(p => p._2) + .repartition(1000) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset") + + + OAFDataset + .filter(s => s != null && s.isInstanceOf[DLIUnknown]) + .map(s =>s.asInstanceOf[DLIUnknown]) + .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, unkEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getDLIUnknownAggregator().toColumn) + .map(p => p._2) + .repartition(1000) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/unknown") + + + OAFDataset + .filter(s => s != null && s.isInstanceOf[DLIRelation]) + .map(s =>s.asInstanceOf[DLIRelation]) + .map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getDLIRelationAggregator().toColumn) + .map(p => p._2) + .repartition(1000) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/relation") + + + + + + + } + } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala index 848b23253d..f74a6043b7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala @@ -1,5 +1,73 @@ package eu.dnetlib.dhp.sx.graph +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.Oaf +import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation} +import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser} +import eu.dnetlib.scholexplorer.relation.RelationMapper +import org.apache.commons.io.IOUtils +import org.apache.hadoop.io.{IntWritable, Text} +import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.LoggerFactory + +import scala.collection.JavaConverters._ + + +/** + * This new version of the Job read a sequential File containing XML stored in the aggregator and generates a Dataset OAF of heterogeneous + * entities like Dataset, Relation, Publication and Unknown + */ + object SparkXMLToOAFDataset { + + def main(args: Array[String]): Unit = { + val logger = LoggerFactory.getLogger(SparkXMLToOAFDataset.getClass) + val conf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkXMLToOAFDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_graph_scholix_parameters.json"))) + parser.parseArgument(args) + val spark = + SparkSession + .builder() + .config(conf) + .appName(SparkXMLToOAFDataset.getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + val sc = 
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala
index 848b23253d..f74a6043b7 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala
@@ -1,5 +1,73 @@
 package eu.dnetlib.dhp.sx.graph
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.Oaf
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
+import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
+import eu.dnetlib.scholexplorer.relation.RelationMapper
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+
+/**
+  * This new version of the job reads a sequence file containing the XML records stored in the aggregator
+  * and generates a Dataset of heterogeneous OAF entities: Dataset, Relation, Publication and Unknown.
+  */
 object SparkXMLToOAFDataset {
+
+  def main(args: Array[String]): Unit = {
+    val logger = LoggerFactory.getLogger(SparkXMLToOAFDataset.getClass)
+    val conf = new SparkConf()
+    val parser = new ArgumentApplicationParser(IOUtils.toString(SparkXMLToOAFDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_graph_scholix_parameters.json")))
+    parser.parseArgument(args)
+    val spark =
+      SparkSession
+        .builder()
+        .config(conf)
+        .appName(SparkXMLToOAFDataset.getClass.getSimpleName)
+        .master(parser.get("master")).getOrCreate()
+
+    val sc = spark.sparkContext
+
+    implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
+    implicit val datasetEncoder:Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
+    implicit val publicationEncoder:Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
+    implicit val relationEncoder:Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
+
+    val relationMapper = RelationMapper.load
+
+    val inputPath: String = parser.get("sourcePath")
+    val entity: String = parser.get("entity")
+    val targetPath = parser.get("targetPath")
+
+    logger.info(s"Input path is $inputPath")
+    logger.info(s"Entity path is $entity")
+    logger.info(s"Target Path is $targetPath")
+
+    val scholixRdd:RDD[Oaf] = sc.sequenceFile(inputPath, classOf[IntWritable], classOf[Text])
+      .map(s => s._2.toString)
+      .flatMap(s => {
+        entity match {
+          case "publication" =>
+            val p = new PublicationScholexplorerParser
+            val l = p.parseObject(s, relationMapper)
+            if (l != null) l.asScala else List()
+          case "dataset" =>
+            val d = new DatasetScholexplorerParser
+            val l = d.parseObject(s, relationMapper)
+            if (l != null) l.asScala else List()
+        }
+      }).filter(s => s != null)
+    spark.createDataset(scholixRdd).write.mode(SaveMode.Append).save(targetPath)
+
+  }
+
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java
index 60371fa536..11d9905ccd 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java
@@ -317,6 +317,15 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
 					.collect(Collectors.toList()));
 		}
 
+		// TERRIBLE HACK TO AVOID EMPTY COLLECTED FROM
+		if (parsedObject.getDlicollectedfrom() == null) {
+
+			final KeyValue cf = new KeyValue();
+			cf.setKey("dli_________::europe_pmc__");
+			cf.setValue("Europe PMC");
+			parsedObject.setCollectedfrom(Collections.singletonList(cf));
+		}
+
 		if (StringUtils.isNotBlank(resolvedURL)) {
 			Instance i = new Instance();
 			i.setCollectedfrom(parsedObject.getCollectedfrom().get(0));
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml
index cac3cc2bb0..17cd6c9a33 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml
@@ -1,7 +1,7 @@
-
+
-    <property>
-        <name>jobTracker</name>
-        <value>yarn</value>
-    </property>
-    <property>
-        <name>nameNode</name>
-        <value>hdfs://hadoop-rm1.garr-pa1.d4science.org:8020</value>
-    </property>
-    <property>
-        <name>hive_metastore_uris</name>
-        <value>thrift://hadoop-edge3.garr-pa1.d4science.org:9083</value>
-    </property>
-    <property>
-        <name>spark2YarnHistoryServerAddress</name>
-        <value>http://hadoop-rm2.garr-pa1.d4science.org:19888</value>
-    </property>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
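The new SparkXMLToOAFDataset job above reads its parameters (master, sourcePath, entity, targetPath) from input_graph_scholix_parameters.json, which is referenced in the code but not shown in this patch. A hypothetical invocation using the argument names visible in the job and in the step1 workflow below; paths are made up:

    SparkXMLToOAFDataset.main(Array(
      "-mt", "local[*]",
      "--sourcePath", "/tmp/scholix/dataset_sequence_file",  // hypothetical path
      "--entity", "dataset",
      "--targetPath", "/tmp/scholix_working_dir/input/OAFDataset"
    ))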
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml
index a5035c56c6..7e63362428 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml
@@ -18,7 +18,7 @@
-
+
@@ -48,6 +48,28 @@
+
+
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>Create EBI DataSet</name>
+            <class>eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>--executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=1000 ${sparkExtraOPT}</spark-opts>
+            <arg>--workingPath</arg><arg>${workingPath}</arg>
+            <arg>--master</arg><arg>yarn</arg>
+        </spark>
+
+
+
+
             <master>yarn-cluster</master>
@@ -71,27 +93,7 @@
-
-
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>Create EBI DataSet</name>
-            <class>eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>--executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=1000 ${sparkExtraOPT}</spark-opts>
-            <arg>--workingPath</arg><arg>${workingPath}</arg>
-            <arg>--master</arg><arg>yarn</arg>
-        </spark>
-
-
-
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json
index 1c02109d01..febcfc8981 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json
@@ -1,7 +1,4 @@
 [
-    {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
-    {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
-    {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the result data", "paramRequired": true},
-    {"paramName":"td", "paramLongName":"targetDir", "paramDescription": "the name of the result data", "paramRequired": true},
-    {"paramName":"e", "paramLongName":"entities", "paramDescription": "the entity type to be filtered", "paramRequired": true}
+  {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+  {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the work dir path", "paramRequired": true}
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml
index d74d68663f..c94394b1ee 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml
@@ -101,12 +101,17 @@
             <master>yarn-cluster</master>
             <mode>cluster</mode>
             <name>Import ${entity} and related entities</name>
-            <class>eu.dnetlib.dhp.sx.graph.SparkScholexplorerGraphImporter</class>
+            <class>eu.dnetlib.dhp.sx.graph.SparkXMLToOAFDataset</class>
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}</spark-opts>
-            <arg>-mt</arg> <arg>yarn-cluster</arg>
+            <spark-opts>
+                --executor-memory ${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                ${sparkExtraOPT}
+            </spark-opts>
+            <arg>-mt</arg> <arg>yarn</arg>
             <arg>--sourcePath</arg><arg>${targetXMLPath}</arg>
-            <arg>--targetPath</arg><arg>${targetEntityPath}</arg>
+            <arg>--targetPath</arg><arg>${workingPath}/input/OAFDataset</arg>
             <arg>--entity</arg><arg>${entity}</arg>
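Design note on the step1 change above: the import action is run once per entity type, and SparkXMLToOAFDataset writes with SaveMode.Append, so successive runs accumulate into the single ${workingPath}/input/OAFDataset store that step2 then splits. A sketch of the driver-side view, with hypothetical resolved paths:

    // Hypothetical equivalent of running the step1 action for both entities:
    Seq("publication", "dataset").foreach { entity =>
      SparkXMLToOAFDataset.main(Array(
        "-mt", "yarn",
        "--sourcePath", s"/data/scholix/xml/$entity",           // hypothetical source
        "--targetPath", "/data/scholix/working_dir/input/OAFDataset",
        "--entity", entity))
    }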
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml
index 46e2dc3f98..fabe7510bb 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml
@@ -1,16 +1,8 @@
-        <property>
-            <name>sourcePath</name>
-            <description>the source path</description>
-        </property>
-        <property>
-            <name>targetPath</name>
-            <description>the source path</description>
-        </property>
-        <property>
-            <name>targetDir</name>
-            <description>the name of the path</description>
+        <property>
+            <name>workingPath</name>
+            <description>the working path</description>
         </property>
         <property>
             <name>sparkDriverMemory</name>
             <description>memory for driver process</description>
@@ -20,32 +12,13 @@
         <property>
             <name>sparkExecutorMemory</name>
             <description>memory for individual executor</description>
         </property>
-        <property>
-            <name>entities</name>
-            <description>the entities to be extracted</description>
-        </property>
+
     <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
             <name-node>${nameNode}</name-node>
             <master>yarn-cluster</master>
             <mode>cluster</mode>
-            <name>Extract ${entities}</name>
-            <class>eu.dnetlib.dhp.sx.graph.SparkExtractEntitiesJob</class>
+            <name>Extract DLI Entities</name>
+            <class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
             <spark-opts>
                 --executor-memory ${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=3840
                 ${sparkExtraOPT}
             </spark-opts>
             <arg>-mt</arg> <arg>yarn-cluster</arg>
-            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
-            <arg>--targetPath</arg><arg>${targetPath}</arg>
-            <arg>--targetDir</arg><arg>${targetDir}</arg>
-            <arg>--entities</arg><arg>${entities}</arg>
+            <arg>--workingPath</arg><arg>${workingPath}</arg>
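End to end, step2 now consumes the appended OAFDataset store and writes the four deduplicated stores (publication, dataset, unknown, relation). Publications, datasets and unknowns are keyed by id, while relations are keyed by source::relType::target; a sketch of that key and of the relation aggregator's semantics, with hypothetical identifiers:

    // Relations sharing the same composite key collapse to a single record:
    // reduce() keeps the incoming element, merge() prefers the non-null buffer.
    def key(r: DLIRelation): String = s"${r.getSource}::${r.getRelType}::${r.getTarget}"
    // e.g. key(r) == "50|pmid::abc...::isRelatedTo::60|doi::def..."  // hypothetical ids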