Compare commits
No commits in common. "master" and "r5.0.0" have entirely different histories.
45
.classpath
45
.classpath
|
@ -1,10 +1,27 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?>
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
<classpath>
|
<classpath>
|
||||||
|
<classpathentry kind="src" output="target/classes" path="src/main/java">
|
||||||
|
<attributes>
|
||||||
|
<attribute name="optional" value="true"/>
|
||||||
|
<attribute name="maven.pomderived" value="true"/>
|
||||||
|
</attributes>
|
||||||
|
</classpathentry>
|
||||||
|
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
|
||||||
|
<attributes>
|
||||||
|
<attribute name="maven.pomderived" value="true"/>
|
||||||
|
</attributes>
|
||||||
|
</classpathentry>
|
||||||
|
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
|
||||||
|
<attributes>
|
||||||
|
<attribute name="optional" value="true"/>
|
||||||
|
<attribute name="maven.pomderived" value="true"/>
|
||||||
|
<attribute name="test" value="true"/>
|
||||||
|
</attributes>
|
||||||
|
</classpathentry>
|
||||||
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
|
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
|
||||||
<attributes>
|
<attributes>
|
||||||
<attribute name="test" value="true"/>
|
|
||||||
<attribute name="maven.pomderived" value="true"/>
|
<attribute name="maven.pomderived" value="true"/>
|
||||||
<attribute name="optional" value="true"/>
|
<attribute name="test" value="true"/>
|
||||||
</attributes>
|
</attributes>
|
||||||
</classpathentry>
|
</classpathentry>
|
||||||
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
|
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
|
||||||
|
@ -13,29 +30,15 @@
|
||||||
<attribute name="org.eclipse.jst.component.nondependency" value=""/>
|
<attribute name="org.eclipse.jst.component.nondependency" value=""/>
|
||||||
</attributes>
|
</attributes>
|
||||||
</classpathentry>
|
</classpathentry>
|
||||||
|
<classpathentry kind="con" path="org.eclipse.jst.server.core.container/com.liferay.ide.eclipse.server.tomcat.runtimeClasspathProvider/Liferay v6.2 (Tomcat 7)">
|
||||||
|
<attributes>
|
||||||
|
<attribute name="owner.project.facets" value="jst.utility"/>
|
||||||
|
</attributes>
|
||||||
|
</classpathentry>
|
||||||
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
|
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
|
||||||
<attributes>
|
<attributes>
|
||||||
<attribute name="maven.pomderived" value="true"/>
|
<attribute name="maven.pomderived" value="true"/>
|
||||||
</attributes>
|
</attributes>
|
||||||
</classpathentry>
|
</classpathentry>
|
||||||
<classpathentry kind="src" output="target/classes" path="src/main/java">
|
|
||||||
<attributes>
|
|
||||||
<attribute name="optional" value="true"/>
|
|
||||||
<attribute name="maven.pomderived" value="true"/>
|
|
||||||
</attributes>
|
|
||||||
</classpathentry>
|
|
||||||
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
|
|
||||||
<attributes>
|
|
||||||
<attribute name="test" value="true"/>
|
|
||||||
<attribute name="optional" value="true"/>
|
|
||||||
<attribute name="maven.pomderived" value="true"/>
|
|
||||||
</attributes>
|
|
||||||
</classpathentry>
|
|
||||||
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
|
|
||||||
<attributes>
|
|
||||||
<attribute name="maven.pomderived" value="true"/>
|
|
||||||
<attribute name="optional" value="true"/>
|
|
||||||
</attributes>
|
|
||||||
</classpathentry>
|
|
||||||
<classpathentry kind="output" path="target/classes"/>
|
<classpathentry kind="output" path="target/classes"/>
|
||||||
</classpath>
|
</classpath>
|
||||||
|
|
|
@ -1,16 +1,11 @@
|
||||||
eclipse.preferences.version=1
|
eclipse.preferences.version=1
|
||||||
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
|
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
|
||||||
org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate
|
|
||||||
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
|
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
|
||||||
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
|
|
||||||
org.eclipse.jdt.core.compiler.compliance=1.8
|
org.eclipse.jdt.core.compiler.compliance=1.8
|
||||||
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
|
|
||||||
org.eclipse.jdt.core.compiler.debug.localVariable=generate
|
|
||||||
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
|
|
||||||
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
|
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
|
||||||
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
|
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
|
||||||
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
|
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
|
||||||
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
|
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
|
||||||
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=warning
|
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
|
||||||
org.eclipse.jdt.core.compiler.release=disabled
|
org.eclipse.jdt.core.compiler.release=disabled
|
||||||
org.eclipse.jdt.core.compiler.source=1.8
|
org.eclipse.jdt.core.compiler.source=1.8
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?>
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
<faceted-project>
|
<faceted-project>
|
||||||
<installed facet="jst.utility" version="1.0"/>
|
<runtime name="Liferay v6.2 (Tomcat 7)"/>
|
||||||
<installed facet="java" version="1.8"/>
|
<installed facet="java" version="1.8"/>
|
||||||
|
<installed facet="jst.utility" version="1.0"/>
|
||||||
</faceted-project>
|
</faceted-project>
|
||||||
|
|
15
CHANGELOG.md
15
CHANGELOG.md
|
@ -1,17 +1,5 @@
|
||||||
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
|
||||||
|
|
||||||
# Changelog for "event-publisher-library"
|
# Changelog for "event-publisher-library"
|
||||||
|
|
||||||
## [v1.2.0]
|
|
||||||
- Restored correct behavior for event publishing with workflow id only sent back, setting input payload in JSON
|
|
||||||
- Extracted `AbstractHTTPWithJWTTokenAuthEventSender` class for easy subclassing
|
|
||||||
- Added new outcome check methods to inspect last send and last check actions results and some other facilities
|
|
||||||
- Better handling of exceptions and retrying behavior in case of read timeout. (Default connection timeout is 10s and read timeout now is 60s)
|
|
||||||
- Moved from `maven-portal-bom` to `gcube-bom`
|
|
||||||
|
|
||||||
## [v1.1.0]
|
|
||||||
- Added `BufferedEventProcessor` to manual send bunch of events and controlling their output status (#23628)
|
|
||||||
|
|
||||||
## [v1.0.2]
|
## [v1.0.2]
|
||||||
- Refactored/extracted event sender that uses OIDC token only and extended it for the UMA version.
|
- Refactored/extracted event sender that uses OIDC token only and extended it for the UMA version.
|
||||||
|
|
||||||
|
@ -20,3 +8,6 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||||
|
|
||||||
## [v1.0.0-SNAPSHOT]
|
## [v1.0.0-SNAPSHOT]
|
||||||
- First release (#19461)
|
- First release (#19461)
|
||||||
|
|
||||||
|
|
||||||
|
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
65
pom.xml
65
pom.xml
|
@ -1,74 +1,57 @@
|
||||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
<parent>
|
<parent>
|
||||||
<artifactId>maven-parent</artifactId>
|
<artifactId>maven-parent</artifactId>
|
||||||
<groupId>org.gcube.tools</groupId>
|
<groupId>org.gcube.tools</groupId>
|
||||||
<version>1.2.0</version>
|
<version>1.1.0</version>
|
||||||
<relativePath />
|
<relativePath />
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<groupId>org.gcube.common</groupId>
|
<groupId>org.gcube.common</groupId>
|
||||||
<artifactId>event-publisher-library</artifactId>
|
<artifactId>event-publisher-library</artifactId>
|
||||||
<version>1.2.0</version>
|
<version>1.0.2</version>
|
||||||
<packaging>jar</packaging>
|
|
||||||
|
|
||||||
<dependencyManagement>
|
<dependencyManagement>
|
||||||
<dependencies>
|
<dependencies>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.gcube.distribution</groupId>
|
<groupId>org.gcube.distribution</groupId>
|
||||||
<artifactId>gcube-bom</artifactId>
|
<artifactId>maven-portal-bom</artifactId>
|
||||||
<version>2.4.0</version>
|
<version>3.6.0</version>
|
||||||
<type>pom</type>
|
<type>pom</type>
|
||||||
<scope>import</scope>
|
<scope>import</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</dependencyManagement>
|
</dependencyManagement>
|
||||||
|
|
||||||
<scm>
|
<scm>
|
||||||
<connection>scm:git:https://code-repo.d4science.org/gCubeSystem/${project.artifactId}.git</connection>
|
<connection>scm:git:https://code-repo.d4science.org/gCubeSystem/${project.artifactId}.git</connection>
|
||||||
<developerConnection>scm:git:https://code-repo.d4science.org/gCubeSystem/${project.artifactId}.git</developerConnection>
|
<developerConnection>scm:git:https://code-repo.d4science.org/gCubeSystem/${project.artifactId}.git</developerConnection>
|
||||||
<url>https://code-repo.d4science.org/gCubeSystem/${project.artifactId}</url>
|
<url>https://code-repo.d4science.org/gCubeSystem/${project.artifactId}</url>
|
||||||
</scm>
|
</scm>
|
||||||
|
|
||||||
<properties>
|
|
||||||
<java.version>8</java.version>
|
|
||||||
<maven.compiler.source>${java.version}</maven.compiler.source>
|
|
||||||
<maven.compiler.target>${java.version}</maven.compiler.target>
|
|
||||||
<oidc-library.version>[1.2.0-SNAPSHOT,)</oidc-library.version>
|
|
||||||
<slf4j-log4j12.version>1.6.4</slf4j-log4j12.version>
|
|
||||||
<log4j.version>1.2.16</log4j.version>
|
|
||||||
</properties>
|
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
<plugins>
|
<plugins>
|
||||||
<!-- <plugin>-->
|
<plugin>
|
||||||
<!-- <artifactId>maven-compiler-plugin</artifactId>-->
|
<artifactId>maven-compiler-plugin</artifactId>
|
||||||
<!-- <configuration>-->
|
<configuration>
|
||||||
<!-- <encoding>UTF-8</encoding>-->
|
<encoding>UTF-8</encoding>
|
||||||
<!-- <source>${maven.compiler.source}</source>-->
|
<source>${maven.compiler.source}</source>
|
||||||
<!-- <target>${maven.compiler.target}</target>-->
|
<target>${maven.compiler.target}</target>
|
||||||
<!-- </configuration>-->
|
</configuration>
|
||||||
<!-- </plugin>-->
|
</plugin>
|
||||||
<!-- <plugin>-->
|
<plugin>
|
||||||
<!-- <artifactId>maven-resources-plugin</artifactId>-->
|
<artifactId>maven-resources-plugin</artifactId>
|
||||||
<!-- <configuration>-->
|
<configuration>
|
||||||
<!-- <encoding>UTF-8</encoding>-->
|
<encoding>UTF-8</encoding>
|
||||||
<!-- </configuration>-->
|
</configuration>
|
||||||
<!-- <version>2.5</version>-->
|
<version>2.5</version>
|
||||||
<!-- </plugin>-->
|
</plugin>
|
||||||
</plugins>
|
</plugins>
|
||||||
</build>
|
</build>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.gcube.common</groupId>
|
<groupId>org.gcube.common</groupId>
|
||||||
<artifactId>oidc-library</artifactId>
|
<artifactId>oidc-library</artifactId>
|
||||||
|
<version>[1.0.1-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
|
||||||
<scope>compile</scope>
|
<scope>compile</scope>
|
||||||
<version>${oidc-library.version}</version>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.slf4j</groupId>
|
<groupId>org.slf4j</groupId>
|
||||||
|
@ -83,15 +66,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.slf4j</groupId>
|
<groupId>org.slf4j</groupId>
|
||||||
<artifactId>slf4j-log4j12</artifactId>
|
<artifactId>slf4j-log4j12</artifactId>
|
||||||
<version>${slf4j-log4j12.version}</version>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>log4j</groupId>
|
|
||||||
<artifactId>log4j</artifactId>
|
|
||||||
<version>${log4j.version}</version>
|
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
</project>
|
</project>
|
|
@ -1,7 +1,5 @@
|
||||||
package org.gcube.event.publisher;
|
package org.gcube.event.publisher;
|
||||||
|
|
||||||
import java.net.HttpURLConnection;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -10,93 +8,22 @@ public abstract class AbstractEventPublisher implements EventPublisher {
|
||||||
protected static final Logger logger = LoggerFactory.getLogger(AbstractEventPublisher.class);
|
protected static final Logger logger = LoggerFactory.getLogger(AbstractEventPublisher.class);
|
||||||
|
|
||||||
private EventSender eventSender;
|
private EventSender eventSender;
|
||||||
private ResultsParser resultsParser;
|
|
||||||
|
|
||||||
public AbstractEventPublisher() {
|
public AbstractEventPublisher() {
|
||||||
this.eventSender = createEventSender();
|
this.eventSender = createEventSender();
|
||||||
this.resultsParser = createResultsParser();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void publish(Event event) {
|
public void publish(Event event) {
|
||||||
publish(event, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String publish(Event event, boolean waitForResult) {
|
|
||||||
if (event != null) {
|
if (event != null) {
|
||||||
if (waitForResult) {
|
eventSender.send(event);
|
||||||
return getEventSender().sendAndGetResult(event);
|
|
||||||
} else {
|
|
||||||
getEventSender().send(event);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
logger.warn("Cannot publish a null event");
|
logger.warn("Cannot publish a null event");
|
||||||
}
|
}
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isLastPublishOK() {
|
|
||||||
return getLastPublishEventHTTPResponseCode() == HttpURLConnection.HTTP_OK
|
|
||||||
|| getLastPublishEventHTTPResponseCode() == HttpURLConnection.HTTP_CREATED;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getLastPublishEventHTTPResponseCode() {
|
|
||||||
return getEventSender().getLastSendHTTPResponseCode();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EventStatus publishAndCheck(Event event) {
|
|
||||||
return publishAndCheck(event, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EventStatus publishAndCheck(Event event, int delayMS) {
|
|
||||||
String instanceId = publish(event, true);
|
|
||||||
try {
|
|
||||||
if (delayMS > 0) {
|
|
||||||
Thread.sleep(delayMS);
|
|
||||||
}
|
|
||||||
return check(instanceId);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
logger.error("Sleeping before performing the event status check", e);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EventStatus check(String instanceId) {
|
|
||||||
if (instanceId != null) {
|
|
||||||
return getResultsParser().parseResults(instanceId, eventSender.retrive(instanceId));
|
|
||||||
} else {
|
|
||||||
logger.warn("Cannot check with a null instance ID");
|
|
||||||
return EventStatus.NOT_FOUND(instanceId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EventStatus refresh(EventStatus eventStatus) {
|
|
||||||
return check(eventStatus.getInstanceId());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isLastCheckOK() {
|
|
||||||
return getLastCheckHTTPResponseCode() == 200;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getLastCheckHTTPResponseCode() {
|
|
||||||
return getEventSender().getLastRetrieveHTTPResponseCode();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract EventSender createEventSender();
|
protected abstract EventSender createEventSender();
|
||||||
|
|
||||||
protected ResultsParser createResultsParser() {
|
|
||||||
return new ConductorResultsParser();
|
|
||||||
}
|
|
||||||
|
|
||||||
public EventSender getEventSender() {
|
public EventSender getEventSender() {
|
||||||
return eventSender;
|
return eventSender;
|
||||||
}
|
}
|
||||||
|
@ -105,12 +32,4 @@ public abstract class AbstractEventPublisher implements EventPublisher {
|
||||||
this.eventSender = eventSender;
|
this.eventSender = eventSender;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ResultsParser getResultsParser() {
|
|
||||||
return resultsParser;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setResultsParser(ResultsParser resultsParser) {
|
|
||||||
this.resultsParser = resultsParser;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,365 +0,0 @@
|
||||||
package org.gcube.event.publisher;
|
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStreamReader;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.HttpURLConnection;
|
|
||||||
import java.net.MalformedURLException;
|
|
||||||
import java.net.SocketTimeoutException;
|
|
||||||
import java.net.URL;
|
|
||||||
import java.util.Arrays;
|
|
||||||
|
|
||||||
import org.gcube.oidc.rest.JWTToken;
|
|
||||||
import org.gcube.oidc.rest.OpenIdConnectRESTHelperException;
|
|
||||||
import org.json.simple.JSONObject;
|
|
||||||
import org.json.simple.parser.JSONParser;
|
|
||||||
import org.json.simple.parser.ParseException;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSender {
|
|
||||||
|
|
||||||
protected static final Logger log = LoggerFactory.getLogger(HTTPWithOIDCAuthEventSender.class);
|
|
||||||
protected URL baseEndpointURL;
|
|
||||||
protected String clientId;
|
|
||||||
protected String clientSecret;
|
|
||||||
protected URL tokenURL;
|
|
||||||
protected int connectionTimeout;
|
|
||||||
protected int readTimeout;
|
|
||||||
protected HTTPPost lastHTTPPost;
|
|
||||||
protected HTTPGet lastHTTPGet;
|
|
||||||
|
|
||||||
public AbstractHTTPWithJWTTokenAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret,
|
|
||||||
URL tokenURL) {
|
|
||||||
|
|
||||||
this(baseEndpointURL, clientId, clientSecret, tokenURL, HTTPVerb.DEFAULT_CONNECTION_TIMEOUT,
|
|
||||||
HTTPVerb.DEFAULT_READ_TIMEOUT);
|
|
||||||
}
|
|
||||||
|
|
||||||
public AbstractHTTPWithJWTTokenAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret,
|
|
||||||
URL tokenURL, int connectionTimeout, int readTimeout) {
|
|
||||||
|
|
||||||
super();
|
|
||||||
this.baseEndpointURL = baseEndpointURL;
|
|
||||||
this.clientId = clientId;
|
|
||||||
this.clientSecret = clientSecret;
|
|
||||||
this.tokenURL = tokenURL;
|
|
||||||
this.connectionTimeout = connectionTimeout;
|
|
||||||
this.readTimeout = readTimeout;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getReadTimeout() {
|
|
||||||
return readTimeout;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setReadTimeout(int readTimeout) {
|
|
||||||
this.readTimeout = readTimeout;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getConnectionTimeout() {
|
|
||||||
return connectionTimeout;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setConnectionTimeout(int connectionTimeout) {
|
|
||||||
this.connectionTimeout = connectionTimeout;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void send(Event event) {
|
|
||||||
log.debug("Starting HTTP POST thread with base URL: {}", baseEndpointURL);
|
|
||||||
lastHTTPPost = new HTTPPost(baseEndpointURL, event);
|
|
||||||
new Thread(lastHTTPPost).start();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getLastSendHTTPResponseCode() {
|
|
||||||
return lastHTTPPost != null ? lastHTTPPost.gethttpResponseCode() : -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String sendAndGetResult(Event event) {
|
|
||||||
log.debug("Starting HTTP POST thread to: {}", baseEndpointURL);
|
|
||||||
lastHTTPPost = new HTTPPost(baseEndpointURL, event);
|
|
||||||
Thread postThread = new Thread(lastHTTPPost);
|
|
||||||
postThread.start();
|
|
||||||
try {
|
|
||||||
postThread.join();
|
|
||||||
return lastHTTPPost.getResult();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.error("While waiting for HTTP Post thread termination", e);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public JSONObject retrive(String id) {
|
|
||||||
lastHTTPGet = new HTTPGet(baseEndpointURL, id);
|
|
||||||
return lastHTTPGet.readJSON();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getLastRetrieveHTTPResponseCode() {
|
|
||||||
return lastHTTPGet != null ? lastHTTPGet.gethttpResponseCode() : -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected URL getTokenURL() {
|
|
||||||
return tokenURL;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract JWTToken getAuthorizationToken() throws OpenIdConnectRESTHelperException;
|
|
||||||
|
|
||||||
public abstract class HTTPVerb {
|
|
||||||
|
|
||||||
protected static final int DEFAULT_CONNECTION_TIMEOUT = 10000;
|
|
||||||
protected static final int DEFAULT_READ_TIMEOUT = 60000;
|
|
||||||
|
|
||||||
protected URL baseEndpoint;
|
|
||||||
protected int httpResponseCode = -1;
|
|
||||||
protected String httpContentType;
|
|
||||||
|
|
||||||
public HTTPVerb(URL baseEndpoint) {
|
|
||||||
this.baseEndpoint = baseEndpoint;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int gethttpResponseCode() {
|
|
||||||
return httpResponseCode;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public class HTTPPost extends HTTPVerb implements Runnable {
|
|
||||||
|
|
||||||
private static final int FIRST_PAUSE = 7500; // ms
|
|
||||||
private static final int PAUSE_INCREMENT_FACTOR = 4;
|
|
||||||
private static final long MAX_RETRYINGS = 5;
|
|
||||||
private static final boolean RETRY_ON_READ_TIMEOUT = false;
|
|
||||||
|
|
||||||
private final int[] RETRY_CODES = { HttpURLConnection.HTTP_BAD_GATEWAY,
|
|
||||||
HttpURLConnection.HTTP_CLIENT_TIMEOUT, HttpURLConnection.HTTP_GATEWAY_TIMEOUT,
|
|
||||||
HttpURLConnection.HTTP_INTERNAL_ERROR };
|
|
||||||
|
|
||||||
private Event event;
|
|
||||||
private String result;
|
|
||||||
private long actualPause = FIRST_PAUSE;
|
|
||||||
private long retryings = 0;
|
|
||||||
|
|
||||||
public HTTPPost(URL baseEndpoint, Event event) {
|
|
||||||
super(baseEndpoint);
|
|
||||||
this.event = event;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
URL eventEndpoint = null;
|
|
||||||
if (baseEndpoint.toString().endsWith("/")) {
|
|
||||||
try {
|
|
||||||
log.debug("Removing trailing slash from base URL since the endpoint computation API changed");
|
|
||||||
eventEndpoint = new URL(
|
|
||||||
baseEndpoint.toString().substring(0, baseEndpoint.toString().length() - 1));
|
|
||||||
} catch (MalformedURLException e) {
|
|
||||||
log.error("Cannot remove trailing slash from base endpoint URL", e);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
eventEndpoint = baseEndpoint;
|
|
||||||
}
|
|
||||||
boolean OK = false;
|
|
||||||
do {
|
|
||||||
try {
|
|
||||||
log.debug("Getting auth token for client '{}' if needed", clientId);
|
|
||||||
JWTToken token = getAuthorizationToken();
|
|
||||||
log.debug("Performing HTTP POST to: {}", eventEndpoint);
|
|
||||||
HttpURLConnection connection = (HttpURLConnection) eventEndpoint.openConnection();
|
|
||||||
connection.setRequestMethod("POST");
|
|
||||||
connection.setConnectTimeout(connectionTimeout);
|
|
||||||
log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout());
|
|
||||||
connection.setReadTimeout(readTimeout);
|
|
||||||
log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout());
|
|
||||||
connection.setRequestProperty("Content-Type", "application/json");
|
|
||||||
connection.setRequestProperty("Accept", "text/plain");
|
|
||||||
connection.setDoOutput(true);
|
|
||||||
if (token != null) {
|
|
||||||
log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer());
|
|
||||||
connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer());
|
|
||||||
} else {
|
|
||||||
log.debug("Sending request without authorization header");
|
|
||||||
}
|
|
||||||
OutputStream os = connection.getOutputStream();
|
|
||||||
String jsonString = new StartWorkflow(event).toJSONString();
|
|
||||||
log.trace("Sending event JSON: {}", jsonString);
|
|
||||||
os.write(jsonString.getBytes("UTF-8"));
|
|
||||||
os.flush();
|
|
||||||
os.close();
|
|
||||||
|
|
||||||
StringBuilder sb = new StringBuilder();
|
|
||||||
httpResponseCode = connection.getResponseCode();
|
|
||||||
log.trace("HTTP Response code: {}", httpResponseCode);
|
|
||||||
httpContentType = connection.getContentType();
|
|
||||||
log.trace("HTTP Response content type is: {}", httpContentType);
|
|
||||||
|
|
||||||
log.trace("Reading response");
|
|
||||||
InputStreamReader isr = null;
|
|
||||||
if (httpResponseCode == HttpURLConnection.HTTP_OK) {
|
|
||||||
OK = true;
|
|
||||||
// From this point ahead every exception should not set OK to false
|
|
||||||
// since the server acknowledged with 200
|
|
||||||
isr = new InputStreamReader(connection.getInputStream(), "UTF-8");
|
|
||||||
} else {
|
|
||||||
isr = new InputStreamReader(connection.getErrorStream(), "UTF-8");
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
BufferedReader br = new BufferedReader(isr);
|
|
||||||
String line = null;
|
|
||||||
while ((line = br.readLine()) != null) {
|
|
||||||
sb.append("\n" + line);
|
|
||||||
}
|
|
||||||
br.close();
|
|
||||||
isr.close();
|
|
||||||
sb.deleteCharAt(0);
|
|
||||||
result = sb.toString();
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Error reading response", e);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (OK) {
|
|
||||||
log.debug("[{}] Event publish for {} is OK", httpResponseCode, event.getName());
|
|
||||||
} else {
|
|
||||||
log.trace("Response message from server:\n{}", result);
|
|
||||||
if (shouldRetryWithCode(httpResponseCode)) {
|
|
||||||
log.warn("[{}] Event publish ERROR", httpResponseCode);
|
|
||||||
} else {
|
|
||||||
log.info("[{}] Event publish ERROR but not retrying to send it with this HTTP code",
|
|
||||||
httpResponseCode);
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (SocketTimeoutException e) {
|
|
||||||
log.error("Read timeout POSTing JSON to: " + eventEndpoint, e);
|
|
||||||
if (RETRY_ON_READ_TIMEOUT) {
|
|
||||||
log.trace("Event will be re-sent accodingly to retryes");
|
|
||||||
|
|
||||||
} else {
|
|
||||||
log.trace("Event will be not re-sent");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} catch (IOException | OpenIdConnectRESTHelperException e) {
|
|
||||||
log.error("I/O Error POSTing JSON to: " + eventEndpoint, e);
|
|
||||||
}
|
|
||||||
if (!OK) {
|
|
||||||
if (retryings <= MAX_RETRYINGS) {
|
|
||||||
log.info("Retrying to send event in {} ms", actualPause);
|
|
||||||
|
|
||||||
Thread.sleep(actualPause);
|
|
||||||
log.debug("Start retrying event publish: {}", event.getName());
|
|
||||||
actualPause *= PAUSE_INCREMENT_FACTOR;
|
|
||||||
retryings += 1;
|
|
||||||
} else {
|
|
||||||
log.error("[{}] Event publish ERROR, exhausted tries after {} retryings",
|
|
||||||
httpResponseCode,
|
|
||||||
retryings);
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} while (!OK);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.error("Sleeping before retry event send", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean shouldRetryWithCode(int httpResultCode) {
|
|
||||||
return Arrays.binarySearch(RETRY_CODES, httpResultCode) > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getResult() {
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public class HTTPGet extends HTTPVerb {
|
|
||||||
|
|
||||||
private String id;
|
|
||||||
|
|
||||||
public HTTPGet(URL baseEndpoint, String id) {
|
|
||||||
super(baseEndpoint);
|
|
||||||
this.id = id;
|
|
||||||
}
|
|
||||||
|
|
||||||
public JSONObject readJSON() {
|
|
||||||
URL endpoint = null;
|
|
||||||
JSONObject results = null;
|
|
||||||
try {
|
|
||||||
String baseEndpointString = baseEndpoint.toString();
|
|
||||||
if (baseEndpointString.endsWith("/")) {
|
|
||||||
endpoint = new URL(baseEndpointString + id);
|
|
||||||
} else {
|
|
||||||
endpoint = new URL(baseEndpointString + "/" + id);
|
|
||||||
}
|
|
||||||
} catch (MalformedURLException e) {
|
|
||||||
log.error("Cannot compute retrieve endpoint URL. ID: " + id + ", base endpoint: " + baseEndpointURL, e);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
log.debug("Getting auth token for client '{}' if needed", clientId);
|
|
||||||
JWTToken token = getAuthorizationToken();
|
|
||||||
|
|
||||||
log.debug("Performing HTTP GET to: {}", endpoint);
|
|
||||||
HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection();
|
|
||||||
connection.setRequestMethod("GET");
|
|
||||||
connection.setConnectTimeout(connectionTimeout);
|
|
||||||
log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout());
|
|
||||||
connection.setReadTimeout(readTimeout);
|
|
||||||
log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout());
|
|
||||||
if (token != null) {
|
|
||||||
log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer());
|
|
||||||
connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer());
|
|
||||||
} else {
|
|
||||||
log.debug("Sending request without authorization header");
|
|
||||||
}
|
|
||||||
|
|
||||||
StringBuilder sb = new StringBuilder();
|
|
||||||
httpResponseCode = connection.getResponseCode();
|
|
||||||
log.trace("HTTP Response code: {}", httpResponseCode);
|
|
||||||
httpContentType = connection.getContentType();
|
|
||||||
log.trace("HTTP Response content type is: {}", httpContentType);
|
|
||||||
|
|
||||||
log.trace("Reading response");
|
|
||||||
InputStreamReader isr = null;
|
|
||||||
if (httpResponseCode == HttpURLConnection.HTTP_OK) {
|
|
||||||
isr = new InputStreamReader(connection.getInputStream(), "UTF-8");
|
|
||||||
} else {
|
|
||||||
isr = new InputStreamReader(connection.getErrorStream(), "UTF-8");
|
|
||||||
}
|
|
||||||
BufferedReader br = new BufferedReader(isr);
|
|
||||||
String line = null;
|
|
||||||
while ((line = br.readLine()) != null) {
|
|
||||||
sb.append(line + "\n");
|
|
||||||
}
|
|
||||||
br.close();
|
|
||||||
isr.close();
|
|
||||||
if (httpResponseCode == HttpURLConnection.HTTP_OK) {
|
|
||||||
log.debug("[{}] Got results for {}", httpResponseCode, id);
|
|
||||||
if (httpContentType.equals("application/json")) {
|
|
||||||
try {
|
|
||||||
results = (JSONObject) new JSONParser().parse(sb.toString());
|
|
||||||
} catch (ParseException e) {
|
|
||||||
log.warn("Error parsing results string as JSON: {}", sb.toString());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.warn("Received a non JSON reponse: {}", sb.toString());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.warn("[{}] Error getting results for ID {}", httpResponseCode, id);
|
|
||||||
}
|
|
||||||
} catch (IOException | OpenIdConnectRESTHelperException e) {
|
|
||||||
log.error("Getting results from: " + endpoint, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return results;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,152 +0,0 @@
|
||||||
package org.gcube.event.publisher;
|
|
||||||
|
|
||||||
import java.net.MalformedURLException;
|
|
||||||
import java.net.URL;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
|
||||||
import java.util.function.Function;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
public class BufferedEventProcessor {
|
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(BufferedEventProcessor.class);
|
|
||||||
|
|
||||||
private static final int DEFAULT_BUFFER_SIZE = 10;
|
|
||||||
private static final int RESULT_POLLING_INTERVAL = 500;
|
|
||||||
|
|
||||||
private ArrayBlockingQueue<Event> eventsBuffer;
|
|
||||||
private Map<Runnable, Boolean> threadStatus;
|
|
||||||
private int count = 0;
|
|
||||||
private long min = 10000000000L;
|
|
||||||
private long max = 0;
|
|
||||||
private int errors = 0;
|
|
||||||
|
|
||||||
public BufferedEventProcessor(String conductorEndpoint, String clientId, String clientSecret,
|
|
||||||
String keycloakTokenEndpoint, Function<Event, String> logInfoGenerator)
|
|
||||||
throws EventProcessorException {
|
|
||||||
|
|
||||||
this(conductorEndpoint, clientId, clientSecret, keycloakTokenEndpoint, logInfoGenerator, DEFAULT_BUFFER_SIZE);
|
|
||||||
}
|
|
||||||
|
|
||||||
public BufferedEventProcessor(String conductorEndpoint, String clientId, String clientSecret,
|
|
||||||
String keycloakTokenEndpoint, Function<Event, String> logInfoGenerator, int bufferSize)
|
|
||||||
throws EventProcessorException {
|
|
||||||
|
|
||||||
eventsBuffer = new ArrayBlockingQueue<>(bufferSize, true);
|
|
||||||
threadStatus = Collections.synchronizedMap(new HashMap<>(bufferSize));
|
|
||||||
final AbstractEventPublisher publisher = new AbstractEventPublisher() {
|
|
||||||
@Override
|
|
||||||
protected EventSender createEventSender() {
|
|
||||||
try {
|
|
||||||
return new HTTPWithOIDCAuthEventSender(new URL(conductorEndpoint), clientId, clientSecret,
|
|
||||||
new URL(keycloakTokenEndpoint));
|
|
||||||
|
|
||||||
} catch (MalformedURLException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if (publisher.getEventSender() == null) {
|
|
||||||
throw new EventProcessorException("Cannot create procrssor correctly");
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < bufferSize; i++) {
|
|
||||||
Thread t = new Thread(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
while (true) {
|
|
||||||
String log = null;
|
|
||||||
Event toBeSent;
|
|
||||||
try {
|
|
||||||
toBeSent = eventsBuffer.take();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
// interrupted when waiting, nothing to do...
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
long start = System.currentTimeMillis();
|
|
||||||
threadStatus.put(this, Boolean.TRUE);
|
|
||||||
logger.trace(" * Peeked event. Buffer size: {}", eventsBuffer.size());
|
|
||||||
log = logInfoGenerator.apply(toBeSent);
|
|
||||||
logger.debug(" > Sending event for: {}", log);
|
|
||||||
boolean sent = false;
|
|
||||||
String workflowId = null;
|
|
||||||
do {
|
|
||||||
try {
|
|
||||||
workflowId = publisher.publish(toBeSent, true);
|
|
||||||
sent = true;
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.warn("Publishing event for {}. It will be re-published", log, e);
|
|
||||||
}
|
|
||||||
} while (!sent);
|
|
||||||
|
|
||||||
EventStatus eventStatus;
|
|
||||||
do {
|
|
||||||
eventStatus = null;
|
|
||||||
try {
|
|
||||||
Thread.sleep(RESULT_POLLING_INTERVAL);
|
|
||||||
eventStatus = publisher.check(workflowId);
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.warn("Checking status for the event for {}", log, e);
|
|
||||||
}
|
|
||||||
if (eventStatus == null || eventStatus.getStatus() == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
} while (eventStatus.getStatus() != EventStatus.Status.COMPLETED
|
|
||||||
&& eventStatus.getStatus() != EventStatus.Status.FAILED);
|
|
||||||
|
|
||||||
long elapsed = System.currentTimeMillis() - start;
|
|
||||||
count = count + 1;
|
|
||||||
if (elapsed > max) {
|
|
||||||
max = elapsed;
|
|
||||||
}
|
|
||||||
if (elapsed < min) {
|
|
||||||
min = elapsed;
|
|
||||||
}
|
|
||||||
if (eventStatus.getStatus() == EventStatus.Status.FAILED) {
|
|
||||||
logger.warn(" - ({}) {} -> {} [{} ms]",
|
|
||||||
new Object[] { count, log, eventStatus.getStatus(), elapsed });
|
|
||||||
|
|
||||||
errors += 1;
|
|
||||||
} else {
|
|
||||||
logger.info(" - (" + count + ") " + log + " -> " + eventStatus.getStatus() + " ["
|
|
||||||
+ elapsed + " ms]");
|
|
||||||
}
|
|
||||||
threadStatus.put(this, Boolean.FALSE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
threadStatus.put(t, Boolean.FALSE);
|
|
||||||
t.start();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void enqueueEvent(Event event) throws InterruptedException {
|
|
||||||
eventsBuffer.put(event);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Boolean allQueuedFinishedCorrectly() {
|
|
||||||
return eventsBuffer.size() > 0 || threadStatus.values().stream().anyMatch(v -> v.equals(Boolean.TRUE));
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getCount() {
|
|
||||||
return count;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getMin() {
|
|
||||||
return min;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getMax() {
|
|
||||||
return max;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getErrors() {
|
|
||||||
return errors;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,48 +0,0 @@
|
||||||
package org.gcube.event.publisher;
|
|
||||||
|
|
||||||
import org.json.simple.JSONObject;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
public class ConductorResultsParser implements ResultsParser {
|
|
||||||
|
|
||||||
protected static final Logger logger = LoggerFactory.getLogger(ConductorResultsParser.class);
|
|
||||||
|
|
||||||
public ConductorResultsParser() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EventStatus parseResults(String uuid, JSONObject results) {
|
|
||||||
EventStatus eventStatus = null;
|
|
||||||
if (results != null) {
|
|
||||||
JSONObject input = (JSONObject) results.get("input");
|
|
||||||
JSONObject output = (JSONObject) results.get("output");
|
|
||||||
switch ((String) results.get("status")) {
|
|
||||||
case "COMPLETED":
|
|
||||||
eventStatus = EventStatus.COMPLETED(uuid, input, output);
|
|
||||||
break;
|
|
||||||
case "FAILED":
|
|
||||||
eventStatus = EventStatus.FAILED(uuid, input, output);
|
|
||||||
break;
|
|
||||||
case "NOT_FOUND":
|
|
||||||
eventStatus = EventStatus.NOT_FOUND(uuid);
|
|
||||||
break;
|
|
||||||
case "PAUSED":
|
|
||||||
eventStatus = EventStatus.PAUSED(uuid, input);
|
|
||||||
break;
|
|
||||||
case "RUNNING":
|
|
||||||
eventStatus = EventStatus.RUNNING(uuid, input);
|
|
||||||
break;
|
|
||||||
case "TERMINATED":
|
|
||||||
eventStatus = EventStatus.TERMINATED(uuid, input, output);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
eventStatus = EventStatus.NOT_FOUND(uuid);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logger.warn("Nothing to parse since JSON object is null");
|
|
||||||
}
|
|
||||||
return eventStatus;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,14 +0,0 @@
|
||||||
package org.gcube.event.publisher;
|
|
||||||
|
|
||||||
public class EventProcessorException extends Exception {
|
|
||||||
|
|
||||||
private static final long serialVersionUID = 568668418901859597L;
|
|
||||||
|
|
||||||
public EventProcessorException(String message) {
|
|
||||||
super(message);
|
|
||||||
}
|
|
||||||
|
|
||||||
public EventProcessorException(String message, Throwable cause) {
|
|
||||||
super(message, cause);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -2,76 +2,6 @@ package org.gcube.event.publisher;
|
||||||
|
|
||||||
public interface EventPublisher {
|
public interface EventPublisher {
|
||||||
|
|
||||||
/**
|
|
||||||
* Publish a new event and nothing more. The sender is not interested to the success/failure of the send.
|
|
||||||
* The results on the workflow engine, is the start of an instance of the workflow identified by the {@link Event#getName()} string.
|
|
||||||
* @param event the event to be published
|
|
||||||
*/
|
|
||||||
void publish(Event event);
|
void publish(Event event);
|
||||||
|
|
||||||
/**
|
|
||||||
* Publishes a new event and optionally wait for the result.
|
|
||||||
* If <code>waitForResult</code> parameter is <code>false</code> the behavior is the same of the {@link #publish(Event)} method,
|
|
||||||
* if true, the workflow id is returned as string if the publish had success.
|
|
||||||
* @param event the vent to be published
|
|
||||||
* @param waitForResult if the sender is interested or not to the resulting workflow id
|
|
||||||
* @return the resulting workflow id
|
|
||||||
*/
|
|
||||||
String publish(Event event, boolean waitForResult);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tells if the last publish was a success or not.
|
|
||||||
* @return <code>true</code> if the publish was OK, <code>false</code> otherwise
|
|
||||||
*/
|
|
||||||
boolean isLastPublishOK();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the last returned HTTP response code of a publish. E.g. 200 if the send was OK or 404 if the event doesn't have a corresponding workflow definition.
|
|
||||||
* @return the HTTP response code of the last publish or -1 if an error occurred before the call (e.g. during the authorization or connection)
|
|
||||||
*/
|
|
||||||
int getLastPublishEventHTTPResponseCode();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Publish an event and immediately checks for the results status.
|
|
||||||
* The behavior is the same of the {@link #publishAndCheck(Event, int)} with <code>delayMS</code> argument less or equal to 0.
|
|
||||||
* @param event the event to be published
|
|
||||||
* @return an object with info about the event's running status
|
|
||||||
*/
|
|
||||||
EventStatus publishAndCheck(Event event);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Publish an event and checks for the results status after a delay.
|
|
||||||
* The behavior is the same of the {@link #publishAndCheck(Event)} if delayMS argument is less or equal to 0.
|
|
||||||
* @param event the event to be published
|
|
||||||
* @param delayMS the delay betwen the publish and the query calls
|
|
||||||
* @return an object with info about the event's running status
|
|
||||||
*/
|
|
||||||
EventStatus publishAndCheck(Event event, int delayMS);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Checks for the workflow results status.
|
|
||||||
* @param instanceId the workflow instance id, resulting of the {@link #publish(Event, boolean)} with <code>waitForResult</code> as true.
|
|
||||||
* @return an object with info about the event's running status
|
|
||||||
*/
|
|
||||||
EventStatus check(String instanceId);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Refreshes an event status by checking for the status of the workflow execution represented by the {@link EventStatus#getInstanceId()} string.
|
|
||||||
* @param eventStatus a previously obtained event status.
|
|
||||||
* @return an object with new info about the event's running status
|
|
||||||
*/
|
|
||||||
EventStatus refresh(EventStatus eventStatus);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tells if the last check was a success or not.
|
|
||||||
* @return <code>true</code> if the publish was OK, <code>false</code> otherwise
|
|
||||||
*/
|
|
||||||
boolean isLastCheckOK();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the last returned HTTP response code of a check. E.g. 200 if the send was OK or 404 if the event doesn't have a corresponding workflow instance.
|
|
||||||
* @return the HTTP response code of the last publish or -1 if an error occurred before the call (e.g. during the authorization or connection)
|
|
||||||
*/
|
|
||||||
int getLastCheckHTTPResponseCode();
|
|
||||||
|
|
||||||
}
|
}
|
|
@ -1,39 +1,7 @@
|
||||||
package org.gcube.event.publisher;
|
package org.gcube.event.publisher;
|
||||||
|
|
||||||
import org.json.simple.JSONObject;
|
|
||||||
|
|
||||||
public interface EventSender {
|
public interface EventSender {
|
||||||
|
|
||||||
/**
|
|
||||||
* Sends an event.
|
|
||||||
* @param event the event to send
|
|
||||||
*/
|
|
||||||
void send(Event event);
|
void send(Event event);
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the last send HTTP response code.
|
|
||||||
* @return the HTTP response code or -1 if an error occurred before the call (e.g. during the authorization or connection)
|
|
||||||
*/
|
|
||||||
int getLastSendHTTPResponseCode();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Send an event and get results.
|
|
||||||
* @param event the event to send
|
|
||||||
* @return the result of the call
|
|
||||||
*/
|
|
||||||
String sendAndGetResult(Event event);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Retrieves the instance of the provided workflow instance.
|
|
||||||
* @param id the workflow instance id
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
JSONObject retrive(String id);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the last retrieve HTTP response code.
|
|
||||||
* @return the HTTP response code or -1 if an error occurred before the call (e.g. during the authorization or connection)
|
|
||||||
*/
|
|
||||||
int getLastRetrieveHTTPResponseCode();
|
|
||||||
|
|
||||||
}
|
}
|
|
@ -1,97 +0,0 @@
|
||||||
package org.gcube.event.publisher;
|
|
||||||
|
|
||||||
import org.json.simple.JSONObject;
|
|
||||||
|
|
||||||
public class EventStatus {
|
|
||||||
|
|
||||||
public enum Status {
|
|
||||||
RUNNING, COMPLETED, FAILED, TIMED_OUT, TERMINATED, PAUSED, NOT_FOUND;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static EventStatus RUNNING(String instanceId, JSONObject input) {
|
|
||||||
return new EventStatus(instanceId, Status.RUNNING, input);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static EventStatus COMPLETED(String instanceId, JSONObject input, JSONObject output) {
|
|
||||||
return new EventStatus(instanceId, Status.COMPLETED, input, output);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static EventStatus FAILED(String instanceId, JSONObject input, JSONObject output) {
|
|
||||||
return new EventStatus(instanceId, Status.FAILED, input, output);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static EventStatus TIMED_OUT(String instanceId, JSONObject input) {
|
|
||||||
return new EventStatus(instanceId, Status.TIMED_OUT, input);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static EventStatus TERMINATED(String instanceId, JSONObject input, JSONObject output) {
|
|
||||||
return new EventStatus(instanceId, Status.TERMINATED, input, output);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static EventStatus PAUSED(String instanceId, JSONObject input) {
|
|
||||||
return new EventStatus(instanceId, Status.PAUSED, input);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static EventStatus NOT_FOUND(String instanceId) {
|
|
||||||
return new EventStatus(instanceId, Status.NOT_FOUND);
|
|
||||||
}
|
|
||||||
|
|
||||||
private String instanceId;
|
|
||||||
private Status status;
|
|
||||||
private JSONObject input;
|
|
||||||
private JSONObject output;
|
|
||||||
|
|
||||||
private EventStatus(String uuid, Status status) {
|
|
||||||
this(uuid, status, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
private EventStatus(String instanceId, Status status, JSONObject input) {
|
|
||||||
this(instanceId, status, input, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
private EventStatus(String instanceId, Status status, JSONObject input, JSONObject output) {
|
|
||||||
setInstanceId(instanceId);
|
|
||||||
setStatus(status);
|
|
||||||
setInput(input);
|
|
||||||
setOutput(output);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setInstanceId(String instanceId) {
|
|
||||||
this.instanceId = instanceId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getInstanceId() {
|
|
||||||
return instanceId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setStatus(Status status) {
|
|
||||||
this.status = status;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Status getStatus() {
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setInput(JSONObject input) {
|
|
||||||
this.input = input;
|
|
||||||
}
|
|
||||||
|
|
||||||
public JSONObject getInput() {
|
|
||||||
return input;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setOutput(JSONObject output) {
|
|
||||||
this.output = output;
|
|
||||||
}
|
|
||||||
|
|
||||||
public JSONObject getOutput() {
|
|
||||||
return output;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return String.format("[%s]\ninput: %s\noutput: %s", status, input != null ? input.toJSONString() : "<No input>",
|
|
||||||
output != null ? output.toJSONString() : "<No output>");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,15 +1,48 @@
|
||||||
package org.gcube.event.publisher;
|
package org.gcube.event.publisher;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.net.HttpURLConnection;
|
||||||
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
|
||||||
import org.gcube.oidc.rest.JWTToken;
|
import org.gcube.oidc.rest.JWTToken;
|
||||||
import org.gcube.oidc.rest.OpenIdConnectRESTHelper;
|
import org.gcube.oidc.rest.OpenIdConnectRESTHelper;
|
||||||
import org.gcube.oidc.rest.OpenIdConnectRESTHelperException;
|
import org.gcube.oidc.rest.OpenIdConnectRESTHelperException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public class HTTPWithOIDCAuthEventSender extends AbstractHTTPWithJWTTokenAuthEventSender implements EventSender {
|
public class HTTPWithOIDCAuthEventSender implements EventSender {
|
||||||
|
|
||||||
public HTTPWithOIDCAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret, URL tokenURL) {
|
protected static final Logger log = LoggerFactory.getLogger(HTTPWithOIDCAuthEventSender.class);
|
||||||
super(baseEndpointURL, clientId, clientSecret, tokenURL);
|
|
||||||
|
private URL baseEnndpointURL;
|
||||||
|
private String clientId;
|
||||||
|
private String clientSecret;
|
||||||
|
private URL tokenURL;
|
||||||
|
|
||||||
|
public HTTPWithOIDCAuthEventSender(URL baseEnndpointURL, String clientId, String clientSecret, URL tokenURL) {
|
||||||
|
this.baseEnndpointURL = baseEnndpointURL;
|
||||||
|
this.clientId = clientId;
|
||||||
|
this.clientSecret = clientSecret;
|
||||||
|
this.tokenURL = tokenURL;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void send(Event event) {
|
||||||
|
log.debug("Starting HTTP POST thread to: {}", baseEnndpointURL);
|
||||||
|
try {
|
||||||
|
URL eventEndpoint = new URL(baseEnndpointURL, event.getName());
|
||||||
|
new Thread(new HTTPost(eventEndpoint, event)).start();
|
||||||
|
} catch (MalformedURLException e) {
|
||||||
|
log.error("Cannot compute event endpoint URL. Event name: " + event.getName() + ", base endpoint: "
|
||||||
|
+ baseEnndpointURL, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected URL getTokenURL() {
|
||||||
|
return tokenURL;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected JWTToken getAuthorizationToken() throws OpenIdConnectRESTHelperException {
|
protected JWTToken getAuthorizationToken() throws OpenIdConnectRESTHelperException {
|
||||||
|
@ -22,4 +55,79 @@ public class HTTPWithOIDCAuthEventSender extends AbstractHTTPWithJWTTokenAuthEve
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public class HTTPost implements Runnable {
|
||||||
|
|
||||||
|
private static final int CONNECTION_TIMEOUT = 10000;
|
||||||
|
private static final int READ_TIMEOUT = 5000;
|
||||||
|
|
||||||
|
private URL endpoint;
|
||||||
|
private Event event;
|
||||||
|
|
||||||
|
public HTTPost(URL endpoint, Event event) {
|
||||||
|
this.endpoint = endpoint;
|
||||||
|
this.event = event;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
log.debug("Getting auth token for client '{}' if needed", clientId);
|
||||||
|
JWTToken token = getAuthorizationToken();
|
||||||
|
|
||||||
|
log.debug("Performing HTTP POST to: {}", endpoint);
|
||||||
|
HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection();
|
||||||
|
connection.setRequestMethod("POST");
|
||||||
|
connection.setConnectTimeout(CONNECTION_TIMEOUT);
|
||||||
|
log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout());
|
||||||
|
connection.setReadTimeout(READ_TIMEOUT);
|
||||||
|
log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout());
|
||||||
|
connection.setRequestProperty("Content-Type", "application/json");
|
||||||
|
// Commented out as per the Conductor issue: https://github.com/Netflix/conductor/issues/376
|
||||||
|
// connection.setRequestProperty("Accept", "application/json");
|
||||||
|
connection.setDoOutput(true);
|
||||||
|
if (token != null) {
|
||||||
|
log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer());
|
||||||
|
connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer());
|
||||||
|
} else {
|
||||||
|
log.debug("Sending request without authorization header");
|
||||||
|
}
|
||||||
|
OutputStream os = connection.getOutputStream();
|
||||||
|
String jsonString = event.toJSONString();
|
||||||
|
log.trace("Sending event JSON: {}", jsonString);
|
||||||
|
os.write(jsonString.getBytes("UTF-8"));
|
||||||
|
os.flush();
|
||||||
|
os.close();
|
||||||
|
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
int httpResultCode = connection.getResponseCode();
|
||||||
|
log.trace("HTTP Response code: {}", httpResultCode);
|
||||||
|
|
||||||
|
log.trace("Reading response");
|
||||||
|
boolean ok = true;
|
||||||
|
InputStreamReader isr = null;
|
||||||
|
if (httpResultCode == HttpURLConnection.HTTP_OK) {
|
||||||
|
isr = new InputStreamReader(connection.getInputStream(), "UTF-8");
|
||||||
|
} else {
|
||||||
|
ok = false;
|
||||||
|
isr = new InputStreamReader(connection.getErrorStream(), "UTF-8");
|
||||||
|
}
|
||||||
|
BufferedReader br = new BufferedReader(isr);
|
||||||
|
String line = null;
|
||||||
|
while ((line = br.readLine()) != null) {
|
||||||
|
sb.append(line + "\n");
|
||||||
|
}
|
||||||
|
br.close();
|
||||||
|
isr.close();
|
||||||
|
if (ok) {
|
||||||
|
log.info("[{}] Event publish for {} is OK", httpResultCode, event.getName());
|
||||||
|
} else {
|
||||||
|
log.debug("[{}] Event publish for {} is not OK", httpResultCode, event.getName());
|
||||||
|
}
|
||||||
|
log.trace("Response message from server: {}", sb.toString());
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("POSTing JSON to: " + endpoint, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -14,10 +14,10 @@ public class HTTPWithUMAAuthEventSender extends HTTPWithOIDCAuthEventSender {
|
||||||
|
|
||||||
private String umaAudience;
|
private String umaAudience;
|
||||||
|
|
||||||
public HTTPWithUMAAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret, URL tokenURL,
|
public HTTPWithUMAAuthEventSender(URL baseEnndpointURL, String clientId, String clientSecret, URL tokenURL,
|
||||||
String umaAudience) {
|
String umaAudience) {
|
||||||
|
|
||||||
super(baseEndpointURL, clientId, clientSecret, tokenURL);
|
super(baseEnndpointURL, clientId, clientSecret, tokenURL);
|
||||||
this.umaAudience = umaAudience;
|
this.umaAudience = umaAudience;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ public class HTTPWithUMAAuthEventSender extends HTTPWithOIDCAuthEventSender {
|
||||||
JWTToken oidcToken = super.getAuthorizationToken();
|
JWTToken oidcToken = super.getAuthorizationToken();
|
||||||
if (oidcToken != null) {
|
if (oidcToken != null) {
|
||||||
if (umaAudience != null) {
|
if (umaAudience != null) {
|
||||||
log.debug("Getting UMA token with audience '{}' from: {}", umaAudience, getTokenURL());
|
log.debug("Getting UMA token for audience '{}' from: {}", umaAudience, getTokenURL());
|
||||||
return OpenIdConnectRESTHelper.queryUMAToken(getTokenURL(), oidcToken.getAccessTokenAsBearer(),
|
return OpenIdConnectRESTHelper.queryUMAToken(getTokenURL(), oidcToken.getAccessTokenAsBearer(),
|
||||||
umaAudience, null);
|
umaAudience, null);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1,9 +0,0 @@
|
||||||
package org.gcube.event.publisher;
|
|
||||||
|
|
||||||
import org.json.simple.JSONObject;
|
|
||||||
|
|
||||||
public interface ResultsParser {
|
|
||||||
|
|
||||||
EventStatus parseResults(String uuid, JSONObject results);
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,80 +0,0 @@
|
||||||
package org.gcube.event.publisher;
|
|
||||||
|
|
||||||
import org.json.simple.JSONObject;
|
|
||||||
|
|
||||||
public class StartWorkflow extends JSONObject {
|
|
||||||
|
|
||||||
private static final long serialVersionUID = -6974427433855594349L;
|
|
||||||
|
|
||||||
// Name of the Workflow. MUST be registered with Conductor before starting workflow
|
|
||||||
public static final String NAME_ENTRY = "name";
|
|
||||||
|
|
||||||
// Workflow version defaults to latest available version
|
|
||||||
private static final String VERSION_ENTRY = "version";
|
|
||||||
|
|
||||||
// JSON object with key value params, that can be used by downstream tasks See Wiring Inputs and Outputs for details
|
|
||||||
private static final String INPUT_ENTRY = "input";
|
|
||||||
|
|
||||||
// Unique Id that correlates multiple Workflow executions optional
|
|
||||||
private static final String CORRELATION_ID_ENTRY = "correlationId";
|
|
||||||
|
|
||||||
// See Task Domains for more information. optional
|
|
||||||
private static final String TASK_TO_DOMAIN_ENTRY = "taskToDomain";
|
|
||||||
|
|
||||||
// An adhoc Workflow Definition to run, without registering. See Dynamic Workflows. optional
|
|
||||||
private static final String WORKFLOW_DEF_ENTRY = "workflowDef";
|
|
||||||
|
|
||||||
// This is taken care of by Java client. See External Payload Storage for more info. optional
|
|
||||||
private static final String EXTERNAL_INPUT_PAYLOAD_STORAGE_PATH_ENTRY = "externalInputPayloadStoragePath";
|
|
||||||
|
|
||||||
// Priority level for the tasks within this workflow execution. Possible values are between 0 - 99
|
|
||||||
private static final String PRIORITY_ENTRY = "priority";
|
|
||||||
|
|
||||||
public StartWorkflow(Event event) {
|
|
||||||
this(event.getName(), event);
|
|
||||||
}
|
|
||||||
|
|
||||||
public StartWorkflow(String name, JSONObject input) {
|
|
||||||
super();
|
|
||||||
setName(name);
|
|
||||||
setInput(input);
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
protected void set(String key, Object value) {
|
|
||||||
put(key, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setName(String name) {
|
|
||||||
set(NAME_ENTRY, name);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setVersion(Integer version) {
|
|
||||||
set(VERSION_ENTRY, version);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setInput(JSONObject input) {
|
|
||||||
set(INPUT_ENTRY, input);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setCorrelationId(String correlationId) {
|
|
||||||
set(CORRELATION_ID_ENTRY, correlationId);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setTaskToDomain(String taskToDomain) {
|
|
||||||
set(TASK_TO_DOMAIN_ENTRY, taskToDomain);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setWorkflowDef(JSONObject workflowDef) {
|
|
||||||
set(WORKFLOW_DEF_ENTRY, workflowDef);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) {
|
|
||||||
set(EXTERNAL_INPUT_PAYLOAD_STORAGE_PATH_ENTRY, externalInputPayloadStoragePath);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setPriority(Integer prioriry) {
|
|
||||||
set(PRIORITY_ENTRY, prioriry);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -3,58 +3,19 @@ package org.gcube.event.publisher;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
public class HTTPEventSenderTest {
|
public class HTTPEventSenderTest {
|
||||||
|
|
||||||
protected static final Logger logger = LoggerFactory.getLogger(HTTPEventSenderTest.class);
|
|
||||||
|
|
||||||
public HTTPEventSenderTest() {
|
public HTTPEventSenderTest() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) throws MalformedURLException {
|
||||||
EventPublisher publisher = new AbstractEventPublisher() {
|
HTTPWithUMAAuthEventSender sender = new HTTPWithUMAAuthEventSender(
|
||||||
@Override
|
new URL("https://nubis1.int.d4science.net/api/workflow/"), "lr62_portal",
|
||||||
protected EventSender createEventSender() {
|
"28726d01-9f24-4ef4-a057-3d208d96aaa0",
|
||||||
try {
|
new URL("https://nubis2.int.d4science.net/auth/realms/d4science/protocol/openid-connect/token"),
|
||||||
return new HTTPWithOIDCAuthEventSender(
|
"%2Fgcube");
|
||||||
new URL("https://conductor.cloud-dev.d4science.org/api/workflow/"),
|
|
||||||
"lr62_portal",
|
|
||||||
"6937125d-6d70-404a-9d63-908c1e6415c4",
|
|
||||||
new URL("https://accounts.dev.d4science.org/auth/realms/d4science/protocol/openid-connect/token"));
|
|
||||||
} catch (MalformedURLException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
String id = publisher.publish(new Event("test", "test", "test"), true);
|
sender.send(new Event("startup", "portal", "gcube"));
|
||||||
logger.info("*** Published: {}", id);
|
|
||||||
logger.info("*** Publish was {} and HTTP response status is: {}",
|
|
||||||
publisher.isLastPublishOK() ? "OK" : "KO", publisher.getLastPublishEventHTTPResponseCode());
|
|
||||||
|
|
||||||
EventStatus status = null;
|
|
||||||
do {
|
|
||||||
status = publisher.check(id);
|
|
||||||
logger.trace("*** Check was {} and HTTP response status is: {}", publisher.isLastCheckOK() ? "OK" : "KO",
|
|
||||||
publisher.getLastCheckHTTPResponseCode());
|
|
||||||
} while (status.getStatus() != EventStatus.Status.COMPLETED && status.getStatus() != EventStatus.Status.FAILED);
|
|
||||||
|
|
||||||
logger.info("*** Final status is: {}", status.toString());
|
|
||||||
|
|
||||||
status = publisher.publishAndCheck(new Event("test", "test", "test"), 1000);
|
|
||||||
logger.info("*** Publish was {} and HTTP response status is: {}",
|
|
||||||
publisher.isLastPublishOK() ? "OK" : "KO", publisher.getLastPublishEventHTTPResponseCode());
|
|
||||||
|
|
||||||
do {
|
|
||||||
status = publisher.refresh(status);
|
|
||||||
logger.trace("*** Check was {} and HTTP response status is: {}", publisher.isLastCheckOK() ? "OK" : "KO",
|
|
||||||
publisher.getLastCheckHTTPResponseCode());
|
|
||||||
} while (status.getStatus() != EventStatus.Status.COMPLETED && status.getStatus() != EventStatus.Status.FAILED);
|
|
||||||
|
|
||||||
logger.info("*** Final status is: {}", status.toString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue