diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 2acec6546e..1fe83cec29 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -73,7 +73,7 @@ public class SparkPropagateRelation extends AbstractSparkAction { .load(DedupUtility.createMergeRelPath(workingPath, "*", "*")) .as(Encoders.bean(Relation.class)); - // + // Dataset> mergedIds = mergeRels .where(col("relClass").equalTo(ModelConstants.MERGES)) .select(col("source"), col("target")) @@ -116,31 +116,32 @@ public class SparkPropagateRelation extends AbstractSparkAction { .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); } - //redirect the relations to the dedupID + // redirect the relations to the dedupID private static Dataset createNewRels( - Dataset rels, //all the relations to be redirected - Dataset> mergedIds, //merge rels: + Dataset rels, // all the relations to be redirected + Dataset> mergedIds, // merge rels: MapFunction, Tuple2>, Tuple2>, Relation> mapRel) { - // + // Dataset> mapped = rels .map( (MapFunction>) r -> new Tuple3<>(getId(r, FieldType.SOURCE), r, getId(r, FieldType.TARGET)), Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class), Encoders.STRING())); - //< , > + // < , > Dataset, Tuple2>> relSource = mapped .joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer"); - //< <, >, > + // < <, >, > Dataset, Tuple2>, Tuple2>> relSourceTarget = relSource - .joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_1")), "left_outer"); + .joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_1")), "left_outer"); return relSourceTarget .filter( - (FilterFunction, Tuple2>, Tuple2>>) - r -> r._1()._1() != null || r._2() != null) + (FilterFunction, Tuple2>, Tuple2>>) r -> r + ._1() + ._1() != null || r._2() != null) .map(mapRel, Encoders.bean(Relation.class)) .distinct(); }