added cnr-resultset-service to dnet-core-components

This commit is contained in:
Claudio Atzori 2019-06-05 17:54:43 +02:00
parent 103acc089e
commit e370a17984
11 changed files with 829 additions and 0 deletions

View File

@ -0,0 +1,61 @@
package eu.dnetlib.enabling.resultset.client;
import java.util.Iterator;
import eu.dnetlib.enabling.resultset.rmi.ResultSetService;
/**
*
* @author claudio
*
*/
public class IterableResultSetClient implements Iterable<String> {
/**
* reference to resultset service.
*/
private ResultSetService resultSet;
/**
* resultset id
*/
private String rsId;
/**
* page size.
*/
private int pageSize;
/**
* timeout
*/
private long timeout;
public IterableResultSetClient(ResultSetService resultSet, String rsId, int pageSize) {
this.resultSet = resultSet;
this.rsId = rsId;
this.pageSize = pageSize;
this.timeout = 0;
}
public IterableResultSetClient(ResultSetService resultSet, String rsId, int pageSize, long timeout) {
this(resultSet, rsId, pageSize);
this.timeout = timeout;
}
@Override
public Iterator<String> iterator() {
if (timeout == 0)
return new ResultSetClientIterator(resultSet, rsId, pageSize);
return new ResultSetClientIterator(resultSet, rsId, pageSize, timeout);
}
public int getPageSize() {
return pageSize;
}
public void setPageSize(int pageSize) {
this.pageSize = pageSize;
}
}

View File

@ -0,0 +1,44 @@
package eu.dnetlib.enabling.resultset.client;
import javax.xml.ws.wsaddressing.W3CEndpointReference;
/**
*
* @author claudio
*
*/
public interface ResultSetClient {
/**
*
* @param epr
* @return
*/
public IterableResultSetClient getClient(W3CEndpointReference epr);
/**
*
* @param epr
* @return
*/
public IterableResultSetClient getClient(String epr);
/**
*
* @param epr
* @param pageSize
* @return
*/
public IterableResultSetClient getClient(W3CEndpointReference epr, int pageSize);
/**
*
* @param epr
* @param pageSize
* @return
*/
public IterableResultSetClient getClient(String epr, int pageSize);
}

View File

@ -0,0 +1,217 @@
package eu.dnetlib.enabling.resultset.client;
import java.util.Map;
import javax.xml.ws.BindingProvider;
import javax.xml.ws.wsaddressing.W3CEndpointReference;
import eu.dnetlib.enabling.resultset.client.utils.EPRUtils;
import eu.dnetlib.enabling.resultset.rmi.ResultSetService;
import eu.dnetlib.enabling.tools.ServiceResolver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.cxf.endpoint.Client;
import org.apache.cxf.frontend.ClientProxy;
import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
import org.springframework.beans.factory.annotation.Required;
/**
*
* @author claudio
*
*/
public class ResultSetClientFactory implements ResultSetClient {
/**
* logger
*/
private static final Log log = LogFactory.getLog(ResultSetClientFactory.class);
private final static long DEFAULT_CONNECT_TIMEOUT = 10000;
private final static long DEFAULT_REQUEST_TIMEOUT = 60000;
private final static int DEFAULT_PAGE_SIZE = 100;
/**
* used to resolve the epr references to the service endpoint
*/
private ServiceResolver serviceResolver;
/**
* utility object
*/
private EPRUtils eprUtils;
/**
* actual page size
*/
private int pageSize;
/**
* request timeout
*/
private long timeout;
/**
* request timeout
*/
private long connectTimeout;
public ResultSetClientFactory() {
this(DEFAULT_PAGE_SIZE, DEFAULT_REQUEST_TIMEOUT, DEFAULT_CONNECT_TIMEOUT);
}
/**
*
* @param pageSize
* @param timeout
* @throws IllegalArgumentException
*/
public ResultSetClientFactory(int pageSize, long timeout) throws IllegalArgumentException {
this(pageSize, timeout, DEFAULT_CONNECT_TIMEOUT);
}
/**
*
* @param pageSize
* @param timeout
* time to wait for server response before throwing a timeout exception
* @param connectTimeout
* time to wait for server to accept the connection before throwing a connection timeout exception
* @throws IllegalArgumentException
*/
public ResultSetClientFactory(int pageSize, long timeout, long connectTimeout) throws IllegalArgumentException {
if (pageSize <= 0 || timeout <= 0 || connectTimeout <= 0) {
throw new IllegalArgumentException("parameters pageSize, timeout and connectTimeout must be greater than zero");
}
log.info(String.format("creating new ResultSetClientIterableFactory with pageSize (%s), read timeout (%s) and connect timeout (%s)",
pageSize, timeout, connectTimeout));
this.pageSize = pageSize;
this.timeout = timeout;
this.connectTimeout = connectTimeout;
}
/**
*
* @param epr
* @param pageSize
* @return
*/
@Override
public IterableResultSetClient getClient(W3CEndpointReference epr, int pageSize) {
final ResultSetService resultSet = getResultSetService(epr, getConnectTimeout(), getTimeout());
final String rsId = serviceResolver.getResourceIdentifier(epr);
//using given pageSize and default timeout
return new IterableResultSetClient(resultSet, rsId, pageSize, getTimeout());
}
/**
*
* @param epr
* @return
*/
@Override
public IterableResultSetClient getClient(W3CEndpointReference epr) {
final ResultSetService resultSet = getResultSetService(epr, getConnectTimeout(), getTimeout());
final String rsId = serviceResolver.getResourceIdentifier(epr);
//using default pageSize and timeouts
return new IterableResultSetClient(resultSet, rsId, getPageSize(), getTimeout());
}
private ResultSetService getResultSetService(final W3CEndpointReference epr, final long connectTimeout, final long requestTimeout) {
final ResultSetService service = serviceResolver.getService(ResultSetService.class, epr);
log.debug(String.format("creting resultSet service stub (%s) with connectTimeout(%s), requestTimeout(%s)", service.getClass().getName(), connectTimeout, requestTimeout));
if(service instanceof Client) {
log.debug(String.format("setting timeouts for %s", Client.class));
final Client client = ClientProxy.getClient(service);
final HTTPConduit http = (HTTPConduit) client.getConduit();
final HTTPClientPolicy httpClientPolicy = new HTTPClientPolicy();
httpClientPolicy.setConnectionTimeout(connectTimeout);
httpClientPolicy.setAllowChunking(false);
httpClientPolicy.setReceiveTimeout(requestTimeout);
http.setClient(httpClientPolicy);
} else if (service instanceof BindingProvider) {
log.debug(String.format("setting timeouts for %s", BindingProvider.class));
final Map<String, Object> requestContext = ((BindingProvider) service).getRequestContext();
// can't be sure about which will be used. Set them all.
requestContext.put("com.sun.xml.internal.ws.request.timeout", requestTimeout);
requestContext.put("com.sun.xml.internal.ws.connect.timeout", connectTimeout);
requestContext.put("com.sun.xml.ws.request.timeout", requestTimeout);
requestContext.put("com.sun.xml.ws.connect.timeout", connectTimeout);
requestContext.put("javax.xml.ws.client.receiveTimeout", requestTimeout);
requestContext.put("javax.xml.ws.client.connectionTimeout", connectTimeout);
}
return service;
}
/**
*
* @param stringEpr
* @param pageSize
* @return
*/
@Override
public IterableResultSetClient getClient(String stringEpr, int pageSize) {
return getClient(eprUtils.getEpr(stringEpr), pageSize);
}
/**
*
* @param stringEpr
* @return
*/
@Override
public IterableResultSetClient getClient(String stringEpr) {
return getClient(eprUtils.getEpr(stringEpr));
}
@Required
public void setServiceResolver(ServiceResolver serviceResolver) {
this.serviceResolver = serviceResolver;
}
@Required
public void setEprUtils(EPRUtils eprUtils) {
this.eprUtils = eprUtils;
}
public int getPageSize() {
return pageSize;
}
@Required
public void setPageSize(int pageSize) {
this.pageSize = pageSize;
}
public long getTimeout() {
return timeout;
}
@Required
public void setTimeout(long timeout) {
this.timeout = timeout;
}
public long getConnectTimeout() {
return connectTimeout;
}
public void setConnectTimeout(final long connectTimeout) {
this.connectTimeout = connectTimeout;
}
}

View File

@ -0,0 +1,94 @@
package eu.dnetlib.enabling.resultset.client;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.enabling.resultset.client.utils.ResultSetRuntimeException;
import eu.dnetlib.enabling.resultset.client.utils.ResultSetTimeoutException;
import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
import eu.dnetlib.enabling.resultset.rmi.ResultSetService;
/**
*
* @author claudio
*
*/
public class ResultSetClientIterator implements Iterator<String> {
/**
* logger
*/
private static final Log log = LogFactory.getLog(ResultSetClientIterator.class);
/**
* the page source
*/
private ResultSetPageProvider pageProvider;
/**
* buffer used to provide a single element for the next() method
*/
private Queue<String> buffer;
public ResultSetClientIterator(ResultSetService resultSet, String rsId)
throws ResultSetRuntimeException {
pageProvider = new ResultSetPageProvider(resultSet, rsId);
buffer = new LinkedList<String>();
}
public ResultSetClientIterator(ResultSetService resultSet, String rsId, int pageSize) {
this(resultSet, rsId);
pageProvider.setPageSize(pageSize);
}
public ResultSetClientIterator(ResultSetService resultSet, String rsId, int pageSize, long timeout) {
this(resultSet, rsId, pageSize);
pageProvider.setMaxWaitTime(timeout);
}
/**
* Tries to refill the buffer with a nextPage()
* @return true if the buffer was filled successfully, false otherwise
* @throws ResultSetTimeoutException in case of timeout
* @throws ResultSetException
* @throws ResultSetException
*/
private boolean refillBuffer() throws ResultSetTimeoutException, ResultSetRuntimeException {
List<String> page = pageProvider.nextPage();
if (page != null && !page.isEmpty()) {
buffer.addAll(page);
return true;
}
return false;
}
@Override
public boolean hasNext() {
if (!buffer.isEmpty())
return true;
return refillBuffer();
}
@Override
public String next() {
if (!hasNext()) {
log.info("NoSuchElementException");
throw new NoSuchElementException();
}
return buffer.poll();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,211 @@
package eu.dnetlib.enabling.resultset.client;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.enabling.resultset.client.utils.ResultSetRuntimeException;
import eu.dnetlib.enabling.resultset.client.utils.ResultSetTimeoutException;
import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
import eu.dnetlib.enabling.resultset.rmi.ResultSetService;
/**
*
* @author claudio
*
*/
public class ResultSetPageProvider {
/**
* logger
*/
private static final Log log = LogFactory.getLog(ResultSetPageProvider.class);
/**
* the resultset service
*/
private final ResultSetService resultSet;
/**
* the resultset id
*/
private final String rsId;
/**
* positional value of the getResult requests
*/
private int fromPosition;
/**
* positional value of the getResult requests
*/
private int toPosition;
/**
* actual page size
*/
private int pageSize;
/**
* default page size
*/
private static int DEFAULT_PAGE_SIZE = 10;
/**
* default max waiting time
*/
private static long DEFAULT_MAX_WAIT_TIME = 30000;
/**
* actual max waiting time
*/
private long maxWaitTime;
/**
* current wait time
*/
private long waitTime;
/**
* request counter used to calculate the waitTime
*/
private int delayCount;
/**
* resultset status
*/
private String RSStatus;
/**
* number of elements in the resultset
*/
private int numberOfElements;
private final static String RS_CLOSED = "closed";
private final static String RS_OPEN = "open";
/**
*
* @param resultSet
* @param rsId
* @throws ResultSetException
*/
public ResultSetPageProvider(final ResultSetService resultSet, final String rsId) throws ResultSetRuntimeException {
this.resultSet = resultSet;
this.rsId = rsId;
this.pageSize = DEFAULT_PAGE_SIZE;
this.maxWaitTime = DEFAULT_MAX_WAIT_TIME;
fromPosition = toPosition = 0;
delayCount = 0;
waitTime = 0;
updateResultSetStatus();
}
public ResultSetPageProvider(final ResultSetService resultSet, final String rsId, final int pageSize) throws ResultSetRuntimeException {
this(resultSet, rsId);
this.pageSize = pageSize;
}
/**
*
* @return
* @throws ResultSetTimeoutException
* @throws ResultSetException
*/
public List<String> nextPage() throws ResultSetTimeoutException, ResultSetRuntimeException {
do {
updateResultSetStatus();
int availableElements = numberOfElements - toPosition;
log.debug("availableElements: " + availableElements);
if (availableElements > 0) {
fromPosition = toPosition + 1;
if (availableElements < pageSize) {
toPosition = (fromPosition + availableElements) - 1;
} else {
toPosition = (fromPosition + pageSize) - 1;
delayCount = 0;
}
log.debug(" - getting result from " + fromPosition + " to " + toPosition + ", numberOfElements: " + numberOfElements + ", availableElements: "
+ availableElements);
try {
return resultSet.getResult(rsId, fromPosition, toPosition, "waiting");
} catch (ResultSetException e) {
log.info(e);
throw new NoSuchElementException(e.getMessage());
}
}
if (RSStatus.equals(RS_CLOSED) && (availableElements == 0)) return null;
else {
stopAndWait(++delayCount);
}
} while (true);
}
/**
*
* @param delayCount
* @throws ResultSetTimeoutException
*/
private void stopAndWait(final int delayCount) throws ResultSetTimeoutException {
try {
waitTime = (long) ((Math.pow(1.2, 10 + delayCount) * 10L)) + 200;
// waitTime = (long) Math.exp(delayCount);
if (waitTime > maxWaitTime) {
log.warn("Timeout getting elements from resultset: " + waitTime + ". next poll would wait more than " + maxWaitTime + " ms");
throw new ResultSetTimeoutException("Timeout getting elements from resultset: next poll would wait more than " + maxWaitTime + " ms");
}
log.debug("resultset client is going to sleep for: " + waitTime);
// System.out.println("resultset client is going to sleep for: " + waitTime);
Thread.sleep(waitTime);
} catch (InterruptedException e) {
log.error("resultSetClient got InterruptedException", e);
}
}
/**
* updates the
*
* @throws ResultSetException
*/
private void updateResultSetStatus() throws ResultSetRuntimeException {
try {
RSStatus = resultSet.getRSStatus(rsId);
numberOfElements = resultSet.getNumberOfElements(rsId);
// System.out.println("updateResultSetStatus: size is " + numberOfElements + " and status is " + RSStatus);
} catch (ResultSetException e) {
log.warn(e);
throw new ResultSetRuntimeException(e);
}
}
public int getPageSize() {
return pageSize;
}
public void setPageSize(final int pageSize) {
if (pageSize <= 0) throw new IllegalArgumentException("parameter 'pageSize' must be grater than zero");
this.pageSize = pageSize;
}
@Deprecated
public boolean hasElements() throws ResultSetRuntimeException {
updateResultSetStatus();
if (RSStatus.equals(RS_OPEN)) return true;
if (RSStatus.equals(RS_CLOSED) && (numberOfElements == 0)) return false;
return true;
}
public void setMaxWaitTime(final long maxWaitTime) {
if (maxWaitTime <= 0) throw new IllegalArgumentException("parameter 'maxWaitTime' must be grater than zero");
this.maxWaitTime = maxWaitTime;
}
}

View File

@ -0,0 +1,29 @@
package eu.dnetlib.enabling.resultset.client.utils;
import java.io.StringReader;
import javax.xml.transform.stream.StreamSource;
import javax.xml.ws.EndpointReference;
import javax.xml.ws.wsaddressing.W3CEndpointReference;
/**
*
* @author claudio
*
*/
public class EPRUtils {
/**
* builds an epr from its string representation
*
* @param epr
* String epr
* @return
* W3CEndpointReference epr
*
*/
public W3CEndpointReference getEpr(String epr) {
return (W3CEndpointReference) EndpointReference.readFrom(new StreamSource(new StringReader(epr)));
}
}

View File

@ -0,0 +1,18 @@
package eu.dnetlib.enabling.resultset.client.utils;
public class ResultSetRuntimeException extends RuntimeException {
/**
*
*/
private static final long serialVersionUID = -5131499590327995897L;
public ResultSetRuntimeException(String message) {
super(message);
}
public ResultSetRuntimeException(Throwable e) {
super(e);
}
}

View File

@ -0,0 +1,11 @@
package eu.dnetlib.enabling.resultset.client.utils;
public class ResultSetTimeoutException extends RuntimeException {
private static final long serialVersionUID = -3713991101055085620L;
public ResultSetTimeoutException(String message) {
super(message);
}
}

View File

@ -0,0 +1,3 @@
services.is.resultset.client.timeout=60000
services.is.resultset.client.connecttimeout=10000
services.is.resultset.client.pagesize=100

View File

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jaxws="http://cxf.apache.org/jaxws"
xmlns:sec="http://cxf.apache.org/configuration/security" xmlns:wsa="http://cxf.apache.org/ws/addressing"
xmlns:p="http://www.springframework.org/schema/p" xmlns:http="http://cxf.apache.org/transports/http/configuration"
xmlns:t="http://dnetlib.eu/springbeans/t" xmlns:template="http://dnetlib.eu/springbeans/template"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://cxf.apache.org/ws/addressing http://cxf.apache.org/schemas/ws-addr-conf.xsd
http://cxf.apache.org/configuration/security http://cxf.apache.org/schemas/configuration/security.xsd
http://cxf.apache.org/transports/http/configuration http://cxf.apache.org/schemas/configuration/http-conf.xsd
http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
http://dnetlib.eu/springbeans/template http://dnetlib.eu/springbeans/template.xsd">
<!-- beans -->
<bean id="resultSetClientFactory"
class="eu.dnetlib.enabling.resultset.client.ResultSetClientFactory"
p:serviceResolver-ref="serviceResolver" p:eprUtils-ref="eprUtils"
p:pageSize="${services.is.resultset.client.pagesize}"
p:timeout="${services.is.resultset.client.timeout}"
p:connectTimeout="${services.is.resultset.client.connecttimeout}"/>
<bean id="eprUtils" class="eu.dnetlib.enabling.resultset.client.utils.EPRUtils" />
</beans>

View File

@ -0,0 +1,118 @@
package eu.dnetlib.enabling.resultset;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import com.google.common.collect.Lists;
import eu.dnetlib.enabling.resultset.client.ResultSetPageProvider;
import eu.dnetlib.enabling.resultset.rmi.ResultSetService;
@RunWith(MockitoJUnitRunner.class)
public class ResultSetPageProviderTest {
private static final String RSID = "RS_123";
private static final int PAGE_SIZE = 10;
private static final String RSSTATUS = "closed";
// Class Under test
private ResultSetPageProvider pageProvider;
@Mock
private ResultSetService mockResultSet;
private class ResultAnswer implements Answer<List<String>> {
private int size;
public ResultAnswer(int size) {
this.size = size;
}
@Override
public List<String> answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
int from = Integer.parseInt(args[1].toString());
int to = Integer.parseInt(args[2].toString());
if (to > size) to = size;
if (from > to) throw new Exception("FROM IS GREATER THAN TO");
List<String> list = Lists.newArrayList();
for (int i = from; i<=to; i++) {
list.add("RECORD " + i);
}
return list;
}
}
@Before
public void setUp() throws Exception {
pageProvider = new ResultSetPageProvider(mockResultSet, RSID);
pageProvider.setMaxWaitTime(60000);
pageProvider.setPageSize(PAGE_SIZE);
}
@Test
public void testNextPage5() throws Exception {
performTest(5);
}
@Test
public void testNextPage10() throws Exception {
performTest(10);
}
@Test
public void testNextPage15() throws Exception {
performTest(15);
}
@Test
public void testNextPage20() throws Exception {
performTest(20);
}
@Test
public void testNextPage250() throws Exception {
performTest(250);
}
@Test
public void testNextPage254() throws Exception {
performTest(254);
}
public void performTest(int size) throws Exception {
when(mockResultSet.getRSStatus(RSID)).thenReturn(RSSTATUS);
when(mockResultSet.getNumberOfElements(RSID)).thenReturn(size);
when(mockResultSet.getResult(anyString(), anyInt(), anyInt(), anyString())).thenAnswer(new ResultAnswer(size));
List<String> list = Lists.newArrayList();
while (true) {
List<String> res = pageProvider.nextPage();
if (res == null || res.isEmpty()) break;
list.addAll(res);
}
assertEquals(list.size(), size);
verify(mockResultSet, times(((size - 1) / PAGE_SIZE) + 1)).getResult(anyString(), anyInt(), anyInt(), anyString());
}
}