code refactor

This commit is contained in:
Sandro La Bruzzo 2021-07-08 09:08:25 +02:00
parent a01dbe0ab0
commit a4a54a3786
1 changed file with 1 addition and 11 deletions

View File

@@ -81,9 +81,6 @@ object SparkCreateScholix {
val scholix_final:Dataset[Scholix] = spark.read.load(s"$targetPath/scholix").as[Scholix]
val stats:Dataset[(String,String,Long)]= scholix_final.map(s => (s.getSource.getDnetIdentifier, s.getTarget.getObjectType)).groupBy("_1", "_2").agg(count("_1")).as[(String,String,Long)]
@@ -94,10 +91,7 @@ object SparkCreateScholix {
.map(_._2)
.write.mode(SaveMode.Overwrite).save(s"$targetPath/related_entities")
val relatedEntitiesDS:Dataset[RelatedEntities] = spark.read.load(s"$targetPath/related_entities").as[RelatedEntities].filter(r => r.relatedPublication>0 || r.relatedDataset > 0)
val relatedEntitiesDS:Dataset[RelatedEntities] = spark.read.load(s"$targetPath/related_entities").as[RelatedEntities].filter(r => r.relatedPublication>0 || r.relatedDataset > 0)
relatedEntitiesDS.joinWith(summaryDS, relatedEntitiesDS("id").equalTo(summaryDS("_1")), "inner").map{i =>
val re = i._1
@@ -109,8 +103,4 @@ object SparkCreateScholix {
}.write.mode(SaveMode.Overwrite).save(s"${summaryPath}_filtered")
}
}