update of the spark test

Michele De Bonis 2018-10-18 10:12:44 +02:00
parent 951313eeb1
commit 1f0eeaf7ab
6 changed files with 127 additions and 46 deletions

View File

@ -41,6 +41,7 @@
<!--</exclusions>-->
<!--</dependency>-->
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-openaire-data-protos</artifactId>

View File

@ -0,0 +1,73 @@
package eu.dnetlib;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.pace.model.MapDocument;

import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ConnectedComponent implements Serializable {

    private Set<MapDocument> docs;
    private String id;

    public ConnectedComponent() {
    }

    public ConnectedComponent(String id, Set<MapDocument> docs) {
        this.id = id;
        this.docs = docs;
    }

    public Set<MapDocument> getDocs() {
        return docs;
    }

    public void setDocs(Set<MapDocument> docs) {
        this.docs = docs;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public void initializeID() {
        if (docs.size() > 1) {
            String ccID = getMin(docs.stream().map(doc -> doc.getIdentifier()).collect(Collectors.toList()));
            String prefix = ccID.split("\\|")[0];
            String id = ccID.split("::")[1];
            this.id = prefix + "|dedup_______::" + id;
        } else {
            this.id = docs.iterator().next().getIdentifier();
        }
    }

    public String getMin(List<String> ids) {
        String min = ids.get(0);
        for (String id : ids) {
            if (min.compareTo(id) > 0) {
                min = id;
            }
        }
        return min;
    }

    @Override
    public String toString() {
        ObjectMapper mapper = new ObjectMapper();
        try {
            return mapper.writeValueAsString(this);
        } catch (IOException e) {
            return null;
        }
    }
}
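For orientation, initializeID() derives the component id from the lexicographically smallest member identifier: the prefix before the first '|', the suffix after '::', joined through the dedup_______ namespace. A minimal, standalone sketch of that derivation, using invented OpenAIRE-style identifiers rather than real MapDocuments:

    import java.util.Arrays;
    import java.util.List;

    // Illustration only: mirrors the getMin + split logic of ConnectedComponent.initializeID().
    public class DedupIdSketch {
        public static void main(String[] args) {
            // hypothetical member identifiers of one connected component
            List<String> ids = Arrays.asList("20|openorgs____::b2c3", "20|corda_______::a1f4");

            String min = ids.get(0);
            for (String id : ids) {
                if (min.compareTo(id) > 0) {
                    min = id;
                }
            }
            // min is "20|corda_______::a1f4": prefix before '|', suffix after '::'
            String prefix = min.split("\\|")[0];
            String suffix = min.split("::")[1];
            System.out.println(prefix + "|dedup_______::" + suffix); // 20|dedup_______::a1f4
        }
    }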

View File

@ -1,5 +1,6 @@
package eu.dnetlib;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import eu.dnetlib.graph.GraphProcessor;
import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
@ -18,9 +19,11 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;
import scala.collection.Iterable;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;
@ -30,12 +33,12 @@ public class SparkTest {
    private static final Log log = LogFactory.getLog(SparkTest.class);
    public static void main(String[] args) {
-       final JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Hello World").setMaster("local[*]"));
-       final JavaRDD<String> dataRDD = context.textFile("file:///Users/sandro/Downloads/software.json");
+       final JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Deduplication").setMaster("local[*]"));
+       final JavaRDD<String> dataRDD = context.textFile("file:///Users/miconis/Downloads/dumps/organizations_sample.json");
        counter = new SparkCounter(context);
-       final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/software.pace.conf"));
+       final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
        BlockProcessor.constructAccumulator(config);
        BlockProcessor.accumulators.forEach(acc -> {
@ -45,61 +48,47 @@ public class SparkTest {
        });
+       //create vertexes of the graph: <ID, MapDocument>
        JavaPairRDD<String, MapDocument> mapDocs = dataRDD.mapToPair(it -> {
            MapDocument mapDocument = PaceUtils.asMapDocument(config, it);
            return new Tuple2<>(mapDocument.getIdentifier(), mapDocument);
        });
+       RDD<Tuple2<Object, MapDocument>> vertexes = mapDocs.mapToPair(t -> new Tuple2<Object, MapDocument>((long) t._1().hashCode(), t._2())).rdd();
-       final JavaPairRDD<String, String> relationRDD = mapDocs.reduceByKey((a, b) -> a)
+       //create relations between documents
+       final JavaPairRDD<String, String> relationRDD = mapDocs.reduceByKey((a, b) -> a) //the reduce just makes sure there are no documents with the same id
                //from <id, doc> to List<groupkey,doc>
                .flatMapToPair(a -> {
                    final MapDocument currentDocument = a._2();
                    return getGroupingKeys(config, currentDocument).stream()
                            .map(it -> new Tuple2<>(it, currentDocument)).collect(Collectors.toList()).iterator();
-               }).groupByKey().flatMapToPair(it -> {
+               }).groupByKey() //group documents by key
+               //create relations by comparing only elements in the same group
+               .flatMapToPair(it -> {
                    final SparkReporter reporter = new SparkReporter(counter);
                    new BlockProcessor(config).process(it._1(), it._2(), reporter);
                    return reporter.getReport().iterator();
                });
-       RDD<Tuple2<Object, String>> vertexes = relationRDD.groupByKey().map(it -> {
+       final RDD<Edge<String>> edgeRdd = relationRDD.map(it -> new Edge<>(it._1().hashCode(),it._2().hashCode(), "similarTo")).rdd();
-           Long id = (long) it._1().hashCode();
-           return new Tuple2<Object, String>(id, it._1());
+       JavaRDD<ConnectedComponent> ccs = GraphProcessor.findCCs(vertexes, edgeRdd, 20).toJavaRDD();
-       }).rdd();
+       final JavaRDD<ConnectedComponent> connectedComponents = ccs.filter(cc -> cc.getDocs().size() > 1);
+       final JavaRDD<ConnectedComponent> nonDeduplicated = ccs.filter(cc -> cc.getDocs().size() == 1);
-       final RDD<Edge<String>> edgeRdd = relationRDD.map(it -> new Edge<>(it._1().hashCode(), it._2().hashCode(), "similarTo")).rdd();
-       Tuple2<Object, RDD<String>> cc = GraphProcessor.findCCs(vertexes, edgeRdd, 20);
-       final Long total = (Long) cc._1();
-       final JavaRDD<String> map = mapDocs.map(Tuple2::_1);
-       final JavaRDD<String> duplicatesRDD = cc._2().toJavaRDD();
-       final JavaRDD<String> nonDuplicates = map.subtract(duplicatesRDD);
-       relationRDD.collect().forEach(it -> System.out.println(it._1() + "<--->" + it._2()));
-       System.out.println("Non duplicates: " + nonDuplicates.count());
-       System.out.println("Connected Components: " + total);
+       System.out.println("Non duplicates: " + nonDeduplicated.count());
+       System.out.println("Duplicates: " + connectedComponents.flatMap(cc -> cc.getDocs().iterator()).count());
+       System.out.println("Connected Components: " + connectedComponents.count());
        counter.getAccumulators().values().forEach(it -> System.out.println(it.getGroup() + " " + it.getName() + " -->" + it.value()));
+       //print ids
+       // ccs.foreach(cc -> System.out.println(cc.getId()));
+       ccs.saveAsTextFile("file:///Users/miconis/Downloads/dumps/organizations_dedup");
    }

    static String readFromClasspath(final String filename) {
        final StringWriter sw = new StringWriter();
        try {
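As a mental model for the blocking step above (flatMapToPair over the clustering keys followed by groupByKey), here is a toy, non-Spark sketch; the sample strings and the groupingKeys function are invented stand-ins for MapDocument and getGroupingKeys(config, doc):

    import java.util.*;
    import java.util.stream.Collectors;

    public class BlockingSketch {
        // hypothetical clustering-key function, standing in for getGroupingKeys(config, doc)
        static List<String> groupingKeys(String doc) {
            return Collections.singletonList(doc.toLowerCase().split(" ")[0]);
        }

        public static void main(String[] args) {
            List<String> docs = Arrays.asList("Acme Research Lab", "ACME Research Laboratory", "Zeta Institute");

            // <groupKey, doc> pairs, then grouped by key: the in-memory analogue of flatMapToPair + groupByKey
            Map<String, List<String>> blocks = docs.stream()
                    .flatMap(d -> groupingKeys(d).stream().map(k -> new AbstractMap.SimpleEntry<>(k, d)))
                    .collect(Collectors.groupingBy(Map.Entry::getKey,
                            Collectors.mapping(Map.Entry::getValue, Collectors.toList())));

            // only documents sharing a key would reach the pairwise comparison step
            blocks.forEach((k, v) -> System.out.println(k + " -> " + v));
        }
    }

Only pairs inside the same block reach BlockProcessor, which is what keeps the pairwise comparison cost manageable.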

View File

@ -1,24 +1,43 @@
package eu.dnetlib.graph
import java.lang
+import eu.dnetlib.ConnectedComponent
+import eu.dnetlib.pace.model.MapDocument
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
+import scala.collection.JavaConversions;
object GraphProcessor {
-  def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int): (Long, RDD[String]) = {
-    val graph: Graph[String, String] = Graph(vertexes, edges)
+  def findCCs(vertexes: RDD[(VertexId, MapDocument)], edges: RDD[Edge[String]], maxIterations: Int): RDD[ConnectedComponent] = {
+    val graph: Graph[MapDocument, String] = Graph(vertexes, edges)
     val cc = graph.connectedComponents(maxIterations).vertices
+    val joinResult = vertexes.leftOuterJoin(cc).map {
+      case (id, (openaireId, cc)) => {
+        if (cc.isEmpty) {
+          (id, openaireId)
+        }
+        else {
+          (cc.get, openaireId)
+        }
+      }
+    }
-    val totalCC = cc.map {
-      case (openaireId, ccId) => ccId
-    }.distinct().count()
+    val connectedComponents = joinResult.groupByKey().map[ConnectedComponent](cc => asConnectedComponent(cc))
-    val connectedComponents: RDD[String] = vertexes.join(cc).map {
-      case (id, (openaireId, ccId)) => openaireId
-    }.distinct()
-    (totalCC, connectedComponents)
+    (connectedComponents)
   }
+  def asConnectedComponent(group: (VertexId, Iterable[MapDocument])): ConnectedComponent = {
+    val docs = group._2.toSet[MapDocument]
+    val connectedComponent = new ConnectedComponent("empty", JavaConversions.setAsJavaSet[MapDocument](docs));
+    connectedComponent.initializeID();
+    connectedComponent
+  }
}
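GraphX computes the components distributively via graph.connectedComponents(maxIterations); as a rough in-memory mental model of what findCCs returns, this sketch runs a plain union-find over hypothetical "similarTo" pairs and groups vertex ids by their component root (all ids and pairs are invented):

    import java.util.*;

    // Illustration only: an in-memory stand-in for connected components over "similarTo" edges,
    // grouping vertex ids by the root of their component.
    public class ConnectedComponentsSketch {
        static String find(Map<String, String> parent, String x) {
            while (!parent.get(x).equals(x)) {
                x = parent.get(x);
            }
            return x;
        }

        static void union(Map<String, String> parent, String a, String b) {
            parent.put(find(parent, a), find(parent, b));
        }

        public static void main(String[] args) {
            List<String> vertices = Arrays.asList("orgA", "orgB", "orgC", "orgD");
            List<String[]> similarTo = Arrays.asList(
                    new String[]{"orgA", "orgB"},
                    new String[]{"orgB", "orgC"});

            Map<String, String> parent = new HashMap<>();
            vertices.forEach(v -> parent.put(v, v));
            similarTo.forEach(e -> union(parent, e[0], e[1]));

            // group members by component root, like the groupByKey on (ccId, doc) pairs above
            Map<String, List<String>> components = new HashMap<>();
            for (String v : vertices) {
                components.computeIfAbsent(find(parent, v), k -> new ArrayList<>()).add(v);
            }
            System.out.println(components); // e.g. {orgC=[orgA, orgB, orgC], orgD=[orgD]}
        }
    }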

View File

@ -33,8 +33,6 @@ public class SparkReporter implements Reporter {
    @Override
    public void emit(String type, String from, String to) {
        report.add(new Tuple2<>(from, to));
    }

View File

@ -50,6 +50,7 @@ public class ScoreResult {
    @Override
    public String toString() {
        final GsonBuilder b = new GsonBuilder();
+       b.serializeSpecialFloatingPointValues();
        return b.setPrettyPrinting().create().toJson(this);
    }
}
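The added serializeSpecialFloatingPointValues() call matters because Gson, by default, throws an IllegalArgumentException when asked to serialize NaN or infinite doubles; with the call it emits them as non-standard JSON tokens instead. A minimal sketch of the difference (the Score class and its field are invented for illustration, not taken from ScoreResult):

    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;

    public class GsonNanSketch {
        static class Score {
            double value = Double.NaN; // e.g. a distance that could not be computed
        }

        public static void main(String[] args) {
            Gson lenient = new GsonBuilder().serializeSpecialFloatingPointValues().create();
            System.out.println(lenient.toJson(new Score())); // {"value":NaN}

            try {
                System.out.println(new Gson().toJson(new Score()));
            } catch (IllegalArgumentException e) {
                System.out.println("default Gson rejects NaN: " + e.getMessage());
            }
        }
    }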