diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateBaselineDataFrame.scala
index 1cb49ab591..d633a3fdc3 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateBaselineDataFrame.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateBaselineDataFrame.scala
@@ -6,7 +6,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
 import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal, PMParser}
-
+import org.apache.spark.sql.expressions.Aggregator
 import scala.io.Source
 import scala.xml.pull.XMLEventReader
 
@@ -14,6 +14,26 @@ import scala.xml.pull.XMLEventReader
 
 object SparkCreateBaselineDataFrame {
 
+  // Keeps, for each grouping key, the first article encountered with a non-null PMID.
+  val pmArticleAggregator: Aggregator[(String, PMArticle), PMArticle, PMArticle] = new Aggregator[(String, PMArticle), PMArticle, PMArticle] with Serializable {
+    override def zero: PMArticle = new PMArticle
+
+    override def reduce(b: PMArticle, a: (String, PMArticle)): PMArticle = {
+      if (b != null && b.getPmid != null) b else a._2
+    }
+
+    override def merge(b1: PMArticle, b2: PMArticle): PMArticle = {
+      if (b1 != null && b1.getPmid != null) b1 else b2
+    }
+
+    override def finish(reduction: PMArticle): PMArticle = reduction
+
+    override def bufferEncoder: Encoder[PMArticle] = Encoders.kryo[PMArticle]
+
+    override def outputEncoder: Encoder[PMArticle] = Encoders.kryo[PMArticle]
+  }
+
+
   def main(args: Array[String]): Unit = {
     val conf: SparkConf = new SparkConf()
     val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json")))
@@ -24,6 +44,8 @@ object SparkCreateBaselineDataFrame {
       .config(conf)
       .appName(SparkCreateEBIDataFrame.getClass.getSimpleName)
       .master(parser.get("master")).getOrCreate()
+    import spark.implicits._
+
     val sc = spark.sparkContext
 
 
@@ -39,10 +61,8 @@ object SparkCreateBaselineDataFrame {
       }
     ))
 
-    ds.write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset")
-
-
-
-
+    ds.map(p => (p.getPmid, p))(Encoders.tuple(Encoders.STRING, PMEncoder)).groupByKey(_._1)
+      .agg(pmArticleAggregator.toColumn)
+      .map(p => p._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset")
   }
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMParser.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMParser.scala
index eb9863a60b..6fd35610bd 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMParser.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMParser.scala
@@ -115,7 +115,6 @@ class PMParser(xml:XMLEventReader) extends Iterator[PMArticle] {
         case "LastName" => {
           if (currentAuthor != null)
             currentAuthor.setLastName(text.trim)
-
         }
         case "ForeName" => if (currentAuthor != null)
           currentAuthor.setForeName(text.trim)
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pubmed/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pubmed/oozie_app/workflow.xml
index a940220dab..594c864d3f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pubmed/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pubmed/oozie_app/workflow.xml
@@ -24,6 +24,7 @@
             --executor-cores=${sparkExecutorCores}
             --driver-memory=${sparkDriverMemory}
            --conf spark.extraListeners=${spark2ExtraListeners}
+            --conf spark.sql.shuffle.partitions=2000
             --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
             --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBI.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBI.scala
index 098291a325..a32895314c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBI.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBI.scala
@@ -1,6 +1,9 @@
 package eu.dnetlib.dhp.sx.ebi
 import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.SerializationFeature
+
+
 import eu.dnetlib.dhp.sx.ebi.model.PMParser
 import org.junit.jupiter.api.Test
 
 
@@ -13,10 +16,12 @@ class TestEBI {
 
   @Test
   def testEBIData() = {
+
+    val mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT)
     val inputXML = Source.fromInputStream(getClass.getResourceAsStream("pubmed.xml")).mkString
     val xml = new XMLEventReader(Source.fromBytes(inputXML.getBytes()))
-    val mapper = new ObjectMapper()
+
     new PMParser(xml).foreach(s => println(mapper.writeValueAsString(s)))
 
   }
 }
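Note: below is a minimal, self-contained sketch of the deduplication pattern the new pmArticleAggregator implements (map each record to a (pmid, record) pair, groupByKey on the PMID, then keep one record per key). The Article case class, the DedupByPmidSketch object, and the sample data are hypothetical stand-ins for PMArticle and the production job; this illustrates the technique under those assumptions, it is not the patched code itself.

    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
    import org.apache.spark.sql.expressions.Aggregator

    // Hypothetical stand-in for PMArticle, to keep the sketch self-contained.
    case class Article(pmid: String, title: String)

    object DedupByPmidSketch {

      // Same shape as pmArticleAggregator: keep the buffered article once it
      // carries a PMID, otherwise fall back to the incoming one.
      val firstWithPmid: Aggregator[(String, Article), Article, Article] =
        new Aggregator[(String, Article), Article, Article] with Serializable {
          override def zero: Article = Article(null, null)
          override def reduce(b: Article, a: (String, Article)): Article =
            if (b != null && b.pmid != null) b else a._2
          override def merge(b1: Article, b2: Article): Article =
            if (b1 != null && b1.pmid != null) b1 else b2
          override def finish(reduction: Article): Article = reduction
          override def bufferEncoder: Encoder[Article] = Encoders.kryo[Article]
          override def outputEncoder: Encoder[Article] = Encoders.kryo[Article]
        }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("dedup-by-pmid-sketch")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        // PMID "1" appears twice; after aggregation each key yields exactly one article.
        val ds = Seq(Article("1", "first"), Article("1", "duplicate"), Article("2", "second")).toDS()

        val deduped = ds.map(a => (a.pmid, a))
          .groupByKey(_._1)
          .agg(firstWithPmid.toColumn)
          .map(_._2)

        deduped.collect().foreach(println) // prints one Article per PMID

        spark.stop()
      }
    }

As in the patch, the buffer and output encoders are Kryo-based: PMArticle is a plain Java bean with no product encoder, so Encoders.kryo is what lets it flow through the typed aggregation (the case class here would also work with the implicit product encoder, but the sketch mirrors the patched code).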