git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/EcologicalEngineSmartExecutor@113923 82a268e6-3cf1-43bd-a215-b396298e98cf
parent bcabae5d6b
commit 129d834d62

@@ -0,0 +1,223 @@
package org.gcube.dataanalysis.executor.nodes.algorithms;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;

import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS;
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType;
import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveTypesList;
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes;
import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode;
import org.gcube.dataanalysis.ecoengine.utils.IOHelper;
import org.gcube.dataanalysis.ecoengine.utils.Transformations;
import org.gcube.dataanalysis.executor.scripts.OSCommand;
import org.gcube.dataanalysis.executor.util.RScriptsManager;
import org.gcube.dataanalysis.executor.util.StorageUtils;
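/**
 * Distributed estimator of the Maximum Sustainable Yield (MSY) from FAO catch
 * statistics. Each worker node extracts its assigned subset of stocks from the
 * input file, runs the CatchMSY R script on it and uploads the partial results
 * to the gCube storage; the post-processing phase merges the parts.
 */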
public class FAOMSY extends ActorNode {

    public int count;

    public float status = 0;

    @Override
    public ALG_PROPS[] getProperties() {
        ALG_PROPS[] p = { ALG_PROPS.PHENOMENON_VS_PARALLEL_PHENOMENON };
        return p;
    }

    @Override
    public String getName() {
        return "FAOMSY";
    }

    @Override
    public String getDescription() {
        return "An algorithm to estimate the Maximum Sustainable Yield from FAO catch statistics.";
    }
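    // names of the I/O configuration parameters and of the embedded R script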
    static String stocksFile = "StocksFile";
    static String processOutput = "ProcessOutput";
    static String nonProcessedOutput = "NonProcessedOutput";
    static String scriptName = "CatchMSY_Dec2014.R";
    @Override
    public List<StatisticalType> getInputParameters() {

        List<StatisticalType> parameters = new ArrayList<StatisticalType>();
        IOHelper.addStringInput(parameters, stocksFile, "HTTP link to a file containing catch statistics for a group of species. Example: http://goo.gl/g6YtVx", "");
        return parameters;
    }
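    // the output is built from the files merged by postProcess: one file for the
    // processed species and, when present, one for the non-processed species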
    @Override
    public StatisticalType getOutput() {
        File outfile = new File(config.getPersistencePath(), config.getParam(processOutput));
        File outfile2 = new File(config.getPersistencePath(), config.getParam(nonProcessedOutput));
        PrimitiveTypesList list = new PrimitiveTypesList(File.class.getName(), PrimitiveTypes.FILE, "OutputFiles", "Textual output files - processed and non-processed species", false);
        if (outfile.exists()) {
            PrimitiveType o = new PrimitiveType(File.class.getName(), outfile, PrimitiveTypes.FILE, "ProcessedSpecies", "Output file");
            list.add(o);
        }
        if (outfile2.exists()) {
            PrimitiveType o2 = new PrimitiveType(File.class.getName(), outfile2, PrimitiveTypes.FILE, "NonProcessedSpecies", "Output file");
            list.add(o2);
        }

        return list;
    }
    @Override
    public void initSingleNode(AlgorithmConfiguration config) {

    }

    @Override
    public float getInternalStatus() {
        return status;
    }

    AlgorithmConfiguration config;
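    /**
     * Executed on each worker node: processes the stocks in
     * [rightStartIndex, rightStartIndex + numberOfRightElementsToProcess) and
     * uploads the resulting "_part" files to the storage.
     */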
    @Override
    public int executeNode(int leftStartIndex, int numberOfLeftElementsToProcess, int rightStartIndex, int numberOfRightElementsToProcess, boolean duplicate, String sandboxFolder, String nodeConfigurationFileObject, String logfileNameToProduce) {
        try {
            status = 0;
            config = Transformations.restoreConfig(nodeConfigurationFileObject);
            String outputFile = config.getParam(processOutput) + "_part" + rightStartIndex;
            String nonprocessedoutputFile = config.getParam(nonProcessedOutput) + "_part" + rightStartIndex;
            AnalysisLogger.getLogger().info("FAOMSY ranges: " + " Li:" + leftStartIndex + " NLi:" + numberOfLeftElementsToProcess + " Ri:" + rightStartIndex + " NRi:" + numberOfRightElementsToProcess);

            AnalysisLogger.getLogger().info("FAOMSY expected output " + outputFile);

            // download the complete stocks file and extract the chunk assigned to this node
            File filestock = new File(sandboxFolder, "D20_1.csv");
            StorageUtils.downloadInputFile(config.getParam(stocksFile), filestock.getAbsolutePath());

            AnalysisLogger.getLogger().debug("Check fileStocks: " + filestock.getAbsolutePath() + " " + filestock.exists());
            File filestocksub = new File(sandboxFolder, "D20.csv");

            StorageUtils.FileSubset(filestock, filestocksub, rightStartIndex, numberOfRightElementsToProcess, true);

            // run the CatchMSY R script on the chunk
            RScriptsManager scriptmanager = new RScriptsManager();

            HashMap<String, String> codeinj = new HashMap<String, String>();
            config.setConfigPath("./");
            scriptmanager.executeRScript(config, scriptName, "", new HashMap<String, String>(), "", "CatchMSY_Output.csv", codeinj, false, false, false);
            AnalysisLogger.getLogger().info("FAOMSY The script has finished");
            String outputFileName = "";
            // manage the fact that the output file could even not exist
            try {
                outputFileName = scriptmanager.getCurrentOutputFileName();
            } catch (Exception e) {
                AnalysisLogger.getLogger().info("FAOMSY Could not get current output file");
            }
            String optionalFileOutputName = "NonProcessedSpecies.csv";

            if (outputFileName != null && outputFileName.length() > 0 && new File(outputFileName).exists()) {
                AnalysisLogger.getLogger().info("FAOMSY Main file exists!");
                String outputFilePath = new File(sandboxFolder, outputFile).getAbsolutePath();
                AnalysisLogger.getLogger().info("FAOMSY writing output file in path " + outputFilePath);
                OSCommand.FileCopy(outputFileName, outputFilePath);
                AnalysisLogger.getLogger().info("FAOMSY uploading output file " + outputFile);
                StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), sandboxFolder, outputFile);
            }
            if (new File(optionalFileOutputName).exists()) {
                AnalysisLogger.getLogger().info("FAOMSY Optional file exists!");
                // copy the optional file into the sandbox folder, where the upload expects it
                OSCommand.FileCopy(optionalFileOutputName, new File(sandboxFolder, nonprocessedoutputFile).getAbsolutePath());
                AnalysisLogger.getLogger().info("FAOMSY uploading output file " + nonprocessedoutputFile);
                StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), sandboxFolder, nonprocessedoutputFile);
            }

            AnalysisLogger.getLogger().info("FAOMSY Finished");
        } catch (Exception e) {
            e.printStackTrace();
        }

        return 0;
    }
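    /**
     * Executed once before the distributed phase: assigns unique names to the
     * output files and counts the stocks to be distributed across the nodes.
     */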
    @Override
    public void setup(AlgorithmConfiguration config) throws Exception {
        this.config = config;
        AnalysisLogger.getLogger().info("FAOMSY process is initialized");
        String uuid = (UUID.randomUUID() + ".txt").replace("-", "");
        config.setParam(processOutput, "FAOMSY_output_" + uuid);
        config.setParam(nonProcessedOutput, "FAOMSY_nonprocessed_output_" + uuid);
        File tempfile = new File(config.getPersistencePath(), "FAOMSY_input_" + (UUID.randomUUID() + ".csv").replace("-", ""));
        StorageUtils.downloadInputFile(config.getParam(stocksFile), tempfile.getAbsolutePath());
        nstocks = StorageUtils.calcFileRows(tempfile, true);
        AnalysisLogger.getLogger().info("FAOMSY Found " + nstocks + " stocks!");
        if (nstocks == 0)
            throw new Exception("Error in FAOMSY: No stocks to process found in the file " + config.getParam(stocksFile));
    }

    int nstocks = 0;
    @Override
    public int getNumberOfRightElements() {
        return nstocks;
    }

    @Override
    public int getNumberOfLeftElements() {
        return 1;
    }

    @Override
    public void stop() {
        AnalysisLogger.getLogger().info("FAOMSY process stopped");
    }

    boolean haspostprocessed = false;
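    /**
     * Downloads every available "_part" file from the storage, merges the parts
     * into a single file in the persistence path and deletes them.
     */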
    public void assembleFiles(String outputFileName) throws Exception {

        // try downloading all the partial files; parts that were never produced are simply skipped
        List<String> fileslist = new ArrayList<String>();
        for (int i = 0; i <= nstocks; i++) {
            String filename = outputFileName + "_part" + i;
            try {
                StorageUtils.downloadFilefromStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), config.getPersistencePath(), filename);
                AnalysisLogger.getLogger().debug("FAOMSY - Saved from Storage: " + filename);
                fileslist.add(filename);
            } catch (Exception e) {
                AnalysisLogger.getLogger().debug("FAOMSY - Did not save file from Storage: " + filename);
            }
        }

        AnalysisLogger.getLogger().debug("FAOMSY - Merging files in " + outputFileName);
        if (fileslist.size() > 0)
            StorageUtils.mergeFiles(config.getPersistencePath(), fileslist, outputFileName, true);

        AnalysisLogger.getLogger().debug("FAOMSY - Deleting parts");
        for (String file : fileslist) {
            new File(config.getPersistencePath(), file).delete();
        }

        AnalysisLogger.getLogger().debug("FAOMSY - File assembling complete");
    }
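    // executed once, after all the nodes have completed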
    @Override
    public void postProcess(boolean manageDuplicates, boolean manageFault) {
        try {
            String mainOutputfilename = config.getParam(processOutput);
            String optionalOutputfilename = config.getParam(nonProcessedOutput);
            assembleFiles(mainOutputfilename);
            assembleFiles(optionalOutputfilename);
            AnalysisLogger.getLogger().debug("FAOMSY - Postprocess complete");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
@@ -0,0 +1,72 @@
package org.gcube.dataanalysis.executor.tests;

import java.util.List;

import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
import org.gcube.dataanalysis.ecoengine.interfaces.ComputationalAgent;
import org.gcube.dataanalysis.ecoengine.processing.factories.GeneratorsFactory;
import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing;

public class RegressionTestFAOMSY {
    /**
     * Regression test for the FAOMSY distributed algorithm: runs the computation
     * through the distributed processing generator on a single machine and
     * prints the description of the resulting output.
     */
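    // common test configuration: config and persistence paths plus the coordinates of the test database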
    public static AlgorithmConfiguration getConfig() {

        AlgorithmConfiguration config = new AlgorithmConfiguration();

        config.setConfigPath("./cfg/");
        config.setPersistencePath("./");
        config.setParam("DatabaseUserName", "utente");
        config.setParam("DatabasePassword", "d4science");
        config.setParam("DatabaseURL", "jdbc:postgresql://dbtest.research-infrastructures.eu/testdb");
        config.setParam("DatabaseDriver", "org.postgresql.Driver");
        AnalysisLogger.setLogger(config.getConfigPath() + AlgorithmConfiguration.defaultLoggerFile);
        return config;
    }
    public static void main(String[] args) throws Exception {

        System.out.println("TEST 1");

        List<ComputationalAgent> generators = GeneratorsFactory.getGenerators(testFAOMSY());
        generators.get(0).init();
        CustomRegressor.process(generators.get(0));
        StatisticalType output = generators.get(0).getOutput();
        AnalysisLogger.getLogger().debug("Output description: " + output.getDescription());
        generators = null;
    }
    private static AlgorithmConfiguration testFAOMSY() {

        AlgorithmConfiguration config = getConfig();
        config.setNumberOfResources(5);
        config.setModel("FAOMSY");

        config.setParam("UserName", "gianpaolo.coro");
        // config.setGcubeScope("/gcube");
        config.setGcubeScope("/d4science.research-infrastructures.eu");
        config.setParam("ServiceUserName", "gianpaolo.coro");

        D4ScienceDistributedProcessing.maxMessagesAllowedPerJob = 2;

        // alternative input files used during testing:
        // config.setParam("StocksFile", "http://goo.gl/g6YtVx");
        // config.setParam("StocksFile", "https://dl.dropboxusercontent.com/u/12809149/FAOMSY_Short1.csv");
        // config.setParam("StocksFile", "https://dl.dropboxusercontent.com/u/12809149/FAOMSY_Short2.csv");
        // config.setParam("StocksFile", "https://dl.dropboxusercontent.com/u/12809149/FAOMSY_Longtest.csv");
        config.setParam("StocksFile", "http://goo.gl/B09ZL0"); // 50 species
        // config.setParam("IDsFile", "http://goo.gl/9rg3qK");
        // config.setParam("StocksFile", "http://goo.gl/Mp2ZLY");
        // config.setParam("StocksFile", "http://goo.gl/btuIIe");
        // config.setParam("SelectedStock", "Pan_bor_1");
        // config.setParam("SelectedStock", "HLH_M08");

        return config;
    }
}
@@ -0,0 +1,173 @@
package org.gcube.dataanalysis.executor.util;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
import java.util.List;

import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.contentmanagement.blobstorage.service.IClient;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.contentmanager.storageclient.model.protocol.smp.Handler;
import org.gcube.contentmanager.storageclient.wrapper.AccessType;
import org.gcube.contentmanager.storageclient.wrapper.MemoryType;
import org.gcube.contentmanager.storageclient.wrapper.StorageClient;
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
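/**
 * Helper methods to move files between the local file system, remote HTTP
 * locations and the gCube storage, plus utilities to subset and merge CSV files.
 */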
public class StorageUtils {
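    // downloads the file at the given URL (http or smp protocol) into destinationFile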
    public static void downloadInputFile(String fileurl, String destinationFile) throws Exception {
        Handler.activateProtocol();
        URL smpFile = new URL(fileurl);
        URLConnection uc = smpFile.openConnection();
        InputStream is = uc.getInputStream();
        AnalysisLogger.getLogger().debug("GenericWorker-> Retrieving from " + fileurl + " to :" + destinationFile);
        inputStreamToFile(is, destinationFile);
        is.close();
    }
    public static void inputStreamToFile(InputStream is, String path) throws FileNotFoundException, IOException {
        FileOutputStream out = new FileOutputStream(new File(path));
        byte buf[] = new byte[1024];
        int len = 0;
        while ((len = is.read(buf)) > 0)
            out.write(buf, 0, len);
        out.close();
    }
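    // uploads localFolder/file to the gCube storage of the Statistical Manager and returns its URL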
    public static String uploadFilesOnStorage(String scope, String user, String localFolder, String file) throws Exception {
        try {
            ScopeProvider.instance.set(scope);
            AnalysisLogger.getLogger().info("Loading file on scope: " + scope);
            IClient client = new StorageClient(AlgorithmConfiguration.StatisticalManagerClass, AlgorithmConfiguration.StatisticalManagerService, user, AccessType.SHARED, MemoryType.VOLATILE).getClient();
            String remotef = "/" + file;
            client.put(true).LFile(new File(localFolder, file).getAbsolutePath()).RFile(remotef);
            String url = client.getUrl().RFile(remotef);
            AnalysisLogger.getLogger().info("Loading finished");
            System.gc();
            return url;
        } catch (Exception e) {
            AnalysisLogger.getLogger().info("Error in uploading file: " + e.getLocalizedMessage());
            throw e;
        }
    }
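    // counts the data rows of a text file, optionally skipping a header line; returns 0 on error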
    public static int calcFileRows(File file, boolean hasheader) {
        try {
            BufferedReader br = new BufferedReader(new FileReader(file));
            String line = br.readLine();
            if (hasheader)
                line = br.readLine(); // skip header
            int counter = 0;
            while (line != null) {
                counter++;
                line = br.readLine();
            }
            br.close();
            return counter;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
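    // copies numberofelements rows of infile into outfile, starting from the row at position index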
    public static void FileSubset(File infile, File outfile, int index, int numberofelements, boolean hasheader) throws Exception {

        BufferedReader br = new BufferedReader(new FileReader(infile));
        BufferedWriter bw = new BufferedWriter(new FileWriter(outfile));

        String header = null;
        if (hasheader) {
            header = br.readLine(); // copy the header as-is
            bw.write(header + "\n");
        }
        String line = br.readLine();
        int counter = 0;
        while (line != null && counter < (index + numberofelements)) {

            if (counter >= index)
                bw.write(line + "\n");

            counter++;
            line = br.readLine();
        }

        br.close();
        bw.close();
    }
    public static void downloadFilefromStorage(String scope, String user, String localFolder, String file) throws Exception {
        try {
            ScopeProvider.instance.set(scope);
            AnalysisLogger.getLogger().info("Retrieving file on scope: " + scope);
            IClient client = new StorageClient(AlgorithmConfiguration.StatisticalManagerClass, AlgorithmConfiguration.StatisticalManagerService, user, AccessType.SHARED, MemoryType.VOLATILE).getClient();
            String remotef = "/" + file;
            client.get().LFile(new File(localFolder, file).getAbsolutePath()).RFile(remotef);
            AnalysisLogger.getLogger().info("Retrieving finished");
            System.gc();
        } catch (Exception e) {
            AnalysisLogger.getLogger().info("Error in retrieving file: " + e.getLocalizedMessage());
            throw e;
        }
    }
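    // concatenates the given files (resolved against localFolder) into localFolder/outputfile,
    // writing the header (if any) only once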
    public static void mergeFiles(String localFolder, List<String> filenames, String outputfile, boolean hasheader) throws Exception {
        try {
            int nfiles = filenames.size();
            // resolve all paths against localFolder, where the part files were downloaded
            BufferedWriter bw = new BufferedWriter(new FileWriter(new File(localFolder, outputfile)));
            for (int i = 0; i < nfiles; i++) {
                BufferedReader br = new BufferedReader(new FileReader(new File(localFolder, filenames.get(i))));
                // take the header from the first file only and skip it in the others
                if (hasheader && i == 0) {
                    String header = br.readLine();
                    bw.write(header + "\n");
                } else if (hasheader)
                    br.readLine();

                String line = br.readLine();
                while (line != null) {
                    bw.write(line + "\n");
                    line = br.readLine();
                }

                br.close();
            }

            bw.close();

        } catch (Exception e) {
            AnalysisLogger.getLogger().info("Error in merging files: " + e.getLocalizedMessage());
            throw e;
        }
    }
    public static void main(String args[]) throws Exception {

        // uploadFilesOnStorage("/gcube", "CMSY", "./", "tacsat.csv");
        // downloadFilefromStorage("/gcube", "CMSY", "./PARALLEL_PROCESSING", "tacsat.csv");
        FileSubset(new File("D20_1.csv"), new File("D20_11.csv"), 3, 10, true);
    }
}