Implemented ElasticSearch Connection Pool

This commit is contained in:
Sandro La Bruzzo 2022-02-08 09:57:45 +01:00
parent 68eed5d523
commit b776a3c7cd
11 changed files with 312 additions and 56 deletions

View File

@ -30,6 +30,11 @@
<artifactId>dhp-schemas</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
</dependencies>

View File

@ -42,11 +42,6 @@ public class MainApplication extends AbstractDnetApp {
@Value("${dhp.swagger.api.basePath}")
private String swaggerPath;
final String descriptionAPI = " <p style=\"text-align:center;\"><img src=\"/logo.png\" alt=\"ScholeXplorer\"> </p>" +
"The Scholix Swagger API allows clients to run REST queries over the Scholexplorer index in order to fetch links matching given criteria. In the current version, clients can search for:" +
"<ul><li>Links whose source object has a given PID or PID type</li>" +
"<li>Links whose source object has been published by a given data source (\"data source as publisher\")</li>" +
"<li>Links that were collected from a given data source (\"data source as provider\").</li></ul>";
public static void main(final String[] args) {
SpringApplication.run(MainApplication.class, args);
@ -57,20 +52,17 @@ public class MainApplication extends AbstractDnetApp {
@Bean
public TaggedCounter myCounter(MeterRegistry meterRegistry) {
return new TaggedCounter("scholixLinkCounter", "links",meterRegistry);
return new TaggedCounter(ScholixAPIConstants.SCHOLIX_MANAGER_COUNTER_NAME, ScholixAPIConstants.SCHOLIX_MANAGER_TAG_NAME,meterRegistry);
}
@Bean
public TimedAspect timedAspect(MeterRegistry meterRegistry) {
MeterFilter mf = new MeterFilter() {
@Override
public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) {
if (id.getName().startsWith("scholix")) {
if (id.getName().startsWith(ScholixAPIConstants.SCHOLIX_COUNTER_PREFIX)) {
return DistributionStatisticConfig.builder()
.percentiles(0.20, 0.50, 0.75,0.95)
.percentilesHistogram(false)
.serviceLevelObjectives( histogramValues)
.build()
@ -79,31 +71,24 @@ public class MainApplication extends AbstractDnetApp {
return config;
}
};
meterRegistry.config().meterFilter(mf);
return new TimedAspect(meterRegistry);
}
@Override
protected void configSwagger(final Docket docket) {
docket
.host(swaggetHost)
.pathMapping(swaggerPath)
.groupName("Scholexplorer V1")
.groupName(ScholixAPIConstants.API_V1_NAME)
.select()
.apis(RequestHandlerSelectors.any())
.paths(p -> p.startsWith("/v1"))
.build()
.apiInfo(new ApiInfoBuilder()
.title("Scholexplorer API V1.0")
.description(descriptionAPI)
.title(ScholixAPIConstants.API_V1_NAME)
.description(ScholixAPIConstants.API_DESCRIPTION)
.version("1.0")
.contact(ApiInfo.DEFAULT_CONTACT)
.license("Apache 2.0")
@ -113,23 +98,20 @@ public class MainApplication extends AbstractDnetApp {
@Bean (name = "SpringDOcketv2")
@Bean (name = "SpringDocketv2")
public Docket v2Docket() {
final Docket docket = new Docket(DocumentationType.SWAGGER_2);
docket
.host(swaggetHost)
.pathMapping(swaggerPath)
.groupName("Scholexplorer V2")
.groupName(ScholixAPIConstants.API_V2_NAME)
.select()
.apis(RequestHandlerSelectors.any())
.paths(p -> p.startsWith("/v2"))
.build()
.apiInfo(new ApiInfoBuilder()
.title("Scholexplorer API V2.0")
.description(descriptionAPI)
.title(ScholixAPIConstants.API_V2_NAME)
.description(ScholixAPIConstants.API_DESCRIPTION)
.version("2.0")
.contact(ApiInfo.DEFAULT_CONTACT)
.license("Apache 2.0")

View File

@ -1,5 +1,6 @@
package eu.dnetlib.scholix.api;
import eu.dnetlib.scholix.api.index.ElasticSearchPool;
import eu.dnetlib.scholix.api.index.ElasticSearchProperties;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
@ -12,20 +13,29 @@ import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfig
import java.time.Duration;
@Configuration
public class RestClientConfig extends AbstractElasticsearchConfiguration {
public class RestClientConfig {
@Autowired
private ElasticSearchProperties elasticSearchProperties;
@Override
@Bean
public RestHighLevelClient elasticsearchClient() {
final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(elasticSearchProperties.getClusterNodes().split(","))
.withConnectTimeout(elasticSearchProperties.getConnectionTimeout())
.withSocketTimeout(elasticSearchProperties.getSocketTimeout())
.build();
return RestClients.create(clientConfiguration).rest();
@Bean
public ElasticSearchPool connectionPool() {
ElasticSearchPool pool = new ElasticSearchPool(elasticSearchProperties);
return pool;
}
// @Override
// @Bean
// public RestHighLevelClient elasticsearchClient() {
//
// final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
// .connectedTo(elasticSearchProperties.getClusterNodes().split(","))
// .withConnectTimeout(elasticSearchProperties.getConnectionTimeout())
// .withSocketTimeout(elasticSearchProperties.getSocketTimeout())
// .build();
// return RestClients.create(clientConfiguration).rest();
// }
}

View File

@ -0,0 +1,23 @@
package eu.dnetlib.scholix.api;
public class ScholixAPIConstants {
public static final String API_V1_NAME = "Scholexplorer API V1.0";
public static final String API_V2_NAME = "Scholexplorer API V2.0";
public static String API_DESCRIPTION =" <p style=\"text-align:center;\"><img src=\"/logo.png\" alt=\"ScholeXplorer\"> </p>" +
"The Scholix Swagger API allows clients to run REST queries over the Scholexplorer index in order to fetch links matching given criteria. In the current version, clients can search for:" +
"<ul><li>Links whose source object has a given PID or PID type</li>" +
"<li>Links whose source object has been published by a given data source (\"data source as publisher\")</li>" +
"<li>Links that were collected from a given data source (\"data source as provider\").</li></ul>";
public static String SCHOLIX_MANAGER_COUNTER_NAME= "scholixLinkCounter";
public static final String SCHOLIX_MANAGER_TAG_NAME = "links";
public static String SCHOLIX_COUNTER_PREFIX = "scholix";
}

View File

@ -4,6 +4,7 @@ package eu.dnetlib.scholix.api.controller;
import eu.dnetlib.common.controller.AbstractDnetController;
import eu.dnetlib.dhp.schema.sx.scholix.ScholixIdentifier;
import eu.dnetlib.scholix.api.ScholixAPIVersion;
import eu.dnetlib.scholix.api.ScholixException;
import eu.dnetlib.scholix.api.index.ScholixIndexManager;
import eu.dnetlib.scholix.api.model.v2.PageResultType;
import eu.dnetlib.scholix.api.model.v2.ScholixType;
@ -13,6 +14,7 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import eu.dnetlib.dhp.schema.sx.scholix.Scholix;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.index.query.BoolQueryBuilder;
@ -77,19 +79,20 @@ public class ScholixControllerV2 extends AbstractDnetController {
@Parameter(in = ParameterIn.QUERY,
description = "select page of result") Integer page) throws Exception {
if (StringUtils.isEmpty(sourcePid) && StringUtils.isEmpty(targetPid) && StringUtils.isEmpty(sourcePublisher)&& StringUtils.isEmpty(targetPublisher)&& StringUtils.isEmpty(linkProvider))
throw new ScholixException("The method requires one of the following parameters: sourcePid, targetPid, sourcePublisher, targetPublisher, linkProvider");
final int currentPage = page!= null? page : 0;
System.out.println(currentPage);
Pair<Long, List<Scholix>> scholixResult = manager.linksFromPid(ScholixAPIVersion.V2, linkProvider, targetPid, targetPidType, targetPublisher, targetType, sourcePid, sourcePidType, sourcePublisher, sourceType, harvestedAfter, currentPage);
final PageResultType pageResult = new PageResultType();
pageResult.setTotalPages(scholixResult.getLeft().intValue() / 10);
pageResult.setTotalLinks(scholixResult.getLeft().intValue());
pageResult.setResult(scholixResult.getRight().stream().map(ScholixType::fromScholix).collect(Collectors.toList()));
return pageResult;
try {
final int currentPage = page != null ? page : 0;
Pair<Long, List<Scholix>> scholixResult = manager.linksFromPid(ScholixAPIVersion.V2, linkProvider, targetPid, targetPidType, targetPublisher, targetType, sourcePid, sourcePidType, sourcePublisher, sourceType, harvestedAfter, currentPage);
final PageResultType pageResult = new PageResultType();
pageResult.setTotalPages(scholixResult.getLeft().intValue() / 10);
pageResult.setTotalLinks(scholixResult.getLeft().intValue());
pageResult.setResult(scholixResult.getRight().stream().map(ScholixType::fromScholix).collect(Collectors.toList()));
return pageResult;
} catch (Throwable e) {
throw new ScholixException("Error on requesting url ", e);
}
}

View File

@ -0,0 +1,66 @@
package eu.dnetlib.scholix.api.index;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
public class ElasticSearchClientFactory implements PooledObjectFactory<RestHighLevelClient> {
private ElasticSearchProperties elasticSearchProperties;
public ElasticSearchClientFactory(final ElasticSearchProperties elasticSearchProperties){
this.elasticSearchProperties = elasticSearchProperties;
}
public PooledObject<RestHighLevelClient> makeObject() throws Exception {
final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(elasticSearchProperties.getClusterNodes().split(","))
.withConnectTimeout(elasticSearchProperties.getConnectionTimeout())
.withSocketTimeout(elasticSearchProperties.getSocketTimeout())
.build();
RestHighLevelClient cc = RestClients.create(clientConfiguration).rest();
return new DefaultPooledObject(cc);
}
public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
RestHighLevelClient client = pooledObject.getObject();
if(client!=null&&client.ping(RequestOptions.DEFAULT)){
try {
client.close();
}catch (Exception e){
//ignore
}
}
}
public boolean validateObject(PooledObject<RestHighLevelClient> pooledObject) {
RestHighLevelClient client = pooledObject.getObject();
try {
return client.ping(RequestOptions.DEFAULT);
}catch(Exception e){
return false;
}
}
public void activateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
RestHighLevelClient client = pooledObject.getObject();
boolean response = client.ping(RequestOptions.DEFAULT);
}
public void passivateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
//nothing
}
}

View File

@ -0,0 +1,19 @@
package eu.dnetlib.scholix.api.index;
import org.elasticsearch.client.RestHighLevelClient;
public class ElasticSearchPool extends Pool<RestHighLevelClient> {
private final ElasticSearchProperties elasticSearchProperties;
public ElasticSearchPool(ElasticSearchProperties elasticSearchProperties){
super(elasticSearchProperties, new ElasticSearchClientFactory(elasticSearchProperties));
this.elasticSearchProperties = elasticSearchProperties;
}
public ElasticSearchProperties getElasticSearchProperties() {
return elasticSearchProperties;
}
}

View File

@ -1,5 +1,6 @@
package eu.dnetlib.scholix.api.index;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@ -10,7 +11,7 @@ import javax.validation.constraints.NotNull;
*/
@Component("elasticSearchProperties")
@ConfigurationProperties(prefix = "scholix.elastic")
public class ElasticSearchProperties {
public class ElasticSearchProperties extends GenericObjectPoolConfig {
@NotNull
private String clusterNodes;

View File

@ -0,0 +1,138 @@
package eu.dnetlib.scholix.api.index;
import eu.dnetlib.scholix.api.ScholixException;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
public class Pool<T> implements Cloneable {
protected GenericObjectPool<T> internalPool ;
public Pool(){
super();
}
public Pool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory){
initPool(poolConfig, factory);
}
public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
if (this.internalPool != null) {
try {
closeInternalPool();
} catch (Exception e) {
}
}
this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
}
protected void closeInternalPool() throws ScholixException {
try {
internalPool.close();
} catch (Exception e) {
throw new ScholixException("Could not destroy the pool", e);
}
}
public T getResource() throws ScholixException {
try {
return internalPool.borrowObject();
} catch (Exception e) {
throw new ScholixException("Could not get a resource from the pool", e);
}
}
public void returnResource(final T resource) throws ScholixException {
if (resource != null) {
returnResourceObject(resource);
}
}
private void returnResourceObject(final T resource) throws ScholixException {
if (resource == null) {
return;
}
try {
internalPool.returnObject(resource);
} catch (Exception e) {
throw new ScholixException("Could not return the resource to the pool", e);
}
}
public void returnBrokenResource(final T resource) throws ScholixException {
if (resource != null) {
returnBrokenResourceObject(resource);
}
}
private void returnBrokenResourceObject(T resource) throws ScholixException {
try {
internalPool.invalidateObject(resource);
} catch (Exception e) {
throw new ScholixException("Could not return the resource to the pool", e);
}
}
public void destroy() throws ScholixException {
closeInternalPool();
}
public int getNumActive() {
if (poolInactive()) {
return -1;
}
return this.internalPool.getNumActive();
}
public int getNumIdle() {
if (poolInactive()) {
return -1;
}
return this.internalPool.getNumIdle();
}
public int getNumWaiters() {
if (poolInactive()) {
return -1;
}
return this.internalPool.getNumWaiters();
}
public long getMeanBorrowWaitTimeMillis() {
if (poolInactive()) {
return -1;
}
return this.internalPool.getMeanBorrowWaitTimeMillis();
}
public long getMaxBorrowWaitTimeMillis() {
if (poolInactive()) {
return -1;
}
return this.internalPool.getMaxBorrowWaitTimeMillis();
}
private boolean poolInactive() {
return this.internalPool == null || this.internalPool.isClosed();
}
public void addObjects(int count) throws Exception {
try {
for (int i = 0; i < count; i++) {
this.internalPool.addObject();
}
} catch (Exception e) {
throw new Exception("Error trying to add idle objects", e);
}
}
}

View File

@ -11,13 +11,14 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.NestedQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
@ -45,7 +46,7 @@ public class ScholixIndexManager {
* The Elasticsearch template.
*/
@Autowired
ElasticsearchOperations elasticsearchTemplate;
ElasticSearchPool connectionPool;
/**
* The My counter.
@ -205,12 +206,13 @@ public class ScholixIndexManager {
.withPageable(PageRequest.of(page,10))
.build();
long tt = elasticsearchTemplate.count(finalQuery, Scholix.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
System.out.println(tt);
RestHighLevelClient client = connectionPool.getResource();
ElasticsearchRestTemplate template = new ElasticsearchRestTemplate(client);
long tt = template.count(finalQuery, Scholix.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
SearchHits<Scholix> scholixRes = elasticsearchTemplate.search(finalQuery, Scholix.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
SearchHits<Scholix> scholixRes = template.search(finalQuery, Scholix.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
System.out.println("SIZE OF HITS ->"+scholixRes.getSearchHits().size());
connectionPool.returnResource(client);
return new ImmutablePair<>(tt,scholixRes.stream().map(SearchHit::getContent).collect(Collectors.toList()));
}

View File

@ -142,6 +142,13 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
</dependency>
</dependencies>
<dependencyManagement>