dnet-applications/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/matchers/SubscriptionEventMatcher.java

145 lines
5.0 KiB
Java

package eu.dnetlib.broker.matchers;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import javax.annotation.PostConstruct;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.util.CloseableIterator;
import org.springframework.stereotype.Component;
import eu.dnetlib.broker.common.elasticsearch.Event;
import eu.dnetlib.broker.common.elasticsearch.Notification;
import eu.dnetlib.broker.common.elasticsearch.NotificationRepository;
import eu.dnetlib.broker.common.properties.ElasticSearchProperties;
import eu.dnetlib.broker.common.subscriptions.MapCondition;
import eu.dnetlib.broker.common.subscriptions.Subscription;
import eu.dnetlib.broker.common.subscriptions.SubscriptionRepository;
import eu.dnetlib.broker.events.output.DispatcherManager;
import eu.dnetlib.broker.utils.LbsQueue;
import eu.dnetlib.broker.utils.QueueManager;
import eu.dnetlib.broker.utils.ThreadManager;
/**
 * Background worker that matches the events stored in Elasticsearch against a subscription
 * and dispatches the newly matched events.
 *
 * <p>Subscriptions are submitted through {@link #startMatching(Subscription)} and consumed
 * one at a time by {@link #run()}. The bean is active only when the {@code openaire} profile
 * is NOT enabled.
 */
@Profile("!openaire")
@Component
public class SubscriptionEventMatcher implements Runnable {

	@Autowired
	private QueueManager queueManager;

	@Autowired
	private DispatcherManager dispatcherManager;

	@Autowired
	private SubscriptionRepository subscriptionRepo;

	@Autowired
	private ElasticSearchProperties elasticSearchProperties;

	/** Queue of subscriptions waiting to be matched; created in {@link #init()}. */
	private LbsQueue<Subscription, Subscription> queue;

	@Autowired
	private ElasticsearchOperations esOperations;

	@Autowired
	private NotificationRepository notificationRepository;

	@Autowired
	private ThreadManager threadManager;

	private static final Log log = LogFactory.getLog(SubscriptionEventMatcher.class);

	/**
	 * Creates the matching queue and registers this instance with the thread manager.
	 * NOTE(review): presumably {@code Subscription::isReady} is used by the queue as an
	 * acceptance predicate and {@code newThread} starts the worker - confirm against
	 * QueueManager / ThreadManager.
	 */
	@PostConstruct
	public void init() {
		queue = queueManager.newQueue("subscr-events-matcher-queue", Subscription::isReady);
		threadManager.newThread("subscr-events-matcher", this);
	}

	/**
	 * Offers a subscription to the matching queue and logs whether it was accepted.
	 *
	 * @param s the subscription whose events have to be matched
	 */
	public void startMatching(final Subscription s) {
		if (queue.offer(s)) {
			log.info("Matching of subscription " + s.getSubscriptionId() + " in queue");
		} else {
			log.info("Subscription " + s.getSubscriptionId() + " not queued");
		}
	}

	/**
	 * Worker loop: takes subscriptions from the queue forever, matches each ready one and
	 * then stores its new last-notification date. Any error is logged and the loop continues.
	 */
	@Override
	public void run() {
		log.info("SubscriptionEventMatcher started: " + Thread.currentThread().getName());
		while (true) {
			try {
				final Subscription s = queue.takeOne();
				if (s != null && Subscription.isReady(s)) {
					// Take the timestamp BEFORE matching: events created while the match is
					// running stay inside the range of the next query instead of being skipped.
					// Overlaps are harmless because already-notified events are filtered out.
					final Date matchingStart = new Date();
					startSubscriptionEventsMatcher(s);
					s.setLastNotificationDate(matchingStart);
					subscriptionRepo.save(s);
				}
			} catch (final Throwable e) {
				log.error("Error iterating matching queue", e);
			}
		}
	}

	/**
	 * Searches the events index for events that have the subscription topic, a creation date
	 * not older than the subscription's last notification date (0 when it is not set) and a
	 * nested {@code map} satisfying all the subscription conditions. For every event not yet
	 * notified a {@link Notification} is saved; the collected events are then dispatched.
	 * Errors are logged and not propagated.
	 *
	 * @param s the subscription to match
	 */
	private void startSubscriptionEventsMatcher(final Subscription s) {
		log.info("Start matching subscription: " + s);

		try {
			// TODO: run a query similar to the one of the openaire api, including "> subscription.lastNotificationDate"
			final BoolQueryBuilder mapQuery = QueryBuilders.boolQuery();
			s.getConditionsAsList()
				.stream()
				.map(MapCondition::asQueryBuilder)
				.filter(Objects::nonNull)
				.forEach(mapQuery::must);

			final long fromTime = s.getLastNotificationDate() != null ? s.getLastNotificationDate().getTime() : 0;

			final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
				.withQuery(QueryBuilders.boolQuery()
					.must(QueryBuilders.matchQuery("topic", s.getTopic()).operator(Operator.AND))
					.must(QueryBuilders.rangeQuery("creationDate").from(fromTime))
					.must(QueryBuilders.nestedQuery("map", mapQuery, ScoreMode.None)))
				.withSearchType(SearchType.DEFAULT)
				.withPageable(PageRequest.of(0, 10))
				.build();

			final List<Event> events = new ArrayList<>();

			// The stream must always be closed (also on failure) to release its search resources.
			try (final CloseableIterator<SearchHit<Event>> it =
				esOperations.searchForStream(searchQuery, Event.class, IndexCoordinates.of(elasticSearchProperties.getEventsIndexName()))) {
				while (it.hasNext()) {
					final Event e = it.next().getContent();
					final Notification n = new Notification(s, e);
					if (isNotAlreadyNotified(n)) {
						notificationRepository.save(n);
						events.add(e);
					}
				}
			}

			log.info("End matching subscription: " + s.getSubscriptionId());
			dispatcherManager.dispatch(s, events.toArray(new Event[0]));
		} catch (final Throwable e) {
			log.error("Error matching subscription: " + s.getSubscriptionId(), e);
		}
	}

	/**
	 * @param n the candidate notification
	 * @return true if no notification with the same id is stored in the repository
	 */
	private boolean isNotAlreadyNotified(final Notification n) {
		return !notificationRepository.findById(n.getNotificationId()).isPresent();
	}
}