From 8b9933b934eac804d50f126411a3f27ee2384182 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Wed, 24 Jun 2020 08:57:13 +0200 Subject: [PATCH] refactoring aggregators --- .../dhp/broker/oa/GenerateEventsJob.java | 2 +- .../dhp/broker/oa/JoinEntitiesJob.java | 35 +++++---- .../dhp/broker/oa/PrepareGroupsJob.java | 2 +- .../OaBrokerMainEntityAggregator.java | 71 ------------------- .../withRels/RelatedDatasetAggregator.java | 58 +++++++++++++++ .../withRels/RelatedProjectAggregator.java | 58 +++++++++++++++ .../RelatedPublicationAggregator.java | 59 +++++++++++++++ .../withRels/RelatedSoftwareAggregator.java | 58 +++++++++++++++ 8 files changed, 252 insertions(+), 91 deletions(-) delete mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OaBrokerMainEntityAggregator.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasetAggregator.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProjectAggregator.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublicationAggregator.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftwareAggregator.java diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java index 089fbf6d4..eaeb6d271 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java @@ -67,7 +67,7 @@ public class GenerateEventsJob { ClusterUtils.removeDir(spark, eventsPath); final Dataset groups = ClusterUtils - .readPath(spark, workingPath + "/relation", ResultGroup.class); + .readPath(spark, workingPath + "/duplicates", ResultGroup.class); final Dataset events = groups .map( diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinEntitiesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinEntitiesJob.java index da77a4673..868faa8f5 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinEntitiesJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinEntitiesJob.java @@ -11,18 +11,15 @@ import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.TypedColumn; +import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OaBrokerMainEntityAggregator; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProjectAggregator; import scala.Tuple2; public class JoinEntitiesJob { @@ -59,31 +56,33 @@ public class JoinEntitiesJob { .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class); final Dataset r1 = join( - r0, ClusterUtils.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class)); - final Dataset r2 = join( - r1, ClusterUtils.readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class)); - final Dataset r3 = join( - r2, ClusterUtils.readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class)); - final Dataset r4 = join( - r3, ClusterUtils.readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class)); + r0, ClusterUtils.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class), + new RelatedProjectAggregator()); + // final Dataset r2 = join( + // r1, ClusterUtils.readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class), new + // RelatedDatasetAggregator()); + // final Dataset r3 = join( + // r2, ClusterUtils.readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class), new + // RelatedPublicationAggregator()); + // final Dataset r4 = join( + // r3, ClusterUtils.readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class), new + // RelatedSoftwareAggregator()); - r4.write().mode(SaveMode.Overwrite).json(joinedEntitiesPath); + r1.write().mode(SaveMode.Overwrite).json(joinedEntitiesPath); }); } private static Dataset join(final Dataset sources, - final Dataset typedRels) { - - final TypedColumn, OaBrokerMainEntity> aggr = new OaBrokerMainEntityAggregator() - .toColumn(); + final Dataset typedRels, + final Aggregator, OaBrokerMainEntity, OaBrokerMainEntity> aggr) { return sources .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") .groupByKey( (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) - .agg(aggr) + .agg(aggr.toColumn()) .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java index 159047dad..934ddff59 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java @@ -48,7 +48,7 @@ public class PrepareGroupsJob { final String workingPath = parser.get("workingPath"); log.info("workingPath: {}", workingPath); - final String groupsPath = workingPath + "/groups"; + final String groupsPath = workingPath + "/duplicates"; log.info("groupsPath: {}", groupsPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OaBrokerMainEntityAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OaBrokerMainEntityAggregator.java deleted file mode 100644 index 6a2d9b06d..000000000 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OaBrokerMainEntityAggregator.java +++ /dev/null @@ -1,71 +0,0 @@ - -package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; - -import org.apache.spark.sql.Encoder; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.expressions.Aggregator; - -import eu.dnetlib.broker.objects.OaBrokerMainEntity; -import scala.Tuple2; - -public class OaBrokerMainEntityAggregator - extends Aggregator, OaBrokerMainEntity, OaBrokerMainEntity> { - - /** - * - */ - private static final long serialVersionUID = -3687878788861013488L; - - @Override - public OaBrokerMainEntity zero() { - return new OaBrokerMainEntity(); - } - - @Override - public OaBrokerMainEntity finish(final OaBrokerMainEntity g) { - return g; - } - - @Override - public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2 t) { - if (g.getOriginalId() == null) { - return t._1; - } else if (t._2 instanceof RelatedSoftware) { - g.getSoftwares().add(((RelatedSoftware) t._2).getRelSoftware()); - } else if (t._2 instanceof RelatedDataset) { - g.getDatasets().add(((RelatedDataset) t._2).getRelDataset()); - } else if (t._2 instanceof RelatedPublication) { - g.getPublications().add(((RelatedPublication) t._2).getRelPublication()); - } else if (t._2 instanceof RelatedProject) { - g.getProjects().add(((RelatedProject) t._2).getRelProject()); - } else { - throw new RuntimeException("Invalid Object: " + t._2.getClass()); - } - return g; - - } - - @Override - public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { - if (g1.getOriginalId() != null) { - g1.getSoftwares().addAll(g2.getSoftwares()); - g1.getDatasets().addAll(g2.getDatasets()); - g1.getPublications().addAll(g2.getPublications()); - g1.getProjects().addAll(g2.getProjects()); - return g1; - } else { - return g2; - } - } - - @Override - public Encoder bufferEncoder() { - return Encoders.bean(OaBrokerMainEntity.class); - } - - @Override - public Encoder outputEncoder() { - return Encoders.bean(OaBrokerMainEntity.class); - } - -} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasetAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasetAggregator.java new file mode 100644 index 000000000..04840afe9 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasetAggregator.java @@ -0,0 +1,58 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; + +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.expressions.Aggregator; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import scala.Tuple2; + +public class RelatedDatasetAggregator + extends Aggregator, OaBrokerMainEntity, OaBrokerMainEntity> { + + /** + * + */ + private static final long serialVersionUID = 6969761680131482557L; + + @Override + public OaBrokerMainEntity zero() { + return new OaBrokerMainEntity(); + } + + @Override + public OaBrokerMainEntity finish(final OaBrokerMainEntity g) { + return g; + } + + @Override + public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2 t) { + final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1; + res.getDatasets().add(t._2.getRelDataset()); + return res; + + } + + @Override + public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { + if (StringUtils.isNotBlank(g1.getOriginalId())) { + g1.getDatasets().addAll(g2.getDatasets()); + return g1; + } else { + return g2; + } + } + + @Override + public Encoder bufferEncoder() { + return Encoders.bean(OaBrokerMainEntity.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.bean(OaBrokerMainEntity.class); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProjectAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProjectAggregator.java new file mode 100644 index 000000000..025cc413a --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProjectAggregator.java @@ -0,0 +1,58 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; + +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.expressions.Aggregator; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import scala.Tuple2; + +public class RelatedProjectAggregator + extends Aggregator, OaBrokerMainEntity, OaBrokerMainEntity> { + + /** + * + */ + private static final long serialVersionUID = 8559808519152275763L; + + @Override + public OaBrokerMainEntity zero() { + return new OaBrokerMainEntity(); + } + + @Override + public OaBrokerMainEntity finish(final OaBrokerMainEntity g) { + return g; + } + + @Override + public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2 t) { + final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1; + res.getProjects().add(t._2.getRelProject()); + return res; + + } + + @Override + public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { + if (StringUtils.isNotBlank(g1.getOriginalId())) { + g1.getProjects().addAll(g2.getProjects()); + return g1; + } else { + return g2; + } + } + + @Override + public Encoder bufferEncoder() { + return Encoders.bean(OaBrokerMainEntity.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.bean(OaBrokerMainEntity.class); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublicationAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublicationAggregator.java new file mode 100644 index 000000000..1b54d4a12 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublicationAggregator.java @@ -0,0 +1,59 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; + +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.expressions.Aggregator; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import scala.Tuple2; + +public class RelatedPublicationAggregator + extends Aggregator, OaBrokerMainEntity, OaBrokerMainEntity> { + + /** + * + */ + private static final long serialVersionUID = 4656934981558135919L; + + @Override + public OaBrokerMainEntity zero() { + return new OaBrokerMainEntity(); + } + + @Override + public OaBrokerMainEntity finish(final OaBrokerMainEntity g) { + return g; + } + + @Override + public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, + final Tuple2 t) { + final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1; + res.getPublications().add(t._2.getRelPublication()); + return res; + + } + + @Override + public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { + if (StringUtils.isNotBlank(g1.getOriginalId())) { + g1.getPublications().addAll(g2.getPublications()); + return g1; + } else { + return g2; + } + } + + @Override + public Encoder bufferEncoder() { + return Encoders.bean(OaBrokerMainEntity.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.bean(OaBrokerMainEntity.class); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftwareAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftwareAggregator.java new file mode 100644 index 000000000..871cc4f06 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftwareAggregator.java @@ -0,0 +1,58 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; + +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.expressions.Aggregator; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import scala.Tuple2; + +public class RelatedSoftwareAggregator + extends Aggregator, OaBrokerMainEntity, OaBrokerMainEntity> { + + /** + * + */ + private static final long serialVersionUID = -8987959389106443702L; + + @Override + public OaBrokerMainEntity zero() { + return new OaBrokerMainEntity(); + } + + @Override + public OaBrokerMainEntity finish(final OaBrokerMainEntity g) { + return g; + } + + @Override + public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2 t) { + final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1; + res.getSoftwares().add(t._2.getRelSoftware()); + return res; + + } + + @Override + public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { + if (StringUtils.isNotBlank(g1.getOriginalId())) { + g1.getSoftwares().addAll(g2.getSoftwares()); + return g1; + } else { + return g2; + } + } + + @Override + public Encoder bufferEncoder() { + return Encoders.bean(OaBrokerMainEntity.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.bean(OaBrokerMainEntity.class); + } + +}