From 04fdcacd837957a44813c592ba0bf528b68d7946 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 11 Jun 2020 11:25:18 +0200 Subject: [PATCH] results with all joined entities --- .../dhp/broker/model/EventFactory.java | 6 +- .../broker/oa/GenerateEventsApplication.java | 303 ++++-------------- .../dhp/broker/oa/matchers/UpdateMatcher.java | 11 +- .../AbstractEnrichMissingDataset.java | 28 +- .../EnrichMissingDatasetIsReferencedBy.java | 5 + .../EnrichMissingDatasetIsRelatedTo.java | 5 + .../EnrichMissingDatasetIsSupplementedBy.java | 5 + .../EnrichMissingDatasetIsSupplementedTo.java | 5 + .../EnrichMissingDatasetReferences.java | 5 + .../relatedProjects/EnrichMissingProject.java | 23 +- .../relatedProjects/EnrichMoreProject.java | 23 +- .../AbstractEnrichMissingPublication.java | 27 +- ...nrichMissingPublicationIsReferencedBy.java | 4 + .../EnrichMissingPublicationIsRelatedTo.java | 5 + ...ichMissingPublicationIsSupplementedBy.java | 4 + ...ichMissingPublicationIsSupplementedTo.java | 5 + .../EnrichMissingPublicationReferences.java | 5 + .../EnrichMissingSoftware.java | 23 +- .../relatedSoftware/EnrichMoreSoftware.java | 23 +- .../simple/EnrichMissingAbstract.java | 17 +- .../simple/EnrichMissingAuthorOrcid.java | 11 +- .../simple/EnrichMissingOpenAccess.java | 13 +- .../oa/matchers/simple/EnrichMissingPid.java | 14 +- .../simple/EnrichMissingPublicationDate.java | 18 +- .../matchers/simple/EnrichMissingSubject.java | 13 +- .../matchers/simple/EnrichMoreOpenAccess.java | 13 +- .../oa/matchers/simple/EnrichMorePid.java | 13 +- .../oa/matchers/simple/EnrichMoreSubject.java | 13 +- .../dhp/broker/oa/util/EventFinder.java | 86 +++++ .../dhp/broker/oa/util/UpdateInfo.java | 20 +- .../simple}/ResultAggregator.java | 8 +- .../{ => aggregators/simple}/ResultGroup.java | 10 +- .../aggregators/withRels/RelatedDataset.java | 30 ++ .../withRels/RelatedEntityFactory.java | 28 ++ .../aggregators/withRels/RelatedProject.java | 30 ++ .../withRels/RelatedPublication.java | 
30 ++ .../aggregators/withRels/RelatedSoftware.java | 30 ++ .../withRels/ResultWithRelations.java | 55 ++++ .../ResultWithRelationsAggregator.java | 68 ++++ 39 files changed, 650 insertions(+), 385 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java rename dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/{ => aggregators/simple}/ResultAggregator.java (75%) rename dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/{ => aggregators/simple}/ResultGroup.java (58%) create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/ResultWithRelations.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/ResultWithRelationsAggregator.java diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java index df33fae0da..9146cf422f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java +++ 
b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java @@ -37,7 +37,7 @@ public class EventFactory { final Map map = createMapFromResult(updateInfo); final String eventId = calculateEventId( - updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0), + updateInfo.getTopicPath(), updateInfo.getTarget().getResult().getOriginalId().get(0), updateInfo.getHighlightValueAsString()); res.setEventId(eventId); @@ -54,8 +54,8 @@ public class EventFactory { private static Map createMapFromResult(final UpdateInfo updateInfo) { final Map map = new HashMap<>(); - final Result source = updateInfo.getSource(); - final Result target = updateInfo.getTarget(); + final Result source = updateInfo.getSource().getResult(); + final Result target = updateInfo.getTarget().getResult(); final List collectedFrom = target.getCollectedfrom(); if (collectedFrom.size() == 1) { diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index ecf4e3eff4..a097671927 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -3,15 +3,9 @@ package eu.dnetlib.dhp.broker.oa; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; @@ -26,38 +20,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import 
eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.model.Event; -import eu.dnetlib.dhp.broker.model.EventFactory; -import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy; -import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsRelatedTo; -import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedBy; -import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedTo; -import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetReferences; -import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMissingProject; -import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMoreProject; -import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsReferencedBy; -import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsRelatedTo; -import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy; -import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo; -import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences; -import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMissingSoftware; -import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMoreSoftware; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject; -import 
eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; +import eu.dnetlib.dhp.broker.oa.util.EventFinder; import eu.dnetlib.dhp.broker.oa.util.EventGroup; -import eu.dnetlib.dhp.broker.oa.util.ResultAggregator; -import eu.dnetlib.dhp.broker.oa.util.ResultGroup; -import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; +import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator; +import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelationsAggregator; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Relation; @@ -72,40 +44,6 @@ public class GenerateEventsApplication { private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class); - // Simple Matchers - private static final UpdateMatcher enrichMissingAbstract = new EnrichMissingAbstract(); - private static final UpdateMatcher enrichMissingAuthorOrcid = new EnrichMissingAuthorOrcid(); - private static final UpdateMatcher enrichMissingOpenAccess = new EnrichMissingOpenAccess(); - private static final UpdateMatcher enrichMissingPid = new EnrichMissingPid(); - private static final UpdateMatcher enrichMissingPublicationDate = new EnrichMissingPublicationDate(); - private static final UpdateMatcher enrichMissingSubject = new EnrichMissingSubject(); - private static final UpdateMatcher enrichMoreOpenAccess = new 
EnrichMoreOpenAccess(); - private static final UpdateMatcher enrichMorePid = new EnrichMorePid(); - private static final UpdateMatcher enrichMoreSubject = new EnrichMoreSubject(); - - // Advanced matchers - private static final UpdateMatcher>, ?> enrichMissingProject = new EnrichMissingProject(); - private static final UpdateMatcher>, ?> enrichMoreProject = new EnrichMoreProject(); - - private static final UpdateMatcher>, ?> enrichMissingSoftware = new EnrichMissingSoftware(); - private static final UpdateMatcher>, ?> enrichMoreSoftware = new EnrichMoreSoftware(); - - private static final UpdateMatcher>, ?> enrichMisissingPublicationIsRelatedTo = new EnrichMissingPublicationIsRelatedTo(); - private static final UpdateMatcher>, ?> enrichMissingPublicationIsReferencedBy = new EnrichMissingPublicationIsReferencedBy(); - private static final UpdateMatcher>, ?> enrichMissingPublicationReferences = new EnrichMissingPublicationReferences(); - private static final UpdateMatcher>, ?> enrichMissingPublicationIsSupplementedTo = new EnrichMissingPublicationIsSupplementedTo(); - private static final UpdateMatcher>, ?> enrichMissingPublicationIsSupplementedBy = new EnrichMissingPublicationIsSupplementedBy(); - - private static final UpdateMatcher>, ?> enrichMisissingDatasetIsRelatedTo = new EnrichMissingDatasetIsRelatedTo(); - private static final UpdateMatcher>, ?> enrichMissingDatasetIsReferencedBy = new EnrichMissingDatasetIsReferencedBy(); - private static final UpdateMatcher>, ?> enrichMissingDatasetReferences = new EnrichMissingDatasetReferences(); - private static final UpdateMatcher>, ?> enrichMissingDatasetIsSupplementedTo = new EnrichMissingDatasetIsSupplementedTo(); - private static final UpdateMatcher>, ?> enrichMissingDatasetIsSupplementedBy = new EnrichMissingDatasetIsSupplementedBy(); - - // Aggregators - private static final TypedColumn, ResultGroup> resultAggrTypedColumn = new ResultAggregator() - .toColumn(); - private static final ObjectMapper OBJECT_MAPPER = 
new ObjectMapper(); public static void main(final String[] args) throws Exception { @@ -145,14 +83,10 @@ public class GenerateEventsApplication { final Dataset all = spark.emptyDataset(Encoders.kryo(Event.class)); for (final Class r1 : BrokerConstants.RESULT_CLASSES) { - all.union(generateSimpleEvents(spark, graphPath, r1, dedupConfig)); - - for (final Class r2 : BrokerConstants.RESULT_CLASSES) { - all.union(generateRelationEvents(spark, graphPath, r1, r2, dedupConfig)); - } + all.union(generateEvents(spark, graphPath, r1, dedupConfig)); } - all.write().mode(SaveMode.Overwrite).json(eventsPath); + all.write().mode(SaveMode.Overwrite).option("compression", "gzip").json(eventsPath); }); } @@ -161,203 +95,83 @@ public class GenerateEventsApplication { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } - private static Dataset generateSimpleEvents(final SparkSession spark, + private static Dataset generateEvents( + final SparkSession spark, final String graphPath, - final Class resultClazz, + final Class sourceClass, final DedupConfig dedupConfig) { - final Dataset results = readPath( - spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), Result.class) - .filter(r -> r.getDataInfo().getDeletedbyinference()); + final Dataset results = expandResultsWithRelations(spark, graphPath, sourceClass); final Dataset mergedRels = readPath(spark, graphPath + "/relation", Relation.class) .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); + final TypedColumn, ResultGroup> aggr = new ResultAggregator() + .toColumn(); + return results - .joinWith(mergedRels, results.col("id").equalTo(mergedRels.col("source")), "inner") - .groupByKey((MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) - .agg(resultAggrTypedColumn) + .joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner") + .groupByKey( + (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) + .agg(aggr) 
.map((MapFunction, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) .filter(ResultGroup::isValid) .map( - (MapFunction) g -> GenerateEventsApplication - .generateSimpleEvents(g, dedupConfig), + (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), Encoders.kryo(EventGroup.class)) .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); } - private static EventGroup generateSimpleEvents(final ResultGroup results, final DedupConfig dedupConfig) { - final List> list = new ArrayList<>(); - - for (final Result target : results.getData()) { - list.addAll(enrichMissingAbstract.searchUpdatesForRecord(target, results.getData(), dedupConfig)); - list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, results.getData(), dedupConfig)); - list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, results.getData(), dedupConfig)); - list.addAll(enrichMissingPid.searchUpdatesForRecord(target, results.getData(), dedupConfig)); - list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, results.getData(), dedupConfig)); - list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, results.getData(), dedupConfig)); - list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, results.getData(), dedupConfig)); - list.addAll(enrichMorePid.searchUpdatesForRecord(target, results.getData(), dedupConfig)); - list.addAll(enrichMoreSubject.searchUpdatesForRecord(target, results.getData(), dedupConfig)); - } - - final EventGroup events = new EventGroup(); - list.stream().map(EventFactory::newBrokerEvent).forEach(events::addElement); - return events; - } - - private static Dataset generateRelationEvents( + private static Dataset expandResultsWithRelations( final SparkSession spark, final String graphPath, - final Class sourceClass, - final Class targetClass, - final DedupConfig dedupConfig) { - - final Dataset sources = readPath( - spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) - 
.filter(r -> r.getDataInfo().getDeletedbyinference()); - - final Dataset targets = readPath( - spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass); - - final Dataset mergedRels = readPath(spark, graphPath + "/relation", Relation.class) - .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); + final Class sourceClass) { + final Dataset projects = readPath(spark, graphPath + "/project", Project.class); + final Dataset datasets = readPath( + spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); + final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); + final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class); final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); - final Dataset duplicates = sources - .joinWith(mergedRels, sources.col("id").equalTo(rels.col("source")), "inner") - .groupByKey((MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) - .agg(resultAggrTypedColumn) - .map((MapFunction, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) - .filter(ResultGroup::isValid); + final Dataset r0 = readPath( + spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) + .filter(r -> r.getDataInfo().getDeletedbyinference()) + .map(r -> new ResultWithRelations(r), Encoders.kryo(ResultWithRelations.class)); + final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); + final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class)); + final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class)); + final Dataset r4 = join( + r3, rels, relatedEntities(publications, rels, RelatedProject.class)); + ; - if (targetClass == Project.class) { - // TODO join using: generateProjectsEvents - } else if (targetClass == Software.class) 
{ - // TODO join using: generateSoftwareEvents - } else if (targetClass == Publication.class) { - // TODO join using: generatePublicationRelatedEvents - } else if (targetClass == eu.dnetlib.dhp.schema.oaf.Dataset.class) { - // TODO join using: generateDatasetRelatedEvents - } - - return null; + return r4; } - private List generateProjectsEvents(final Collection>> childrenWithProjects, - final DedupConfig dedupConfig) { - final List> list = new ArrayList<>(); - - for (final Pair> target : childrenWithProjects) { - list.addAll(enrichMissingProject.searchUpdatesForRecord(target, childrenWithProjects, dedupConfig)); - list.addAll(enrichMoreProject.searchUpdatesForRecord(target, childrenWithProjects, dedupConfig)); - } - - return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList()); + private static Dataset relatedEntities(final Dataset targets, final Dataset rels, + final Class clazz) { + return rels + .joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner") + .map( + t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), + Encoders.kryo(clazz)); } - private List generateSoftwareEvents(final Collection>> childrenWithSoftwares, - final DedupConfig dedupConfig) { - final List> list = new ArrayList<>(); + private static Dataset join(final Dataset sources, + final Dataset rels, + final Dataset typedRels) { - for (final Pair> target : childrenWithSoftwares) { - list.addAll(enrichMissingSoftware.searchUpdatesForRecord(target, childrenWithSoftwares, dedupConfig)); - list.addAll(enrichMoreSoftware.searchUpdatesForRecord(target, childrenWithSoftwares, dedupConfig)); - } - return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList()); - } - - private List generatePublicationRelatedEvents(final String relType, - final Collection>>> childrenWithRels, - final DedupConfig dedupConfig) { - - final List> list = new ArrayList<>(); - - final List>> cleanedChildrens = childrenWithRels - 
.stream() - .filter(p -> p.getRight().containsKey(relType)) - .map(p -> Pair.of(p.getLeft(), p.getRight().get(relType))) - .filter(p -> p.getRight().size() > 0) - .collect(Collectors.toList()); - - for (final Pair> target : cleanedChildrens) { - if (relType.equals("isRelatedTo")) { - list - .addAll( - enrichMisissingPublicationIsRelatedTo - .searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); - } else if (relType.equals("references")) { - list - .addAll( - enrichMissingPublicationReferences - .searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); - } else if (relType.equals("isReferencedBy")) { - list - .addAll( - enrichMissingPublicationIsReferencedBy - .searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); - } else if (relType.equals("isSupplementedTo")) { - list - .addAll( - enrichMissingPublicationIsSupplementedTo - .searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); - } else if (relType.equals("isSupplementedBy")) { - list - .addAll( - enrichMissingPublicationIsSupplementedBy - .searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); - } - } - - return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList()); - - } - - private List generateDatasetRelatedEvents(final String relType, - final Collection>>> childrenWithRels, - final DedupConfig dedupConfig) { - - final List> list = new ArrayList<>(); - - final List>> cleanedChildrens = childrenWithRels - .stream() - .filter(p -> p.getRight().containsKey(relType)) - .map(p -> Pair.of(p.getLeft(), p.getRight().get(relType))) - .filter(p -> p.getRight().size() > 0) - .collect(Collectors.toList()); - - for (final Pair> target : cleanedChildrens) { - if (relType.equals("isRelatedTo")) { - list - .addAll( - enrichMisissingDatasetIsRelatedTo - .searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); - } else if (relType.equals("references")) { - list - .addAll( - enrichMissingDatasetReferences.searchUpdatesForRecord(target, cleanedChildrens, 
dedupConfig)); - } else if (relType.equals("isReferencedBy")) { - list - .addAll( - enrichMissingDatasetIsReferencedBy - .searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); - } else if (relType.equals("isSupplementedTo")) { - list - .addAll( - enrichMissingDatasetIsSupplementedTo - .searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); - } else if (relType.equals("isSupplementedBy")) { - list - .addAll( - enrichMissingDatasetIsSupplementedBy - .searchUpdatesForRecord(target, cleanedChildrens, dedupConfig)); - } - } - - return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList()); + final TypedColumn, ResultWithRelations> aggr = new ResultWithRelationsAggregator() + .toColumn(); + ; + return sources + .joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer") + .groupByKey( + (MapFunction, String>) t -> t._1.getResult().getId(), Encoders.STRING()) + .agg(aggr) + .map(t -> t._2, Encoders.kryo(ResultWithRelations.class)); } public static Dataset readPath( @@ -386,7 +200,6 @@ public class GenerateEventsApplication { // dedupConfig.getWf().setConfigurationId("???"); return dedupConfig; - } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java index 95d43ae686..fd87d81dd6 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java @@ -11,10 +11,11 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.dhp.schema.oaf.Field; import eu.dnetlib.pace.config.DedupConfig; -public abstract class 
UpdateMatcher { +public abstract class UpdateMatcher { private final boolean multipleUpdate; @@ -22,12 +23,13 @@ public abstract class UpdateMatcher { this.multipleUpdate = multipleUpdate; } - public Collection> searchUpdatesForRecord(final K res, final Collection others, + public Collection> searchUpdatesForRecord(final ResultWithRelations res, + final Collection others, final DedupConfig dedupConfig) { final Map> infoMap = new HashMap<>(); - for (final K source : others) { + for (final ResultWithRelations source : others) { if (source != res) { for (final UpdateInfo info : findUpdates(source, res, dedupConfig)) { final String s = DigestUtils.md5Hex(info.getHighlightValueAsString()); @@ -53,7 +55,8 @@ public abstract class UpdateMatcher { } } - protected abstract List> findUpdates(K source, K target, DedupConfig dedupConfig); + protected abstract List> findUpdates(ResultWithRelations source, ResultWithRelations target, + DedupConfig dedupConfig); protected static boolean isMissing(final List> list) { return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0).getValue()); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java index 3cf7b18f9a..a2ce32a9d4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java @@ -5,18 +5,17 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; - import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import 
eu.dnetlib.dhp.broker.oa.util.UpdateInfo; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.dhp.schema.oaf.Dataset; -import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.pace.config.DedupConfig; public abstract class AbstractEnrichMissingDataset - extends UpdateMatcher>, eu.dnetlib.broker.objects.Dataset> { + extends UpdateMatcher { private final Topic topic; @@ -25,21 +24,27 @@ public abstract class AbstractEnrichMissingDataset this.topic = topic; } + protected abstract boolean filterByType(String relType); + @Override protected final List> findUpdates( - final Pair> source, - final Pair> target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { final Set existingDatasets = target - .getRight() + .getDatasets() .stream() + .filter(rel -> filterByType(rel.getRelType())) + .map(RelatedDataset::getRelDataset) .map(Dataset::getId) .collect(Collectors.toSet()); return source - .getRight() + .getDatasets() .stream() + .filter(rel -> filterByType(rel.getRelType())) + .map(RelatedDataset::getRelDataset) .filter(d -> !existingDatasets.contains(d.getId())) .map(ConversionUtils::oafDatasetToBrokerDataset) .map(i -> generateUpdateInfo(i, source, target, dedupConfig)) @@ -49,12 +54,12 @@ public abstract class AbstractEnrichMissingDataset protected final UpdateInfo generateUpdateInfo( final eu.dnetlib.broker.objects.Dataset highlightValue, - final Pair> source, - final Pair> target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { return new UpdateInfo<>( getTopic(), - highlightValue, source.getLeft(), target.getLeft(), + highlightValue, source, target, (p, rel) -> p.getDatasets().add(rel), rel -> rel.getInstances().get(0).getUrl(), dedupConfig); @@ -63,4 +68,5 @@ public abstract class AbstractEnrichMissingDataset public Topic getTopic() { 
return topic; } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsReferencedBy.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsReferencedBy.java index 74ce761f48..21786687ee 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsReferencedBy.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsReferencedBy.java @@ -9,4 +9,9 @@ public class EnrichMissingDatasetIsReferencedBy extends AbstractEnrichMissingDat super(Topic.ENRICH_MISSING_DATASET_IS_REFERENCED_BY); } + @Override + protected boolean filterByType(final String relType) { + return relType.equals("isReferencedBy"); + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsRelatedTo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsRelatedTo.java index 05a8910591..0f3739434e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsRelatedTo.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsRelatedTo.java @@ -9,4 +9,9 @@ public class EnrichMissingDatasetIsRelatedTo extends AbstractEnrichMissingDatase super(Topic.ENRICH_MISSING_DATASET_IS_RELATED_TO); } + @Override + protected boolean filterByType(final String relType) { + return relType.equals("isRelatedTo"); + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsSupplementedBy.java 
b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsSupplementedBy.java index 23bd68fa1b..cde227feed 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsSupplementedBy.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsSupplementedBy.java @@ -9,4 +9,9 @@ public class EnrichMissingDatasetIsSupplementedBy extends AbstractEnrichMissingD super(Topic.ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_BY); } + @Override + protected boolean filterByType(final String relType) { + return relType.equals("isSupplementedBy"); + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsSupplementedTo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsSupplementedTo.java index 03160b6f00..750165ff5a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsSupplementedTo.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsSupplementedTo.java @@ -9,4 +9,9 @@ public class EnrichMissingDatasetIsSupplementedTo extends AbstractEnrichMissingD super(Topic.ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_TO); } + @Override + protected boolean filterByType(final String relType) { + return relType.equals("isSupplementedTo"); + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetReferences.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetReferences.java index bf1df053d6..b1c0afe16f 100644 --- 
a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetReferences.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetReferences.java @@ -9,4 +9,9 @@ public class EnrichMissingDatasetReferences extends AbstractEnrichMissingDataset super(Topic.ENRICH_MISSING_DATASET_REFERENCES); } + @Override + protected boolean filterByType(final String relType) { + return relType.equals("references"); + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java index 22817a25d9..5462877950 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java @@ -5,34 +5,33 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; - import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.pace.config.DedupConfig; public class EnrichMissingProject - extends UpdateMatcher>, eu.dnetlib.broker.objects.Project> { + extends UpdateMatcher { public EnrichMissingProject() { super(true); } @Override - protected List> findUpdates(final Pair> source, - final Pair> target, + protected List> 
findUpdates(final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { - if (source.getRight().isEmpty()) { + if (source.getProjects().isEmpty()) { return Arrays.asList(); } else { return target - .getRight() + .getProjects() .stream() + .map(RelatedProject::getRelProject) .map(ConversionUtils::oafProjectToBrokerProject) .map(p -> generateUpdateInfo(p, source, target, dedupConfig)) .collect(Collectors.toList()); @@ -41,12 +40,12 @@ public class EnrichMissingProject public UpdateInfo generateUpdateInfo( final eu.dnetlib.broker.objects.Project highlightValue, - final Pair> source, - final Pair> target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { return new UpdateInfo<>( Topic.ENRICH_MISSING_PROJECT, - highlightValue, source.getLeft(), target.getLeft(), + highlightValue, source, target, (p, prj) -> p.getProjects().add(prj), prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode(), dedupConfig); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java index 016bdd2836..54ebe7b713 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java @@ -5,36 +5,37 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; - import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; +import 
eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.pace.config.DedupConfig; -public class EnrichMoreProject extends UpdateMatcher>, eu.dnetlib.broker.objects.Project> { +public class EnrichMoreProject extends UpdateMatcher { public EnrichMoreProject() { super(true); } @Override - protected List> findUpdates(final Pair> source, - final Pair> target, + protected List> findUpdates(final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { final Set existingProjects = source - .getRight() + .getProjects() .stream() + .map(RelatedProject::getRelProject) .map(Project::getId) .collect(Collectors.toSet()); return target - .getRight() + .getProjects() .stream() + .map(RelatedProject::getRelProject) .filter(p -> !existingProjects.contains(p.getId())) .map(ConversionUtils::oafProjectToBrokerProject) .map(p -> generateUpdateInfo(p, source, target, dedupConfig)) @@ -43,12 +44,12 @@ public class EnrichMoreProject extends UpdateMatcher> public UpdateInfo generateUpdateInfo( final eu.dnetlib.broker.objects.Project highlightValue, - final Pair> source, - final Pair> target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { return new UpdateInfo<>( Topic.ENRICH_MORE_PROJECT, - highlightValue, source.getLeft(), target.getLeft(), + highlightValue, source, target, (p, prj) -> p.getProjects().add(prj), prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode(), dedupConfig); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java index ec575e68d7..8793d38dce 100644 --- 
a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java @@ -5,18 +5,17 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; - import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.pace.config.DedupConfig; public abstract class AbstractEnrichMissingPublication - extends UpdateMatcher>, eu.dnetlib.broker.objects.Publication> { + extends UpdateMatcher { private final Topic topic; @@ -25,21 +24,27 @@ public abstract class AbstractEnrichMissingPublication this.topic = topic; } + protected abstract boolean filterByType(String relType); + @Override protected final List> findUpdates( - final Pair> source, - final Pair> target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { final Set existingPublications = target - .getRight() + .getPublications() .stream() + .filter(rel -> filterByType(rel.getRelType())) + .map(RelatedPublication::getRelPublication) .map(Publication::getId) .collect(Collectors.toSet()); return source - .getRight() + .getPublications() .stream() + .filter(rel -> filterByType(rel.getRelType())) + .map(RelatedPublication::getRelPublication) .filter(d -> !existingPublications.contains(d.getId())) .map(ConversionUtils::oafResultToBrokerPublication) .map(i -> generateUpdateInfo(i, source, target, 
dedupConfig)) @@ -49,12 +54,12 @@ public abstract class AbstractEnrichMissingPublication protected final UpdateInfo generateUpdateInfo( final eu.dnetlib.broker.objects.Publication highlightValue, - final Pair> source, - final Pair> target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { return new UpdateInfo<>( getTopic(), - highlightValue, source.getLeft(), target.getLeft(), + highlightValue, source, target, (p, rel) -> p.getPublications().add(rel), rel -> rel.getInstances().get(0).getUrl(), dedupConfig); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsReferencedBy.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsReferencedBy.java index 73fa8a45f6..eebb5c1a66 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsReferencedBy.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsReferencedBy.java @@ -9,4 +9,8 @@ public class EnrichMissingPublicationIsReferencedBy extends AbstractEnrichMissin super(Topic.ENRICH_MISSING_PUBLICATION_IS_REFERENCED_BY); } + @Override + protected boolean filterByType(final String relType) { + return relType.equals("isReferencedBy"); + } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsRelatedTo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsRelatedTo.java index 361ea3b342..a8aa550d44 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsRelatedTo.java +++ 
b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsRelatedTo.java @@ -9,4 +9,9 @@ public class EnrichMissingPublicationIsRelatedTo extends AbstractEnrichMissingPu super(Topic.ENRICH_MISSING_PUBLICATION_IS_RELATED_TO); } + @Override + protected boolean filterByType(final String relType) { + return relType.equals("isRelatedTo"); + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsSupplementedBy.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsSupplementedBy.java index 7e8863b1ed..762ac942e4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsSupplementedBy.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsSupplementedBy.java @@ -9,4 +9,8 @@ public class EnrichMissingPublicationIsSupplementedBy extends AbstractEnrichMiss super(Topic.ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_BY); } + @Override + protected boolean filterByType(final String relType) { + return relType.equals("isSupplementedBy"); + } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsSupplementedTo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsSupplementedTo.java index dc4e513777..fc7196a015 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsSupplementedTo.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsSupplementedTo.java @@ -9,4 +9,9 
@@ public class EnrichMissingPublicationIsSupplementedTo extends AbstractEnrichMiss super(Topic.ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_TO); } + @Override + protected boolean filterByType(final String relType) { + return relType.equals("isSupplementedTo"); + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationReferences.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationReferences.java index 5198098bcf..da19944549 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationReferences.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationReferences.java @@ -9,4 +9,9 @@ public class EnrichMissingPublicationReferences extends AbstractEnrichMissingPub super(Topic.ENRICH_MISSING_PUBLICATION_REFERENCES); } + @Override + protected boolean filterByType(final String relType) { + return relType.equals("references"); + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java index 699d546ec5..1ce5415d5c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java @@ -5,18 +5,16 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; - import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import 
eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.schema.oaf.Software; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.pace.config.DedupConfig; public class EnrichMissingSoftware - extends UpdateMatcher>, eu.dnetlib.broker.objects.Software> { + extends UpdateMatcher { public EnrichMissingSoftware() { super(true); @@ -24,16 +22,17 @@ public class EnrichMissingSoftware @Override protected List> findUpdates( - final Pair> source, - final Pair> target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { - if (source.getRight().isEmpty()) { + if (source.getSoftwares().isEmpty()) { return Arrays.asList(); } else { return target - .getRight() + .getSoftwares() .stream() + .map(RelatedSoftware::getRelSoftware) .map(ConversionUtils::oafSoftwareToBrokerSoftware) .map(p -> generateUpdateInfo(p, source, target, dedupConfig)) .collect(Collectors.toList()); @@ -42,12 +41,12 @@ public class EnrichMissingSoftware public UpdateInfo generateUpdateInfo( final eu.dnetlib.broker.objects.Software highlightValue, - final Pair> source, - final Pair> target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { return new UpdateInfo<>( Topic.ENRICH_MISSING_SOFTWARE, - highlightValue, source.getLeft(), target.getLeft(), + highlightValue, source, target, (p, s) -> p.getSoftwares().add(s), s -> s.getName(), dedupConfig); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java index 45631df20a..4d1f4f23f1 100644 --- 
a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java @@ -5,18 +5,17 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; - import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; -import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.pace.config.DedupConfig; public class EnrichMoreSoftware - extends UpdateMatcher>, eu.dnetlib.broker.objects.Software> { + extends UpdateMatcher { public EnrichMoreSoftware() { super(true); @@ -24,19 +23,21 @@ public class EnrichMoreSoftware @Override protected List> findUpdates( - final Pair> source, - final Pair> target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { final Set existingSoftwares = source - .getRight() + .getSoftwares() .stream() + .map(RelatedSoftware::getRelSoftware) .map(Software::getId) .collect(Collectors.toSet()); return target - .getRight() + .getSoftwares() .stream() + .map(RelatedSoftware::getRelSoftware) .filter(p -> !existingSoftwares.contains(p.getId())) .map(ConversionUtils::oafSoftwareToBrokerSoftware) .map(p -> generateUpdateInfo(p, source, target, dedupConfig)) @@ -45,12 +46,12 @@ public class EnrichMoreSoftware public UpdateInfo generateUpdateInfo( final eu.dnetlib.broker.objects.Software highlightValue, - final Pair> source, - final Pair> target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig 
dedupConfig) { return new UpdateInfo<>( Topic.ENRICH_MORE_SOFTWARE, - highlightValue, source.getLeft(), target.getLeft(), + highlightValue, source, target, (p, s) -> p.getSoftwares().add(s), s -> s.getName(), dedupConfig); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java index 7dc340b3c7..db99724798 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java @@ -8,28 +8,31 @@ import java.util.List; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; -import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.pace.config.DedupConfig; -public class EnrichMissingAbstract extends UpdateMatcher { +public class EnrichMissingAbstract extends UpdateMatcher { public EnrichMissingAbstract() { super(false); } @Override - protected List> findUpdates(final Result source, final Result target, + protected List> findUpdates(final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { - if (isMissing(target.getDescription()) && !isMissing(source.getDescription())) { + if (isMissing(target.getResult().getDescription()) && !isMissing(source.getResult().getDescription())) { return Arrays - .asList(generateUpdateInfo(source.getDescription().get(0).getValue(), source, target, dedupConfig)); + .asList( + generateUpdateInfo( + source.getResult().getDescription().get(0).getValue(), source, target, dedupConfig)); } return new ArrayList<>(); } public UpdateInfo generateUpdateInfo(final String 
highlightValue, - final Result source, - final Result target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { return new UpdateInfo<>( Topic.ENRICH_MISSING_ABSTRACT, diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java index 7a1677ae2d..1226aaf45e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java @@ -9,17 +9,18 @@ import org.apache.commons.lang3.tuple.Pair; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; -import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.pace.config.DedupConfig; -public class EnrichMissingAuthorOrcid extends UpdateMatcher> { +public class EnrichMissingAuthorOrcid extends UpdateMatcher> { public EnrichMissingAuthorOrcid() { super(true); } @Override - protected List>> findUpdates(final Result source, final Result target, + protected List>> findUpdates(final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { // TODO // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f)); @@ -27,8 +28,8 @@ public class EnrichMissingAuthorOrcid extends UpdateMatcher> generateUpdateInfo(final Pair highlightValue, - final Result source, - final Result target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { return new UpdateInfo<>( Topic.ENRICH_MISSING_AUTHOR_ORCID, diff --git 
a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java index d14490ba85..69bd3630a2 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java @@ -11,19 +11,21 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; -import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.pace.config.DedupConfig; -public class EnrichMissingOpenAccess extends UpdateMatcher { +public class EnrichMissingOpenAccess extends UpdateMatcher { public EnrichMissingOpenAccess() { super(true); } @Override - protected List> findUpdates(final Result source, final Result target, + protected List> findUpdates(final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { final long count = target + .getResult() .getInstance() .stream() .map(i -> i.getAccessright().getClassid()) @@ -35,6 +37,7 @@ public class EnrichMissingOpenAccess extends UpdateMatcher { } return source + .getResult() .getInstance() .stream() .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS)) @@ -45,8 +48,8 @@ public class EnrichMissingOpenAccess extends UpdateMatcher { } public UpdateInfo generateUpdateInfo(final Instance highlightValue, - final Result source, - final Result target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { return new UpdateInfo<>( Topic.ENRICH_MISSING_OA_VERSION, diff --git 
a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java index 20303ec1b4..4b7b1735ba 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java @@ -10,25 +10,27 @@ import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; -import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.pace.config.DedupConfig; -public class EnrichMissingPid extends UpdateMatcher { +public class EnrichMissingPid extends UpdateMatcher { public EnrichMissingPid() { super(true); } @Override - protected List> findUpdates(final Result source, final Result target, + protected List> findUpdates(final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { - final long count = target.getPid().size(); + final long count = target.getResult().getPid().size(); if (count > 0) { return Arrays.asList(); } return source + .getResult() .getPid() .stream() .map(ConversionUtils::oafPidToBrokerPid) @@ -36,7 +38,9 @@ public class EnrichMissingPid extends UpdateMatcher { .collect(Collectors.toList()); } - public UpdateInfo generateUpdateInfo(final Pid highlightValue, final Result source, final Result target, + public UpdateInfo generateUpdateInfo(final Pid highlightValue, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { return new UpdateInfo<>( Topic.ENRICH_MISSING_PID, diff --git 
a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java index e1de8ce4d3..ecf8da1571 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java @@ -8,28 +8,32 @@ import java.util.List; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; -import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.pace.config.DedupConfig; -public class EnrichMissingPublicationDate extends UpdateMatcher { +public class EnrichMissingPublicationDate extends UpdateMatcher { public EnrichMissingPublicationDate() { super(false); } @Override - protected List> findUpdates(final Result source, final Result target, + protected List> findUpdates(final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { - if (isMissing(target.getDateofacceptance()) && !isMissing(source.getDateofacceptance())) { + if (isMissing(target.getResult().getDateofacceptance()) + && !isMissing(source.getResult().getDateofacceptance())) { return Arrays - .asList(generateUpdateInfo(source.getDateofacceptance().getValue(), source, target, dedupConfig)); + .asList( + generateUpdateInfo( + source.getResult().getDateofacceptance().getValue(), source, target, dedupConfig)); } return new ArrayList<>(); } public UpdateInfo generateUpdateInfo(final String highlightValue, - final Result source, - final Result target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { return new 
UpdateInfo<>( Topic.ENRICH_MISSING_PUBLICATION_DATE, diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java index c51f8991c4..9d3a3aa44a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java @@ -11,21 +11,23 @@ import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.pace.config.DedupConfig; -public class EnrichMissingSubject extends UpdateMatcher> { +public class EnrichMissingSubject extends UpdateMatcher> { public EnrichMissingSubject() { super(true); } @Override - protected List>> findUpdates(final Result source, final Result target, + protected List>> findUpdates(final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { final Set existingTypes = target + .getResult() .getSubject() .stream() .map(StructuredProperty::getQualifier) @@ -33,6 +35,7 @@ public class EnrichMissingSubject extends UpdateMatcher !existingTypes.contains(pid.getQualifier().getClassid())) @@ -42,8 +45,8 @@ public class EnrichMissingSubject extends UpdateMatcher> generateUpdateInfo(final Pair highlightValue, - final Result source, - final Result target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { return new UpdateInfo<>( diff --git 
a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java index 2ac04fd12f..fc1112d732 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java @@ -11,19 +11,21 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; -import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.pace.config.DedupConfig; -public class EnrichMoreOpenAccess extends UpdateMatcher { +public class EnrichMoreOpenAccess extends UpdateMatcher { public EnrichMoreOpenAccess() { super(true); } @Override - protected List> findUpdates(final Result source, final Result target, + protected List> findUpdates(final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { final Set urls = target + .getResult() .getInstance() .stream() .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS)) @@ -32,6 +34,7 @@ public class EnrichMoreOpenAccess extends UpdateMatcher { .collect(Collectors.toSet()); return source + .getResult() .getInstance() .stream() .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS)) @@ -43,8 +46,8 @@ public class EnrichMoreOpenAccess extends UpdateMatcher { } public UpdateInfo generateUpdateInfo(final Instance highlightValue, - final Result source, - final Result target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { return new UpdateInfo<>( 
Topic.ENRICH_MORE_OA_VERSION, diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java index e4bf5d2c2e..7984cc5215 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java @@ -10,25 +10,28 @@ import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; -import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.pace.config.DedupConfig; -public class EnrichMorePid extends UpdateMatcher { +public class EnrichMorePid extends UpdateMatcher { public EnrichMorePid() { super(true); } @Override - protected List> findUpdates(final Result source, final Result target, + protected List> findUpdates(final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { final Set existingPids = target + .getResult() .getPid() .stream() .map(pid -> pid.getQualifier().getClassid() + "::" + pid.getValue()) .collect(Collectors.toSet()); return source + .getResult() .getPid() .stream() .filter(pid -> !existingPids.contains(pid.getQualifier().getClassid() + "::" + pid.getValue())) @@ -37,7 +40,9 @@ public class EnrichMorePid extends UpdateMatcher { .collect(Collectors.toList()); } - public UpdateInfo generateUpdateInfo(final Pid highlightValue, final Result source, final Result target, + public UpdateInfo generateUpdateInfo(final Pid highlightValue, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { return new UpdateInfo<>( Topic.ENRICH_MORE_PID, 
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java index d6e607c31c..1a522c745b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java @@ -11,25 +11,28 @@ import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; -import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.pace.config.DedupConfig; -public class EnrichMoreSubject extends UpdateMatcher> { +public class EnrichMoreSubject extends UpdateMatcher> { public EnrichMoreSubject() { super(true); } @Override - protected List>> findUpdates(final Result source, final Result target, + protected List>> findUpdates(final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { final Set existingSubjects = target + .getResult() .getSubject() .stream() .map(pid -> pid.getQualifier().getClassid() + "::" + pid.getValue()) .collect(Collectors.toSet()); return source + .getResult() .getPid() .stream() .filter(pid -> !existingSubjects.contains(pid.getQualifier().getClassid() + "::" + pid.getValue())) @@ -39,8 +42,8 @@ public class EnrichMoreSubject extends UpdateMatcher> generateUpdateInfo(final Pair highlightValue, - final Result source, - final Result target, + final ResultWithRelations source, + final ResultWithRelations target, final DedupConfig dedupConfig) { return new UpdateInfo<>( diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java 
b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
new file mode 100644
index 0000000000..b4de08db77
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
@@ -0,0 +1,98 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import eu.dnetlib.dhp.broker.model.EventFactory;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsRelatedTo;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedBy;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedTo;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetReferences;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMissingProject;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMoreProject;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsReferencedBy;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsRelatedTo;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMissingSoftware;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMoreSoftware;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
+import eu.dnetlib.pace.config.DedupConfig;
+
+/**
+ * Applies every registered {@link UpdateMatcher} to the records of a {@link ResultGroup} and converts the
+ * updates they report into the events of an {@link EventGroup}.
+ */
+public class EventFinder {
+
+	// Every matcher is registered once: a matcher listed twice would report each of its updates twice.
+	private static final List> matchers = new ArrayList<>();
+	static {
+		matchers.add(new EnrichMissingAbstract());
+		matchers.add(new EnrichMissingAuthorOrcid());
+		matchers.add(new EnrichMissingOpenAccess());
+		matchers.add(new EnrichMissingPid());
+		matchers.add(new EnrichMissingPublicationDate());
+		matchers.add(new EnrichMissingSubject());
+		matchers.add(new EnrichMoreOpenAccess());
+		matchers.add(new EnrichMorePid());
+		matchers.add(new EnrichMoreSubject());
+
+		// Advanced matchers
+		matchers.add(new EnrichMissingProject());
+		matchers.add(new EnrichMoreProject());
+		matchers.add(new EnrichMissingSoftware());
+		matchers.add(new EnrichMoreSoftware());
+		matchers.add(new EnrichMissingPublicationIsRelatedTo());
+		matchers.add(new EnrichMissingPublicationIsReferencedBy());
+		matchers.add(new EnrichMissingPublicationReferences());
+		matchers.add(new EnrichMissingPublicationIsSupplementedTo());
+		matchers.add(new EnrichMissingPublicationIsSupplementedBy());
+		matchers.add(new EnrichMissingDatasetIsRelatedTo());
+		matchers.add(new EnrichMissingDatasetIsReferencedBy());
+		matchers.add(new EnrichMissingDatasetReferences());
+		matchers.add(new EnrichMissingDatasetIsSupplementedTo());
+		matchers.add(new EnrichMissingDatasetIsSupplementedBy());
+	}
+
+	/**
+	 * Hands each record of the group, together with the whole group, to every registered matcher.
+	 *
+	 * @param results the group of records to analyse
+	 * @param dedupConfig the configuration passed on to the matchers
+	 * @return the events created from all the updates reported by the matchers
+	 */
+	public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) {
+		final List> list = new ArrayList<>();
+
+		for (final ResultWithRelations target : results.getData()) {
+			for (final UpdateMatcher matcher : matchers) {
+				list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig));
+			}
+		}
+
+		return asEventGroup(list);
+	}
+
+	// Builds one broker event per update and gathers the events into a new group.
+	private static EventGroup asEventGroup(final List> list) {
+		final EventGroup events = new EventGroup();
+		list.stream().map(EventFactory::newBrokerEvent).forEach(events::addElement);
+		return events;
+	}
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java index 893aa2827d..9da6364137 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java @@ -14,6 +14,7 @@ import eu.dnetlib.broker.objects.OpenAireEventPayload; import eu.dnetlib.broker.objects.Provenance; import eu.dnetlib.broker.objects.Publication; import eu.dnetlib.dhp.broker.model.Topic; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.dhp.schema.oaf.Instance; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Result; @@ -28,9 +29,9 @@ public final class UpdateInfo { private final T highlightValue; - private final Result source; + private final ResultWithRelations source; - private final Result target; + private final ResultWithRelations target; private final BiConsumer compileHighlight; @@ -40,7 +41,8 @@ public final class UpdateInfo { private static final Logger log = LoggerFactory.getLogger(UpdateInfo.class); - public UpdateInfo(final Topic topic, final T highlightValue, final Result source, final Result target, + public UpdateInfo(final Topic topic, final T highlightValue, final ResultWithRelations source,
+ final ResultWithRelations target, final BiConsumer compileHighlight, final Function highlightToString, final DedupConfig dedupConfig) { @@ -50,18 +52,18 @@ public final class UpdateInfo { this.target = target; this.compileHighlight = compileHighlight; this.highlightToString = highlightToString; - this.trust = calculateTrust(dedupConfig, source, target); + this.trust = calculateTrust(dedupConfig, source.getResult(), target.getResult()); } public T getHighlightValue() { return highlightValue; } - public Result getSource() { + public ResultWithRelations getSource() { return source; } - public Result getTarget() { + public ResultWithRelations getTarget() { return target; } @@ -101,20 +103,22 @@ public final class UpdateInfo { public OpenAireEventPayload asBrokerPayload() { - final Publication p = ConversionUtils.oafResultToBrokerPublication(getSource()); + final Publication p = ConversionUtils.oafResultToBrokerPublication(getSource().getResult()); compileHighlight.accept(p, getHighlightValue()); final Publication hl = new Publication(); compileHighlight.accept(hl, getHighlightValue()); - final String provId = getSource().getOriginalId().stream().findFirst().orElse(null); + final String provId = getSource().getResult().getOriginalId().stream().findFirst().orElse(null); final String provRepo = getSource() + .getResult() .getCollectedfrom() .stream() .map(KeyValue::getValue) .findFirst() .orElse(null); final String provUrl = getSource() + .getResult() .getInstance() .stream() .map(Instance::getUrl) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java similarity index 75% rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultAggregator.java rename to 
dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java index 475c768149..397f306600 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java @@ -1,15 +1,15 @@ -package eu.dnetlib.dhp.broker.oa.util; +package eu.dnetlib.dhp.broker.oa.util.aggregators.simple; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.expressions.Aggregator; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.Result; import scala.Tuple2; -public class ResultAggregator extends Aggregator, ResultGroup, ResultGroup> { +public class ResultAggregator extends Aggregator, ResultGroup, ResultGroup> { /** * @@ -22,7 +22,7 @@ public class ResultAggregator extends Aggregator, Resul } @Override - public ResultGroup reduce(final ResultGroup group, final Tuple2 t) { + public ResultGroup reduce(final ResultGroup group, final Tuple2 t) { return group.addElement(t._1); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultGroup.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java similarity index 58% rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultGroup.java rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java index 2be673db03..81f189d62e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultGroup.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java @@ -1,11 +1,11 @@ -package 
eu.dnetlib.dhp.broker.oa.util; +package eu.dnetlib.dhp.broker.oa.util.aggregators.simple; import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; public class ResultGroup implements Serializable { @@ -14,13 +14,13 @@ public class ResultGroup implements Serializable { */ private static final long serialVersionUID = -3360828477088669296L; - private final List data = new ArrayList<>(); + private final List data = new ArrayList<>(); - public List getData() { + public List getData() { return data; } - public ResultGroup addElement(final Result elem) { + public ResultGroup addElement(final ResultWithRelations elem) { data.add(elem); return this; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java new file mode 100644 index 0000000000..84cf693ad9 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java @@ -0,0 +1,30 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; + +import eu.dnetlib.dhp.schema.oaf.Dataset; + +public class RelatedDataset { + + private final String source; + private final String relType; + private final Dataset relDataset; + + public RelatedDataset(final String source, final String relType, final Dataset relDataset) { + this.source = source; + this.relType = relType; + this.relDataset = relDataset; + } + + public String getSource() { + return source; + } + + public String getRelType() { + return relType; + } + + public Dataset getRelDataset() { + return relDataset; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java 
b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java new file mode 100644 index 0000000000..490724a44b --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java @@ -0,0 +1,28 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; + +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Software; + +public class RelatedEntityFactory { + + @SuppressWarnings("unchecked") + public static RT newRelatedEntity(final String sourceId, final String relType, final T target, + final Class clazz) { + if (clazz == RelatedProject.class) { + return (RT) new RelatedProject(sourceId, relType, (Project) target); + } + if (clazz == RelatedSoftware.class) { + return (RT) new RelatedSoftware(sourceId, relType, (Software) target); + } + if (clazz == RelatedDataset.class) { + return (RT) new RelatedDataset(sourceId, relType, (Dataset) target); + } + if (clazz == RelatedPublication.class) { + return (RT) new RelatedPublication(sourceId, relType, (Publication) target); + } + return null; + } +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java new file mode 100644 index 0000000000..2d16f54096 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java @@ -0,0 +1,30 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; + +import eu.dnetlib.dhp.schema.oaf.Project; + +public class RelatedProject { + + private final String source; + private final String relType; + private final Project relProject; + + public RelatedProject(final String 
source, final String relType, final Project relProject) { + this.source = source; + this.relType = relType; + this.relProject = relProject; + } + + public String getSource() { + return source; + } + + public String getRelType() { + return relType; + } + + public Project getRelProject() { + return relProject; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java new file mode 100644 index 0000000000..f1545c0045 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java @@ -0,0 +1,30 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; + +import eu.dnetlib.dhp.schema.oaf.Publication; + +public class RelatedPublication { + + private final String source; + private final String relType; + private final Publication relPublication; + + public RelatedPublication(final String source, final String relType, final Publication relPublication) { + this.source = source; + this.relType = relType; + this.relPublication = relPublication; + } + + public String getSource() { + return source; + } + + public String getRelType() { + return relType; + } + + public Publication getRelPublication() { + return relPublication; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java new file mode 100644 index 0000000000..e5873d2632 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java @@ -0,0 +1,30 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; + +import eu.dnetlib.dhp.schema.oaf.Software; + 
+public class RelatedSoftware { + + private final String source; + private final String relType; + private final Software relSoftware; + + public RelatedSoftware(final String source, final String relType, final Software relSoftware) { + this.source = source; + this.relType = relType; + this.relSoftware = relSoftware; + } + + public String getSource() { + return source; + } + + public String getRelType() { + return relType; + } + + public Software getRelSoftware() { + return relSoftware; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/ResultWithRelations.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/ResultWithRelations.java new file mode 100644 index 0000000000..2d762aded4 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/ResultWithRelations.java @@ -0,0 +1,55 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import eu.dnetlib.dhp.schema.oaf.Result; + +public class ResultWithRelations implements Serializable { + + /** + * + */ + private static final long serialVersionUID = -1368401915974311571L; + + private Result result; + + private final List datasets = new ArrayList<>(); + private final List publications = new ArrayList<>(); + private final List softwares = new ArrayList<>(); + private final List projects = new ArrayList<>(); + + public ResultWithRelations() { + } + + public ResultWithRelations(final Result result) { + this.result = result; + } + + public Result getResult() { + return result; + } + + public List getDatasets() { + return datasets; + } + + public List getPublications() { + return publications; + } + + public List getSoftwares() { + return softwares; + } + + public List getProjects() { + return projects; + } + + public void setResult(final Result result) 
{ + this.result = result; + } + +}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/ResultWithRelationsAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/ResultWithRelationsAggregator.java
new file mode 100644
index 0000000000..b4922a64fb
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/ResultWithRelationsAggregator.java
@@ -0,0 +1,74 @@
+
+package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
+
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.expressions.Aggregator;
+
+import scala.Tuple2;
+
+/**
+ * Folds (result, related entity) pairs into a single {@link ResultWithRelations} that carries the result
+ * together with all the related entities met while aggregating.
+ */
+public class ResultWithRelationsAggregator
+	extends Aggregator, ResultWithRelations, ResultWithRelations> {
+
+	/**
+	 *
+	 */
+	private static final long serialVersionUID = -3687878788861013488L;
+
+	@Override
+	public ResultWithRelations zero() {
+		return new ResultWithRelations();
+	}
+
+	@Override
+	public ResultWithRelations finish(final ResultWithRelations g) {
+		return g;
+	}
+
+	@Override
+	public ResultWithRelations reduce(final ResultWithRelations g, final Tuple2 t) {
+		// A buffer without a result (as created by zero()) is replaced by the incoming entry, but the checks
+		// below still run so that the related entity carried by this very tuple is recorded as well.
+		final ResultWithRelations res = g.getResult() != null ? g : t._1;
+
+		if (t._2 instanceof RelatedSoftware) {
+			res.getSoftwares().add((RelatedSoftware) t._2);
+		} else if (t._2 instanceof RelatedDataset) {
+			res.getDatasets().add((RelatedDataset) t._2);
+		} else if (t._2 instanceof RelatedPublication) {
+			res.getPublications().add((RelatedPublication) t._2);
+		} else if (t._2 instanceof RelatedProject) {
+			res.getProjects().add((RelatedProject) t._2);
+		}
+		return res;
+	}
+
+	@Override
+	public ResultWithRelations merge(final ResultWithRelations g1, final ResultWithRelations g2) {
+		// Keep the buffer that already owns a result, adding to it the relations collected by the other one
+		if (g1.getResult() != null) {
+			g1.getSoftwares().addAll(g2.getSoftwares());
+			g1.getDatasets().addAll(g2.getDatasets());
+			g1.getPublications().addAll(g2.getPublications());
+			g1.getProjects().addAll(g2.getProjects());
+			return g1;
+		} else {
+			return g2;
+		}
+	}
+
+	@Override
+	public Encoder bufferEncoder() {
+		return Encoders.kryo(ResultWithRelations.class);
+	}
+
+	@Override
+	public Encoder outputEncoder() {
+		return Encoders.kryo(ResultWithRelations.class);
+	}
+
+}