dnet-applications/dhp-broker-application/.svn/pristine/5f/5fcca311acd76bf784b86c9d7e8...

71 lines
2.4 KiB
Plaintext

package eu.dnetlib.lbs.events.manager;
import java.util.List;
import java.util.stream.StreamSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.lbs.elasticsearch.Event;
import eu.dnetlib.lbs.elasticsearch.EventRepository;
import eu.dnetlib.lbs.elasticsearch.Notification;
import eu.dnetlib.lbs.elasticsearch.NotificationRepository;
import eu.dnetlib.lbs.events.output.DispatcherManager;
import eu.dnetlib.lbs.subscriptions.NotificationFrequency;
import eu.dnetlib.lbs.subscriptions.Subscription;
import eu.dnetlib.lbs.subscriptions.SubscriptionRepository;
import eu.dnetlib.lbs.utils.LbsQueue;
/**
 * Background worker that drains batches of {@link Event}s from an {@link LbsQueue},
 * stores them through the {@link EventRepository} and, for events flagged as instant
 * messages, creates and dispatches a {@link Notification} for every subscription on the
 * event topic that accepts the event and has {@link NotificationFrequency#realtime}.
 *
 * <p>The worker is meant to be executed on its own thread: {@link #run()} never returns
 * normally.</p>
 */
public class EventManager implements Runnable {

    private static final Log log = LogFactory.getLog(EventManager.class);

    private final EventRepository eventRepository;
    private final NotificationRepository notificationRepository;
    private final SubscriptionRepository subscriptionRepo;
    private final DispatcherManager dispatcherManager;
    private final LbsQueue<String, Event> queue;

    /**
     * Creates a manager bound to the given collaborators.
     *
     * @param eventRepository        repository used to store each batch of events
     * @param notificationRepository repository used to store generated notifications
     * @param subscriptionRepo       repository queried for the subscriptions of a topic
     * @param dispatcherManager      component that delivers an event to a subscription
     * @param queue                  queue the events are taken from
     */
    public EventManager(final EventRepository eventRepository, final NotificationRepository notificationRepository,
            final SubscriptionRepository subscriptionRepo,
            final DispatcherManager dispatcherManager,
            final LbsQueue<String, Event> queue) {
        this.eventRepository = eventRepository;
        this.notificationRepository = notificationRepository;
        this.subscriptionRepo = subscriptionRepo;
        this.dispatcherManager = dispatcherManager;
        this.queue = queue;
    }

    /**
     * Offers a raw element to the underlying queue.
     *
     * @param s the element to enqueue (presumably a serialized event converted by the
     *          queue itself - confirm against {@code LbsQueue})
     * @return the value returned by {@code queue.offer(s)}
     */
    public boolean add(final String s) {
        return this.queue.offer(s);
    }

    /**
     * Endless processing loop: takes a batch from the queue and processes it.
     *
     * <p>A failure while processing one batch is logged and the loop moves on to the next
     * batch; otherwise a single runtime exception (for instance from a repository call)
     * would end the worker thread and every event queued afterwards would be left
     * unprocessed. Failures of {@code queue.takeList()} itself are intentionally NOT
     * caught, so that a broken queue cannot turn this loop into a busy error loop.</p>
     */
    @Override
    public void run() {
        log.info("Event indexer started: " + Thread.currentThread().getName());
        while (true) {
            final List<Event> list = this.queue.takeList();
            try {
                processBatch(list);
            } catch (final RuntimeException ex) {
                log.error("Error processing a batch of events", ex);
            }
        }
    }

    /**
     * Returns the queue this manager reads from.
     *
     * @return the queue passed to the constructor
     */
    public LbsQueue<String, Event> getQueue() {
        return this.queue;
    }

    /** Stores the whole batch, then notifies realtime subscribers of its instant-message events. */
    private void processBatch(final List<Event> list) {
        this.eventRepository.saveAll(list);
        list.stream()
                .filter(Event::isInstantMessage)
                .forEach(this::notifyRealtimeSubscriptions);
    }

    /** Notifies every realtime subscription on the event topic whose conditions accept the event. */
    private void notifyRealtimeSubscriptions(final Event e) {
        // TODO: cache of Subscription with NotificationFrequency.realtime
        final Iterable<Subscription> iter = this.subscriptionRepo.findByTopic(e.getTopic());
        StreamSupport.stream(iter.spliterator(), false)
                .filter(s -> s.verifyEventConditions(e))
                .filter(s -> s.getFrequency() == NotificationFrequency.realtime)
                .forEach(s -> notifySubscription(s, e));
    }

    /** Stores and dispatches one notification; a failure is logged so the other subscriptions are still served. */
    private void notifySubscription(final Subscription s, final Event e) {
        try {
            final Notification n = new Notification(s, e);
            this.notificationRepository.save(n);
            this.dispatcherManager.dispatch(s, e);
        } catch (final RuntimeException ex) {
            log.error("Error sending a realtime notification for topic " + e.getTopic(), ex);
        }
    }
}