From 5faeefbda8160685a16d7628d68fa534f03146c7 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 5 Aug 2021 10:54:03 +0200 Subject: [PATCH] added script to download the dump,changed the workflow input paramenters --- .../crossref/ExtractCrossrefRecords.java | 8 +- .../oozie_app/download.sh | 2 + .../oozie_app/workflow.xml | 119 +++++++++--------- 3 files changed, 68 insertions(+), 61 deletions(-) create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/download.sh diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java index c7cae1fcb..c7d3770a0 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java @@ -29,16 +29,16 @@ public class ExtractCrossrefRecords { "/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json"))); parser.parseArgument(args); final String hdfsServerUri = parser.get("hdfsServerUri"); - final String workingPath = parser.get("workingPath"); + final String workingPath = hdfsServerUri.concat(parser.get("workingPath")); final String outputPath = parser.get("outputPath"); final String crossrefFileNameTarGz = parser.get("crossrefFileNameTarGz"); - Path hdfsreadpath = new Path(hdfsServerUri.concat(crossrefFileNameTarGz)); + Path hdfsreadpath = new Path(workingPath.concat("/").concat(crossrefFileNameTarGz)); Configuration conf = new Configuration(); - conf.set("fs.defaultFS", hdfsServerUri.concat(workingPath)); + conf.set("fs.defaultFS", workingPath); conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); - FileSystem fs = FileSystem.get(URI.create(hdfsServerUri.concat(workingPath)), conf); + FileSystem fs = FileSystem.get(URI.create(workingPath), conf); FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath); try (TarArchiveInputStream tais = new TarArchiveInputStream( new GzipCompressorInputStream(crossrefFileStream))) { diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/download.sh b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/download.sh new file mode 100644 index 000000000..662f5313c --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/download.sh @@ -0,0 +1,2 @@ +#!/bin/bash +curl -LSs -H "Crossref-Plus-API-Token: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJodHRwOi8vY3Jvc3NyZWYub3JnLyIsImF1ZCI6Im1kcGx1cyIsImp0aSI6Ijk3YTZkNGVkLTg5MjktNGQ2Yi05NWY1LTY2YmMyNDgzNTRjNCJ9.5DPM4gRibUBYBtrUSpRz3RGHYVB-8f61jQBW_q-r-hs" $1 | hdfs dfs -put $2/$3 \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml index 506d86a08..2d3ad6bc3 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml @@ -1,13 +1,5 @@ - + - - crossrefDumpPath - the working dir base path - - - inputPathCrossref - the working dir base path - sparkDriverMemory memory for driver process @@ -18,27 +10,82 @@ sparkExecutorCores - 2 number of cores used by single executor + + + crossrefdumpfilename + the Crossref input path + + + crossrefDumpPath + the Crossref dump path + + + - + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + + + mapred.job.queue.name + ${queueName} + + + download.sh + ${url} + ${crossrefDumpPath} + ${crossrefdumpfilename} + HADOOP_USER_NAME=${wf:user()} + download.sh + + + + + + ${jobTracker} ${nameNode} eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords --hdfsServerUri${nameNode} - --crossrefFileNameTarGz${crossrefDumpPath}/crossref.tar.gz + --crossrefFileNameTarGz${crossrefdumpfilename} --workingPath${crossrefDumpPath} - --outputPath${workingDir}/files/ + --outputPath${crossrefDumpPath}/files/ @@ -48,7 +95,7 @@ yarn-cluster cluster - SparkGenerateCrossrefDataset + SparkUnpackCrossrefEntries eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries dhp-doiboost-${projectVersion}.jar @@ -63,56 +110,14 @@ --masteryarn-cluster --sourcePath${crossrefDumpPath}/files - --targetPath${inputPathCrossref}/crossref_ds - - - - - - - - - yarn-cluster - cluster - SparkGenerateCrossrefDataset - eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn-cluster - --sourcePath${inputPathCrossref}/crossref_ds - --targetPath${inputPathCrossref}/crossref_ds_updates + --targetPath${crossrefDumpPath}/crossref_unpack/ - - - - - - - - - - - - - - - - \ No newline at end of file