package org.gcube.event.publisher; import java.net.MalformedURLException; import java.net.URL; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class BufferedEventProcessor { private static final Logger logger = LoggerFactory.getLogger(BufferedEventProcessor.class); private static final int DEFAULT_BUFFER_SIZE = 10; private static final int RESULT_POLLING_INTERVAL = 500; private ArrayBlockingQueue eventsBuffer; private Map threadStatus; private int count = 0; private long min = 10000000000L; private long max = 0; private int errors = 0; public BufferedEventProcessor(String conductorEndpoint, String clientId, String clientSecret, String keycloakTokenEndpoint, Function logInfoGenerator) throws EventProcessorException { this(conductorEndpoint, clientId, clientSecret, keycloakTokenEndpoint, logInfoGenerator, DEFAULT_BUFFER_SIZE); } public BufferedEventProcessor(String conductorEndpoint, String clientId, String clientSecret, String keycloakTokenEndpoint, Function logInfoGenerator, int bufferSize) throws EventProcessorException { eventsBuffer = new ArrayBlockingQueue<>(bufferSize, true); threadStatus = Collections.synchronizedMap(new HashMap<>(bufferSize)); final AbstractEventPublisher publisher = new AbstractEventPublisher() { @Override protected EventSender createEventSender() { try { return new HTTPWithOIDCAuthEventSender(new URL(conductorEndpoint), clientId, clientSecret, new URL(keycloakTokenEndpoint)); } catch (MalformedURLException e) { e.printStackTrace(); return null; } } }; if (publisher.getEventSender() == null) { throw new EventProcessorException("Cannot create procrssor correctly"); } for (int i = 0; i < bufferSize; i++) { Thread t = new Thread(new Runnable() { @Override public void run() { while (true) { String log = null; Event toBeSent; try { toBeSent = eventsBuffer.take(); } catch (InterruptedException e) { // interrupted when waiting, nothing to do... continue; } long start = System.currentTimeMillis(); threadStatus.put(this, Boolean.TRUE); logger.trace(" * Peeked event. Buffer size: {}", eventsBuffer.size()); log = logInfoGenerator.apply(toBeSent); logger.debug(" > Sending event for: {}", log); boolean sent = false; String workflowId = null; do { try { workflowId = publisher.publish(toBeSent, true); sent = true; } catch (Exception e) { logger.warn("Publishing event for {}. It will be re-published", log, e); } } while (!sent); EventStatus eventStatus; do { eventStatus = null; try { Thread.sleep(RESULT_POLLING_INTERVAL); eventStatus = publisher.check(workflowId); } catch (Exception e) { logger.warn("Checking status for the event for {}", log, e); } if (eventStatus == null || eventStatus.getStatus() == null) { continue; } } while (eventStatus.getStatus() != EventStatus.Status.COMPLETED && eventStatus.getStatus() != EventStatus.Status.FAILED); long elapsed = System.currentTimeMillis() - start; count = count + 1; if (elapsed > max) { max = elapsed; } if (elapsed < min) { min = elapsed; } if (eventStatus.getStatus() == EventStatus.Status.FAILED) { logger.warn(" - ({}) {} -> {} [{} ms]", new Object[] { count, log, eventStatus.getStatus(), elapsed }); errors += 1; } else { logger.info(" - (" + count + ") " + log + " -> " + eventStatus.getStatus() + " [" + elapsed + " ms]"); } threadStatus.put(this, Boolean.FALSE); } } }); threadStatus.put(t, Boolean.FALSE); t.start(); } } public void enqueueEvent(Event event) throws InterruptedException { eventsBuffer.put(event); } public Boolean allQueuedFinishedCorrectly() { return eventsBuffer.size() > 0 || threadStatus.values().stream().anyMatch(v -> v.equals(Boolean.TRUE)); } public int getCount() { return count; } public long getMin() { return min; } public long getMax() { return max; } public int getErrors() { return errors; } }