diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala
index 578db1ea94..85fbd99c47 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala
@@ -66,7 +66,7 @@ object SparkDownloadEBILinks {
val log: Logger = LoggerFactory.getLogger(getClass)
val MAX_ITEM_PER_PARTITION = 20000
val conf: SparkConf = new SparkConf()
- val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/ebi/ebi_download_update.json")))
+ val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/bio/ebi/ebi_download_update.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
@@ -86,7 +86,7 @@ object SparkDownloadEBILinks {
val workingPath = parser.get("workingPath")
log.info(s"workingPath -> $workingPath")
- log.info("Getting max pubmedId where the links have been requested")
+ log.info("Getting max pubmedId where the links have already requested")
val links: Dataset[EBILinkItem] = spark.read.load(s"$sourcePath/ebi_links_dataset").as[EBILinkItem]
val lastPMIDRequested = links.map(l => l.id).select(max("value")).first.getLong(0)
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
index 0db469769c..10467884cd 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
@@ -14,7 +14,7 @@ object SparkEBILinksToOaf {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
- val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/ebi/ebi_to_df_params.json")))
+ val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/bio/ebi/ebi_to_df_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
@@ -31,7 +31,7 @@ object SparkEBILinksToOaf {
log.info(s"targetPath -> $targetPath")
implicit val PMEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
- val ebLinks: Dataset[EBILinkItem] = spark.read.load(s"${sourcePath}_dataset").as[EBILinkItem].filter(l => l.links != null)
+ val ebLinks: Dataset[EBILinkItem] = spark.read.load(sourcePath).as[EBILinkItem].filter(l => l.links != null && l.links.startsWith("{"))
ebLinks.flatMap(j => BioDBToOAF.parse_ebi_links(j.links))
.filter(p => BioDBToOAF.EBITargetLinksFilter(p))
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/oozie_app/config-default.xml
rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml
new file mode 100644
index 0000000000..73b1a3b608
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml
@@ -0,0 +1,105 @@
+
+
+
+ sourcePath
+ the Working Path
+
+
+ workingPath
+ the Working Path
+
+
+ targetPath
+ the OAF MDStore Path
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+ sparkExecutorCores
+ number of cores used by single executor
+
+
+ resumeFrom
+ DownloadEBILinks
+ node to start
+
+
+
+
+
+
+
+ ${wf:conf('resumeFrom') eq 'DownloadEBILinks'}
+ ${wf:conf('resumeFrom') eq 'CreateEBIDataSet'}
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+ yarn-cluster
+ cluster
+ Incremental Download EBI Links
+ eu.dnetllib.dhp.sx.bio.ebi.SparkDownloadEBILinks
+ dhp-aggregation-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=2000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --sourcePath${sourcePath}
+ --workingPath${workingPath}
+ --masteryarn
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ Create OAF DataSet
+ eu.dnetllib.dhp.sx.bio.ebi.SparkEBILinksToOaf
+ dhp-aggregation-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.sql.shuffle.partitions=2000
+ ${sparkExtraOPT}
+
+ --sourcePath${sourcePath}/ebi_links_dataset
+ --targetPath${targetPath}
+ --masteryarn
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/oozie_app/workflow.xml
deleted file mode 100644
index 7612321c04..0000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/oozie_app/workflow.xml
+++ /dev/null
@@ -1,99 +0,0 @@
-
-
-
- workingPath
- the Working Path
-
-
- sparkDriverMemory
- memory for driver process
-
-
- sparkExecutorMemory
- memory for individual executor
-
-
- sparkExecutorCores
- number of cores used by single executor
-
-
-
-
-
-
-
- Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
-
-
- yarn-cluster
- cluster
- Create Baselnie DataSet
-
- eu.dnetlib.dhp.sx.ebi.SparkCreateBaselineDataFrame
- dhp-graph-mapper-${projectVersion}.jar
-
- --executor-memory=${sparkExecutorMemory}
- --executor-cores=1
- --driver-memory=${sparkDriverMemory}
- --executor-cores=${sparkExecutorCores}
- ${sparkExtraOPT}
-
- --workingPath${workingPath}
- --masteryarn
- --hdfsServerUri${nameNode}
-
-
-
-
-
-
-
- yarn-cluster
- cluster
- Create EBI DataSet
-
- eu.dnetlib.dhp.sx.ebi.SparkEBILinksToOaf
- dhp-graph-mapper-${projectVersion}.jar
-
- --executor-memory=${sparkExecutorMemory}
- --executor-cores=${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- --conf spark.sql.shuffle.partitions=1000
- ${sparkExtraOPT}
-
- --workingPath${workingPath}
- --masteryarn
-
-
-
-
-
-
-
- yarn-cluster
- cluster
- Create Baseline DataSet
-
- eu.dnetlib.dhp.sx.ebi.SparkAddLinkUpdates
- dhp-graph-mapper-${projectVersion}.jar
-
- --executor-memory=${sparkExecutorMemory}
- --executor-cores=1
- --driver-memory=${sparkDriverMemory}
- --executor-cores=${sparkExecutorCores}
- ${sparkExtraOPT}
-
- --workingPath${workingPath}
- --masteryarn
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/config-default.xml
deleted file mode 100644
index 17cd6c9a33..0000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/config-default.xml
+++ /dev/null
@@ -1,68 +0,0 @@
-
-
-
-
-
- jobTracker
- yarnRM
-
-
- nameNode
- hdfs://nameservice1
-
-
- hive_metastore_uris
- thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
-
-
- spark2YarnHistoryServerAddress
- http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- oozie.launcher.mapreduce.user.classpath.first
- true
-
-
-
- oozie.use.system.libpath
- true
-
-
- oozie.action.sharelib.for.spark
- spark2
-
-
- spark2EventLogDir
- /user/spark/spark2ApplicationHistory
-
-
- spark2ExtraListeners
- "com.cloudera.spark.lineage.NavigatorAppListener"
-
-
- spark2SqlQueryExecutionListeners
- "com.cloudera.spark.lineage.NavigatorQueryListener"
-
-
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/workflow.xml
deleted file mode 100644
index cd3bb8c714..0000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/workflow.xml
+++ /dev/null
@@ -1,67 +0,0 @@
-
-
-
- sourcePath
- the Working Path
-
-
- workingPath
- the Working Path
-
-
- sparkDriverMemory
- memory for driver process
-
-
- sparkExecutorMemory
- memory for individual executor
-
-
- sparkExecutorCores
- number of cores used by single executor
-
-
-
-
-
-
-
- Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
-
-
-
- yarn-cluster
- cluster
- Incremental Download EBI Links
- eu.dnetlib.dhp.sx.graph.ebi.SparkDownloadEBILinks
- dhp-graph-mapper-${projectVersion}.jar
-
- --executor-memory=${sparkExecutorMemory}
- --executor-cores=${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.shuffle.partitions=2000
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-
- --sourcePath${sourcePath}
- --workingPath${workingPath}
- --masteryarn
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file