From fe62ccd6dd9589c4bd5626c0422e1eb283311959 Mon Sep 17 00:00:00 2001 From: "sandro.labruzzo" Date: Mon, 28 Oct 2019 12:12:50 +0100 Subject: [PATCH] implemented oozie wf --- .../dhp/graph/SparkGraphImporterJob.java | 1 - .../dhp/graph/oozie_app/config-default.xml | 14 +++++++ .../dnetlib/dhp/graph/oozie_app/workflow.xml | 38 +++++++++++++++++++ .../dhp/graph/SparkGraphImporterJobTest.java | 2 + 4 files changed, 54 insertions(+), 1 deletion(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java index 24989c44b..a2fe08b67 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java @@ -8,7 +8,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml new file mode 100644 index 000000000..3832110ca --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml @@ -0,0 +1,14 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml new file mode 100644 index 000000000..c814c07bb --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml @@ -0,0 +1,38 @@ + + + + sourcePath + the source path + + + targetPath + the target path + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + MapGraphIntoDataFrame + eu.dnetlib.dhp.graph.SparkGraphImporterJob + dhp-aggregations-1.0.0-SNAPSHOT.jar + --num-executors 50 --conf -spark.extraListeners=com.cloudera.spark.lineage.NavigatorAppListener -spark.sql.queryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener" + -mt yarn-cluster + --input${sourcePath} + --outputDir${targetPath} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java index 09135482c..5854aa519 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/SparkGraphImporterJobTest.java @@ -1,10 +1,12 @@ package eu.dnetlib.dhp.graph; +import org.junit.Ignore; import org.junit.Test; public class SparkGraphImporterJobTest { @Test + @Ignore public void testImport() throws Exception { SparkGraphImporterJob.main(new String[]{"-mt", "local[*]","-i", "/home/sandro/part-m-02236", "-o", "/tmp/dataframes"}); }