Moved generic Worker to Dataminer
git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/EcologicalEngineSmartExecutor@131883 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
7a10924e63
commit
81d79089e3
|
@ -0,0 +1,26 @@
|
||||||
|
package org.gcube.dataanalysis.ecoengine.interfaces;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
||||||
|
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
|
||||||
|
import org.gcube.dataanalysis.ecoengine.configuration.INFRASTRUCTURE;
|
||||||
|
import org.gcube.dataanalysis.ecoengine.datatypes.DatabaseType;
|
||||||
|
import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType;
|
||||||
|
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
|
||||||
|
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.DatabaseParameters;
|
||||||
|
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes;
|
||||||
|
import org.gcube.dataanalysis.ecoengine.interfaces.Transducerer;
|
||||||
|
import org.gcube.dataanalysis.ecoengine.utils.ResourceFactory;
|
||||||
|
|
||||||
|
|
||||||
|
public abstract class StandardLocalInfraAlgorithm extends StandardLocalExternalAlgorithm{
|
||||||
|
|
||||||
|
public boolean sendNotificationEmail(){
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -42,7 +42,7 @@ public class D4ScienceDistributedProcessing implements Generator {
|
||||||
agent.compute();
|
agent.compute();
|
||||||
distributedModel.postProcess(agent.hasResentMessages(),false);
|
distributedModel.postProcess(agent.hasResentMessages(),false);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
distributedModel.postProcess(false,true);
|
try{distributedModel.postProcess(false,true);}catch(Exception ee){}
|
||||||
AnalysisLogger.getLogger().error("ERROR: An Error occurred ", e);
|
AnalysisLogger.getLogger().error("ERROR: An Error occurred ", e);
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -107,7 +107,7 @@ public class DistributedProcessingAgentWPS {
|
||||||
subdivisiondiv = 1;
|
subdivisiondiv = 1;
|
||||||
|
|
||||||
executeWork(leftSetNumberOfElements, rightSetNumberOfElements, 0, subdivisiondiv, deletefiles, forceUpload);
|
executeWork(leftSetNumberOfElements, rightSetNumberOfElements, 0, subdivisiondiv, deletefiles, forceUpload);
|
||||||
|
AnalysisLogger.getLogger().debug("The WPS job has been completely executed");
|
||||||
if (jobManager.wasAborted()) {
|
if (jobManager.wasAborted()) {
|
||||||
logger.debug("Warning: Job was aborted");
|
logger.debug("Warning: Job was aborted");
|
||||||
// distributionModel.postProcess(false,true);
|
// distributionModel.postProcess(false,true);
|
||||||
|
@ -120,7 +120,7 @@ public class DistributedProcessingAgentWPS {
|
||||||
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("ERROR: An Error occurred ", e);
|
AnalysisLogger.getLogger().debug("The WPS job got an error "+e.getLocalizedMessage());
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -12,15 +12,14 @@ import java.io.PrintStream;
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
||||||
import org.gcube.contentmanager.storageclient.model.protocol.smp.Handler;
|
import org.gcube.contentmanager.storageclient.model.protocol.smp.Handler;
|
||||||
import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode;
|
import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode;
|
||||||
import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalExternalAlgorithm;
|
import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalInfraAlgorithm;
|
||||||
|
|
||||||
public class GenericWorker extends StandardLocalExternalAlgorithm{
|
public class GenericWorker extends StandardLocalInfraAlgorithm{
|
||||||
|
|
||||||
private static String genericWorkerDir = "/genericworker/";
|
private static String genericWorkerDir = "/genericworker/";
|
||||||
|
|
||||||
|
@ -67,10 +66,6 @@ public class GenericWorker extends StandardLocalExternalAlgorithm{
|
||||||
File tempDir = null ;
|
File tempDir = null ;
|
||||||
try {
|
try {
|
||||||
Handler.activateProtocol();
|
Handler.activateProtocol();
|
||||||
|
|
||||||
if (session == null)
|
|
||||||
|
|
||||||
|
|
||||||
// invoke the algorithm
|
// invoke the algorithm
|
||||||
logger.debug("GenericWorker-> Creating algorithm " + algorithmClass);
|
logger.debug("GenericWorker-> Creating algorithm " + algorithmClass);
|
||||||
ActorNode node = (ActorNode) Class.forName(algorithmClass).newInstance();
|
ActorNode node = (ActorNode) Class.forName(algorithmClass).newInstance();
|
||||||
|
@ -97,6 +92,12 @@ public class GenericWorker extends StandardLocalExternalAlgorithm{
|
||||||
config.getConfigPath(), nodeConfigurationFileObject.getName(), "log.txt");
|
config.getConfigPath(), nodeConfigurationFileObject.getName(), "log.txt");
|
||||||
|
|
||||||
String log = new String(baos.toByteArray(), StandardCharsets.UTF_8);
|
String log = new String(baos.toByteArray(), StandardCharsets.UTF_8);
|
||||||
|
//manage known issues
|
||||||
|
log=log.replace(".XMLStreamException: Unbound namespace URI", "Known Except");
|
||||||
|
log=log.replace("java.io.IOException: Error copying XML", "Known Except");
|
||||||
|
log=log.replace("java.io.FileNotFoundException: /home/gcube/tomcat/tmp/ConfigurationFile", "Known Except");
|
||||||
|
log=log.replace("java.io.FileNotFoundException: payload was not made available for this dataset", "Known Except");
|
||||||
|
|
||||||
logger.debug("GenericWorker-> Execution Fulllog" );
|
logger.debug("GenericWorker-> Execution Fulllog" );
|
||||||
logger.debug("GenericWorker-> " + log);
|
logger.debug("GenericWorker-> " + log);
|
||||||
logger.debug("GenericWorker-> Script executed! " );
|
logger.debug("GenericWorker-> Script executed! " );
|
||||||
|
@ -105,19 +106,25 @@ public class GenericWorker extends StandardLocalExternalAlgorithm{
|
||||||
logger.debug("GenericWorker-> deleted sandbox file: "+del );
|
logger.debug("GenericWorker-> deleted sandbox file: "+del );
|
||||||
logger.debug("GenericWorker-> all done");
|
logger.debug("GenericWorker-> all done");
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if (log.contains("Exception:")){
|
if (log.contains("Exception:")){
|
||||||
outputParameters.put(OutputParameter, TASK_FAILURE);
|
outputParameters.put(OutputParameter, TASK_FAILURE);
|
||||||
String cutLog = URLEncoder.encode(log, "UTF-8");
|
String cutLog = URLEncoder.encode(log, "UTF-8");
|
||||||
int maxlen = 20240;
|
int maxlen = 20240;
|
||||||
|
int startidx = log.indexOf("Exception:");
|
||||||
|
log = log.substring(startidx);
|
||||||
if (log.length()>maxlen)
|
if (log.length()>maxlen)
|
||||||
cutLog = cutLog.substring(0,maxlen)+"...";
|
cutLog = cutLog.substring(0,maxlen)+"...";
|
||||||
|
|
||||||
outputParameters.put("Log", cutLog);
|
outputParameters.put("Log", cutLog);
|
||||||
logger.debug("GenericWorker-> Failure!");
|
logger.debug("GenericWorker-> Failure!");
|
||||||
}
|
}
|
||||||
else
|
else{
|
||||||
outputParameters.put(OutputParameter, TASK_SUCCESS);
|
outputParameters.put(OutputParameter, TASK_SUCCESS);
|
||||||
logger.debug("GenericWorker-> Success!");
|
logger.debug("GenericWorker-> Success!");
|
||||||
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
outputParameters.put(OutputParameter, TASK_FAILURE);
|
outputParameters.put(OutputParameter, TASK_FAILURE);
|
||||||
outputParameters.put("Log", e.getLocalizedMessage());
|
outputParameters.put("Log", e.getLocalizedMessage());
|
||||||
|
|
|
@ -1,6 +1,10 @@
|
||||||
package org.gcube.dataanalysis.executor.job.management;
|
package org.gcube.dataanalysis.executor.job.management;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
|
||||||
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
||||||
import org.gcube.contentmanagement.lexicalmatcher.utils.FileTools;
|
import org.gcube.contentmanagement.lexicalmatcher.utils.FileTools;
|
||||||
|
@ -11,12 +15,13 @@ import com.thoughtworks.xstream.XStream;
|
||||||
public class GenericWorkerCaller {
|
public class GenericWorkerCaller {
|
||||||
|
|
||||||
|
|
||||||
public static String getGenericWorkerCall(String algorithm, String session, AlgorithmConfiguration configuration,int leftSetIndex,int rightSetIndex,int leftElements,int rightElements, boolean isduplicate,boolean deleteTemporaryFiles) throws Exception{
|
public static String getGenericWorkerCall(String algorithm, String session, AlgorithmConfiguration configuration,int leftSetIndex,int rightSetIndex,int leftElements,int rightElements, boolean isduplicate,boolean deleteTemporaryFiles, String callTemplate) throws Exception{
|
||||||
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
|
|
||||||
String xmlconfig = new XStream().toXML(configuration);
|
String xmlconfig = new XStream().toXML(configuration);
|
||||||
xmlconfig = xmlconfig.replace("\n", "").replace("\t", "");
|
xmlconfig = xmlconfig.replace("\n", "").replace("\t", "");
|
||||||
xmlconfig = xmlconfig.replaceAll(">[ ]+<", "> <");
|
xmlconfig = xmlconfig.replaceAll(">[ ]+<", "> <");
|
||||||
|
|
||||||
|
/*
|
||||||
AnalysisLogger.getLogger().debug("CONFIG of Task:");
|
AnalysisLogger.getLogger().debug("CONFIG of Task:");
|
||||||
AnalysisLogger.getLogger().debug("algorithm: "+algorithm);
|
AnalysisLogger.getLogger().debug("algorithm: "+algorithm);
|
||||||
AnalysisLogger.getLogger().debug("leftSetIndex: "+leftSetIndex);
|
AnalysisLogger.getLogger().debug("leftSetIndex: "+leftSetIndex);
|
||||||
|
@ -26,10 +31,11 @@ public class GenericWorkerCaller {
|
||||||
AnalysisLogger.getLogger().debug("session: "+session);
|
AnalysisLogger.getLogger().debug("session: "+session);
|
||||||
AnalysisLogger.getLogger().debug("isduplicate: "+isduplicate);
|
AnalysisLogger.getLogger().debug("isduplicate: "+isduplicate);
|
||||||
AnalysisLogger.getLogger().debug("deleteTemporaryFiles: "+deleteTemporaryFiles);
|
AnalysisLogger.getLogger().debug("deleteTemporaryFiles: "+deleteTemporaryFiles);
|
||||||
|
*/
|
||||||
File is = new File(classLoader.getResource("WPSGWTemplate.xml").getFile());
|
//String call=FileTools.loadString(is.getAbsolutePath(), "UTF-8");
|
||||||
String call=FileTools.loadString(is.getAbsolutePath(), "UTF-8");
|
String call = new String(callTemplate.getBytes());
|
||||||
AnalysisLogger.getLogger().debug("call template : "+call);
|
// String call = callTemplate;
|
||||||
|
//AnalysisLogger.getLogger().debug("call template : "+call);
|
||||||
call = call.replace("#"+GenericWorker.AlgorithmClassParameter+"#", algorithm);
|
call = call.replace("#"+GenericWorker.AlgorithmClassParameter+"#", algorithm);
|
||||||
call = call.replace("#"+GenericWorker.LeftSetStartIndexParameter+"#", ""+leftSetIndex);
|
call = call.replace("#"+GenericWorker.LeftSetStartIndexParameter+"#", ""+leftSetIndex);
|
||||||
call = call.replace("#"+GenericWorker.NumberOfLeftElementsToProcessParameter+"#", ""+leftElements);
|
call = call.replace("#"+GenericWorker.NumberOfLeftElementsToProcessParameter+"#", ""+leftElements);
|
||||||
|
|
|
@ -1,63 +1,23 @@
|
||||||
package org.gcube.dataanalysis.executor.job.management;
|
package org.gcube.dataanalysis.executor.job.management;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.BufferedReader;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Timer;
|
|
||||||
import java.util.TimerTask;
|
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
import javax.jms.ExceptionListener;
|
|
||||||
import javax.jms.JMSException;
|
|
||||||
import javax.jms.Message;
|
|
||||||
import javax.jms.MessageListener;
|
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
|
||||||
import org.gcube.common.clients.ProxyBuilderImpl;
|
|
||||||
import org.gcube.common.resources.gcore.ServiceEndpoint;
|
|
||||||
import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint;
|
|
||||||
import org.gcube.common.scope.api.ScopeProvider;
|
|
||||||
import org.gcube.contentmanagement.blobstorage.resource.StorageObject;
|
|
||||||
import org.gcube.contentmanagement.blobstorage.service.IClient;
|
|
||||||
import org.gcube.contentmanagement.graphtools.utils.HttpRequest;
|
import org.gcube.contentmanagement.graphtools.utils.HttpRequest;
|
||||||
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
||||||
import org.gcube.contentmanager.storageclient.wrapper.AccessType;
|
|
||||||
import org.gcube.contentmanager.storageclient.wrapper.MemoryType;
|
|
||||||
import org.gcube.contentmanager.storageclient.wrapper.StorageClient;
|
|
||||||
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
|
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
|
||||||
import org.gcube.dataanalysis.ecoengine.utils.Operations;
|
|
||||||
import org.gcube.dataanalysis.executor.messagequeue.ATTRIBUTE;
|
|
||||||
import org.gcube.dataanalysis.executor.messagequeue.Consumer;
|
|
||||||
import org.gcube.dataanalysis.executor.messagequeue.Producer;
|
|
||||||
import org.gcube.dataanalysis.executor.messagequeue.QCONSTANTS;
|
|
||||||
import org.gcube.dataanalysis.executor.messagequeue.QueueManager;
|
|
||||||
import org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer;
|
|
||||||
import org.gcube.dataanalysis.executor.nodes.transducers.bionym.utils.YasmeenGlobalParameters;
|
|
||||||
import org.gcube.dataanalysis.executor.scripts.ScriptIOWorker;
|
|
||||||
import org.gcube.dataanalysis.executor.util.InfraRetrieval;
|
import org.gcube.dataanalysis.executor.util.InfraRetrieval;
|
||||||
import org.gcube.resources.discovery.client.api.DiscoveryClient;
|
|
||||||
import org.gcube.resources.discovery.client.queries.api.SimpleQuery;
|
|
||||||
import org.gcube.vremanagement.executor.api.SmartExecutor;
|
|
||||||
import org.gcube.vremanagement.executor.api.types.LaunchParameter;
|
|
||||||
import org.gcube.vremanagement.executor.client.plugins.ExecutorPlugin;
|
|
||||||
import org.gcube.vremanagement.executor.client.plugins.query.SmartExecutorPluginQuery;
|
|
||||||
import org.gcube.vremanagement.executor.client.plugins.query.filter.ListEndpointDiscoveryFilter;
|
|
||||||
import org.gcube.vremanagement.executor.client.plugins.query.filter.SpecificEndpointDiscoveryFilter;
|
|
||||||
import org.gcube.vremanagement.executor.client.proxies.SmartExecutorProxy;
|
|
||||||
|
|
||||||
import com.thoughtworks.xstream.XStream;
|
|
||||||
|
|
||||||
import static org.gcube.resources.discovery.icclient.ICFactory.*;
|
|
||||||
|
|
||||||
public class WPSJobManager {
|
public class WPSJobManager {
|
||||||
|
|
||||||
static final int pollingTime = 5000;
|
static final int pollingTime = 5000;
|
||||||
static final int maxTrialsPerThread = 3;
|
static final long maxTaskTime= 12*60000; //allowed max 12 hours per task
|
||||||
|
|
||||||
|
|
||||||
int overallFailures = 0;
|
int overallFailures = 0;
|
||||||
|
@ -68,6 +28,31 @@ public class WPSJobManager {
|
||||||
boolean stopThreads = false;
|
boolean stopThreads = false;
|
||||||
boolean hasResentMessages = false;
|
boolean hasResentMessages = false;
|
||||||
|
|
||||||
|
|
||||||
|
public static String getCallTemplate(){
|
||||||
|
String call = null;
|
||||||
|
try{
|
||||||
|
InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("templates/WPSGWTemplate2.xml");
|
||||||
|
AnalysisLogger.getLogger().debug("WPSJobManager->GW template Input stream is null "+(is==null));
|
||||||
|
BufferedReader in = new BufferedReader(new InputStreamReader(is));
|
||||||
|
String line = null;
|
||||||
|
StringBuilder vud = new StringBuilder();
|
||||||
|
|
||||||
|
while ((line = in.readLine()) != null) {
|
||||||
|
vud.append(line + "\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
in.close();
|
||||||
|
|
||||||
|
call = vud.toString();
|
||||||
|
|
||||||
|
|
||||||
|
}catch(Exception e){
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return call;
|
||||||
|
}
|
||||||
|
|
||||||
final public synchronized void incrementOverallFailures() {
|
final public synchronized void incrementOverallFailures() {
|
||||||
overallFailures++;
|
overallFailures++;
|
||||||
}
|
}
|
||||||
|
@ -103,9 +88,10 @@ public class WPSJobManager {
|
||||||
int rightSetIndex;
|
int rightSetIndex;
|
||||||
int leftElements;
|
int leftElements;
|
||||||
int rightElements;
|
int rightElements;
|
||||||
|
String callTemplate;
|
||||||
|
int maxTrialsPerThread;
|
||||||
|
|
||||||
|
public TasksWatcher(String algorithm, String username, String token, String wpsHost, int wpsPort, String session, int taskNumber, AlgorithmConfiguration configuration, int leftSetIndex, int rightSetIndex, int leftElements, int rightElements,String callTemplate, int maxTrialsPerThread) {
|
||||||
public TasksWatcher(String algorithm, String username, String token, String wpsHost, int wpsPort, String session, int taskNumber, AlgorithmConfiguration configuration, int leftSetIndex, int rightSetIndex, int leftElements, int rightElements) {
|
|
||||||
this.algorithm = algorithm;
|
this.algorithm = algorithm;
|
||||||
this.token = token;
|
this.token = token;
|
||||||
this.wpsHost = wpsHost;
|
this.wpsHost = wpsHost;
|
||||||
|
@ -118,6 +104,8 @@ public class WPSJobManager {
|
||||||
this.leftElements = leftElements;
|
this.leftElements = leftElements;
|
||||||
this.rightSetIndex = rightSetIndex;
|
this.rightSetIndex = rightSetIndex;
|
||||||
this.rightElements = rightElements;
|
this.rightElements = rightElements;
|
||||||
|
this.callTemplate=callTemplate;
|
||||||
|
this.maxTrialsPerThread=maxTrialsPerThread;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -125,11 +113,12 @@ public class WPSJobManager {
|
||||||
String url = "http://" + wpsHost + ":" + wpsPort + "/wps/WebProcessingService";
|
String url = "http://" + wpsHost + ":" + wpsPort + "/wps/WebProcessingService";
|
||||||
|
|
||||||
boolean deleteTemporaryFiles = true;
|
boolean deleteTemporaryFiles = true;
|
||||||
AnalysisLogger.getLogger().debug("Task Number : " + taskNumber+" GO!");
|
AnalysisLogger.getLogger().debug("WPSJobManager->Task Number : " + taskNumber+" GO!");
|
||||||
try {
|
try {
|
||||||
String algorithmCall = GenericWorkerCaller.getGenericWorkerCall(algorithm, session, configuration, leftSetIndex, rightSetIndex, leftElements, rightElements, isduplicate, deleteTemporaryFiles);
|
AnalysisLogger.getLogger().debug("WPSJobManager->Invoking the GW to start");
|
||||||
|
String algorithmCall = GenericWorkerCaller.getGenericWorkerCall(algorithm, session, configuration, leftSetIndex, rightSetIndex, leftElements, rightElements, isduplicate, deleteTemporaryFiles,callTemplate);
|
||||||
String result = HttpRequest.PostXmlString(url, wpsHost, wpsPort, new LinkedHashMap<String, String>(), username, token, algorithmCall);
|
String result = HttpRequest.PostXmlString(url, wpsHost, wpsPort, new LinkedHashMap<String, String>(), username, token, algorithmCall);
|
||||||
// AnalysisLogger.getLogger().debug("Result: " + result);
|
AnalysisLogger.getLogger().debug("WPSJobManager->GW starting Output " + result.replace("\n", ""));
|
||||||
|
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
boolean failure = false;
|
boolean failure = false;
|
||||||
|
@ -140,11 +129,13 @@ public class WPSJobManager {
|
||||||
failure = true;
|
failure = true;
|
||||||
|
|
||||||
String statusLocation = "";
|
String statusLocation = "";
|
||||||
|
long taskTimeCounter = 0;
|
||||||
while (!success && !isStopped() && (!failure) ) { //while !success and failure
|
while (!success && !isStopped() && (!failure) ) { //while !success and failure
|
||||||
if (result == null || result.contains(GenericWorker.TASK_FAILURE) || result.contains("Exception"))
|
if (result == null || result.contains(GenericWorker.TASK_FAILURE))
|
||||||
failure = true;
|
failure = true;
|
||||||
|
else if (taskTimeCounter>maxTaskTime){
|
||||||
|
failure = true;
|
||||||
|
}
|
||||||
else if (result.contains(GenericWorker.TASK_SUCCESS))
|
else if (result.contains(GenericWorker.TASK_SUCCESS))
|
||||||
success = true;
|
success = true;
|
||||||
else if (result.contains("<wps:ProcessAccepted>Process Accepted</wps:ProcessAccepted>")) {
|
else if (result.contains("<wps:ProcessAccepted>Process Accepted</wps:ProcessAccepted>")) {
|
||||||
|
@ -156,6 +147,7 @@ public class WPSJobManager {
|
||||||
result= "";
|
result= "";
|
||||||
} else {
|
} else {
|
||||||
Thread.sleep(pollingTime);
|
Thread.sleep(pollingTime);
|
||||||
|
taskTimeCounter+=pollingTime;
|
||||||
result = HttpRequest.sendGetRequest(statusLocation, "");
|
result = HttpRequest.sendGetRequest(statusLocation, "");
|
||||||
// AnalysisLogger.getLogger().debug("Result in location: " + result);
|
// AnalysisLogger.getLogger().debug("Result in location: " + result);
|
||||||
}
|
}
|
||||||
|
@ -172,14 +164,14 @@ public class WPSJobManager {
|
||||||
if (failure)
|
if (failure)
|
||||||
{
|
{
|
||||||
exitstatus = GenericWorker.TASK_FAILURE;
|
exitstatus = GenericWorker.TASK_FAILURE;
|
||||||
AnalysisLogger.getLogger().debug("Task Number "+taskNumber+" - Failure cause: " + result);
|
AnalysisLogger.getLogger().debug("WPSJobManager->Task Number "+taskNumber+" - Failure cause: " + result);
|
||||||
}
|
}
|
||||||
// AnalysisLogger.getLogger().debug("Process execution finished: " + exitstatus);
|
// AnalysisLogger.getLogger().debug("Process execution finished: " + exitstatus);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
AnalysisLogger.getLogger().debug(e);
|
AnalysisLogger.getLogger().debug(e);
|
||||||
AnalysisLogger.getLogger().debug("Task Number "+taskNumber+" - Process exception: " + e.getLocalizedMessage());
|
AnalysisLogger.getLogger().debug("WPSJobManager->Task Number "+taskNumber+" - Process exception: " + e.getLocalizedMessage());
|
||||||
exitstatus = GenericWorker.TASK_FAILURE;
|
exitstatus = GenericWorker.TASK_FAILURE;
|
||||||
|
|
||||||
}finally{
|
}finally{
|
||||||
|
@ -196,7 +188,7 @@ public class WPSJobManager {
|
||||||
trials++;
|
trials++;
|
||||||
hasResentTrue();
|
hasResentTrue();
|
||||||
duplicate = true;
|
duplicate = true;
|
||||||
AnalysisLogger.getLogger().debug("Task Number "+taskNumber+" - Retrying n."+trials);
|
AnalysisLogger.getLogger().debug("WPSJobManager->Task Number "+taskNumber+" - Retrying n."+trials);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -205,7 +197,7 @@ public class WPSJobManager {
|
||||||
else
|
else
|
||||||
incrementOverallFailures();
|
incrementOverallFailures();
|
||||||
|
|
||||||
AnalysisLogger.getLogger().debug("Task Number "+taskNumber+" - Finished: " + exitstatus);
|
AnalysisLogger.getLogger().debug("WPSJobManager->Task Number "+taskNumber+" - Finished: " + exitstatus);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -230,18 +222,20 @@ public class WPSJobManager {
|
||||||
return hasResentMessages;
|
return hasResentMessages;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void uploadAndExecuteChunkized(AlgorithmConfiguration configuration, String algorithmClass, List<String> arguments, String session) {
|
public void uploadAndExecuteChunkized(AlgorithmConfiguration configuration, String algorithmClass, List<String> arguments, String session) throws Exception{
|
||||||
ExecutorService executor = null;
|
ExecutorService executor = null;
|
||||||
try{
|
try{
|
||||||
int numberofservices = 1;
|
int numberofservices = 1;
|
||||||
|
String callTemplate = getCallTemplate();
|
||||||
AnalysisLogger.getLogger().debug("Estimating the number of services");
|
|
||||||
|
AnalysisLogger.getLogger().debug("WPSJobManager->Estimating the number of services");
|
||||||
|
|
||||||
List<String> wpsservices = InfraRetrieval.retrieveService("DataMiner", configuration.getGcubeScope());
|
List<String> wpsservices = InfraRetrieval.retrieveService("DataMiner", configuration.getGcubeScope());
|
||||||
|
|
||||||
if (wpsservices==null || wpsservices.size()==0)
|
if (wpsservices==null || wpsservices.size()==0){
|
||||||
throw new Exception ("No Dataminer GCore Endpoint found in the VRE "+configuration.getGcubeScope());
|
AnalysisLogger.getLogger().debug("WPSJobManager->Error: No DataMiner GCore Endpoints found!");
|
||||||
|
throw new Exception ("No DataMinerWorkers GCore Endpoint found in the VRE "+configuration.getGcubeScope());
|
||||||
|
}
|
||||||
List<String> differentServices = new ArrayList<String>();
|
List<String> differentServices = new ArrayList<String>();
|
||||||
for (String service:wpsservices){
|
for (String service:wpsservices){
|
||||||
|
|
||||||
|
@ -251,19 +245,23 @@ public class WPSJobManager {
|
||||||
differentServices.add(service);
|
differentServices.add(service);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
numberofservices = differentServices.size();
|
numberofservices = differentServices.size();
|
||||||
|
|
||||||
AnalysisLogger.getLogger().debug("WPSJobManager->Number of dataminer services "+numberofservices);
|
AnalysisLogger.getLogger().debug("WPSJobManager->Number of dataminer services "+numberofservices);
|
||||||
int parallelisation = numberofservices*2;
|
int parallelisation = numberofservices*2;
|
||||||
AnalysisLogger.getLogger().debug("WPSJobManager->Number of parallel processes (parallelisation) : "+parallelisation);
|
AnalysisLogger.getLogger().debug("WPSJobManager->Number of parallel processes (parallelisation) : "+parallelisation);
|
||||||
|
|
||||||
List<String> wpshosts = InfraRetrieval.retrieveAddresses("DataAnalysis",configuration.getGcubeScope(),"-----");
|
//List<String> wpshosts = InfraRetrieval.retrieveAddresses("DataAnalysis",configuration.getGcubeScope(),"-----");
|
||||||
|
List<String> wpshosts = InfraRetrieval.retrieveServiceAddress("DataAnalysis","DataMinerWorkers",configuration.getGcubeScope(),"noexclusion");
|
||||||
if (wpshosts==null || wpshosts.size()==0)
|
|
||||||
throw new Exception ("WPSJobManager->No Dataminer Service Endpoint found in the VRE "+configuration.getGcubeScope());
|
|
||||||
|
|
||||||
|
if (wpshosts==null || wpshosts.size()==0){
|
||||||
|
AnalysisLogger.getLogger().debug("WPSJobManager->Error: No DataMinerWorkers Service Endpoints found at all!");
|
||||||
|
throw new Exception ("WPSJobManager->No Dataminer Workers Service Endpoint found in the VRE - DataMinerWorkers Resource is required in the VRE"+configuration.getGcubeScope());
|
||||||
|
}
|
||||||
|
|
||||||
String wpshost = wpshosts.get(0);
|
String wpshost = wpshosts.get(0);
|
||||||
|
|
||||||
wpshost = wpshost.substring(wpshost.indexOf("/")+2);
|
wpshost = wpshost.substring(wpshost.indexOf("/")+2);
|
||||||
//String wpshostAddress = wpshost.substring(0,wpshost.indexOf(":"));
|
//String wpshostAddress = wpshost.substring(0,wpshost.indexOf(":"));
|
||||||
String wpshostAddress = wpshost.substring(0,wpshost.indexOf("/"));
|
String wpshostAddress = wpshost.substring(0,wpshost.indexOf("/"));
|
||||||
|
@ -285,18 +283,21 @@ public class WPSJobManager {
|
||||||
int leftNum = Integer.parseInt(lfnlnr[1]);
|
int leftNum = Integer.parseInt(lfnlnr[1]);
|
||||||
int rightOff = Integer.parseInt(lfnlnr[2]);
|
int rightOff = Integer.parseInt(lfnlnr[2]);
|
||||||
int rightNum = Integer.parseInt(lfnlnr[3]);
|
int rightNum = Integer.parseInt(lfnlnr[3]);
|
||||||
|
int maxTrials = parallelisation;
|
||||||
TasksWatcher watcher = new TasksWatcher(algorithmClass,
|
TasksWatcher watcher = new TasksWatcher(algorithmClass,
|
||||||
configuration.getGcubeUserName(),
|
configuration.getGcubeUserName(),
|
||||||
configuration.getGcubeToken(),wpshost,wpsport,session,taskNumber,configuration, leftOff, rightOff,leftNum,rightNum);
|
configuration.getGcubeToken(),
|
||||||
|
wpshost,wpsport,session,taskNumber,configuration, leftOff, rightOff,leftNum,rightNum,callTemplate, maxTrials);
|
||||||
|
|
||||||
executor.execute(watcher);
|
executor.execute(watcher);
|
||||||
AnalysisLogger.getLogger().debug("WPSJobManager->Task number "+taskNumber+" launched!");
|
AnalysisLogger.getLogger().debug("WPSJobManager->Task number "+taskNumber+" launched with arguments: "+argument);
|
||||||
taskNumber++;
|
taskNumber++;
|
||||||
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
int njobs = overallFailures+overallSuccess;
|
int njobs = overallFailures+overallSuccess;
|
||||||
int pnjobs =njobs;
|
int pnjobs =njobs;
|
||||||
|
|
||||||
while (njobs<overallTasks){
|
while (njobs<overallTasks){
|
||||||
Thread.sleep(pollingTime);
|
Thread.sleep(pollingTime);
|
||||||
float percFailure = (float)(overallFailures)/(float)overallTasks;
|
float percFailure = (float)(overallFailures)/(float)overallTasks;
|
||||||
|
@ -307,17 +308,20 @@ public class WPSJobManager {
|
||||||
if (pnjobs<njobs){
|
if (pnjobs<njobs){
|
||||||
AnalysisLogger.getLogger().debug("WPSJobManager->Number of finished jobs "+njobs+" of "+overallTasks);
|
AnalysisLogger.getLogger().debug("WPSJobManager->Number of finished jobs "+njobs+" of "+overallTasks);
|
||||||
AnalysisLogger.getLogger().debug("WPSJobManager->Number of errors "+overallFailures+" - perc failure "+percFailure);
|
AnalysisLogger.getLogger().debug("WPSJobManager->Number of errors "+overallFailures+" - perc failure "+percFailure);
|
||||||
|
pnjobs=njobs;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
AnalysisLogger.getLogger().debug("WPSJobManager->Overall computation finished");
|
AnalysisLogger.getLogger().debug("WPSJobManager->Overall computation finished");
|
||||||
}catch(Exception e){
|
}catch(Exception e){
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
|
throw e;
|
||||||
}
|
}
|
||||||
finally{
|
finally{
|
||||||
if (executor!=null){
|
if (executor!=null){
|
||||||
AnalysisLogger.getLogger().debug("WPSJobManager->Shutting down the executions");
|
AnalysisLogger.getLogger().debug("WPSJobManager->Shutting down the executions");
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
|
AnalysisLogger.getLogger().debug("WPSJobManager->Shut down completed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,13 +13,13 @@ import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType;
|
||||||
import org.gcube.dataanalysis.ecoengine.datatypes.ServiceType;
|
import org.gcube.dataanalysis.ecoengine.datatypes.ServiceType;
|
||||||
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes;
|
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes;
|
||||||
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.ServiceParameters;
|
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.ServiceParameters;
|
||||||
import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalExternalAlgorithm;
|
import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalInfraAlgorithm;
|
||||||
import org.gcube.dataanalysis.ecoengine.utils.ZipTools;
|
import org.gcube.dataanalysis.ecoengine.utils.ZipTools;
|
||||||
import org.gcube.dataanalysis.executor.util.DataTransferer;
|
import org.gcube.dataanalysis.executor.util.DataTransferer;
|
||||||
import org.gcube.dataanalysis.executor.util.InfraRetrieval;
|
import org.gcube.dataanalysis.executor.util.InfraRetrieval;
|
||||||
|
|
||||||
|
|
||||||
public class WebApplicationPublisher extends StandardLocalExternalAlgorithm{
|
public class WebApplicationPublisher extends StandardLocalInfraAlgorithm{
|
||||||
// private static String MainPageParam = "MainPage";
|
// private static String MainPageParam = "MainPage";
|
||||||
private static String FileParam = "ZipFile";
|
private static String FileParam = "ZipFile";
|
||||||
private String transferServiceAddress = "";
|
private String transferServiceAddress = "";
|
||||||
|
|
|
@ -8,10 +8,10 @@ import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
||||||
import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType;
|
import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType;
|
||||||
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
|
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
|
||||||
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes;
|
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes;
|
||||||
import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalExternalAlgorithm;
|
import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalInfraAlgorithm;
|
||||||
import org.gcube.dataanalysis.executor.util.RScriptsManager;
|
import org.gcube.dataanalysis.executor.util.RScriptsManager;
|
||||||
|
|
||||||
public class SGVMS_Interpolation extends StandardLocalExternalAlgorithm {
|
public class SGVMS_Interpolation extends StandardLocalInfraAlgorithm {
|
||||||
|
|
||||||
private static int maxPoints = 10000;
|
private static int maxPoints = 10000;
|
||||||
public enum methodEnum { cHs, SL};
|
public enum methodEnum { cHs, SL};
|
||||||
|
|
|
@ -11,13 +11,15 @@ import java.util.UUID;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
||||||
import org.gcube.contentmanagement.lexicalmatcher.utils.FileTools;
|
import org.gcube.contentmanagement.lexicalmatcher.utils.FileTools;
|
||||||
|
import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType;
|
||||||
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
|
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
|
||||||
import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalExternalAlgorithm;
|
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes;
|
||||||
|
import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalInfraAlgorithm;
|
||||||
import org.gcube.dataanalysis.ecoengine.utils.ZipTools;
|
import org.gcube.dataanalysis.ecoengine.utils.ZipTools;
|
||||||
import org.gcube.dataanalysis.executor.util.LocalRScriptsManager;
|
import org.gcube.dataanalysis.executor.util.LocalRScriptsManager;
|
||||||
import org.gcube.dataanalysis.executor.util.StorageUtils;
|
import org.gcube.dataanalysis.executor.util.StorageUtils;
|
||||||
|
|
||||||
public abstract class GenericRScript extends StandardLocalExternalAlgorithm {
|
public abstract class GenericRScript extends StandardLocalInfraAlgorithm {
|
||||||
|
|
||||||
// FIXED part
|
// FIXED part
|
||||||
protected HashMap<String, String> outputValues = new HashMap<String, String>();
|
protected HashMap<String, String> outputValues = new HashMap<String, String>();
|
||||||
|
@ -154,6 +156,12 @@ public abstract class GenericRScript extends StandardLocalExternalAlgorithm {
|
||||||
AnalysisLogger.getLogger().debug(e);
|
AnalysisLogger.getLogger().debug(e);
|
||||||
AnalysisLogger.getLogger().debug("Could not delete sandbox folder " + folder.getAbsolutePath());
|
AnalysisLogger.getLogger().debug("Could not delete sandbox folder " + folder.getAbsolutePath());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (Rlog != null) {
|
||||||
|
File logFile = saveLogFile(Rlog);
|
||||||
|
output.put("Log", new PrimitiveType(File.class.getName(), logFile, PrimitiveTypes.FILE, "LogFile", "Log of the computation"));
|
||||||
|
}
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
||||||
if (Rlog != null) {
|
if (Rlog != null) {
|
||||||
|
@ -168,6 +176,18 @@ public abstract class GenericRScript extends StandardLocalExternalAlgorithm {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected File saveLogFile(String Rlog) throws Exception {
|
||||||
|
String uuid = "" + UUID.randomUUID();
|
||||||
|
AnalysisLogger.getLogger().debug("Writing the logs of the execution");
|
||||||
|
File logfile = new File(config.getPersistencePath(), "RLOG" + uuid + ".txt");
|
||||||
|
|
||||||
|
FileWriter fw = new FileWriter(logfile);
|
||||||
|
fw.write(Rlog);
|
||||||
|
fw.close();
|
||||||
|
AnalysisLogger.getLogger().debug("Written in " + logfile);
|
||||||
|
return logfile;
|
||||||
|
}
|
||||||
|
|
||||||
protected String generateRemoteLogFile(String Rlog) throws Exception {
|
protected String generateRemoteLogFile(String Rlog) throws Exception {
|
||||||
String uuid = "" + UUID.randomUUID();
|
String uuid = "" + UUID.randomUUID();
|
||||||
AnalysisLogger.getLogger().debug("Writing the logs of the execution");
|
AnalysisLogger.getLogger().debug("Writing the logs of the execution");
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
package org.gcube.dataanalysis.executor.tests;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.gcube.common.scope.api.ScopeProvider;
|
||||||
|
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
||||||
|
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
|
||||||
|
import org.gcube.dataanalysis.executor.job.management.GenericWorker;
|
||||||
|
import org.gcube.dataanalysis.executor.job.management.WPSJobManager;
|
||||||
|
import org.gcube.dataanalysis.executor.job.management.WPSJobManager.TasksWatcher;
|
||||||
|
import org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer;
|
||||||
|
import org.gcube.dataanalysis.executor.nodes.transducers.bionym.utils.YasmeenGlobalParameters;
|
||||||
|
|
||||||
|
public class TestAquaMapsJobs {
|
||||||
|
|
||||||
|
public static AlgorithmConfiguration buildTestConfiguration(){
|
||||||
|
AlgorithmConfiguration config = new AlgorithmConfiguration();
|
||||||
|
config.setConfigPath("./cfg/");
|
||||||
|
AnalysisLogger.setLogger(config.getConfigPath() + AlgorithmConfiguration.defaultLoggerFile);
|
||||||
|
config.setParam("DatabaseUserName", "utente");
|
||||||
|
config.setParam("DatabasePassword", "d4science");
|
||||||
|
config.setParam("DatabaseURL", "jdbc:postgresql://statistical-manager.d.d4science.research-infrastructures.eu/testdb");
|
||||||
|
|
||||||
|
config.setParam("EnvelopeTable","http://data.d4science.org/b2hOQ1phWEVGcUxDZWZucS9UQkJmWG9JT2JDNUlTbjhHbWJQNStIS0N6Yz0");
|
||||||
|
config.setParam("CsquarecodesTable","http://data.d4science.org/d2JpZUZ4VkRvVTlmcElhcUlmQUpWdE1mOGZTZ0xhNHlHbWJQNStIS0N6Yz0");
|
||||||
|
config.setParam("DistributionTableLabel","hspec");
|
||||||
|
config.setParam("OccurrencePointsTable","http://data.d4science.org/ZGVCYjJaWTFmaGhmcElhcUlmQUpWb2NoYVFvclBZaG5HbWJQNStIS0N6Yz0");
|
||||||
|
|
||||||
|
config.setAgent("AQUAMAPS_SUITABLE");
|
||||||
|
config.setPersistencePath("./");
|
||||||
|
config.setGcubeScope("/gcube/devNext/NextNext");
|
||||||
|
// config.setGcubeScope("/gcube/devsec/devVRE");
|
||||||
|
config.setParam("ServiceUserName", "gianpaolo.coro");
|
||||||
|
config.setParam("DatabaseDriver", "org.postgresql.Driver");
|
||||||
|
config.setGcubeUserName("gianpaolo.coro");
|
||||||
|
config.setGcubeToken("f9d49d76-cd60-48ed-9f8e-036bcc1fc045-98187548");
|
||||||
|
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main1(String[] args) throws Exception {
|
||||||
|
|
||||||
|
String host = "dataminer1-devnext.d4science.org";
|
||||||
|
String session = "12345";
|
||||||
|
int port = 80;
|
||||||
|
String algorithm = "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer";
|
||||||
|
AlgorithmConfiguration config = buildTestConfiguration();
|
||||||
|
|
||||||
|
WPSJobManager manager = new WPSJobManager();
|
||||||
|
TasksWatcher taskWatcher = manager.new TasksWatcher(algorithm, config.getGcubeUserName(), config.getGcubeToken(), host, port, session, 1, config, 1, 1, 1, 1,"",1);
|
||||||
|
Thread t = new Thread(taskWatcher);
|
||||||
|
t.start();
|
||||||
|
|
||||||
|
while (taskWatcher.exitstatus.equals(GenericWorker.TASK_UNDEFINED)){
|
||||||
|
Thread.sleep(1000);
|
||||||
|
System.out.print(".");
|
||||||
|
}
|
||||||
|
|
||||||
|
AnalysisLogger.getLogger().debug("Task 1 terminated with output "+taskWatcher.exitstatus );
|
||||||
|
//taskWatcher.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
AlgorithmConfiguration config = buildTestConfiguration();
|
||||||
|
String algorithm = "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer";
|
||||||
|
ScopeProvider.instance.set(config.getGcubeScope());
|
||||||
|
WPSJobManager jobmanager = new WPSJobManager();
|
||||||
|
//int nArguments = 100;
|
||||||
|
int nArguments = 1;
|
||||||
|
List<String> arguments = new ArrayList<String>();
|
||||||
|
for (int i=1;i<=nArguments;i++){
|
||||||
|
String argument = "0 178204 0 11";
|
||||||
|
arguments.add(argument);
|
||||||
|
}
|
||||||
|
String sessionID ="1234";
|
||||||
|
jobmanager.uploadAndExecuteChunkized(config, algorithm, arguments,sessionID);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,7 +23,9 @@ public class TestWPSJobs {
|
||||||
config.setParam("DatabaseURL", "jdbc:postgresql://statistical-manager.d.d4science.research-infrastructures.eu/testdb");
|
config.setParam("DatabaseURL", "jdbc:postgresql://statistical-manager.d.d4science.research-infrastructures.eu/testdb");
|
||||||
|
|
||||||
config.setParam(YasmeenGlobalParameters.parserNameParam, YasmeenGlobalParameters.BuiltinParsers.SIMPLE.name());
|
config.setParam(YasmeenGlobalParameters.parserNameParam, YasmeenGlobalParameters.BuiltinParsers.SIMPLE.name());
|
||||||
config.setParam(YasmeenGlobalParameters.taxaAuthorityFileParam, YasmeenGlobalParameters.BuiltinDataSources.WORMS_PISCES.name());
|
config.setParam(YasmeenGlobalParameters.taxaAuthorityFileParam, YasmeenGlobalParameters.BuiltinDataSources.FISHBASE.name());
|
||||||
|
config.setParam(YasmeenGlobalParameters.performanceParam, YasmeenGlobalParameters.Performance.MAX_SPEED.name());
|
||||||
|
|
||||||
config.setParam(YasmeenGlobalParameters.activatePreParsingProcessing, "true");
|
config.setParam(YasmeenGlobalParameters.activatePreParsingProcessing, "true");
|
||||||
config.setParam(YasmeenGlobalParameters.useStemmedGenusAndSpecies, "false");
|
config.setParam(YasmeenGlobalParameters.useStemmedGenusAndSpecies, "false");
|
||||||
|
|
||||||
|
@ -60,7 +62,7 @@ public class TestWPSJobs {
|
||||||
config.setParam("ServiceUserName", "gianpaolo.coro");
|
config.setParam("ServiceUserName", "gianpaolo.coro");
|
||||||
config.setParam("DatabaseDriver", "org.postgresql.Driver");
|
config.setParam("DatabaseDriver", "org.postgresql.Driver");
|
||||||
config.setGcubeUserName("gianpaolo.coro");
|
config.setGcubeUserName("gianpaolo.coro");
|
||||||
config.setGcubeToken("cb289202-e7d6-45ee-8076-a80bc4d4be51-98187548");
|
config.setGcubeToken("f9d49d76-cd60-48ed-9f8e-036bcc1fc045-98187548");
|
||||||
|
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
|
@ -74,7 +76,7 @@ public class TestWPSJobs {
|
||||||
AlgorithmConfiguration config = buildTestConfiguration();
|
AlgorithmConfiguration config = buildTestConfiguration();
|
||||||
|
|
||||||
WPSJobManager manager = new WPSJobManager();
|
WPSJobManager manager = new WPSJobManager();
|
||||||
TasksWatcher taskWatcher = manager.new TasksWatcher(algorithm, config.getGcubeUserName(), config.getGcubeToken(), host, port, session, 1, config, 1, 1, 1, 1);
|
TasksWatcher taskWatcher = manager.new TasksWatcher(algorithm, config.getGcubeUserName(), config.getGcubeToken(), host, port, session, 1, config, 1, 1, 1, 1,"",1);
|
||||||
Thread t = new Thread(taskWatcher);
|
Thread t = new Thread(taskWatcher);
|
||||||
t.start();
|
t.start();
|
||||||
|
|
||||||
|
@ -92,7 +94,8 @@ public class TestWPSJobs {
|
||||||
String algorithm = "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer";
|
String algorithm = "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer";
|
||||||
ScopeProvider.instance.set(config.getGcubeScope());
|
ScopeProvider.instance.set(config.getGcubeScope());
|
||||||
WPSJobManager jobmanager = new WPSJobManager();
|
WPSJobManager jobmanager = new WPSJobManager();
|
||||||
int nArguments = 100;
|
//int nArguments = 100;
|
||||||
|
int nArguments = 20;
|
||||||
List<String> arguments = new ArrayList<String>();
|
List<String> arguments = new ArrayList<String>();
|
||||||
for (int i=1;i<=nArguments;i++){
|
for (int i=1;i<=nArguments;i++){
|
||||||
String argument = "1 1 "+i+" 1";
|
String argument = "1 1 "+i+" 1";
|
||||||
|
|
Loading…
Reference in New Issue