From d9cbca83f7911b53190a9f999f5e02a7f00382ac Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 28 Oct 2021 16:13:24 +0200 Subject: [PATCH] moved filter to the next phase --- .../dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala index 5ca7d9782..db93bc43f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala @@ -75,7 +75,7 @@ object SparkResolveRelation { if (targetResolved != null && targetResolved._1.nonEmpty) currentRelation.setTarget(targetResolved._1) currentRelation - }.filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved")) + } .write .mode(SaveMode.Overwrite) .save(s"$workingPath/relation_resolved") @@ -88,6 +88,7 @@ object SparkResolveRelation { fs.rename(new Path(s"$graphBasePath/relation"), new Path(s"$workingPath/relation")) spark.read.load(s"$workingPath/relation_resolved").as[Relation] + .filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved")) .map(r => mapper.writeValueAsString(r)) .write .option("compression", "gzip")