Gianpaolo Coro 2015-03-24 11:11:03 +00:00
parent d71fae70fb
commit d58009544e
2 changed files with 117 additions and 6 deletions

View File

@ -16,6 +16,8 @@ import javax.jms.MessageListener;
import org.apache.activemq.ActiveMQConnection;
import org.gcube.common.clients.ProxyBuilderImpl;
import org.gcube.common.resources.gcore.ServiceEndpoint;
import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.contentmanagement.blobstorage.resource.StorageObject;
import org.gcube.contentmanagement.blobstorage.service.IClient;
@ -30,6 +32,8 @@ import org.gcube.dataanalysis.executor.messagequeue.Producer;
import org.gcube.dataanalysis.executor.messagequeue.QCONSTANTS;
import org.gcube.dataanalysis.executor.messagequeue.QueueManager;
import org.gcube.dataanalysis.executor.scripts.ScriptIOWorker;
import org.gcube.resources.discovery.client.api.DiscoveryClient;
import org.gcube.resources.discovery.client.queries.api.SimpleQuery;
import org.gcube.vremanagement.executor.api.SmartExecutor;
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin;
@ -37,6 +41,7 @@ import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPlugin
import org.gcube.vremanagement.executor.client.plugins.query.filter.ListEndpointDiscoveryFilter;
import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter;
import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy;
import static org.gcube.resources.discovery.icclient.ICFactory.*;
public class QueueJobManager {
@ -273,11 +278,10 @@ public class QueueJobManager {
AnalysisLogger.getLogger().info("Contacting " + actualNumberOfNodes + " Nodes");
// set globals
setGlobalVars(serviceClass, serviceName, owner, localDir, remoteDir, outputDir, script, arguments, configuration, deletefiles);
// if not yet uploaded , upload required files
uploadFilesOnStorage(forceUpload);
// initializing queue
setQueueVariables();
// if not yet uploaded , upload required files
uploadFilesOnStorage(forceUpload);
// broadcast a message to all executors for purging previous queues
// purgeQueues();
createClientProducer();
@ -423,7 +427,7 @@ public class QueueJobManager {
AnalysisLogger.getLogger().info("Broadcasting to node " + (order + 1) + " on " + selectedEPR);
/*
/*OLD EXECUTOR CALL
// run the executor script
ExecutorCall call = new ExecutorCall(pluginName, gscope);
call.setEndpointReference(selectedEPR);
@ -512,6 +516,26 @@ public class QueueJobManager {
return numberOfEP;
}
*/
public String getQueueURL(String scope) throws Exception{
//set the scope provider first!
//<Category>Service</Category>
//<Name>MessageBroker</Name>
ScopeProvider.instance.set(scope);
SimpleQuery query = queryFor(ServiceEndpoint.class);
query.addCondition("$resource/Profile/Category/text() eq 'Service' and $resource/Profile/Name eq 'MessageBroker' ");
DiscoveryClient<ServiceEndpoint> client = clientFor(ServiceEndpoint.class);
List<ServiceEndpoint> resources = client.submit(query);
if (resources==null || resources.size()==0){
throw new Exception("No Message-Queue available in scope "+scope);
}
else{
AccessPoint ap = resources.get(0).profile().accessPoints().iterator().next();
String queue = ap.address();
AnalysisLogger.getLogger().debug("Found AMQ Url : "+queue);
return queue;
}
}
private void setQueueVariables() throws Exception {
queueName = "D4ScienceJob"; // + session;
@ -520,8 +544,8 @@ public class QueueJobManager {
// TODO Check THIS
queueURL = gscope.getServiceMap().getEndpoints(GHNContext.MSGBROKER).iterator().next().getAddress().toString();
// queueURL = gscope.getServiceMap().getEndpoints(GHNContext.MSGBROKER).iterator().next().getAddress().toString();
queueURL = getQueueURL(scope);
//tests on ecosystem

View File

@ -0,0 +1,87 @@
package org.gcube.dataanalysis.executor.tests;
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing;
import org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer;
import org.gcube.dataanalysis.executor.nodes.transducers.bionym.utils.YasmeenGlobalParameters;
public class RegressionTestBiOnym1Matcher {
public static void executeWF(String[] args) throws Exception {
// Generate
AlgorithmConfiguration config = new AlgorithmConfiguration();
config.setConfigPath("./cfg/");
config.setParam("DatabaseUserName","utente");
config.setParam("DatabasePassword","d4science");
config.setParam("DatabaseURL","jdbc:postgresql://statistical-manager.d.d4science.research-infrastructures.eu/testdb");
config.setParam(YasmeenGlobalParameters.parserNameParam,YasmeenGlobalParameters.BuiltinParsers.SIMPLE.name());
config.setParam(YasmeenGlobalParameters.taxaAuthorityFileParam,YasmeenGlobalParameters.BuiltinDataSources.ASFIS.name());
config.setParam(YasmeenGlobalParameters.activatePreParsingProcessing,"true");
config.setParam(YasmeenGlobalParameters.useStemmedGenusAndSpecies,"false");
config.setParam(BionymFlexibleWorkflowTransducer.matcherParamPrefix+"_"+1,YasmeenGlobalParameters.BuiltinMatchers.GSAy.name());
config.setParam(BionymFlexibleWorkflowTransducer.thresholdParamPrefix+"_"+1,"0.6");
config.setParam(BionymFlexibleWorkflowTransducer.maxresultsParamPrefix+"_"+1,"10");
config.setParam(BionymFlexibleWorkflowTransducer.destinationTableParam, "taxamatchoutputlocal");
config.setParam(BionymFlexibleWorkflowTransducer.destinationTableLableParam, "taxamatchoutputlabel");
//2
config.setParam(BionymFlexibleWorkflowTransducer.originTableParam, "generic_id471e6d50_d243_4112_bc07_e22152438e5c");
config.setParam(BionymFlexibleWorkflowTransducer.rawnamesColumnParam, "field0");
config.setAgent("BIONYM");
config.setPersistencePath("./");
config.setGcubeScope( "/gcube");
// config.setGcubeScope( "/d4science.research-infrastructures.eu");
config.setParam("ServiceUserName", "gianpaolo.coro");
config.setParam("DatabaseDriver", "org.postgresql.Driver");
generate(config);
}
public static void main(String []args) throws Exception{
for (int i=0;i<1;i++){
executeWF(args);
}
}
public static void generate(AlgorithmConfiguration config) throws Exception {
D4ScienceDistributedProcessing generator = new D4ScienceDistributedProcessing(config);
generator.init();
if (generator != null) {
long t0 = System.currentTimeMillis();
TestGenerator tgs = new TestGenerator(generator);
Thread t = new Thread(tgs);
t.start();
while (generator.getStatus() < 100) {
String resLoad = generator.getResourceLoad();
String ress = generator.getResources();
String species = generator.getLoad();
System.out.println("LOAD: " + resLoad);
System.out.println("RESOURCES: " + ress);
System.out.println("SPECIES: " + species);
System.out.println("STATUS: " + generator.getStatus());
Thread.sleep(5000);
}
System.out.println("FINAL STATUS: " + generator.getStatus()+ " ELAPSED "+(System.currentTimeMillis()-t0));
}
else
System.out.println("Generator Algorithm Not Supported");
// generator.generate();
// }
}
}