From b67804bf15cf5644dceea562b5f43766f85a0d81 Mon Sep 17 00:00:00 2001
From: Gianpaolo Coro
Date: Thu, 29 Sep 2016 10:49:16 +0000
Subject: [PATCH] git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/EcologicalEngineSmartExecutor@132046 82a268e6-3cf1-43bd-a215-b396298e98cf

---
 .../StandardLocalInfraAlgorithm.java         | 48 ++++++++-----
 .../job/management/GenericWorker.java        | 23 +++++--
 .../job/management/WPSJobManager.java        |  3 +-
 .../algorithms/AquamapsSuitableNode.java     |  1 +
 .../executor/nodes/algorithms/CMSY.java      |  9 ++-
 .../executor/nodes/algorithms/FAOMSY.java    |  3 +-
 .../executor/nodes/algorithms/ICCATVPA.java  |  3 +-
 .../executor/nodes/algorithms/LWR.java       |  2 +-
 .../transducers/OccurrenceMergingNode.java   |  3 +-
 .../executor/tests/TestEmailingSystem.java   | 69 +++++++++++++++++++
 .../executor/util/InfraRetrieval.java        | 27 ++++++++
 11 files changed, 159 insertions(+), 32 deletions(-)
 create mode 100644 src/main/java/org/gcube/dataanalysis/executor/tests/TestEmailingSystem.java

diff --git a/src/main/java/org/gcube/dataanalysis/ecoengine/interfaces/StandardLocalInfraAlgorithm.java b/src/main/java/org/gcube/dataanalysis/ecoengine/interfaces/StandardLocalInfraAlgorithm.java
index ab2b435..b18eb02 100644
--- a/src/main/java/org/gcube/dataanalysis/ecoengine/interfaces/StandardLocalInfraAlgorithm.java
+++ b/src/main/java/org/gcube/dataanalysis/ecoengine/interfaces/StandardLocalInfraAlgorithm.java
@@ -1,26 +1,40 @@
 package org.gcube.dataanalysis.ecoengine.interfaces;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.net.URLEncoder;
+import org.gcube.contentmanagement.graphtools.utils.HttpRequest;
 import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
-import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
-import org.gcube.dataanalysis.ecoengine.configuration.INFRASTRUCTURE;
-import org.gcube.dataanalysis.ecoengine.datatypes.DatabaseType;
-import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType;
-import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
-import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.DatabaseParameters;
-import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes;
-import org.gcube.dataanalysis.ecoengine.interfaces.Transducerer;
-import org.gcube.dataanalysis.ecoengine.utils.ResourceFactory;
+import org.gcube.dataanalysis.executor.util.InfraRetrieval;
+public abstract class StandardLocalInfraAlgorithm extends StandardLocalExternalAlgorithm {
-public abstract class StandardLocalInfraAlgorithm extends StandardLocalExternalAlgorithm{
-    public boolean sendNotificationEmail(){
-        return true;
+
+    public void sendNotificationEmail(String subject, String body) throws Exception {
+
+        AnalysisLogger.getLogger().debug("Emailing System->Starting request of email in scope "+config.getGcubeScope());
+
+        String serviceAddress = InfraRetrieval.findEmailingSystemAddress(config.getGcubeScope());
+
+        String requestForMessage = serviceAddress + "/messages/writeMessageToUsers" + "?gcube-token=" + config.getGcubeToken();
+        requestForMessage = requestForMessage.replace("http", "https").replace("80", ""); // remove the port (or set it to 443) otherwise you get an SSL error
+
+        AnalysisLogger.getLogger().debug("Emailing System->Request url is going to be " + requestForMessage);
+
+        // put the sender, the recipients, subject and body of the mail here
+        subject=URLEncoder.encode(subject,"UTF-8");
+        body=URLEncoder.encode(body,"UTF-8");
+        String requestParameters = "sender=dataminer&recipients="+config.getGcubeUserName()+"&subject="+subject+"&body="+body;
+
+        String response = HttpRequest.sendPostRequest(requestForMessage, requestParameters);
+        AnalysisLogger.getLogger().debug("Emailing System->Emailing response OK ");
+
+        if (response==null){
+            Exception e = new Exception("Error in email sending response");
+            throw e;
+        }
+    }
+
+
 }
diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java
index cee3e0b..3b828be 100644
--- a/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java
+++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java
@@ -77,10 +77,15 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{
             logger.debug("GenericWorker-> isduplicate:" +isduplicate);
             logger.debug("GenericWorker-> execution directory:" +config.getConfigPath());
             logger.debug("GenericWorker-> nodeConfigurationFileObject.getName():" +nodeConfigurationFileObject.getName());
+            logger.debug("GenericWorker-> nodeConfigurationFileObject.getPath():" +nodeConfigurationFileObject.getAbsolutePath());
+
+            logger.debug("GenericWorker-> session :" +session);
+            logger.debug("GenericWorker-> delete files :" +deleteFiles);
             File sandboxfile = new File(config.getConfigPath(),nodeConfigurationFileObject.getName());
             Files.copy(nodeConfigurationFileObject.toPath(), sandboxfile.toPath(), REPLACE_EXISTING);
+            logger.debug("GenericWorker-> copied configuration file as " +sandboxfile.getAbsolutePath());
             ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -88,11 +93,14 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{
             System.setOut(ps);
             System.setErr(ps);
-            node.executeNode(leftStartIndex, numberOfLeftElementsToProcess, rightStartIndex, numberOfRightElementsToProcess, isduplicate,
+            int result = node.executeNode(leftStartIndex, numberOfLeftElementsToProcess, rightStartIndex, numberOfRightElementsToProcess, isduplicate,
                     config.getConfigPath(), nodeConfigurationFileObject.getName(), "log.txt");
+            System.setOut(origOut);
+            System.setErr(origErr);
             String log = new String(baos.toByteArray(), StandardCharsets.UTF_8);
             //manage known issues
+            /*
             log=log.replace(".XMLStreamException: Unbound namespace URI", "Known Except");
             log=log.replace("java.io.IOException: Error copying XML", "Known Except");
             log=log.replace("java.io.FileNotFoundException: /home/gcube/tomcat/tmp/ConfigurationFile", "Known Except");
@@ -101,23 +109,24 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{
             logger.debug("GenericWorker-> Execution Fulllog" );
             logger.debug("GenericWorker-> " + log);
             logger.debug("GenericWorker-> Script executed! " );
+            */
             boolean del = sandboxfile.delete();
             logger.debug("GenericWorker-> deleted sandbox file: "+del );
             logger.debug("GenericWorker-> all done");
-
-
-            if (log.contains("Exception:")){
+            //if (log.contains("Exception:")){
+            if (result!= 0){
                 outputParameters.put(OutputParameter, TASK_FAILURE);
                 String cutLog = URLEncoder.encode(log, "UTF-8");
+                /*
                 int maxlen = 20240;
-                int startidx = log.indexOf("Exception:");
-                log = log.substring(startidx);
+
                 if (log.length()>maxlen)
                     cutLog = cutLog.substring(0,maxlen)+"...";
-
+                */
+                cutLog = log;
                 outputParameters.put("Log", cutLog);
                 logger.debug("GenericWorker-> Failure!");
             }
diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/WPSJobManager.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/WPSJobManager.java
index 6454cbf..966643e 100644
--- a/src/main/java/org/gcube/dataanalysis/executor/job/management/WPSJobManager.java
+++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/WPSJobManager.java
@@ -3,6 +3,7 @@ package org.gcube.dataanalysis.executor.job.management;
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.net.URLDecoder;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -164,7 +165,7 @@ public class WPSJobManager {
             if (failure) {
                 exitstatus = GenericWorker.TASK_FAILURE;
-                AnalysisLogger.getLogger().debug("WPSJobManager->Task Number "+taskNumber+" - Failure cause: " + result);
+                AnalysisLogger.getLogger().debug("WPSJobManager->Task Number "+taskNumber+" - Failure cause: " + URLDecoder.decode(result,"UTF-8"));
             }
             // AnalysisLogger.getLogger().debug("Process execution finished: " + exitstatus);
diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/AquamapsSuitableNode.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/AquamapsSuitableNode.java
index 3aafdc1..cb92574 100644
--- a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/AquamapsSuitableNode.java
+++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/AquamapsSuitableNode.java
@@ -228,6 +228,7 @@ public class AquamapsSuitableNode extends ActorNode {
         } catch (Exception e) {
             System.err.println("ERROR " + e);
             e.printStackTrace();
+            return -1;
         }
         finally{
diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/CMSY.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/CMSY.java
index b7a2e5b..fe21a89 100644
--- a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/CMSY.java
+++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/CMSY.java
@@ -16,6 +16,7 @@ import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode;
 import org.gcube.dataanalysis.ecoengine.utils.IOHelper;
 import org.gcube.dataanalysis.ecoengine.utils.Transformations;
 import org.gcube.dataanalysis.executor.scripts.OSCommand;
+import org.gcube.dataanalysis.executor.util.LocalRScriptsManager;
 import org.gcube.dataanalysis.executor.util.RScriptsManager;
 import org.gcube.dataanalysis.executor.util.StorageUtils;
@@ -83,7 +84,7 @@ public class CMSY extends ActorNode {
     public int executeNode(int leftStartIndex, int numberOfLeftElementsToProcess, int rightStartIndex, int numberOfRightElementsToProcess, boolean duplicate, String sandboxFolder, String nodeConfigurationFileObject, String logfileNameToProduce) {
         try {
             status = 0;
-            config = Transformations.restoreConfig(nodeConfigurationFileObject);
+            config = Transformations.restoreConfig(new File(sandboxFolder,nodeConfigurationFileObject).getAbsolutePath());
             String outputFile = config.getParam(processOutput);
             AnalysisLogger.getLogger().info("CMSY expected output "+outputFile);
@@ -93,11 +94,12 @@ public class CMSY extends ActorNode {
             StorageUtils.downloadInputFile(config.getParam(stocksFile), filestock);
             AnalysisLogger.getLogger().debug("Check fileID: "+fileid+" "+new File(fileid).exists());
             AnalysisLogger.getLogger().debug("Check fileStocks: "+filestock+" "+new File(filestock).exists());
-            RScriptsManager scriptmanager = new RScriptsManager();
+            //RScriptsManager scriptmanager = new RScriptsManager();
+            LocalRScriptsManager scriptmanager = new LocalRScriptsManager();
             HashMap codeinj = new HashMap();
             codeinj.put("HLH_M07",config.getParam(stock));
-            config.setConfigPath("./");
+            //config.setConfigPath("./");
             scriptmanager.executeRScript(config, scriptName, "", new HashMap(), "", "outputfile.txt", codeinj, true,false,false,sandboxFolder);
             outputFileName = scriptmanager.getCurrentOutputFileName();
@@ -109,6 +111,7 @@ public class CMSY extends ActorNode {
             AnalysisLogger.getLogger().info("CMSY Finished");
         }catch(Exception e){
             e.printStackTrace();
+            return -1;
         }
         return 0;
diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/FAOMSY.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/FAOMSY.java
index 01d8e3e..ccda547 100644
--- a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/FAOMSY.java
+++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/FAOMSY.java
@@ -102,7 +102,7 @@ public class FAOMSY extends ActorNode {
     public int executeNode(int leftStartIndex, int numberOfLeftElementsToProcess, int rightStartIndex, int numberOfRightElementsToProcess, boolean duplicate, String sandboxFolder, String nodeConfigurationFileObject, String logfileNameToProduce) {
         try {
             status = 0;
-            config = Transformations.restoreConfig(nodeConfigurationFileObject);
+            config = Transformations.restoreConfig(new File(sandboxFolder,nodeConfigurationFileObject).getAbsolutePath());
             String outputFile = config.getParam(processOutput)+"_part"+rightStartIndex;
             String nonprocessedoutputFile = config.getParam(nonProcessedOutput)+"_part"+rightStartIndex;
             AnalysisLogger.getLogger().info("FAOMSY ranges: "+" Li:"+leftStartIndex+" NLi:"+leftStartIndex+" Ri:"+rightStartIndex+" NRi:"+numberOfRightElementsToProcess);
@@ -152,6 +152,7 @@ public class FAOMSY extends ActorNode {
             AnalysisLogger.getLogger().info("FAOMSY Finished");
         }catch(Exception e){
             e.printStackTrace();
+            return -1;
         }
         return 0;
diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/ICCATVPA.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/ICCATVPA.java
index 1b2e84a..41a10f4 100644
--- a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/ICCATVPA.java
+++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/ICCATVPA.java
@@ -158,7 +158,7 @@ public class ICCATVPA extends ActorNode {
     public int executeNode(int leftStartIndex, int numberOfLeftElementsToProcess, int rightStartIndex, int numberOfRightElementsToProcess, boolean duplicate, String sandboxFolder, String nodeConfigurationFileObject, String logfileNameToProduce) {
         try {
             status = 0;
-            config = Transformations.restoreConfig(nodeConfigurationFileObject);
+            config = Transformations.restoreConfig(new File(sandboxFolder,nodeConfigurationFileObject).getAbsolutePath());
             String outputFile = config.getParam(processOutputParam);
             String localzipFile = "iccat_zip.zip";
@@ -239,6 +239,7 @@ public class ICCATVPA extends ActorNode {
             AnalysisLogger.getLogger().info("ICCAT-VPA : Finished");
         }catch(Exception e){
             e.printStackTrace();
+            return -1;
         }
         return 0;
diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/LWR.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/LWR.java
index 9fd5816..350faa3 100644
--- a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/LWR.java
+++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/LWR.java
@@ -111,7 +111,7 @@ public class LWR extends ActorNode {
         String insertQuery = null;
         try {
             status = 0;
-            AlgorithmConfiguration config = Transformations.restoreConfig(nodeConfigurationFileObject);
+            AlgorithmConfiguration config = Transformations.restoreConfig(new File(sandboxFolder,nodeConfigurationFileObject).getAbsolutePath());
             config.setConfigPath(sandboxFolder);
             System.out.println("Initializing DB");
             dbconnection = DatabaseUtils.initDBSession(config);
diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/OccurrenceMergingNode.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/OccurrenceMergingNode.java
index b94a988..d419805 100644
--- a/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/OccurrenceMergingNode.java
+++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/OccurrenceMergingNode.java
@@ -1,5 +1,6 @@
 package org.gcube.dataanalysis.executor.nodes.transducers;
+import java.io.File;
 import java.util.List;
 import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
@@ -69,7 +70,7 @@ public class OccurrenceMergingNode extends ActorNode {
         try{
             status = 0;
-            AlgorithmConfiguration config = Transformations.restoreConfig(nodeConfigurationFileObject);
+            AlgorithmConfiguration config = Transformations.restoreConfig(new File(sandboxFolder,nodeConfigurationFileObject).getAbsolutePath());
             config.setConfigPath(sandboxFolder);
             processor.setConfiguration(config);
             AnalysisLogger.getLogger().info("Initializing variables");
diff --git a/src/main/java/org/gcube/dataanalysis/executor/tests/TestEmailingSystem.java b/src/main/java/org/gcube/dataanalysis/executor/tests/TestEmailingSystem.java
new file mode 100644
index 0000000..f20a53d
--- /dev/null
+++ b/src/main/java/org/gcube/dataanalysis/executor/tests/TestEmailingSystem.java
@@ -0,0 +1,69 @@
+package org.gcube.dataanalysis.executor.tests;
+
+import org.gcube.common.scope.api.ScopeProvider;
+import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
+import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
+import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalInfraAlgorithm;
+import org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer;
+import org.gcube.dataanalysis.executor.nodes.transducers.bionym.utils.YasmeenGlobalParameters;
+
+public class TestEmailingSystem {
+
+    public static void main (String args[]) throws Exception{
+        AlgorithmConfiguration config = new AlgorithmConfiguration();
+        config.setConfigPath("./cfg/");
+
+        AnalysisLogger.setLogger(config.getConfigPath() + AlgorithmConfiguration.defaultLoggerFile);
+        config.setParam("DatabaseUserName", "utente");
+        config.setParam("DatabasePassword", "d4science");
+        config.setParam("DatabaseURL", "jdbc:postgresql://statistical-manager.d.d4science.research-infrastructures.eu/testdb");
+        config.setPersistencePath("./");
+        //config.setGcubeScope("/gcube/devNext/NextNext");
+        config.setGcubeScope("/gcube/devNext/NextNext");
+        config.setParam("ServiceUserName", "gianpaolo.coro");
+        config.setParam("DatabaseDriver", "org.postgresql.Driver");
+        config.setGcubeUserName("gianpaolo.coro");
+        config.setGcubeToken("f9d49d76-cd60-48ed-9f8e-036bcc1fc045-98187548");
+
+        ScopeProvider.instance.set(config.getGcubeScope());
+        StandardLocalInfraAlgorithm infraAlg = new StandardLocalInfraAlgorithm() {
+
+            @Override
+            public void shutdown() {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            protected void setInputParameters() {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            protected void process() throws Exception {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            public void init() throws Exception {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            public String getDescription() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+        };
+        infraAlg.setConfiguration(config);
+        infraAlg.sendNotificationEmail("hello&ernrinndnknd","test++èèééé222");
+
+
+    }
+
+
+
+}
diff --git a/src/main/java/org/gcube/dataanalysis/executor/util/InfraRetrieval.java b/src/main/java/org/gcube/dataanalysis/executor/util/InfraRetrieval.java
index ad7c9b5..e4f157b 100644
--- a/src/main/java/org/gcube/dataanalysis/executor/util/InfraRetrieval.java
+++ b/src/main/java/org/gcube/dataanalysis/executor/util/InfraRetrieval.java
@@ -55,4 +55,31 @@ public class InfraRetrieval {
         return addresses;
     }
+
+    public static String findEmailingSystemAddress(String scope) throws Exception {
+
+        String resource = "jersey-servlet";
+        String serviceName = "SocialNetworking";
+        String serviceClass = "Portal";
+        SimpleQuery query = ICFactory.queryFor(GCoreEndpoint.class);
+        query.addCondition(String.format("$resource/Profile/ServiceClass/text() eq '%s'",serviceClass));
+        query.addCondition("$resource/Profile/DeploymentData/Status/text() eq 'ready'");
+        query.addCondition(String.format("$resource/Profile/ServiceName/text() eq '%s'",serviceName));
+        query.setResult("$resource/Profile/AccessPoint/RunningInstanceInterfaces//Endpoint[@EntryName/string() eq \""+resource+"\"]/text()");
+
+        DiscoveryClient client = ICFactory.client();
+        List endpoints = client.submit(query);
+        if (endpoints == null || endpoints.isEmpty()) throw new Exception("Cannot retrieve the GCoreEndpoint serviceName: "+serviceName +", serviceClass: " +serviceClass +", in scope: "+scope);
+
+        String resourceEntyName = endpoints.get(0);
+
+        if(resourceEntyName==null)
+            throw new Exception("Endpoint:"+resource+", is null for serviceName: "+serviceName +", serviceClass: " +serviceClass +", in scope: "+scope);
+
+        return resourceEntyName;
+
+    }
+
+
+
 }
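
Usage sketch (illustrative, not part of the patch): a concrete algorithm extending StandardLocalInfraAlgorithm can call the new sendNotificationEmail(subject, body) from its process() method once the computation has produced its outputs. The class below is hypothetical; it assumes the AlgorithmConfiguration already carries the gCube scope, user name and token, as set up in TestEmailingSystem above.

    package org.gcube.dataanalysis.executor.tests;

    import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalInfraAlgorithm;

    // Hypothetical example (not in the repository): notifies the submitting user by email on completion.
    public class EmailOnCompletionExample extends StandardLocalInfraAlgorithm {

        @Override
        public void init() throws Exception {
            // no initialization needed for this example
        }

        @Override
        public String getDescription() {
            return "Example algorithm that notifies the user by email on completion";
        }

        @Override
        protected void setInputParameters() {
            // no inputs for this example
        }

        @Override
        protected void process() throws Exception {
            // ... run the actual computation here ...
            // then ask the SocialNetworking service to mail the user who submitted the job;
            // scope, user name and token are taken from the AlgorithmConfiguration (see TestEmailingSystem)
            sendNotificationEmail("DataMiner: computation finished",
                    "Your computation has finished and its results are available.");
        }

        @Override
        public void shutdown() {
            // nothing to release
        }
    }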