Gianpaolo Coro 2016-05-24 10:17:21 +00:00
parent 306ef9020a
commit c8e8c4a639
7 changed files with 280 additions and 92 deletions

View File

@@ -127,3 +127,8 @@ http://dataminer1-d-d4s.d4science.org/wps/WebProcessingService?request=Execute&s
41 - Raster_data_publisher
http://dataminer1-d-d4s.d4science.org/wps/WebProcessingService?Request=DescribeProcess&Service=WPS&Version=1.0.0&gcube-token=4ccc2c35-60c9-4c9b-9800-616538d5d48b&Identifier=org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mappedclasses.transducerers.RASTER_DATA_PUBLISHER
http://dataminer1-d-d4s.d4science.org/wps/WebProcessingService?request=Execute&service=WPS&Version=1.0.0&gcube-token=4ccc2c35-60c9-4c9b-9800-616538d5d48b&lang=en-US&Identifier=org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mappedclasses.transducerers.RASTER_DATA_PUBLISHER&DataInputs=DatasetTitle=testGP;DatasetAbstract=test gp abstract;InnerLayerName=analyzed_field;FileNameOnInfra=test12345.nc;RasterFile=http://goo.gl/3dQif8;Topics=test;SpatialResolution=-1;
42 - BIOCLIMATE_HCAF
http://dataminer1-d-d4s.d4science.org/wps/WebProcessingService?Request=DescribeProcess&Service=WPS&Version=1.0.0&gcube-token=4ccc2c35-60c9-4c9b-9800-616538d5d48b&Identifier=org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mappedclasses.transducerers.BIOCLIMATE_HCAF
http://dataminer1-d-d4s.d4science.org/wps/WebProcessingService?request=Execute&service=WPS&Version=1.0.0&gcube-token=4ccc2c35-60c9-4c9b-9800-616538d5d48b&lang=en-US&Identifier=org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mappedclasses.transducerers.BIOCLIMATE_HCAF&DataInputs=HCAF_Table_List=http://goo.gl/LTqufC|http://goo.gl/LTqufC;HCAF_Table_Names=h1|h2
43 - ICHTHYOP_MODEL
http://dataminer1-d-d4s.d4science.org/wps/WebProcessingService?request=Execute&service=WPS&Version=1.0.0&gcube-token=4ccc2c35-60c9-4c9b-9800-616538d5d48b&lang=en-US&Identifier=org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mappedclasses.transducerers.ICHTHYOP_MODEL_ONE_BY_ONE&DataInputs=id_drifter=63908;starting_point_latitude=-13.548;starting_point_longitude=63.757;starting_point_starting_date=2007-01-15;trajectory_temporal_extent=193%20day(s)%2000%20hour(s)%2000%20minute(s)
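Every entry above is a plain HTTP GET. As a reference for exercising them, a minimal Java sketch, not part of this commit, that fires the BIOCLIMATE_HCAF Execute request from entry 42 and prints the raw ExecuteResponse XML; the endpoint, token and DataInputs are copied verbatim from the list (a stricter client would URL-encode the DataInputs value):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class WpsExecuteExample {
	public static void main(String[] args) throws Exception {
		String request = "http://dataminer1-d-d4s.d4science.org/wps/WebProcessingService"
				+ "?request=Execute&service=WPS&Version=1.0.0"
				+ "&gcube-token=4ccc2c35-60c9-4c9b-9800-616538d5d48b&lang=en-US"
				+ "&Identifier=org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mappedclasses.transducerers.BIOCLIMATE_HCAF"
				+ "&DataInputs=HCAF_Table_List=http://goo.gl/LTqufC|http://goo.gl/LTqufC;HCAF_Table_Names=h1|h2";
		HttpURLConnection conn = (HttpURLConnection) new URL(request).openConnection();
		BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream()));
		String line;
		while ((line = br.readLine()) != null)
			System.out.println(line); // raw WPS ExecuteResponse XML
		br.close();
		conn.disconnect();
	}
}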

View File

@@ -1,6 +1,8 @@
package org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -19,6 +21,7 @@ import org.gcube.dataanalysis.ecoengine.processing.factories.ModelersFactory;
import org.gcube.dataanalysis.ecoengine.processing.factories.TransducerersFactory;
import org.gcube.dataanalysis.ecoengine.utils.DatabaseFactory;
import org.gcube.dataanalysis.ecoengine.utils.DatabaseUtils;
import org.gcube.dataanalysis.executor.scripts.OSCommand;
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.infrastructure.DatabaseInfo;
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.infrastructure.InfrastructureDialoguer;
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.infrastructure.TableCoherenceChecker;
@@ -114,7 +117,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
public void waitForResources() throws Exception {
while (getRuningComputations() > ConfigurationManager.getMaxComputations()) {
Thread.sleep(2000);
Thread.sleep(20000);
AnalysisLogger.getLogger().debug("Waiting for resources to be available: " + displayRunningComputations());
}
@@ -230,6 +233,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
}
public static void deleteGeneratedFiles(List<File> generatedFiles) throws Exception {
System.gc();
if (generatedFiles != null) {
for (File file : generatedFiles) {
if (file.exists()) {
@@ -239,6 +243,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
AnalysisLogger.getLogger().debug("Deleting File - File does not exist " + file.getAbsolutePath());
}
}
System.gc();
}
public void manageUserToken() {
@@ -256,12 +261,15 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
}
long statusInterrupt = 0;
float previousStatus = -1;
float previousStatus = -3;
public void updateStatus(float status) {
if (agent != null) {
long stream = org.n52.wps.server.database.DatabaseFactory.getDatabase().getContentLengthForStoreResponse(wpsExternalID);
long stream = 0;
try{
stream = org.n52.wps.server.database.DatabaseFactory.getDatabase().getContentLengthForStoreResponse(wpsExternalID);
//AnalysisLogger.getLogger().debug("STATUS bytes " + stream + " interrupt bytes " + statusInterrupt);
if (statusInterrupt == 0 || statusInterrupt > stream - 3) {
if (statusInterrupt == 0 || statusInterrupt > stream - 3 && stream != 468) {
statusInterrupt = stream;
} else {
AnalysisLogger.getLogger().debug("STATUS INTERRUPTED!");
@@ -269,14 +277,20 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
statusInterrupt = -1;
agent = null;
status = -1f;
super.update(new Integer((int) status));
updateComputationOnWS(status, null);
System.gc();
}
if (status!=previousStatus){
AnalysisLogger.getLogger().debug("STATUS update to:" + status);
AnalysisLogger.getLogger().debug("STATUS update to:" + status+" - status interrupt "+statusInterrupt);
previousStatus=status;
super.update(new Integer((int) status));
updateComputationOnWS(status, null);
}
}catch(Exception e){
AnalysisLogger.getLogger().debug("WARNING: STATUS RETRIEVAL EXCEPTION "+e.getLocalizedMessage());
//stream = statusInterrupt;
}
}
}
public void updateComputationOnWS(float status, String exception) {
@@ -394,8 +408,13 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
time("A priori output retrieval");
// run the computation
AnalysisLogger.getLogger().info("9 - Running the computation and updater");
runStatusUpdater();
AnalysisLogger.getLogger().info("Initializing the computation");
updateStatus(0);
agent.init();
AnalysisLogger.getLogger().info("Updating status");
runStatusUpdater();
AnalysisLogger.getLogger().info("Running the computation");
agent.compute();
AnalysisLogger.getLogger().info("The computation has finished. Retrieving output");
time("Execution time");
@@ -421,24 +440,31 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
// delete all temporary tables
AnalysisLogger.getLogger().info("12 - Deleting possible generated temporary tables");
AnalysisLogger.getLogger().debug("Final Computation Output: " + outputs);
AnalysisLogger.getLogger().info("12 - Deleting possible generated temporary tables");
AnalysisLogger.getLogger().debug("All done");
endTime = new java.text.SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(System.currentTimeMillis());
saveComputationOnWS(inputsManager.getProvenanceData(), outputmanager.getProvenanceData(), agent, generatedFiles);
if (statusInterrupt!=-1){
saveComputationOnWS(inputsManager.getProvenanceData(), outputmanager.getProvenanceData(), agent, generatedFiles);
}else{
AnalysisLogger.getLogger().debug("Computation interrupted - no update");
throw new Exception ("Computation cancelled");
}
AnalysisLogger.getLogger().debug("All done");
} catch (Exception e) {
AnalysisLogger.getLogger().debug("Error in Algorithm execution: " + algorithm);
AnalysisLogger.getLogger().debug(e);
e.printStackTrace();
if (inputsManager!=null)
updateComputationOnWS(-2,e.getMessage(),inputsManager.getProvenanceData(),generatedFiles);
else
updateComputationOnWS(-2,e.getMessage());
int exitstatus = -2;
if (statusInterrupt==-1)
exitstatus = -1;
throw e;
if (inputsManager!=null)
updateComputationOnWS(exitstatus,e.getMessage(),inputsManager.getProvenanceData(),generatedFiles);
else
updateComputationOnWS(exitstatus,e.getMessage());
if (statusInterrupt==-1)
throw new Exception ("Computation cancelled");
else
throw e;
} finally {
AnalysisLogger.getLogger().debug("Deleting Input Tables");
deleteTemporaryTables(generatedInputTables);
@@ -452,6 +478,8 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
time("Cleaning of resources");
displayTimes();
cleanResources();
AnalysisLogger.getLogger().debug("All done - Computation Finished");
}
}
@@ -460,7 +488,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
@Override
public void run() {
while (agent != null && agent.getStatus() < 100) {
while (agent != null && statusInterrupt!=-1 && agent.getStatus() < 100) {
try {
updateStatus(agent.getStatus());
Thread.sleep(10000);
@@ -514,10 +542,66 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
}
private void cleanResources() {
times = null;
agent = null;
// manage open files and garbage
AnalysisLogger.getLogger().debug("Managing open files");
// String checkOpenFiles = "ls -l /proc/*/fd/* 2|grep \"wps/ecocfg\"";
try{
String checkOpenFiles = "for i in `ls -l /proc/*/fd/* 2>/dev/null | grep delete | grep tomcat | awk '{print $9}'`; do du -hL $i | awk '{print $1}' | tr '\n' ' '; ls -l $i | awk '{print $6\" \"$7\" \"$8\" \"$9\" \"$10\" \"$11\" \"$12}'; done";
List<String> openFiles =command(checkOpenFiles,"./");
AnalysisLogger.getLogger().debug("Open Files "+openFiles);
if (openFiles!=null){
for (String openFile:openFiles){
if (!openFile.contains("cannot access") && openFile.contains("(deleted)")){
String size = openFile.substring(0,openFile.indexOf(" ")).trim();
String pid = openFile.substring(openFile.indexOf("/proc/"),openFile.indexOf("->"));
pid = pid.trim();
if (!size.equals("0")){
AnalysisLogger.getLogger().debug("Killing "+pid + " with size "+size);
command(":>"+pid,"./");
}
}
}
}
}catch(Exception e){
AnalysisLogger.getLogger().debug("Could not kill files "+e.getLocalizedMessage());
}
System.gc();
}
public static List<String> command(final String cmdline,
final String directory) {
try {
Process process =
new ProcessBuilder(new String[] {"bash", "-c", cmdline})
.redirectErrorStream(true)
.directory(new File(directory))
.start();
List<String> output = new ArrayList<String>();
BufferedReader br = new BufferedReader(
new InputStreamReader(process.getInputStream()));
String line = null;
while ( (line = br.readLine()) != null )
output.add(line);
//There should really be a timeout here.
if (0 != process.waitFor())
return null;
return output;
} catch (Exception e) {
return null;
}
}
}
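The command helper above waits on the child process with no bound, and its in-code comment concedes that a timeout is missing. Below is a sketch of a bounded variant, not part of this commit, assuming Java 8+ for Process.waitFor(timeout, unit) and destroyForcibly(); stdout is drained on a helper thread so a hung, silent process cannot block the caller:

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BoundedCommand {
	public static List<String> command(final String cmdline, final String directory, long timeoutSeconds) {
		ExecutorService readerPool = Executors.newSingleThreadExecutor();
		try {
			final Process process = new ProcessBuilder("bash", "-c", cmdline)
					.redirectErrorStream(true)
					.directory(new File(directory))
					.start();
			// drain stdout on a helper thread so a silent hang cannot block the caller
			Future<List<String>> reader = readerPool.submit(new Callable<List<String>>() {
				public List<String> call() throws Exception {
					List<String> lines = new ArrayList<String>();
					BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
					String line;
					while ((line = br.readLine()) != null)
						lines.add(line);
					return lines;
				}
			});
			if (!process.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
				process.destroyForcibly(); // killing the child closes its pipe and unblocks the reader
				return null;
			}
			List<String> output = reader.get(5, TimeUnit.SECONDS);
			return process.exitValue() == 0 ? output : null;
		} catch (Exception e) {
			return null;
		} finally {
			readerPool.shutdownNow();
		}
	}
}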

View File

@@ -97,11 +97,12 @@ public class InputsManager {
if (input instanceof String) {
AnalysisLogger.getLogger().debug("Simple Input: "+ input);
// manage lists
String inputAlgoOrig = ((String) input).trim();
String inputAlgo = ((String) input).trim().replaceAll(inputsSeparator, AlgorithmConfiguration.listSeparator);
AnalysisLogger.getLogger().debug("Simple Input Transformed: " + inputAlgo);
config.setParam(inputName, inputAlgo);
saveInputData(inputName,inputName,inputAlgo);
saveInputData(inputName,inputName,inputAlgoOrig);
}
// case of Complex Input
else if (input instanceof GenericFileData) {
@@ -110,9 +111,10 @@ public class InputsManager {
// retrieve payload
GenericFileData files = ((GenericFileData) input);
List<File> localfiles = getLocalFiles(files);
List<File> localfiles = getLocalFiles(files,inputName);
String inputtables = "";
int nfiles = localfiles.size();
StringBuffer sb = new StringBuffer();
for (int i = 0; i < nfiles; i++) {
File tableFile = localfiles.get(i);
generatedFiles.add(tableFile);
@@ -134,7 +136,15 @@ public class InputsManager {
inputtables += tableName;
saveInputData(tableFile.getName(), inputName, tableFile.getAbsolutePath());
if (i>0)
sb.append("|");
sb.append(tableFile.getName());
}
sb.append("|");
if (nfiles>0)
saveInputData(inputName, inputName, sb.toString());
// the only possible complex input is a table - check the WPS
// parsers
config.setParam(inputName, inputtables);
@@ -192,7 +202,7 @@ public class InputsManager {
return filename;
}
public List<File> getLocalFiles(GenericFileData files) throws Exception {
public List<File> getLocalFiles(GenericFileData files,String inputName) throws Exception {
// download input
List<File> filesList = new ArrayList<File>();
@@ -204,7 +214,7 @@ public class InputsManager {
AnalysisLogger.getLogger().debug("File link: " + fileLink.substring(0,Math.min(fileLink.length(),10)) + "...");
String fileName = "";
// case of a http link
if (fileLink.toLowerCase().startsWith("http:") || fileLink.toLowerCase().startsWith("https:")) {
if (fileLink!=null && fileLink.toLowerCase().startsWith("http:") || fileLink.toLowerCase().startsWith("https:")) {
// manage the case of multiple files
String[] remotefiles = fileLink.split(inputsSeparator);
for (String subfilelink : remotefiles) {
@@ -218,23 +228,20 @@ public class InputsManager {
urlConnection = (HttpURLConnection) url.openConnection();
is = new BufferedInputStream(urlConnection.getInputStream());
// retrieve payload: for test purpose only
fileName = subfilelink.substring(subfilelink.lastIndexOf("/") + 1).trim();
if (fileName.contains("."))
fileName = fileName.substring(0, fileName.lastIndexOf(".")) + UUID.randomUUID() + fileName.substring(fileName.lastIndexOf("."));
else{
//take file name from http header
String fileNameTemp = inputNameFromHttpHeader(subfilelink);
if (fileNameTemp==null)
fileName = fileName + UUID.randomUUID();
else
fileName = fileNameTemp+ "_[" + computationId + "]."+FilenameUtils.getExtension(fileNameTemp);
}
String fileNameTemp = inputNameFromHttpHeader(subfilelink);
if (fileNameTemp==null)
fileName = inputName+"_[" + computationId + "]";
else
fileName = fileNameTemp+ "_[" + computationId + "]."+FilenameUtils.getExtension(fileNameTemp);
AnalysisLogger.getLogger().debug("Retrieving remote input in file: " + fileName);
AnalysisLogger.getLogger().debug("Creating local temp file: " + fileName);
File of = new File(config.getPersistencePath(), fileName);
FileOutputStream fos = new FileOutputStream(of);
IOUtils.copy(is, fos);
is.close();
fos.flush();
fos.close();
urlConnection.disconnect();
filesList.add(of);
@@ -245,9 +252,13 @@ public class InputsManager {
fileName = f.getName();
AnalysisLogger.getLogger().debug("Retriving local input from file: " + fileName);
if (fileLink==null)
AnalysisLogger.getLogger().debug("The file is a binary file: " + f.getAbsolutePath());
//since this is a GenericFile we will suppose it is a csv file
if (isXML(fileLink))
{
else{
if (isXML(fileLink))
{
String xmlFile = f.getAbsolutePath();
String csvFile = xmlFile+".csv";
AnalysisLogger.getLogger().debug("Transforming XML file into a csv: " + csvFile);
@@ -255,8 +266,17 @@ public class InputsManager {
AnalysisLogger.getLogger().debug("GML Parsed: " + readOneLine(csvFile)+"[..]");
f = new File(csvFile);
}
else
AnalysisLogger.getLogger().debug("The file is a csv: " + f.getAbsolutePath());
else{
AnalysisLogger.getLogger().debug("The file is a csv: " + f.getAbsolutePath());
}
String absFile = new File(f.getParent(),inputName+ "_[" + computationId + "].csv").getAbsolutePath();
AnalysisLogger.getLogger().debug("Renaming to: "+absFile);
System.gc();
boolean renamed = f.renameTo(new File(absFile));
if (renamed)
f = new File(absFile);
AnalysisLogger.getLogger().debug("The file has been renamed as : " + f.getAbsolutePath()+" - "+renamed);
}
filesList.add(f);
}
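The renaming logic above pins every input to a deterministic local name (local csv inputs become <inputName>_[<computationId>].csv), so the provenance records written later can point back to it. The remote-download rules, restated as a hypothetical helper mirroring the code in getLocalFiles:

// Hypothetical helper, mirroring the naming rules used in getLocalFiles above:
// remote inputs with no usable HTTP header name become <inputName>_[<computationId>];
// inputs named by the header keep that name plus _[<computationId>].<extension>.
static String localName(String inputName, String computationId, String headerFileName) {
	if (headerFileName == null)
		return inputName + "_[" + computationId + "]";
	// FilenameUtils is Apache Commons IO, as already imported by the class above
	return headerFileName + "_[" + computationId + "]."
			+ org.apache.commons.io.FilenameUtils.getExtension(headerFileName);
}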

View File

@@ -175,7 +175,7 @@ public class OutputsManager {
private String uploadFileOnStorage(String localfile, String mimetype) throws Exception {
AnalysisLogger.getLogger().debug("Start uploading on storage the following file: " + localfile);
AnalysisLogger.getLogger().debug("Storing->Start uploading on storage the following file: " + localfile);
File localFile = new File(localfile);
String remotef = "/wps_synch_output/" +config.getAgent()+"/"+computationsession+"/"+ localFile.getName();
storageclient.put(true).LFile(localfile).RFile(remotef);
@@ -188,7 +188,7 @@ public class OutputsManager {
else
url = "http://data.d4science.org/uri-resolver/smp?smp-uri=" + url+ "&fileName=" + localFile.getName() + "&contentType=" + mimetype;
*/
AnalysisLogger.getLogger().info("Uploading finished - URL: " + url);
AnalysisLogger.getLogger().info("Storing->Uploading finished - URL: " + url);
return url;
}

View File

@@ -51,10 +51,10 @@ public class DataspaceManager implements Runnable {
public static String start_date = "start_date";
public static String end_date = "end_date";
public static String status = "status";
public static String execution_platform = "execution_type";
public static String execution_platform = "execution_platform";
public static String error = "error";
public static String IO = "IO";
public static String operator = "operator";
public static String operator = "operator_name";
public static String payload = "payload";
public DataspaceManager(AlgorithmConfiguration config, ComputationData computation, List<StoredData> inputData, List<StoredData> outputData, List<File> generatedFiles) {
@@ -111,7 +111,7 @@ public class DataspaceManager implements Runnable {
return uploadData(data, wsFolder, true);
}
public String uploadData(StoredData data, WorkspaceFolder wsFolder, boolean changename) throws Exception {
AnalysisLogger.getLogger().debug("Dataspace->Analysing " + data.name);
// String filenameonwsString = WorkspaceUtil.getUniqueName(data.name, wsFolder);
String filenameonwsString = data.name ;
if (changename){
@@ -155,18 +155,26 @@ public class DataspaceManager implements Runnable {
throw new Exception("Impossible to open stream from "+data.payload);
// AnalysisLogger.getLogger().debug("Dataspace->final file name on ws " + data.name+" description "+data.description);
AnalysisLogger.getLogger().debug("Dataspace->saving the following file on the WS " + filenameonwsString + " [" + data.computationId + "]");
FolderItem fileItem = WorkspaceUtil.createExternalFile(wsFolder, filenameonwsString, data.description, null, in);
fileItem.getProperties().addProperty(computation_id, data.computationId);
fileItem.getProperties().addProperty(vre, data.vre);
fileItem.getProperties().addProperty(creation_date, data.creationDate);
fileItem.getProperties().addProperty(operator, data.operator);
fileItem.getProperties().addProperty(data_id, data.id);
fileItem.getProperties().addProperty(data_description, data.description);
fileItem.getProperties().addProperty(IO, data.provenance.name());
fileItem.getProperties().addProperty(data_type, data.type);
AnalysisLogger.getLogger().debug("Dataspace->WS OP saving the following file on the WS " + filenameonwsString);
LinkedHashMap<String, String> properties = new LinkedHashMap<String, String>();
properties.put(computation_id, data.computationId);
properties.put(vre, data.vre);
properties.put(creation_date, data.creationDate);
properties.put(operator, data.operator);
properties.put(data_id, data.id);
properties.put(data_description, data.description);
properties.put(IO, data.provenance.name());
properties.put(data_type, data.type);
properties.put(payload, url);
FolderItem fileItem = WorkspaceUtil.createExternalFile(wsFolder, filenameonwsString, data.description, in,properties,data.type);
//fileItem.getProperties().addProperties(properties);
AnalysisLogger.getLogger().debug("Dataspace->WS OP file saved on the WS " + filenameonwsString);
url = fileItem.getPublicLink(true);
fileItem.getProperties().addProperty(payload, url);
AnalysisLogger.getLogger().debug("Dataspace->WS OP url produced for the file " + url);
data.payload = url;
try {
in.close();
@@ -174,9 +182,9 @@ public class DataspaceManager implements Runnable {
AnalysisLogger.getLogger().debug("Dataspace->Error creating file " + e.getMessage());
AnalysisLogger.getLogger().debug(e);
}
AnalysisLogger.getLogger().debug("Dataspace->File created " + data.name);
AnalysisLogger.getLogger().debug("Dataspace->File created " + filenameonwsString);
} else {
AnalysisLogger.getLogger().debug("Dataspace->Uploading string " + data.payload);
AnalysisLogger.getLogger().debug("Dataspace->String parameter " + data.payload);
url = data.payload;
}
} catch (Throwable e) {
@@ -190,17 +198,20 @@ public class DataspaceManager implements Runnable {
}
public List<String> uploadInputData(List<StoredData> inputData, WorkspaceFolder dataminerFolder) throws Exception {
AnalysisLogger.getLogger().debug("Dataspace->uploading input data " + inputData.size());
AnalysisLogger.getLogger().debug("Dataspace->uploading input data; Number of data: " + inputData.size());
WorkspaceItem folderItem = dataminerFolder.find(importedDataFolder);
List<String> urls = new ArrayList<String>();
if (folderItem != null && folderItem.isFolder()) {
WorkspaceFolder destinationFolder = (WorkspaceFolder) folderItem;
for (StoredData input : inputData) {
WorkspaceItem item = destinationFolder.find(input.name);
WorkspaceItem item = null;
if (input.type.equals("text/csv")||input.type.equals("application/d4science")||input.type.equals("image/png"))
item = destinationFolder.find(input.name);
if (item==null){
AnalysisLogger.getLogger().debug("Dataspace->There is no item named "+input.name+" on the Workspace");
String url = uploadData(input, destinationFolder,false);
AnalysisLogger.getLogger().debug("Dataspace->returning generated URL "+url);
AnalysisLogger.getLogger().debug("Dataspace->returning property "+url);
urls.add(url);
}
else{
@@ -218,7 +229,7 @@ public class DataspaceManager implements Runnable {
}
public List<String> uploadOutputData(List<StoredData> outputData, WorkspaceFolder dataminerFolder) throws Exception {
AnalysisLogger.getLogger().debug("Dataspace->uploading output data" + outputData.size());
AnalysisLogger.getLogger().debug("Dataspace->uploading output data; Number of data: " + outputData.size());
WorkspaceItem folderItem = dataminerFolder.find(computedDataFolder);
List<String> urls = new ArrayList<String>();
if (folderItem != null && folderItem.isFolder()) {
@@ -250,45 +261,81 @@ public class DataspaceManager implements Runnable {
newcomputationFolder.createFolder(computedDataFolder, computedDataFolder);
// copy IO in those folders
AnalysisLogger.getLogger().debug("Dataspace->*****uploading inputs in IO folder*****");
List<String> inputurls = uploadInputData(inputData, newcomputationFolder);
AnalysisLogger.getLogger().debug("Dataspace->*****uploading outputs in IO folder*****");
List<String> outputurls = uploadOutputData(outputData, newcomputationFolder);
AnalysisLogger.getLogger().debug("Dataspace->creating gCube Item");
AnalysisLogger.getLogger().debug("Dataspace->*****adding properties to the folder*****");
AnalysisLogger.getLogger().debug("Dataspace->creating Folder Properties");
// write a computation item for the computation
LinkedHashMap<String, String> properties = new LinkedHashMap<String, String>();
properties.put(computation_id, computation.id);
newcomputationFolder.getProperties().addProperty(computation_id, computation.id);
properties.put(vre, computation.vre);
newcomputationFolder.getProperties().addProperty(vre, computation.vre);
properties.put(operator_name, config.getAgent());
newcomputationFolder.getProperties().addProperty(operator_name, config.getAgent());
properties.put(operator_id, computation.operatorId);
newcomputationFolder.getProperties().addProperty(operator_id, computation.operatorId);
properties.put(operator_description, computation.operatorDescription);
newcomputationFolder.getProperties().addProperty(operator_description, computation.operatorDescription);
properties.put(start_date, computation.startDate);
newcomputationFolder.getProperties().addProperty(start_date, computation.startDate);
properties.put(end_date, computation.endDate);
newcomputationFolder.getProperties().addProperty(end_date, computation.endDate);
properties.put(status, computation.status);
newcomputationFolder.getProperties().addProperty(status, computation.status);
properties.put(status, getStatus(computation.status));
properties.put(execution_platform, computation.infrastructure);
newcomputationFolder.getProperties().addProperty(execution_platform, computation.infrastructure);
int ninput = inputurls.size();
int noutput = outputurls.size();
AnalysisLogger.getLogger().debug("Dataspace->Adding input properties for " + ninput + " inputs");
for (int i = 1; i <= ninput; i++) {
properties.put("input" + i + "_" + inputData.get(i - 1).name, inputurls.get(i - 1));
newcomputationFolder.getProperties().addProperty("input" + i + "_" + inputData.get(i - 1).name, inputurls.get(i - 1));
StoredData input = inputData.get(i - 1);
if (input.payload.contains("|")){
String payload = input.payload;
AnalysisLogger.getLogger().debug("Dataspace->Managing complex input "+input.name+" : "+payload);
//delete the names that are not useful
for (StoredData subinput:inputData){
if (input.description.equals(subinput.description)){
payload = payload.replace(subinput.name,subinput.payload);
subinput.name=null;
}
}
input.name = null;
//delete last pipe character
if (payload.endsWith("|"))
payload = payload.substring(0,payload.length()-1);
AnalysisLogger.getLogger().debug("Dataspace->Complex input after processing "+payload);
properties.put("input" + i + "_" + input.description, payload);
input.payload=payload;
}
}
for (int i = 1; i <= ninput; i++) {
StoredData input = inputData.get(i - 1);
if (input.name!=null){
properties.put("input" + i + "_" + input.name, inputurls.get(i - 1));
}
}
AnalysisLogger.getLogger().debug("Dataspace->Adding output properties for " + noutput + " outputs");
for (int i = 1; i <= noutput; i++) {
properties.put("output" + i + "_" + outputData.get(i - 1).name, outputurls.get(i - 1));
newcomputationFolder.getProperties().addProperty("output" + i + "_" + outputData.get(i - 1).name, outputurls.get(i - 1));
}
AnalysisLogger.getLogger().debug("Dataspace->Properties of the folder: " + properties);
AnalysisLogger.getLogger().debug("Dataspace->Saving properties to ProvO XML file " + noutput + " outputs");
/*
@@ -300,7 +347,7 @@ public class DataspaceManager implements Runnable {
File xmltosave = new File(config.getPersistencePath(), "prov_o_" + UUID.randomUUID());
FileTools.saveString(xmltosave.getAbsolutePath(), xmlproperties, true, "UTF-8");
InputStream sis = new FileInputStream(xmltosave);
WorkspaceUtil.createExternalFile(newcomputationFolder, computation.id + ".xml", computation.operatorDescription, null, sis);
WorkspaceUtil.createExternalFile(newcomputationFolder, computation.id + ".xml", computation.operatorDescription, "text/xml", sis);
sis.close();
xmltosave.delete();
} catch (Exception e) {
@@ -308,15 +355,25 @@ public class DataspaceManager implements Runnable {
AnalysisLogger.getLogger().debug(e);
e.printStackTrace();
}
List<String> scopes = new ArrayList<String>();
scopes.add(config.getGcubeScope());
ws.createGcubeItem(computation.id, computation.operatorDescription, scopes, computation.user, itemType, properties, newcomputationFolder.getId());
//List<String> scopes = new ArrayList<String>();
//scopes.add(config.getGcubeScope());
//ws.createGcubeItem(computation.id, computation.operatorDescription, scopes, computation.user, itemType, properties, newcomputationFolder.getId());
newcomputationFolder.getProperties().addProperties(properties);
}
AnalysisLogger.getLogger().debug("Dataspace->finished uploading computation data");
}
public String buildCompositePayload(List<StoredData> inputData,String payload, String inputName){
for (StoredData input:inputData){
if (inputName.equals(input.description)){
payload = payload.replace(input.name,input.payload);
}
}
return payload;
}
public void writeProvenance(ComputationData computation, List<StoredData> inputData, List<StoredData> outputData) throws Exception {
AnalysisLogger.getLogger().debug("Dataspace->connecting to Workspace");
HomeManagerFactory factory = HomeLibrary.getHomeManagerFactory();
@@ -330,11 +387,11 @@ public class DataspaceManager implements Runnable {
AnalysisLogger.getLogger().debug("Dataspace->create folders network");
createFoldersNetwork(ws, root);
WorkspaceFolder dataminerItem = (WorkspaceFolder) root.find(dataminerFolder);
AnalysisLogger.getLogger().debug("Dataspace->uploading input files");
AnalysisLogger.getLogger().debug("Dataspace->****uploading input files****");
uploadInputData(inputData, dataminerItem);
AnalysisLogger.getLogger().debug("Dataspace->uploading output files");
AnalysisLogger.getLogger().debug("Dataspace->****uploading output files****");
uploadOutputData(outputData, dataminerItem);
AnalysisLogger.getLogger().debug("Dataspace->uploading computation files");
AnalysisLogger.getLogger().debug("Dataspace->****uploading computation files****");
uploadComputationData(computation, inputData, outputData, dataminerItem, ws);
AnalysisLogger.getLogger().debug("Dataspace->provenance management finished");
AnalysisLogger.getLogger().debug("Dataspace->deleting generated files");
@@ -374,7 +431,7 @@ public class DataspaceManager implements Runnable {
properties.put(operator_id, computation.operatorId);
properties.put(start_date, computation.startDate);
properties.put(end_date, computation.endDate);
properties.put(status, computation.status);
properties.put(status, getStatus(computation.status));
properties.put(execution_platform, computation.infrastructure);
if (computation.exception != null && computation.exception.length() > 0)
properties.put(error, computation.exception);
@@ -386,6 +443,24 @@ public class DataspaceManager implements Runnable {
AnalysisLogger.getLogger().debug("Dataspace->finished uploading computation data");
}
public String getStatus(String status){
double statusD = 0;
try{
statusD = Double.parseDouble(status);
}catch(Exception e){
return status;
}
if (statusD==100)
return "completed";
else if (statusD==-2)
return "error";
else if (statusD==-1)
return "cancelled";
else
return status;
}
public void deleteRunningComputationData() throws Exception {
AnalysisLogger.getLogger().debug("Dataspace->deleting computation item");
AnalysisLogger.getLogger().debug("Dataspace->connecting to Workspace");

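The new getStatus method above maps DataMiner's numeric status strings onto human-readable labels before they are written into the folder properties. A quick usage sketch, assuming a DataspaceManager instance dm:

// 100 -> completed, -2 -> error, -1 -> cancelled; anything else passes through unchanged
System.out.println(dm.getStatus("100"));  // "completed"
System.out.println(dm.getStatus("-2"));   // "error"
System.out.println(dm.getStatus("-1"));   // "cancelled"
System.out.println(dm.getStatus("50.0")); // "50.0" (still running)
System.out.println(dm.getStatus("done")); // "done" (non-numeric input is returned as-is)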
View File

@@ -23,14 +23,14 @@ public class MultiThreadingCalls {
// final URL urlToCall = new URL("http://"+host+":8080/wps/WebProcessingService?request=Execute&service=WPS&version=1.0.0&lang=en-US&Identifier=org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mappedclasses.modellers.FEED_FORWARD_ANN&DataInputs=" +
// URLEncoder.encode("scope=/gcube/devsec;user.name=test.user;LayersNeurons=10|10;LearningThreshold=0.01;MaxIterations=100;ModelName=wps_ann;Reference=1;TargetColumn=depthmean;TrainingColumns=depthmin|depthmax;TrainingDataSet=http://goo.gl/juNsCK@MimeType=text/csv","UTF-8"));
/*
final URL urlToCall = new URL("http://"+host+"/wps/WebProcessingService?request=Execute&service=WPS&Version=1.0.0&gcube-token=4ccc2c35-60c9-4c9b-9800-616538d5d48b&lang=en-US&Identifier=org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mappedclasses.clusterers.XMEANS&DataInputs=" +
URLEncoder.encode("OccurrencePointsClusterLabel=OccClustersTest;min_points=1;maxIterations=100;minClusters=1;maxClusters=3;OccurrencePointsTable=http://goo.gl/VDzpch;FeaturesColumnNames=depthmean|sstmnmax|salinitymean;","UTF-8"));
*/
final URL urlToCall = new URL("http://"+host+"/wps/WebProcessingService?Request=GetCapabilities&Service=WPS&gcube-token=4ccc2c35-60c9-4c9b-9800-616538d5d48b");
int nthreads = 200;
// final URL urlToCall = new URL("http://"+host+"/wps/WebProcessingService?Request=GetCapabilities&Service=WPS&gcube-token=4ccc2c35-60c9-4c9b-9800-616538d5d48b");
int nthreads = 50;
for (int i = 0; i < nthreads; i++) {
final int index = i+1;
Thread t = new Thread(new Runnable() {

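The test above spawns one raw thread per call (now capped at 50 rather than 200). Below is a sketch of the same stress test with a fixed-size pool, not in the commit, so concurrency stays bounded whatever the call count; the host comes from the first CLI argument instead of the hard-coded variable, and the token is the one shown above:

import java.io.InputStream;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PooledCalls {
	public static void main(String[] args) throws Exception {
		final URL urlToCall = new URL("http://" + args[0]
				+ "/wps/WebProcessingService?Request=GetCapabilities&Service=WPS&gcube-token=4ccc2c35-60c9-4c9b-9800-616538d5d48b");
		ExecutorService pool = Executors.newFixedThreadPool(10); // at most 10 calls in flight
		for (int i = 0; i < 50; i++) {
			final int index = i + 1;
			pool.submit(new Runnable() {
				public void run() {
					try (InputStream is = urlToCall.openStream()) {
						while (is.read() != -1) { /* drain the response */ }
						System.out.println("call " + index + " done");
					} catch (Exception e) {
						System.out.println("call " + index + " failed: " + e.getMessage());
					}
				}
			});
		}
		pool.shutdown();
		pool.awaitTermination(10, TimeUnit.MINUTES);
	}
}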
View File

@@ -48,16 +48,20 @@ public class CancelComputation extends HttpServlet {
// id of result to retrieve.
String id = request.getParameter("id");
AnalysisLogger.getLogger().debug("CANCEL COMPUTATION -> RETRIEVING ID " + id);
if (StringUtils.isEmpty(id)) {
errorResponse("id parameter missing", response);
} else {
AnalysisLogger.getLogger().debug("CANCEL COMPUTATION -> ID RETRIEVED" + id);
AnalysisLogger.getLogger().debug("CANCEL COMPUTATION -> ID RETRIEVED " + id);
if (!isIDValid(id)) {
errorResponse("id parameter not valid", response);
}
AnalysisLogger.getLogger().debug("CANCEL COMPUTATION -> ID IS VALID " + id);
IDatabase db = DatabaseFactory.getDatabase();
long len = db.getContentLengthForStoreResponse(id);
AnalysisLogger.getLogger().debug("CANCEL COMPUTATION -> INITIAL ID RESPONSE LENGTH " + len);
try {
AnalysisLogger.getLogger().debug("CANCEL COMPUTATION -> DELETING ID " + id);
@@ -71,7 +75,7 @@ public class CancelComputation extends HttpServlet {
AnalysisLogger.getLogger().debug(e);
}
AnalysisLogger.getLogger().debug("CANCEL COMPUTATION -> ID DELETED " + id);
long len = db.getContentLengthForStoreResponse(id);
len = db.getContentLengthForStoreResponse(id);
AnalysisLogger.getLogger().debug("CANCEL COMPUTATION -> ID RESPONSE LENGTH " + len);
} catch (Exception e) {
e.printStackTrace();
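Read together with the second file, this servlet completes the cancellation handshake the commit introduces. In outline (comments only; the actual delete call is elided in the truncated hunk above):

// 1. CancelComputation deletes the response stored for the computation id, so the length
//    returned by db.getContentLengthForStoreResponse(id) changes.
// 2. AbstractEcologicalEngineMapper.updateStatus polls that same length on every status
//    update; an unexpected value trips the statusInterrupt check and sets statusInterrupt = -1.
// 3. The runStatusUpdater loop exits on statusInterrupt == -1, and the computation ends with
//    "Computation cancelled" instead of saving its provenance to the workspace.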