apply limits (length, size) to pace Fields

This commit is contained in:
Claudio Atzori 2018-11-20 10:51:38 +01:00
parent 1ff5be3f04
commit d72960f8b9
7 changed files with 56 additions and 14 deletions

View File

@ -33,7 +33,7 @@ public class SparkTest {
public static void main(String[] args) {
final JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Deduplication").setMaster("local[*]"));
final URL dataset = SparkTest.class.getResource("/eu/dnetlib/pace/results.json");
final URL dataset = SparkTest.class.getResource("/eu/dnetlib/pace/result.title.stackoverflow.json");
final JavaRDD<String> dataRDD = context.textFile(dataset.getPath());
counter = new SparkCounter(context);
@ -57,13 +57,17 @@ public class SparkTest {
RDD<Tuple2<Object, MapDocument>> vertexes = mapDocs.mapToPair(t -> new Tuple2<Object, MapDocument>( (long) t._1().hashCode(), t._2())).rdd();
//create relations between documents
final JavaPairRDD<String, String> relationRDD = mapDocs.reduceByKey((a, b) -> a) //the reduce is just to be sure that we haven't document with same id
JavaPairRDD<String, Iterable<MapDocument>> blocks = mapDocs.reduceByKey((a, b) -> a) //the reduce is just to be sure that we haven't document with same id
//Clustering: from <id, doc> to List<groupkey,doc>
.flatMapToPair(a -> {
final MapDocument currentDocument = a._2();
return getGroupingKeys(config, currentDocument).stream()
.map(it -> new Tuple2<>(it, currentDocument)).collect(Collectors.toList()).iterator();
}).groupByKey() //group documents basing on the key
}).groupByKey(); //group documents basing on the key
log.info("blocks to process: " + blocks.count());
final JavaPairRDD<String, String> relationRDD = blocks
//create relations by comparing only elements in the same group
.flatMapToPair(it -> {
final SparkReporter reporter = new SparkReporter(counter);

View File

@ -24,9 +24,9 @@
],
"model" : [
{ "name" : "pid", "algo" : "Null", "type" : "JSON", "weight" : "0.0", "ignoreMissing" : "true", "path" : "pid", "overrideMatch" : "true" },
{ "name" : "title", "algo" : "JaroWinkler", "type" : "String", "weight" : "1.0", "ignoreMissing" : "false", "path" : "result/metadata/title[qualifier#classid = {main title}]/value", "length" : 10 },
{ "name" : "title", "algo" : "LevensteinTitle", "type" : "String", "weight" : "1.0", "ignoreMissing" : "false", "path" : "result/metadata/title[qualifier#classid = {main title}]/value" },
{ "name" : "dateofacceptance", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : "result/metadata/dateofacceptance/value" } ,
{ "name" : "authors", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : "result/metadata/author/fullname", "size" : 1 }
{ "name" : "authors", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : "result/metadata/author/fullname", "size" : 2 }
],
"blacklists" : {
"title" : [

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,23 +1,28 @@
package eu.dnetlib.pace.distance;
import java.util.Collection;
import java.util.List;
import eu.dnetlib.pace.condition.ConditionAlgo;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.distance.eval.ConditionEvalMap;
import eu.dnetlib.pace.distance.eval.DistanceEval;
import eu.dnetlib.pace.distance.eval.DistanceEvalMap;
import eu.dnetlib.pace.distance.eval.ScoreResult;
import eu.dnetlib.pace.model.Document;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldDef;
import eu.dnetlib.pace.model.*;
import eu.dnetlib.pace.util.PaceException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
* The distance between two documents is given by the weighted mean of the field distances
*/
public class DistanceScorer {
private static final Log log = LogFactory.getLog(DistanceScorer.class);
private Config config;
public DistanceScorer(final Config config) {
@ -79,7 +84,7 @@ public class DistanceScorer {
if (va.getType().equals(vb.getType())) {
de.setDistance(w * fd.distanceAlgo().distance(va, vb));
} else {
throw new IllegalArgumentException(String.format("Types are differents type: %s:%s - %s:%s", va, va.getType(), vb, vb.getType()));
throw new PaceException(String.format("Types are different: %s:%s - %s:%s", va, va.getType(), vb, vb.getType()));
}
}
return de;
@ -87,7 +92,27 @@ public class DistanceScorer {
}
private Field getValue(final Document d, final FieldDef fd) {
return d.values(fd.getName());
final Field v = d.values(fd.getName());
if (fd.getLength() > 0) {
if (v instanceof FieldValueImpl) {
((FieldValueImpl) v).setValue(StringUtils.substring(v.stringValue(), 0, fd.getLength()));
} else if (v instanceof FieldListImpl) {
List<String> strings = ((FieldListImpl) v).stringList();
strings = strings.stream()
.limit(fd.getSize() > 0 ? fd.getSize() : strings.size())
.map(s -> StringUtils.substring(s, 0, fd.getLength()))
.collect(Collectors.toList());
((FieldListImpl) v).clear();
((FieldListImpl) v).addAll(strings.stream()
.limit(fd.getSize() > 0 ? fd.getSize() : strings.size())
.map(s -> StringUtils.substring(s, 0, fd.getLength()))
.map(s -> new FieldValueImpl(v.getType(), v.getName(), s))
.collect(Collectors.toList()));
}
}
return v;
}
private double sumWeights(final Collection<FieldDef> fields) {

View File

@ -2,13 +2,18 @@ package eu.dnetlib.pace.distance.algo;
import com.wcohen.ss.AbstractStringDistance;
import eu.dnetlib.pace.distance.DistanceClass;
import eu.dnetlib.pace.distance.DistanceScorer;
import eu.dnetlib.pace.distance.SecondStringDistanceAlgo;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
@DistanceClass("LevensteinTitle")
public class LevensteinTitle extends SecondStringDistanceAlgo {
private static final Log log = LogFactory.getLog(LevensteinTitle.class);
public LevensteinTitle(Map<String,Number> params){
super(params, new com.wcohen.ss.Levenstein());
}

View File

@ -2,8 +2,12 @@ package eu.dnetlib.pace.util;
public class PaceException extends RuntimeException {
public PaceException(String s, Throwable e){
public PaceException(String s, Throwable e) {
super(s, e);
}
public PaceException(String s) {
super(s);
}
}