Using JobUsageRecord instead of TaskUsagerecord accounting due to changes in accounting model.
git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/vre-management/smart-executor@153048 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
a7937b7e4f
commit
d55a8d2163
|
@ -9,11 +9,15 @@ import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import org.gcube.accounting.datamodel.UsageRecord.OperationResult;
|
import org.gcube.accounting.datamodel.UsageRecord.OperationResult;
|
||||||
import org.gcube.accounting.datamodel.usagerecords.TaskUsageRecord;
|
import org.gcube.accounting.datamodel.usagerecords.JobUsageRecord;
|
||||||
import org.gcube.accounting.persistence.AccountingPersistence;
|
import org.gcube.accounting.persistence.AccountingPersistence;
|
||||||
import org.gcube.accounting.persistence.AccountingPersistenceFactory;
|
import org.gcube.accounting.persistence.AccountingPersistenceFactory;
|
||||||
|
import org.gcube.common.authorization.client.Constants;
|
||||||
|
import org.gcube.common.authorization.library.AuthorizationEntry;
|
||||||
|
import org.gcube.common.authorization.library.provider.ClientInfo;
|
||||||
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
|
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
|
||||||
import org.gcube.documentstore.exception.InvalidValueException;
|
import org.gcube.documentstore.exception.InvalidValueException;
|
||||||
|
import org.gcube.smartgears.ContextProvider;
|
||||||
import org.gcube.vremanagement.executor.SmartExecutorInitializator;
|
import org.gcube.vremanagement.executor.SmartExecutorInitializator;
|
||||||
import org.gcube.vremanagement.executor.exception.AlreadyInFinalStateException;
|
import org.gcube.vremanagement.executor.exception.AlreadyInFinalStateException;
|
||||||
import org.gcube.vremanagement.executor.exception.InvalidPluginStateEvolutionException;
|
import org.gcube.vremanagement.executor.exception.InvalidPluginStateEvolutionException;
|
||||||
|
@ -39,13 +43,6 @@ public class RunnablePlugin<T extends Plugin<? extends PluginDeclaration>> imple
|
||||||
*/
|
*/
|
||||||
private static Logger logger = LoggerFactory.getLogger(RunnablePlugin.class);
|
private static Logger logger = LoggerFactory.getLogger(RunnablePlugin.class);
|
||||||
|
|
||||||
public static final String MAX_LAUNCH_TIMES = "___max_launch_times___";
|
|
||||||
|
|
||||||
protected static final String SEPARATOR = "---";
|
|
||||||
|
|
||||||
private static final String ITERATION_NUMBER = "iterationNumber";
|
|
||||||
private static final String FINAL_STATE = "finalState";
|
|
||||||
|
|
||||||
protected final T plugin;
|
protected final T plugin;
|
||||||
|
|
||||||
protected final Map<String, Object> inputs;
|
protected final Map<String, Object> inputs;
|
||||||
|
@ -84,44 +81,44 @@ public class RunnablePlugin<T extends Plugin<? extends PluginDeclaration>> imple
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run(){
|
public void run(){
|
||||||
//String previousToken = SecurityTokenProvider.instance.get();
|
String pluginName = plugin.getPluginDeclaration().getName();
|
||||||
|
|
||||||
logger.info("{} : {} is going to be launched (UUID={}, iterationNumber={}) with the following inputs {}",
|
logger.info("{} : {} is going to be launched (UUID={}, iterationNumber={}) with the following inputs {}",
|
||||||
plugin.getPluginDeclaration().getName(), plugin.getPluginDeclaration().getVersion(),
|
pluginName, plugin.getPluginDeclaration().getVersion(),
|
||||||
uuid, iterationNumber, inputs);
|
uuid, iterationNumber, inputs);
|
||||||
TaskUsageRecord taskUsageRecord = new TaskUsageRecord();
|
|
||||||
|
JobUsageRecord jobUsageRecord = new JobUsageRecord();
|
||||||
|
|
||||||
|
long startTime = actualStateEvolution.getTimestamp();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
SmartExecutorInitializator.setContext(token);
|
SmartExecutorInitializator.setContext(token);
|
||||||
|
|
||||||
setState(PluginState.RUNNING);
|
setState(PluginState.RUNNING);
|
||||||
|
|
||||||
Calendar taskStartTime = Calendar.getInstance();
|
jobUsageRecord.setJobName(pluginName);
|
||||||
taskStartTime.setTimeInMillis(actualStateEvolution.getTimestamp());
|
jobUsageRecord.setConsumerId(SmartExecutorInitializator.getClientInfo().getId());
|
||||||
taskUsageRecord.setTaskStartTime(taskStartTime);
|
|
||||||
|
|
||||||
taskUsageRecord.setTaskId(uuid.toString());
|
|
||||||
taskUsageRecord.setResourceProperty(ITERATION_NUMBER, iterationNumber);
|
|
||||||
|
|
||||||
taskUsageRecord.setConsumerId(SmartExecutorInitializator.getClientInfo().getId());
|
|
||||||
|
|
||||||
RunOn runOn = ScheduledTask.generateRunOn();
|
RunOn runOn = ScheduledTask.generateRunOn();
|
||||||
Ref hnRef = runOn.getHostingNode();
|
Ref hnRef = runOn.getHostingNode();
|
||||||
taskUsageRecord.setRefHostingNodeId(hnRef.getId());
|
jobUsageRecord.setHost(hnRef.getAddress());
|
||||||
taskUsageRecord.setHost(hnRef.getAddress());
|
|
||||||
|
|
||||||
/*
|
|
||||||
if(inputs!=null && inputs.size()>0){
|
ClientInfo clientInfo = SmartExecutorInitializator.getClientInfo();
|
||||||
HashMap<String, Serializable> map =
|
String consumerId = clientInfo.getId();
|
||||||
new HashMap<String, Serializable>();
|
jobUsageRecord.setConsumerId(consumerId);
|
||||||
for(String key : inputs.keySet()){
|
|
||||||
if(inputs.get(key) instanceof Serializable){
|
try {
|
||||||
map.put(key, (Serializable) inputs.get(key));
|
AuthorizationEntry authorizationEntry = Constants.authorizationService().get(token);
|
||||||
}
|
String callerQualifier = authorizationEntry.getQualifier();
|
||||||
}
|
jobUsageRecord.setCallerQualifier(callerQualifier);
|
||||||
taskUsageRecord.setInputParameters(map);
|
}catch (Exception e) {
|
||||||
|
jobUsageRecord.setCallerQualifier("UNKNOWN");
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
jobUsageRecord.setServiceClass(ContextProvider.get().configuration().serviceClass());
|
||||||
|
jobUsageRecord.setServiceName(ContextProvider.get().configuration().name());
|
||||||
|
|
||||||
|
|
||||||
this.plugin.setUUID(uuid);
|
this.plugin.setUUID(uuid);
|
||||||
this.plugin.setIterationNumber(iterationNumber);
|
this.plugin.setIterationNumber(iterationNumber);
|
||||||
|
@ -141,29 +138,27 @@ public class RunnablePlugin<T extends Plugin<? extends PluginDeclaration>> imple
|
||||||
AccountingPersistence accountingPersistence =
|
AccountingPersistence accountingPersistence =
|
||||||
AccountingPersistenceFactory.getPersistence();
|
AccountingPersistenceFactory.getPersistence();
|
||||||
try {
|
try {
|
||||||
Calendar taskEndTime = Calendar.getInstance();
|
long endTime = actualStateEvolution.getTimestamp();
|
||||||
taskEndTime.setTimeInMillis(actualStateEvolution.getTimestamp());
|
long duration = endTime - startTime;
|
||||||
taskUsageRecord.setTaskEndTime(taskEndTime);
|
jobUsageRecord.setDuration(duration);
|
||||||
|
|
||||||
PluginState pluginState = actualStateEvolution.getPluginState();
|
PluginState pluginState = actualStateEvolution.getPluginState();
|
||||||
switch (pluginState) {
|
switch (pluginState) {
|
||||||
case DONE:
|
case DONE:
|
||||||
taskUsageRecord.setOperationResult(OperationResult.SUCCESS);
|
jobUsageRecord.setOperationResult(OperationResult.SUCCESS);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
taskUsageRecord.setOperationResult(OperationResult.FAILED);
|
jobUsageRecord.setOperationResult(OperationResult.FAILED);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
taskUsageRecord.setResourceProperty(FINAL_STATE, pluginState.toString());
|
|
||||||
|
|
||||||
accountingPersistence.account(taskUsageRecord);
|
accountingPersistence.account(jobUsageRecord);
|
||||||
|
|
||||||
} catch (InvalidValueException e) {
|
} catch (InvalidValueException e) {
|
||||||
logger.error("Unable to account {}", taskUsageRecord, e);
|
logger.error("Unable to account {}", jobUsageRecord, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// SmartExecutorInitializator.setContext(previousToken);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -49,11 +49,11 @@ public class SmartExecutorScheduler {
|
||||||
|
|
||||||
|
|
||||||
protected Set<UUID> scheduledJobs;
|
protected Set<UUID> scheduledJobs;
|
||||||
protected final Scheduler shedul;
|
protected final Scheduler scheduler;
|
||||||
|
|
||||||
SmartExecutorScheduler(Scheduler scheduler) throws SchedulerException {
|
SmartExecutorScheduler(Scheduler scheduler) throws SchedulerException {
|
||||||
this.shedul = scheduler;
|
this.scheduler = scheduler;
|
||||||
this.shedul.start();
|
this.scheduler.start();
|
||||||
this.scheduledJobs = new HashSet<>();
|
this.scheduledJobs = new HashSet<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,8 +167,8 @@ public class SmartExecutorScheduler {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
SmartExecutorTaskListener sejl = new SmartExecutorTaskListener();
|
SmartExecutorTaskListener sejl = new SmartExecutorTaskListener();
|
||||||
shedul.getListenerManager().addJobListener(sejl);
|
scheduler.getListenerManager().addJobListener(sejl);
|
||||||
shedul.scheduleJob(jobDetail, triggerBuilder.build());
|
scheduler.scheduleJob(jobDetail, triggerBuilder.build());
|
||||||
} catch (SchedulerException e) {
|
} catch (SchedulerException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
@ -218,14 +218,14 @@ public class SmartExecutorScheduler {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
logger.debug("Going to stop current SmartExecutor Task {} execution if any", uuid);
|
logger.debug("Going to stop current SmartExecutor Task {} execution if any", uuid);
|
||||||
if(!shedul.checkExists(jobKey)){
|
if(!scheduler.checkExists(jobKey)){
|
||||||
logger.trace("SmartExecutor Task {} does not have any instaces associated. Cleaning the environment. That's all folk.", uuid);
|
logger.trace("SmartExecutor Task {} does not have any instaces associated. Cleaning the environment. That's all folk.", uuid);
|
||||||
scheduledJobs.remove(uuid);
|
scheduledJobs.remove(uuid);
|
||||||
throw new SchedulerNotFoundException("Scheduler Not Found");
|
throw new SchedulerNotFoundException("Scheduler Not Found");
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean interrupted = shedul.interrupt(jobKey);
|
boolean interrupted = scheduler.interrupt(jobKey);
|
||||||
shedul.deleteJob(jobKey);
|
scheduler.deleteJob(jobKey);
|
||||||
if (interrupted) {
|
if (interrupted) {
|
||||||
logger.debug("SmartExecutor Task {} interrupted successfully.", uuid);
|
logger.debug("SmartExecutor Task {} interrupted successfully.", uuid);
|
||||||
} else {
|
} else {
|
||||||
|
@ -245,7 +245,7 @@ public class SmartExecutorScheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
public LaunchParameter getLaunchParameter(JobKey jobKey) throws SchedulerException{
|
public LaunchParameter getLaunchParameter(JobKey jobKey) throws SchedulerException{
|
||||||
JobDetail jobDetail = shedul.getJobDetail(jobKey);
|
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
|
||||||
JobDataMap jobDataMap = jobDetail.getJobDataMap();
|
JobDataMap jobDataMap = jobDetail.getJobDataMap();
|
||||||
return (LaunchParameter) jobDataMap.get(SmartExecutorTask.LAUNCH_PARAMETER);
|
return (LaunchParameter) jobDataMap.get(SmartExecutorTask.LAUNCH_PARAMETER);
|
||||||
}
|
}
|
||||||
|
@ -287,7 +287,7 @@ public class SmartExecutorScheduler {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stopAll() {
|
public void stopAll() throws SchedulerException {
|
||||||
List<UUID> set = new ArrayList<UUID>(scheduledJobs);
|
List<UUID> set = new ArrayList<UUID>(scheduledJobs);
|
||||||
for (UUID uuid : set) {
|
for (UUID uuid : set) {
|
||||||
try {
|
try {
|
||||||
|
@ -297,6 +297,8 @@ public class SmartExecutorScheduler {
|
||||||
uuid, e);
|
uuid, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
scheduler.clear();
|
||||||
|
scheduler.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -209,7 +209,7 @@ public class SmartExecutorTask implements InterruptableJob {
|
||||||
if(isMaxExecutionNumberReached()){
|
if(isMaxExecutionNumberReached()){
|
||||||
logger.debug("The Scheduled Max Number of execution ({}) is reached. The SmartExecutor Task {} will be descheduled", maxExecutionNumber, uuid);
|
logger.debug("The Scheduled Max Number of execution ({}) is reached. The SmartExecutor Task {} will be descheduled", maxExecutionNumber, uuid);
|
||||||
try {
|
try {
|
||||||
deschedule(true);
|
unschedule(true);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new JobExecutionException(e);
|
throw new JobExecutionException(e);
|
||||||
}
|
}
|
||||||
|
@ -295,7 +295,7 @@ public class SmartExecutorTask implements InterruptableJob {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void deschedule(boolean globally)
|
protected void unschedule(boolean globally)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
SmartExecutorSchedulerFactory.getSmartExecutorScheduler().stop(uuid, globally);
|
SmartExecutorSchedulerFactory.getSmartExecutorScheduler().stop(uuid, globally);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue