forked from D-Net/dnet-hadoop
minor fix
This commit is contained in:
parent
8fac10c91e
commit
848aabbb6c
|
@ -42,6 +42,7 @@ object SparkCreateScholix {
|
||||||
|
|
||||||
|
|
||||||
val relationDS: Dataset[(String, Relation)] = spark.read.load(relationPath).as[Relation]
|
val relationDS: Dataset[(String, Relation)] = spark.read.load(relationPath).as[Relation]
|
||||||
|
.filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
|
||||||
.map(r => (r.getSource, r))(Encoders.tuple(Encoders.STRING, relEncoder))
|
.map(r => (r.getSource, r))(Encoders.tuple(Encoders.STRING, relEncoder))
|
||||||
|
|
||||||
val summaryDS: Dataset[(String, ScholixSummary)] = spark.read.load(summaryPath).as[ScholixSummary]
|
val summaryDS: Dataset[(String, ScholixSummary)] = spark.read.load(summaryPath).as[ScholixSummary]
|
||||||
|
|
|
@ -34,7 +34,7 @@ object SparkCreateSummaryObject {
|
||||||
implicit val summaryEncoder:Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
|
implicit val summaryEncoder:Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
|
||||||
|
|
||||||
|
|
||||||
val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Oaf].filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result])
|
val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Result].filter(r=>r.getDataInfo== null || r.getDataInfo.getDeletedbyinference== false)
|
||||||
|
|
||||||
ds.repartition(6000).map(r => ScholixUtils.resultToSummary(r)).filter(s => s!= null).write.mode(SaveMode.Overwrite).save(targetPath)
|
ds.repartition(6000).map(r => ScholixUtils.resultToSummary(r)).filter(s => s!= null).write.mode(SaveMode.Overwrite).save(targetPath)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue