This commit is contained in:
Lucio Lelii 2017-07-11 15:42:42 +00:00
parent dbfa5fbffd
commit ab1032edf3
6 changed files with 1253 additions and 4 deletions

View File

@ -0,0 +1,836 @@
package org.gcube.data.analysis.wps;
/**
* Handles an ExecuteRequest
*/
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import net.opengis.ows.x11.BoundingBoxType;
import net.opengis.ows.x11.ExceptionType;
import net.opengis.wps.x100.ComplexDataType;
import net.opengis.wps.x100.DataInputsType;
import net.opengis.wps.x100.DocumentOutputDefinitionType;
import net.opengis.wps.x100.ExecuteDocument;
import net.opengis.wps.x100.ExecuteDocument.Execute;
import net.opengis.wps.x100.InputDescriptionType;
import net.opengis.wps.x100.InputReferenceType;
import net.opengis.wps.x100.InputType;
import net.opengis.wps.x100.LiteralDataType;
import net.opengis.wps.x100.OutputDefinitionType;
import net.opengis.wps.x100.OutputDescriptionType;
import net.opengis.wps.x100.ProcessDescriptionType;
import net.opengis.wps.x100.ResponseDocumentType;
import net.opengis.wps.x100.ResponseFormType;
import net.opengis.wps.x100.StatusType;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.commons.io.IOUtils;
import org.apache.xmlbeans.XmlException;
import org.apache.xmlbeans.XmlObject;
import org.apache.xmlbeans.XmlOptions;
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping.AbstractEcologicalEngineMapper;
import org.n52.wps.commons.context.ExecutionContext;
import org.n52.wps.commons.context.ExecutionContextFactory;
import org.n52.wps.io.data.IComplexData;
import org.n52.wps.io.data.IData;
import org.n52.wps.server.AbstractTransactionalAlgorithm;
import org.n52.wps.server.ExceptionReport;
import org.n52.wps.server.IAlgorithm;
import org.n52.wps.server.RepositoryManager;
import org.n52.wps.server.database.DatabaseFactory;
import org.n52.wps.server.observerpattern.IObserver;
import org.n52.wps.server.observerpattern.ISubject;
import org.n52.wps.server.request.InputHandler;
import org.n52.wps.server.request.Request;
import org.n52.wps.server.response.Response;
import org.n52.wps.util.XMLBeansHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
public class ExecuteRequest extends Request implements IObserver {
private static Logger LOGGER = LoggerFactory.getLogger(ExecuteRequest.class);
private ExecuteDocument execDom;
private Map<String, IData> returnResults;
private ExecuteResponseBuilder execRespType;
/**
* Creates an ExecuteRequest based on a Document (HTTP_POST)
*
* @param doc
* The clients submission
* @throws ExceptionReport
*/
public ExecuteRequest(Document doc) throws ExceptionReport {
super(doc);
initWpsID();
try {
LOGGER.debug("Preparing the ExecuteRequest for POST");
XmlOptions option = new XmlOptions();
option.setLoadTrimTextBuffer();
LOGGER.debug("Parsing document");
this.execDom = ExecuteDocument.Factory.parse(doc, option);
if (this.execDom == null) {
LOGGER.error("ExecuteDocument is null");
LOGGER.debug("EXCEPTION ExecuteDocument is null");
throw new ExceptionReport("Error while parsing post data", ExceptionReport.MISSING_PARAMETER_VALUE);
}
} catch (XmlException e) {
LOGGER.debug("EXCEPTION Error while parsing post data" + ExceptionReport.MISSING_PARAMETER_VALUE + e.getLocalizedMessage());
throw new ExceptionReport("Error while parsing post data", ExceptionReport.MISSING_PARAMETER_VALUE, e);
}
LOGGER.debug("Validating document");
// validate the client input
// validate();
LOGGER.debug("Document OK");
// create an initial response
execRespType = new ExecuteResponseBuilder(this);
LOGGER.debug("Response Builder Ready");
storeRequest(execDom);
LOGGER.debug("Request Stored");
}
/*
* Creates an ExecuteRequest based on a Map (HTTP_GET). NOTE: Parameters are
* treated as non case sensitive. @param ciMap The client input @throws
* ExceptionReport
*/
public ExecuteRequest(CaseInsensitiveMap ciMap) throws ExceptionReport {
super(ciMap);
initWpsID();
initForGET(ciMap);
// validate the client input
validate();
// create an initial response
execRespType = new ExecuteResponseBuilder(this);
storeRequest(ciMap);
}
public void getKVPDataInputs() {
}
/**
* @param ciMap
*/
private void initForGET(CaseInsensitiveMap ciMap) throws ExceptionReport {
String version = getMapValue("version", ciMap, true);
if (!version.equals(Request.SUPPORTED_VERSION)) {
throw new ExceptionReport("request version is not supported: " + version, ExceptionReport.VERSION_NEGOTIATION_FAILED);
}
this.execDom = ExecuteDocument.Factory.newInstance();
Execute execute = execDom.addNewExecute();
String processID = getMapValue("Identifier", true);
if (!RepositoryManager.getInstance().containsAlgorithm(processID)) {
throw new ExceptionReport("Process does not exist", ExceptionReport.INVALID_PARAMETER_VALUE);
}
execute.addNewIdentifier().setStringValue(processID);
DataInputsType dataInputs = execute.addNewDataInputs();
String dataInputString = getMapValue("DataInputs", true);
dataInputString = dataInputString.replace("&amp;", "&");
String[] inputs = dataInputString.split(";");
// Handle data inputs
for (String inputString : inputs) {
int position = inputString.indexOf("=");
if (position == -1) {
throw new ExceptionReport("No \"=\" supplied for attribute: " + inputString, ExceptionReport.MISSING_PARAMETER_VALUE);
}
// get name
String key = inputString.substring(0, position);
String value = null;
if (key.length() + 1 < inputString.length()) {
// BS int valueDelimiter = inputString.indexOf("@");
int valueDelimiter = inputString.indexOf("@");
if (valueDelimiter != -1 && position + 1 < valueDelimiter) {
value = inputString.substring(position + 1, valueDelimiter);
} else {
value = inputString.substring(position + 1);
}
}
ProcessDescriptionType description = RepositoryManager.getInstance().getProcessDescription(processID);
if (description == null) {
throw new ExceptionReport("Data Identifier not supported: " + key, ExceptionReport.MISSING_PARAMETER_VALUE);
}
InputDescriptionType inputDesc = XMLBeansHelper.findInputByID(key, description.getDataInputs());
if (inputDesc == null) {
throw new ExceptionReport("Data Identifier not supported: " + key, ExceptionReport.MISSING_PARAMETER_VALUE);
}
InputType input = dataInputs.addNewInput();
input.addNewIdentifier().setStringValue(key);
// prepare attributes
String encodingAttribute = null;
String mimeTypeAttribute = null;
String schemaAttribute = null;
String hrefAttribute = null;
String uom = null;
String dataType = null;
String[] inputItemstemp = inputString.split("@");
String[] inputItems = null;
if (inputItemstemp.length == 2) {
inputItems = inputItemstemp[1].split("@");
} else {
inputItems = inputString.split("@");
}
if (inputItemstemp.length > 1) {
for (int i = 0; i < inputItems.length; i++) {
int attributePos = inputItems[i].indexOf("=");
if (attributePos == -1 || attributePos + 1 >= inputItems[i].length()) {
continue;
}
String attributeName = inputItems[i].substring(0, attributePos);
String attributeValue = inputItems[i].substring(attributePos + 1);
// attribute is input name
if (attributeName.equals(key)) {
continue;
}
try {
attributeValue = URLDecoder.decode(attributeValue, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new ExceptionReport("Something went wrong while trying to decode value of " + attributeName, ExceptionReport.NO_APPLICABLE_CODE, e);
}
if (attributeName.equalsIgnoreCase("encoding")) {
encodingAttribute = attributeValue;
} else if (attributeName.equalsIgnoreCase("mimeType")) {
mimeTypeAttribute = attributeValue;
} else if (attributeName.equalsIgnoreCase("schema")) {
schemaAttribute = attributeValue;
} else if (attributeName.equalsIgnoreCase("href") | attributeName.equalsIgnoreCase("xlink:href")) {
hrefAttribute = attributeValue;
} else if (attributeName.equalsIgnoreCase("uom")) {
uom = attributeValue;
} else if (attributeName.equalsIgnoreCase("datatype")) {
dataType = attributeValue;
} else {
throw new ExceptionReport("Attribute is not supported: " + attributeName, ExceptionReport.INVALID_PARAMETER_VALUE);
}
}
}
if (inputDesc.isSetComplexData()) {
// TODO: check for different attributes
// handling ComplexReference
if (!(hrefAttribute == null) && !hrefAttribute.equals("")) {
InputReferenceType reference = input.addNewReference();
reference.setHref(hrefAttribute);
if (schemaAttribute != null) {
reference.setSchema(schemaAttribute);
}
if (mimeTypeAttribute != null) {
reference.setMimeType(mimeTypeAttribute);
}
if (encodingAttribute != null) {
reference.setEncoding(encodingAttribute);
}
}
// Handling ComplexData
else {
ComplexDataType data = input.addNewData().addNewComplexData();
InputStream stream = new ByteArrayInputStream(value.getBytes());
try {
data.set(XmlObject.Factory.parse(stream));
} catch (Exception e) {
LOGGER.warn("Could not parse value: " + value + " as XMLObject. Trying to create text node.");
try {
Node textNode = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument().createTextNode(value);
data.set(XmlObject.Factory.parse(textNode));
} catch (Exception e1) {
throw new ExceptionReport("Exception while trying to parse value: " + value, ExceptionReport.NO_APPLICABLE_CODE, e1);
}
}
if (schemaAttribute != null) {
data.setSchema(schemaAttribute);
}
if (mimeTypeAttribute != null) {
data.setMimeType(mimeTypeAttribute);
}
if (encodingAttribute != null) {
data.setEncoding(encodingAttribute);
}
}
} else if (inputDesc.isSetLiteralData()) {
LiteralDataType data = input.addNewData().addNewLiteralData();
if (value == null) {
throw new ExceptionReport("No value provided for literal: " + inputDesc.getIdentifier().getStringValue(), ExceptionReport.MISSING_PARAMETER_VALUE);
}
data.setStringValue(value);
if (uom != null) {
data.setUom(uom);
}
if (dataType != null) {
data.setDataType(dataType);
}
} else if (inputDesc.isSetBoundingBoxData()) {
BoundingBoxType data = input.addNewData().addNewBoundingBoxData();
String[] values = value.split(",");
if (values.length < 4) {
throw new ExceptionReport("Invalid Number of BBOX Values: " + inputDesc.getIdentifier().getStringValue(), ExceptionReport.MISSING_PARAMETER_VALUE);
}
List<String> lowerCorner = new ArrayList<String>();
lowerCorner.add(values[0]);
lowerCorner.add(values[1]);
data.setLowerCorner(lowerCorner);
List<String> upperCorner = new ArrayList<String>();
upperCorner.add(values[2]);
upperCorner.add(values[3]);
data.setUpperCorner(upperCorner);
if (values.length > 4) {
data.setCrs(values[4]);
}
if (values.length > 5) {
data.setDimensions(BigInteger.valueOf(Long.valueOf(values[5])));
}
}
}
// retrieve status
boolean status = false;
String statusString = getMapValue("status", false);
LOGGER.debug("Incoming Status Request: "+statusString);
if (statusString != null) {
status = Boolean.parseBoolean(statusString);
}
boolean store = false;
String storeString = getMapValue("storeExecuteResponse", false);
LOGGER.debug("Incoming storeExecuteResponse Request: "+storeString);
if (storeString != null) {
store = Boolean.parseBoolean(storeString);
}
// Handle ResponseDocument option
String responseDocument = getMapValue("ResponseDocument", false);
if (responseDocument != null) {
String[] outputs = responseDocument.split(";");
ResponseDocumentType responseDoc = execute.addNewResponseForm().addNewResponseDocument();
responseDoc.setStatus(status);
responseDoc.setStoreExecuteResponse(store);
for (String outputID : outputs) {
String[] outputDataparameters = outputID.split("@");
String outputDataInput = "";
if (outputDataparameters.length > 0) {
outputDataInput = outputDataparameters[0];
} else {
outputDataInput = outputID;
}
outputDataInput = outputDataInput.replace("=", "");
ProcessDescriptionType description = RepositoryManager.getInstance().getProcessDescription(processID);
OutputDescriptionType outputDesc = XMLBeansHelper.findOutputByID(outputDataInput, description.getProcessOutputs().getOutputArray());
if (outputDesc == null) {
throw new ExceptionReport("Data output Identifier not supported: " + outputDataInput, ExceptionReport.MISSING_PARAMETER_VALUE);
}
DocumentOutputDefinitionType output = responseDoc.addNewOutput();
output.addNewIdentifier().setStringValue(outputDataInput);
for (int i = 1; i < outputDataparameters.length; i++) {
int attributePos = outputDataparameters[i].indexOf("=");
if (attributePos == -1 || attributePos + 1 >= outputDataparameters[i].length()) {
continue;
}
String attributeName = outputDataparameters[i].substring(0, attributePos);
String attributeValue = outputDataparameters[i].substring(attributePos + 1);
try {
attributeValue = URLDecoder.decode(attributeValue, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new ExceptionReport("Something went wrong while trying to decode value of " + attributeName, ExceptionReport.NO_APPLICABLE_CODE, e);
}
if (attributeName.equalsIgnoreCase("mimeType")) {
output.setMimeType(attributeValue);
} else if (attributeName.equalsIgnoreCase("schema")) {
output.setSchema(attributeValue);
} else if (attributeName.equalsIgnoreCase("encoding")) {
output.setEncoding(attributeValue);
}
}
}
}
String rawData = getMapValue("RawDataOutput", false);
if (rawData != null) {
String[] rawDataparameters = rawData.split("@");
String rawDataInput = "";
if (rawDataparameters.length > 0) {
rawDataInput = rawDataparameters[0];
} else {
rawDataInput = rawData;
}
ProcessDescriptionType description = RepositoryManager.getInstance().getProcessDescription(processID);
OutputDescriptionType outputDesc = XMLBeansHelper.findOutputByID(rawDataInput, description.getProcessOutputs().getOutputArray());
if (outputDesc == null) {
throw new ExceptionReport("Data output Identifier not supported: " + rawData, ExceptionReport.MISSING_PARAMETER_VALUE);
}
ResponseFormType responseForm = execute.addNewResponseForm();
OutputDefinitionType output = responseForm.addNewRawDataOutput();
output.addNewIdentifier().setStringValue(outputDesc.getIdentifier().getStringValue());
if (rawDataparameters.length > 0) {
for (int i = 0; i < rawDataparameters.length; i++) {
int attributePos = rawDataparameters[i].indexOf("=");
if (attributePos == -1 || attributePos + 1 >= rawDataparameters[i].length()) {
continue;
}
String attributeName = rawDataparameters[i].substring(0, attributePos);
String attributeValue = rawDataparameters[i].substring(attributePos + 1);
try {
attributeValue = URLDecoder.decode(attributeValue, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new ExceptionReport("Something went wrong while trying to decode value of " + attributeName, ExceptionReport.NO_APPLICABLE_CODE, e);
}
if (attributeName.equalsIgnoreCase("mimeType")) {
output.setMimeType(attributeValue);
} else if (attributeName.equalsIgnoreCase("schema")) {
output.setSchema(attributeValue);
} else if (attributeName.equalsIgnoreCase("encoding")) {
output.setEncoding(attributeValue);
} else {
throw new ExceptionReport("Attribute is not supported: " + attributeName, ExceptionReport.INVALID_PARAMETER_VALUE);
}
}
}
}
}
/**
* Validates the client request
*
* @return True if the input is valid, False otherwise
*/
public boolean validate() throws ExceptionReport {
// Identifier must be specified.
/*
* Only for HTTP_GET: String identifier = getMapValue("identifier");
*
* try{ // Specifies if all complex valued output(s) of this process
* should be stored by process // as web-accessible resources store =
* getMapValue("store").equals("true"); // Specifies if Execute
* operation response shall be returned quickly with status information
* status = getMapValue("status").equals("true"); }catch(ExceptionReport
* e){ // if parameters "store" or "status" are not included, they
* default to false; } // just testing if the number of arguments is
* even... String[] diArray = getMapValue("DataInputs").split(",");
* if(diArray.length % 2 != 0) { throw new ExceptionReport("Incorrect
* number of arguments for parameter dataInputs, please only a even
* number of parameter values",
* ExceptionReport.INVALID_PARAMETER_VALUE); }
*/
if (!execDom.getExecute().getVersion().equals(SUPPORTED_VERSION)) {
throw new ExceptionReport("Specified version is not supported.", ExceptionReport.INVALID_PARAMETER_VALUE, "version=" + getExecute().getVersion());
}
// Fix for bug https://bugzilla.52north.org/show_bug.cgi?id=906
String identifier = getAlgorithmIdentifier();
if (identifier == null) {
throw new ExceptionReport("No process identifier supplied.", ExceptionReport.MISSING_PARAMETER_VALUE, "identifier");
}
// check if the algorithm is in our repository
if (!RepositoryManager.getInstance().containsAlgorithm(identifier)) {
throw new ExceptionReport("Specified process identifier does not exist", ExceptionReport.INVALID_PARAMETER_VALUE, "identifier=" + identifier);
}
// validate if the process can be executed
ProcessDescriptionType desc = RepositoryManager.getInstance().getProcessDescription(getAlgorithmIdentifier());
// We need a description of the inputs for the algorithm
if (desc == null) {
LOGGER.warn("desc == null");
return false;
}
// Get the inputdescriptions of the algorithm
if (desc.getDataInputs() != null) {
InputDescriptionType[] inputDescs = desc.getDataInputs().getInputArray();
// prevent NullPointerException for zero input values in execute
// request (if only default values are used)
InputType[] inputs;
if (getExecute().getDataInputs() == null)
inputs = new InputType[0];
else
inputs = getExecute().getDataInputs().getInputArray();
// For each input supplied by the client
for (InputType input : inputs) {
boolean identifierMatched = false;
// Try to match the input with one of the descriptions
for (InputDescriptionType inputDesc : inputDescs) {
// If found, then process:
if (inputDesc.getIdentifier().getStringValue().equals(input.getIdentifier().getStringValue())) {
identifierMatched = true;
// If it is a literal value,
if (input.getData() != null && input.getData().getLiteralData() != null) {
// then check if the desription is also of type
// literal
if (inputDesc.getLiteralData() == null) {
throw new ExceptionReport("Inputtype LiteralData is not supported", ExceptionReport.INVALID_PARAMETER_VALUE);
}
// literalValue.getDataType ist optional
if (input.getData().getLiteralData().getDataType() != null) {
if (inputDesc.getLiteralData() != null)
if (inputDesc.getLiteralData().getDataType() != null)
if (inputDesc.getLiteralData().getDataType().getReference() != null)
if (!input.getData().getLiteralData().getDataType().equals(inputDesc.getLiteralData().getDataType().getReference())) {
throw new ExceptionReport("Specified dataType is not supported " + input.getData().getLiteralData().getDataType() + " for input " + input.getIdentifier().getStringValue(), ExceptionReport.INVALID_PARAMETER_VALUE);
}
}
}
// Excluded, because ProcessDescription validation
// should be
// done on startup!
// else if (input.getComplexValue() != null) {
// if(ParserFactory.getInstance().getParser(input.getComplexValue().getSchema())
// == null) {
// LOGGER.warn("Request validation message: schema
// attribute
// null, so the simple one will be used!");
// }
// }
// else if (input.getComplexValueReference() != null) {
// // we found a complexvalue input, try to get the
// parser.
// if(ParserFactory.getInstance().getParser(input.getComplexValueReference().getSchema())
// == null) {
// LOGGER.warn("Request validation message: schema
// attribute
// null, so the simple one will be used!");
// }
// }
break;
}
}
// if the identifier did not match one of the descriptions, it
// is
// invalid
if (!identifierMatched) {
throw new ExceptionReport("Input Identifier is not valid: " + input.getIdentifier().getStringValue(), ExceptionReport.INVALID_PARAMETER_VALUE, "input identifier");
}
}
}
return true;
}
/**
* Actually serves the Request.
*
* @throws ExceptionReport
*/
String wpsid = null;
private void initWpsID(){
wpsid = getUniqueId().toString();
}
public Response call() throws ExceptionReport {
IAlgorithm algorithm = null;
Map<String, List<IData>> inputMap = null;
try {
ExecutionContext context;
if (getExecute().isSetResponseForm()) {
context = getExecute().getResponseForm().isSetRawDataOutput() ? new ExecutionContext(getExecute().getResponseForm().getRawDataOutput()) : new ExecutionContext(Arrays.asList(getExecute().getResponseForm().getResponseDocument().getOutputArray()));
} else {
context = new ExecutionContext();
}
// register so that any function that calls
// ExecuteContextFactory.getContext() gets the instance registered
// with this thread
ExecutionContextFactory.registerContext(context);
LOGGER.debug("started with execution");
updateStatusStarted();
// parse the input
InputType[] inputs = new InputType[0];
if (getExecute().getDataInputs() != null) {
inputs = getExecute().getDataInputs().getInputArray();
}
InputHandler parser = new InputHandler.Builder(inputs, getAlgorithmIdentifier()).build();
// we got so far:
// get the algorithm, and run it with the clients input
/*
* IAlgorithm algorithm =
* RepositoryManager.getInstance().getAlgorithm
* (getAlgorithmIdentifier()); returnResults =
* algorithm.run((Map)parser.getParsedInputLayers(),
* (Map)parser.getParsedInputParameters());
*/
algorithm = RepositoryManager.getInstance().getAlgorithm(getAlgorithmIdentifier());
if (algorithm instanceof ISubject) {
ISubject subject = (ISubject) algorithm;
subject.addObserver(this);
}
if (algorithm instanceof AbstractEcologicalEngineMapper) {
((AbstractEcologicalEngineMapper) algorithm).setWpsExternalID(wpsid);
}
if (algorithm instanceof AbstractTransactionalAlgorithm) {
returnResults = ((AbstractTransactionalAlgorithm) algorithm).run(execDom);
} else {
inputMap = parser.getParsedInputData();
returnResults = algorithm.run(inputMap);
}
List<String> errorList = algorithm.getErrors();
if (errorList != null && !errorList.isEmpty()) {
String errorMessage = errorList.get(0);
LOGGER.error("Error reported while handling ExecuteRequest for " + getAlgorithmIdentifier() + ": " + errorMessage);
updateStatusError(errorMessage);
} else {
updateStatusSuccess();
}
} catch (Throwable e) {
String errorMessage = null;
if (algorithm != null && algorithm.getErrors() != null && !algorithm.getErrors().isEmpty()) {
errorMessage = algorithm.getErrors().get(0);
}
if (errorMessage == null) {
errorMessage = e.toString();
}
if (errorMessage == null) {
errorMessage = "UNKNOWN ERROR";
}
LOGGER.error("Exception/Error while executing ExecuteRequest for " + getAlgorithmIdentifier() + ": " + errorMessage);
updateStatusError(errorMessage);
if (e instanceof Error) {
// This is required when catching Error
throw (Error) e;
}
if (e instanceof ExceptionReport) {
throw (ExceptionReport) e;
} else {
throw new ExceptionReport("Error while executing the embedded process for: " + getAlgorithmIdentifier(), ExceptionReport.NO_APPLICABLE_CODE, e);
}
} finally {
// you ***MUST*** call this or else you will have a PermGen
// ClassLoader memory leak due to ThreadLocal use
ExecutionContextFactory.unregisterContext();
if (algorithm instanceof ISubject) {
((ISubject) algorithm).removeObserver(this);
}
if (inputMap != null) {
for (List<IData> l : inputMap.values()) {
for (IData d : l) {
if (d instanceof IComplexData) {
((IComplexData) d).dispose();
}
}
}
}
if (returnResults != null) {
for (IData d : returnResults.values()) {
if (d instanceof IComplexData) {
((IComplexData) d).dispose();
}
}
}
}
ExecuteResponse response = new ExecuteResponse(this);
return response;
}
/**
* Gets the identifier of the algorithm the client requested
*
* @return An identifier
*/
public String getAlgorithmIdentifier() {
// Fix for bug https://bugzilla.52north.org/show_bug.cgi?id=906
if (getExecute().getIdentifier() != null) {
return getExecute().getIdentifier().getStringValue();
}
return null;
}
/**
* Gets the Execute that is associated with this Request
*
* @return The Execute
*/
public Execute getExecute() {
return execDom.getExecute();
}
public Map<String, IData> getAttachedResult() {
return returnResults;
}
public boolean isStoreResponse() {
if (execDom.getExecute().getResponseForm() == null) {
return false;
}
if (execDom.getExecute().getResponseForm().getRawDataOutput() != null) {
return false;
}
return execDom.getExecute().getResponseForm().getResponseDocument().getStoreExecuteResponse();
}
public boolean isQuickStatus() {
if (execDom.getExecute().getResponseForm() == null) {
return false;
}
if (execDom.getExecute().getResponseForm().getRawDataOutput() != null) {
return false;
}
return execDom.getExecute().getResponseForm().getResponseDocument().getStatus();
}
public ExecuteResponseBuilder getExecuteResponseBuilder() {
return this.execRespType;
}
public boolean isRawData() {
if (execDom.getExecute().getResponseForm() == null) {
return false;
}
if (execDom.getExecute().getResponseForm().getRawDataOutput() != null) {
return true;
} else {
return false;
}
}
public void update(ISubject subject) {
Object state = subject.getState();
LOGGER.info("Update received from Subject, state changed to : " + state);
StatusType status = StatusType.Factory.newInstance();
int percentage = 0;
if (state instanceof Integer) {
percentage = (Integer) state;
status.addNewProcessStarted().setPercentCompleted(percentage);
} else if (state instanceof String) {
status.addNewProcessStarted().setStringValue((String) state);
}
updateStatus(status);
}
public void updateStatusAccepted() {
StatusType status = StatusType.Factory.newInstance();
status.setProcessAccepted("Process Accepted");
updateStatus(status);
}
public void updateStatusStarted() {
StatusType status = StatusType.Factory.newInstance();
status.addNewProcessStarted().setPercentCompleted(0);
updateStatus(status);
}
public void updateStatusSuccess() {
StatusType status = StatusType.Factory.newInstance();
status.setProcessSucceeded("Process successful");
updateStatus(status);
}
public void updateStatusError(String errorMessage) {
StatusType status = StatusType.Factory.newInstance();
net.opengis.ows.x11.ExceptionReportDocument.ExceptionReport excRep = status.addNewProcessFailed().addNewExceptionReport();
excRep.setVersion("1.0.0");
ExceptionType excType = excRep.addNewException();
excType.addNewExceptionText().setStringValue(errorMessage);
excType.setExceptionCode(ExceptionReport.NO_APPLICABLE_CODE);
updateStatus(status);
}
private void updateStatus(StatusType status) {
getExecuteResponseBuilder().setStatus(status);
try {
getExecuteResponseBuilder().update();
if (isStoreResponse()) {
ExecuteResponse executeResponse = new ExecuteResponse(this);
InputStream is = null;
try {
is = executeResponse.getAsStream();
DatabaseFactory.getDatabase().storeResponse(wpsid, is);
} finally {
IOUtils.closeQuietly(is);
}
}
} catch (ExceptionReport e) {
LOGGER.error("Update of process status failed.", e);
throw new RuntimeException(e);
}
}
private void storeRequest(ExecuteDocument executeDocument) {
InputStream is = null;
try {
is = executeDocument.newInputStream();
DatabaseFactory.getDatabase().insertRequest(wpsid, is, true);
//DatabaseFactory.getDatabase().insertRequest(wpsid, is, false);
} catch (Exception e) {
LOGGER.error("Exception storing ExecuteRequest", e);
} finally {
IOUtils.closeQuietly(is);
}
}
private void storeRequest(CaseInsensitiveMap map) {
BufferedWriter w = null;
ByteArrayOutputStream os = null;
ByteArrayInputStream is = null;
try {
os = new ByteArrayOutputStream();
w = new BufferedWriter(new OutputStreamWriter(os));
for (Object key : map.keySet()) {
Object value = map.get(key);
String valueString = "";
if (value instanceof String[]) {
valueString = ((String[]) value)[0];
} else {
valueString = value.toString();
}
w.append(key.toString()).append('=').append(valueString);
w.newLine();
}
w.flush();
is = new ByteArrayInputStream(os.toByteArray());
DatabaseFactory.getDatabase().insertRequest(wpsid, is, false);
} catch (Exception e) {
LOGGER.error("Exception storing ExecuteRequest", e);
} finally {
IOUtils.closeQuietly(w);
IOUtils.closeQuietly(os);
IOUtils.closeQuietly(is);
}
}
}

View File

@ -3,8 +3,6 @@ package org.gcube.data.analysis.wps;
import java.io.InputStream;
import org.n52.wps.server.ExceptionReport;
import org.n52.wps.server.request.ExecuteRequest;
import org.n52.wps.server.response.ExecuteResponseBuilder;
import org.n52.wps.server.response.Response;
public class ExecuteResponse extends Response {

View File

@ -0,0 +1,392 @@
package org.gcube.data.analysis.wps;
import java.io.InputStream;
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.util.Calendar;
import javax.xml.XMLConstants;
import javax.xml.namespace.QName;
import net.opengis.ows.x11.DomainMetadataType;
import net.opengis.ows.x11.LanguageStringType;
import net.opengis.wps.x100.DataInputsType;
import net.opengis.wps.x100.DocumentOutputDefinitionType;
import net.opengis.wps.x100.ExecuteResponseDocument;
import net.opengis.wps.x100.ExecuteResponseDocument.ExecuteResponse;
import net.opengis.wps.x100.OutputDefinitionType;
import net.opengis.wps.x100.OutputDescriptionType;
import net.opengis.wps.x100.ProcessDescriptionType;
import net.opengis.wps.x100.StatusType;
import org.apache.xmlbeans.XmlCursor;
import org.n52.wps.commons.WPSConfig;
import org.n52.wps.io.data.IBBOXData;
import org.n52.wps.io.data.IData;
import org.n52.wps.server.ExceptionReport;
import org.n52.wps.server.RepositoryManager;
import org.n52.wps.server.WebProcessingService;
import org.n52.wps.server.request.Request;
import org.n52.wps.server.response.OutputDataItem;
import org.n52.wps.server.response.RawData;
import org.n52.wps.util.XMLBeansHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* WPS Execute operation response. By default, this XML document is delivered to the client in response to an Execute request. If "status" is "false" in the Execute operation request, this document is normally returned when process execution has been completed.
* If "status" in the Execute request is "true", this response shall be returned as soon as the Execute request has been accepted for processing. In this case, the same XML document is also made available as a web-accessible resource from the URL identified in the statusLocation, and the WPS server shall repopulate it once the process has completed. It may repopulate it on an ongoing basis while the process is executing.
* However, the response to an Execute request will not include this element in the special case where the output is a single complex value result and the Execute request indicates that "store" is "false".
* Instead, the server shall return the complex result (e.g., GIF image or GML) directly, without encoding it in the ExecuteResponse. If processing fails in this special case, the normal ExecuteResponse shall be sent, with the error condition indicated. This option is provided to simplify the programming required for simple clients and for service chaining.
* @author Timon ter Braak
*
*/
public class ExecuteResponseBuilder {
private String identifier;
private DataInputsType dataInputs;
//private DocumentOutputDefinitionType[] outputDefs;
private ExecuteRequest request;
private ExecuteResponseDocument doc;
private RawData rawDataHandler = null;
private ProcessDescriptionType description;
private static Logger LOGGER = LoggerFactory.getLogger(ExecuteResponseBuilder.class);
private Calendar creationTime;
String webPath;
String webStatus;
public ExecuteResponseBuilder(ExecuteRequest request) throws ExceptionReport{
LOGGER.debug("Building Doc");
this.request = request;
doc = ExecuteResponseDocument.Factory.newInstance();
doc.addNewExecuteResponse();
XmlCursor c = doc.newCursor();
c.toFirstChild();
c.toLastAttribute();
c.setAttributeText(new QName(XMLConstants.W3C_XML_SCHEMA_INSTANCE_NS_URI, "schemaLocation"), "http://www.opengis.net/wps/1.0.0 http://schemas.opengis.net/wps/1.0.0/wpsExecute_response.xsd");
String webapp = WPSConfig.getInstance().getWPSConfig().getServer().getWebappPath();
if (webapp == null)
webapp = "wps";
String host = WPSConfig.getInstance().getWPSConfig().getServer().getHostname();
if (host.toLowerCase().equals("localhost"))
try {
host = Inet4Address.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
LOGGER.error("error",e);
}
String port = WPSConfig.getInstance().getWPSConfig().getServer().getHostport();
LOGGER.debug("Host: " + host + " Port: " + port + " Webapp: " + webapp + " ");
webPath = "http://" + host + ":" + port + "/" + webapp + "/WebProcessingService";
webStatus = "http://" + host + ":" + port + "/" + webapp + "/RetrieveResultServlet";
//statistical-manager-new.d4science.org:8080/wps/WebProcessingService?Request=GetCapabilities&Service=WPS
// doc.getExecuteResponse().setServiceInstance(webPath+"?REQUEST=GetCapabilities&SERVICE=WPS");
doc.getExecuteResponse().setServiceInstance(webPath);
doc.getExecuteResponse().setLang(WebProcessingService.DEFAULT_LANGUAGE);
doc.getExecuteResponse().setService("WPS");
doc.getExecuteResponse().setVersion(Request.SUPPORTED_VERSION);
LOGGER.debug("Doc attributes set");
this.identifier = request.getExecute().getIdentifier().getStringValue().trim();
LOGGER.debug("Identifier: "+identifier);
ExecuteResponse responseElem = doc.getExecuteResponse();
responseElem.addNewProcess().addNewIdentifier().setStringValue(identifier);
LOGGER.debug("Getting description for "+identifier);
description = RepositoryManager.getInstance().getProcessDescription(this.identifier);
LOGGER.debug("Description "+description);
if(description==null){
// throw new RuntimeException("Error while accessing the process description for "+ identifier);
}
responseElem.getProcess().setTitle(description.getTitle());
responseElem.getProcess().setProcessVersion(description.getProcessVersion());
creationTime = Calendar.getInstance();
LOGGER.debug("Execute Response Created!");
}
public void update() throws ExceptionReport {
// copying the request parameters to the response
ExecuteResponse responseElem = doc.getExecuteResponse();
// if status succeeded, update reponse with result
if (responseElem.getStatus().isSetProcessSucceeded()) {
// the response only include dataInputs, if the property is set to true;
//if(Boolean.getBoolean(WPSConfiguration.getInstance().getProperty(WebProcessingService.PROPERTY_NAME_INCLUDE_DATAINPUTS_IN_RESPONSE))) {
if(new Boolean(WPSConfig.getInstance().getWPSConfig().getServer().getIncludeDataInputsInResponse())){
dataInputs = request.getExecute().getDataInputs();
responseElem.setDataInputs(dataInputs);
}
responseElem.addNewProcessOutputs();
// has the client specified the outputs?
if (request.getExecute().isSetResponseForm()) {
// Get the outputdescriptions from the algorithm
OutputDescriptionType[] outputDescs = description.getProcessOutputs().getOutputArray();
if(request.isRawData()) {
OutputDefinitionType rawDataOutput = request.getExecute().getResponseForm().getRawDataOutput();
String id = rawDataOutput.getIdentifier().getStringValue();
OutputDescriptionType desc = XMLBeansHelper.findOutputByID(id, outputDescs);
if(desc.isSetComplexOutput()) {
String encoding = ExecuteResponseBuilder.getEncoding(desc, rawDataOutput);
String schema = ExecuteResponseBuilder.getSchema(desc, rawDataOutput);
String responseMimeType = getMimeType(rawDataOutput);
generateComplexDataOutput(id, false, true, schema, responseMimeType, encoding, null);
}
else if (desc.isSetLiteralOutput()) {
String mimeType = null;
String schema = null;
String encoding = null;
DomainMetadataType dataType = desc.getLiteralOutput().getDataType();
String reference = dataType != null ? dataType.getReference() : null;
generateLiteralDataOutput(id, doc, true, reference, schema, mimeType, encoding, desc.getTitle());
}
else if (desc.isSetBoundingBoxOutput()) {
generateBBOXOutput(id, doc, true, desc.getTitle());
}
return;
}
// Get the outputdefinitions from the clients request
// For each request of output
for(int i = 0; i<request.getExecute().getResponseForm().getResponseDocument().getOutputArray().length; i++) {
OutputDefinitionType definition = request.getExecute().getResponseForm().getResponseDocument().getOutputArray(i);
DocumentOutputDefinitionType documentDef = request.getExecute().getResponseForm().getResponseDocument().getOutputArray(i);
String responseID = definition.getIdentifier().getStringValue();
OutputDescriptionType desc = XMLBeansHelper.findOutputByID(responseID, outputDescs);
if(desc==null){
throw new ExceptionReport("Could not find the output id " + responseID, ExceptionReport.INVALID_PARAMETER_VALUE);
}
if(desc.isSetComplexOutput()) {
String mimeType = getMimeType(definition);
String schema = ExecuteResponseBuilder.getSchema(desc, definition);
String encoding = ExecuteResponseBuilder.getEncoding(desc, definition);
generateComplexDataOutput(responseID, documentDef.getAsReference(), false, schema, mimeType, encoding, desc.getTitle());
}
else if (desc.isSetLiteralOutput()) {
String mimeType = null;
String schema = null;
String encoding = null;
DomainMetadataType dataType = desc.getLiteralOutput().getDataType();
String reference = dataType != null ? dataType.getReference() : null;
generateLiteralDataOutput(responseID, doc, false, reference, schema, mimeType, encoding, desc.getTitle());
}
else if (desc.isSetBoundingBoxOutput()) {
generateBBOXOutput(responseID, doc, false, desc.getTitle());
}
else{
throw new ExceptionReport("Requested type not supported: BBOX", ExceptionReport.INVALID_PARAMETER_VALUE);
}
}
}
else {
LOGGER.info("OutputDefinitions are not stated explicitly in request");
// THIS IS A WORKAROUND AND ACTUALLY NOT COMPLIANT TO THE SPEC.
ProcessDescriptionType description = RepositoryManager.getInstance().getProcessDescription(request.getExecute().getIdentifier().getStringValue());
if(description==null){
throw new RuntimeException("Error while accessing the process description for "+ request.getExecute().getIdentifier().getStringValue());
}
OutputDescriptionType [] d = description.getProcessOutputs().getOutputArray();
for (int i = 0; i < d.length; i++)
{
if(d[i].isSetComplexOutput()) {
String schema = d[i].getComplexOutput().getDefault().getFormat().getSchema();
String encoding = d[i].getComplexOutput().getDefault().getFormat().getEncoding();
String mimeType = d[i].getComplexOutput().getDefault().getFormat().getMimeType();
generateComplexDataOutput(d[i].getIdentifier().getStringValue(), false, false, schema, mimeType, encoding, d[i].getTitle());
}
else if(d[i].isSetLiteralOutput()) {
generateLiteralDataOutput(d[i].getIdentifier().getStringValue(), doc, false, d[i].getLiteralOutput().getDataType().getReference(), null, null, null, d[i].getTitle());
}
}
}
} else if(request.isStoreResponse()) {
//statusLocation="http://localhost:8080/wps/RetrieveResultServlet?id=e4defcf6-d39f-48bf-8128-5591dca13dcb"
// responseElem.setStatusLocation(DatabaseFactory.getDatabase().generateRetrieveResultURL((request.getUniqueId()).toString()));
responseElem.setStatusLocation(webStatus+"?id="+request.getUniqueId());
}
}
/**
* Returns the schema according to the given output description and type.
*/
private static String getSchema(OutputDescriptionType desc, OutputDefinitionType def) {
String schema = null;
if(def != null) {
schema = def.getSchema();
}
return schema;
}
private static String getEncoding(OutputDescriptionType desc, OutputDefinitionType def) {
String encoding = null;
if(def != null) {
encoding = def.getEncoding();
}
return encoding;
}
public String getMimeType() {
return getMimeType(null);
}
public String getMimeType(OutputDefinitionType def) {
String mimeType = "";
OutputDescriptionType[] outputDescs = description.getProcessOutputs()
.getOutputArray();
boolean isResponseForm = request.getExecute().isSetResponseForm();
String inputID = "";
if(def != null){
inputID = def.getIdentifier().getStringValue();
}else if(isResponseForm){
if (request.getExecute().getResponseForm().isSetRawDataOutput()) {
inputID = request.getExecute().getResponseForm().getRawDataOutput()
.getIdentifier().getStringValue();
} else if (request.getExecute().getResponseForm()
.isSetResponseDocument()) {
inputID = request.getExecute().getResponseForm()
.getResponseDocument().getOutputArray(0).getIdentifier()
.getStringValue();
}
}
OutputDescriptionType outputDes = null;
for (OutputDescriptionType tmpOutputDes : outputDescs) {
if (inputID.equalsIgnoreCase(tmpOutputDes.getIdentifier()
.getStringValue())) {
outputDes = tmpOutputDes;
break;
}
}
if (isResponseForm) {
// Get the outputdescriptions from the algorithm
if (request.isRawData()) {
mimeType = request.getExecute().getResponseForm()
.getRawDataOutput().getMimeType();
} else {
// mimeType = "text/xml";
// MSS 03/02/2009 defaulting to text/xml doesn't work when the
// data is a complex raster
if (outputDes.isSetLiteralOutput()) {
mimeType = "text/plain";
} else if(outputDes.isSetBoundingBoxOutput()){
mimeType = "text/xml";
} else {
if (def != null) {
mimeType = def.getMimeType();
} else {
if (outputDes.isSetComplexOutput()) {
mimeType = outputDes.getComplexOutput()
.getDefault().getFormat().getMimeType();
LOGGER.warn("Using default mime type: "
+ mimeType
+ " for input: "
+ inputID);
}
}
}
}
}
if (mimeType == null) {
if (outputDes.isSetLiteralOutput()) {
mimeType = "text/plain";
} else if(outputDes.isSetBoundingBoxOutput()){
mimeType = "text/xml";
} else if (outputDes.isSetComplexOutput()) {
mimeType = outputDes.getComplexOutput().getDefault()
.getFormat().getMimeType();
LOGGER.warn("Using default mime type: " + mimeType
+ " for input: "
+ inputID);
}
}
return mimeType;
}
private void generateComplexDataOutput(String responseID, boolean asReference, boolean rawData, String schema, String mimeType, String encoding, LanguageStringType title) throws ExceptionReport{
IData obj = request.getAttachedResult().get(responseID);
if(rawData) {
rawDataHandler = new RawData(obj, responseID, schema, encoding, mimeType, this.identifier, description);
}
else {
OutputDataItem handler = new OutputDataItem(obj, responseID, schema, encoding, mimeType, title, this.identifier, description);
if(asReference) {
handler.updateResponseAsReference(doc, (request.getUniqueId()).toString(),mimeType);
}
else {
handler.updateResponseForInlineComplexData(doc);
}
}
}
private void generateLiteralDataOutput(String responseID, ExecuteResponseDocument res, boolean rawData, String dataTypeReference, String schema, String mimeType, String encoding, LanguageStringType title) throws ExceptionReport {
IData obj = request.getAttachedResult().get(responseID);
if(rawData) {
rawDataHandler = new RawData(obj, responseID, schema, encoding, mimeType, this.identifier, description);
}else{
OutputDataItem handler = new OutputDataItem(obj, responseID, schema, encoding, mimeType, title, this.identifier, description);
handler.updateResponseForLiteralData(res, dataTypeReference);
}
}
private void generateBBOXOutput(String responseID, ExecuteResponseDocument res, boolean rawData, LanguageStringType title) throws ExceptionReport {
IBBOXData obj = (IBBOXData) request.getAttachedResult().get(responseID);
if(rawData) {
rawDataHandler = new RawData(obj, responseID, null, null, null, this.identifier, description);
}else{
OutputDataItem handler = new OutputDataItem(obj, responseID, null, null, null, title, this.identifier, description);
handler.updateResponseForBBOXData(res, obj);
}
}
public InputStream getAsStream() throws ExceptionReport{
if(request.isRawData() && rawDataHandler != null) {
return rawDataHandler.getAsStream();
}
if(request.isStoreResponse()) {
String id = request.getUniqueId().toString();
// String statusLocation = DatabaseFactory.getDatabase().generateRetrieveResultURL(id);
String statusLocation = webStatus+"?id="+id;
doc.getExecuteResponse().setStatusLocation(statusLocation);
}
try {
return doc.newInputStream(XMLBeansHelper.getXmlOptions());
}
catch(Exception e) {
throw new RuntimeException(e);
}
}
public void setStatus(StatusType status) {
//workaround, should be generated either at the creation of the document or when the process has been finished.
status.setCreationTime(creationTime);
doc.getExecuteResponse().setStatus(status);
}
}

View File

@ -49,7 +49,6 @@ import org.n52.wps.server.WebProcessingService;
import org.n52.wps.server.handler.RequestExecutor;
import org.n52.wps.server.request.CapabilitiesRequest;
import org.n52.wps.server.request.DescribeProcessRequest;
import org.n52.wps.server.request.ExecuteRequest;
import org.n52.wps.server.request.Request;
import org.n52.wps.server.request.RetrieveResultRequest;
import org.n52.wps.server.response.Response;

View File

@ -0,0 +1,24 @@
package org.gcube.data.analysis.wps.processes;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.utils.Cancellable;
public class Processes {
private Map<String, Cancellable> runningProcesses = Collections.synchronizedMap(new HashMap<String, Cancellable>());
private Map<String, Cancellable> cancelledProcesses = Collections.synchronizedMap(new WeakHashMap<String, Cancellable>());
public Map<String, Cancellable> getRunningProcesses() {
return runningProcesses;
}
public Map<String, Cancellable> getCancelledProcesses() {
return cancelledProcesses;
}
}

View File

@ -25,7 +25,7 @@ no. 654119), SoBigData (grant no. 654024);
Version
--------------------------------------------------
1.0.0-SNAPSHOT (2017-06-29)
1.0.0-SNAPSHOT (${maven.build.timestamp})
Please see the file named "changelog.xml" in this directory for the release notes.