From 10068c00ea1dc757262b4bbe2307fcb1b0e3fd30 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 14 Jul 2021 14:37:06 +0200 Subject: [PATCH] Code refactor: - removed old workflows in doiboost - splitted workflow of doiboost in preprocess and process --- .../orcid/SparkConvertORCIDToOAF.scala | 17 +- .../doiboost/orcid/SparkPreprocessORCID.scala | 2 +- .../doiboost/crossref/oozie_app/workflow.xml | 101 ---- .../intersection/oozie_app/config-default.xml | 38 -- .../intersection/oozie_app/workflow.xml | 96 ---- .../doiboost/mag/oozie_app/config-default.xml | 42 -- .../dhp/doiboost/mag/oozie_app/workflow.xml | 92 ---- .../dhp/doiboost/oozie_app/config-default.xml | 42 -- .../dhp/doiboost/oozie_app/workflow.xml | 453 ------------------ .../oozie_app/config-default.xml | 0 .../preprocess/oozie_app/workflow.xml | 216 +++++++++ .../unpaywall/oozie_app/config-default.xml | 38 -- .../doiboost/unpaywall/oozie_app/workflow.xml | 55 --- .../dnetlib/doiboost/mag/MAGMappingTest.scala | 25 +- .../orcid/MappingORCIDToOAFTest.scala | 4 + 15 files changed, 240 insertions(+), 981 deletions(-) delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml rename dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/{crossref => preprocess}/oozie_app/config-default.xml (100%) create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala index 9117bcb34..fa4a93e00 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala @@ -10,6 +10,16 @@ import org.slf4j.{Logger, LoggerFactory} object SparkConvertORCIDToOAF { val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) + + def run(spark:SparkSession, workingPath:String, targetPath:String) :Unit = { + implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] + import spark.implicits._ + val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem] + + logger.info("Converting ORCID to OAF") + dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath) + } + def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_orcid_to_oaf_params.json"))) @@ -21,16 +31,11 @@ object SparkConvertORCIDToOAF { .appName(getClass.getSimpleName) .master(parser.get("master")).getOrCreate() - implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] - import spark.implicits._ val workingPath = parser.get("workingPath") val targetPath = parser.get("targetPath") - val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem] - - logger.info("Converting ORCID to OAF") - dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath) + run(spark,workingPath, targetPath) } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala index d6911cfa7..31f331912 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala @@ -50,7 +50,7 @@ object SparkPreprocessORCID { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() - val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_orcid_to_oaf_params.json"))) + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/preprocess_orcid_params.json"))) parser.parseArgument(args) val spark: SparkSession = SparkSession diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml deleted file mode 100644 index 63c2e9ef2..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml +++ /dev/null @@ -1,101 +0,0 @@ - - - - workingPath - the working dir base path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - timestamp - Timestamp for incremental Harvesting - - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.doiboost.crossref.CrossrefImporter - -t${workingPath}/input/crossref/index_update - -n${nameNode} - -ts${timestamp} - - - - - - - - yarn-cluster - cluster - ExtractCrossrefToOAF - eu.dnetlib.doiboost.crossref.CrossrefDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --workingPath/data/doiboost/input/crossref - --masteryarn-cluster - - - - - - - - - - - - - - - - - - yarn-cluster - cluster - ConvertCrossrefToOAF - eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --sourcePath${workingPath}/input/crossref/crossref_ds - --targetPath${workingPath}/process/ - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/config-default.xml deleted file mode 100644 index cf617a84c..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/config-default.xml +++ /dev/null @@ -1,38 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - - hive_metastore_uris - thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 - - - spark2YarnHistoryServerAddress - http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 - - - spark2EventLogDir - /user/spark/spark2ApplicationHistory - - - spark2ExtraListeners - "com.cloudera.spark.lineage.NavigatorAppListener" - - - spark2SqlQueryExecutionListeners - "com.cloudera.spark.lineage.NavigatorQueryListener" - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml deleted file mode 100644 index dcde62c9d..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml +++ /dev/null @@ -1,96 +0,0 @@ - - - - hostedByMapPath - the Hosted By Map Path - - - affiliationPath - the Affliation Path - - - paperAffiliationPath - the paperAffiliation Path - - - workingDirPath - the Working Path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - yarn-cluster - cluster - Create DOIBoost Infospace - eu.dnetlib.doiboost.SparkGenerateDoiBoost - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --hostedByMapPath${hostedByMapPath} - --affiliationPath${affiliationPath} - --paperAffiliationPath${paperAffiliationPath} - --workingDirPath${workingDirPath} - --masteryarn-cluster - - - - - - - - - yarn-cluster - cluster - Generate DOIBoost ActionSet - eu.dnetlib.doiboost.SparkGenerateDOIBoostActionSet - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --dbPublicationPath${workingDirPath}/doiBoostPublicationFiltered - --dbDatasetPath${workingDirPath}/crossrefDataset - --crossRefRelation${workingDirPath}/crossrefRelation - --dbaffiliationRelationPath${workingDirPath}/doiBoostPublicationAffiliation - -do${workingDirPath}/doiBoostOrganization - --targetPath${workingDirPath}/actionDataSet - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/config-default.xml deleted file mode 100644 index 59e5c059f..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/config-default.xml +++ /dev/null @@ -1,42 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - - oozie.wf.rerun.failnodes - false - - - hive_metastore_uris - thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 - - - spark2YarnHistoryServerAddress - http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 - - - spark2EventLogDir - /user/spark/spark2ApplicationHistory - - - spark2ExtraListeners - "com.cloudera.spark.lineage.NavigatorAppListener" - - - spark2SqlQueryExecutionListeners - "com.cloudera.spark.lineage.NavigatorQueryListener" - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml deleted file mode 100644 index 9d19dddc7..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml +++ /dev/null @@ -1,92 +0,0 @@ - - - - sourcePath - the working dir base path - - - targetPath - the working dir base path - - - workingPath - the working dir base path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - - yarn-cluster - cluster - Convert Mag to Dataset - eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - ${sparkExtraOPT} - - --sourcePath${sourcePath} - --targetPath${workingPath} - --masteryarn-cluster - - - - - - - - - - yarn-cluster - cluster - Convert Mag to OAF Dataset - eu.dnetlib.doiboost.mag.SparkPreProcessMAG - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --sourcePath${workingPath} - --workingPath${workingPath}/process - --targetPath${targetPath} - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/config-default.xml deleted file mode 100644 index 508202e30..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/config-default.xml +++ /dev/null @@ -1,42 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - - oozie.launcher.mapreduce.user.classpath.first - true - - - hive_metastore_uris - thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 - - - spark2YarnHistoryServerAddress - http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 - - - spark2EventLogDir - /user/spark/spark2ApplicationHistory - - - spark2ExtraListeners - "com.cloudera.spark.lineage.NavigatorAppListener" - - - spark2SqlQueryExecutionListeners - "com.cloudera.spark.lineage.NavigatorQueryListener" - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml deleted file mode 100644 index 34b4b5c5e..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml +++ /dev/null @@ -1,453 +0,0 @@ - - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorIntersectionMemory - memory for individual executor - - - - sparkExecutorCores - number of cores used by single executor - - - - - - workingPath - the working Path - - - - hostedByMapPath - the hostedByMap Path - - - outputPath - the Path of the sequence file action set - - - - - - inputPathCrossref - the Crossref input path - - - crossrefDumpPath - the Crossref dump path - - - - - - - - - - - - - - - - - MAGDumpPath - the MAG dump working path - - - - inputPathMAG - the MAG working path - - - - - - inputPathUnpayWall - the UnpayWall working path - - - - - inputPathOrcid - the ORCID input path - - - - workingPathOrcid - the ORCID working path - - - - - - ${jobTracker} - ${nameNode} - - - oozie.action.sharelib.for.spark - ${oozieActionShareLibForSpark2} - - - - - - - - - ${wf:conf('resumeFrom') eq 'ConvertCrossrefToOAF'} - ${wf:conf('resumeFrom') eq 'ResetMagWorkingPath'} - ${wf:conf('resumeFrom') eq 'PreprocessMag'} - ${wf:conf('resumeFrom') eq 'PreprocessUW'} - ${wf:conf('resumeFrom') eq 'PreprocessORCID'} - ${wf:conf('resumeFrom') eq 'CreateDOIBoost'} - ${wf:conf('resumeFrom') eq 'GenerateActionSet'} - ${wf:conf('resumeFrom') eq 'GenerateCrossrefDataset'} - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords - --hdfsServerUri${nameNode} - --crossrefFileNameTarGz${crossrefDumpPath}/crossref.tar.gz - --workingPath${crossrefDumpPath} - --outputPath${crossrefDumpPath}/files/ - - - - - - - - yarn-cluster - cluster - SparkUnpackCrossrefEntries - eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn-cluster - --sourcePath${crossrefDumpPath}/files - --targetPath${crossrefDumpPath}/crossref_unpack/ - - - - - - - - - yarn-cluster - cluster - SparkGenerateCrossrefDataset - eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=7G - --executor-cores=2 - --driver-memory=7G - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn-cluster - --sourcePath${crossrefDumpPath}/crossref_unpack/ - --targetPath${inputPathCrossref}/crossref_ds - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - yarn-cluster - cluster - Convert Mag to Dataset - eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --sourcePath${MAGDumpPath} - --targetPath${inputPathMAG}/dataset - --masteryarn-cluster - - - - - - - - - yarn-cluster - cluster - ConvertCrossrefToOAF - eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --sourcePath${inputPathCrossref}/crossref_ds - --targetPath${workingPath} - --masteryarn-cluster - - - - - - - - yarn-cluster - cluster - Convert Mag to OAF Dataset - eu.dnetlib.doiboost.mag.SparkProcessMAG - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorIntersectionMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --sourcePath${inputPathMAG}/dataset - --workingPath${inputPathMAG}/process - --targetPath${workingPath} - --masteryarn-cluster - - - - - - - - - - yarn-cluster - cluster - Convert UnpayWall to Dataset - eu.dnetlib.doiboost.uw.SparkMapUnpayWallToOAF - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --sourcePath${inputPathUnpayWall}/uw_extracted - --targetPath${workingPath}/uwPublication - --masteryarn-cluster - - - - - - - - - yarn-cluster - cluster - Convert ORCID to Dataset - eu.dnetlib.doiboost.orcid.SparkPreprocessORCID - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --sourcePath${inputPathOrcid} - --workingPath${workingPathOrcid} - --targetPath${workingPath}/orcidPublication - --masteryarn-cluster - - - - - - - - - yarn-cluster - cluster - Create DOIBoost Infospace - eu.dnetlib.doiboost.SparkGenerateDoiBoost - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorIntersectionMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --hostedByMapPath${hostedByMapPath} - --affiliationPath${inputPathMAG}/dataset/Affiliations - --paperAffiliationPath${inputPathMAG}/dataset/PaperAuthorAffiliations - --workingPath${workingPath} - --masteryarn-cluster - - - - - - - - - yarn-cluster - cluster - Generate DOIBoost ActionSet - eu.dnetlib.doiboost.SparkGenerateDOIBoostActionSet - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --dbPublicationPath${workingPath}/doiBoostPublicationFiltered - --dbDatasetPath${workingPath}/crossrefDataset - --crossRefRelation${workingPath}/crossrefRelation - --dbaffiliationRelationPath${workingPath}/doiBoostPublicationAffiliation - --dbOrganizationPath${workingPath}/doiBoostOrganization - --targetPath${workingPath}/actionDataSet - --sFilePath${outputPath} - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/config-default.xml rename to dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml new file mode 100644 index 000000000..03f7b7566 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml @@ -0,0 +1,216 @@ + + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + inputPathCrossref + the Crossref input path + + + crossrefDumpPath + the Crossref dump path + + + + + MAGDumpPath + the MAG dump working path + + + + inputPathMAG + the MAG working path + + + + + + inputPathOrcid + the ORCID input path + + + + workingPathOrcid + the ORCID working path + + + + + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + + ${wf:conf('resumeFrom') eq 'UnpackCrossrefEntries'} + ${wf:conf('resumeFrom') eq 'GenerateCrossrefDataset'} + ${wf:conf('resumeFrom') eq 'ResetMagWorkingPath'} + ${wf:conf('resumeFrom') eq 'ConvertMagToDataset'} + ${wf:conf('resumeFrom') eq 'PreProcessORCID'} + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords + --hdfsServerUri${nameNode} + --crossrefFileNameTarGz${crossrefDumpPath}/crossref.tar.gz + --workingPath${crossrefDumpPath} + --outputPath${crossrefDumpPath}/files/ + + + + + + + + yarn-cluster + cluster + SparkUnpackCrossrefEntries + eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn-cluster + --sourcePath${crossrefDumpPath}/files + --targetPath${crossrefDumpPath}/crossref_unpack/ + + + + + + + + + yarn-cluster + cluster + SparkGenerateCrossrefDataset + eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=7G + --executor-cores=2 + --driver-memory=7G + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn-cluster + --sourcePath${crossrefDumpPath}/crossref_unpack/ + --targetPath${inputPathCrossref}/crossref_ds + + + + + + + + + + + + + + + + + + + + + + + + + + + + yarn-cluster + cluster + Convert Mag to Dataset + eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${MAGDumpPath} + --targetPath${inputPathMAG}/dataset + --masteryarn-cluster + + + + + + + + + yarn-cluster + cluster + Convert ORCID to Dataset + eu.dnetlib.doiboost.orcid.SparkPreprocessORCID + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${inputPathOrcid} + --workingPath${workingPathOrcid} + --masteryarn-cluster + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/config-default.xml deleted file mode 100644 index cf617a84c..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/config-default.xml +++ /dev/null @@ -1,38 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - - hive_metastore_uris - thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 - - - spark2YarnHistoryServerAddress - http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 - - - spark2EventLogDir - /user/spark/spark2ApplicationHistory - - - spark2ExtraListeners - "com.cloudera.spark.lineage.NavigatorAppListener" - - - spark2SqlQueryExecutionListeners - "com.cloudera.spark.lineage.NavigatorQueryListener" - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/workflow.xml deleted file mode 100644 index d2a69752e..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/workflow.xml +++ /dev/null @@ -1,55 +0,0 @@ - - - - sourcePath - the working dir base path - - - targetPath - the working dir base path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - yarn-cluster - cluster - Convert UnpayWall to Dataset - eu.dnetlib.doiboost.uw.SparkMapUnpayWallToOAF - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --sourcePath${sourcePath}/uw_extracted - --targetPath${targetPath} - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala index dbdedcbd0..46d4ec08d 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala @@ -1,22 +1,15 @@ package eu.dnetlib.doiboost.mag -import java.sql.Timestamp - -import eu.dnetlib.dhp.schema.oaf.Publication -import org.apache.htrace.fasterxml.jackson.databind.SerializationFeature -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.api.java.function.MapFunction -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} -import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig} -import org.junit.jupiter.api.Test -import org.slf4j.{Logger, LoggerFactory} +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Dataset, SparkSession} +import org.codehaus.jackson.map.ObjectMapper import org.junit.jupiter.api.Assertions._ -import org.apache.spark.sql.functions._ +import org.junit.jupiter.api.Test +import org.json4s.DefaultFormats +import org.slf4j.{Logger, LoggerFactory} -import scala.collection.JavaConverters._ +import java.sql.Timestamp import scala.io.Source -import scala.reflect.ClassTag -import scala.util.matching.Regex @@ -65,8 +58,7 @@ class MAGMappingTest { @Test def normalizeDoiTest():Unit = { - import org.json4s.jackson.Serialization.write - import org.json4s.DefaultFormats + implicit val formats = DefaultFormats @@ -96,7 +88,6 @@ class MAGMappingTest { @Test def normalizeDoiTest2():Unit = { - import org.json4s.jackson.Serialization.write import org.json4s.DefaultFormats implicit val formats = DefaultFormats diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala index 5fced8edb..b484dc087 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala @@ -48,6 +48,8 @@ class MappingORCIDToOAFTest { SparkPreprocessORCID.run( spark,sourcePath, workingPath) + SparkConvertORCIDToOAF.run(spark, workingPath,targetPath) + val mapper = new ObjectMapper() @@ -62,6 +64,8 @@ class MappingORCIDToOAFTest { println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(p.first())) spark.close() + + }