diff --git a/src/main/java/org/gcube/dataanalysis/geo/wps/client/WPSClient.java b/src/main/java/org/gcube/dataanalysis/geo/wps/client/WPSClient.java index 684b4b3..8ad84ca 100644 --- a/src/main/java/org/gcube/dataanalysis/geo/wps/client/WPSClient.java +++ b/src/main/java/org/gcube/dataanalysis/geo/wps/client/WPSClient.java @@ -3,7 +3,7 @@ package org.gcube.dataanalysis.geo.wps.client; import java.math.BigInteger; import java.net.URL; import java.util.ArrayList; -import java.util.Hashtable; +import java.util.LinkedHashMap; import java.util.List; import net.opengis.wps.x100.CapabilitiesDocument; @@ -16,6 +16,9 @@ import net.opengis.wps.x100.InputType; import net.opengis.wps.x100.OutputDescriptionType; import net.opengis.wps.x100.ProcessBriefType; import net.opengis.wps.x100.ProcessDescriptionType; +import net.opengis.wps.x100.ResponseDocumentType; +import net.opengis.wps.x100.StatusType; +import net.opengis.wps.x100.impl.ExecuteResponseDocumentImpl; import org.apache.xmlbeans.XmlString; import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; @@ -31,6 +34,7 @@ public class WPSClient { private String wpsServiceURL; private InputDescriptionType[] currentInputs; private OutputDescriptionType[] currentOutputs; + public float wpsstatus = 0; public OutputDescriptionType[] getCurrentOutputs() { return currentOutputs; @@ -41,14 +45,14 @@ public class WPSClient { } private List currentInputStatisticalTypes; - private Hashtable currentOutputStatisticalTypes; + private LinkedHashMap currentOutputStatisticalTypes; private ProcessDescriptionType currentProcessDescription; - public Hashtable getCurrentOutputStatisticalTypes() { + public LinkedHashMap getCurrentOutputStatisticalTypes() { return currentOutputStatisticalTypes; } - public void setCurrentOutputStatisticalTypes(Hashtable currentOutputStatisticalTypes) { + public void setCurrentOutputStatisticalTypes(LinkedHashMap currentOutputStatisticalTypes) { this.currentOutputStatisticalTypes = currentOutputStatisticalTypes; } @@ -141,7 +145,7 @@ public class WPSClient { AnalysisLogger.getLogger().debug("WPSClient->Fetching Outputs"); OutputDescriptionType[] outputList = processDescription.getProcessOutputs().getOutputArray(); - currentOutputStatisticalTypes = new Hashtable(); + currentOutputStatisticalTypes = new LinkedHashMap(); currentOutputs = outputList; for (OutputDescriptionType output : outputList) { AnalysisLogger.getLogger().debug("WPSClient->Output id:" + output.getIdentifier().getStringValue()); @@ -222,18 +226,77 @@ public class WPSClient { ExecuteDocument execute = executeBuilder.getExecute(); execute.getExecute().setService("WPS"); - + // System.out.println("RESPONSE FORM:"+execute.getExecute().getResponseForm()); WPSClientSession wpsClient = WPSClientSession.getInstance(); try { wpsClient.connect(wpsServiceURL); AnalysisLogger.getLogger().debug("Sending:\n" + execute); + if (execute.getExecute().getResponseForm() != null) { + ResponseDocumentType documentType = execute.getExecute().getResponseForm().getResponseDocument(); + documentType.setStoreExecuteResponse(true); + documentType.setStatus(true); + documentType.setLineage(false); + execute.getExecute().getResponseForm().setResponseDocument(documentType); + } + boolean end = false; Object responseObject = wpsClient.execute(wpsServiceURL, execute); - AnalysisLogger.getLogger().debug("Response:\n" + responseObject); - if (responseObject instanceof ExecuteResponseDocument) { - ExecuteResponseDocument response = (ExecuteResponseDocument) responseObject; - return response.getExecuteResponse().getProcessOutputs(); - } else + String statusLocation = null; + if (responseObject != null) + statusLocation = ((ExecuteResponseDocumentImpl) responseObject).getExecuteResponse().getStatusLocation(); + else throw new Exception("" + responseObject); + + while (!end) { + // AnalysisLogger.getLogger().debug("Response:\n" + responseObject); + + if (responseObject instanceof ExecuteResponseDocumentImpl) { + // AnalysisLogger.getLogger().debug("ResponseImpl:\n" + responseObject); + StatusType statusType = ((ExecuteResponseDocumentImpl) responseObject).getExecuteResponse().getStatus(); + int status = statusType.getProcessStarted() == null ? -1 : statusType.getProcessStarted().getPercentCompleted(); + String failure = statusType.getProcessFailed() == null ? null : statusType.getProcessFailed().getExceptionReport().toString(); + String accepted = statusType.getProcessAccepted() == null ? null : statusType.getProcessAccepted(); + String success = statusType.getProcessSucceeded() == null ? null : statusType.getProcessSucceeded(); + String paused = statusType.getProcessPaused() == null ? null : statusType.getProcessPaused().getStringValue(); + + if ((failure != null && failure.length() > 0) || (paused != null && paused.length() > 0)) { + AnalysisLogger.getLogger().debug("WPS FAILURE: " + failure + " OR PAUSED: " + paused); + wpsstatus = 100f; + throw new Exception(failure); + } else if (accepted != null && accepted.length() > 0) { + AnalysisLogger.getLogger().debug("WPS ACCEPTED"); + wpsstatus = 0f; + } else if (success != null && success.length() > 0) { + AnalysisLogger.getLogger().debug("WPS SUCCESS"); + wpsstatus = 100f; + end = true; + } else if (status >= 0) { + Float statusd = (float) status; + try { + statusd = Float.parseFloat(statusType.getProcessStarted().getStringValue()); + } catch (Exception e) { + } + + AnalysisLogger.getLogger().debug("WPS STATUS:" + statusd); + + wpsstatus = statusd; + } + + Thread.sleep(2000); + if (statusLocation!=null && statusLocation.length()>0) + responseObject = wpsClient.executeViaGET(statusLocation, ""); + else + if (wpsstatus!=100) + throw new Exception("Cannot retrieve process status"); + + // AnalysisLogger.getLogger().debug("ResponseOBJ:\n" + responseObject); + } else + throw new Exception("" + responseObject); + + } + + AnalysisLogger.getLogger().debug("Response:\n" + responseObject); + wpsstatus = 100f; + return ((ExecuteResponseDocument) responseObject).getExecuteResponse().getProcessOutputs(); } catch (Exception e) { e.printStackTrace(); throw e; @@ -244,16 +307,19 @@ public class WPSClient { public static void main(String[] args) throws Exception { AnalysisLogger.setLogger("./cfg/ALog.properties"); - WPSClient client = new WPSClient("http://wps01.i-marine.d4science.org/wps/WebProcessingService"); - // WPSClient client = new WPSClient("http://geoprocessing.demo.52north.org:8080/wps/WebProcessingService"); +// WPSClient client = new WPSClient("http://wps01.i-marine.d4science.org/wps/WebProcessingService"); + WPSClient client = new WPSClient("http://geoprocessing.demo.52north.org:8080/wps/WebProcessingService"); client.requestGetCapabilities(); // client.describeProcess("com.terradue.wps_hadoop.processes.examples.async.Async", new URL("file:///C:/Users/coro/Desktop/WorkFolder/Workspace/EcologicalEngineWPSExtension/cfg/test.xml")); // client.describeProcess("org.n52.wps.extension.GetFuelPriceProcess"); // client.describeProcess("org.n52.wps.server.algorithm.test.DummyTestClass"); // client.describeProcess("org.n52.wps.server.algorithm.coordinatetransform.CoordinateTransformAlgorithm"); // client.describeProcess("org.n52.wps.extension.GetFuelPriceProcess"); - client.describeProcess("com.terradue.wps_hadoop.processes.examples.async.Async"); - + + + +// client.describeProcess("com.terradue.wps_hadoop.processes.examples.async.Async"); + client.describeProcess("org.n52.wps.server.algorithm.SimpleBufferAlgorithm"); } public static int calculateBBDimensions(String bbstring) { @@ -281,11 +347,11 @@ public class WPSClient { // bboxInput=46,102,47,103,urn:ogc:def:crs:EPSG:6.6:4326,2 String[] bbinput = BBstring.split(","); int dimensions = calculateBBDimensions(BBstring); - List lc = new ArrayList(); + List lc = new ArrayList(); for (int i = 0; i < dimensions / 2; i++) { lc.add(bbinput[i]); } - List uc = new ArrayList(); + List uc = new ArrayList(); for (int i = dimensions / 2; i < dimensions; i++) { uc.add(bbinput[i]); } @@ -293,7 +359,7 @@ public class WPSClient { bbtype.setLowerCorner(lc); bbtype.setUpperCorner(uc); - int crsidx = bbinput[dimensions].indexOf("crs:"); + // int crsidx = bbinput[dimensions].indexOf("crs:"); String crs = bbinput[dimensions]; /* * if (crsidx>=0) crs = bbinput[dimensions].substring(crsidx+4); @@ -320,7 +386,7 @@ public class WPSClient { if (nChildren == 0) { String text = node.getNodeValue(); - if (text.startsWith("https:")||text.startsWith("http:") || text.startsWith("ftp:") || text.startsWith("smp:")|| text.startsWith("file:")) + if (text!= null && (text.startsWith("https:") || text.startsWith("http:") || text.startsWith("ftp:") || text.startsWith("smp:") || text.startsWith("file:"))) urls.add(text.trim()); } else { for (int i = 0; i < nChildren; i++) { diff --git a/src/main/java/org/gcube/dataanalysis/geo/wps/factory/DynamicWPSTransducerer.java b/src/main/java/org/gcube/dataanalysis/geo/wps/factory/DynamicWPSTransducerer.java index 36a436e..0f22b92 100644 --- a/src/main/java/org/gcube/dataanalysis/geo/wps/factory/DynamicWPSTransducerer.java +++ b/src/main/java/org/gcube/dataanalysis/geo/wps/factory/DynamicWPSTransducerer.java @@ -16,8 +16,10 @@ import org.gcube.dataanalysis.geo.wps.interfaces.WPSProcess; public class DynamicWPSTransducerer implements DynamicTransducer{ + public Map getTransducers(AlgorithmConfiguration config) { - + if (transducerersP!=null && !isTooMuchTime()) + return transducerersP; Map transducerers = new LinkedHashMap(); //get the list of endpoints from the IS List wpsendpoints = getWPSendpoints(config); @@ -32,27 +34,48 @@ public class DynamicWPSTransducerer implements DynamicTransducer{ WPSProcess process = new WPSProcess(wpsendpoint, processInfo.getIdentifier().getStringValue()); process.setConfiguration(config); transducerers.put(processInfo.getIdentifier().getStringValue(),process); +// break; } } } catch (Exception e) { e.printStackTrace(); AnalysisLogger.getLogger().debug("Error in retrieving information by WPS Server: "+e.getLocalizedMessage()); } - + + if (transducerers.size()>0) + transducerersP=transducerers; + return transducerers; } + static Map transducerersP = null; + + static long t0 = System.currentTimeMillis(); + static long maxtime = 60*60*1000; //1h + public static boolean isTooMuchTime(){ + if (System.currentTimeMillis()-t0>maxtime){ + t0 = System.currentTimeMillis(); + return true; + } + else + return false; + } + //gets the list of endpoints from the IS public static List getWPSendpoints(AlgorithmConfiguration config) { + List wps = new ArrayList(); AnalysisLogger.setLogger(config.getConfigPath()+AlgorithmConfiguration.defaultLoggerFile); // wps.add("http://wps01.i-marine.d4science.org/wps/WebProcessingService"); AnalysisLogger.getLogger().debug("WPS: searching for wps servers in the scope: "+config.getGcubeScope()); wps = org.gcube.dataanalysis.executor.util.IfraRetrieval.retrieveAddresses("WPS", config.getGcubeScope(),"StatisticalManager"); - if (wps!=null && wps.size()>0) + if (wps!=null && wps.size()>0){ AnalysisLogger.getLogger().debug("WPS: found "+wps.size()+" wps instances"); + } else AnalysisLogger.getLogger().debug("WPS: found NO wps instances"); + + return wps; } diff --git a/src/main/java/org/gcube/dataanalysis/geo/wps/interfaces/WPSProcess.java b/src/main/java/org/gcube/dataanalysis/geo/wps/interfaces/WPSProcess.java index 523cc72..a429038 100644 --- a/src/main/java/org/gcube/dataanalysis/geo/wps/interfaces/WPSProcess.java +++ b/src/main/java/org/gcube/dataanalysis/geo/wps/interfaces/WPSProcess.java @@ -2,9 +2,9 @@ package org.gcube.dataanalysis.geo.wps.interfaces; import java.util.HashMap; import java.util.Hashtable; +import java.util.LinkedHashMap; import java.util.List; - -import javax.xml.namespace.QName; +import java.util.Map; import net.opengis.wps.x100.ComplexDataType; import net.opengis.wps.x100.ExecuteResponseDocument.ExecuteResponse.ProcessOutputs; @@ -14,17 +14,16 @@ import net.opengis.wps.x100.OutputDescriptionType; import net.opengis.wps.x100.ProcessDescriptionType; import net.opengis.wps.x100.SupportedComplexDataInputType; -import org.apache.xmlbeans.XmlObject; import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; import org.gcube.dataanalysis.ecoengine.configuration.INFRASTRUCTURE; import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType; -import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveTypesList; import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType; import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes; import org.gcube.dataanalysis.ecoengine.interfaces.Transducerer; import org.gcube.dataanalysis.ecoengine.utils.ResourceFactory; import org.gcube.dataanalysis.geo.wps.client.WPSClient; +import org.gcube.dataanalysis.geo.wps.factory.DynamicWPSTransducerer; public class WPSProcess implements Transducerer { @@ -33,11 +32,14 @@ public class WPSProcess implements Transducerer { public String title; public String processAbstract; private List inputTypes; - private Hashtable outputTypes; + private LinkedHashMap outputTypes; private InputDescriptionType[] wpsInputs; private OutputDescriptionType[] wpsOutputs; private ProcessDescriptionType processDescription; - + private WPSClient currentProcess; + + public static Map inputsCache; + protected ResourceFactory resourceManager; private AlgorithmConfiguration config; float status = 0; @@ -86,6 +88,7 @@ public class WPSProcess implements Transducerer { } // Submit the execution WPSClient client = new WPSClient(wpsurl); + currentProcess=client; AnalysisLogger.getLogger().debug("Starting Process"); ProcessOutputs outs = client.executeProcess(executeBuilder, processDescription); // retrieve the output objs @@ -114,7 +117,10 @@ public class WPSProcess implements Transducerer { AnalysisLogger.getLogger().debug("Assigning value: " + value + " to output named: " + outputID); } } - + else + //remove the element name, which is not useful + outputTypes.remove(outputID); + ComplexDataType cdt = out.getData().getComplexData(); List urls = WPSClient.retrieveURLsFromWPSResponse(cdt); @@ -140,9 +146,19 @@ public class WPSProcess implements Transducerer { } public void init() throws Exception { - // here we build the WPS process by means of the client - WPSClient wpsclient = new WPSClient(wpsurl); - wpsclient.describeProcess(processid); + WPSClient wpsclient = null; + if (inputsCache!=null) + wpsclient = inputsCache.get(processid); + else + inputsCache=new HashMap(); + if (wpsclient ==null) + { + // here we build the WPS process by means of the client + wpsclient = new WPSClient(wpsurl); + wpsclient.describeProcess(processid); + inputsCache.put(processid, wpsclient); + } + inputTypes = wpsclient.getCurrentInputStatisticalTypes(); outputTypes = wpsclient.getCurrentOutputStatisticalTypes(); wpsInputs = wpsclient.getCurrentInputs(); @@ -151,6 +167,12 @@ public class WPSProcess implements Transducerer { processAbstract = wpsclient.getCurrentProcessAbstract(); processDescription = wpsclient.getProcessDescription(); + if ( DynamicWPSTransducerer.isTooMuchTime()){ + inputsCache=null; + System.gc(); + } + + } public String getDescription() { @@ -182,7 +204,12 @@ public class WPSProcess implements Transducerer { } public float getStatus() { - return status; + if (status ==100f) + return status; + else if (currentProcess!=null) + return Math.min(currentProcess.wpsstatus, 90f); + else + return status; } public void setConfiguration(AlgorithmConfiguration config) { diff --git a/src/main/java/org/gcube/dataanalysis/geo/wps/mappings/WPS2SM.java b/src/main/java/org/gcube/dataanalysis/geo/wps/mappings/WPS2SM.java index ab5f679..25eb1c8 100644 --- a/src/main/java/org/gcube/dataanalysis/geo/wps/mappings/WPS2SM.java +++ b/src/main/java/org/gcube/dataanalysis/geo/wps/mappings/WPS2SM.java @@ -26,7 +26,7 @@ public class WPS2SM { Abstract = buildParameterDescription(Abstract, null, null, minOcc, maxOcc, null); if ((maxOcc == 1)||(maxOcc<0)||(maxOcc == 0)) - converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, title, Abstract, ""); + converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, title, Abstract, " ",true); else converted = new PrimitiveTypesList(String.class.getName(), PrimitiveTypes.STRING, title, Abstract, true); @@ -39,9 +39,11 @@ public class WPS2SM { String guessedType = guessWPSLiteralType(type); AnalysisLogger.getLogger().debug("Guessed type: " + guessedType); // rebuild Abstract + if ((defaultValue==null || defaultValue.trim().length()==0) && (guessedType.equals(String.class.getName()))) + defaultValue=" "; Abstract = buildParameterDescription(Abstract, null, uoms, minOcc, maxOcc, defaultValue); if ((maxOcc == 1)||(maxOcc<0)||(maxOcc == 0)) - converted = new PrimitiveType(guessedType, null, PrimitiveTypes.STRING, title, Abstract, defaultValue); + converted = new PrimitiveType(guessedType, null, PrimitiveTypes.STRING, title, Abstract, defaultValue,true); else converted = new PrimitiveTypesList(guessedType, PrimitiveTypes.STRING, title, Abstract, true); return converted; @@ -64,7 +66,7 @@ public class WPS2SM { // rebuild Abstract Abstract = buildParameterDescription(Abstract, maxMegaBytes, null, minOcc, maxOcc, null); if ((maxOcc == 1)||(maxOcc<0)||(maxOcc == 0)) - converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, title, Abstract); + converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, title, Abstract," ",true); else converted = new PrimitiveTypesList(String.class.getName(), PrimitiveTypes.STRING, title, Abstract, true); @@ -82,7 +84,7 @@ public class WPS2SM { rangeOccs = 1; // default - StatisticalType converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, id, Abstract); + StatisticalType converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, id, Abstract," ",true); if (rangeOccs > 1) converted = new PrimitiveTypesList(String.class.getName(), PrimitiveTypes.STRING, id, Abstract, true); @@ -119,7 +121,7 @@ public class WPS2SM { String Abstract = wpsType.getAbstract()!=null?wpsType.getAbstract().getStringValue():""; // default - StatisticalType converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, id, Abstract); + StatisticalType converted = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, id, Abstract," ",true); AnalysisLogger.getLogger().debug("Conversion to SM Type->Output id:" + id); AnalysisLogger.getLogger().debug("Conversion to SM Type->Abstract:" + Abstract); diff --git a/src/main/java/org/gcube/dataanalysis/geo/wps/test/regression/RegressionTerradueWPSProcess.java b/src/main/java/org/gcube/dataanalysis/geo/wps/test/regression/RegressionTerradueWPSProcess.java index 284cee1..05e1536 100644 --- a/src/main/java/org/gcube/dataanalysis/geo/wps/test/regression/RegressionTerradueWPSProcess.java +++ b/src/main/java/org/gcube/dataanalysis/geo/wps/test/regression/RegressionTerradueWPSProcess.java @@ -27,7 +27,7 @@ public class RegressionTerradueWPSProcess { AlgorithmConfiguration config = Regressor.getConfig(); - config.setParam("secondsDelay", "1"); + config.setParam("secondsDelay", "30"); return config;