forked from D-Net/dnet-hadoop
Hosted By Map - refactoring and application of the new aggregator
This commit is contained in:
parent
a7bf314fd2
commit
8f7623e77a
|
@ -45,7 +45,7 @@ object SparkApplyHostedByMapToDatasource {
|
||||||
val graphPath = parser.get("graphPath")
|
val graphPath = parser.get("graphPath")
|
||||||
|
|
||||||
val outputPath = parser.get("outputPath")
|
val outputPath = parser.get("outputPath")
|
||||||
val workingPath = parser.get("workingPath")
|
val preparedInfoPath = parser.get("preparedInfoPath")
|
||||||
|
|
||||||
|
|
||||||
implicit val formats = DefaultFormats
|
implicit val formats = DefaultFormats
|
||||||
|
@ -55,17 +55,15 @@ object SparkApplyHostedByMapToDatasource {
|
||||||
implicit val mapEncoderEinfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
|
implicit val mapEncoderEinfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
|
||||||
val mapper = new ObjectMapper()
|
val mapper = new ObjectMapper()
|
||||||
|
|
||||||
val dats : Dataset[Datasource] = spark.read.textFile("$graphPath/datasource")
|
val dats : Dataset[Datasource] = spark.read.textFile(graphPath + "/datasource")
|
||||||
.map(r => mapper.readValue(r, classOf[Datasource]))
|
.map(r => mapper.readValue(r, classOf[Datasource]))
|
||||||
|
|
||||||
val pinfo : Dataset[EntityInfo] = spark.read.textFile("$workingPath/preparedInfo")
|
val pinfo : Dataset[EntityInfo] = Aggregators.datasourceToSingleId( spark.read.textFile(preparedInfoPath)
|
||||||
.map(ei => mapper.readValue(ei, classOf[EntityInfo]))
|
.map(ei => mapper.readValue(ei, classOf[EntityInfo])))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//c. dataset join risultato del passo prima di a per datasource id, gruppo per ds id e cambio compatibilita' se necessario
|
//c. dataset join risultato del passo prima di a per datasource id, gruppo per ds id e cambio compatibilita' se necessario
|
||||||
|
|
||||||
applyHBtoDats(pinfo, dats).write.mode(SaveMode.Overwrite).option("compression","gzip").json(s"$graphPath/datasource")
|
applyHBtoDats(pinfo, dats).write.mode(SaveMode.Overwrite).option("compression","gzip").json(outputPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue