From bcabae5d6be200f643b1ba21b8e44c6a2de032f0 Mon Sep 17 00:00:00 2001
From: Gianpaolo Coro
Date: Thu, 9 Apr 2015 11:06:41 +0000
Subject: [PATCH] git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/EcologicalEngineSmartExecutor@113922 82a268e6-3cf1-43bd-a215-b396298e98cf

---
 .../job/management/QueueJobManager.java       |   5 +-
 .../executor/nodes/algorithms/FAOMSY.java     | 223 ------------------
 .../executor/tests/RegressionTestFAOMSY.java  |  72 ------
 .../executor/util/StorageUtils.java           | 173 --------------
 4 files changed, 3 insertions(+), 470 deletions(-)
 delete mode 100644 src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/FAOMSY.java
 delete mode 100644 src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestFAOMSY.java
 delete mode 100644 src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java

diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java
index 7ed53e8..13f0bfa 100644
--- a/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java
+++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java
@@ -482,8 +482,9 @@ public class QueueJobManager {
 
         /* Used to add extra filter to ServiceEndpoint discovery */
         query.setServiceEndpointQueryFilter(null);
-
-        return query.discoverEndpoints(new ListEndpointDiscoveryFilter());
+        List nodes = query.discoverEndpoints(new ListEndpointDiscoveryFilter());
+        AnalysisLogger.getLogger().debug("Found the following nodes: "+nodes+" in scope "+scopeString);
+        return nodes;
     }
 
 
diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/FAOMSY.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/FAOMSY.java
deleted file mode 100644
index 2c91d71..0000000
--- a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/FAOMSY.java
+++ /dev/null
@@ -1,223 +0,0 @@
-package org.gcube.dataanalysis.executor.nodes.algorithms;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
-
-import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
-import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS;
-import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
-import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType;
-import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveTypesList;
-import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
-import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes;
-import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode;
-import org.gcube.dataanalysis.ecoengine.utils.IOHelper;
-import org.gcube.dataanalysis.ecoengine.utils.Transformations;
-import org.gcube.dataanalysis.executor.scripts.OSCommand;
-import org.gcube.dataanalysis.executor.util.RScriptsManager;
-import org.gcube.dataanalysis.executor.util.StorageUtils;
-
-public class FAOMSY extends ActorNode {
-
-    public int count;
-
-    public float status = 0;
-
-    @Override
-    public ALG_PROPS[] getProperties() {
-        ALG_PROPS[] p = { ALG_PROPS.PHENOMENON_VS_PARALLEL_PHENOMENON };
-        return p;
-    }
-
-    @Override
-    public String getName() {
-        return "FAOMSY";
-    }
-
-    @Override
-    public String getDescription() {
-        return "An algorithm to estimate the Maximum Sustainable Yield from a catch statistic by FAO.";
-    }
-
-    static String stocksFile = "StocksFile";
-    static String processOutput= "ProcessOutput";
-    static String nonProcessedOutput= "NonProcessedOutput";
-    static String scriptName = "CatchMSY_Dec2014.R";
-
-    @Override
-    public List getInputParameters() {
-
-        List parameters = new ArrayList();
-        IOHelper.addStringInput(parameters, stocksFile, "Http link to a file containing catch statistics for a group of species. Example: http://goo.gl/g6YtVx", "");
-        return parameters;
-    }
-
-    @Override
-    public StatisticalType getOutput() {
-        File outfile = new File(config.getPersistencePath(),config.getParam(processOutput));
-        File outfile2 = new File(config.getPersistencePath(),config.getParam(nonProcessedOutput));
-        PrimitiveTypesList list = new PrimitiveTypesList(File.class.getName(), PrimitiveTypes.FILE, "OutputFiles", "Textual output files - processed and non-processed species",false);
-        if (outfile.exists()){
-            PrimitiveType o = new PrimitiveType(File.class.getName(), outfile, PrimitiveTypes.FILE, "ProcessedSpecies", "Output file");
-            list.add(o);
-        }
-        if (outfile2.exists()){
-            PrimitiveType o2 = new PrimitiveType(File.class.getName(), outfile2, PrimitiveTypes.FILE, "NonProcessedSpecies", "Output file");
-            list.add(o2);
-        }
-
-
-        return list;
-    }
-
-    @Override
-    public void initSingleNode(AlgorithmConfiguration config) {
-
-    }
-
-    @Override
-    public float getInternalStatus() {
-        return status;
-    }
-
-    AlgorithmConfiguration config;
-
-    @Override
-    public int executeNode(int leftStartIndex, int numberOfLeftElementsToProcess, int rightStartIndex, int numberOfRightElementsToProcess, boolean duplicate, String sandboxFolder, String nodeConfigurationFileObject, String logfileNameToProduce) {
-        try {
-            status = 0;
-            config = Transformations.restoreConfig(nodeConfigurationFileObject);
-            String outputFile = config.getParam(processOutput)+"_part"+rightStartIndex;
-            String nonprocessedoutputFile = config.getParam(nonProcessedOutput)+"_part"+rightStartIndex;
-            AnalysisLogger.getLogger().info("FAOMSY ranges: "+" Li:"+leftStartIndex+" NLi:"+leftStartIndex+" Ri:"+rightStartIndex+" NRi:"+numberOfRightElementsToProcess);
-
-            AnalysisLogger.getLogger().info("FAOMSY expected output "+outputFile);
-
-            File filestock=new File(sandboxFolder,"D20_1.csv");
-            StorageUtils.downloadInputFile(config.getParam(stocksFile), filestock.getAbsolutePath());
-
-            AnalysisLogger.getLogger().debug("Check fileStocks: "+filestock.getAbsolutePath()+" "+filestock.exists());
-            File filestocksub=new File(sandboxFolder,"D20.csv");
-
-            StorageUtils.FileSubset(filestock, filestocksub, rightStartIndex, numberOfRightElementsToProcess, true);
-
-            RScriptsManager scriptmanager = new RScriptsManager();
-
-            HashMap codeinj = new HashMap();
-            config.setConfigPath("./");
-            scriptmanager.executeRScript(config, scriptName, "", new HashMap(), "", "CatchMSY_Output.csv", codeinj, false,false,false);
-            AnalysisLogger.getLogger().info("FAOMSY The script has finished");
-            String outputFileName = "";
-            //manage the fact that the outputfile could even not exist
-            try{outputFileName = scriptmanager.getCurrentOutputFileName();}catch(Exception e){
-                AnalysisLogger.getLogger().info("FAOMSY Could not get curent output file");
-            }
-            String optionalFileOutputName = "NonProcessedSpecies.csv";
-
-            if (outputFileName!=null && outputFileName.length()>0 && new File(outputFileName).exists()){
-                AnalysisLogger.getLogger().info("FAOMSY Main file exists!");
-                outputFileName = scriptmanager.getCurrentOutputFileName();
-                String outputFilePath = new File(sandboxFolder,outputFile).getAbsolutePath();
-                AnalysisLogger.getLogger().info("FAOMSY writing output file in path "+outputFilePath);
-                OSCommand.FileCopy(outputFileName,outputFilePath);
-                AnalysisLogger.getLogger().info("FAOMSY uploading output file "+outputFile);
-                StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), sandboxFolder,outputFile);
-            }
-            if (new File(optionalFileOutputName).exists()){
-                AnalysisLogger.getLogger().info("FAOMSY Optional file exists!");
-                OSCommand.FileCopy(optionalFileOutputName,nonprocessedoutputFile);
-                AnalysisLogger.getLogger().info("FAOMSY uploading output file "+nonprocessedoutputFile);
-                //check only
-//                String file = FileTools.loadString(nonprocessedoutputFile, "UTF-8");
-//                AnalysisLogger.getLogger().info("FAOMSY File check"+file);
-                StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), sandboxFolder,nonprocessedoutputFile);
-            }
-
-            AnalysisLogger.getLogger().info("FAOMSY Finished");
-        }catch(Exception e){
-            e.printStackTrace();
-        }
-
-        return 0;
-    }
-
-    @Override
-    public void setup(AlgorithmConfiguration config) throws Exception {
-        this.config = config;
-        AnalysisLogger.getLogger().info("FAOMSY process is initialized");
-        String uuid = (UUID.randomUUID()+".txt").replace("-", "");
-        config.setParam(processOutput, "FAOMSY_"+"output_"+uuid);
-        config.setParam(nonProcessedOutput, "FAOMSY_nonprocessed_"+"output_"+uuid);
-        File tempfile = new File(config.getPersistencePath(),"FAOMSY_input_"+(UUID.randomUUID()+".csv").replace("-", ""));
-        StorageUtils.downloadInputFile(config.getParam(stocksFile), tempfile.getAbsolutePath());
-        nstocks = StorageUtils.calcFileRows(tempfile, true);
-        AnalysisLogger.getLogger().info("FAOMSY Found "+nstocks+" stocks!");
-        if (nstocks==0)
-            throw new Exception("Error in FAOMSY: No stocks to process found in the file "+config.getParam(stocksFile));
-
-    }
-
-    int nstocks = 0;
-    @Override
-    public int getNumberOfRightElements() {
-        return nstocks;
-    }
-
-    @Override
-    public int getNumberOfLeftElements() {
-        return 1;
-    }
-
-    @Override
-    public void stop() {
-        AnalysisLogger.getLogger().info("CMSY process stopped");
-    }
-
-    boolean haspostprocessed = false;
-
-    public void assembleFiles(String outputFileName) throws Exception{
-
-        //try downloading all the files
-        List fileslist = new ArrayList();
-        for (int i=0;i<=nstocks;i++){
-            String filename = outputFileName+"_part"+i;
-            try{
-                StorageUtils.downloadFilefromStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), config.getPersistencePath(), filename);
-                AnalysisLogger.getLogger().debug("FAOMSY - Saved from Storage: "+filename);
-                fileslist.add(filename);
-            }catch(Exception e){
-                AnalysisLogger.getLogger().debug("FAOMSY - Did not save file from Storage: "+filename);
-            }
-        }
-
-
-        AnalysisLogger.getLogger().debug("FAOMSY - Merging files in "+outputFileName);
-        if (fileslist.size()>0)
-            StorageUtils.mergeFiles(config.getPersistencePath(), fileslist, outputFileName, true);
-
-        AnalysisLogger.getLogger().debug("FAOMSY - Deleting parts");
-        for (String file:fileslist){
-            new File(config.getPersistencePath(),file).delete();
-        }
-
-        AnalysisLogger.getLogger().debug("FAOMSY - File assembling complete");
-
-    }
-
-    @Override
-    public void postProcess(boolean manageDuplicates, boolean manageFault) {
-        try {
-            String mainOutputfilename = config.getParam(processOutput);
-            String optionalOutputfilename = config.getParam(nonProcessedOutput);
-            assembleFiles(mainOutputfilename);
-            assembleFiles(optionalOutputfilename);
-            AnalysisLogger.getLogger().debug("FAOMSY - Postprocess complete");
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-}
diff --git a/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestFAOMSY.java b/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestFAOMSY.java
deleted file mode 100644
index e32f90a..0000000
--- a/src/main/java/org/gcube/dataanalysis/executor/tests/RegressionTestFAOMSY.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package org.gcube.dataanalysis.executor.tests;
-
-import java.util.List;
-
-import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
-import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
-import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
-import org.gcube.dataanalysis.ecoengine.interfaces.ComputationalAgent;
-import org.gcube.dataanalysis.ecoengine.processing.factories.GeneratorsFactory;
-import org.gcube.dataanalysis.ecoengine.test.regression.Regressor;
-import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing;
-
-public class RegressionTestFAOMSY {
-    /**
-     * example of parallel processing on a single machine the procedure will generate a new table for a distribution on suitable species
-     *
-     */
-
-public static AlgorithmConfiguration getConfig() {
-
-    AlgorithmConfiguration config = new AlgorithmConfiguration();
-
-    config.setConfigPath("./cfg/");
-    config.setPersistencePath("./");
-    config.setParam("DatabaseUserName","utente");
-    config.setParam("DatabasePassword","d4science");
-    config.setParam("DatabaseURL","jdbc:postgresql://dbtest.research-infrastructures.eu/testdb");
-    config.setParam("DatabaseDriver","org.postgresql.Driver");
-    AnalysisLogger.setLogger(config.getConfigPath()+AlgorithmConfiguration.defaultLoggerFile);
-    return config;
-    }
-
-    public static void main(String[] args) throws Exception {
-
-        System.out.println("TEST 1");
-
-        List generators = GeneratorsFactory.getGenerators(testCMSY());
-        generators.get(0).init();
-        CustomRegressor.process(generators.get(0));
-        StatisticalType output = generators.get(0).getOutput();
-        AnalysisLogger.getLogger().debug("Output description: "+output.getDescription());
-        generators = null;
-    }
-
-    private static AlgorithmConfiguration testCMSY() {
-
-        AlgorithmConfiguration config = getConfig();
-        config.setNumberOfResources(5);
-        config.setModel("FAOMSY");
-
-        config.setParam("UserName", "gianpaolo.coro");
-        config.setGcubeScope("/gcube");
-//        config.setGcubeScope("/d4science.research-infrastructures.eu");
-        config.setParam("ServiceUserName", "gianpaolo.coro");
-
-        D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=2;
-
-//        config.setParam("StocksFile","http://goo.gl/g6YtVx");
-        config.setParam("StocksFile","https://dl.dropboxusercontent.com/u/12809149/FAOMSY_Short1.csv");
-//        config.setParam("StocksFile","https://dl.dropboxusercontent.com/u/12809149/FAOMSY_Short2.csv");
-
-//        config.setParam("StocksFile","https://dl.dropboxusercontent.com/u/12809149/FAOMSY_Longtest.csv");
-//        config.setParam("StocksFile","http://goo.gl/B09ZL0"); //50species
-        //config.setParam("IDsFile","http://goo.gl/9rg3qK");
-        // config.setParam("StocksFile","http://goo.gl/Mp2ZLY");
-//        config.setParam("StocksFile","http://goo.gl/btuIIe");
-//        config.setParam("SelectedStock","Pan_bor_1");
-//        config.setParam("SelectedStock","HLH_M08");
-
-        return config;
-    }
-}
diff --git a/src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java b/src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java
deleted file mode 100644
index 6882260..0000000
--- a/src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java
+++ /dev/null
@@ -1,173 +0,0 @@
-package org.gcube.dataanalysis.executor.util;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.net.URLConnection;
-import java.util.List;
-
-import org.gcube.common.scope.api.ScopeProvider;
-import org.gcube.contentmanagement.blobstorage.service.IClient;
-import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
-import org.gcube.contentmanager.storageclient.model.protocol.smp.Handler;
-import org.gcube.contentmanager.storageclient.wrapper.AccessType;
-import org.gcube.contentmanager.storageclient.wrapper.MemoryType;
-import org.gcube.contentmanager.storageclient.wrapper.StorageClient;
-import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
-
-public class StorageUtils {
-
-    public static void downloadInputFile(String fileurl, String destinationFile) throws Exception{
-        try {
-            Handler.activateProtocol();
-            URL smpFile = new URL(fileurl);
-            URLConnection uc = (URLConnection) smpFile.openConnection();
-            InputStream is = uc.getInputStream();
-            AnalysisLogger.getLogger().debug("GenericWorker-> Retrieving from " + fileurl + " to :" + destinationFile);
-            inputStreamToFile(is, destinationFile);
-            is.close();
-        } catch (Exception e) {
-            throw e;
-        }
-    }
-
-    public static void inputStreamToFile(InputStream is, String path) throws FileNotFoundException, IOException {
-        FileOutputStream out = new FileOutputStream(new File(path));
-        byte buf[] = new byte[1024];
-        int len = 0;
-        while ((len = is.read(buf)) > 0)
-            out.write(buf, 0, len);
-        out.close();
-    }
-
-    public static String uploadFilesOnStorage(String scope, String user, String localFolder, String file) throws Exception {
-        try {
-            ScopeProvider.instance.set(scope);
-            AnalysisLogger.getLogger().info("Loading file on scope: " + scope);
-            IClient client = new StorageClient(AlgorithmConfiguration.StatisticalManagerClass, AlgorithmConfiguration.StatisticalManagerService, user, AccessType.SHARED, MemoryType.VOLATILE).getClient();
-            String remotef = "/"+file;
-            client.put(true).LFile(new File(localFolder,file).getAbsolutePath()).RFile(remotef);
-            String url = client.getUrl().RFile(remotef);
-            AnalysisLogger.getLogger().info("Loading finished");
-            System.gc();
-            return url;
-        } catch (Exception e) {
-            AnalysisLogger.getLogger().info("Error in uploading file: " + e.getLocalizedMessage());
-            throw e;
-        }
-    }
-
-    public static int calcFileRows(File file, boolean hasheader){
-        try{
-            BufferedReader br = new BufferedReader(new FileReader(file));
-            String line = br.readLine();
-            if (hasheader)
-                line = br.readLine(); // skip header
-            int counter = 0;
-            while (line!=null){
-                counter++;
-                line = br.readLine();
-            }
-            br.close();
-            return counter;
-        }catch(Exception e){
-            e.printStackTrace();
-            return 0;
-        }
-
-    }
-
-
-    public static void FileSubset(File infile, File outfile, int index, int numberofelements, boolean hasheader) throws Exception{
-
-        BufferedReader br = new BufferedReader(new FileReader(infile));
-        BufferedWriter bw = new BufferedWriter(new FileWriter(outfile));
-
-        String header = null;
-        if (hasheader){
-            header = br.readLine(); // skip header
-            bw.write(header+"\n");
-        }
-        String line = br.readLine();
-        int counter = 0;
-        while (line!=null && counter < (index+numberofelements)){
-
-            if (counter>=index)
-                bw.write(line+"\n");
-
-            counter++;
-            line = br.readLine();
-        }
-
-        br.close();
-        bw.close();
-
-    }
-
-
-    public static void downloadFilefromStorage(String scope, String user, String localFolder, String file) throws Exception {
-        try {
-            ScopeProvider.instance.set(scope);
-            AnalysisLogger.getLogger().info("Retrieving file on scope: " + scope);
-            IClient client = new StorageClient(AlgorithmConfiguration.StatisticalManagerClass, AlgorithmConfiguration.StatisticalManagerService, user, AccessType.SHARED, MemoryType.VOLATILE).getClient();
-            String remotef = "/"+file;
-            client.get().LFile(new File(localFolder,file).getAbsolutePath()).RFile(remotef);
-            AnalysisLogger.getLogger().info("Retrieving finished");
-            System.gc();
-        } catch (Exception e) {
-            AnalysisLogger.getLogger().info("Error in retrieving file: " + e.getLocalizedMessage());
-            throw e;
-        }
-    }
-
-    public static void mergeFiles(String localFolder, List filenames, String outputfile, boolean hasheader) throws Exception {
-        try {
-
-            int nfiles = filenames.size();
-            BufferedWriter bw = new BufferedWriter(new FileWriter(outputfile));
-            for (int i=0;i