package org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping.dataspace; import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.UUID; import org.gcube.common.homelibrary.home.Home; import org.gcube.common.homelibrary.home.HomeLibrary; import org.gcube.common.homelibrary.home.HomeManager; import org.gcube.common.homelibrary.home.HomeManagerFactory; import org.gcube.common.homelibrary.home.User; import org.gcube.common.homelibrary.home.workspace.Workspace; import org.gcube.common.homelibrary.home.workspace.WorkspaceFolder; import org.gcube.common.homelibrary.home.workspace.WorkspaceItem; import org.gcube.common.homelibrary.home.workspace.folder.FolderItem; import org.gcube.common.homelibrary.util.WorkspaceUtil; import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; import org.gcube.contentmanagement.lexicalmatcher.utils.FileTools; import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; import org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mapping.AbstractEcologicalEngineMapper; public class DataspaceManager implements Runnable { public static String dataminerFolder = "DataMiner"; public static String importedDataFolder = "Input Data Sets"; public static String computedDataFolder = "Output Data Sets"; public static String computationsFolder = "Computations"; AlgorithmConfiguration config; ComputationData computation; List inputData; List outputData; List generatedFiles; public static String computation_id = "computation_id"; public static String data_id = "data_id"; public static String data_type = "data_type"; public static String operator_name = "operator_name"; public static String operator_id = "operator_id"; public static String vre = "VRE"; public static String operator_description = "operator_description"; public static String data_description = "data_description"; public static String creation_date = "creation_date"; public static String start_date = "start_date"; public static String end_date = "end_date"; public static String status = "status"; public static String execution_platform = "execution_type"; public static String error = "error"; public static String IO = "IO"; public static String operator = "operator"; public static String payload = "payload"; public DataspaceManager(AlgorithmConfiguration config, ComputationData computation, List inputData, List outputData, List generatedFiles) { this.config = config; this.computation = computation; this.inputData = inputData; this.outputData = outputData; this.generatedFiles = generatedFiles; } public void run() { try { AnalysisLogger.getLogger().debug("Dataspace->Deleting running computation"); try { deleteRunningComputationData(); } catch (Exception e) { AnalysisLogger.getLogger().debug("Dataspace->No running computation available"); } AnalysisLogger.getLogger().debug("Dataspace->Writing provenance information"); writeProvenance(computation, inputData, outputData); } catch (Exception e) { e.printStackTrace(); AnalysisLogger.getLogger().debug("Dataspace-> error writing provenance information " + e.getLocalizedMessage()); AnalysisLogger.getLogger().debug(e); } } public void createFoldersNetwork(Workspace ws, WorkspaceFolder root) throws Exception { AnalysisLogger.getLogger().debug("Dataspace->Creating folders for DataMiner"); // manage folders: create the folders network if (!ws.exists(dataminerFolder, root.getId())) { AnalysisLogger.getLogger().debug("Dataspace->Creating DataMiner main folder"); root.createFolder(dataminerFolder, "A folder collecting DataMiner experiments data and computation information"); } WorkspaceFolder dataminerFolderWS = (WorkspaceFolder) root.find(dataminerFolder); if (!ws.exists(importedDataFolder, dataminerFolderWS.getId())) { AnalysisLogger.getLogger().debug("Dataspace->Creating DataMiner imported data folder"); dataminerFolderWS.createFolder(importedDataFolder, "A folder collecting DataMiner imported data"); } if (!ws.exists(computedDataFolder, dataminerFolderWS.getId())) { AnalysisLogger.getLogger().debug("Dataspace->Creating DataMiner computed data folder"); dataminerFolderWS.createFolder(computedDataFolder, "A folder collecting DataMiner computed data"); } if (!ws.exists(computationsFolder, dataminerFolderWS.getId())) { AnalysisLogger.getLogger().debug("Dataspace->Creating DataMiner computations folder"); dataminerFolderWS.createFolder(computationsFolder, "A folder collecting DataMiner computations information"); } } public String uploadData(StoredData data, WorkspaceFolder wsFolder) throws Exception { return uploadData(data, wsFolder, true); } public String uploadData(StoredData data, WorkspaceFolder wsFolder, boolean changename) throws Exception { // String filenameonwsString = WorkspaceUtil.getUniqueName(data.name, wsFolder); String filenameonwsString = data.name ; if (changename){ filenameonwsString = data.name + "_[" + data.computationId + "]";// ("_"+UUID.randomUUID()).replace("-", ""); if (data.type.equals("text/csv")) filenameonwsString+=".csv"; else if (data.type.equals("image/png")) filenameonwsString+=".png"; } InputStream in = null; String url = ""; try { if (data.type.equals("text/csv")||data.type.equals("application/d4science")||data.type.equals("image/png")) { if (new File(data.payload).exists() || !data.payload.startsWith("http")) { AnalysisLogger.getLogger().debug("Dataspace->Uploading file " + data.payload); in = new FileInputStream(new File(data.payload)); } else { AnalysisLogger.getLogger().debug("Dataspace->Uploading via URL " + data.payload); int tries = 10; for (int i=0;iRetrying connection to "+data.payload+" number "+(i+1)); in =null; } if (in!=null) break; else Thread.sleep(10000); } } if (in==null) throw new Exception("Impossible to open stream from "+data.payload); // AnalysisLogger.getLogger().debug("Dataspace->final file name on ws " + data.name+" description "+data.description); AnalysisLogger.getLogger().debug("Dataspace->saving the following file on the WS " + filenameonwsString + " [" + data.computationId + "]"); FolderItem fileItem = WorkspaceUtil.createExternalFile(wsFolder, filenameonwsString, data.description, null, in); fileItem.getProperties().addProperty(computation_id, data.computationId); fileItem.getProperties().addProperty(vre, data.vre); fileItem.getProperties().addProperty(creation_date, data.creationDate); fileItem.getProperties().addProperty(operator, data.operator); fileItem.getProperties().addProperty(data_id, data.id); fileItem.getProperties().addProperty(data_description, data.description); fileItem.getProperties().addProperty(IO, data.provenance.name()); fileItem.getProperties().addProperty(data_type, data.type); url = fileItem.getPublicLink(true); fileItem.getProperties().addProperty(payload, url); data.payload = url; try { in.close(); } catch (Exception e) { AnalysisLogger.getLogger().debug("Dataspace->Error creating file " + e.getMessage()); AnalysisLogger.getLogger().debug(e); } AnalysisLogger.getLogger().debug("Dataspace->File created " + data.name); } else { AnalysisLogger.getLogger().debug("Dataspace->Uploading string " + data.payload); url = data.payload; } } catch (Throwable e) { e.printStackTrace(); AnalysisLogger.getLogger().debug("Dataspace->Could not retrieve input payload " + data.payload+" - "+e.getLocalizedMessage()); AnalysisLogger.getLogger().debug(e); url = "payload was not made available for this dataset"; data.payload = url; } return url; } public List uploadInputData(List inputData, WorkspaceFolder dataminerFolder) throws Exception { AnalysisLogger.getLogger().debug("Dataspace->uploading input data " + inputData.size()); WorkspaceItem folderItem = dataminerFolder.find(importedDataFolder); List urls = new ArrayList(); if (folderItem != null && folderItem.isFolder()) { WorkspaceFolder destinationFolder = (WorkspaceFolder) folderItem; for (StoredData input : inputData) { WorkspaceItem item = destinationFolder.find(input.name); if (item==null){ AnalysisLogger.getLogger().debug("Dataspace->There is no item named "+input.name+" on the Workspace"); String url = uploadData(input, destinationFolder,false); AnalysisLogger.getLogger().debug("Dataspace->returning generated URL "+url); urls.add(url); } else{ AnalysisLogger.getLogger().debug("Dataspace->Input item "+input.name+" is already available in the input folder"); String url = item.getPublicLink(false); AnalysisLogger.getLogger().debug("Dataspace->returning WS url "+url); urls.add(url); } } } else AnalysisLogger.getLogger().debug("Dataspace->folder is not valid"); AnalysisLogger.getLogger().debug("Dataspace->finished uploading input data"); return urls; } public List uploadOutputData(List outputData, WorkspaceFolder dataminerFolder) throws Exception { AnalysisLogger.getLogger().debug("Dataspace->uploading output data" + outputData.size()); WorkspaceItem folderItem = dataminerFolder.find(computedDataFolder); List urls = new ArrayList(); if (folderItem != null && folderItem.isFolder()) { WorkspaceFolder destinationFolder = (WorkspaceFolder) folderItem; for (StoredData output : outputData) { String url = uploadData(output, destinationFolder); urls.add(url); } } else AnalysisLogger.getLogger().debug("Dataspace->folder is not valid"); AnalysisLogger.getLogger().debug("Dataspace->finished uploading output data"); return urls; } public void uploadComputationData(ComputationData computation, List inputData, List outputData, WorkspaceFolder dataminerFolder, Workspace ws) throws Exception { AnalysisLogger.getLogger().debug("Dataspace->uploading computation data"); WorkspaceItem folderItem = dataminerFolder.find(computationsFolder); if (folderItem != null && folderItem.isFolder()) { // create a folder in here AnalysisLogger.getLogger().debug("Dataspace->Creating computation folder " + computation.id); WorkspaceFolder cfolder = ((WorkspaceFolder) folderItem); String cfoldername = computation.id; WorkspaceFolder newcomputationFolder = cfolder.createFolder(cfoldername, computation.operatorDescription); String itemType = "COMPUTATION"; // create IO folders AnalysisLogger.getLogger().debug("Dataspace->creating IO folders under " + cfoldername); newcomputationFolder.createFolder(importedDataFolder, importedDataFolder); newcomputationFolder.createFolder(computedDataFolder, computedDataFolder); // copy IO in those folders List inputurls = uploadInputData(inputData, newcomputationFolder); List outputurls = uploadOutputData(outputData, newcomputationFolder); AnalysisLogger.getLogger().debug("Dataspace->creating gCube Item"); // write a computation item for the computation LinkedHashMap properties = new LinkedHashMap(); properties.put(computation_id, computation.id); newcomputationFolder.getProperties().addProperty(computation_id, computation.id); properties.put(vre, computation.vre); newcomputationFolder.getProperties().addProperty(vre, computation.vre); properties.put(operator_name, config.getAgent()); newcomputationFolder.getProperties().addProperty(operator_name, config.getAgent()); properties.put(operator_id, computation.operatorId); newcomputationFolder.getProperties().addProperty(operator_id, computation.operatorId); properties.put(operator_description, computation.operatorDescription); newcomputationFolder.getProperties().addProperty(operator_description, computation.operatorDescription); properties.put(start_date, computation.startDate); newcomputationFolder.getProperties().addProperty(start_date, computation.startDate); properties.put(end_date, computation.endDate); newcomputationFolder.getProperties().addProperty(end_date, computation.endDate); properties.put(status, computation.status); newcomputationFolder.getProperties().addProperty(status, computation.status); properties.put(execution_platform, computation.infrastructure); newcomputationFolder.getProperties().addProperty(execution_platform, computation.infrastructure); int ninput = inputurls.size(); int noutput = outputurls.size(); AnalysisLogger.getLogger().debug("Dataspace->Adding input properties for " + ninput + " inputs"); for (int i = 1; i <= ninput; i++) { properties.put("input" + i + "_" + inputData.get(i - 1).name, inputurls.get(i - 1)); newcomputationFolder.getProperties().addProperty("input" + i + "_" + inputData.get(i - 1).name, inputurls.get(i - 1)); } AnalysisLogger.getLogger().debug("Dataspace->Adding output properties for " + noutput + " outputs"); for (int i = 1; i <= noutput; i++) { properties.put("output" + i + "_" + outputData.get(i - 1).name, outputurls.get(i - 1)); newcomputationFolder.getProperties().addProperty("output" + i + "_" + outputData.get(i - 1).name, outputurls.get(i - 1)); } AnalysisLogger.getLogger().debug("Dataspace->Saving properties to ProvO XML file " + noutput + " outputs"); /* * XStream xstream = new XStream(); String xmlproperties = xstream.toXML(properties); */ try { String xmlproperties = ProvOGenerator.toProvO(computation, inputData, outputData); File xmltosave = new File(config.getPersistencePath(), "prov_o_" + UUID.randomUUID()); FileTools.saveString(xmltosave.getAbsolutePath(), xmlproperties, true, "UTF-8"); InputStream sis = new FileInputStream(xmltosave); WorkspaceUtil.createExternalFile(newcomputationFolder, computation.id + ".xml", computation.operatorDescription, null, sis); sis.close(); xmltosave.delete(); } catch (Exception e) { AnalysisLogger.getLogger().debug("Dataspace->Failed creating ProvO XML file " + e.getLocalizedMessage()); AnalysisLogger.getLogger().debug(e); e.printStackTrace(); } List scopes = new ArrayList(); scopes.add(config.getGcubeScope()); ws.createGcubeItem(computation.id, computation.operatorDescription, scopes, computation.user, itemType, properties, newcomputationFolder.getId()); } AnalysisLogger.getLogger().debug("Dataspace->finished uploading computation data"); } public void writeProvenance(ComputationData computation, List inputData, List outputData) throws Exception { AnalysisLogger.getLogger().debug("Dataspace->connecting to Workspace"); HomeManagerFactory factory = HomeLibrary.getHomeManagerFactory(); HomeManager manager = factory.getHomeManager(); AnalysisLogger.getLogger().debug("Dataspace->getting user"); User user = manager.createUser(computation.user); Home home = manager.getHome(user); AnalysisLogger.getLogger().debug("Dataspace->getting root folder"); Workspace ws = home.getWorkspace(); WorkspaceFolder root = ws.getRoot(); AnalysisLogger.getLogger().debug("Dataspace->create folders network"); createFoldersNetwork(ws, root); WorkspaceFolder dataminerItem = (WorkspaceFolder) root.find(dataminerFolder); AnalysisLogger.getLogger().debug("Dataspace->uploading input files"); uploadInputData(inputData, dataminerItem); AnalysisLogger.getLogger().debug("Dataspace->uploading output files"); uploadOutputData(outputData, dataminerItem); AnalysisLogger.getLogger().debug("Dataspace->uploading computation files"); uploadComputationData(computation, inputData, outputData, dataminerItem, ws); AnalysisLogger.getLogger().debug("Dataspace->provenance management finished"); AnalysisLogger.getLogger().debug("Dataspace->deleting generated files"); AbstractEcologicalEngineMapper.deleteGeneratedFiles(generatedFiles); AnalysisLogger.getLogger().debug("Dataspace->generated files deleted"); } public void writeRunningComputationData() throws Exception { try { deleteRunningComputationData(); } catch (Exception e) { AnalysisLogger.getLogger().debug("Dataspace->impossible to delete running computation"); } // AnalysisLogger.getLogger().debug("Dataspace->updating computation status"); // AnalysisLogger.getLogger().debug("Dataspace->connecting to Workspace"); HomeManagerFactory factory = HomeLibrary.getHomeManagerFactory(); HomeManager manager = factory.getHomeManager(); // AnalysisLogger.getLogger().debug("Dataspace->getting user"); User user = manager.createUser(computation.user); Home home = manager.getHome(user); // AnalysisLogger.getLogger().debug("Dataspace->getting root folder"); Workspace ws = home.getWorkspace(); WorkspaceFolder root = ws.getRoot(); // AnalysisLogger.getLogger().debug("Dataspace->create folders network"); createFoldersNetwork(ws, root); WorkspaceFolder dataminerFolderWS = (WorkspaceFolder) root.find(dataminerFolder); WorkspaceItem computationsFolderItem = dataminerFolderWS.find(computationsFolder); // AnalysisLogger.getLogger().debug("Dataspace->Creating computation item " + computation.id+" with status"+computation.status); String itemType = "COMPUTATION"; // write a computation item for the computation LinkedHashMap properties = new LinkedHashMap(); properties.put(computation_id, computation.id); properties.put(vre, computation.vre); properties.put(operator_name, config.getAgent()); properties.put(operator_description, computation.operatorDescription); properties.put(operator_id, computation.operatorId); properties.put(start_date, computation.startDate); properties.put(end_date, computation.endDate); properties.put(status, computation.status); properties.put(execution_platform, computation.infrastructure); if (computation.exception != null && computation.exception.length() > 0) properties.put(error, computation.exception); List scopes = new ArrayList(); scopes.add(config.getGcubeScope()); ws.createGcubeItem(computation.id, computation.operatorDescription, scopes, computation.user, itemType, properties, computationsFolderItem.getId()); AnalysisLogger.getLogger().debug("Dataspace->finished uploading computation data"); } public void deleteRunningComputationData() throws Exception { AnalysisLogger.getLogger().debug("Dataspace->deleting computation item"); AnalysisLogger.getLogger().debug("Dataspace->connecting to Workspace"); HomeManagerFactory factory = HomeLibrary.getHomeManagerFactory(); HomeManager manager = factory.getHomeManager(); AnalysisLogger.getLogger().debug("Dataspace->getting user"); User user = manager.createUser(computation.user); Home home = manager.getHome(user); AnalysisLogger.getLogger().debug("Dataspace->getting root folder"); Workspace ws = home.getWorkspace(); WorkspaceFolder root = ws.getRoot(); WorkspaceFolder dataminerFolderWS = (WorkspaceFolder) root.find(dataminerFolder); WorkspaceItem computationsFolderItem = dataminerFolderWS.find(computationsFolder); AnalysisLogger.getLogger().debug("Dataspace->removing computation data"); ((WorkspaceFolder) computationsFolderItem).find(computation.id).remove(); AnalysisLogger.getLogger().debug("Dataspace->finished removing computation data"); } }