Logging Circuit breaker
This commit is contained in:
parent
f6cc76a360
commit
7c5e9430cb
|
@ -21,7 +21,13 @@ public abstract class AbstractBatchLogger {
|
||||||
public AbstractBatchLogger(Environment environment) {
|
public AbstractBatchLogger(Environment environment) {
|
||||||
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
|
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
|
||||||
|
|
||||||
executor.scheduleAtFixedRate(() -> this.outputData(), Long.parseLong(environment.getProperty("http-logger.initial-delay")), Long.parseLong(environment.getProperty("http-logger.delay")), TimeUnit.SECONDS);
|
executor.scheduleAtFixedRate(() -> {
|
||||||
|
try {
|
||||||
|
this.outputData();
|
||||||
|
} catch (Exception e) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}, Long.parseLong(environment.getProperty("http-logger.initial-delay")), Long.parseLong(environment.getProperty("http-logger.delay")), TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract LoggingOutputType logOutputType();
|
public abstract LoggingOutputType logOutputType();
|
||||||
|
@ -53,5 +59,5 @@ public abstract class AbstractBatchLogger {
|
||||||
} else return null;
|
} else return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract void outputData();
|
public abstract void outputData() throws Exception;
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,7 +15,6 @@ import org.springframework.core.env.Environment;
|
||||||
import org.springframework.http.HttpEntity;
|
import org.springframework.http.HttpEntity;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
import org.springframework.http.ResponseEntity;
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.web.client.RestTemplate;
|
import org.springframework.web.client.RestTemplate;
|
||||||
import types.LoggingOutputType;
|
import types.LoggingOutputType;
|
||||||
|
@ -36,9 +35,9 @@ public class HttpRemoteLogger extends AbstractBatchLogger implements Logger {
|
||||||
private Environment environment;
|
private Environment environment;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
public HttpRemoteLogger(Environment environment) {
|
public HttpRemoteLogger(Environment environment, RestTemplate loggerClient) {
|
||||||
super(environment);
|
super(environment);
|
||||||
this.rest = new RestTemplate();
|
this.rest = loggerClient;
|
||||||
this.headers = new HttpHeaders();
|
this.headers = new HttpHeaders();
|
||||||
this.environment = environment;
|
this.environment = environment;
|
||||||
headers.add("Content-Type", "application/json");
|
headers.add("Content-Type", "application/json");
|
||||||
|
@ -46,17 +45,12 @@ public class HttpRemoteLogger extends AbstractBatchLogger implements Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void outputData() {
|
public void outputData() throws Exception {
|
||||||
try {
|
|
||||||
String log = this.tranformLog();
|
String log = this.tranformLog();
|
||||||
if (log != null) {
|
if (log != null) {
|
||||||
HttpEntity<String> requestEntity = new HttpEntity(log, headers);
|
HttpEntity<String> requestEntity = new HttpEntity(log, headers);
|
||||||
ResponseEntity<String> Object = rest.exchange(this.environment.getProperty("http-logger.server-address"), HttpMethod.POST, requestEntity, String.class);
|
rest.exchange(this.environment.getProperty("http-logger.server-address"), HttpMethod.POST, requestEntity, String.class);
|
||||||
}
|
}
|
||||||
} catch (Exception ex) {
|
|
||||||
ex.printStackTrace();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1,20 +1,13 @@
|
||||||
package eu.eudat.configurations;
|
package eu.eudat.configurations.database.elasticsearch;
|
||||||
|
|
||||||
import org.apache.http.HttpHost;
|
import org.apache.http.HttpHost;
|
||||||
import org.elasticsearch.client.Client;
|
|
||||||
import org.elasticsearch.client.RestClient;
|
import org.elasticsearch.client.RestClient;
|
||||||
import org.elasticsearch.client.RestHighLevelClient;
|
import org.elasticsearch.client.RestHighLevelClient;
|
||||||
import org.elasticsearch.client.transport.TransportClient;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
import org.elasticsearch.common.transport.TransportAddress;
|
|
||||||
import org.elasticsearch.transport.client.PreBuiltTransportClient;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.core.env.Environment;
|
import org.springframework.core.env.Environment;
|
||||||
|
|
||||||
import java.net.InetAddress;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by ikalyvas on 7/5/2018.
|
* Created by ikalyvas on 7/5/2018.
|
||||||
*/
|
*/
|
|
@ -1,4 +1,4 @@
|
||||||
package eu.eudat.configurations;
|
package eu.eudat.configurations.database.sql;
|
||||||
|
|
||||||
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
@ -7,7 +7,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
import org.springframework.context.annotation.*;
|
import org.springframework.context.annotation.*;
|
||||||
import org.springframework.core.env.Environment;
|
import org.springframework.core.env.Environment;
|
||||||
import org.springframework.dao.annotation.PersistenceExceptionTranslationPostProcessor;
|
import org.springframework.dao.annotation.PersistenceExceptionTranslationPostProcessor;
|
||||||
import org.springframework.jdbc.datasource.DriverManagerDataSource;
|
|
||||||
import org.springframework.orm.jpa.JpaTransactionManager;
|
import org.springframework.orm.jpa.JpaTransactionManager;
|
||||||
import org.springframework.orm.jpa.JpaVendorAdapter;
|
import org.springframework.orm.jpa.JpaVendorAdapter;
|
||||||
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
|
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
|
|
@ -1,4 +1,4 @@
|
||||||
package eu.eudat.configurations;
|
package eu.eudat.configurations.database.sql;
|
||||||
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
|
@ -48,7 +48,7 @@ public class DynamicProjectConfigurationDevelImpl implements DynamicProjectConfi
|
||||||
|
|
||||||
JAXBContext jaxbContext = JAXBContext.newInstance(Configuration.class);
|
JAXBContext jaxbContext = JAXBContext.newInstance(Configuration.class);
|
||||||
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
|
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
|
||||||
is = new URL("file:///"+current + "/web/src/main/resources/ProjectConfiguration.xml").openStream();
|
is = new URL("file:///"+current + "/dmp-backend/web/src/main/resources/ProjectConfiguration.xml").openStream();
|
||||||
this.configuration = (Configuration) jaxbUnmarshaller.unmarshal(is);
|
this.configuration = (Configuration) jaxbUnmarshaller.unmarshal(is);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
ex.printStackTrace();
|
ex.printStackTrace();
|
||||||
|
@ -69,14 +69,14 @@ public class DynamicProjectConfigurationDevelImpl implements DynamicProjectConfi
|
||||||
Configuration configuration = this.getConfiguration();
|
Configuration configuration = this.getConfiguration();
|
||||||
List<DynamicField> fields = new LinkedList<>();
|
List<DynamicField> fields = new LinkedList<>();
|
||||||
List<Property> properties = configuration.getConfigurationProperties();
|
List<Property> properties = configuration.getConfigurationProperties();
|
||||||
properties.stream().forEach(item -> {
|
properties.forEach(item -> {
|
||||||
DynamicField dynamicField = new DynamicField();
|
DynamicField dynamicField = new DynamicField();
|
||||||
dynamicField.setId(item.getId());
|
dynamicField.setId(item.getId());
|
||||||
dynamicField.setName(item.getName());
|
dynamicField.setName(item.getName());
|
||||||
dynamicField.setQueryProperty(item.getQueryProperty());
|
dynamicField.setQueryProperty(item.getQueryProperty());
|
||||||
dynamicField.setRequired(item.getRequired());
|
dynamicField.setRequired(item.getRequired());
|
||||||
List<Dependency> dependencies = new LinkedList<>();
|
List<Dependency> dependencies = new LinkedList<>();
|
||||||
item.getDependencies().stream().forEach(dependency -> {
|
item.getDependencies().forEach(dependency -> {
|
||||||
Dependency modelDependency = new Dependency();
|
Dependency modelDependency = new Dependency();
|
||||||
modelDependency.setId(dependency.getId());
|
modelDependency.setId(dependency.getId());
|
||||||
modelDependency.setQueryProperty(dependency.getQueryProperty());
|
modelDependency.setQueryProperty(dependency.getQueryProperty());
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
package eu.eudat.configurations.http;
|
||||||
|
|
||||||
|
import eu.eudat.logic.handlers.CircuitBreakerHttpClientInterceptor;
|
||||||
|
import eu.eudat.logic.utilities.circuitbreaker.Circuit;
|
||||||
|
import eu.eudat.logic.utilities.circuitbreaker.CircuitBreaker;
|
||||||
|
import org.apache.commons.collections4.CollectionUtils;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.http.client.ClientHttpRequestInterceptor;
|
||||||
|
import org.springframework.web.client.RestTemplate;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by ikalyvas on 2/25/2019.
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
public class HttpClientConfigurations {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public RestTemplate loggerClient(CircuitBreaker circuitBreaker) {
|
||||||
|
RestTemplate restTemplate = new RestTemplate();
|
||||||
|
|
||||||
|
List<ClientHttpRequestInterceptor> interceptors
|
||||||
|
= restTemplate.getInterceptors();
|
||||||
|
if (CollectionUtils.isEmpty(interceptors)) {
|
||||||
|
interceptors = new ArrayList<>();
|
||||||
|
}
|
||||||
|
interceptors.add(new CircuitBreakerHttpClientInterceptor(circuitBreaker));
|
||||||
|
restTemplate.setInterceptors(interceptors);
|
||||||
|
return restTemplate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public CircuitBreaker circuitBreaker(){
|
||||||
|
CircuitBreaker circuitBreaker = new CircuitBreaker();
|
||||||
|
circuitBreaker.addCircuitResolver(
|
||||||
|
new Circuit.CircuitBuilder()
|
||||||
|
.url("http://localhost:31311")
|
||||||
|
.method("POST")
|
||||||
|
.build(),
|
||||||
|
httpRequest -> {
|
||||||
|
System.out.println(httpRequest);
|
||||||
|
});
|
||||||
|
return circuitBreaker;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,13 +1,10 @@
|
||||||
package eu.eudat.configurations;
|
package eu.eudat.configurations.misc;
|
||||||
|
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.core.task.TaskExecutor;
|
|
||||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
|
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by ikalyvas on 9/26/2018.
|
* Created by ikalyvas on 9/26/2018.
|
|
@ -1,4 +1,4 @@
|
||||||
package eu.eudat.configurations;
|
package eu.eudat.configurations.web;
|
||||||
|
|
||||||
import eu.eudat.controllers.interceptors.RequestInterceptor;
|
import eu.eudat.controllers.interceptors.RequestInterceptor;
|
||||||
import eu.eudat.logic.handlers.PrincipalArgumentResolver;
|
import eu.eudat.logic.handlers.PrincipalArgumentResolver;
|
|
@ -0,0 +1,27 @@
|
||||||
|
package eu.eudat.logic.handlers;
|
||||||
|
|
||||||
|
import eu.eudat.logic.utilities.circuitbreaker.CircuitBreaker;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.http.HttpRequest;
|
||||||
|
import org.springframework.http.client.ClientHttpRequestExecution;
|
||||||
|
import org.springframework.http.client.ClientHttpRequestInterceptor;
|
||||||
|
import org.springframework.http.client.ClientHttpResponse;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by ikalyvas on 2/25/2019.
|
||||||
|
*/
|
||||||
|
public class CircuitBreakerHttpClientInterceptor implements ClientHttpRequestInterceptor {
|
||||||
|
|
||||||
|
private CircuitBreaker circuitBreaker;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
public CircuitBreakerHttpClientInterceptor(CircuitBreaker circuitBreaker) {
|
||||||
|
this.circuitBreaker = circuitBreaker;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) {
|
||||||
|
System.out.println(request);
|
||||||
|
return this.circuitBreaker.handle(request, () -> execution.execute(request, body));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,98 @@
|
||||||
|
package eu.eudat.logic.utilities.circuitbreaker;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by ikalyvas on 2/25/2019.
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
public final class Circuit {
|
||||||
|
private final String url;
|
||||||
|
private final String method;
|
||||||
|
private Integer numberOfFails;
|
||||||
|
private CircuitState state;
|
||||||
|
private final Integer MAX_NUMBER_OF_FAILS = 5;
|
||||||
|
public String getUrl() {
|
||||||
|
return url;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMethod() {
|
||||||
|
return method;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getNumberOfFails() {
|
||||||
|
return numberOfFails;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Circuit(String url, String method, Integer numberOfFails) {
|
||||||
|
this.url = url;
|
||||||
|
this.method = method;
|
||||||
|
this.numberOfFails = numberOfFails;
|
||||||
|
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
|
||||||
|
executor.scheduleAtFixedRate(this::halfOpenCircuit, 0, 20, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public enum CircuitState {
|
||||||
|
CLOSED, HALF_OPENED, OPENED
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class CircuitBuilder {
|
||||||
|
private String url;
|
||||||
|
private String method;
|
||||||
|
|
||||||
|
public CircuitBuilder url(String url) {
|
||||||
|
this.url = url;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CircuitBuilder method(String method) {
|
||||||
|
this.method = method;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Circuit build() {
|
||||||
|
return new Circuit(url, method, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) return true;
|
||||||
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
|
|
||||||
|
Circuit circuit = (Circuit) o;
|
||||||
|
|
||||||
|
if (!url.equals(circuit.url)) return false;
|
||||||
|
return method.equals(circuit.method);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
int result = url.hashCode();
|
||||||
|
result = 31 * result + method.hashCode();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void halfOpenCircuit() {
|
||||||
|
if (this.state == CircuitState.CLOSED) {
|
||||||
|
this.state = CircuitState.HALF_OPENED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addFailure() {
|
||||||
|
this.numberOfFails++;
|
||||||
|
if(this.state == CircuitState.OPENED && this.numberOfFails > MAX_NUMBER_OF_FAILS){
|
||||||
|
this.state = CircuitState.CLOSED;
|
||||||
|
}
|
||||||
|
else if(this.state == CircuitState.HALF_OPENED){
|
||||||
|
this.state = CircuitState.CLOSED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isOpen(){
|
||||||
|
return this.state == CircuitState.OPENED;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
package eu.eudat.logic.utilities.circuitbreaker;
|
||||||
|
|
||||||
|
import eu.eudat.logic.utilities.interfaces.CheckedSupplier;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.springframework.http.HttpRequest;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.web.client.RestClientResponseException;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by ikalyvas on 2/25/2019.
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class CircuitBreaker {
|
||||||
|
private Set<Circuit> circuits = new HashSet<>();
|
||||||
|
private Map<Circuit, Consumer<HttpRequest>> resolvers = new HashMap<>();
|
||||||
|
|
||||||
|
public <R> R handle(HttpRequest request, CheckedSupplier<R> function) throws RestClientResponseException {
|
||||||
|
Optional<Circuit> circuitOptional = this.circuits.stream()
|
||||||
|
.filter(circuit ->
|
||||||
|
circuit.getMethod().equals(request.getMethod().name()) && circuit.getUrl().equals(request.getURI().toString()))
|
||||||
|
.findFirst();
|
||||||
|
if (circuitOptional.isPresent() && circuitOptional.get().isOpen()) {
|
||||||
|
Consumer<HttpRequest> resolver = resolvers.get(circuitOptional.get());
|
||||||
|
if (resolver != null) {
|
||||||
|
resolver.accept(request);
|
||||||
|
return null;
|
||||||
|
} else throw new RuntimeException("Resolver not found for " + request);
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
return function.get();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
if (circuitOptional.isPresent()) circuitOptional.get().addFailure();
|
||||||
|
else
|
||||||
|
circuits.add(new Circuit.CircuitBuilder().method(request.getMethod().name()).url(request.getURI().toString()).build());
|
||||||
|
throw new RestClientResponseException(StringUtils.EMPTY, 400, StringUtils.EMPTY, null, null, null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public CircuitBreaker addCircuitResolver(Circuit circuit, Consumer<HttpRequest> requestConsumer) {
|
||||||
|
this.resolvers.put(circuit, requestConsumer);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,8 @@
|
||||||
|
package eu.eudat.logic.utilities.interfaces;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by ikalyvas on 2/25/2019.
|
||||||
|
*/
|
||||||
|
public interface CheckedSupplier<T> {
|
||||||
|
T get() throws Exception;
|
||||||
|
}
|
Loading…
Reference in New Issue