package org.gcube.data.analysis.dataminermanagercl.server.dmservice.wps; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Serializable; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.zip.GZIPInputStream; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; import net.opengis.ows.x11.ExceptionReportDocument; import net.opengis.ows.x11.OperationDocument.Operation; import net.opengis.wps.x100.CapabilitiesDocument; import net.opengis.wps.x100.ExecuteDocument; import net.opengis.wps.x100.ExecuteResponseDocument; import net.opengis.wps.x100.ProcessBriefType; import net.opengis.wps.x100.ProcessDescriptionType; import net.opengis.wps.x100.ProcessDescriptionsDocument; import org.apache.commons.codec.binary.Base64; import org.apache.xmlbeans.XmlException; import org.apache.xmlbeans.XmlObject; import org.apache.xmlbeans.XmlOptions; import org.n52.wps.client.ClientCapabiltiesRequest; import org.n52.wps.client.WPSClientException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.Node; import org.xml.sax.SAXException; /** * * @author Giancarlo Panichi * * */ public class SClient4WPSSession implements Serializable { private static final long serialVersionUID = -1387670579312851370L; private static Logger logger = LoggerFactory.getLogger(SClient4WPSSession.class); private static final String OGC_OWS_URI = "http://www.opengeospatial.net/ows"; private static String SUPPORTED_VERSION = "1.0.0"; // private static StatWPSClientSession session; private HashMap loggedServices; private XmlOptions options = null; // a Map of public HashMap processDescriptions; private String user; private String password; /** * Initializes a WPS client session. * * @param user * user * @param password * password */ public SClient4WPSSession(String user, String password) { super(); logger.debug("Create SClient4WPSSession: [user=" + user + ", password=" + password + "]"); this.user = user; this.password = password; options = new XmlOptions(); options.setLoadStripWhitespace(); options.setLoadTrimTextBuffer(); loggedServices = new HashMap(); processDescriptions = new HashMap(); } /** * Connects to a WPS and retrieves Capabilities plus puts all available * Descriptions into cache. * * @param url * the entry point for the service. This is used as id for * further identification of the service. * @return true, if connect succeeded, false else. * @throws WPSClientException * WPSClientException */ public boolean connect(String url) throws WPSClientException { return connect(url, false); } public boolean connect(String url, boolean refresh) throws WPSClientException { logger.info("CONNECT: " + url); logger.info("Service refresh: " + refresh); if (refresh) { logger.debug("LoggedServices: " + loggedServices.keySet()); if (loggedServices.containsKey(url)) { loggedServices.remove(url); } if (processDescriptions.containsKey(url)) { processDescriptions.remove(url); } CapabilitiesDocument capsDoc = retrieveCapsViaGET(url); if (capsDoc != null) { logger.debug("Adding caps to logged services " + url); loggedServices.put(url, capsDoc); logger.debug("Logged Services key: " + loggedServices.keySet()); } else { logger.error("CapsDoc is null!"); } ProcessDescriptionsDocument processDescs = describeAllProcesses(url); if (processDescs != null && capsDoc != null) { logger.debug("Adding processes descriptions to logged services " + url); processDescriptions.put(url, processDescs); logger.debug("ProcessDescriptions key: " + processDescriptions.keySet()); return true; } else { logger.error("ProcessDescs is null!"); } logger.warn("retrieving caps failed, caps are null"); return false; } else { logger.debug("LoggedServices: " + loggedServices.keySet()); boolean registered = loggedServices.containsKey(url); if (registered) { logger.debug("Service already registered: " + url); return false; } else { logger.debug("Service not registered"); } CapabilitiesDocument capsDoc = retrieveCapsViaGET(url); if (capsDoc != null) { logger.debug("Adding caps to logged services " + url); loggedServices.put(url, capsDoc); logger.debug("Logged Services key: " + loggedServices.keySet()); } else { logger.error("CapsDoc is null!"); } ProcessDescriptionsDocument processDescs = describeAllProcesses(url); if (processDescs != null && capsDoc != null) { logger.debug("Adding processes descriptions to logged services " + url); processDescriptions.put(url, processDescs); logger.debug("ProcessDescriptions key: " + processDescriptions.keySet()); return true; } else { logger.error("ProcessDescs is null!"); } logger.warn("retrieving caps failed, caps are null"); return false; } } /** * Connects to a WPS and retrieves Capabilities plus puts all available * Descriptions into cache. * * @param url * the entry point for the service. This is used as id for * further identification of the service. * @return true, if connect succeeded, false else. * @throws WPSClientException * WPSClientException */ public boolean connectForMonitoring(String url) throws WPSClientException { logger.debug("CONNECT"); if (loggedServices.containsKey(url)) { logger.debug("Service already registered: " + url); return false; } logger.warn("retrieving caps failed, caps are null"); return false; } /** * removes a service from the session * * @param url * url */ public void disconnect(String url) { /* * if (loggedServices.containsKey(url)) { loggedServices.remove(url); * processDescriptions.remove(url); * logger.debug("service removed successfully: " + url); } */ } /** * returns the serverIDs of all loggedServices * * @return list of server ids */ public List getLoggedServices() { if (loggedServices != null && loggedServices.keySet() != null) { return new ArrayList(loggedServices.keySet()); } else { return new ArrayList(); } } /** * informs you if the descriptions for the specified service is already in * the session. in normal case it should return true :) * * @param serverID * server id * @return success */ public boolean descriptionsAvailableInCache(String serverID) { return processDescriptions.containsKey(serverID); } /** * Returns the cached processdescriptions of a service. * * * @param wpsUrl * url * @return process descriptions document * @throws IOException * IOException */ private ProcessDescriptionsDocument getProcessDescriptionsFromCache(String wpsUrl) throws IOException { if (!descriptionsAvailableInCache(wpsUrl)) { try { connect(wpsUrl); } catch (WPSClientException e) { throw new IOException("Could not initialize WPS " + wpsUrl); } } return processDescriptions.get(wpsUrl); } /** * * @param serverID * server id * @param processID * process id * @return a ProcessDescription for a specific process from Cache. * @throws IOException * IOException */ public ProcessDescriptionType getProcessDescription(String serverID, String processID) throws IOException { ProcessDescriptionType[] processes = getProcessDescriptionsFromCache(serverID).getProcessDescriptions() .getProcessDescriptionArray(); for (ProcessDescriptionType process : processes) { if (process.getIdentifier().getStringValue().equals(processID)) { return process; } } return null; } /** * Delivers all ProcessDescriptions from a WPS * * @param wpsUrl * the URL of the WPS * @return An Array of ProcessDescriptions * @throws IOException * IOException */ public ProcessDescriptionType[] getAllProcessDescriptions(String wpsUrl) throws IOException { return getProcessDescriptionsFromCache(wpsUrl).getProcessDescriptions().getProcessDescriptionArray(); } /** * looks up, if the service exists already in session. * * @param serverID * server id * @return true if registered */ public boolean serviceAlreadyRegistered(String serverID) { return loggedServices.containsKey(serverID); } /** * provides you the cached capabilities for a specified service. * * @param url * url * @return CapabilitiesDocument */ public CapabilitiesDocument getWPSCaps(String url) { return loggedServices.get(url); } /** * retrieves all current available ProcessDescriptions of a WPS. Mention: to * get the current list of all processes, which will be requested, the * cached capabilities will be used. Please keep that in mind. the retrieved * descriptions will not be cached, so only transient information! * * @param url * url * @return process descriptions document * @throws WPSClientException * WPSClientException */ public ProcessDescriptionsDocument describeAllProcesses(String url) throws WPSClientException { CapabilitiesDocument doc = loggedServices.get(url); if (doc == null) { logger.debug("serviceCaps are null, perhaps server does not exist"); return null; } ProcessBriefType[] processes = doc.getCapabilities().getProcessOfferings().getProcessArray(); String[] processIDs = new String[processes.length]; for (int i = 0; i < processIDs.length; i++) { processIDs[i] = processes[i].getIdentifier().getStringValue(); } return describeProcess(processIDs, url); } /** * retrieves the desired description for a service. the retrieved * information will not be held in cache! * * @param processIDs * one or more processIDs * @param serverID * server id * @return process description document * @throws WPSClientException * WPSClientExpcetion */ public ProcessDescriptionsDocument describeProcess(String[] processIDs, String serverID) throws WPSClientException { CapabilitiesDocument caps = this.loggedServices.get(serverID); Operation[] operations = caps.getCapabilities().getOperationsMetadata().getOperationArray(); String url = null; for (Operation operation : operations) { if (operation.getName().equals("DescribeProcess")) { url = operation.getDCPArray()[0].getHTTP().getGetArray()[0].getHref(); } } if (url == null) { throw new WPSClientException("Missing DescribeOperation in Capabilities"); } return retrieveDescriptionViaGET(processIDs, url); } /** * Executes a process at a WPS * * @param serverID * server id * @param execute * Execute document * @param rawData * true if is raw data * @return either an ExecuteResponseDocument or an InputStream if asked for * RawData or an Exception Report * @throws WPSClientException * WPSClientException */ private Object execute(String serverID, ExecuteDocument execute, boolean rawData) throws WPSClientException { CapabilitiesDocument caps = loggedServices.get(serverID); Operation[] operations = caps.getCapabilities().getOperationsMetadata().getOperationArray(); String url = null; for (Operation operation : operations) { if (operation.getName().equals("Execute")) { url = operation.getDCPArray()[0].getHTTP().getPostArray()[0].getHref(); } } if (url == null) { throw new WPSClientException( "Caps does not contain any information about the entry point for process execution"); } execute.getExecute().setVersion(SUPPORTED_VERSION); return retrieveExecuteResponseViaPOST(url, execute, rawData); } /** * Executes a process at a WPS * * * * @param serverID * server id * @param execute * exceute document * @return either an ExecuteResponseDocument or an InputStream if asked for * RawData or an Exception Report * @throws WPSClientException * WPSClientException */ public Object execute(String serverID, ExecuteDocument execute) throws WPSClientException { if (execute.getExecute().isSetResponseForm() == true && execute.getExecute().isSetResponseForm() == true && execute.getExecute().getResponseForm().isSetRawDataOutput() == true) { return execute(serverID, execute, true); } else { return execute(serverID, execute, false); } } private CapabilitiesDocument retrieveCapsViaGET(String url) throws WPSClientException { logger.debug("retrieveCapsViaGET: " + url); ClientCapabiltiesRequest req = new ClientCapabiltiesRequest(); url = req.getRequest(url); logger.debug("url req: " + url); try { String authString = user + ":" + password; logger.debug("auth string: " + authString); byte[] authEncBytes = Base64.encodeBase64(authString.getBytes()); String encoded = new String(authEncBytes); logger.debug("Base64 encoded auth string: " + encoded); URL urlObj = new URL(url); HttpURLConnection connection = (HttpURLConnection) urlObj.openConnection(); connection.setRequestMethod("GET"); connection.setDoOutput(true); connection.setRequestProperty("Authorization", "Basic " + encoded); logger.debug("Request: " + connection.toString()); InputStream is = connection.getInputStream(); Document doc = checkInputStream(is); CapabilitiesDocument capabilitiesDocument = CapabilitiesDocument.Factory.parse(doc, options); return capabilitiesDocument; } catch (MalformedURLException e) { logger.error(e.getLocalizedMessage(), e); throw new WPSClientException("Capabilities URL seems to be unvalid: " + url, e); } catch (IOException e) { logger.error(e.getLocalizedMessage(), e); throw new WPSClientException("Error occured while retrieving capabilities from url: " + url, e); } catch (XmlException e) { logger.error(e.getLocalizedMessage(), e); throw new WPSClientException("Error occured while parsing XML", e); } } private ProcessDescriptionsDocument retrieveDescriptionViaGET(String[] processIDs, String url) throws WPSClientException { try { logger.debug("RetrieveDescription GET: " + processIDs + " url:" + url); Path tempFile = Files.createTempFile("WPSProcessDescriptions", "txt"); List lines = new ArrayList<>(); lines.add(""); Files.write(tempFile, lines, Charset.defaultCharset(), StandardOpenOption.APPEND); for (String processId : processIDs) { String[] process = { processId }; DClientDescribeProcessRequest req = new DClientDescribeProcessRequest(); req.setIdentifier(process); String requestURL = req.getRequest(url); String authString = user + ":" + password; // logger.debug("auth string: " + authString); byte[] authEncBytes = Base64.encodeBase64(authString.getBytes()); String encoded = new String(authEncBytes); // logger.debug("Base64 encoded auth string: " + encoded); URL urlObj = new URL(requestURL); HttpURLConnection connection = (HttpURLConnection) urlObj.openConnection(); connection.setRequestMethod("GET"); connection.setDoOutput(true); connection.setRequestProperty("Authorization", "Basic " + encoded); InputStream is = connection.getInputStream(); lines = retrievesSingleDescription(is); Files.write(tempFile, lines, Charset.defaultCharset(), StandardOpenOption.APPEND); } lines = new ArrayList<>(); lines.add(""); Files.write(tempFile, lines, Charset.defaultCharset(), StandardOpenOption.APPEND); logger.debug(tempFile.toString()); Document doc = null; try (InputStream inputStream = Files.newInputStream(tempFile, StandardOpenOption.READ)) { doc = checkInputStream(inputStream); } ProcessDescriptionsDocument processDescriptionsDocument = ProcessDescriptionsDocument.Factory.parse(doc, options); Files.delete(tempFile); logger.debug("RetrieveDescriptions via GET: success"); return processDescriptionsDocument; } catch (MalformedURLException e) { logger.error("URL seems not to be valid: " + e.getLocalizedMessage(), e); throw new WPSClientException("URL seems not to be valid: " + e.getLocalizedMessage(), e); } catch (IOException e) { logger.error("Error occured while receiving data: " + e.getLocalizedMessage(), e); throw new WPSClientException("Error occured while receiving data: " + e.getLocalizedMessage(), e); } catch (XmlException e) { logger.error("Error occured while parsing ProcessDescription document: " + e.getLocalizedMessage(), e); throw new WPSClientException( "Error occured while parsing ProcessDescription document: " + e.getLocalizedMessage(), e); } catch (Throwable e) { logger.error(e.getLocalizedMessage(), e); throw new WPSClientException(e.getLocalizedMessage(), new Exception(e)); } } protected List retrievesSingleDescription(InputStream is) throws WPSClientException { try { BufferedReader br = new BufferedReader(new InputStreamReader(is)); List lines = new ArrayList<>(); String line = null; boolean elementProcessDescriptionsFound = false; boolean elementProcessDescriptionsClosureFound = false; while ((line = br.readLine()) != null) { if (elementProcessDescriptionsFound) { if (elementProcessDescriptionsClosureFound) { if (line.contains("")) { break; } else { lines.add(line); } } else { int closeIndex = line.indexOf(">"); if (closeIndex != -1) { elementProcessDescriptionsClosureFound = true; if (closeIndex == line.length() - 1) { } else { } } } } else { if (line.contains(""); if (closeIndex != -1) { elementProcessDescriptionsClosureFound = true; if (closeIndex == line.length() - 1) { } else { } } } else { } } } return lines; } catch (Throwable e) { logger.error(e.getLocalizedMessage()); e.printStackTrace(); throw new WPSClientException(e.getLocalizedMessage(), new Exception(e)); } } private InputStream retrieveDataViaPOST(XmlObject obj, String urlString) throws WPSClientException { try { logger.debug("RetrieveDataViaPost(): " + urlString); String authString = user + ":" + password; logger.debug("auth string: " + authString); byte[] authEncBytes = Base64.encodeBase64(authString.getBytes()); String encoded = new String(authEncBytes); logger.debug("Base64 encoded auth string: " + encoded); URL url = new URL(urlString); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestMethod("POST"); conn.setRequestProperty("Authorization", "Basic " + encoded); conn.setRequestProperty("Accept-Encoding", "gzip"); conn.setRequestProperty("Content-Type", "text/xml; charset=UTF-8"); conn.setDoOutput(true); obj.save(conn.getOutputStream()); InputStream input = null; String encoding = conn.getContentEncoding(); if (encoding != null && encoding.equalsIgnoreCase("gzip")) { logger.debug("Read encoding GZIP"); input = new GZIPInputStream(conn.getInputStream()); } else { logger.debug("Read encoding: " + encoding); input = conn.getInputStream(); } return input; } catch (MalformedURLException e) { e.printStackTrace(); throw new WPSClientException("URL seems to be unvalid", e); } catch (IOException e) { e.printStackTrace(); throw new WPSClientException("Error while transmission", e); } } private Document checkInputStream(InputStream is) throws WPSClientException { try { DocumentBuilderFactory fac = DocumentBuilderFactory.newInstance(); fac.setNamespaceAware(true); Document doc = fac.newDocumentBuilder().parse(is); logger.debug("Document: " + doc); if (doc == null) { logger.error("Document is null"); throw new WPSClientException("Error in check input stream: Document is null"); } if (getFirstElementNode(doc.getFirstChild()).getLocalName().equals("ExceptionReport") && getFirstElementNode(doc.getFirstChild()).getNamespaceURI().equals(OGC_OWS_URI)) { try { ExceptionReportDocument exceptionDoc = ExceptionReportDocument.Factory.parse(doc); logger.debug(exceptionDoc.xmlText(options)); throw new WPSClientException("Error occured while executing query", exceptionDoc); } catch (XmlException e) { throw new WPSClientException("Error while parsing ExceptionReport retrieved from server", e); } } else { logger.debug("No Exception Report"); } return doc; } catch (SAXException e) { logger.error("Error while parsing input: " + e.getLocalizedMessage()); e.printStackTrace(); throw new WPSClientException("Error while parsing input", e); } catch (IOException e) { logger.error("Error occured while transfer: " + e.getLocalizedMessage()); e.printStackTrace(); throw new WPSClientException("Error occured while transfer", e); } catch (ParserConfigurationException e) { logger.error("Error occured, parser is not correctly configured: " + e.getLocalizedMessage()); e.printStackTrace(); throw new WPSClientException("Error occured, parser is not correctly configured", e); } catch (WPSClientException e) { throw e; } } private Node getFirstElementNode(Node node) { if (node == null) { return null; } if (node.getNodeType() == Node.ELEMENT_NODE) { return node; } else { return getFirstElementNode(node.getNextSibling()); } } /** * either an ExecuteResponseDocument or an InputStream if asked for RawData * or an Exception Report * * @param url * @param doc * @param rawData * @return * @throws WPSClientException */ private Object retrieveExecuteResponseViaPOST(String url, ExecuteDocument doc, boolean rawData) throws WPSClientException { InputStream is = retrieveDataViaPOST(doc, url); if (rawData) { return is; } Document documentObj = checkInputStream(is); ExceptionReportDocument erDoc = null; try { return ExecuteResponseDocument.Factory.parse(documentObj); } catch (XmlException e) { try { erDoc = ExceptionReportDocument.Factory.parse(documentObj); } catch (XmlException e1) { throw new WPSClientException("Error occured while parsing executeResponse", e); } return erDoc; } } public String[] getProcessNames(String url) throws IOException { ProcessDescriptionType[] processes = getProcessDescriptionsFromCache(url).getProcessDescriptions() .getProcessDescriptionArray(); String[] processNames = new String[processes.length]; for (int i = 0; i < processNames.length; i++) { processNames[i] = processes[i].getIdentifier().getStringValue(); } return processNames; } /** * Executes a process at a WPS * * * @param urlString * url of server not the entry additionally defined in the caps. * @param executeAsGETString * KVP Execute request * @return either an ExecuteResponseDocument or an InputStream if asked for * RawData or an Exception Report * @throws WPSClientException * WSPClientException */ public Object executeViaGET(String urlString, String executeAsGETString) throws WPSClientException { urlString = urlString + executeAsGETString; logger.debug("ExecuteViaGet() Url: " + urlString); try { // TODO String authString = user + ":" + password; logger.debug("auth string: " + authString); byte[] authEncBytes = Base64.encodeBase64(authString.getBytes()); String encoded = new String(authEncBytes); logger.debug("Base64 encoded auth string: " + encoded); URL url = new URL(urlString); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestMethod("GET"); conn.setRequestProperty("Authorization", "Basic " + encoded); conn.setDoOutput(true); InputStream is = conn.getInputStream(); if (executeAsGETString.toUpperCase().contains("RAWDATA")) { logger.debug("ExecuteAsGETString as RAWDATA"); return is; } Document doc = checkInputStream(is); ExceptionReportDocument erDoc = null; logger.debug("ExecuteAsGETString as Document"); try { return ExecuteResponseDocument.Factory.parse(doc); } catch (XmlException e) { e.printStackTrace(); try { erDoc = ExceptionReportDocument.Factory.parse(doc); } catch (XmlException e1) { e1.printStackTrace(); throw new WPSClientException("Error occured while parsing executeResponse", e); } throw new WPSClientException("Error occured while parsing executeResponse", erDoc); } } catch (MalformedURLException e) { e.printStackTrace(); throw new WPSClientException("Capabilities URL seems to be unvalid: " + urlString, e); } catch (IOException e) { e.printStackTrace(); throw new WPSClientException("Error occured while retrieving capabilities from url: " + urlString, e); } } public String cancelComputation(String url, String computationId) throws WPSClientException { try { String authString = user + ":" + password; logger.debug("auth string: " + authString); byte[] authEncBytes = Base64.encodeBase64(authString.getBytes()); String encoded = new String(authEncBytes); logger.debug("Base64 encoded auth string: " + encoded); url += "?id=" + computationId; URL urlObj = new URL(url); HttpURLConnection connection = (HttpURLConnection) urlObj.openConnection(); connection.setRequestMethod("GET"); connection.setDoOutput(true); connection.setRequestProperty("Authorization", "Basic " + encoded); String responseMessage = connection.getResponseMessage(); return responseMessage; } catch (MalformedURLException e) { e.printStackTrace(); throw new WPSClientException("Capabilities URL seems to be unvalid: " + url, e); } catch (IOException e) { e.printStackTrace(); throw new WPSClientException("Error occured while retrieving capabilities from url: " + url, e); } } }