
754 lines
25 KiB
Raw Normal View History

package org.gcube.portlets.user.dataminermanager.server.dmservice.wps;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import net.opengis.ows.x11.ExceptionReportDocument;
import net.opengis.ows.x11.OperationDocument.Operation;
import net.opengis.wps.x100.CapabilitiesDocument;
import net.opengis.wps.x100.ExecuteDocument;
import net.opengis.wps.x100.ExecuteResponseDocument;
import net.opengis.wps.x100.ProcessBriefType;
import net.opengis.wps.x100.ProcessDescriptionType;
import net.opengis.wps.x100.ProcessDescriptionsDocument;
import org.apache.commons.codec.binary.Base64;
import org.apache.xmlbeans.XmlException;
import org.apache.xmlbeans.XmlObject;
import org.apache.xmlbeans.XmlOptions;
import org.n52.wps.client.ClientCapabiltiesRequest;
import org.n52.wps.client.WPSClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.xml.sax.SAXException;
* @author Giancarlo Panichi
public class SClient4WPSSession implements Serializable {
private static final long serialVersionUID = -1387670579312851370L;
private static Logger logger = LoggerFactory.getLogger(SClient4WPSSession.class);
private static final String OGC_OWS_URI = "";
private static String SUPPORTED_VERSION = "1.0.0";
// private static StatWPSClientSession session;
private HashMap<String, CapabilitiesDocument> loggedServices;
private XmlOptions options = null;
// a Map of <url, all available process descriptions>
public HashMap<String, ProcessDescriptionsDocument> processDescriptions;
private String user;
private String password;
* Initializes a WPS client session.
* @param user
* user
* @param password
* password
public SClient4WPSSession(String user, String password) {
logger.debug("Create SClient4WPSSession: [user=" + user + ", password=" + password + "]");
this.user = user;
this.password = password;
options = new XmlOptions();
loggedServices = new HashMap<String, CapabilitiesDocument>();
processDescriptions = new HashMap<String, ProcessDescriptionsDocument>();
* Connects to a WPS and retrieves Capabilities plus puts all available
* Descriptions into cache.
* @param url
* the entry point for the service. This is used as id for
* further identification of the service.
* @return true, if connect succeeded, false else.
* @throws WPSClientException
* wps client exception
public boolean connect(String url) throws WPSClientException {"CONNECT: " + url);
logger.debug("LoggedSevices: " + loggedServices.keySet());
if (loggedServices.containsKey(url)) {
logger.debug("Service already registered: " + url);
return false;
logger.debug("Service not registered");
CapabilitiesDocument capsDoc = retrieveCapsViaGET(url);
if (capsDoc != null) {
logger.debug("Adding caps to logged services " + url);
loggedServices.put(url, capsDoc);
logger.debug("Logged Services key: " + loggedServices.keySet());
} else {
logger.error("CapsDoc is null!");
ProcessDescriptionsDocument processDescs = describeAllProcesses(url);
if (processDescs != null && capsDoc != null) {
logger.debug("Adding processes descriptions to logged services " + url);
processDescriptions.put(url, processDescs);
logger.debug("ProcessDescriptions key: " + processDescriptions.keySet());
return true;
} else {
logger.error("ProcessDescs is null!");
logger.warn("retrieving caps failed, caps are null");
return false;
* Connects to a WPS and retrieves Capabilities plus puts all available
* Descriptions into cache.
* @param url
* the entry point for the service. This is used as id for
* further identification of the service.
* @return true, if connect succeeded, false else.
* @throws WPSClientException
* wps client exception
public boolean connectForMonitoring(String url) throws WPSClientException {
if (loggedServices.containsKey(url)) {
logger.debug("Service already registered: " + url);
return false;
logger.warn("retrieving caps failed, caps are null");
return false;
* removes a service from the session
* @param url
* url
public void disconnect(String url) {
* if (loggedServices.containsKey(url)) { loggedServices.remove(url);
* processDescriptions.remove(url);
*"service removed successfully: " + url); }
* returns the serverIDs of all loggedServices
* @return list of service
public List<String> getLoggedServices() {
if (loggedServices != null && loggedServices.keySet() != null) {
return new ArrayList<String>(loggedServices.keySet());
} else {
return new ArrayList<String>();
* informs you if the descriptions for the specified service is already in
* the session. in normal case it should return true :)
* @param serverID
* server id
* @return success if contain descriptions in cache
public boolean descriptionsAvailableInCache(String serverID) {
return processDescriptions.containsKey(serverID);
* returns the cached processdescriptions of a service.
* @param wpsUrl
* wps url
* @return process description document
* @throws IOException
* IO exception
private ProcessDescriptionsDocument getProcessDescriptionsFromCache(String wpsUrl) throws IOException {
if (!descriptionsAvailableInCache(wpsUrl)) {
try {
} catch (WPSClientException e) {
throw new IOException("Could not initialize WPS " + wpsUrl);
return processDescriptions.get(wpsUrl);
* return the processDescription for a specific process from Cache.
* @param serverID
* server id
* @param processID
* process id
* @return a ProcessDescription for a specific process from Cache.
* @throws IOException
* IO exception
public ProcessDescriptionType getProcessDescription(String serverID, String processID) throws IOException {
ProcessDescriptionType[] processes = getProcessDescriptionsFromCache(serverID).getProcessDescriptions()
for (ProcessDescriptionType process : processes) {
if (process.getIdentifier().getStringValue().equals(processID)) {
return process;
return null;
* Delivers all ProcessDescriptions from a WPS
* @param wpsUrl
* the URL of the WPS
* @return An Array of ProcessDescriptions
* @throws IOException
* IO exception
public ProcessDescriptionType[] getAllProcessDescriptions(String wpsUrl) throws IOException {
return getProcessDescriptionsFromCache(wpsUrl).getProcessDescriptions().getProcessDescriptionArray();
* looks up, if the service exists already in session.
* @param serverID
* service id
* @return true if service is registered
public boolean serviceAlreadyRegistered(String serverID) {
return loggedServices.containsKey(serverID);
* provides you the cached capabilities for a specified service.
* @param url
* service url
* @return capabilities document
public CapabilitiesDocument getWPSCaps(String url) {
return loggedServices.get(url);
* retrieves all current available ProcessDescriptions of a WPS. Mention: to
* get the current list of all processes, which will be requested, the
* cached capabilities will be used. Please keep that in mind. the retrieved
* descriptions will not be cached, so only transient information!
* @param url
* service url
* @return process descriptions document
* @throws WPSClientException
* WPS Client expcetion
public ProcessDescriptionsDocument describeAllProcesses(String url) throws WPSClientException {
CapabilitiesDocument doc = loggedServices.get(url);
if (doc == null) {
logger.warn("serviceCaps are null, perhaps server does not exist");
return null;
ProcessBriefType[] processes = doc.getCapabilities().getProcessOfferings().getProcessArray();
String[] processIDs = new String[processes.length];
for (int i = 0; i < processIDs.length; i++) {
processIDs[i] = processes[i].getIdentifier().getStringValue();
return describeProcess(processIDs, url);
* retrieves the desired description for a service. the retrieved
* information will not be held in cache!
* @param processIDs
* process ids
* @param serverID
* server id
* @return process description document
* @throws WPSClientException
* WPS Client expcetion
public ProcessDescriptionsDocument describeProcess(String[] processIDs, String serverID) throws WPSClientException {
CapabilitiesDocument caps = this.loggedServices.get(serverID);
Operation[] operations = caps.getCapabilities().getOperationsMetadata().getOperationArray();
String url = null;
for (Operation operation : operations) {
if (operation.getName().equals("DescribeProcess")) {
url = operation.getDCPArray()[0].getHTTP().getGetArray()[0].getHref();
if (url == null) {
throw new WPSClientException("Missing DescribeOperation in Capabilities");
return retrieveDescriptionViaGET(processIDs, url);
* Executes a process at a WPS
* @param url
* url of server not the entry additionally defined in the caps.
* @param execute
* Execute document
* @return either an ExecuteResponseDocument or an InputStream if asked for
* RawData or an Exception Report
* @param serverID
* server id
* @param execute
* execute document
* @param rawData
* true if raw data
* @return Object
* @throws WPSClientException
* WPS Client exception
private Object execute(String serverID, ExecuteDocument execute, boolean rawData) throws WPSClientException {
CapabilitiesDocument caps = loggedServices.get(serverID);
Operation[] operations = caps.getCapabilities().getOperationsMetadata().getOperationArray();
String url = null;
for (Operation operation : operations) {
if (operation.getName().equals("Execute")) {
url = operation.getDCPArray()[0].getHTTP().getPostArray()[0].getHref();
if (url == null) {
throw new WPSClientException(
"Caps does not contain any information about the entry point for process execution");
return retrieveExecuteResponseViaPOST(url, execute, rawData);
* Executes a process at a WPS
* @param serverId
* server id url of server not the entry additionally defined in
* the caps.
* @param execute
* Execute document
* @return either an ExecuteResponseDocument or an InputStream if asked for
* RawData or an Exception Report
* @throws WPSClientException
* WPS Client Exception
public Object execute(String serverId, ExecuteDocument execute) throws WPSClientException {
if (execute.getExecute().isSetResponseForm() == true && execute.getExecute().isSetResponseForm() == true
&& execute.getExecute().getResponseForm().isSetRawDataOutput() == true) {
return execute(serverId, execute, true);
} else {
return execute(serverId, execute, false);
private CapabilitiesDocument retrieveCapsViaGET(String url) throws WPSClientException {
logger.debug("retrieveCapsViaGET: " + url);
ClientCapabiltiesRequest req = new ClientCapabiltiesRequest();
url = req.getRequest(url);
try {
String authString = user + ":" + password;
logger.debug("auth string: " + authString);
byte[] authEncBytes = Base64.encodeBase64(authString.getBytes());
String encoded = new String(authEncBytes);
logger.debug("Base64 encoded auth string: " + encoded);
URL urlObj = new URL(url);
HttpURLConnection connection = (HttpURLConnection) urlObj.openConnection();
connection.setRequestProperty("Authorization", "Basic " + encoded);
InputStream is = connection.getInputStream();
Document doc = checkInputStream(is);
CapabilitiesDocument capabilitiesDocument = CapabilitiesDocument.Factory.parse(doc, options);
return capabilitiesDocument;
} catch (MalformedURLException e) {
throw new WPSClientException("Capabilities URL seems to be unvalid: " + url, e);
} catch (IOException e) {
throw new WPSClientException("Error occured while retrieving capabilities from url: " + url, e);
} catch (XmlException e) {
throw new WPSClientException("Error occured while parsing XML", e);
private ProcessDescriptionsDocument retrieveDescriptionViaGET(String[] processIDs, String url)
throws WPSClientException {
try {
logger.debug("RetrieveDescription GET: " + processIDs + " url:" + url);
Path tempFile = Files.createTempFile("WPSProcessDescriptions", "txt");
List<String> lines = new ArrayList<>();
lines.add("<wps:ProcessDescriptions xmlns:wps=\"\" "
+ "xmlns:xsi=\"\" "
+ "xmlns:ows=\"\" "
+ "xsi:schemaLocation=\" "
+ "\" " + "xml:lang=\"en-US\" "
+ "service=\"WPS\" version=\"1.0.0\">");
Files.write(tempFile, lines, Charset.defaultCharset(), StandardOpenOption.APPEND);
for (String processId : processIDs) {
String[] process = { processId };
DClientDescribeProcessRequest req = new DClientDescribeProcessRequest();
String requestURL = req.getRequest(url);
String authString = user + ":" + password;
logger.debug("auth string: " + authString);
byte[] authEncBytes = Base64.encodeBase64(authString.getBytes());
String encoded = new String(authEncBytes);
logger.debug("Base64 encoded auth string: " + encoded);
URL urlObj = new URL(requestURL);
HttpURLConnection connection = (HttpURLConnection) urlObj.openConnection();
connection.setRequestProperty("Authorization", "Basic " + encoded);
InputStream is = connection.getInputStream();
lines = retrievesSingleDescription(is);
Files.write(tempFile, lines, Charset.defaultCharset(), StandardOpenOption.APPEND);
lines = new ArrayList<>();
Files.write(tempFile, lines, Charset.defaultCharset(), StandardOpenOption.APPEND);
Document doc = null;
try (InputStream inputStream = Files.newInputStream(tempFile, StandardOpenOption.READ)) {
doc = checkInputStream(inputStream);
ProcessDescriptionsDocument processDescriptionsDocument = ProcessDescriptionsDocument.Factory.parse(doc,
return processDescriptionsDocument;
} catch (MalformedURLException e) {
logger.error("URL seems not to be valid");
throw new WPSClientException("URL seems not to be valid", e);
} catch (IOException e) {
logger.error("Error occured while receiving data");
throw new WPSClientException("Error occured while receiving data", e);
} catch (XmlException e) {
logger.error("Error occured while parsing ProcessDescription document");
throw new WPSClientException("Error occured while parsing ProcessDescription document", e);
} catch (Throwable e) {
throw new WPSClientException(e.getLocalizedMessage(), new Exception(e));
protected List<String> retrievesSingleDescription(InputStream is) throws WPSClientException {
try {
BufferedReader br = new BufferedReader(new InputStreamReader(is));
List<String> lines = new ArrayList<>();
String line = null;
boolean elementProcessDescriptionsFound = false;
boolean elementProcessDescriptionsClosureFound = false;
while ((line = br.readLine()) != null) {
if (elementProcessDescriptionsFound) {
if (elementProcessDescriptionsClosureFound) {
if (line.contains("</wps:ProcessDescriptions>")) {
} else {
} else {
int closeIndex = line.indexOf(">");
if (closeIndex != -1) {
elementProcessDescriptionsClosureFound = true;
if (closeIndex == line.length() - 1) {
} else {
} else {
if (line.contains("<wps:ProcessDescriptions")) {
elementProcessDescriptionsFound = true;
int closeIndex = line.indexOf(">");
if (closeIndex != -1) {
elementProcessDescriptionsClosureFound = true;
if (closeIndex == line.length() - 1) {
} else {
} else {
return lines;
} catch (Throwable e) {
throw new WPSClientException(e.getLocalizedMessage(), new Exception(e));
private InputStream retrieveDataViaPOST(XmlObject obj, String urlString) throws WPSClientException {
try {
logger.debug("RetrieveDataViaPost(): " + urlString);
String authString = user + ":" + password;
logger.debug("auth string: " + authString);
byte[] authEncBytes = Base64.encodeBase64(authString.getBytes());
String encoded = new String(authEncBytes);
logger.debug("Base64 encoded auth string: " + encoded);
URL url = new URL(urlString);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty("Authorization", "Basic " + encoded);
conn.setRequestProperty("Accept-Encoding", "gzip");
conn.setRequestProperty("Content-Type", "text/xml; charset=UTF-8");
InputStream input = null;
String encoding = conn.getContentEncoding();
if (encoding != null && encoding.equalsIgnoreCase("gzip")) {
logger.debug("Read encoding GZIP");
input = new GZIPInputStream(conn.getInputStream());
} else {
logger.debug("Read encoding: "+encoding);
input = conn.getInputStream();
return input;
} catch (MalformedURLException e) {
throw new WPSClientException("URL seems to be unvalid", e);
} catch (IOException e) {
throw new WPSClientException("Error while transmission", e);
private Document checkInputStream(InputStream is) throws WPSClientException {
try {
DocumentBuilderFactory fac = DocumentBuilderFactory.newInstance();
Document doc = fac.newDocumentBuilder().parse(is);
logger.debug("Document: " + doc);
if (doc == null) {
logger.error("Document is null");
throw new WPSClientException("Error in check input stream: Document is null");
if (getFirstElementNode(doc.getFirstChild()).getLocalName().equals("ExceptionReport")
&& getFirstElementNode(doc.getFirstChild()).getNamespaceURI().equals(OGC_OWS_URI)) {
try {
ExceptionReportDocument exceptionDoc = ExceptionReportDocument.Factory.parse(doc);
throw new WPSClientException("Error occured while executing query", exceptionDoc);
} catch (XmlException e) {
throw new WPSClientException("Error while parsing ExceptionReport retrieved from server", e);
} else {
logger.debug("No Exception Report");
return doc;
} catch (SAXException e) {
logger.error("Error while parsing input: " + e.getLocalizedMessage());
throw new WPSClientException("Error while parsing input", e);
} catch (IOException e) {
logger.error("Error occured while transfer: " + e.getLocalizedMessage());
throw new WPSClientException("Error occured while transfer", e);
} catch (ParserConfigurationException e) {
logger.error("Error occured, parser is not correctly configured: " + e.getLocalizedMessage());
throw new WPSClientException("Error occured, parser is not correctly configured", e);
} catch (WPSClientException e) {
throw e;
private Node getFirstElementNode(Node node) {
if (node == null) {
return null;
if (node.getNodeType() == Node.ELEMENT_NODE) {
return node;
} else {
return getFirstElementNode(node.getNextSibling());
private Object retrieveExecuteResponseViaPOST(String url, ExecuteDocument doc, boolean rawData)
throws WPSClientException {
InputStream is = retrieveDataViaPOST(doc, url);
if (rawData) {
return is;
Document documentObj = checkInputStream(is);
ExceptionReportDocument erDoc = null;
try {
return ExecuteResponseDocument.Factory.parse(documentObj);
} catch (XmlException e) {
try {
erDoc = ExceptionReportDocument.Factory.parse(documentObj);
} catch (XmlException e1) {
throw new WPSClientException("Error occured while parsing executeResponse", e);
return erDoc;
public String[] getProcessNames(String url) throws IOException {
ProcessDescriptionType[] processes = getProcessDescriptionsFromCache(url).getProcessDescriptions()
String[] processNames = new String[processes.length];
for (int i = 0; i < processNames.length; i++) {
processNames[i] = processes[i].getIdentifier().getStringValue();
return processNames;
* Executes a process at a WPS
* @param urlString
* url of server not the entry additionally defined in the caps.
* @param executeAsGETString
* KVP Execute request
* @return either an ExecuteResponseDocument or an InputStream if asked for
* RawData or an Exception Report
* @throws WPSClientException
* WPS Client exception
public Object executeViaGET(String urlString, String executeAsGETString) throws WPSClientException {
urlString = urlString + executeAsGETString;
logger.debug("ExecuteViaGet() Url: " + urlString);
try {
String authString = user + ":" + password;
logger.debug("auth string: " + authString);
byte[] authEncBytes = Base64.encodeBase64(authString.getBytes());
String encoded = new String(authEncBytes);
logger.debug("Base64 encoded auth string: " + encoded);
URL url = new URL(urlString);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty("Authorization", "Basic " + encoded);
InputStream is = conn.getInputStream();
if (executeAsGETString.toUpperCase().contains("RAWDATA")) {
logger.debug("ExecuteAsGETString as RAWDATA");
return is;
Document doc = checkInputStream(is);
ExceptionReportDocument erDoc = null;
logger.debug("ExecuteAsGETString as Document");
try {
return ExecuteResponseDocument.Factory.parse(doc);
} catch (XmlException e) {
try {
erDoc = ExceptionReportDocument.Factory.parse(doc);
} catch (XmlException e1) {
throw new WPSClientException("Error occured while parsing executeResponse", e);
throw new WPSClientException("Error occured while parsing executeResponse", erDoc);
} catch (MalformedURLException e) {
throw new WPSClientException("Capabilities URL seems to be unvalid: " + urlString, e);
} catch (IOException e) {
throw new WPSClientException("Error occured while retrieving capabilities from url: " + urlString, e);
public String cancelComputation(String url, String computationId) throws WPSClientException {
try {
String authString = user + ":" + password;
logger.debug("auth string: " + authString);
byte[] authEncBytes = Base64.encodeBase64(authString.getBytes());
String encoded = new String(authEncBytes);
logger.debug("Base64 encoded auth string: " + encoded);
url += "?id=" + computationId;
URL urlObj = new URL(url);
HttpURLConnection connection = (HttpURLConnection) urlObj.openConnection();
connection.setRequestProperty("Authorization", "Basic " + encoded);
String responseMessage = connection.getResponseMessage();
return responseMessage;
} catch (MalformedURLException e) {
throw new WPSClientException("Capabilities URL seems to be unvalid: " + url, e);
} catch (IOException e) {
throw new WPSClientException("Error occured while retrieving capabilities from url: " + url, e);