From 01d1f0fc8749b84850cc7c0da37b86ffdf44c346 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Fri, 11 Sep 2020 12:05:11 +0200 Subject: [PATCH] mail dispatchers --- .../broker/controllers/AjaxController.java | 2 +- .../controllers/objects/DispatcherStatus.java | 18 +++++- .../AbstractNotificationDispatcher.java | 33 +++++++++- .../events/output/DispatcherManager.java | 14 +++++ .../broker/events/output/EmailDispatcher.java | 62 +++++++++++++------ .../broker/events/output/MockDispatcher.java | 12 ++++ .../events/output/NotificationDispatcher.java | 8 +++ .../openaire/OpenaireBrokerController.java | 29 +++++++++ .../src/main/resources/application.properties | 3 +- .../main/resources/static/html/summary.html | 2 + .../resources/templates/dhp_openaire_mail.st | 17 +++++ .../controllers/OpenairePublicController.java | 6 +- .../elasticsearch/NotificationRepository.java | 2 + 13 files changed, 178 insertions(+), 30 deletions(-) create mode 100644 apps/dhp-broker-application/src/main/resources/templates/dhp_openaire_mail.st diff --git a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/controllers/AjaxController.java b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/controllers/AjaxController.java index 173b25f4..a566a741 100644 --- a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/controllers/AjaxController.java +++ b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/controllers/AjaxController.java @@ -77,7 +77,7 @@ public class AjaxController extends AbstractLbsController { final List dispatchers = dispatcherManager.getDispatchers() .stream() - .map(d -> new DispatcherStatus(d.getDispatcherName(), d.count())) + .map(d -> new DispatcherStatus(d.getDispatcherName(), d.count(), d.countErrors(), d.lastError())) .sorted() .collect(Collectors.toList()); diff --git a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/controllers/objects/DispatcherStatus.java b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/controllers/objects/DispatcherStatus.java index 2af18580..94622816 100644 --- a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/controllers/objects/DispatcherStatus.java +++ b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/controllers/objects/DispatcherStatus.java @@ -4,18 +4,30 @@ public class DispatcherStatus implements Comparable { private final String name; private final long count; + private final long countErrors; + private final String lastError; - public DispatcherStatus(final String name, final long count) { + public DispatcherStatus(final String name, final long count, final long countErrors, final String lastError) { this.name = name; this.count = count; + this.countErrors = countErrors; + this.lastError = lastError; } public String getName() { - return this.name; + return name; } public long getCount() { - return this.count; + return count; + } + + public long getCountErrors() { + return countErrors; + } + + public String getLastError() { + return lastError; } @Override diff --git a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/AbstractNotificationDispatcher.java b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/AbstractNotificationDispatcher.java index edaab3fb..d9fe24d5 100644 --- a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/AbstractNotificationDispatcher.java +++ b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/AbstractNotificationDispatcher.java @@ -1,5 +1,6 @@ package eu.dnetlib.broker.events.output; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.PostConstruct; @@ -10,6 +11,8 @@ import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.annotation.Autowired; +import com.google.common.base.Throwables; + import eu.dnetlib.broker.common.elasticsearch.Event; import eu.dnetlib.broker.common.subscriptions.Subscription; import eu.dnetlib.broker.utils.LbsQueue; @@ -30,6 +33,10 @@ public abstract class AbstractNotificationDispatcher implements NotificationD private final AtomicLong count = new AtomicLong(0); + private final AtomicLong countErrors = new AtomicLong(0); + + private String lastError = ""; + private static final Log log = LogFactory.getLog(AbstractNotificationDispatcher.class); @PostConstruct @@ -48,7 +55,8 @@ public abstract class AbstractNotificationDispatcher implements NotificationD this.count.incrementAndGet(); } catch (final Throwable e) { log.error("Error sending notification", e); - this.queue.offer(message); + this.countErrors.incrementAndGet(); + this.lastError = e.getMessage() + "\nStacktrave:\n" + Throwables.getStackTraceAsString(e); } } } @@ -64,6 +72,17 @@ public abstract class AbstractNotificationDispatcher implements NotificationD } } + @Override + public void sendNotification(final Subscription subscription, final Map params) { + try { + this.queue.offer(prepareAction(subscription, params)); + } catch (final Exception e) { + log.error("Error sending notification", e); + } + } + + abstract protected T prepareAction(final Subscription subscription, final Map params) throws Exception; + abstract protected T prepareAction(final Subscription subscription, final Event... events) throws Exception; abstract protected void performAction(final T message) throws Exception; @@ -82,9 +101,21 @@ public abstract class AbstractNotificationDispatcher implements NotificationD return this.count.get(); } + @Override + public long countErrors() { + return this.countErrors.get(); + } + + @Override + public String lastError() { + return lastError; + } + @Override public void resetCount() { this.count.set(0); + this.countErrors.set(0); + this.lastError = ""; } @Override diff --git a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/DispatcherManager.java b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/DispatcherManager.java index 09b5ca0c..b75d3369 100644 --- a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/DispatcherManager.java +++ b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/DispatcherManager.java @@ -2,6 +2,7 @@ package eu.dnetlib.broker.events.output; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import org.apache.commons.logging.Log; @@ -33,6 +34,19 @@ public class DispatcherManager { } } + public void sendNotification(final Subscription s, final Map params) { + final Optional dispatcher = this.dispatchers + .stream() + .filter(d -> d.getMode() == s.getMode()) + .findFirst(); + + if (dispatcher.isPresent()) { + dispatcher.get().sendNotification(s, params); + } else { + log.error("Notification dispatcher not found, mode=" + s.getMode()); + } + } + public List getDispatchers() { return this.dispatchers; } diff --git a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/EmailDispatcher.java b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/EmailDispatcher.java index b73102f2..70892317 100644 --- a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/EmailDispatcher.java +++ b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/EmailDispatcher.java @@ -1,17 +1,20 @@ package eu.dnetlib.broker.events.output; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Date; +import java.util.Map; import java.util.Properties; -import java.util.stream.Collectors; import javax.mail.Authenticator; import javax.mail.Message; +import javax.mail.MessagingException; import javax.mail.PasswordAuthentication; import javax.mail.Session; import javax.mail.Transport; +import javax.mail.internet.AddressException; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; @@ -54,17 +57,51 @@ public class EmailDispatcher extends AbstractNotificationDispatcher { } if (events.length == 0) { log.warn("Event list is empty"); - throw new IllegalArgumentException("Event list is empty"); } - final String topics = Arrays.stream(events).map(e -> e.getTopic()).distinct().collect(Collectors.joining(", ")); + final String content = generateMailContent(subscription, null, events); + return prepareMimeMessage(subscription, content); + } + + @Override + protected Message prepareAction(final Subscription subscription, final Map params) throws Exception { + if (subscription == null || StringUtils.isBlank(subscription.getSubscriber())) { + log.warn("Invalid subscription"); + throw new IllegalArgumentException("Invalid subscription"); + } + final String content = generateMailContent(subscription, params); + + return prepareMimeMessage(subscription, content); + } + + private String generateMailContent(final Subscription subscription, final Map params, final Event... events) throws IOException { + final StringTemplate st = new StringTemplate(IOUtils.toString(emailTemplate.getInputStream(), StandardCharsets.UTF_8)); + st.setAttribute("sub", subscription); + + st.setAttribute("total", events.length); + st.setAttribute("max", MAX_NUMBER_OF_EVENTS); + + if (events.length > MAX_NUMBER_OF_EVENTS) { + st.setAttribute("events", Arrays.copyOfRange(events, 0, MAX_NUMBER_OF_EVENTS)); + } else { + st.setAttribute("events", events); + } + if (params != null) { + params.entrySet().forEach(e -> st.setAttribute(e.getKey(), e.getValue())); + } + + return st.toString(); + } + + private Message prepareMimeMessage(final Subscription subscription, final String content) + throws MessagingException, UnsupportedEncodingException, AddressException { final Session session = Session.getInstance(obtainProperties(), obtainAuthenticator()); final MimeMessage mimeMessage = new MimeMessage(session); mimeMessage.setFrom(new InternetAddress(props.getFrom(), props.getFromName())); - mimeMessage.setSubject("Notification for topic(s): " + topics); - mimeMessage.setContent(generateMailContent(subscription, events), "text/html; charset=utf-8"); + mimeMessage.setSubject("Notification for topic: " + subscription.getTopic()); + mimeMessage.setContent(content, "text/html; charset=utf-8"); mimeMessage.setSentDate(new Date()); mimeMessage.addRecipient(Message.RecipientType.TO, new InternetAddress(subscription.getSubscriber())); @@ -80,21 +117,6 @@ public class EmailDispatcher extends AbstractNotificationDispatcher { return mimeMessage; } - private String generateMailContent(final Subscription subscription, final Event... events) throws IOException { - final StringTemplate st = new StringTemplate(IOUtils.toString(emailTemplate.getInputStream(), StandardCharsets.UTF_8)); - st.setAttribute("sub", subscription); - - st.setAttribute("total", events.length); - st.setAttribute("max", MAX_NUMBER_OF_EVENTS); - - if (events.length > MAX_NUMBER_OF_EVENTS) { - st.setAttribute("events", Arrays.copyOfRange(events, 0, MAX_NUMBER_OF_EVENTS)); - } else { - st.setAttribute("events", events); - } - return st.toString(); - } - @Override protected void performAction(final Message message) throws Exception { log.info("Sending mail to " + Arrays.toString(message.getAllRecipients()) + "..."); diff --git a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/MockDispatcher.java b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/MockDispatcher.java index fb1c95a1..8e4bbe18 100644 --- a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/MockDispatcher.java +++ b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/MockDispatcher.java @@ -1,6 +1,7 @@ package eu.dnetlib.broker.events.output; import java.io.StringWriter; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,6 +29,17 @@ public class MockDispatcher extends AbstractNotificationDispatcher { return sw.toString(); } + @Override + protected String prepareAction(final Subscription subscription, final Map params) throws Exception { + final StringWriter sw = new StringWriter(); + sw.write("\n********************************"); + sw.write("\n* New notification"); + sw.write("\n* - Subscription : " + subscription); + params.entrySet().forEach(e -> sw.write("\n* - " + e.getKey() + ": " + e.getValue())); + sw.write("\n********************************"); + return sw.toString(); + } + @Override protected void performAction(final String message) throws Exception { log.info(message); diff --git a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/NotificationDispatcher.java b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/NotificationDispatcher.java index f2ec37ea..960a2c67 100644 --- a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/NotificationDispatcher.java +++ b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/NotificationDispatcher.java @@ -1,5 +1,7 @@ package eu.dnetlib.broker.events.output; +import java.util.Map; + import eu.dnetlib.broker.common.elasticsearch.Event; import eu.dnetlib.broker.common.subscriptions.NotificationMode; import eu.dnetlib.broker.common.subscriptions.Subscription; @@ -15,4 +17,10 @@ public interface NotificationDispatcher { long count(); NotificationMode getMode(); + + void sendNotification(Subscription subscription, Map params); + + long countErrors(); + + String lastError(); } diff --git a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/openaire/OpenaireBrokerController.java b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/openaire/OpenaireBrokerController.java index 1fd7e1ab..23c5907b 100644 --- a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/openaire/OpenaireBrokerController.java +++ b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/openaire/OpenaireBrokerController.java @@ -1,6 +1,9 @@ package eu.dnetlib.broker.openaire; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -45,6 +48,7 @@ import eu.dnetlib.broker.common.properties.ElasticSearchProperties; import eu.dnetlib.broker.common.subscriptions.MapCondition; import eu.dnetlib.broker.common.subscriptions.Subscription; import eu.dnetlib.broker.common.subscriptions.SubscriptionRepository; +import eu.dnetlib.broker.events.output.DispatcherManager; import eu.dnetlib.broker.objects.OaBrokerEventPayload; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -67,6 +71,9 @@ public class OpenaireBrokerController extends AbstractLbsController { @Autowired private ElasticSearchProperties props; + @Autowired + private DispatcherManager dispatcher; + private static final Log log = LogFactory.getLog(OpenaireBrokerController.class); @ApiOperation("Return the datasources having events") @@ -222,6 +229,28 @@ public class OpenaireBrokerController extends AbstractLbsController { } + @ApiOperation("Send notifications") + @GetMapping("/notifications/send/{date}") + private List sendMailForNotifications(@PathVariable final long date) { + new Thread(() -> innerSendMailForNotifications(date)).start(); + return Arrays.asList("Sending ..."); + } + + private void innerSendMailForNotifications(final long date) { + for (final Subscription s : subscriptionRepo.findAll()) { + + final long count = notificationRepository.countBySubscriptionIdAndDateAfter(s.getSubscriptionId(), date); + if (count > 0) { + final Map params = new HashMap<>(); + params.put("oa_notifications_total", count); + dispatcher.sendNotification(s, params); + } + + s.setLastNotificationDate(new Date()); + subscriptionRepo.save(s); + } + } + private SimpleSubscriptionDesc subscriptionDesc(final Subscription s) { return new SimpleSubscriptionDesc(s.getSubscriptionId(), extractDatasource(s), s.getTopic(), s.getCreationDate(), s.getLastNotificationDate(), OpenaireBrokerController.this.notificationRepository.countBySubscriptionId(s.getSubscriptionId())); diff --git a/apps/dhp-broker-application/src/main/resources/application.properties b/apps/dhp-broker-application/src/main/resources/application.properties index e7fc79c4..f485cfda 100644 --- a/apps/dhp-broker-application/src/main/resources/application.properties +++ b/apps/dhp-broker-application/src/main/resources/application.properties @@ -39,8 +39,7 @@ lbs.mail.smtpHost = smtp.isti.cnr.it lbs.mail.smtpPort = 587 lbs.mail.smtpUser = smtp-dnet lbs.mail.smtpPassword = hhr*7932 -lbs.mail.message.template = classpath:/templates/openaire_mail.st - +lbs.mail.message.template = classpath:/templates/dhp_openaire_mail.st lbs.queues.maxReturnedValues = 1000 diff --git a/apps/dhp-broker-application/src/main/resources/static/html/summary.html b/apps/dhp-broker-application/src/main/resources/static/html/summary.html index ef9d3914..b2e9dc60 100644 --- a/apps/dhp-broker-application/src/main/resources/static/html/summary.html +++ b/apps/dhp-broker-application/src/main/resources/static/html/summary.html @@ -83,12 +83,14 @@ Sent messages + Errors {{b.name}} {{b.count}} + {{b.countErrors}} diff --git a/apps/dhp-broker-application/src/main/resources/templates/dhp_openaire_mail.st b/apps/dhp-broker-application/src/main/resources/templates/dhp_openaire_mail.st new file mode 100644 index 00000000..d500b195 --- /dev/null +++ b/apps/dhp-broker-application/src/main/resources/templates/dhp_openaire_mail.st @@ -0,0 +1,17 @@ +You have received this mail because you are owner of the following subscription: + +
    +
  • ID: $sub.subscriptionId$
  • +
  • Topic: $sub.topic$
  • +
  • Number of events: $oa_notifications_total$
  • +
+ +
+ +

+You can access the notified events at this address: ....
+or using the Public Broker Service API. +

+ +
+This email message was auto-generated. Please do not respond. diff --git a/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/OpenairePublicController.java b/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/OpenairePublicController.java index 1f9662c9..d14bcc43 100644 --- a/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/OpenairePublicController.java +++ b/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/OpenairePublicController.java @@ -154,7 +154,7 @@ public class OpenairePublicController extends AbstractLbsController { boolean first = true; - IOUtils.write("[", gzOut); + IOUtils.write("[\n", gzOut); ScrollPage page = null; @@ -165,13 +165,13 @@ public class OpenairePublicController extends AbstractLbsController { if (first) { first = false; } else { - IOUtils.write(", ", gzOut); + IOUtils.write(",\n", gzOut); } IOUtils.write(gson.toJson(msg), gzOut); } } while (!page.isCompleted()); - IOUtils.write("]", gzOut); + IOUtils.write("\n]\n", gzOut); gzOut.flush(); diff --git a/libs/dnet-broker-apps-common/src/main/java/eu/dnetlib/broker/common/elasticsearch/NotificationRepository.java b/libs/dnet-broker-apps-common/src/main/java/eu/dnetlib/broker/common/elasticsearch/NotificationRepository.java index 758b39f6..dcb1f270 100644 --- a/libs/dnet-broker-apps-common/src/main/java/eu/dnetlib/broker/common/elasticsearch/NotificationRepository.java +++ b/libs/dnet-broker-apps-common/src/main/java/eu/dnetlib/broker/common/elasticsearch/NotificationRepository.java @@ -19,6 +19,8 @@ public interface NotificationRepository extends ElasticsearchRepository