diff --git a/etc/deploy-jndi-config.xml b/etc/deploy-jndi-config.xml index bd4366a..926f1c8 100755 --- a/etc/deploy-jndi-config.xml +++ b/etc/deploy-jndi-config.xml @@ -13,10 +13,10 @@ - - - - - + --> diff --git a/src/org/gcube/informationsystem/collector/impl/persistence/AggregatorPersistentResource.java b/src/org/gcube/informationsystem/collector/impl/persistence/AggregatorPersistentResource.java index fb770d5..a248876 100755 --- a/src/org/gcube/informationsystem/collector/impl/persistence/AggregatorPersistentResource.java +++ b/src/org/gcube/informationsystem/collector/impl/persistence/AggregatorPersistentResource.java @@ -64,19 +64,28 @@ public class AggregatorPersistentResource extends PersistentResource { /** * Builds a new empty DISPersinstentresource + * @param key + * @param source + * @throws Exception if it was impossible to extract the ID from the resource * */ - public AggregatorPersistentResource() { + public AggregatorPersistentResource(String source, String key, String data, RESOURCETYPE type) throws Exception { // defatult termination time is now Calendar cal = new GregorianCalendar(); cal.setTimeZone(TimeZone.getTimeZone("GMT")); this.setTerminationTime(cal); - + this.source = source; + this.sourceKey = key; + this.data = data; + this.type = type; lastUpdateTime = new GregorianCalendar(); lastUpdateTime.setTimeZone(TimeZone.getTimeZone("GMT")); + this.buildID(); + logger.debug("Resource ID: " + this.resourceID); } - + + /** * Builds a DISPersinstentresource starting from an eXist resource * @@ -91,17 +100,40 @@ public class AggregatorPersistentResource extends PersistentResource { this.originalSource = resource; this.resource_string = resource.getContent().toString(); this.parseResource(); + logger.debug("Resource ID: " + this.resourceID); } catch (XMLDBException dbe) { throw new Exception("invalid resource"); } } + private void buildID() throws Exception { + if ((this.type != null) && (type == RESOURCETYPE.Profile)) { //it's a gCube profile + try { + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + DocumentBuilder builder = factory.newDocumentBuilder(); + StringReader reader = new StringReader(this.toXML()); + InputSource source = new InputSource(reader); + Document internalDOM = builder.parse(source); + // gets the type + XPath path = XPathFactory.newInstance().newXPath(); + this.profile_type = path.evaluate("/Document/Data/child::*[local-name()='Profile']/Resource/Type", internalDOM); + // uses the GCUBEResource ID as local resource ID + this.resourceID = path.evaluate("/Document/Data/child::*[local-name()='Profile']/Resource/ID", internalDOM); + + } catch (Exception e) { + logger.error("Unable to extract the ID from the resource " + e.getMessage()); + throw e; + + } + } else { //it's a RP doc + //we need to replace the schema and colons in order to make it an ID accepted by eXist + this.resourceID = this.source.replace("http://", "").replace(":", "") + "-" + this.sourceKey;//this.getGroupKey() + this.getEntryKey(); + } + } /** * Sets the content of the resource * - * @param data - * the new content - * + * @param data the new content */ public void setData(String data) { this.data = data; @@ -110,11 +142,9 @@ public class AggregatorPersistentResource extends PersistentResource { /** * Sets the content of the resource using the content of a file * - * @param f - * the file to use as content source + * @param f the file to use as content source * - * @throws IOException - * if the access to the given file fails + * @throws IOException if the access to the given file fails * */ public void setData(File f) throws IOException { @@ -151,11 +181,9 @@ public class AggregatorPersistentResource extends PersistentResource { * @return the ID */ public String getID() { - // create a unique ID unless the resource contains a profile - if (this.resourceID == null) - this.resourceID = this.getGroupKey() + this.getEntryKey(); - + //if (this.resourceID == null) + // this.resourceID = this.source + this.sourceKey;//this.getGroupKey() + this.getEntryKey(); return this.resourceID; } @@ -240,8 +268,7 @@ public class AggregatorPersistentResource extends PersistentResource { /** * Sets the complete source key * - * @param completeKey - * the new complete key + * @param completeKey the new complete key */ public void setCompleteSourceKey(String completeKey) { this.completeSourceKey = completeKey; diff --git a/src/org/gcube/informationsystem/collector/impl/state/AggregatorRegisteredResource.java b/src/org/gcube/informationsystem/collector/impl/state/AggregatorRegisteredResource.java index a086816..b7561b0 100755 --- a/src/org/gcube/informationsystem/collector/impl/state/AggregatorRegisteredResource.java +++ b/src/org/gcube/informationsystem/collector/impl/state/AggregatorRegisteredResource.java @@ -3,13 +3,9 @@ package org.gcube.informationsystem.collector.impl.state; import java.util.Calendar; import org.apache.axis.encoding.AnyContentType; -import org.apache.axis.message.MessageElement; -import org.apache.axis.message.addressing.EndpointReferenceType; -import org.apache.axis.message.addressing.ReferencePropertiesType; import org.gcube.common.core.utils.logging.GCUBELog; import org.gcube.informationsystem.collector.impl.persistence.AggregatorPersistentResource; -import org.gcube.informationsystem.collector.impl.persistence.PersistentResource.RESOURCETYPE; import org.gcube.informationsystem.collector.impl.state.AggregatorRegisteredResource; import org.gcube.informationsystem.collector.impl.utils.*; import org.gcube.informationsystem.collector.impl.xmlstorage.exist.State; @@ -47,9 +43,7 @@ public class AggregatorRegisteredResource extends AggregatorServiceGroupResource private ResourceKey resourceKey = null; - // private static I18n i18n = I18n.getI18n(Resources.class.getName()); - - private static final String registryNS = "gcube/informationsystem/registry/Registry"; + // private static I18n i18n = I18n.getI18n(Resources.class.getName()); protected Calendar terminationTime, currentTime; @@ -179,66 +173,34 @@ public class AggregatorRegisteredResource extends AggregatorServiceGroupResource logger.debug("Entry RunningInstance ID " + aentry.getRunningInstanceID()); logger.debug("Entry Service Name " + aentry.getServiceName()); logger.debug("Entry Service Class " + aentry.getServiceClass()); + + // logger.debug(State.logPrefix + "getEntryAsString " + // aentry.getEntryAsString()); // MessageElement[] message = messageObj.get_any(); // extract the entry EPR - EntryEPRParser eprparser = new EntryEPRParser(entry.getEntryEPR()); - - logger.debug("aggregator EntryEPR-> " + entry.getEntryEPR().toString()); - - // extract the member EPR - EndpointReferenceType memberEpr = entry.getMemberEPR(); - logger.debug("aggregator MemberEPR-> " + memberEpr.toString()); - - // get RP set from entry - // ResourcePropertySet rpSet = entry.getResourcePropertySet(); - - // get content RP from entry - /* - * ResourceProperty contentRP = rpSet.get(ServiceGroupConstants.CONTENT); - * - * AggregatorContent content = entry.getContent(); - * - * AggregatorConfig config = content.getAggregatorConfig(); - * - * MessageElement[] any = config.get_any(); - */ + EntryParser entryparser = new EntryParser(entry); + EntryEPRParser sinkparser = entryparser.getEPRSinkParser(); + logger.debug("Aggregator Source " + entryparser.getSource()); + logger.debug("Aggregator Sink " + entryparser.getSink()); + // Build the new resource to store logger.debug("Storing the new delivered resource"); - AggregatorPersistentResource res = new AggregatorPersistentResource(); + AggregatorPersistentResource res = new AggregatorPersistentResource(entryparser.getSourceURI(), entryparser.getSourceKey(), aentry.getEntryAsString(), entryparser.getType()); res.setData(aentry.getEntryAsString()); - res.setEntryKey(eprparser.getEntryKey()); - res.setGroupKey(eprparser.getGroupKey()); + res.setEntryKey(sinkparser.getEntryKey()); + res.setGroupKey(sinkparser.getGroupKey()); res.setTerminationTime(entry.getTerminationTime()); - // select the resource type - if (memberEpr.getAddress().toString().endsWith(AggregatorRegisteredResource.registryNS)) { - res.setType(RESOURCETYPE.Profile); - } else { - res.setType(RESOURCETYPE.Properties); - } - - // set the EPR and Key of the remote WS-Resource that publish the - // resource - res.setSource(memberEpr.getAddress().toString()); - logger.debug("Source: " + memberEpr.getAddress().toString()); - try { - ReferencePropertiesType prop = memberEpr.getProperties(); - if (prop != null) { - MessageElement[] any = prop.get_any(); - if (any.length > 0) { - res.setSourceKey(any[0].getValue()); - res.setCompleteSourceKey(any[0].toString()); - } - } - } catch (java.lang.NullPointerException npe) { - // nothing to do, the source key does not exist (may be the publisher is a singleton - // or stateless service) - } + //res.setType(entryparser.getType()); + res.setSource(entryparser.getSourceURI()); + res.setSourceKey(entryparser.getSourceKey()); + logger.debug("Qualified Source Key: " + entryparser.getQualifiedSourceKey()); + res.setCompleteSourceKey(entryparser.getQualifiedSourceKey()); + //if the resource is in the to-be-removed list, raise an exception synchronized (State.deletedResources) { if (State.deletedResources.contains(res)) { State.deletedResources.remove(res); diff --git a/src/org/gcube/informationsystem/collector/impl/state/ICAggregatorRemoveCallback.java b/src/org/gcube/informationsystem/collector/impl/state/ICAggregatorRemoveCallback.java index b494956..e05405c 100755 --- a/src/org/gcube/informationsystem/collector/impl/state/ICAggregatorRemoveCallback.java +++ b/src/org/gcube/informationsystem/collector/impl/state/ICAggregatorRemoveCallback.java @@ -2,9 +2,10 @@ package org.gcube.informationsystem.collector.impl.state; import org.gcube.common.core.utils.logging.GCUBELog; import org.gcube.informationsystem.collector.impl.persistence.AggregatorPersistentResource; +import org.gcube.informationsystem.collector.impl.persistence.PersistentResource.RESOURCETYPE; import org.gcube.informationsystem.collector.impl.utils.EntryEPRParser; +import org.gcube.informationsystem.collector.impl.utils.EntryParser; import org.gcube.informationsystem.collector.impl.xmlstorage.exist.State; - import org.globus.mds.aggregator.impl.AggregatorServiceGroupEntryRemovedCallback; import org.globus.mds.aggregator.impl.AggregatorServiceGroupEntryResource; @@ -33,21 +34,26 @@ public class ICAggregatorRemoveCallback implements AggregatorServiceGroupEntryRe * Removes from the storage the supplied resource * * @param entry the AggregatorServiceGroupEntryResource that is about to be removed - * @throws Exception - * if the delete operation fails + * @throws Exception if the delete operation fails */ public void remove(AggregatorServiceGroupEntryResource entry) throws Exception { logger.debug("ICAggregatorRemoveCallback invoked " + entry.getEntryEPR().toString()); - EntryEPRParser parser = new EntryEPRParser(entry.getEntryEPR()); - - AggregatorPersistentResource res = new AggregatorPersistentResource(); + + EntryParser eparser = new EntryParser(entry); + logger.debug("Aggregator Source " + eparser.getSource()); + logger.debug("Aggregator Sink " + eparser.getSink()); + + EntryEPRParser parser = eparser.getEPRSinkParser(); + AggregatorPersistentResource res = new AggregatorPersistentResource(eparser.getSourceURI(), eparser.getSourceKey(), "", RESOURCETYPE.Properties); res.setEntryKey(parser.getEntryKey()); res.setGroupKey(parser.getGroupKey()); + res.setSourceKey(eparser.getSourceKey()); + res.setSource(eparser.getSourceURI()); // mark the resource as no longer available synchronized (State.deletedResources) { - State.deletedResources.add(res); + State.getDeletedResources().add(res); } // delete the resource from the database diff --git a/src/org/gcube/informationsystem/collector/impl/utils/EntryParser.java b/src/org/gcube/informationsystem/collector/impl/utils/EntryParser.java new file mode 100644 index 0000000..6181fde --- /dev/null +++ b/src/org/gcube/informationsystem/collector/impl/utils/EntryParser.java @@ -0,0 +1,125 @@ +package org.gcube.informationsystem.collector.impl.utils; + +import org.apache.axis.message.MessageElement; +import org.apache.axis.message.addressing.EndpointReferenceType; +import org.apache.axis.message.addressing.ReferencePropertiesType; +import org.globus.mds.aggregator.impl.AggregatorServiceGroupEntryResource; + +import org.gcube.informationsystem.collector.impl.persistence.PersistentResource.RESOURCETYPE; + +/** + * Parser for {@link AggregatorServiceGroupEntryResource} + * + * @author Manuele Simi (ISTI-CNR) + * + */ +public class EntryParser { + + private AggregatorServiceGroupEntryResource entry = null; + + private static final String registryNS = "gcube/informationsystem/registry/Registry"; + + public EntryParser(AggregatorServiceGroupEntryResource entry) { + this.entry = entry; + } + + + public EndpointReferenceType getSource() { + return entry.getMemberEPR(); + } + + public EndpointReferenceType getSink() { + return entry.getEntryEPR(); + } + + /** + * @return the source key or an empty string if it does not exist + */ + public String getSourceKey() { + String key = ""; + EndpointReferenceType memberEpr = entry.getMemberEPR(); + try { + ReferencePropertiesType prop = memberEpr.getProperties(); + if (prop != null) { + MessageElement[] any = prop.get_any(); + if (any.length > 0) + key = any[0].getValue(); + } + } catch (java.lang.NullPointerException npe) { + // nothing to do, the source key does not exist (may be the publisher is a singleton + // or stateless service) + } + + return key; + } + + /** + * @return the fully qualified source key or an empty string if it does not exist + */ + public String getQualifiedSourceKey() { + String key = ""; + EndpointReferenceType memberEpr = entry.getMemberEPR(); + try { + ReferencePropertiesType prop = memberEpr.getProperties(); + if (prop != null) { + MessageElement[] any = prop.get_any(); + if (any.length > 0) + key = any[0].toString(); + } + } catch (java.lang.NullPointerException npe) { + // nothing to do, the source key does not exist (may be the publisher is a singleton + // or stateless service) + } + + return key; + } + + + /** + * + * @return the source URI, i.e. the URI from which the resource has been registered + */ + public String getSourceURI() { + EndpointReferenceType memberEpr = entry.getMemberEPR(); + return memberEpr.getAddress().toString(); + } + + /** + * + * @return the {@link RESOURCETYPE} + */ + public RESOURCETYPE getType() { + EndpointReferenceType memberEpr = entry.getMemberEPR(); + if (memberEpr.getAddress().toString().endsWith(registryNS)) { + return RESOURCETYPE.Profile; + } else { + return RESOURCETYPE.Properties; + } + } + + public void getRPSet() { + // get RP set from entry + // ResourcePropertySet rpSet = entry.getResourcePropertySet(); + + // get content RP from entry + /* + * ResourceProperty contentRP = rpSet.get(ServiceGroupConstants.CONTENT); + * + * AggregatorContent content = entry.getContent(); + * + * AggregatorConfig config = content.getAggregatorConfig(); + * + * MessageElement[] any = config.get_any(); + */ + + } + + /** + * + * @return a parser for the Sink EPR + * @throws Exception + */ + public EntryEPRParser getEPRSinkParser() throws Exception { + return new EntryEPRParser(entry.getEntryEPR()); + } +} diff --git a/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/DataManager.java b/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/DataManager.java index 41c4dfc..855a9ea 100644 --- a/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/DataManager.java +++ b/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/DataManager.java @@ -2,12 +2,9 @@ package org.gcube.informationsystem.collector.impl.xmlstorage.exist; import java.io.IOException; import java.util.Properties; -import javax.xml.parsers.ParserConfigurationException; import org.exist.backup.Restore; import org.exist.storage.BrokerPool; import org.exist.storage.ConsistencyCheckTask; -import org.xml.sax.SAXException; -import org.xmldb.api.base.XMLDBException; import org.gcube.common.core.utils.logging.GCUBELog; import org.gcube.informationsystem.collector.impl.contexts.ICServiceContext; @@ -63,12 +60,8 @@ public class DataManager extends XMLStorageManager { } /** - * Restore from the latest backup + * Restores from the latest backup * @throws IOException - * @throws XMLDBException - * @throws SAXException - * @throws ParserConfigurationException - * @throws Exception */ public synchronized void restore() throws IOException { @@ -77,8 +70,13 @@ public class DataManager extends XMLStorageManager { try { ExistBackupFolder lastBackup = this.getLastBackup(); logger.info("Restoring from " + lastBackup.getBackupFile()); - Restore restore = new Restore("admin", "admin","admin", lastBackup.getBackupFile(), URI); - restore.restore(false, null); + Restore restore = new Restore(USER, PWD, PWD, lastBackup.getBackupFile(), URI); + restore.restore(false, null); + if (Boolean.valueOf((String) ICServiceContext.getContext().getProperty("deleteRPsOnStartup", true))) { + // cleanup the RPs collection + logger.info("deleting all RPs..."); + this.deleteAllProperties(); + } logger.info("Restore completed"); } catch (Exception e1) { logger.fatal("Failed to restore", e1); diff --git a/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/State.java b/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/State.java index 92f1926..beb1c55 100755 --- a/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/State.java +++ b/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/State.java @@ -51,22 +51,22 @@ public class State { * if the intialization fails */ public static void initialize() throws Exception { - logger.info("starting IC service initialization..."); + logger.info("Starting IC service initialization..."); State.initializeDataManager(); State.initializeQueryManager(); if (Boolean.valueOf((String) ICServiceContext.getContext().getProperty("deleteRPsOnStartup", true))) { // cleanup the RPs collection - logger.info("deleting all RPs..."); + logger.info("Deleting all RPs..."); State.dataManager.deleteAllProperties(); } else { - logger.info("all RPs previously stored are keept in the storage"); + logger.info("All RPs previously stored are kept in the storage"); } logger.info("Initialising the sweeper..."); // start the sweeper to periodically cleanup the storage and some data structures if (State.sweeperT == null) { - Sweeper sweeper = new Sweeper(Long.valueOf((String) ICServiceContext.getContext().getProperty("sweeperIntervalInMillis", true)), Long.valueOf((String) ICServiceContext.getContext() - .getProperty("resourceExpirationTimeInMillis", true))); + Sweeper sweeper = new Sweeper(Long.valueOf((String) ICServiceContext.getContext().getProperty("sweeperIntervalInMillis", true)), + Long.valueOf((String) ICServiceContext.getContext().getProperty("resourceExpirationTimeInMillis", true))); State.sweeperT = new Thread(sweeper); State.sweeperT.start(); } @@ -108,10 +108,14 @@ public class State { */ public static void dispose() throws Exception { logger.info("Disposing IC service's resources..."); - State.sweeperT.interrupt(); - State.sweeperT = null; - State.schedulerT.interrupt(); - State.schedulerT = null; + if (State.sweeperT != null) { + State.sweeperT.interrupt(); + State.sweeperT = null; + } + if (State.schedulerT == null) { + State.schedulerT.interrupt(); + State.schedulerT = null; + } State.dataManager.shutdown(); State.queryManager.shutdown(); @@ -132,6 +136,13 @@ public class State { } + /** + * @return the deletedResources + */ + public static List getDeletedResources() { + return deletedResources; + } + /** * Prints the enviromnet variables */ diff --git a/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/Sweeper.java b/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/Sweeper.java index 96f0bfc..04a3aec 100755 --- a/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/Sweeper.java +++ b/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/Sweeper.java @@ -30,7 +30,7 @@ public class Sweeper implements Runnable { private static long resourceExpirationTime = 1800000; // default value - private static GCUBELog logger = new GCUBELog(Sweeper.class.getName()); + private static GCUBELog logger = new GCUBELog(Sweeper.class); private static XMLStorageManager storage = null; @@ -77,12 +77,10 @@ public class Sweeper implements Runnable { now.setTimeZone(TimeZone.getTimeZone("GMT")); try { - String[] ids = Sweeper.storage.listAllPropertiesIDs(); for (String id : ids) { try { - PersistentResource res = Sweeper.storage - .retrievePropertyResourceFromID(id); + PersistentResource res = State.getDataManager().retrievePropertyResourceFromID(id); if (now.getTimeInMillis() - res.getLastUpdateTimeinMills() > Sweeper.resourceExpirationTime) // removes the resources from the database State.getDataManager().retrieveAndDeleteResourceFromID(id); diff --git a/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/XMLStorageManager.java b/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/XMLStorageManager.java index 7aa38fe..a2410d8 100755 --- a/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/XMLStorageManager.java +++ b/src/org/gcube/informationsystem/collector/impl/xmlstorage/exist/XMLStorageManager.java @@ -38,6 +38,10 @@ import java.util.concurrent.locks.ReentrantLock; public class XMLStorageManager { protected static String URI = "xmldb:exist://"; + + protected static String USER = "admin"; + + protected static String PWD = ""; protected static String driver = "org.exist.xmldb.DatabaseImpl"; @@ -101,13 +105,13 @@ public class XMLStorageManager { DatabaseManager.registerDatabase(this.database); // try to load the collections for props and profiles - logger.debug("Initializing the root collection"); - this.rootCollection = DatabaseManager.getCollection(URI + DBBroker.ROOT_COLLECTION, "admin", "admin"); + logger.info("Initializing the root collection"); + this.rootCollection = DatabaseManager.getCollection(URI + DBBroker.ROOT_COLLECTION, USER, PWD); if (this.rootCollection == null) { logger.error("invalid root collection!"); throw new XMLStorageNotAvailableException("unable to load root collection"); } - logger.debug("Initializing the collection Profiles"); + logger.info("Initializing the Profiles collection"); this.profilesRootCollection = this.rootCollection.getChildCollection(XMLStorageManager.PROFILES_COLLECTION_NAME); if (this.profilesRootCollection == null) { logger.debug("Creating Profiles collection"); @@ -122,6 +126,7 @@ public class XMLStorageManager { this.profilesRootCollection.setProperty("pretty", "true"); this.profilesRootCollection.setProperty("encoding", "UTF-8"); this.setStatus(STATUS.INITIALISED); + logger.info("XMLStorage initialized with success"); } catch (XMLDBException edb) { logger.error("unable to initialize XML storage ", edb); throw new XMLStorageNotAvailableException("unable to initialize XML storage"); @@ -251,7 +256,7 @@ public class XMLStorageManager { // return this.loadCollection(this.rootCollection, // XMLStorageManager.PROFILES_COLLECTION_NAME); try { - currentCollection = DatabaseManager.getCollection(URI + DBBroker.ROOT_COLLECTION, "admin", "admin"); + currentCollection = DatabaseManager.getCollection(URI + DBBroker.ROOT_COLLECTION, USER, PWD); } catch (XMLDBException edb) { logger.error("Failed to load all collections!"); logger.error("", edb); diff --git a/src/org/gcube/informationsystem/collector/stubs/testsuite/RestoreTester.java b/src/org/gcube/informationsystem/collector/stubs/testsuite/RestoreTester.java index 69c3c58..2cb8611 100644 --- a/src/org/gcube/informationsystem/collector/stubs/testsuite/RestoreTester.java +++ b/src/org/gcube/informationsystem/collector/stubs/testsuite/RestoreTester.java @@ -44,7 +44,7 @@ public class RestoreTester { XMLStorageAccessPortType port = null; try { port = new XMLStorageAccessServiceLocator().getXMLStorageAccessPortTypePort(new URL(portTypeURI)); - port = GCUBERemotePortTypeContext.getProxy(port, GCUBEScope.getScope("/CNRPrivate")); + port = GCUBERemotePortTypeContext.getProxy(port, GCUBEScope.getScope(args[2])); } catch (Exception e) { logger.error("",e); }