diff --git a/CHANGELOG.md b/CHANGELOG.md index b3b57ad..a5570c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm # Changelog for "event-publisher-library" ## [v1.1.0-SNAPSHOT] +- Added `BufferedEventProcessor` to manual send bunch of events and controlling their output status ## [v1.0.2] - Refactored/extracted event sender that uses OIDC token only and extended it for the UMA version. diff --git a/pom.xml b/pom.xml index 9d0e105..c6c612a 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ org.gcube.distribution maven-portal-bom - 3.6.3-SNAPSHOT + 3.6.4 pom import diff --git a/src/main/java/org/gcube/event/publisher/BufferedEventProcessor.java b/src/main/java/org/gcube/event/publisher/BufferedEventProcessor.java new file mode 100644 index 0000000..fff66ed --- /dev/null +++ b/src/main/java/org/gcube/event/publisher/BufferedEventProcessor.java @@ -0,0 +1,152 @@ +package org.gcube.event.publisher; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.function.Function; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BufferedEventProcessor { + + private static final Logger logger = LoggerFactory.getLogger(BufferedEventProcessor.class); + + private static final int DEFAULT_BUFFER_SIZE = 10; + private static final int RESULT_POLLING_INTERVAL = 500; + + private ArrayBlockingQueue eventsBuffer; + private Map threadStatus; + private int count = 0; + private long min = 10000000000L; + private long max = 0; + private int errors = 0; + + public BufferedEventProcessor(String conductorEndpoint, String clientId, String clientSecret, + String keycloakTokenEndpoint, Function logInfoGenerator) + throws EventProcessorException { + + this(conductorEndpoint, clientId, clientSecret, keycloakTokenEndpoint, logInfoGenerator, DEFAULT_BUFFER_SIZE); + } + + public BufferedEventProcessor(String conductorEndpoint, String clientId, String clientSecret, + String keycloakTokenEndpoint, Function logInfoGenerator, int bufferSize) + throws EventProcessorException { + + eventsBuffer = new ArrayBlockingQueue<>(bufferSize, true); + threadStatus = Collections.synchronizedMap(new HashMap<>(bufferSize)); + final AbstractEventPublisher publisher = new AbstractEventPublisher() { + @Override + protected EventSender createEventSender() { + try { + return new HTTPWithOIDCAuthEventSender(new URL(conductorEndpoint), clientId, clientSecret, + new URL(keycloakTokenEndpoint)); + + } catch (MalformedURLException e) { + e.printStackTrace(); + return null; + } + } + }; + if (publisher.getEventSender() == null) { + throw new EventProcessorException("Cannot create procrssor correctly"); + } + + for (int i = 0; i < bufferSize; i++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + String log = null; + Event toBeSent; + try { + toBeSent = eventsBuffer.take(); + } catch (InterruptedException e) { + // interrupted when waiting, nothing to do... + continue; + } + long start = System.currentTimeMillis(); + threadStatus.put(this, Boolean.TRUE); + logger.trace(" * Peeked event. Buffer size: {}", eventsBuffer.size()); + log = logInfoGenerator.apply(toBeSent); + logger.debug(" > Sending event for: {}", log); + boolean sent = false; + String workflowId = null; + do { + try { + workflowId = publisher.publish(toBeSent, true); + sent = true; + } catch (Exception e) { + logger.warn("Publishing event for {}. It will be re-published", log, e); + } + } while (!sent); + + EventStatus eventStatus; + do { + eventStatus = null; + try { + Thread.sleep(RESULT_POLLING_INTERVAL); + eventStatus = publisher.check(workflowId); + } catch (Exception e) { + logger.warn("Checking status for the event for {}", log, e); + } + if (eventStatus == null || eventStatus.getStatus() == null) { + continue; + } + } while (eventStatus.getStatus() != EventStatus.Status.COMPLETED + && eventStatus.getStatus() != EventStatus.Status.FAILED); + + long elapsed = System.currentTimeMillis() - start; + count = count + 1; + if (elapsed > max) { + max = elapsed; + } + if (elapsed < min) { + min = elapsed; + } + if (eventStatus.getStatus() == EventStatus.Status.FAILED) { + logger.warn(" - ({}) {} -> {} [{} ms]", + new Object[] { count, log, eventStatus.getStatus(), elapsed }); + + errors += 1; + } else { + logger.info(" - (" + count + ") " + log + " -> " + eventStatus.getStatus() + " [" + + elapsed + " ms]"); + } + threadStatus.put(this, Boolean.FALSE); + } + } + }); + threadStatus.put(t, Boolean.FALSE); + t.start(); + } + } + + public void enqueueEvent(Event event) throws InterruptedException { + eventsBuffer.put(event); + } + + public Boolean allQueuedFinishedCorrectly() { + return eventsBuffer.size() > 0 || threadStatus.values().stream().anyMatch(v -> v.equals(Boolean.TRUE)); + } + + public int getCount() { + return count; + } + + public long getMin() { + return min; + } + + public long getMax() { + return max; + } + + public int getErrors() { + return errors; + } + +} \ No newline at end of file diff --git a/src/main/java/org/gcube/event/publisher/EventProcessorException.java b/src/main/java/org/gcube/event/publisher/EventProcessorException.java new file mode 100644 index 0000000..a2d6dea --- /dev/null +++ b/src/main/java/org/gcube/event/publisher/EventProcessorException.java @@ -0,0 +1,14 @@ +package org.gcube.event.publisher; + +public class EventProcessorException extends Exception { + + private static final long serialVersionUID = 568668418901859597L; + + public EventProcessorException(String message) { + super(message); + } + + public EventProcessorException(String message, Throwable cause) { + super(message, cause); + } +}