[broker] added coalesce(1) to the stats dataset before storing it in PostgreSQL

This commit is contained in:
Claudio Atzori 2021-07-09 15:47:06 +02:00
parent c64c0a0743
commit ef612105ba
1 changed file with 9 additions and 2 deletions

View File

@ -13,6 +13,7 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn;
@ -24,6 +25,7 @@ import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.DatasourceStats;
import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.StatsAggregator;
import scala.Tuple2;
public class GenerateStatsJob {
@ -71,9 +73,14 @@ public class GenerateStatsJob {
ClusterUtils
.readPath(spark, eventsPath, Event.class)
.groupByKey(e -> e.getTopic() + "@@@" + e.getMap().getTargetDatasourceId(), Encoders.STRING())
.groupByKey(
(MapFunction<Event, String>) e -> e.getTopic() + "@@@" + e.getMap().getTargetDatasourceId(),
Encoders.STRING())
.agg(aggr)
.map(t -> t._2, Encoders.bean(DatasourceStats.class))
.map(
(MapFunction<Tuple2<String, DatasourceStats>, DatasourceStats>) t -> t._2,
Encoders.bean(DatasourceStats.class))
.coalesce(1)
.write()
.mode(SaveMode.Overwrite)
.jdbc(dbUrl, "oa_datasource_stats_temp", connectionProperties);