Fixing behaviour
This commit is contained in:
parent
b315c845e2
commit
4a109b9459
|
@ -1,5 +1,6 @@
|
||||||
package org.gcube.vremanagement.executor;
|
package org.gcube.vremanagement.executor;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -93,7 +94,14 @@ public class SmartExecutorInitializator implements ApplicationManager {
|
||||||
try {
|
try {
|
||||||
logger.debug("Going to get Orphan Scheduled Tasks in scope {}", context);
|
logger.debug("Going to get Orphan Scheduled Tasks in scope {}", context);
|
||||||
|
|
||||||
List<ScheduledTask> scheduledTasks = smartExecutorPersistenceConnector.getScheduledTasks(pluginManager.getAvailablePlugins().keySet(), true);
|
List<ScheduledTask> gotScheduledTasks = smartExecutorPersistenceConnector.getScheduledTasks(pluginManager.getAvailablePlugins().keySet());
|
||||||
|
List<ScheduledTask> scheduledTasks = new ArrayList<>();
|
||||||
|
for(ScheduledTask scheduledTask : gotScheduledTasks) {
|
||||||
|
if(smartExecutorPersistenceConnector.isOrphan(scheduledTask, true)) {
|
||||||
|
scheduledTasks.add(scheduledTask);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if(scheduledTasks.size()==0){
|
if(scheduledTasks.size()==0){
|
||||||
logger.debug("No Orphan Scheduled Tasks this instance can take in charge in scope {}", context);
|
logger.debug("No Orphan Scheduled Tasks this instance can take in charge in scope {}", context);
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,7 +54,7 @@ public abstract class SmartExecutorPersistenceConnector extends PluginStateNotif
|
||||||
*/
|
*/
|
||||||
public abstract PluginStateEvolution getPluginInstanceState(UUID uuid, Integer iterationNumber) throws PluginInstanceNotFoundException, ExecutorException;
|
public abstract PluginStateEvolution getPluginInstanceState(UUID uuid, Integer iterationNumber) throws PluginInstanceNotFoundException, ExecutorException;
|
||||||
|
|
||||||
protected boolean isOrphan(ScheduledTask scheduledTask) throws ExecutorException {
|
public boolean isOrphan(ScheduledTask scheduledTask, boolean sameHost) throws ExecutorException {
|
||||||
try {
|
try {
|
||||||
UUID uuid = scheduledTask.getUUID();
|
UUID uuid = scheduledTask.getUUID();
|
||||||
|
|
||||||
|
@ -64,19 +64,20 @@ public abstract class SmartExecutorPersistenceConnector extends PluginStateNotif
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
HostingNode hostingNode = ContextProvider.get().container().profile(HostingNode.class);
|
if(sameHost) {
|
||||||
String hnAddress = hostingNode.profile().description().name();
|
HostingNode hostingNode = ContextProvider.get().container().profile(HostingNode.class);
|
||||||
|
String hnAddress = hostingNode.profile().description().name();
|
||||||
if(runOn.getHostingNode().getAddress().compareTo(hnAddress)==0){
|
|
||||||
return true;
|
if(runOn.getHostingNode().getAddress().compareTo(hnAddress)==0){
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}catch (Exception e) {
|
}catch (Exception e) {
|
||||||
logger.error("Unable to check if current hosting node is the same of the one in ScheduledTask", e);
|
logger.error("Unable to check if current hosting node is the same of the one in ScheduledTask", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
String address = runOn.getEService().getAddress();
|
String address = runOn.getEService().getAddress();
|
||||||
String pluginName = scheduledTask.getLaunchParameter()
|
String pluginName = scheduledTask.getLaunchParameter().getPluginName();
|
||||||
.getPluginName();
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
SmartExecutorClientImpl smartExecutorClient = new SmartExecutorClientImpl();
|
SmartExecutorClientImpl smartExecutorClient = new SmartExecutorClientImpl();
|
||||||
|
|
|
@ -181,7 +181,7 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ScheduledTask> getScheduledTasks(Collection<String> plugins, boolean onlyOrphan)
|
public List<ScheduledTask> getScheduledTasks(Collection<String> plugins)
|
||||||
throws SchedulePersistenceException {
|
throws SchedulePersistenceException {
|
||||||
ODatabaseSession oDatabaseSession = null;
|
ODatabaseSession oDatabaseSession = null;
|
||||||
try {
|
try {
|
||||||
|
@ -215,20 +215,8 @@ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnec
|
||||||
|
|
||||||
for(ODocument doc : result) {
|
for(ODocument doc : result) {
|
||||||
String json = doc.toJSON("class");
|
String json = doc.toJSON("class");
|
||||||
|
|
||||||
ScheduledTask scheduledTask = ExtendedSEMapper.getInstance().unmarshal(ScheduledTask.class, json);
|
ScheduledTask scheduledTask = ExtendedSEMapper.getInstance().unmarshal(ScheduledTask.class, json);
|
||||||
try {
|
scheduledTasks.add(scheduledTask);
|
||||||
if(onlyOrphan) {
|
|
||||||
if(isOrphan(scheduledTask)) {
|
|
||||||
scheduledTasks.add(scheduledTask);
|
|
||||||
}
|
|
||||||
}else {
|
|
||||||
scheduledTasks.add(scheduledTask);
|
|
||||||
}
|
|
||||||
} catch(Exception e) {
|
|
||||||
logger.error("An Exception occurred while evaluating if {} is orphan", json, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return scheduledTasks;
|
return scheduledTasks;
|
||||||
|
|
|
@ -101,17 +101,29 @@ public class RestSmartExecutor {
|
||||||
PluginManager pluginManager = PluginManager.getInstance();
|
PluginManager pluginManager = PluginManager.getInstance();
|
||||||
|
|
||||||
List<String> plugins = new ArrayList<>();
|
List<String> plugins = new ArrayList<>();
|
||||||
|
|
||||||
boolean orphan = false;
|
boolean orphan = false;
|
||||||
|
|
||||||
if(pluginName.compareTo(RestConstants.ORPHAN_PATH_PARAM)!=0) {
|
if(pluginName.compareTo(RestConstants.ORPHAN_PATH_PARAM)!=0) {
|
||||||
plugins.add(pluginName);
|
plugins.add(pluginName);
|
||||||
|
|
||||||
}else {
|
}else {
|
||||||
plugins.addAll(pluginManager.getAvailablePlugins().keySet());
|
// plugins.addAll(pluginManager.getAvailablePlugins().keySet());
|
||||||
orphan = true;
|
orphan = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<ScheduledTask> scheduledTasks = persistenceConnector.getScheduledTasks(plugins, orphan);
|
List<ScheduledTask> gotScheduledTasks = persistenceConnector.getScheduledTasks(plugins);
|
||||||
|
|
||||||
|
List<ScheduledTask> scheduledTasks;
|
||||||
|
|
||||||
|
if(orphan) {
|
||||||
|
scheduledTasks = new ArrayList<>();
|
||||||
|
for(ScheduledTask scheduledTask : gotScheduledTasks) {
|
||||||
|
if(persistenceConnector.isOrphan(scheduledTask, false)) {
|
||||||
|
scheduledTasks.add(scheduledTask);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}else {
|
||||||
|
scheduledTasks = gotScheduledTasks;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Using SEMapper because the server must not return sensitive information like token
|
* Using SEMapper because the server must not return sensitive information like token
|
||||||
|
|
|
@ -22,7 +22,7 @@ public interface ScheduledTaskPersistence {
|
||||||
* @throws SchedulePersistenceException
|
* @throws SchedulePersistenceException
|
||||||
* if fails
|
* if fails
|
||||||
*/
|
*/
|
||||||
public List<ScheduledTask> getScheduledTasks(Collection<String> plugins, boolean onlyOrphan)
|
public List<ScheduledTask> getScheduledTasks(Collection<String> plugins)
|
||||||
throws SchedulePersistenceException;
|
throws SchedulePersistenceException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -78,7 +78,7 @@ public class SmartExecutorPersistenceConnectorTest extends ContextTest {
|
||||||
|
|
||||||
List<String> plugins = new ArrayList<>();
|
List<String> plugins = new ArrayList<>();
|
||||||
plugins.add("hello-world-se-plugin");
|
plugins.add("hello-world-se-plugin");
|
||||||
List<ScheduledTask> lc = stc.getScheduledTasks(plugins, true);
|
List<ScheduledTask> lc = stc.getScheduledTasks(plugins);
|
||||||
|
|
||||||
logger.debug("Available Scheduled Tasks : {}", lc);
|
logger.debug("Available Scheduled Tasks : {}", lc);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue