merge from TRUNK

git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/branches/common/common-events/1.0@83912 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
fabio.simeoni 2013-10-24 09:13:14 +00:00
parent 321dc051ad
commit c5896c052b
2 changed files with 36 additions and 6 deletions

View File

@ -29,7 +29,7 @@ public @interface Observes {
* The kind of the observer, {@link #critical}, {@link #safe}, or {@link #resilient}.
*
*/
static enum Kind {
public static enum Kind {
critical, safe, resilient
}

View File

@ -31,6 +31,8 @@ public class DefaultHub implements Hub {
private final ExecutorService service;
private volatile boolean terminated=false;
public DefaultHub() {
this.service= Executors.newCachedThreadPool();
}
@ -48,6 +50,9 @@ public class DefaultHub implements Hub {
@Override
public synchronized void subscribe(Object object) {
if (isTerminated())
return;
notNull("observer", object);
for (Observer observer : Observer.observersFor(object,service))
@ -57,6 +62,9 @@ public class DefaultHub implements Hub {
@Override
public synchronized boolean unsubscribe(Object observer) {
if (isTerminated())
return false;
notNull("observer", observer);
for (Key key : subscriptions.keySet())
@ -70,6 +78,9 @@ public class DefaultHub implements Hub {
@Override
public synchronized void fire(Object event, String... qualifiers) {
if (isTerminated())
return;
notNull("event", event);
notNull("qualifiers", qualifiers);
@ -85,12 +96,19 @@ public class DefaultHub implements Hub {
@Override
public void waitFor(final Class<?> eventType) {
if (isTerminated())
return;
waitFor(eventType,0);
}
@Override
public void waitFor(Class<?> eventType, long duration, TimeUnit unit) {
if (isTerminated())
return;
notNull("time unit", unit);
if (duration<=0)
@ -103,9 +121,18 @@ public class DefaultHub implements Hub {
@Override
public void stop() {
if (isTerminated())
return;
try {
//give a margin to let 'concurrent' events to be delivered
Thread.sleep(200);
terminated=true;
service.shutdown();
service.awaitTermination(3000, TimeUnit.MILLISECONDS);
service.awaitTermination(1000, TimeUnit.MILLISECONDS);
}
catch(InterruptedException e) {
log.warn("cannot shutdown this hub",e);
@ -170,10 +197,7 @@ public class DefaultHub implements Hub {
//execute asynchronous in parallel
for (Runnable observer : asynchronous)
if (service.isTerminated())
log.trace("discarding event of type "+event.getClass()+" as the hub has shut down");
else
service.submit(observer);
service.submit(observer);
}
@ -244,4 +268,10 @@ public class DefaultHub implements Hub {
}
private boolean isTerminated() {
if (terminated)
log.trace("hub is terminated, operation request aborted");
return terminated;
}
}