Gianpaolo Coro 2016-09-30 14:38:23 +00:00
parent 456075e618
commit 1a2126742f
9 changed files with 447 additions and 10 deletions

22
pom.xml
View File

@ -110,10 +110,20 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency> <!-- <dependency>
<groupId>org.gcube.data.transfer</groupId> <groupId>org.gcube.data.transfer</groupId>
<artifactId>transfer-library</artifactId> <artifactId>transfer-library</artifactId>
<version>(0.0.1-SNAPSHOT,2.0.0-SNAPSHOT)</version> <version>(0.0.1-SNAPSHOT,2.0.0-SNAPSHOT)</version>
</dependency> -->
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-vfs2</artifactId>
<version>2.1</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>rapidminer-custom</groupId> <groupId>rapidminer-custom</groupId>
@ -150,8 +160,8 @@
</repositories> </repositories>
<build> <build>
<plugins> <plugins>
<plugin> <plugin>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version> <version>3.1</version>
@ -168,9 +178,9 @@
<skipTests>true</skipTests> <skipTests>true</skipTests>
</configuration> </configuration>
</plugin> </plugin>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId> <artifactId>maven-assembly-plugin</artifactId>

View File

@ -6,13 +6,12 @@ import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection; import java.net.URLConnection;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.data.transfer.common.TransferUtil; import org.gcube.dataanalysis.geo.utils.transfer.TransferUtil;
/** /**
* A class which reads an ESRI ASCII raster file into a Raster * A class which reads an ESRI ASCII raster file into a Raster

View File

@ -7,12 +7,12 @@ import java.util.UUID;
import org.gcube.contentmanagement.graphtools.utils.HttpRequest; import org.gcube.contentmanagement.graphtools.utils.HttpRequest;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.data.transfer.common.TransferUtil;
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
import org.gcube.dataanalysis.ecoengine.utils.Tuple; import org.gcube.dataanalysis.ecoengine.utils.Tuple;
import org.gcube.dataanalysis.geo.connectors.asc.ASC; import org.gcube.dataanalysis.geo.connectors.asc.ASC;
import org.gcube.dataanalysis.geo.interfaces.GISDataConnector; import org.gcube.dataanalysis.geo.interfaces.GISDataConnector;
import org.gcube.dataanalysis.geo.utils.GdalConverter; import org.gcube.dataanalysis.geo.utils.GdalConverter;
import org.gcube.dataanalysis.geo.utils.transfer.TransferUtil;
public class GeoTiff implements GISDataConnector { public class GeoTiff implements GISDataConnector {

View File

@ -8,7 +8,6 @@ import java.util.UUID;
import org.gcube.contentmanagement.graphtools.utils.HttpRequest; import org.gcube.contentmanagement.graphtools.utils.HttpRequest;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger; import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.data.transfer.common.TransferUtil;
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
import org.gcube.dataanalysis.ecoengine.utils.Tuple; import org.gcube.dataanalysis.ecoengine.utils.Tuple;
import org.gcube.dataanalysis.geo.connectors.asc.ASC; import org.gcube.dataanalysis.geo.connectors.asc.ASC;
@ -17,6 +16,7 @@ import org.gcube.dataanalysis.geo.meta.OGCFormatter;
import org.gcube.dataanalysis.geo.utils.GdalConverter; import org.gcube.dataanalysis.geo.utils.GdalConverter;
import org.gcube.dataanalysis.geo.utils.GeoTiffMetadata; import org.gcube.dataanalysis.geo.utils.GeoTiffMetadata;
import org.gcube.dataanalysis.geo.utils.VectorOperations; import org.gcube.dataanalysis.geo.utils.VectorOperations;
import org.gcube.dataanalysis.geo.utils.transfer.TransferUtil;
public class WCS implements GISDataConnector { public class WCS implements GISDataConnector {
// WCS examples // WCS examples

View File

@ -0,0 +1,45 @@
package org.gcube.dataanalysis.geo.utils.transfer;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.commons.net.io.CopyStreamException;
import org.apache.commons.net.io.CopyStreamListener;
import org.apache.commons.net.io.Util;
/**
*
* @author Andrea
*
*/
public class CopyStreamHandler implements Runnable {
private InputStream in;
private OutputStream out;
private long streamSize;
CopyStreamListener listener;
CopyStreamHandler(InputStream in, OutputStream out, long streamSize,
CopyStreamListener listener) {
this.in = in;
this.out = out;
this.streamSize = streamSize;
this.listener = listener;
}
public void run() {
try {
Util.copyStream(in, out, TransferUtil.bufferSize,
streamSize, listener);
} catch (CopyStreamException e) {
e.printStackTrace();
} finally {
Util.closeQuietly(in);
Util.closeQuietly(out);
}
}
}

View File

@ -0,0 +1,44 @@
package org.gcube.dataanalysis.geo.utils.transfer;
import org.apache.commons.vfs2.FileSystemOptions;
import org.apache.commons.vfs2.provider.http.HttpFileSystemConfigBuilder;
/**
*
* Patched version declaring timeout
* @author Andrea
*
*/
public final class HttpFileSystemConfBuilderPatched extends HttpFileSystemConfigBuilder {
private static final HttpFileSystemConfBuilderPatched BUILDER = new HttpFileSystemConfBuilderPatched();
protected HttpFileSystemConfBuilderPatched(String prefix) {
super("http.");
}
protected HttpFileSystemConfBuilderPatched() {
super("http.");
}
public static HttpFileSystemConfBuilderPatched getInstance()
{
return BUILDER;
}
public void setTimeout(FileSystemOptions opts, int timeout)
{
setParam(opts, "http.socket.timeout", timeout);
}
public int getTimeout(FileSystemOptions opts){
return getInteger(opts, "http.socket.timeout");
}
}

View File

@ -0,0 +1,243 @@
package org.gcube.dataanalysis.geo.utils.transfer;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.net.io.CopyStreamEvent;
import org.apache.commons.net.io.CopyStreamListener;
import org.apache.commons.net.io.Util;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.VFS;
/**
*
* @author Andrea Manzi
*
*/
public class TransferUtil {
public static int bufferSize = Util.DEFAULT_COPY_BUFFER_SIZE * 1000;
private long bytesTransferredForCurrent;
static int connectiontimeout = 100000000;
static int transferTimeout = 100000000;
VFileSystemManager localFSManager = null;
private ExecutorService pool;
public TransferUtil() throws FileSystemException {
localFSManager = new VFileSystemManager("/");
pool = Executors.newFixedThreadPool(1);
}
public int getConnectiontimeout() {
return connectiontimeout;
}
public void setConnectiontimeout(int connectiontimeout) {
TransferUtil.connectiontimeout = connectiontimeout;
}
public int getTransferTimeout() {
return transferTimeout;
}
public void setTransferTimeout(int transferTimeout) {
TransferUtil.transferTimeout = transferTimeout;
}
/**
*
* @param uri
* @param connectionTimeout
* @return
* @throws FileSystemException
*/
public static InputStream getInputStream(URI uri, int connectionTimeout)
throws FileSystemException {
connectiontimeout = connectionTimeout;
FileObject inputFile = TransferUtil.prepareFileObject(uri.toString());
return inputFile.getContent().getInputStream();
}
/**
*
* @param uri
* @param outfile
* @throws Exception
*/
public synchronized long transfer(URI uri, String outfile) throws Exception {
bytesTransferredForCurrent = 0;
FileObject inputFile = TransferUtil.prepareFileObject(uri.toString());
InputStream sourceFileIn = inputFile.getContent().getInputStream();
// getting outfile info
FileOutputStream out = new FileOutputStream(new File(outfile));
// parameters
boolean terminate = false;
bytesTransferredForCurrent = 0;
CopyStreamHandler handler = new CopyStreamHandler(sourceFileIn, out,
inputFile.getContent().getSize(), listener);
try {
pool.execute(handler);
pool.shutdown();
} catch (Exception e) {
e.printStackTrace();
pool.shutdownNow();
throw new Exception("Error while executing the transfer",e);
}
// waiting for transfer to complete
terminate = pool.awaitTermination(transferTimeout,
TimeUnit.MILLISECONDS);
sourceFileIn.close();
out.close();
return bytesTransferredForCurrent;
}
/**
*
* @param uri
* @param outfile
* @throws Exception
*/
public synchronized void performTransfer(URI uri, String outfile) throws Exception {
bytesTransferredForCurrent = 0;
// parameters
boolean terminate = false;
FileObject inputFile = TransferUtil.prepareFileObject(uri.toString());
InputStream sourceFileIn = inputFile.getContent().getInputStream();
// getting outfile info
FileOutputStream out = new FileOutputStream(new File(outfile));
CopyStreamHandler handler = new CopyStreamHandler(sourceFileIn, out,
inputFile.getContent().getSize(), listener);
try {
pool.execute(handler);
pool.shutdown();
} catch (Exception e) {
e.printStackTrace();
pool.shutdownNow();
throw new Exception("Error while executing the transfer",e);
}
// waiting for transfer to complete
terminate = pool.awaitTermination(transferTimeout,
TimeUnit.MILLISECONDS);
sourceFileIn.close();
out.close();
}
/**
*
* @param uri
* @param outPath
* @throws Exception
*/
public synchronized void performTransferToFolder(URI uri, String outPath)
throws Exception {
FileObject inputFile = TransferUtil.prepareFileObject(uri.toString());
InputStream sourceFileIn = inputFile.getContent().getInputStream();
// getting outfile info
String outputFile;
outputFile = inputFile.getName().getBaseName();
if (outPath.endsWith("/"))
outPath = outPath.substring(0, outPath.length() - 1);
String relativeOutputFile = outPath + File.separator + outputFile;
FileObject absoluteOutputFile = localFSManager
.resolveFile(relativeOutputFile);
FileObject absolutePath = localFSManager.resolveFile(outPath);
absolutePath.createFolder();
// parameters
boolean terminate = false;
OutputStream destinationFileOut = absoluteOutputFile.getContent()
.getOutputStream();
bytesTransferredForCurrent = 0;
CopyStreamHandler handler = new CopyStreamHandler(sourceFileIn,
destinationFileOut, inputFile.getContent().getSize(), listener);
try {
pool.execute(handler);
pool.shutdown();
} catch (Exception e) {
e.printStackTrace();
pool.shutdownNow();
absoluteOutputFile.delete();
throw new Exception("Error while executing the transfer",e);
}
// waiting for transfer to complete
terminate = pool.awaitTermination(transferTimeout,
TimeUnit.MILLISECONDS);
sourceFileIn.close();
destinationFileOut.close();
}
/**
*
* @param URI
* @return
* @throws FileSystemException
*/
private static FileObject prepareFileObject(String URI)
throws FileSystemException {
return VFS.getManager().resolveFile(URI, Utils.createDefaultOptions(URI,connectiontimeout));
}
CopyStreamListener listener = new CopyStreamListener() {
@Override
public void bytesTransferred(long arg0, int arg1, long arg2) {
// only for the current object
bytesTransferredForCurrent = bytesTransferredForCurrent + arg1;
}
@Override
public void bytesTransferred(CopyStreamEvent arg0) {
}
};
}

View File

@ -0,0 +1,60 @@
package org.gcube.dataanalysis.geo.utils.transfer;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.FileSystemOptions;
import org.apache.commons.vfs2.provider.ftp.FtpFileSystemConfigBuilder;
import org.apache.commons.vfs2.provider.sftp.SftpFileSystemConfigBuilder;
public class Utils {
/**
*
* @param URI
* @return
* @throws FileSystemException
*/
protected static FileSystemOptions createDefaultOptions(String URI,int connectiontimeout) {
// Create SFTP options
FileSystemOptions opts = new FileSystemOptions();
// check the URL type
if (URI.startsWith("ftp://")) {
// Root directory set to user home
FtpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(opts,
true);
// Timeout is count by Milliseconds
FtpFileSystemConfigBuilder.getInstance().setSoTimeout(opts,
connectiontimeout);
FtpFileSystemConfigBuilder.getInstance().setDataTimeout(opts,
connectiontimeout);
return opts;
} else if (URI.startsWith("sftp://")) {
// Root directory set to user home
SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(opts,
true);
// Timeout is count by Milliseconds
SftpFileSystemConfigBuilder.getInstance().setTimeout(opts,
connectiontimeout);
return opts;
} else if (URI.startsWith("s3://")) {
// com.scoyo.commons.vfs.S3Util.initS3Provider(ServiceContext.getContext().getAwsKeyID(),ServiceContext.getContext().getAwsKey());
} else if (URI.startsWith("http://") || URI.startsWith("https://")) {
// Root directory set to user home
HttpFileSystemConfBuilderPatched.getInstance().setTimeout(opts,
connectiontimeout);
HttpFileSystemConfBuilderPatched.getInstance().setFollowRedirect(opts,true);
return opts;
} else if (URI.startsWith("smp://")) {
return opts;
}
return null;
}
}

View File

@ -0,0 +1,36 @@
package org.gcube.dataanalysis.geo.utils.transfer;
import java.io.File;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.FileSystemManager;
import org.apache.commons.vfs2.VFS;
/**
*
* @author andrea
*
*/
public class VFileSystemManager {
File vfsRoot = null;
FileSystemManager manager = null;
public VFileSystemManager(String vfsRoot) throws FileSystemException {
this.vfsRoot = new File( vfsRoot);
this.manager = VFS.getManager();
}
public FileObject resolveFile(String filePath) throws FileSystemException{
return this.manager.resolveFile(this.vfsRoot,filePath);
}
public FileObject resolveFolder(String filePath) throws FileSystemException{
return this.manager.resolveFile(this.vfsRoot,filePath);
}
}