branch for 1.x releases

git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/branches/common/common-events/1.0@81235 82a268e6-3cf1-43bd-a215-b396298e98cf
master
fabio.simeoni 11 years ago
commit 541cd4f59e

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>common-events</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>

@ -0,0 +1,7 @@
#Tue Jun 18 10:36:18 CEST 2013
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/main/resources=UTF-8
encoding//src/test/java=UTF-8
encoding//src/test/resources=UTF-8
encoding/<project>=UTF-8

@ -0,0 +1,6 @@
#Tue Jun 18 10:36:18 CEST 2013
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
org.eclipse.jdt.core.compiler.compliance=1.6
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.source=1.6

@ -0,0 +1,5 @@
#Fri Jun 14 15:34:28 CEST 2013
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1

@ -0,0 +1,6 @@
gCube System - License
------------------------------------------------------------
The gCube/gCore software is licensed as Free Open Source software conveying to the EUPL (http://ec.europa.eu/idabc/eupl).
The software and documentation is provided by its authors/distributors "as is" and no expressed or
implied warranty is given for its use, quality or fitness for a particular case.

@ -0,0 +1 @@
* Fabio Simeoni (fabio.simeoni@fao.org), FAO of the UN, Italy

@ -0,0 +1,37 @@
The gCube System - ${name}
----------------------
This work has been partially supported by the following European projects: DILIGENT (FP6-2003-IST-2), D4Science (FP7-INFRA-2007-1.2.2),
D4Science-II (FP7-INFRA-2008-1.2.2), iMarine (FP7-INFRASTRUCTURES-2011-2), and EUBrazilOpenBio (FP7-ICT-2011-EU-Brazil).
Authors
-------
* Fabio Simeoni (fabio.simeoni@fao.org), FAO of the UN, Italy.
Version and Release Date
------------------------
${version}
Description
-----------
${description}
Download information
--------------------
Source code is available from SVN:
${scm.url}
Binaries can be downloaded from:
Documentation
-------------
Documentation is available on-line from the Projects Documentation Wiki:
https://gcube.wiki.gcube-system.org/gcube/index.php
Licensing
---------
This software is licensed under the terms you may find in the file named "LICENSE" in this directory.

@ -0,0 +1,5 @@
<ReleaseNotes>
<Changeset component="${build.finalName}" date="2013-1-27">
<Change>First Release</Change>
</Changeset>
</ReleaseNotes>

@ -0,0 +1,42 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id>servicearchive</id>
<formats>
<format>tar.gz</format>
</formats>
<baseDirectory>/</baseDirectory>
<fileSets>
<fileSet>
<directory>${distroDirectory}</directory>
<outputDirectory>/</outputDirectory>
<useDefaultExcludes>true</useDefaultExcludes>
<includes>
<include>README</include>
<include>LICENSE</include>
<include>INSTALL</include>
<include>MAINTAINERS</include>
<include>changelog.xml</include>
</includes>
<fileMode>755</fileMode>
<filtered>true</filtered>
</fileSet>
</fileSets>
<files>
<file>
<source>${distroDirectory}/profile.xml</source>
<outputDirectory>/</outputDirectory>
<filtered>true</filtered>
</file>
<file>
<source>target/${build.finalName}.jar</source>
<outputDirectory>/${artifactId}</outputDirectory>
</file>
<file>
<source>${distroDirectory}/svnpath.txt</source>
<outputDirectory>/${artifactId}</outputDirectory>
<filtered>true</filtered>
</file>
</files>
</assembly>

@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<Resource xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<ID />
<Type>Service</Type>
<Profile>
<Description>${description}</Description>
<Class>Common</Class>
<Name>${artifactId}</Name>
<Version>1.0.0</Version>
<Packages>
<Software>
<Name>${artifactId}</Name>
<Version>${version}</Version>
<MavenCoordinates>
<groupId>${groupId}</groupId>
<artifactId>${artifactId}</artifactId>
<version>${version}</version>
</MavenCoordinates>
<Files>
<File>${build.finalName}.jar</File>
</Files>
</Software>
</Packages>
</Profile>
</Resource>

@ -0,0 +1 @@
${scm.url}

@ -0,0 +1,108 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.gcube.core</groupId>
<artifactId>common-events</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>common-events</name>
<description>A lightweight, zero-dependency library for application-level eventing in arbitrary environments.</description>
<parent>
<groupId>org.gcube.tools</groupId>
<artifactId>maven-parent</artifactId>
<version>1.0.0</version>
</parent>
<properties>
<distroDirectory>distro</distroDirectory>
</properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.googlecode.jeeunit</groupId>
<artifactId>jeeunit-weld-se</artifactId>
<version>1.0.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.5</version>
<executions>
<execution>
<id>copy-profile</id>
<phase>install</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>target</outputDirectory>
<resources>
<resource>
<directory>${distroDirectory}</directory>
<filtering>true</filtering>
<includes>
<include>profile.xml</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>${distroDirectory}/descriptor.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>servicearchive</id>
<phase>install</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,226 @@
package org.gcube.common.events;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.gcube.common.events.Observes.Kind;
/**
* Mediates between producers and observers of events.
* <p>
* Producers fire events through the hub ((cf. {@link #fire(Object, String...)})), which routes them to all the
* observers that have previously subscribed to receive them ({@link #subscribe(Object)})).
* <p>
* <b>Producers, Observers, and Events</b>
* <p>
* Producers, observers, and events are arbitrary objects. Observers are objects with one or more methods marked with
* the {@link Observes} annotation and taking events as their only parameter. {@link Observes} methods can have any name
* and access modifier, and may throw any kind of exception. Any object value may serve as an event. The following
* example illustrates:
* <P>
*
* <pre>
* {@code
* class Observer {
*
* {@literal @}Observes
* void someMethod(int event) {...}
*
* }
*
* Hub hub = ....;
*
* hub.subscribe(new Observer());
*
* hub.fire(10);
*
* }
* </pre>
*
* In general, events and {@link Observes} methods <em>match</em> events when the type of events is a subtype of the
* type of the single parameter of the observer methods. Normal java subtyping rule apply, with the following
* exceptions:
* <p>
*
* <ul>
* <li>observers <em>cannot</em> use primitive types and should use wrapper types instead.
* <li>parametric types are not directly supported, as Java does not make available type parameters at runtime. Possible solutions are discussed below.
* </ul>
*
* <b>Qualifiers</b>
* <p>
* The type matching algorithm can be refined by qualifying events and event types with one ore more labels that
* observers declare in {@link Observes} annotations and produce specify when firing events. An example of using
* <em>qualifiers</em> is the following:
*
* <pre>
* {@code
* class Observer {
*
* {@literal @}Observes({"currency","dollars"})
* void onDollarPayment(Integer amount) {...}
*
* {@literal @}Observes({"currency","euro"})
* void onEuroPayment(Integer amount) {...}
*
* {@literal @}Observes({"currency"})
* void onAnyPayment(Integer amount) {...}
* }
*
* Hub hub = ....;
*
* hub.subscribe(new Observer());
*
* hub.fire(10, "currency", "dollars");
*
* }
* </code>
* </pre>
*
* Here the methods <code>onDollarPayment()</code> and <code>onAnyPayment()</code> receive the event, while the method
* <code>onEuroPayment()</code> does not. In general, {@link Observes} methods are notified if they specify a subset of
* the qualifiers associated with events, including no qualifiers at all.
*
* <p>
* <b>Event Grouping</b>
* <p>
* Observers that perform costly operations may wish to process rapid bursts of events at once. {@link Observes} methods
* may then specify the minimal delay in milliseconds between two successive notifications {cf. @link Observes#every()}.
* All the events produced within this delay are grouped and delivered in a collection when the delay expires and in the
* order in which they were produced. Observers process the collections as they see fit (e.g. consume the most recent,
* merge all the events, aggregate data in the events, etc). For example:
*
* <pre>
* {@code
*
* {@literal @}Observes(value="change",every=1000)
* void onPayments(List&lt;Integer> payments) {...}
*
* }
* </code>
* </pre>
*
* Any {@link Collection} type can be used for the delivery of the events.
*
* <p>
* <b>Critical, Safe, and Resilient Consumers</b>
* <p>
* Firing events blocks producers until all matching {@link Observes} methods that are marked {@link Kind#critical} have
* been executed (cf. {@link Observes#kind()}). Critical {@link Observes} methods execute sequentially in an
* unpredictable order, and any exception they raise is reported to producers. Producers do not block instead for the
* execution of {@link Observes} methods that are marked {@link Kind#safe} or {@link Kind#resilient}. Safe and resilient
* {@link Observes} methods execute in parallel and any exception they raise is logged. Finally, the difference between
* {@link Kind#safe} and {@link Kind#resilient} handlers is that the former execute if and only if there are no critical
* failures, while the latter execute in all cases.
*
* *
* <p>
* <b>Parametric Observers and Events</b>
* <p>
*
* Parametric observers and events can be still be used in either one of two ways:
* <p>
*
* <ul>
* <li>Qualifiers can be used to differentiate different instantiations of a parametric type (qualifiers are discussed
* below). Like with any other use of qualifiers, convenience is traded off for compile-time safety.
*
* <li>Events can be wrapped as instances of the <code>Event</code> class, which is provided to capture type information
* which is otherwise lost at runtime due to type erasure. This approach is more verbose but also safer.
*
* Consider the following example of event wrapping:
*
* <pre>
* {@code
* class Observer {
*
* {@literal @}Observes
* void someMethod(MyType<Integer> event) {...}
*
* }
*
* Hub hub = ....;
*
* hub.subscribe(new Observer());
*
* MyType<Integer> event = ...
*
* hub.fire(new Event<MyType<Integer>>(event){});
*
* }
* </pre>
*
* where <code>new Event<MyType>(event){}</code> instantiates an anonymous <code>Event</code> subclass. The idiom is
* hardly palatable, but it does circumvent the limitation of type erasure in Java. Currently, the follow limitations
* apply:
*
* <p>
*
* <ul>
* <li>type variables cannot be used in both events and observers' parameters;
* <li>wildcards are supported, but only with upper bounds (<code>? extends ...</code>). Lower bounds are not currently supported.
* </ul>
*
* @see Observes
* @author Fabio Simeoni
*
*/
public interface Hub {
/**
* Subscribes an observer for notification of events of a given type.
* <p>
* The single parameter of any method of the observer annotated with {@link Observes} identifies the type of the
* events observed by the observer.
*
* @param observer the observer
* @throws IllegalArgumentException if the observer declares no methods annotated with {@link Observes}, or such
* methods do not declare a single parameter
*
* @see Observes
*/
void subscribe(Object observer);
/**
* Unsubscribes an observer.
*
* @param observer the observer
* @return <code>true</code> if the observer is found and unsubscribed.
*/
boolean unsubscribe(Object observer);
/**
* Fires an event to all observers which have subscribed for them, blocking the client until all the critical
* observers have executed.
*
* @param event the event
* @param qualifiers optional event qualifiers
*/
void fire(Object event, String... qualifiers);
/**
* Blocks the caller until an event of a given type occurs.
*
* @param eventType the event type
* @throws RuntimeException if the wait is interrupted
*/
void waitFor(Class<?> eventType);
/**
* Blocks the caller until an event of a given type occurs or a given length of time elapses.
*
* @param eventType the event type
* @param duration the length of time to wait for
* @param unit the time unit of the duration
*
* @throws IllegalArgumentException if the inputs are null or the duration is less or equal than zero
* @throws RuntimeException if the wait is interrupted
*/
void waitFor(Class<?> eventType, long duration, TimeUnit unit);
/**
* Signals that this hub is no longer needed and can release its resources.
*/
void stop();
}

@ -0,0 +1,56 @@
package org.gcube.common.events;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Identifies callback methods in event observers.
* <p>
* Optional attributes are used to indicate:
*
* <li>event qualifiers ({@link Observes#Any} by default);
* <li>whether the successful execution of the method is {@link Kind#critical} to the client. Non critical
* methods may be {@link Kind#resilient} or {@link Kind#safe} (the default), depending on whether they should or should
* not execute after previous critical failures;
* <li>the delay during which events are accumulated before being delivered.
*
* @author Fabio Simeoni
* @see Hub
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Observes {
static final String Any = "any";
/**
* The kind of the observer, {@link #critical}, {@link #safe}, or {@link #resilient}.
*
*/
static enum Kind {
critical, safe, resilient
}
/**
* The event qualifiers.
*
* @return the qualifiers, {@link #Any} by default.
*/
String[] value() default { Any };
/**
* The kind of the observer {@link Kind#safe} by default.
*
* @return the kind
*/
Kind kind() default Kind.safe;
/**
* The minimum duration in milliseconds between the delivery of two subsequent events.
*
* @return the kind
*/
long every() default 0;
}

@ -0,0 +1,247 @@
package org.gcube.common.events.impl;
import static org.gcube.common.events.impl.ReflectionUtils.*;
import static org.gcube.common.events.impl.Utils.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.gcube.common.events.Hub;
import org.gcube.common.events.Observes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Default {@link Hub} implementation.
*
* @author Fabio Simeoni
*
*/
public class DefaultHub implements Hub {
private static final Logger log = LoggerFactory.getLogger(Hub.class);
private final Map<Key, List<Observer>> subscriptions = new HashMap<Key, List<Observer>>();
private final ExecutorService service;
public DefaultHub() {
this.service= Executors.newCachedThreadPool();
}
public DefaultHub(ExecutorService service) {
notNull("executor service", service);
this.service= service;
log.info("configured hub with executor service {}",service.getClass().getSimpleName());
}
@Override
public synchronized void subscribe(Object object) {
notNull("observer", object);
for (Observer observer : Observer.observersFor(object,service))
subscribe(observer);
}
@Override
public synchronized boolean unsubscribe(Object observer) {
notNull("observer", observer);
for (Key key : subscriptions.keySet())
if (unsubscribe(observer, key))
return true;
return false;
}
@Override
public synchronized void fire(Object event, String... qualifiers) {
notNull("event", event);
notNull("qualifiers", qualifiers);
List<Observer> observers = new ArrayList<Observer>();
for (Key key : subscriptions.keySet())
if (key.matches(typeOf(event), qualifiers))
observers.addAll(subscriptions.get(key));
notifyObservers(observers,valueOf(event));
}
@Override
public void waitFor(final Class<?> eventType) {
waitFor(eventType,0);
}
@Override
public void waitFor(Class<?> eventType, long duration, TimeUnit unit) {
notNull("time unit", unit);
if (duration<=0)
throw new IllegalArgumentException("invalid duration: 0 ms");
waitFor(eventType,unit.toMillis(duration));
}
@Override
public void stop() {
try {
service.shutdown();
service.awaitTermination(3000, TimeUnit.MILLISECONDS);
}
catch(InterruptedException e) {
log.warn("cannot shutdown this hub",e);
}
}
// helpers
private void notifyObservers(List<Observer> observers, Object event) {
List<Observer> critical = new ArrayList<Observer>();
List<Observer> resilient = new ArrayList<Observer>();
List<Observer> safe = new ArrayList<Observer>();
List<Observer> target = null;
for (Observer observer : observers) {
switch(observer.kind()) {
case critical: target=critical;break;
case resilient: target=resilient;break;
case safe: target=safe;
}
target.add(observer);
}
//execute criticals synchronously and sequentially
for (Observer observer : critical)
try {
observer.onEvent(event);
}
catch (RuntimeException e) {
notifyObserversAsynchronously(resilient, event);
throw e;
}
safe.addAll(resilient);
notifyObserversAsynchronously(safe, event);
}
private void notifyObserversAsynchronously(List<Observer> observers,final Object event) {
List<Runnable> asynchronous = new ArrayList<Runnable>();
for (final Observer observer : observers)
asynchronous.add(new Runnable() {
@Override
public void run() {
observer.onEvent(event);
}
});
//execute asynchronous in parallel
for (Runnable observer : asynchronous)
if (service.isTerminated())
log.trace("discarding event of type "+event.getClass()+" as the hub has shut down");
else
service.submit(observer);
}
private void subscribe(Observer observer) {
Key key = observer.key();
List<Observer> observers = subscriptions.get(key);
if (observers == null) {
observers = new ArrayList<Observer>();
subscriptions.put(key, observers);
}
observers.add(observer);
}
private boolean unsubscribe(Object observer, Key key) {
List<Observer> observers = subscriptions.get(key);
if (observers != null)
for (Observer l : observers)
if (l.equals(observer)) {
observers.remove(l);
return true;
}
return false;
}
private boolean waitFor(final Class<?> eventType, long duration) {
notNull("event type", eventType);
final CountDownLatch latch = new CountDownLatch(1);
Object watcher = new Object() {
@Observes @SuppressWarnings("unused")
void onEvent() {
log.trace("end of watch for event {}",eventType);
latch.countDown();
}
};
subscribe(watcher);
log.info("subscribed watcher for event {}",eventType);
boolean outcome = true;
try {
if (duration==0)
latch.await();
else
outcome= latch.await(duration, TimeUnit.MILLISECONDS);
}
catch(InterruptedException e) {
log.error("watcher for event {} has been interrupted",eventType);
outcome=false;
}
return outcome;
}
}

@ -0,0 +1,30 @@
package org.gcube.common.events.impl;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
/**
* Event wrapper that captures the parametric type of events.
*
* @author Fabio Simeoni
*
* @param <T> the type of the event
*/
public abstract class Event<T> {
private final T event;
private final Type type;
public Event(T event) {
this.event=event;
this.type = ParameterizedType.class.cast(this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];
}
Type type() {
return type;
}
T event() {
return event;
}
}

@ -0,0 +1,42 @@
package org.gcube.common.events.impl;
import static java.util.Arrays.*;
import static org.gcube.common.events.Observes.*;
import static org.gcube.common.events.impl.TypeChecker.*;
import java.lang.reflect.Type;
import java.util.HashSet;
import java.util.Set;
/**
* Used internally to match events with observers.
*
* @author Fabio Simeoni
*
*/
class Key {
private final Type type;
private Set<String> qualifiers;
public Key(Type type,Set<String> qualifiers) {
this.type=type;
this.qualifiers=qualifiers;
}
public Type type() {
return type;
}
public Set<String> qualifiers() {
return qualifiers;
}
public boolean matches(Type eventType,String ... qualifiers) {
return matchTypes(type,eventType) &&
(qualifiers().contains(Any) ||
qualifiers().containsAll(new HashSet<String>(asList(qualifiers))));
}
}

@ -0,0 +1,204 @@
package org.gcube.common.events.impl;
import static java.util.Arrays.*;
import static org.gcube.common.events.impl.ReflectionUtils.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.gcube.common.events.Hub;
import org.gcube.common.events.Observes;
import org.gcube.common.events.Observes.Kind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Used internally to wrap arbitrary objects subscribed as observers.
*
* @author Fabio Simeoni
*
*/
public class Observer {
private static final Logger log = LoggerFactory.getLogger(Hub.class);
final Object object;
final Method method;
final Kind kind;
final Key key;
private ExecutorService service;
private Future<?> future;
private long delay;
private List<Object> accumulated = new ArrayList<Object>();
public static List<Observer> observersFor(Object object, ExecutorService service) {
List<Observer> observers = new ArrayList<Observer>();
List<Method> methods = observerMethodsOf(object);
if (methods.isEmpty())
throw new IllegalArgumentException(object
+ " is not an observer, none of its methods is annotated with @Observes");
for (Method method : methods)
observers.add(new Observer(object, method, service));
return observers;
}
Observer(Object object, Method method, ExecutorService service) {
method.setAccessible(true);
this.object = object;
this.method = method;
this.service = service;
Observes.Kind kind = method.getAnnotation(Observes.class).kind();
this.kind = kind;
Set<String> qualifiers = new HashSet<String>(asList(method.getAnnotation(Observes.class).value()));
delay = method.getAnnotation(Observes.class).every();
Type paramType = method.getGenericParameterTypes()[0];
if (delay>0)
paramType=elementTypeOf(paramType);
key = new Key(paramType, qualifiers);
}
public Kind kind() {
return kind;
}
public void onEvent(Object event) {
if (delay > 0)
onEventDelayed(event);
else
onEventImmediate(event);
}
public void onEventImmediate(final Object event) {
try {
method.invoke(object, event);
} catch (InvocationTargetException e) {
rethrow(event, e.getCause());
} catch (Exception e) {
rethrow(event, e);
}
}
public synchronized void onEventDelayed(final Object event) {
accumulated.add(event);
if (future == null || future.isDone())
future = service.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
// pass a copy observers can freely manipulate
List<Object> listEvent = new ArrayList<Object>(accumulated);
accumulated.clear();
onEventImmediate(listEvent);
}
});
}
private void rethrow(Object event, Throwable t) {
String msg = "observer " + object + " failed to process event " + event + " with qualifiers "
+ key.qualifiers();
switch (kind) {
case critical:
if (t instanceof RuntimeException)
throw RuntimeException.class.cast(t);
else
throw new RuntimeException(msg, t);
default:
log.error(msg, t);
}
}
public Key key() {
return key;
}
@Override
public int hashCode() {
return object.hashCode();
}
@Override
public boolean equals(Object obj) {
return object.equals(obj);
}
// helper
public static List<Method> observerMethodsOf(Object o) {
List<Method> methods = new ArrayList<Method>();
for (Method method : o.getClass().getDeclaredMethods())
if (method.isAnnotationPresent(Observes.class)) {
Type[] params = method.getGenericParameterTypes();
if (params.length != 1)
throw new IllegalArgumentException("observer method " + method
+ " does not take a single parameter");
if (containsVariable(params[0]))
throw new IllegalArgumentException("observer method " + method
+ " uses type variables, which inhibit event delivery");
long delay = method.getAnnotation(Observes.class).every();
//validate coalescing consumers
if (delay > 0 && !isCollectionType(params[0]))
throw new IllegalArgumentException(
"observer method "
+ method
+ " expects multiple events at once but its parameter cannot be assigned to java.util.Collection parameter");
methods.add(method);
}
return methods;
}
}

@ -0,0 +1,78 @@
package org.gcube.common.events.impl;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.Collection;
/**
* Library-wide reflection utilities.
*
* @author Fabio Simeoni
*
*/
public class ReflectionUtils {
public static Type typeOf(Object event) {
return event instanceof Event? Event.class.cast(event).type():event.getClass();
}
public static Object valueOf(Object event) {
return event instanceof Event? Event.class.cast(event).event():event;
}
public static boolean isCollectionType(Type t) {
if (t instanceof Class)
return Collection.class.isAssignableFrom(Class.class.cast(t));
if (t instanceof ParameterizedType)
return isCollectionType(ParameterizedType.class.cast(t).getRawType());
return false;
}
public static Type elementTypeOf(Type t) {
if (!(t instanceof ParameterizedType))
throw new RuntimeException("invoked with non-collection type, should have been previous ruled out");
t = ParameterizedType.class.cast(t).getActualTypeArguments()[0];
if (t instanceof Class)
return Class.class.cast(t);
if (t instanceof WildcardType)
return WildcardType.class.cast(t).getUpperBounds()[0];
throw new RuntimeException("invoked with type variable, should have been previous ruled out");
}
public static boolean containsVariable(Type t) {
if (t instanceof TypeVariable<?>)
return true;
if (t instanceof ParameterizedType)
for (Type arg : ParameterizedType.class.cast(t).getActualTypeArguments())
if (containsVariable(arg))
return true;
if (t instanceof WildcardType)
return containsVariable(WildcardType.class.cast(t).getUpperBounds()[0])
&& containsVariable(WildcardType.class.cast(t).getLowerBounds()[0]);
return false;
}
}

@ -0,0 +1,100 @@
package org.gcube.common.events.impl;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;
/**
*
* Used internally to perform subtype checks at runtime.
*
* @author Fabio Simeoni
*
*/
class TypeChecker {
public static boolean matchTypes(Type paramType,Type eventType) {
print(paramType,eventType);
if (paramType instanceof Class)
return matchClass(Class.class.cast(paramType),eventType);
if (paramType instanceof WildcardType)
return matchWildcardType(WildcardType.class.cast(paramType),eventType);
if (paramType instanceof ParameterizedType)
return matchParameterizedType(ParameterizedType.class.cast(paramType),eventType);
return false;
}
public static boolean matchWildcardType(WildcardType paramType, Type eventType) {
//compare upper bounds
if (eventType instanceof WildcardType)
return matchTypes(paramType.getUpperBounds()[0], WildcardType.class.cast(eventType).getUpperBounds()[0]);
//compare upper bound with event type
return matchTypes(paramType.getUpperBounds()[0],eventType);
}
public static boolean matchParameterizedType(ParameterizedType paramType,Type eventType) {
//compare type parameters
if (eventType instanceof ParameterizedType)
return matchParameterizedTypes(paramType,ParameterizedType.class.cast(eventType));
//fails if event is raw or non-parametric
return false;
}
public static boolean matchParameterizedTypes(ParameterizedType paramType,ParameterizedType eventType) {
return matchTypes(paramType.getRawType(),eventType.getRawType())
&& matchArguments(paramType.getActualTypeArguments(), eventType.getActualTypeArguments());
}
public static boolean matchArguments(Type[] paramArgumentTypes, Type[] eventArgumentTypes) {
//no need to check lengths as we know raw type is the same
for (int i=0; i<paramArgumentTypes.length;i++)
if (!matchTypes(paramArgumentTypes[i],eventArgumentTypes[i]))
return false;
return true;
}
public static boolean matchClass(Class<?> paramType,Type eventType) {
if (eventType instanceof Class)
return matchClasses(paramType,Class.class.cast(eventType));
if (eventType instanceof WildcardType)
return false;
return matchClass(paramType,ParameterizedType.class.cast(eventType));
}
public static boolean matchClass(Class<?> paramType,ParameterizedType eventType) {
print(paramType,eventType);
//compare with raw type: equivalent to performing a cast
return matchClass(paramType,ParameterizedType.class.cast(eventType).getRawType());
}
public static boolean matchClasses(Class<?> paramType,Class<?> eventType) {
return paramType.isAssignableFrom(eventType);
}
public static void print(Type t1,Type t2) {
//System.out.println(t1+" >? "+t2);
}
}

@ -0,0 +1,47 @@
package org.gcube.common.events.impl;
import java.util.Arrays;
import java.util.Collection;
/**
* Library-wide utilities.
*
* @author Fabio Simeoni
*
*/
public class Utils {
public static void notNull(Object o) throws IllegalArgumentException {
notNull("argument",o);
}
public static void notNull(String name, Object o) throws IllegalArgumentException {
if (o==null)
throw new IllegalArgumentException(name+" is null");
}
public static void notEmpty(String name, String o) throws IllegalArgumentException {
if (o.isEmpty())
throw new IllegalArgumentException(name+" is empty");
}
public static void valid(String name, String o) throws IllegalArgumentException {
notNull(name, o);
notEmpty(name,o);
}
public static void valid(String name, Collection<String> o) throws IllegalArgumentException {
for (String s : o) {
notNull(name+"'s element", s);
notEmpty(name+"'s element",s);
}
}
public static void valid(String name, String[] o) throws IllegalArgumentException {
valid(name,Arrays.asList(o));
}
}

@ -0,0 +1,36 @@
package org.gcube.common.events.utils;
import org.gcube.common.events.Hub;
import org.gcube.common.events.impl.DefaultHub;
/**
* Convenience factory to share {@link Hub} instances.
* Encouraged only in environments where better means of sharing like dependency injection are not
* available.
*
* @author Fabio Simeoni
*
*/
public class HubFactory {
private static Hub hub = new DefaultHub();
/**
* Returns the shared {@link Hub} instance.
* @return the instance
*/
public static Hub hub() {
return hub;
}
/**
* Sets the shared {@link Hub} instance, overriding the default one.
* <p>
* Typically used in testing to configure a test-driving instance.
*
* @param hub the instance
*/
public static void configure(Hub hub) {
HubFactory.hub=hub;
}
}

@ -0,0 +1,52 @@
package test;
import static org.junit.Assert.*;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.Executors;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import javax.inject.Qualifier;
import org.gcube.common.events.Hub;
import org.gcube.common.events.impl.DefaultHub;
import org.junit.Test;
import org.junit.runner.RunWith;
import com.googlecode.jeeunit.JeeunitRunner;
@RunWith(JeeunitRunner.class)
public class CdiIntegrationTest {
@Qualifier
@Target({ElementType.FIELD,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Configured {}
@Inject
Hub hub;
@Inject @Configured
Hub configuredHub;
@Test
public void hubIsInjected() throws Exception {
//assertNotNull(hub);
assertNotNull(configuredHub);
}
@Produces @Configured
public static Hub configuredHub() {
return new DefaultHub(Executors.newSingleThreadExecutor());
}
}

@ -0,0 +1,524 @@
package test;
import static org.gcube.common.events.Observes.Kind.*;
import static org.junit.Assert.*;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.gcube.common.events.Hub;
import org.gcube.common.events.Observes;
import org.gcube.common.events.impl.DefaultHub;
import org.gcube.common.events.impl.Event;
import org.gcube.common.events.impl.Observer;
import org.junit.Test;
@SuppressWarnings("unused")
public class HubTest {
@Test
public void observersAreReflectivelyProcessed() throws Exception {
Object o = new Object() {
@Observes
public void shouldBeIncluded(String event) {
}
public void shouldNotBeIncluded(String event) {
}
};
List<Method> methods = Observer.observerMethodsOf(o);
assertTrue(methods.contains(o.getClass().getMethod("shouldBeIncluded", String.class)));
assertFalse(methods.contains(o.getClass().getMethod("shouldNotBeIncluded", String.class)));
}
@Test
public void invalidObserversAreDetected() throws Exception {
Object o = new Object() {
@Observes
public void tooManyParams(String event, int another) {}
};
try {
Observer.observerMethodsOf(o);
fail();
}
catch(IllegalArgumentException e) {}
o = new Object() {
@Observes
public void tooFewParams() {}
};
try {
Observer.observerMethodsOf(o);
fail();
}
catch(IllegalArgumentException e) {}
o = new Object() {
@Observes(every=1000)
public void notACollectionParam(String s) {}
};
try {
Observer.observerMethodsOf(o);
fail();
}
catch(IllegalArgumentException e) {}
o = new Object() {
@Observes(every=1000)
public <T> void canNeverBeInvoked(T s) {}
};
try {
Observer.observerMethodsOf(o);
fail();
}
catch(IllegalArgumentException e) {}
}
@Test
public void subscribedObserversAreUnsubscribed() throws Exception {
class A {
@Observes
public void m(String event) {}
}
A o = new A();
Hub hub = new DefaultHub();
hub.subscribe(o);
assertTrue(hub.unsubscribe(o));
}
@Test
public void observersAreNotified() throws Exception {
final CountDownLatch latch = new CountDownLatch(2);
Object o = new Object() {
@Observes
public void shouldBeInvoked(String event) {
assertEquals("event",event);
latch.countDown();
}
@Observes
public void shouldBeInvokedToo(String event) {
assertEquals("event",event);
latch.countDown();
}
@Observes
public void shouldntBeInvoked(int event) {
fail();
}
};
Hub hub = new DefaultHub();
hub.subscribe(o);
hub.fire("event");
assertTrue(latch.await(50,TimeUnit.MILLISECONDS));
}
@Test
public void parametricObserversAreNotified() throws Exception {
final Object wrappedButUndeliveredBecauseDifferentSpecialisation = new Event<List<Integer>>(Arrays.asList(1)){};
final List<String> undelivered = Arrays.asList("undelivered");
final List<String> delivered = Arrays.asList("delivered");
//negative cases
final Object undeliveredBecauseUnwrapped = undelivered;
final Object wrappedButUndeliveredBecauseDeclaredTooSpecific = new Event<List<? extends String>>(undelivered){};
final Object undeliveredBecauseTooGeneric = new Event<List<?>>(undelivered){};
@SuppressWarnings("rawtypes")
final Object undeliveredBecauseRaw = new Event<List>(undelivered){};
//positive case
final Object deliveredBecauseWrappedAndPerfectMatch = new Event<List<String>>(delivered){};
final CountDownLatch latch = new CountDownLatch(1);
Object o = new Object() {
@Observes
public void observe1(List<String> e) {
assertEquals(delivered,e);
latch.countDown();
}
};
Hub hub = new DefaultHub();
hub.subscribe(o);
hub.fire(undeliveredBecauseUnwrapped);
hub.fire(wrappedButUndeliveredBecauseDifferentSpecialisation);
hub.fire(undeliveredBecauseTooGeneric);
hub.fire(wrappedButUndeliveredBecauseDeclaredTooSpecific);
hub.fire(undeliveredBecauseRaw);
hub.fire(deliveredBecauseWrappedAndPerfectMatch);
assertTrue(latch.await(50,TimeUnit.MILLISECONDS));
}
@Test
public void wildcardObserversAreNotified() throws Exception {
final Object wrappedButUndeliveredBecauseDifferentSpecialisation = new Event<List<Integer>>(Arrays.asList(1)){};
final List<String> undelivered = Arrays.asList("undelivered");
final List<String> delivered = Arrays.asList("delivered");
final Object undeliveredBecauseUnwrapped = undelivered;
final Object undeliveredBecauseTooGeneric = new Event<List<?>>(undelivered){};
@SuppressWarnings("rawtypes")
final Object undeliveredBecauseRaw = new Event<List>(undelivered){};
final Object deliveredBecauseWrappedAndUpperBound = new Event<List<String>>(delivered){};
final Object deliveredBecauseWrappedAndPerfectMatch = new Event<List<? extends String>>(delivered){};
final CountDownLatch latch = new CountDownLatch(2);
Object o = new Object() {
@Observes
public void observe1(List<? extends String> e) {
assertEquals(delivered,e);
latch.countDown();
}
};
Hub hub = new DefaultHub();
hub.subscribe(o);
hub.fire(undeliveredBecauseUnwrapped);
hub.fire(wrappedButUndeliveredBecauseDifferentSpecialisation);
hub.fire(undeliveredBecauseTooGeneric);
hub.fire(undeliveredBecauseRaw);
hub.fire(deliveredBecauseWrappedAndPerfectMatch);
hub.fire(deliveredBecauseWrappedAndUpperBound);
assertTrue(latch.await(50,TimeUnit.MILLISECONDS));
}
@Test
public void rawObserversAreNotified() throws Exception {
final Object deliveredEvenIfUnwrapped = Arrays.asList(1);
final Object deliveredRegardlessOfSpecialisation = new Event<List<Integer>>(Arrays.asList(1)){};
final CountDownLatch latch = new CountDownLatch(2);
Object o = new Object() {
@Observes @SuppressWarnings("rawtypes")
public void observe1(List e) {
latch.countDown();
}
};
Hub hub = new DefaultHub();
hub.subscribe(o);
hub.fire(deliveredEvenIfUnwrapped);
hub.fire(deliveredRegardlessOfSpecialisation);
assertTrue(latch.await(50,TimeUnit.MILLISECONDS));
}
@Test
public void notificationsAreBasedOnQualifiers() throws Exception {
final CountDownLatch latch = new CountDownLatch(5);
Object o = new Object() {
@Observes("1")
public void one(String event){
latch.countDown();
}
@Observes("2")
public void two(String event) {
latch.countDown();
}
@Observes
public void either(String event) {
latch.countDown();
}
};
Hub hub = new DefaultHub();
hub.subscribe(o);
hub.fire("event","1");
hub.fire("event","2");
hub.fire("event");
assertTrue(latch.await(10,TimeUnit.MILLISECONDS));
}
@Test
public void failuresFollowPolicy() {
final Thread currentThread = Thread.currentThread();
final RuntimeException rt = new RuntimeException();
Object o = new Object() {
@Observes
public void one(String event){
throw rt;
}
@Observes(kind=critical)
public void three(boolean event) {
throw rt;
}
@Observes(kind=critical)
public void four(double event) throws Exception {
throw new Exception(rt);
}
};
Hub hub = new DefaultHub();
hub.subscribe(o);
hub.fire("unlucky");
hub.fire(0);
try {
hub.fire(false);
}
catch(RuntimeException e) {
assertEquals(rt,e);
}
try {
hub.fire(0.0);
}
catch(RuntimeException e) {
assertEquals(rt,e.getCause());
}
}
@Test
public void onlyCriticalObserverRunSynchronously() {
final Thread currentThread = Thread.currentThread();
Object o = new Object() {
@Observes
public void one(String event){
assertNotSame(currentThread,Thread.currentThread());
}
@Observes(kind=critical)
public void two(String event) {
assertSame(currentThread,Thread.currentThread());
}
};
Hub hub = new DefaultHub();
hub.subscribe(o);
hub.fire("event");
}
@Test
public void onlyResilientObserversRunIfACriticalOneFails() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final RuntimeException rt = new RuntimeException();
Object o = new Object() {
@Observes
public void safe(String event){
fail();
}
@Observes(kind=resilient)
public void resilient(String event){
latch.countDown(); //if we get here the test fails
}
@Observes(kind=critical)
public void critical(String event) throws Exception {
Thread.sleep(50);
throw rt;
}
};
Hub hub = new DefaultHub();
hub.subscribe(o);
try {
hub.fire("event");
fail();
}
catch(Exception e) {
assertSame(rt,e);
}
latch.await(100,TimeUnit.MILLISECONDS);
}
@Test
public void nonCriticalDoNotBlockProducer() throws Exception {
final CountDownLatch latch = new CountDownLatch(2);
Object o = new Object() {
@Observes
public void one(String event) throws Exception {
Thread.sleep(100);
latch.countDown();
}
@Observes
public void two(String event) throws Exception {
Thread.sleep(100);
latch.countDown();
}
};
Hub hub = new DefaultHub();
hub.subscribe(o);
hub.fire("event");
assertTrue(latch.await(500,TimeUnit.MILLISECONDS));
}
@Test
public void eventsAreCoalesced() throws Exception {
Hub hub = new DefaultHub();
final AtomicInteger count=new AtomicInteger(0);
hub.subscribe(new Object() {
@Observes(every=100)
void coaleasced(List<Integer> events) {
count.incrementAndGet();
}
});
for (int i=0; i<100; i++) {
hub.fire(i);
Thread.sleep(10);
}
assertEquals(10,count.get());
}
@Test
public void eventsAreCoalescedUnderWildCards() throws Exception {
Hub hub = new DefaultHub();
final AtomicInteger count=new AtomicInteger(0);
hub.subscribe(new Object() {
@Observes(every=100)
void coaleasced(List<? extends Integer> events) {
count.incrementAndGet();
}
});
for (int i=0; i<100; i++) {
hub.fire(i);
Thread.sleep(10);
}
assertEquals(10,count.get());
}
}
Loading…
Cancel
Save