forked from D-Net/dnet-hadoop
updated maven project structure
This commit is contained in:
parent
72ebf7c0f3
commit
1b46966383
|
@ -1,70 +1,60 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>dnet45-parent</artifactId>
|
||||
<version>1.0.0</version>
|
||||
<relativePath />
|
||||
<artifactId>dnet-dedup</artifactId>
|
||||
<version>3.0.0-SNAPSHOT</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
|
||||
<artifactId>dnet-pace-core</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>2.6.8-SNAPSHOT</version>
|
||||
<scm>
|
||||
<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-pace-core/trunk</developerConnection>
|
||||
</scm>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>edu.cmu</groupId>
|
||||
<artifactId>secondstring</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<version>15.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.code.gson</groupId>
|
||||
<artifactId>gson</artifactId>
|
||||
<version>${google.gson.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-lang</groupId>
|
||||
<artifactId>commons-lang</artifactId>
|
||||
<version>${commons.lang.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
<version>${commons.io.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-collections</groupId>
|
||||
<artifactId>commons-collections</artifactId>
|
||||
<version>${commons.collections.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.googlecode.protobuf-java-format</groupId>
|
||||
<artifactId>protobuf-java-format</artifactId>
|
||||
<version>1.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.antlr</groupId>
|
||||
<artifactId>stringtemplate</artifactId>
|
||||
<version>3.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-logging</groupId>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
<version>${commons.logging.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>${junit.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -0,0 +1,199 @@
|
|||
package eu.dnetlib.pace.util;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import eu.dnetlib.pace.clustering.NGramUtils;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.config.WfConfig;
|
||||
import eu.dnetlib.pace.distance.PaceDocumentDistance;
|
||||
import eu.dnetlib.pace.distance.eval.ScoreResult;
|
||||
import eu.dnetlib.pace.model.Field;
|
||||
import eu.dnetlib.pace.model.MapDocument;
|
||||
import eu.dnetlib.pace.model.MapDocumentComparator;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
public class BlockProcessor {
|
||||
|
||||
public static final List<String> accumulators= new ArrayList<>();
|
||||
|
||||
private static final Log log = LogFactory.getLog(BlockProcessor.class);
|
||||
|
||||
private DedupConfig dedupConf;
|
||||
|
||||
|
||||
public static void constructAccumulator( final DedupConfig dedupConf) {
|
||||
accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "records per hash key = 1"));
|
||||
accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()));
|
||||
accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), String.format("Skipped records for count(%s) >= %s", dedupConf.getWf().getOrderField(), dedupConf.getWf().getGroupMaxSize())));
|
||||
accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "skip list"));
|
||||
accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)"));
|
||||
accumulators.add(String.format("%s::%s",dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()));
|
||||
}
|
||||
|
||||
public BlockProcessor(DedupConfig dedupConf) {
|
||||
this.dedupConf = dedupConf;
|
||||
}
|
||||
|
||||
public void process(final String key, final Iterable<MapDocument> documents, final Reporter context) {
|
||||
|
||||
final Queue<MapDocument> q = prepare(documents);
|
||||
|
||||
if (q.size() > 1) {
|
||||
log.info("reducing key: '" + key + "' records: " + q.size());
|
||||
//process(q, context);
|
||||
process(simplifyQueue(q, key, context), context);
|
||||
} else {
|
||||
context.incrementCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1", 1);
|
||||
}
|
||||
}
|
||||
|
||||
private Queue<MapDocument> prepare(final Iterable<MapDocument> documents) {
|
||||
final Queue<MapDocument> queue = new PriorityQueue<>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));
|
||||
|
||||
final Set<String> seen = new HashSet<String>();
|
||||
final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
|
||||
|
||||
documents.forEach(doc -> {
|
||||
if (queue.size() <= queueMaxSize) {
|
||||
final String id = doc.getIdentifier();
|
||||
|
||||
if (!seen.contains(id)) {
|
||||
seen.add(id);
|
||||
queue.add(doc);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Reporter context) {
|
||||
final Queue<MapDocument> q = new LinkedList<>();
|
||||
|
||||
String fieldRef = "";
|
||||
final List<MapDocument> tempResults = Lists.newArrayList();
|
||||
|
||||
while (!queue.isEmpty()) {
|
||||
final MapDocument result = queue.remove();
|
||||
|
||||
final String orderFieldName = dedupConf.getWf().getOrderField();
|
||||
final Field orderFieldValue = result.values(orderFieldName);
|
||||
if (!orderFieldValue.isEmpty()) {
|
||||
final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
|
||||
if (field.equals(fieldRef)) {
|
||||
tempResults.add(result);
|
||||
} else {
|
||||
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
|
||||
tempResults.clear();
|
||||
tempResults.add(result);
|
||||
fieldRef = field;
|
||||
}
|
||||
} else {
|
||||
context.incrementCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField(), 1);
|
||||
}
|
||||
}
|
||||
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
|
||||
|
||||
return q;
|
||||
}
|
||||
|
||||
private void populateSimplifiedQueue(final Queue<MapDocument> q,
|
||||
final List<MapDocument> tempResults,
|
||||
final Reporter context,
|
||||
final String fieldRef,
|
||||
final String ngram) {
|
||||
WfConfig wf = dedupConf.getWf();
|
||||
if (tempResults.size() < wf.getGroupMaxSize()) {
|
||||
q.addAll(tempResults);
|
||||
} else {
|
||||
context.incrementCounter(wf.getEntityType(), String.format("Skipped records for count(%s) >= %s", wf.getOrderField(), wf.getGroupMaxSize()), tempResults.size());
|
||||
log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
|
||||
}
|
||||
}
|
||||
|
||||
private void process(final Queue<MapDocument> queue, final Reporter context) {
|
||||
|
||||
final PaceDocumentDistance algo = new PaceDocumentDistance();
|
||||
|
||||
while (!queue.isEmpty()) {
|
||||
|
||||
final MapDocument pivot = queue.remove();
|
||||
final String idPivot = pivot.getIdentifier();
|
||||
|
||||
WfConfig wf = dedupConf.getWf();
|
||||
final Field fieldsPivot = pivot.values(wf.getOrderField());
|
||||
final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
|
||||
|
||||
if (fieldPivot != null) {
|
||||
// System.out.println(idPivot + " --> " + fieldPivot);
|
||||
|
||||
int i = 0;
|
||||
for (final MapDocument curr : queue) {
|
||||
final String idCurr = curr.getIdentifier();
|
||||
|
||||
if (mustSkip(idCurr)) {
|
||||
|
||||
context.incrementCounter(wf.getEntityType(), "skip list", 1);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
if (i > wf.getSlidingWindowSize()) {
|
||||
break;
|
||||
}
|
||||
|
||||
final Field fieldsCurr = curr.values(wf.getOrderField());
|
||||
final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
|
||||
|
||||
if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
|
||||
|
||||
final ScoreResult sr = similarity(algo, pivot, curr);
|
||||
log.info(sr.toString()+"SCORE "+ sr.getScore());
|
||||
emitOutput(sr, idPivot, idCurr, context);
|
||||
i++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Reporter context) {
|
||||
final double d = sr.getScore();
|
||||
|
||||
if (d >= dedupConf.getWf().getThreshold()) {
|
||||
|
||||
writeSimilarity(context, idPivot, idCurr);
|
||||
context.incrementCounter(dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)", 1);
|
||||
} else {
|
||||
context.incrementCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
private ScoreResult similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) {
|
||||
try {
|
||||
return algo.between(a, b, dedupConf);
|
||||
} catch(Throwable e) {
|
||||
log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean mustSkip(final String idPivot) {
|
||||
return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
|
||||
}
|
||||
|
||||
private String getNsPrefix(final String id) {
|
||||
return StringUtils.substringBetween(id, "|", "::");
|
||||
}
|
||||
|
||||
private void writeSimilarity(final Reporter context, final String from, final String to) {
|
||||
final String type = dedupConf.getWf().getEntityType();
|
||||
|
||||
context.emit(type, from, to);
|
||||
context.emit(type, to, from);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
package eu.dnetlib.pace.util;
|
||||
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public interface Reporter extends Serializable {
|
||||
|
||||
void incrementCounter(String counterGroup, String counterName, long delta);
|
||||
|
||||
void emit(String type, String from, String to);
|
||||
}
|
Loading…
Reference in New Issue