2012-09-06 09:52:49 +02:00
|
|
|
package org.gcube.data.streams.adapters;
|
|
|
|
|
|
|
|
import gr.uoa.di.madgik.grs.buffer.IBuffer.Status;
|
|
|
|
import gr.uoa.di.madgik.grs.reader.ForwardReader;
|
|
|
|
import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
|
2014-12-03 18:08:48 +01:00
|
|
|
import gr.uoa.di.madgik.grs.record.GRS2ExceptionWrapper;
|
2012-09-06 09:52:49 +02:00
|
|
|
import gr.uoa.di.madgik.grs.record.Record;
|
|
|
|
import gr.uoa.di.madgik.grs.record.exception.GRS2UncheckedException;
|
|
|
|
|
|
|
|
import java.net.URI;
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
import org.gcube.data.streams.LookAheadStream;
|
|
|
|
import org.gcube.data.streams.Stream;
|
|
|
|
import org.gcube.data.streams.exceptions.StreamException;
|
|
|
|
import org.gcube.data.streams.exceptions.StreamOpenException;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
* A {@link Stream} adapter for gRS2 resultsets.
|
|
|
|
* <p>
|
|
|
|
* This implementation is not thread safe.
|
|
|
|
*
|
|
|
|
* @author Fabio Simeoni
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
public class ResultsetStream<E extends Record> extends LookAheadStream<E> {
|
|
|
|
|
|
|
|
private static Logger log =LoggerFactory.getLogger(ResultsetStream.class);
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
public static final int default_timeout = 30;
|
|
|
|
public static final TimeUnit default_timeout_unit=TimeUnit.SECONDS;
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
private final URI locator;
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
private long timeout = default_timeout;
|
|
|
|
private TimeUnit timeoutUnit = default_timeout_unit;
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
private boolean open=false;
|
|
|
|
private boolean closed=false;
|
|
|
|
private RuntimeException lookAheadFailure;
|
|
|
|
|
2014-12-03 17:34:30 +01:00
|
|
|
private ForwardReader<E> reader;
|
2014-12-03 18:08:48 +01:00
|
|
|
|
2014-12-03 17:34:30 +01:00
|
|
|
private E record;
|
|
|
|
|
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
/**
|
|
|
|
* Creates a new instance with a result set locator.
|
|
|
|
* @param locator the locator.
|
|
|
|
* @throws IllegalArgumentException if the locator is <code>null</code>.
|
|
|
|
* */
|
|
|
|
public ResultsetStream(URI locator) throws IllegalArgumentException {
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
if (locator==null)
|
|
|
|
throw new IllegalArgumentException("invalid or null locator");
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
this.locator=locator;
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
}
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
public void setTimeout(long timeout, TimeUnit unit) throws IllegalArgumentException {
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
if (timeout<=0 || timeoutUnit==null)
|
|
|
|
throw new IllegalArgumentException("invalid timeout or null timeout unit");
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
this.timeout = timeout;
|
|
|
|
this.timeoutUnit = unit;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
protected E delegateNext() {
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
try {
|
|
|
|
if (lookAheadFailure!=null)
|
|
|
|
throw lookAheadFailure;
|
2014-12-03 18:08:48 +01:00
|
|
|
else if (record instanceof GRS2ExceptionWrapper){
|
2014-12-03 17:34:30 +01:00
|
|
|
//get underlying cause
|
2014-12-03 18:08:48 +01:00
|
|
|
Throwable cause = ((GRS2ExceptionWrapper)record).getEx().getCause();
|
2014-12-03 17:34:30 +01:00
|
|
|
|
|
|
|
//rewrap checked cause as appropriate to this layer
|
|
|
|
if (cause instanceof RuntimeException)
|
|
|
|
throw (RuntimeException) cause;
|
|
|
|
else
|
|
|
|
throw new StreamException(cause);
|
2014-12-03 18:08:48 +01:00
|
|
|
} else return record;
|
|
|
|
|
|
|
|
}finally {
|
2012-09-06 09:52:49 +02:00
|
|
|
lookAheadFailure=null;
|
|
|
|
}
|
|
|
|
}
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
@Override
|
|
|
|
protected boolean delegateHasNext() {
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
if (closed)
|
|
|
|
return false;
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
if (!open) {
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
try {
|
|
|
|
reader = new ForwardReader<E>(locator);
|
|
|
|
}
|
|
|
|
catch (Throwable t) {
|
|
|
|
lookAheadFailure= new StreamOpenException("cannot open resultset "+locator,t);
|
|
|
|
return true;
|
|
|
|
}
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
log.info("initialised resultset at "+locator);
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
open=true;
|
|
|
|
}
|
|
|
|
|
2014-12-03 17:34:30 +01:00
|
|
|
try {
|
|
|
|
record = reader.get(timeout, timeoutUnit );
|
|
|
|
} catch (GRS2ReaderException e) {
|
|
|
|
lookAheadFailure = new RuntimeException(e);
|
|
|
|
}
|
2014-12-03 18:08:48 +01:00
|
|
|
|
2014-12-03 17:34:30 +01:00
|
|
|
if (reader.getStatus()!=Status.Close && record == null) {
|
|
|
|
if (reader.getStatus()==Status.Open)
|
|
|
|
lookAheadFailure = new RuntimeException("Timeout occurred reading the resultSet");
|
|
|
|
else if (reader.getStatus()==Status.Dispose)
|
|
|
|
lookAheadFailure = new RuntimeException("ResultSet disposed");
|
|
|
|
return true;
|
|
|
|
} else return record!=null;
|
2014-12-03 18:08:48 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
}
|
2014-12-03 17:34:30 +01:00
|
|
|
|
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
@Override
|
|
|
|
public void close() {
|
|
|
|
|
|
|
|
if (open) {
|
|
|
|
try {
|
|
|
|
reader.close();
|
|
|
|
log.info("closed resultset at "+locator);
|
|
|
|
}
|
|
|
|
catch(GRS2ReaderException e) {
|
|
|
|
log.error("could not close resultset",e);
|
|
|
|
}
|
|
|
|
open=false;
|
|
|
|
}
|
|
|
|
closed=true;
|
|
|
|
}
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
@Override
|
|
|
|
public URI locator() throws IllegalStateException {
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
if (open)
|
|
|
|
throw new IllegalStateException("locator is invalid as result set has already been opened");
|
|
|
|
else
|
|
|
|
return locator;
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void remove() {
|
2014-12-03 17:34:30 +01:00
|
|
|
record = null;
|
2012-09-06 09:52:49 +02:00
|
|
|
}
|
2014-12-03 17:34:30 +01:00
|
|
|
|
2012-09-06 09:52:49 +02:00
|
|
|
@Override
|
|
|
|
public boolean isClosed() {
|
|
|
|
return closed;
|
|
|
|
}
|
|
|
|
}
|