mail dispatchers
This commit is contained in:
parent
436d129008
commit
eaa4f0e174
|
@ -77,7 +77,7 @@ public class AjaxController extends AbstractLbsController {
|
|||
|
||||
final List<DispatcherStatus> dispatchers = dispatcherManager.getDispatchers()
|
||||
.stream()
|
||||
.map(d -> new DispatcherStatus(d.getDispatcherName(), d.count()))
|
||||
.map(d -> new DispatcherStatus(d.getDispatcherName(), d.count(), d.countErrors(), d.lastError()))
|
||||
.sorted()
|
||||
.collect(Collectors.toList());
|
||||
|
||||
|
|
|
@ -4,18 +4,30 @@ public class DispatcherStatus implements Comparable<DispatcherStatus> {
|
|||
|
||||
private final String name;
|
||||
private final long count;
|
||||
private final long countErrors;
|
||||
private final String lastError;
|
||||
|
||||
public DispatcherStatus(final String name, final long count) {
|
||||
public DispatcherStatus(final String name, final long count, final long countErrors, final String lastError) {
|
||||
this.name = name;
|
||||
this.count = count;
|
||||
this.countErrors = countErrors;
|
||||
this.lastError = lastError;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return this.name;
|
||||
return name;
|
||||
}
|
||||
|
||||
public long getCount() {
|
||||
return this.count;
|
||||
return count;
|
||||
}
|
||||
|
||||
public long getCountErrors() {
|
||||
return countErrors;
|
||||
}
|
||||
|
||||
public String getLastError() {
|
||||
return lastError;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package eu.dnetlib.broker.events.output;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
@ -10,6 +11,8 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.springframework.beans.factory.BeanNameAware;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
|
||||
import eu.dnetlib.broker.common.elasticsearch.Event;
|
||||
import eu.dnetlib.broker.common.subscriptions.Subscription;
|
||||
import eu.dnetlib.broker.utils.LbsQueue;
|
||||
|
@ -30,6 +33,10 @@ public abstract class AbstractNotificationDispatcher<T> implements NotificationD
|
|||
|
||||
private final AtomicLong count = new AtomicLong(0);
|
||||
|
||||
private final AtomicLong countErrors = new AtomicLong(0);
|
||||
|
||||
private String lastError = "";
|
||||
|
||||
private static final Log log = LogFactory.getLog(AbstractNotificationDispatcher.class);
|
||||
|
||||
@PostConstruct
|
||||
|
@ -48,7 +55,8 @@ public abstract class AbstractNotificationDispatcher<T> implements NotificationD
|
|||
this.count.incrementAndGet();
|
||||
} catch (final Throwable e) {
|
||||
log.error("Error sending notification", e);
|
||||
this.queue.offer(message);
|
||||
this.countErrors.incrementAndGet();
|
||||
this.lastError = e.getMessage() + "\nStacktrave:\n" + Throwables.getStackTraceAsString(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -64,6 +72,17 @@ public abstract class AbstractNotificationDispatcher<T> implements NotificationD
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendNotification(final Subscription subscription, final Map<String, Object> params) {
|
||||
try {
|
||||
this.queue.offer(prepareAction(subscription, params));
|
||||
} catch (final Exception e) {
|
||||
log.error("Error sending notification", e);
|
||||
}
|
||||
}
|
||||
|
||||
abstract protected T prepareAction(final Subscription subscription, final Map<String, Object> params) throws Exception;
|
||||
|
||||
abstract protected T prepareAction(final Subscription subscription, final Event... events) throws Exception;
|
||||
|
||||
abstract protected void performAction(final T message) throws Exception;
|
||||
|
@ -82,9 +101,21 @@ public abstract class AbstractNotificationDispatcher<T> implements NotificationD
|
|||
return this.count.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long countErrors() {
|
||||
return this.countErrors.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String lastError() {
|
||||
return lastError;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetCount() {
|
||||
this.count.set(0);
|
||||
this.countErrors.set(0);
|
||||
this.lastError = "";
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -2,6 +2,7 @@ package eu.dnetlib.broker.events.output;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -33,6 +34,19 @@ public class DispatcherManager {
|
|||
}
|
||||
}
|
||||
|
||||
public void sendNotification(final Subscription s, final Map<String, Object> params) {
|
||||
final Optional<NotificationDispatcher> dispatcher = this.dispatchers
|
||||
.stream()
|
||||
.filter(d -> d.getMode() == s.getMode())
|
||||
.findFirst();
|
||||
|
||||
if (dispatcher.isPresent()) {
|
||||
dispatcher.get().sendNotification(s, params);
|
||||
} else {
|
||||
log.error("Notification dispatcher not found, mode=" + s.getMode());
|
||||
}
|
||||
}
|
||||
|
||||
public List<NotificationDispatcher> getDispatchers() {
|
||||
return this.dispatchers;
|
||||
}
|
||||
|
|
|
@ -1,17 +1,20 @@
|
|||
package eu.dnetlib.broker.events.output;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.mail.Authenticator;
|
||||
import javax.mail.Message;
|
||||
import javax.mail.MessagingException;
|
||||
import javax.mail.PasswordAuthentication;
|
||||
import javax.mail.Session;
|
||||
import javax.mail.Transport;
|
||||
import javax.mail.internet.AddressException;
|
||||
import javax.mail.internet.InternetAddress;
|
||||
import javax.mail.internet.MimeMessage;
|
||||
|
||||
|
@ -54,17 +57,51 @@ public class EmailDispatcher extends AbstractNotificationDispatcher<Message> {
|
|||
}
|
||||
if (events.length == 0) {
|
||||
log.warn("Event list is empty");
|
||||
throw new IllegalArgumentException("Event list is empty");
|
||||
}
|
||||
|
||||
final String topics = Arrays.stream(events).map(e -> e.getTopic()).distinct().collect(Collectors.joining(", "));
|
||||
final String content = generateMailContent(subscription, null, events);
|
||||
|
||||
return prepareMimeMessage(subscription, content);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Message prepareAction(final Subscription subscription, final Map<String, Object> params) throws Exception {
|
||||
if (subscription == null || StringUtils.isBlank(subscription.getSubscriber())) {
|
||||
log.warn("Invalid subscription");
|
||||
throw new IllegalArgumentException("Invalid subscription");
|
||||
}
|
||||
final String content = generateMailContent(subscription, params);
|
||||
|
||||
return prepareMimeMessage(subscription, content);
|
||||
}
|
||||
|
||||
private String generateMailContent(final Subscription subscription, final Map<String, Object> params, final Event... events) throws IOException {
|
||||
final StringTemplate st = new StringTemplate(IOUtils.toString(emailTemplate.getInputStream(), StandardCharsets.UTF_8));
|
||||
st.setAttribute("sub", subscription);
|
||||
|
||||
st.setAttribute("total", events.length);
|
||||
st.setAttribute("max", MAX_NUMBER_OF_EVENTS);
|
||||
|
||||
if (events.length > MAX_NUMBER_OF_EVENTS) {
|
||||
st.setAttribute("events", Arrays.copyOfRange(events, 0, MAX_NUMBER_OF_EVENTS));
|
||||
} else {
|
||||
st.setAttribute("events", events);
|
||||
}
|
||||
if (params != null) {
|
||||
params.entrySet().forEach(e -> st.setAttribute(e.getKey(), e.getValue()));
|
||||
}
|
||||
|
||||
return st.toString();
|
||||
}
|
||||
|
||||
private Message prepareMimeMessage(final Subscription subscription, final String content)
|
||||
throws MessagingException, UnsupportedEncodingException, AddressException {
|
||||
final Session session = Session.getInstance(obtainProperties(), obtainAuthenticator());
|
||||
|
||||
final MimeMessage mimeMessage = new MimeMessage(session);
|
||||
mimeMessage.setFrom(new InternetAddress(props.getFrom(), props.getFromName()));
|
||||
mimeMessage.setSubject("Notification for topic(s): " + topics);
|
||||
mimeMessage.setContent(generateMailContent(subscription, events), "text/html; charset=utf-8");
|
||||
mimeMessage.setSubject("Notification for topic: " + subscription.getTopic());
|
||||
mimeMessage.setContent(content, "text/html; charset=utf-8");
|
||||
mimeMessage.setSentDate(new Date());
|
||||
|
||||
mimeMessage.addRecipient(Message.RecipientType.TO, new InternetAddress(subscription.getSubscriber()));
|
||||
|
@ -80,21 +117,6 @@ public class EmailDispatcher extends AbstractNotificationDispatcher<Message> {
|
|||
return mimeMessage;
|
||||
}
|
||||
|
||||
private String generateMailContent(final Subscription subscription, final Event... events) throws IOException {
|
||||
final StringTemplate st = new StringTemplate(IOUtils.toString(emailTemplate.getInputStream(), StandardCharsets.UTF_8));
|
||||
st.setAttribute("sub", subscription);
|
||||
|
||||
st.setAttribute("total", events.length);
|
||||
st.setAttribute("max", MAX_NUMBER_OF_EVENTS);
|
||||
|
||||
if (events.length > MAX_NUMBER_OF_EVENTS) {
|
||||
st.setAttribute("events", Arrays.copyOfRange(events, 0, MAX_NUMBER_OF_EVENTS));
|
||||
} else {
|
||||
st.setAttribute("events", events);
|
||||
}
|
||||
return st.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void performAction(final Message message) throws Exception {
|
||||
log.info("Sending mail to " + Arrays.toString(message.getAllRecipients()) + "...");
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package eu.dnetlib.broker.events.output;
|
||||
|
||||
import java.io.StringWriter;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -28,6 +29,17 @@ public class MockDispatcher extends AbstractNotificationDispatcher<String> {
|
|||
return sw.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String prepareAction(final Subscription subscription, final Map<String, Object> params) throws Exception {
|
||||
final StringWriter sw = new StringWriter();
|
||||
sw.write("\n********************************");
|
||||
sw.write("\n* New notification");
|
||||
sw.write("\n* - Subscription : " + subscription);
|
||||
params.entrySet().forEach(e -> sw.write("\n* - " + e.getKey() + ": " + e.getValue()));
|
||||
sw.write("\n********************************");
|
||||
return sw.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void performAction(final String message) throws Exception {
|
||||
log.info(message);
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package eu.dnetlib.broker.events.output;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import eu.dnetlib.broker.common.elasticsearch.Event;
|
||||
import eu.dnetlib.broker.common.subscriptions.NotificationMode;
|
||||
import eu.dnetlib.broker.common.subscriptions.Subscription;
|
||||
|
@ -15,4 +17,10 @@ public interface NotificationDispatcher {
|
|||
long count();
|
||||
|
||||
NotificationMode getMode();
|
||||
|
||||
void sendNotification(Subscription subscription, Map<String, Object> params);
|
||||
|
||||
long countErrors();
|
||||
|
||||
String lastError();
|
||||
}
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
package eu.dnetlib.broker.openaire;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
@ -45,6 +48,7 @@ import eu.dnetlib.broker.common.properties.ElasticSearchProperties;
|
|||
import eu.dnetlib.broker.common.subscriptions.MapCondition;
|
||||
import eu.dnetlib.broker.common.subscriptions.Subscription;
|
||||
import eu.dnetlib.broker.common.subscriptions.SubscriptionRepository;
|
||||
import eu.dnetlib.broker.events.output.DispatcherManager;
|
||||
import eu.dnetlib.broker.objects.OaBrokerEventPayload;
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
|
@ -67,6 +71,9 @@ public class OpenaireBrokerController extends AbstractLbsController {
|
|||
@Autowired
|
||||
private ElasticSearchProperties props;
|
||||
|
||||
@Autowired
|
||||
private DispatcherManager dispatcher;
|
||||
|
||||
private static final Log log = LogFactory.getLog(OpenaireBrokerController.class);
|
||||
|
||||
@ApiOperation("Return the datasources having events")
|
||||
|
@ -222,6 +229,28 @@ public class OpenaireBrokerController extends AbstractLbsController {
|
|||
|
||||
}
|
||||
|
||||
@ApiOperation("Send notifications")
|
||||
@GetMapping("/notifications/send/{date}")
|
||||
private List<String> sendMailForNotifications(@PathVariable final long date) {
|
||||
new Thread(() -> innerSendMailForNotifications(date)).start();
|
||||
return Arrays.asList("Sending ...");
|
||||
}
|
||||
|
||||
private void innerSendMailForNotifications(final long date) {
|
||||
for (final Subscription s : subscriptionRepo.findAll()) {
|
||||
|
||||
final long count = notificationRepository.countBySubscriptionIdAndDateAfter(s.getSubscriptionId(), date);
|
||||
if (count > 0) {
|
||||
final Map<String, Object> params = new HashMap<>();
|
||||
params.put("oa_notifications_total", count);
|
||||
dispatcher.sendNotification(s, params);
|
||||
}
|
||||
|
||||
s.setLastNotificationDate(new Date());
|
||||
subscriptionRepo.save(s);
|
||||
}
|
||||
}
|
||||
|
||||
private SimpleSubscriptionDesc subscriptionDesc(final Subscription s) {
|
||||
return new SimpleSubscriptionDesc(s.getSubscriptionId(), extractDatasource(s), s.getTopic(), s.getCreationDate(), s.getLastNotificationDate(),
|
||||
OpenaireBrokerController.this.notificationRepository.countBySubscriptionId(s.getSubscriptionId()));
|
||||
|
|
|
@ -39,8 +39,7 @@ lbs.mail.smtpHost = smtp.isti.cnr.it
|
|||
lbs.mail.smtpPort = 587
|
||||
lbs.mail.smtpUser = smtp-dnet
|
||||
lbs.mail.smtpPassword = hhr*7932
|
||||
lbs.mail.message.template = classpath:/templates/openaire_mail.st
|
||||
|
||||
lbs.mail.message.template = classpath:/templates/dhp_openaire_mail.st
|
||||
|
||||
lbs.queues.maxReturnedValues = 1000
|
||||
|
||||
|
|
|
@ -83,12 +83,14 @@
|
|||
<tr>
|
||||
<th></th>
|
||||
<th class="text-right">Sent messages</th>
|
||||
<th class="text-right">Errors</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
<tr ng-repeat="b in summary.dispatchers">
|
||||
<th>{{b.name}}</th>
|
||||
<td class="text-right">{{b.count}}</td>
|
||||
<td class="text-right" ng-class="{ 'text-danger' : b.countErrors > 0 }" title="{{b.lastError}}">{{b.countErrors}}</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
You have received this mail because you are owner of the following subscription:
|
||||
|
||||
<ul>
|
||||
<li><b>ID: </b>$sub.subscriptionId$</li>
|
||||
<li><b>Topic: </b>$sub.topic$</li>
|
||||
<li><b>Number of events: </b>$oa_notifications_total$</li>
|
||||
</ul>
|
||||
|
||||
<hr />
|
||||
|
||||
<p>
|
||||
You can access the notified events at this address: .... <br />
|
||||
or using the Public Broker Service API.
|
||||
</p>
|
||||
|
||||
<hr />
|
||||
This email message was auto-generated. Please do not respond.
|
|
@ -154,7 +154,7 @@ public class OpenairePublicController extends AbstractLbsController {
|
|||
|
||||
boolean first = true;
|
||||
|
||||
IOUtils.write("[", gzOut);
|
||||
IOUtils.write("[\n", gzOut);
|
||||
|
||||
ScrollPage<NotificationMessage> page = null;
|
||||
|
||||
|
@ -165,13 +165,13 @@ public class OpenairePublicController extends AbstractLbsController {
|
|||
if (first) {
|
||||
first = false;
|
||||
} else {
|
||||
IOUtils.write(", ", gzOut);
|
||||
IOUtils.write(",\n", gzOut);
|
||||
}
|
||||
IOUtils.write(gson.toJson(msg), gzOut);
|
||||
}
|
||||
} while (!page.isCompleted());
|
||||
|
||||
IOUtils.write("]", gzOut);
|
||||
IOUtils.write("\n]\n", gzOut);
|
||||
|
||||
gzOut.flush();
|
||||
|
||||
|
|
|
@ -19,6 +19,8 @@ public interface NotificationRepository extends ElasticsearchRepository<Notifica
|
|||
|
||||
long countBySubscriptionId(String subscriptionId);
|
||||
|
||||
long countBySubscriptionIdAndDateAfter(String subscriptionId, long from);
|
||||
|
||||
void deleteBySubscriptionId(String subscriptionId);
|
||||
|
||||
long deleteByDateBetween(long from, long to);
|
||||
|
|
Loading…
Reference in New Issue