diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 866de43a5..3e784b07f 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -117,22 +117,25 @@ public class SparkPropagateRelation extends AbstractSparkAction { private static Dataset createNewRels( Dataset rels, - Dataset> mergedIds, + Dataset> mergedIds, MapFunction, Tuple2>, Tuple2>, Relation> mapRel) { Dataset> mapped = rels - .map( - (MapFunction>) r -> new Tuple3<>(getId(r, FieldType.SOURCE), r, getId(r, FieldType.TARGET)), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class), Encoders.STRING())); + .map( + (MapFunction>) r -> new Tuple3<>(getId(r, FieldType.SOURCE), + r, getId(r, FieldType.TARGET)), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class), Encoders.STRING())); Dataset, Tuple2>> relSource = mapped - .joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer"); + .joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer"); return relSource - .joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_2")), "left_outer") - .filter((FilterFunction, Tuple2>, Tuple2>>) r -> r._2() != null || r._1() != null) - .map(mapRel, Encoders.bean(Relation.class)) - .distinct(); + .joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_2")), "left_outer") + .filter( + (FilterFunction, Tuple2>, Tuple2>>) r -> r + ._2() != null || r._1() != null) + .map(mapRel, Encoders.bean(Relation.class)) + .distinct(); } private static Dataset processDataset(