Added functions to also get the workflow execution's results with a pluggable results parser, ref. impl. for `Conductor`
This commit is contained in:
parent
3f01b590f2
commit
a49e030d71
|
@ -1,5 +1,9 @@
|
|||
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
# Changelog for "event-publisher-library"
|
||||
|
||||
## [v1.1.0-SNAPSHOT]
|
||||
|
||||
## [v1.0.2]
|
||||
- Refactored/extracted event sender that uses OIDC token only and extended it for the UMA version.
|
||||
|
||||
|
@ -8,6 +12,3 @@
|
|||
|
||||
## [v1.0.0-SNAPSHOT]
|
||||
- First release (#19461)
|
||||
|
||||
|
||||
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
22
pom.xml
22
pom.xml
|
@ -1,32 +1,39 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<artifactId>maven-parent</artifactId>
|
||||
<groupId>org.gcube.tools</groupId>
|
||||
<version>1.1.0</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
|
||||
<groupId>org.gcube.common</groupId>
|
||||
<artifactId>event-publisher-library</artifactId>
|
||||
<version>1.0.2</version>
|
||||
<version>1.1.0-SNAPSHOT</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.gcube.distribution</groupId>
|
||||
<artifactId>maven-portal-bom</artifactId>
|
||||
<version>3.6.0</version>
|
||||
<version>3.6.3-SNAPSHOT</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
<scm>
|
||||
<connection>scm:git:https://code-repo.d4science.org/gCubeSystem/${project.artifactId}.git</connection>
|
||||
<developerConnection>scm:git:https://code-repo.d4science.org/gCubeSystem/${project.artifactId}.git</developerConnection>
|
||||
<url>https://code-repo.d4science.org/gCubeSystem/${project.artifactId}</url>
|
||||
</scm>
|
||||
|
||||
<scm>
|
||||
<connection>scm:git:https://code-repo.d4science.org/gCubeSystem/${project.artifactId}.git</connection>
|
||||
<developerConnection>scm:git:https://code-repo.d4science.org/gCubeSystem/${project.artifactId}.git</developerConnection>
|
||||
<url>https://code-repo.d4science.org/gCubeSystem/${project.artifactId}</url>
|
||||
</scm>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
|
@ -46,6 +53,7 @@
|
|||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.gcube.common</groupId>
|
||||
|
|
|
@ -8,22 +8,48 @@ public abstract class AbstractEventPublisher implements EventPublisher {
|
|||
protected static final Logger logger = LoggerFactory.getLogger(AbstractEventPublisher.class);
|
||||
|
||||
private EventSender eventSender;
|
||||
private ResultsParser resultsParser;
|
||||
|
||||
public AbstractEventPublisher() {
|
||||
this.eventSender = createEventSender();
|
||||
this.resultsParser = createResultsParser();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publish(Event event) {
|
||||
publish(event, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String publish(Event event, boolean waitForResult) {
|
||||
if (event != null) {
|
||||
eventSender.send(event);
|
||||
if (waitForResult) {
|
||||
return getEventSender().sendAndGetResult(event);
|
||||
} else {
|
||||
getEventSender().send(event);
|
||||
}
|
||||
} else {
|
||||
logger.warn("Cannot publish a null event");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventStatus check(String instanceId) {
|
||||
if (instanceId != null) {
|
||||
return getResultsParser().parseResults(eventSender.retrive(instanceId));
|
||||
} else {
|
||||
logger.warn("Cannot check with a null instance ID");
|
||||
return EventStatus.NOT_FOUND();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract EventSender createEventSender();
|
||||
|
||||
protected ResultsParser createResultsParser() {
|
||||
return new ConductorResultsParser();
|
||||
}
|
||||
|
||||
public EventSender getEventSender() {
|
||||
return eventSender;
|
||||
}
|
||||
|
@ -32,4 +58,12 @@ public abstract class AbstractEventPublisher implements EventPublisher {
|
|||
this.eventSender = eventSender;
|
||||
}
|
||||
|
||||
public ResultsParser getResultsParser() {
|
||||
return resultsParser;
|
||||
}
|
||||
|
||||
public void setResultsParser(ResultsParser resultsParser) {
|
||||
this.resultsParser = resultsParser;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
package org.gcube.event.publisher;
|
||||
|
||||
import org.json.simple.JSONObject;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class ConductorResultsParser implements ResultsParser {
|
||||
|
||||
protected static final Logger logger = LoggerFactory.getLogger(ConductorResultsParser.class);
|
||||
|
||||
public ConductorResultsParser() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventStatus parseResults(JSONObject results) {
|
||||
EventStatus eventStatus = null;
|
||||
if (results != null) {
|
||||
JSONObject input = (JSONObject) results.get("input");
|
||||
JSONObject output = (JSONObject) results.get("output");
|
||||
switch ((String) results.get("status")) {
|
||||
case "COMPLETED":
|
||||
eventStatus = EventStatus.COMPLETED(input, output);
|
||||
break;
|
||||
case "FAILED":
|
||||
eventStatus = EventStatus.FAILED(input, output);
|
||||
break;
|
||||
case "NOT_FOUND":
|
||||
eventStatus = EventStatus.NOT_FOUND();
|
||||
break;
|
||||
case "PAUSED":
|
||||
eventStatus = EventStatus.PAUSED(input);
|
||||
break;
|
||||
case "RUNNING":
|
||||
eventStatus = EventStatus.RUNNING(input);
|
||||
break;
|
||||
case "TERMINATED":
|
||||
eventStatus = EventStatus.TERMINATED(input, output);
|
||||
break;
|
||||
default:
|
||||
eventStatus = EventStatus.NOT_FOUND();
|
||||
}
|
||||
} else {
|
||||
logger.warn("Nothing to parse since JSON object is null");
|
||||
}
|
||||
return eventStatus;
|
||||
}
|
||||
|
||||
}
|
|
@ -4,4 +4,8 @@ public interface EventPublisher {
|
|||
|
||||
void publish(Event event);
|
||||
|
||||
String publish(Event event, boolean waitForResult);
|
||||
|
||||
EventStatus check(String instanceId);
|
||||
|
||||
}
|
|
@ -1,7 +1,13 @@
|
|||
package org.gcube.event.publisher;
|
||||
|
||||
import org.json.simple.JSONObject;
|
||||
|
||||
public interface EventSender {
|
||||
|
||||
void send(Event event);
|
||||
|
||||
String sendAndGetResult(Event event);
|
||||
|
||||
JSONObject retrive(String id);
|
||||
|
||||
}
|
|
@ -0,0 +1,87 @@
|
|||
package org.gcube.event.publisher;
|
||||
|
||||
import org.json.simple.JSONObject;
|
||||
|
||||
public class EventStatus {
|
||||
|
||||
public enum Status {
|
||||
RUNNING, COMPLETED, FAILED, TIMED_OUT, TERMINATED, PAUSED, NOT_FOUND;
|
||||
}
|
||||
|
||||
public static EventStatus RUNNING(JSONObject input) {
|
||||
return new EventStatus(Status.RUNNING, input);
|
||||
}
|
||||
|
||||
public static EventStatus COMPLETED(JSONObject input, JSONObject output) {
|
||||
return new EventStatus(Status.COMPLETED, input, output);
|
||||
}
|
||||
|
||||
public static EventStatus FAILED(JSONObject input, JSONObject output) {
|
||||
return new EventStatus(Status.FAILED, input, output);
|
||||
}
|
||||
|
||||
public static EventStatus TIMED_OUT(JSONObject input) {
|
||||
return new EventStatus(Status.TIMED_OUT, input);
|
||||
}
|
||||
|
||||
public static EventStatus TERMINATED(JSONObject input, JSONObject output) {
|
||||
return new EventStatus(Status.TERMINATED, input, output);
|
||||
}
|
||||
|
||||
public static EventStatus PAUSED(JSONObject input) {
|
||||
return new EventStatus(Status.PAUSED, input);
|
||||
}
|
||||
|
||||
public static EventStatus NOT_FOUND() {
|
||||
return new EventStatus(Status.NOT_FOUND);
|
||||
}
|
||||
|
||||
private Status status;
|
||||
private JSONObject input;
|
||||
private JSONObject output;
|
||||
|
||||
private EventStatus(Status status) {
|
||||
this(status, null);
|
||||
}
|
||||
|
||||
private EventStatus(Status status, JSONObject input) {
|
||||
this(status, input, null);
|
||||
}
|
||||
|
||||
private EventStatus(Status status, JSONObject input, JSONObject output) {
|
||||
setStatus(status);
|
||||
setInput(input);
|
||||
setOutput(output);
|
||||
}
|
||||
|
||||
public void setStatus(Status status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public Status getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public void setInput(JSONObject input) {
|
||||
this.input = input;
|
||||
}
|
||||
|
||||
public JSONObject getInput() {
|
||||
return input;
|
||||
}
|
||||
|
||||
public void setOutput(JSONObject output) {
|
||||
this.output = output;
|
||||
}
|
||||
|
||||
public JSONObject getOutput() {
|
||||
return output;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("[%s]\ninput: %s\noutput: %s", status, input != null ? input.toJSONString() : "<No input>",
|
||||
output != null ? output.toJSONString() : "<No output>");
|
||||
}
|
||||
|
||||
}
|
|
@ -1,15 +1,20 @@
|
|||
package org.gcube.event.publisher;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.gcube.oidc.rest.JWTToken;
|
||||
import org.gcube.oidc.rest.OpenIdConnectRESTHelper;
|
||||
import org.gcube.oidc.rest.OpenIdConnectRESTHelperException;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import org.json.simple.parser.ParseException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -17,13 +22,13 @@ public class HTTPWithOIDCAuthEventSender implements EventSender {
|
|||
|
||||
protected static final Logger log = LoggerFactory.getLogger(HTTPWithOIDCAuthEventSender.class);
|
||||
|
||||
private URL baseEnndpointURL;
|
||||
private URL baseEndpointURL;
|
||||
private String clientId;
|
||||
private String clientSecret;
|
||||
private URL tokenURL;
|
||||
|
||||
public HTTPWithOIDCAuthEventSender(URL baseEnndpointURL, String clientId, String clientSecret, URL tokenURL) {
|
||||
this.baseEnndpointURL = baseEnndpointURL;
|
||||
public HTTPWithOIDCAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret, URL tokenURL) {
|
||||
this.baseEndpointURL = baseEndpointURL;
|
||||
this.clientId = clientId;
|
||||
this.clientSecret = clientSecret;
|
||||
this.tokenURL = tokenURL;
|
||||
|
@ -31,16 +36,30 @@ public class HTTPWithOIDCAuthEventSender implements EventSender {
|
|||
|
||||
@Override
|
||||
public void send(Event event) {
|
||||
log.debug("Starting HTTP POST thread to: {}", baseEnndpointURL);
|
||||
log.debug("Starting HTTP POST thread to: {}", baseEndpointURL);
|
||||
new Thread(new HTTPPost(baseEndpointURL, event)).start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String sendAndGetResult(Event event) {
|
||||
log.debug("Starting HTTP POST thread to: {}", baseEndpointURL);
|
||||
HTTPPost post = new HTTPPost(baseEndpointURL, event);
|
||||
Thread postThread = new Thread(post);
|
||||
postThread.start();
|
||||
try {
|
||||
URL eventEndpoint = new URL(baseEnndpointURL, event.getName());
|
||||
new Thread(new HTTPost(eventEndpoint, event)).start();
|
||||
} catch (MalformedURLException e) {
|
||||
log.error("Cannot compute event endpoint URL. Event name: " + event.getName() + ", base endpoint: "
|
||||
+ baseEnndpointURL, e);
|
||||
postThread.join();
|
||||
return post.getResult();
|
||||
} catch (InterruptedException e) {
|
||||
log.error("While waiting for HTTP Post thread termination", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public JSONObject retrive(String id) {
|
||||
return new HTTPGet(baseEndpointURL, id).readJSON();
|
||||
}
|
||||
|
||||
protected URL getTokenURL() {
|
||||
return tokenURL;
|
||||
}
|
||||
|
@ -55,60 +74,192 @@ public class HTTPWithOIDCAuthEventSender implements EventSender {
|
|||
}
|
||||
}
|
||||
|
||||
public class HTTPost implements Runnable {
|
||||
public abstract class HTTPVerb {
|
||||
|
||||
private static final int CONNECTION_TIMEOUT = 10000;
|
||||
private static final int READ_TIMEOUT = 5000;
|
||||
protected static final int CONNECTION_TIMEOUT = 10000;
|
||||
protected static final int READ_TIMEOUT = 5000;
|
||||
|
||||
protected URL baseEndpoint;
|
||||
|
||||
public HTTPVerb(URL baseEndpoint) {
|
||||
this.baseEndpoint = baseEndpoint;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public class HTTPPost extends HTTPVerb implements Runnable {
|
||||
|
||||
private static final int PAUSE_INCREMENT_FACTOR = 2;
|
||||
private static final long MAX_RETRYINGS = 2;
|
||||
|
||||
private final int[] RETRY_CODES = { HttpURLConnection.HTTP_BAD_GATEWAY,
|
||||
HttpURLConnection.HTTP_CLIENT_TIMEOUT, HttpURLConnection.HTTP_GATEWAY_TIMEOUT,
|
||||
HttpURLConnection.HTTP_INTERNAL_ERROR };
|
||||
|
||||
private URL endpoint;
|
||||
private Event event;
|
||||
private String result;
|
||||
private long actualPause = 1;
|
||||
private long retryings = 0;
|
||||
|
||||
public HTTPost(URL endpoint, Event event) {
|
||||
this.endpoint = endpoint;
|
||||
public HTTPPost(URL baseEndpoint, Event event) {
|
||||
super(baseEndpoint);
|
||||
this.event = event;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
URL eventEndpoint = null;
|
||||
try {
|
||||
eventEndpoint = new URL(baseEndpointURL, event.getName());
|
||||
} catch (MalformedURLException e) {
|
||||
log.error("Cannot compute event endpoint URL. Event name: " + event.getName() + ", base endpoint: "
|
||||
+ baseEndpointURL, e);
|
||||
return;
|
||||
}
|
||||
boolean OK = false;
|
||||
do {
|
||||
try {
|
||||
log.debug("Getting auth token for client '{}' if needed", clientId);
|
||||
JWTToken token = getAuthorizationToken();
|
||||
log.debug("Performing HTTP POST to: {}", baseEndpoint);
|
||||
HttpURLConnection connection = (HttpURLConnection) eventEndpoint.openConnection();
|
||||
connection.setRequestMethod("POST");
|
||||
connection.setConnectTimeout(CONNECTION_TIMEOUT);
|
||||
log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout());
|
||||
connection.setReadTimeout(READ_TIMEOUT);
|
||||
log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout());
|
||||
connection.setRequestProperty("Content-Type", "application/json");
|
||||
// Commented out as per the Conductor issue: https://github.com/Netflix/conductor/issues/376
|
||||
// connection.setRequestProperty("Accept", "application/json");
|
||||
connection.setDoOutput(true);
|
||||
if (token != null) {
|
||||
log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer());
|
||||
connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer());
|
||||
} else {
|
||||
log.debug("Sending request without authorization header");
|
||||
}
|
||||
OutputStream os = connection.getOutputStream();
|
||||
String jsonString = event.toJSONString();
|
||||
log.trace("Sending event JSON: {}", jsonString);
|
||||
os.write(jsonString.getBytes("UTF-8"));
|
||||
os.flush();
|
||||
os.close();
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
int httpResultCode = connection.getResponseCode();
|
||||
log.trace("HTTP Response code: {}", httpResultCode);
|
||||
|
||||
log.trace("Reading response");
|
||||
InputStreamReader isr = null;
|
||||
if (httpResultCode == HttpURLConnection.HTTP_OK) {
|
||||
OK = true;
|
||||
// From this point ahead every exception should not set OK to false
|
||||
// since the server acknowledged with 200
|
||||
isr = new InputStreamReader(connection.getInputStream(), "UTF-8");
|
||||
} else {
|
||||
isr = new InputStreamReader(connection.getErrorStream(), "UTF-8");
|
||||
}
|
||||
BufferedReader br = new BufferedReader(isr);
|
||||
String line = null;
|
||||
while ((line = br.readLine()) != null) {
|
||||
sb.append("\n" + line);
|
||||
}
|
||||
br.close();
|
||||
isr.close();
|
||||
sb.deleteCharAt(0);
|
||||
result = sb.toString();
|
||||
if (OK) {
|
||||
log.debug("[{}] Event publish for {} is OK", httpResultCode, event.getName());
|
||||
} else {
|
||||
log.trace("Response message from server:\n{}", result);
|
||||
if (shouldRetryWithCode(httpResultCode)) {
|
||||
if (retryings <= MAX_RETRYINGS) {
|
||||
log.warn("[{}] Event publish ERROR, retrying in {} seconds", httpResultCode,
|
||||
actualPause);
|
||||
|
||||
Thread.sleep(actualPause * 1000);
|
||||
log.debug("Start retrying event publish: {}", event.getName());
|
||||
actualPause *= PAUSE_INCREMENT_FACTOR;
|
||||
retryings += 1;
|
||||
} else {
|
||||
log.error("[{}] Event publish ERROR, exhausted tries after {} retryings",
|
||||
httpResultCode,
|
||||
retryings);
|
||||
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
log.info("[{}] Event publish ERROR but should not retry with this HTTP code",
|
||||
httpResultCode);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (IOException | OpenIdConnectRESTHelperException e) {
|
||||
log.error("POSTing JSON to: " + eventEndpoint, e);
|
||||
}
|
||||
} while (!OK);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("Sleeping before retry event send", e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean shouldRetryWithCode(int httpResultCode) {
|
||||
return Arrays.binarySearch(RETRY_CODES, httpResultCode) > 0;
|
||||
}
|
||||
|
||||
public String getResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public class HTTPGet extends HTTPVerb {
|
||||
|
||||
private String id;
|
||||
|
||||
public HTTPGet(URL baseEndpoint, String id) {
|
||||
super(baseEndpoint);
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public JSONObject readJSON() {
|
||||
URL endpoint = null;
|
||||
JSONObject results = null;
|
||||
try {
|
||||
endpoint = new URL(baseEndpoint, id);
|
||||
} catch (MalformedURLException e) {
|
||||
log.error("Cannot compute retrieve endpoint URL. ID: " + id + ", base endpoint: " + baseEndpointURL, e);
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
log.debug("Getting auth token for client '{}' if needed", clientId);
|
||||
JWTToken token = getAuthorizationToken();
|
||||
|
||||
log.debug("Performing HTTP POST to: {}", endpoint);
|
||||
log.debug("Performing HTTP GET to: {}", endpoint);
|
||||
HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection();
|
||||
connection.setRequestMethod("POST");
|
||||
connection.setRequestMethod("GET");
|
||||
connection.setConnectTimeout(CONNECTION_TIMEOUT);
|
||||
log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout());
|
||||
connection.setReadTimeout(READ_TIMEOUT);
|
||||
log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout());
|
||||
connection.setRequestProperty("Content-Type", "application/json");
|
||||
// Commented out as per the Conductor issue: https://github.com/Netflix/conductor/issues/376
|
||||
// connection.setRequestProperty("Accept", "application/json");
|
||||
connection.setDoOutput(true);
|
||||
if (token != null) {
|
||||
log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer());
|
||||
connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer());
|
||||
} else {
|
||||
log.debug("Sending request without authorization header");
|
||||
}
|
||||
OutputStream os = connection.getOutputStream();
|
||||
String jsonString = event.toJSONString();
|
||||
log.trace("Sending event JSON: {}", jsonString);
|
||||
os.write(jsonString.getBytes("UTF-8"));
|
||||
os.flush();
|
||||
os.close();
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
int httpResultCode = connection.getResponseCode();
|
||||
log.trace("HTTP Response code: {}", httpResultCode);
|
||||
|
||||
log.trace("Reading response");
|
||||
boolean ok = true;
|
||||
InputStreamReader isr = null;
|
||||
if (httpResultCode == HttpURLConnection.HTTP_OK) {
|
||||
isr = new InputStreamReader(connection.getInputStream(), "UTF-8");
|
||||
} else {
|
||||
ok = false;
|
||||
isr = new InputStreamReader(connection.getErrorStream(), "UTF-8");
|
||||
}
|
||||
BufferedReader br = new BufferedReader(isr);
|
||||
|
@ -118,16 +269,21 @@ public class HTTPWithOIDCAuthEventSender implements EventSender {
|
|||
}
|
||||
br.close();
|
||||
isr.close();
|
||||
if (ok) {
|
||||
log.info("[{}] Event publish for {} is OK", httpResultCode, event.getName());
|
||||
if (httpResultCode == HttpURLConnection.HTTP_OK) {
|
||||
log.debug("[{}] Got results for {}", httpResultCode, id);
|
||||
try {
|
||||
results = (JSONObject) new JSONParser().parse(sb.toString());
|
||||
} catch (ParseException e) {
|
||||
log.warn("Error parsing results string as JSON: {}", sb.toString());
|
||||
}
|
||||
} else {
|
||||
log.debug("[{}] Event publish for {} is not OK", httpResultCode, event.getName());
|
||||
log.warn("[{}] Error getting results for ID {}", httpResultCode, id);
|
||||
}
|
||||
log.trace("Response message from server: {}", sb.toString());
|
||||
} catch (Exception e) {
|
||||
log.error("POSTing JSON to: " + endpoint, e);
|
||||
} catch (IOException | OpenIdConnectRESTHelperException e) {
|
||||
log.error("Getting results from: " + endpoint, e);
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -26,7 +26,7 @@ public class HTTPWithUMAAuthEventSender extends HTTPWithOIDCAuthEventSender {
|
|||
JWTToken oidcToken = super.getAuthorizationToken();
|
||||
if (oidcToken != null) {
|
||||
if (umaAudience != null) {
|
||||
log.debug("Getting UMA token for audience '{}' from: {}", umaAudience, getTokenURL());
|
||||
log.debug("Getting UMA token with audience '{}' from: {}", umaAudience, getTokenURL());
|
||||
return OpenIdConnectRESTHelper.queryUMAToken(getTokenURL(), oidcToken.getAccessTokenAsBearer(),
|
||||
umaAudience, null);
|
||||
} else {
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
package org.gcube.event.publisher;
|
||||
|
||||
import org.json.simple.JSONObject;
|
||||
|
||||
public interface ResultsParser {
|
||||
|
||||
EventStatus parseResults(JSONObject results);
|
||||
|
||||
}
|
|
@ -8,14 +8,25 @@ public class HTTPEventSenderTest {
|
|||
public HTTPEventSenderTest() {
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws MalformedURLException {
|
||||
HTTPWithUMAAuthEventSender sender = new HTTPWithUMAAuthEventSender(
|
||||
new URL("https://nubis1.int.d4science.net/api/workflow/"), "lr62_portal",
|
||||
"28726d01-9f24-4ef4-a057-3d208d96aaa0",
|
||||
new URL("https://nubis2.int.d4science.net/auth/realms/d4science/protocol/openid-connect/token"),
|
||||
"%2Fgcube");
|
||||
public static void main(String[] args) {
|
||||
EventPublisher publisher = new AbstractEventPublisher() {
|
||||
@Override
|
||||
protected EventSender createEventSender() {
|
||||
try {
|
||||
return new HTTPWithOIDCAuthEventSender(
|
||||
new URL("https://conductor.dev.d4science.org/api/workflow/"), "lr62_portal",
|
||||
"6937125d-6d70-404a-9d63-908c1e6415c4",
|
||||
new URL("https://accounts.dev.d4science.org/auth/realms/d4science/protocol/openid-connect/token"));
|
||||
} catch (MalformedURLException e) {
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
sender.send(new Event("startup", "portal", "gcube"));
|
||||
System.out.println("Published: " + publisher.publish(new Event("startup", "portal", "gcube"), true));
|
||||
EventStatus status = publisher.check("93675866-c209-4584-8576-ec641df63e9a");
|
||||
System.out.println(status);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue