event-publisher-library/src/main/java/org/gcube/event/publisher/BufferedEventProcessor.java

152 lines
5.9 KiB
Java

package org.gcube.event.publisher;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Buffers {@link Event}s in a bounded queue and publishes them through a fixed set of worker
 * threads, collecting simple timing statistics on the way.
 *
 * <p>Each worker takes one event from the buffer, publishes it (retrying until the publisher
 * accepts it), then polls the publisher until the resulting workflow is reported as
 * {@code COMPLETED} or {@code FAILED}.
 *
 * <p>The statistics exposed by {@link #getCount()}, {@link #getMin()}, {@link #getMax()} and
 * {@link #getErrors()} are guarded by this object's monitor, so they can be read safely while
 * the workers are running.
 */
public class BufferedEventProcessor {

    private static final Logger logger = LoggerFactory.getLogger(BufferedEventProcessor.class);

    private static final int DEFAULT_BUFFER_SIZE = 10;

    /** Milliseconds to wait between two status checks of a published event. */
    private static final int RESULT_POLLING_INTERVAL = 500;

    /** Milliseconds to wait before re-publishing an event after a failed attempt. */
    private static final int PUBLISH_RETRY_INTERVAL = 500;

    private final ArrayBlockingQueue<Event> eventsBuffer;

    /** Maps every worker to {@code TRUE} while it is processing an event, {@code FALSE} otherwise. */
    private final Map<Runnable, Boolean> threadStatus;

    // Statistics below are written by the workers and read by the getters: guarded by "this".
    private int count = 0;
    private long min = 10000000000L;
    private long max = 0;
    private int errors = 0;

    /**
     * Creates a processor with the default buffer size ({@value #DEFAULT_BUFFER_SIZE}).
     *
     * @param conductorEndpoint URL of the endpoint, handed to the event sender
     * @param clientId client id handed to the event sender
     * @param clientSecret client secret handed to the event sender
     * @param keycloakTokenEndpoint URL of the token endpoint, handed to the event sender
     * @param logInfoGenerator produces the text used to identify an event in log messages
     * @throws EventProcessorException if the event sender cannot be created
     */
    public BufferedEventProcessor(String conductorEndpoint, String clientId, String clientSecret,
            String keycloakTokenEndpoint, Function<Event, String> logInfoGenerator)
            throws EventProcessorException {

        this(conductorEndpoint, clientId, clientSecret, keycloakTokenEndpoint, logInfoGenerator, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates a processor and immediately starts its worker threads.
     *
     * @param conductorEndpoint URL of the endpoint, handed to the event sender
     * @param clientId client id handed to the event sender
     * @param clientSecret client secret handed to the event sender
     * @param keycloakTokenEndpoint URL of the token endpoint, handed to the event sender
     * @param logInfoGenerator produces the text used to identify an event in log messages
     * @param bufferSize capacity of the events buffer and number of worker threads started
     * @throws EventProcessorException if the event sender cannot be created, e.g. because one
     *         of the two URLs is malformed
     */
    public BufferedEventProcessor(String conductorEndpoint, String clientId, String clientSecret,
            String keycloakTokenEndpoint, Function<Event, String> logInfoGenerator, int bufferSize)
            throws EventProcessorException {

        eventsBuffer = new ArrayBlockingQueue<>(bufferSize, true);
        threadStatus = Collections.synchronizedMap(new HashMap<>(bufferSize));

        final AbstractEventPublisher publisher = new AbstractEventPublisher() {
            @Override
            protected EventSender createEventSender() {
                try {
                    return new HTTPWithOIDCAuthEventSender(new URL(conductorEndpoint), clientId, clientSecret,
                            new URL(keycloakTokenEndpoint));
                } catch (MalformedURLException e) {
                    logger.error("Cannot create the event sender: malformed endpoint URL", e);
                    return null;
                }
            }
        };
        if (publisher.getEventSender() == null) {
            throw new EventProcessorException("Cannot create processor correctly");
        }

        for (int i = 0; i < bufferSize; i++) {
            // Register the worker itself (not its Thread): it is the key the worker updates
            // with "this", so the map keeps exactly one entry per worker and is never
            // structurally modified once the constructor has returned.
            Runnable worker = new EventWorker(publisher, logInfoGenerator);
            threadStatus.put(worker, Boolean.FALSE);
            new Thread(worker).start();
        }
    }

    /**
     * Adds an event to the buffer, blocking while the buffer is full.
     *
     * @param event the event to be published by one of the workers
     * @throws InterruptedException if the calling thread is interrupted while waiting for space
     */
    public void enqueueEvent(Event event) throws InterruptedException {
        eventsBuffer.put(event);
    }

    /**
     * Tells whether there is still work in progress.
     *
     * <p>NOTE(review): despite its name, this method returns {@code true} while work is still
     * PENDING and {@code false} once the buffer is empty and no worker is busy. The polarity
     * is kept as it was because callers may rely on it; confirm the intended meaning.
     *
     * <p>There is a short window, after a worker has taken an event from the buffer and before
     * it has flagged itself as busy, in which this method can return {@code false} although
     * that event is not processed yet.
     *
     * @return {@code true} if the buffer holds at least one event or at least one worker is
     *         processing an event, {@code false} otherwise
     */
    public Boolean allQueuedFinishedCorrectly() {
        if (eventsBuffer.size() > 0) {
            return true;
        }
        // Views of a synchronizedMap must be traversed while holding the map's own lock.
        synchronized (threadStatus) {
            return threadStatus.values().stream().anyMatch(Boolean.TRUE::equals);
        }
    }

    /**
     * @return the number of events whose processing reached a final status
     */
    public synchronized int getCount() {
        return count;
    }

    /**
     * @return the shortest processing time observed, in milliseconds; the initial value
     *         {@code 10000000000} is returned until a shorter time has been recorded
     */
    public synchronized long getMin() {
        return min;
    }

    /**
     * @return the longest processing time observed, in milliseconds; {@code 0} until a
     *         longer time has been recorded
     */
    public synchronized long getMax() {
        return max;
    }

    /**
     * @return the number of events that ended in the {@code FAILED} status
     */
    public synchronized int getErrors() {
        return errors;
    }

    /**
     * Atomically folds the outcome of one processed event into the statistics.
     *
     * @param elapsed processing time of the event, in milliseconds
     * @param failed whether the event ended in the {@code FAILED} status
     * @return the value of the processed-events counter after this update
     */
    private synchronized int recordResult(long elapsed, boolean failed) {
        count = count + 1;
        if (elapsed > max) {
            max = elapsed;
        }
        if (elapsed < min) {
            min = elapsed;
        }
        if (failed) {
            errors += 1;
        }
        return count;
    }

    /** Worker loop: takes events from the buffer and processes them one at a time. */
    private final class EventWorker implements Runnable {

        private final AbstractEventPublisher publisher;
        private final Function<Event, String> logInfoGenerator;

        EventWorker(AbstractEventPublisher publisher, Function<Event, String> logInfoGenerator) {
            this.publisher = publisher;
            this.logInfoGenerator = logInfoGenerator;
        }

        @Override
        public void run() {
            while (true) {
                Event toBeSent;
                try {
                    toBeSent = eventsBuffer.take();
                } catch (InterruptedException e) {
                    // An interrupt is a request to stop: restore the flag and leave the loop.
                    Thread.currentThread().interrupt();
                    return;
                }
                threadStatus.put(this, Boolean.TRUE);
                try {
                    process(toBeSent);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    // Keep the worker alive: losing it would silently shrink the pool.
                    logger.error("Unexpected error processing an event", e);
                } finally {
                    // Always reset the flag, otherwise the processor would look busy forever.
                    threadStatus.put(this, Boolean.FALSE);
                }
            }
        }

        /** Publishes one event, waits for its final status and records the statistics. */
        private void process(Event toBeSent) throws InterruptedException {
            long start = System.currentTimeMillis();
            logger.trace(" * Peeked event. Buffer size: {}", eventsBuffer.size());
            String log = logInfoGenerator.apply(toBeSent);
            logger.debug(" > Sending event for: {}", log);

            String workflowId = publishUntilAccepted(toBeSent, log);
            EventStatus eventStatus = awaitFinalStatus(workflowId, log);

            long elapsed = System.currentTimeMillis() - start;
            boolean failed = eventStatus.getStatus() == EventStatus.Status.FAILED;
            int processed = recordResult(elapsed, failed);
            Object[] details = new Object[] { processed, log, eventStatus.getStatus(), elapsed };
            if (failed) {
                logger.warn(" - ({}) {} -> {} [{} ms]", details);
            } else {
                logger.info(" - ({}) {} -> {} [{} ms]", details);
            }
        }

        /** Publishes the event, retrying after a short pause until the publisher accepts it. */
        private String publishUntilAccepted(Event toBeSent, String log) throws InterruptedException {
            while (true) {
                try {
                    return publisher.publish(toBeSent, true);
                } catch (Exception e) {
                    logger.warn("Publishing event for {}. It will be re-published", log, e);
                }
                // Pause before retrying so that a failing endpoint is not hit in a tight loop.
                Thread.sleep(PUBLISH_RETRY_INTERVAL);
            }
        }

        /** Polls the publisher until the workflow is reported as COMPLETED or FAILED. */
        private EventStatus awaitFinalStatus(String workflowId, String log) throws InterruptedException {
            while (true) {
                Thread.sleep(RESULT_POLLING_INTERVAL);
                EventStatus eventStatus = null;
                try {
                    eventStatus = publisher.check(workflowId);
                } catch (Exception e) {
                    logger.warn("Checking status for the event for {}", log, e);
                }
                // A failed check or a missing status means "not known yet": poll again.
                if (eventStatus != null && eventStatus.getStatus() != null
                        && (eventStatus.getStatus() == EventStatus.Status.COMPLETED
                                || eventStatus.getStatus() == EventStatus.Status.FAILED)) {
                    return eventStatus;
                }
            }
        }
    }
}