forked from antonis.lempesis/dnet-hadoop
Hosted By Map - refactoring
This commit is contained in:
parent
8f7623e77a
commit
8ba8c77f92
|
@ -29,7 +29,6 @@ object SparkApplyHostedByMapToResult {
|
||||||
val i = p.getInstance().asScala
|
val i = p.getInstance().asScala
|
||||||
if (i.size == 1) {
|
if (i.size == 1) {
|
||||||
val inst: Instance = i(0)
|
val inst: Instance = i(0)
|
||||||
|
|
||||||
inst.getHostedby.setKey(ei.getHb_id)
|
inst.getHostedby.setKey(ei.getHb_id)
|
||||||
inst.getHostedby.setValue(ei.getName)
|
inst.getHostedby.setValue(ei.getName)
|
||||||
if (ei.getOpenaccess) {
|
if (ei.getOpenaccess) {
|
||||||
|
@ -60,7 +59,7 @@ object SparkApplyHostedByMapToResult {
|
||||||
val graphPath = parser.get("graphPath")
|
val graphPath = parser.get("graphPath")
|
||||||
|
|
||||||
val outputPath = parser.get("outputPath")
|
val outputPath = parser.get("outputPath")
|
||||||
val workingPath = parser.get("workingPath")
|
val preparedInfoPath = parser.get("preparedInfoPath")
|
||||||
|
|
||||||
|
|
||||||
implicit val formats = DefaultFormats
|
implicit val formats = DefaultFormats
|
||||||
|
@ -70,15 +69,15 @@ object SparkApplyHostedByMapToResult {
|
||||||
implicit val mapEncoderEinfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
|
implicit val mapEncoderEinfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
|
||||||
val mapper = new ObjectMapper()
|
val mapper = new ObjectMapper()
|
||||||
|
|
||||||
val pubs : Dataset[Publication] = spark.read.textFile("$graphPath/publication")
|
val pubs : Dataset[Publication] = spark.read.textFile(graphPath + "/publication")
|
||||||
.map(r => mapper.readValue(r, classOf[Publication]))
|
.map(r => mapper.readValue(r, classOf[Publication]))
|
||||||
|
|
||||||
val pinfo : Dataset[EntityInfo] = spark.read.textFile("$workingPath/preparedInfo")
|
val pinfo : Dataset[EntityInfo] = spark.read.textFile(preparedInfoPath)
|
||||||
.map(ei => mapper.readValue(ei, classOf[EntityInfo]))
|
.map(ei => mapper.readValue(ei, classOf[EntityInfo]))
|
||||||
|
|
||||||
//a. publication join risultato del passo precedente su result id (left) setto la istanza (se piu' di una instance
|
//a. publication join risultato del passo precedente su result id (left) setto la istanza (se piu' di una instance
|
||||||
// nel result => salto)con l'hosted by anche access right della instance se openaccess e' true
|
// nel result => salto)con l'hosted by anche access right della instance se openaccess e' true
|
||||||
applyHBtoPubs(pinfo, pubs).write.mode(SaveMode.Overwrite).option("compression","gzip").json("$graphPath/publication")
|
applyHBtoPubs(pinfo, pubs).write.mode(SaveMode.Overwrite).option("compression","gzip").json(outputPath)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue