From 433f02c1eff20a023dd067fb3b2d62b856d9c94b Mon Sep 17 00:00:00 2001 From: Gianpaolo Coro Date: Wed, 5 Oct 2016 17:05:34 +0000 Subject: [PATCH] git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/EcologicalEngineSmartExecutor@132752 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../DistributedProcessingAgentWPS.java | 7 ++-- .../job/management/WPSJobManager.java | 33 ++++++++++++------- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgentWPS.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgentWPS.java index 045a355..0edcabf 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgentWPS.java +++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/DistributedProcessingAgentWPS.java @@ -99,8 +99,11 @@ public class DistributedProcessingAgentWPS { jobManager = new WPSJobManager(); // we split along right dimension so if elements are less than nodes, we should reduce the number of nodes // chunkize the number of species in order to lower the computational effort of the workers - subdivisiondiv = rightSetNumberOfElements / (maxElementsAllowedPerJob); - int rest = rightSetNumberOfElements % (maxElementsAllowedPerJob); + int nservices = jobManager.estimateNumberOfServices(configuration.getGcubeScope()); + //subdivisiondiv = rightSetNumberOfElements / (maxElementsAllowedPerJob); + subdivisiondiv = nservices; //rightSetNumberOfElements / (nservices); + AnalysisLogger.getLogger().debug("Subdivision for the job "+subdivisiondiv); + int rest = rightSetNumberOfElements % (nservices); if (rest > 0) subdivisiondiv++; if (subdivisiondiv == 0) diff --git a/src/main/java/org/gcube/dataanalysis/executor/job/management/WPSJobManager.java b/src/main/java/org/gcube/dataanalysis/executor/job/management/WPSJobManager.java index c3babf5..d621d75 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/job/management/WPSJobManager.java +++ b/src/main/java/org/gcube/dataanalysis/executor/job/management/WPSJobManager.java @@ -24,7 +24,7 @@ public class WPSJobManager { int overallFailures = 0; int overallSuccess = 0; int overallTasks = 0; - + int nservices = -1; boolean stopThreads = false; boolean hasResentMessages = false; @@ -223,19 +223,12 @@ public class WPSJobManager { return hasResentMessages; } - public void uploadAndExecuteChunkized(AlgorithmConfiguration configuration, String algorithmClass, List arguments, String session) throws Exception{ - ExecutorService executor = null; - try{ - int numberofservices = 1; - String callTemplate = getCallTemplate(); - - AnalysisLogger.getLogger().debug("WPSJobManager->Estimating the number of services"); - - List wpsservices = InfraRetrieval.retrieveService("DataMiner", configuration.getGcubeScope()); + public int estimateNumberOfServices(String scope) throws Exception{ + List wpsservices = InfraRetrieval.retrieveService("DataMiner", scope); if (wpsservices==null || wpsservices.size()==0){ AnalysisLogger.getLogger().debug("WPSJobManager->Error: No DataMiner GCore Endpoints found!"); - throw new Exception ("No DataMinerWorkers GCore Endpoint found in the VRE "+configuration.getGcubeScope()); + throw new Exception ("No DataMinerWorkers GCore Endpoint found in the VRE "+scope); } List differentServices = new ArrayList(); for (String service:wpsservices){ @@ -247,7 +240,23 @@ public class WPSJobManager { } - numberofservices = differentServices.size(); + int numberofservices = differentServices.size(); + AnalysisLogger.getLogger().debug("WPSJobManager->Number of found services "+numberofservices); + nservices = numberofservices; + return numberofservices; + } + + public void uploadAndExecuteChunkized(AlgorithmConfiguration configuration, String algorithmClass, List arguments, String session) throws Exception{ + ExecutorService executor = null; + try{ + int numberofservices = 1; + String callTemplate = getCallTemplate(); + + AnalysisLogger.getLogger().debug("WPSJobManager->Estimating the number of services"); + if (nservices>0) + numberofservices = nservices; + else + numberofservices = estimateNumberOfServices(configuration.getGcubeScope()); AnalysisLogger.getLogger().debug("WPSJobManager->Number of dataminer services "+numberofservices); int parallelisation = numberofservices*2;