git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/EcologicalEngineSmartExecutor@122885 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
c407d698e0
commit
30f0358066
Binary file not shown.
2
pom.xml
2
pom.xml
|
@ -9,7 +9,7 @@
|
||||||
</parent>
|
</parent>
|
||||||
<groupId>org.gcube.dataanalysis</groupId>
|
<groupId>org.gcube.dataanalysis</groupId>
|
||||||
<artifactId>ecological-engine-smart-executor</artifactId>
|
<artifactId>ecological-engine-smart-executor</artifactId>
|
||||||
<version>1.1.2-SNAPSHOT</version>
|
<version>1.2.0-SNAPSHOT</version>
|
||||||
<name>Smart Ecological Engine Executor</name>
|
<name>Smart Ecological Engine Executor</name>
|
||||||
<description>Smart Ecological Engine Executor Description</description>
|
<description>Smart Ecological Engine Executor Description</description>
|
||||||
<properties>
|
<properties>
|
||||||
|
|
|
@ -17,6 +17,7 @@ import org.gcube.dataanalysis.ecoengine.interfaces.GenericAlgorithm;
|
||||||
import org.gcube.dataanalysis.executor.job.management.DistributedProcessingAgent;
|
import org.gcube.dataanalysis.executor.job.management.DistributedProcessingAgent;
|
||||||
|
|
||||||
public class D4ScienceDistributedProcessing implements Generator {
|
public class D4ScienceDistributedProcessing implements Generator {
|
||||||
|
|
||||||
public static int maxMessagesAllowedPerJob = 20;
|
public static int maxMessagesAllowedPerJob = 20;
|
||||||
public static boolean forceUpload = true;
|
public static boolean forceUpload = true;
|
||||||
public static String defaultContainerFolder = "PARALLEL_PROCESSING";
|
public static String defaultContainerFolder = "PARALLEL_PROCESSING";
|
||||||
|
|
|
@ -20,6 +20,7 @@ import com.thoughtworks.xstream.XStream;
|
||||||
|
|
||||||
public class DistributedProcessingAgent {
|
public class DistributedProcessingAgent {
|
||||||
|
|
||||||
|
|
||||||
protected QueueJobManager jobManager;
|
protected QueueJobManager jobManager;
|
||||||
protected boolean deletefiles = true;
|
protected boolean deletefiles = true;
|
||||||
protected String mainclass;
|
protected String mainclass;
|
||||||
|
|
|
@ -45,6 +45,7 @@ import static org.gcube.resources.discovery.icclient.ICFactory.*;
|
||||||
|
|
||||||
public class QueueJobManager {
|
public class QueueJobManager {
|
||||||
|
|
||||||
|
|
||||||
// broadcast message period
|
// broadcast message period
|
||||||
public static int broadcastTimePeriod = 120000;
|
public static int broadcastTimePeriod = 120000;
|
||||||
// max silence before computation stops
|
// max silence before computation stops
|
||||||
|
|
|
@ -103,17 +103,17 @@ public class ICCATVPA extends ActorNode {
|
||||||
processOutput = "ICCAT-VPA_"+"output_"+uuid+".zip";
|
processOutput = "ICCAT-VPA_"+"output_"+uuid+".zip";
|
||||||
|
|
||||||
config.setParam(processOutputParam, processOutput);
|
config.setParam(processOutputParam, processOutput);
|
||||||
AnalysisLogger.getLogger().debug("ICCAT-VPA Uploading input files: "+config.getGeneralProperties());
|
AnalysisLogger.getLogger().debug("ICCAT-VPA Uploading input files (http): "+config.getGeneralProperties());
|
||||||
//upload files on the storage manager
|
//upload files on the storage manager
|
||||||
CAAInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(CAAInp)).getParent(), new File(config.getParam(CAAInp)).getName());
|
CAAInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(CAAInp)).getParent(), "/",new File(config.getParam(CAAInp)).getName(),true);
|
||||||
AnalysisLogger.getLogger().debug("ICCAT-VPA: CAA DONE! "+CAAInpURL);
|
AnalysisLogger.getLogger().debug("ICCAT-VPA: CAA DONE! "+CAAInpURL);
|
||||||
PCAAInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(PCAAInp)).getParent(), new File(config.getParam(PCAAInp)).getName());
|
PCAAInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(PCAAInp)).getParent(),"/", new File(config.getParam(PCAAInp)).getName(),true);
|
||||||
AnalysisLogger.getLogger().debug("ICCAT-VPA: PCAA DONE! "+PCAAInpURL);
|
AnalysisLogger.getLogger().debug("ICCAT-VPA: PCAA DONE! "+PCAAInpURL);
|
||||||
CPUEInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(CPUEInp)).getParent(), new File(config.getParam(CPUEInp)).getName());
|
CPUEInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(CPUEInp)).getParent(),"/", new File(config.getParam(CPUEInp)).getName(),true);
|
||||||
AnalysisLogger.getLogger().debug("ICCAT-VPA: CPUE DONE! "+CPUEInpURL);
|
AnalysisLogger.getLogger().debug("ICCAT-VPA: CPUE DONE! "+CPUEInpURL);
|
||||||
PwaaInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(PwaaInp)).getParent(), new File(config.getParam(PwaaInp)).getName());
|
PwaaInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(PwaaInp)).getParent(), "/",new File(config.getParam(PwaaInp)).getName(),true);
|
||||||
AnalysisLogger.getLogger().debug("ICCAT-VPA: Pwaa DONE! "+PwaaInpURL);
|
AnalysisLogger.getLogger().debug("ICCAT-VPA: Pwaa DONE! "+PwaaInpURL);
|
||||||
waaInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(waaInp)).getParent(), new File(config.getParam(waaInp)).getName());
|
waaInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(waaInp)).getParent(),"/", new File(config.getParam(waaInp)).getName(),true);
|
||||||
AnalysisLogger.getLogger().debug("ICCAT-VPA: waa DONE! "+waaInpURL);
|
AnalysisLogger.getLogger().debug("ICCAT-VPA: waa DONE! "+waaInpURL);
|
||||||
AnalysisLogger.getLogger().debug("ICCAT-VPA Input files uploaded!");
|
AnalysisLogger.getLogger().debug("ICCAT-VPA Input files uploaded!");
|
||||||
|
|
||||||
|
@ -168,7 +168,7 @@ public class ICCATVPA extends ActorNode {
|
||||||
|
|
||||||
//download the package
|
//download the package
|
||||||
AnalysisLogger.getLogger().info("ICCAT-VPA : downloading package URL: "+packageURL);
|
AnalysisLogger.getLogger().info("ICCAT-VPA : downloading package URL: "+packageURL);
|
||||||
StorageUtils.downloadInputFile(packageURL, localzipFile);
|
StorageUtils.downloadInputFile(packageURL, localzipFile,true);
|
||||||
|
|
||||||
//unzip the package
|
//unzip the package
|
||||||
AnalysisLogger.getLogger().info("ICCAT-VPA : Unzipping file: "+localzipFile+" having size "+new File(localzipFile).length());
|
AnalysisLogger.getLogger().info("ICCAT-VPA : Unzipping file: "+localzipFile+" having size "+new File(localzipFile).length());
|
||||||
|
@ -177,14 +177,14 @@ public class ICCATVPA extends ActorNode {
|
||||||
//download input files
|
//download input files
|
||||||
AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading remote input files "+config.getGeneralProperties());
|
AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading remote input files "+config.getGeneralProperties());
|
||||||
AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading CAA");
|
AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading CAA");
|
||||||
StorageUtils.downloadInputFile(config.getParam(CAAInp), "CAA_Age1_25.csv");
|
StorageUtils.downloadInputFile(config.getParam(CAAInp), "CAA_Age1_25.csv",true);
|
||||||
StorageUtils.downloadInputFile(config.getParam(PCAAInp), "PCAA_Age1_25_Run3.csv");
|
StorageUtils.downloadInputFile(config.getParam(PCAAInp), "PCAA_Age1_25_Run3.csv",true);
|
||||||
AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading PCAA");
|
AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading PCAA");
|
||||||
StorageUtils.downloadInputFile(config.getParam(CPUEInp), "CPUE_Run3.csv");
|
StorageUtils.downloadInputFile(config.getParam(CPUEInp), "CPUE_Run3.csv",true);
|
||||||
AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading CPUE");
|
AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading CPUE");
|
||||||
StorageUtils.downloadInputFile(config.getParam(PwaaInp), "waa.csv");
|
StorageUtils.downloadInputFile(config.getParam(PwaaInp), "waa.csv",true);
|
||||||
AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading Pwaa");
|
AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading Pwaa");
|
||||||
StorageUtils.downloadInputFile(config.getParam(waaInp), "fecaa.csv");
|
StorageUtils.downloadInputFile(config.getParam(waaInp), "fecaa.csv",true);
|
||||||
AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading waa");
|
AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading waa");
|
||||||
|
|
||||||
AnalysisLogger.getLogger().info("ICCAT-VPA : all files downloaded: ");
|
AnalysisLogger.getLogger().info("ICCAT-VPA : all files downloaded: ");
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
package org.gcube.dataanalysis.executor.rscripts;
|
package org.gcube.dataanalysis.executor.rscripts;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
|
|
|
@ -0,0 +1,66 @@
|
||||||
|
package org.gcube.dataanalysis.executor.rscripts;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
|
||||||
|
import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType;
|
||||||
|
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
|
||||||
|
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes;
|
||||||
|
import org.gcube.dataanalysis.ecoengine.utils.DynamicEnum;
|
||||||
|
import org.gcube.dataanalysis.executor.rscripts.generic.GenericRScript;
|
||||||
|
|
||||||
|
public class TemplateRScripts extends GenericRScript {
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDescription() {
|
||||||
|
return "An algorithm to compile Knitr documents. Developed by IRD (reference Julien Bard, julien.barde@ird.fr)";
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void initVariables(){
|
||||||
|
mainScriptName="IRDTunaAtlas-master/report/knitr/compileKnitR_CNR.R";
|
||||||
|
packageURL="http://goo.gl/T7V8LV";
|
||||||
|
|
||||||
|
environmentalvariables = new ArrayList<String>();
|
||||||
|
inputvariables.add("zipfile");
|
||||||
|
inputvariables.add("file.inout");
|
||||||
|
outputvariables.add("pdfresult");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//Op12345 e Eop12345 are two automatically generated names - Use UUID
|
||||||
|
static class Op12345 extends DynamicEnum {
|
||||||
|
public enum Eop12345 {};
|
||||||
|
public Field[] getFields() {
|
||||||
|
Field[] fields = Eop12345.class.getDeclaredFields();
|
||||||
|
return fields;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setInputParameters() {
|
||||||
|
if (org.gcube.dataanalysis.executor.rscripts.TemplateRScripts.Op12345.Eop12345.values().length==0){
|
||||||
|
Op12345 en = new Op12345();
|
||||||
|
en.addEnum(org.gcube.dataanalysis.executor.rscripts.TemplateRScripts.Op12345.Eop12345.class, "CIAO");
|
||||||
|
en.addEnum(org.gcube.dataanalysis.executor.rscripts.TemplateRScripts.Op12345.Eop12345.class, "TEST");
|
||||||
|
en.addEnum(org.gcube.dataanalysis.executor.rscripts.TemplateRScripts.Op12345.Eop12345.class, "MIAO *_$");
|
||||||
|
}
|
||||||
|
//Add Enumerate Type
|
||||||
|
addEnumerateInput(org.gcube.dataanalysis.executor.rscripts.TemplateRScripts.Op12345.Eop12345.values(), "Name", "Description", "Hello");
|
||||||
|
|
||||||
|
inputs.add(new PrimitiveType(File.class.getName(), null, PrimitiveTypes.FILE, "zipfile", "The file containing R and the markdown (Rnw) files to compile","knitr_wfs.zip"));
|
||||||
|
inputs.add(new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, "file.inout", "The name of the R file in the zip package", "main.r"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StatisticalType getOutput() {
|
||||||
|
output.put("pdfresult",new PrimitiveType(File.class.getName(), new File(outputValues.get("pdfresult")), PrimitiveTypes.FILE, "pdfresult", "The compiled PDF file"));
|
||||||
|
PrimitiveType o = new PrimitiveType(LinkedHashMap.class.getName(), output, PrimitiveTypes.MAP, "Output", "");
|
||||||
|
return o;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -19,7 +19,6 @@ import org.gcube.dataanalysis.executor.util.StorageUtils;
|
||||||
|
|
||||||
public abstract class GenericRScript extends StandardLocalExternalAlgorithm {
|
public abstract class GenericRScript extends StandardLocalExternalAlgorithm {
|
||||||
|
|
||||||
|
|
||||||
// FIXED part
|
// FIXED part
|
||||||
protected HashMap<String, String> outputValues = new HashMap<String, String>();
|
protected HashMap<String, String> outputValues = new HashMap<String, String>();
|
||||||
protected LinkedHashMap<String, StatisticalType> output = new LinkedHashMap<String, StatisticalType>();
|
protected LinkedHashMap<String, StatisticalType> output = new LinkedHashMap<String, StatisticalType>();
|
||||||
|
@ -176,7 +175,7 @@ public abstract class GenericRScript extends StandardLocalExternalAlgorithm {
|
||||||
fw.write(Rlog);
|
fw.write(Rlog);
|
||||||
fw.close();
|
fw.close();
|
||||||
AnalysisLogger.getLogger().debug("Written in " + logfile);
|
AnalysisLogger.getLogger().debug("Written in " + logfile);
|
||||||
String httpurl = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("Username"), logfile.getParent(), "/ScriptLogs/" + uuid + "/", logfile.getName(),true);
|
String httpurl = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), logfile.getParent(), "/ScriptLogs/" + uuid + "/", logfile.getName(),true);
|
||||||
AnalysisLogger.getLogger().debug("Uploaded on storage: " + httpurl);
|
AnalysisLogger.getLogger().debug("Uploaded on storage: " + httpurl);
|
||||||
|
|
||||||
// String httpurl = url.replace("smp:", "http:");
|
// String httpurl = url.replace("smp:", "http:");
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.junit.runner.notification.RunListener;
|
||||||
|
|
||||||
public class LocalRScriptsManager {
|
public class LocalRScriptsManager {
|
||||||
|
|
||||||
|
|
||||||
public float status = 0;
|
public float status = 0;
|
||||||
public String currentOutputURL;
|
public String currentOutputURL;
|
||||||
Process process;
|
Process process;
|
||||||
|
|
|
@ -24,13 +24,14 @@ import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
|
||||||
|
|
||||||
public class StorageUtils {
|
public class StorageUtils {
|
||||||
|
|
||||||
|
|
||||||
public static void downloadInputFile(String fileurl, String destinationFile) throws Exception{
|
public static void downloadInputFile(String fileurl, String destinationFile) throws Exception{
|
||||||
downloadInputFile(fileurl, destinationFile, false);
|
downloadInputFile(fileurl, destinationFile, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void downloadInputFile(String fileurl, String destinationFile, boolean httpURL) throws Exception{
|
public static void downloadInputFile(String fileurl, String destinationFile, boolean httpURL) throws Exception{
|
||||||
try {
|
try {
|
||||||
if (!httpURL)
|
if (!httpURL || !fileurl.toLowerCase().startsWith("http:"))
|
||||||
Handler.activateProtocol();
|
Handler.activateProtocol();
|
||||||
URL smpFile = new URL(fileurl);
|
URL smpFile = new URL(fileurl);
|
||||||
URLConnection uc = (URLConnection) smpFile.openConnection();
|
URLConnection uc = (URLConnection) smpFile.openConnection();
|
||||||
|
|
Loading…
Reference in New Issue