From af49424b599010a997835de6a2d771933c1f42fa Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Fri, 4 Aug 2023 14:27:39 +0200 Subject: [PATCH] Add a "CleanRelation" action after the PropagateRelation to filter out all relations that have been deleyted by inference or that are pointing to dangling entities --- .../dhp/oa/dedup/SparkCleanRelation.scala | 74 ++++++++++ .../oa/dedup/cleanRelation_parameters.json | 20 +++ .../dedup/consistency/oozie_app/workflow.xml | 27 +++- .../dnetlib/dhp/oa/dedup/SparkDedupTest.java | 132 +++++++++++++----- 4 files changed, 214 insertions(+), 39 deletions(-) create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala new file mode 100644 index 000000000..5f1e63ca8 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala @@ -0,0 +1,74 @@ +package eu.dnetlib.dhp.oa.dedup + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.common.HdfsSupport +import eu.dnetlib.dhp.schema.oaf.Relation +import eu.dnetlib.dhp.utils.ISLookupClientFactory +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql._ +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.slf4j.LoggerFactory + +object SparkCleanRelation { + private val log = LoggerFactory.getLogger(classOf[SparkCleanRelation]) + + @throws[Exception] + def main(args: Array[String]): Unit = { + val parser = new ArgumentApplicationParser( + IOUtils.toString( + classOf[SparkCleanRelation].getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json") + ) + ) + parser.parseArgument(args) + val conf = new SparkConf + + new SparkCleanRelation(parser, AbstractSparkAction.getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))) + } +} + +class SparkCleanRelation(parser: ArgumentApplicationParser, spark: SparkSession) + extends AbstractSparkAction(parser, spark) { + override def run(isLookUpService: ISLookUpService): Unit = { + val graphBasePath = parser.get("graphBasePath") + val inputPath = parser.get("inputPath") + val outputPath = parser.get("outputPath") + + SparkCleanRelation.log.info("graphBasePath: '{}'", graphBasePath) + SparkCleanRelation.log.info("inputPath: '{}'", inputPath) + SparkCleanRelation.log.info("outputPath: '{}'", outputPath) + + AbstractSparkAction.removeOutputDir(spark, outputPath) + + val entities = + Seq("datasource", "project", "organization", "publication", "dataset", "software", "otherresearchproduct") + + val emptyIds = spark.createDataFrame(spark.sparkContext.emptyRDD[Row].setName("empty"), + new StructType().add(StructField("id", DataTypes.StringType, true))) + + val ids = entities + .foldLeft(emptyIds)((ds, entity) => { + val entityPath = graphBasePath + '/' + entity + if (HdfsSupport.exists(entityPath, spark.sparkContext.hadoopConfiguration)) { + ds.union(spark.read.schema("`id` STRING").json(entityPath)) + } else { + ds + } + }) + .distinct() + + val relations = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(inputPath) + .filter(col("dataInfo.deletedbyinference").isNull || col("dataInfo.deletedbyinference") === false) + + AbstractSparkAction.save( + relations + .join(ids, col("source") === ids("id"), "leftsemi") + .join(ids, col("target") === ids("id"), "leftsemi"), + outputPath, + SaveMode.Overwrite + ) + } +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json new file mode 100644 index 000000000..860539ad9 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "i", + "paramLongName": "graphBasePath", + "paramDescription": "the base path of raw graph", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "inputPath", + "paramDescription": "the path to the input relation to cleanup", + "paramRequired": true + }, + { + "paramName": "o", + "paramLongName": "outputPath", + "paramDescription": "the path of the output relation cleaned", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml index 7c500493f..fedc68d9d 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml @@ -92,9 +92,34 @@ --conf spark.sql.shuffle.partitions=15000 --graphBasePath${graphBasePath} - --o${graphOutputPath} + --outputPath${workingPath}/propagaterelation/ --workingPath${workingPath} + + + + + + + yarn + cluster + Clean Relations + eu.dnetlib.dhp.oa.dedup.SparkCleanRelation + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=15000 + + --graphBasePath${graphBasePath} + --inputPath${workingPath}/propagaterelation/relation + --outputPath${graphOutputPath}/relation + diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index 6a2c6dcc5..c382e922e 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -1,11 +1,32 @@ package eu.dnetlib.dhp.oa.dedup; -import static java.nio.file.Files.createTempDirectory; - -import static org.apache.spark.sql.functions.count; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.lenient; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.util.MapDocumentUtil; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import scala.Tuple2; import java.io.File; import java.io.FileReader; @@ -19,36 +40,11 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.junit.jupiter.api.*; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Sets; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.util.MapDocumentUtil; -import scala.Tuple2; +import static java.nio.file.Files.createTempDirectory; +import static org.apache.spark.sql.functions.col; +import static org.apache.spark.sql.functions.count; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.lenient; @ExtendWith(MockitoExtension.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @@ -681,15 +677,16 @@ public class SparkDedupTest implements Serializable { ArgumentApplicationParser parser = new ArgumentApplicationParser( classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")); + String outputRelPath = testDedupGraphBasePath + "/propagaterelation"; parser .parseArgument( new String[] { - "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath + "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", outputRelPath }); new SparkPropagateRelation(parser, spark).run(isLookUpService); - long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); + long relations = jsc.textFile(outputRelPath + "/relation").count(); // assertEquals(4860, relations); System.out.println("relations = " + relations); @@ -708,7 +705,7 @@ public class SparkDedupTest implements Serializable { (PairFunction) r -> new Tuple2(r.getString(0), "d")); JavaRDD toCheck = jsc - .textFile(testDedupGraphBasePath + "/relation") + .textFile(outputRelPath + "/relation") .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json)) .join(mergedIds) .map(t -> t._2()._1()) @@ -724,6 +721,65 @@ public class SparkDedupTest implements Serializable { @Test @Order(8) + void testCleanBaseRelations() throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")); + + // append dangling relations to be cleaned up + Dataset df_before = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testGraphBasePath + "/relation"); + Dataset df_input =df_before + .unionByName(df_before.drop("source").withColumn("source", functions.lit("n/a"))) + .unionByName(df_before.drop("target").withColumn("target", functions.lit("n/a"))); + df_input.write().mode(SaveMode.Overwrite).json(testOutputBasePath + "_tmp"); + + parser + .parseArgument( + new String[]{ + "--graphBasePath", testGraphBasePath, + "--inputPath", testGraphBasePath + "/relation", + "--outputPath", testDedupGraphBasePath + "/relation" + }); + + new SparkCleanRelation(parser, spark).run(isLookUpService); + + Dataset df_after = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testDedupGraphBasePath + "/relation"); + + assertNotEquals(df_before.count(), df_input.count()); + assertNotEquals(df_input.count(), df_after.count()); + assertEquals(5, df_after.count()); + } + + @Test + @Order(9) + void testCleanDedupedRelations() throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")); + + String inputRelPath = testDedupGraphBasePath + "/propagaterelation/relation"; + + // append dangling relations to be cleaned up + Dataset df_before = spark.read().schema(Encoders.bean(Relation.class).schema()).json(inputRelPath); + + df_before.filter(col("dataInfo.deletedbyinference").notEqual(true)).show(50, false); + + parser + .parseArgument( + new String[]{ + "--graphBasePath", testGraphBasePath, + "--inputPath", inputRelPath, + "--outputPath", testDedupGraphBasePath + "/relation" + }); + + new SparkCleanRelation(parser, spark).run(isLookUpService); + + Dataset df_after = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testDedupGraphBasePath + "/relation"); + + assertNotEquals(df_before.count(), df_after.count()); + assertEquals(0, df_after.count()); + } + + @Test + @Order(10) void testRelations() throws Exception { testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10); testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);