package gr.cite.notification.service.notificationscheduling; import gr.cite.notification.common.enums.IsActive; import gr.cite.notification.common.enums.NotificationNotifyState; import gr.cite.notification.common.enums.NotificationTrackingProcess; import gr.cite.notification.common.enums.NotificationTrackingState; import gr.cite.notification.common.scope.fake.FakeRequestScope; import gr.cite.notification.common.scope.tenant.TenantScope; import gr.cite.notification.config.notification.NotificationProperties; import gr.cite.notification.data.NotificationEntity; import gr.cite.notification.data.TenantEntity; import gr.cite.notification.data.TenantEntityManager; import gr.cite.notification.model.SendNotificationResult; import gr.cite.notification.query.NotificationQuery; import gr.cite.notification.query.TenantQuery; import gr.cite.notification.schedule.model.CandidateInfo; import gr.cite.notification.schedule.model.MiniTenant; import gr.cite.notification.service.notification.NotificationService; import gr.cite.notification.service.track.TrackingFactory; import gr.cite.notification.service.track.model.TrackerResponse; import gr.cite.tools.data.query.Ordering; import gr.cite.tools.data.query.QueryFactory; import gr.cite.tools.fieldset.BaseFieldSet; import gr.cite.tools.fieldset.FieldSet; import gr.cite.tools.logging.LoggerService; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import jakarta.persistence.EntityManager; import jakarta.persistence.EntityManagerFactory; import jakarta.persistence.EntityTransaction; import jakarta.persistence.OptimisticLockException; import java.time.Instant; import java.util.List; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; public class NotificationSchedulingServiceImpl implements NotificationSchedulingService { private static final LoggerService logger = new LoggerService(LoggerFactory.getLogger(NotificationSchedulingServiceImpl.class)); private final ApplicationContext applicationContext; private final NotificationProperties properties; public NotificationSchedulingServiceImpl(ApplicationContext applicationContext, NotificationProperties properties) { this.applicationContext = applicationContext; this.properties = properties; } @Override public CandidateInfo candidateToNotify(Instant lastCandidateNotificationCreationTimestamp) { EntityManager entityManager = null; EntityTransaction transaction = null; CandidateInfo candidateInfo = null; try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); entityManager = entityManagerFactory.createEntityManager(); transaction = entityManager.getTransaction(); transaction.begin(); NotificationQuery notificationQuery = queryFactory.query(NotificationQuery.class); NotificationEntity candidates; notificationQuery = notificationQuery .isActive(IsActive.Active) .notifyState(NotificationNotifyState.PENDING, NotificationNotifyState.ERROR) .retryThreshold(Math.toIntExact(this.properties.getTask().getProcessor().getOptions().getRetryThreshold())) .createdAfter(lastCandidateNotificationCreationTimestamp) .ordering(new Ordering().addAscending(NotificationEntity._createdAt)); candidates = notificationQuery.first(); if (candidates != null) { NotificationNotifyState previousState = candidates.getNotifyState(); candidates.setNotifyState(NotificationNotifyState.PROCESSING); //candidates.setUpdatedAt(Instant.now()); TenantScope tenantScope = this.applicationContext.getBean(TenantScope.class); try { if (candidates.getTenantId() != null) { TenantEntity tenant = queryFactory.query(TenantQuery.class).ids(candidates.getTenantId()).first(); tenantScope.setTempTenant(entityManager, tenant.getId(), tenant.getCode()); } else { tenantScope.setTempTenant(entityManager, null, tenantScope.getDefaultTenantCode()); } candidates = entityManager.merge(candidates); entityManager.persist(candidates); entityManager.flush(); } finally { tenantScope.removeTempTenant(entityManager); } candidateInfo = new CandidateInfo(candidates.getId(), previousState, candidates.getCreatedAt()); } transaction.commit(); } catch (OptimisticLockException e) { logger.error("Optimistic Lock Error occurred on Notification persist"); if (transaction != null) transaction.rollback(); } catch (Exception e) { logger.error(e.getLocalizedMessage(), e); if (transaction != null) transaction.rollback(); } finally { if (entityManager != null) entityManager.close(); } return candidateInfo; } @Override public Boolean shouldWait(CandidateInfo candidateInfo) { EntityManager entityManager = null; EntityTransaction transaction = null; Boolean shouldWait = false; try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); entityManager = entityManagerFactory.createEntityManager(); transaction = entityManager.getTransaction(); transaction.begin(); NotificationEntity notification = entityManager.find(NotificationEntity.class, candidateInfo.getNotificationId()); if (notification.getRetryCount() != null && notification.getRetryCount() >= 1) { int accumulatedRetry = 0; int pastAccumulateRetry = 0; NotificationProperties.Task.Processor.Options options = properties.getTask().getProcessor().getOptions(); for (int i = 1; i <= notification.getRetryCount() + 1; i += 1) accumulatedRetry += (i * options.getRetryThreshold()); for (int i = 1; i <= notification.getRetryCount(); i += 1) pastAccumulateRetry += (i * options.getRetryThreshold()); int randAccumulatedRetry = ThreadLocalRandom.current().nextInt((int) (accumulatedRetry / 2), accumulatedRetry + 1); long additionalTime = randAccumulatedRetry > options.getMaxRetryDelaySeconds() ? options.getMaxRetryDelaySeconds() : randAccumulatedRetry; long retry = pastAccumulateRetry + additionalTime; Instant retryOn = notification.getCreatedAt().plusSeconds(retry); boolean itIsTime = retryOn.isBefore(Instant.now()); if (!itIsTime) { notification.setNotifyState(candidateInfo.getPreviousState()); //notification.setUpdatedAt(Instant.now()); TenantScope tenantScope = this.applicationContext.getBean(TenantScope.class); try { if (notification.getTenantId() != null) { QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); TenantEntity tenant = queryFactory.query(TenantQuery.class).ids(notification.getTenantId()).first(); tenantScope.setTempTenant(entityManager, tenant.getId(), tenant.getCode()); } else { tenantScope.setTempTenant(entityManager, null, tenantScope.getDefaultTenantCode()); } // notification = entityManager.merge(notification); entityManager.merge(notification); entityManager.flush(); } finally { tenantScope.removeTempTenant(entityManager); } } shouldWait = !itIsTime; } transaction.commit(); } catch (OptimisticLockException e) { logger.error("Optimistic Lock Error occurred on Notification persist"); if (transaction != null) transaction.rollback(); } catch (Exception e) { logger.error(e.getLocalizedMessage(), e); if (transaction != null) transaction.rollback(); } finally { if (entityManager != null) entityManager.close(); } return shouldWait; } @Override public Boolean shouldOmitNotify(UUID notificationId) { EntityManager entityManager = null; EntityTransaction transaction = null; Boolean shouldOmit = false; try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); entityManager = entityManagerFactory.createEntityManager(); transaction = entityManager.getTransaction(); transaction.begin(); NotificationEntity notification = entityManager.find(NotificationEntity.class, notificationId); long age = Instant.now().getEpochSecond() - notification.getCreatedAt().getEpochSecond(); long omitSeconds = properties.getTask().getProcessor().getOptions().getTooOldToSendSeconds(); if (age >= omitSeconds) { notification.setNotifyState(NotificationNotifyState.OMITTED); //notification.setUpdatedAt(Instant.now()); TenantScope tenantScope = this.applicationContext.getBean(TenantScope.class); try { if (notification.getTenantId() != null) { QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); TenantEntity tenant = queryFactory.query(TenantQuery.class).ids(notification.getTenantId()).first(); tenantScope.setTempTenant(entityManager, tenant.getId(), tenant.getCode()); } else { tenantScope.setTempTenant(entityManager, null, tenantScope.getDefaultTenantCode()); } notification = entityManager.merge(notification); entityManager.persist(notification); entityManager.flush(); } finally { tenantScope.removeTempTenant(entityManager); } shouldOmit = true; } transaction.commit(); } catch (OptimisticLockException e) { logger.error("Optimistic Lock Error occurred on Notification persist"); if (transaction != null) transaction.rollback(); } catch (Exception e) { logger.error(e.getLocalizedMessage(), e); if (transaction != null) transaction.rollback(); } finally { if (entityManager != null) entityManager.close(); } return shouldOmit; } @Override public Boolean notify(UUID notificationId) { EntityManager entityManager = null; EntityTransaction transaction = null; Boolean success = null; try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); entityManager = entityManagerFactory.createEntityManager(); transaction = entityManager.getTransaction(); transaction.begin(); NotificationQuery notificationQuery = applicationContext.getBean(NotificationQuery.class); NotificationEntity notification = notificationQuery.ids(notificationId).first(); if (notification == null) throw new IllegalArgumentException("notification is null"); TenantScope tenantScope = this.applicationContext.getBean(TenantScope.class); TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); tenantEntityManager.setEntityManager(entityManager); SendNotificationResult result = null; NotificationService notificationService = this.applicationContext.getBean(NotificationService.class); try { if (notification.getTenantId() != null) { QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); TenantEntity tenant = queryFactory.query(TenantQuery.class).ids(notification.getTenantId()).first(); tenantScope.setTempTenant(entityManager, tenant.getId(), tenant.getCode()); } else { tenantScope.setTempTenant(entityManager, null, tenantScope.getDefaultTenantCode()); } result = notificationService.doNotify(notification); if (result != null && result.getSuccess()) { notification.setNotifyState(NotificationNotifyState.SUCCESSFUL); notification.setTrackingData(result.getTrackingData()); notification.setNotifiedWith(result.getContactType()); notification.setNotifiedAt(Instant.now()); } else { notification.setNotifyState(NotificationNotifyState.ERROR); notification.setRetryCount(notification.getRetryCount() != null ? notification.getRetryCount() + 1 : 0); notification.setNotifiedWith(null); notification.setNotifiedAt(null); } //notification.setUpdatedAt(Instant.now()); NotificationEntity notification1 = entityManager.merge(notification); entityManager.persist(notification1); entityManager.flush(); } finally { tenantScope.removeTempTenant(entityManager); } //we want to return false for notification we want to add in the skip bag success = result != null && result.getSuccess(); transaction.commit(); } catch (OptimisticLockException e) { logger.error("Optimistic Lock Error occurred on Notification persist"); if (transaction != null) transaction.rollback(); } catch (Exception e) { logger.error(e.getLocalizedMessage(), e); if (transaction != null) transaction.rollback(); } finally { if (entityManager != null) entityManager.close(); } return success; } @Override public CandidateInfo candidateToTrack(Instant lastCandidateNotificationCreationTimestamp) { EntityManager entityManager = null; EntityTransaction transaction = null; CandidateInfo candidateInfo = null; try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); entityManager = entityManagerFactory.createEntityManager(); transaction = entityManager.getTransaction(); transaction.begin(); NotificationQuery notificationQuery = queryFactory.query(NotificationQuery.class); NotificationEntity candidates; notificationQuery = notificationQuery .isActive(IsActive.Active) .notifyState(NotificationNotifyState.SUCCESSFUL) .notifiedWithHasValue() .notifiedWithHasValue() .createdAfter(lastCandidateNotificationCreationTimestamp) .trackingState(NotificationTrackingState.QUEUED, NotificationTrackingState.SENT, NotificationTrackingState.UNDEFINED) .trackingProgress(NotificationTrackingProcess.PENDING); candidates = notificationQuery.first(); if (candidates != null) { NotificationNotifyState previousState = candidates.getNotifyState(); candidates.setTrackingProcess(NotificationTrackingProcess.PROCESSING); //candidates.setUpdatedAt(Instant.now()); TenantScope tenantScope = this.applicationContext.getBean(TenantScope.class); try { if (candidates.getTenantId() != null) { TenantEntity tenant = queryFactory.query(TenantQuery.class).ids(candidates.getTenantId()).first(); tenantScope.setTempTenant(entityManager, tenant.getId(), tenant.getCode()); } else { tenantScope.setTempTenant(entityManager, null, tenantScope.getDefaultTenantCode()); } candidates = entityManager.merge(candidates); entityManager.persist(candidates); entityManager.flush(); } finally { tenantScope.removeTempTenant(entityManager); } candidateInfo = new CandidateInfo(candidates.getId(), previousState, candidates.getCreatedAt()); } transaction.commit(); } catch (OptimisticLockException e) { logger.error("Optimistic Lock Error occurred on Notification persist"); if (transaction != null) transaction.rollback(); } catch (Exception e) { logger.error(e.getLocalizedMessage(), e); if (transaction != null) transaction.rollback(); } finally { if (entityManager != null) entityManager.close(); } return candidateInfo; } @Override public Boolean shouldOmitTracking(UUID notificationId) { EntityManager entityManager = null; EntityTransaction transaction = null; Boolean shouldOmit = false; try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); entityManager = entityManagerFactory.createEntityManager(); transaction = entityManager.getTransaction(); transaction.begin(); NotificationEntity notification = entityManager.find(NotificationEntity.class, notificationId); long age = Instant.now().getEpochSecond() - notification.getNotifiedAt().getEpochSecond(); long omitSeconds = properties.getTask().getProcessor().getOptions().getTooOldToTrackSeconds(); if (age >= omitSeconds) { notification.setTrackingProcess(NotificationTrackingProcess.OMITTED); //notification.setUpdatedAt(Instant.now()); TenantScope tenantScope = this.applicationContext.getBean(TenantScope.class); try { if (notification.getTenantId() != null) { QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); TenantEntity tenant = queryFactory.query(TenantQuery.class).ids(notification.getTenantId()).first(); tenantScope.setTempTenant(entityManager, tenant.getId(), tenant.getCode()); } else { tenantScope.setTempTenant(entityManager, null, tenantScope.getDefaultTenantCode()); } notification = entityManager.merge(notification); entityManager.persist(notification); entityManager.flush(); } finally { tenantScope.removeTempTenant(entityManager); } shouldOmit = true; } transaction.commit(); } catch (OptimisticLockException e) { logger.error("Optimistic Lock Error occurred on Notification persist"); if (transaction != null) transaction.rollback(); } catch (Exception e) { logger.error(e.getLocalizedMessage(), e); if (transaction != null) transaction.rollback(); } finally { if (entityManager != null) entityManager.close(); } return shouldOmit; } @Override public Boolean track(UUID notificationId) { EntityManager entityManager = null; EntityTransaction transaction = null; Boolean success = null; try (FakeRequestScope fakeRequestScope = new FakeRequestScope()) { QueryFactory queryFactory = this.applicationContext.getBean(QueryFactory.class); EntityManagerFactory entityManagerFactory = this.applicationContext.getBean(EntityManagerFactory.class); entityManager = entityManagerFactory.createEntityManager(); transaction = entityManager.getTransaction(); transaction.begin(); NotificationQuery notificationQuery = queryFactory.query(NotificationQuery.class); NotificationEntity notification = notificationQuery.ids(notificationId).first(); if (notification == null) throw new IllegalArgumentException("notification is null"); if (notification.getNotifiedWith() == null) throw new IllegalArgumentException("Notification's Notified With is null"); if (notification.getNotifiedAt() == null) throw new IllegalArgumentException("Notification's Notified At is null"); TenantScope tenantScope = this.applicationContext.getBean(TenantScope.class); TenantEntityManager tenantEntityManager = this.applicationContext.getBean(TenantEntityManager.class); tenantEntityManager.setEntityManager(entityManager); TrackerResponse result = null; try { if (notification.getTenantId() != null) { TenantEntity tenant = queryFactory.query(TenantQuery.class).ids(notification.getTenantId()).first(); tenantScope.setTempTenant(entityManager, tenant.getId(), tenant.getCode()); } else { tenantScope.setTempTenant(entityManager, null, tenantScope.getDefaultTenantCode()); } try { TrackingFactory trackingFactory = applicationContext.getBean(TrackingFactory.class); result = trackingFactory.fromContactType(notification.getNotifiedWith()).track(notification); } catch (Exception e) { logger.error("Could not complete track for notification " + notification.getId(), e); } if (result != null && notification.getTrackingProcess().equals(result.getTrackingProgress()) && notification.getTrackingState().equals(result.getTrackingState())) { return false; } if (result != null) { notification.setTrackingState(result.getTrackingState()); notification.setTrackingProcess(result.getTrackingProgress()); notification.setTrackingData(result.getTrackingData()); } else { notification.setTrackingProcess(NotificationTrackingProcess.ERROR); } //notification.setUpdatedAt(Instant.now()); NotificationEntity notification1 = entityManager.merge(notification); entityManager.persist(notification1); entityManager.flush(); } finally { tenantScope.removeTempTenant(entityManager); } //we want to return false for notification we want to add in the skip bag success = result != null && !notification.getTrackingProcess().equals(NotificationTrackingProcess.ERROR); transaction.commit(); } catch (OptimisticLockException e) { logger.error("Optimistic Lock Error occurred on Notification persist"); if (transaction != null) transaction.rollback(); } catch (Exception e) { logger.error(e.getLocalizedMessage(), e); if (transaction != null) transaction.rollback(); } finally { if (entityManager != null) entityManager.close(); } return success; } }