git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/EcologicalEngineSmartExecutor@132046 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
5e0aabec9f
commit
b67804bf15
|
@ -1,26 +1,40 @@
|
|||
package org.gcube.dataanalysis.ecoengine.interfaces;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.net.URLEncoder;
|
||||
|
||||
import org.gcube.contentmanagement.graphtools.utils.HttpRequest;
|
||||
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
||||
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
|
||||
import org.gcube.dataanalysis.ecoengine.configuration.INFRASTRUCTURE;
|
||||
import org.gcube.dataanalysis.ecoengine.datatypes.DatabaseType;
|
||||
import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType;
|
||||
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
|
||||
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.DatabaseParameters;
|
||||
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes;
|
||||
import org.gcube.dataanalysis.ecoengine.interfaces.Transducerer;
|
||||
import org.gcube.dataanalysis.ecoengine.utils.ResourceFactory;
|
||||
import org.gcube.dataanalysis.executor.util.InfraRetrieval;
|
||||
|
||||
public abstract class StandardLocalInfraAlgorithm extends StandardLocalExternalAlgorithm {
|
||||
|
||||
public abstract class StandardLocalInfraAlgorithm extends StandardLocalExternalAlgorithm{
|
||||
|
||||
public boolean sendNotificationEmail(){
|
||||
return true;
|
||||
|
||||
public void sendNotificationEmail(String subject, String body) throws Exception {
|
||||
|
||||
AnalysisLogger.getLogger().debug("Emailing System->Starting request of email in scope "+config.getGcubeScope());
|
||||
|
||||
String serviceAddress = InfraRetrieval.findEmailingSystemAddress(config.getGcubeScope());
|
||||
|
||||
String requestForMessage = serviceAddress + "/messages/writeMessageToUsers" + "?gcube-token=" + config.getGcubeToken();
|
||||
requestForMessage = requestForMessage.replace("http", "https").replace("80", ""); // remove the port (or set it to 443) otherwise you get an SSL error
|
||||
|
||||
AnalysisLogger.getLogger().debug("Emailing System->Request url is going to be " + requestForMessage);
|
||||
|
||||
// put the sender, the recipients, subject and body of the mail here
|
||||
subject=URLEncoder.encode(subject,"UTF-8");
|
||||
body=URLEncoder.encode(body,"UTF-8");
|
||||
String requestParameters = "sender=dataminer&recipients="+config.getGcubeUserName()+"&subject="+subject+"&body="+body;
|
||||
|
||||
String response = HttpRequest.sendPostRequest(requestForMessage, requestParameters);
|
||||
AnalysisLogger.getLogger().debug("Emailing System->Emailing response OK ");
|
||||
|
||||
if (response==null){
|
||||
Exception e = new Exception("Error in email sending response");
|
||||
throw e;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -77,10 +77,15 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{
|
|||
logger.debug("GenericWorker-> isduplicate:" +isduplicate);
|
||||
logger.debug("GenericWorker-> execution directory:" +config.getConfigPath());
|
||||
logger.debug("GenericWorker-> nodeConfigurationFileObject.getName():" +nodeConfigurationFileObject.getName());
|
||||
logger.debug("GenericWorker-> nodeConfigurationFileObject.getPath():" +nodeConfigurationFileObject.getAbsolutePath());
|
||||
|
||||
logger.debug("GenericWorker-> session :" +session);
|
||||
logger.debug("GenericWorker-> delete files :" +deleteFiles);
|
||||
|
||||
File sandboxfile = new File(config.getConfigPath(),nodeConfigurationFileObject.getName());
|
||||
|
||||
Files.copy(nodeConfigurationFileObject.toPath(), sandboxfile.toPath(), REPLACE_EXISTING);
|
||||
|
||||
logger.debug("GenericWorker-> copied configuration file as " +sandboxfile.getAbsolutePath());
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
|
@ -88,11 +93,14 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{
|
|||
|
||||
System.setOut(ps);
|
||||
System.setErr(ps);
|
||||
node.executeNode(leftStartIndex, numberOfLeftElementsToProcess, rightStartIndex, numberOfRightElementsToProcess, isduplicate,
|
||||
int result = node.executeNode(leftStartIndex, numberOfLeftElementsToProcess, rightStartIndex, numberOfRightElementsToProcess, isduplicate,
|
||||
config.getConfigPath(), nodeConfigurationFileObject.getName(), "log.txt");
|
||||
System.setOut(origOut);
|
||||
System.setErr(origErr);
|
||||
|
||||
String log = new String(baos.toByteArray(), StandardCharsets.UTF_8);
|
||||
//manage known issues
|
||||
/*
|
||||
log=log.replace(".XMLStreamException: Unbound namespace URI", "Known Except");
|
||||
log=log.replace("java.io.IOException: Error copying XML", "Known Except");
|
||||
log=log.replace("java.io.FileNotFoundException: /home/gcube/tomcat/tmp/ConfigurationFile", "Known Except");
|
||||
|
@ -101,23 +109,24 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{
|
|||
logger.debug("GenericWorker-> Execution Fulllog" );
|
||||
logger.debug("GenericWorker-> " + log);
|
||||
logger.debug("GenericWorker-> Script executed! " );
|
||||
*/
|
||||
|
||||
boolean del = sandboxfile.delete();
|
||||
logger.debug("GenericWorker-> deleted sandbox file: "+del );
|
||||
logger.debug("GenericWorker-> all done");
|
||||
|
||||
|
||||
|
||||
|
||||
if (log.contains("Exception:")){
|
||||
//if (log.contains("Exception:")){
|
||||
if (result!= 0){
|
||||
outputParameters.put(OutputParameter, TASK_FAILURE);
|
||||
String cutLog = URLEncoder.encode(log, "UTF-8");
|
||||
/*
|
||||
int maxlen = 20240;
|
||||
int startidx = log.indexOf("Exception:");
|
||||
log = log.substring(startidx);
|
||||
|
||||
if (log.length()>maxlen)
|
||||
cutLog = cutLog.substring(0,maxlen)+"...";
|
||||
|
||||
*/
|
||||
cutLog = log;
|
||||
outputParameters.put("Log", cutLog);
|
||||
logger.debug("GenericWorker-> Failure!");
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package org.gcube.dataanalysis.executor.job.management;
|
|||
import java.io.BufferedReader;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.URLDecoder;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
|
@ -164,7 +165,7 @@ public class WPSJobManager {
|
|||
if (failure)
|
||||
{
|
||||
exitstatus = GenericWorker.TASK_FAILURE;
|
||||
AnalysisLogger.getLogger().debug("WPSJobManager->Task Number "+taskNumber+" - Failure cause: " + result);
|
||||
AnalysisLogger.getLogger().debug("WPSJobManager->Task Number "+taskNumber+" - Failure cause: " + URLDecoder.decode(result,"UTF-8"));
|
||||
}
|
||||
// AnalysisLogger.getLogger().debug("Process execution finished: " + exitstatus);
|
||||
|
||||
|
|
|
@ -228,6 +228,7 @@ public class AquamapsSuitableNode extends ActorNode {
|
|||
} catch (Exception e) {
|
||||
System.err.println("ERROR " + e);
|
||||
e.printStackTrace();
|
||||
return -1;
|
||||
}
|
||||
finally{
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode;
|
|||
import org.gcube.dataanalysis.ecoengine.utils.IOHelper;
|
||||
import org.gcube.dataanalysis.ecoengine.utils.Transformations;
|
||||
import org.gcube.dataanalysis.executor.scripts.OSCommand;
|
||||
import org.gcube.dataanalysis.executor.util.LocalRScriptsManager;
|
||||
import org.gcube.dataanalysis.executor.util.RScriptsManager;
|
||||
import org.gcube.dataanalysis.executor.util.StorageUtils;
|
||||
|
||||
|
@ -83,7 +84,7 @@ public class CMSY extends ActorNode {
|
|||
public int executeNode(int leftStartIndex, int numberOfLeftElementsToProcess, int rightStartIndex, int numberOfRightElementsToProcess, boolean duplicate, String sandboxFolder, String nodeConfigurationFileObject, String logfileNameToProduce) {
|
||||
try {
|
||||
status = 0;
|
||||
config = Transformations.restoreConfig(nodeConfigurationFileObject);
|
||||
config = Transformations.restoreConfig(new File (sandboxFolder,nodeConfigurationFileObject).getAbsolutePath());
|
||||
String outputFile = config.getParam(processOutput);
|
||||
AnalysisLogger.getLogger().info("CMSY expected output "+outputFile);
|
||||
|
||||
|
@ -93,11 +94,12 @@ public class CMSY extends ActorNode {
|
|||
StorageUtils.downloadInputFile(config.getParam(stocksFile), filestock);
|
||||
AnalysisLogger.getLogger().debug("Check fileID: "+fileid+" "+new File(fileid).exists());
|
||||
AnalysisLogger.getLogger().debug("Check fileStocks: "+filestock+" "+new File(filestock).exists());
|
||||
RScriptsManager scriptmanager = new RScriptsManager();
|
||||
//RScriptsManager scriptmanager = new RScriptsManager();
|
||||
LocalRScriptsManager scriptmanager = new LocalRScriptsManager();
|
||||
|
||||
HashMap<String,String> codeinj = new HashMap<String,String>();
|
||||
codeinj.put("HLH_M07",config.getParam(stock));
|
||||
config.setConfigPath("./");
|
||||
//config.setConfigPath("./");
|
||||
scriptmanager.executeRScript(config, scriptName, "", new HashMap<String,String>(), "", "outputfile.txt", codeinj, true,false,false,sandboxFolder);
|
||||
|
||||
outputFileName = scriptmanager.getCurrentOutputFileName();
|
||||
|
@ -109,6 +111,7 @@ public class CMSY extends ActorNode {
|
|||
AnalysisLogger.getLogger().info("CMSY Finished");
|
||||
}catch(Exception e){
|
||||
e.printStackTrace();
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -102,7 +102,7 @@ public class FAOMSY extends ActorNode {
|
|||
public int executeNode(int leftStartIndex, int numberOfLeftElementsToProcess, int rightStartIndex, int numberOfRightElementsToProcess, boolean duplicate, String sandboxFolder, String nodeConfigurationFileObject, String logfileNameToProduce) {
|
||||
try {
|
||||
status = 0;
|
||||
config = Transformations.restoreConfig(nodeConfigurationFileObject);
|
||||
config = config = Transformations.restoreConfig(new File (sandboxFolder,nodeConfigurationFileObject).getAbsolutePath());
|
||||
String outputFile = config.getParam(processOutput)+"_part"+rightStartIndex;
|
||||
String nonprocessedoutputFile = config.getParam(nonProcessedOutput)+"_part"+rightStartIndex;
|
||||
AnalysisLogger.getLogger().info("FAOMSY ranges: "+" Li:"+leftStartIndex+" NLi:"+leftStartIndex+" Ri:"+rightStartIndex+" NRi:"+numberOfRightElementsToProcess);
|
||||
|
@ -152,6 +152,7 @@ public class FAOMSY extends ActorNode {
|
|||
AnalysisLogger.getLogger().info("FAOMSY Finished");
|
||||
}catch(Exception e){
|
||||
e.printStackTrace();
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -158,7 +158,7 @@ public class ICCATVPA extends ActorNode {
|
|||
public int executeNode(int leftStartIndex, int numberOfLeftElementsToProcess, int rightStartIndex, int numberOfRightElementsToProcess, boolean duplicate, String sandboxFolder, String nodeConfigurationFileObject, String logfileNameToProduce) {
|
||||
try {
|
||||
status = 0;
|
||||
config = Transformations.restoreConfig(nodeConfigurationFileObject);
|
||||
config = config = Transformations.restoreConfig(new File (sandboxFolder,nodeConfigurationFileObject).getAbsolutePath());
|
||||
|
||||
String outputFile = config.getParam(processOutputParam);
|
||||
String localzipFile = "iccat_zip.zip";
|
||||
|
@ -239,6 +239,7 @@ public class ICCATVPA extends ActorNode {
|
|||
AnalysisLogger.getLogger().info("ICCAT-VPA : Finished");
|
||||
}catch(Exception e){
|
||||
e.printStackTrace();
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -111,7 +111,7 @@ public class LWR extends ActorNode {
|
|||
String insertQuery = null;
|
||||
try {
|
||||
status = 0;
|
||||
AlgorithmConfiguration config = Transformations.restoreConfig(nodeConfigurationFileObject);
|
||||
AlgorithmConfiguration config = config = Transformations.restoreConfig(new File (sandboxFolder,nodeConfigurationFileObject).getAbsolutePath());
|
||||
config.setConfigPath(sandboxFolder);
|
||||
System.out.println("Initializing DB");
|
||||
dbconnection = DatabaseUtils.initDBSession(config);
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package org.gcube.dataanalysis.executor.nodes.transducers;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.List;
|
||||
|
||||
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
||||
|
@ -69,7 +70,7 @@ public class OccurrenceMergingNode extends ActorNode {
|
|||
|
||||
try{
|
||||
status = 0;
|
||||
AlgorithmConfiguration config = Transformations.restoreConfig(nodeConfigurationFileObject);
|
||||
AlgorithmConfiguration config = Transformations.restoreConfig(new File(sandboxFolder,nodeConfigurationFileObject).getAbsolutePath());
|
||||
config.setConfigPath(sandboxFolder);
|
||||
processor.setConfiguration(config);
|
||||
AnalysisLogger.getLogger().info("Initializing variables");
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
package org.gcube.dataanalysis.executor.tests;
|
||||
|
||||
import org.gcube.common.scope.api.ScopeProvider;
|
||||
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
|
||||
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
|
||||
import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalInfraAlgorithm;
|
||||
import org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer;
|
||||
import org.gcube.dataanalysis.executor.nodes.transducers.bionym.utils.YasmeenGlobalParameters;
|
||||
|
||||
public class TestEmailingSystem {
|
||||
|
||||
public static void main (String args[]) throws Exception{
|
||||
AlgorithmConfiguration config = new AlgorithmConfiguration();
|
||||
config.setConfigPath("./cfg/");
|
||||
|
||||
AnalysisLogger.setLogger(config.getConfigPath() + AlgorithmConfiguration.defaultLoggerFile);
|
||||
config.setParam("DatabaseUserName", "utente");
|
||||
config.setParam("DatabasePassword", "d4science");
|
||||
config.setParam("DatabaseURL", "jdbc:postgresql://statistical-manager.d.d4science.research-infrastructures.eu/testdb");
|
||||
config.setPersistencePath("./");
|
||||
//config.setGcubeScope("/gcube/devNext/NextNext");
|
||||
config.setGcubeScope("/gcube/devNext/NextNext");
|
||||
config.setParam("ServiceUserName", "gianpaolo.coro");
|
||||
config.setParam("DatabaseDriver", "org.postgresql.Driver");
|
||||
config.setGcubeUserName("gianpaolo.coro");
|
||||
config.setGcubeToken("f9d49d76-cd60-48ed-9f8e-036bcc1fc045-98187548");
|
||||
|
||||
ScopeProvider.instance.set(config.getGcubeScope());
|
||||
StandardLocalInfraAlgorithm infraAlg = new StandardLocalInfraAlgorithm() {
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setInputParameters() {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void process() throws Exception {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() throws Exception {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
};
|
||||
infraAlg.setConfiguration(config);
|
||||
infraAlg.sendNotificationEmail("hello&ernrinndnknd","test++èèééé222");
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -55,4 +55,31 @@ public class InfraRetrieval {
|
|||
return addresses;
|
||||
}
|
||||
|
||||
|
||||
public static String findEmailingSystemAddress(String scope) throws Exception {
|
||||
|
||||
String resource = "jersey-servlet";
|
||||
String serviceName = "SocialNetworking";
|
||||
String serviceClass = "Portal";
|
||||
SimpleQuery query = ICFactory.queryFor(GCoreEndpoint.class);
|
||||
query.addCondition(String.format("$resource/Profile/ServiceClass/text() eq '%s'",serviceClass));
|
||||
query.addCondition("$resource/Profile/DeploymentData/Status/text() eq 'ready'");
|
||||
query.addCondition(String.format("$resource/Profile/ServiceName/text() eq '%s'",serviceName));
|
||||
query.setResult("$resource/Profile/AccessPoint/RunningInstanceInterfaces//Endpoint[@EntryName/string() eq \""+resource+"\"]/text()");
|
||||
|
||||
DiscoveryClient<String> client = ICFactory.client();
|
||||
List<String> endpoints = client.submit(query);
|
||||
if (endpoints == null || endpoints.isEmpty()) throw new Exception("Cannot retrieve the GCoreEndpoint serviceName: "+serviceName +", serviceClass: " +serviceClass +", in scope: "+scope);
|
||||
|
||||
String resourceEntyName = endpoints.get(0);
|
||||
|
||||
if(resourceEntyName==null)
|
||||
throw new Exception("Endpoint:"+resource+", is null for serviceName: "+serviceName +", serviceClass: " +serviceClass +", in scope: "+scope);
|
||||
|
||||
return resourceEntyName;
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue