diff --git a/.classpath b/.classpath index 83cab8a..8478cfb 100644 --- a/.classpath +++ b/.classpath @@ -15,7 +15,6 @@ - @@ -32,5 +31,11 @@ + + + + + + diff --git a/.settings/org.eclipse.wst.common.project.facet.core.xml b/.settings/org.eclipse.wst.common.project.facet.core.xml index 058e2bb..54a5299 100644 --- a/.settings/org.eclipse.wst.common.project.facet.core.xml +++ b/.settings/org.eclipse.wst.common.project.facet.core.xml @@ -1,6 +1,5 @@ - - + diff --git a/CHANGELOG.md b/CHANGELOG.md index ae5a8cb..cf925b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Restored correct behavior for event publishing with workflow id only sent back - Extracted `AbstractHTTPWithJWTTokenAuthEventSender` class for easy subclassing - Added new outcome check methods to inspect last send and last check actions results and some other facilities +- Better handling of exceptions and retrying behavior in case of read timeout. (Default connection timeout is 10s and read timeout now is 60s) ## [v1.1.0] - Added `BufferedEventProcessor` to manual send bunch of events and controlling their output status (#23628) diff --git a/pom.xml b/pom.xml index bd58f73..16dbf42 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ maven-parent org.gcube.tools - 1.2.1-SNAPSHOT + 1.2.0 @@ -33,6 +33,11 @@ scm:git:https://code-repo.d4science.org/gCubeSystem/${project.artifactId}.git https://code-repo.d4science.org/gCubeSystem/${project.artifactId} + + 11 + ${java.version} + ${java.version} + diff --git a/src/main/java/org/gcube/event/publisher/AbstractEventPublisher.java b/src/main/java/org/gcube/event/publisher/AbstractEventPublisher.java index fafce24..be91767 100644 --- a/src/main/java/org/gcube/event/publisher/AbstractEventPublisher.java +++ b/src/main/java/org/gcube/event/publisher/AbstractEventPublisher.java @@ -44,7 +44,7 @@ public abstract class AbstractEventPublisher implements EventPublisher { @Override public int getLastPublishEventHTTPResponseCode() { - return eventSender.getLastSendHTTPResponseCode(); + return getEventSender().getLastSendHTTPResponseCode(); } @Override @@ -88,7 +88,7 @@ public abstract class AbstractEventPublisher implements EventPublisher { @Override public int getLastCheckHTTPResponseCode() { - return eventSender.getLastRetrieveHTTPResponseCode(); + return getEventSender().getLastRetrieveHTTPResponseCode(); } protected abstract EventSender createEventSender(); diff --git a/src/main/java/org/gcube/event/publisher/AbstractHTTPWithJWTTokenAuthEventSender.java b/src/main/java/org/gcube/event/publisher/AbstractHTTPWithJWTTokenAuthEventSender.java index 6200e43..4733276 100644 --- a/src/main/java/org/gcube/event/publisher/AbstractHTTPWithJWTTokenAuthEventSender.java +++ b/src/main/java/org/gcube/event/publisher/AbstractHTTPWithJWTTokenAuthEventSender.java @@ -6,6 +6,7 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.MalformedURLException; +import java.net.SocketTimeoutException; import java.net.URL; import java.util.Arrays; @@ -32,21 +33,20 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe public AbstractHTTPWithJWTTokenAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret, URL tokenURL) { + this(baseEndpointURL, clientId, clientSecret, tokenURL, HTTPVerb.DEFAULT_CONNECTION_TIMEOUT, + HTTPVerb.DEFAULT_READ_TIMEOUT); + } + + public AbstractHTTPWithJWTTokenAuthEventSender(URL baseEndpointURL, String clientId, String clientSecret, + URL tokenURL, int connectionTimeout, int readTimeout) { + super(); this.baseEndpointURL = baseEndpointURL; this.clientId = clientId; this.clientSecret = clientSecret; this.tokenURL = tokenURL; - this.connectionTimeout = getDefaultConnectionTimeout(); - this.readTimeout = getDefaultReadTimeout(); - } - - protected int getDefaultReadTimeout() { - return HTTPVerb.DEFAULT_READ_TIMEOUT; - } - - protected int getDefaultConnectionTimeout() { - return HTTPVerb.DEFAULT_CONNECTION_TIMEOUT; + this.connectionTimeout = connectionTimeout; + this.readTimeout = readTimeout; } public int getReadTimeout() { @@ -112,7 +112,7 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe public abstract class HTTPVerb { protected static final int DEFAULT_CONNECTION_TIMEOUT = 10000; - protected static final int DEFAULT_READ_TIMEOUT = 5000; + protected static final int DEFAULT_READ_TIMEOUT = 60000; protected URL baseEndpoint; protected int httpResponseCode = -1; @@ -130,8 +130,10 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe public class HTTPPost extends HTTPVerb implements Runnable { - private static final int PAUSE_INCREMENT_FACTOR = 2; - private static final long MAX_RETRYINGS = 2; + private static final int FIRST_PAUSE = 7500; // ms + private static final int PAUSE_INCREMENT_FACTOR = 4; + private static final long MAX_RETRYINGS = 5; + private static final boolean RETRY_ON_READ_TIMEOUT = false; private final int[] RETRY_CODES = { HttpURLConnection.HTTP_BAD_GATEWAY, HttpURLConnection.HTTP_CLIENT_TIMEOUT, HttpURLConnection.HTTP_GATEWAY_TIMEOUT, @@ -139,7 +141,7 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe private Event event; private String result; - private long actualPause = 1; + private long actualPause = FIRST_PAUSE; private long retryings = 0; public HTTPPost(URL baseEndpoint, Event event) { @@ -154,7 +156,8 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe if (baseEndpoint.toString().endsWith("/")) { try { log.debug("Removing trailing slash from base URL since the endpoint computation API changed"); - eventEndpoint = new URL(baseEndpoint.toString().substring(0, baseEndpoint.toString().length() - 1)); + eventEndpoint = new URL( + baseEndpoint.toString().substring(0, baseEndpoint.toString().length() - 1)); } catch (MalformedURLException e) { log.error("Cannot remove trailing slash from base endpoint URL", e); return; @@ -206,45 +209,60 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe } else { isr = new InputStreamReader(connection.getErrorStream(), "UTF-8"); } - BufferedReader br = new BufferedReader(isr); - String line = null; - while ((line = br.readLine()) != null) { - sb.append("\n" + line); + try { + BufferedReader br = new BufferedReader(isr); + String line = null; + while ((line = br.readLine()) != null) { + sb.append("\n" + line); + } + br.close(); + isr.close(); + sb.deleteCharAt(0); + result = sb.toString(); + } catch (Exception e) { + log.error("Error reading response", e); + break; } - br.close(); - isr.close(); - sb.deleteCharAt(0); - result = sb.toString(); if (OK) { log.debug("[{}] Event publish for {} is OK", httpResponseCode, event.getName()); } else { log.trace("Response message from server:\n{}", result); if (shouldRetryWithCode(httpResponseCode)) { - if (retryings <= MAX_RETRYINGS) { - log.warn("[{}] Event publish ERROR, retrying in {} seconds", httpResponseCode, - actualPause); - - Thread.sleep(actualPause * 1000); - log.debug("Start retrying event publish: {}", event.getName()); - actualPause *= PAUSE_INCREMENT_FACTOR; - retryings += 1; - } else { - log.error("[{}] Event publish ERROR, exhausted tries after {} retryings", - httpResponseCode, - retryings); - - break; - } + log.warn("[{}] Event publish ERROR", httpResponseCode); } else { - log.info("[{}] Event publish ERROR but should not retry with this HTTP code", + log.info("[{}] Event publish ERROR but not retrying to send it with this HTTP code", httpResponseCode); break; } } + } catch (SocketTimeoutException e) { + log.error("Read timeout POSTing JSON to: " + eventEndpoint, e); + if (RETRY_ON_READ_TIMEOUT) { + log.trace("Event will be re-sent accodingly to retryes"); + + } else { + log.trace("Event will be not re-sent"); + break; + } } catch (IOException | OpenIdConnectRESTHelperException e) { - log.error("POSTing JSON to: " + eventEndpoint, e); - break; + log.error("I/O Error POSTing JSON to: " + eventEndpoint, e); + } + if (!OK) { + if (retryings <= MAX_RETRYINGS) { + log.info("Retrying to send event in {} ms", actualPause); + + Thread.sleep(actualPause); + log.debug("Start retrying event publish: {}", event.getName()); + actualPause *= PAUSE_INCREMENT_FACTOR; + retryings += 1; + } else { + log.error("[{}] Event publish ERROR, exhausted tries after {} retryings", + httpResponseCode, + retryings); + + break; + } } } while (!OK); } catch (InterruptedException e) {