From 1048463ca09ed4d2afdf9b6f489a0692721595e5 Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Mon, 11 Mar 2024 16:17:32 +0200 Subject: [PATCH] - Improve error-handling in "S3ObjectStore.emptyBucket()". - Change some log-levels. - Code polishing. --- .../components/ScheduledTasks.java | 9 ++++--- .../controllers/ShutdownController.java | 2 +- .../controllers/UrlsController.java | 7 +++--- .../services/UrlsServiceImpl.java | 19 +++++---------- .../urls_controller/util/FileUtils.java | 24 ++++++++++++------- .../util/ParquetFileUtils.java | 4 ++-- .../urls_controller/util/S3ObjectStore.java | 8 ++++++- 7 files changed, 38 insertions(+), 35 deletions(-) diff --git a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java index b71d9c9..2f69e95 100644 --- a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java @@ -204,7 +204,7 @@ public class ScheduledTasks { { // We make sure an initial delay of some minutes is in place before this is executed, since we have to make sure all workers are up and running in order for them to be able to answer the full-texts-requests. - if ( UrlsController.numOfWorkers.get() == 0 ) { + if ( UrlsController.numOfActiveWorkers.get() == 0 ) { long timeToWait = (isTestEnvironment ? 1_200_000 : 43_200_000); // 10 mins | 12 hours logger.warn("None of the workers is participating in the service, at the moment. Will wait " + ((timeToWait /1000) /60) + " minutes and try again.."); try { @@ -212,8 +212,8 @@ public class ScheduledTasks { } catch (InterruptedException ie) { logger.warn("The wait-period was interrupted! Will try either way."); } - if ( UrlsController.numOfWorkers.get() == 0 ) { - logger.error("None of the workers have participated in the service yet again. Will not process any leftover workerReports!"); + if ( UrlsController.numOfActiveWorkers.get() == 0 ) { + logger.error("None of the workers is participating in the service, yet again. Will not process any leftover workerReports!"); return; } } @@ -249,7 +249,6 @@ public class ScheduledTasks { } - @Scheduled(initialDelay = 259_200_000, fixedDelay = 259_200_000) // Run every 3 days. 3 days after startup. //@Scheduled(initialDelay = 1_200_000, fixedDelay = 1_200_000) // Just for testing (every 1200 secs). public void checkAndDeleteUnhandledAssignments() @@ -261,7 +260,7 @@ public class ScheduledTasks { // The assignments just remain in the table, and the urls cannot be rechecked. Calendar calendar = Calendar.getInstance(); - calendar.add(Calendar.DAY_OF_MONTH, - 3); // Subtract 3 from current Date. + calendar.add(Calendar.DAY_OF_MONTH, -3); // Subtract 3 from current Date. DatabaseConnector.databaseLock.lock(); urlsService.deleteAssignmentsWithOlderDate(calendar.getTimeInMillis()); // Any error-log is written inside. diff --git a/src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java b/src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java index 5b47ee1..11b3c62 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java @@ -153,7 +153,7 @@ public class ShutdownController { logger.info(initMsg); workerInfo.setHasShutdown(true); // This will update the map. 
- UrlsController.numOfWorkers.decrementAndGet(); + UrlsController.numOfActiveWorkers.decrementAndGet(); // Return "HTTP-OK" to this worker. If this was part of a shutdown-service request, then wait for the scheduler to check and shutdown the service. return ResponseEntity.ok().build(); diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java index dce0249..5c22872 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java @@ -45,8 +45,7 @@ public class UrlsController { public static final ConcurrentHashMap workersInfoMap = new ConcurrentHashMap<>(6); - public static final AtomicInteger numOfWorkers = new AtomicInteger(0); - + public static final AtomicInteger numOfActiveWorkers = new AtomicInteger(0); public static ExecutorService backgroundExecutor; @@ -121,12 +120,12 @@ public class UrlsController { if ( workerInfo.getHasShutdown() ) { logger.info("The worker with id \"" + workerId + "\" was restarted."); workerInfo.setHasShutdown(false); - numOfWorkers.incrementAndGet(); + numOfActiveWorkers.incrementAndGet(); } } else { logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP [" + remoteAddr + "] in memory."); workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false)); - numOfWorkers.incrementAndGet(); + numOfActiveWorkers.incrementAndGet(); } return urlsService.getAssignments(workerId, assignmentsLimit); diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java index 56908e6..8ebdafd 100644 --- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java @@ -100,15 +100,8 @@ public class UrlsServiceImpl implements UrlsService { // Prepare the "excludedDatasourceIDsStringList" to be used inside the "findAssignmentsQuery". Create the following string-pattern: // ("ID_1", "ID_2", ...) - final StringBuilder sb = new StringBuilder((exclusionListSize * 46) + (exclusionListSize -1) +2 ); - sb.append("("); - for ( int i=0; i < exclusionListSize; ++i ) { - sb.append("\"").append(excludedIDs.get(i)).append("\""); - if ( i < (exclusionListSize -1) ) - sb.append(", "); - } - sb.append(")"); - excludedDatasourceIDsStringList = sb.toString(); + int stringBuilderCapacity = ((exclusionListSize * 46) + (exclusionListSize -1) +2); + excludedDatasourceIDsStringList = FileUtils.getQueryListString(excludedIDs, exclusionListSize, stringBuilderCapacity); logger.info("The following bulkImport data-sources will be excluded from crawling: " + excludedDatasourceIDsStringList); } @@ -289,7 +282,7 @@ public class UrlsServiceImpl implements UrlsService { // Nothing to post to the Worker, since we do not have the worker's info. // Rename the worker-report-file to indicate its "failure", so that the scheduler can pick it up and retry processing it. String workerReportBaseName = this.workerReportsDirPath + File.separator + curWorkerId + File.separator + curWorkerId + "_assignments_" + curReportAssignmentsCounter + "_report"; - getRenamedWorkerReport(workerReportBaseName, new File(workerReportBaseName + ".json"), "No info was found for worker: " + curWorkerId); // It may return null. 
+ renameAndGetWorkerReportFile(workerReportBaseName, new File(workerReportBaseName + ".json"), "No info was found for worker: " + curWorkerId); // It may return null. return false; } else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) { postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Problem with the Impala-database!"); @@ -421,7 +414,7 @@ public class UrlsServiceImpl implements UrlsService { } // For every "numOfWorkers" assignment-batches that get processed, we merge the "attempts" and "payload_aggregated" tables. - if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfWorkers.get()) == 0 ) // The workersNum will not be zero! + if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfActiveWorkers.get()) == 0 ) // The workersNum will not be zero! if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, hasFulltexts) ) // The "postReportResultToWorker()" was called inside. return false; @@ -538,7 +531,7 @@ public class UrlsServiceImpl implements UrlsService { // Rename the worker-report to indicate success or failure. String workerReportBaseName = this.workerReportsDirPath + File.separator + workerId + File.separator + workerId + "_assignments_" + assignmentRequestCounter + "_report"; File workerReport = new File(workerReportBaseName + ".json"); - File renamedWorkerReport = getRenamedWorkerReport(workerReportBaseName, workerReport, errorMsg); // It may return null. + File renamedWorkerReport = renameAndGetWorkerReportFile(workerReportBaseName, workerReport, errorMsg); // It may return null. // Get the IP of this worker. WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId); @@ -580,7 +573,7 @@ public class UrlsServiceImpl implements UrlsService { } - private static File getRenamedWorkerReport(String workerReportBaseName, File workerReport, String errorMsg) + private static File renameAndGetWorkerReportFile(String workerReportBaseName, File workerReport, String errorMsg) { File renamedWorkerReport = null; try { diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index 58551ca..8f9c1a3 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -779,14 +779,7 @@ public class FileUtils { // Prepare the "urlsToRetrieveRelatedIDs" to be used inside the "getDataForPayloadPrefillQuery". Create the following string-pattern: ("URL_1", "URL_2", ...) 
int urlsToRetrieveRelatedIDsSize = urlsToRetrieveRelatedIDs.size(); - StringBuilder relatedIDsStringBuilder = new StringBuilder(urlsToRetrieveRelatedIDsSize * 100); - relatedIDsStringBuilder.append("("); - for ( int i=0; i < urlsToRetrieveRelatedIDsSize; ++i ) { - relatedIDsStringBuilder.append("\"").append(urlsToRetrieveRelatedIDs.get(i)).append("\""); - if ( i < (urlsToRetrieveRelatedIDsSize -1) ) - relatedIDsStringBuilder.append(", "); - } - relatedIDsStringBuilder.append(")"); + int stringBuilderCapacity = (urlsToRetrieveRelatedIDsSize * 100); // Get the id and url of any String getDataForPayloadPrefillQuery = "select distinct pu.id, pu.url\n" + @@ -794,7 +787,7 @@ public class FileUtils { "left anti join " + DatabaseConnector.databaseName + ".attempt a on a.id=pu.id and a.original_url=pu.url\n" + "left anti join " + DatabaseConnector.databaseName + ".payload p on p.id=pu.id and p.original_url=pu.url\n" + "left anti join " + DatabaseConnector.databaseName + ".assignment asgn on asgn.id=pu.id and asgn.original_url=pu.url\n" + - "where pu.url in " + relatedIDsStringBuilder; + "where pu.url in " + getQueryListString(urlsToRetrieveRelatedIDs, urlsToRetrieveRelatedIDsSize, stringBuilderCapacity); //logger.trace("getDataForPayloadPrefillQuery:\n" + getDataForPayloadPrefillQuery); @@ -914,4 +907,17 @@ public class FileUtils { return null; } + + public static String getQueryListString(List list, int fileHashesListSize, int stringBuilderCapacity) { + StringBuilder sb = new StringBuilder(stringBuilderCapacity); + sb.append("("); + for ( int i=0; i < fileHashesListSize; ++i ) { + sb.append("\"").append(list.get(i)).append("\""); + if ( i < (fileHashesListSize -1) ) + sb.append(", "); + } + sb.append(")"); + return sb.toString(); + } + } diff --git a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java index 2a41e97..7f10fb5 100644 --- a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java @@ -269,7 +269,7 @@ public class ParquetFileUtils { int recordsSize = recordList.size(); if ( recordsSize == 0 ) { - logger.warn("No attempts are available to be inserted to the database!"); + logger.error("No attempts are available to be inserted to the database!"); // This should have been caught earlier. return false; } @@ -319,7 +319,7 @@ public class ParquetFileUtils { int recordsSize = recordList.size(); if ( recordsSize == 0 ) { - logger.warn("No payloads are available to be inserted to the database!"); + logger.error("No payloads are available to be inserted to the database!"); // This should have been caught earlier. return false; } diff --git a/src/main/java/eu/openaire/urls_controller/util/S3ObjectStore.java b/src/main/java/eu/openaire/urls_controller/util/S3ObjectStore.java index 1326074..568f43e 100644 --- a/src/main/java/eu/openaire/urls_controller/util/S3ObjectStore.java +++ b/src/main/java/eu/openaire/urls_controller/util/S3ObjectStore.java @@ -122,7 +122,13 @@ public class S3ObjectStore { logger.warn("Going to " + (shouldDeleteBucket ? "delete" : "empty") + " bucket \"" + bucketName + "\"!"); // First list the objects of the bucket. 
-        Iterable<Result<Item>> results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).build());
+        Iterable<Result<Item>> results;
+        try {
+            results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).build());
+        } catch (Exception e) {
+            logger.error("Could not retrieve the list of objects of bucket \"" + bucketName + "\"!");
+            return;
+        }

        int countDeletedFiles = 0;
        int countFilesNotDeleted = 0;
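
For context on the error-handling change above: "listObjects()" gives back a lazy listing, and obtaining it can fail (connection or credential problems, missing bucket), in which case there is nothing to iterate, so the method now logs the error and returns instead of propagating an exception. Below is a minimal sketch of how the guarded call fits into the rest of "emptyBucket()", assuming the MinIO Java SDK 8.x API. The per-object deletion loop, the counters and the final bucket removal are an illustration of the surrounding method (which is not part of this hunk), and "System.err"/"System.out" stand in for the class logger.

    import io.minio.ListObjectsArgs;
    import io.minio.MinioClient;
    import io.minio.RemoveBucketArgs;
    import io.minio.RemoveObjectArgs;
    import io.minio.Result;
    import io.minio.messages.Item;

    public class EmptyBucketSketch {

        // Empties (and optionally deletes) the given bucket, tolerating per-object failures.
        public static void emptyBucket(MinioClient minioClient, String bucketName, boolean shouldDeleteBucket) {
            Iterable<Result<Item>> results;
            try {
                results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).build());
            } catch (Exception e) {
                // Same guard as in the patch: without the listing we cannot proceed.
                System.err.println("Could not retrieve the list of objects of bucket \"" + bucketName + "\"!");
                return;
            }

            int countDeletedFiles = 0;
            int countFilesNotDeleted = 0;
            for ( Result<Item> result : results ) {
                try {
                    Item item = result.get(); // May throw, e.g. on a transient network error.
                    minioClient.removeObject(RemoveObjectArgs.builder().bucket(bucketName).object(item.objectName()).build());
                    countDeletedFiles++;
                } catch (Exception e) {
                    countFilesNotDeleted++; // Keep going; report the totals at the end.
                }
            }
            System.out.println("Deleted " + countDeletedFiles + " objects; failed to delete " + countFilesNotDeleted + ".");

            if ( shouldDeleteBucket && (countFilesNotDeleted == 0) ) {
                try {
                    minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
                } catch (Exception e) {
                    System.err.println("Could not delete bucket \"" + bucketName + "\"!");
                }
            }
        }
    }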
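
Earlier in the patch, the two hand-rolled StringBuilder loops in "UrlsServiceImpl" and "FileUtils" are replaced by the new "FileUtils.getQueryListString()" helper, which builds the quoted, parenthesised list used in SQL "in (...)" clauses. A small usage sketch follows; the sample IDs, the table name and the query text are illustrative only, while the capacity value mirrors the pre-sizing heuristic used at the call site (about 46 characters per quoted ID, plus separators and parentheses).

    import java.util.Arrays;
    import java.util.List;

    import eu.openaire.urls_controller.util.FileUtils;

    public class QueryListStringExample {

        public static void main(String[] args) {
            List<String> excludedIDs = Arrays.asList("datasourceId_1", "datasourceId_2"); // Illustrative IDs.
            int exclusionListSize = excludedIDs.size();
            // Pre-size the StringBuilder: ~46 chars per quoted ID, plus the separators and the parentheses.
            int stringBuilderCapacity = (exclusionListSize * 46) + (exclusionListSize - 1) + 2;

            String excludedDatasourceIDsStringList = FileUtils.getQueryListString(excludedIDs, exclusionListSize, stringBuilderCapacity);
            // Produces: ("datasourceId_1", "datasourceId_2")

            // Hypothetical query text, just to show where the list is spliced in.
            String findAssignmentsQuery = "select pu.id, pu.url from publication_urls pu\n"
                    + "where pu.datasourceid not in " + excludedDatasourceIDsStringList;
            System.out.println(findAssignmentsQuery);
        }
    }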
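
Finally, the "checkAndDeleteUnhandledAssignments()" task shown near the top of the patch computes its cutoff by subtracting 3 days from the current date and passing the resulting epoch-millis to "deleteAssignmentsWithOlderDate()". A runnable one-method illustration of that computation (the printout is for demonstration only; the real code hands the value to the database layer under the database lock):

    import java.util.Calendar;

    public class AssignmentCutoffExample {

        public static void main(String[] args) {
            Calendar calendar = Calendar.getInstance();
            calendar.add(Calendar.DAY_OF_MONTH, -3); // Subtract 3 days from the current date.
            long cutoffMillis = calendar.getTimeInMillis();
            // Assignments with a date older than "cutoffMillis" are considered unhandled and get deleted.
            System.out.println("Cutoff (epoch-millis): " + cutoffMillis);
        }
    }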