fixed bug if some exception happen the connection pool didn't release the connection. Now it should be fixed

This commit is contained in:
Sandro La Bruzzo 2023-07-18 15:00:53 +02:00
parent b07be69f90
commit dbe6d76c59
4 changed files with 84 additions and 72 deletions

View File

@ -11,6 +11,7 @@
<packaging>jar</packaging>
<artifactId>scholexplorer-api</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.data</groupId>

View File

@ -13,12 +13,9 @@ import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.nested.ParsedNested;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
@ -73,55 +70,53 @@ public class ScholixIndexManager {
}
private QueryBuilder createObjectTypeQuery(final RelationPrefix prefix, final String objectType ) throws ScholixException{
if (prefix == null){
private QueryBuilder createObjectTypeQuery(final RelationPrefix prefix, final String objectType) throws ScholixException {
if (prefix == null) {
throw new ScholixException("prefix cannot be null");
}
return new NestedQueryBuilder(String.format("%s", prefix), new TermQueryBuilder(String.format("%s.objectType",prefix), objectType), ScoreMode.None);
return new NestedQueryBuilder(String.format("%s", prefix), new TermQueryBuilder(String.format("%s.objectType", prefix), objectType), ScoreMode.None);
}
private QueryBuilder createPidTypeQuery(final RelationPrefix prefix, final String pidTypeValue ) throws ScholixException{
if (prefix == null){
private QueryBuilder createPidTypeQuery(final RelationPrefix prefix, final String pidTypeValue) throws ScholixException {
if (prefix == null) {
throw new ScholixException("prefix cannot be null");
}
return new NestedQueryBuilder(String.format("%s.identifier", prefix), new TermQueryBuilder(String.format("%s.identifier.schema",prefix), pidTypeValue), ScoreMode.None);
return new NestedQueryBuilder(String.format("%s.identifier", prefix), new TermQueryBuilder(String.format("%s.identifier.schema", prefix), pidTypeValue), ScoreMode.None);
}
private QueryBuilder createLinkProviderQuery(final String providerName ) throws ScholixException{
if (providerName == null){
private QueryBuilder createLinkProviderQuery(final String providerName) throws ScholixException {
if (providerName == null) {
throw new ScholixException("prefix cannot be null");
}
return new NestedQueryBuilder("linkprovider", new TermQueryBuilder("linkprovider.name",providerName), ScoreMode.None);
return new NestedQueryBuilder("linkprovider", new TermQueryBuilder("linkprovider.name", providerName), ScoreMode.None);
}
private QueryBuilder createLinkPublisherQuery(final RelationPrefix prefix, final String publisher ) throws ScholixException{
if (prefix == null){
private QueryBuilder createLinkPublisherQuery(final RelationPrefix prefix, final String publisher) throws ScholixException {
if (prefix == null) {
throw new ScholixException("prefix cannot be null");
}
return new NestedQueryBuilder(String.format("%s.publisher", prefix), new TermQueryBuilder(String.format("%s.publisher.name",prefix), publisher), ScoreMode.None);
return new NestedQueryBuilder(String.format("%s.publisher", prefix), new TermQueryBuilder(String.format("%s.publisher.name", prefix), publisher), ScoreMode.None);
}
private QueryBuilder createPidValueQuery(final RelationPrefix prefix, final String pidValue ) throws ScholixException{
if (prefix == null){
private QueryBuilder createPidValueQuery(final RelationPrefix prefix, final String pidValue) throws ScholixException {
if (prefix == null) {
throw new ScholixException("prefix cannot be null");
}
return new NestedQueryBuilder(String.format("%s.identifier", prefix), new TermQueryBuilder(String.format("%s.identifier.identifier",prefix), pidValue), ScoreMode.None);
return new NestedQueryBuilder(String.format("%s.identifier", prefix), new TermQueryBuilder(String.format("%s.identifier.identifier", prefix), pidValue), ScoreMode.None);
}
private QueryBuilder createFinalQuery(final List<QueryBuilder> queries) throws ScholixException{
private QueryBuilder createFinalQuery(final List<QueryBuilder> queries) throws ScholixException {
if (queries == null || queries.isEmpty())
throw new ScholixException("the list of queries must be not empty");
if (queries.size() ==1) {
if (queries.size() == 1) {
return queries.get(0);
}
else {
} else {
final BoolQueryBuilder b = new BoolQueryBuilder();
b.must().addAll(queries);
@ -131,7 +126,7 @@ public class ScholixIndexManager {
}
private void incrementPidCounter(RelationPrefix prefix, String value) {
switch (value.toLowerCase()){
switch (value.toLowerCase()) {
case "doi": {
myCounter.increment(String.format("%s_doi", prefix));
break;
@ -149,30 +144,37 @@ public class ScholixIndexManager {
public List<Pair<String, Long>> totalLinksByProvider(final String filterName) throws ScholixException {
final QueryBuilder query = StringUtils.isNoneBlank(filterName)?QueryBuilders.termQuery("linkProviders", filterName):QueryBuilders.matchAllQuery();
final QueryBuilder query = StringUtils.isNoneBlank(filterName) ? QueryBuilders.termQuery("linkProviders", filterName) : QueryBuilders.matchAllQuery();
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(query)
.withSearchType(SearchType.DEFAULT)
.withPageable(PageRequest.of(0,10))
.withPageable(PageRequest.of(0, 10))
.addAggregation(
AggregationBuilders.terms("genres").field("linkProviders").size(100)
.minDocCount(1))
.build();
Pair<RestHighLevelClient, ElasticsearchRestTemplate> resource = connectionPool.getResource();
ElasticsearchRestTemplate client = resource.getValue();
final SearchHits<Scholix> hits = client.search(searchQuery, Scholix.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
Pair<RestHighLevelClient, ElasticsearchRestTemplate> resource = null;
try {
resource = connectionPool.getResource();
ElasticsearchRestTemplate client = resource.getValue();
final SearchHits<ScholixFlat> hits = client.search(searchQuery, ScholixFlat.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
final Aggregations aggregations = hits.getAggregations();
connectionPool.returnResource(resource);
if(aggregations == null)
return null;
return ((ParsedStringTerms) aggregations.get("genres")).getBuckets().stream().map(b -> new ImmutablePair<>(b.getKeyAsString(), b.getDocCount())).collect(Collectors.toList());
final Aggregations aggregations = hits.getAggregations();
connectionPool.returnResource(resource);
if (aggregations == null)
return null;
return ((ParsedStringTerms) aggregations.get("genres")).getBuckets().stream().map(b -> new ImmutablePair<>(b.getKeyAsString(), b.getDocCount())).collect(Collectors.toList());
} catch (ScholixException e) {
throw e;
} finally {
if (connectionPool != null) {
connectionPool.returnResource(resource);
}
}
}
@ -181,37 +183,39 @@ public class ScholixIndexManager {
public List<Pair<String, Long>> totalLinksPublisher(final RelationPrefix prefix, final String filterName) throws ScholixException {
final QueryBuilder query = StringUtils.isNoneBlank(filterName)?createLinkPublisherQuery(prefix,filterName):QueryBuilders.matchAllQuery();
final QueryBuilder query = StringUtils.isNoneBlank(filterName) ? createLinkPublisherQuery(prefix, filterName) : QueryBuilders.matchAllQuery();
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(query)
.withSearchType(SearchType.DEFAULT)
.withPageable(PageRequest.of(0,10))
.withPageable(PageRequest.of(0, 10))
.addAggregation(
AggregationBuilders.terms("publishers").field(String.format("%sPublisher", prefix.toString())).size(100)
.minDocCount(1))
.build();
Pair<RestHighLevelClient, ElasticsearchRestTemplate> resource = connectionPool.getResource();
ElasticsearchRestTemplate client = resource.getValue();
final SearchHits<Scholix> hits = client.search(searchQuery, Scholix.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
Pair<RestHighLevelClient, ElasticsearchRestTemplate> resource = null;
try {
resource = connectionPool.getResource();
ElasticsearchRestTemplate client = resource.getValue();
final SearchHits<ScholixFlat> hits = client.search(searchQuery, ScholixFlat.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
final Aggregations aggregations = hits.getAggregations();
connectionPool.returnResource(resource);
final Aggregations aggregations = hits.getAggregations();
if (aggregations == null)
return null;
if(aggregations == null)
return null;
return ((ParsedStringTerms) aggregations.get("publishers")).getBuckets().stream().map(b -> new ImmutablePair<>(b.getKeyAsString(), b.getDocCount())).collect(Collectors.toList());
return ((ParsedStringTerms) aggregations.get("publishers")).getBuckets().stream().map(b -> new ImmutablePair<>(b.getKeyAsString(), b.getDocCount())).collect(Collectors.toList());
} catch (ScholixException e) {
throw e;
} finally {
if (connectionPool != null) {
connectionPool.returnResource(resource);
}
}
}
/**
* Links from pid pair.
*
@ -229,15 +233,14 @@ public class ScholixIndexManager {
* @throws ScholixException the scholix exception
*/
@Timed(value = "scholix.index.request.links", description = "Time taken to request index")
public Pair<Long,List<Scholix>> linksFromPid ( final String linkProvider,
final String targetPid, final String targetPidType, final String targetPublisher,
final String targetType, final String sourcePid, final String sourcePidType,
final String sourcePublisher, final String sourceType,
final Integer page) throws ScholixException {
public Pair<Long, List<Scholix>> linksFromPid(final String linkProvider,
final String targetPid, final String targetPidType, final String targetPublisher,
final String targetType, final String sourcePid, final String sourcePidType,
final String sourcePublisher, final String sourceType,
final Integer page) throws ScholixException {
if (sourcePid==null && sourcePidType==null && targetPid==null && targetPidType==null && sourcePublisher==null && targetPublisher==null && linkProvider==null)
if (sourcePid == null && sourcePidType == null && targetPid == null && targetPidType == null && sourcePublisher == null && targetPublisher == null && linkProvider == null)
throw new ScholixException("One of sourcePid, targetPid, sourcePublisher, targetPublisher, linkProvider should be not null");
final List<QueryBuilder> queries = new ArrayList<>();
@ -258,17 +261,17 @@ public class ScholixIndexManager {
if (StringUtils.isNoneBlank(targetPidType)) {
assert targetPidType != null;
incrementPidCounter(RelationPrefix.target,targetPidType);
incrementPidCounter(RelationPrefix.target, targetPidType);
queries.add(QueryBuilders.termQuery("targetPidType", targetPidType));
}
if (StringUtils.isNoneBlank(sourcePidType)) {
assert sourcePidType != null;
incrementPidCounter(RelationPrefix.source,sourcePidType);
incrementPidCounter(RelationPrefix.source, sourcePidType);
queries.add(QueryBuilders.termQuery("sourcePidType", sourcePidType));
}
if (StringUtils.isNoneBlank(targetType)) {
myCounter.increment(String.format("targetType_%s", targetType));
myCounter.increment(String.format("targetType_%s", targetType));
queries.add(QueryBuilders.termQuery("targetType", targetType));
}
@ -286,21 +289,28 @@ public class ScholixIndexManager {
NativeSearchQuery finalQuery = new NativeSearchQueryBuilder()
.withQuery(result)
.withPageable(PageRequest.of(page,100))
.withPageable(PageRequest.of(page, 100))
.build();
Pair<RestHighLevelClient, ElasticsearchRestTemplate> resource = connectionPool.getResource();
ElasticsearchRestTemplate client = resource.getValue();
Pair<RestHighLevelClient, ElasticsearchRestTemplate> resource = null;
try {
resource = connectionPool.getResource();
ElasticsearchRestTemplate client = resource.getValue();
long tt = client.count(finalQuery, ScholixFlat.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
long tt = client.count(finalQuery, ScholixFlat .class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
SearchHits<ScholixFlat> scholixRes = client.search(finalQuery, ScholixFlat.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
return new ImmutablePair<>(tt, scholixRes.stream().map(SearchHit::getContent).map(ScholixUtils::getScholixFromBlob).collect(Collectors.toList()));
} catch (ScholixException e) {
throw e;
} finally {
if (connectionPool != null) {
connectionPool.returnResource(resource);
}
SearchHits<ScholixFlat> scholixRes = client.search(finalQuery, ScholixFlat.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
connectionPool.returnResource(resource);
return new ImmutablePair<>(tt,scholixRes.stream().map(SearchHit::getContent).map(ScholixUtils::getScholixFromBlob).collect(Collectors.toList()));
}
}
}

View File

@ -7,6 +7,7 @@ import org.apache.commons.codec.binary.Base64InputStream;
import org.apache.commons.io.IOUtils;
import java.io.ByteArrayInputStream;
import java.nio.charset.Charset;
import java.util.zip.GZIPInputStream;
public class ScholixUtils {
@ -16,7 +17,7 @@ public class ScholixUtils {
private static String uncompress(final String compressed) throws Exception {
Base64InputStream bis = new Base64InputStream(new ByteArrayInputStream(compressed.getBytes()));
GZIPInputStream gzip = new GZIPInputStream(bis);
return IOUtils.toString(gzip);
return IOUtils.toString(gzip, Charset.defaultCharset());
}
public static Scholix getScholixFromBlob(final ScholixFlat flat) {

View File

@ -27,7 +27,7 @@ management.metrics.distribution.percentiles.http.server.requests=0.5, 0.9, 0.95,
#scholix.elastic.clusterNodes = 10.19.65.51:9200,10.19.65.52:9200,10.19.65.53:9200,10.19.65.54:9200
scholix.elastic.clusterNodes = localhost:9200
scholix.elastic.indexName = scholix
scholix.elastic.indexName = dli_scholix
scholix.elastic.socketTimeout = 60000
scholix.elastic.connectionTimeout= 60000