From 83641cc4786c60ed8d8d27a7cf0845662279cbde Mon Sep 17 00:00:00 2001 From: "fabio.simeoni" Date: Thu, 6 Sep 2012 07:52:49 +0000 Subject: [PATCH] branched for releases 2.0.x (first created for gCube 2.10.0) git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/branches/data-access/streams/2.0@57745 82a268e6-3cf1-43bd-a215-b396298e98cf --- .classpath | 10 + .project | 23 ++ distro/INSTALL | 1 + distro/LICENSE | 6 + distro/MAINTAINERS | 1 + distro/README | 38 +++ distro/changelog.xml | 30 ++ distro/descriptor.xml | 42 +++ distro/profile.xml | 26 ++ distro/svnpath.txt | 1 + pom.xml | 120 +++++++ .../java/org/gcube/data/streams/Callback.java | 23 ++ .../org/gcube/data/streams/Iteration.java | 30 ++ .../gcube/data/streams/LookAheadStream.java | 141 ++++++++ .../java/org/gcube/data/streams/Stream.java | 191 +++++++++++ .../gcube/data/streams/StreamConsumer.java | 62 ++++ .../java/org/gcube/data/streams/Utils.java | 63 ++++ .../streams/adapters/IteratorAdapter.java | 59 ++++ .../data/streams/adapters/IteratorStream.java | 102 ++++++ .../streams/adapters/ResultsetStream.java | 169 ++++++++++ .../delegates/AbstractDelegateStream.java | 58 ++++ .../data/streams/delegates/FoldedStream.java | 56 ++++ .../data/streams/delegates/GuardedStream.java | 42 +++ .../streams/delegates/MonitoredStream.java | 61 ++++ .../data/streams/delegates/PipedStream.java | 43 +++ .../streams/delegates/StreamListener.java | 27 ++ .../delegates/StreamListenerAdapter.java | 28 ++ .../streams/delegates/UnfoldedStream.java | 110 ++++++ .../org/gcube/data/streams/dsl/Faults.java | 132 ++++++++ .../gcube/data/streams/dsl/StreamClause.java | 30 ++ .../data/streams/dsl/StreamClauseEnv.java | 28 ++ .../org/gcube/data/streams/dsl/Streams.java | 253 ++++++++++++++ .../dsl/consume/ConsumeWithClause.java | 35 ++ .../gcube/data/streams/dsl/fold/InClause.java | 34 ++ .../gcube/data/streams/dsl/from/RsClause.java | 24 ++ .../gcube/data/streams/dsl/from/RsEnv.java | 40 +++ 
.../data/streams/dsl/from/RsOfClause.java | 43 +++ .../streams/dsl/from/RsStringWithClause.java | 61 ++++ .../data/streams/dsl/from/RsWithClause.java | 68 ++++ .../streams/dsl/guard/GuardWithClause.java | 35 ++ .../streams/dsl/listen/MonitorWithClause.java | 36 ++ .../streams/dsl/pipe/PipeThroughClause.java | 36 ++ .../streams/dsl/publish/PublishRsEnv.java | 40 +++ .../dsl/publish/PublishRsUsingClause.java | 49 +++ .../dsl/publish/PublishRsWithClause.java | 106 ++++++ .../dsl/unfold/UnfoldThroughClause.java | 36 ++ .../streams/exceptions/StreamContingency.java | 21 ++ .../streams/exceptions/StreamException.java | 43 +++ .../exceptions/StreamOpenException.java | 36 ++ .../exceptions/StreamPublishException.java | 36 ++ .../streams/exceptions/StreamSkipSignal.java | 28 ++ .../streams/exceptions/StreamStopSignal.java | 18 + .../gcube/data/streams/generators/Filter.java | 22 ++ .../data/streams/generators/Generator.java | 35 ++ .../streams/generators/LoggingListener.java | 59 ++++ .../streams/generators/NoOpGenerator.java | 17 + .../data/streams/generators/Processor.java | 31 ++ .../streams/handlers/CountingHandler.java | 32 ++ .../data/streams/handlers/FaultHandler.java | 24 ++ .../data/streams/handlers/IgnoreHandler.java | 14 + .../data/streams/handlers/RethrowHandler.java | 17 + .../handlers/RethrowUnrecoverableHandler.java | 22 ++ .../streams/handlers/StopFastHandler.java | 17 + .../handlers/StopUnrecoverableHandler.java | 22 ++ .../streams/publishers/RecordFactory.java | 34 ++ .../data/streams/publishers/RsPublisher.java | 312 ++++++++++++++++++ .../publishers/RsStringRecordFactory.java | 51 +++ .../data/streams/publishers/RsTransport.java | 29 ++ .../streams/publishers/StreamPublisher.java | 21 ++ .../streams/publishers/ThreadProvider.java | 18 + .../data/streams/test/FallibleIterator.java | 76 +++++ .../data/streams/test/StreamProvider.java | 18 + .../org/gcube/data/streams/test/Utils.java | 141 ++++++++ .../org/gcube/data/streams/CallbackTest.java | 112 +++++++ 
.../gcube/data/streams/FoldedStreamTest.java | 54 +++ .../gcube/data/streams/GuardedStreamTest.java | 149 +++++++++ .../data/streams/IteratorStreamTest.java | 50 +++ .../data/streams/MonitoredStreamTest.java | 66 ++++ .../gcube/data/streams/PipedStreamTest.java | 142 ++++++++ .../org/gcube/data/streams/PublishTest.java | 181 ++++++++++ .../org/gcube/data/streams/RsStreamTest.java | 87 +++++ .../gcube/data/streams/TestContingency.java | 8 + .../org/gcube/data/streams/TestUtils.java | 35 ++ .../data/streams/UnfoldedStreamTest.java | 84 +++++ 84 files changed, 4811 insertions(+) create mode 100644 .classpath create mode 100644 .project create mode 100644 distro/INSTALL create mode 100644 distro/LICENSE create mode 100644 distro/MAINTAINERS create mode 100644 distro/README create mode 100644 distro/changelog.xml create mode 100644 distro/descriptor.xml create mode 100644 distro/profile.xml create mode 100644 distro/svnpath.txt create mode 100644 pom.xml create mode 100644 src/main/java/org/gcube/data/streams/Callback.java create mode 100644 src/main/java/org/gcube/data/streams/Iteration.java create mode 100644 src/main/java/org/gcube/data/streams/LookAheadStream.java create mode 100644 src/main/java/org/gcube/data/streams/Stream.java create mode 100644 src/main/java/org/gcube/data/streams/StreamConsumer.java create mode 100644 src/main/java/org/gcube/data/streams/Utils.java create mode 100644 src/main/java/org/gcube/data/streams/adapters/IteratorAdapter.java create mode 100644 src/main/java/org/gcube/data/streams/adapters/IteratorStream.java create mode 100644 src/main/java/org/gcube/data/streams/adapters/ResultsetStream.java create mode 100644 src/main/java/org/gcube/data/streams/delegates/AbstractDelegateStream.java create mode 100644 src/main/java/org/gcube/data/streams/delegates/FoldedStream.java create mode 100644 src/main/java/org/gcube/data/streams/delegates/GuardedStream.java create mode 100644 src/main/java/org/gcube/data/streams/delegates/MonitoredStream.java 
create mode 100644 src/main/java/org/gcube/data/streams/delegates/PipedStream.java create mode 100644 src/main/java/org/gcube/data/streams/delegates/StreamListener.java create mode 100644 src/main/java/org/gcube/data/streams/delegates/StreamListenerAdapter.java create mode 100644 src/main/java/org/gcube/data/streams/delegates/UnfoldedStream.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/Faults.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/StreamClause.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/StreamClauseEnv.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/Streams.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/consume/ConsumeWithClause.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/fold/InClause.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/from/RsClause.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/from/RsEnv.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/from/RsOfClause.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/from/RsStringWithClause.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/from/RsWithClause.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/guard/GuardWithClause.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/listen/MonitorWithClause.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/pipe/PipeThroughClause.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/publish/PublishRsEnv.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/publish/PublishRsUsingClause.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/publish/PublishRsWithClause.java create mode 100644 src/main/java/org/gcube/data/streams/dsl/unfold/UnfoldThroughClause.java create mode 100644 src/main/java/org/gcube/data/streams/exceptions/StreamContingency.java create mode 100644 
src/main/java/org/gcube/data/streams/exceptions/StreamException.java create mode 100644 src/main/java/org/gcube/data/streams/exceptions/StreamOpenException.java create mode 100644 src/main/java/org/gcube/data/streams/exceptions/StreamPublishException.java create mode 100644 src/main/java/org/gcube/data/streams/exceptions/StreamSkipSignal.java create mode 100644 src/main/java/org/gcube/data/streams/exceptions/StreamStopSignal.java create mode 100644 src/main/java/org/gcube/data/streams/generators/Filter.java create mode 100644 src/main/java/org/gcube/data/streams/generators/Generator.java create mode 100644 src/main/java/org/gcube/data/streams/generators/LoggingListener.java create mode 100644 src/main/java/org/gcube/data/streams/generators/NoOpGenerator.java create mode 100644 src/main/java/org/gcube/data/streams/generators/Processor.java create mode 100644 src/main/java/org/gcube/data/streams/handlers/CountingHandler.java create mode 100644 src/main/java/org/gcube/data/streams/handlers/FaultHandler.java create mode 100644 src/main/java/org/gcube/data/streams/handlers/IgnoreHandler.java create mode 100644 src/main/java/org/gcube/data/streams/handlers/RethrowHandler.java create mode 100644 src/main/java/org/gcube/data/streams/handlers/RethrowUnrecoverableHandler.java create mode 100644 src/main/java/org/gcube/data/streams/handlers/StopFastHandler.java create mode 100644 src/main/java/org/gcube/data/streams/handlers/StopUnrecoverableHandler.java create mode 100644 src/main/java/org/gcube/data/streams/publishers/RecordFactory.java create mode 100644 src/main/java/org/gcube/data/streams/publishers/RsPublisher.java create mode 100644 src/main/java/org/gcube/data/streams/publishers/RsStringRecordFactory.java create mode 100644 src/main/java/org/gcube/data/streams/publishers/RsTransport.java create mode 100644 src/main/java/org/gcube/data/streams/publishers/StreamPublisher.java create mode 100644 src/main/java/org/gcube/data/streams/publishers/ThreadProvider.java create 
mode 100644 src/main/java/org/gcube/data/streams/test/FallibleIterator.java create mode 100644 src/main/java/org/gcube/data/streams/test/StreamProvider.java create mode 100644 src/main/java/org/gcube/data/streams/test/Utils.java create mode 100644 src/test/java/org/gcube/data/streams/CallbackTest.java create mode 100644 src/test/java/org/gcube/data/streams/FoldedStreamTest.java create mode 100644 src/test/java/org/gcube/data/streams/GuardedStreamTest.java create mode 100644 src/test/java/org/gcube/data/streams/IteratorStreamTest.java create mode 100644 src/test/java/org/gcube/data/streams/MonitoredStreamTest.java create mode 100644 src/test/java/org/gcube/data/streams/PipedStreamTest.java create mode 100644 src/test/java/org/gcube/data/streams/PublishTest.java create mode 100644 src/test/java/org/gcube/data/streams/RsStreamTest.java create mode 100644 src/test/java/org/gcube/data/streams/TestContingency.java create mode 100644 src/test/java/org/gcube/data/streams/TestUtils.java create mode 100644 src/test/java/org/gcube/data/streams/UnfoldedStreamTest.java diff --git a/.classpath b/.classpath new file mode 100644 index 0000000..0f53f3e --- /dev/null +++ b/.classpath @@ -0,0 +1,10 @@ + + + + + + + + + + diff --git a/.project b/.project new file mode 100644 index 0000000..688aec7 --- /dev/null +++ b/.project @@ -0,0 +1,23 @@ + + + streams + + + + + + org.eclipse.jdt.core.javabuilder + + + + + org.eclipse.m2e.core.maven2Builder + + + + + + org.eclipse.jdt.core.javanature + org.eclipse.m2e.core.maven2Nature + + diff --git a/distro/INSTALL b/distro/INSTALL new file mode 100644 index 0000000..8d1c8b6 --- /dev/null +++ b/distro/INSTALL @@ -0,0 +1 @@ + diff --git a/distro/LICENSE b/distro/LICENSE new file mode 100644 index 0000000..630ba97 --- /dev/null +++ b/distro/LICENSE @@ -0,0 +1,6 @@ +gCube System - License +------------------------------------------------------------ + +The gCube/gCore software is licensed as Free Open Source software conveying to the EUPL 
(http://ec.europa.eu/idabc/eupl). +The software and documentation is provided by its authors/distributors "as is" and no expressed or +implied warranty is given for its use, quality or fitness for a particular case. diff --git a/distro/MAINTAINERS b/distro/MAINTAINERS new file mode 100644 index 0000000..5e8aa8f --- /dev/null +++ b/distro/MAINTAINERS @@ -0,0 +1 @@ +* Fabio Simeoni (fabio.simeoni@fao.org), FAO of the UN, Italy \ No newline at end of file diff --git a/distro/README b/distro/README new file mode 100644 index 0000000..62c4be1 --- /dev/null +++ b/distro/README @@ -0,0 +1,38 @@ +The gCube System - ${name} +---------------------- + +This work has been partially supported by the following European projects: DILIGENT (FP6-2003-IST-2), D4Science (FP7-INFRA-2007-1.2.2), +D4Science-II (FP7-INFRA-2008-1.2.2), iMarine (FP7-INFRASTRUCTURES-2011-2), and EUBrazilOpenBio (FP7-ICT-2011-EU-Brazil). + +Authors +------- + +* Fabio Simeoni (fabio.simeoni@fao.org), FAO of the UN, Italy. + +Version and Release Date +------------------------ +${version} + +Description +----------- +${description} + +Download information +-------------------- + +Source code is available from SVN: +${scm.url} + +Binaries can be downloaded from: + + +Documentation +------------- +Documentation is available on-line from the Projects Documentation Wiki: +https://gcube.wiki.gcube-system.org/gcube/index.php/The_Streams_Library + + +Licensing +--------- + +This software is licensed under the terms you may find in the file named "LICENSE" in this directory. diff --git a/distro/changelog.xml b/distro/changelog.xml new file mode 100644 index 0000000..b586f58 --- /dev/null +++ b/distro/changelog.xml @@ -0,0 +1,30 @@ + + + First Release + + + + Fault handling in stream publication and consumption aligns + with fault transport facilities of latest gRS2. + + Faults are published and abort publication unless they are + contingencies (cf. StreamContingency annotation). 
+ + Streams can be conveniently consumed with callbacks (cf. + Callback interface and Streams#consume()) and callbacks can explicitly skip + elements or stop + consumption (cf. Iteration class). + + + Like callbacks, fault handlers can control iteration directly (cf. Iteration). FaultResponse is no longer needed. + + + StreamSkipException is replaced internally with + StreamSkipSignal, and StreamStopSignal is also added to model signals from callbacks and fault handlers. + + + New stream implementations as well as code that consumes + streams can now be tested with dedicated facilities. + + + \ No newline at end of file diff --git a/distro/descriptor.xml b/distro/descriptor.xml new file mode 100644 index 0000000..21d8c88 --- /dev/null +++ b/distro/descriptor.xml @@ -0,0 +1,42 @@ + + servicearchive + + tar.gz + + / + + + ${distroDirectory} + / + true + + README + LICENSE + INSTALL + MAINTAINERS + changelog.xml + + 755 + true + + + + + ${distroDirectory}/profile.xml + / + true + + + target/${build.finalName}.jar + /${artifactId} + + + ${distroDirectory}/svnpath.txt + /${artifactId} + true + + + \ No newline at end of file diff --git a/distro/profile.xml b/distro/profile.xml new file mode 100644 index 0000000..0078cff --- /dev/null +++ b/distro/profile.xml @@ -0,0 +1,26 @@ + + + + Service + + ${description} + DataAccess + ${artifactId} + 1.0.0 + + + ${artifactId} + ${version} + + ${groupId} + ${artifactId} + ${version} + + + ${build.finalName}.jar + + + + + + diff --git a/distro/svnpath.txt b/distro/svnpath.txt new file mode 100644 index 0000000..f416f9d --- /dev/null +++ b/distro/svnpath.txt @@ -0,0 +1 @@ +${scm.url} diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..81fede3 --- /dev/null +++ b/pom.xml @@ -0,0 +1,120 @@ + + 4.0.0 + + maven-parent + org.gcube.tools + 1.0.0 + + + org.gcube.data.access + streams + 2.0.0-SNAPSHOT + + + Stream Library + Embedded Domain-Specific Language for Stream Transformations + + + 
scm:svn:http://svn.d4science.research-infrastructures.eu/gcube/trunk/data-access/${project.artifactId} + scm:svn:https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-access/${project.artifactId} + http://svn.d4science.research-infrastructures.eu/gcube/trunk/data-access/${project.artifactId} + + + + distro + + + + + + org.gcube.execution + grs2library + [2.0.0-SNAPSHOT,3.0.0-SNAPSHOT) + + + + org.slf4j + slf4j-api + 1.6.4 + compile + + + + junit + junit + 4.8.2 + test + + + + org.slf4j + slf4j-simple + 1.6.6 + test + + + + org.mockito + mockito-core + 1.9.0 + test + + + + + + + + + + org.apache.maven.plugins + maven-resources-plugin + 2.5 + + + copy-profile + install + + copy-resources + + + target + + + ${distroDirectory} + true + + profile.xml + + + + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + ${distroDirectory}/descriptor.xml + + + + + servicearchive + install + + single + + + + + + + + + + \ No newline at end of file diff --git a/src/main/java/org/gcube/data/streams/Callback.java b/src/main/java/org/gcube/data/streams/Callback.java new file mode 100644 index 0000000..5a90462 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/Callback.java @@ -0,0 +1,23 @@ +package org.gcube.data.streams; + + +/** + * A callback for a {@link StreamConsumer}. + * + * @author Fabio Simeoni + * + * @param the type of stream elements + * + */ +public interface Callback { + + /** The ongoing iteration. */ + static final Iteration iteration = new Iteration(); + + /** + * Implement to consume an element of the stream. 
+ * @param element the element + */ + void consume(T element); + +} diff --git a/src/main/java/org/gcube/data/streams/Iteration.java b/src/main/java/org/gcube/data/streams/Iteration.java new file mode 100644 index 0000000..8c5a9e6 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/Iteration.java @@ -0,0 +1,30 @@ +package org.gcube.data.streams; + +import org.gcube.data.streams.exceptions.StreamSkipSignal; +import org.gcube.data.streams.exceptions.StreamStopSignal; +import org.gcube.data.streams.handlers.FaultHandler; + +/** + * A model of a {@link Stream} iteration, with facilities to control it from within {@link Callback}s and {@link FaultHandler}s. + * + * @author Fabio Simeoni + * + */ +public final class Iteration { + + /** + * Stops the ongoing iteration. + */ + public void stop() throws StreamStopSignal { + throw new StreamStopSignal(); + } + + /** + * Skip this element of the ongoing iteration. + */ + public void skip() throws StreamSkipSignal { + throw new StreamSkipSignal(); + } + + +} diff --git a/src/main/java/org/gcube/data/streams/LookAheadStream.java b/src/main/java/org/gcube/data/streams/LookAheadStream.java new file mode 100644 index 0000000..532f4b0 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/LookAheadStream.java @@ -0,0 +1,141 @@ +package org.gcube.data.streams; + +import java.util.NoSuchElementException; + +import org.gcube.data.streams.exceptions.StreamSkipSignal; +import org.gcube.data.streams.exceptions.StreamStopSignal; +import org.gcube.data.streams.handlers.FaultHandler; +import org.gcube.data.streams.handlers.RethrowHandler; + +/** + * Partial {@link Stream} implementation based on look-ahead operations over an underlying stream. + *

+ * The implementation attempts to prefetch the output of {@link #next()} in {@link #hasNext()}. If a failure occurs, + * {@link #hasNext()}: + * + *

  • keeps consuming the underlying stream as long as the failure is a {@link StreamSkipSignal}; + *
  • consults a {@link FaultHandler} for all the other failures. If the {@link FaultHandler} re-throws the same or a + * different exception, the implementation throws it at the following {@link #next()}. + * + * @author Fabio Simeoni + * + * @param the type of stream elements + */ +public abstract class LookAheadStream implements Stream { + + private FaultHandler handler = new RethrowHandler(); + + // iteration state + protected Boolean hasNext; + protected E element; + private RuntimeException failure; + + /** + * Sets the {@link FaultHandler} for the iteration + * + * @param handler the handler + * @throws IllegalArgumentException if the handler is null + */ + public void setHandler(FaultHandler handler) throws IllegalArgumentException { + + if (handler == null) + throw new IllegalArgumentException("invalid null handler"); + + this.handler = handler; + } + + @Override + public final boolean hasNext() { + + if (hasNext==null) + hasNext = lookAhead(); + + //auto-close + if (!hasNext) + close(); + + return hasNext; + } + + private boolean lookAhead() { + + if (!delegateHasNext()) + return false; + + try { + this.element = delegateNext(); + return true; + } + catch (RuntimeException failure) { + + try { + handler.handle(failure); + return lookAhead(); + } + catch(StreamSkipSignal skip) { + return lookAhead(); + } + catch(StreamStopSignal stop) { + return false; + } + catch(RuntimeException rethrownUnchecked) { + this.failure=rethrownUnchecked; + return true; + } + } + + + + } + + @Override + public final E next() { + + try { + throwLookAheadFailureIfAny(); + return lookedAheadElementOrGetItNow(); + } + finally { + cleanIterationState(); + } + } + + private void throwLookAheadFailureIfAny() { + if (failure != null) + throw failure; + } + + private E lookedAheadElementOrGetItNow() { + + if (element == null) + if (hasNext()) + return next(); + else + throw new NoSuchElementException(); + + return element; + + } + + private void cleanIterationState() { + failure=null; + 
element=null; + hasNext = null; + } + + + + /** + * Returns an element of the underlying stream + * + * @return the element + */ + protected abstract E delegateNext(); + + /** + * Returns {@code true} if the underlying stream has more elements. + * + * @return {@code true} if the underlying stream has more elements + */ + protected abstract boolean delegateHasNext(); +} diff --git a/src/main/java/org/gcube/data/streams/Stream.java b/src/main/java/org/gcube/data/streams/Stream.java new file mode 100644 index 0000000..13c0548 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/Stream.java @@ -0,0 +1,191 @@ +package org.gcube.data.streams; + +import java.net.URI; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.gcube.data.streams.adapters.IteratorStream; +import org.gcube.data.streams.adapters.ResultsetStream; +import org.gcube.data.streams.delegates.FoldedStream; +import org.gcube.data.streams.delegates.GuardedStream; +import org.gcube.data.streams.delegates.MonitoredStream; +import org.gcube.data.streams.delegates.PipedStream; +import org.gcube.data.streams.delegates.UnfoldedStream; +import org.gcube.data.streams.dsl.Faults; +import org.gcube.data.streams.dsl.Streams; +import org.gcube.data.streams.exceptions.StreamOpenException; +import org.gcube.data.streams.handlers.FaultHandler; +import org.gcube.data.streams.publishers.RsPublisher; +import org.gcube.data.streams.publishers.StreamPublisher; + +/** + * An {@link Iterator} over the elements of a dataset of arbitrary origin, including memory, secondary storage, and + * network. + *

    + * + *

    Properties


    + * + * Streams are: + *

    + * + *

      + *
    • addressable: clients may invoke {@link #locator()} to obtain a reference to their address. The use and + * syntax of locators is implementation-dependent. + *
    • closeable: clients may invoke {@link #close()} to allow implementations to release resources. Clients + * should invoke {@link #close()} if they do not consume streams in their entirety. Implementations + * must automatically release their resources when they have been consumed in their entirety. + *
    • fallible: invoking {@link #next()} over streams that originate from secondary storage and remote + * locations may raise a wide range of failures. Some failures may be recoverable, in that subsequent invocations + * of {@link #next()} may still succeed. Other failures may be unrecoverable, in that subsequent + * invocations of {@link #next()} are guaranteed to fail too.
    + * + *

    Implementations


    + * + * There are predefined implementations that adapt the {@link Stream} interface to existing {@link Iterator}s and remote + * gRS2 resultsets (cf. {@link IteratorStream} and {@link ResultsetStream}). + *

    + * + * Other predefined implementations transform, fold, and unfold the elements of existing streams (cf. + * {@link PipedStream}, {@link FoldedStream}, {@link UnfoldedStream}). + *

    + * + * Additional implementations allow modular handling of stream faults and notify interested listeners of stream + * iteration events (cf. {@link GuardedStream}, {@link MonitoredStream}). + *

    + * + * Finally, streams may be published outside the current runtime by implementations of the {@link StreamPublisher} + * interface. A predefined implementation supports publication of streams as gRS2 resultsets (cf. {@link RsPublisher}). + * + *

    + * + * All the available implementations can be fluently instantiated and configured with an embedded DSL (cf. + * {@link Streams}). + * + *

    Fault Handling


    + * + * Clients can implement {@link FaultHandler}s to specify fault handling policies over streams, and then wrap streams in + * {@link GuardedStream}s that apply their policies: + * + <pre>
    + * import static ....Streams.*; 
    + * ...
    + * Stream<T> stream = ...
    + * 
    + * FaultHandler handler = new FaultHandler() {
    + *   public void handle(RuntimeException fault) {
    + *    ...
    + *   }
    + * };
    + * 
    + * Stream<T> guarded = guard(stream).with(handler);
    + * 
    + * + * + * {@link FaultHandler}s can ignore faults, rethrow them, rethrow different faults, or use the constant + * {@link FaultHandler#iteration} to stop the iteration of the underlying stream (cf. {@link Iteration#stop()}) + *

    + * + * + * Faults are unchecked exceptions thrown by {@link #next()}, often wrappers around an original cause. + * {@link FaultHandler}s can use a fluent API to simplify the task of analysing fault causes (cf. {@link Faults}): + * + *

    + * FaultHandler handler = new FaultHandler() {
    + *  	public void handle(RuntimeException fault) {
    + *           try {
    + *           	throw causeOf(fault).as(SomeException.class,SomeOtherException.class);
    + *           }
    + *           catch(SomeException e) {...}
    + *           catch(SomeOtherException e) {...}
    + *        }
    + * };
    + * 
    + * + *

    Consumption


    + * + * Clients may consume streams by explicitly iterating over their elements. Since streams are fallible and closeable, + * the recommended idiom is the following: + * + *
    + * Stream<T> stream = ...
    + * try {
    + *   while (stream.hasNext())
    + *     ....stream.next()...
    + * }
    + * finally {
    + *  stream.close();
    + * }
    + * 
    + * + * Alternatively, clients may provide {@link Callback}s to generic {@link StreamConsumer}s that iterate on + * behalf of clients. Using the simplifications of the DSL: + * + *
    + * Stream<T> stream = ...
    + * 
    + * Callback<T> callback = new Callback<T>() {
    + *  	public void consume(T element) {
    + *           ...element...
    + *        }
    + * };
    + * 
    + * consume(stream).with(callback);
    + * 
    + * + * {@link Callback}s can control iteration through the {@link Iteration} constant (cf. {@link Callback#iteration}): + * + *
    + * Callback<T> callback = new Callback<T>() {
    + *  	public void consume(T element) {
    + *  	    ...iteration.stop()...
    + *  		...
    + *        }
    + * };
    + * 
    + * + * + * + * @param the type of elements iterated over + * + * @author Fabio Simeoni + * + */ +public interface Stream extends Iterator { + + boolean hasNext(); + + /** + * @throws NoSuchElementException if the stream has no more elements or it has been closed + * @throws StreamOpenException if the stream cannot be opened + * @throws RuntimeException if the element cannot be returned + */ + E next(); + + /** + * Returns the stream locator. + * + * @return the locator + * @throws IllegalStateException if the stream is no longer addressable at the time of invocation. + */ + URI locator(); + + /** + * Closes the stream unconditionally, releasing any resources that it may be using. + *

    + * Subsequent invocations of this method have no effect.
    + * Subsequent invocations of {@link #hasNext()} return {@code false}.<br>
    + * Subsequent invocations of {@link #next()} throw {@link NoSuchElementException}s. + *

    + * Failures are logged by implementations and suppressed otherwise. + */ + void close(); + + /** + * Returns true if the stream has been closed. + * + * @return true if the stream has been closed + */ + boolean isClosed(); + +} \ No newline at end of file diff --git a/src/main/java/org/gcube/data/streams/StreamConsumer.java b/src/main/java/org/gcube/data/streams/StreamConsumer.java new file mode 100644 index 0000000..48382cc --- /dev/null +++ b/src/main/java/org/gcube/data/streams/StreamConsumer.java @@ -0,0 +1,62 @@ +package org.gcube.data.streams; + +import org.gcube.data.streams.exceptions.StreamSkipSignal; +import org.gcube.data.streams.exceptions.StreamStopSignal; + +/** + * A generic {@link Stream} consumer that delegates element processing and failure handling to a {@link Callback}. + * @author Fabio Simeoni + * + * @param the type of stream elements + */ +public final class StreamConsumer { + + private final Stream stream; + private final Callback callback; + + /** + * Creates an instance with a {@link Stream} and a {@link Callback} + * @param stream the stream + * @param callback the callback + */ + public StreamConsumer(Stream stream, Callback callback) { + this.stream=stream; + this.callback=callback; + } + + /** + * Starts the iteration. 
+ */ + public void start() { + consume(); + } + + //helper + private void consume() { + + try { + consuming: while (stream.hasNext()) { + + T next = stream.next(); + + try { + callback.consume(next); + } + catch(StreamSkipSignal skip) { + continue consuming; + } + catch(StreamStopSignal stop) { + break consuming; + } + } + } + finally { + stream.close(); + } + + } + + +} + + diff --git a/src/main/java/org/gcube/data/streams/Utils.java b/src/main/java/org/gcube/data/streams/Utils.java new file mode 100644 index 0000000..95220d4 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/Utils.java @@ -0,0 +1,63 @@ +package org.gcube.data.streams; + +import gr.uoa.di.madgik.commons.server.PortRange; +import gr.uoa.di.madgik.commons.server.TCPConnectionManager; +import gr.uoa.di.madgik.commons.server.TCPConnectionManagerConfig; +import gr.uoa.di.madgik.grs.proxy.tcp.TCPConnectionHandler; + +import java.net.InetAddress; +import java.util.ArrayList; + +import org.gcube.data.streams.exceptions.StreamContingency; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Library utils. + * + * @author Fabio Simeoni + * + */ +public class Utils { + + private static Logger log = LoggerFactory.getLogger(Utils.class); + + /** + * Initialises gRS2 library. 
+ */ + public static synchronized void initialiseRS() { + + if (TCPConnectionManager.IsInitialized()) + return; + + log.info("gRS2 is not initialised: using defaults"); + + String host =null; + try { + host = InetAddress.getLocalHost().getHostName(); + } + catch(Exception e) { + log.info("could not discover hostname, using 'localhost' to allow offline usage"); + host="localhost"; + } + + try { + TCPConnectionManager.Init(new TCPConnectionManagerConfig(host,new ArrayList(),true)); + TCPConnectionManager.RegisterEntry(new TCPConnectionHandler()); + } + catch(Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Indicates whether a failure or its indirect causes are annotated with {@link StreamContingency}. + * @param t the failure + * @return true if the failure or its indirect causes are annotated with {@link StreamContingency}. + */ + public static boolean isContingency(Throwable t) { + return t.getClass().isAnnotationPresent(StreamContingency.class) + || ((t.getCause()!=null) &&(isContingency(t.getCause()))); + } + +} diff --git a/src/main/java/org/gcube/data/streams/adapters/IteratorAdapter.java b/src/main/java/org/gcube/data/streams/adapters/IteratorAdapter.java new file mode 100644 index 0000000..69685a3 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/adapters/IteratorAdapter.java @@ -0,0 +1,59 @@ +package org.gcube.data.streams.adapters; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.util.Iterator; + +/** + * An adaptation strategy for {@link IteratorStream}. + * + * @author Fabio Simeoni + * @see IteratorStream + * + */ +public class IteratorAdapter implements Closeable { + + private final Iterator iterator; + + /** + * Creates an instance from a given {@link Iterator}. + * @param iterator the iterator + */ + public IteratorAdapter(Iterator iterator) { + this.iterator=iterator; + } + + /** + * Returns the underlying iterator. 
+ * @return the iterator + */ + final Iterator iterator() { + return iterator; + } + + /** + * Returns a locator for the underlying iterator. + *
<br>
    + * By default it returns a locator of the form {@code local://<iterator>}, where {@code <iterator>} is the string + obtained by invoking {@link #toString()} on the iterator. + * @return the locator + */ + public URI locator() { + return URI.create("local://"+iterator); + } + + /** + * Closes the underlying iterator. + *
<br>
    + * By defaults it has no effect except when the iterator implements the {@link Closeable} interface. In this + * case, it simply delegates to the iterator. + */ + @Override + public void close() throws IOException { + if (iterator instanceof Closeable) + ((Closeable) iterator).close(); + }; + + +} diff --git a/src/main/java/org/gcube/data/streams/adapters/IteratorStream.java b/src/main/java/org/gcube/data/streams/adapters/IteratorStream.java new file mode 100644 index 0000000..c1a1c99 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/adapters/IteratorStream.java @@ -0,0 +1,102 @@ +package org.gcube.data.streams.adapters; + +import java.net.URI; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.gcube.data.streams.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link Stream} adapter for {@link Iterator}s. + * + * @author Fabio Simeoni + * + * @param the type of stream elements + */ +public class IteratorStream implements Stream { + + private static Logger log =LoggerFactory.getLogger(IteratorStream.class); + + private final Iterator iterator; + private boolean closed; + private final IteratorAdapter adapter; + + + /** + * Creates an instance that adapts a given {@link IteratorAdapter}. + * @param adapter the adapter + */ + public IteratorStream(IteratorAdapter adapter) { + this.iterator=adapter.iterator(); + this.adapter=adapter; + } + + + /** + * Creates an instance that adapts a given {@link Iterator} with a default {@link IteratorAdapter}. 
+ * @param iterator the iterator + */ + public IteratorStream(final Iterator iterator) { + this(new IteratorAdapter(iterator)); //use default adapter + } + + @Override + public boolean hasNext() { + + if (closed) + return false; //respect close semantics + + else { + + boolean hasNext = iterator.hasNext(); + + if (!hasNext) + close(); + + return hasNext; + } + + } + + @Override + public E next() { + + //respect close semantics + if (closed) + throw new NoSuchElementException(); + + return iterator.next(); + } + + + @Override + public void close() { + + try { + adapter.close(); + } + catch(Exception e) { + log.error("could not close iterator "+locator(),e); + } + finally { + closed=true; + } + } + + @Override + public URI locator() { + return adapter.locator(); + } + + @Override + public void remove() { + iterator.remove(); + } + + @Override + public boolean isClosed() { + return closed; + } +} diff --git a/src/main/java/org/gcube/data/streams/adapters/ResultsetStream.java b/src/main/java/org/gcube/data/streams/adapters/ResultsetStream.java new file mode 100644 index 0000000..a68f3b4 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/adapters/ResultsetStream.java @@ -0,0 +1,169 @@ +package org.gcube.data.streams.adapters; + +import gr.uoa.di.madgik.grs.buffer.IBuffer.Status; +import gr.uoa.di.madgik.grs.reader.ForwardReader; +import gr.uoa.di.madgik.grs.reader.GRS2ReaderException; +import gr.uoa.di.madgik.grs.record.Record; +import gr.uoa.di.madgik.grs.record.exception.GRS2UncheckedException; + +import java.net.URI; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; + +import org.gcube.data.streams.LookAheadStream; +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.exceptions.StreamException; +import org.gcube.data.streams.exceptions.StreamOpenException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link Stream} adapter for gRS2 resultsets. + *
<br>
    + * This implementation is not thread safe. + * + * @author Fabio Simeoni + * + */ +public class ResultsetStream extends LookAheadStream { + + private static Logger log =LoggerFactory.getLogger(ResultsetStream.class); + + public static final int default_timeout = 30; + public static final TimeUnit default_timeout_unit=TimeUnit.SECONDS; + + private final URI locator; + + private long timeout = default_timeout; + private TimeUnit timeoutUnit = default_timeout_unit; + + private boolean open=false; + private boolean closed=false; + private RuntimeException lookAheadFailure; + + private ForwardReader reader; + private Iterator iterator; + + + /** + * Creates a new instance with a result set locator. + * @param locator the locator. + * @throws IllegalArgumentException if the locator is null. + * */ + public ResultsetStream(URI locator) throws IllegalArgumentException { + + if (locator==null) + throw new IllegalArgumentException("invalid or null locator"); + + this.locator=locator; + + } + + public void setTimeout(long timeout, TimeUnit unit) throws IllegalArgumentException { + + if (timeout<=0 || timeoutUnit==null) + throw new IllegalArgumentException("invalid timeout or null timeout unit"); + + this.timeout = timeout; + this.timeoutUnit = unit; + } + + @Override + protected E delegateNext() { + + try { + if (lookAheadFailure!=null) + throw lookAheadFailure; + else + try { + return iterator.next(); + } + catch(GRS2UncheckedException e) { + + //get underlying cause + Throwable cause = e.getCause(); + + //rewrap checked cause as appropriate to this layer + if (cause instanceof RuntimeException) + throw (RuntimeException) cause; + else + throw new StreamException(cause); + } + } + finally { + lookAheadFailure=null; + } + } + + @Override + protected boolean delegateHasNext() { + + if (closed) + return false; + + if (!open) { + + try { + reader = new ForwardReader(locator); + reader.setIteratorTimeout(timeout); + reader.setIteratorTimeUnit(timeoutUnit); + } + catch 
(Throwable t) { + lookAheadFailure= new StreamOpenException("cannot open resultset "+locator,t); + return true; + } + + iterator = reader.iterator(); + + log.info("initialised resultset at "+locator); + + open=true; + } + + //infer outage if reader has been dismissed remotely + if (reader.getStatus()==Status.Dispose && !closed) + lookAheadFailure= new RuntimeException("unrecoverable failure in resultset "); + + boolean hasNext = iterator.hasNext(); + + return hasNext; + } + + + @Override + public void close() { + + if (open) { + try { + reader.close(); + log.info("closed resultset at "+locator); + } + catch(GRS2ReaderException e) { + log.error("could not close resultset",e); + } + open=false; + } + closed=true; + } + + @Override + public URI locator() throws IllegalStateException { + + if (open) + throw new IllegalStateException("locator is invalid as result set has already been opened"); + else + return locator; + + } + + @Override + public void remove() { + iterator.remove(); + } + + @Override + public boolean isClosed() { + return closed; + } +} diff --git a/src/main/java/org/gcube/data/streams/delegates/AbstractDelegateStream.java b/src/main/java/org/gcube/data/streams/delegates/AbstractDelegateStream.java new file mode 100644 index 0000000..638594c --- /dev/null +++ b/src/main/java/org/gcube/data/streams/delegates/AbstractDelegateStream.java @@ -0,0 +1,58 @@ +package org.gcube.data.streams.delegates; + +import org.gcube.data.streams.LookAheadStream; +import org.gcube.data.streams.Stream; + +/** + * Partial implementation for {@link Stream}s that delegate to an underlying streams. + * + * @author Fabio Simeoni + * + * @param the type of elements of the underlying stream + * @param the type of elements of the stream delegate + */ +abstract class AbstractDelegateStream extends LookAheadStream { + + private final Stream stream; + + /** + * Creates an instance that delegates to a given {@link Stream}. 
+ * @param stream the stream + */ + AbstractDelegateStream(Stream stream) { + + if (stream==null) + throw new IllegalArgumentException("invalid null stream"); + + this.stream=stream; + } + + /** + * Returns the underlying {@link Stream}. + * @return the stream + */ + protected Stream stream() { + return stream; + } + + + @Override + public void close() { + stream.close(); + } + + @Override + public java.net.URI locator() throws IllegalStateException { + return stream.locator(); + }; + + @Override + public void remove() { + stream.remove(); + } + + @Override + public boolean isClosed() { + return stream.isClosed(); + } +} diff --git a/src/main/java/org/gcube/data/streams/delegates/FoldedStream.java b/src/main/java/org/gcube/data/streams/delegates/FoldedStream.java new file mode 100644 index 0000000..4e37360 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/delegates/FoldedStream.java @@ -0,0 +1,56 @@ +package org.gcube.data.streams.delegates; + +import java.util.ArrayList; +import java.util.List; + +import org.gcube.data.streams.Stream; + +/** + * A {@link Stream} that folds into lists the elements of another {@link Stream}. + * + * @author Fabio Simeoni + * + * @param the type of stream element + */ +public class FoldedStream extends AbstractDelegateStream> { + + private final int foldSize; + + /** + * Creates an instance with a {@link Stream} and a fold size. 
+ * @param stream the stream + * @param foldSize the fault size + * @throws IllegalArgumentException if the stream is null or the size is not positive + */ + public FoldedStream(Stream stream,int foldSize) throws IllegalArgumentException { + + super(stream); + + if (foldSize<1) + throw new IllegalArgumentException("invalid foldsize is not positive"); + + this.foldSize=foldSize; + } + + @Override + protected List delegateNext() { + + //we do not deal with failures, streams will need to be guarded upstream + //we also do not deal with transformations, which will need to be applied upstream + + List fold = new ArrayList(); + + for (int i=0;i the type of stream element + */ +public class GuardedStream extends AbstractDelegateStream { + + /** + * Creates an instance with a {@link Stream} and a {@link FaultHandler} + * + * @param stream the stream + * @param handler the handler + * @throws IllegalArgumentException if the stream or the handler are null + */ + public GuardedStream(Stream stream, FaultHandler handler) throws IllegalArgumentException { + + super(stream); + + if (handler == null) + throw new IllegalArgumentException("invalid null generator"); + + this.setHandler(handler); + } + + @Override + protected E delegateNext() { + return stream().next(); + } + + @Override + protected boolean delegateHasNext() { + return stream().hasNext(); + } + +} diff --git a/src/main/java/org/gcube/data/streams/delegates/MonitoredStream.java b/src/main/java/org/gcube/data/streams/delegates/MonitoredStream.java new file mode 100644 index 0000000..5fd7da0 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/delegates/MonitoredStream.java @@ -0,0 +1,61 @@ +package org.gcube.data.streams.delegates; + +import org.gcube.data.streams.Stream; + +/** + * A {@link Stream} that notifies an {@link StreamListener} of key iteration events. 
+ * + * @author Fabio Simeoni + * + * @param the type of stream elements + */ +public class MonitoredStream extends AbstractDelegateStream { + + private final StreamListener listener; + + private boolean started=false; + + /** + * Creates an instance with a {@link Stream} and a {@link StreamListener}. + * @param stream the stream + * @param listener the listener + * @throws IllegalArgumentException if the stream or the listener are {@code null} + */ + public MonitoredStream(Stream stream,StreamListener listener) throws IllegalArgumentException { + + super(stream); + + if (listener==null) + throw new IllegalArgumentException("invalid null listener"); + + this.listener=listener; + } + + @Override + protected E delegateNext() { + + E element = stream().next(); + + if (!started) { + listener.onStart(); + started=true; + } + + if (!delegateHasNext()) + listener.onEnd(); + + return element; + + } + + @Override + protected boolean delegateHasNext() { + return stream().hasNext(); + } + + @Override + public void close() { + super.close(); + listener.onClose(); + } +} diff --git a/src/main/java/org/gcube/data/streams/delegates/PipedStream.java b/src/main/java/org/gcube/data/streams/delegates/PipedStream.java new file mode 100644 index 0000000..f63569a --- /dev/null +++ b/src/main/java/org/gcube/data/streams/delegates/PipedStream.java @@ -0,0 +1,43 @@ +package org.gcube.data.streams.delegates; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.generators.Generator; + +/** + * A {@link Stream} of elements generated by the elements of an input {@link Stream} + * + * @author Fabio Simeoni + * + * @param the type of elements of the input stream + * @param the type of stream elements + */ +public class PipedStream extends AbstractDelegateStream { + + private final Generator generator; + + /** + * Creates an instance with a {@link Stream} and an element {@link Generator}. 
+ * @param stream the stream + * @param generator the generator + * @throws IllegalArgumentException if the stream or the generator are null + */ + public PipedStream(Stream stream,Generator generator) throws IllegalArgumentException { + + super(stream); + + if (generator == null) + throw new IllegalArgumentException("invalid null generator"); + + this.generator=generator; + } + + @Override + protected E2 delegateNext() { + return generator.yield(stream().next()); + } + + @Override + protected boolean delegateHasNext() { + return stream().hasNext(); + } +} diff --git a/src/main/java/org/gcube/data/streams/delegates/StreamListener.java b/src/main/java/org/gcube/data/streams/delegates/StreamListener.java new file mode 100644 index 0000000..535ccc7 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/delegates/StreamListener.java @@ -0,0 +1,27 @@ +package org.gcube.data.streams.delegates; + +import org.gcube.data.streams.Stream; + +/** + * A listener of key events in the iteration of a target {@link Stream}. + * + * @author Fabio Simeoni + * + */ +public interface StreamListener { + + /** + * Invoked after the first element of the target {@link Stream} has been iterated over. + */ + void onStart(); + + /** + * Invoked after the last element of the target {@link Stream} has been iterated over. + */ + void onEnd(); + + /** + * Invoked then stream is closed. + */ + void onClose(); +} diff --git a/src/main/java/org/gcube/data/streams/delegates/StreamListenerAdapter.java b/src/main/java/org/gcube/data/streams/delegates/StreamListenerAdapter.java new file mode 100644 index 0000000..8aaaaea --- /dev/null +++ b/src/main/java/org/gcube/data/streams/delegates/StreamListenerAdapter.java @@ -0,0 +1,28 @@ +package org.gcube.data.streams.delegates; + +/** + * Adapter implementation for {@link StreamListener}. 
+ * + * @author Fabio Simeoni + * + */ +public class StreamListenerAdapter implements StreamListener { + + @Override + public void onStart() { + // TODO Auto-generated method stub + + } + + @Override + public void onEnd() { + // TODO Auto-generated method stub + + } + + @Override + public void onClose() { + // TODO Auto-generated method stub + + } +} diff --git a/src/main/java/org/gcube/data/streams/delegates/UnfoldedStream.java b/src/main/java/org/gcube/data/streams/delegates/UnfoldedStream.java new file mode 100644 index 0000000..d8216fc --- /dev/null +++ b/src/main/java/org/gcube/data/streams/delegates/UnfoldedStream.java @@ -0,0 +1,110 @@ +package org.gcube.data.streams.delegates; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.exceptions.StreamSkipSignal; +import org.gcube.data.streams.exceptions.StreamStopSignal; +import org.gcube.data.streams.generators.Generator; + +/** + * A {@link Stream} of elements generated by unfolding the elements of an input {@link Stream} into multiple elements. + * + * @author Fabio Simeoni + * + * @param the type of elements of the input stream + * @param the type of stream elements + */ +public class UnfoldedStream extends AbstractDelegateStream { + + private final Generator> generator; + private Stream unfold; + + /** + * Creates an instance with a {@link Stream} and an element {@link Generator}. 
+ * @param stream the stream + * @param generator the generator + * @throws IllegalArgumentException if the stream or the generator are null + */ + public UnfoldedStream(Stream stream,Generator> generator) throws IllegalArgumentException { + + super(stream); + + if (generator == null) + throw new IllegalArgumentException("invalid null generator"); + + this.generator=generator; + } + + private RuntimeException lookAheadFailure; + + @Override + protected E2 delegateNext() { + return lookAheadFailureOrNextInUnfold(); + } + + @Override + protected boolean delegateHasNext() { + + if (!hasUnfold()) + return false; + + return existsInThisOrNextUnfold(); + } + + @Override + public void close() { + + if (unfold!=null) + unfold.close(); + + stream().close(); + } + + //helpers + private boolean hasUnfold() { + + if (unfold==null) + if (stream().hasNext()) + try { + unfold = generator.yield(stream().next()); + } + catch(StreamStopSignal stop) { + return false; + } + catch(StreamSkipSignal skip) { + return hasUnfold(); + } + catch(RuntimeException unchecked) { + lookAheadFailure = unchecked; + } + else + return false; + + return true; + } + + private boolean existsInThisOrNextUnfold() { + + boolean hasNext = unfold.hasNext(); + + if (!hasNext) { + unfold.close(); + unfold=null; + return delegateHasNext(); + } + + return hasNext; + } + + private E2 lookAheadFailureOrNextInUnfold() { + + try { + if (lookAheadFailure!=null) + throw lookAheadFailure; + else + return unfold.next(); + } + finally { + lookAheadFailure=null; + } + } +} diff --git a/src/main/java/org/gcube/data/streams/dsl/Faults.java b/src/main/java/org/gcube/data/streams/dsl/Faults.java new file mode 100644 index 0000000..47ef499 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/Faults.java @@ -0,0 +1,132 @@ +package org.gcube.data.streams.dsl; + +/** + * A simple DSL for fault conversion. 
+ * + * @author Fabio Simeoni + * + */ +public class Faults { + + /** + * Fault narrowing clause; + * + * @author Fabio Simeoni + * + */ + public static class OngoingRethrowClause { + + final RuntimeException caught; + final Throwable cause; + + /** + * Creates an instance with the fault to narrow. + * + * @param fault the fault + */ + OngoingRethrowClause(RuntimeException fault) { + this.caught=fault; + this.cause = fault.getCause()==null?fault:fault.getCause(); + } + + /** + * Rethrows the fault with a narrower type, or wraps it in a {@link RuntimeException} if its type cannot be + * narrowed. + * + * @param clazz1 the narrower type + * @return unused, allows clients to throw invocations of this method + * @throws T1 the narrower type + */ + public RuntimeException as(Class clazz1) throws T1 { + + if (clazz1.isInstance(cause)) + throw clazz1.cast(cause); + + else + + if (cause instanceof RuntimeException) + throw (RuntimeException) cause; + + else + return caught; + } + + /** + * Rethrows the fault with a narrower type, or wraps it in {@link RuntimeException} if its type cannot be + * narrowed. + * + * @param clazz1 the narrower type + * @param clazz2 an alternative narrower type + * @return unused, allows clients to throw invocations of this method + * @throws T1 the narrower type + * @throws T2 the second narrower type + */ + public RuntimeException as(Class clazz1, Class clazz2) + throws T1, T2 { + + if (clazz2.isInstance(cause)) + throw clazz2.cast(cause); + + else + return as(clazz1); + } + + /** + * Rethrows the fault with a narrower type, or wraps it in {@link RuntimeException} if its type cannot be + * narrowed. 
+ * + * @param clazz1 the narrower type + * @param clazz2 an alternative narrower type + * @param clazz3 an alternative narrower type + * @return unused, allows clients to throw invocations of this method + * @throws T1 the narrower type + * @throws T2 the second narrower type + * @throws T3 the second narrower type + */ + public RuntimeException as(Class clazz1, + Class clazz2, Class clazz3) throws T1, T2, T3 { + + if (clazz3.isInstance(cause)) + throw clazz3.cast(cause); + + else + return as(clazz1, clazz2); + } + + /** + * Rethrows the fault with a narrower type, or wraps it in {@link RuntimeException} if its type cannot be + * narrowed. + * + * @param clazz1 the narrower type + * @param clazz2 an alternative narrower type + * @param clazz3 an alternative narrower type + * @param clazz4 an alternative narrower type + * @return unused, allows clients to throw invocations of this method + * @throws T1 the narrower type + * @throws T2 the second narrower type + * @throws T3 the second narrower type + * @throws T4 the second narrower type + */ + public RuntimeException as( + Class clazz1, Class clazz2, Class clazz3, Class clazz4) throws T1, T2, T3, T4 { + + if (clazz4.isInstance(cause)) + throw clazz4.cast(cause); + + else + return as(clazz1, clazz2, clazz3); + } + } + + /** + * Indicates a fault to be rethrown with a narrower type. 
+ * + * @param fault the fault + * @return the next clause in the sentence + */ + public static OngoingRethrowClause causeOf(RuntimeException fault) { + + return new OngoingRethrowClause(fault); + + } +} diff --git a/src/main/java/org/gcube/data/streams/dsl/StreamClause.java b/src/main/java/org/gcube/data/streams/dsl/StreamClause.java new file mode 100644 index 0000000..97c6ce7 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/StreamClause.java @@ -0,0 +1,30 @@ +/** + * + */ +package org.gcube.data.streams.dsl; + +import org.gcube.data.streams.Stream; + + + +/** + * + * Base implementation for clauses of {@link Stream} sentences. + * + * @author Fabio Simeoni + * + * @param the type of elements of the stream + * @param the type of environment in which the clause is evaluated + */ +public class StreamClause> { + + protected ENV env; + + /** + * Creates an instance with a given evaluation environment. + * @param e the environment + */ + public StreamClause(ENV e) { + env=e; + } +} \ No newline at end of file diff --git a/src/main/java/org/gcube/data/streams/dsl/StreamClauseEnv.java b/src/main/java/org/gcube/data/streams/dsl/StreamClauseEnv.java new file mode 100644 index 0000000..e52b088 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/StreamClauseEnv.java @@ -0,0 +1,28 @@ +/** + * + */ +package org.gcube.data.streams.dsl; + +import org.gcube.data.streams.Stream; + + +/** + * The environment in which a {@link Stream} sentence is evaluated. 
+ * + * @author Fabio Simeoni + * + * @param the type of elements of the input stream + * + */ +public class StreamClauseEnv { + + private final Stream stream; + + public StreamClauseEnv(Stream stream) { + this.stream=stream; + } + + public Stream stream() { + return stream; + } +} diff --git a/src/main/java/org/gcube/data/streams/dsl/Streams.java b/src/main/java/org/gcube/data/streams/dsl/Streams.java new file mode 100644 index 0000000..b81d1bb --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/Streams.java @@ -0,0 +1,253 @@ +package org.gcube.data.streams.dsl; + +import static java.util.Arrays.*; + +import java.net.URI; +import java.util.Iterator; +import java.util.List; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.adapters.IteratorAdapter; +import org.gcube.data.streams.adapters.IteratorStream; +import org.gcube.data.streams.dsl.consume.ConsumeWithClause; +import org.gcube.data.streams.dsl.fold.InClause; +import org.gcube.data.streams.dsl.from.RsOfClause; +import org.gcube.data.streams.dsl.guard.GuardWithClause; +import org.gcube.data.streams.dsl.listen.MonitorWithClause; +import org.gcube.data.streams.dsl.pipe.PipeThroughClause; +import org.gcube.data.streams.dsl.publish.PublishRsUsingClause; +import org.gcube.data.streams.dsl.publish.PublishRsWithClause; +import org.gcube.data.streams.dsl.unfold.UnfoldThroughClause; +import org.gcube.data.streams.generators.LoggingListener; +import org.gcube.data.streams.generators.NoOpGenerator; +import org.gcube.data.streams.handlers.FaultHandler; +import org.gcube.data.streams.handlers.IgnoreHandler; +import org.gcube.data.streams.handlers.RethrowHandler; +import org.gcube.data.streams.handlers.RethrowUnrecoverableHandler; +import org.gcube.data.streams.handlers.StopFastHandler; +import org.gcube.data.streams.publishers.RsStringRecordFactory; +import org.gcube.data.streams.test.FallibleIterator; + +import gr.uoa.di.madgik.grs.record.Record; + +/** + * + * The definitions of an eDSL of 
stream and stream-related expressions. + * + * @author Fabio Simeoni + * + */ +public class Streams { + + + //CONSUME + + /** + * Starts a sentence to consume a {@link Stream} + * @param stream the stream + * @return the next clause of the sentence + */ + public static ConsumeWithClause consume(Stream stream) { + return new ConsumeWithClause(stream); + } + + //CONVERT + + /** + * Converts an {@link Iterator} to a {@link Stream}. + * @param itarator the iterator + * @return the stream + */ + public static Stream convert(Iterator itarator) { + return new IteratorStream(itarator); + } + + /** + * Converts a custom {@link IteratorAdapter} to a {@link Stream}. + * @param adapter the adapter + * @return the stream + */ + public static IteratorStream convert(IteratorAdapter adapter) { + return new IteratorStream(adapter); + } + + /** + * Converts an {@link Iterable} to a {@link Stream}. + * @param iterable the iterable + * @return the stream + */ + public static Stream convert(Iterable iterable) { + return convert(iterable.iterator()); + } + + /** + * Converts one or more elements into a {@link Stream}. + * @param elements the elements + * @return the stream + */ + public static Stream convert(E ...elements) { + return convert(asList(elements)); + } + + /** + * Converts a mixture of exceptions and elements of a given type to a {@link Stream} of a that type. + * It's the client's responsibility to ensure that the elements that are not exceptions are homogeneously typed as the type indicated in input. + * @param clazz the stream type + * @param elements the elements + * @return the stream + */ + public static Stream convertWithFaults(Class clazz, Object...elements) { + return convertWithFaults(clazz,asList(elements)); + } + + /** + * Converts a mixture of exceptions and elements of a given type to a {@link Stream} of a that type. + * It's the client's responsibility to ensure that the elements that are not exceptions are homogeneously typed as the type indicated in input. 
+ * @param clazz the stream type + * @param elements the elements + * @return the stream + */ + public static Stream convertWithFaults(Class clazz,List elements) { + + return convert(new FallibleIterator(clazz, elements)); + } + + /** + * Starts a sentence to convert a resultset into a {@link Stream}. + * @param locator the locator of the resultset + * @return the next clause of the sentence + */ + public static RsOfClause convert(URI locator) { + return new RsOfClause(locator); + } + + /** + * Returns a {@link Stream} of strings extracted from a resultset of {@link RsStringRecordFactory#STRING_RECORD}s. + * @param locator the locator of the resultset + * @return the stream + */ + public static Stream stringsIn(URI locator) { + return convert(locator).ofStrings().withDefaults(); + } + + + // PIPE + + /** + * Starts a sentence to produce a {@link Stream} generated from another {@link Stream}. + * @param stream the input stream. + * @return the next clause of the sentence + */ + public static PipeThroughClause pipe(Stream stream) { + return new PipeThroughClause(stream); + } + + + // FOLD + + /** + * Starts a sentence to produce a {@link Stream} that groups of elements of another {@link Stream}. + * @param stream the input stream. + * @return the next clause of the sentence + */ + public static InClause fold(Stream stream) { + return new InClause(stream); + } + + // UNFOLD + + /** + * Starts a sentence to produce a {@link Stream} that unfolds the elements of another {@link Stream}. + * @param stream the input stream. + * @return the next clause of the sentence + */ + public static UnfoldThroughClause unfold(Stream stream) { + return new UnfoldThroughClause(stream); + } + + // GUARD + + /** + * Starts a sentence to produce a {@link Stream} that controls the error raised by another {@link Stream}. + * @param stream the input stream. 
+ * @return the next clause of the sentence + */ + public static GuardWithClause guard(Stream stream) { + return new GuardWithClause(stream); + } + + /** + * Starts a sentence to produce a {@link Stream} that notifies key events in the iteration of another {@link Stream}. + * @param stream the input stream. + * @return the next clause of the sentence + */ + public static MonitorWithClause monitor(Stream stream) { + return new MonitorWithClause(stream); + } + + // PUBLISH + + /** + * Starts a sentence to publish a {@link Stream} as a resultset. + * @param stream the stream + * @return the next clause of the sentence + */ + public static PublishRsUsingClause publish(Stream stream) { + return new PublishRsUsingClause(stream); + } + + /** + * Starts a sentence to publish a {@link Stream} as a resultset. + * @param stream the stream + * @return the next clause of the sentence + */ + public static PublishRsWithClause publishStringsIn(Stream stream) { + return new PublishRsUsingClause(stream).using(no_serialiser); + } + + /** + * Returns a {@link Stream} that logs the throughput of an input {@link Stream}. + * @param stream the input stream + * @return the output stream + */ + public static Stream log(Stream stream) { + LoggingListener listener = new LoggingListener(); + return monitor(pipe(stream).through(listener)).with(listener); + } + + // GENERATORS + + /** + * A {@link NoOpGenerator}. + */ + public static NoOpGenerator no_serialiser = new NoOpGenerator(); + + /** + * Returns a {@link NoOpGenerator}. + * @return the generator + */ + public static NoOpGenerator no_op(Stream stream) { + return new NoOpGenerator(); + } + + // HANDLERS + /** + * A {@link RethrowHandler} for failure handling. + */ + public static FaultHandler RETHROW_POLICY = new RethrowHandler(); + + /** + * A {@link RethrowUnrecoverableHandler} for failure handling. 
+ */ + public static FaultHandler RETHROW_UNRECOVERABLE_POLICY = new RethrowUnrecoverableHandler(); + + /** + * A {@link StopFastHandler} for failure handling. + */ + public static FaultHandler STOPFAST_POLICY= new StopFastHandler(); + + /** + * A {@link IgnoreHandler} for failure handling. + */ + public static FaultHandler IGNORE_POLICY = new IgnoreHandler(); +} diff --git a/src/main/java/org/gcube/data/streams/dsl/consume/ConsumeWithClause.java b/src/main/java/org/gcube/data/streams/dsl/consume/ConsumeWithClause.java new file mode 100644 index 0000000..0b031ae --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/consume/ConsumeWithClause.java @@ -0,0 +1,35 @@ +package org.gcube.data.streams.dsl.consume; + +import org.gcube.data.streams.Callback; +import org.gcube.data.streams.StreamConsumer; +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.dsl.StreamClause; +import org.gcube.data.streams.dsl.StreamClauseEnv; + +/** + * The clause of {@code consume} sentences in which a {@link Callback} is configured on the input stream. + * + * @author Fabio Simeoni + * + * @param the type of stream elements + */ +public class ConsumeWithClause extends StreamClause> { + + /** + * Creates an instance from an input {@link Stream} + * + * @param stream the stream + */ + public ConsumeWithClause(Stream stream) { + super(new StreamClauseEnv(stream)); + } + + /** + * Return a {@link Stream} configured with a given {@link Callback}. 
+ * + * @param consumer the consumer + */ + public void with(Callback consumer) { + new StreamConsumer(env.stream(), consumer).start(); + } +} diff --git a/src/main/java/org/gcube/data/streams/dsl/fold/InClause.java b/src/main/java/org/gcube/data/streams/dsl/fold/InClause.java new file mode 100644 index 0000000..e77f629 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/fold/InClause.java @@ -0,0 +1,34 @@ +package org.gcube.data.streams.dsl.fold; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.delegates.FoldedStream; +import org.gcube.data.streams.dsl.StreamClause; +import org.gcube.data.streams.dsl.StreamClauseEnv; + +/** + * The clause of {@code fold} sentences in which a fold size is configured for + * the output stream. + * + * @author Fabio Simeoni + * + * @param the type of stream elements + */ +public class InClause extends StreamClause> { + + /** + * Creates an instance with an input {@link Stream}. + * @param stream the stream + */ + public InClause(Stream stream) { + super(new StreamClauseEnv(stream)); + } + + /** + * Returns a {@link Stream} that folds the element of the input {@link Stream} in lists of a given size. + * @param foldSize the size + * @return the stream + */ + public FoldedStream in(int foldSize) { + return new FoldedStream(env.stream(),foldSize); + } +} diff --git a/src/main/java/org/gcube/data/streams/dsl/from/RsClause.java b/src/main/java/org/gcube/data/streams/dsl/from/RsClause.java new file mode 100644 index 0000000..cad3691 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/from/RsClause.java @@ -0,0 +1,24 @@ +package org.gcube.data.streams.dsl.from; + +import gr.uoa.di.madgik.grs.record.Record; + +/** + * Partial clause implementation for {@code convert} sentences. 
+ * + * @author Fabio Simeoni + * + */ +abstract class RsClause { + + protected RsEnv env; + + /** + * Creates an instance from a {@link RsEnv} + * @param env the environment + */ + public RsClause(RsEnv env) { + + this.env=env; + } + +} diff --git a/src/main/java/org/gcube/data/streams/dsl/from/RsEnv.java b/src/main/java/org/gcube/data/streams/dsl/from/RsEnv.java new file mode 100644 index 0000000..b4d5b56 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/from/RsEnv.java @@ -0,0 +1,40 @@ +/** + * + */ +package org.gcube.data.streams.dsl.from; + +import gr.uoa.di.madgik.grs.record.Record; + +import java.net.URI; + + +/** + * The environment in which {@code convert} sentences are evaluated. + * + * @author Fabio Simeoni + * + */ +class RsEnv { + + protected URI locator; + + protected Class recordClass; + + /** + * Creates a new instance from a resultset locator + * @param locator the locator + */ + public RsEnv(URI locator) { + this.locator=locator; + } + + /** + * Creates a new instance from a resultset locator + * @param locator the locator + * @param recordClass the class; + */ + public RsEnv(URI locator, Class recordClass) { + this.locator=locator; + this.recordClass = recordClass; + } +} diff --git a/src/main/java/org/gcube/data/streams/dsl/from/RsOfClause.java b/src/main/java/org/gcube/data/streams/dsl/from/RsOfClause.java new file mode 100644 index 0000000..b4b557e --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/from/RsOfClause.java @@ -0,0 +1,43 @@ +package org.gcube.data.streams.dsl.from; + +import gr.uoa.di.madgik.grs.record.GenericRecord; +import gr.uoa.di.madgik.grs.record.Record; + +import java.net.URI; + +import org.gcube.data.streams.publishers.RsStringRecordFactory; + +/** + * A {@link RsClause} in which the record type of the input resultset is configured. + * + * @author Fabio Simeoni + * + */ +public class RsOfClause extends RsClause { + + /** + * Creates an instance from a resultset. 
+ * @param locator the locator of the resultset + */ + public RsOfClause(URI locator) { + super(new RsEnv(locator)); + } + + /** + * Configures the type of records in the input resultset + * @param clazz the record type + * @return the next clause in the sentence + */ + public RsWithClause of(Class clazz) { + return new RsWithClause(new RsEnv(env.locator,clazz)); + } + + /** + * Configures the type of records in the input result set to {@link RsStringRecordFactory#STRING_RECORD}. + * @return the next clause in the sentence + */ + public RsStringWithClause ofStrings() { + return new RsStringWithClause(new RsEnv(env.locator)); + } + +} diff --git a/src/main/java/org/gcube/data/streams/dsl/from/RsStringWithClause.java b/src/main/java/org/gcube/data/streams/dsl/from/RsStringWithClause.java new file mode 100644 index 0000000..4c71322 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/from/RsStringWithClause.java @@ -0,0 +1,61 @@ +package org.gcube.data.streams.dsl.from; + +import gr.uoa.di.madgik.grs.record.GenericRecord; +import gr.uoa.di.madgik.grs.record.field.StringField; + +import java.util.concurrent.TimeUnit; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.adapters.ResultsetStream; +import org.gcube.data.streams.dsl.Streams; +import org.gcube.data.streams.generators.Generator; + +/** + * A {@link RsClause} in which the adapter of the input resultset is configured. + * + * @author Fabio Simeoni + * + */ +public class RsStringWithClause extends RsClause { + + // used internally to extract strings from records + private static Generator recordSerialiser = new Generator() { + @Override + public String yield(GenericRecord element) { + return ((StringField) element.getField(0)).getPayload(); + } + }; + + /** + * Creates an instance with a {@link RsEnv}. 
+ * + * @param env the environment + */ + public RsStringWithClause(RsEnv env) { + super(env); + } + + /** + * Returns a {@link Stream} with a given read timeout on the input resultset + * + * @param timeout the timeout + * @param unit the time unit of the timeout + * @return the stream + */ + public Stream withTimeout(int timeout, TimeUnit unit) { + ResultsetStream recordStream = new ResultsetStream(env.locator); + recordStream.setTimeout(timeout, unit); + return Streams.pipe(recordStream).through(recordSerialiser); + } + + /** + * Returns a {@link Stream} with a {@link ResultsetStream#default_timeout} in a + * {@link ResultsetStream#default_timeout_unit} on the input resultset. + * + * @return the stream + */ + public Stream withDefaults() { + ResultsetStream recordStream = new ResultsetStream(env.locator); + return Streams.pipe(recordStream).through(recordSerialiser); + } +} diff --git a/src/main/java/org/gcube/data/streams/dsl/from/RsWithClause.java b/src/main/java/org/gcube/data/streams/dsl/from/RsWithClause.java new file mode 100644 index 0000000..d48c1fa --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/from/RsWithClause.java @@ -0,0 +1,68 @@ +package org.gcube.data.streams.dsl.from; + +import gr.uoa.di.madgik.grs.record.Record; + +import java.util.concurrent.TimeUnit; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.adapters.ResultsetStream; +import org.gcube.data.streams.dsl.Streams; +import org.gcube.data.streams.generators.Generator; + +/** + * A {@link RsClause} in which the adapter of the input resultset is configured. + * + * @author Fabio Simeoni + * + * @param the {@link Record} type of stream elements + */ +public class RsWithClause extends RsClause { + + /** + * Creates an instance with a {@link RsEnv}. + * + * @param env the environment + */ + public RsWithClause(RsEnv env) { + super(env); + } + + /** + * Returns a {@link Stream} with a given read timeout on the input resultset. 
    + * + * @param timeout the timeout + * @param unit the time unit of the timeout + * @return the stream + */ + public Stream withTimeout(int timeout, TimeUnit unit) { + ResultsetStream stream = new ResultsetStream(env.locator); + stream.setTimeout(timeout, unit); + return Streams.pipe(stream).through(new RecordDeserialiser(env.recordClass)); + + } + + /** + * Returns a {@link Stream} with a {@link ResultsetStream#default_timeout} in a + * {@link ResultsetStream#default_timeout_unit} on the input resultset. + * + * @return the stream + */ + public Stream withDefaults() { + return new ResultsetStream(env.locator); + } + + // used internally to cast records to their declared type + private static class RecordDeserialiser implements Generator { + + private final Class recordClass; + + public RecordDeserialiser(Class recordClass) { + this.recordClass=recordClass; + } + + @Override + public R yield(Record element) { + return recordClass.cast(element); + } + } +} diff --git a/src/main/java/org/gcube/data/streams/dsl/guard/GuardWithClause.java b/src/main/java/org/gcube/data/streams/dsl/guard/GuardWithClause.java new file mode 100644 index 0000000..78d3d38 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/guard/GuardWithClause.java @@ -0,0 +1,35 @@ +package org.gcube.data.streams.dsl.guard; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.delegates.GuardedStream; +import org.gcube.data.streams.dsl.StreamClause; +import org.gcube.data.streams.dsl.StreamClauseEnv; +import org.gcube.data.streams.handlers.FaultHandler; + +/** + * The clause of {@code guard} sentences of the {@link Stream} DSL in which a {@link FaultHandler} is configured on + * the output stream. + * + * @author Fabio Simeoni + * + * @param the type of stream elements + */ +public class GuardWithClause extends StreamClause> { + + /** + * Creates an instance with an input {@link Stream}.
    + * @param stream the stream + */ + public GuardWithClause(Stream stream) { + super(new StreamClauseEnv(stream)); + } + + /** + * Returns a {@link Stream} configured with a given {@link FaultHandler}. + * @param handler the handler + * @return the stream + */ + public GuardedStream with(FaultHandler handler) { + return new GuardedStream(env.stream(),handler); + } +} diff --git a/src/main/java/org/gcube/data/streams/dsl/listen/MonitorWithClause.java b/src/main/java/org/gcube/data/streams/dsl/listen/MonitorWithClause.java new file mode 100644 index 0000000..1be2fb9 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/listen/MonitorWithClause.java @@ -0,0 +1,36 @@ +package org.gcube.data.streams.dsl.listen; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.delegates.MonitoredStream; +import org.gcube.data.streams.delegates.StreamListener; +import org.gcube.data.streams.dsl.StreamClause; +import org.gcube.data.streams.dsl.StreamClauseEnv; + +/** + * The clause of {@code monitor} sentences in which a {@link StreamListener} is configured on the stream. + * + * @author Fabio Simeoni + * + * @param the type of stream elements + */ +public class MonitorWithClause extends StreamClause> { + + /** + * Creates an instance from an input {@link Stream} + * + * @param stream the stream + */ + public MonitorWithClause(Stream stream) { + super(new StreamClauseEnv(stream)); + } + + /** + * Return a {@link Stream} configured with a given {@link StreamListener}.
+ * + * @param listener the listener + * @return the stream + */ + public MonitoredStream with(StreamListener listener) { + return new MonitoredStream(env.stream(), listener); + } +} diff --git a/src/main/java/org/gcube/data/streams/dsl/pipe/PipeThroughClause.java b/src/main/java/org/gcube/data/streams/dsl/pipe/PipeThroughClause.java new file mode 100644 index 0000000..02959bf --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/pipe/PipeThroughClause.java @@ -0,0 +1,36 @@ +package org.gcube.data.streams.dsl.pipe; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.delegates.PipedStream; +import org.gcube.data.streams.dsl.StreamClause; +import org.gcube.data.streams.dsl.StreamClauseEnv; +import org.gcube.data.streams.generators.Generator; + +/** + * The clause of {@code pipe} sentences in which a {@link Generator} is configured on the output stream. + * + * @author Fabio Simeoni + * + * @param the type of stream elements + */ +public class PipeThroughClause extends StreamClause> { + + /** + * Creates an instance from an input {@link Stream} + * + * @param stream the stream + */ + public PipeThroughClause(Stream stream) { + super(new StreamClauseEnv(stream)); + } + + /** + * Return a {@link Stream} configured with a given {@link Generator}. 
+ * + * @param generator the generator + * @return the stream + */ + public PipedStream through(Generator generator) { + return new PipedStream(env.stream(), generator); + } +} diff --git a/src/main/java/org/gcube/data/streams/dsl/publish/PublishRsEnv.java b/src/main/java/org/gcube/data/streams/dsl/publish/PublishRsEnv.java new file mode 100644 index 0000000..a3d93dd --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/publish/PublishRsEnv.java @@ -0,0 +1,40 @@ +/** + * + */ +package org.gcube.data.streams.dsl.publish; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.dsl.StreamClauseEnv; +import org.gcube.data.streams.publishers.RecordFactory; + + +/** + * The {@link StreamClauseEnv} in which {@code publish} sentences are evaluated. + * + * @author Fabio Simeoni + * + * @param the type of elements of the input stream + * + */ +public class PublishRsEnv extends StreamClauseEnv { + + RecordFactory factory; + + /** + * Creates an instance with a {@link Stream} + * @param stream the stream + */ + public PublishRsEnv(Stream stream) { + super(stream); + } + + /** + * Creates an instance with a {@link Stream} and a {@link RecordFactory} + * @param stream the stream + * @param factory the factory + */ + public PublishRsEnv(Stream stream, RecordFactory factory) { + super(stream); + this.factory=factory; + } +} diff --git a/src/main/java/org/gcube/data/streams/dsl/publish/PublishRsUsingClause.java b/src/main/java/org/gcube/data/streams/dsl/publish/PublishRsUsingClause.java new file mode 100644 index 0000000..1f4788b --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/publish/PublishRsUsingClause.java @@ -0,0 +1,49 @@ +/** + * + */ +package org.gcube.data.streams.dsl.publish; + +import gr.uoa.di.madgik.grs.record.Record; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.dsl.StreamClause; +import org.gcube.data.streams.generators.Generator; +import org.gcube.data.streams.publishers.RecordFactory; +import 
org.gcube.data.streams.publishers.RsStringRecordFactory; + +/** + * The clause of {@code publish} sentences in which the type of {@link Record}s is configured on the output resultset. + * + * @author Fabio Simeoni + * + */ +public class PublishRsUsingClause extends StreamClause> { + + /** + * Creates an instance with an input {@link Stream}. + * @param stream the stream + */ + public PublishRsUsingClause(Stream stream) { + super(new PublishRsEnv(stream)); + } + + /** + * Configures a serialiser for the elements of the input {@link Stream}. + * @param serialiser the serialiser + * @return the next clause in the sentence + */ + public PublishRsWithClause using(Generator serialiser) { + env.factory = new RsStringRecordFactory(serialiser); + return new PublishRsWithClause(env); + } + + /** + * Configures a {@link RecordFactory} for the elements of the input {@link Stream}. + * @param factory the factory + * @return the next clause in the sentence + */ + public PublishRsWithClause using(RecordFactory factory) { + env.factory = factory; + return new PublishRsWithClause(env); + } +} \ No newline at end of file diff --git a/src/main/java/org/gcube/data/streams/dsl/publish/PublishRsWithClause.java b/src/main/java/org/gcube/data/streams/dsl/publish/PublishRsWithClause.java new file mode 100644 index 0000000..1b5c050 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/publish/PublishRsWithClause.java @@ -0,0 +1,106 @@ +/** + * + */ +package org.gcube.data.streams.dsl.publish; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.dsl.StreamClause; +import org.gcube.data.streams.handlers.FaultHandler; +import org.gcube.data.streams.publishers.ThreadProvider; +import org.gcube.data.streams.publishers.RsPublisher; + +/** + * The clause of {@code publish} sentences in which the output resultset is configured. 
+ * + * @author Fabio Simeoni + * + */ +public class PublishRsWithClause extends StreamClause> { + + private final RsPublisher publisher; + + /** + * Creates an instance with the {@link PublishRsEnv} of the ongoing sentence. + * + * @param e the environment + */ + public PublishRsWithClause(PublishRsEnv e) { + super(e); + this.publisher = new RsPublisher(e.stream(), e.factory); + } + + /** + * Configures the element capacity of the resultset buffer. + * + * @param size the number of elements in the buffer + * @return this clause + */ + public PublishRsWithClause withBufferOf(int size) { + publisher.setBufferSize(size); + return this; + } + + /** + * Configures the publishing timeout of the resultset. + *

    + * If the timeout expires, elements of the input {@link Stream} will no longer be published, though they may still be + * consumed if {@link #nonstop()} has been invoked. + * + * @param timeout the timeout + * @param unit the time unit for the timeout + * @return this clause + */ + public PublishRsWithClause withTimeoutOf(int timeout, TimeUnit unit) { + publisher.setTimeout(timeout, unit); + return this; + } + + /** + * Configures publication to continue consuming the input {@link Stream} after the expiry of the publishing timeout. + *

    + * Typically used for the side-effects of publications. + * + * @return this clause + */ + public PublishRsWithClause nonstop() { + publisher.setOnDemand(false); + return this; + } + + /** + * Configures a {@link ThreadProvider} for the publishing thread. + *

    + * Publication occurs asynchronously and a thread provider may be required to make available thread-bound + * information in the publishing thread. + * + * @param provider the thread provider + * @return this clause + */ + public PublishRsWithClause with(ThreadProvider provider) { + publisher.setThreadProvider(provider); + return this; + } + + /** + * Configures a {@link FaultHandler} for publication and returns the locator of the resultset. + * + * @param handler the handler + * @return the locator of the resultset + */ + public URI with(FaultHandler handler) { + publisher.setFaultHandler(handler); + return publisher.publish(); + } + + /** + * Returns the locator of the resultset. + * + * @return the locator. + */ + public URI withDefaults() { + return publisher.publish(); + } +} \ No newline at end of file diff --git a/src/main/java/org/gcube/data/streams/dsl/unfold/UnfoldThroughClause.java b/src/main/java/org/gcube/data/streams/dsl/unfold/UnfoldThroughClause.java new file mode 100644 index 0000000..21e096f --- /dev/null +++ b/src/main/java/org/gcube/data/streams/dsl/unfold/UnfoldThroughClause.java @@ -0,0 +1,36 @@ +package org.gcube.data.streams.dsl.unfold; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.delegates.UnfoldedStream; +import org.gcube.data.streams.dsl.StreamClause; +import org.gcube.data.streams.dsl.StreamClauseEnv; +import org.gcube.data.streams.generators.Generator; + +/** + * The clause of {@code unfold} sentences in which a {@link Generator} is configured on the output stream. + * + * @author Fabio Simeoni + * + * @param the type of stream elements + */ +public class UnfoldThroughClause extends StreamClause> { + + /** + * Creates an instance from an input {@link Stream} + * + * @param stream the stream + */ + public UnfoldThroughClause(Stream stream) { + super(new StreamClauseEnv(stream)); + } + + /** + * Return a {@link Stream} configured with a given {@link Generator}.
+ * + * @param generator the generator + * @return the stream + */ + public UnfoldedStream through(Generator> generator) { + return new UnfoldedStream(env.stream(), generator); + } +} diff --git a/src/main/java/org/gcube/data/streams/exceptions/StreamContingency.java b/src/main/java/org/gcube/data/streams/exceptions/StreamContingency.java new file mode 100644 index 0000000..2ec7179 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/exceptions/StreamContingency.java @@ -0,0 +1,21 @@ +package org.gcube.data.streams.exceptions; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + + +/** + * Intended for {@link Exception} classes to mark them as unrecoverable for iteration purposes. + * + * @author Fabio Simeoni + * + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +@Inherited +@Documented +public @interface StreamContingency {} diff --git a/src/main/java/org/gcube/data/streams/exceptions/StreamException.java b/src/main/java/org/gcube/data/streams/exceptions/StreamException.java new file mode 100644 index 0000000..9a002d7 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/exceptions/StreamException.java @@ -0,0 +1,43 @@ +package org.gcube.data.streams.exceptions; + +import org.gcube.data.streams.Stream; + +/** + * A failure that occurs when first accessing a {@link Stream}. + * + * @author Fabio Simeoni + * + */ +public class StreamException extends RuntimeException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Creates an instance. + */ + public StreamException() { + super(); + } + + /** + * Creates an instance with a given cause. + * @param cause the cause + */ + public StreamException(Throwable cause) { + super(cause); + } + + /** + * Creates an instance with a given message and a given cause. 
+ * @param msg the message + * @param cause the cause + */ + public StreamException(String msg, Throwable cause) { + super(msg,cause); + } + + +} diff --git a/src/main/java/org/gcube/data/streams/exceptions/StreamOpenException.java b/src/main/java/org/gcube/data/streams/exceptions/StreamOpenException.java new file mode 100644 index 0000000..512ee9e --- /dev/null +++ b/src/main/java/org/gcube/data/streams/exceptions/StreamOpenException.java @@ -0,0 +1,36 @@ +package org.gcube.data.streams.exceptions; + +import org.gcube.data.streams.Stream; + +/** + * A failure that occurs when a {@link Stream} cannot be opened. + * + * @author Fabio Simeoni + * + */ +public class StreamOpenException extends StreamException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Creates an instance with a given cause. + * @param cause the cause + */ + public StreamOpenException(Throwable cause) { + super(cause); + } + + /** + * Creates an instance with a given message and a given cause. + * @param msg the message + * @param cause the cause + */ + public StreamOpenException(String msg, Throwable cause) { + super(msg,cause); + } + + +} diff --git a/src/main/java/org/gcube/data/streams/exceptions/StreamPublishException.java b/src/main/java/org/gcube/data/streams/exceptions/StreamPublishException.java new file mode 100644 index 0000000..d8bdf0a --- /dev/null +++ b/src/main/java/org/gcube/data/streams/exceptions/StreamPublishException.java @@ -0,0 +1,36 @@ +package org.gcube.data.streams.exceptions; + +import org.gcube.data.streams.Stream; + +/** + * A failure that occurs when a {@link Stream} cannot be published. + * + * @author Fabio Simeoni + * + */ +public class StreamPublishException extends StreamException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Creates an instance with a given cause. 
+ * @param cause the cause + */ + public StreamPublishException(Throwable cause) { + super(cause); + } + + /** + * Creates an instance with a given message and a given cause. + * @param msg the message + * @param cause the cause + */ + public StreamPublishException(String msg, Throwable cause) { + super(msg,cause); + } + + +} diff --git a/src/main/java/org/gcube/data/streams/exceptions/StreamSkipSignal.java b/src/main/java/org/gcube/data/streams/exceptions/StreamSkipSignal.java new file mode 100644 index 0000000..7c79677 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/exceptions/StreamSkipSignal.java @@ -0,0 +1,28 @@ +package org.gcube.data.streams.exceptions; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.generators.Generator; +import org.gcube.data.streams.handlers.FaultHandler; + +/** + * Used in {@link Generator}s or {@link FaultHandler}s to signals that the current element of a {@link Stream} should be skipped. + * + * @author Fabio Simeoni + * + */ +public class StreamSkipSignal extends StreamException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Creates an instance. 
+ */ + public StreamSkipSignal() { + super(); + } + + +} diff --git a/src/main/java/org/gcube/data/streams/exceptions/StreamStopSignal.java b/src/main/java/org/gcube/data/streams/exceptions/StreamStopSignal.java new file mode 100644 index 0000000..7b77702 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/exceptions/StreamStopSignal.java @@ -0,0 +1,18 @@ +package org.gcube.data.streams.exceptions; + +import org.gcube.data.streams.Callback; +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.handlers.FaultHandler; + +/** + * Used internally by {@link FaultHandler}s and {@link Callback}s to require the premature end of an iteration over a + * {@link Stream} + * + * @author Fabio Simeoni + * + */ +public class StreamStopSignal extends RuntimeException { + + private static final long serialVersionUID = 1L; + +} diff --git a/src/main/java/org/gcube/data/streams/generators/Filter.java b/src/main/java/org/gcube/data/streams/generators/Filter.java new file mode 100644 index 0000000..9774a28 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/generators/Filter.java @@ -0,0 +1,22 @@ +package org.gcube.data.streams.generators; + +import org.gcube.data.streams.exceptions.StreamSkipSignal; + +/** + * A partial implementation of {@link Filter} that provides support for skipping elements + * @author Fabio Simeoni + * + * @param the type of input elements + * @param the type of yielded elements + */ +public abstract class Filter implements Generator { + + private final StreamSkipSignal skip = new StreamSkipSignal(); + + /** + * Invoked to skip the current element. 
+ */ + protected void skip() { + throw skip; + } +} diff --git a/src/main/java/org/gcube/data/streams/generators/Generator.java b/src/main/java/org/gcube/data/streams/generators/Generator.java new file mode 100644 index 0000000..f534a0a --- /dev/null +++ b/src/main/java/org/gcube/data/streams/generators/Generator.java @@ -0,0 +1,35 @@ +package org.gcube.data.streams.generators; + +import org.gcube.data.streams.Iteration; +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.exceptions.StreamSkipSignal; +import org.gcube.data.streams.exceptions.StreamStopSignal; + +/** + * Yields elements of a {@link Stream} from elements of another {@link Stream}. + * + * @author Fabio Simeoni + * + * @param the type of elements in the input stream + * @param the type of elements in the output stream + * + * @see Stream + */ +public interface Generator { + + /** The ongoing iteration. */ + static final Iteration iteration = new Iteration(); + + + /** + * Yields an element of a {@link Stream} from an element of another {@link Stream}. + * + * @param element the input element + * @return the output element + * @throws StreamSkipSignal if no element should be yielded from the input element (i.e. 
the element should + * not contribute to the output stream) + * @throws StreamStopSignal if no further element should be yielded + * @throws RuntimeException if no element can be yielded from the input element + */ + E2 yield(E1 element) throws StreamSkipSignal,StreamStopSignal; +} diff --git a/src/main/java/org/gcube/data/streams/generators/LoggingListener.java b/src/main/java/org/gcube/data/streams/generators/LoggingListener.java new file mode 100644 index 0000000..69ac544 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/generators/LoggingListener.java @@ -0,0 +1,59 @@ +package org.gcube.data.streams.generators; + +import static java.lang.Math.*; +import static java.lang.System.*; + +import org.gcube.data.streams.delegates.StreamListener; +import org.gcube.data.streams.delegates.StreamListenerAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A pass-through {@link Generator} that acts as a {@link StreamListener} for throughput logging purposes. + * + * @author Fabio Simeoni + * + * @param the type of stream elements + */ +public class LoggingListener extends StreamListenerAdapter implements Generator { + + static Logger log = LoggerFactory.getLogger(LoggingListener.class); + + static long count=0; + static long starttime=0; + + static private String rformat = "streamed %1d elements in %2d ms (%3d/sec)"; + + + /**{@inheritDoc}*/ + @Override + public void onStart() { + log.info("started processing"); + starttime=currentTimeMillis(); + } + + + /**{@inheritDoc}*/ + @Override + public E yield(E element) { + + count++; + return element; + } + + /**{@inheritDoc}*/ + @Override + public void onClose() { + + String report; + if (starttime==0) //we may have iterated over no elements at all + report="processed 0 elements"; + else { + long time= currentTimeMillis()-starttime; + long ratio = round(((double)count/time)*1000); + report = String.format(rformat,count,time,ratio); + } + + log.info(report); + } +} diff --git 
a/src/main/java/org/gcube/data/streams/generators/NoOpGenerator.java b/src/main/java/org/gcube/data/streams/generators/NoOpGenerator.java new file mode 100644 index 0000000..e2faebd --- /dev/null +++ b/src/main/java/org/gcube/data/streams/generators/NoOpGenerator.java @@ -0,0 +1,17 @@ +package org.gcube.data.streams.generators; + + +/** + * A pass-through {@link Generator}. + * + * @author Fabio Simeoni + * + * @param the type of stream elements + */ +public class NoOpGenerator implements Generator { + + @Override + public E yield(E element) { + return element; + }; +} diff --git a/src/main/java/org/gcube/data/streams/generators/Processor.java b/src/main/java/org/gcube/data/streams/generators/Processor.java new file mode 100644 index 0000000..fff67fc --- /dev/null +++ b/src/main/java/org/gcube/data/streams/generators/Processor.java @@ -0,0 +1,31 @@ +package org.gcube.data.streams.generators; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.exceptions.StreamSkipSignal; + +/** + * A {@link Filter} that changes the elements of the input {@link Stream}. + * + * @author Fabio Simeoni + * + * @param the type of input elements + */ +public abstract class Processor extends Filter { + + @Override + public final E yield(E element) { + + process(element); + return element; + }; + + /** + * Processes an element of a {@link Stream}. + * + * @param element the input element + * @throws StreamSkipSignal if no element should be yielded from the input element (i.e. 
 the element + * should not contribute to the output stream) + * @throws RuntimeException if no element can be yielded from the input element + */ + protected abstract void process(E element); +} diff --git a/src/main/java/org/gcube/data/streams/handlers/CountingHandler.java b/src/main/java/org/gcube/data/streams/handlers/CountingHandler.java new file mode 100644 index 0000000..5620056 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/handlers/CountingHandler.java @@ -0,0 +1,32 @@ +package org.gcube.data.streams.handlers; + + + +/** + * A partial implementation for {@link FaultHandler} that counts failures and keeps track of the last observed failure. + * + * @author Fabio Simeoni + * + */ +public abstract class CountingHandler implements FaultHandler { + + private int count; + private Exception lastFailure; + + @Override + public final void handle(RuntimeException failure) { + handle(failure, lastFailure, count); + count++; + lastFailure = failure; + } + + /** + * Indicates whether iteration should continue or stop the iteration on the occurrence of an iteration failure. + * + * @param failure the failure + * @param lastFailure the failure observed previously, or null if this is the first observed failure + * @param failureCount the number of failures counted so far, 0 if this is the first observed failure + * @throws RuntimeException if the failure cannot be handled and should be rethrown to the iteration + */ + protected abstract void handle(Exception failure, Exception lastFailure, int failureCount); +} diff --git a/src/main/java/org/gcube/data/streams/handlers/FaultHandler.java b/src/main/java/org/gcube/data/streams/handlers/FaultHandler.java new file mode 100644 index 0000000..cb50c8d --- /dev/null +++ b/src/main/java/org/gcube/data/streams/handlers/FaultHandler.java @@ -0,0 +1,24 @@ +package org.gcube.data.streams.handlers; + +import org.gcube.data.streams.Iteration; +import org.gcube.data.streams.Stream; + +/** + * Handlers of {@link Stream} iteration failures.
+ * @author Fabio Simeoni + * + */ +public interface FaultHandler { + + /** The ongoing iteration. */ + static final Iteration iteration = new Iteration(); + + /** + * Indicates whether iteration should continue or stop the iteration on the occurrence of an iteration failure. + * @param failure the failure + * @throws RuntimeException if no element can be yielded from the input element + * + */ + void handle(RuntimeException failure); + +} diff --git a/src/main/java/org/gcube/data/streams/handlers/IgnoreHandler.java b/src/main/java/org/gcube/data/streams/handlers/IgnoreHandler.java new file mode 100644 index 0000000..f1a1f05 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/handlers/IgnoreHandler.java @@ -0,0 +1,14 @@ +package org.gcube.data.streams.handlers; + + +/** + * A {@link FaultHandler} that silently absorbs all failures. + * + * @author Fabio Simeoni + * + */ +public class IgnoreHandler implements FaultHandler { + + @Override + public void handle(RuntimeException failure) {} +} diff --git a/src/main/java/org/gcube/data/streams/handlers/RethrowHandler.java b/src/main/java/org/gcube/data/streams/handlers/RethrowHandler.java new file mode 100644 index 0000000..4d74186 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/handlers/RethrowHandler.java @@ -0,0 +1,17 @@ +package org.gcube.data.streams.handlers; + + +/** + * A {@link FaultHandler} that rethrows all failures (i.e. does not handle any). 
+ * + * @author Fabio Simeoni + * + */ +public class RethrowHandler implements FaultHandler { + + @Override + public void handle(RuntimeException failure) { + + throw failure; + } +} diff --git a/src/main/java/org/gcube/data/streams/handlers/RethrowUnrecoverableHandler.java b/src/main/java/org/gcube/data/streams/handlers/RethrowUnrecoverableHandler.java new file mode 100644 index 0000000..70d6a0b --- /dev/null +++ b/src/main/java/org/gcube/data/streams/handlers/RethrowUnrecoverableHandler.java @@ -0,0 +1,22 @@ +package org.gcube.data.streams.handlers; + +import static org.gcube.data.streams.Utils.*; + +import org.gcube.data.streams.exceptions.StreamContingency; + +/** + * A {@link FaultHandler} that silently absorbs {@link StreamContingency}s + * but re-throws all other failures. + * + * @author Fabio Simeoni + * + */ +public class RethrowUnrecoverableHandler implements FaultHandler { + + @Override + public void handle(RuntimeException failure) { + + if (!isContingency(failure)) + throw failure; + } +} diff --git a/src/main/java/org/gcube/data/streams/handlers/StopFastHandler.java b/src/main/java/org/gcube/data/streams/handlers/StopFastHandler.java new file mode 100644 index 0000000..bc7441c --- /dev/null +++ b/src/main/java/org/gcube/data/streams/handlers/StopFastHandler.java @@ -0,0 +1,17 @@ +package org.gcube.data.streams.handlers; + + +/** + * A {@link FaultHandler} that silently stops iteration at the first occurrence of any failure. 
+ * + * @author Fabio Simeoni + * + */ +public class StopFastHandler implements FaultHandler { + + @Override + public void handle(RuntimeException failure) { + + iteration.stop(); + } +} diff --git a/src/main/java/org/gcube/data/streams/handlers/StopUnrecoverableHandler.java b/src/main/java/org/gcube/data/streams/handlers/StopUnrecoverableHandler.java new file mode 100644 index 0000000..8dfa085 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/handlers/StopUnrecoverableHandler.java @@ -0,0 +1,22 @@ +package org.gcube.data.streams.handlers; + +import static org.gcube.data.streams.Utils.*; + +import org.gcube.data.streams.exceptions.StreamContingency; + +/** + * A {@link FaultHandler} that silently absorbs {@link StreamContingency}s + * and stops iteration at the first unrecoverable failure. + * + * @author Fabio Simeoni + * + */ +public class StopUnrecoverableHandler implements FaultHandler { + + @Override + public void handle(RuntimeException failure) { + + if (!isContingency(failure)) + iteration.stop(); + } +} diff --git a/src/main/java/org/gcube/data/streams/publishers/RecordFactory.java b/src/main/java/org/gcube/data/streams/publishers/RecordFactory.java new file mode 100644 index 0000000..bbc3fbf --- /dev/null +++ b/src/main/java/org/gcube/data/streams/publishers/RecordFactory.java @@ -0,0 +1,34 @@ +package org.gcube.data.streams.publishers; + +import gr.uoa.di.madgik.grs.record.Record; +import gr.uoa.di.madgik.grs.record.RecordDefinition; + +import org.gcube.data.streams.Iteration; +import org.gcube.data.streams.Stream; + +/** + * Generates {@link Record}s from the elements of a {@link Stream}. + * + * @author Fabio Simeoni + * + * @param the type of the elements + */ +public interface RecordFactory { + + /** The ongoing iteration. */ + static final Iteration iteration = new Iteration(); + + /** + * Returns the definitions of the records. 
+ * @return the definitions + */ + RecordDefinition[] definitions(); + + /** + * Generates a {@link Record} from a {@link Stream} element. + * @param element the element + * @return the record + * @throws RuntimeException if no record can be generated from the input element + */ + Record newRecord(E element); +} diff --git a/src/main/java/org/gcube/data/streams/publishers/RsPublisher.java b/src/main/java/org/gcube/data/streams/publishers/RsPublisher.java new file mode 100644 index 0000000..36e7260 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/publishers/RsPublisher.java @@ -0,0 +1,312 @@ +package org.gcube.data.streams.publishers; + +import static gr.uoa.di.madgik.grs.writer.RecordWriter.*; +import static org.gcube.data.streams.Utils.*; +import gr.uoa.di.madgik.grs.buffer.IBuffer.Status; +import gr.uoa.di.madgik.grs.record.Record; +import gr.uoa.di.madgik.grs.writer.GRS2WriterException; +import gr.uoa.di.madgik.grs.writer.RecordWriter; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.Utils; +import org.gcube.data.streams.exceptions.StreamPublishException; +import org.gcube.data.streams.exceptions.StreamSkipSignal; +import org.gcube.data.streams.exceptions.StreamStopSignal; +import org.gcube.data.streams.generators.Generator; +import org.gcube.data.streams.handlers.FaultHandler; +import org.gcube.data.streams.handlers.RethrowHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Publishes {@link Stream}s as resultsets. 
+ * + * @author Fabio Simeoni + * + * @param the type of stream element + * + */ +public class RsPublisher implements StreamPublisher { + + private static Logger log = LoggerFactory.getLogger(RsPublisher.class); + + private final Stream stream; + private final RecordFactory factory; + + private RsTransport transport; + + // resultset writer parameters + private int bufferSize = DefaultBufferCapacity; + private long timeout = DefaultInactivityTimeout; + private TimeUnit timeoutUnit = DefaultInactivityTimeUnit; + private boolean onDemand = true; + + // default thread provider + private ThreadProvider provider = new ThreadProvider() { + @Override + public Thread newThread(Runnable task) { + return new Thread(task); + } + }; + + // by default, we re-throw failures and handle them in publishing loop + private FaultHandler handler = new RethrowHandler(); + + /** + * Creates an instance for a given {@link Stream} and with a given element serialiser. + * + * @param stream the stream + * @param serialiser the serialiser + */ + public RsPublisher(Stream stream, Generator serialiser) { + this(stream, new RsStringRecordFactory(serialiser)); + } + + /** + * Creates an instance for a given {@link Stream} and with a given {@link RecordFactory}. + * + * @param stream the stream + * @param factory the factory + */ + public RsPublisher(Stream stream, RecordFactory factory) { + + if (stream == null || factory == null || factory.definitions() == null) + throw new IllegalArgumentException("invalid or null inputs"); + + this.stream = stream; + this.factory = factory; + } + + /** + * Sets the size of the write buffer. + * + * @param size the size in bytes. + * @throws IllegalArgumentException if the size is not a positive number + */ + public void setBufferSize(int size) throws IllegalArgumentException { + + if (size <= 0) + throw new IllegalArgumentException("invalid empty buffer"); + + this.bufferSize = size; + } + + /** + * Sets the time to wait on a full write buffer. 
After the time has elapsed publication stops. + * + * @param timeout the timeout + * @param timeoutUnit the timeout unit + * @throws IllegalArgumentException if the timeout is not a positive number or the timeout unit is null + */ + public void setTimeout(long timeout, TimeUnit timeoutUnit) throws IllegalArgumentException { + + if (timeout <= 0 || timeoutUnit == null) + throw new IllegalArgumentException("invalid timeout or time unit"); + + this.timeout = timeout; + this.timeoutUnit = timeoutUnit; + } + + /** + * Sets the resultset transport. + * + * @param transport the transport + * @throws IllegalArgumentException if the transport is null + */ + public void setTransport(RsTransport transport) { + + if (transport == null) + throw new IllegalArgumentException("invalid null transport"); + + this.transport = transport; + } + + /** + * Sets the production mode of the publiher + * + * @param onDemand true if the stream ought to be consumed only when clients require it, + * false if it should be consumed continuously. + */ + public void setOnDemand(boolean onDemand) { + this.onDemand = onDemand; + } + + /** + * Sets the {@link ThreadProvider} used to populate the resultset. + * + * @param provider the provider + * @throws IllegalArgumentException if the provider is null. + */ + public void setThreadProvider(ThreadProvider provider) { + + if (provider == null) + throw new IllegalArgumentException("invalid null provider"); + + this.provider = provider; + } + + /** + * Sets the {@link FaultHandler} for reading and writing failures. 
+ * + * @param handler the handler + * @throws IllegalArgumentException if the handler is null + */ + public void setFaultHandler(FaultHandler handler) { + + if (handler == null) + throw new IllegalArgumentException("invalid null handler"); + + this.handler = handler; + } + + @Override + public URI publish() throws StreamPublishException { + + Utils.initialiseRS(); + + if (transport == null) + transport = RsTransport.TCP; + + URI locator; + RecordWriter writer; + + // publish + try { + + writer = new RecordWriter(transport.proxy(), // The proxy that defines the way the writer can be + // accessed + factory.definitions(), // The definitions of the records the gRS handles + bufferSize, // The capacity of the underlying synchronization buffer + DefaultConcurrentPartialCapacity, // The maximum number of records that can be concurrently accessed + // on partial transfer + DefaultMirrorBufferFactor, // The maximum fraction of the buffer that should be transfered during + // mirroring + timeout, // The timeout in time units after which an inactive gRS can be disposed + timeoutUnit // The time unit in timeout after which an inactive gRS can be disposed + ); + + locator = writer.getLocator(); + + } catch (GRS2WriterException e) { + throw new StreamPublishException("cannot publish stream as resultset", e); + } + + Runnable feeder = newFeeder(writer, locator); + + provider.newThread((feeder)).start(); + + return locator; + } + + // used internally: the task that consumes the stream to publish it in the resultset + private Runnable newFeeder(final RecordWriter writer, final URI locator) { + + return new Runnable() { + + @Override + public void run() { + + while (stream.hasNext()) { + + try { + publishNextElementOrFailure(writer); + } + //stop publishing + catch (RuntimeException e) { + + //also stop consuming if publication was on demand + if (onDemand) + break; + else + close(writer,locator); //close as soon as we can + } + } + + close(writer,locator); + stream.close(); + } + 
}; + + } + + private void publishNextElementOrFailure(RecordWriter writer) { + + try { + + try { + publish(writer,nextRecord()); + } + catch(StreamSkipSignal skip) {//skip this element and continue + return; + } + catch(StreamStopSignal stop) {//rethrow stop + throw stop; + } + catch(RuntimeException failure) { + + //publish failure + publish(writer, failure); + + //stop publishing if cannot be recognised as contingency + if (!isContingency(failure)) + throw failure; + } + + } + catch(GRS2WriterException failure) { + throw new RuntimeException(failure);//stop publishing + } + } + + private Record nextRecord() { + + try { + E element = stream.next(); + return factory.newRecord(element); + } + catch (RuntimeException e) { + try { + handler.handle(e); + } + catch(StreamStopSignal stop) { + throw e; + } + throw new StreamSkipSignal(); + } + } + + //private helper: publish a record + private void publish(RecordWriter writer, Record record) throws GRS2WriterException { + + if (writer.getStatus() == Status.Open) + if (!writer.put(record, timeout, timeoutUnit)) { + log.trace("client is not consuming resulset, stop publishing"); + throw new GRS2WriterException(); + } + + } + + //private helper: publish a failure + private void publish(RecordWriter writer, Throwable failure) throws GRS2WriterException { + if (writer.getStatus() == Status.Open) + if (!writer.put(failure, timeout, timeoutUnit)) { + log.trace("client is not consuming resulset, stop publishing"); + throw new GRS2WriterException(); + } + } + + + private void close(RecordWriter writer, final URI locator) { + + if (writer.getStatus() == Status.Open) { + try { + writer.close(); + } catch (GRS2WriterException e) {//log anomaly + log.error("error closing resultset at " + locator, e); + } + } + } +} diff --git a/src/main/java/org/gcube/data/streams/publishers/RsStringRecordFactory.java b/src/main/java/org/gcube/data/streams/publishers/RsStringRecordFactory.java new file mode 100644 index 0000000..714c3b8 --- /dev/null 
+++ b/src/main/java/org/gcube/data/streams/publishers/RsStringRecordFactory.java @@ -0,0 +1,51 @@ +package org.gcube.data.streams.publishers; + +import gr.uoa.di.madgik.grs.record.GenericRecord; +import gr.uoa.di.madgik.grs.record.GenericRecordDefinition; +import gr.uoa.di.madgik.grs.record.RecordDefinition; +import gr.uoa.di.madgik.grs.record.field.Field; +import gr.uoa.di.madgik.grs.record.field.FieldDefinition; +import gr.uoa.di.madgik.grs.record.field.StringField; +import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.generators.Generator; + +/** + * A {@link RecordFactory} for {@link #STRING_RECORD}s with serialisations of {@link Stream} elements. + *

    + * An untyped record is a record with a string-valued payload field. + * + * @author Fabio Simeoni + * + * @param the type of the serialised values + */ +public class RsStringRecordFactory implements RecordFactory { + + /** The type definition of a record with a string-valued payload field. */ + public static final RecordDefinition STRING_RECORD = + new GenericRecordDefinition(new FieldDefinition[]{new StringFieldDefinition("value")}); + + private final Generator serialiser; + + /** + * Creates an instance with a {@link Generator} that returns serialisations of {@link Stream} elements. + * @param serialiser the serialiser + */ + public RsStringRecordFactory(Generator serialiser) { + this.serialiser=serialiser; + } + + @Override + public GenericRecord newRecord(E element) { + String serialisation = serialiser.yield(element); + GenericRecord record = new GenericRecord(); + record.setFields(new Field[]{new StringField(serialisation)}); + return record; + }; + + @Override + public RecordDefinition[] definitions() { + return new RecordDefinition[]{STRING_RECORD}; + } +} diff --git a/src/main/java/org/gcube/data/streams/publishers/RsTransport.java b/src/main/java/org/gcube/data/streams/publishers/RsTransport.java new file mode 100644 index 0000000..71e6b69 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/publishers/RsTransport.java @@ -0,0 +1,29 @@ +package org.gcube.data.streams.publishers; + +import gr.uoa.di.madgik.grs.proxy.IWriterProxy; +import gr.uoa.di.madgik.grs.proxy.local.LocalWriterProxy; +import gr.uoa.di.madgik.grs.proxy.tcp.TCPWriterProxy; + +/** + * The transport used by a {@link RsPublisher}. 
+ * + * @author Fabio Simeoni + * + */ +public enum RsTransport { + + TCP() { + IWriterProxy proxy() { + return new TCPWriterProxy(); + } + }, + LOCAL { + @Override + IWriterProxy proxy() { + return new LocalWriterProxy(); + } + }; + + abstract IWriterProxy proxy(); + +} diff --git a/src/main/java/org/gcube/data/streams/publishers/StreamPublisher.java b/src/main/java/org/gcube/data/streams/publishers/StreamPublisher.java new file mode 100644 index 0000000..81c94e2 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/publishers/StreamPublisher.java @@ -0,0 +1,21 @@ +package org.gcube.data.streams.publishers; + +import java.net.URI; + +import org.gcube.data.streams.Stream; + +/** + * Publishes a {@link Stream} at a given address. + * + * @author Fabio Simeoni + * + */ +public interface StreamPublisher { + + /** + * Publishes the stream and returns its address. + * @return the address + * @throws StreamPublishException if the stream cannot be published + */ + URI publish(); +} diff --git a/src/main/java/org/gcube/data/streams/publishers/ThreadProvider.java b/src/main/java/org/gcube/data/streams/publishers/ThreadProvider.java new file mode 100644 index 0000000..9f34b81 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/publishers/ThreadProvider.java @@ -0,0 +1,18 @@ +package org.gcube.data.streams.publishers; + +import org.gcube.data.streams.Stream; + +/** + * Provides {@link Thread}s for the asynchronous publicaton of {@link Stream}. + * @author Fabio Simeoni + * + */ +public interface ThreadProvider { + + /** + * Provides a new {@link Thread} in which to execute the publication task. 
+ * @param task the task + * @return the {@link Thread} + */ + Thread newThread(Runnable task); +} diff --git a/src/main/java/org/gcube/data/streams/test/FallibleIterator.java b/src/main/java/org/gcube/data/streams/test/FallibleIterator.java new file mode 100644 index 0000000..628125f --- /dev/null +++ b/src/main/java/org/gcube/data/streams/test/FallibleIterator.java @@ -0,0 +1,76 @@ +package org.gcube.data.streams.test; + +import java.util.Iterator; +import java.util.List; + +import org.gcube.data.streams.exceptions.StreamSkipSignal; + +/** + * An {@link Iterator} that can be staged to throw faults as well as elements. Used for testing purposes. + * + * @author Fabio Simeoni + * + * @param the type of stream elements + */ +public class FallibleIterator implements Iterator { + + final List elements; + final Class clazz; + int index; + + /** + * Creates an instance with a given type of the elements and given actual elements. + * + * @param clazz the type of elements + * @param elements the actual elements, including {@link RuntimeException}s + * @throws IllegalArgumentException if the elements are neither {@link RuntimeException}s nor have the declared type. 
+ */ + public FallibleIterator(Class clazz, List elements) throws IllegalArgumentException { + this.clazz = clazz; + // upfront type checks + for (Object e : elements) { + try { + if (!(e instanceof RuntimeException)) + clazz.cast(e); + } catch (ClassCastException ex) { + throw new IllegalArgumentException("invalid stream element: " + e + " is neither a " + clazz.getSimpleName() + + " nor a RuntimeException"); + } + } + + this.elements = elements; + } + + @Override + public boolean hasNext() { + if (index < elements.size()) { + if (elements.get(index) instanceof StreamSkipSignal) { + index++; + return hasNext(); + } else + return true; + } else + return false; + + } + + @Override + public E next() { + + Object o = elements.get(index); + + index++; + + // throw unchecked as they are + if (o instanceof RuntimeException) + throw (RuntimeException) o; + + return clazz.cast(o); + } + + @Override + public void remove() { + + } + +} diff --git a/src/main/java/org/gcube/data/streams/test/StreamProvider.java b/src/main/java/org/gcube/data/streams/test/StreamProvider.java new file mode 100644 index 0000000..5d77dcf --- /dev/null +++ b/src/main/java/org/gcube/data/streams/test/StreamProvider.java @@ -0,0 +1,18 @@ +package org.gcube.data.streams.test; + +import org.gcube.data.streams.Stream; + +/** + * Generates a {@link Stream} for testing purposes. + * + * @author Fabio Simeoni + * + */ +public interface StreamProvider { + + /** + * Generates a {@link Stream} + * @return the stream. 
+ */ + Stream get(); +} diff --git a/src/main/java/org/gcube/data/streams/test/Utils.java b/src/main/java/org/gcube/data/streams/test/Utils.java new file mode 100644 index 0000000..2cf3e66 --- /dev/null +++ b/src/main/java/org/gcube/data/streams/test/Utils.java @@ -0,0 +1,141 @@ +package org.gcube.data.streams.test; + + +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; + +import org.gcube.data.streams.Stream; +import org.gcube.data.streams.exceptions.StreamSkipSignal; + +/** + * Collection of test facilities to validate {@link Stream} implementation and their usage. + * + * @author Fabio Simeoni + * + */ +public class Utils { + + + /** + * Returns the elements of a {@link Stream} as a {@link List}, including failures. + * @param stream the stream + * @return the elements + */ + public static List elementsOf(Stream stream) { + + List outcomes = new ArrayList(); + + //consume + while (stream.hasNext()) + try { + Object e = stream.next(); + outcomes.add(e); + } + catch(StreamSkipSignal skip) { + continue; + } + catch(RuntimeException ex) { + outcomes.add(ex); + } + + stream.close(); + + return outcomes; + } + + /** + * Applies a set of sets to test a stream respects the constraints defined by the interface. 
+ * @param provider a {@link StreamProvider} that repeatedly provides the stream to test + */ + public static void validateWith(StreamProvider provider) { + + isAddressableAndClosable(provider.get()); + canBeIteratedOver(provider.get()); + respectsCloseSemantics(provider.get()); + + } + + //a validation test + private static void isAddressableAndClosable(Stream stream) { + + if (stream.locator()==null) + throw new AssertionError("locator is null"); + + if (stream.isClosed()) + throw new AssertionError("stream is already closed"); + + + stream.close(); + + if (!stream.isClosed()) + throw new AssertionError("stream has not been closed"); + } + + //a validation test + private static void respectsCloseSemantics(Stream stream) { + + if (stream.isClosed()) + throw new AssertionError("stream is already closed"); + + stream.close(); + + if (!stream.isClosed()) + throw new AssertionError("stream has been closed but does not reveal it"); + + if (stream.hasNext()) + throw new AssertionError("stream indicates that it has elements after being closed"); + + + try { + stream.next(); + throw new AssertionError("stream returns elements after being closed"); + } + catch(NoSuchElementException ex) { + //expected + } + + } + + //a validation test + private static void canBeIteratedOver(Stream stream) { + + //can be iterated without hasNext(); + try { + if (stream.next()==null) + throw new AssertionError("next() returns null"); + } + catch(NoSuchElementException e) { + if (stream.hasNext()) + throw new AssertionError("stream has no elements but hasNext() returns true"); + } + catch (RuntimeException e) { + //ignore exception for this test + } + + //consume + while (stream.hasNext()) + try { + if (stream.next()==null) + throw new AssertionError("hasNext() returns true but next() returns null"); + } + catch(RuntimeException e) { + //ignore exceptions for this test + } + + //hasNext() is idempotent + if (stream.hasNext()) + throw new AssertionError("hasNext() is not idempotent"); + + 
//verify reading past end + try { + stream.next(); + throw new AssertionError("stream can be read past its end"); + } + catch(NoSuchElementException e) { + //expected + } + + stream.close(); + } +} diff --git a/src/test/java/org/gcube/data/streams/CallbackTest.java b/src/test/java/org/gcube/data/streams/CallbackTest.java new file mode 100644 index 0000000..9e40fab --- /dev/null +++ b/src/test/java/org/gcube/data/streams/CallbackTest.java @@ -0,0 +1,112 @@ +package org.gcube.data.streams; + +import static java.util.Arrays.*; +import static junit.framework.Assert.*; +import static org.gcube.data.streams.TestUtils.*; +import static org.gcube.data.streams.dsl.Streams.*; +import static org.mockito.Mockito.*; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; + +public class CallbackTest { + + @Test + public void callbackConsumeStreams() { + + final List data = asList("1","2","3"); + + Stream stream = convert(data); + + final List consumed = new ArrayList(); + + Callback callback = new Callback() { + + @Override + public void consume(String element) { + consumed.add(element); + } + }; + + consume(stream).with(callback); + + assertTrue(stream.isClosed()); + + assertEquals(data,consumed); + } + + + @Test + public void callbackStopIteration() { + + final List data = asList("1","2","3"); + + Stream stream = convert(data); + + final List consumed = new ArrayList(); + + Callback callback = new Callback() { + + @Override + public void consume(String element) { + if (element.equals("2")) + iteration.stop(); + else + consumed.add(element); + } + }; + + consume(stream).with(callback); + + assertTrue(stream.isClosed()); + + assertEquals(asList("1"),consumed); + } + + @Test + public void callbackSkipsElement() { + + final List data = asList("1","2","3"); + + Stream stream = convert(data); + + final List consumed = new ArrayList(); + + Callback callback = new Callback() { + + @Override + public void consume(String element) { + if (element.equals("2")) + 
iteration.skip(); + else + consumed.add(element); + } + }; + + consume(stream).with(callback); + + assertTrue(stream.isClosed()); + + assertEquals(asList("1","3"),consumed); + } + + @Test + public void consumerPropagatesFailures() { + + + Stream stream = stringsAndFaults("1",fault1,"3"); + + @SuppressWarnings("unchecked") + Callback callback = mock(Callback.class); + + try { + consume(stream).with(callback); + fail(); + } + catch(Exception e) { + assertEquals(fault1,e); + } + } +} diff --git a/src/test/java/org/gcube/data/streams/FoldedStreamTest.java b/src/test/java/org/gcube/data/streams/FoldedStreamTest.java new file mode 100644 index 0000000..02f9162 --- /dev/null +++ b/src/test/java/org/gcube/data/streams/FoldedStreamTest.java @@ -0,0 +1,54 @@ +package org.gcube.data.streams; + +import static java.util.Arrays.*; +import static junit.framework.Assert.*; +import static org.gcube.data.streams.TestUtils.*; +import static org.gcube.data.streams.dsl.Streams.*; +import static org.gcube.data.streams.test.Utils.*; + +import java.util.List; + +import org.gcube.data.streams.test.StreamProvider; +import org.junit.Test; + +public class FoldedStreamTest { + + static int foldStep = 2; + + @Test + public void foldedStreamsAreValidStreams() throws Exception { + + StreamProvider provider = new StreamProvider() { + public Stream get() { + return fold(convert("1", "2", "3")).in(foldStep); + } + }; + + validateWith(provider); + + @SuppressWarnings("unchecked") + List folded = asList(asList("1","2"),asList("3")); + + assertEquals(folded, elementsOf(provider.get())); + + } + + @Test + public void foldingHandlesFailures() throws Exception { + + StreamProvider provider = new StreamProvider() { + public Stream get() { + return fold(stringsAndFaults(fault1,"1",fault2,"2","3",fault3)).in(foldStep); + } + }; + + validateWith(provider); + + List folded = asList(fault1,fault2,asList("2", "3"),fault3); + + assertEquals(folded, elementsOf(provider.get())); + + } + + +} diff --git 
a/src/test/java/org/gcube/data/streams/GuardedStreamTest.java b/src/test/java/org/gcube/data/streams/GuardedStreamTest.java new file mode 100644 index 0000000..2a35caf --- /dev/null +++ b/src/test/java/org/gcube/data/streams/GuardedStreamTest.java @@ -0,0 +1,149 @@ +package org.gcube.data.streams; + +import static java.util.Arrays.*; +import static java.util.Collections.*; +import static junit.framework.Assert.*; +import static org.gcube.data.streams.TestUtils.*; +import static org.gcube.data.streams.dsl.Streams.*; +import static org.gcube.data.streams.test.Utils.*; + +import java.util.Arrays; +import java.util.List; + +import org.gcube.data.streams.generators.Generator; +import org.gcube.data.streams.handlers.CountingHandler; +import org.gcube.data.streams.handlers.FaultHandler; +import org.gcube.data.streams.test.StreamProvider; +import org.junit.Test; + +public class GuardedStreamTest { + + static List testData = Arrays.asList("1","2","3"); + + static List testFailingData1 = Arrays.asList(fault1,"1",fault2,"2",fault3,"3"); + static List testFailingData2 = Arrays.asList(skip,"1",fault2,"2","3",fault3); + + static Generator doubler = new Generator() { + @Override + public String yield(String element) { + return element+element; + } + }; + + @Test + public void consumeNoFailures() throws Exception { + + final List data = asList("1","2","3"); + + StreamProvider provider = new StreamProvider() { + public Stream get() { + return guard(convert(data)).with(STOPFAST_POLICY); + } + }; + + validateWith(provider); + + assertEquals(data,elementsOf(provider.get())); + + } + + @Test + public void consumeIgnoreFailures() { + + final List data = asList(fault1,"1",fault2,"2",fault3,"3"); + + StreamProvider provider = new StreamProvider() { + public Stream get() { + return guard(stringsAndFaults(data)).with(IGNORE_POLICY); + } + }; + + validateWith(provider); + + List preserved = asList("1","2","3"); + + assertEquals(preserved,elementsOf(provider.get())); + + } + + @Test + public 
void consumeStopFast() { + + final List data = asList(fault1,"1",fault2,"2",fault3,"3"); + + StreamProvider provider = new StreamProvider() { + public Stream get() { + return guard(stringsAndFaults(data)).with(STOPFAST_POLICY); + } + }; + + validateWith(provider); + + assertEquals(emptyList(),elementsOf(provider.get())); + + } + + @Test + public void consumeStopFastWithSkips() { + + final List data = asList(skip,"1",fault2,"2","3",fault3); + + StreamProvider provider = new StreamProvider() { + public Stream get() { + return guard(stringsAndFaults(data)).with(STOPFAST_POLICY); + } + }; + + assertEquals(asList("1"),elementsOf(provider.get())); + + } + + @Test + public void consumeStopFastCustomPolicy() { + + final FaultHandler customPolicy = new FaultHandler() { + + @Override + public void handle(RuntimeException failure) { + if (failure==fault2) + iteration.stop(); + } + }; + + final List data = asList(fault1,"1",fault2,"2",fault3,"3"); + + StreamProvider provider = new StreamProvider() { + public Stream get() { + return guard(stringsAndFaults(data)).with(customPolicy); + } + }; + + assertEquals(asList("1"),elementsOf(provider.get())); + + } + + @Test + public void consumeStopFastcountingPolicy() { + + final FaultHandler customPolicy = new CountingHandler() { + + @Override + protected void handle(Exception failure, Exception lastFailure, int failureCount) { + if (failureCount>=2) + iteration.stop(); + } + + }; + + final List data = asList(fault1,"1",fault2,"2",fault3,"3"); + + StreamProvider provider = new StreamProvider() { + public Stream get() { + return guard(stringsAndFaults(data)).with(customPolicy); + } + }; + + assertEquals(asList("1","2"),elementsOf(provider.get())); + + } +} diff --git a/src/test/java/org/gcube/data/streams/IteratorStreamTest.java b/src/test/java/org/gcube/data/streams/IteratorStreamTest.java new file mode 100644 index 0000000..efd79eb --- /dev/null +++ b/src/test/java/org/gcube/data/streams/IteratorStreamTest.java @@ -0,0 +1,50 @@ 
+package org.gcube.data.streams; + +import static java.util.Arrays.*; +import static junit.framework.Assert.*; +import static org.gcube.data.streams.TestUtils.*; +import static org.gcube.data.streams.dsl.Streams.*; +import static org.gcube.data.streams.test.Utils.*; + +import java.util.Arrays; +import java.util.List; + +import org.gcube.data.streams.test.StreamProvider; +import org.junit.Test; + +public class IteratorStreamTest { + + @Test + public void iteratorsMakeValidStreams() { + + final List data = Arrays.asList("1","2","3"); + + StreamProvider provider = new StreamProvider() { + public Stream get() { + return convert(data); + } + }; + + validateWith(provider); + + assertEquals(data,elementsOf(provider.get())); + } + + + @Test + public void iteratorsWithFailuresMakeValidStreams() { + + final List data = asList(fault1,"1",fault2,"2","3",fault3); + + StreamProvider provider = new StreamProvider() { + public Stream get() { + return convert(data); + } + }; + + validateWith(provider); + + assertEquals(data,elementsOf(provider.get())); + + } +} diff --git a/src/test/java/org/gcube/data/streams/MonitoredStreamTest.java b/src/test/java/org/gcube/data/streams/MonitoredStreamTest.java new file mode 100644 index 0000000..323db4c --- /dev/null +++ b/src/test/java/org/gcube/data/streams/MonitoredStreamTest.java @@ -0,0 +1,66 @@ +package org.gcube.data.streams; + +import static junit.framework.Assert.*; +import static org.gcube.data.streams.TestUtils.*; +import static org.gcube.data.streams.dsl.Streams.*; +import static org.gcube.data.streams.test.Utils.*; +import static org.mockito.Mockito.*; + +import java.util.Arrays; +import java.util.List; + +import org.gcube.data.streams.delegates.StreamListener; +import org.gcube.data.streams.test.StreamProvider; +import org.junit.Test; + +public class MonitoredStreamTest { + + static List testData = Arrays.asList("1","2","3"); + static List testFailingData = Arrays.asList(fault1,"1",fault2,"2","3",fault3); + + @Test + public void 
consumeAndListens() throws Exception { + + StreamListener listener = mock(StreamListener.class); + + Stream stream = convert(testData); + stream = monitor(stream).with(listener); + + //just consume stream + elementsOf(stream); + + verify(listener).onStart(); + verify(listener).onEnd(); + verify(listener,times(2)).onClose(); + } + + @Test + public void consume() throws Exception { + + StreamProvider provider = new StreamProvider() { + public Stream get() { + Stream stream = convert(testData); + return monitor(stream).with(mock(StreamListener.class)); + } + }; + + validateWith(provider); + + assertEquals(testData, elementsOf(provider.get())); + } + + @Test + public void handleFailure() throws Exception { + + StreamProvider provider = new StreamProvider() { + public Stream get() { + Stream stream = convert(stringsAndFaults(testFailingData)); + return monitor(stream).with(mock(StreamListener.class)); + } + }; + + validateWith(provider); + + assertEquals(testFailingData, elementsOf(provider.get())); + } +} diff --git a/src/test/java/org/gcube/data/streams/PipedStreamTest.java b/src/test/java/org/gcube/data/streams/PipedStreamTest.java new file mode 100644 index 0000000..6569f92 --- /dev/null +++ b/src/test/java/org/gcube/data/streams/PipedStreamTest.java @@ -0,0 +1,142 @@ +package org.gcube.data.streams; + +import static java.util.Arrays.*; +import static junit.framework.Assert.*; +import static org.gcube.data.streams.TestUtils.*; +import static org.gcube.data.streams.dsl.Streams.*; +import static org.gcube.data.streams.test.Utils.*; + +import java.util.List; + +import org.gcube.data.streams.generators.Generator; +import org.gcube.data.streams.test.StreamProvider; +import org.junit.Test; + +public class PipedStreamTest { + + static Generator doubler = new Generator() { + @Override + public String yield(String element) { + return element+element; + } + }; + + @Test + public void pipesYieldValidStreams() throws Exception { + + StreamProvider provider = new 
StreamProvider() { + public Stream get() { + Stream stream = convert("1","2","3"); + return pipe(stream).through(doubler); + } + }; + + + validateWith(provider); + + List piped = asList("11","22","33"); + + assertEquals(piped,elementsOf(provider.get())); + + } + + @Test + public void pipesHandleFailures() { + + StreamProvider provider = new StreamProvider() { + public Stream get() { + Stream stream = stringsAndFaults(fault1,"1",fault2,"2","3",fault3); + return pipe(stream).through(doubler); + } + }; + + validateWith(provider); + + List piped = asList(fault1,"11",fault2,"22","33",fault3); + + assertEquals(piped,elementsOf(provider.get())); + + } + + @Test + public void pipesHandleSkipSignals() { + + StreamProvider provider = new StreamProvider() { + public Stream get() { + Stream stream = stringsAndFaults(skip,"1",skip,"2","3",skip); + return pipe(stream).through(doubler); + } + }; + + List piped = asList("11","22","33"); + + assertEquals(piped,elementsOf(provider.get())); + + } + + @Test + public void pipesHandleStopSignals() { + + StreamProvider provider = new StreamProvider() { + public Stream get() { + Stream stream = stringsAndFaults("1","2",stop,"3"); + return pipe(stream).through(doubler); + } + }; + + List piped = asList("11","22"); + + assertEquals(piped,elementsOf(provider.get())); + + } + + @Test + public void generatorsCanSkipElements() { + + final Generator generator = new Generator() { + @Override + public String yield(String element) { + if (element.equals("2")) + iteration.skip(); + return element+element; + } + }; + + StreamProvider provider = new StreamProvider() { + public Stream get() { + Stream stream = stringsAndFaults("1","2","3"); + return pipe(stream).through(generator); + } + }; + + List piped = asList("11","33"); + + assertEquals(piped,elementsOf(provider.get())); + + } + + @Test + public void generatorsCanStopIteration() { + + final Generator generator = new Generator() { + @Override + public String yield(String element) { + if 
(element.equals("2")) + iteration.stop(); + return element+element; + } + }; + + StreamProvider provider = new StreamProvider() { + public Stream get() { + Stream stream = stringsAndFaults("1","2","3"); + return pipe(stream).through(generator); + } + }; + + List piped = asList("11"); + + assertEquals(piped,elementsOf(provider.get())); + + } +} diff --git a/src/test/java/org/gcube/data/streams/PublishTest.java b/src/test/java/org/gcube/data/streams/PublishTest.java new file mode 100644 index 0000000..19e2a32 --- /dev/null +++ b/src/test/java/org/gcube/data/streams/PublishTest.java @@ -0,0 +1,181 @@ +package org.gcube.data.streams; + +import static java.util.Arrays.*; +import static junit.framework.Assert.*; +import static org.gcube.data.streams.TestUtils.*; +import static org.gcube.data.streams.dsl.Streams.*; +import static org.gcube.data.streams.test.Utils.*; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.gcube.data.streams.generators.Generator; +import org.gcube.data.streams.test.StreamProvider; +import org.junit.BeforeClass; +import org.junit.Test; + +public class PublishTest { + + static List testData = Arrays.asList("1","2","3","4","5"); + + + @BeforeClass + public static void setup() { + + System.setProperty("org.slf4j.simplelogger.defaultlog", "trace"); + } + + @Test + public void publishAndRead() { + + StreamProvider provider = new StreamProvider() { + public Stream get() { + Stream stream = convert(testData); + URI rs = publishStringsIn(stream).withDefaults(); + return convert(rs).ofStrings().withDefaults(); + } + }; + + validateWith(provider); + + assertEquals(testData,elementsOf(provider.get())); + } + + @Test + public void publishOnDemand() throws Exception { + + final List consumed = new ArrayList(); + + final Generator consumer = new Generator() { + @Override + public String yield(String element) { + consumed.add(true); + return element; + } + 
}; + + Stream stream = convert(testData); + stream = pipe(stream).through(consumer); + + URI rs = publishStringsIn(stream).withBufferOf(2).withDefaults(); + + Thread.sleep(100); + + int generated = consumed.size(); + assertTrue("only some elements have been moved",generated consumed = new ArrayList(); + + final Generator consumer = new Generator() { + @Override + public String yield(String element) { + consumed.add(true); + return element; + } + }; + + Stream stream = convert(testData); + stream = pipe(stream).through(consumer); + + publishStringsIn(stream).withBufferOf(2).withTimeoutOf(100,TimeUnit.MILLISECONDS).nonstop().withDefaults(); + + //wait for longer than timeout to trigger continuous publication + Thread.sleep(200); + + assertEquals("all elements have been consumed",testData.size(),consumed.size()); + + } + + @Test + public void publishWithConfiguration() throws Exception { + + List elements = asList("1","2"); + + Stream stream = convert(elements); + + URI resultset = publishStringsIn(stream).nonstop().withBufferOf(10).withTimeoutOf(1,TimeUnit.HOURS).withDefaults(); + + Stream published = convert(resultset).ofStrings().withDefaults(); + + int i=0; + while (published.hasNext()) { + assertEquals(elements.get(i),published.next()); + i++; + } + + assertEquals(2,i); + } + + @Test + public void contingenciesArePublished() { + + List data = asList(contingency1,"1",contingency2,"2","3",contingency3); + + Stream stream = stringsAndFaults(data); + + URI rs = publishStringsIn(stream).withDefaults(); + + Stream published = convert(rs).ofStrings().withDefaults(); + + List elements = elementsOf(published); + + System.out.println(elements); + + for (int i = 0; i segmentUntilOutage = asList("1",fault1); + List data = new ArrayList(segmentUntilOutage); + data.add("2"); + + Stream stream = stringsAndFaults(data); + + URI rs = publishStringsIn(stream).withDefaults(); + + Stream published = convert(rs).ofStrings().withDefaults(); + + List elements = elementsOf(published); + 
+ //resultset stops at first outage + assertEquals(elements.size(),segmentUntilOutage.size()); + + for (int i = 0; i elements = asList("1","2","3"); + + StreamProvider provider = new StreamProvider() { + + @Override + public Stream get() { + Stream stream = convert(elements); + URI resultset = publishStringsIn(stream).withDefaults(); + return convert(resultset).ofStrings().withDefaults(); + } + }; + + validateWith(provider); + + assertEquals(elements,elementsOf(provider.get())); + } + + + @Test + public void resultsetsWithFailuresMakeValidStreams() throws Exception { + + RuntimeException fault = new RuntimeException(new TestContingency()); + + final List elements = asList(fault,"1",fault,"2","3",fault); + + StreamProvider provider = new StreamProvider() { + + @Override + public Stream get() { + Stream stream = stringsAndFaults(elements); + URI resultset = publishStringsIn(stream).withDefaults(); + return convert(resultset).ofStrings().withDefaults(); + } + }; + + validateWith(provider); + + //exceptions instances will be different + assertEquals(elements.size(),elementsOf(provider.get()).size()); + } + + + + @Test + public void resultsetCannotBeOpened() throws Exception { + + //publish mock stream + URI resultset = URI.create("tcp://malformed"); + + //stream resultset + Stream stream =convert(resultset).ofStrings().withDefaults(); + + try { + stream.next(); + fail(); + } + catch(StreamOpenException e) {} + + } +} diff --git a/src/test/java/org/gcube/data/streams/TestContingency.java b/src/test/java/org/gcube/data/streams/TestContingency.java new file mode 100644 index 0000000..d2112d0 --- /dev/null +++ b/src/test/java/org/gcube/data/streams/TestContingency.java @@ -0,0 +1,8 @@ +package org.gcube.data.streams; + +import org.gcube.data.streams.exceptions.StreamContingency; + +@StreamContingency +public class TestContingency extends Exception { + private static final long serialVersionUID = 1L; +} diff --git a/src/test/java/org/gcube/data/streams/TestUtils.java 
b/src/test/java/org/gcube/data/streams/TestUtils.java new file mode 100644 index 0000000..d80cd8d --- /dev/null +++ b/src/test/java/org/gcube/data/streams/TestUtils.java @@ -0,0 +1,35 @@ +package org.gcube.data.streams; + +import java.util.List; + +import org.gcube.data.streams.dsl.Streams; +import org.gcube.data.streams.exceptions.StreamException; +import org.gcube.data.streams.exceptions.StreamSkipSignal; +import org.gcube.data.streams.exceptions.StreamStopSignal; + + +public class TestUtils { + + + static RuntimeException fault1 = new RuntimeException(); + static RuntimeException fault2 = new RuntimeException(); + static RuntimeException fault3 = new RuntimeException(); + static StreamSkipSignal skip = new StreamSkipSignal(); + static StreamStopSignal stop = new StreamStopSignal(); + static StreamException contingency1 = new StreamException(new TestContingency()); + static StreamException contingency2 = new StreamException(new TestContingency()); + static StreamException contingency3 = new StreamException(new TestContingency()); + + + static Stream stringsAndFaults(Object ... 
elements) { + return Streams.convertWithFaults(String.class, elements); + } + + static Stream stringsAndFaults(List elements) { + return Streams.convertWithFaults(String.class, elements); + } + + + + +} diff --git a/src/test/java/org/gcube/data/streams/UnfoldedStreamTest.java b/src/test/java/org/gcube/data/streams/UnfoldedStreamTest.java new file mode 100644 index 0000000..733e95d --- /dev/null +++ b/src/test/java/org/gcube/data/streams/UnfoldedStreamTest.java @@ -0,0 +1,84 @@ +package org.gcube.data.streams; + +import static java.util.Arrays.*; +import static junit.framework.Assert.*; +import static org.gcube.data.streams.TestUtils.*; +import static org.gcube.data.streams.dsl.Streams.*; +import static org.gcube.data.streams.test.Utils.*; + +import java.util.List; + +import org.gcube.data.streams.dsl.Streams; +import org.gcube.data.streams.generators.Generator; +import org.gcube.data.streams.test.StreamProvider; +import org.junit.Test; + +public class UnfoldedStreamTest { + + static Generator, Stream> streamer = new Generator, Stream>() { + @Override + public Stream yield(List element) { + return convert(element); + } + }; + + @Test + public void consume() throws Exception { + + @SuppressWarnings("unchecked") + final List> data = asList(asList("1","2","3"),asList("1","2","3")); + + StreamProvider provider = new StreamProvider() { + public Stream get() { + Stream> stream = convert(data); + return unfold(stream).through(streamer); + } + }; + + List unfoldedData = asList("1","2","3","1","2","3"); + + assertEquals(unfoldedData,elementsOf(provider.get())); + + } + + @Test + public void handleFailures() { + + @SuppressWarnings("unchecked") + final List data = asList(asList(fault1,"1",fault2,"2","3",fault3), + asList("1",fault1,"2",fault2,"3")); + + StreamProvider provider = new StreamProvider() { + public Stream get() { + + @SuppressWarnings("all") + Stream> stream = (Stream) Streams.convertWithFaults(List.class,data); + return unfold(stream).through(streamer); + } + }; 
+ + List unfoldedFailingData = asList(fault1,"1",fault2,"2","3",fault3,"1",fault1,"2",fault2,"3"); + + assertEquals(unfoldedFailingData,elementsOf(provider.get())); + } + + @Test + public void ignoreFailures() { + + @SuppressWarnings("unchecked") + final List> data = asList(asList("1","2","3"),asList("1","2","3")); + + StreamProvider provider = new StreamProvider() { + public Stream get() { + Stream> stream = convert(data); + Stream unfolded = unfold(stream).through(streamer); + return guard(unfolded).with(IGNORE_POLICY); + } + }; + + List unfoldedData = asList("1","2","3","1","2","3"); + + assertEquals(unfoldedData,elementsOf(provider.get())); + } + +}