package org.gcube.data.transfer.plugins.thredds; import static org.gcube.resources.discovery.icclient.ICFactory.client; import static org.gcube.resources.discovery.icclient.ICFactory.clientFor; import static org.gcube.resources.discovery.icclient.ICFactory.queryFor; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.List; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.gcube.common.authorization.library.provider.SecurityTokenProvider; import org.gcube.common.encryption.StringEncrypter; import org.gcube.common.resources.gcore.Resource; import org.gcube.common.resources.gcore.Resources; import org.gcube.common.resources.gcore.ServiceEndpoint; import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint; import org.gcube.common.resources.gcore.ServiceEndpoint.Profile; import org.gcube.common.resources.gcore.common.Platform; import org.gcube.common.resources.gcore.utils.Group; import org.gcube.data.transfer.model.plugins.thredds.ThreddsInfo; import org.gcube.data.transfer.plugin.model.DataTransferContext; import org.gcube.informationsystem.publisher.RegistryPublisher; import org.gcube.informationsystem.publisher.RegistryPublisherFactory; import org.gcube.resources.discovery.client.api.DiscoveryClient; import org.gcube.resources.discovery.client.queries.api.SimpleQuery; import org.gcube.resources.discovery.client.queries.impl.QueryBox; import org.gcube.smartgears.configuration.application.ApplicationConfiguration; import org.xml.sax.SAXException; import lombok.Synchronized; import lombok.extern.slf4j.Slf4j; @Slf4j public class ThreddsInstanceManager { protected static ThreddsInstanceManager instance=null; protected static final ConcurrentSkipListSet checkedTokens=new ConcurrentSkipListSet<>(); @Synchronized public static ThreddsInstanceManager get(DataTransferContext ctx) { if(instance==null) instance= new ThreddsInstanceManager(ctx); return instance; } // ***************** INSTANCE LOGIC protected ThreddsInfo cachedInfo=null; protected ExecutorService executor=Executors.newSingleThreadExecutor(); //loaded at construction time protected DataTransferContext ctx=null; protected ApplicationConfiguration threddsConfig=null; protected String threddsAdminUser; protected String threddsAdminPassword; protected String threddsPersistenceLocation; protected String threddsVersionString; protected String currentHostname; protected String currentGHNId; protected ThreddsInstanceManager(DataTransferContext context) { log.warn("Instance Creation. Should happen only once. Loading information from context.."); this.ctx=context; try { log.info("Loading proxy configuration.."); currentHostname=ctx.getCtx().configuration().proxyAddress().hostname(); if(currentHostname==null||currentHostname.isEmpty()) throw new Exception("Proxy is : "+currentHostname); }catch(Exception e) { log.info("Unable to get proxy..",e); currentHostname=ctx.getCtx().container().configuration().hostname(); } log.info("Hostname to be used is "+currentHostname); currentGHNId=ctx.getCtx().container().id(); String tomcatSecurityPath=System.getenv("WEB_CONTAINER_HOME")+"/conf/tomcat-users.xml"; log.info("Loading security from {} ",tomcatSecurityPath); try{ TomcatSecurityHandler tomcatHandler=new TomcatSecurityHandler(tomcatSecurityPath); threddsAdminUser=tomcatHandler.getThreddsAdminUser(); threddsAdminPassword=tomcatHandler.getThreddsAdminPassword(); }catch(Exception e) { throw new RuntimeException("Unable to parse security file "+tomcatSecurityPath,e); } log.info("Looking for Thredds Application Configuration.. "); //Use Future Future future=executor.submit(new ApplicationConfigurationRetriever(ctx)); try { threddsConfig=future.get(); if(threddsConfig==null) throw new Exception("Returned Application Configuration is null"); threddsPersistenceLocation=threddsConfig.persistence().location(); threddsVersionString=threddsConfig.version(); }catch(Exception e) { throw new RuntimeException("Unable to find Application Configuration for thredds.",e); } } /** * * * @return * @throws IOException * @throws SAXException * @throws Exception */ public synchronized ThreddsInfo getInfo() throws SAXException, IOException { if(cachedInfo==null) { log.info("Loading ThreddsInfo.."); String threddsContentRoot=getContentRoot(); log.debug("Found content root at {} ",threddsContentRoot); //Host ThreddsInfo info=new ThreddsInfo(); info.setHostname(currentHostname); info.setGhnId(currentGHNId); info.setLocalBasePath(threddsPersistenceLocation); info.setInstanceBaseUrl("https://"+info.getHostname()+"/thredds"); String mainCatalogPath=threddsContentRoot+"/catalog.xml"; log.info("Loading catalog information from {} ",mainCatalogPath); XMLCatalogHandler handler=new XMLCatalogHandler(new File(mainCatalogPath)); info.setCatalog(handler.getCatalogDescriptor()); //tomcat security info.setAdminPassword(threddsAdminPassword); info.setAdminUser(threddsAdminUser); //version String[] splittedVersion=threddsVersionString.split("\\."); info.setVersion(Integer.parseInt(splittedVersion[0])); info.setMinor(Integer.parseInt(splittedVersion[1])); info.setRevision(Integer.parseInt(splittedVersion[2])); log.info("Loaded ThreddsInfo is {} ",info); cachedInfo=info; } return cachedInfo; } public synchronized void clearCache() { log.debug("Clearing cache.."); cachedInfo=null; } public String getCurrentHostname() { return currentHostname; } public String getMainCatalogFile(){ return getContentRoot()+"/catalog.xml"; } public String getContentRoot() { return threddsPersistenceLocation; } public XMLCatalogHandler mainCatalogHandler() throws SAXException, IOException, Exception { return new XMLCatalogHandler(new File(getMainCatalogFile())); } @Synchronized public void updatePublishedInfo() throws Exception { String token=SecurityTokenProvider.instance.get(); log.info("Checking IS with token {} ",token); if(!checkedTokens.contains(token)) { checkedTokens.add(token); getInfo(); String currentHostname=cachedInfo.getHostname(); log.info("Checking IS Information, host is {}",currentHostname); List currentEndpoints=queryForServiceEndpoints(LocalConfiguration.getProperty(LocalConfiguration.THREDDS_SE_CATEGORY), LocalConfiguration.getProperty(LocalConfiguration.THREDDS_SE_PLATFORM)); ServiceEndpoint toCheck=null; //Checking by host log.debug("Found {} Service Endpoints, checking by hostname {} ",currentEndpoints.size()); for(ServiceEndpoint se:currentEndpoints) { String host=se.profile().runtime().hostedOn(); try{ if(isSameHost(host, currentHostname)) {toCheck=se; break;} }catch(Throwable t) { log.warn("Unable to check Host {} ",host,t); } } if(toCheck==null) { log.info("ServiceEndpoint not found, going to create one.."); // CREATE NEW ServiceEndpoint newSE=getNewServiceEndpoint(); updateAndWait(newSE, true); }else { // Check found boolean updateSE=true; String adminAPName=LocalConfiguration.getProperty(LocalConfiguration.THREDDS_SE_REMOTE_MANAGEMENT_ACCESS); log.debug("Looking for Access Point {} ",adminAPName); Group existentAP=toCheck.profile().accessPoints(); boolean addAccessPoint=true; for(AccessPoint ap:existentAP) if(ap.name().equals(adminAPName)) { addAccessPoint=false; // FOUND AP String pwd=decryptString(ap.password()); if(ap.username().equalsIgnoreCase(threddsAdminUser)&&pwd.equalsIgnoreCase(threddsAdminPassword)) { log.info("ServiceEndopint is up to date."); updateSE=false; }else { // AP is not up to date ap.credentials(threddsAdminPassword, threddsAdminUser); } } if(updateSE) { log.debug("Need to update SE... "); if(addAccessPoint) { log.debug("Access point {} not found. Adding it.. ",adminAPName); existentAP.add(getNewAccessPoint()); } ServiceEndpoint updated=updateAndWait(toCheck,false); log.info("Updated {} ",updated); } } }else log.info("Skipping token {}, already checked.",token); } private static String registerServiceEndpoint(ServiceEndpoint toRegister) { RegistryPublisher rp=RegistryPublisherFactory.create(); Resource r=rp.create(toRegister); return r.id(); } public static ServiceEndpoint update(ServiceEndpoint toUpdate) { RegistryPublisher rp=RegistryPublisherFactory.create(); return rp.update(toUpdate); } private ServiceEndpoint getNewServiceEndpoint() { ServiceEndpoint toReturn=new ServiceEndpoint(); Profile profile=toReturn.newProfile(); profile.category(LocalConfiguration.getProperty(LocalConfiguration.THREDDS_SE_CATEGORY)); profile.name("Thredds on "+cachedInfo.getHostname()); profile.description("Thredds on "+cachedInfo.getHostname()); // TODO Gather info on version Platform platform=profile.newPlatform(); platform.version((short)cachedInfo.getVersion()); platform.minorVersion((short)cachedInfo.getMinor()); platform.revisionVersion((short)cachedInfo.getRevision()); platform.buildVersion((short)cachedInfo.getBuild()); platform.name(LocalConfiguration.getProperty(LocalConfiguration.THREDDS_SE_PLATFORM)); org.gcube.common.resources.gcore.ServiceEndpoint.Runtime runtime=profile.newRuntime(); runtime.ghnId(cachedInfo.getGhnId()); runtime.hostedOn(cachedInfo.getHostname()); runtime.status("READY"); profile.accessPoints().add(getNewAccessPoint()); return toReturn; } private static ServiceEndpoint updateAndWait(ServiceEndpoint toUpdate,boolean isNew) { boolean equals=true; boolean timeoutReached=false; long timeout=LocalConfiguration.getTTL(LocalConfiguration.IS_REGISTRATION_TIMEOUT); log.info("Going to register {}. Timeout is {} ",toUpdate.id(),timeout); String toUpdateString=marshal(toUpdate); log.debug("Serialized resource is {} ",toUpdateString); if(isNew) registerServiceEndpoint(toUpdate); else update(toUpdate); long updateTime=System.currentTimeMillis(); String updatedString=null; do { try { Thread.sleep(500); } catch (InterruptedException e) {} List byIdResults=queryById(toUpdate.id()); if(byIdResults.isEmpty()) { equals=false; }else { updatedString=byIdResults.get(0); equals=toUpdateString.equals(updatedString); } timeoutReached=(System.currentTimeMillis()-updateTime)>timeout; }while(equals&&(!timeoutReached)); if(timeoutReached) log.warn("Timeout reached. Check if {} is updated ",toUpdate.id()); return querySEById(toUpdate.id()); } public static List queryById(String id) { DiscoveryClient client = client(); String queryString ="declare namespace ic = 'http://gcube-system.org/namespaces/informationsystem/registry'; "+ "for $profiles in collection('/db/Profiles')//Document/Data/ic:Profile/Resource "+ "where $profiles/ID/text() eq '"+id+"'"+ " return $profiles"; return client.submit(new QueryBox(queryString)); } public static ServiceEndpoint querySEById(String id) { SimpleQuery query = queryFor(ServiceEndpoint.class); query.addCondition("$resource/ID/text() eq '"+id+"'"); DiscoveryClient client = clientFor(ServiceEndpoint.class); return client.submit(query).get(0); } public static String marshal(Resource res) { ByteArrayOutputStream stream=new ByteArrayOutputStream(); Resources.marshal(res, stream); return stream.toString(); } private AccessPoint getNewAccessPoint() { AccessPoint toReturn=new AccessPoint(); toReturn.credentials(encrypt(threddsAdminPassword), threddsAdminUser); toReturn.description("Thredds Remote Management credentials"); toReturn.name(LocalConfiguration.getProperty(LocalConfiguration.THREDDS_SE_REMOTE_MANAGEMENT_ACCESS)); toReturn.address("https://"+getCurrentHostname()+"/thredds/admin/debug?catalogs/reinit"); return toReturn; } static String decryptString(String toDecrypt){ try{ return StringEncrypter.getEncrypter().decrypt(toDecrypt); }catch(Exception e) { throw new RuntimeException("Unable to decrypt.",e); } } static String encrypt(String toEncrypt) { try{ return StringEncrypter.getEncrypter().encrypt(toEncrypt); }catch(Exception e) { throw new RuntimeException("Unable to Encrypt.",e); } } static List queryForServiceEndpoints(String category, String platformName){ log.debug("Querying for Service Endpoints [category : {} , platformName : {}]",category,platformName); SimpleQuery query = queryFor(ServiceEndpoint.class); query.addCondition("$resource/Profile/Category/text() eq '"+category+"'") .addCondition("$resource/Profile/Platform/Name/text() eq '"+platformName+"'"); // .setResult("$resource/Profile/AccessPoint"); DiscoveryClient client = clientFor(ServiceEndpoint.class); return client.submit(query); } static boolean isSameHost(String toTestHost,String toLookForHost) throws UnknownHostException { log.debug("Checking same hosts {},{}",toTestHost,toLookForHost); if(toTestHost.equalsIgnoreCase(toLookForHost)) return true; else { InetAddress[] toTestHostIPs=InetAddress.getAllByName(toTestHost); InetAddress[] toLookForHostIPs=InetAddress.getAllByName(toLookForHost); log.debug("Checking IPs. ToTestIPs {}, ToLookForIPs {} ",toTestHostIPs,toLookForHostIPs); for(InetAddress toTestIP:toTestHostIPs) { for(InetAddress toLookForIP:toLookForHostIPs) if(toTestIP.equals(toLookForIP)) return true; } } log.debug("HOSTS are different."); return false; } }