This commit is contained in:
Lucio Lelii 2017-07-13 09:14:22 +00:00
parent af905ef49a
commit 9dc7f7711b
6 changed files with 95 additions and 50 deletions

View File

@ -31,13 +31,16 @@ import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mappedclasses.I
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping.dataspace.ComputationData;
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping.dataspace.DataspaceManager;
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping.dataspace.StoredData;
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.utils.Cancellable;
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.utils.Observable;
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.utils.Observer;
import org.hibernate.SessionFactory;
import org.n52.wps.algorithm.annotation.Execute;
import org.n52.wps.server.AbstractAnnotatedAlgorithm;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm implements Observable, Cancellable{
/**
* Deploying procedure: 1 - modify configuration files 2 - modify resource file: resources/templates/setup.cfg 3 - generate classes with ClassGenerator 4 - add new classes in the wps_config.xml on the wps web app config folder 5 - produce the Jar file of this project 6 - copy the jar file in the lib folder of the wps web app change the server parameters in the wps_config.xml file
@ -45,6 +48,10 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractEcologicalEngineMapper.class);
private Observer observer = null;
private boolean cancelled = false;
// inputs and outputs
public LinkedHashMap<String, Object> inputs = new LinkedHashMap<String, Object>();
public LinkedHashMap<String, Object> outputs = new LinkedHashMap<String, Object>();
@ -84,7 +91,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
}
}
// inner objects
public AlgorithmConfiguration config;
public InfrastructureDialoguer infrastructureDialoguer;
@ -96,7 +103,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
public static synchronized void addDatabaseInfo(String scope, DatabaseInfo info) {
databaseParametersMemoryCache.put(scope, info);
}
public ComputationalAgent getComputationalAgent(String algorithmName) throws Exception {
LOGGER.debug("Searching for Agents.. " + algorithmName);
List<ComputationalAgent> agents = new ArrayList<ComputationalAgent>();
@ -227,48 +234,22 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
inputs.put(ConfigurationManager.tokenParameter, token);
}
long statusInterrupt = 0;
float previousStatus = -3;
public void updateStatus(float status) {
if (agent != null) {
long stream = 0;
try {
stream = org.n52.wps.server.database.DatabaseFactory.getDatabase().getContentLengthForStoreResponse(wpsExternalID);
// LOGGER.debug("STATUS bytes " + stream + " interrupt bytes " + statusInterrupt);
if (statusInterrupt == 0 || statusInterrupt > stream - 3 && stream != 468) {
statusInterrupt = stream;
} else {
LOGGER.debug("STATUS INTERRUPTED!");
agent.shutdown();
statusInterrupt = -1;
agent = null;
status = -1f;
super.update(new Integer((int) status));
try {
updateComputationOnWS(status, null);
} catch (Exception e) {
}
System.gc();
if (status != previousStatus) {
LOGGER.debug("STATUS update to: {} ", status );
previousStatus = status;
super.update(new Integer((int) status));
try {
updateComputationOnWS(status, null);
} catch (Exception e) {
LOGGER.warn("error updating compution on WS");
}
if (status != previousStatus) {
LOGGER.debug("STATUS update to:" + status + " - status interrupt " + statusInterrupt);
previousStatus = status;
super.update(new Integer((int) status));
try {
updateComputationOnWS(status, null);
} catch (Exception e) {
}
}
} catch (Exception e) {
LOGGER.warn("STATUS RETRIEVAL EXCEPTION ",e);
// stream = statusInterrupt;
}
}
}
public void updateComputationOnWS(float status, String exception) {
@ -312,6 +293,9 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
@Execute
public void run() throws Exception {
if (observer!=null)
observer.isStarted(this);
String algorithm = "";
List<String> generatedInputTables = null;
List<String> generatedOutputTables = null;
@ -439,7 +423,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
LOGGER.debug("Final Computation Output: " + outputs);
endTime = new java.text.SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(System.currentTimeMillis());
if (statusInterrupt != -1) {
if (!isCancelled()) {
saveComputationOnWS(inputsManager.getProvenanceData(), outputmanager.getProvenanceData(), agent, generatedFiles);
} else {
LOGGER.debug("Computation interrupted - no update");
@ -449,14 +433,14 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
} catch (Exception e) {
LOGGER.error("Error execution Algorithm {}",algorithm,e);
int exitstatus = -2;
if (statusInterrupt == -1)
if (isCancelled())
exitstatus = -1;
if (inputsManager != null)
updateComputationOnWS(exitstatus, e.getMessage(), inputsManager.getProvenanceData(), generatedFiles);
else
updateComputationOnWS(exitstatus, e.getMessage());
if (statusInterrupt == -1)
if (isCancelled())
throw new Exception("Computation cancelled");
else
throw e;
@ -473,6 +457,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
time("Cleaning of resources");
displayTimes();
cleanResources();
if (observer!=null) observer.isFinished(this);
LOGGER.debug("All done - Computation Finished");
}
@ -483,14 +468,11 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
@Override
public void run() {
while (agent != null && statusInterrupt != -1 && agent.getStatus() < 100) {
while (agent != null && !isCancelled() && agent.getStatus() < 100) {
try {
updateStatus(agent.getStatus());
Thread.sleep(10000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (InterruptedException e) {}
}
LOGGER.info("Status updater terminated");
}
@ -591,4 +573,45 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
}
}
@Override
public void setObserver(Observer o) {
LOGGER.debug("setting observer in {} ",wpsExternalID);
this.observer = o;
}
@Override
public synchronized boolean cancel() {
if (!cancelled){
LOGGER.debug("COMPUTATION INTERRUPTED! ({})",wpsExternalID);
try{
if (agent!=null){
agent.shutdown();
agent = null;
}
super.update(new Integer((int) -1));
try {
updateComputationOnWS(-1, null);
} catch (Exception e) {
}
System.gc();
cancelled = true;
}catch(Exception e){
LOGGER.warn("error cancelling computation with id {}",wpsExternalID);
return false;
}
} else {
LOGGER.debug("COMPUTATION ALREADY INTERRUPT! ({})",wpsExternalID);
return false;
}
return true;
}
@Override
public boolean isCancelled() {
return cancelled;
}
}

View File

@ -74,7 +74,6 @@ public class DataspaceManager implements Runnable {
try {
deleteRunningComputationData();
} catch (Exception e) {
e.printStackTrace();
LOGGER.debug("Dataspace->No running computation available");
}
LOGGER.debug("Dataspace->Writing provenance information");
@ -412,8 +411,7 @@ public class DataspaceManager implements Runnable {
try {
deleteRunningComputationData();
} catch (Exception e) {
e.printStackTrace();
LOGGER.debug("Dataspace->impossible to delete running computation");
LOGGER.debug("Dataspace->impossible to delete running computation : {} ",e.getMessage());
}
// LOGGER.debug("Dataspace->updating computation status");
// LOGGER.debug("Dataspace->connecting to Workspace");

View File

@ -0,0 +1,9 @@
package org.gcube.dataanalysis.wps.statisticalmanager.synchserver.utils;
public interface Cancellable {
boolean cancel();
boolean isCancelled();
}

View File

@ -0,0 +1,7 @@
package org.gcube.dataanalysis.wps.statisticalmanager.synchserver.utils;
public interface Observable {
public void setObserver(Observer o);
}

View File

@ -0,0 +1,8 @@
package org.gcube.dataanalysis.wps.statisticalmanager.synchserver.utils;
public interface Observer {
public void isFinished(Observable o);
public void isStarted(Observable o);
}

View File

@ -19,7 +19,7 @@ public class AlgorithmTest {
public void executeAlgorithmsFromFile() throws Exception{
String protocol = "http";
String hostname = "dataminer2-d-d4s.d4science.org";
String token = "52b59669-ccde-46d2-a4da-108b9e941f7c-98187548";
String token = "7e52e574-3c0d-4673-941e-71cf17ea5134-843339462";
Iterator<String> uris = getUrisIterator();