Compare commits

...

2 Commits

3 changed files with 15 additions and 11 deletions

View File

@ -73,6 +73,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*")) .load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
.as(Encoders.bean(Relation.class)); .as(Encoders.bean(Relation.class));
//<mergedObjectID, dedupID>
Dataset<Tuple2<String, String>> mergedIds = mergeRels Dataset<Tuple2<String, String>> mergedIds = mergeRels
.where(col("relClass").equalTo(ModelConstants.MERGES)) .where(col("relClass").equalTo(ModelConstants.MERGES))
.select(col("source"), col("target")) .select(col("source"), col("target"))
@ -115,25 +116,31 @@ public class SparkPropagateRelation extends AbstractSparkAction {
.map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class)); .map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class));
} }
//redirect the relations to the dedupID
private static Dataset<Relation> createNewRels( private static Dataset<Relation> createNewRels(
Dataset<Relation> rels, Dataset<Relation> rels, //all the relations to be redirected
Dataset<Tuple2<String, String>> mergedIds, Dataset<Tuple2<String, String>> mergedIds, //merge rels: <mergedObjectID, dedupID>
MapFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>, Relation> mapRel) { MapFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>, Relation> mapRel) {
//<sourceID, relation, targetID>
Dataset<Tuple3<String, Relation, String>> mapped = rels Dataset<Tuple3<String, Relation, String>> mapped = rels
.map( .map(
(MapFunction<Relation, Tuple3<String, Relation, String>>) r -> new Tuple3<>(getId(r, FieldType.SOURCE), (MapFunction<Relation, Tuple3<String, Relation, String>>) r -> new Tuple3<>(getId(r, FieldType.SOURCE),
r, getId(r, FieldType.TARGET)), r, getId(r, FieldType.TARGET)),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class), Encoders.STRING())); Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class), Encoders.STRING()));
//< <sourceID, relation, target>, <sourceID, dedupID> >
Dataset<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>> relSource = mapped Dataset<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>> relSource = mapped
.joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer"); .joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer");
return relSource //< <<sourceID, relation, targetID>, <sourceID, dedupID>>, <targetID, dedupID> >
.joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_2")), "left_outer") Dataset<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>> relSourceTarget = relSource
.joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_1")), "left_outer");
return relSourceTarget
.filter( .filter(
(FilterFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>>) r -> r (FilterFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>>)
._2() != null || r._1() != null) r -> r._1()._1() != null || r._2() != null)
.map(mapRel, Encoders.bean(Relation.class)) .map(mapRel, Encoders.bean(Relation.class))
.distinct(); .distinct();
} }

View File

@ -536,7 +536,7 @@ public class SparkDedupTest implements Serializable {
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
assertEquals(4718, relations); assertEquals(4862, relations);
// check deletedbyinference // check deletedbyinference
final Dataset<Relation> mergeRels = spark final Dataset<Relation> mergeRels = spark

View File

@ -57,9 +57,6 @@ public class SparkOpenorgsProvisionTest implements Serializable {
private static String testDedupGraphBasePath; private static String testDedupGraphBasePath;
private static final String testActionSetId = "test-orchestrator"; private static final String testActionSetId = "test-orchestrator";
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@BeforeAll @BeforeAll
public static void cleanUp() throws IOException, URISyntaxException { public static void cleanUp() throws IOException, URISyntaxException {
@ -259,7 +256,7 @@ public class SparkOpenorgsProvisionTest implements Serializable {
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
assertEquals(2520, relations); assertEquals(4894, relations);
// check deletedbyinference // check deletedbyinference
final Dataset<Relation> mergeRels = spark final Dataset<Relation> mergeRels = spark