deleted dependency from smart-executor
git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/EcologicalEngineSmartExecutor@132175 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
809c7adebb
commit
c46b66041c
|
@ -15,7 +15,6 @@ import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.ServiceParameters;
|
||||||
import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode;
|
import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode;
|
||||||
import org.gcube.dataanalysis.ecoengine.interfaces.Generator;
|
import org.gcube.dataanalysis.ecoengine.interfaces.Generator;
|
||||||
import org.gcube.dataanalysis.ecoengine.interfaces.GenericAlgorithm;
|
import org.gcube.dataanalysis.ecoengine.interfaces.GenericAlgorithm;
|
||||||
import org.gcube.dataanalysis.executor.job.management.DistributedProcessingAgent;
|
|
||||||
import org.gcube.dataanalysis.executor.job.management.DistributedProcessingAgentWPS;
|
import org.gcube.dataanalysis.executor.job.management.DistributedProcessingAgentWPS;
|
||||||
|
|
||||||
public class D4ScienceDistributedProcessing implements Generator {
|
public class D4ScienceDistributedProcessing implements Generator {
|
||||||
|
|
|
@ -1,149 +0,0 @@
|
||||||
package org.gcube.dataanalysis.executor.generators;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.configuration.INFRASTRUCTURE;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.datatypes.ServiceType;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.ServiceParameters;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.interfaces.Generator;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.interfaces.GenericAlgorithm;
|
|
||||||
import org.gcube.dataanalysis.executor.job.management.DistributedProcessingAgent;
|
|
||||||
|
|
||||||
public class D4ScienceDistributedProcessingExecutor implements Generator {
|
|
||||||
|
|
||||||
public static int maxMessagesAllowedPerJob = 20;
|
|
||||||
public static boolean forceUpload = true;
|
|
||||||
public static String defaultContainerFolder = "PARALLEL_PROCESSING";
|
|
||||||
protected AlgorithmConfiguration config;
|
|
||||||
protected ActorNode distributedModel;
|
|
||||||
protected String mainclass;
|
|
||||||
DistributedProcessingAgent agent;
|
|
||||||
|
|
||||||
public D4ScienceDistributedProcessingExecutor(){
|
|
||||||
}
|
|
||||||
|
|
||||||
public D4ScienceDistributedProcessingExecutor(AlgorithmConfiguration config) {
|
|
||||||
this.config = config;
|
|
||||||
|
|
||||||
AnalysisLogger.setLogger(config.getConfigPath() + AlgorithmConfiguration.defaultLoggerFile);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void compute() throws Exception {
|
|
||||||
try {
|
|
||||||
agent.compute();
|
|
||||||
distributedModel.postProcess(agent.hasResentMessages(),false);
|
|
||||||
} catch (Exception e) {
|
|
||||||
distributedModel.postProcess(false,true);
|
|
||||||
AnalysisLogger.getLogger().error("ERROR: An Error occurred ", e);
|
|
||||||
throw e;
|
|
||||||
} finally {
|
|
||||||
shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<StatisticalType> getInputParameters() {
|
|
||||||
|
|
||||||
List<StatisticalType> distributionModelParams = new ArrayList<StatisticalType>();
|
|
||||||
distributionModelParams.add(new ServiceType(ServiceParameters.USERNAME,"ServiceUserName","The final user Name"));
|
|
||||||
|
|
||||||
return distributionModelParams;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getResources() {
|
|
||||||
return agent.getResources();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public float getStatus() {
|
|
||||||
return agent.getStatus();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public StatisticalType getOutput() {
|
|
||||||
return distributedModel.getOutput();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ALG_PROPS[] getSupportedAlgorithms() {
|
|
||||||
ALG_PROPS[] p = { ALG_PROPS.PHENOMENON_VS_PARALLEL_PHENOMENON};
|
|
||||||
return p;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public INFRASTRUCTURE getInfrastructure() {
|
|
||||||
return INFRASTRUCTURE.D4SCIENCE;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void init() throws Exception {
|
|
||||||
|
|
||||||
Properties p = AlgorithmConfiguration.getProperties(config.getConfigPath() + AlgorithmConfiguration.nodeAlgorithmsFile);
|
|
||||||
String model = config.getModel();
|
|
||||||
String algorithm = null;
|
|
||||||
if ((model!=null) && (model.length()>0))
|
|
||||||
algorithm = model;
|
|
||||||
else
|
|
||||||
algorithm=config.getAgent();
|
|
||||||
|
|
||||||
mainclass = p.getProperty(algorithm);
|
|
||||||
distributedModel = (ActorNode) Class.forName(mainclass).newInstance();
|
|
||||||
distributedModel.setup(config);
|
|
||||||
String scope = config.getGcubeScope();
|
|
||||||
AnalysisLogger.getLogger().info("Using the following scope for the computation:"+scope);
|
|
||||||
String owner = config.getParam("ServiceUserName");
|
|
||||||
int leftNum = distributedModel.getNumberOfLeftElements();
|
|
||||||
int rightNum = distributedModel.getNumberOfRightElements();
|
|
||||||
agent = new DistributedProcessingAgent(config, scope, owner, mainclass, config.getPersistencePath(), algorithm, defaultContainerFolder, maxMessagesAllowedPerJob, forceUpload, leftNum, rightNum,config.getTaskID());
|
|
||||||
agent.setLogger(AnalysisLogger.getLogger());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setConfiguration(AlgorithmConfiguration config) {
|
|
||||||
this.config = config;
|
|
||||||
AnalysisLogger.setLogger(config.getConfigPath() + AlgorithmConfiguration.defaultLoggerFile);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void shutdown() {
|
|
||||||
try {
|
|
||||||
agent.shutdown();
|
|
||||||
} catch (Exception e) {
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
distributedModel.stop();
|
|
||||||
} catch (Exception e) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getLoad() {
|
|
||||||
return agent.getLoad();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getResourceLoad() {
|
|
||||||
return agent.getResourceLoad();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public GenericAlgorithm getAlgorithm() {
|
|
||||||
return distributedModel;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getDescription() {
|
|
||||||
return "A D4Science Cloud Processor for Species Distributions";
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,300 +0,0 @@
|
||||||
package org.gcube.dataanalysis.executor.job.management;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.log4j.Logger;
|
|
||||||
import org.gcube.contentmanagement.graphtools.utils.HttpRequest;
|
|
||||||
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
|
||||||
import org.gcube.contentmanagement.lexicalmatcher.utils.FileTools;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.configuration.INFRASTRUCTURE;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.connectors.livemonitor.ResourceLoad;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.connectors.livemonitor.Resources;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.utils.Operations;
|
|
||||||
|
|
||||||
import com.thoughtworks.xstream.XStream;
|
|
||||||
|
|
||||||
public class DistributedProcessingAgent {
|
|
||||||
|
|
||||||
|
|
||||||
protected QueueJobManager jobManager;
|
|
||||||
protected boolean deletefiles = true;
|
|
||||||
protected String mainclass;
|
|
||||||
public int maxElementsAllowedPerJob = 20;
|
|
||||||
protected boolean forceUpload = true;
|
|
||||||
protected boolean stop;
|
|
||||||
protected String gscope;
|
|
||||||
protected String userName;
|
|
||||||
protected String pathToLib;
|
|
||||||
protected String modelName;
|
|
||||||
protected String containerFolder;
|
|
||||||
protected Serializable configurationFile;
|
|
||||||
protected int rightSetNumberOfElements;
|
|
||||||
protected int leftSetNumberOfElements;
|
|
||||||
protected List<String> endpoints;
|
|
||||||
protected int subdivisiondiv;
|
|
||||||
protected String sessionID;
|
|
||||||
|
|
||||||
protected static String defaultJobOutput = "execution.output";
|
|
||||||
protected static String defaultScriptFile = "script";
|
|
||||||
protected Logger logger;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A distributed processing agent. Performs a distributed computation doing the MAP of the product of two sets: A and B
|
|
||||||
* Splits over B : A x B1 , A x B2, ... , A x Bn
|
|
||||||
* Prepares a script to be executed on remote nodes
|
|
||||||
* The computation is then sent to remote processors.
|
|
||||||
*/
|
|
||||||
public DistributedProcessingAgent(Serializable configurationFile,
|
|
||||||
String gCubeScope,
|
|
||||||
String computationOwner,
|
|
||||||
String mainClass,
|
|
||||||
String pathToLibFolder,
|
|
||||||
String modelName,
|
|
||||||
String containerFolder,
|
|
||||||
int maxElementsPerJob,
|
|
||||||
boolean forceReUploadofLibs,
|
|
||||||
int leftSetNumberOfElements,
|
|
||||||
int rightSetNumberOfElements,
|
|
||||||
String sessionID
|
|
||||||
) {
|
|
||||||
this.stop = false;
|
|
||||||
this.deletefiles = true;
|
|
||||||
this.gscope=gCubeScope;
|
|
||||||
this.mainclass=mainClass;
|
|
||||||
this.maxElementsAllowedPerJob=maxElementsPerJob;
|
|
||||||
this.forceUpload=forceReUploadofLibs;
|
|
||||||
this.configurationFile=configurationFile;
|
|
||||||
this.rightSetNumberOfElements=rightSetNumberOfElements;
|
|
||||||
this.leftSetNumberOfElements=leftSetNumberOfElements;
|
|
||||||
this.userName=computationOwner;
|
|
||||||
this.pathToLib=pathToLibFolder;
|
|
||||||
this.modelName=modelName;
|
|
||||||
this.containerFolder=containerFolder;
|
|
||||||
this.sessionID = sessionID;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setLogger(Logger logger){
|
|
||||||
this.logger=logger;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setEndPoints(List<String> endpoints){
|
|
||||||
this.endpoints=endpoints;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean hasResentMessages(){
|
|
||||||
return jobManager.hasResentMessages();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void compute() throws Exception {
|
|
||||||
try {
|
|
||||||
if (logger == null){
|
|
||||||
logger = AnalysisLogger.getLogger();
|
|
||||||
}
|
|
||||||
if (gscope == null)
|
|
||||||
throw new Exception("Null Scope");
|
|
||||||
AnalysisLogger.getLogger().debug("SCOPE: "+gscope);
|
|
||||||
if (endpoints != null) {
|
|
||||||
|
|
||||||
/*
|
|
||||||
List<EndpointReferenceType> eprtList = new ArrayList<EndpointReferenceType>();
|
|
||||||
for (String ep : endpoints) {
|
|
||||||
eprtList.add(new EndpointReferenceType(new Address(ep)));
|
|
||||||
}
|
|
||||||
|
|
||||||
jobManager = new QueueJobManager(gscope, endpoints.size(), eprtList);
|
|
||||||
*/
|
|
||||||
jobManager = new QueueJobManager(gscope, endpoints.size(), endpoints,sessionID);
|
|
||||||
} else
|
|
||||||
jobManager = new QueueJobManager(gscope, 1,sessionID);
|
|
||||||
|
|
||||||
int numberOfResources = jobManager.getNumberOfNodes();
|
|
||||||
// we split along right dimension so if elements are less than nodes, we should reduce the number of nodes
|
|
||||||
if (numberOfResources > 0) {
|
|
||||||
// chunkize the number of species in order to lower the computational effort of the workers
|
|
||||||
subdivisiondiv = rightSetNumberOfElements / (numberOfResources * maxElementsAllowedPerJob);
|
|
||||||
int rest = rightSetNumberOfElements % (numberOfResources * maxElementsAllowedPerJob);
|
|
||||||
if (rest > 0)
|
|
||||||
subdivisiondiv++;
|
|
||||||
if (subdivisiondiv == 0)
|
|
||||||
subdivisiondiv = 1;
|
|
||||||
|
|
||||||
executeWork(leftSetNumberOfElements, rightSetNumberOfElements, 0, subdivisiondiv, deletefiles, forceUpload);
|
|
||||||
|
|
||||||
if (jobManager.wasAborted()) {
|
|
||||||
logger.debug("Warning: Job was aborted");
|
|
||||||
// distributionModel.postProcess(false,true);
|
|
||||||
throw new Exception("Job System Error");
|
|
||||||
}
|
|
||||||
else{
|
|
||||||
//postprocess
|
|
||||||
// distributionModel.postProcess(jobManager.hasResentMessages(),false);
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
logger.debug("Warning: No Workers available");
|
|
||||||
throw new Exception("No Workers available");
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("ERROR: An Error occurred ", e);
|
|
||||||
e.printStackTrace();
|
|
||||||
throw e;
|
|
||||||
} finally {
|
|
||||||
shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void executeWork(int leftNum, int rightNum, int offset, int numberOfResources, boolean deletefiles, boolean forceUpload) throws Exception {
|
|
||||||
|
|
||||||
String owner = userName;
|
|
||||||
|
|
||||||
int[] chunkSizes = Operations.takeChunks(rightNum, numberOfResources);
|
|
||||||
List<String> arguments = new ArrayList<String>();
|
|
||||||
// chunkize respect to the cells: take a chunk of cells vs all species at each node!
|
|
||||||
|
|
||||||
for (int i = 0; i < chunkSizes.length; i++) {
|
|
||||||
String argumentString = "0 " + leftNum + " " + offset + " " + chunkSizes[i] + " ./ "+mainclass;
|
|
||||||
arguments.add(argumentString);
|
|
||||||
offset += chunkSizes[i];
|
|
||||||
logger.debug("Generator-> Argument " + i + ": " + argumentString);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (owner == null)
|
|
||||||
throw new Exception("Null Owner");
|
|
||||||
|
|
||||||
String pathToDir = new File (pathToLib, containerFolder).getAbsolutePath();
|
|
||||||
|
|
||||||
if (!(new File(pathToDir).exists()))
|
|
||||||
throw new Exception("No Implementation of node-model found for algorithm " + pathToDir);
|
|
||||||
|
|
||||||
if (mainclass == null)
|
|
||||||
throw new Exception("No mainClass found for algorithm " + pathToDir);
|
|
||||||
|
|
||||||
buildScriptFile(modelName, defaultJobOutput, pathToDir, mainclass);
|
|
||||||
|
|
||||||
jobManager.uploadAndExecuteChunkized(AlgorithmConfiguration.StatisticalManagerClass, AlgorithmConfiguration.StatisticalManagerService, owner, pathToDir, "/" + modelName + "/", "./", getScriptName(mainclass), arguments, new XStream().toXML(configurationFile), deletefiles, forceUpload);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getScriptName(String fullMainClass){
|
|
||||||
String scriptName = defaultScriptFile+"_"+fullMainClass.substring(fullMainClass.lastIndexOf(".")+1)+".sh";
|
|
||||||
return scriptName;
|
|
||||||
}
|
|
||||||
// builds a job.sh
|
|
||||||
public void buildScriptFile(String jobName, String jobOutput, String jarsPath, String fullMainClass) throws Exception {
|
|
||||||
File expectedscript = new File(jarsPath,getScriptName(fullMainClass));
|
|
||||||
if (!expectedscript.exists()) {
|
|
||||||
StringBuffer sb = new StringBuffer();
|
|
||||||
sb.append("#!/bin/sh\n");
|
|
||||||
sb.append("# " + jobName + "\n");
|
|
||||||
sb.append("cd $1\n");
|
|
||||||
sb.append("\n");
|
|
||||||
sb.append("java -Xmx1024M -classpath ./:");
|
|
||||||
File jarsPathF = new File(jarsPath);
|
|
||||||
File[] files = jarsPathF.listFiles();
|
|
||||||
|
|
||||||
for (File jar : files) {
|
|
||||||
|
|
||||||
if (jar.getName().endsWith(".jar")) {
|
|
||||||
sb.append("./" + jar.getName());
|
|
||||||
sb.append(":");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sb.deleteCharAt(sb.length() - 1);
|
|
||||||
sb.append(" " + fullMainClass + " $2 " + jobOutput);
|
|
||||||
sb.append("\n");
|
|
||||||
|
|
||||||
AnalysisLogger.getLogger().trace("D4ScienceGenerator->Generating script in " + expectedscript.getAbsolutePath());
|
|
||||||
FileTools.saveString(expectedscript.getAbsolutePath(), sb.toString(), true, "UTF-8");
|
|
||||||
}
|
|
||||||
AnalysisLogger.getLogger().trace("D4ScienceGenerator->Script " + expectedscript.getAbsolutePath()+" yet exists!");
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getResources() {
|
|
||||||
Resources res = new Resources();
|
|
||||||
try {
|
|
||||||
int activeNodes = jobManager.getActiveNodes();
|
|
||||||
for (int i = 0; i < activeNodes; i++) {
|
|
||||||
try {
|
|
||||||
res.addResource("Worker_" + (i + 1), 100);
|
|
||||||
} catch (Exception e1) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
AnalysisLogger.getLogger().debug("D4ScienceGenerator->active nodes not ready");
|
|
||||||
}
|
|
||||||
if ((res != null) && (res.list != null))
|
|
||||||
return HttpRequest.toJSon(res.list).replace("resId", "resID");
|
|
||||||
else
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
|
|
||||||
public float getStatus() {
|
|
||||||
try {
|
|
||||||
if (stop)
|
|
||||||
return 100f;
|
|
||||||
else
|
|
||||||
if (jobManager!=null)
|
|
||||||
return Math.max(0.5f, jobManager.getStatus() * 100f);
|
|
||||||
else
|
|
||||||
return 0;
|
|
||||||
} catch (Exception e) {
|
|
||||||
return 0f;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public ALG_PROPS[] getSupportedAlgorithms() {
|
|
||||||
ALG_PROPS[] p = { ALG_PROPS.PHENOMENON_VS_PARALLEL_PHENOMENON};
|
|
||||||
return p;
|
|
||||||
}
|
|
||||||
|
|
||||||
public INFRASTRUCTURE getInfrastructure() {
|
|
||||||
return INFRASTRUCTURE.D4SCIENCE;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void shutdown() {
|
|
||||||
|
|
||||||
try {
|
|
||||||
jobManager.stop();
|
|
||||||
} catch (Exception e) {
|
|
||||||
}
|
|
||||||
stop = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getLoad() {
|
|
||||||
long tk = System.currentTimeMillis();
|
|
||||||
ResourceLoad rs = null;
|
|
||||||
if (jobManager!=null)
|
|
||||||
rs = new ResourceLoad(tk, jobManager.currentNumberOfStages*subdivisiondiv);
|
|
||||||
else
|
|
||||||
rs = new ResourceLoad(tk, 0);
|
|
||||||
return rs.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
private long lastTime;
|
|
||||||
private int lastProcessed;
|
|
||||||
public String getResourceLoad() {
|
|
||||||
long thisTime = System.currentTimeMillis();
|
|
||||||
int processedRecords = 0;
|
|
||||||
if ((jobManager!=null) && (subdivisiondiv>0))
|
|
||||||
processedRecords = jobManager.currentNumberOfStages*subdivisiondiv;
|
|
||||||
|
|
||||||
int estimatedProcessedRecords = 0;
|
|
||||||
if (processedRecords == lastProcessed) {
|
|
||||||
estimatedProcessedRecords = Math.round(((float) thisTime * (float) lastProcessed) / (float) lastTime);
|
|
||||||
} else {
|
|
||||||
lastProcessed = processedRecords;
|
|
||||||
estimatedProcessedRecords = lastProcessed;
|
|
||||||
}
|
|
||||||
lastTime = thisTime;
|
|
||||||
ResourceLoad rs = new ResourceLoad(thisTime, estimatedProcessedRecords);
|
|
||||||
return rs.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,915 +0,0 @@
|
||||||
package org.gcube.dataanalysis.executor.job.management;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Timer;
|
|
||||||
import java.util.TimerTask;
|
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
import javax.jms.ExceptionListener;
|
|
||||||
import javax.jms.JMSException;
|
|
||||||
import javax.jms.Message;
|
|
||||||
import javax.jms.MessageListener;
|
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
|
||||||
import org.gcube.common.clients.ProxyBuilderImpl;
|
|
||||||
import org.gcube.common.resources.gcore.ServiceEndpoint;
|
|
||||||
import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint;
|
|
||||||
import org.gcube.common.scope.api.ScopeProvider;
|
|
||||||
import org.gcube.contentmanagement.blobstorage.resource.StorageObject;
|
|
||||||
import org.gcube.contentmanagement.blobstorage.service.IClient;
|
|
||||||
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
|
||||||
import org.gcube.contentmanager.storageclient.wrapper.AccessType;
|
|
||||||
import org.gcube.contentmanager.storageclient.wrapper.MemoryType;
|
|
||||||
import org.gcube.contentmanager.storageclient.wrapper.StorageClient;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.utils.Operations;
|
|
||||||
import org.gcube.dataanalysis.executor.messagequeue.ATTRIBUTE;
|
|
||||||
import org.gcube.dataanalysis.executor.messagequeue.Consumer;
|
|
||||||
import org.gcube.dataanalysis.executor.messagequeue.Producer;
|
|
||||||
import org.gcube.dataanalysis.executor.messagequeue.QCONSTANTS;
|
|
||||||
import org.gcube.dataanalysis.executor.messagequeue.QueueManager;
|
|
||||||
import org.gcube.dataanalysis.executor.scripts.ScriptIOWorker;
|
|
||||||
import org.gcube.resources.discovery.client.api.DiscoveryClient;
|
|
||||||
import org.gcube.resources.discovery.client.queries.api.SimpleQuery;
|
|
||||||
import org.gcube.vremanagement.executor.api.SmartExecutor;
|
|
||||||
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
|
|
||||||
import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin;
|
|
||||||
import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPluginQuery;
|
|
||||||
import org.gcube.vremanagement.executor.client.plugins.query.filter.ListEndpointDiscoveryFilter;
|
|
||||||
import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter;
|
|
||||||
import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy;
|
|
||||||
import static org.gcube.resources.discovery.icclient.ICFactory.*;
|
|
||||||
|
|
||||||
public class QueueJobManager {
|
|
||||||
|
|
||||||
|
|
||||||
// broadcast message period
|
|
||||||
public static int broadcastTimePeriod = 120000;
|
|
||||||
// max silence before computation stops
|
|
||||||
public static int maxSilenceTimeBeforeComputationStop = 10800000;
|
|
||||||
// max number of retries per computation step
|
|
||||||
public static int maxNumberOfComputationRetries = 1;
|
|
||||||
// period for controlling a node activity
|
|
||||||
public static int computationWatcherTimerPeriod = 120000;
|
|
||||||
// max number of message to put in a queue
|
|
||||||
// protected static int maxNumberOfMessages = 20;
|
|
||||||
public static int maxNumberOfStages = Integer.MAX_VALUE;//10;
|
|
||||||
// timeout for resending a message
|
|
||||||
public static int queueWatcherMaxwaitingTime = QCONSTANTS.refreshStatusTime;// * 5;
|
|
||||||
|
|
||||||
protected int maxFailureTries;
|
|
||||||
private static String pluginName = "SmartGenericWorker";//"GenericWorker";
|
|
||||||
|
|
||||||
protected String scope;
|
|
||||||
protected String session;
|
|
||||||
|
|
||||||
protected boolean yetstopped;
|
|
||||||
protected boolean messagesresent;
|
|
||||||
protected float status;
|
|
||||||
protected boolean abort;
|
|
||||||
protected boolean shutdown;
|
|
||||||
|
|
||||||
protected List<String> eprs;
|
|
||||||
protected int activeNodes;
|
|
||||||
protected int computingNodes;
|
|
||||||
protected int numberOfMessages;
|
|
||||||
protected int totalNumberOfMessages;
|
|
||||||
protected int actualNumberOfNodes;
|
|
||||||
protected int totalNumberOfStages;
|
|
||||||
public int currentNumberOfStages;
|
|
||||||
|
|
||||||
// files management
|
|
||||||
protected List<String> filenames;
|
|
||||||
protected List<String> fileurls;
|
|
||||||
|
|
||||||
// queue parameters
|
|
||||||
protected String queueName;
|
|
||||||
protected String queueResponse;
|
|
||||||
protected String queueURL;
|
|
||||||
protected String queueUSER;
|
|
||||||
protected String queuePWD;
|
|
||||||
protected org.gcube.dataanalysis.executor.messagequeue.Consumer consumer;
|
|
||||||
protected Producer producer;
|
|
||||||
|
|
||||||
Timer broadcastTimer;
|
|
||||||
Timer computationWatcherTimer;
|
|
||||||
ComputationTimerWatcher computationWatcher;
|
|
||||||
String serviceClass;
|
|
||||||
String serviceName;
|
|
||||||
String owner;
|
|
||||||
String localDir;
|
|
||||||
String remoteDir;
|
|
||||||
String outputDir;
|
|
||||||
String script;
|
|
||||||
List<String> arguments;
|
|
||||||
String configuration;
|
|
||||||
boolean deletefiles;
|
|
||||||
StatusListener statuslistener;
|
|
||||||
|
|
||||||
private void resetAllVars() {
|
|
||||||
scope = null;
|
|
||||||
|
|
||||||
yetstopped = false;
|
|
||||||
messagesresent = false;
|
|
||||||
status = 0;
|
|
||||||
abort = false;
|
|
||||||
shutdown = false;
|
|
||||||
|
|
||||||
eprs = null;
|
|
||||||
activeNodes = 0;
|
|
||||||
computingNodes = 0;
|
|
||||||
numberOfMessages = 0;
|
|
||||||
|
|
||||||
actualNumberOfNodes = 0;
|
|
||||||
filenames = null;
|
|
||||||
fileurls = null;
|
|
||||||
|
|
||||||
queueName = null;
|
|
||||||
queueResponse = null;
|
|
||||||
queueURL = null;
|
|
||||||
queueUSER = null;
|
|
||||||
queuePWD = null;
|
|
||||||
consumer = null;
|
|
||||||
producer = null;
|
|
||||||
broadcastTimer = null;
|
|
||||||
computationWatcherTimer = null;
|
|
||||||
computationWatcher = null;
|
|
||||||
serviceClass = null;
|
|
||||||
serviceName = null;
|
|
||||||
owner = null;
|
|
||||||
localDir = null;
|
|
||||||
remoteDir = null;
|
|
||||||
outputDir = null;
|
|
||||||
script = null;
|
|
||||||
arguments = null;
|
|
||||||
configuration = null;
|
|
||||||
deletefiles = false;
|
|
||||||
statuslistener = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getActiveNodes() {
|
|
||||||
return computingNodes;
|
|
||||||
}
|
|
||||||
|
|
||||||
public float getStatus() {
|
|
||||||
float innerStatus = 0;
|
|
||||||
if (totalNumberOfMessages != 0)
|
|
||||||
innerStatus = (1f - ((float) numberOfMessages / (float) totalNumberOfMessages));
|
|
||||||
if (totalNumberOfStages == 0)
|
|
||||||
return innerStatus;
|
|
||||||
else {
|
|
||||||
float offset = ((float) Math.max(currentNumberOfStages - 1, 0)) / (float) totalNumberOfStages;
|
|
||||||
float status = offset + (innerStatus / (float) totalNumberOfStages);
|
|
||||||
// AnalysisLogger.getLogger().info("stages: "+totalNumberOfStages+" inner status: "+innerStatus+" currentStage: "+currentNumberOfStages+" status: "+status);
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// there is only one node from the client point of view
|
|
||||||
public int getNumberOfNodes() {
|
|
||||||
if (eprs.size() > 0)
|
|
||||||
return 1;
|
|
||||||
else
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setNumberOfNodes(int newNumberOfNodes) {
|
|
||||||
// ignore this setting in this case
|
|
||||||
}
|
|
||||||
|
|
||||||
private void init(String scope, int numberOfNodes) throws Exception {
|
|
||||||
resetAllVars();
|
|
||||||
// init scope variables
|
|
||||||
this.scope = scope;
|
|
||||||
// introduce a session
|
|
||||||
// initialize flags
|
|
||||||
shutdown = false;
|
|
||||||
yetstopped = false;
|
|
||||||
messagesresent = false;
|
|
||||||
abort = false;
|
|
||||||
// find all the nodes - initialize the eprs
|
|
||||||
findNodes(scope);
|
|
||||||
}
|
|
||||||
|
|
||||||
public QueueJobManager(String scope, int numberOfNodes, String session) throws Exception {
|
|
||||||
init(scope, numberOfNodes);
|
|
||||||
this.session = session;
|
|
||||||
}
|
|
||||||
|
|
||||||
public QueueJobManager(String scope, int numberOfNodes, List<String> eprs, String session) throws Exception {
|
|
||||||
init(scope, numberOfNodes);
|
|
||||||
this.eprs = eprs;
|
|
||||||
this.session = session;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setGlobalVars(String serviceClass, String serviceName, String owner, String localDir, String remoteDir, String outputDir, String script, List<String> arguments, String configuration, boolean deletefiles) {
|
|
||||||
this.serviceClass = serviceClass;
|
|
||||||
this.serviceName = serviceName;
|
|
||||||
this.owner = owner;
|
|
||||||
this.localDir = localDir;
|
|
||||||
this.remoteDir = remoteDir;
|
|
||||||
this.outputDir = outputDir;
|
|
||||||
this.script = script;
|
|
||||||
this.arguments = arguments;
|
|
||||||
this.configuration = configuration;
|
|
||||||
this.deletefiles = deletefiles;
|
|
||||||
}
|
|
||||||
|
|
||||||
private int totalmessages = 0;
|
|
||||||
|
|
||||||
public boolean uploadAndExecuteChunkized(String serviceClass, String serviceName, String owner, String localDir, String remoteDir, String outputDir, String script, List<String> arguments, String configuration, boolean deletefiles, boolean forceUpload) throws Exception {
|
|
||||||
long t0 = System.currentTimeMillis();
|
|
||||||
|
|
||||||
int elements = arguments.size();
|
|
||||||
/*generic-worker
|
|
||||||
* int div = elements / (maxNumberOfMessages); int rest = elements % (maxNumberOfMessages); if (rest > 0) div++; if (div == 0) { div = 1; }
|
|
||||||
*/
|
|
||||||
if (session == null || session.length()==0)
|
|
||||||
session = (("" + UUID.randomUUID()).replace("-", "") + Math.random()).replace(".", "");
|
|
||||||
int[] chunkSizes = null;
|
|
||||||
//up to 1120 species we don't make stages
|
|
||||||
if (elements>maxNumberOfStages)
|
|
||||||
chunkSizes = Operations.takeChunks(elements, maxNumberOfStages);
|
|
||||||
else {
|
|
||||||
chunkSizes = new int[1];
|
|
||||||
chunkSizes[0]=elements;
|
|
||||||
}
|
|
||||||
int allchunks = chunkSizes.length;
|
|
||||||
totalNumberOfStages = allchunks;
|
|
||||||
currentNumberOfStages = 0;
|
|
||||||
int start = 0;
|
|
||||||
totalmessages = 0;
|
|
||||||
AnalysisLogger.getLogger().info("Starting the computation in "+allchunks+" stages");
|
|
||||||
for (int i = 0; i < allchunks; i++) {
|
|
||||||
numberOfMessages = totalNumberOfMessages = 0;
|
|
||||||
currentNumberOfStages++;
|
|
||||||
int end = Math.min(elements, start + chunkSizes[i]);
|
|
||||||
AnalysisLogger.getLogger().info("Computing the chunk number " + (i + 1) + " of " + allchunks + " between " + start + " and " + (end - 1));
|
|
||||||
List<String> sublist = new ArrayList<String>();
|
|
||||||
for (int j = start; j < end; j++)
|
|
||||||
sublist.add(arguments.get(j));
|
|
||||||
|
|
||||||
AnalysisLogger.getLogger().info("size sub:" + sublist.size());
|
|
||||||
// totalmessages=totalmessages+sublist.size();
|
|
||||||
uploadAndExecute(serviceClass, serviceName, owner, localDir, remoteDir, outputDir, script, sublist, configuration, deletefiles, forceUpload);
|
|
||||||
if (abort)
|
|
||||||
break;
|
|
||||||
start = end;
|
|
||||||
AnalysisLogger.getLogger().info("Processed chunk number " + (i + 1));
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
currentNumberOfStages = totalNumberOfStages;
|
|
||||||
AnalysisLogger.getLogger().info("Finished computation on all chunks and messages " + totalmessages);
|
|
||||||
AnalysisLogger.getLogger().info("Whole Procedure done in " + (System.currentTimeMillis() - t0) + " ms");
|
|
||||||
return (!abort);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
 * Runs one complete distributed-processing attempt for the given sub-list of arguments,
 * retrying the whole procedure up to {@code maxNumberOfComputationRetries} times while
 * the computation keeps aborting.
 *
 * Per attempt: (re)initializes state, resolves queue coordinates, uploads the work files
 * to shared storage, commands the executor nodes to listen, starts the broadcast and
 * watchdog timers, sends one message per argument and blocks until all messages are
 * processed or the run aborts, then shuts everything down and prints a resend summary.
 *
 * @param serviceClass  storage service class used for the upload
 * @param serviceName   storage service name used for the upload
 * @param owner         storage owner used for the upload
 * @param localDir      local directory containing the files to upload
 * @param remoteDir     remote storage directory for the uploaded files
 * @param outputDir     directory where workers place their output
 * @param script        script each worker executes
 * @param arguments     one argument string per message/task to dispatch
 * @param configuration serialized configuration forwarded to the workers
 * @param deletefiles   whether workers should clean their caches
 * @param forceUpload   when true, files are uploaded even if the remote dir is populated
 * @return true when the computation completed without aborting
 * @throws Exception on unrecoverable setup failures (queue resolution, upload, …)
 */
private boolean uploadAndExecute(String serviceClass, String serviceName, String owner, String localDir, String remoteDir, String outputDir, String script, List<String> arguments, String configuration, boolean deletefiles, boolean forceUpload) throws Exception {
	int numberOfRetries = maxNumberOfComputationRetries;
	boolean recompute = true;

	// retry loop: one full setup/execute/teardown cycle per iteration
	while ((numberOfRetries > 0) && (recompute)) {
		long t0 = System.currentTimeMillis();
		// if (numberOfRetries<maxNumberOfComputationRetries)
		init(scope, 1);

		AnalysisLogger.getLogger().info("Computation Try number " + (maxNumberOfComputationRetries + 1 - numberOfRetries));

		AnalysisLogger.getLogger().info("Contacting " + actualNumberOfNodes + " Nodes");
		// set globals
		setGlobalVars(serviceClass, serviceName, owner, localDir, remoteDir, outputDir, script, arguments, configuration, deletefiles);
		// initializing queue
		setQueueVariables();
		// if not yet uploaded , upload required files
		uploadFilesOnStorage(forceUpload);
		// broadcast a message to all executors for purging previous queues
		// purgeQueues();
		createClientProducer();
		broadcastListenCommandToExecutorNodes();

		// tolerate at most one fatal failure per active node before aborting
		maxFailureTries = activeNodes * 1;

		// periodically re-broadcast the listen command so late/restarted nodes join in
		broadcastTimer = new Timer();
		broadcastTimer.schedule(new Broadcaster(), broadcastTimePeriod, broadcastTimePeriod);

		// global watchdog: aborts the run if no worker reports for too long
		computationWatcherTimer = new Timer();
		computationWatcher = new ComputationTimerWatcher(maxSilenceTimeBeforeComputationStop);
		computationWatcherTimer.schedule(computationWatcher, computationWatcherTimerPeriod, computationWatcherTimerPeriod);

		// send all messages
		sendMessages();
		createClientConsumer();
		// wait for messages
		waitForMessages();

		AnalysisLogger.getLogger().info("Wait for message finished - checking result");
		if (numberOfMessages == 0) {
			AnalysisLogger.getLogger().info("All tasks have correctly finished!");
		}

		/*
		 * else{ AnalysisLogger.getLogger().info("Timeout - Warning Some Task is missing!"); for (int k=0;k<finishedChunks.length;k++){ if (finishedChunks[k]==0){ AnalysisLogger.getLogger().info("Sending Again message number " + k); Map<String, Object> inputs = generateInputMessage(filenames, fileurls, outputDir, script, arguments.get(k), k, scope, serviceClass, serviceName, owner, remoteDir, session, configuration, deletefiles); producer.sendMessage(inputs, 0); AnalysisLogger.getLogger().info("Sent Message " + k); } } waitForMessages(); if (numberOfMessages>0){ abort = true; } }
		 */

		// deleteRemoteFolder();
		// summary: report messages that were never processed or had to be resent
		AnalysisLogger.getLogger().info("-SUMMARY-");
		for (int i = 0; i < totalNumberOfMessages; i++) {
			if (activeMessages[i])
				AnalysisLogger.getLogger().info("Error : the Message Number " + i + " Was Never Processed!");
			if (resentMessages[i] > 0) {
				messagesresent = true;
				AnalysisLogger.getLogger().info("Warning : the Message Number " + i + " Was resent " + resentMessages[i] + " Times");
			}
		}
		AnalysisLogger.getLogger().info("-SUMMARY END-");

		stop();
		AnalysisLogger.getLogger().info("Stopped");
		AnalysisLogger.getLogger().info("Single Step Procedure done in " + (System.currentTimeMillis() - t0) + " ms");
		activeNodes = 0;
		numberOfRetries--;
		if (abort) {
			recompute = true;
			// back off before the next attempt
			if (numberOfRetries > 0)
				Thread.sleep(10000);
		} else
			recompute = false;
	}

	return (!abort);
}
|
|
||||||
|
|
||||||
public boolean hasResentMessages() {
|
|
||||||
return messagesresent;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void waitForMessages() throws Exception {
|
|
||||||
AnalysisLogger.getLogger().info("Waiting...");
|
|
||||||
while ((numberOfMessages > 0) && (!abort)) {
|
|
||||||
Thread.sleep(2000);
|
|
||||||
|
|
||||||
// long tcurrent = System.currentTimeMillis();
|
|
||||||
// if ((tcurrent - waitTime) > maxwaitingTime) {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
AnalysisLogger.getLogger().info("...Stop - Abort?" + abort);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean wasAborted() {
|
|
||||||
return abort;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void purgeQueues() throws Exception {
|
|
||||||
AnalysisLogger.getLogger().info("Purging Queue");
|
|
||||||
List<WorkerWatcher> tasksProxies = new ArrayList<WorkerWatcher>();
|
|
||||||
for (int j = 0; j < actualNumberOfNodes; j++) {
|
|
||||||
try {
|
|
||||||
contactNodes(tasksProxies, j, queueName, queueUSER, queuePWD, queueURL, queueResponse, session, "true");
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
AnalysisLogger.getLogger().info("Error in purgin queue on node " + j);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
AnalysisLogger.getLogger().info("Queue Purged");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
 * Best-effort, idempotent shutdown of all client-side resources: cancels the
 * broadcast and watchdog timers, destroys the per-message watchers, closes the
 * queue producer/consumer and finally purges the remote queues. Guarded by
 * {@code yetstopped} so repeated calls are no-ops; any failure is logged but
 * never propagated.
 */
public void stop() {
	try {
		if (!yetstopped) {
			if (broadcastTimer != null) {
				AnalysisLogger.getLogger().info("Stopping Broadcaster");
				broadcastTimer.cancel();
				broadcastTimer.purge();
			}

			if (computationWatcherTimer != null) {
				AnalysisLogger.getLogger().info("Stopping Watcher");
				computationWatcherTimer.cancel();
				computationWatcherTimer.purge();
			}

			AnalysisLogger.getLogger().info("Purging Status Listener");

			if (statuslistener != null)
				statuslistener.destroyAllWatchers();

			AnalysisLogger.getLogger().info("Stopping Producer and Consumer");

			// deliberate best-effort: the producer/consumer may already be closed
			// or never have been created for this attempt
			try{
				producer.stop();
				producer.closeSession();
			}catch(Exception e1){}
			try{
				consumer.stop();
				consumer.closeSession();
			}catch(Exception e2){}

			AnalysisLogger.getLogger().info("Purging Remote Queues");
			purgeQueues();

			// mark as stopped only after the full sequence completed
			yetstopped = true;
		}
	} catch (Exception e) {
		e.printStackTrace();
		AnalysisLogger.getLogger().info("Not completely stopped");
	}
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private void contactNodes(List<WorkerWatcher> tasksProxies, int order, String queueName, String queueUSER, String queuePWD, String queueURL, String queueResponse, String session, String purgeQueue) throws Exception {
|
|
||||||
// generate the input map according to the arguments
|
|
||||||
Map<String, Object> inputs = generateWorkerInput(queueName, queueUSER, queuePWD, queueURL, queueResponse, session, purgeQueue);
|
|
||||||
AnalysisLogger.getLogger().info("Inputs " + inputs);
|
|
||||||
// take the i-th endpoint of the executor
|
|
||||||
String selectedEPR = eprs.get(order);
|
|
||||||
AnalysisLogger.getLogger().info("Broadcasting to node " + (order + 1) + " on " + selectedEPR);
|
|
||||||
|
|
||||||
|
|
||||||
/*OLD EXECUTOR CALL
|
|
||||||
// run the executor script
|
|
||||||
ExecutorCall call = new ExecutorCall(pluginName, gscope);
|
|
||||||
call.setEndpointReference(selectedEPR);
|
|
||||||
TaskCall task = null;
|
|
||||||
AnalysisLogger.getLogger().info("EPR:" + selectedEPR);
|
|
||||||
task = call.launch(inputs);
|
|
||||||
// AnalysisLogger.getLogger().info("Task EPR:" + task.getEndpointReference());
|
|
||||||
TaskProxy proxy = task.getProxy();
|
|
||||||
tasksProxies.add(new WorkerWatcher(proxy, AnalysisLogger.getLogger()));
|
|
||||||
// AnalysisLogger.getLogger().info("Contacting node " + (order + 1) + " OK on " + selectedEPR);
|
|
||||||
*/
|
|
||||||
|
|
||||||
// ScopeProvider.instance.set(scope);
|
|
||||||
|
|
||||||
ExecutorPlugin runExecutorPlugin = new ExecutorPlugin();
|
|
||||||
SmartExecutorPluginQuery runQuery = new SmartExecutorPluginQuery(runExecutorPlugin);
|
|
||||||
|
|
||||||
/* TODO Add key_value filter here
|
|
||||||
* Tuple<String, String>[] tuples = new Tuple[n];
|
|
||||||
*
|
|
||||||
* runQuery.addConditions(pluginName, tuples);
|
|
||||||
*/
|
|
||||||
runQuery.addConditions(pluginName);
|
|
||||||
SpecificEndpointDiscoveryFilter sedf = new SpecificEndpointDiscoveryFilter(selectedEPR);
|
|
||||||
runQuery.setEndpointDiscoveryFilter(sedf);
|
|
||||||
SmartExecutorProxy proxy = new ProxyBuilderImpl<SmartExecutor, SmartExecutorProxy>(runExecutorPlugin, runQuery).build();
|
|
||||||
|
|
||||||
//AnalysisLogger.getLogger().debug("Launching Smart Executor in namely Scope: "+scope+" real scope "+ScopeProvider.instance.get());
|
|
||||||
AnalysisLogger.getLogger().debug("Launching Smart Executor in namely Scope: "+scope);
|
|
||||||
LaunchParameter launchParameter = new LaunchParameter(pluginName, inputs);
|
|
||||||
String excecutionIdentifier = proxy.launch(launchParameter);
|
|
||||||
tasksProxies.add(new WorkerWatcher(proxy, excecutionIdentifier, AnalysisLogger.getLogger()));
|
|
||||||
|
|
||||||
AnalysisLogger.getLogger().info("Contacting node " + (order + 1) + " OK on " + selectedEPR);
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private List<String> getFilteredEndpoints(String scopeString){
|
|
||||||
ScopeProvider.instance.set(scopeString);
|
|
||||||
|
|
||||||
ExecutorPlugin executorPlugin = new ExecutorPlugin();
|
|
||||||
SmartExecutorPluginQuery query = new SmartExecutorPluginQuery(executorPlugin);
|
|
||||||
|
|
||||||
/*
|
|
||||||
add key_value filter here
|
|
||||||
* Tuple<String, String>[] tuples = new Tuple[n];
|
|
||||||
*
|
|
||||||
* runQuery.addConditions(pluginName, tuples);
|
|
||||||
*/
|
|
||||||
|
|
||||||
query.addConditions(pluginName);
|
|
||||||
|
|
||||||
/* Used to add extra filter to ServiceEndpoint discovery */
|
|
||||||
query.setServiceEndpointQueryFilter(null);
|
|
||||||
List<String> nodes = query.discoverEndpoints(new ListEndpointDiscoveryFilter());
|
|
||||||
AnalysisLogger.getLogger().debug("Found the following nodes: "+nodes+" in scope "+scopeString);
|
|
||||||
return nodes;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private int findNodes(String scopeString) throws Exception {
|
|
||||||
eprs = getFilteredEndpoints(scopeString);
|
|
||||||
actualNumberOfNodes = eprs.size();
|
|
||||||
return actualNumberOfNodes;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
private int findNodes(String scopeString) throws Exception {
|
|
||||||
AnalysisLogger.getLogger().debug("SCOPE:"+scopeString);
|
|
||||||
GCUBEScope scope = GCUBEScope.getScope(scopeString);
|
|
||||||
ISClient client = GHNContext.getImplementation(ISClient.class);
|
|
||||||
WSResourceQuery wsquery = client.getQuery(WSResourceQuery.class);
|
|
||||||
wsquery.addAtomicConditions(new AtomicCondition("//gc:ServiceName", "Executor"));
|
|
||||||
wsquery.addAtomicConditions(new AtomicCondition("/child::*[local-name()='Task']/name[text()='" + pluginName + "']", pluginName));
|
|
||||||
List<RPDocument> listdoc = client.execute(wsquery, scope);
|
|
||||||
EndpointReferenceType epr = null;
|
|
||||||
eprs = new ArrayList<EndpointReferenceType>();
|
|
||||||
int numberOfEP = 0;
|
|
||||||
for (RPDocument resource : listdoc) {
|
|
||||||
epr = resource.getEndpoint();
|
|
||||||
numberOfEP++;
|
|
||||||
eprs.add(epr);
|
|
||||||
}
|
|
||||||
AnalysisLogger.getLogger().info("Found " + numberOfEP + " endpoints");
|
|
||||||
// get current number of available nodes
|
|
||||||
actualNumberOfNodes = eprs.size();
|
|
||||||
return numberOfEP;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
public String getQueueURL(String scope) throws Exception{
|
|
||||||
//set the scope provider first!
|
|
||||||
//<Category>Service</Category>
|
|
||||||
//<Name>MessageBroker</Name>
|
|
||||||
|
|
||||||
ScopeProvider.instance.set(scope);
|
|
||||||
SimpleQuery query = queryFor(ServiceEndpoint.class);
|
|
||||||
query.addCondition("$resource/Profile/Category/text() eq 'Service' and $resource/Profile/Name eq 'MessageBroker' ");
|
|
||||||
DiscoveryClient<ServiceEndpoint> client = clientFor(ServiceEndpoint.class);
|
|
||||||
List<ServiceEndpoint> resources = client.submit(query);
|
|
||||||
if (resources==null || resources.size()==0){
|
|
||||||
throw new Exception("No Message-Queue available in scope "+scope);
|
|
||||||
}
|
|
||||||
else{
|
|
||||||
AccessPoint ap = resources.get(0).profile().accessPoints().iterator().next();
|
|
||||||
String queue = ap.address();
|
|
||||||
AnalysisLogger.getLogger().debug("Found AMQ Url : "+queue);
|
|
||||||
return queue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setQueueVariables() throws Exception {
|
|
||||||
queueName = "D4ScienceJob"; // + session;
|
|
||||||
queueResponse = queueName + "Response"+session;
|
|
||||||
//general scope
|
|
||||||
|
|
||||||
|
|
||||||
// TODO Check THIS
|
|
||||||
// queueURL = gscope.getServiceMap().getEndpoints(GHNContext.MSGBROKER).iterator().next().getAddress().toString();
|
|
||||||
queueURL = getQueueURL(scope);
|
|
||||||
|
|
||||||
|
|
||||||
//tests on ecosystem
|
|
||||||
//TODO: delete this!
|
|
||||||
// queueURL = "tcp://ui.grid.research-infrastructures.eu:6166";
|
|
||||||
// queueURL = "tcp://message-broker.d4science.research-infrastructures.eu:6166";
|
|
||||||
AnalysisLogger.getLogger().info("Queue for the scope: " + queueURL);
|
|
||||||
if (queueURL==null){
|
|
||||||
if (scope.startsWith("/gcube"))
|
|
||||||
queueURL = "tcp://ui.grid.research-infrastructures.eu:6166";
|
|
||||||
else
|
|
||||||
queueURL = "tcp://message-broker.d4science.research-infrastructures.eu:6166";
|
|
||||||
}
|
|
||||||
queueUSER = ActiveMQConnection.DEFAULT_USER;
|
|
||||||
queuePWD = ActiveMQConnection.DEFAULT_PASSWORD;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void deleteRemoteFolder() throws Exception {
|
|
||||||
ScopeProvider.instance.set(scope);
|
|
||||||
IClient client = new StorageClient(serviceClass, serviceName, owner, AccessType.SHARED,MemoryType.VOLATILE).getClient();
|
|
||||||
// IClient client = new StorageClient(serviceClass, serviceName, owner, AccessType.SHARED, gscope).getClient();
|
|
||||||
AnalysisLogger.getLogger().info("Removing Remote Dir " + remoteDir);
|
|
||||||
client.removeDir().RDir(remoteDir);
|
|
||||||
AnalysisLogger.getLogger().info("Removed");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
 * Uploads the contents of {@code localDir} to the remote storage directory and
 * fills {@code filenames}/{@code fileurls} with one entry per uploaded file.
 * When {@code forceupload} is false and the remote directory already contains
 * objects, the upload step is skipped and only the URLs are collected.
 *
 * NOTE(review): when the upload is skipped, {@code remotef} still includes a
 * freshly generated {@code tempdir} UUID that no previous upload can have used,
 * so the collected URLs may point to nonexistent remote objects — confirm this
 * is intended.
 *
 * @param forceupload when true, files are uploaded even if the remote dir is populated
 * @throws Exception on storage-client or transfer failures
 */
private void uploadFilesOnStorage(boolean forceupload) throws Exception {
	ScopeProvider.instance.set(scope);
	IClient client = new StorageClient(serviceClass, serviceName, owner, AccessType.SHARED, MemoryType.VOLATILE).getClient();
	// IClient client = new StorageClient(serviceClass, serviceName, owner, AccessType.SHARED, gscope).getClient();
	File dir = new File(localDir);
	File[] files = dir.listFiles();
	AnalysisLogger.getLogger().info("Start uploading");
	filenames = new ArrayList<String>();
	fileurls = new ArrayList<String>();
	boolean uploadFiles = forceupload;
	// if we do not force upload then check if the folder is yet there
	if (!uploadFiles) {
		List<StorageObject> remoteObjects = client.showDir().RDir(remoteDir);
		// only upload files if they are not yet uploaded
		if (remoteObjects.size() == 0)
			uploadFiles = true;
	}
	if (!uploadFiles)
		AnalysisLogger.getLogger().info("Unnecessary to Uploading Files");

	AnalysisLogger.getLogger().info("Loading files");
	// patch for concurrent uploads: each run writes under its own random subdir
	String tempdir = ""+UUID.randomUUID()+"/";
	for (File sfile : files) {
		// skip hidden files
		if (sfile.getName().startsWith("."))
			continue;

		String localf = sfile.getAbsolutePath();
		String filename = sfile.getName();

		String remotef = remoteDir + tempdir+sfile.getName();
		if (uploadFiles) {
			client.put(true).LFile(localf).RFile(remotef);
			AnalysisLogger.getLogger().info("Uploading File "+localf+" as remote file "+remotef);
		}
		// collect the public URL for each remote file, uploaded or not
		String url = client.getUrl().RFile(remotef);
		// AnalysisLogger.getLogger().info("URL obtained: " + url);
		filenames.add(filename);
		fileurls.add(url);
	}
	AnalysisLogger.getLogger().info("Loading finished");
}
|
|
||||||
|
|
||||||
private void broadcastListenCommandToExecutorNodes() throws Exception {
|
|
||||||
AnalysisLogger.getLogger().info("Submitting script to Remote Queue " + queueName);
|
|
||||||
List<WorkerWatcher> tasksProxies = new ArrayList<WorkerWatcher>();
|
|
||||||
try{
|
|
||||||
findNodes(scope);
|
|
||||||
}catch(Exception e){
|
|
||||||
AnalysisLogger.getLogger().info("Error in Finding nodes - using previous value");
|
|
||||||
}
|
|
||||||
activeNodes = actualNumberOfNodes;
|
|
||||||
// launch the tasks
|
|
||||||
for (int i = 0; i < actualNumberOfNodes; i++) {
|
|
||||||
try {
|
|
||||||
contactNodes(tasksProxies, i, queueName, queueUSER, queuePWD, queueURL, queueResponse, session, "false");
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
AnalysisLogger.getLogger().info("Error in Contacting nodes");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createClientProducer() throws Exception {
|
|
||||||
AnalysisLogger.getLogger().info("Creating Message Queue and Producer");
|
|
||||||
// create the Producer
|
|
||||||
QueueManager qm = new QueueManager();
|
|
||||||
qm.createAndConnect(queueUSER, queuePWD, queueURL, queueName);
|
|
||||||
producer = new Producer(qm, queueName);
|
|
||||||
AnalysisLogger.getLogger().info("Producer OK");
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createClientConsumer() throws Exception {
|
|
||||||
AnalysisLogger.getLogger().info("Creating Response Message Queue and Consumer");
|
|
||||||
// create the listener
|
|
||||||
statuslistener = new StatusListener();
|
|
||||||
QueueManager qm1 = new QueueManager();
|
|
||||||
qm1.createAndConnect(queueUSER, queuePWD, queueURL, queueResponse);
|
|
||||||
consumer = new Consumer(qm1, statuslistener, statuslistener, queueResponse);
|
|
||||||
AnalysisLogger.getLogger().info("Consumers OK");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Per-message bookkeeping, indexed by message order:
// activeMessages[i] is true while message i has been sent but not yet reported FINISHED
boolean activeMessages[];
// resentMessages[i] counts how many times message i had to be re-dispatched
public int resentMessages[];
|
|
||||||
|
|
||||||
private void sendMessages() throws Exception {
|
|
||||||
int i = 0;
|
|
||||||
numberOfMessages = arguments.size();
|
|
||||||
totalNumberOfMessages = numberOfMessages;
|
|
||||||
AnalysisLogger.getLogger().info("Messages To Send " + numberOfMessages);
|
|
||||||
activeMessages = new boolean[numberOfMessages];
|
|
||||||
resentMessages = new int[numberOfMessages];
|
|
||||||
for (String argum : arguments) {
|
|
||||||
Map<String, Object> inputs = generateInputMessage(filenames, fileurls, outputDir, script, argum, i, scope, serviceClass, serviceName, owner, remoteDir, session, configuration, deletefiles, false);
|
|
||||||
producer.sendMessage(inputs, 0);
|
|
||||||
AnalysisLogger.getLogger().info("Send " + i);
|
|
||||||
activeMessages[i] = true;
|
|
||||||
i++;
|
|
||||||
}
|
|
||||||
AnalysisLogger.getLogger().info("Messages Sent " + numberOfMessages);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Map<String, Object> generateInputMessage(Object filenames, Object fileurls, String outputDir, String script, String argum, int i, String scope, String serviceClass, String serviceName, String owner, String remoteDir, String session, String configuration, boolean deletefiles, boolean duplicateMessage) {
|
|
||||||
Map<String, Object> inputs = new HashMap<String, Object>();
|
|
||||||
|
|
||||||
inputs.put(ATTRIBUTE.FILE_NAMES.name(), filenames);
|
|
||||||
inputs.put(ATTRIBUTE.FILE_URLS.name(), fileurls);
|
|
||||||
inputs.put(ATTRIBUTE.OUTPUTDIR.name(), outputDir);
|
|
||||||
inputs.put(ATTRIBUTE.SCRIPT.name(), script);
|
|
||||||
inputs.put(ATTRIBUTE.ARGUMENTS.name(), argum + " " + duplicateMessage);
|
|
||||||
inputs.put(ATTRIBUTE.ORDER.name(), "" + i);
|
|
||||||
inputs.put(ATTRIBUTE.SCOPE.name(), scope);
|
|
||||||
inputs.put(ATTRIBUTE.SERVICE_CLASS.name(), serviceClass);
|
|
||||||
inputs.put(ATTRIBUTE.SERVICE_NAME.name(), serviceName);
|
|
||||||
inputs.put(ATTRIBUTE.OWNER.name(), owner);
|
|
||||||
inputs.put(ATTRIBUTE.REMOTEDIR.name(), remoteDir);
|
|
||||||
inputs.put(ATTRIBUTE.CLEAN_CACHE.name(), "" + deletefiles);
|
|
||||||
inputs.put(ATTRIBUTE.QSESSION.name(), session);
|
|
||||||
inputs.put(ATTRIBUTE.CONFIGURATION.name(), configuration);
|
|
||||||
inputs.put(ATTRIBUTE.TOPIC_RESPONSE_NAME.name(), queueResponse);
|
|
||||||
inputs.put(ATTRIBUTE.QUEUE_USER.name(), queueUSER);
|
|
||||||
inputs.put(ATTRIBUTE.QUEUE_PASSWORD.name(), queuePWD);
|
|
||||||
inputs.put(ATTRIBUTE.QUEUE_URL.name(), queueURL);
|
|
||||||
return inputs;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Map<String, Object> generateWorkerInput(String queueName, String queueUser, String queuePassword, String queueURL, String queueResponse, String session, String purge) {
|
|
||||||
|
|
||||||
Map<String, Object> inputs = new HashMap<String, Object>();
|
|
||||||
|
|
||||||
inputs.put(ATTRIBUTE.TOPIC_NAME.name(), ScriptIOWorker.toInputString(queueName));
|
|
||||||
inputs.put(ATTRIBUTE.QUEUE_USER.name(), ScriptIOWorker.toInputString(queueUser));
|
|
||||||
inputs.put(ATTRIBUTE.QUEUE_PASSWORD.name(), ScriptIOWorker.toInputString(queuePassword));
|
|
||||||
inputs.put(ATTRIBUTE.QUEUE_URL.name(), ScriptIOWorker.toInputString(queueURL));
|
|
||||||
inputs.put(ATTRIBUTE.TOPIC_RESPONSE_NAME.name(), ScriptIOWorker.toInputString(queueResponse));
|
|
||||||
inputs.put(ATTRIBUTE.QSESSION.name(), session);
|
|
||||||
inputs.put(ATTRIBUTE.ERASE.name(), purge);
|
|
||||||
return inputs;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
 * Periodic task that re-broadcasts the "listen" command to all executor nodes,
 * so that nodes joining late (or restarting) keep consuming the job queue.
 * Scheduled on {@code broadcastTimer} every {@code broadcastTimePeriod} ms.
 */
public class Broadcaster extends TimerTask {

	@Override
	public void run() {
		try {
			AnalysisLogger.getLogger().info("(((((((((((((((((((((((((((------Broadcasting Information To Watchers------)))))))))))))))))))))))))))");
			broadcastListenCommandToExecutorNodes();
			AnalysisLogger.getLogger().info("(((((((((((((((((((((((((((------END Broadcasting Information To Watchers------)))))))))))))))))))))))))))");
		} catch (Exception e) {
			// never propagate from a TimerTask: log and try again on the next tick
			e.printStackTrace();
			AnalysisLogger.getLogger().info("--------------------------------Broadcaster: Error Sending Listen Message to Executors------)))))))))))))))))))))))))))");
		}
	}

}
|
|
||||||
|
|
||||||
public class ComputationTimerWatcher extends TimerTask {
|
|
||||||
|
|
||||||
long maxTime;
|
|
||||||
long lastTimeClock;
|
|
||||||
|
|
||||||
public ComputationTimerWatcher(long maxtime) {
|
|
||||||
this.maxTime = maxtime;
|
|
||||||
this.lastTimeClock = System.currentTimeMillis();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void reset() {
|
|
||||||
lastTimeClock = System.currentTimeMillis();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setmaxTime(long maxTime) {
|
|
||||||
this.maxTime = maxTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
long t0 = System.currentTimeMillis();
|
|
||||||
AnalysisLogger.getLogger().info("Computation Watcher Timing Is " + (t0 - lastTimeClock)+" max computation time is "+maxTime);
|
|
||||||
if ((t0 - lastTimeClock) > maxTime) {
|
|
||||||
AnalysisLogger.getLogger().info("Computation Watcher - Computation Timeout: Closing Queue Job Manager!!!");
|
|
||||||
abort();
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
AnalysisLogger.getLogger().info("Error Taking clock");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void abort() {
|
|
||||||
AnalysisLogger.getLogger().info("Computation Aborted");
|
|
||||||
this.abort = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public class StatusListener implements MessageListener, ExceptionListener {
|
|
||||||
|
|
||||||
private QueueWorkerWatcher[] watchers;
|
|
||||||
|
|
||||||
synchronized public void onException(JMSException ex) {
|
|
||||||
abort();
|
|
||||||
AnalysisLogger.getLogger().info("JMS Exception occured. Shutting down client.");
|
|
||||||
}
|
|
||||||
|
|
||||||
private synchronized void addWatcher(int order) {
|
|
||||||
if (watchers == null)
|
|
||||||
watchers = new QueueWorkerWatcher[totalNumberOfMessages];
|
|
||||||
|
|
||||||
QueueWorkerWatcher watcher = watchers[order];
|
|
||||||
if (watcher != null) {
|
|
||||||
destroyWatcher(order);
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<String, Object> message = generateInputMessage(filenames, fileurls, outputDir, script, arguments.get(order), order, scope, serviceClass, serviceName, owner, remoteDir, session, configuration, deletefiles, true);
|
|
||||||
watchers[order] = new QueueWorkerWatcher(producer, message, order);
|
|
||||||
}
|
|
||||||
|
|
||||||
private synchronized void resetWatcher(int order) {
|
|
||||||
if (watchers == null)
|
|
||||||
watchers = new QueueWorkerWatcher[totalNumberOfMessages];
|
|
||||||
else if (watchers[order] != null)
|
|
||||||
watchers[order].resetTime();
|
|
||||||
}
|
|
||||||
|
|
||||||
private synchronized void destroyWatcher(int order) {
|
|
||||||
if (watchers != null && watchers[order] != null) {
|
|
||||||
if (watchers[order].hasResent())
|
|
||||||
resentMessages[order] = resentMessages[order] + 1;
|
|
||||||
|
|
||||||
watchers[order].destroy();
|
|
||||||
watchers[order] = null;
|
|
||||||
AnalysisLogger.getLogger().info("Destroyed Watcher number " + order);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void destroyAllWatchers() {
|
|
||||||
if (watchers != null) {
|
|
||||||
for (int i = 0; i < watchers.length; i++) {
|
|
||||||
destroyWatcher(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void onMessage(Message message) {
|
|
||||||
|
|
||||||
// get message
|
|
||||||
try {
|
|
||||||
|
|
||||||
HashMap<String, Object> details = (HashMap<String, Object>) (HashMap<String, Object>) message.getObjectProperty(ATTRIBUTE.CONTENT.name());
|
|
||||||
String status = (String) details.get(ATTRIBUTE.STATUS.name());
|
|
||||||
String order = "" + details.get(ATTRIBUTE.ORDER.name());
|
|
||||||
String nodeaddress = (String) details.get(ATTRIBUTE.NODE.name());
|
|
||||||
String msession = (String) details.get(ATTRIBUTE.QSESSION.name());
|
|
||||||
Object error = details.get(ATTRIBUTE.ERROR.name());
|
|
||||||
|
|
||||||
AnalysisLogger.getLogger().info("Current session " + session);
|
|
||||||
if ((msession != null) && (msession.equals(session))) {
|
|
||||||
AnalysisLogger.getLogger().info("Session " + session + " is right - acknowledge");
|
|
||||||
message.acknowledge();
|
|
||||||
AnalysisLogger.getLogger().info("Session " + session + " acknowledged");
|
|
||||||
int orderInt = -1;
|
|
||||||
try {
|
|
||||||
orderInt = Integer.parseInt(order);
|
|
||||||
} catch (Exception e3) {
|
|
||||||
e3.printStackTrace();
|
|
||||||
}
|
|
||||||
if (orderInt > -1) {
|
|
||||||
|
|
||||||
// reset the watcher
|
|
||||||
if (computationWatcher!=null)
|
|
||||||
computationWatcher.reset();
|
|
||||||
|
|
||||||
AnalysisLogger.getLogger().info("Task number " + order + " is " + status + " on node " + nodeaddress + " and session " + session);
|
|
||||||
|
|
||||||
if (status.equals(ATTRIBUTE.STARTED.name())) {
|
|
||||||
computingNodes++;
|
|
||||||
addWatcher(orderInt);
|
|
||||||
}
|
|
||||||
if (status.equals(ATTRIBUTE.PROCESSING.name())) {
|
|
||||||
|
|
||||||
resetWatcher(orderInt);
|
|
||||||
} else if (status.equals(ATTRIBUTE.FINISHED.name())) {
|
|
||||||
|
|
||||||
totalmessages++;
|
|
||||||
computingNodes--;
|
|
||||||
destroyWatcher(orderInt);
|
|
||||||
if (numberOfMessages > 0)
|
|
||||||
numberOfMessages--;
|
|
||||||
|
|
||||||
AnalysisLogger.getLogger().info("Remaining " + numberOfMessages + " messages to manage");
|
|
||||||
activeMessages[orderInt] = false;
|
|
||||||
|
|
||||||
} else if (status.equals(ATTRIBUTE.FATAL_ERROR.name())) {
|
|
||||||
if (error!=null)
|
|
||||||
AnalysisLogger.getLogger().info("REPORTED FATAL_ERROR on " +nodeaddress+" : ");
|
|
||||||
AnalysisLogger.getLogger().info(error);
|
|
||||||
|
|
||||||
computingNodes--;
|
|
||||||
if (maxFailureTries <= 0) {
|
|
||||||
AnalysisLogger.getLogger().info("Too much Failures - Aborting");
|
|
||||||
destroyAllWatchers();
|
|
||||||
abort();
|
|
||||||
} else {
|
|
||||||
AnalysisLogger.getLogger().info("Failure Occurred - Now Resending Message " + orderInt);
|
|
||||||
resentMessages[orderInt] = resentMessages[orderInt] + 1;
|
|
||||||
maxFailureTries--;
|
|
||||||
// resend message
|
|
||||||
Map<String, Object> retrymessage = generateInputMessage(filenames, fileurls, outputDir, script, arguments.get(orderInt), orderInt, scope, serviceClass, serviceName, owner, remoteDir, session, configuration, deletefiles, true);
|
|
||||||
producer.sendMessage(retrymessage, QCONSTANTS.timeToLive);
|
|
||||||
AnalysisLogger.getLogger().info("Failure Occurred - Resent Message " + orderInt);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
} else
|
|
||||||
AnalysisLogger.getLogger().info("Ignoring message " + order + " with status " + status);
|
|
||||||
} else {
|
|
||||||
AnalysisLogger.getLogger().info("wrong session " + msession + " ignoring message");
|
|
||||||
// consumer.manager.session.recover();
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
|
|
||||||
AnalysisLogger.getLogger().info("Error reading details ", e);
|
|
||||||
AnalysisLogger.getLogger().info("...Aborting Job...");
|
|
||||||
abort();
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,76 +0,0 @@
|
||||||
package org.gcube.dataanalysis.executor.job.management;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Timer;
|
|
||||||
import java.util.TimerTask;
|
|
||||||
|
|
||||||
import javax.jms.Message;
|
|
||||||
|
|
||||||
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
|
||||||
import org.gcube.dataanalysis.executor.messagequeue.ATTRIBUTE;
|
|
||||||
import org.gcube.dataanalysis.executor.messagequeue.Producer;
|
|
||||||
import org.gcube.dataanalysis.executor.messagequeue.QCONSTANTS;
|
|
||||||
|
|
||||||
/**
 * Watchdog for a single in-flight queue message. A timer checks periodically
 * whether the worker processing the message has reported recently; when the
 * silence exceeds {@code maxwaitingTime}, the message is re-sent once and the
 * watcher destroys itself. {@link #resetTime()} is invoked on every PROCESSING
 * report for the watched message.
 */
public class QueueWorkerWatcher {

	// silence threshold before the message is considered lost and re-sent
	protected int maxwaitingTime = 2*QueueJobManager.queueWatcherMaxwaitingTime;
	// timestamp of the last sign of life from the worker
	private long lastTimeClock;
	Timer watcher;
	Producer producer;
	// ready-to-resend copy of the original message (flagged as duplicate)
	Map<String, Object> message;
	// true once this watcher has re-sent its message
	public boolean resent=false;
	// order (index) of the watched message
	int order;

	public QueueWorkerWatcher(Producer producer, Map<String, Object> message, int order) {
		this.producer = producer;
		this.message = message;
		resent=false;
		this.order = order;

		// start polling immediately, at the configured refresh period
		watcher = new Timer();
		watcher.schedule(new Controller(), 0, QCONSTANTS.refreshStatusTime);
		resetTime();
	}

	/** Records a sign of life from the worker, restarting the silence countdown. */
	public synchronized void resetTime() {
		lastTimeClock = System.currentTimeMillis();
	}

	/** Cancels the polling timer; safe to call more than once. */
	public synchronized void destroy() {
		if (watcher != null) {
			watcher.cancel();
			watcher.purge();
			watcher = null;
		}
	}

	/** @return true when this watcher has re-sent its message at least once. */
	public boolean hasResent(){
		return resent;
	}

	// Timer task performing the actual silence check and one-shot resend.
	private class Controller extends TimerTask {

		@Override
		public void run() {
			try {
				long t0 = System.currentTimeMillis();
				AnalysisLogger.getLogger().debug("Watcher "+order+" Timing Is "+(t0 - lastTimeClock)+ " max waiting time: "+maxwaitingTime);
				if ((t0 - lastTimeClock) > maxwaitingTime) {

					AnalysisLogger.getLogger().info("Watcher "+order+" Time Is Over "+(t0 - lastTimeClock));

					AnalysisLogger.getLogger().info("Watcher "+order+" Re-Sending Message "+message);
					producer.sendMessage(message, QCONSTANTS.timeToLive);
					// QueueJobManager.resentMessages[Integer.parseInt(""+message.get(ATTRIBUTE.ORDER.name()))]=QueueJobManager.resentMessages[Integer.parseInt(""+message.get(ATTRIBUTE.ORDER.name()))]+1;
					resent = true;
					// resend only once: the watcher retires itself afterwards
					AnalysisLogger.getLogger().info("Watcher "+order+" Destroying watcher");
					destroy();

				}
			} catch (Exception e) {
				// never propagate from a TimerTask
				e.printStackTrace();
			}
		}

	}
}
|
|
|
@ -1,36 +0,0 @@
|
||||||
package org.gcube.dataanalysis.executor.job.management;
|
|
||||||
|
|
||||||
import org.apache.log4j.Logger;
|
|
||||||
import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy;
|
|
||||||
import org.gcube.vremanagement.executor.plugin.PluginState;
|
|
||||||
|
|
||||||
public class WorkerWatcher {
|
|
||||||
private static int maxTries = 15;
|
|
||||||
private int currentTries;
|
|
||||||
|
|
||||||
Logger logger;
|
|
||||||
SmartExecutorProxy proxy;
|
|
||||||
String excecutionIdentifier;
|
|
||||||
|
|
||||||
public WorkerWatcher(SmartExecutorProxy proxy, String excecutionIdentifier, Logger logger){
|
|
||||||
this.proxy = proxy;
|
|
||||||
this.excecutionIdentifier = excecutionIdentifier;
|
|
||||||
this.logger = logger;
|
|
||||||
currentTries = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
public PluginState getState(){
|
|
||||||
try{
|
|
||||||
return proxy.getState(excecutionIdentifier);
|
|
||||||
}catch(Exception e){
|
|
||||||
logger.error("Error in getting state: recover try number "+currentTries,e);
|
|
||||||
currentTries++;
|
|
||||||
if (currentTries>maxTries){
|
|
||||||
return PluginState.FAILED;
|
|
||||||
}
|
|
||||||
else return PluginState.RUNNING;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -28,7 +28,6 @@ import org.gcube.dataanalysis.ecoengine.utils.DatabaseFactory;
|
||||||
import org.gcube.dataanalysis.ecoengine.utils.DatabaseUtils;
|
import org.gcube.dataanalysis.ecoengine.utils.DatabaseUtils;
|
||||||
import org.gcube.dataanalysis.ecoengine.utils.Transformations;
|
import org.gcube.dataanalysis.ecoengine.utils.Transformations;
|
||||||
import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing;
|
import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing;
|
||||||
import org.gcube.dataanalysis.executor.job.management.QueueJobManager;
|
|
||||||
import org.gcube.dataanalysis.executor.scripts.OSCommand;
|
import org.gcube.dataanalysis.executor.scripts.OSCommand;
|
||||||
import org.hibernate.SessionFactory;
|
import org.hibernate.SessionFactory;
|
||||||
|
|
||||||
|
@ -313,10 +312,6 @@ public class LWR extends ActorNode {
|
||||||
|
|
||||||
prevmaxMessages=D4ScienceDistributedProcessing.maxMessagesAllowedPerJob;
|
prevmaxMessages=D4ScienceDistributedProcessing.maxMessagesAllowedPerJob;
|
||||||
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=1;
|
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=1;
|
||||||
prevbroadcastTimePeriod = QueueJobManager.broadcastTimePeriod;
|
|
||||||
QueueJobManager.broadcastTimePeriod=4*3600000;
|
|
||||||
prevmaxNumberOfStages = QueueJobManager.maxNumberOfStages;
|
|
||||||
QueueJobManager.maxNumberOfStages=10000;
|
|
||||||
|
|
||||||
AnalysisLogger.getLogger().info("Creating Destination Table " + destinationTable);
|
AnalysisLogger.getLogger().info("Creating Destination Table " + destinationTable);
|
||||||
try{
|
try{
|
||||||
|
@ -364,11 +359,7 @@ public class LWR extends ActorNode {
|
||||||
boolean haspostprocessed = false;
|
boolean haspostprocessed = false;
|
||||||
@Override
|
@Override
|
||||||
public void postProcess(boolean manageDuplicates, boolean manageFault) {
|
public void postProcess(boolean manageDuplicates, boolean manageFault) {
|
||||||
// D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=previousnumberofspeciesperjob;
|
|
||||||
QueueJobManager.broadcastTimePeriod=prevbroadcastTimePeriod;
|
|
||||||
QueueJobManager.maxNumberOfStages=prevmaxNumberOfStages;
|
|
||||||
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=prevmaxMessages;
|
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=prevmaxMessages;
|
||||||
// select fam, sf, bs, spc , lwr, priormeanlog10a, priorsdlog10a, priormeanb, priorsdb,note into testfam from (select distinct(spc) as a, fam, sf, bs, spc , lwr, priormeanlog10a, priorsdlog10a, priormeanb, priorsdb,note from lwr1 order by fam) as d
|
|
||||||
haspostprocessed=true;
|
haspostprocessed=true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,6 @@ import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode;
|
||||||
import org.gcube.dataanalysis.ecoengine.transducers.OccurrencePointsMerger;
|
import org.gcube.dataanalysis.ecoengine.transducers.OccurrencePointsMerger;
|
||||||
import org.gcube.dataanalysis.ecoengine.utils.Transformations;
|
import org.gcube.dataanalysis.ecoengine.utils.Transformations;
|
||||||
import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing;
|
import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing;
|
||||||
import org.gcube.dataanalysis.executor.job.management.QueueJobManager;
|
|
||||||
import org.hibernate.SessionFactory;
|
import org.hibernate.SessionFactory;
|
||||||
|
|
||||||
public class OccurrenceMergingNode extends ActorNode {
|
public class OccurrenceMergingNode extends ActorNode {
|
||||||
|
@ -100,10 +99,6 @@ public class OccurrenceMergingNode extends ActorNode {
|
||||||
processor.takeFullRanges();
|
processor.takeFullRanges();
|
||||||
prevmaxMessages=D4ScienceDistributedProcessing.maxMessagesAllowedPerJob;
|
prevmaxMessages=D4ScienceDistributedProcessing.maxMessagesAllowedPerJob;
|
||||||
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=100;
|
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=100;
|
||||||
prevbroadcastTimePeriod = QueueJobManager.broadcastTimePeriod;
|
|
||||||
QueueJobManager.broadcastTimePeriod=4*3600000;
|
|
||||||
prevmaxNumberOfStages = QueueJobManager.maxNumberOfStages;
|
|
||||||
QueueJobManager.maxNumberOfStages=100000;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -123,9 +118,6 @@ public class OccurrenceMergingNode extends ActorNode {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postProcess(boolean manageDuplicates, boolean manageFault) {
|
public void postProcess(boolean manageDuplicates, boolean manageFault) {
|
||||||
QueueJobManager.broadcastTimePeriod=prevbroadcastTimePeriod;
|
|
||||||
QueueJobManager.maxNumberOfStages=prevmaxNumberOfStages;
|
|
||||||
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=prevmaxMessages;
|
|
||||||
|
|
||||||
processor.shutdown();
|
processor.shutdown();
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -24,7 +24,6 @@ import org.gcube.dataanalysis.ecoengine.utils.DatabaseFactory;
|
||||||
import org.gcube.dataanalysis.ecoengine.utils.DatabaseUtils;
|
import org.gcube.dataanalysis.ecoengine.utils.DatabaseUtils;
|
||||||
import org.gcube.dataanalysis.ecoengine.utils.Transformations;
|
import org.gcube.dataanalysis.ecoengine.utils.Transformations;
|
||||||
import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing;
|
import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing;
|
||||||
import org.gcube.dataanalysis.executor.job.management.QueueJobManager;
|
|
||||||
import org.gcube.dataanalysis.executor.nodes.transducers.bionym.abstracts.MatcherOutput;
|
import org.gcube.dataanalysis.executor.nodes.transducers.bionym.abstracts.MatcherOutput;
|
||||||
import org.gcube.dataanalysis.executor.nodes.transducers.bionym.abstracts.SingleEntry;
|
import org.gcube.dataanalysis.executor.nodes.transducers.bionym.abstracts.SingleEntry;
|
||||||
import org.gcube.dataanalysis.executor.nodes.transducers.bionym.implementations.matchers.FuzzyMatcher;
|
import org.gcube.dataanalysis.executor.nodes.transducers.bionym.implementations.matchers.FuzzyMatcher;
|
||||||
|
@ -387,10 +386,6 @@ public class BionymFlexibleWorkflowTransducer extends ActorNode {
|
||||||
|
|
||||||
prevmaxMessages=D4ScienceDistributedProcessing.maxMessagesAllowedPerJob;
|
prevmaxMessages=D4ScienceDistributedProcessing.maxMessagesAllowedPerJob;
|
||||||
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=50;
|
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=50;
|
||||||
prevbroadcastTimePeriod = QueueJobManager.broadcastTimePeriod;
|
|
||||||
QueueJobManager.broadcastTimePeriod=30*60000;
|
|
||||||
prevmaxNumberOfStages = QueueJobManager.maxNumberOfStages;
|
|
||||||
QueueJobManager.maxNumberOfStages=10000;
|
|
||||||
|
|
||||||
AnalysisLogger.getLogger().info("Destination Table Created! Addressing " + rawnamescount + " names");
|
AnalysisLogger.getLogger().info("Destination Table Created! Addressing " + rawnamescount + " names");
|
||||||
}
|
}
|
||||||
|
@ -426,8 +421,6 @@ public class BionymFlexibleWorkflowTransducer extends ActorNode {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postProcess(boolean manageDuplicates, boolean manageFault) {
|
public void postProcess(boolean manageDuplicates, boolean manageFault) {
|
||||||
QueueJobManager.broadcastTimePeriod=prevbroadcastTimePeriod;
|
|
||||||
QueueJobManager.maxNumberOfStages=prevmaxNumberOfStages;
|
|
||||||
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=prevmaxMessages;
|
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=prevmaxMessages;
|
||||||
haspostprocessed = true;
|
haspostprocessed = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,6 @@ import org.gcube.dataanalysis.ecoengine.utils.DatabaseUtils;
|
||||||
import org.gcube.dataanalysis.ecoengine.utils.Transformations;
|
import org.gcube.dataanalysis.ecoengine.utils.Transformations;
|
||||||
import org.gcube.dataanalysis.ecoengine.utils.Tuple;
|
import org.gcube.dataanalysis.ecoengine.utils.Tuple;
|
||||||
import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing;
|
import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing;
|
||||||
import org.gcube.dataanalysis.executor.job.management.QueueJobManager;
|
|
||||||
import org.gcube.dataanalysis.executor.scripts.OSCommand;
|
import org.gcube.dataanalysis.executor.scripts.OSCommand;
|
||||||
import org.hibernate.SessionFactory;
|
import org.hibernate.SessionFactory;
|
||||||
|
|
||||||
|
@ -347,10 +346,6 @@ public class BionymWorkflow extends ActorNode {
|
||||||
|
|
||||||
prevmaxMessages=D4ScienceDistributedProcessing.maxMessagesAllowedPerJob;
|
prevmaxMessages=D4ScienceDistributedProcessing.maxMessagesAllowedPerJob;
|
||||||
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=1;
|
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=1;
|
||||||
prevbroadcastTimePeriod = QueueJobManager.broadcastTimePeriod;
|
|
||||||
QueueJobManager.broadcastTimePeriod=4*3600000;
|
|
||||||
prevmaxNumberOfStages = QueueJobManager.maxNumberOfStages;
|
|
||||||
QueueJobManager.maxNumberOfStages=10000;
|
|
||||||
|
|
||||||
AnalysisLogger.getLogger().info("Destination Table Created! Addressing " + rawnamescount + " names");
|
AnalysisLogger.getLogger().info("Destination Table Created! Addressing " + rawnamescount + " names");
|
||||||
|
|
||||||
|
@ -387,8 +382,6 @@ public class BionymWorkflow extends ActorNode {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postProcess(boolean manageDuplicates, boolean manageFault) {
|
public void postProcess(boolean manageDuplicates, boolean manageFault) {
|
||||||
QueueJobManager.broadcastTimePeriod=prevbroadcastTimePeriod;
|
|
||||||
QueueJobManager.maxNumberOfStages=prevmaxNumberOfStages;
|
|
||||||
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=prevmaxMessages;
|
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=prevmaxMessages;
|
||||||
haspostprocessed = true;
|
haspostprocessed = true;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue