From b0dc92786f83c428498ab4c64102ca53d39fb54b Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 4 Jan 2021 17:01:35 +0100 Subject: [PATCH] defined a single oozie workflow for the generation of doiboost --- .../SparkGenerateDOIBoostActionSet.scala | 3 +- .../doiboost/generate_doiboost_as_params.json | 3 +- .../dhp/doiboost/oozie_app/config-default.xml | 42 +++ .../dhp/doiboost/oozie_app/workflow.xml | 326 ++++++++++++++++++ 4 files changed, 372 insertions(+), 2 deletions(-) create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala index 7a6cd3faaa..da9da22b64 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala @@ -39,6 +39,7 @@ object SparkGenerateDOIBoostActionSet { val dbaffiliationRelationPath = parser.get("dbaffiliationRelationPath") val dbOrganizationPath = parser.get("dbOrganizationPath") val workingDirPath = parser.get("targetPath") + val sequenceFilePath = parser.get("sFilePath") spark.read.load(dbDatasetPath).as[OafDataset] .map(d =>DoiBoostMappingUtil.fixResult(d)) @@ -65,7 +66,7 @@ object SparkGenerateDOIBoostActionSet { val d: Dataset[(String, String)] =spark.read.load(s"$workingDirPath/actionSet").as[(String,String)] - d.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingDirPath/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) + d.rdd.repartition(6000).map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$sequenceFilePath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_as_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_as_params.json index 6eb1ec6f16..0cf9955c94 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_as_params.json +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_as_params.json @@ -5,5 +5,6 @@ {"paramName": "cr", "paramLongName":"crossRefRelation", "paramDescription": "the UnpayWall Publication Path", "paramRequired": true}, {"paramName": "da", "paramLongName":"dbaffiliationRelationPath", "paramDescription": "the MAG Publication Path", "paramRequired": true}, {"paramName": "do", "paramLongName":"dbOrganizationPath", "paramDescription": "the MAG Publication Path", "paramRequired": true}, - {"paramName": "w", "paramLongName":"targetPath", "paramDescription": "the Working Path", "paramRequired": true} + {"paramName": "w", "paramLongName":"targetPath", "paramDescription": "the Working Path", "paramRequired": true}, + {"paramName": "sp", "paramLongName":"sFilePath", "paramDescription": "the Sequence file Path", "paramRequired": true}, ] diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/config-default.xml new file mode 100644 index 0000000000..508202e301 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/config-default.xml @@ -0,0 +1,42 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + oozie.launcher.mapreduce.user.classpath.first + true + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + spark2ExtraListeners + "com.cloudera.spark.lineage.NavigatorAppListener" + + + spark2SqlQueryExecutionListeners + "com.cloudera.spark.lineage.NavigatorQueryListener" + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml new file mode 100644 index 0000000000..d779725129 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml @@ -0,0 +1,326 @@ + + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorIntersectionMemory + memory for individual executor + + + + sparkExecutorCores + number of cores used by single executor + + + + + + workingPath + the working Path + + + + hostedByMapPath + the hostedByMap Path + + + outputPath + the Path of the sequence file action set + + + + + + + inputPathCrossref + the Crossref input path + + + crossrefTimestamp + Timestamp for the Crossref incremental Harvesting + + + + + inputPathMAG + the MAG working path + + + + + + inputPathUnpayWall + the UnpayWall working path + + + + + inputPathOrcid + the ORCID working path + + + + + + + + + + + + ${wf:conf('resumeFrom') eq 'ConvertCrossrefToOAF'} + ${wf:conf('resumeFrom') eq 'ResetMagWorkingPath'} + ${wf:conf('resumeFrom') eq 'PreprocessMag'} + ${wf:conf('resumeFrom') eq 'PreprocessUW'} + ${wf:conf('resumeFrom') eq 'PreprocessORCID'} + ${wf:conf('resumeFrom') eq 'CreateDOIBoost'} + ${wf:conf('resumeFrom') eq 'GenerateActionSet'} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.doiboost.crossref.CrossrefImporter + -t${inputPathCrossref}/index_update + -n${nameNode} + -ts${timestamp} + + + + + + + + + + + yarn-cluster + cluster + GenerateCrossrefDataset + eu.dnetlib.doiboost.crossref.CrossrefDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + ${sparkExtraOPT} + + --workingPath${inputPathCrossref} + --masteryarn-cluster + + + + + + + + + + + + + + + + + + yarn-cluster + cluster + ConvertCrossrefToOAF + eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + ${sparkExtraOPT} + + --sourcePath${inputPathCrossref}/crossref_ds + --targetPath${workingPath} + --masteryarn-cluster + + + + + + + + + + + + + + + + + + + + + yarn-cluster + cluster + Convert Mag to Dataset + eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + ${sparkExtraOPT} + + --sourcePath${inputPathMAG}/input + --targetPath${inputPathMAG}/dataset + --masteryarn-cluster + + + + + + + + + + yarn-cluster + cluster + Convert Mag to OAF Dataset + eu.dnetlib.doiboost.mag.SparkPreProcessMAG + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + ${sparkExtraOPT} + + --sourcePath${inputPathMAG}/dataset + --workingPath${inputPathMAG}/process + --targetPath${workingPath} + --masteryarn-cluster + + + + + + + + + + yarn-cluster + cluster + Convert UnpayWall to Dataset + eu.dnetlib.doiboost.uw.SparkMapUnpayWallToOAF + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + ${sparkExtraOPT} + + --sourcePath${inputPathUnpayWall}/uw_extracted + --targetPath${workingPath} + --masteryarn-cluster + + + + + + + + + + + yarn-cluster + cluster + Convert ORCID to Dataset + eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + ${sparkExtraOPT} + + --sourcePath${inputPathOrcid} + --targetPath${workingPath} + --masteryarn-cluster + + + + + + + + + yarn-cluster + cluster + Create DOIBoost Infospace + eu.dnetlib.doiboost.SparkGenerateDoiBoost + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + ${sparkExtraOPT} + + --hostedByMapPath${hostedByMapPath} + --affiliationPath${inputPathMAG}/process/Affiliations + --paperAffiliationPath${inputPathMAG}/process/PaperAuthorAffiliations + --workingDirPath${workingPath} + --masteryarn-cluster + + + + + + + + + yarn-cluster + cluster + Generate DOIBoost ActionSet + eu.dnetlib.doiboost.SparkGenerateDOIBoostActionSet + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorIntersectionMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + ${sparkExtraOPT} + + --dbPublicationPath${workingPath}/doiBoostPublicationFiltered + --dbDatasetPath${workingPath}/crossrefDataset + --crossRefRelation${workingPath}/crossrefRelation + --dbaffiliationRelationPath${workingPath}/doiBoostPublicationAffiliation + -do${workingPath}/doiBoostOrganization + --targetPath${workingPath}/actionDataSet + --sFilePath${outputPath} + --masteryarn-cluster + + + + + + + \ No newline at end of file