From cd17e19044be9414d733ed7208943047d53710a0 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 8 Jul 2021 21:20:19 +0200 Subject: [PATCH] implemented branch workflow to import datacite and crossref in scholexplorer --- .../FilterCrossrefEntitiesSpark.scala | 46 ++++++++++ .../datacite/filter_crossref_param.json | 21 +++++ .../scholix/oozie_app/config-default.xml | 23 +++++ .../datacite/scholix/oozie_app/workflow.xml | 84 +++++++++++++++++++ .../dhp/sx/graph/step1/oozie_app/workflow.xml | 2 +- 5 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/FilterCrossrefEntitiesSpark.scala create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/FilterCrossrefEntitiesSpark.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/FilterCrossrefEntitiesSpark.scala new file mode 100644 index 000000000..5860c50ac --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/FilterCrossrefEntitiesSpark.scala @@ -0,0 +1,46 @@ +package eu.dnetlib.dhp.actionmanager.datacite + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup +import eu.dnetlib.dhp.schema.mdstore.MetadataRecord +import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} +import eu.dnetlib.dhp.utils.ISLookupClientFactory +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.{Logger, LoggerFactory} + +import scala.io.Source + +object FilterCrossrefEntitiesSpark { + + val log: Logger = LoggerFactory.getLogger(getClass.getClass) + + def main(args: Array[String]): Unit = { + val conf = new SparkConf + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json")).mkString) + parser.parseArgument(args) + val master = parser.get("master") + val sourcePath = parser.get("sourcePath") + log.info("sourcePath: {}", sourcePath) + val targetPath = parser.get("targetPath") + log.info("targetPath: {}", targetPath) + + + + val spark: SparkSession = SparkSession.builder().config(conf) + .appName(getClass.getSimpleName) + .master(master) + .getOrCreate() + + + + implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val resEncoder: Encoder[Result] = Encoders.kryo[Result] + + val d:Dataset[Oaf]= spark.read.load(sourcePath).as[Oaf] + + d.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]).write.mode(SaveMode.Overwrite).save(targetPath) + + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json new file mode 100644 index 000000000..63e080337 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json @@ -0,0 +1,21 @@ +[ + { + "paramName": "s", + "paramLongName": "sourcePath", + "paramDescription": "the source mdstore path", + "paramRequired": true + }, + + { + "paramName": "t", + "paramLongName": "targetPath", + "paramDescription": "the target mdstore path", + "paramRequired": true + }, + { + "paramName": "m", + "paramLongName": "master", + "paramDescription": "the master name", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/config-default.xml new file mode 100644 index 000000000..dd3c32c62 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/config-default.xml @@ -0,0 +1,23 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + + oozie.launcher.mapreduce.user.classpath.first + true + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/workflow.xml new file mode 100644 index 000000000..397288c69 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/workflow.xml @@ -0,0 +1,84 @@ + + + + datacitePath + the path of Datacite spark dataset + + + isLookupUrl + The IS lookUp service endopoint + + + crossrefPath + the path of Crossref spark dataset + + + + targetPath + the path of Crossref spark dataset + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + yarn-cluster + cluster + ImportDatacite + eu.dnetlib.dhp.actionmanager.datacite.GenerateDataciteDatasetSpark + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${datacitePath} + --targetPath${targetPath}/datacite_oaf + --isLookupUrl${isLookupUrl} + --exportLinkstrue + --masteryarn-cluster + + + + + + + + + yarn-cluster + cluster + FilterCrossrefEntities + eu.dnetlib.dhp.actionmanager.datacite.FilterCrossrefEntitiesSpark + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${crossrefPath} + --targetPath${targetPath}/crossref_oaf + --masteryarn-cluster + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml index bac3687e2..1895ab741 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml @@ -10,7 +10,7 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]