From 3d8e2aa1468c3b112f49f30ae8532522ef67470c Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 14 Jul 2021 14:37:06 +0200 Subject: [PATCH] Code refactor: - removed old workflows in doiboost - splitted workflow of doiboost in preprocess and process --- .../orcid/SparkConvertORCIDToOAF.scala | 17 +- .../doiboost/orcid/SparkPreprocessORCID.scala | 2 +- .../doiboost/crossref/oozie_app/workflow.xml | 101 -------- .../intersection/oozie_app/config-default.xml | 38 --- .../intersection/oozie_app/workflow.xml | 96 -------- .../doiboost/mag/oozie_app/config-default.xml | 42 ---- .../dhp/doiboost/mag/oozie_app/workflow.xml | 92 -------- .../oozie_app/config-default.xml | 0 .../preprocess/oozie_app/workflow.xml | 216 ++++++++++++++++++ .../oozie_app/config-default.xml | 0 .../{ => process}/oozie_app/workflow.xml | 206 +---------------- .../unpaywall/oozie_app/config-default.xml | 38 --- .../doiboost/unpaywall/oozie_app/workflow.xml | 55 ----- .../dnetlib/doiboost/mag/MAGMappingTest.scala | 49 ++-- .../orcid/MappingORCIDToOAFTest.scala | 6 +- 15 files changed, 264 insertions(+), 694 deletions(-) delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml rename dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/{crossref => preprocess}/oozie_app/config-default.xml (100%) create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml rename dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/{ => process}/oozie_app/config-default.xml (100%) rename dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/{ => process}/oozie_app/workflow.xml (54%) delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala index 9117bcb347..fa4a93e008 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala @@ -10,6 +10,16 @@ import org.slf4j.{Logger, LoggerFactory} object SparkConvertORCIDToOAF { val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) + + def run(spark:SparkSession, workingPath:String, targetPath:String) :Unit = { + implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] + import spark.implicits._ + val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem] + + logger.info("Converting ORCID to OAF") + dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath) + } + def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_orcid_to_oaf_params.json"))) @@ -21,16 +31,11 @@ object SparkConvertORCIDToOAF { .appName(getClass.getSimpleName) .master(parser.get("master")).getOrCreate() - implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] - import spark.implicits._ val workingPath = parser.get("workingPath") val targetPath = parser.get("targetPath") - val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem] - - logger.info("Converting ORCID to OAF") - dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath) + run(spark,workingPath, targetPath) } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala index d6911cfa78..31f3319122 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala @@ -50,7 +50,7 @@ object SparkPreprocessORCID { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() - val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_orcid_to_oaf_params.json"))) + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/preprocess_orcid_params.json"))) parser.parseArgument(args) val spark: SparkSession = SparkSession diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml deleted file mode 100644 index 63c2e9ef29..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml +++ /dev/null @@ -1,101 +0,0 @@ - - - - workingPath - the working dir base path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - timestamp - Timestamp for incremental Harvesting - - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.doiboost.crossref.CrossrefImporter - -t${workingPath}/input/crossref/index_update - -n${nameNode} - -ts${timestamp} - - - - - - - - yarn-cluster - cluster - ExtractCrossrefToOAF - eu.dnetlib.doiboost.crossref.CrossrefDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --workingPath/data/doiboost/input/crossref - --masteryarn-cluster - - - - - - - - - - - - - - - - - - yarn-cluster - cluster - ConvertCrossrefToOAF - eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --sourcePath${workingPath}/input/crossref/crossref_ds - --targetPath${workingPath}/process/ - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/config-default.xml deleted file mode 100644 index cf617a84c4..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/config-default.xml +++ /dev/null @@ -1,38 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - - hive_metastore_uris - thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 - - - spark2YarnHistoryServerAddress - http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 - - - spark2EventLogDir - /user/spark/spark2ApplicationHistory - - - spark2ExtraListeners - "com.cloudera.spark.lineage.NavigatorAppListener" - - - spark2SqlQueryExecutionListeners - "com.cloudera.spark.lineage.NavigatorQueryListener" - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml deleted file mode 100644 index dcde62c9d9..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml +++ /dev/null @@ -1,96 +0,0 @@ - - - - hostedByMapPath - the Hosted By Map Path - - - affiliationPath - the Affliation Path - - - paperAffiliationPath - the paperAffiliation Path - - - workingDirPath - the Working Path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - yarn-cluster - cluster - Create DOIBoost Infospace - eu.dnetlib.doiboost.SparkGenerateDoiBoost - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --hostedByMapPath${hostedByMapPath} - --affiliationPath${affiliationPath} - --paperAffiliationPath${paperAffiliationPath} - --workingDirPath${workingDirPath} - --masteryarn-cluster - - - - - - - - - yarn-cluster - cluster - Generate DOIBoost ActionSet - eu.dnetlib.doiboost.SparkGenerateDOIBoostActionSet - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --dbPublicationPath${workingDirPath}/doiBoostPublicationFiltered - --dbDatasetPath${workingDirPath}/crossrefDataset - --crossRefRelation${workingDirPath}/crossrefRelation - --dbaffiliationRelationPath${workingDirPath}/doiBoostPublicationAffiliation - -do${workingDirPath}/doiBoostOrganization - --targetPath${workingDirPath}/actionDataSet - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/config-default.xml deleted file mode 100644 index 59e5c059fc..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/config-default.xml +++ /dev/null @@ -1,42 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - - oozie.wf.rerun.failnodes - false - - - hive_metastore_uris - thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 - - - spark2YarnHistoryServerAddress - http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 - - - spark2EventLogDir - /user/spark/spark2ApplicationHistory - - - spark2ExtraListeners - "com.cloudera.spark.lineage.NavigatorAppListener" - - - spark2SqlQueryExecutionListeners - "com.cloudera.spark.lineage.NavigatorQueryListener" - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml deleted file mode 100644 index 9d19dddc71..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml +++ /dev/null @@ -1,92 +0,0 @@ - - - - sourcePath - the working dir base path - - - targetPath - the working dir base path - - - workingPath - the working dir base path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - - yarn-cluster - cluster - Convert Mag to Dataset - eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - ${sparkExtraOPT} - - --sourcePath${sourcePath} - --targetPath${workingPath} - --masteryarn-cluster - - - - - - - - - - yarn-cluster - cluster - Convert Mag to OAF Dataset - eu.dnetlib.doiboost.mag.SparkPreProcessMAG - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --sourcePath${workingPath} - --workingPath${workingPath}/process - --targetPath${targetPath} - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/config-default.xml rename to dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml new file mode 100644 index 0000000000..03f7b7566c --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml @@ -0,0 +1,216 @@ + + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + inputPathCrossref + the Crossref input path + + + crossrefDumpPath + the Crossref dump path + + + + + MAGDumpPath + the MAG dump working path + + + + inputPathMAG + the MAG working path + + + + + + inputPathOrcid + the ORCID input path + + + + workingPathOrcid + the ORCID working path + + + + + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + + ${wf:conf('resumeFrom') eq 'UnpackCrossrefEntries'} + ${wf:conf('resumeFrom') eq 'GenerateCrossrefDataset'} + ${wf:conf('resumeFrom') eq 'ResetMagWorkingPath'} + ${wf:conf('resumeFrom') eq 'ConvertMagToDataset'} + ${wf:conf('resumeFrom') eq 'PreProcessORCID'} + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords + --hdfsServerUri${nameNode} + --crossrefFileNameTarGz${crossrefDumpPath}/crossref.tar.gz + --workingPath${crossrefDumpPath} + --outputPath${crossrefDumpPath}/files/ + + + + + + + + yarn-cluster + cluster + SparkUnpackCrossrefEntries + eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn-cluster + --sourcePath${crossrefDumpPath}/files + --targetPath${crossrefDumpPath}/crossref_unpack/ + + + + + + + + + yarn-cluster + cluster + SparkGenerateCrossrefDataset + eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=7G + --executor-cores=2 + --driver-memory=7G + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn-cluster + --sourcePath${crossrefDumpPath}/crossref_unpack/ + --targetPath${inputPathCrossref}/crossref_ds + + + + + + + + + + + + + + + + + + + + + + + + + + + + yarn-cluster + cluster + Convert Mag to Dataset + eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${MAGDumpPath} + --targetPath${inputPathMAG}/dataset + --masteryarn-cluster + + + + + + + + + yarn-cluster + cluster + Convert ORCID to Dataset + eu.dnetlib.doiboost.orcid.SparkPreprocessORCID + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${inputPathOrcid} + --workingPath${workingPathOrcid} + --masteryarn-cluster + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/config-default.xml rename to dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml similarity index 54% rename from dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml rename to dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml index 34b4b5c5e3..f845d97f30 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + sparkDriverMemory @@ -17,8 +17,6 @@ sparkExecutorCores number of cores used by single executor - - workingPath @@ -40,29 +38,8 @@ inputPathCrossref the Crossref input path - - crossrefDumpPath - the Crossref dump path - - - - - - - - - - - - - - - MAGDumpPath - the MAG dump working path - - inputPathMAG the MAG working path @@ -76,11 +53,6 @@ - - inputPathOrcid - the ORCID input path - - workingPathOrcid the ORCID working path @@ -103,15 +75,12 @@ - ${wf:conf('resumeFrom') eq 'ConvertCrossrefToOAF'} - ${wf:conf('resumeFrom') eq 'ResetMagWorkingPath'} ${wf:conf('resumeFrom') eq 'PreprocessMag'} ${wf:conf('resumeFrom') eq 'PreprocessUW'} - ${wf:conf('resumeFrom') eq 'PreprocessORCID'} + ${wf:conf('resumeFrom') eq 'ProcessORCID'} ${wf:conf('resumeFrom') eq 'CreateDOIBoost'} ${wf:conf('resumeFrom') eq 'GenerateActionSet'} - ${wf:conf('resumeFrom') eq 'GenerateCrossrefDataset'} - + @@ -119,170 +88,6 @@ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords - --hdfsServerUri${nameNode} - --crossrefFileNameTarGz${crossrefDumpPath}/crossref.tar.gz - --workingPath${crossrefDumpPath} - --outputPath${crossrefDumpPath}/files/ - - - - - - - - yarn-cluster - cluster - SparkUnpackCrossrefEntries - eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn-cluster - --sourcePath${crossrefDumpPath}/files - --targetPath${crossrefDumpPath}/crossref_unpack/ - - - - - - - - - yarn-cluster - cluster - SparkGenerateCrossrefDataset - eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=7G - --executor-cores=2 - --driver-memory=7G - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn-cluster - --sourcePath${crossrefDumpPath}/crossref_unpack/ - --targetPath${inputPathCrossref}/crossref_ds - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - yarn-cluster - cluster - Convert Mag to Dataset - eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --sourcePath${MAGDumpPath} - --targetPath${inputPathMAG}/dataset - --masteryarn-cluster - - - - - - yarn-cluster @@ -326,7 +131,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --sourcePath${inputPathMAG}/dataset - --workingPath${inputPathMAG}/process + --workingPath${inputPathMAG}/process_p --targetPath${workingPath} --masteryarn-cluster @@ -368,7 +173,7 @@ yarn-cluster cluster Convert ORCID to Dataset - eu.dnetlib.doiboost.orcid.SparkPreprocessORCID + eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF dhp-doiboost-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -380,7 +185,6 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePath${inputPathOrcid} --workingPath${workingPathOrcid} --targetPath${workingPath}/orcidPublication --masteryarn-cluster diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/config-default.xml deleted file mode 100644 index cf617a84c4..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/config-default.xml +++ /dev/null @@ -1,38 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - - hive_metastore_uris - thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 - - - spark2YarnHistoryServerAddress - http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 - - - spark2EventLogDir - /user/spark/spark2ApplicationHistory - - - spark2ExtraListeners - "com.cloudera.spark.lineage.NavigatorAppListener" - - - spark2SqlQueryExecutionListeners - "com.cloudera.spark.lineage.NavigatorQueryListener" - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/workflow.xml deleted file mode 100644 index d2a69752ea..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/unpaywall/oozie_app/workflow.xml +++ /dev/null @@ -1,55 +0,0 @@ - - - - sourcePath - the working dir base path - - - targetPath - the working dir base path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - yarn-cluster - cluster - Convert UnpayWall to Dataset - eu.dnetlib.doiboost.uw.SparkMapUnpayWallToOAF - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --sourcePath${sourcePath}/uw_extracted - --targetPath${targetPath} - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala index 7eb50665e2..46d4ec08d6 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala @@ -1,22 +1,15 @@ package eu.dnetlib.doiboost.mag -import java.sql.Timestamp - -import eu.dnetlib.dhp.schema.oaf.Publication -import org.apache.htrace.fasterxml.jackson.databind.SerializationFeature -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.api.java.function.MapFunction -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} -import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig} -import org.junit.jupiter.api.Test -import org.slf4j.{Logger, LoggerFactory} +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Dataset, SparkSession} +import org.codehaus.jackson.map.ObjectMapper import org.junit.jupiter.api.Assertions._ -import org.apache.spark.sql.functions._ +import org.junit.jupiter.api.Test +import org.json4s.DefaultFormats +import org.slf4j.{Logger, LoggerFactory} -import scala.collection.JavaConverters._ +import java.sql.Timestamp import scala.io.Source -import scala.reflect.ClassTag -import scala.util.matching.Regex @@ -65,14 +58,19 @@ class MAGMappingTest { @Test def normalizeDoiTest():Unit = { - import org.json4s.jackson.Serialization.write - import org.json4s.DefaultFormats + implicit val formats = DefaultFormats - val conf = new SparkConf().setAppName("test").setMaster("local[2]") - val sc = new SparkContext(conf) - val spark = SparkSession.builder.config(sc.getConf).getOrCreate() + val conf = new SparkConf() + conf.setMaster("local[*]") + conf.set("spark.driver.host", "localhost") + val spark: SparkSession = + SparkSession + .builder() + .appName(getClass.getSimpleName) + .config(conf) + .getOrCreate() val path = getClass.getResource("magPapers.json").getPath import org.apache.spark.sql.Encoders @@ -90,14 +88,19 @@ class MAGMappingTest { @Test def normalizeDoiTest2():Unit = { - import org.json4s.jackson.Serialization.write import org.json4s.DefaultFormats implicit val formats = DefaultFormats - val conf = new SparkConf().setAppName("test").setMaster("local[2]") - val sc = new SparkContext(conf) - val spark = SparkSession.builder.config(sc.getConf).getOrCreate() + val conf = new SparkConf() + conf.setMaster("local[*]") + conf.set("spark.driver.host", "localhost") + val spark: SparkSession = + SparkSession + .builder() + .appName(getClass.getSimpleName) + .config(conf) + .getOrCreate() val path = getClass.getResource("duplicatedMagPapers.json").getPath import org.apache.spark.sql.Encoders diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala index 076a01526f..b484dc0878 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala @@ -48,6 +48,8 @@ class MappingORCIDToOAFTest { SparkPreprocessORCID.run( spark,sourcePath, workingPath) + SparkConvertORCIDToOAF.run(spark, workingPath,targetPath) + val mapper = new ObjectMapper() @@ -61,6 +63,8 @@ class MappingORCIDToOAFTest { assertTrue(oA == p.count()) println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(p.first())) + spark.close() + } @@ -78,7 +82,7 @@ class MappingORCIDToOAFTest { val oaf = ORCIDToOAF.convertTOOAF(orcid) assert(oaf.getPid.size() == 1) oaf.getPid.toList.foreach(pid => assert(pid.getQualifier.getClassid.equals("doi"))) - oaf.getPid.toList.foreach(pid => assert(pid.getValue.equals("10.1042/BCJ20160876".toLowerCase()))) + oaf.getPid.toList.foreach(pid => assert(pid.getValue.equals("10.1042/BCJ20160876"))) //println(mapper.writeValueAsString(ORCIDToOAF.convertTOOAF(orcid)))