diff --git a/src/main/java/org/gcube/dataanalysis/ecoengine/interfaces/StandardLocalInfraAlgorithm.java b/src/main/java/org/gcube/dataanalysis/ecoengine/interfaces/StandardLocalInfraAlgorithm.java new file mode 100644 index 0000000..ab2b435 --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/ecoengine/interfaces/StandardLocalInfraAlgorithm.java @@ -0,0 +1,26 @@ +package org.gcube.dataanalysis.ecoengine.interfaces; + +import java.io.File; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; +import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; +import org.gcube.dataanalysis.ecoengine.configuration.INFRASTRUCTURE; +import org.gcube.dataanalysis.ecoengine.datatypes.DatabaseType; +import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType; +import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType; +import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.DatabaseParameters; +import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes; +import org.gcube.dataanalysis.ecoengine.interfaces.Transducerer; +import org.gcube.dataanalysis.ecoengine.utils.ResourceFactory; + + +public abstract class StandardLocalInfraAlgorithm extends StandardLocalExternalAlgorithm{ + + public boolean sendNotificationEmail(){ + return true; + } +} diff --git a/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java b/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java index 966a6ac..ad2247d 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java +++ b/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java @@ -42,7 +42,7 @@ public class D4ScienceDistributedProcessing implements Generator { agent.compute(); distributedModel.postProcess(agent.hasResentMessages(),false); } catch (Exception e) { - distributedModel.postProcess(false,true); + try{distributedModel.postProcess(false,true);}catch(Exception ee){} AnalysisLogger.getLogger().error("ERROR: An Error occurred ", e); throw e; } finally { diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgentWPS.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgentWPS.java index 14b5e68..045a355 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgentWPS.java +++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgentWPS.java @@ -107,7 +107,7 @@ public class DistributedProcessingAgentWPS { subdivisiondiv = 1; executeWork(leftSetNumberOfElements, rightSetNumberOfElements, 0, subdivisiondiv, deletefiles, forceUpload); - + AnalysisLogger.getLogger().debug("The WPS job has been completely executed"); if (jobManager.wasAborted()) { logger.debug("Warning: Job was aborted"); // distributionModel.postProcess(false,true); @@ -120,7 +120,7 @@ public class DistributedProcessingAgentWPS { } catch (Exception e) { - logger.error("ERROR: An Error occurred ", e); + AnalysisLogger.getLogger().debug("The WPS job got an error "+e.getLocalizedMessage()); e.printStackTrace(); throw e; } finally { diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java index 3aafa27..cee3e0b 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java +++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java @@ -12,15 +12,14 @@ import java.io.PrintStream; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.util.UUID; import org.apache.log4j.Logger; import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; import org.gcube.contentmanager.storageclient.model.protocol.smp.Handler; import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode; -import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalExternalAlgorithm; +import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalInfraAlgorithm; -public class GenericWorker extends StandardLocalExternalAlgorithm{ +public class GenericWorker extends StandardLocalInfraAlgorithm{ private static String genericWorkerDir = "/genericworker/"; @@ -67,10 +66,6 @@ public class GenericWorker extends StandardLocalExternalAlgorithm{ File tempDir = null ; try { Handler.activateProtocol(); - - if (session == null) - - // invoke the algorithm logger.debug("GenericWorker-> Creating algorithm " + algorithmClass); ActorNode node = (ActorNode) Class.forName(algorithmClass).newInstance(); @@ -97,6 +92,12 @@ public class GenericWorker extends StandardLocalExternalAlgorithm{ config.getConfigPath(), nodeConfigurationFileObject.getName(), "log.txt"); String log = new String(baos.toByteArray(), StandardCharsets.UTF_8); + //manage known issues + log=log.replace(".XMLStreamException: Unbound namespace URI", "Known Except"); + log=log.replace("java.io.IOException: Error copying XML", "Known Except"); + log=log.replace("java.io.FileNotFoundException: /home/gcube/tomcat/tmp/ConfigurationFile", "Known Except"); + log=log.replace("java.io.FileNotFoundException: payload was not made available for this dataset", "Known Except"); + logger.debug("GenericWorker-> Execution Fulllog" ); logger.debug("GenericWorker-> " + log); logger.debug("GenericWorker-> Script executed! " ); @@ -105,19 +106,25 @@ public class GenericWorker extends StandardLocalExternalAlgorithm{ logger.debug("GenericWorker-> deleted sandbox file: "+del ); logger.debug("GenericWorker-> all done"); + + + if (log.contains("Exception:")){ outputParameters.put(OutputParameter, TASK_FAILURE); String cutLog = URLEncoder.encode(log, "UTF-8"); int maxlen = 20240; + int startidx = log.indexOf("Exception:"); + log = log.substring(startidx); if (log.length()>maxlen) cutLog = cutLog.substring(0,maxlen)+"..."; outputParameters.put("Log", cutLog); logger.debug("GenericWorker-> Failure!"); } - else + else{ outputParameters.put(OutputParameter, TASK_SUCCESS); - logger.debug("GenericWorker-> Success!"); + logger.debug("GenericWorker-> Success!"); + } } catch (Throwable e) { outputParameters.put(OutputParameter, TASK_FAILURE); outputParameters.put("Log", e.getLocalizedMessage()); diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorkerCaller.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorkerCaller.java index 8ebb3ad..d22adee 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorkerCaller.java +++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorkerCaller.java @@ -1,6 +1,10 @@ package org.gcube.dataanalysis.executor.job.management; +import java.io.BufferedReader; import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.io.InputStreamReader; import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; import org.gcube.contentmanagement.lexicalmatcher.utils.FileTools; @@ -11,12 +15,13 @@ import com.thoughtworks.xstream.XStream; public class GenericWorkerCaller { - public static String getGenericWorkerCall(String algorithm, String session, AlgorithmConfiguration configuration,int leftSetIndex,int rightSetIndex,int leftElements,int rightElements, boolean isduplicate,boolean deleteTemporaryFiles) throws Exception{ - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + public static String getGenericWorkerCall(String algorithm, String session, AlgorithmConfiguration configuration,int leftSetIndex,int rightSetIndex,int leftElements,int rightElements, boolean isduplicate,boolean deleteTemporaryFiles, String callTemplate) throws Exception{ + String xmlconfig = new XStream().toXML(configuration); xmlconfig = xmlconfig.replace("\n", "").replace("\t", ""); xmlconfig = xmlconfig.replaceAll(">[ ]+<", "> <"); + /* AnalysisLogger.getLogger().debug("CONFIG of Task:"); AnalysisLogger.getLogger().debug("algorithm: "+algorithm); AnalysisLogger.getLogger().debug("leftSetIndex: "+leftSetIndex); @@ -26,10 +31,11 @@ public class GenericWorkerCaller { AnalysisLogger.getLogger().debug("session: "+session); AnalysisLogger.getLogger().debug("isduplicate: "+isduplicate); AnalysisLogger.getLogger().debug("deleteTemporaryFiles: "+deleteTemporaryFiles); - - File is = new File(classLoader.getResource("WPSGWTemplate.xml").getFile()); - String call=FileTools.loadString(is.getAbsolutePath(), "UTF-8"); - AnalysisLogger.getLogger().debug("call template : "+call); + */ + //String call=FileTools.loadString(is.getAbsolutePath(), "UTF-8"); + String call = new String(callTemplate.getBytes()); +// String call = callTemplate; + //AnalysisLogger.getLogger().debug("call template : "+call); call = call.replace("#"+GenericWorker.AlgorithmClassParameter+"#", algorithm); call = call.replace("#"+GenericWorker.LeftSetStartIndexParameter+"#", ""+leftSetIndex); call = call.replace("#"+GenericWorker.NumberOfLeftElementsToProcessParameter+"#", ""+leftElements); diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/WPSJobManager.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/WPSJobManager.java index e2749ed..6454cbf 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/WPSJobManager.java +++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/WPSJobManager.java @@ -1,63 +1,23 @@ package org.gcube.dataanalysis.executor.job.management; -import java.io.File; +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; - -import org.apache.activemq.ActiveMQConnection; -import org.gcube.common.clients.ProxyBuilderImpl; -import org.gcube.common.resources.gcore.ServiceEndpoint; -import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint; -import org.gcube.common.scope.api.ScopeProvider; -import org.gcube.contentmanagement.blobstorage.resource.StorageObject; -import org.gcube.contentmanagement.blobstorage.service.IClient; import org.gcube.contentmanagement.graphtools.utils.HttpRequest; import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; -import org.gcube.contentmanager.storageclient.wrapper.AccessType; -import org.gcube.contentmanager.storageclient.wrapper.MemoryType; -import org.gcube.contentmanager.storageclient.wrapper.StorageClient; import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; -import org.gcube.dataanalysis.ecoengine.utils.Operations; -import org.gcube.dataanalysis.executor.messagequeue.ATTRIBUTE; -import org.gcube.dataanalysis.executor.messagequeue.Consumer; -import org.gcube.dataanalysis.executor.messagequeue.Producer; -import org.gcube.dataanalysis.executor.messagequeue.QCONSTANTS; -import org.gcube.dataanalysis.executor.messagequeue.QueueManager; -import org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer; -import org.gcube.dataanalysis.executor.nodes.transducers.bionym.utils.YasmeenGlobalParameters; -import org.gcube.dataanalysis.executor.scripts.ScriptIOWorker; import org.gcube.dataanalysis.executor.util.InfraRetrieval; -import org.gcube.resources.discovery.client.api.DiscoveryClient; -import org.gcube.resources.discovery.client.queries.api.SimpleQuery; -import org.gcube.vremanagement.executor.api.SmartExecutor; -import org.gcube.vremanagement.executor.api.types.LaunchParameter; -import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin; -import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPluginQuery; -import org.gcube.vremanagement.executor.client.plugins.query.filter.ListEndpointDiscoveryFilter; -import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter; -import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy; - -import com.thoughtworks.xstream.XStream; - -import static org.gcube.resources.discovery.icclient.ICFactory.*; public class WPSJobManager { static final int pollingTime = 5000; - static final int maxTrialsPerThread = 3; + static final long maxTaskTime= 12*60000; //allowed max 12 hours per task int overallFailures = 0; @@ -68,6 +28,31 @@ public class WPSJobManager { boolean stopThreads = false; boolean hasResentMessages = false; + + public static String getCallTemplate(){ + String call = null; + try{ + InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("templates/WPSGWTemplate2.xml"); + AnalysisLogger.getLogger().debug("WPSJobManager->GW template Input stream is null "+(is==null)); + BufferedReader in = new BufferedReader(new InputStreamReader(is)); + String line = null; + StringBuilder vud = new StringBuilder(); + + while ((line = in.readLine()) != null) { + vud.append(line + "\n"); + } + + in.close(); + + call = vud.toString(); + + + }catch(Exception e){ + e.printStackTrace(); + } + return call; + } + final public synchronized void incrementOverallFailures() { overallFailures++; } @@ -103,9 +88,10 @@ public class WPSJobManager { int rightSetIndex; int leftElements; int rightElements; + String callTemplate; + int maxTrialsPerThread; - - public TasksWatcher(String algorithm, String username, String token, String wpsHost, int wpsPort, String session, int taskNumber, AlgorithmConfiguration configuration, int leftSetIndex, int rightSetIndex, int leftElements, int rightElements) { + public TasksWatcher(String algorithm, String username, String token, String wpsHost, int wpsPort, String session, int taskNumber, AlgorithmConfiguration configuration, int leftSetIndex, int rightSetIndex, int leftElements, int rightElements,String callTemplate, int maxTrialsPerThread) { this.algorithm = algorithm; this.token = token; this.wpsHost = wpsHost; @@ -118,6 +104,8 @@ public class WPSJobManager { this.leftElements = leftElements; this.rightSetIndex = rightSetIndex; this.rightElements = rightElements; + this.callTemplate=callTemplate; + this.maxTrialsPerThread=maxTrialsPerThread; } @@ -125,11 +113,12 @@ public class WPSJobManager { String url = "http://" + wpsHost + ":" + wpsPort + "/wps/WebProcessingService"; boolean deleteTemporaryFiles = true; - AnalysisLogger.getLogger().debug("Task Number : " + taskNumber+" GO!"); + AnalysisLogger.getLogger().debug("WPSJobManager->Task Number : " + taskNumber+" GO!"); try { - String algorithmCall = GenericWorkerCaller.getGenericWorkerCall(algorithm, session, configuration, leftSetIndex, rightSetIndex, leftElements, rightElements, isduplicate, deleteTemporaryFiles); + AnalysisLogger.getLogger().debug("WPSJobManager->Invoking the GW to start"); + String algorithmCall = GenericWorkerCaller.getGenericWorkerCall(algorithm, session, configuration, leftSetIndex, rightSetIndex, leftElements, rightElements, isduplicate, deleteTemporaryFiles,callTemplate); String result = HttpRequest.PostXmlString(url, wpsHost, wpsPort, new LinkedHashMap(), username, token, algorithmCall); -// AnalysisLogger.getLogger().debug("Result: " + result); + AnalysisLogger.getLogger().debug("WPSJobManager->GW starting Output " + result.replace("\n", "")); boolean success = false; boolean failure = false; @@ -140,11 +129,13 @@ public class WPSJobManager { failure = true; String statusLocation = ""; - + long taskTimeCounter = 0; while (!success && !isStopped() && (!failure) ) { //while !success and failure - if (result == null || result.contains(GenericWorker.TASK_FAILURE) || result.contains("Exception")) + if (result == null || result.contains(GenericWorker.TASK_FAILURE)) failure = true; - + else if (taskTimeCounter>maxTaskTime){ + failure = true; + } else if (result.contains(GenericWorker.TASK_SUCCESS)) success = true; else if (result.contains("Process Accepted")) { @@ -156,6 +147,7 @@ public class WPSJobManager { result= ""; } else { Thread.sleep(pollingTime); + taskTimeCounter+=pollingTime; result = HttpRequest.sendGetRequest(statusLocation, ""); // AnalysisLogger.getLogger().debug("Result in location: " + result); } @@ -172,14 +164,14 @@ public class WPSJobManager { if (failure) { exitstatus = GenericWorker.TASK_FAILURE; - AnalysisLogger.getLogger().debug("Task Number "+taskNumber+" - Failure cause: " + result); + AnalysisLogger.getLogger().debug("WPSJobManager->Task Number "+taskNumber+" - Failure cause: " + result); } // AnalysisLogger.getLogger().debug("Process execution finished: " + exitstatus); } catch (Exception e) { e.printStackTrace(); AnalysisLogger.getLogger().debug(e); - AnalysisLogger.getLogger().debug("Task Number "+taskNumber+" - Process exception: " + e.getLocalizedMessage()); + AnalysisLogger.getLogger().debug("WPSJobManager->Task Number "+taskNumber+" - Process exception: " + e.getLocalizedMessage()); exitstatus = GenericWorker.TASK_FAILURE; }finally{ @@ -196,7 +188,7 @@ public class WPSJobManager { trials++; hasResentTrue(); duplicate = true; - AnalysisLogger.getLogger().debug("Task Number "+taskNumber+" - Retrying n."+trials); + AnalysisLogger.getLogger().debug("WPSJobManager->Task Number "+taskNumber+" - Retrying n."+trials); } } @@ -205,7 +197,7 @@ public class WPSJobManager { else incrementOverallFailures(); - AnalysisLogger.getLogger().debug("Task Number "+taskNumber+" - Finished: " + exitstatus); + AnalysisLogger.getLogger().debug("WPSJobManager->Task Number "+taskNumber+" - Finished: " + exitstatus); } } @@ -230,18 +222,20 @@ public class WPSJobManager { return hasResentMessages; } - public void uploadAndExecuteChunkized(AlgorithmConfiguration configuration, String algorithmClass, List arguments, String session) { + public void uploadAndExecuteChunkized(AlgorithmConfiguration configuration, String algorithmClass, List arguments, String session) throws Exception{ ExecutorService executor = null; try{ int numberofservices = 1; - - AnalysisLogger.getLogger().debug("Estimating the number of services"); + String callTemplate = getCallTemplate(); + + AnalysisLogger.getLogger().debug("WPSJobManager->Estimating the number of services"); List wpsservices = InfraRetrieval.retrieveService("DataMiner", configuration.getGcubeScope()); - if (wpsservices==null || wpsservices.size()==0) - throw new Exception ("No Dataminer GCore Endpoint found in the VRE "+configuration.getGcubeScope()); - + if (wpsservices==null || wpsservices.size()==0){ + AnalysisLogger.getLogger().debug("WPSJobManager->Error: No DataMiner GCore Endpoints found!"); + throw new Exception ("No DataMinerWorkers GCore Endpoint found in the VRE "+configuration.getGcubeScope()); + } List differentServices = new ArrayList(); for (String service:wpsservices){ @@ -251,19 +245,23 @@ public class WPSJobManager { differentServices.add(service); } + numberofservices = differentServices.size(); AnalysisLogger.getLogger().debug("WPSJobManager->Number of dataminer services "+numberofservices); int parallelisation = numberofservices*2; AnalysisLogger.getLogger().debug("WPSJobManager->Number of parallel processes (parallelisation) : "+parallelisation); - List wpshosts = InfraRetrieval.retrieveAddresses("DataAnalysis",configuration.getGcubeScope(),"-----"); - - if (wpshosts==null || wpshosts.size()==0) - throw new Exception ("WPSJobManager->No Dataminer Service Endpoint found in the VRE "+configuration.getGcubeScope()); + //List wpshosts = InfraRetrieval.retrieveAddresses("DataAnalysis",configuration.getGcubeScope(),"-----"); + List wpshosts = InfraRetrieval.retrieveServiceAddress("DataAnalysis","DataMinerWorkers",configuration.getGcubeScope(),"noexclusion"); + if (wpshosts==null || wpshosts.size()==0){ + AnalysisLogger.getLogger().debug("WPSJobManager->Error: No DataMinerWorkers Service Endpoints found at all!"); + throw new Exception ("WPSJobManager->No Dataminer Workers Service Endpoint found in the VRE - DataMinerWorkers Resource is required in the VRE"+configuration.getGcubeScope()); + } String wpshost = wpshosts.get(0); + wpshost = wpshost.substring(wpshost.indexOf("/")+2); //String wpshostAddress = wpshost.substring(0,wpshost.indexOf(":")); String wpshostAddress = wpshost.substring(0,wpshost.indexOf("/")); @@ -285,18 +283,21 @@ public class WPSJobManager { int leftNum = Integer.parseInt(lfnlnr[1]); int rightOff = Integer.parseInt(lfnlnr[2]); int rightNum = Integer.parseInt(lfnlnr[3]); - + int maxTrials = parallelisation; TasksWatcher watcher = new TasksWatcher(algorithmClass, configuration.getGcubeUserName(), - configuration.getGcubeToken(),wpshost,wpsport,session,taskNumber,configuration, leftOff, rightOff,leftNum,rightNum); + configuration.getGcubeToken(), + wpshost,wpsport,session,taskNumber,configuration, leftOff, rightOff,leftNum,rightNum,callTemplate, maxTrials); executor.execute(watcher); - AnalysisLogger.getLogger().debug("WPSJobManager->Task number "+taskNumber+" launched!"); + AnalysisLogger.getLogger().debug("WPSJobManager->Task number "+taskNumber+" launched with arguments: "+argument); taskNumber++; + Thread.sleep(1000); } int njobs = overallFailures+overallSuccess; int pnjobs =njobs; + while (njobsNumber of finished jobs "+njobs+" of "+overallTasks); AnalysisLogger.getLogger().debug("WPSJobManager->Number of errors "+overallFailures+" - perc failure "+percFailure); + pnjobs=njobs; } } AnalysisLogger.getLogger().debug("WPSJobManager->Overall computation finished"); }catch(Exception e){ e.printStackTrace(); + throw e; } finally{ if (executor!=null){ AnalysisLogger.getLogger().debug("WPSJobManager->Shutting down the executions"); executor.shutdown(); + AnalysisLogger.getLogger().debug("WPSJobManager->Shut down completed"); } } diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/WebApplicationPublisher.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/WebApplicationPublisher.java index dd915fe..a8d1596 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/WebApplicationPublisher.java +++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/WebApplicationPublisher.java @@ -13,13 +13,13 @@ import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType; import org.gcube.dataanalysis.ecoengine.datatypes.ServiceType; import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes; import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.ServiceParameters; -import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalExternalAlgorithm; +import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalInfraAlgorithm; import org.gcube.dataanalysis.ecoengine.utils.ZipTools; import org.gcube.dataanalysis.executor.util.DataTransferer; import org.gcube.dataanalysis.executor.util.InfraRetrieval; -public class WebApplicationPublisher extends StandardLocalExternalAlgorithm{ +public class WebApplicationPublisher extends StandardLocalInfraAlgorithm{ // private static String MainPageParam = "MainPage"; private static String FileParam = "ZipFile"; private String transferServiceAddress = ""; diff --git a/src/main/java/org/gcube/dataanalysis/executor/rscripts/SGVMS_Interpolation.java b/src/main/java/org/gcube/dataanalysis/executor/rscripts/SGVMS_Interpolation.java index 93cfb36..739c5e1 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/rscripts/SGVMS_Interpolation.java +++ b/src/main/java/org/gcube/dataanalysis/executor/rscripts/SGVMS_Interpolation.java @@ -8,10 +8,10 @@ import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType; import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType; import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes; -import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalExternalAlgorithm; +import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalInfraAlgorithm; import org.gcube.dataanalysis.executor.util.RScriptsManager; -public class SGVMS_Interpolation extends StandardLocalExternalAlgorithm { +public class SGVMS_Interpolation extends StandardLocalInfraAlgorithm { private static int maxPoints = 10000; public enum methodEnum { cHs, SL}; diff --git a/src/main/java/org/gcube/dataanalysis/executor/rscripts/generic/GenericRScript.java b/src/main/java/org/gcube/dataanalysis/executor/rscripts/generic/GenericRScript.java index e788baf..0686911 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/rscripts/generic/GenericRScript.java +++ b/src/main/java/org/gcube/dataanalysis/executor/rscripts/generic/GenericRScript.java @@ -11,13 +11,15 @@ import java.util.UUID; import org.apache.commons.io.FileUtils; import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; import org.gcube.contentmanagement.lexicalmatcher.utils.FileTools; +import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType; import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType; -import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalExternalAlgorithm; +import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes; +import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalInfraAlgorithm; import org.gcube.dataanalysis.ecoengine.utils.ZipTools; import org.gcube.dataanalysis.executor.util.LocalRScriptsManager; import org.gcube.dataanalysis.executor.util.StorageUtils; -public abstract class GenericRScript extends StandardLocalExternalAlgorithm { +public abstract class GenericRScript extends StandardLocalInfraAlgorithm { // FIXED part protected HashMap outputValues = new HashMap(); @@ -154,6 +156,12 @@ public abstract class GenericRScript extends StandardLocalExternalAlgorithm { AnalysisLogger.getLogger().debug(e); AnalysisLogger.getLogger().debug("Could not delete sandbox folder " + folder.getAbsolutePath()); } + + if (Rlog != null) { + File logFile = saveLogFile(Rlog); + output.put("Log", new PrimitiveType(File.class.getName(), logFile, PrimitiveTypes.FILE, "LogFile", "Log of the computation")); + } + } catch (Exception e) { if (Rlog != null) { @@ -168,6 +176,18 @@ public abstract class GenericRScript extends StandardLocalExternalAlgorithm { } } + protected File saveLogFile(String Rlog) throws Exception { + String uuid = "" + UUID.randomUUID(); + AnalysisLogger.getLogger().debug("Writing the logs of the execution"); + File logfile = new File(config.getPersistencePath(), "RLOG" + uuid + ".txt"); + + FileWriter fw = new FileWriter(logfile); + fw.write(Rlog); + fw.close(); + AnalysisLogger.getLogger().debug("Written in " + logfile); + return logfile; + } + protected String generateRemoteLogFile(String Rlog) throws Exception { String uuid = "" + UUID.randomUUID(); AnalysisLogger.getLogger().debug("Writing the logs of the execution"); diff --git a/src/main/java/org/gcube/dataanalysis/executor/tests/TestAquaMapsJobs.java b/src/main/java/org/gcube/dataanalysis/executor/tests/TestAquaMapsJobs.java new file mode 100644 index 0000000..af924f4 --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/executor/tests/TestAquaMapsJobs.java @@ -0,0 +1,80 @@ +package org.gcube.dataanalysis.executor.tests; + +import java.util.ArrayList; +import java.util.List; + +import org.gcube.common.scope.api.ScopeProvider; +import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; +import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; +import org.gcube.dataanalysis.executor.job.management.GenericWorker; +import org.gcube.dataanalysis.executor.job.management.WPSJobManager; +import org.gcube.dataanalysis.executor.job.management.WPSJobManager.TasksWatcher; +import org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer; +import org.gcube.dataanalysis.executor.nodes.transducers.bionym.utils.YasmeenGlobalParameters; + +public class TestAquaMapsJobs { + + public static AlgorithmConfiguration buildTestConfiguration(){ + AlgorithmConfiguration config = new AlgorithmConfiguration(); + config.setConfigPath("./cfg/"); + AnalysisLogger.setLogger(config.getConfigPath() + AlgorithmConfiguration.defaultLoggerFile); + config.setParam("DatabaseUserName", "utente"); + config.setParam("DatabasePassword", "d4science"); + config.setParam("DatabaseURL", "jdbc:postgresql://statistical-manager.d.d4science.research-infrastructures.eu/testdb"); + + config.setParam("EnvelopeTable","http://data.d4science.org/b2hOQ1phWEVGcUxDZWZucS9UQkJmWG9JT2JDNUlTbjhHbWJQNStIS0N6Yz0"); + config.setParam("CsquarecodesTable","http://data.d4science.org/d2JpZUZ4VkRvVTlmcElhcUlmQUpWdE1mOGZTZ0xhNHlHbWJQNStIS0N6Yz0"); + config.setParam("DistributionTableLabel","hspec"); + config.setParam("OccurrencePointsTable","http://data.d4science.org/ZGVCYjJaWTFmaGhmcElhcUlmQUpWb2NoYVFvclBZaG5HbWJQNStIS0N6Yz0"); + + config.setAgent("AQUAMAPS_SUITABLE"); + config.setPersistencePath("./"); + config.setGcubeScope("/gcube/devNext/NextNext"); +// config.setGcubeScope("/gcube/devsec/devVRE"); + config.setParam("ServiceUserName", "gianpaolo.coro"); + config.setParam("DatabaseDriver", "org.postgresql.Driver"); + config.setGcubeUserName("gianpaolo.coro"); + config.setGcubeToken("f9d49d76-cd60-48ed-9f8e-036bcc1fc045-98187548"); + + return config; + } + + public static void main1(String[] args) throws Exception { + + String host = "dataminer1-devnext.d4science.org"; + String session = "12345"; + int port = 80; + String algorithm = "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer"; + AlgorithmConfiguration config = buildTestConfiguration(); + + WPSJobManager manager = new WPSJobManager(); + TasksWatcher taskWatcher = manager.new TasksWatcher(algorithm, config.getGcubeUserName(), config.getGcubeToken(), host, port, session, 1, config, 1, 1, 1, 1,"",1); + Thread t = new Thread(taskWatcher); + t.start(); + + while (taskWatcher.exitstatus.equals(GenericWorker.TASK_UNDEFINED)){ + Thread.sleep(1000); + System.out.print("."); + } + + AnalysisLogger.getLogger().debug("Task 1 terminated with output "+taskWatcher.exitstatus ); + //taskWatcher.run(); + } + + public static void main(String[] args) throws Exception { + AlgorithmConfiguration config = buildTestConfiguration(); + String algorithm = "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer"; + ScopeProvider.instance.set(config.getGcubeScope()); + WPSJobManager jobmanager = new WPSJobManager(); + //int nArguments = 100; + int nArguments = 1; + List arguments = new ArrayList(); + for (int i=1;i<=nArguments;i++){ + String argument = "0 178204 0 11"; + arguments.add(argument); + } + String sessionID ="1234"; + jobmanager.uploadAndExecuteChunkized(config, algorithm, arguments,sessionID); + + } +} diff --git a/src/main/java/org/gcube/dataanalysis/executor/tests/TestWPSJobs.java b/src/main/java/org/gcube/dataanalysis/executor/tests/TestWPSJobs.java index 1e3388a..0d89f95 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/tests/TestWPSJobs.java +++ b/src/main/java/org/gcube/dataanalysis/executor/tests/TestWPSJobs.java @@ -23,7 +23,9 @@ public class TestWPSJobs { config.setParam("DatabaseURL", "jdbc:postgresql://statistical-manager.d.d4science.research-infrastructures.eu/testdb"); config.setParam(YasmeenGlobalParameters.parserNameParam, YasmeenGlobalParameters.BuiltinParsers.SIMPLE.name()); - config.setParam(YasmeenGlobalParameters.taxaAuthorityFileParam, YasmeenGlobalParameters.BuiltinDataSources.WORMS_PISCES.name()); + config.setParam(YasmeenGlobalParameters.taxaAuthorityFileParam, YasmeenGlobalParameters.BuiltinDataSources.FISHBASE.name()); + config.setParam(YasmeenGlobalParameters.performanceParam, YasmeenGlobalParameters.Performance.MAX_SPEED.name()); + config.setParam(YasmeenGlobalParameters.activatePreParsingProcessing, "true"); config.setParam(YasmeenGlobalParameters.useStemmedGenusAndSpecies, "false"); @@ -60,7 +62,7 @@ public class TestWPSJobs { config.setParam("ServiceUserName", "gianpaolo.coro"); config.setParam("DatabaseDriver", "org.postgresql.Driver"); config.setGcubeUserName("gianpaolo.coro"); - config.setGcubeToken("cb289202-e7d6-45ee-8076-a80bc4d4be51-98187548"); + config.setGcubeToken("f9d49d76-cd60-48ed-9f8e-036bcc1fc045-98187548"); return config; } @@ -74,7 +76,7 @@ public class TestWPSJobs { AlgorithmConfiguration config = buildTestConfiguration(); WPSJobManager manager = new WPSJobManager(); - TasksWatcher taskWatcher = manager.new TasksWatcher(algorithm, config.getGcubeUserName(), config.getGcubeToken(), host, port, session, 1, config, 1, 1, 1, 1); + TasksWatcher taskWatcher = manager.new TasksWatcher(algorithm, config.getGcubeUserName(), config.getGcubeToken(), host, port, session, 1, config, 1, 1, 1, 1,"",1); Thread t = new Thread(taskWatcher); t.start(); @@ -92,7 +94,8 @@ public class TestWPSJobs { String algorithm = "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer"; ScopeProvider.instance.set(config.getGcubeScope()); WPSJobManager jobmanager = new WPSJobManager(); - int nArguments = 100; + //int nArguments = 100; + int nArguments = 20; List arguments = new ArrayList(); for (int i=1;i<=nArguments;i++){ String argument = "1 1 "+i+" 1";