@ -26,6 +26,8 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe
protected URL tokenURL ;
protected int connectionTimeout ;
protected int readTimeout ;
protected HTTPPost lastHTTPPost ;
protected HTTPGet lastHTTPGet ;
public AbstractHTTPWithJWTTokenAuthEventSender ( URL baseEndpointURL , String clientId , String clientSecret ,
URL tokenURL ) {
@ -35,8 +37,8 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe
this . clientId = clientId ;
this . clientSecret = clientSecret ;
this . tokenURL = tokenURL ;
connectionTimeout = getDefaultConnectionTimeout ( ) ;
readTimeout = getDefaultReadTimeout ( ) ;
this . connectionTimeout = getDefaultConnectionTimeout ( ) ;
this . readTimeout = getDefaultReadTimeout ( ) ;
}
protected int getDefaultReadTimeout ( ) {
@ -65,19 +67,25 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe
@Override
public void send ( Event event ) {
log . debug ( "Starting HTTP POST thread to: {}" , baseEndpointURL ) ;
new Thread ( new HTTPPost ( baseEndpointURL , event ) ) . start ( ) ;
log . debug ( "Starting HTTP POST thread with base URL: {}" , baseEndpointURL ) ;
lastHTTPPost = new HTTPPost ( baseEndpointURL , event ) ;
new Thread ( lastHTTPPost ) . start ( ) ;
}
@Override
public int getLastSendHTTPResponseCode ( ) {
return lastHTTPPost ! = null ? lastHTTPPost . gethttpResponseCode ( ) : - 1 ;
}
@Override
public String sendAndGetResult ( Event event ) {
log . debug ( "Starting HTTP POST thread to: {}" , baseEndpointURL ) ;
HTTPPost post = new HTTPPost ( baseEndpointURL , event ) ;
Thread postThread = new Thread ( p ost) ;
last HTTPPost = new HTTPPost ( baseEndpointURL , event ) ;
Thread postThread = new Thread ( lastHTTPP ost) ;
postThread . start ( ) ;
try {
postThread . join ( ) ;
return p ost. getResult ( ) ;
return lastHTTPP ost. getResult ( ) ;
} catch ( InterruptedException e ) {
log . error ( "While waiting for HTTP Post thread termination" , e ) ;
return null ;
@ -86,7 +94,13 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe
@Override
public JSONObject retrive ( String id ) {
return new HTTPGet ( baseEndpointURL , id ) . readJSON ( ) ;
lastHTTPGet = new HTTPGet ( baseEndpointURL , id ) ;
return lastHTTPGet . readJSON ( ) ;
}
@Override
public int getLastRetrieveHTTPResponseCode ( ) {
return lastHTTPGet ! = null ? lastHTTPGet . gethttpResponseCode ( ) : - 1 ;
}
protected URL getTokenURL ( ) {
@ -101,11 +115,17 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe
protected static final int DEFAULT_READ_TIMEOUT = 5000 ;
protected URL baseEndpoint ;
protected int httpResponseCode = - 1 ;
protected String httpContentType ;
public HTTPVerb ( URL baseEndpoint ) {
this . baseEndpoint = baseEndpoint ;
}
public int gethttpResponseCode ( ) {
return httpResponseCode ;
}
}
public class HTTPPost extends HTTPVerb implements Runnable {
@ -131,19 +151,23 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe
public void run ( ) {
try {
URL eventEndpoint = null ;
try {
eventEndpoint = new URL ( baseEndpointURL , event . getName ( ) ) ;
} catch ( MalformedURLException e ) {
log . error ( "Cannot compute event endpoint URL. Event name: " + event . getName ( ) + ", base endpoint: "
+ baseEndpointURL , e ) ;
return ;
if ( baseEndpoint . toString ( ) . endsWith ( "/" ) ) {
try {
log . debug ( "Removing trailing slash from base URL since the endpoint computation API changed" ) ;
eventEndpoint = new URL ( baseEndpoint . toString ( ) . substring ( 0 , baseEndpoint . toString ( ) . length ( ) - 1 ) ) ;
} catch ( MalformedURLException e ) {
log . error ( "Cannot remove trailing slash from base endpoint URL" , e ) ;
return ;
}
} else {
eventEndpoint = baseEndpoint ;
}
boolean OK = false ;
do {
try {
log . debug ( "Getting auth token for client '{}' if needed" , clientId ) ;
JWTToken token = getAuthorizationToken ( ) ;
log . debug ( "Performing HTTP POST to: {}" , base Endpoint) ;
log . debug ( "Performing HTTP POST to: {}" , event Endpoint) ;
HttpURLConnection connection = ( HttpURLConnection ) eventEndpoint . openConnection ( ) ;
connection . setRequestMethod ( "POST" ) ;
connection . setConnectTimeout ( connectionTimeout ) ;
@ -151,8 +175,7 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe
connection . setReadTimeout ( readTimeout ) ;
log . trace ( "HTTP connection Read timeout set to: {}" , connection . getReadTimeout ( ) ) ;
connection . setRequestProperty ( "Content-Type" , "application/json" ) ;
// Commented out as per the Conductor issue: https://github.com/Netflix/conductor/issues/376
// connection.setRequestProperty("Accept", "application/json");
connection . setRequestProperty ( "Accept" , "text/plain" ) ;
connection . setDoOutput ( true ) ;
if ( token ! = null ) {
log . debug ( "Setting authorization header as: {}" , token . getAccessTokenAsBearer ( ) ) ;
@ -168,12 +191,14 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe
os . close ( ) ;
StringBuilder sb = new StringBuilder ( ) ;
int httpResultCode = connection . getResponseCode ( ) ;
log . trace ( "HTTP Response code: {}" , httpResultCode ) ;
httpResponseCode = connection . getResponseCode ( ) ;
log . trace ( "HTTP Response code: {}" , httpResponseCode ) ;
httpContentType = connection . getContentType ( ) ;
log . trace ( "HTTP Response content type is: {}" , httpContentType ) ;
log . trace ( "Reading response" ) ;
InputStreamReader isr = null ;
if ( httpRes ult Code = = HttpURLConnection . HTTP_OK ) {
if ( httpRes ponse Code = = HttpURLConnection . HTTP_OK ) {
OK = true ;
// From this point ahead every exception should not set OK to false
// since the server acknowledged with 200
@ -191,12 +216,12 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe
sb . deleteCharAt ( 0 ) ;
result = sb . toString ( ) ;
if ( OK ) {
log . debug ( "[{}] Event publish for {} is OK" , httpRes ult Code, event . getName ( ) ) ;
log . debug ( "[{}] Event publish for {} is OK" , httpRes ponse Code, event . getName ( ) ) ;
} else {
log . trace ( "Response message from server:\n{}" , result ) ;
if ( shouldRetryWithCode ( httpRes ult Code) ) {
if ( shouldRetryWithCode ( httpRes ponse Code) ) {
if ( retryings < = MAX_RETRYINGS ) {
log . warn ( "[{}] Event publish ERROR, retrying in {} seconds" , httpRes ult Code,
log . warn ( "[{}] Event publish ERROR, retrying in {} seconds" , httpRes ponse Code,
actualPause ) ;
Thread . sleep ( actualPause * 1000 ) ;
@ -205,20 +230,21 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe
retryings + = 1 ;
} else {
log . error ( "[{}] Event publish ERROR, exhausted tries after {} retryings" ,
httpRes ult Code,
httpRes ponse Code,
retryings ) ;
break ;
}
} else {
log . info ( "[{}] Event publish ERROR but should not retry with this HTTP code" ,
httpRes ult Code) ;
httpRes ponse Code) ;
break ;
}
}
} catch ( IOException | OpenIdConnectRESTHelperException e ) {
log . error ( "POSTing JSON to: " + eventEndpoint , e ) ;
break ;
}
} while ( ! OK ) ;
} catch ( InterruptedException e ) {
@ -249,7 +275,12 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe
URL endpoint = null ;
JSONObject results = null ;
try {
endpoint = new URL ( baseEndpoint , id ) ;
String baseEndpointString = baseEndpoint . toString ( ) ;
if ( baseEndpointString . endsWith ( "/" ) ) {
endpoint = new URL ( baseEndpointString + id ) ;
} else {
endpoint = new URL ( baseEndpointString + "/" + id ) ;
}
} catch ( MalformedURLException e ) {
log . error ( "Cannot compute retrieve endpoint URL. ID: " + id + ", base endpoint: " + baseEndpointURL , e ) ;
return null ;
@ -273,12 +304,14 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe
}
StringBuilder sb = new StringBuilder ( ) ;
int httpResultCode = connection . getResponseCode ( ) ;
log . trace ( "HTTP Response code: {}" , httpResultCode ) ;
httpResponseCode = connection . getResponseCode ( ) ;
log . trace ( "HTTP Response code: {}" , httpResponseCode ) ;
httpContentType = connection . getContentType ( ) ;
log . trace ( "HTTP Response content type is: {}" , httpContentType ) ;
log . trace ( "Reading response" ) ;
InputStreamReader isr = null ;
if ( httpRes ult Code = = HttpURLConnection . HTTP_OK ) {
if ( httpRes ponse Code = = HttpURLConnection . HTTP_OK ) {
isr = new InputStreamReader ( connection . getInputStream ( ) , "UTF-8" ) ;
} else {
isr = new InputStreamReader ( connection . getErrorStream ( ) , "UTF-8" ) ;
@ -290,15 +323,19 @@ public abstract class AbstractHTTPWithJWTTokenAuthEventSender implements EventSe
}
br . close ( ) ;
isr . close ( ) ;
if ( httpResultCode = = HttpURLConnection . HTTP_OK ) {
log . debug ( "[{}] Got results for {}" , httpResultCode , id ) ;
try {
results = ( JSONObject ) new JSONParser ( ) . parse ( sb . toString ( ) ) ;
} catch ( ParseException e ) {
log . warn ( "Error parsing results string as JSON: {}" , sb . toString ( ) ) ;
if ( httpResponseCode = = HttpURLConnection . HTTP_OK ) {
log . debug ( "[{}] Got results for {}" , httpResponseCode , id ) ;
if ( httpContentType . equals ( "application/json" ) ) {
try {
results = ( JSONObject ) new JSONParser ( ) . parse ( sb . toString ( ) ) ;
} catch ( ParseException e ) {
log . warn ( "Error parsing results string as JSON: {}" , sb . toString ( ) ) ;
}
} else {
log . warn ( "Received a non JSON reponse: {}" , sb . toString ( ) ) ;
}
} else {
log . warn ( "[{}] Error getting results for ID {}" , httpResultCode , id ) ;
log . warn ( "[{}] Error getting results for ID {}" , httpRes ponse Code, id ) ;
}
} catch ( IOException | OpenIdConnectRESTHelperException e ) {
log . error ( "Getting results from: " + endpoint , e ) ;