From 7387416e909bad95b95b2303f5b6bdff76e47749 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Tue, 12 Oct 2021 12:36:30 +0200
Subject: [PATCH] added param skipUpdate to direct transform in OAF; this
 should be set to true in production

---
 .../ebi/SparkCreateBaselineDataFrame.scala   | 28 ++++++++++---------
 .../sx/bio/ebi/baseline_to_oaf_params.json   |  1 +
 .../dhp/sx/bio/pubmed/oozie_app/workflow.xml |  6 ++++
 3 files changed, 22 insertions(+), 13 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
index 2fc9623a8..17bf3fa6b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
@@ -32,7 +32,7 @@ object SparkCreateBaselineDataFrame {
       val start = l.indexOf("<a href=\"")

       if (start >= 0 && end > start)
-        l.substring(start + 9, (end - start))
+        l.substring(start + 9, end - start)
       else
         ""
     }.filter(s => s.endsWith(".gz")).filter(s => s > maxFile).map(s => (s, s"https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/$s")).toList
@@ -158,6 +158,9 @@ object SparkCreateBaselineDataFrame {
     val hdfsServerUri = parser.get("hdfsServerUri")
     log.info("hdfsServerUri: {}", hdfsServerUri)

+    val skipUpdate = parser.get("skipUpdate")
+    log.info("skipUpdate: {}", skipUpdate)
+
     val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
     val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)

@@ -176,18 +179,17 @@ object SparkCreateBaselineDataFrame {
     implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
     implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])

-    downloadBaseLineUpdate(s"$workingPath/baseline", hdfsServerUri)
-
-    val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline", 2000)
-    val ds: Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i => {
-      val xml = new XMLEventReader(Source.fromBytes(i._2.getBytes()))
-      new PMParser(xml)
-
-    }))
-
-    ds.map(p => (p.getPmid, p))(Encoders.tuple(Encoders.STRING, PMEncoder)).groupByKey(_._1)
-      .agg(pmArticleAggregator.toColumn)
-      .map(p => p._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset")
+    if (!"true".equalsIgnoreCase(skipUpdate)) {
+      downloadBaseLineUpdate(s"$workingPath/baseline", hdfsServerUri)
+      val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline", 2000)
+      val ds: Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i => {
+        val xml = new XMLEventReader(Source.fromBytes(i._2.getBytes()))
+        new PMParser(xml)
+      }))
+      ds.map(p => (p.getPmid, p))(Encoders.tuple(Encoders.STRING, PMEncoder)).groupByKey(_._1)
+        .agg(pmArticleAggregator.toColumn)
+        .map(p => p._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset")
+    }

     val exported_dataset = spark.read.load(s"$workingPath/baseline_dataset").as[PMArticle]
     exported_dataset
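The net effect of the Scala changes is that the whole download-and-rebuild block is now guarded by the skipUpdate flag, while the final read of baseline_dataset stays unconditional. Below is a minimal, self-contained Scala sketch of that control flow; the helper names rebuildBaselineDataset and transformToOaf are hypothetical stand-ins for the Spark stages, not symbols from this patch.

    // Sketch only: mirrors the gating introduced above, with the Spark
    // stages replaced by hypothetical no-op helpers.
    object SkipUpdateSketch {

      def main(args: Array[String]): Unit = {
        // In the real job, parser.get("skipUpdate") returns the raw string,
        // which may be null because the parameter is optional.
        val skipUpdate: String = args.headOption.orNull

        // Calling equalsIgnoreCase on the literal is null-safe: a missing
        // flag falls through to the update branch, preserving old behaviour.
        if (!"true".equalsIgnoreCase(skipUpdate)) {
          rebuildBaselineDataset() // expensive: download, parse, aggregate
        }

        // The export always runs against the persisted dataset, whether it
        // was just rebuilt or left over from a previous run.
        transformToOaf()
      }

      def rebuildBaselineDataset(): Unit = println("rebuilding baseline_dataset")
      def transformToOaf(): Unit = println("transforming baseline_dataset to OAF")
    }
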
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json
index 4bee770bd..8dc8a2aae 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json
@@ -3,5 +3,6 @@
   {"paramName":"i", "paramLongName":"isLookupUrl", "paramDescription": "isLookupUrl", "paramRequired": true},
   {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the sequential file to read", "paramRequired": true},
   {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the OAF target path", "paramRequired": true},
+  {"paramName":"s", "paramLongName":"skipUpdate", "paramDescription": "if true, skip the baseline update download", "paramRequired": false},
   {"paramName":"h", "paramLongName":"hdfsServerUri", "paramDescription": "the HDFS server URI", "paramRequired": true}
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
index 4ed6dd8bf..21fd2d153 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
@@ -12,6 +12,11 @@
         <property>
             <name>targetPath</name>
             <description>The target path</description>
         </property>
+        <property>
+            <name>skipUpdate</name>
+            <value>false</value>
+            <description>whether to skip the baseline update step</description>
+        </property>
     </parameters>

@@ -42,6 +47,7 @@
             <arg>--master</arg><arg>yarn</arg>
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
             <arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
+            <arg>--skipUpdate</arg><arg>${skipUpdate}</arg>
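Because the workflow declares skipUpdate with a default value of false, a production run has to override it explicitly, as the commit message asks. A sketch of the corresponding job.properties entries follows; every host, URL, and path below is an illustrative assumption, only the skipUpdate key comes from this patch.

    # job.properties -- illustrative values only
    nameNode=hdfs://nameservice1
    isLookupUrl=http://services.openaire.eu/is/services/isLookUp?wsdl
    workingPath=/data/pubmed
    targetPath=/data/pubmed/oaf
    # true reuses the existing baseline_dataset instead of re-downloading;
    # false (the default) refreshes it first
    skipUpdate=true

The job is then submitted as usual, e.g. oozie job -config job.properties -run against the cluster's Oozie endpoint.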