diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 3e784b07f..2acec6546 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -73,6 +73,7 @@ public class SparkPropagateRelation extends AbstractSparkAction { .load(DedupUtility.createMergeRelPath(workingPath, "*", "*")) .as(Encoders.bean(Relation.class)); + // Dataset> mergedIds = mergeRels .where(col("relClass").equalTo(ModelConstants.MERGES)) .select(col("source"), col("target")) @@ -115,25 +116,31 @@ public class SparkPropagateRelation extends AbstractSparkAction { .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); } + //redirect the relations to the dedupID private static Dataset createNewRels( - Dataset rels, - Dataset> mergedIds, + Dataset rels, //all the relations to be redirected + Dataset> mergedIds, //merge rels: MapFunction, Tuple2>, Tuple2>, Relation> mapRel) { + // Dataset> mapped = rels .map( (MapFunction>) r -> new Tuple3<>(getId(r, FieldType.SOURCE), r, getId(r, FieldType.TARGET)), Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class), Encoders.STRING())); + //< , > Dataset, Tuple2>> relSource = mapped .joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer"); - return relSource - .joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_2")), "left_outer") + //< <, >, > + Dataset, Tuple2>, Tuple2>> relSourceTarget = relSource + .joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_1")), "left_outer"); + + return relSourceTarget .filter( - (FilterFunction, Tuple2>, Tuple2>>) r -> r - ._2() != null || r._1() != null) + (FilterFunction, Tuple2>, Tuple2>>) + r -> r._1()._1() != null || r._2() != null) .map(mapRel, Encoders.bean(Relation.class)) .distinct(); } diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index 3dcf61215..bf4913056 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -536,7 +536,7 @@ public class SparkDedupTest implements Serializable { long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); - assertEquals(4718, relations); + assertEquals(4862, relations); // check deletedbyinference final Dataset mergeRels = spark diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java index 41f81547e..606dd9e5b 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java @@ -57,9 +57,6 @@ public class SparkOpenorgsProvisionTest implements Serializable { private static String testDedupGraphBasePath; private static final String testActionSetId = "test-orchestrator"; - protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - @BeforeAll public static void cleanUp() throws IOException, URISyntaxException { @@ -259,7 +256,7 @@ public class SparkOpenorgsProvisionTest implements Serializable { long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); - assertEquals(2520, relations); + assertEquals(4894, relations); // check deletedbyinference final Dataset mergeRels = spark