diff --git a/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/mapping/AbstractEcologicalEngineMapper.java b/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/mapping/AbstractEcologicalEngineMapper.java index 89cbeb8..b20c810 100644 --- a/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/mapping/AbstractEcologicalEngineMapper.java +++ b/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/mapping/AbstractEcologicalEngineMapper.java @@ -31,13 +31,16 @@ import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mappedclasses.I import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping.dataspace.ComputationData; import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping.dataspace.DataspaceManager; import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping.dataspace.StoredData; +import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.utils.Cancellable; +import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.utils.Observable; +import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.utils.Observer; import org.hibernate.SessionFactory; import org.n52.wps.algorithm.annotation.Execute; import org.n52.wps.server.AbstractAnnotatedAlgorithm; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm { +public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm implements Observable, Cancellable{ /** * Deploying procedure: 1 - modify configuration files 2 - modify resource file: resources/templates/setup.cfg 3 - generate classes with ClassGenerator 4 - add new classes in the wps_config.xml on the wps web app config folder 5 - produce the Jar file of this project 6 - copy the jar file in the lib folder of the wps web app change the server parameters in the wps_config.xml file @@ -45,6 +48,10 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractEcologicalEngineMapper.class); + private Observer observer = null; + + private boolean cancelled = false; + // inputs and outputs public LinkedHashMap inputs = new LinkedHashMap(); public LinkedHashMap outputs = new LinkedHashMap(); @@ -84,7 +91,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm { } } - + // inner objects public AlgorithmConfiguration config; public InfrastructureDialoguer infrastructureDialoguer; @@ -96,7 +103,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm { public static synchronized void addDatabaseInfo(String scope, DatabaseInfo info) { databaseParametersMemoryCache.put(scope, info); } - + public ComputationalAgent getComputationalAgent(String algorithmName) throws Exception { LOGGER.debug("Searching for Agents.. " + algorithmName); List agents = new ArrayList(); @@ -227,48 +234,22 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm { inputs.put(ConfigurationManager.tokenParameter, token); } - long statusInterrupt = 0; float previousStatus = -3; public void updateStatus(float status) { if (agent != null) { - long stream = 0; - try { - stream = org.n52.wps.server.database.DatabaseFactory.getDatabase().getContentLengthForStoreResponse(wpsExternalID); - - // LOGGER.debug("STATUS bytes " + stream + " interrupt bytes " + statusInterrupt); - if (statusInterrupt == 0 || statusInterrupt > stream - 3 && stream != 468) { - statusInterrupt = stream; - } else { - LOGGER.debug("STATUS INTERRUPTED!"); - agent.shutdown(); - statusInterrupt = -1; - agent = null; - status = -1f; - super.update(new Integer((int) status)); - try { - updateComputationOnWS(status, null); - } catch (Exception e) { - - } - System.gc(); + if (status != previousStatus) { + LOGGER.debug("STATUS update to: {} ", status ); + previousStatus = status; + super.update(new Integer((int) status)); + try { + updateComputationOnWS(status, null); + } catch (Exception e) { + LOGGER.warn("error updating compution on WS"); } - if (status != previousStatus) { - LOGGER.debug("STATUS update to:" + status + " - status interrupt " + statusInterrupt); - previousStatus = status; - super.update(new Integer((int) status)); - try { - updateComputationOnWS(status, null); - } catch (Exception e) { - - } - } - } catch (Exception e) { - LOGGER.warn("STATUS RETRIEVAL EXCEPTION ",e); - - // stream = statusInterrupt; } } + } public void updateComputationOnWS(float status, String exception) { @@ -312,6 +293,9 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm { @Execute public void run() throws Exception { + if (observer!=null) + observer.isStarted(this); + String algorithm = ""; List generatedInputTables = null; List generatedOutputTables = null; @@ -439,7 +423,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm { LOGGER.debug("Final Computation Output: " + outputs); endTime = new java.text.SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(System.currentTimeMillis()); - if (statusInterrupt != -1) { + if (!isCancelled()) { saveComputationOnWS(inputsManager.getProvenanceData(), outputmanager.getProvenanceData(), agent, generatedFiles); } else { LOGGER.debug("Computation interrupted - no update"); @@ -449,14 +433,14 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm { } catch (Exception e) { LOGGER.error("Error execution Algorithm {}",algorithm,e); int exitstatus = -2; - if (statusInterrupt == -1) + if (isCancelled()) exitstatus = -1; if (inputsManager != null) updateComputationOnWS(exitstatus, e.getMessage(), inputsManager.getProvenanceData(), generatedFiles); else updateComputationOnWS(exitstatus, e.getMessage()); - if (statusInterrupt == -1) + if (isCancelled()) throw new Exception("Computation cancelled"); else throw e; @@ -473,6 +457,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm { time("Cleaning of resources"); displayTimes(); cleanResources(); + if (observer!=null) observer.isFinished(this); LOGGER.debug("All done - Computation Finished"); } @@ -483,14 +468,11 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm { @Override public void run() { - while (agent != null && statusInterrupt != -1 && agent.getStatus() < 100) { + while (agent != null && !isCancelled() && agent.getStatus() < 100) { try { updateStatus(agent.getStatus()); Thread.sleep(10000); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + } catch (InterruptedException e) {} } LOGGER.info("Status updater terminated"); } @@ -591,4 +573,45 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm { } } + @Override + public void setObserver(Observer o) { + LOGGER.debug("setting observer in {} ",wpsExternalID); + this.observer = o; + } + + @Override + public synchronized boolean cancel() { + if (!cancelled){ + LOGGER.debug("COMPUTATION INTERRUPTED! ({})",wpsExternalID); + try{ + if (agent!=null){ + agent.shutdown(); + agent = null; + } + + super.update(new Integer((int) -1)); + try { + updateComputationOnWS(-1, null); + } catch (Exception e) { + + } + System.gc(); + cancelled = true; + }catch(Exception e){ + LOGGER.warn("error cancelling computation with id {}",wpsExternalID); + return false; + } + } else { + LOGGER.debug("COMPUTATION ALREADY INTERRUPT! ({})",wpsExternalID); + return false; + } + return true; + + } + + @Override + public boolean isCancelled() { + return cancelled; + } + } diff --git a/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/mapping/dataspace/DataspaceManager.java b/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/mapping/dataspace/DataspaceManager.java index 9f49a3c..f7374d2 100644 --- a/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/mapping/dataspace/DataspaceManager.java +++ b/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/mapping/dataspace/DataspaceManager.java @@ -74,7 +74,6 @@ public class DataspaceManager implements Runnable { try { deleteRunningComputationData(); } catch (Exception e) { - e.printStackTrace(); LOGGER.debug("Dataspace->No running computation available"); } LOGGER.debug("Dataspace->Writing provenance information"); @@ -412,8 +411,7 @@ public class DataspaceManager implements Runnable { try { deleteRunningComputationData(); } catch (Exception e) { - e.printStackTrace(); - LOGGER.debug("Dataspace->impossible to delete running computation"); + LOGGER.debug("Dataspace->impossible to delete running computation : {} ",e.getMessage()); } // LOGGER.debug("Dataspace->updating computation status"); // LOGGER.debug("Dataspace->connecting to Workspace"); diff --git a/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/utils/Cancellable.java b/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/utils/Cancellable.java new file mode 100644 index 0000000..0b823d4 --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/utils/Cancellable.java @@ -0,0 +1,9 @@ +package org.gcube.dataanalysis.wps.statisticalmanager.synchserver.utils; + +public interface Cancellable { + + boolean cancel(); + + boolean isCancelled(); + +} diff --git a/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/utils/Observable.java b/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/utils/Observable.java new file mode 100644 index 0000000..87692f5 --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/utils/Observable.java @@ -0,0 +1,7 @@ +package org.gcube.dataanalysis.wps.statisticalmanager.synchserver.utils; + + +public interface Observable { + + public void setObserver(Observer o); +} diff --git a/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/utils/Observer.java b/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/utils/Observer.java new file mode 100644 index 0000000..e637c9e --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/utils/Observer.java @@ -0,0 +1,8 @@ +package org.gcube.dataanalysis.wps.statisticalmanager.synchserver.utils; + +public interface Observer { + + public void isFinished(Observable o); + + public void isStarted(Observable o); +} diff --git a/src/test/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/test/AlgorithmTest.java b/src/test/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/test/AlgorithmTest.java index 7e878d8..5de0931 100644 --- a/src/test/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/test/AlgorithmTest.java +++ b/src/test/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/test/AlgorithmTest.java @@ -19,7 +19,7 @@ public class AlgorithmTest { public void executeAlgorithmsFromFile() throws Exception{ String protocol = "http"; String hostname = "dataminer2-d-d4s.d4science.org"; - String token = "52b59669-ccde-46d2-a4da-108b9e941f7c-98187548"; + String token = "7e52e574-3c0d-4673-941e-71cf17ea5134-843339462"; Iterator uris = getUrisIterator();