fix OptimisticLockException for ack recieved before process status saved

This commit is contained in:
Efstratios Giannopoulos 2024-06-28 14:32:37 +03:00
parent 7f4b203eb1
commit f6c0c118d7
5 changed files with 196 additions and 84 deletions

View File

@ -48,8 +48,12 @@ public class InboxRepositoryImpl implements InboxRepository {
EntityTransaction transaction = null; EntityTransaction transaction = null;
CandidateInfo candidate = null; CandidateInfo candidate = null;
try (FakeRequestScope ignored = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); TenantEntityManager tenantEntityManager = null;
try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { EntityManager entityManager = null;
try{
tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class);
entityManager = this.entityManagerFactory.createEntityManager();
tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.setEntityManager(entityManager);
tenantEntityManager.disableTenantFilters(); tenantEntityManager.disableTenantFilters();
@ -92,7 +96,8 @@ public class InboxRepositoryImpl implements InboxRepository {
transaction.rollback(); transaction.rollback();
candidate = null; candidate = null;
} finally { } finally {
tenantEntityManager.reloadTenantFilters(); if (entityManager != null) entityManager.close();
if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters();
} }
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Problem getting list of queue inbox. Skipping: {}", ex.getMessage(), ex); logger.error("Problem getting list of queue inbox. Skipping: {}", ex.getMessage(), ex);
@ -107,8 +112,12 @@ public class InboxRepositoryImpl implements InboxRepository {
boolean success = false; boolean success = false;
try (FakeRequestScope ignored = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); TenantEntityManager tenantEntityManager = null;
try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { EntityManager entityManager = null;
try{
tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class);
entityManager = this.entityManagerFactory.createEntityManager();
tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.setEntityManager(entityManager);
tenantEntityManager.disableTenantFilters(); tenantEntityManager.disableTenantFilters();
@ -138,7 +147,8 @@ public class InboxRepositoryImpl implements InboxRepository {
transaction.rollback(); transaction.rollback();
success = false; success = false;
} finally { } finally {
tenantEntityManager.reloadTenantFilters(); if (entityManager != null) entityManager.close();
if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters();
} }
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
@ -152,8 +162,12 @@ public class InboxRepositoryImpl implements InboxRepository {
boolean success = false; boolean success = false;
try (FakeRequestScope ignored = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); TenantEntityManager tenantEntityManager = null;
try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { EntityManager entityManager = null;
try{
tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class);
entityManager = this.entityManagerFactory.createEntityManager();
tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.setEntityManager(entityManager);
tenantEntityManager.disableTenantFilters(); tenantEntityManager.disableTenantFilters();
@ -184,7 +198,8 @@ public class InboxRepositoryImpl implements InboxRepository {
transaction.rollback(); transaction.rollback();
success = false; success = false;
} finally { } finally {
tenantEntityManager.reloadTenantFilters(); if (entityManager != null) entityManager.close();
if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters();
} }
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
@ -198,8 +213,12 @@ public class InboxRepositoryImpl implements InboxRepository {
boolean success; boolean success;
QueueInboxEntity queueMessage = null; QueueInboxEntity queueMessage = null;
try (FakeRequestScope ignored = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); TenantEntityManager tenantEntityManager = null;
try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { EntityManager entityManager = null;
try{
tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class);
entityManager = this.entityManagerFactory.createEntityManager();
tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.setEntityManager(entityManager);
tenantEntityManager.disableTenantFilters(); tenantEntityManager.disableTenantFilters();
@ -223,7 +242,8 @@ public class InboxRepositoryImpl implements InboxRepository {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null) transaction.rollback(); if (transaction != null) transaction.rollback();
} finally { } finally {
tenantEntityManager.reloadTenantFilters(); if (entityManager != null) entityManager.close();
if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters();
} }
} catch (Exception ex) { } catch (Exception ex) {
success = false; success = false;
@ -272,8 +292,12 @@ public class InboxRepositoryImpl implements InboxRepository {
} else { } else {
EventProcessingStatus status = this.emitQueueInboxEntity(queueInboxMessage); EventProcessingStatus status = this.emitQueueInboxEntity(queueInboxMessage);
try (FakeRequestScope ignored = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); TenantEntityManager tenantEntityManager = null;
try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { EntityManager entityManager = null;
try{
tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class);
entityManager = this.entityManagerFactory.createEntityManager();
tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.setEntityManager(entityManager);
tenantEntityManager.disableTenantFilters(); tenantEntityManager.disableTenantFilters();
@ -313,7 +337,8 @@ public class InboxRepositoryImpl implements InboxRepository {
transaction.rollback(); transaction.rollback();
success = false; success = false;
} finally { } finally {
tenantEntityManager.reloadTenantFilters(); if (entityManager != null) entityManager.close();
if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters();
} }
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
@ -327,8 +352,12 @@ public class InboxRepositoryImpl implements InboxRepository {
EntityTransaction transaction = null; EntityTransaction transaction = null;
EventProcessingStatus status = EventProcessingStatus.Discard; EventProcessingStatus status = EventProcessingStatus.Discard;
try (FakeRequestScope ignored = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { TenantEntityManager tenantEntityManager = null;
TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); EntityManager entityManager = null;
try{
tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class);
entityManager = this.entityManagerFactory.createEntityManager();
tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.setEntityManager(entityManager);
transaction = entityManager.getTransaction(); transaction = entityManager.getTransaction();
@ -350,6 +379,8 @@ public class InboxRepositoryImpl implements InboxRepository {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null) if (transaction != null)
transaction.rollback(); transaction.rollback();
} finally {
if (entityManager != null) entityManager.close();
} }
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);

View File

@ -41,7 +41,6 @@ import org.springframework.stereotype.Component;
import javax.management.InvalidApplicationException; import javax.management.InvalidApplicationException;
import java.util.*; import java.util.*;
import java.util.stream.Collectors;
@Component @Component
@ -93,19 +92,19 @@ public class AnnotationEntityCreatedIntegrationEventHandlerImpl implements Annot
try { try {
if (this.tenantScope.isMultitenant() && properties.getTenantId() != null) { if (this.tenantScope.isMultitenant() && properties.getTenantId() != null) {
TenantEntity tenant = queryFactory.query(TenantQuery.class).disableTracking().ids(properties.getTenantId()).firstAs(new BaseFieldSet().ensure(Tenant._id).ensure(Tenant._code)); TenantEntity tenant = this.queryFactory.query(TenantQuery.class).disableTracking().ids(properties.getTenantId()).firstAs(new BaseFieldSet().ensure(Tenant._id).ensure(Tenant._code));
if (tenant == null) { if (tenant == null) {
logger.error("missing tenant from event message"); logger.error("missing tenant from event message");
return EventProcessingStatus.Error; return EventProcessingStatus.Error;
} }
this.tenantScope.setTempTenant(tenantEntityManager, properties.getTenantId(), tenant.getCode()); this.tenantScope.setTempTenant(this.tenantEntityManager, properties.getTenantId(), tenant.getCode());
} else if (this.tenantScope.isMultitenant()) { } else if (this.tenantScope.isMultitenant()) {
this.tenantScope.setTempTenant(tenantEntityManager, null, this.tenantScope.getDefaultTenantCode()); this.tenantScope.setTempTenant(this.tenantEntityManager, null, this.tenantScope.getDefaultTenantCode());
} }
currentPrincipalResolver.push(InboxPrincipal.build(properties, claimExtractorProperties)); this.currentPrincipalResolver.push(InboxPrincipal.build(properties, this.claimExtractorProperties));
this.sendNotification(event); this.sendNotification(event);
auditService.track(AuditableAction.Annotation_Created_Notify, Map.ofEntries( this.auditService.track(AuditableAction.Annotation_Created_Notify, Map.ofEntries(
new AbstractMap.SimpleEntry<String, Object>("model", event) new AbstractMap.SimpleEntry<String, Object>("model", event)
)); ));
@ -113,9 +112,9 @@ public class AnnotationEntityCreatedIntegrationEventHandlerImpl implements Annot
status = EventProcessingStatus.Error; status = EventProcessingStatus.Error;
logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex); logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex);
} finally { } finally {
currentPrincipalResolver.pop(); this.currentPrincipalResolver.pop();
try { try {
tenantScope.removeTempTenant(this.tenantEntityManager); this.tenantScope.removeTempTenant(this.tenantEntityManager);
this.tenantEntityManager.reloadTenantFilters(); this.tenantEntityManager.reloadTenantFilters();
} catch (InvalidApplicationException e) { } catch (InvalidApplicationException e) {
} }
@ -160,7 +159,7 @@ public class AnnotationEntityCreatedIntegrationEventHandlerImpl implements Annot
NotifyIntegrationEvent notifyIntegrationEvent = new NotifyIntegrationEvent(); NotifyIntegrationEvent notifyIntegrationEvent = new NotifyIntegrationEvent();
notifyIntegrationEvent.setUserId(user.getId()); notifyIntegrationEvent.setUserId(user.getId());
notifyIntegrationEvent.setNotificationType(notificationProperties.getDescriptionAnnotationCreated()); notifyIntegrationEvent.setNotificationType(this.notificationProperties.getDescriptionAnnotationCreated());
NotificationFieldData data = new NotificationFieldData(); NotificationFieldData data = new NotificationFieldData();
List<FieldInfo> fieldInfoList = new ArrayList<>(); List<FieldInfo> fieldInfoList = new ArrayList<>();

View File

@ -2,12 +2,14 @@ package org.opencdmp.integrationevent.outbox;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.List;
@ConfigurationProperties(prefix = "queue.task.publisher.options") @ConfigurationProperties(prefix = "queue.task.publisher.options")
public class OutboxProperties { public class OutboxProperties {
private final String exchange; private final String exchange;
private final int handleAckRetries;
private final int handleNackRetries;
private final int handleAckWaitInMilliSeconds;
private final int handleNackWaitInMilliSeconds;
private final String tenantDefaultLocaleRemovalTopic; private final String tenantDefaultLocaleRemovalTopic;
private final String tenantDefaultLocaleTouchedTopic; private final String tenantDefaultLocaleTouchedTopic;
@ -38,8 +40,8 @@ public class OutboxProperties {
private final String generateFileTopic; private final String generateFileTopic;
public OutboxProperties(String exchange, public OutboxProperties(String exchange, int handleAckRetries, int handleNackRetries, int handleAckWaitInMilliSeconds, int handleNackWaitInMilliSeconds,
String tenantDefaultLocaleRemovalTopic, String tenantDefaultLocaleRemovalTopic,
String tenantDefaultLocaleTouchedTopic, String tenantDefaultLocaleTouchedTopic,
String tenantTouchTopic, String tenantTouchTopic,
String tenantRemovalTopic, String tenantRemovalTopic,
@ -57,6 +59,10 @@ public class OutboxProperties {
String generateFileTopic String generateFileTopic
) { ) {
this.exchange = exchange; this.exchange = exchange;
this.handleAckRetries = handleAckRetries;
this.handleNackRetries = handleNackRetries;
this.handleAckWaitInMilliSeconds = handleAckWaitInMilliSeconds;
this.handleNackWaitInMilliSeconds = handleNackWaitInMilliSeconds;
this.tenantDefaultLocaleRemovalTopic = tenantDefaultLocaleRemovalTopic; this.tenantDefaultLocaleRemovalTopic = tenantDefaultLocaleRemovalTopic;
this.tenantDefaultLocaleTouchedTopic = tenantDefaultLocaleTouchedTopic; this.tenantDefaultLocaleTouchedTopic = tenantDefaultLocaleTouchedTopic;
this.tenantTouchTopic = tenantTouchTopic; this.tenantTouchTopic = tenantTouchTopic;
@ -76,70 +82,82 @@ public class OutboxProperties {
} }
public String getExchange() { public String getExchange() {
return exchange; return this.exchange;
}
public int getHandleAckRetries() {
return this.handleAckRetries;
}
public int getHandleNackRetries() {
return this.handleNackRetries;
}
public int getHandleAckWaitInMilliSeconds() {
return this.handleAckWaitInMilliSeconds;
} }
public String getTenantDefaultLocaleRemovalTopic() { public String getTenantDefaultLocaleRemovalTopic() {
return tenantDefaultLocaleRemovalTopic; return this.tenantDefaultLocaleRemovalTopic;
} }
public String getTenantDefaultLocaleTouchedTopic() { public String getTenantDefaultLocaleTouchedTopic() {
return tenantDefaultLocaleTouchedTopic; return this.tenantDefaultLocaleTouchedTopic;
} }
public String getTenantTouchTopic() { public String getTenantTouchTopic() {
return tenantTouchTopic; return this.tenantTouchTopic;
} }
public String getTenantRemovalTopic() { public String getTenantRemovalTopic() {
return tenantRemovalTopic; return this.tenantRemovalTopic;
} }
public String getTenantReactivationTopic() { public String getTenantReactivationTopic() {
return tenantReactivationTopic; return this.tenantReactivationTopic;
} }
public String getTenantUserInviteTopic() { public String getTenantUserInviteTopic() {
return tenantUserInviteTopic; return this.tenantUserInviteTopic;
} }
public String getUserRemovalTopic() { public String getUserRemovalTopic() {
return userRemovalTopic; return this.userRemovalTopic;
} }
public String getUserTouchTopic() { public String getUserTouchTopic() {
return userTouchTopic; return this.userTouchTopic;
} }
public String getDmpTouchTopic() { public String getDmpTouchTopic() {
return dmpTouchTopic; return this.dmpTouchTopic;
} }
public String getDescriptionTouchTopic() { public String getDescriptionTouchTopic() {
return descriptionTouchTopic; return this.descriptionTouchTopic;
} }
public String getAnnotationEntitiesTouchTopic() { public String getAnnotationEntitiesTouchTopic() {
return annotationEntitiesTouchTopic; return this.annotationEntitiesTouchTopic;
} }
public String getAnnotationEntitiesRemovalTopic() { public String getAnnotationEntitiesRemovalTopic() {
return annotationEntitiesRemovalTopic; return this.annotationEntitiesRemovalTopic;
} }
public String getNotifyTopic() { public String getNotifyTopic() {
return notifyTopic; return this.notifyTopic;
} }
public String getForgetMeCompletedTopic() { public String getForgetMeCompletedTopic() {
return forgetMeCompletedTopic; return this.forgetMeCompletedTopic;
} }
public String getWhatYouKnowAboutMeCompletedTopic() { public String getWhatYouKnowAboutMeCompletedTopic() {
return whatYouKnowAboutMeCompletedTopic; return this.whatYouKnowAboutMeCompletedTopic;
} }
public String getGenerateFileTopic() { public String getGenerateFileTopic() {
return generateFileTopic; return this.generateFileTopic;
} }
} }

View File

@ -16,6 +16,7 @@ import org.opencdmp.commons.fake.FakeRequestScope;
import org.opencdmp.data.QueueOutboxEntity; import org.opencdmp.data.QueueOutboxEntity;
import org.opencdmp.data.TenantEntityManager; import org.opencdmp.data.TenantEntityManager;
import org.opencdmp.query.QueueOutboxQuery; import org.opencdmp.query.QueueOutboxQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
@ -27,6 +28,7 @@ import java.util.stream.Collectors;
public class OutboxRepositoryImpl implements OutboxRepository { public class OutboxRepositoryImpl implements OutboxRepository {
private static final Logger log = LoggerFactory.getLogger(OutboxRepositoryImpl.class);
protected final ApplicationContext applicationContext; protected final ApplicationContext applicationContext;
private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(OutboxRepositoryImpl.class)); private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(OutboxRepositoryImpl.class));
@ -50,8 +52,12 @@ public class OutboxRepositoryImpl implements OutboxRepository {
EntityTransaction transaction = null; EntityTransaction transaction = null;
CandidateInfo candidate = null; CandidateInfo candidate = null;
try (FakeRequestScope ignored = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); TenantEntityManager tenantEntityManager = null;
try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { EntityManager entityManager = null;
try{
tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class);
entityManager = this.entityManagerFactory.createEntityManager();
tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.setEntityManager(entityManager);
tenantEntityManager.disableTenantFilters(); tenantEntityManager.disableTenantFilters();
@ -96,7 +102,8 @@ public class OutboxRepositoryImpl implements OutboxRepository {
transaction.rollback(); transaction.rollback();
candidate = null; candidate = null;
} finally { } finally {
tenantEntityManager.reloadTenantFilters(); if (entityManager != null) entityManager.close();
if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters();
} }
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex); logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex);
@ -111,8 +118,11 @@ public class OutboxRepositoryImpl implements OutboxRepository {
boolean success = false; boolean success = false;
try (FakeRequestScope ignored = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); TenantEntityManager tenantEntityManager = null;
try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { EntityManager entityManager = null;
try{
tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class);
entityManager = this.entityManagerFactory.createEntityManager();
tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.setEntityManager(entityManager);
tenantEntityManager.disableTenantFilters(); tenantEntityManager.disableTenantFilters();
@ -142,7 +152,8 @@ public class OutboxRepositoryImpl implements OutboxRepository {
transaction.rollback(); transaction.rollback();
success = false; success = false;
} finally { } finally {
tenantEntityManager.reloadTenantFilters(); if (entityManager != null) entityManager.close();
if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters();
} }
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
@ -156,8 +167,11 @@ public class OutboxRepositoryImpl implements OutboxRepository {
boolean success = false; boolean success = false;
try (FakeRequestScope ignored = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); TenantEntityManager tenantEntityManager = null;
try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { EntityManager entityManager = null;
try{
tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class);
entityManager = this.entityManagerFactory.createEntityManager();
tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.setEntityManager(entityManager);
tenantEntityManager.disableTenantFilters(); tenantEntityManager.disableTenantFilters();
@ -188,7 +202,8 @@ public class OutboxRepositoryImpl implements OutboxRepository {
transaction.rollback(); transaction.rollback();
success = false; success = false;
} finally { } finally {
tenantEntityManager.reloadTenantFilters(); if (entityManager != null) entityManager.close();
if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters();
} }
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
@ -202,8 +217,11 @@ public class OutboxRepositoryImpl implements OutboxRepository {
Boolean success = false; Boolean success = false;
try (FakeRequestScope ignored = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); TenantEntityManager tenantEntityManager = null;
try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { EntityManager entityManager = null;
try {
tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class);
entityManager = this.entityManagerFactory.createEntityManager();
tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.setEntityManager(entityManager);
tenantEntityManager.disableTenantFilters(); tenantEntityManager.disableTenantFilters();
@ -243,7 +261,8 @@ public class OutboxRepositoryImpl implements OutboxRepository {
transaction.rollback(); transaction.rollback();
success = false; success = false;
} finally { } finally {
tenantEntityManager.reloadTenantFilters(); if (entityManager != null) entityManager.close();
if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters();
} }
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
@ -253,14 +272,32 @@ public class OutboxRepositoryImpl implements OutboxRepository {
@Override @Override
public void handleConfirm(List<UUID> confirmedMessages) { public void handleConfirm(List<UUID> confirmedMessages) {
EntityTransaction transaction = null;
try (FakeRequestScope ignored = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); TenantEntityManager tenantEntityManager = null;
try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { EntityManager entityManager = null;
try {
tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class);
entityManager = this.entityManagerFactory.createEntityManager();
tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.setEntityManager(entityManager);
tenantEntityManager.disableTenantFilters(); tenantEntityManager.disableTenantFilters();
this.handleConfirmWithRetries(entityManager, confirmedMessages);
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
} finally {
if (entityManager != null) entityManager.close();
if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters();
}
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
}
}
private void handleConfirmWithRetries(EntityManager entityManager, List<UUID> confirmedMessages) throws InterruptedException {
EntityTransaction transaction = null;
for (int i = 0; i < this.outboxProperties.getHandleAckRetries() + 1; i++) {
try {
transaction = entityManager.getTransaction(); transaction = entityManager.getTransaction();
transaction.begin(); transaction.begin();
@ -272,37 +309,55 @@ public class OutboxRepositoryImpl implements OutboxRepository {
} else { } else {
for (QueueOutboxEntity queueOutboxMessage : queueOutboxMessages) { for (QueueOutboxEntity queueOutboxMessage : queueOutboxMessages) {
queueOutboxMessage = queryFactory.query(QueueOutboxQuery.class).ids(queueOutboxMessage.getId()).first();
queueOutboxMessage.setNotifyStatus(QueueOutboxNotifyStatus.CONFIRMED); queueOutboxMessage.setNotifyStatus(QueueOutboxNotifyStatus.CONFIRMED);
queueOutboxMessage.setConfirmedAt(Instant.now()); queueOutboxMessage.setConfirmedAt(Instant.now());
entityManager.merge(queueOutboxMessage); entityManager.merge(queueOutboxMessage);
entityManager.flush();
} }
entityManager.flush();
} }
transaction.commit(); transaction.commit();
return;
} catch (OptimisticLockException ex) {
logger.warn("Problem handle ack {}. Rolling back any message emit db changes and marking error. Retrying...", confirmedMessages.stream().map(UUID::toString).collect(Collectors.joining(",")));
if (transaction != null) transaction.rollback();
Thread.sleep(this.outboxProperties.getHandleAckWaitInMilliSeconds());
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null) if (transaction != null) transaction.rollback();
transaction.rollback(); return;
} finally {
tenantEntityManager.reloadTenantFilters();
} }
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
} }
} }
@Override @Override
public void handleNack(List<UUID> nackedMessages) { public void handleNack(List<UUID> nackedMessages) {
EntityTransaction transaction = null;
try (FakeRequestScope ignored = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); TenantEntityManager tenantEntityManager = null;
try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { EntityManager entityManager = null;
try {
tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class);
entityManager = this.entityManagerFactory.createEntityManager();
tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.setEntityManager(entityManager);
tenantEntityManager.disableTenantFilters(); tenantEntityManager.disableTenantFilters();
this.handleNackWithRetries(entityManager, nackedMessages);
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
} finally {
if (entityManager != null) entityManager.close();
if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters();
}
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
}
}
private void handleNackWithRetries(EntityManager entityManager, List<UUID> nackedMessages) throws InterruptedException {
EntityTransaction transaction = null;
for (int i = 0; i < this.outboxProperties.getHandleNackRetries() + 1; i++) {
try {
transaction = entityManager.getTransaction(); transaction = entityManager.getTransaction();
transaction.begin(); transaction.begin();
@ -323,15 +378,16 @@ public class OutboxRepositoryImpl implements OutboxRepository {
} }
transaction.commit(); transaction.commit();
return;
} catch (OptimisticLockException ex) {
logger.warn("Problem handle nack {}. Rolling back any message emit db changes and marking error. Retrying...", nackedMessages.stream().map(UUID::toString).collect(Collectors.joining(",")));
if (transaction != null) transaction.rollback();
Thread.sleep(this.outboxProperties.getHandleNackRetries());
} catch (Exception ex) { } catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null) if (transaction != null) transaction.rollback();
transaction.rollback(); return;
} finally {
tenantEntityManager.reloadTenantFilters();
} }
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
} }
} }
@ -341,8 +397,11 @@ public class OutboxRepositoryImpl implements OutboxRepository {
QueueOutboxEntity queueMessage = null; QueueOutboxEntity queueMessage = null;
boolean success; boolean success;
try (FakeRequestScope ignored = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); TenantEntityManager tenantEntityManager = null;
try (EntityManager entityManager = this.entityManagerFactory.createEntityManager()) { EntityManager entityManager = null;
try {
tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class);
entityManager = this.entityManagerFactory.createEntityManager();
tenantEntityManager.setEntityManager(entityManager); tenantEntityManager.setEntityManager(entityManager);
tenantEntityManager.disableTenantFilters(); tenantEntityManager.disableTenantFilters();
@ -362,7 +421,8 @@ public class OutboxRepositoryImpl implements OutboxRepository {
if (transaction != null) if (transaction != null)
transaction.rollback(); transaction.rollback();
} finally { } finally {
tenantEntityManager.reloadTenantFilters(); if (entityManager != null) entityManager.close();
if (tenantEntityManager != null) tenantEntityManager.reloadTenantFilters();
} }
} catch (Exception ex) { } catch (Exception ex) {
success = false; success = false;

View File

@ -25,6 +25,10 @@ queue:
enable: true enable: true
options: options:
exchange: null exchange: null
handleAckRetries: 3
handleNackRetries: 3
handleAckWaitInMilliSeconds: 50
handleNackWaitInMilliSeconds: 50
tenant-default-locale-removal-topic: tenant_default_locale.remove tenant-default-locale-removal-topic: tenant_default_locale.remove
tenant-default-locale-touched-topic: tenant_default_locale.touch tenant-default-locale-touched-topic: tenant_default_locale.touch
forget-me-completed-topic: forgetme.completed forget-me-completed-topic: forgetme.completed