diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/FAOMSY.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/FAOMSY.java new file mode 100644 index 0000000..2c91d71 --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/FAOMSY.java @@ -0,0 +1,223 @@ +package org.gcube.dataanalysis.executor.nodes.algorithms; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; + +import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; +import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS; +import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; +import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType; +import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveTypesList; +import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType; +import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes; +import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode; +import org.gcube.dataanalysis.ecoengine.utils.IOHelper; +import org.gcube.dataanalysis.ecoengine.utils.Transformations; +import org.gcube.dataanalysis.executor.scripts.OSCommand; +import org.gcube.dataanalysis.executor.util.RScriptsManager; +import org.gcube.dataanalysis.executor.util.StorageUtils; + +public class FAOMSY extends ActorNode { + + public int count; + + public float status = 0; + + @Override + public ALG_PROPS[] getProperties() { + ALG_PROPS[] p = { ALG_PROPS.PHENOMENON_VS_PARALLEL_PHENOMENON }; + return p; + } + + @Override + public String getName() { + return "FAOMSY"; + } + + @Override + public String getDescription() { + return "An algorithm to estimate the Maximum Sustainable Yield from a catch statistic by FAO."; + } + + static String stocksFile = "StocksFile"; + static String processOutput= "ProcessOutput"; + static String nonProcessedOutput= "NonProcessedOutput"; + static String scriptName = "CatchMSY_Dec2014.R"; + + @Override + public List getInputParameters() { + + List parameters = new ArrayList(); + IOHelper.addStringInput(parameters, stocksFile, "Http link to a file containing catch statistics for a group of species. Example: http://goo.gl/g6YtVx", ""); + return parameters; + } + + @Override + public StatisticalType getOutput() { + File outfile = new File(config.getPersistencePath(),config.getParam(processOutput)); + File outfile2 = new File(config.getPersistencePath(),config.getParam(nonProcessedOutput)); + PrimitiveTypesList list = new PrimitiveTypesList(File.class.getName(), PrimitiveTypes.FILE, "OutputFiles", "Textual output files - processed and non-processed species",false); + if (outfile.exists()){ + PrimitiveType o = new PrimitiveType(File.class.getName(), outfile, PrimitiveTypes.FILE, "ProcessedSpecies", "Output file"); + list.add(o); + } + if (outfile2.exists()){ + PrimitiveType o2 = new PrimitiveType(File.class.getName(), outfile2, PrimitiveTypes.FILE, "NonProcessedSpecies", "Output file"); + list.add(o2); + } + + + return list; + } + + @Override + public void initSingleNode(AlgorithmConfiguration config) { + + } + + @Override + public float getInternalStatus() { + return status; + } + + AlgorithmConfiguration config; + + @Override + public int executeNode(int leftStartIndex, int numberOfLeftElementsToProcess, int rightStartIndex, int numberOfRightElementsToProcess, boolean duplicate, String sandboxFolder, String nodeConfigurationFileObject, String logfileNameToProduce) { + try { + status = 0; + config = Transformations.restoreConfig(nodeConfigurationFileObject); + String outputFile = config.getParam(processOutput)+"_part"+rightStartIndex; + String nonprocessedoutputFile = config.getParam(nonProcessedOutput)+"_part"+rightStartIndex; + AnalysisLogger.getLogger().info("FAOMSY ranges: "+" Li:"+leftStartIndex+" NLi:"+leftStartIndex+" Ri:"+rightStartIndex+" NRi:"+numberOfRightElementsToProcess); + + AnalysisLogger.getLogger().info("FAOMSY expected output "+outputFile); + + File filestock=new File(sandboxFolder,"D20_1.csv"); + StorageUtils.downloadInputFile(config.getParam(stocksFile), filestock.getAbsolutePath()); + + AnalysisLogger.getLogger().debug("Check fileStocks: "+filestock.getAbsolutePath()+" "+filestock.exists()); + File filestocksub=new File(sandboxFolder,"D20.csv"); + + StorageUtils.FileSubset(filestock, filestocksub, rightStartIndex, numberOfRightElementsToProcess, true); + + RScriptsManager scriptmanager = new RScriptsManager(); + + HashMap codeinj = new HashMap(); + config.setConfigPath("./"); + scriptmanager.executeRScript(config, scriptName, "", new HashMap(), "", "CatchMSY_Output.csv", codeinj, false,false,false); + AnalysisLogger.getLogger().info("FAOMSY The script has finished"); + String outputFileName = ""; + //manage the fact that the outputfile could even not exist + try{outputFileName = scriptmanager.getCurrentOutputFileName();}catch(Exception e){ + AnalysisLogger.getLogger().info("FAOMSY Could not get curent output file"); + } + String optionalFileOutputName = "NonProcessedSpecies.csv"; + + if (outputFileName!=null && outputFileName.length()>0 && new File(outputFileName).exists()){ + AnalysisLogger.getLogger().info("FAOMSY Main file exists!"); + outputFileName = scriptmanager.getCurrentOutputFileName(); + String outputFilePath = new File(sandboxFolder,outputFile).getAbsolutePath(); + AnalysisLogger.getLogger().info("FAOMSY writing output file in path "+outputFilePath); + OSCommand.FileCopy(outputFileName,outputFilePath); + AnalysisLogger.getLogger().info("FAOMSY uploading output file "+outputFile); + StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), sandboxFolder,outputFile); + } + if (new File(optionalFileOutputName).exists()){ + AnalysisLogger.getLogger().info("FAOMSY Optional file exists!"); + OSCommand.FileCopy(optionalFileOutputName,nonprocessedoutputFile); + AnalysisLogger.getLogger().info("FAOMSY uploading output file "+nonprocessedoutputFile); + //check only +// String file = FileTools.loadString(nonprocessedoutputFile, "UTF-8"); +// AnalysisLogger.getLogger().info("FAOMSY File check"+file); + StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), sandboxFolder,nonprocessedoutputFile); + } + + AnalysisLogger.getLogger().info("FAOMSY Finished"); + }catch(Exception e){ + e.printStackTrace(); + } + + return 0; + } + + @Override + public void setup(AlgorithmConfiguration config) throws Exception { + this.config = config; + AnalysisLogger.getLogger().info("FAOMSY process is initialized"); + String uuid = (UUID.randomUUID()+".txt").replace("-", ""); + config.setParam(processOutput, "FAOMSY_"+"output_"+uuid); + config.setParam(nonProcessedOutput, "FAOMSY_nonprocessed_"+"output_"+uuid); + File tempfile = new File(config.getPersistencePath(),"FAOMSY_input_"+(UUID.randomUUID()+".csv").replace("-", "")); + StorageUtils.downloadInputFile(config.getParam(stocksFile), tempfile.getAbsolutePath()); + nstocks = StorageUtils.calcFileRows(tempfile, true); + AnalysisLogger.getLogger().info("FAOMSY Found "+nstocks+" stocks!"); + if (nstocks==0) + throw new Exception("Error in FAOMSY: No stocks to process found in the file "+config.getParam(stocksFile)); + + } + + int nstocks = 0; + @Override + public int getNumberOfRightElements() { + return nstocks; + } + + @Override + public int getNumberOfLeftElements() { + return 1; + } + + @Override + public void stop() { + AnalysisLogger.getLogger().info("CMSY process stopped"); + } + + boolean haspostprocessed = false; + + public void assembleFiles(String outputFileName) throws Exception{ + + //try downloading all the files + List fileslist = new ArrayList(); + for (int i=0;i<=nstocks;i++){ + String filename = outputFileName+"_part"+i; + try{ + StorageUtils.downloadFilefromStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), config.getPersistencePath(), filename); + AnalysisLogger.getLogger().debug("FAOMSY - Saved from Storage: "+filename); + fileslist.add(filename); + }catch(Exception e){ + AnalysisLogger.getLogger().debug("FAOMSY - Did not save file from Storage: "+filename); + } + } + + + AnalysisLogger.getLogger().debug("FAOMSY - Merging files in "+outputFileName); + if (fileslist.size()>0) + StorageUtils.mergeFiles(config.getPersistencePath(), fileslist, outputFileName, true); + + AnalysisLogger.getLogger().debug("FAOMSY - Deleting parts"); + for (String file:fileslist){ + new File(config.getPersistencePath(),file).delete(); + } + + AnalysisLogger.getLogger().debug("FAOMSY - File assembling complete"); + + } + + @Override + public void postProcess(boolean manageDuplicates, boolean manageFault) { + try { + String mainOutputfilename = config.getParam(processOutput); + String optionalOutputfilename = config.getParam(nonProcessedOutput); + assembleFiles(mainOutputfilename); + assembleFiles(optionalOutputfilename); + AnalysisLogger.getLogger().debug("FAOMSY - Postprocess complete"); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestFAOMSY.java b/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestFAOMSY.java new file mode 100644 index 0000000..d74a50a --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestFAOMSY.java @@ -0,0 +1,72 @@ +package org.gcube.dataanalysis.executor.tests; + +import java.util.List; + +import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; +import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; +import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType; +import org.gcube.dataanalysis.ecoengine.interfaces.ComputationalAgent; +import org.gcube.dataanalysis.ecoengine.processing.factories.GeneratorsFactory; +import org.gcube.dataanalysis.ecoengine.test.regression.Regressor; +import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing; + +public class RegressionTestFAOMSY { + /** + * example of parallel processing on a single machine the procedure will generate a new table for a distribution on suitable species + * + */ + +public static AlgorithmConfiguration getConfig() { + + AlgorithmConfiguration config = new AlgorithmConfiguration(); + + config.setConfigPath("./cfg/"); + config.setPersistencePath("./"); + config.setParam("DatabaseUserName","utente"); + config.setParam("DatabasePassword","d4science"); + config.setParam("DatabaseURL","jdbc:postgresql://dbtest.research-infrastructures.eu/testdb"); + config.setParam("DatabaseDriver","org.postgresql.Driver"); + AnalysisLogger.setLogger(config.getConfigPath()+AlgorithmConfiguration.defaultLoggerFile); + return config; + } + + public static void main(String[] args) throws Exception { + + System.out.println("TEST 1"); + + List generators = GeneratorsFactory.getGenerators(testCMSY()); + generators.get(0).init(); + CustomRegressor.process(generators.get(0)); + StatisticalType output = generators.get(0).getOutput(); + AnalysisLogger.getLogger().debug("Output description: "+output.getDescription()); + generators = null; + } + + private static AlgorithmConfiguration testCMSY() { + + AlgorithmConfiguration config = getConfig(); + config.setNumberOfResources(5); + config.setModel("FAOMSY"); + + config.setParam("UserName", "gianpaolo.coro"); +// config.setGcubeScope("/gcube"); + config.setGcubeScope("/d4science.research-infrastructures.eu"); + config.setParam("ServiceUserName", "gianpaolo.coro"); + + D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=2; + +// config.setParam("StocksFile","http://goo.gl/g6YtVx"); + //config.setParam("StocksFile","https://dl.dropboxusercontent.com/u/12809149/FAOMSY_Short1.csv"); +// config.setParam("StocksFile","https://dl.dropboxusercontent.com/u/12809149/FAOMSY_Short2.csv"); + +// config.setParam("StocksFile","https://dl.dropboxusercontent.com/u/12809149/FAOMSY_Longtest.csv"); + config.setParam("StocksFile","http://goo.gl/B09ZL0"); //50species + //config.setParam("IDsFile","http://goo.gl/9rg3qK"); + // config.setParam("StocksFile","http://goo.gl/Mp2ZLY"); +// config.setParam("StocksFile","http://goo.gl/btuIIe"); +// config.setParam("SelectedStock","Pan_bor_1"); +// config.setParam("SelectedStock","HLH_M08"); + + return config; + } +} diff --git a/src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java b/src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java new file mode 100644 index 0000000..6882260 --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java @@ -0,0 +1,173 @@ +package org.gcube.dataanalysis.executor.util; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLConnection; +import java.util.List; + +import org.gcube.common.scope.api.ScopeProvider; +import org.gcube.contentmanagement.blobstorage.service.IClient; +import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; +import org.gcube.contentmanager.storageclient.model.protocol.smp.Handler; +import org.gcube.contentmanager.storageclient.wrapper.AccessType; +import org.gcube.contentmanager.storageclient.wrapper.MemoryType; +import org.gcube.contentmanager.storageclient.wrapper.StorageClient; +import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; + +public class StorageUtils { + + public static void downloadInputFile(String fileurl, String destinationFile) throws Exception{ + try { + Handler.activateProtocol(); + URL smpFile = new URL(fileurl); + URLConnection uc = (URLConnection) smpFile.openConnection(); + InputStream is = uc.getInputStream(); + AnalysisLogger.getLogger().debug("GenericWorker-> Retrieving from " + fileurl + " to :" + destinationFile); + inputStreamToFile(is, destinationFile); + is.close(); + } catch (Exception e) { + throw e; + } + } + + public static void inputStreamToFile(InputStream is, String path) throws FileNotFoundException, IOException { + FileOutputStream out = new FileOutputStream(new File(path)); + byte buf[] = new byte[1024]; + int len = 0; + while ((len = is.read(buf)) > 0) + out.write(buf, 0, len); + out.close(); + } + + public static String uploadFilesOnStorage(String scope, String user, String localFolder, String file) throws Exception { + try { + ScopeProvider.instance.set(scope); + AnalysisLogger.getLogger().info("Loading file on scope: " + scope); + IClient client = new StorageClient(AlgorithmConfiguration.StatisticalManagerClass, AlgorithmConfiguration.StatisticalManagerService, user, AccessType.SHARED, MemoryType.VOLATILE).getClient(); + String remotef = "/"+file; + client.put(true).LFile(new File(localFolder,file).getAbsolutePath()).RFile(remotef); + String url = client.getUrl().RFile(remotef); + AnalysisLogger.getLogger().info("Loading finished"); + System.gc(); + return url; + } catch (Exception e) { + AnalysisLogger.getLogger().info("Error in uploading file: " + e.getLocalizedMessage()); + throw e; + } + } + + public static int calcFileRows(File file, boolean hasheader){ + try{ + BufferedReader br = new BufferedReader(new FileReader(file)); + String line = br.readLine(); + if (hasheader) + line = br.readLine(); // skip header + int counter = 0; + while (line!=null){ + counter++; + line = br.readLine(); + } + br.close(); + return counter; + }catch(Exception e){ + e.printStackTrace(); + return 0; + } + + } + + + public static void FileSubset(File infile, File outfile, int index, int numberofelements, boolean hasheader) throws Exception{ + + BufferedReader br = new BufferedReader(new FileReader(infile)); + BufferedWriter bw = new BufferedWriter(new FileWriter(outfile)); + + String header = null; + if (hasheader){ + header = br.readLine(); // skip header + bw.write(header+"\n"); + } + String line = br.readLine(); + int counter = 0; + while (line!=null && counter < (index+numberofelements)){ + + if (counter>=index) + bw.write(line+"\n"); + + counter++; + line = br.readLine(); + } + + br.close(); + bw.close(); + + } + + + public static void downloadFilefromStorage(String scope, String user, String localFolder, String file) throws Exception { + try { + ScopeProvider.instance.set(scope); + AnalysisLogger.getLogger().info("Retrieving file on scope: " + scope); + IClient client = new StorageClient(AlgorithmConfiguration.StatisticalManagerClass, AlgorithmConfiguration.StatisticalManagerService, user, AccessType.SHARED, MemoryType.VOLATILE).getClient(); + String remotef = "/"+file; + client.get().LFile(new File(localFolder,file).getAbsolutePath()).RFile(remotef); + AnalysisLogger.getLogger().info("Retrieving finished"); + System.gc(); + } catch (Exception e) { + AnalysisLogger.getLogger().info("Error in retrieving file: " + e.getLocalizedMessage()); + throw e; + } + } + + public static void mergeFiles(String localFolder, List filenames, String outputfile, boolean hasheader) throws Exception { + try { + + int nfiles = filenames.size(); + BufferedWriter bw = new BufferedWriter(new FileWriter(outputfile)); + for (int i=0;i