2023-01-09 14:48:30 +01:00
package eu.openaire.urls_worker.util ;
2023-05-23 21:19:41 +02:00
import eu.openaire.urls_worker.controllers.FullTextsController ;
2023-01-09 14:48:30 +01:00
import org.apache.commons.compress.archivers.tar.TarArchiveEntry ;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream ;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
2023-03-02 16:47:58 +01:00
import java.io.BufferedInputStream ;
import java.io.BufferedOutputStream ;
import java.io.File ;
import java.io.IOException ;
2023-01-09 14:48:30 +01:00
import java.nio.file.Files ;
2023-10-04 15:08:38 +02:00
import java.nio.file.NoSuchFileException ;
2023-03-02 16:47:58 +01:00
import java.nio.file.Path ;
2023-01-09 14:48:30 +01:00
import java.nio.file.Paths ;
import java.util.List ;
public class FilesCompressor {
private static final Logger logger = LoggerFactory . getLogger ( FilesCompressor . class ) ;
2023-04-28 16:59:36 +02:00
public static final int bufferSize = ( 5 * 1_048_576 ) ; // 5 Mb
2023-03-02 16:47:58 +01:00
2023-03-07 15:25:10 +01:00
public static File compressMultipleFilesIntoOne ( long assignmentsCounter , int tarBatchCounter , List < String > filesToCompress , String baseDirectory )
2023-01-09 14:48:30 +01:00
{
2023-03-07 15:25:10 +01:00
// For example: assignments_2_full-texts_4.tar.zstd | where < 4 > is referred to the 4th batch of files requested by the Controller.
2023-05-23 21:19:41 +02:00
File tarFile ;
try {
tarFile = getTarArchiveWithFullTexts ( filesToCompress , baseDirectory , assignmentsCounter , tarBatchCounter ) ;
} catch ( Exception e ) {
logger . error ( " Exception when creating the tar-file for assignments_ " + assignmentsCounter , e ) ;
return null ;
} finally {
// Delete the files of this failed batch immediately. These files will not be requested again. The urls leading to these file will be reprocessed in the future.
for ( String fileName : filesToCompress )
FullTextsController . deleteFile ( baseDirectory + fileName ) ;
}
2023-01-09 14:48:30 +01:00
// The "TAR" archive is not compressed, but it helps deliver multiple full-texts with a single Stream.
// Then, we compress the archive, using Facebook's "ZStandard" algorithm, which delivers both high compression-rate and compression and decompression efficiency.
String tarFilePath = tarFile . getPath ( ) ;
String zStandardFileFullPath = tarFilePath + " .zstd " ;
File zStandardFile = new File ( zStandardFileFullPath ) ;
2023-04-28 16:59:36 +02:00
try ( BufferedInputStream in = new BufferedInputStream ( Files . newInputStream ( Paths . get ( tarFilePath ) ) , bufferSize ) ;
ZstdCompressorOutputStream zOut = new ZstdCompressorOutputStream ( new BufferedOutputStream ( Files . newOutputStream ( zStandardFile . toPath ( ) ) ) , bufferSize ) )
2023-01-09 14:48:30 +01:00
{
2023-03-02 16:47:58 +01:00
int readByte ;
while ( ( readByte = in . read ( ) ) ! = - 1 ) {
zOut . write ( readByte ) ;
2023-01-09 14:48:30 +01:00
}
} catch ( Exception e ) {
logger . error ( " Exception when compressing the tar-archive: " + tarFilePath , e ) ;
return null ;
2023-05-23 21:19:41 +02:00
} finally {
FullTextsController . deleteFile ( tarFilePath ) ;
2023-01-09 14:48:30 +01:00
}
2023-03-07 15:25:10 +01:00
logger . debug ( " Finished archiving and compressing the full-texts of assignments_ " + assignmentsCounter + " , batch_ " + tarBatchCounter ) ;
2023-01-09 14:48:30 +01:00
return zStandardFile ;
}
/ * *
* This method adds the requested full - text file into a TAR archive , which later will be compressed .
* * /
2023-05-23 21:19:41 +02:00
private static File getTarArchiveWithFullTexts ( List < String > filesToTar , String baseDir , long assignmentsCounter , int tarBatchCounter ) throws Exception
{
2023-01-09 14:48:30 +01:00
String tarFileFullPath = baseDir + " assignments_ " + assignmentsCounter + " _full-texts_ " + tarBatchCounter + " .tar " ;
2023-03-07 15:25:10 +01:00
// For example: assignments_2_full-texts_4.tar.zstd | where < 4 > is referred to the 4th batch of files requested by the Controller.
2023-01-09 14:48:30 +01:00
// https://commons.apache.org/proper/commons-compress/examples.html
int numTarredFiles = 0 ;
File tarFile = new File ( tarFileFullPath ) ;
2023-04-28 16:59:36 +02:00
try ( TarArchiveOutputStream taos = new TarArchiveOutputStream ( new BufferedOutputStream ( Files . newOutputStream ( tarFile . toPath ( ) ) , bufferSize ) ) )
2023-01-09 14:48:30 +01:00
{
for ( String fileName : filesToTar ) {
if ( addTarEntry ( taos , fileName , baseDir ) )
numTarredFiles + + ;
}
}
2023-03-07 15:25:10 +01:00
2023-10-04 15:08:38 +02:00
if ( numTarredFiles = = 0 ) {
throw new RuntimeException ( " None of the requested ( " + filesToTar . size ( ) + " ) could be tarred, for assignments_ " + assignmentsCounter + " , batch_ " + tarBatchCounter ) ;
} else if ( numTarredFiles ! = filesToTar . size ( ) )
2023-03-07 15:25:10 +01:00
logger . warn ( " The number of \" numTarredFiles \" ( " + numTarredFiles + " ) is different from the number of files requested to be tarred ( " + filesToTar . size ( ) + " ), for assignments_ " + assignmentsCounter + " , batch_ " + tarBatchCounter ) ;
2023-04-20 14:39:15 +02:00
// Still, some files may have been tarred, so we move on. It's up to the Controller, to handle such case.
2023-03-07 15:25:10 +01:00
2023-01-09 14:48:30 +01:00
return tarFile ;
}
private static boolean addTarEntry ( TarArchiveOutputStream taos , String fileName , String baseDir )
{
2023-04-28 16:59:36 +02:00
boolean shouldCloseEntry = false ; // Useful in order to know if we should close the entry (an Exception may appear when initializing the stream, and so we should not try to close it).
2023-01-09 14:48:30 +01:00
2023-03-02 16:47:58 +01:00
Path fullFileNamePath = Paths . get ( baseDir + fileName ) ;
2023-10-04 15:08:38 +02:00
try ( BufferedInputStream fis = new BufferedInputStream ( Files . newInputStream ( fullFileNamePath ) , bufferSize ) ) {
2023-01-09 14:48:30 +01:00
TarArchiveEntry entry = new TarArchiveEntry ( fileName ) ;
2023-03-02 16:47:58 +01:00
entry . setSize ( Files . size ( fullFileNamePath ) ) ; // Yes, tar requires that we set the size beforehand..
2023-01-09 14:48:30 +01:00
taos . putArchiveEntry ( entry ) ;
shouldCloseEntry = true ;
int readByte ;
while ( ( readByte = fis . read ( ) ) ! = - 1 ) {
taos . write ( readByte ) ;
}
2023-10-04 15:08:38 +02:00
} catch ( NoSuchFileException nsfe ) {
logger . error ( " NoSuchFileException: " + nsfe . getMessage ( ) ) ;
return false ;
2023-01-09 14:48:30 +01:00
} catch ( Exception e ) {
logger . error ( " " , e ) ;
return false ;
} finally {
if ( shouldCloseEntry ) {
try {
2023-03-07 15:25:10 +01:00
taos . closeArchiveEntry ( ) ; // close just the TarEntry here (not the TarArchiveOutputStream)
2023-01-09 14:48:30 +01:00
} catch ( IOException e ) {
logger . error ( " " , e ) ;
}
}
}
return true ;
}
}