diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java new file mode 100644 index 0000000..4af05de --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/GenericWorker.java @@ -0,0 +1,196 @@ +package org.gcube.dataanalysis.executor.job.management; + +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.UUID; + +import org.apache.log4j.Logger; +import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; +import org.gcube.contentmanager.storageclient.model.protocol.smp.Handler; +import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode; +import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalExternalAlgorithm; + +public class GenericWorker extends StandardLocalExternalAlgorithm{ + + private static String genericWorkerDir = "/genericworker/"; + + + public static String AlgorithmClassParameter = "AlgorithmClass"; + public static String RightSetStartIndexParameter = "RightSetStartIndex"; + public static String NumberOfRightElementsToProcessParameter = "NumberOfRightElementsToProcess"; + public static String LeftSetStartIndexParameter = "LeftSetStartIndex"; + public static String NumberOfLeftElementsToProcessParameter = "NumberOfLeftElementsToProcess"; + public static String IsDuplicateParameter = "IsDuplicate"; + public static String SessionParameter = "Session"; + public static String ConfigurationFileParameter = "ConfigurationFile"; + public static String DeleteTemporaryFilesParameter = "DeleteTemporaryFiles"; + + public static String OutputParameter = "Process_Outcome"; + public static String TASK_SUCCESS = "TASK_SUCCESS"; + public static String TASK_FAILURE = "TASK_FAILURE"; + + private static void inputStreamToFile(InputStream is, String path) throws FileNotFoundException, IOException { + FileOutputStream out = new FileOutputStream(new File(path)); + byte buf[] = new byte[1024]; + int len = 0; + while ((len = is.read(buf)) > 0) + out.write(buf, 0, len); + out.close(); + } + + public void executeAlgorithm(String algorithmClass, + int rightStartIndex, + int numberOfRightElementsToProcess, + int leftStartIndex, + int numberOfLeftElementsToProcess, + boolean isduplicate, + String session, + File nodeConfigurationFileObject, + boolean deleteFiles) throws Exception { + + PrintStream origOut = System.out; + PrintStream origErr = System.err; + status = 0f; + Logger logger = AnalysisLogger.getLogger(); + StringBuffer sb = new StringBuffer(); + File tempDir = null ; + try { + Handler.activateProtocol(); + String locDir = session; + if (session == null) + locDir = ("" + UUID.randomUUID()).replace("-", ""); + + // invoke the algorithm + logger.debug("GenericWorker-> Creating algorithm " + algorithmClass); + ActorNode node = (ActorNode) Class.forName(algorithmClass).newInstance(); + logger.debug("GenericWorker-> executing algorithm " + algorithmClass +" with parameters:"); + logger.debug("GenericWorker-> rightStartIndex:" +rightStartIndex); + logger.debug("GenericWorker-> numberOfRightElementsToProcess:" +numberOfRightElementsToProcess); + logger.debug("GenericWorker-> leftStartIndex:" +leftStartIndex); + logger.debug("GenericWorker-> numberOfLeftElementsToProcess:" +numberOfLeftElementsToProcess); + logger.debug("GenericWorker-> isduplicate:" +isduplicate); + logger.debug("GenericWorker-> execution directory:" +config.getConfigPath()); + logger.debug("GenericWorker-> nodeConfigurationFileObject.getName():" +nodeConfigurationFileObject.getName()); + + File sandboxfile = new File(config.getConfigPath(),nodeConfigurationFileObject.getName()); + + Files.copy(nodeConfigurationFileObject.toPath(), sandboxfile.toPath(), REPLACE_EXISTING); + logger.debug("GenericWorker-> copied configuration file as " +sandboxfile.getAbsolutePath()); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + + System.setOut(ps); + System.setErr(ps); + node.executeNode(rightStartIndex, numberOfRightElementsToProcess, leftStartIndex, numberOfLeftElementsToProcess, isduplicate, + config.getConfigPath(), nodeConfigurationFileObject.getName(), "log.txt"); + + String log = new String(baos.toByteArray(), StandardCharsets.UTF_8); + logger.debug("GenericWorker-> Execution Fulllog" ); + logger.debug("GenericWorker-> " + log); + logger.debug("GenericWorker-> Script executed! " ); + + boolean del = sandboxfile.delete(); + logger.debug("GenericWorker-> deleted sandbox file: "+del ); + logger.debug("GenericWorker-> all done"); + + if (log.contains("Exception:") && log.contains("Caused by:")){ + outputParameters.put(OutputParameter, TASK_FAILURE); + logger.debug("GenericWorker-> Failure!"); + } + else + outputParameters.put(OutputParameter, TASK_SUCCESS); + logger.debug("GenericWorker-> Success!"); + } catch (Throwable e) { + outputParameters.put(OutputParameter, TASK_FAILURE); + e.printStackTrace(); + logger.debug("GenericWorker-> ERROR: " + e.getLocalizedMessage()); + status = 100f; + throw new Exception(e.getLocalizedMessage()); + } + finally{ + System.setOut(origOut); + System.setErr(origErr); + try{ + if (deleteFiles && (tempDir!=null)) { + logger.debug("GenericWorker-> ... deleting local files"); + // delete all after execution + for (File singlefile : tempDir.listFiles()) { + boolean del = singlefile.delete(); + if (!del) + logger.debug("GenericWorker-> ERROR deleting " + singlefile.getName() + " " + del); + else + logger.debug("GenericWorker-> deleted LOCAL FILE " + singlefile.getName() + " " + del); + } + logger.debug("GenericWorker-> deleting temporary directory"); + tempDir.delete(); + } + }catch(Exception e3){ + e3.printStackTrace(); + logger.debug("GenericWorker-> Error deleting files"); + } + status = 100f; + } + } + + @Override + public void init() throws Exception { + + + } + + @Override + public String getDescription() { + return "An algorithm that executes another other algorithm"; + } + + @Override + protected void process() throws Exception { + AnalysisLogger.getLogger().debug("Parameters: "+config.getGeneralProperties()); + + String algorithmClass = config.getParam("AlgorithmClass"); + + int rightStartIndex = Integer.parseInt(config.getParam("RightSetStartIndex")); + int numberOfRightElementsToProcess =Integer.parseInt(config.getParam("NumberOfRightElementsToProcess")); + int leftStartIndex =Integer.parseInt(config.getParam("LeftSetStartIndex")); + int numberOfLeftElementsToProcess =Integer.parseInt(config.getParam("NumberOfLeftElementsToProcess")); + boolean isduplicate=Boolean.parseBoolean(config.getParam("IsDuplicate")); + String session=config.getParam("Session"); + File nodeConfigurationFileObject=new File (config.getParam("ConfigurationFile")); + boolean deleteFiles= Boolean.parseBoolean(config.getParam("DeleteTemporaryFiles")); + + AnalysisLogger.getLogger().debug("Executing the algorithm"); + executeAlgorithm(algorithmClass, rightStartIndex, numberOfRightElementsToProcess, leftStartIndex, numberOfLeftElementsToProcess, isduplicate, session, nodeConfigurationFileObject, deleteFiles); + AnalysisLogger.getLogger().debug("Algorithm executed!"); + + } + + @Override + protected void setInputParameters() { + + addStringInput("AlgorithmClass", "The full class path of the algorithm", "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer"); + addIntegerInput("RightSetStartIndex", "The start index of the right set in a cartesian product of the input", "1"); + addIntegerInput("NumberOfRightElementsToProcess", "The number of elements to process in the right set", "1"); + addIntegerInput("LeftSetStartIndex", "The start index of the left set in a cartesian product of the input", "1"); + addIntegerInput("NumberOfLeftElementsToProcess", "The number of elements to process in the left set", "1"); + addBooleanInput("IsDuplicate", "Indicate if this sub computation is a duplicate of another sub-computation", "false"); + addStringInput("Session", "The session this sub-computation belongs to", "123456"); + addFileInput("ConfigurationFile", "A configuration file for the algorithm in an XML serialisation format for the AlgorithmConfiguration Object", "config.dat"); + addBooleanInput("DeleteTemporaryFiles","Delete local temporary files after the computation","true"); + } + + @Override + public void shutdown() { + + } + +}