From 6dc30f53b0751cf82323b5b4072692a63dc0a954 Mon Sep 17 00:00:00 2001 From: Gianpaolo Coro Date: Mon, 30 Mar 2015 15:32:33 +0000 Subject: [PATCH] git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/EcologicalEngineSmartExecutor@113795 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../executor/nodes/algorithms/FAOMSY.java | 223 ++++++++++++++++++ .../executor/tests/RegressionTestFAOMSY.java | 72 ++++++ .../executor/util/StorageUtils.java | 84 ------- 3 files changed, 295 insertions(+), 84 deletions(-) create mode 100644 src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/FAOMSY.java create mode 100644 src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestFAOMSY.java delete mode 100644 src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/FAOMSY.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/FAOMSY.java new file mode 100644 index 0000000..2c91d71 --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/FAOMSY.java @@ -0,0 +1,223 @@ +package org.gcube.dataanalysis.executor.nodes.algorithms; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; + +import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; +import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS; +import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; +import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType; +import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveTypesList; +import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType; +import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes; +import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode; +import org.gcube.dataanalysis.ecoengine.utils.IOHelper; +import 
org.gcube.dataanalysis.ecoengine.utils.Transformations; +import org.gcube.dataanalysis.executor.scripts.OSCommand; +import org.gcube.dataanalysis.executor.util.RScriptsManager; +import org.gcube.dataanalysis.executor.util.StorageUtils; + +public class FAOMSY extends ActorNode { + + public int count; + + public float status = 0; + + @Override + public ALG_PROPS[] getProperties() { + ALG_PROPS[] p = { ALG_PROPS.PHENOMENON_VS_PARALLEL_PHENOMENON }; + return p; + } + + @Override + public String getName() { + return "FAOMSY"; + } + + @Override + public String getDescription() { + return "An algorithm to estimate the Maximum Sustainable Yield from a catch statistic by FAO."; + } + + static String stocksFile = "StocksFile"; + static String processOutput= "ProcessOutput"; + static String nonProcessedOutput= "NonProcessedOutput"; + static String scriptName = "CatchMSY_Dec2014.R"; + + @Override + public List getInputParameters() { + + List parameters = new ArrayList(); + IOHelper.addStringInput(parameters, stocksFile, "Http link to a file containing catch statistics for a group of species. 
Example: http://goo.gl/g6YtVx", ""); + return parameters; + } + + @Override + public StatisticalType getOutput() { + File outfile = new File(config.getPersistencePath(),config.getParam(processOutput)); + File outfile2 = new File(config.getPersistencePath(),config.getParam(nonProcessedOutput)); + PrimitiveTypesList list = new PrimitiveTypesList(File.class.getName(), PrimitiveTypes.FILE, "OutputFiles", "Textual output files - processed and non-processed species",false); + if (outfile.exists()){ + PrimitiveType o = new PrimitiveType(File.class.getName(), outfile, PrimitiveTypes.FILE, "ProcessedSpecies", "Output file"); + list.add(o); + } + if (outfile2.exists()){ + PrimitiveType o2 = new PrimitiveType(File.class.getName(), outfile2, PrimitiveTypes.FILE, "NonProcessedSpecies", "Output file"); + list.add(o2); + } + + + return list; + } + + @Override + public void initSingleNode(AlgorithmConfiguration config) { + + } + + @Override + public float getInternalStatus() { + return status; + } + + AlgorithmConfiguration config; + + @Override + public int executeNode(int leftStartIndex, int numberOfLeftElementsToProcess, int rightStartIndex, int numberOfRightElementsToProcess, boolean duplicate, String sandboxFolder, String nodeConfigurationFileObject, String logfileNameToProduce) { + try { + status = 0; + config = Transformations.restoreConfig(nodeConfigurationFileObject); + String outputFile = config.getParam(processOutput)+"_part"+rightStartIndex; + String nonprocessedoutputFile = config.getParam(nonProcessedOutput)+"_part"+rightStartIndex; + AnalysisLogger.getLogger().info("FAOMSY ranges: "+" Li:"+leftStartIndex+" NLi:"+leftStartIndex+" Ri:"+rightStartIndex+" NRi:"+numberOfRightElementsToProcess); + + AnalysisLogger.getLogger().info("FAOMSY expected output "+outputFile); + + File filestock=new File(sandboxFolder,"D20_1.csv"); + StorageUtils.downloadInputFile(config.getParam(stocksFile), filestock.getAbsolutePath()); + + AnalysisLogger.getLogger().debug("Check fileStocks: 
"+filestock.getAbsolutePath()+" "+filestock.exists()); + File filestocksub=new File(sandboxFolder,"D20.csv"); + + StorageUtils.FileSubset(filestock, filestocksub, rightStartIndex, numberOfRightElementsToProcess, true); + + RScriptsManager scriptmanager = new RScriptsManager(); + + HashMap codeinj = new HashMap(); + config.setConfigPath("./"); + scriptmanager.executeRScript(config, scriptName, "", new HashMap(), "", "CatchMSY_Output.csv", codeinj, false,false,false); + AnalysisLogger.getLogger().info("FAOMSY The script has finished"); + String outputFileName = ""; + //manage the fact that the outputfile could even not exist + try{outputFileName = scriptmanager.getCurrentOutputFileName();}catch(Exception e){ + AnalysisLogger.getLogger().info("FAOMSY Could not get curent output file"); + } + String optionalFileOutputName = "NonProcessedSpecies.csv"; + + if (outputFileName!=null && outputFileName.length()>0 && new File(outputFileName).exists()){ + AnalysisLogger.getLogger().info("FAOMSY Main file exists!"); + outputFileName = scriptmanager.getCurrentOutputFileName(); + String outputFilePath = new File(sandboxFolder,outputFile).getAbsolutePath(); + AnalysisLogger.getLogger().info("FAOMSY writing output file in path "+outputFilePath); + OSCommand.FileCopy(outputFileName,outputFilePath); + AnalysisLogger.getLogger().info("FAOMSY uploading output file "+outputFile); + StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), sandboxFolder,outputFile); + } + if (new File(optionalFileOutputName).exists()){ + AnalysisLogger.getLogger().info("FAOMSY Optional file exists!"); + OSCommand.FileCopy(optionalFileOutputName,nonprocessedoutputFile); + AnalysisLogger.getLogger().info("FAOMSY uploading output file "+nonprocessedoutputFile); + //check only +// String file = FileTools.loadString(nonprocessedoutputFile, "UTF-8"); +// AnalysisLogger.getLogger().info("FAOMSY File check"+file); + StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), 
config.getParam("ServiceUserName"), sandboxFolder,nonprocessedoutputFile); + } + + AnalysisLogger.getLogger().info("FAOMSY Finished"); + }catch(Exception e){ + e.printStackTrace(); + } + + return 0; + } + + @Override + public void setup(AlgorithmConfiguration config) throws Exception { + this.config = config; + AnalysisLogger.getLogger().info("FAOMSY process is initialized"); + String uuid = (UUID.randomUUID()+".txt").replace("-", ""); + config.setParam(processOutput, "FAOMSY_"+"output_"+uuid); + config.setParam(nonProcessedOutput, "FAOMSY_nonprocessed_"+"output_"+uuid); + File tempfile = new File(config.getPersistencePath(),"FAOMSY_input_"+(UUID.randomUUID()+".csv").replace("-", "")); + StorageUtils.downloadInputFile(config.getParam(stocksFile), tempfile.getAbsolutePath()); + nstocks = StorageUtils.calcFileRows(tempfile, true); + AnalysisLogger.getLogger().info("FAOMSY Found "+nstocks+" stocks!"); + if (nstocks==0) + throw new Exception("Error in FAOMSY: No stocks to process found in the file "+config.getParam(stocksFile)); + + } + + int nstocks = 0; + @Override + public int getNumberOfRightElements() { + return nstocks; + } + + @Override + public int getNumberOfLeftElements() { + return 1; + } + + @Override + public void stop() { + AnalysisLogger.getLogger().info("CMSY process stopped"); + } + + boolean haspostprocessed = false; + + public void assembleFiles(String outputFileName) throws Exception{ + + //try downloading all the files + List fileslist = new ArrayList(); + for (int i=0;i<=nstocks;i++){ + String filename = outputFileName+"_part"+i; + try{ + StorageUtils.downloadFilefromStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), config.getPersistencePath(), filename); + AnalysisLogger.getLogger().debug("FAOMSY - Saved from Storage: "+filename); + fileslist.add(filename); + }catch(Exception e){ + AnalysisLogger.getLogger().debug("FAOMSY - Did not save file from Storage: "+filename); + } + } + + + AnalysisLogger.getLogger().debug("FAOMSY - 
Merging files in "+outputFileName); + if (fileslist.size()>0) + StorageUtils.mergeFiles(config.getPersistencePath(), fileslist, outputFileName, true); + + AnalysisLogger.getLogger().debug("FAOMSY - Deleting parts"); + for (String file:fileslist){ + new File(config.getPersistencePath(),file).delete(); + } + + AnalysisLogger.getLogger().debug("FAOMSY - File assembling complete"); + + } + + @Override + public void postProcess(boolean manageDuplicates, boolean manageFault) { + try { + String mainOutputfilename = config.getParam(processOutput); + String optionalOutputfilename = config.getParam(nonProcessedOutput); + assembleFiles(mainOutputfilename); + assembleFiles(optionalOutputfilename); + AnalysisLogger.getLogger().debug("FAOMSY - Postprocess complete"); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestFAOMSY.java b/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestFAOMSY.java new file mode 100644 index 0000000..e32f90a --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestFAOMSY.java @@ -0,0 +1,72 @@ +package org.gcube.dataanalysis.executor.tests; + +import java.util.List; + +import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; +import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; +import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType; +import org.gcube.dataanalysis.ecoengine.interfaces.ComputationalAgent; +import org.gcube.dataanalysis.ecoengine.processing.factories.GeneratorsFactory; +import org.gcube.dataanalysis.ecoengine.test.regression.Regressor; +import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing; + +public class RegressionTestFAOMSY { + /** + * example of parallel processing on a single machine the procedure will generate a new table for a distribution on suitable species + * + */ + +public static AlgorithmConfiguration getConfig() { + 
+ AlgorithmConfiguration config = new AlgorithmConfiguration(); + + config.setConfigPath("./cfg/"); + config.setPersistencePath("./"); + config.setParam("DatabaseUserName","utente"); + config.setParam("DatabasePassword","d4science"); + config.setParam("DatabaseURL","jdbc:postgresql://dbtest.research-infrastructures.eu/testdb"); + config.setParam("DatabaseDriver","org.postgresql.Driver"); + AnalysisLogger.setLogger(config.getConfigPath()+AlgorithmConfiguration.defaultLoggerFile); + return config; + } + + public static void main(String[] args) throws Exception { + + System.out.println("TEST 1"); + + List generators = GeneratorsFactory.getGenerators(testCMSY()); + generators.get(0).init(); + CustomRegressor.process(generators.get(0)); + StatisticalType output = generators.get(0).getOutput(); + AnalysisLogger.getLogger().debug("Output description: "+output.getDescription()); + generators = null; + } + + private static AlgorithmConfiguration testCMSY() { + + AlgorithmConfiguration config = getConfig(); + config.setNumberOfResources(5); + config.setModel("FAOMSY"); + + config.setParam("UserName", "gianpaolo.coro"); + config.setGcubeScope("/gcube"); +// config.setGcubeScope("/d4science.research-infrastructures.eu"); + config.setParam("ServiceUserName", "gianpaolo.coro"); + + D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=2; + +// config.setParam("StocksFile","http://goo.gl/g6YtVx"); + config.setParam("StocksFile","https://dl.dropboxusercontent.com/u/12809149/FAOMSY_Short1.csv"); +// config.setParam("StocksFile","https://dl.dropboxusercontent.com/u/12809149/FAOMSY_Short2.csv"); + +// config.setParam("StocksFile","https://dl.dropboxusercontent.com/u/12809149/FAOMSY_Longtest.csv"); +// config.setParam("StocksFile","http://goo.gl/B09ZL0"); //50species + //config.setParam("IDsFile","http://goo.gl/9rg3qK"); + // config.setParam("StocksFile","http://goo.gl/Mp2ZLY"); +// config.setParam("StocksFile","http://goo.gl/btuIIe"); +// 
/**
 * Utility methods moving files between the local file system, URLs and the
 * gCube storage back-end.
 * NOTE(review): this file is DELETED by the enclosing patch, yet the new
 * FAOMSY class still imports org.gcube.dataanalysis.executor.util.StorageUtils —
 * presumably the class now lives in another module; verify the build.
 */
public class StorageUtils {

	/**
	 * Downloads the content of fileurl into destinationFile. Handler.activateProtocol()
	 * registers the storage URL handler (presumably the smp:// scheme — confirm).
	 * @throws Exception on any download failure (rethrown as-is)
	 */
	public static void downloadInputFile(String fileurl, String destinationFile) throws Exception{
		try {
			Handler.activateProtocol();
			URL smpFile = new URL(fileurl);
			URLConnection uc = (URLConnection) smpFile.openConnection();
			InputStream is = uc.getInputStream();
			AnalysisLogger.getLogger().debug("GenericWorker-> Retrieving from " + fileurl + " to :" + destinationFile);
			inputStreamToFile(is, destinationFile);
			// NOTE(review): the stream is not closed if inputStreamToFile throws.
			is.close();
		} catch (Exception e) {
			throw e;
		}
	}

	/**
	 * Copies an input stream to the file at path using a 1 KB buffer; closes the
	 * output stream but not the input stream (the caller owns it).
	 * NOTE(review): the output stream leaks if a read or write throws; a
	 * try/finally (or try-with-resources) would be safer.
	 */
	public static void inputStreamToFile(InputStream is, String path) throws FileNotFoundException, IOException {
		FileOutputStream out = new FileOutputStream(new File(path));
		byte buf[] = new byte[1024];
		int len = 0;
		// read() returns -1 at end of stream; with a non-empty buffer it never
		// returns 0, so "> 0" terminates correctly here.
		while ((len = is.read(buf)) > 0)
			out.write(buf, 0, len);
		out.close();
	}

	/**
	 * Uploads localFolder/file to the gCube storage (SHARED access, VOLATILE
	 * memory) under "/<file>" in the given scope.
	 * @return the URL of the uploaded remote file
	 * @throws Exception on upload failure (logged, then rethrown)
	 */
	public static String uploadFilesOnStorage(String scope, String user, String localFolder, String file) throws Exception {
		try {
			ScopeProvider.instance.set(scope);
			AnalysisLogger.getLogger().info("Loading file on scope: " + scope);
			IClient client = new StorageClient(AlgorithmConfiguration.StatisticalManagerClass, AlgorithmConfiguration.StatisticalManagerService, user, AccessType.SHARED, MemoryType.VOLATILE).getClient();
			String remotef = "/"+file;
			client.put(true).LFile(new File(localFolder,file).getAbsolutePath()).RFile(remotef);
			String url = client.getUrl().RFile(remotef);
			AnalysisLogger.getLogger().info("Loading finished");
			// NOTE(review): explicit GC call — presumably to release client buffers;
			// generally discouraged.
			System.gc();
			return url;
		} catch (Exception e) {
			AnalysisLogger.getLogger().info("Error in uploading file: " + e.getLocalizedMessage());
			throw e;
		}
	}

	/**
	 * Downloads "/<file>" from the gCube storage (SHARED access, VOLATILE memory)
	 * in the given scope into localFolder/file.
	 * @throws Exception on download failure (logged, then rethrown)
	 */
	public static void downloadFilefromStorage(String scope, String user, String localFolder, String file) throws Exception {
		try {
			ScopeProvider.instance.set(scope);
			AnalysisLogger.getLogger().info("Retrieving file on scope: " + scope);
			IClient client = new StorageClient(AlgorithmConfiguration.StatisticalManagerClass, AlgorithmConfiguration.StatisticalManagerService, user, AccessType.SHARED, MemoryType.VOLATILE).getClient();
			String remotef = "/"+file;
			client.get().LFile(new File(localFolder,file).getAbsolutePath()).RFile(remotef);
			AnalysisLogger.getLogger().info("Retrieving finished");
			// NOTE(review): explicit GC call, as in uploadFilesOnStorage.
			System.gc();
		} catch (Exception e) {
			AnalysisLogger.getLogger().info("Error in retrieving file: " + e.getLocalizedMessage());
			throw e;
		}
	}

	// Manual round-trip smoke test against the /gcube scope.
	public static void main(String args[]) throws Exception{

		uploadFilesOnStorage("/gcube", "CMSY", "./", "tacsat.csv");
		downloadFilefromStorage("/gcube", "CMSY", "./PARALLEL_PROCESSING", "tacsat.csv");
	}
}