package eu.dnetlib.enabling.is.sn; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import javax.xml.ws.wsaddressing.W3CEndpointReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** * Asynchronous but in-order delivery of notifications. * * @author marko * */ public class AsynchronousNotificationSenderImpl extends AbstractNotificationSender implements Runnable { // NOPMD /** * logger. */ private static final Log log = LogFactory.getLog(AsynchronousNotificationSenderImpl.class); // NOPMD by marko on 11/24/08 5:02 PM /** * Encapsulates an notification job. * * @author marko * */ class NotificationJob { /** * notification destination. */ private final transient W3CEndpointReference destination; /** * notification message. */ private final transient NotificationMessage message; /** * construct a new notification job. * * @param destination destination * @param message message */ public NotificationJob(final W3CEndpointReference destination, final NotificationMessage message) { super(); this.destination = destination; this.message = message; } public W3CEndpointReference getDestination() { return destination; } public NotificationMessage getMessage() { return message; } } /** * job queue. */ private BlockingQueue jobQueue = new LinkedBlockingQueue(); /** * {@inheritDoc} * * @see eu.dnetlib.enabling.is.sn.NotificationSender#send(javax.xml.ws.wsaddressing.W3CEndpointReference, * eu.dnetlib.enabling.is.sn.NotificationMessage) */ @Override public void send(final W3CEndpointReference destination, final NotificationMessage message) { log.debug("queuing asynchronous notification"); try { jobQueue.put(new NotificationJob(destination, message)); } catch (InterruptedException e) { log.warn("possibly lost notification", e); } } /** * start this notification sender (called by spring lifecycle). */ void start() { new Thread(this).start(); // NOPMD } /** * {@inheritDoc} * @see java.lang.Runnable#run() */ @Override public void run() { while (true) { try { final NotificationJob job = jobQueue.take(); try { getInvoker().send(job.getDestination(), job.getMessage(), 0); } catch (javax.xml.ws.soap.SOAPFaultException t) { log.fatal("error sending notification to " + job.getDestination().toString(), t); } } catch (InterruptedException e) { log.warn("possibly lost notification", e); } } } public BlockingQueue getJobQueue() { return jobQueue; } public void setJobQueue(final BlockingQueue jobQueue) { this.jobQueue = jobQueue; } }