/*
 * Source: resource-registry-handlers/src/main/java/org/gcube/smartgears/handler/resourceregistry/HostingNodeHandler.java
 *
 * Repository listing metadata: 264 lines, 9.6 KiB, Java
 */

package org.gcube.smartgears.handler.resourceregistry;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.gcube.common.events.Observes.Kind.resilient;
import static org.gcube.smartgears.handlers.ProfileEvents.addToContext;
import static org.gcube.smartgears.handlers.ProfileEvents.removeFromContext;
import static org.gcube.smartgears.lifecycle.container.ContainerLifecycle.activation;
import static org.gcube.smartgears.lifecycle.container.ContainerLifecycle.failure;
import static org.gcube.smartgears.lifecycle.container.ContainerLifecycle.part_activation;
import static org.gcube.smartgears.lifecycle.container.ContainerLifecycle.shutdown;
import static org.gcube.smartgears.lifecycle.container.ContainerLifecycle.stop;
import static org.gcube.smartgears.lifecycle.container.ContainerState.active;
import static org.gcube.smartgears.utils.Utils.rethrowUnchecked;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import javax.xml.bind.annotation.XmlRootElement;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import org.gcube.common.events.Observes;
import org.gcube.informationsystem.model.reference.entities.Facet;
import org.gcube.informationsystem.model.reference.entities.Resource;
import org.gcube.informationsystem.model.reference.relations.ConsistsOf;
import org.gcube.informationsystem.resourceregistry.api.exceptions.ResourceRegistryException;
import org.gcube.resourcemanagement.model.reference.entities.facets.StateFacet;
import org.gcube.resourcemanagement.model.reference.entities.resources.HostingNode;
import org.gcube.smartgears.context.Property;
import org.gcube.smartgears.context.container.ContainerContext;
import org.gcube.smartgears.handler.resourceregistry.resourcemanager.HostingNodeManager;
import org.gcube.smartgears.handlers.container.ContainerHandler;
import org.gcube.smartgears.handlers.container.ContainerLifecycleEvent.Start;
import org.gcube.smartgears.lifecycle.container.ContainerLifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Manages the {@link HostingNode} {@link Resource} of the container.
 * <p>
 * The handler:
 * </p>
 * <ul>
 * <li>asks the {@link HostingNodeManager} to create the {@link HostingNode}
 * {@link Resource} (and, presumably, the facets it {@link ConsistsOf}) when
 * the container starts;</li>
 * <li>asks the {@link HostingNodeManager} to update the facets (e.g. the
 * {@link StateFacet}) whenever the container lifecycle changes;</li>
 * <li>schedules a periodic update of the {@link Facet}s, which is cancelled
 * when the container stops, fails or shuts down and scheduled again when it
 * is (re)activated.</li>
 * </ul>
 *
 * @author Luca Frosini (ISTI-CNR)
 */
@XmlRootElement(name = Constants.RESOURCE_MANAGEMENT)
public class HostingNodeHandler extends ContainerHandler {

    private static final Logger logger = LoggerFactory.getLogger(HostingNodeHandler.class);

    /** Context of the container, captured from the start event. */
    private ContainerContext containerContext;

    /** Handle of the periodic update currently scheduled, or {@code null} when none is scheduled. */
    private ScheduledFuture<?> periodicUpdates;

    /** Manager performing the actual operations on the {@link HostingNode}; set by {@link #init()}. */
    protected HostingNodeManager hostingNodeManager;

    public HostingNodeHandler() {
        super();
    }

    /**
     * Initialises the handler: publishes the {@link HostingNode}, subscribes
     * the lifecycle/context observers and sets up the periodic updates.
     * <p>
     * Any failure is logged and not propagated; the steps following the
     * failed one are skipped.
     * </p>
     *
     * @param event the container start event, providing the container context
     */
    @Override
    public void onStart(Start event) {
        try {
            logger.info("{} onStart started", this.getClass().getSimpleName());
            this.containerContext = event.context();
            init();
            registerObservers();
            schedulePeriodicUpdates();
            logger.info("{} onStart terminated", this.getClass().getSimpleName());
        } catch (Throwable re) {
            logger.error("onStart failed", re);
        }
    }

    /**
     * Removes the resource from every context it currently belongs to which
     * is not one of the start contexts.
     *
     * @param startContexts    UUIDs of the contexts derived from the start tokens
     * @param resourceContexts UUIDs of the contexts the resource currently belongs to;
     *                         this set is not modified
     * @throws ResourceRegistryException if raised while removing the resource from a context
     */
    protected void removeResourceFromOldContexts(Set<UUID> startContexts, Set<UUID> resourceContexts)
            throws ResourceRegistryException {
        // work on a copy so that the caller's set is left untouched
        Set<UUID> contextsToRemove = new HashSet<>(resourceContexts);
        contextsToRemove.removeAll(startContexts);
        for (UUID contextToRemove : contextsToRemove) {
            hostingNodeManager.removeFromContext(contextToRemove);
        }
    }

    /**
     * Creates the {@link HostingNodeManager}, publishes the {@link HostingNode}
     * using the first start token, adds it to the contexts of the remaining
     * start tokens and removes it from any other context it belongs to.
     * <p>
     * The thread context class loader and the previously set token are
     * restored before returning, whatever the outcome. Failures are rethrown
     * through {@code rethrowUnchecked}.
     * </p>
     */
    private void init() {
        ClassLoader contextCL = Thread.currentThread().getContextClassLoader();
        String previousToken = SecurityTokenProvider.instance.get();
        try {
            Thread.currentThread().setContextClassLoader(HostingNodeHandler.class.getClassLoader());

            List<String> startTokens = containerContext.configuration().startTokens();
            if (startTokens == null || startTokens.isEmpty()) {
                // fail with an explicit message instead of a bare NoSuchElementException/NPE
                throw new IllegalStateException(
                        "No start token configured: cannot publish " + HostingNode.NAME);
            }

            String firstToken = startTokens.iterator().next();
            ContextUtility.setContextFromToken(firstToken);
            hostingNodeManager = new HostingNodeManager(containerContext);

            Set<UUID> startContextsUUID = new HashSet<>();
            // the resource is created once (first token) and then added to the other contexts
            boolean create = true;
            for (String token : startTokens) {
                UUID contextUUID = ContextUtility.getContextUUID(token);
                startContextsUUID.add(contextUUID);
                if (create) {
                    hostingNodeManager.createHostingNode();
                    containerContext.properties()
                            .add(new Property(Constants.HOSTING_NODE_MANAGER_PROPERTY, hostingNodeManager));
                    create = false;
                } else {
                    hostingNodeManager.addToContext(contextUUID);
                }
            }

            Set<UUID> resourceContextsUUID = hostingNodeManager.getContextsUUID().keySet();
            removeResourceFromOldContexts(startContextsUUID, resourceContextsUUID);
        } catch (Throwable e) {
            rethrowUnchecked(e);
        } finally {
            ContextUtility.setContextFromToken(previousToken);
            Thread.currentThread().setContextClassLoader(contextCL);
        }
        logger.info("{} init() terminated", this.getClass().getSimpleName());
    }

    /**
     * Subscribes an observer which updates the facets at container lifecycle
     * changes and adds/removes the {@link HostingNode} to/from a context when
     * the corresponding profile event is fired.
     * <p>
     * Every callback temporarily switches the thread context class loader and
     * the current token, restoring both before returning; failures are logged
     * and not propagated.
     * </p>
     */
    private void registerObservers() {
        containerContext.events().subscribe(new Object() {

            @Observes({ activation, part_activation, shutdown, stop, failure })
            void onChanged(ContainerLifecycle cl) {
                ClassLoader contextCL = Thread.currentThread().getContextClassLoader();
                String previousToken = SecurityTokenProvider.instance.get();
                try {
                    Thread.currentThread().setContextClassLoader(HostingNodeHandler.class.getClassLoader());
                    if (previousToken == null) {
                        // no token set on this thread: fall back to the first start token
                        String token = containerContext.configuration().startTokens().iterator().next();
                        ContextUtility.setContextFromToken(token);
                    }
                    hostingNodeManager.updateFacets();
                } catch (Exception e) {
                    logger.error("Failed to update {} State", HostingNode.NAME, e);
                } finally {
                    ContextUtility.setContextFromToken(previousToken);
                    Thread.currentThread().setContextClassLoader(contextCL);
                }
            }

            @Observes(value = addToContext)
            void addTo(String token) {
                ClassLoader contextCL = Thread.currentThread().getContextClassLoader();
                String previousToken = SecurityTokenProvider.instance.get();
                try {
                    Thread.currentThread().setContextClassLoader(HostingNodeHandler.class.getClassLoader());
                    ContextUtility.setContextFromToken(token);
                    UUID contextUUID = ContextUtility.getContextUUID(token);
                    hostingNodeManager.addToContext(contextUUID);
                } catch (Exception e) {
                    logger.error("Failed to add {} to context", HostingNode.NAME, e);
                } finally {
                    ContextUtility.setContextFromToken(previousToken);
                    Thread.currentThread().setContextClassLoader(contextCL);
                }
            }

            @Observes(value = removeFromContext)
            void removeFrom(String token) {
                ClassLoader contextCL = Thread.currentThread().getContextClassLoader();
                String previousToken = SecurityTokenProvider.instance.get();
                try {
                    Thread.currentThread().setContextClassLoader(HostingNodeHandler.class.getClassLoader());
                    ContextUtility.setContextFromToken(token);
                    UUID contextUUID = ContextUtility.getContextUUID(token);
                    hostingNodeManager.removeFromContext(contextUUID);
                } catch (Exception e) {
                    logger.error("Failed to remove {} from context", HostingNode.NAME, e);
                } finally {
                    ContextUtility.setContextFromToken(previousToken);
                    Thread.currentThread().setContextClassLoader(contextCL);
                }
            }
        });
    }

    /**
     * Subscribes an observer which schedules the periodic update of the
     * facets when the container is (part-)activated and cancels it when the
     * container stops, fails or shuts down.
     */
    private void schedulePeriodicUpdates() {
        // register to cancel updates
        containerContext.events().subscribe(new Object() {

            // Not final: the executor is shut down when updates are cancelled
            // and must be replaced before updates can be scheduled again.
            ScheduledExecutorService service = Executors.newScheduledThreadPool(1);

            // we register it in response to lifecycle events so that we can
            // stop and resume along with application
            @Observes(value = { activation, part_activation }, kind = resilient)
            synchronized void restartPeriodicUpdates(ContainerLifecycle cl) {
                // already running
                if (periodicUpdates != null) {
                    return;
                }

                if (cl.state() == active) {
                    logger.info("Scheduling periodic updates of {}", HostingNode.NAME);
                } else {
                    logger.info("Resuming periodic updates of {}", HostingNode.NAME);
                }

                final Runnable updateTask = new Runnable() {
                    @Override
                    public void run() {
                        String previousToken = SecurityTokenProvider.instance.get();
                        try {
                            // The token setup is inside the try on purpose: an exception
                            // escaping run() would make the executor suppress every
                            // subsequent execution of the periodic task.
                            if (previousToken == null) {
                                String token = containerContext.configuration().startTokens().iterator().next();
                                ContextUtility.setContextFromToken(token);
                            }
                            hostingNodeManager.updateFacets();
                        } catch (Exception e) {
                            logger.error("Cannot complete periodic update of {}", HostingNode.NAME, e);
                        } finally {
                            ContextUtility.setContextFromToken(previousToken);
                        }
                    }
                };

                // a previous cancellation shut the executor down: a terminated
                // executor rejects new tasks, so a fresh one is needed to resume
                if (service.isShutdown()) {
                    service = Executors.newScheduledThreadPool(1);
                }

                // first run after 3 seconds, then at the configured publication frequency
                periodicUpdates = service.scheduleAtFixedRate(updateTask, 3,
                        containerContext.configuration().publicationFrequency(), SECONDS);
            }

            @Observes(value = { stop, failure, shutdown }, kind = resilient)
            synchronized void cancelPeriodicUpdates(ContainerLifecycle cl) {
                if (periodicUpdates == null) {
                    return;
                }
                logger.trace("Stopping periodic updates of {}", HostingNode.NAME);
                try {
                    periodicUpdates.cancel(true);
                    service.shutdownNow();
                } catch (Exception e) {
                    logger.warn("Could not stop periodic updates of {}", HostingNode.NAME, e);
                } finally {
                    // always drop the stale handle, otherwise a later activation
                    // would wrongly consider the updates as already running
                    periodicUpdates = null;
                }
            }
        });
    }

    /**
     * @return the name used for this handler, i.e. {@code Constants.RESOURCE_MANAGEMENT}
     */
    @Override
    public String toString() {
        return Constants.RESOURCE_MANAGEMENT;
    }
}