diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java index 2772f8fd1..9927d6560 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java @@ -13,6 +13,7 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.TypedColumn; @@ -24,6 +25,7 @@ import eu.dnetlib.dhp.broker.model.Event; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.DatasourceStats; import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.StatsAggregator; +import scala.Tuple2; public class GenerateStatsJob { @@ -71,9 +73,14 @@ public class GenerateStatsJob { ClusterUtils .readPath(spark, eventsPath, Event.class) - .groupByKey(e -> e.getTopic() + "@@@" + e.getMap().getTargetDatasourceId(), Encoders.STRING()) + .groupByKey( + (MapFunction<Event, String>) e -> e.getTopic() + "@@@" + e.getMap().getTargetDatasourceId(), + Encoders.STRING()) .agg(aggr) - .map(t -> t._2, Encoders.bean(DatasourceStats.class)) + .map( + (MapFunction<Tuple2<String, DatasourceStats>, DatasourceStats>) t -> t._2, + Encoders.bean(DatasourceStats.class)) + .coalesce(1) .write() .mode(SaveMode.Overwrite) .jdbc(dbUrl, "oa_datasource_stats_temp", connectionProperties);