From 2393d9da2f376890cd9fa62936aaef97b6580c8e Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Fri, 26 Jun 2020 11:20:45 +0200 Subject: [PATCH 1/4] limits --- .../dhp/broker/oa/matchers/UpdateMatcher.java | 33 ++++++++++--------- .../AbstractEnrichMissingDataset.java | 2 +- .../relatedProjects/EnrichMissingProject.java | 2 +- .../relatedProjects/EnrichMoreProject.java | 2 +- .../AbstractEnrichMissingPublication.java | 2 +- .../EnrichMissingSoftware.java | 2 +- .../relatedSoftware/EnrichMoreSoftware.java | 2 +- .../simple/EnrichMissingAbstract.java | 2 +- .../simple/EnrichMissingAuthorOrcid.java | 2 +- .../simple/EnrichMissingOpenAccess.java | 2 +- .../oa/matchers/simple/EnrichMissingPid.java | 2 +- .../simple/EnrichMissingPublicationDate.java | 2 +- .../matchers/simple/EnrichMissingSubject.java | 2 +- .../matchers/simple/EnrichMoreOpenAccess.java | 2 +- .../oa/matchers/simple/EnrichMorePid.java | 2 +- .../oa/matchers/simple/EnrichMoreSubject.java | 2 +- .../dhp/broker/oa/util/EventFinder.java | 27 ++++++++++----- 17 files changed, 51 insertions(+), 39 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java index c0287bda0..7f82f9a2b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java @@ -1,13 +1,14 @@ package eu.dnetlib.dhp.broker.oa.matchers; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; @@ -19,15 +20,15 @@ import eu.dnetlib.pace.config.DedupConfig; public abstract class UpdateMatcher { - private final boolean multipleUpdate; + private final int maxNumber; private final Function topicFunction; private final BiConsumer compileHighlightFunction; private final Function highlightToStringFunction; - public UpdateMatcher(final boolean multipleUpdate, final Function topicFunction, + public UpdateMatcher(final int maxNumber, final Function topicFunction, final BiConsumer compileHighlightFunction, final Function highlightToStringFunction) { - this.multipleUpdate = multipleUpdate; + this.maxNumber = maxNumber; this.topicFunction = topicFunction; this.compileHighlightFunction = compileHighlightFunction; this.highlightToStringFunction = highlightToStringFunction; @@ -57,17 +58,19 @@ public abstract class UpdateMatcher { } } - final Collection> values = infoMap.values(); + final List> values = infoMap + .values() + .stream() + .sorted((o1, o2) -> Float.compare(o2.getTrust(), o1.getTrust())) // DESCENDING + .collect(Collectors.toList()); - if (values.isEmpty() || multipleUpdate) { - return values; + if (values.isEmpty()) { + return new ArrayList<>(); + } else if (values.size() > maxNumber) { + System.err.println("Too many events (" + values.size() + ") matched by " + getClass().getSimpleName()); + return values.subList(0, maxNumber); } else { - final UpdateInfo v = values - .stream() - .sorted((o1, o2) -> Float.compare(o1.getTrust(), o2.getTrust())) - .findFirst() - .get(); - return Arrays.asList(v); + return values; } } @@ -81,8 +84,8 @@ public abstract class UpdateMatcher { return StringUtils.isBlank(field); } - public boolean isMultipleUpdate() { - return multipleUpdate; + public int getMaxNumber() { + return maxNumber; } public Function getTopicFunction() { diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java index c8b93596a..f21c1c7b3 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java @@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; public abstract class AbstractEnrichMissingDataset extends UpdateMatcher { public AbstractEnrichMissingDataset(final Topic topic) { - super(true, + super(10, rel -> topic, (p, rel) -> p.getDatasets().add(rel), rel -> rel.getOpenaireId()); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java index 49c546bba..4b563d381 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java @@ -12,7 +12,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; public class EnrichMissingProject extends UpdateMatcher { public EnrichMissingProject() { - super(true, + super(20, prj -> Topic.ENRICH_MISSING_PROJECT, (p, prj) -> p.getProjects().add(prj), prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode()); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java index 6954a3fb5..85b2cbe28 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java @@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; public class EnrichMoreProject extends UpdateMatcher { public EnrichMoreProject() { - super(true, + super(20, prj -> Topic.ENRICH_MORE_PROJECT, (p, prj) -> p.getProjects().add(prj), prj -> projectAsString(prj)); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java index cc4f68f87..f951131b1 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java @@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; public abstract class AbstractEnrichMissingPublication extends UpdateMatcher { public AbstractEnrichMissingPublication(final Topic topic) { - super(true, + super(10, rel -> topic, (p, rel) -> p.getPublications().add(rel), rel -> rel.getOpenaireId()); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java index d01f0c370..a638024bc 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java @@ -13,7 +13,7 @@ public class EnrichMissingSoftware extends UpdateMatcher { public EnrichMissingSoftware() { - super(true, + super(10, s -> Topic.ENRICH_MISSING_SOFTWARE, (p, s) -> p.getSoftwares().add(s), s -> s.getOpenaireId()); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java index a612b6074..2bc370187 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java @@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; public class EnrichMoreSoftware extends UpdateMatcher { public EnrichMoreSoftware() { - super(true, + super(10, s -> Topic.ENRICH_MORE_SOFTWARE, (p, s) -> p.getSoftwares().add(s), s -> s.getOpenaireId()); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java index 73462bae8..b61696e45 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java @@ -12,7 +12,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; public class EnrichMissingAbstract extends UpdateMatcher { public EnrichMissingAbstract() { - super(false, + super(1, s -> Topic.ENRICH_MISSING_ABSTRACT, (p, s) -> p.getAbstracts().add(s), s -> s); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java index 2a01188a9..7bbc43fe3 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java @@ -15,7 +15,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; public class EnrichMissingAuthorOrcid extends UpdateMatcher { public EnrichMissingAuthorOrcid() { - super(true, + super(40, aut -> Topic.ENRICH_MISSING_AUTHOR_ORCID, (p, aut) -> p.getCreators().add(aut), aut -> aut.getOrcid()); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java index 487382957..41a00dcd1 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java @@ -14,7 +14,7 @@ import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; public class EnrichMissingOpenAccess extends UpdateMatcher { public EnrichMissingOpenAccess() { - super(true, + super(20, i -> Topic.ENRICH_MISSING_OA_VERSION, (p, i) -> p.getInstances().add(i), OaBrokerInstance::getUrl); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java index ee1617b1e..4863bdeb7 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java @@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; public class EnrichMissingPid extends UpdateMatcher { public EnrichMissingPid() { - super(true, + super(10, pid -> Topic.ENRICH_MISSING_PID, (p, pid) -> p.getPids().add(pid), pid -> pid.getType() + "::" + pid.getValue()); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java index 2c0533fa3..e7b65dad8 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java @@ -12,7 +12,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; public class EnrichMissingPublicationDate extends UpdateMatcher { public EnrichMissingPublicationDate() { - super(false, + super(1, date -> Topic.ENRICH_MISSING_PUBLICATION_DATE, (p, date) -> p.setPublicationdate(date), s -> s); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java index 9ab9fce48..f762e3f52 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java @@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; public class EnrichMissingSubject extends UpdateMatcher { public EnrichMissingSubject() { - super(true, + super(20, s -> Topic.fromPath("ENRICH/MISSING/SUBJECT/" + s.getType()), (p, s) -> p.getSubjects().add(s), s -> subjectAsString(s)); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java index e90a8f201..9ce362a97 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java @@ -14,7 +14,7 @@ import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; public class EnrichMoreOpenAccess extends UpdateMatcher { public EnrichMoreOpenAccess() { - super(true, + super(20, i -> Topic.ENRICH_MORE_OA_VERSION, (p, i) -> p.getInstances().add(i), OaBrokerInstance::getUrl); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java index 43b4f0628..583960037 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java @@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; public class EnrichMorePid extends UpdateMatcher { public EnrichMorePid() { - super(true, + super(20, pid -> Topic.ENRICH_MORE_PID, (p, pid) -> p.getPids().add(pid), pid -> pidAsString(pid)); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java index 97b289b69..150029462 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java @@ -13,7 +13,7 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; public class EnrichMoreSubject extends UpdateMatcher { public EnrichMoreSubject() { - super(true, + super(20, s -> Topic.fromPath("ENRICH/MORE/SUBJECT/" + s.getType()), (p, s) -> p.getSubjects().add(s), s -> subjectAsString(s)); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java index 1a3f514e8..e142b5904 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java @@ -7,7 +7,16 @@ import java.util.List; import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.EventFactory; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMissingProject; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract; +import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid; +import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess; +import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid; +import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate; +import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject; +import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess; +import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid; +import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; import eu.dnetlib.pace.config.DedupConfig; @@ -16,17 +25,17 @@ public class EventFinder { private static List> matchers = new ArrayList<>(); static { matchers.add(new EnrichMissingAbstract()); - // matchers.add(new EnrichMissingAuthorOrcid()); - // matchers.add(new EnrichMissingOpenAccess()); - // matchers.add(new EnrichMissingPid()); - // matchers.add(new EnrichMissingPublicationDate()); - // matchers.add(new EnrichMissingSubject()); - // matchers.add(new EnrichMoreOpenAccess()); - // matchers.add(new EnrichMorePid()); - // matchers.add(new EnrichMoreSubject()); + matchers.add(new EnrichMissingAuthorOrcid()); + matchers.add(new EnrichMissingOpenAccess()); + matchers.add(new EnrichMissingPid()); + matchers.add(new EnrichMissingPublicationDate()); + matchers.add(new EnrichMissingSubject()); + matchers.add(new EnrichMoreOpenAccess()); + matchers.add(new EnrichMorePid()); + matchers.add(new EnrichMoreSubject()); // // Advanced matchers - // matchers.add(new EnrichMissingProject()); + matchers.add(new EnrichMissingProject()); // matchers.add(new EnrichMoreProject()); // matchers.add(new EnrichMissingSoftware()); // matchers.add(new EnrichMoreSoftware()); From 8d59fdf34eaa438d0362bd3393b0755570ee7b9d Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 26 Jun 2020 14:32:58 +0200 Subject: [PATCH 2/4] WIP: dataset based PrepareRelationsJob --- .../CreateRelatedEntitiesJob_phase2.java | 17 --- .../dhp/oa/provision/PrepareRelationsJob.java | 128 ++++++++++++++++-- .../dhp/oa/provision/RelationComparator.java | 43 ++++++ .../dhp/oa/provision/RelationList.java | 25 ++++ .../dhp/oa/provision/SortableRelation.java | 80 +++++++++++ .../model/ProvisionModelSupport.java | 7 +- .../provision/model/RelatedEntityWrapper.java | 4 - .../dhp/oa/provision/model/TypedRow.java | 64 --------- .../oa/provision/utils/XmlRecordFactory.java | 4 - 9 files changed, 271 insertions(+), 101 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationComparator.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationList.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SortableRelation.java delete mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java index 5ef30d6e1..bfcc648a3 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -19,7 +19,6 @@ import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; @@ -28,8 +27,6 @@ import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; -import eu.dnetlib.dhp.oa.provision.model.TypedRow; -import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; @@ -305,20 +302,6 @@ public class CreateRelatedEntitiesJob_phase2 { private static FilterFunction filterEmptyEntityFn() { return (FilterFunction) v -> Objects.nonNull(v.getEntity()); - /* - * return (FilterFunction) v -> Optional .ofNullable(v.getEntity()) .map(e -> - * StringUtils.isNotBlank(e.getId())) .orElse(false); - */ - } - - private static TypedRow getTypedRow(String type, OafEntity entity) - throws JsonProcessingException { - TypedRow t = new TypedRow(); - t.setType(type); - t.setDeleted(entity.getDataInfo().getDeletedbyinference()); - t.setId(entity.getId()); - t.setOaf(OBJECT_MAPPER.writeValueAsString(entity)); - return t; } private static void removeOutputDir(SparkSession spark, String path) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index 19823120c..bf9806787 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -3,28 +3,33 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.util.HashSet; -import java.util.Optional; -import java.util.Set; +import java.util.*; +import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner; import eu.dnetlib.dhp.schema.oaf.Relation; @@ -102,13 +107,15 @@ public class PrepareRelationsJob { log.info("maxRelations: {}", maxRelations); SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); runWithSparkSession( conf, isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareRelationsRDD( + prepareRelationsDataset( spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions); }); } @@ -125,9 +132,8 @@ public class PrepareRelationsJob { * @param maxRelations maximum number of allowed outgoing edges * @param relPartitions number of partitions for the output RDD */ - private static void prepareRelationsRDD( - SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, - int relPartitions) { + private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath, + Set relationFilter, int maxRelations, int relPartitions) { // group by SOURCE and apply limit RDD bySource = readPathRelationRDD(spark, inputRelationsPath) @@ -163,6 +169,108 @@ public class PrepareRelationsJob { .parquet(outputPath); } + private static void prepareRelationsDataset( + SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, + int relPartitions) { + + Dataset bySource = pruneRelations( + spark, inputRelationsPath, relationFilter, maxRelations, relPartitions, + (Function) r -> r.getSource()); + Dataset byTarget = pruneRelations( + spark, inputRelationsPath, relationFilter, maxRelations, relPartitions, + (Function) r -> r.getTarget()); + + bySource + .union(byTarget) + .repartition(relPartitions) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } + + private static Dataset pruneRelations(SparkSession spark, String inputRelationsPath, + Set relationFilter, int maxRelations, int relPartitions, + Function idFn) { + return readRelations(spark, inputRelationsPath, relationFilter, relPartitions) + .groupByKey( + (MapFunction) r -> idFn.call(r), + Encoders.STRING()) + .agg(new RelationAggregator(maxRelations).toColumn()) + .flatMap( + (FlatMapFunction, Relation>) t -> t + ._2() + .getRelations() + .iterator(), + Encoders.bean(Relation.class)); + } + + private static Dataset readRelations(SparkSession spark, String inputRelationsPath, + Set relationFilter, int relPartitions) { + return spark + .read() + .textFile(inputRelationsPath) + .repartition(relPartitions) + .map( + (MapFunction) s -> OBJECT_MAPPER.readValue(s, Relation.class), + Encoders.kryo(Relation.class)) + .filter((FilterFunction) rel -> rel.getDataInfo().getDeletedbyinference() == false) + .filter((FilterFunction) rel -> relationFilter.contains(rel.getRelClass()) == false); + } + + public static class RelationAggregator + extends Aggregator { + + private int maxRelations; + + public RelationAggregator(int maxRelations) { + this.maxRelations = maxRelations; + } + + @Override + public RelationList zero() { + return new RelationList(); + } + + @Override + public RelationList reduce(RelationList b, Relation a) { + b.getRelations().add(a); + return getSortableRelationList(b); + } + + @Override + public RelationList merge(RelationList b1, RelationList b2) { + b1.getRelations().addAll(b2.getRelations()); + return getSortableRelationList(b1); + } + + @Override + public RelationList finish(RelationList r) { + return getSortableRelationList(r); + } + + private RelationList getSortableRelationList(RelationList b1) { + RelationList sr = new RelationList(); + sr + .setRelations( + b1 + .getRelations() + .stream() + .limit(maxRelations) + .collect(Collectors.toCollection(() -> new PriorityQueue<>(new RelationComparator())))); + return sr; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.kryo(RelationList.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.kryo(RelationList.class); + } + } + /** * Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text * file, diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationComparator.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationComparator.java new file mode 100644 index 000000000..f2209c26c --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationComparator.java @@ -0,0 +1,43 @@ + +package eu.dnetlib.dhp.oa.provision; + +import java.util.Comparator; +import java.util.Map; +import java.util.Optional; + +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Maps; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class RelationComparator implements Comparator { + + private static final Map weights = Maps.newHashMap(); + + static { + weights.put("outcome", 0); + weights.put("supplement", 1); + weights.put("review", 2); + weights.put("citation", 3); + weights.put("affiliation", 4); + weights.put("relationship", 5); + weights.put("publicationDataset", 6); + weights.put("similarity", 7); + + weights.put("provision", 8); + weights.put("participation", 9); + weights.put("dedup", 10); + } + + private Integer getWeight(Relation o) { + return Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE); + } + + @Override + public int compare(Relation o1, Relation o2) { + return ComparisonChain + .start() + .compare(getWeight(o1), getWeight(o2)) + .result(); + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationList.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationList.java new file mode 100644 index 000000000..6e5fd7dba --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationList.java @@ -0,0 +1,25 @@ + +package eu.dnetlib.dhp.oa.provision; + +import java.io.Serializable; +import java.util.PriorityQueue; +import java.util.Queue; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class RelationList implements Serializable { + + private Queue relations; + + public RelationList() { + this.relations = new PriorityQueue<>(new RelationComparator()); + } + + public Queue getRelations() { + return relations; + } + + public void setRelations(Queue relations) { + this.relations = relations; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SortableRelation.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SortableRelation.java new file mode 100644 index 000000000..8ce92a6a0 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SortableRelation.java @@ -0,0 +1,80 @@ + +package eu.dnetlib.dhp.oa.provision; + +import java.io.Serializable; +import java.util.Map; +import java.util.Optional; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Maps; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class SortableRelation extends Relation implements Comparable, Serializable { + + private static final Map weights = Maps.newHashMap(); + + static { + weights.put("outcome", 0); + weights.put("supplement", 1); + weights.put("review", 2); + weights.put("citation", 3); + weights.put("affiliation", 4); + weights.put("relationship", 5); + weights.put("publicationDataset", 6); + weights.put("similarity", 7); + + weights.put("provision", 8); + weights.put("participation", 9); + weights.put("dedup", 10); + } + + private static final long serialVersionUID = 34753984579L; + + private String groupingKey; + + public static SortableRelation create(Relation r, String groupingKey) { + SortableRelation sr = new SortableRelation(); + sr.setGroupingKey(groupingKey); + sr.setSource(r.getSource()); + sr.setTarget(r.getTarget()); + sr.setRelType(r.getRelType()); + sr.setSubRelType(r.getSubRelType()); + sr.setRelClass(r.getRelClass()); + sr.setDataInfo(r.getDataInfo()); + sr.setCollectedfrom(r.getCollectedfrom()); + sr.setLastupdatetimestamp(r.getLastupdatetimestamp()); + sr.setProperties(r.getProperties()); + sr.setValidated(r.getValidated()); + sr.setValidationDate(r.getValidationDate()); + + return sr; + } + + @JsonIgnore + public Relation asRelation() { + return this; + } + + @Override + public int compareTo(SortableRelation o) { + return ComparisonChain + .start() + .compare(getGroupingKey(), o.getGroupingKey()) + .compare(getWeight(this), getWeight(o)) + .result(); + } + + private Integer getWeight(SortableRelation o) { + return Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE); + } + + public String getGroupingKey() { + return groupingKey; + } + + public void setGroupingKey(String groupingKey) { + this.groupingKey = groupingKey; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java index 051fe923d..c09ed86e5 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java @@ -5,6 +5,8 @@ import java.util.List; import com.google.common.collect.Lists; +import eu.dnetlib.dhp.oa.provision.RelationList; +import eu.dnetlib.dhp.oa.provision.SortableRelation; import eu.dnetlib.dhp.schema.common.ModelSupport; public class ProvisionModelSupport { @@ -15,11 +17,12 @@ public class ProvisionModelSupport { .addAll( Lists .newArrayList( - TypedRow.class, RelatedEntityWrapper.class, JoinedEntity.class, RelatedEntity.class, - SortableRelationKey.class)); + SortableRelationKey.class, + SortableRelation.class, + RelationList.class)); return modelClasses.toArray(new Class[] {}); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java index cbb143ee2..4a4a4a5be 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java @@ -16,10 +16,6 @@ public class RelatedEntityWrapper implements Serializable { } public RelatedEntityWrapper(Relation relation, RelatedEntity target) { - this(null, relation, target); - } - - public RelatedEntityWrapper(TypedRow entity, Relation relation, RelatedEntity target) { this.relation = relation; this.target = target; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java deleted file mode 100644 index cbec372e4..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java +++ /dev/null @@ -1,64 +0,0 @@ - -package eu.dnetlib.dhp.oa.provision.model; - -import java.io.Serializable; - -import com.google.common.base.Objects; - -public class TypedRow implements Serializable { - - private String id; - - private Boolean deleted; - - private String type; - - private String oaf; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public Boolean getDeleted() { - return deleted; - } - - public void setDeleted(Boolean deleted) { - this.deleted = deleted; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public String getOaf() { - return oaf; - } - - public void setOaf(String oaf) { - this.oaf = oaf; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - TypedRow typedRow2 = (TypedRow) o; - return Objects.equal(id, typedRow2.id); - } - - @Override - public int hashCode() { - return Objects.hashCode(id); - } -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java index 5d8d9fa20..db9a68d3d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java @@ -121,10 +121,6 @@ public class XmlRecordFactory implements Serializable { } } - private static OafEntity toOafEntity(TypedRow typedRow) { - return parseOaf(typedRow.getOaf(), typedRow.getType()); - } - private static OafEntity parseOaf(final String json, final String type) { try { switch (EntityType.valueOf(type)) { From 7817338e0510da3c798f8076625f6962ee207b87 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 26 Jun 2020 17:58:33 +0200 Subject: [PATCH 3/4] added test to verify the relation pre-processing --- .../dhp/oa/provision/PrepareRelationsJob.java | 68 ++++--------- .../dhp/oa/provision/oozie_app/workflow.xml | 1 + .../oa/provision/PrepareRelationsJobTest.java | 93 ++++++++++++++++++ .../eu/dnetlib/dhp/oa/provision/relations.gz | Bin 0 -> 681 bytes .../src/test/resources/log4j.properties | 11 +++ 5 files changed, 122 insertions(+), 51 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java create mode 100644 dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.gz create mode 100644 dhp-workflows/dhp-graph-provision/src/test/resources/log4j.properties diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index bf9806787..601cf6449 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -115,7 +115,7 @@ public class PrepareRelationsJob { isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareRelationsDataset( + prepareRelationsRDD( spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions); }); } @@ -148,21 +148,8 @@ public class PrepareRelationsJob { .map(Tuple2::_2) .rdd(); - // group by TARGET and apply limit - RDD byTarget = readPathRelationRDD(spark, inputRelationsPath) - .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) - .filter(rel -> relationFilter.contains(rel.getRelClass()) == false) - .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getTarget()), r)) - .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) - .groupBy(Tuple2::_1) - .map(Tuple2::_2) - .map(t -> Iterables.limit(t, maxRelations)) - .flatMap(Iterable::iterator) - .map(Tuple2::_2) - .rdd(); - spark - .createDataset(bySource.union(byTarget), Encoders.bean(Relation.class)) + .createDataset(bySource, Encoders.bean(Relation.class)) .repartition(relPartitions) .write() .mode(SaveMode.Overwrite) @@ -172,41 +159,7 @@ public class PrepareRelationsJob { private static void prepareRelationsDataset( SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, int relPartitions) { - - Dataset bySource = pruneRelations( - spark, inputRelationsPath, relationFilter, maxRelations, relPartitions, - (Function) r -> r.getSource()); - Dataset byTarget = pruneRelations( - spark, inputRelationsPath, relationFilter, maxRelations, relPartitions, - (Function) r -> r.getTarget()); - - bySource - .union(byTarget) - .repartition(relPartitions) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - } - - private static Dataset pruneRelations(SparkSession spark, String inputRelationsPath, - Set relationFilter, int maxRelations, int relPartitions, - Function idFn) { - return readRelations(spark, inputRelationsPath, relationFilter, relPartitions) - .groupByKey( - (MapFunction) r -> idFn.call(r), - Encoders.STRING()) - .agg(new RelationAggregator(maxRelations).toColumn()) - .flatMap( - (FlatMapFunction, Relation>) t -> t - ._2() - .getRelations() - .iterator(), - Encoders.bean(Relation.class)); - } - - private static Dataset readRelations(SparkSession spark, String inputRelationsPath, - Set relationFilter, int relPartitions) { - return spark + spark .read() .textFile(inputRelationsPath) .repartition(relPartitions) @@ -214,7 +167,20 @@ public class PrepareRelationsJob { (MapFunction) s -> OBJECT_MAPPER.readValue(s, Relation.class), Encoders.kryo(Relation.class)) .filter((FilterFunction) rel -> rel.getDataInfo().getDeletedbyinference() == false) - .filter((FilterFunction) rel -> relationFilter.contains(rel.getRelClass()) == false); + .filter((FilterFunction) rel -> relationFilter.contains(rel.getRelClass()) == false) + .groupByKey( + (MapFunction) Relation::getSource, + Encoders.STRING()) + .agg(new RelationAggregator(maxRelations).toColumn()) + .flatMap( + (FlatMapFunction, Relation>) t -> Iterables + .limit(t._2().getRelations(), maxRelations) + .iterator(), + Encoders.bean(Relation.class)) + .repartition(relPartitions) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); } public static class RelationAggregator diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 0d5121cf1..697a00a09 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -133,6 +133,7 @@ --inputRelationsPath${inputGraphRootPath}/relation --outputPath${workingDir}/relation + --maxRelations${maxRelations} --relPartitions5000 diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java new file mode 100644 index 000000000..c16bbc6fb --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java @@ -0,0 +1,93 @@ + +package eu.dnetlib.dhp.oa.provision; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; +import eu.dnetlib.dhp.schema.oaf.Relation; +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +public class PrepareRelationsJobTest { + + private static final Logger log = LoggerFactory.getLogger(PrepareRelationsJobTest.class); + + public static final String SUBRELTYPE = "subRelType"; + public static final String OUTCOME = "outcome"; + public static final String SUPPLEMENT = "supplement"; + + private static SparkSession spark; + + private static Path workingDir; + + @BeforeAll + public static void setUp() throws IOException { + workingDir = Files.createTempDirectory(PrepareRelationsJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); + + spark = SparkSession + .builder() + .appName(PrepareRelationsJobTest.class.getSimpleName()) + .master("local[*]") + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void testRunPrepareRelationsJob(@TempDir Path testPath) throws Exception { + + final int maxRelations = 10; + PrepareRelationsJob + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-inputRelationsPath", getClass().getResource("relations.gz").getPath(), + "-outputPath", testPath.toString(), + "-relPartitions", "10", + "-relationFilter", "asd", + "-maxRelations", String.valueOf(maxRelations) + }); + + Dataset out = spark.read() + .parquet(testPath.toString()) + .as(Encoders.bean(Relation.class)) + .cache(); + + Assertions.assertEquals(10, out.count()); + + Dataset freq = out.toDF().cube(SUBRELTYPE).count().filter((FilterFunction) value -> !value.isNullAt(0)); + long outcome = freq.filter(freq.col(SUBRELTYPE).equalTo(OUTCOME)).collectAsList().get(0).getAs("count"); + long supplement = freq.filter(freq.col(SUBRELTYPE).equalTo(SUPPLEMENT)).collectAsList().get(0).getAs("count"); + + Assertions.assertTrue(outcome > supplement); + Assertions.assertEquals(7, outcome); + Assertions.assertEquals(3, supplement); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.gz b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.gz new file mode 100644 index 0000000000000000000000000000000000000000..13bc01c8c37f15ce901e259b2510c7495cd9316d GIT binary patch literal 681 zcmV;a0#^MWiwFpQ^YvZ;19D|-VRUJ4ZgT+b+cA&RFc`-1olkM{jL=<%FdxQ~3_8E7O|s)qCN@sT zXqQ)2oTKgi7}CLnF~(Qzejj8=HqAkvZA@WSd6x3Rp!;QJ^`yf|V&2&DB$^lLVP9_Kh12Bou}HFgURC!#KYrNko9%A5-!!{z zbNQEELb4LC2N}eDUN*hVr7qXsXD`>wI8LD)r)tXObo_Q3Fiy#gd8^BQba8`WRvKDIfV=!R*u^F)am<`x|EC*~q z#sjt=`(fRFd2GRpFV-_4Y6N6KfCDlizyaA1;DC$>a6ncBI3P1(-GF{NI%|sIm+TRR z-$Pf-$fT%(kzr8 Date: Mon, 29 Jun 2020 08:43:56 +0200 Subject: [PATCH 4/4] all events matchers --- .../dhp/broker/oa/util/EventFinder.java | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java index e142b5904..6dfca4fcb 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java @@ -7,7 +7,20 @@ import java.util.List; import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.EventFactory; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy; +import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsRelatedTo; +import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedBy; +import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedTo; +import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetReferences; import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMissingProject; +import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMoreProject; +import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsReferencedBy; +import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsRelatedTo; +import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy; +import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo; +import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences; +import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMissingSoftware; +import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMoreSoftware; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess; @@ -36,20 +49,20 @@ public class EventFinder { // // Advanced matchers matchers.add(new EnrichMissingProject()); - // matchers.add(new EnrichMoreProject()); - // matchers.add(new EnrichMissingSoftware()); - // matchers.add(new EnrichMoreSoftware()); - // matchers.add(new EnrichMissingPublicationIsRelatedTo()); - // matchers.add(new EnrichMissingPublicationIsReferencedBy()); - // matchers.add(new EnrichMissingPublicationReferences()); - // matchers.add(new EnrichMissingPublicationIsSupplementedTo()); - // matchers.add(new EnrichMissingPublicationIsSupplementedBy()); - // matchers.add(new EnrichMissingDatasetIsRelatedTo()); - // matchers.add(new EnrichMissingDatasetIsReferencedBy()); - // matchers.add(new EnrichMissingDatasetReferences()); - // matchers.add(new EnrichMissingDatasetIsSupplementedTo()); - // matchers.add(new EnrichMissingDatasetIsSupplementedBy()); - // matchers.add(new EnrichMissingAbstract()); + matchers.add(new EnrichMoreProject()); + matchers.add(new EnrichMissingSoftware()); + matchers.add(new EnrichMoreSoftware()); + matchers.add(new EnrichMissingPublicationIsRelatedTo()); + matchers.add(new EnrichMissingPublicationIsReferencedBy()); + matchers.add(new EnrichMissingPublicationReferences()); + matchers.add(new EnrichMissingPublicationIsSupplementedTo()); + matchers.add(new EnrichMissingPublicationIsSupplementedBy()); + matchers.add(new EnrichMissingDatasetIsRelatedTo()); + matchers.add(new EnrichMissingDatasetIsReferencedBy()); + matchers.add(new EnrichMissingDatasetReferences()); + matchers.add(new EnrichMissingDatasetIsSupplementedTo()); + matchers.add(new EnrichMissingDatasetIsSupplementedBy()); + matchers.add(new EnrichMissingAbstract()); } public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) {