package gr.cite.notification.integrationevent.inbox; import gr.cite.notification.common.JsonHandlingService; import gr.cite.notification.common.enums.IsActive; import gr.cite.notification.common.scope.fake.FakeRequestScope; import gr.cite.notification.data.QueueInboxEntity; import gr.cite.notification.data.TenantEntityManager; import gr.cite.notification.integrationevent.TrackedEvent; import gr.cite.notification.integrationevent.inbox.notify.NotifyIntegrationEventHandler; import gr.cite.notification.integrationevent.inbox.tenantremoval.TenantRemovalIntegrationEventHandler; import gr.cite.notification.integrationevent.inbox.tenanttouched.TenantTouchedIntegrationEventHandler; import gr.cite.notification.integrationevent.inbox.userremoval.UserRemovalIntegrationEventHandler; import gr.cite.notification.integrationevent.inbox.usertouched.UserTouchedIntegrationEventHandler; import gr.cite.notification.query.QueueInboxQuery; import gr.cite.queueinbox.entity.QueueInbox; import gr.cite.queueinbox.entity.QueueInboxStatus; import gr.cite.queueinbox.repository.CandidateInfo; import gr.cite.queueinbox.repository.InboxRepository; import gr.cite.queueinbox.task.MessageOptions; import gr.cite.rabbitmq.IntegrationEventMessageConstants; import gr.cite.rabbitmq.consumer.InboxCreatorParams; import gr.cite.tools.data.query.Ordering; import gr.cite.tools.data.query.QueryFactory; import gr.cite.tools.logging.LoggerService; import jakarta.persistence.EntityManager; import jakarta.persistence.EntityManagerFactory; import jakarta.persistence.EntityTransaction; import jakarta.persistence.OptimisticLockException; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import java.time.Instant; import java.util.List; import java.util.UUID; import java.util.function.Function; public class InboxRepositoryImpl implements InboxRepository { private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(InboxRepositoryImpl.class)); protected final ApplicationContext applicationContext; private final JsonHandlingService jsonHandlingService; private final InboxProperties inboxProperties; public InboxRepositoryImpl( ApplicationContext applicationContext, InboxProperties inboxProperties ) { this.applicationContext = applicationContext; this.jsonHandlingService = this.applicationContext.getBean(JsonHandlingService.class); this.inboxProperties = inboxProperties; } @Override public CandidateInfo candidate(Instant lastCandidateCreationTimestamp, MessageOptions options) { EntityTransaction transaction = null; EntityManager entityManager = null; CandidateInfo candidate = null; try (FakeRequestScope ignored = new FakeRequestScope()) { try { QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); entityManager = entityManagerFactory.createEntityManager(); transaction = entityManager.getTransaction(); transaction.begin(); QueueInboxEntity item = queryFactory.query(QueueInboxQuery.class) .isActives(IsActive.Active) .status(QueueInboxStatus.PENDING, QueueInboxStatus.ERROR) .retryThreshold(options.getRetryThreashold()) .createdAfter(lastCandidateCreationTimestamp) .ordering(new Ordering().addAscending(QueueInboxEntity._createdAt)) .first(); if (item != null) { QueueInboxStatus prevState = item.getStatus(); item.setStatus(QueueInboxStatus.PROCESSING); entityManager.merge(item); entityManager.flush(); candidate = new CandidateInfo(); candidate.setId(item.getId()); candidate.setCreatedAt(item.getCreatedAt()); candidate.setPreviousState(prevState); } transaction.commit(); } catch (OptimisticLockException ex) { // we get this if/when someone else already modified the notifications. We want to essentially ignore this, and keep working logger.debug("Concurrency exception getting queue inbox. Skipping: {} ", ex.getMessage()); if (transaction != null) transaction.rollback(); candidate = null; } catch (Exception ex) { logger.error("Problem getting list of queue inbox. Skipping: {}", ex.getMessage(), ex); if (transaction != null) transaction.rollback(); candidate = null; } finally { if (entityManager != null) entityManager.close(); } } catch (Exception ex) { logger.error("Problem getting list of queue inbox. Skipping: {}", ex.getMessage(), ex); } return candidate; } @Override public Boolean shouldOmit(CandidateInfo candidate, Function shouldOmit) { EntityTransaction transaction = null; EntityManager entityManager = null; boolean success = false; try (FakeRequestScope ignored = new FakeRequestScope()) { try { EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); entityManager = entityManagerFactory.createEntityManager(); transaction = entityManager.getTransaction(); transaction.begin(); QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); QueueInboxEntity item = queryFactory.query(QueueInboxQuery.class).ids(candidate.getId()).first(); if (item == null) { logger.warn("Could not lookup queue inbox {} to process. Continuing...", candidate.getId()); } else { if (shouldOmit.apply(item)) { item.setStatus(QueueInboxStatus.OMITTED); entityManager.merge(item); entityManager.flush(); success = true; } } transaction.commit(); } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); if (transaction != null) transaction.rollback(); success = false; } finally { if (entityManager != null) entityManager.close(); } } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); } return success; } @Override public boolean shouldWait(CandidateInfo candidate, Function itIsTimeFunc) { EntityTransaction transaction = null; EntityManager entityManager = null; boolean success = false; try (FakeRequestScope ignored = new FakeRequestScope()) { try { EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); entityManager = entityManagerFactory.createEntityManager(); transaction = entityManager.getTransaction(); transaction.begin(); QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); QueueInboxEntity item = queryFactory.query(QueueInboxQuery.class).ids(candidate.getId()).first(); if (item.getRetryCount() != null && item.getRetryCount() >= 1) { Boolean itIsTime = itIsTimeFunc.apply(item); if (!itIsTime) { item.setStatus(candidate.getPreviousState()); entityManager.merge(item); entityManager.flush(); success = true; } success = !itIsTime; } transaction.commit(); } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); if (transaction != null) transaction.rollback(); success = false; } finally { if (entityManager != null) entityManager.close(); } } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); } return success; } @Override public QueueInbox create(InboxCreatorParams inboxCreatorParams) { EntityTransaction transaction = null; EntityManager entityManager = null; boolean success = false; QueueInboxEntity queueMessage = null; try (FakeRequestScope ignored = new FakeRequestScope()) { try { queueMessage = this.createQueueInboxEntity(inboxCreatorParams); EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); entityManager = entityManagerFactory.createEntityManager(); transaction = entityManager.getTransaction(); transaction.begin(); entityManager.persist(queueMessage); entityManager.flush(); transaction.commit(); } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); if (transaction != null) transaction.rollback(); success = false; } finally { if (entityManager != null) entityManager.close(); } } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); } return queueMessage; } private QueueInboxEntity createQueueInboxEntity(InboxCreatorParams inboxCreatorParams) { QueueInboxEntity queueMessage = new QueueInboxEntity(); queueMessage.setId(UUID.randomUUID()); Object tenantId = inboxCreatorParams.getHeaders() != null ? inboxCreatorParams.getHeaders().getOrDefault(IntegrationEventMessageConstants.TENANT, null) : null; if (tenantId instanceof UUID) queueMessage.setTenantId((UUID) tenantId); else if (tenantId instanceof String) { try { queueMessage.setTenantId(UUID.fromString((String) tenantId)); } catch (Exception e) { } } queueMessage.setExchange(this.inboxProperties.getExchange()); queueMessage.setRoute(inboxCreatorParams.getRoutingKey()); queueMessage.setQueue(inboxCreatorParams.getQueueName()); queueMessage.setApplicationId(inboxCreatorParams.getAppId()); queueMessage.setMessageId(UUID.fromString(inboxCreatorParams.getMessageId())); queueMessage.setMessage(inboxCreatorParams.getMessageBody()); queueMessage.setIsActive(IsActive.Active); queueMessage.setStatus(QueueInboxStatus.PENDING); queueMessage.setRetryCount(0); queueMessage.setCreatedAt(Instant.now()); return queueMessage; } @Override public Boolean emit(CandidateInfo candidateInfo) { EntityTransaction transaction = null; EntityManager entityManager = null; boolean success = false; try (FakeRequestScope ignored = new FakeRequestScope()) { try { EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); entityManager = entityManagerFactory.createEntityManager(); transaction = entityManager.getTransaction(); TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); tenantEntityManager.setEntityManager(entityManager); transaction.begin(); QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); QueueInboxEntity queueInboxMessage = queryFactory.query(QueueInboxQuery.class).ids(candidateInfo.getId()).first(); if (queueInboxMessage == null) { logger.warn("Could not lookup queue inbox {} to process. Continuing...", candidateInfo.getId()); } else { EventProcessingStatus status = this.processMessage(queueInboxMessage); switch (status) { case Success: { queueInboxMessage.setStatus(QueueInboxStatus.SUCCESSFUL); break; } case Postponed: { queueInboxMessage.setStatus(QueueInboxStatus.PARKED); break; } case Error: { queueInboxMessage.setStatus(QueueInboxStatus.ERROR); queueInboxMessage.setRetryCount(queueInboxMessage.getRetryCount() != null ? queueInboxMessage.getRetryCount() + 1 : 0); break; } case Discard: default: { queueInboxMessage.setStatus(QueueInboxStatus.DISCARD); break; } } success = status == EventProcessingStatus.Success; entityManager.merge(queueInboxMessage); entityManager.flush(); } transaction.commit(); } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); if (transaction != null) transaction.rollback(); success = false; } finally { if (entityManager != null) entityManager.close(); } } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); } return success; } private EventProcessingStatus processMessage(QueueInboxEntity queueInboxMessage) { IntegrationEventHandler handler; logger.debug("Processing message with routing key '{}'", queueInboxMessage.getRoute()); if (this.routingKeyMatched(queueInboxMessage.getRoute(), this.inboxProperties.getTenantRemovalTopic())) handler = this.applicationContext.getBean(TenantRemovalIntegrationEventHandler.class); else if (this.routingKeyMatched(queueInboxMessage.getRoute(), this.inboxProperties.getTenantTouchedTopic())) handler = this.applicationContext.getBean(TenantTouchedIntegrationEventHandler.class); else if (this.routingKeyMatched(queueInboxMessage.getRoute(), this.inboxProperties.getUserRemovalTopic())) handler = this.applicationContext.getBean(UserRemovalIntegrationEventHandler.class); else if (this.routingKeyMatched(queueInboxMessage.getRoute(), this.inboxProperties.getUserTouchedTopic())) handler = this.applicationContext.getBean(UserTouchedIntegrationEventHandler.class); else if (this.routingKeyMatched(queueInboxMessage.getRoute(), this.inboxProperties.getNotifyTopic())) handler = this.applicationContext.getBean(NotifyIntegrationEventHandler.class); else { logger.error("No handler found for message routing key '{}'. Discarding.", queueInboxMessage.getRoute()); handler = null; } if (handler == null) return EventProcessingStatus.Discard; IntegrationEventProperties properties = new IntegrationEventProperties(); properties.setAppId(queueInboxMessage.getApplicationId()); properties.setMessageId(queueInboxMessage.getMessageId().toString()); properties.setTenantId(queueInboxMessage.getTenantId()); TrackedEvent event = this.jsonHandlingService.fromJsonSafe(TrackedEvent.class, queueInboxMessage.getMessage()); // using (LogContext.PushProperty(this._logTrackingConfig.LogTrackingContextName, @event.TrackingContextTag)) // { try { return handler.handle(properties, queueInboxMessage.getMessage()); } catch (Exception ex) { logger.error("problem handling event from routing key " + queueInboxMessage.getRoute() + ". Setting nack and continuing...", ex); return EventProcessingStatus.Error; } // } } private Boolean routingKeyMatched(String routingKey, List topics) { if (topics == null || topics.isEmpty()) return false; return topics.stream().anyMatch(x -> x.equals(routingKey)); } }