forked from D-Net/dnet-hadoop
bug fix in propagation of relations
parent 55964cbd81
commit 3c12eeadce
@@ -73,6 +73,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
             .load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
             .as(Encoders.bean(Relation.class));
 
+        //<mergedObjectID, dedupID>
         Dataset<Tuple2<String, String>> mergedIds = mergeRels
             .where(col("relClass").equalTo(ModelConstants.MERGES))
             .select(col("source"), col("target"))
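For orientation: the added comment documents that mergedIds pairs each merged record with its dedup root. A minimal plain-Java sketch of building such a <mergedObjectID, dedupID> lookup, assuming (as the comment implies) that a MERGES relation points from the dedup root (source) to the merged record (target); the class and field names below are illustrative and not part of the patch.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-in for a MERGES relation: source = dedup root ID, target = merged object ID.
class MergeRel {
    final String source;
    final String target;

    MergeRel(String source, String target) {
        this.source = source;
        this.target = target;
    }
}

public class MergedIdsSketch {

    // Build the <mergedObjectID, dedupID> lookup described by the added comment.
    static Map<String, String> mergedIds(List<MergeRel> mergeRels) {
        Map<String, String> map = new HashMap<>();
        for (MergeRel r : mergeRels) {
            map.put(r.target, r.source); // key: merged object, value: dedup root
        }
        return map;
    }

    public static void main(String[] args) {
        Map<String, String> ids = mergedIds(List.of(new MergeRel("dedup_A", "obj1")));
        System.out.println(ids); // {obj1=dedup_A}
    }
}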
@@ -115,25 +116,31 @@ public class SparkPropagateRelation extends AbstractSparkAction {
             .map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class));
     }
 
+    //redirect the relations to the dedupID
     private static Dataset<Relation> createNewRels(
-        Dataset<Relation> rels,
-        Dataset<Tuple2<String, String>> mergedIds,
+        Dataset<Relation> rels, //all the relations to be redirected
+        Dataset<Tuple2<String, String>> mergedIds, //merge rels: <mergedObjectID, dedupID>
         MapFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>, Relation> mapRel) {
 
+        //<sourceID, relation, targetID>
         Dataset<Tuple3<String, Relation, String>> mapped = rels
             .map(
                 (MapFunction<Relation, Tuple3<String, Relation, String>>) r -> new Tuple3<>(getId(r, FieldType.SOURCE),
                     r, getId(r, FieldType.TARGET)),
                 Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class), Encoders.STRING()));
 
+        //< <sourceID, relation, target>, <sourceID, dedupID> >
         Dataset<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>> relSource = mapped
             .joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer");
 
-        return relSource
-            .joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_2")), "left_outer")
+        //< <<sourceID, relation, targetID>, <sourceID, dedupID>>, <targetID, dedupID> >
+        Dataset<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>> relSourceTarget = relSource
+            .joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_1")), "left_outer");
+
+        return relSourceTarget
             .filter(
-                (FilterFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>>) r -> r
-                    ._2() != null || r._1() != null)
+                (FilterFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>>)
+                    r -> r._1()._1() != null || r._2() != null)
             .map(mapRel, Encoders.bean(Relation.class))
             .distinct();
     }
 
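Reading the hunk as a whole: the target-side join now matches relation targets against the mergedObjectID column (`_1`) of mergedIds rather than the dedupID column (`_2`), and the filter condition is rewritten accordingly. A hedged, non-Spark sketch of the redirection this method is meant to perform, under one plausible reading (keep relations that touch at least one merged record and point those endpoints at the dedup root); the Rel class and helper names are illustrative, and the caller-supplied mapRel is not reproduced here.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative stand-in for the Relation bean, reduced to its two endpoints.
class Rel {
    final String source;
    final String target;

    Rel(String source, String target) {
        this.source = source;
        this.target = target;
    }

    @Override
    public String toString() {
        return source + " -> " + target;
    }
}

public class CreateNewRelsSketch {

    // Keep only relations with at least one merged endpoint and redirect that
    // endpoint to its dedup root, mirroring the two left-outer joins plus filter.
    static List<Rel> createNewRels(List<Rel> rels, Map<String, String> mergedToDedup) {
        List<Rel> out = new ArrayList<>();
        for (Rel r : rels) {
            boolean sourceMerged = mergedToDedup.containsKey(r.source);
            boolean targetMerged = mergedToDedup.containsKey(r.target);
            if (sourceMerged || targetMerged) {
                out.add(new Rel(
                    mergedToDedup.getOrDefault(r.source, r.source),
                    mergedToDedup.getOrDefault(r.target, r.target)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> mergedIds = Map.of("obj1", "dedup_A"); // <mergedObjectID, dedupID>
        List<Rel> rels = List.of(new Rel("obj1", "obj2"), new Rel("obj3", "obj4"));
        // Only the first relation touches a merged record, so only it is redirected.
        System.out.println(createNewRels(rels, mergedIds)); // [dedup_A -> obj2]
    }
}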
@@ -536,7 +536,7 @@ public class SparkDedupTest implements Serializable {
 
         long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
 
-        assertEquals(4718, relations);
+        assertEquals(4862, relations);
 
         // check deletedbyinference
         final Dataset<Relation> mergeRels = spark
@@ -57,9 +57,6 @@ public class SparkOpenorgsProvisionTest implements Serializable {
     private static String testDedupGraphBasePath;
     private static final String testActionSetId = "test-orchestrator";
 
-    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
-        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
     @BeforeAll
     public static void cleanUp() throws IOException, URISyntaxException {
 
@@ -259,7 +256,7 @@ public class SparkOpenorgsProvisionTest implements Serializable {
 
         long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
 
-        assertEquals(2520, relations);
+        assertEquals(4894, relations);
 
         // check deletedbyinference
         final Dataset<Relation> mergeRels = spark