diff --git a/pom.xml b/pom.xml index 259a8df..c6b36f9 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ org.gcube.data.access streams - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT Stream Library diff --git a/src/main/java/org/gcube/data/streams/LookAheadStream.java b/src/main/java/org/gcube/data/streams/LookAheadStream.java index a1fc3ea..5a11144 100644 --- a/src/main/java/org/gcube/data/streams/LookAheadStream.java +++ b/src/main/java/org/gcube/data/streams/LookAheadStream.java @@ -67,7 +67,7 @@ public abstract class LookAheadStream implements Stream { } private boolean lookAhead() { - + if (!delegateHasNext()) return false; diff --git a/src/main/java/org/gcube/data/streams/adapters/ResultsetStream.java b/src/main/java/org/gcube/data/streams/adapters/ResultsetStream.java index a68f3b4..8bdd87b 100644 --- a/src/main/java/org/gcube/data/streams/adapters/ResultsetStream.java +++ b/src/main/java/org/gcube/data/streams/adapters/ResultsetStream.java @@ -7,7 +7,6 @@ import gr.uoa.di.madgik.grs.record.Record; import gr.uoa.di.madgik.grs.record.exception.GRS2UncheckedException; import java.net.URI; -import java.util.Iterator; import java.util.concurrent.TimeUnit; import org.gcube.data.streams.LookAheadStream; @@ -29,108 +28,113 @@ import org.slf4j.LoggerFactory; public class ResultsetStream extends LookAheadStream { private static Logger log =LoggerFactory.getLogger(ResultsetStream.class); - + public static final int default_timeout = 30; public static final TimeUnit default_timeout_unit=TimeUnit.SECONDS; - + private final URI locator; - + private long timeout = default_timeout; private TimeUnit timeoutUnit = default_timeout_unit; - + private boolean open=false; private boolean closed=false; private RuntimeException lookAheadFailure; - - private ForwardReader reader; - private Iterator iterator; + private ForwardReader reader; + private E record; + + /** * Creates a new instance with a result set locator. * @param locator the locator. * @throws IllegalArgumentException if the locator is null. * */ public ResultsetStream(URI locator) throws IllegalArgumentException { - + if (locator==null) throw new IllegalArgumentException("invalid or null locator"); - + this.locator=locator; - + } - + public void setTimeout(long timeout, TimeUnit unit) throws IllegalArgumentException { - + if (timeout<=0 || timeoutUnit==null) throw new IllegalArgumentException("invalid timeout or null timeout unit"); - + this.timeout = timeout; this.timeoutUnit = unit; } @Override protected E delegateNext() { - + try { if (lookAheadFailure!=null) throw lookAheadFailure; else try { - return iterator.next(); - } - catch(GRS2UncheckedException e) { - - //get underlying cause - Throwable cause = e.getCause(); - - //rewrap checked cause as appropriate to this layer - if (cause instanceof RuntimeException) - throw (RuntimeException) cause; - else - throw new StreamException(cause); + return record; } + catch(GRS2UncheckedException e) { + + //get underlying cause + Throwable cause = e.getCause(); + + //rewrap checked cause as appropriate to this layer + if (cause instanceof RuntimeException) + throw (RuntimeException) cause; + else + throw new StreamException(cause); + } } finally { lookAheadFailure=null; } } - + @Override protected boolean delegateHasNext() { - + if (closed) return false; - + if (!open) { - + try { reader = new ForwardReader(locator); - reader.setIteratorTimeout(timeout); - reader.setIteratorTimeUnit(timeoutUnit); } catch (Throwable t) { lookAheadFailure= new StreamOpenException("cannot open resultset "+locator,t); return true; } - - iterator = reader.iterator(); - + log.info("initialised resultset at "+locator); - + open=true; } - - //infer outage if reader has been dismissed remotely - if (reader.getStatus()==Status.Dispose && !closed) - lookAheadFailure= new RuntimeException("unrecoverable failure in resultset "); - boolean hasNext = iterator.hasNext(); + try { + record = reader.get(timeout, timeoutUnit ); + } catch (GRS2ReaderException e) { + lookAheadFailure = new RuntimeException(e); + lookAheadFailure.printStackTrace(); + } + + if (reader.getStatus()!=Status.Close && record == null) { + if (reader.getStatus()==Status.Open) + lookAheadFailure = new RuntimeException("Timeout occurred reading the resultSet"); + else if (reader.getStatus()==Status.Dispose) + lookAheadFailure = new RuntimeException("ResultSet disposed"); + return true; + } else return record!=null; - return hasNext; } - - + + @Override public void close() { @@ -146,22 +150,22 @@ public class ResultsetStream extends LookAheadStream { } closed=true; } - + @Override public URI locator() throws IllegalStateException { - + if (open) throw new IllegalStateException("locator is invalid as result set has already been opened"); else return locator; - + } @Override public void remove() { - iterator.remove(); + record = null; } - + @Override public boolean isClosed() { return closed; diff --git a/src/main/java/org/gcube/data/streams/dsl/Streams.java b/src/main/java/org/gcube/data/streams/dsl/Streams.java index b81d1bb..7718731 100644 --- a/src/main/java/org/gcube/data/streams/dsl/Streams.java +++ b/src/main/java/org/gcube/data/streams/dsl/Streams.java @@ -1,10 +1,12 @@ package org.gcube.data.streams.dsl; -import static java.util.Arrays.*; +import static java.util.Arrays.asList; +import gr.uoa.di.madgik.grs.record.Record; import java.net.URI; import java.util.Iterator; import java.util.List; +import java.util.concurrent.TimeUnit; import org.gcube.data.streams.Stream; import org.gcube.data.streams.adapters.IteratorAdapter; @@ -28,8 +30,6 @@ import org.gcube.data.streams.handlers.StopFastHandler; import org.gcube.data.streams.publishers.RsStringRecordFactory; import org.gcube.data.streams.test.FallibleIterator; -import gr.uoa.di.madgik.grs.record.Record; - /** * * The definitions of an eDSL of stream and stream-related expressions. @@ -127,9 +127,17 @@ public class Streams { * @return the stream */ public static Stream stringsIn(URI locator) { - return convert(locator).ofStrings().withDefaults(); + return convert(locator).ofStrings().withTimeout(5, TimeUnit.MINUTES); } + /** + * Returns a {@link Stream} of strings extracted from a resultset of {@link RsStringRecordFactory#STRING_RECORD}s. + * @param locator the locator of the resultset + * @return the stream + */ + public static Stream stringsIn(URI locator, int timeout, TimeUnit timeUnit) { + return convert(locator).ofStrings().withTimeout(timeout, timeUnit); + } // PIPE diff --git a/src/main/java/org/gcube/data/streams/publishers/RsPublisher.java b/src/main/java/org/gcube/data/streams/publishers/RsPublisher.java index 36e7260..6d4623b 100644 --- a/src/main/java/org/gcube/data/streams/publishers/RsPublisher.java +++ b/src/main/java/org/gcube/data/streams/publishers/RsPublisher.java @@ -281,11 +281,15 @@ public class RsPublisher implements StreamPublisher { //private helper: publish a record private void publish(RecordWriter writer, Record record) throws GRS2WriterException { - if (writer.getStatus() == Status.Open) + if (writer.getStatus() == Status.Open){ if (!writer.put(record, timeout, timeoutUnit)) { log.trace("client is not consuming resulset, stop publishing"); throw new GRS2WriterException(); } + } else{ + log.warn("Writer not open, actual status is {}",writer.getStatus()); + throw new GRS2WriterException("writer closed or disposed"); + } }