diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java index a784862..7ed53e8 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java +++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java @@ -16,6 +16,8 @@ import javax.jms.MessageListener; import org.apache.activemq.ActiveMQConnection; import org.gcube.common.clients.ProxyBuilderImpl; +import org.gcube.common.resources.gcore.ServiceEndpoint; +import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint; import org.gcube.common.scope.api.ScopeProvider; import org.gcube.contentmanagement.blobstorage.resource.StorageObject; import org.gcube.contentmanagement.blobstorage.service.IClient; @@ -30,6 +32,8 @@ import org.gcube.dataanalysis.executor.messagequeue.Producer; import org.gcube.dataanalysis.executor.messagequeue.QCONSTANTS; import org.gcube.dataanalysis.executor.messagequeue.QueueManager; import org.gcube.dataanalysis.executor.scripts.ScriptIOWorker; +import org.gcube.resources.discovery.client.api.DiscoveryClient; +import org.gcube.resources.discovery.client.queries.api.SimpleQuery; import org.gcube.vremanagement.executor.api.SmartExecutor; import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin; @@ -37,6 +41,7 @@ import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPlugin import org.gcube.vremanagement.executor.client.plugins.query.filter.ListEndpointDiscoveryFilter; import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter; import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy; +import static org.gcube.resources.discovery.icclient.ICFactory.*; public class QueueJobManager { @@ -273,11 +278,10 @@ public class QueueJobManager { AnalysisLogger.getLogger().info("Contacting " + actualNumberOfNodes + " Nodes"); // set globals setGlobalVars(serviceClass, serviceName, owner, localDir, remoteDir, outputDir, script, arguments, configuration, deletefiles); - // if not yet uploaded , upload required files - uploadFilesOnStorage(forceUpload); - // initializing queue setQueueVariables(); + // if not yet uploaded , upload required files + uploadFilesOnStorage(forceUpload); // broadcast a message to all executors for purging previous queues // purgeQueues(); createClientProducer(); @@ -423,7 +427,7 @@ public class QueueJobManager { AnalysisLogger.getLogger().info("Broadcasting to node " + (order + 1) + " on " + selectedEPR); - /* + /*OLD EXECUTOR CALL // run the executor script ExecutorCall call = new ExecutorCall(pluginName, gscope); call.setEndpointReference(selectedEPR); @@ -512,6 +516,26 @@ public class QueueJobManager { return numberOfEP; } */ + public String getQueueURL(String scope) throws Exception{ + //set the scope provider first! + //Service + //MessageBroker + + ScopeProvider.instance.set(scope); + SimpleQuery query = queryFor(ServiceEndpoint.class); + query.addCondition("$resource/Profile/Category/text() eq 'Service' and $resource/Profile/Name eq 'MessageBroker' "); + DiscoveryClient client = clientFor(ServiceEndpoint.class); + List resources = client.submit(query); + if (resources==null || resources.size()==0){ + throw new Exception("No Message-Queue available in scope "+scope); + } + else{ + AccessPoint ap = resources.get(0).profile().accessPoints().iterator().next(); + String queue = ap.address(); + AnalysisLogger.getLogger().debug("Found AMQ Url : "+queue); + return queue; + } + } private void setQueueVariables() throws Exception { queueName = "D4ScienceJob"; // + session; @@ -520,8 +544,8 @@ public class QueueJobManager { // TODO Check THIS - queueURL = gscope.getServiceMap().getEndpoints(GHNContext.MSGBROKER).iterator().next().getAddress().toString(); - +// queueURL = gscope.getServiceMap().getEndpoints(GHNContext.MSGBROKER).iterator().next().getAddress().toString(); + queueURL = getQueueURL(scope); //tests on ecosystem diff --git a/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestBiOnym1Matcher.java b/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestBiOnym1Matcher.java new file mode 100644 index 0000000..8d13174 --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestBiOnym1Matcher.java @@ -0,0 +1,87 @@ +package org.gcube.dataanalysis.executor.tests; + +import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; +import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing; +import org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer; +import org.gcube.dataanalysis.executor.nodes.transducers.bionym.utils.YasmeenGlobalParameters; + +public class RegressionTestBiOnym1Matcher { + + public static void executeWF(String[] args) throws Exception { + // Generate + AlgorithmConfiguration config = new AlgorithmConfiguration(); + config.setConfigPath("./cfg/"); + + config.setParam("DatabaseUserName","utente"); + config.setParam("DatabasePassword","d4science"); + config.setParam("DatabaseURL","jdbc:postgresql://statistical-manager.d.d4science.research-infrastructures.eu/testdb"); + + config.setParam(YasmeenGlobalParameters.parserNameParam,YasmeenGlobalParameters.BuiltinParsers.SIMPLE.name()); + config.setParam(YasmeenGlobalParameters.taxaAuthorityFileParam,YasmeenGlobalParameters.BuiltinDataSources.ASFIS.name()); + config.setParam(YasmeenGlobalParameters.activatePreParsingProcessing,"true"); + config.setParam(YasmeenGlobalParameters.useStemmedGenusAndSpecies,"false"); + + + config.setParam(BionymFlexibleWorkflowTransducer.matcherParamPrefix+"_"+1,YasmeenGlobalParameters.BuiltinMatchers.GSAy.name()); + config.setParam(BionymFlexibleWorkflowTransducer.thresholdParamPrefix+"_"+1,"0.6"); + config.setParam(BionymFlexibleWorkflowTransducer.maxresultsParamPrefix+"_"+1,"10"); + + config.setParam(BionymFlexibleWorkflowTransducer.destinationTableParam, "taxamatchoutputlocal"); + config.setParam(BionymFlexibleWorkflowTransducer.destinationTableLableParam, "taxamatchoutputlabel"); + //2 + config.setParam(BionymFlexibleWorkflowTransducer.originTableParam, "generic_id471e6d50_d243_4112_bc07_e22152438e5c"); + config.setParam(BionymFlexibleWorkflowTransducer.rawnamesColumnParam, "field0"); + + + config.setAgent("BIONYM"); + + config.setPersistencePath("./"); + config.setGcubeScope( "/gcube"); +// config.setGcubeScope( "/d4science.research-infrastructures.eu"); + config.setParam("ServiceUserName", "gianpaolo.coro"); + + config.setParam("DatabaseDriver", "org.postgresql.Driver"); + + generate(config); + + } + + public static void main(String []args) throws Exception{ + + for (int i=0;i<1;i++){ + executeWF(args); + } + } + + + public static void generate(AlgorithmConfiguration config) throws Exception { + + D4ScienceDistributedProcessing generator = new D4ScienceDistributedProcessing(config); + generator.init(); + + if (generator != null) { + long t0 = System.currentTimeMillis(); + TestGenerator tgs = new TestGenerator(generator); + Thread t = new Thread(tgs); + t.start(); + while (generator.getStatus() < 100) { + + String resLoad = generator.getResourceLoad(); + String ress = generator.getResources(); + String species = generator.getLoad(); + System.out.println("LOAD: " + resLoad); + System.out.println("RESOURCES: " + ress); + System.out.println("SPECIES: " + species); + System.out.println("STATUS: " + generator.getStatus()); + Thread.sleep(5000); + } + System.out.println("FINAL STATUS: " + generator.getStatus()+ " ELAPSED "+(System.currentTimeMillis()-t0)); + + } + else + System.out.println("Generator Algorithm Not Supported"); + +// generator.generate(); +// } + } +}