dnet-applications/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/events/output/AbstractNotificationDispatc...

98 lines
2.4 KiB
Java

package eu.dnetlib.broker.events.output;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.broker.common.elasticsearch.Event;
import eu.dnetlib.broker.common.subscriptions.Subscription;
import eu.dnetlib.broker.utils.LbsQueue;
import eu.dnetlib.broker.utils.QueueManager;
import eu.dnetlib.broker.utils.ThreadManager;
public abstract class AbstractNotificationDispatcher<T> implements NotificationDispatcher, BeanNameAware, Runnable {
private String dispatcherName;
@Autowired
private QueueManager queueManager;
@Autowired
private ThreadManager threadManager;
private LbsQueue<T, T> queue;
private final AtomicLong count = new AtomicLong(0);
private static final Log log = LogFactory.getLog(AbstractNotificationDispatcher.class);
@PostConstruct
public void init() {
this.queue = this.queueManager.newQueue(this.dispatcherName + "-queue");
this.threadManager.newThread(this.dispatcherName, this);
}
@Override
public void run() {
while (true) {
for (final T message : this.queue.takeList()) {
if (message != null) {
try {
performAction(message);
this.count.incrementAndGet();
} catch (final Throwable e) {
log.error("Error sending notification", e);
this.queue.offer(message);
}
}
}
}
}
@Override
public void sendNotification(final Subscription subscription, final Event... events) {
try {
this.queue.offer(prepareAction(subscription, events));
} catch (final Exception e) {
log.error("Error sending notification", e);
}
}
abstract protected T prepareAction(final Subscription subscription, final Event... events) throws Exception;
abstract protected void performAction(final T message) throws Exception;
@Override
public String getDispatcherName() {
return this.dispatcherName;
}
public void setDispatcherName(final String dispatcherName) {
this.dispatcherName = dispatcherName;
}
@Override
public long count() {
return this.count.get();
}
@Override
public void resetCount() {
this.count.set(0);
}
@Override
public void setBeanName(final String name) {
if (StringUtils.isBlank(getDispatcherName())) {
setDispatcherName(name);
}
}
}