Hosted By Map - refactoring
This commit is contained in:
parent
1e952cccf6
commit
eccf3851b0
|
@ -23,10 +23,6 @@ object SparkProduceHostedByMap {
|
||||||
implicit val tupleForJoinEncoder: Encoder[(String, HostedByItemType)] = Encoders.tuple(Encoders.STRING, Encoders.product[HostedByItemType])
|
implicit val tupleForJoinEncoder: Encoder[(String, HostedByItemType)] = Encoders.tuple(Encoders.STRING, Encoders.product[HostedByItemType])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def toHostedByItemType(input: ((HostedByInfo, HostedByInfo), HostedByInfo)) : HostedByItemType = {
|
def toHostedByItemType(input: ((HostedByInfo, HostedByInfo), HostedByInfo)) : HostedByItemType = {
|
||||||
val openaire: HostedByInfo = input._1._1
|
val openaire: HostedByInfo = input._1._1
|
||||||
val doaj: HostedByInfo = input._1._2
|
val doaj: HostedByInfo = input._1._2
|
||||||
|
@ -217,7 +213,7 @@ object SparkProduceHostedByMap {
|
||||||
.union(doajHostedByDataset(spark, workingDirPath + "/doaj"))
|
.union(doajHostedByDataset(spark, workingDirPath + "/doaj"))
|
||||||
.flatMap(hbi => toList(hbi))).filter(hbi => hbi._2.id.startsWith("10|"))
|
.flatMap(hbi => toList(hbi))).filter(hbi => hbi._2.id.startsWith("10|"))
|
||||||
.map(hbi => toHostedByMap(hbi))(Encoders.STRING)
|
.map(hbi => toHostedByMap(hbi))(Encoders.STRING)
|
||||||
.rdd.saveAsTextFile(outputPath + "/hostedByMap", classOf[GzipCodec])
|
.rdd.saveAsTextFile(outputPath , classOf[GzipCodec])
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue