From 45d06c45c7a9bea15dcfeeb93eac217ba0de36de Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 1 Dec 2020 14:29:18 +0100 Subject: [PATCH] collecting all the atoic actions for result type and save them all in the AS path --- .../bipfinder/CollectAndSave.java | 85 +++++++++ .../bipfinder/input_actionset_parameter.json | 32 ++++ .../bipfinder/oozie_app/config-default.xml | 58 +++++++ .../bipfinder/oozie_app/workflow.xml | 163 ++++++++++++++++++ 4 files changed, 338 insertions(+) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/CollectAndSave.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/CollectAndSave.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/CollectAndSave.java new file mode 100644 index 000000000..7164fc5ed --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/CollectAndSave.java @@ -0,0 +1,85 @@ + +package eu.dnetlib.dhp.actionmanager.bipfinder; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.oaf.Result; + +public class CollectAndSave implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(CollectAndSave.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + CollectAndSave.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}: ", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + collectAndSave(spark, inputPath, outputPath); + }); + } + + private static void collectAndSave(SparkSession spark, String inputPath, String outputPath) { + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + sc + .sequenceFile(inputPath + "/publication", Text.class, Text.class) + .union(sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)) + .union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)) + .union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class)) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + ; + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json new file mode 100644 index 000000000..ae844a0c9 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false +}, +{ +"paramName": "ip", +"paramLongName": "inputPath", +"paramDescription": "the URL from where to get the programme file", +"paramRequired": true +}, +{ +"paramName": "o", +"paramLongName": "outputPath", +"paramDescription": "the path of the new ActionSet", +"paramRequired": true +}, + { + "paramName": "rtn", + "paramLongName": "resultTableName", + "paramDescription": "the path of the new ActionSet", + "paramRequired": true + }, + { + "paramName": "bsp", + "paramLongName": "bipScorePath", + "paramDescription": "the path of the new ActionSet", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/config-default.xml new file mode 100644 index 000000000..a1755f329 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/config-default.xml @@ -0,0 +1,58 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + + + oozie.launcher.mapreduce.user.classpath.first + true + + + sparkExecutorNumber + 4 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + sparkDriverMemory + 15G + + + sparkExecutorMemory + 6G + + + sparkExecutorCores + 1 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml new file mode 100644 index 000000000..c710c8b55 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml @@ -0,0 +1,163 @@ + + + + projectFileURL + the url where to get the projects file + + + + programmeFileURL + the url where to get the programme file + + + + topicFileURL + the url where to get the topic file + + + outputPath + path where to store the action set + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + + eu.dnetlib.dhp.actionmanager.project.utils.ReadCSV + --hdfsNameNode${nameNode} + --fileURL${projectFileURL} + --hdfsPath${workingDir}/projects + --classForNameeu.dnetlib.dhp.actionmanager.project.utils.CSVProject + + + + + + + + eu.dnetlib.dhp.actionmanager.project.utils.ReadCSV + --hdfsNameNode${nameNode} + --fileURL${programmeFileURL} + --hdfsPath${workingDir}/programme + --classForNameeu.dnetlib.dhp.actionmanager.project.utils.CSVProgramme + + + + + + + + eu.dnetlib.dhp.actionmanager.project.utils.ReadExcel + --hdfsNameNode${nameNode} + --fileURL${topicFileURL} + --hdfsPath${workingDir}/topic + --classForNameeu.dnetlib.dhp.actionmanager.project.utils.EXCELTopic + + + + + + + + eu.dnetlib.dhp.actionmanager.project.ReadProjectsFromDB + --hdfsPath${workingDir}/dbProjects + --hdfsNameNode${nameNode} + --postgresUrl${postgresURL} + --postgresUser${postgresUser} + --postgresPassword${postgresPassword} + + + + + + + + yarn + cluster + PrepareProgramme + eu.dnetlib.dhp.actionmanager.project.PrepareProgramme + dhp-aggregation-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --programmePath${workingDir}/programme + --outputPath${workingDir}/preparedProgramme + + + + + + + + yarn + cluster + PrepareProjects + eu.dnetlib.dhp.actionmanager.project.PrepareProjects + dhp-aggregation-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --projectPath${workingDir}/projects + --outputPath${workingDir}/preparedProjects + --dbProjectPath${workingDir}/dbProjects + + + + + + + + yarn + cluster + ProjectProgrammeAS + eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob + dhp-aggregation-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --projectPath${workingDir}/preparedProjects + --programmePath${workingDir}/preparedProgramme + --topicPath${workingDir}/topic + --outputPath${outputPath} + + + + + + + \ No newline at end of file