UrlsWorker/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java

221 lines
12 KiB
Java

package eu.openaire.urls_worker.components;
import eu.openaire.urls_worker.UrlsWorkerApplication;
import eu.openaire.urls_worker.controllers.FullTextsController;
import eu.openaire.urls_worker.controllers.GeneralController;
import eu.openaire.urls_worker.services.FileStorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.File;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Component
public class ScheduledTasks {
// Logger shared by all the scheduled tasks of this class.
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
// Its "handleAssignments()" method is called by "handleNewAssignments()", once all the pre-checks have passed.
@Autowired
AssignmentsHandler assignmentsHandler;
// Provides the "assignmentsBaseLocation", which is the base directory whose sub-directories are inspected by the shutdown and cleanup tasks.
@Autowired
private FileStorageService fileStorageService;
// Used to post the shutdown-report to the Controller, right before the Worker shuts down.
@Autowired
private ConnWithController connWithController;
// The id of this Worker (property "info.workerId"). It is sent with the shutdown-report and it is part of the workerReport file-names.
@Value("${info.workerId}")
private String workerId;
// The directory of the workerReport files (property "workerReportsDirPath").
// NOTE(review): it is concatenated directly with the report's file-name, so it presumably ends with a file-separator; confirm against the configuration.
@Value("${workerReportsDirPath}")
private String workerReportsDirPath;
// The file-system root, whose free space is checked before new assignments are requested.
private static final File rootPath = new File("/");
private static final long oneAndHalfGB = 1_610_612_736L; // 1.5 GB (1.5 * 1024^3 bytes) of free space per 1.000-assignments-batch.
// The minimum free space (in bytes) which must be available in order to request new assignments.
// It is a static field, but its value is assigned by the constructor.
private static long requiredFreeSpace;
public ScheduledTasks(@Value("${info.maxAssignmentsLimitPerBatch}") int maxAssignmentsLimitPerBatch)
{
if ( maxAssignmentsLimitPerBatch < 1_000 )
requiredFreeSpace = oneAndHalfGB;
else
requiredFreeSpace = oneAndHalfGB * (maxAssignmentsLimitPerBatch / 1_000);
logger.info("The \"requiredFreeSpace\" for the app to request new assignments, having \"maxAssignmentsLimitPerBatch\" equal to " + maxAssignmentsLimitPerBatch + " , is: " + (requiredFreeSpace / (1024 * 1024)) + " Mb");
}
private static final long STALL_MILLIS = 900_000L; // 15 minutes.

/**
 * Requests and handles the next assignments-batch, right after the previous run has finished.
 * <p>
 * No assignments are requested in this run when:
 * <ul>
 *   <li>the "AssignmentsHandler.shouldNotRequestMore" flag is set,</li>
 *   <li>the free space of the root file-system is below the "requiredFreeSpace" (the thread then sleeps for 15 minutes before returning),</li>
 *   <li>a shutdown was requested while the Worker is in the free-space loop or in the connection-error loop,</li>
 *   <li>the thread gets interrupted while it is sleeping.</li>
 * </ul>
 * After a connection error on the previous request, the thread sleeps for 15 minutes before requesting again.
 */
@Scheduled(fixedDelay = 1) // Request the next batch immediately after the last one finishes.
public void handleNewAssignments() {
    if ( AssignmentsHandler.shouldNotRequestMore ) {
        // Here we will be right after the Worker has posted its last report. The Controller will not have processed it and will not have requested the full-text files yet.
        // So, do not shut down from here. The shutdown is performed by the other scheduled tasks of this class, once it is safe to do so.
        return;
    }

    if ( rootPath.getFreeSpace() < requiredFreeSpace ) {
        // It's not safe to proceed with downloading more files and risk a "noSpaceLeft" error.
        // Wait for the Controller to take the full-texts and any remaining files to be deleted, so that more free-space becomes available.
        // We need to have some buffer zone for the ".tar" files which will be created from the already downloaded full-texts, when the Controller starts requesting them.
        if ( GeneralController.shouldShutdownWorker ) { // Make sure the worker shuts-down, in case the user sends the relevant request, while the worker is stuck in a free-space check loop.
            AssignmentsHandler.shouldNotRequestMore = true;
            return;
        }
        logger.warn("The free space is running out (less than " + (requiredFreeSpace / (1024 * 1024)) + " Mb). Will avoid to get new assignments for the next 15 minutes.");
        stallScheduler(STALL_MILLIS); // Give time to the disk-space to be freed.
        return; // Cause this method to be called again, so that the free-space can be checked again before proceeding with new assignments.
    }

    if ( AssignmentsHandler.hadConnectionErrorOnRequest ) {
        if ( GeneralController.shouldShutdownWorker ) { // Make sure the worker shuts-down, in case the user sends the relevant request, while the worker is stuck in a data-request error-loop.
            AssignmentsHandler.shouldNotRequestMore = true;
            return;
        }
        boolean sleptWholeTime;
        try {
            sleptWholeTime = stallScheduler(STALL_MILLIS); // Give time to the Controller to recover.
        } finally {
            AssignmentsHandler.hadConnectionErrorOnRequest = false;
        }
        if ( !sleptWholeTime )
            return; // The thread was interrupted (its interrupt-flag is set again), so do not start a new batch with a pending interruption.
    }

    assignmentsHandler.handleAssignments();
}

/**
 * Blocks the current thread for the given time, in order to stall the scheduler from retrying right away.
 *
 * @param millis the time to sleep, in milliseconds.
 * @return "true" if the whole time has elapsed, "false" if the sleep was interrupted. In the latter case, the interrupt-flag of the thread is restored.
 */
private static boolean stallScheduler(long millis) {
    try {
        Thread.sleep(millis);
        return true;
    } catch (InterruptedException ie) {
        logger.warn("Sleeping was interrupted!");
        Thread.currentThread().interrupt(); // Do not swallow the interruption, let the code up the stack know about it.
        return false;
    }
}
/**
 * Shuts down the Worker, if a shutdown is pending and no full-texts sub-directories are left inside the base directory.
 * A shutdown is pending when either "GeneralController.shouldShutdownWorker" or "AssignmentsHandler.shouldNotRequestMore" is set.
 * Before shutting down, a shutdown-report is posted to the Controller.
 */
@Scheduled(initialDelay = 900_000, fixedDelay = 1_800_000) // InitialDelay = 15 mins, FixedDelay = 30 mins.
//@Scheduled(initialDelay = 60_000, fixedDelay = 60_000) // Just for testing (every 60 secs).
public void checkIfShouldShutdown()
{
    boolean shutdownIsPending = GeneralController.shouldShutdownWorker || AssignmentsHandler.shouldNotRequestMore;
    if ( !shutdownIsPending )
        return;

    // Check if the full-texts have been delivered to the Controller.
    // In case some sub-directories have been left behind, do NOT shut down yet. They will be removed either by the Controller taking the files or by the cleanup task, after some time.
    File baseDir = new File(fileStorageService.assignmentsBaseLocation);
    if ( !baseDir.isDirectory() ) {
        // This base-directory should exist during run-time, but we can proceed with shutting down the Service.
        logger.warn("The base full-texts directory was not found! Shutting down..");
    } else {
        File[] remainingSubDirs = baseDir.listFiles(File::isDirectory);
        if ( remainingSubDirs == null ) {
            logger.error("There was an error when getting the subDirs of \"fullTextsBaseDir\": " + baseDir);
            return; // It's NOT safe to shut down.
        }
        if ( remainingSubDirs.length > 0 ) {
            // Some subDirs may be left behind due to some error when processing the WorkerReport. In that case, this task does not shut down the Worker.
            logger.warn("The base full-texts directory still has sub-directories with full-texts, wait for the Controller to take all the files, or wait some time to past before they are deleted. Then the Worker will shut down.");
            return;
        }
        logger.debug("The \"fullTextsBaseDir\" is empty. Shutting down..");
    }

    connWithController.postShutdownReportToController(workerId);
    UrlsWorkerApplication.gentleAppShutdown();
}
// Captures the assignments-counter (the digits following "assignments_") from the name of a full-texts sub-directory.
private static final Pattern ASSIGNMENTS_COUNTER = Pattern.compile(".*assignments_([\\d]+).*");

// A sub-directory which was last modified more than this number of hours ago, gets deleted.
private static final double hoursToWaitBeforeDeletion = 36.0;

/**
 * Deletes the full-texts sub-directories which were last modified more than "hoursToWaitBeforeDeletion" hours ago, along with their workerReport files.
 * Afterwards, if a shutdown is pending and no usable sub-directory is left, it posts the shutdown-report to the Controller and shuts down the Worker.
 * <p>
 * Background: the Worker-report connection with the Controller may fail for any reason, while the Controller continues requesting the full-text batches.
 * Even though the full-texts are deleted batch-by-batch, some files may be left behind, such as the duplicates which were never requested by the Controller
 * and the ".tar.zstd" file of the last batch. These leftovers, as well as the sub-directory itself, are removed here.
 */
@Scheduled(initialDelay = 21_600_000, fixedDelay = 21_600_000) // InitialDelay & FixedDelay = 6 hours.
//@Scheduled(initialDelay = 120_000, fixedDelay = 120_000) // Just for testing (every 2 mins).
public void checkAndDeleteOldFiles() {
    logger.debug("Going to check if any leftover full-texts exist and delete them.");

    int usableDirsNum = 0;
    try {
        File baseDir = new File(fileStorageService.assignmentsBaseLocation);
        if ( !baseDir.isDirectory() ) {
            // This base dir should always exist during execution!
            logger.error("The \"fullTextsBaseDir\" (" + fileStorageService.assignmentsBaseLocation + ") does not exist!");
            return;
        }

        File[] subDirs = baseDir.listFiles(File::isDirectory);
        if ( subDirs == null ) {
            logger.error("There was an error when getting the subDirs of \"fullTextsBaseDir\": " + baseDir);
            return;
        }

        usableDirsNum = subDirs.length;
        if ( usableDirsNum == 0 ) {
            logger.debug("The \"fullTextsBaseDir\" is empty, so there is nothing to delete.");
            return;
        }

        final double millisPerHour = 1000 * 60 * 60;
        long now = System.currentTimeMillis();

        // Go through the sub-directories and delete the ones which have exceeded the time-limit.
        for ( File subDir : subDirs ) {
            long lastModified = subDir.lastModified();
            if ( logger.isTraceEnabled() )
                logger.trace("The subDir \"" + subDir.getName() + "\" was last accessed in: " + new Date(lastModified));

            double elapsedHours = (double) (now - lastModified) / millisPerHour;
            if ( elapsedHours <= hoursToWaitBeforeDeletion )
                continue; // Not enough time has passed, keep this directory.

            String subDirName = subDir.getName();
            logger.warn("The subDir \"" + subDirName + "\" was accessed " + elapsedHours + " hours ago (passed the " + hoursToWaitBeforeDeletion + " hours limit) and will be deleted, along with the related WorkerReport.");
            FullTextsController.deleteDirectory(subDir);
            deleteRelatedWorkerReport(subDirName);
            usableDirsNum --; // Reduce the usableDirsNum even if some directories failed to be deleted, since the failed-dirs are not usable anyway.
        }
    } catch (Exception e) {
        logger.error("", e);
        return;
    }

    // After the cleanup of the remaining files, make sure we shutdown the Worker if it is desired.
    // Do this here, instead of waiting further, for the "checkIfShouldShutdown()" method to be called and shut it down.
    boolean shutdownIsPending = GeneralController.shouldShutdownWorker || AssignmentsHandler.shouldNotRequestMore;
    if ( shutdownIsPending && (usableDirsNum == 0) ) { // Shutdown only if there are no "usable" directories left.
        connWithController.postShutdownReportToController(workerId);
        UrlsWorkerApplication.gentleAppShutdown();
    }
}

/**
 * Deletes the workerReport file which corresponds to the given full-texts sub-directory.
 * The assignments-counter is extracted from the sub-directory's name, in order to build the path of the report file.
 *
 * @param subDirName the name of the full-texts sub-directory, which is expected to contain "assignments_" followed by the counter.
 */
private void deleteRelatedWorkerReport(String subDirName) {
    Matcher counterMatcher = ASSIGNMENTS_COUNTER.matcher(subDirName);
    if ( !counterMatcher.matches() ) {
        logger.error("The subDir \"" + subDirName + "\" has an invalid name! It could not be matched with regex: " + ASSIGNMENTS_COUNTER);
        return;
    }

    String assignmentsCounter = counterMatcher.group(1);
    if ( (assignmentsCounter == null) || assignmentsCounter.isEmpty() ) {
        logger.error("The subDir \"" + subDirName + "\" has an invalid name! It does not contains the assignmentsCounter!");
        return;
    }

    String reportFilePath = this.workerReportsDirPath + this.workerId + "_assignments_" + assignmentsCounter + "_report.json";
    if ( FullTextsController.deleteFile(reportFilePath) )
        logger.warn("The subDir \"" + subDirName + "\" probably contains some failed file, since the workerReport for assignments_" + assignmentsCounter + " was deleted only now, which means the Controller failed to successfully process the results of those assignments.");
}
}