diff --git a/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/mapping/AbstractEcologicalEngineMapper.java b/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/mapping/AbstractEcologicalEngineMapper.java index 464799b..0652b3c 100644 --- a/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/mapping/AbstractEcologicalEngineMapper.java +++ b/src/main/java/org/gcube/dataanalysis/wps/statisticalmanager/synchserver/mapping/AbstractEcologicalEngineMapper.java @@ -48,10 +48,15 @@ import org.n52.wps.server.AbstractAnnotatedAlgorithm; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm implements Observable, Cancellable{ +public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm implements Observable, Cancellable { /** - * Deploying procedure: 1 - modify configuration files 2 - modify resource file: resources/templates/setup.cfg 3 - generate classes with ClassGenerator 4 - add new classes in the wps_config.xml on the wps web app config folder 5 - produce the Jar file of this project 6 - copy the jar file in the lib folder of the wps web app change the server parameters in the wps_config.xml file + * Deploying procedure: 1 - modify configuration files 2 - modify resource + * file: resources/templates/setup.cfg 3 - generate classes with + * ClassGenerator 4 - add new classes in the wps_config.xml on the wps web + * app config folder 5 - produce the Jar file of this project 6 - copy the + * jar file in the lib folder of the wps web app change the server + * parameters in the wps_config.xml file */ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractEcologicalEngineMapper.class); @@ -59,11 +64,11 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i private Observer observer = null; private boolean cancelled = false; - + private TokenManager tokenm = null; - + private EnvironmentVariableManager env = null; - + // inputs and outputs public LinkedHashMap inputs = new LinkedHashMap(); public LinkedHashMap outputs = new LinkedHashMap(); @@ -205,7 +210,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i LOGGER.debug("Could not drop Temporary Table: " + table + " table is null"); } } catch (Exception e) { - LOGGER.error("error deleting temporary table",e); + LOGGER.error("error deleting temporary table", e); } finally { DatabaseUtils.closeDBConnection(dbConnection); } @@ -248,14 +253,16 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i float previousStatus = -3; String host = WPSConfig.getInstance().getWPSConfig().getServer().getHostname(); + public void updateStatus(float status, boolean canWrite) { if (agent != null) { if (status != previousStatus) { - LOGGER.debug("STATUS update to: {} ", status ); + LOGGER.debug("STATUS update to: {} ", status); previousStatus = status; super.update(new Integer((int) status)); try { - if (canWrite) updateComputationOnWS(status, null); + if (canWrite) + updateComputationOnWS(status, null); } catch (Exception e) { LOGGER.warn("error updating compution on WS"); } @@ -267,17 +274,18 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i public void setEnvironmentVariableManager(EnvironmentVariableManager env) { this.env = env; } - + public void updateComputationOnWS(float status, String exception) { updateComputationOnWS(status, exception, null, null); } - class RunDataspaceManager implements Runnable{ + class RunDataspaceManager implements Runnable { List inputData; List generatedData; - public RunDataspaceManager(List inputData, List generatedData){ - this.inputData=inputData; - this.generatedData=generatedData; + + public RunDataspaceManager(List inputData, List generatedData) { + this.inputData = inputData; + this.generatedData = generatedData; } public void run() { @@ -286,36 +294,35 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i LOGGER.debug("Dataspace->Status updater->Writing computational info on the WS asyncronously"); manager.writeRunningComputationData(); } catch (Exception ez) { - LOGGER.error("Dataspace->Status updater->Impossible to write computation information on the Workspace",ez); + LOGGER.error("Dataspace->Status updater->Impossible to write computation information on the Workspace", + ez); } } }; - public void updateComputationOnWS(float status, String exception, List inputData, List generatedData) { + public void updateComputationOnWS(float status, String exception, List inputData, + List generatedData) { if (currentComputation != null) { currentComputation.setStatus("" + status); if (exception != null && exception.length() > 0) currentComputation.setException(exception); - LOGGER.debug("RunDataspaceManager: [inputData="+inputData+", generatedData="+generatedData+"]"); - RunDataspaceManager rundm = new RunDataspaceManager(inputData,generatedData); + LOGGER.debug("RunDataspaceManager: [inputData=" + inputData + ", generatedData=" + generatedData + "]"); + RunDataspaceManager rundm = new RunDataspaceManager(inputData, generatedData); rundm.run(); /* - Thread t = new Thread(rundm); - t.start(); + * Thread t = new Thread(rundm); t.start(); */ } } - - @Execute public void run() throws Exception { - if (observer!=null) + if (observer != null) observer.isStarted(this); - LOGGER.info("classloader context in this thread is {}",Thread.currentThread().getContextClassLoader()); - + LOGGER.info("classloader context in this thread is {}", Thread.currentThread().getContextClassLoader()); + long startTimeLong = System.currentTimeMillis(); OperationResult operationResult = null; @@ -324,7 +331,8 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i List generatedInputTables = null; List generatedOutputTables = null; List generatedFiles = null; - //String date = new java.text.SimpleDateFormat("dd_MM_yyyy_HH:mm:ss").format(System.currentTimeMillis()); + // String date = new + // java.text.SimpleDateFormat("dd_MM_yyyy_HH:mm:ss").format(System.currentTimeMillis()); String computationSession = this.getAlgorithmClass().getSimpleName() + "_ID_" + UUID.randomUUID().toString(); if (wpsExternalID != null) { LOGGER.info("Using wps External ID " + wpsExternalID); @@ -332,16 +340,19 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i } else LOGGER.info("Wps External ID not set"); InputsManager inputsManager = null; - ConfigurationManager configManager = new ConfigurationManager(this.env); // initializes parameters from web.xml + ConfigurationManager configManager = new ConfigurationManager(this.env); // initializes + // parameters + // from + // web.xml manageUserToken(); - + boolean canWriteOnShub = checkWriteAuthorization(tokenm.getUserName()); - + Path dir = Paths.get(System.getProperty("java.io.tmpdir"), "dmlocks"); if (!Files.exists(dir)) dir = Files.createDirectory(dir); Path lockFile = Files.createTempFile(dir, "dm", ".lck"); - LOGGER.info("lock file created {}",lockFile.toUri().toURL()); + LOGGER.info("lock file created {}", lockFile.toUri().toURL()); try { // wait for server resources to be available @@ -364,8 +375,10 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i String scope = configManager.getScope(); String username = configManager.getUsername(); - LOGGER.info("1 - Algorithm environment initialized in scope " + scope + " with user name " + username + " and session " + computationSession); - LOGGER.info("Max allowed computations " + ConfigurationManager.getMaxComputations() + " using storage " + ConfigurationManager.useStorage()); + LOGGER.info("1 - Algorithm environment initialized in scope " + scope + " with user name " + username + + " and session " + computationSession); + LOGGER.info("Max allowed computations " + ConfigurationManager.getMaxComputations() + " using storage " + + ConfigurationManager.useStorage()); // init the infrastructure dialoguer LOGGER.info("2 - Initializing connection to the e-Infrastructure"); infrastructureDialoguer = new InfrastructureDialoguer(scope); @@ -392,7 +405,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i // adding service parameters to the configuration LOGGER.info("5 - Adding Service parameters to the configuration"); List dataminerInputParameters = getInputParameters(algorithm); - LOGGER.debug("Dataminer Algo Default InputParameters: "+dataminerInputParameters); + LOGGER.debug("Dataminer Algo Default InputParameters: " + dataminerInputParameters); inputsManager.addInputServiceParameters(dataminerInputParameters, infrastructureDialoguer); time("Service parameters added to the algorithm"); // merging wps with ecological engine parameters - modifies the @@ -400,7 +413,9 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i LOGGER.info("6 - Translating WPS Inputs into Ecological Engine Inputs"); LOGGER.debug("Operator class is " + this.getClass().getCanonicalName()); // build computation Data - currentComputation = new ComputationData(config.getTaskID(), config.getAgent(), "", "", startTime, "-", "0", config.getTaskID(), configManager.getUsername(), config.getGcubeScope(), this.getClass().getCanonicalName()); + currentComputation = new ComputationData(config.getTaskID(), config.getAgent(), "", "", startTime, "-", "0", + config.getTaskID(), configManager.getUsername(), config.getGcubeScope(), + this.getClass().getCanonicalName()); inputsManager.mergeWpsAndEcologicalInputs(supportDatabaseInfo, dataminerInputParameters); generatedInputTables = inputsManager.getGeneratedTables(); generatedFiles = inputsManager.getGeneratedInputFiles(); @@ -452,14 +467,16 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i time("Output preparation for WPS document (no storage manager)"); outputmanager.shutdown(); - + LOGGER.debug("12 - Final Computation Output"); - LOGGER.debug("Outputs: "+ outputs); + LOGGER.debug("Outputs: " + outputs); endTime = new java.text.SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(System.currentTimeMillis()); if (!isCancelled()) { LOGGER.debug("Save Computation Data"); - if (canWriteOnShub) saveComputationOnWS(inputsManager.getProvenanceData(), outputmanager.getProvenanceData(), agent, generatedFiles); + if (canWriteOnShub) + saveComputationOnWS(inputsManager.getProvenanceData(), outputmanager.getProvenanceData(), agent, + generatedFiles); } else { LOGGER.debug("Computation interrupted - no update"); throw new Exception("Computation cancelled"); @@ -468,21 +485,26 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i operationResult = OperationResult.SUCCESS; } catch (Exception e) { operationResult = OperationResult.FAILED; - LOGGER.error("Error execution Algorithm {}",algorithm,e); + LOGGER.error("Error execution Algorithm {}", algorithm, e); int exitstatus = -2; if (isCancelled()) exitstatus = -1; if (inputsManager != null) - if (canWriteOnShub) updateComputationOnWS(exitstatus, e.getMessage(), inputsManager.getProvenanceData(), generatedFiles); - else - if (canWriteOnShub) updateComputationOnWS(exitstatus, e.getMessage()); + if (canWriteOnShub) + updateComputationOnWS(exitstatus, e.getMessage(), inputsManager.getProvenanceData(), + generatedFiles); + else if (canWriteOnShub) + updateComputationOnWS(exitstatus, e.getMessage()); if (isCancelled()) throw new Exception("Computation cancelled"); else throw e; } finally { LOGGER.debug("accounting algorithm"); + if(operationResult==null){ + operationResult=OperationResult.FAILED; + } accountAlgorithmExecution(startTimeLong, System.currentTimeMillis(), operationResult); LOGGER.debug("Deleting Input Tables"); deleteTemporaryTables(generatedInputTables); @@ -496,7 +518,8 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i time("Cleaning of resources"); displayTimes(); cleanResources(); - if (observer!=null) observer.isFinished(this); + if (observer != null) + observer.isFinished(this); LOGGER.debug("All done - Computation Finished"); Files.deleteIfExists(lockFile); } @@ -504,7 +527,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i } private boolean checkWriteAuthorization(String username) { - if (env!=null && env.getShubUsersExcluded()!=null) { + if (env != null && env.getShubUsersExcluded() != null) { if (env.getShubUsersExcluded().isEmpty()) { return false; } @@ -516,41 +539,41 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i } private void accountAlgorithmExecution(long start, long end, OperationResult result) { - try{ + try { JobUsageRecord jobUsageRecord = new JobUsageRecord(); jobUsageRecord.setJobName(this.getAlgorithmClass().getSimpleName()); jobUsageRecord.setConsumerId(tokenm.getUserName()); - jobUsageRecord.setDuration(end-start); + jobUsageRecord.setDuration(end - start); jobUsageRecord.setOperationResult(result); jobUsageRecord.setServiceName("DataMiner"); jobUsageRecord.setServiceClass("WPS"); jobUsageRecord.setHost(WPSConfig.getInstance().getWPSConfig().getServer().getHostname()); jobUsageRecord.setCallerQualifier(tokenm.getTokenQualifier()); - - AccountingPersistence accountingPersistence = - AccountingPersistenceFactory.getPersistence(); + + AccountingPersistence accountingPersistence = AccountingPersistenceFactory.getPersistence(); accountingPersistence.account(jobUsageRecord); - }catch(Throwable e){ - LOGGER.error("error accounting algorithm execution",e); + } catch (Throwable e) { + LOGGER.error("error accounting algorithm execution", e); } } public class StatusUpdater implements Runnable { - + private boolean canWrite = true; - + public StatusUpdater(boolean canWrite) { this.canWrite = canWrite; } - + @Override public void run() { while (agent != null && !isCancelled() && agent.getStatus() < 100) { try { updateStatus(agent.getStatus(), canWrite); Thread.sleep(10000); - } catch (InterruptedException e) {} + } catch (InterruptedException e) { + } } LOGGER.info("Status updater terminated"); } @@ -564,15 +587,19 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i LOGGER.debug("Provenance manager running"); } - private void saveComputationOnWS(List inputData, List outputData, ComputationalAgent agent, List generatedFiles) { + private void saveComputationOnWS(List inputData, List outputData, ComputationalAgent agent, + List generatedFiles) { LOGGER.debug("Save Computation On WS"); - LOGGER.debug("InputData: "+inputData); - LOGGER.debug("OutputData: "+outputData); - LOGGER.debug("Agent: "+agent); - LOGGER.debug("Generated files: "+generatedFiles); + LOGGER.debug("InputData: " + inputData); + LOGGER.debug("OutputData: " + outputData); + LOGGER.debug("Agent: " + agent); + LOGGER.debug("Generated files: " + generatedFiles); LOGGER.debug("Provenance manager started for operator " + this.getClass().getCanonicalName()); - - ComputationData computation = new ComputationData(config.getTaskID(), config.getAgent(), agent.getDescription(), agent.getInfrastructure().name(), startTime, endTime, "100", config.getTaskID(), config.getParam(ConfigurationManager.serviceUserNameParameterVariable), config.getGcubeScope(), this.getClass().getCanonicalName()); + + ComputationData computation = new ComputationData(config.getTaskID(), config.getAgent(), agent.getDescription(), + agent.getInfrastructure().name(), startTime, endTime, "100", config.getTaskID(), + config.getParam(ConfigurationManager.serviceUserNameParameterVariable), config.getGcubeScope(), + this.getClass().getCanonicalName()); // post on WS DataspaceManager manager = new DataspaceManager(config, computation, inputData, outputData, generatedFiles); @@ -637,7 +664,8 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i public static List command(final String cmdline, final String directory) { try { - Process process = new ProcessBuilder(new String[] { "bash", "-c", cmdline }).redirectErrorStream(true).directory(new File(directory)).start(); + Process process = new ProcessBuilder(new String[] { "bash", "-c", cmdline }).redirectErrorStream(true) + .directory(new File(directory)).start(); List output = new ArrayList(); BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream())); @@ -658,16 +686,16 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i @Override public void setObserver(Observer o) { - LOGGER.debug("setting observer in {} ",wpsExternalID); - this.observer = o; + LOGGER.debug("setting observer in {} ", wpsExternalID); + this.observer = o; } @Override public synchronized boolean cancel() { - if (!cancelled){ - LOGGER.debug("COMPUTATION INTERRUPTED! ({})",wpsExternalID); - try{ - if (agent!=null){ + if (!cancelled) { + LOGGER.debug("COMPUTATION INTERRUPTED! ({})", wpsExternalID); + try { + if (agent != null) { agent.shutdown(); agent = null; } @@ -680,12 +708,12 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm i } System.gc(); cancelled = true; - }catch(Exception e){ - LOGGER.warn("error cancelling computation with id {}",wpsExternalID); + } catch (Exception e) { + LOGGER.warn("error cancelling computation with id {}", wpsExternalID); return false; } } else { - LOGGER.debug("COMPUTATION ALREADY INTERRUPT! ({})",wpsExternalID); + LOGGER.debug("COMPUTATION ALREADY INTERRUPT! ({})", wpsExternalID); return false; } return true;