diff --git a/README.md b/README.md index 7fcc352..44fee2f 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,8 @@ The Worker's Application, requests assignments from the [Controller](https://cod Then, it posts the results to the Controller, which in turn, requests from the Worker, the full-texts which are not already found by other workers, in batches.
The Worker responds by compressing and sending the requested files, in each batch.

+We use Facebook's [**Zstandard**](https://facebook.github.io/zstd/) compression algorithm, which offers significant gains in both compression ratio and speed. +
To install and run the application: - Run ```git clone``` and then ```cd UrlsWorker```. - [Optional] Create the file ```inputData.txt``` , which contains just one line with the ___workerId___, the ___maxAssignmentsLimitPerBatch___, the ___maxAssignmentsBatchesToHandleBeforeRestart___, the ___controller's base api-url___ and the ___shutdownOrCancelCode___, all seperated by a _comma_ "```,```" .
diff --git a/build.gradle b/build.gradle index 6d60d48..28983a6 100644 --- a/build.gradle +++ b/build.gradle @@ -27,10 +27,6 @@ dependencies { implementation 'org.projectlombok:lombok:1.18.24' - // https://mvnrepository.com/artifact/commons-io/commons-io - implementation 'commons-io:commons-io:2.11.0' - - //implementation group: 'io.jsonwebtoken', name: 'jjwt-api', version: '0.11.5' // Use this in case we use auth-tokens later on. // Enable the validation annotations. @@ -40,8 +36,12 @@ dependencies { exclude group: 'ch.qos.logback', module: 'logback-core' exclude group: 'ch.qos.logback', module: 'logback-classic' exclude group: 'org.slf4j', module: 'slf4j-api' + exclude group: 'io.minio' // This is not used in the Worker, since it's the Controller which uploads the full-texts to S3. It also includes an older "commons-compress" version which causes problems. } + implementation 'org.apache.commons:commons-compress:1.22' + implementation 'com.github.luben:zstd-jni:1.5.2-5' + testImplementation 'org.springframework.security:spring-security-test' testImplementation "org.springframework.boot:spring-boot-starter-test" } diff --git a/src/main/java/eu/openaire/urls_worker/controllers/FullTextsController.java b/src/main/java/eu/openaire/urls_worker/controllers/FullTextsController.java index 751db4e..c0595fb 100644 --- a/src/main/java/eu/openaire/urls_worker/controllers/FullTextsController.java +++ b/src/main/java/eu/openaire/urls_worker/controllers/FullTextsController.java @@ -2,6 +2,7 @@ package eu.openaire.urls_worker.controllers; import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin; import eu.openaire.urls_worker.services.FileStorageService; +import eu.openaire.urls_worker.util.FilesCompressor; import eu.openaire.urls_worker.util.FilesZipper; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; @@ -35,6 +36,67 @@ public class FullTextsController { } + 
@GetMapping("getFullTextsImproved/{assignmentsCounter:[\\d]+}/{totalBatches:[\\d]+}/{batchCounter:[\\d]+}/{fileNamesWithExtensions}")
+	public Object getMultipleFullTextsImproved(@PathVariable long assignmentsCounter, @PathVariable int totalBatches, @PathVariable int batchCounter, @PathVariable List<String> fileNamesWithExtensions) {
+		// Validates the request, packs the requested full-texts into a single zstd-compressed tar-file and returns that file as an octet-stream.
+		int fileNamesListNum = fileNamesWithExtensions.size();
+		if ( (fileNamesListNum == 1) && (fileNamesWithExtensions.get(0).length() == 0) ) {	// In case the last "/" in the url was given (without any files following), then this list will not be empty, but have one empty item instead.
+			// In case the url does not end in "/", then Spring will automatically return an "HTTP-BadRequest".
+			String errorMsg = "An empty \"fileNamesWithExtensions\" list was given from assignments_" + assignmentsCounter + ", for batch_" + batchCounter;
+			logger.error(errorMsg);
+			return ResponseEntity.badRequest().body(errorMsg);
+		}
+
+		if ( totalBatches == 0 ) {
+			String errorMsg = "The given \"totalBatches\" (" + totalBatches + ") was < 0 >!";
+			logger.error(errorMsg);
+			return ResponseEntity.badRequest().body(errorMsg);
+		}
+		else if ( batchCounter > totalBatches ) {
+			String errorMsg = "The given \"batchCounter\" (" + batchCounter + ") is greater than the \"totalBatches\" (" + totalBatches + ")!";
+			logger.error(errorMsg);
+			return ResponseEntity.badRequest().body(errorMsg);
+		}
+
+		logger.info("Received a \"getMultipleFullTextsImproved\" request for returning a tar-file containing " + fileNamesListNum + " full-texts, from assignments_" + assignmentsCounter + ", for batch_" + batchCounter + " (out of " + totalBatches + ").");
+
+		String currentAssignmentsBaseFullTextsPath = assignmentsBaseDir + "assignments_" + assignmentsCounter + "_fullTexts" + File.separator;
+
+		if ( ! (new File(currentAssignmentsBaseFullTextsPath).isDirectory()) ) {
+			String errorMsg = "The base directory for assignments_" + assignmentsCounter + " was not found: " + currentAssignmentsBaseFullTextsPath;
+			logger.error(errorMsg);
+			return ResponseEntity.badRequest().body(errorMsg);
+		}
+
+		File zstdFile = FilesCompressor.compressMultipleFilesIntoOne(assignmentsCounter, batchCounter, fileNamesWithExtensions, currentAssignmentsBaseFullTextsPath);
+		if ( zstdFile == null ) {
+			String errorMsg = "Failed to create the zstd file for \"batchCounter\"-" + batchCounter;
+			logger.error(errorMsg);
+			return ResponseEntity.internalServerError().body(errorMsg);
+		}
+
+		if ( batchCounter == totalBatches )
+			logger.debug("Will return the " + ((totalBatches > 1) ? "last" : "only one") + " batch (" + batchCounter + ") of Assignments_" + assignmentsCounter + " to the Controller.");
+
+		String zstdName = zstdFile.getName();
+		String zstdTarFileFullPath = currentAssignmentsBaseFullTextsPath + zstdName;
+		try {
+			return ResponseEntity.ok()
+					.contentType(MediaType.APPLICATION_OCTET_STREAM)
+					.header(HttpHeaders.CONTENT_DISPOSITION, "inline; filename=\"" + zstdName + "\"")
+					.body(new InputStreamResource(Files.newInputStream(Paths.get(zstdTarFileFullPath))));
+		} catch (Exception e) {
+			String errorMsg = "Could not load the FileInputStream of the zstd-tar-file \"" + zstdTarFileFullPath + "\"!";
+			logger.error(errorMsg, e);
+			return ResponseEntity.internalServerError().body(errorMsg);
+		}
+
+		// The related fulltext and (zstd-)tar files will be deleted in "AssignmentsHandler.postWorkerReport()", after the Controller has finished transferring them. They will be deleted even in case of a Controller-error.
+		// In case of an error and file-deletion, the related id-url records will just be re-processed in the future by some (maybe different) Worker.
+	}
+
+
+	@Deprecated
 	@GetMapping("getFullTexts/{assignmentsCounter:[\\d]+}/{totalZipBatches:[\\d]+}/{zipBatchCounter:[\\d]+}/{fileNamesWithExtensions}")
 	public Object getMultipleFullTexts(@PathVariable long assignmentsCounter, @PathVariable int totalZipBatches, @PathVariable int zipBatchCounter, @PathVariable List<String> fileNamesWithExtensions) {
 
diff --git a/src/main/java/eu/openaire/urls_worker/util/FilesCompressor.java b/src/main/java/eu/openaire/urls_worker/util/FilesCompressor.java
new file mode 100644
index 0000000..1738903
--- /dev/null
+++ b/src/main/java/eu/openaire/urls_worker/util/FilesCompressor.java
@@ -0,0 +1,132 @@
+package eu.openaire.urls_worker.util;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
+/**
+ * Utility which packs the requested full-text files of a batch into a TAR archive and then compresses that archive with Zstandard.
+ */
+public class FilesCompressor {
+
+	private static final Logger logger = LoggerFactory.getLogger(FilesCompressor.class);
+
+	/**
+	 * Creates a TAR archive with the given files and then compresses it into a new file, named as the tar-file plus the ".zstd" extension.
+	 *
+	 * @param assignmentsCounter the number of the assignments these files belong to (used in the archive's name)
+	 * @param zipBatchCounter the number of the batch requested by the Controller (used in the archive's name)
+	 * @param filesToCompress the names of the files to archive, relative to the "baseDirectory"
+	 * @param baseDirectory the directory holding the files; the names are appended to it as-is, so it has to end with a file-separator
+	 * @return the compressed file, or null if the archive could not be created or compressed (the cause is logged)
+	 */
+	public static File compressMultipleFilesIntoOne(long assignmentsCounter, int zipBatchCounter, List<String> filesToCompress, String baseDirectory)
+	{
+		// For example: assignments_2_full-texts_4.tar.zstd | where < 4 > is referred to the 4th batch of files requested by the Controller.
+
+		File tarFile = getTarArchiveWithFullTexts(filesToCompress, baseDirectory, assignmentsCounter, zipBatchCounter);
+		if ( tarFile == null )
+			return null;	// The error-cause is already logged.
+
+		// The "TAR" archive is not compressed, but it helps deliver multiple full-texts with a single Stream.
+		// Then, we compress the archive, using Facebook's "ZStandard" algorithm, which delivers both high compression-rate and compression and decompression efficiency.
+
+		String tarFilePath = tarFile.getPath();
+		String zStandardFileFullPath = tarFilePath + ".zstd";
+		File zStandardFile = new File(zStandardFileFullPath);
+
+		try ( InputStream in = Files.newInputStream(Paths.get(tarFilePath));
+			ZstdCompressorOutputStream zOut = new ZstdCompressorOutputStream(new BufferedOutputStream(Files.newOutputStream(zStandardFile.toPath()))) )
+		{
+			final byte[] buffer = new byte[1048576];	// 1 MiB
+			int numBytes;
+			while ( (numBytes = in.read(buffer)) != -1 ) {
+				zOut.write(buffer, 0, numBytes);
+			}
+		} catch (Exception e) {
+			logger.error("Exception when compressing the tar-archive: " + tarFilePath, e);
+			return null;
+		}
+
+		return zStandardFile;
+	}
+
+
+	/**
+	 * Adds the requested full-text files into a TAR archive, which later will be compressed.
+	 * A file which fails to be added is logged and skipped; the remaining files are still processed.
+	 *
+	 * @return the tar-file, or null if an exception was thrown while creating it (the cause is logged)
+	 */
+	private static File getTarArchiveWithFullTexts(List<String> filesToTar, String baseDir, long assignmentsCounter, int tarBatchCounter) {
+
+		String tarFileFullPath = baseDir + "assignments_" + assignmentsCounter + "_full-texts_" + tarBatchCounter + ".tar";
+		// For example: assignments_2_full-texts_4.tar | where < 4 > is referred to the 4th batch of files requested by the Controller.
+
+		// https://commons.apache.org/proper/commons-compress/examples.html
+
+		int numTarredFiles = 0;
+		File tarFile = new File(tarFileFullPath);
+
+		try ( TarArchiveOutputStream taos = new TarArchiveOutputStream(Files.newOutputStream(tarFile.toPath())) )
+		{
+			for ( String fileName : filesToTar ) {
+				if ( addTarEntry(taos, fileName, baseDir) )
+					numTarredFiles ++;
+			}
+		} catch (Exception e) {
+			logger.error("Exception when creating the tar-file: " + tarFileFullPath, e);
+			return null;
+		}
+		logger.debug("Tarred " + numTarredFiles + " (out of " + filesToTar.size() + ") files for assignments_" + assignmentsCounter + ", batch_" + tarBatchCounter);
+		return tarFile;
+	}
+
+
+	/**
+	 * Writes a single file as an entry of the given TAR stream.
+	 *
+	 * @return true if the file was written, false if an exception was thrown while opening or writing it (the cause is logged)
+	 */
+	private static boolean addTarEntry(TarArchiveOutputStream taos, String fileName, String baseDir)
+	{
+		boolean shouldCloseEntry = false;	// Useful in order to know if we should close the entry (an Exception may appear, and so we should not try to close it).
+
+		String fullFileName = baseDir + fileName;
+		try ( FileInputStream fis = new FileInputStream(fullFileName) )
+		{
+			TarArchiveEntry entry = new TarArchiveEntry(fileName);
+			entry.setSize(Files.size(Paths.get(fullFileName)));	// Yes, tar requires that we set the size beforehand..
+			taos.putArchiveEntry(entry);
+			shouldCloseEntry = true;
+
+			// Copy in chunks; reading one byte at a time from an unbuffered FileInputStream is extremely slow.
+			final byte[] buffer = new byte[8192];
+			int numBytes;
+			while ( (numBytes = fis.read(buffer)) != -1 ) {
+				taos.write(buffer, 0, numBytes);
+			}
+		} catch (Exception e) {
+			logger.error("Exception when adding the file \"" + fullFileName + "\" to the tar-archive!", e);
+			return false;
+		} finally {
+			if ( shouldCloseEntry ) {
+				try {
+					taos.closeArchiveEntry();	// close just the tar-entry here (not the TarArchiveOutputStream)
+				} catch (IOException e) {
+					logger.error("Exception when closing the tar-entry of the file \"" + fullFileName + "\"!", e);
+				}
+			}
+		}
+
+		return true;
+	}
+
+}