- Add a new endpoint "getFullTextsImproved", which uses Facebook's [**Zstandard**](https://facebook.github.io/zstd/) compression algorithm, bringing significant improvements in compression ratio and speed.
- Remove some dependencies.
This commit is contained in:
parent
778dc6e25c
commit
fd62ac567e
|
@ -4,6 +4,8 @@ The Worker's Application, requests assignments from the [Controller](https://cod
|
|||
Then, it posts the results to the Controller, which in turn, requests from the Worker, the full-texts which are not already found by other workers, in batches.<br>
|
||||
The Worker responds by compressing and sending the requested files, in each batch.<br>
|
||||
<br>
|
||||
We use Facebook's [**Zstandard**](https://facebook.github.io/zstd/) compression algorithm, which brings very big benefits in compression rate and speed.
|
||||
<br>
|
||||
To install and run the application:
|
||||
- Run ```git clone``` and then ```cd UrlsWorker```.
|
||||
- [Optional] Create the file ```inputData.txt``` , which contains just one line with the ___workerId___, the ___maxAssignmentsLimitPerBatch___, the ___maxAssignmentsBatchesToHandleBeforeRestart___, the ___controller's base api-url___ and the ___shutdownOrCancelCode___, all separated by a _comma_ "```,```" .<br>
|
||||
|
|
|
@ -27,10 +27,6 @@ dependencies {
|
|||
|
||||
implementation 'org.projectlombok:lombok:1.18.24'
|
||||
|
||||
// https://mvnrepository.com/artifact/commons-io/commons-io
|
||||
implementation 'commons-io:commons-io:2.11.0'
|
||||
|
||||
|
||||
//implementation group: 'io.jsonwebtoken', name: 'jjwt-api', version: '0.11.5' // Use this in case we use auth-tokens later on.
|
||||
|
||||
// Enable the validation annotations.
|
||||
|
@ -40,8 +36,12 @@ dependencies {
|
|||
exclude group: 'ch.qos.logback', module: 'logback-core'
|
||||
exclude group: 'ch.qos.logback', module: 'logback-classic'
|
||||
exclude group: 'org.slf4j', module: 'slf4j-api'
|
||||
exclude group: 'io.minio' // This is not used in the Worker, since it's the Controller which uploads the full-texts to S3. It also includes an older "commons-compress" version which causes problems.
|
||||
}
|
||||
|
||||
implementation 'org.apache.commons:commons-compress:1.22'
|
||||
implementation 'com.github.luben:zstd-jni:1.5.2-5'
|
||||
|
||||
testImplementation 'org.springframework.security:spring-security-test'
|
||||
testImplementation "org.springframework.boot:spring-boot-starter-test"
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package eu.openaire.urls_worker.controllers;
|
|||
|
||||
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
|
||||
import eu.openaire.urls_worker.services.FileStorageService;
|
||||
import eu.openaire.urls_worker.util.FilesCompressor;
|
||||
import eu.openaire.urls_worker.util.FilesZipper;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -35,6 +36,67 @@ public class FullTextsController {
|
|||
}
|
||||
|
||||
|
||||
@GetMapping("getFullTextsImproved/{assignmentsCounter:[\\d]+}/{totalBatches:[\\d]+}/{batchCounter:[\\d]+}/{fileNamesWithExtensions}")
|
||||
public Object getMultipleFullTextsImproved(@PathVariable long assignmentsCounter, @PathVariable int totalBatches, @PathVariable int batchCounter, @PathVariable List<String> fileNamesWithExtensions) {
|
||||
|
||||
int fileNamesListNum = fileNamesWithExtensions.size();
|
||||
if ( (fileNamesListNum == 1) && (fileNamesWithExtensions.get(0).length() == 0) ) { // In case the last "/" in the url was given (without any files following), then this list will not be empty, but have one empty item instead.
|
||||
// In case the url does not end in "/", then Spring will automatically return an "HTTP-BadRequest".
|
||||
String errorMsg = "An empty \"fileNamesWithExtensions\" list was given from assignments_" + assignmentsCounter + ", for batch_" + batchCounter;
|
||||
logger.error(errorMsg);
|
||||
return ResponseEntity.badRequest().body(errorMsg);
|
||||
}
|
||||
|
||||
if ( totalBatches == 0 ) {
|
||||
String errorMsg = "The given \"totalBatches\" (" + totalBatches + ") was < 0 >!";
|
||||
logger.error(errorMsg);
|
||||
return ResponseEntity.badRequest().body(errorMsg);
|
||||
}
|
||||
else if ( batchCounter > totalBatches ) {
|
||||
String errorMsg = "The given \"batchCounter\" (" + batchCounter + ") is greater than the \"totalBatches\" (" + totalBatches + ")!";
|
||||
logger.error(errorMsg);
|
||||
return ResponseEntity.badRequest().body(errorMsg);
|
||||
}
|
||||
|
||||
logger.info("Received a \"getMultipleFullTextsImproved\" request for returning a tar-file containing " + fileNamesListNum + " full-texts, from assignments_" + assignmentsCounter + ", for batch_" + batchCounter + " (out of " + totalBatches + ").");
|
||||
|
||||
String currentAssignmentsBaseFullTextsPath = assignmentsBaseDir + "assignments_" + assignmentsCounter + "_fullTexts" + File.separator;
|
||||
|
||||
if ( ! (new File(currentAssignmentsBaseFullTextsPath).isDirectory()) ) {
|
||||
String errorMsg = "The base directory for assignments_" + assignmentsCounter + " was not found: " + currentAssignmentsBaseFullTextsPath;
|
||||
logger.error(errorMsg);
|
||||
return ResponseEntity.badRequest().body(errorMsg);
|
||||
}
|
||||
|
||||
File zstdFile = FilesCompressor.compressMultipleFilesIntoOne(assignmentsCounter, batchCounter, fileNamesWithExtensions, currentAssignmentsBaseFullTextsPath);
|
||||
if ( zstdFile == null ) {
|
||||
String errorMsg = "Failed to create the zstd file for \"batchCounter\"-" + batchCounter;
|
||||
logger.error(errorMsg);
|
||||
return ResponseEntity.internalServerError().body(errorMsg);
|
||||
}
|
||||
|
||||
if ( batchCounter == totalBatches )
|
||||
logger.debug("Will return the " + ((totalBatches > 1) ? "last" : "only one") + " batch (" + batchCounter + ") of Assignments_" + assignmentsCounter + " to the Controller.");
|
||||
|
||||
String zstdName = zstdFile.getName();
|
||||
String zstdTarFileFullPath = currentAssignmentsBaseFullTextsPath + zstdName;
|
||||
try {
|
||||
return ResponseEntity.ok()
|
||||
.contentType(MediaType.APPLICATION_OCTET_STREAM)
|
||||
.header(HttpHeaders.CONTENT_DISPOSITION, "inline; filename=\"" + zstdName + "\"")
|
||||
.body(new InputStreamResource(Files.newInputStream(Paths.get(zstdTarFileFullPath))));
|
||||
} catch (Exception e) {
|
||||
String errorMsg = "Could not load the FileInputStream of the zstd-tar-file \"" + zstdTarFileFullPath + "\"!";
|
||||
logger.error(errorMsg, e);
|
||||
return ResponseEntity.internalServerError().body(errorMsg);
|
||||
}
|
||||
|
||||
// The related fulltext and (zstd-)tar files will be deleted in "AssignmentsHandler.postWorkerReport()", after the Controller has finished transferring them. They will be deleted even in case of a Controller-error.
|
||||
// In case of an error and file-deletion, the related id-url records will just be re-processed in the future by some (maybe different) Worker.
|
||||
}
|
||||
|
||||
|
||||
@Deprecated
|
||||
@GetMapping("getFullTexts/{assignmentsCounter:[\\d]+}/{totalZipBatches:[\\d]+}/{zipBatchCounter:[\\d]+}/{fileNamesWithExtensions}")
|
||||
public Object getMultipleFullTexts(@PathVariable long assignmentsCounter, @PathVariable int totalZipBatches, @PathVariable int zipBatchCounter, @PathVariable List<String> fileNamesWithExtensions) {
|
||||
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
package eu.openaire.urls_worker.util;
|
||||
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
|
||||
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.List;
|
||||
|
||||
public class FilesCompressor {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(FilesCompressor.class);
|
||||
|
||||
public static File compressMultipleFilesIntoOne(long assignmentsCounter, int zipBatchCounter, List<String> filesToCompress, String baseDirectory)
|
||||
{
|
||||
// For example: assignments_2_full-texts_4.zip | where < 4 > is referred to the 4th batch of files requested by the Controller.
|
||||
|
||||
File tarFile = getTarArchiveWithFullTexts(filesToCompress, baseDirectory, assignmentsCounter, zipBatchCounter);
|
||||
if ( tarFile == null )
|
||||
return null; // The error-cause is already logged.
|
||||
|
||||
// The "TAR" archive is not compressed, but it helps deliver multiple full-texts with a single Stream.
|
||||
// Then, we compress the archive, using Facebook's "ZStandard" algorithm, which delivers both high compression-rate and compression and decompression efficiency.
|
||||
|
||||
String tarFilePath = tarFile.getPath();
|
||||
String zStandardFileFullPath = tarFilePath + ".zstd";
|
||||
File zStandardFile = new File(zStandardFileFullPath);
|
||||
|
||||
try ( InputStream in = Files.newInputStream(Paths.get(tarFilePath));
|
||||
ZstdCompressorOutputStream zOut = new ZstdCompressorOutputStream(new BufferedOutputStream(Files.newOutputStream(zStandardFile.toPath()))) )
|
||||
{
|
||||
final byte[] buffer = new byte[1048576]; // 1 Mb
|
||||
int numBytes = 0;
|
||||
while ( (numBytes = in.read(buffer)) != -1 ) {
|
||||
zOut.write(buffer, 0, numBytes);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Exception when compressing the tar-archive: " + tarFilePath, e);
|
||||
return null;
|
||||
}
|
||||
|
||||
return zStandardFile;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This method adds the requested full-text file into a TAR archive, which later will be compressed.
|
||||
* */
|
||||
private static File getTarArchiveWithFullTexts(List<String> filesToTar, String baseDir, long assignmentsCounter, int tarBatchCounter) {
|
||||
|
||||
String tarFileFullPath = baseDir + "assignments_" + assignmentsCounter + "_full-texts_" + tarBatchCounter + ".tar";
|
||||
// For example: assignments_2_full-texts_4.zip | where < 4 > is referred to the 4th batch of files requested by the Controller.
|
||||
|
||||
// https://commons.apache.org/proper/commons-compress/examples.html
|
||||
|
||||
int numTarredFiles = 0;
|
||||
File tarFile = new File(tarFileFullPath);
|
||||
|
||||
try ( TarArchiveOutputStream taos = new TarArchiveOutputStream(Files.newOutputStream(tarFile.toPath())) )
|
||||
{
|
||||
for ( String fileName : filesToTar ) {
|
||||
if ( addTarEntry(taos, fileName, baseDir) )
|
||||
numTarredFiles ++;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Exception when creating the tar-file: " + tarFileFullPath, e);
|
||||
return null;
|
||||
}
|
||||
logger.debug("Tarred " + numTarredFiles + " (out of " + filesToTar.size() + ") files for assignments_" + assignmentsCounter + ", batch_" + tarBatchCounter);
|
||||
return tarFile;
|
||||
}
|
||||
|
||||
|
||||
private static boolean addTarEntry(TarArchiveOutputStream taos, String fileName, String baseDir)
|
||||
{
|
||||
boolean shouldCloseEntry = false; // Useful in order to know if we should close the entry (an Exception may appear, and so we should not try to close it).
|
||||
|
||||
String fullFileName = baseDir + fileName;
|
||||
try ( FileInputStream fis = new FileInputStream(fullFileName) )
|
||||
{
|
||||
TarArchiveEntry entry = new TarArchiveEntry(fileName);
|
||||
entry.setSize(Files.size(Paths.get(fullFileName))); // Yes, tar requires that we set the size beforehand..
|
||||
taos.putArchiveEntry(entry);
|
||||
shouldCloseEntry = true;
|
||||
|
||||
int readByte;
|
||||
while ( (readByte = fis.read()) != -1 ) {
|
||||
taos.write(readByte);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("", e);
|
||||
return false;
|
||||
} finally {
|
||||
if ( shouldCloseEntry ) {
|
||||
try {
|
||||
taos.closeArchiveEntry(); // close just the ZipEntry here (not the ZipOutputStream)
|
||||
} catch (IOException e) {
|
||||
logger.error("", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue