diff --git a/pom.xml b/pom.xml index c52b742..1f2df54 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,15 @@ org.gcube.core common-scope + + + + com.googlecode.json-simple + json-simple + 1.1.1 + + + junit junit diff --git a/src/main/java/org/gcube/dataanalysis/ecoengine/interfaces/StandardLocalInfraAlgorithm.java b/src/main/java/org/gcube/dataanalysis/ecoengine/interfaces/StandardLocalInfraAlgorithm.java index 39fa585..d51391e 100644 --- a/src/main/java/org/gcube/dataanalysis/ecoengine/interfaces/StandardLocalInfraAlgorithm.java +++ b/src/main/java/org/gcube/dataanalysis/ecoengine/interfaces/StandardLocalInfraAlgorithm.java @@ -12,6 +12,8 @@ import org.gcube.common.resources.gcore.GCoreEndpoint; import org.gcube.common.scope.api.ScopeProvider; import org.gcube.resources.discovery.client.api.DiscoveryClient; import org.gcube.resources.discovery.client.queries.impl.XQuery; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,16 +34,27 @@ public abstract class StandardLocalInfraAlgorithm extends StandardLocalExternalA } + @SuppressWarnings("unchecked") private boolean sendMessage(String token, String email, String subject, String body) throws Exception { String socialServiceEnpoint = retrieveSocialServiceEnpoint(); + LOGGER.debug("contacting social service endpoint {}",socialServiceEnpoint+SEND_MESSAGE_METHOD+"?gcube-token="+token); + PostMethod putMessage = new PostMethod(socialServiceEnpoint+SEND_MESSAGE_METHOD+"?gcube-token="+token); - String jsonRequest = String.format("{\"subject\":\"%s\", \"body\":\"%s\", \"recipients\":[\"%s\"]}",subject,body, email); + + JSONObject obj = new JSONObject(); + obj.put("subject", subject); + obj.put("body", body); + JSONArray list = new JSONArray(); + list.add(email); + obj.put("recipients", list); + + //String jsonRequest = String.format("{\"subject\":\"%s\", \"body\":\"%s\", \"recipients\":[{\"id\":\"%s\"}]}",subject,body, email); - putMessage.setRequestEntity(new StringRequestEntity(jsonRequest, "application/json" , "UTF-8")); + putMessage.setRequestEntity(new StringRequestEntity(obj.toJSONString(), "application/json" , "UTF-8")); - LOGGER.debug("json request is {}", jsonRequest); + LOGGER.debug("json request is {}", obj.toJSONString()); HttpClient httpClient = new HttpClient(); diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java index f2d7131..5d49435 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java +++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java @@ -13,17 +13,18 @@ import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; import org.gcube.contentmanager.storageclient.model.protocol.smp.Handler; import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode; import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalInfraAlgorithm; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class GenericWorker extends StandardLocalInfraAlgorithm{ private static String genericWorkerDir = "/genericworker/"; - - + + private static Logger logger = LoggerFactory.getLogger(GenericWorker.class); + public static String AlgorithmClassParameter = "AlgorithmClass"; public static String RightSetStartIndexParameter = "RightSetStartIndex"; public static String NumberOfRightElementsToProcessParameter = "NumberOfRightElementsToProcess"; @@ -33,12 +34,12 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{ public static String SessionParameter = "Session"; public static String ConfigurationFileParameter = "ConfigurationFile"; public static String DeleteTemporaryFilesParameter = "DeleteTemporaryFiles"; - + public static String OutputParameter = "Process_Outcome"; public static String TASK_SUCCESS = "TASK_SUCCESS"; public static String TASK_FAILURE = "TASK_FAILURE"; public static String TASK_UNDEFINED = "TASK_UNDEFINED"; - + private static void inputStreamToFile(InputStream is, String path) throws FileNotFoundException, IOException { FileOutputStream out = new FileOutputStream(new File(path)); byte buf[] = new byte[1024]; @@ -47,7 +48,7 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{ out.write(buf, 0, len); out.close(); } - + public void executeAlgorithm(String algorithmClass, int rightStartIndex, int numberOfRightElementsToProcess, @@ -57,88 +58,86 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{ String session, File nodeConfigurationFileObject, boolean deleteFiles) throws Exception { - + PrintStream origOut = System.out; PrintStream origErr = System.err; status = 0f; - Logger logger = AnalysisLogger.getLogger(); StringBuffer sb = new StringBuffer(); File tempDir = null ; try { Handler.activateProtocol(); - // invoke the algorithm - logger.debug("GenericWorker-> Creating algorithm " + algorithmClass); - ActorNode node = (ActorNode) Class.forName(algorithmClass).newInstance(); - logger.debug("GenericWorker-> executing algorithm " + algorithmClass +" with parameters:"); - logger.debug("GenericWorker-> rightStartIndex:" +rightStartIndex); - logger.debug("GenericWorker-> numberOfRightElementsToProcess:" +numberOfRightElementsToProcess); - logger.debug("GenericWorker-> leftStartIndex:" +leftStartIndex); - logger.debug("GenericWorker-> numberOfLeftElementsToProcess:" +numberOfLeftElementsToProcess); - logger.debug("GenericWorker-> isduplicate:" +isduplicate); - logger.debug("GenericWorker-> execution directory:" +config.getConfigPath()); - logger.debug("GenericWorker-> nodeConfigurationFileObject.getName():" +nodeConfigurationFileObject.getName()); - logger.debug("GenericWorker-> nodeConfigurationFileObject.getPath():" +nodeConfigurationFileObject.getAbsolutePath()); - - logger.debug("GenericWorker-> session :" +session); - logger.debug("GenericWorker-> delete files :" +deleteFiles); - - File sandboxfile = new File(config.getConfigPath(),nodeConfigurationFileObject.getName()); - - Files.copy(nodeConfigurationFileObject.toPath(), sandboxfile.toPath(), REPLACE_EXISTING); - - logger.debug("GenericWorker-> copied configuration file as " +sandboxfile.getAbsolutePath()); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PrintStream ps = new PrintStream(baos); - - System.setOut(ps); - System.setErr(ps); - int result = node.executeNode(leftStartIndex, numberOfLeftElementsToProcess, rightStartIndex, numberOfRightElementsToProcess, isduplicate, - config.getConfigPath(), nodeConfigurationFileObject.getName(), "log.txt"); - System.setOut(origOut); - System.setErr(origErr); - - String log = new String(baos.toByteArray(), StandardCharsets.UTF_8); - //manage known issues - /* + // invoke the algorithm + logger.debug("GenericWorker-> Creating algorithm " + algorithmClass); + ActorNode node = (ActorNode) Class.forName(algorithmClass).newInstance(); + logger.debug("GenericWorker-> executing algorithm " + algorithmClass +" with parameters:"); + logger.debug("GenericWorker-> rightStartIndex:" +rightStartIndex); + logger.debug("GenericWorker-> numberOfRightElementsToProcess:" +numberOfRightElementsToProcess); + logger.debug("GenericWorker-> leftStartIndex:" +leftStartIndex); + logger.debug("GenericWorker-> numberOfLeftElementsToProcess:" +numberOfLeftElementsToProcess); + logger.debug("GenericWorker-> isduplicate:" +isduplicate); + logger.debug("GenericWorker-> execution directory:" +config.getConfigPath()); + logger.debug("GenericWorker-> nodeConfigurationFileObject.getName():" +nodeConfigurationFileObject.getName()); + logger.debug("GenericWorker-> nodeConfigurationFileObject.getPath():" +nodeConfigurationFileObject.getAbsolutePath()); + + logger.debug("GenericWorker-> session :" +session); + logger.debug("GenericWorker-> delete files :" +deleteFiles); + + File sandboxfile = new File(config.getConfigPath(),nodeConfigurationFileObject.getName()); + + Files.copy(nodeConfigurationFileObject.toPath(), sandboxfile.toPath(), REPLACE_EXISTING); + + logger.debug("GenericWorker-> copied configuration file as " +sandboxfile.getAbsolutePath()); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + + System.setOut(ps); + System.setErr(ps); + int result = node.executeNode(leftStartIndex, numberOfLeftElementsToProcess, rightStartIndex, numberOfRightElementsToProcess, isduplicate, + config.getConfigPath(), nodeConfigurationFileObject.getName(), "log.txt"); + System.setOut(origOut); + System.setErr(origErr); + + String log = new String(baos.toByteArray(), StandardCharsets.UTF_8); + //manage known issues + /* log=log.replace(".XMLStreamException: Unbound namespace URI", "Known Except"); log=log.replace("java.io.IOException: Error copying XML", "Known Except"); log=log.replace("java.io.FileNotFoundException: /home/gcube/tomcat/tmp/ConfigurationFile", "Known Except"); log=log.replace("java.io.FileNotFoundException: payload was not made available for this dataset", "Known Except"); - + logger.debug("GenericWorker-> Execution Fulllog" ); logger.debug("GenericWorker-> " + log); logger.debug("GenericWorker-> Script executed! " ); - */ - - boolean del = sandboxfile.delete(); - logger.debug("GenericWorker-> deleted sandbox file: "+del ); - logger.debug("GenericWorker-> all done"); - - - //if (log.contains("Exception:")){ - if (result!= 0){ - outputParameters.put(OutputParameter, TASK_FAILURE); - String cutLog = URLEncoder.encode(log, "UTF-8"); - /* + */ + + boolean del = sandboxfile.delete(); + logger.debug("GenericWorker-> deleted sandbox file: "+del ); + logger.debug("GenericWorker-> all done"); + + + //if (log.contains("Exception:")){ + if (result!= 0){ + outputParameters.put(OutputParameter, TASK_FAILURE); + String cutLog = URLEncoder.encode(log, "UTF-8"); + /* int maxlen = 20240; - + if (log.length()>maxlen) cutLog = cutLog.substring(0,maxlen)+"..."; - */ - cutLog = log; - outputParameters.put("Log", cutLog); - logger.debug("GenericWorker-> Failure!"); - } - else{ - outputParameters.put(OutputParameter, TASK_SUCCESS); - logger.debug("GenericWorker-> Success!"); - } + */ + cutLog = log; + outputParameters.put("Log", cutLog); + logger.debug("GenericWorker-> Failure!"); + } + else{ + outputParameters.put(OutputParameter, TASK_SUCCESS); + logger.debug("GenericWorker-> Success!"); + } } catch (Throwable e) { outputParameters.put(OutputParameter, TASK_FAILURE); outputParameters.put("Log", e.getLocalizedMessage()); - e.printStackTrace(); - logger.debug("GenericWorker-> ERROR: " + e.getLocalizedMessage()); + logger.error("GenericWorker-> ERROR: " ,e); status = 100f; throw new Exception(e.getLocalizedMessage()); } @@ -146,25 +145,24 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{ System.setOut(origOut); System.setErr(origErr); try{ - if (deleteFiles && (tempDir!=null)) { - logger.debug("GenericWorker-> ... deleting local files"); - // delete all after execution - for (File singlefile : tempDir.listFiles()) { - boolean del = singlefile.delete(); - if (!del) - logger.debug("GenericWorker-> ERROR deleting " + singlefile.getName() + " " + del); - else - logger.debug("GenericWorker-> deleted LOCAL FILE " + singlefile.getName() + " " + del); + if (deleteFiles && (tempDir!=null)) { + logger.debug("GenericWorker-> ... deleting local files"); + // delete all after execution + for (File singlefile : tempDir.listFiles()) { + boolean del = singlefile.delete(); + if (!del) + logger.debug("GenericWorker-> ERROR deleting " + singlefile.getName() + " " + del); + else + logger.debug("GenericWorker-> deleted LOCAL FILE " + singlefile.getName() + " " + del); + } + logger.debug("GenericWorker-> deleting temporary directory"); + tempDir.delete(); } - logger.debug("GenericWorker-> deleting temporary directory"); - tempDir.delete(); - } - if (nodeConfigurationFileObject!=null && nodeConfigurationFileObject.exists()) - nodeConfigurationFileObject.delete(); - + if (nodeConfigurationFileObject!=null && nodeConfigurationFileObject.exists()) + nodeConfigurationFileObject.delete(); + }catch(Exception e3){ - e3.printStackTrace(); - logger.debug("GenericWorker-> Error deleting files"); + logger.warn("GenericWorker-> Error deleting files",e3); } status = 100f; } @@ -172,8 +170,8 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{ @Override public void init() throws Exception { - - + + } @Override @@ -183,10 +181,10 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{ @Override protected void process() throws Exception { - AnalysisLogger.getLogger().debug("Parameters: "+config.getGeneralProperties()); - + logger.debug("Parameters: "+config.getGeneralProperties()); + String algorithmClass = config.getParam(AlgorithmClassParameter); - + int rightStartIndex = Integer.parseInt(config.getParam(RightSetStartIndexParameter)); int numberOfRightElementsToProcess =Integer.parseInt(config.getParam(NumberOfRightElementsToProcessParameter)); int leftStartIndex =Integer.parseInt(config.getParam(LeftSetStartIndexParameter)); @@ -195,16 +193,16 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{ String session=config.getParam(SessionParameter); File nodeConfigurationFileObject=new File (config.getParam(ConfigurationFileParameter)); boolean deleteFiles= Boolean.parseBoolean(config.getParam(DeleteTemporaryFilesParameter)); - - AnalysisLogger.getLogger().debug("Executing the algorithm"); + + logger.debug("Executing the algorithm"); executeAlgorithm(algorithmClass, rightStartIndex, numberOfRightElementsToProcess, leftStartIndex, numberOfLeftElementsToProcess, isduplicate, session, nodeConfigurationFileObject, deleteFiles); - AnalysisLogger.getLogger().debug("Algorithm executed!"); - + logger.debug("Algorithm executed!"); + } @Override protected void setInputParameters() { - + addStringInput(AlgorithmClassParameter, "The full class path of the algorithm", "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer"); addIntegerInput(RightSetStartIndexParameter, "The start index of the right set in a cartesian product of the input", "1"); addIntegerInput(NumberOfRightElementsToProcessParameter, "The number of elements to process in the right set", "1"); @@ -218,7 +216,7 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{ @Override public void shutdown() { - + } - + } diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/OccurrenceMergingNode.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/OccurrenceMergingNode.java index c5e833d..2060a2d 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/OccurrenceMergingNode.java +++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/transducers/OccurrenceMergingNode.java @@ -2,8 +2,6 @@ package org.gcube.dataanalysis.executor.nodes.transducers; import java.io.File; import java.util.List; - -import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS; import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType; @@ -12,9 +10,13 @@ import org.gcube.dataanalysis.ecoengine.transducers.OccurrencePointsMerger; import org.gcube.dataanalysis.ecoengine.utils.Transformations; import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing; import org.hibernate.SessionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class OccurrenceMergingNode extends ActorNode { + private static Logger logger = LoggerFactory.getLogger(OccurrenceMergingNode.class); + // variables protected AlgorithmConfiguration currentconfig; protected SessionFactory dbHibConnection; @@ -69,12 +71,14 @@ public class OccurrenceMergingNode extends ActorNode { try{ status = 0; - AlgorithmConfiguration config = Transformations.restoreConfig(new File(sandboxFolder,nodeConfigurationFileObject).getAbsolutePath()); + String configFileAbsolutePath = new File(sandboxFolder,nodeConfigurationFileObject).getAbsolutePath(); + logger.debug("config file absolute path is {}", configFileAbsolutePath); + AlgorithmConfiguration config = Transformations.restoreConfig(configFileAbsolutePath); config.setConfigPath(sandboxFolder); processor.setConfiguration(config); - AnalysisLogger.getLogger().info("Initializing variables"); + logger.info("Initializing variables"); processor.init(); - AnalysisLogger.getLogger().info("Initializing DB"); + logger.info("Initializing DB"); processor.initDB(false); status = 0.5f; processor.takeRange(leftStartIndex, numberOfLeftElementsToProcess, rightStarIndex, numberOfRightElementsToProcess); @@ -123,7 +127,7 @@ public class OccurrenceMergingNode extends ActorNode { try { processor.postProcess(); } catch (Exception e) { - AnalysisLogger.getLogger().info("Postprocessing Inapplicable"); + logger.info("Postprocessing Inapplicable"); } }