mail dispatchers

This commit is contained in:
Michele Artini 2020-09-11 12:05:11 +02:00
parent f3da2a7dae
commit 01d1f0fc87
13 changed files with 178 additions and 30 deletions

View File

@ -77,7 +77,7 @@ public class AjaxController extends AbstractLbsController {
final List<DispatcherStatus> dispatchers = dispatcherManager.getDispatchers() final List<DispatcherStatus> dispatchers = dispatcherManager.getDispatchers()
.stream() .stream()
.map(d -> new DispatcherStatus(d.getDispatcherName(), d.count())) .map(d -> new DispatcherStatus(d.getDispatcherName(), d.count(), d.countErrors(), d.lastError()))
.sorted() .sorted()
.collect(Collectors.toList()); .collect(Collectors.toList());

View File

@ -4,18 +4,30 @@ public class DispatcherStatus implements Comparable<DispatcherStatus> {
private final String name; private final String name;
private final long count; private final long count;
private final long countErrors;
private final String lastError;
public DispatcherStatus(final String name, final long count) { public DispatcherStatus(final String name, final long count, final long countErrors, final String lastError) {
this.name = name; this.name = name;
this.count = count; this.count = count;
this.countErrors = countErrors;
this.lastError = lastError;
} }
public String getName() { public String getName() {
return this.name; return name;
} }
public long getCount() { public long getCount() {
return this.count; return count;
}
public long getCountErrors() {
return countErrors;
}
public String getLastError() {
return lastError;
} }
@Override @Override

View File

@ -1,5 +1,6 @@
package eu.dnetlib.broker.events.output; package eu.dnetlib.broker.events.output;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -10,6 +11,8 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import com.google.common.base.Throwables;
import eu.dnetlib.broker.common.elasticsearch.Event; import eu.dnetlib.broker.common.elasticsearch.Event;
import eu.dnetlib.broker.common.subscriptions.Subscription; import eu.dnetlib.broker.common.subscriptions.Subscription;
import eu.dnetlib.broker.utils.LbsQueue; import eu.dnetlib.broker.utils.LbsQueue;
@ -30,6 +33,10 @@ public abstract class AbstractNotificationDispatcher<T> implements NotificationD
private final AtomicLong count = new AtomicLong(0); private final AtomicLong count = new AtomicLong(0);
private final AtomicLong countErrors = new AtomicLong(0);
private String lastError = "";
private static final Log log = LogFactory.getLog(AbstractNotificationDispatcher.class); private static final Log log = LogFactory.getLog(AbstractNotificationDispatcher.class);
@PostConstruct @PostConstruct
@ -48,7 +55,8 @@ public abstract class AbstractNotificationDispatcher<T> implements NotificationD
this.count.incrementAndGet(); this.count.incrementAndGet();
} catch (final Throwable e) { } catch (final Throwable e) {
log.error("Error sending notification", e); log.error("Error sending notification", e);
this.queue.offer(message); this.countErrors.incrementAndGet();
this.lastError = e.getMessage() + "\nStacktrave:\n" + Throwables.getStackTraceAsString(e);
} }
} }
} }
@ -64,6 +72,17 @@ public abstract class AbstractNotificationDispatcher<T> implements NotificationD
} }
} }
@Override
public void sendNotification(final Subscription subscription, final Map<String, Object> params) {
try {
this.queue.offer(prepareAction(subscription, params));
} catch (final Exception e) {
log.error("Error sending notification", e);
}
}
abstract protected T prepareAction(final Subscription subscription, final Map<String, Object> params) throws Exception;
abstract protected T prepareAction(final Subscription subscription, final Event... events) throws Exception; abstract protected T prepareAction(final Subscription subscription, final Event... events) throws Exception;
abstract protected void performAction(final T message) throws Exception; abstract protected void performAction(final T message) throws Exception;
@ -82,9 +101,21 @@ public abstract class AbstractNotificationDispatcher<T> implements NotificationD
return this.count.get(); return this.count.get();
} }
@Override
public long countErrors() {
return this.countErrors.get();
}
@Override
public String lastError() {
return lastError;
}
@Override @Override
public void resetCount() { public void resetCount() {
this.count.set(0); this.count.set(0);
this.countErrors.set(0);
this.lastError = "";
} }
@Override @Override

View File

@ -2,6 +2,7 @@ package eu.dnetlib.broker.events.output;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -33,6 +34,19 @@ public class DispatcherManager {
} }
} }
public void sendNotification(final Subscription s, final Map<String, Object> params) {
final Optional<NotificationDispatcher> dispatcher = this.dispatchers
.stream()
.filter(d -> d.getMode() == s.getMode())
.findFirst();
if (dispatcher.isPresent()) {
dispatcher.get().sendNotification(s, params);
} else {
log.error("Notification dispatcher not found, mode=" + s.getMode());
}
}
public List<NotificationDispatcher> getDispatchers() { public List<NotificationDispatcher> getDispatchers() {
return this.dispatchers; return this.dispatchers;
} }

View File

@ -1,17 +1,20 @@
package eu.dnetlib.broker.events.output; package eu.dnetlib.broker.events.output;
import java.io.IOException; import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.stream.Collectors;
import javax.mail.Authenticator; import javax.mail.Authenticator;
import javax.mail.Message; import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.PasswordAuthentication; import javax.mail.PasswordAuthentication;
import javax.mail.Session; import javax.mail.Session;
import javax.mail.Transport; import javax.mail.Transport;
import javax.mail.internet.AddressException;
import javax.mail.internet.InternetAddress; import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage; import javax.mail.internet.MimeMessage;
@ -54,17 +57,51 @@ public class EmailDispatcher extends AbstractNotificationDispatcher<Message> {
} }
if (events.length == 0) { if (events.length == 0) {
log.warn("Event list is empty"); log.warn("Event list is empty");
throw new IllegalArgumentException("Event list is empty");
} }
final String topics = Arrays.stream(events).map(e -> e.getTopic()).distinct().collect(Collectors.joining(", ")); final String content = generateMailContent(subscription, null, events);
return prepareMimeMessage(subscription, content);
}
@Override
protected Message prepareAction(final Subscription subscription, final Map<String, Object> params) throws Exception {
if (subscription == null || StringUtils.isBlank(subscription.getSubscriber())) {
log.warn("Invalid subscription");
throw new IllegalArgumentException("Invalid subscription");
}
final String content = generateMailContent(subscription, params);
return prepareMimeMessage(subscription, content);
}
private String generateMailContent(final Subscription subscription, final Map<String, Object> params, final Event... events) throws IOException {
final StringTemplate st = new StringTemplate(IOUtils.toString(emailTemplate.getInputStream(), StandardCharsets.UTF_8));
st.setAttribute("sub", subscription);
st.setAttribute("total", events.length);
st.setAttribute("max", MAX_NUMBER_OF_EVENTS);
if (events.length > MAX_NUMBER_OF_EVENTS) {
st.setAttribute("events", Arrays.copyOfRange(events, 0, MAX_NUMBER_OF_EVENTS));
} else {
st.setAttribute("events", events);
}
if (params != null) {
params.entrySet().forEach(e -> st.setAttribute(e.getKey(), e.getValue()));
}
return st.toString();
}
private Message prepareMimeMessage(final Subscription subscription, final String content)
throws MessagingException, UnsupportedEncodingException, AddressException {
final Session session = Session.getInstance(obtainProperties(), obtainAuthenticator()); final Session session = Session.getInstance(obtainProperties(), obtainAuthenticator());
final MimeMessage mimeMessage = new MimeMessage(session); final MimeMessage mimeMessage = new MimeMessage(session);
mimeMessage.setFrom(new InternetAddress(props.getFrom(), props.getFromName())); mimeMessage.setFrom(new InternetAddress(props.getFrom(), props.getFromName()));
mimeMessage.setSubject("Notification for topic(s): " + topics); mimeMessage.setSubject("Notification for topic: " + subscription.getTopic());
mimeMessage.setContent(generateMailContent(subscription, events), "text/html; charset=utf-8"); mimeMessage.setContent(content, "text/html; charset=utf-8");
mimeMessage.setSentDate(new Date()); mimeMessage.setSentDate(new Date());
mimeMessage.addRecipient(Message.RecipientType.TO, new InternetAddress(subscription.getSubscriber())); mimeMessage.addRecipient(Message.RecipientType.TO, new InternetAddress(subscription.getSubscriber()));
@ -80,21 +117,6 @@ public class EmailDispatcher extends AbstractNotificationDispatcher<Message> {
return mimeMessage; return mimeMessage;
} }
private String generateMailContent(final Subscription subscription, final Event... events) throws IOException {
final StringTemplate st = new StringTemplate(IOUtils.toString(emailTemplate.getInputStream(), StandardCharsets.UTF_8));
st.setAttribute("sub", subscription);
st.setAttribute("total", events.length);
st.setAttribute("max", MAX_NUMBER_OF_EVENTS);
if (events.length > MAX_NUMBER_OF_EVENTS) {
st.setAttribute("events", Arrays.copyOfRange(events, 0, MAX_NUMBER_OF_EVENTS));
} else {
st.setAttribute("events", events);
}
return st.toString();
}
@Override @Override
protected void performAction(final Message message) throws Exception { protected void performAction(final Message message) throws Exception {
log.info("Sending mail to " + Arrays.toString(message.getAllRecipients()) + "..."); log.info("Sending mail to " + Arrays.toString(message.getAllRecipients()) + "...");

View File

@ -1,6 +1,7 @@
package eu.dnetlib.broker.events.output; package eu.dnetlib.broker.events.output;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -28,6 +29,17 @@ public class MockDispatcher extends AbstractNotificationDispatcher<String> {
return sw.toString(); return sw.toString();
} }
@Override
protected String prepareAction(final Subscription subscription, final Map<String, Object> params) throws Exception {
final StringWriter sw = new StringWriter();
sw.write("\n********************************");
sw.write("\n* New notification");
sw.write("\n* - Subscription : " + subscription);
params.entrySet().forEach(e -> sw.write("\n* - " + e.getKey() + ": " + e.getValue()));
sw.write("\n********************************");
return sw.toString();
}
@Override @Override
protected void performAction(final String message) throws Exception { protected void performAction(final String message) throws Exception {
log.info(message); log.info(message);

View File

@ -1,5 +1,7 @@
package eu.dnetlib.broker.events.output; package eu.dnetlib.broker.events.output;
import java.util.Map;
import eu.dnetlib.broker.common.elasticsearch.Event; import eu.dnetlib.broker.common.elasticsearch.Event;
import eu.dnetlib.broker.common.subscriptions.NotificationMode; import eu.dnetlib.broker.common.subscriptions.NotificationMode;
import eu.dnetlib.broker.common.subscriptions.Subscription; import eu.dnetlib.broker.common.subscriptions.Subscription;
@ -15,4 +17,10 @@ public interface NotificationDispatcher {
long count(); long count();
NotificationMode getMode(); NotificationMode getMode();
void sendNotification(Subscription subscription, Map<String, Object> params);
long countErrors();
String lastError();
} }

View File

@ -1,6 +1,9 @@
package eu.dnetlib.broker.openaire; package eu.dnetlib.broker.openaire;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -45,6 +48,7 @@ import eu.dnetlib.broker.common.properties.ElasticSearchProperties;
import eu.dnetlib.broker.common.subscriptions.MapCondition; import eu.dnetlib.broker.common.subscriptions.MapCondition;
import eu.dnetlib.broker.common.subscriptions.Subscription; import eu.dnetlib.broker.common.subscriptions.Subscription;
import eu.dnetlib.broker.common.subscriptions.SubscriptionRepository; import eu.dnetlib.broker.common.subscriptions.SubscriptionRepository;
import eu.dnetlib.broker.events.output.DispatcherManager;
import eu.dnetlib.broker.objects.OaBrokerEventPayload; import eu.dnetlib.broker.objects.OaBrokerEventPayload;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
@ -67,6 +71,9 @@ public class OpenaireBrokerController extends AbstractLbsController {
@Autowired @Autowired
private ElasticSearchProperties props; private ElasticSearchProperties props;
@Autowired
private DispatcherManager dispatcher;
private static final Log log = LogFactory.getLog(OpenaireBrokerController.class); private static final Log log = LogFactory.getLog(OpenaireBrokerController.class);
@ApiOperation("Return the datasources having events") @ApiOperation("Return the datasources having events")
@ -222,6 +229,28 @@ public class OpenaireBrokerController extends AbstractLbsController {
} }
@ApiOperation("Send notifications")
@GetMapping("/notifications/send/{date}")
private List<String> sendMailForNotifications(@PathVariable final long date) {
new Thread(() -> innerSendMailForNotifications(date)).start();
return Arrays.asList("Sending ...");
}
private void innerSendMailForNotifications(final long date) {
for (final Subscription s : subscriptionRepo.findAll()) {
final long count = notificationRepository.countBySubscriptionIdAndDateAfter(s.getSubscriptionId(), date);
if (count > 0) {
final Map<String, Object> params = new HashMap<>();
params.put("oa_notifications_total", count);
dispatcher.sendNotification(s, params);
}
s.setLastNotificationDate(new Date());
subscriptionRepo.save(s);
}
}
private SimpleSubscriptionDesc subscriptionDesc(final Subscription s) { private SimpleSubscriptionDesc subscriptionDesc(final Subscription s) {
return new SimpleSubscriptionDesc(s.getSubscriptionId(), extractDatasource(s), s.getTopic(), s.getCreationDate(), s.getLastNotificationDate(), return new SimpleSubscriptionDesc(s.getSubscriptionId(), extractDatasource(s), s.getTopic(), s.getCreationDate(), s.getLastNotificationDate(),
OpenaireBrokerController.this.notificationRepository.countBySubscriptionId(s.getSubscriptionId())); OpenaireBrokerController.this.notificationRepository.countBySubscriptionId(s.getSubscriptionId()));

View File

@ -39,8 +39,7 @@ lbs.mail.smtpHost = smtp.isti.cnr.it
lbs.mail.smtpPort = 587 lbs.mail.smtpPort = 587
lbs.mail.smtpUser = smtp-dnet lbs.mail.smtpUser = smtp-dnet
lbs.mail.smtpPassword = hhr*7932 lbs.mail.smtpPassword = hhr*7932
lbs.mail.message.template = classpath:/templates/openaire_mail.st lbs.mail.message.template = classpath:/templates/dhp_openaire_mail.st
lbs.queues.maxReturnedValues = 1000 lbs.queues.maxReturnedValues = 1000

View File

@ -83,12 +83,14 @@
<tr> <tr>
<th></th> <th></th>
<th class="text-right">Sent messages</th> <th class="text-right">Sent messages</th>
<th class="text-right">Errors</th>
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
<tr ng-repeat="b in summary.dispatchers"> <tr ng-repeat="b in summary.dispatchers">
<th>{{b.name}}</th> <th>{{b.name}}</th>
<td class="text-right">{{b.count}}</td> <td class="text-right">{{b.count}}</td>
<td class="text-right" ng-class="{ 'text-danger' : b.countErrors > 0 }" title="{{b.lastError}}">{{b.countErrors}}</td>
</tr> </tr>
</tbody> </tbody>
</table> </table>

View File

@ -0,0 +1,17 @@
You have received this mail because you are owner of the following subscription:
<ul>
<li><b>ID: </b>$sub.subscriptionId$</li>
<li><b>Topic: </b>$sub.topic$</li>
<li><b>Number of events: </b>$oa_notifications_total$</li>
</ul>
<hr />
<p>
You can access the notified events at this address: .... <br />
or using the Public Broker Service API.
</p>
<hr />
This email message was auto-generated. Please do not respond.

View File

@ -154,7 +154,7 @@ public class OpenairePublicController extends AbstractLbsController {
boolean first = true; boolean first = true;
IOUtils.write("[", gzOut); IOUtils.write("[\n", gzOut);
ScrollPage<NotificationMessage> page = null; ScrollPage<NotificationMessage> page = null;
@ -165,13 +165,13 @@ public class OpenairePublicController extends AbstractLbsController {
if (first) { if (first) {
first = false; first = false;
} else { } else {
IOUtils.write(", ", gzOut); IOUtils.write(",\n", gzOut);
} }
IOUtils.write(gson.toJson(msg), gzOut); IOUtils.write(gson.toJson(msg), gzOut);
} }
} while (!page.isCompleted()); } while (!page.isCompleted());
IOUtils.write("]", gzOut); IOUtils.write("\n]\n", gzOut);
gzOut.flush(); gzOut.flush();

View File

@ -19,6 +19,8 @@ public interface NotificationRepository extends ElasticsearchRepository<Notifica
long countBySubscriptionId(String subscriptionId); long countBySubscriptionId(String subscriptionId);
long countBySubscriptionIdAndDateAfter(String subscriptionId, long from);
void deleteBySubscriptionId(String subscriptionId); void deleteBySubscriptionId(String subscriptionId);
long deleteByDateBetween(long from, long to); long deleteByDateBetween(long from, long to);