From 1a2126742f5c3022ac9b03c3a03f519a269952ed Mon Sep 17 00:00:00 2001 From: Gianpaolo Coro Date: Fri, 30 Sep 2016 14:38:23 +0000 Subject: [PATCH] git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/EcologicalEngineGeoSpatialExtension@132212 82a268e6-3cf1-43bd-a215-b396298e98cf --- pom.xml | 22 +- .../geo/connectors/asc/AscRasterReader.java | 3 +- .../geo/connectors/geotiff/GeoTiff.java | 2 +- .../dataanalysis/geo/connectors/wcs/WCS.java | 2 +- .../geo/utils/transfer/CopyStreamHandler.java | 45 ++++ .../HttpFileSystemConfBuilderPatched.java | 44 ++++ .../geo/utils/transfer/TransferUtil.java | 243 ++++++++++++++++++ .../geo/utils/transfer/Utils.java | 60 +++++ .../utils/transfer/VFileSystemManager.java | 36 +++ 9 files changed, 447 insertions(+), 10 deletions(-) create mode 100644 src/main/java/org/gcube/dataanalysis/geo/utils/transfer/CopyStreamHandler.java create mode 100644 src/main/java/org/gcube/dataanalysis/geo/utils/transfer/HttpFileSystemConfBuilderPatched.java create mode 100644 src/main/java/org/gcube/dataanalysis/geo/utils/transfer/TransferUtil.java create mode 100644 src/main/java/org/gcube/dataanalysis/geo/utils/transfer/Utils.java create mode 100644 src/main/java/org/gcube/dataanalysis/geo/utils/transfer/VFileSystemManager.java diff --git a/pom.xml b/pom.xml index 072dadd..c3f8cb3 100644 --- a/pom.xml +++ b/pom.xml @@ -110,10 +110,20 @@ - + + + commons-net + commons-net + 3.1 + + + org.apache.commons + commons-vfs2 + 2.1 rapidminer-custom @@ -150,8 +160,8 @@ - - + + maven-compiler-plugin 3.1 @@ -168,9 +178,9 @@ true - - - + + + org.apache.maven.plugins maven-assembly-plugin diff --git a/src/main/java/org/gcube/dataanalysis/geo/connectors/asc/AscRasterReader.java b/src/main/java/org/gcube/dataanalysis/geo/connectors/asc/AscRasterReader.java index 5db3c41..ad9e9e9 100644 --- a/src/main/java/org/gcube/dataanalysis/geo/connectors/asc/AscRasterReader.java +++ b/src/main/java/org/gcube/dataanalysis/geo/connectors/asc/AscRasterReader.java @@ -6,13 +6,12 @@ import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; -import java.net.URL; import java.net.URLConnection; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; -import org.gcube.data.transfer.common.TransferUtil; +import org.gcube.dataanalysis.geo.utils.transfer.TransferUtil; /** * A class which reads an ESRI ASCII raster file into a Raster diff --git a/src/main/java/org/gcube/dataanalysis/geo/connectors/geotiff/GeoTiff.java b/src/main/java/org/gcube/dataanalysis/geo/connectors/geotiff/GeoTiff.java index d2cc4f4..664b7b6 100644 --- a/src/main/java/org/gcube/dataanalysis/geo/connectors/geotiff/GeoTiff.java +++ b/src/main/java/org/gcube/dataanalysis/geo/connectors/geotiff/GeoTiff.java @@ -7,12 +7,12 @@ import java.util.UUID; import org.gcube.contentmanagement.graphtools.utils.HttpRequest; import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; -import org.gcube.data.transfer.common.TransferUtil; import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; import org.gcube.dataanalysis.ecoengine.utils.Tuple; import org.gcube.dataanalysis.geo.connectors.asc.ASC; import org.gcube.dataanalysis.geo.interfaces.GISDataConnector; import org.gcube.dataanalysis.geo.utils.GdalConverter; +import org.gcube.dataanalysis.geo.utils.transfer.TransferUtil; public class GeoTiff implements GISDataConnector { diff --git a/src/main/java/org/gcube/dataanalysis/geo/connectors/wcs/WCS.java b/src/main/java/org/gcube/dataanalysis/geo/connectors/wcs/WCS.java index e78cc23..1e566ab 100644 --- a/src/main/java/org/gcube/dataanalysis/geo/connectors/wcs/WCS.java +++ b/src/main/java/org/gcube/dataanalysis/geo/connectors/wcs/WCS.java @@ -8,7 +8,6 @@ import java.util.UUID; import org.gcube.contentmanagement.graphtools.utils.HttpRequest; import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; -import org.gcube.data.transfer.common.TransferUtil; import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; import org.gcube.dataanalysis.ecoengine.utils.Tuple; import org.gcube.dataanalysis.geo.connectors.asc.ASC; @@ -17,6 +16,7 @@ import org.gcube.dataanalysis.geo.meta.OGCFormatter; import org.gcube.dataanalysis.geo.utils.GdalConverter; import org.gcube.dataanalysis.geo.utils.GeoTiffMetadata; import org.gcube.dataanalysis.geo.utils.VectorOperations; +import org.gcube.dataanalysis.geo.utils.transfer.TransferUtil; public class WCS implements GISDataConnector { // WCS examples diff --git a/src/main/java/org/gcube/dataanalysis/geo/utils/transfer/CopyStreamHandler.java b/src/main/java/org/gcube/dataanalysis/geo/utils/transfer/CopyStreamHandler.java new file mode 100644 index 0000000..2883a64 --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/geo/utils/transfer/CopyStreamHandler.java @@ -0,0 +1,45 @@ +package org.gcube.dataanalysis.geo.utils.transfer; + +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.commons.net.io.CopyStreamException; +import org.apache.commons.net.io.CopyStreamListener; +import org.apache.commons.net.io.Util; + +/** + * + * @author Andrea + * + */ +public class CopyStreamHandler implements Runnable { + private InputStream in; + private OutputStream out; + private long streamSize; + CopyStreamListener listener; + + CopyStreamHandler(InputStream in, OutputStream out, long streamSize, + CopyStreamListener listener) { + this.in = in; + this.out = out; + this.streamSize = streamSize; + this.listener = listener; + + } + + public void run() { + try { + Util.copyStream(in, out, TransferUtil.bufferSize, + streamSize, listener); + } catch (CopyStreamException e) { + e.printStackTrace(); + } finally { + Util.closeQuietly(in); + Util.closeQuietly(out); + + } + + } + + +} \ No newline at end of file diff --git a/src/main/java/org/gcube/dataanalysis/geo/utils/transfer/HttpFileSystemConfBuilderPatched.java b/src/main/java/org/gcube/dataanalysis/geo/utils/transfer/HttpFileSystemConfBuilderPatched.java new file mode 100644 index 0000000..499ab1b --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/geo/utils/transfer/HttpFileSystemConfBuilderPatched.java @@ -0,0 +1,44 @@ +package org.gcube.dataanalysis.geo.utils.transfer; + + +import org.apache.commons.vfs2.FileSystemOptions; +import org.apache.commons.vfs2.provider.http.HttpFileSystemConfigBuilder; + + +/** + * + * Patched version declaring timeout + * @author Andrea + * + */ +public final class HttpFileSystemConfBuilderPatched extends HttpFileSystemConfigBuilder { + + private static final HttpFileSystemConfBuilderPatched BUILDER = new HttpFileSystemConfBuilderPatched(); + + protected HttpFileSystemConfBuilderPatched(String prefix) { + super("http."); + } + + protected HttpFileSystemConfBuilderPatched() { + super("http."); + } + + public static HttpFileSystemConfBuilderPatched getInstance() + { + return BUILDER; + } + + + + public void setTimeout(FileSystemOptions opts, int timeout) + { + setParam(opts, "http.socket.timeout", timeout); + + } + + public int getTimeout(FileSystemOptions opts){ + return getInteger(opts, "http.socket.timeout"); + } + + +} diff --git a/src/main/java/org/gcube/dataanalysis/geo/utils/transfer/TransferUtil.java b/src/main/java/org/gcube/dataanalysis/geo/utils/transfer/TransferUtil.java new file mode 100644 index 0000000..a5acefa --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/geo/utils/transfer/TransferUtil.java @@ -0,0 +1,243 @@ +package org.gcube.dataanalysis.geo.utils.transfer; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.net.io.CopyStreamEvent; +import org.apache.commons.net.io.CopyStreamListener; +import org.apache.commons.net.io.Util; +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.FileSystemException; +import org.apache.commons.vfs2.VFS; + +/** + * + * @author Andrea Manzi + * + */ +public class TransferUtil { + + public static int bufferSize = Util.DEFAULT_COPY_BUFFER_SIZE * 1000; + + private long bytesTransferredForCurrent; + + static int connectiontimeout = 100000000; + static int transferTimeout = 100000000; + + VFileSystemManager localFSManager = null; + + private ExecutorService pool; + + public TransferUtil() throws FileSystemException { + + localFSManager = new VFileSystemManager("/"); + + pool = Executors.newFixedThreadPool(1); + + } + + public int getConnectiontimeout() { + return connectiontimeout; + } + + public void setConnectiontimeout(int connectiontimeout) { + TransferUtil.connectiontimeout = connectiontimeout; + } + + public int getTransferTimeout() { + return transferTimeout; + } + + public void setTransferTimeout(int transferTimeout) { + TransferUtil.transferTimeout = transferTimeout; + } + + /** + * + * @param uri + * @param connectionTimeout + * @return + * @throws FileSystemException + */ + public static InputStream getInputStream(URI uri, int connectionTimeout) + throws FileSystemException { + connectiontimeout = connectionTimeout; + FileObject inputFile = TransferUtil.prepareFileObject(uri.toString()); + + return inputFile.getContent().getInputStream(); + } + + /** + * + * @param uri + * @param outfile + * @throws Exception + */ + public synchronized long transfer(URI uri, String outfile) throws Exception { + + bytesTransferredForCurrent = 0; + + FileObject inputFile = TransferUtil.prepareFileObject(uri.toString()); + + InputStream sourceFileIn = inputFile.getContent().getInputStream(); + + // getting outfile info + + FileOutputStream out = new FileOutputStream(new File(outfile)); + + // parameters + boolean terminate = false; + + bytesTransferredForCurrent = 0; + CopyStreamHandler handler = new CopyStreamHandler(sourceFileIn, out, + inputFile.getContent().getSize(), listener); + + try { + pool.execute(handler); + pool.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + pool.shutdownNow(); + throw new Exception("Error while executing the transfer",e); + } + + // waiting for transfer to complete + terminate = pool.awaitTermination(transferTimeout, + TimeUnit.MILLISECONDS); + sourceFileIn.close(); + out.close(); + return bytesTransferredForCurrent; + } + + /** + * + * @param uri + * @param outfile + * @throws Exception + */ + public synchronized void performTransfer(URI uri, String outfile) throws Exception { + + bytesTransferredForCurrent = 0; + // parameters + boolean terminate = false; + + FileObject inputFile = TransferUtil.prepareFileObject(uri.toString()); + + InputStream sourceFileIn = inputFile.getContent().getInputStream(); + + // getting outfile info + + FileOutputStream out = new FileOutputStream(new File(outfile)); + + CopyStreamHandler handler = new CopyStreamHandler(sourceFileIn, out, + inputFile.getContent().getSize(), listener); + + try { + pool.execute(handler); + pool.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + pool.shutdownNow(); + throw new Exception("Error while executing the transfer",e); + } + + // waiting for transfer to complete + terminate = pool.awaitTermination(transferTimeout, + TimeUnit.MILLISECONDS); + sourceFileIn.close(); + out.close(); + } + + /** + * + * @param uri + * @param outPath + * @throws Exception + */ + public synchronized void performTransferToFolder(URI uri, String outPath) + throws Exception { + + FileObject inputFile = TransferUtil.prepareFileObject(uri.toString()); + + InputStream sourceFileIn = inputFile.getContent().getInputStream(); + + // getting outfile info + String outputFile; + outputFile = inputFile.getName().getBaseName(); + + if (outPath.endsWith("/")) + outPath = outPath.substring(0, outPath.length() - 1); + + String relativeOutputFile = outPath + File.separator + outputFile; + + FileObject absoluteOutputFile = localFSManager + .resolveFile(relativeOutputFile); + + FileObject absolutePath = localFSManager.resolveFile(outPath); + + absolutePath.createFolder(); + + // parameters + boolean terminate = false; + + OutputStream destinationFileOut = absoluteOutputFile.getContent() + .getOutputStream(); + + bytesTransferredForCurrent = 0; + CopyStreamHandler handler = new CopyStreamHandler(sourceFileIn, + destinationFileOut, inputFile.getContent().getSize(), listener); + + try { + pool.execute(handler); + pool.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + pool.shutdownNow(); + absoluteOutputFile.delete(); + throw new Exception("Error while executing the transfer",e); + } + + // waiting for transfer to complete + terminate = pool.awaitTermination(transferTimeout, + TimeUnit.MILLISECONDS); + sourceFileIn.close(); + destinationFileOut.close(); + + } + + /** + * + * @param URI + * @return + * @throws FileSystemException + */ + private static FileObject prepareFileObject(String URI) + throws FileSystemException { + return VFS.getManager().resolveFile(URI, Utils.createDefaultOptions(URI,connectiontimeout)); + } + + + + CopyStreamListener listener = new CopyStreamListener() { + @Override + public void bytesTransferred(long arg0, int arg1, long arg2) { + // only for the current object + bytesTransferredForCurrent = bytesTransferredForCurrent + arg1; + + } + + @Override + public void bytesTransferred(CopyStreamEvent arg0) { + } + }; + + + + +} diff --git a/src/main/java/org/gcube/dataanalysis/geo/utils/transfer/Utils.java b/src/main/java/org/gcube/dataanalysis/geo/utils/transfer/Utils.java new file mode 100644 index 0000000..aa5648d --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/geo/utils/transfer/Utils.java @@ -0,0 +1,60 @@ +package org.gcube.dataanalysis.geo.utils.transfer; + +import org.apache.commons.vfs2.FileSystemException; +import org.apache.commons.vfs2.FileSystemOptions; +import org.apache.commons.vfs2.provider.ftp.FtpFileSystemConfigBuilder; +import org.apache.commons.vfs2.provider.sftp.SftpFileSystemConfigBuilder; + + + +public class Utils { + + /** + * + * @param URI + * @return + * @throws FileSystemException + */ + protected static FileSystemOptions createDefaultOptions(String URI,int connectiontimeout) { + // Create SFTP options + FileSystemOptions opts = new FileSystemOptions(); + + // check the URL type + if (URI.startsWith("ftp://")) { + + // Root directory set to user home + FtpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(opts, + true); + + // Timeout is count by Milliseconds + FtpFileSystemConfigBuilder.getInstance().setSoTimeout(opts, + connectiontimeout); + + FtpFileSystemConfigBuilder.getInstance().setDataTimeout(opts, + connectiontimeout); + return opts; + } else if (URI.startsWith("sftp://")) { + // Root directory set to user home + SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(opts, + true); + + // Timeout is count by Milliseconds + SftpFileSystemConfigBuilder.getInstance().setTimeout(opts, + connectiontimeout); + return opts; + } else if (URI.startsWith("s3://")) { + + // com.scoyo.commons.vfs.S3Util.initS3Provider(ServiceContext.getContext().getAwsKeyID(),ServiceContext.getContext().getAwsKey()); + } else if (URI.startsWith("http://") || URI.startsWith("https://")) { + // Root directory set to user home + HttpFileSystemConfBuilderPatched.getInstance().setTimeout(opts, + connectiontimeout); + HttpFileSystemConfBuilderPatched.getInstance().setFollowRedirect(opts,true); + return opts; + } else if (URI.startsWith("smp://")) { + return opts; + } + return null; + } + +} \ No newline at end of file diff --git a/src/main/java/org/gcube/dataanalysis/geo/utils/transfer/VFileSystemManager.java b/src/main/java/org/gcube/dataanalysis/geo/utils/transfer/VFileSystemManager.java new file mode 100644 index 0000000..7d86c9a --- /dev/null +++ b/src/main/java/org/gcube/dataanalysis/geo/utils/transfer/VFileSystemManager.java @@ -0,0 +1,36 @@ +package org.gcube.dataanalysis.geo.utils.transfer; + +import java.io.File; + +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.FileSystemException; +import org.apache.commons.vfs2.FileSystemManager; +import org.apache.commons.vfs2.VFS; + +/** + * + * @author andrea + * + */ + +public class VFileSystemManager { + + File vfsRoot = null; + + FileSystemManager manager = null; + + public VFileSystemManager(String vfsRoot) throws FileSystemException { + this.vfsRoot = new File( vfsRoot); + this.manager = VFS.getManager(); + + } + + public FileObject resolveFile(String filePath) throws FileSystemException{ + return this.manager.resolveFile(this.vfsRoot,filePath); + } + + public FileObject resolveFolder(String filePath) throws FileSystemException{ + return this.manager.resolveFile(this.vfsRoot,filePath); + } + +}