From f7379255b690683382afbf623bac8635979658d6 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 15 Jun 2021 09:22:54 +0200 Subject: [PATCH] changed the workflow to extract info from the dump --- .../crossref/UnpackCrossrefDumpEntries.scala | 54 ------- .../oozie_app/workflow.xml | 58 +++++-- .../dhp/doiboost/oozie_app/workflow.xml | 151 ++++++++++++------ 3 files changed, 143 insertions(+), 120 deletions(-) delete mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrossrefDumpEntries.scala diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrossrefDumpEntries.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrossrefDumpEntries.scala deleted file mode 100644 index 4f8189cf36..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrossrefDumpEntries.scala +++ /dev/null @@ -1,54 +0,0 @@ -package eu.dnetlib.doiboost.crossref - -import eu.dnetlib.dhp.application.ArgumentApplicationParser -import org.apache.hadoop.io.compress.GzipCodec -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} -import org.slf4j.{Logger, LoggerFactory} -import org.json4s -import org.json4s.DefaultFormats -import org.json4s.JsonAST._ -import org.json4s.jackson.JsonMethods._ - -import scala.io.Source - -object UnpackCrossrefDumpEntries { - - - val log: Logger = LoggerFactory.getLogger(UnpackCrossrefDumpEntries.getClass) - - - - def extractDump(input:String):List[String] = { - implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - lazy val json: json4s.JValue = parse(input) - - val a = (json \ "items").extract[JArray] - a.arr.map(s => compact(render(s))) - } - - - - def main(args: Array[String]): Unit = { - val conf = new SparkConf - val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json")).mkString) - parser.parseArgument(args) - val master = parser.get("master") - val sourcePath = parser.get("sourcePath") - val targetPath = parser.get("targetPath") - - val spark: SparkSession = SparkSession.builder().config(conf) - .appName(UnpackCrossrefDumpEntries.getClass.getSimpleName) - .master(master) - .getOrCreate() - - - val sc: SparkContext = spark.sparkContext - - - sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2)) - .saveAsTextFile(targetPath, classOf[GzipCodec]); - - - } -} diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml index fdd4218d05..c7dc8bed45 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml @@ -1,9 +1,13 @@ - - - - + + crossrefDumpPath + the working dir base path + + + inputPathCrossref + the working dir base path + sparkDriverMemory memory for driver process @@ -14,25 +18,27 @@ sparkExecutorCores + 2 number of cores used by single executor - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + ${jobTracker} ${nameNode} eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords --hdfsServerUri${nameNode} - --workingPath/data/doiboost/crossref/ - --crossrefFileNameTarGzcrossref.tar.gz + --crossrefFileNameTarGz${crossrefDumpPath}/crossref.tar.gz + --workingPath${crossrefDumpPath} + --outputPath${workingDir}/files/ @@ -42,24 +48,42 @@ yarn-cluster cluster - SparkCreateCrossredDataset - eu.dnetlib.doiboost.crossref.GenerateCrossrefDatasetSpark + SparkGenerateCrossrefDataset + eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset dhp-doiboost-${projectVersion}.jar - --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=20 - --executor-memory=6G - --driver-memory=7G + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --masteryarn-cluster - --sourcePath/data/doiboost/crossref/filess - --targetPath/tmp/miriam/crossref/crossrefDataset + --sourcePath${workingDir}/files + --targetPath${inputPathCrossref}/crossref_ds_updated + + + + + + + + + + + + + + + + + diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml index 6cb8a577a7..f5ce1c323d 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml @@ -41,17 +41,21 @@ the Crossref input path - crossrefTimestamp - Timestamp for the Crossref incremental Harvesting - - - esServer - elasticsearch server url for the Crossref Harvesting - - - esIndex - elasticsearch index name for the Crossref Harvesting + crossrefDumpPath + the Crossref dump path + + + + + + + + + + + + @@ -114,55 +118,104 @@ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + - eu.dnetlib.doiboost.crossref.CrossrefImporter - --targetPath${inputPathCrossref}/index_update - --namenode${nameNode} - --esServer${esServer} - --esIndex${esIndex} - --timestamp${crossrefTimestamp} + ${jobTracker} + ${nameNode} + eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords + --hdfsServerUri${nameNode} + --crossrefFileNameTarGz${crossrefDumpPath}/crossref.tar.gz + --workingPath${crossrefDumpPath} + --outputPath${workingDir}/files - + + + + yarn-cluster + cluster + SparkGenerateCrossrefDataset + eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn-cluster + --sourcePath${workingDir}/files/ + --targetPath${inputPathCrossref}/crossref_ds_updated + + + + + + + + + + + + + + + - - - yarn-cluster - cluster - GenerateCrossrefDataset - eu.dnetlib.doiboost.crossref.CrossrefDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --workingPath${inputPathCrossref} - --masteryarn-cluster - - - - + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - + + + + + + + + +