diff --git a/backend/core/src/main/java/org/opencdmp/integrationevent/inbox/InboxRepositoryImpl.java b/backend/core/src/main/java/org/opencdmp/integrationevent/inbox/InboxRepositoryImpl.java index 04828f1c1..85d45608b 100644 --- a/backend/core/src/main/java/org/opencdmp/integrationevent/inbox/InboxRepositoryImpl.java +++ b/backend/core/src/main/java/org/opencdmp/integrationevent/inbox/InboxRepositoryImpl.java @@ -48,8 +48,12 @@ public class InboxRepositoryImpl implements InboxRepository { EntityTransaction transaction = null; CandidateInfo candidate = null; try (FakeRequestScope ignored = new FakeRequestScope()) { - TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); - try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { + TenantEntityManager tenantEntityManager = null; + EntityManager entityManager = null; + try{ + tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); + entityManager = this.entityManagerFactory.createEntityManager(); + tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.disableTenantFilters(); @@ -92,7 +96,8 @@ public class InboxRepositoryImpl implements InboxRepository { transaction.rollback(); candidate = null; } finally { - tenantEntityManager.reloadTenantFilters(); + if (entityManager != null) entityManager.close(); + if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters(); } } catch (Exception ex) { logger.error("Problem getting list of queue inbox. Skipping: {}", ex.getMessage(), ex); @@ -107,8 +112,12 @@ public class InboxRepositoryImpl implements InboxRepository { boolean success = false; try (FakeRequestScope ignored = new FakeRequestScope()) { - TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); - try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { + TenantEntityManager tenantEntityManager = null; + EntityManager entityManager = null; + try{ + tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); + entityManager = this.entityManagerFactory.createEntityManager(); + tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.disableTenantFilters(); @@ -138,7 +147,8 @@ public class InboxRepositoryImpl implements InboxRepository { transaction.rollback(); success = false; } finally { - tenantEntityManager.reloadTenantFilters(); + if (entityManager != null) entityManager.close(); + if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters(); } } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); @@ -152,8 +162,12 @@ public class InboxRepositoryImpl implements InboxRepository { boolean success = false; try (FakeRequestScope ignored = new FakeRequestScope()) { - TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); - try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { + TenantEntityManager tenantEntityManager = null; + EntityManager entityManager = null; + try{ + tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); + entityManager = this.entityManagerFactory.createEntityManager(); + tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.disableTenantFilters(); @@ -184,7 +198,8 @@ public class InboxRepositoryImpl implements InboxRepository { transaction.rollback(); success = false; } finally { - tenantEntityManager.reloadTenantFilters(); + if (entityManager != null) entityManager.close(); + if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters(); } } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); @@ -198,8 +213,12 @@ public class InboxRepositoryImpl implements InboxRepository { boolean success; QueueInboxEntity queueMessage = null; try (FakeRequestScope ignored = new FakeRequestScope()) { - TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); - try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { + TenantEntityManager tenantEntityManager = null; + EntityManager entityManager = null; + try{ + tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); + entityManager = this.entityManagerFactory.createEntityManager(); + tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.disableTenantFilters(); @@ -223,7 +242,8 @@ public class InboxRepositoryImpl implements InboxRepository { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); if (transaction != null) transaction.rollback(); } finally { - tenantEntityManager.reloadTenantFilters(); + if (entityManager != null) entityManager.close(); + if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters(); } } catch (Exception ex) { success = false; @@ -272,8 +292,12 @@ public class InboxRepositoryImpl implements InboxRepository { } else { EventProcessingStatus status = this.emitQueueInboxEntity(queueInboxMessage); try (FakeRequestScope ignored = new FakeRequestScope()) { - TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); - try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { + TenantEntityManager tenantEntityManager = null; + EntityManager entityManager = null; + try{ + tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); + entityManager = this.entityManagerFactory.createEntityManager(); + tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.disableTenantFilters(); @@ -313,7 +337,8 @@ public class InboxRepositoryImpl implements InboxRepository { transaction.rollback(); success = false; } finally { - tenantEntityManager.reloadTenantFilters(); + if (entityManager != null) entityManager.close(); + if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters(); } } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); @@ -327,8 +352,12 @@ public class InboxRepositoryImpl implements InboxRepository { EntityTransaction transaction = null; EventProcessingStatus status = EventProcessingStatus.Discard; try (FakeRequestScope ignored = new FakeRequestScope()) { - try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { - TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); + TenantEntityManager tenantEntityManager = null; + EntityManager entityManager = null; + try{ + tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); + entityManager = this.entityManagerFactory.createEntityManager(); + tenantEntityManager.setEntityManager(entityManager); transaction = entityManager.getTransaction(); @@ -350,6 +379,8 @@ public class InboxRepositoryImpl implements InboxRepository { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); if (transaction != null) transaction.rollback(); + } finally { + if (entityManager != null) entityManager.close(); } } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); diff --git a/backend/core/src/main/java/org/opencdmp/integrationevent/inbox/annotationentitycreated/AnnotationEntityCreatedIntegrationEventHandlerImpl.java b/backend/core/src/main/java/org/opencdmp/integrationevent/inbox/annotationentitycreated/AnnotationEntityCreatedIntegrationEventHandlerImpl.java index 3841b0516..e9807ef05 100644 --- a/backend/core/src/main/java/org/opencdmp/integrationevent/inbox/annotationentitycreated/AnnotationEntityCreatedIntegrationEventHandlerImpl.java +++ b/backend/core/src/main/java/org/opencdmp/integrationevent/inbox/annotationentitycreated/AnnotationEntityCreatedIntegrationEventHandlerImpl.java @@ -41,7 +41,6 @@ import org.springframework.stereotype.Component; import javax.management.InvalidApplicationException; import java.util.*; -import java.util.stream.Collectors; @Component @@ -93,19 +92,19 @@ public class AnnotationEntityCreatedIntegrationEventHandlerImpl implements Annot try { if (this.tenantScope.isMultitenant() && properties.getTenantId() != null) { - TenantEntity tenant = queryFactory.query(TenantQuery.class).disableTracking().ids(properties.getTenantId()).firstAs(new BaseFieldSet().ensure(Tenant._id).ensure(Tenant._code)); + TenantEntity tenant = this.queryFactory.query(TenantQuery.class).disableTracking().ids(properties.getTenantId()).firstAs(new BaseFieldSet().ensure(Tenant._id).ensure(Tenant._code)); if (tenant == null) { logger.error("missing tenant from event message"); return EventProcessingStatus.Error; } - this.tenantScope.setTempTenant(tenantEntityManager, properties.getTenantId(), tenant.getCode()); + this.tenantScope.setTempTenant(this.tenantEntityManager, properties.getTenantId(), tenant.getCode()); } else if (this.tenantScope.isMultitenant()) { - this.tenantScope.setTempTenant(tenantEntityManager, null, this.tenantScope.getDefaultTenantCode()); + this.tenantScope.setTempTenant(this.tenantEntityManager, null, this.tenantScope.getDefaultTenantCode()); } - currentPrincipalResolver.push(InboxPrincipal.build(properties, claimExtractorProperties)); + this.currentPrincipalResolver.push(InboxPrincipal.build(properties, this.claimExtractorProperties)); this.sendNotification(event); - auditService.track(AuditableAction.Annotation_Created_Notify, Map.ofEntries( + this.auditService.track(AuditableAction.Annotation_Created_Notify, Map.ofEntries( new AbstractMap.SimpleEntry("model", event) )); @@ -113,9 +112,9 @@ public class AnnotationEntityCreatedIntegrationEventHandlerImpl implements Annot status = EventProcessingStatus.Error; logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex); } finally { - currentPrincipalResolver.pop(); + this.currentPrincipalResolver.pop(); try { - tenantScope.removeTempTenant(this.tenantEntityManager); + this.tenantScope.removeTempTenant(this.tenantEntityManager); this.tenantEntityManager.reloadTenantFilters(); } catch (InvalidApplicationException e) { } @@ -160,7 +159,7 @@ public class AnnotationEntityCreatedIntegrationEventHandlerImpl implements Annot NotifyIntegrationEvent notifyIntegrationEvent = new NotifyIntegrationEvent(); notifyIntegrationEvent.setUserId(user.getId()); - notifyIntegrationEvent.setNotificationType(notificationProperties.getDescriptionAnnotationCreated()); + notifyIntegrationEvent.setNotificationType(this.notificationProperties.getDescriptionAnnotationCreated()); NotificationFieldData data = new NotificationFieldData(); List fieldInfoList = new ArrayList<>(); diff --git a/backend/core/src/main/java/org/opencdmp/integrationevent/outbox/OutboxProperties.java b/backend/core/src/main/java/org/opencdmp/integrationevent/outbox/OutboxProperties.java index 884e51cb6..82e07e717 100644 --- a/backend/core/src/main/java/org/opencdmp/integrationevent/outbox/OutboxProperties.java +++ b/backend/core/src/main/java/org/opencdmp/integrationevent/outbox/OutboxProperties.java @@ -2,12 +2,14 @@ package org.opencdmp.integrationevent.outbox; import org.springframework.boot.context.properties.ConfigurationProperties; -import java.util.List; - @ConfigurationProperties(prefix = "queue.task.publisher.options") public class OutboxProperties { private final String exchange; + private final int handleAckRetries; + private final int handleNackRetries; + private final int handleAckWaitInMilliSeconds; + private final int handleNackWaitInMilliSeconds; private final String tenantDefaultLocaleRemovalTopic; private final String tenantDefaultLocaleTouchedTopic; @@ -38,8 +40,8 @@ public class OutboxProperties { private final String generateFileTopic; - public OutboxProperties(String exchange, - String tenantDefaultLocaleRemovalTopic, + public OutboxProperties(String exchange, int handleAckRetries, int handleNackRetries, int handleAckWaitInMilliSeconds, int handleNackWaitInMilliSeconds, + String tenantDefaultLocaleRemovalTopic, String tenantDefaultLocaleTouchedTopic, String tenantTouchTopic, String tenantRemovalTopic, @@ -57,6 +59,10 @@ public class OutboxProperties { String generateFileTopic ) { this.exchange = exchange; + this.handleAckRetries = handleAckRetries; + this.handleNackRetries = handleNackRetries; + this.handleAckWaitInMilliSeconds = handleAckWaitInMilliSeconds; + this.handleNackWaitInMilliSeconds = handleNackWaitInMilliSeconds; this.tenantDefaultLocaleRemovalTopic = tenantDefaultLocaleRemovalTopic; this.tenantDefaultLocaleTouchedTopic = tenantDefaultLocaleTouchedTopic; this.tenantTouchTopic = tenantTouchTopic; @@ -76,70 +82,82 @@ public class OutboxProperties { } public String getExchange() { - return exchange; + return this.exchange; + } + + public int getHandleAckRetries() { + return this.handleAckRetries; + } + + public int getHandleNackRetries() { + return this.handleNackRetries; + } + + public int getHandleAckWaitInMilliSeconds() { + return this.handleAckWaitInMilliSeconds; } public String getTenantDefaultLocaleRemovalTopic() { - return tenantDefaultLocaleRemovalTopic; + return this.tenantDefaultLocaleRemovalTopic; } public String getTenantDefaultLocaleTouchedTopic() { - return tenantDefaultLocaleTouchedTopic; + return this.tenantDefaultLocaleTouchedTopic; } public String getTenantTouchTopic() { - return tenantTouchTopic; + return this.tenantTouchTopic; } public String getTenantRemovalTopic() { - return tenantRemovalTopic; + return this.tenantRemovalTopic; } public String getTenantReactivationTopic() { - return tenantReactivationTopic; + return this.tenantReactivationTopic; } public String getTenantUserInviteTopic() { - return tenantUserInviteTopic; + return this.tenantUserInviteTopic; } public String getUserRemovalTopic() { - return userRemovalTopic; + return this.userRemovalTopic; } public String getUserTouchTopic() { - return userTouchTopic; + return this.userTouchTopic; } public String getDmpTouchTopic() { - return dmpTouchTopic; + return this.dmpTouchTopic; } public String getDescriptionTouchTopic() { - return descriptionTouchTopic; + return this.descriptionTouchTopic; } public String getAnnotationEntitiesTouchTopic() { - return annotationEntitiesTouchTopic; + return this.annotationEntitiesTouchTopic; } public String getAnnotationEntitiesRemovalTopic() { - return annotationEntitiesRemovalTopic; + return this.annotationEntitiesRemovalTopic; } public String getNotifyTopic() { - return notifyTopic; + return this.notifyTopic; } public String getForgetMeCompletedTopic() { - return forgetMeCompletedTopic; + return this.forgetMeCompletedTopic; } public String getWhatYouKnowAboutMeCompletedTopic() { - return whatYouKnowAboutMeCompletedTopic; + return this.whatYouKnowAboutMeCompletedTopic; } public String getGenerateFileTopic() { - return generateFileTopic; + return this.generateFileTopic; } } diff --git a/backend/core/src/main/java/org/opencdmp/integrationevent/outbox/OutboxRepositoryImpl.java b/backend/core/src/main/java/org/opencdmp/integrationevent/outbox/OutboxRepositoryImpl.java index 72a6c0013..6f321889b 100644 --- a/backend/core/src/main/java/org/opencdmp/integrationevent/outbox/OutboxRepositoryImpl.java +++ b/backend/core/src/main/java/org/opencdmp/integrationevent/outbox/OutboxRepositoryImpl.java @@ -16,6 +16,7 @@ import org.opencdmp.commons.fake.FakeRequestScope; import org.opencdmp.data.QueueOutboxEntity; import org.opencdmp.data.TenantEntityManager; import org.opencdmp.query.QueueOutboxQuery; +import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; @@ -27,6 +28,7 @@ import java.util.stream.Collectors; public class OutboxRepositoryImpl implements OutboxRepository { + private static final Logger log = LoggerFactory.getLogger(OutboxRepositoryImpl.class); protected final ApplicationContext applicationContext; private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(OutboxRepositoryImpl.class)); @@ -50,8 +52,12 @@ public class OutboxRepositoryImpl implements OutboxRepository { EntityTransaction transaction = null; CandidateInfo candidate = null; try (FakeRequestScope ignored = new FakeRequestScope()) { - TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); - try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { + TenantEntityManager tenantEntityManager = null; + EntityManager entityManager = null; + try{ + tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); + entityManager = this.entityManagerFactory.createEntityManager(); + tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.disableTenantFilters(); @@ -96,7 +102,8 @@ public class OutboxRepositoryImpl implements OutboxRepository { transaction.rollback(); candidate = null; } finally { - tenantEntityManager.reloadTenantFilters(); + if (entityManager != null) entityManager.close(); + if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters(); } } catch (Exception ex) { logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex); @@ -111,8 +118,11 @@ public class OutboxRepositoryImpl implements OutboxRepository { boolean success = false; try (FakeRequestScope ignored = new FakeRequestScope()) { - TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); - try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { + TenantEntityManager tenantEntityManager = null; + EntityManager entityManager = null; + try{ + tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); + entityManager = this.entityManagerFactory.createEntityManager(); tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.disableTenantFilters(); @@ -142,7 +152,8 @@ public class OutboxRepositoryImpl implements OutboxRepository { transaction.rollback(); success = false; } finally { - tenantEntityManager.reloadTenantFilters(); + if (entityManager != null) entityManager.close(); + if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters(); } } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); @@ -156,8 +167,11 @@ public class OutboxRepositoryImpl implements OutboxRepository { boolean success = false; try (FakeRequestScope ignored = new FakeRequestScope()) { - TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); - try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { + TenantEntityManager tenantEntityManager = null; + EntityManager entityManager = null; + try{ + tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); + entityManager = this.entityManagerFactory.createEntityManager(); tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.disableTenantFilters(); @@ -188,7 +202,8 @@ public class OutboxRepositoryImpl implements OutboxRepository { transaction.rollback(); success = false; } finally { - tenantEntityManager.reloadTenantFilters(); + if (entityManager != null) entityManager.close(); + if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters(); } } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); @@ -202,8 +217,11 @@ public class OutboxRepositoryImpl implements OutboxRepository { Boolean success = false; try (FakeRequestScope ignored = new FakeRequestScope()) { - TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); - try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { + TenantEntityManager tenantEntityManager = null; + EntityManager entityManager = null; + try { + tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); + entityManager = this.entityManagerFactory.createEntityManager(); tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.disableTenantFilters(); @@ -243,7 +261,8 @@ public class OutboxRepositoryImpl implements OutboxRepository { transaction.rollback(); success = false; } finally { - tenantEntityManager.reloadTenantFilters(); + if (entityManager != null) entityManager.close(); + if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters(); } } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); @@ -253,14 +272,32 @@ public class OutboxRepositoryImpl implements OutboxRepository { @Override public void handleConfirm(List confirmedMessages) { - EntityTransaction transaction = null; - try (FakeRequestScope ignored = new FakeRequestScope()) { - TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); - try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { + TenantEntityManager tenantEntityManager = null; + EntityManager entityManager = null; + try { + tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); + entityManager = this.entityManagerFactory.createEntityManager(); tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.disableTenantFilters(); + this.handleConfirmWithRetries(entityManager, confirmedMessages); + + } catch (Exception ex) { + logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + } finally { + if (entityManager != null) entityManager.close(); + if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters(); + } + } catch (Exception ex) { + logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + } + } + + private void handleConfirmWithRetries(EntityManager entityManager, List confirmedMessages) throws InterruptedException { + EntityTransaction transaction = null; + for (int i = 0; i < this.outboxProperties.getHandleAckRetries() + 1; i++) { + try { transaction = entityManager.getTransaction(); transaction.begin(); @@ -272,37 +309,55 @@ public class OutboxRepositoryImpl implements OutboxRepository { } else { for (QueueOutboxEntity queueOutboxMessage : queueOutboxMessages) { + queueOutboxMessage = queryFactory.query(QueueOutboxQuery.class).ids(queueOutboxMessage.getId()).first(); queueOutboxMessage.setNotifyStatus(QueueOutboxNotifyStatus.CONFIRMED); queueOutboxMessage.setConfirmedAt(Instant.now()); entityManager.merge(queueOutboxMessage); + entityManager.flush(); } - - entityManager.flush(); } transaction.commit(); + return; + } catch (OptimisticLockException ex) { + logger.warn("Problem handle ack {}. Rolling back any message emit db changes and marking error. Retrying...", confirmedMessages.stream().map(UUID::toString).collect(Collectors.joining(","))); + if (transaction != null) transaction.rollback(); + Thread.sleep(this.outboxProperties.getHandleAckWaitInMilliSeconds()); } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); - if (transaction != null) - transaction.rollback(); - } finally { - tenantEntityManager.reloadTenantFilters(); + if (transaction != null) transaction.rollback(); + return; } - } catch (Exception ex) { - logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); } } @Override public void handleNack(List nackedMessages) { - EntityTransaction transaction = null; - try (FakeRequestScope ignored = new FakeRequestScope()) { - TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); - try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { + TenantEntityManager tenantEntityManager = null; + EntityManager entityManager = null; + try { + tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); + entityManager = this.entityManagerFactory.createEntityManager(); tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.disableTenantFilters(); + this.handleNackWithRetries(entityManager, nackedMessages); + } catch (Exception ex) { + logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + } finally { + if (entityManager != null) entityManager.close(); + if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters(); + } + } catch (Exception ex) { + logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + } + } + + private void handleNackWithRetries(EntityManager entityManager, List nackedMessages) throws InterruptedException { + EntityTransaction transaction = null; + for (int i = 0; i < this.outboxProperties.getHandleNackRetries() + 1; i++) { + try { transaction = entityManager.getTransaction(); transaction.begin(); @@ -323,15 +378,16 @@ public class OutboxRepositoryImpl implements OutboxRepository { } transaction.commit(); + return; + } catch (OptimisticLockException ex) { + logger.warn("Problem handle nack {}. Rolling back any message emit db changes and marking error. Retrying...", nackedMessages.stream().map(UUID::toString).collect(Collectors.joining(","))); + if (transaction != null) transaction.rollback(); + Thread.sleep(this.outboxProperties.getHandleNackRetries()); } catch (Exception ex) { logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); - if (transaction != null) - transaction.rollback(); - } finally { - tenantEntityManager.reloadTenantFilters(); + if (transaction != null) transaction.rollback(); + return; } - } catch (Exception ex) { - logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); } } @@ -341,8 +397,11 @@ public class OutboxRepositoryImpl implements OutboxRepository { QueueOutboxEntity queueMessage = null; boolean success; try (FakeRequestScope ignored = new FakeRequestScope()) { - TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); - try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { + TenantEntityManager tenantEntityManager = null; + EntityManager entityManager = null; + try { + tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); + entityManager = this.entityManagerFactory.createEntityManager(); tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.disableTenantFilters(); @@ -362,7 +421,8 @@ public class OutboxRepositoryImpl implements OutboxRepository { if (transaction != null) transaction.rollback(); } finally { - tenantEntityManager.reloadTenantFilters(); + if (entityManager != null) entityManager.close(); + if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters(); } } catch (Exception ex) { success = false; diff --git a/backend/web/src/main/resources/config/queue.yml b/backend/web/src/main/resources/config/queue.yml index a3d217e24..f8eb40e76 100644 --- a/backend/web/src/main/resources/config/queue.yml +++ b/backend/web/src/main/resources/config/queue.yml @@ -25,6 +25,10 @@ queue: enable: true options: exchange: null + handleAckRetries: 3 + handleNackRetries: 3 + handleAckWaitInMilliSeconds: 50 + handleNackWaitInMilliSeconds: 50 tenant-default-locale-removal-topic: tenant_default_locale.remove tenant-default-locale-touched-topic: tenant_default_locale.touch forget-me-completed-topic: forgetme.completed