From 51a03c0a50bd1a546865892698f840a97479b16e Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 14 Oct 2021 14:23:05 +0200 Subject: [PATCH] refactor code for EBI from dhp-graph-mapper into dhp-aggregation --- .../sx/bio/ebi/SparkDownloadEBILinks.scala | 4 +- .../dhp/sx/bio/ebi/SparkEBILinksToOaf.scala | 4 +- .../sx/bio}/ebi/oozie_app/config-default.xml | 0 .../dhp/sx/bio/ebi/oozie_app/workflow.xml | 105 ++++++++++++++++++ .../dhp/sx/graph/ebi/oozie_app/workflow.xml | 99 ----------------- .../ebi/update/oozie_app/config-default.xml | 68 ------------ .../graph/ebi/update/oozie_app/workflow.xml | 67 ----------- 7 files changed, 109 insertions(+), 238 deletions(-) rename dhp-workflows/{dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph => dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio}/ebi/oozie_app/config-default.xml (100%) create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/oozie_app/workflow.xml delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala index 578db1ea9..85fbd99c4 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala @@ -66,7 +66,7 @@ object SparkDownloadEBILinks { val log: Logger = LoggerFactory.getLogger(getClass) val MAX_ITEM_PER_PARTITION = 20000 val conf: SparkConf = new SparkConf() - val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/ebi/ebi_download_update.json"))) + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/bio/ebi/ebi_download_update.json"))) parser.parseArgument(args) val spark: SparkSession = SparkSession @@ -86,7 +86,7 @@ object SparkDownloadEBILinks { val workingPath = parser.get("workingPath") log.info(s"workingPath -> $workingPath") - log.info("Getting max pubmedId where the links have been requested") + log.info("Getting max pubmedId where the links have already requested") val links: Dataset[EBILinkItem] = spark.read.load(s"$sourcePath/ebi_links_dataset").as[EBILinkItem] val lastPMIDRequested = links.map(l => l.id).select(max("value")).first.getLong(0) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala index 0db469769..10467884c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala @@ -14,7 +14,7 @@ object SparkEBILinksToOaf { def main(args: Array[String]): Unit = { val log: Logger = LoggerFactory.getLogger(getClass) val conf: SparkConf = new SparkConf() - val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/ebi/ebi_to_df_params.json"))) + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/bio/ebi/ebi_to_df_params.json"))) parser.parseArgument(args) val spark: SparkSession = SparkSession @@ -31,7 +31,7 @@ object SparkEBILinksToOaf { log.info(s"targetPath -> $targetPath") implicit val PMEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) - val ebLinks: Dataset[EBILinkItem] = spark.read.load(s"${sourcePath}_dataset").as[EBILinkItem].filter(l => l.links != null) + val ebLinks: Dataset[EBILinkItem] = spark.read.load(sourcePath).as[EBILinkItem].filter(l => l.links != null && l.links.startsWith("{")) ebLinks.flatMap(j => BioDBToOAF.parse_ebi_links(j.links)) .filter(p => BioDBToOAF.EBITargetLinksFilter(p)) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/oozie_app/config-default.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml new file mode 100644 index 000000000..73b1a3b60 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml @@ -0,0 +1,105 @@ + + + + sourcePath + the Working Path + + + workingPath + the Working Path + + + targetPath + the OAF MDStore Path + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + resumeFrom + DownloadEBILinks + node to start + + + + + + + + ${wf:conf('resumeFrom') eq 'DownloadEBILinks'} + ${wf:conf('resumeFrom') eq 'CreateEBIDataSet'} + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + yarn-cluster + cluster + Incremental Download EBI Links + eu.dnetllib.dhp.sx.bio.ebi.SparkDownloadEBILinks + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=2000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${sourcePath} + --workingPath${workingPath} + --masteryarn + + + + + + + + + + + + + + + + yarn-cluster + cluster + Create OAF DataSet + eu.dnetllib.dhp.sx.bio.ebi.SparkEBILinksToOaf + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=2000 + ${sparkExtraOPT} + + --sourcePath${sourcePath}/ebi_links_dataset + --targetPath${targetPath} + --masteryarn + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/oozie_app/workflow.xml deleted file mode 100644 index 7612321c0..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/oozie_app/workflow.xml +++ /dev/null @@ -1,99 +0,0 @@ - - - - workingPath - the Working Path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - yarn-cluster - cluster - Create Baselnie DataSet - - eu.dnetlib.dhp.sx.ebi.SparkCreateBaselineDataFrame - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=1 - --driver-memory=${sparkDriverMemory} - --executor-cores=${sparkExecutorCores} - ${sparkExtraOPT} - - --workingPath${workingPath} - --masteryarn - --hdfsServerUri${nameNode} - - - - - - - - yarn-cluster - cluster - Create EBI DataSet - - eu.dnetlib.dhp.sx.ebi.SparkEBILinksToOaf - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=1000 - ${sparkExtraOPT} - - --workingPath${workingPath} - --masteryarn - - - - - - - - yarn-cluster - cluster - Create Baseline DataSet - - eu.dnetlib.dhp.sx.ebi.SparkAddLinkUpdates - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=1 - --driver-memory=${sparkDriverMemory} - --executor-cores=${sparkExecutorCores} - ${sparkExtraOPT} - - --workingPath${workingPath} - --masteryarn - - - - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/config-default.xml deleted file mode 100644 index 17cd6c9a3..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/config-default.xml +++ /dev/null @@ -1,68 +0,0 @@ - - - - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - hive_metastore_uris - thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 - - - spark2YarnHistoryServerAddress - http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 - - - - - - - - - - - - - - - - - - - - - - - - - oozie.launcher.mapreduce.user.classpath.first - true - - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - - spark2EventLogDir - /user/spark/spark2ApplicationHistory - - - spark2ExtraListeners - "com.cloudera.spark.lineage.NavigatorAppListener" - - - spark2SqlQueryExecutionListeners - "com.cloudera.spark.lineage.NavigatorQueryListener" - - \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/workflow.xml deleted file mode 100644 index cd3bb8c71..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/workflow.xml +++ /dev/null @@ -1,67 +0,0 @@ - - - - sourcePath - the Working Path - - - workingPath - the Working Path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - yarn-cluster - cluster - Incremental Download EBI Links - eu.dnetlib.dhp.sx.graph.ebi.SparkDownloadEBILinks - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=2000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --sourcePath${sourcePath} - --workingPath${workingPath} - --masteryarn - - - - - - - - - - - - - - - \ No newline at end of file