Notification service config update

This commit is contained in:
Thomas Georgios Giannos 2024-01-12 13:55:50 +02:00
parent 2fb387825d
commit 93ce7ecd6d
6 changed files with 54 additions and 50 deletions

View File

@ -50,7 +50,7 @@ public class OutboxRepositoryImpl implements OutboxRepository {
EntityTransaction transaction = null; EntityTransaction transaction = null;
EntityManager entityManager = null; EntityManager entityManager = null;
CandidateInfo candidate = null; CandidateInfo candidate = null;
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
try { try {
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
@ -86,18 +86,18 @@ public class OutboxRepositoryImpl implements OutboxRepository {
transaction.commit(); transaction.commit();
} catch (OptimisticLockException ex) { } catch (OptimisticLockException ex) {
// we get this if/when someone else already modified the notifications. We want to essentially ignore this, and keep working // we get this if/when someone else already modified the notifications. We want to essentially ignore this, and keep working
this.logger.debug("Concurrency exception getting queue outbox. Skipping: {} ", ex.getMessage()); logger.debug("Concurrency exception getting queue outbox. Skipping: {} ", ex.getMessage());
if (transaction != null) transaction.rollback(); if (transaction != null) transaction.rollback();
candidate = null; candidate = null;
} catch (Exception ex) { } catch (Exception ex) {
this.logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex); logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex);
if (transaction != null) transaction.rollback(); if (transaction != null) transaction.rollback();
candidate = null; candidate = null;
} finally { } finally {
if (entityManager != null) entityManager.close(); if (entityManager != null) entityManager.close();
} }
} catch (Exception ex) { } catch (Exception ex) {
this.logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex); logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex);
} }
return candidate; return candidate;
@ -107,9 +107,9 @@ public class OutboxRepositoryImpl implements OutboxRepository {
public Boolean shouldOmit(CandidateInfo candidate, Function<QueueOutbox, Boolean> shouldOmit) { public Boolean shouldOmit(CandidateInfo candidate, Function<QueueOutbox, Boolean> shouldOmit) {
EntityTransaction transaction = null; EntityTransaction transaction = null;
EntityManager entityManager = null; EntityManager entityManager = null;
Boolean success = false; boolean success = false;
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
try { try {
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
@ -122,7 +122,7 @@ public class OutboxRepositoryImpl implements OutboxRepository {
QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidate.getId()).first(); QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidate.getId()).first();
if (item == null) { if (item == null) {
this.logger.warn("Could not lookup queue outbox {} to process. Continuing...", candidate.getId()); logger.warn("Could not lookup queue outbox {} to process. Continuing...", candidate.getId());
} else { } else {
if (shouldOmit.apply(item)) { if (shouldOmit.apply(item)) {
item.setNotifyStatus(QueueOutboxNotifyStatus.OMITTED); item.setNotifyStatus(QueueOutboxNotifyStatus.OMITTED);
@ -135,14 +135,14 @@ public class OutboxRepositoryImpl implements OutboxRepository {
transaction.commit(); transaction.commit();
} catch (Exception ex) { } catch (Exception ex) {
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null) transaction.rollback(); if (transaction != null) transaction.rollback();
success = false; success = false;
} finally { } finally {
if (entityManager != null) entityManager.close(); if (entityManager != null) entityManager.close();
} }
} catch (Exception ex) { } catch (Exception ex) {
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
} }
return success; return success;
} }
@ -151,9 +151,9 @@ public class OutboxRepositoryImpl implements OutboxRepository {
public Boolean shouldWait(CandidateInfo candidate, Function<QueueOutbox, Boolean> itIsTimeFunc) { public Boolean shouldWait(CandidateInfo candidate, Function<QueueOutbox, Boolean> itIsTimeFunc) {
EntityTransaction transaction = null; EntityTransaction transaction = null;
EntityManager entityManager = null; EntityManager entityManager = null;
Boolean success = false; boolean success = false;
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
try { try {
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
@ -180,14 +180,14 @@ public class OutboxRepositoryImpl implements OutboxRepository {
} }
transaction.commit(); transaction.commit();
} catch (Exception ex) { } catch (Exception ex) {
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null) transaction.rollback(); if (transaction != null) transaction.rollback();
success = false; success = false;
} finally { } finally {
if (entityManager != null) entityManager.close(); if (entityManager != null) entityManager.close();
} }
} catch (Exception ex) { } catch (Exception ex) {
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
} }
return success; return success;
} }
@ -198,7 +198,7 @@ public class OutboxRepositoryImpl implements OutboxRepository {
EntityManager entityManager = null; EntityManager entityManager = null;
Boolean success = false; Boolean success = false;
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
try { try {
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
@ -211,7 +211,7 @@ public class OutboxRepositoryImpl implements OutboxRepository {
QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidateInfo.getId()).first(); QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidateInfo.getId()).first();
if (item == null) { if (item == null) {
this.logger.warn("Could not lookup queue outbox {} to process. Continuing...", candidateInfo.getId()); logger.warn("Could not lookup queue outbox {} to process. Continuing...", candidateInfo.getId());
} else { } else {
success = publish.apply(item); success = publish.apply(item);
@ -235,14 +235,14 @@ public class OutboxRepositoryImpl implements OutboxRepository {
transaction.commit(); transaction.commit();
} catch (Exception ex) { } catch (Exception ex) {
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null) transaction.rollback(); if (transaction != null) transaction.rollback();
success = false; success = false;
} finally { } finally {
if (entityManager != null) entityManager.close(); if (entityManager != null) entityManager.close();
} }
} catch (Exception ex) { } catch (Exception ex) {
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
} }
return success; return success;
} }
@ -252,7 +252,7 @@ public class OutboxRepositoryImpl implements OutboxRepository {
EntityTransaction transaction = null; EntityTransaction transaction = null;
EntityManager entityManager = null; EntityManager entityManager = null;
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
try { try {
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
@ -265,7 +265,7 @@ public class OutboxRepositoryImpl implements OutboxRepository {
List<QueueOutboxEntity> queueOutboxMessages = queryFactory.query(QueueOutboxQuery.class).ids(confirmedMessages).collect(); List<QueueOutboxEntity> queueOutboxMessages = queryFactory.query(QueueOutboxQuery.class).ids(confirmedMessages).collect();
if (queueOutboxMessages == null) { if (queueOutboxMessages == null) {
this.logger.warn("Could not lookup messages {} to process. Continuing...", String.join(",", confirmedMessages.stream().map(x -> x.toString()).collect(Collectors.toList()))); logger.warn("Could not lookup messages {} to process. Continuing...", String.join(",", confirmedMessages.stream().map(x -> x.toString()).collect(Collectors.toList())));
} else { } else {
for (QueueOutboxEntity queueOutboxMessage : queueOutboxMessages) { for (QueueOutboxEntity queueOutboxMessage : queueOutboxMessages) {
@ -279,13 +279,13 @@ public class OutboxRepositoryImpl implements OutboxRepository {
transaction.commit(); transaction.commit();
} catch (Exception ex) { } catch (Exception ex) {
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null) transaction.rollback(); if (transaction != null) transaction.rollback();
} finally { } finally {
if (entityManager != null) entityManager.close(); if (entityManager != null) entityManager.close();
} }
} catch (Exception ex) { } catch (Exception ex) {
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
} }
} }
@ -294,7 +294,7 @@ public class OutboxRepositoryImpl implements OutboxRepository {
EntityTransaction transaction = null; EntityTransaction transaction = null;
EntityManager entityManager = null; EntityManager entityManager = null;
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
try { try {
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
@ -307,7 +307,7 @@ public class OutboxRepositoryImpl implements OutboxRepository {
List<QueueOutboxEntity> queueOutboxMessages = queryFactory.query(QueueOutboxQuery.class).ids(nackedMessages).collect(); List<QueueOutboxEntity> queueOutboxMessages = queryFactory.query(QueueOutboxQuery.class).ids(nackedMessages).collect();
if (queueOutboxMessages == null) { if (queueOutboxMessages == null) {
this.logger.warn("Could not lookup messages {} to process. Continuing...", String.join(",", nackedMessages.stream().map(x -> x.toString()).collect(Collectors.toList()))); logger.warn("Could not lookup messages {} to process. Continuing...", String.join(",", nackedMessages.stream().map(x -> x.toString()).collect(Collectors.toList())));
} else { } else {
for (QueueOutboxEntity queueOutboxMessage : queueOutboxMessages) { for (QueueOutboxEntity queueOutboxMessage : queueOutboxMessages) {
@ -321,13 +321,13 @@ public class OutboxRepositoryImpl implements OutboxRepository {
transaction.commit(); transaction.commit();
} catch (Exception ex) { } catch (Exception ex) {
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null) transaction.rollback(); if (transaction != null) transaction.rollback();
} finally { } finally {
if (entityManager != null) entityManager.close(); if (entityManager != null) entityManager.close();
} }
} catch (Exception ex) { } catch (Exception ex) {
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
} }
} }
@ -335,9 +335,9 @@ public class OutboxRepositoryImpl implements OutboxRepository {
public QueueOutbox create(IntegrationEvent item) { public QueueOutbox create(IntegrationEvent item) {
EntityTransaction transaction = null; EntityTransaction transaction = null;
EntityManager entityManager = null; EntityManager entityManager = null;
Boolean success = false; boolean success = false;
QueueOutboxEntity queueMessage = null; QueueOutboxEntity queueMessage = null;
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { try (FakeRequestScope ignored = new FakeRequestScope()) {
try { try {
queueMessage = this.mapEvent((OutboxIntegrationEvent) item); queueMessage = this.mapEvent((OutboxIntegrationEvent) item);
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
@ -352,14 +352,14 @@ public class OutboxRepositoryImpl implements OutboxRepository {
transaction.commit(); transaction.commit();
} catch (Exception ex) { } catch (Exception ex) {
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null) transaction.rollback(); if (transaction != null) transaction.rollback();
success = false; success = false;
} finally { } finally {
if (entityManager != null) entityManager.close(); if (entityManager != null) entityManager.close();
} }
} catch (Exception ex) { } catch (Exception ex) {
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
} }
return queueMessage; return queueMessage;
} }

View File

@ -15,4 +15,4 @@ queue:
options: options:
exchange: cite_dmp_devel_queue exchange: cite_dmp_devel_queue
rabbitmq: rabbitmq:
enable: false enable: true

View File

@ -26,11 +26,6 @@ queue:
options: options:
exchange: null exchange: null
forget-me-completed-topic: forgetme.completed forget-me-completed-topic: forgetme.completed
notify-topic: notification.notify
tenant-reactivation-topic: tenant.reactivated
tenant-removal-topic: tenant.remove
tenant-touch-topic: tenant.touch
tenant-user-invite-topic: tenant.invite
what-you-know-about-me-completed-topic: whatyouknowaboutme.completed what-you-know-about-me-completed-topic: whatyouknowaboutme.completed
generate-file-topic: generate.file generate-file-topic: generate.file
rabbitmq: rabbitmq:
@ -46,8 +41,11 @@ queue:
enable: false enable: false
options: options:
exchange: null exchange: null
user-removal-topic: [ "user.remove" ] notify-topic: notification.notify
user-touched-topic: [ "user.touch" ] tenant-removal-topic: tenant.remove
tenant-touch-topic: tenant.touch
user-removal-topic: user.remove
user-touch-topic: user.touch
rabbitmq: rabbitmq:
enable: false enable: false
interval-seconds: 30 interval-seconds: 30

View File

@ -32,6 +32,9 @@ public class AppRabbitConfigurer extends RabbitConfigurer {
@Bean @Bean
public InboxBindings inboxBindingsCreator() { public InboxBindings inboxBindingsCreator() {
List<String> bindingItems = new ArrayList<>(); List<String> bindingItems = new ArrayList<>();
bindingItems.addAll(this.inboxProperties.getNotifyTopic());
bindingItems.addAll(this.inboxProperties.getTenantRemovalTopic());
bindingItems.addAll(this.inboxProperties.getTenantTouchedTopic());
bindingItems.addAll(this.inboxProperties.getUserRemovalTopic()); bindingItems.addAll(this.inboxProperties.getUserRemovalTopic());
bindingItems.addAll(this.inboxProperties.getUserTouchedTopic()); bindingItems.addAll(this.inboxProperties.getUserTouchedTopic());

View File

@ -1,6 +1,7 @@
package gr.cite.notification.integrationevent.inbox; package gr.cite.notification.integrationevent.inbox;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.ConstructorBinding;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
@ -8,6 +9,7 @@ import java.util.List;
@Validated @Validated
@ConfigurationProperties(prefix = "queue.task.listener.options") @ConfigurationProperties(prefix = "queue.task.listener.options")
@ConstructorBinding
public class InboxProperties { public class InboxProperties {
@NotNull @NotNull
@ -17,7 +19,7 @@ public class InboxProperties {
private final List<String> notifyTopic; private final List<String> notifyTopic;
@NotNull @NotNull
private final List<String> tenantRemovedTopic; private final List<String> tenantRemovalTopic;
@NotNull @NotNull
private final List<String> tenantTouchedTopic; private final List<String> tenantTouchedTopic;
@ -29,12 +31,15 @@ public class InboxProperties {
private final List<String> userTouchedTopic; private final List<String> userTouchedTopic;
public InboxProperties( public InboxProperties(
String exchange, List<String> notifyTopic, List<String> tenantRemovedTopic, List<String> tenantTouchedTopic, String exchange,
List<String> notifyTopic,
List<String> tenantRemovalTopic,
List<String> tenantTouchedTopic,
List<String> userRemovalTopic, List<String> userRemovalTopic,
List<String> userTouchedTopic) { List<String> userTouchedTopic) {
this.exchange = exchange; this.exchange = exchange;
this.notifyTopic = notifyTopic; this.notifyTopic = notifyTopic;
this.tenantRemovedTopic = tenantRemovedTopic; this.tenantRemovalTopic = tenantRemovalTopic;
this.tenantTouchedTopic = tenantTouchedTopic; this.tenantTouchedTopic = tenantTouchedTopic;
this.userRemovalTopic = userRemovalTopic; this.userRemovalTopic = userRemovalTopic;
this.userTouchedTopic = userTouchedTopic; this.userTouchedTopic = userTouchedTopic;
@ -44,8 +49,8 @@ public class InboxProperties {
return notifyTopic; return notifyTopic;
} }
public List<String> getTenantRemovedTopic() { public List<String> getTenantRemovalTopic() {
return tenantRemovedTopic; return tenantRemovalTopic;
} }
public List<String> getTenantTouchedTopic() { public List<String> getTenantTouchedTopic() {

View File

@ -26,14 +26,12 @@ import jakarta.persistence.EntityTransaction;
import jakarta.persistence.OptimisticLockException; import jakarta.persistence.OptimisticLockException;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import java.time.Instant; import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.function.Function; import java.util.function.Function;
@Component
public class InboxRepositoryImpl implements InboxRepository { public class InboxRepositoryImpl implements InboxRepository {
private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(InboxRepositoryImpl.class)); private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(InboxRepositoryImpl.class));
@ -325,15 +323,15 @@ public class InboxRepositoryImpl implements InboxRepository {
private EventProcessingStatus processMessage(String routingKey, String messageId, String appId, String message) { private EventProcessingStatus processMessage(String routingKey, String messageId, String appId, String message) {
IntegrationEventHandler handler; IntegrationEventHandler handler;
if (this.RoutingKeyMatched(routingKey, this.inboxProperties.getNotifyTopic())) if (this.routingKeyMatched(routingKey, this.inboxProperties.getNotifyTopic()))
handler = this.applicationContext.getBean(NotifyIntegrationEventHandler.class); handler = this.applicationContext.getBean(NotifyIntegrationEventHandler.class);
else if (this.RoutingKeyMatched(routingKey, this.inboxProperties.getTenantRemovedTopic())) else if (this.routingKeyMatched(routingKey, this.inboxProperties.getTenantRemovalTopic()))
handler = this.applicationContext.getBean(TenantRemovalIntegrationEventHandler.class); handler = this.applicationContext.getBean(TenantRemovalIntegrationEventHandler.class);
else if (this.RoutingKeyMatched(routingKey, this.inboxProperties.getTenantTouchedTopic())) else if (this.routingKeyMatched(routingKey, this.inboxProperties.getTenantTouchedTopic()))
handler = this.applicationContext.getBean(TenantTouchedIntegrationEventHandler.class); handler = this.applicationContext.getBean(TenantTouchedIntegrationEventHandler.class);
else if (this.RoutingKeyMatched(routingKey, this.inboxProperties.getUserRemovalTopic())) else if (this.routingKeyMatched(routingKey, this.inboxProperties.getUserRemovalTopic()))
handler = this.applicationContext.getBean(UserRemovalIntegrationEventHandler.class); handler = this.applicationContext.getBean(UserRemovalIntegrationEventHandler.class);
else if (this.RoutingKeyMatched(routingKey, this.inboxProperties.getUserTouchedTopic())) else if (this.routingKeyMatched(routingKey, this.inboxProperties.getUserTouchedTopic()))
handler = this.applicationContext.getBean(UserTouchedIntegrationEventHandler.class); handler = this.applicationContext.getBean(UserTouchedIntegrationEventHandler.class);
else else
handler = null; handler = null;
@ -357,7 +355,7 @@ public class InboxRepositoryImpl implements InboxRepository {
// } // }
} }
private Boolean RoutingKeyMatched(String routingKey, List<String> topics) { private Boolean routingKeyMatched(String routingKey, List<String> topics) {
if (topics == null || topics.isEmpty()) if (topics == null || topics.isEmpty())
return false; return false;
return topics.stream().anyMatch(x -> x.equals(routingKey)); return topics.stream().anyMatch(x -> x.equals(routingKey));