remove old notifications

This commit is contained in:
Michele Artini 2020-09-09 15:32:06 +02:00
parent 9cfc124ac5
commit 028613b751
1 changed files with 25 additions and 13 deletions

View File

@ -11,6 +11,7 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
@ -45,9 +46,8 @@ public class IndexNotificationsJob {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
IndexNotificationsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json")));
.toString(IndexNotificationsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json")));
parser.parseArgument(args);
final SparkConf conf = new SparkConf();
@ -68,7 +68,7 @@ public class IndexNotificationsJob {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed");
final long now = new Date().getTime();
final long startTime = new Date().getTime();
final List<Subscription> subscriptions = listSubscriptions(brokerApiBaseUrl);
@ -77,7 +77,7 @@ public class IndexNotificationsJob {
if (subscriptions.size() > 0) {
final Dataset<Notification> notifications = ClusterUtils
.readPath(spark, eventsPath, Event.class)
.map(e -> generateNotifications(e, subscriptions, now), Encoders.bean(NotificationGroup.class))
.map(e -> generateNotifications(e, subscriptions, startTime), Encoders.bean(NotificationGroup.class))
.flatMap(g -> g.getData().iterator(), Encoders.bean(Notification.class));
final JavaRDD<String> inputRdd = notifications
@ -98,6 +98,10 @@ public class IndexNotificationsJob {
log.info("*** Start indexing");
JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
log.info("*** End indexing");
log.info("*** Deleting old notifications");
final String message = deleteOldNotifications(brokerApiBaseUrl, startTime - 1000);
log.info("*** Deleted notifications: " + message);
}
}
@ -106,8 +110,7 @@ public class IndexNotificationsJob {
final long date) {
final List<Notification> list = subscriptions
.stream()
.filter(
s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic()))
.filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic()))
.filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap()))
.map(s -> generateNotification(s, e, date))
.collect(Collectors.toList());
@ -138,18 +141,15 @@ public class IndexNotificationsJob {
if (conditions.containsKey("trust")
&& !SubscriptionUtils
.verifyFloatRange(
map.getTrust(), conditions.get("trust").get(0).getValue(),
conditions.get("trust").get(0).getOtherValue())) {
.verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) {
return false;
}
if (conditions.containsKey("targetDateofacceptance") && !conditions
.get("targetDateofacceptance")
.stream()
.anyMatch(
c -> SubscriptionUtils
.verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) {
.anyMatch(c -> SubscriptionUtils
.verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) {
return false;
}
@ -197,6 +197,18 @@ public class IndexNotificationsJob {
}
private static String deleteOldNotifications(final String brokerApiBaseUrl, final long l) throws Exception {
final String url = brokerApiBaseUrl + "/api/notifications/byDate/0/" + l;
final HttpDelete req = new HttpDelete(url);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
return IOUtils.toString(response.getEntity().getContent());
}
}
}
private static String prepareForIndexing(final Notification n, final LongAccumulator acc)
throws JsonProcessingException {
acc.add(1);