From d71fae70fbc1d68ff7b67a81d223d5bf25ce4303 Mon Sep 17 00:00:00 2001
From: Luca Frosini
Date: Mon, 16 Feb 2015 11:12:01 +0000
Subject: [PATCH] Fixing the new version designed over SmartExecutor

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/EcologicalEngineSmartExecutor@112113 82a268e6-3cf1-43bd-a215-b396298e98cf
---
 .../DistributedProcessingAgent.java      |  3 +
 .../job/management/QueueJobManager.java  | 60 ++++++++++++++-----
 .../job/management/RemoteJobManager.java | 26 ++++++--
 3 files changed, 68 insertions(+), 21 deletions(-)

diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgent.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgent.java
index 6d6647c..774f5a6 100644
--- a/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgent.java
+++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgent.java
@@ -96,12 +96,15 @@ public class DistributedProcessingAgent {
 			AnalysisLogger.getLogger().debug("SCOPE: "+gscope);
 			if (endpoints != null) {
+				/*
 				List eprtList = new ArrayList();
 				for (String ep : endpoints) {
 					eprtList.add(new EndpointReferenceType(new Address(ep)));
 				}
 				jobManager = new QueueJobManager(gscope, endpoints.size(), eprtList);
+				*/
+				jobManager = new QueueJobManager(gscope, endpoints.size(), endpoints);
 			} else
 				jobManager = new QueueJobManager(gscope, 1);
diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java
index 586825c..a784862 100644
--- a/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java
+++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java
@@ -13,9 +13,9 @@ import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
-import javax.xml.ws.EndpointReference;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.gcube.common.clients.ProxyBuilderImpl;
 import org.gcube.common.scope.api.ScopeProvider;
 import org.gcube.contentmanagement.blobstorage.resource.StorageObject;
 import org.gcube.contentmanagement.blobstorage.service.IClient;
@@ -30,8 +30,13 @@ import org.gcube.dataanalysis.executor.messagequeue.Producer;
 import org.gcube.dataanalysis.executor.messagequeue.QCONSTANTS;
 import org.gcube.dataanalysis.executor.messagequeue.QueueManager;
 import org.gcube.dataanalysis.executor.scripts.ScriptIOWorker;
+import org.gcube.vremanagement.executor.api.SmartExecutor;
+import org.gcube.vremanagement.executor.api.types.LaunchParameter;
 import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin;
 import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPluginQuery;
+import org.gcube.vremanagement.executor.client.plugins.query.filter.ListEndpointDiscoveryFilter;
+import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter;
+import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy;
 
 public class QueueJobManager {
 
@@ -50,7 +55,7 @@ public class QueueJobManager {
 	public static int queueWatcherMaxwaitingTime = QCONSTANTS.refreshStatusTime;// * 5;
 	protected int maxFailureTries;
 
-	private static String pluginName = "generic-worker";//"GenericWorker";
+	private static String pluginName = "SmartGenericWorker";//"GenericWorker";
 
 	protected String scope;
 	protected String session;
@@ -61,7 +66,7 @@ public class QueueJobManager {
 	protected boolean abort;
 	protected boolean shutdown;
 
-	protected List<EndpointReferenceType> eprs;
+	protected List<String> eprs;
 	protected int activeNodes;
 	protected int computingNodes;
 	protected int numberOfMessages;
@@ -187,7 +192,7 @@ public class QueueJobManager {
 		init(scope, numberOfNodes);
 	}
 
-	public QueueJobManager(String scope, int numberOfNodes, List<EndpointReferenceType> eprs) throws Exception {
+	public QueueJobManager(String scope, int numberOfNodes, List<String> eprs) throws Exception {
 		init(scope, numberOfNodes);
 		this.eprs = eprs;
 	}
@@ -211,7 +216,7 @@ public class QueueJobManager {
 		long t0 = System.currentTimeMillis();
 		int elements = arguments.size();
 
-		/*
+		/*generic-worker
 		 * int div = elements / (maxNumberOfMessages); int rest = elements % (maxNumberOfMessages); if (rest > 0) div++; if (div == 0) { div = 1; }
 		 */
 		session = (("" + UUID.randomUUID()).replace("-", "") + Math.random()).replace(".", "");
@@ -408,13 +413,17 @@ public class QueueJobManager {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	private void contactNodes(List tasksProxies, int order, String queueName, String queueUSER, String queuePWD, String queueURL, String queueResponse, String session, String purgeQueue) throws Exception {
 		// generate the input map according to the arguments
 		Map inputs = generateWorkerInput(queueName, queueUSER, queuePWD, queueURL, queueResponse, session, purgeQueue);
 		AnalysisLogger.getLogger().info("Inputs " + inputs);
 		// take the i-th endpoint of the executor
-		EndpointReferenceType selectedEPR = eprs.get(order);
-		AnalysisLogger.getLogger().info("Broadcasting to node " + (order + 1) + " on " + selectedEPR.getAddress());
+		String selectedEPR = eprs.get(order);
+		AnalysisLogger.getLogger().info("Broadcasting to node " + (order + 1) + " on " + selectedEPR);
+
+
+		/*
 		// run the executor script
 		ExecutorCall call = new ExecutorCall(pluginName, gscope);
 		call.setEndpointReference(selectedEPR);
@@ -425,21 +434,44 @@
 		TaskProxy proxy = task.getProxy();
 		tasksProxies.add(new WorkerWatcher(proxy, AnalysisLogger.getLogger()));
 		// AnalysisLogger.getLogger().info("Contacting node " + (order + 1) + " OK on " + selectedEPR);
+		*/
+
+
+		ExecutorPlugin runExecutorPlugin = new ExecutorPlugin();
+		SmartExecutorPluginQuery runQuery = new SmartExecutorPluginQuery(runExecutorPlugin);
+
+		/* TODO Add key_value filter here
+		 * Tuple[] tuples = new Tuple[n];
+		 *
+		 * runQuery.addConditions(pluginName, tuples);
+		 */
+		runQuery.addConditions(pluginName);
+		SpecificEndpointDiscoveryFilter sedf = new SpecificEndpointDiscoveryFilter(selectedEPR);
+		runQuery.setEndpointDiscoveryFilter(sedf);
+		SmartExecutorProxy proxy = new ProxyBuilderImpl<SmartExecutor, SmartExecutorProxy>(runExecutorPlugin, runQuery).build();
+
+		LaunchParameter launchParameter = new LaunchParameter(pluginName, inputs);
+		String excecutionIdentifier = proxy.launch(launchParameter);
+		tasksProxies.add(new WorkerWatcher(proxy, excecutionIdentifier, AnalysisLogger.getLogger()));
+
+		AnalysisLogger.getLogger().info("Contacting node " + (order + 1) + " OK on " + selectedEPR);
+
 	}
 
 	@SuppressWarnings("unchecked")
-	private List<EndpointReferenceType> getFilteredEndpoints(String scopeString){
+	private List<String> getFilteredEndpoints(String scopeString){
 		ScopeProvider.instance.set(scopeString);
 		ExecutorPlugin executorPlugin = new ExecutorPlugin();
 		SmartExecutorPluginQuery query = new SmartExecutorPluginQuery(executorPlugin);
 		/*
-		Tuple[] tuples = new Tuple[1];
-		tuples[0] = new Tuple("Version", "1.0.0-SNAPSHOT");
-		query.addConditions("SmartGenericWorker", tuples);
+		add key_value filter here
+		 * Tuple[] tuples = new Tuple[n];
+		 *
+		 * runQuery.addConditions(pluginName, tuples);
 		*/
 		query.addConditions(pluginName);
@@ -447,11 +479,7 @@
 
 		/* Used to add extra filter to ServiceEndpoint discovery */
 		query.setServiceEndpointQueryFilter(null);
 
-		/* Used to add extra filter to GCore Endpoint discovery */
-		query.setEndpointDiscoveryFilter(null);
-
-
-		return query.fire();
+		return query.discoverEndpoints(new ListEndpointDiscoveryFilter());
 	}
 
diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/RemoteJobManager.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/RemoteJobManager.java
index 365bbc0..b25067d 100644
--- a/src/main/java/org/gcube/dataanalysis/executor/job/management/RemoteJobManager.java
+++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/RemoteJobManager.java
@@ -9,15 +9,18 @@ import java.util.UUID;
 
 import javax.xml.ws.EndpointReference;
 
+import org.gcube.common.clients.ProxyBuilderImpl;
 import org.gcube.common.scope.api.ScopeProvider;
 import org.gcube.contentmanagement.blobstorage.service.IClient;
 import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
 import org.gcube.contentmanager.storageclient.wrapper.AccessType;
 import org.gcube.contentmanager.storageclient.wrapper.StorageClient;
 import org.gcube.dataanalysis.executor.scripts.ScriptIOWorker;
+import org.gcube.vremanagement.executor.api.SmartExecutor;
 import org.gcube.vremanagement.executor.api.types.LaunchParameter;
 import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin;
 import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPluginQuery;
+import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter;
 import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy;
 import org.gcube.vremanagement.executor.plugin.PluginState;
@@ -28,7 +31,7 @@ public class RemoteJobManager {
 
 	private static String pluginName = "SmartGenericWorker";
 	private int actualNumberOfNodes;
-	private List<EndpointReference> eprs;
+	private List<String> eprs;
 	float status;
 	boolean abort;
 	boolean shutdown;
@@ -70,7 +73,7 @@ public class RemoteJobManager {
 		init(scope, numberOfNodes);
 	}
 
-	public RemoteJobManager(String scope, int numberOfNodes, List<EndpointReference> eprs) throws Exception {
+	public RemoteJobManager(String scope, int numberOfNodes, List<String> eprs) throws Exception {
 		this.eprs = eprs;
 		init(scope, numberOfNodes);
 	}
@@ -80,6 +83,7 @@ public class RemoteJobManager {
 	boolean yetuploaded;
 	String session;
 
+	@SuppressWarnings("unchecked")
 	public boolean uploadAndExecute(String serviceClass, String serviceName, String owner, String localDir, String remoteDir, String outputDir, String script, List arguments, boolean deletefiles) throws Exception {
 		boolean executeAll = false;
 		long t0 = System.currentTimeMillis();
@@ -127,7 +131,7 @@
 
 				//take the i-th endpoint of the executor
-				EndpointReference selectedEPR = eprs.get(i);
+				String selectedEPR = eprs.get(i);
 				AnalysisLogger.getLogger().debug("Launching node " + (i + 1) + " on " + selectedEPR);
 
 				//run the executor script
@@ -139,8 +143,20 @@
 				TaskProxy proxy = task.getProxy();
 				*/
 
-				// TODO Selected the proxy using selectedEPR
-				SmartExecutorProxy proxy = null;
+				ExecutorPlugin runExecutorPlugin = new ExecutorPlugin();
+				SmartExecutorPluginQuery runQuery = new SmartExecutorPluginQuery(runExecutorPlugin);
+
+				/* TODO Add key_value filter here
+				 * Tuple[] tuples = new Tuple[n];
+				 *
+				 * runQuery.addConditions(pluginName, tuples);
+				 */
+				runQuery.addConditions(pluginName);
+				SpecificEndpointDiscoveryFilter sedf = new SpecificEndpointDiscoveryFilter(selectedEPR);
+				runQuery.setEndpointDiscoveryFilter(sedf);
+				SmartExecutorProxy proxy = new ProxyBuilderImpl<SmartExecutor, SmartExecutorProxy>(runExecutorPlugin, runQuery).build();
+
 				LaunchParameter launchParameter = new LaunchParameter(pluginName, inputs);
 				String excecutionIdentifier = proxy.launch(launchParameter);
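
In short, the patch replaces the old ExecutorCall/EndpointReferenceType client with the SmartExecutor proxy API: endpoints are handled as plain strings, discovery goes through SmartExecutorPluginQuery plus an endpoint discovery filter, and execution goes through SmartExecutorProxy.launch(LaunchParameter). The following sketch condenses that pattern into one self-contained class. It reuses only calls and types that appear in the diff (the ProxyBuilderImpl type arguments follow the SmartExecutor import added by the patch); the class name, scope string, choice of the first discovered endpoint, and the input map contents are illustrative assumptions, not part of the commit.

// Minimal sketch of the SmartExecutor launch pattern introduced by this patch.
// Assumed values: the scope, the class name, the "session" input and the use of
// the first discovered endpoint are placeholders for this example only.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.gcube.common.clients.ProxyBuilderImpl;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.vremanagement.executor.api.SmartExecutor;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin;
import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPluginQuery;
import org.gcube.vremanagement.executor.client.plugins.query.filter.ListEndpointDiscoveryFilter;
import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter;
import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy;

public class SmartGenericWorkerLaunchSketch {

	private static final String PLUGIN_NAME = "SmartGenericWorker";

	public static void main(String[] args) throws Exception {
		// Hypothetical scope; in the patch the scope is supplied by the caller.
		ScopeProvider.instance.set("/gcube/devsec");

		// 1) Discover the endpoints hosting the plugin, as getFilteredEndpoints() does.
		SmartExecutorPluginQuery listQuery = new SmartExecutorPluginQuery(new ExecutorPlugin());
		listQuery.addConditions(PLUGIN_NAME);
		listQuery.setServiceEndpointQueryFilter(null);
		List<String> endpoints = listQuery.discoverEndpoints(new ListEndpointDiscoveryFilter());

		// 2) Bind a proxy to one endpoint and launch, as contactNodes()/uploadAndExecute() do.
		String selectedEndpoint = endpoints.get(0);
		ExecutorPlugin runExecutorPlugin = new ExecutorPlugin();
		SmartExecutorPluginQuery runQuery = new SmartExecutorPluginQuery(runExecutorPlugin);
		runQuery.addConditions(PLUGIN_NAME);
		runQuery.setEndpointDiscoveryFilter(new SpecificEndpointDiscoveryFilter(selectedEndpoint));
		SmartExecutorProxy proxy = new ProxyBuilderImpl<SmartExecutor, SmartExecutorProxy>(runExecutorPlugin, runQuery).build();

		// Hypothetical inputs; the real maps come from generateWorkerInput(...).
		Map<String, Object> inputs = new HashMap<String, Object>();
		inputs.put("session", "example-session");

		LaunchParameter parameter = new LaunchParameter(PLUGIN_NAME, inputs);
		String executionIdentifier = proxy.launch(parameter);
		System.out.println("Launched " + PLUGIN_NAME + " on " + selectedEndpoint + ": " + executionIdentifier);
	}
}

As in the patch, ListEndpointDiscoveryFilter is the choice when the caller wants every matching endpoint, while SpecificEndpointDiscoveryFilter pins the proxy to the single node selected by the job manager.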