2024-03-05 11:49:47 +01:00
|
|
|
package org.gcube.event.publisher;
|
|
|
|
|
|
|
|
import java.io.BufferedReader;
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.io.InputStreamReader;
|
|
|
|
import java.io.OutputStream;
|
|
|
|
import java.net.HttpURLConnection;
|
|
|
|
import java.net.MalformedURLException;
|
2024-04-22 16:17:59 +02:00
|
|
|
import java.net.SocketTimeoutException;
|
2024-03-05 11:49:47 +01:00
|
|
|
import java.net.URL;
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
|
|
|
import org.gcube.oidc.rest.JWTToken;
|
|
|
|
import org.gcube.oidc.rest.OpenIdConnectRESTHelperException;
|
|
|
|
import org.json.simple.JSONObject;
|
|
|
|
import org.json.simple.parser.JSONParser;
|
|
|
|
import org.json.simple.parser.ParseException;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSender {
|
|
|
|
|
|
|
|
protected static final Logger log = LoggerFactory.getLogger(HTTPWithOIDCAuthEventSender.class);
|
|
|
|
protected URL baseEndpointURL;
|
|
|
|
protected String clientId;
|
|
|
|
protected String clientSecret;
|
|
|
|
protected URL tokenURL;
|
2024-03-06 12:09:09 +01:00
|
|
|
protected int connectionTimeout;
|
|
|
|
protected int readTimeout;
|
2024-03-08 13:43:43 +01:00
|
|
|
protected HTTPPost lastHTTPPost;
|
|
|
|
protected HTTPGet lastHTTPGet;
|
2024-03-05 11:49:47 +01:00
|
|
|
|
2024-03-05 15:06:11 +01:00
|
|
|
public AbstractHTTPWithJWTTokenAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret,
|
|
|
|
URL tokenURL) {
|
|
|
|
|
2024-04-22 16:17:59 +02:00
|
|
|
this(baseEndpointURL, clientId, clientSecret, tokenURL, HTTPVerb.DEFAULT_CONNECTION_TIMEOUT,
|
|
|
|
HTTPVerb.DEFAULT_READ_TIMEOUT);
|
|
|
|
}
|
|
|
|
|
|
|
|
public AbstractHTTPWithJWTTokenAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret,
|
|
|
|
URL tokenURL, int connectionTimeout, int readTimeout) {
|
|
|
|
|
2024-03-05 11:49:47 +01:00
|
|
|
super();
|
2024-03-05 15:06:11 +01:00
|
|
|
this.baseEndpointURL = baseEndpointURL;
|
|
|
|
this.clientId = clientId;
|
|
|
|
this.clientSecret = clientSecret;
|
|
|
|
this.tokenURL = tokenURL;
|
2024-04-22 16:17:59 +02:00
|
|
|
this.connectionTimeout = connectionTimeout;
|
|
|
|
this.readTimeout = readTimeout;
|
2024-03-05 11:49:47 +01:00
|
|
|
}
|
|
|
|
|
2024-03-06 12:18:30 +01:00
|
|
|
public int getReadTimeout() {
|
|
|
|
return readTimeout;
|
|
|
|
}
|
|
|
|
|
|
|
|
public void setReadTimeout(int readTimeout) {
|
|
|
|
this.readTimeout = readTimeout;
|
|
|
|
}
|
|
|
|
|
|
|
|
public int getConnectionTimeout() {
|
|
|
|
return connectionTimeout;
|
|
|
|
}
|
|
|
|
|
|
|
|
public void setConnectionTimeout(int connectionTimeout) {
|
|
|
|
this.connectionTimeout = connectionTimeout;
|
|
|
|
}
|
|
|
|
|
2024-03-05 11:49:47 +01:00
|
|
|
@Override
|
|
|
|
public void send(Event event) {
|
2024-03-08 13:43:43 +01:00
|
|
|
log.debug("Starting HTTP POST thread with base URL: {}", baseEndpointURL);
|
|
|
|
lastHTTPPost = new HTTPPost(baseEndpointURL, event);
|
|
|
|
new Thread(lastHTTPPost).start();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public int getLastSendHTTPResponseCode() {
|
|
|
|
return lastHTTPPost != null ? lastHTTPPost.gethttpResponseCode() : -1;
|
2024-03-05 11:49:47 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public String sendAndGetResult(Event event) {
|
|
|
|
log.debug("Starting HTTP POST thread to: {}", baseEndpointURL);
|
2024-03-08 13:43:43 +01:00
|
|
|
lastHTTPPost = new HTTPPost(baseEndpointURL, event);
|
|
|
|
Thread postThread = new Thread(lastHTTPPost);
|
2024-03-05 11:49:47 +01:00
|
|
|
postThread.start();
|
|
|
|
try {
|
|
|
|
postThread.join();
|
2024-03-08 13:43:43 +01:00
|
|
|
return lastHTTPPost.getResult();
|
2024-03-05 11:49:47 +01:00
|
|
|
} catch (InterruptedException e) {
|
|
|
|
log.error("While waiting for HTTP Post thread termination", e);
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public JSONObject retrive(String id) {
|
2024-03-08 13:43:43 +01:00
|
|
|
lastHTTPGet = new HTTPGet(baseEndpointURL, id);
|
|
|
|
return lastHTTPGet.readJSON();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public int getLastRetrieveHTTPResponseCode() {
|
|
|
|
return lastHTTPGet != null ? lastHTTPGet.gethttpResponseCode() : -1;
|
2024-03-05 11:49:47 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
protected URL getTokenURL() {
|
|
|
|
return tokenURL;
|
|
|
|
}
|
|
|
|
|
|
|
|
protected abstract JWTToken getAuthorizationToken() throws OpenIdConnectRESTHelperException;
|
|
|
|
|
|
|
|
public abstract class HTTPVerb {
|
|
|
|
|
2024-03-06 12:09:09 +01:00
|
|
|
protected static final int DEFAULT_CONNECTION_TIMEOUT = 10000;
|
2024-04-22 16:17:59 +02:00
|
|
|
protected static final int DEFAULT_READ_TIMEOUT = 60000;
|
2024-03-05 11:49:47 +01:00
|
|
|
|
|
|
|
protected URL baseEndpoint;
|
2024-03-08 13:43:43 +01:00
|
|
|
protected int httpResponseCode = -1;
|
|
|
|
protected String httpContentType;
|
2024-03-05 11:49:47 +01:00
|
|
|
|
|
|
|
public HTTPVerb(URL baseEndpoint) {
|
|
|
|
this.baseEndpoint = baseEndpoint;
|
|
|
|
}
|
|
|
|
|
2024-03-08 13:43:43 +01:00
|
|
|
public int gethttpResponseCode() {
|
|
|
|
return httpResponseCode;
|
|
|
|
}
|
|
|
|
|
2024-03-05 11:49:47 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
public class HTTPPost extends HTTPVerb implements Runnable {
|
|
|
|
|
2024-04-22 16:17:59 +02:00
|
|
|
private static final int FIRST_PAUSE = 7500; // ms
|
|
|
|
private static final int PAUSE_INCREMENT_FACTOR = 4;
|
|
|
|
private static final long MAX_RETRYINGS = 5;
|
|
|
|
private static final boolean RETRY_ON_READ_TIMEOUT = false;
|
2024-03-05 11:49:47 +01:00
|
|
|
|
|
|
|
private final int[] RETRY_CODES = { HttpURLConnection.HTTP_BAD_GATEWAY,
|
|
|
|
HttpURLConnection.HTTP_CLIENT_TIMEOUT, HttpURLConnection.HTTP_GATEWAY_TIMEOUT,
|
|
|
|
HttpURLConnection.HTTP_INTERNAL_ERROR };
|
|
|
|
|
|
|
|
private Event event;
|
|
|
|
private String result;
|
2024-04-22 16:17:59 +02:00
|
|
|
private long actualPause = FIRST_PAUSE;
|
2024-03-05 11:49:47 +01:00
|
|
|
private long retryings = 0;
|
|
|
|
|
|
|
|
public HTTPPost(URL baseEndpoint, Event event) {
|
|
|
|
super(baseEndpoint);
|
|
|
|
this.event = event;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
try {
|
|
|
|
URL eventEndpoint = null;
|
2024-03-08 13:43:43 +01:00
|
|
|
if (baseEndpoint.toString().endsWith("/")) {
|
|
|
|
try {
|
|
|
|
log.debug("Removing trailing slash from base URL since the endpoint computation API changed");
|
2024-04-22 16:17:59 +02:00
|
|
|
eventEndpoint = new URL(
|
|
|
|
baseEndpoint.toString().substring(0, baseEndpoint.toString().length() - 1));
|
2024-03-08 13:43:43 +01:00
|
|
|
} catch (MalformedURLException e) {
|
|
|
|
log.error("Cannot remove trailing slash from base endpoint URL", e);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
eventEndpoint = baseEndpoint;
|
2024-03-05 11:49:47 +01:00
|
|
|
}
|
|
|
|
boolean OK = false;
|
|
|
|
do {
|
|
|
|
try {
|
|
|
|
log.debug("Getting auth token for client '{}' if needed", clientId);
|
|
|
|
JWTToken token = getAuthorizationToken();
|
2024-03-08 13:43:43 +01:00
|
|
|
log.debug("Performing HTTP POST to: {}", eventEndpoint);
|
2024-03-05 11:49:47 +01:00
|
|
|
HttpURLConnection connection = (HttpURLConnection) eventEndpoint.openConnection();
|
|
|
|
connection.setRequestMethod("POST");
|
2024-03-06 12:09:09 +01:00
|
|
|
connection.setConnectTimeout(connectionTimeout);
|
2024-03-05 11:49:47 +01:00
|
|
|
log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout());
|
2024-03-06 12:09:09 +01:00
|
|
|
connection.setReadTimeout(readTimeout);
|
2024-03-05 11:49:47 +01:00
|
|
|
log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout());
|
|
|
|
connection.setRequestProperty("Content-Type", "application/json");
|
2024-03-08 13:43:43 +01:00
|
|
|
connection.setRequestProperty("Accept", "text/plain");
|
2024-03-05 11:49:47 +01:00
|
|
|
connection.setDoOutput(true);
|
|
|
|
if (token != null) {
|
|
|
|
log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer());
|
|
|
|
connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer());
|
|
|
|
} else {
|
|
|
|
log.debug("Sending request without authorization header");
|
|
|
|
}
|
|
|
|
OutputStream os = connection.getOutputStream();
|
2024-05-16 10:35:00 +02:00
|
|
|
String jsonString = new StartWorkflow(event).toJSONString();
|
2024-03-05 11:49:47 +01:00
|
|
|
log.trace("Sending event JSON: {}", jsonString);
|
|
|
|
os.write(jsonString.getBytes("UTF-8"));
|
|
|
|
os.flush();
|
|
|
|
os.close();
|
|
|
|
|
|
|
|
StringBuilder sb = new StringBuilder();
|
2024-03-08 13:43:43 +01:00
|
|
|
httpResponseCode = connection.getResponseCode();
|
|
|
|
log.trace("HTTP Response code: {}", httpResponseCode);
|
|
|
|
httpContentType = connection.getContentType();
|
|
|
|
log.trace("HTTP Response content type is: {}", httpContentType);
|
2024-03-05 11:49:47 +01:00
|
|
|
|
|
|
|
log.trace("Reading response");
|
|
|
|
InputStreamReader isr = null;
|
2024-03-08 13:43:43 +01:00
|
|
|
if (httpResponseCode == HttpURLConnection.HTTP_OK) {
|
2024-03-05 11:49:47 +01:00
|
|
|
OK = true;
|
|
|
|
// From this point ahead every exception should not set OK to false
|
|
|
|
// since the server acknowledged with 200
|
|
|
|
isr = new InputStreamReader(connection.getInputStream(), "UTF-8");
|
|
|
|
} else {
|
|
|
|
isr = new InputStreamReader(connection.getErrorStream(), "UTF-8");
|
|
|
|
}
|
2024-04-22 16:17:59 +02:00
|
|
|
try {
|
|
|
|
BufferedReader br = new BufferedReader(isr);
|
|
|
|
String line = null;
|
|
|
|
while ((line = br.readLine()) != null) {
|
|
|
|
sb.append("\n" + line);
|
|
|
|
}
|
|
|
|
br.close();
|
|
|
|
isr.close();
|
|
|
|
sb.deleteCharAt(0);
|
|
|
|
result = sb.toString();
|
|
|
|
} catch (Exception e) {
|
|
|
|
log.error("Error reading response", e);
|
|
|
|
break;
|
2024-03-05 11:49:47 +01:00
|
|
|
}
|
|
|
|
if (OK) {
|
2024-03-08 13:43:43 +01:00
|
|
|
log.debug("[{}] Event publish for {} is OK", httpResponseCode, event.getName());
|
2024-03-05 11:49:47 +01:00
|
|
|
} else {
|
|
|
|
log.trace("Response message from server:\n{}", result);
|
2024-03-08 13:43:43 +01:00
|
|
|
if (shouldRetryWithCode(httpResponseCode)) {
|
2024-04-22 16:17:59 +02:00
|
|
|
log.warn("[{}] Event publish ERROR", httpResponseCode);
|
2024-03-05 11:49:47 +01:00
|
|
|
} else {
|
2024-04-22 16:17:59 +02:00
|
|
|
log.info("[{}] Event publish ERROR but not retrying to send it with this HTTP code",
|
2024-03-08 13:43:43 +01:00
|
|
|
httpResponseCode);
|
2024-03-05 11:49:47 +01:00
|
|
|
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2024-04-22 16:17:59 +02:00
|
|
|
} catch (SocketTimeoutException e) {
|
|
|
|
log.error("Read timeout POSTing JSON to: " + eventEndpoint, e);
|
|
|
|
if (RETRY_ON_READ_TIMEOUT) {
|
|
|
|
log.trace("Event will be re-sent accodingly to retryes");
|
|
|
|
|
|
|
|
} else {
|
|
|
|
log.trace("Event will be not re-sent");
|
|
|
|
break;
|
|
|
|
}
|
2024-03-05 11:49:47 +01:00
|
|
|
} catch (IOException | OpenIdConnectRESTHelperException e) {
|
2024-04-22 16:17:59 +02:00
|
|
|
log.error("I/O Error POSTing JSON to: " + eventEndpoint, e);
|
|
|
|
}
|
|
|
|
if (!OK) {
|
|
|
|
if (retryings <= MAX_RETRYINGS) {
|
|
|
|
log.info("Retrying to send event in {} ms", actualPause);
|
|
|
|
|
|
|
|
Thread.sleep(actualPause);
|
|
|
|
log.debug("Start retrying event publish: {}", event.getName());
|
|
|
|
actualPause *= PAUSE_INCREMENT_FACTOR;
|
|
|
|
retryings += 1;
|
|
|
|
} else {
|
|
|
|
log.error("[{}] Event publish ERROR, exhausted tries after {} retryings",
|
|
|
|
httpResponseCode,
|
|
|
|
retryings);
|
|
|
|
|
|
|
|
break;
|
|
|
|
}
|
2024-03-05 11:49:47 +01:00
|
|
|
}
|
|
|
|
} while (!OK);
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
log.error("Sleeping before retry event send", e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private boolean shouldRetryWithCode(int httpResultCode) {
|
|
|
|
return Arrays.binarySearch(RETRY_CODES, httpResultCode) > 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
public String getResult() {
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
public class HTTPGet extends HTTPVerb {
|
|
|
|
|
|
|
|
private String id;
|
|
|
|
|
|
|
|
public HTTPGet(URL baseEndpoint, String id) {
|
|
|
|
super(baseEndpoint);
|
|
|
|
this.id = id;
|
|
|
|
}
|
|
|
|
|
|
|
|
public JSONObject readJSON() {
|
|
|
|
URL endpoint = null;
|
|
|
|
JSONObject results = null;
|
|
|
|
try {
|
2024-03-08 13:43:43 +01:00
|
|
|
String baseEndpointString = baseEndpoint.toString();
|
|
|
|
if (baseEndpointString.endsWith("/")) {
|
|
|
|
endpoint = new URL(baseEndpointString + id);
|
|
|
|
} else {
|
|
|
|
endpoint = new URL(baseEndpointString + "/" + id);
|
|
|
|
}
|
2024-03-05 11:49:47 +01:00
|
|
|
} catch (MalformedURLException e) {
|
|
|
|
log.error("Cannot compute retrieve endpoint URL. ID: " + id + ", base endpoint: " + baseEndpointURL, e);
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
try {
|
|
|
|
log.debug("Getting auth token for client '{}' if needed", clientId);
|
|
|
|
JWTToken token = getAuthorizationToken();
|
|
|
|
|
|
|
|
log.debug("Performing HTTP GET to: {}", endpoint);
|
|
|
|
HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection();
|
|
|
|
connection.setRequestMethod("GET");
|
2024-03-06 12:09:09 +01:00
|
|
|
connection.setConnectTimeout(connectionTimeout);
|
2024-03-05 11:49:47 +01:00
|
|
|
log.trace("HTTP connection timeout set to: {}", connection.getConnectTimeout());
|
2024-03-06 12:09:09 +01:00
|
|
|
connection.setReadTimeout(readTimeout);
|
2024-03-05 11:49:47 +01:00
|
|
|
log.trace("HTTP connection Read timeout set to: {}", connection.getReadTimeout());
|
|
|
|
if (token != null) {
|
|
|
|
log.debug("Setting authorization header as: {}", token.getAccessTokenAsBearer());
|
|
|
|
connection.setRequestProperty("Authorization", token.getAccessTokenAsBearer());
|
|
|
|
} else {
|
|
|
|
log.debug("Sending request without authorization header");
|
|
|
|
}
|
|
|
|
|
|
|
|
StringBuilder sb = new StringBuilder();
|
2024-03-08 13:43:43 +01:00
|
|
|
httpResponseCode = connection.getResponseCode();
|
|
|
|
log.trace("HTTP Response code: {}", httpResponseCode);
|
|
|
|
httpContentType = connection.getContentType();
|
|
|
|
log.trace("HTTP Response content type is: {}", httpContentType);
|
2024-03-05 11:49:47 +01:00
|
|
|
|
|
|
|
log.trace("Reading response");
|
|
|
|
InputStreamReader isr = null;
|
2024-03-08 13:43:43 +01:00
|
|
|
if (httpResponseCode == HttpURLConnection.HTTP_OK) {
|
2024-03-05 11:49:47 +01:00
|
|
|
isr = new InputStreamReader(connection.getInputStream(), "UTF-8");
|
|
|
|
} else {
|
|
|
|
isr = new InputStreamReader(connection.getErrorStream(), "UTF-8");
|
|
|
|
}
|
|
|
|
BufferedReader br = new BufferedReader(isr);
|
|
|
|
String line = null;
|
|
|
|
while ((line = br.readLine()) != null) {
|
|
|
|
sb.append(line + "\n");
|
|
|
|
}
|
|
|
|
br.close();
|
|
|
|
isr.close();
|
2024-03-08 13:43:43 +01:00
|
|
|
if (httpResponseCode == HttpURLConnection.HTTP_OK) {
|
|
|
|
log.debug("[{}] Got results for {}", httpResponseCode, id);
|
|
|
|
if (httpContentType.equals("application/json")) {
|
|
|
|
try {
|
|
|
|
results = (JSONObject) new JSONParser().parse(sb.toString());
|
|
|
|
} catch (ParseException e) {
|
|
|
|
log.warn("Error parsing results string as JSON: {}", sb.toString());
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
log.warn("Received a non JSON reponse: {}", sb.toString());
|
2024-03-05 11:49:47 +01:00
|
|
|
}
|
|
|
|
} else {
|
2024-03-08 13:43:43 +01:00
|
|
|
log.warn("[{}] Error getting results for ID {}", httpResponseCode, id);
|
2024-03-05 11:49:47 +01:00
|
|
|
}
|
|
|
|
} catch (IOException | OpenIdConnectRESTHelperException e) {
|
|
|
|
log.error("Getting results from: " + endpoint, e);
|
|
|
|
}
|
|
|
|
|
|
|
|
return results;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|