diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 8e6e79eb32..3e784b07f2 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -22,6 +22,7 @@ import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import scala.Tuple2; +import scala.Tuple3; public class SparkPropagateRelation extends AbstractSparkAction { @@ -85,13 +86,7 @@ public class SparkPropagateRelation extends AbstractSparkAction { Dataset rels = spark.read().textFile(relationPath).map(patchRelFn(), Encoders.bean(Relation.class)); - Dataset newRels = processDataset( - processDataset(rels, mergedIds, FieldType.SOURCE, getFixRelFn(FieldType.SOURCE)), - mergedIds, - FieldType.TARGET, - getFixRelFn(FieldType.TARGET)) - .filter(SparkPropagateRelation::containsDedup) - .distinct(); + Dataset newRels = createNewRels(rels, mergedIds, getFixRelFn()); Dataset updated = processDataset( processDataset(rels, mergedIds, FieldType.SOURCE, getDeletedFn()), @@ -120,6 +115,29 @@ public class SparkPropagateRelation extends AbstractSparkAction { .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); } + private static Dataset createNewRels( + Dataset rels, + Dataset> mergedIds, + MapFunction, Tuple2>, Tuple2>, Relation> mapRel) { + + Dataset> mapped = rels + .map( + (MapFunction>) r -> new Tuple3<>(getId(r, FieldType.SOURCE), + r, getId(r, FieldType.TARGET)), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class), Encoders.STRING())); + + Dataset, Tuple2>> relSource = mapped + .joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer"); + + return relSource + .joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_2")), "left_outer") + .filter( + (FilterFunction, Tuple2>, Tuple2>>) r -> r + ._2() != null || r._1() != null) + .map(mapRel, Encoders.bean(Relation.class)) + .distinct(); + } + private static Dataset processDataset( Dataset rels, Dataset> mergedIds, @@ -153,28 +171,25 @@ public class SparkPropagateRelation extends AbstractSparkAction { } } - private static MapFunction, Tuple2>, Relation> getFixRelFn( - FieldType type) { + private static MapFunction, Tuple2>, Tuple2>, Relation> getFixRelFn() { return value -> { - if (value._2() != null) { - Relation r = value._1()._2(); - String id = value._2()._2(); - if (r.getDataInfo() == null) { - r.setDataInfo(new DataInfo()); - } - r.getDataInfo().setDeletedbyinference(false); - switch (type) { - case SOURCE: - r.setSource(id); - return r; - case TARGET: - r.setTarget(id); - return r; - default: - throw new IllegalArgumentException(""); - } + + Relation r = value._1()._1()._2(); + String newSource = value._1()._2() != null ? value._1()._2()._2() : null; + String newTarget = value._2() != null ? value._2()._2() : null; + + if (r.getDataInfo() == null) { + r.setDataInfo(new DataInfo()); } - return value._1()._2(); + r.getDataInfo().setDeletedbyinference(false); + + if (newSource != null) + r.setSource(newSource); + + if (newTarget != null) + r.setTarget(newTarget); + + return r; }; } @@ -192,8 +207,4 @@ public class SparkPropagateRelation extends AbstractSparkAction { }; } - private static boolean containsDedup(final Relation r) { - return r.getSource().toLowerCase().contains(ModelConstants.DEDUP) - || r.getTarget().toLowerCase().contains(ModelConstants.DEDUP); - } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index 93caadf938..3dcf612154 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -12,6 +12,7 @@ import java.io.IOException; import java.io.Serializable; import java.net.URISyntaxException; import java.nio.file.Paths; +import java.util.List; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -66,6 +67,7 @@ public class SparkDedupTest implements Serializable { testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") .toAbsolutePath() .toString(); + testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") .toAbsolutePath() .toString(); @@ -534,7 +536,7 @@ public class SparkDedupTest implements Serializable { long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); - assertEquals(4862, relations); + assertEquals(4718, relations); // check deletedbyinference final Dataset mergeRels = spark