implemented cut of connected component

This commit is contained in:
Sandro La Bruzzo 2020-07-13 14:18:42 +02:00
parent e2093e42db
commit d561b2dd21
4 changed files with 37 additions and 6 deletions

View File

@ -76,6 +76,19 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final String workingPath = parser.get("workingPath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
// Optional cut-off threshold read from the "cutConnectedComponent" argument.
// `cut` stays 0 when the argument is absent or is not a valid integer.
int cut = 0;
final String cutParam = parser.get("cutConnectedComponent");
try {
    cut = Integer.parseInt(cutParam);
} catch (NumberFormatException e) {
    // Report the value that actually failed to parse, and keep the cause in the log.
    log.error("unable to parse cut-off threshold '{}'", cutParam, e);
}
log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
@ -112,7 +125,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final Dataset<Relation> mergeRels = spark
.createDataset(
GraphProcessor
.findCCs(vertexes.rdd(), edgeRdd, maxIterations)
.findCCs(vertexes.rdd(), edgeRdd, maxIterations, cut)
.toJavaRDD()
.filter(k -> k.getDocIds().size() > 1)
.flatMap(cc -> ccToMergeRel(cc, dedupConf))
@ -120,6 +133,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
Encoders.bean(Relation.class));
mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath);
}
}

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.dedup.graph;
import java.io.IOException;
import java.io.Serializable;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.annotate.JsonIgnore;
@ -21,9 +22,14 @@ public class ConnectedComponent implements Serializable {
public ConnectedComponent() {
}
public ConnectedComponent(Set<String> docIds) {
public ConnectedComponent(Set<String> docIds, final int cut) {
this.docIds = docIds;
createID();
if (cut > 0 && docIds.size() > cut){
docIds = docIds.stream().filter(s -> !ccId.equalsIgnoreCase(s)).limit(cut -1).collect(Collectors.toSet());
docIds.add(ccId);
}
}
public String createID() {
@ -41,6 +47,7 @@ public class ConnectedComponent implements Serializable {
public String getMin() {
final StringBuilder min = new StringBuilder();
docIds
.forEach(
i -> {

View File

@ -7,7 +7,7 @@ import scala.collection.JavaConversions;
object GraphProcessor {
def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int): RDD[ConnectedComponent] = {
def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = {
val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
val cc = graph.connectedComponents(maxIterations).vertices
@ -22,15 +22,15 @@ object GraphProcessor {
}
}
val connectedComponents = joinResult.groupByKey()
.map[ConnectedComponent](cc => asConnectedComponent(cc))
.map[ConnectedComponent](cc => asConnectedComponent(cc, cut))
connectedComponents
}
def asConnectedComponent(group: (VertexId, Iterable[String])): ConnectedComponent = {
def asConnectedComponent(group: (VertexId, Iterable[String]), cut:Int): ConnectedComponent = {
val docs = group._2.toSet[String]
val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs));
val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), cut);
connectedComponent
}

View File

@ -17,6 +17,13 @@
"paramDescription": "the url for the lookup service",
"paramRequired": true
},
{
"paramName": "cc",
"paramLongName": "cutConnectedComponent",
"paramDescription": "the number of maximum elements that belongs to a connected components",
"paramRequired": false
}
,
{
"paramName": "w",
"paramLongName": "workingPath",