From 7ee2757fcd97d2d842638f4e52290df8217f465b Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 13 Aug 2021 12:41:01 +0200 Subject: [PATCH] fixed DownloadCSV parameters spec; workflow patching the hostedby replaces the graph content (publication, datasource) rather than creating a copy --- .../SparkApplyHostedByMapToDatasource.scala | 13 ++++++++----- .../hostedbymap/SparkApplyHostedByMapToResult.scala | 7 +++++-- .../graph/hostedbymap/download_csv_parameters.json | 6 ++++++ 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToDatasource.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToDatasource.scala index 424567830..1b18ba3ae 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToDatasource.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToDatasource.scala @@ -27,8 +27,8 @@ object SparkApplyHostedByMapToDatasource { d })(Encoders.bean((classOf[Datasource]))) } - def main(args: Array[String]): Unit = { + def main(args: Array[String]): Unit = { val logger: Logger = LoggerFactory.getLogger(getClass) val conf: SparkConf = new SparkConf() @@ -41,18 +41,15 @@ object SparkApplyHostedByMapToDatasource { .appName(getClass.getSimpleName) .master(parser.get("master")).getOrCreate() - val graphPath = parser.get("graphPath") - val outputPath = parser.get("outputPath") val preparedInfoPath = parser.get("preparedInfoPath") - implicit val formats = DefaultFormats - implicit val mapEncoderPubs: Encoder[Datasource] = Encoders.bean(classOf[Datasource]) implicit val mapEncoderEinfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo]) + val mapper = new ObjectMapper() val dats : Dataset[Datasource] = spark.read.textFile(graphPath + "/datasource") @@ -62,6 +59,12 @@ object SparkApplyHostedByMapToDatasource { .map(ei => mapper.readValue(ei, classOf[EntityInfo]))) applyHBtoDats(pinfo, dats).write.mode(SaveMode.Overwrite).option("compression","gzip").json(outputPath) + + spark.read.textFile(outputPath) + .write + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .text(graphPath + "/datasource") } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala index 8b0b45513..db7ab4ac0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala @@ -75,8 +75,11 @@ object SparkApplyHostedByMapToResult { applyHBtoPubs(pinfo, pubs).write.mode(SaveMode.Overwrite).option("compression","gzip").json(outputPath) - - + spark.read.textFile(outputPath) + .write + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .text(graphPath + "/publication") } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/download_csv_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/download_csv_parameters.json index cf417c675..22c65eb35 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/download_csv_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hostedbymap/download_csv_parameters.json @@ -11,6 +11,12 @@ "paramDescription": "the path where to find the pre-processed data for unibi gold list and doj artciles", "paramRequired": true }, + { + "paramName":"of", + "paramLongName":"outputFile", + "paramDescription": "the output json file produced by the CSV downlaod procedure", + "paramRequired": true + }, { "paramName": "hnn", "paramLongName": "hdfsNameNode",