Added First Spark Implementation of dedup

This commit is contained in:
Sandro La Bruzzo 2018-10-12 12:53:47 +02:00
parent 67e5f9858b
commit 674ea3909f
3 changed files with 43 additions and 3 deletions

View File

@ -151,6 +151,7 @@ public class BlockProcessor {
if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
final ScoreResult sr = similarity(algo, pivot, curr);
log.info(sr.toString()+"SCORE "+ sr.getScore());
emitOutput(sr, idPivot, idCurr, context);
i++;
}

View File

@ -31,11 +31,11 @@ public class SparkTest {
public static void main(String[] args) {
final JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Hello World").setMaster("local[*]"));
final JavaRDD<String> dataRDD = context.textFile("file:///Users/sandro/Downloads/organizations_complete.json");
final JavaRDD<String> dataRDD = context.textFile("file:///Users/sandro/Downloads/software.json");
counter = new SparkCounter(context);
final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf"));
final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/software.pace.conf"));
BlockProcessor.constructAccumulator(config);
BlockProcessor.accumulators.forEach(acc -> {
@ -77,7 +77,6 @@ public class SparkTest {
final Long total = (Long) cc._1();
final JavaRDD<String> map = mapDocs.map(Tuple2::_1);
@ -87,10 +86,13 @@ public class SparkTest {
final JavaRDD<String> nonDuplicates = map.subtract(duplicatesRDD);
relationRDD.collect().forEach(it-> System.out.println(it._1()+"<--->"+it._2()));
System.out.println("Non duplicates: "+ nonDuplicates.count());
System.out.println("Connected Components: "+ total);
counter.getAccumulators().values().forEach(it-> System.out.println(it.getGroup()+" "+it.getName()+" -->"+it.value()));

View File

@ -0,0 +1,37 @@
{
"wf" : {
"threshold" : "0.99",
"dedupRun" : "001",
"entityType" : "result",
"subEntityType" : "resulttype",
"subEntityValue" : "software",
"orderField" : "title",
"queueMaxSize" : "2000",
"groupMaxSize" : "10",
"slidingWindowSize" : "200",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_isAffiliatedWith", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true"
},
"pace" : {
"clustering" : [
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi", "url" ], "params" : { } }
],
"strictConditions" : [
{ "name" : "exactMatch", "fields" : [ "doi", "resulttype", "url" ] }
],
"conditions" : [
{ "name" : "titleVersionMatch", "fields" : [ "title" ] }
],
"model" : [
{ "name" : "doi", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : "pid[qualifier#classid = {doi}]/value" },
{ "name" : "title", "algo" : "LevensteinTitle", "type" : "String", "weight" : "1.0", "ignoreMissing" : "false", "path" : "result/metadata/title[qualifier#classid = {main title}]/value" },
{ "name" : "url", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : "result/instance/url" },
{ "name" : "resulttype", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "false", "path" : "result/metadata/resulttype/classid" }
],
"blacklists" : {
}
}
}