diff --git a/cfg/knitr_test_julien_wfs.zip b/cfg/knitr_test_julien_wfs.zip new file mode 100644 index 0000000..d67510f Binary files /dev/null and b/cfg/knitr_test_julien_wfs.zip differ diff --git a/pom.xml b/pom.xml index b3c2620..c93cbc3 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ org.gcube.dataanalysis ecological-engine-smart-executor - 1.1.2-SNAPSHOT + 1.2.0-SNAPSHOT Smart Ecological Engine Executor Smart Ecological Engine Executor Description diff --git a/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java b/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java index 39833c7..395c55b 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java +++ b/src/main/java/org/gcube/dataanalysis/executor/generators/D4ScienceDistributedProcessing.java @@ -17,6 +17,7 @@ import org.gcube.dataanalysis.ecoengine.interfaces.GenericAlgorithm; import org.gcube.dataanalysis.executor.job.management.DistributedProcessingAgent; public class D4ScienceDistributedProcessing implements Generator { + public static int maxMessagesAllowedPerJob = 20; public static boolean forceUpload = true; public static String defaultContainerFolder = "PARALLEL_PROCESSING"; diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgent.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgent.java index f23449e..867560a 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgent.java +++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgent.java @@ -20,6 +20,7 @@ import com.thoughtworks.xstream.XStream; public class DistributedProcessingAgent { + protected QueueJobManager jobManager; protected boolean deletefiles = true; protected String mainclass; diff --git 
a/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java index dc78946..5f44e0a 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java +++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/QueueJobManager.java @@ -45,6 +45,7 @@ import static org.gcube.resources.discovery.icclient.ICFactory.*; public class QueueJobManager { + // broadcast message period public static int broadcastTimePeriod = 120000; // max silence before computation stops diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/ICCATVPA.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/ICCATVPA.java index 2fde988..1b2e84a 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/ICCATVPA.java +++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/ICCATVPA.java @@ -103,17 +103,17 @@ public class ICCATVPA extends ActorNode { processOutput = "ICCAT-VPA_"+"output_"+uuid+".zip"; config.setParam(processOutputParam, processOutput); - AnalysisLogger.getLogger().debug("ICCAT-VPA Uploading input files: "+config.getGeneralProperties()); + AnalysisLogger.getLogger().debug("ICCAT-VPA Uploading input files (http): "+config.getGeneralProperties()); //upload files on the storage manager - CAAInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(CAAInp)).getParent(), new File(config.getParam(CAAInp)).getName()); + CAAInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(CAAInp)).getParent(), "/",new File(config.getParam(CAAInp)).getName(),true); AnalysisLogger.getLogger().debug("ICCAT-VPA: CAA DONE! 
"+CAAInpURL); - PCAAInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(PCAAInp)).getParent(), new File(config.getParam(PCAAInp)).getName()); + PCAAInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(PCAAInp)).getParent(),"/", new File(config.getParam(PCAAInp)).getName(),true); AnalysisLogger.getLogger().debug("ICCAT-VPA: PCAA DONE! "+PCAAInpURL); - CPUEInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(CPUEInp)).getParent(), new File(config.getParam(CPUEInp)).getName()); + CPUEInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(CPUEInp)).getParent(),"/", new File(config.getParam(CPUEInp)).getName(),true); AnalysisLogger.getLogger().debug("ICCAT-VPA: CPUE DONE! "+CPUEInpURL); - PwaaInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(PwaaInp)).getParent(), new File(config.getParam(PwaaInp)).getName()); + PwaaInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(PwaaInp)).getParent(), "/",new File(config.getParam(PwaaInp)).getName(),true); AnalysisLogger.getLogger().debug("ICCAT-VPA: Pwaa DONE! "+PwaaInpURL); - waaInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(waaInp)).getParent(), new File(config.getParam(waaInp)).getName()); + waaInpURL = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), new File(config.getParam(waaInp)).getParent(),"/", new File(config.getParam(waaInp)).getName(),true); AnalysisLogger.getLogger().debug("ICCAT-VPA: waa DONE! 
"+waaInpURL); AnalysisLogger.getLogger().debug("ICCAT-VPA Input files uploaded!"); @@ -168,7 +168,7 @@ public class ICCATVPA extends ActorNode { //download the package AnalysisLogger.getLogger().info("ICCAT-VPA : downloading package URL: "+packageURL); - StorageUtils.downloadInputFile(packageURL, localzipFile); + StorageUtils.downloadInputFile(packageURL, localzipFile,true); //unzip the package AnalysisLogger.getLogger().info("ICCAT-VPA : Unzipping file: "+localzipFile+" having size "+new File(localzipFile).length()); @@ -177,14 +177,14 @@ public class ICCATVPA extends ActorNode { //download input files AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading remote input files "+config.getGeneralProperties()); AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading CAA"); - StorageUtils.downloadInputFile(config.getParam(CAAInp), "CAA_Age1_25.csv"); - StorageUtils.downloadInputFile(config.getParam(PCAAInp), "PCAA_Age1_25_Run3.csv"); + StorageUtils.downloadInputFile(config.getParam(CAAInp), "CAA_Age1_25.csv",true); + StorageUtils.downloadInputFile(config.getParam(PCAAInp), "PCAA_Age1_25_Run3.csv",true); AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading PCAA"); - StorageUtils.downloadInputFile(config.getParam(CPUEInp), "CPUE_Run3.csv"); + StorageUtils.downloadInputFile(config.getParam(CPUEInp), "CPUE_Run3.csv",true); AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading CPUE"); - StorageUtils.downloadInputFile(config.getParam(PwaaInp), "waa.csv"); + StorageUtils.downloadInputFile(config.getParam(PwaaInp), "waa.csv",true); AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading Pwaa"); - StorageUtils.downloadInputFile(config.getParam(waaInp), "fecaa.csv"); + StorageUtils.downloadInputFile(config.getParam(waaInp), "fecaa.csv",true); AnalysisLogger.getLogger().info("ICCAT-VPA : Downloading waa"); AnalysisLogger.getLogger().info("ICCAT-VPA : all files downloaded: "); diff --git a/src/main/java/org/gcube/dataanalysis/executor/rscripts/KnitrCompiler.java 
b/src/main/java/org/gcube/dataanalysis/executor/rscripts/KnitrCompiler.java index a5c9821..7906913 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/rscripts/KnitrCompiler.java +++ b/src/main/java/org/gcube/dataanalysis/executor/rscripts/KnitrCompiler.java @@ -1,5 +1,7 @@ package org.gcube.dataanalysis.executor.rscripts; + + import java.io.File; import java.util.ArrayList; import java.util.LinkedHashMap; diff --git a/src/main/java/org/gcube/dataanalysis/executor/rscripts/TemplateRScripts.java b/src/main/java/org/gcube/dataanalysis/executor/rscripts/TemplateRScripts.java new file mode 100644 index 0000000..248b5be --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/executor/rscripts/TemplateRScripts.java @@ -0,0 +1,66 @@ +package org.gcube.dataanalysis.executor.rscripts; + + + +import java.io.File; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.LinkedHashMap; + +import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType; +import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType; +import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes; +import org.gcube.dataanalysis.ecoengine.utils.DynamicEnum; +import org.gcube.dataanalysis.executor.rscripts.generic.GenericRScript; + +public class TemplateRScripts extends GenericRScript { + + + @Override + public String getDescription() { + return "An algorithm to compile Knitr documents. 
Developed by IRD (reference Julien Bard, julien.barde@ird.fr)"; + } + + protected void initVariables(){ + mainScriptName="IRDTunaAtlas-master/report/knitr/compileKnitR_CNR.R"; + packageURL="http://goo.gl/T7V8LV"; + + environmentalvariables = new ArrayList(); + inputvariables.add("zipfile"); + inputvariables.add("file.inout"); + outputvariables.add("pdfresult"); + } + + + //Op12345 and Eop12345 are two automatically generated names - Use UUID + static class Op12345 extends DynamicEnum { + public enum Eop12345 {}; + public Field[] getFields() { + Field[] fields = Eop12345.class.getDeclaredFields(); + return fields; + } + } + + @Override + protected void setInputParameters() { + if (org.gcube.dataanalysis.executor.rscripts.TemplateRScripts.Op12345.Eop12345.values().length==0){ + Op12345 en = new Op12345(); + en.addEnum(org.gcube.dataanalysis.executor.rscripts.TemplateRScripts.Op12345.Eop12345.class, "CIAO"); + en.addEnum(org.gcube.dataanalysis.executor.rscripts.TemplateRScripts.Op12345.Eop12345.class, "TEST"); + en.addEnum(org.gcube.dataanalysis.executor.rscripts.TemplateRScripts.Op12345.Eop12345.class, "MIAO *_$"); + } + //Add Enumerate Type + addEnumerateInput(org.gcube.dataanalysis.executor.rscripts.TemplateRScripts.Op12345.Eop12345.values(), "Name", "Description", "Hello"); + + inputs.add(new PrimitiveType(File.class.getName(), null, PrimitiveTypes.FILE, "zipfile", "The file containing R and the markdown (Rnw) files to compile","knitr_wfs.zip")); + inputs.add(new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, "file.inout", "The name of the R file in the zip package", "main.r")); + } + + @Override + public StatisticalType getOutput() { + output.put("pdfresult",new PrimitiveType(File.class.getName(), new File(outputValues.get("pdfresult")), PrimitiveTypes.FILE, "pdfresult", "The compiled PDF file")); + PrimitiveType o = new PrimitiveType(LinkedHashMap.class.getName(), output, PrimitiveTypes.MAP, "Output", ""); + return o; + } + +} diff --git 
a/src/main/java/org/gcube/dataanalysis/executor/rscripts/generic/GenericRScript.java b/src/main/java/org/gcube/dataanalysis/executor/rscripts/generic/GenericRScript.java index 700bceb..802e943 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/rscripts/generic/GenericRScript.java +++ b/src/main/java/org/gcube/dataanalysis/executor/rscripts/generic/GenericRScript.java @@ -18,7 +18,6 @@ import org.gcube.dataanalysis.executor.util.LocalRScriptsManager; import org.gcube.dataanalysis.executor.util.StorageUtils; public abstract class GenericRScript extends StandardLocalExternalAlgorithm { - // FIXED part protected HashMap outputValues = new HashMap(); @@ -176,7 +175,7 @@ public abstract class GenericRScript extends StandardLocalExternalAlgorithm { fw.write(Rlog); fw.close(); AnalysisLogger.getLogger().debug("Written in " + logfile); - String httpurl = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("Username"), logfile.getParent(), "/ScriptLogs/" + uuid + "/", logfile.getName(),true); + String httpurl = StorageUtils.uploadFilesOnStorage(config.getGcubeScope(), config.getParam("ServiceUserName"), logfile.getParent(), "/ScriptLogs/" + uuid + "/", logfile.getName(),true); AnalysisLogger.getLogger().debug("Uploaded on storage: " + httpurl); // String httpurl = url.replace("smp:", "http:"); diff --git a/src/main/java/org/gcube/dataanalysis/executor/util/LocalRScriptsManager.java b/src/main/java/org/gcube/dataanalysis/executor/util/LocalRScriptsManager.java index f09ec40..50a6eef 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/util/LocalRScriptsManager.java +++ b/src/main/java/org/gcube/dataanalysis/executor/util/LocalRScriptsManager.java @@ -21,6 +21,7 @@ import org.junit.runner.notification.RunListener; public class LocalRScriptsManager { + public float status = 0; public String currentOutputURL; Process process; diff --git a/src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java 
b/src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java index 984e90c..632fc5a 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java +++ b/src/main/java/org/gcube/dataanalysis/executor/util/StorageUtils.java @@ -24,13 +24,14 @@ import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; public class StorageUtils { + public static void downloadInputFile(String fileurl, String destinationFile) throws Exception{ downloadInputFile(fileurl, destinationFile, false); } public static void downloadInputFile(String fileurl, String destinationFile, boolean httpURL) throws Exception{ try { - if (!httpURL) + if (!httpURL || !fileurl.toLowerCase().startsWith("http:")) Handler.activateProtocol(); URL smpFile = new URL(fileurl); URLConnection uc = (URLConnection) smpFile.openConnection();