implementation of the deletedbyinference test in propagating relations
This commit is contained in:
parent
cb0952428e
commit
418cf94642
|
@ -83,12 +83,14 @@ public class SparkPropagateRelation extends AbstractSparkAction {
|
||||||
.textFile(relationPath)
|
.textFile(relationPath)
|
||||||
.map(patchRelFn(), Encoders.bean(Relation.class));
|
.map(patchRelFn(), Encoders.bean(Relation.class));
|
||||||
|
|
||||||
|
//change raw ids with dedup ids
|
||||||
Dataset<Relation> newRels =
|
Dataset<Relation> newRels =
|
||||||
processDataset(
|
processDataset(
|
||||||
processDataset(rels, mergedIds, FieldType.SOURCE, getFixRelFn(FieldType.SOURCE)),
|
processDataset(rels, mergedIds, FieldType.SOURCE, getFixRelFn(FieldType.SOURCE)),
|
||||||
mergedIds, FieldType.TARGET, getFixRelFn(FieldType.TARGET))
|
mergedIds, FieldType.TARGET, getFixRelFn(FieldType.TARGET))
|
||||||
.filter(SparkPropagateRelation::containsDedup);
|
.filter(SparkPropagateRelation::containsDedup);
|
||||||
|
|
||||||
|
//update deletedbyinference
|
||||||
Dataset<Relation> updated = processDataset(
|
Dataset<Relation> updated = processDataset(
|
||||||
processDataset(rels, mergedIds, FieldType.SOURCE, getDeletedFn()),
|
processDataset(rels, mergedIds, FieldType.SOURCE, getDeletedFn()),
|
||||||
mergedIds, FieldType.TARGET, getDeletedFn());
|
mergedIds, FieldType.TARGET, getDeletedFn());
|
||||||
|
|
|
@ -4,17 +4,24 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.function.PairFunction;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.junit.jupiter.api.*;
|
import org.junit.jupiter.api.*;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -211,6 +218,27 @@ public class SparkDedupTest implements Serializable {
|
||||||
|
|
||||||
assertEquals(826, relations);
|
assertEquals(826, relations);
|
||||||
|
|
||||||
|
//check deletedbyinference
|
||||||
|
final Dataset<Relation> mergeRels = spark.read().load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*")).as(Encoders.bean(Relation.class));
|
||||||
|
final JavaPairRDD<String, String> mergedIds = mergeRels
|
||||||
|
.where("relClass == 'merges'")
|
||||||
|
.select(mergeRels.col("target"))
|
||||||
|
.distinct()
|
||||||
|
.toJavaRDD()
|
||||||
|
.mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<String, String>(r.getString(0), "d"));
|
||||||
|
|
||||||
|
JavaRDD<String> toCheck = jsc.textFile(testDedupGraphBasePath + "/relation")
|
||||||
|
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json))
|
||||||
|
.join(mergedIds)
|
||||||
|
.map(t -> t._2()._1())
|
||||||
|
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json))
|
||||||
|
.join(mergedIds)
|
||||||
|
.map(t -> t._2()._1());
|
||||||
|
|
||||||
|
long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
|
||||||
|
long updated = toCheck.count();
|
||||||
|
|
||||||
|
assertEquals(updated, deletedbyinference);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterAll
|
@AfterAll
|
||||||
|
|
Loading…
Reference in New Issue