diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java index 8a7229b647..8a9009f324 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java @@ -3,11 +3,16 @@ package eu.dnetlib.dhp.broker.oa; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.io.IOException; import java.util.Optional; +import java.util.Properties; import org.apache.commons.io.IOUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; -import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.TypedColumn; import org.slf4j.Logger; @@ -29,7 +34,7 @@ public class GenerateStatsJob { IOUtils .toString( GenerateStatsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/stats_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -43,21 +48,50 @@ public class GenerateStatsJob { final String eventsPath = parser.get("workingPath") + "/events"; log.info("eventsPath: {}", eventsPath); - final String statsPath = parser.get("workingPath") + "/stats"; - log.info("stats: {}", statsPath); + final String dbUrl = parser.get("dbUrl"); + log.info("dbUrl: {}", dbUrl); + + final String dbUser = parser.get("dbUser"); + log.info("dbUser: {}", dbUser); + + final String dbPassword = parser.get("dbPassword"); + log.info("dbPassword: {}", "***"); + + final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl"); + log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl); final TypedColumn aggr = new StatsAggregator().toColumn(); + final Properties connectionProperties = new Properties(); + connectionProperties.put("user", dbUser); + connectionProperties.put("password", dbPassword); + runWithSparkSession(conf, isSparkSessionManaged, spark -> { - final Dataset stats = ClusterUtils + ClusterUtils .readPath(spark, eventsPath, Event.class) - .groupByKey(e -> e.getMap().getTargetDatasourceId(), Encoders.STRING()) + .groupByKey(e -> e.getTopic() + "@@@" + e.getMap().getTargetDatasourceId(), Encoders.STRING()) .agg(aggr) - .map(t -> t._2, Encoders.bean(DatasourceStats.class)); + .map(t -> t._2, Encoders.bean(DatasourceStats.class)) + .write() + .jdbc(dbUrl, "oa_datasource_stats_temp", connectionProperties); + + log.info("*** updateStats"); + updateStats(brokerApiBaseUrl); + log.info("*** ALL done."); - ClusterUtils.save(stats, statsPath, DatasourceStats.class, null); }); } + private static String updateStats(final String brokerApiBaseUrl) throws IOException { + final String url = brokerApiBaseUrl + "/api/openaireBroker/stats/update"; + final HttpGet req = new HttpGet(url); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + return IOUtils.toString(response.getEntity().getContent()); + } + } + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java index 8b628809db..979bac2da6 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java @@ -2,8 +2,6 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.stats; import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; public class DatasourceStats implements Serializable { @@ -15,7 +13,8 @@ public class DatasourceStats implements Serializable { private String id; private String name; private String type; - private Map topics = new HashMap<>(); + private String topic; + private long size = 0l; public String getId() { return id; @@ -41,21 +40,24 @@ public class DatasourceStats implements Serializable { this.type = type; } - public Map getTopics() { - return topics; + public String getTopic() { + return topic; } - public void setTopics(final Map topics) { - this.topics = topics; + public void setTopic(final String topic) { + this.topic = topic; } - public void incrementTopic(final String topic, final long inc) { - if (topics.containsKey(topic)) { - topics.put(topic, topics.get(topic) + inc); - } else { - topics.put(topic, inc); - } + public long getSize() { + return size; + } + public void setSize(final long size) { + this.size = size; + } + + public void incrementSize(final long inc) { + this.size = this.size + inc; } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java index 5aa6698e39..240e2d2112 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java @@ -25,7 +25,8 @@ public class StatsAggregator extends Aggregator stats0.incrementTopic(e.getKey(), e.getValue())); + stats0.incrementSize(stats1.getSize()); return stats0; } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index a9741a3074..407b9f42f7 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -44,6 +44,18 @@ brokerApiBaseUrl the url of the broker service api + + brokerDbUrl + the url of the broker database + + + brokerDbUser + the user of the broker database + + + brokerDbPassword + the password of the broker database + sparkDriverMemory memory for driver process @@ -99,18 +111,18 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - PartitionEventsByDsIdJob - eu.dnetlib.dhp.broker.oa.PartitionEventsByDsIdJob + GenerateStatsJob + eu.dnetlib.dhp.broker.oa.GenerateStatsJob dhp-broker-events-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -122,8 +134,11 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --graphPath${graphInputPath} --workingPath${workingPath} + --dbUrl${brokerDbUrl} + --dbUser${brokerDbUser} + --dbPassword${brokerDbPassword} + --brokerApiBaseUrl${brokerApiBaseUrl} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json new file mode 100644 index 0000000000..15d7d251f6 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "wp", + "paramLongName": "workingPath", + "paramDescription": "the working path", + "paramRequired": true + }, + { + "paramName": "dburl", + "paramLongName": "dbUrl", + "paramDescription": "the broker database url", + "paramRequired": true + }, + { + "paramName": "u", + "paramLongName": "dbUser", + "paramDescription": "the broker database user", + "paramRequired": true + }, + { + "paramName": "p", + "paramLongName": "dbPassword", + "paramDescription": "the broker database password", + "paramRequired": true + }, + { + "paramName": "broker", + "paramLongName": "brokerApiBaseUrl", + "paramDescription": "the url of the broker service api", + "paramRequired": true + } +]