1
0
Fork 0

fixed resolve relation join

This commit is contained in:
Sandro La Bruzzo 2021-07-23 14:17:17 +02:00
parent 4a439c3863
commit cfde63a7c3
1 changed files with 2 additions and 2 deletions

View File

@ -44,7 +44,7 @@ object SparkResolveRelation {
val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_1")), "left").map{ relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_2")), "left").map{
m => m =>
val sourceResolved = m._2 val sourceResolved = m._2
val currentRelation = m._1._2 val currentRelation = m._1._2
@ -57,7 +57,7 @@ object SparkResolveRelation {
val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/resolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/resolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_1")), "left").map{ relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map{
m => m =>
val targetResolved = m._2 val targetResolved = m._2
val currentRelation = m._1._2 val currentRelation = m._1._2