- Improve error-handling in "S3ObjectStore.emptyBucket()".

- Change some log-levels.
- Code polishing.
Lampros Smyrnaios 2024-03-11 16:17:32 +02:00
parent 8f18008001
commit 1048463ca0
7 changed files with 38 additions and 35 deletions

ScheduledTasks.java

@@ -204,7 +204,7 @@ public class ScheduledTasks {
{
// We make sure an initial delay of some minutes is in place before this is executed, since we have to make sure all workers are up and running in order for them to be able to answer the full-texts-requests.
- if ( UrlsController.numOfWorkers.get() == 0 ) {
+ if ( UrlsController.numOfActiveWorkers.get() == 0 ) {
long timeToWait = (isTestEnvironment ? 1_200_000 : 43_200_000); // 20 mins | 12 hours
logger.warn("None of the workers is participating in the service, at the moment. Will wait " + ((timeToWait /1000) /60) + " minutes and try again..");
try {
@@ -212,8 +212,8 @@ public class ScheduledTasks {
} catch (InterruptedException ie) {
logger.warn("The wait-period was interrupted! Will try either way.");
}
- if ( UrlsController.numOfWorkers.get() == 0 ) {
- logger.error("None of the workers have participated in the service yet again. Will not process any leftover workerReports!");
+ if ( UrlsController.numOfActiveWorkers.get() == 0 ) {
+ logger.error("None of the workers is participating in the service, yet again. Will not process any leftover workerReports!");
return;
}
}
@@ -249,7 +249,6 @@ public class ScheduledTasks {
}
@Scheduled(initialDelay = 259_200_000, fixedDelay = 259_200_000) // Run every 3 days. 3 days after startup.
//@Scheduled(initialDelay = 1_200_000, fixedDelay = 1_200_000) // Just for testing (every 1200 secs).
public void checkAndDeleteUnhandledAssignments()
@@ -261,7 +260,7 @@ public class ScheduledTasks {
// The assignments just remain in the table, and the urls cannot be rechecked.
Calendar calendar = Calendar.getInstance();
- calendar.add(Calendar.DAY_OF_MONTH, - 3); // Subtract 3 from current Date.
+ calendar.add(Calendar.DAY_OF_MONTH, -3); // Subtract 3 days from the current date.
DatabaseConnector.databaseLock.lock();
urlsService.deleteAssignmentsWithOlderDate(calendar.getTimeInMillis()); // Any error-log is written inside.

ShutdownController.java

@@ -153,7 +153,7 @@ public class ShutdownController {
logger.info(initMsg);
workerInfo.setHasShutdown(true); // This will update the map.
- UrlsController.numOfWorkers.decrementAndGet();
+ UrlsController.numOfActiveWorkers.decrementAndGet();
// Return "HTTP-OK" to this worker. If this was part of a shutdown-service request, then wait for the scheduler to check and shutdown the service.
return ResponseEntity.ok().build();

UrlsController.java

@@ -45,8 +45,7 @@ public class UrlsController {
public static final ConcurrentHashMap<String, WorkerInfo> workersInfoMap = new ConcurrentHashMap<>(6);
- public static final AtomicInteger numOfWorkers = new AtomicInteger(0);
+ public static final AtomicInteger numOfActiveWorkers = new AtomicInteger(0);
public static ExecutorService backgroundExecutor;
@@ -121,12 +120,12 @@ public class UrlsController {
if ( workerInfo.getHasShutdown() ) {
logger.info("The worker with id \"" + workerId + "\" was restarted.");
workerInfo.setHasShutdown(false);
- numOfWorkers.incrementAndGet();
+ numOfActiveWorkers.incrementAndGet();
}
} else {
logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP [" + remoteAddr + "] in memory.");
workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false));
- numOfWorkers.incrementAndGet();
+ numOfActiveWorkers.incrementAndGet();
}
return urlsService.getAssignments(workerId, assignmentsLimit);
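The rename from "numOfWorkers" to "numOfActiveWorkers" clarifies the counter's semantics: it counts only workers currently participating, incremented on first contact or on a restart after shutdown, and decremented when a worker reports its shutdown. A minimal standalone sketch of that lifecycle; the class and map names here are hypothetical stand-ins, not code from this repository:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the active-worker counting pattern in the changes above.
class WorkerRegistrySketch {
    static final ConcurrentHashMap<String, Boolean> hasShutdownByWorkerId = new ConcurrentHashMap<>();
    static final AtomicInteger numOfActiveWorkers = new AtomicInteger(0);

    static void onAssignmentsRequest(String workerId) {
        Boolean hadShutdown = hasShutdownByWorkerId.put(workerId, false);  // Returns the previous state, or null.
        if ( (hadShutdown == null) || hadShutdown )  // First contact, or a restart after a shutdown.
            numOfActiveWorkers.incrementAndGet();
    }

    static void onShutdownReport(String workerId) {
        hasShutdownByWorkerId.put(workerId, true);
        numOfActiveWorkers.decrementAndGet();  // One fewer worker can answer full-text requests.
    }
}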

UrlsServiceImpl.java

@@ -100,15 +100,8 @@ public class UrlsServiceImpl implements UrlsService {
// Prepare the "excludedDatasourceIDsStringList" to be used inside the "findAssignmentsQuery". Create the following string-pattern:
// ("ID_1", "ID_2", ...)
- final StringBuilder sb = new StringBuilder((exclusionListSize * 46) + (exclusionListSize -1) +2 );
- sb.append("(");
- for ( int i=0; i < exclusionListSize; ++i ) {
- sb.append("\"").append(excludedIDs.get(i)).append("\"");
- if ( i < (exclusionListSize -1) )
- sb.append(", ");
- }
- sb.append(")");
- excludedDatasourceIDsStringList = sb.toString();
+ int stringBuilderCapacity = ((exclusionListSize * 46) + (exclusionListSize -1) +2);
+ excludedDatasourceIDsStringList = FileUtils.getQueryListString(excludedIDs, exclusionListSize, stringBuilderCapacity);
logger.info("The following bulkImport data-sources will be excluded from crawling: " + excludedDatasourceIDsStringList);
}
@@ -289,7 +282,7 @@ public class UrlsServiceImpl implements UrlsService {
// Nothing to post to the Worker, since we do not have the worker's info.
// Rename the worker-report-file to indicate its "failure", so that the scheduler can pick it up and retry processing it.
String workerReportBaseName = this.workerReportsDirPath + File.separator + curWorkerId + File.separator + curWorkerId + "_assignments_" + curReportAssignmentsCounter + "_report";
- getRenamedWorkerReport(workerReportBaseName, new File(workerReportBaseName + ".json"), "No info was found for worker: " + curWorkerId); // It may return null.
+ renameAndGetWorkerReportFile(workerReportBaseName, new File(workerReportBaseName + ".json"), "No info was found for worker: " + curWorkerId); // It may return null.
return false;
} else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Problem with the Impala-database!");
@@ -421,7 +414,7 @@ public class UrlsServiceImpl implements UrlsService {
}
// For every "numOfWorkers" assignment-batches that get processed, we merge the "attempts" and "payload_aggregated" tables.
- if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfWorkers.get()) == 0 ) // The workersNum will not be zero!
+ if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfActiveWorkers.get()) == 0 ) // The workersNum will not be zero!
if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, hasFulltexts) )
// The "postReportResultToWorker()" was called inside.
return false;
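The merge cadence in this hunk scales with how many workers are active: with N active workers, the worker-related tables get merged once every N processed reports. A tiny standalone illustration of the modulo trigger, with hypothetical values:

class MergeCadenceExample {
    public static void main(String[] args) {
        int numOfActiveWorkers = 3;  // Hypothetical: three workers currently participating.
        for ( int reportsProcessed = 1; reportsProcessed <= 7; ++reportsProcessed ) {
            if ( (reportsProcessed % numOfActiveWorkers) == 0 )
                System.out.println("Merging worker-related tables after report #" + reportsProcessed);  // Fires at 3 and 6.
        }
    }
}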
@@ -538,7 +531,7 @@ public class UrlsServiceImpl implements UrlsService {
// Rename the worker-report to indicate success or failure.
String workerReportBaseName = this.workerReportsDirPath + File.separator + workerId + File.separator + workerId + "_assignments_" + assignmentRequestCounter + "_report";
File workerReport = new File(workerReportBaseName + ".json");
- File renamedWorkerReport = getRenamedWorkerReport(workerReportBaseName, workerReport, errorMsg); // It may return null.
+ File renamedWorkerReport = renameAndGetWorkerReportFile(workerReportBaseName, workerReport, errorMsg); // It may return null.
// Get the IP of this worker.
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
@@ -580,7 +573,7 @@ public class UrlsServiceImpl implements UrlsService {
}
- private static File getRenamedWorkerReport(String workerReportBaseName, File workerReport, String errorMsg)
+ private static File renameAndGetWorkerReportFile(String workerReportBaseName, File workerReport, String errorMsg)
{
File renamedWorkerReport = null;
try {

FileUtils.java

@@ -779,14 +779,7 @@ public class FileUtils {
// Prepare the "urlsToRetrieveRelatedIDs" to be used inside the "getDataForPayloadPrefillQuery". Create the following string-pattern: ("URL_1", "URL_2", ...)
int urlsToRetrieveRelatedIDsSize = urlsToRetrieveRelatedIDs.size();
- StringBuilder relatedIDsStringBuilder = new StringBuilder(urlsToRetrieveRelatedIDsSize * 100);
- relatedIDsStringBuilder.append("(");
- for ( int i=0; i < urlsToRetrieveRelatedIDsSize; ++i ) {
- relatedIDsStringBuilder.append("\"").append(urlsToRetrieveRelatedIDs.get(i)).append("\"");
- if ( i < (urlsToRetrieveRelatedIDsSize -1) )
- relatedIDsStringBuilder.append(", ");
- }
- relatedIDsStringBuilder.append(")");
+ int stringBuilderCapacity = (urlsToRetrieveRelatedIDsSize * 100);
// Get the id and url of any
String getDataForPayloadPrefillQuery = "select distinct pu.id, pu.url\n" +
@@ -794,7 +787,7 @@ public class FileUtils {
"left anti join " + DatabaseConnector.databaseName + ".attempt a on a.id=pu.id and a.original_url=pu.url\n" +
"left anti join " + DatabaseConnector.databaseName + ".payload p on p.id=pu.id and p.original_url=pu.url\n" +
"left anti join " + DatabaseConnector.databaseName + ".assignment asgn on asgn.id=pu.id and asgn.original_url=pu.url\n" +
- "where pu.url in " + relatedIDsStringBuilder;
+ "where pu.url in " + getQueryListString(urlsToRetrieveRelatedIDs, urlsToRetrieveRelatedIDsSize, stringBuilderCapacity);
//logger.trace("getDataForPayloadPrefillQuery:\n" + getDataForPayloadPrefillQuery);
@@ -914,4 +907,17 @@ public class FileUtils {
return null;
}
+ public static String getQueryListString(List<String> list, int fileHashesListSize, int stringBuilderCapacity) {
+ StringBuilder sb = new StringBuilder(stringBuilderCapacity);
+ sb.append("(");
+ for ( int i=0; i < fileHashesListSize; ++i ) {
+ sb.append("\"").append(list.get(i)).append("\"");
+ if ( i < (fileHashesListSize -1) )
+ sb.append(", ");
+ }
+ sb.append(")");
+ return sb.toString();
+ }
}
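For illustration, a standalone call showing the string the newly-extracted helper produces. "FileUtils.getQueryListString()" is the method added above; the surrounding class and values are hypothetical:

import java.util.Arrays;
import java.util.List;

class GetQueryListStringExample {
    public static void main(String[] args) {
        List<String> ids = Arrays.asList("ID_1", "ID_2", "ID_3");
        // Capacity estimate mirroring the caller in UrlsServiceImpl: ~46 chars per quoted ID,
        // plus the separators and the two parentheses.
        int capacity = (ids.size() * 46) + (ids.size() - 1) + 2;
        System.out.println(FileUtils.getQueryListString(ids, ids.size(), capacity));
        // Prints: ("ID_1", "ID_2", "ID_3")
    }
}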

ParquetFileUtils.java

@@ -269,7 +269,7 @@ public class ParquetFileUtils {
int recordsSize = recordList.size();
if ( recordsSize == 0 ) {
- logger.warn("No attempts are available to be inserted to the database!");
+ logger.error("No attempts are available to be inserted to the database!"); // This should have been caught earlier.
return false;
}
@@ -319,7 +319,7 @@ public class ParquetFileUtils {
int recordsSize = recordList.size();
if ( recordsSize == 0 ) {
- logger.warn("No payloads are available to be inserted to the database!");
+ logger.error("No payloads are available to be inserted to the database!"); // This should have been caught earlier.
return false;
}

S3ObjectStore.java

@@ -122,7 +122,13 @@ public class S3ObjectStore {
logger.warn("Going to " + (shouldDeleteBucket ? "delete" : "empty") + " bucket \"" + bucketName + "\"!");
// First list the objects of the bucket.
- Iterable<Result<Item>> results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).build());
+ Iterable<Result<Item>> results;
+ try {
+ results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).build());
+ } catch (Exception e) {
+ logger.error("Could not retrieve the list of objects of bucket \"" + bucketName + "\"!");
+ return;
+ }
int countDeletedFiles = 0;
int countFilesNotDeleted = 0;
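The two counters initialized at the end of this hunk suggest that the rest of "emptyBucket()" (outside this diff's context) deletes the listed objects one by one, counting failures instead of aborting on the first error. A hedged sketch of how such a loop could look with the MinIO Java client; the wrapper method and its name are assumptions, not code from this commit:

import io.minio.MinioClient;
import io.minio.RemoveObjectArgs;
import io.minio.Result;
import io.minio.messages.Item;

class EmptyBucketSketch {
    // Hypothetical continuation of emptyBucket(): remove each listed object,
    // counting successes and failures instead of failing the whole operation.
    static void deleteListedObjects(MinioClient minioClient, String bucketName, Iterable<Result<Item>> results) {
        int countDeletedFiles = 0;
        int countFilesNotDeleted = 0;
        for ( Result<Item> result : results ) {
            try {
                String objectName = result.get().objectName();  // "get()" may throw on a bad listing entry.
                minioClient.removeObject(RemoveObjectArgs.builder().bucket(bucketName).object(objectName).build());
                countDeletedFiles++;
            } catch (Exception e) {
                countFilesNotDeleted++;  // One failed deletion should not stop the emptying of the bucket.
            }
        }
    }
}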