
fixed the DownloadCSV parameters spec; the workflow patching the hostedby now replaces the graph content (publication, datasource) in place rather than creating a copy

Claudio Atzori 2021-08-13 12:41:01 +02:00
parent c3ad4ab701
commit 7ee2757fcd
3 changed files with 19 additions and 7 deletions

SparkApplyHostedByMapToDatasource.scala

@@ -27,8 +27,8 @@ object SparkApplyHostedByMapToDatasource {
       d
     })(Encoders.bean((classOf[Datasource])))
   }
   def main(args: Array[String]): Unit = {
     val logger: Logger = LoggerFactory.getLogger(getClass)
     val conf: SparkConf = new SparkConf()
@@ -41,18 +41,15 @@ object SparkApplyHostedByMapToDatasource {
       .appName(getClass.getSimpleName)
       .master(parser.get("master")).getOrCreate()
     val graphPath = parser.get("graphPath")
     val outputPath = parser.get("outputPath")
     val preparedInfoPath = parser.get("preparedInfoPath")
     implicit val formats = DefaultFormats
     implicit val mapEncoderPubs: Encoder[Datasource] = Encoders.bean(classOf[Datasource])
     implicit val mapEncoderEinfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
     val mapper = new ObjectMapper()
     val dats : Dataset[Datasource] = spark.read.textFile(graphPath + "/datasource")
@@ -62,6 +59,12 @@ object SparkApplyHostedByMapToDatasource {
         .map(ei => mapper.readValue(ei, classOf[EntityInfo])))
     applyHBtoDats(pinfo, dats).write.mode(SaveMode.Overwrite).option("compression","gzip").json(outputPath)
+
+    spark.read.textFile(outputPath)
+      .write
+      .mode(SaveMode.Overwrite)
+      .option("compression","gzip")
+      .text(graphPath + "/datasource")
   }
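
The addition above implements the second half of the commit message: instead of leaving the patched datasource records only under outputPath, the job reads them back and overwrites graphPath + "/datasource", so the graph is updated in place. Below is a minimal sketch of that two-step pattern, with hypothetical paths and a local session; an identity read stands in for the real applyHBtoDats transformation.

import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}

object ReplaceGraphContentSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReplaceGraphContentSketch")
      .master("local[*]")
      .getOrCreate()

    val graphPath  = "/tmp/graph"             // hypothetical graph root
    val outputPath = "/tmp/staged/datasource" // hypothetical staging path

    // Step 1: materialize the patched records somewhere other than the
    // input folder; this identity read stands in for applyHBtoDats(...).
    val patched: Dataset[String] = spark.read.textFile(graphPath + "/datasource")
    patched.write.mode(SaveMode.Overwrite).option("compression", "gzip").text(outputPath)

    // Step 2: read the staged result back and overwrite the original graph
    // folder, replacing the content instead of keeping a separate copy.
    spark.read.textFile(outputPath)
      .write
      .mode(SaveMode.Overwrite)
      .option("compression", "gzip")
      .text(graphPath + "/datasource")

    spark.stop()
  }
}

The staging step matters because Spark cannot safely overwrite a directory it is still reading from within the same job. SparkApplyHostedByMapToResult below applies the same two-step write to the publication records.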

SparkApplyHostedByMapToResult.scala

@@ -75,8 +75,11 @@ object SparkApplyHostedByMapToResult {
     applyHBtoPubs(pinfo, pubs).write.mode(SaveMode.Overwrite).option("compression","gzip").json(outputPath)
+
+    spark.read.textFile(outputPath)
+      .write
+      .mode(SaveMode.Overwrite)
+      .option("compression","gzip")
+      .text(graphPath + "/publication")
   }

DownloadCSV parameters spec (JSON)

@@ -11,6 +11,12 @@
     "paramDescription": "the path where to find the pre-processed data for unibi gold list and doaj articles",
     "paramRequired": true
   },
+  {
+    "paramName": "of",
+    "paramLongName": "outputFile",
+    "paramDescription": "the output json file produced by the CSV download procedure",
+    "paramRequired": true
+  },
   {
     "paramName": "hnn",
     "paramLongName": "hdfsNameNode",