Compare commits

...

14 Commits

Author SHA1 Message Date
Mauro Mugnaini a5dc463350
Re-releasing lib without `-SNAPSHOT` suffix 2024-05-16 10:41:18 +02:00
Mauro Mugnaini 7d46e2f791
Explicitily setting Java8 as target version 2024-05-16 10:36:02 +02:00
Mauro Mugnaini 28a9532a67
Changed workflow start according to new API with specific JSON object payload 2024-05-16 10:35:00 +02:00
Mauro Mugnaini 34e3b37a91
Added `log4j` lib for test that was missing after the migration to the `gcube-bom` from `portal` one 2024-05-09 15:53:01 +02:00
Mauro Mugnaini f3b3ee42c2
Releasing `v1.2.0` after moving from `maven-portal-bom` to `gcube-bom` 2024-05-08 12:01:41 +02:00
Mauro Mugnaini 6ea28b7b67
Moved from `maven-portal-bom` to `gcube-bom` 2024-05-08 12:00:43 +02:00
Mauro Mugnaini 1a012c7d1d
Moved back to `Java8` 2024-04-24 17:47:10 +02:00
Mauro Mugnaini 779881eb93
Releasing `v1.2.0` 2024-04-24 10:41:05 +02:00
Mauro Mugnaini 7790eb127c
Better handling of exceptions and retrying behavior in case of read timeout. (Default connection timeout is 10s and read timeout now is 60s) 2024-04-22 16:17:59 +02:00
Mauro Mugnaini d761dac8ac
- Restored correct behavior for event publishing with workflow id only sent back
- Extracted `AbstractHTTPWithJWTTokenAuthEventSender` class for easy subclassing
- Added new outcome check methods to inspect last send and last check actions results and some other facilities
2024-03-08 13:43:43 +01:00
Mauro Mugnaini 38259734b1
Added also setter/getter methods for connection and read timeouts 2024-03-06 12:18:30 +01:00
Mauro Mugnaini edfe561608
Added hooks methods for connection and read timeouts override 2024-03-06 12:09:09 +01:00
Mauro Mugnaini e41a8f9c20
Added missing constructor for extracted abstract class 2024-03-05 15:06:11 +01:00
Mauro Mugnaini b47f04a197
Extracted the `AbstractHTTPWithJWTTokenAuthEventSender` abstract class for easy extention purposes and updated to `maven-parent` version `1.2.1-SNAPSHOT` and `maven-portal-bom` version `3.7.0` 2024-03-05 11:49:47 +01:00
16 changed files with 736 additions and 352 deletions

View File

@ -1,27 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="optional" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
@ -30,15 +13,29 @@
<attribute name="org.eclipse.jst.component.nondependency" value=""/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jst.server.core.container/com.liferay.ide.eclipse.server.tomcat.runtimeClasspathProvider/Liferay v6.2 (Tomcat 7)">
<attributes>
<attribute name="owner.project.facets" value="jst.utility"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11">
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="test" value="true"/>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
<attribute name="optional" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

View File

@ -1,11 +1,16 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.targetPlatform=11
org.eclipse.jdt.core.compiler.compliance=11
org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=warning
org.eclipse.jdt.core.compiler.release=enabled
org.eclipse.jdt.core.compiler.source=11
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=1.8

View File

@ -1,6 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<faceted-project>
<runtime name="Liferay v6.2 (Tomcat 7)"/>
<installed facet="jst.utility" version="1.0"/>
<installed facet="java" version="11"/>
<installed facet="java" version="1.8"/>
</faceted-project>

View File

@ -2,6 +2,13 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
# Changelog for "event-publisher-library"
## [v1.2.0]
- Restored correct behavior for event publishing with workflow id only sent back, setting input payload in JSON
- Extracted `AbstractHTTPWithJWTTokenAuthEventSender` class for easy subclassing
- Added new outcome check methods to inspect last send and last check actions results and some other facilities
- Better handling of exceptions and retrying behavior in case of read timeout. (Default connection timeout is 10s and read timeout now is 60s)
- Moved from `maven-portal-bom` to `gcube-bom`
## [v1.1.0]
- Added `BufferedEventProcessor` to manual send bunch of events and controlling their output status (#23628)

56
pom.xml
View File

@ -7,21 +7,21 @@
<parent>
<artifactId>maven-parent</artifactId>
<groupId>org.gcube.tools</groupId>
<version>1.1.0</version>
<version>1.2.0</version>
<relativePath />
</parent>
<groupId>org.gcube.common</groupId>
<artifactId>event-publisher-library</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<packaging>jar</packaging>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.gcube.distribution</groupId>
<artifactId>maven-portal-bom</artifactId>
<version>3.6.4</version>
<artifactId>gcube-bom</artifactId>
<version>2.4.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
@ -34,23 +34,32 @@
<url>https://code-repo.d4science.org/gCubeSystem/${project.artifactId}</url>
</scm>
<properties>
<java.version>8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<oidc-library.version>[1.2.0-SNAPSHOT,)</oidc-library.version>
<slf4j-log4j12.version>1.6.4</slf4j-log4j12.version>
<log4j.version>1.2.16</log4j.version>
</properties>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<encoding>UTF-8</encoding>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
<version>2.5</version>
</plugin>
<!-- <plugin>-->
<!-- <artifactId>maven-compiler-plugin</artifactId>-->
<!-- <configuration>-->
<!-- <encoding>UTF-8</encoding>-->
<!-- <source>${maven.compiler.source}</source>-->
<!-- <target>${maven.compiler.target}</target>-->
<!-- </configuration>-->
<!-- </plugin>-->
<!-- <plugin>-->
<!-- <artifactId>maven-resources-plugin</artifactId>-->
<!-- <configuration>-->
<!-- <encoding>UTF-8</encoding>-->
<!-- </configuration>-->
<!-- <version>2.5</version>-->
<!-- </plugin>-->
</plugins>
</build>
@ -59,6 +68,7 @@
<groupId>org.gcube.common</groupId>
<artifactId>oidc-library</artifactId>
<scope>compile</scope>
<version>${oidc-library.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@ -73,7 +83,15 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j-log4j12.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -1,5 +1,7 @@
package org.gcube.event.publisher;
import java.net.HttpURLConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,16 +36,61 @@ public abstract class AbstractEventPublisher implements EventPublisher {
return null;
}
@Override
public boolean isLastPublishOK() {
return getLastPublishEventHTTPResponseCode() == HttpURLConnection.HTTP_OK
|| getLastPublishEventHTTPResponseCode() == HttpURLConnection.HTTP_CREATED;
}
@Override
public int getLastPublishEventHTTPResponseCode() {
return getEventSender().getLastSendHTTPResponseCode();
}
@Override
public EventStatus publishAndCheck(Event event) {
return publishAndCheck(event, 0);
}
@Override
public EventStatus publishAndCheck(Event event, int delayMS) {
String instanceId = publish(event, true);
try {
if (delayMS > 0) {
Thread.sleep(delayMS);
}
return check(instanceId);
} catch (InterruptedException e) {
logger.error("Sleeping before performing the event status check", e);
return null;
}
}
@Override
public EventStatus check(String instanceId) {
if (instanceId != null) {
return getResultsParser().parseResults(eventSender.retrive(instanceId));
return getResultsParser().parseResults(instanceId, eventSender.retrive(instanceId));
} else {
logger.warn("Cannot check with a null instance ID");
return EventStatus.NOT_FOUND();
return EventStatus.NOT_FOUND(instanceId);
}
}
@Override
public EventStatus refresh(EventStatus eventStatus) {
return check(eventStatus.getInstanceId());
}
@Override
public boolean isLastCheckOK() {
return getLastCheckHTTPResponseCode() == 200;
}
@Override
public int getLastCheckHTTPResponseCode() {
return getEventSender().getLastRetrieveHTTPResponseCode();
}
protected abstract EventSender createEventSender();
protected ResultsParser createResultsParser() {

View File

@ -0,0 +1,365 @@
package org.gcube.event.publisher;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.Arrays;
import org.gcube.oidc.rest.JWTToken;
import org.gcube.oidc.rest.OpenIdConnectRESTHelperException;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSender {
protected static final Logger log = LoggerFactory.getLogger(HTTPWithOIDCAuthEventSender.class);
protected URL baseEndpointURL;
protected String clientId;
protected String clientSecret;
protected URL tokenURL;
protected int connectionTimeout;
protected int readTimeout;
protected HTTPPost lastHTTPPost;
protected HTTPGet lastHTTPGet;
public AbstractHTTPWithJWTTokenAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret,
URL tokenURL) {
this(baseEndpointURL, clientId, clientSecret, tokenURL, HTTPVerb.DEFAULT_CONNECTION_TIMEOUT,
HTTPVerb.DEFAULT_READ_TIMEOUT);
}
public AbstractHTTPWithJWTTokenAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret,
URL tokenURL, int connectionTimeout, int readTimeout) {
super();
this.baseEndpointURL = baseEndpointURL;
this.clientId = clientId;
this.clientSecret = clientSecret;
this.tokenURL = tokenURL;
this.connectionTimeout = connectionTimeout;
this.readTimeout = readTimeout;
}
public int getReadTimeout() {
return readTimeout;
}
public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}
public int getConnectionTimeout() {
return connectionTimeout;
}
public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
@Override
public void send(Event event) {
log.debug("Starting HTTP POST thread with base URL: {}", baseEndpointURL);
lastHTTPPost = new HTTPPost(baseEndpointURL, event);
new Thread(lastHTTPPost).start();
}
@Override
public int getLastSendHTTPResponseCode() {
return lastHTTPPost != null ? lastHTTPPost.gethttpResponseCode() : -1;
}
@Override
public String sendAndGetResult(Event event) {
log.debug("Starting HTTP POST thread to: {}", baseEndpointURL);
lastHTTPPost = new HTTPPost(baseEndpointURL, event);
Thread postThread = new Thread(lastHTTPPost);
postThread.start();
try {
postThread.join();
return lastHTTPPost.getResult();
} catch (InterruptedException e) {
log.error("While waiting for HTTP Post thread termination", e);
return null;
}
}
@Override
public JSONObject retrive(String id) {
lastHTTPGet = new HTTPGet(baseEndpointURL, id);
return lastHTTPGet.readJSON();
}
@Override
public int getLastRetrieveHTTPResponseCode() {
return lastHTTPGet != null ? lastHTTPGet.gethttpResponseCode() : -1;
}
protected URL getTokenURL() {
return tokenURL;
}
protected abstract JWTToken getAuthorizationToken() throws OpenIdConnectRESTHelperException;
public abstract class HTTPVerb {
protected static final int DEFAULT_CONNECTION_TIMEOUT = 10000;
protected static final int DEFAULT_READ_TIMEOUT = 60000;
protected URL baseEndpoint;
protected int httpResponseCode = -1;
protected String httpContentType;
public HTTPVerb(URL baseEndpoint) {
this.baseEndpoint = baseEndpoint;
}
public int gethttpResponseCode() {
return httpResponseCode;
}
}
public class HTTPPost extends HTTPVerb implements Runnable {
private static final int FIRST_PAUSE = 7500; // ms
private static final int PAUSE_INCREMENT_FACTOR = 4;
private static final long MAX_RETRYINGS = 5;
private static final boolean RETRY_ON_READ_TIMEOUT = false;
private final int[] RETRY_CODES = { HttpURLConnection.HTTP_BAD_GATEWAY,
HttpURLConnection.HTTP_CLIENT_TIMEOUT, HttpURLConnection.HTTP_GATEWAY_TIMEOUT,
HttpURLConnection.HTTP_INTERNAL_ERROR };
private Event event;
private String result;
private long actualPause = FIRST_PAUSE;
private long retryings = 0;
public HTTPPost(URL baseEndpoint, Event event) {
super(baseEndpoint);
this.event = event;
}
@Override
public void run() {
try {
URL eventEndpoint = null;
if (baseEndpoint.toString().endsWith("/")) {
try {
log.debug("Removing trailing slash from base URL since the endpoint computation API changed");
eventEndpoint = new URL(
baseEndpoint.toString().substring(0, baseEndpoint.toString().length() - 1));
} catch (MalformedURLException e) {
log.error("Cannot remove trailing slash from base endpoint URL", e);
return;
}
} else {
eventEndpoint = baseEndpoint;
}
boolean OK = false;
do {
try {
log.debug("Getting auth token for client '{}' if needed", clientId);
JWTToken token = getAuthorizationToken();
log.debug("Performing HTTP POST to: {}", eventEndpoint);
HttpURLConnection connection = (HttpURLConnection) eventEndpoint.openConnection();
connection.setRequestMethod("POST");
connection.setConnectTimeout(connectionTimeout);
log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout());
connection.setReadTimeout(readTimeout);
log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout());
connection.setRequestProperty("Content-Type", "application/json");
connection.setRequestProperty("Accept", "text/plain");
connection.setDoOutput(true);
if (token != null) {
log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer());
connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer());
} else {
log.debug("Sending request without authorization header");
}
OutputStream os = connection.getOutputStream();
String jsonString = new StartWorkflow(event).toJSONString();
log.trace("Sending event JSON: {}", jsonString);
os.write(jsonString.getBytes("UTF-8"));
os.flush();
os.close();
StringBuilder sb = new StringBuilder();
httpResponseCode = connection.getResponseCode();
log.trace("HTTP Response code: {}", httpResponseCode);
httpContentType = connection.getContentType();
log.trace("HTTP Response content type is: {}", httpContentType);
log.trace("Reading response");
InputStreamReader isr = null;
if (httpResponseCode == HttpURLConnection.HTTP_OK) {
OK = true;
// From this point ahead every exception should not set OK to false
// since the server acknowledged with 200
isr = new InputStreamReader(connection.getInputStream(), "UTF-8");
} else {
isr = new InputStreamReader(connection.getErrorStream(), "UTF-8");
}
try {
BufferedReader br = new BufferedReader(isr);
String line = null;
while ((line = br.readLine()) != null) {
sb.append("\n" + line);
}
br.close();
isr.close();
sb.deleteCharAt(0);
result = sb.toString();
} catch (Exception e) {
log.error("Error reading response", e);
break;
}
if (OK) {
log.debug("[{}] Event publish for {} is OK", httpResponseCode, event.getName());
} else {
log.trace("Response message from server:\n{}", result);
if (shouldRetryWithCode(httpResponseCode)) {
log.warn("[{}] Event publish ERROR", httpResponseCode);
} else {
log.info("[{}] Event publish ERROR but not retrying to send it with this HTTP code",
httpResponseCode);
break;
}
}
} catch (SocketTimeoutException e) {
log.error("Read timeout POSTing JSON to: " + eventEndpoint, e);
if (RETRY_ON_READ_TIMEOUT) {
log.trace("Event will be re-sent accodingly to retryes");
} else {
log.trace("Event will be not re-sent");
break;
}
} catch (IOException | OpenIdConnectRESTHelperException e) {
log.error("I/O Error POSTing JSON to: " + eventEndpoint, e);
}
if (!OK) {
if (retryings <= MAX_RETRYINGS) {
log.info("Retrying to send event in {} ms", actualPause);
Thread.sleep(actualPause);
log.debug("Start retrying event publish: {}", event.getName());
actualPause *= PAUSE_INCREMENT_FACTOR;
retryings += 1;
} else {
log.error("[{}] Event publish ERROR, exhausted tries after {} retryings",
httpResponseCode,
retryings);
break;
}
}
} while (!OK);
} catch (InterruptedException e) {
log.error("Sleeping before retry event send", e);
}
}
private boolean shouldRetryWithCode(int httpResultCode) {
return Arrays.binarySearch(RETRY_CODES, httpResultCode) > 0;
}
public String getResult() {
return result;
}
}
public class HTTPGet extends HTTPVerb {
private String id;
public HTTPGet(URL baseEndpoint, String id) {
super(baseEndpoint);
this.id = id;
}
public JSONObject readJSON() {
URL endpoint = null;
JSONObject results = null;
try {
String baseEndpointString = baseEndpoint.toString();
if (baseEndpointString.endsWith("/")) {
endpoint = new URL(baseEndpointString + id);
} else {
endpoint = new URL(baseEndpointString + "/" + id);
}
} catch (MalformedURLException e) {
log.error("Cannot compute retrieve endpoint URL. ID: " + id + ", base endpoint: " + baseEndpointURL, e);
return null;
}
try {
log.debug("Getting auth token for client '{}' if needed", clientId);
JWTToken token = getAuthorizationToken();
log.debug("Performing HTTP GET to: {}", endpoint);
HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection();
connection.setRequestMethod("GET");
connection.setConnectTimeout(connectionTimeout);
log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout());
connection.setReadTimeout(readTimeout);
log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout());
if (token != null) {
log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer());
connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer());
} else {
log.debug("Sending request without authorization header");
}
StringBuilder sb = new StringBuilder();
httpResponseCode = connection.getResponseCode();
log.trace("HTTP Response code: {}", httpResponseCode);
httpContentType = connection.getContentType();
log.trace("HTTP Response content type is: {}", httpContentType);
log.trace("Reading response");
InputStreamReader isr = null;
if (httpResponseCode == HttpURLConnection.HTTP_OK) {
isr = new InputStreamReader(connection.getInputStream(), "UTF-8");
} else {
isr = new InputStreamReader(connection.getErrorStream(), "UTF-8");
}
BufferedReader br = new BufferedReader(isr);
String line = null;
while ((line = br.readLine()) != null) {
sb.append(line + "\n");
}
br.close();
isr.close();
if (httpResponseCode == HttpURLConnection.HTTP_OK) {
log.debug("[{}] Got results for {}", httpResponseCode, id);
if (httpContentType.equals("application/json")) {
try {
results = (JSONObject) new JSONParser().parse(sb.toString());
} catch (ParseException e) {
log.warn("Error parsing results string as JSON: {}", sb.toString());
}
} else {
log.warn("Received a non JSON reponse: {}", sb.toString());
}
} else {
log.warn("[{}] Error getting results for ID {}", httpResponseCode, id);
}
} catch (IOException | OpenIdConnectRESTHelperException e) {
log.error("Getting results from: " + endpoint, e);
}
return results;
}
}
}

View File

@ -12,32 +12,32 @@ public class ConductorResultsParser implements ResultsParser {
}
@Override
public EventStatus parseResults(JSONObject results) {
public EventStatus parseResults(String uuid, JSONObject results) {
EventStatus eventStatus = null;
if (results != null) {
JSONObject input = (JSONObject) results.get("input");
JSONObject output = (JSONObject) results.get("output");
switch ((String) results.get("status")) {
case "COMPLETED":
eventStatus = EventStatus.COMPLETED(input, output);
eventStatus = EventStatus.COMPLETED(uuid, input, output);
break;
case "FAILED":
eventStatus = EventStatus.FAILED(input, output);
eventStatus = EventStatus.FAILED(uuid, input, output);
break;
case "NOT_FOUND":
eventStatus = EventStatus.NOT_FOUND();
eventStatus = EventStatus.NOT_FOUND(uuid);
break;
case "PAUSED":
eventStatus = EventStatus.PAUSED(input);
eventStatus = EventStatus.PAUSED(uuid, input);
break;
case "RUNNING":
eventStatus = EventStatus.RUNNING(input);
eventStatus = EventStatus.RUNNING(uuid, input);
break;
case "TERMINATED":
eventStatus = EventStatus.TERMINATED(input, output);
eventStatus = EventStatus.TERMINATED(uuid, input, output);
break;
default:
eventStatus = EventStatus.NOT_FOUND();
eventStatus = EventStatus.NOT_FOUND(uuid);
}
} else {
logger.warn("Nothing to parse since JSON object is null");

View File

@ -2,10 +2,76 @@ package org.gcube.event.publisher;
public interface EventPublisher {
/**
* Publish a new event and nothing more. The sender is not interested to the success/failure of the send.
* The results on the workflow engine, is the start of an instance of the workflow identified by the {@link Event#getName()} string.
* @param event the event to be published
*/
void publish(Event event);
/**
* Publishes a new event and optionally wait for the result.
* If <code>waitForResult</code> parameter is <code>false</code> the behavior is the same of the {@link #publish(Event)} method,
* if true, the workflow id is returned as string if the publish had success.
* @param event the vent to be published
* @param waitForResult if the sender is interested or not to the resulting workflow id
* @return the resulting workflow id
*/
String publish(Event event, boolean waitForResult);
/**
* Tells if the last publish was a success or not.
* @return <code>true</code> if the publish was OK, <code>false</code> otherwise
*/
boolean isLastPublishOK();
/**
* Returns the last returned HTTP response code of a publish. E.g. 200 if the send was OK or 404 if the event doesn't have a corresponding workflow definition.
* @return the HTTP response code of the last publish or -1 if an error occurred before the call (e.g. during the authorization or connection)
*/
int getLastPublishEventHTTPResponseCode();
/**
* Publish an event and immediately checks for the results status.
* The behavior is the same of the {@link #publishAndCheck(Event, int)} with <code>delayMS</code> argument less or equal to 0.
* @param event the event to be published
* @return an object with info about the event's running status
*/
EventStatus publishAndCheck(Event event);
/**
* Publish an event and checks for the results status after a delay.
* The behavior is the same of the {@link #publishAndCheck(Event)} if delayMS argument is less or equal to 0.
* @param event the event to be published
* @param delayMS the delay betwen the publish and the query calls
* @return an object with info about the event's running status
*/
EventStatus publishAndCheck(Event event, int delayMS);
/**
* Checks for the workflow results status.
* @param instanceId the workflow instance id, resulting of the {@link #publish(Event, boolean)} with <code>waitForResult</code> as true.
* @return an object with info about the event's running status
*/
EventStatus check(String instanceId);
/**
* Refreshes an event status by checking for the status of the workflow execution represented by the {@link EventStatus#getInstanceId()} string.
* @param eventStatus a previously obtained event status.
* @return an object with new info about the event's running status
*/
EventStatus refresh(EventStatus eventStatus);
/**
* Tells if the last check was a success or not.
* @return <code>true</code> if the publish was OK, <code>false</code> otherwise
*/
boolean isLastCheckOK();
/**
* Returns the last returned HTTP response code of a check. E.g. 200 if the send was OK or 404 if the event doesn't have a corresponding workflow instance.
* @return the HTTP response code of the last publish or -1 if an error occurred before the call (e.g. during the authorization or connection)
*/
int getLastCheckHTTPResponseCode();
}

View File

@ -4,10 +4,36 @@ import org.json.simple.JSONObject;
public interface EventSender {
/**
* Sends an event.
* @param event the event to send
*/
void send(Event event);
/**
* Return the last send HTTP response code.
* @return the HTTP response code or -1 if an error occurred before the call (e.g. during the authorization or connection)
*/
int getLastSendHTTPResponseCode();
/**
* Send an event and get results.
* @param event the event to send
* @return the result of the call
*/
String sendAndGetResult(Event event);
/**
* Retrieves the instance of the provided workflow instance.
* @param id the workflow instance id
* @return
*/
JSONObject retrive(String id);
/**
* Return the last retrieve HTTP response code.
* @return the HTTP response code or -1 if an error occurred before the call (e.g. during the authorization or connection)
*/
int getLastRetrieveHTTPResponseCode();
}

View File

@ -8,52 +8,62 @@ public class EventStatus {
RUNNING, COMPLETED, FAILED, TIMED_OUT, TERMINATED, PAUSED, NOT_FOUND;
}
public static EventStatus RUNNING(JSONObject input) {
return new EventStatus(Status.RUNNING, input);
public static EventStatus RUNNING(String instanceId, JSONObject input) {
return new EventStatus(instanceId, Status.RUNNING, input);
}
public static EventStatus COMPLETED(JSONObject input, JSONObject output) {
return new EventStatus(Status.COMPLETED, input, output);
public static EventStatus COMPLETED(String instanceId, JSONObject input, JSONObject output) {
return new EventStatus(instanceId, Status.COMPLETED, input, output);
}
public static EventStatus FAILED(JSONObject input, JSONObject output) {
return new EventStatus(Status.FAILED, input, output);
public static EventStatus FAILED(String instanceId, JSONObject input, JSONObject output) {
return new EventStatus(instanceId, Status.FAILED, input, output);
}
public static EventStatus TIMED_OUT(JSONObject input) {
return new EventStatus(Status.TIMED_OUT, input);
public static EventStatus TIMED_OUT(String instanceId, JSONObject input) {
return new EventStatus(instanceId, Status.TIMED_OUT, input);
}
public static EventStatus TERMINATED(JSONObject input, JSONObject output) {
return new EventStatus(Status.TERMINATED, input, output);
public static EventStatus TERMINATED(String instanceId, JSONObject input, JSONObject output) {
return new EventStatus(instanceId, Status.TERMINATED, input, output);
}
public static EventStatus PAUSED(JSONObject input) {
return new EventStatus(Status.PAUSED, input);
public static EventStatus PAUSED(String instanceId, JSONObject input) {
return new EventStatus(instanceId, Status.PAUSED, input);
}
public static EventStatus NOT_FOUND() {
return new EventStatus(Status.NOT_FOUND);
public static EventStatus NOT_FOUND(String instanceId) {
return new EventStatus(instanceId, Status.NOT_FOUND);
}
private String instanceId;
private Status status;
private JSONObject input;
private JSONObject output;
private EventStatus(Status status) {
this(status, null);
private EventStatus(String uuid, Status status) {
this(uuid, status, null);
}
private EventStatus(Status status, JSONObject input) {
this(status, input, null);
private EventStatus(String instanceId, Status status, JSONObject input) {
this(instanceId, status, input, null);
}
private EventStatus(Status status, JSONObject input, JSONObject output) {
private EventStatus(String instanceId, Status status, JSONObject input, JSONObject output) {
setInstanceId(instanceId);
setStatus(status);
setInput(input);
setOutput(output);
}
public void setInstanceId(String instanceId) {
this.instanceId = instanceId;
}
public String getInstanceId() {
return instanceId;
}
public void setStatus(Status status) {
this.status = status;
}

View File

@ -1,67 +1,15 @@
package org.gcube.event.publisher;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import org.gcube.oidc.rest.JWTToken;
import org.gcube.oidc.rest.OpenIdConnectRESTHelper;
import org.gcube.oidc.rest.OpenIdConnectRESTHelperException;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTPWithOIDCAuthEventSender implements EventSender {
protected static final Logger log = LoggerFactory.getLogger(HTTPWithOIDCAuthEventSender.class);
private URL baseEndpointURL;
private String clientId;
private String clientSecret;
private URL tokenURL;
public class HTTPWithOIDCAuthEventSender extends AbstractHTTPWithJWTTokenAuthEventSender implements EventSender {
public HTTPWithOIDCAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret, URL tokenURL) {
this.baseEndpointURL = baseEndpointURL;
this.clientId = clientId;
this.clientSecret = clientSecret;
this.tokenURL = tokenURL;
}
@Override
public void send(Event event) {
log.debug("Starting HTTP POST thread to: {}", baseEndpointURL);
new Thread(new HTTPPost(baseEndpointURL, event)).start();
}
@Override
public String sendAndGetResult(Event event) {
log.debug("Starting HTTP POST thread to: {}", baseEndpointURL);
HTTPPost post = new HTTPPost(baseEndpointURL, event);
Thread postThread = new Thread(post);
postThread.start();
try {
postThread.join();
return post.getResult();
} catch (InterruptedException e) {
log.error("While waiting for HTTP Post thread termination", e);
return null;
}
}
@Override
public JSONObject retrive(String id) {
return new HTTPGet(baseEndpointURL, id).readJSON();
}
protected URL getTokenURL() {
return tokenURL;
super(baseEndpointURL, clientId, clientSecret, tokenURL);
}
protected JWTToken getAuthorizationToken() throws OpenIdConnectRESTHelperException {
@ -74,216 +22,4 @@ public class HTTPWithOIDCAuthEventSender implements EventSender {
}
}
public abstract class HTTPVerb {
protected static final int CONNECTION_TIMEOUT = 10000;
protected static final int READ_TIMEOUT = 5000;
protected URL baseEndpoint;
public HTTPVerb(URL baseEndpoint) {
this.baseEndpoint = baseEndpoint;
}
}
public class HTTPPost extends HTTPVerb implements Runnable {
private static final int PAUSE_INCREMENT_FACTOR = 2;
private static final long MAX_RETRYINGS = 2;
private final int[] RETRY_CODES = { HttpURLConnection.HTTP_BAD_GATEWAY,
HttpURLConnection.HTTP_CLIENT_TIMEOUT, HttpURLConnection.HTTP_GATEWAY_TIMEOUT,
HttpURLConnection.HTTP_INTERNAL_ERROR };
private Event event;
private String result;
private long actualPause = 1;
private long retryings = 0;
public HTTPPost(URL baseEndpoint, Event event) {
super(baseEndpoint);
this.event = event;
}
@Override
public void run() {
try {
URL eventEndpoint = null;
try {
eventEndpoint = new URL(baseEndpointURL, event.getName());
} catch (MalformedURLException e) {
log.error("Cannot compute event endpoint URL. Event name: " + event.getName() + ", base endpoint: "
+ baseEndpointURL, e);
return;
}
boolean OK = false;
do {
try {
log.debug("Getting auth token for client '{}' if needed", clientId);
JWTToken token = getAuthorizationToken();
log.debug("Performing HTTP POST to: {}", baseEndpoint);
HttpURLConnection connection = (HttpURLConnection) eventEndpoint.openConnection();
connection.setRequestMethod("POST");
connection.setConnectTimeout(CONNECTION_TIMEOUT);
log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout());
connection.setReadTimeout(READ_TIMEOUT);
log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout());
connection.setRequestProperty("Content-Type", "application/json");
// Commented out as per the Conductor issue: https://github.com/Netflix/conductor/issues/376
// connection.setRequestProperty("Accept", "application/json");
connection.setDoOutput(true);
if (token != null) {
log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer());
connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer());
} else {
log.debug("Sending request without authorization header");
}
OutputStream os = connection.getOutputStream();
String jsonString = event.toJSONString();
log.trace("Sending event JSON: {}", jsonString);
os.write(jsonString.getBytes("UTF-8"));
os.flush();
os.close();
StringBuilder sb = new StringBuilder();
int httpResultCode = connection.getResponseCode();
log.trace("HTTP Response code: {}", httpResultCode);
log.trace("Reading response");
InputStreamReader isr = null;
if (httpResultCode == HttpURLConnection.HTTP_OK) {
OK = true;
// From this point ahead every exception should not set OK to false
// since the server acknowledged with 200
isr = new InputStreamReader(connection.getInputStream(), "UTF-8");
} else {
isr = new InputStreamReader(connection.getErrorStream(), "UTF-8");
}
BufferedReader br = new BufferedReader(isr);
String line = null;
while ((line = br.readLine()) != null) {
sb.append("\n" + line);
}
br.close();
isr.close();
sb.deleteCharAt(0);
result = sb.toString();
if (OK) {
log.debug("[{}] Event publish for {} is OK", httpResultCode, event.getName());
} else {
log.trace("Response message from server:\n{}", result);
if (shouldRetryWithCode(httpResultCode)) {
if (retryings <= MAX_RETRYINGS) {
log.warn("[{}] Event publish ERROR, retrying in {} seconds", httpResultCode,
actualPause);
Thread.sleep(actualPause * 1000);
log.debug("Start retrying event publish: {}", event.getName());
actualPause *= PAUSE_INCREMENT_FACTOR;
retryings += 1;
} else {
log.error("[{}] Event publish ERROR, exhausted tries after {} retryings",
httpResultCode,
retryings);
break;
}
} else {
log.info("[{}] Event publish ERROR but should not retry with this HTTP code",
httpResultCode);
break;
}
}
} catch (IOException | OpenIdConnectRESTHelperException e) {
log.error("POSTing JSON to: " + eventEndpoint, e);
}
} while (!OK);
} catch (InterruptedException e) {
log.error("Sleeping before retry event send", e);
}
}
private boolean shouldRetryWithCode(int httpResultCode) {
return Arrays.binarySearch(RETRY_CODES, httpResultCode) > 0;
}
public String getResult() {
return result;
}
}
public class HTTPGet extends HTTPVerb {
private String id;
public HTTPGet(URL baseEndpoint, String id) {
super(baseEndpoint);
this.id = id;
}
public JSONObject readJSON() {
URL endpoint = null;
JSONObject results = null;
try {
endpoint = new URL(baseEndpoint, id);
} catch (MalformedURLException e) {
log.error("Cannot compute retrieve endpoint URL. ID: " + id + ", base endpoint: " + baseEndpointURL, e);
return null;
}
try {
log.debug("Getting auth token for client '{}' if needed", clientId);
JWTToken token = getAuthorizationToken();
log.debug("Performing HTTP GET to: {}", endpoint);
HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection();
connection.setRequestMethod("GET");
connection.setConnectTimeout(CONNECTION_TIMEOUT);
log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout());
connection.setReadTimeout(READ_TIMEOUT);
log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout());
if (token != null) {
log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer());
connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer());
} else {
log.debug("Sending request without authorization header");
}
StringBuilder sb = new StringBuilder();
int httpResultCode = connection.getResponseCode();
log.trace("HTTP Response code: {}", httpResultCode);
log.trace("Reading response");
InputStreamReader isr = null;
if (httpResultCode == HttpURLConnection.HTTP_OK) {
isr = new InputStreamReader(connection.getInputStream(), "UTF-8");
} else {
isr = new InputStreamReader(connection.getErrorStream(), "UTF-8");
}
BufferedReader br = new BufferedReader(isr);
String line = null;
while ((line = br.readLine()) != null) {
sb.append(line + "\n");
}
br.close();
isr.close();
if (httpResultCode == HttpURLConnection.HTTP_OK) {
log.debug("[{}] Got results for {}", httpResultCode, id);
try {
results = (JSONObject) new JSONParser().parse(sb.toString());
} catch (ParseException e) {
log.warn("Error parsing results string as JSON: {}", sb.toString());
}
} else {
log.warn("[{}] Error getting results for ID {}", httpResultCode, id);
}
} catch (IOException | OpenIdConnectRESTHelperException e) {
log.error("Getting results from: " + endpoint, e);
}
return results;
}
}
}

View File

@ -14,10 +14,10 @@ public class HTTPWithUMAAuthEventSender extends HTTPWithOIDCAuthEventSender {
private String umaAudience;
public HTTPWithUMAAuthEventSender(URL baseEnndpointURL, String clientId, String clientSecret, URL tokenURL,
public HTTPWithUMAAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret, URL tokenURL,
String umaAudience) {
super(baseEnndpointURL, clientId, clientSecret, tokenURL);
super(baseEndpointURL, clientId, clientSecret, tokenURL);
this.umaAudience = umaAudience;
}

View File

@ -4,6 +4,6 @@ import org.json.simple.JSONObject;
public interface ResultsParser {
EventStatus parseResults(JSONObject results);
EventStatus parseResults(String uuid, JSONObject results);
}

View File

@ -0,0 +1,80 @@
package org.gcube.event.publisher;
import org.json.simple.JSONObject;
public class StartWorkflow extends JSONObject {
private static final long serialVersionUID = -6974427433855594349L;
// Name of the Workflow. MUST be registered with Conductor before starting workflow
public static final String NAME_ENTRY = "name";
// Workflow version defaults to latest available version
private static final String VERSION_ENTRY = "version";
// JSON object with key value params, that can be used by downstream tasks See Wiring Inputs and Outputs for details
private static final String INPUT_ENTRY = "input";
// Unique Id that correlates multiple Workflow executions optional
private static final String CORRELATION_ID_ENTRY = "correlationId";
// See Task Domains for more information. optional
private static final String TASK_TO_DOMAIN_ENTRY = "taskToDomain";
// An adhoc Workflow Definition to run, without registering. See Dynamic Workflows. optional
private static final String WORKFLOW_DEF_ENTRY = "workflowDef";
// This is taken care of by Java client. See External Payload Storage for more info. optional
private static final String EXTERNAL_INPUT_PAYLOAD_STORAGE_PATH_ENTRY = "externalInputPayloadStoragePath";
// Priority level for the tasks within this workflow execution. Possible values are between 0 - 99
private static final String PRIORITY_ENTRY = "priority";
public StartWorkflow(Event event) {
this(event.getName(), event);
}
public StartWorkflow(String name, JSONObject input) {
super();
setName(name);
setInput(input);
}
@SuppressWarnings("unchecked")
protected void set(String key, Object value) {
put(key, value);
}
public void setName(String name) {
set(NAME_ENTRY, name);
}
public void setVersion(Integer version) {
set(VERSION_ENTRY, version);
}
public void setInput(JSONObject input) {
set(INPUT_ENTRY, input);
}
public void setCorrelationId(String correlationId) {
set(CORRELATION_ID_ENTRY, correlationId);
}
public void setTaskToDomain(String taskToDomain) {
set(TASK_TO_DOMAIN_ENTRY, taskToDomain);
}
public void setWorkflowDef(JSONObject workflowDef) {
set(WORKFLOW_DEF_ENTRY, workflowDef);
}
public void setExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) {
set(EXTERNAL_INPUT_PAYLOAD_STORAGE_PATH_ENTRY, externalInputPayloadStoragePath);
}
public void setPriority(Integer prioriry) {
set(PRIORITY_ENTRY, prioriry);
}
}

View File

@ -3,8 +3,13 @@ package org.gcube.event.publisher;
import java.net.MalformedURLException;
import java.net.URL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTPEventSenderTest {
protected static final Logger logger = LoggerFactory.getLogger(HTTPEventSenderTest.class);
public HTTPEventSenderTest() {
}
@ -14,7 +19,8 @@ public class HTTPEventSenderTest {
protected EventSender createEventSender() {
try {
return new HTTPWithOIDCAuthEventSender(
new URL("https://conductor.dev.d4science.org/api/workflow/"), "lr62_portal",
new URL("https://conductor.cloud-dev.d4science.org/api/workflow/"),
"lr62_portal",
"6937125d-6d70-404a-9d63-908c1e6415c4",
new URL("https://accounts.dev.d4science.org/auth/realms/d4science/protocol/openid-connect/token"));
} catch (MalformedURLException e) {
@ -24,9 +30,31 @@ public class HTTPEventSenderTest {
}
};
System.out.println("Published: " + publisher.publish(new Event("startup", "portal", "gcube"), true));
EventStatus status = publisher.check("93675866-c209-4584-8576-ec641df63e9a");
System.out.println(status);
String id = publisher.publish(new Event("test", "test", "test"), true);
logger.info("*** Published: {}", id);
logger.info("*** Publish was {} and HTTP response status is: {}",
publisher.isLastPublishOK() ? "OK" : "KO", publisher.getLastPublishEventHTTPResponseCode());
EventStatus status = null;
do {
status = publisher.check(id);
logger.trace("*** Check was {} and HTTP response status is: {}", publisher.isLastCheckOK() ? "OK" : "KO",
publisher.getLastCheckHTTPResponseCode());
} while (status.getStatus() != EventStatus.Status.COMPLETED && status.getStatus() != EventStatus.Status.FAILED);
logger.info("*** Final status is: {}", status.toString());
status = publisher.publishAndCheck(new Event("test", "test", "test"), 1000);
logger.info("*** Publish was {} and HTTP response status is: {}",
publisher.isLastPublishOK() ? "OK" : "KO", publisher.getLastPublishEventHTTPResponseCode());
do {
status = publisher.refresh(status);
logger.trace("*** Check was {} and HTTP response status is: {}", publisher.isLastCheckOK() ? "OK" : "KO",
publisher.getLastCheckHTTPResponseCode());
} while (status.getStatus() != EventStatus.Status.COMPLETED && status.getStatus() != EventStatus.Status.FAILED);
logger.info("*** Final status is: {}", status.toString());
}
}