From c5896c052b2752e147e164d821c77004fbd99516 Mon Sep 17 00:00:00 2001 From: "fabio.simeoni" Date: Thu, 24 Oct 2013 09:13:14 +0000 Subject: [PATCH] merge from TRUNK git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/branches/common/common-events/1.0@83912 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../org/gcube/common/events/Observes.java | 2 +- .../gcube/common/events/impl/DefaultHub.java | 40 ++++++++++++++++--- 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/gcube/common/events/Observes.java b/src/main/java/org/gcube/common/events/Observes.java index 1cfab98..060a770 100644 --- a/src/main/java/org/gcube/common/events/Observes.java +++ b/src/main/java/org/gcube/common/events/Observes.java @@ -29,7 +29,7 @@ public @interface Observes { * The kind of the observer, {@link #critical}, {@link #safe}, or {@link #resilient}. * */ - static enum Kind { + public static enum Kind { critical, safe, resilient } diff --git a/src/main/java/org/gcube/common/events/impl/DefaultHub.java b/src/main/java/org/gcube/common/events/impl/DefaultHub.java index 8947892..ffe6a16 100644 --- a/src/main/java/org/gcube/common/events/impl/DefaultHub.java +++ b/src/main/java/org/gcube/common/events/impl/DefaultHub.java @@ -31,6 +31,8 @@ public class DefaultHub implements Hub { private final ExecutorService service; + private volatile boolean terminated=false; + public DefaultHub() { this.service= Executors.newCachedThreadPool(); } @@ -48,6 +50,9 @@ public class DefaultHub implements Hub { @Override public synchronized void subscribe(Object object) { + if (isTerminated()) + return; + notNull("observer", object); for (Observer observer : Observer.observersFor(object,service)) @@ -57,6 +62,9 @@ public class DefaultHub implements Hub { @Override public synchronized boolean unsubscribe(Object observer) { + if (isTerminated()) + return false; + notNull("observer", observer); for (Key key : subscriptions.keySet()) @@ -70,6 +78,9 @@ public class DefaultHub implements Hub { @Override public synchronized void fire(Object event, String... qualifiers) { + if (isTerminated()) + return; + notNull("event", event); notNull("qualifiers", qualifiers); @@ -85,12 +96,19 @@ public class DefaultHub implements Hub { @Override public void waitFor(final Class eventType) { + + if (isTerminated()) + return; + waitFor(eventType,0); } @Override public void waitFor(Class eventType, long duration, TimeUnit unit) { + if (isTerminated()) + return; + notNull("time unit", unit); if (duration<=0) @@ -103,9 +121,18 @@ public class DefaultHub implements Hub { @Override public void stop() { + if (isTerminated()) + return; + try { + + //give a margin to let 'concurrent' events to be delivered + Thread.sleep(200); + + terminated=true; + service.shutdown(); - service.awaitTermination(3000, TimeUnit.MILLISECONDS); + service.awaitTermination(1000, TimeUnit.MILLISECONDS); } catch(InterruptedException e) { log.warn("cannot shutdown this hub",e); @@ -170,10 +197,7 @@ public class DefaultHub implements Hub { //execute asynchronous in parallel for (Runnable observer : asynchronous) - if (service.isTerminated()) - log.trace("discarding event of type "+event.getClass()+" as the hub has shut down"); - else - service.submit(observer); + service.submit(observer); } @@ -244,4 +268,10 @@ public class DefaultHub implements Hub { } + private boolean isTerminated() { + if (terminated) + log.trace("hub is terminated, operation request aborted"); + return terminated; + } + }