From 9ca438d9b1ab2ebee9896b94b3544cb0bfac87f8 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 10 Jun 2021 14:59:45 +0200 Subject: [PATCH] imported from branch stable_ids generation of Actionset datacite --- .../datacite/ExportActionSetJobNode.scala | 40 ++++++++++++++++ .../datacite/exportDataset_parameters.json | 21 +++++++++ .../oozie_app/config-default.xml | 23 ++++++++++ .../datacite_export/oozie_app/workflow.xml | 46 +++++++++++++++++++ 4 files changed, 130 insertions(+) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ExportActionSetJobNode.scala create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite_export/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite_export/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ExportActionSetJobNode.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ExportActionSetJobNode.scala new file mode 100644 index 000000000..42241c773 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ExportActionSetJobNode.scala @@ -0,0 +1,40 @@ +package eu.dnetlib.dhp.actionmanager.datacite +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.Oaf +import org.apache.hadoop.io.Text +import org.apache.hadoop.io.compress.GzipCodec +import org.apache.hadoop.mapred.SequenceFileOutputFormat +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.{Logger, LoggerFactory} + +import scala.io.Source + +object ExportActionSetJobNode { + + val log: Logger = LoggerFactory.getLogger(ExportActionSetJobNode.getClass) + + def main(args: Array[String]): Unit = { + val conf = new SparkConf + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json")).mkString) + parser.parseArgument(args) + val master = parser.get("master") + val sourcePath = parser.get("sourcePath") + val targetPath = parser.get("targetPath") + + val spark: SparkSession = SparkSession.builder().config(conf) + .appName(ExportActionSetJobNode.getClass.getSimpleName) + .master(master) + .getOrCreate() + implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val tEncoder:Encoder[(String,String)] = Encoders.tuple(Encoders.STRING,Encoders.STRING) + + spark.read.load(sourcePath).as[Oaf] + .map(o =>DataciteToOAFTransformation.toActionSet(o)) + .filter(o => o!= null) + .rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) + + + } + +} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json new file mode 100644 index 000000000..63e080337 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json @@ -0,0 +1,21 @@ +[ + { + "paramName": "s", + "paramLongName": "sourcePath", + "paramDescription": "the source mdstore path", + "paramRequired": true + }, + + { + "paramName": "t", + "paramLongName": "targetPath", + "paramDescription": "the target mdstore path", + "paramRequired": true + }, + { + "paramName": "m", + "paramLongName": "master", + "paramDescription": "the master name", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite_export/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite_export/oozie_app/config-default.xml new file mode 100644 index 000000000..dd3c32c62 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite_export/oozie_app/config-default.xml @@ -0,0 +1,23 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + + oozie.launcher.mapreduce.user.classpath.first + true + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite_export/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite_export/oozie_app/workflow.xml new file mode 100644 index 000000000..c4c260b96 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite_export/oozie_app/workflow.xml @@ -0,0 +1,46 @@ + + + + sourcePath + the working path of Datacite stores + + + outputPath + the path of Datacite ActionSet + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + yarn-cluster + cluster + ExportDataset + eu.dnetlib.dhp.actionmanager.datacite.ExportActionSetJobNode + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${sourcePath} + --targetPath${outputPath} + --masteryarn-cluster + + + + + + +