forked from antonis.lempesis/dnet-hadoop
minor changes
This commit is contained in:
parent
9bdcb02179
commit
f61adfc2bb
|
@ -52,7 +52,7 @@ public class SparkCreateConnectedComponent {
|
||||||
|
|
||||||
final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(targetPath,entity)).as(Encoders.bean(Relation.class));
|
final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(targetPath,entity)).as(Encoders.bean(Relation.class));
|
||||||
final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd();
|
final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd();
|
||||||
final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, 20).toJavaRDD();
|
final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD();
|
||||||
final Dataset<Relation> mergeRelation = spark.createDataset(cc.filter(k->k.getDocIds().size()>1).flatMap((FlatMapFunction<ConnectedComponent, Relation>) c ->
|
final Dataset<Relation> mergeRelation = spark.createDataset(cc.filter(k->k.getDocIds().size()>1).flatMap((FlatMapFunction<ConnectedComponent, Relation>) c ->
|
||||||
c.getDocIds()
|
c.getDocIds()
|
||||||
.stream()
|
.stream()
|
||||||
|
|
Loading…
Reference in New Issue