diff --git a/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java b/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java index ad2247d..5506400 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java +++ b/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java @@ -15,7 +15,6 @@ import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.ServiceParameters; import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode; import org.gcube.dataanalysis.ecoengine.interfaces.Generator; import org.gcube.dataanalysis.ecoengine.interfaces.GenericAlgorithm; -import org.gcube.dataanalysis.executor.job.management.DistributedProcessingAgent; import org.gcube.dataanalysis.executor.job.management.DistributedProcessingAgentWPS; public class D4ScienceDistributedProcessing implements Generator { diff --git a/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessingExecutor.java b/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessingExecutor.java deleted file mode 100644 index 55e03c9..0000000 --- a/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessingExecutor.java +++ /dev/null @@ -1,149 +0,0 @@ -package org.gcube.dataanalysis.executor.generators; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; -import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS; -import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; -import org.gcube.dataanalysis.ecoengine.configuration.INFRASTRUCTURE; -import org.gcube.dataanalysis.ecoengine.datatypes.ServiceType; -import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType; -import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.ServiceParameters; -import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode; -import org.gcube.dataanalysis.ecoengine.interfaces.Generator; -import org.gcube.dataanalysis.ecoengine.interfaces.GenericAlgorithm; -import org.gcube.dataanalysis.executor.job.management.DistributedProcessingAgent; - -public class D4ScienceDistributedProcessingExecutor implements Generator { - - public static int maxMessagesAllowedPerJob = 20; - public static boolean forceUpload = true; - public static String defaultContainerFolder = "PARALLEL_PROCESSING"; - protected AlgorithmConfiguration config; - protected ActorNode distributedModel; - protected String mainclass; - DistributedProcessingAgent agent; - - public D4ScienceDistributedProcessingExecutor(){ - } - - public D4ScienceDistributedProcessingExecutor(AlgorithmConfiguration config) { - this.config = config; - - AnalysisLogger.setLogger(config.getConfigPath() + AlgorithmConfiguration.defaultLoggerFile); - } - - public void compute() throws Exception { - try { - agent.compute(); - distributedModel.postProcess(agent.hasResentMessages(),false); - } catch (Exception e) { - distributedModel.postProcess(false,true); - AnalysisLogger.getLogger().error("ERROR: An Error occurred ", e); - throw e; - } finally { - shutdown(); - } - } - - @Override - public List getInputParameters() { - - List distributionModelParams = new ArrayList(); - distributionModelParams.add(new ServiceType(ServiceParameters.USERNAME,"ServiceUserName","The final user Name")); - - return distributionModelParams; - } - - - @Override - public String getResources() { - return agent.getResources(); - } - - @Override - public float getStatus() { - return agent.getStatus(); - } - - @Override - public StatisticalType getOutput() { - return distributedModel.getOutput(); - } - - @Override - public ALG_PROPS[] getSupportedAlgorithms() { - ALG_PROPS[] p = { ALG_PROPS.PHENOMENON_VS_PARALLEL_PHENOMENON}; - return p; - } - - @Override - public INFRASTRUCTURE getInfrastructure() { - return INFRASTRUCTURE.D4SCIENCE; - } - - @Override - public void init() throws Exception { - - Properties p = AlgorithmConfiguration.getProperties(config.getConfigPath() + AlgorithmConfiguration.nodeAlgorithmsFile); - String model = config.getModel(); - String algorithm = null; - if ((model!=null) && (model.length()>0)) - algorithm = model; - else - algorithm=config.getAgent(); - - mainclass = p.getProperty(algorithm); - distributedModel = (ActorNode) Class.forName(mainclass).newInstance(); - distributedModel.setup(config); - String scope = config.getGcubeScope(); - AnalysisLogger.getLogger().info("Using the following scope for the computation:"+scope); - String owner = config.getParam("ServiceUserName"); - int leftNum = distributedModel.getNumberOfLeftElements(); - int rightNum = distributedModel.getNumberOfRightElements(); - agent = new DistributedProcessingAgent(config, scope, owner, mainclass, config.getPersistencePath(), algorithm, defaultContainerFolder, maxMessagesAllowedPerJob, forceUpload, leftNum, rightNum,config.getTaskID()); - agent.setLogger(AnalysisLogger.getLogger()); - } - - @Override - public void setConfiguration(AlgorithmConfiguration config) { - this.config = config; - AnalysisLogger.setLogger(config.getConfigPath() + AlgorithmConfiguration.defaultLoggerFile); - } - - @Override - public void shutdown() { - try { - agent.shutdown(); - } catch (Exception e) { - } - try { - distributedModel.stop(); - } catch (Exception e) { - } - } - - @Override - public String getLoad() { - return agent.getLoad(); - } - - @Override - public String getResourceLoad() { - return agent.getResourceLoad(); - } - - - @Override - public GenericAlgorithm getAlgorithm() { - return distributedModel; - } - - @Override - public String getDescription() { - return "A D4Science Cloud Processor for Species Distributions"; - } - -} diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgent.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgent.java deleted file mode 100644 index 867560a..0000000 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgent.java +++ /dev/null @@ -1,300 +0,0 @@ -package org.gcube.dataanalysis.executor.job.management; - -import java.io.File; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import org.apache.log4j.Logger; -import org.gcube.contentmanagement.graphtools.utils.HttpRequest; -import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; -import org.gcube.contentmanagement.lexicalmatcher.utils.FileTools; -import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS; -import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; -import org.gcube.dataanalysis.ecoengine.configuration.INFRASTRUCTURE; -import org.gcube.dataanalysis.ecoengine.connectors.livemonitor.ResourceLoad; -import org.gcube.dataanalysis.ecoengine.connectors.livemonitor.Resources; -import org.gcube.dataanalysis.ecoengine.utils.Operations; - -import com.thoughtworks.xstream.XStream; - -public class DistributedProcessingAgent { - - - protected QueueJobManager jobManager; - protected boolean deletefiles = true; - protected String mainclass; - public int maxElementsAllowedPerJob = 20; - protected boolean forceUpload = true; - protected boolean stop; - protected String gscope; - protected String userName; - protected String pathToLib; - protected String modelName; - protected String containerFolder; - protected Serializable configurationFile; - protected int rightSetNumberOfElements; - protected int leftSetNumberOfElements; - protected List endpoints; - protected int subdivisiondiv; - protected String sessionID; - - protected static String defaultJobOutput = "execution.output"; - protected static String defaultScriptFile = "script"; - protected Logger logger; - - /** - * A distributed processing agent. Performs a distributed computation doing the MAP of the product of two sets: A and B - * Splits over B : A x B1 , A x B2, ... , A x Bn - * Prepares a script to be executed on remote nodes - * The computation is then sent to remote processors. - */ - public DistributedProcessingAgent(Serializable configurationFile, - String gCubeScope, - String computationOwner, - String mainClass, - String pathToLibFolder, - String modelName, - String containerFolder, - int maxElementsPerJob, - boolean forceReUploadofLibs, - int leftSetNumberOfElements, - int rightSetNumberOfElements, - String sessionID - ) { - this.stop = false; - this.deletefiles = true; - this.gscope=gCubeScope; - this.mainclass=mainClass; - this.maxElementsAllowedPerJob=maxElementsPerJob; - this.forceUpload=forceReUploadofLibs; - this.configurationFile=configurationFile; - this.rightSetNumberOfElements=rightSetNumberOfElements; - this.leftSetNumberOfElements=leftSetNumberOfElements; - this.userName=computationOwner; - this.pathToLib=pathToLibFolder; - this.modelName=modelName; - this.containerFolder=containerFolder; - this.sessionID = sessionID; - } - - public void setLogger(Logger logger){ - this.logger=logger; - } - - public void setEndPoints(List endpoints){ - this.endpoints=endpoints; - } - - public boolean hasResentMessages(){ - return jobManager.hasResentMessages(); - } - - public void compute() throws Exception { - try { - if (logger == null){ - logger = AnalysisLogger.getLogger(); - } - if (gscope == null) - throw new Exception("Null Scope"); - AnalysisLogger.getLogger().debug("SCOPE: "+gscope); - if (endpoints != null) { - - /* - List eprtList = new ArrayList(); - for (String ep : endpoints) { - eprtList.add(new EndpointReferenceType(new Address(ep))); - } - - jobManager = new QueueJobManager(gscope, endpoints.size(), eprtList); - */ - jobManager = new QueueJobManager(gscope, endpoints.size(), endpoints,sessionID); - } else - jobManager = new QueueJobManager(gscope, 1,sessionID); - - int numberOfResources = jobManager.getNumberOfNodes(); - // we split along right dimension so if elements are less than nodes, we should reduce the number of nodes - if (numberOfResources > 0) { - // chunkize the number of species in order to lower the computational effort of the workers - subdivisiondiv = rightSetNumberOfElements / (numberOfResources * maxElementsAllowedPerJob); - int rest = rightSetNumberOfElements % (numberOfResources * maxElementsAllowedPerJob); - if (rest > 0) - subdivisiondiv++; - if (subdivisiondiv == 0) - subdivisiondiv = 1; - - executeWork(leftSetNumberOfElements, rightSetNumberOfElements, 0, subdivisiondiv, deletefiles, forceUpload); - - if (jobManager.wasAborted()) { - logger.debug("Warning: Job was aborted"); -// distributionModel.postProcess(false,true); - throw new Exception("Job System Error"); - } - else{ - //postprocess -// distributionModel.postProcess(jobManager.hasResentMessages(),false); - } - - } else { - logger.debug("Warning: No Workers available"); - throw new Exception("No Workers available"); - } - - } catch (Exception e) { - logger.error("ERROR: An Error occurred ", e); - e.printStackTrace(); - throw e; - } finally { - shutdown(); - } - } - - private void executeWork(int leftNum, int rightNum, int offset, int numberOfResources, boolean deletefiles, boolean forceUpload) throws Exception { - - String owner = userName; - - int[] chunkSizes = Operations.takeChunks(rightNum, numberOfResources); - List arguments = new ArrayList(); - // chunkize respect to the cells: take a chunk of cells vs all species at each node! - - for (int i = 0; i < chunkSizes.length; i++) { - String argumentString = "0 " + leftNum + " " + offset + " " + chunkSizes[i] + " ./ "+mainclass; - arguments.add(argumentString); - offset += chunkSizes[i]; - logger.debug("Generator-> Argument " + i + ": " + argumentString); - } - - if (owner == null) - throw new Exception("Null Owner"); - - String pathToDir = new File (pathToLib, containerFolder).getAbsolutePath(); - - if (!(new File(pathToDir).exists())) - throw new Exception("No Implementation of node-model found for algorithm " + pathToDir); - - if (mainclass == null) - throw new Exception("No mainClass found for algorithm " + pathToDir); - - buildScriptFile(modelName, defaultJobOutput, pathToDir, mainclass); - - jobManager.uploadAndExecuteChunkized(AlgorithmConfiguration.StatisticalManagerClass, AlgorithmConfiguration.StatisticalManagerService, owner, pathToDir, "/" + modelName + "/", "./", getScriptName(mainclass), arguments, new XStream().toXML(configurationFile), deletefiles, forceUpload); - - } - - private String getScriptName(String fullMainClass){ - String scriptName = defaultScriptFile+"_"+fullMainClass.substring(fullMainClass.lastIndexOf(".")+1)+".sh"; - return scriptName; - } - // builds a job.sh - public void buildScriptFile(String jobName, String jobOutput, String jarsPath, String fullMainClass) throws Exception { - File expectedscript = new File(jarsPath,getScriptName(fullMainClass)); - if (!expectedscript.exists()) { - StringBuffer sb = new StringBuffer(); - sb.append("#!/bin/sh\n"); - sb.append("# " + jobName + "\n"); - sb.append("cd $1\n"); - sb.append("\n"); - sb.append("java -Xmx1024M -classpath ./:"); - File jarsPathF = new File(jarsPath); - File[] files = jarsPathF.listFiles(); - - for (File jar : files) { - - if (jar.getName().endsWith(".jar")) { - sb.append("./" + jar.getName()); - sb.append(":"); - } - } - - sb.deleteCharAt(sb.length() - 1); - sb.append(" " + fullMainClass + " $2 " + jobOutput); - sb.append("\n"); - - AnalysisLogger.getLogger().trace("D4ScienceGenerator->Generating script in " + expectedscript.getAbsolutePath()); - FileTools.saveString(expectedscript.getAbsolutePath(), sb.toString(), true, "UTF-8"); - } - AnalysisLogger.getLogger().trace("D4ScienceGenerator->Script " + expectedscript.getAbsolutePath()+" yet exists!"); - } - - public String getResources() { - Resources res = new Resources(); - try { - int activeNodes = jobManager.getActiveNodes(); - for (int i = 0; i < activeNodes; i++) { - try { - res.addResource("Worker_" + (i + 1), 100); - } catch (Exception e1) { - } - } - } catch (Exception e) { - AnalysisLogger.getLogger().debug("D4ScienceGenerator->active nodes not ready"); - } - if ((res != null) && (res.list != null)) - return HttpRequest.toJSon(res.list).replace("resId", "resID"); - else - return ""; - } - - public float getStatus() { - try { - if (stop) - return 100f; - else - if (jobManager!=null) - return Math.max(0.5f, jobManager.getStatus() * 100f); - else - return 0; - } catch (Exception e) { - return 0f; - } - } - - public ALG_PROPS[] getSupportedAlgorithms() { - ALG_PROPS[] p = { ALG_PROPS.PHENOMENON_VS_PARALLEL_PHENOMENON}; - return p; - } - - public INFRASTRUCTURE getInfrastructure() { - return INFRASTRUCTURE.D4SCIENCE; - } - - public void shutdown() { - - try { - jobManager.stop(); - } catch (Exception e) { - } - stop = true; - } - - public String getLoad() { - long tk = System.currentTimeMillis(); - ResourceLoad rs = null; - if (jobManager!=null) - rs = new ResourceLoad(tk, jobManager.currentNumberOfStages*subdivisiondiv); - else - rs = new ResourceLoad(tk, 0); - return rs.toString(); - } - - private long lastTime; - private int lastProcessed; - public String getResourceLoad() { - long thisTime = System.currentTimeMillis(); - int processedRecords = 0; - if ((jobManager!=null) && (subdivisiondiv>0)) - processedRecords = jobManager.currentNumberOfStages*subdivisiondiv; - - int estimatedProcessedRecords = 0; - if (processedRecords == lastProcessed) { - estimatedProcessedRecords = Math.round(((float) thisTime * (float) lastProcessed) / (float) lastTime); - } else { - lastProcessed = processedRecords; - estimatedProcessedRecords = lastProcessed; - } - lastTime = thisTime; - ResourceLoad rs = new ResourceLoad(thisTime, estimatedProcessedRecords); - return rs.toString(); - } - -} diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java deleted file mode 100644 index 93eaa97..0000000 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java +++ /dev/null @@ -1,915 +0,0 @@ -package org.gcube.dataanalysis.executor.job.management; - -import java.io.File; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.UUID; - -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; - -import org.apache.activemq.ActiveMQConnection; -import org.gcube.common.clients.ProxyBuilderImpl; -import org.gcube.common.resources.gcore.ServiceEndpoint; -import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint; -import org.gcube.common.scope.api.ScopeProvider; -import org.gcube.contentmanagement.blobstorage.resource.StorageObject; -import org.gcube.contentmanagement.blobstorage.service.IClient; -import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; -import org.gcube.contentmanager.storageclient.wrapper.AccessType; -import org.gcube.contentmanager.storageclient.wrapper.MemoryType; -import org.gcube.contentmanager.storageclient.wrapper.StorageClient; -import org.gcube.dataanalysis.ecoengine.utils.Operations; -import org.gcube.dataanalysis.executor.messagequeue.ATTRIBUTE; -import org.gcube.dataanalysis.executor.messagequeue.Consumer; -import org.gcube.dataanalysis.executor.messagequeue.Producer; -import org.gcube.dataanalysis.executor.messagequeue.QCONSTANTS; -import org.gcube.dataanalysis.executor.messagequeue.QueueManager; -import org.gcube.dataanalysis.executor.scripts.ScriptIOWorker; -import org.gcube.resources.discovery.client.api.DiscoveryClient; -import org.gcube.resources.discovery.client.queries.api.SimpleQuery; -import org.gcube.vremanagement.executor.api.SmartExecutor; -import org.gcube.vremanagement.executor.api.types.LaunchParameter; -import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin; -import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPluginQuery; -import org.gcube.vremanagement.executor.client.plugins.query.filter.ListEndpointDiscoveryFilter; -import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter; -import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy; -import static org.gcube.resources.discovery.icclient.ICFactory.*; - -public class QueueJobManager { - - - // broadcast message period - public static int broadcastTimePeriod = 120000; - // max silence before computation stops - public static int maxSilenceTimeBeforeComputationStop = 10800000; - // max number of retries per computation step - public static int maxNumberOfComputationRetries = 1; - // period for controlling a node activity - public static int computationWatcherTimerPeriod = 120000; - // max number of message to put in a queue -// protected static int maxNumberOfMessages = 20; - public static int maxNumberOfStages = Integer.MAX_VALUE;//10; - // timeout for resending a message - public static int queueWatcherMaxwaitingTime = QCONSTANTS.refreshStatusTime;// * 5; - - protected int maxFailureTries; - private static String pluginName = "SmartGenericWorker";//"GenericWorker"; - - protected String scope; - protected String session; - - protected boolean yetstopped; - protected boolean messagesresent; - protected float status; - protected boolean abort; - protected boolean shutdown; - - protected List eprs; - protected int activeNodes; - protected int computingNodes; - protected int numberOfMessages; - protected int totalNumberOfMessages; - protected int actualNumberOfNodes; - protected int totalNumberOfStages; - public int currentNumberOfStages; - - // files management - protected List filenames; - protected List fileurls; - - // queue parameters - protected String queueName; - protected String queueResponse; - protected String queueURL; - protected String queueUSER; - protected String queuePWD; - protected org.gcube.dataanalysis.executor.messagequeue.Consumer consumer; - protected Producer producer; - - Timer broadcastTimer; - Timer computationWatcherTimer; - ComputationTimerWatcher computationWatcher; - String serviceClass; - String serviceName; - String owner; - String localDir; - String remoteDir; - String outputDir; - String script; - List arguments; - String configuration; - boolean deletefiles; - StatusListener statuslistener; - - private void resetAllVars() { - scope = null; - - yetstopped = false; - messagesresent = false; - status = 0; - abort = false; - shutdown = false; - - eprs = null; - activeNodes = 0; - computingNodes = 0; - numberOfMessages = 0; - - actualNumberOfNodes = 0; - filenames = null; - fileurls = null; - - queueName = null; - queueResponse = null; - queueURL = null; - queueUSER = null; - queuePWD = null; - consumer = null; - producer = null; - broadcastTimer = null; - computationWatcherTimer = null; - computationWatcher = null; - serviceClass = null; - serviceName = null; - owner = null; - localDir = null; - remoteDir = null; - outputDir = null; - script = null; - arguments = null; - configuration = null; - deletefiles = false; - statuslistener = null; - } - - public int getActiveNodes() { - return computingNodes; - } - - public float getStatus() { - float innerStatus = 0; - if (totalNumberOfMessages != 0) - innerStatus = (1f - ((float) numberOfMessages / (float) totalNumberOfMessages)); - if (totalNumberOfStages == 0) - return innerStatus; - else { - float offset = ((float) Math.max(currentNumberOfStages - 1, 0)) / (float) totalNumberOfStages; - float status = offset + (innerStatus / (float) totalNumberOfStages); - // AnalysisLogger.getLogger().info("stages: "+totalNumberOfStages+" inner status: "+innerStatus+" currentStage: "+currentNumberOfStages+" status: "+status); - return status; - } - } - - // there is only one node from the client point of view - public int getNumberOfNodes() { - if (eprs.size() > 0) - return 1; - else - return 0; - } - - public void setNumberOfNodes(int newNumberOfNodes) { - // ignore this setting in this case - } - - private void init(String scope, int numberOfNodes) throws Exception { - resetAllVars(); - // init scope variables - this.scope = scope; - // introduce a session - // initialize flags - shutdown = false; - yetstopped = false; - messagesresent = false; - abort = false; - // find all the nodes - initialize the eprs - findNodes(scope); - } - - public QueueJobManager(String scope, int numberOfNodes, String session) throws Exception { - init(scope, numberOfNodes); - this.session = session; - } - - public QueueJobManager(String scope, int numberOfNodes, List eprs, String session) throws Exception { - init(scope, numberOfNodes); - this.eprs = eprs; - this.session = session; - } - - private void setGlobalVars(String serviceClass, String serviceName, String owner, String localDir, String remoteDir, String outputDir, String script, List arguments, String configuration, boolean deletefiles) { - this.serviceClass = serviceClass; - this.serviceName = serviceName; - this.owner = owner; - this.localDir = localDir; - this.remoteDir = remoteDir; - this.outputDir = outputDir; - this.script = script; - this.arguments = arguments; - this.configuration = configuration; - this.deletefiles = deletefiles; - } - - private int totalmessages = 0; - - public boolean uploadAndExecuteChunkized(String serviceClass, String serviceName, String owner, String localDir, String remoteDir, String outputDir, String script, List arguments, String configuration, boolean deletefiles, boolean forceUpload) throws Exception { - long t0 = System.currentTimeMillis(); - - int elements = arguments.size(); - /*generic-worker - * int div = elements / (maxNumberOfMessages); int rest = elements % (maxNumberOfMessages); if (rest > 0) div++; if (div == 0) { div = 1; } - */ - if (session == null || session.length()==0) - session = (("" + UUID.randomUUID()).replace("-", "") + Math.random()).replace(".", ""); - int[] chunkSizes = null; - //up to 1120 species we don't make stages - if (elements>maxNumberOfStages) - chunkSizes = Operations.takeChunks(elements, maxNumberOfStages); - else { - chunkSizes = new int[1]; - chunkSizes[0]=elements; - } - int allchunks = chunkSizes.length; - totalNumberOfStages = allchunks; - currentNumberOfStages = 0; - int start = 0; - totalmessages = 0; - AnalysisLogger.getLogger().info("Starting the computation in "+allchunks+" stages"); - for (int i = 0; i < allchunks; i++) { - numberOfMessages = totalNumberOfMessages = 0; - currentNumberOfStages++; - int end = Math.min(elements, start + chunkSizes[i]); - AnalysisLogger.getLogger().info("Computing the chunk number " + (i + 1) + " of " + allchunks + " between " + start + " and " + (end - 1)); - List sublist = new ArrayList(); - for (int j = start; j < end; j++) - sublist.add(arguments.get(j)); - - AnalysisLogger.getLogger().info("size sub:" + sublist.size()); - // totalmessages=totalmessages+sublist.size(); - uploadAndExecute(serviceClass, serviceName, owner, localDir, remoteDir, outputDir, script, sublist, configuration, deletefiles, forceUpload); - if (abort) - break; - start = end; - AnalysisLogger.getLogger().info("Processed chunk number " + (i + 1)); - - } - - currentNumberOfStages = totalNumberOfStages; - AnalysisLogger.getLogger().info("Finished computation on all chunks and messages " + totalmessages); - AnalysisLogger.getLogger().info("Whole Procedure done in " + (System.currentTimeMillis() - t0) + " ms"); - return (!abort); - } - - private boolean uploadAndExecute(String serviceClass, String serviceName, String owner, String localDir, String remoteDir, String outputDir, String script, List arguments, String configuration, boolean deletefiles, boolean forceUpload) throws Exception { - int numberOfRetries = maxNumberOfComputationRetries; - boolean recompute = true; - - while ((numberOfRetries > 0) && (recompute)) { - long t0 = System.currentTimeMillis(); - // if (numberOfRetries inputs = generateInputMessage(filenames, fileurls, outputDir, script, arguments.get(k), k, scope, serviceClass, serviceName, owner, remoteDir, session, configuration, deletefiles); producer.sendMessage(inputs, 0); AnalysisLogger.getLogger().info("Sent Message " + k); } } waitForMessages(); if (numberOfMessages>0){ abort = true; } } - */ - - // deleteRemoteFolder(); - // summary - AnalysisLogger.getLogger().info("-SUMMARY-"); - for (int i = 0; i < totalNumberOfMessages; i++) { - if (activeMessages[i]) - AnalysisLogger.getLogger().info("Error : the Message Number " + i + " Was Never Processed!"); - if (resentMessages[i] > 0) { - messagesresent = true; - AnalysisLogger.getLogger().info("Warning : the Message Number " + i + " Was resent " + resentMessages[i] + " Times"); - } - } - AnalysisLogger.getLogger().info("-SUMMARY END-"); - - stop(); - AnalysisLogger.getLogger().info("Stopped"); - AnalysisLogger.getLogger().info("Single Step Procedure done in " + (System.currentTimeMillis() - t0) + " ms"); - activeNodes = 0; - numberOfRetries--; - if (abort) { - recompute = true; - if (numberOfRetries > 0) - Thread.sleep(10000); - } else - recompute = false; - } - - return (!abort); - } - - public boolean hasResentMessages() { - return messagesresent; - } - - public void waitForMessages() throws Exception { - AnalysisLogger.getLogger().info("Waiting..."); - while ((numberOfMessages > 0) && (!abort)) { - Thread.sleep(2000); - - // long tcurrent = System.currentTimeMillis(); - // if ((tcurrent - waitTime) > maxwaitingTime) { - // break; - // } - } - AnalysisLogger.getLogger().info("...Stop - Abort?" + abort); - } - - public boolean wasAborted() { - return abort; - } - - public void purgeQueues() throws Exception { - AnalysisLogger.getLogger().info("Purging Queue"); - List tasksProxies = new ArrayList(); - for (int j = 0; j < actualNumberOfNodes; j++) { - try { - contactNodes(tasksProxies, j, queueName, queueUSER, queuePWD, queueURL, queueResponse, session, "true"); - } catch (Exception e) { - e.printStackTrace(); - AnalysisLogger.getLogger().info("Error in purgin queue on node " + j); - } - } - AnalysisLogger.getLogger().info("Queue Purged"); - } - - public void stop() { - try { - if (!yetstopped) { - if (broadcastTimer != null) { - AnalysisLogger.getLogger().info("Stopping Broadcaster"); - broadcastTimer.cancel(); - broadcastTimer.purge(); - } - - if (computationWatcherTimer != null) { - AnalysisLogger.getLogger().info("Stopping Watcher"); - computationWatcherTimer.cancel(); - computationWatcherTimer.purge(); - } - - AnalysisLogger.getLogger().info("Purging Status Listener"); - - if (statuslistener != null) - statuslistener.destroyAllWatchers(); - - AnalysisLogger.getLogger().info("Stopping Producer and Consumer"); - - try{ - producer.stop(); - producer.closeSession(); - }catch(Exception e1){} - try{ - consumer.stop(); - consumer.closeSession(); - }catch(Exception e2){} - - AnalysisLogger.getLogger().info("Purging Remote Queues"); - purgeQueues(); - - yetstopped = true; - } - } catch (Exception e) { - e.printStackTrace(); - AnalysisLogger.getLogger().info("Not completely stopped"); - } - } - - @SuppressWarnings("unchecked") - private void contactNodes(List tasksProxies, int order, String queueName, String queueUSER, String queuePWD, String queueURL, String queueResponse, String session, String purgeQueue) throws Exception { - // generate the input map according to the arguments - Map inputs = generateWorkerInput(queueName, queueUSER, queuePWD, queueURL, queueResponse, session, purgeQueue); - AnalysisLogger.getLogger().info("Inputs " + inputs); - // take the i-th endpoint of the executor - String selectedEPR = eprs.get(order); - AnalysisLogger.getLogger().info("Broadcasting to node " + (order + 1) + " on " + selectedEPR); - - - /*OLD EXECUTOR CALL - // run the executor script - ExecutorCall call = new ExecutorCall(pluginName, gscope); - call.setEndpointReference(selectedEPR); - TaskCall task = null; - AnalysisLogger.getLogger().info("EPR:" + selectedEPR); - task = call.launch(inputs); -// AnalysisLogger.getLogger().info("Task EPR:" + task.getEndpointReference()); - TaskProxy proxy = task.getProxy(); - tasksProxies.add(new WorkerWatcher(proxy, AnalysisLogger.getLogger())); - // AnalysisLogger.getLogger().info("Contacting node " + (order + 1) + " OK on " + selectedEPR); - */ - -// ScopeProvider.instance.set(scope); - - ExecutorPlugin runExecutorPlugin = new ExecutorPlugin(); - SmartExecutorPluginQuery runQuery = new SmartExecutorPluginQuery(runExecutorPlugin); - - /* TODO Add key_value filter here - * Tuple[] tuples = new Tuple[n]; - * - * runQuery.addConditions(pluginName, tuples); - */ - runQuery.addConditions(pluginName); - SpecificEndpointDiscoveryFilter sedf = new SpecificEndpointDiscoveryFilter(selectedEPR); - runQuery.setEndpointDiscoveryFilter(sedf); - SmartExecutorProxy proxy = new ProxyBuilderImpl(runExecutorPlugin, runQuery).build(); - - //AnalysisLogger.getLogger().debug("Launching Smart Executor in namely Scope: "+scope+" real scope "+ScopeProvider.instance.get()); - AnalysisLogger.getLogger().debug("Launching Smart Executor in namely Scope: "+scope); - LaunchParameter launchParameter = new LaunchParameter(pluginName, inputs); - String excecutionIdentifier = proxy.launch(launchParameter); - tasksProxies.add(new WorkerWatcher(proxy, excecutionIdentifier, AnalysisLogger.getLogger())); - - AnalysisLogger.getLogger().info("Contacting node " + (order + 1) + " OK on " + selectedEPR); - - - } - - - @SuppressWarnings("unchecked") - private List getFilteredEndpoints(String scopeString){ - ScopeProvider.instance.set(scopeString); - - ExecutorPlugin executorPlugin = new ExecutorPlugin(); - SmartExecutorPluginQuery query = new SmartExecutorPluginQuery(executorPlugin); - - /* - add key_value filter here - * Tuple[] tuples = new Tuple[n]; - * - * runQuery.addConditions(pluginName, tuples); - */ - - query.addConditions(pluginName); - - /* Used to add extra filter to ServiceEndpoint discovery */ - query.setServiceEndpointQueryFilter(null); - List nodes = query.discoverEndpoints(new ListEndpointDiscoveryFilter()); - AnalysisLogger.getLogger().debug("Found the following nodes: "+nodes+" in scope "+scopeString); - return nodes; - } - - - private int findNodes(String scopeString) throws Exception { - eprs = getFilteredEndpoints(scopeString); - actualNumberOfNodes = eprs.size(); - return actualNumberOfNodes; - } - - /* - private int findNodes(String scopeString) throws Exception { - AnalysisLogger.getLogger().debug("SCOPE:"+scopeString); - GCUBEScope scope = GCUBEScope.getScope(scopeString); - ISClient client = GHNContext.getImplementation(ISClient.class); - WSResourceQuery wsquery = client.getQuery(WSResourceQuery.class); - wsquery.addAtomicConditions(new AtomicCondition("//gc:ServiceName", "Executor")); - wsquery.addAtomicConditions(new AtomicCondition("/child::*[local-name()='Task']/name[text()='" + pluginName + "']", pluginName)); - List listdoc = client.execute(wsquery, scope); - EndpointReferenceType epr = null; - eprs = new ArrayList(); - int numberOfEP = 0; - for (RPDocument resource : listdoc) { - epr = resource.getEndpoint(); - numberOfEP++; - eprs.add(epr); - } - AnalysisLogger.getLogger().info("Found " + numberOfEP + " endpoints"); - // get current number of available nodes - actualNumberOfNodes = eprs.size(); - return numberOfEP; - } - */ - public String getQueueURL(String scope) throws Exception{ - //set the scope provider first! - //Service - //MessageBroker - - ScopeProvider.instance.set(scope); - SimpleQuery query = queryFor(ServiceEndpoint.class); - query.addCondition("$resource/Profile/Category/text() eq 'Service' and $resource/Profile/Name eq 'MessageBroker' "); - DiscoveryClient client = clientFor(ServiceEndpoint.class); - List resources = client.submit(query); - if (resources==null || resources.size()==0){ - throw new Exception("No Message-Queue available in scope "+scope); - } - else{ - AccessPoint ap = resources.get(0).profile().accessPoints().iterator().next(); - String queue = ap.address(); - AnalysisLogger.getLogger().debug("Found AMQ Url : "+queue); - return queue; - } - } - - private void setQueueVariables() throws Exception { - queueName = "D4ScienceJob"; // + session; - queueResponse = queueName + "Response"+session; - //general scope - - - // TODO Check THIS -// queueURL = gscope.getServiceMap().getEndpoints(GHNContext.MSGBROKER).iterator().next().getAddress().toString(); - queueURL = getQueueURL(scope); - - - //tests on ecosystem - //TODO: delete this! -// queueURL = "tcp://ui.grid.research-infrastructures.eu:6166"; -// queueURL = "tcp://message-broker.d4science.research-infrastructures.eu:6166"; - AnalysisLogger.getLogger().info("Queue for the scope: " + queueURL); - if (queueURL==null){ - if (scope.startsWith("/gcube")) - queueURL = "tcp://ui.grid.research-infrastructures.eu:6166"; - else - queueURL = "tcp://message-broker.d4science.research-infrastructures.eu:6166"; - } - queueUSER = ActiveMQConnection.DEFAULT_USER; - queuePWD = ActiveMQConnection.DEFAULT_PASSWORD; - } - - public void deleteRemoteFolder() throws Exception { - ScopeProvider.instance.set(scope); - IClient client = new StorageClient(serviceClass, serviceName, owner, AccessType.SHARED,MemoryType.VOLATILE).getClient(); -// IClient client = new StorageClient(serviceClass, serviceName, owner, AccessType.SHARED, gscope).getClient(); - AnalysisLogger.getLogger().info("Removing Remote Dir " + remoteDir); - client.removeDir().RDir(remoteDir); - AnalysisLogger.getLogger().info("Removed"); - } - - private void uploadFilesOnStorage(boolean forceupload) throws Exception { - ScopeProvider.instance.set(scope); - IClient client = new StorageClient(serviceClass, serviceName, owner, AccessType.SHARED, MemoryType.VOLATILE).getClient(); -// IClient client = new StorageClient(serviceClass, serviceName, owner, AccessType.SHARED, gscope).getClient(); - File dir = new File(localDir); - File[] files = dir.listFiles(); - AnalysisLogger.getLogger().info("Start uploading"); - filenames = new ArrayList(); - fileurls = new ArrayList(); - boolean uploadFiles = forceupload; - // if we do not force upload then check if the folder is yet there - if (!uploadFiles) { - List remoteObjects = client.showDir().RDir(remoteDir); - // only upload files if they are not yet uploaded - if (remoteObjects.size() == 0) - uploadFiles = true; - } - if (!uploadFiles) - AnalysisLogger.getLogger().info("Unnecessary to Uploading Files"); - - AnalysisLogger.getLogger().info("Loading files"); - //patch for concurrent uploads - String tempdir = ""+UUID.randomUUID()+"/"; - for (File sfile : files) { - if (sfile.getName().startsWith(".")) - continue; - - String localf = sfile.getAbsolutePath(); - String filename = sfile.getName(); - - String remotef = remoteDir + tempdir+sfile.getName(); - if (uploadFiles) { - client.put(true).LFile(localf).RFile(remotef); - AnalysisLogger.getLogger().info("Uploading File "+localf+" as remote file "+remotef); - } - String url = client.getUrl().RFile(remotef); -// AnalysisLogger.getLogger().info("URL obtained: " + url); - filenames.add(filename); - fileurls.add(url); - } - AnalysisLogger.getLogger().info("Loading finished"); - - } - - private void broadcastListenCommandToExecutorNodes() throws Exception { - AnalysisLogger.getLogger().info("Submitting script to Remote Queue " + queueName); - List tasksProxies = new ArrayList(); - try{ - findNodes(scope); - }catch(Exception e){ - AnalysisLogger.getLogger().info("Error in Finding nodes - using previous value"); - } - activeNodes = actualNumberOfNodes; - // launch the tasks - for (int i = 0; i < actualNumberOfNodes; i++) { - try { - contactNodes(tasksProxies, i, queueName, queueUSER, queuePWD, queueURL, queueResponse, session, "false"); - } catch (Exception e) { - e.printStackTrace(); - AnalysisLogger.getLogger().info("Error in Contacting nodes"); - } - } - } - - private void createClientProducer() throws Exception { - AnalysisLogger.getLogger().info("Creating Message Queue and Producer"); - // create the Producer - QueueManager qm = new QueueManager(); - qm.createAndConnect(queueUSER, queuePWD, queueURL, queueName); - producer = new Producer(qm, queueName); - AnalysisLogger.getLogger().info("Producer OK"); - } - - private void createClientConsumer() throws Exception { - AnalysisLogger.getLogger().info("Creating Response Message Queue and Consumer"); - // create the listener - statuslistener = new StatusListener(); - QueueManager qm1 = new QueueManager(); - qm1.createAndConnect(queueUSER, queuePWD, queueURL, queueResponse); - consumer = new Consumer(qm1, statuslistener, statuslistener, queueResponse); - AnalysisLogger.getLogger().info("Consumers OK"); - } - - boolean activeMessages[]; - public int resentMessages[]; - - private void sendMessages() throws Exception { - int i = 0; - numberOfMessages = arguments.size(); - totalNumberOfMessages = numberOfMessages; - AnalysisLogger.getLogger().info("Messages To Send " + numberOfMessages); - activeMessages = new boolean[numberOfMessages]; - resentMessages = new int[numberOfMessages]; - for (String argum : arguments) { - Map inputs = generateInputMessage(filenames, fileurls, outputDir, script, argum, i, scope, serviceClass, serviceName, owner, remoteDir, session, configuration, deletefiles, false); - producer.sendMessage(inputs, 0); - AnalysisLogger.getLogger().info("Send " + i); - activeMessages[i] = true; - i++; - } - AnalysisLogger.getLogger().info("Messages Sent " + numberOfMessages); - } - - private Map generateInputMessage(Object filenames, Object fileurls, String outputDir, String script, String argum, int i, String scope, String serviceClass, String serviceName, String owner, String remoteDir, String session, String configuration, boolean deletefiles, boolean duplicateMessage) { - Map inputs = new HashMap(); - - inputs.put(ATTRIBUTE.FILE_NAMES.name(), filenames); - inputs.put(ATTRIBUTE.FILE_URLS.name(), fileurls); - inputs.put(ATTRIBUTE.OUTPUTDIR.name(), outputDir); - inputs.put(ATTRIBUTE.SCRIPT.name(), script); - inputs.put(ATTRIBUTE.ARGUMENTS.name(), argum + " " + duplicateMessage); - inputs.put(ATTRIBUTE.ORDER.name(), "" + i); - inputs.put(ATTRIBUTE.SCOPE.name(), scope); - inputs.put(ATTRIBUTE.SERVICE_CLASS.name(), serviceClass); - inputs.put(ATTRIBUTE.SERVICE_NAME.name(), serviceName); - inputs.put(ATTRIBUTE.OWNER.name(), owner); - inputs.put(ATTRIBUTE.REMOTEDIR.name(), remoteDir); - inputs.put(ATTRIBUTE.CLEAN_CACHE.name(), "" + deletefiles); - inputs.put(ATTRIBUTE.QSESSION.name(), session); - inputs.put(ATTRIBUTE.CONFIGURATION.name(), configuration); - inputs.put(ATTRIBUTE.TOPIC_RESPONSE_NAME.name(), queueResponse); - inputs.put(ATTRIBUTE.QUEUE_USER.name(), queueUSER); - inputs.put(ATTRIBUTE.QUEUE_PASSWORD.name(), queuePWD); - inputs.put(ATTRIBUTE.QUEUE_URL.name(), queueURL); - return inputs; - } - - private Map generateWorkerInput(String queueName, String queueUser, String queuePassword, String queueURL, String queueResponse, String session, String purge) { - - Map inputs = new HashMap(); - - inputs.put(ATTRIBUTE.TOPIC_NAME.name(), ScriptIOWorker.toInputString(queueName)); - inputs.put(ATTRIBUTE.QUEUE_USER.name(), ScriptIOWorker.toInputString(queueUser)); - inputs.put(ATTRIBUTE.QUEUE_PASSWORD.name(), ScriptIOWorker.toInputString(queuePassword)); - inputs.put(ATTRIBUTE.QUEUE_URL.name(), ScriptIOWorker.toInputString(queueURL)); - inputs.put(ATTRIBUTE.TOPIC_RESPONSE_NAME.name(), ScriptIOWorker.toInputString(queueResponse)); - inputs.put(ATTRIBUTE.QSESSION.name(), session); - inputs.put(ATTRIBUTE.ERASE.name(), purge); - return inputs; - } - - public class Broadcaster extends TimerTask { - - @Override - public void run() { - try { - AnalysisLogger.getLogger().info("(((((((((((((((((((((((((((------Broadcasting Information To Watchers------)))))))))))))))))))))))))))"); - broadcastListenCommandToExecutorNodes(); - AnalysisLogger.getLogger().info("(((((((((((((((((((((((((((------END Broadcasting Information To Watchers------)))))))))))))))))))))))))))"); - } catch (Exception e) { - e.printStackTrace(); - AnalysisLogger.getLogger().info("--------------------------------Broadcaster: Error Sending Listen Message to Executors------)))))))))))))))))))))))))))"); - } - } - - } - - public class ComputationTimerWatcher extends TimerTask { - - long maxTime; - long lastTimeClock; - - public ComputationTimerWatcher(long maxtime) { - this.maxTime = maxtime; - this.lastTimeClock = System.currentTimeMillis(); - } - - public void reset() { - lastTimeClock = System.currentTimeMillis(); - } - - public void setmaxTime(long maxTime) { - this.maxTime = maxTime; - } - - @Override - public void run() { - try { - long t0 = System.currentTimeMillis(); - AnalysisLogger.getLogger().info("Computation Watcher Timing Is " + (t0 - lastTimeClock)+" max computation time is "+maxTime); - if ((t0 - lastTimeClock) > maxTime) { - AnalysisLogger.getLogger().info("Computation Watcher - Computation Timeout: Closing Queue Job Manager!!!"); - abort(); - } - } catch (Exception e) { - e.printStackTrace(); - AnalysisLogger.getLogger().info("Error Taking clock"); - } - } - - } - - public synchronized void abort() { - AnalysisLogger.getLogger().info("Computation Aborted"); - this.abort = true; - } - - public class StatusListener implements MessageListener, ExceptionListener { - - private QueueWorkerWatcher[] watchers; - - synchronized public void onException(JMSException ex) { - abort(); - AnalysisLogger.getLogger().info("JMS Exception occured. Shutting down client."); - } - - private synchronized void addWatcher(int order) { - if (watchers == null) - watchers = new QueueWorkerWatcher[totalNumberOfMessages]; - - QueueWorkerWatcher watcher = watchers[order]; - if (watcher != null) { - destroyWatcher(order); - } - - Map message = generateInputMessage(filenames, fileurls, outputDir, script, arguments.get(order), order, scope, serviceClass, serviceName, owner, remoteDir, session, configuration, deletefiles, true); - watchers[order] = new QueueWorkerWatcher(producer, message, order); - } - - private synchronized void resetWatcher(int order) { - if (watchers == null) - watchers = new QueueWorkerWatcher[totalNumberOfMessages]; - else if (watchers[order] != null) - watchers[order].resetTime(); - } - - private synchronized void destroyWatcher(int order) { - if (watchers != null && watchers[order] != null) { - if (watchers[order].hasResent()) - resentMessages[order] = resentMessages[order] + 1; - - watchers[order].destroy(); - watchers[order] = null; - AnalysisLogger.getLogger().info("Destroyed Watcher number " + order); - } - } - - public synchronized void destroyAllWatchers() { - if (watchers != null) { - for (int i = 0; i < watchers.length; i++) { - destroyWatcher(i); - } - } - } - - public void onMessage(Message message) { - - // get message - try { - - HashMap details = (HashMap) (HashMap) message.getObjectProperty(ATTRIBUTE.CONTENT.name()); - String status = (String) details.get(ATTRIBUTE.STATUS.name()); - String order = "" + details.get(ATTRIBUTE.ORDER.name()); - String nodeaddress = (String) details.get(ATTRIBUTE.NODE.name()); - String msession = (String) details.get(ATTRIBUTE.QSESSION.name()); - Object error = details.get(ATTRIBUTE.ERROR.name()); - - AnalysisLogger.getLogger().info("Current session " + session); - if ((msession != null) && (msession.equals(session))) { - AnalysisLogger.getLogger().info("Session " + session + " is right - acknowledge"); - message.acknowledge(); - AnalysisLogger.getLogger().info("Session " + session + " acknowledged"); - int orderInt = -1; - try { - orderInt = Integer.parseInt(order); - } catch (Exception e3) { - e3.printStackTrace(); - } - if (orderInt > -1) { - - // reset the watcher - if (computationWatcher!=null) - computationWatcher.reset(); - - AnalysisLogger.getLogger().info("Task number " + order + " is " + status + " on node " + nodeaddress + " and session " + session); - - if (status.equals(ATTRIBUTE.STARTED.name())) { - computingNodes++; - addWatcher(orderInt); - } - if (status.equals(ATTRIBUTE.PROCESSING.name())) { - - resetWatcher(orderInt); - } else if (status.equals(ATTRIBUTE.FINISHED.name())) { - - totalmessages++; - computingNodes--; - destroyWatcher(orderInt); - if (numberOfMessages > 0) - numberOfMessages--; - - AnalysisLogger.getLogger().info("Remaining " + numberOfMessages + " messages to manage"); - activeMessages[orderInt] = false; - - } else if (status.equals(ATTRIBUTE.FATAL_ERROR.name())) { - if (error!=null) - AnalysisLogger.getLogger().info("REPORTED FATAL_ERROR on " +nodeaddress+" : "); - AnalysisLogger.getLogger().info(error); - - computingNodes--; - if (maxFailureTries <= 0) { - AnalysisLogger.getLogger().info("Too much Failures - Aborting"); - destroyAllWatchers(); - abort(); - } else { - AnalysisLogger.getLogger().info("Failure Occurred - Now Resending Message " + orderInt); - resentMessages[orderInt] = resentMessages[orderInt] + 1; - maxFailureTries--; - // resend message - Map retrymessage = generateInputMessage(filenames, fileurls, outputDir, script, arguments.get(orderInt), orderInt, scope, serviceClass, serviceName, owner, remoteDir, session, configuration, deletefiles, true); - producer.sendMessage(retrymessage, QCONSTANTS.timeToLive); - AnalysisLogger.getLogger().info("Failure Occurred - Resent Message " + orderInt); - } - - } - - } else - AnalysisLogger.getLogger().info("Ignoring message " + order + " with status " + status); - } else { - AnalysisLogger.getLogger().info("wrong session " + msession + " ignoring message"); -// consumer.manager.session.recover(); - } - } catch (Exception e) { - - AnalysisLogger.getLogger().info("Error reading details ", e); - AnalysisLogger.getLogger().info("...Aborting Job..."); - abort(); - - } - } - } - -} diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueWorkerWatcher.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueWorkerWatcher.java deleted file mode 100644 index e7b0dc0..0000000 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueWorkerWatcher.java +++ /dev/null @@ -1,76 +0,0 @@ -package org.gcube.dataanalysis.executor.job.management; - -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; - -import javax.jms.Message; - -import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; -import org.gcube.dataanalysis.executor.messagequeue.ATTRIBUTE; -import org.gcube.dataanalysis.executor.messagequeue.Producer; -import org.gcube.dataanalysis.executor.messagequeue.QCONSTANTS; - -public class QueueWorkerWatcher { - - protected int maxwaitingTime = 2*QueueJobManager.queueWatcherMaxwaitingTime; - private long lastTimeClock; - Timer watcher; - Producer producer; - Map message; - public boolean resent=false; - int order; - - public QueueWorkerWatcher(Producer producer, Map message, int order) { - this.producer = producer; - this.message = message; - resent=false; - this.order = order; - - watcher = new Timer(); - watcher.schedule(new Controller(), 0, QCONSTANTS.refreshStatusTime); - resetTime(); - } - - public synchronized void resetTime() { - lastTimeClock = System.currentTimeMillis(); - } - - public synchronized void destroy() { - if (watcher != null) { - watcher.cancel(); - watcher.purge(); - watcher = null; - } - } - - public boolean hasResent(){ - return resent; - } - - private class Controller extends TimerTask { - - @Override - public void run() { - try { - long t0 = System.currentTimeMillis(); - AnalysisLogger.getLogger().debug("Watcher "+order+" Timing Is "+(t0 - lastTimeClock)+ " max waiting time: "+maxwaitingTime); - if ((t0 - lastTimeClock) > maxwaitingTime) { - - AnalysisLogger.getLogger().info("Watcher "+order+" Time Is Over "+(t0 - lastTimeClock)); - - AnalysisLogger.getLogger().info("Watcher "+order+" Re-Sending Message "+message); - producer.sendMessage(message, QCONSTANTS.timeToLive); -// QueueJobManager.resentMessages[Integer.parseInt(""+message.get(ATTRIBUTE.ORDER.name()))]=QueueJobManager.resentMessages[Integer.parseInt(""+message.get(ATTRIBUTE.ORDER.name()))]+1; - resent = true; - AnalysisLogger.getLogger().info("Watcher "+order+" Destroying watcher"); - destroy(); - - } - } catch (Exception e) { - e.printStackTrace(); - } - } - - } -} diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/WorkerWatcher.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/WorkerWatcher.java deleted file mode 100644 index 4b28553..0000000 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/WorkerWatcher.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.gcube.dataanalysis.executor.job.management; - -import org.apache.log4j.Logger; -import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy; -import org.gcube.vremanagement.executor.plugin.PluginState; - -public class WorkerWatcher { - private static int maxTries = 15; - private int currentTries; - - Logger logger; - SmartExecutorProxy proxy; - String excecutionIdentifier; - - public WorkerWatcher(SmartExecutorProxy proxy, String excecutionIdentifier, Logger logger){ - this.proxy = proxy; - this.excecutionIdentifier = excecutionIdentifier; - this.logger = logger; - currentTries = 1; - } - - public PluginState getState(){ - try{ - return proxy.getState(excecutionIdentifier); - }catch(Exception e){ - logger.error("Error in getting state: recover try number "+currentTries,e); - currentTries++; - if (currentTries>maxTries){ - return PluginState.FAILED; - } - else return PluginState.RUNNING; - } - - } - -} diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/LWR.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/LWR.java index 350faa3..62f1041 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/LWR.java +++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/LWR.java @@ -28,7 +28,6 @@ import org.gcube.dataanalysis.ecoengine.utils.DatabaseFactory; import org.gcube.dataanalysis.ecoengine.utils.DatabaseUtils; import org.gcube.dataanalysis.ecoengine.utils.Transformations; import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing; -import org.gcube.dataanalysis.executor.job.management.QueueJobManager; import org.gcube.dataanalysis.executor.scripts.OSCommand; import org.hibernate.SessionFactory; @@ -313,10 +312,6 @@ public class LWR extends ActorNode { prevmaxMessages=D4ScienceDistributedProcessing.maxMessagesAllowedPerJob; D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=1; - prevbroadcastTimePeriod = QueueJobManager.broadcastTimePeriod; - QueueJobManager.broadcastTimePeriod=4*3600000; - prevmaxNumberOfStages = QueueJobManager.maxNumberOfStages; - QueueJobManager.maxNumberOfStages=10000; AnalysisLogger.getLogger().info("Creating Destination Table " + destinationTable); try{ @@ -364,11 +359,7 @@ public class LWR extends ActorNode { boolean haspostprocessed = false; @Override public void postProcess(boolean manageDuplicates, boolean manageFault) { -// D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=previousnumberofspeciesperjob; - QueueJobManager.broadcastTimePeriod=prevbroadcastTimePeriod; - QueueJobManager.maxNumberOfStages=prevmaxNumberOfStages; D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=prevmaxMessages; -// select fam, sf, bs, spc , lwr, priormeanlog10a, priorsdlog10a, priormeanb, priorsdb,note into testfam from (select distinct(spc) as a, fam, sf, bs, spc , lwr, priormeanlog10a, priorsdlog10a, priormeanb, priorsdb,note from lwr1 order by fam) as d haspostprocessed=true; } diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/OccurrenceMergingNode.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/OccurrenceMergingNode.java index d419805..c5e833d 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/OccurrenceMergingNode.java +++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/OccurrenceMergingNode.java @@ -11,7 +11,6 @@ import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode; import org.gcube.dataanalysis.ecoengine.transducers.OccurrencePointsMerger; import org.gcube.dataanalysis.ecoengine.utils.Transformations; import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing; -import org.gcube.dataanalysis.executor.job.management.QueueJobManager; import org.hibernate.SessionFactory; public class OccurrenceMergingNode extends ActorNode { @@ -100,10 +99,6 @@ public class OccurrenceMergingNode extends ActorNode { processor.takeFullRanges(); prevmaxMessages=D4ScienceDistributedProcessing.maxMessagesAllowedPerJob; D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=100; - prevbroadcastTimePeriod = QueueJobManager.broadcastTimePeriod; - QueueJobManager.broadcastTimePeriod=4*3600000; - prevmaxNumberOfStages = QueueJobManager.maxNumberOfStages; - QueueJobManager.maxNumberOfStages=100000; } @Override @@ -123,9 +118,6 @@ public class OccurrenceMergingNode extends ActorNode { @Override public void postProcess(boolean manageDuplicates, boolean manageFault) { - QueueJobManager.broadcastTimePeriod=prevbroadcastTimePeriod; - QueueJobManager.maxNumberOfStages=prevmaxNumberOfStages; - D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=prevmaxMessages; processor.shutdown(); try { diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/bionym/BionymFlexibleWorkflowTransducer.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/bionym/BionymFlexibleWorkflowTransducer.java index 38a2861..c2b6ea8 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/bionym/BionymFlexibleWorkflowTransducer.java +++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/bionym/BionymFlexibleWorkflowTransducer.java @@ -24,7 +24,6 @@ import org.gcube.dataanalysis.ecoengine.utils.DatabaseFactory; import org.gcube.dataanalysis.ecoengine.utils.DatabaseUtils; import org.gcube.dataanalysis.ecoengine.utils.Transformations; import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing; -import org.gcube.dataanalysis.executor.job.management.QueueJobManager; import org.gcube.dataanalysis.executor.nodes.transducers.bionym.abstracts.MatcherOutput; import org.gcube.dataanalysis.executor.nodes.transducers.bionym.abstracts.SingleEntry; import org.gcube.dataanalysis.executor.nodes.transducers.bionym.implementations.matchers.FuzzyMatcher; @@ -387,10 +386,6 @@ public class BionymFlexibleWorkflowTransducer extends ActorNode { prevmaxMessages=D4ScienceDistributedProcessing.maxMessagesAllowedPerJob; D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=50; - prevbroadcastTimePeriod = QueueJobManager.broadcastTimePeriod; - QueueJobManager.broadcastTimePeriod=30*60000; - prevmaxNumberOfStages = QueueJobManager.maxNumberOfStages; - QueueJobManager.maxNumberOfStages=10000; AnalysisLogger.getLogger().info("Destination Table Created! Addressing " + rawnamescount + " names"); } @@ -426,8 +421,6 @@ public class BionymFlexibleWorkflowTransducer extends ActorNode { @Override public void postProcess(boolean manageDuplicates, boolean manageFault) { - QueueJobManager.broadcastTimePeriod=prevbroadcastTimePeriod; - QueueJobManager.maxNumberOfStages=prevmaxNumberOfStages; D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=prevmaxMessages; haspostprocessed = true; } diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/bionym/BionymWorkflow.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/bionym/BionymWorkflow.java index 2cdd83b..409bae4 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/bionym/BionymWorkflow.java +++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/bionym/BionymWorkflow.java @@ -25,7 +25,6 @@ import org.gcube.dataanalysis.ecoengine.utils.DatabaseUtils; import org.gcube.dataanalysis.ecoengine.utils.Transformations; import org.gcube.dataanalysis.ecoengine.utils.Tuple; import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing; -import org.gcube.dataanalysis.executor.job.management.QueueJobManager; import org.gcube.dataanalysis.executor.scripts.OSCommand; import org.hibernate.SessionFactory; @@ -347,10 +346,6 @@ public class BionymWorkflow extends ActorNode { prevmaxMessages=D4ScienceDistributedProcessing.maxMessagesAllowedPerJob; D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=1; - prevbroadcastTimePeriod = QueueJobManager.broadcastTimePeriod; - QueueJobManager.broadcastTimePeriod=4*3600000; - prevmaxNumberOfStages = QueueJobManager.maxNumberOfStages; - QueueJobManager.maxNumberOfStages=10000; AnalysisLogger.getLogger().info("Destination Table Created! Addressing " + rawnamescount + " names"); @@ -387,8 +382,6 @@ public class BionymWorkflow extends ActorNode { @Override public void postProcess(boolean manageDuplicates, boolean manageFault) { - QueueJobManager.broadcastTimePeriod=prevbroadcastTimePeriod; - QueueJobManager.maxNumberOfStages=prevmaxNumberOfStages; D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=prevmaxMessages; haspostprocessed = true; }