Implemented LinkProvider aggregations

This commit is contained in:
Sandro La Bruzzo 2022-02-09 11:33:09 +01:00
parent b776a3c7cd
commit fc74c86ba9
6 changed files with 203 additions and 51 deletions

View File

@ -2,15 +2,9 @@ package eu.dnetlib.scholix.api;
import eu.dnetlib.scholix.api.index.ElasticSearchPool;
import eu.dnetlib.scholix.api.index.ElasticSearchProperties;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
import java.time.Duration;
@Configuration
public class RestClientConfig {
@ -22,20 +16,11 @@ public class RestClientConfig {
@Bean
public ElasticSearchPool connectionPool() {
elasticSearchProperties.setMaxIdle(5);
elasticSearchProperties.setMaxTotal(10);
ElasticSearchPool pool = new ElasticSearchPool(elasticSearchProperties);
return pool;
}
// @Override
// @Bean
// public RestHighLevelClient elasticsearchClient() {
//
// final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
// .connectedTo(elasticSearchProperties.getClusterNodes().split(","))
// .withConnectTimeout(elasticSearchProperties.getConnectionTimeout())
// .withSocketTimeout(elasticSearchProperties.getSocketTimeout())
// .build();
// return RestClients.create(clientConfiguration).rest();
// }
}

View File

@ -1,16 +1,22 @@
package eu.dnetlib.scholix.api.controller;
import eu.dnetlib.scholix.api.ScholixException;
import eu.dnetlib.scholix.api.index.ScholixIndexManager;
import eu.dnetlib.scholix.api.model.v2.LinkProviderType;
import io.swagger.annotations.Api;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@RestController
@RequestMapping("/v2")
@ -19,17 +25,25 @@ import java.util.List;
})
public class LinkProviderV2 {
@Autowired
ScholixIndexManager manager;
@Operation(
summary = "Get all Link Providers",
description = "Return a list of link provider and relative number of relations")
@GetMapping("/LinkProvider")
public List<LinkProviderType> getLinkProviders(
@Parameter(in = ParameterIn.QUERY, description = "Filter the link provider name") @RequestParam(required = false) String name
) {
return Arrays.asList(
new LinkProviderType().name("pippo").totalRelationships(30),
new LinkProviderType().name("pluto").totalRelationships(30),
new LinkProviderType().name("peppa").totalRelationships(30)
);
) throws ScholixException {
List<Pair<String, Long>> result = manager.linksByProvider(null);
if (result==null)
return new ArrayList<>();
return result.stream().map(s -> new LinkProviderType().name(s.getLeft()).totalRelationships(s.getValue().intValue())).collect(Collectors.toList());
}
}

View File

@ -1,5 +1,7 @@
package eu.dnetlib.scholix.api.index;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
@ -7,20 +9,27 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
public class ElasticSearchClientFactory implements PooledObjectFactory<RestHighLevelClient> {
/**
* The type Elastic search client factory.
*/
public class ElasticSearchClientFactory implements PooledObjectFactory<Pair<RestHighLevelClient, ElasticsearchRestTemplate>> {
private ElasticSearchProperties elasticSearchProperties;
private final ElasticSearchProperties elasticSearchProperties;
/**
* Instantiates a new Elastic search client factory.
*
* @param elasticSearchProperties the elastic search properties
*/
public ElasticSearchClientFactory(final ElasticSearchProperties elasticSearchProperties){
this.elasticSearchProperties = elasticSearchProperties;
}
public PooledObject<RestHighLevelClient> makeObject() throws Exception {
public PooledObject<Pair<RestHighLevelClient, ElasticsearchRestTemplate>> makeObject() throws Exception {
final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(elasticSearchProperties.getClusterNodes().split(","))
@ -29,12 +38,11 @@ public class ElasticSearchClientFactory implements PooledObjectFactory<RestHighL
.build();
RestHighLevelClient cc = RestClients.create(clientConfiguration).rest();
return new DefaultPooledObject(cc);
return new DefaultPooledObject<>(new ImmutablePair<>(cc, new ElasticsearchRestTemplate(cc)));
}
public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
RestHighLevelClient client = pooledObject.getObject();
public void destroyObject(PooledObject<Pair<RestHighLevelClient, ElasticsearchRestTemplate>> pooledObject) throws Exception {
RestHighLevelClient client = pooledObject.getObject().getLeft();
if(client!=null&&client.ping(RequestOptions.DEFAULT)){
try {
client.close();
@ -44,8 +52,8 @@ public class ElasticSearchClientFactory implements PooledObjectFactory<RestHighL
}
}
public boolean validateObject(PooledObject<RestHighLevelClient> pooledObject) {
RestHighLevelClient client = pooledObject.getObject();
public boolean validateObject(PooledObject<Pair<RestHighLevelClient, ElasticsearchRestTemplate>> pooledObject) {
RestHighLevelClient client = pooledObject.getObject().getLeft();
try {
return client.ping(RequestOptions.DEFAULT);
}catch(Exception e){
@ -53,12 +61,12 @@ public class ElasticSearchClientFactory implements PooledObjectFactory<RestHighL
}
}
public void activateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
RestHighLevelClient client = pooledObject.getObject();
public void activateObject(PooledObject<Pair<RestHighLevelClient, ElasticsearchRestTemplate>> pooledObject) throws Exception {
RestHighLevelClient client = pooledObject.getObject().getLeft();
boolean response = client.ping(RequestOptions.DEFAULT);
}
public void passivateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
public void passivateObject(PooledObject<Pair<RestHighLevelClient, ElasticsearchRestTemplate>> pooledObject) throws Exception {
//nothing
}

View File

@ -1,16 +1,31 @@
package eu.dnetlib.scholix.api.index;
import org.apache.commons.lang3.tuple.Pair;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
public class ElasticSearchPool extends Pool<RestHighLevelClient> {
/**
* The type Elastic search pool.
*/
public class ElasticSearchPool extends Pool<Pair<RestHighLevelClient, ElasticsearchRestTemplate>> {
private final ElasticSearchProperties elasticSearchProperties;
/**
* Instantiates a new Elastic search pool.
*
* @param elasticSearchProperties the elastic search properties
*/
public ElasticSearchPool(ElasticSearchProperties elasticSearchProperties){
super(elasticSearchProperties, new ElasticSearchClientFactory(elasticSearchProperties));
this.elasticSearchProperties = elasticSearchProperties;
}
/**
* Gets elastic search properties.
*
* @return the elastic search properties
*/
public ElasticSearchProperties getElasticSearchProperties() {
return elasticSearchProperties;
}

View File

@ -5,18 +5,52 @@ import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
/**
* When using the Java High Level REST Client provided by the Elasticsearch official website, it is found that there is no in the client API.
* Connecting to connect the pool, create a new connection every time, this is impact in high concurrency situation, so it is ready to be on the client
* API increases the concept of pool.
*
* Fortunately, we don't need to turn your weight to write the implementation of the connection pool, because Apache provides us with the general framework of the connection pool.
* Commons-pool2, and we only need to implement some logic according to the frame design. Used in the REDIS client API
* Jedispool is based on Commons-pool2 implementation.
*
* Let's take a look at how to achieve it.
*
* First we have to create a pool class, this pool introduces GenericObjectPool in Commons-pool2 through dependent manner. In this class
* In, we define how to borrow objects and returns objects from the pool.
*
* @param <T> the type parameter
*/
public class Pool<T> implements Cloneable {
/**
* The Internal pool.
*/
protected GenericObjectPool<T> internalPool ;
/**
* Instantiates a new Pool.
*/
public Pool(){
super();
}
/**
* Instantiates a new Pool.
*
* @param poolConfig the pool config
* @param factory the factory
*/
public Pool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory){
initPool(poolConfig, factory);
}
/**
* Init pool.
*
* @param poolConfig the pool config
* @param factory the factory
*/
public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
if (this.internalPool != null) {
@ -29,6 +63,11 @@ public class Pool<T> implements Cloneable {
this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
}
/**
* Close internal pool.
*
* @throws ScholixException the scholix exception
*/
protected void closeInternalPool() throws ScholixException {
try {
internalPool.close();
@ -37,6 +76,12 @@ public class Pool<T> implements Cloneable {
}
}
/**
* Gets resource.
*
* @return the resource
* @throws ScholixException the scholix exception
*/
public T getResource() throws ScholixException {
try {
return internalPool.borrowObject();
@ -46,6 +91,12 @@ public class Pool<T> implements Cloneable {
}
/**
* Return resource.
*
* @param resource the resource
* @throws ScholixException the scholix exception
*/
public void returnResource(final T resource) throws ScholixException {
if (resource != null) {
returnResourceObject(resource);
@ -63,6 +114,12 @@ public class Pool<T> implements Cloneable {
}
}
/**
* Return broken resource.
*
* @param resource the resource
* @throws ScholixException the scholix exception
*/
public void returnBrokenResource(final T resource) throws ScholixException {
if (resource != null) {
returnBrokenResourceObject(resource);
@ -77,11 +134,21 @@ public class Pool<T> implements Cloneable {
}
}
/**
* Destroy.
*
* @throws ScholixException the scholix exception
*/
public void destroy() throws ScholixException {
closeInternalPool();
}
/**
* Gets num active.
*
* @return the num active
*/
public int getNumActive() {
if (poolInactive()) {
return -1;
@ -90,6 +157,11 @@ public class Pool<T> implements Cloneable {
return this.internalPool.getNumActive();
}
/**
* Gets num idle.
*
* @return the num idle
*/
public int getNumIdle() {
if (poolInactive()) {
return -1;
@ -98,6 +170,11 @@ public class Pool<T> implements Cloneable {
return this.internalPool.getNumIdle();
}
/**
* Gets num waiters.
*
* @return the num waiters
*/
public int getNumWaiters() {
if (poolInactive()) {
return -1;
@ -106,6 +183,11 @@ public class Pool<T> implements Cloneable {
return this.internalPool.getNumWaiters();
}
/**
* Gets mean borrow wait time millis.
*
* @return the mean borrow wait time millis
*/
public long getMeanBorrowWaitTimeMillis() {
if (poolInactive()) {
return -1;
@ -114,6 +196,11 @@ public class Pool<T> implements Cloneable {
return this.internalPool.getMeanBorrowWaitTimeMillis();
}
/**
* Gets max borrow wait time millis.
*
* @return the max borrow wait time millis
*/
public long getMaxBorrowWaitTimeMillis() {
if (poolInactive()) {
return -1;
@ -126,6 +213,12 @@ public class Pool<T> implements Cloneable {
return this.internalPool == null || this.internalPool.isClosed();
}
/**
* Add objects.
*
* @param count the count
* @throws Exception the exception
*/
public void addObjects(int count) throws Exception {
try {
for (int i = 0; i < count; i++) {

View File

@ -11,11 +11,14 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.NestedQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.nested.ParsedNested;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
@ -114,8 +117,6 @@ public class ScholixIndexManager {
}
private void incrementPidCounter(pidTypePrefix prefix, String value) {
switch (value.toLowerCase()){
case "doi": {
myCounter.increment(String.format("%s_doi", prefix));
@ -127,12 +128,46 @@ public class ScholixIndexManager {
}
default:
myCounter.increment(String.format("%s_other", prefix));
}
}
public List<Pair<String, Long>> linksByProvider(final String filterName) throws ScholixException {
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.matchAllQuery())
.withSearchType(SearchType.DEFAULT)
.withPageable(PageRequest.of(0,10))
.addAggregation(AggregationBuilders.nested("nested", "linkprovider")
.subAggregation(AggregationBuilders.terms("by_map").field("linkprovider.name").size(100).minDocCount(1)))
.build();
Pair<RestHighLevelClient, ElasticsearchRestTemplate> resource = connectionPool.getResource();
ElasticsearchRestTemplate client = resource.getValue();
final SearchHits<Scholix> hits = client.search(searchQuery, Scholix.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
final Aggregations aggregations = hits.getAggregations();
if(aggregations == null)
return null;
final Aggregation aggByMap = ((ParsedNested) aggregations.asMap().get("nested")).getAggregations().asMap().get("by_map");
return ((ParsedStringTerms) aggByMap).getBuckets()
.stream()
.map(b -> new ImmutablePair<>(b.getKeyAsString(), b.getDocCount()))
.collect(Collectors.toList());
}
/**
* Links from pid pair.
*
@ -206,13 +241,15 @@ public class ScholixIndexManager {
.withPageable(PageRequest.of(page,10))
.build();
RestHighLevelClient client = connectionPool.getResource();
ElasticsearchRestTemplate template = new ElasticsearchRestTemplate(client);
long tt = template.count(finalQuery, Scholix.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
SearchHits<Scholix> scholixRes = template.search(finalQuery, Scholix.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
Pair<RestHighLevelClient, ElasticsearchRestTemplate> resource = connectionPool.getResource();
ElasticsearchRestTemplate client = resource.getValue();
connectionPool.returnResource(client);
long tt = client.count(finalQuery, Scholix.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
SearchHits<Scholix> scholixRes = client.search(finalQuery, Scholix.class, IndexCoordinates.of(elasticSearchProperties.getIndexName()));
connectionPool.returnResource(resource);
return new ImmutablePair<>(tt,scholixRes.stream().map(SearchHit::getContent).collect(Collectors.toList()));
}