Gianpaolo Coro 2014-02-18 16:04:47 +00:00
parent bf4216a2c4
commit 883517d4d9
5 changed files with 157 additions and 39 deletions

View File

@ -3,7 +3,7 @@ package org.gcube.dataanalysis.geo.wps.client;
import java.math.BigInteger;
import java.net.URL;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.LinkedHashMap;
import java.util.List;
import net.opengis.wps.x100.CapabilitiesDocument;
@ -16,6 +16,9 @@ import net.opengis.wps.x100.InputType;
import net.opengis.wps.x100.OutputDescriptionType;
import net.opengis.wps.x100.ProcessBriefType;
import net.opengis.wps.x100.ProcessDescriptionType;
import net.opengis.wps.x100.ResponseDocumentType;
import net.opengis.wps.x100.StatusType;
import net.opengis.wps.x100.impl.ExecuteResponseDocumentImpl;
import org.apache.xmlbeans.XmlString;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
@ -31,6 +34,7 @@ public class WPSClient {
private String wpsServiceURL;
private InputDescriptionType[] currentInputs;
private OutputDescriptionType[] currentOutputs;
public float wpsstatus = 0;
public OutputDescriptionType[] getCurrentOutputs() {
return currentOutputs;
@ -41,14 +45,14 @@ public class WPSClient {
}
private List<StatisticalType> currentInputStatisticalTypes;
private Hashtable<String, StatisticalType> currentOutputStatisticalTypes;
private LinkedHashMap<String, StatisticalType> currentOutputStatisticalTypes;
private ProcessDescriptionType currentProcessDescription;
public Hashtable<String, StatisticalType> getCurrentOutputStatisticalTypes() {
public LinkedHashMap<String, StatisticalType> getCurrentOutputStatisticalTypes() {
return currentOutputStatisticalTypes;
}
public void setCurrentOutputStatisticalTypes(Hashtable<String, StatisticalType> currentOutputStatisticalTypes) {
public void setCurrentOutputStatisticalTypes(LinkedHashMap<String, StatisticalType> currentOutputStatisticalTypes) {
this.currentOutputStatisticalTypes = currentOutputStatisticalTypes;
}
@ -141,7 +145,7 @@ public class WPSClient {
AnalysisLogger.getLogger().debug("WPSClient->Fetching Outputs");
OutputDescriptionType[] outputList = processDescription.getProcessOutputs().getOutputArray();
currentOutputStatisticalTypes = new Hashtable<String, StatisticalType>();
currentOutputStatisticalTypes = new LinkedHashMap<String, StatisticalType>();
currentOutputs = outputList;
for (OutputDescriptionType output : outputList) {
AnalysisLogger.getLogger().debug("WPSClient->Output id:" + output.getIdentifier().getStringValue());
@ -222,18 +226,77 @@ public class WPSClient {
ExecuteDocument execute = executeBuilder.getExecute();
execute.getExecute().setService("WPS");
// System.out.println("RESPONSE FORM:"+execute.getExecute().getResponseForm());
WPSClientSession wpsClient = WPSClientSession.getInstance();
try {
wpsClient.connect(wpsServiceURL);
AnalysisLogger.getLogger().debug("Sending:\n" + execute);
if (execute.getExecute().getResponseForm() != null) {
ResponseDocumentType documentType = execute.getExecute().getResponseForm().getResponseDocument();
documentType.setStoreExecuteResponse(true);
documentType.setStatus(true);
documentType.setLineage(false);
execute.getExecute().getResponseForm().setResponseDocument(documentType);
}
boolean end = false;
Object responseObject = wpsClient.execute(wpsServiceURL, execute);
AnalysisLogger.getLogger().debug("Response:\n" + responseObject);
if (responseObject instanceof ExecuteResponseDocument) {
ExecuteResponseDocument response = (ExecuteResponseDocument) responseObject;
return response.getExecuteResponse().getProcessOutputs();
} else
String statusLocation = null;
if (responseObject != null)
statusLocation = ((ExecuteResponseDocumentImpl) responseObject).getExecuteResponse().getStatusLocation();
else
throw new Exception("" + responseObject);
while (!end) {
// AnalysisLogger.getLogger().debug("Response:\n" + responseObject);
if (responseObject instanceof ExecuteResponseDocumentImpl) {
// AnalysisLogger.getLogger().debug("ResponseImpl:\n" + responseObject);
StatusType statusType = ((ExecuteResponseDocumentImpl) responseObject).getExecuteResponse().getStatus();
int status = statusType.getProcessStarted() == null ? -1 : statusType.getProcessStarted().getPercentCompleted();
String failure = statusType.getProcessFailed() == null ? null : statusType.getProcessFailed().getExceptionReport().toString();
String accepted = statusType.getProcessAccepted() == null ? null : statusType.getProcessAccepted();
String success = statusType.getProcessSucceeded() == null ? null : statusType.getProcessSucceeded();
String paused = statusType.getProcessPaused() == null ? null : statusType.getProcessPaused().getStringValue();
if ((failure != null && failure.length() > 0) || (paused != null && paused.length() > 0)) {
AnalysisLogger.getLogger().debug("WPS FAILURE: " + failure + " OR PAUSED: " + paused);
wpsstatus = 100f;
throw new Exception(failure);
} else if (accepted != null && accepted.length() > 0) {
AnalysisLogger.getLogger().debug("WPS ACCEPTED");
wpsstatus = 0f;
} else if (success != null && success.length() > 0) {
AnalysisLogger.getLogger().debug("WPS SUCCESS");
wpsstatus = 100f;
end = true;
} else if (status >= 0) {
Float statusd = (float) status;
try {
statusd = Float.parseFloat(statusType.getProcessStarted().getStringValue());
} catch (Exception e) {
}
AnalysisLogger.getLogger().debug("WPS STATUS:" + statusd);
wpsstatus = statusd;
}
Thread.sleep(2000);
if (statusLocation!=null && statusLocation.length()>0)
responseObject = wpsClient.executeViaGET(statusLocation, "");
else
if (wpsstatus!=100)
throw new Exception("Cannot retrieve process status");
// AnalysisLogger.getLogger().debug("ResponseOBJ:\n" + responseObject);
} else
throw new Exception("" + responseObject);
}
AnalysisLogger.getLogger().debug("Response:\n" + responseObject);
wpsstatus = 100f;
return ((ExecuteResponseDocument) responseObject).getExecuteResponse().getProcessOutputs();
} catch (Exception e) {
e.printStackTrace();
throw e;
@ -244,16 +307,19 @@ public class WPSClient {
public static void main(String[] args) throws Exception {
AnalysisLogger.setLogger("./cfg/ALog.properties");
WPSClient client = new WPSClient("http://wps01.i-marine.d4science.org/wps/WebProcessingService");
// WPSClient client = new WPSClient("http://geoprocessing.demo.52north.org:8080/wps/WebProcessingService");
// WPSClient client = new WPSClient("http://wps01.i-marine.d4science.org/wps/WebProcessingService");
WPSClient client = new WPSClient("http://geoprocessing.demo.52north.org:8080/wps/WebProcessingService");
client.requestGetCapabilities();
// client.describeProcess("com.terradue.wps_hadoop.processes.examples.async.Async", new URL("file:///C:/Users/coro/Desktop/WorkFolder/Workspace/EcologicalEngineWPSExtension/cfg/test.xml"));
// client.describeProcess("org.n52.wps.extension.GetFuelPriceProcess");
// client.describeProcess("org.n52.wps.server.algorithm.test.DummyTestClass");
// client.describeProcess("org.n52.wps.server.algorithm.coordinatetransform.CoordinateTransformAlgorithm");
// client.describeProcess("org.n52.wps.extension.GetFuelPriceProcess");
client.describeProcess("com.terradue.wps_hadoop.processes.examples.async.Async");
// client.describeProcess("com.terradue.wps_hadoop.processes.examples.async.Async");
client.describeProcess("org.n52.wps.server.algorithm.SimpleBufferAlgorithm");
}
public static int calculateBBDimensions(String bbstring) {
@ -281,11 +347,11 @@ public class WPSClient {
// bboxInput=46,102,47,103,urn:ogc:def:crs:EPSG:6.6:4326,2
String[] bbinput = BBstring.split(",");
int dimensions = calculateBBDimensions(BBstring);
List lc = new ArrayList<String>();
List<String> lc = new ArrayList<String>();
for (int i = 0; i < dimensions / 2; i++) {
lc.add(bbinput[i]);
}
List uc = new ArrayList<String>();
List<String> uc = new ArrayList<String>();
for (int i = dimensions / 2; i < dimensions; i++) {
uc.add(bbinput[i]);
}
@ -293,7 +359,7 @@ public class WPSClient {
bbtype.setLowerCorner(lc);
bbtype.setUpperCorner(uc);
int crsidx = bbinput[dimensions].indexOf("crs:");
// int crsidx = bbinput[dimensions].indexOf("crs:");
String crs = bbinput[dimensions];
/*
* if (crsidx>=0) crs = bbinput[dimensions].substring(crsidx+4);
@ -320,7 +386,7 @@ public class WPSClient {
if (nChildren == 0) {
String text = node.getNodeValue();
if (text.startsWith("https:")||text.startsWith("http:") || text.startsWith("ftp:") || text.startsWith("smp:")|| text.startsWith("file:"))
if (text!= null && (text.startsWith("https:") || text.startsWith("http:") || text.startsWith("ftp:") || text.startsWith("smp:") || text.startsWith("file:")))
urls.add(text.trim());
} else {
for (int i = 0; i < nChildren; i++) {

View File

@ -16,8 +16,10 @@ import org.gcube.dataanalysis.geo.wps.interfaces.WPSProcess;
public class DynamicWPSTransducerer implements DynamicTransducer{
public Map<String, Transducerer> getTransducers(AlgorithmConfiguration config) {
if (transducerersP!=null && !isTooMuchTime())
return transducerersP;
Map<String, Transducerer> transducerers = new LinkedHashMap<String,Transducerer>();
//get the list of endpoints from the IS
List<String> wpsendpoints = getWPSendpoints(config);
@ -32,27 +34,48 @@ public class DynamicWPSTransducerer implements DynamicTransducer{
WPSProcess process = new WPSProcess(wpsendpoint, processInfo.getIdentifier().getStringValue());
process.setConfiguration(config);
transducerers.put(processInfo.getIdentifier().getStringValue(),process);
// break;
}
}
} catch (Exception e) {
e.printStackTrace();
AnalysisLogger.getLogger().debug("Error in retrieving information by WPS Server: "+e.getLocalizedMessage());
}
if (transducerers.size()>0)
transducerersP=transducerers;
return transducerers;
}
static Map<String, Transducerer> transducerersP = null;
static long t0 = System.currentTimeMillis();
static long maxtime = 60*60*1000; //1h
public static boolean isTooMuchTime(){
if (System.currentTimeMillis()-t0>maxtime){
t0 = System.currentTimeMillis();
return true;
}
else
return false;
}
//gets the list of endpoints from the IS
public static List<String> getWPSendpoints(AlgorithmConfiguration config) {
List<String> wps = new ArrayList<String>();
AnalysisLogger.setLogger(config.getConfigPath()+AlgorithmConfiguration.defaultLoggerFile);
// wps.add("http://wps01.i-marine.d4science.org/wps/WebProcessingService");
AnalysisLogger.getLogger().debug("WPS: searching for wps servers in the scope: "+config.getGcubeScope());
wps = org.gcube.dataanalysis.executor.util.IfraRetrieval.retrieveAddresses("WPS", config.getGcubeScope(),"StatisticalManager");
if (wps!=null && wps.size()>0)
if (wps!=null && wps.size()>0){
AnalysisLogger.getLogger().debug("WPS: found "+wps.size()+" wps instances");
}
else
AnalysisLogger.getLogger().debug("WPS: found NO wps instances");
return wps;
}

View File

@ -2,9 +2,9 @@ package org.gcube.dataanalysis.geo.wps.interfaces;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.LinkedHashMap;
import java.util.List;
import javax.xml.namespace.QName;
import java.util.Map;
import net.opengis.wps.x100.ComplexDataType;
import net.opengis.wps.x100.ExecuteResponseDocument.ExecuteResponse.ProcessOutputs;
@ -14,17 +14,16 @@ import net.opengis.wps.x100.OutputDescriptionType;
import net.opengis.wps.x100.ProcessDescriptionType;
import net.opengis.wps.x100.SupportedComplexDataInputType;
import org.apache.xmlbeans.XmlObject;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
import org.gcube.dataanalysis.ecoengine.configuration.INFRASTRUCTURE;
import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType;
import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveTypesList;
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes;
import org.gcube.dataanalysis.ecoengine.interfaces.Transducerer;
import org.gcube.dataanalysis.ecoengine.utils.ResourceFactory;
import org.gcube.dataanalysis.geo.wps.client.WPSClient;
import org.gcube.dataanalysis.geo.wps.factory.DynamicWPSTransducerer;
public class WPSProcess implements Transducerer {
@ -33,11 +32,14 @@ public class WPSProcess implements Transducerer {
public String title;
public String processAbstract;
private List<StatisticalType> inputTypes;
private Hashtable<String, StatisticalType> outputTypes;
private LinkedHashMap<String, StatisticalType> outputTypes;
private InputDescriptionType[] wpsInputs;
private OutputDescriptionType[] wpsOutputs;
private ProcessDescriptionType processDescription;
private WPSClient currentProcess;
public static Map<String,WPSClient> inputsCache;
protected ResourceFactory resourceManager;
private AlgorithmConfiguration config;
float status = 0;
@ -86,6 +88,7 @@ public class WPSProcess implements Transducerer {
}
// Submit the execution
WPSClient client = new WPSClient(wpsurl);
currentProcess=client;
AnalysisLogger.getLogger().debug("Starting Process");
ProcessOutputs outs = client.executeProcess(executeBuilder, processDescription);
// retrieve the output objs
@ -114,7 +117,10 @@ public class WPSProcess implements Transducerer {
AnalysisLogger.getLogger().debug("Assigning value: " + value + " to output named: " + outputID);
}
}
else
//remove the element name, which is not useful
outputTypes.remove(outputID);
ComplexDataType cdt = out.getData().getComplexData();
List<String> urls = WPSClient.retrieveURLsFromWPSResponse(cdt);
@ -140,9 +146,19 @@ public class WPSProcess implements Transducerer {
}
public void init() throws Exception {
// here we build the WPS process by means of the client
WPSClient wpsclient = new WPSClient(wpsurl);
wpsclient.describeProcess(processid);
WPSClient wpsclient = null;
if (inputsCache!=null)
wpsclient = inputsCache.get(processid);
else
inputsCache=new HashMap<String, WPSClient>();
if (wpsclient ==null)
{
// here we build the WPS process by means of the client
wpsclient = new WPSClient(wpsurl);
wpsclient.describeProcess(processid);
inputsCache.put(processid, wpsclient);
}
inputTypes = wpsclient.getCurrentInputStatisticalTypes();
outputTypes = wpsclient.getCurrentOutputStatisticalTypes();
wpsInputs = wpsclient.getCurrentInputs();
@ -151,6 +167,12 @@ public class WPSProcess implements Transducerer {
processAbstract = wpsclient.getCurrentProcessAbstract();
processDescription = wpsclient.getProcessDescription();
if ( DynamicWPSTransducerer.isTooMuchTime()){
inputsCache=null;
System.gc();
}
}
public String getDescription() {
@ -182,7 +204,12 @@ public class WPSProcess implements Transducerer {
}
public float getStatus() {
return status;
if (status ==100f)
return status;
else if (currentProcess!=null)
return Math.min(currentProcess.wpsstatus, 90f);
else
return status;
}
public void setConfiguration(AlgorithmConfiguration config) {

View File

@ -26,7 +26,7 @@ public class WPS2SM {
Abstract = buildParameterDescription(Abstract, null, null, minOcc, maxOcc, null);
if ((maxOcc == 1)||(maxOcc<0)||(maxOcc == 0))
converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, title, Abstract, "");
converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, title, Abstract, " ",true);
else
converted = new PrimitiveTypesList(String.class.getName(), PrimitiveTypes.STRING, title, Abstract, true);
@ -39,9 +39,11 @@ public class WPS2SM {
String guessedType = guessWPSLiteralType(type);
AnalysisLogger.getLogger().debug("Guessed type: " + guessedType);
// rebuild Abstract
if ((defaultValue==null || defaultValue.trim().length()==0) && (guessedType.equals(String.class.getName())))
defaultValue=" ";
Abstract = buildParameterDescription(Abstract, null, uoms, minOcc, maxOcc, defaultValue);
if ((maxOcc == 1)||(maxOcc<0)||(maxOcc == 0))
converted = new PrimitiveType(guessedType, null, PrimitiveTypes.STRING, title, Abstract, defaultValue);
converted = new PrimitiveType(guessedType, null, PrimitiveTypes.STRING, title, Abstract, defaultValue,true);
else
converted = new PrimitiveTypesList(guessedType, PrimitiveTypes.STRING, title, Abstract, true);
return converted;
@ -64,7 +66,7 @@ public class WPS2SM {
// rebuild Abstract
Abstract = buildParameterDescription(Abstract, maxMegaBytes, null, minOcc, maxOcc, null);
if ((maxOcc == 1)||(maxOcc<0)||(maxOcc == 0))
converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, title, Abstract);
converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, title, Abstract," ",true);
else
converted = new PrimitiveTypesList(String.class.getName(), PrimitiveTypes.STRING, title, Abstract, true);
@ -82,7 +84,7 @@ public class WPS2SM {
rangeOccs = 1;
// default
StatisticalType converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, id, Abstract);
StatisticalType converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, id, Abstract," ",true);
if (rangeOccs > 1)
converted = new PrimitiveTypesList(String.class.getName(), PrimitiveTypes.STRING, id, Abstract, true);
@ -119,7 +121,7 @@ public class WPS2SM {
String Abstract = wpsType.getAbstract()!=null?wpsType.getAbstract().getStringValue():"";
// default
StatisticalType converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, id, Abstract);
StatisticalType converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, id, Abstract," ",true);
AnalysisLogger.getLogger().debug("Conversion to SM Type->Output id:" + id);
AnalysisLogger.getLogger().debug("Conversion to SM Type->Abstract:" + Abstract);

View File

@ -27,7 +27,7 @@ public class RegressionTerradueWPSProcess {
AlgorithmConfiguration config = Regressor.getConfig();
config.setParam("secondsDelay", "1");
config.setParam("secondsDelay", "30");
return config;