diff --git a/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java b/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java
index 395c55b..966a6ac 100644
--- a/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java
+++ b/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java
@@ -3,6 +3,7 @@ package org.gcube.dataanalysis.executor.generators;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.UUID;
 
 import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
 import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS;
@@ -15,6 +16,7 @@ import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode;
 import org.gcube.dataanalysis.ecoengine.interfaces.Generator;
 import org.gcube.dataanalysis.ecoengine.interfaces.GenericAlgorithm;
 import org.gcube.dataanalysis.executor.job.management.DistributedProcessingAgent;
+import org.gcube.dataanalysis.executor.job.management.DistributedProcessingAgentWPS;
 
 public class D4ScienceDistributedProcessing implements Generator {
@@ -24,7 +26,7 @@ public class D4ScienceDistributedProcessing implements Generator {
 	protected AlgorithmConfiguration config;
 	protected ActorNode distributedModel;
 	protected String mainclass;
-	DistributedProcessingAgent agent;
+	DistributedProcessingAgentWPS agent;
 
 	public D4ScienceDistributedProcessing(){
 	}
@@ -100,10 +102,13 @@ public class D4ScienceDistributedProcessing implements Generator {
 		distributedModel.setup(config);
 		String scope = config.getGcubeScope();
 		AnalysisLogger.getLogger().info("Using the following scope for the computation:"+scope);
-		String owner = config.getParam("ServiceUserName");
+		String owner = config.getGcubeUserName();
 		int leftNum = distributedModel.getNumberOfLeftElements();
 		int rightNum = distributedModel.getNumberOfRightElements();
-		agent = new DistributedProcessingAgent(config, scope, owner, mainclass, config.getPersistencePath(), algorithm, defaultContainerFolder, maxMessagesAllowedPerJob, forceUpload, leftNum, rightNum,config.getTaskID());
+		if (config.getTaskID()==null || config.getTaskID().length()==0)
+			config.setTaskID(""+UUID.randomUUID());
+
+		agent = new DistributedProcessingAgentWPS(config, scope, owner, mainclass, config.getPersistencePath(), algorithm, defaultContainerFolder, maxMessagesAllowedPerJob, forceUpload, leftNum, rightNum,config.getTaskID());
 		agent.setLogger(AnalysisLogger.getLogger());
 	}
 
@@ -143,7 +148,7 @@ public class D4ScienceDistributedProcessing implements Generator {
 
 	@Override
 	public String getDescription() {
-		return "A D4Science Cloud Processor for Species Distributions";
+		return "A D4Science Cloud Processor";
 	}
 
 }
diff --git a/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessingExecutor.java b/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessingExecutor.java
new file mode 100644
index 0000000..55e03c9
--- /dev/null
+++ b/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessingExecutor.java
@@ -0,0 +1,149 @@
+package org.gcube.dataanalysis.executor.generators;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
+import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS;
+import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
+import org.gcube.dataanalysis.ecoengine.configuration.INFRASTRUCTURE; +import org.gcube.dataanalysis.ecoengine.datatypes.ServiceType; +import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType; +import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.ServiceParameters; +import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode; +import org.gcube.dataanalysis.ecoengine.interfaces.Generator; +import org.gcube.dataanalysis.ecoengine.interfaces.GenericAlgorithm; +import org.gcube.dataanalysis.executor.job.management.DistributedProcessingAgent; + +public class D4ScienceDistributedProcessingExecutor implements Generator { + + public static int maxMessagesAllowedPerJob = 20; + public static boolean forceUpload = true; + public static String defaultContainerFolder = "PARALLEL_PROCESSING"; + protected AlgorithmConfiguration config; + protected ActorNode distributedModel; + protected String mainclass; + DistributedProcessingAgent agent; + + public D4ScienceDistributedProcessingExecutor(){ + } + + public D4ScienceDistributedProcessingExecutor(AlgorithmConfiguration config) { + this.config = config; + + AnalysisLogger.setLogger(config.getConfigPath() + AlgorithmConfiguration.defaultLoggerFile); + } + + public void compute() throws Exception { + try { + agent.compute(); + distributedModel.postProcess(agent.hasResentMessages(),false); + } catch (Exception e) { + distributedModel.postProcess(false,true); + AnalysisLogger.getLogger().error("ERROR: An Error occurred ", e); + throw e; + } finally { + shutdown(); + } + } + + @Override + public List getInputParameters() { + + List distributionModelParams = new ArrayList(); + distributionModelParams.add(new ServiceType(ServiceParameters.USERNAME,"ServiceUserName","The final user Name")); + + return distributionModelParams; + } + + + @Override + public String getResources() { + return agent.getResources(); + } + + @Override + public float getStatus() { + return agent.getStatus(); + } + + @Override + public StatisticalType getOutput() { + return distributedModel.getOutput(); + } + + @Override + public ALG_PROPS[] getSupportedAlgorithms() { + ALG_PROPS[] p = { ALG_PROPS.PHENOMENON_VS_PARALLEL_PHENOMENON}; + return p; + } + + @Override + public INFRASTRUCTURE getInfrastructure() { + return INFRASTRUCTURE.D4SCIENCE; + } + + @Override + public void init() throws Exception { + + Properties p = AlgorithmConfiguration.getProperties(config.getConfigPath() + AlgorithmConfiguration.nodeAlgorithmsFile); + String model = config.getModel(); + String algorithm = null; + if ((model!=null) && (model.length()>0)) + algorithm = model; + else + algorithm=config.getAgent(); + + mainclass = p.getProperty(algorithm); + distributedModel = (ActorNode) Class.forName(mainclass).newInstance(); + distributedModel.setup(config); + String scope = config.getGcubeScope(); + AnalysisLogger.getLogger().info("Using the following scope for the computation:"+scope); + String owner = config.getParam("ServiceUserName"); + int leftNum = distributedModel.getNumberOfLeftElements(); + int rightNum = distributedModel.getNumberOfRightElements(); + agent = new DistributedProcessingAgent(config, scope, owner, mainclass, config.getPersistencePath(), algorithm, defaultContainerFolder, maxMessagesAllowedPerJob, forceUpload, leftNum, rightNum,config.getTaskID()); + agent.setLogger(AnalysisLogger.getLogger()); + } + + @Override + public void setConfiguration(AlgorithmConfiguration config) { + this.config = config; + AnalysisLogger.setLogger(config.getConfigPath() + 
AlgorithmConfiguration.defaultLoggerFile); + } + + @Override + public void shutdown() { + try { + agent.shutdown(); + } catch (Exception e) { + } + try { + distributedModel.stop(); + } catch (Exception e) { + } + } + + @Override + public String getLoad() { + return agent.getLoad(); + } + + @Override + public String getResourceLoad() { + return agent.getResourceLoad(); + } + + + @Override + public GenericAlgorithm getAlgorithm() { + return distributedModel; + } + + @Override + public String getDescription() { + return "A D4Science Cloud Processor for Species Distributions"; + } + +} diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgentWPS.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgentWPS.java new file mode 100644 index 0000000..14b5e68 --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgentWPS.java @@ -0,0 +1,229 @@ +package org.gcube.dataanalysis.executor.job.management; + +import java.io.File; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.log4j.Logger; +import org.gcube.contentmanagement.graphtools.utils.HttpRequest; +import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; +import org.gcube.contentmanagement.lexicalmatcher.utils.FileTools; +import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS; +import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; +import org.gcube.dataanalysis.ecoengine.configuration.INFRASTRUCTURE; +import org.gcube.dataanalysis.ecoengine.connectors.livemonitor.ResourceLoad; +import org.gcube.dataanalysis.ecoengine.connectors.livemonitor.Resources; +import org.gcube.dataanalysis.ecoengine.utils.Operations; + +import com.thoughtworks.xstream.XStream; + +public class DistributedProcessingAgentWPS { + + + protected WPSJobManager jobManager; + protected boolean deletefiles = true; + protected String mainclass; + public int maxElementsAllowedPerJob = 20; + protected boolean forceUpload = true; + protected boolean stop; + protected String gscope; + protected String userName; + protected String pathToLib; + protected String modelName; + protected String containerFolder; + protected AlgorithmConfiguration configuration; + protected int rightSetNumberOfElements; + protected int leftSetNumberOfElements; + protected List endpoints; + protected int subdivisiondiv; + protected String sessionID; + + protected static String defaultJobOutput = "execution.output"; + protected static String defaultScriptFile = "script"; + protected Logger logger; + + /** + * A distributed processing agent. Performs a distributed computation doing the MAP of the product of two sets: A and B + * Splits over B : A x B1 , A x B2, ... , A x Bn + * Prepares a script to be executed on remote nodes + * The computation is then sent to remote processors. 
+ */ + public DistributedProcessingAgentWPS(AlgorithmConfiguration configuration, + String gCubeScope, + String computationOwner, + String mainClass, + String pathToLibFolder, + String modelName, + String containerFolder, + int maxElementsPerJob, + boolean forceReUploadofLibs, + int leftSetNumberOfElements, + int rightSetNumberOfElements, + String sessionID + ) { + this.stop = false; + this.deletefiles = true; + this.gscope=gCubeScope; + this.mainclass=mainClass; + this.maxElementsAllowedPerJob=maxElementsPerJob; + this.forceUpload=forceReUploadofLibs; + this.configuration=configuration; + this.rightSetNumberOfElements=rightSetNumberOfElements; + this.leftSetNumberOfElements=leftSetNumberOfElements; + this.userName=computationOwner; + this.pathToLib=pathToLibFolder; + this.modelName=modelName; + this.containerFolder=containerFolder; + this.sessionID = sessionID; + } + + public void setLogger(Logger logger){ + this.logger=logger; + } + + + public boolean hasResentMessages(){ + return jobManager.hasResentMessages(); + } + + public void compute() throws Exception { + try { + if (logger == null){ + logger = AnalysisLogger.getLogger(); + } + if (gscope == null) + throw new Exception("Null Scope"); + AnalysisLogger.getLogger().debug("SCOPE: "+gscope); + + jobManager = new WPSJobManager(); + // we split along right dimension so if elements are less than nodes, we should reduce the number of nodes + // chunkize the number of species in order to lower the computational effort of the workers + subdivisiondiv = rightSetNumberOfElements / (maxElementsAllowedPerJob); + int rest = rightSetNumberOfElements % (maxElementsAllowedPerJob); + if (rest > 0) + subdivisiondiv++; + if (subdivisiondiv == 0) + subdivisiondiv = 1; + + executeWork(leftSetNumberOfElements, rightSetNumberOfElements, 0, subdivisiondiv, deletefiles, forceUpload); + + if (jobManager.wasAborted()) { + logger.debug("Warning: Job was aborted"); +// distributionModel.postProcess(false,true); + throw new Exception("Job System Error"); + } + else{ + //postprocess +// distributionModel.postProcess(jobManager.hasResentMessages(),false); + } + + + } catch (Exception e) { + logger.error("ERROR: An Error occurred ", e); + e.printStackTrace(); + throw e; + } finally { + shutdown(); + } + } + + private void executeWork(int leftNum, int rightNum, int offset, int numberOfResources, boolean deletefiles, boolean forceUpload) throws Exception { + + int[] chunkSizes = Operations.takeChunks(rightNum, numberOfResources); + List arguments = new ArrayList(); + + // chunkize respect to the cells: take a chunk of cells vs all species at each node! 
+ for (int i = 0; i < chunkSizes.length; i++) { + String argumentString = "0 " + leftNum + " " + offset + " " + chunkSizes[i]; + arguments.add(argumentString); + offset += chunkSizes[i]; + logger.debug("Generator-> Argument " + i + ": " + argumentString); + } + + jobManager.uploadAndExecuteChunkized(configuration,mainclass,arguments,sessionID); + + } + + public String getResources() { + Resources res = new Resources(); + try { + int activeNodes = jobManager.getActiveNodes(); + for (int i = 0; i < activeNodes; i++) { + try { + res.addResource("Worker_" + (i + 1), 100); + } catch (Exception e1) { + } + } + } catch (Exception e) { + AnalysisLogger.getLogger().debug("D4ScienceGenerator->active nodes not ready"); + } + if ((res != null) && (res.list != null)) + return HttpRequest.toJSon(res.list).replace("resId", "resID"); + else + return ""; + } + + public float getStatus() { + try { + if (stop) + return 100f; + else + if (jobManager!=null) + return Math.max(0.5f, jobManager.getStatus() * 100f); + else + return 0; + } catch (Exception e) { + return 0f; + } + } + + public ALG_PROPS[] getSupportedAlgorithms() { + ALG_PROPS[] p = { ALG_PROPS.PHENOMENON_VS_PARALLEL_PHENOMENON}; + return p; + } + + public INFRASTRUCTURE getInfrastructure() { + return INFRASTRUCTURE.D4SCIENCE; + } + + public void shutdown() { + + try { + jobManager.stop(); + } catch (Exception e) { + } + stop = true; + } + + public String getLoad() { + long tk = System.currentTimeMillis(); + ResourceLoad rs = null; + if (jobManager!=null) + rs = new ResourceLoad(tk, 1*subdivisiondiv); + else + rs = new ResourceLoad(tk, 0); + return rs.toString(); + } + + private long lastTime; + private int lastProcessed; + public String getResourceLoad() { + long thisTime = System.currentTimeMillis(); + int processedRecords = 0; + if ((jobManager!=null) && (subdivisiondiv>0)) + processedRecords = 1*subdivisiondiv; + + int estimatedProcessedRecords = 0; + if (processedRecords == lastProcessed) { + estimatedProcessedRecords = Math.round(((float) thisTime * (float) lastProcessed) / (float) lastTime); + } else { + lastProcessed = processedRecords; + estimatedProcessedRecords = lastProcessed; + } + lastTime = thisTime; + ResourceLoad rs = new ResourceLoad(thisTime, estimatedProcessedRecords); + return rs.toString(); + } + +} diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java index 4af05de..3aafa27 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java +++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java @@ -9,6 +9,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; +import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.UUID; @@ -37,6 +38,7 @@ public class GenericWorker extends StandardLocalExternalAlgorithm{ public static String OutputParameter = "Process_Outcome"; public static String TASK_SUCCESS = "TASK_SUCCESS"; public static String TASK_FAILURE = "TASK_FAILURE"; + public static String TASK_UNDEFINED = "TASK_UNDEFINED"; private static void inputStreamToFile(InputStream is, String path) throws FileNotFoundException, IOException { FileOutputStream out = new FileOutputStream(new File(path)); @@ -65,9 +67,9 @@ public class GenericWorker extends StandardLocalExternalAlgorithm{ File tempDir = null ; try { Handler.activateProtocol(); - 
String locDir = session;
+
 			if (session == null)
-				locDir = ("" + UUID.randomUUID()).replace("-", "");
+
 			// invoke the algorithm
 			logger.debug("GenericWorker-> Creating algorithm " + algorithmClass);
@@ -91,7 +93,7 @@ public class GenericWorker extends StandardLocalExternalAlgorithm{
 			System.setOut(ps);
 			System.setErr(ps);
 
-			node.executeNode(rightStartIndex, numberOfRightElementsToProcess, leftStartIndex, numberOfLeftElementsToProcess, isduplicate,
+			node.executeNode(leftStartIndex, numberOfLeftElementsToProcess, rightStartIndex, numberOfRightElementsToProcess, isduplicate,
 					config.getConfigPath(), nodeConfigurationFileObject.getName(), "log.txt");
 
 			String log = new String(baos.toByteArray(), StandardCharsets.UTF_8);
@@ -103,8 +105,14 @@ public class GenericWorker extends StandardLocalExternalAlgorithm{
 			logger.debug("GenericWorker-> deleted sandbox file: "+del );
 			logger.debug("GenericWorker-> all done");
 
-			if (log.contains("Exception:") && log.contains("Caused by:")){
+			if (log.contains("Exception:")){
 				outputParameters.put(OutputParameter, TASK_FAILURE);
+				String cutLog = URLEncoder.encode(log, "UTF-8");
+				int maxlen = 20240;
+				if (log.length()>maxlen)
+					cutLog = cutLog.substring(0,maxlen)+"...";
+
+				outputParameters.put("Log", cutLog);
 				logger.debug("GenericWorker-> Failure!");
 			}
 			else
@@ -112,6 +120,7 @@ public class GenericWorker extends StandardLocalExternalAlgorithm{
 				logger.debug("GenericWorker-> Success!");
 		} catch (Throwable e) {
 			outputParameters.put(OutputParameter, TASK_FAILURE);
+			outputParameters.put("Log", e.getLocalizedMessage());
 			e.printStackTrace();
 			logger.debug("GenericWorker-> ERROR: " + e.getLocalizedMessage());
 			status = 100f;
@@ -134,6 +143,9 @@ public class GenericWorker extends StandardLocalExternalAlgorithm{
 					logger.debug("GenericWorker-> deleting temporary directory");
 					tempDir.delete();
 				}
+				if (nodeConfigurationFileObject!=null && nodeConfigurationFileObject.exists())
+					nodeConfigurationFileObject.delete();
+
 			}catch(Exception e3){
 				e3.printStackTrace();
 				logger.debug("GenericWorker-> Error deleting files");
@@ -157,16 +169,16 @@ public class GenericWorker extends StandardLocalExternalAlgorithm{
 	protected void process() throws Exception {
 
 		AnalysisLogger.getLogger().debug("Parameters: "+config.getGeneralProperties());
-		String algorithmClass = config.getParam("AlgorithmClass");
+		String algorithmClass = config.getParam(AlgorithmClassParameter);
 
-		int rightStartIndex = Integer.parseInt(config.getParam("RightSetStartIndex"));
-		int numberOfRightElementsToProcess =Integer.parseInt(config.getParam("NumberOfRightElementsToProcess"));
-		int leftStartIndex =Integer.parseInt(config.getParam("LeftSetStartIndex"));
-		int numberOfLeftElementsToProcess =Integer.parseInt(config.getParam("NumberOfLeftElementsToProcess"));
-		boolean isduplicate=Boolean.parseBoolean(config.getParam("IsDuplicate"));
-		String session=config.getParam("Session");
-		File nodeConfigurationFileObject=new File (config.getParam("ConfigurationFile"));
-		boolean deleteFiles= Boolean.parseBoolean(config.getParam("DeleteTemporaryFiles"));
+		int rightStartIndex = Integer.parseInt(config.getParam(RightSetStartIndexParameter));
+		int numberOfRightElementsToProcess =Integer.parseInt(config.getParam(NumberOfRightElementsToProcessParameter));
+		int leftStartIndex =Integer.parseInt(config.getParam(LeftSetStartIndexParameter));
+		int numberOfLeftElementsToProcess =Integer.parseInt(config.getParam(NumberOfLeftElementsToProcessParameter));
+		boolean isduplicate=Boolean.parseBoolean(config.getParam(IsDuplicateParameter));
+		String session=config.getParam(SessionParameter);
+		File nodeConfigurationFileObject=new File (config.getParam(ConfigurationFileParameter));
+		boolean deleteFiles= Boolean.parseBoolean(config.getParam(DeleteTemporaryFilesParameter));
 
 		AnalysisLogger.getLogger().debug("Executing the algorithm");
 		executeAlgorithm(algorithmClass, rightStartIndex, numberOfRightElementsToProcess, leftStartIndex, numberOfLeftElementsToProcess, isduplicate, session, nodeConfigurationFileObject, deleteFiles);
@@ -177,15 +189,15 @@ public class GenericWorker extends StandardLocalExternalAlgorithm{
 
 	@Override
 	protected void setInputParameters() {
-		addStringInput("AlgorithmClass", "The full class path of the algorithm", "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer");
-		addIntegerInput("RightSetStartIndex", "The start index of the right set in a cartesian product of the input", "1");
-		addIntegerInput("NumberOfRightElementsToProcess", "The number of elements to process in the right set", "1");
-		addIntegerInput("LeftSetStartIndex", "The start index of the left set in a cartesian product of the input", "1");
-		addIntegerInput("NumberOfLeftElementsToProcess", "The number of elements to process in the left set", "1");
-		addBooleanInput("IsDuplicate", "Indicate if this sub computation is a duplicate of another sub-computation", "false");
-		addStringInput("Session", "The session this sub-computation belongs to", "123456");
-		addFileInput("ConfigurationFile", "A configuration file for the algorithm in an XML serialisation format for the AlgorithmConfiguration Object", "config.dat");
-		addBooleanInput("DeleteTemporaryFiles","Delete local temporary files after the computation","true");
+		addStringInput(AlgorithmClassParameter, "The full class path of the algorithm", "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer");
+		addIntegerInput(RightSetStartIndexParameter, "The start index of the right set in a cartesian product of the input", "1");
+		addIntegerInput(NumberOfRightElementsToProcessParameter, "The number of elements to process in the right set", "1");
+		addIntegerInput(LeftSetStartIndexParameter, "The start index of the left set in a cartesian product of the input", "1");
+		addIntegerInput(NumberOfLeftElementsToProcessParameter, "The number of elements to process in the left set", "1");
+		addBooleanInput(IsDuplicateParameter, "Indicate if this sub computation is a duplicate of another sub-computation", "false");
+		addStringInput(SessionParameter, "The session this sub-computation belongs to", "123456");
+		addFileInput(ConfigurationFileParameter, "A configuration file for the algorithm in an XML serialisation format for the AlgorithmConfiguration Object", "config.dat");
+		addBooleanInput(DeleteTemporaryFilesParameter,"Delete local temporary files after the computation","true");
 	}
 
 	@Override
diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorkerCaller.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorkerCaller.java
new file mode 100644
index 0000000..8ebb3ad
--- /dev/null
+++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorkerCaller.java
@@ -0,0 +1,46 @@
+package org.gcube.dataanalysis.executor.job.management;
+
+import java.io.File;
+
+import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
+import org.gcube.contentmanagement.lexicalmatcher.utils.FileTools;
+import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
+
+import com.thoughtworks.xstream.XStream;
+public class GenericWorkerCaller {
+
+
+	public static String getGenericWorkerCall(String algorithm, String session, AlgorithmConfiguration configuration,int leftSetIndex,int rightSetIndex,int leftElements,int rightElements, boolean isduplicate,boolean deleteTemporaryFiles) throws Exception{
+		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+		String xmlconfig = new XStream().toXML(configuration);
+		xmlconfig = xmlconfig.replace("\n", "").replace("\t", "");
+		xmlconfig = xmlconfig.replaceAll(">[ ]+<", "> <");
+
+		AnalysisLogger.getLogger().debug("CONFIG of Task:");
+		AnalysisLogger.getLogger().debug("algorithm: "+algorithm);
+		AnalysisLogger.getLogger().debug("leftSetIndex: "+leftSetIndex);
+		AnalysisLogger.getLogger().debug("leftElements: "+leftElements);
+		AnalysisLogger.getLogger().debug("rightSetIndex: "+rightSetIndex);
+		AnalysisLogger.getLogger().debug("rightElements: "+rightElements);
+		AnalysisLogger.getLogger().debug("session: "+session);
+		AnalysisLogger.getLogger().debug("isduplicate: "+isduplicate);
+		AnalysisLogger.getLogger().debug("deleteTemporaryFiles: "+deleteTemporaryFiles);
+
+		File is = new File(classLoader.getResource("WPSGWTemplate.xml").getFile());
+		String call=FileTools.loadString(is.getAbsolutePath(), "UTF-8");
+		AnalysisLogger.getLogger().debug("call template : "+call);
+		call = call.replace("#"+GenericWorker.AlgorithmClassParameter+"#", algorithm);
+		call = call.replace("#"+GenericWorker.LeftSetStartIndexParameter+"#", ""+leftSetIndex);
+		call = call.replace("#"+GenericWorker.NumberOfLeftElementsToProcessParameter+"#", ""+leftElements);
+		call = call.replace("#"+GenericWorker.RightSetStartIndexParameter+"#", ""+rightSetIndex);
+		call = call.replace("#"+GenericWorker.NumberOfRightElementsToProcessParameter+"#", ""+rightElements);
+		call = call.replace("#"+GenericWorker.SessionParameter+"#", session);
+		call = call.replace("#"+GenericWorker.IsDuplicateParameter+"#", ""+isduplicate);
+		call = call.replace("#"+GenericWorker.DeleteTemporaryFilesParameter+"#", ""+xmlconfig == null ? "" : ""+deleteTemporaryFiles);
+
+		return call;
+
+	}
+}
diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java
index 5f44e0a..93eaa97 100644
--- a/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java
+++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java
@@ -444,7 +444,7 @@ public class QueueJobManager {
 //			AnalysisLogger.getLogger().info("Contacting node " + (order + 1) + " OK on " + selectedEPR);
 			*/
-		ScopeProvider.instance.set(scope);
+//		ScopeProvider.instance.set(scope);
 
 		ExecutorPlugin runExecutorPlugin = new ExecutorPlugin();
 		SmartExecutorPluginQuery runQuery = new SmartExecutorPluginQuery(runExecutorPlugin);
@@ -458,7 +458,9 @@ public class QueueJobManager {
 		SpecificEndpointDiscoveryFilter sedf = new SpecificEndpointDiscoveryFilter(selectedEPR);
 		runQuery.setEndpointDiscoveryFilter(sedf);
 		SmartExecutorProxy proxy = new ProxyBuilderImpl(runExecutorPlugin, runQuery).build();
-		AnalysisLogger.getLogger().debug("Launching Smart Executor in namely Scope: "+scope+" real scope "+ScopeProvider.instance.get());
+
+		//AnalysisLogger.getLogger().debug("Launching Smart Executor in namely Scope: "+scope+" real scope "+ScopeProvider.instance.get());
+		AnalysisLogger.getLogger().debug("Launching
Smart Executor in namely Scope: "+scope); LaunchParameter launchParameter = new LaunchParameter(pluginName, inputs); String excecutionIdentifier = proxy.launch(launchParameter); tasksProxies.add(new WorkerWatcher(proxy, excecutionIdentifier, AnalysisLogger.getLogger())); diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/RemoteJobManager.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/RemoteJobManager.java deleted file mode 100644 index b25067d..0000000 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/RemoteJobManager.java +++ /dev/null @@ -1,311 +0,0 @@ -package org.gcube.dataanalysis.executor.job.management; - -import java.io.File; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import javax.xml.ws.EndpointReference; - -import org.gcube.common.clients.ProxyBuilderImpl; -import org.gcube.common.scope.api.ScopeProvider; -import org.gcube.contentmanagement.blobstorage.service.IClient; -import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; -import org.gcube.contentmanager.storageclient.wrapper.AccessType; -import org.gcube.contentmanager.storageclient.wrapper.StorageClient; -import org.gcube.dataanalysis.executor.scripts.ScriptIOWorker; -import org.gcube.vremanagement.executor.api.SmartExecutor; -import org.gcube.vremanagement.executor.api.types.LaunchParameter; -import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin; -import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPluginQuery; -import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter; -import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy; -import org.gcube.vremanagement.executor.plugin.PluginState; - -public class RemoteJobManager { - - // TODO Chek here: - //private static String pluginName = "ExecutorScript"; - private static String pluginName = "SmartGenericWorker"; - - private int actualNumberOfNodes; - private List eprs; - float status; - boolean abort; - boolean shutdown; - protected int activeNodes; - String scope; - - public int getActiveNodes() { - return activeNodes; - } - - public float getStatus() { - return status; - } - - public int getNumberOfNodes() { - return actualNumberOfNodes; - } - - public void setNumberOfNodes(int newNumberOfNodes) { - actualNumberOfNodes = newNumberOfNodes; - } - - public void init(String scope, int numberOfNodes) throws Exception { - this.scope = scope; - AnalysisLogger.getLogger().debug("Using the following scope for this computation: "+ scope); - shutdown = false; - yetuploaded = false; - if (eprs == null) - actualNumberOfNodes = findNodes(scope); - else - actualNumberOfNodes = eprs.size(); - - if (numberOfNodes < actualNumberOfNodes) - actualNumberOfNodes = numberOfNodes; - - } - - public RemoteJobManager(String scope, int numberOfNodes) throws Exception { - init(scope, numberOfNodes); - } - - public RemoteJobManager(String scope, int numberOfNodes, List eprs) throws Exception { - this.eprs = eprs; - init(scope, numberOfNodes); - } - - List filenames; - List fileurls; - boolean yetuploaded; - String session; - - @SuppressWarnings("unchecked") - public boolean uploadAndExecute(String serviceClass, String serviceName, String owner, String localDir, String remoteDir, String outputDir, String script, List arguments, boolean deletefiles) throws Exception { - boolean executeAll = false; - long t0 = System.currentTimeMillis(); - //if not yet 
uploaded , upload required files - if (!yetuploaded) { - ScopeProvider.instance.set(scope); - IClient client = new StorageClient(serviceClass, serviceName, owner, AccessType.SHARED).getClient(); -// IClient client = new StorageClient(serviceClass, serviceName, owner, AccessType.SHARED, gscope).getClient(); - File dir = new File(localDir); - File[] files = dir.listFiles(); - AnalysisLogger.getLogger().debug("Start uploading"); - filenames = new ArrayList(); - fileurls = new ArrayList(); - for (File sfile : files) { - String localf = sfile.getAbsolutePath(); - String filename = sfile.getName(); - String remotef = remoteDir + sfile.getName(); - client.put(true).LFile(localf).RFile(remotef); - String url = client.getUrl().RFile(remotef); - AnalysisLogger.getLogger().debug("URL created: " + url); - filenames.add(filename); - fileurls.add(url); - } - AnalysisLogger.getLogger().debug("Upload end"); - yetuploaded = true; - session = (""+UUID.randomUUID()).replace("-", ""); - } - - //if the number of available nodes is higher than zero launch the tasks - if (actualNumberOfNodes > 0) { - - AnalysisLogger.getLogger().debug("Executing script on " + actualNumberOfNodes + " nodes"); - int len = arguments.size(); - List tasksProxies = new ArrayList(); - activeNodes = 0; - //launch the tasks - for (int i = 0; i < actualNumberOfNodes; i++) { - String argum = ""; - //supply the arguments if they are available - if (i < len) - argum = arguments.get(i); - //generate the input map according to the arguments - Map inputs = generateInput(filenames, fileurls, outputDir, script, argum, i, scope, serviceClass, serviceName, owner, remoteDir,session,deletefiles); - AnalysisLogger.getLogger().debug("-> Owner: " + owner + " ServiceClass: " + serviceClass + " ServiceName:" + serviceName + " remoteDir:" + remoteDir); - - - //take the i-th endpoint of the executor - String selectedEPR = eprs.get(i); - AnalysisLogger.getLogger().debug("Launching node " + (i + 1) + " on " + selectedEPR); - //run the executor script - - /* - ExecutorCall call = new ExecutorCall(pluginName, gscope); - call.setEndpointReference(selectedEPR); - TaskCall task = null; - task = call.launch(inputs); - TaskProxy proxy = task.getProxy(); - */ - - ExecutorPlugin runExecutorPlugin = new ExecutorPlugin(); - SmartExecutorPluginQuery runQuery = new SmartExecutorPluginQuery(runExecutorPlugin); - - /* TODO Add key_value filter here - * Tuple[] tuples = new Tuple[n]; - * - * runQuery.addConditions(pluginName, tuples); - */ - runQuery.addConditions(pluginName); - SpecificEndpointDiscoveryFilter sedf = new SpecificEndpointDiscoveryFilter(selectedEPR); - runQuery.setEndpointDiscoveryFilter(sedf); - SmartExecutorProxy proxy = new ProxyBuilderImpl(runExecutorPlugin, runQuery).build(); - - - LaunchParameter launchParameter = new LaunchParameter(pluginName, inputs); - String excecutionIdentifier = proxy.launch(launchParameter); - - tasksProxies.add(new WorkerWatcher(proxy, excecutionIdentifier, AnalysisLogger.getLogger())); - - AnalysisLogger.getLogger().debug("Launching node " + (i + 1) + " OK on " + selectedEPR); - //add the task to the list in order to reuse it - } - - activeNodes = actualNumberOfNodes; - AnalysisLogger.getLogger().debug("Launch Finished - Controlling Status"); - int allstatus = 0; - abort = false; - //control the execution: go until there are active nodes or the process must stop - while ((activeNodes != 0) && (!abort) && (!shutdown)) { - //for each node get the task state - int nworkers = tasksProxies.size(); - int i=0; - while (i < nworkers) 
{ - WorkerWatcher proxy = tasksProxies.get(i); - - /* ---- */ - PluginState enumState = proxy.getState(); - String state = enumState.toString(); - /* ----- */ - - - AnalysisLogger.getLogger().debug("REMOTE JOB MANAGER-> STATE " + state ); - //control for aborted computation - abort = ((state == null) || state.equals("FAILED") || (!state.equals("DONE") && !state.equals("RUNNING"))); - //control for finished computation - boolean finished = false; - if (state != null) - finished = state.equals("DONE"); - //if finished update the active nodes - if (finished) { - tasksProxies.remove(i); - allstatus++; - activeNodes--; - nworkers--; - if (activeNodes == 0) - break; - } - else - i++; - - status = Math.min(((float) allstatus / (float) actualNumberOfNodes) * 100f, 95f); - if (abort) - break; - if (shutdown) - break; - // AnalysisLogger.getLogger().debug(String.format("Task " + i + "executed started at %Tc with %s state ", proxy.getStartTime(), state)); - //sleep before polling again - Thread.sleep(2000); - } - } - - activeNodes = 0; - - AnalysisLogger.getLogger().debug("All Tasks have Finished"); - if (!abort) { - AnalysisLogger.getLogger().debug("All Task were successful"); - /* - * List listElements = client.showDir().RDir(remoteDir); for (StorageObject obj : listElements) { AnalysisLogger.getLogger().debug("obj stored in directory " + remoteDir + ": " + obj.getName()); } - */ - } else - AnalysisLogger.getLogger().debug("Tasks were NOT successful"); - } else - AnalysisLogger.getLogger().debug("Warning: could not execute tasks: No Nodes Available!"); - AnalysisLogger.getLogger().debug("Whole procedure done in " + (System.currentTimeMillis() - t0) + " ms"); - status = 100f; - return executeAll; - } - - public boolean wasAborted() { - return abort; - } - - public void stop() { - shutdown = true; - } - - @SuppressWarnings("unchecked") - private List getFilteredEndpoints(String scopeString){ - ScopeProvider.instance.set(scopeString); - - ExecutorPlugin executorPlugin = new ExecutorPlugin(); - SmartExecutorPluginQuery query = new SmartExecutorPluginQuery(executorPlugin); - - /* - Tuple[] tuples = new Tuple[1]; - tuples[0] = new Tuple("Version", "1.0.0-SNAPSHOT"); - query.addConditions("SmartGenericWorker", tuples); - */ - - query.addConditions(pluginName); - - /* Used to add extra filter to ServiceEndpoint discovery */ - query.setServiceEndpointQueryFilter(null); - - /* Used to add extra filter to GCore Endpoint discovery */ - query.setEndpointDiscoveryFilter(null); - - - return query.fire(); - } - - - private int findNodes(String scopeString) throws Exception { - return getFilteredEndpoints(scopeString).size(); - } - - /* - private int findNodes(String scopeString) throws Exception { - GCUBEScope scope = GCUBEScope.getScope(scopeString); - ISClient client = GHNContext.getImplementation(ISClient.class); - WSResourceQuery wsquery = client.getQuery(WSResourceQuery.class); - wsquery.addAtomicConditions(new AtomicCondition("//gc:ServiceName", "Executor")); - wsquery.addAtomicConditions(new AtomicCondition("/child::*[local-name()='Task']/name[text()='"+pluginName+"']", pluginName)); - List listdoc = client.execute(wsquery, scope); - EndpointReferenceType epr = null; - eprs = new ArrayList(); - int numberOfEP = 0; - for (RPDocument resource : listdoc) { - epr = resource.getEndpoint(); - numberOfEP++; - eprs.add(epr); - } - AnalysisLogger.getLogger().debug("Found " + numberOfEP + " endpoints"); - - return numberOfEP; - } - */ - - private Map generateInput(Object filenames, Object fileurls, String outputDir, 
String script, String argum, int i, String scope, String serviceClass, String serviceName, String owner, String remoteDir,String session,boolean deletefiles) { - Map inputs = new HashMap(); - inputs.put("FILE_NAMES", filenames); - inputs.put("FILE_URLS", fileurls); - inputs.put("OUTPUTDIR", ScriptIOWorker.toInputString(outputDir)); - inputs.put("SCRIPT", ScriptIOWorker.toInputString(script)); - inputs.put("ARGUMENTS", ScriptIOWorker.toInputString(argum)); - inputs.put("NODE_IDENTIFIER", "" + i); - inputs.put("SCOPE", ScriptIOWorker.toInputString(scope)); - inputs.put("SERVICE_CLASS", ScriptIOWorker.toInputString(serviceClass)); - inputs.put("SERVICE_NAME", ScriptIOWorker.toInputString(serviceName)); - inputs.put("OWNER", ScriptIOWorker.toInputString(owner)); - inputs.put("REMOTEDIR", ScriptIOWorker.toInputString(remoteDir)); - inputs.put("CLEAN_CACHE",""+deletefiles); -// inputs.put("SESSION", ScriptIO.toInputString(session)); - return inputs; - } -} diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/WPSJobManager.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/WPSJobManager.java new file mode 100644 index 0000000..e2749ed --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/WPSJobManager.java @@ -0,0 +1,326 @@ +package org.gcube.dataanalysis.executor.job.management; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + +import org.apache.activemq.ActiveMQConnection; +import org.gcube.common.clients.ProxyBuilderImpl; +import org.gcube.common.resources.gcore.ServiceEndpoint; +import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint; +import org.gcube.common.scope.api.ScopeProvider; +import org.gcube.contentmanagement.blobstorage.resource.StorageObject; +import org.gcube.contentmanagement.blobstorage.service.IClient; +import org.gcube.contentmanagement.graphtools.utils.HttpRequest; +import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; +import org.gcube.contentmanager.storageclient.wrapper.AccessType; +import org.gcube.contentmanager.storageclient.wrapper.MemoryType; +import org.gcube.contentmanager.storageclient.wrapper.StorageClient; +import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; +import org.gcube.dataanalysis.ecoengine.utils.Operations; +import org.gcube.dataanalysis.executor.messagequeue.ATTRIBUTE; +import org.gcube.dataanalysis.executor.messagequeue.Consumer; +import org.gcube.dataanalysis.executor.messagequeue.Producer; +import org.gcube.dataanalysis.executor.messagequeue.QCONSTANTS; +import org.gcube.dataanalysis.executor.messagequeue.QueueManager; +import org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer; +import org.gcube.dataanalysis.executor.nodes.transducers.bionym.utils.YasmeenGlobalParameters; +import org.gcube.dataanalysis.executor.scripts.ScriptIOWorker; +import org.gcube.dataanalysis.executor.util.InfraRetrieval; +import org.gcube.resources.discovery.client.api.DiscoveryClient; +import org.gcube.resources.discovery.client.queries.api.SimpleQuery; +import org.gcube.vremanagement.executor.api.SmartExecutor; 
+import org.gcube.vremanagement.executor.api.types.LaunchParameter; +import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin; +import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPluginQuery; +import org.gcube.vremanagement.executor.client.plugins.query.filter.ListEndpointDiscoveryFilter; +import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter; +import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy; + +import com.thoughtworks.xstream.XStream; + +import static org.gcube.resources.discovery.icclient.ICFactory.*; + +public class WPSJobManager { + + static final int pollingTime = 5000; + static final int maxTrialsPerThread = 3; + + + int overallFailures = 0; + int overallSuccess = 0; + int overallTasks = 0; + + + boolean stopThreads = false; + boolean hasResentMessages = false; + + final public synchronized void incrementOverallFailures() { + overallFailures++; + } + + final public synchronized void hasResentTrue() { + if (!hasResentMessages) + hasResentMessages=true; + } + + final public synchronized void incrementOverallSuccess() { + overallSuccess++; + } + + final public synchronized void stop() { + stopThreads=true; + } + + final public synchronized boolean isStopped() { + return stopThreads; + } + + public class TasksWatcher implements Runnable { + AlgorithmConfiguration configuration; + String algorithm; + String username; + String token; + String wpsHost; + int wpsPort; + int taskNumber; + String session; + public String exitstatus=GenericWorker.TASK_UNDEFINED; + int leftSetIndex; + int rightSetIndex; + int leftElements; + int rightElements; + + + public TasksWatcher(String algorithm, String username, String token, String wpsHost, int wpsPort, String session, int taskNumber, AlgorithmConfiguration configuration, int leftSetIndex, int rightSetIndex, int leftElements, int rightElements) { + this.algorithm = algorithm; + this.token = token; + this.wpsHost = wpsHost; + this.wpsPort = wpsPort; + this.taskNumber = taskNumber; + this.session = session; + this.username = username; + this.configuration = configuration; + this.leftSetIndex = leftSetIndex; + this.leftElements = leftElements; + this.rightSetIndex = rightSetIndex; + this.rightElements = rightElements; + } + + + public void callTask(boolean isduplicate){ + String url = "http://" + wpsHost + ":" + wpsPort + "/wps/WebProcessingService"; + + boolean deleteTemporaryFiles = true; + AnalysisLogger.getLogger().debug("Task Number : " + taskNumber+" GO!"); + try { + String algorithmCall = GenericWorkerCaller.getGenericWorkerCall(algorithm, session, configuration, leftSetIndex, rightSetIndex, leftElements, rightElements, isduplicate, deleteTemporaryFiles); + String result = HttpRequest.PostXmlString(url, wpsHost, wpsPort, new LinkedHashMap(), username, token, algorithmCall); +// AnalysisLogger.getLogger().debug("Result: " + result); + + boolean success = false; + boolean failure = false; + + if (result.contains(GenericWorker.TASK_SUCCESS)) + success = true; + else if (result.contains(GenericWorker.TASK_FAILURE)) + failure = true; + + String statusLocation = ""; + + while (!success && !isStopped() && (!failure) ) { //while !success and failure + if (result == null || result.contains(GenericWorker.TASK_FAILURE) || result.contains("Exception")) + failure = true; + + else if (result.contains(GenericWorker.TASK_SUCCESS)) + success = true; + else if (result.contains("Process Accepted")) { + statusLocation = 
result.substring(result.indexOf("statusLocation=") + "statusLocation=".length()); + statusLocation = statusLocation.substring(0, statusLocation.indexOf(">")); + statusLocation = statusLocation.replace("\"", ""); + statusLocation = statusLocation + "&gcube-token=" + token; +// AnalysisLogger.getLogger().debug("Status Location: " + statusLocation); + result= ""; + } else { + Thread.sleep(pollingTime); + result = HttpRequest.sendGetRequest(statusLocation, ""); +// AnalysisLogger.getLogger().debug("Result in location: " + result); + } + // request = HttpRequest.sendGetRequest(url, ""); // AnalysisLogger.getLogger().debug("Answer for task "+taskNumber+": "+request); }catch(Exception e){ AnalysisLogger.getLogger().debug("Request failure for task "+taskNumber+": "+e.getLocalizedMessage()); } if (request.contains("")) success = true; if (request.contains("")){ failure = true; incrementOverallFailures(); } try { Thread.sleep(pollingTime); } catch (InterruptedException e) { e.printStackTrace(); } } + } + + if (isStopped() && statusLocation!=null && statusLocation.length()>0){ + String wpscancel = statusLocation.replace("RetrieveResultServlet", "CancelComputationServlet"); + result = HttpRequest.sendGetRequest(wpscancel, ""); + } + + + exitstatus = GenericWorker.TASK_SUCCESS; + if (failure) + { + exitstatus = GenericWorker.TASK_FAILURE; + AnalysisLogger.getLogger().debug("Task Number "+taskNumber+" - Failure cause: " + result); + } +// AnalysisLogger.getLogger().debug("Process execution finished: " + exitstatus); + + } catch (Exception e) { + e.printStackTrace(); + AnalysisLogger.getLogger().debug(e); + AnalysisLogger.getLogger().debug("Task Number "+taskNumber+" - Process exception: " + e.getLocalizedMessage()); + exitstatus = GenericWorker.TASK_FAILURE; + + }finally{ + + } + } + @Override + public void run() { + int trials = 0; + boolean duplicate = false; + while (!exitstatus.equals(GenericWorker.TASK_SUCCESS) && trials arguments, String session) { + ExecutorService executor = null; + try{ + int numberofservices = 1; + + AnalysisLogger.getLogger().debug("Estimating the number of services"); + + List wpsservices = InfraRetrieval.retrieveService("DataMiner", configuration.getGcubeScope()); + + if (wpsservices==null || wpsservices.size()==0) + throw new Exception ("No Dataminer GCore Endpoint found in the VRE "+configuration.getGcubeScope()); + + List differentServices = new ArrayList(); + for (String service:wpsservices){ + + service = service.substring(service.indexOf("/")+2); + service = service.substring(0,service.indexOf(":")); + if (!differentServices.contains(service)) + differentServices.add(service); + + } + numberofservices = differentServices.size(); + + AnalysisLogger.getLogger().debug("WPSJobManager->Number of dataminer services "+numberofservices); + int parallelisation = numberofservices*2; + AnalysisLogger.getLogger().debug("WPSJobManager->Number of parallel processes (parallelisation) : "+parallelisation); + + List wpshosts = InfraRetrieval.retrieveAddresses("DataAnalysis",configuration.getGcubeScope(),"-----"); + + if (wpshosts==null || wpshosts.size()==0) + throw new Exception ("WPSJobManager->No Dataminer Service Endpoint found in the VRE "+configuration.getGcubeScope()); + + + String wpshost = wpshosts.get(0); + wpshost = wpshost.substring(wpshost.indexOf("/")+2); + //String wpshostAddress = wpshost.substring(0,wpshost.indexOf(":")); + String wpshostAddress = wpshost.substring(0,wpshost.indexOf("/")); + //String wpshostPort = 
wpshost.substring(wpshost.indexOf(":")+1,wpshost.indexOf("/")); + //http://dataminer1-devnext.d4science.org:80/wps/gcube/resourc + wpshost=wpshostAddress; + int wpsport = 80; + overallTasks=arguments.size(); + + executor = Executors.newFixedThreadPool(parallelisation); + int taskNumber = 0; + + AnalysisLogger.getLogger().debug("WPSJobManager->Executing algorithm class:"+algorithmClass); + + + for (String argument:arguments) { + String[] lfnlnr = argument.split(" "); + int leftOff = Integer.parseInt(lfnlnr[0]); + int leftNum = Integer.parseInt(lfnlnr[1]); + int rightOff = Integer.parseInt(lfnlnr[2]); + int rightNum = Integer.parseInt(lfnlnr[3]); + + TasksWatcher watcher = new TasksWatcher(algorithmClass, + configuration.getGcubeUserName(), + configuration.getGcubeToken(),wpshost,wpsport,session,taskNumber,configuration, leftOff, rightOff,leftNum,rightNum); + + executor.execute(watcher); + AnalysisLogger.getLogger().debug("WPSJobManager->Task number "+taskNumber+" launched!"); + taskNumber++; + } + + int njobs = overallFailures+overallSuccess; + int pnjobs =njobs; + while (njobs0.5) + if (overallFailures>0) + stop(); + njobs = overallFailures+overallSuccess; + if (pnjobsNumber of finished jobs "+njobs+" of "+overallTasks); + AnalysisLogger.getLogger().debug("WPSJobManager->Number of errors "+overallFailures+" - perc failure "+percFailure); + } + } + + AnalysisLogger.getLogger().debug("WPSJobManager->Overall computation finished"); + }catch(Exception e){ + e.printStackTrace(); + } + finally{ + if (executor!=null){ + AnalysisLogger.getLogger().debug("WPSJobManager->Shutting down the executions"); + executor.shutdown(); + } + } + + } + +} diff --git a/src/main/java/org/gcube/dataanalysis/executor/rscripts/generic/GenericRScript.java b/src/main/java/org/gcube/dataanalysis/executor/rscripts/generic/GenericRScript.java index 6183391..e788baf 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/rscripts/generic/GenericRScript.java +++ b/src/main/java/org/gcube/dataanalysis/executor/rscripts/generic/GenericRScript.java @@ -70,6 +70,8 @@ public abstract class GenericRScript extends StandardLocalExternalAlgorithm { List inputs = getInputParameters(); for (String input : inputvariables) { String value = config.getParam(input); + if (value == null) + value = ""; String defaultValue = inputs.get(i).getDefaultValue(); defaultValue = defaultValue.replace("(", "\\(").replace(")", "\\)").replace("[", "\\[").replace("]", "\\]").replace("|", "\\|").replace(".", "\\.").replace("?", "\\?").replace("*", "\\*").replace("+", "\\+").replace("{", "\\{").replace("}", "\\}"); // inputParameters.put(defaultValue, value); diff --git a/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestBiOnym.java b/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestBiOnym.java index b481db0..233f99a 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestBiOnym.java +++ b/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestBiOnym.java @@ -1,5 +1,8 @@ package org.gcube.dataanalysis.executor.tests; +import org.gcube.common.authorization.library.provider.AuthorizationProvider; +import org.gcube.common.authorization.library.provider.SecurityTokenProvider; +import org.gcube.common.scope.api.ScopeProvider; import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing; import org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer; @@ -11,7 
+14,7 @@ public class RegressionTestBiOnym {
 		// Generate
 		AlgorithmConfiguration config = new AlgorithmConfiguration();
 		config.setConfigPath("./cfg/");
-
+
 		config.setParam("DatabaseUserName","utente");
 		config.setParam("DatabasePassword","d4science");
 		config.setParam("DatabaseURL","jdbc:postgresql://statistical-manager.d.d4science.research-infrastructures.eu/testdb");
@@ -43,14 +46,15 @@ public class RegressionTestBiOnym {
 		config.setParam(BionymFlexibleWorkflowTransducer.destinationTableParam, "taxamatchoutputlocal");
 		config.setParam(BionymFlexibleWorkflowTransducer.destinationTableLableParam, "taxamatchoutputlabel");
 		//1000
-//		config.setParam(BionymFlexibleWorkflowTransducer.originTableParam, "taxamatchinput1000");
-//		config.setParam(BionymFlexibleWorkflowTransducer.rawnamesColumnParam, "rawstrings");
+		config.setParam(BionymFlexibleWorkflowTransducer.originTableParam, "taxamatchinput1000");
+		config.setParam(BionymFlexibleWorkflowTransducer.rawnamesColumnParam, "rawstrings");
 
 //		config.setParam(BionymFlexibleWorkflowTransducer.originTableParam, "taxamatchinput");
 //		config.setParam(BionymFlexibleWorkflowTransducer.rawnamesColumnParam, "rawstrings");
 		//4
-		config.setParam(BionymFlexibleWorkflowTransducer.originTableParam, "generic_id1ecb405c_980f_47a4_926a_3043d065fc7d");
-		config.setParam(BionymFlexibleWorkflowTransducer.rawnamesColumnParam, "field0");
+		//config.setParam(BionymFlexibleWorkflowTransducer.originTableParam, "generic_id1ecb405c_980f_47a4_926a_3043d065fc7d");
+		//config.setParam(BionymFlexibleWorkflowTransducer.rawnamesColumnParam, "field0");
+
 		//2
 //		config.setParam(BionymFlexibleWorkflowTransducer.originTableParam, "generic_id471e6d50_d243_4112_bc07_e22152438e5c");
 //		config.setParam(BionymFlexibleWorkflowTransducer.rawnamesColumnParam, "field0");
@@ -62,7 +66,13 @@ public class RegressionTestBiOnym {
 		config.setAgent("BIONYM");
 		config.setPersistencePath("./");
-		config.setGcubeScope( "/gcube");
+		String scope = "/gcube/devNext/NextNext";
+		config.setGcubeScope( scope );
+		ScopeProvider.instance.set(scope);
+		String authorizationToken = "cb289202-e7d6-45ee-8076-a80bc4d4be51-98187548";
+		SecurityTokenProvider.instance.set(authorizationToken);
+		config.setGcubeToken(authorizationToken);
+		config.setGcubeUserName("gianpaolo.coro");
 
 //		config.setGcubeScope( "/d4science.research-infrastructures.eu");
 
 		config.setParam("ServiceUserName", "gianpaolo.coro");
diff --git a/src/main/java/org/gcube/dataanalysis/executor/tests/TestRemoteJobLaunch.java b/src/main/java/org/gcube/dataanalysis/executor/tests/TestRemoteJobLaunch.java
deleted file mode 100644
index 1bd0ed6..0000000
--- a/src/main/java/org/gcube/dataanalysis/executor/tests/TestRemoteJobLaunch.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.gcube.dataanalysis.executor.tests;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
-import org.gcube.dataanalysis.executor.job.management.RemoteJobManager;
-
-public class TestRemoteJobLaunch {
-
-
-	public static void main(String [] args) throws Exception{
-		String scope = "/gcube";
-		String serviceClass = "TestGP";
-		String serviceName = "TestGPHome";
-		String owner = "GP";
-		String directory = "./shipping/";
-		String remotedirectory = "/shipping/";
-		String tempDir = "./";
-		String scriptName = "execute.sh";
-		int numberOfNodes = 1;
-		List argums = new ArrayList();
-		argums.add("0_178204_0_3_./");
-		AnalysisLogger.setLogger("./cfg/ALog.properties");
-		RemoteJobManager job = new
RemoteJobManager(scope,numberOfNodes); -// job.uploadAndExecute(serviceClass, serviceName, owner, directory, remotedirectory, tempDir, scriptName, argums); - } - -} diff --git a/src/main/java/org/gcube/dataanalysis/executor/tests/TestWPSJobs.java b/src/main/java/org/gcube/dataanalysis/executor/tests/TestWPSJobs.java new file mode 100644 index 0000000..1e3388a --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/executor/tests/TestWPSJobs.java @@ -0,0 +1,105 @@ +package org.gcube.dataanalysis.executor.tests; + +import java.util.ArrayList; +import java.util.List; + +import org.gcube.common.scope.api.ScopeProvider; +import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; +import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; +import org.gcube.dataanalysis.executor.job.management.GenericWorker; +import org.gcube.dataanalysis.executor.job.management.WPSJobManager; +import org.gcube.dataanalysis.executor.job.management.WPSJobManager.TasksWatcher; +import org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer; +import org.gcube.dataanalysis.executor.nodes.transducers.bionym.utils.YasmeenGlobalParameters; + +public class TestWPSJobs { + + public static AlgorithmConfiguration buildTestConfiguration(){ + AlgorithmConfiguration config = new AlgorithmConfiguration(); + config.setConfigPath("./cfg/"); + AnalysisLogger.setLogger(config.getConfigPath() + AlgorithmConfiguration.defaultLoggerFile); + config.setParam("DatabaseUserName", "utente"); + config.setParam("DatabasePassword", "d4science"); + config.setParam("DatabaseURL", "jdbc:postgresql://statistical-manager.d.d4science.research-infrastructures.eu/testdb"); + + config.setParam(YasmeenGlobalParameters.parserNameParam, YasmeenGlobalParameters.BuiltinParsers.SIMPLE.name()); + config.setParam(YasmeenGlobalParameters.taxaAuthorityFileParam, YasmeenGlobalParameters.BuiltinDataSources.WORMS_PISCES.name()); + config.setParam(YasmeenGlobalParameters.activatePreParsingProcessing, "true"); + config.setParam(YasmeenGlobalParameters.useStemmedGenusAndSpecies, "false"); + + config.setParam(BionymFlexibleWorkflowTransducer.matcherParamPrefix + "_" + 1, YasmeenGlobalParameters.BuiltinMatchers.GSAy.name()); + config.setParam(BionymFlexibleWorkflowTransducer.thresholdParamPrefix + "_" + 1, "0.6"); + config.setParam(BionymFlexibleWorkflowTransducer.maxresultsParamPrefix + "_" + 1, "10"); + + config.setParam(BionymFlexibleWorkflowTransducer.matcherParamPrefix + "_" + 2, YasmeenGlobalParameters.BuiltinMatchers.FUZZYMATCH.name()); + config.setParam(BionymFlexibleWorkflowTransducer.thresholdParamPrefix + "_" + 2, "0.6"); + config.setParam(BionymFlexibleWorkflowTransducer.maxresultsParamPrefix + "_" + 2, "10"); + + config.setParam(BionymFlexibleWorkflowTransducer.matcherParamPrefix + "_" + 3, YasmeenGlobalParameters.BuiltinMatchers.LEVENSHTEIN.name()); + config.setParam(BionymFlexibleWorkflowTransducer.thresholdParamPrefix + "_" + 3, "0.4"); + config.setParam(BionymFlexibleWorkflowTransducer.maxresultsParamPrefix + "_" + 3, "10"); + + config.setParam(BionymFlexibleWorkflowTransducer.matcherParamPrefix + "_" + 4, YasmeenGlobalParameters.BuiltinMatchers.TRIGRAM.name()); + config.setParam(BionymFlexibleWorkflowTransducer.thresholdParamPrefix + "_" + 4, "0.4"); + config.setParam(BionymFlexibleWorkflowTransducer.maxresultsParamPrefix + "_" + 4, "10"); + + config.setParam(BionymFlexibleWorkflowTransducer.destinationTableParam, "taxamatchoutputlocal"); + 
config.setParam(BionymFlexibleWorkflowTransducer.destinationTableLableParam, "taxamatchoutputlabel"); + + // 4 + //config.setParam(BionymFlexibleWorkflowTransducer.originTableParam, "generic_id1ecb405c_980f_47a4_926a_3043d065fc7d"); + //config.setParam(BionymFlexibleWorkflowTransducer.rawnamesColumnParam, "field0"); + + config.setParam(BionymFlexibleWorkflowTransducer.originTableParam, "taxamatchinput1000"); + config.setParam(BionymFlexibleWorkflowTransducer.rawnamesColumnParam, "rawstrings"); + + config.setAgent("BIONYM"); + config.setPersistencePath("./"); + config.setGcubeScope("/gcube/devNext/NextNext"); +// config.setGcubeScope("/gcube/devsec/devVRE"); + config.setParam("ServiceUserName", "gianpaolo.coro"); + config.setParam("DatabaseDriver", "org.postgresql.Driver"); + config.setGcubeUserName("gianpaolo.coro"); + config.setGcubeToken("cb289202-e7d6-45ee-8076-a80bc4d4be51-98187548"); + + return config; + } + + public static void main1(String[] args) throws Exception { + + String host = "dataminer1-devnext.d4science.org"; + String session = "12345"; + int port = 80; + String algorithm = "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer"; + AlgorithmConfiguration config = buildTestConfiguration(); + + WPSJobManager manager = new WPSJobManager(); + TasksWatcher taskWatcher = manager.new TasksWatcher(algorithm, config.getGcubeUserName(), config.getGcubeToken(), host, port, session, 1, config, 1, 1, 1, 1); + Thread t = new Thread(taskWatcher); + t.start(); + + while (taskWatcher.exitstatus.equals(GenericWorker.TASK_UNDEFINED)){ + Thread.sleep(1000); + System.out.print("."); + } + + AnalysisLogger.getLogger().debug("Task 1 terminated with output "+taskWatcher.exitstatus ); + //taskWatcher.run(); + } + + public static void main(String[] args) throws Exception { + AlgorithmConfiguration config = buildTestConfiguration(); + String algorithm = "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer"; + ScopeProvider.instance.set(config.getGcubeScope()); + WPSJobManager jobmanager = new WPSJobManager(); + int nArguments = 100; + List arguments = new ArrayList(); + for (int i=1;i<=nArguments;i++){ + String argument = "1 1 "+i+" 1"; + arguments.add(argument); + } + String sessionID ="1234"; + jobmanager.uploadAndExecuteChunkized(config, algorithm, arguments,sessionID); + + } +} diff --git a/src/main/java/org/gcube/dataanalysis/executor/util/DataTransferer.java b/src/main/java/org/gcube/dataanalysis/executor/util/DataTransferer.java index 69235d7..0ee6153 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/util/DataTransferer.java +++ b/src/main/java/org/gcube/dataanalysis/executor/util/DataTransferer.java @@ -29,7 +29,7 @@ public class DataTransferer { //String scope = "/d4science.research-infrastructures.eu/gCubeApps"; String scope = "/gcube/devsec/devVRE"; - ScopeProvider.instance.set(scope); + //test only ScopeProvider.instance.set(scope); //String transferGHN = "dewn04.madgik.di.uoa.gr"; String transferGHN = "access.d4science.org"; int transferPort = 8080; @@ -71,7 +71,7 @@ public class DataTransferer { // returns the number of transferred bytes public static boolean transferFileToService(String scope, String username, String service, int port, String fileAbsolutePath, String remoteFolder) throws Exception { AnalysisLogger.getLogger().debug("Transferring file " + fileAbsolutePath + " to " + service + ":" + port); - ScopeProvider.instance.set(scope); +// ScopeProvider.instance.set(scope); AgentLibrary library = 
transferAgent().at(service, port).build(); ArrayList input = new ArrayList(); @@ -82,7 +82,7 @@ public class DataTransferer { String localfolder = localFile.getParent(); String file = localFile.getName(); AnalysisLogger.getLogger().debug("Uploading file " + file + " onto storage"); - ScopeProvider.instance.set(scope); +// ScopeProvider.instance.set(scope); AnalysisLogger.getLogger().info("Loading file on scope: " + scope); String storagesmpurl = StorageUtils.uploadFilesOnStorage(scope, username, localfolder, "/",file,true); diff --git a/src/main/java/org/gcube/dataanalysis/executor/util/InfraRetrieval.java b/src/main/java/org/gcube/dataanalysis/executor/util/InfraRetrieval.java index f92afb5..ad7c9b5 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/util/InfraRetrieval.java +++ b/src/main/java/org/gcube/dataanalysis/executor/util/InfraRetrieval.java @@ -17,7 +17,7 @@ public class InfraRetrieval { return new ArrayList(); // AnalysisLogger.getLogger().debug("RetrieveAddressesFromInfra->Setting Scope to " + scope+" and executing query"); - ScopeProvider.instance.set(scope); +// ScopeProvider.instance.set(scope); SimpleQuery query = ICFactory.queryFor(ServiceEndpoint.class); query.addCondition("$resource/Profile/Category/text() eq '" + Category + "'").addCondition("$resource/Profile[Name[not(contains(., '" + exclude + "'))]]").setResult("$resource/Profile/AccessPoint/Interface/Endpoint/text()"); @@ -31,7 +31,7 @@ public class InfraRetrieval { if (scope == null || scope.length() == 0) return new ArrayList(); - ScopeProvider.instance.set(scope); +// ScopeProvider.instance.set(scope); SimpleQuery query = ICFactory.queryFor(ServiceEndpoint.class); query.addCondition("$resource/Profile/Category/text() eq '" + Category + "'").addCondition("$resource/Profile/Name/text() eq '" + Name+ "'").addCondition("$resource/Profile[Name[not(contains(., '" + exclude + "'))]]").setResult("$resource/Profile/AccessPoint/Interface/Endpoint/text()"); @@ -45,7 +45,7 @@ public class InfraRetrieval { if (scope == null || scope.length() == 0) return new ArrayList(); - ScopeProvider.instance.set(scope); +// ScopeProvider.instance.set(scope); SimpleQuery query = ICFactory.queryFor(GCoreEndpoint.class); query.addCondition("$resource/Profile/ServiceName/text() eq '"+service+"'").setResult("$resource/Profile/AccessPoint/RunningInstanceInterfaces/Endpoint/text()"); diff --git a/src/main/java/org/gcube/dataanalysis/executor/util/RScriptsManager.java b/src/main/java/org/gcube/dataanalysis/executor/util/RScriptsManager.java index fb499b9..770e298 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/util/RScriptsManager.java +++ b/src/main/java/org/gcube/dataanalysis/executor/util/RScriptsManager.java @@ -13,6 +13,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.UUID; +import org.gcube.common.authorization.library.provider.SecurityTokenProvider; import org.gcube.common.scope.api.ScopeProvider; import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; @@ -114,6 +115,7 @@ public class RScriptsManager { String scope = config.getGcubeScope(); if (scope == null) scope = ScopeProvider.instance.get(); + //SecurityTokenProvider.instance.set(authorizationToken); AnalysisLogger.getLogger().debug("Current User: " + owner); AnalysisLogger.getLogger().debug("Current Scope: " + scope); diff --git a/src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java 
b/src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java index 05ce5a5..bda2e86 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java +++ b/src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java @@ -55,7 +55,7 @@ public class StorageUtils { public static String uploadFilesOnStorage(String scope, String user, String localFolder, String remoteFolder, String file, boolean httplink) throws Exception { try { - ScopeProvider.instance.set(scope); +// ScopeProvider.instance.set(scope); AnalysisLogger.getLogger().info("Loading file on scope: " + scope); IClient client = new StorageClient(AlgorithmConfiguration.StatisticalManagerClass, AlgorithmConfiguration.StatisticalManagerService, user, AccessType.SHARED, MemoryType.VOLATILE).getClient(); String remotef = remoteFolder+file.replace(" ","%20"); @@ -77,7 +77,7 @@ public class StorageUtils { public static String uploadFilesOnStorage(String scope, String user, String localFolder, String file) throws Exception { try { - ScopeProvider.instance.set(scope); +// ScopeProvider.instance.set(scope); AnalysisLogger.getLogger().info("Loading file on scope: " + scope); IClient client = new StorageClient(AlgorithmConfiguration.StatisticalManagerClass, AlgorithmConfiguration.StatisticalManagerService, user, AccessType.SHARED, MemoryType.VOLATILE).getClient(); String remotef = "/"+file; @@ -142,7 +142,7 @@ public class StorageUtils { public static void downloadFilefromStorage(String scope, String user, String localFolder, String file) throws Exception { try { - ScopeProvider.instance.set(scope); +// ScopeProvider.instance.set(scope); AnalysisLogger.getLogger().info("Retrieving file on scope: " + scope); IClient client = new StorageClient(AlgorithmConfiguration.StatisticalManagerClass, AlgorithmConfiguration.StatisticalManagerService, user, AccessType.SHARED, MemoryType.VOLATILE).getClient(); String remotef = "/"+file;
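Note on the new submission path (illustrative sketch, not part of the patch): the fragment below condenses the flow exercised by TestWPSJobs.main using the classes introduced above; the scope, token, user name and session id literals are placeholders assumed to be supplied by the caller.

    // Sketch: drive the WPS-based job manager as the new test class does (placeholder credentials).
    AlgorithmConfiguration config = new AlgorithmConfiguration();
    config.setConfigPath("./cfg/");
    config.setAgent("BIONYM");
    config.setPersistencePath("./");
    config.setGcubeScope("/gcube/devNext/NextNext");     // placeholder scope
    config.setGcubeUserName("some.user");                // placeholder user
    config.setGcubeToken("<authorization-token>");       // placeholder token
    ScopeProvider.instance.set(config.getGcubeScope());  // propagate the scope, as TestWPSJobs.main does

    // One argument string per task, mirroring the strings assembled in TestWPSJobs.main.
    List<String> arguments = new ArrayList<String>();
    for (int i = 1; i <= 10; i++)
        arguments.add("1 1 " + i + " 1");

    // Submit the whole set through the WPS job manager in chunks.
    WPSJobManager jobmanager = new WPSJobManager();
    jobmanager.uploadAndExecuteChunkized(config,
        "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer",
        arguments, "1234");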