Gianpaolo Coro 2015-03-30 15:32:33 +00:00
parent b7b5df763b
commit 6dc30f53b0
3 changed files with 295 additions and 84 deletions

View File

@ -0,0 +1,223 @@
package org.gcube.dataanalysis.executor.nodes.algorithms;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS;
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType;
import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveTypesList;
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes;
import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode;
import org.gcube.dataanalysis.ecoengine.utils.IOHelper;
import org.gcube.dataanalysis.ecoengine.utils.Transformations;
import org.gcube.dataanalysis.executor.scripts.OSCommand;
import org.gcube.dataanalysis.executor.util.RScriptsManager;
import org.gcube.dataanalysis.executor.util.StorageUtils;
public class FAOMSY extends ActorNode {
public int count;
public float status = 0;
@Override
public ALG_PROPS[] getProperties() {
ALG_PROPS[] p = { ALG_PROPS.PHENOMENON_VS_PARALLEL_PHENOMENON };
return p;
}
@Override
public String getName() {
return "FAOMSY";
}
@Override
public String getDescription() {
return "An algorithm to estimate the Maximum Sustainable Yield from a catch statistic by FAO.";
}
static String stocksFile = "StocksFile";
static String processOutput= "ProcessOutput";
static String nonProcessedOutput= "NonProcessedOutput";
static String scriptName = "CatchMSY_Dec2014.R";
@Override
public List<StatisticalType> getInputParameters() {
List<StatisticalType> parameters = new ArrayList<StatisticalType>();
IOHelper.addStringInput(parameters, stocksFile, "Http link to a file containing catch statistics for a group of species. Example: http://goo.gl/g6YtVx", "");
return parameters;
}
@Override
public StatisticalType getOutput() {
File outfile = new File(config.getPersistencePath(),config.getParam(processOutput));
File outfile2 = new File(config.getPersistencePath(),config.getParam(nonProcessedOutput));
PrimitiveTypesList list = new PrimitiveTypesList(File.class.getName(), PrimitiveTypes.FILE, "OutputFiles", "Textual output files - processed and non-processed species",false);
if (outfile.exists()){
PrimitiveType o = new PrimitiveType(File.class.getName(), outfile, PrimitiveTypes.FILE, "ProcessedSpecies", "Output file");
list.add(o);
}
if (outfile2.exists()){
PrimitiveType o2 = new PrimitiveType(File.class.getName(), outfile2, PrimitiveTypes.FILE, "NonProcessedSpecies", "Output file");
list.add(o2);
}
return list;
}
@Override
public void initSingleNode(AlgorithmConfiguration config) {
}
@Override
public float getInternalStatus() {
return status;
}
AlgorithmConfiguration config;
@Override
public int executeNode(int leftStartIndex, int numberOfLeftElementsToProcess, int rightStartIndex, int numberOfRightElementsToProcess, boolean duplicate, String sandboxFolder, String nodeConfigurationFileObject, String logfileNameToProduce) {
try {
status = 0;
config = Transformations.restoreConfig(nodeConfigurationFileObject);
String outputFile = config.getParam(processOutput)+"_part"+rightStartIndex;
String nonprocessedoutputFile = config.getParam(nonProcessedOutput)+"_part"+rightStartIndex;
AnalysisLogger.getLogger().info("FAOMSY ranges: "+" Li:"+leftStartIndex+" NLi:"+leftStartIndex+" Ri:"+rightStartIndex+" NRi:"+numberOfRightElementsToProcess);
AnalysisLogger.getLogger().info("FAOMSY expected output "+outputFile);
File filestock=new File(sandboxFolder,"D20_1.csv");
StorageUtils.downloadInputFile(config.getParam(stocksFile), filestock.getAbsolutePath());
AnalysisLogger.getLogger().debug("Check fileStocks: "+filestock.getAbsolutePath()+" "+filestock.exists());
File filestocksub=new File(sandboxFolder,"D20.csv");
StorageUtils.FileSubset(filestock, filestocksub, rightStartIndex, numberOfRightElementsToProcess, true);
RScriptsManager scriptmanager = new RScriptsManager();
HashMap<String,String> codeinj = new HashMap<String,String>();
config.setConfigPath("./");
scriptmanager.executeRScript(config, scriptName, "", new HashMap<String,String>(), "", "CatchMSY_Output.csv", codeinj, false,false,false);
AnalysisLogger.getLogger().info("FAOMSY The script has finished");
String outputFileName = "";
//manage the fact that the outputfile could even not exist
try{outputFileName = scriptmanager.getCurrentOutputFileName();}catch(Exception e){
AnalysisLogger.getLogger().info("FAOMSY Could not get curent output file");
}
String optionalFileOutputName = "NonProcessedSpecies.csv";
if (outputFileName!=null && outputFileName.length()>0 && new File(outputFileName).exists()){
AnalysisLogger.getLogger().info("FAOMSY Main file exists!");
outputFileName = scriptmanager.getCurrentOutputFileName();
String outputFilePath = new File(sandboxFolder,outputFile).getAbsolutePath();
AnalysisLogger.getLogger().info("FAOMSY writing output file in path "+outputFilePath);
OSCommand.FileCopy(outputFileName,outputFilePath);
AnalysisLogger.getLogger().info("FAOMSY uploading output file "+outputFile);
StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), sandboxFolder,outputFile);
}
if (new File(optionalFileOutputName).exists()){
AnalysisLogger.getLogger().info("FAOMSY Optional file exists!");
OSCommand.FileCopy(optionalFileOutputName,nonprocessedoutputFile);
AnalysisLogger.getLogger().info("FAOMSY uploading output file "+nonprocessedoutputFile);
//check only
// String file = FileTools.loadString(nonprocessedoutputFile, "UTF-8");
// AnalysisLogger.getLogger().info("FAOMSY File check"+file);
StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), sandboxFolder,nonprocessedoutputFile);
}
AnalysisLogger.getLogger().info("FAOMSY Finished");
}catch(Exception e){
e.printStackTrace();
}
return 0;
}
@Override
public void setup(AlgorithmConfiguration config) throws Exception {
this.config = config;
AnalysisLogger.getLogger().info("FAOMSY process is initialized");
String uuid = (UUID.randomUUID()+".txt").replace("-", "");
config.setParam(processOutput, "FAOMSY_"+"output_"+uuid);
config.setParam(nonProcessedOutput, "FAOMSY_nonprocessed_"+"output_"+uuid);
File tempfile = new File(config.getPersistencePath(),"FAOMSY_input_"+(UUID.randomUUID()+".csv").replace("-", ""));
StorageUtils.downloadInputFile(config.getParam(stocksFile), tempfile.getAbsolutePath());
nstocks = StorageUtils.calcFileRows(tempfile, true);
AnalysisLogger.getLogger().info("FAOMSY Found "+nstocks+" stocks!");
if (nstocks==0)
throw new Exception("Error in FAOMSY: No stocks to process found in the file "+config.getParam(stocksFile));
}
int nstocks = 0;
@Override
public int getNumberOfRightElements() {
return nstocks;
}
@Override
public int getNumberOfLeftElements() {
return 1;
}
@Override
public void stop() {
AnalysisLogger.getLogger().info("CMSY process stopped");
}
boolean haspostprocessed = false;
public void assembleFiles(String outputFileName) throws Exception{
//try downloading all the files
List<String> fileslist = new ArrayList<String>();
for (int i=0;i<=nstocks;i++){
String filename = outputFileName+"_part"+i;
try{
StorageUtils.downloadFilefromStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), config.getPersistencePath(), filename);
AnalysisLogger.getLogger().debug("FAOMSY - Saved from Storage: "+filename);
fileslist.add(filename);
}catch(Exception e){
AnalysisLogger.getLogger().debug("FAOMSY - Did not save file from Storage: "+filename);
}
}
AnalysisLogger.getLogger().debug("FAOMSY - Merging files in "+outputFileName);
if (fileslist.size()>0)
StorageUtils.mergeFiles(config.getPersistencePath(), fileslist, outputFileName, true);
AnalysisLogger.getLogger().debug("FAOMSY - Deleting parts");
for (String file:fileslist){
new File(config.getPersistencePath(),file).delete();
}
AnalysisLogger.getLogger().debug("FAOMSY - File assembling complete");
}
@Override
public void postProcess(boolean manageDuplicates, boolean manageFault) {
try {
String mainOutputfilename = config.getParam(processOutput);
String optionalOutputfilename = config.getParam(nonProcessedOutput);
assembleFiles(mainOutputfilename);
assembleFiles(optionalOutputfilename);
AnalysisLogger.getLogger().debug("FAOMSY - Postprocess complete");
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,72 @@
package org.gcube.dataanalysis.executor.tests;
import java.util.List;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
import org.gcube.dataanalysis.ecoengine.interfaces.ComputationalAgent;
import org.gcube.dataanalysis.ecoengine.processing.factories.GeneratorsFactory;
import org.gcube.dataanalysis.ecoengine.test.regression.Regressor;
import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing;
public class RegressionTestFAOMSY {
/**
* example of parallel processing on a single machine the procedure will generate a new table for a distribution on suitable species
*
*/
public static AlgorithmConfiguration getConfig() {
AlgorithmConfiguration config = new AlgorithmConfiguration();
config.setConfigPath("./cfg/");
config.setPersistencePath("./");
config.setParam("DatabaseUserName","utente");
config.setParam("DatabasePassword","d4science");
config.setParam("DatabaseURL","jdbc:postgresql://dbtest.research-infrastructures.eu/testdb");
config.setParam("DatabaseDriver","org.postgresql.Driver");
AnalysisLogger.setLogger(config.getConfigPath()+AlgorithmConfiguration.defaultLoggerFile);
return config;
}
public static void main(String[] args) throws Exception {
System.out.println("TEST 1");
List<ComputationalAgent> generators = GeneratorsFactory.getGenerators(testCMSY());
generators.get(0).init();
CustomRegressor.process(generators.get(0));
StatisticalType output = generators.get(0).getOutput();
AnalysisLogger.getLogger().debug("Output description: "+output.getDescription());
generators = null;
}
private static AlgorithmConfiguration testCMSY() {
AlgorithmConfiguration config = getConfig();
config.setNumberOfResources(5);
config.setModel("FAOMSY");
config.setParam("UserName", "gianpaolo.coro");
config.setGcubeScope("/gcube");
// config.setGcubeScope("/d4science.research-infrastructures.eu");
config.setParam("ServiceUserName", "gianpaolo.coro");
D4ScienceDistributedProcessing.maxMessagesAllowedPerJob=2;
// config.setParam("StocksFile","http://goo.gl/g6YtVx");
config.setParam("StocksFile","https://dl.dropboxusercontent.com/u/12809149/FAOMSY_Short1.csv");
// config.setParam("StocksFile","https://dl.dropboxusercontent.com/u/12809149/FAOMSY_Short2.csv");
// config.setParam("StocksFile","https://dl.dropboxusercontent.com/u/12809149/FAOMSY_Longtest.csv");
// config.setParam("StocksFile","http://goo.gl/B09ZL0"); //50species
//config.setParam("IDsFile","http://goo.gl/9rg3qK");
// config.setParam("StocksFile","http://goo.gl/Mp2ZLY");
// config.setParam("StocksFile","http://goo.gl/btuIIe");
// config.setParam("SelectedStock","Pan_bor_1");
// config.setParam("SelectedStock","HLH_M08");
return config;
}
}

View File

@ -1,84 +0,0 @@
package org.gcube.dataanalysis.executor.util;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.contentmanagement.blobstorage.service.IClient;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.contentmanager.storageclient.model.protocol.smp.Handler;
import org.gcube.contentmanager.storageclient.wrapper.AccessType;
import org.gcube.contentmanager.storageclient.wrapper.MemoryType;
import org.gcube.contentmanager.storageclient.wrapper.StorageClient;
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
public class StorageUtils {
public static void downloadInputFile(String fileurl, String destinationFile) throws Exception{
try {
Handler.activateProtocol();
URL smpFile = new URL(fileurl);
URLConnection uc = (URLConnection) smpFile.openConnection();
InputStream is = uc.getInputStream();
AnalysisLogger.getLogger().debug("GenericWorker-> Retrieving from " + fileurl + " to :" + destinationFile);
inputStreamToFile(is, destinationFile);
is.close();
} catch (Exception e) {
throw e;
}
}
public static void inputStreamToFile(InputStream is, String path) throws FileNotFoundException, IOException {
FileOutputStream out = new FileOutputStream(new File(path));
byte buf[] = new byte[1024];
int len = 0;
while ((len = is.read(buf)) > 0)
out.write(buf, 0, len);
out.close();
}
public static String uploadFilesOnStorage(String scope, String user, String localFolder, String file) throws Exception {
try {
ScopeProvider.instance.set(scope);
AnalysisLogger.getLogger().info("Loading file on scope: " + scope);
IClient client = new StorageClient(AlgorithmConfiguration.StatisticalManagerClass, AlgorithmConfiguration.StatisticalManagerService, user, AccessType.SHARED, MemoryType.VOLATILE).getClient();
String remotef = "/"+file;
client.put(true).LFile(new File(localFolder,file).getAbsolutePath()).RFile(remotef);
String url = client.getUrl().RFile(remotef);
AnalysisLogger.getLogger().info("Loading finished");
System.gc();
return url;
} catch (Exception e) {
AnalysisLogger.getLogger().info("Error in uploading file: " + e.getLocalizedMessage());
throw e;
}
}
public static void downloadFilefromStorage(String scope, String user, String localFolder, String file) throws Exception {
try {
ScopeProvider.instance.set(scope);
AnalysisLogger.getLogger().info("Retrieving file on scope: " + scope);
IClient client = new StorageClient(AlgorithmConfiguration.StatisticalManagerClass, AlgorithmConfiguration.StatisticalManagerService, user, AccessType.SHARED, MemoryType.VOLATILE).getClient();
String remotef = "/"+file;
client.get().LFile(new File(localFolder,file).getAbsolutePath()).RFile(remotef);
AnalysisLogger.getLogger().info("Retrieving finished");
System.gc();
} catch (Exception e) {
AnalysisLogger.getLogger().info("Error in retrieving file: " + e.getLocalizedMessage());
throw e;
}
}
public static void main(String args[]) throws Exception{
uploadFilesOnStorage("/gcube", "CMSY", "./", "tacsat.csv");
downloadFilefromStorage("/gcube", "CMSY", "./PARALLEL_PROCESSING", "tacsat.csv");
}
}