implementation of the integration test, addition of document blocks to group entities after clustering

This commit is contained in:
miconis 2019-05-21 16:38:26 +02:00
parent 3dfbf5fab7
commit a5526f6254
32 changed files with 2429 additions and 859 deletions

@ -0,0 +1,52 @@
package eu.dnetlib;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.PaceException;
import java.util.Set;
public class DocumentsBlock implements Serializable {
String key;
Set<MapDocument> documents;
public DocumentsBlock(String key, Set<MapDocument> documents) {
this.key = key;
this.documents = documents;
public DocumentsBlock(String key, Iterable<MapDocument> documents) {
this.key = key;
this.documents = Sets.newHashSet(documents);
public String getKey() {
return key;
public void setKey(String key) {
this.key = key;
public Iterable<MapDocument> getDocuments() {
return documents;
public void setDocuments(Set<MapDocument> documents) {
this.documents = documents;
public String toString(){
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(this);
} catch (IOException e) {
throw new PaceException("Failed to create Json: ", e);

@ -1,11 +1,11 @@
package eu.dnetlib;
import eu.dnetlib.graph.GraphProcessor;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.BlockProcessor;
import eu.dnetlib.pace.utils.PaceUtils;
import eu.dnetlib.reporter.SparkCounter;
import eu.dnetlib.reporter.SparkBlockProcessor;
import eu.dnetlib.reporter.SparkReporter;
@ -13,13 +13,14 @@ import;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import scala.Tuple2;
import java.util.Map;
public class SparkLocalTest {
public static SparkCounter counter ;
public static void main(String[] args) {
@ -31,27 +32,23 @@ public class SparkLocalTest {
final JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
final URL dataset = SparkTest.class.getResource("/eu/dnetlib/pace/");
final URL dataset = SparkLocalTest.class.getResource("/eu/dnetlib/pace/softwares.json");
final JavaRDD<String> dataRDD = context.textFile(dataset.getPath());
counter = new SparkCounter(context);
//read the configuration from the classpath
final DedupConfig config = DedupConfig.load(Utility.readFromClasspath("/eu/dnetlib/pace/org.curr.conf"));
final DedupConfig config = DedupConfig.load(Utility.readFromClasspath("/eu/dnetlib/pace/software.test.pace.conf", SparkLocalTest.class));
BlockProcessor.accumulators.forEach(acc -> {
final String[] values = acc.split("::");
counter.incrementCounter(values[0], values[1], 0);
Map<String, LongAccumulator> accumulators = Utility.constructAccumulator(config,;
//create vertexes of the graph: <ID, MapDocument>
JavaPairRDD<String, MapDocument> mapDocs = dataRDD.mapToPair(it -> {
MapDocument mapDocument = PaceUtils.asMapDocument(config, it);
return new Tuple2<>(mapDocument.getIdentifier(), mapDocument);
// mapDocs.filter(d -> d._2().getFieldMap().get("doi").stringValue().length() > 0).foreach(d -> System.out.println(d));
// mapDocs.filter(d -> d._2().getFieldMap().get("documentationUrl").stringValue().length() > 0).foreach(d -> System.out.println(d));
RDD<Tuple2<Object, MapDocument>> vertexes = mapDocs.mapToPair(t -> new Tuple2<Object, MapDocument>( (long) t._1().hashCode(), t._2())).rdd();
//create relations between documents
@ -64,19 +61,16 @@ public class SparkLocalTest {
.map(it -> new Tuple2<>(it, currentDocument)).collect(Collectors.toList()).iterator();
}).groupByKey();//group documents basing on the key
// blocks = blocks.filter(b -> Iterables.size(b._2())>2);
// vertexes = blocks.flatMap(b -> b._2().iterator()).map(t -> new Tuple2<Object, MapDocument>((long) t.getIdentifier().hashCode(), t)).rdd();
//print blocks
blocks.foreach(b -> {
String print = b._1() + ": ";
for (MapDocument doc : b._2()) {
print += doc.getIdentifier() + " ";
// -> new DocumentsBlock(group._1(), group._2())).foreach(b -> System.out.println(b));
//create relations by comparing only elements in the same group
final JavaPairRDD<String, String> relationRDD = blocks.flatMapToPair(it -> {
final SparkReporter reporter = new SparkReporter(counter);
new BlockProcessor(config).process(it._1(), it._2(), reporter);
final SparkReporter reporter = new SparkReporter();
new SparkBlockProcessor(config).process(it._1(), it._2(), reporter, accumulators);
return reporter.getReport().iterator();
@ -87,28 +81,27 @@ public class SparkLocalTest {
final JavaRDD<ConnectedComponent> connectedComponents = ccs.filter(cc -> cc.getDocs().size()>1);
final JavaRDD<ConnectedComponent> nonDeduplicated = ccs.filter(cc -> cc.getDocs().size()==1);
System.out.println("Non duplicates: " + nonDeduplicated.count());
System.out.println("Duplicates: " + connectedComponents.flatMap(cc -> cc.getDocs().iterator()).count());
System.out.println("Connected Components: " + connectedComponents.count());
counter.getAccumulators().values().forEach(it-> System.out.println(it.getGroup()+" "+it.getName()+" -->"+it.value()));
//print deduped
connectedComponents.foreach(cc -> {
System.out.println("cc = " + cc.getId());
for (MapDocument doc: cc.getDocs()) {
System.out.println(doc.getIdentifier() + "; ln: " + doc.getFieldMap().get("legalname").stringValue() + "; sn: " + doc.getFieldMap().get("legalshortname").stringValue());
//print nondeduped
nonDeduplicated.foreach(cc -> {
System.out.println("nd = " + cc.getId());
System.out.println(cc.getDocs().iterator().next().getFieldMap().get("legalname").stringValue() + "; sn: " + cc.getDocs().iterator().next().getFieldMap().get("legalshortname").stringValue());
// //print nondeduped
// nonDeduplicated.foreach(cc -> {
// System.out.println("nd = " + cc.getId());
// });
//print ids
//// ccs.foreach(cc -> System.out.println(cc.getId()));
//// connectedComponents.saveAsTextFile("file:///Users/miconis/Downloads/dumps/organizations_dedup");
System.out.println("Non duplicates: " + nonDeduplicated.count());
System.out.println("Duplicates: " + connectedComponents.flatMap(cc -> cc.getDocs().iterator()).count());
System.out.println("Connected Components: " + connectedComponents.count());
accumulators.forEach((name, acc) -> System.out.println(name + " -> " + acc.value()));
// //print ids
// ccs.foreach(cc -> System.out.println(cc.getId()));
// connectedComponents.saveAsTextFile("file:///Users/miconis/Downloads/dumps/organizations_dedup");

@ -3,9 +3,8 @@ package eu.dnetlib;
import eu.dnetlib.graph.GraphProcessor;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.BlockProcessor;
import eu.dnetlib.pace.utils.PaceUtils;
import eu.dnetlib.reporter.SparkCounter;
import eu.dnetlib.reporter.SparkBlockProcessor;
import eu.dnetlib.reporter.SparkReporter;
@ -13,16 +12,22 @@ import;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import scala.Tuple2;
import java.util.Map;
public class SparkTest {
public static SparkCounter counter ;
public static void main(String[] args) throws IOException {
final String inputSpacePath = args[0];
final String dedupConfigPath = args[1];
final String groupsPath = args[2] + "_groups";
final String outputPath = args[2] + "_output";
final SparkSession spark = SparkSession
@ -31,19 +36,11 @@ public class SparkTest {
final JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
final JavaRDD<String> dataRDD = Utility.loadDataFromHDFS(args[0], context);
final JavaRDD<String> dataRDD = Utility.loadDataFromHDFS(inputSpacePath, context);
counter = new SparkCounter(context);
final DedupConfig config = Utility.loadConfigFromHDFS(dedupConfigPath);
final DedupConfig config = Utility.loadConfigFromHDFS(args[1]);
BlockProcessor.accumulators.forEach(acc -> {
final String[] values = acc.split("::");
counter.incrementCounter(values[0], values[1], 0);
Map<String, LongAccumulator> accumulators = Utility.constructAccumulator(config,;
//create vertexes of the graph: <ID, MapDocument>
JavaPairRDD<String, MapDocument> mapDocs = dataRDD.mapToPair(it -> {
@ -52,7 +49,7 @@ public class SparkTest {
RDD<Tuple2<Object, MapDocument>> vertexes = mapDocs.mapToPair(t -> new Tuple2<Object, MapDocument>( (long) t._1().hashCode(), t._2())).rdd();
//create relations between documents
//group documents basing on clustering
JavaPairRDD<String, Iterable<MapDocument>> blocks = mapDocs.reduceByKey((a, b) -> a) //the reduce is just to be sure that we haven't document with same id
//Clustering: from <id, doc> to List<groupkey,doc>
.flatMapToPair(a -> {
@ -60,21 +57,15 @@ public class SparkTest {
return Utility.getGroupingKeys(config, currentDocument).stream()
.map(it -> new Tuple2<>(it, currentDocument)).collect(Collectors.toList()).iterator();
}).groupByKey();//group documents basing on the key
//print blocks
blocks.foreach(b -> {
String print = b._1() + ": ";
for (MapDocument doc : b._2()) {
print += doc.getIdentifier() + " ";
Utility.deleteIfExists(groupsPath); -> new DocumentsBlock(group._1(), group._2())).saveAsTextFile(groupsPath);
//create relations by comparing only elements in the same group
final JavaPairRDD<String, String> relationRDD = blocks.flatMapToPair(it -> {
final SparkReporter reporter = new SparkReporter(counter);
new BlockProcessor(config).process(it._1(), it._2(), reporter);
final SparkReporter reporter = new SparkReporter();
new SparkBlockProcessor(config).process(it._1(), it._2(), reporter, accumulators);
return reporter.getReport().iterator();
@ -82,31 +73,17 @@ public class SparkTest {
JavaRDD<ConnectedComponent> ccs = GraphProcessor.findCCs(vertexes, edgeRdd, 20).toJavaRDD();
//save connected components on textfile
final JavaRDD<ConnectedComponent> connectedComponents = ccs.filter(cc -> cc.getDocs().size()>1);
final JavaRDD<ConnectedComponent> nonDeduplicated = ccs.filter(cc -> cc.getDocs().size()==1);
System.out.println("Non duplicates: " + nonDeduplicated.count());
System.out.println("Duplicates: " + connectedComponents.flatMap(cc -> cc.getDocs().iterator()).count());
System.out.println("Connected Components: " + connectedComponents.count());
counter.getAccumulators().values().forEach(it-> System.out.println(it.getGroup()+" "+it.getName()+" -->"+it.value()));
//print deduped
connectedComponents.foreach(cc -> {
System.out.println("cc = " + cc.getId());
for (MapDocument doc: cc.getDocs()) {
System.out.println(doc.getIdentifier() + "; ln: " + doc.getFieldMap().get("legalname").stringValue() + "; sn: " + doc.getFieldMap().get("legalshortname").stringValue());
//print nondeduped
nonDeduplicated.foreach(cc -> {
System.out.println("nd = " + cc.getId());
System.out.println(cc.getDocs().iterator().next().getFieldMap().get("legalname").stringValue() + "; sn: " + cc.getDocs().iterator().next().getFieldMap().get("legalshortname").stringValue());
// print ids
// ccs.foreach(cc -> System.out.println(cc.getId()));
// connectedComponents.saveAsTextFile("file:///Users/miconis/Downloads/dumps/organizations_dedup");
accumulators.forEach((name, acc) -> System.out.println(name + " -> " + acc.value()));

@ -9,24 +9,55 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import org.apache.spark.util.LongAccumulator;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class Utility {
public static Map<String, LongAccumulator> constructAccumulator(final DedupConfig dedupConf, final SparkContext context) {
Map<String, LongAccumulator> accumulators = new HashMap<>();
String acc1 = String.format("%s::%s",dedupConf.getWf().getEntityType(), "records per hash key = 1");
accumulators.put(acc1, context.longAccumulator(acc1));
String acc2 = String.format("%s::%s",dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField());
accumulators.put(acc2, context.longAccumulator(acc2));
String acc3 = String.format("%s::%s",dedupConf.getWf().getEntityType(), String.format("Skipped records for count(%s) >= %s", dedupConf.getWf().getOrderField(), dedupConf.getWf().getGroupMaxSize()));
accumulators.put(acc3, context.longAccumulator(acc3));
String acc4 = String.format("%s::%s",dedupConf.getWf().getEntityType(), "skip list");
accumulators.put(acc4, context.longAccumulator(acc4));
String acc5 = String.format("%s::%s",dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)");
accumulators.put(acc5, context.longAccumulator(acc5));
String acc6 = String.format("%s::%s",dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold());
accumulators.put(acc6, context.longAccumulator(acc6));
return accumulators;
public static JavaRDD<String> loadDataFromHDFS(String path, JavaSparkContext context) {
return context.textFile(path);
public static void deleteIfExists(String path) throws IOException {
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(conf);
if (fileSystem.exists(new Path(path))){
fileSystem.delete(new Path(path), true);
public static DedupConfig loadConfigFromHDFS(String path) throws IOException {
Configuration conf = new Configuration();
// conf.set("fs.defaultFS", "");
FileSystem fileSystem = FileSystem.get(conf);
FSDataInputStream inputStream = new FSDataInputStream( Path(path)));
@ -34,10 +65,10 @@ public class Utility {
static String readFromClasspath(final String filename) {
static <T> String readFromClasspath(final String filename, final Class<T> clazz) {
final StringWriter sw = new StringWriter();
try {
IOUtils.copy(SparkTest.class.getResourceAsStream(filename), sw);
IOUtils.copy(clazz.getResourceAsStream(filename), sw);
return sw.toString();
} catch (final IOException e) {
throw new RuntimeException("cannot load resource from classpath: " + filename);

@ -83,6 +83,7 @@ public class PaceUtils {
try {
JsonFormat.merge(json, b);
} catch (JsonFormat.ParseException e) {
System.out.println("**************************** " + json);
throw new IllegalArgumentException(e);
return ProtoDocumentBuilder.newInstance(b.getId(),, conf.getPace().getModel());

View File

@ -0,0 +1,191 @@
package eu.dnetlib.reporter;
import eu.dnetlib.pace.clustering.NGramUtils;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.config.WfConfig;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.distance.eval.ScoreResult;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.MapDocumentComparator;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.util.LongAccumulator;
import java.util.*;
public class SparkBlockProcessor {
public static final List<String> accumulators= new ArrayList<>();
private static final Log log = LogFactory.getLog(SparkBlockProcessor.class);
private DedupConfig dedupConf;
public SparkBlockProcessor(DedupConfig dedupConf) {
this.dedupConf = dedupConf;
public void process(final String key, final Iterable<MapDocument> documents, final SparkReporter context, Map<String, LongAccumulator> accumulators) {
final Queue<MapDocument> q = prepare(documents);
if (q.size() > 1) {
//"reducing key: '" + key + "' records: " + q.size());
//process(q, context);
process(simplifyQueue(q, key, context, accumulators), context, accumulators);
} else {
context.incrementCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1", 1, accumulators);
private Queue<MapDocument> prepare(final Iterable<MapDocument> documents) {
final Queue<MapDocument> queue = new PriorityQueue<>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));
final Set<String> seen = new HashSet<String>();
final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
documents.forEach(doc -> {
if (queue.size() <= queueMaxSize) {
final String id = doc.getIdentifier();
if (!seen.contains(id)) {
return queue;
private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final SparkReporter context, Map<String, LongAccumulator> accumulators) {
final Queue<MapDocument> q = new LinkedList<>();
String fieldRef = "";
final List<MapDocument> tempResults = Lists.newArrayList();
while (!queue.isEmpty()) {
final MapDocument result = queue.remove();
final String orderFieldName = dedupConf.getWf().getOrderField();
final Field orderFieldValue = result.values(orderFieldName);
if (!orderFieldValue.isEmpty()) {
final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
if (field.equals(fieldRef)) {
} else {
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram, accumulators);
fieldRef = field;
} else {
context.incrementCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField(), 1, accumulators);
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram, accumulators);
return q;
private void populateSimplifiedQueue(final Queue<MapDocument> q,
final List<MapDocument> tempResults,
final SparkReporter context,
final String fieldRef,
final String ngram,
Map<String, LongAccumulator> accumulators) {
WfConfig wf = dedupConf.getWf();
if (tempResults.size() < wf.getGroupMaxSize()) {
} else {
context.incrementCounter(wf.getEntityType(), String.format("Skipped records for count(%s) >= %s", wf.getOrderField(), wf.getGroupMaxSize()), tempResults.size(), accumulators);
//"Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
private void process(final Queue<MapDocument> queue, final SparkReporter context, Map<String, LongAccumulator> accumulators) {
final PaceDocumentDistance algo = new PaceDocumentDistance();
while (!queue.isEmpty()) {
final MapDocument pivot = queue.remove();
final String idPivot = pivot.getIdentifier();
WfConfig wf = dedupConf.getWf();
final Field fieldsPivot = pivot.values(wf.getOrderField());
final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
if (fieldPivot != null) {
// System.out.println(idPivot + " --> " + fieldPivot);
int i = 0;
for (final MapDocument curr : queue) {
final String idCurr = curr.getIdentifier();
if (mustSkip(idCurr)) {
context.incrementCounter(wf.getEntityType(), "skip list", 1, accumulators);
if (i > wf.getSlidingWindowSize()) {
final Field fieldsCurr = curr.values(wf.getOrderField());
final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
final ScoreResult sr = similarity(algo, pivot, curr);
//"SCORE "+ sr.getScore());
emitOutput(sr, idPivot, idCurr, context, accumulators);
private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final SparkReporter context, Map<String, LongAccumulator> accumulators) {
final double d = sr.getScore();
if (d >= dedupConf.getWf().getThreshold()) {
writeSimilarity(context, idPivot, idCurr);
context.incrementCounter(dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)", 1, accumulators);
} else {
context.incrementCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold(), 1, accumulators);
private ScoreResult similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) {
try {
return algo.between(a, b, dedupConf);
} catch(Throwable e) {
log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
throw new IllegalArgumentException(e);
private boolean mustSkip(final String idPivot) {
return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
private String getNsPrefix(final String id) {
return StringUtils.substringBetween(id, "|", "::");
private void writeSimilarity(final SparkReporter context, final String from, final String to) {
final String type = dedupConf.getWf().getEntityType();
context.emit(type, from, to);
// context.emit(type, to, from);

@ -1,36 +0,0 @@
package eu.dnetlib.reporter;
import eu.dnetlib.DnetAccumulator;
import java.util.HashMap;
import java.util.Map;
public class SparkCounter {
final JavaSparkContext javaSparkContext;
public SparkCounter(final JavaSparkContext context){
this.javaSparkContext = context;
final Map<String, DnetAccumulator> accumulators = new HashMap<>();
public void incrementCounter(String counterGroup, String counterName, long delta) {
final String accumulatorName = String.format("%s::%s", counterGroup, counterName);
DnetAccumulator currentAccumulator = null;
if (!accumulators.containsKey(accumulatorName)) {
currentAccumulator = new DnetAccumulator(counterGroup, counterName);,accumulatorName);
accumulators.put(accumulatorName, currentAccumulator);
} else {
currentAccumulator = accumulators.get(accumulatorName);
public Map<String, DnetAccumulator> getAccumulators() {
return accumulators;

@ -3,29 +3,30 @@ package eu.dnetlib.reporter;
import eu.dnetlib.pace.util.Reporter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.util.LongAccumulator;
import scala.Serializable;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class SparkReporter implements Reporter {
final SparkCounter counter;
public class SparkReporter implements Serializable {
final List<Tuple2<String, String>> report = new ArrayList<>();
private static final Log log = LogFactory.getLog(SparkReporter.class);
public SparkReporter(SparkCounter counter){
this.counter = counter;
public SparkReporter(){}
public void incrementCounter(String counterGroup, String counterName, long delta, Map<String, LongAccumulator> accumulators) {
final String accumulatorName = String.format("%s::%s", counterGroup, counterName);
if (accumulators.containsKey(accumulatorName)){
public void incrementCounter(String counterGroup, String counterName, long delta) {
counter.incrementCounter(counterGroup, counterName, delta);
public void emit(String type, String from, String to) {
report.add(new Tuple2<>(from, to));

@ -0,0 +1,40 @@
"wf" : {
"threshold" : "0.99",
"dedupRun" : "001",
"entityType" : "result",
"subEntityType" : "resulttype",
"subEntityValue" : "software",
"orderField" : "title",
"queueMaxSize" : "2000",
"groupMaxSize" : "10",
"slidingWindowSize" : "200",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_isAffiliatedWith", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true"
"pace" : {
"clustering" : [
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } },
{ "name" : "urlclustering", "fields": [ "url" ], "params" : { } }
"strictConditions" : [
{ "name" : "doiExactMatch", "fields": [ "doi" ] },
{ "name" : "exactMatch", "fields" : [ "url", "documentationUrl" ] }
"conditions" : [
{ "name" : "exactMatch", "fields" : ["resulttype"] }
"model" : [
{ "name" : "doi", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : "pid[qualifier#classid = {doi}]/value" },
{ "name" : "title", "algo" : "LevensteinTitleIgnoreVersion", "type" : "String", "weight" : "1.0", "ignoreMissing" : "false", "path" : "result/metadata/title[qualifier#classid = {main title}]/value", "length" : 250, "size" : 5 },
{ "name" : "url", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : "result/instance/url" },
{ "name" : "resulttype", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "false", "path" : "result/metadata/resulttype/classid" },
{ "name" : "documentationUrl", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "false", "path" : "result/metadata/documentationUrl/value" }
"blacklists" : {

File diff suppressed because one or more lines are too long

@ -1,12 +1,11 @@
package eu.dnetlib.pace;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;
import org.junit.Test;
import java.util.Properties;
import static junit.framework.Assert.assertEquals;
@ -28,14 +27,15 @@ public class DedupTestIT {
conf.setProperty(OozieClient.APP_PATH, "hdfs://");
conf.setProperty(OozieClient.USER_NAME, "michele.debonis");
conf.setProperty("oozie.action.sharelib.for.spark", "spark2");
conf.setProperty("oozie.use.system.libpath", "true");
// setting workflow parameters
conf.setProperty("jobTracker", "");
conf.setProperty("nameNode", "hdfs://");
conf.setProperty("dedupConfiguration", prop.getProperty("dedup.configuration"));
conf.setProperty("inputSpace", prop.getProperty(""));
// conf.setProperty("inputDir", "/usr/tucu/inputdir");
// conf.setProperty("outputDir", "/usr/tucu/outputdir");
conf.setProperty("outputPath", prop.getProperty("output"));
conf.setProperty("statisticsPath", prop.getProperty("dedup.statistics"));
// submit and start the workflow job
String jobId =;
@ -49,9 +49,10 @@ public class DedupTestIT {
// print the final status of the workflow job
System.out.println("JOB LOG = " + wc.getJobLog(jobId));
// System.out.println("JOB LOG = " + wc.getJobLog(jobId));
assertEquals(WorkflowJob.Status.SUCCEEDED, wc.getJobInfo(jobId).getStatus());
@ -66,14 +67,4 @@ public class DedupTestIT {
return prop;
static String readFromClasspath(final String filename) {
final StringWriter sw = new StringWriter();
try {
IOUtils.copy(DedupTestIT.class.getResourceAsStream(filename), sw);
return sw.toString();
} catch (final IOException e) {
throw new RuntimeException("cannot load resource from classpath: " + filename);

@ -1,17 +0,0 @@
package eu.dnetlib.pace;
import eu.dnetlib.SparkLocalTest;
import org.junit.Test;
public class SparkTester {
public void sparkLocalTest() throws IOException {
SparkLocalTest.main(new String[]{});

@ -1,2 +1,3 @@ = /eu/dnetlib/pace/
dedup.configuration = /eu/dnetlib/pace/org.curr.conf = oozieJob/inputSpace/
dedup.configuration = oozieJob/dedupConfig/org.curr.conf = oozieJob/output/orgdedup

@ -22,6 +22,7 @@ public class UrlClustering extends AbstractPaceFunctions implements ClusteringFu
public Collection<String> apply(List<Field> fields) {
try {
.filter(f -> !f.isEmpty())
@ -29,13 +30,17 @@ public class UrlClustering extends AbstractPaceFunctions implements ClusteringFu
catch (IllegalStateException e){
return new HashSet<>();
public Map<String, Integer> getParams() {
return null;
private URL asUrl(final String value) {
private URL asUrl(String value) {
try {
return new URL(value);
} catch (MalformedURLException e) {
@ -44,4 +49,5 @@ public class UrlClustering extends AbstractPaceFunctions implements ClusteringFu

@ -6,7 +6,6 @@ import eu.dnetlib.pace.condition.ConditionAlgo;
import eu.dnetlib.pace.model.ClusteringDef;
import eu.dnetlib.pace.model.CondDef;
import eu.dnetlib.pace.model.FieldDef;
import eu.dnetlib.pace.model.TreeNodeDef;
import eu.dnetlib.pace.util.PaceResolver;
import org.apache.commons.collections.CollectionUtils;
import org.codehaus.jackson.annotate.JsonIgnore;
@ -24,11 +23,9 @@ public class PaceConfig implements Serializable {
private List<ClusteringDef> clustering;
private Map<String, List<String>> blacklists;
private Map<String, TreeNodeDef> decisionTree;
private Map<String, FieldDef> modelMap;
public static PaceResolver paceResolver;
// public PaceResolver paceResolver;
public PaceConfig() {}
@ -38,7 +35,7 @@ public class PaceConfig implements Serializable {
modelMap.put(fd.getName(), fd);
paceResolver = new PaceResolver();
// paceResolver = new PaceResolver();
public List<FieldDef> getModel() {
@ -61,14 +58,6 @@ public class PaceConfig implements Serializable {
return conditions;
public Map<String, TreeNodeDef> getDecisionTree() {
return decisionTree;
public void setDecisionTree(Map<String, TreeNodeDef> decisionTree) {
this.decisionTree = decisionTree;
public List<ConditionAlgo> getConditionAlgos() {
return asConditionAlgos(getConditions());

@ -32,6 +32,9 @@ public class LevensteinTitleIgnoreVersion extends SecondStringDistanceAlgo {
ca = ca.replaceAll("\\d", "").replaceAll(getRomans(ca), "").trim();
cb = cb.replaceAll("\\d", "").replaceAll(getRomans(cb), "").trim();
ca = filterAllStopWords(ca);
cb = filterAllStopWords(cb);
final String cca = finalCleanup(ca);
final String ccb = finalCleanup(cb);

View File

@ -2,12 +2,17 @@ package eu.dnetlib.pace.model;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import eu.dnetlib.pace.clustering.*;
import eu.dnetlib.pace.config.PaceConfig;
import eu.dnetlib.pace.util.PaceException;
import eu.dnetlib.pace.util.PaceResolver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class ClusteringDef implements Serializable {
@ -18,6 +23,8 @@ public class ClusteringDef implements Serializable {
private Map<String, Integer> params;
PaceResolver paceResolver = new PaceResolver();
public ClusteringDef() {}
public String getName() {
@ -30,7 +37,7 @@ public class ClusteringDef implements Serializable {
public ClusteringFunction clusteringFunction() {
try {
return PaceConfig.paceResolver.getClusteringFunction(getName(), params);
return paceResolver.getClusteringFunction(getName(), params);
} catch (PaceException e) {
return null;

@ -7,6 +7,7 @@ import java.util.List;
import eu.dnetlib.pace.condition.*;
import eu.dnetlib.pace.config.PaceConfig;
import eu.dnetlib.pace.util.PaceException;
import eu.dnetlib.pace.util.PaceResolver;
public class CondDef implements Serializable {
@ -15,10 +16,12 @@ public class CondDef implements Serializable {
private List<String> fields;
PaceResolver paceResolver = new PaceResolver();
public CondDef() {}
public ConditionAlgo conditionAlgo(final List<FieldDef> fields){
return PaceConfig.paceResolver.getConditionAlgo(getName(), fields);
return paceResolver.getConditionAlgo(getName(), fields);
public String getName() {

View File

@ -6,6 +6,7 @@ import;
import eu.dnetlib.pace.config.PaceConfig;
import eu.dnetlib.pace.config.Type;
import eu.dnetlib.pace.distance.DistanceAlgo;
import eu.dnetlib.pace.util.PaceResolver;
import java.util.HashMap;
@ -33,6 +34,8 @@ public class FieldDef implements Serializable {
private double weight;
PaceResolver paceResolver = new PaceResolver();
* Sets maximum size for the repeatable fields in the model. -1 for unbounded size.
@ -85,7 +88,7 @@ public class FieldDef implements Serializable {
params.put("length", getLength());
params.put("weight", getWeight());
return PaceConfig.paceResolver.getDistanceAlgo(getAlgo(), params);
return paceResolver.getDistanceAlgo(getAlgo(), params);
public boolean isIgnoreMissing() {

@ -1,145 +0,0 @@
package eu.dnetlib.pace.model;
import eu.dnetlib.pace.config.PaceConfig;
import eu.dnetlib.pace.tree.Comparator;
import eu.dnetlib.pace.util.PaceException;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import java.util.List;
public class TreeNodeDef implements Serializable {
private List<FieldConf> fields; //list of fields involved in the tree node (contains comparators to be used and field on which apply the comparator)
private AggType aggregation; //how to aggregate similarity measures for every field
private double threshold; //threshold on the similarity measure
private String positive; //specifies the next node in case of positive result: similarity>=th
private String negative; //specifies the next node in case of negative result: similarity<th
private String undefined; //specifies the next node in case of undefined result: similarity=-1
boolean ignoreMissing = true; //specifies what to do in case of missing field
public TreeNodeDef() {
//compute the similarity measure between two documents
public double evaluate(MapDocument doc1, MapDocument doc2) {
DescriptiveStatistics stats = new DescriptiveStatistics();
for (FieldConf fieldConf : fields) {
double weight = fieldConf.getWeight();
double similarity = comparator(fieldConf).compare(doc1.getFieldMap().get(fieldConf.getField()), doc2.getFieldMap().get(fieldConf.getField()));
//if similarity is -1 means that a comparator gave undefined, do not add result to the stats
if (similarity != -1) {
stats.addValue(weight * similarity);
else {
if (!ignoreMissing) //if the missing value has not to be ignored, return -1
return -1;
switch (aggregation){
case AVG:
return stats.getMean();
case SUM:
return stats.getSum();
case MAX:
return stats.getMax();
case MIN:
return stats.getMin();
return 0.0;
private Comparator comparator(final FieldConf field){
return PaceConfig.paceResolver.getComparator(field.getComparator(), field.getParams());
public TreeNodeDef(List<FieldConf> fields, double threshold, AggType aggregation, String positive, String negative, String undefined) {
this.fields = fields;
this.threshold = threshold;
this.aggregation = aggregation;
this.positive = positive;
this.negative = negative;
this.undefined = undefined;
public boolean isIgnoreMissing() {
return ignoreMissing;
public void setIgnoreMissing(boolean ignoreMissing) {
this.ignoreMissing = ignoreMissing;
public List<FieldConf> getFields() {
return fields;
public void setFields(List<FieldConf> fields) {
this.fields = fields;
public double getThreshold() {
return threshold;
public void setThreshold(double threshold) {
this.threshold = threshold;
public AggType getAggregation() {
return aggregation;
public void setAggregation(AggType aggregation) {
this.aggregation = aggregation;
public String getPositive() {
return positive;
public void setPositive(String positive) {
this.positive = positive;
public String getNegative() {
return negative;
public void setNegative(String negative) {
this.negative = negative;
public String getUndefined() {
return undefined;
public void setUndefined(String undefined) {
this.undefined = undefined;
public String toString() {
try {
return new ObjectMapper().writeValueAsString(this);
} catch (IOException e) {
throw new PaceException("Impossible to convert to JSON: ", e);

@ -1,33 +0,0 @@
package eu.dnetlib.pace.tree;
import eu.dnetlib.pace.model.Field;
import org.apache.commons.lang.StringUtils;
import java.util.Map;
abstract class AbstractComparator implements Comparator {
Map<String, Number> params;
public AbstractComparator(Map<String, Number> params){
this.params = params;
public double compare(Field a, Field b) {
return 0.0;
public static double stringSimilarity(String s1, String s2) {
String longer = s1, shorter = s2;
if (s1.length() < s2.length()) { // longer should always have greater length
longer = s2; shorter = s1;
int longerLength = longer.length();
if (longerLength == 0) //if strings have 0 length return 0 (no similarity)
return 0.0;
return (longerLength - StringUtils.getLevenshteinDistance(longer, shorter)) / (double) longerLength;

@ -1,42 +0,0 @@
package eu.dnetlib.pace.tree;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldList;
import java.util.List;
import java.util.Map;
public class CoauthorsMatch extends AbstractComparator {
public CoauthorsMatch(Map<String, Number> params) {
public double compare(Field a, Field b) {
final List<String> c1 = ((FieldList) a).stringList();
final List<String> c2 = ((FieldList) b).stringList();
int size1 = c1.size();
int size2 = c2.size();
//few coauthors or too many coauthors
if (size1 < params.getOrDefault("minCoauthors", 5).intValue() || size2 < params.getOrDefault("minCoauthors", 5).intValue() || (size1+size2 > params.getOrDefault("maxCoauthors", 200).intValue()))
return -1;
int coauthorship = 0;
for (String ca1: c1){
for (String ca2: c2){
if (stringSimilarity(ca1.replaceAll("\\.","").replaceAll(" ",""), ca2.replaceAll("\\.","").replaceAll(" ",""))>= params.getOrDefault("simTh", 0.7).doubleValue())
return coauthorship;

@ -1,10 +0,0 @@
package eu.dnetlib.pace.tree;
import eu.dnetlib.pace.model.Field;
public interface Comparator {
//compare two fields and returns: the distace measure, -1 if undefined
public double compare(Field a, Field b);

@ -1,14 +0,0 @@
package eu.dnetlib.pace.tree;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
public @interface ComparatorClass {
public String value();

@ -1,25 +0,0 @@
package eu.dnetlib.pace.tree;
import eu.dnetlib.pace.model.Field;
import java.util.Map;
public class ExactMatch extends AbstractComparator {
public ExactMatch(Map<String, Number> params) {
public double compare(Field a, Field b) {
if (a.stringValue().isEmpty() || b.stringValue().isEmpty())
return -1;
else if (a.stringValue().equals(b.stringValue()))
return 1;
return 0;

@ -1,31 +0,0 @@
package eu.dnetlib.pace.tree;
import eu.dnetlib.pace.model.Field;
import java.util.Map;
public class SimilarMatch extends AbstractComparator {
public SimilarMatch(Map<String, Number> params) {
public double compare(Field a, Field b) {
if (a.stringValue().isEmpty() || b.stringValue().isEmpty())
return -1; //undefined if one name is missing
//take only the first name
String firstname1 = a.stringValue().split(" ")[0];
String firstname2 = b.stringValue().split(" ")[0];
if (firstname1.toLowerCase().trim().replaceAll("\\.","").replaceAll("\\s","").length()<=2 || firstname2.toLowerCase().replaceAll("\\.", "").replaceAll("\\s","").length()<=2) //too short names (considered similar)
return 1;
return stringSimilarity(firstname1,firstname2);

@ -1,36 +0,0 @@
package eu.dnetlib.pace.tree;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldListImpl;
import java.util.Map;
public class TopicsMatch extends AbstractComparator {
public TopicsMatch(Map<String, Number> params) {
public double compare(Field a, Field b) {
double[] t1 = ((FieldListImpl) a).doubleArray();
double[] t2 = ((FieldListImpl) b).doubleArray();
if (t1 == null || t2 == null)
return -1; //0 similarity if no topics in one of the authors or in both
double area = 0.0;
double min_value[] = new double[t1.length];
for(int i=0; i<t1.length; i++){
min_value[i] = (t1[i]<t2[i])?t1[i]:t2[i];
area += min_value[i];
return area;

@ -1,22 +0,0 @@
package eu.dnetlib.pace.tree;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldList;
import java.util.List;
import java.util.Map;
public class UndefinedNode implements Comparator {
Map<String, Number> params;
public double compare(Field a, Field b) {
final List<String> sa = ((FieldList) a).stringList();
final List<String> sb = ((FieldList) b).stringList();
return 0;

@ -1,21 +0,0 @@
import eu.dnetlib.pace.util.PaceException;
public enum AggType {
public static AggType getEnum(String value) {
try {
return AggType.valueOf(value);
catch (IllegalArgumentException e) {
throw new PaceException("Undefined aggregation type", e);

@ -1,20 +0,0 @@
public enum MatchType {
public static MatchType getEnum(String value) {
try {
return MatchType.valueOf(value);
catch (IllegalArgumentException e) {
return MatchType.UNDEFINED;

@ -7,8 +7,6 @@ import eu.dnetlib.pace.condition.ConditionClass;
import eu.dnetlib.pace.distance.DistanceAlgo;
import eu.dnetlib.pace.distance.DistanceClass;
import eu.dnetlib.pace.model.FieldDef;
import eu.dnetlib.pace.tree.Comparator;
import eu.dnetlib.pace.tree.ComparatorClass;
import org.reflections.Reflections;
@ -22,7 +20,6 @@ public class PaceResolver implements Serializable {
private final Map<String, Class<ClusteringFunction>> clusteringFunctions;
private final Map<String, Class<ConditionAlgo>> conditionAlgos;
private final Map<String, Class<DistanceAlgo>> distanceAlgos;
private final Map<String, Class<Comparator>> comparators;
public PaceResolver() {
@ -37,10 +34,6 @@ public class PaceResolver implements Serializable {
this.distanceAlgos = new Reflections("eu.dnetlib").getTypesAnnotatedWith(DistanceClass.class).stream()
.collect(Collectors.toMap(cl -> cl.getAnnotation(DistanceClass.class).value(), cl -> (Class<DistanceAlgo>)cl));
this.comparators = new Reflections("eu.dnetlib").getTypesAnnotatedWith(ComparatorClass.class).stream()
.collect(Collectors.toMap(cl -> cl.getAnnotation(ComparatorClass.class).value(), cl -> (Class<Comparator>) cl));
public ClusteringFunction getClusteringFunction(String name, Map<String, Integer> params) throws PaceException {
@ -67,12 +60,4 @@ public class PaceResolver implements Serializable {
public Comparator getComparator(String name, Map<String, Number> params) throws PaceException {
try {
return comparators.get(name).getDeclaredConstructor(Map.class).newInstance(params);
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | NullPointerException e) {
throw new PaceException(name + " not found ", e);