forked from D-Net/dnet-hadoop
Include SparkCleanRelation logic in SparkPropagateRelation
SparkPropagateRelation includes merge relations Revised tests for SparkPropagateRelation
This commit is contained in:
parent
488d9a1cea
commit
2caaaec42d
|
@ -1,57 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.dedup;
|
|
||||||
|
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
import org.apache.spark.sql.Encoder;
|
|
||||||
import org.apache.spark.sql.Encoders;
|
|
||||||
import org.apache.spark.sql.expressions.Aggregator;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
|
||||||
|
|
||||||
public class RelationAggregator extends Aggregator<Relation, Relation, Relation> {
|
|
||||||
|
|
||||||
private static final Relation ZERO = new Relation();
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Relation zero() {
|
|
||||||
return ZERO;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Relation reduce(Relation b, Relation a) {
|
|
||||||
return mergeRel(b, a);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Relation merge(Relation b, Relation a) {
|
|
||||||
return mergeRel(b, a);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Relation finish(Relation r) {
|
|
||||||
return r;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Relation mergeRel(Relation b, Relation a) {
|
|
||||||
if (Objects.equals(b, ZERO)) {
|
|
||||||
return a;
|
|
||||||
}
|
|
||||||
if (Objects.equals(a, ZERO)) {
|
|
||||||
return b;
|
|
||||||
}
|
|
||||||
|
|
||||||
b.mergeFrom(a);
|
|
||||||
return b;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Encoder<Relation> bufferEncoder() {
|
|
||||||
return Encoders.kryo(Relation.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Encoder<Relation> outputEncoder() {
|
|
||||||
return Encoders.kryo(Relation.class);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,78 +0,0 @@
|
||||||
package eu.dnetlib.dhp.oa.dedup
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation
|
|
||||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory
|
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService
|
|
||||||
import org.apache.commons.io.IOUtils
|
|
||||||
import org.apache.spark.SparkConf
|
|
||||||
import org.apache.spark.sql._
|
|
||||||
import org.apache.spark.sql.functions.col
|
|
||||||
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
|
|
||||||
import org.slf4j.LoggerFactory
|
|
||||||
|
|
||||||
object SparkCleanRelation {
|
|
||||||
private val log = LoggerFactory.getLogger(classOf[SparkCleanRelation])
|
|
||||||
|
|
||||||
@throws[Exception]
|
|
||||||
def main(args: Array[String]): Unit = {
|
|
||||||
val parser = new ArgumentApplicationParser(
|
|
||||||
IOUtils.toString(
|
|
||||||
classOf[SparkCleanRelation].getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")
|
|
||||||
)
|
|
||||||
)
|
|
||||||
parser.parseArgument(args)
|
|
||||||
val conf = new SparkConf
|
|
||||||
|
|
||||||
new SparkCleanRelation(parser, AbstractSparkAction.getSparkSession(conf))
|
|
||||||
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class SparkCleanRelation(parser: ArgumentApplicationParser, spark: SparkSession)
|
|
||||||
extends AbstractSparkAction(parser, spark) {
|
|
||||||
override def run(isLookUpService: ISLookUpService): Unit = {
|
|
||||||
val graphBasePath = parser.get("graphBasePath")
|
|
||||||
val inputPath = parser.get("inputPath")
|
|
||||||
val outputPath = parser.get("outputPath")
|
|
||||||
|
|
||||||
SparkCleanRelation.log.info("graphBasePath: '{}'", graphBasePath)
|
|
||||||
SparkCleanRelation.log.info("inputPath: '{}'", inputPath)
|
|
||||||
SparkCleanRelation.log.info("outputPath: '{}'", outputPath)
|
|
||||||
|
|
||||||
AbstractSparkAction.removeOutputDir(spark, outputPath)
|
|
||||||
|
|
||||||
val entities =
|
|
||||||
Seq("datasource", "project", "organization", "publication", "dataset", "software", "otherresearchproduct")
|
|
||||||
|
|
||||||
val idsSchema = StructType.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>")
|
|
||||||
|
|
||||||
val emptyIds = spark.createDataFrame(spark.sparkContext.emptyRDD[Row].setName("empty"),
|
|
||||||
idsSchema)
|
|
||||||
|
|
||||||
val ids = entities
|
|
||||||
.foldLeft(emptyIds)((ds, entity) => {
|
|
||||||
val entityPath = graphBasePath + '/' + entity
|
|
||||||
if (HdfsSupport.exists(entityPath, spark.sparkContext.hadoopConfiguration)) {
|
|
||||||
ds.union(spark.read.schema(idsSchema).json(entityPath))
|
|
||||||
} else {
|
|
||||||
ds
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
|
|
||||||
.select("id")
|
|
||||||
.distinct()
|
|
||||||
|
|
||||||
val relations = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(inputPath)
|
|
||||||
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
|
|
||||||
|
|
||||||
AbstractSparkAction.save(
|
|
||||||
relations
|
|
||||||
.join(ids, col("source") === ids("id"), "leftsemi")
|
|
||||||
.join(ids, col("target") === ids("id"), "leftsemi"),
|
|
||||||
outputPath,
|
|
||||||
SaveMode.Overwrite
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -3,23 +3,19 @@ package eu.dnetlib.dhp.oa.dedup;
|
||||||
|
|
||||||
import static org.apache.spark.sql.functions.col;
|
import static org.apache.spark.sql.functions.col;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
import org.apache.commons.beanutils.BeanUtils;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.function.FilterFunction;
|
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.api.java.function.ReduceFunction;
|
import org.apache.spark.api.java.function.ReduceFunction;
|
||||||
import org.apache.spark.sql.*;
|
import org.apache.spark.sql.*;
|
||||||
|
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
|
||||||
|
import org.apache.spark.sql.types.StructType;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
|
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||||
|
@ -70,73 +66,63 @@ public class SparkPropagateRelation extends AbstractSparkAction {
|
||||||
log.info("workingPath: '{}'", workingPath);
|
log.info("workingPath: '{}'", workingPath);
|
||||||
log.info("graphOutputPath: '{}'", graphOutputPath);
|
log.info("graphOutputPath: '{}'", graphOutputPath);
|
||||||
|
|
||||||
final String outputRelationPath = DedupUtility.createEntityPath(graphOutputPath, "relation");
|
|
||||||
removeOutputDir(spark, outputRelationPath);
|
|
||||||
|
|
||||||
Dataset<Relation> mergeRels = spark
|
Dataset<Relation> mergeRels = spark
|
||||||
.read()
|
.read()
|
||||||
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
|
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
|
||||||
.as(REL_BEAN_ENC);
|
.as(REL_BEAN_ENC);
|
||||||
|
|
||||||
// <mergedObjectID, dedupID>
|
// <mergedObjectID, dedupID>
|
||||||
Dataset<Row> mergedIds = mergeRels
|
Dataset<Row> idsToMerge = mergeRels
|
||||||
.where(col("relClass").equalTo(ModelConstants.MERGES))
|
.where(col("relClass").equalTo(ModelConstants.MERGES))
|
||||||
.select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
|
.select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
|
||||||
.distinct()
|
.distinct();
|
||||||
.cache();
|
|
||||||
|
|
||||||
Dataset<Row> allRels = spark
|
Dataset<Row> allRels = spark
|
||||||
.read()
|
.read()
|
||||||
.schema(REL_BEAN_ENC.schema())
|
.schema(REL_BEAN_ENC.schema())
|
||||||
.json(DedupUtility.createEntityPath(graphBasePath, "relation"));
|
.json(graphBasePath + "/relation");
|
||||||
|
|
||||||
Dataset<Relation> dedupedRels = allRels
|
Dataset<Relation> dedupedRels = allRels
|
||||||
.joinWith(mergedIds, allRels.col("source").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
|
.joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
|
||||||
.joinWith(mergedIds, col("_1.target").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
|
.joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
|
||||||
.select("_1._1", "_1._2.dedupID", "_2.dedupID")
|
.select("_1._1", "_1._2.dedupID", "_2.dedupID")
|
||||||
.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
|
.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
|
||||||
.flatMap(SparkPropagateRelation::addInferredRelations, REL_KRYO_ENC);
|
.map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> {
|
||||||
|
Relation rel = t._1();
|
||||||
|
String newSource = t._2();
|
||||||
|
String newTarget = t._3();
|
||||||
|
|
||||||
Dataset<Relation> processedRelations = distinctRelations(
|
if (rel.getDataInfo() == null) {
|
||||||
dedupedRels.union(mergeRels.map((MapFunction<Relation, Relation>) r -> r, REL_KRYO_ENC)))
|
rel.setDataInfo(new DataInfo());
|
||||||
.filter((FilterFunction<Relation>) r -> !Objects.equals(r.getSource(), r.getTarget()));
|
}
|
||||||
|
|
||||||
save(processedRelations, outputRelationPath, SaveMode.Overwrite);
|
if (newSource != null || newTarget != null) {
|
||||||
}
|
rel.getDataInfo().setDeletedbyinference(false);
|
||||||
|
|
||||||
private static Iterator<Relation> addInferredRelations(Tuple3<Relation, String, String> t) throws Exception {
|
if (newSource != null)
|
||||||
Relation existingRel = t._1();
|
rel.setSource(newSource);
|
||||||
String newSource = t._2();
|
|
||||||
String newTarget = t._3();
|
|
||||||
|
|
||||||
if (newSource == null && newTarget == null) {
|
if (newTarget != null)
|
||||||
return Collections.singleton(t._1()).iterator();
|
rel.setTarget(newTarget);
|
||||||
}
|
}
|
||||||
|
|
||||||
// update existing relation
|
return rel;
|
||||||
if (existingRel.getDataInfo() == null) {
|
}, REL_BEAN_ENC);
|
||||||
existingRel.setDataInfo(new DataInfo());
|
|
||||||
}
|
|
||||||
existingRel.getDataInfo().setDeletedbyinference(true);
|
|
||||||
|
|
||||||
// Create new relation inferred by dedupIDs
|
// ids of records that are both not deletedbyinference and not invisible
|
||||||
Relation inferredRel = (Relation) BeanUtils.cloneBean(existingRel);
|
Dataset<Row> ids = validIds(spark, graphBasePath);
|
||||||
|
|
||||||
inferredRel.setDataInfo((DataInfo) BeanUtils.cloneBean(existingRel.getDataInfo()));
|
// filter relations that point to valid records, can force them to be visible
|
||||||
inferredRel.getDataInfo().setDeletedbyinference(false);
|
Dataset<Relation> cleanedRels = dedupedRels
|
||||||
|
.join(ids, col("source").equalTo(ids.col("id")), "leftsemi")
|
||||||
|
.join(ids, col("target").equalTo(ids.col("id")), "leftsemi")
|
||||||
|
.as(REL_BEAN_ENC)
|
||||||
|
.map((MapFunction<Relation, Relation>) r -> {
|
||||||
|
r.getDataInfo().setInvisible(false);
|
||||||
|
return r;
|
||||||
|
}, REL_KRYO_ENC);
|
||||||
|
|
||||||
if (newSource != null)
|
Dataset<Relation> distinctRels = cleanedRels
|
||||||
inferredRel.setSource(newSource);
|
|
||||||
|
|
||||||
if (newTarget != null)
|
|
||||||
inferredRel.setTarget(newTarget);
|
|
||||||
|
|
||||||
return Arrays.asList(existingRel, inferredRel).iterator();
|
|
||||||
}
|
|
||||||
|
|
||||||
private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
|
|
||||||
return rels
|
|
||||||
.filter(getRelationFilterFunction())
|
|
||||||
.groupByKey(
|
.groupByKey(
|
||||||
(MapFunction<Relation, String>) r -> String
|
(MapFunction<Relation, String>) r -> String
|
||||||
.join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
|
.join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
|
||||||
|
@ -146,13 +132,33 @@ public class SparkPropagateRelation extends AbstractSparkAction {
|
||||||
return b;
|
return b;
|
||||||
})
|
})
|
||||||
.map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC);
|
.map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC);
|
||||||
|
|
||||||
|
final String outputRelationPath = graphOutputPath + "/relation";
|
||||||
|
removeOutputDir(spark, outputRelationPath);
|
||||||
|
save(
|
||||||
|
distinctRels
|
||||||
|
.union(mergeRels)
|
||||||
|
.filter("source != target AND dataInfo.deletedbyinference != true AND dataInfo.invisible != true"),
|
||||||
|
outputRelationPath,
|
||||||
|
SaveMode.Overwrite);
|
||||||
}
|
}
|
||||||
|
|
||||||
private FilterFunction<Relation> getRelationFilterFunction() {
|
static Dataset<Row> validIds(SparkSession spark, String graphBasePath) {
|
||||||
return r -> StringUtils.isNotBlank(r.getSource()) ||
|
StructType idsSchema = StructType
|
||||||
StringUtils.isNotBlank(r.getTarget()) ||
|
.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>");
|
||||||
StringUtils.isNotBlank(r.getRelType()) ||
|
|
||||||
StringUtils.isNotBlank(r.getSubRelType()) ||
|
Dataset<Row> allIds = spark.emptyDataset(RowEncoder.apply(idsSchema));
|
||||||
StringUtils.isNotBlank(r.getRelClass());
|
|
||||||
|
for (EntityType entityType : ModelSupport.entityTypes.keySet()) {
|
||||||
|
String entityPath = graphBasePath + '/' + entityType.name();
|
||||||
|
if (HdfsSupport.exists(entityPath, spark.sparkContext().hadoopConfiguration())) {
|
||||||
|
allIds = allIds.union(spark.read().schema(idsSchema).json(entityPath));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return allIds
|
||||||
|
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
|
||||||
|
.select("id")
|
||||||
|
.distinct();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,20 +0,0 @@
|
||||||
[
|
|
||||||
{
|
|
||||||
"paramName": "i",
|
|
||||||
"paramLongName": "graphBasePath",
|
|
||||||
"paramDescription": "the base path of raw graph",
|
|
||||||
"paramRequired": true
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName": "w",
|
|
||||||
"paramLongName": "inputPath",
|
|
||||||
"paramDescription": "the path to the input relation to cleanup",
|
|
||||||
"paramRequired": true
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName": "o",
|
|
||||||
"paramLongName": "outputPath",
|
|
||||||
"paramDescription": "the path of the output relation cleaned",
|
|
||||||
"paramRequired": true
|
|
||||||
}
|
|
||||||
]
|
|
|
@ -100,35 +100,9 @@
|
||||||
--conf spark.sql.shuffle.partitions=15000
|
--conf spark.sql.shuffle.partitions=15000
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||||
<arg>--graphOutputPath</arg><arg>${workingPath}/propagaterelation/</arg>
|
<arg>--graphOutputPath</arg><arg>${graphOutputPath}</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="CleanRelation"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="CleanRelation">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Clean Relations</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.dedup.SparkCleanRelation</class>
|
|
||||||
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemoryOverhead}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=15000
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
|
||||||
<arg>--inputPath</arg><arg>${workingPath}/propagaterelation/relation</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="group_entities"/>
|
<ok to="group_entities"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
|
@ -3,7 +3,6 @@ package eu.dnetlib.dhp.oa.dedup;
|
||||||
|
|
||||||
import static java.nio.file.Files.createTempDirectory;
|
import static java.nio.file.Files.createTempDirectory;
|
||||||
|
|
||||||
import static org.apache.spark.sql.functions.col;
|
|
||||||
import static org.apache.spark.sql.functions.count;
|
import static org.apache.spark.sql.functions.count;
|
||||||
import static org.junit.jupiter.api.Assertions.*;
|
import static org.junit.jupiter.api.Assertions.*;
|
||||||
import static org.mockito.Mockito.lenient;
|
import static org.mockito.Mockito.lenient;
|
||||||
|
@ -23,14 +22,13 @@ import java.util.stream.Collectors;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.apache.spark.api.java.function.FilterFunction;
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.api.java.function.PairFunction;
|
|
||||||
import org.apache.spark.sql.*;
|
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.Row;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.junit.jupiter.api.*;
|
import org.junit.jupiter.api.*;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
|
@ -46,8 +44,6 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
|
||||||
import scala.Tuple2;
|
|
||||||
|
|
||||||
@ExtendWith(MockitoExtension.class)
|
@ExtendWith(MockitoExtension.class)
|
||||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||||
|
@ -62,6 +58,8 @@ public class SparkDedupTest implements Serializable {
|
||||||
private static String testGraphBasePath;
|
private static String testGraphBasePath;
|
||||||
private static String testOutputBasePath;
|
private static String testOutputBasePath;
|
||||||
private static String testDedupGraphBasePath;
|
private static String testDedupGraphBasePath;
|
||||||
|
private static String testConsistencyGraphBasePath;
|
||||||
|
|
||||||
private static final String testActionSetId = "test-orchestrator";
|
private static final String testActionSetId = "test-orchestrator";
|
||||||
private static String whitelistPath;
|
private static String whitelistPath;
|
||||||
private static List<String> whiteList;
|
private static List<String> whiteList;
|
||||||
|
@ -75,6 +73,7 @@ public class SparkDedupTest implements Serializable {
|
||||||
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI())
|
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI())
|
||||||
.toFile()
|
.toFile()
|
||||||
.getAbsolutePath();
|
.getAbsolutePath();
|
||||||
|
|
||||||
testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
|
testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
|
||||||
.toAbsolutePath()
|
.toAbsolutePath()
|
||||||
.toString();
|
.toString();
|
||||||
|
@ -83,6 +82,10 @@ public class SparkDedupTest implements Serializable {
|
||||||
.toAbsolutePath()
|
.toAbsolutePath()
|
||||||
.toString();
|
.toString();
|
||||||
|
|
||||||
|
testConsistencyGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
|
||||||
|
.toAbsolutePath()
|
||||||
|
.toString();
|
||||||
|
|
||||||
whitelistPath = Paths
|
whitelistPath = Paths
|
||||||
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/whitelist.simrels.txt").toURI())
|
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/whitelist.simrels.txt").toURI())
|
||||||
.toFile()
|
.toFile()
|
||||||
|
@ -674,22 +677,45 @@ public class SparkDedupTest implements Serializable {
|
||||||
assertEquals(mergedOrp, deletedOrp);
|
assertEquals(mergedOrp, deletedOrp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Order(6)
|
||||||
|
void copyRelationsNoOpenorgsTest() throws Exception {
|
||||||
|
|
||||||
|
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
SparkCopyRelationsNoOpenorgs.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
|
||||||
|
parser
|
||||||
|
.parseArgument(
|
||||||
|
new String[] {
|
||||||
|
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
|
||||||
|
});
|
||||||
|
|
||||||
|
new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService);
|
||||||
|
|
||||||
|
final Dataset<Row> outputRels = spark.read().text(testDedupGraphBasePath + "/relation");
|
||||||
|
|
||||||
|
System.out.println(outputRels.count());
|
||||||
|
// assertEquals(2382, outputRels.count());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Order(7)
|
@Order(7)
|
||||||
void propagateRelationTest() throws Exception {
|
void propagateRelationTest() throws Exception {
|
||||||
|
|
||||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"));
|
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"));
|
||||||
String outputRelPath = testDedupGraphBasePath + "/propagaterelation";
|
|
||||||
parser
|
parser
|
||||||
.parseArgument(
|
.parseArgument(
|
||||||
new String[] {
|
new String[] {
|
||||||
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", outputRelPath
|
"-i", testDedupGraphBasePath, "-w", testOutputBasePath, "-o", testConsistencyGraphBasePath
|
||||||
});
|
});
|
||||||
|
|
||||||
new SparkPropagateRelation(parser, spark).run(isLookUpService);
|
new SparkPropagateRelation(parser, spark).run(isLookUpService);
|
||||||
|
|
||||||
long relations = jsc.textFile(outputRelPath + "/relation").count();
|
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
|
||||||
|
|
||||||
// assertEquals(4860, relations);
|
// assertEquals(4860, relations);
|
||||||
System.out.println("relations = " + relations);
|
System.out.println("relations = " + relations);
|
||||||
|
@ -699,95 +725,52 @@ public class SparkDedupTest implements Serializable {
|
||||||
.read()
|
.read()
|
||||||
.load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*"))
|
.load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*"))
|
||||||
.as(Encoders.bean(Relation.class));
|
.as(Encoders.bean(Relation.class));
|
||||||
final JavaPairRDD<String, String> mergedIds = mergeRels
|
|
||||||
.where("relClass == 'merges'")
|
|
||||||
.select(mergeRels.col("target"))
|
|
||||||
.distinct()
|
|
||||||
.toJavaRDD()
|
|
||||||
.mapToPair(
|
|
||||||
(PairFunction<Row, String, String>) r -> new Tuple2<String, String>(r.getString(0), "d"));
|
|
||||||
|
|
||||||
JavaRDD<String> toCheck = jsc
|
Dataset<Row> inputRels = spark
|
||||||
.textFile(outputRelPath + "/relation")
|
.read()
|
||||||
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json))
|
.json(testDedupGraphBasePath + "/relation");
|
||||||
.join(mergedIds)
|
|
||||||
.map(t -> t._2()._1())
|
|
||||||
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json))
|
|
||||||
.join(mergedIds)
|
|
||||||
.map(t -> t._2()._1());
|
|
||||||
|
|
||||||
long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
|
Dataset<Row> outputRels = spark
|
||||||
long updated = toCheck.count();
|
.read()
|
||||||
|
.json(testConsistencyGraphBasePath + "/relation");
|
||||||
|
|
||||||
assertEquals(updated, deletedbyinference);
|
assertEquals(
|
||||||
|
0, outputRels
|
||||||
|
.filter("dataInfo.deletedbyinference == true OR dataInfo.invisible == true")
|
||||||
|
.count());
|
||||||
|
|
||||||
|
assertEquals(
|
||||||
|
5, outputRels
|
||||||
|
.filter("relClass NOT IN ('merges', 'isMergedIn')")
|
||||||
|
.count());
|
||||||
|
|
||||||
|
assertEquals(5 + mergeRels.count(), outputRels.count());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Order(8)
|
@Order(8)
|
||||||
void testCleanBaseRelations() throws Exception {
|
void testCleanedPropagatedRelations() throws Exception {
|
||||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
|
||||||
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
|
|
||||||
|
|
||||||
// append dangling relations to be cleaned up
|
|
||||||
Dataset<Row> df_before = spark
|
Dataset<Row> df_before = spark
|
||||||
.read()
|
.read()
|
||||||
.schema(Encoders.bean(Relation.class).schema())
|
.schema(Encoders.bean(Relation.class).schema())
|
||||||
.json(testGraphBasePath + "/relation");
|
.json(testDedupGraphBasePath + "/relation");
|
||||||
Dataset<Row> df_input = df_before
|
|
||||||
.unionByName(df_before.drop("source").withColumn("source", functions.lit("n/a")))
|
|
||||||
.unionByName(df_before.drop("target").withColumn("target", functions.lit("n/a")));
|
|
||||||
df_input.write().mode(SaveMode.Overwrite).json(testOutputBasePath + "_tmp");
|
|
||||||
|
|
||||||
parser
|
|
||||||
.parseArgument(
|
|
||||||
new String[] {
|
|
||||||
"--graphBasePath", testGraphBasePath,
|
|
||||||
"--inputPath", testGraphBasePath + "/relation",
|
|
||||||
"--outputPath", testDedupGraphBasePath + "/relation"
|
|
||||||
});
|
|
||||||
|
|
||||||
new SparkCleanRelation(parser, spark).run(isLookUpService);
|
|
||||||
|
|
||||||
Dataset<Row> df_after = spark
|
Dataset<Row> df_after = spark
|
||||||
.read()
|
.read()
|
||||||
.schema(Encoders.bean(Relation.class).schema())
|
.schema(Encoders.bean(Relation.class).schema())
|
||||||
.json(testDedupGraphBasePath + "/relation");
|
.json(testConsistencyGraphBasePath + "/relation");
|
||||||
|
|
||||||
assertNotEquals(df_before.count(), df_input.count());
|
|
||||||
assertNotEquals(df_input.count(), df_after.count());
|
|
||||||
assertEquals(5, df_after.count());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
@Order(9)
|
|
||||||
void testCleanDedupedRelations() throws Exception {
|
|
||||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
|
||||||
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
|
|
||||||
|
|
||||||
String inputRelPath = testDedupGraphBasePath + "/propagaterelation/relation";
|
|
||||||
|
|
||||||
// append dangling relations to be cleaned up
|
|
||||||
Dataset<Row> df_before = spark.read().schema(Encoders.bean(Relation.class).schema()).json(inputRelPath);
|
|
||||||
|
|
||||||
df_before.filter(col("dataInfo.deletedbyinference").notEqual(true)).show(50, false);
|
|
||||||
|
|
||||||
parser
|
|
||||||
.parseArgument(
|
|
||||||
new String[] {
|
|
||||||
"--graphBasePath", testGraphBasePath,
|
|
||||||
"--inputPath", inputRelPath,
|
|
||||||
"--outputPath", testDedupGraphBasePath + "/relation"
|
|
||||||
});
|
|
||||||
|
|
||||||
new SparkCleanRelation(parser, spark).run(isLookUpService);
|
|
||||||
|
|
||||||
Dataset<Row> df_after = spark
|
|
||||||
.read()
|
|
||||||
.schema(Encoders.bean(Relation.class).schema())
|
|
||||||
.json(testDedupGraphBasePath + "/relation");
|
|
||||||
|
|
||||||
assertNotEquals(df_before.count(), df_after.count());
|
assertNotEquals(df_before.count(), df_after.count());
|
||||||
assertEquals(0, df_after.count());
|
|
||||||
|
assertEquals(
|
||||||
|
0, df_after
|
||||||
|
.filter("dataInfo.deletedbyinference == true OR dataInfo.invisible == true")
|
||||||
|
.count());
|
||||||
|
|
||||||
|
assertEquals(
|
||||||
|
5, df_after
|
||||||
|
.filter("relClass NOT IN ('merges', 'isMergedIn')")
|
||||||
|
.count());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -813,6 +796,7 @@ public class SparkDedupTest implements Serializable {
|
||||||
public static void finalCleanUp() throws IOException {
|
public static void finalCleanUp() throws IOException {
|
||||||
FileUtils.deleteDirectory(new File(testOutputBasePath));
|
FileUtils.deleteDirectory(new File(testOutputBasePath));
|
||||||
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
|
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
|
||||||
|
FileUtils.deleteDirectory(new File(testConsistencyGraphBasePath));
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isDeletedByInference(String s) {
|
public boolean isDeletedByInference(String s) {
|
||||||
|
|
|
@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.dedup;
|
||||||
|
|
||||||
import static java.nio.file.Files.createTempDirectory;
|
import static java.nio.file.Files.createTempDirectory;
|
||||||
|
|
||||||
|
import static org.apache.spark.sql.functions.col;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.mockito.Mockito.lenient;
|
import static org.mockito.Mockito.lenient;
|
||||||
|
|
||||||
|
@ -15,10 +16,6 @@ import java.nio.file.Paths;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
|
||||||
import org.apache.spark.api.java.function.PairFunction;
|
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
|
@ -33,8 +30,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
|
||||||
import scala.Tuple2;
|
|
||||||
|
|
||||||
@ExtendWith(MockitoExtension.class)
|
@ExtendWith(MockitoExtension.class)
|
||||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||||
|
@ -44,11 +39,11 @@ public class SparkOpenorgsProvisionTest implements Serializable {
|
||||||
ISLookUpService isLookUpService;
|
ISLookUpService isLookUpService;
|
||||||
|
|
||||||
private static SparkSession spark;
|
private static SparkSession spark;
|
||||||
private static JavaSparkContext jsc;
|
|
||||||
|
|
||||||
private static String testGraphBasePath;
|
private static String testGraphBasePath;
|
||||||
private static String testOutputBasePath;
|
private static String testOutputBasePath;
|
||||||
private static String testDedupGraphBasePath;
|
private static String testDedupGraphBasePath;
|
||||||
|
private static String testConsistencyGraphBasePath;
|
||||||
private static final String testActionSetId = "test-orchestrator";
|
private static final String testActionSetId = "test-orchestrator";
|
||||||
|
|
||||||
@BeforeAll
|
@BeforeAll
|
||||||
|
@ -64,6 +59,9 @@ public class SparkOpenorgsProvisionTest implements Serializable {
|
||||||
testDedupGraphBasePath = createTempDirectory(SparkOpenorgsProvisionTest.class.getSimpleName() + "-")
|
testDedupGraphBasePath = createTempDirectory(SparkOpenorgsProvisionTest.class.getSimpleName() + "-")
|
||||||
.toAbsolutePath()
|
.toAbsolutePath()
|
||||||
.toString();
|
.toString();
|
||||||
|
testConsistencyGraphBasePath = createTempDirectory(SparkOpenorgsProvisionTest.class.getSimpleName() + "-")
|
||||||
|
.toAbsolutePath()
|
||||||
|
.toString();
|
||||||
|
|
||||||
FileUtils.deleteDirectory(new File(testOutputBasePath));
|
FileUtils.deleteDirectory(new File(testOutputBasePath));
|
||||||
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
|
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
|
||||||
|
@ -76,8 +74,13 @@ public class SparkOpenorgsProvisionTest implements Serializable {
|
||||||
.master("local[*]")
|
.master("local[*]")
|
||||||
.config(conf)
|
.config(conf)
|
||||||
.getOrCreate();
|
.getOrCreate();
|
||||||
|
}
|
||||||
|
|
||||||
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
@AfterAll
|
||||||
|
public static void finalCleanUp() throws IOException {
|
||||||
|
FileUtils.deleteDirectory(new File(testOutputBasePath));
|
||||||
|
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
|
||||||
|
FileUtils.deleteDirectory(new File(testConsistencyGraphBasePath));
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
|
@ -186,26 +189,21 @@ public class SparkOpenorgsProvisionTest implements Serializable {
|
||||||
|
|
||||||
new SparkUpdateEntity(parser, spark).run(isLookUpService);
|
new SparkUpdateEntity(parser, spark).run(isLookUpService);
|
||||||
|
|
||||||
long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count();
|
Dataset<Row> organizations = spark.read().json(testDedupGraphBasePath + "/organization");
|
||||||
|
|
||||||
long mergedOrgs = spark
|
Dataset<Row> mergedOrgs = spark
|
||||||
.read()
|
.read()
|
||||||
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
|
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
|
||||||
.as(Encoders.bean(Relation.class))
|
|
||||||
.where("relClass=='merges'")
|
.where("relClass=='merges'")
|
||||||
.javaRDD()
|
.select("target")
|
||||||
.map(Relation::getTarget)
|
.distinct();
|
||||||
.distinct()
|
|
||||||
.count();
|
|
||||||
|
|
||||||
assertEquals(80, organizations);
|
assertEquals(80, organizations.count());
|
||||||
|
|
||||||
long deletedOrgs = jsc
|
Dataset<Row> deletedOrgs = organizations
|
||||||
.textFile(testDedupGraphBasePath + "/organization")
|
.filter("dataInfo.deletedbyinference = TRUE");
|
||||||
.filter(this::isDeletedByInference)
|
|
||||||
.count();
|
|
||||||
|
|
||||||
assertEquals(mergedOrgs, deletedOrgs);
|
assertEquals(mergedOrgs.count(), deletedOrgs.count());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -226,10 +224,9 @@ public class SparkOpenorgsProvisionTest implements Serializable {
|
||||||
|
|
||||||
new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService);
|
new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService);
|
||||||
|
|
||||||
final JavaRDD<String> rels = jsc.textFile(testDedupGraphBasePath + "/relation");
|
final Dataset<Row> outputRels = spark.read().text(testDedupGraphBasePath + "/relation");
|
||||||
|
|
||||||
assertEquals(2382, rels.count());
|
|
||||||
|
|
||||||
|
assertEquals(2382, outputRels.count());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -244,51 +241,41 @@ public class SparkOpenorgsProvisionTest implements Serializable {
|
||||||
parser
|
parser
|
||||||
.parseArgument(
|
.parseArgument(
|
||||||
new String[] {
|
new String[] {
|
||||||
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
|
"-i", testDedupGraphBasePath, "-w", testOutputBasePath, "-o", testConsistencyGraphBasePath
|
||||||
});
|
});
|
||||||
|
|
||||||
new SparkPropagateRelation(parser, spark).run(isLookUpService);
|
new SparkPropagateRelation(parser, spark).run(isLookUpService);
|
||||||
|
|
||||||
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
|
|
||||||
|
|
||||||
assertEquals(4896, relations);
|
|
||||||
|
|
||||||
// check deletedbyinference
|
|
||||||
final Dataset<Relation> mergeRels = spark
|
final Dataset<Relation> mergeRels = spark
|
||||||
.read()
|
.read()
|
||||||
.load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*"))
|
.load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*"))
|
||||||
.as(Encoders.bean(Relation.class));
|
.as(Encoders.bean(Relation.class));
|
||||||
final JavaPairRDD<String, String> mergedIds = mergeRels
|
|
||||||
|
Dataset<Row> inputRels = spark
|
||||||
|
.read()
|
||||||
|
.json(testDedupGraphBasePath + "/relation");
|
||||||
|
|
||||||
|
Dataset<Row> outputRels = spark
|
||||||
|
.read()
|
||||||
|
.json(testConsistencyGraphBasePath + "/relation");
|
||||||
|
|
||||||
|
final Dataset<Row> mergedIds = mergeRels
|
||||||
.where("relClass == 'merges'")
|
.where("relClass == 'merges'")
|
||||||
.select(mergeRels.col("target"))
|
.select(col("target").as("id"))
|
||||||
.distinct()
|
.distinct();
|
||||||
.toJavaRDD()
|
|
||||||
.mapToPair(
|
|
||||||
(PairFunction<Row, String, String>) r -> new Tuple2<String, String>(r.getString(0), "d"));
|
|
||||||
|
|
||||||
JavaRDD<String> toCheck = jsc
|
Dataset<Row> toUpdateRels = inputRels
|
||||||
.textFile(testDedupGraphBasePath + "/relation")
|
.as("rel")
|
||||||
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json))
|
.join(mergedIds.as("s"), col("rel.source").equalTo(col("s.id")), "left_outer")
|
||||||
.join(mergedIds)
|
.join(mergedIds.as("t"), col("rel.target").equalTo(col("t.id")), "left_outer")
|
||||||
.map(t -> t._2()._1())
|
.filter("s.id IS NOT NULL OR t.id IS NOT NULL")
|
||||||
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json))
|
.distinct();
|
||||||
.join(mergedIds)
|
|
||||||
.map(t -> t._2()._1());
|
|
||||||
|
|
||||||
long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
|
Dataset<Row> updatedRels = inputRels
|
||||||
long updated = toCheck.count();
|
.select("source", "target", "relClass")
|
||||||
|
.except(outputRels.select("source", "target", "relClass"));
|
||||||
|
|
||||||
assertEquals(updated, deletedbyinference);
|
assertEquals(toUpdateRels.count(), updatedRels.count());
|
||||||
|
assertEquals(140, outputRels.count());
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterAll
|
|
||||||
public static void finalCleanUp() throws IOException {
|
|
||||||
FileUtils.deleteDirectory(new File(testOutputBasePath));
|
|
||||||
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isDeletedByInference(String s) {
|
|
||||||
return s.contains("\"deletedbyinference\":true");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue