diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index 080350c1cf..b890ed3287 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -11,6 +11,7 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; @@ -45,9 +46,8 @@ public class IndexNotificationsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString(IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -68,7 +68,7 @@ public class IndexNotificationsJob { final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed"); - final long now = new Date().getTime(); + final long startTime = new Date().getTime(); final List subscriptions = listSubscriptions(brokerApiBaseUrl); @@ -77,7 +77,7 @@ public class IndexNotificationsJob { if (subscriptions.size() > 0) { final Dataset notifications = ClusterUtils .readPath(spark, eventsPath, Event.class) - .map(e -> generateNotifications(e, subscriptions, now), Encoders.bean(NotificationGroup.class)) + .map(e -> generateNotifications(e, subscriptions, startTime), Encoders.bean(NotificationGroup.class)) .flatMap(g -> g.getData().iterator(), Encoders.bean(Notification.class)); final JavaRDD inputRdd = notifications @@ -98,6 +98,10 @@ public class IndexNotificationsJob { log.info("*** Start indexing"); JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); log.info("*** End indexing"); + + log.info("*** Deleting old notifications"); + final String message = deleteOldNotifications(brokerApiBaseUrl, startTime - 1000); + log.info("*** Deleted notifications: " + message); } } @@ -106,8 +110,7 @@ public class IndexNotificationsJob { final long date) { final List list = subscriptions .stream() - .filter( - s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); @@ -138,18 +141,15 @@ public class IndexNotificationsJob { if (conditions.containsKey("trust") && !SubscriptionUtils - .verifyFloatRange( - map.getTrust(), conditions.get("trust").get(0).getValue(), - conditions.get("trust").get(0).getOtherValue())) { + .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { return false; } if (conditions.containsKey("targetDateofacceptance") && !conditions .get("targetDateofacceptance") .stream() - .anyMatch( - c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + .anyMatch(c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { return false; } @@ -197,6 +197,18 @@ public class IndexNotificationsJob { } + private static String deleteOldNotifications(final String brokerApiBaseUrl, final long l) throws Exception { + final String url = brokerApiBaseUrl + "/api/notifications/byDate/0/" + l; + final HttpDelete req = new HttpDelete(url); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + return IOUtils.toString(response.getEntity().getContent()); + } + } + + } + private static String prepareForIndexing(final Notification n, final LongAccumulator acc) throws JsonProcessingException { acc.add(1);