enhanced provenance management

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/DataMiner@128521 82a268e6-3cf1-43bd-a215-b396298e98cf
Gianpaolo Coro 2016-05-09 09:33:39 +00:00
parent 5eb7021818
commit 1e336b250e
10 changed files with 180 additions and 76 deletions

View File: GetCapabilitiesChecker.java

@@ -38,6 +38,9 @@ public class GetCapabilitiesChecker {
String scannedPath = packageName.replace(".", "/");
URL scannedUrl = Thread.currentThread().getContextClassLoader().getResource(scannedPath);
String jarPath = scannedUrl.getFile();
AnalysisLogger.getLogger().debug("Jar Path complete: " + jarPath);
jarPath = jarPath.substring(jarPath.indexOf("file:/") + 6, jarPath.lastIndexOf("!"));
if (jarPath.startsWith("home"))
@@ -46,7 +49,11 @@ public class GetCapabilitiesChecker {
JarFile jarFile = null;
List<Class<?>> result = new ArrayList<Class<?>>();
File otherjar = new File(new File(jarPath).getParent(),"dataminer-algorithms.jar");
if (otherjar.exists())
jarPath = otherjar.getAbsolutePath();
AnalysisLogger.getLogger().debug("Alternative Jar Path: " + jarPath);
try {
jarFile = new JarFile(jarPath);
Enumeration<JarEntry> en = jarFile.entries();
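The hunk above resolves the service's own jar from a classloader resource URL and, when a sibling dataminer-algorithms.jar exists, scans that bundle instead. A minimal stand-alone sketch of the same lookup; the package name and the final println are illustrative, and getResource can return null when the classes are not packed in a jar:

import java.io.File;
import java.net.URL;

public class JarPathSketch {
    public static void main(String[] args) throws Exception {
        // Assumed package; any package contained in the jar works.
        String scannedPath = "org/gcube/dataanalysis";
        URL scannedUrl = Thread.currentThread().getContextClassLoader().getResource(scannedPath);
        // Inside a jar the URL looks like "jar:file:/.../service.jar!/org/gcube/dataanalysis".
        String jarPath = scannedUrl.getFile();
        // Strip the "file:/" prefix and the "!/package" suffix; the original code
        // afterwards re-checks for a stripped leading slash ("home...").
        jarPath = jarPath.substring(jarPath.indexOf("file:/") + 6, jarPath.lastIndexOf("!"));
        // Prefer the algorithms bundle when it is deployed next to the service jar.
        File otherjar = new File(new File(jarPath).getParent(), "dataminer-algorithms.jar");
        if (otherjar.exists())
            jarPath = otherjar.getAbsolutePath();
        System.out.println(jarPath);
    }
}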

View File: AbstractEcologicalEngineMapper.java

@@ -234,9 +234,9 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
for (File file : generatedFiles) {
if (file.exists()) {
AnalysisLogger.getLogger().debug("Deleting File " + file.getAbsolutePath());
AnalysisLogger.getLogger().debug("Deleting File Check " + file.delete());
try{AnalysisLogger.getLogger().debug("Deleting File Check " + file.delete());}catch(Exception e){}
} else
AnalysisLogger.getLogger().debug("File does not exist " + file.getAbsolutePath());
AnalysisLogger.getLogger().debug("Deleting File - File does not exist " + file.getAbsolutePath());
}
}
}
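Wrapping delete() keeps one failing file from aborting cleanup of the remaining ones. A hedged sketch of that pattern, self-contained, with the project's AnalysisLogger swapped for System.out:

import java.io.File;
import java.util.Arrays;
import java.util.List;

public class CleanupSketch {
    // Best-effort deletion: log the outcome, never let one failure stop the loop.
    static void deleteGeneratedFiles(List<File> generatedFiles) {
        for (File file : generatedFiles) {
            if (file.exists()) {
                try {
                    System.out.println("Deleting File Check " + file.delete());
                } catch (Exception e) {
                    // e.g. a SecurityException: skip and continue with the next file
                }
            } else
                System.out.println("Deleting File - File does not exist " + file.getAbsolutePath());
        }
    }

    public static void main(String[] args) {
        deleteGeneratedFiles(Arrays.asList(new File("/tmp/doesnotexist.csv")));
    }
}
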
@@ -256,7 +256,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
}
long statusInterrupt = 0;
float previousStatus = 0;
float previousStatus = -1;
public void updateStatus(float status) {
if (agent != null) {
long stream = org.n52.wps.server.database.DatabaseFactory.getDatabase().getContentLengthForStoreResponse(wpsExternalID);
@@ -366,8 +366,9 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
// merging wps with ecological engine parameters - modifies the
// config
AnalysisLogger.getLogger().info("6 - Translating WPS Inputs into Ecological Engine Inputs");
AnalysisLogger.getLogger().debug("Operator class is "+this.getClass().getCanonicalName());
// build computation Data
currentComputation = new ComputationData(config.getTaskID(), config.getAgent(), "", "", startTime, "-", "0", config.getTaskID(), configManager.getUsername());
currentComputation = new ComputationData(config.getTaskID(), config.getAgent(), "", "", startTime, "-", "0", config.getTaskID(), configManager.getUsername(),config.getGcubeScope(),this.getClass().getCanonicalName());
inputsManager.mergeWpsAndEcologicalInputs(supportDatabaseInfo);
generatedInputTables = inputsManager.getGeneratedTables();
generatedFiles = inputsManager.getGeneratedInputFiles();
@@ -474,8 +475,9 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
}
private void saveComputationOnWS(List<StoredData> inputData, List<StoredData> outputData, ComputationalAgent agent, List<File> generatedFiles) {
AnalysisLogger.getLogger().debug("Provenance manager started");
ComputationData computation = new ComputationData(config.getTaskID(), config.getAgent(), agent.getDescription(), agent.getInfrastructure().name(), startTime, endTime, "100", config.getTaskID(),config.getParam(ConfigurationManager.serviceUserNameParameterVariable));
AnalysisLogger.getLogger().debug("Provenance manager started for operator "+this.getClass().getCanonicalName());
ComputationData computation = new ComputationData(config.getTaskID(), config.getAgent(), agent.getDescription(), agent.getInfrastructure().name(), startTime, endTime, "100", config.getTaskID(),config.getParam(ConfigurationManager.serviceUserNameParameterVariable),config.getGcubeScope(),this.getClass().getCanonicalName());
// post on WS
DataspaceManager manager = new DataspaceManager(config, computation, inputData, outputData, generatedFiles);
Thread t = new Thread(manager);
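Both call sites now append the gCube scope (VRE) and the operator's canonical class name to ComputationData, and provenance publication still runs off the request thread. The shape of the new call, as a fragment inside the mapper (config, agent and the timestamps are instance state; the trailing start() is an assumption, since the diff is cut right after the Thread is created):

ComputationData computation = new ComputationData(
        config.getTaskID(),                  // name
        config.getAgent(),                   // operator
        agent.getDescription(),              // operator description
        agent.getInfrastructure().name(),    // execution platform
        startTime, endTime, "100",           // dates and final status
        config.getTaskID(),                  // id
        config.getParam(ConfigurationManager.serviceUserNameParameterVariable),
        config.getGcubeScope(),              // vre (new)
        this.getClass().getCanonicalName()); // operatorId (new)

DataspaceManager manager = new DataspaceManager(config, computation, inputData, outputData, generatedFiles);
new Thread(manager).start(); // fire-and-forget provenance upload (start() assumed)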

View File: InputsManager.java

@@ -45,6 +45,7 @@ import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping.dataspa
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping.dataspace.StoredType;
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.utils.GML2CSV;
import org.hibernate.SessionFactory;
import org.n52.wps.algorithm.annotation.LiteralDataInput;
import org.n52.wps.io.data.GenericFileData;
public class InputsManager {
@@ -118,6 +119,7 @@ public class InputsManager {
AnalysisLogger.getLogger().debug("Complex Input: " + input);
// retrieve payload
GenericFileData files = ((GenericFileData) input);
List<File> localfiles = getLocalFiles(files);
String inputtables = "";
int nfiles = localfiles.size();
@@ -414,7 +416,7 @@ public class InputsManager {
type = StoredType.DATA;
}
StoredData data = new StoredData(name, description, id, provenance, creationDate, operator, computationId, type,payload);
StoredData data = new StoredData(name, description, id, provenance, creationDate, operator, computationId, type,payload,config.getGcubeScope());
provenanceData.add(data);
}

View File: OutputsManager.java

@@ -158,7 +158,7 @@ public class OutputsManager {
}
String payload = info.getContent();
StoredData data = new StoredData(name, info.getAbstractStr(),id, provenance, creationDate, operator, computationId, type,payload);
StoredData data = new StoredData(name, info.getAbstractStr(),id, provenance, creationDate, operator, computationId, type,payload,config.getGcubeScope());
provenanceData.add(data);
}
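InputsManager and OutputsManager build the same provenance record; StoredData now takes the scope as a trailing argument. The shape of the call, with all earlier arguments unchanged (variable names as in the hunks, values illustrative):

StoredData data = new StoredData(name, description, id,
        provenance, creationDate, operator, computationId,
        type, payload,
        config.getGcubeScope()); // new: the VRE the computation ran in
provenanceData.add(data);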

View File: StatisticalTypeToWPSType.java

@@ -315,9 +315,11 @@ public class StatisticalTypeToWPSType {
return null;
} else if (stype instanceof TablesList) {
outputType = "csvFile";
// outputType = "string";
String template = ((TablesList) stype).getTemplates().get(0).name();
abstractStr += " [a sequence of http links separated by | , each indicating a table in UTF-8 encoding following this template: " + mapper.linksMap.get(template) + "]";
mimeType = "text/csv";
// mimeType = "text/plain";
}
outputType += isinput ? "Input" : "Output";

View File: ComputationData.java

@@ -4,7 +4,7 @@ package org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping.datasp
public class ComputationData {
public ComputationData(String name, String operator, String operatorDescription, String infrastructure, String startDate, String endDate, String status, String id, String user) {
public ComputationData(String name, String operator, String operatorDescription, String infrastructure, String startDate, String endDate, String status, String id, String user, String vre, String operatorId) {
super();
this.name = name;
this.operator = operator;
@@ -15,6 +15,20 @@ public class ComputationData {
this.status = status;
this.id = id;
this.user=user;
this.vre = vre;
this.operatorId = operatorId;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getOperatorId() {
return operatorId;
}
public void setOperatorId(String operatorId) {
this.operatorId = operatorId;
}
public String getName() {
return name;
@@ -64,6 +78,12 @@ public class ComputationData {
public void setId(String id) {
this.id = id;
}
public void setVre(String vre) {
this.vre = vre;
}
public String getVre() {
return vre;
}
public String getException() {
return exception;
}
@@ -80,6 +100,8 @@ public class ComputationData {
public String status;
public String id;
public String user;
public String vre;
public String operatorId;
}
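The bean keeps its public fields and gains accessors for the two new ones. A self-contained check of the extended eleven-argument constructor, assuming it lives in the same package; every value below is illustrative and the operator class name is hypothetical:

public class ComputationDataDemo {
    public static void main(String[] args) {
        ComputationData c = new ComputationData(
                "comp-1", "BIONYM", "taxa name matching", "LOCAL",
                "09/05/2016 09:33:39", "-", "100", "comp-1", "gianpaolo.coro",
                "/gcube/devsec",                  // vre (new)
                "org.gcube.dataanalysis.Bionym"); // operatorId (new, hypothetical)
        System.out.println(c.getVre() + " / " + c.getOperatorId());
    }
}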

View File: DataspaceManager.java

@@ -42,6 +42,8 @@ public class DataspaceManager implements Runnable {
public static String data_id = "data_id";
public static String data_type = "data_type";
public static String operator_name = "operator_name";
public static String operator_id = "operator_id";
public static String vre = "VRE";
public static String operator_description = "operator_description";
public static String data_description = "data_description";
@@ -105,45 +107,79 @@ public class DataspaceManager implements Runnable {
dataminerFolderWS.createFolder(computationsFolder, "A folder collecting DataMiner computations information");
}
}
public String uploadData(StoredData data, WorkspaceFolder wsFolder) throws Exception {
return uploadData(data, wsFolder, true);
}
public String uploadData(StoredData data, WorkspaceFolder wsFolder, boolean changename) throws Exception {
//String filenameonwsString = WorkspaceUtil.getUniqueName(data.name, wsFolder);
String filenameonwsString = data.name+"_["+data.computationId+"]";//("_"+UUID.randomUUID()).replace("-", "");
// String filenameonwsString = WorkspaceUtil.getUniqueName(data.name, wsFolder);
String filenameonwsString = data.name ;
if (changename)
filenameonwsString = data.name + "_[" + data.computationId + "]";// ("_"+UUID.randomUUID()).replace("-", "");
InputStream in = null;
String url = "";
if (data.type == StoredType.DATA) {
if (new File(data.payload).exists()) {
AnalysisLogger.getLogger().debug("Dataspace->Uploading file " + data.payload);
in = new FileInputStream(new File(data.payload));
} else {
AnalysisLogger.getLogger().debug("Dataspace->Uploading via URL " + data.payload);
URL urlc = new URL(data.payload);
HttpURLConnection urlConnection = (HttpURLConnection) urlc.openConnection();
in = new BufferedInputStream(urlConnection.getInputStream());
}
try {
if (data.type == StoredType.DATA) {
// AnalysisLogger.getLogger().debug("Dataspace->final file name on ws " + data.name+" description "+data.description);
AnalysisLogger.getLogger().debug("Dataspace->saving the following file on the WS " +filenameonwsString+" ["+data.computationId+"]");
FolderItem fileItem = WorkspaceUtil.createExternalFile(wsFolder, filenameonwsString, data.description, null, in);
fileItem.getProperties().addProperty(computation_id, data.computationId);
fileItem.getProperties().addProperty(creation_date, data.creationDate);
fileItem.getProperties().addProperty(operator, data.operator);
fileItem.getProperties().addProperty(data_id, data.id);
fileItem.getProperties().addProperty(data_description, data.description);
fileItem.getProperties().addProperty(IO, data.provenance.name());
fileItem.getProperties().addProperty(data_type, data.type.name());
url = fileItem.getPublicLink(true);
fileItem.getProperties().addProperty(payload, url);
data.payload=url;
try {
in.close();
} catch (Exception e) {
if (new File(data.payload).exists()) {
AnalysisLogger.getLogger().debug("Dataspace->Uploading file " + data.payload);
in = new FileInputStream(new File(data.payload));
} else {
AnalysisLogger.getLogger().debug("Dataspace->Uploading via URL " + data.payload);
int tries = 10;
for (int i=0;i<tries;i++){
try {
URL urlc = new URL(data.payload);
HttpURLConnection urlConnection = (HttpURLConnection) urlc.openConnection();
urlConnection.setConnectTimeout(10000);
urlConnection.setReadTimeout(10000);
in = new BufferedInputStream(urlConnection.getInputStream());
}catch(Exception ee){
AnalysisLogger.getLogger().debug(ee);
AnalysisLogger.getLogger().debug("Dataspace->Retrying connection to "+data.payload+" number "+(i+1));
in =null;
}
if (in!=null)
break;
else
Thread.sleep(10000);
}
}
if (in==null)
throw new Exception("Impossible to open stream from "+data.payload);
// AnalysisLogger.getLogger().debug("Dataspace->final file name on ws " + data.name+" description "+data.description);
AnalysisLogger.getLogger().debug("Dataspace->saving the following file on the WS " + filenameonwsString + " [" + data.computationId + "]");
FolderItem fileItem = WorkspaceUtil.createExternalFile(wsFolder, filenameonwsString, data.description, null, in);
fileItem.getProperties().addProperty(computation_id, data.computationId);
fileItem.getProperties().addProperty(vre, data.vre);
fileItem.getProperties().addProperty(creation_date, data.creationDate);
fileItem.getProperties().addProperty(operator, data.operator);
fileItem.getProperties().addProperty(data_id, data.id);
fileItem.getProperties().addProperty(data_description, data.description);
fileItem.getProperties().addProperty(IO, data.provenance.name());
fileItem.getProperties().addProperty(data_type, data.type.name());
url = fileItem.getPublicLink(true);
fileItem.getProperties().addProperty(payload, url);
data.payload = url;
try {
in.close();
} catch (Exception e) {
AnalysisLogger.getLogger().debug("Dataspace->Error creating file " + e.getMessage());
AnalysisLogger.getLogger().debug(e);
}
AnalysisLogger.getLogger().debug("Dataspace->File created " + data.name);
} else {
AnalysisLogger.getLogger().debug("Dataspace->Uploading string " + data.payload);
url = data.payload;
}
AnalysisLogger.getLogger().debug("Dataspace->File created " + data.name);
} else {
AnalysisLogger.getLogger().debug("Dataspace->Uploading string " + data.payload);
url = data.payload;
} catch (Throwable e) {
e.printStackTrace();
AnalysisLogger.getLogger().debug("Dataspace->Could not retrieve input payload " + data.payload+" - "+e.getLocalizedMessage());
AnalysisLogger.getLogger().debug(e);
url = "payload was not made available for this dataset";
data.payload = url;
}
return url;
}
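The rewritten uploadData wraps its whole body in a try/catch(Throwable) and, for remote payloads, retries the download up to ten times with 10-second connect/read timeouts before giving up. The retry loop extracted as a self-contained sketch; the class and method names are mine, the constants mirror the hunk:

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class RetryOpen {
    // Open a URL stream with bounded retries and timeouts, as in the hunk above.
    public static InputStream openWithRetries(String payload) throws Exception {
        InputStream in = null;
        int tries = 10;                            // same bound as the commit
        for (int i = 0; i < tries; i++) {
            try {
                HttpURLConnection conn = (HttpURLConnection) new URL(payload).openConnection();
                conn.setConnectTimeout(10000);     // 10 s to connect
                conn.setReadTimeout(10000);        // 10 s per read
                in = new BufferedInputStream(conn.getInputStream());
            } catch (Exception e) {
                in = null;                         // failed attempt: pause, then retry
            }
            if (in != null)
                break;
            Thread.sleep(10000);
        }
        if (in == null)
            throw new Exception("Impossible to open stream from " + payload);
        return in;
    }
}
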
@@ -155,8 +191,19 @@ public class DataspaceManager implements Runnable {
if (folderItem != null && folderItem.isFolder()) {
WorkspaceFolder destinationFolder = (WorkspaceFolder) folderItem;
for (StoredData input : inputData) {
String url = uploadData(input, destinationFolder);
urls.add(url);
WorkspaceItem item = destinationFolder.find(input.name);
if (item==null){
AnalysisLogger.getLogger().debug("Dataspace->There is no item named "+input.name+" on the Workspace");
String url = uploadData(input, destinationFolder,false);
AnalysisLogger.getLogger().debug("Dataspace->returning generated URL "+url);
urls.add(url);
}
else{
AnalysisLogger.getLogger().debug("Dataspace->Input item "+input.name+" is already available in the input folder");
String url = item.getPublicLink(false);
AnalysisLogger.getLogger().debug("Dataspace->returning WS url "+url);
urls.add(url);
}
}
} else
AnalysisLogger.getLogger().debug("Dataspace->folder is not valid");
@@ -193,7 +240,7 @@ public class DataspaceManager implements Runnable {
String itemType = "COMPUTATION";
// create IO folders
AnalysisLogger.getLogger().debug("Dataspace->creating IO folders under "+cfoldername);
AnalysisLogger.getLogger().debug("Dataspace->creating IO folders under " + cfoldername);
newcomputationFolder.createFolder(importedDataFolder, importedDataFolder);
newcomputationFolder.createFolder(computedDataFolder, computedDataFolder);
@@ -207,8 +254,12 @@ public class DataspaceManager implements Runnable {
LinkedHashMap<String, String> properties = new LinkedHashMap<String, String>();
properties.put(computation_id, computation.id);
newcomputationFolder.getProperties().addProperty(computation_id, computation.id);
properties.put(vre, computation.vre);
newcomputationFolder.getProperties().addProperty(vre, computation.vre);
properties.put(operator_name, config.getAgent());
newcomputationFolder.getProperties().addProperty(operator_name, config.getAgent());
properties.put(operator_id, computation.operatorId);
newcomputationFolder.getProperties().addProperty(operator_id, computation.operatorId);
properties.put(operator_description, computation.operatorDescription);
newcomputationFolder.getProperties().addProperty(operator_description, computation.operatorDescription);
properties.put(start_date, computation.startDate);
@@ -221,39 +272,41 @@ public class DataspaceManager implements Runnable {
newcomputationFolder.getProperties().addProperty(execution_platform, computation.infrastructure);
int ninput = inputurls.size();
int noutput = outputurls.size();
AnalysisLogger.getLogger().debug("Dataspace->Adding input properties for "+ninput+" inputs");
AnalysisLogger.getLogger().debug("Dataspace->Adding input properties for " + ninput + " inputs");
for (int i = 1; i <= ninput; i++) {
properties.put("input"+i+"_"+inputData.get(i-1).name, inputurls.get(i-1));
newcomputationFolder.getProperties().addProperty("input"+i+"_"+inputData.get(i-1).name, inputurls.get(i-1));
properties.put("input" + i + "_" + inputData.get(i - 1).name, inputurls.get(i - 1));
newcomputationFolder.getProperties().addProperty("input" + i + "_" + inputData.get(i - 1).name, inputurls.get(i - 1));
}
AnalysisLogger.getLogger().debug("Dataspace->Adding output properties for "+noutput+" outputs");
AnalysisLogger.getLogger().debug("Dataspace->Adding output properties for " + noutput + " outputs");
for (int i = 1; i <= noutput; i++) {
properties.put("output"+i+"_"+outputData.get(i-1).name, outputurls.get(i-1));
newcomputationFolder.getProperties().addProperty("output"+i+"_"+outputData.get(i-1).name, outputurls.get(i-1));
properties.put("output" + i + "_" + outputData.get(i - 1).name, outputurls.get(i - 1));
newcomputationFolder.getProperties().addProperty("output" + i + "_" + outputData.get(i - 1).name, outputurls.get(i - 1));
}
AnalysisLogger.getLogger().debug("Dataspace->Saving properties to ProvO XML file "+noutput+" outputs");
/*XStream xstream = new XStream();
String xmlproperties = xstream.toXML(properties);
*/
try{String xmlproperties = ProvOGenerator.toProvO(computation, inputData, outputData);
File xmltosave = new File(config.getPersistencePath(),"properties_"+UUID.randomUUID());
FileTools.saveString(xmltosave.getAbsolutePath(), xmlproperties, true, "UTF-8");
InputStream sis = new FileInputStream(xmltosave);
WorkspaceUtil.createExternalFile(newcomputationFolder, computation.id+".xml", computation.operatorDescription, null, sis);
sis.close();
}catch(Exception e){
AnalysisLogger.getLogger().debug("Dataspace->Failed creating ProvO XML file "+e.getLocalizedMessage());
AnalysisLogger.getLogger().debug("Dataspace->Saving properties to ProvO XML file " + noutput + " outputs");
/*
* XStream xstream = new XStream(); String xmlproperties = xstream.toXML(properties);
*/
try {
String xmlproperties = ProvOGenerator.toProvO(computation, inputData, outputData);
File xmltosave = new File(config.getPersistencePath(), "prov_o_" + UUID.randomUUID());
FileTools.saveString(xmltosave.getAbsolutePath(), xmlproperties, true, "UTF-8");
InputStream sis = new FileInputStream(xmltosave);
WorkspaceUtil.createExternalFile(newcomputationFolder, computation.id + ".xml", computation.operatorDescription, null, sis);
sis.close();
xmltosave.delete();
} catch (Exception e) {
AnalysisLogger.getLogger().debug("Dataspace->Failed creating ProvO XML file " + e.getLocalizedMessage());
AnalysisLogger.getLogger().debug(e);
e.printStackTrace();
}
List<String> scopes = new ArrayList<String>();
scopes.add(config.getGcubeScope());
ws.createGcubeItem(computation.id, computation.operatorDescription, scopes, computation.user, itemType, properties, newcomputationFolder.getId());
}
AnalysisLogger.getLogger().debug("Dataspace->finished uploading computation data");
@@ -273,7 +326,7 @@ public class DataspaceManager implements Runnable {
createFoldersNetwork(ws, root);
WorkspaceFolder dataminerItem = (WorkspaceFolder) root.find(dataminerFolder);
AnalysisLogger.getLogger().debug("Dataspace->uploading input files");
// uploadInputData(inputData, dataminerItem);
uploadInputData(inputData, dataminerItem);
AnalysisLogger.getLogger().debug("Dataspace->uploading output files");
uploadOutputData(outputData, dataminerItem);
AnalysisLogger.getLogger().debug("Dataspace->uploading computation files");
@@ -310,8 +363,10 @@ public class DataspaceManager implements Runnable {
// write a computation item for the computation
LinkedHashMap<String, String> properties = new LinkedHashMap<String, String>();
properties.put(computation_id, computation.id);
properties.put(vre, computation.vre);
properties.put(operator_name, config.getAgent());
properties.put(operator_description, computation.operatorDescription);
properties.put(operator_id, computation.operatorId);
properties.put(start_date, computation.startDate);
properties.put(end_date, computation.endDate);
properties.put(status, computation.status);

View File: ProvOGenerator.java

@@ -59,7 +59,7 @@ public class ProvOGenerator {
String status = "100";
String id = name;
String user = "gianpaolo.coro";
ComputationData computation = new ComputationData(name, operator, operatorDescription, infrastructure, startDate, endDate, status, name,user);
ComputationData computation = new ComputationData(name, operator, operatorDescription, infrastructure, startDate, endDate, status, name,user,"devsec","test");
/*
public static String operator_description="operator_description";
public static String data_description="data_description";
@@ -74,9 +74,9 @@ public class ProvOGenerator {
*/
List<StoredData> inputData = new ArrayList<StoredData>();
List<StoredData> outputData = new ArrayList<StoredData>();
StoredData in = new StoredData("inputT1","descrT1", "inputT1", DataProvenance.IMPORTED, "15/03/2016 11:32:22", operator, id, StoredType.STRING, "hello");
StoredData in = new StoredData("inputT1","descrT1", "inputT1", DataProvenance.IMPORTED, "15/03/2016 11:32:22", operator, id, StoredType.STRING, "hello","devsec");
inputData.add(in);
StoredData out = new StoredData("outputT1","descrT1", "outputT1", DataProvenance.IMPORTED, "16/03/2016 11:32:22", operator, id, StoredType.STRING, "hellooutput");
StoredData out = new StoredData("outputT1","descrT1", "outputT1", DataProvenance.IMPORTED, "16/03/2016 11:32:22", operator, id, StoredType.STRING, "hellooutput","devsec");
outputData.add(out);
//System.out.println(dataToEntity(in));
System.out.println(toProvO(computation, inputData, outputData));
@@ -127,11 +127,21 @@ public class ProvOGenerator {
return entity(DataspaceManager.operator, operator);
}
public static String getOperatorID(String operatorId){
return entity(DataspaceManager.operator_id, operatorId);
}
public static String getVREEntity(String vre){
return entity(DataspaceManager.vre, vre);
}
public static String computationToAction(ComputationData computation,String subEntities){
String status = getStatusEntity(computation.status);
String description = getOperatorDescriptionEntity(computation.operatorDescription);
String operator = getOperatorEntity(computation.operator);
String subents =operator+description+status+subEntities;
String operatorId = getOperatorID(computation.operatorId);
String vre = getVREEntity(computation.vre);
String subents =operator+operatorId+description+vre+status+subEntities;
String activity = completeActivity(computation.id,computation.startDate,computation.endDate,computation.user,subents);
return activity;
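In the enriched activity the two new entities slot between the existing ones. The committed composition order, annotated (each helper wraps entity(key, value) with the DataspaceManager key constants):

String subents = operator        // operator_name
        + operatorId             // operator_id (new)
        + description            // operator_description
        + vre                    // VRE (new)
        + status                 // status
        + subEntities;           // input/output data entities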

View File: StoredData.java

@@ -2,7 +2,7 @@ package org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping.datasp
public class StoredData {
public StoredData(String name, String description, String id, DataProvenance provenance, String creationDate, String operator, String computationId, StoredType type, String payload) {
public StoredData(String name, String description, String id, DataProvenance provenance, String creationDate, String operator, String computationId, StoredType type, String payload, String vre) {
super();
this.name = name;
this.id = id;
@@ -12,6 +12,7 @@ public class StoredData {
this.operator = operator;
this.computationId = computationId;
this.type = type;
this.vre = vre;
this.payload=payload;
}
String name;
@@ -21,6 +22,7 @@ public class StoredData {
String creationDate;
String operator;
String computationId;
String vre;
StoredType type;
String payload;
}

View File: RegressionTests.java

@@ -11,6 +11,7 @@ import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.capabilities.GetCapabilitiesChecker;
@@ -78,6 +79,7 @@ public class RegressionTests {
if (!check)
break;
System.out.println("EXECUTING "+algorithmName+" : "+executionURL);
executionURL = executionURL.replace("test12345.nc","test"+UUID.randomUUID()+".nc");
pageCheck = GetCapabilitiesChecker.readPageNoHttpClient(new URL(executionURL));
System.out.println("EXECUTION RESULT "+pageCheck);
check = checkHttpPage(executionURL, pageCheck);
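
The regression driver rewrites the fixed output name test12345.nc into a unique one per run, presumably so repeated or parallel executions cannot collide on the same output file. A stand-alone sketch of the substitution; the execution URL is hypothetical:

import java.util.UUID;

public class UniqueOutputName {
    public static void main(String[] args) {
        // Hypothetical WPS execution URL; only the fixed file name matters here.
        String executionURL = "http://example.org/wps/Execute?...&file=test12345.nc";
        // Same substitution as the test: a fresh name for every run.
        executionURL = executionURL.replace("test12345.nc", "test" + UUID.randomUUID() + ".nc");
        System.out.println(executionURL);
    }
}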