package org.gcube.smartgears.handlers.container.lifecycle; import static java.util.concurrent.TimeUnit.SECONDS; import static org.gcube.common.events.Observes.Kind.critical; import static org.gcube.common.events.Observes.Kind.resilient; import static org.gcube.smartgears.Constants.profile_management; import static org.gcube.smartgears.handlers.ProfileEvents.changed; import static org.gcube.smartgears.lifecycle.container.ContainerLifecycle.activation; import static org.gcube.smartgears.lifecycle.container.ContainerLifecycle.failure; import static org.gcube.smartgears.lifecycle.container.ContainerLifecycle.part_activation; import static org.gcube.smartgears.lifecycle.container.ContainerLifecycle.shutdown; import static org.gcube.smartgears.lifecycle.container.ContainerLifecycle.stop; import static org.gcube.smartgears.lifecycle.container.ContainerState.active; import java.util.Collections; import java.util.List; import java.util.concurrent.ScheduledFuture; import org.gcube.common.events.Observes; import org.gcube.smartgears.configuration.Mode; import org.gcube.smartgears.context.Property; import org.gcube.smartgears.context.container.ContainerContext; import org.gcube.smartgears.handlers.container.ContainerHandler; import org.gcube.smartgears.handlers.container.ContainerLifecycleEvent; import org.gcube.smartgears.lifecycle.container.ContainerLifecycle; import org.gcube.smartgears.lifecycle.container.ContainerState; import org.gcube.smartgears.managers.ContextEvents; import org.gcube.smartgears.provider.ProviderFactory; import org.gcube.smartgears.publishing.Publisher; import org.gcube.smartgears.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * Manages the resource profile of the application. *

* * The manager: * *

* * @author Fabio Simeoni * @see ProfileBuilder * @see ProfilePublisherImpl */ public class ContainerProfileManager extends ContainerHandler { Logger log = LoggerFactory.getLogger(ContainerProfileManager.class); private ContainerContext context; private ScheduledFuture periodicUpdates; private static final String PUBLISHED_PROP = "published"; private List publishers; @Override public void onStart(ContainerLifecycleEvent.Start e) { context = e.context(); activated(); // note we don't fire profile events, but wait for the final startup // outcome which // will result in a state change. only then we publish and store the // profile // this avoids the redundancy and performance penalty of storing and // publishing multiple // times in rapid succession (which would be correct). Revise if proves // problematic in corner // cases. } private void activated() { publishers = context.configuration().mode() != Mode.offline ? ProviderFactory.provider().publishers() : Collections.emptyList(); registerObservers(); schedulePeriodicUpdates(); } private void registerObservers() { context.events().subscribe(new Object() { @Observes({ activation, part_activation, shutdown, stop, failure }) void onChanged(ContainerLifecycle lc) { // since we do not know the observers, they will deal with failures and their // consequences // any that comes back will be logged in this event thread context.events().fire(context, changed); } @Observes(value = changed, kind = critical) void publishAfterChange(ContainerContext context) { log.info("Publish after profile Change event called -- contains published prop? {}",context.properties().contains(PUBLISHED_PROP)); // if we've failed before first publication do not try to publish // (we may well have failed there) if (context.lifecycle().state() != ContainerState.failed) { if (!context.properties().contains(PUBLISHED_PROP)) { context.properties().add(new Property(PUBLISHED_PROP, true)); log.info("publishing container for the first time"); publishers.parallelStream().forEach(p -> { try { p.create(context, context.authorizationProvider().getContexts()); } catch (Throwable e) { log.error( "cannot publish container for first time with publisher type {} (see details)", p.getClass().getCanonicalName(), e); } }); } else publishers.parallelStream().forEach(p -> { try { p.update(context); } catch (Throwable e) { log.error("cannot publish container with publisher type {} (see details)", p.getClass().getCanonicalName(), e); } }); } } @Observes(value = ContextEvents.ADD_CONTEXT_TO_CONTAINER) void addTo(String scope) { log.info("add_to_context event arrived in container"); for (Publisher publisher : publishers) try { log.trace("publishing container within new scope"); publisher.create(context, Collections.singleton(scope)); } catch (Exception e) { log.error("cannot add container to {} with publisher type {} (see details)", scope, publisher.getClass().getCanonicalName(), e); // since we've failed no published event is fired and profile // will not be stored. // we do it manually to ensure we leave some local trace of the // changed profile. // TODO: CHECK --- store(profile); } } @Observes(value = ContextEvents.REMOVE_CONTEXT_FROM_CONTAINER) void removeFrom(String scope) { log.info("remove_from_context event arrived in container"); for (Publisher publisher : publishers) try { log.trace("unpublishing container from context {}", scope); publisher.remove(context, Collections.singleton(scope)); } catch (Exception e) { log.error("cannot remove container from {} with publisher type {} (see details)", scope, publisher.getClass().getCanonicalName(), e); // since we've failed no published event is fired and profile // will not be stored. // we do it manually to ensure we leave some local trace of the // changed profile. // TODO: CHECK --- store(profile); } } }); } private void schedulePeriodicUpdates() { // register to cancel updates context.events().subscribe(new Object() { // we register it in response to lifecycle events so that we can stop and resume // along with application @Observes(value = { activation, part_activation }, kind = resilient) synchronized void restartPeriodicUpdates(ContainerLifecycle lc) { // already running if (periodicUpdates != null) return; if (lc.state() == active) log.info("scheduling periodic updates of container profile"); else log.info("resuming periodic updates of container profile"); final Runnable updateTask = new Runnable() { public void run() { context.events().fire(context, changed); } }; periodicUpdates = Utils.scheduledServicePool.scheduleAtFixedRate(updateTask, 3, context.configuration().publicationFrequency(), SECONDS); } @Observes(value = { stop, failure, shutdown }, kind = resilient) synchronized void cancelPeriodicUpdates(ContainerLifecycle ignore) { if (periodicUpdates != null) { log.trace("stopping periodic updates of container profile"); try { periodicUpdates.cancel(true); periodicUpdates = null; } catch (Exception e) { log.warn("could not stop periodic updates of container profile", e); } } } }); } @Override public String toString() { return profile_management; } }