Manuele Simi 2009-09-07 21:18:30 +00:00
parent 131c9b3c35
commit dcdb16c16f
10 changed files with 243 additions and 111 deletions

View File

@ -13,10 +13,10 @@
<environment name="maxBackups" value="10" type="java.lang.String"
override="false" />
<environment name="scheduledBackupInHours" value="2"
<environment name="scheduledBackupInHours" value="12"
type="java.lang.String" override="false" />
<environment name="sweeperIntervalInMillis" value="120000"
<environment name="sweeperIntervalInMillis" value="240000"
type="java.lang.String" override="false" />
<environment name="registrationURI" value="http://...."
@ -25,12 +25,12 @@
<environment name="resourceExpirationTimeInMillis" value="600000"
type="java.lang.String" override="false" />
<environment name="deleteRPsOnStartup" value="false"
<environment name="deleteRPsOnStartup" value="true"
type="java.lang.String" override="false" />
<environment name="startScopes" value="/CNRPrivate"
<!-- <environment name="startScopes" value=""
type="java.lang.String" override="false" />
-->
</service>
<global>

View File

@ -64,19 +64,28 @@ public class AggregatorPersistentResource extends PersistentResource {
/**
* Builds a new empty DISPersinstentresource
* @param key
* @param source
* @throws Exception if it was impossible to extract the ID from the resource
*
*/
public AggregatorPersistentResource() {
public AggregatorPersistentResource(String source, String key, String data, RESOURCETYPE type) throws Exception {
// defatult termination time is now
Calendar cal = new GregorianCalendar();
cal.setTimeZone(TimeZone.getTimeZone("GMT"));
this.setTerminationTime(cal);
this.source = source;
this.sourceKey = key;
this.data = data;
this.type = type;
lastUpdateTime = new GregorianCalendar();
lastUpdateTime.setTimeZone(TimeZone.getTimeZone("GMT"));
this.buildID();
logger.debug("Resource ID: " + this.resourceID);
}
/**
* Builds a DISPersinstentresource starting from an eXist resource
*
@ -91,17 +100,40 @@ public class AggregatorPersistentResource extends PersistentResource {
this.originalSource = resource;
this.resource_string = resource.getContent().toString();
this.parseResource();
logger.debug("Resource ID: " + this.resourceID);
} catch (XMLDBException dbe) {
throw new Exception("invalid resource");
}
}
private void buildID() throws Exception {
if ((this.type != null) && (type == RESOURCETYPE.Profile)) { //it's a gCube profile
try {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = factory.newDocumentBuilder();
StringReader reader = new StringReader(this.toXML());
InputSource source = new InputSource(reader);
Document internalDOM = builder.parse(source);
// gets the type
XPath path = XPathFactory.newInstance().newXPath();
this.profile_type = path.evaluate("/Document/Data/child::*[local-name()='Profile']/Resource/Type", internalDOM);
// uses the GCUBEResource ID as local resource ID
this.resourceID = path.evaluate("/Document/Data/child::*[local-name()='Profile']/Resource/ID", internalDOM);
} catch (Exception e) {
logger.error("Unable to extract the ID from the resource " + e.getMessage());
throw e;
}
} else { //it's a RP doc
//we need to replace the schema and colons in order to make it an ID accepted by eXist
this.resourceID = this.source.replace("http://", "").replace(":", "") + "-" + this.sourceKey;//this.getGroupKey() + this.getEntryKey();
}
}
/**
* Sets the content of the resource
*
* @param data
* the new content
*
* @param data the new content
*/
public void setData(String data) {
this.data = data;
@ -110,11 +142,9 @@ public class AggregatorPersistentResource extends PersistentResource {
/**
* Sets the content of the resource using the content of a file
*
* @param f
* the file to use as content source
* @param f the file to use as content source
*
* @throws IOException
* if the access to the given file fails
* @throws IOException if the access to the given file fails
*
*/
public void setData(File f) throws IOException {
@ -151,11 +181,9 @@ public class AggregatorPersistentResource extends PersistentResource {
* @return the ID
*/
public String getID() {
// create a unique ID unless the resource contains a profile
if (this.resourceID == null)
this.resourceID = this.getGroupKey() + this.getEntryKey();
//if (this.resourceID == null)
// this.resourceID = this.source + this.sourceKey;//this.getGroupKey() + this.getEntryKey();
return this.resourceID;
}
@ -240,8 +268,7 @@ public class AggregatorPersistentResource extends PersistentResource {
/**
* Sets the complete source key
*
* @param completeKey
* the new complete key
* @param completeKey the new complete key
*/
public void setCompleteSourceKey(String completeKey) {
this.completeSourceKey = completeKey;

View File

@ -3,13 +3,9 @@ package org.gcube.informationsystem.collector.impl.state;
import java.util.Calendar;
import org.apache.axis.encoding.AnyContentType;
import org.apache.axis.message.MessageElement;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.apache.axis.message.addressing.ReferencePropertiesType;
import org.gcube.common.core.utils.logging.GCUBELog;
import org.gcube.informationsystem.collector.impl.persistence.AggregatorPersistentResource;
import org.gcube.informationsystem.collector.impl.persistence.PersistentResource.RESOURCETYPE;
import org.gcube.informationsystem.collector.impl.state.AggregatorRegisteredResource;
import org.gcube.informationsystem.collector.impl.utils.*;
import org.gcube.informationsystem.collector.impl.xmlstorage.exist.State;
@ -47,9 +43,7 @@ public class AggregatorRegisteredResource extends AggregatorServiceGroupResource
private ResourceKey resourceKey = null;
// private static I18n i18n = I18n.getI18n(Resources.class.getName());
private static final String registryNS = "gcube/informationsystem/registry/Registry";
// private static I18n i18n = I18n.getI18n(Resources.class.getName());
protected Calendar terminationTime, currentTime;
@ -179,66 +173,34 @@ public class AggregatorRegisteredResource extends AggregatorServiceGroupResource
logger.debug("Entry RunningInstance ID " + aentry.getRunningInstanceID());
logger.debug("Entry Service Name " + aentry.getServiceName());
logger.debug("Entry Service Class " + aentry.getServiceClass());
// logger.debug(State.logPrefix + "getEntryAsString " +
// aentry.getEntryAsString());
// MessageElement[] message = messageObj.get_any();
// extract the entry EPR
EntryEPRParser eprparser = new EntryEPRParser(entry.getEntryEPR());
logger.debug("aggregator EntryEPR-> " + entry.getEntryEPR().toString());
// extract the member EPR
EndpointReferenceType memberEpr = entry.getMemberEPR();
logger.debug("aggregator MemberEPR-> " + memberEpr.toString());
// get RP set from entry
// ResourcePropertySet rpSet = entry.getResourcePropertySet();
// get content RP from entry
/*
* ResourceProperty contentRP = rpSet.get(ServiceGroupConstants.CONTENT);
*
* AggregatorContent content = entry.getContent();
*
* AggregatorConfig config = content.getAggregatorConfig();
*
* MessageElement[] any = config.get_any();
*/
EntryParser entryparser = new EntryParser(entry);
EntryEPRParser sinkparser = entryparser.getEPRSinkParser();
logger.debug("Aggregator Source " + entryparser.getSource());
logger.debug("Aggregator Sink " + entryparser.getSink());
// Build the new resource to store
logger.debug("Storing the new delivered resource");
AggregatorPersistentResource res = new AggregatorPersistentResource();
AggregatorPersistentResource res = new AggregatorPersistentResource(entryparser.getSourceURI(), entryparser.getSourceKey(), aentry.getEntryAsString(), entryparser.getType());
res.setData(aentry.getEntryAsString());
res.setEntryKey(eprparser.getEntryKey());
res.setGroupKey(eprparser.getGroupKey());
res.setEntryKey(sinkparser.getEntryKey());
res.setGroupKey(sinkparser.getGroupKey());
res.setTerminationTime(entry.getTerminationTime());
// select the resource type
if (memberEpr.getAddress().toString().endsWith(AggregatorRegisteredResource.registryNS)) {
res.setType(RESOURCETYPE.Profile);
} else {
res.setType(RESOURCETYPE.Properties);
}
// set the EPR and Key of the remote WS-Resource that publish the
// resource
res.setSource(memberEpr.getAddress().toString());
logger.debug("Source: " + memberEpr.getAddress().toString());
try {
ReferencePropertiesType prop = memberEpr.getProperties();
if (prop != null) {
MessageElement[] any = prop.get_any();
if (any.length > 0) {
res.setSourceKey(any[0].getValue());
res.setCompleteSourceKey(any[0].toString());
}
}
} catch (java.lang.NullPointerException npe) {
// nothing to do, the source key does not exist (may be the publisher is a singleton
// or stateless service)
}
//res.setType(entryparser.getType());
res.setSource(entryparser.getSourceURI());
res.setSourceKey(entryparser.getSourceKey());
logger.debug("Qualified Source Key: " + entryparser.getQualifiedSourceKey());
res.setCompleteSourceKey(entryparser.getQualifiedSourceKey());
//if the resource is in the to-be-removed list, raise an exception
synchronized (State.deletedResources) {
if (State.deletedResources.contains(res)) {
State.deletedResources.remove(res);

View File

@ -2,9 +2,10 @@ package org.gcube.informationsystem.collector.impl.state;
import org.gcube.common.core.utils.logging.GCUBELog;
import org.gcube.informationsystem.collector.impl.persistence.AggregatorPersistentResource;
import org.gcube.informationsystem.collector.impl.persistence.PersistentResource.RESOURCETYPE;
import org.gcube.informationsystem.collector.impl.utils.EntryEPRParser;
import org.gcube.informationsystem.collector.impl.utils.EntryParser;
import org.gcube.informationsystem.collector.impl.xmlstorage.exist.State;
import org.globus.mds.aggregator.impl.AggregatorServiceGroupEntryRemovedCallback;
import org.globus.mds.aggregator.impl.AggregatorServiceGroupEntryResource;
@ -33,21 +34,26 @@ public class ICAggregatorRemoveCallback implements AggregatorServiceGroupEntryRe
* Removes from the storage the supplied resource
*
* @param entry the AggregatorServiceGroupEntryResource that is about to be removed
* @throws Exception
* if the delete operation fails
* @throws Exception if the delete operation fails
*/
public void remove(AggregatorServiceGroupEntryResource entry) throws Exception {
logger.debug("ICAggregatorRemoveCallback invoked " + entry.getEntryEPR().toString());
EntryEPRParser parser = new EntryEPRParser(entry.getEntryEPR());
AggregatorPersistentResource res = new AggregatorPersistentResource();
EntryParser eparser = new EntryParser(entry);
logger.debug("Aggregator Source " + eparser.getSource());
logger.debug("Aggregator Sink " + eparser.getSink());
EntryEPRParser parser = eparser.getEPRSinkParser();
AggregatorPersistentResource res = new AggregatorPersistentResource(eparser.getSourceURI(), eparser.getSourceKey(), "", RESOURCETYPE.Properties);
res.setEntryKey(parser.getEntryKey());
res.setGroupKey(parser.getGroupKey());
res.setSourceKey(eparser.getSourceKey());
res.setSource(eparser.getSourceURI());
// mark the resource as no longer available
synchronized (State.deletedResources) {
State.deletedResources.add(res);
State.getDeletedResources().add(res);
}
// delete the resource from the database

View File

@ -0,0 +1,125 @@
package org.gcube.informationsystem.collector.impl.utils;
import org.apache.axis.message.MessageElement;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.apache.axis.message.addressing.ReferencePropertiesType;
import org.globus.mds.aggregator.impl.AggregatorServiceGroupEntryResource;
import org.gcube.informationsystem.collector.impl.persistence.PersistentResource.RESOURCETYPE;
/**
* Parser for {@link AggregatorServiceGroupEntryResource}
*
* @author Manuele Simi (ISTI-CNR)
*
*/
public class EntryParser {
private AggregatorServiceGroupEntryResource entry = null;
private static final String registryNS = "gcube/informationsystem/registry/Registry";
public EntryParser(AggregatorServiceGroupEntryResource entry) {
this.entry = entry;
}
public EndpointReferenceType getSource() {
return entry.getMemberEPR();
}
public EndpointReferenceType getSink() {
return entry.getEntryEPR();
}
/**
* @return the source key or an empty string if it does not exist
*/
public String getSourceKey() {
String key = "";
EndpointReferenceType memberEpr = entry.getMemberEPR();
try {
ReferencePropertiesType prop = memberEpr.getProperties();
if (prop != null) {
MessageElement[] any = prop.get_any();
if (any.length > 0)
key = any[0].getValue();
}
} catch (java.lang.NullPointerException npe) {
// nothing to do, the source key does not exist (may be the publisher is a singleton
// or stateless service)
}
return key;
}
/**
* @return the fully qualified source key or an empty string if it does not exist
*/
public String getQualifiedSourceKey() {
String key = "";
EndpointReferenceType memberEpr = entry.getMemberEPR();
try {
ReferencePropertiesType prop = memberEpr.getProperties();
if (prop != null) {
MessageElement[] any = prop.get_any();
if (any.length > 0)
key = any[0].toString();
}
} catch (java.lang.NullPointerException npe) {
// nothing to do, the source key does not exist (may be the publisher is a singleton
// or stateless service)
}
return key;
}
/**
*
* @return the source URI, i.e. the URI from which the resource has been registered
*/
public String getSourceURI() {
EndpointReferenceType memberEpr = entry.getMemberEPR();
return memberEpr.getAddress().toString();
}
/**
*
* @return the {@link RESOURCETYPE}
*/
public RESOURCETYPE getType() {
EndpointReferenceType memberEpr = entry.getMemberEPR();
if (memberEpr.getAddress().toString().endsWith(registryNS)) {
return RESOURCETYPE.Profile;
} else {
return RESOURCETYPE.Properties;
}
}
public void getRPSet() {
// get RP set from entry
// ResourcePropertySet rpSet = entry.getResourcePropertySet();
// get content RP from entry
/*
* ResourceProperty contentRP = rpSet.get(ServiceGroupConstants.CONTENT);
*
* AggregatorContent content = entry.getContent();
*
* AggregatorConfig config = content.getAggregatorConfig();
*
* MessageElement[] any = config.get_any();
*/
}
/**
*
* @return a parser for the Sink EPR
* @throws Exception
*/
public EntryEPRParser getEPRSinkParser() throws Exception {
return new EntryEPRParser(entry.getEntryEPR());
}
}

View File

@ -2,12 +2,9 @@ package org.gcube.informationsystem.collector.impl.xmlstorage.exist;
import java.io.IOException;
import java.util.Properties;
import javax.xml.parsers.ParserConfigurationException;
import org.exist.backup.Restore;
import org.exist.storage.BrokerPool;
import org.exist.storage.ConsistencyCheckTask;
import org.xml.sax.SAXException;
import org.xmldb.api.base.XMLDBException;
import org.gcube.common.core.utils.logging.GCUBELog;
import org.gcube.informationsystem.collector.impl.contexts.ICServiceContext;
@ -63,12 +60,8 @@ public class DataManager extends XMLStorageManager {
}
/**
* Restore from the latest backup
* Restores from the latest backup
* @throws IOException
* @throws XMLDBException
* @throws SAXException
* @throws ParserConfigurationException
* @throws Exception
*/
public synchronized void restore() throws IOException {
@ -77,8 +70,13 @@ public class DataManager extends XMLStorageManager {
try {
ExistBackupFolder lastBackup = this.getLastBackup();
logger.info("Restoring from " + lastBackup.getBackupFile());
Restore restore = new Restore("admin", "admin","admin", lastBackup.getBackupFile(), URI);
restore.restore(false, null);
Restore restore = new Restore(USER, PWD, PWD, lastBackup.getBackupFile(), URI);
restore.restore(false, null);
if (Boolean.valueOf((String) ICServiceContext.getContext().getProperty("deleteRPsOnStartup", true))) {
// cleanup the RPs collection
logger.info("deleting all RPs...");
this.deleteAllProperties();
}
logger.info("Restore completed");
} catch (Exception e1) {
logger.fatal("Failed to restore", e1);

View File

@ -51,22 +51,22 @@ public class State {
* if the intialization fails
*/
public static void initialize() throws Exception {
logger.info("starting IC service initialization...");
logger.info("Starting IC service initialization...");
State.initializeDataManager();
State.initializeQueryManager();
if (Boolean.valueOf((String) ICServiceContext.getContext().getProperty("deleteRPsOnStartup", true))) {
// cleanup the RPs collection
logger.info("deleting all RPs...");
logger.info("Deleting all RPs...");
State.dataManager.deleteAllProperties();
} else {
logger.info("all RPs previously stored are keept in the storage");
logger.info("All RPs previously stored are kept in the storage");
}
logger.info("Initialising the sweeper...");
// start the sweeper to periodically cleanup the storage and some data structures
if (State.sweeperT == null) {
Sweeper sweeper = new Sweeper(Long.valueOf((String) ICServiceContext.getContext().getProperty("sweeperIntervalInMillis", true)), Long.valueOf((String) ICServiceContext.getContext()
.getProperty("resourceExpirationTimeInMillis", true)));
Sweeper sweeper = new Sweeper(Long.valueOf((String) ICServiceContext.getContext().getProperty("sweeperIntervalInMillis", true)),
Long.valueOf((String) ICServiceContext.getContext().getProperty("resourceExpirationTimeInMillis", true)));
State.sweeperT = new Thread(sweeper);
State.sweeperT.start();
}
@ -108,10 +108,14 @@ public class State {
*/
public static void dispose() throws Exception {
logger.info("Disposing IC service's resources...");
State.sweeperT.interrupt();
State.sweeperT = null;
State.schedulerT.interrupt();
State.schedulerT = null;
if (State.sweeperT != null) {
State.sweeperT.interrupt();
State.sweeperT = null;
}
if (State.schedulerT == null) {
State.schedulerT.interrupt();
State.schedulerT = null;
}
State.dataManager.shutdown();
State.queryManager.shutdown();
@ -132,6 +136,13 @@ public class State {
}
/**
* @return the deletedResources
*/
public static List<AggregatorPersistentResource> getDeletedResources() {
return deletedResources;
}
/**
* Prints the enviromnet variables
*/

View File

@ -30,7 +30,7 @@ public class Sweeper implements Runnable {
private static long resourceExpirationTime = 1800000; // default value
private static GCUBELog logger = new GCUBELog(Sweeper.class.getName());
private static GCUBELog logger = new GCUBELog(Sweeper.class);
private static XMLStorageManager storage = null;
@ -77,12 +77,10 @@ public class Sweeper implements Runnable {
now.setTimeZone(TimeZone.getTimeZone("GMT"));
try {
String[] ids = Sweeper.storage.listAllPropertiesIDs();
for (String id : ids) {
try {
PersistentResource res = Sweeper.storage
.retrievePropertyResourceFromID(id);
PersistentResource res = State.getDataManager().retrievePropertyResourceFromID(id);
if (now.getTimeInMillis() - res.getLastUpdateTimeinMills() > Sweeper.resourceExpirationTime)
// removes the resources from the database
State.getDataManager().retrieveAndDeleteResourceFromID(id);

View File

@ -38,6 +38,10 @@ import java.util.concurrent.locks.ReentrantLock;
public class XMLStorageManager {
protected static String URI = "xmldb:exist://";
protected static String USER = "admin";
protected static String PWD = "";
protected static String driver = "org.exist.xmldb.DatabaseImpl";
@ -101,13 +105,13 @@ public class XMLStorageManager {
DatabaseManager.registerDatabase(this.database);
// try to load the collections for props and profiles
logger.debug("Initializing the root collection");
this.rootCollection = DatabaseManager.getCollection(URI + DBBroker.ROOT_COLLECTION, "admin", "admin");
logger.info("Initializing the root collection");
this.rootCollection = DatabaseManager.getCollection(URI + DBBroker.ROOT_COLLECTION, USER, PWD);
if (this.rootCollection == null) {
logger.error("invalid root collection!");
throw new XMLStorageNotAvailableException("unable to load root collection");
}
logger.debug("Initializing the collection Profiles");
logger.info("Initializing the Profiles collection");
this.profilesRootCollection = this.rootCollection.getChildCollection(XMLStorageManager.PROFILES_COLLECTION_NAME);
if (this.profilesRootCollection == null) {
logger.debug("Creating Profiles collection");
@ -122,6 +126,7 @@ public class XMLStorageManager {
this.profilesRootCollection.setProperty("pretty", "true");
this.profilesRootCollection.setProperty("encoding", "UTF-8");
this.setStatus(STATUS.INITIALISED);
logger.info("XMLStorage initialized with success");
} catch (XMLDBException edb) {
logger.error("unable to initialize XML storage ", edb);
throw new XMLStorageNotAvailableException("unable to initialize XML storage");
@ -251,7 +256,7 @@ public class XMLStorageManager {
// return this.loadCollection(this.rootCollection,
// XMLStorageManager.PROFILES_COLLECTION_NAME);
try {
currentCollection = DatabaseManager.getCollection(URI + DBBroker.ROOT_COLLECTION, "admin", "admin");
currentCollection = DatabaseManager.getCollection(URI + DBBroker.ROOT_COLLECTION, USER, PWD);
} catch (XMLDBException edb) {
logger.error("Failed to load all collections!");
logger.error("", edb);

View File

@ -44,7 +44,7 @@ public class RestoreTester {
XMLStorageAccessPortType port = null;
try {
port = new XMLStorageAccessServiceLocator().getXMLStorageAccessPortTypePort(new URL(portTypeURI));
port = GCUBERemotePortTypeContext.getProxy(port, GCUBEScope.getScope("/CNRPrivate"));
port = GCUBERemotePortTypeContext.getProxy(port, GCUBEScope.getScope(args[2]));
} catch (Exception e) {
logger.error("",e);
}