package eu.dnetlib.data.collector.plugins.sftp; import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; import java.util.*; import com.jcraft.jsch.*; import eu.dnetlib.data.collector.rmi.CollectorServiceRuntimeException; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; /** * Created by andrea on 11/01/16. */ public class SftpIterator implements Iterator { private static final Log log = LogFactory.getLog(SftpIterator.class); private static final int MAX_RETRIES = 5; private static final int DEFAULT_TIMEOUT = 30000; private static final long BACKOFF_MILLIS = 10000; private String baseUrl; private String sftpURIScheme; private String sftpServerAddress; private String remoteSftpBasePath; private String username; private String password; private boolean isRecursive; private Set extensionsSet; private boolean incremental; private Session sftpSession; private ChannelSftp sftpChannel; private Queue queue; private DateTime fromDate = null; private DateTimeFormatter simpleDateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd"); public SftpIterator(String baseUrl, String username, String password, boolean isRecursive, Set extensionsSet, String fromDate) { this.baseUrl = baseUrl; this.username = username; this.password = password; this.isRecursive = isRecursive; this.extensionsSet = extensionsSet; this.incremental = StringUtils.isNotBlank(fromDate); if (incremental) { //I expect fromDate in the format 'yyyy-MM-dd'. See class eu.dnetlib.msro.workflows.nodes.collect.FindDateRangeForIncrementalHarvestingJobNode . this.fromDate = DateTime.parse(fromDate, simpleDateTimeFormatter); log.debug("fromDate string: " + fromDate + " -- parsed: " + this.fromDate.toString()); } try { URI sftpServer = new URI(baseUrl); this.sftpURIScheme = sftpServer.getScheme(); this.sftpServerAddress = sftpServer.getHost(); this.remoteSftpBasePath = sftpServer.getPath(); } catch (URISyntaxException e) { throw new CollectorServiceRuntimeException("Bad syntax in the URL " + baseUrl); } connectToSftpServer(); initializeQueue(); } private void connectToSftpServer() { JSch jsch = new JSch(); try { JSch.setConfig("StrictHostKeyChecking", "no"); sftpSession = jsch.getSession(username, sftpServerAddress); sftpSession.setPassword(password); sftpSession.connect(); Channel channel = sftpSession.openChannel(sftpURIScheme); channel.connect(); sftpChannel = (ChannelSftp) channel; String pwd = sftpChannel.pwd(); log.debug("PWD from server: " + pwd); String fullPath = pwd + remoteSftpBasePath; sftpChannel.cd(fullPath); log.debug("PWD from server 2 after 'cd " + fullPath + "' : " + sftpChannel.pwd()); log.info("Connected to SFTP server " + sftpServerAddress); } catch (JSchException e) { throw new CollectorServiceRuntimeException("Unable to connect to remote SFTP server.", e); } catch (SftpException e) { throw new CollectorServiceRuntimeException("Unable to access the base remote path on the SFTP server.", e); } } private void disconnectFromSftpServer() { sftpChannel.exit(); sftpSession.disconnect(); } private void initializeQueue() { queue = new LinkedList(); log.info(String.format("SFTP collector plugin collecting from %s with recursion = %s, incremental = %s with fromDate=%s", remoteSftpBasePath, isRecursive, incremental, fromDate)); listDirectoryRecursive(".", ""); } private void listDirectoryRecursive(final String parentDir, final String currentDir) { String dirToList = parentDir; if (StringUtils.isNotBlank(currentDir)) { dirToList += "/" + currentDir; } log.debug("PARENT DIR: " + parentDir); log.debug("DIR TO LIST: " + dirToList); try { Vector ls = sftpChannel.ls(dirToList); for (ChannelSftp.LsEntry entry : ls) { String currentFileName = entry.getFilename(); if (currentFileName.equals(".") || currentFileName.equals("..")) { // skip parent directory and directory itself continue; } SftpATTRS attrs = entry.getAttrs(); if (attrs.isDir()) { if (isRecursive) { listDirectoryRecursive(dirToList, currentFileName); } } else { // test the file for extensions compliance and, just in case, add it to the list. for (String ext : extensionsSet) { if (currentFileName.endsWith(ext)) { //test if the file has been changed after the last collection date: if (incremental) { int mTime = attrs.getMTime(); //int times are values reduced by the milliseconds, hence we multiply per 1000L DateTime dt = new DateTime(mTime * 1000L); if (dt.isAfter(fromDate)) { queue.add(currentFileName); log.debug(currentFileName + " has changed and must be re-collected"); } else { if (log.isDebugEnabled()) { log.debug(currentFileName + " has not changed since last collection"); } } } else { //if it is not incremental, just add it to the queue queue.add(currentFileName); } } } } } } catch (SftpException e) { throw new CollectorServiceRuntimeException("Cannot list the sftp remote directory", e); } } @Override public boolean hasNext() { if (queue.isEmpty()) { disconnectFromSftpServer(); return false; } else { return true; } } @Override public String next() { String nextRemotePath = queue.remove(); int nRepeat = 0; String fullPathFile = nextRemotePath; while (nRepeat < MAX_RETRIES) { try { OutputStream baos = new ByteArrayOutputStream(); sftpChannel.get(nextRemotePath, baos); if (log.isDebugEnabled()) { fullPathFile = sftpChannel.pwd() + "/" + nextRemotePath; log.debug(String.format("Collected file from SFTP: %s%s", sftpServerAddress, fullPathFile)); } return baos.toString(); } catch (SftpException e) { nRepeat++; log.warn(String.format("An error occurred [%s] for %s%s, retrying.. [retried %s time(s)]", e.getMessage(), sftpServerAddress, fullPathFile, nRepeat)); // disconnectFromSftpServer(); try { Thread.sleep(BACKOFF_MILLIS); } catch (InterruptedException e1) { log.error(e1); } } } throw new CollectorServiceRuntimeException( String.format("Impossible to retrieve FTP file %s after %s retries. Aborting FTP collection.", fullPathFile, nRepeat)); } @Override public void remove() { throw new UnsupportedOperationException(); } }