package eu.openaire.urls_worker.util;

import eu.openaire.urls_worker.controllers.FullTextsController;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;


/**
 * Bundles a batch of full-text files into a single TAR archive and compresses that archive with ZStandard,
 * so that many full-texts can be delivered through a single stream.
 */
public class FilesCompressor {

    private static final Logger logger = LoggerFactory.getLogger(FilesCompressor.class);

    /** Size of the buffers wrapped around the file streams. */
    public static final int bufferSize = (5 * 1_048_576);	// 5 MiB

    /** Size of the chunk moved per read/write call when copying between the (already buffered) streams. */
    private static final int COPY_CHUNK_SIZE = 65_536;	// 64 KiB


    /**
     * Archives the given files into one TAR file and compresses it with ZStandard.
     * The resulting file is named like: {@code assignments_2_full-texts_4.tar.zstd}, where {@code 4} is the batch-number.
     * <p>
     * The source files are deleted in every case (success or failure) and the intermediate TAR file is deleted
     * once the compression attempt has finished.
     *
     * @param assignmentsCounter the assignments-number, used in the archive's name
     * @param tarBatchCounter the batch-number, used in the archive's name
     * @param filesToCompress the names of the files to archive, relative to {@code baseDirectory}
     * @param baseDirectory the directory-prefix which is prepended (as-is) to each file-name and to the archive's name
     * @return the compressed archive, or {@code null} if no file was given, or the archiving or the compression failed
     */
    public static File compressMultipleFilesIntoOne(long assignmentsCounter, int tarBatchCounter, List<String> filesToCompress, String baseDirectory)
    {
        if ( (filesToCompress == null) || filesToCompress.isEmpty() ) {
            logger.error("No files were given to be compressed, for assignments_{}, batch_{}", assignmentsCounter, tarBatchCounter);
            return null;
        }

        File tarFile;
        try {
            tarFile = getTarArchiveWithFullTexts(filesToCompress, baseDirectory, assignmentsCounter, tarBatchCounter);
        } catch (Exception e) {
            logger.error("Exception when creating the tar-file for assignments_" + assignmentsCounter, e);
            return null;
        } finally {
            // Delete the source files of this batch immediately, whether the archiving succeeded or not.
            // On success their content lives inside the tar-file. On failure, they will not be requested again (the urls leading to them will be reprocessed in the future).
            for ( String fileName : filesToCompress )
                FullTextsController.deleteFile(baseDirectory + fileName);
        }

        // The "TAR" archive is not compressed, but it helps deliver multiple full-texts with a single Stream.
        // Then, we compress the archive, using Facebook's "ZStandard" algorithm, which delivers both high compression-rate and compression and decompression efficiency.

        String tarFilePath = tarFile.getPath();
        File zStandardFile = new File(tarFilePath + ".zstd");

        // The "bufferSize" belongs to the BufferedOutputStream. The 2nd argument of "ZstdCompressorOutputStream" is the compression-LEVEL, not a buffer-size, so the default level is used here.
        try ( BufferedInputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(tarFilePath)), bufferSize);
              ZstdCompressorOutputStream zOut = new ZstdCompressorOutputStream(new BufferedOutputStream(Files.newOutputStream(zStandardFile.toPath()), bufferSize)) )
        {
            copyStream(in, zOut);
        } catch (Exception e) {
            logger.error("Exception when compressing the tar-archive: " + tarFilePath, e);
            deleteIfPresent(zStandardFile);	// Do not leave a partially-written compressed file behind.
            return null;
        } finally {
            FullTextsController.deleteFile(tarFilePath);
        }

        logger.debug("Finished archiving and compressing the full-texts of assignments_{}, batch_{}", assignmentsCounter, tarBatchCounter);
        return zStandardFile;
    }


    /**
     * This method adds the requested full-text files into a TAR archive, which later will be compressed.
     * The archive is named like: {@code assignments_2_full-texts_4.tar}, where {@code 4} is the batch-number.
     * If the archive cannot be completed, or none of the files could be added, the (partial) tar-file is deleted.
     *
     * @param filesToTar the names of the files to archive, relative to {@code baseDir}
     * @param baseDir the directory-prefix which is prepended (as-is) to each file-name and to the archive's name
     * @param assignmentsCounter the assignments-number, used in the archive's name
     * @param tarBatchCounter the batch-number, used in the archive's name
     * @return the created tar-file, containing at least one of the requested files
     * @throws IOException if the tar-file could not be created or finalized
     * @throws RuntimeException if none of the requested files could be tarred
     */
    private static File getTarArchiveWithFullTexts(List<String> filesToTar, String baseDir, long assignmentsCounter, int tarBatchCounter) throws IOException
    {
        String tarFileFullPath = baseDir + "assignments_" + assignmentsCounter + "_full-texts_" + tarBatchCounter + ".tar";
        File tarFile = new File(tarFileFullPath);

        // https://commons.apache.org/proper/commons-compress/examples.html
        int numTarredFiles = 0;
        try ( TarArchiveOutputStream taos = new TarArchiveOutputStream(new BufferedOutputStream(Files.newOutputStream(tarFile.toPath()), bufferSize)) )
        {
            for ( String fileName : filesToTar ) {
                if ( addTarEntry(taos, fileName, baseDir) )
                    numTarredFiles ++;
            }
        } catch (IOException | RuntimeException e) {
            deleteIfPresent(tarFile);	// The archive is unusable, so do not leave it on the disk.
            throw e;
        }

        int numRequestedFiles = filesToTar.size();
        if ( numTarredFiles == 0 ) {
            deleteIfPresent(tarFile);	// An archive without entries is of no use to anyone.
            throw new RuntimeException("None of the requested (" + numRequestedFiles + ") could be tarred, for assignments_" + assignmentsCounter + ", batch_" + tarBatchCounter);
        } else if ( numTarredFiles != numRequestedFiles )
            logger.warn("The number of \"numTarredFiles\" ({}) is different from the number of files requested to be tarred ({}), for assignments_{}, batch_{}",
                    numTarredFiles, numRequestedFiles, assignmentsCounter, tarBatchCounter);
        // Still, some files may have been tarred, so we move on. It's up to the Controller, to handle such case.

        return tarFile;
    }


    /**
     * Writes one file as an entry of the given TAR stream.
     * Only the tar-entry is closed here; the {@link TarArchiveOutputStream} itself stays open for the next entries.
     *
     * @param taos the open tar-stream to write into
     * @param fileName the name of the file, relative to {@code baseDir}; it is also used as the entry's name
     * @param baseDir the directory-prefix which is prepended (as-is) to the file-name
     * @return {@code true} if the entry was fully written and closed, {@code false} otherwise
     */
    private static boolean addTarEntry(TarArchiveOutputStream taos, String fileName, String baseDir)
    {
        boolean shouldCloseEntry = false;	// An Exception may appear before the entry is opened, in which case we must not try to close it.
        boolean entryWritten = false;
        Path fullFileNamePath = Paths.get(baseDir + fileName);
        try ( BufferedInputStream fis = new BufferedInputStream(Files.newInputStream(fullFileNamePath), bufferSize) )
        {
            TarArchiveEntry entry = new TarArchiveEntry(fileName);
            entry.setSize(Files.size(fullFileNamePath));	// Yes, tar requires that we set the size beforehand..
            taos.putArchiveEntry(entry);
            shouldCloseEntry = true;

            copyStream(fis, taos);
            entryWritten = true;
        } catch (NoSuchFileException nsfe) {
            logger.error("NoSuchFileException: " + nsfe.getMessage());
        } catch (Exception e) {
            logger.error("Exception when adding the file \"" + fullFileNamePath + "\" to the tar-archive", e);
        } finally {
            if ( shouldCloseEntry ) {
                try {
                    taos.closeArchiveEntry();	// close just the TarEntry here (not the TarArchiveOutputStream)
                } catch (IOException e) {
                    logger.error("Exception when closing the tar-entry of the file \"" + fullFileNamePath + "\"", e);
                    entryWritten = false;	// An entry which could not be closed is not a successfully-tarred file.
                }
            }
        }
        return entryWritten;
    }


    /** Copies all remaining bytes of {@code in} to {@code out}, in chunks; it closes neither of the streams. */
    private static void copyStream(InputStream in, OutputStream out) throws IOException
    {
        byte[] chunk = new byte[COPY_CHUNK_SIZE];
        int bytesRead;
        while ( (bytesRead = in.read(chunk)) != -1 ) {
            out.write(chunk, 0, bytesRead);
        }
    }


    /** Best-effort removal of a leftover file; a failure is only logged, as it must not hide the original error. */
    private static void deleteIfPresent(File file)
    {
        try {
            Files.deleteIfExists(file.toPath());
        } catch (Exception e) {
            logger.warn("Could not delete the leftover file: " + file.getPath(), e);
        }
    }

}