Compare commits

...

4 Commits

Author SHA1 Message Date
Lampros Smyrnaios 107908a733 - Fix not deleting the "assignments_*" directory, along with the potentially partially created zstd file, in case there was a compression error.
- Show the number of files which were successfully compressed, in each batch.
- Fix the class-value used in the Logger-initializer, in "FullTextsController".
- Improve an error-log.
2024-05-30 12:29:02 +03:00
Lampros Smyrnaios 4af74d4581 - Reduce the amount of "requiredFreeSpace" needed to be available in order to accept new assignments.
- Increase the time to wait before rechecking the available free space, in order to get new assignments, to 30 minutes.
- Update dependencies.
- Code polishing.
2024-05-28 23:10:52 +03:00
Lampros Smyrnaios c242f65518 - Improve error-handling in "ConnWithController.postShutdownReportToController()".
- Update dependencies.
2024-05-22 16:14:45 +03:00
Lampros Smyrnaios b40c72f78f - Fix the process of shutting down the worker, in case the user sends the relevant request, while the worker is stuck in a data-request error-loop.
- Upload the updated gradle-wrapper.
2024-04-29 17:08:40 +03:00
9 changed files with 75 additions and 33 deletions

View File

@ -1,12 +1,12 @@
plugins {
id 'org.springframework.boot' version '2.7.18'
id 'io.spring.dependency-management' version '1.1.4'
id 'io.spring.dependency-management' version '1.1.5'
id 'java'
}
java {
group = 'eu.openaire.urls_worker'
version = '2.1.9-SNAPSHOT'
version = '2.1.12-SNAPSHOT'
sourceCompatibility = JavaVersion.VERSION_1_8
}
@ -44,12 +44,12 @@ dependencies {
exclude group: 'io.minio' // This is not used in the Worker, since it's the Controller which uploads the full-texts to S3. It also includes an older "commons-compress" version which causes problems.
}
implementation group: 'com.google.guava', name: 'guava', version: '33.1.0-jre'
implementation group: 'com.google.guava', name: 'guava', version: '33.2.0-jre'
// https://mvnrepository.com/artifact/com.google.code.gson/gson
implementation 'com.google.code.gson:gson:2.10.1'
implementation 'com.google.code.gson:gson:2.11.0'
implementation("org.apache.commons:commons-compress:1.26.1") {
implementation("org.apache.commons:commons-compress:1.26.2") {
exclude group: 'com.github.luben', module: 'zstd-jni'
}
implementation 'com.github.luben:zstd-jni:1.5.6-3' // Even though this is part of the above dependency, the Apache commons rarely updates it, while the zstd team makes improvements very often.

Binary file not shown.

View File

@ -47,7 +47,7 @@ public class AssignmentsHandler {
private final String workerId;
private final String controllerBaseUrl;
private final int maxAssignmentsLimitPerBatch;
private final int maxAssignmentsBatchesToHandleBeforeShutdown;
public final int maxAssignmentsBatchesToHandleBeforeShutdown;
public static List<UrlReport> urlReports = null;
private static final int expectedDatasourcesPerRequest = 1400; // Per 10_000 assignments.

View File

@ -6,6 +6,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.client.RestTemplate;
@ -26,8 +27,9 @@ public class ConnWithController {
public boolean postShutdownReportToController(String workerId)
{
logger.info("Going to \"postShutdownReportToController\".");
String url = this.controllerBaseUrl + "workerShutdownReport?workerId=" + workerId;
try {
ResponseEntity<String> responseEntity = new RestTemplate().postForEntity(this.controllerBaseUrl + "workerShutdownReport?workerId=" + workerId, null, String.class);
ResponseEntity<String> responseEntity = new RestTemplate().postForEntity(url, null, String.class);
int responseCode = responseEntity.getStatusCodeValue();
if ( responseCode != HttpStatus.OK.value() ) {
logger.error("HTTP-Connection problem with the submission of the \"postShutdownReportToController\"! Error-code was: " + responseCode);
@ -36,8 +38,17 @@ public class ConnWithController {
} catch (HttpServerErrorException hsee) {
logger.error("The Controller failed to handle the \"postShutdownReportToController\": " + hsee.getMessage());
return false;
} catch (HttpClientErrorException hcee) {
logger.error("The Worker did something wrong when sending the report result to the Controller. | url: " + url + "\n" + hcee.getMessage());
return false;
} catch (Exception e) {
logger.error("Error for \"postShutdownReportToController\" to the Controller.", e);
String errorMsg = "Error for \"postShutdownReportToController\", to the Controller.";
Throwable cause = e.getCause();
String exMsg;
if ( (cause != null) && ((exMsg = cause.getMessage()) != null) && exMsg.contains("Connection refused") )
logger.error(errorMsg + " | The Controller has probably crashed, since we received a \"Connection refused\" message!");
else
logger.error(errorMsg, e);
return false;
}
return true;

View File

@ -50,7 +50,7 @@ public class ScheduledTasks {
if ( maxAssignmentsLimitPerBatch <= 1_000 )
requiredFreeSpace = oneAndHalfGB;
else
requiredFreeSpace = oneAndHalfGB * (maxAssignmentsLimitPerBatch / 1_000);
requiredFreeSpace = oneAndHalfGB * (maxAssignmentsLimitPerBatch / 1_500);
logger.info("The \"requiredFreeSpace\" for the app to request new assignments, having \"maxAssignmentsLimitPerBatch\" equal to " + maxAssignmentsLimitPerBatch + " , is: " + (requiredFreeSpace / (1024 * 1024)) + " Mb");
}
@ -65,19 +65,13 @@ public class ScheduledTasks {
return;
}
// The user might have just requested the Worker to shut-down.
if ( GeneralController.shouldShutdownWorker ) { // Make sure the worker shuts-down, in case the user sends the relevant request, while the worker is stuck in a data-request error-loop.
AssignmentsHandler.shouldNotRequestMore = true;
return;
}
if ( rootPath.getFreeSpace() < requiredFreeSpace ) {
// It's not safe to proceed with downloading more files and risk of "noSpaceLeft" error.
// Wait for the Controller to take the full-texts and any remaining files to be deleted, so that more free-space becomes available.
// We need to have some buffer zone for the ".tar" files which will be created from the already downloaded full-texts, when the Controller starts requesting them.
logger.warn("The free space is running out (less than " + (requiredFreeSpace / oneMb) + " Mb). The Worker will avoid getting new assignments for the next 15 minutes.");
logger.warn("The free space is running low (less than " + (requiredFreeSpace / oneMb) + " Mb). The Worker will avoid getting new assignments for the next 30 minutes.");
try {
Thread.sleep(900_000); // Sleep for 15 mins to stall the scheduler from retrying right away, thus giving time to the disk-space to be freed.
Thread.sleep(1_800_000); // Sleep for 30 mins to stall the scheduler from retrying right away, thus giving time to the disk-space to be freed.
} catch (InterruptedException ie) {
logger.warn("Sleeping was interrupted!");
}
@ -203,7 +197,7 @@ public class ScheduledTasks {
}
}
} catch (Exception e) {
logger.error("", e);
logger.error("Failed to check and delete leftover fulltext files!", e);
return;
}

View File

@ -28,7 +28,7 @@ import java.util.List;
@RequestMapping("full-texts/")
public class FullTextsController {
private static final Logger logger = LoggerFactory.getLogger(GeneralController.class);
private static final Logger logger = LoggerFactory.getLogger(FullTextsController.class);
@Autowired
private FileStorageService fileStorageService;
@ -68,8 +68,9 @@ public class FullTextsController {
File zstdFile = FilesCompressor.compressMultipleFilesIntoOne(assignmentsCounter, batchCounter, fileNamesWithExtensions, currentAssignmentsBaseFullTextsPath);
if ( zstdFile == null ) {
// The failed files (including the ".tar"), have already been deleted.
String errorMsg = "Failed to create the zstd file for \"batchCounter\"-" + batchCounter;
// The failed files (including the ".tar" and ".zstd"), have already been deleted.
deleteDirectory(new File(currentAssignmentsBaseFullTextsPath)); // Delete this assignments' directory.
String errorMsg = "Failed to compress the full-text files for batch_" + batchCounter + ", assignments_" + assignmentsCounter;
logger.error(errorMsg);
return ResponseEntity.internalServerError().body(errorMsg);
}

View File

@ -5,6 +5,7 @@ import eu.openaire.urls_worker.components.plugins.PublicationsRetrieverPlugin;
import eu.openaire.urls_worker.util.UriBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@ -22,6 +23,9 @@ public class GeneralController {
private static final Logger logger = LoggerFactory.getLogger(GeneralController.class);
@Autowired
AssignmentsHandler assignmentsHandler;
private final String controllerIp;
private final String workerReportsDirPath;
private final String workerId;
@ -67,8 +71,10 @@ public class GeneralController {
String finalMsg = "";
if ( shouldShutdownWorker )
finalMsg = "The worker has already received a \"shutdownWorker\" request (which was not canceled afterwards). ";
else
else {
shouldShutdownWorker = true;
AssignmentsHandler.shouldNotRequestMore = true; // Make sure the worker shuts-down, in case the user sends the relevant request, while the worker is stuck in a data-request error-loop.
}
finalMsg += "The worker will shutdown, after finishing current work.";
logger.info(initMsg + finalMsg);
@ -85,6 +91,9 @@ public class GeneralController {
return responseEntity;
shouldShutdownWorker = false;
if ( AssignmentsHandler.numHandledAssignmentsBatches < assignmentsHandler.maxAssignmentsBatchesToHandleBeforeShutdown )
AssignmentsHandler.shouldNotRequestMore = false;
String finalMsg = "Any previous \"shutdownWorker\"-request is canceled. The \"maxAssignmentsBatchesToHandleBeforeShutdown\" will still be honored (if it's set).";
logger.info(initMsg + finalMsg);
return ResponseEntity.ok().body(finalMsg + "\n");

View File

@ -0,0 +1,22 @@
package eu.openaire.urls_worker.models;
import java.io.File;
/**
 * Immutable holder which pairs a tar archive file with the number of files that were added to it.
 * Both values are set once, through the constructor, and can only be read afterwards.
 */
public class TarFileResult {

    /** The tar archive file. */
    private final File tarFile;

    /** The number of files which were added to the tar archive. */
    private final int numTarredFiles;

    /**
     * Creates a new result.
     *
     * @param tarFile the tar archive file
     * @param numTarredFiles the number of files which were added to the tar archive
     */
    public TarFileResult(File tarFile, int numTarredFiles) {
        this.tarFile = tarFile;
        this.numTarredFiles = numTarredFiles;
    }

    /**
     * @return the tar archive file, exactly as it was given to the constructor
     */
    public File getTarFile() {
        return tarFile;
    }

    /**
     * @return the number of files which were added to the tar archive
     */
    public int getNumTarredFiles() {
        return numTarredFiles;
    }
}

View File

@ -1,6 +1,7 @@
package eu.openaire.urls_worker.util;
import eu.openaire.urls_worker.controllers.FullTextsController;
import eu.openaire.urls_worker.models.TarFileResult;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;
@ -28,13 +29,16 @@ public class FilesCompressor {
{
// For example: assignments_2_full-texts_4.tar.zstd | where < 4 > refers to the 4th batch of files requested by the Controller.
File tarFile;
int numTarredFiles = 0;
try {
tarFile = getTarArchiveWithFullTexts(filesToCompress, baseDirectory, assignmentsCounter, tarBatchCounter);
TarFileResult tarFileResult = getTarArchiveWithFullTexts(filesToCompress, baseDirectory, assignmentsCounter, tarBatchCounter);
tarFile = tarFileResult.getTarFile();
numTarredFiles = tarFileResult.getNumTarredFiles();
} catch (Exception e) {
logger.error("Exception when creating the tar-file for assignments_" + assignmentsCounter, e);
return null;
} finally {
// Delete the files of this failed batch immediately. These files will not be requested again. The urls leading to these file will be reprocessed in the future.
// Delete the files of this batch immediately. These files will not be requested again. The urls leading to these files may be reprocessed in the future, in case the tar did not make it to the Controller.
for ( String fileName : filesToCompress )
FullTextsController.deleteFile(baseDirectory + fileName);
}
@ -50,17 +54,19 @@ public class FilesCompressor {
ZstdCompressorOutputStream zOut = new ZstdCompressorOutputStream(new BufferedOutputStream(Files.newOutputStream(zStandardFile.toPath())), bufferSize) )
{
int readByte;
while ( (readByte = in.read()) != -1 ) {
while ( (readByte = in.read()) != -1 )
zOut.write(readByte);
}
} catch (Exception e) {
logger.error("Exception when compressing the tar-archive: " + tarFilePath, e);
// The ".zstd" file may have been partially created. It will be deleted, along with this assignments' directory, by the caller.
return null;
} finally {
FullTextsController.deleteFile(tarFilePath);
}
logger.debug("Finished archiving and compressing the full-texts of assignments_" + assignmentsCounter + ", batch_" + tarBatchCounter);
// At this point, the compressed files are the exact files included inside the tar archive, so the possible "missing-files" case will already have arisen, previously.
int totalFiles = filesToCompress.size();
logger.debug("Finished archiving and compressing " + ((numTarredFiles == totalFiles) ? ("all " + totalFiles) : (numTarredFiles + " out of " + totalFiles)) + " full-texts of assignments_" + assignmentsCounter + ", batch_" + tarBatchCounter);
return zStandardFile;
}
@ -68,7 +74,7 @@ public class FilesCompressor {
/**
* This method adds the requested full-text file into a TAR archive, which later will be compressed.
* */
private static File getTarArchiveWithFullTexts(List<String> filesToTar, String baseDir, long assignmentsCounter, int tarBatchCounter) throws Exception
private static TarFileResult getTarArchiveWithFullTexts(List<String> filesToTar, String baseDir, long assignmentsCounter, int tarBatchCounter) throws Exception
{
String tarFileFullPath = baseDir + "assignments_" + assignmentsCounter + "_full-texts_" + tarBatchCounter + ".tar";
// For example: assignments_2_full-texts_4.tar.zstd | where < 4 > refers to the 4th batch of files requested by the Controller.
@ -80,19 +86,18 @@ public class FilesCompressor {
try ( TarArchiveOutputStream taos = new TarArchiveOutputStream(new BufferedOutputStream(Files.newOutputStream(tarFile.toPath()), bufferSize)) )
{
for ( String fileName : filesToTar ) {
for ( String fileName : filesToTar )
if ( addTarEntry(taos, fileName, baseDir) )
numTarredFiles ++;
}
}
if ( numTarredFiles == 0 ) {
if ( numTarredFiles == 0 )
throw new RuntimeException("None of the requested (" + filesToTar.size() + ") could be tarred, for assignments_" + assignmentsCounter + ", batch_" + tarBatchCounter);
} else if ( numTarredFiles != filesToTar.size() )
else if ( numTarredFiles != filesToTar.size() )
logger.warn("The number of \"numTarredFiles\" (" + numTarredFiles + ") is different from the number of files requested to be tarred (" + filesToTar.size() + "), for assignments_" + assignmentsCounter + ", batch_" + tarBatchCounter);
// Still, some files may have been tarred, so we move on. It's up to the Controller to handle such a case.
return tarFile;
return new TarFileResult(tarFile, numTarredFiles);
}