2018-10-18 15:30:51 +02:00
|
|
|
package eu.dnetlib.graph
|
2018-10-18 10:12:44 +02:00
|
|
|
|
|
|
|
import eu.dnetlib.ConnectedComponent
|
|
|
|
import eu.dnetlib.pace.model.MapDocument
|
2018-10-11 15:19:20 +02:00
|
|
|
import org.apache.spark.graphx._
|
|
|
|
import org.apache.spark.rdd.RDD
|
2018-10-18 10:12:44 +02:00
|
|
|
|
2018-10-18 15:30:51 +02:00
|
|
|
import scala.collection.JavaConversions;
|
2018-10-11 15:19:20 +02:00
|
|
|
|
|
|
|
/**
  * GraphX-based helpers that group documents into connected components.
  */
object GraphProcessor {

  /**
    * Groups the input documents by the connected component their vertex belongs to.
    *
    * A graph is built from `vertexes` and `edges`, GraphX `connectedComponents` is run on it,
    * and every input document is then keyed by the component id computed for its vertex.
    * Documents sharing a key are wrapped together into one [[ConnectedComponent]].
    *
    * @param vertexes      (vertex id, document) pairs used as the graph vertices
    * @param edges         edges connecting vertex ids
    * @param maxIterations iteration bound handed to GraphX `connectedComponents`
    * @return one [[ConnectedComponent]] per distinct component key
    */
  def findCCs(vertexes: RDD[(VertexId, MapDocument)], edges: RDD[Edge[String]], maxIterations: Int): RDD[ConnectedComponent] = {
    // TODO remember to remove partitionBy
    val graph: Graph[MapDocument, String] =
      Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut)

    // (vertex id, component id) pairs produced by GraphX
    val componentIds = graph.connectedComponents(maxIterations).vertices

    // Key every document by its component id; when the left outer join finds no
    // component for a vertex, the vertex id itself is used as the key.
    val documentsByComponent = vertexes.leftOuterJoin(componentIds).map {
      case (vertexId, (document, componentId)) =>
        (componentId.getOrElse(vertexId), document)
    }

    documentsByComponent
      .groupByKey()
      .map[ConnectedComponent](group => asConnectedComponent(group))
  }

  /**
    * Wraps the documents of one group into a [[ConnectedComponent]].
    *
    * @param group (component key, documents) pair; only the documents are used,
    *              and duplicates are collapsed by converting them to a set
    * @return a [[ConnectedComponent]] built from the Java view of that set
    */
  def asConnectedComponent(group: (VertexId, Iterable[MapDocument])): ConnectedComponent = {
    // Explicit decorator instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._

    val (_, documents) = group
    new ConnectedComponent(documents.toSet[MapDocument].asJava)
  }
}
|