You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
sdi-service/src/main/java/org/gcube/spatial/data/sdi/engine/impl/cache/AbstractISModule.java

157 lines
6.3 KiB
Java

package org.gcube.spatial.data.sdi.engine.impl.cache;
import java.util.ArrayList;
import java.util.List;
import org.gcube.common.resources.gcore.GCoreEndpoint;
import org.gcube.common.resources.gcore.ServiceEndpoint;
import org.gcube.common.resources.gcore.ServiceEndpoint.Profile;
import org.gcube.common.resources.gcore.common.Platform;
import org.gcube.spatial.data.geonetwork.utils.ScopeUtils;
import org.gcube.spatial.data.sdi.LocalConfiguration;
import org.gcube.spatial.data.sdi.engine.impl.faults.InvalidServiceDefinitionException;
import org.gcube.spatial.data.sdi.engine.impl.faults.ServiceRegistrationException;
import org.gcube.spatial.data.sdi.model.health.Level;
import org.gcube.spatial.data.sdi.model.health.ServiceHealthReport;
import org.gcube.spatial.data.sdi.model.health.Status;
import org.gcube.spatial.data.sdi.model.services.ServiceDefinition;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class AbstractISModule<T> implements ISModule<T> {
protected abstract String getGCoreEndpointServiceClass();
protected abstract String getGCoreEndpointServiceName();
protected abstract String getServiceEndpointAccessPointName();
protected abstract String getServiceEndpointCategory();
protected abstract String getServiceEndpointPlatformName();
protected abstract String getManagedServiceType();
@Override
public ServiceHealthReport getHealthReport() {
List<Status> checkStatuses=new ArrayList<>();
try {
log.trace("Checking {} heatlh under context {} ",getManagedServiceType(),ScopeUtils.getCurrentScope());
//Check if existing
List<GCoreEndpoint> gCoreEndpoints=getGcoreEndpoints();
List<ServiceEndpoint> serviceEndpoints=getServiceEndpoints();
log.debug("Found {} GC Endpoints and {} SE Endpoints",gCoreEndpoints.size(),serviceEndpoints.size());
if(serviceEndpoints.isEmpty())
if(gCoreEndpoints.isEmpty())checkStatuses.add(new Status("No "+getManagedServiceType()+" found in context "+ScopeUtils.getCurrentScope(),Level.ERROR));
else checkStatuses.add(new Status("Unregistered "+getManagedServiceType()+" instances found. Check following messages",Level.ERROR));
//For each GC check for missing SE
for(GCoreEndpoint gc:gCoreEndpoints) {
String hostname= gc.profile().endpoints().iterator().next().uri().getHost();
if(ISUtils.getGCEByHostname(hostname, serviceEndpoints)==null) {
String msg="Found unregistered "+getManagedServiceType()+" hosted on "+hostname;
log.debug(msg);
checkStatuses.add(new Status(msg,Level.WARNING));
}
}
for(ServiceEndpoint se : serviceEndpoints) {
checkStatuses.addAll(performInstanceCheck(se));
}
}catch(Throwable t) {
log.error("Unable to perform checks", t);
checkStatuses.add(new Status("Internal error while checking "+getManagedServiceType()+" Status.",Level.ERROR));
}
return new ServiceHealthReport(checkStatuses);
}
protected abstract List<Status> performInstanceCheck(ServiceEndpoint se);
protected List<GCoreEndpoint> getGcoreEndpoints(){
String geClass=getGCoreEndpointServiceClass();
String geName=getGCoreEndpointServiceName();
return ISUtils.queryForGCoreEndpoint(geClass, geName);
}
protected List<ServiceEndpoint> getServiceEndpoints(){
String seCategory=getServiceEndpointCategory();
String sePlatform=getServiceEndpointPlatformName();
return ISUtils.queryForServiceEndpoints(seCategory, sePlatform);
}
@Override
@Synchronized
public String registerService(ServiceDefinition definition) throws ServiceRegistrationException {
log.info("Registering {} ",definition);
log.debug("Checking definition type..");
checkDefinitionType(definition);
log.debug("Checking IS ..");
checkDefinition(definition);
log.debug("Performing type specific checks..");
checkDefinitionForServiceType(definition);
log.debug("Preparing ServiceEndpoint.. ");
ServiceEndpoint ep=prepareEndpoint(definition);
log.debug("Publishing resource..");
String id=ISUtils.registerService(ep);
List<String> registered=null;
long registrationTime=System.currentTimeMillis();
long timeout=Long.parseLong(LocalConfiguration.get().getProperty(LocalConfiguration.IS_REGISTRATION_TIMEOUT));
do{
log.debug("Waiting for IS to update. Passed {} ms.",(System.currentTimeMillis()-registrationTime));
try{Thread.sleep(500);
}catch(Exception e) {}
registered=ISUtils.queryById(id);
}while(registered.isEmpty()&&((System.currentTimeMillis()-registrationTime)<=timeout));
if(registered.isEmpty()) {
log.warn("Registered resource [ID :{}] was not found before Timeout of {} ms. Returning id. ",id,timeout);
return id;
}else return registered.get(0);
}
protected abstract void checkDefinitionForServiceType(ServiceDefinition definition) throws InvalidServiceDefinitionException;
protected abstract void checkDefinitionType(ServiceDefinition definition) throws InvalidServiceDefinitionException;
protected void checkDefinition(ServiceDefinition definition) throws ServiceRegistrationException {
String hostname=definition.getHostname();
List<ServiceEndpoint> serviceEndpoints=getServiceEndpoints();
ServiceEndpoint existing=ISUtils.getGCEByHostname(hostname, serviceEndpoints);
if(existing!=null) {
throw new ServiceRegistrationException("Service is already registered");
}
List<GCoreEndpoint> gCoreNodes=getGcoreEndpoints();
GCoreEndpoint running=ISUtils.getGCEByHostname(hostname, gCoreNodes);
if(running==null) throw new ServiceRegistrationException("No GCoreEndpoint found for "+definition);
}
protected ServiceEndpoint prepareEndpoint(ServiceDefinition definition) {
ServiceEndpoint toCreate=new ServiceEndpoint();
Profile profile=toCreate.newProfile();
profile.category(getServiceEndpointCategory());
profile.description(definition.getDescription());
Platform platform=profile.newPlatform();
platform.name(getServiceEndpointPlatformName()).
version(definition.getMajorVersion()).
minorVersion(definition.getMinorVersion()).
revisionVersion(definition.getReleaseVersion());
org.gcube.common.resources.gcore.ServiceEndpoint.Runtime runtime=profile.newRuntime();
runtime.hostedOn(definition.getHostname());
GCoreEndpoint relatedGHN=ISUtils.getGCEByHostname(definition.getHostname(), getGcoreEndpoints());
runtime.ghnId(relatedGHN.id());
runtime.status("READY");
return toCreate;
}
}