From d55a8d2163332d817b01542137b20a120f761210 Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Wed, 13 Sep 2017 12:16:02 +0000 Subject: [PATCH] Using JobUsageRecord instead of TaskUsagerecord accounting due to changes in accounting model. git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/smart-executor@153048 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../pluginmanager/RunnablePlugin.java | 79 +++++++++---------- .../scheduler/SmartExecutorScheduler.java | 22 +++--- .../executor/scheduler/SmartExecutorTask.java | 4 +- 3 files changed, 51 insertions(+), 54 deletions(-) diff --git a/src/main/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePlugin.java b/src/main/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePlugin.java index aa062f3..055f28b 100644 --- a/src/main/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePlugin.java +++ b/src/main/java/org/gcube/vremanagement/executor/pluginmanager/RunnablePlugin.java @@ -9,11 +9,15 @@ import java.util.Map; import java.util.UUID; import org.gcube.accounting.datamodel.UsageRecord.OperationResult; -import org.gcube.accounting.datamodel.usagerecords.TaskUsageRecord; +import org.gcube.accounting.datamodel.usagerecords.JobUsageRecord; import org.gcube.accounting.persistence.AccountingPersistence; import org.gcube.accounting.persistence.AccountingPersistenceFactory; +import org.gcube.common.authorization.client.Constants; +import org.gcube.common.authorization.library.AuthorizationEntry; +import org.gcube.common.authorization.library.provider.ClientInfo; import org.gcube.common.authorization.library.provider.SecurityTokenProvider; import org.gcube.documentstore.exception.InvalidValueException; +import org.gcube.smartgears.ContextProvider; import org.gcube.vremanagement.executor.SmartExecutorInitializator; import org.gcube.vremanagement.executor.exception.AlreadyInFinalStateException; import org.gcube.vremanagement.executor.exception.InvalidPluginStateEvolutionException; @@ -39,13 +43,6 @@ public class RunnablePlugin> imple */ private static Logger logger = LoggerFactory.getLogger(RunnablePlugin.class); - public static final String MAX_LAUNCH_TIMES = "___max_launch_times___"; - - protected static final String SEPARATOR = "---"; - - private static final String ITERATION_NUMBER = "iterationNumber"; - private static final String FINAL_STATE = "finalState"; - protected final T plugin; protected final Map inputs; @@ -84,44 +81,44 @@ public class RunnablePlugin> imple @Override public void run(){ - //String previousToken = SecurityTokenProvider.instance.get(); + String pluginName = plugin.getPluginDeclaration().getName(); logger.info("{} : {} is going to be launched (UUID={}, iterationNumber={}) with the following inputs {}", - plugin.getPluginDeclaration().getName(), plugin.getPluginDeclaration().getVersion(), + pluginName, plugin.getPluginDeclaration().getVersion(), uuid, iterationNumber, inputs); - TaskUsageRecord taskUsageRecord = new TaskUsageRecord(); + + JobUsageRecord jobUsageRecord = new JobUsageRecord(); + + long startTime = actualStateEvolution.getTimestamp(); try { SmartExecutorInitializator.setContext(token); setState(PluginState.RUNNING); - Calendar taskStartTime = Calendar.getInstance(); - taskStartTime.setTimeInMillis(actualStateEvolution.getTimestamp()); - taskUsageRecord.setTaskStartTime(taskStartTime); - - taskUsageRecord.setTaskId(uuid.toString()); - taskUsageRecord.setResourceProperty(ITERATION_NUMBER, iterationNumber); - - taskUsageRecord.setConsumerId(SmartExecutorInitializator.getClientInfo().getId()); + jobUsageRecord.setJobName(pluginName); + jobUsageRecord.setConsumerId(SmartExecutorInitializator.getClientInfo().getId()); RunOn runOn = ScheduledTask.generateRunOn(); Ref hnRef = runOn.getHostingNode(); - taskUsageRecord.setRefHostingNodeId(hnRef.getId()); - taskUsageRecord.setHost(hnRef.getAddress()); + jobUsageRecord.setHost(hnRef.getAddress()); - /* - if(inputs!=null && inputs.size()>0){ - HashMap map = - new HashMap(); - for(String key : inputs.keySet()){ - if(inputs.get(key) instanceof Serializable){ - map.put(key, (Serializable) inputs.get(key)); - } - } - taskUsageRecord.setInputParameters(map); + + ClientInfo clientInfo = SmartExecutorInitializator.getClientInfo(); + String consumerId = clientInfo.getId(); + jobUsageRecord.setConsumerId(consumerId); + + try { + AuthorizationEntry authorizationEntry = Constants.authorizationService().get(token); + String callerQualifier = authorizationEntry.getQualifier(); + jobUsageRecord.setCallerQualifier(callerQualifier); + }catch (Exception e) { + jobUsageRecord.setCallerQualifier("UNKNOWN"); } - */ + + jobUsageRecord.setServiceClass(ContextProvider.get().configuration().serviceClass()); + jobUsageRecord.setServiceName(ContextProvider.get().configuration().name()); + this.plugin.setUUID(uuid); this.plugin.setIterationNumber(iterationNumber); @@ -141,29 +138,27 @@ public class RunnablePlugin> imple AccountingPersistence accountingPersistence = AccountingPersistenceFactory.getPersistence(); try { - Calendar taskEndTime = Calendar.getInstance(); - taskEndTime.setTimeInMillis(actualStateEvolution.getTimestamp()); - taskUsageRecord.setTaskEndTime(taskEndTime); + long endTime = actualStateEvolution.getTimestamp(); + long duration = endTime - startTime; + jobUsageRecord.setDuration(duration); PluginState pluginState = actualStateEvolution.getPluginState(); switch (pluginState) { case DONE: - taskUsageRecord.setOperationResult(OperationResult.SUCCESS); + jobUsageRecord.setOperationResult(OperationResult.SUCCESS); break; default: - taskUsageRecord.setOperationResult(OperationResult.FAILED); + jobUsageRecord.setOperationResult(OperationResult.FAILED); break; } - taskUsageRecord.setResourceProperty(FINAL_STATE, pluginState.toString()); - accountingPersistence.account(taskUsageRecord); + accountingPersistence.account(jobUsageRecord); } catch (InvalidValueException e) { - logger.error("Unable to account {}", taskUsageRecord, e); + logger.error("Unable to account {}", jobUsageRecord, e); } - - // SmartExecutorInitializator.setContext(previousToken); + } } diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java index 9a7cefd..e21ca53 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorScheduler.java @@ -49,11 +49,11 @@ public class SmartExecutorScheduler { protected Set scheduledJobs; - protected final Scheduler shedul; + protected final Scheduler scheduler; SmartExecutorScheduler(Scheduler scheduler) throws SchedulerException { - this.shedul = scheduler; - this.shedul.start(); + this.scheduler = scheduler; + this.scheduler.start(); this.scheduledJobs = new HashSet<>(); } @@ -167,8 +167,8 @@ public class SmartExecutorScheduler { try { SmartExecutorTaskListener sejl = new SmartExecutorTaskListener(); - shedul.getListenerManager().addJobListener(sejl); - shedul.scheduleJob(jobDetail, triggerBuilder.build()); + scheduler.getListenerManager().addJobListener(sejl); + scheduler.scheduleJob(jobDetail, triggerBuilder.build()); } catch (SchedulerException e) { throw new RuntimeException(e); } @@ -218,14 +218,14 @@ public class SmartExecutorScheduler { try { logger.debug("Going to stop current SmartExecutor Task {} execution if any", uuid); - if(!shedul.checkExists(jobKey)){ + if(!scheduler.checkExists(jobKey)){ logger.trace("SmartExecutor Task {} does not have any instaces associated. Cleaning the environment. That's all folk.", uuid); scheduledJobs.remove(uuid); throw new SchedulerNotFoundException("Scheduler Not Found"); } - boolean interrupted = shedul.interrupt(jobKey); - shedul.deleteJob(jobKey); + boolean interrupted = scheduler.interrupt(jobKey); + scheduler.deleteJob(jobKey); if (interrupted) { logger.debug("SmartExecutor Task {} interrupted successfully.", uuid); } else { @@ -245,7 +245,7 @@ public class SmartExecutorScheduler { } public LaunchParameter getLaunchParameter(JobKey jobKey) throws SchedulerException{ - JobDetail jobDetail = shedul.getJobDetail(jobKey); + JobDetail jobDetail = scheduler.getJobDetail(jobKey); JobDataMap jobDataMap = jobDetail.getJobDataMap(); return (LaunchParameter) jobDataMap.get(SmartExecutorTask.LAUNCH_PARAMETER); } @@ -287,7 +287,7 @@ public class SmartExecutorScheduler { } - public void stopAll() { + public void stopAll() throws SchedulerException { List set = new ArrayList(scheduledJobs); for (UUID uuid : set) { try { @@ -297,6 +297,8 @@ public class SmartExecutorScheduler { uuid, e); } } + scheduler.clear(); + scheduler.shutdown(); } } diff --git a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java index c63032b..ff48b81 100644 --- a/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java +++ b/src/main/java/org/gcube/vremanagement/executor/scheduler/SmartExecutorTask.java @@ -209,7 +209,7 @@ public class SmartExecutorTask implements InterruptableJob { if(isMaxExecutionNumberReached()){ logger.debug("The Scheduled Max Number of execution ({}) is reached. The SmartExecutor Task {} will be descheduled", maxExecutionNumber, uuid); try { - deschedule(true); + unschedule(true); } catch (Exception e) { throw new JobExecutionException(e); } @@ -295,7 +295,7 @@ public class SmartExecutorTask implements InterruptableJob { return false; } - protected void deschedule(boolean globally) + protected void unschedule(boolean globally) throws Exception { SmartExecutorSchedulerFactory.getSmartExecutorScheduler().stop(uuid, globally); }