forked from D-Net/dnet-hadoop
moved filter on next phase
This commit is contained in:
parent
1be9aa0a5f
commit
d9cbca83f7
|
@ -75,7 +75,7 @@ object SparkResolveRelation {
|
||||||
if (targetResolved != null && targetResolved._1.nonEmpty)
|
if (targetResolved != null && targetResolved._1.nonEmpty)
|
||||||
currentRelation.setTarget(targetResolved._1)
|
currentRelation.setTarget(targetResolved._1)
|
||||||
currentRelation
|
currentRelation
|
||||||
}.filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved"))
|
}
|
||||||
.write
|
.write
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.save(s"$workingPath/relation_resolved")
|
.save(s"$workingPath/relation_resolved")
|
||||||
|
@ -88,6 +88,7 @@ object SparkResolveRelation {
|
||||||
fs.rename(new Path(s"$graphBasePath/relation"), new Path(s"$workingPath/relation"))
|
fs.rename(new Path(s"$graphBasePath/relation"), new Path(s"$workingPath/relation"))
|
||||||
|
|
||||||
spark.read.load(s"$workingPath/relation_resolved").as[Relation]
|
spark.read.load(s"$workingPath/relation_resolved").as[Relation]
|
||||||
|
.filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved"))
|
||||||
.map(r => mapper.writeValueAsString(r))
|
.map(r => mapper.writeValueAsString(r))
|
||||||
.write
|
.write
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
|
|
Loading…
Reference in New Issue