From 7dc824fc237ceaebe3d310a5057228ab114203dd Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Fri, 7 May 2021 12:53:50 +0200 Subject: [PATCH] imported changes in stable_id into master --- .../orcid/SparkConvertORCIDToOAF.scala | 27 +- .../doiboost/orcid/SparkPreprocessORCID.scala | 56 ++++ .../doiboost/convert_map_to_oaf_params.json | 6 +- .../doiboost/crossref/oozie_app/workflow.xml | 101 ------- .../intersection/oozie_app/config-default.xml | 38 --- .../intersection/oozie_app/workflow.xml | 96 ------- .../dhp/doiboost/mag/oozie_app/workflow.xml | 92 ------ .../oozie_app/config-default.xml | 0 .../preprocess/oozie_app/workflow.xml | 194 +++++++++++++ .../dhp/doiboost/preprocess_orcid_params.json | 6 + .../oozie_app/config-default.xml | 4 +- .../doiboost/process/oozie_app/workflow.xml | 262 ++++++++++++++++++ .../unpaywall/oozie_app/config-default.xml | 38 --- .../doiboost/unpaywall/oozie_app/workflow.xml | 55 ---- 14 files changed, 526 insertions(+), 449 deletions(-) create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml rename dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/{crossref => preprocess}/oozie_app/config-default.xml (100%) create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess_orcid_params.json rename dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/{mag => process}/oozie_app/config-default.xml (91%) create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala index 025d68b90..6109322ae 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala @@ -12,31 +12,10 @@ import org.slf4j.{Logger, LoggerFactory} object SparkConvertORCIDToOAF { val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) - def run(spark:SparkSession,sourcePath:String,workingPath:String, targetPath:String):Unit = { + def run(spark:SparkSession,workingPath:String, targetPath:String):Unit = { import spark.implicits._ implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] - val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s)) - - spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author") - - val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null) - - spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works") - - val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor] - - val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork] - - works.joinWith(authors, authors("oid").equalTo(works("oid"))) - .map(i =>{ - val doi = i._1.doi - val author = i._2 - (doi, author) - }).groupBy(col("_1").alias("doi")) - .agg(collect_list(col("_2")).alias("authors")) - .write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor") - val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem] logger.info("Converting ORCID to OAF") @@ -55,10 +34,10 @@ object SparkConvertORCIDToOAF { .master(parser.get("master")).getOrCreate() - val sourcePath = parser.get("sourcePath") + val workingPath = parser.get("workingPath") val targetPath = parser.get("targetPath") - run(spark, sourcePath, workingPath, targetPath) + run(spark, workingPath, targetPath) } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala new file mode 100644 index 000000000..71b2231d7 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala @@ -0,0 +1,56 @@ +package eu.dnetlib.doiboost.orcid + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.Publication +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.sql.functions.{col, collect_list} +import org.slf4j.{Logger, LoggerFactory} + +class SparkPreprocessORCID { + val logger: Logger = LoggerFactory.getLogger(getClass) + + def run(spark:SparkSession,sourcePath:String,workingPath:String):Unit = { + import spark.implicits._ + + val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s)) + + spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author") + + val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null) + + spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works") + + val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor] + + val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork] + + works.joinWith(authors, authors("oid").equalTo(works("oid"))) + .map(i =>{ + val doi = i._1.doi + val author = i._2 + (doi, author) + }).groupBy(col("_1").alias("doi")) + .agg(collect_list(col("_2")).alias("authors")) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor") + } + + def main(args: Array[String]): Unit = { + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/preprocess_orcid_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + val sourcePath = parser.get("sourcePath") + val workingPath = parser.get("workingPath") + run(spark, sourcePath, workingPath) + } + +} diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json index 312bd0751..c97231fdd 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json @@ -1,6 +1,6 @@ [ - {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, - {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true}, - {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true} + {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working dir path", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target dir path", "paramRequired": true}, + {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml deleted file mode 100644 index 63c2e9ef2..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml +++ /dev/null @@ -1,101 +0,0 @@ - - - - workingPath - the working dir base path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - timestamp - Timestamp for incremental Harvesting - - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.doiboost.crossref.CrossrefImporter - -t${workingPath}/input/crossref/index_update - -n${nameNode} - -ts${timestamp} - - - - - - - - yarn-cluster - cluster - ExtractCrossrefToOAF - eu.dnetlib.doiboost.crossref.CrossrefDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --workingPath/data/doiboost/input/crossref - --masteryarn-cluster - - - - - - - - - - - - - - - - - - yarn-cluster - cluster - ConvertCrossrefToOAF - eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --sourcePath${workingPath}/input/crossref/crossref_ds - --targetPath${workingPath}/process/ - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/config-default.xml deleted file mode 100644 index cf617a84c..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/config-default.xml +++ /dev/null @@ -1,38 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - - hive_metastore_uris - thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 - - - spark2YarnHistoryServerAddress - http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 - - - spark2EventLogDir - /user/spark/spark2ApplicationHistory - - - spark2ExtraListeners - "com.cloudera.spark.lineage.NavigatorAppListener" - - - spark2SqlQueryExecutionListeners - "com.cloudera.spark.lineage.NavigatorQueryListener" - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml deleted file mode 100644 index dcde62c9d..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml +++ /dev/null @@ -1,96 +0,0 @@ - - - - hostedByMapPath - the Hosted By Map Path - - - affiliationPath - the Affliation Path - - - paperAffiliationPath - the paperAffiliation Path - - - workingDirPath - the Working Path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - yarn-cluster - cluster - Create DOIBoost Infospace - eu.dnetlib.doiboost.SparkGenerateDoiBoost - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --hostedByMapPath${hostedByMapPath} - --affiliationPath${affiliationPath} - --paperAffiliationPath${paperAffiliationPath} - --workingDirPath${workingDirPath} - --masteryarn-cluster - - - - - - - - - yarn-cluster - cluster - Generate DOIBoost ActionSet - eu.dnetlib.doiboost.SparkGenerateDOIBoostActionSet - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --dbPublicationPath${workingDirPath}/doiBoostPublicationFiltered - --dbDatasetPath${workingDirPath}/crossrefDataset - --crossRefRelation${workingDirPath}/crossrefRelation - --dbaffiliationRelationPath${workingDirPath}/doiBoostPublicationAffiliation - -do${workingDirPath}/doiBoostOrganization - --targetPath${workingDirPath}/actionDataSet - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml deleted file mode 100644 index 9d19dddc7..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml +++ /dev/null @@ -1,92 +0,0 @@ - - - - sourcePath - the working dir base path - - - targetPath - the working dir base path - - - workingPath - the working dir base path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - - yarn-cluster - cluster - Convert Mag to Dataset - eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - ${sparkExtraOPT} - - --sourcePath${sourcePath} - --targetPath${workingPath} - --masteryarn-cluster - - - - - - - - - - yarn-cluster - cluster - Convert Mag to OAF Dataset - eu.dnetlib.doiboost.mag.SparkPreProcessMAG - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --sourcePath${workingPath} - --workingPath${workingPath}/process - --targetPath${targetPath} - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/config-default.xml rename to dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml new file mode 100644 index 000000000..09feecf3a --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml @@ -0,0 +1,194 @@ + + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorIntersectionMemory + memory for individual executor + + + + sparkExecutorCores + number of cores used by single executor + + + + inputPathCrossref + the Crossref input path + + + crossrefTimestamp + Timestamp for the Crossref incremental Harvesting + + + esServer + elasticsearch server url for the Crossref Harvesting + + + esIndex + elasticsearch index name for the Crossref Harvesting + + + + MAGDumpPath + the MAG dump working path + + + + inputPathMAG + the MAG working path + + + + inputPathOrcid + the ORCID input path + + + + workingPathOrcid + the ORCID working path + + + + + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + eu.dnetlib.doiboost.crossref.CrossrefImporter + --targetPath${inputPathCrossref}/index_update + --namenode${nameNode} + --esServer${esServer} + --esIndex${esIndex} + --timestamp${crossrefTimestamp} + + + + + + + + + + + yarn-cluster + cluster + GenerateCrossrefDataset + eu.dnetlib.doiboost.crossref.CrossrefDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --workingPath${inputPathCrossref} + --masteryarn-cluster + + + + + + + + + + + + + + + + + + + + + + + + + + + + + yarn-cluster + cluster + Convert Mag to Dataset + eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${MAGDumpPath} + --targetPath${inputPathMAG}/dataset + --masteryarn-cluster + + + + + + + + + + + yarn-cluster + cluster + Convert ORCID to Dataset + eu.dnetlib.doiboost.orcid.SparkPreprocessORCID + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${inputPathOrcid} + --workingPath${workingPathOrcid} + --masteryarn-cluster + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess_orcid_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess_orcid_params.json new file mode 100644 index 000000000..08444d732 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess_orcid_params.json @@ -0,0 +1,6 @@ +[ + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working dir path", "paramRequired": true}, + {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true} + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/config-default.xml similarity index 91% rename from dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/config-default.xml rename to dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/config-default.xml index 59e5c059f..508202e30 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/config-default.xml @@ -16,8 +16,8 @@ spark2 - oozie.wf.rerun.failnodes - false + oozie.launcher.mapreduce.user.classpath.first + true hive_metastore_uris diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml new file mode 100644 index 000000000..e5e29323e --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml @@ -0,0 +1,262 @@ + + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorIntersectionMemory + memory for individual executor + + + + sparkExecutorCores + number of cores used by single executor + + + + workingPath + the working Path + + + + hostedByMapPath + the hostedByMap Path + + + outputPath + the Path of the sequence file action set + + + + + + inputPathCrossref + the Crossref input path + + + + + inputPathMAG + the MAG working path + + + + + + inputPathUnpayWall + the UnpayWall working path + + + + + inputPathOrcid + the ORCID input path + + + + workingPathOrcid + the ORCID working path + + + + + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + + ${wf:conf('resumeFrom') eq 'PreprocessMag'} + ${wf:conf('resumeFrom') eq 'PreprocessUW'} + ${wf:conf('resumeFrom') eq 'PreprocessORCID'} + ${wf:conf('resumeFrom') eq 'CreateDOIBoost'} + ${wf:conf('resumeFrom') eq 'GenerateActionSet'} + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn-cluster + cluster + ConvertCrossrefToOAF + eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${inputPathCrossref}/crossref_ds + --targetPath${workingPath} + --masteryarn-cluster + + + + + + + + yarn-cluster + cluster + Convert Mag to OAF Dataset + eu.dnetlib.doiboost.mag.SparkProcessMAG + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorIntersectionMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${inputPathMAG}/dataset + --workingPath${inputPathMAG}/process_p + --targetPath${workingPath} + --masteryarn-cluster + + + + + + + + + + yarn-cluster + cluster + Convert UnpayWall to Dataset + eu.dnetlib.doiboost.uw.SparkMapUnpayWallToOAF + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${inputPathUnpayWall}/uw_extracted + --targetPath${workingPath}/uwPublication + --masteryarn-cluster + + + + + + + + + yarn-cluster + cluster + Convert ORCID to Dataset + eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --workingPath${workingPathOrcid} + --targetPath${workingPath}/orcidPublication + --masteryarn-cluster + + + + + + + + + yarn-cluster + cluster + Create DOIBoost Infospace + eu.dnetlib.doiboost.SparkGenerateDoiBoost + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorIntersectionMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --hostedByMapPath${hostedByMapPath} + --affiliationPath${inputPathMAG}/dataset/Affiliations + --paperAffiliationPath${inputPathMAG}/dataset/PaperAuthorAffiliations + --workingPath${workingPath} + --masteryarn-cluster + + + + + + + + + yarn-cluster + cluster + Generate DOIBoost ActionSet + eu.dnetlib.doiboost.SparkGenerateDOIBoostActionSet + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --dbPublicationPath${workingPath}/doiBoostPublicationFiltered + --dbDatasetPath${workingPath}/crossrefDataset + --crossRefRelation${workingPath}/crossrefRelation + --dbaffiliationRelationPath${workingPath}/doiBoostPublicationAffiliation + --dbOrganizationPath${workingPath}/doiBoostOrganization + --targetPath${workingPath}/actionDataSet + --sFilePath${outputPath} + --masteryarn-cluster + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/config-default.xml deleted file mode 100644 index cf617a84c..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/config-default.xml +++ /dev/null @@ -1,38 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - - hive_metastore_uris - thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 - - - spark2YarnHistoryServerAddress - http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 - - - spark2EventLogDir - /user/spark/spark2ApplicationHistory - - - spark2ExtraListeners - "com.cloudera.spark.lineage.NavigatorAppListener" - - - spark2SqlQueryExecutionListeners - "com.cloudera.spark.lineage.NavigatorQueryListener" - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/workflow.xml deleted file mode 100644 index d2a69752e..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/workflow.xml +++ /dev/null @@ -1,55 +0,0 @@ - - - - sourcePath - the working dir base path - - - targetPath - the working dir base path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - yarn-cluster - cluster - Convert UnpayWall to Dataset - eu.dnetlib.doiboost.uw.SparkMapUnpayWallToOAF - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --sourcePath${sourcePath}/uw_extracted - --targetPath${targetPath} - --masteryarn-cluster - - - - - - - \ No newline at end of file