Fixing the new version designed over SmartExecutor

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/EcologicalEngineSmartExecutor@112042 82a268e6-3cf1-43bd-a215-b396298e98cf
Luca Frosini 2015-02-13 16:41:32 +00:00
parent 8c8d1c3167
commit 14cf6ad9b3
12 changed files with 182 additions and 104 deletions

View File

@@ -1,7 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER"/>
<classpathentry kind="output" path="target/classes"/>
</classpath>
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

View File

@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>EcologicalEngineExecutor</name>
<name>EcologicalEngineSmartExecutor</name>
<comment></comment>
<projects>
</projects>

View File

@@ -1,4 +1,4 @@
#Fri Jun 22 18:05:41 CEST 2012
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/main/resources=UTF-8
encoding/<project>=UTF-8

View File

@@ -5,7 +5,7 @@
<Profile>
<Description>Ecological Engine Executor Library</Description>
<Class>EcologicalEngineExecutor</Class>
<Name>${artifactId}</Name>
<Name>${artifactId}</Name>
<Version>1.0.0</Version>
<Packages>
<Software>

pom.xml
View File

@@ -8,18 +8,18 @@
<relativePath />
</parent>
<groupId>org.gcube.dataanalysis</groupId>
<artifactId>EcologicalEngineExecutor</artifactId>
<artifactId>SmartEcologicalEngineExecutor</artifactId>
<version>1.6.4-SNAPSHOT</version>
<name>ecological-engine-executor</name>
<description>ecological-engine-executor</description>
<name>Smart Ecological Engine Executor</name>
<description>Smart Ecological Engine Executor Description</description>
<properties>
<distroDirectory>${project.basedir}/distro</distroDirectory>
</properties>
<dependencies>
<dependency>
<groupId>org.gcube.resourcemanagement</groupId>
<artifactId>executor-service</artifactId>
<version>[1.2.0-SNAPSHOT,2.0.0-SNAPSHOT)</version>
<groupId>org.gcube.vremanagement</groupId>
<artifactId>smart-executor-client</artifactId>
<version>[1.0.0-SNAPSHOT,)</version>
</dependency>
<dependency>
<groupId>org.gcube.contentmanagement</groupId>
@@ -36,40 +36,11 @@
<artifactId>activemq-core</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>org.gcube.core</groupId>
<artifactId>gcf</artifactId>
<version>[1.4.1,2.0.0)</version>
</dependency>
<dependency>
<groupId>org.gcube.dataanalysis</groupId>
<artifactId>ecological-engine</artifactId>
<version>[1.8.0-SNAPSHOT,2.0.0)</version>
</dependency>
<dependency>
<groupId>org.gcube.informationsystem</groupId>
<artifactId>is-client</artifactId>
<version>[1.5.1,1.6.0]</version>
</dependency>
<dependency>
<groupId>org.gcube.informationsystem</groupId>
<artifactId>is-collector-stubs</artifactId>
<version>[3.0.0-SNAPSHOT, 3.1.0)</version>
</dependency>
<dependency>
<groupId>org.gcube.core</groupId>
<artifactId>common-scope</artifactId>
<version>[1.2.0-SNAPSHOT,3.0.0)</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.jcs</groupId>
<artifactId>jcs</artifactId>
<version>1.3</version>
</dependency>-->
<!-- <dependency> <groupId>org.gcube.dataanalysis</groupId> <artifactId>generic-worker</artifactId>
<version>1.2.0-SNAPSHOT</version> <type>jar</type> <scope>compile</scope>
<exclusions> <exclusion> <artifactId>common-scope</artifactId> <groupId>org.gcube.core</groupId>
</exclusion> </exclusions> </dependency> -->
</dependencies>
<build>
<plugins>

View File

@@ -5,8 +5,6 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.axis.message.addressing.Address;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.apache.log4j.Logger;
import org.gcube.contentmanagement.graphtools.utils.HttpRequest;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
@@ -97,10 +95,12 @@ public class DistributedProcessingAgent {
throw new Exception("Null Scope");
AnalysisLogger.getLogger().debug("SCOPE: "+gscope);
if (endpoints != null) {
List<EndpointReferenceType> eprtList = new ArrayList<EndpointReferenceType>();
for (String ep : endpoints) {
eprtList.add(new EndpointReferenceType(new Address(ep)));
}
jobManager = new QueueJobManager(gscope, endpoints.size(), eprtList);
} else
jobManager = new QueueJobManager(gscope, 1);

View File

@@ -13,15 +13,9 @@ import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.xml.ws.EndpointReference;
import org.apache.activemq.ActiveMQConnection;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.gcube.common.core.contexts.GHNContext;
import org.gcube.common.core.informationsystem.client.AtomicCondition;
import org.gcube.common.core.informationsystem.client.ISClient;
import org.gcube.common.core.informationsystem.client.RPDocument;
import org.gcube.common.core.informationsystem.client.queries.WSResourceQuery;
import org.gcube.common.core.scope.GCUBEScope;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.contentmanagement.blobstorage.resource.StorageObject;
import org.gcube.contentmanagement.blobstorage.service.IClient;
@@ -36,9 +30,8 @@ import org.gcube.dataanalysis.executor.messagequeue.Producer;
import org.gcube.dataanalysis.executor.messagequeue.QCONSTANTS;
import org.gcube.dataanalysis.executor.messagequeue.QueueManager;
import org.gcube.dataanalysis.executor.scripts.ScriptIOWorker;
import org.gcube.vremanagement.executor.stubs.ExecutorCall;
import org.gcube.vremanagement.executor.stubs.TaskCall;
import org.gcube.vremanagement.executor.stubs.TaskProxy;
import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin;
import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPluginQuery;
public class QueueJobManager {
@@ -60,7 +53,6 @@ public class QueueJobManager {
private static String pluginName = "generic-worker";//"GenericWorker";
protected String scope;
protected GCUBEScope gscope;
protected String session;
protected boolean yetstopped;
@@ -69,7 +61,7 @@
protected boolean abort;
protected boolean shutdown;
protected List<EndpointReferenceType> eprs;
protected List<EndpointReference> eprs;
protected int activeNodes;
protected int computingNodes;
protected int numberOfMessages;
@@ -108,7 +100,6 @@
private void resetAllVars() {
scope = null;
gscope = null;
yetstopped = false;
messagesresent = false;
@@ -182,7 +173,6 @@
resetAllVars();
// init scope variables
this.scope = scope;
gscope = GCUBEScope.getScope(scope);
// introduce a session
// initialize flags
shutdown = false;
@@ -197,7 +187,7 @@
init(scope, numberOfNodes);
}
public QueueJobManager(String scope, int numberOfNodes, List<EndpointReferenceType> eprs) throws Exception {
public QueueJobManager(String scope, int numberOfNodes, List<EndpointReference> eprs) throws Exception {
init(scope, numberOfNodes);
this.eprs = eprs;
}
@@ -438,6 +428,40 @@
}
@SuppressWarnings("unchecked")
private List<EndpointReference> getFilteredEndpoints(String scopeString){
ScopeProvider.instance.set(scopeString);
ExecutorPlugin executorPlugin = new ExecutorPlugin();
SmartExecutorPluginQuery query = new SmartExecutorPluginQuery(executorPlugin);
/*
Tuple<String, String>[] tuples = new Tuple[1];
tuples[0] = new Tuple<String, String>("Version", "1.0.0-SNAPSHOT");
query.addConditions("SmartGenericWorker", tuples);
*/
query.addConditions(pluginName);
/* Used to add extra filter to ServiceEndpoint discovery */
query.setServiceEndpointQueryFilter(null);
/* Used to add extra filter to GCore Endpoint discovery */
query.setEndpointDiscoveryFilter(null);
return query.fire();
}
private int findNodes(String scopeString) throws Exception {
eprs = getFilteredEndpoints(scopeString);
actualNumberOfNodes = eprs.size();
return actualNumberOfNodes;
}
/*
private int findNodes(String scopeString) throws Exception {
AnalysisLogger.getLogger().debug("SCOPE:"+scopeString);
GCUBEScope scope = GCUBEScope.getScope(scopeString);
@@ -459,12 +483,19 @@ public class QueueJobManager {
actualNumberOfNodes = eprs.size();
return numberOfEP;
}
*/
private void setQueueVariables() throws Exception {
queueName = "D4ScienceJob"; // + session;
queueResponse = queueName + "Response"+session;
//general scope
// TODO Check THIS
queueURL = gscope.getServiceMap().getEndpoints(GHNContext.MSGBROKER).iterator().next().getAddress().toString();
//tests on ecosystem
//TODO: delete this!
// queueURL = "tcp://ui.grid.research-infrastructures.eu:6166";
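
Aside: the getFilteredEndpoints() method added above is the heart of the migration, so a standalone sketch of the same discovery flow may help. It is assembled only from calls visible in this commit; the scope string is a placeholder and the class is hypothetical, not part of the committed code:

import java.util.List;
import javax.xml.ws.EndpointReference;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin;
import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPluginQuery;

public class DiscoverySketch {
	public static void main(String[] args) {
		// Placeholder: set the scope of the target infrastructure
		ScopeProvider.instance.set("/gcube");
		ExecutorPlugin executorPlugin = new ExecutorPlugin();
		SmartExecutorPluginQuery query = new SmartExecutorPluginQuery(executorPlugin);
		// Restrict discovery to the generic-worker plugin, as QueueJobManager does
		query.addConditions("generic-worker");
		// No extra filters on ServiceEndpoint or GCore Endpoint discovery
		query.setServiceEndpointQueryFilter(null);
		query.setEndpointDiscoveryFilter(null);
		List<EndpointReference> endpoints = query.fire();
		System.out.println("Found " + endpoints.size() + " smart-executor endpoints");
	}
}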

View File

@@ -7,29 +7,28 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.gcube.common.core.contexts.GHNContext;
import org.gcube.common.core.informationsystem.client.AtomicCondition;
import org.gcube.common.core.informationsystem.client.ISClient;
import org.gcube.common.core.informationsystem.client.RPDocument;
import org.gcube.common.core.informationsystem.client.queries.WSResourceQuery;
import org.gcube.common.core.scope.GCUBEScope;
import javax.xml.ws.EndpointReference;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.contentmanagement.blobstorage.service.IClient;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.contentmanager.storageclient.wrapper.AccessType;
import org.gcube.contentmanager.storageclient.wrapper.StorageClient;
import org.gcube.dataanalysis.executor.scripts.ScriptIOWorker;
import org.gcube.vremanagement.executor.stubs.ExecutorCall;
import org.gcube.vremanagement.executor.stubs.TaskCall;
import org.gcube.vremanagement.executor.stubs.TaskProxy;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin;
import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPluginQuery;
import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy;
import org.gcube.vremanagement.executor.plugin.PluginState;
public class RemoteJobManager {
private static String pluginName = "ExecutorScript";
// TODO Check here:
//private static String pluginName = "ExecutorScript";
private static String pluginName = "SmartGenericWorker";
private int actualNumberOfNodes;
private GCUBEScope gscope;
private List<EndpointReferenceType> eprs;
private List<EndpointReference> eprs;
float status;
boolean abort;
boolean shutdown;
@@ -54,8 +53,7 @@ public class RemoteJobManager {
public void init(String scope, int numberOfNodes) throws Exception {
this.scope = scope;
gscope = GCUBEScope.getScope(scope);
AnalysisLogger.getLogger().debug("Using the following scope for this computation:"+gscope);
AnalysisLogger.getLogger().debug("Using the following scope for this computation: "+ scope);
shutdown = false;
yetuploaded = false;
if (eprs == null)
@@ -72,7 +70,7 @@
init(scope, numberOfNodes);
}
public RemoteJobManager(String scope, int numberOfNodes, List<EndpointReferenceType> eprs) throws Exception {
public RemoteJobManager(String scope, int numberOfNodes, List<EndpointReference> eprs) throws Exception {
this.eprs = eprs;
init(scope, numberOfNodes);
}
@@ -126,16 +124,27 @@
//generate the input map according to the arguments
Map<String, Object> inputs = generateInput(filenames, fileurls, outputDir, script, argum, i, scope, serviceClass, serviceName, owner, remoteDir,session,deletefiles);
AnalysisLogger.getLogger().debug("-> Owner: " + owner + " ServiceClass: " + serviceClass + " ServiceName:" + serviceName + " remoteDir:" + remoteDir);
//take the i-th endpoint of the executor
EndpointReferenceType selectedEPR = eprs.get(i);
EndpointReference selectedEPR = eprs.get(i);
AnalysisLogger.getLogger().debug("Launching node " + (i + 1) + " on " + selectedEPR);
//run the executor script
/*
ExecutorCall call = new ExecutorCall(pluginName, gscope);
call.setEndpointReference(selectedEPR);
TaskCall task = null;
task = call.launch(inputs);
TaskProxy proxy = task.getProxy();
tasksProxies.add(new WorkerWatcher(proxy, AnalysisLogger.getLogger()));
*/
// TODO Select the proxy using selectedEPR; until then 'proxy' stays null and launch() below will fail
SmartExecutorProxy proxy = null;
LaunchParameter launchParameter = new LaunchParameter(pluginName, inputs);
String executionIdentifier = proxy.launch(launchParameter);
tasksProxies.add(new WorkerWatcher(proxy, executionIdentifier, AnalysisLogger.getLogger()));
AnalysisLogger.getLogger().debug("Launching node " + (i + 1) + " OK on " + selectedEPR);
//add the task to the list in order to reuse it
@@ -151,8 +160,14 @@
int nworkers = tasksProxies.size();
int i=0;
while (i < nworkers) {
WorkerWatcher proxy = tasksProxies.get(i);
String state = proxy.getState();
WorkerWatcher proxy = tasksProxies.get(i);
/* ---- */
PluginState enumState = proxy.getState();
String state = enumState.toString();
/* ----- */
AnalysisLogger.getLogger().debug("REMOTE JOB MANAGER-> STATE " + state );
//control for aborted computation
abort = ((state == null) || state.equals("FAILED") || (!state.equals("DONE") && !state.equals("RUNNING")));
@@ -208,6 +223,37 @@
shutdown = true;
}
@SuppressWarnings("unchecked")
private List<EndpointReference> getFilteredEndpoints(String scopeString){
ScopeProvider.instance.set(scopeString);
ExecutorPlugin executorPlugin = new ExecutorPlugin();
SmartExecutorPluginQuery query = new SmartExecutorPluginQuery(executorPlugin);
/*
Tuple<String, String>[] tuples = new Tuple[1];
tuples[0] = new Tuple<String, String>("Version", "1.0.0-SNAPSHOT");
query.addConditions("SmartGenericWorker", tuples);
*/
query.addConditions(pluginName);
/* Used to add extra filter to ServiceEndpoint discovery */
query.setServiceEndpointQueryFilter(null);
/* Used to add extra filter to GCore Endpoint discovery */
query.setEndpointDiscoveryFilter(null);
return query.fire();
}
private int findNodes(String scopeString) throws Exception {
return getFilteredEndpoints(scopeString).size();
}
/*
private int findNodes(String scopeString) throws Exception {
GCUBEScope scope = GCUBEScope.getScope(scopeString);
ISClient client = GHNContext.getImplementation(ISClient.class);
@@ -227,7 +273,8 @@ public class RemoteJobManager {
return numberOfEP;
}
*/
private Map<String, Object> generateInput(Object filenames, Object fileurls, String outputDir, String script, String argum, int i, String scope, String serviceClass, String serviceName, String owner, String remoteDir,String session,boolean deletefiles) {
Map<String, Object> inputs = new HashMap<String, Object>();
inputs.put("FILE_NAMES", filenames);

View File

@@ -1,35 +1,34 @@
package org.gcube.dataanalysis.executor.job.management;
import org.apache.log4j.Logger;
import org.gcube.vremanagement.executor.stubs.TaskProxy;
import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy;
import org.gcube.vremanagement.executor.plugin.PluginState;
public class WorkerWatcher {
private static int maxTries = 15;
private int currentTries;
private static String runningState = "RUNNING";
private static String failedState = "FAILED";
Logger logger;
SmartExecutorProxy proxy;
String executionIdentifier;
TaskProxy proxy;
public WorkerWatcher(TaskProxy proxy, Logger logger){
public WorkerWatcher(SmartExecutorProxy proxy, String executionIdentifier, Logger logger){
this.proxy = proxy;
this.executionIdentifier = executionIdentifier;
this.logger = logger;
currentTries = 1;
}
public String getState(){
String state ="";
public PluginState getState(){
try{
proxy.synchronize();
state = proxy.getState();
return state;
return proxy.getState(executionIdentifier);
}catch(Exception e){
logger.error("Error in getting state: recover try number "+currentTries,e);
currentTries++;
if (currentTries>maxTries){
return failedState;
return PluginState.FAILED;
}
else return runningState;
else return PluginState.RUNNING;
}
}
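
Aside: a minimal polling loop over the new PluginState-based getState(), mirroring the check RemoteJobManager performs; 'proxy', 'executionIdentifier' and 'logger' are as in the constructor above. The one-second interval is an illustrative choice, and PluginState.DONE is assumed to exist because the "DONE" string state is tested elsewhere in this commit:

// Poll a WorkerWatcher until the remote task completes or fails
WorkerWatcher watcher = new WorkerWatcher(proxy, executionIdentifier, logger);
while (true) {
	PluginState state = watcher.getState();
	if (state == PluginState.DONE)
		break; // task finished successfully
	if (state == PluginState.FAILED)
		throw new Exception("remote worker failed");
	Thread.sleep(1000); // illustrative polling interval
}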

View File

@@ -10,7 +10,6 @@ import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import org.gcube.common.core.utils.logging.GCUBELog;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS;
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
@@ -29,7 +28,6 @@ import org.gcube.dataanalysis.ecoengine.utils.DatabaseFactory;
import org.gcube.dataanalysis.ecoengine.utils.DatabaseUtils;
import org.gcube.dataanalysis.ecoengine.utils.Transformations;
import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing;
import org.gcube.dataanalysis.executor.job.management.DistributedProcessingAgent;
import org.gcube.dataanalysis.executor.job.management.QueueJobManager;
import org.gcube.dataanalysis.executor.scripts.OSCommand;
import org.hibernate.SessionFactory;

View File

@@ -7,11 +7,11 @@ import java.io.FileReader;
import java.io.FileWriter;
import java.io.InputStreamReader;
import org.gcube.common.core.utils.logging.GCUBELog;
import org.slf4j.Logger;
public class OSCommand {
public static String ExecuteGetLine(String cmd, GCUBELog logger) {
public static String ExecuteGetLine(String cmd, Logger logger) {
Process process = null;
String lastline = "";

View File

@@ -3,6 +3,7 @@ package org.gcube.dataanalysis.executor.tests;
import java.util.ArrayList;
import java.util.List;
/*
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.gcube.common.core.contexts.GHNContext;
import org.gcube.common.core.informationsystem.client.AtomicCondition;
@@ -10,11 +11,12 @@ import org.gcube.common.core.informationsystem.client.ISClient;
import org.gcube.common.core.informationsystem.client.RPDocument;
import org.gcube.common.core.informationsystem.client.queries.WSResourceQuery;
import org.gcube.common.core.scope.GCUBEScope;
*/
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
public class TestGetRunningExecutor {
/* TODO Rewrite
private static int findNodes(String scopeString) throws Exception {
AnalysisLogger.getLogger().debug("*****");
GCUBEScope scope = GCUBEScope.getScope(scopeString);
@@ -38,9 +40,11 @@ public class TestGetRunningExecutor {
return numberOfEP;
}
*/
/* TODO Rewrite
private static int findRunningNodes(String scopeString) throws Exception {
AnalysisLogger.getLogger().debug("*****");
GCUBEScope scope = GCUBEScope.getScope(scopeString);
System.out.println("BROKER:"+scope.getServiceMap().getEndpoints(GHNContext.MSGBROKER).iterator().next().getAddress().toString());
@@ -61,12 +65,16 @@
AnalysisLogger.getLogger().debug("Found " + numberOfEP + " endpoints");
AnalysisLogger.getLogger().debug("-> "+ eprs);
return numberOfEP;
}
}*/
public static void main(String[] args) throws Exception{
String scope = "/gcube";
AnalysisLogger.setLogger("./cfg/ALog.properties");
findNodes(scope);
/*
* TODO Revisit
* findNodes(scope);
*/
// findRunningNodes(scope);
}