- Fix not acquiring the full workerReport when retrying it, with the scheduler.
- Improve error-handling in the "inspectWorkerReportsAndTakeAction" process. - Code polishing.
This commit is contained in:
parent
e72a4d3d10
commit
1c8f3765ca
|
@ -287,7 +287,8 @@ public class ScheduledTasks {
|
||||||
if ( actionForWorkerReports != ActionForWorkerReports.process_previous_failed )
|
if ( actionForWorkerReports != ActionForWorkerReports.process_previous_failed )
|
||||||
currentTime = System.currentTimeMillis();
|
currentTime = System.currentTimeMillis();
|
||||||
|
|
||||||
int numFilesHandled = 0;
|
int numWorkerReportsToBeHandled = 0;
|
||||||
|
int numWorkerReportsHandled = 0;
|
||||||
|
|
||||||
for ( File workerReportSubDir : workerReportSubDirs )
|
for ( File workerReportSubDir : workerReportSubDirs )
|
||||||
{
|
{
|
||||||
|
@ -306,8 +307,9 @@ public class ScheduledTasks {
|
||||||
|
|
||||||
if ( actionForWorkerReports == ActionForWorkerReports.process_previous_failed ) {
|
if ( actionForWorkerReports == ActionForWorkerReports.process_previous_failed ) {
|
||||||
if ( workerReportName.contains("failed") ) {
|
if ( workerReportName.contains("failed") ) {
|
||||||
processWorkerReport(workerReportFile, workerReportName);
|
numWorkerReportsToBeHandled ++;
|
||||||
numFilesHandled ++;
|
if ( processFailedWorkerReport(workerReportFile, workerReportName) )
|
||||||
|
numWorkerReportsHandled ++;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
long lastModified = workerReportFile.lastModified();
|
long lastModified = workerReportFile.lastModified();
|
||||||
|
@ -320,24 +322,31 @@ public class ScheduledTasks {
|
||||||
if ( actionForWorkerReports == ActionForWorkerReports.process_current_failed ) {
|
if ( actionForWorkerReports == ActionForWorkerReports.process_current_failed ) {
|
||||||
if ( (elapsedDays >= daysToWaitBeforeProcessing) && (elapsedDays <= maxDaysToAllowProcessing)
|
if ( (elapsedDays >= daysToWaitBeforeProcessing) && (elapsedDays <= maxDaysToAllowProcessing)
|
||||||
&& workerReportName.contains("failed") ) {
|
&& workerReportName.contains("failed") ) {
|
||||||
processWorkerReport(workerReportFile, workerReportName);
|
numWorkerReportsToBeHandled ++;
|
||||||
numFilesHandled ++;
|
if ( processFailedWorkerReport(workerReportFile, workerReportName) )
|
||||||
|
numWorkerReportsHandled ++;
|
||||||
}
|
}
|
||||||
} else { // Deletion..
|
} else { // Deletion..
|
||||||
if ( elapsedDays > daysToWaitBeforeDeletion ) {
|
if ( elapsedDays > daysToWaitBeforeDeletion ) {
|
||||||
// Enough time has passed, the directory should be deleted immediately.
|
// Enough time has passed, the directory should be deleted immediately.
|
||||||
logger.warn("The workerReport \"" + workerReportName + "\" was accessed " + elapsedDays + " days ago (passed the " + daysToWaitBeforeDeletion + " days limit) and will be deleted.");
|
logger.warn("The workerReport \"" + workerReportName + "\" was accessed " + elapsedDays + " days ago (passed the " + daysToWaitBeforeDeletion + " days limit) and will be deleted.");
|
||||||
fileUtils.deleteFile(workerReportFile.getAbsolutePath());
|
numWorkerReportsToBeHandled ++;
|
||||||
numFilesHandled ++;
|
if ( fileUtils.deleteFile(workerReportFile.getAbsolutePath()) // Either successful or failed.
|
||||||
if ( workerReportName.contains("failed") ) // (For the successful, they have already been deleted)
|
&& workerReportName.contains("failed") // If this has failed, then delete the assignment-records. For the successful, they have already been deleted.
|
||||||
extractAssignmentsCounterAndDeleteRelatedAssignmentRecords(workerReportName);
|
&& extractAssignmentsCounterAndDeleteRelatedAssignmentRecords(workerReportName) )
|
||||||
|
numWorkerReportsHandled ++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}// end reports loop
|
}// end reports loop
|
||||||
}// end sub-dirs loop
|
}// end sub-dirs loop
|
||||||
|
|
||||||
logger.debug("The action \"" + actionForWorkerReports.toString() + "\" was imposed to " + numFilesHandled + " workerReports.");
|
String initMsg = "The action \"" + actionForWorkerReports.toString() + "\" was imposed to " + numWorkerReportsHandled + " workerReports.";
|
||||||
|
int numWorkerReportsFailedToBeHandled = (numWorkerReportsToBeHandled - numWorkerReportsHandled);
|
||||||
|
if ( numWorkerReportsFailedToBeHandled > 0 )
|
||||||
|
logger.warn(initMsg + " " + numWorkerReportsFailedToBeHandled + " workerReports failed to be handled!");
|
||||||
|
else
|
||||||
|
logger.debug(initMsg);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("", e);
|
logger.error("", e);
|
||||||
}
|
}
|
||||||
|
@ -364,15 +373,21 @@ public class ScheduledTasks {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private void processWorkerReport(File workerReportFile, String workerReportName)
|
private boolean processFailedWorkerReport(File workerReportFile, String workerReportName)
|
||||||
{
|
{
|
||||||
logger.debug("Going to load and parse the workerReport: " + workerReportName);
|
logger.debug("Going to load and parse the workerReport: " + workerReportName);
|
||||||
|
|
||||||
// Load the file's json content into a "WorkerReport" object.
|
// Load the file's json content into a "WorkerReport" object.
|
||||||
try ( BufferedReader bfRead = new BufferedReader(new FileReader(workerReportFile)) ) { // The default size is sufficient here.
|
try ( BufferedReader bfRead = new BufferedReader(new FileReader(workerReportFile)) ) { // The default size is sufficient here.
|
||||||
jsonStringBuilder.append(bfRead.readLine());
|
String line;
|
||||||
|
while ( (line = bfRead.readLine()) != null ) // The line, without any line-termination-characters.
|
||||||
|
jsonStringBuilder.append(line).append("\n");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Problem when acquiring the contents of workerReport \"" + workerReportName + "\"");
|
logger.error("Problem when acquiring the contents of workerReport \"" + workerReportName + "\"");
|
||||||
|
jsonStringBuilder.setLength(0); // Reset the StringBuilder without de-allocating.
|
||||||
|
// This failed report will be retried by the scheduler, inside the allowed time-period.
|
||||||
|
// If it never reached success, then its file and the assignment-records will be deleted by the scheduler in the following days.
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
WorkerReport workerReport = null;
|
WorkerReport workerReport = null;
|
||||||
|
@ -380,17 +395,20 @@ public class ScheduledTasks {
|
||||||
workerReport = gson.fromJson(jsonStringBuilder.toString(), WorkerReport.class);
|
workerReport = gson.fromJson(jsonStringBuilder.toString(), WorkerReport.class);
|
||||||
} catch (JsonSyntaxException jse) {
|
} catch (JsonSyntaxException jse) {
|
||||||
logger.error("Problem when parsing the workerReport \"" + workerReportName + "\": " + jse.getMessage());
|
logger.error("Problem when parsing the workerReport \"" + workerReportName + "\": " + jse.getMessage());
|
||||||
|
// Same handling as mentioned in the above "catch".
|
||||||
|
return false;
|
||||||
|
} finally {
|
||||||
|
jsonStringBuilder.setLength(0); // Reset the StringBuilder without de-allocating.
|
||||||
}
|
}
|
||||||
|
|
||||||
this.urlsController.addWorkerReport(workerReport); // This will check and add the workerReport to the background jobs' scheduler.
|
this.urlsController.addWorkerReport(workerReport); // This will check and add the workerReport to the background jobs' scheduler.
|
||||||
|
return true;
|
||||||
jsonStringBuilder.setLength(0); // Reset the StringBuilder without de-allocating.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static final Pattern ASSIGNMENTS_COUNTER_REPORT_FILTER = Pattern.compile(".*([\\d]+)_report[\\w]*.json$");
|
private static final Pattern ASSIGNMENTS_COUNTER_REPORT_FILTER = Pattern.compile(".*([\\d]+)_report[\\w]*.json$");
|
||||||
|
|
||||||
private void extractAssignmentsCounterAndDeleteRelatedAssignmentRecords(String workerReportName)
|
private boolean extractAssignmentsCounterAndDeleteRelatedAssignmentRecords(String workerReportName)
|
||||||
{
|
{
|
||||||
// We need to delete the records from the "assignment" table, in order for them to be retried in the future.
|
// We need to delete the records from the "assignment" table, in order for them to be retried in the future.
|
||||||
|
|
||||||
|
@ -398,24 +416,25 @@ public class ScheduledTasks {
|
||||||
Matcher matcher = ASSIGNMENTS_COUNTER_REPORT_FILTER.matcher(workerReportName);
|
Matcher matcher = ASSIGNMENTS_COUNTER_REPORT_FILTER.matcher(workerReportName);
|
||||||
if ( ! matcher.matches() ) {
|
if ( ! matcher.matches() ) {
|
||||||
logger.error("Could not match the report \"" + workerReportName + "\" with regex: " + ASSIGNMENTS_COUNTER_REPORT_FILTER);
|
logger.error("Could not match the report \"" + workerReportName + "\" with regex: " + ASSIGNMENTS_COUNTER_REPORT_FILTER);
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
String counterString = matcher.group(1);
|
String counterString = matcher.group(1);
|
||||||
if ( (counterString == null) || counterString.isEmpty() ) {
|
if ( (counterString == null) || counterString.isEmpty() ) {
|
||||||
logger.error("Could not extract the \"assignmentCounter\" from report: " + workerReportName);
|
logger.error("Could not extract the \"assignmentCounter\" from report: " + workerReportName);
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
int curReportAssignmentsCounter;
|
int curReportAssignmentsCounter;
|
||||||
try {
|
try {
|
||||||
curReportAssignmentsCounter = Integer.parseInt(counterString);
|
curReportAssignmentsCounter = Integer.parseInt(counterString);
|
||||||
} catch (NumberFormatException nfe) {
|
} catch (NumberFormatException nfe) {
|
||||||
logger.error("Could not parse the \"curReportAssignmentsCounter\" (" + counterString + ") which was extracted from report: " + workerReportName);
|
logger.error("Could not parse the \"curReportAssignmentsCounter\" (" + counterString + ") which was extracted from report: " + workerReportName);
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
DatabaseConnector.databaseLock.lock();
|
DatabaseConnector.databaseLock.lock();
|
||||||
urlsService.deleteAssignmentsBatch(curReportAssignmentsCounter); // Any error-log is written inside.
|
urlsService.deleteAssignmentsBatch(curReportAssignmentsCounter); // Any error-log is written inside.
|
||||||
DatabaseConnector.databaseLock.unlock();
|
DatabaseConnector.databaseLock.unlock();
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -402,12 +402,11 @@ public class UrlsServiceImpl implements UrlsService {
|
||||||
// If this method exits sooner, due tio an error, then the assignments are not deleted in order to wait for the schedulers to retry them and not be given to workers, to avoid reprocessing the urls.
|
// If this method exits sooner, due tio an error, then the assignments are not deleted in order to wait for the schedulers to retry them and not be given to workers, to avoid reprocessing the urls.
|
||||||
DatabaseConnector.databaseLock.lock();
|
DatabaseConnector.databaseLock.lock();
|
||||||
String deleteErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter);
|
String deleteErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter);
|
||||||
|
DatabaseConnector.databaseLock.unlock();
|
||||||
if ( deleteErrorMsg != null ) {
|
if ( deleteErrorMsg != null ) {
|
||||||
DatabaseConnector.databaseLock.unlock();
|
|
||||||
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, deleteErrorMsg);
|
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, deleteErrorMsg);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
DatabaseConnector.databaseLock.unlock();
|
|
||||||
|
|
||||||
// For every "numOfWorkers" assignment-batches that go to workers, we merge the tables, once a workerReport comes in.
|
// For every "numOfWorkers" assignment-batches that go to workers, we merge the tables, once a workerReport comes in.
|
||||||
// After the first few increases of "assignmentsBatchCounter" until all workers get assignment-batches,
|
// After the first few increases of "assignmentsBatchCounter" until all workers get assignment-batches,
|
||||||
|
@ -471,7 +470,7 @@ public class UrlsServiceImpl implements UrlsService {
|
||||||
{
|
{
|
||||||
logger.debug("Going to merge the parquet files for the tables which were altered.");
|
logger.debug("Going to merge the parquet files for the tables which were altered.");
|
||||||
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
|
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
|
||||||
// This means that over time a table may have thousand of parquet files and the search through them will be very slow. Thus, we merge them every now and then.
|
// This means that over time a table may have thousands of parquet files and the search through them will be very slow. Thus, we merge them every now and then.
|
||||||
|
|
||||||
String mergeErrorMsg;
|
String mergeErrorMsg;
|
||||||
|
|
||||||
|
|
|
@ -183,7 +183,7 @@ public class FileUtils {
|
||||||
|
|
||||||
public static final DecimalFormat df = new DecimalFormat("0.00");
|
public static final DecimalFormat df = new DecimalFormat("0.00");
|
||||||
|
|
||||||
// The following regex might be usefull in a future scenario. It extracts the "plain-filename" and "file-ID" and the "file-extension".
|
// The following regex might be useful in a future scenario. It extracts the "plain-filename" and "file-ID" and the "file-extension".
|
||||||
// Possible full-filenames are: "path1/path2/ID.pdf", "ID2.pdf", "path1/path2/ID(12).pdf", "ID2(25).pdf"
|
// Possible full-filenames are: "path1/path2/ID.pdf", "ID2.pdf", "path1/path2/ID(12).pdf", "ID2(25).pdf"
|
||||||
public static final Pattern FILENAME_ID_EXTENSION = Pattern.compile("(?:([^.()]+)/)?((([^/()]+)[^./]*)(\\.[\\w]{2,10}))$");
|
public static final Pattern FILENAME_ID_EXTENSION = Pattern.compile("(?:([^.()]+)/)?((([^/()]+)[^./]*)(\\.[\\w]{2,10}))$");
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue