Fixing the transaction logs issue
git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/trunk/information-system/gCubeIS/Collector@15537 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
dcdb16c16f
commit
cfff708717
2
.project
2
.project
|
@ -1,6 +1,6 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<projectDescription>
|
||||
<name>InformationCollector</name>
|
||||
<name>InformationCollector_previous</name>
|
||||
<comment></comment>
|
||||
<projects>
|
||||
</projects>
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
<environment name="scheduledBackupInHours" value="12"
|
||||
type="java.lang.String" override="false" />
|
||||
|
||||
<!-- a value=="-1" means no sweeping is requested -->
|
||||
<environment name="sweeperIntervalInMillis" value="240000"
|
||||
type="java.lang.String" override="false" />
|
||||
|
||||
|
@ -27,6 +28,9 @@
|
|||
|
||||
<environment name="deleteRPsOnStartup" value="true"
|
||||
type="java.lang.String" override="false" />
|
||||
|
||||
<environment name="maxOperationsPerConnection" value="5000" type="java.lang.String"
|
||||
override="false" />
|
||||
|
||||
<!-- <environment name="startScopes" value=""
|
||||
type="java.lang.String" override="false" />
|
||||
|
|
|
@ -32,7 +32,18 @@
|
|||
<Version>[2.0.0]</Version>
|
||||
<Scope level="GHN" />
|
||||
<Optional>false</Optional>
|
||||
</Dependency>
|
||||
</Dependency>
|
||||
<Dependency>
|
||||
<Service>
|
||||
<Class>ExternalSoftware</Class>
|
||||
<Name>exist</Name>
|
||||
<Version>1.2.6</Version>
|
||||
</Service>
|
||||
<Package>exist</Package>
|
||||
<Version>1.2.6</Version>
|
||||
<Scope level="GHN"/>
|
||||
<Optional>false</Optional>
|
||||
</Dependency>
|
||||
</Dependencies>
|
||||
<GARArchive>org.gcube.informationsystem.collector.gar
|
||||
</GARArchive>
|
||||
|
|
|
@ -200,9 +200,9 @@ public class OldService extends GCUBEPortType {
|
|||
logger.info("Dispose operation invoked");
|
||||
logger.info("trying to shutdown the storage instances...");
|
||||
try {
|
||||
State.getDataManager().shutdown();
|
||||
State.getQueryManager().shutdown();
|
||||
} catch (NullPointerException se) {/* nothng to do */
|
||||
State.getDataManager().shutdown(true);
|
||||
State.getQueryManager().shutdown(true);
|
||||
} catch (NullPointerException se) {/* nothing to do */
|
||||
}
|
||||
// request the interruption of the sweeper thread
|
||||
State.sweeperT.interrupt();
|
||||
|
|
|
@ -36,12 +36,9 @@ public class QueryManager extends XMLStorageManager {
|
|||
ResourceSet result = null;
|
||||
Collection currentCollection = null;
|
||||
currentCollection = this.loadAllCollections();
|
||||
|
||||
this.lock();
|
||||
while ((retry) && (attempts < MAXQUERYATTEMPTS)) {
|
||||
try {
|
||||
// wait until the DB is unlocked
|
||||
while (State.getDataManager().isLocked())
|
||||
Thread.sleep(1000);
|
||||
try {
|
||||
// execute query and get results in ResourceSet
|
||||
if (currentCollection == null)
|
||||
result = query.execute(this.rootCollection);
|
||||
|
@ -73,9 +70,10 @@ public class QueryManager extends XMLStorageManager {
|
|||
|
||||
}
|
||||
}
|
||||
|
||||
//operationsCounter++;
|
||||
this.resetCollection(currentCollection);
|
||||
|
||||
this.unlock();
|
||||
//this.checkConnection();
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -116,8 +116,8 @@ public class State {
|
|||
State.schedulerT.interrupt();
|
||||
State.schedulerT = null;
|
||||
}
|
||||
State.dataManager.shutdown();
|
||||
State.queryManager.shutdown();
|
||||
State.dataManager.shutdown(true);
|
||||
State.queryManager.shutdown(true);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -14,7 +14,6 @@ import org.gcube.common.core.utils.logging.GCUBELog;
|
|||
import org.gcube.informationsystem.collector.impl.persistence.PersistentResource;
|
||||
import org.gcube.informationsystem.collector.impl.xmlstorage.exist.State;
|
||||
import org.gcube.informationsystem.collector.impl.xmlstorage.exist.Sweeper;
|
||||
import org.gcube.informationsystem.collector.impl.xmlstorage.exist.XMLStorageManager;
|
||||
|
||||
/**
|
||||
* This class provides some cleanup procedures to use to maintain a consistent
|
||||
|
@ -32,7 +31,7 @@ public class Sweeper implements Runnable {
|
|||
|
||||
private static GCUBELog logger = new GCUBELog(Sweeper.class);
|
||||
|
||||
private static XMLStorageManager storage = null;
|
||||
//private static XMLStorageManager storage = null;
|
||||
|
||||
/**
|
||||
* Initializes a new Sweeper
|
||||
|
@ -45,14 +44,14 @@ public class Sweeper implements Runnable {
|
|||
Sweeper.DELAY = delay;
|
||||
Sweeper.resourceExpirationTime = resourceExpirationTime;
|
||||
logger.info("Starting sweeper thread with an interval of " + Sweeper.DELAY + " ms");
|
||||
Sweeper.storage = new XMLStorageManager();
|
||||
Sweeper.storage.initialize();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public void run() {
|
||||
if (Sweeper.DELAY == -1) return;
|
||||
try {
|
||||
while (!Thread.interrupted()) {
|
||||
Thread.sleep(Sweeper.DELAY);
|
||||
|
@ -63,7 +62,6 @@ public class Sweeper implements Runnable {
|
|||
} catch (InterruptedException ie) {
|
||||
logger.error("Unable to sleep (yawn)", ie);
|
||||
// thread was interrupted
|
||||
storage.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -77,7 +75,7 @@ public class Sweeper implements Runnable {
|
|||
now.setTimeZone(TimeZone.getTimeZone("GMT"));
|
||||
|
||||
try {
|
||||
String[] ids = Sweeper.storage.listAllPropertiesIDs();
|
||||
String[] ids = State.getDataManager().listAllPropertiesIDs();
|
||||
for (String id : ids) {
|
||||
try {
|
||||
PersistentResource res = State.getDataManager().retrievePropertyResourceFromID(id);
|
||||
|
|
|
@ -14,6 +14,7 @@ import org.exist.xmldb.DatabaseInstanceManager;
|
|||
import org.exist.storage.DBBroker;
|
||||
|
||||
import org.gcube.common.core.utils.logging.GCUBELog;
|
||||
import org.gcube.informationsystem.collector.impl.contexts.ICServiceContext;
|
||||
import org.gcube.informationsystem.collector.impl.persistence.AggregatorPersistentResource;
|
||||
import org.gcube.informationsystem.collector.impl.persistence.PersistentResource;
|
||||
import org.gcube.informationsystem.collector.impl.persistence.PersistentResource.MalformedResourceException;
|
||||
|
@ -27,8 +28,6 @@ import java.io.IOException;
|
|||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
;
|
||||
|
||||
/**
|
||||
* A thread safe manager to interact with the XML Storage repository.
|
||||
*
|
||||
|
@ -55,9 +54,14 @@ public class XMLStorageManager {
|
|||
|
||||
private Collection profilesRootCollection;
|
||||
|
||||
// lock for write operations
|
||||
protected Lock lock;
|
||||
|
||||
// lock for writing operations
|
||||
private Lock writeLock;
|
||||
|
||||
|
||||
// lock for reading operations
|
||||
protected Lock readLock;
|
||||
|
||||
// flag to warn when the DB is locked
|
||||
private boolean locked = false;
|
||||
|
||||
|
@ -69,6 +73,10 @@ public class XMLStorageManager {
|
|||
|
||||
private STATUS status = STATUS.CLOSED;
|
||||
|
||||
protected static long maxOperationsPerConnection = 1000;
|
||||
|
||||
protected long operationsCounter = 0;
|
||||
|
||||
|
||||
/**
|
||||
* Creates a new manager
|
||||
|
@ -76,27 +84,32 @@ public class XMLStorageManager {
|
|||
*/
|
||||
public XMLStorageManager() {
|
||||
logger.debug("Creating a new XMLStorageManager");
|
||||
writeLock = new ReentrantLock();
|
||||
lock = new ReentrantLock();
|
||||
writeLock = lock;
|
||||
readLock = lock;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the local XML Storage repository
|
||||
* @param lock
|
||||
*
|
||||
* @throws Exception
|
||||
* if the connection to eXist or its initialization fail
|
||||
*/
|
||||
public void initialize() throws XMLStorageNotAvailableException {
|
||||
public void initialize(boolean ... lock) throws XMLStorageNotAvailableException {
|
||||
|
||||
if (this.getStatus() == STATUS.INITIALISED) {
|
||||
logger.warn("XMLStorage already initialized");
|
||||
return;
|
||||
}
|
||||
// lock the instance
|
||||
if (lock != null && lock.length > 0 && lock[0]) {
|
||||
this.lock();
|
||||
}
|
||||
try {
|
||||
logger.info("Initializing XMLStorage...");
|
||||
// this.printEnv();
|
||||
|
||||
// lock the instance
|
||||
writeLock.lock();
|
||||
maxOperationsPerConnection = Long.valueOf((String) ICServiceContext.getContext().getProperty("maxOperationsPerConnection", true));
|
||||
|
||||
// register/create the DB instance
|
||||
Class<?> cl = Class.forName(driver);
|
||||
|
@ -126,6 +139,7 @@ public class XMLStorageManager {
|
|||
this.profilesRootCollection.setProperty("pretty", "true");
|
||||
this.profilesRootCollection.setProperty("encoding", "UTF-8");
|
||||
this.setStatus(STATUS.INITIALISED);
|
||||
|
||||
logger.info("XMLStorage initialized with success");
|
||||
} catch (XMLDBException edb) {
|
||||
logger.error("unable to initialize XML storage ", edb);
|
||||
|
@ -137,28 +151,53 @@ public class XMLStorageManager {
|
|||
logger.error("unable to initialize XML storage", ncdfe);
|
||||
throw new XMLStorageNotAvailableException("unable to initialize XML storage");
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
if (lock != null && lock.length > 0 && lock[0]) {
|
||||
this.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks all the writing operations
|
||||
*/
|
||||
protected void lock() {
|
||||
writeLock.lock();
|
||||
this.locked = true;
|
||||
logger.trace("WRITE LOCK acquired");
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Unblocks all the writing operations
|
||||
*/
|
||||
protected void unlock() {
|
||||
writeLock.unlock();
|
||||
this.locked = false;
|
||||
logger.trace("WRITE LOCK released");
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Shutdowns the XML Storage repository
|
||||
*
|
||||
* @return true if the operation succeed
|
||||
*/
|
||||
public boolean shutdown() {
|
||||
writeLock.lock();
|
||||
public boolean shutdown(boolean lock) {
|
||||
if (lock)
|
||||
this.lock();
|
||||
|
||||
logger.info("XML storage is shutting down");
|
||||
try {
|
||||
DatabaseInstanceManager manager = (DatabaseInstanceManager) rootCollection.getService("DatabaseInstanceManager", "1.0");
|
||||
manager.shutdown();
|
||||
this.setStatus(STATUS.SHUTDOWN);
|
||||
logger.info("...XML storage is down");
|
||||
if (lock)
|
||||
this.lock();
|
||||
} catch (XMLDBException edb) {
|
||||
logger.error("Unable to shutdown XML storage");
|
||||
logger.error("" + edb.getCause());
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
logger.fatal("Unable to shutdown XML storage", edb);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -290,8 +329,7 @@ public class XMLStorageManager {
|
|||
logger.error("Unable to open the Collection");
|
||||
throw new XMLStorageNotAvailableException("Unable to open the Collection");
|
||||
}
|
||||
writeLock.lock();
|
||||
this.locked = true;
|
||||
this.lock();
|
||||
try {
|
||||
XMLResource document = (XMLResource) currentCollection.createResource(resource.getID(), "XMLResource");
|
||||
document.setContent(resource.toString());
|
||||
|
@ -305,9 +343,10 @@ public class XMLStorageManager {
|
|||
logger.error("" + e.getMessage(), e);
|
||||
} finally {
|
||||
this.resetCollection(currentCollection);
|
||||
writeLock.unlock();
|
||||
this.locked = false;
|
||||
operationsCounter++;
|
||||
this.unlock();
|
||||
}
|
||||
this.checkConnection();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -392,67 +431,6 @@ public class XMLStorageManager {
|
|||
|
||||
}
|
||||
|
||||
// /**
|
||||
// * Executes the given XQuery on the current collection or on the root collection if any was
|
||||
// * previously loaded
|
||||
// *
|
||||
// * @param query the XQuery to run
|
||||
// * @return a formatted resultset
|
||||
// * @throws XMLStorageNotAvailableException
|
||||
// */
|
||||
// public ResourceSet executeXQuery(XQuery query) throws XMLStorageNotAvailableException {
|
||||
//
|
||||
// if (status != STATUS.INITIALISED)
|
||||
// throw new XMLStorageNotAvailableException("XMLStorage is not available");
|
||||
//
|
||||
// boolean retry = true;
|
||||
// int attempts = 0, max_attempts = 3;
|
||||
// ResourceSet result = null;
|
||||
// Collection currentCollection = null;
|
||||
// currentCollection = this.loadAllCollections();
|
||||
//
|
||||
// while ((retry) && (attempts < max_attempts)) {
|
||||
// try {
|
||||
// // wait until the DB is unlocked
|
||||
// while (State.getDataManager().isLocked())
|
||||
// Thread.sleep(1000);
|
||||
// // execute query and get results in ResourceSet
|
||||
// if (currentCollection == null)
|
||||
// result = query.execute(this.rootCollection);
|
||||
// else
|
||||
// result = query.execute(currentCollection);
|
||||
// retry = false;
|
||||
// } catch (XMLDBException edb) {
|
||||
// logger.error("Failed to execute XQuery " + query.toString());
|
||||
// logger.error("Error details: " + edb.errorCode + " " + edb.getMessage(), edb);
|
||||
// // if the cause is a NullPointer, this can be due to a temporary
|
||||
// // lock on the database instance
|
||||
// if (edb.getCause() instanceof java.lang.NullPointerException) {
|
||||
// retry = true;
|
||||
// attempts++;
|
||||
// logger.warn("Trying a new attempt for query execution");
|
||||
// } else
|
||||
// retry = false;
|
||||
//
|
||||
// } catch (Exception e) {
|
||||
// logger.error("", e);
|
||||
// // if the cause is a NullPointer, this can be due to a temporary
|
||||
// // lock on the database instance
|
||||
// if (e instanceof java.lang.NullPointerException) {
|
||||
// retry = true;
|
||||
// attempts++;
|
||||
// logger.warn("Trying a new attempt for query execution");
|
||||
// } else
|
||||
// retry = false;
|
||||
//
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// this.resetCollection(currentCollection);
|
||||
//
|
||||
// return result;
|
||||
// }
|
||||
|
||||
/**
|
||||
* Deletes a WS-ResourceProperties resource identified by the given ID
|
||||
*/
|
||||
|
@ -523,8 +501,7 @@ public class XMLStorageManager {
|
|||
|
||||
XMLResource res = null;
|
||||
// lock the instance
|
||||
this.writeLock.lock();
|
||||
this.locked = true;
|
||||
this.lock();
|
||||
try {
|
||||
res = (XMLResource) col.getResource(resourceID);
|
||||
if (res == null)
|
||||
|
@ -539,8 +516,7 @@ public class XMLStorageManager {
|
|||
throw new Exception();
|
||||
} finally {
|
||||
this.resetCollection(col);
|
||||
this.writeLock.unlock();
|
||||
this.locked = false;
|
||||
this.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -578,14 +554,14 @@ public class XMLStorageManager {
|
|||
private Collection createCollection(Collection parentCollection, String collectionName) {
|
||||
|
||||
Collection col = null;
|
||||
this.writeLock.lock();
|
||||
this.lock();
|
||||
try {
|
||||
CollectionManagementService mgtService = (CollectionManagementService) parentCollection.getService("CollectionManagementService", "1.0");
|
||||
col = mgtService.createCollection(collectionName);
|
||||
} catch (XMLDBException edb) {
|
||||
logger.error("Failed to create collection " + collectionName, edb);
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
this.unlock();
|
||||
}
|
||||
|
||||
return col;
|
||||
|
@ -596,7 +572,7 @@ public class XMLStorageManager {
|
|||
*/
|
||||
public void deleteAllProperties() {
|
||||
|
||||
this.writeLock.lock();
|
||||
this.lock();
|
||||
try {
|
||||
logger.info("Trying to delete the collection " + XMLStorageManager.PROPERTIES_COLLECTION_NAME + "...");
|
||||
CollectionManagementService mgtService = (CollectionManagementService) rootCollection.getService("CollectionManagementService", "1.0");
|
||||
|
@ -605,7 +581,7 @@ public class XMLStorageManager {
|
|||
} catch (XMLDBException edb) {
|
||||
logger.warn("Unable to delete the collection " + XMLStorageManager.PROPERTIES_COLLECTION_NAME + ": " + edb.toString());
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
this.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -644,6 +620,24 @@ public class XMLStorageManager {
|
|||
}
|
||||
return ids;
|
||||
}
|
||||
|
||||
protected void checkConnection() {
|
||||
if (this.operationsCounter > XMLStorageManager.maxOperationsPerConnection) {
|
||||
logger.info("It's time to reset the connection...");
|
||||
this.lock();
|
||||
this.shutdown(false);
|
||||
//give a breath to the Transaction Manager
|
||||
try {Thread.sleep(5000);} catch (InterruptedException e) {}
|
||||
try {
|
||||
this.initialize(false);
|
||||
} catch (XMLStorageNotAvailableException e) {
|
||||
logger.fatal("Unable to initialize XML storage", e);
|
||||
}
|
||||
this.operationsCounter = 0;
|
||||
this.unlock();
|
||||
logger.info("Connection reset");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads a file content in a String
|
||||
|
|
Loading…
Reference in New Issue