Extracted the `AbstractHTTPWithJWTTokenAuthEventSender` abstract class for easy extention purposes and updated to `maven-parent` version `1.2.1-SNAPSHOT` and `maven-portal-bom` version `3.7.0`
This commit is contained in:
parent
bfcef8b9c3
commit
b47f04a197
35
.classpath
35
.classpath
|
@ -1,27 +1,10 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?>
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
<classpath>
|
<classpath>
|
||||||
<classpathentry kind="src" output="target/classes" path="src/main/java">
|
|
||||||
<attributes>
|
|
||||||
<attribute name="optional" value="true"/>
|
|
||||||
<attribute name="maven.pomderived" value="true"/>
|
|
||||||
</attributes>
|
|
||||||
</classpathentry>
|
|
||||||
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
|
|
||||||
<attributes>
|
|
||||||
<attribute name="maven.pomderived" value="true"/>
|
|
||||||
</attributes>
|
|
||||||
</classpathentry>
|
|
||||||
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
|
|
||||||
<attributes>
|
|
||||||
<attribute name="optional" value="true"/>
|
|
||||||
<attribute name="maven.pomderived" value="true"/>
|
|
||||||
<attribute name="test" value="true"/>
|
|
||||||
</attributes>
|
|
||||||
</classpathentry>
|
|
||||||
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
|
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
|
||||||
<attributes>
|
<attributes>
|
||||||
<attribute name="maven.pomderived" value="true"/>
|
|
||||||
<attribute name="test" value="true"/>
|
<attribute name="test" value="true"/>
|
||||||
|
<attribute name="maven.pomderived" value="true"/>
|
||||||
|
<attribute name="optional" value="true"/>
|
||||||
</attributes>
|
</attributes>
|
||||||
</classpathentry>
|
</classpathentry>
|
||||||
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
|
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
|
||||||
|
@ -30,13 +13,21 @@
|
||||||
<attribute name="org.eclipse.jst.component.nondependency" value=""/>
|
<attribute name="org.eclipse.jst.component.nondependency" value=""/>
|
||||||
</attributes>
|
</attributes>
|
||||||
</classpathentry>
|
</classpathentry>
|
||||||
<classpathentry kind="con" path="org.eclipse.jst.server.core.container/com.liferay.ide.eclipse.server.tomcat.runtimeClasspathProvider/Liferay v6.2 (Tomcat 7)">
|
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
|
||||||
<attributes>
|
<attributes>
|
||||||
<attribute name="owner.project.facets" value="jst.utility"/>
|
<attribute name="maven.pomderived" value="true"/>
|
||||||
</attributes>
|
</attributes>
|
||||||
</classpathentry>
|
</classpathentry>
|
||||||
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11">
|
<classpathentry kind="src" output="target/classes" path="src/main/java">
|
||||||
<attributes>
|
<attributes>
|
||||||
|
<attribute name="optional" value="true"/>
|
||||||
|
<attribute name="maven.pomderived" value="true"/>
|
||||||
|
</attributes>
|
||||||
|
</classpathentry>
|
||||||
|
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
|
||||||
|
<attributes>
|
||||||
|
<attribute name="test" value="true"/>
|
||||||
|
<attribute name="optional" value="true"/>
|
||||||
<attribute name="maven.pomderived" value="true"/>
|
<attribute name="maven.pomderived" value="true"/>
|
||||||
</attributes>
|
</attributes>
|
||||||
</classpathentry>
|
</classpathentry>
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
eclipse.preferences.version=1
|
eclipse.preferences.version=1
|
||||||
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
|
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
|
||||||
org.eclipse.jdt.core.compiler.codegen.targetPlatform=11
|
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
|
||||||
org.eclipse.jdt.core.compiler.compliance=11
|
org.eclipse.jdt.core.compiler.compliance=1.8
|
||||||
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
|
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
|
||||||
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
|
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
|
||||||
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
|
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
|
||||||
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
|
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
|
||||||
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=warning
|
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=warning
|
||||||
org.eclipse.jdt.core.compiler.release=enabled
|
org.eclipse.jdt.core.compiler.release=disabled
|
||||||
org.eclipse.jdt.core.compiler.source=11
|
org.eclipse.jdt.core.compiler.source=1.8
|
||||||
|
|
|
@ -2,5 +2,5 @@
|
||||||
<faceted-project>
|
<faceted-project>
|
||||||
<runtime name="Liferay v6.2 (Tomcat 7)"/>
|
<runtime name="Liferay v6.2 (Tomcat 7)"/>
|
||||||
<installed facet="jst.utility" version="1.0"/>
|
<installed facet="jst.utility" version="1.0"/>
|
||||||
<installed facet="java" version="11"/>
|
<installed facet="java" version="1.8"/>
|
||||||
</faceted-project>
|
</faceted-project>
|
||||||
|
|
|
@ -2,6 +2,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||||
|
|
||||||
# Changelog for "event-publisher-library"
|
# Changelog for "event-publisher-library"
|
||||||
|
|
||||||
|
## [v1.2.0-SNAPSHOT]
|
||||||
|
|
||||||
## [v1.1.0]
|
## [v1.1.0]
|
||||||
- Added `BufferedEventProcessor` to manual send bunch of events and controlling their output status (#23628)
|
- Added `BufferedEventProcessor` to manual send bunch of events and controlling their output status (#23628)
|
||||||
|
|
||||||
|
|
36
pom.xml
36
pom.xml
|
@ -7,13 +7,13 @@
|
||||||
<parent>
|
<parent>
|
||||||
<artifactId>maven-parent</artifactId>
|
<artifactId>maven-parent</artifactId>
|
||||||
<groupId>org.gcube.tools</groupId>
|
<groupId>org.gcube.tools</groupId>
|
||||||
<version>1.1.0</version>
|
<version>1.2.1-SNAPSHOT</version>
|
||||||
<relativePath />
|
<relativePath />
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<groupId>org.gcube.common</groupId>
|
<groupId>org.gcube.common</groupId>
|
||||||
<artifactId>event-publisher-library</artifactId>
|
<artifactId>event-publisher-library</artifactId>
|
||||||
<version>1.1.0</version>
|
<version>1.2.0-SNAPSHOT</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
<dependencyManagement>
|
<dependencyManagement>
|
||||||
|
@ -21,7 +21,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.gcube.distribution</groupId>
|
<groupId>org.gcube.distribution</groupId>
|
||||||
<artifactId>maven-portal-bom</artifactId>
|
<artifactId>maven-portal-bom</artifactId>
|
||||||
<version>3.6.4</version>
|
<version>3.7.0</version>
|
||||||
<type>pom</type>
|
<type>pom</type>
|
||||||
<scope>import</scope>
|
<scope>import</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
@ -36,21 +36,21 @@
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
<plugins>
|
<plugins>
|
||||||
<plugin>
|
<!-- <plugin>-->
|
||||||
<artifactId>maven-compiler-plugin</artifactId>
|
<!-- <artifactId>maven-compiler-plugin</artifactId>-->
|
||||||
<configuration>
|
<!-- <configuration>-->
|
||||||
<encoding>UTF-8</encoding>
|
<!-- <encoding>UTF-8</encoding>-->
|
||||||
<source>${maven.compiler.source}</source>
|
<!-- <source>${maven.compiler.source}</source>-->
|
||||||
<target>${maven.compiler.target}</target>
|
<!-- <target>${maven.compiler.target}</target>-->
|
||||||
</configuration>
|
<!-- </configuration>-->
|
||||||
</plugin>
|
<!-- </plugin>-->
|
||||||
<plugin>
|
<!-- <plugin>-->
|
||||||
<artifactId>maven-resources-plugin</artifactId>
|
<!-- <artifactId>maven-resources-plugin</artifactId>-->
|
||||||
<configuration>
|
<!-- <configuration>-->
|
||||||
<encoding>UTF-8</encoding>
|
<!-- <encoding>UTF-8</encoding>-->
|
||||||
</configuration>
|
<!-- </configuration>-->
|
||||||
<version>2.5</version>
|
<!-- <version>2.5</version>-->
|
||||||
</plugin>
|
<!-- </plugin>-->
|
||||||
</plugins>
|
</plugins>
|
||||||
</build>
|
</build>
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,276 @@
|
||||||
|
package org.gcube.event.publisher;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.net.HttpURLConnection;
|
||||||
|
import java.net.MalformedURLException;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.gcube.oidc.rest.JWTToken;
|
||||||
|
import org.gcube.oidc.rest.OpenIdConnectRESTHelperException;
|
||||||
|
import org.json.simple.JSONObject;
|
||||||
|
import org.json.simple.parser.JSONParser;
|
||||||
|
import org.json.simple.parser.ParseException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSender {
|
||||||
|
|
||||||
|
protected static final Logger log = LoggerFactory.getLogger(HTTPWithOIDCAuthEventSender.class);
|
||||||
|
protected URL baseEndpointURL;
|
||||||
|
protected String clientId;
|
||||||
|
protected String clientSecret;
|
||||||
|
protected URL tokenURL;
|
||||||
|
|
||||||
|
public AbstractHTTPWithJWTTokenAuthEventSender() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void send(Event event) {
|
||||||
|
log.debug("Starting HTTP POST thread to: {}", baseEndpointURL);
|
||||||
|
new Thread(new HTTPPost(baseEndpointURL, event)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String sendAndGetResult(Event event) {
|
||||||
|
log.debug("Starting HTTP POST thread to: {}", baseEndpointURL);
|
||||||
|
HTTPPost post = new HTTPPost(baseEndpointURL, event);
|
||||||
|
Thread postThread = new Thread(post);
|
||||||
|
postThread.start();
|
||||||
|
try {
|
||||||
|
postThread.join();
|
||||||
|
return post.getResult();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error("While waiting for HTTP Post thread termination", e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JSONObject retrive(String id) {
|
||||||
|
return new HTTPGet(baseEndpointURL, id).readJSON();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected URL getTokenURL() {
|
||||||
|
return tokenURL;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract JWTToken getAuthorizationToken() throws OpenIdConnectRESTHelperException;
|
||||||
|
|
||||||
|
public abstract class HTTPVerb {
|
||||||
|
|
||||||
|
protected static final int CONNECTION_TIMEOUT = 10000;
|
||||||
|
protected static final int READ_TIMEOUT = 5000;
|
||||||
|
|
||||||
|
protected URL baseEndpoint;
|
||||||
|
|
||||||
|
public HTTPVerb(URL baseEndpoint) {
|
||||||
|
this.baseEndpoint = baseEndpoint;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public class HTTPPost extends HTTPVerb implements Runnable {
|
||||||
|
|
||||||
|
private static final int PAUSE_INCREMENT_FACTOR = 2;
|
||||||
|
private static final long MAX_RETRYINGS = 2;
|
||||||
|
|
||||||
|
private final int[] RETRY_CODES = { HttpURLConnection.HTTP_BAD_GATEWAY,
|
||||||
|
HttpURLConnection.HTTP_CLIENT_TIMEOUT, HttpURLConnection.HTTP_GATEWAY_TIMEOUT,
|
||||||
|
HttpURLConnection.HTTP_INTERNAL_ERROR };
|
||||||
|
|
||||||
|
private Event event;
|
||||||
|
private String result;
|
||||||
|
private long actualPause = 1;
|
||||||
|
private long retryings = 0;
|
||||||
|
|
||||||
|
public HTTPPost(URL baseEndpoint, Event event) {
|
||||||
|
super(baseEndpoint);
|
||||||
|
this.event = event;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
URL eventEndpoint = null;
|
||||||
|
try {
|
||||||
|
eventEndpoint = new URL(baseEndpointURL, event.getName());
|
||||||
|
} catch (MalformedURLException e) {
|
||||||
|
log.error("Cannot compute event endpoint URL. Event name: " + event.getName() + ", base endpoint: "
|
||||||
|
+ baseEndpointURL, e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
boolean OK = false;
|
||||||
|
do {
|
||||||
|
try {
|
||||||
|
log.debug("Getting auth token for client '{}' if needed", clientId);
|
||||||
|
JWTToken token = getAuthorizationToken();
|
||||||
|
log.debug("Performing HTTP POST to: {}", baseEndpoint);
|
||||||
|
HttpURLConnection connection = (HttpURLConnection) eventEndpoint.openConnection();
|
||||||
|
connection.setRequestMethod("POST");
|
||||||
|
connection.setConnectTimeout(CONNECTION_TIMEOUT);
|
||||||
|
log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout());
|
||||||
|
connection.setReadTimeout(READ_TIMEOUT);
|
||||||
|
log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout());
|
||||||
|
connection.setRequestProperty("Content-Type", "application/json");
|
||||||
|
// Commented out as per the Conductor issue: https://github.com/Netflix/conductor/issues/376
|
||||||
|
// connection.setRequestProperty("Accept", "application/json");
|
||||||
|
connection.setDoOutput(true);
|
||||||
|
if (token != null) {
|
||||||
|
log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer());
|
||||||
|
connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer());
|
||||||
|
} else {
|
||||||
|
log.debug("Sending request without authorization header");
|
||||||
|
}
|
||||||
|
OutputStream os = connection.getOutputStream();
|
||||||
|
String jsonString = event.toJSONString();
|
||||||
|
log.trace("Sending event JSON: {}", jsonString);
|
||||||
|
os.write(jsonString.getBytes("UTF-8"));
|
||||||
|
os.flush();
|
||||||
|
os.close();
|
||||||
|
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
int httpResultCode = connection.getResponseCode();
|
||||||
|
log.trace("HTTP Response code: {}", httpResultCode);
|
||||||
|
|
||||||
|
log.trace("Reading response");
|
||||||
|
InputStreamReader isr = null;
|
||||||
|
if (httpResultCode == HttpURLConnection.HTTP_OK) {
|
||||||
|
OK = true;
|
||||||
|
// From this point ahead every exception should not set OK to false
|
||||||
|
// since the server acknowledged with 200
|
||||||
|
isr = new InputStreamReader(connection.getInputStream(), "UTF-8");
|
||||||
|
} else {
|
||||||
|
isr = new InputStreamReader(connection.getErrorStream(), "UTF-8");
|
||||||
|
}
|
||||||
|
BufferedReader br = new BufferedReader(isr);
|
||||||
|
String line = null;
|
||||||
|
while ((line = br.readLine()) != null) {
|
||||||
|
sb.append("\n" + line);
|
||||||
|
}
|
||||||
|
br.close();
|
||||||
|
isr.close();
|
||||||
|
sb.deleteCharAt(0);
|
||||||
|
result = sb.toString();
|
||||||
|
if (OK) {
|
||||||
|
log.debug("[{}] Event publish for {} is OK", httpResultCode, event.getName());
|
||||||
|
} else {
|
||||||
|
log.trace("Response message from server:\n{}", result);
|
||||||
|
if (shouldRetryWithCode(httpResultCode)) {
|
||||||
|
if (retryings <= MAX_RETRYINGS) {
|
||||||
|
log.warn("[{}] Event publish ERROR, retrying in {} seconds", httpResultCode,
|
||||||
|
actualPause);
|
||||||
|
|
||||||
|
Thread.sleep(actualPause * 1000);
|
||||||
|
log.debug("Start retrying event publish: {}", event.getName());
|
||||||
|
actualPause *= PAUSE_INCREMENT_FACTOR;
|
||||||
|
retryings += 1;
|
||||||
|
} else {
|
||||||
|
log.error("[{}] Event publish ERROR, exhausted tries after {} retryings",
|
||||||
|
httpResultCode,
|
||||||
|
retryings);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.info("[{}] Event publish ERROR but should not retry with this HTTP code",
|
||||||
|
httpResultCode);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException | OpenIdConnectRESTHelperException e) {
|
||||||
|
log.error("POSTing JSON to: " + eventEndpoint, e);
|
||||||
|
}
|
||||||
|
} while (!OK);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error("Sleeping before retry event send", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean shouldRetryWithCode(int httpResultCode) {
|
||||||
|
return Arrays.binarySearch(RETRY_CODES, httpResultCode) > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getResult() {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public class HTTPGet extends HTTPVerb {
|
||||||
|
|
||||||
|
private String id;
|
||||||
|
|
||||||
|
public HTTPGet(URL baseEndpoint, String id) {
|
||||||
|
super(baseEndpoint);
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public JSONObject readJSON() {
|
||||||
|
URL endpoint = null;
|
||||||
|
JSONObject results = null;
|
||||||
|
try {
|
||||||
|
endpoint = new URL(baseEndpoint, id);
|
||||||
|
} catch (MalformedURLException e) {
|
||||||
|
log.error("Cannot compute retrieve endpoint URL. ID: " + id + ", base endpoint: " + baseEndpointURL, e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
log.debug("Getting auth token for client '{}' if needed", clientId);
|
||||||
|
JWTToken token = getAuthorizationToken();
|
||||||
|
|
||||||
|
log.debug("Performing HTTP GET to: {}", endpoint);
|
||||||
|
HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection();
|
||||||
|
connection.setRequestMethod("GET");
|
||||||
|
connection.setConnectTimeout(CONNECTION_TIMEOUT);
|
||||||
|
log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout());
|
||||||
|
connection.setReadTimeout(READ_TIMEOUT);
|
||||||
|
log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout());
|
||||||
|
if (token != null) {
|
||||||
|
log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer());
|
||||||
|
connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer());
|
||||||
|
} else {
|
||||||
|
log.debug("Sending request without authorization header");
|
||||||
|
}
|
||||||
|
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
int httpResultCode = connection.getResponseCode();
|
||||||
|
log.trace("HTTP Response code: {}", httpResultCode);
|
||||||
|
|
||||||
|
log.trace("Reading response");
|
||||||
|
InputStreamReader isr = null;
|
||||||
|
if (httpResultCode == HttpURLConnection.HTTP_OK) {
|
||||||
|
isr = new InputStreamReader(connection.getInputStream(), "UTF-8");
|
||||||
|
} else {
|
||||||
|
isr = new InputStreamReader(connection.getErrorStream(), "UTF-8");
|
||||||
|
}
|
||||||
|
BufferedReader br = new BufferedReader(isr);
|
||||||
|
String line = null;
|
||||||
|
while ((line = br.readLine()) != null) {
|
||||||
|
sb.append(line + "\n");
|
||||||
|
}
|
||||||
|
br.close();
|
||||||
|
isr.close();
|
||||||
|
if (httpResultCode == HttpURLConnection.HTTP_OK) {
|
||||||
|
log.debug("[{}] Got results for {}", httpResultCode, id);
|
||||||
|
try {
|
||||||
|
results = (JSONObject) new JSONParser().parse(sb.toString());
|
||||||
|
} catch (ParseException e) {
|
||||||
|
log.warn("Error parsing results string as JSON: {}", sb.toString());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.warn("[{}] Error getting results for ID {}", httpResultCode, id);
|
||||||
|
}
|
||||||
|
} catch (IOException | OpenIdConnectRESTHelperException e) {
|
||||||
|
log.error("Getting results from: " + endpoint, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,69 +1,21 @@
|
||||||
package org.gcube.event.publisher;
|
package org.gcube.event.publisher;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStreamReader;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.HttpURLConnection;
|
|
||||||
import java.net.MalformedURLException;
|
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.Arrays;
|
|
||||||
|
|
||||||
import org.gcube.oidc.rest.JWTToken;
|
import org.gcube.oidc.rest.JWTToken;
|
||||||
import org.gcube.oidc.rest.OpenIdConnectRESTHelper;
|
import org.gcube.oidc.rest.OpenIdConnectRESTHelper;
|
||||||
import org.gcube.oidc.rest.OpenIdConnectRESTHelperException;
|
import org.gcube.oidc.rest.OpenIdConnectRESTHelperException;
|
||||||
import org.json.simple.JSONObject;
|
|
||||||
import org.json.simple.parser.JSONParser;
|
|
||||||
import org.json.simple.parser.ParseException;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
public class HTTPWithOIDCAuthEventSender implements EventSender {
|
public class HTTPWithOIDCAuthEventSender extends AbstractHTTPWithJWTTokenAuthEventSender implements EventSender {
|
||||||
|
|
||||||
protected static final Logger log = LoggerFactory.getLogger(HTTPWithOIDCAuthEventSender.class);
|
|
||||||
|
|
||||||
private URL baseEndpointURL;
|
|
||||||
private String clientId;
|
|
||||||
private String clientSecret;
|
|
||||||
private URL tokenURL;
|
|
||||||
|
|
||||||
public HTTPWithOIDCAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret, URL tokenURL) {
|
public HTTPWithOIDCAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret, URL tokenURL) {
|
||||||
|
super();
|
||||||
this.baseEndpointURL = baseEndpointURL;
|
this.baseEndpointURL = baseEndpointURL;
|
||||||
this.clientId = clientId;
|
this.clientId = clientId;
|
||||||
this.clientSecret = clientSecret;
|
this.clientSecret = clientSecret;
|
||||||
this.tokenURL = tokenURL;
|
this.tokenURL = tokenURL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void send(Event event) {
|
|
||||||
log.debug("Starting HTTP POST thread to: {}", baseEndpointURL);
|
|
||||||
new Thread(new HTTPPost(baseEndpointURL, event)).start();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String sendAndGetResult(Event event) {
|
|
||||||
log.debug("Starting HTTP POST thread to: {}", baseEndpointURL);
|
|
||||||
HTTPPost post = new HTTPPost(baseEndpointURL, event);
|
|
||||||
Thread postThread = new Thread(post);
|
|
||||||
postThread.start();
|
|
||||||
try {
|
|
||||||
postThread.join();
|
|
||||||
return post.getResult();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.error("While waiting for HTTP Post thread termination", e);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public JSONObject retrive(String id) {
|
|
||||||
return new HTTPGet(baseEndpointURL, id).readJSON();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected URL getTokenURL() {
|
|
||||||
return tokenURL;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected JWTToken getAuthorizationToken() throws OpenIdConnectRESTHelperException {
|
protected JWTToken getAuthorizationToken() throws OpenIdConnectRESTHelperException {
|
||||||
if (clientId != null && clientSecret != null && tokenURL != null) {
|
if (clientId != null && clientSecret != null && tokenURL != null) {
|
||||||
log.debug("Getting OIDC token for clientId '{}' from: {}", clientId, tokenURL);
|
log.debug("Getting OIDC token for clientId '{}' from: {}", clientId, tokenURL);
|
||||||
|
@ -74,216 +26,4 @@ public class HTTPWithOIDCAuthEventSender implements EventSender {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract class HTTPVerb {
|
|
||||||
|
|
||||||
protected static final int CONNECTION_TIMEOUT = 10000;
|
|
||||||
protected static final int READ_TIMEOUT = 5000;
|
|
||||||
|
|
||||||
protected URL baseEndpoint;
|
|
||||||
|
|
||||||
public HTTPVerb(URL baseEndpoint) {
|
|
||||||
this.baseEndpoint = baseEndpoint;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public class HTTPPost extends HTTPVerb implements Runnable {
|
|
||||||
|
|
||||||
private static final int PAUSE_INCREMENT_FACTOR = 2;
|
|
||||||
private static final long MAX_RETRYINGS = 2;
|
|
||||||
|
|
||||||
private final int[] RETRY_CODES = { HttpURLConnection.HTTP_BAD_GATEWAY,
|
|
||||||
HttpURLConnection.HTTP_CLIENT_TIMEOUT, HttpURLConnection.HTTP_GATEWAY_TIMEOUT,
|
|
||||||
HttpURLConnection.HTTP_INTERNAL_ERROR };
|
|
||||||
|
|
||||||
private Event event;
|
|
||||||
private String result;
|
|
||||||
private long actualPause = 1;
|
|
||||||
private long retryings = 0;
|
|
||||||
|
|
||||||
public HTTPPost(URL baseEndpoint, Event event) {
|
|
||||||
super(baseEndpoint);
|
|
||||||
this.event = event;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
URL eventEndpoint = null;
|
|
||||||
try {
|
|
||||||
eventEndpoint = new URL(baseEndpointURL, event.getName());
|
|
||||||
} catch (MalformedURLException e) {
|
|
||||||
log.error("Cannot compute event endpoint URL. Event name: " + event.getName() + ", base endpoint: "
|
|
||||||
+ baseEndpointURL, e);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
boolean OK = false;
|
|
||||||
do {
|
|
||||||
try {
|
|
||||||
log.debug("Getting auth token for client '{}' if needed", clientId);
|
|
||||||
JWTToken token = getAuthorizationToken();
|
|
||||||
log.debug("Performing HTTP POST to: {}", baseEndpoint);
|
|
||||||
HttpURLConnection connection = (HttpURLConnection) eventEndpoint.openConnection();
|
|
||||||
connection.setRequestMethod("POST");
|
|
||||||
connection.setConnectTimeout(CONNECTION_TIMEOUT);
|
|
||||||
log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout());
|
|
||||||
connection.setReadTimeout(READ_TIMEOUT);
|
|
||||||
log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout());
|
|
||||||
connection.setRequestProperty("Content-Type", "application/json");
|
|
||||||
// Commented out as per the Conductor issue: https://github.com/Netflix/conductor/issues/376
|
|
||||||
// connection.setRequestProperty("Accept", "application/json");
|
|
||||||
connection.setDoOutput(true);
|
|
||||||
if (token != null) {
|
|
||||||
log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer());
|
|
||||||
connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer());
|
|
||||||
} else {
|
|
||||||
log.debug("Sending request without authorization header");
|
|
||||||
}
|
|
||||||
OutputStream os = connection.getOutputStream();
|
|
||||||
String jsonString = event.toJSONString();
|
|
||||||
log.trace("Sending event JSON: {}", jsonString);
|
|
||||||
os.write(jsonString.getBytes("UTF-8"));
|
|
||||||
os.flush();
|
|
||||||
os.close();
|
|
||||||
|
|
||||||
StringBuilder sb = new StringBuilder();
|
|
||||||
int httpResultCode = connection.getResponseCode();
|
|
||||||
log.trace("HTTP Response code: {}", httpResultCode);
|
|
||||||
|
|
||||||
log.trace("Reading response");
|
|
||||||
InputStreamReader isr = null;
|
|
||||||
if (httpResultCode == HttpURLConnection.HTTP_OK) {
|
|
||||||
OK = true;
|
|
||||||
// From this point ahead every exception should not set OK to false
|
|
||||||
// since the server acknowledged with 200
|
|
||||||
isr = new InputStreamReader(connection.getInputStream(), "UTF-8");
|
|
||||||
} else {
|
|
||||||
isr = new InputStreamReader(connection.getErrorStream(), "UTF-8");
|
|
||||||
}
|
|
||||||
BufferedReader br = new BufferedReader(isr);
|
|
||||||
String line = null;
|
|
||||||
while ((line = br.readLine()) != null) {
|
|
||||||
sb.append("\n" + line);
|
|
||||||
}
|
|
||||||
br.close();
|
|
||||||
isr.close();
|
|
||||||
sb.deleteCharAt(0);
|
|
||||||
result = sb.toString();
|
|
||||||
if (OK) {
|
|
||||||
log.debug("[{}] Event publish for {} is OK", httpResultCode, event.getName());
|
|
||||||
} else {
|
|
||||||
log.trace("Response message from server:\n{}", result);
|
|
||||||
if (shouldRetryWithCode(httpResultCode)) {
|
|
||||||
if (retryings <= MAX_RETRYINGS) {
|
|
||||||
log.warn("[{}] Event publish ERROR, retrying in {} seconds", httpResultCode,
|
|
||||||
actualPause);
|
|
||||||
|
|
||||||
Thread.sleep(actualPause * 1000);
|
|
||||||
log.debug("Start retrying event publish: {}", event.getName());
|
|
||||||
actualPause *= PAUSE_INCREMENT_FACTOR;
|
|
||||||
retryings += 1;
|
|
||||||
} else {
|
|
||||||
log.error("[{}] Event publish ERROR, exhausted tries after {} retryings",
|
|
||||||
httpResultCode,
|
|
||||||
retryings);
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.info("[{}] Event publish ERROR but should not retry with this HTTP code",
|
|
||||||
httpResultCode);
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IOException | OpenIdConnectRESTHelperException e) {
|
|
||||||
log.error("POSTing JSON to: " + eventEndpoint, e);
|
|
||||||
}
|
|
||||||
} while (!OK);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.error("Sleeping before retry event send", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean shouldRetryWithCode(int httpResultCode) {
|
|
||||||
return Arrays.binarySearch(RETRY_CODES, httpResultCode) > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getResult() {
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public class HTTPGet extends HTTPVerb {
|
|
||||||
|
|
||||||
private String id;
|
|
||||||
|
|
||||||
public HTTPGet(URL baseEndpoint, String id) {
|
|
||||||
super(baseEndpoint);
|
|
||||||
this.id = id;
|
|
||||||
}
|
|
||||||
|
|
||||||
public JSONObject readJSON() {
|
|
||||||
URL endpoint = null;
|
|
||||||
JSONObject results = null;
|
|
||||||
try {
|
|
||||||
endpoint = new URL(baseEndpoint, id);
|
|
||||||
} catch (MalformedURLException e) {
|
|
||||||
log.error("Cannot compute retrieve endpoint URL. ID: " + id + ", base endpoint: " + baseEndpointURL, e);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
log.debug("Getting auth token for client '{}' if needed", clientId);
|
|
||||||
JWTToken token = getAuthorizationToken();
|
|
||||||
|
|
||||||
log.debug("Performing HTTP GET to: {}", endpoint);
|
|
||||||
HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection();
|
|
||||||
connection.setRequestMethod("GET");
|
|
||||||
connection.setConnectTimeout(CONNECTION_TIMEOUT);
|
|
||||||
log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout());
|
|
||||||
connection.setReadTimeout(READ_TIMEOUT);
|
|
||||||
log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout());
|
|
||||||
if (token != null) {
|
|
||||||
log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer());
|
|
||||||
connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer());
|
|
||||||
} else {
|
|
||||||
log.debug("Sending request without authorization header");
|
|
||||||
}
|
|
||||||
|
|
||||||
StringBuilder sb = new StringBuilder();
|
|
||||||
int httpResultCode = connection.getResponseCode();
|
|
||||||
log.trace("HTTP Response code: {}", httpResultCode);
|
|
||||||
|
|
||||||
log.trace("Reading response");
|
|
||||||
InputStreamReader isr = null;
|
|
||||||
if (httpResultCode == HttpURLConnection.HTTP_OK) {
|
|
||||||
isr = new InputStreamReader(connection.getInputStream(), "UTF-8");
|
|
||||||
} else {
|
|
||||||
isr = new InputStreamReader(connection.getErrorStream(), "UTF-8");
|
|
||||||
}
|
|
||||||
BufferedReader br = new BufferedReader(isr);
|
|
||||||
String line = null;
|
|
||||||
while ((line = br.readLine()) != null) {
|
|
||||||
sb.append(line + "\n");
|
|
||||||
}
|
|
||||||
br.close();
|
|
||||||
isr.close();
|
|
||||||
if (httpResultCode == HttpURLConnection.HTTP_OK) {
|
|
||||||
log.debug("[{}] Got results for {}", httpResultCode, id);
|
|
||||||
try {
|
|
||||||
results = (JSONObject) new JSONParser().parse(sb.toString());
|
|
||||||
} catch (ParseException e) {
|
|
||||||
log.warn("Error parsing results string as JSON: {}", sb.toString());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.warn("[{}] Error getting results for ID {}", httpResultCode, id);
|
|
||||||
}
|
|
||||||
} catch (IOException | OpenIdConnectRESTHelperException e) {
|
|
||||||
log.error("Getting results from: " + endpoint, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return results;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue