forked from D-Net/dnet-hadoop
Merge pull request 'Add a "CleanRelation" action after the PropagateRelation to filter out all relations that have been deleted by inference or that are pointing to dangling entities' (#328) from cleanup_relations_after_dedup into beta
Reviewed-on: D-Net/dnet-hadoop#328
This commit is contained in:
commit
c334fe2438
|
@ -0,0 +1,78 @@
|
||||||
|
package eu.dnetlib.dhp.oa.dedup
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
|
import eu.dnetlib.dhp.common.HdfsSupport
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation
|
||||||
|
import eu.dnetlib.dhp.utils.ISLookupClientFactory
|
||||||
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService
|
||||||
|
import org.apache.commons.io.IOUtils
|
||||||
|
import org.apache.spark.SparkConf
|
||||||
|
import org.apache.spark.sql._
|
||||||
|
import org.apache.spark.sql.functions.col
|
||||||
|
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
|
|
||||||
|
object SparkCleanRelation {
|
||||||
|
private val log = LoggerFactory.getLogger(classOf[SparkCleanRelation])
|
||||||
|
|
||||||
|
@throws[Exception]
|
||||||
|
def main(args: Array[String]): Unit = {
|
||||||
|
val parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils.toString(
|
||||||
|
classOf[SparkCleanRelation].getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
parser.parseArgument(args)
|
||||||
|
val conf = new SparkConf
|
||||||
|
|
||||||
|
new SparkCleanRelation(parser, AbstractSparkAction.getSparkSession(conf))
|
||||||
|
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class SparkCleanRelation(parser: ArgumentApplicationParser, spark: SparkSession)
|
||||||
|
extends AbstractSparkAction(parser, spark) {
|
||||||
|
override def run(isLookUpService: ISLookUpService): Unit = {
|
||||||
|
val graphBasePath = parser.get("graphBasePath")
|
||||||
|
val inputPath = parser.get("inputPath")
|
||||||
|
val outputPath = parser.get("outputPath")
|
||||||
|
|
||||||
|
SparkCleanRelation.log.info("graphBasePath: '{}'", graphBasePath)
|
||||||
|
SparkCleanRelation.log.info("inputPath: '{}'", inputPath)
|
||||||
|
SparkCleanRelation.log.info("outputPath: '{}'", outputPath)
|
||||||
|
|
||||||
|
AbstractSparkAction.removeOutputDir(spark, outputPath)
|
||||||
|
|
||||||
|
val entities =
|
||||||
|
Seq("datasource", "project", "organization", "publication", "dataset", "software", "otherresearchproduct")
|
||||||
|
|
||||||
|
val idsSchema = StructType.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>")
|
||||||
|
|
||||||
|
val emptyIds = spark.createDataFrame(spark.sparkContext.emptyRDD[Row].setName("empty"),
|
||||||
|
idsSchema)
|
||||||
|
|
||||||
|
val ids = entities
|
||||||
|
.foldLeft(emptyIds)((ds, entity) => {
|
||||||
|
val entityPath = graphBasePath + '/' + entity
|
||||||
|
if (HdfsSupport.exists(entityPath, spark.sparkContext.hadoopConfiguration)) {
|
||||||
|
ds.union(spark.read.schema(idsSchema).json(entityPath))
|
||||||
|
} else {
|
||||||
|
ds
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
|
||||||
|
.select("id")
|
||||||
|
.distinct()
|
||||||
|
|
||||||
|
val relations = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(inputPath)
|
||||||
|
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
|
||||||
|
|
||||||
|
AbstractSparkAction.save(
|
||||||
|
relations
|
||||||
|
.join(ids, col("source") === ids("id"), "leftsemi")
|
||||||
|
.join(ids, col("target") === ids("id"), "leftsemi"),
|
||||||
|
outputPath,
|
||||||
|
SaveMode.Overwrite
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,20 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "i",
|
||||||
|
"paramLongName": "graphBasePath",
|
||||||
|
"paramDescription": "the base path of raw graph",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "w",
|
||||||
|
"paramLongName": "inputPath",
|
||||||
|
"paramDescription": "the path to the input relation to cleanup",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "o",
|
||||||
|
"paramLongName": "outputPath",
|
||||||
|
"paramDescription": "the path of the output relation cleaned",
|
||||||
|
"paramRequired": true
|
||||||
|
}
|
||||||
|
]
|
|
@ -92,9 +92,34 @@
|
||||||
--conf spark.sql.shuffle.partitions=15000
|
--conf spark.sql.shuffle.partitions=15000
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||||
<arg>--o</arg><arg>${graphOutputPath}</arg>
|
<arg>--outputPath</arg><arg>${workingPath}/propagaterelation/</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
<ok to="CleanRelation"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="CleanRelation">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Clean Relations</name>
|
||||||
|
<class>eu.dnetlib.dhp.oa.dedup.SparkCleanRelation</class>
|
||||||
|
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=15000
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||||
|
<arg>--inputPath</arg><arg>${workingPath}/propagaterelation/relation</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
|
||||||
|
</spark>
|
||||||
<ok to="group_entities"/>
|
<ok to="group_entities"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
|
@ -1,11 +1,32 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.dedup;
|
package eu.dnetlib.dhp.oa.dedup;
|
||||||
|
|
||||||
import static java.nio.file.Files.createTempDirectory;
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import static org.apache.spark.sql.functions.count;
|
import com.google.common.collect.Sets;
|
||||||
import static org.junit.jupiter.api.Assertions.*;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import static org.mockito.Mockito.lenient;
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.api.java.function.PairFunction;
|
||||||
|
import org.apache.spark.sql.*;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.junit.jupiter.api.*;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileReader;
|
import java.io.FileReader;
|
||||||
|
@ -19,36 +40,11 @@ import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import static java.nio.file.Files.createTempDirectory;
|
||||||
import org.apache.commons.io.IOUtils;
|
import static org.apache.spark.sql.functions.col;
|
||||||
import org.apache.spark.SparkConf;
|
import static org.apache.spark.sql.functions.count;
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
import static org.junit.jupiter.api.Assertions.*;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import static org.mockito.Mockito.lenient;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
|
||||||
import org.apache.spark.api.java.function.FilterFunction;
|
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
|
||||||
import org.apache.spark.api.java.function.PairFunction;
|
|
||||||
import org.apache.spark.sql.Dataset;
|
|
||||||
import org.apache.spark.sql.Encoders;
|
|
||||||
import org.apache.spark.sql.Row;
|
|
||||||
import org.apache.spark.sql.SparkSession;
|
|
||||||
import org.junit.jupiter.api.*;
|
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
|
||||||
import org.mockito.Mock;
|
|
||||||
import org.mockito.Mockito;
|
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.google.common.collect.Sets;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
|
||||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
|
||||||
import scala.Tuple2;
|
|
||||||
|
|
||||||
@ExtendWith(MockitoExtension.class)
|
@ExtendWith(MockitoExtension.class)
|
||||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||||
|
@ -681,15 +677,16 @@ public class SparkDedupTest implements Serializable {
|
||||||
|
|
||||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"));
|
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"));
|
||||||
|
String outputRelPath = testDedupGraphBasePath + "/propagaterelation";
|
||||||
parser
|
parser
|
||||||
.parseArgument(
|
.parseArgument(
|
||||||
new String[] {
|
new String[] {
|
||||||
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
|
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", outputRelPath
|
||||||
});
|
});
|
||||||
|
|
||||||
new SparkPropagateRelation(parser, spark).run(isLookUpService);
|
new SparkPropagateRelation(parser, spark).run(isLookUpService);
|
||||||
|
|
||||||
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
|
long relations = jsc.textFile(outputRelPath + "/relation").count();
|
||||||
|
|
||||||
// assertEquals(4860, relations);
|
// assertEquals(4860, relations);
|
||||||
System.out.println("relations = " + relations);
|
System.out.println("relations = " + relations);
|
||||||
|
@ -708,7 +705,7 @@ public class SparkDedupTest implements Serializable {
|
||||||
(PairFunction<Row, String, String>) r -> new Tuple2<String, String>(r.getString(0), "d"));
|
(PairFunction<Row, String, String>) r -> new Tuple2<String, String>(r.getString(0), "d"));
|
||||||
|
|
||||||
JavaRDD<String> toCheck = jsc
|
JavaRDD<String> toCheck = jsc
|
||||||
.textFile(testDedupGraphBasePath + "/relation")
|
.textFile(outputRelPath + "/relation")
|
||||||
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json))
|
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json))
|
||||||
.join(mergedIds)
|
.join(mergedIds)
|
||||||
.map(t -> t._2()._1())
|
.map(t -> t._2()._1())
|
||||||
|
@ -724,6 +721,65 @@ public class SparkDedupTest implements Serializable {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Order(8)
|
@Order(8)
|
||||||
|
void testCleanBaseRelations() throws Exception {
|
||||||
|
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
|
||||||
|
|
||||||
|
// append dangling relations to be cleaned up
|
||||||
|
Dataset<Row> df_before = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testGraphBasePath + "/relation");
|
||||||
|
Dataset<Row> df_input =df_before
|
||||||
|
.unionByName(df_before.drop("source").withColumn("source", functions.lit("n/a")))
|
||||||
|
.unionByName(df_before.drop("target").withColumn("target", functions.lit("n/a")));
|
||||||
|
df_input.write().mode(SaveMode.Overwrite).json(testOutputBasePath + "_tmp");
|
||||||
|
|
||||||
|
parser
|
||||||
|
.parseArgument(
|
||||||
|
new String[]{
|
||||||
|
"--graphBasePath", testGraphBasePath,
|
||||||
|
"--inputPath", testGraphBasePath + "/relation",
|
||||||
|
"--outputPath", testDedupGraphBasePath + "/relation"
|
||||||
|
});
|
||||||
|
|
||||||
|
new SparkCleanRelation(parser, spark).run(isLookUpService);
|
||||||
|
|
||||||
|
Dataset<Row> df_after = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testDedupGraphBasePath + "/relation");
|
||||||
|
|
||||||
|
assertNotEquals(df_before.count(), df_input.count());
|
||||||
|
assertNotEquals(df_input.count(), df_after.count());
|
||||||
|
assertEquals(5, df_after.count());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Order(9)
|
||||||
|
void testCleanDedupedRelations() throws Exception {
|
||||||
|
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
|
||||||
|
|
||||||
|
String inputRelPath = testDedupGraphBasePath + "/propagaterelation/relation";
|
||||||
|
|
||||||
|
// append dangling relations to be cleaned up
|
||||||
|
Dataset<Row> df_before = spark.read().schema(Encoders.bean(Relation.class).schema()).json(inputRelPath);
|
||||||
|
|
||||||
|
df_before.filter(col("dataInfo.deletedbyinference").notEqual(true)).show(50, false);
|
||||||
|
|
||||||
|
parser
|
||||||
|
.parseArgument(
|
||||||
|
new String[]{
|
||||||
|
"--graphBasePath", testGraphBasePath,
|
||||||
|
"--inputPath", inputRelPath,
|
||||||
|
"--outputPath", testDedupGraphBasePath + "/relation"
|
||||||
|
});
|
||||||
|
|
||||||
|
new SparkCleanRelation(parser, spark).run(isLookUpService);
|
||||||
|
|
||||||
|
Dataset<Row> df_after = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testDedupGraphBasePath + "/relation");
|
||||||
|
|
||||||
|
assertNotEquals(df_before.count(), df_after.count());
|
||||||
|
assertEquals(0, df_after.count());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Order(10)
|
||||||
void testRelations() throws Exception {
|
void testRelations() throws Exception {
|
||||||
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
|
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
|
||||||
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);
|
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);
|
||||||
|
|
Loading…
Reference in New Issue