From ec964c0c47671a0d511c91e89bf68743b4096a7c Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 31 May 2019 11:45:07 +0200 Subject: [PATCH] imported cnr-notifications-common and cnr-blackboard-common --- dnet-core-components/pom.xml | 4 + .../actions/AbstractSubscriptionAction.java | 71 +++++++ .../enabling/actions/SubscriptionAction.java | 25 +++ .../enabling/tools/AbstractSchedulable.java | 191 ++++++++++++++++++ .../eu/dnetlib/enabling/tools/Enableable.java | 25 +++ .../enabling/tools/EnableableEnumerator.java | 57 ++++++ .../dnetlib/enabling/tools/Schedulable.java | 52 +++++ .../enabling/tools/SchedulableEnumerator.java | 57 ++++++ .../tools/SchedulableProfileUpdater.java | 48 +++++ .../AbstractBlackboardJobListener.java | 62 ++++++ ...AbstractBlackboardNotificationHandler.java | 96 +++++++++ .../tools/blackboard/ActionStatus.java | 26 +++ .../blackboard/BlackboardClientHandler.java | 32 +++ .../BlackboardClientHandlerImpl.java | 155 ++++++++++++++ .../tools/blackboard/BlackboardHandler.java | 20 ++ .../blackboard/BlackboardHandlerImpl.java | 135 +++++++++++++ .../tools/blackboard/BlackboardJob.java | 118 +++++++++++ .../tools/blackboard/BlackboardJobImpl.java | 163 +++++++++++++++ .../blackboard/BlackboardJobListener.java | 18 ++ .../blackboard/BlackboardJobRegistry.java | 19 ++ .../tools/blackboard/BlackboardMessage.java | 82 ++++++++ .../blackboard/BlackboardMessageImpl.java | 142 +++++++++++++ .../BlackboardNotificationHandler.java | 59 ++++++ .../tools/blackboard/BlackboardParameter.java | 36 ++++ .../blackboard/BlackboardParameterImpl.java | 87 ++++++++ .../blackboard/BlackboardServerAction.java | 24 +++ .../BlackboardServerActionExecutor.java | 119 +++++++++++ ...oardServerExecutorNotificationHandler.java | 52 +++++ .../blackboard/BlackboardServerHandler.java | 37 ++++ ...DeletingBlackboardNotificationHandler.java | 70 +++++++ .../tools/blackboard/NotificationHandler.java | 22 ++ .../blackboard/NotificationHandlerChain.java | 14 ++ .../NotificationHandlerChainImpl.java | 104 ++++++++++ .../tools/blackboard/NotificationHistory.java | 37 ++++ .../blackboard/NotificationHistoryImpl.java | 39 ++++ .../tools/blackboard/NotificationInfo.java | 74 +++++++ .../tools/applicationContext-enableable.xml | 12 ++ .../tools/applicationContext-schedulable.xml | 28 +++ .../applicationContext-blackboard-tools.xml | 18 ++ .../NotificationHandlerChainTest.java | 104 ++++++++++ .../iterators/xml/IterableXmlParserTest.java | 2 +- .../NotificationHandlerChainTest-context.xml | 16 ++ pom.xml | 5 + 43 files changed, 2556 insertions(+), 1 deletion(-) create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/actions/AbstractSubscriptionAction.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/actions/SubscriptionAction.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/AbstractSchedulable.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/Enableable.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/EnableableEnumerator.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/Schedulable.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/SchedulableEnumerator.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/SchedulableProfileUpdater.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/AbstractBlackboardJobListener.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/AbstractBlackboardNotificationHandler.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/ActionStatus.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardClientHandler.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardClientHandlerImpl.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardHandler.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardHandlerImpl.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJob.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJobImpl.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJobListener.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJobRegistry.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardMessage.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardMessageImpl.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardNotificationHandler.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardParameter.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardParameterImpl.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerAction.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerActionExecutor.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerExecutorNotificationHandler.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerHandler.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/DeletingBlackboardNotificationHandler.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandler.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChain.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChainImpl.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHistory.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHistoryImpl.java create mode 100644 dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationInfo.java create mode 100644 dnet-core-components/src/main/resources/eu/dnetlib/enabling/tools/applicationContext-enableable.xml create mode 100644 dnet-core-components/src/main/resources/eu/dnetlib/enabling/tools/applicationContext-schedulable.xml create mode 100644 dnet-core-components/src/main/resources/eu/dnetlib/enabling/tools/blackboard/applicationContext-blackboard-tools.xml create mode 100644 dnet-core-components/src/test/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChainTest.java create mode 100644 dnet-core-components/src/test/resources/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChainTest-context.xml diff --git a/dnet-core-components/pom.xml b/dnet-core-components/pom.xml index 50db0a8..08bdb1d 100644 --- a/dnet-core-components/pom.xml +++ b/dnet-core-components/pom.xml @@ -161,6 +161,10 @@ org.springframework spring-tx + + org.springframework + spring-context-support + diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/actions/AbstractSubscriptionAction.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/actions/AbstractSubscriptionAction.java new file mode 100644 index 0000000..7248f1a --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/actions/AbstractSubscriptionAction.java @@ -0,0 +1,71 @@ +package eu.dnetlib.enabling.actions; + +import javax.annotation.PostConstruct; + +import org.springframework.beans.factory.annotation.Required; + +import eu.dnetlib.enabling.actions.SubscriptionAction; +import eu.dnetlib.enabling.tools.Enableable; + +/** + * Common subscription action. + * + * @author marko + * + */ +public abstract class AbstractSubscriptionAction implements SubscriptionAction, Enableable { + + /** + * topic expression. + */ + private String topicExpression; + + /** + * topic prefix. + */ + private String topicPrefix; + + /** + * true if enabled. + */ + private boolean enabled = true; + + /** + * init the default topic prefix. + */ + @PostConstruct + public void init() { + if (topicPrefix == null) + topicPrefix = getTopicExpression().replace("/", "."); + } + + @Override + public String getTopicExpression() { + return topicExpression; + } + + @Required + public void setTopicExpression(final String topicExpression) { + this.topicExpression = topicExpression; + } + + @Override + public String getTopicPrefix() { + return topicPrefix; + } + + public void setTopicPrefix(final String topicPrefix) { + this.topicPrefix = topicPrefix; + } + + @Override + public boolean isEnabled() { + return enabled; + } + + @Override + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/actions/SubscriptionAction.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/actions/SubscriptionAction.java new file mode 100644 index 0000000..375c593 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/actions/SubscriptionAction.java @@ -0,0 +1,25 @@ +package eu.dnetlib.enabling.actions; + +import eu.dnetlib.enabling.tools.blackboard.NotificationHandler; + +/** + * Manager subscription action. + * + * @author marko + * + */ +public interface SubscriptionAction extends NotificationHandler { + /** + * topic expression associated with this subscription action. + * + * @return SN topic expression + */ + String getTopicExpression(); + + /** + * react on this topic prefix. Might be shorter than the topic expression. + * + * @return SN topic prefix + */ + String getTopicPrefix(); +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/AbstractSchedulable.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/AbstractSchedulable.java new file mode 100644 index 0000000..261fdef --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/AbstractSchedulable.java @@ -0,0 +1,191 @@ +package eu.dnetlib.enabling.tools; + +import java.text.ParseException; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; + +import org.quartz.*; +import org.quartz.Trigger.TriggerState; +import org.quartz.impl.JobDetailImpl; +import org.quartz.impl.triggers.CronTriggerImpl; +import org.springframework.beans.factory.BeanNameAware; +import org.springframework.beans.factory.annotation.Required; + +/** + * Common implementation for schedulable beans. + * + * @author claudio + * + */ +public abstract class AbstractSchedulable implements Schedulable, Job, BeanNameAware { + + private static final String THIS = "this"; + + private final static String GROUP = "schedulableJobs"; + + private boolean enabled; + + private CronExpression cronExpression; + + private String beanName; + + @Resource(name = "dnetJobScheduler") + private Scheduler jobScheduler; + + @PostConstruct + protected void init() { + try { + + JobDataMap jobDataMap = new JobDataMap(); + jobDataMap.put(THIS, this); + + JobDetailImpl jd = new JobDetailImpl(); + jd.setName(getBeanName()); + jd.setGroup(GROUP); + jd.setJobDataMap(jobDataMap); + jd.setJobClass(this.getClass()); + + jobScheduler.scheduleJob(jd, createTrigger()); + + } catch (SchedulerException e) { + throw new RuntimeException(e); + } + } + + private Trigger createTrigger() { + try { + CronTriggerImpl trigger = new CronTriggerImpl(getBeanName(), GROUP, getCronExpression()); + trigger.setMisfireInstruction(Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY); + trigger.setJobGroup(GROUP); + trigger.setJobName(getBeanName()); + return trigger; + } catch (ParseException e) { + throw new IllegalArgumentException("invalid cron expression: " + cronExpression, e); + } + } + + protected abstract void doExecute(); + + @Override + public void execute() { + // bean represents the quartz instance of this object + if (isEnabled()) { + doExecute(); + } + } + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + AbstractSchedulable bean = (AbstractSchedulable) context.getJobDetail().getJobDataMap().get(THIS); + + // bean represents the quartz instance of this object + if (bean.isEnabled()) { + bean.doExecute(); + } + } + + @Override + public boolean isEnabled() { + return enabled; + } + + @Override + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + @Override + public String getCronExpression() { + return cronExpression.getCronExpression(); + } + + @Required + public void setCronExpression(String cronExpression) { + try { + this.cronExpression = new CronExpression(cronExpression); + } catch (ParseException e) { + throw new IllegalArgumentException("invalid cron expression: " + cronExpression, e); + } + } + + @Override + public void updateCronExpression(String cronExpression) { + + if (!cronExpression.equals(getCronExpression())) { + setCronExpression(cronExpression); + try { + final TriggerKey tk = new TriggerKey(getBeanName()); + jobScheduler.rescheduleJob(tk, createTrigger()); + } catch (SchedulerException e) { + throw new RuntimeException("unable to reschedule trigger", e); + } + } + } + + @Override + public String getNextFireTime() { + try { + if (isPaused()) { + return ""; + } + if (!isEnabled()) { + return ""; + } + final TriggerKey tk = new TriggerKey(getBeanName()); + final Trigger t = jobScheduler.getTrigger(tk); + return t != null ? t.getNextFireTime().toString() : ""; + } catch (SchedulerException e) { + throw new RuntimeException("unable to get trigger", e); + } + } + + @Override + public boolean isPaused() { + try { + final TriggerKey tk = new TriggerKey(getBeanName()); + final TriggerState state = jobScheduler.getTriggerState(tk); + switch (state) { + case PAUSED: + case NONE: + case ERROR: + return true; + default: + return false; + } + } catch (SchedulerException e) { + throw new RuntimeException("unable to get trigger", e); + } + } + + @Override + public void pause() { + try { + final TriggerKey tk = new TriggerKey(getBeanName()); + jobScheduler.pauseTrigger(tk); + } catch (SchedulerException e) { + throw new RuntimeException("unable to pause trigger", e); + } + } + + @Override + public void resume() { + try { + final TriggerKey tk = new TriggerKey(getBeanName()); + jobScheduler.resumeTrigger(tk); + } catch (SchedulerException e) { + throw new RuntimeException("unable to resume trigger", e); + } + } + + public String getBeanName() { + return beanName; + } + + @Override + @Required + public void setBeanName(String beanName) { + this.beanName = beanName; + } + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/Enableable.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/Enableable.java new file mode 100644 index 0000000..87088cc --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/Enableable.java @@ -0,0 +1,25 @@ +package eu.dnetlib.enabling.tools; + +/** + * Simple interface for classes, which's objects can be enabled or disabled. + * + * + * @author marko + * + */ +public interface Enableable { + /** + * True if this object enabled. + * + * @return if this object is currently enabled. + */ + boolean isEnabled(); + + /** + * Enables of disables this object. + * + * @param enabled + * enabled + */ + void setEnabled(boolean enabled); +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/EnableableEnumerator.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/EnableableEnumerator.java new file mode 100644 index 0000000..61eb252 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/EnableableEnumerator.java @@ -0,0 +1,57 @@ +package eu.dnetlib.enabling.tools; + +import java.util.Map; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.ListableBeanFactory; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; + +/** + * This bean has to live in the same bean factory where all enableables live. Thus it cannot be put inside the EnableablController, which + * lives in the webContext. + * + * @author marko + * + */ +public class EnableableEnumerator implements BeanFactoryAware { + + /** + * bean factory. + */ + private ListableBeanFactory beanFactory; + + /** + * Get all beans implementing the Enableable interface. + * + * @return + */ + public Map getAllEnableables() { + return beanFactory.getBeansOfType(Enableable.class); + } + + @Override + public void setBeanFactory(final BeanFactory beanFactory) throws BeansException { + this.beanFactory = (ListableBeanFactory) beanFactory; + } + + public ListableBeanFactory getBeanFactory() { + return beanFactory; + } + + /** + * Get given enableable or null. + * + * @param name + * @return + */ + public Enableable getEnableable(final String name) { + try { + return beanFactory.getBean(name, Enableable.class); + } catch (final NoSuchBeanDefinitionException e) { + return null; + } + } + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/Schedulable.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/Schedulable.java new file mode 100644 index 0000000..5608ac0 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/Schedulable.java @@ -0,0 +1,52 @@ +package eu.dnetlib.enabling.tools; + +/** + * Simple interface for classes, which's objects actions can be scheduled. + * + * @author claudio + */ +public interface Schedulable extends Enableable { + /** + * Updates the cron expression. + * + * @param cronExpression + */ + void updateCronExpression(String cronExpression); + + /** + * Getter for the cron expression. + * + * @return the String representing the cron expression. + */ + String getCronExpression(); + + /** + * Computes the next time the doExecute method will be invoked. + * + * @return the String representing the next fire time. + */ + String getNextFireTime(); + + /** + * True if the object has been paused. + * + * @return if the bean has been paused. + */ + boolean isPaused(); + + /** + * Prevents the bean for being scheduled for future invocations. + */ + void pause(); + + /** + * Resumes the bean for being scheduled for future invocations. + */ + void resume(); + + /** + * Must be implemented to perform the real action. + */ + void execute(); + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/SchedulableEnumerator.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/SchedulableEnumerator.java new file mode 100644 index 0000000..e98de95 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/SchedulableEnumerator.java @@ -0,0 +1,57 @@ +package eu.dnetlib.enabling.tools; + +import java.util.Map; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.ListableBeanFactory; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; + +/** + * This bean has to live in the same bean factory where all schedulables live. Thus it cannot be put inside the EnableablController, which + * lives in the webContext. + * + * @author claudio + * + */ +public class SchedulableEnumerator implements BeanFactoryAware { + + /** + * bean factory. + */ + private ListableBeanFactory beanFactory; + + /** + * Get all beans implementing the Schedulable interface. + * + * @return + */ + public Map getAllSchedulables() { + return beanFactory.getBeansOfType(Schedulable.class); + } + + @Override + public void setBeanFactory(final BeanFactory beanFactory) throws BeansException { + this.beanFactory = (ListableBeanFactory) beanFactory; + } + + public ListableBeanFactory getBeanFactory() { + return beanFactory; + } + + /** + * Get given schedulable or null. + * + * @param name + * @return + */ + public Schedulable getSchedulable(final String name) { + try { + return beanFactory.getBean(name, Schedulable.class); + } catch (final NoSuchBeanDefinitionException e) { + return null; + } + } + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/SchedulableProfileUpdater.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/SchedulableProfileUpdater.java new file mode 100644 index 0000000..296d696 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/SchedulableProfileUpdater.java @@ -0,0 +1,48 @@ +package eu.dnetlib.enabling.tools; + +import javax.annotation.Resource; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Required; + +import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException; +import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService; +import eu.dnetlib.enabling.locators.UniqueServiceLocator; + +/** + * Schedulable implementation of an xUpdate. + * + * @author claudio + * + */ +public class SchedulableProfileUpdater extends AbstractSchedulable { + + private static final Log log = LogFactory.getLog(SchedulableProfileUpdater.class); // NOPMD by marko on 11/24/08 5:02 PM + + private String xUpdate; + + @Resource + private UniqueServiceLocator serviceLocator; + + @Override + protected void doExecute() { + try { + log.info("triggering scheduled reindex: " + getxUpdate()); + serviceLocator.getService(ISRegistryService.class).executeXUpdate(getxUpdate()); + } catch (ISRegistryException e) { + + throw new RuntimeException(e); + } + } + + public String getxUpdate() { + return xUpdate; + } + + @Required + public void setxUpdate(String xUpdate) { + this.xUpdate = xUpdate; + } + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/AbstractBlackboardJobListener.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/AbstractBlackboardJobListener.java new file mode 100644 index 0000000..75e65be --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/AbstractBlackboardJobListener.java @@ -0,0 +1,62 @@ +package eu.dnetlib.enabling.tools.blackboard; + +/** + * Utility abstract class which dispatches to commodity onSomething() methods, one for each interesting + * blackboard job state. + * + * @author marko + * + */ +public abstract class AbstractBlackboardJobListener implements BlackboardJobListener { + + /** + * {@inheritDoc} + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardJobListener#processJob(eu.dnetlib.enabling.tools.blackboard.BlackboardJob) + */ + @Override + public void processJob(final BlackboardJob job) { + if (job.getActionStatus() == ActionStatus.DONE) + onDone(job); + else if (job.getActionStatus() == ActionStatus.FAILED) + onFailed(job); + else if (job.getActionStatus() == ActionStatus.ASSIGNED) + onAssigned(job); + else if (job.getActionStatus() == ActionStatus.ONGOING) + onOngoing(job); + } + + /** + * Called when the job enters the ASSIGNED state. + * + * @param job job + */ + protected void onAssigned(final BlackboardJob job) { // NOPMD + // default no operation + // TODO: increase job expiry time + } + + /** + * Called when the job enters the ONGOING state. + * + * @param job job + */ + protected void onOngoing(final BlackboardJob job) { // NOPMD + // default no operation + // TODO: increase job expiry time + } + + /** + * Called when the job finishes in the FAILED state. + * + * @param job job + */ + protected abstract void onFailed(BlackboardJob job); + + /** + * Called when the job finishes in the DONE state. + * + * @param job job + */ + protected abstract void onDone(BlackboardJob job); + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/AbstractBlackboardNotificationHandler.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/AbstractBlackboardNotificationHandler.java new file mode 100644 index 0000000..92481b3 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/AbstractBlackboardNotificationHandler.java @@ -0,0 +1,96 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import java.io.IOException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.xpath.XPathExpressionException; + +import org.springframework.beans.factory.annotation.Required; +import org.xml.sax.SAXException; + +import eu.dnetlib.enabling.tools.Enableable; +import eu.dnetlib.enabling.tools.StringOpaqueResource; + +/** + * Common blackboard notification handler. This notification handler processes only message with ...BODY.BLACKBOARD.LAST* as topic. + * + * @param + * type of blackboard handler used to extract the blackboard message (client or server) + * @author marko + * + */ +public abstract class AbstractBlackboardNotificationHandler implements NotificationHandler, Enableable { + + /** + * blackboard handler. + */ + private T blackboardHandler; + + /** + * true if enabled. + */ + private boolean enabled = true; + + /** + * Executor handles the notified request in a dedicated thread and allows to return immediately. + */ + private Executor executor = Executors.newCachedThreadPool(); + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.data.mdstore.NotificationHandler#notified(java.lang.String, java.lang.String, java.lang.String, java.lang.String) + */ + @Override + public void notified(final String subscrId, final String topic, final String rsId, final String profile) { + if (!topic.contains("BODY.BLACKBOARD.LAST")) return; + + executor.execute(new Runnable() { + + @Override + public void run() { + try { + processJob(blackboardHandler.getJob(new StringOpaqueResource(profile))); + } catch (final XPathExpressionException e) { + throw new IllegalStateException(e); + } catch (final SAXException e) { + throw new IllegalStateException(e); + } catch (final IOException e) { + throw new IllegalStateException(e); + } catch (final ParserConfigurationException e) { + throw new IllegalStateException(e); + } + } + }); + } + + /** + * Subclassess override this to process incoming blackboard jobs. + * + * @param job + * blackboard job + */ + protected abstract void processJob(BlackboardJob job); + + public T getBlackboardHandler() { + return blackboardHandler; + } + + @Required + public void setBlackboardHandler(final T blackboardHandler) { + this.blackboardHandler = blackboardHandler; + } + + @Override + public boolean isEnabled() { + return enabled; + } + + @Override + public void setEnabled(final boolean enabled) { + this.enabled = enabled; + } + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/ActionStatus.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/ActionStatus.java new file mode 100644 index 0000000..4966f61 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/ActionStatus.java @@ -0,0 +1,26 @@ +package eu.dnetlib.enabling.tools.blackboard; + +/** + * Blackboard action status. + * + * @author marko + * + */ +public enum ActionStatus { + /** + * The job/action is assigned but not jet taken into started execution. + */ + ASSIGNED, + /** + * The job/action is ongoing. + */ + ONGOING, + /** + * The job/action is completed successfully. + */ + DONE, + /** + * The job/action is completed with failure. + */ + FAILED +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardClientHandler.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardClientHandler.java new file mode 100644 index 0000000..b5c70c7 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardClientHandler.java @@ -0,0 +1,32 @@ +package eu.dnetlib.enabling.tools.blackboard; + +/** + * Helpers for the blackboard protocol client. + * + * @author marko + * + */ +public interface BlackboardClientHandler extends BlackboardHandler { + /** + * Create a new job. + * + * @param serviceId service identifier + * @return newly created blackboard job + */ + BlackboardJob newJob(String serviceId); + + /** + * Assign a blackboard job to a service. + * + * @param job blackboard job to send + */ + void assign(BlackboardJob job); + + /** + * The client can delete the job after it has reached a final state + * or the job timeout has expired. + * + * @param job blackboard job to delete. + */ + void delete(BlackboardJob job); +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardClientHandlerImpl.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardClientHandlerImpl.java new file mode 100644 index 0000000..c1864d1 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardClientHandlerImpl.java @@ -0,0 +1,155 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import java.util.Map; + +import javax.xml.bind.JAXBException; +import javax.xml.transform.dom.DOMSource; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; + +import org.springframework.beans.factory.annotation.Required; +import org.w3c.dom.Element; + +import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException; +import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService; +import eu.dnetlib.enabling.locators.UniqueServiceLocator; +import eu.dnetlib.enabling.tools.OpaqueResource; +import eu.dnetlib.enabling.tools.UniqueIdentifierGenerator; +import eu.dnetlib.enabling.tools.UniqueIdentifierGeneratorImpl; +import eu.dnetlib.miscutils.jaxb.JaxbFactory; + +/** + * Blackboard client. + * + * @author marko + * + */ +public class BlackboardClientHandlerImpl implements BlackboardClientHandler { + + /** + * blackboard message factory. + */ + private JaxbFactory messageFactory; + + /** + * service locator. + */ + private UniqueServiceLocator serviceLocator; + + /** + * generates blackboard message identifiers for new jobs. + */ + private UniqueIdentifierGenerator uuidGenerator = new UniqueIdentifierGeneratorImpl("bb-"); + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardClientHandler#assign(eu.dnetlib.enabling.tools.blackboard.BlackboardJob) + */ + @Override + public void assign(final BlackboardJob job) { + checkJob(job); + + try { + serviceLocator.getService(ISRegistryService.class).addBlackBoardMessage(job.getServiceId(), job.getId(), + messageFactory.serialize(job.getMessage())); + } catch (final ISRegistryException e) { + throw new IllegalStateException("cannot register blackboard message", e); + } catch (final JAXBException e) { + throw new IllegalArgumentException("cannot serialize blackboard message", e); + } + } + + /** + * Check that the job has sane values. + * + * @param job + */ + protected void checkJob(final BlackboardJob job) { + for (Map.Entry param : job.getParameters().entrySet()) { + if (param.getValue() == null) { throw new IllegalStateException("job parameter value cannot be null: " + param.getKey()); } + } + } + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardClientHandler#delete(eu.dnetlib.enabling.tools.blackboard.BlackboardJob) + */ + @Override + public void delete(final BlackboardJob job) { + try { + serviceLocator.getService(ISRegistryService.class).deleteBlackBoardMessage(job.getServiceId(), job.getId()); + } catch (final ISRegistryException e) { + throw new IllegalStateException("cannot delete blackboard message", e); + } + } + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardClientHandler#newJob() + */ + @Override + public BlackboardJob newJob(final String serviceId) { + final BlackboardJob job = new BlackboardJobImpl(serviceId, messageFactory.newInstance()); + job.setActionStatus(ActionStatus.ASSIGNED); + job.getParameters().put("id", ""); + job.getParameters().put("error", ""); + + job.setId(uuidGenerator.generateIdentifier()); + return job; + } + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardHandler#getJob(eu.dnetlib.enabling.tools.OpaqueResource) + */ + @Override + public BlackboardJob getJob(final OpaqueResource profile) { + + final XPath xpa = XPathFactory.newInstance().newXPath(); + + try { + final Element source = (Element) xpa.evaluate("/RESOURCE_PROFILE/BODY/BLACKBOARD/MESSAGE[@id = /RESOURCE_PROFILE/BODY/BLACKBOARD/LAST_RESPONSE]", + profile.asDom(), XPathConstants.NODE); + + if (source == null) { throw new IllegalStateException("cannot find last blackboard message in the service profile"); } + + return new BlackboardJobImpl(profile.getResourceId(), messageFactory.parse(new DOMSource(source))); + } catch (final JAXBException e) { + throw new IllegalStateException("cannot parse blackboard message", e); + } catch (final XPathExpressionException e) { + throw new IllegalStateException("cannot find last blackboard message in the service profile", e); + } + } + + public JaxbFactory getMessageFactory() { + return messageFactory; + } + + public void setMessageFactory(final JaxbFactory messageFactory) { + this.messageFactory = messageFactory; + } + + public UniqueIdentifierGenerator getUuidGenerator() { + return uuidGenerator; + } + + public void setUuidGenerator(final UniqueIdentifierGenerator uuidGenerator) { + this.uuidGenerator = uuidGenerator; + } + + public UniqueServiceLocator getServiceLocator() { + return serviceLocator; + } + + @Required + public void setServiceLocator(final UniqueServiceLocator serviceLocator) { + this.serviceLocator = serviceLocator; + } + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardHandler.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardHandler.java new file mode 100644 index 0000000..3c82f15 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardHandler.java @@ -0,0 +1,20 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import eu.dnetlib.enabling.tools.OpaqueResource; + +/** + * Basic blackboard handler. + * + * @author marko + * + */ +public interface BlackboardHandler { + /** + * Get the current job from, as notified in the service profile. + * + * @param profile + * service profile + * @return notified blackboard job + */ + BlackboardJob getJob(OpaqueResource profile); +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardHandlerImpl.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardHandlerImpl.java new file mode 100644 index 0000000..48e7410 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardHandlerImpl.java @@ -0,0 +1,135 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import java.io.PrintWriter; +import java.io.StringWriter; + +import javax.xml.bind.JAXBException; +import javax.xml.transform.dom.DOMSource; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; + +import org.springframework.beans.factory.annotation.Required; +import org.w3c.dom.Element; + +import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException; +import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService; +import eu.dnetlib.enabling.locators.UniqueServiceLocator; +import eu.dnetlib.enabling.tools.OpaqueResource; +import eu.dnetlib.miscutils.jaxb.JaxbFactory; + +/** + * Blackboard handler implementation. + * + * @author marko + * + */ +public class BlackboardHandlerImpl implements BlackboardServerHandler { + + /** + * blackboard message factory. + */ + private JaxbFactory messageFactory; + + /** + * service locator. + */ + private UniqueServiceLocator serviceLocator; + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardHandler#getJob(eu.dnetlib.enabling.tools.OpaqueResource) + */ + @Override + public BlackboardJob getJob(final OpaqueResource profile) { + + final XPath xpa = XPathFactory.newInstance().newXPath(); + + try { + final Element source = (Element) xpa.evaluate("/RESOURCE_PROFILE/BODY/BLACKBOARD/MESSAGE[@id = /RESOURCE_PROFILE/BODY/BLACKBOARD/LAST_REQUEST]", + profile.asDom(), XPathConstants.NODE); + + if (source == null) { throw new IllegalStateException("cannot find last blackboard message in the service profile"); } + + return new BlackboardJobImpl(profile.getResourceId(), messageFactory.parse(new DOMSource(source))); + } catch (final JAXBException e) { + throw new IllegalStateException("cannot parse blackboard message", e); + } catch (final XPathExpressionException e) { + throw new IllegalStateException("cannot find last blackboard message in the service profile", e); + } + } + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardHandler#done(eu.dnetlib.enabling.tools.blackboard.BlackboardJob) + */ + @Override + public void done(final BlackboardJob job) { + job.setActionStatus(ActionStatus.DONE); + replyJob(job); + } + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardHandler#failed(eu.dnetlib.enabling.tools.blackboard.BlackboardJob, + * java.lang.Exception) + */ + @Override + public void failed(final BlackboardJob job, final Throwable exception) { + job.setActionStatus(ActionStatus.FAILED); + final StringWriter stackTrace = new StringWriter(); + exception.printStackTrace(new PrintWriter(stackTrace)); + job.getParameters().put("error", exception.toString()); + job.getParameters().put("errorDetails", stackTrace.toString()); + replyJob(job); + } + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardHandler#ongoing(eu.dnetlib.enabling.tools.blackboard.BlackboardJob) + */ + @Override + public void ongoing(final BlackboardJob job) { + job.setActionStatus(ActionStatus.ONGOING); + replyJob(job); + } + + /** + * Internal helper method which replies a blackboard job. + * + * @param job + * blackboard job + */ + protected void replyJob(final BlackboardJob job) { + try { + serviceLocator.getService(ISRegistryService.class).replyBlackBoardMessage(job.getServiceId(), messageFactory.serialize(job.getMessage())); + } catch (final ISRegistryException e) { + throw new IllegalStateException("cannot reply the blackboard message", e); + } catch (final JAXBException e) { + throw new IllegalArgumentException("cannot serialize blackboard message", e); + } + } + + public JaxbFactory getMessageFactory() { + return messageFactory; + } + + public void setMessageFactory(final JaxbFactory messageFactory) { + this.messageFactory = messageFactory; + } + + public UniqueServiceLocator getServiceLocator() { + return serviceLocator; + } + + @Required + public void setServiceLocator(final UniqueServiceLocator serviceLocator) { + this.serviceLocator = serviceLocator; + } + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJob.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJob.java new file mode 100644 index 0000000..1a50b34 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJob.java @@ -0,0 +1,118 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import java.util.Map; + +/** + * High level representation of a blackboard job. + * + * @author marko + * + */ +public interface BlackboardJob { + + /** + * Get the message id. + * + * @return id + */ + String getId(); + + /** + * Set message id. + * + * @param identifier + * id + */ + void setId(String identifier); + + /** + * Get the action name. + * + * @return action name + */ + String getAction(); + + /** + * Set the action name. + * + * @param action + * action name + */ + void setAction(String action); + + /** + * Get the action status. + * + * @return action status + */ + ActionStatus getActionStatus(); + + /** + * Set the action status. + * + * @param actionStatus + * action status + */ + void setActionStatus(ActionStatus actionStatus); + + /** + * Get the message date. + * + * @return date + */ + String getDate(); + + /** + * Set the message date. + * + * @param date + * date + */ + void setDate(String date); + + /** + * obtains a mutable parameter map (key/value). + * + * @return mutable parameter map + */ + Map getParameters(); + + /** + * Get the error message, if the actionStatus is FAILED. Usually the error message is also present in the parameter map but it's + * preferred to use this method. + * + * @return error message + */ + String getError(); + + /** + * Set the error message, if the actionStatus is FAILED. Usually the error message is also present in the parameter map but it's + * preferred to use this method. + * + * @param error + * error message + */ + void setError(String error); + + /** + * Get the setviceId. + * + * @return the serviceId + */ + String getServiceId(); + + /** + * True if the action status is one of the termination statuses (e.g. done and failed). + * + * @return true if completed + */ + boolean isCompleted(); + + /** + * Get the bb message. + * + * @return the bb message + */ + BlackboardMessage getMessage(); + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJobImpl.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJobImpl.java new file mode 100644 index 0000000..e829c18 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJobImpl.java @@ -0,0 +1,163 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +/** + * High level representation of a blackboard job. + * + * @author marko + * + */ +public class BlackboardJobImpl implements BlackboardJob { + + /** + * underlying low level blackboard message. + */ + private BlackboardMessage message; + + /** + * service identifier. + */ + private String serviceId; + + /** + * parameters. + */ + private final transient Map parameters = new HashMap(); + + /** + * Construct a new blackboard job from a blackboard message. + * + * @param serviceId + * service identifier + * @param message + * underlying low-level blackboard message + */ + public BlackboardJobImpl(final String serviceId, final BlackboardMessage message) { + super(); + this.message = message; + this.serviceId = serviceId; + + for (final BlackboardParameter param : message.getParameters()) { + parameters.put(param.getName(), param.getValue()); + } + } + + @Override + public String getId() { + return message.getId(); + } + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardJob#setId(java.lang.String) + */ + @Override + public void setId(final String identifier) { + message.setId(identifier); + } + + @Override + public String getAction() { + return message.getAction(); + } + + @Override + public ActionStatus getActionStatus() { + return message.getActionStatus(); + } + + @Override + public String getDate() { + return message.getDate(); + } + + @Override + public String getError() { + return getParameters().get("error"); + } + + @Override + public Map getParameters() { + return parameters; + } + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardJob#setAction(java.lang.String) + */ + @Override + public void setAction(final String action) { + message.setAction(action); + } + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardJob#setActionStatus(eu.dnetlib.enabling.tools.blackboard.ActionStatus) + */ + @Override + public void setActionStatus(final ActionStatus actionStatus) { + message.setActionStatus(actionStatus); + } + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardJob#setDate(java.lang.String) + */ + @Override + public void setDate(final String date) { + message.setDate(date); + } + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardJob#setError(java.lang.String) + */ + @Override + public void setError(final String error) { + getParameters().put("error", error); + } + + /** + * returns the blackboard message, potentially modified. + * + * @return underlying blackboard message + */ + @Override + public BlackboardMessage getMessage() { + message.getParameters().clear(); + for (final Entry entry : getParameters().entrySet()) { + final BlackboardParameterImpl param = new BlackboardParameterImpl(); // NOPMD + param.setName(entry.getKey()); + param.setValue(entry.getValue()); + message.getParameters().add(param); + } + return message; + } + + public void setMessage(final BlackboardMessage message) { + this.message = message; + } + + @Override + public String getServiceId() { + return serviceId; + } + + public void setServiceId(final String serviceId) { + this.serviceId = serviceId; + } + + @Override + public boolean isCompleted() { + return getActionStatus() == ActionStatus.DONE || getActionStatus() == ActionStatus.FAILED; + } + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJobListener.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJobListener.java new file mode 100644 index 0000000..7dc1803 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJobListener.java @@ -0,0 +1,18 @@ +package eu.dnetlib.enabling.tools.blackboard; + +/** + * Implement this interface in order to receive notifications for specific blackboard messages. + * + * @author marko + * + */ +public interface BlackboardJobListener { + + /** + * process the given job. + * + * @param job job to process + */ + void processJob(BlackboardJob job); + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJobRegistry.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJobRegistry.java new file mode 100644 index 0000000..161925f --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardJobRegistry.java @@ -0,0 +1,19 @@ +package eu.dnetlib.enabling.tools.blackboard; + +/** + * Registers BlackboardJobListeners to be called when related incoming blackboard job notifications arrive. + * @author marko + * + */ +public interface BlackboardJobRegistry { + /** + * Registers a new job listener. It will be notified when the job changes state. The listener will be unregistered + * when the job arrives to a final state (DONE, or FAILED). + * + * @param job + * job + * @param listener + * job listener + */ + void registerJobListener(BlackboardJob job, BlackboardJobListener listener); +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardMessage.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardMessage.java new file mode 100644 index 0000000..c835946 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardMessage.java @@ -0,0 +1,82 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import java.util.List; + +/** + * Implementor of this interface represent blackboard messages nodes as the appear in the service profile blackboard. + * + * This is the low-level representation of the blackboard message. The BlackboardJob is a higher level representation of a blackboard message, + * and a BlackboardHandler helps converting between the two. + * + * Implementors of this interface should simply provide a serialization so that the message can be stored in the service profile. + * + * @author marko + * + */ +public interface BlackboardMessage { + /** + * blackboard message date. + * + * @return date + */ + String getDate(); + + /** + * sets the blackboard message date/timestamp. + * + * @param date + * date + */ + void setDate(String date); + + /** + * Blackboard message identifier. + * + * @return identifier + */ + String getId(); + + /** + * sets the blackboard message identifier. + * + * @param id identifier + */ + void setId(String id); // NOPMD + + /** + * Blackboard message action name. + * + * @return action + */ + String getAction(); + + /** + * Sets the blackboard message action name. + * + * @param action + * action + */ + void setAction(String action); + + /** + * Blackboard message parameter list. + * + * @return parameter list + */ + List getParameters(); + + /** + * Status of the action associated with this message. + * + * @return action status + */ + ActionStatus getActionStatus(); + + /** + * sets the action status. + * + * @param actionStatus + * action status + */ + void setActionStatus(ActionStatus actionStatus); +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardMessageImpl.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardMessageImpl.java new file mode 100644 index 0000000..bc683ea --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardMessageImpl.java @@ -0,0 +1,142 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import java.util.ArrayList; +import java.util.List; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +/** + * standard serialization of the blackboard message. + * + * @author marko + * + */ +@XmlRootElement(namespace = "", name = "MESSAGE") +@XmlAccessorType(XmlAccessType.NONE) +public class BlackboardMessageImpl implements BlackboardMessage { + + /** + * hash seed. + */ + private static final int HASH_SEED_2 = 63; + + /** + * hash seed. + */ + private static final int HASH_SEED = 13; + + /** + * blackboard message timestamp. + */ + @XmlAttribute + private String date; + + /** + * blackboard message identifier. + */ + @XmlAttribute + private String id; // NOPMD + + /** + * blackboard message action name. + */ + @XmlElement(name = "ACTION", required = true) + private String action; + + /** + * blackboard message parameters (key/value). + */ + @XmlElement(name = "PARAMETER", type = BlackboardParameterImpl.class) + private List parameters = new ArrayList(); + + /** + * the status of the action described by this message. + */ + @XmlElement(name = "ACTION_STATUS", required = true) + private ActionStatus actionStatus; + + /** + * {@inheritDoc} + * + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(final Object obj) { + if (!(obj instanceof BlackboardMessage)) + return false; + + if (this == obj) + return true; + + final BlackboardMessage rhs = (BlackboardMessage) obj; + return new EqualsBuilder().append(id, rhs.getId()).append(date, rhs.getDate()).append(action, rhs.getAction()).append(actionStatus, + rhs.getActionStatus()).append(parameters, rhs.getParameters()).isEquals(); + } + + /** + * {@inheritDoc} + * + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + return new HashCodeBuilder(HASH_SEED, HASH_SEED_2).append(id).append(date).append(action).append(actionStatus).append(parameters).toHashCode(); + } + + @Override + public String getDate() { + return date; + } + + @Override + public void setDate(final String date) { + this.date = date; + } + + @Override + public String getId() { + return id; + } + + @Override + public void setId(final String id) { // NOPMD + this.id = id; + } + + @Override + public String getAction() { + return action; + } + + @Override + public void setAction(final String action) { + this.action = action; + } + + @Override + public List getParameters() { + return parameters; + } + + public void setParameters(final List parameters) { + this.parameters = parameters; + } + + @Override + public ActionStatus getActionStatus() { + return actionStatus; + } + + @Override + public void setActionStatus(final ActionStatus actionStatus) { + this.actionStatus = actionStatus; + } + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardNotificationHandler.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardNotificationHandler.java new file mode 100644 index 0000000..96f0adb --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardNotificationHandler.java @@ -0,0 +1,59 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import java.util.HashMap; +import java.util.Map; + +/** + * This notification handler dispatches incoming notifications to registered blackboard message listeners. Usually used + * by blackboard notification clients, since they are usually interested only in the messages they have sent and not in + * messages sent by other BB clients. Moreover it's an useful hook for dispatching orchestration. + * + * @param + * blackboard handler (client or server) + * @author marko + * + */ +public class BlackboardNotificationHandler extends AbstractBlackboardNotificationHandler implements BlackboardJobRegistry { + + /** + * blackboard message listeners. + */ + private Map listeners = new HashMap(); + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.AbstractBlackboardNotificationHandler#processJob(eu.dnetlib.enabling.tools.blackboard.BlackboardJob) + */ + @Override + protected void processJob(final BlackboardJob job) { + final BlackboardJobListener listener = listeners.get(job.getId()); + if (listener != null) { + if (job.isCompleted()) { + listeners.remove(job.getId()); + } + + listener.processJob(job); + } + } + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardJobRegistry#registerJobListener(eu.dnetlib.enabling.tools.blackboard.BlackboardJob, + * eu.dnetlib.enabling.tools.blackboard.BlackboardJobListener) + */ + @Override + public void registerJobListener(final BlackboardJob job, final BlackboardJobListener listener) { + listeners.put(job.getId(), listener); + } + + public Map getListeners() { + return listeners; + } + + public void setListeners(final Map listeners) { + this.listeners = listeners; + } + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardParameter.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardParameter.java new file mode 100644 index 0000000..5267edf --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardParameter.java @@ -0,0 +1,36 @@ +package eu.dnetlib.enabling.tools.blackboard; + +/** + * Blackboard parameter used in a BlackboardMessage. + * + * @author marko + * + */ +public interface BlackboardParameter { + + /** + * parameter name (key). + * @return name + */ + String getName(); + + /** + * setter. + * + * @param name name + */ + void setName(String name); + + /** + * parameter value. + * @return value + */ + String getValue(); + + /** + * setter. + * + * @param value value + */ + void setValue(String value); +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardParameterImpl.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardParameterImpl.java new file mode 100644 index 0000000..88e5226 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardParameterImpl.java @@ -0,0 +1,87 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +/** + * Blackboard parameter. + * + * @author marko + * + */ +@XmlRootElement(name = "PARAMETER") +public class BlackboardParameterImpl implements BlackboardParameter { + + /** + * hash seed. + */ + private static final int HASH_SEED_2 = 59; + + /** + * hash seed. + */ + private static final int HASH_SEED = 35; + + /** + * parameter name. + */ + private String name; + + /** + * parameter value. + */ + private String value; + + /** + * {@inheritDoc} + * + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(final Object obj) { + if (!(obj instanceof BlackboardParameter)) + return false; + + if (this == obj) + return true; + + final BlackboardParameter rhs = (BlackboardParameter) obj; + return new EqualsBuilder().append(name, rhs.getName()).append(value, rhs.getValue()).isEquals(); + } + + /** + * {@inheritDoc} + * + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + return new HashCodeBuilder(HASH_SEED, HASH_SEED_2).append(name).append(value).toHashCode(); + } + + @Override + @XmlAttribute + public String getName() { + return name; + } + + @Override + public void setName(final String name) { + this.name = name; + } + + @Override + @XmlAttribute + public String getValue() { + return value; + } + + @Override + public void setValue(final String value) { + this.value = value; + } + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerAction.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerAction.java new file mode 100644 index 0000000..2ef3536 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerAction.java @@ -0,0 +1,24 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; +import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler; + +/** + * Callback interface for serverside blackboard actions. + * + *

+ * Actions should use the 'handler' methods to set the 'ongoing' or 'done' status. + *

+ * + *

+ * failed status is automatically set upon catching an exception thrown by this interface, so feel free to simply throw + * whatever you want + *

+ * + * @author marko + * + * @param + */ +public interface BlackboardServerAction> { + void execute(BlackboardServerHandler handler, BlackboardJob job) throws Exception; +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerActionExecutor.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerActionExecutor.java new file mode 100644 index 0000000..6f278aa --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerActionExecutor.java @@ -0,0 +1,119 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import java.util.Map; + +import javax.annotation.PostConstruct; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Required; + +/** + * A BlackboardServerActionExecutor dispatches the execution of server side blackboard actions. + * + *

+ * The primary responsibility of this class is to organize the actions, dispatch the action when called, and transform + * exceptions into blackboard 'failures'. + *

+ * + *

+ * The action executor is synchronous. It's responsibility of the actual action implementations to put the jobs in + * background, if needed. + *

+ * + * @author marko + * + * @param + */ +public class BlackboardServerActionExecutor> { + private static final Log log = LogFactory.getLog(BlackboardServerActionExecutor.class); // NOPMD by marko on 11/24/08 5:02 PM + + /** + * set by spring, so key has to be string + */ + private Map> actionMap; + + /** + * used to set bb msg status as done, failed etc. + */ + private BlackboardServerHandler blackboardHandler; + + /** + * used to exploit static enum declaration for runtime checks of allowed types. + */ + private Class actionType; + + /** + * if true, the executor will allow for an incomplete implementation of actions. If false, it an exception will be + * thrown if not all actions are implemented, according to the actionType enum. + */ + private boolean incomplete = false; + + @PostConstruct + public void check() { + for (String key : actionMap.keySet()) + Enum.valueOf(actionType, key); + + if (!incomplete) + for (X en : actionType.getEnumConstants()) + if (actionMap.get(en.toString()) == null) + throw new IllegalArgumentException("action " + en + " not declared in action map. Action is mandatory"); + } + + /** + * Executes a blackboard job, watching for + * + * @param job + */ + public void execute(final BlackboardJob job) { + try { + final X label = Enum.valueOf(actionType, job.getAction()); + + final BlackboardServerAction action = actionMap.get(label.toString()); + if (action != null) { + action.execute(blackboardHandler, job); + } else { + log.warn("Cannot find action handler for blackboard action: " + label); + throw new IllegalArgumentException("Cannot find action handler for blackboard action: " + label); + } + } catch (final Throwable e) { + log.warn("got some exception during blackboard handler execution", e); + blackboardHandler.failed(job, e); + } + } + + public BlackboardServerHandler getBlackboardHandler() { + return blackboardHandler; + } + + @Required + public void setBlackboardHandler(final BlackboardServerHandler blackboardHandler) { + this.blackboardHandler = blackboardHandler; + } + + public Class getActionType() { + return actionType; + } + + @Required + public void setActionType(final Class actionType) { + this.actionType = actionType; + } + + public Map> getActionMap() { + return actionMap; + } + + public void setActionMap(Map> actionMap) { + this.actionMap = actionMap; + } + + public boolean isIncomplete() { + return incomplete; + } + + public void setIncomplete(boolean incomplete) { + this.incomplete = incomplete; + } + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerExecutorNotificationHandler.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerExecutorNotificationHandler.java new file mode 100644 index 0000000..00f93cc --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerExecutorNotificationHandler.java @@ -0,0 +1,52 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import javax.annotation.PostConstruct; + +/** + * In most cases a service just dispatches every incoming blackboard execution request to the blackboard server + * executor, so that the right action call back will be called. + * + *

+ * This bean will wrap a BlackboardServerExecutor as a BlackboardNotificationHandler so that it can be installed as a + * notification handler of a server.

+ * + * @author marko + * + * @param + */ +public class BlackboardServerExecutorNotificationHandler> extends AbstractBlackboardNotificationHandler { + + private BlackboardServerActionExecutor blackboardExecutor; + + @PostConstruct + public void init() { + setBlackboardHandler(blackboardExecutor.getBlackboardHandler()); + } + + @Override + protected void processJob(final BlackboardJob job) { + blackboardExecutor.execute(job); + } + + public BlackboardServerActionExecutor getBlackboardExecutor() { + return blackboardExecutor; + } + + public void setBlackboardExecutor(BlackboardServerActionExecutor blackboardExecutor) { + this.blackboardExecutor = blackboardExecutor; + } + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.AbstractBlackboardNotificationHandler#setBlackboardHandler(eu.dnetlib.enabling.tools.blackboard.BlackboardHandler) + * + * Redefined here to avoid inheriting the @Required property. It simplifies the spring config for an + * unnecessarry property, since we can get it from the blackboardExecutor. + */ + @Override + public void setBlackboardHandler(BlackboardServerHandler handler) { + super.setBlackboardHandler(handler); + } + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerHandler.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerHandler.java new file mode 100644 index 0000000..5782d00 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/BlackboardServerHandler.java @@ -0,0 +1,37 @@ +package eu.dnetlib.enabling.tools.blackboard; + +/** + * Helper component used to bridge the high level blackboard job handling from the low level blackboard protocol based + * on notifications and blackboard messages. + * + * @author marko + * + */ +public interface BlackboardServerHandler extends BlackboardHandler { + /** + * Sets the ongoing action status to the given job, publishing this new state. + * + * @param job + * blackboard job + */ + void ongoing(BlackboardJob job); + + /** + * Sets the "failed" action status to the given job, publishing this new state along with the error message obtained + * from the exception. + * + * @param job + * blackboard job + * @param exception + * exception which caused the failure + */ + void failed(BlackboardJob job, Throwable exception); + + /** + * Set the "done" action status to the given job, publishing the new state. + * + * @param job + * blackboard job + */ + void done(BlackboardJob job); +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/DeletingBlackboardNotificationHandler.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/DeletingBlackboardNotificationHandler.java new file mode 100644 index 0000000..707cb15 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/DeletingBlackboardNotificationHandler.java @@ -0,0 +1,70 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import javax.annotation.Resource; +import javax.xml.bind.JAXBException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import eu.dnetlib.miscutils.jaxb.JaxbFactory; + +/** + * This BB handler deletes completed (successful or unsuccessful) blackboard messages after dispatching. Job should be registered to only + * one "deleting BB handler". + * + * @author marko + * + */ +public class DeletingBlackboardNotificationHandler extends BlackboardNotificationHandler { + + /** + * Logger. + */ + private static final Log log = LogFactory.getLog(DeletingBlackboardNotificationHandler.class); // NOPMD by marko on 11/24/08 5:02 PM + + /** + * blackboard message factory. + */ + @Resource(name = "blackboardMessageFactory") + private JaxbFactory messageFactory; + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.BlackboardNotificationHandler#processJob(eu.dnetlib.enabling.tools.blackboard.BlackboardJob) + */ + @Override + protected void processJob(final BlackboardJob job) { + + if (getListeners().containsKey(job.getId()) && job.isCompleted()) { + if (log.isDebugEnabled()) { + log.debug(serializeBlackBoardMessage(job)); + } + getBlackboardHandler().delete(job); + } + super.processJob(job); + } + + /** + * Helper method, serializes a Blackboard message using a blackboardMessageFactory. + * + * @param job + * @return + */ + private String serializeBlackBoardMessage(final BlackboardJob job) { + try { + return getMessageFactory().serialize(job.getMessage()); + } catch (JAXBException e) { + return "cannot serialize blackboard message: " + e.getMessage(); + } + } + + public void setMessageFactory(final JaxbFactory messageFactory) { + this.messageFactory = messageFactory; + } + + public JaxbFactory getMessageFactory() { + return messageFactory; + } + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandler.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandler.java new file mode 100644 index 0000000..dc6fbac --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandler.java @@ -0,0 +1,22 @@ +package eu.dnetlib.enabling.tools.blackboard; + + +/** + * Each service may have a chain of NotificationHandlers, which process incoming notifications. + * + * @author marko + * + */ +public interface NotificationHandler { + /** + * Incoming notification received. + * + * @param subscrId subscriptionId + * @param topic topic + * @param rsId resource id + * @param profile resource profile + */ + void notified(String subscrId, final String topic, final String rsId, final String profile); + + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChain.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChain.java new file mode 100644 index 0000000..9661c22 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChain.java @@ -0,0 +1,14 @@ +package eu.dnetlib.enabling.tools.blackboard; + +/** + * Invokes a chain of notification handlers for each incoming notification. Normally an instance of this class is + * registered as the notification handler for a given service. + * + * @author marko + * + */ +public interface NotificationHandlerChain extends NotificationHandler { + + public void delegateNotification(final String subscrId, final String topic, final String rsId, final String profile, final NotificationHandler handler); + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChainImpl.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChainImpl.java new file mode 100644 index 0000000..d2a504c --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChainImpl.java @@ -0,0 +1,104 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import java.util.Collection; +import java.util.Map; + +import javax.annotation.Resource; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Required; +import org.springframework.core.task.TaskExecutor; + +import eu.dnetlib.enabling.actions.AbstractSubscriptionAction; +import eu.dnetlib.enabling.tools.Enableable; +import eu.dnetlib.enabling.tools.EnableableEnumerator; + +public class NotificationHandlerChainImpl implements NotificationHandlerChain { + + /** + * logger. + */ + private static final Log log = LogFactory.getLog(NotificationHandlerChain.class); // NOPMD by marko on 11/24/08 5:02 PM + + /** + * notification handler chain. + */ + private Collection handlers; + + /** + * task executor used for invoking the handlers. + */ + private TaskExecutor handlerExecutor; + + @Resource + private NotificationHistory notificationHistory; + + @Resource + private EnableableEnumerator enableableEnumerator; + + /** + * {@inheritDoc} + * + * @see eu.dnetlib.enabling.tools.blackboard.NotificationHandler#notified(java.lang.String, java.lang.String, + * java.lang.String, java.lang.String) + */ + @Override + public void notified(final String subscrId, final String topic, final String rsId, final String profile) { + + for (final NotificationHandler handler : handlers) { + + try { + if (handler instanceof AbstractSubscriptionAction) + if (topic.startsWith(((AbstractSubscriptionAction) handler).getTopicPrefix())) + if (!((Enableable) handler).isEnabled()) { + for (Map.Entry entry : enableableEnumerator.getAllEnableables().entrySet()) { + if (entry.getValue() == handler) { + NotificationInfo info = new NotificationInfo(); + info.setName(entry.getKey()); + info.setProfile(profile); + info.setRsId(rsId); + info.setSubscrId(subscrId); + info.setTopic(topic); + notificationHistory.saveNotification(info); + } + } + continue; + } + + delegateNotification(subscrId, topic, rsId, profile, handler); + } catch (final RuntimeException e) { + log.fatal("error processing notification handler " + handler, e); + } + } + } + + @Override + public void delegateNotification(final String subscrId, final String topic, final String rsId, final String profile, final NotificationHandler handler) { + + handlerExecutor.execute(new Runnable() { + @Override + public void run() { + handler.notified(subscrId, topic, rsId, profile); + } + }); + } + + public Collection getHandlers() { + return handlers; + } + + @Required + public void setHandlers(final Collection handlers) { + this.handlers = handlers; + } + + public TaskExecutor getHandlerExecutor() { + return handlerExecutor; + } + + @Required + public void setHandlerExecutor(TaskExecutor handlerExecutor) { + this.handlerExecutor = handlerExecutor; + } +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHistory.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHistory.java new file mode 100644 index 0000000..9978225 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHistory.java @@ -0,0 +1,37 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import java.util.Map; + +public interface NotificationHistory { + + /** + * Return a saved notification associated to a bean and to a rsId. + * + * @param name + * @return + */ + public NotificationInfo obtainNotification(String bean, String rsId); + + /** + * Save a notification. + * + * @param info + */ + void saveNotification(NotificationInfo info); + + + /** + * Return all saved notifications. + * + * @return + */ + Map> obtainAllNotifications(); + + /** + * Remove a saved notifications. + * + * @return + */ + public void clearNotification(String bean, String rsId); + +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHistoryImpl.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHistoryImpl.java new file mode 100644 index 0000000..9098475 --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationHistoryImpl.java @@ -0,0 +1,39 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import java.util.HashMap; +import java.util.Map; + + +public class NotificationHistoryImpl implements NotificationHistory { + + private Map> notifications = new HashMap>(); + + @Override + public NotificationInfo obtainNotification(String bean, String rsId) { + if (!notifications.containsKey(bean)) + return null; + return notifications.get(bean).get(rsId); + } + + @Override + public void saveNotification(NotificationInfo info) { + String bean = info.getName(); + String rsId = info.getRsId(); + + if (!notifications.containsKey(bean)) + notifications.put(bean, new HashMap()); + + notifications.get(bean).put(rsId, info); + } + + @Override + public Map> obtainAllNotifications() { + return notifications; + } + + @Override + public void clearNotification(String bean, String rsId) { + if (notifications.containsKey(bean)) + notifications.get(bean).remove(rsId); + } +} diff --git a/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationInfo.java b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationInfo.java new file mode 100644 index 0000000..624a17f --- /dev/null +++ b/dnet-core-components/src/main/java/eu/dnetlib/enabling/tools/blackboard/NotificationInfo.java @@ -0,0 +1,74 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import eu.dnetlib.miscutils.datetime.DateUtils; + +public class NotificationInfo { + private String name; + private String subscrId; + private String topic; + private String rsId; + private String profile; + private String date; + + public NotificationInfo() { + this.date = DateUtils.now_ISO8601(); + } + + public NotificationInfo(String name, String subscrId, String topic, String rsId, String profile) { + super(); + this.name = name; + this.subscrId = subscrId; + this.topic = topic; + this.rsId = rsId; + this.profile = profile; + this.date = DateUtils.now_ISO8601(); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getSubscrId() { + return subscrId; + } + + public void setSubscrId(String subscrId) { + this.subscrId = subscrId; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getRsId() { + return rsId; + } + + public void setRsId(String rsId) { + this.rsId = rsId; + } + + public String getProfile() { + return profile; + } + + public void setProfile(String profile) { + this.profile = profile; + } + + public String getDate() { + return date; + } + + public void setDate(String date) { + this.date = date; + } +} diff --git a/dnet-core-components/src/main/resources/eu/dnetlib/enabling/tools/applicationContext-enableable.xml b/dnet-core-components/src/main/resources/eu/dnetlib/enabling/tools/applicationContext-enableable.xml new file mode 100644 index 0000000..b62693a --- /dev/null +++ b/dnet-core-components/src/main/resources/eu/dnetlib/enabling/tools/applicationContext-enableable.xml @@ -0,0 +1,12 @@ + + + + + + + + diff --git a/dnet-core-components/src/main/resources/eu/dnetlib/enabling/tools/applicationContext-schedulable.xml b/dnet-core-components/src/main/resources/eu/dnetlib/enabling/tools/applicationContext-schedulable.xml new file mode 100644 index 0000000..afedb18 --- /dev/null +++ b/dnet-core-components/src/main/resources/eu/dnetlib/enabling/tools/applicationContext-schedulable.xml @@ -0,0 +1,28 @@ + + + + + + + + + + + + diff --git a/dnet-core-components/src/main/resources/eu/dnetlib/enabling/tools/blackboard/applicationContext-blackboard-tools.xml b/dnet-core-components/src/main/resources/eu/dnetlib/enabling/tools/blackboard/applicationContext-blackboard-tools.xml new file mode 100644 index 0000000..3f13985 --- /dev/null +++ b/dnet-core-components/src/main/resources/eu/dnetlib/enabling/tools/blackboard/applicationContext-blackboard-tools.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + diff --git a/dnet-core-components/src/test/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChainTest.java b/dnet-core-components/src/test/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChainTest.java new file mode 100644 index 0000000..cd7c7b9 --- /dev/null +++ b/dnet-core-components/src/test/java/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChainTest.java @@ -0,0 +1,104 @@ +package eu.dnetlib.enabling.tools.blackboard; + +import javax.annotation.Resource; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +@RunWith(value = SpringJUnit4ClassRunner.class) +@ContextConfiguration +public class NotificationHandlerChainTest { + + public class RecJob implements Runnable { + + private int times; + private int num; + + public RecJob(int num, int times) { + this.num = num; + this.times = times; + } + + @Override + public void run() { + System.out.println("starting " + num); + if(times >= 0) + executor.execute(new RecJob(num + 1, times - 1)); + try { + Thread.sleep(4000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + System.out.println("thread finished - " + num); + } + + } + + private static final class Job implements Runnable { + private final int value; + + public Job(final int i) { + value = i; + } + + @Override + public void run() { + System.out.println("thread started - " + value); + try { + Thread.sleep(4000); + } catch (final InterruptedException e) { + // + } + System.out.println("thread finished - " + value); + } + } + + @Resource + private transient ThreadPoolTaskExecutor executor; + + @Test + public void testDelegateNotification() throws InterruptedException { + System.out.println(executor); + + System.out.println("executing"); + + for (int i = 0; i < 100; i++) + executor.execute(new Job(i)); + + System.out.println("executed - waiting"); + Thread.sleep(2000); + + System.out.println("active count: " + executor.getActiveCount()); + System.out.println("current pool size: " + executor.getCorePoolSize()); + System.out.println("pool size " + executor.getPoolSize()); + + Thread.sleep(3000); + + System.out.println("ok"); + } + + @Test + public void testRecursive() throws InterruptedException { + + for (int i = 0; i < 4; i++) + executor.execute(new RecJob(i * 10, 4)); + + + System.out.println("executed - waiting"); + Thread.sleep(2000); + + System.out.println("active count: " + executor.getActiveCount()); + System.out.println("current pool size: " + executor.getCorePoolSize()); + System.out.println("pool size " + executor.getPoolSize()); + + Thread.sleep(3000); + + System.out.println("ok"); + } + +} diff --git a/dnet-core-components/src/test/java/eu/dnetlib/miscutils/iterators/xml/IterableXmlParserTest.java b/dnet-core-components/src/test/java/eu/dnetlib/miscutils/iterators/xml/IterableXmlParserTest.java index 4e4e777..080888d 100644 --- a/dnet-core-components/src/test/java/eu/dnetlib/miscutils/iterators/xml/IterableXmlParserTest.java +++ b/dnet-core-components/src/test/java/eu/dnetlib/miscutils/iterators/xml/IterableXmlParserTest.java @@ -83,7 +83,7 @@ public class IterableXmlParserTest { parser = new IterableXmlParser(element, stream); int count = 0; for (String xml : parser) { - System.out.println(xml); + //System.out.println(xml); Document doc = reader.read(new StringReader(xml)); assertNotNull(doc); assertNotNull(doc.selectSingleNode("//" + element)); diff --git a/dnet-core-components/src/test/resources/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChainTest-context.xml b/dnet-core-components/src/test/resources/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChainTest-context.xml new file mode 100644 index 0000000..6579c11 --- /dev/null +++ b/dnet-core-components/src/test/resources/eu/dnetlib/enabling/tools/blackboard/NotificationHandlerChainTest-context.xml @@ -0,0 +1,16 @@ + + + + + + + diff --git a/pom.xml b/pom.xml index 812bddd..c58e1cb 100644 --- a/pom.xml +++ b/pom.xml @@ -269,6 +269,11 @@ spring-tx ${spring.version} + + org.springframework + spring-context-support + ${spring.version} +