Compare commits

...

2 Commits

3 changed files with 15 additions and 11 deletions

View File

@@ -73,6 +73,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
.as(Encoders.bean(Relation.class));
//<mergedObjectID, dedupID>
Dataset<Tuple2<String, String>> mergedIds = mergeRels
.where(col("relClass").equalTo(ModelConstants.MERGES))
.select(col("source"), col("target"))
@@ -115,25 +116,31 @@ public class SparkPropagateRelation extends AbstractSparkAction {
.map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class));
}
//redirect the relations to the dedupID
private static Dataset<Relation> createNewRels(
Dataset<Relation> rels,
Dataset<Tuple2<String, String>> mergedIds,
Dataset<Relation> rels, //all the relations to be redirected
Dataset<Tuple2<String, String>> mergedIds, //merge rels: <mergedObjectID, dedupID>
MapFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>, Relation> mapRel) {
//<sourceID, relation, targetID>
Dataset<Tuple3<String, Relation, String>> mapped = rels
.map(
(MapFunction<Relation, Tuple3<String, Relation, String>>) r -> new Tuple3<>(getId(r, FieldType.SOURCE),
r, getId(r, FieldType.TARGET)),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class), Encoders.STRING()));
//< <sourceID, relation, targetID>, <sourceID, dedupID> >
Dataset<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>> relSource = mapped
.joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer");
return relSource
.joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_2")), "left_outer")
//< <<sourceID, relation, targetID>, <sourceID, dedupID>>, <targetID, dedupID> >
Dataset<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>> relSourceTarget = relSource
.joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_1")), "left_outer");
return relSourceTarget
.filter(
(FilterFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>>) r -> r
._2() != null || r._1() != null)
(FilterFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>>)
r -> r._1()._1() != null || r._2() != null)
.map(mapRel, Encoders.bean(Relation.class))
.distinct();
}

View File

@@ -536,7 +536,7 @@ public class SparkDedupTest implements Serializable {
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
assertEquals(4718, relations);
assertEquals(4862, relations);
// check deletedbyinference
final Dataset<Relation> mergeRels = spark

View File

@@ -57,9 +57,6 @@ public class SparkOpenorgsProvisionTest implements Serializable {
private static String testDedupGraphBasePath;
private static final String testActionSetId = "test-orchestrator";
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@BeforeAll
public static void cleanUp() throws IOException, URISyntaxException {
@@ -259,7 +256,7 @@ public class SparkOpenorgsProvisionTest implements Serializable {
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
assertEquals(2520, relations);
assertEquals(4894, relations);
// check deletedbyinference
final Dataset<Relation> mergeRels = spark