git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/DataMiner@173499 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
30749c37e3
commit
8e7edbb075
16
pom.xml
16
pom.xml
|
@ -124,7 +124,7 @@
|
||||||
<artifactId>javassist</artifactId>
|
<artifactId>javassist</artifactId>
|
||||||
<version>3.12.1.GA</version>
|
<version>3.12.1.GA</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<!-- <dependency>
|
||||||
<groupId>org.gcube.common</groupId>
|
<groupId>org.gcube.common</groupId>
|
||||||
<artifactId>home-library-jcr</artifactId>
|
<artifactId>home-library-jcr</artifactId>
|
||||||
<version>[2.0.0-SNAPSHOT,3.0.0-SNAPSHOT)</version>
|
<version>[2.0.0-SNAPSHOT,3.0.0-SNAPSHOT)</version>
|
||||||
|
@ -134,8 +134,18 @@
|
||||||
<groupId>org.gcube.common</groupId>
|
<groupId>org.gcube.common</groupId>
|
||||||
<artifactId>home-library</artifactId>
|
<artifactId>home-library</artifactId>
|
||||||
<version>[2.0.0-SNAPSHOT,3.0.0-SNAPSHOT)</version>
|
<version>[2.0.0-SNAPSHOT,3.0.0-SNAPSHOT)</version>
|
||||||
</dependency>
|
</dependency> -->
|
||||||
|
<dependency >
|
||||||
|
<groupId>org.gcube.common</groupId>
|
||||||
|
<artifactId>storagehub-client-library</artifactId>
|
||||||
|
<version>[1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency >
|
||||||
|
<groupId>org.gcube.common</groupId>
|
||||||
|
<artifactId>storagehub-model</artifactId>
|
||||||
|
<version>[1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>xerces</groupId>
|
<groupId>xerces</groupId>
|
||||||
<artifactId>xercesImpl</artifactId>
|
<artifactId>xercesImpl</artifactId>
|
||||||
|
|
|
@ -13,16 +13,14 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import org.gcube.common.homelibrary.home.Home;
|
import org.gcube.common.storagehub.client.dsl.FileContainer;
|
||||||
import org.gcube.common.homelibrary.home.HomeLibrary;
|
import org.gcube.common.storagehub.client.dsl.FolderContainer;
|
||||||
import org.gcube.common.homelibrary.home.HomeManager;
|
import org.gcube.common.storagehub.client.dsl.ItemContainer;
|
||||||
import org.gcube.common.homelibrary.home.HomeManagerFactory;
|
import org.gcube.common.storagehub.client.dsl.StorageHubClient;
|
||||||
import org.gcube.common.homelibrary.home.User;
|
import org.gcube.common.storagehub.model.Metadata;
|
||||||
import org.gcube.common.homelibrary.home.workspace.Workspace;
|
import org.gcube.common.storagehub.model.items.FolderItem;
|
||||||
import org.gcube.common.homelibrary.home.workspace.WorkspaceFolder;
|
import org.gcube.common.storagehub.model.items.GCubeItem;
|
||||||
import org.gcube.common.homelibrary.home.workspace.WorkspaceItem;
|
import org.gcube.common.storagehub.model.items.Item;
|
||||||
import org.gcube.common.homelibrary.home.workspace.folder.FolderItem;
|
|
||||||
import org.gcube.common.homelibrary.util.WorkspaceUtil;
|
|
||||||
import org.gcube.contentmanagement.lexicalmatcher.utils.FileTools;
|
import org.gcube.contentmanagement.lexicalmatcher.utils.FileTools;
|
||||||
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
|
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
|
||||||
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping.AbstractEcologicalEngineMapper;
|
import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping.AbstractEcologicalEngineMapper;
|
||||||
|
@ -88,43 +86,55 @@ public class DataspaceManager implements Runnable {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void createFoldersNetwork(Workspace ws, WorkspaceFolder root) throws Exception {
|
public FolderContainer createFoldersNetwork() throws Exception {
|
||||||
|
|
||||||
LOGGER.debug("Dataspace->Creating folders for DataMiner");
|
LOGGER.debug("Dataspace->Creating folders for DataMiner");
|
||||||
|
|
||||||
// manage folders: create the folders network
|
StorageHubClient shc = new StorageHubClient();
|
||||||
if (!ws.exists(dataminerFolder, root.getId())) {
|
|
||||||
LOGGER.debug("Dataspace->Creating DataMiner main folder");
|
|
||||||
root.createFolder(dataminerFolder, "A folder collecting DataMiner experiments data and computation information");
|
|
||||||
((WorkspaceFolder) root.find(dataminerFolder)).setSystemFolder(true);
|
|
||||||
}
|
|
||||||
WorkspaceFolder dataminerFolderWS = (WorkspaceFolder) root.find(dataminerFolder);
|
|
||||||
|
|
||||||
if (!ws.exists(importedDataFolder, dataminerFolderWS.getId())) {
|
FolderContainer root =shc.getWSRoot();
|
||||||
|
|
||||||
|
List<ItemContainer<? extends Item>> dataminerItems = root.findByName(dataminerFolder).getContainers();
|
||||||
|
|
||||||
|
FolderContainer dataminerFolderContainer;
|
||||||
|
|
||||||
|
// manage folders: create the folders network
|
||||||
|
if (dataminerItems.isEmpty()) {
|
||||||
|
LOGGER.debug("Dataspace->Creating DataMiner main folder");
|
||||||
|
dataminerFolderContainer = root.newFolder(dataminerFolder, "A folder collecting DataMiner experiments data and computation information");
|
||||||
|
//((WorkspaceFolder) root.find(dataminerFolder)).setSystemFolder(true);
|
||||||
|
} else if (dataminerItems.size()>1) throw new Exception("found more than one dataminer folder (impossible!!!)");
|
||||||
|
else dataminerFolderContainer = (FolderContainer) dataminerItems.get(0);
|
||||||
|
|
||||||
|
|
||||||
|
if (dataminerFolderContainer.findByName(importedDataFolder).getContainers().isEmpty()) {
|
||||||
LOGGER.debug("Dataspace->Creating DataMiner imported data folder");
|
LOGGER.debug("Dataspace->Creating DataMiner imported data folder");
|
||||||
dataminerFolderWS.createFolder(importedDataFolder, "A folder collecting DataMiner imported data");
|
dataminerFolderContainer.newFolder(importedDataFolder, "A folder collecting DataMiner imported data");
|
||||||
}
|
}
|
||||||
if (!ws.exists(computedDataFolder, dataminerFolderWS.getId())) {
|
|
||||||
|
if (dataminerFolderContainer.findByName(computedDataFolder).getContainers().isEmpty()) {
|
||||||
LOGGER.debug("Dataspace->Creating DataMiner computed data folder");
|
LOGGER.debug("Dataspace->Creating DataMiner computed data folder");
|
||||||
dataminerFolderWS.createFolder(computedDataFolder, "A folder collecting DataMiner computed data");
|
dataminerFolderContainer.newFolder(computedDataFolder, "A folder collecting DataMiner computed data");
|
||||||
}
|
}
|
||||||
if (!ws.exists(computationsFolder, dataminerFolderWS.getId())) {
|
if (dataminerFolderContainer.findByName(computationsFolder).getContainers().isEmpty()) {
|
||||||
LOGGER.debug("Dataspace->Creating DataMiner computations folder");
|
LOGGER.debug("Dataspace->Creating DataMiner computations folder");
|
||||||
dataminerFolderWS.createFolder(computationsFolder, "A folder collecting DataMiner computations information");
|
dataminerFolderContainer.newFolder(computationsFolder, "A folder collecting DataMiner computations information");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return dataminerFolderContainer;
|
||||||
}
|
}
|
||||||
public String uploadData(StoredData data, WorkspaceFolder wsFolder) throws Exception {
|
|
||||||
return uploadData(data, wsFolder, true);
|
public String uploadData(StoredData data, FolderContainer destinationFolder) throws Exception {
|
||||||
|
return uploadData(data, destinationFolder, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String uploadData(StoredData data, WorkspaceFolder wsFolder, boolean changename) throws Exception {
|
public String uploadData(StoredData data, FolderContainer destinationFolder, boolean changename) throws Exception {
|
||||||
LOGGER.debug("Dataspace->Analysing " + data);
|
LOGGER.debug("Dataspace->Analysing " + data);
|
||||||
// String filenameonwsString = WorkspaceUtil.getUniqueName(data.name, wsFolder);
|
// String filenameonwsString = WorkspaceUtil.getUniqueName(data.name, wsFolder);
|
||||||
String filenameonwsString = data.name ;
|
String filenameonwsString = data.name ;
|
||||||
if (changename)
|
if (changename)
|
||||||
filenameonwsString = String.format("%s_(%s)%s", data.name, data.computationId, getExtension(data.payload, data.type));
|
filenameonwsString = String.format("%s_(%s)%s", data.name, data.computationId, getExtension(data.payload, data.type));
|
||||||
|
|
||||||
|
|
||||||
InputStream in = null;
|
InputStream in = null;
|
||||||
String url = "";
|
String url = "";
|
||||||
try {
|
try {
|
||||||
|
@ -174,11 +184,13 @@ public class DataspaceManager implements Runnable {
|
||||||
properties.put(data_type, data.type);
|
properties.put(data_type, data.type);
|
||||||
properties.put(payload, url);
|
properties.put(payload, url);
|
||||||
|
|
||||||
FolderItem fileItem = WorkspaceUtil.createExternalFile(wsFolder, filenameonwsString, data.description, in,properties,data.type,size);
|
FileContainer fileContainer = destinationFolder.uploadFile(in, filenameonwsString, data.description);
|
||||||
|
//TODO: add proprerty to file
|
||||||
|
//FolderItem fileItem = WorkspaceUtil.createExternalFile(wsFolder, filenameonwsString, data.description, in,properties,data.type,size);
|
||||||
//fileItem.getProperties().addProperties(properties);
|
//fileItem.getProperties().addProperties(properties);
|
||||||
LOGGER.debug("Dataspace->WS OP file saved on the WS " + filenameonwsString);
|
LOGGER.debug("Dataspace->WS OP file saved on the WS " + filenameonwsString);
|
||||||
|
|
||||||
url = fileItem.getPublicLink(false);
|
url = fileContainer.getPublicLink().toString();
|
||||||
LOGGER.debug("Dataspace->WS OP url produced for the file " + url);
|
LOGGER.debug("Dataspace->WS OP url produced for the file " + url);
|
||||||
|
|
||||||
data.payload = url;
|
data.payload = url;
|
||||||
|
@ -202,176 +214,167 @@ public class DataspaceManager implements Runnable {
|
||||||
return url;
|
return url;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<String> uploadInputData(List<StoredData> inputData, WorkspaceFolder dataminerFolder) throws Exception {
|
public List<String> uploadInputData(List<StoredData> inputData, FolderContainer dataminerFolder) throws Exception {
|
||||||
LOGGER.debug("Dataspace->uploading input data; Number of data: " + inputData.size());
|
LOGGER.debug("Dataspace->uploading input data; Number of data: " + inputData.size());
|
||||||
WorkspaceItem folderItem = dataminerFolder.find(importedDataFolder);
|
FolderContainer destinationFolder = (FolderContainer) dataminerFolder.findByName(importedDataFolder).getContainers().get(0);
|
||||||
List<String> urls = new ArrayList<String>();
|
List<String> urls = new ArrayList<String>();
|
||||||
if (folderItem != null && folderItem.isFolder()) {
|
for (StoredData input : inputData) {
|
||||||
WorkspaceFolder destinationFolder = (WorkspaceFolder) folderItem;
|
List<ItemContainer<? extends Item>> items = null;
|
||||||
for (StoredData input : inputData) {
|
|
||||||
WorkspaceItem item = null;
|
|
||||||
|
|
||||||
if (input.type.equals("text/csv")||input.type.equals("application/d4science")||input.type.equals("image/png"))
|
if (input.type.equals("text/csv")||input.type.equals("application/d4science")||input.type.equals("image/png"))
|
||||||
item = destinationFolder.find(input.name);
|
items = destinationFolder.findByName(input.name).getContainers();
|
||||||
|
|
||||||
if (item==null){
|
if (items==null || items.isEmpty()){
|
||||||
String url = uploadData(input, destinationFolder,false);
|
String url = uploadData(input, destinationFolder,false);
|
||||||
LOGGER.debug("Dataspace->returning property "+url);
|
LOGGER.debug("Dataspace->returning property "+url);
|
||||||
urls.add(url);
|
urls.add(url);
|
||||||
}
|
|
||||||
else{
|
|
||||||
LOGGER.debug("Dataspace->Input item "+input.name+" is already available in the input folder");
|
|
||||||
String url = item.getPublicLink(false);
|
|
||||||
LOGGER.debug("Dataspace->returning WS url "+url);
|
|
||||||
urls.add(url);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else
|
else{
|
||||||
LOGGER.debug("Dataspace->folder is not valid");
|
FileContainer item = (FileContainer) items.get(0);
|
||||||
|
LOGGER.debug("Dataspace->Input item "+input.name+" is already available in the input folder");
|
||||||
|
String url = item.getPublicLink().toString();
|
||||||
|
LOGGER.debug("Dataspace->returning WS url "+url);
|
||||||
|
urls.add(url);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
LOGGER.debug("Dataspace->finished uploading input data");
|
LOGGER.debug("Dataspace->finished uploading input data");
|
||||||
return urls;
|
return urls;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<String> uploadOutputData(List<StoredData> outputData, WorkspaceFolder dataminerFolder) throws Exception {
|
public List<String> uploadOutputData(List<StoredData> outputData,FolderContainer dataminerFolder) throws Exception {
|
||||||
LOGGER.debug("Dataspace->uploading output data; Number of data: " + outputData.size());
|
LOGGER.debug("Dataspace->uploading output data; Number of data: " + outputData.size());
|
||||||
WorkspaceItem folderItem = dataminerFolder.find(computedDataFolder);
|
FolderContainer destinationFolder = (FolderContainer)dataminerFolder.findByName(computedDataFolder).getContainers().get(0);
|
||||||
List<String> urls = new ArrayList<String>();
|
List<String> urls = new ArrayList<String>();
|
||||||
if (folderItem != null && folderItem.isFolder()) {
|
for (StoredData output : outputData) {
|
||||||
WorkspaceFolder destinationFolder = (WorkspaceFolder) folderItem;
|
|
||||||
for (StoredData output : outputData) {
|
|
||||||
String url = uploadData(output, destinationFolder);
|
String url = uploadData(output, destinationFolder);
|
||||||
urls.add(url);
|
urls.add(url);
|
||||||
}
|
}
|
||||||
} else
|
|
||||||
LOGGER.debug("Dataspace->folder is not valid");
|
|
||||||
LOGGER.debug("Dataspace->finished uploading output data");
|
LOGGER.debug("Dataspace->finished uploading output data");
|
||||||
return urls;
|
return urls;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void uploadComputationData(ComputationData computation, List<StoredData> inputData, List<StoredData> outputData, WorkspaceFolder dataminerFolder, Workspace ws) throws Exception {
|
public void uploadComputationData(ComputationData computation, List<StoredData> inputData, List<StoredData> outputData, FolderContainer dataminerFolder) throws Exception {
|
||||||
LOGGER.debug("Dataspace->uploading computation data");
|
LOGGER.debug("Dataspace->uploading computation data");
|
||||||
WorkspaceItem folderItem = dataminerFolder.find(computationsFolder);
|
FolderContainer computationContainer = (FolderContainer) dataminerFolder.findByName(computationsFolder).getContainers().get(0);
|
||||||
if (folderItem != null && folderItem.isFolder()) {
|
// create a folder in here
|
||||||
// create a folder in here
|
LOGGER.debug("Dataspace->Creating computation folder " + computation.id);
|
||||||
LOGGER.debug("Dataspace->Creating computation folder " + computation.id);
|
String cfoldername = computation.id;
|
||||||
WorkspaceFolder cfolder = ((WorkspaceFolder) folderItem);
|
FolderContainer newcomputationFolder = null;
|
||||||
String cfoldername = computation.id;
|
try{
|
||||||
WorkspaceFolder newcomputationFolder = null;
|
newcomputationFolder = computationContainer.newFolder(cfoldername, computation.operatorDescription);
|
||||||
try{
|
}catch(java.lang.ClassCastException e){
|
||||||
newcomputationFolder = cfolder.createFolder(cfoldername, computation.operatorDescription);
|
LOGGER.debug("Dataspace->concurrency exception - deleting remaining item");
|
||||||
}catch(java.lang.ClassCastException e){
|
deleteRunningComputationData();
|
||||||
LOGGER.debug("Dataspace->concurrency exception - deleting remaining item");
|
newcomputationFolder = computationContainer.newFolder(cfoldername, computation.operatorDescription);
|
||||||
deleteRunningComputationData();
|
|
||||||
newcomputationFolder = cfolder.createFolder(cfoldername, computation.operatorDescription);
|
|
||||||
}
|
|
||||||
//String itemType = "COMPUTATION";
|
|
||||||
|
|
||||||
// create IO folders
|
|
||||||
LOGGER.debug("Dataspace->creating IO folders under " + cfoldername);
|
|
||||||
newcomputationFolder.createFolder(importedDataFolder, importedDataFolder);
|
|
||||||
newcomputationFolder.createFolder(computedDataFolder, computedDataFolder);
|
|
||||||
|
|
||||||
// copy IO in those folders
|
|
||||||
LOGGER.debug("Dataspace->*****uploading inputs in IO folder*****");
|
|
||||||
List<String> inputurls = uploadInputData(inputData, newcomputationFolder);
|
|
||||||
LOGGER.debug("Dataspace->*****uploading outputs in IO folder*****");
|
|
||||||
List<String> outputurls = uploadOutputData(outputData, newcomputationFolder);
|
|
||||||
|
|
||||||
LOGGER.debug("Dataspace->*****adding properties to the folder*****");
|
|
||||||
|
|
||||||
LOGGER.debug("Dataspace->creating Folder Properties");
|
|
||||||
|
|
||||||
// write a computation item for the computation
|
|
||||||
LinkedHashMap<String, String> properties = new LinkedHashMap<String, String>();
|
|
||||||
properties.put(computation_id, computation.id);
|
|
||||||
|
|
||||||
properties.put(hostname, WPSConfig.getInstance().getWPSConfig().getServer().getHostname());
|
|
||||||
|
|
||||||
properties.put(vre, computation.vre);
|
|
||||||
|
|
||||||
properties.put(operator_name, config.getAgent());
|
|
||||||
|
|
||||||
properties.put(operator_id, computation.operatorId);
|
|
||||||
|
|
||||||
properties.put(operator_description, computation.operatorDescription);
|
|
||||||
|
|
||||||
properties.put(start_date, computation.startDate);
|
|
||||||
|
|
||||||
properties.put(end_date, computation.endDate);
|
|
||||||
|
|
||||||
properties.put(status, getStatus(computation.status));
|
|
||||||
|
|
||||||
properties.put(execution_platform, computation.infrastructure);
|
|
||||||
|
|
||||||
int ninput = inputurls.size();
|
|
||||||
int noutput = outputurls.size();
|
|
||||||
|
|
||||||
LOGGER.debug("Dataspace->Adding input properties for " + ninput + " inputs");
|
|
||||||
for (int i = 1; i <= ninput; i++) {
|
|
||||||
StoredData input = inputData.get(i - 1);
|
|
||||||
if (input.payload.contains("|")){
|
|
||||||
String payload = input .payload;
|
|
||||||
LOGGER.debug("Dataspace->Managing complex input "+input.name+" : "+payload);
|
|
||||||
//delete the names that are not useful
|
|
||||||
|
|
||||||
for (StoredData subinput:inputData){
|
|
||||||
if (input.description.equals(subinput.description)){
|
|
||||||
payload = payload.replace(subinput.name,subinput.payload);
|
|
||||||
subinput.name=null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
input.name = null;
|
|
||||||
|
|
||||||
//delete last pipe character
|
|
||||||
if (payload.endsWith("|"))
|
|
||||||
payload = payload.substring(0,payload.length()-1);
|
|
||||||
LOGGER.debug("Dataspace->Complex input after processing "+payload);
|
|
||||||
properties.put("input" + i + "_" + input.description, payload);
|
|
||||||
input.payload=payload;
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 1; i <= ninput; i++) {
|
|
||||||
StoredData input = inputData.get(i - 1);
|
|
||||||
if (input.name!=null){
|
|
||||||
properties.put("input" + i + "_" + input.name, inputurls.get(i - 1));
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
LOGGER.debug("Dataspace->Adding output properties for " + noutput + " outputs");
|
|
||||||
for (int i = 1; i <= noutput; i++) {
|
|
||||||
properties.put("output" + i + "_" + outputData.get(i - 1).name, outputurls.get(i - 1));
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
LOGGER.debug("Dataspace->Properties of the folder: " + properties);
|
|
||||||
|
|
||||||
LOGGER.debug("Dataspace->Saving properties to ProvO XML file " + noutput + " outputs");
|
|
||||||
|
|
||||||
/*
|
|
||||||
* XStream xstream = new XStream(); String xmlproperties = xstream.toXML(properties);
|
|
||||||
*/
|
|
||||||
try {
|
|
||||||
String xmlproperties = ProvOGenerator.toProvO(computation, inputData, outputData);
|
|
||||||
|
|
||||||
File xmltosave = new File(config.getPersistencePath(), "prov_o_" + UUID.randomUUID());
|
|
||||||
FileTools.saveString(xmltosave.getAbsolutePath(), xmlproperties, true, "UTF-8");
|
|
||||||
InputStream sis = new FileInputStream(xmltosave);
|
|
||||||
WorkspaceUtil.createExternalFile(newcomputationFolder, computation.id + ".xml", computation.operatorDescription, sis,null,"text/xml",xmltosave.length());
|
|
||||||
sis.close();
|
|
||||||
xmltosave.delete();
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOGGER.error("Dataspace->Failed creating ProvO XML file ",e);
|
|
||||||
}
|
|
||||||
//List<String> scopes = new ArrayList<String>();
|
|
||||||
//scopes.add(config.getGcubeScope());
|
|
||||||
//ws.createGcubeItem(computation.id, computation.operatorDescription, scopes, computation.user, itemType, properties, newcomputationFolder.getId());
|
|
||||||
newcomputationFolder.getProperties().addProperties(properties);
|
|
||||||
}
|
}
|
||||||
|
//String itemType = "COMPUTATION";
|
||||||
|
|
||||||
|
// create IO folders
|
||||||
|
LOGGER.debug("Dataspace->creating IO folders under " + cfoldername);
|
||||||
|
newcomputationFolder.newFolder(importedDataFolder, importedDataFolder);
|
||||||
|
newcomputationFolder.newFolder(computedDataFolder, computedDataFolder);
|
||||||
|
|
||||||
|
// copy IO in those folders
|
||||||
|
LOGGER.debug("Dataspace->*****uploading inputs in IO folder*****");
|
||||||
|
List<String> inputurls = uploadInputData(inputData, newcomputationFolder);
|
||||||
|
LOGGER.debug("Dataspace->*****uploading outputs in IO folder*****");
|
||||||
|
List<String> outputurls = uploadOutputData(outputData, newcomputationFolder);
|
||||||
|
|
||||||
|
LOGGER.debug("Dataspace->*****adding properties to the folder*****");
|
||||||
|
|
||||||
|
LOGGER.debug("Dataspace->creating Folder Properties");
|
||||||
|
|
||||||
|
// write a computation item for the computation
|
||||||
|
Map<String, Object> properties = new LinkedHashMap<String, Object>();
|
||||||
|
properties.put(computation_id, computation.id);
|
||||||
|
|
||||||
|
properties.put(hostname, WPSConfig.getInstance().getWPSConfig().getServer().getHostname());
|
||||||
|
|
||||||
|
properties.put(vre, computation.vre);
|
||||||
|
|
||||||
|
properties.put(operator_name, config.getAgent());
|
||||||
|
|
||||||
|
properties.put(operator_id, computation.operatorId);
|
||||||
|
|
||||||
|
properties.put(operator_description, computation.operatorDescription);
|
||||||
|
|
||||||
|
properties.put(start_date, computation.startDate);
|
||||||
|
|
||||||
|
properties.put(end_date, computation.endDate);
|
||||||
|
|
||||||
|
properties.put(status, getStatus(computation.status));
|
||||||
|
|
||||||
|
properties.put(execution_platform, computation.infrastructure);
|
||||||
|
|
||||||
|
int ninput = inputurls.size();
|
||||||
|
int noutput = outputurls.size();
|
||||||
|
|
||||||
|
LOGGER.debug("Dataspace->Adding input properties for " + ninput + " inputs");
|
||||||
|
for (int i = 1; i <= ninput; i++) {
|
||||||
|
StoredData input = inputData.get(i - 1);
|
||||||
|
if (input.payload.contains("|")){
|
||||||
|
String payload = input .payload;
|
||||||
|
LOGGER.debug("Dataspace->Managing complex input "+input.name+" : "+payload);
|
||||||
|
//delete the names that are not useful
|
||||||
|
|
||||||
|
for (StoredData subinput:inputData){
|
||||||
|
if (input.description.equals(subinput.description)){
|
||||||
|
payload = payload.replace(subinput.name,subinput.payload);
|
||||||
|
subinput.name=null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
input.name = null;
|
||||||
|
|
||||||
|
//delete last pipe character
|
||||||
|
if (payload.endsWith("|"))
|
||||||
|
payload = payload.substring(0,payload.length()-1);
|
||||||
|
LOGGER.debug("Dataspace->Complex input after processing "+payload);
|
||||||
|
properties.put("input" + i + "_" + input.description, payload);
|
||||||
|
input.payload=payload;
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 1; i <= ninput; i++) {
|
||||||
|
StoredData input = inputData.get(i - 1);
|
||||||
|
if (input.name!=null){
|
||||||
|
properties.put(String.format("input%d_%s",i, input.name), inputurls.get(i - 1));
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOGGER.debug("Dataspace->Adding output properties for " + noutput + " outputs");
|
||||||
|
for (int i = 1; i <= noutput; i++) {
|
||||||
|
properties.put(String.format("output%d_%s",i,outputData.get(i - 1).name), outputurls.get(i - 1));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
LOGGER.debug("Dataspace->Properties of the folder: " + properties);
|
||||||
|
|
||||||
|
LOGGER.debug("Dataspace->Saving properties to ProvO XML file " + noutput + " outputs");
|
||||||
|
|
||||||
|
/*
|
||||||
|
* XStream xstream = new XStream(); String xmlproperties = xstream.toXML(properties);
|
||||||
|
*/
|
||||||
|
try {
|
||||||
|
String xmlproperties = ProvOGenerator.toProvO(computation, inputData, outputData);
|
||||||
|
|
||||||
|
File xmltosave = new File(config.getPersistencePath(), "prov_o_" + UUID.randomUUID());
|
||||||
|
FileTools.saveString(xmltosave.getAbsolutePath(), xmlproperties, true, "UTF-8");
|
||||||
|
try(InputStream sis = new FileInputStream(xmltosave)){
|
||||||
|
newcomputationFolder.uploadFile(sis,computation.id + ".xml", computation.operatorDescription);
|
||||||
|
}
|
||||||
|
xmltosave.delete();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.error("Dataspace->Failed creating ProvO XML file ",e);
|
||||||
|
}
|
||||||
|
//List<String> scopes = new ArrayList<String>();
|
||||||
|
//scopes.add(config.getGcubeScope());
|
||||||
|
//ws.createGcubeItem(computation.id, computation.operatorDescription, scopes, computation.user, itemType, properties, newcomputationFolder.getId());
|
||||||
|
newcomputationFolder.setMetadata(new Metadata(properties));
|
||||||
|
|
||||||
|
|
||||||
LOGGER.debug("Dataspace->finished uploading computation data");
|
LOGGER.debug("Dataspace->finished uploading computation data");
|
||||||
}
|
}
|
||||||
|
@ -389,23 +392,14 @@ public class DataspaceManager implements Runnable {
|
||||||
|
|
||||||
public void writeProvenance(ComputationData computation, List<StoredData> inputData, List<StoredData> outputData) throws Exception {
|
public void writeProvenance(ComputationData computation, List<StoredData> inputData, List<StoredData> outputData) throws Exception {
|
||||||
LOGGER.debug("Dataspace->connecting to Workspace");
|
LOGGER.debug("Dataspace->connecting to Workspace");
|
||||||
HomeManagerFactory factory = HomeLibrary.getHomeManagerFactory();
|
|
||||||
HomeManager manager = factory.getHomeManager();
|
|
||||||
LOGGER.debug("Dataspace->getting user");
|
|
||||||
User user = manager.createUser(computation.user);
|
|
||||||
Home home = manager.getHome(user);
|
|
||||||
LOGGER.debug("Dataspace->getting root folder");
|
|
||||||
Workspace ws = home.getWorkspace();
|
|
||||||
WorkspaceFolder root = ws.getRoot();
|
|
||||||
LOGGER.debug("Dataspace->create folders network");
|
LOGGER.debug("Dataspace->create folders network");
|
||||||
createFoldersNetwork(ws, root);
|
FolderContainer dataminerFolder = createFoldersNetwork();
|
||||||
WorkspaceFolder dataminerItem = (WorkspaceFolder) root.find(dataminerFolder);
|
|
||||||
LOGGER.debug("Dataspace->****uploading input files****");
|
LOGGER.debug("Dataspace->****uploading input files****");
|
||||||
uploadInputData(inputData, dataminerItem);
|
uploadInputData(inputData, dataminerFolder);
|
||||||
LOGGER.debug("Dataspace->****uploading output files****");
|
LOGGER.debug("Dataspace->****uploading output files****");
|
||||||
uploadOutputData(outputData, dataminerItem);
|
uploadOutputData(outputData, dataminerFolder);
|
||||||
LOGGER.debug("Dataspace->****uploading computation files****");
|
LOGGER.debug("Dataspace->****uploading computation files****");
|
||||||
uploadComputationData(computation, inputData, outputData, dataminerItem, ws);
|
uploadComputationData(computation, inputData, outputData, dataminerFolder);
|
||||||
LOGGER.debug("Dataspace->provenance management finished");
|
LOGGER.debug("Dataspace->provenance management finished");
|
||||||
LOGGER.debug("Dataspace->deleting generated files");
|
LOGGER.debug("Dataspace->deleting generated files");
|
||||||
AbstractEcologicalEngineMapper.deleteGeneratedFiles(generatedFiles);
|
AbstractEcologicalEngineMapper.deleteGeneratedFiles(generatedFiles);
|
||||||
|
@ -418,27 +412,19 @@ public class DataspaceManager implements Runnable {
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOGGER.debug("Dataspace->impossible to delete running computation : {} ",e.getMessage());
|
LOGGER.debug("Dataspace->impossible to delete running computation : {} ",e.getMessage());
|
||||||
}
|
}
|
||||||
// LOGGER.debug("Dataspace->updating computation status");
|
StorageHubClient shc = new StorageHubClient();
|
||||||
// LOGGER.debug("Dataspace->connecting to Workspace");
|
|
||||||
HomeManagerFactory factory = HomeLibrary.getHomeManagerFactory();
|
|
||||||
HomeManager manager = factory.getHomeManager();
|
|
||||||
// LOGGER.debug("Dataspace->getting user");
|
|
||||||
User user = manager.createUser(computation.user);
|
|
||||||
Home home = manager.getHome(user);
|
|
||||||
// LOGGER.debug("Dataspace->getting root folder");
|
|
||||||
Workspace ws = home.getWorkspace();
|
|
||||||
WorkspaceFolder root = ws.getRoot();
|
|
||||||
// LOGGER.debug("Dataspace->create folders network");
|
// LOGGER.debug("Dataspace->create folders network");
|
||||||
createFoldersNetwork(ws, root);
|
FolderContainer folderContainer = createFoldersNetwork();
|
||||||
WorkspaceFolder dataminerFolderWS = (WorkspaceFolder) root.find(dataminerFolder);
|
|
||||||
WorkspaceItem computationsFolderItem = dataminerFolderWS.find(computationsFolder);
|
FolderContainer computationsContainer = (FolderContainer) folderContainer.findByName(computationsFolder).getContainers().get(0);
|
||||||
// LOGGER.debug("Dataspace->Creating computation item " + computation.id+" with status"+computation.status);
|
// LOGGER.debug("Dataspace->Creating computation item " + computation.id+" with status"+computation.status);
|
||||||
String itemType = "COMPUTATION";
|
String itemType = "COMPUTATION";
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// write a computation item for the computation
|
// write a computation item for the computation
|
||||||
LinkedHashMap<String, String> properties = new LinkedHashMap<String, String>();
|
Map<String, Object> properties = new LinkedHashMap<String, Object>();
|
||||||
properties.put(computation_id, computation.id);
|
properties.put(computation_id, computation.id);
|
||||||
properties.put(hostname, WPSConfig.getInstance().getWPSConfig().getServer().getHostname());
|
properties.put(hostname, WPSConfig.getInstance().getWPSConfig().getServer().getHostname());
|
||||||
properties.put(vre, computation.vre);
|
properties.put(vre, computation.vre);
|
||||||
|
@ -454,7 +440,15 @@ public class DataspaceManager implements Runnable {
|
||||||
|
|
||||||
List<String> scopes = new ArrayList<String>();
|
List<String> scopes = new ArrayList<String>();
|
||||||
scopes.add(config.getGcubeScope());
|
scopes.add(config.getGcubeScope());
|
||||||
ws.createGcubeItem(computation.id, computation.operatorDescription, scopes, computation.user, itemType, properties, computationsFolderItem.getId());
|
GCubeItem gcubeItem = new GCubeItem();
|
||||||
|
gcubeItem.setName(computation.id);
|
||||||
|
gcubeItem.setDescription(computation.operatorDescription);
|
||||||
|
gcubeItem.setScopes(scopes.toArray(new String[scopes.size()]));
|
||||||
|
gcubeItem.setItemType(itemType);
|
||||||
|
gcubeItem.setProperty(new Metadata(properties));
|
||||||
|
|
||||||
|
computationsContainer.newGcubeItem(gcubeItem);
|
||||||
|
|
||||||
|
|
||||||
LOGGER.debug("Dataspace->finished uploading computation data");
|
LOGGER.debug("Dataspace->finished uploading computation data");
|
||||||
}
|
}
|
||||||
|
@ -481,26 +475,20 @@ public class DataspaceManager implements Runnable {
|
||||||
|
|
||||||
LOGGER.debug("Dataspace->deleting computation item");
|
LOGGER.debug("Dataspace->deleting computation item");
|
||||||
LOGGER.debug("Dataspace->connecting to Workspace");
|
LOGGER.debug("Dataspace->connecting to Workspace");
|
||||||
HomeManagerFactory factory = HomeLibrary.getHomeManagerFactory();
|
StorageHubClient shc = new StorageHubClient();
|
||||||
HomeManager manager = factory.getHomeManager();
|
FolderContainer dataminerContainer = (FolderContainer) shc.getWSRoot().findByName(dataminerFolder).getContainers().get(0);
|
||||||
LOGGER.debug("Dataspace->getting user");
|
FolderContainer computationContainer = (FolderContainer) dataminerContainer.findByName(computationsFolder).getContainers().get(0);
|
||||||
User user = manager.createUser(computation.user);
|
|
||||||
Home home = manager.getHome(user);
|
|
||||||
LOGGER.debug("Dataspace->getting root folder");
|
|
||||||
Workspace ws = home.getWorkspace();
|
|
||||||
WorkspaceFolder root = ws.getRoot();
|
|
||||||
WorkspaceFolder dataminerFolderWS = (WorkspaceFolder) root.find(dataminerFolder);
|
|
||||||
WorkspaceItem computationsFolderItem = dataminerFolderWS.find(computationsFolder);
|
|
||||||
LOGGER.debug("Dataspace->removing computation data");
|
LOGGER.debug("Dataspace->removing computation data");
|
||||||
WorkspaceFolder computationsFolderWs = ((WorkspaceFolder) computationsFolderItem);
|
|
||||||
WorkspaceItem wi = computationsFolderWs.find(computation.id);
|
List<ItemContainer<? extends Item>> wi = computationContainer.findByName(computation.id).getContainers();
|
||||||
if (wi!=null){
|
if (wi.isEmpty()){
|
||||||
LOGGER.debug("Dataspace->Found "+computation.id+" under "+computationsFolderWs.getName()+" - removing");
|
for (ItemContainer<? extends Item> container : wi)
|
||||||
wi.remove();
|
container.delete();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
LOGGER.debug("Dataspace->Warning Could not find "+computation.id+" under "+computationsFolderWs.getName());
|
LOGGER.debug("Dataspace->Warning Could not find "+computation.id+" under "+computationContainer.get().getName());
|
||||||
|
|
||||||
|
/*TODO: ASK GIANPAOLO
|
||||||
int maxtries = 3;
|
int maxtries = 3;
|
||||||
int i =1;
|
int i =1;
|
||||||
while (ws.exists(computation.id,computationsFolderWs.getId()) && i<maxtries){
|
while (ws.exists(computation.id,computationsFolderWs.getId()) && i<maxtries){
|
||||||
|
@ -508,9 +496,9 @@ public class DataspaceManager implements Runnable {
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
computationsFolderWs.find(computation.id).remove();
|
computationsFolderWs.find(computation.id).remove();
|
||||||
i++;
|
i++;
|
||||||
}
|
}*/
|
||||||
|
|
||||||
LOGGER.debug("Dataspace->finished removing computation data - success "+!ws.exists(computation.id,computationsFolderWs.getId()));
|
LOGGER.debug("Dataspace->finished removing computation data ");
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getExtension(String payload, String type){
|
public static String getExtension(String payload, String type){
|
||||||
|
|
Loading…
Reference in New Issue