From 5c1bd422e8f0c36f44b1e3f1edcdf000a5ec019b Mon Sep 17 00:00:00 2001 From: Gianpaolo Coro Date: Wed, 23 Sep 2015 10:04:25 +0000 Subject: [PATCH] git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/EcologicalEngineSmartExecutor@118991 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../executor/nodes/algorithms/ICCATVPA.java | 2 +- .../executor/util/DataTransferer.java | 167 ++++++++++-------- .../executor/util/IfraRetrieval.java | 30 ---- .../executor/util/InfraRetrieval.java | 58 ++++++ 4 files changed, 155 insertions(+), 102 deletions(-) delete mode 100644 src/main/java/org/gcube/dataanalysis/executor/util/IfraRetrieval.java create mode 100644 src/main/java/org/gcube/dataanalysis/executor/util/InfraRetrieval.java diff --git a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/ICCATVPA.java b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/ICCATVPA.java index 466e24c..2fde988 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/ICCATVPA.java +++ b/src/main/java/org/gcube/dataanalysis/executor/nodes/algorithms/ICCATVPA.java @@ -24,7 +24,7 @@ import org.gcube.dataanalysis.executor.util.StorageUtils; public class ICCATVPA extends ActorNode { - + @Override public ALG_PROPS[] getProperties() { diff --git a/src/main/java/org/gcube/dataanalysis/executor/util/DataTransferer.java b/src/main/java/org/gcube/dataanalysis/executor/util/DataTransferer.java index c26f6c4..212caab 100644 --- a/src/main/java/org/gcube/dataanalysis/executor/util/DataTransferer.java +++ b/src/main/java/org/gcube/dataanalysis/executor/util/DataTransferer.java @@ -4,107 +4,132 @@ import static org.gcube.datatransfer.agent.library.proxies.Proxies.transferAgent import java.io.File; import java.net.URI; -import java.net.URL; import java.util.ArrayList; import java.util.concurrent.TimeUnit; import org.gcube.common.scope.api.ScopeProvider; import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; import org.gcube.datatransfer.agent.library.AgentLibrary; +import org.gcube.datatransfer.agent.library.exceptions.MonitorTransferException; import org.gcube.datatransfer.common.agent.Types.storageType; import org.gcube.datatransfer.common.options.TransferOptions; import org.gcube.datatransfer.common.outcome.FileTransferOutcome; +import org.gcube.datatransfer.common.outcome.TransferStatus; public class DataTransferer { - public static void main(String[] args) throws Exception{ - - ScopeProvider.instance.set("/d4science.research-infrastructures.eu/gCubeApps"); + public static void main(String[] args) throws Exception { + String scope = "/d4science.research-infrastructures.eu/gCubeApps"; + ScopeProvider.instance.set(scope); String transferGHN = "dewn04.madgik.di.uoa.gr"; - int transferPort = 8080; + int transferPort = 8080; AgentLibrary library = transferAgent().at(transferGHN, transferPort).build(); - + ArrayList input = new ArrayList(); /* - File localfile = new File("C:/Users/coro/Dropbox/Public/wind1.tif"); - String file = "wind1.tif"; - String localfolder = "C:/Users/coro/Dropbox/Public/"; - String storagesmpurl = StorageUtils.uploadFilesOnStorage("/gcube/devsec", "gianpaolo.coro",localfolder,file); - - System.out.println("URI from storage: "+storagesmpurl); - - String urlStorage = "http://dev.d4science.org/uri-resolver/smp?smp-uri="+storagesmpurl+"&fileName="+file; - */ -// String urlStorage = "http://dev.d4science.org/smp?smp-uri=smp://data.gcube.org/gzAv/RparhTHO4yhbF9ItALcRlSJRIiBGmbP5+HKCzc=&fileName=wind1.tif"; + * File localfile = new File("C:/Users/coro/Dropbox/Public/wind1.tif"); String file = "wind1.tif"; String localfolder = "C:/Users/coro/Dropbox/Public/"; String storagesmpurl = StorageUtils.uploadFilesOnStorage("/gcube/devsec", "gianpaolo.coro",localfolder,file); + * + * System.out.println("URI from storage: "+storagesmpurl); + * + * String urlStorage = "http://dev.d4science.org/uri-resolver/smp?smp-uri="+storagesmpurl+"&fileName="+file; + */ + // String urlStorage = "http://dev.d4science.org/smp?smp-uri=smp://data.gcube.org/gzAv/RparhTHO4yhbF9ItALcRlSJRIiBGmbP5+HKCzc=&fileName=wind1.tif"; String urlStorage = "smp://data.gcube.org/gzAv/RparhTHO4yhbF9ItALcRlSJRIiBGmbP5+HKCzc="; - - System.out.println("URL for storage: "+urlStorage); - -// URI uri = new URI("http://dl.dropboxusercontent.com/u/12809149/wind1.tif"); - URI uri = new URI(urlStorage); - - //http://dev.d4science.org/uri-resolver/smp?smp-uri=smp://data.gcube.org/gzAv/RparhTHO4yhbF9ItALcRlSJRIiBGmbP5+HKCzc=&fileName=wind1.tif&contentType=tiff - -// URI uri = new URI(storageurl); //localfile.toURI(); -// URI uri = new URI("file:///C:Users/coro/Dropbox/Public/wind1.tif"); + + System.out.println("URL for storage: " + urlStorage); + + URI uri = new URI("http://dl.dropboxusercontent.com/u/12809149/wind1.tif"); + // URI uri = new URI(urlStorage); + + // http://dev.d4science.org/uri-resolver/smp?smp-uri=smp://data.gcube.org/gzAv/RparhTHO4yhbF9ItALcRlSJRIiBGmbP5+HKCzc=&fileName=wind1.tif&contentType=tiff + + // URI uri = new URI(storageurl); //localfile.toURI(); + // URI uri = new URI("file:///C:Users/coro/Dropbox/Public/wind1.tif"); input.add(uri); - + String outPath = "/tmp"; - - TransferOptions options = new TransferOptions(); - - options = new TransferOptions(); - options.setOverwriteFile(true); - options.setType(storageType.LocalGHN); - options.setUnzipFile(false); - - ArrayList outcomes = library.startTransferSync(input, outPath, options); - for (FileTransferOutcome outcome:outcomes){ - System.out.println(outcome); - System.out.println(outcome.fileName()); - System.out.println(outcome.isSuccess()); - System.out.println(outcome.getTotal_size()); - System.out.println(outcome.getTransferredBytes()); - System.out.println(outcome.getTransferTime()); - } + + transferFileToService(scope, "gianpaolo.coro", transferGHN, transferPort, "C:\\Users\\coro\\Dropbox\\Public\\3_Aquamaps.jpg", outPath); } - - //returns the number of transferred bytes - public static Long transferFileToService(String scope, String username, String service,int port, String fileAbsolutePath, String remoteFolder) throws Exception{ - AnalysisLogger.getLogger().debug("Transferring file "+fileAbsolutePath+" to "+service+":"+port); + + // returns the number of transferred bytes + public static boolean transferFileToService(String scope, String username, String service, int port, String fileAbsolutePath, String remoteFolder) throws Exception { + AnalysisLogger.getLogger().debug("Transferring file " + fileAbsolutePath + " to " + service + ":" + port); ScopeProvider.instance.set(scope); + AgentLibrary library = transferAgent().at(service, port).build(); ArrayList input = new ArrayList(); - String localfolder = new File(fileAbsolutePath).getParent(); - String file = new File(fileAbsolutePath).getName(); - AnalysisLogger.getLogger().debug("Uploading file "+file+" onto storage"); - String storagesmpurl = StorageUtils.uploadFilesOnStorage(scope, username,localfolder,file); - AnalysisLogger.getLogger().debug("SMP url generated: "+storagesmpurl); + File localFile = new File(fileAbsolutePath); + if (!localFile.exists()) + throw new Exception("Local file does not exist: " + localFile); + + String localfolder = localFile.getParent(); + String file = localFile.getName(); + AnalysisLogger.getLogger().debug("Uploading file " + file + " onto storage"); + String storagesmpurl = StorageUtils.uploadFilesOnStorage(scope, username, localfolder, file); + //urls for testing + //storagesmpurl = "http://dev.d4science.org/smp?smp-uri="+storagesmpurl+"&fileName="+file; + // String storagesmpurl = "smp://data.gcube.org/sHtVhK4clGtbcWCliQud+5b4PfGx5BW+GmbP5+HKCzc="; + // String storagesmpurl = "http://goo.gl/r6ggMA"; + // String storagesmpurl = "http://dl.dropboxusercontent.com/u/12809149/3_Aquamaps.jpg"; + AnalysisLogger.getLogger().debug("SMP url generated: " + storagesmpurl); URI uri = new URI(storagesmpurl); input.add(uri); - - TransferOptions options = new TransferOptions(); - + + TransferOptions options = new TransferOptions(); + options = new TransferOptions(); - options.setOverwriteFile(true); - options.setType(storageType.LocalGHN); + options.setOverwriteFile(false); + options.setType(storageType.DataStorage); options.setUnzipFile(false); options.setTransferTimeout(3, TimeUnit.HOURS); AnalysisLogger.getLogger().debug("Transferring..."); - ArrayList outcomes = library.startTransferSync(input, remoteFolder, options); + + //old code for sync transfer +// ArrayList outcomes = library.startTransferSync(input, remoteFolder, options); + + + ArrayList outputURI = new ArrayList(); + outputURI.add(new URI("file://"+remoteFolder+""+file)); + + AnalysisLogger.getLogger().debug("Remote file name will be: " + outputURI.get(0)); + + String transferId = library.startTransfer(input, outputURI, options); + + TransferStatus transferStatus = null; + + do { + try { + + Thread.sleep(1000); + transferStatus = TransferStatus.valueOf(library.monitorTransfer(transferId)); + + } catch (MonitorTransferException e) { + e.printStackTrace(); + } + + } while (!transferStatus.hasCompleted()); + + ArrayList outcomes = library.getTransferOutcomes(transferId, FileTransferOutcome.class); + AnalysisLogger.getLogger().debug("Transferring complete"); - Long bytes = 0L; - for (FileTransferOutcome outcome:outcomes){ - AnalysisLogger.getLogger().debug("Outcome "+outcome); - AnalysisLogger.getLogger().debug("Transferred file name "+outcome.fileName()); - AnalysisLogger.getLogger().debug("Transferring success "+outcome.isSuccess()); - AnalysisLogger.getLogger().debug("Transferred bytes "+outcome.getTotal_size()); - AnalysisLogger.getLogger().debug("Transfer time "+outcome.getTransferTime()); - bytes = outcome.getTotal_size(); + boolean success = false; + String outcomeString = ""; + for (FileTransferOutcome outcome : outcomes) { + AnalysisLogger.getLogger().debug("Outcome " + outcome); + outcomeString = outcome.toString(); + AnalysisLogger.getLogger().debug("Transferred file name " + outcome.fileName()); + AnalysisLogger.getLogger().debug("Transferring success " + outcome.isSuccess()); + AnalysisLogger.getLogger().debug("Transferred bytes " + outcome.getTotal_size()); + AnalysisLogger.getLogger().debug("Transfer time " + outcome.getTransferTime()); + success = outcome.isSuccess(); } - return bytes; - } - - + + if (!success) + throw new Exception("No Bytes were transferred to the Thredds server: "+outcomeString); + + return success; + } + } diff --git a/src/main/java/org/gcube/dataanalysis/executor/util/IfraRetrieval.java b/src/main/java/org/gcube/dataanalysis/executor/util/IfraRetrieval.java deleted file mode 100644 index 738f875..0000000 --- a/src/main/java/org/gcube/dataanalysis/executor/util/IfraRetrieval.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.gcube.dataanalysis.executor.util; - -import java.util.ArrayList; -import java.util.List; - -import org.gcube.common.resources.gcore.ServiceEndpoint; -import org.gcube.common.scope.api.ScopeProvider; -import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; -import org.gcube.resources.discovery.client.api.DiscoveryClient; -import org.gcube.resources.discovery.client.queries.api.SimpleQuery; -import org.gcube.resources.discovery.icclient.ICFactory; - -public class IfraRetrieval { - - public static List retrieveAddresses(String Category, String scope, String exclude) { - if (scope == null || scope.length() == 0) - return new ArrayList(); - -// AnalysisLogger.getLogger().debug("RetrieveAddressesFromInfra->Setting Scope to " + scope+" and executing query"); - ScopeProvider.instance.set(scope); - - SimpleQuery query = ICFactory.queryFor(ServiceEndpoint.class); - query.addCondition("$resource/Profile/Category/text() eq '" + Category + "'").addCondition("$resource/Profile[Name[not(contains(., '" + exclude + "'))]]").setResult("$resource/Profile/AccessPoint/Interface/Endpoint/text()"); - DiscoveryClient client = ICFactory.client(); - List addresses = client.submit(query); -// AnalysisLogger.getLogger().debug("RetrieveAddressesFromInfra->Query to IS finished"); - return addresses; - } - -} diff --git a/src/main/java/org/gcube/dataanalysis/executor/util/InfraRetrieval.java b/src/main/java/org/gcube/dataanalysis/executor/util/InfraRetrieval.java new file mode 100644 index 0000000..f92afb5 --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/executor/util/InfraRetrieval.java @@ -0,0 +1,58 @@ +package org.gcube.dataanalysis.executor.util; + +import java.util.ArrayList; +import java.util.List; + +import org.gcube.common.resources.gcore.GCoreEndpoint; +import org.gcube.common.resources.gcore.ServiceEndpoint; +import org.gcube.common.scope.api.ScopeProvider; +import org.gcube.resources.discovery.client.api.DiscoveryClient; +import org.gcube.resources.discovery.client.queries.api.SimpleQuery; +import org.gcube.resources.discovery.icclient.ICFactory; + +public class InfraRetrieval { + + public static List retrieveAddresses(String Category, String scope, String exclude) { + if (scope == null || scope.length() == 0) + return new ArrayList(); + +// AnalysisLogger.getLogger().debug("RetrieveAddressesFromInfra->Setting Scope to " + scope+" and executing query"); + ScopeProvider.instance.set(scope); + + SimpleQuery query = ICFactory.queryFor(ServiceEndpoint.class); + query.addCondition("$resource/Profile/Category/text() eq '" + Category + "'").addCondition("$resource/Profile[Name[not(contains(., '" + exclude + "'))]]").setResult("$resource/Profile/AccessPoint/Interface/Endpoint/text()"); + DiscoveryClient client = ICFactory.client(); + List addresses = client.submit(query); +// AnalysisLogger.getLogger().debug("RetrieveAddressesFromInfra->Query to IS finished"); + return addresses; + } + + public static List retrieveServiceAddress(String Category, String Name, String scope, String exclude) { + if (scope == null || scope.length() == 0) + return new ArrayList(); + + ScopeProvider.instance.set(scope); + + SimpleQuery query = ICFactory.queryFor(ServiceEndpoint.class); + query.addCondition("$resource/Profile/Category/text() eq '" + Category + "'").addCondition("$resource/Profile/Name/text() eq '" + Name+ "'").addCondition("$resource/Profile[Name[not(contains(., '" + exclude + "'))]]").setResult("$resource/Profile/AccessPoint/Interface/Endpoint/text()"); + DiscoveryClient client = ICFactory.client(); + List addresses = client.submit(query); + + return addresses; + } + + public static List retrieveService(String service, String scope) { + if (scope == null || scope.length() == 0) + return new ArrayList(); + + ScopeProvider.instance.set(scope); + + SimpleQuery query = ICFactory.queryFor(GCoreEndpoint.class); + query.addCondition("$resource/Profile/ServiceName/text() eq '"+service+"'").setResult("$resource/Profile/AccessPoint/RunningInstanceInterfaces/Endpoint/text()"); + DiscoveryClient client = ICFactory.client(); + List addresses = client.submit(query); + + return addresses; + } + +}