imported cnr-notifications-common and cnr-blackboard-common

master
Claudio Atzori 5 years ago
parent 11c100f627
commit ec964c0c47

@ -161,6 +161,10 @@
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
<!-- CXF -->

@ -0,0 +1,71 @@
package eu.dnetlib.enabling.actions;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Required;
import eu.dnetlib.enabling.actions.SubscriptionAction;
import eu.dnetlib.enabling.tools.Enableable;
/**
* Common subscription action.
*
* @author marko
*
*/
public abstract class AbstractSubscriptionAction implements SubscriptionAction, Enableable {
/**
* topic expression.
*/
private String topicExpression;
/**
* topic prefix.
*/
private String topicPrefix;
/**
* true if enabled.
*/
private boolean enabled = true;
/**
* init the default topic prefix.
*/
@PostConstruct
public void init() {
if (topicPrefix == null)
topicPrefix = getTopicExpression().replace("/", ".");
}
@Override
public String getTopicExpression() {
return topicExpression;
}
@Required
public void setTopicExpression(final String topicExpression) {
this.topicExpression = topicExpression;
}
@Override
public String getTopicPrefix() {
return topicPrefix;
}
public void setTopicPrefix(final String topicPrefix) {
this.topicPrefix = topicPrefix;
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
}

@ -0,0 +1,25 @@
package eu.dnetlib.enabling.actions;
import eu.dnetlib.enabling.tools.blackboard.NotificationHandler;
/**
* Manager subscription action.
*
* @author marko
*
*/
public interface SubscriptionAction extends NotificationHandler {
/**
* topic expression associated with this subscription action.
*
* @return SN topic expression
*/
String getTopicExpression();
/**
* react on this topic prefix. Might be shorter than the topic expression.
*
* @return SN topic prefix
*/
String getTopicPrefix();
}

@ -0,0 +1,191 @@
package eu.dnetlib.enabling.tools;
import java.text.ParseException;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.quartz.*;
import org.quartz.Trigger.TriggerState;
import org.quartz.impl.JobDetailImpl;
import org.quartz.impl.triggers.CronTriggerImpl;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.annotation.Required;
/**
* Common implementation for schedulable beans.
*
* @author claudio
*
*/
public abstract class AbstractSchedulable implements Schedulable, Job, BeanNameAware {
private static final String THIS = "this";
private final static String GROUP = "schedulableJobs";
private boolean enabled;
private CronExpression cronExpression;
private String beanName;
@Resource(name = "dnetJobScheduler")
private Scheduler jobScheduler;
@PostConstruct
protected void init() {
try {
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put(THIS, this);
JobDetailImpl jd = new JobDetailImpl();
jd.setName(getBeanName());
jd.setGroup(GROUP);
jd.setJobDataMap(jobDataMap);
jd.setJobClass(this.getClass());
jobScheduler.scheduleJob(jd, createTrigger());
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}
private Trigger createTrigger() {
try {
CronTriggerImpl trigger = new CronTriggerImpl(getBeanName(), GROUP, getCronExpression());
trigger.setMisfireInstruction(Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
trigger.setJobGroup(GROUP);
trigger.setJobName(getBeanName());
return trigger;
} catch (ParseException e) {
throw new IllegalArgumentException("invalid cron expression: " + cronExpression, e);
}
}
protected abstract void doExecute();
@Override
public void execute() {
// bean represents the quartz instance of this object
if (isEnabled()) {
doExecute();
}
}
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
AbstractSchedulable bean = (AbstractSchedulable) context.getJobDetail().getJobDataMap().get(THIS);
// bean represents the quartz instance of this object
if (bean.isEnabled()) {
bean.doExecute();
}
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
@Override
public String getCronExpression() {
return cronExpression.getCronExpression();
}
@Required
public void setCronExpression(String cronExpression) {
try {
this.cronExpression = new CronExpression(cronExpression);
} catch (ParseException e) {
throw new IllegalArgumentException("invalid cron expression: " + cronExpression, e);
}
}
@Override
public void updateCronExpression(String cronExpression) {
if (!cronExpression.equals(getCronExpression())) {
setCronExpression(cronExpression);
try {
final TriggerKey tk = new TriggerKey(getBeanName());
jobScheduler.rescheduleJob(tk, createTrigger());
} catch (SchedulerException e) {
throw new RuntimeException("unable to reschedule trigger", e);
}
}
}
@Override
public String getNextFireTime() {
try {
if (isPaused()) {
return "";
}
if (!isEnabled()) {
return "";
}
final TriggerKey tk = new TriggerKey(getBeanName());
final Trigger t = jobScheduler.getTrigger(tk);
return t != null ? t.getNextFireTime().toString() : "";
} catch (SchedulerException e) {
throw new RuntimeException("unable to get trigger", e);
}
}
@Override
public boolean isPaused() {
try {
final TriggerKey tk = new TriggerKey(getBeanName());
final TriggerState state = jobScheduler.getTriggerState(tk);
switch (state) {
case PAUSED:
case NONE:
case ERROR:
return true;
default:
return false;
}
} catch (SchedulerException e) {
throw new RuntimeException("unable to get trigger", e);
}
}
@Override
public void pause() {
try {
final TriggerKey tk = new TriggerKey(getBeanName());
jobScheduler.pauseTrigger(tk);
} catch (SchedulerException e) {
throw new RuntimeException("unable to pause trigger", e);
}
}
@Override
public void resume() {
try {
final TriggerKey tk = new TriggerKey(getBeanName());
jobScheduler.resumeTrigger(tk);
} catch (SchedulerException e) {
throw new RuntimeException("unable to resume trigger", e);
}
}
public String getBeanName() {
return beanName;
}
@Override
@Required
public void setBeanName(String beanName) {
this.beanName = beanName;
}
}

@ -0,0 +1,25 @@
package eu.dnetlib.enabling.tools;
/**
* Simple interface for classes, which's objects can be enabled or disabled.
*
*
* @author marko
*
*/
public interface Enableable {
/**
* True if this object enabled.
*
* @return if this object is currently enabled.
*/
boolean isEnabled();
/**
* Enables of disables this object.
*
* @param enabled
* enabled
*/
void setEnabled(boolean enabled);
}

@ -0,0 +1,57 @@
package eu.dnetlib.enabling.tools;
import java.util.Map;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
/**
* This bean has to live in the same bean factory where all enableables live. Thus it cannot be put inside the EnableablController, which
* lives in the webContext.
*
* @author marko
*
*/
public class EnableableEnumerator implements BeanFactoryAware {
/**
* bean factory.
*/
private ListableBeanFactory beanFactory;
/**
* Get all beans implementing the Enableable interface.
*
* @return
*/
public Map<String, Enableable> getAllEnableables() {
return beanFactory.getBeansOfType(Enableable.class);
}
@Override
public void setBeanFactory(final BeanFactory beanFactory) throws BeansException {
this.beanFactory = (ListableBeanFactory) beanFactory;
}
public ListableBeanFactory getBeanFactory() {
return beanFactory;
}
/**
* Get given enableable or null.
*
* @param name
* @return
*/
public Enableable getEnableable(final String name) {
try {
return beanFactory.getBean(name, Enableable.class);
} catch (final NoSuchBeanDefinitionException e) {
return null;
}
}
}

@ -0,0 +1,52 @@
package eu.dnetlib.enabling.tools;
/**
* Simple interface for classes, which's objects actions can be scheduled.
*
* @author claudio
*/
public interface Schedulable extends Enableable {
/**
* Updates the cron expression.
*
* @param cronExpression
*/
void updateCronExpression(String cronExpression);
/**
* Getter for the cron expression.
*
* @return the String representing the cron expression.
*/
String getCronExpression();
/**
* Computes the next time the doExecute method will be invoked.
*
* @return the String representing the next fire time.
*/
String getNextFireTime();
/**
* True if the object has been paused.
*
* @return if the bean has been paused.
*/
boolean isPaused();
/**
* Prevents the bean for being scheduled for future invocations.
*/
void pause();
/**
* Resumes the bean for being scheduled for future invocations.
*/
void resume();
/**
* Must be implemented to perform the real action.
*/
void execute();
}

@ -0,0 +1,57 @@
package eu.dnetlib.enabling.tools;
import java.util.Map;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
/**
* This bean has to live in the same bean factory where all schedulables live. Thus it cannot be put inside the EnableablController, which
* lives in the webContext.
*
* @author claudio
*
*/
public class SchedulableEnumerator implements BeanFactoryAware {
/**
* bean factory.
*/
private ListableBeanFactory beanFactory;
/**
* Get all beans implementing the Schedulable interface.
*
* @return
*/
public Map<String, Schedulable> getAllSchedulables() {
return beanFactory.getBeansOfType(Schedulable.class);
}
@Override
public void setBeanFactory(final BeanFactory beanFactory) throws BeansException {
this.beanFactory = (ListableBeanFactory) beanFactory;
}
public ListableBeanFactory getBeanFactory() {
return beanFactory;
}
/**
* Get given schedulable or null.
*
* @param name
* @return
*/
public Schedulable getSchedulable(final String name) {
try {
return beanFactory.getBean(name, Schedulable.class);
} catch (final NoSuchBeanDefinitionException e) {
return null;
}
}
}

@ -0,0 +1,48 @@
package eu.dnetlib.enabling.tools;
import javax.annotation.Resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
/**
* Schedulable implementation of an xUpdate.
*
* @author claudio
*
*/
public class SchedulableProfileUpdater extends AbstractSchedulable {
private static final Log log = LogFactory.getLog(SchedulableProfileUpdater.class); // NOPMD by marko on 11/24/08 5:02 PM
private String xUpdate;
@Resource
private UniqueServiceLocator serviceLocator;
@Override
protected void doExecute() {
try {
log.info("triggering scheduled reindex: " + getxUpdate());
serviceLocator.getService(ISRegistryService.class).executeXUpdate(getxUpdate());
} catch (ISRegistryException e) {
throw new RuntimeException(e);
}
}
public String getxUpdate() {
return xUpdate;
}
@Required
public void setxUpdate(String xUpdate) {
this.xUpdate = xUpdate;
}
}

@ -0,0 +1,62 @@
package eu.dnetlib.enabling.tools.blackboard;
/**
* Utility abstract class which dispatches to commodity onSomething() methods, one for each interesting
* blackboard job state.
*
* @author marko
*
*/
public abstract class AbstractBlackboardJobListener implements BlackboardJobListener {
/**
* {@inheritDoc}
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardJobListener#processJob(eu.dnetlib.enabling.tools.blackboard.BlackboardJob)
*/
@Override
public void processJob(final BlackboardJob job) {
if (job.getActionStatus() == ActionStatus.DONE)
onDone(job);
else if (job.getActionStatus() == ActionStatus.FAILED)
onFailed(job);
else if (job.getActionStatus() == ActionStatus.ASSIGNED)
onAssigned(job);
else if (job.getActionStatus() == ActionStatus.ONGOING)
onOngoing(job);
}
/**
* Called when the job enters the ASSIGNED state.
*
* @param job job
*/
protected void onAssigned(final BlackboardJob job) { // NOPMD
// default no operation
// TODO: increase job expiry time
}
/**
* Called when the job enters the ONGOING state.
*
* @param job job
*/
protected void onOngoing(final BlackboardJob job) { // NOPMD
// default no operation
// TODO: increase job expiry time
}
/**
* Called when the job finishes in the FAILED state.
*
* @param job job
*/
protected abstract void onFailed(BlackboardJob job);
/**
* Called when the job finishes in the DONE state.
*
* @param job job
*/
protected abstract void onDone(BlackboardJob job);
}

@ -0,0 +1,96 @@
package eu.dnetlib.enabling.tools.blackboard;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPathExpressionException;
import org.springframework.beans.factory.annotation.Required;
import org.xml.sax.SAXException;
import eu.dnetlib.enabling.tools.Enableable;
import eu.dnetlib.enabling.tools.StringOpaqueResource;
/**
* Common blackboard notification handler. This notification handler processes only message with ...BODY.BLACKBOARD.LAST* as topic.
*
* @param <T>
* type of blackboard handler used to extract the blackboard message (client or server)
* @author marko
*
*/
public abstract class AbstractBlackboardNotificationHandler<T extends BlackboardHandler> implements NotificationHandler, Enableable {
/**
* blackboard handler.
*/
private T blackboardHandler;
/**
* true if enabled.
*/
private boolean enabled = true;
/**
* Executor handles the notified request in a dedicated thread and allows to return immediately.
*/
private Executor executor = Executors.newCachedThreadPool();
/**
* {@inheritDoc}
*
* @see eu.dnetlib.data.mdstore.NotificationHandler#notified(java.lang.String, java.lang.String, java.lang.String, java.lang.String)
*/
@Override
public void notified(final String subscrId, final String topic, final String rsId, final String profile) {
if (!topic.contains("BODY.BLACKBOARD.LAST")) return;
executor.execute(new Runnable() {
@Override
public void run() {
try {
processJob(blackboardHandler.getJob(new StringOpaqueResource(profile)));
} catch (final XPathExpressionException e) {
throw new IllegalStateException(e);
} catch (final SAXException e) {
throw new IllegalStateException(e);
} catch (final IOException e) {
throw new IllegalStateException(e);
} catch (final ParserConfigurationException e) {
throw new IllegalStateException(e);
}
}
});
}
/**
* Subclassess override this to process incoming blackboard jobs.
*
* @param job
* blackboard job
*/
protected abstract void processJob(BlackboardJob job);
public T getBlackboardHandler() {
return blackboardHandler;
}
@Required
public void setBlackboardHandler(final T blackboardHandler) {
this.blackboardHandler = blackboardHandler;
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public void setEnabled(final boolean enabled) {
this.enabled = enabled;
}
}

@ -0,0 +1,26 @@
package eu.dnetlib.enabling.tools.blackboard;
/**
* Blackboard action status.
*
* @author marko
*
*/
public enum ActionStatus {
/**
* The job/action is assigned but not jet taken into started execution.
*/
ASSIGNED,
/**
* The job/action is ongoing.
*/
ONGOING,
/**
* The job/action is completed successfully.
*/
DONE,
/**
* The job/action is completed with failure.
*/
FAILED
}

@ -0,0 +1,32 @@
package eu.dnetlib.enabling.tools.blackboard;
/**
* Helpers for the blackboard protocol client.
*
* @author marko
*
*/
public interface BlackboardClientHandler extends BlackboardHandler {
/**
* Create a new job.
*
* @param serviceId service identifier
* @return newly created blackboard job
*/
BlackboardJob newJob(String serviceId);
/**
* Assign a blackboard job to a service.
*
* @param job blackboard job to send
*/
void assign(BlackboardJob job);
/**
* The client can delete the job after it has reached a final state
* or the job timeout has expired.
*
* @param job blackboard job to delete.
*/
void delete(BlackboardJob job);
}

@ -0,0 +1,155 @@
package eu.dnetlib.enabling.tools.blackboard;
import java.util.Map;
import javax.xml.bind.JAXBException;
import javax.xml.transform.dom.DOMSource;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.springframework.beans.factory.annotation.Required;
import org.w3c.dom.Element;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.enabling.tools.OpaqueResource;
import eu.dnetlib.enabling.tools.UniqueIdentifierGenerator;
import eu.dnetlib.enabling.tools.UniqueIdentifierGeneratorImpl;
import eu.dnetlib.miscutils.jaxb.JaxbFactory;
/**
* Blackboard client.
*
* @author marko
*
*/
public class BlackboardClientHandlerImpl implements BlackboardClientHandler {
/**
* blackboard message factory.
*/
private JaxbFactory<BlackboardMessage> messageFactory;
/**
* service locator.
*/
private UniqueServiceLocator serviceLocator;
/**
* generates blackboard message identifiers for new jobs.
*/
private UniqueIdentifierGenerator uuidGenerator = new UniqueIdentifierGeneratorImpl("bb-");
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardClientHandler#assign(eu.dnetlib.enabling.tools.blackboard.BlackboardJob)
*/
@Override
public void assign(final BlackboardJob job) {
checkJob(job);
try {
serviceLocator.getService(ISRegistryService.class).addBlackBoardMessage(job.getServiceId(), job.getId(),
messageFactory.serialize(job.getMessage()));
} catch (final ISRegistryException e) {
throw new IllegalStateException("cannot register blackboard message", e);
} catch (final JAXBException e) {
throw new IllegalArgumentException("cannot serialize blackboard message", e);
}
}
/**
* Check that the job has sane values.
*
* @param job
*/
protected void checkJob(final BlackboardJob job) {
for (Map.Entry<String, String> param : job.getParameters().entrySet()) {
if (param.getValue() == null) { throw new IllegalStateException("job parameter value cannot be null: " + param.getKey()); }
}
}
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardClientHandler#delete(eu.dnetlib.enabling.tools.blackboard.BlackboardJob)
*/
@Override
public void delete(final BlackboardJob job) {
try {
serviceLocator.getService(ISRegistryService.class).deleteBlackBoardMessage(job.getServiceId(), job.getId());
} catch (final ISRegistryException e) {
throw new IllegalStateException("cannot delete blackboard message", e);
}
}
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardClientHandler#newJob()
*/
@Override
public BlackboardJob newJob(final String serviceId) {
final BlackboardJob job = new BlackboardJobImpl(serviceId, messageFactory.newInstance());
job.setActionStatus(ActionStatus.ASSIGNED);
job.getParameters().put("id", "");
job.getParameters().put("error", "");
job.setId(uuidGenerator.generateIdentifier());
return job;
}
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardHandler#getJob(eu.dnetlib.enabling.tools.OpaqueResource)
*/
@Override
public BlackboardJob getJob(final OpaqueResource profile) {
final XPath xpa = XPathFactory.newInstance().newXPath();
try {
final Element source = (Element) xpa.evaluate("/RESOURCE_PROFILE/BODY/BLACKBOARD/MESSAGE[@id = /RESOURCE_PROFILE/BODY/BLACKBOARD/LAST_RESPONSE]",
profile.asDom(), XPathConstants.NODE);
if (source == null) { throw new IllegalStateException("cannot find last blackboard message in the service profile"); }
return new BlackboardJobImpl(profile.getResourceId(), messageFactory.parse(new DOMSource(source)));
} catch (final JAXBException e) {
throw new IllegalStateException("cannot parse blackboard message", e);
} catch (final XPathExpressionException e) {
throw new IllegalStateException("cannot find last blackboard message in the service profile", e);
}
}
public JaxbFactory<BlackboardMessage> getMessageFactory() {
return messageFactory;
}
public void setMessageFactory(final JaxbFactory<BlackboardMessage> messageFactory) {
this.messageFactory = messageFactory;
}
public UniqueIdentifierGenerator getUuidGenerator() {
return uuidGenerator;
}
public void setUuidGenerator(final UniqueIdentifierGenerator uuidGenerator) {
this.uuidGenerator = uuidGenerator;
}
public UniqueServiceLocator getServiceLocator() {
return serviceLocator;
}
@Required
public void setServiceLocator(final UniqueServiceLocator serviceLocator) {
this.serviceLocator = serviceLocator;
}
}

@ -0,0 +1,20 @@
package eu.dnetlib.enabling.tools.blackboard;
import eu.dnetlib.enabling.tools.OpaqueResource;
/**
* Basic blackboard handler.
*
* @author marko
*
*/
public interface BlackboardHandler {
/**
* Get the current job from, as notified in the service profile.
*
* @param profile
* service profile
* @return notified blackboard job
*/
BlackboardJob getJob(OpaqueResource profile);
}

@ -0,0 +1,135 @@
package eu.dnetlib.enabling.tools.blackboard;
import java.io.PrintWriter;
import java.io.StringWriter;
import javax.xml.bind.JAXBException;
import javax.xml.transform.dom.DOMSource;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.springframework.beans.factory.annotation.Required;
import org.w3c.dom.Element;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.enabling.tools.OpaqueResource;
import eu.dnetlib.miscutils.jaxb.JaxbFactory;
/**
* Blackboard handler implementation.
*
* @author marko
*
*/
public class BlackboardHandlerImpl implements BlackboardServerHandler {
/**
* blackboard message factory.
*/
private JaxbFactory<BlackboardMessage> messageFactory;
/**
* service locator.
*/
private UniqueServiceLocator serviceLocator;
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardHandler#getJob(eu.dnetlib.enabling.tools.OpaqueResource)
*/
@Override
public BlackboardJob getJob(final OpaqueResource profile) {
final XPath xpa = XPathFactory.newInstance().newXPath();
try {
final Element source = (Element) xpa.evaluate("/RESOURCE_PROFILE/BODY/BLACKBOARD/MESSAGE[@id = /RESOURCE_PROFILE/BODY/BLACKBOARD/LAST_REQUEST]",
profile.asDom(), XPathConstants.NODE);
if (source == null) { throw new IllegalStateException("cannot find last blackboard message in the service profile"); }
return new BlackboardJobImpl(profile.getResourceId(), messageFactory.parse(new DOMSource(source)));
} catch (final JAXBException e) {
throw new IllegalStateException("cannot parse blackboard message", e);
} catch (final XPathExpressionException e) {
throw new IllegalStateException("cannot find last blackboard message in the service profile", e);
}
}
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardHandler#done(eu.dnetlib.enabling.tools.blackboard.BlackboardJob)
*/
@Override
public void done(final BlackboardJob job) {
job.setActionStatus(ActionStatus.DONE);
replyJob(job);
}
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardHandler#failed(eu.dnetlib.enabling.tools.blackboard.BlackboardJob,
* java.lang.Exception)
*/
@Override
public void failed(final BlackboardJob job, final Throwable exception) {
job.setActionStatus(ActionStatus.FAILED);
final StringWriter stackTrace = new StringWriter();
exception.printStackTrace(new PrintWriter(stackTrace));
job.getParameters().put("error", exception.toString());
job.getParameters().put("errorDetails", stackTrace.toString());
replyJob(job);
}
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardHandler#ongoing(eu.dnetlib.enabling.tools.blackboard.BlackboardJob)
*/
@Override
public void ongoing(final BlackboardJob job) {
job.setActionStatus(ActionStatus.ONGOING);
replyJob(job);
}
/**
* Internal helper method which replies a blackboard job.
*
* @param job
* blackboard job
*/
protected void replyJob(final BlackboardJob job) {
try {
serviceLocator.getService(ISRegistryService.class).replyBlackBoardMessage(job.getServiceId(), messageFactory.serialize(job.getMessage()));
} catch (final ISRegistryException e) {
throw new IllegalStateException("cannot reply the blackboard message", e);
} catch (final JAXBException e) {
throw new IllegalArgumentException("cannot serialize blackboard message", e);
}
}
public JaxbFactory<BlackboardMessage> getMessageFactory() {
return messageFactory;
}
public void setMessageFactory(final JaxbFactory<BlackboardMessage> messageFactory) {
this.messageFactory = messageFactory;
}
public UniqueServiceLocator getServiceLocator() {
return serviceLocator;
}
@Required
public void setServiceLocator(final UniqueServiceLocator serviceLocator) {
this.serviceLocator = serviceLocator;
}
}

@ -0,0 +1,118 @@
package eu.dnetlib.enabling.tools.blackboard;
import java.util.Map;
/**
* High level representation of a blackboard job.
*
* @author marko
*
*/
public interface BlackboardJob {
/**
* Get the message id.
*
* @return id
*/
String getId();
/**
* Set message id.
*
* @param identifier
* id
*/
void setId(String identifier);
/**
* Get the action name.
*
* @return action name
*/
String getAction();
/**
* Set the action name.
*
* @param action
* action name
*/
void setAction(String action);
/**
* Get the action status.
*
* @return action status
*/
ActionStatus getActionStatus();
/**
* Set the action status.
*
* @param actionStatus
* action status
*/
void setActionStatus(ActionStatus actionStatus);
/**
* Get the message date.
*
* @return date
*/
String getDate();
/**
* Set the message date.
*
* @param date
* date
*/
void setDate(String date);
/**
* obtains a mutable parameter map (key/value).
*
* @return mutable parameter map
*/
Map<String, String> getParameters();
/**
* Get the error message, if the actionStatus is FAILED. Usually the error message is also present in the parameter map but it's
* preferred to use this method.
*
* @return error message
*/
String getError();
/**
* Set the error message, if the actionStatus is FAILED. Usually the error message is also present in the parameter map but it's
* preferred to use this method.
*
* @param error
* error message
*/
void setError(String error);
/**
* Get the setviceId.
*
* @return the serviceId
*/
String getServiceId();
/**
* True if the action status is one of the termination statuses (e.g. done and failed).
*
* @return true if completed
*/
boolean isCompleted();
/**
* Get the bb message.
*
* @return the bb message
*/
BlackboardMessage getMessage();
}

@ -0,0 +1,163 @@
package eu.dnetlib.enabling.tools.blackboard;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
/**
* High level representation of a blackboard job.
*
* @author marko
*
*/
public class BlackboardJobImpl implements BlackboardJob {
/**
* underlying low level blackboard message.
*/
private BlackboardMessage message;
/**
* service identifier.
*/
private String serviceId;
/**
* parameters.
*/
private final transient Map<String, String> parameters = new HashMap<String, String>();
/**
* Construct a new blackboard job from a blackboard message.
*
* @param serviceId
* service identifier
* @param message
* underlying low-level blackboard message
*/
public BlackboardJobImpl(final String serviceId, final BlackboardMessage message) {
super();
this.message = message;
this.serviceId = serviceId;
for (final BlackboardParameter param : message.getParameters()) {
parameters.put(param.getName(), param.getValue());
}
}
@Override
public String getId() {
return message.getId();
}
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardJob#setId(java.lang.String)
*/
@Override
public void setId(final String identifier) {
message.setId(identifier);
}
@Override
public String getAction() {
return message.getAction();
}
@Override
public ActionStatus getActionStatus() {
return message.getActionStatus();
}
@Override
public String getDate() {
return message.getDate();
}
@Override
public String getError() {
return getParameters().get("error");
}
@Override
public Map<String, String> getParameters() {
return parameters;
}
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardJob#setAction(java.lang.String)
*/
@Override
public void setAction(final String action) {
message.setAction(action);
}
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardJob#setActionStatus(eu.dnetlib.enabling.tools.blackboard.ActionStatus)
*/
@Override
public void setActionStatus(final ActionStatus actionStatus) {
message.setActionStatus(actionStatus);
}
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardJob#setDate(java.lang.String)
*/
@Override
public void setDate(final String date) {
message.setDate(date);
}
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardJob#setError(java.lang.String)
*/
@Override
public void setError(final String error) {
getParameters().put("error", error);
}
/**
* returns the blackboard message, potentially modified.
*
* @return underlying blackboard message
*/
@Override
public BlackboardMessage getMessage() {
message.getParameters().clear();
for (final Entry<String, String> entry : getParameters().entrySet()) {
final BlackboardParameterImpl param = new BlackboardParameterImpl(); // NOPMD
param.setName(entry.getKey());
param.setValue(entry.getValue());
message.getParameters().add(param);
}
return message;
}
public void setMessage(final BlackboardMessage message) {
this.message = message;
}
@Override
public String getServiceId() {
return serviceId;
}
public void setServiceId(final String serviceId) {
this.serviceId = serviceId;
}
@Override
public boolean isCompleted() {
return getActionStatus() == ActionStatus.DONE || getActionStatus() == ActionStatus.FAILED;
}
}

@ -0,0 +1,18 @@
package eu.dnetlib.enabling.tools.blackboard;
/**
* Implement this interface in order to receive notifications for specific blackboard messages.
*
* @author marko
*
*/
public interface BlackboardJobListener {
/**
* process the given job.
*
* @param job job to process
*/
void processJob(BlackboardJob job);
}

@ -0,0 +1,19 @@
package eu.dnetlib.enabling.tools.blackboard;
/**
* Registers BlackboardJobListeners to be called when related incoming blackboard job notifications arrive.
* @author marko
*
*/
public interface BlackboardJobRegistry {
/**
* Registers a new job listener. It will be notified when the job changes state. The listener will be unregistered
* when the job arrives to a final state (DONE, or FAILED).
*
* @param job
* job
* @param listener
* job listener
*/
void registerJobListener(BlackboardJob job, BlackboardJobListener listener);
}

@ -0,0 +1,82 @@
package eu.dnetlib.enabling.tools.blackboard;
import java.util.List;
/**
* Implementor of this interface represent blackboard messages nodes as the appear in the service profile blackboard.
*
* This is the low-level representation of the blackboard message. The BlackboardJob is a higher level representation of a blackboard message,
* and a BlackboardHandler helps converting between the two.
*
* Implementors of this interface should simply provide a serialization so that the message can be stored in the service profile.
*
* @author marko
*
*/
public interface BlackboardMessage {
/**
* blackboard message date.
*
* @return date
*/
String getDate();
/**
* sets the blackboard message date/timestamp.
*
* @param date
* date
*/
void setDate(String date);
/**
* Blackboard message identifier.
*
* @return identifier
*/
String getId();
/**
* sets the blackboard message identifier.
*
* @param id identifier
*/
void setId(String id); // NOPMD
/**
* Blackboard message action name.
*
* @return action
*/
String getAction();
/**
* Sets the blackboard message action name.
*
* @param action
* action
*/
void setAction(String action);
/**
* Blackboard message parameter list.
*
* @return parameter list
*/
List<BlackboardParameter> getParameters();
/**
* Status of the action associated with this message.
*
* @return action status
*/
ActionStatus getActionStatus();
/**
* sets the action status.
*
* @param actionStatus
* action status
*/
void setActionStatus(ActionStatus actionStatus);
}

@ -0,0 +1,142 @@
package eu.dnetlib.enabling.tools.blackboard;
import java.util.ArrayList;
import java.util.List;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
/**
* standard serialization of the blackboard message.
*
* @author marko
*
*/
@XmlRootElement(namespace = "", name = "MESSAGE")
@XmlAccessorType(XmlAccessType.NONE)
public class BlackboardMessageImpl implements BlackboardMessage {
/**
* hash seed.
*/
private static final int HASH_SEED_2 = 63;
/**
* hash seed.
*/
private static final int HASH_SEED = 13;
/**
* blackboard message timestamp.
*/
@XmlAttribute
private String date;
/**
* blackboard message identifier.
*/
@XmlAttribute
private String id; // NOPMD
/**
* blackboard message action name.
*/
@XmlElement(name = "ACTION", required = true)
private String action;
/**
* blackboard message parameters (key/value).
*/
@XmlElement(name = "PARAMETER", type = BlackboardParameterImpl.class)
private List<BlackboardParameter> parameters = new ArrayList<BlackboardParameter>();
/**
* the status of the action described by this message.
*/
@XmlElement(name = "ACTION_STATUS", required = true)
private ActionStatus actionStatus;
/**
* {@inheritDoc}
*
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(final Object obj) {
if (!(obj instanceof BlackboardMessage))
return false;
if (this == obj)
return true;
final BlackboardMessage rhs = (BlackboardMessage) obj;
return new EqualsBuilder().append(id, rhs.getId()).append(date, rhs.getDate()).append(action, rhs.getAction()).append(actionStatus,
rhs.getActionStatus()).append(parameters, rhs.getParameters()).isEquals();
}
/**
* {@inheritDoc}
*
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
return new HashCodeBuilder(HASH_SEED, HASH_SEED_2).append(id).append(date).append(action).append(actionStatus).append(parameters).toHashCode();
}
@Override
public String getDate() {
return date;
}
@Override
public void setDate(final String date) {
this.date = date;
}
@Override
public String getId() {
return id;
}
@Override
public void setId(final String id) { // NOPMD
this.id = id;
}
@Override
public String getAction() {
return action;
}
@Override
public void setAction(final String action) {
this.action = action;
}
@Override
public List<BlackboardParameter> getParameters() {
return parameters;
}
public void setParameters(final List<BlackboardParameter> parameters) {
this.parameters = parameters;
}
@Override
public ActionStatus getActionStatus() {
return actionStatus;
}
@Override
public void setActionStatus(final ActionStatus actionStatus) {
this.actionStatus = actionStatus;
}
}

@ -0,0 +1,59 @@
package eu.dnetlib.enabling.tools.blackboard;
import java.util.HashMap;
import java.util.Map;
/**
* This notification handler dispatches incoming notifications to registered blackboard message listeners. Usually used
* by blackboard notification clients, since they are usually interested only in the messages they have sent and not in
* messages sent by other BB clients. Moreover it's an useful hook for dispatching orchestration.
*
* @param <T>
* blackboard handler (client or server)
* @author marko
*
*/
public class BlackboardNotificationHandler<T extends BlackboardHandler> extends AbstractBlackboardNotificationHandler<T> implements BlackboardJobRegistry {
/**
* blackboard message listeners.
*/
private Map<String, BlackboardJobListener> listeners = new HashMap<String, BlackboardJobListener>();
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.AbstractBlackboardNotificationHandler#processJob(eu.dnetlib.enabling.tools.blackboard.BlackboardJob)
*/
@Override
protected void processJob(final BlackboardJob job) {
final BlackboardJobListener listener = listeners.get(job.getId());
if (listener != null) {
if (job.isCompleted()) {
listeners.remove(job.getId());
}
listener.processJob(job);
}
}
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardJobRegistry#registerJobListener(eu.dnetlib.enabling.tools.blackboard.BlackboardJob,
* eu.dnetlib.enabling.tools.blackboard.BlackboardJobListener)
*/
@Override
public void registerJobListener(final BlackboardJob job, final BlackboardJobListener listener) {
listeners.put(job.getId(), listener);
}
public Map<String, BlackboardJobListener> getListeners() {
return listeners;
}
public void setListeners(final Map<String, BlackboardJobListener> listeners) {
this.listeners = listeners;
}
}

@ -0,0 +1,36 @@
package eu.dnetlib.enabling.tools.blackboard;
/**
* Blackboard parameter used in a BlackboardMessage.
*
* @author marko
*
*/
public interface BlackboardParameter {
/**
* parameter name (key).
* @return name
*/
String getName();
/**
* setter.
*
* @param name name
*/
void setName(String name);
/**
* parameter value.
* @return value
*/
String getValue();
/**
* setter.
*
* @param value value
*/
void setValue(String value);
}

@ -0,0 +1,87 @@
package eu.dnetlib.enabling.tools.blackboard;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
/**
* Blackboard parameter.
*
* @author marko
*
*/
@XmlRootElement(name = "PARAMETER")
public class BlackboardParameterImpl implements BlackboardParameter {
/**
* hash seed.
*/
private static final int HASH_SEED_2 = 59;
/**
* hash seed.
*/
private static final int HASH_SEED = 35;
/**
* parameter name.
*/
private String name;
/**
* parameter value.
*/
private String value;
/**
* {@inheritDoc}
*
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(final Object obj) {
if (!(obj instanceof BlackboardParameter))
return false;
if (this == obj)
return true;
final BlackboardParameter rhs = (BlackboardParameter) obj;
return new EqualsBuilder().append(name, rhs.getName()).append(value, rhs.getValue()).isEquals();
}
/**
* {@inheritDoc}
*
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
return new HashCodeBuilder(HASH_SEED, HASH_SEED_2).append(name).append(value).toHashCode();
}
@Override
@XmlAttribute
public String getName() {
return name;
}
@Override
public void setName(final String name) {
this.name = name;
}
@Override
@XmlAttribute
public String getValue() {
return value;
}
@Override
public void setValue(final String value) {
this.value = value;
}
}

@ -0,0 +1,24 @@
package eu.dnetlib.enabling.tools.blackboard;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
/**
* Callback interface for serverside blackboard actions.
*
* <p>
* Actions should use the 'handler' methods to set the 'ongoing' or 'done' status.
* </p>
*
* <p>
* failed status is automatically set upon catching an exception thrown by this interface, so feel free to simply throw
* whatever you want
* </p>
*
* @author marko
*
* @param <X>
*/
public interface BlackboardServerAction<X extends Enum<?>> {
void execute(BlackboardServerHandler handler, BlackboardJob job) throws Exception;
}

@ -0,0 +1,119 @@
package eu.dnetlib.enabling.tools.blackboard;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
/**
* A BlackboardServerActionExecutor dispatches the execution of server side blackboard actions.
*
* <p>
* The primary responsibility of this class is to organize the actions, dispatch the action when called, and transform
* exceptions into blackboard 'failures'.
* </p>
*
* <p>
* The action executor is synchronous. It's responsibility of the actual action implementations to put the jobs in
* background, if needed.
* </p>
*
* @author marko
*
* @param <X>
*/
public class BlackboardServerActionExecutor<X extends Enum<X>> {
private static final Log log = LogFactory.getLog(BlackboardServerActionExecutor.class); // NOPMD by marko on 11/24/08 5:02 PM
/**
* set by spring, so key has to be string
*/
private Map<String, BlackboardServerAction<X>> actionMap;
/**
* used to set bb msg status as done, failed etc.
*/
private BlackboardServerHandler blackboardHandler;
/**
* used to exploit static enum declaration for runtime checks of allowed types.
*/
private Class<X> actionType;
/**
* if true, the executor will allow for an incomplete implementation of actions. If false, it an exception will be
* thrown if not all actions are implemented, according to the actionType enum.
*/
private boolean incomplete = false;
@PostConstruct
public void check() {
for (String key : actionMap.keySet())
Enum.valueOf(actionType, key);
if (!incomplete)
for (X en : actionType.getEnumConstants())
if (actionMap.get(en.toString()) == null)
throw new IllegalArgumentException("action " + en + " not declared in action map. Action is mandatory");
}
/**
* Executes a blackboard job, watching for
*
* @param job
*/
public void execute(final BlackboardJob job) {
try {
final X label = Enum.valueOf(actionType, job.getAction());
final BlackboardServerAction<X> action = actionMap.get(label.toString());
if (action != null) {
action.execute(blackboardHandler, job);
} else {
log.warn("Cannot find action handler for blackboard action: " + label);
throw new IllegalArgumentException("Cannot find action handler for blackboard action: " + label);
}
} catch (final Throwable e) {
log.warn("got some exception during blackboard handler execution", e);
blackboardHandler.failed(job, e);
}
}
public BlackboardServerHandler getBlackboardHandler() {
return blackboardHandler;
}
@Required
public void setBlackboardHandler(final BlackboardServerHandler blackboardHandler) {
this.blackboardHandler = blackboardHandler;
}
public Class<X> getActionType() {
return actionType;
}
@Required
public void setActionType(final Class<X> actionType) {
this.actionType = actionType;
}
public Map<String, BlackboardServerAction<X>> getActionMap() {
return actionMap;
}
public void setActionMap(Map<String, BlackboardServerAction<X>> actionMap) {
this.actionMap = actionMap;
}
public boolean isIncomplete() {
return incomplete;
}
public void setIncomplete(boolean incomplete) {
this.incomplete = incomplete;
}
}

@ -0,0 +1,52 @@
package eu.dnetlib.enabling.tools.blackboard;
import javax.annotation.PostConstruct;
/**
* In most cases a service just dispatches every incoming blackboard execution request to the blackboard server
* executor, so that the right action call back will be called.
*
* <p>
* This bean will wrap a BlackboardServerExecutor as a BlackboardNotificationHandler so that it can be installed as a
* notification handler of a server.</p>
*
* @author marko
*
* @param <T>
*/
public class BlackboardServerExecutorNotificationHandler<T extends Enum<T>> extends AbstractBlackboardNotificationHandler<BlackboardServerHandler> {
private BlackboardServerActionExecutor<T> blackboardExecutor;
@PostConstruct
public void init() {
setBlackboardHandler(blackboardExecutor.getBlackboardHandler());
}
@Override
protected void processJob(final BlackboardJob job) {
blackboardExecutor.execute(job);
}
public BlackboardServerActionExecutor<T> getBlackboardExecutor() {
return blackboardExecutor;
}
public void setBlackboardExecutor(BlackboardServerActionExecutor<T> blackboardExecutor) {
this.blackboardExecutor = blackboardExecutor;
}
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.AbstractBlackboardNotificationHandler#setBlackboardHandler(eu.dnetlib.enabling.tools.blackboard.BlackboardHandler)
*
* Redefined here to avoid inheriting the @Required property. It simplifies the spring config for an
* unnecessarry property, since we can get it from the blackboardExecutor.
*/
@Override
public void setBlackboardHandler(BlackboardServerHandler handler) {
super.setBlackboardHandler(handler);
}
}

@ -0,0 +1,37 @@
package eu.dnetlib.enabling.tools.blackboard;
/**
* Helper component used to bridge the high level blackboard job handling from the low level blackboard protocol based
* on notifications and blackboard messages.
*
* @author marko
*
*/
public interface BlackboardServerHandler extends BlackboardHandler {
/**
* Sets the ongoing action status to the given job, publishing this new state.
*
* @param job
* blackboard job
*/
void ongoing(BlackboardJob job);
/**
* Sets the "failed" action status to the given job, publishing this new state along with the error message obtained
* from the exception.
*
* @param job
* blackboard job
* @param exception
* exception which caused the failure
*/
void failed(BlackboardJob job, Throwable exception);
/**
* Set the "done" action status to the given job, publishing the new state.
*
* @param job
* blackboard job
*/
void done(BlackboardJob job);
}

@ -0,0 +1,70 @@
package eu.dnetlib.enabling.tools.blackboard;
import javax.annotation.Resource;
import javax.xml.bind.JAXBException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.miscutils.jaxb.JaxbFactory;
/**
* This BB handler deletes completed (successful or unsuccessful) blackboard messages after dispatching. Job should be registered to only
* one "deleting BB handler".
*
* @author marko
*
*/
public class DeletingBlackboardNotificationHandler extends BlackboardNotificationHandler<BlackboardClientHandler> {
/**
* Logger.
*/
private static final Log log = LogFactory.getLog(DeletingBlackboardNotificationHandler.class); // NOPMD by marko on 11/24/08 5:02 PM
/**
* blackboard message factory.
*/
@Resource(name = "blackboardMessageFactory")
private JaxbFactory<BlackboardMessage> messageFactory;
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.BlackboardNotificationHandler#processJob(eu.dnetlib.enabling.tools.blackboard.BlackboardJob)
*/
@Override
protected void processJob(final BlackboardJob job) {
if (getListeners().containsKey(job.getId()) && job.isCompleted()) {
if (log.isDebugEnabled()) {
log.debug(serializeBlackBoardMessage(job));
}
getBlackboardHandler().delete(job);
}
super.processJob(job);
}
/**
* Helper method, serializes a Blackboard message using a blackboardMessageFactory.
*
* @param job
* @return
*/
private String serializeBlackBoardMessage(final BlackboardJob job) {
try {
return getMessageFactory().serialize(job.getMessage());
} catch (JAXBException e) {
return "cannot serialize blackboard message: " + e.getMessage();
}
}
public void setMessageFactory(final JaxbFactory<BlackboardMessage> messageFactory) {
this.messageFactory = messageFactory;
}
public JaxbFactory<BlackboardMessage> getMessageFactory() {
return messageFactory;
}
}

@ -0,0 +1,22 @@
package eu.dnetlib.enabling.tools.blackboard;
/**
* Each service may have a chain of NotificationHandlers, which process incoming notifications.
*
* @author marko
*
*/
public interface NotificationHandler {
/**
* Incoming notification received.
*
* @param subscrId subscriptionId
* @param topic topic
* @param rsId resource id
* @param profile resource profile
*/
void notified(String subscrId, final String topic, final String rsId, final String profile);
}

@ -0,0 +1,14 @@
package eu.dnetlib.enabling.tools.blackboard;
/**
* Invokes a chain of notification handlers for each incoming notification. Normally an instance of this class is
* registered as the notification handler for a given service.
*
* @author marko
*
*/
public interface NotificationHandlerChain extends NotificationHandler {
public void delegateNotification(final String subscrId, final String topic, final String rsId, final String profile, final NotificationHandler handler);
}

@ -0,0 +1,104 @@
package eu.dnetlib.enabling.tools.blackboard;
import java.util.Collection;
import java.util.Map;
import javax.annotation.Resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.core.task.TaskExecutor;
import eu.dnetlib.enabling.actions.AbstractSubscriptionAction;
import eu.dnetlib.enabling.tools.Enableable;
import eu.dnetlib.enabling.tools.EnableableEnumerator;
public class NotificationHandlerChainImpl implements NotificationHandlerChain {
/**
* logger.
*/
private static final Log log = LogFactory.getLog(NotificationHandlerChain.class); // NOPMD by marko on 11/24/08 5:02 PM
/**
* notification handler chain.
*/
private Collection<NotificationHandler> handlers;
/**
* task executor used for invoking the handlers.
*/
private TaskExecutor handlerExecutor;
@Resource
private NotificationHistory notificationHistory;
@Resource
private EnableableEnumerator enableableEnumerator;
/**
* {@inheritDoc}
*
* @see eu.dnetlib.enabling.tools.blackboard.NotificationHandler#notified(java.lang.String, java.lang.String,
* java.lang.String, java.lang.String)
*/
@Override
public void notified(final String subscrId, final String topic, final String rsId, final String profile) {
for (final NotificationHandler handler : handlers) {
try {
if (handler instanceof AbstractSubscriptionAction)
if (topic.startsWith(((AbstractSubscriptionAction) handler).getTopicPrefix()))
if (!((Enableable) handler).isEnabled()) {
for (Map.Entry<String, Enableable> entry : enableableEnumerator.getAllEnableables().entrySet()) {
if (entry.getValue() == handler) {
NotificationInfo info = new NotificationInfo();
info.setName(entry.getKey());
info.setProfile(profile);
info.setRsId(rsId);
info.setSubscrId(subscrId);
info.setTopic(topic);
notificationHistory.saveNotification(info);
}
}
continue;
}
delegateNotification(subscrId, topic, rsId, profile, handler);
} catch (final RuntimeException e) {
log.fatal("error processing notification handler " + handler, e);
}
}
}
@Override
public void delegateNotification(final String subscrId, final String topic, final String rsId, final String profile, final NotificationHandler handler) {
handlerExecutor.execute(new Runnable() {
@Override
public void run() {
handler.notified(subscrId, topic, rsId, profile);
}
});
}
public Collection<NotificationHandler> getHandlers() {
return handlers;
}
@Required
public void setHandlers(final Collection<NotificationHandler> handlers) {
this.handlers = handlers;
}
public TaskExecutor getHandlerExecutor() {
return handlerExecutor;
}
@Required
public void setHandlerExecutor(TaskExecutor handlerExecutor) {
this.handlerExecutor = handlerExecutor;
}
}

@ -0,0 +1,37 @@
package eu.dnetlib.enabling.tools.blackboard;
import java.util.Map;
public interface NotificationHistory {
/**
* Return a saved notification associated to a bean and to a rsId.
*
* @param name
* @return
*/
public NotificationInfo obtainNotification(String bean, String rsId);
/**
* Save a notification.
*
* @param info
*/
void saveNotification(NotificationInfo info);
/**
* Return all saved notifications.
*
* @return
*/
Map<String, Map<String, NotificationInfo>> obtainAllNotifications();
/**
* Remove a saved notifications.
*
* @return
*/
public void clearNotification(String bean, String rsId);
}

@ -0,0 +1,39 @@
package eu.dnetlib.enabling.tools.blackboard;
import java.util.HashMap;
import java.util.Map;
public class NotificationHistoryImpl implements NotificationHistory {
private Map<String, Map<String, NotificationInfo>> notifications = new HashMap<String, Map<String, NotificationInfo>>();
@Override
public NotificationInfo obtainNotification(String bean, String rsId) {
if (!notifications.containsKey(bean))
return null;
return notifications.get(bean).get(rsId);
}
@Override
public void saveNotification(NotificationInfo info) {
String bean = info.getName();
String rsId = info.getRsId();
if (!notifications.containsKey(bean))
notifications.put(bean, new HashMap<String, NotificationInfo>());
notifications.get(bean).put(rsId, info);
}
@Override
public Map<String, Map<String, NotificationInfo>> obtainAllNotifications() {
return notifications;
}
@Override
public void clearNotification(String bean, String rsId) {
if (notifications.containsKey(bean))
notifications.get(bean).remove(rsId);
}
}

@ -0,0 +1,74 @@
package eu.dnetlib.enabling.tools.blackboard;
import eu.dnetlib.miscutils.datetime.DateUtils;
public class NotificationInfo {
private String name;
private String subscrId;
private String topic;
private String rsId;
private String profile;
private String date;
public NotificationInfo() {
this.date = DateUtils.now_ISO8601();
}
public NotificationInfo(String name, String subscrId, String topic, String rsId, String profile) {
super();
this.name = name;
this.subscrId = subscrId;
this.topic = topic;
this.rsId = rsId;
this.profile = profile;
this.date = DateUtils.now_ISO8601();
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSubscrId() {
return subscrId;
}
public void setSubscrId(String subscrId) {
this.subscrId = subscrId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getRsId() {
return rsId;
}
public void setRsId(String rsId) {
this.rsId = rsId;
}
public String getProfile() {
return profile;
}
public void setProfile(String profile) {
this.profile = profile;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
}

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<bean id="enableableEnumerator"
class="eu.dnetlib.enabling.tools.EnableableEnumerator" />
<bean id="notificationHistory"
class="eu.dnetlib.enabling.tools.blackboard.NotificationHistoryImpl" />
</beans>

@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jaxws="http://cxf.apache.org/jaxws"
xmlns:sec="http://cxf.apache.org/configuration/security" xmlns:wsa="http://cxf.apache.org/ws/addressing"
xmlns:p="http://www.springframework.org/schema/p" xmlns:http="http://cxf.apache.org/transports/http/configuration"
xmlns:t="http://dnetlib.eu/springbeans/t" xmlns:template="http://dnetlib.eu/springbeans/template"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://cxf.apache.org/ws/addressing http://cxf.apache.org/schemas/ws-addr-conf.xsd
http://cxf.apache.org/configuration/security http://cxf.apache.org/schemas/configuration/security.xsd
http://cxf.apache.org/transports/http/configuration http://cxf.apache.org/schemas/configuration/http-conf.xsd
http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
http://dnetlib.eu/springbeans/template http://dnetlib.eu/springbeans/template.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.0.xsd">
<bean id="dnetJobScheduler"
class="org.springframework.scheduling.quartz.SchedulerFactoryBean"
destroy-method="destroy">
<property name="jobFactory">
<bean id="jobSchedulerFactory"
class="org.springframework.scheduling.quartz.SpringBeanJobFactory" />
</property>
</bean>
<bean id="schedulableEnumerator"
class="eu.dnetlib.enabling.tools.SchedulableEnumerator" />
</beans>

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="blackboardMessageFactory" class="eu.dnetlib.miscutils.jaxb.JaxbFactory">
<constructor-arg
value="eu.dnetlib.enabling.tools.blackboard.BlackboardMessageImpl" />
</bean>
<bean id="blackboardHandler"
class="eu.dnetlib.enabling.tools.blackboard.BlackboardHandlerImpl"
p:messageFactory-ref="blackboardMessageFactory" p:serviceLocator-ref="uniqueServiceLocator" />
<bean id="blackboardClientHandler"
class="eu.dnetlib.enabling.tools.blackboard.BlackboardClientHandlerImpl"
p:messageFactory-ref="blackboardMessageFactory" p:serviceLocator-ref="uniqueServiceLocator" />
</beans>

@ -0,0 +1,104 @@
package eu.dnetlib.enabling.tools.blackboard;
import javax.annotation.Resource;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(value = SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class NotificationHandlerChainTest {
public class RecJob implements Runnable {
private int times;
private int num;
public RecJob(int num, int times) {
this.num = num;
this.times = times;
}
@Override
public void run() {
System.out.println("starting " + num);
if(times >= 0)
executor.execute(new RecJob(num + 1, times - 1));
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("thread finished - " + num);
}
}
private static final class Job implements Runnable {
private final int value;
public Job(final int i) {
value = i;
}
@Override
public void run() {
System.out.println("thread started - " + value);
try {
Thread.sleep(4000);
} catch (final InterruptedException e) {
//
}
System.out.println("thread finished - " + value);
}
}
@Resource
private transient ThreadPoolTaskExecutor executor;
@Test
public void testDelegateNotification() throws InterruptedException {
System.out.println(executor);
System.out.println("executing");
for (int i = 0; i < 100; i++)
executor.execute(new Job(i));
System.out.println("executed - waiting");
Thread.sleep(2000);
System.out.println("active count: " + executor.getActiveCount());
System.out.println("current pool size: " + executor.getCorePoolSize());
System.out.println("pool size " + executor.getPoolSize());
Thread.sleep(3000);
System.out.println("ok");
}
@Test
public void testRecursive() throws InterruptedException {
for (int i = 0; i < 4; i++)
executor.execute(new RecJob(i * 10, 4));
System.out.println("executed - waiting");
Thread.sleep(2000);
System.out.println("active count: " + executor.getActiveCount());
System.out.println("current pool size: " + executor.getCorePoolSize());
System.out.println("pool size " + executor.getPoolSize());
Thread.sleep(3000);
System.out.println("ok");
}
}

@ -83,7 +83,7 @@ public class IterableXmlParserTest {
parser = new IterableXmlParser(element, stream);
int count = 0;
for (String xml : parser) {
System.out.println(xml);
//System.out.println(xml);
Document doc = reader.read(new StringReader(xml));
assertNotNull(doc);
assertNotNull(doc.selectSingleNode("//" + element));

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:lang="http://www.springframework.org/schema/lang"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-2.5.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.5.xsd">
<bean id="msroNotificationExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"
p:corePoolSize="32" p:keepAliveSeconds="3600" p:queueCapacity="0" />
</beans>

@ -269,6 +269,11 @@
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- CXF -->

Loading…
Cancel
Save