From f6100a8ea2912837aad6e90008ef1bff32f59e44 Mon Sep 17 00:00:00 2001 From: amentis Date: Tue, 12 Dec 2023 10:14:26 +0200 Subject: [PATCH] add inbox, outbox --- .../java/eu/eudat/data/QueueInboxEntity.java | 177 ++++++++ .../java/eu/eudat/data/QueueOutboxEntity.java | 175 +++++++ .../integrationevent/AppRabbitConfigurer.java | 47 ++ .../InboxIntegrationEventConfigurer.java | 30 ++ .../IntegrationEventContextImpl.java | 20 + .../OutboxIntegrationEventConfigurer.java | 69 +++ .../OutboxPropertiesConfiguration.java | 11 + .../eudat/integrationevent/TrackedEvent.java | 18 + .../inbox/ConsistencyHandler.java | 5 + .../inbox/ConsistencyPredicates.java | 4 + .../inbox/EventProcessingStatus.java | 8 + .../inbox/InboxPrincipal.java | 56 +++ .../inbox/InboxProperties.java | 42 ++ .../inbox/InboxRepositoryImpl.java | 338 ++++++++++++++ .../inbox/IntegrationEventHandler.java | 5 + .../inbox/IntegrationEventProperties.java | 22 + .../UserRemovalConsistencyHandler.java | 26 ++ .../UserRemovalConsistencyPredicates.java | 21 + .../UserRemovalIntegrationEvent.java | 26 ++ .../UserRemovalIntegrationEventHandler.java | 6 + ...serRemovalIntegrationEventHandlerImpl.java | 125 +++++ .../UserTouchedIntegrationEvent.java | 45 ++ .../UserTouchedIntegrationEventHandler.java | 6 + ...serTouchedIntegrationEventHandlerImpl.java | 116 +++++ .../outbox/OutboxIntegrationEvent.java | 27 ++ .../outbox/OutboxProperties.java | 89 ++++ .../outbox/OutboxRepositoryImpl.java | 428 ++++++++++++++++++ .../outbox/OutboxService.java | 5 + .../outbox/OutboxServiceImpl.java | 43 ++ .../java/eu/eudat/query/QueueInboxQuery.java | 219 +++++++++ .../java/eu/eudat/query/QueueOutboxQuery.java | 236 ++++++++++ dmp-backend/pom.xml | 17 + .../src/main/resources/config/application.yml | 3 +- .../src/main/resources/config/queue-devel.yml | 18 + .../web/src/main/resources/config/queue.yml | 58 +++ 35 files changed, 2540 insertions(+), 1 deletion(-) create mode 100644 dmp-backend/core/src/main/java/eu/eudat/data/QueueInboxEntity.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/data/QueueOutboxEntity.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/AppRabbitConfigurer.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/InboxIntegrationEventConfigurer.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/IntegrationEventContextImpl.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/OutboxIntegrationEventConfigurer.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/OutboxPropertiesConfiguration.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/TrackedEvent.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/ConsistencyHandler.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/ConsistencyPredicates.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/EventProcessingStatus.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/InboxPrincipal.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/InboxProperties.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/InboxRepositoryImpl.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/IntegrationEventHandler.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/IntegrationEventProperties.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalConsistencyHandler.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalConsistencyPredicates.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalIntegrationEvent.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalIntegrationEventHandler.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalIntegrationEventHandlerImpl.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/usertouched/UserTouchedIntegrationEvent.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/usertouched/UserTouchedIntegrationEventHandler.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/usertouched/UserTouchedIntegrationEventHandlerImpl.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxIntegrationEvent.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxProperties.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxRepositoryImpl.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxService.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxServiceImpl.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/query/QueueInboxQuery.java create mode 100644 dmp-backend/core/src/main/java/eu/eudat/query/QueueOutboxQuery.java create mode 100644 dmp-backend/web/src/main/resources/config/queue-devel.yml create mode 100644 dmp-backend/web/src/main/resources/config/queue.yml diff --git a/dmp-backend/core/src/main/java/eu/eudat/data/QueueInboxEntity.java b/dmp-backend/core/src/main/java/eu/eudat/data/QueueInboxEntity.java new file mode 100644 index 000000000..dc2e18d51 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/data/QueueInboxEntity.java @@ -0,0 +1,177 @@ +package eu.eudat.data; + +import eu.eudat.commons.enums.IsActive; +import eu.eudat.data.converters.enums.IsActiveConverter; +import gr.cite.queueinbox.entity.QueueInbox; +import gr.cite.queueinbox.entity.QueueInboxStatus; +import jakarta.persistence.*; + +import java.time.Instant; +import java.util.UUID; + +@Entity +@Table(name = "\"QueueInbox\"") +public class QueueInboxEntity implements QueueInbox { + @Id + @Column(name = "\"id\"", columnDefinition = "uuid", updatable = false, nullable = false) + private UUID id; + public final static String _id = "id"; + + @Column(name = "\"queue\"", nullable = false, length = 50) + private String queue; + public final static String _queue = "queue"; + + @Column(name = "\"exchange\"", nullable = false, length = 50) + private String exchange; + public final static String _exchange = "exchange"; + + @Column(name = "\"route\"", nullable = false, length = 50) + private String route; + public final static String _route = "route"; + + @Column(name = "\"application_id\"", nullable = false, length = 100) + private String applicationId; + public final static String _applicationId = "applicationId"; + + @Column(name = "\"message_id\"", columnDefinition = "uuid", nullable = false) + private UUID messageId; + public final static String _messageId = "messageId"; + + @Column(name = "\"message\"", columnDefinition = "json", nullable = false) + private String message; + public final static String _message = "message"; + + @Column(name = "\"retry_count\"", nullable = true) + private Integer retryCount; + public final static String _retryCount = "retryCount"; + + @Column(name = "\"tenant\"", columnDefinition = "uuid", nullable = true) + private UUID tenantId; + public final static String _tenantId = "tenantId"; + + @Column(name = "\"is_active\"", length = 20, nullable = false) + @Convert(converter = IsActiveConverter.class) + private IsActive isActive; + public final static String _isActive = "isActive"; + + //TODO: as integer + @Column(name = "\"status\"", length = 50, nullable = false) + @Enumerated(EnumType.STRING) + private QueueInboxStatus status; + public final static String _status = "status"; + + @Column(name = "\"created_at\"", nullable = false) + private Instant createdAt; + public final static String _createdAt = "createdAt"; + + @Column(name = "\"updated_at\"", nullable = false) + @Version + private Instant updatedAt; + public final static String _updatedAt = "updatedAt"; + + public UUID getId() { + return id; + } + + public void setId(UUID id) { + this.id = id; + } + + public String getExchange() { + return exchange; + } + + public void setExchange(String exchange) { + this.exchange = exchange; + } + + public String getRoute() { + return route; + } + + public void setRoute(String route) { + this.route = route; + } + + public UUID getMessageId() { + return messageId; + } + + public void setMessageId(UUID messageId) { + this.messageId = messageId; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public UUID getTenantId() { + return tenantId; + } + + public void setTenantId(UUID tenantId) { + this.tenantId = tenantId; + } + + public IsActive getIsActive() { + return isActive; + } + + public void setIsActive(IsActive isActive) { + this.isActive = isActive; + } + + public Instant getCreatedAt() { + return createdAt; + } + + public void setCreatedAt(Instant createdAt) { + this.createdAt = createdAt; + } + + public Instant getUpdatedAt() { + return updatedAt; + } + + public void setUpdatedAt(Instant updatedAt) { + this.updatedAt = updatedAt; + } + + public String getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + public String getApplicationId() { + return applicationId; + } + + public void setApplicationId(String applicationId) { + this.applicationId = applicationId; + } + + @Override + public Integer getRetryCount() { + return retryCount; + } + + public void setRetryCount(Integer retryCount) { + this.retryCount = retryCount; + } + + public QueueInboxStatus getStatus() { + return status; + } + + public void setStatus(QueueInboxStatus status) { + this.status = status; + } +} + diff --git a/dmp-backend/core/src/main/java/eu/eudat/data/QueueOutboxEntity.java b/dmp-backend/core/src/main/java/eu/eudat/data/QueueOutboxEntity.java new file mode 100644 index 000000000..721d5c94d --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/data/QueueOutboxEntity.java @@ -0,0 +1,175 @@ +package eu.eudat.data; + +import eu.eudat.commons.enums.IsActive; +import eu.eudat.data.converters.enums.IsActiveConverter; +import gr.cite.queueoutbox.entity.QueueOutbox; +import gr.cite.queueoutbox.entity.QueueOutboxNotifyStatus; + +import jakarta.persistence.*; +import java.time.Instant; +import java.util.UUID; + +@Entity +@Table(name = "\"QueueOutbox\"") +public class QueueOutboxEntity implements QueueOutbox { + @Id + @Column(name = "\"id\"", columnDefinition = "uuid", updatable = false, nullable = false) + private UUID id; + public final static String _id = "id"; + + @Column(name = "\"exchange\"", nullable = false, length = 50) + private String exchange; + public final static String _exchange = "exchange"; + + @Column(name = "\"route\"", length = 50) + private String route; + public final static String _route = "route"; + + @Column(name = "\"message_id\"", columnDefinition = "uuid", nullable = false) + private UUID messageId; + public final static String _messageId = "messageId"; + + @Column(name = "\"message\"", columnDefinition = "json", nullable = false) + private String message; + public final static String _message = "message"; + + //TODO: as integer + @Column(name = "\"notify_status\"", length = 20, nullable = false) + @Enumerated(EnumType.STRING) + private QueueOutboxNotifyStatus notifyStatus; + public final static String _notifyStatus = "notifyStatus"; + + @Column(name = "\"retry_count\"", nullable = false) + private int retryCount; + public final static String _retryCount = "retryCount"; + + @Column(name = "\"published_at\"", nullable = true) + private Instant publishedAt; + public final static String _publishedAt = "publishedAt"; + + @Column(name = "\"confirmed_at\"", nullable = true) + private Instant confirmedAt; + public final static String _confirmedAt = "confirmedAt"; + + @Column(name = "\"tenant\"", columnDefinition = "uuid", nullable = true) + private UUID tenantId; + public final static String _tenantId = "tenantId"; + + @Column(name = "\"is_active\"", length = 20, nullable = false) + @Convert(converter = IsActiveConverter.class) + private IsActive isActive; + public final static String _isActive = "isActive"; + + @Column(name = "\"created_at\"", nullable = false) + private Instant createdAt; + public final static String _createdAt = "createdAt"; + + @Column(name = "\"updated_at\"", nullable = false) + private Instant updatedAt; + public final static String _updatedAt = "updatedAt"; + + public UUID getId() { + return id; + } + + public void setId(UUID id) { + this.id = id; + } + + public String getExchange() { + return exchange; + } + + public void setExchange(String exchange) { + this.exchange = exchange; + } + + public String getRoute() { + return route; + } + + public void setRoute(String route) { + this.route = route; + } + + public UUID getMessageId() { + return messageId; + } + + public void setMessageId(UUID messageId) { + this.messageId = messageId; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public QueueOutboxNotifyStatus getNotifyStatus() { + return notifyStatus; + } + + public void setNotifyStatus(QueueOutboxNotifyStatus notifyStatus) { + this.notifyStatus = notifyStatus; + } + + public Integer getRetryCount() { + return retryCount; + } + + public void setRetryCount(Integer retryCount) { + this.retryCount = retryCount; + } + + public Instant getPublishedAt() { + return publishedAt; + } + + public void setPublishedAt(Instant publishedAt) { + this.publishedAt = publishedAt; + } + + public Instant getConfirmedAt() { + return confirmedAt; + } + + public void setConfirmedAt(Instant confirmedAt) { + this.confirmedAt = confirmedAt; + } + + public UUID getTenantId() { + return tenantId; + } + + public void setTenantId(UUID tenantId) { + this.tenantId = tenantId; + } + + public IsActive getIsActive() { + return isActive; + } + + public void setIsActive(IsActive isActive) { + this.isActive = isActive; + } + + public Instant getCreatedAt() { + return createdAt; + } + + public void setCreatedAt(Instant createdAt) { + this.createdAt = createdAt; + } + + public Instant getUpdatedAt() { + return updatedAt; + } + + public void setUpdatedAt(Instant updatedAt) { + this.updatedAt = updatedAt; + } +} + diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/AppRabbitConfigurer.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/AppRabbitConfigurer.java new file mode 100644 index 000000000..83348f4a6 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/AppRabbitConfigurer.java @@ -0,0 +1,47 @@ +package eu.eudat.integrationevent; + +import eu.eudat.integrationevent.inbox.InboxProperties; +import eu.eudat.integrationevent.outbox.OutboxProperties; +import gr.cite.queueinbox.repository.InboxRepository; +import gr.cite.rabbitmq.RabbitConfigurer; +import gr.cite.rabbitmq.consumer.InboxBindings; +import gr.cite.rabbitmq.consumer.InboxCreator; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.ArrayList; +import java.util.List; + +@Configuration +@EnableConfigurationProperties({OutboxProperties.class, InboxProperties.class}) +@ConditionalOnProperty(prefix = "queue.rabbitmq", name = "listenerEnabled") +public class AppRabbitConfigurer extends RabbitConfigurer { + private ApplicationContext applicationContext; + private InboxProperties inboxProperties; + + public AppRabbitConfigurer(ApplicationContext applicationContext, InboxProperties inboxProperties) { + this.applicationContext = applicationContext; + this.inboxProperties = inboxProperties; + } + +// @Bean + public InboxBindings inboxBindingsCreator() { + List bindingItems = new ArrayList<>(); + bindingItems.addAll(this.inboxProperties.getUserRemovalTopic()); + bindingItems.addAll(this.inboxProperties.getUserTouchedTopic()); + + return new InboxBindings(bindingItems); + } + + +// @Bean + public InboxCreator inboxCreator() { + return (params) -> { + InboxRepository inboxRepository = this.applicationContext.getBean(InboxRepository.class); + return inboxRepository.create(params) != null; + }; + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/InboxIntegrationEventConfigurer.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/InboxIntegrationEventConfigurer.java new file mode 100644 index 000000000..a4a8ddcf1 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/InboxIntegrationEventConfigurer.java @@ -0,0 +1,30 @@ +package eu.eudat.integrationevent; + +import eu.eudat.integrationevent.inbox.InboxProperties; +import eu.eudat.integrationevent.inbox.InboxRepositoryImpl; +import gr.cite.queueinbox.InboxConfigurer; +import gr.cite.queueinbox.repository.InboxRepository; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@EnableConfigurationProperties({InboxProperties.class}) +@ConditionalOnProperty(prefix = "queue.task.listener", name = "enable", matchIfMissing = false) +public class InboxIntegrationEventConfigurer extends InboxConfigurer { + private ApplicationContext applicationContext; + private InboxProperties inboxProperties; + + public InboxIntegrationEventConfigurer(ApplicationContext applicationContext, InboxProperties inboxProperties) { + this.applicationContext = applicationContext; + this.inboxProperties = inboxProperties; + } + + @Bean + public InboxRepository inboxRepositoryCreator() { + return new InboxRepositoryImpl(this.applicationContext, this.inboxProperties); + } +} + diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/IntegrationEventContextImpl.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/IntegrationEventContextImpl.java new file mode 100644 index 000000000..e6c8d8431 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/IntegrationEventContextImpl.java @@ -0,0 +1,20 @@ +package eu.eudat.integrationevent; + +import gr.cite.rabbitmq.IntegrationEventContext; + +import java.util.UUID; + +public class IntegrationEventContextImpl implements IntegrationEventContext { + private UUID tenant; + + public IntegrationEventContextImpl() { + } + + public UUID getTenant() { + return tenant; + } + + public void setTenant(UUID tenant) { + this.tenant = tenant; + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/OutboxIntegrationEventConfigurer.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/OutboxIntegrationEventConfigurer.java new file mode 100644 index 000000000..db9278357 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/OutboxIntegrationEventConfigurer.java @@ -0,0 +1,69 @@ +package eu.eudat.integrationevent; + + +import eu.eudat.integrationevent.outbox.OutboxProperties; +import eu.eudat.integrationevent.outbox.OutboxRepositoryImpl; +import gr.cite.queueoutbox.IntegrationEventContextCreator; +import gr.cite.queueoutbox.OutboxConfigurer; +import gr.cite.queueoutbox.repository.OutboxRepository; +import gr.cite.rabbitmq.IntegrationEventMessageConstants; +import gr.cite.rabbitmq.RabbitProperties; +import gr.cite.rabbitmq.broker.MessageHydrator; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.nio.charset.StandardCharsets; +import java.sql.Date; +import java.time.Instant; +import java.util.UUID; + +@Configuration +@EnableConfigurationProperties({OutboxProperties.class}) +@ConditionalOnProperty(prefix = "queue.task.publisher", name = "enable", matchIfMissing = false) +public class OutboxIntegrationEventConfigurer extends OutboxConfigurer { + private ApplicationContext applicationContext; + private OutboxProperties outboxProperties; + + public OutboxIntegrationEventConfigurer(ApplicationContext applicationContext, OutboxProperties outboxProperties) { + this.applicationContext = applicationContext; + this.outboxProperties = outboxProperties; + } + + @Bean + public MessageHydrator messageHydrator(RabbitProperties rabbitProperties) { + return (message, event, eventContext) -> { + MessageProperties messageProperties = message.getMessageProperties(); + messageProperties.setAppId(rabbitProperties.getAppId()); + messageProperties.setContentEncoding(StandardCharsets.UTF_8.displayName()); + messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); + //messageProperties.setUserId(userContext.getCurrentUser().toString()); + messageProperties.setTimestamp(Date.from(Instant.now())); + messageProperties.setMessageId(event.getMessageId().toString()); + + if (eventContext != null) { + UUID tenant = ((IntegrationEventContextImpl) eventContext).getTenant(); + if (tenant != null) { + messageProperties.setHeader(IntegrationEventMessageConstants.TENANT, tenant); + } + } + + return message; + }; + } + + @Bean + public IntegrationEventContextCreator integrationEventContextCreator() { + return (message) -> new IntegrationEventContextImpl(); + } + + @Bean + public OutboxRepository outboxRepositoryCreator() { + return new OutboxRepositoryImpl(this.applicationContext, this.outboxProperties); + } +} + + diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/OutboxPropertiesConfiguration.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/OutboxPropertiesConfiguration.java new file mode 100644 index 000000000..fc08550e9 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/OutboxPropertiesConfiguration.java @@ -0,0 +1,11 @@ +package eu.eudat.integrationevent; + +import eu.eudat.integrationevent.outbox.OutboxProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Configuration +@EnableConfigurationProperties(OutboxProperties.class) +public class OutboxPropertiesConfiguration { + +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/TrackedEvent.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/TrackedEvent.java new file mode 100644 index 000000000..5bc6e205a --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/TrackedEvent.java @@ -0,0 +1,18 @@ +package eu.eudat.integrationevent; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class TrackedEvent { + public String trackingContextTag; + + public String getTrackingContextTag() { + return trackingContextTag; + } + + public void setTrackingContextTag(String trackingContextTag) { + this.trackingContextTag = trackingContextTag; + } + + +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/ConsistencyHandler.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/ConsistencyHandler.java new file mode 100644 index 000000000..a08d85386 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/ConsistencyHandler.java @@ -0,0 +1,5 @@ +package eu.eudat.integrationevent.inbox; + +public interface ConsistencyHandler { + Boolean isConsistent(T consistencyPredicates); +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/ConsistencyPredicates.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/ConsistencyPredicates.java new file mode 100644 index 000000000..5bcd43719 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/ConsistencyPredicates.java @@ -0,0 +1,4 @@ +package eu.eudat.integrationevent.inbox; + +public interface ConsistencyPredicates { +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/EventProcessingStatus.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/EventProcessingStatus.java new file mode 100644 index 000000000..bed452b45 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/EventProcessingStatus.java @@ -0,0 +1,8 @@ +package eu.eudat.integrationevent.inbox; + +public enum EventProcessingStatus { + Error, + Success, + Postponed, + Discard +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/InboxPrincipal.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/InboxPrincipal.java new file mode 100644 index 000000000..825fe6d64 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/InboxPrincipal.java @@ -0,0 +1,56 @@ +package eu.eudat.integrationevent.inbox; + +import gr.cite.commons.web.oidc.principal.MyPrincipal; +import org.springframework.security.oauth2.core.ClaimAccessor; +import org.springframework.security.oauth2.jwt.JwtClaimNames; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class InboxPrincipal implements MyPrincipal, ClaimAccessor { + private Map claims; + private boolean isAuthenticated; + + public InboxPrincipal(Boolean isAuthenticated, String name) { + this.claims = new HashMap<>(); + this.put(JwtClaimNames.SUB, name); + this.isAuthenticated = isAuthenticated; + } + + @Override + public Boolean isAuthenticated() { + return this.isAuthenticated; + } + + @Override + public Map getClaims() { + return this.claims; + } + + @Override + public List getClaimAsStringList(String claim) { + if (claims == null) return null; + return this.getClaimAsStringList(claim); + } + + @Override + public String getName() { + return this.getClaimAsString(JwtClaimNames.SUB); + } + + public void put(String key, Object value) { + this.claims.put(key, value); + } + + public static InboxPrincipal build(IntegrationEventProperties properties) { + InboxPrincipal inboxPrincipal = new InboxPrincipal(true, "IntegrationEventQueueAppId"); + inboxPrincipal.put("client_id", properties.getAppId()); + inboxPrincipal.put("active", "true"); + inboxPrincipal.put("nbf", Instant.now().minus(30, ChronoUnit.SECONDS).toString()); + inboxPrincipal.put("exp", Instant.now().plus(10, ChronoUnit.MINUTES).toString()); + return inboxPrincipal; + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/InboxProperties.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/InboxProperties.java new file mode 100644 index 000000000..3971f8fbd --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/InboxProperties.java @@ -0,0 +1,42 @@ +package eu.eudat.integrationevent.inbox; + + +import org.springframework.boot.context.properties.ConfigurationProperties; +//import org.springframework.boot.context.properties.ConstructorBinding; +import org.springframework.validation.annotation.Validated; + +import jakarta.validation.constraints.NotNull; +import java.util.List; + +@Validated +@ConfigurationProperties(prefix = "queue.task.listener.options") +//@ConstructorBinding +public class InboxProperties { + @NotNull + private final String exchange; + @NotNull + private final List userRemovalTopic; + @NotNull + private final List userTouchedTopic; + + public InboxProperties( + String exchange, + List userRemovalTopic, + List userTouchedTopic) { + this.exchange = exchange; + this.userRemovalTopic = userRemovalTopic; + this.userTouchedTopic = userTouchedTopic; + } + + public List getUserRemovalTopic() { + return userRemovalTopic; + } + + public List getUserTouchedTopic() { + return userTouchedTopic; + } + + public String getExchange() { + return exchange; + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/InboxRepositoryImpl.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/InboxRepositoryImpl.java new file mode 100644 index 000000000..d20305d24 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/InboxRepositoryImpl.java @@ -0,0 +1,338 @@ +package eu.eudat.integrationevent.inbox; + + +import eu.eudat.commons.JsonHandlingService; +import eu.eudat.commons.enums.IsActive; +import eu.eudat.commons.fake.FakeRequestScope; +import eu.eudat.data.QueueInboxEntity; +import eu.eudat.integrationevent.TrackedEvent; +import eu.eudat.integrationevent.inbox.userremoval.UserRemovalIntegrationEventHandler; +import eu.eudat.integrationevent.inbox.usertouched.UserTouchedIntegrationEventHandler; +import eu.eudat.query.QueueInboxQuery; +import gr.cite.queueinbox.entity.QueueInbox; +import gr.cite.queueinbox.entity.QueueInboxStatus; +import gr.cite.queueinbox.repository.CandidateInfo; +import gr.cite.queueinbox.repository.InboxRepository; +import gr.cite.queueinbox.task.MessageOptions; +import gr.cite.rabbitmq.consumer.InboxCreatorParams; +import gr.cite.tools.data.query.Ordering; +import gr.cite.tools.data.query.QueryFactory; +import gr.cite.tools.logging.LoggerService; +import jakarta.persistence.*; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; + + +import java.time.Instant; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.function.Function; + +public class InboxRepositoryImpl implements InboxRepository { + + protected final ApplicationContext applicationContext; + private final Random random = new Random(); + private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(InboxRepositoryImpl.class)); + private final JsonHandlingService jsonHandlingService; + private final InboxProperties inboxProperties; + + public InboxRepositoryImpl( + ApplicationContext applicationContext, + InboxProperties inboxProperties + ) { + this.applicationContext = applicationContext; + this.jsonHandlingService = this.applicationContext.getBean(JsonHandlingService.class); + this.inboxProperties = inboxProperties; + } + + + @Override + public CandidateInfo candidate(Instant lastCandidateCreationTimestamp, MessageOptions options) { + EntityTransaction transaction = null; + EntityManager entityManager = null; + CandidateInfo candidate = null; + try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { + try { + QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); + EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); + entityManager = entityManagerFactory.createEntityManager(); + + transaction = entityManager.getTransaction(); + transaction.begin(); + + QueueInboxEntity item = queryFactory.query(QueueInboxQuery.class) + .isActives(IsActive.Active) + .status(QueueInboxStatus.PENDING, QueueInboxStatus.ERROR) + .retryThreshold(options.getRetryThreashold()) + .createdAfter(lastCandidateCreationTimestamp) + .ordering(new Ordering().addAscending(QueueInboxEntity._createdAt)) + .first(); + + if (item != null) { + QueueInboxStatus prevState = item.getStatus(); + item.setStatus(QueueInboxStatus.PROCESSING); + + entityManager.merge(item); + entityManager.flush(); + + candidate = new CandidateInfo(); + candidate.setId(item.getId()); + candidate.setCreatedAt(item.getCreatedAt()); + candidate.setPreviousState(prevState); + } + + transaction.commit(); + } catch (OptimisticLockException ex) { + // we get this if/when someone else already modified the notifications. We want to essentially ignore this, and keep working + this.logger.debug("Concurrency exception getting queue inbox. Skipping: {} ", ex.getMessage()); + if (transaction != null) transaction.rollback(); + candidate = null; + } catch (Exception ex) { + this.logger.error("Problem getting list of queue inbox. Skipping: {}", ex.getMessage(), ex); + if (transaction != null) transaction.rollback(); + candidate = null; + } finally { + if (entityManager != null) entityManager.close(); + } + } catch (Exception ex) { + this.logger.error("Problem getting list of queue inbox. Skipping: {}", ex.getMessage(), ex); + } + + return candidate; + } + + @Override + public Boolean shouldOmit(CandidateInfo candidate, Function shouldOmit) { + EntityTransaction transaction = null; + EntityManager entityManager = null; + Boolean success = false; + + try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { + try { + EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); + + entityManager = entityManagerFactory.createEntityManager(); + transaction = entityManager.getTransaction(); + + transaction.begin(); + + QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); + QueueInboxEntity item = queryFactory.query(QueueInboxQuery.class).ids(candidate.getId()).first(); + + if (item == null) { + this.logger.warn("Could not lookup queue inbox {} to process. Continuing...", candidate.getId()); + } else { + if (shouldOmit.apply(item)) { + item.setStatus(QueueInboxStatus.OMITTED); + + entityManager.merge(item); + entityManager.flush(); + success = true; + } + } + + transaction.commit(); + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + if (transaction != null) transaction.rollback(); + success = false; + } finally { + if (entityManager != null) entityManager.close(); + } + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + } + return success; + } + + @Override + public boolean shouldWait(CandidateInfo candidate, Function itIsTimeFunc) { + EntityTransaction transaction = null; + EntityManager entityManager = null; + Boolean success = false; + + try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { + try { + EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); + + entityManager = entityManagerFactory.createEntityManager(); + transaction = entityManager.getTransaction(); + + transaction.begin(); + + QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); + QueueInboxEntity item = queryFactory.query(QueueInboxQuery.class).ids(candidate.getId()).first(); + + if (item.getRetryCount() != null && item.getRetryCount() >= 1) { + Boolean itIsTime = itIsTimeFunc.apply(item); + + if (!itIsTime) { + item.setStatus(candidate.getPreviousState()); + + entityManager.merge(item); + entityManager.flush(); + success = true; + } + + success = !itIsTime; + } + transaction.commit(); + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + if (transaction != null) transaction.rollback(); + success = false; + } finally { + if (entityManager != null) entityManager.close(); + } + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + } + return success; + } + + @Override + public QueueInbox create(InboxCreatorParams inboxCreatorParams) { + EntityTransaction transaction = null; + EntityManager entityManager = null; + Boolean success = false; + QueueInboxEntity queueMessage = null; + try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { + try { + queueMessage = this.createQueueInboxEntity(inboxCreatorParams); + EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); + + entityManager = entityManagerFactory.createEntityManager(); + transaction = entityManager.getTransaction(); + + transaction.begin(); + + entityManager.persist(queueMessage); + entityManager.flush(); + + transaction.commit(); + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + if (transaction != null) transaction.rollback(); + success = false; + } finally { + if (entityManager != null) entityManager.close(); + } + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + } + return queueMessage; + } + + private QueueInboxEntity createQueueInboxEntity(InboxCreatorParams inboxCreatorParams) { + + QueueInboxEntity queueMessage = new QueueInboxEntity(); + queueMessage.setId(UUID.randomUUID()); + queueMessage.setTenantId(null); + queueMessage.setExchange(this.inboxProperties.getExchange()); + queueMessage.setRoute(inboxCreatorParams.getRoutingKey()); + queueMessage.setQueue(inboxCreatorParams.getQueueName()); + queueMessage.setApplicationId(inboxCreatorParams.getAppId()); + queueMessage.setMessageId(UUID.fromString(inboxCreatorParams.getMessageId())); + queueMessage.setMessage(inboxCreatorParams.getMessageBody()); + queueMessage.setIsActive(IsActive.Active); + queueMessage.setStatus(QueueInboxStatus.PENDING); + queueMessage.setRetryCount(0); + queueMessage.setCreatedAt(Instant.now()); + + return queueMessage; + } + + + @Override + public Boolean emit(CandidateInfo candidateInfo) { + EntityTransaction transaction = null; + EntityManager entityManager = null; + Boolean success = false; + + try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { + try { + + EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); + + entityManager = entityManagerFactory.createEntityManager(); + transaction = entityManager.getTransaction(); + transaction.begin(); + + QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); + QueueInboxEntity queueInboxMessage = queryFactory.query(QueueInboxQuery.class).ids(candidateInfo.getId()).first(); + + if (queueInboxMessage == null) { + this.logger.warn("Could not lookup queue inbox {} to process. Continuing...", candidateInfo.getId()); + } else { + + EventProcessingStatus status = this.processMessage(queueInboxMessage.getRoute(), queueInboxMessage.getMessageId().toString(), queueInboxMessage.getApplicationId(), queueInboxMessage.getMessage()); + switch (status) { + case Success: { + queueInboxMessage.setStatus(QueueInboxStatus.SUCCESSFUL); + break; + } + case Postponed: { + queueInboxMessage.setStatus(QueueInboxStatus.PARKED); + break; + } + case Error: { + queueInboxMessage.setStatus(QueueInboxStatus.ERROR); + queueInboxMessage.setRetryCount(queueInboxMessage.getRetryCount() != null ? queueInboxMessage.getRetryCount() + 1 : 0); + break; + } + case Discard: + default: { + queueInboxMessage.setStatus(QueueInboxStatus.DISCARD); + break; + } + } + success = status == EventProcessingStatus.Success; + + entityManager.merge(queueInboxMessage); + entityManager.flush(); + } + + transaction.commit(); + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + if (transaction != null) transaction.rollback(); + success = false; + } finally { + if (entityManager != null) entityManager.close(); + } + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + } + return success; + } + + private EventProcessingStatus processMessage(String routingKey, String messageId, String appId, String message) { + IntegrationEventHandler handler; + if (this.RoutingKeyMatched(routingKey, this.inboxProperties.getUserRemovalTopic())) handler = this.applicationContext.getBean(UserRemovalIntegrationEventHandler.class); + else if (this.RoutingKeyMatched(routingKey, this.inboxProperties.getUserTouchedTopic())) handler = this.applicationContext.getBean(UserTouchedIntegrationEventHandler.class); + else handler = null; + + if (handler == null) return EventProcessingStatus.Discard; + + IntegrationEventProperties properties = new IntegrationEventProperties(); + properties.setAppId(appId); + properties.setMessageId(messageId); + + TrackedEvent event = this.jsonHandlingService.fromJsonSafe(TrackedEvent.class, message); +// using (LogContext.PushProperty(this._logTrackingConfig.LogTrackingContextName, @event.TrackingContextTag)) +// { + try { + return handler.handle(properties, message); + } catch (Exception ex) { + logger.error("problem handling event from routing key " + routingKey + ". Setting nack and continuing...", ex); + return EventProcessingStatus.Error; + } +// } + } + + private Boolean RoutingKeyMatched(String routingKey, List topics) { + if (topics == null || topics.size() == 0) return false; + return topics.stream().anyMatch(x -> x.equals(routingKey)); + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/IntegrationEventHandler.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/IntegrationEventHandler.java new file mode 100644 index 000000000..fec42f871 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/IntegrationEventHandler.java @@ -0,0 +1,5 @@ +package eu.eudat.integrationevent.inbox; + +public interface IntegrationEventHandler { + EventProcessingStatus handle(IntegrationEventProperties properties, String message); +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/IntegrationEventProperties.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/IntegrationEventProperties.java new file mode 100644 index 000000000..90acd1d92 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/IntegrationEventProperties.java @@ -0,0 +1,22 @@ +package eu.eudat.integrationevent.inbox; + +public class IntegrationEventProperties { + private String messageId; + private String appId; + + public String getMessageId() { + return messageId; + } + + public void setMessageId(String messageId) { + this.messageId = messageId; + } + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalConsistencyHandler.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalConsistencyHandler.java new file mode 100644 index 000000000..825e2fe41 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalConsistencyHandler.java @@ -0,0 +1,26 @@ +package eu.eudat.integrationevent.inbox.userremoval; + +import eu.eudat.integrationevent.inbox.ConsistencyHandler; +import eu.eudat.query.UserQuery; +import gr.cite.tools.data.query.QueryFactory; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +@Component +@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class UserRemovalConsistencyHandler implements ConsistencyHandler { + + private final QueryFactory queryFactory; + + public UserRemovalConsistencyHandler(QueryFactory queryFactory) { + this.queryFactory = queryFactory; + } + + @Override + public Boolean isConsistent(UserRemovalConsistencyPredicates consistencyPredicates) { + long count = this.queryFactory.query(UserQuery.class).ids(consistencyPredicates.getUserId()).count(); + if (count == 0) return false; + return true; + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalConsistencyPredicates.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalConsistencyPredicates.java new file mode 100644 index 000000000..911065e3f --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalConsistencyPredicates.java @@ -0,0 +1,21 @@ +package eu.eudat.integrationevent.inbox.userremoval; + +import eu.eudat.integrationevent.inbox.ConsistencyPredicates; + +import java.util.UUID; + +public class UserRemovalConsistencyPredicates implements ConsistencyPredicates { + public UserRemovalConsistencyPredicates(UUID userId) { + this.userId = userId; + } + + private UUID userId; + + public UUID getUserId() { + return userId; + } + + public void setUserId(UUID userId) { + this.userId = userId; + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalIntegrationEvent.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalIntegrationEvent.java new file mode 100644 index 000000000..49b81b156 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalIntegrationEvent.java @@ -0,0 +1,26 @@ +package eu.eudat.integrationevent.inbox.userremoval; + +import eu.eudat.integrationevent.TrackedEvent; + +import java.util.UUID; + +public class UserRemovalIntegrationEvent extends TrackedEvent { + private UUID userId; + private UUID tenant; + + public UUID getUserId() { + return userId; + } + + public void setUserId(UUID userId) { + this.userId = userId; + } + + public UUID getTenant() { + return tenant; + } + + public void setTenant(UUID tenant) { + this.tenant = tenant; + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalIntegrationEventHandler.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalIntegrationEventHandler.java new file mode 100644 index 000000000..7861c4446 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalIntegrationEventHandler.java @@ -0,0 +1,6 @@ +package eu.eudat.integrationevent.inbox.userremoval; + +import eu.eudat.integrationevent.inbox.IntegrationEventHandler; + +public interface UserRemovalIntegrationEventHandler extends IntegrationEventHandler { +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalIntegrationEventHandlerImpl.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalIntegrationEventHandlerImpl.java new file mode 100644 index 000000000..cb4f44619 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/userremoval/UserRemovalIntegrationEventHandlerImpl.java @@ -0,0 +1,125 @@ +package eu.eudat.integrationevent.inbox.userremoval; + +import eu.eudat.audit.AuditableAction; +import eu.eudat.commons.JsonHandlingService; +import eu.eudat.commons.fake.FakeRequestScope; +import eu.eudat.data.TenantEntity; +import eu.eudat.errorcode.ErrorThesaurusProperties; +import eu.eudat.integrationevent.inbox.EventProcessingStatus; +import eu.eudat.integrationevent.inbox.InboxPrincipal; +import eu.eudat.integrationevent.inbox.IntegrationEventProperties; +import eu.eudat.query.TenantQuery; +import eu.eudat.service.user.UserService; +import gr.cite.commons.web.oidc.principal.CurrentPrincipalResolver; + +import gr.cite.tools.auditing.AuditService; +import gr.cite.tools.data.query.QueryFactory; +import gr.cite.tools.exception.MyValidationException; +import gr.cite.tools.fieldset.BaseFieldSet; +import gr.cite.tools.logging.LoggerService; +import jakarta.persistence.*; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.ApplicationContext; +import org.springframework.context.MessageSource; +import org.springframework.context.annotation.Scope; +import org.springframework.context.i18n.LocaleContextHolder; +import org.springframework.stereotype.Component; + + +import java.util.AbstractMap; +import java.util.Map; + +@Component +@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class UserRemovalIntegrationEventHandlerImpl implements UserRemovalIntegrationEventHandler { + private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(UserRemovalIntegrationEventHandlerImpl.class)); + private final JsonHandlingService jsonHandlingService; + protected final ApplicationContext applicationContext; + private final ErrorThesaurusProperties errors; + private final MessageSource messageSource; + + public UserRemovalIntegrationEventHandlerImpl( + JsonHandlingService jsonHandlingService, + ApplicationContext applicationContext, + ErrorThesaurusProperties errors, + MessageSource messageSource + ) { + this.jsonHandlingService = jsonHandlingService; + this.applicationContext = applicationContext; + this.errors = errors; + this.messageSource = messageSource; + } + + @Override + public EventProcessingStatus handle(IntegrationEventProperties properties, String message) { + UserRemovalIntegrationEvent event = this.jsonHandlingService.fromJsonSafe(UserRemovalIntegrationEvent.class, message); + if (event == null) return EventProcessingStatus.Error; + if (event.getUserId() == null) throw new MyValidationException(this.errors.getModelValidation().getCode(), "userId", messageSource.getMessage("Validation_Required", new Object[]{"userId"}, LocaleContextHolder.getLocale())); + + EntityManager entityManager = null; + EntityTransaction transaction = null; + try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { + try { + QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); +// TenantScope scope = this.applicationContext.getBean(TenantScope.class); +// if (scope.isMultitenant() && event.getTenant() != null) { +// TenantEntity tenant = queryFactory.query(TenantQuery.class).ids(event.getTenant()).firstAs(new BaseFieldSet().ensure(Tenant._id).ensure(Tenant._code)); +// if (tenant == null) { +// logger.error("missing tenant from event message"); +// return EventProcessingStatus.Error; +// } +// scope.setTenant(event.getTenant(), tenant.getCode()); +// } else if (scope.isMultitenant()) { +// logger.error("missing tenant from event message"); +// return EventProcessingStatus.Error; +// } + + CurrentPrincipalResolver currentPrincipalResolver = this.applicationContext.getBean(CurrentPrincipalResolver.class); + currentPrincipalResolver.push(InboxPrincipal.build(properties)); + + UserRemovalConsistencyHandler userRemovalConsistencyHandler = this.applicationContext.getBean(UserRemovalConsistencyHandler.class); + if (!(userRemovalConsistencyHandler.isConsistent(new UserRemovalConsistencyPredicates(event.getUserId())))) return EventProcessingStatus.Postponed; + + EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); + entityManager = entityManagerFactory.createEntityManager(); + + transaction = entityManager.getTransaction(); + transaction.begin(); + + try { + UserService userService = this.applicationContext.getBean(UserService.class); + userService.deleteAndSave(event.getUserId()); + + AuditService auditService = this.applicationContext.getBean(AuditService.class); + + auditService.track(AuditableAction.User_Delete, Map.ofEntries( + new AbstractMap.SimpleEntry("id", event.getUserId()) + )); + //auditService.trackIdentity(AuditableAction.IdentityTracking_Action); + + transaction.commit(); + } catch (Exception e) { + transaction.rollback(); + throw e; + } finally { + currentPrincipalResolver.pop(); + } + + transaction.commit(); + } catch (OptimisticLockException ex) { + // we get this if/when someone else already modified the notifications. We want to essentially ignore this, and keep working + this.logger.debug("Concurrency exception getting queue outbox. Skipping: {} ", ex.getMessage()); + if (transaction != null) transaction.rollback(); + } catch (Exception ex) { + this.logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex); + if (transaction != null) transaction.rollback(); + } finally { + if (entityManager != null) entityManager.close(); + } + } catch (Exception ex) { + this.logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex); + } + return null; + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/usertouched/UserTouchedIntegrationEvent.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/usertouched/UserTouchedIntegrationEvent.java new file mode 100644 index 000000000..0ca679bff --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/usertouched/UserTouchedIntegrationEvent.java @@ -0,0 +1,45 @@ +package eu.eudat.integrationevent.inbox.usertouched; + + +import eu.eudat.integrationevent.TrackedEvent; + +import java.util.UUID; + +public class UserTouchedIntegrationEvent extends TrackedEvent { + private UUID id; + private UUID tenant; + private String firstName; + private String lastName; + + public UUID getId() { + return id; + } + + public void setId(UUID id) { + this.id = id; + } + + public UUID getTenant() { + return tenant; + } + + public void setTenant(UUID tenant) { + this.tenant = tenant; + } + + public String getFirstName() { + return firstName; + } + + public void setFirstName(String firstName) { + this.firstName = firstName; + } + + public String getLastName() { + return lastName; + } + + public void setLastName(String lastName) { + this.lastName = lastName; + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/usertouched/UserTouchedIntegrationEventHandler.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/usertouched/UserTouchedIntegrationEventHandler.java new file mode 100644 index 000000000..ffc6db6cc --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/usertouched/UserTouchedIntegrationEventHandler.java @@ -0,0 +1,6 @@ +package eu.eudat.integrationevent.inbox.usertouched; + +import eu.eudat.integrationevent.inbox.IntegrationEventHandler; + +public interface UserTouchedIntegrationEventHandler extends IntegrationEventHandler { +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/usertouched/UserTouchedIntegrationEventHandlerImpl.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/usertouched/UserTouchedIntegrationEventHandlerImpl.java new file mode 100644 index 000000000..44bdd0e34 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/inbox/usertouched/UserTouchedIntegrationEventHandlerImpl.java @@ -0,0 +1,116 @@ +package eu.eudat.integrationevent.inbox.usertouched; + +import eu.eudat.audit.AuditableAction; +import eu.eudat.commons.JsonHandlingService; +import eu.eudat.commons.fake.FakeRequestScope; +import eu.eudat.integrationevent.inbox.EventProcessingStatus; +import eu.eudat.integrationevent.inbox.InboxPrincipal; +import eu.eudat.integrationevent.inbox.IntegrationEventProperties; +import eu.eudat.service.user.UserService; +import gr.cite.commons.web.oidc.principal.CurrentPrincipalResolver; +import gr.cite.tools.auditing.AuditService; +import gr.cite.tools.data.query.QueryFactory; +import gr.cite.tools.fieldset.BaseFieldSet; +import gr.cite.tools.logging.LoggerService; +import gr.cite.tools.validation.ValidationService; +import jakarta.persistence.*; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import java.util.AbstractMap; +import java.util.Map; + +@Component +@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class UserTouchedIntegrationEventHandlerImpl implements UserTouchedIntegrationEventHandler { + private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(UserTouchedIntegrationEventHandlerImpl.class)); + private final JsonHandlingService jsonHandlingService; + protected final ApplicationContext applicationContext; + + public UserTouchedIntegrationEventHandlerImpl( + JsonHandlingService jsonHandlingService, + ApplicationContext applicationContext + ) { + this.jsonHandlingService = jsonHandlingService; + this.applicationContext = applicationContext; + } + + @Override + public EventProcessingStatus handle(IntegrationEventProperties properties, String message) { + UserTouchedIntegrationEvent event = this.jsonHandlingService.fromJsonSafe(UserTouchedIntegrationEvent.class, message); + if (event == null) return EventProcessingStatus.Error; + +// UserTouchedIntegrationEventPersist model = new UserTouchedIntegrationEventPersist(); +// model.setId(event.getId()); +// model.setFirstName(event.getFirstName()); +// model.setLastName(event.getLastName()); + + EntityManager entityManager = null; + EntityTransaction transaction = null; + try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { + try { + QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); +// TenantScope scope = this.applicationContext.getBean(TenantScope.class); +// if (scope.isMultitenant() && event.getTenant() != null) { +// TenantEntity tenant = queryFactory.query(TenantQuery.class).ids(event.getTenant()).firstAs(new BaseFieldSet().ensure(Tenant._id).ensure(Tenant._code)); +// if (tenant == null) { +// logger.error("missing tenant from event message"); +// return EventProcessingStatus.Error; +// } +// scope.setTenant(event.getTenant(), tenant.getCode()); +// } else if (scope.isMultitenant()) { +// logger.error("missing tenant from event message"); +// return EventProcessingStatus.Error; +// } +// +// ValidationService validator = this.applicationContext.getBean(ValidationService.class); +// validator.validateForce(model); + + + CurrentPrincipalResolver currentPrincipalResolver = this.applicationContext.getBean(CurrentPrincipalResolver.class); + currentPrincipalResolver.push(InboxPrincipal.build(properties)); + + EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); + entityManager = entityManagerFactory.createEntityManager(); + + transaction = entityManager.getTransaction(); + transaction.begin(); + + try { + UserService userService = this.applicationContext.getBean(UserService.class); +// userService.persist(model, null); + + AuditService auditService = this.applicationContext.getBean(AuditService.class); + + auditService.track(AuditableAction.User_Persist, Map.ofEntries( +// new AbstractMap.SimpleEntry("model", model) + )); + + transaction.commit(); + } catch (Exception e) { + transaction.rollback(); + throw e; + } finally { + currentPrincipalResolver.pop(); + } + + transaction.commit(); + } catch (OptimisticLockException ex) { + // we get this if/when someone else already modified the notifications. We want to essentially ignore this, and keep working + this.logger.debug("Concurrency exception getting queue outbox. Skipping: {} ", ex.getMessage()); + if (transaction != null) transaction.rollback(); + } catch (Exception ex) { + this.logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex); + if (transaction != null) transaction.rollback(); + } finally { + if (entityManager != null) entityManager.close(); + } + } catch (Exception ex) { + this.logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex); + } + return null; + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxIntegrationEvent.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxIntegrationEvent.java new file mode 100644 index 000000000..d492544e9 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxIntegrationEvent.java @@ -0,0 +1,27 @@ +package eu.eudat.integrationevent.outbox; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import eu.eudat.integrationevent.TrackedEvent; +import gr.cite.rabbitmq.IntegrationEvent; + + +@JsonIgnoreProperties(ignoreUnknown = true) +public class OutboxIntegrationEvent extends IntegrationEvent { + public static final String FORGET_ME_COMPLETED = "FORGET_ME_COMPLETED"; + public static final String NOTIFY = "NOTIFY"; + public static final String TENANT_REACTIVATE = "TENANT_REACTIVATE"; + public static final String TENANT_REMOVE = "TENANT_REMOVE"; + public static final String TENANT_TOUCH = "TENANT_TOUCH"; + public static final String TENANT_USER_INVITE = "TENANT_USER_INVITE"; + public static final String WHAT_YOU_KNOW_ABOUT_ME_COMPLETED = "WHAT_YOU_KNOW_ABOUT_ME_COMPLETED"; + public static final String GENERATE_FILE = "GENERATE_FILE"; + private TrackedEvent event; + + public TrackedEvent getEvent() { + return event; + } + + public void setEvent(TrackedEvent event) { + this.event = event; + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxProperties.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxProperties.java new file mode 100644 index 000000000..e9e0c9680 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxProperties.java @@ -0,0 +1,89 @@ +package eu.eudat.integrationevent.outbox; + + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.bind.ConstructorBinding; +import org.springframework.validation.annotation.Validated; + +import jakarta.validation.constraints.NotNull; + +@Validated +@ConfigurationProperties(prefix = "queue.task.publisher.options") +//@ConstructorBinding +public class OutboxProperties { + @NotNull + private final String exchange; + @NotNull + private final String tenantTouchTopic; + @NotNull + private final String tenantRemovalTopic; + @NotNull + private final String tenantReactivationTopic; + @NotNull + private final String tenantUserInviteTopic; + @NotNull + private final String notifyTopic; + @NotNull + private final String forgetMeCompletedTopic; + @NotNull + private final String whatYouKnowAboutMeCompletedTopic; + @NotNull + private final String generateFileTopic; + + public OutboxProperties(String exchange, + String tenantTouchTopic, + String tenantRemovalTopic, + String tenantReactivationTopic, + String tenantUserInviteTopic, + String notifyTopic, + String forgetMeCompletedTopic, + String whatYouKnowAboutMeCompletedTopic, + String generateFileTopic + ) { + this.exchange = exchange; + this.tenantTouchTopic = tenantTouchTopic; + this.tenantRemovalTopic = tenantRemovalTopic; + this.tenantReactivationTopic = tenantReactivationTopic; + this.tenantUserInviteTopic = tenantUserInviteTopic; + this.notifyTopic = notifyTopic; + this.forgetMeCompletedTopic = forgetMeCompletedTopic; + this.whatYouKnowAboutMeCompletedTopic = whatYouKnowAboutMeCompletedTopic; + this.generateFileTopic = generateFileTopic; + } + + public String getExchange() { + return exchange; + } + + public String getTenantTouchTopic() { + return tenantTouchTopic; + } + + public String getTenantRemovalTopic() { + return tenantRemovalTopic; + } + + public String getTenantReactivationTopic() { + return tenantReactivationTopic; + } + + public String getTenantUserInviteTopic() { + return tenantUserInviteTopic; + } + + public String getNotifyTopic() { + return notifyTopic; + } + + public String getForgetMeCompletedTopic() { + return forgetMeCompletedTopic; + } + + public String getWhatYouKnowAboutMeCompletedTopic() { + return whatYouKnowAboutMeCompletedTopic; + } + + public String getGenerateFileTopic() { + return generateFileTopic; + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxRepositoryImpl.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxRepositoryImpl.java new file mode 100644 index 000000000..a9e36748c --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxRepositoryImpl.java @@ -0,0 +1,428 @@ +package eu.eudat.integrationevent.outbox; + +import eu.eudat.commons.JsonHandlingService; +import eu.eudat.commons.enums.IsActive; +import eu.eudat.commons.fake.FakeRequestScope; +import eu.eudat.data.QueueOutboxEntity; +import eu.eudat.query.QueueOutboxQuery; +import gr.cite.queueoutbox.entity.QueueOutbox; +import gr.cite.queueoutbox.entity.QueueOutboxNotifyStatus; +import gr.cite.queueoutbox.repository.CandidateInfo; +import gr.cite.queueoutbox.repository.OutboxRepository; +import gr.cite.queueoutbox.task.MessageOptions; +import gr.cite.rabbitmq.IntegrationEvent; +import gr.cite.tools.data.query.Ordering; +import gr.cite.tools.data.query.QueryFactory; +import gr.cite.tools.logging.LoggerService; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; + +import jakarta.persistence.EntityManager; +import jakarta.persistence.EntityManagerFactory; +import jakarta.persistence.EntityTransaction; +import jakarta.persistence.OptimisticLockException; +import java.time.Instant; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class OutboxRepositoryImpl implements OutboxRepository { + + protected final ApplicationContext applicationContext; + private final Random random = new Random(); + private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(OutboxRepositoryImpl.class)); + private final JsonHandlingService jsonHandlingService; + private final OutboxProperties outboxProperties; + + public OutboxRepositoryImpl( + ApplicationContext applicationContext, + OutboxProperties outboxProperties + ) { + this.applicationContext = applicationContext; + this.jsonHandlingService = this.applicationContext.getBean(JsonHandlingService.class); + this.outboxProperties = outboxProperties; + } + + @Override + public CandidateInfo candidate(Instant lastCandidateCreationTimestamp, MessageOptions messageOptions, Function onConfirmTimeout) { + EntityTransaction transaction = null; + EntityManager entityManager = null; + CandidateInfo candidate = null; + try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { + try { + QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); + EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); + entityManager = entityManagerFactory.createEntityManager(); + + transaction = entityManager.getTransaction(); + transaction.begin(); + + QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class) + .isActives(IsActive.Active) + .notifyStatus(QueueOutboxNotifyStatus.PENDING, QueueOutboxNotifyStatus.WAITING_CONFIRMATION, QueueOutboxNotifyStatus.ERROR) + .retryThreshold(messageOptions.getRetryThreashold()) + .confirmTimeout(messageOptions.getConfirmTimeoutSeconds()) + .createdAfter(lastCandidateCreationTimestamp) + .ordering(new Ordering().addAscending(QueueOutboxEntity._createdAt)) + .first(); + + if (item != null) { + boolean confirmTimeout = onConfirmTimeout.apply(item); + + QueueOutboxNotifyStatus prevState = item.getNotifyStatus(); + item.setNotifyStatus(QueueOutboxNotifyStatus.PROCESSING); + + entityManager.merge(item); + entityManager.flush(); + + candidate = new CandidateInfo(); + candidate.setId(item.getId()); + candidate.setCreatedAt(item.getCreatedAt()); + candidate.setPreviousState(prevState); + } + + transaction.commit(); + } catch (OptimisticLockException ex) { + // we get this if/when someone else already modified the notifications. We want to essentially ignore this, and keep working + this.logger.debug("Concurrency exception getting queue outbox. Skipping: {} ", ex.getMessage()); + if (transaction != null) transaction.rollback(); + candidate = null; + } catch (Exception ex) { + this.logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex); + if (transaction != null) transaction.rollback(); + candidate = null; + } finally { + if (entityManager != null) entityManager.close(); + } + } catch (Exception ex) { + this.logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex); + } + + return candidate; + } + + @Override + public Boolean shouldOmit(CandidateInfo candidate, Function shouldOmit) { + EntityTransaction transaction = null; + EntityManager entityManager = null; + Boolean success = false; + + try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { + try { + EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); + + entityManager = entityManagerFactory.createEntityManager(); + transaction = entityManager.getTransaction(); + + transaction.begin(); + + QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); + QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidate.getId()).first(); + + if (item == null) { + this.logger.warn("Could not lookup queue outbox {} to process. Continuing...", candidate.getId()); + } else { + if (shouldOmit.apply(item)) { + item.setNotifyStatus(QueueOutboxNotifyStatus.OMITTED); + + entityManager.merge(item); + entityManager.flush(); + success = true; + } + } + + transaction.commit(); + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + if (transaction != null) transaction.rollback(); + success = false; + } finally { + if (entityManager != null) entityManager.close(); + } + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + } + return success; + } + + @Override + public Boolean shouldWait(CandidateInfo candidate, Function itIsTimeFunc) { + EntityTransaction transaction = null; + EntityManager entityManager = null; + Boolean success = false; + + try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { + try { + EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); + + entityManager = entityManagerFactory.createEntityManager(); + transaction = entityManager.getTransaction(); + + transaction.begin(); + + QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); + QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidate.getId()).first(); + + if (item.getRetryCount() != null && item.getRetryCount() >= 1) { + Boolean itIsTime = itIsTimeFunc.apply(item); + + if (!itIsTime) { + item.setNotifyStatus(candidate.getPreviousState()); + + entityManager.merge(item); + entityManager.flush(); + success = true; + } + + success = !itIsTime; + } + transaction.commit(); + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + if (transaction != null) transaction.rollback(); + success = false; + } finally { + if (entityManager != null) entityManager.close(); + } + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + } + return success; + } + + @Override + public Boolean process(CandidateInfo candidateInfo, Boolean isAutoconfirmOnPublish, Function publish) { + EntityTransaction transaction = null; + EntityManager entityManager = null; + Boolean success = false; + + try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { + try { + + EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); + + entityManager = entityManagerFactory.createEntityManager(); + transaction = entityManager.getTransaction(); + transaction.begin(); + + QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); + QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidateInfo.getId()).first(); + + if (item == null) { + this.logger.warn("Could not lookup queue outbox {} to process. Continuing...", candidateInfo.getId()); + } else { + + success = publish.apply(item); + if (success) { + if (isAutoconfirmOnPublish) { + item.setNotifyStatus(QueueOutboxNotifyStatus.CONFIRMED); + item.setConfirmedAt(Instant.now()); + } else { + item.setNotifyStatus(QueueOutboxNotifyStatus.WAITING_CONFIRMATION); + } + item.setPublishedAt(Instant.now()); + } else { + item.setNotifyStatus(QueueOutboxNotifyStatus.ERROR); + item.setRetryCount(item.getRetryCount() != null ? item.getRetryCount() + 1 : 0); + item.setPublishedAt(null); + } + + entityManager.merge(item); + entityManager.flush(); + } + + transaction.commit(); + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + if (transaction != null) transaction.rollback(); + success = false; + } finally { + if (entityManager != null) entityManager.close(); + } + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + } + return success; + } + + @Override + public void handleConfirm(List confirmedMessages) { + EntityTransaction transaction = null; + EntityManager entityManager = null; + + try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { + try { + + EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); + + entityManager = entityManagerFactory.createEntityManager(); + transaction = entityManager.getTransaction(); + transaction.begin(); + + QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); + List queueOutboxMessages = queryFactory.query(QueueOutboxQuery.class).ids(confirmedMessages).collect(); + + if (queueOutboxMessages == null) { + this.logger.warn("Could not lookup messages {} to process. Continuing...", String.join(",", confirmedMessages.stream().map(x -> x.toString()).collect(Collectors.toList()))); + } else { + + for (QueueOutboxEntity queueOutboxMessage : queueOutboxMessages) { + queueOutboxMessage.setNotifyStatus(QueueOutboxNotifyStatus.CONFIRMED); + queueOutboxMessage.setConfirmedAt(Instant.now()); + entityManager.merge(queueOutboxMessage); + } + + entityManager.flush(); + } + + transaction.commit(); + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + if (transaction != null) transaction.rollback(); + } finally { + if (entityManager != null) entityManager.close(); + } + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + } + } + + @Override + public void handleNack(List nackedMessages) { + EntityTransaction transaction = null; + EntityManager entityManager = null; + + try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { + try { + + EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); + + entityManager = entityManagerFactory.createEntityManager(); + transaction = entityManager.getTransaction(); + transaction.begin(); + + QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); + List queueOutboxMessages = queryFactory.query(QueueOutboxQuery.class).ids(nackedMessages).collect(); + + if (queueOutboxMessages == null) { + this.logger.warn("Could not lookup messages {} to process. Continuing...", String.join(",", nackedMessages.stream().map(x -> x.toString()).collect(Collectors.toList()))); + } else { + + for (QueueOutboxEntity queueOutboxMessage : queueOutboxMessages) { + queueOutboxMessage.setNotifyStatus(QueueOutboxNotifyStatus.ERROR); + queueOutboxMessage.setRetryCount(queueOutboxMessage.getRetryCount() != null ? queueOutboxMessage.getRetryCount() + 1 : 0); + entityManager.merge(queueOutboxMessage); + } + + entityManager.flush(); + } + + transaction.commit(); + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + if (transaction != null) transaction.rollback(); + } finally { + if (entityManager != null) entityManager.close(); + } + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + } + } + + @Override + public QueueOutbox create(IntegrationEvent item) { + EntityTransaction transaction = null; + EntityManager entityManager = null; + Boolean success = false; + QueueOutboxEntity queueMessage = null; + try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { + try { + queueMessage = this.mapEvent((OutboxIntegrationEvent) item); + EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); + + entityManager = entityManagerFactory.createEntityManager(); + transaction = entityManager.getTransaction(); + + transaction.begin(); + + entityManager.persist(queueMessage); + entityManager.flush(); + + transaction.commit(); + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + if (transaction != null) transaction.rollback(); + success = false; + } finally { + if (entityManager != null) entityManager.close(); + } + } catch (Exception ex) { + this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); + } + return queueMessage; + } + + private QueueOutboxEntity mapEvent(OutboxIntegrationEvent event) { + String routingKey; + switch (event.getType()) { + case OutboxIntegrationEvent.TENANT_REACTIVATE: { + routingKey = this.outboxProperties.getTenantReactivationTopic(); + break; + } + case OutboxIntegrationEvent.TENANT_REMOVE: { + routingKey = this.outboxProperties.getTenantRemovalTopic(); + break; + } + case OutboxIntegrationEvent.TENANT_TOUCH: { + routingKey = this.outboxProperties.getTenantTouchTopic(); + break; + } + case OutboxIntegrationEvent.TENANT_USER_INVITE: { + routingKey = this.outboxProperties.getTenantUserInviteTopic(); + break; + } + case OutboxIntegrationEvent.FORGET_ME_COMPLETED: { + routingKey = this.outboxProperties.getForgetMeCompletedTopic(); + break; + } + case OutboxIntegrationEvent.NOTIFY: { + routingKey = this.outboxProperties.getNotifyTopic(); + break; + } + case OutboxIntegrationEvent.WHAT_YOU_KNOW_ABOUT_ME_COMPLETED: { + routingKey = this.outboxProperties.getWhatYouKnowAboutMeCompletedTopic(); + break; + } + case OutboxIntegrationEvent.GENERATE_FILE: { + routingKey = this.outboxProperties.getGenerateFileTopic(); + break; + } + default: { + logger.error("unrecognized outgoing integration event {}. Skipping...", event.getType()); + return null; + } + } + + UUID correlationId = UUID.randomUUID(); + if (event.getEvent() != null) event.getEvent().setTrackingContextTag(correlationId.toString()); + event.setMessage(this.jsonHandlingService.toJsonSafe(event.getEvent())); + //this._logTrackingService.Trace(correlationId.ToString(), $"Correlating current tracking context with new correlationId {correlationId}"); + + QueueOutboxEntity queueMessage = new QueueOutboxEntity(); + queueMessage.setId(UUID.randomUUID()); + queueMessage.setTenantId(null); + queueMessage.setExchange(this.outboxProperties.getExchange()); + queueMessage.setRoute(routingKey); + queueMessage.setMessageId(event.getMessageId()); + queueMessage.setMessage(this.jsonHandlingService.toJsonSafe(event)); + queueMessage.setIsActive(IsActive.Active); + queueMessage.setNotifyStatus(QueueOutboxNotifyStatus.PENDING); + queueMessage.setRetryCount(0); + queueMessage.setCreatedAt(Instant.now()); + queueMessage.setUpdatedAt(Instant.now()); + + return queueMessage; + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxService.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxService.java new file mode 100644 index 000000000..ba72e08a9 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxService.java @@ -0,0 +1,5 @@ +package eu.eudat.integrationevent.outbox; + +public interface OutboxService { + void publish(OutboxIntegrationEvent event); +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxServiceImpl.java b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxServiceImpl.java new file mode 100644 index 000000000..3d1a5b7be --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/integrationevent/outbox/OutboxServiceImpl.java @@ -0,0 +1,43 @@ +package eu.eudat.integrationevent.outbox; + + +import eu.eudat.commons.JsonHandlingService; +import gr.cite.tools.logging.LoggerService; +import gr.cite.tools.logging.MapLogEntry; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.stereotype.Component; +import org.springframework.web.context.annotation.RequestScope; + +@Component +@RequestScope +public class OutboxServiceImpl implements OutboxService { + private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(OutboxServiceImpl.class)); + + private final OutboxProperties config; + private final JsonHandlingService jsonHandlingService; + private final ApplicationEventPublisher eventPublisher; + + public OutboxServiceImpl( + OutboxProperties config, + JsonHandlingService jsonHandlingService, + ApplicationEventPublisher eventPublisher + ) { + this.config = config; + this.jsonHandlingService = jsonHandlingService; + this.eventPublisher = eventPublisher; + } + + @Override + public void publish(OutboxIntegrationEvent event) { + try { + eventPublisher.publishEvent(event); + return; + } catch (Exception ex) { + logger.error(new MapLogEntry(String.format("Could not save message ", event.getMessage())).And("message", event.getMessage()).And("ex", ex)); + //Still want to skip it from processing + return; + } + + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/query/QueueInboxQuery.java b/dmp-backend/core/src/main/java/eu/eudat/query/QueueInboxQuery.java new file mode 100644 index 000000000..9d0877822 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/query/QueueInboxQuery.java @@ -0,0 +1,219 @@ +package eu.eudat.query; + + +import eu.eudat.commons.enums.IsActive; +import eu.eudat.data.QueueInboxEntity; +import gr.cite.queueinbox.entity.QueueInboxStatus; +import gr.cite.tools.data.query.FieldResolver; +import gr.cite.tools.data.query.Ordering; +import gr.cite.tools.data.query.QueryBase; +import gr.cite.tools.data.query.QueryContext; +import jakarta.persistence.Tuple; +import jakarta.persistence.criteria.CriteriaBuilder; +import jakarta.persistence.criteria.Predicate; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import java.time.Instant; +import java.util.*; + +@Component +@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class QueueInboxQuery extends QueryBase { + + private Collection ids; + private Instant createdAfter; + private Collection isActives; + private Collection exchanges; + private Collection routes; + private Collection status; + private Integer retryThreshold; + + public QueueInboxQuery ids(UUID value) { + this.ids = List.of(value); + return this; + } + + public QueueInboxQuery ids(UUID... value) { + this.ids = Arrays.asList(value); + return this; + } + + public QueueInboxQuery ids(List value) { + this.ids = value; + return this; + } + + public QueueInboxQuery isActives(IsActive value) { + this.isActives = List.of(value); + return this; + } + + public QueueInboxQuery isActives(IsActive... value) { + this.isActives = Arrays.asList(value); + return this; + } + + public QueueInboxQuery isActives(List value) { + this.isActives = value; + return this; + } + + public QueueInboxQuery exchanges(String value) { + this.exchanges = List.of(value); + return this; + } + + public QueueInboxQuery exchanges(String... value) { + this.exchanges = Arrays.asList(value); + return this; + } + + public QueueInboxQuery exchanges(List value) { + this.exchanges = value; + return this; + } + + public QueueInboxQuery routes(String value) { + this.routes = List.of(value); + return this; + } + + public QueueInboxQuery routes(String... value) { + this.routes = Arrays.asList(value); + return this; + } + + public QueueInboxQuery routes(List value) { + this.routes = value; + return this; + } + + public QueueInboxQuery status(QueueInboxStatus value) { + this.status = List.of(value); + return this; + } + + public QueueInboxQuery status(QueueInboxStatus... value) { + this.status = Arrays.asList(value); + return this; + } + + public QueueInboxQuery status(List value) { + this.status = value; + return this; + } + + public QueueInboxQuery createdAfter(Instant value) { + this.createdAfter = value; + return this; + } + + public QueueInboxQuery retryThreshold(Integer value) { + this.retryThreshold = value; + return this; + } + + public QueueInboxQuery ordering(Ordering ordering) { + this.setOrder(ordering); + return this; + } + + @Override + protected Class entityClass() { + return QueueInboxEntity.class; + } + + @Override + protected Boolean isFalseQuery() { + return this.isEmpty(this.ids) || this.isEmpty(this.isActives) || this.isEmpty(this.exchanges) + || this.isEmpty(this.routes) || this.isEmpty(this.status); + } + + @Override + protected Predicate applyFilters(QueryContext queryContext) { + List predicates = new ArrayList<>(); + if (this.ids != null) { + CriteriaBuilder.In inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueInboxEntity._id)); + for (UUID item : this.ids) inClause.value(item); + predicates.add(inClause); + } + + if (this.createdAfter != null) { + predicates.add(queryContext.CriteriaBuilder.greaterThan(queryContext.Root.get(QueueInboxEntity._createdAt), this.createdAfter)); + } + if (this.isActives != null) { + CriteriaBuilder.In inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueInboxEntity._isActive)); + for (IsActive item : this.isActives) inClause.value(item); + predicates.add(inClause); + } + if (this.exchanges != null) { + CriteriaBuilder.In inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueInboxEntity._exchange)); + for (String item : this.exchanges) inClause.value(item); + predicates.add(inClause); + } + if (this.routes != null) { + CriteriaBuilder.In inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueInboxEntity._route)); + for (String item : this.routes) inClause.value(item); + predicates.add(inClause); + } + + if (this.status != null) { + CriteriaBuilder.In inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueInboxEntity._status)); + for (QueueInboxStatus item : this.status) inClause.value(item); + predicates.add(inClause); + } + + if (this.retryThreshold != null) { + predicates.add(queryContext.CriteriaBuilder.or(queryContext.CriteriaBuilder.isNull(queryContext.Root.get(QueueInboxEntity._retryCount)), + queryContext.CriteriaBuilder.lessThanOrEqualTo(queryContext.Root.get(QueueInboxEntity._retryCount), this.retryThreshold))); + } + + if (predicates.size() > 0) { + Predicate[] predicatesArray = predicates.toArray(new Predicate[0]); + return queryContext.CriteriaBuilder.and(predicatesArray); + } else { + return null; + } + } + + @Override + protected QueueInboxEntity convert(Tuple tuple, Set columns) { + QueueInboxEntity item = new QueueInboxEntity(); + item.setId(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._id, UUID.class)); + item.setExchange(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._exchange, String.class)); + item.setTenantId(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._tenantId, UUID.class)); + item.setRoute(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._route, String.class)); + item.setMessage(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._message, String.class)); + item.setMessageId(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._messageId, UUID.class)); + item.setCreatedAt(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._createdAt, Instant.class)); + item.setIsActive(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._isActive, IsActive.class)); + item.setStatus(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._status, QueueInboxStatus.class)); + item.setRetryCount(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._retryCount, Integer.class)); + item.setQueue(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._queue, String.class)); + item.setApplicationId(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._applicationId, String.class)); + item.setCreatedAt(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._createdAt, Instant.class)); + item.setUpdatedAt(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._updatedAt, Instant.class)); + return item; + } + + @Override + protected String fieldNameOf(FieldResolver item) { + if (item.match(QueueInboxEntity._id)) return QueueInboxEntity._id; + else if (item.match(QueueInboxEntity._exchange)) return QueueInboxEntity._exchange; + else if (item.match(QueueInboxEntity._tenantId)) return QueueInboxEntity._tenantId; + else if (item.match(QueueInboxEntity._route)) return QueueInboxEntity._route; + else if (item.match(QueueInboxEntity._message)) return QueueInboxEntity._message; + else if (item.match(QueueInboxEntity._messageId)) return QueueInboxEntity._messageId; + else if (item.match(QueueInboxEntity._createdAt)) return QueueInboxEntity._createdAt; + else if (item.match(QueueInboxEntity._isActive)) return QueueInboxEntity._isActive; + else if (item.match(QueueInboxEntity._status)) return QueueInboxEntity._status; + else if (item.match(QueueInboxEntity._retryCount)) return QueueInboxEntity._retryCount; + else if (item.match(QueueInboxEntity._queue)) return QueueInboxEntity._queue; + else if (item.match(QueueInboxEntity._applicationId)) return QueueInboxEntity._applicationId; + else if (item.match(QueueInboxEntity._createdAt)) return QueueInboxEntity._createdAt; + else if (item.match(QueueInboxEntity._updatedAt)) return QueueInboxEntity._updatedAt; + else return null; + } +} diff --git a/dmp-backend/core/src/main/java/eu/eudat/query/QueueOutboxQuery.java b/dmp-backend/core/src/main/java/eu/eudat/query/QueueOutboxQuery.java new file mode 100644 index 000000000..05c5d9c45 --- /dev/null +++ b/dmp-backend/core/src/main/java/eu/eudat/query/QueueOutboxQuery.java @@ -0,0 +1,236 @@ +package eu.eudat.query; + + +import eu.eudat.commons.enums.IsActive; +import eu.eudat.data.QueueOutboxEntity; +import gr.cite.queueoutbox.entity.QueueOutboxNotifyStatus; +import gr.cite.tools.data.query.FieldResolver; +import gr.cite.tools.data.query.Ordering; +import gr.cite.tools.data.query.QueryBase; +import gr.cite.tools.data.query.QueryContext; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import jakarta.persistence.Tuple; +import jakarta.persistence.criteria.CriteriaBuilder; +import jakarta.persistence.criteria.Predicate; +import java.time.Instant; +import java.util.*; + +@Component +@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class QueueOutboxQuery extends QueryBase { + + private Collection ids; + private Instant createdAfter; + private Collection isActives; + private Collection exchanges; + private Collection routes; + private Collection notifyStatus; + private Integer retryThreshold; + private Integer confirmTimeout; + + public QueueOutboxQuery ids(UUID value) { + this.ids = List.of(value); + return this; + } + + public QueueOutboxQuery ids(UUID... value) { + this.ids = Arrays.asList(value); + return this; + } + + public QueueOutboxQuery ids(List value) { + this.ids = value; + return this; + } + + public QueueOutboxQuery isActives(IsActive value) { + this.isActives = List.of(value); + return this; + } + + public QueueOutboxQuery isActives(IsActive... value) { + this.isActives = Arrays.asList(value); + return this; + } + + public QueueOutboxQuery isActives(List value) { + this.isActives = value; + return this; + } + + public QueueOutboxQuery exchanges(String value) { + this.exchanges = List.of(value); + return this; + } + + public QueueOutboxQuery exchanges(String... value) { + this.exchanges = Arrays.asList(value); + return this; + } + + public QueueOutboxQuery exchanges(List value) { + this.exchanges = value; + return this; + } + + public QueueOutboxQuery routes(String value) { + this.routes = List.of(value); + return this; + } + + public QueueOutboxQuery routes(String... value) { + this.routes = Arrays.asList(value); + return this; + } + + public QueueOutboxQuery routes(List value) { + this.routes = value; + return this; + } + + public QueueOutboxQuery notifyStatus(QueueOutboxNotifyStatus value) { + this.notifyStatus = List.of(value); + return this; + } + + public QueueOutboxQuery notifyStatus(QueueOutboxNotifyStatus... value) { + this.notifyStatus = Arrays.asList(value); + return this; + } + + public QueueOutboxQuery notifyStatus(List value) { + this.notifyStatus = value; + return this; + } + + public QueueOutboxQuery createdAfter(Instant value) { + this.createdAfter = value; + return this; + } + + public QueueOutboxQuery retryThreshold(Integer value) { + this.retryThreshold = value; + return this; + } + + public QueueOutboxQuery confirmTimeout(Integer value) { + this.confirmTimeout = value; + return this; + } + + public QueueOutboxQuery ordering(Ordering ordering) { + this.setOrder(ordering); + return this; + } + + @Override + protected Class entityClass() { + return QueueOutboxEntity.class; + } + + @Override + protected Boolean isFalseQuery() { + return this.isEmpty(this.ids) || this.isEmpty(this.isActives) || this.isEmpty(this.exchanges) + || this.isEmpty(this.routes) || this.isEmpty(this.notifyStatus); + } + + @Override + protected Predicate applyFilters(QueryContext queryContext) { + List predicates = new ArrayList<>(); + if (this.ids != null) { + CriteriaBuilder.In inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueOutboxEntity._id)); + for (UUID item : this.ids) inClause.value(item); + predicates.add(inClause); + } + + if (this.createdAfter != null) { + predicates.add(queryContext.CriteriaBuilder.greaterThan(queryContext.Root.get(QueueOutboxEntity._createdAt), this.createdAfter)); + } + if (this.isActives != null) { + CriteriaBuilder.In inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueOutboxEntity._isActive)); + for (IsActive item : this.isActives) inClause.value(item); + predicates.add(inClause); + } + if (this.exchanges != null) { + CriteriaBuilder.In inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueOutboxEntity._exchange)); + for (String item : this.exchanges) inClause.value(item); + predicates.add(inClause); + } + if (this.routes != null) { + CriteriaBuilder.In inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueOutboxEntity._route)); + for (String item : this.routes) inClause.value(item); + predicates.add(inClause); + } + + if (this.notifyStatus != null) { + CriteriaBuilder.In inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueOutboxEntity._notifyStatus)); + for (QueueOutboxNotifyStatus item : this.notifyStatus) inClause.value(item); + predicates.add(inClause); + } + + if (this.retryThreshold != null) { + predicates.add(queryContext.CriteriaBuilder.or(queryContext.CriteriaBuilder.isNull(queryContext.Root.get(QueueOutboxEntity._retryCount)), + queryContext.CriteriaBuilder.lessThanOrEqualTo(queryContext.Root.get(QueueOutboxEntity._retryCount), this.retryThreshold))); + } + + if (this.confirmTimeout != null) { + predicates.add(queryContext.CriteriaBuilder.or(queryContext.CriteriaBuilder.isNull(queryContext.Root.get(QueueOutboxEntity._publishedAt)), + queryContext.CriteriaBuilder.and( + queryContext.CriteriaBuilder.isNotNull(queryContext.Root.get(QueueOutboxEntity._publishedAt)), + queryContext.CriteriaBuilder.isNull(queryContext.Root.get(QueueOutboxEntity._confirmedAt)), + queryContext.CriteriaBuilder.lessThan(queryContext.Root.get(QueueOutboxEntity._publishedAt), Instant.now().minusSeconds(this.confirmTimeout)) + ) + )); + } + + + if (predicates.size() > 0) { + Predicate[] predicatesArray = predicates.toArray(new Predicate[0]); + return queryContext.CriteriaBuilder.and(predicatesArray); + } else { + return null; + } + } + + @Override + protected QueueOutboxEntity convert(Tuple tuple, Set columns) { + QueueOutboxEntity item = new QueueOutboxEntity(); + item.setId(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._id, UUID.class)); + item.setExchange(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._exchange, String.class)); + item.setTenantId(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._tenantId, UUID.class)); + item.setRoute(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._route, String.class)); + item.setMessage(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._message, String.class)); + item.setMessageId(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._messageId, UUID.class)); + item.setCreatedAt(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._createdAt, Instant.class)); + item.setIsActive(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._isActive, IsActive.class)); + item.setNotifyStatus(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._notifyStatus, QueueOutboxNotifyStatus.class)); + item.setRetryCount(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._retryCount, Integer.class)); + item.setPublishedAt(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._publishedAt, Instant.class)); + item.setConfirmedAt(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._confirmedAt, Instant.class)); + item.setCreatedAt(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._createdAt, Instant.class)); + item.setUpdatedAt(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._updatedAt, Instant.class)); + return item; + } + + @Override + protected String fieldNameOf(FieldResolver item) { + if (item.match(QueueOutboxEntity._id)) return QueueOutboxEntity._id; + else if (item.match(QueueOutboxEntity._exchange)) return QueueOutboxEntity._exchange; + else if (item.match(QueueOutboxEntity._tenantId)) return QueueOutboxEntity._tenantId; + else if (item.match(QueueOutboxEntity._route)) return QueueOutboxEntity._route; + else if (item.match(QueueOutboxEntity._message)) return QueueOutboxEntity._message; + else if (item.match(QueueOutboxEntity._messageId)) return QueueOutboxEntity._messageId; + else if (item.match(QueueOutboxEntity._createdAt)) return QueueOutboxEntity._createdAt; + else if (item.match(QueueOutboxEntity._isActive)) return QueueOutboxEntity._isActive; + else if (item.match(QueueOutboxEntity._notifyStatus)) return QueueOutboxEntity._notifyStatus; + else if (item.match(QueueOutboxEntity._retryCount)) return QueueOutboxEntity._retryCount; + else if (item.match(QueueOutboxEntity._publishedAt)) return QueueOutboxEntity._publishedAt; + else if (item.match(QueueOutboxEntity._confirmedAt)) return QueueOutboxEntity._confirmedAt; + else if (item.match(QueueOutboxEntity._createdAt)) return QueueOutboxEntity._createdAt; + else if (item.match(QueueOutboxEntity._updatedAt)) return QueueOutboxEntity._updatedAt; + else return null; + } +} diff --git a/dmp-backend/pom.xml b/dmp-backend/pom.xml index 8861a1a65..8660d01cb 100644 --- a/dmp-backend/pom.xml +++ b/dmp-backend/pom.xml @@ -355,6 +355,23 @@ 1.0.0 + + gr.cite + queue-inbox + 1.0.0 + + + gr.cite + queue-outbox + 1.0.0 + + + + gr.cite + rabbitmq-core + 1.0.0 + + diff --git a/dmp-backend/web/src/main/resources/config/application.yml b/dmp-backend/web/src/main/resources/config/application.yml index 1e0ca9ee9..e0577f211 100644 --- a/dmp-backend/web/src/main/resources/config/application.yml +++ b/dmp-backend/web/src/main/resources/config/application.yml @@ -25,5 +25,6 @@ spring: optional:classpath:config/errors.yml[.yml], optional:classpath:config/errors-${spring.profiles.active}.yml[.yml], optional:file:../config/errors-${spring.profiles.active}.yml[.yml], optional:classpath:config/storage.yml[.yml], optional:classpath:config/storage-${spring.profiles.active}.yml[.yml], optional:file:../config/storage-${spring.profiles.active}.yml[.yml], optional:classpath:config/reference-type.yml[.yml], optional:classpath:config/reference-type-${spring.profiles.active}.yml[.yml], optional:file:../config/reference-type-${spring.profiles.active}.yml[.yml], - optional:classpath:config/tenant.yml[.yml], optional:classpath:config/tenant-${spring.profiles.active}.yml[.yml], optional:file:../config/tenant-${spring.profiles.active}.yml[.yml] + optional:classpath:config/tenant.yml[.yml], optional:classpath:config/tenant-${spring.profiles.active}.yml[.yml], optional:file:../config/tenant-${spring.profiles.active}.yml[.yml], + optional:classpath:config/queue.yml[.yml], optional:classpath:config/queue-${spring.profiles.active}.yml[.yml], optional:file:../config/queue-${spring.profiles.active}.yml[.yml] diff --git a/dmp-backend/web/src/main/resources/config/queue-devel.yml b/dmp-backend/web/src/main/resources/config/queue-devel.yml new file mode 100644 index 000000000..a5a7d9992 --- /dev/null +++ b/dmp-backend/web/src/main/resources/config/queue-devel.yml @@ -0,0 +1,18 @@ +queue: + rabbitmq: + durable: true + queue: cite_dmp_devel_web_inbox_queue + exchange: cite_dmp_devel_queue + listenerEnabled: true + publisherEnabled: true + task: + publisher: + options: + exchange: cite_dmp_devel_queue + rabbitmq: + enable: true + listener: + options: + exchange: cite_dmp_devel_queue + rabbitmq: + enable: true diff --git a/dmp-backend/web/src/main/resources/config/queue.yml b/dmp-backend/web/src/main/resources/config/queue.yml new file mode 100644 index 000000000..0f04da973 --- /dev/null +++ b/dmp-backend/web/src/main/resources/config/queue.yml @@ -0,0 +1,58 @@ +spring: + rabbitmq: + host: ${RABBIT_HOST} + port: ${RABBIT_PORT} + username: ${RABBIT_USER} + password: ${RABBIT_PASS} + ssl: + enabled: false +queue: + rabbitmq: + enable: true + app-id: ${THE_API_ID} + durable: null + queue: null + exchange: null + listenerEnabled: true + publisherEnabled: true + #TODO + connection-recovery: + enable: true + network-recovery-interval: 5000 + unreachable-recovery-interval: 5000 + task: + publisher: + enable: true + options: + exchange: null + forget-me-completed-topic: forgetme.completed + notify-topic: notification.notify + tenant-reactivation-topic: tenant.reactivated + tenant-removal-topic: tenant.remove + tenant-touch-topic: tenant.touch + tenant-user-invite-topic: tenant.invite + what-you-know-about-me-completed-topic: whatyouknowaboutme.completed + generate-file-topic: generate.file + rabbitmq: + enable: true + interval-seconds: 30 + options: + retry-threashold: 100 + retry-delay-step-seconds: 300 + max-retry-delay-seconds: 10800 + too-old-to-send-seconds: 604800 + confirm-timeout-seconds: 30 + listener: + enable: true + options: + exchange: null + user-removal-topic: [ "user.remove" ] + user-touched-topic: [ "user.touch" ] + rabbitmq: + enable: true + interval-seconds: 30 + options: + retry-threashold: 100 + retry-delay-step-seconds: 300 + max-retry-delay-seconds: 10800 + too-old-to-send-seconds: 604800 \ No newline at end of file