From 22cb9e0da7ce016853c741017f98ca300e4197ab Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Fri, 15 May 2020 18:18:01 +0200
Subject: [PATCH] simple code to get file from URL

---
 .../eu/dnetlib/dhp/schema/oaf/Programme.java  |   4 +
 .../dhp/actionset/h2020programme/GetFile.java |  53 +++++++++
 .../h2020programme/action_set_parameters.json |   0
 .../oozie_app/lib/scripts/getprogrammefile.sh |   0
 .../oozie_app/lib/scripts/getprojectfile.sh   |   0
 .../h2020programme/oozie_app/workflow.xml     | 112 ++++++++++++++++++
 .../actionset/h2020programme/parameters.json  |  86 ++++++++++++++
 7 files changed, 255 insertions(+)
 create mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Programme.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionset/h2020programme/GetFile.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/action_set_parameters.json
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/oozie_app/lib/scripts/getprogrammefile.sh
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/oozie_app/lib/scripts/getprojectfile.sh
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/oozie_app/workflow.xml
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/parameters.json

diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Programme.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Programme.java
new file mode 100644
index 0000000000..f913336163
--- /dev/null
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Programme.java
@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.schema.oaf;
+
+public class Programme {
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionset/h2020programme/GetFile.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionset/h2020programme/GetFile.java
new file mode 100644
index 0000000000..2fed1a0e36
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionset/h2020programme/GetFile.java
@@ -0,0 +1,53 @@
+
+package eu.dnetlib.dhp.actionset.h2020programme;
+
+import java.io.*;
+import java.net.URL;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class GetFile {
+
+	private static final Log log = LogFactory.getLog(GetFile.class);
+
+	public static void main(final String[] args) throws Exception {
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					GetFile.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/actionset/h2020programme/parameters.json")));
+
+		Configuration conf = new Configuration();
+
+		parser.parseArgument(args);
+
+		final String fileURL = parser.get("fileURL");
+		final String hdfsPath = parser.get("hdfsPath");
+		final String hdfsNameNode = parser.get("hdfsNameNode");
+
+		conf.set("fs.defaultFS", hdfsNameNode);
+		FileSystem fileSystem = FileSystem.get(conf);
+		Path hdfsWritePath = new Path(hdfsPath);
+		FSDataOutputStream fsDataOutputStream = null;
+		if (fileSystem.exists(hdfsWritePath)) {
+			fsDataOutputStream = fileSystem.append(hdfsWritePath);
+		} else {
+			fsDataOutputStream = fileSystem.create(hdfsWritePath);
+		}
+
+		InputStream is = new BufferedInputStream(new URL(fileURL).openStream());
+
+		org.apache.hadoop.io.IOUtils.copyBytes(is, fsDataOutputStream, 4096, true);
+
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/action_set_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/action_set_parameters.json
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/oozie_app/lib/scripts/getprogrammefile.sh b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/oozie_app/lib/scripts/getprogrammefile.sh
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/oozie_app/lib/scripts/getprojectfile.sh b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/oozie_app/lib/scripts/getprojectfile.sh
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/oozie_app/workflow.xml
new file mode 100644
index 0000000000..3e7f684012
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/oozie_app/workflow.xml
@@ -0,0 +1,112 @@
+
+
+        sequenceFilePath
+        the path to store the sequence file of the native metadata collected
+
+
+
+        mdStorePath
+        the path of the native mdstore
+
+
+
+        apiDescription
+        A json encoding of the API Description class
+
+
+
+        dataSourceInfo
+        A json encoding of the Datasource Info
+
+
+        identifierPath
+        An xpath to retrieve the metadata idnentifier for the generation of DNet Identifier
+
+
+
+        metadataEncoding
+        The type of the metadata XML/JSON
+
+
+
+        timestamp
+        The timestamp of the collection date
+
+
+
+        workflowId
+        The identifier of the workflow
+
+
+
+
+
+        Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
+
+
+
+
+
+
+        ${jobTracker}
+        ${nameNode}
+        eu.dnetlib.dhp.collection.worker.DnetCollectorWorker
+        -p${sequenceFilePath}
+        -a${apiDescription}
+        -n${nameNode}
+        -rh${rmq_host}
+        -ru${rmq_user}
+        -rp${rmq_pwd}
+        -rr${rmq_report}
+        -ro${rmq_ongoing}
+        -usandro.labruzzo
+        -w${workflowId}
+
+
+
+
+
+
+        ${jobTracker}
+        ${nameNode}
+        yarn
+        cluster
+        GenerateNativeStoreSparkJob
+        eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob
+        dhp-aggregations-1.0.0-SNAPSHOT.jar
+        --num-executors 50 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
+        --encoding ${metadataEncoding}
+        --dateOfCollection ${timestamp}
+        --provenance ${dataSourceInfo}
+        --xpath${identifierPath}
+        --input${sequenceFilePath}
+        --output${mdStorePath}
+        -rh${rmq_host}
+        -ru${rmq_user}
+        -rp${rmq_pwd}
+        -rr${rmq_report}
+        -ro${rmq_ongoing}
+        -w${workflowId}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/parameters.json
new file mode 100644
index 0000000000..4a6aec5ee1
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/parameters.json
@@ -0,0 +1,86 @@
+[
+  {
+    "paramName": "issm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "when true will stop SparkSession after job execution",
+    "paramRequired": false
+  },
+  {
+    "paramName": "e",
+    "paramLongName": "encoding",
+    "paramDescription": "the encoding of the input record should be JSON or XML",
+    "paramRequired": true
+  },
+  {
+    "paramName": "d",
+    "paramLongName": "dateOfCollection",
+    "paramDescription": "the date when the record has been stored",
+    "paramRequired": true
+  },
+  {
+    "paramName": "p",
+    "paramLongName": "provenance",
+    "paramDescription": "the infos about the provenance of the collected records",
+    "paramRequired": true
+  },
+  {
+    "paramName": "x",
+    "paramLongName": "xpath",
+    "paramDescription": "the xpath to identify the record identifier",
+    "paramRequired": true
+  },
+  {
+    "paramName": "i",
+    "paramLongName": "input",
+    "paramDescription": "the path of the sequencial file to read",
+    "paramRequired": true
+  },
+  {
+    "paramName": "o",
+    "paramLongName": "output",
+    "paramDescription": "the path of the result DataFrame on HDFS",
+    "paramRequired": true
+  },
+  {
+    "paramName": "ru",
+    "paramLongName": "rabbitUser",
+    "paramDescription": "the user to connect with RabbitMq for messaging",
+    "paramRequired": true
+  },
+  {
+    "paramName": "rp",
+    "paramLongName": "rabbitPassword",
+    "paramDescription": "the password to connect with RabbitMq for messaging",
+    "paramRequired": true
+  },
+  {
+    "paramName": "rh",
+    "paramLongName": "rabbitHost",
+    "paramDescription": "the host of the RabbitMq server",
+    "paramRequired": true
+  },
+  {
+    "paramName": "ro",
+    "paramLongName": "rabbitOngoingQueue",
+    "paramDescription": "the name of the ongoing queue",
+    "paramRequired": true
+  },
+  {
+    "paramName": "rr",
+    "paramLongName": "rabbitReportQueue",
+    "paramDescription": "the name of the report queue",
+    "paramRequired": true
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "workflowId",
+    "paramDescription": "the identifier of the dnet Workflow",
+    "paramRequired": true
+  },
+  {
+    "paramName": "t",
+    "paramLongName": "isTest",
+    "paramDescription": "the name of the report queue",
+    "paramRequired": false
+  }
+]
\ No newline at end of file
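
Note on GetFile above: the class reads three options ("fileURL", "hdfsPath", "hdfsNameNode") through ArgumentApplicationParser, while the bundled parameters.json currently declares the options of the metadata-collection job instead. The code below is a minimal, self-contained sketch of the same URL-to-HDFS copy that GetFile performs, included only to illustrate the logic of the patch; the class name, URL, target path and name-node URI are placeholders, not values taken from this commit.

package eu.dnetlib.dhp.actionset.h2020programme;

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Illustrative sketch (not part of the patch): streams a remote file into HDFS,
 * appending when the target already exists, mirroring what GetFile does.
 * All constant values below are placeholders.
 */
public class GetFileSketch {

	public static void main(final String[] args) throws Exception {
		final String fileURL = "https://example.org/h2020_programme.csv"; // placeholder URL
		final String hdfsPath = "/tmp/h2020programme/programme.csv"; // placeholder HDFS target path
		final String hdfsNameNode = "hdfs://nameservice1"; // placeholder name node URI

		final Configuration conf = new Configuration();
		conf.set("fs.defaultFS", hdfsNameNode);
		final FileSystem fileSystem = FileSystem.get(conf);
		final Path hdfsWritePath = new Path(hdfsPath);

		// append if the target already exists, otherwise create it (same branch as GetFile)
		try (InputStream is = new BufferedInputStream(new URL(fileURL).openStream());
			FSDataOutputStream out = fileSystem.exists(hdfsWritePath)
				? fileSystem.append(hdfsWritePath)
				: fileSystem.create(hdfsWritePath)) {
			// copy in 4 KB chunks; both streams are closed by try-with-resources
			org.apache.hadoop.io.IOUtils.copyBytes(is, out, 4096, false);
		}
	}
}

The only deliberate difference from the patch is resource handling: GetFile relies on copyBytes(is, fsDataOutputStream, 4096, true) to close both streams, whereas the sketch passes false and lets try-with-resources do the closing; both approaches are valid.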