is-collector/src/org/gcube/informationsystem/collector/impl/state/AggregatorRegisteredResourc...

308 lines
9.6 KiB
Java
Executable File

package org.gcube.informationsystem.collector.impl.state;
import java.util.Calendar;
import org.apache.axis.encoding.AnyContentType;
import org.apache.axis.message.MessageElement;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.apache.axis.message.addressing.ReferencePropertiesType;
import org.gcube.common.core.utils.logging.GCUBELog;
import org.gcube.informationsystem.collector.impl.persistence.AggregatorPersistentResource;
import org.gcube.informationsystem.collector.impl.persistence.PersistentResource.RESOURCETYPE;
import org.gcube.informationsystem.collector.impl.state.AggregatorRegisteredResource;
import org.gcube.informationsystem.collector.impl.utils.*;
import org.gcube.informationsystem.collector.impl.xmlstorage.exist.State;
import org.gcube.informationsystem.collector.impl.porttypes.XMLCollectionAccess;
import org.globus.mds.aggregator.impl.AggregatorServiceGroupResource;
import org.globus.mds.aggregator.impl.AggregatorServiceGroupEntryResource;
import org.globus.mds.aggregator.impl.AggregatorSource;
import org.globus.mds.aggregator.impl.AggregatorSink;
import org.globus.wsrf.ResourceKey;
import org.globus.wsrf.ResourceProperty;
import org.globus.wsrf.ResourcePropertySet;
import org.globus.wsrf.TopicList;
import org.globus.wsrf.impl.ReflectionResourceProperty;
import org.globus.wsrf.impl.SimpleTopicList;
import org.globus.wsrf.impl.SimpleResourcePropertyMetaData;
import org.globus.wsrf.impl.servicegroup.ServiceGroupConstants;
import org.globus.wsrf.impl.servicegroup.EntryResourcePropertyTopic;
/**
* This class implements an aggregating in-memory service group resource. <br>
* For every registered AggregatorSource instance one connected AggregatorRegisteredResource
* instance is created and it is delivered with its data, following the chosen registration mode
* (Push/Pull).
*
* @author Manuele Simi (ISTI-CNR)
*
*/
public class AggregatorRegisteredResource extends AggregatorServiceGroupResource
implements AggregatorSink, ICRegisteredResource {
private static GCUBELog logger = new GCUBELog(AggregatorRegisteredResource.class);
private ResourceKey resourceKey = null;
// private static I18n i18n = I18n.getI18n(Resources.class.getName());
private static final String registryNS = "gcube/informationsystem/registry/Registry";
protected Calendar terminationTime, currentTime;
private ResourcePropertySet propSet;
private TopicList topicList;
// private String baseDir;
/**
* Builds a new resource
*/
public AggregatorRegisteredResource() {
super.init(XMLCollectionAccess.RP_SET);
// this.baseDir = getBaseDirectory();
this.propSet = this.getResourcePropertySet();
this.topicList = new SimpleTopicList(this);
ResourceProperty prop = null;
EntryResourcePropertyTopic rpTopic = new EntryResourcePropertyTopic(this.propSet.get(ServiceGroupConstants.ENTRY));
this.propSet.add(rpTopic);
this.topicList.addTopic(rpTopic);
try {
// ResourceLifeTime properties
prop = new ReflectionResourceProperty(SimpleResourcePropertyMetaData.TERMINATION_TIME, this);
this.propSet.add(prop);
prop = new ReflectionResourceProperty(SimpleResourcePropertyMetaData.CURRENT_TIME, this);
this.propSet.add(prop);
this.propSet.add(prop);
// initialize aggregator
this.loadCompatibleSources(this);
} catch (Exception e) {
logger.error(State.logPrefix + "Error during DISICResource creation: ", e);
}
}
/**
* Used to inform the resource of its key.
*
* @param k the resource key
*
* @throws Exception if the resource key is already set
*/
public void setResourceKey(ResourceKey k) throws Exception {
if (resourceKey == null)
resourceKey = k;
else
throw new Exception("Resource key can only be set once.");
}
/**
*
* @return the resource key
*/
public ResourceKey getResourceKey() {
return resourceKey;
}
// ResourceLifetime methods
/**
* Sets the termination time
*
* @param time the new termination time
*/
public void setTerminationTime(Calendar time) {
logger.debug(State.logPrefix + "Set Termination time called: " + time.getTime());
this.terminationTime = time;
}
/**
*
*
* {@inheritDoc}
*/
public Calendar getTerminationTime() {
return this.terminationTime;
}
/**
* {@inheritDoc}
*
*/
public Calendar getCurrentTime() {
return Calendar.getInstance();
}
/**
*
* {@inheritDoc}
*/
public TopicList getTopicList() {
return this.topicList;
}
// AggregatorSink methods
/**
* Takes delivery of a message from an AggregatorSource. <br>
* This is called by an AggregatorSource to
* deliver a message with the assumption that the message data will be aggregated as its native
* type and into whatever data storage format is used by the underlying AggregatorSink class.
*
* @param messageObj the message content
* @param entry the related service group entry
*
*/
public void deliver(AnyContentType messageObj, AggregatorServiceGroupEntryResource entry) {
logger.info("New delivered resource");
try {
/*
* logger.debug(State.logPrefix + "delivered message -> " +
* AnyHelper.toSingleString(messageObj));
*/
// get the message content
MsgParser aentry = new MsgParser(messageObj);
String entryType = aentry.getEntryType();
logger.debug("Entry type " + entryType);
logger.debug("Entry RunningInstance ID " + aentry.getRunningInstanceID());
logger.debug("Entry Service Name " + aentry.getServiceName());
logger.debug("Entry Service Class " + aentry.getServiceClass());
// logger.debug(State.logPrefix + "getEntryAsString " +
// aentry.getEntryAsString());
// MessageElement[] message = messageObj.get_any();
// extract the entry EPR
EntryEPRParser eprparser = new EntryEPRParser(entry.getEntryEPR());
logger.debug("aggregator EntryEPR-> " + entry.getEntryEPR().toString());
// extract the member EPR
EndpointReferenceType memberEpr = entry.getMemberEPR();
logger.debug("aggregator MemberEPR-> " + memberEpr.toString());
// get RP set from entry
// ResourcePropertySet rpSet = entry.getResourcePropertySet();
// get content RP from entry
/*
* ResourceProperty contentRP = rpSet.get(ServiceGroupConstants.CONTENT);
*
* AggregatorContent content = entry.getContent();
*
* AggregatorConfig config = content.getAggregatorConfig();
*
* MessageElement[] any = config.get_any();
*/
// Build the new resource to store
logger.debug("Storing the new delivered resource");
AggregatorPersistentResource res = new AggregatorPersistentResource();
res.setData(aentry.getEntryAsString());
res.setEntryKey(eprparser.getEntryKey());
res.setGroupKey(eprparser.getGroupKey());
res.setTerminationTime(entry.getTerminationTime());
// select the resource type
if (memberEpr.getAddress().toString().endsWith(AggregatorRegisteredResource.registryNS)) {
res.setType(RESOURCETYPE.Profile);
} else {
res.setType(RESOURCETYPE.Properties);
}
// set the EPR and Key of the remote WS-Resource that publish the
// resource
res.setSource(memberEpr.getAddress().toString());
logger.debug("Source: " + memberEpr.getAddress().toString());
try {
ReferencePropertiesType prop = memberEpr.getProperties();
if (prop != null) {
MessageElement[] any = prop.get_any();
if (any.length > 0) {
res.setSourceKey(any[0].getValue());
res.setCompleteSourceKey(any[0].toString());
}
}
} catch (java.lang.NullPointerException npe) {
// nothing to do, the source key does not exist (may be the publisher is a singleton
// or stateless service)
}
synchronized (State.deletedResources) {
if (State.deletedResources.contains(res)) {
State.deletedResources.remove(res);
throw new Exception("the resource " + res.getID() + " is no longer available");
}
}
logger.trace("Resource: " + res.toString());
// store/update the new resource
State.storage_manager.storeResource(res);
aentry.dispose();
logger.info("Delivered resource stored with success");
} catch (Exception e) {
logger.error("When managing aggregator content:" + e.getMessage());
logger.error("returned exception ", e);
}
}
/**
* Called to initialize any necessary state.
*
* @param parameters any initialization parameters (not used)
*/
public void initialize(Object parameters) {
// NO OP
}
/**
* Sets the AggregatorSource connected to this sink
*
* @param source the source
*
*/
public void setSource(AggregatorSource source) {
// NO OP
}
/**
* Gets the AggregatorSource connected to this sink
*
* @return always null
*/
public AggregatorSource getSource() {
return null;
}
/**
* Called to free resources used by the sink.
*
*/
public void terminate() {
logger.debug("Resource terminated");
}
// RemoveCallback methods
// Notifies that the resource was removed
/**
* Callback method invoked when the resource is removed by the Aggregator Framework
*/
public void remove() {
logger.debug("Resource removed");
}
}