Fixing the new version designed over SmartExecutor

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/EcologicalEngineSmartExecutor@112113 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2015-02-16 11:12:01 +00:00
parent 14cf6ad9b3
commit d71fae70fb
3 changed files with 68 additions and 21 deletions

View File

@@ -96,12 +96,15 @@ public class DistributedProcessingAgent {
AnalysisLogger.getLogger().debug("SCOPE: "+gscope);
if (endpoints != null) {
/*
List<EndpointReferenceType> eprtList = new ArrayList<EndpointReferenceType>();
for (String ep : endpoints) {
eprtList.add(new EndpointReferenceType(new Address(ep)));
}
jobManager = new QueueJobManager(gscope, endpoints.size(), eprtList);
*/
jobManager = new QueueJobManager(gscope, endpoints.size(), endpoints);
} else
jobManager = new QueueJobManager(gscope, 1);

View File

@@ -13,9 +13,9 @@ import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.xml.ws.EndpointReference;
import org.apache.activemq.ActiveMQConnection;
import org.gcube.common.clients.ProxyBuilderImpl;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.contentmanagement.blobstorage.resource.StorageObject;
import org.gcube.contentmanagement.blobstorage.service.IClient;
@@ -30,8 +30,13 @@ import org.gcube.dataanalysis.executor.messagequeue.Producer;
import org.gcube.dataanalysis.executor.messagequeue.QCONSTANTS;
import org.gcube.dataanalysis.executor.messagequeue.QueueManager;
import org.gcube.dataanalysis.executor.scripts.ScriptIOWorker;
import org.gcube.vremanagement.executor.api.SmartExecutor;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin;
import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPluginQuery;
import org.gcube.vremanagement.executor.client.plugins.query.filter.ListEndpointDiscoveryFilter;
import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter;
import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy;
public class QueueJobManager {
@@ -50,7 +55,7 @@ public class QueueJobManager {
public static int queueWatcherMaxwaitingTime = QCONSTANTS.refreshStatusTime;// * 5;
protected int maxFailureTries;
private static String pluginName = "generic-worker";//"GenericWorker";
private static String pluginName = "SmartGenericWorker";//"GenericWorker";
protected String scope;
protected String session;
@@ -61,7 +66,7 @@ public class QueueJobManager {
protected boolean abort;
protected boolean shutdown;
protected List<EndpointReference> eprs;
protected List<String> eprs;
protected int activeNodes;
protected int computingNodes;
protected int numberOfMessages;
@@ -187,7 +192,7 @@
init(scope, numberOfNodes);
}
public QueueJobManager(String scope, int numberOfNodes, List<EndpointReference> eprs) throws Exception {
public QueueJobManager(String scope, int numberOfNodes, List<String> eprs) throws Exception {
init(scope, numberOfNodes);
this.eprs = eprs;
}
@@ -211,7 +216,7 @@
long t0 = System.currentTimeMillis();
int elements = arguments.size();
/*
/*generic-worker
* int div = elements / (maxNumberOfMessages); int rest = elements % (maxNumberOfMessages); if (rest > 0) div++; if (div == 0) { div = 1; }
*/
session = (("" + UUID.randomUUID()).replace("-", "") + Math.random()).replace(".", "");
@@ -408,13 +413,17 @@
}
}
@SuppressWarnings("unchecked")
private void contactNodes(List<WorkerWatcher> tasksProxies, int order, String queueName, String queueUSER, String queuePWD, String queueURL, String queueResponse, String session, String purgeQueue) throws Exception {
// generate the input map according to the arguments
Map<String, Object> inputs = generateWorkerInput(queueName, queueUSER, queuePWD, queueURL, queueResponse, session, purgeQueue);
AnalysisLogger.getLogger().info("Inputs " + inputs);
// take the i-th endpoint of the executor
EndpointReferenceType selectedEPR = eprs.get(order);
AnalysisLogger.getLogger().info("Broadcasting to node " + (order + 1) + " on " + selectedEPR.getAddress());
String selectedEPR = eprs.get(order);
AnalysisLogger.getLogger().info("Broadcasting to node " + (order + 1) + " on " + selectedEPR);
/*
// run the executor script
ExecutorCall call = new ExecutorCall(pluginName, gscope);
call.setEndpointReference(selectedEPR);
@@ -425,21 +434,44 @@
TaskProxy proxy = task.getProxy();
tasksProxies.add(new WorkerWatcher(proxy, AnalysisLogger.getLogger()));
// AnalysisLogger.getLogger().info("Contacting node " + (order + 1) + " OK on " + selectedEPR);
*/
ExecutorPlugin runExecutorPlugin = new ExecutorPlugin();
SmartExecutorPluginQuery runQuery = new SmartExecutorPluginQuery(runExecutorPlugin);
/* TODO Add key_value filter here
* Tuple<String, String>[] tuples = new Tuple[n];
*
* runQuery.addConditions(pluginName, tuples);
*/
runQuery.addConditions(pluginName);
SpecificEndpointDiscoveryFilter sedf = new SpecificEndpointDiscoveryFilter(selectedEPR);
runQuery.setEndpointDiscoveryFilter(sedf);
SmartExecutorProxy proxy = new ProxyBuilderImpl<SmartExecutor, SmartExecutorProxy>(runExecutorPlugin, runQuery).build();
LaunchParameter launchParameter = new LaunchParameter(pluginName, inputs);
String excecutionIdentifier = proxy.launch(launchParameter);
tasksProxies.add(new WorkerWatcher(proxy, excecutionIdentifier, AnalysisLogger.getLogger()));
AnalysisLogger.getLogger().info("Contacting node " + (order + 1) + " OK on " + selectedEPR);
}
@SuppressWarnings("unchecked")
private List<EndpointReference> getFilteredEndpoints(String scopeString){
private List<String> getFilteredEndpoints(String scopeString){
ScopeProvider.instance.set(scopeString);
ExecutorPlugin executorPlugin = new ExecutorPlugin();
SmartExecutorPluginQuery query = new SmartExecutorPluginQuery(executorPlugin);
/*
Tuple<String, String>[] tuples = new Tuple[1];
tuples[0] = new Tuple<String, String>("Version", "1.0.0-SNAPSHOT");
query.addConditions("SmartGenericWorker", tuples);
add key_value filter here
* Tuple<String, String>[] tuples = new Tuple[n];
*
* runQuery.addConditions(pluginName, tuples);
*/
query.addConditions(pluginName);
@@ -447,11 +479,7 @@
/* Used to add extra filter to ServiceEndpoint discovery */
query.setServiceEndpointQueryFilter(null);
/* Used to add extra filter to GCore Endpoint discovery */
query.setEndpointDiscoveryFilter(null);
return query.fire();
return query.discoverEndpoints(new ListEndpointDiscoveryFilter());
}

View File

@@ -9,15 +9,18 @@ import java.util.UUID;
import javax.xml.ws.EndpointReference;
import org.gcube.common.clients.ProxyBuilderImpl;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.contentmanagement.blobstorage.service.IClient;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.contentmanager.storageclient.wrapper.AccessType;
import org.gcube.contentmanager.storageclient.wrapper.StorageClient;
import org.gcube.dataanalysis.executor.scripts.ScriptIOWorker;
import org.gcube.vremanagement.executor.api.SmartExecutor;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin;
import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPluginQuery;
import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter;
import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy;
import org.gcube.vremanagement.executor.plugin.PluginState;
@@ -28,7 +31,7 @@ public class RemoteJobManager {
private static String pluginName = "SmartGenericWorker";
private int actualNumberOfNodes;
private List<EndpointReference> eprs;
private List<String> eprs;
float status;
boolean abort;
boolean shutdown;
@@ -70,7 +73,7 @@
init(scope, numberOfNodes);
}
public RemoteJobManager(String scope, int numberOfNodes, List<EndpointReference> eprs) throws Exception {
public RemoteJobManager(String scope, int numberOfNodes, List<String> eprs) throws Exception {
this.eprs = eprs;
init(scope, numberOfNodes);
}
@@ -80,6 +83,7 @@
boolean yetuploaded;
String session;
@SuppressWarnings("unchecked")
public boolean uploadAndExecute(String serviceClass, String serviceName, String owner, String localDir, String remoteDir, String outputDir, String script, List<String> arguments, boolean deletefiles) throws Exception {
boolean executeAll = false;
long t0 = System.currentTimeMillis();
@@ -127,7 +131,7 @@
//take the i-th endpoint of the executor
EndpointReference selectedEPR = eprs.get(i);
String selectedEPR = eprs.get(i);
AnalysisLogger.getLogger().debug("Launching node " + (i + 1) + " on " + selectedEPR);
//run the executor script
@@ -139,8 +143,20 @@
TaskProxy proxy = task.getProxy();
*/
// TODO Selected the proxy using selectedEPR
SmartExecutorProxy proxy = null;
ExecutorPlugin runExecutorPlugin = new ExecutorPlugin();
SmartExecutorPluginQuery runQuery = new SmartExecutorPluginQuery(runExecutorPlugin);
/* TODO Add key_value filter here
* Tuple<String, String>[] tuples = new Tuple[n];
*
* runQuery.addConditions(pluginName, tuples);
*/
runQuery.addConditions(pluginName);
SpecificEndpointDiscoveryFilter sedf = new SpecificEndpointDiscoveryFilter(selectedEPR);
runQuery.setEndpointDiscoveryFilter(sedf);
SmartExecutorProxy proxy = new ProxyBuilderImpl<SmartExecutor, SmartExecutorProxy>(runExecutorPlugin, runQuery).build();
LaunchParameter launchParameter = new LaunchParameter(pluginName, inputs);
String excecutionIdentifier = proxy.launch(launchParameter);