This commit is contained in:
Lucio Lelii 2014-12-03 16:34:30 +00:00
parent 44044943e6
commit f650e1f7da
5 changed files with 73 additions and 57 deletions

View File

@ -9,7 +9,7 @@
<groupId>org.gcube.data.access</groupId> <groupId>org.gcube.data.access</groupId>
<artifactId>streams</artifactId> <artifactId>streams</artifactId>
<version>2.0.1-SNAPSHOT</version> <version>2.0.2-SNAPSHOT</version>
<name>Stream Library</name> <name>Stream Library</name>

View File

@ -67,7 +67,7 @@ public abstract class LookAheadStream<E> implements Stream<E> {
} }
private boolean lookAhead() { private boolean lookAhead() {
if (!delegateHasNext()) if (!delegateHasNext())
return false; return false;

View File

@ -7,7 +7,6 @@ import gr.uoa.di.madgik.grs.record.Record;
import gr.uoa.di.madgik.grs.record.exception.GRS2UncheckedException; import gr.uoa.di.madgik.grs.record.exception.GRS2UncheckedException;
import java.net.URI; import java.net.URI;
import java.util.Iterator;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.gcube.data.streams.LookAheadStream; import org.gcube.data.streams.LookAheadStream;
@ -29,108 +28,113 @@ import org.slf4j.LoggerFactory;
public class ResultsetStream<E extends Record> extends LookAheadStream<E> { public class ResultsetStream<E extends Record> extends LookAheadStream<E> {
private static Logger log =LoggerFactory.getLogger(ResultsetStream.class); private static Logger log =LoggerFactory.getLogger(ResultsetStream.class);
public static final int default_timeout = 30; public static final int default_timeout = 30;
public static final TimeUnit default_timeout_unit=TimeUnit.SECONDS; public static final TimeUnit default_timeout_unit=TimeUnit.SECONDS;
private final URI locator; private final URI locator;
private long timeout = default_timeout; private long timeout = default_timeout;
private TimeUnit timeoutUnit = default_timeout_unit; private TimeUnit timeoutUnit = default_timeout_unit;
private boolean open=false; private boolean open=false;
private boolean closed=false; private boolean closed=false;
private RuntimeException lookAheadFailure; private RuntimeException lookAheadFailure;
private ForwardReader<E> reader;
private Iterator<E> iterator;
private ForwardReader<E> reader;
private E record;
/** /**
* Creates a new instance with a result set locator. * Creates a new instance with a result set locator.
* @param locator the locator. * @param locator the locator.
* @throws IllegalArgumentException if the locator is <code>null</code>. * @throws IllegalArgumentException if the locator is <code>null</code>.
* */ * */
public ResultsetStream(URI locator) throws IllegalArgumentException { public ResultsetStream(URI locator) throws IllegalArgumentException {
if (locator==null) if (locator==null)
throw new IllegalArgumentException("invalid or null locator"); throw new IllegalArgumentException("invalid or null locator");
this.locator=locator; this.locator=locator;
} }
public void setTimeout(long timeout, TimeUnit unit) throws IllegalArgumentException { public void setTimeout(long timeout, TimeUnit unit) throws IllegalArgumentException {
if (timeout<=0 || timeoutUnit==null) if (timeout<=0 || timeoutUnit==null)
throw new IllegalArgumentException("invalid timeout or null timeout unit"); throw new IllegalArgumentException("invalid timeout or null timeout unit");
this.timeout = timeout; this.timeout = timeout;
this.timeoutUnit = unit; this.timeoutUnit = unit;
} }
@Override @Override
protected E delegateNext() { protected E delegateNext() {
try { try {
if (lookAheadFailure!=null) if (lookAheadFailure!=null)
throw lookAheadFailure; throw lookAheadFailure;
else else
try { try {
return iterator.next(); return record;
}
catch(GRS2UncheckedException e) {
//get underlying cause
Throwable cause = e.getCause();
//rewrap checked cause as appropriate to this layer
if (cause instanceof RuntimeException)
throw (RuntimeException) cause;
else
throw new StreamException(cause);
} }
catch(GRS2UncheckedException e) {
//get underlying cause
Throwable cause = e.getCause();
//rewrap checked cause as appropriate to this layer
if (cause instanceof RuntimeException)
throw (RuntimeException) cause;
else
throw new StreamException(cause);
}
} }
finally { finally {
lookAheadFailure=null; lookAheadFailure=null;
} }
} }
@Override @Override
protected boolean delegateHasNext() { protected boolean delegateHasNext() {
if (closed) if (closed)
return false; return false;
if (!open) { if (!open) {
try { try {
reader = new ForwardReader<E>(locator); reader = new ForwardReader<E>(locator);
reader.setIteratorTimeout(timeout);
reader.setIteratorTimeUnit(timeoutUnit);
} }
catch (Throwable t) { catch (Throwable t) {
lookAheadFailure= new StreamOpenException("cannot open resultset "+locator,t); lookAheadFailure= new StreamOpenException("cannot open resultset "+locator,t);
return true; return true;
} }
iterator = reader.iterator();
log.info("initialised resultset at "+locator); log.info("initialised resultset at "+locator);
open=true; open=true;
} }
//infer outage if reader has been dismissed remotely
if (reader.getStatus()==Status.Dispose && !closed)
lookAheadFailure= new RuntimeException("unrecoverable failure in resultset ");
boolean hasNext = iterator.hasNext(); try {
record = reader.get(timeout, timeoutUnit );
} catch (GRS2ReaderException e) {
lookAheadFailure = new RuntimeException(e);
lookAheadFailure.printStackTrace();
}
if (reader.getStatus()!=Status.Close && record == null) {
if (reader.getStatus()==Status.Open)
lookAheadFailure = new RuntimeException("Timeout occurred reading the resultSet");
else if (reader.getStatus()==Status.Dispose)
lookAheadFailure = new RuntimeException("ResultSet disposed");
return true;
} else return record!=null;
return hasNext;
} }
@Override @Override
public void close() { public void close() {
@ -146,22 +150,22 @@ public class ResultsetStream<E extends Record> extends LookAheadStream<E> {
} }
closed=true; closed=true;
} }
@Override @Override
public URI locator() throws IllegalStateException { public URI locator() throws IllegalStateException {
if (open) if (open)
throw new IllegalStateException("locator is invalid as result set has already been opened"); throw new IllegalStateException("locator is invalid as result set has already been opened");
else else
return locator; return locator;
} }
@Override @Override
public void remove() { public void remove() {
iterator.remove(); record = null;
} }
@Override @Override
public boolean isClosed() { public boolean isClosed() {
return closed; return closed;

View File

@ -1,10 +1,12 @@
package org.gcube.data.streams.dsl; package org.gcube.data.streams.dsl;
import static java.util.Arrays.*; import static java.util.Arrays.asList;
import gr.uoa.di.madgik.grs.record.Record;
import java.net.URI; import java.net.URI;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import org.gcube.data.streams.Stream; import org.gcube.data.streams.Stream;
import org.gcube.data.streams.adapters.IteratorAdapter; import org.gcube.data.streams.adapters.IteratorAdapter;
@ -28,8 +30,6 @@ import org.gcube.data.streams.handlers.StopFastHandler;
import org.gcube.data.streams.publishers.RsStringRecordFactory; import org.gcube.data.streams.publishers.RsStringRecordFactory;
import org.gcube.data.streams.test.FallibleIterator; import org.gcube.data.streams.test.FallibleIterator;
import gr.uoa.di.madgik.grs.record.Record;
/** /**
* *
* The definitions of an eDSL of stream and stream-related expressions. * The definitions of an eDSL of stream and stream-related expressions.
@ -127,9 +127,17 @@ public class Streams {
* @return the stream * @return the stream
*/ */
public static Stream<String> stringsIn(URI locator) { public static Stream<String> stringsIn(URI locator) {
return convert(locator).ofStrings().withDefaults(); return convert(locator).ofStrings().withTimeout(5, TimeUnit.MINUTES);
} }
/**
* Returns a {@link Stream} of strings extracted from a resultset of {@link RsStringRecordFactory#STRING_RECORD}s.
* @param locator the locator of the resultset
* @return the stream
*/
public static Stream<String> stringsIn(URI locator, int timeout, TimeUnit timeUnit) {
return convert(locator).ofStrings().withTimeout(timeout, timeUnit);
}
// PIPE // PIPE

View File

@ -281,11 +281,15 @@ public class RsPublisher<E> implements StreamPublisher {
//private helper: publish a record //private helper: publish a record
private void publish(RecordWriter<Record> writer, Record record) throws GRS2WriterException { private void publish(RecordWriter<Record> writer, Record record) throws GRS2WriterException {
if (writer.getStatus() == Status.Open) if (writer.getStatus() == Status.Open){
if (!writer.put(record, timeout, timeoutUnit)) { if (!writer.put(record, timeout, timeoutUnit)) {
log.trace("client is not consuming resulset, stop publishing"); log.trace("client is not consuming resulset, stop publishing");
throw new GRS2WriterException(); throw new GRS2WriterException();
} }
} else{
log.warn("Writer not open, actual status is {}",writer.getStatus());
throw new GRS2WriterException("writer closed or disposed");
}
} }