diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala
new file mode 100644
index 000000000..5d8da42c2
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala
@@ -0,0 +1,78 @@
+package eu.dnetlib.dhp.oa.dedup
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.common.HdfsSupport
+import eu.dnetlib.dhp.schema.oaf.Relation
+import eu.dnetlib.dhp.utils.ISLookupClientFactory
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+object SparkCleanRelation {
+ private val log = LoggerFactory.getLogger(classOf[SparkCleanRelation])
+
+ @throws[Exception]
+ def main(args: Array[String]): Unit = {
+ val parser = new ArgumentApplicationParser(
+ IOUtils.toString(
+ classOf[SparkCleanRelation].getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")
+ )
+ )
+ parser.parseArgument(args)
+ val conf = new SparkConf
+
+ new SparkCleanRelation(parser, AbstractSparkAction.getSparkSession(conf))
+ .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")))
+ }
+}
+
+class SparkCleanRelation(parser: ArgumentApplicationParser, spark: SparkSession)
+ extends AbstractSparkAction(parser, spark) {
+ override def run(isLookUpService: ISLookUpService): Unit = {
+ val graphBasePath = parser.get("graphBasePath")
+ val inputPath = parser.get("inputPath")
+ val outputPath = parser.get("outputPath")
+
+ SparkCleanRelation.log.info("graphBasePath: '{}'", graphBasePath)
+ SparkCleanRelation.log.info("inputPath: '{}'", inputPath)
+ SparkCleanRelation.log.info("outputPath: '{}'", outputPath)
+
+ AbstractSparkAction.removeOutputDir(spark, outputPath)
+
+ val entities =
+ Seq("datasource", "project", "organization", "publication", "dataset", "software", "otherresearchproduct")
+
+ val idsSchema = StructType.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>")
+
+ val emptyIds = spark.createDataFrame(
+   spark.sparkContext.emptyRDD[Row].setName("empty"), idsSchema)
+
+ val ids = entities
+ .foldLeft(emptyIds)((ds, entity) => {
+ val entityPath = graphBasePath + '/' + entity
+ if (HdfsSupport.exists(entityPath, spark.sparkContext.hadoopConfiguration)) {
+ ds.union(spark.read.schema(idsSchema).json(entityPath))
+ } else {
+ ds
+ }
+ })
+ .filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
+ .select("id")
+ .distinct()
+
+ val relations = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(inputPath)
+ .filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
+
+ AbstractSparkAction.save(
+ relations
+ .join(ids, col("source") === ids("id"), "leftsemi")
+ .join(ids, col("target") === ids("id"), "leftsemi"),
+ outputPath,
+ SaveMode.Overwrite
+ )
+ }
+}
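The cleaning logic above folds a union over the per-entity id frames (skipping entity directories absent from the graph), filters out ids flagged as deleted by inference or invisible, and then keeps only the relations whose source and target both resolve, via two chained left-semi joins. A left-semi join only filters: it never duplicates relations when the right side holds repeated ids, and it adds no columns to the output. A minimal, self-contained sketch of that join pattern; the object name and toy data are illustrative, not part of this patch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object LeftSemiCleanSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("leftsemi-sketch").getOrCreate()
    import spark.implicits._

    // ids of entities that are visible and not deleted by inference
    val ids = Seq("id1", "id2").toDF("id")

    // three relations, two of them dangling on one endpoint
    val rels = Seq(("id1", "id2"), ("id1", "n/a"), ("n/a", "id2")).toDF("source", "target")

    // chained left-semi joins: a relation survives only if both endpoints match an id
    val cleaned = rels
      .join(ids, col("source") === ids("id"), "leftsemi")
      .join(ids, col("target") === ids("id"), "leftsemi")

    cleaned.show() // only ("id1", "id2") remains
    spark.stop()
  }
}
```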
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json
new file mode 100644
index 000000000..860539ad9
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json
@@ -0,0 +1,20 @@
+[
+ {
+ "paramName": "i",
+ "paramLongName": "graphBasePath",
+ "paramDescription": "the base path of raw graph",
+ "paramRequired": true
+ },
+ {
+ "paramName": "w",
+ "paramLongName": "inputPath",
+ "paramDescription": "the path to the input relation to cleanup",
+ "paramRequired": true
+ },
+ {
+ "paramName": "o",
+ "paramLongName": "outputPath",
+ "paramDescription": "the path of the output relation cleaned",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
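Each parameter above pairs a short name with a long name, and both spellings appear elsewhere in this patch: the workflow passes `--graphBasePath`, while SparkDedupTest passes `-i`/`-w`/`-o` to the sibling propagateRelation spec. A hedged sketch of exercising this spec with ArgumentApplicationParser, using only the calls visible in the patch; the paths and the object name are placeholders:

```scala
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.commons.io.IOUtils

object CleanRelationArgsSketch {
  def main(args: Array[String]): Unit = {
    // load the spec added above from the classpath
    val parser = new ArgumentApplicationParser(
      IOUtils.toString(
        getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")
      )
    )
    // short forms; values are then retrieved by their long names
    parser.parseArgument(Array("-i", "/graph", "-w", "/in/relation", "-o", "/out/relation"))
    assert(parser.get("graphBasePath") == "/graph")
    assert(parser.get("inputPath") == "/in/relation")
    assert(parser.get("outputPath") == "/out/relation")
  }
}
```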
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
index 7c500493f..fedc68d9d 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
@@ -92,9 +92,34 @@
                 --conf spark.sql.shuffle.partitions=15000
             </spark-opts>
             <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
-            <arg>--o</arg><arg>${graphOutputPath}</arg>
+            <arg>--outputPath</arg><arg>${workingPath}/propagaterelation/</arg>
             <arg>--workingPath</arg><arg>${workingPath}</arg>
         </spark>
-        <ok to="group_entities"/>
+        <ok to="CleanRelation"/>
         <error to="Kill"/>
     </action>
+
+    <action name="CleanRelation">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Clean Relations</name>
+            <class>eu.dnetlib.dhp.oa.dedup.SparkCleanRelation</class>
+            <jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=15000
+            </spark-opts>
+            <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
+            <arg>--inputPath</arg><arg>${workingPath}/propagaterelation/relation</arg>
+            <arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
+        </spark>
+        <ok to="group_entities"/>
+        <error to="Kill"/>
+    </action>
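The new CleanRelation action runs between PropagateRelation and the downstream grouping step, reading the propagated relations from the working path and writing the cleaned set into the output graph. Note that the parameter spec defines no `isLookUpUrl`, so a launch through `SparkCleanRelation.main` hands a null URL to `ISLookupClientFactory`; this is harmless because `run()` never touches the lookup service in this action. For local debugging the step can be driven directly; a hedged sketch where the local master, the paths, and the object name are placeholders (it sits in the same package so the package-visible `getSparkSession` helper resolves):

```scala
package eu.dnetlib.dhp.oa.dedup

import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf

object CleanRelationLocalRun {
  def main(args: Array[String]): Unit = {
    val parser = new ArgumentApplicationParser(
      IOUtils.toString(
        classOf[SparkCleanRelation].getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")
      )
    )
    // the same three arguments the CleanRelation action passes, with placeholder paths
    parser.parseArgument(
      Array(
        "--graphBasePath", "/tmp/graph",
        "--inputPath", "/tmp/working/propagaterelation/relation",
        "--outputPath", "/tmp/output/relation"
      )
    )
    val conf = new SparkConf().setMaster("local[*]")
    // run() ignores its ISLookUpService argument in this action, so null is acceptable
    new SparkCleanRelation(parser, AbstractSparkAction.getSparkSession(conf)).run(null)
  }
}
```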
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index 6a2c6dcc5..c382e922e 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -1,11 +1,32 @@
package eu.dnetlib.dhp.oa.dedup;
-import static java.nio.file.Files.createTempDirectory;
-
-import static org.apache.spark.sql.functions.count;
-import static org.junit.jupiter.api.Assertions.*;
-import static org.mockito.Mockito.lenient;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.util.MapDocumentUtil;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.Dataset;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import scala.Tuple2;
import java.io.File;
import java.io.FileReader;
@@ -19,36 +40,11 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FilterFunction;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.*;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Sets;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import eu.dnetlib.pace.util.MapDocumentUtil;
-import scala.Tuple2;
+import static java.nio.file.Files.createTempDirectory;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.count;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.lenient;
@ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@@ -681,15 +677,16 @@ public class SparkDedupTest implements Serializable {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"));
+ String outputRelPath = testDedupGraphBasePath + "/propagaterelation";
parser
.parseArgument(
new String[] {
- "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
+ "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", outputRelPath
});
new SparkPropagateRelation(parser, spark).run(isLookUpService);
- long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
+ long relations = jsc.textFile(outputRelPath + "/relation").count();
// assertEquals(4860, relations);
System.out.println("relations = " + relations);
@@ -708,7 +705,7 @@ public class SparkDedupTest implements Serializable {
(PairFunction) r -> new Tuple2(r.getString(0), "d"));
JavaRDD toCheck = jsc
- .textFile(testDedupGraphBasePath + "/relation")
+ .textFile(outputRelPath + "/relation")
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json))
.join(mergedIds)
.map(t -> t._2()._1())
@@ -724,6 +721,65 @@ public class SparkDedupTest implements Serializable {
@Test
@Order(8)
+ void testCleanBaseRelations() throws Exception {
+ ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
+
+ // append dangling relations to be cleaned up
+ Dataset<Row> df_before = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testGraphBasePath + "/relation");
+ Dataset<Row> df_input = df_before
+ .unionByName(df_before.drop("source").withColumn("source", functions.lit("n/a")))
+ .unionByName(df_before.drop("target").withColumn("target", functions.lit("n/a")));
+ df_input.write().mode(SaveMode.Overwrite).json(testOutputBasePath + "_tmp");
+
+ parser
+ .parseArgument(
+ new String[]{
+ "--graphBasePath", testGraphBasePath,
+ "--inputPath", testGraphBasePath + "/relation",
+ "--outputPath", testDedupGraphBasePath + "/relation"
+ });
+
+ new SparkCleanRelation(parser, spark).run(isLookUpService);
+
+ Dataset<Row> df_after = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testDedupGraphBasePath + "/relation");
+
+ assertNotEquals(df_before.count(), df_input.count());
+ assertNotEquals(df_input.count(), df_after.count());
+ assertEquals(5, df_after.count());
+ }
+
+ @Test
+ @Order(9)
+ void testCleanDedupedRelations() throws Exception {
+ ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
+
+ String inputRelPath = testDedupGraphBasePath + "/propagaterelation/relation";
+
+ // no dangling relations are appended here: read the propagated relations as they are
+ Dataset<Row> df_before = spark.read().schema(Encoders.bean(Relation.class).schema()).json(inputRelPath);
+
+ df_before.filter(col("dataInfo.deletedbyinference").notEqual(true)).show(50, false);
+
+ parser
+ .parseArgument(
+ new String[]{
+ "--graphBasePath", testGraphBasePath,
+ "--inputPath", inputRelPath,
+ "--outputPath", testDedupGraphBasePath + "/relation"
+ });
+
+ new SparkCleanRelation(parser, spark).run(isLookUpService);
+
+ Dataset<Row> df_after = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testDedupGraphBasePath + "/relation");
+
+ assertNotEquals(df_before.count(), df_after.count());
+ assertEquals(0, df_after.count());
+ }
+
+ @Test
+ @Order(10)
void testRelations() throws Exception {
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);