diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java index 8a7229b64..8a9009f32 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java @@ -3,11 +3,16 @@ package eu.dnetlib.dhp.broker.oa; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.io.IOException; import java.util.Optional; +import java.util.Properties; import org.apache.commons.io.IOUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; -import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.TypedColumn; import org.slf4j.Logger; @@ -29,7 +34,7 @@ public class GenerateStatsJob { IOUtils .toString( GenerateStatsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/stats_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -43,21 +48,50 @@ public class GenerateStatsJob { final String eventsPath = parser.get("workingPath") + "/events"; log.info("eventsPath: {}", eventsPath); - final String statsPath = parser.get("workingPath") + "/stats"; - log.info("stats: {}", statsPath); + final String dbUrl = parser.get("dbUrl"); + log.info("dbUrl: {}", dbUrl); + + final String dbUser = parser.get("dbUser"); + log.info("dbUser: {}", dbUser); + + final String dbPassword = parser.get("dbPassword"); + log.info("dbPassword: {}", "***"); + + final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl"); + log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl); final TypedColumn aggr = new StatsAggregator().toColumn(); + final Properties connectionProperties = new Properties(); + connectionProperties.put("user", dbUser); + connectionProperties.put("password", dbPassword); + runWithSparkSession(conf, isSparkSessionManaged, spark -> { - final Dataset stats = ClusterUtils + ClusterUtils .readPath(spark, eventsPath, Event.class) - .groupByKey(e -> e.getMap().getTargetDatasourceId(), Encoders.STRING()) + .groupByKey(e -> e.getTopic() + "@@@" + e.getMap().getTargetDatasourceId(), Encoders.STRING()) .agg(aggr) - .map(t -> t._2, Encoders.bean(DatasourceStats.class)); + .map(t -> t._2, Encoders.bean(DatasourceStats.class)) + .write() + .jdbc(dbUrl, "oa_datasource_stats_temp", connectionProperties); + + log.info("*** updateStats"); + updateStats(brokerApiBaseUrl); + log.info("*** ALL done."); - ClusterUtils.save(stats, statsPath, DatasourceStats.class, null); }); } + private static String updateStats(final String brokerApiBaseUrl) throws IOException { + final String url = brokerApiBaseUrl + "/api/openaireBroker/stats/update"; + final HttpGet req = new HttpGet(url); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + return IOUtils.toString(response.getEntity().getContent()); + } + } + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index cb7acb46d..792a2354a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -47,8 +47,9 @@ public class IndexNotificationsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString( + IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -116,7 +117,8 @@ public class IndexNotificationsJob { final long date) { final List list = subscriptions .stream() - .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter( + s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); @@ -147,15 +149,18 @@ public class IndexNotificationsJob { if (conditions.containsKey("trust") && !SubscriptionUtils - .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { + .verifyFloatRange( + map.getTrust(), conditions.get("trust").get(0).getValue(), + conditions.get("trust").get(0).getOtherValue())) { return false; } if (conditions.containsKey("targetDateofacceptance") && !conditions .get("targetDateofacceptance") .stream() - .anyMatch(c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + .anyMatch( + c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { return false; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java new file mode 100644 index 000000000..0748624f7 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java @@ -0,0 +1,113 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import eu.dnetlib.broker.api.ShortEventMessage; +import eu.dnetlib.broker.objects.OaBrokerEventPayload; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import scala.Tuple2; + +public class PartitionEventsByDsIdJob { + + private static final Logger log = LoggerFactory.getLogger(PartitionEventsByDsIdJob.class); + private static final String OPENDOAR_NSPREFIX = "opendoar____::"; + + public static void main(final String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + PartitionEventsByDsIdJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final SparkConf conf = new SparkConf(); + + final String eventsPath = parser.get("workingPath") + "/events"; + log.info("eventsPath: {}", eventsPath); + + final String partitionPath = parser.get("workingPath") + "/eventsByOpendoarId"; + log.info("partitionPath: {}", partitionPath); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils + .readPath(spark, eventsPath, Event.class) + .filter(e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId())) + .filter(e -> e.getMap().getTargetDatasourceId().contains(OPENDOAR_NSPREFIX)) + .map( + e -> new Tuple2<>( + StringUtils.substringAfter(e.getMap().getTargetDatasourceId(), OPENDOAR_NSPREFIX), + messageFromNotification(e)), + Encoders.tuple(Encoders.STRING(), Encoders.bean(ShortEventMessage.class))) + .write() + .partitionBy("_1") + .mode(SaveMode.Overwrite) + .json(partitionPath); + + }); + renameSubDirs(partitionPath); + + } + + private static void renameSubDirs(final String path) throws IOException { + final String prefix = "_1="; + final FileSystem fs = FileSystem.get(new Configuration()); + + log.info("** Renaming subdirs of " + path); + for (final FileStatus fileStatus : fs.listStatus(new Path(path))) { + if (fileStatus.isDirectory()) { + final Path oldPath = fileStatus.getPath(); + final String oldName = oldPath.getName(); + if (oldName.startsWith(prefix)) { + final Path newPath = new Path(path + "/" + StringUtils.substringAfter(oldName, prefix)); + log.info(" * " + oldPath.getName() + " -> " + newPath.getName()); + fs.rename(oldPath, newPath); + } + } + } + } + + private static ShortEventMessage messageFromNotification(final Event e) { + final Gson gson = new Gson(); + + final OaBrokerEventPayload payload = gson.fromJson(e.getPayload(), OaBrokerEventPayload.class); + + final ShortEventMessage res = new ShortEventMessage(); + + res.setOriginalId(payload.getResult().getOriginalId()); + res.setTitle(payload.getResult().getTitles().stream().filter(StringUtils::isNotBlank).findFirst().orElse(null)); + res.setTopic(e.getTopic()); + res.setTrust(payload.getTrust()); + res.generateMessageFromObject(payload.getHighlight()); + + return res; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java index 8b628809d..979bac2da 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java @@ -2,8 +2,6 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.stats; import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; public class DatasourceStats implements Serializable { @@ -15,7 +13,8 @@ public class DatasourceStats implements Serializable { private String id; private String name; private String type; - private Map topics = new HashMap<>(); + private String topic; + private long size = 0l; public String getId() { return id; @@ -41,21 +40,24 @@ public class DatasourceStats implements Serializable { this.type = type; } - public Map getTopics() { - return topics; + public String getTopic() { + return topic; } - public void setTopics(final Map topics) { - this.topics = topics; + public void setTopic(final String topic) { + this.topic = topic; } - public void incrementTopic(final String topic, final long inc) { - if (topics.containsKey(topic)) { - topics.put(topic, topics.get(topic) + inc); - } else { - topics.put(topic, inc); - } + public long getSize() { + return size; + } + public void setSize(final long size) { + this.size = size; + } + + public void incrementSize(final long inc) { + this.size = this.size + inc; } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java index 5aa6698e3..240e2d211 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java @@ -25,7 +25,8 @@ public class StatsAggregator extends Aggregator stats0.incrementTopic(e.getKey(), e.getValue())); + stats0.incrementSize(stats1.getSize()); return stats0; } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/config-default.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/config-default.xml new file mode 100644 index 000000000..2e0ed9aee --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml new file mode 100644 index 000000000..f629c2101 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml @@ -0,0 +1,137 @@ + + + + + graphInputPath + the path where the graph is stored + + + workingPath + the path where the the generated data will be stored + + + datasourceIdWhitelist + - + a white list (comma separeted, - for empty list) of datasource ids + + + datasourceTypeWhitelist + - + a white list (comma separeted, - for empty list) of datasource types + + + datasourceIdBlacklist + - + a black list (comma separeted, - for empty list) of datasource ids + + + esEventIndexName + the elasticsearch index name for events + + + esNotificationsIndexName + the elasticsearch index name for notifications + + + esIndexHost + the elasticsearch host + + + maxIndexedEventsForDsAndTopic + the max number of events for each couple (ds/topic) + + + brokerApiBaseUrl + the url of the broker service api + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + IndexNotificationsOnESJob + eu.dnetlib.dhp.broker.oa.IndexNotificationsJob + dhp-broker-events-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --workingPath${workingPath} + --index${esNotificationsIndexName} + --esHost${esIndexHost} + --brokerApiBaseUrl${brokerApiBaseUrl} + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index f629c2101..407b9f42f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -44,6 +44,18 @@ brokerApiBaseUrl the url of the broker service api + + brokerDbUrl + the url of the broker database + + + brokerDbUser + the user of the broker database + + + brokerDbPassword + the password of the broker database + sparkDriverMemory memory for driver process @@ -99,23 +111,23 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - IndexNotificationsOnESJob - eu.dnetlib.dhp.broker.oa.IndexNotificationsJob + GenerateStatsJob + eu.dnetlib.dhp.broker.oa.GenerateStatsJob dhp-broker-events-${projectVersion}.jar + --executor-cores=${sparkExecutorCores} --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} @@ -123,14 +135,14 @@ --conf spark.sql.shuffle.partitions=3840 --workingPath${workingPath} - --index${esNotificationsIndexName} - --esHost${esIndexHost} + --dbUrl${brokerDbUrl} + --dbUser${brokerDbUser} + --dbPassword${brokerDbPassword} --brokerApiBaseUrl${brokerApiBaseUrl} - diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json new file mode 100644 index 000000000..15d7d251f --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "wp", + "paramLongName": "workingPath", + "paramDescription": "the working path", + "paramRequired": true + }, + { + "paramName": "dburl", + "paramLongName": "dbUrl", + "paramDescription": "the broker database url", + "paramRequired": true + }, + { + "paramName": "u", + "paramLongName": "dbUser", + "paramDescription": "the broker database user", + "paramRequired": true + }, + { + "paramName": "p", + "paramLongName": "dbPassword", + "paramDescription": "the broker database password", + "paramRequired": true + }, + { + "paramName": "broker", + "paramLongName": "brokerApiBaseUrl", + "paramDescription": "the url of the broker service api", + "paramRequired": true + } +] diff --git a/pom.xml b/pom.xml index e88e1d51b..52edd497f 100644 --- a/pom.xml +++ b/pom.xml @@ -663,7 +663,7 @@ 3.3.3 3.4.2 [2.12,3.0) - 3.1.0 + 3.1.1 7.5.0 4.7.2 1.1