Gianpaolo Coro 2016-10-05 17:05:34 +00:00
parent 31b83790e6
commit 433f02c1ef
2 changed files with 26 additions and 14 deletions

View File

@ -99,8 +99,11 @@ public class DistributedProcessingAgentWPS {
jobManager = new WPSJobManager(); jobManager = new WPSJobManager();
// we split along right dimension so if elements are less than nodes, we should reduce the number of nodes // we split along right dimension so if elements are less than nodes, we should reduce the number of nodes
// chunkize the number of species in order to lower the computational effort of the workers // chunkize the number of species in order to lower the computational effort of the workers
subdivisiondiv = rightSetNumberOfElements / (maxElementsAllowedPerJob); int nservices = jobManager.estimateNumberOfServices(configuration.getGcubeScope());
int rest = rightSetNumberOfElements % (maxElementsAllowedPerJob); //subdivisiondiv = rightSetNumberOfElements / (maxElementsAllowedPerJob);
subdivisiondiv = nservices; //rightSetNumberOfElements / (nservices);
AnalysisLogger.getLogger().debug("Subdivision for the job "+subdivisiondiv);
int rest = rightSetNumberOfElements % (nservices);
if (rest > 0) if (rest > 0)
subdivisiondiv++; subdivisiondiv++;
if (subdivisiondiv == 0) if (subdivisiondiv == 0)

View File

@ -24,7 +24,7 @@ public class WPSJobManager {
int overallFailures = 0; int overallFailures = 0;
int overallSuccess = 0; int overallSuccess = 0;
int overallTasks = 0; int overallTasks = 0;
int nservices = -1;
boolean stopThreads = false; boolean stopThreads = false;
boolean hasResentMessages = false; boolean hasResentMessages = false;
@ -223,19 +223,12 @@ public class WPSJobManager {
return hasResentMessages; return hasResentMessages;
} }
public void uploadAndExecuteChunkized(AlgorithmConfiguration configuration, String algorithmClass, List<String> arguments, String session) throws Exception{ public int estimateNumberOfServices(String scope) throws Exception{
ExecutorService executor = null; List<String> wpsservices = InfraRetrieval.retrieveService("DataMiner", scope);
try{
int numberofservices = 1;
String callTemplate = getCallTemplate();
AnalysisLogger.getLogger().debug("WPSJobManager->Estimating the number of services");
List<String> wpsservices = InfraRetrieval.retrieveService("DataMiner", configuration.getGcubeScope());
if (wpsservices==null || wpsservices.size()==0){ if (wpsservices==null || wpsservices.size()==0){
AnalysisLogger.getLogger().debug("WPSJobManager->Error: No DataMiner GCore Endpoints found!"); AnalysisLogger.getLogger().debug("WPSJobManager->Error: No DataMiner GCore Endpoints found!");
throw new Exception ("No DataMinerWorkers GCore Endpoint found in the VRE "+configuration.getGcubeScope()); throw new Exception ("No DataMinerWorkers GCore Endpoint found in the VRE "+scope);
} }
List<String> differentServices = new ArrayList<String>(); List<String> differentServices = new ArrayList<String>();
for (String service:wpsservices){ for (String service:wpsservices){
@ -247,7 +240,23 @@ public class WPSJobManager {
} }
numberofservices = differentServices.size(); int numberofservices = differentServices.size();
AnalysisLogger.getLogger().debug("WPSJobManager->Number of found services "+numberofservices);
nservices = numberofservices;
return numberofservices;
}
public void uploadAndExecuteChunkized(AlgorithmConfiguration configuration, String algorithmClass, List<String> arguments, String session) throws Exception{
ExecutorService executor = null;
try{
int numberofservices = 1;
String callTemplate = getCallTemplate();
AnalysisLogger.getLogger().debug("WPSJobManager->Estimating the number of services");
if (nservices>0)
numberofservices = nservices;
else
numberofservices = estimateNumberOfServices(configuration.getGcubeScope());
AnalysisLogger.getLogger().debug("WPSJobManager->Number of dataminer services "+numberofservices); AnalysisLogger.getLogger().debug("WPSJobManager->Number of dataminer services "+numberofservices);
int parallelisation = numberofservices*2; int parallelisation = numberofservices*2;