From a49e030d71de3890f87b3244e59f433bf80702b2 Mon Sep 17 00:00:00 2001 From: Mauro Mugnaini Date: Mon, 27 Dec 2021 12:49:14 +0100 Subject: [PATCH] Added functions to also get the workflow execution's results with a pluggable results parser, ref. impl. for `Conductor` --- CHANGELOG.md | 7 +- pom.xml | 22 +- .../publisher/AbstractEventPublisher.java | 36 ++- .../publisher/ConductorResultsParser.java | 48 ++++ .../gcube/event/publisher/EventPublisher.java | 4 + .../gcube/event/publisher/EventSender.java | 6 + .../gcube/event/publisher/EventStatus.java | 87 +++++++ .../HTTPWithOIDCAuthEventSender.java | 228 +++++++++++++++--- .../publisher/HTTPWithUMAAuthEventSender.java | 2 +- .../gcube/event/publisher/ResultsParser.java | 9 + .../event/publisher/HTTPEventSenderTest.java | 25 +- 11 files changed, 419 insertions(+), 55 deletions(-) create mode 100644 src/main/java/org/gcube/event/publisher/ConductorResultsParser.java create mode 100644 src/main/java/org/gcube/event/publisher/EventStatus.java create mode 100644 src/main/java/org/gcube/event/publisher/ResultsParser.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 510e2ee..b3b57ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ +This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + # Changelog for "event-publisher-library" +## [v1.1.0-SNAPSHOT] + ## [v1.0.2] - Refactored/extracted event sender that uses OIDC token only and extended it for the UMA version. @@ -8,6 +12,3 @@ ## [v1.0.0-SNAPSHOT] - First release (#19461) - - -This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). \ No newline at end of file diff --git a/pom.xml b/pom.xml index 90c959a..9d0e105 100644 --- a/pom.xml +++ b/pom.xml @@ -1,32 +1,39 @@ + 4.0.0 + maven-parent org.gcube.tools 1.1.0 + org.gcube.common event-publisher-library - 1.0.2 + 1.1.0-SNAPSHOT + jar + org.gcube.distribution maven-portal-bom - 3.6.0 + 3.6.3-SNAPSHOT pom import - - scm:git:https://code-repo.d4science.org/gCubeSystem/${project.artifactId}.git - scm:git:https://code-repo.d4science.org/gCubeSystem/${project.artifactId}.git - https://code-repo.d4science.org/gCubeSystem/${project.artifactId} - + + + scm:git:https://code-repo.d4science.org/gCubeSystem/${project.artifactId}.git + scm:git:https://code-repo.d4science.org/gCubeSystem/${project.artifactId}.git + https://code-repo.d4science.org/gCubeSystem/${project.artifactId} + + @@ -46,6 +53,7 @@ + org.gcube.common diff --git a/src/main/java/org/gcube/event/publisher/AbstractEventPublisher.java b/src/main/java/org/gcube/event/publisher/AbstractEventPublisher.java index bb3cb11..346e23a 100644 --- a/src/main/java/org/gcube/event/publisher/AbstractEventPublisher.java +++ b/src/main/java/org/gcube/event/publisher/AbstractEventPublisher.java @@ -8,22 +8,48 @@ public abstract class AbstractEventPublisher implements EventPublisher { protected static final Logger logger = LoggerFactory.getLogger(AbstractEventPublisher.class); private EventSender eventSender; + private ResultsParser resultsParser; public AbstractEventPublisher() { this.eventSender = createEventSender(); + this.resultsParser = createResultsParser(); } @Override public void publish(Event event) { + publish(event, false); + } + + @Override + public String publish(Event event, boolean waitForResult) { if (event != null) { - eventSender.send(event); + if (waitForResult) { + return getEventSender().sendAndGetResult(event); + } else { + getEventSender().send(event); + } } else { logger.warn("Cannot publish a null event"); } + return null; + } + + @Override + public EventStatus check(String instanceId) { + if (instanceId != null) { + return getResultsParser().parseResults(eventSender.retrive(instanceId)); + } else { + logger.warn("Cannot check with a null instance ID"); + return EventStatus.NOT_FOUND(); + } } protected abstract EventSender createEventSender(); + protected ResultsParser createResultsParser() { + return new ConductorResultsParser(); + } + public EventSender getEventSender() { return eventSender; } @@ -32,4 +58,12 @@ public abstract class AbstractEventPublisher implements EventPublisher { this.eventSender = eventSender; } + public ResultsParser getResultsParser() { + return resultsParser; + } + + public void setResultsParser(ResultsParser resultsParser) { + this.resultsParser = resultsParser; + } + } diff --git a/src/main/java/org/gcube/event/publisher/ConductorResultsParser.java b/src/main/java/org/gcube/event/publisher/ConductorResultsParser.java new file mode 100644 index 0000000..e0512d8 --- /dev/null +++ b/src/main/java/org/gcube/event/publisher/ConductorResultsParser.java @@ -0,0 +1,48 @@ +package org.gcube.event.publisher; + +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConductorResultsParser implements ResultsParser { + + protected static final Logger logger = LoggerFactory.getLogger(ConductorResultsParser.class); + + public ConductorResultsParser() { + } + + @Override + public EventStatus parseResults(JSONObject results) { + EventStatus eventStatus = null; + if (results != null) { + JSONObject input = (JSONObject) results.get("input"); + JSONObject output = (JSONObject) results.get("output"); + switch ((String) results.get("status")) { + case "COMPLETED": + eventStatus = EventStatus.COMPLETED(input, output); + break; + case "FAILED": + eventStatus = EventStatus.FAILED(input, output); + break; + case "NOT_FOUND": + eventStatus = EventStatus.NOT_FOUND(); + break; + case "PAUSED": + eventStatus = EventStatus.PAUSED(input); + break; + case "RUNNING": + eventStatus = EventStatus.RUNNING(input); + break; + case "TERMINATED": + eventStatus = EventStatus.TERMINATED(input, output); + break; + default: + eventStatus = EventStatus.NOT_FOUND(); + } + } else { + logger.warn("Nothing to parse since JSON object is null"); + } + return eventStatus; + } + +} diff --git a/src/main/java/org/gcube/event/publisher/EventPublisher.java b/src/main/java/org/gcube/event/publisher/EventPublisher.java index c3c6cf7..0458668 100644 --- a/src/main/java/org/gcube/event/publisher/EventPublisher.java +++ b/src/main/java/org/gcube/event/publisher/EventPublisher.java @@ -4,4 +4,8 @@ public interface EventPublisher { void publish(Event event); + String publish(Event event, boolean waitForResult); + + EventStatus check(String instanceId); + } \ No newline at end of file diff --git a/src/main/java/org/gcube/event/publisher/EventSender.java b/src/main/java/org/gcube/event/publisher/EventSender.java index 55f039d..58b7f58 100644 --- a/src/main/java/org/gcube/event/publisher/EventSender.java +++ b/src/main/java/org/gcube/event/publisher/EventSender.java @@ -1,7 +1,13 @@ package org.gcube.event.publisher; +import org.json.simple.JSONObject; + public interface EventSender { void send(Event event); + String sendAndGetResult(Event event); + + JSONObject retrive(String id); + } \ No newline at end of file diff --git a/src/main/java/org/gcube/event/publisher/EventStatus.java b/src/main/java/org/gcube/event/publisher/EventStatus.java new file mode 100644 index 0000000..c1a751e --- /dev/null +++ b/src/main/java/org/gcube/event/publisher/EventStatus.java @@ -0,0 +1,87 @@ +package org.gcube.event.publisher; + +import org.json.simple.JSONObject; + +public class EventStatus { + + public enum Status { + RUNNING, COMPLETED, FAILED, TIMED_OUT, TERMINATED, PAUSED, NOT_FOUND; + } + + public static EventStatus RUNNING(JSONObject input) { + return new EventStatus(Status.RUNNING, input); + } + + public static EventStatus COMPLETED(JSONObject input, JSONObject output) { + return new EventStatus(Status.COMPLETED, input, output); + } + + public static EventStatus FAILED(JSONObject input, JSONObject output) { + return new EventStatus(Status.FAILED, input, output); + } + + public static EventStatus TIMED_OUT(JSONObject input) { + return new EventStatus(Status.TIMED_OUT, input); + } + + public static EventStatus TERMINATED(JSONObject input, JSONObject output) { + return new EventStatus(Status.TERMINATED, input, output); + } + + public static EventStatus PAUSED(JSONObject input) { + return new EventStatus(Status.PAUSED, input); + } + + public static EventStatus NOT_FOUND() { + return new EventStatus(Status.NOT_FOUND); + } + + private Status status; + private JSONObject input; + private JSONObject output; + + private EventStatus(Status status) { + this(status, null); + } + + private EventStatus(Status status, JSONObject input) { + this(status, input, null); + } + + private EventStatus(Status status, JSONObject input, JSONObject output) { + setStatus(status); + setInput(input); + setOutput(output); + } + + public void setStatus(Status status) { + this.status = status; + } + + public Status getStatus() { + return status; + } + + public void setInput(JSONObject input) { + this.input = input; + } + + public JSONObject getInput() { + return input; + } + + public void setOutput(JSONObject output) { + this.output = output; + } + + public JSONObject getOutput() { + return output; + } + + @Override + public String toString() { + return String.format("[%s]\ninput: %s\noutput: %s", status, input != null ? input.toJSONString() : "", + output != null ? output.toJSONString() : ""); + } + +} \ No newline at end of file diff --git a/src/main/java/org/gcube/event/publisher/HTTPWithOIDCAuthEventSender.java b/src/main/java/org/gcube/event/publisher/HTTPWithOIDCAuthEventSender.java index 0f48c97..f9cb55d 100644 --- a/src/main/java/org/gcube/event/publisher/HTTPWithOIDCAuthEventSender.java +++ b/src/main/java/org/gcube/event/publisher/HTTPWithOIDCAuthEventSender.java @@ -1,15 +1,20 @@ package org.gcube.event.publisher; import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; +import java.util.Arrays; import org.gcube.oidc.rest.JWTToken; import org.gcube.oidc.rest.OpenIdConnectRESTHelper; import org.gcube.oidc.rest.OpenIdConnectRESTHelperException; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,13 +22,13 @@ public class HTTPWithOIDCAuthEventSender implements EventSender { protected static final Logger log = LoggerFactory.getLogger(HTTPWithOIDCAuthEventSender.class); - private URL baseEnndpointURL; + private URL baseEndpointURL; private String clientId; private String clientSecret; private URL tokenURL; - public HTTPWithOIDCAuthEventSender(URL baseEnndpointURL, String clientId, String clientSecret, URL tokenURL) { - this.baseEnndpointURL = baseEnndpointURL; + public HTTPWithOIDCAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret, URL tokenURL) { + this.baseEndpointURL = baseEndpointURL; this.clientId = clientId; this.clientSecret = clientSecret; this.tokenURL = tokenURL; @@ -31,16 +36,30 @@ public class HTTPWithOIDCAuthEventSender implements EventSender { @Override public void send(Event event) { - log.debug("Starting HTTP POST thread to: {}", baseEnndpointURL); + log.debug("Starting HTTP POST thread to: {}", baseEndpointURL); + new Thread(new HTTPPost(baseEndpointURL, event)).start(); + } + + @Override + public String sendAndGetResult(Event event) { + log.debug("Starting HTTP POST thread to: {}", baseEndpointURL); + HTTPPost post = new HTTPPost(baseEndpointURL, event); + Thread postThread = new Thread(post); + postThread.start(); try { - URL eventEndpoint = new URL(baseEnndpointURL, event.getName()); - new Thread(new HTTPost(eventEndpoint, event)).start(); - } catch (MalformedURLException e) { - log.error("Cannot compute event endpoint URL. Event name: " + event.getName() + ", base endpoint: " - + baseEnndpointURL, e); + postThread.join(); + return post.getResult(); + } catch (InterruptedException e) { + log.error("While waiting for HTTP Post thread termination", e); + return null; } } + @Override + public JSONObject retrive(String id) { + return new HTTPGet(baseEndpointURL, id).readJSON(); + } + protected URL getTokenURL() { return tokenURL; } @@ -55,60 +74,192 @@ public class HTTPWithOIDCAuthEventSender implements EventSender { } } - public class HTTPost implements Runnable { + public abstract class HTTPVerb { - private static final int CONNECTION_TIMEOUT = 10000; - private static final int READ_TIMEOUT = 5000; + protected static final int CONNECTION_TIMEOUT = 10000; + protected static final int READ_TIMEOUT = 5000; + + protected URL baseEndpoint; + + public HTTPVerb(URL baseEndpoint) { + this.baseEndpoint = baseEndpoint; + } + + } + + public class HTTPPost extends HTTPVerb implements Runnable { + + private static final int PAUSE_INCREMENT_FACTOR = 2; + private static final long MAX_RETRYINGS = 2; + + private final int[] RETRY_CODES = { HttpURLConnection.HTTP_BAD_GATEWAY, + HttpURLConnection.HTTP_CLIENT_TIMEOUT, HttpURLConnection.HTTP_GATEWAY_TIMEOUT, + HttpURLConnection.HTTP_INTERNAL_ERROR }; - private URL endpoint; private Event event; + private String result; + private long actualPause = 1; + private long retryings = 0; - public HTTPost(URL endpoint, Event event) { - this.endpoint = endpoint; + public HTTPPost(URL baseEndpoint, Event event) { + super(baseEndpoint); this.event = event; } @Override public void run() { + try { + URL eventEndpoint = null; + try { + eventEndpoint = new URL(baseEndpointURL, event.getName()); + } catch (MalformedURLException e) { + log.error("Cannot compute event endpoint URL. Event name: " + event.getName() + ", base endpoint: " + + baseEndpointURL, e); + return; + } + boolean OK = false; + do { + try { + log.debug("Getting auth token for client '{}' if needed", clientId); + JWTToken token = getAuthorizationToken(); + log.debug("Performing HTTP POST to: {}", baseEndpoint); + HttpURLConnection connection = (HttpURLConnection) eventEndpoint.openConnection(); + connection.setRequestMethod("POST"); + connection.setConnectTimeout(CONNECTION_TIMEOUT); + log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout()); + connection.setReadTimeout(READ_TIMEOUT); + log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout()); + connection.setRequestProperty("Content-Type", "application/json"); + // Commented out as per the Conductor issue: https://github.com/Netflix/conductor/issues/376 + // connection.setRequestProperty("Accept", "application/json"); + connection.setDoOutput(true); + if (token != null) { + log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer()); + connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer()); + } else { + log.debug("Sending request without authorization header"); + } + OutputStream os = connection.getOutputStream(); + String jsonString = event.toJSONString(); + log.trace("Sending event JSON: {}", jsonString); + os.write(jsonString.getBytes("UTF-8")); + os.flush(); + os.close(); + + StringBuilder sb = new StringBuilder(); + int httpResultCode = connection.getResponseCode(); + log.trace("HTTP Response code: {}", httpResultCode); + + log.trace("Reading response"); + InputStreamReader isr = null; + if (httpResultCode == HttpURLConnection.HTTP_OK) { + OK = true; + // From this point ahead every exception should not set OK to false + // since the server acknowledged with 200 + isr = new InputStreamReader(connection.getInputStream(), "UTF-8"); + } else { + isr = new InputStreamReader(connection.getErrorStream(), "UTF-8"); + } + BufferedReader br = new BufferedReader(isr); + String line = null; + while ((line = br.readLine()) != null) { + sb.append("\n" + line); + } + br.close(); + isr.close(); + sb.deleteCharAt(0); + result = sb.toString(); + if (OK) { + log.debug("[{}] Event publish for {} is OK", httpResultCode, event.getName()); + } else { + log.trace("Response message from server:\n{}", result); + if (shouldRetryWithCode(httpResultCode)) { + if (retryings <= MAX_RETRYINGS) { + log.warn("[{}] Event publish ERROR, retrying in {} seconds", httpResultCode, + actualPause); + + Thread.sleep(actualPause * 1000); + log.debug("Start retrying event publish: {}", event.getName()); + actualPause *= PAUSE_INCREMENT_FACTOR; + retryings += 1; + } else { + log.error("[{}] Event publish ERROR, exhausted tries after {} retryings", + httpResultCode, + retryings); + + break; + } + } else { + log.info("[{}] Event publish ERROR but should not retry with this HTTP code", + httpResultCode); + + break; + } + } + } catch (IOException | OpenIdConnectRESTHelperException e) { + log.error("POSTing JSON to: " + eventEndpoint, e); + } + } while (!OK); + } catch (InterruptedException e) { + log.error("Sleeping before retry event send", e); + } + } + + private boolean shouldRetryWithCode(int httpResultCode) { + return Arrays.binarySearch(RETRY_CODES, httpResultCode) > 0; + } + + public String getResult() { + return result; + } + + } + + public class HTTPGet extends HTTPVerb { + + private String id; + + public HTTPGet(URL baseEndpoint, String id) { + super(baseEndpoint); + this.id = id; + } + + public JSONObject readJSON() { + URL endpoint = null; + JSONObject results = null; + try { + endpoint = new URL(baseEndpoint, id); + } catch (MalformedURLException e) { + log.error("Cannot compute retrieve endpoint URL. ID: " + id + ", base endpoint: " + baseEndpointURL, e); + return null; + } try { log.debug("Getting auth token for client '{}' if needed", clientId); JWTToken token = getAuthorizationToken(); - log.debug("Performing HTTP POST to: {}", endpoint); + log.debug("Performing HTTP GET to: {}", endpoint); HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection(); - connection.setRequestMethod("POST"); + connection.setRequestMethod("GET"); connection.setConnectTimeout(CONNECTION_TIMEOUT); log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout()); connection.setReadTimeout(READ_TIMEOUT); log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout()); - connection.setRequestProperty("Content-Type", "application/json"); - // Commented out as per the Conductor issue: https://github.com/Netflix/conductor/issues/376 - // connection.setRequestProperty("Accept", "application/json"); - connection.setDoOutput(true); if (token != null) { log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer()); connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer()); } else { log.debug("Sending request without authorization header"); } - OutputStream os = connection.getOutputStream(); - String jsonString = event.toJSONString(); - log.trace("Sending event JSON: {}", jsonString); - os.write(jsonString.getBytes("UTF-8")); - os.flush(); - os.close(); StringBuilder sb = new StringBuilder(); int httpResultCode = connection.getResponseCode(); log.trace("HTTP Response code: {}", httpResultCode); log.trace("Reading response"); - boolean ok = true; InputStreamReader isr = null; if (httpResultCode == HttpURLConnection.HTTP_OK) { isr = new InputStreamReader(connection.getInputStream(), "UTF-8"); } else { - ok = false; isr = new InputStreamReader(connection.getErrorStream(), "UTF-8"); } BufferedReader br = new BufferedReader(isr); @@ -118,16 +269,21 @@ public class HTTPWithOIDCAuthEventSender implements EventSender { } br.close(); isr.close(); - if (ok) { - log.info("[{}] Event publish for {} is OK", httpResultCode, event.getName()); + if (httpResultCode == HttpURLConnection.HTTP_OK) { + log.debug("[{}] Got results for {}", httpResultCode, id); + try { + results = (JSONObject) new JSONParser().parse(sb.toString()); + } catch (ParseException e) { + log.warn("Error parsing results string as JSON: {}", sb.toString()); + } } else { - log.debug("[{}] Event publish for {} is not OK", httpResultCode, event.getName()); + log.warn("[{}] Error getting results for ID {}", httpResultCode, id); } - log.trace("Response message from server: {}", sb.toString()); - } catch (Exception e) { - log.error("POSTing JSON to: " + endpoint, e); + } catch (IOException | OpenIdConnectRESTHelperException e) { + log.error("Getting results from: " + endpoint, e); } + + return results; } } - } \ No newline at end of file diff --git a/src/main/java/org/gcube/event/publisher/HTTPWithUMAAuthEventSender.java b/src/main/java/org/gcube/event/publisher/HTTPWithUMAAuthEventSender.java index e7ef242..624ee0e 100644 --- a/src/main/java/org/gcube/event/publisher/HTTPWithUMAAuthEventSender.java +++ b/src/main/java/org/gcube/event/publisher/HTTPWithUMAAuthEventSender.java @@ -26,7 +26,7 @@ public class HTTPWithUMAAuthEventSender extends HTTPWithOIDCAuthEventSender { JWTToken oidcToken = super.getAuthorizationToken(); if (oidcToken != null) { if (umaAudience != null) { - log.debug("Getting UMA token for audience '{}' from: {}", umaAudience, getTokenURL()); + log.debug("Getting UMA token with audience '{}' from: {}", umaAudience, getTokenURL()); return OpenIdConnectRESTHelper.queryUMAToken(getTokenURL(), oidcToken.getAccessTokenAsBearer(), umaAudience, null); } else { diff --git a/src/main/java/org/gcube/event/publisher/ResultsParser.java b/src/main/java/org/gcube/event/publisher/ResultsParser.java new file mode 100644 index 0000000..921dd94 --- /dev/null +++ b/src/main/java/org/gcube/event/publisher/ResultsParser.java @@ -0,0 +1,9 @@ +package org.gcube.event.publisher; + +import org.json.simple.JSONObject; + +public interface ResultsParser { + + EventStatus parseResults(JSONObject results); + +} diff --git a/src/test/java/org/gcube/event/publisher/HTTPEventSenderTest.java b/src/test/java/org/gcube/event/publisher/HTTPEventSenderTest.java index 293fda9..03b1177 100644 --- a/src/test/java/org/gcube/event/publisher/HTTPEventSenderTest.java +++ b/src/test/java/org/gcube/event/publisher/HTTPEventSenderTest.java @@ -8,14 +8,25 @@ public class HTTPEventSenderTest { public HTTPEventSenderTest() { } - public static void main(String[] args) throws MalformedURLException { - HTTPWithUMAAuthEventSender sender = new HTTPWithUMAAuthEventSender( - new URL("https://nubis1.int.d4science.net/api/workflow/"), "lr62_portal", - "28726d01-9f24-4ef4-a057-3d208d96aaa0", - new URL("https://nubis2.int.d4science.net/auth/realms/d4science/protocol/openid-connect/token"), - "%2Fgcube"); + public static void main(String[] args) { + EventPublisher publisher = new AbstractEventPublisher() { + @Override + protected EventSender createEventSender() { + try { + return new HTTPWithOIDCAuthEventSender( + new URL("https://conductor.dev.d4science.org/api/workflow/"), "lr62_portal", + "6937125d-6d70-404a-9d63-908c1e6415c4", + new URL("https://accounts.dev.d4science.org/auth/realms/d4science/protocol/openid-connect/token")); + } catch (MalformedURLException e) { + e.printStackTrace(); + return null; + } + } + }; - sender.send(new Event("startup", "portal", "gcube")); + System.out.println("Published: " + publisher.publish(new Event("startup", "portal", "gcube"), true)); + EventStatus status = publisher.check("93675866-c209-4584-8576-ec641df63e9a"); + System.out.println(status); } }