diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 4e012293b..32b01679f 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -83,12 +83,14 @@ public class SparkPropagateRelation extends AbstractSparkAction { .textFile(relationPath) .map(patchRelFn(), Encoders.bean(Relation.class)); + //change raw ids with dedup ids Dataset newRels = processDataset( processDataset(rels, mergedIds, FieldType.SOURCE, getFixRelFn(FieldType.SOURCE)), mergedIds, FieldType.TARGET, getFixRelFn(FieldType.TARGET)) .filter(SparkPropagateRelation::containsDedup); + //update deletedbyinference Dataset updated = processDataset( processDataset(rels, mergedIds, FieldType.SOURCE, getDeletedFn()), mergedIds, FieldType.TARGET, getDeletedFn()); diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index 880e917ce..d0098e7e2 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -4,17 +4,24 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.util.MapDocumentUtil; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import scala.Tuple2; import java.io.File; import java.io.IOException; @@ -211,6 +218,27 @@ public class SparkDedupTest implements Serializable { assertEquals(826, relations); + //check deletedbyinference + final Dataset mergeRels = spark.read().load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*")).as(Encoders.bean(Relation.class)); + final JavaPairRDD mergedIds = mergeRels + .where("relClass == 'merges'") + .select(mergeRels.col("target")) + .distinct() + .toJavaRDD() + .mapToPair((PairFunction) r -> new Tuple2(r.getString(0), "d")); + + JavaRDD toCheck = jsc.textFile(testDedupGraphBasePath + "/relation") + .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json)) + .join(mergedIds) + .map(t -> t._2()._1()) + .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json)) + .join(mergedIds) + .map(t -> t._2()._1()); + + long deletedbyinference = toCheck.filter(this::isDeletedByInference).count(); + long updated = toCheck.count(); + + assertEquals(updated, deletedbyinference); } @AfterAll