refactoring aggregators

This commit is contained in:
Michele Artini 2020-06-24 08:57:13 +02:00
parent d13e3d3f68
commit 8b9933b934
8 changed files with 252 additions and 91 deletions

View File

@ -67,7 +67,7 @@ public class GenerateEventsJob {
ClusterUtils.removeDir(spark, eventsPath);
final Dataset<ResultGroup> groups = ClusterUtils
.readPath(spark, workingPath + "/relation", ResultGroup.class);
.readPath(spark, workingPath + "/duplicates", ResultGroup.class);
final Dataset<Event> events = groups
.map(

View File

@ -11,18 +11,15 @@ import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OaBrokerMainEntityAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProjectAggregator;
import scala.Tuple2;
public class JoinEntitiesJob {
@ -59,31 +56,33 @@ public class JoinEntitiesJob {
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
final Dataset<OaBrokerMainEntity> r1 = join(
r0, ClusterUtils.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class));
final Dataset<OaBrokerMainEntity> r2 = join(
r1, ClusterUtils.readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class));
final Dataset<OaBrokerMainEntity> r3 = join(
r2, ClusterUtils.readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class));
final Dataset<OaBrokerMainEntity> r4 = join(
r3, ClusterUtils.readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class));
r0, ClusterUtils.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class),
new RelatedProjectAggregator());
// final Dataset<OaBrokerMainEntity> r2 = join(
// r1, ClusterUtils.readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class), new
// RelatedDatasetAggregator());
// final Dataset<OaBrokerMainEntity> r3 = join(
// r2, ClusterUtils.readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class), new
// RelatedPublicationAggregator());
// final Dataset<OaBrokerMainEntity> r4 = join(
// r3, ClusterUtils.readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class), new
// RelatedSoftwareAggregator());
r4.write().mode(SaveMode.Overwrite).json(joinedEntitiesPath);
r1.write().mode(SaveMode.Overwrite).json(joinedEntitiesPath);
});
}
/**
 * Enriches each source entity with its typed relations.
 *
 * The sources are joined with {@code typedRels} on {@code sources.openaireId == typedRels.source},
 * the resulting pairs are grouped by the openaireId of the source entity, each group is folded with
 * {@code aggr}, and only the aggregated entity (the second element of the keyed result) is kept.
 *
 * @param sources   the main entities to enrich
 * @param typedRels the relations; must expose a {@code source} column
 * @param aggr      the aggregator used to fold the pairs of a group into a single entity
 * @return one aggregated entity per group
 */
private static <T> Dataset<OaBrokerMainEntity> join(final Dataset<OaBrokerMainEntity> sources,
final Dataset<T> typedRels) {
final TypedColumn<Tuple2<OaBrokerMainEntity, T>, OaBrokerMainEntity> aggr = new OaBrokerMainEntityAggregator<T>()
.toColumn();
final Dataset<T> typedRels,
final Aggregator<Tuple2<OaBrokerMainEntity, T>, OaBrokerMainEntity, OaBrokerMainEntity> aggr) {
return sources
// "left_outer": sources without any matching relation are kept in the result
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
.groupByKey(
(MapFunction<Tuple2<OaBrokerMainEntity, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr)
.agg(aggr.toColumn())
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
}

View File

@ -48,7 +48,7 @@ public class PrepareGroupsJob {
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String groupsPath = workingPath + "/groups";
final String groupsPath = workingPath + "/duplicates";
log.info("groupsPath: {}", groupsPath);
final SparkConf conf = new SparkConf();

View File

@ -1,71 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import scala.Tuple2;
/**
 * Generic Spark {@link Aggregator} that folds (entity, relation) pairs into a single
 * {@link OaBrokerMainEntity}, appending each relation to the list matching its runtime type
 * (software, dataset, publication or project).
 *
 * @param <T> the relation type; one of {@link RelatedSoftware}, {@link RelatedDataset},
 *            {@link RelatedPublication} or {@link RelatedProject}
 */
public class OaBrokerMainEntityAggregator<T>
    extends Aggregator<Tuple2<OaBrokerMainEntity, T>, OaBrokerMainEntity, OaBrokerMainEntity> {

    private static final long serialVersionUID = -3687878788861013488L;

    /** Returns an empty entity used as the initial buffer. */
    @Override
    public OaBrokerMainEntity zero() {
        return new OaBrokerMainEntity();
    }

    /** Returns the buffer unchanged: it already is the final result. */
    @Override
    public OaBrokerMainEntity finish(final OaBrokerMainEntity g) {
        return g;
    }

    /**
     * Adds the relation of the pair to the buffer.
     *
     * While the buffer has no originalId yet, the entity of the pair takes its place. The relation
     * of that first pair is recorded as well (it used to be silently dropped).
     *
     * @param g the current buffer
     * @param t the (entity, relation) pair; the relation is null when the pair comes from a left
     *          outer join that found no match
     * @return the updated buffer
     * @throws RuntimeException if the relation is of an unsupported type
     */
    @Override
    public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, T> t) {
        final OaBrokerMainEntity res = g.getOriginalId() == null ? t._1 : g;
        final T rel = t._2;

        if (rel == null) {
            // no relation for this entity: nothing to add
            return res;
        }

        if (rel instanceof RelatedSoftware) {
            res.getSoftwares().add(((RelatedSoftware) rel).getRelSoftware());
        } else if (rel instanceof RelatedDataset) {
            res.getDatasets().add(((RelatedDataset) rel).getRelDataset());
        } else if (rel instanceof RelatedPublication) {
            res.getPublications().add(((RelatedPublication) rel).getRelPublication());
        } else if (rel instanceof RelatedProject) {
            res.getProjects().add(((RelatedProject) rel).getRelProject());
        } else {
            throw new RuntimeException("Invalid Object: " + rel.getClass());
        }
        return res;
    }

    /**
     * Merges two partial buffers. If {@code g1} has an originalId, all the relations of {@code g2}
     * are appended to it; otherwise {@code g2} is returned as is.
     */
    @Override
    public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
        if (g1.getOriginalId() != null) {
            g1.getSoftwares().addAll(g2.getSoftwares());
            g1.getDatasets().addAll(g2.getDatasets());
            g1.getPublications().addAll(g2.getPublications());
            g1.getProjects().addAll(g2.getProjects());
            return g1;
        } else {
            return g2;
        }
    }

    /** Bean encoder for the intermediate buffer. */
    @Override
    public Encoder<OaBrokerMainEntity> bufferEncoder() {
        return Encoders.bean(OaBrokerMainEntity.class);
    }

    /** Bean encoder for the final result. */
    @Override
    public Encoder<OaBrokerMainEntity> outputEncoder() {
        return Encoders.bean(OaBrokerMainEntity.class);
    }

}

View File

@ -0,0 +1,58 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import scala.Tuple2;
/**
 * Spark {@link Aggregator} that folds (entity, related dataset) pairs into a single
 * {@link OaBrokerMainEntity} whose dataset list collects the related datasets of the group.
 */
public class RelatedDatasetAggregator
    extends Aggregator<Tuple2<OaBrokerMainEntity, RelatedDataset>, OaBrokerMainEntity, OaBrokerMainEntity> {

    private static final long serialVersionUID = 6969761680131482557L;

    /** Returns an empty entity used as the initial buffer. */
    @Override
    public OaBrokerMainEntity zero() {
        return new OaBrokerMainEntity();
    }

    /** Returns the buffer unchanged: it already is the final result. */
    @Override
    public OaBrokerMainEntity finish(final OaBrokerMainEntity g) {
        return g;
    }

    /**
     * Adds the related dataset of the pair to the buffer.
     *
     * While the buffer has a blank originalId, the entity of the pair takes its place.
     *
     * @param g the current buffer
     * @param t the (entity, relation) pair; the relation is null when the pair comes from a left
     *          outer join that found no match, in which case nothing is added
     * @return the updated buffer
     */
    @Override
    public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedDataset> t) {
        final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1;
        if (t._2 != null) {
            res.getDatasets().add(t._2.getRelDataset());
        }
        return res;
    }

    /**
     * Merges two partial buffers. If {@code g1} has a non-blank originalId, the datasets of
     * {@code g2} are appended to it; otherwise {@code g2} is returned as is.
     */
    @Override
    public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
        if (StringUtils.isNotBlank(g1.getOriginalId())) {
            g1.getDatasets().addAll(g2.getDatasets());
            return g1;
        } else {
            return g2;
        }
    }

    /** Bean encoder for the intermediate buffer. */
    @Override
    public Encoder<OaBrokerMainEntity> bufferEncoder() {
        return Encoders.bean(OaBrokerMainEntity.class);
    }

    /** Bean encoder for the final result. */
    @Override
    public Encoder<OaBrokerMainEntity> outputEncoder() {
        return Encoders.bean(OaBrokerMainEntity.class);
    }

}

View File

@ -0,0 +1,58 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import scala.Tuple2;
/**
 * Spark {@link Aggregator} that folds (entity, related project) pairs into a single
 * {@link OaBrokerMainEntity} whose project list collects the related projects of the group.
 */
public class RelatedProjectAggregator
    extends Aggregator<Tuple2<OaBrokerMainEntity, RelatedProject>, OaBrokerMainEntity, OaBrokerMainEntity> {

    private static final long serialVersionUID = 8559808519152275763L;

    /** Returns an empty entity used as the initial buffer. */
    @Override
    public OaBrokerMainEntity zero() {
        return new OaBrokerMainEntity();
    }

    /** Returns the buffer unchanged: it already is the final result. */
    @Override
    public OaBrokerMainEntity finish(final OaBrokerMainEntity g) {
        return g;
    }

    /**
     * Adds the related project of the pair to the buffer.
     *
     * While the buffer has a blank originalId, the entity of the pair takes its place.
     *
     * @param g the current buffer
     * @param t the (entity, relation) pair; the relation is null when the pair comes from a left
     *          outer join that found no match, in which case nothing is added
     * @return the updated buffer
     */
    @Override
    public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedProject> t) {
        final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1;
        if (t._2 != null) {
            res.getProjects().add(t._2.getRelProject());
        }
        return res;
    }

    /**
     * Merges two partial buffers. If {@code g1} has a non-blank originalId, the projects of
     * {@code g2} are appended to it; otherwise {@code g2} is returned as is.
     */
    @Override
    public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
        if (StringUtils.isNotBlank(g1.getOriginalId())) {
            g1.getProjects().addAll(g2.getProjects());
            return g1;
        } else {
            return g2;
        }
    }

    /** Bean encoder for the intermediate buffer. */
    @Override
    public Encoder<OaBrokerMainEntity> bufferEncoder() {
        return Encoders.bean(OaBrokerMainEntity.class);
    }

    /** Bean encoder for the final result. */
    @Override
    public Encoder<OaBrokerMainEntity> outputEncoder() {
        return Encoders.bean(OaBrokerMainEntity.class);
    }

}

View File

@ -0,0 +1,59 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import scala.Tuple2;
/**
 * Spark {@link Aggregator} that folds (entity, related publication) pairs into a single
 * {@link OaBrokerMainEntity} whose publication list collects the related publications of the group.
 */
public class RelatedPublicationAggregator
    extends Aggregator<Tuple2<OaBrokerMainEntity, RelatedPublication>, OaBrokerMainEntity, OaBrokerMainEntity> {

    private static final long serialVersionUID = 4656934981558135919L;

    /** Returns an empty entity used as the initial buffer. */
    @Override
    public OaBrokerMainEntity zero() {
        return new OaBrokerMainEntity();
    }

    /** Returns the buffer unchanged: it already is the final result. */
    @Override
    public OaBrokerMainEntity finish(final OaBrokerMainEntity g) {
        return g;
    }

    /**
     * Adds the related publication of the pair to the buffer.
     *
     * While the buffer has a blank originalId, the entity of the pair takes its place.
     *
     * @param g the current buffer
     * @param t the (entity, relation) pair; the relation is null when the pair comes from a left
     *          outer join that found no match, in which case nothing is added
     * @return the updated buffer
     */
    @Override
    public OaBrokerMainEntity reduce(final OaBrokerMainEntity g,
        final Tuple2<OaBrokerMainEntity, RelatedPublication> t) {
        final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1;
        if (t._2 != null) {
            res.getPublications().add(t._2.getRelPublication());
        }
        return res;
    }

    /**
     * Merges two partial buffers. If {@code g1} has a non-blank originalId, the publications of
     * {@code g2} are appended to it; otherwise {@code g2} is returned as is.
     */
    @Override
    public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
        if (StringUtils.isNotBlank(g1.getOriginalId())) {
            g1.getPublications().addAll(g2.getPublications());
            return g1;
        } else {
            return g2;
        }
    }

    /** Bean encoder for the intermediate buffer. */
    @Override
    public Encoder<OaBrokerMainEntity> bufferEncoder() {
        return Encoders.bean(OaBrokerMainEntity.class);
    }

    /** Bean encoder for the final result. */
    @Override
    public Encoder<OaBrokerMainEntity> outputEncoder() {
        return Encoders.bean(OaBrokerMainEntity.class);
    }

}

View File

@ -0,0 +1,58 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import scala.Tuple2;
/**
 * Spark {@link Aggregator} that folds (entity, related software) pairs into a single
 * {@link OaBrokerMainEntity} whose software list collects the related softwares of the group.
 */
public class RelatedSoftwareAggregator
    extends Aggregator<Tuple2<OaBrokerMainEntity, RelatedSoftware>, OaBrokerMainEntity, OaBrokerMainEntity> {

    private static final long serialVersionUID = -8987959389106443702L;

    /** Returns an empty entity used as the initial buffer. */
    @Override
    public OaBrokerMainEntity zero() {
        return new OaBrokerMainEntity();
    }

    /** Returns the buffer unchanged: it already is the final result. */
    @Override
    public OaBrokerMainEntity finish(final OaBrokerMainEntity g) {
        return g;
    }

    /**
     * Adds the related software of the pair to the buffer.
     *
     * While the buffer has a blank originalId, the entity of the pair takes its place.
     *
     * @param g the current buffer
     * @param t the (entity, relation) pair; the relation is null when the pair comes from a left
     *          outer join that found no match, in which case nothing is added
     * @return the updated buffer
     */
    @Override
    public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedSoftware> t) {
        final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1;
        if (t._2 != null) {
            res.getSoftwares().add(t._2.getRelSoftware());
        }
        return res;
    }

    /**
     * Merges two partial buffers. If {@code g1} has a non-blank originalId, the softwares of
     * {@code g2} are appended to it; otherwise {@code g2} is returned as is.
     */
    @Override
    public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
        if (StringUtils.isNotBlank(g1.getOriginalId())) {
            g1.getSoftwares().addAll(g2.getSoftwares());
            return g1;
        } else {
            return g2;
        }
    }

    /** Bean encoder for the intermediate buffer. */
    @Override
    public Encoder<OaBrokerMainEntity> bufferEncoder() {
        return Encoders.bean(OaBrokerMainEntity.class);
    }

    /** Bean encoder for the final result. */
    @Override
    public Encoder<OaBrokerMainEntity> outputEncoder() {
        return Encoders.bean(OaBrokerMainEntity.class);
    }

}