From 2caaaec42d24023b7195f8922bafd43bc12494dd Mon Sep 17 00:00:00 2001
From: Giambattista Bloisi
Date: Fri, 1 Sep 2023 09:32:57 +0200
Subject: [PATCH] Include SparkCleanRelation logic in SparkPropagateRelation

SparkPropagateRelation includes merge relations
Revised tests for SparkPropagateRelation
---
 .../dhp/oa/dedup/RelationAggregator.java      |  57 -------
 .../dhp/oa/dedup/SparkCleanRelation.scala     |  78 ---------
 .../dhp/oa/dedup/SparkPropagateRelation.java  | 138 ++++++++--------
 .../oa/dedup/cleanRelation_parameters.json    |  20 ---
 .../dedup/consistency/oozie_app/workflow.xml  |  28 +---
 .../dnetlib/dhp/oa/dedup/SparkDedupTest.java  | 150 ++++++++----------
 .../oa/dedup/SparkOpenorgsProvisionTest.java  | 103 ++++++------
 7 files changed, 185 insertions(+), 389 deletions(-)
 delete mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java
 delete mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala
 delete mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java
deleted file mode 100644
index 96d783dbf..000000000
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-
-package eu.dnetlib.dhp.oa.dedup;
-
-import java.util.Objects;
-
-import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.expressions.Aggregator;
-
-import eu.dnetlib.dhp.schema.oaf.Relation;
-
-public class RelationAggregator extends Aggregator<Relation, Relation, Relation> {
-
-	private static final Relation ZERO = new Relation();
-
-	@Override
-	public Relation zero() {
-		return ZERO;
-	}
-
-	@Override
-	public Relation reduce(Relation b, Relation a) {
-		return mergeRel(b, a);
-	}
-
-	@Override
-	public Relation merge(Relation b, Relation a) {
-		return mergeRel(b, a);
-	}
-
-	@Override
-	public Relation finish(Relation r) {
-		return r;
-	}
-
-	private Relation mergeRel(Relation b, Relation a) {
-		if (Objects.equals(b, ZERO)) {
-			return a;
-		}
-		if (Objects.equals(a, ZERO)) {
-			return b;
-		}
-
-		b.mergeFrom(a);
-		return b;
-	}
-
-	@Override
-	public Encoder<Relation> bufferEncoder() {
-		return Encoders.kryo(Relation.class);
-	}
-
-	@Override
-	public Encoder<Relation> outputEncoder() {
-		return Encoders.kryo(Relation.class);
-	}
-}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala
deleted file mode 100644
index 5d8da42c2..000000000
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-package eu.dnetlib.dhp.oa.dedup
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.common.HdfsSupport
-import eu.dnetlib.dhp.schema.oaf.Relation
-import eu.dnetlib.dhp.utils.ISLookupClientFactory
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService
-import org.apache.commons.io.IOUtils
-import org.apache.spark.SparkConf
-import org.apache.spark.sql._
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
-import org.slf4j.LoggerFactory
-
-object SparkCleanRelation {
-  private val log = LoggerFactory.getLogger(classOf[SparkCleanRelation])
-
-  @throws[Exception]
-  def main(args: Array[String]): Unit = {
-    val parser = new ArgumentApplicationParser(
-      IOUtils.toString(
-        classOf[SparkCleanRelation].getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")
-      )
-    )
-    parser.parseArgument(args)
-    val conf = new SparkConf
-
-    new SparkCleanRelation(parser, AbstractSparkAction.getSparkSession(conf))
-      .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")))
-  }
-}
-
-class SparkCleanRelation(parser: ArgumentApplicationParser, spark: SparkSession)
-    extends AbstractSparkAction(parser, spark) {
-  override def run(isLookUpService: ISLookUpService): Unit = {
-    val graphBasePath = parser.get("graphBasePath")
-    val inputPath = parser.get("inputPath")
-    val outputPath = parser.get("outputPath")
-
-    SparkCleanRelation.log.info("graphBasePath: '{}'", graphBasePath)
-    SparkCleanRelation.log.info("inputPath: '{}'", inputPath)
-    SparkCleanRelation.log.info("outputPath: '{}'", outputPath)
-
-    AbstractSparkAction.removeOutputDir(spark, outputPath)
-
-    val entities =
-      Seq("datasource", "project", "organization", "publication", "dataset", "software", "otherresearchproduct")
-
-    val idsSchema = StructType.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>")
-
-    val emptyIds = spark.createDataFrame(spark.sparkContext.emptyRDD[Row].setName("empty"),
-      idsSchema)
-
-    val ids = entities
-      .foldLeft(emptyIds)((ds, entity) => {
-        val entityPath = graphBasePath + '/' + entity
-        if (HdfsSupport.exists(entityPath, spark.sparkContext.hadoopConfiguration)) {
-          ds.union(spark.read.schema(idsSchema).json(entityPath))
-        } else {
-          ds
-        }
-      })
-      .filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
-      .select("id")
-      .distinct()
-
-    val relations = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(inputPath)
-      .filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
-
-    AbstractSparkAction.save(
-      relations
-        .join(ids, col("source") === ids("id"), "leftsemi")
-        .join(ids, col("target") === ids("id"), "leftsemi"),
-      outputPath,
-      SaveMode.Overwrite
-    )
-  }
-}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
index 175ebf8a6..739295c91 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
@@ -3,23 +3,19 @@ package eu.dnetlib.dhp.oa.dedup;
 
 import static org.apache.spark.sql.functions.col;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Objects;
-
-import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.ReduceFunction;
 import org.apache.spark.sql.*;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
@@ -70,73 +66,63 @@ public class SparkPropagateRelation extends AbstractSparkAction {
 		log.info("workingPath: '{}'", workingPath);
 		log.info("graphOutputPath: '{}'", graphOutputPath);
 
-		final String outputRelationPath = DedupUtility.createEntityPath(graphOutputPath, "relation");
-		removeOutputDir(spark, outputRelationPath);
-
 		Dataset<Relation> mergeRels = spark
-			.read()
-			.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
-			.as(REL_BEAN_ENC);
+			.read()
+			.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
+			.as(REL_BEAN_ENC);
 
 		//
-		Dataset<Row> mergedIds = mergeRels
-			.where(col("relClass").equalTo(ModelConstants.MERGES))
-			.select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
-			.distinct()
-			.cache();
+		Dataset<Row> idsToMerge = mergeRels
+			.where(col("relClass").equalTo(ModelConstants.MERGES))
+			.select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
+			.distinct();
 
 		Dataset<Row> allRels = spark
-			.read()
-			.schema(REL_BEAN_ENC.schema())
-			.json(DedupUtility.createEntityPath(graphBasePath, "relation"));
+			.read()
+			.schema(REL_BEAN_ENC.schema())
+			.json(graphBasePath + "/relation");
 
 		Dataset<Relation> dedupedRels = allRels
-			.joinWith(mergedIds, allRels.col("source").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
-			.joinWith(mergedIds, col("_1.target").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
-			.select("_1._1", "_1._2.dedupID", "_2.dedupID")
-			.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
-			.flatMap(SparkPropagateRelation::addInferredRelations, REL_KRYO_ENC);
+			.joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
+			.joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
+			.select("_1._1", "_1._2.dedupID", "_2.dedupID")
+			.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
+			.map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> {
+				Relation rel = t._1();
+				String newSource = t._2();
+				String newTarget = t._3();
 
-		Dataset<Relation> processedRelations = distinctRelations(
-			dedupedRels.union(mergeRels.map((MapFunction<Relation, Relation>) r -> r, REL_KRYO_ENC)))
-				.filter((FilterFunction<Relation>) r -> !Objects.equals(r.getSource(), r.getTarget()));
+				if (rel.getDataInfo() == null) {
+					rel.setDataInfo(new DataInfo());
+				}
 
-		save(processedRelations, outputRelationPath, SaveMode.Overwrite);
-	}
+				if (newSource != null || newTarget != null) {
+					rel.getDataInfo().setDeletedbyinference(false);
 
-	private static Iterator<Relation> addInferredRelations(Tuple3<Relation, String, String> t) throws Exception {
-		Relation existingRel = t._1();
-		String newSource = t._2();
-		String newTarget = t._3();
+					if (newSource != null)
+						rel.setSource(newSource);
 
-		if (newSource == null && newTarget == null) {
-			return Collections.singleton(t._1()).iterator();
-		}
+					if (newTarget != null)
+						rel.setTarget(newTarget);
+				}
 
-		// update existing relation
-		if (existingRel.getDataInfo() == null) {
-			existingRel.setDataInfo(new DataInfo());
-		}
-		existingRel.getDataInfo().setDeletedbyinference(true);
+				return rel;
+			}, REL_BEAN_ENC);
 
-		// Create new relation inferred by dedupIDs
-		Relation inferredRel = (Relation) BeanUtils.cloneBean(existingRel);
+		// ids of records that are both not deletedbyinference and not invisible
+		Dataset<Row> ids = validIds(spark, graphBasePath);
 
-		inferredRel.setDataInfo((DataInfo) BeanUtils.cloneBean(existingRel.getDataInfo()));
-		inferredRel.getDataInfo().setDeletedbyinference(false);
+		// filter relations that point to valid records, can force them to be visible
+		Dataset<Relation> cleanedRels = dedupedRels
+			.join(ids, col("source").equalTo(ids.col("id")), "leftsemi")
+			.join(ids, col("target").equalTo(ids.col("id")), "leftsemi")
+			.as(REL_BEAN_ENC)
+			.map((MapFunction<Relation, Relation>) r -> {
+				r.getDataInfo().setInvisible(false);
+				return r;
+			}, REL_KRYO_ENC);
 
-		if (newSource != null)
-			inferredRel.setSource(newSource);
-
-		if (newTarget != null)
-			inferredRel.setTarget(newTarget);
-
-		return Arrays.asList(existingRel, inferredRel).iterator();
-	}
-
-	private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
-		return rels
-			.filter(getRelationFilterFunction())
+		Dataset<Relation> distinctRels = cleanedRels
 			.groupByKey(
 				(MapFunction<Relation, String>) r -> String
 					.join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
@@ -146,13 +132,33 @@ public class SparkPropagateRelation extends AbstractSparkAction {
 				return b;
 			})
 			.map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC);
+
+		final String outputRelationPath = graphOutputPath + "/relation";
+		removeOutputDir(spark, outputRelationPath);
+		save(
+			distinctRels
+				.union(mergeRels)
+				.filter("source != target AND dataInfo.deletedbyinference != true AND dataInfo.invisible != true"),
+			outputRelationPath,
+			SaveMode.Overwrite);
 	}
 
-	private FilterFunction<Relation> getRelationFilterFunction() {
-		return r -> StringUtils.isNotBlank(r.getSource()) ||
-			StringUtils.isNotBlank(r.getTarget()) ||
-			StringUtils.isNotBlank(r.getRelType()) ||
-			StringUtils.isNotBlank(r.getSubRelType()) ||
-			StringUtils.isNotBlank(r.getRelClass());
+	static Dataset<Row> validIds(SparkSession spark, String graphBasePath) {
+		StructType idsSchema = StructType
+			.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>");
+
+		Dataset<Row> allIds = spark.emptyDataset(RowEncoder.apply(idsSchema));
+
+		for (EntityType entityType : ModelSupport.entityTypes.keySet()) {
+			String entityPath = graphBasePath + '/' + entityType.name();
+			if (HdfsSupport.exists(entityPath, spark.sparkContext().hadoopConfiguration())) {
+				allIds = allIds.union(spark.read().schema(idsSchema).json(entityPath));
+			}
+		}
+
+		return allIds
+			.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
+			.select("id")
+			.distinct();
 	}
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json
deleted file mode 100644
index 860539ad9..000000000
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json
+++ /dev/null
@@ -1,20 +0,0 @@
-[
-  {
-    "paramName": "i",
-    "paramLongName": "graphBasePath",
-    "paramDescription": "the base path of raw graph",
-    "paramRequired": true
-  },
-  {
-    "paramName": "w",
-    "paramLongName": "inputPath",
-    "paramDescription": "the path to the input relation to cleanup",
-    "paramRequired": true
-  },
-  {
-    "paramName": "o",
-    "paramLongName": "outputPath",
-    "paramDescription": "the path of the output relation cleaned",
-    "paramRequired": true
-  }
-]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
index b724e5d0b..0083339cf 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
@@ -100,35 +100,9 @@
                     --conf spark.sql.shuffle.partitions=15000
                     --graphBasePath${graphBasePath}
-                    --graphOutputPath${workingPath}/propagaterelation/
+                    --graphOutputPath${graphOutputPath}
                     --workingPath${workingPath}
-
-
-
-
-
-
-
-                    yarn
-                    cluster
-                    Clean Relations
-                    eu.dnetlib.dhp.oa.dedup.SparkCleanRelation
-                    dhp-dedup-openaire-${projectVersion}.jar
-
-                    --executor-memory=${sparkExecutorMemory}
-                    --conf spark.executor.memoryOverhead=${sparkExecutorMemoryOverhead}
-                    --executor-cores=${sparkExecutorCores}
-                    --driver-memory=${sparkDriverMemory}
-                    --conf spark.extraListeners=${spark2ExtraListeners}
-                    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                    --conf spark.sql.shuffle.partitions=15000
-
-                    --graphBasePath${graphBasePath}
-                    --inputPath${workingPath}/propagaterelation/relation
-                    --outputPath${graphOutputPath}/relation
-
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index 38bd72a5e..6c4935637 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -3,7 +3,6 @@ package eu.dnetlib.dhp.oa.dedup;
 
 import static java.nio.file.Files.createTempDirectory;
-import static org.apache.spark.sql.functions.col;
 import static org.apache.spark.sql.functions.count;
 import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.Mockito.lenient;
@@ -23,14 +22,13 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.*;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -46,8 +44,6 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import eu.dnetlib.pace.util.MapDocumentUtil;
-import scala.Tuple2;
 
 @ExtendWith(MockitoExtension.class)
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@@ -62,6 +58,8 @@ public class SparkDedupTest implements Serializable {
 	private static String testGraphBasePath;
 	private static String testOutputBasePath;
 	private static String testDedupGraphBasePath;
+	private static String testConsistencyGraphBasePath;
+
 	private static final String testActionSetId = "test-orchestrator";
 	private static String whitelistPath;
 	private static List<String> whiteList;
@@ -75,6 +73,7 @@ public class SparkDedupTest implements Serializable {
 			.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI())
 			.toFile()
 			.getAbsolutePath();
+
 		testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
 			.toAbsolutePath()
 			.toString();
@@ -83,6 +82,10 @@ public class SparkDedupTest implements Serializable {
 			.toAbsolutePath()
 			.toString();
 
+		testConsistencyGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
+			.toAbsolutePath()
+			.toString();
+
 		whitelistPath = Paths
 			.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/whitelist.simrels.txt").toURI())
 			.toFile()
 			.getAbsolutePath();
@@ -674,22 +677,45 @@ public class SparkDedupTest implements Serializable {
 		assertEquals(mergedOrp, deletedOrp);
 	}
 
+	@Test
+	@Order(6)
+	void copyRelationsNoOpenorgsTest() throws Exception {
+
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					SparkCopyRelationsNoOpenorgs.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
+		parser
+			.parseArgument(
+				new String[] {
+					"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
+				});
+
+		new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService);
+
+		final Dataset<Row> outputRels = spark.read().text(testDedupGraphBasePath + "/relation");
+
+		System.out.println(outputRels.count());
+		// assertEquals(2382, outputRels.count());
+	}
+
 	@Test
 	@Order(7)
 	void propagateRelationTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"));
-		String outputRelPath = testDedupGraphBasePath + "/propagaterelation";
 		parser
 			.parseArgument(
 				new String[] {
-					"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", outputRelPath
+					"-i", testDedupGraphBasePath, "-w", testOutputBasePath, "-o", testConsistencyGraphBasePath
 				});
 
 		new SparkPropagateRelation(parser, spark).run(isLookUpService);
 
-		long relations = jsc.textFile(outputRelPath + "/relation").count();
+		long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
 
 		// assertEquals(4860, relations);
 		System.out.println("relations = " + relations);
@@ -699,95 +725,52 @@ public class SparkDedupTest implements Serializable {
 			.read()
 			.load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*"))
 			.as(Encoders.bean(Relation.class));
-		final JavaPairRDD<String, String> mergedIds = mergeRels
-			.where("relClass == 'merges'")
-			.select(mergeRels.col("target"))
-			.distinct()
-			.toJavaRDD()
-			.mapToPair(
-				(PairFunction<Row, String, String>) r -> new Tuple2(r.getString(0), "d"));
-
-		JavaRDD<String> toCheck = jsc
-			.textFile(outputRelPath + "/relation")
-			.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json))
-			.join(mergedIds)
-			.map(t -> t._2()._1())
-			.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json))
-			.join(mergedIds)
-			.map(t -> t._2()._1());
+		Dataset<Row> inputRels = spark
+			.read()
+			.json(testDedupGraphBasePath + "/relation");
 
-		long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
-		long updated = toCheck.count();
+		Dataset<Row> outputRels = spark
+			.read()
+			.json(testConsistencyGraphBasePath + "/relation");
 
-		assertEquals(updated, deletedbyinference);
+		assertEquals(
+			0, outputRels
+				.filter("dataInfo.deletedbyinference == true OR dataInfo.invisible == true")
+				.count());
+
+		assertEquals(
+			5, outputRels
+				.filter("relClass NOT IN ('merges', 'isMergedIn')")
+				.count());
+
+		assertEquals(5 + mergeRels.count(), outputRels.count());
 	}
 
 	@Test
 	@Order(8)
-	void testCleanBaseRelations() throws Exception {
-		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
-
-		// append dangling relations to be cleaned up
+	void testCleanedPropagatedRelations() throws Exception {
 		Dataset<Row> df_before = spark
 			.read()
 			.schema(Encoders.bean(Relation.class).schema())
-			.json(testGraphBasePath + "/relation");
-		Dataset<Row> df_input = df_before
-			.unionByName(df_before.drop("source").withColumn("source", functions.lit("n/a")))
-			.unionByName(df_before.drop("target").withColumn("target", functions.lit("n/a")));
-		df_input.write().mode(SaveMode.Overwrite).json(testOutputBasePath + "_tmp");
-
-		parser
-			.parseArgument(
-				new String[] {
-					"--graphBasePath", testGraphBasePath,
-					"--inputPath", testGraphBasePath + "/relation",
-					"--outputPath", testDedupGraphBasePath + "/relation"
-				});
-
-		new SparkCleanRelation(parser, spark).run(isLookUpService);
+			.json(testDedupGraphBasePath + "/relation");
 
 		Dataset<Row> df_after = spark
 			.read()
 			.schema(Encoders.bean(Relation.class).schema())
-			.json(testDedupGraphBasePath + "/relation");
-
-		assertNotEquals(df_before.count(), df_input.count());
-		assertNotEquals(df_input.count(), df_after.count());
-		assertEquals(5, df_after.count());
-	}
-
-	@Test
-	@Order(9)
-	void testCleanDedupedRelations() throws Exception {
-		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
-
-		String inputRelPath = testDedupGraphBasePath + "/propagaterelation/relation";
-
-		// append dangling relations to be cleaned up
-		Dataset<Row> df_before = spark.read().schema(Encoders.bean(Relation.class).schema()).json(inputRelPath);
-
-		df_before.filter(col("dataInfo.deletedbyinference").notEqual(true)).show(50, false);
-
-		parser
-			.parseArgument(
-				new String[] {
-					"--graphBasePath", testGraphBasePath,
-					"--inputPath", inputRelPath,
-					"--outputPath", testDedupGraphBasePath + "/relation"
-				});
-
-		new SparkCleanRelation(parser, spark).run(isLookUpService);
-
-		Dataset<Row> df_after = spark
-			.read()
-			.schema(Encoders.bean(Relation.class).schema())
-			.json(testDedupGraphBasePath + "/relation");
+			.json(testConsistencyGraphBasePath + "/relation");
 
 		assertNotEquals(df_before.count(), df_after.count());
-		assertEquals(0, df_after.count());
+
+		assertEquals(
+			0, df_after
+				.filter("dataInfo.deletedbyinference == true OR dataInfo.invisible == true")
+				.count());
+
+		assertEquals(
+			5, df_after
+				.filter("relClass NOT IN ('merges', 'isMergedIn')")
+				.count());
 	}
 
 	@Test
@@ -813,6 +796,7 @@ public static void finalCleanUp() throws IOException {
 		FileUtils.deleteDirectory(new File(testOutputBasePath));
 		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
+		FileUtils.deleteDirectory(new File(testConsistencyGraphBasePath));
 	}
 
 	public boolean isDeletedByInference(String s) {
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java
index a0bf6b37e..73e768cf1 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java
@@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.dedup;
 
 import static java.nio.file.Files.createTempDirectory;
+import static org.apache.spark.sql.functions.col;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.lenient;
@@ -15,10 +16,6 @@ import java.nio.file.Paths;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -33,8 +30,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import eu.dnetlib.pace.util.MapDocumentUtil;
-import scala.Tuple2;
 
 @ExtendWith(MockitoExtension.class)
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@@ -44,11 +39,11 @@ public class SparkOpenorgsProvisionTest implements Serializable {
 	ISLookUpService isLookUpService;
 
 	private static SparkSession spark;
-	private static JavaSparkContext jsc;
 
 	private static String testGraphBasePath;
 	private static String testOutputBasePath;
 	private static String testDedupGraphBasePath;
+	private static String testConsistencyGraphBasePath;
 	private static final String testActionSetId = "test-orchestrator";
 
 	@BeforeAll
@@ -64,6 +59,9 @@ public class SparkOpenorgsProvisionTest implements Serializable {
 		testDedupGraphBasePath = createTempDirectory(SparkOpenorgsProvisionTest.class.getSimpleName() + "-")
 			.toAbsolutePath()
 			.toString();
+		testConsistencyGraphBasePath = createTempDirectory(SparkOpenorgsProvisionTest.class.getSimpleName() + "-")
+			.toAbsolutePath()
+			.toString();
 
 		FileUtils.deleteDirectory(new File(testOutputBasePath));
 		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
@@ -76,8 +74,13 @@ public class SparkOpenorgsProvisionTest implements Serializable {
 			.master("local[*]")
 			.config(conf)
 			.getOrCreate();
+	}
 
-		jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+	@AfterAll
+	public static void finalCleanUp() throws IOException {
+		FileUtils.deleteDirectory(new File(testOutputBasePath));
+		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
+		FileUtils.deleteDirectory(new File(testConsistencyGraphBasePath));
 	}
 
 	@BeforeEach
@@ -186,26 +189,21 @@ public class SparkOpenorgsProvisionTest implements Serializable {
 
 		new SparkUpdateEntity(parser, spark).run(isLookUpService);
 
-		long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count();
+		Dataset<Row> organizations = spark.read().json(testDedupGraphBasePath + "/organization");
 
-		long mergedOrgs = spark
+		Dataset<Row> mergedOrgs = spark
 			.read()
 			.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
-			.as(Encoders.bean(Relation.class))
 			.where("relClass=='merges'")
-			.javaRDD()
-			.map(Relation::getTarget)
-			.distinct()
-			.count();
+			.select("target")
+			.distinct();
 
-		assertEquals(80, organizations);
+		assertEquals(80, organizations.count());
 
-		long deletedOrgs = jsc
-			.textFile(testDedupGraphBasePath + "/organization")
-			.filter(this::isDeletedByInference)
-			.count();
+		Dataset<Row> deletedOrgs = organizations
+			.filter("dataInfo.deletedbyinference = TRUE");
 
-		assertEquals(mergedOrgs, deletedOrgs);
+		assertEquals(mergedOrgs.count(), deletedOrgs.count());
 	}
 
 	@Test
@@ -226,10 +224,9 @@ public class SparkOpenorgsProvisionTest implements Serializable {
 
 		new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService);
 
-		final JavaRDD<String> rels = jsc.textFile(testDedupGraphBasePath + "/relation");
-
-		assertEquals(2382, rels.count());
+		final Dataset<Row> outputRels = spark.read().text(testDedupGraphBasePath + "/relation");
+		assertEquals(2382, outputRels.count());
 	}
 
 	@Test
@@ -244,51 +241,41 @@ public class SparkOpenorgsProvisionTest implements Serializable {
 		parser
 			.parseArgument(
 				new String[] {
-					"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
+					"-i", testDedupGraphBasePath, "-w", testOutputBasePath, "-o", testConsistencyGraphBasePath
 				});
 
 		new SparkPropagateRelation(parser, spark).run(isLookUpService);
 
-		long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
-
-		assertEquals(4896, relations);
-
-		// check deletedbyinference
 		final Dataset<Relation> mergeRels = spark
 			.read()
 			.load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*"))
 			.as(Encoders.bean(Relation.class));
-		final JavaPairRDD<String, String> mergedIds = mergeRels
+
+		Dataset<Row> inputRels = spark
+			.read()
+			.json(testDedupGraphBasePath + "/relation");
+
+		Dataset<Row> outputRels = spark
+			.read()
+			.json(testConsistencyGraphBasePath + "/relation");
+
+		final Dataset<Row> mergedIds = mergeRels
 			.where("relClass == 'merges'")
-			.select(mergeRels.col("target"))
-			.distinct()
-			.toJavaRDD()
-			.mapToPair(
-				(PairFunction<Row, String, String>) r -> new Tuple2(r.getString(0), "d"));
+			.select(col("target").as("id"))
+			.distinct();
 
-		JavaRDD<String> toCheck = jsc
-			.textFile(testDedupGraphBasePath + "/relation")
-			.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json))
-			.join(mergedIds)
-			.map(t -> t._2()._1())
-			.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json))
-			.join(mergedIds)
-			.map(t -> t._2()._1());
+		Dataset<Row> toUpdateRels = inputRels
+			.as("rel")
+			.join(mergedIds.as("s"), col("rel.source").equalTo(col("s.id")), "left_outer")
+			.join(mergedIds.as("t"), col("rel.target").equalTo(col("t.id")), "left_outer")
+			.filter("s.id IS NOT NULL OR t.id IS NOT NULL")
+			.distinct();
 
-		long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
-		long updated = toCheck.count();
+		Dataset<Row> updatedRels = inputRels
+			.select("source", "target", "relClass")
+			.except(outputRels.select("source", "target", "relClass"));
 
-		assertEquals(updated, deletedbyinference);
+		assertEquals(toUpdateRels.count(), updatedRels.count());
+		assertEquals(140, outputRels.count());
 	}
-
-	@AfterAll
-	public static void finalCleanUp() throws IOException {
-		FileUtils.deleteDirectory(new File(testOutputBasePath));
-		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
-	}
-
-	public boolean isDeletedByInference(String s) {
-		return s.contains("\"deletedbyinference\":true");
-	}
-
 }