From d0edb7b773e9a19d01c00209c3e1a1d07c82906d Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 2 Oct 2018 17:07:17 +0200 Subject: [PATCH] Added First Implementation of Spark Test --- dnet-dedup-test/pom.xml | 61 ++++++ .../main/java/eu/dnetlib/BlockProcessor.java | 190 ++++++++++++++++++ .../src/main/java/eu/dnetlib/Counters.java | 28 +++ .../src/main/java/eu/dnetlib/Reporter.java | 13 ++ .../src/main/java/eu/dnetlib/SparkTest.java | 108 ++++++++++ .../data/transform/AbstractProtoMapper.java | 150 ++++++++++++++ .../pace/model/ProtoDocumentBuilder.java | 36 ++++ .../java/eu/dnetlib/pace/utils/PaceUtils.java | 90 +++++++++ .../eu/dnetlib/proto/utils/OAFProtoUtils.java | 44 ++++ .../eu/dnetlib/pace/organization.pace.conf | 34 ++++ .../eu/dnetlib/pace/result.full.pace.conf | 51 +++++ .../eu/dnetlib/pace/result.simple.pace.conf | 21 ++ dnet-pace-core/pom.xml | 2 +- .../BlacklistAwareClusteringCombiner.java | 7 +- .../eu/dnetlib/pace/config/DedupConfig.java | 3 +- .../java/eu/dnetlib/pace/model/Field.java | 4 +- .../eu/dnetlib/pace/model/MapDocument.java | 3 +- 17 files changed, 839 insertions(+), 6 deletions(-) create mode 100644 dnet-dedup-test/src/main/java/eu/dnetlib/BlockProcessor.java create mode 100644 dnet-dedup-test/src/main/java/eu/dnetlib/Counters.java create mode 100644 dnet-dedup-test/src/main/java/eu/dnetlib/Reporter.java create mode 100644 dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java create mode 100644 dnet-dedup-test/src/main/java/eu/dnetlib/data/transform/AbstractProtoMapper.java create mode 100644 dnet-dedup-test/src/main/java/eu/dnetlib/pace/model/ProtoDocumentBuilder.java create mode 100644 dnet-dedup-test/src/main/java/eu/dnetlib/pace/utils/PaceUtils.java create mode 100644 dnet-dedup-test/src/main/java/eu/dnetlib/proto/utils/OAFProtoUtils.java create mode 100644 dnet-dedup-test/src/main/resources/eu/dnetlib/pace/organization.pace.conf create mode 100644 dnet-dedup-test/src/main/resources/eu/dnetlib/pace/result.full.pace.conf create mode 100644 dnet-dedup-test/src/main/resources/eu/dnetlib/pace/result.simple.pace.conf diff --git a/dnet-dedup-test/pom.xml b/dnet-dedup-test/pom.xml index 0b01f96..885a497 100644 --- a/dnet-dedup-test/pom.xml +++ b/dnet-dedup-test/pom.xml @@ -15,15 +15,76 @@ 1.0-SNAPSHOT + + + + + + + + + + + + + + + + + + + + + + + + + + + + eu.dnetlib + dnet-openaire-data-protos + 3.9.3-proto250 + + eu.dnetlib dnet-pace-core 2.6.8-SNAPSHOT + + org.apache.spark + spark-core_2.11 + ${spark.version} + + + + + + + + + + + + + + + + + + + + + + + 2.2.0 + + \ No newline at end of file diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/BlockProcessor.java b/dnet-dedup-test/src/main/java/eu/dnetlib/BlockProcessor.java new file mode 100644 index 0000000..db212ba --- /dev/null +++ b/dnet-dedup-test/src/main/java/eu/dnetlib/BlockProcessor.java @@ -0,0 +1,190 @@ +package eu.dnetlib; + +import com.google.common.collect.Lists; +import eu.dnetlib.pace.clustering.NGramUtils; +import eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.config.WfConfig; +import eu.dnetlib.pace.distance.PaceDocumentDistance; +import eu.dnetlib.pace.distance.eval.ScoreResult; +import eu.dnetlib.pace.model.Field; +import eu.dnetlib.pace.model.MapDocument; +import eu.dnetlib.pace.model.MapDocumentComparator; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import scala.Tuple2; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Stream; + +public class BlockProcessor { + + private static final Log log = LogFactory.getLog(BlockProcessor.class); + + private DedupConfig dedupConf; + + public BlockProcessor(DedupConfig dedupConf) { + this.dedupConf = dedupConf; + } + + public List> process(final String key, final Stream documents, final Reporter context) throws IOException, InterruptedException { + + final Queue q = prepare(documents); + + if (q.size() > 1) { + log.info("reducing key: '" + key + "' records: " + q.size()); + //process(q, context); + return process(simplifyQueue(q, key, context), context); + } else { + context.incrementCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1", 1); + return new ArrayList<>(); + } + } + + private Queue prepare(final Stream documents) { + final Queue queue = new PriorityQueue(100, new MapDocumentComparator(dedupConf.getWf().getOrderField())); + + final Set seen = new HashSet(); + final int queueMaxSize = dedupConf.getWf().getQueueMaxSize(); + + documents.forEach(doc -> { + if (queue.size() <= queueMaxSize) { + final String id = doc.getIdentifier(); + + if (!seen.contains(id)) { + seen.add(id); + queue.add(doc); + } + } + }); + + return queue; + } + + private Queue simplifyQueue(final Queue queue, final String ngram, final Reporter context) { + final Queue q = new LinkedList(); + + String fieldRef = ""; + final List tempResults = Lists.newArrayList(); + + while (!queue.isEmpty()) { + final MapDocument result = queue.remove(); + + final String orderFieldName = dedupConf.getWf().getOrderField(); + final Field orderFieldValue = result.values(orderFieldName); + if (!orderFieldValue.isEmpty()) { + final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue()); + if (field.equals(fieldRef)) { + tempResults.add(result); + } else { + populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram); + tempResults.clear(); + tempResults.add(result); + fieldRef = field; + } + } else { + context.incrementCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField(), 1); + } + } + populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram); + + return q; + } + + private void populateSimplifiedQueue(final Queue q, + final List tempResults, + final Reporter context, + final String fieldRef, + final String ngram) { + WfConfig wf = dedupConf.getWf(); + if (tempResults.size() < wf.getGroupMaxSize()) { + q.addAll(tempResults); + } else { + context.incrementCounter(wf.getEntityType(), String.format("Skipped records for count(%s) >= %s", wf.getOrderField(), wf.getGroupMaxSize()), tempResults.size()); + log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram); + } + } + + private List> process(final Queue queue, final Reporter context) throws IOException, InterruptedException { + + final PaceDocumentDistance algo = new PaceDocumentDistance(); + List> resultEmit = new ArrayList<>(); + + while (!queue.isEmpty()) { + + final MapDocument pivot = queue.remove(); + final String idPivot = pivot.getIdentifier(); + + WfConfig wf = dedupConf.getWf(); + final Field fieldsPivot = pivot.values(wf.getOrderField()); + final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue(); + + if (fieldPivot != null) { + // System.out.println(idPivot + " --> " + fieldPivot); + + int i = 0; + for (final MapDocument curr : queue) { + final String idCurr = curr.getIdentifier(); + + if (mustSkip(idCurr)) { + + context.incrementCounter(wf.getEntityType(), "skip list", 1); + + break; + } + + if (i > wf.getSlidingWindowSize()) { + break; + } + + final Field fieldsCurr = curr.values(wf.getOrderField()); + final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue(); + + if (!idCurr.equals(idPivot) && (fieldCurr != null)) { + + final ScoreResult sr = similarity(algo, pivot, curr); + emitOutput(sr, idPivot, idCurr,context, resultEmit); + i++; + } + } + } + } + return resultEmit; + } + + private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Reporter context,List> emitResult) throws IOException, InterruptedException { + final double d = sr.getScore(); + + if (d >= dedupConf.getWf().getThreshold()) { + + writeSimilarity(idPivot, idCurr, emitResult); + context.incrementCounter(dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)", 1); + } else { + context.incrementCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold(), 1); + } + } + + private ScoreResult similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) { + try { + return algo.between(a, b, dedupConf); + } catch(Throwable e) { + log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e); + throw new IllegalArgumentException(e); + } + } + + private boolean mustSkip(final String idPivot) { + return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot)); + } + + private String getNsPrefix(final String id) { + return StringUtils.substringBetween(id, "|", "::"); + } + + private void writeSimilarity( final String from, final String to, List> emitResult){ + emitResult.add(new Tuple2<>(from, to)); + emitResult.add(new Tuple2<>( to, from)); + } + +} diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/Counters.java b/dnet-dedup-test/src/main/java/eu/dnetlib/Counters.java new file mode 100644 index 0000000..f9ce0c6 --- /dev/null +++ b/dnet-dedup-test/src/main/java/eu/dnetlib/Counters.java @@ -0,0 +1,28 @@ +package eu.dnetlib; + +import org.apache.commons.logging.Log; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicLong; + +class Counters extends HashMap> implements Serializable { + + public AtomicLong get(String counterGroup, String counterName) { + if (!super.containsKey(counterGroup)) { + super.put(counterGroup, new HashMap<>()); + } + if (!super.get(counterGroup).containsKey(counterName)) { + super.get(counterGroup).put(counterName, new AtomicLong(0)); + } + return super.get(counterGroup).get(counterName); + } + + public void print(final Log log) { + entrySet().forEach(cg -> { + cg.getValue().entrySet().forEach(cn -> { + log.info(cg.getKey() + " " + cn.getKey() + " " + cn.getValue()); + }); + }); + } +} \ No newline at end of file diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/Reporter.java b/dnet-dedup-test/src/main/java/eu/dnetlib/Reporter.java new file mode 100644 index 0000000..6dc675a --- /dev/null +++ b/dnet-dedup-test/src/main/java/eu/dnetlib/Reporter.java @@ -0,0 +1,13 @@ +package eu.dnetlib; + + +import java.io.IOException; +import java.io.Serializable; + +public interface Reporter extends Serializable { + + void incrementCounter(String counterGroup, String counterName, long delta); + + void emit(final String type, final String from, final String to) throws IOException, InterruptedException; + +} diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java b/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java new file mode 100644 index 0000000..14a7abf --- /dev/null +++ b/dnet-dedup-test/src/main/java/eu/dnetlib/SparkTest.java @@ -0,0 +1,108 @@ +package eu.dnetlib; + +import com.google.common.collect.Sets; +import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner; +import eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.model.MapDocument; +import eu.dnetlib.pace.utils.PaceUtils; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import scala.Tuple2; + + +import java.io.IOException; +import java.io.StringWriter; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + + +public class SparkTest { + + class Results extends HashMap> { + public Results(Set keys) { + super(keys.size()); + keys.forEach(k -> put(k, new HashSet<>())); + } + } + + + public static void main(String[] args) { + final JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Hello World").setMaster("local[*]")); + final JavaRDD dataRDD = context.textFile("file:///Users/sandro/Downloads/organizations.json"); + + final Counters c = new Counters(); + + long count = dataRDD.mapToPair(it -> { + final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf")); + MapDocument mapDocument = PaceUtils.asMapDocument(config, it); + return new Tuple2<>(mapDocument.getIdentifier(), mapDocument); + }).reduceByKey((a, b) -> a).flatMapToPair(a -> { + final MapDocument currentDocument = a._2(); + final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf")); + return getGroupingKeys(config, currentDocument).stream() + .map(it -> new Tuple2<>(it, currentDocument)).collect(Collectors.toList()).iterator(); + }).groupByKey().flatMapToPair(it -> { + final DedupConfig config = DedupConfig.load(readFromClasspath("/eu/dnetlib/pace/organization.pace.conf")); + return process(config, it, c).iterator(); + }).count(); + + + System.out.println("total Element = " + count); + +// final MapDocument resA = result(config, "A", "Recent results from CDF"); +// final MapDocument resB = result(config, "B", "Recent results from CDF"); +// +// final ScoreResult sr = new PaceDocumentDistance().between(resA, resB, config); +// final double d = sr.getScore(); +// System.out.println(String.format(" d ---> %s", d)); + + } + + + static String readFromClasspath(final String filename) { + final StringWriter sw = new StringWriter(); + try { + IOUtils.copy(SparkTest.class.getResourceAsStream(filename), sw); + return sw.toString(); + } catch (final IOException e) { + throw new RuntimeException("cannot load resource from classpath: " + filename); + } + } + + + static Set getGroupingKeys(DedupConfig conf, MapDocument doc) { + return Sets.newHashSet(BlacklistAwareClusteringCombiner.filterAndCombine(doc, conf)); + } + + + static List> process(DedupConfig conf, Tuple2> entry, Counters c) { + try { + return new BlockProcessor(conf).process(entry._1(), StreamSupport.stream(entry._2().spliterator(),false), new Reporter() { + @Override + public void incrementCounter(String counterGroup, String counterName, long delta) { + c.get(counterGroup, counterName).addAndGet(delta); + } + + @Override + public void emit(String type, String from, String to) { + + } + }); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + return new ArrayList<>(); + } + } + + + + + + + + + +} diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/data/transform/AbstractProtoMapper.java b/dnet-dedup-test/src/main/java/eu/dnetlib/data/transform/AbstractProtoMapper.java new file mode 100644 index 0000000..2d5a6e0 --- /dev/null +++ b/dnet-dedup-test/src/main/java/eu/dnetlib/data/transform/AbstractProtoMapper.java @@ -0,0 +1,150 @@ +package eu.dnetlib.data.transform; + +import java.util.List; + +import org.apache.commons.lang.StringUtils; + +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.protobuf.Descriptors.EnumValueDescriptor; +import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.GeneratedMessage; +import com.google.protobuf.Message; +import com.googlecode.protobuf.format.JsonFormat; + +import eu.dnetlib.pace.config.Type; + +/** + * AbstractProtoMapper provide common navigation methods on the protocolbuffers Messages. + * + * @author claudio + */ +public abstract class AbstractProtoMapper { + + private static final String COND_WRAPPER = "\\{|\\}"; + private static final String COND_SEPARATOR = "#"; + /** The Constant PATH_SEPARATOR. */ + private static final String PATH_SEPARATOR = "/"; + + /** + * Process multi path. + * + * @param proto + * the proto + * @param paths + * the paths + * @return the list + */ + protected List processMultiPath(final GeneratedMessage proto, final List paths, final Type type) { + final List response = Lists.newArrayList(); + for (final String pathElements : paths) { + response.addAll(processPath(proto, pathElements, type)); + } + return response; + } + + /** + * Process path. + * + * @param proto + * the proto + * @param path + * the path + * @return the list + */ + protected List processPath(final GeneratedMessage proto, final String path, final Type type) { + return processPath(proto, Lists.newLinkedList(Splitter.on(PATH_SEPARATOR).trimResults().split(path)), type); + } + + /** + * Process path. + * + * @param proto + * the proto + * @param pathElements + * the list + * @return the list + */ + protected List processPath(final GeneratedMessage proto, final List pathElements, final Type type) { + + final List response = Lists.newArrayList(); + + if (pathElements.isEmpty()) throw new RuntimeException("ProtoBuf navigation path is empty"); + + final String fieldPathCond = pathElements.get(0); + + final String fieldPath = StringUtils.substringBefore(fieldPathCond, "["); + final String cond = getCondition(fieldPathCond); + + final FieldDescriptor fd = proto.getDescriptorForType().findFieldByName(fieldPath); + if ((fd != null)) { + if (fd.isRepeated()) { + final int count = proto.getRepeatedFieldCount(fd); + for (int i = 0; i < count; i++) { + final Object field = proto.getRepeatedField(fd, i); + response.addAll(generateFields(fd, field, pathElements, cond, type)); + } + } else { + final Object field = proto.getField(fd); + response.addAll(generateFields(fd, field, pathElements, cond, type)); + } + } else throw new IllegalArgumentException("Invalid protobuf path (field not found): " + StringUtils.join(pathElements, ">") + "\nMessage:\n" + proto); + + return response; + } + + /** + * Generate fields. + * + * @param fd + * the fd + * @param field + * the field + * @param list + * the list + * @return the list + */ + private List generateFields(final FieldDescriptor fd, final Object field, final List list, final String cond, final Type type) { + + final List res = Lists.newArrayList(); + if (field instanceof GeneratedMessage) { + if (list.size() > 1) { + + if (StringUtils.isBlank(cond)) return processPath((GeneratedMessage) field, list.subList(1, list.size()), type); + else { + + final List condPath = + Lists.newLinkedList(Splitter.on(COND_SEPARATOR).trimResults().split(StringUtils.substringBefore(cond, "="))); + + final String val = (String) Iterables.getOnlyElement(processPath((GeneratedMessage) field, condPath, type)); + final String condVal = StringUtils.substringAfter(cond, "=").replaceAll(COND_WRAPPER, "").trim(); + + return val.equals(condVal) ? processPath((GeneratedMessage) field, list.subList(1, list.size()), type) : res; + } + } + else if (Type.JSON.equals(type)) { + res.add(JsonFormat.printToString((Message) field)); + return res; + } else throw new RuntimeException("No primitive type found"); + } else { + if (list.size() == 1) { + + switch (fd.getType()) { + case ENUM: + res.add(((EnumValueDescriptor) field).getName()); + break; + default: + res.add(field); + break; + } + return res; + } + else throw new RuntimeException("Found a primitive type before the path end"); + } + } + + private String getCondition(final String fieldPathCond) { + return fieldPathCond.contains("[") ? StringUtils.substringAfter(fieldPathCond, "[").replace("]", "") : ""; + } +} \ No newline at end of file diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/pace/model/ProtoDocumentBuilder.java b/dnet-dedup-test/src/main/java/eu/dnetlib/pace/model/ProtoDocumentBuilder.java new file mode 100644 index 0000000..bb53424 --- /dev/null +++ b/dnet-dedup-test/src/main/java/eu/dnetlib/pace/model/ProtoDocumentBuilder.java @@ -0,0 +1,36 @@ +package eu.dnetlib.pace.model; + +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Maps; +import com.google.protobuf.GeneratedMessage; + +import eu.dnetlib.data.transform.AbstractProtoMapper; + +public class ProtoDocumentBuilder extends AbstractProtoMapper { + + public static MapDocument newInstance(final String id, final GeneratedMessage proto, final List fields) { + final Map fieldMap = new ProtoDocumentBuilder().generateFieldMap(proto, fields); + return new MapDocument(id, fieldMap); + } + + private Map generateFieldMap(final GeneratedMessage proto, final List fields) { + final Map fieldMap = Maps.newHashMap(); + + for (final FieldDef fd : fields) { + + final FieldList fl = new FieldListImpl(fd.getName(), fd.getType()); + + for (final Object o : processPath(proto, fd.getPathList(), fd.getType())) { + + fl.add(new FieldValueImpl(fd.getType(), fd.getName(), o)); + } + + fieldMap.put(fd.getName(), fl); + } + + return fieldMap; + } + +} \ No newline at end of file diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/pace/utils/PaceUtils.java b/dnet-dedup-test/src/main/java/eu/dnetlib/pace/utils/PaceUtils.java new file mode 100644 index 0000000..f1ff217 --- /dev/null +++ b/dnet-dedup-test/src/main/java/eu/dnetlib/pace/utils/PaceUtils.java @@ -0,0 +1,90 @@ +package eu.dnetlib.pace.utils; + +import com.google.common.collect.Lists; +import com.googlecode.protobuf.format.JsonFormat; +import eu.dnetlib.data.proto.OafProtos; +import eu.dnetlib.data.proto.ResultProtos; +import eu.dnetlib.pace.config.Config; +import eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.model.MapDocument; +import eu.dnetlib.pace.model.ProtoDocumentBuilder; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static eu.dnetlib.proto.utils.OAFProtoUtils.*; +import static eu.dnetlib.proto.utils.OAFProtoUtils.author; +import static eu.dnetlib.proto.utils.OAFProtoUtils.sp; + +public class PaceUtils { + + public static MapDocument result(final Config config, final String id, final String title) { + return result(config, id, title, null, new ArrayList<>(), null); + } + + public static MapDocument result(final Config config, final String id, final String title, final String date) { + return result(config, id, title, date, new ArrayList<>(), null); + } + + public static MapDocument result(final Config config, final String id, final String title, final String date, final List pid) { + return result(config, id, title, date, pid, null); + } + + public static MapDocument result(final Config config, final String id, final String title, final String date, final String pid) { + return result(config, id, title, date, pid, null); + } + + public static MapDocument result(final Config config, final String id, final String title, final String date, final String pid, final List authors) { + return result(config, id, title, date, Lists.newArrayList(pid), authors); + } + + public static MapDocument result(final Config config, final String id, final String title, final String date, final List pid, final List authors) { + final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder(); + if (!StringUtils.isBlank(title)) { + metadata.addTitle(getStruct(title, getQualifier("main title", "dnet:titles"))); + metadata.addTitle(getStruct(RandomStringUtils.randomAlphabetic(10), getQualifier("alternative title", "dnet:titles"))); + } + if (!StringUtils.isBlank(date)) { + metadata.setDateofacceptance(sf(date)); + } + + final OafProtos.OafEntity.Builder entity = oafEntity(id, eu.dnetlib.data.proto.TypeProtos.Type.result); + final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata); + + if (authors != null) { + result.getMetadataBuilder().addAllAuthor( + IntStream.range(0, authors.size()) + .mapToObj(i -> author(authors.get(i), i)) + .collect(Collectors.toCollection(LinkedList::new))); + } + + entity.setResult(result); + + if (pid != null) { + for (String p : pid) { + if (!StringUtils.isBlank(p)) { + entity.addPid(sp(p, "doi")); + //entity.addPid(sp(RandomStringUtils.randomAlphabetic(10), "oai")); + } + } + } + + final OafProtos.OafEntity build = entity.build(); + return ProtoDocumentBuilder.newInstance(id, build, config.model()); + } + + public static MapDocument asMapDocument(DedupConfig conf, final String json) { + OafProtos.OafEntity.Builder b = OafProtos.OafEntity.newBuilder(); + try { + JsonFormat.merge(json, b); + } catch (JsonFormat.ParseException e) { + throw new IllegalArgumentException(e); + } + return ProtoDocumentBuilder.newInstance(b.getId(), b.build(), conf.getPace().getModel()); + } +} diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/proto/utils/OAFProtoUtils.java b/dnet-dedup-test/src/main/java/eu/dnetlib/proto/utils/OAFProtoUtils.java new file mode 100644 index 0000000..4a32638 --- /dev/null +++ b/dnet-dedup-test/src/main/java/eu/dnetlib/proto/utils/OAFProtoUtils.java @@ -0,0 +1,44 @@ +package eu.dnetlib.proto.utils; + +import eu.dnetlib.data.proto.FieldTypeProtos; +import eu.dnetlib.data.proto.OafProtos; + +public class OAFProtoUtils { + + + public static FieldTypeProtos.Author author(final String s, int rank) { + final eu.dnetlib.pace.model.Person p = new eu.dnetlib.pace.model.Person(s, false); + final FieldTypeProtos.Author.Builder author = FieldTypeProtos.Author.newBuilder(); + if (p.isAccurate()) { + author.setName(p.getNormalisedFirstName()); + author.setSurname(p.getNormalisedSurname()); + } + author.setFullname(p.getNormalisedFullname()); + author.setRank(rank); + + return author.build(); + } + + public static FieldTypeProtos.StructuredProperty sp(final String pid, final String type) { + FieldTypeProtos.StructuredProperty.Builder pidSp = FieldTypeProtos.StructuredProperty.newBuilder().setValue(pid) + .setQualifier(FieldTypeProtos.Qualifier.newBuilder().setClassid(type).setClassname(type).setSchemeid("dnet:pid_types").setSchemename("dnet:pid_types")); + return pidSp.build(); + } + + public static FieldTypeProtos.StringField.Builder sf(final String s) { return FieldTypeProtos.StringField.newBuilder().setValue(s); } + + public static FieldTypeProtos.StructuredProperty.Builder getStruct(final String value, final FieldTypeProtos.Qualifier.Builder qualifier) { + return FieldTypeProtos.StructuredProperty.newBuilder().setValue(value).setQualifier(qualifier); + } + + public static FieldTypeProtos.Qualifier.Builder getQualifier(final String classname, final String schemename) { + return + FieldTypeProtos.Qualifier.newBuilder().setClassid(classname).setClassname(classname).setSchemeid(schemename).setSchemename(schemename); + } + + public static OafProtos.OafEntity.Builder oafEntity(final String id, final eu.dnetlib.data.proto.TypeProtos.Type type) { + final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder().setId(id).setType(type); + return entity; + } + +} diff --git a/dnet-dedup-test/src/main/resources/eu/dnetlib/pace/organization.pace.conf b/dnet-dedup-test/src/main/resources/eu/dnetlib/pace/organization.pace.conf new file mode 100644 index 0000000..0dcfe51 --- /dev/null +++ b/dnet-dedup-test/src/main/resources/eu/dnetlib/pace/organization.pace.conf @@ -0,0 +1,34 @@ +{ + "wf" : { + "threshold" : "0.85", + "dedupRun" : "001", + "entityType" : "organization", + "orderField" : "legalname", + "queueMaxSize" : "20000", + "groupMaxSize" : "20", + "slidingWindowSize" : "400", + "rootBuilder" : [ "organization", "projectOrganization_participation_isParticipant", "datasourceOrganization_provision_isProvidedBy" ], + "includeChildren" : "true" + }, + "pace" : { + "clustering" : [ + { "name" : "ngrampairs", "fields" : [ "legalname" ], "params" : { "max" : 1, "ngramLen" : "3" } }, + { "name" : "suffixprefix", "fields" : [ "legalname" ], "params" : { "max" : 1, "len" : "3" } }, + { "name" : "immutablefieldvalue", "fields" : [ "country" ], "params" : { } }, + { "name" : "spacetrimmingfieldvalue", "fields" : [ "legalshortname" ], "params" : { "randomLength" : "5" } }, + { "name" : "urlclustering", "fields" : [ "websiteurl" ], "params" : { } } + ], + "conditions" : [ + { "name" : "exactMatch", "fields" : [ "country" ] }, + { "name" : "mustBeDifferent", "fields" : [ "gridid" ] } + ], + "model" : [ + { "name" : "legalname", "algo" : "LevensteinTitle", "type" : "String", "weight" : "0.2", "ignoreMissing" : "false", "path" : "organization/metadata/legalname/value" }, + { "name" : "legalshortname", "algo" : "LevensteinTitle", "type" : "String", "weight" : "0.2", "ignoreMissing" : "true", "path" : "organization/metadata/legalshortname/value" }, + { "name" : "websiteurl", "algo" : "urlMatcher", "type" : "URL", "weight" : "0.6", "ignoreMissing" : "true", "path" : "organization/metadata/websiteurl/value", "params" : { "host" : 0.5, "path" : 0.5 } }, + { "name" : "country", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : "organization/metadata/country/classid" }, + { "name" : "gridid", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : "pid[qualifier#classid = {grid}]/value" } + ], + "blacklists" : { } + } +} \ No newline at end of file diff --git a/dnet-dedup-test/src/main/resources/eu/dnetlib/pace/result.full.pace.conf b/dnet-dedup-test/src/main/resources/eu/dnetlib/pace/result.full.pace.conf new file mode 100644 index 0000000..78529e0 --- /dev/null +++ b/dnet-dedup-test/src/main/resources/eu/dnetlib/pace/result.full.pace.conf @@ -0,0 +1,51 @@ +{ + "wf" : { + "threshold" : "0.99", + "run" : "001", + "entityType" : "result", + "orderField" : "title", + "queueMaxSize" : "2000", + "groupMaxSize" : "10", + "slidingWindowSize" : "200", + "rootBuilder" : [ "result" ], + "includeChildren" : "true" + }, + "pace" : { + "clustering" : [ + { "name" : "acronyms", "fields" : [ "title" ], "params" : { "max" : "1", "minLen" : "2", "maxLen" : "4"} }, + { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} }, + { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } } + ], + "conditions" : [ + { "name" : "yearMatch", "fields" : [ "dateofacceptance" ] }, + { "name" : "titleVersionMatch", "fields" : [ "title" ] }, + { "name" : "sizeMatch", "fields" : [ "authors" ] } , + { "name" : "pidMatch", "fields" : [ "pid" ] } + ], + "model" : [ + { "name" : "pid", "algo" : "Null", "type" : "JSON", "weight" : "0.0", "ignoreMissing" : "true", "path" : "pid", "overrideMatch" : "true" }, + { "name" : "title", "algo" : "JaroWinkler", "type" : "String", "weight" : "1.0", "ignoreMissing" : "false", "path" : "result/metadata/title[qualifier#classid = {main title}]/value" }, + { "name" : "dateofacceptance", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : "result/metadata/dateofacceptance/value" } , + { "name" : "authors", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : "result/metadata/author/fullname" } + ], + "blacklists" : { + "title" : [ + "^(Corpus Oral Dialectal \\(COD\\)\\.).*$", + "^(Kiri Karl Morgensternile).*$", + "^(\\[Eksliibris Aleksandr).*\\]$", + "^(\\[Eksliibris Aleksandr).*$", + "^(Eksliibris Aleksandr).*$", + "^(Kiri A\\. de Vignolles).*$", + "^(2 kirja Karl Morgensternile).*$", + "^(Pirita kloostri idaosa arheoloogilised).*$", + "^(Kiri tundmatule).*$", + "^(Kiri Jenaer Allgemeine Literaturzeitung toimetusele).*$", + "^(Eksliibris Nikolai Birukovile).*$", + "^(Eksliibris Nikolai Issakovile).*$", + "^(WHP Cruise Summary Information of section).*$", + "^(Measurement of the top quark\\-pair production cross section with ATLAS in pp collisions at).*$", + "^(Measurement of the spin\\-dependent structure function).*" + ] } + } + +} \ No newline at end of file diff --git a/dnet-dedup-test/src/main/resources/eu/dnetlib/pace/result.simple.pace.conf b/dnet-dedup-test/src/main/resources/eu/dnetlib/pace/result.simple.pace.conf new file mode 100644 index 0000000..0249fb8 --- /dev/null +++ b/dnet-dedup-test/src/main/resources/eu/dnetlib/pace/result.simple.pace.conf @@ -0,0 +1,21 @@ +{ + "wf" : { + "threshold" : "0.99", + "run" : "001", + "entityType" : "result", + "orderField" : "title", + "queueMaxSize" : "2000", + "groupMaxSize" : "10", + "slidingWindowSize" : "200", + "rootBuilder" : [ "result" ], + "includeChildren" : "true" + }, + "pace" : { + "conditions" : [ ], + "model" : [ + { "name" : "title", "algo" : "JaroWinkler", "type" : "String", "weight" : "1.0", "ignoreMissing" : "false", "path" : "result/metadata/title[qualifier#classid = {main title}]/value" } + ], + "blacklists" : { } + } + +} \ No newline at end of file diff --git a/dnet-pace-core/pom.xml b/dnet-pace-core/pom.xml index 8e0e0f7..cb31ed6 100644 --- a/dnet-pace-core/pom.xml +++ b/dnet-pace-core/pom.xml @@ -23,7 +23,7 @@ com.google.guava guava - ${google.guava.version} + 15.0 com.google.code.gson diff --git a/dnet-pace-core/src/main/java/eu/dnetlib/pace/clustering/BlacklistAwareClusteringCombiner.java b/dnet-pace-core/src/main/java/eu/dnetlib/pace/clustering/BlacklistAwareClusteringCombiner.java index b007853..4ecedc4 100644 --- a/dnet-pace-core/src/main/java/eu/dnetlib/pace/clustering/BlacklistAwareClusteringCombiner.java +++ b/dnet-pace-core/src/main/java/eu/dnetlib/pace/clustering/BlacklistAwareClusteringCombiner.java @@ -22,9 +22,12 @@ public class BlacklistAwareClusteringCombiner extends ClusteringCombiner { private static final Log log = LogFactory.getLog(BlacklistAwareClusteringCombiner.class); - public static Collection filterAndCombine(final MapDocument a, final Config conf, final Map> blacklists) { - final Document filtered = new BlacklistAwareClusteringCombiner().filter(a, blacklists); + + + public static Collection filterAndCombine(final MapDocument a, final Config conf) { + + final Document filtered = new BlacklistAwareClusteringCombiner().filter(a, conf.blacklists()); return combine(filtered, conf); } diff --git a/dnet-pace-core/src/main/java/eu/dnetlib/pace/config/DedupConfig.java b/dnet-pace-core/src/main/java/eu/dnetlib/pace/config/DedupConfig.java index 5116f36..eb04184 100644 --- a/dnet-pace-core/src/main/java/eu/dnetlib/pace/config/DedupConfig.java +++ b/dnet-pace-core/src/main/java/eu/dnetlib/pace/config/DedupConfig.java @@ -1,6 +1,7 @@ package eu.dnetlib.pace.config; import java.io.IOException; +import java.io.Serializable; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -19,7 +20,7 @@ import eu.dnetlib.pace.model.FieldDef; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -public class DedupConfig implements Config { +public class DedupConfig implements Config, Serializable { private static final Log log = LogFactory.getLog(DedupConfig.class); diff --git a/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/Field.java b/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/Field.java index 9c97ce3..4b7a73e 100644 --- a/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/Field.java +++ b/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/Field.java @@ -2,10 +2,12 @@ package eu.dnetlib.pace.model; import eu.dnetlib.pace.config.Type; +import java.io.Serializable; + /** * The Interface Field. */ -public interface Field extends Iterable { +public interface Field extends Iterable, Serializable { /** * Gets the name. diff --git a/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/MapDocument.java b/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/MapDocument.java index 74935de..77b7c12 100644 --- a/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/MapDocument.java +++ b/dnet-pace-core/src/main/java/eu/dnetlib/pace/model/MapDocument.java @@ -1,5 +1,6 @@ package eu.dnetlib.pace.model; +import java.io.Serializable; import java.util.Map; import java.util.Set; @@ -10,7 +11,7 @@ import com.google.common.collect.Maps; /** * The Class MapDocument. */ -public class MapDocument implements Document { +public class MapDocument implements Document, Serializable { /** The identifier. */ private String identifier;