add inbox, outbox
This commit is contained in:
parent
2a00fe7ec1
commit
f6100a8ea2
|
@ -0,0 +1,177 @@
|
||||||
|
package eu.eudat.data;
|
||||||
|
|
||||||
|
import eu.eudat.commons.enums.IsActive;
|
||||||
|
import eu.eudat.data.converters.enums.IsActiveConverter;
|
||||||
|
import gr.cite.queueinbox.entity.QueueInbox;
|
||||||
|
import gr.cite.queueinbox.entity.QueueInboxStatus;
|
||||||
|
import jakarta.persistence.*;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
@Entity
|
||||||
|
@Table(name = "\"QueueInbox\"")
|
||||||
|
public class QueueInboxEntity implements QueueInbox {
|
||||||
|
@Id
|
||||||
|
@Column(name = "\"id\"", columnDefinition = "uuid", updatable = false, nullable = false)
|
||||||
|
private UUID id;
|
||||||
|
public final static String _id = "id";
|
||||||
|
|
||||||
|
@Column(name = "\"queue\"", nullable = false, length = 50)
|
||||||
|
private String queue;
|
||||||
|
public final static String _queue = "queue";
|
||||||
|
|
||||||
|
@Column(name = "\"exchange\"", nullable = false, length = 50)
|
||||||
|
private String exchange;
|
||||||
|
public final static String _exchange = "exchange";
|
||||||
|
|
||||||
|
@Column(name = "\"route\"", nullable = false, length = 50)
|
||||||
|
private String route;
|
||||||
|
public final static String _route = "route";
|
||||||
|
|
||||||
|
@Column(name = "\"application_id\"", nullable = false, length = 100)
|
||||||
|
private String applicationId;
|
||||||
|
public final static String _applicationId = "applicationId";
|
||||||
|
|
||||||
|
@Column(name = "\"message_id\"", columnDefinition = "uuid", nullable = false)
|
||||||
|
private UUID messageId;
|
||||||
|
public final static String _messageId = "messageId";
|
||||||
|
|
||||||
|
@Column(name = "\"message\"", columnDefinition = "json", nullable = false)
|
||||||
|
private String message;
|
||||||
|
public final static String _message = "message";
|
||||||
|
|
||||||
|
@Column(name = "\"retry_count\"", nullable = true)
|
||||||
|
private Integer retryCount;
|
||||||
|
public final static String _retryCount = "retryCount";
|
||||||
|
|
||||||
|
@Column(name = "\"tenant\"", columnDefinition = "uuid", nullable = true)
|
||||||
|
private UUID tenantId;
|
||||||
|
public final static String _tenantId = "tenantId";
|
||||||
|
|
||||||
|
@Column(name = "\"is_active\"", length = 20, nullable = false)
|
||||||
|
@Convert(converter = IsActiveConverter.class)
|
||||||
|
private IsActive isActive;
|
||||||
|
public final static String _isActive = "isActive";
|
||||||
|
|
||||||
|
//TODO: as integer
|
||||||
|
@Column(name = "\"status\"", length = 50, nullable = false)
|
||||||
|
@Enumerated(EnumType.STRING)
|
||||||
|
private QueueInboxStatus status;
|
||||||
|
public final static String _status = "status";
|
||||||
|
|
||||||
|
@Column(name = "\"created_at\"", nullable = false)
|
||||||
|
private Instant createdAt;
|
||||||
|
public final static String _createdAt = "createdAt";
|
||||||
|
|
||||||
|
@Column(name = "\"updated_at\"", nullable = false)
|
||||||
|
@Version
|
||||||
|
private Instant updatedAt;
|
||||||
|
public final static String _updatedAt = "updatedAt";
|
||||||
|
|
||||||
|
public UUID getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setId(UUID id) {
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getExchange() {
|
||||||
|
return exchange;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setExchange(String exchange) {
|
||||||
|
this.exchange = exchange;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRoute() {
|
||||||
|
return route;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRoute(String route) {
|
||||||
|
this.route = route;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UUID getMessageId() {
|
||||||
|
return messageId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMessageId(UUID messageId) {
|
||||||
|
this.messageId = messageId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMessage() {
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMessage(String message) {
|
||||||
|
this.message = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UUID getTenantId() {
|
||||||
|
return tenantId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTenantId(UUID tenantId) {
|
||||||
|
this.tenantId = tenantId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IsActive getIsActive() {
|
||||||
|
return isActive;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIsActive(IsActive isActive) {
|
||||||
|
this.isActive = isActive;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Instant getCreatedAt() {
|
||||||
|
return createdAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCreatedAt(Instant createdAt) {
|
||||||
|
this.createdAt = createdAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Instant getUpdatedAt() {
|
||||||
|
return updatedAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUpdatedAt(Instant updatedAt) {
|
||||||
|
this.updatedAt = updatedAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getQueue() {
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setQueue(String queue) {
|
||||||
|
this.queue = queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getApplicationId() {
|
||||||
|
return applicationId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setApplicationId(String applicationId) {
|
||||||
|
this.applicationId = applicationId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer getRetryCount() {
|
||||||
|
return retryCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRetryCount(Integer retryCount) {
|
||||||
|
this.retryCount = retryCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxStatus getStatus() {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setStatus(QueueInboxStatus status) {
|
||||||
|
this.status = status;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,175 @@
|
||||||
|
package eu.eudat.data;
|
||||||
|
|
||||||
|
import eu.eudat.commons.enums.IsActive;
|
||||||
|
import eu.eudat.data.converters.enums.IsActiveConverter;
|
||||||
|
import gr.cite.queueoutbox.entity.QueueOutbox;
|
||||||
|
import gr.cite.queueoutbox.entity.QueueOutboxNotifyStatus;
|
||||||
|
|
||||||
|
import jakarta.persistence.*;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
@Entity
|
||||||
|
@Table(name = "\"QueueOutbox\"")
|
||||||
|
public class QueueOutboxEntity implements QueueOutbox {
|
||||||
|
@Id
|
||||||
|
@Column(name = "\"id\"", columnDefinition = "uuid", updatable = false, nullable = false)
|
||||||
|
private UUID id;
|
||||||
|
public final static String _id = "id";
|
||||||
|
|
||||||
|
@Column(name = "\"exchange\"", nullable = false, length = 50)
|
||||||
|
private String exchange;
|
||||||
|
public final static String _exchange = "exchange";
|
||||||
|
|
||||||
|
@Column(name = "\"route\"", length = 50)
|
||||||
|
private String route;
|
||||||
|
public final static String _route = "route";
|
||||||
|
|
||||||
|
@Column(name = "\"message_id\"", columnDefinition = "uuid", nullable = false)
|
||||||
|
private UUID messageId;
|
||||||
|
public final static String _messageId = "messageId";
|
||||||
|
|
||||||
|
@Column(name = "\"message\"", columnDefinition = "json", nullable = false)
|
||||||
|
private String message;
|
||||||
|
public final static String _message = "message";
|
||||||
|
|
||||||
|
//TODO: as integer
|
||||||
|
@Column(name = "\"notify_status\"", length = 20, nullable = false)
|
||||||
|
@Enumerated(EnumType.STRING)
|
||||||
|
private QueueOutboxNotifyStatus notifyStatus;
|
||||||
|
public final static String _notifyStatus = "notifyStatus";
|
||||||
|
|
||||||
|
@Column(name = "\"retry_count\"", nullable = false)
|
||||||
|
private int retryCount;
|
||||||
|
public final static String _retryCount = "retryCount";
|
||||||
|
|
||||||
|
@Column(name = "\"published_at\"", nullable = true)
|
||||||
|
private Instant publishedAt;
|
||||||
|
public final static String _publishedAt = "publishedAt";
|
||||||
|
|
||||||
|
@Column(name = "\"confirmed_at\"", nullable = true)
|
||||||
|
private Instant confirmedAt;
|
||||||
|
public final static String _confirmedAt = "confirmedAt";
|
||||||
|
|
||||||
|
@Column(name = "\"tenant\"", columnDefinition = "uuid", nullable = true)
|
||||||
|
private UUID tenantId;
|
||||||
|
public final static String _tenantId = "tenantId";
|
||||||
|
|
||||||
|
@Column(name = "\"is_active\"", length = 20, nullable = false)
|
||||||
|
@Convert(converter = IsActiveConverter.class)
|
||||||
|
private IsActive isActive;
|
||||||
|
public final static String _isActive = "isActive";
|
||||||
|
|
||||||
|
@Column(name = "\"created_at\"", nullable = false)
|
||||||
|
private Instant createdAt;
|
||||||
|
public final static String _createdAt = "createdAt";
|
||||||
|
|
||||||
|
@Column(name = "\"updated_at\"", nullable = false)
|
||||||
|
private Instant updatedAt;
|
||||||
|
public final static String _updatedAt = "updatedAt";
|
||||||
|
|
||||||
|
public UUID getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setId(UUID id) {
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getExchange() {
|
||||||
|
return exchange;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setExchange(String exchange) {
|
||||||
|
this.exchange = exchange;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRoute() {
|
||||||
|
return route;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRoute(String route) {
|
||||||
|
this.route = route;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UUID getMessageId() {
|
||||||
|
return messageId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMessageId(UUID messageId) {
|
||||||
|
this.messageId = messageId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMessage() {
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMessage(String message) {
|
||||||
|
this.message = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxNotifyStatus getNotifyStatus() {
|
||||||
|
return notifyStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setNotifyStatus(QueueOutboxNotifyStatus notifyStatus) {
|
||||||
|
this.notifyStatus = notifyStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getRetryCount() {
|
||||||
|
return retryCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRetryCount(Integer retryCount) {
|
||||||
|
this.retryCount = retryCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Instant getPublishedAt() {
|
||||||
|
return publishedAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPublishedAt(Instant publishedAt) {
|
||||||
|
this.publishedAt = publishedAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Instant getConfirmedAt() {
|
||||||
|
return confirmedAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setConfirmedAt(Instant confirmedAt) {
|
||||||
|
this.confirmedAt = confirmedAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UUID getTenantId() {
|
||||||
|
return tenantId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTenantId(UUID tenantId) {
|
||||||
|
this.tenantId = tenantId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IsActive getIsActive() {
|
||||||
|
return isActive;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIsActive(IsActive isActive) {
|
||||||
|
this.isActive = isActive;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Instant getCreatedAt() {
|
||||||
|
return createdAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCreatedAt(Instant createdAt) {
|
||||||
|
this.createdAt = createdAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Instant getUpdatedAt() {
|
||||||
|
return updatedAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUpdatedAt(Instant updatedAt) {
|
||||||
|
this.updatedAt = updatedAt;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
package eu.eudat.integrationevent;
|
||||||
|
|
||||||
|
import eu.eudat.integrationevent.inbox.InboxProperties;
|
||||||
|
import eu.eudat.integrationevent.outbox.OutboxProperties;
|
||||||
|
import gr.cite.queueinbox.repository.InboxRepository;
|
||||||
|
import gr.cite.rabbitmq.RabbitConfigurer;
|
||||||
|
import gr.cite.rabbitmq.consumer.InboxBindings;
|
||||||
|
import gr.cite.rabbitmq.consumer.InboxCreator;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@EnableConfigurationProperties({OutboxProperties.class, InboxProperties.class})
|
||||||
|
@ConditionalOnProperty(prefix = "queue.rabbitmq", name = "listenerEnabled")
|
||||||
|
public class AppRabbitConfigurer extends RabbitConfigurer {
|
||||||
|
private ApplicationContext applicationContext;
|
||||||
|
private InboxProperties inboxProperties;
|
||||||
|
|
||||||
|
public AppRabbitConfigurer(ApplicationContext applicationContext, InboxProperties inboxProperties) {
|
||||||
|
this.applicationContext = applicationContext;
|
||||||
|
this.inboxProperties = inboxProperties;
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Bean
|
||||||
|
public InboxBindings inboxBindingsCreator() {
|
||||||
|
List<String> bindingItems = new ArrayList<>();
|
||||||
|
bindingItems.addAll(this.inboxProperties.getUserRemovalTopic());
|
||||||
|
bindingItems.addAll(this.inboxProperties.getUserTouchedTopic());
|
||||||
|
|
||||||
|
return new InboxBindings(bindingItems);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// @Bean
|
||||||
|
public InboxCreator inboxCreator() {
|
||||||
|
return (params) -> {
|
||||||
|
InboxRepository inboxRepository = this.applicationContext.getBean(InboxRepository.class);
|
||||||
|
return inboxRepository.create(params) != null;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,30 @@
|
||||||
|
package eu.eudat.integrationevent;
|
||||||
|
|
||||||
|
import eu.eudat.integrationevent.inbox.InboxProperties;
|
||||||
|
import eu.eudat.integrationevent.inbox.InboxRepositoryImpl;
|
||||||
|
import gr.cite.queueinbox.InboxConfigurer;
|
||||||
|
import gr.cite.queueinbox.repository.InboxRepository;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@EnableConfigurationProperties({InboxProperties.class})
|
||||||
|
@ConditionalOnProperty(prefix = "queue.task.listener", name = "enable", matchIfMissing = false)
|
||||||
|
public class InboxIntegrationEventConfigurer extends InboxConfigurer {
|
||||||
|
private ApplicationContext applicationContext;
|
||||||
|
private InboxProperties inboxProperties;
|
||||||
|
|
||||||
|
public InboxIntegrationEventConfigurer(ApplicationContext applicationContext, InboxProperties inboxProperties) {
|
||||||
|
this.applicationContext = applicationContext;
|
||||||
|
this.inboxProperties = inboxProperties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public InboxRepository inboxRepositoryCreator() {
|
||||||
|
return new InboxRepositoryImpl(this.applicationContext, this.inboxProperties);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
package eu.eudat.integrationevent;
|
||||||
|
|
||||||
|
import gr.cite.rabbitmq.IntegrationEventContext;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
public class IntegrationEventContextImpl implements IntegrationEventContext {
|
||||||
|
private UUID tenant;
|
||||||
|
|
||||||
|
public IntegrationEventContextImpl() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public UUID getTenant() {
|
||||||
|
return tenant;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTenant(UUID tenant) {
|
||||||
|
this.tenant = tenant;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,69 @@
|
||||||
|
package eu.eudat.integrationevent;
|
||||||
|
|
||||||
|
|
||||||
|
import eu.eudat.integrationevent.outbox.OutboxProperties;
|
||||||
|
import eu.eudat.integrationevent.outbox.OutboxRepositoryImpl;
|
||||||
|
import gr.cite.queueoutbox.IntegrationEventContextCreator;
|
||||||
|
import gr.cite.queueoutbox.OutboxConfigurer;
|
||||||
|
import gr.cite.queueoutbox.repository.OutboxRepository;
|
||||||
|
import gr.cite.rabbitmq.IntegrationEventMessageConstants;
|
||||||
|
import gr.cite.rabbitmq.RabbitProperties;
|
||||||
|
import gr.cite.rabbitmq.broker.MessageHydrator;
|
||||||
|
import org.springframework.amqp.core.MessageProperties;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.sql.Date;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@EnableConfigurationProperties({OutboxProperties.class})
|
||||||
|
@ConditionalOnProperty(prefix = "queue.task.publisher", name = "enable", matchIfMissing = false)
|
||||||
|
public class OutboxIntegrationEventConfigurer extends OutboxConfigurer {
|
||||||
|
private ApplicationContext applicationContext;
|
||||||
|
private OutboxProperties outboxProperties;
|
||||||
|
|
||||||
|
public OutboxIntegrationEventConfigurer(ApplicationContext applicationContext, OutboxProperties outboxProperties) {
|
||||||
|
this.applicationContext = applicationContext;
|
||||||
|
this.outboxProperties = outboxProperties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public MessageHydrator messageHydrator(RabbitProperties rabbitProperties) {
|
||||||
|
return (message, event, eventContext) -> {
|
||||||
|
MessageProperties messageProperties = message.getMessageProperties();
|
||||||
|
messageProperties.setAppId(rabbitProperties.getAppId());
|
||||||
|
messageProperties.setContentEncoding(StandardCharsets.UTF_8.displayName());
|
||||||
|
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
|
||||||
|
//messageProperties.setUserId(userContext.getCurrentUser().toString());
|
||||||
|
messageProperties.setTimestamp(Date.from(Instant.now()));
|
||||||
|
messageProperties.setMessageId(event.getMessageId().toString());
|
||||||
|
|
||||||
|
if (eventContext != null) {
|
||||||
|
UUID tenant = ((IntegrationEventContextImpl) eventContext).getTenant();
|
||||||
|
if (tenant != null) {
|
||||||
|
messageProperties.setHeader(IntegrationEventMessageConstants.TENANT, tenant);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return message;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public IntegrationEventContextCreator integrationEventContextCreator() {
|
||||||
|
return (message) -> new IntegrationEventContextImpl();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public OutboxRepository outboxRepositoryCreator() {
|
||||||
|
return new OutboxRepositoryImpl(this.applicationContext, this.outboxProperties);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
package eu.eudat.integrationevent;
|
||||||
|
|
||||||
|
import eu.eudat.integrationevent.outbox.OutboxProperties;
|
||||||
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@EnableConfigurationProperties(OutboxProperties.class)
|
||||||
|
public class OutboxPropertiesConfiguration {
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
package eu.eudat.integrationevent;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||||
|
|
||||||
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
|
public class TrackedEvent {
|
||||||
|
public String trackingContextTag;
|
||||||
|
|
||||||
|
public String getTrackingContextTag() {
|
||||||
|
return trackingContextTag;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTrackingContextTag(String trackingContextTag) {
|
||||||
|
this.trackingContextTag = trackingContextTag;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,5 @@
|
||||||
|
package eu.eudat.integrationevent.inbox;
|
||||||
|
|
||||||
|
public interface ConsistencyHandler<T extends ConsistencyPredicates> {
|
||||||
|
Boolean isConsistent(T consistencyPredicates);
|
||||||
|
}
|
|
@ -0,0 +1,4 @@
|
||||||
|
package eu.eudat.integrationevent.inbox;
|
||||||
|
|
||||||
|
public interface ConsistencyPredicates {
|
||||||
|
}
|
|
@ -0,0 +1,8 @@
|
||||||
|
package eu.eudat.integrationevent.inbox;
|
||||||
|
|
||||||
|
public enum EventProcessingStatus {
|
||||||
|
Error,
|
||||||
|
Success,
|
||||||
|
Postponed,
|
||||||
|
Discard
|
||||||
|
}
|
|
@ -0,0 +1,56 @@
|
||||||
|
package eu.eudat.integrationevent.inbox;
|
||||||
|
|
||||||
|
import gr.cite.commons.web.oidc.principal.MyPrincipal;
|
||||||
|
import org.springframework.security.oauth2.core.ClaimAccessor;
|
||||||
|
import org.springframework.security.oauth2.jwt.JwtClaimNames;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.temporal.ChronoUnit;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class InboxPrincipal implements MyPrincipal, ClaimAccessor {
|
||||||
|
private Map<String, Object> claims;
|
||||||
|
private boolean isAuthenticated;
|
||||||
|
|
||||||
|
public InboxPrincipal(Boolean isAuthenticated, String name) {
|
||||||
|
this.claims = new HashMap<>();
|
||||||
|
this.put(JwtClaimNames.SUB, name);
|
||||||
|
this.isAuthenticated = isAuthenticated;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean isAuthenticated() {
|
||||||
|
return this.isAuthenticated;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Object> getClaims() {
|
||||||
|
return this.claims;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> getClaimAsStringList(String claim) {
|
||||||
|
if (claims == null) return null;
|
||||||
|
return this.getClaimAsStringList(claim);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return this.getClaimAsString(JwtClaimNames.SUB);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void put(String key, Object value) {
|
||||||
|
this.claims.put(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static InboxPrincipal build(IntegrationEventProperties properties) {
|
||||||
|
InboxPrincipal inboxPrincipal = new InboxPrincipal(true, "IntegrationEventQueueAppId");
|
||||||
|
inboxPrincipal.put("client_id", properties.getAppId());
|
||||||
|
inboxPrincipal.put("active", "true");
|
||||||
|
inboxPrincipal.put("nbf", Instant.now().minus(30, ChronoUnit.SECONDS).toString());
|
||||||
|
inboxPrincipal.put("exp", Instant.now().plus(10, ChronoUnit.MINUTES).toString());
|
||||||
|
return inboxPrincipal;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,42 @@
|
||||||
|
package eu.eudat.integrationevent.inbox;
|
||||||
|
|
||||||
|
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
//import org.springframework.boot.context.properties.ConstructorBinding;
|
||||||
|
import org.springframework.validation.annotation.Validated;
|
||||||
|
|
||||||
|
import jakarta.validation.constraints.NotNull;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Validated
|
||||||
|
@ConfigurationProperties(prefix = "queue.task.listener.options")
|
||||||
|
//@ConstructorBinding
|
||||||
|
public class InboxProperties {
|
||||||
|
@NotNull
|
||||||
|
private final String exchange;
|
||||||
|
@NotNull
|
||||||
|
private final List<String> userRemovalTopic;
|
||||||
|
@NotNull
|
||||||
|
private final List<String> userTouchedTopic;
|
||||||
|
|
||||||
|
public InboxProperties(
|
||||||
|
String exchange,
|
||||||
|
List<String> userRemovalTopic,
|
||||||
|
List<String> userTouchedTopic) {
|
||||||
|
this.exchange = exchange;
|
||||||
|
this.userRemovalTopic = userRemovalTopic;
|
||||||
|
this.userTouchedTopic = userTouchedTopic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getUserRemovalTopic() {
|
||||||
|
return userRemovalTopic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getUserTouchedTopic() {
|
||||||
|
return userTouchedTopic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getExchange() {
|
||||||
|
return exchange;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,338 @@
|
||||||
|
package eu.eudat.integrationevent.inbox;
|
||||||
|
|
||||||
|
|
||||||
|
import eu.eudat.commons.JsonHandlingService;
|
||||||
|
import eu.eudat.commons.enums.IsActive;
|
||||||
|
import eu.eudat.commons.fake.FakeRequestScope;
|
||||||
|
import eu.eudat.data.QueueInboxEntity;
|
||||||
|
import eu.eudat.integrationevent.TrackedEvent;
|
||||||
|
import eu.eudat.integrationevent.inbox.userremoval.UserRemovalIntegrationEventHandler;
|
||||||
|
import eu.eudat.integrationevent.inbox.usertouched.UserTouchedIntegrationEventHandler;
|
||||||
|
import eu.eudat.query.QueueInboxQuery;
|
||||||
|
import gr.cite.queueinbox.entity.QueueInbox;
|
||||||
|
import gr.cite.queueinbox.entity.QueueInboxStatus;
|
||||||
|
import gr.cite.queueinbox.repository.CandidateInfo;
|
||||||
|
import gr.cite.queueinbox.repository.InboxRepository;
|
||||||
|
import gr.cite.queueinbox.task.MessageOptions;
|
||||||
|
import gr.cite.rabbitmq.consumer.InboxCreatorParams;
|
||||||
|
import gr.cite.tools.data.query.Ordering;
|
||||||
|
import gr.cite.tools.data.query.QueryFactory;
|
||||||
|
import gr.cite.tools.logging.LoggerService;
|
||||||
|
import jakarta.persistence.*;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
public class InboxRepositoryImpl implements InboxRepository {
|
||||||
|
|
||||||
|
protected final ApplicationContext applicationContext;
|
||||||
|
private final Random random = new Random();
|
||||||
|
private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(InboxRepositoryImpl.class));
|
||||||
|
private final JsonHandlingService jsonHandlingService;
|
||||||
|
private final InboxProperties inboxProperties;
|
||||||
|
|
||||||
|
public InboxRepositoryImpl(
|
||||||
|
ApplicationContext applicationContext,
|
||||||
|
InboxProperties inboxProperties
|
||||||
|
) {
|
||||||
|
this.applicationContext = applicationContext;
|
||||||
|
this.jsonHandlingService = this.applicationContext.getBean(JsonHandlingService.class);
|
||||||
|
this.inboxProperties = inboxProperties;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CandidateInfo candidate(Instant lastCandidateCreationTimestamp, MessageOptions options) {
|
||||||
|
EntityTransaction transaction = null;
|
||||||
|
EntityManager entityManager = null;
|
||||||
|
CandidateInfo candidate = null;
|
||||||
|
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) {
|
||||||
|
try {
|
||||||
|
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
|
||||||
|
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
|
||||||
|
entityManager = entityManagerFactory.createEntityManager();
|
||||||
|
|
||||||
|
transaction = entityManager.getTransaction();
|
||||||
|
transaction.begin();
|
||||||
|
|
||||||
|
QueueInboxEntity item = queryFactory.query(QueueInboxQuery.class)
|
||||||
|
.isActives(IsActive.Active)
|
||||||
|
.status(QueueInboxStatus.PENDING, QueueInboxStatus.ERROR)
|
||||||
|
.retryThreshold(options.getRetryThreashold())
|
||||||
|
.createdAfter(lastCandidateCreationTimestamp)
|
||||||
|
.ordering(new Ordering().addAscending(QueueInboxEntity._createdAt))
|
||||||
|
.first();
|
||||||
|
|
||||||
|
if (item != null) {
|
||||||
|
QueueInboxStatus prevState = item.getStatus();
|
||||||
|
item.setStatus(QueueInboxStatus.PROCESSING);
|
||||||
|
|
||||||
|
entityManager.merge(item);
|
||||||
|
entityManager.flush();
|
||||||
|
|
||||||
|
candidate = new CandidateInfo();
|
||||||
|
candidate.setId(item.getId());
|
||||||
|
candidate.setCreatedAt(item.getCreatedAt());
|
||||||
|
candidate.setPreviousState(prevState);
|
||||||
|
}
|
||||||
|
|
||||||
|
transaction.commit();
|
||||||
|
} catch (OptimisticLockException ex) {
|
||||||
|
// we get this if/when someone else already modified the notifications. We want to essentially ignore this, and keep working
|
||||||
|
this.logger.debug("Concurrency exception getting queue inbox. Skipping: {} ", ex.getMessage());
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
candidate = null;
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem getting list of queue inbox. Skipping: {}", ex.getMessage(), ex);
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
candidate = null;
|
||||||
|
} finally {
|
||||||
|
if (entityManager != null) entityManager.close();
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem getting list of queue inbox. Skipping: {}", ex.getMessage(), ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
return candidate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean shouldOmit(CandidateInfo candidate, Function<QueueInbox, Boolean> shouldOmit) {
|
||||||
|
EntityTransaction transaction = null;
|
||||||
|
EntityManager entityManager = null;
|
||||||
|
Boolean success = false;
|
||||||
|
|
||||||
|
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) {
|
||||||
|
try {
|
||||||
|
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
|
||||||
|
|
||||||
|
entityManager = entityManagerFactory.createEntityManager();
|
||||||
|
transaction = entityManager.getTransaction();
|
||||||
|
|
||||||
|
transaction.begin();
|
||||||
|
|
||||||
|
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
|
||||||
|
QueueInboxEntity item = queryFactory.query(QueueInboxQuery.class).ids(candidate.getId()).first();
|
||||||
|
|
||||||
|
if (item == null) {
|
||||||
|
this.logger.warn("Could not lookup queue inbox {} to process. Continuing...", candidate.getId());
|
||||||
|
} else {
|
||||||
|
if (shouldOmit.apply(item)) {
|
||||||
|
item.setStatus(QueueInboxStatus.OMITTED);
|
||||||
|
|
||||||
|
entityManager.merge(item);
|
||||||
|
entityManager.flush();
|
||||||
|
success = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
transaction.commit();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
success = false;
|
||||||
|
} finally {
|
||||||
|
if (entityManager != null) entityManager.close();
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
}
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean shouldWait(CandidateInfo candidate, Function<QueueInbox, Boolean> itIsTimeFunc) {
|
||||||
|
EntityTransaction transaction = null;
|
||||||
|
EntityManager entityManager = null;
|
||||||
|
Boolean success = false;
|
||||||
|
|
||||||
|
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) {
|
||||||
|
try {
|
||||||
|
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
|
||||||
|
|
||||||
|
entityManager = entityManagerFactory.createEntityManager();
|
||||||
|
transaction = entityManager.getTransaction();
|
||||||
|
|
||||||
|
transaction.begin();
|
||||||
|
|
||||||
|
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
|
||||||
|
QueueInboxEntity item = queryFactory.query(QueueInboxQuery.class).ids(candidate.getId()).first();
|
||||||
|
|
||||||
|
if (item.getRetryCount() != null && item.getRetryCount() >= 1) {
|
||||||
|
Boolean itIsTime = itIsTimeFunc.apply(item);
|
||||||
|
|
||||||
|
if (!itIsTime) {
|
||||||
|
item.setStatus(candidate.getPreviousState());
|
||||||
|
|
||||||
|
entityManager.merge(item);
|
||||||
|
entityManager.flush();
|
||||||
|
success = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
success = !itIsTime;
|
||||||
|
}
|
||||||
|
transaction.commit();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
success = false;
|
||||||
|
} finally {
|
||||||
|
if (entityManager != null) entityManager.close();
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
}
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueueInbox create(InboxCreatorParams inboxCreatorParams) {
|
||||||
|
EntityTransaction transaction = null;
|
||||||
|
EntityManager entityManager = null;
|
||||||
|
Boolean success = false;
|
||||||
|
QueueInboxEntity queueMessage = null;
|
||||||
|
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) {
|
||||||
|
try {
|
||||||
|
queueMessage = this.createQueueInboxEntity(inboxCreatorParams);
|
||||||
|
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
|
||||||
|
|
||||||
|
entityManager = entityManagerFactory.createEntityManager();
|
||||||
|
transaction = entityManager.getTransaction();
|
||||||
|
|
||||||
|
transaction.begin();
|
||||||
|
|
||||||
|
entityManager.persist(queueMessage);
|
||||||
|
entityManager.flush();
|
||||||
|
|
||||||
|
transaction.commit();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
success = false;
|
||||||
|
} finally {
|
||||||
|
if (entityManager != null) entityManager.close();
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
}
|
||||||
|
return queueMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
private QueueInboxEntity createQueueInboxEntity(InboxCreatorParams inboxCreatorParams) {
|
||||||
|
|
||||||
|
QueueInboxEntity queueMessage = new QueueInboxEntity();
|
||||||
|
queueMessage.setId(UUID.randomUUID());
|
||||||
|
queueMessage.setTenantId(null);
|
||||||
|
queueMessage.setExchange(this.inboxProperties.getExchange());
|
||||||
|
queueMessage.setRoute(inboxCreatorParams.getRoutingKey());
|
||||||
|
queueMessage.setQueue(inboxCreatorParams.getQueueName());
|
||||||
|
queueMessage.setApplicationId(inboxCreatorParams.getAppId());
|
||||||
|
queueMessage.setMessageId(UUID.fromString(inboxCreatorParams.getMessageId()));
|
||||||
|
queueMessage.setMessage(inboxCreatorParams.getMessageBody());
|
||||||
|
queueMessage.setIsActive(IsActive.Active);
|
||||||
|
queueMessage.setStatus(QueueInboxStatus.PENDING);
|
||||||
|
queueMessage.setRetryCount(0);
|
||||||
|
queueMessage.setCreatedAt(Instant.now());
|
||||||
|
|
||||||
|
return queueMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean emit(CandidateInfo candidateInfo) {
|
||||||
|
EntityTransaction transaction = null;
|
||||||
|
EntityManager entityManager = null;
|
||||||
|
Boolean success = false;
|
||||||
|
|
||||||
|
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) {
|
||||||
|
try {
|
||||||
|
|
||||||
|
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
|
||||||
|
|
||||||
|
entityManager = entityManagerFactory.createEntityManager();
|
||||||
|
transaction = entityManager.getTransaction();
|
||||||
|
transaction.begin();
|
||||||
|
|
||||||
|
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
|
||||||
|
QueueInboxEntity queueInboxMessage = queryFactory.query(QueueInboxQuery.class).ids(candidateInfo.getId()).first();
|
||||||
|
|
||||||
|
if (queueInboxMessage == null) {
|
||||||
|
this.logger.warn("Could not lookup queue inbox {} to process. Continuing...", candidateInfo.getId());
|
||||||
|
} else {
|
||||||
|
|
||||||
|
EventProcessingStatus status = this.processMessage(queueInboxMessage.getRoute(), queueInboxMessage.getMessageId().toString(), queueInboxMessage.getApplicationId(), queueInboxMessage.getMessage());
|
||||||
|
switch (status) {
|
||||||
|
case Success: {
|
||||||
|
queueInboxMessage.setStatus(QueueInboxStatus.SUCCESSFUL);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case Postponed: {
|
||||||
|
queueInboxMessage.setStatus(QueueInboxStatus.PARKED);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case Error: {
|
||||||
|
queueInboxMessage.setStatus(QueueInboxStatus.ERROR);
|
||||||
|
queueInboxMessage.setRetryCount(queueInboxMessage.getRetryCount() != null ? queueInboxMessage.getRetryCount() + 1 : 0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case Discard:
|
||||||
|
default: {
|
||||||
|
queueInboxMessage.setStatus(QueueInboxStatus.DISCARD);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
success = status == EventProcessingStatus.Success;
|
||||||
|
|
||||||
|
entityManager.merge(queueInboxMessage);
|
||||||
|
entityManager.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
transaction.commit();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
success = false;
|
||||||
|
} finally {
|
||||||
|
if (entityManager != null) entityManager.close();
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
}
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
private EventProcessingStatus processMessage(String routingKey, String messageId, String appId, String message) {
|
||||||
|
IntegrationEventHandler handler;
|
||||||
|
if (this.RoutingKeyMatched(routingKey, this.inboxProperties.getUserRemovalTopic())) handler = this.applicationContext.getBean(UserRemovalIntegrationEventHandler.class);
|
||||||
|
else if (this.RoutingKeyMatched(routingKey, this.inboxProperties.getUserTouchedTopic())) handler = this.applicationContext.getBean(UserTouchedIntegrationEventHandler.class);
|
||||||
|
else handler = null;
|
||||||
|
|
||||||
|
if (handler == null) return EventProcessingStatus.Discard;
|
||||||
|
|
||||||
|
IntegrationEventProperties properties = new IntegrationEventProperties();
|
||||||
|
properties.setAppId(appId);
|
||||||
|
properties.setMessageId(messageId);
|
||||||
|
|
||||||
|
TrackedEvent event = this.jsonHandlingService.fromJsonSafe(TrackedEvent.class, message);
|
||||||
|
// using (LogContext.PushProperty(this._logTrackingConfig.LogTrackingContextName, @event.TrackingContextTag))
|
||||||
|
// {
|
||||||
|
try {
|
||||||
|
return handler.handle(properties, message);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
logger.error("problem handling event from routing key " + routingKey + ". Setting nack and continuing...", ex);
|
||||||
|
return EventProcessingStatus.Error;
|
||||||
|
}
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
private Boolean RoutingKeyMatched(String routingKey, List<String> topics) {
|
||||||
|
if (topics == null || topics.size() == 0) return false;
|
||||||
|
return topics.stream().anyMatch(x -> x.equals(routingKey));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,5 @@
|
||||||
|
package eu.eudat.integrationevent.inbox;
|
||||||
|
|
||||||
|
public interface IntegrationEventHandler {
|
||||||
|
EventProcessingStatus handle(IntegrationEventProperties properties, String message);
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
package eu.eudat.integrationevent.inbox;
|
||||||
|
|
||||||
|
public class IntegrationEventProperties {
|
||||||
|
private String messageId;
|
||||||
|
private String appId;
|
||||||
|
|
||||||
|
public String getMessageId() {
|
||||||
|
return messageId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMessageId(String messageId) {
|
||||||
|
this.messageId = messageId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getAppId() {
|
||||||
|
return appId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAppId(String appId) {
|
||||||
|
this.appId = appId;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,26 @@
|
||||||
|
package eu.eudat.integrationevent.inbox.userremoval;
|
||||||
|
|
||||||
|
import eu.eudat.integrationevent.inbox.ConsistencyHandler;
|
||||||
|
import eu.eudat.query.UserQuery;
|
||||||
|
import gr.cite.tools.data.query.QueryFactory;
|
||||||
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
|
import org.springframework.context.annotation.Scope;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
|
public class UserRemovalConsistencyHandler implements ConsistencyHandler<UserRemovalConsistencyPredicates> {
|
||||||
|
|
||||||
|
private final QueryFactory queryFactory;
|
||||||
|
|
||||||
|
public UserRemovalConsistencyHandler(QueryFactory queryFactory) {
|
||||||
|
this.queryFactory = queryFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean isConsistent(UserRemovalConsistencyPredicates consistencyPredicates) {
|
||||||
|
long count = this.queryFactory.query(UserQuery.class).ids(consistencyPredicates.getUserId()).count();
|
||||||
|
if (count == 0) return false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,21 @@
|
||||||
|
package eu.eudat.integrationevent.inbox.userremoval;
|
||||||
|
|
||||||
|
import eu.eudat.integrationevent.inbox.ConsistencyPredicates;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
public class UserRemovalConsistencyPredicates implements ConsistencyPredicates {
|
||||||
|
public UserRemovalConsistencyPredicates(UUID userId) {
|
||||||
|
this.userId = userId;
|
||||||
|
}
|
||||||
|
|
||||||
|
private UUID userId;
|
||||||
|
|
||||||
|
public UUID getUserId() {
|
||||||
|
return userId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUserId(UUID userId) {
|
||||||
|
this.userId = userId;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,26 @@
|
||||||
|
package eu.eudat.integrationevent.inbox.userremoval;
|
||||||
|
|
||||||
|
import eu.eudat.integrationevent.TrackedEvent;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
public class UserRemovalIntegrationEvent extends TrackedEvent {
|
||||||
|
private UUID userId;
|
||||||
|
private UUID tenant;
|
||||||
|
|
||||||
|
public UUID getUserId() {
|
||||||
|
return userId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUserId(UUID userId) {
|
||||||
|
this.userId = userId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UUID getTenant() {
|
||||||
|
return tenant;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTenant(UUID tenant) {
|
||||||
|
this.tenant = tenant;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,6 @@
|
||||||
|
package eu.eudat.integrationevent.inbox.userremoval;
|
||||||
|
|
||||||
|
import eu.eudat.integrationevent.inbox.IntegrationEventHandler;
|
||||||
|
|
||||||
|
public interface UserRemovalIntegrationEventHandler extends IntegrationEventHandler {
|
||||||
|
}
|
|
@ -0,0 +1,125 @@
|
||||||
|
package eu.eudat.integrationevent.inbox.userremoval;
|
||||||
|
|
||||||
|
import eu.eudat.audit.AuditableAction;
|
||||||
|
import eu.eudat.commons.JsonHandlingService;
|
||||||
|
import eu.eudat.commons.fake.FakeRequestScope;
|
||||||
|
import eu.eudat.data.TenantEntity;
|
||||||
|
import eu.eudat.errorcode.ErrorThesaurusProperties;
|
||||||
|
import eu.eudat.integrationevent.inbox.EventProcessingStatus;
|
||||||
|
import eu.eudat.integrationevent.inbox.InboxPrincipal;
|
||||||
|
import eu.eudat.integrationevent.inbox.IntegrationEventProperties;
|
||||||
|
import eu.eudat.query.TenantQuery;
|
||||||
|
import eu.eudat.service.user.UserService;
|
||||||
|
import gr.cite.commons.web.oidc.principal.CurrentPrincipalResolver;
|
||||||
|
|
||||||
|
import gr.cite.tools.auditing.AuditService;
|
||||||
|
import gr.cite.tools.data.query.QueryFactory;
|
||||||
|
import gr.cite.tools.exception.MyValidationException;
|
||||||
|
import gr.cite.tools.fieldset.BaseFieldSet;
|
||||||
|
import gr.cite.tools.logging.LoggerService;
|
||||||
|
import jakarta.persistence.*;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
import org.springframework.context.MessageSource;
|
||||||
|
import org.springframework.context.annotation.Scope;
|
||||||
|
import org.springframework.context.i18n.LocaleContextHolder;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
|
||||||
|
import java.util.AbstractMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
|
public class UserRemovalIntegrationEventHandlerImpl implements UserRemovalIntegrationEventHandler {
|
||||||
|
private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(UserRemovalIntegrationEventHandlerImpl.class));
|
||||||
|
private final JsonHandlingService jsonHandlingService;
|
||||||
|
protected final ApplicationContext applicationContext;
|
||||||
|
private final ErrorThesaurusProperties errors;
|
||||||
|
private final MessageSource messageSource;
|
||||||
|
|
||||||
|
public UserRemovalIntegrationEventHandlerImpl(
|
||||||
|
JsonHandlingService jsonHandlingService,
|
||||||
|
ApplicationContext applicationContext,
|
||||||
|
ErrorThesaurusProperties errors,
|
||||||
|
MessageSource messageSource
|
||||||
|
) {
|
||||||
|
this.jsonHandlingService = jsonHandlingService;
|
||||||
|
this.applicationContext = applicationContext;
|
||||||
|
this.errors = errors;
|
||||||
|
this.messageSource = messageSource;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EventProcessingStatus handle(IntegrationEventProperties properties, String message) {
|
||||||
|
UserRemovalIntegrationEvent event = this.jsonHandlingService.fromJsonSafe(UserRemovalIntegrationEvent.class, message);
|
||||||
|
if (event == null) return EventProcessingStatus.Error;
|
||||||
|
if (event.getUserId() == null) throw new MyValidationException(this.errors.getModelValidation().getCode(), "userId", messageSource.getMessage("Validation_Required", new Object[]{"userId"}, LocaleContextHolder.getLocale()));
|
||||||
|
|
||||||
|
EntityManager entityManager = null;
|
||||||
|
EntityTransaction transaction = null;
|
||||||
|
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) {
|
||||||
|
try {
|
||||||
|
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
|
||||||
|
// TenantScope scope = this.applicationContext.getBean(TenantScope.class);
|
||||||
|
// if (scope.isMultitenant() && event.getTenant() != null) {
|
||||||
|
// TenantEntity tenant = queryFactory.query(TenantQuery.class).ids(event.getTenant()).firstAs(new BaseFieldSet().ensure(Tenant._id).ensure(Tenant._code));
|
||||||
|
// if (tenant == null) {
|
||||||
|
// logger.error("missing tenant from event message");
|
||||||
|
// return EventProcessingStatus.Error;
|
||||||
|
// }
|
||||||
|
// scope.setTenant(event.getTenant(), tenant.getCode());
|
||||||
|
// } else if (scope.isMultitenant()) {
|
||||||
|
// logger.error("missing tenant from event message");
|
||||||
|
// return EventProcessingStatus.Error;
|
||||||
|
// }
|
||||||
|
|
||||||
|
CurrentPrincipalResolver currentPrincipalResolver = this.applicationContext.getBean(CurrentPrincipalResolver.class);
|
||||||
|
currentPrincipalResolver.push(InboxPrincipal.build(properties));
|
||||||
|
|
||||||
|
UserRemovalConsistencyHandler userRemovalConsistencyHandler = this.applicationContext.getBean(UserRemovalConsistencyHandler.class);
|
||||||
|
if (!(userRemovalConsistencyHandler.isConsistent(new UserRemovalConsistencyPredicates(event.getUserId())))) return EventProcessingStatus.Postponed;
|
||||||
|
|
||||||
|
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
|
||||||
|
entityManager = entityManagerFactory.createEntityManager();
|
||||||
|
|
||||||
|
transaction = entityManager.getTransaction();
|
||||||
|
transaction.begin();
|
||||||
|
|
||||||
|
try {
|
||||||
|
UserService userService = this.applicationContext.getBean(UserService.class);
|
||||||
|
userService.deleteAndSave(event.getUserId());
|
||||||
|
|
||||||
|
AuditService auditService = this.applicationContext.getBean(AuditService.class);
|
||||||
|
|
||||||
|
auditService.track(AuditableAction.User_Delete, Map.ofEntries(
|
||||||
|
new AbstractMap.SimpleEntry<String, Object>("id", event.getUserId())
|
||||||
|
));
|
||||||
|
//auditService.trackIdentity(AuditableAction.IdentityTracking_Action);
|
||||||
|
|
||||||
|
transaction.commit();
|
||||||
|
} catch (Exception e) {
|
||||||
|
transaction.rollback();
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
currentPrincipalResolver.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
transaction.commit();
|
||||||
|
} catch (OptimisticLockException ex) {
|
||||||
|
// we get this if/when someone else already modified the notifications. We want to essentially ignore this, and keep working
|
||||||
|
this.logger.debug("Concurrency exception getting queue outbox. Skipping: {} ", ex.getMessage());
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex);
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
} finally {
|
||||||
|
if (entityManager != null) entityManager.close();
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
package eu.eudat.integrationevent.inbox.usertouched;
|
||||||
|
|
||||||
|
|
||||||
|
import eu.eudat.integrationevent.TrackedEvent;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
public class UserTouchedIntegrationEvent extends TrackedEvent {
|
||||||
|
private UUID id;
|
||||||
|
private UUID tenant;
|
||||||
|
private String firstName;
|
||||||
|
private String lastName;
|
||||||
|
|
||||||
|
public UUID getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setId(UUID id) {
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UUID getTenant() {
|
||||||
|
return tenant;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTenant(UUID tenant) {
|
||||||
|
this.tenant = tenant;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFirstName() {
|
||||||
|
return firstName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFirstName(String firstName) {
|
||||||
|
this.firstName = firstName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLastName() {
|
||||||
|
return lastName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLastName(String lastName) {
|
||||||
|
this.lastName = lastName;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,6 @@
|
||||||
|
package eu.eudat.integrationevent.inbox.usertouched;
|
||||||
|
|
||||||
|
import eu.eudat.integrationevent.inbox.IntegrationEventHandler;
|
||||||
|
|
||||||
|
public interface UserTouchedIntegrationEventHandler extends IntegrationEventHandler {
|
||||||
|
}
|
|
@ -0,0 +1,116 @@
|
||||||
|
package eu.eudat.integrationevent.inbox.usertouched;
|
||||||
|
|
||||||
|
import eu.eudat.audit.AuditableAction;
|
||||||
|
import eu.eudat.commons.JsonHandlingService;
|
||||||
|
import eu.eudat.commons.fake.FakeRequestScope;
|
||||||
|
import eu.eudat.integrationevent.inbox.EventProcessingStatus;
|
||||||
|
import eu.eudat.integrationevent.inbox.InboxPrincipal;
|
||||||
|
import eu.eudat.integrationevent.inbox.IntegrationEventProperties;
|
||||||
|
import eu.eudat.service.user.UserService;
|
||||||
|
import gr.cite.commons.web.oidc.principal.CurrentPrincipalResolver;
|
||||||
|
import gr.cite.tools.auditing.AuditService;
|
||||||
|
import gr.cite.tools.data.query.QueryFactory;
|
||||||
|
import gr.cite.tools.fieldset.BaseFieldSet;
|
||||||
|
import gr.cite.tools.logging.LoggerService;
|
||||||
|
import gr.cite.tools.validation.ValidationService;
|
||||||
|
import jakarta.persistence.*;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
import org.springframework.context.annotation.Scope;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.AbstractMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
|
public class UserTouchedIntegrationEventHandlerImpl implements UserTouchedIntegrationEventHandler {
|
||||||
|
private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(UserTouchedIntegrationEventHandlerImpl.class));
|
||||||
|
private final JsonHandlingService jsonHandlingService;
|
||||||
|
protected final ApplicationContext applicationContext;
|
||||||
|
|
||||||
|
public UserTouchedIntegrationEventHandlerImpl(
|
||||||
|
JsonHandlingService jsonHandlingService,
|
||||||
|
ApplicationContext applicationContext
|
||||||
|
) {
|
||||||
|
this.jsonHandlingService = jsonHandlingService;
|
||||||
|
this.applicationContext = applicationContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EventProcessingStatus handle(IntegrationEventProperties properties, String message) {
|
||||||
|
UserTouchedIntegrationEvent event = this.jsonHandlingService.fromJsonSafe(UserTouchedIntegrationEvent.class, message);
|
||||||
|
if (event == null) return EventProcessingStatus.Error;
|
||||||
|
|
||||||
|
// UserTouchedIntegrationEventPersist model = new UserTouchedIntegrationEventPersist();
|
||||||
|
// model.setId(event.getId());
|
||||||
|
// model.setFirstName(event.getFirstName());
|
||||||
|
// model.setLastName(event.getLastName());
|
||||||
|
|
||||||
|
EntityManager entityManager = null;
|
||||||
|
EntityTransaction transaction = null;
|
||||||
|
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) {
|
||||||
|
try {
|
||||||
|
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
|
||||||
|
// TenantScope scope = this.applicationContext.getBean(TenantScope.class);
|
||||||
|
// if (scope.isMultitenant() && event.getTenant() != null) {
|
||||||
|
// TenantEntity tenant = queryFactory.query(TenantQuery.class).ids(event.getTenant()).firstAs(new BaseFieldSet().ensure(Tenant._id).ensure(Tenant._code));
|
||||||
|
// if (tenant == null) {
|
||||||
|
// logger.error("missing tenant from event message");
|
||||||
|
// return EventProcessingStatus.Error;
|
||||||
|
// }
|
||||||
|
// scope.setTenant(event.getTenant(), tenant.getCode());
|
||||||
|
// } else if (scope.isMultitenant()) {
|
||||||
|
// logger.error("missing tenant from event message");
|
||||||
|
// return EventProcessingStatus.Error;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// ValidationService validator = this.applicationContext.getBean(ValidationService.class);
|
||||||
|
// validator.validateForce(model);
|
||||||
|
|
||||||
|
|
||||||
|
CurrentPrincipalResolver currentPrincipalResolver = this.applicationContext.getBean(CurrentPrincipalResolver.class);
|
||||||
|
currentPrincipalResolver.push(InboxPrincipal.build(properties));
|
||||||
|
|
||||||
|
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
|
||||||
|
entityManager = entityManagerFactory.createEntityManager();
|
||||||
|
|
||||||
|
transaction = entityManager.getTransaction();
|
||||||
|
transaction.begin();
|
||||||
|
|
||||||
|
try {
|
||||||
|
UserService userService = this.applicationContext.getBean(UserService.class);
|
||||||
|
// userService.persist(model, null);
|
||||||
|
|
||||||
|
AuditService auditService = this.applicationContext.getBean(AuditService.class);
|
||||||
|
|
||||||
|
auditService.track(AuditableAction.User_Persist, Map.ofEntries(
|
||||||
|
// new AbstractMap.SimpleEntry<String, Object>("model", model)
|
||||||
|
));
|
||||||
|
|
||||||
|
transaction.commit();
|
||||||
|
} catch (Exception e) {
|
||||||
|
transaction.rollback();
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
currentPrincipalResolver.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
transaction.commit();
|
||||||
|
} catch (OptimisticLockException ex) {
|
||||||
|
// we get this if/when someone else already modified the notifications. We want to essentially ignore this, and keep working
|
||||||
|
this.logger.debug("Concurrency exception getting queue outbox. Skipping: {} ", ex.getMessage());
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex);
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
} finally {
|
||||||
|
if (entityManager != null) entityManager.close();
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
package eu.eudat.integrationevent.outbox;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||||
|
import eu.eudat.integrationevent.TrackedEvent;
|
||||||
|
import gr.cite.rabbitmq.IntegrationEvent;
|
||||||
|
|
||||||
|
|
||||||
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
|
public class OutboxIntegrationEvent extends IntegrationEvent {
|
||||||
|
public static final String FORGET_ME_COMPLETED = "FORGET_ME_COMPLETED";
|
||||||
|
public static final String NOTIFY = "NOTIFY";
|
||||||
|
public static final String TENANT_REACTIVATE = "TENANT_REACTIVATE";
|
||||||
|
public static final String TENANT_REMOVE = "TENANT_REMOVE";
|
||||||
|
public static final String TENANT_TOUCH = "TENANT_TOUCH";
|
||||||
|
public static final String TENANT_USER_INVITE = "TENANT_USER_INVITE";
|
||||||
|
public static final String WHAT_YOU_KNOW_ABOUT_ME_COMPLETED = "WHAT_YOU_KNOW_ABOUT_ME_COMPLETED";
|
||||||
|
public static final String GENERATE_FILE = "GENERATE_FILE";
|
||||||
|
private TrackedEvent event;
|
||||||
|
|
||||||
|
public TrackedEvent getEvent() {
|
||||||
|
return event;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEvent(TrackedEvent event) {
|
||||||
|
this.event = event;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,89 @@
|
||||||
|
package eu.eudat.integrationevent.outbox;
|
||||||
|
|
||||||
|
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
import org.springframework.boot.context.properties.bind.ConstructorBinding;
|
||||||
|
import org.springframework.validation.annotation.Validated;
|
||||||
|
|
||||||
|
import jakarta.validation.constraints.NotNull;
|
||||||
|
|
||||||
|
@Validated
|
||||||
|
@ConfigurationProperties(prefix = "queue.task.publisher.options")
|
||||||
|
//@ConstructorBinding
|
||||||
|
public class OutboxProperties {
|
||||||
|
@NotNull
|
||||||
|
private final String exchange;
|
||||||
|
@NotNull
|
||||||
|
private final String tenantTouchTopic;
|
||||||
|
@NotNull
|
||||||
|
private final String tenantRemovalTopic;
|
||||||
|
@NotNull
|
||||||
|
private final String tenantReactivationTopic;
|
||||||
|
@NotNull
|
||||||
|
private final String tenantUserInviteTopic;
|
||||||
|
@NotNull
|
||||||
|
private final String notifyTopic;
|
||||||
|
@NotNull
|
||||||
|
private final String forgetMeCompletedTopic;
|
||||||
|
@NotNull
|
||||||
|
private final String whatYouKnowAboutMeCompletedTopic;
|
||||||
|
@NotNull
|
||||||
|
private final String generateFileTopic;
|
||||||
|
|
||||||
|
public OutboxProperties(String exchange,
|
||||||
|
String tenantTouchTopic,
|
||||||
|
String tenantRemovalTopic,
|
||||||
|
String tenantReactivationTopic,
|
||||||
|
String tenantUserInviteTopic,
|
||||||
|
String notifyTopic,
|
||||||
|
String forgetMeCompletedTopic,
|
||||||
|
String whatYouKnowAboutMeCompletedTopic,
|
||||||
|
String generateFileTopic
|
||||||
|
) {
|
||||||
|
this.exchange = exchange;
|
||||||
|
this.tenantTouchTopic = tenantTouchTopic;
|
||||||
|
this.tenantRemovalTopic = tenantRemovalTopic;
|
||||||
|
this.tenantReactivationTopic = tenantReactivationTopic;
|
||||||
|
this.tenantUserInviteTopic = tenantUserInviteTopic;
|
||||||
|
this.notifyTopic = notifyTopic;
|
||||||
|
this.forgetMeCompletedTopic = forgetMeCompletedTopic;
|
||||||
|
this.whatYouKnowAboutMeCompletedTopic = whatYouKnowAboutMeCompletedTopic;
|
||||||
|
this.generateFileTopic = generateFileTopic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getExchange() {
|
||||||
|
return exchange;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTenantTouchTopic() {
|
||||||
|
return tenantTouchTopic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTenantRemovalTopic() {
|
||||||
|
return tenantRemovalTopic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTenantReactivationTopic() {
|
||||||
|
return tenantReactivationTopic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTenantUserInviteTopic() {
|
||||||
|
return tenantUserInviteTopic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getNotifyTopic() {
|
||||||
|
return notifyTopic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getForgetMeCompletedTopic() {
|
||||||
|
return forgetMeCompletedTopic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getWhatYouKnowAboutMeCompletedTopic() {
|
||||||
|
return whatYouKnowAboutMeCompletedTopic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getGenerateFileTopic() {
|
||||||
|
return generateFileTopic;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,428 @@
|
||||||
|
package eu.eudat.integrationevent.outbox;
|
||||||
|
|
||||||
|
import eu.eudat.commons.JsonHandlingService;
|
||||||
|
import eu.eudat.commons.enums.IsActive;
|
||||||
|
import eu.eudat.commons.fake.FakeRequestScope;
|
||||||
|
import eu.eudat.data.QueueOutboxEntity;
|
||||||
|
import eu.eudat.query.QueueOutboxQuery;
|
||||||
|
import gr.cite.queueoutbox.entity.QueueOutbox;
|
||||||
|
import gr.cite.queueoutbox.entity.QueueOutboxNotifyStatus;
|
||||||
|
import gr.cite.queueoutbox.repository.CandidateInfo;
|
||||||
|
import gr.cite.queueoutbox.repository.OutboxRepository;
|
||||||
|
import gr.cite.queueoutbox.task.MessageOptions;
|
||||||
|
import gr.cite.rabbitmq.IntegrationEvent;
|
||||||
|
import gr.cite.tools.data.query.Ordering;
|
||||||
|
import gr.cite.tools.data.query.QueryFactory;
|
||||||
|
import gr.cite.tools.logging.LoggerService;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
|
||||||
|
import jakarta.persistence.EntityManager;
|
||||||
|
import jakarta.persistence.EntityManagerFactory;
|
||||||
|
import jakarta.persistence.EntityTransaction;
|
||||||
|
import jakarta.persistence.OptimisticLockException;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
public class OutboxRepositoryImpl implements OutboxRepository {
|
||||||
|
|
||||||
|
protected final ApplicationContext applicationContext;
|
||||||
|
private final Random random = new Random();
|
||||||
|
private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(OutboxRepositoryImpl.class));
|
||||||
|
private final JsonHandlingService jsonHandlingService;
|
||||||
|
private final OutboxProperties outboxProperties;
|
||||||
|
|
||||||
|
public OutboxRepositoryImpl(
|
||||||
|
ApplicationContext applicationContext,
|
||||||
|
OutboxProperties outboxProperties
|
||||||
|
) {
|
||||||
|
this.applicationContext = applicationContext;
|
||||||
|
this.jsonHandlingService = this.applicationContext.getBean(JsonHandlingService.class);
|
||||||
|
this.outboxProperties = outboxProperties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CandidateInfo candidate(Instant lastCandidateCreationTimestamp, MessageOptions messageOptions, Function<QueueOutbox, Boolean> onConfirmTimeout) {
|
||||||
|
EntityTransaction transaction = null;
|
||||||
|
EntityManager entityManager = null;
|
||||||
|
CandidateInfo candidate = null;
|
||||||
|
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) {
|
||||||
|
try {
|
||||||
|
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
|
||||||
|
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
|
||||||
|
entityManager = entityManagerFactory.createEntityManager();
|
||||||
|
|
||||||
|
transaction = entityManager.getTransaction();
|
||||||
|
transaction.begin();
|
||||||
|
|
||||||
|
QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class)
|
||||||
|
.isActives(IsActive.Active)
|
||||||
|
.notifyStatus(QueueOutboxNotifyStatus.PENDING, QueueOutboxNotifyStatus.WAITING_CONFIRMATION, QueueOutboxNotifyStatus.ERROR)
|
||||||
|
.retryThreshold(messageOptions.getRetryThreashold())
|
||||||
|
.confirmTimeout(messageOptions.getConfirmTimeoutSeconds())
|
||||||
|
.createdAfter(lastCandidateCreationTimestamp)
|
||||||
|
.ordering(new Ordering().addAscending(QueueOutboxEntity._createdAt))
|
||||||
|
.first();
|
||||||
|
|
||||||
|
if (item != null) {
|
||||||
|
boolean confirmTimeout = onConfirmTimeout.apply(item);
|
||||||
|
|
||||||
|
QueueOutboxNotifyStatus prevState = item.getNotifyStatus();
|
||||||
|
item.setNotifyStatus(QueueOutboxNotifyStatus.PROCESSING);
|
||||||
|
|
||||||
|
entityManager.merge(item);
|
||||||
|
entityManager.flush();
|
||||||
|
|
||||||
|
candidate = new CandidateInfo();
|
||||||
|
candidate.setId(item.getId());
|
||||||
|
candidate.setCreatedAt(item.getCreatedAt());
|
||||||
|
candidate.setPreviousState(prevState);
|
||||||
|
}
|
||||||
|
|
||||||
|
transaction.commit();
|
||||||
|
} catch (OptimisticLockException ex) {
|
||||||
|
// we get this if/when someone else already modified the notifications. We want to essentially ignore this, and keep working
|
||||||
|
this.logger.debug("Concurrency exception getting queue outbox. Skipping: {} ", ex.getMessage());
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
candidate = null;
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex);
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
candidate = null;
|
||||||
|
} finally {
|
||||||
|
if (entityManager != null) entityManager.close();
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
return candidate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean shouldOmit(CandidateInfo candidate, Function<QueueOutbox, Boolean> shouldOmit) {
|
||||||
|
EntityTransaction transaction = null;
|
||||||
|
EntityManager entityManager = null;
|
||||||
|
Boolean success = false;
|
||||||
|
|
||||||
|
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) {
|
||||||
|
try {
|
||||||
|
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
|
||||||
|
|
||||||
|
entityManager = entityManagerFactory.createEntityManager();
|
||||||
|
transaction = entityManager.getTransaction();
|
||||||
|
|
||||||
|
transaction.begin();
|
||||||
|
|
||||||
|
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
|
||||||
|
QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidate.getId()).first();
|
||||||
|
|
||||||
|
if (item == null) {
|
||||||
|
this.logger.warn("Could not lookup queue outbox {} to process. Continuing...", candidate.getId());
|
||||||
|
} else {
|
||||||
|
if (shouldOmit.apply(item)) {
|
||||||
|
item.setNotifyStatus(QueueOutboxNotifyStatus.OMITTED);
|
||||||
|
|
||||||
|
entityManager.merge(item);
|
||||||
|
entityManager.flush();
|
||||||
|
success = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
transaction.commit();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
success = false;
|
||||||
|
} finally {
|
||||||
|
if (entityManager != null) entityManager.close();
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
}
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean shouldWait(CandidateInfo candidate, Function<QueueOutbox, Boolean> itIsTimeFunc) {
|
||||||
|
EntityTransaction transaction = null;
|
||||||
|
EntityManager entityManager = null;
|
||||||
|
Boolean success = false;
|
||||||
|
|
||||||
|
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) {
|
||||||
|
try {
|
||||||
|
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
|
||||||
|
|
||||||
|
entityManager = entityManagerFactory.createEntityManager();
|
||||||
|
transaction = entityManager.getTransaction();
|
||||||
|
|
||||||
|
transaction.begin();
|
||||||
|
|
||||||
|
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
|
||||||
|
QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidate.getId()).first();
|
||||||
|
|
||||||
|
if (item.getRetryCount() != null && item.getRetryCount() >= 1) {
|
||||||
|
Boolean itIsTime = itIsTimeFunc.apply(item);
|
||||||
|
|
||||||
|
if (!itIsTime) {
|
||||||
|
item.setNotifyStatus(candidate.getPreviousState());
|
||||||
|
|
||||||
|
entityManager.merge(item);
|
||||||
|
entityManager.flush();
|
||||||
|
success = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
success = !itIsTime;
|
||||||
|
}
|
||||||
|
transaction.commit();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
success = false;
|
||||||
|
} finally {
|
||||||
|
if (entityManager != null) entityManager.close();
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
}
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean process(CandidateInfo candidateInfo, Boolean isAutoconfirmOnPublish, Function<QueueOutbox, Boolean> publish) {
|
||||||
|
EntityTransaction transaction = null;
|
||||||
|
EntityManager entityManager = null;
|
||||||
|
Boolean success = false;
|
||||||
|
|
||||||
|
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) {
|
||||||
|
try {
|
||||||
|
|
||||||
|
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
|
||||||
|
|
||||||
|
entityManager = entityManagerFactory.createEntityManager();
|
||||||
|
transaction = entityManager.getTransaction();
|
||||||
|
transaction.begin();
|
||||||
|
|
||||||
|
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
|
||||||
|
QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidateInfo.getId()).first();
|
||||||
|
|
||||||
|
if (item == null) {
|
||||||
|
this.logger.warn("Could not lookup queue outbox {} to process. Continuing...", candidateInfo.getId());
|
||||||
|
} else {
|
||||||
|
|
||||||
|
success = publish.apply(item);
|
||||||
|
if (success) {
|
||||||
|
if (isAutoconfirmOnPublish) {
|
||||||
|
item.setNotifyStatus(QueueOutboxNotifyStatus.CONFIRMED);
|
||||||
|
item.setConfirmedAt(Instant.now());
|
||||||
|
} else {
|
||||||
|
item.setNotifyStatus(QueueOutboxNotifyStatus.WAITING_CONFIRMATION);
|
||||||
|
}
|
||||||
|
item.setPublishedAt(Instant.now());
|
||||||
|
} else {
|
||||||
|
item.setNotifyStatus(QueueOutboxNotifyStatus.ERROR);
|
||||||
|
item.setRetryCount(item.getRetryCount() != null ? item.getRetryCount() + 1 : 0);
|
||||||
|
item.setPublishedAt(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
entityManager.merge(item);
|
||||||
|
entityManager.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
transaction.commit();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
success = false;
|
||||||
|
} finally {
|
||||||
|
if (entityManager != null) entityManager.close();
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
}
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleConfirm(List<UUID> confirmedMessages) {
|
||||||
|
EntityTransaction transaction = null;
|
||||||
|
EntityManager entityManager = null;
|
||||||
|
|
||||||
|
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) {
|
||||||
|
try {
|
||||||
|
|
||||||
|
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
|
||||||
|
|
||||||
|
entityManager = entityManagerFactory.createEntityManager();
|
||||||
|
transaction = entityManager.getTransaction();
|
||||||
|
transaction.begin();
|
||||||
|
|
||||||
|
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
|
||||||
|
List<QueueOutboxEntity> queueOutboxMessages = queryFactory.query(QueueOutboxQuery.class).ids(confirmedMessages).collect();
|
||||||
|
|
||||||
|
if (queueOutboxMessages == null) {
|
||||||
|
this.logger.warn("Could not lookup messages {} to process. Continuing...", String.join(",", confirmedMessages.stream().map(x -> x.toString()).collect(Collectors.toList())));
|
||||||
|
} else {
|
||||||
|
|
||||||
|
for (QueueOutboxEntity queueOutboxMessage : queueOutboxMessages) {
|
||||||
|
queueOutboxMessage.setNotifyStatus(QueueOutboxNotifyStatus.CONFIRMED);
|
||||||
|
queueOutboxMessage.setConfirmedAt(Instant.now());
|
||||||
|
entityManager.merge(queueOutboxMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
entityManager.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
transaction.commit();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
} finally {
|
||||||
|
if (entityManager != null) entityManager.close();
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleNack(List<UUID> nackedMessages) {
|
||||||
|
EntityTransaction transaction = null;
|
||||||
|
EntityManager entityManager = null;
|
||||||
|
|
||||||
|
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) {
|
||||||
|
try {
|
||||||
|
|
||||||
|
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
|
||||||
|
|
||||||
|
entityManager = entityManagerFactory.createEntityManager();
|
||||||
|
transaction = entityManager.getTransaction();
|
||||||
|
transaction.begin();
|
||||||
|
|
||||||
|
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
|
||||||
|
List<QueueOutboxEntity> queueOutboxMessages = queryFactory.query(QueueOutboxQuery.class).ids(nackedMessages).collect();
|
||||||
|
|
||||||
|
if (queueOutboxMessages == null) {
|
||||||
|
this.logger.warn("Could not lookup messages {} to process. Continuing...", String.join(",", nackedMessages.stream().map(x -> x.toString()).collect(Collectors.toList())));
|
||||||
|
} else {
|
||||||
|
|
||||||
|
for (QueueOutboxEntity queueOutboxMessage : queueOutboxMessages) {
|
||||||
|
queueOutboxMessage.setNotifyStatus(QueueOutboxNotifyStatus.ERROR);
|
||||||
|
queueOutboxMessage.setRetryCount(queueOutboxMessage.getRetryCount() != null ? queueOutboxMessage.getRetryCount() + 1 : 0);
|
||||||
|
entityManager.merge(queueOutboxMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
entityManager.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
transaction.commit();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
} finally {
|
||||||
|
if (entityManager != null) entityManager.close();
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueueOutbox create(IntegrationEvent item) {
|
||||||
|
EntityTransaction transaction = null;
|
||||||
|
EntityManager entityManager = null;
|
||||||
|
Boolean success = false;
|
||||||
|
QueueOutboxEntity queueMessage = null;
|
||||||
|
try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) {
|
||||||
|
try {
|
||||||
|
queueMessage = this.mapEvent((OutboxIntegrationEvent) item);
|
||||||
|
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
|
||||||
|
|
||||||
|
entityManager = entityManagerFactory.createEntityManager();
|
||||||
|
transaction = entityManager.getTransaction();
|
||||||
|
|
||||||
|
transaction.begin();
|
||||||
|
|
||||||
|
entityManager.persist(queueMessage);
|
||||||
|
entityManager.flush();
|
||||||
|
|
||||||
|
transaction.commit();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
if (transaction != null) transaction.rollback();
|
||||||
|
success = false;
|
||||||
|
} finally {
|
||||||
|
if (entityManager != null) entityManager.close();
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
this.logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
|
||||||
|
}
|
||||||
|
return queueMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
private QueueOutboxEntity mapEvent(OutboxIntegrationEvent event) {
|
||||||
|
String routingKey;
|
||||||
|
switch (event.getType()) {
|
||||||
|
case OutboxIntegrationEvent.TENANT_REACTIVATE: {
|
||||||
|
routingKey = this.outboxProperties.getTenantReactivationTopic();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case OutboxIntegrationEvent.TENANT_REMOVE: {
|
||||||
|
routingKey = this.outboxProperties.getTenantRemovalTopic();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case OutboxIntegrationEvent.TENANT_TOUCH: {
|
||||||
|
routingKey = this.outboxProperties.getTenantTouchTopic();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case OutboxIntegrationEvent.TENANT_USER_INVITE: {
|
||||||
|
routingKey = this.outboxProperties.getTenantUserInviteTopic();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case OutboxIntegrationEvent.FORGET_ME_COMPLETED: {
|
||||||
|
routingKey = this.outboxProperties.getForgetMeCompletedTopic();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case OutboxIntegrationEvent.NOTIFY: {
|
||||||
|
routingKey = this.outboxProperties.getNotifyTopic();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case OutboxIntegrationEvent.WHAT_YOU_KNOW_ABOUT_ME_COMPLETED: {
|
||||||
|
routingKey = this.outboxProperties.getWhatYouKnowAboutMeCompletedTopic();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case OutboxIntegrationEvent.GENERATE_FILE: {
|
||||||
|
routingKey = this.outboxProperties.getGenerateFileTopic();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
logger.error("unrecognized outgoing integration event {}. Skipping...", event.getType());
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
UUID correlationId = UUID.randomUUID();
|
||||||
|
if (event.getEvent() != null) event.getEvent().setTrackingContextTag(correlationId.toString());
|
||||||
|
event.setMessage(this.jsonHandlingService.toJsonSafe(event.getEvent()));
|
||||||
|
//this._logTrackingService.Trace(correlationId.ToString(), $"Correlating current tracking context with new correlationId {correlationId}");
|
||||||
|
|
||||||
|
QueueOutboxEntity queueMessage = new QueueOutboxEntity();
|
||||||
|
queueMessage.setId(UUID.randomUUID());
|
||||||
|
queueMessage.setTenantId(null);
|
||||||
|
queueMessage.setExchange(this.outboxProperties.getExchange());
|
||||||
|
queueMessage.setRoute(routingKey);
|
||||||
|
queueMessage.setMessageId(event.getMessageId());
|
||||||
|
queueMessage.setMessage(this.jsonHandlingService.toJsonSafe(event));
|
||||||
|
queueMessage.setIsActive(IsActive.Active);
|
||||||
|
queueMessage.setNotifyStatus(QueueOutboxNotifyStatus.PENDING);
|
||||||
|
queueMessage.setRetryCount(0);
|
||||||
|
queueMessage.setCreatedAt(Instant.now());
|
||||||
|
queueMessage.setUpdatedAt(Instant.now());
|
||||||
|
|
||||||
|
return queueMessage;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,5 @@
|
||||||
|
package eu.eudat.integrationevent.outbox;
|
||||||
|
|
||||||
|
public interface OutboxService {
|
||||||
|
void publish(OutboxIntegrationEvent event);
|
||||||
|
}
|
|
@ -0,0 +1,43 @@
|
||||||
|
package eu.eudat.integrationevent.outbox;
|
||||||
|
|
||||||
|
|
||||||
|
import eu.eudat.commons.JsonHandlingService;
|
||||||
|
import gr.cite.tools.logging.LoggerService;
|
||||||
|
import gr.cite.tools.logging.MapLogEntry;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.context.ApplicationEventPublisher;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.web.context.annotation.RequestScope;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@RequestScope
|
||||||
|
public class OutboxServiceImpl implements OutboxService {
|
||||||
|
private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(OutboxServiceImpl.class));
|
||||||
|
|
||||||
|
private final OutboxProperties config;
|
||||||
|
private final JsonHandlingService jsonHandlingService;
|
||||||
|
private final ApplicationEventPublisher eventPublisher;
|
||||||
|
|
||||||
|
public OutboxServiceImpl(
|
||||||
|
OutboxProperties config,
|
||||||
|
JsonHandlingService jsonHandlingService,
|
||||||
|
ApplicationEventPublisher eventPublisher
|
||||||
|
) {
|
||||||
|
this.config = config;
|
||||||
|
this.jsonHandlingService = jsonHandlingService;
|
||||||
|
this.eventPublisher = eventPublisher;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void publish(OutboxIntegrationEvent event) {
|
||||||
|
try {
|
||||||
|
eventPublisher.publishEvent(event);
|
||||||
|
return;
|
||||||
|
} catch (Exception ex) {
|
||||||
|
logger.error(new MapLogEntry(String.format("Could not save message ", event.getMessage())).And("message", event.getMessage()).And("ex", ex));
|
||||||
|
//Still want to skip it from processing
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,219 @@
|
||||||
|
package eu.eudat.query;
|
||||||
|
|
||||||
|
|
||||||
|
import eu.eudat.commons.enums.IsActive;
|
||||||
|
import eu.eudat.data.QueueInboxEntity;
|
||||||
|
import gr.cite.queueinbox.entity.QueueInboxStatus;
|
||||||
|
import gr.cite.tools.data.query.FieldResolver;
|
||||||
|
import gr.cite.tools.data.query.Ordering;
|
||||||
|
import gr.cite.tools.data.query.QueryBase;
|
||||||
|
import gr.cite.tools.data.query.QueryContext;
|
||||||
|
import jakarta.persistence.Tuple;
|
||||||
|
import jakarta.persistence.criteria.CriteriaBuilder;
|
||||||
|
import jakarta.persistence.criteria.Predicate;
|
||||||
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
|
import org.springframework.context.annotation.Scope;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
|
public class QueueInboxQuery extends QueryBase<QueueInboxEntity> {
|
||||||
|
|
||||||
|
private Collection<UUID> ids;
|
||||||
|
private Instant createdAfter;
|
||||||
|
private Collection<IsActive> isActives;
|
||||||
|
private Collection<String> exchanges;
|
||||||
|
private Collection<String> routes;
|
||||||
|
private Collection<QueueInboxStatus> status;
|
||||||
|
private Integer retryThreshold;
|
||||||
|
|
||||||
|
public QueueInboxQuery ids(UUID value) {
|
||||||
|
this.ids = List.of(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery ids(UUID... value) {
|
||||||
|
this.ids = Arrays.asList(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery ids(List<UUID> value) {
|
||||||
|
this.ids = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery isActives(IsActive value) {
|
||||||
|
this.isActives = List.of(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery isActives(IsActive... value) {
|
||||||
|
this.isActives = Arrays.asList(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery isActives(List<IsActive> value) {
|
||||||
|
this.isActives = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery exchanges(String value) {
|
||||||
|
this.exchanges = List.of(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery exchanges(String... value) {
|
||||||
|
this.exchanges = Arrays.asList(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery exchanges(List<String> value) {
|
||||||
|
this.exchanges = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery routes(String value) {
|
||||||
|
this.routes = List.of(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery routes(String... value) {
|
||||||
|
this.routes = Arrays.asList(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery routes(List<String> value) {
|
||||||
|
this.routes = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery status(QueueInboxStatus value) {
|
||||||
|
this.status = List.of(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery status(QueueInboxStatus... value) {
|
||||||
|
this.status = Arrays.asList(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery status(List<QueueInboxStatus> value) {
|
||||||
|
this.status = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery createdAfter(Instant value) {
|
||||||
|
this.createdAfter = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery retryThreshold(Integer value) {
|
||||||
|
this.retryThreshold = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueInboxQuery ordering(Ordering ordering) {
|
||||||
|
this.setOrder(ordering);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Class<QueueInboxEntity> entityClass() {
|
||||||
|
return QueueInboxEntity.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Boolean isFalseQuery() {
|
||||||
|
return this.isEmpty(this.ids) || this.isEmpty(this.isActives) || this.isEmpty(this.exchanges)
|
||||||
|
|| this.isEmpty(this.routes) || this.isEmpty(this.status);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected <X, Y> Predicate applyFilters(QueryContext<X, Y> queryContext) {
|
||||||
|
List<Predicate> predicates = new ArrayList<>();
|
||||||
|
if (this.ids != null) {
|
||||||
|
CriteriaBuilder.In<UUID> inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueInboxEntity._id));
|
||||||
|
for (UUID item : this.ids) inClause.value(item);
|
||||||
|
predicates.add(inClause);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.createdAfter != null) {
|
||||||
|
predicates.add(queryContext.CriteriaBuilder.greaterThan(queryContext.Root.get(QueueInboxEntity._createdAt), this.createdAfter));
|
||||||
|
}
|
||||||
|
if (this.isActives != null) {
|
||||||
|
CriteriaBuilder.In<IsActive> inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueInboxEntity._isActive));
|
||||||
|
for (IsActive item : this.isActives) inClause.value(item);
|
||||||
|
predicates.add(inClause);
|
||||||
|
}
|
||||||
|
if (this.exchanges != null) {
|
||||||
|
CriteriaBuilder.In<String> inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueInboxEntity._exchange));
|
||||||
|
for (String item : this.exchanges) inClause.value(item);
|
||||||
|
predicates.add(inClause);
|
||||||
|
}
|
||||||
|
if (this.routes != null) {
|
||||||
|
CriteriaBuilder.In<String> inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueInboxEntity._route));
|
||||||
|
for (String item : this.routes) inClause.value(item);
|
||||||
|
predicates.add(inClause);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.status != null) {
|
||||||
|
CriteriaBuilder.In<QueueInboxStatus> inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueInboxEntity._status));
|
||||||
|
for (QueueInboxStatus item : this.status) inClause.value(item);
|
||||||
|
predicates.add(inClause);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.retryThreshold != null) {
|
||||||
|
predicates.add(queryContext.CriteriaBuilder.or(queryContext.CriteriaBuilder.isNull(queryContext.Root.get(QueueInboxEntity._retryCount)),
|
||||||
|
queryContext.CriteriaBuilder.lessThanOrEqualTo(queryContext.Root.get(QueueInboxEntity._retryCount), this.retryThreshold)));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (predicates.size() > 0) {
|
||||||
|
Predicate[] predicatesArray = predicates.toArray(new Predicate[0]);
|
||||||
|
return queryContext.CriteriaBuilder.and(predicatesArray);
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected QueueInboxEntity convert(Tuple tuple, Set<String> columns) {
|
||||||
|
QueueInboxEntity item = new QueueInboxEntity();
|
||||||
|
item.setId(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._id, UUID.class));
|
||||||
|
item.setExchange(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._exchange, String.class));
|
||||||
|
item.setTenantId(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._tenantId, UUID.class));
|
||||||
|
item.setRoute(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._route, String.class));
|
||||||
|
item.setMessage(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._message, String.class));
|
||||||
|
item.setMessageId(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._messageId, UUID.class));
|
||||||
|
item.setCreatedAt(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._createdAt, Instant.class));
|
||||||
|
item.setIsActive(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._isActive, IsActive.class));
|
||||||
|
item.setStatus(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._status, QueueInboxStatus.class));
|
||||||
|
item.setRetryCount(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._retryCount, Integer.class));
|
||||||
|
item.setQueue(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._queue, String.class));
|
||||||
|
item.setApplicationId(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._applicationId, String.class));
|
||||||
|
item.setCreatedAt(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._createdAt, Instant.class));
|
||||||
|
item.setUpdatedAt(QueryBase.convertSafe(tuple, columns, QueueInboxEntity._updatedAt, Instant.class));
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String fieldNameOf(FieldResolver item) {
|
||||||
|
if (item.match(QueueInboxEntity._id)) return QueueInboxEntity._id;
|
||||||
|
else if (item.match(QueueInboxEntity._exchange)) return QueueInboxEntity._exchange;
|
||||||
|
else if (item.match(QueueInboxEntity._tenantId)) return QueueInboxEntity._tenantId;
|
||||||
|
else if (item.match(QueueInboxEntity._route)) return QueueInboxEntity._route;
|
||||||
|
else if (item.match(QueueInboxEntity._message)) return QueueInboxEntity._message;
|
||||||
|
else if (item.match(QueueInboxEntity._messageId)) return QueueInboxEntity._messageId;
|
||||||
|
else if (item.match(QueueInboxEntity._createdAt)) return QueueInboxEntity._createdAt;
|
||||||
|
else if (item.match(QueueInboxEntity._isActive)) return QueueInboxEntity._isActive;
|
||||||
|
else if (item.match(QueueInboxEntity._status)) return QueueInboxEntity._status;
|
||||||
|
else if (item.match(QueueInboxEntity._retryCount)) return QueueInboxEntity._retryCount;
|
||||||
|
else if (item.match(QueueInboxEntity._queue)) return QueueInboxEntity._queue;
|
||||||
|
else if (item.match(QueueInboxEntity._applicationId)) return QueueInboxEntity._applicationId;
|
||||||
|
else if (item.match(QueueInboxEntity._createdAt)) return QueueInboxEntity._createdAt;
|
||||||
|
else if (item.match(QueueInboxEntity._updatedAt)) return QueueInboxEntity._updatedAt;
|
||||||
|
else return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,236 @@
|
||||||
|
package eu.eudat.query;
|
||||||
|
|
||||||
|
|
||||||
|
import eu.eudat.commons.enums.IsActive;
|
||||||
|
import eu.eudat.data.QueueOutboxEntity;
|
||||||
|
import gr.cite.queueoutbox.entity.QueueOutboxNotifyStatus;
|
||||||
|
import gr.cite.tools.data.query.FieldResolver;
|
||||||
|
import gr.cite.tools.data.query.Ordering;
|
||||||
|
import gr.cite.tools.data.query.QueryBase;
|
||||||
|
import gr.cite.tools.data.query.QueryContext;
|
||||||
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
|
import org.springframework.context.annotation.Scope;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import jakarta.persistence.Tuple;
|
||||||
|
import jakarta.persistence.criteria.CriteriaBuilder;
|
||||||
|
import jakarta.persistence.criteria.Predicate;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
|
public class QueueOutboxQuery extends QueryBase<QueueOutboxEntity> {
|
||||||
|
|
||||||
|
private Collection<UUID> ids;
|
||||||
|
private Instant createdAfter;
|
||||||
|
private Collection<IsActive> isActives;
|
||||||
|
private Collection<String> exchanges;
|
||||||
|
private Collection<String> routes;
|
||||||
|
private Collection<QueueOutboxNotifyStatus> notifyStatus;
|
||||||
|
private Integer retryThreshold;
|
||||||
|
private Integer confirmTimeout;
|
||||||
|
|
||||||
|
public QueueOutboxQuery ids(UUID value) {
|
||||||
|
this.ids = List.of(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery ids(UUID... value) {
|
||||||
|
this.ids = Arrays.asList(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery ids(List<UUID> value) {
|
||||||
|
this.ids = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery isActives(IsActive value) {
|
||||||
|
this.isActives = List.of(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery isActives(IsActive... value) {
|
||||||
|
this.isActives = Arrays.asList(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery isActives(List<IsActive> value) {
|
||||||
|
this.isActives = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery exchanges(String value) {
|
||||||
|
this.exchanges = List.of(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery exchanges(String... value) {
|
||||||
|
this.exchanges = Arrays.asList(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery exchanges(List<String> value) {
|
||||||
|
this.exchanges = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery routes(String value) {
|
||||||
|
this.routes = List.of(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery routes(String... value) {
|
||||||
|
this.routes = Arrays.asList(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery routes(List<String> value) {
|
||||||
|
this.routes = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery notifyStatus(QueueOutboxNotifyStatus value) {
|
||||||
|
this.notifyStatus = List.of(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery notifyStatus(QueueOutboxNotifyStatus... value) {
|
||||||
|
this.notifyStatus = Arrays.asList(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery notifyStatus(List<QueueOutboxNotifyStatus> value) {
|
||||||
|
this.notifyStatus = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery createdAfter(Instant value) {
|
||||||
|
this.createdAfter = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery retryThreshold(Integer value) {
|
||||||
|
this.retryThreshold = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery confirmTimeout(Integer value) {
|
||||||
|
this.confirmTimeout = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueOutboxQuery ordering(Ordering ordering) {
|
||||||
|
this.setOrder(ordering);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Class<QueueOutboxEntity> entityClass() {
|
||||||
|
return QueueOutboxEntity.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Boolean isFalseQuery() {
|
||||||
|
return this.isEmpty(this.ids) || this.isEmpty(this.isActives) || this.isEmpty(this.exchanges)
|
||||||
|
|| this.isEmpty(this.routes) || this.isEmpty(this.notifyStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected <X, Y> Predicate applyFilters(QueryContext<X, Y> queryContext) {
|
||||||
|
List<Predicate> predicates = new ArrayList<>();
|
||||||
|
if (this.ids != null) {
|
||||||
|
CriteriaBuilder.In<UUID> inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueOutboxEntity._id));
|
||||||
|
for (UUID item : this.ids) inClause.value(item);
|
||||||
|
predicates.add(inClause);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.createdAfter != null) {
|
||||||
|
predicates.add(queryContext.CriteriaBuilder.greaterThan(queryContext.Root.get(QueueOutboxEntity._createdAt), this.createdAfter));
|
||||||
|
}
|
||||||
|
if (this.isActives != null) {
|
||||||
|
CriteriaBuilder.In<IsActive> inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueOutboxEntity._isActive));
|
||||||
|
for (IsActive item : this.isActives) inClause.value(item);
|
||||||
|
predicates.add(inClause);
|
||||||
|
}
|
||||||
|
if (this.exchanges != null) {
|
||||||
|
CriteriaBuilder.In<String> inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueOutboxEntity._exchange));
|
||||||
|
for (String item : this.exchanges) inClause.value(item);
|
||||||
|
predicates.add(inClause);
|
||||||
|
}
|
||||||
|
if (this.routes != null) {
|
||||||
|
CriteriaBuilder.In<String> inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueOutboxEntity._route));
|
||||||
|
for (String item : this.routes) inClause.value(item);
|
||||||
|
predicates.add(inClause);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.notifyStatus != null) {
|
||||||
|
CriteriaBuilder.In<QueueOutboxNotifyStatus> inClause = queryContext.CriteriaBuilder.in(queryContext.Root.get(QueueOutboxEntity._notifyStatus));
|
||||||
|
for (QueueOutboxNotifyStatus item : this.notifyStatus) inClause.value(item);
|
||||||
|
predicates.add(inClause);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.retryThreshold != null) {
|
||||||
|
predicates.add(queryContext.CriteriaBuilder.or(queryContext.CriteriaBuilder.isNull(queryContext.Root.get(QueueOutboxEntity._retryCount)),
|
||||||
|
queryContext.CriteriaBuilder.lessThanOrEqualTo(queryContext.Root.get(QueueOutboxEntity._retryCount), this.retryThreshold)));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.confirmTimeout != null) {
|
||||||
|
predicates.add(queryContext.CriteriaBuilder.or(queryContext.CriteriaBuilder.isNull(queryContext.Root.get(QueueOutboxEntity._publishedAt)),
|
||||||
|
queryContext.CriteriaBuilder.and(
|
||||||
|
queryContext.CriteriaBuilder.isNotNull(queryContext.Root.get(QueueOutboxEntity._publishedAt)),
|
||||||
|
queryContext.CriteriaBuilder.isNull(queryContext.Root.get(QueueOutboxEntity._confirmedAt)),
|
||||||
|
queryContext.CriteriaBuilder.lessThan(queryContext.Root.get(QueueOutboxEntity._publishedAt), Instant.now().minusSeconds(this.confirmTimeout))
|
||||||
|
)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (predicates.size() > 0) {
|
||||||
|
Predicate[] predicatesArray = predicates.toArray(new Predicate[0]);
|
||||||
|
return queryContext.CriteriaBuilder.and(predicatesArray);
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected QueueOutboxEntity convert(Tuple tuple, Set<String> columns) {
|
||||||
|
QueueOutboxEntity item = new QueueOutboxEntity();
|
||||||
|
item.setId(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._id, UUID.class));
|
||||||
|
item.setExchange(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._exchange, String.class));
|
||||||
|
item.setTenantId(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._tenantId, UUID.class));
|
||||||
|
item.setRoute(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._route, String.class));
|
||||||
|
item.setMessage(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._message, String.class));
|
||||||
|
item.setMessageId(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._messageId, UUID.class));
|
||||||
|
item.setCreatedAt(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._createdAt, Instant.class));
|
||||||
|
item.setIsActive(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._isActive, IsActive.class));
|
||||||
|
item.setNotifyStatus(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._notifyStatus, QueueOutboxNotifyStatus.class));
|
||||||
|
item.setRetryCount(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._retryCount, Integer.class));
|
||||||
|
item.setPublishedAt(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._publishedAt, Instant.class));
|
||||||
|
item.setConfirmedAt(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._confirmedAt, Instant.class));
|
||||||
|
item.setCreatedAt(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._createdAt, Instant.class));
|
||||||
|
item.setUpdatedAt(QueryBase.convertSafe(tuple, columns, QueueOutboxEntity._updatedAt, Instant.class));
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String fieldNameOf(FieldResolver item) {
|
||||||
|
if (item.match(QueueOutboxEntity._id)) return QueueOutboxEntity._id;
|
||||||
|
else if (item.match(QueueOutboxEntity._exchange)) return QueueOutboxEntity._exchange;
|
||||||
|
else if (item.match(QueueOutboxEntity._tenantId)) return QueueOutboxEntity._tenantId;
|
||||||
|
else if (item.match(QueueOutboxEntity._route)) return QueueOutboxEntity._route;
|
||||||
|
else if (item.match(QueueOutboxEntity._message)) return QueueOutboxEntity._message;
|
||||||
|
else if (item.match(QueueOutboxEntity._messageId)) return QueueOutboxEntity._messageId;
|
||||||
|
else if (item.match(QueueOutboxEntity._createdAt)) return QueueOutboxEntity._createdAt;
|
||||||
|
else if (item.match(QueueOutboxEntity._isActive)) return QueueOutboxEntity._isActive;
|
||||||
|
else if (item.match(QueueOutboxEntity._notifyStatus)) return QueueOutboxEntity._notifyStatus;
|
||||||
|
else if (item.match(QueueOutboxEntity._retryCount)) return QueueOutboxEntity._retryCount;
|
||||||
|
else if (item.match(QueueOutboxEntity._publishedAt)) return QueueOutboxEntity._publishedAt;
|
||||||
|
else if (item.match(QueueOutboxEntity._confirmedAt)) return QueueOutboxEntity._confirmedAt;
|
||||||
|
else if (item.match(QueueOutboxEntity._createdAt)) return QueueOutboxEntity._createdAt;
|
||||||
|
else if (item.match(QueueOutboxEntity._updatedAt)) return QueueOutboxEntity._updatedAt;
|
||||||
|
else return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -355,6 +355,23 @@
|
||||||
<version>1.0.0</version>
|
<version>1.0.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>gr.cite</groupId>
|
||||||
|
<artifactId>queue-inbox</artifactId>
|
||||||
|
<version>1.0.0</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>gr.cite</groupId>
|
||||||
|
<artifactId>queue-outbox</artifactId>
|
||||||
|
<version>1.0.0</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>gr.cite</groupId>
|
||||||
|
<artifactId>rabbitmq-core</artifactId>
|
||||||
|
<version>1.0.0</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<profiles>
|
<profiles>
|
||||||
|
|
|
@ -25,5 +25,6 @@ spring:
|
||||||
optional:classpath:config/errors.yml[.yml], optional:classpath:config/errors-${spring.profiles.active}.yml[.yml], optional:file:../config/errors-${spring.profiles.active}.yml[.yml],
|
optional:classpath:config/errors.yml[.yml], optional:classpath:config/errors-${spring.profiles.active}.yml[.yml], optional:file:../config/errors-${spring.profiles.active}.yml[.yml],
|
||||||
optional:classpath:config/storage.yml[.yml], optional:classpath:config/storage-${spring.profiles.active}.yml[.yml], optional:file:../config/storage-${spring.profiles.active}.yml[.yml],
|
optional:classpath:config/storage.yml[.yml], optional:classpath:config/storage-${spring.profiles.active}.yml[.yml], optional:file:../config/storage-${spring.profiles.active}.yml[.yml],
|
||||||
optional:classpath:config/reference-type.yml[.yml], optional:classpath:config/reference-type-${spring.profiles.active}.yml[.yml], optional:file:../config/reference-type-${spring.profiles.active}.yml[.yml],
|
optional:classpath:config/reference-type.yml[.yml], optional:classpath:config/reference-type-${spring.profiles.active}.yml[.yml], optional:file:../config/reference-type-${spring.profiles.active}.yml[.yml],
|
||||||
optional:classpath:config/tenant.yml[.yml], optional:classpath:config/tenant-${spring.profiles.active}.yml[.yml], optional:file:../config/tenant-${spring.profiles.active}.yml[.yml]
|
optional:classpath:config/tenant.yml[.yml], optional:classpath:config/tenant-${spring.profiles.active}.yml[.yml], optional:file:../config/tenant-${spring.profiles.active}.yml[.yml],
|
||||||
|
optional:classpath:config/queue.yml[.yml], optional:classpath:config/queue-${spring.profiles.active}.yml[.yml], optional:file:../config/queue-${spring.profiles.active}.yml[.yml]
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
queue:
|
||||||
|
rabbitmq:
|
||||||
|
durable: true
|
||||||
|
queue: cite_dmp_devel_web_inbox_queue
|
||||||
|
exchange: cite_dmp_devel_queue
|
||||||
|
listenerEnabled: true
|
||||||
|
publisherEnabled: true
|
||||||
|
task:
|
||||||
|
publisher:
|
||||||
|
options:
|
||||||
|
exchange: cite_dmp_devel_queue
|
||||||
|
rabbitmq:
|
||||||
|
enable: true
|
||||||
|
listener:
|
||||||
|
options:
|
||||||
|
exchange: cite_dmp_devel_queue
|
||||||
|
rabbitmq:
|
||||||
|
enable: true
|
|
@ -0,0 +1,58 @@
|
||||||
|
spring:
|
||||||
|
rabbitmq:
|
||||||
|
host: ${RABBIT_HOST}
|
||||||
|
port: ${RABBIT_PORT}
|
||||||
|
username: ${RABBIT_USER}
|
||||||
|
password: ${RABBIT_PASS}
|
||||||
|
ssl:
|
||||||
|
enabled: false
|
||||||
|
queue:
|
||||||
|
rabbitmq:
|
||||||
|
enable: true
|
||||||
|
app-id: ${THE_API_ID}
|
||||||
|
durable: null
|
||||||
|
queue: null
|
||||||
|
exchange: null
|
||||||
|
listenerEnabled: true
|
||||||
|
publisherEnabled: true
|
||||||
|
#TODO
|
||||||
|
connection-recovery:
|
||||||
|
enable: true
|
||||||
|
network-recovery-interval: 5000
|
||||||
|
unreachable-recovery-interval: 5000
|
||||||
|
task:
|
||||||
|
publisher:
|
||||||
|
enable: true
|
||||||
|
options:
|
||||||
|
exchange: null
|
||||||
|
forget-me-completed-topic: forgetme.completed
|
||||||
|
notify-topic: notification.notify
|
||||||
|
tenant-reactivation-topic: tenant.reactivated
|
||||||
|
tenant-removal-topic: tenant.remove
|
||||||
|
tenant-touch-topic: tenant.touch
|
||||||
|
tenant-user-invite-topic: tenant.invite
|
||||||
|
what-you-know-about-me-completed-topic: whatyouknowaboutme.completed
|
||||||
|
generate-file-topic: generate.file
|
||||||
|
rabbitmq:
|
||||||
|
enable: true
|
||||||
|
interval-seconds: 30
|
||||||
|
options:
|
||||||
|
retry-threashold: 100
|
||||||
|
retry-delay-step-seconds: 300
|
||||||
|
max-retry-delay-seconds: 10800
|
||||||
|
too-old-to-send-seconds: 604800
|
||||||
|
confirm-timeout-seconds: 30
|
||||||
|
listener:
|
||||||
|
enable: true
|
||||||
|
options:
|
||||||
|
exchange: null
|
||||||
|
user-removal-topic: [ "user.remove" ]
|
||||||
|
user-touched-topic: [ "user.touch" ]
|
||||||
|
rabbitmq:
|
||||||
|
enable: true
|
||||||
|
interval-seconds: 30
|
||||||
|
options:
|
||||||
|
retry-threashold: 100
|
||||||
|
retry-delay-step-seconds: 300
|
||||||
|
max-retry-delay-seconds: 10800
|
||||||
|
too-old-to-send-seconds: 604800
|
Loading…
Reference in New Issue