Upgrade management of failed workerReports:

- Upon completing processing a workerReport, the name of the json-file will be appended with "successful" or "failed".
- Avoid deleting immediately the failed workerReports.
- Add a scheduling task to process leftover failed workerReports from previous executions of the service, only once, 12 hours after startup, in order for the workers to have participated and filled the "workersInfoMap".
- Add a scheduling task to process leftover failed workerReports from the current execution, regularly.
- Fix not iterating through the workers' subDirs when checking the last-access-time of workerReports.
- Fix not deleting the assignment records from the DB, when a failed leftover workerReport gets deleted.
- Code refactoring.
This commit is contained in:
Lampros Smyrnaios 2023-09-01 15:10:58 +03:00
parent 5c459a3a16
commit febe2b212c
3 changed files with 258 additions and 48 deletions

View File

@ -1,9 +1,14 @@
package eu.openaire.urls_controller.components;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import eu.openaire.urls_controller.Application;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.controllers.ShutdownController;
import eu.openaire.urls_controller.controllers.StatsController;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.services.UrlsServiceImpl;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import io.micrometer.core.instrument.MeterRegistry;
@ -15,7 +20,9 @@ import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@ -25,6 +32,8 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Component
@ -35,8 +44,15 @@ public class ScheduledTasks {
@Autowired
FileUtils fileUtils;
private StatsController statsController;
@Autowired
UrlsServiceImpl urlsService;
private final StatsController statsController;
private final UrlsController urlsController;
@Value("${services.pdfaggregation.controller.assignmentLimit}")
private int assignmentsLimit;
private final String workerReportsDirPath;
@ -49,13 +65,15 @@ public class ScheduledTasks {
public static final AtomicInteger numOfRecordsInspectedByServiceThroughCrawling = new AtomicInteger(0);
public ScheduledTasks(@Value("${services.pdfaggregation.controller.workerReportsDirPath}") String workerReportsDirPath, StatsController statsController, MeterRegistry registry)
public ScheduledTasks(@Value("${services.pdfaggregation.controller.workerReportsDirPath}") String workerReportsDirPath, StatsController statsController, UrlsController urlsController, MeterRegistry registry)
{
if ( !workerReportsDirPath.endsWith("/") )
workerReportsDirPath += "/";
this.workerReportsDirPath = workerReportsDirPath; // This dir will be created later.
this.statsController = statsController;
this.urlsController = urlsController;
jsonStringBuilder = new StringBuilder(assignmentsLimit * 500);
registry.gauge("numOfAllPayloads", numOfAllPayloads);
registry.gauge("numOfPayloadsAggregatedByServiceThroughCrawling", numOfPayloadsAggregatedByServiceThroughCrawling);
@ -128,54 +146,64 @@ public class ScheduledTasks {
// If one worker has crashed, then it will have not informed the Controller. So the controller will think that it is still running and will not shut down.
// Any left-over worker-reports are kept to be retried next time the Controller starts.
Application.gentleAppShutdown();
}
private static final double daysToWaitBeforeDeletion = 7.0;
@Value("${services.pdfaggregation.controller.isTestEnvironment}")
private boolean isTestEnvironment;
@Scheduled(initialDelay = 43_200_000, fixedDelay = Long.MAX_VALUE) // Run 12 hours after startup (in order for all workers to be back online) and then never again.
//@Scheduled(initialDelay = 420_000, fixedDelay = Long.MAX_VALUE) // Just for testing (7 mins after startup, only once).
public void checkAndProcessOldAndUnsuccessfulWorkerReports()
{
// We make sure an initial delay of some minutes is in place before this is executed, since we have to make sure all workers are up and running in order for them to be able to answer the full-texts-requests.
if ( UrlsController.numOfWorkers.get() == 0 ) {
long timeToWait = (isTestEnvironment ? 1_200_000 : 43_200_000); // 10 mins | 12 hours
logger.warn("None of the workers have participated in the service yet. Will wait " + ((timeToWait /1000) /60) + " minutes and try again..");
try {
Thread.sleep(timeToWait);
} catch (InterruptedException ie) {
logger.warn("The wait-period was interrupted! Will try either way.");
}
if ( UrlsController.numOfWorkers.get() == 0 ) {
logger.error("None of the workers have participated in the service yet again. Will not process any leftover workerReports!");
return;
}
}
// In case the Controller processes older, failed workerReports, it may be the case that the worker to which the report comes from, is not currently online.
// So its IP won't be stored in the map. Even if we have its IP in the report itself, it's of no use if the worker is offline, since we cannot get the full-texts.
// We do not care for attempts, if the payloads are not there.
inspectWorkerReportsAndTakeAction(ActionForWorkerReports.process_previous_failed);
// In case a worker-report among those "left-overs" fails again, then it will just be removed by the scheduler, when 7 days pass, and it's still there.
}
@Scheduled(initialDelay = 86_400_000, fixedDelay = 86_400_000) // Run after one day, every day.
//@Scheduled(initialDelay = 1_200_000, fixedDelay = 1_200_000) // Just for testing (every 1200 secs).
public void checkAndProcessRecentUnsuccessfulWorkerReports()
{
// After some hours from their failure and before the worker delete the full-texts as obsolete, we re-try these workerReports,
// hopping that any previous error was temporary.
inspectWorkerReportsAndTakeAction(ActionForWorkerReports.process_current_failed);
}
@Scheduled(initialDelay = 604_800_000, fixedDelay = 604_800_000) // Run every 7 days.
//@Scheduled(initialDelay = 1_200_000, fixedDelay = 1_200_000) // Just for testing (every 1200 secs).
public void checkAndDeleteUnsuccessfulWorkerReports()
public void checkAndDeleteOldWorkerReports()
{
logger.debug("Going to check and remove any unsuccessful workerReports, which are more than 7 days old.");
try {
File workerReportsDir = new File(workerReportsDirPath);
if ( !workerReportsDir.isDirectory() ) {
logger.error("The \"workerReportsDir\" (" + workerReportsDirPath + ") does not exist!"); // This base dir should always exist!
return;
}
// The failed workerReports are kept for 7 days, for manual debugging purposes. Even if the service does not make more than 2 attempts to import them.
File[] workerReports = workerReportsDir.listFiles(File::isFile);
if ( workerReports == null ) {
logger.error("There was an error when getting the subDirs of \"workerReportsDir\": " + workerReportsDir);
return;
} else if ( workerReports.length == 0 ) {
logger.debug("The \"workerReportsDir\" is empty, so there is nothing to delete.");
return;
}
long currentTime = System.currentTimeMillis();
for ( File workerReport : workerReports ) {
long lastModified = workerReport.lastModified();
if ( logger.isTraceEnabled() )
logger.trace("The workerReport \"" + workerReport.getName() + "\" was last accessed in: " + new Date(lastModified));
// Get the difference in hours. /1000 to get seconds, /60 to get minutes, /60 to get hours and /24 to get days.
double elapsedDays = (double) (currentTime - lastModified) / (1000 * 60 * 60 * 24);
if ( elapsedDays > daysToWaitBeforeDeletion ) {
// Enough time has passed, the directory should be deleted immediately.
String workerReportName = workerReport.getName();
logger.warn("The workerReport \"" + workerReportName + "\" was accessed " + elapsedDays + " days ago (passed the " + daysToWaitBeforeDeletion + " days limit) and will be deleted.");
fileUtils.deleteFile(workerReport.getAbsolutePath());
}
}
} catch (Exception e) {
logger.error("", e);
}
inspectWorkerReportsAndTakeAction(ActionForWorkerReports.delete_old);
}
@ -222,4 +250,172 @@ public class ScheduledTasks {
// <numOfAggregatedPayloadsPerDatasource>, ..., <numOfBulkImportedPayloadsPerDatasource>, ...
}
enum ActionForWorkerReports {process_previous_failed, process_current_failed, delete_old}
// TODO - Maybe make these numbers configurable from the "application.yml" file.
private static final double daysToWaitBeforeDeletion = 7.0;
private static final double daysToWaitBeforeProcessing = 0.5; // 12 hours
private static final double maxDaysToAllowProcessing = 1.9; // 45.6 hours
// The Workers wait at most 48 hours before deleting the full-text files. So there is no point to try and process the report after that time-frame.
// These reports will have to wait a bit for the scheduler to assign them to threads, before actually being processed.
private static final Gson gson = new Gson(); // This is "transient" by default.
private static StringBuilder jsonStringBuilder = null;
private static final int daysDivisor = (1000 * 60 * 60 * 24); // In order to get the time-difference in days. We divide with: /1000 to get seconds, /60 to get minutes, /60 to get hours and /24 to get days.
private void inspectWorkerReportsAndTakeAction(ActionForWorkerReports actionForWorkerReports)
{
if ( actionForWorkerReports == ActionForWorkerReports.process_previous_failed )
logger.debug("Going to check and process any unsuccessful workerReports from the previous run.");
else if ( actionForWorkerReports == ActionForWorkerReports.process_current_failed )
logger.debug("Going to check and process any unsuccessful workerReports which are between " + daysToWaitBeforeProcessing + " and " + daysToWaitBeforeDeletion + " days old (inclusive).");
else
logger.debug("Going to check and remove any leftover workerReports, which are more than " + daysToWaitBeforeDeletion + " days old.");
// The failed workerReports are kept for 7 days, for manual debugging purposes. Even if the service does not make more than 2 attempts to import them.
try {
File[] workerReportSubDirs = getWorkerReportSubDirs();
if ( workerReportSubDirs == null )
return;
long currentTime = 0L;
if ( actionForWorkerReports != ActionForWorkerReports.process_previous_failed )
currentTime = System.currentTimeMillis();
int numFilesHandled = 0;
for ( File workerReportSubDir : workerReportSubDirs )
{
File[] workerReportFiles = workerReportSubDir.listFiles(File::isFile);
if (workerReportFiles == null) {
logger.error("There was an error when getting the workerReports of \"workerReportSubDir\": " + workerReportSubDir);
return;
} else if (workerReportFiles.length == 0) {
logger.debug("The \"workerReportsDir\" is empty, so there is nothing to take action on.");
return;
}
for ( File workerReportFile : workerReportFiles )
{
String workerReportName = workerReportFile.getName();
if ( actionForWorkerReports == ActionForWorkerReports.process_previous_failed ) {
if ( workerReportName.contains("failed") ) {
processWorkerReport(workerReportFile, workerReportName);
numFilesHandled ++;
}
} else {
long lastModified = workerReportFile.lastModified();
if ( logger.isTraceEnabled() )
logger.trace("The workerReport \"" + workerReportName + "\" was last accessed in: " + new Date(lastModified));
double elapsedDays = (double) (currentTime - lastModified) / daysDivisor;
if ( actionForWorkerReports == ActionForWorkerReports.process_current_failed ) {
if ( (elapsedDays >= daysToWaitBeforeProcessing) && (elapsedDays <= maxDaysToAllowProcessing)
&& workerReportName.contains("failed") ) {
processWorkerReport(workerReportFile, workerReportName);
numFilesHandled ++;
}
} else { // Deletion..
if ( elapsedDays > daysToWaitBeforeDeletion ) {
// Enough time has passed, the directory should be deleted immediately.
logger.warn("The workerReport \"" + workerReportName + "\" was accessed " + elapsedDays + " days ago (passed the " + daysToWaitBeforeDeletion + " days limit) and will be deleted.");
fileUtils.deleteFile(workerReportFile.getAbsolutePath());
numFilesHandled ++;
if ( workerReportName.contains("failed") ) // (For the successful, they have already been deleted)
extractAssignmentsCounterAndDeleteRelatedAssignmentRecords(workerReportName);
}
}
}
}// end reports loop
}// end sub-dirs loop
logger.debug("The action \"" + actionForWorkerReports.toString() + "\" was imposed to " + numFilesHandled + " workerReports.");
} catch (Exception e) {
logger.error("", e);
}
}
private File[] getWorkerReportSubDirs() throws Exception
{
File workerReportsDir = new File(workerReportsDirPath);
if ( !workerReportsDir.isDirectory() ) {
logger.error("The \"workerReportsDir\" (" + workerReportsDirPath + ") does not exist!"); // This base dir should always exist!
return null;
}
// The worker reports are inside "worker_X" sub-dirs.
File[] workerReportSubDirs = workerReportsDir.listFiles(File::isDirectory);
if ( workerReportSubDirs == null ) {
logger.error("There was an error when getting the subDirs of \"workerReportsDir\": " + workerReportsDir);
return null;
} else if ( workerReportSubDirs.length == 0 ) { // The worker_X sub-dirs do not normally get deleted. So this is a warning that either no workers are running (wasting time) or that something bad happened to thios directories.
logger.warn("The \"workerReportsDir\" is empty. None of the workers returned a workerReport, so there is nothing to process.");
return null;
} else
return workerReportSubDirs;
}
private void processWorkerReport(File workerReportFile, String workerReportName)
{
logger.debug("Going to load and parse the workerReport: " + workerReportName);
// Load the file's json content into a "WorkerReport" object.
try ( BufferedReader bfRead = new BufferedReader(new FileReader(workerReportFile)) ) { // The default size is sufficient here.
jsonStringBuilder.append(bfRead.readLine());
} catch (Exception e) {
logger.error("Problem when acquiring the contents of workerReport \"" + workerReportName + "\"");
}
WorkerReport workerReport = null;
try {
workerReport = gson.fromJson(jsonStringBuilder.toString(), WorkerReport.class);
} catch (JsonSyntaxException jse) {
logger.error("Problem when parsing the workerReport \"" + workerReportName + "\": " + jse.getMessage());
}
this.urlsController.addWorkerReport(workerReport); // This will check and add the workerReport to the background jobs' scheduler.
jsonStringBuilder.setLength(0); // Reset the StringBuilder without de-allocating.
}
private static final Pattern ASSIGNMENTS_COUNTER_REPORT_FILTER = Pattern.compile(".*([\\d]+)_report[\\w]*.json$");
private void extractAssignmentsCounterAndDeleteRelatedAssignmentRecords(String workerReportName)
{
// We need to delete the records from the "assignment" table, in order for them to be retried in the future.
// Take the counter from workerReportName.
Matcher matcher = ASSIGNMENTS_COUNTER_REPORT_FILTER.matcher(workerReportName);
if ( ! matcher.matches() ) {
logger.error("Could not match the report \"" + workerReportName + "\" with regex: " + ASSIGNMENTS_COUNTER_REPORT_FILTER);
return;
}
String counterString = matcher.group(1);
if ( (counterString == null) || counterString.isEmpty() ) {
logger.error("Could not extract the \"assignmentCounter\" from report: " + workerReportName);
return;
}
int curReportAssignmentsCounter;
try {
curReportAssignmentsCounter = Integer.parseInt(counterString);
} catch (NumberFormatException nfe) {
logger.error("Could not parse the \"curReportAssignmentsCounter\" (" + counterString + ") which was extracted from report: " + workerReportName);
return;
}
DatabaseConnector.databaseLock.lock();
urlsService.deleteAssignmentsBatch(curReportAssignmentsCounter); // Any error-log is written inside.
DatabaseConnector.databaseLock.unlock();
}
}

View File

@ -506,7 +506,7 @@ public class UrlsServiceImpl implements UrlsService {
}
private String deleteAssignmentsBatch(long givenAssignmentsBatchCounter)
public String deleteAssignmentsBatch(long givenAssignmentsBatchCounter)
{
// This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
@ -519,13 +519,28 @@ public class UrlsServiceImpl implements UrlsService {
private boolean postReportResultToWorker(String workerId, long assignmentRequestCounter, String errorMsg)
{
// Rename the worker-report to indicate success or failure.
String workerReportBaseName = this.workerReportsDirPath + File.separator + workerId + File.separator + workerId + "_assignments_" + assignmentRequestCounter + "_report";
File workerReport = new File(workerReportBaseName + ".json");
File renamedWorkerReport = new File(workerReportBaseName + ((errorMsg == null) ? "_successful.json" : "_failed.json"));
boolean wasWorkerReportRenamed = true;
try {
if ( !workerReport.renameTo(renamedWorkerReport) ) {
logger.warn("There was a problem when renaming the workerReport: " + workerReport.getName());
wasWorkerReportRenamed = false;
}
} catch (Exception e) {
logger.error("There was a problem when renaming the workerReport: " + workerReport.getName(), e);
wasWorkerReportRenamed = false;
}
// Get the IP of this worker.
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
if ( workerInfo == null ) {
logger.error("Could not find any info for worker with id: \"" + workerId +"\".");
return false;
}
String url = "http://" + workerInfo.getWorkerIP() + ":1881/api/addReportResultToWorker/" + assignmentRequestCounter; // This workerIP will not be null.
String url = "http://" + workerInfo.getWorkerIP() + ":1881/api/addReportResultToWorker/" + assignmentRequestCounter; // This workerIP will NOT be null.
if ( logger.isTraceEnabled() )
logger.trace("Going to \"postReportResultToWorker\": \"" + workerId + "\", for assignments_" + assignmentRequestCounter + ((errorMsg != null) ? "\nError: " + errorMsg : ""));
@ -533,13 +548,12 @@ public class UrlsServiceImpl implements UrlsService {
try {
ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, errorMsg, String.class); // We may pass a "null" entity.
int responseCode = responseEntity.getStatusCodeValue();
if ( responseCode != HttpStatus.OK.value() ) {
if ( responseCode != 200 ) {
logger.error("HTTP-Connection problem with the submission of the \"postReportResultToWorker\" of worker \"" + workerId + "\" and assignments_" + assignmentRequestCounter + "! Error-code was: " + responseCode);
return false;
} else {
fileUtils.deleteFile(workerReportsDirPath + "/" + workerId + "/" + workerId + "_assignments_" + assignmentRequestCounter + "_report.json");
return true;
}
} else if ( errorMsg == null ) // If the worker was notified, then go delete the successful workerReport.
fileUtils.deleteFile(wasWorkerReportRenamed ? renamedWorkerReport.getAbsolutePath() : workerReport.getAbsolutePath());
return true;
} catch (HttpServerErrorException hsee) {
logger.error("The Worker \"" + workerId + "\" failed to handle the \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage());
return false;

View File

@ -707,7 +707,7 @@ public class FileUtils {
public String writeToFile(String fileFullPath, String stringToWrite, boolean shouldLockThreads)
{
if ( shouldLockThreads )
if ( shouldLockThreads ) // In case multiple threads write to the same file. for ex. during the bulk-import procedure.
fileWriteLock.lock();
try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), FileUtils.tenMb) )