diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java index eaeb6d271..dbe2fdd47 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java @@ -53,7 +53,7 @@ public class GenerateEventsJob { final String dedupConfigProfileId = parser.get("dedupConfProfile"); log.info("dedupConfigProfileId: {}", dedupConfigProfileId); - final String eventsPath = workingPath + "/eventsPath"; + final String eventsPath = workingPath + "/events"; log.info("eventsPath: {}", eventsPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasetAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasetAggregator.java index 04840afe9..a963f073d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasetAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasetAggregator.java @@ -29,15 +29,17 @@ public class RelatedDatasetAggregator @Override public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2 t) { - final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1; - res.getDatasets().add(t._2.getRelDataset()); + final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1; + if (t._2 != null) { + res.getDatasets().add(t._2.getRelDataset()); + } return res; } @Override public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { - if (StringUtils.isNotBlank(g1.getOriginalId())) { + if (StringUtils.isNotBlank(g1.getOpenaireId())) { g1.getDatasets().addAll(g2.getDatasets()); return g1; } else { diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProjectAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProjectAggregator.java index 025cc413a..3fedb1a32 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProjectAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProjectAggregator.java @@ -29,15 +29,17 @@ public class RelatedProjectAggregator @Override public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2 t) { - final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1; - res.getProjects().add(t._2.getRelProject()); + final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1; + if (t._2 != null) { + res.getProjects().add(t._2.getRelProject()); + } return res; } @Override public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { - if (StringUtils.isNotBlank(g1.getOriginalId())) { + if (StringUtils.isNotBlank(g1.getOpenaireId())) { g1.getProjects().addAll(g2.getProjects()); return g1; } else { diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublicationAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublicationAggregator.java index 1b54d4a12..b331599ad 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublicationAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublicationAggregator.java @@ -30,15 +30,17 @@ public class RelatedPublicationAggregator @Override public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2 t) { - final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1; - res.getPublications().add(t._2.getRelPublication()); + final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1; + if (t._2 != null) { + res.getPublications().add(t._2.getRelPublication()); + } return res; } @Override public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { - if (StringUtils.isNotBlank(g1.getOriginalId())) { + if (StringUtils.isNotBlank(g1.getOpenaireId())) { g1.getPublications().addAll(g2.getPublications()); return g1; } else { diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftwareAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftwareAggregator.java index 871cc4f06..d3b1c3407 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftwareAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftwareAggregator.java @@ -29,15 +29,17 @@ public class RelatedSoftwareAggregator @Override public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2 t) { - final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1; - res.getSoftwares().add(t._2.getRelSoftware()); + final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1; + if (t._2 != null) { + res.getSoftwares().add(t._2.getRelSoftware()); + } return res; } @Override public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { - if (StringUtils.isNotBlank(g1.getOriginalId())) { + if (StringUtils.isNotBlank(g1.getOpenaireId())) { g1.getSoftwares().addAll(g2.getSoftwares()); return g1; } else {