Adding outbox events for users on the main app

This commit is contained in:
Thomas Georgios Giannos 2024-01-26 17:42:45 +02:00
parent f58832e3e9
commit a55d7c3692
16 changed files with 792 additions and 397 deletions

View File

@ -7,26 +7,27 @@ import eu.eudat.data.converters.enums.DatabaseEnum;
import java.util.Map; import java.util.Map;
public enum NotificationContactType implements DatabaseEnum<Short> { public enum NotificationContactType implements DatabaseEnum<Short> {
EMAIL((short)0),
SLACK_BROADCAST((short)1),
SMS((short)2),
IN_APP((short)3);
private final Short value; EMAIL((short) 0),
SLACK_BROADCAST((short) 1),
SMS((short) 2),
IN_APP((short) 3);
NotificationContactType(Short value) { private final Short value;
this.value = value;
}
@JsonValue NotificationContactType(Short value) {
public Short getValue() { this.value = value;
return value; }
}
private static final Map<Short, NotificationContactType> map = EnumUtils.getEnumValueMap(NotificationContactType.class); @JsonValue
public Short getValue() {
return value;
}
public static NotificationContactType of(Short i) { private static final Map<Short, NotificationContactType> map = EnumUtils.getEnumValueMap(NotificationContactType.class);
return map.get(i);
} public static NotificationContactType of(Short i) {
return map.get(i);
}
} }

View File

@ -0,0 +1,30 @@
package eu.eudat.commons.enums.notification;
import com.fasterxml.jackson.annotation.JsonValue;
import eu.eudat.commons.enums.EnumUtils;
import eu.eudat.data.converters.enums.DatabaseEnum;
import java.util.Map;
public enum TentativeUserProfile implements DatabaseEnum<Short> {
Complete((short) 0), Tentative((short) 1);
private final Short value;
TentativeUserProfile(Short value) {
this.value = value;
}
@JsonValue
public Short getValue() {
return value;
}
private static final Map<Short, TentativeUserProfile> map = EnumUtils.getEnumValueMap(TentativeUserProfile.class);
public static TentativeUserProfile of(Short i) {
return map.get(i);
}
}

View File

@ -0,0 +1,30 @@
package eu.eudat.commons.enums.notification;
import com.fasterxml.jackson.annotation.JsonValue;
import eu.eudat.commons.enums.EnumUtils;
import eu.eudat.data.converters.enums.DatabaseEnum;
import java.util.Map;
public enum UserType implements DatabaseEnum<Short> {
Person((short) 0), Service((short) 1);
private final Short value;
UserType(Short value) {
this.value = value;
}
@JsonValue
public Short getValue() {
return value;
}
private static final Map<Short, UserType> map = EnumUtils.getEnumValueMap(UserType.class);
public static UserType of(Short i) {
return map.get(i);
}
}

View File

@ -4,24 +4,36 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import eu.eudat.integrationevent.TrackedEvent; import eu.eudat.integrationevent.TrackedEvent;
import gr.cite.rabbitmq.IntegrationEvent; import gr.cite.rabbitmq.IntegrationEvent;
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
public class OutboxIntegrationEvent extends IntegrationEvent { public class OutboxIntegrationEvent extends IntegrationEvent {
public static final String FORGET_ME_COMPLETED = "FORGET_ME_COMPLETED";
public static final String NOTIFY = "NOTIFY";
public static final String TENANT_REACTIVATE = "TENANT_REACTIVATE";
public static final String TENANT_REMOVE = "TENANT_REMOVE";
public static final String TENANT_TOUCH = "TENANT_TOUCH";
public static final String TENANT_USER_INVITE = "TENANT_USER_INVITE";
public static final String WHAT_YOU_KNOW_ABOUT_ME_COMPLETED = "WHAT_YOU_KNOW_ABOUT_ME_COMPLETED";
public static final String GENERATE_FILE = "GENERATE_FILE";
private TrackedEvent event;
public TrackedEvent getEvent() { public static final String FORGET_ME_COMPLETED = "FORGET_ME_COMPLETED";
return event;
}
public void setEvent(TrackedEvent event) { public static final String NOTIFY = "NOTIFY";
this.event = event;
} public static final String TENANT_REACTIVATE = "TENANT_REACTIVATE";
public static final String TENANT_REMOVE = "TENANT_REMOVE";
public static final String TENANT_TOUCH = "TENANT_TOUCH";
public static final String TENANT_USER_INVITE = "TENANT_USER_INVITE";
public static final String USER_TOUCH = "USER_TOUCH";
public static final String USER_REMOVE = "USER_REMOVE";
public static final String WHAT_YOU_KNOW_ABOUT_ME_COMPLETED = "WHAT_YOU_KNOW_ABOUT_ME_COMPLETED";
public static final String GENERATE_FILE = "GENERATE_FILE";
private TrackedEvent event;
public TrackedEvent getEvent() {
return event;
}
public void setEvent(TrackedEvent event) {
this.event = event;
}
} }

View File

@ -1,51 +1,52 @@
package eu.eudat.integrationevent.outbox; package eu.eudat.integrationevent.outbox;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.bind.ConstructorBinding;
import org.springframework.validation.annotation.Validated;
import jakarta.validation.constraints.NotNull;
@Validated
@ConfigurationProperties(prefix = "queue.task.publisher.options") @ConfigurationProperties(prefix = "queue.task.publisher.options")
//@ConstructorBinding
public class OutboxProperties { public class OutboxProperties {
@NotNull
private final String exchange; private final String exchange;
@NotNull
private final String tenantTouchTopic; private final String tenantTouchTopic;
@NotNull
private final String tenantRemovalTopic; private final String tenantRemovalTopic;
@NotNull
private final String tenantReactivationTopic; private final String tenantReactivationTopic;
@NotNull
private final String tenantUserInviteTopic; private final String tenantUserInviteTopic;
@NotNull
private final String userRemovalTopic;
private final String userTouchTopic;
private final String notifyTopic; private final String notifyTopic;
@NotNull
private final String forgetMeCompletedTopic; private final String forgetMeCompletedTopic;
@NotNull
private final String whatYouKnowAboutMeCompletedTopic; private final String whatYouKnowAboutMeCompletedTopic;
@NotNull
private final String generateFileTopic; private final String generateFileTopic;
public OutboxProperties(String exchange, public OutboxProperties(String exchange,
String tenantTouchTopic, String tenantTouchTopic,
String tenantRemovalTopic, String tenantRemovalTopic,
String tenantReactivationTopic, String tenantReactivationTopic,
String tenantUserInviteTopic, String tenantUserInviteTopic,
String notifyTopic, String userRemovalTopic,
String forgetMeCompletedTopic, String userTouchTopic,
String whatYouKnowAboutMeCompletedTopic, String notifyTopic,
String generateFileTopic String forgetMeCompletedTopic,
String whatYouKnowAboutMeCompletedTopic,
String generateFileTopic
) { ) {
this.exchange = exchange; this.exchange = exchange;
this.tenantTouchTopic = tenantTouchTopic; this.tenantTouchTopic = tenantTouchTopic;
this.tenantRemovalTopic = tenantRemovalTopic; this.tenantRemovalTopic = tenantRemovalTopic;
this.tenantReactivationTopic = tenantReactivationTopic; this.tenantReactivationTopic = tenantReactivationTopic;
this.tenantUserInviteTopic = tenantUserInviteTopic; this.tenantUserInviteTopic = tenantUserInviteTopic;
this.notifyTopic = notifyTopic; this.userRemovalTopic = userRemovalTopic;
this.userTouchTopic = userTouchTopic;
this.notifyTopic = notifyTopic;
this.forgetMeCompletedTopic = forgetMeCompletedTopic; this.forgetMeCompletedTopic = forgetMeCompletedTopic;
this.whatYouKnowAboutMeCompletedTopic = whatYouKnowAboutMeCompletedTopic; this.whatYouKnowAboutMeCompletedTopic = whatYouKnowAboutMeCompletedTopic;
this.generateFileTopic = generateFileTopic; this.generateFileTopic = generateFileTopic;
@ -71,6 +72,14 @@ public class OutboxProperties {
return tenantUserInviteTopic; return tenantUserInviteTopic;
} }
public String getUserRemovalTopic() {
return userRemovalTopic;
}
public String getUserTouchTopic() {
return userTouchTopic;
}
public String getNotifyTopic() { public String getNotifyTopic() {
return notifyTopic; return notifyTopic;
} }

View File

@ -14,415 +14,440 @@ import gr.cite.rabbitmq.IntegrationEvent;
import gr.cite.tools.data.query.Ordering; import gr.cite.tools.data.query.Ordering;
import gr.cite.tools.data.query.QueryFactory; import gr.cite.tools.data.query.QueryFactory;
import gr.cite.tools.logging.LoggerService; import gr.cite.tools.logging.LoggerService;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import jakarta.persistence.EntityManager; import jakarta.persistence.EntityManager;
import jakarta.persistence.EntityManagerFactory; import jakarta.persistence.EntityManagerFactory;
import jakarta.persistence.EntityTransaction; import jakarta.persistence.EntityTransaction;
import jakarta.persistence.OptimisticLockException; import jakarta.persistence.OptimisticLockException;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import java.time.Instant; import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.Random;
import java.util.UUID; import java.util.UUID;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class OutboxRepositoryImpl implements OutboxRepository { public class OutboxRepositoryImpl implements OutboxRepository {
protected final ApplicationContext applicationContext; protected final ApplicationContext applicationContext;
private final Random random = new Random();
private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(OutboxRepositoryImpl.class));
private final JsonHandlingService jsonHandlingService;
private final OutboxProperties outboxProperties;
public OutboxRepositoryImpl( private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(OutboxRepositoryImpl.class));
ApplicationContext applicationContext,
OutboxProperties outboxProperties
) {
this.applicationContext = applicationContext;
this.jsonHandlingService = this.applicationContext.getBean(JsonHandlingService.class);
this.outboxProperties = outboxProperties;
}
@Override private final JsonHandlingService jsonHandlingService;
public CandidateInfo candidate(Instant lastCandidateCreationTimestamp, MessageOptions messageOptions, Function<QueueOutbox, Boolean> onConfirmTimeout) {
EntityTransaction transaction = null;
EntityManager entityManager = null;
CandidateInfo candidate = null;
try (FakeRequestScope ignored = new FakeRequestScope()) {
try {
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
entityManager = entityManagerFactory.createEntityManager();
transaction = entityManager.getTransaction(); private final OutboxProperties outboxProperties;
transaction.begin();
QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class) public OutboxRepositoryImpl(
.isActives(IsActive.Active) ApplicationContext applicationContext,
.notifyStatus(QueueOutboxNotifyStatus.PENDING, QueueOutboxNotifyStatus.WAITING_CONFIRMATION, QueueOutboxNotifyStatus.ERROR) OutboxProperties outboxProperties
.retryThreshold(messageOptions.getRetryThreashold()) ) {
.confirmTimeout(messageOptions.getConfirmTimeoutSeconds()) this.applicationContext = applicationContext;
.createdAfter(lastCandidateCreationTimestamp) this.jsonHandlingService = this.applicationContext.getBean(JsonHandlingService.class);
.ordering(new Ordering().addAscending(QueueOutboxEntity._createdAt)) this.outboxProperties = outboxProperties;
.first(); }
if (item != null) { @Override
boolean confirmTimeout = onConfirmTimeout.apply(item); public CandidateInfo candidate(Instant lastCandidateCreationTimestamp, MessageOptions messageOptions, Function<QueueOutbox, Boolean> onConfirmTimeout) {
EntityTransaction transaction = null;
EntityManager entityManager = null;
CandidateInfo candidate = null;
try (FakeRequestScope ignored = new FakeRequestScope()) {
try {
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
entityManager = entityManagerFactory.createEntityManager();
QueueOutboxNotifyStatus prevState = item.getNotifyStatus(); transaction = entityManager.getTransaction();
item.setNotifyStatus(QueueOutboxNotifyStatus.PROCESSING); transaction.begin();
entityManager.merge(item); QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class)
entityManager.flush(); .isActives(IsActive.Active)
.notifyStatus(QueueOutboxNotifyStatus.PENDING, QueueOutboxNotifyStatus.WAITING_CONFIRMATION, QueueOutboxNotifyStatus.ERROR)
.retryThreshold(messageOptions.getRetryThreashold())
.confirmTimeout(messageOptions.getConfirmTimeoutSeconds())
.createdAfter(lastCandidateCreationTimestamp)
.ordering(new Ordering().addAscending(QueueOutboxEntity._createdAt))
.first();
candidate = new CandidateInfo(); if (item != null) {
candidate.setId(item.getId()); boolean confirmTimeout = onConfirmTimeout.apply(item);
candidate.setCreatedAt(item.getCreatedAt());
candidate.setPreviousState(prevState);
}
transaction.commit(); QueueOutboxNotifyStatus prevState = item.getNotifyStatus();
} catch (OptimisticLockException ex) { item.setNotifyStatus(QueueOutboxNotifyStatus.PROCESSING);
// we get this if/when someone else already modified the notifications. We want to essentially ignore this, and keep working
logger.debug("Concurrency exception getting queue outbox. Skipping: {} ", ex.getMessage());
if (transaction != null) transaction.rollback();
candidate = null;
} catch (Exception ex) {
logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex);
if (transaction != null) transaction.rollback();
candidate = null;
} finally {
if (entityManager != null) entityManager.close();
}
} catch (Exception ex) {
logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex);
}
return candidate; entityManager.merge(item);
} entityManager.flush();
@Override candidate = new CandidateInfo();
public Boolean shouldOmit(CandidateInfo candidate, Function<QueueOutbox, Boolean> shouldOmit) { candidate.setId(item.getId());
EntityTransaction transaction = null; candidate.setCreatedAt(item.getCreatedAt());
EntityManager entityManager = null; candidate.setPreviousState(prevState);
boolean success = false; }
try (FakeRequestScope ignored = new FakeRequestScope()) { transaction.commit();
try { } catch (OptimisticLockException ex) {
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); // we get this if/when someone else already modified the notifications. We want to essentially ignore this, and keep working
logger.debug("Concurrency exception getting queue outbox. Skipping: {} ", ex.getMessage());
if (transaction != null)
transaction.rollback();
candidate = null;
} catch (Exception ex) {
logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex);
if (transaction != null)
transaction.rollback();
candidate = null;
} finally {
if (entityManager != null)
entityManager.close();
}
} catch (Exception ex) {
logger.error("Problem getting list of queue outbox. Skipping: {}", ex.getMessage(), ex);
}
entityManager = entityManagerFactory.createEntityManager(); return candidate;
transaction = entityManager.getTransaction(); }
transaction.begin(); @Override
public Boolean shouldOmit(CandidateInfo candidate, Function<QueueOutbox, Boolean> shouldOmit) {
EntityTransaction transaction = null;
EntityManager entityManager = null;
boolean success = false;
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); try (FakeRequestScope ignored = new FakeRequestScope()) {
QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidate.getId()).first(); try {
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
if (item == null) { entityManager = entityManagerFactory.createEntityManager();
logger.warn("Could not lookup queue outbox {} to process. Continuing...", candidate.getId()); transaction = entityManager.getTransaction();
} else {
if (shouldOmit.apply(item)) {
item.setNotifyStatus(QueueOutboxNotifyStatus.OMITTED);
entityManager.merge(item); transaction.begin();
entityManager.flush();
success = true;
}
}
transaction.commit(); QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
} catch (Exception ex) { QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidate.getId()).first();
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null) transaction.rollback();
success = false;
} finally {
if (entityManager != null) entityManager.close();
}
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
}
return success;
}
@Override if (item == null) {
public Boolean shouldWait(CandidateInfo candidate, Function<QueueOutbox, Boolean> itIsTimeFunc) { logger.warn("Could not lookup queue outbox {} to process. Continuing...", candidate.getId());
EntityTransaction transaction = null; } else {
EntityManager entityManager = null; if (shouldOmit.apply(item)) {
boolean success = false; item.setNotifyStatus(QueueOutboxNotifyStatus.OMITTED);
try (FakeRequestScope ignored = new FakeRequestScope()) { entityManager.merge(item);
try { entityManager.flush();
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); success = true;
}
}
entityManager = entityManagerFactory.createEntityManager(); transaction.commit();
transaction = entityManager.getTransaction(); } catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null)
transaction.rollback();
success = false;
} finally {
if (entityManager != null)
entityManager.close();
}
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
}
return success;
}
transaction.begin(); @Override
public Boolean shouldWait(CandidateInfo candidate, Function<QueueOutbox, Boolean> itIsTimeFunc) {
EntityTransaction transaction = null;
EntityManager entityManager = null;
boolean success = false;
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); try (FakeRequestScope ignored = new FakeRequestScope()) {
QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidate.getId()).first(); try {
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
if (item.getRetryCount() != null && item.getRetryCount() >= 1) { entityManager = entityManagerFactory.createEntityManager();
Boolean itIsTime = itIsTimeFunc.apply(item); transaction = entityManager.getTransaction();
if (!itIsTime) { transaction.begin();
item.setNotifyStatus(candidate.getPreviousState());
entityManager.merge(item); QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
entityManager.flush(); QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidate.getId()).first();
success = true;
}
success = !itIsTime; if (item.getRetryCount() != null && item.getRetryCount() >= 1) {
} Boolean itIsTime = itIsTimeFunc.apply(item);
transaction.commit();
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null) transaction.rollback();
success = false;
} finally {
if (entityManager != null) entityManager.close();
}
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
}
return success;
}
@Override if (!itIsTime) {
public Boolean process(CandidateInfo candidateInfo, Boolean isAutoconfirmOnPublish, Function<QueueOutbox, Boolean> publish) { item.setNotifyStatus(candidate.getPreviousState());
EntityTransaction transaction = null;
EntityManager entityManager = null;
Boolean success = false;
try (FakeRequestScope ignored = new FakeRequestScope()) { entityManager.merge(item);
try { entityManager.flush();
success = true;
}
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); success = !itIsTime;
}
transaction.commit();
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null)
transaction.rollback();
success = false;
} finally {
if (entityManager != null)
entityManager.close();
}
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
}
return success;
}
entityManager = entityManagerFactory.createEntityManager(); @Override
transaction = entityManager.getTransaction(); public Boolean process(CandidateInfo candidateInfo, Boolean isAutoconfirmOnPublish, Function<QueueOutbox, Boolean> publish) {
transaction.begin(); EntityTransaction transaction = null;
EntityManager entityManager = null;
Boolean success = false;
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); try (FakeRequestScope ignored = new FakeRequestScope()) {
QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidateInfo.getId()).first(); try {
if (item == null) { EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
logger.warn("Could not lookup queue outbox {} to process. Continuing...", candidateInfo.getId());
} else {
success = publish.apply(item); entityManager = entityManagerFactory.createEntityManager();
if (success) { transaction = entityManager.getTransaction();
if (isAutoconfirmOnPublish) { transaction.begin();
item.setNotifyStatus(QueueOutboxNotifyStatus.CONFIRMED);
item.setConfirmedAt(Instant.now());
} else {
item.setNotifyStatus(QueueOutboxNotifyStatus.WAITING_CONFIRMATION);
}
item.setPublishedAt(Instant.now());
} else {
item.setNotifyStatus(QueueOutboxNotifyStatus.ERROR);
item.setRetryCount(item.getRetryCount() != null ? item.getRetryCount() + 1 : 0);
item.setPublishedAt(null);
}
entityManager.merge(item); QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
entityManager.flush(); QueueOutboxEntity item = queryFactory.query(QueueOutboxQuery.class).ids(candidateInfo.getId()).first();
}
transaction.commit(); if (item == null) {
} catch (Exception ex) { logger.warn("Could not lookup queue outbox {} to process. Continuing...", candidateInfo.getId());
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); } else {
if (transaction != null) transaction.rollback();
success = false;
} finally {
if (entityManager != null) entityManager.close();
}
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
}
return success;
}
@Override success = publish.apply(item);
public void handleConfirm(List<UUID> confirmedMessages) { if (success) {
EntityTransaction transaction = null; if (isAutoconfirmOnPublish) {
EntityManager entityManager = null; item.setNotifyStatus(QueueOutboxNotifyStatus.CONFIRMED);
item.setConfirmedAt(Instant.now());
} else {
item.setNotifyStatus(QueueOutboxNotifyStatus.WAITING_CONFIRMATION);
}
item.setPublishedAt(Instant.now());
} else {
item.setNotifyStatus(QueueOutboxNotifyStatus.ERROR);
item.setRetryCount(item.getRetryCount() != null ? item.getRetryCount() + 1 : 0);
item.setPublishedAt(null);
}
try (FakeRequestScope ignored = new FakeRequestScope()) { entityManager.merge(item);
try { entityManager.flush();
}
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); transaction.commit();
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null)
transaction.rollback();
success = false;
} finally {
if (entityManager != null)
entityManager.close();
}
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
}
return success;
}
entityManager = entityManagerFactory.createEntityManager(); @Override
transaction = entityManager.getTransaction(); public void handleConfirm(List<UUID> confirmedMessages) {
transaction.begin(); EntityTransaction transaction = null;
EntityManager entityManager = null;
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); try (FakeRequestScope ignored = new FakeRequestScope()) {
List<QueueOutboxEntity> queueOutboxMessages = queryFactory.query(QueueOutboxQuery.class).ids(confirmedMessages).collect(); try {
if (queueOutboxMessages == null) { EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
logger.warn("Could not lookup messages {} to process. Continuing...", String.join(",", confirmedMessages.stream().map(x -> x.toString()).collect(Collectors.toList())));
} else {
for (QueueOutboxEntity queueOutboxMessage : queueOutboxMessages) { entityManager = entityManagerFactory.createEntityManager();
queueOutboxMessage.setNotifyStatus(QueueOutboxNotifyStatus.CONFIRMED); transaction = entityManager.getTransaction();
queueOutboxMessage.setConfirmedAt(Instant.now()); transaction.begin();
entityManager.merge(queueOutboxMessage);
}
entityManager.flush(); QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
} List<QueueOutboxEntity> queueOutboxMessages = queryFactory.query(QueueOutboxQuery.class).ids(confirmedMessages).collect();
transaction.commit(); if (queueOutboxMessages == null) {
} catch (Exception ex) { logger.warn("Could not lookup messages {} to process. Continuing...", String.join(",", confirmedMessages.stream().map(x -> x.toString()).collect(Collectors.toList())));
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); } else {
if (transaction != null) transaction.rollback();
} finally {
if (entityManager != null) entityManager.close();
}
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
}
}
@Override for (QueueOutboxEntity queueOutboxMessage : queueOutboxMessages) {
public void handleNack(List<UUID> nackedMessages) { queueOutboxMessage.setNotifyStatus(QueueOutboxNotifyStatus.CONFIRMED);
EntityTransaction transaction = null; queueOutboxMessage.setConfirmedAt(Instant.now());
EntityManager entityManager = null; entityManager.merge(queueOutboxMessage);
}
try (FakeRequestScope ignored = new FakeRequestScope()) { entityManager.flush();
try { }
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); transaction.commit();
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null)
transaction.rollback();
} finally {
if (entityManager != null)
entityManager.close();
}
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
}
}
entityManager = entityManagerFactory.createEntityManager(); @Override
transaction = entityManager.getTransaction(); public void handleNack(List<UUID> nackedMessages) {
transaction.begin(); EntityTransaction transaction = null;
EntityManager entityManager = null;
QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); try (FakeRequestScope ignored = new FakeRequestScope()) {
List<QueueOutboxEntity> queueOutboxMessages = queryFactory.query(QueueOutboxQuery.class).ids(nackedMessages).collect(); try {
if (queueOutboxMessages == null) { EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
logger.warn("Could not lookup messages {} to process. Continuing...", String.join(",", nackedMessages.stream().map(x -> x.toString()).collect(Collectors.toList())));
} else {
for (QueueOutboxEntity queueOutboxMessage : queueOutboxMessages) { entityManager = entityManagerFactory.createEntityManager();
queueOutboxMessage.setNotifyStatus(QueueOutboxNotifyStatus.ERROR); transaction = entityManager.getTransaction();
queueOutboxMessage.setRetryCount(queueOutboxMessage.getRetryCount() != null ? queueOutboxMessage.getRetryCount() + 1 : 0); transaction.begin();
entityManager.merge(queueOutboxMessage);
}
entityManager.flush(); QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class);
} List<QueueOutboxEntity> queueOutboxMessages = queryFactory.query(QueueOutboxQuery.class).ids(nackedMessages).collect();
transaction.commit(); if (queueOutboxMessages == null) {
} catch (Exception ex) { logger.warn("Could not lookup messages {} to process. Continuing...", String.join(",", nackedMessages.stream().map(x -> x.toString()).collect(Collectors.toList())));
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex); } else {
if (transaction != null) transaction.rollback();
} finally {
if (entityManager != null) entityManager.close();
}
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
}
}
@Override for (QueueOutboxEntity queueOutboxMessage : queueOutboxMessages) {
public QueueOutbox create(IntegrationEvent item) { queueOutboxMessage.setNotifyStatus(QueueOutboxNotifyStatus.ERROR);
EntityTransaction transaction = null; queueOutboxMessage.setRetryCount(queueOutboxMessage.getRetryCount() != null ? queueOutboxMessage.getRetryCount() + 1 : 0);
EntityManager entityManager = null; entityManager.merge(queueOutboxMessage);
boolean success = false; }
QueueOutboxEntity queueMessage = null;
try (FakeRequestScope ignored = new FakeRequestScope()) {
try {
queueMessage = this.mapEvent((OutboxIntegrationEvent) item);
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
entityManager = entityManagerFactory.createEntityManager(); entityManager.flush();
transaction = entityManager.getTransaction(); }
transaction.begin(); transaction.commit();
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null)
transaction.rollback();
} finally {
if (entityManager != null)
entityManager.close();
}
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
}
}
entityManager.persist(queueMessage); @Override
entityManager.flush(); public QueueOutbox create(IntegrationEvent item) {
EntityTransaction transaction = null;
EntityManager entityManager = null;
boolean success = false;
QueueOutboxEntity queueMessage = null;
try (FakeRequestScope ignored = new FakeRequestScope()) {
try {
queueMessage = this.mapEvent((OutboxIntegrationEvent) item);
EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class);
transaction.commit(); entityManager = entityManagerFactory.createEntityManager();
} catch (Exception ex) { transaction = entityManager.getTransaction();
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
if (transaction != null) transaction.rollback();
success = false;
} finally {
if (entityManager != null) entityManager.close();
}
} catch (Exception ex) {
logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
}
return queueMessage;
}
private QueueOutboxEntity mapEvent(OutboxIntegrationEvent event) { transaction.begin();
String routingKey;
switch (event.getType()) {
case OutboxIntegrationEvent.TENANT_REACTIVATE: {
routingKey = this.outboxProperties.getTenantReactivationTopic();
break;
}
case OutboxIntegrationEvent.TENANT_REMOVE: {
routingKey = this.outboxProperties.getTenantRemovalTopic();
break;
}
case OutboxIntegrationEvent.TENANT_TOUCH: {
routingKey = this.outboxProperties.getTenantTouchTopic();
break;
}
case OutboxIntegrationEvent.TENANT_USER_INVITE: {
routingKey = this.outboxProperties.getTenantUserInviteTopic();
break;
}
case OutboxIntegrationEvent.FORGET_ME_COMPLETED: {
routingKey = this.outboxProperties.getForgetMeCompletedTopic();
break;
}
case OutboxIntegrationEvent.NOTIFY: {
routingKey = this.outboxProperties.getNotifyTopic();
break;
}
case OutboxIntegrationEvent.WHAT_YOU_KNOW_ABOUT_ME_COMPLETED: {
routingKey = this.outboxProperties.getWhatYouKnowAboutMeCompletedTopic();
break;
}
case OutboxIntegrationEvent.GENERATE_FILE: {
routingKey = this.outboxProperties.getGenerateFileTopic();
break;
}
default: {
logger.error("unrecognized outgoing integration event {}. Skipping...", event.getType());
return null;
}
}
UUID correlationId = UUID.randomUUID(); entityManager.persist(queueMessage);
if (event.getEvent() != null) event.getEvent().setTrackingContextTag(correlationId.toString()); entityManager.flush();
event.setMessage(this.jsonHandlingService.toJsonSafe(event.getEvent()));
//this._logTrackingService.Trace(correlationId.ToString(), $"Correlating current tracking context with new correlationId {correlationId}");
QueueOutboxEntity queueMessage = new QueueOutboxEntity(); transaction.commit();
queueMessage.setId(UUID.randomUUID()); } catch (Exception ex) {
queueMessage.setTenantId(null); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
queueMessage.setExchange(this.outboxProperties.getExchange()); if (transaction != null)
queueMessage.setRoute(routingKey); transaction.rollback();
queueMessage.setMessageId(event.getMessageId()); success = false;
queueMessage.setMessage(this.jsonHandlingService.toJsonSafe(event)); } finally {
queueMessage.setIsActive(IsActive.Active); if (entityManager != null)
queueMessage.setNotifyStatus(QueueOutboxNotifyStatus.PENDING); entityManager.close();
queueMessage.setRetryCount(0); }
queueMessage.setCreatedAt(Instant.now()); } catch (Exception ex) {
queueMessage.setUpdatedAt(Instant.now()); logger.error("Problem executing purge. rolling back any db changes and marking error. Continuing...", ex);
}
return queueMessage;
}
return queueMessage; private QueueOutboxEntity mapEvent(OutboxIntegrationEvent event) {
} String routingKey;
switch (event.getType()) {
case OutboxIntegrationEvent.TENANT_REACTIVATE: {
routingKey = this.outboxProperties.getTenantReactivationTopic();
break;
}
case OutboxIntegrationEvent.TENANT_REMOVE: {
routingKey = this.outboxProperties.getTenantRemovalTopic();
break;
}
case OutboxIntegrationEvent.TENANT_TOUCH: {
routingKey = this.outboxProperties.getTenantTouchTopic();
break;
}
case OutboxIntegrationEvent.TENANT_USER_INVITE: {
routingKey = this.outboxProperties.getTenantUserInviteTopic();
break;
}
case OutboxIntegrationEvent.USER_REMOVE: {
routingKey = this.outboxProperties.getUserRemovalTopic();
break;
}
case OutboxIntegrationEvent.USER_TOUCH: {
routingKey = this.outboxProperties.getUserTouchTopic();
break;
}
case OutboxIntegrationEvent.FORGET_ME_COMPLETED: {
routingKey = this.outboxProperties.getForgetMeCompletedTopic();
break;
}
case OutboxIntegrationEvent.NOTIFY: {
routingKey = this.outboxProperties.getNotifyTopic();
break;
}
case OutboxIntegrationEvent.WHAT_YOU_KNOW_ABOUT_ME_COMPLETED: {
routingKey = this.outboxProperties.getWhatYouKnowAboutMeCompletedTopic();
break;
}
case OutboxIntegrationEvent.GENERATE_FILE: {
routingKey = this.outboxProperties.getGenerateFileTopic();
break;
}
default: {
logger.error("unrecognized outgoing integration event {}. Skipping...", event.getType());
return null;
}
}
UUID correlationId = UUID.randomUUID();
if (event.getEvent() != null)
event.getEvent().setTrackingContextTag(correlationId.toString());
event.setMessage(this.jsonHandlingService.toJsonSafe(event.getEvent()));
//this._logTrackingService.Trace(correlationId.ToString(), $"Correlating current tracking context with new correlationId {correlationId}");
QueueOutboxEntity queueMessage = new QueueOutboxEntity();
queueMessage.setId(UUID.randomUUID());
queueMessage.setTenantId(null);
queueMessage.setExchange(this.outboxProperties.getExchange());
queueMessage.setRoute(routingKey);
queueMessage.setMessageId(event.getMessageId());
queueMessage.setMessage(this.jsonHandlingService.toJsonSafe(event));
queueMessage.setIsActive(IsActive.Active);
queueMessage.setNotifyStatus(QueueOutboxNotifyStatus.PENDING);
queueMessage.setRetryCount(0);
queueMessage.setCreatedAt(Instant.now());
queueMessage.setUpdatedAt(Instant.now());
return queueMessage;
}
} }

View File

@ -32,12 +32,10 @@ public class OutboxServiceImpl implements OutboxService {
public void publish(OutboxIntegrationEvent event) { public void publish(OutboxIntegrationEvent event) {
try { try {
eventPublisher.publishEvent(event); eventPublisher.publishEvent(event);
return; } catch (Exception ex) {
} catch (Exception ex) {
logger.error(new MapLogEntry(String.format("Could not save message ", event.getMessage())).And("message", event.getMessage()).And("ex", ex)); logger.error(new MapLogEntry(String.format("Could not save message ", event.getMessage())).And("message", event.getMessage()).And("ex", ex));
//Still want to skip it from processing //Still want to skip it from processing
return; }
}
} }
} }

View File

@ -0,0 +1,25 @@
package eu.eudat.integrationevent.outbox.userremoval;
import eu.eudat.integrationevent.inbox.ConsistencyHandler;
import eu.eudat.query.UserQuery;
import gr.cite.tools.data.query.QueryFactory;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@Component
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class UserRemovalConsistencyHandler implements ConsistencyHandler<UserRemovalConsistencyPredicates> {
private final QueryFactory queryFactory;
public UserRemovalConsistencyHandler(QueryFactory queryFactory) {
this.queryFactory = queryFactory;
}
@Override
public Boolean isConsistent(UserRemovalConsistencyPredicates consistencyPredicates) {
long count = this.queryFactory.query(UserQuery.class).ids(consistencyPredicates.getUserId()).count();
return count != 0;
}
}

View File

@ -0,0 +1,21 @@
package eu.eudat.integrationevent.outbox.userremoval;
import eu.eudat.integrationevent.inbox.ConsistencyPredicates;
import java.util.UUID;
public class UserRemovalConsistencyPredicates implements ConsistencyPredicates {
public UserRemovalConsistencyPredicates(UUID userId) {
this.userId = userId;
}
private UUID userId;
public UUID getUserId() {
return userId;
}
public void setUserId(UUID userId) {
this.userId = userId;
}
}

View File

@ -0,0 +1,28 @@
package eu.eudat.integrationevent.outbox.userremoval;
import eu.eudat.integrationevent.TrackedEvent;
import java.util.UUID;
public class UserRemovalIntegrationEvent extends TrackedEvent {
private UUID userId;
private UUID tenant;
public UUID getUserId() {
return userId;
}
public void setUserId(UUID userId) {
this.userId = userId;
}
public UUID getTenant() {
return tenant;
}
public void setTenant(UUID tenant) {
this.tenant = tenant;
}
}

View File

@ -0,0 +1,7 @@
package eu.eudat.integrationevent.outbox.userremoval;
public interface UserRemovalIntegrationEventHandler {
void handle(UserRemovalIntegrationEvent event);
}

View File

@ -0,0 +1,40 @@
package eu.eudat.integrationevent.outbox.userremoval;
import eu.eudat.integrationevent.outbox.OutboxIntegrationEvent;
import eu.eudat.integrationevent.outbox.OutboxService;
import gr.cite.tools.logging.LoggerService;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class UserRemovalIntegrationEventHandlerImpl implements UserRemovalIntegrationEventHandler {
private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(UserRemovalIntegrationEventHandlerImpl.class));
private final OutboxService outboxService;
private final ApplicationContext applicationContext;
public UserRemovalIntegrationEventHandlerImpl(OutboxService outboxService, ApplicationContext applicationContext) {
this.outboxService = outboxService;
this.applicationContext = applicationContext;
}
@Override
public void handle(UserRemovalIntegrationEvent event) {
UserRemovalConsistencyHandler userRemovalConsistencyHandler = this.applicationContext.getBean(UserRemovalConsistencyHandler.class);
if (!userRemovalConsistencyHandler.isConsistent(new UserRemovalConsistencyPredicates(event.getUserId())))
return;
OutboxIntegrationEvent message = new OutboxIntegrationEvent();
message.setMessageId(UUID.randomUUID());
message.setType(OutboxIntegrationEvent.USER_REMOVE);
message.setEvent(event);
this.outboxService.publish(message);
}
}

View File

@ -0,0 +1,126 @@
package eu.eudat.integrationevent.outbox.usertouched;
import eu.eudat.commons.enums.notification.TentativeUserProfile;
import eu.eudat.commons.enums.notification.UserType;
import eu.eudat.integrationevent.TrackedEvent;
import eu.eudat.model.UserContactInfo;
import java.util.List;
import java.util.UUID;
public class UserTouchedIntegrationEvent extends TrackedEvent {
private UUID id;
private UUID tenant;
private String firstName;
private String lastName;
private UserType type;
private UserProfile profile;
private List<UserContactInfo> userContactInfo;
public UUID getId() {
return id;
}
public void setId(UUID id) {
this.id = id;
}
public UUID getTenant() {
return tenant;
}
public void setTenant(UUID tenant) {
this.tenant = tenant;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public UserType getType() {
return type;
}
public void setType(UserType type) {
this.type = type;
}
public UserProfile getProfile() {
return profile;
}
public void setProfile(UserProfile profile) {
this.profile = profile;
}
public List<UserContactInfo> getUserContactInfo() {
return userContactInfo;
}
public void setUserContactInfo(List<UserContactInfo> userContactInfo) {
this.userContactInfo = userContactInfo;
}
public static class UserProfile {
private TentativeUserProfile isTentative;
private String timezone;
private String culture;
private String language;
public TentativeUserProfile getIsTentative() {
return isTentative;
}
public void setIsTentative(TentativeUserProfile isTentative) {
this.isTentative = isTentative;
}
public String getTimezone() {
return timezone;
}
public void setTimezone(String timezone) {
this.timezone = timezone;
}
public String getCulture() {
return culture;
}
public void setCulture(String culture) {
this.culture = culture;
}
public String getLanguage() {
return language;
}
public void setLanguage(String language) {
this.language = language;
}
}
}

View File

@ -0,0 +1,7 @@
package eu.eudat.integrationevent.outbox.usertouched;
public interface UserTouchedIntegrationEventHandler {
void handle(UserTouchedIntegrationEvent event);
}

View File

@ -0,0 +1,34 @@
package eu.eudat.integrationevent.outbox.usertouched;
import eu.eudat.integrationevent.outbox.OutboxIntegrationEvent;
import eu.eudat.integrationevent.outbox.OutboxService;
import gr.cite.tools.logging.LoggerService;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class UserTouchedIntegrationEventHandlerImpl implements UserTouchedIntegrationEventHandler {
private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(UserTouchedIntegrationEventHandlerImpl.class));
private final OutboxService outboxService;
public UserTouchedIntegrationEventHandlerImpl(
OutboxService outboxService) {
this.outboxService = outboxService;
}
@Override
public void handle(UserTouchedIntegrationEvent event) {
OutboxIntegrationEvent message = new OutboxIntegrationEvent();
message.setMessageId(UUID.randomUUID());
message.setType(OutboxIntegrationEvent.USER_TOUCH);
message.setEvent(event);
this.outboxService.publish(message);
}
}

View File

@ -31,6 +31,8 @@ queue:
tenant-removal-topic: tenant.remove tenant-removal-topic: tenant.remove
tenant-touch-topic: tenant.touch tenant-touch-topic: tenant.touch
tenant-user-invite-topic: tenant.invite tenant-user-invite-topic: tenant.invite
user-touch-topic: user.touch
user-removal-topic: user.remove
what-you-know-about-me-completed-topic: whatyouknowaboutme.completed what-you-know-about-me-completed-topic: whatyouknowaboutme.completed
generate-file-topic: generate.file generate-file-topic: generate.file
rabbitmq: rabbitmq: