From 2e60128084f9c7f4f341d1a324f83daa1985706f Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Tue, 19 Dec 2023 23:31:42 +0200 Subject: [PATCH] - Allow to easily change the por used by workers. - Show the number of active background-tasks and bulkImportDirs, which delay the Service's shutdown. - Update dependencies. - Code polishing. --- build.gradle | 4 ++-- gradle/wrapper/gradle-wrapper.properties | 2 +- installAndRun.sh | 2 +- .../urls_controller/UrlsControllerApplication.java | 1 + .../urls_controller/components/ScheduledTasks.java | 14 ++++++++++---- .../controllers/UrlsController.java | 2 +- .../services/ShutdownServiceImpl.java | 9 +++++++-- .../urls_controller/services/StatsServiceImpl.java | 1 + .../urls_controller/services/UrlsServiceImpl.java | 5 ++++- .../openaire/urls_controller/util/FileUtils.java | 6 +++++- src/main/resources/application.yml | 3 ++- 11 files changed, 35 insertions(+), 14 deletions(-) diff --git a/build.gradle b/build.gradle index 85271a0..a903f4d 100644 --- a/build.gradle +++ b/build.gradle @@ -52,7 +52,7 @@ dependencies { implementation("org.apache.commons:commons-compress:1.25.0") { exclude group: 'com.github.luben', module: 'zstd-jni' } - implementation 'com.github.luben:zstd-jni:1.5.5-10' // Even though this is part of the above dependency, the Apache commons rarely updates it, while the zstd team makes improvements very often. + implementation 'com.github.luben:zstd-jni:1.5.5-11' // Even though this is part of the above dependency, the Apache commons rarely updates it, while the zstd team makes improvements very often. implementation 'io.minio:minio:8.5.7' @@ -120,7 +120,7 @@ dependencies { // https://mvnrepository.com/artifact/io.micrometer/micrometer-registry-prometheus - runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.12.0' + runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.12.1' testImplementation 'org.springframework.security:spring-security-test' testImplementation "org.springframework.boot:spring-boot-starter-test" diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 3fa8f86..1af9e09 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/installAndRun.sh b/installAndRun.sh index 1ceec35..431e833 100755 --- a/installAndRun.sh +++ b/installAndRun.sh @@ -26,7 +26,7 @@ if [[ justRun -eq 1 && shouldRunInDocker -eq 1 ]]; then justRun=0 fi -gradleVersion="8.4" +gradleVersion="8.5" if [[ justRun -eq 0 ]]; then diff --git a/src/main/java/eu/openaire/urls_controller/UrlsControllerApplication.java b/src/main/java/eu/openaire/urls_controller/UrlsControllerApplication.java index 4b6c4b7..c56c616 100644 --- a/src/main/java/eu/openaire/urls_controller/UrlsControllerApplication.java +++ b/src/main/java/eu/openaire/urls_controller/UrlsControllerApplication.java @@ -60,6 +60,7 @@ public class UrlsControllerApplication { public void gentleAppShutdown() { + logger.info("Shutting down the app.."); shutdownThreads(); int exitCode = 0; try { diff --git a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java index 24ff7cb..5b8c3e5 100644 --- a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java @@ -129,7 +129,7 @@ public class ScheduledTasks { logger.error("IOOBE for background task_" + i + " in the futures-list! " + ioobe.getMessage()); // Only here, the "future" will be null. } finally { - if ( future != null ) + if ( future != null ) // It may be null in case we have a IOBE. futuresToDelete.add(future); // Do not delete them directly here, as the indexes will get messed up and we will get "IOOBE". } } @@ -152,8 +152,11 @@ public class ScheduledTasks { return; // Either the service was never instructed to shut down, or the user canceled the request. // Check whether there are still background tasks to be processed. Either workerReport or Bulk-import requests. - if ( UrlsController.futuresOfBackgroundTasks.size() > 0 ) + int numOfFutures = UrlsController.futuresOfBackgroundTasks.size(); + if ( numOfFutures > 0 ) { + logger.debug("There are still " + numOfFutures + " backgroundTasks waiting to be executed or have their status checked.."); return; + } // Here, the above may have given a result of < 0 >, but a new task may be asked for execution right next and still await for execution.. // The crawling-jobs can be safely finish, by avoiding to shut-down as long as at least one worker is still running (waiting for the Controller to verify that the assignments-batch is completed). @@ -161,8 +164,11 @@ public class ScheduledTasks { // So the Controller will now shut down if either of takes-types have not finished. // Check whether there are any active bulk-import procedures. - if ( BulkImportController.bulkImportDirsUnderProcessing.size() > 0 ) + int numOfBulkImportDirsUnderProcessing = BulkImportController.bulkImportDirsUnderProcessing.size(); + if ( numOfBulkImportDirsUnderProcessing > 0 ) { + logger.debug("There are still " + numOfBulkImportDirsUnderProcessing + " bulkImportDirsUnderProcessing.."); return; + } // Check whether the workers have not shutdown yet, which means that they either crawl assignments or/and they are waiting for the Controller to process the WorkerReport and then shutdown. Set workerIds = UrlsController.workersInfoMap.keySet(); @@ -249,7 +255,7 @@ public class ScheduledTasks { // The assignments just remain in the table, and the urls cannot be rechecked. Calendar calendar = Calendar.getInstance(); - calendar.add(Calendar.DAY_OF_MONTH, - 3); // Subtract from current Date. + calendar.add(Calendar.DAY_OF_MONTH, - 3); // Subtract 3 from current Date. DatabaseConnector.databaseLock.lock(); urlsService.deleteAssignmentsWithOlderDate(calendar.getTimeInMillis()); // Any error-log is written inside. diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java index 31f264d..dce0249 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java @@ -96,7 +96,7 @@ public class UrlsController { if ( ShutdownController.shouldShutdownService ) { // There might be the case that the Controller has not sent shutDown requests to the Workers yet, or it has, BUT: - // 1) A worker requests for new assignments before the shutDown request is handled by its side. + // 1) A worker requests for new assignments, before it can handle the shutDown request given to it. // 2) A new Worker joins the Service (unexpected, but anyway). String warnMsg = "The Service is about to shutdown, after all under-processing assignments and/or bulkImport requests are handled. No new requests are accepted!"; logger.warn(warnMsg); // It's likely not an actual error, but still it's not accepted. diff --git a/src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java index 85e8c51..19fefde 100644 --- a/src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java @@ -5,6 +5,7 @@ import eu.openaire.urls_controller.models.WorkerInfo; import eu.openaire.urls_controller.util.UriBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; @@ -25,6 +26,10 @@ public class ShutdownServiceImpl implements ShutdownService { private static final Pattern PRIVATE_IP_ADDRESSES_RFC_1918 = Pattern.compile("(?:10.|172.(?:1[6-9]|2[0-9]|3[0-1])|192.168.)[0-9.]+"); + @Value("${services.pdfaggregation.worker.port}") + private String workerPort; + + public ResponseEntity passSecurityChecks(String remoteAddr, String initMsg) { // In case the Controller is running inside a docker container, and we want to send the "shutdownServiceRequest" from the terminal (with curl), without entering inside the container, @@ -45,7 +50,7 @@ public class ShutdownServiceImpl implements ShutdownService { if ( ! workerInfo.getHasShutdown() ) // A worker may have shutdown on its own (by sending it a shutDown request manually), so it will have told the Controller when it shut down. In case of a Worker-crash, the Controller will not know about it. postShutdownOrCancelRequestToWorker(workerId, workerInfo.getWorkerIP(), shouldCancel); else - logger.warn("Will not post " + (shouldCancel ? "Cancel-" : "") + " ShutdownRequest to Worker \"" + workerId + "\", since is it has already shutdown."); + logger.warn("Will not post " + (shouldCancel ? "Cancel-" : "") + "ShutdownRequest to Worker \"" + workerId + "\", since is it has already shutdown."); } } @@ -54,7 +59,7 @@ public class ShutdownServiceImpl implements ShutdownService { public boolean postShutdownOrCancelRequestToWorker(String workerId, String workerIp, boolean shouldCancel) { - String url = "http://" + workerIp + ":1881/api/" + (shouldCancel ? "cancelShutdownWorker" : "shutdownWorker"); + String url = "http://" + workerIp + ":" + workerPort + "/api/" + (shouldCancel ? "cancelShutdownWorker" : "shutdownWorker"); try { ResponseEntity responseEntity = restTemplate.postForEntity(url, null, String.class); int responseCode = responseEntity.getStatusCodeValue(); diff --git a/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java index 4142811..e5fea56 100644 --- a/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java @@ -93,6 +93,7 @@ public class StatsServiceImpl implements StatsService { } } + // To get the human-friendly timestamp format from the BigInt in the database: // select from_timestamp(CAST(CAST(`date` as decimal(30,0))/1000 AS timestamp), "yyyy-MM-dd HH:mm:ss.SSS") from payload // Or simpler: select from_timestamp(CAST((`date`/1000) AS timestamp), "yyyy-MM-dd HH:mm:ss.SSS") from payload diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java index a686835..6b5eaf3 100644 --- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java @@ -58,6 +58,9 @@ public class UrlsServiceImpl implements UrlsService { @Value("${services.pdfaggregation.controller.workerReportsDirPath}") private String workerReportsDirPath; + @Value("${services.pdfaggregation.worker.port}") + private String workerPort; + public static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); private final AtomicInteger maxAttemptsPerRecordAtomic; @@ -534,7 +537,7 @@ public class UrlsServiceImpl implements UrlsService { logger.error("Could not find any info for worker with id: \"" + workerId +"\"."); return false; } - String url = "http://" + workerInfo.getWorkerIP() + ":1881/api/addReportResultToWorker/" + assignmentRequestCounter; // This workerIP will NOT be null. + String url = "http://" + workerInfo.getWorkerIP() + ":" + workerPort + "/api/addReportResultToWorker/" + assignmentRequestCounter; // This workerIP will NOT be null. if ( logger.isTraceEnabled() ) logger.trace("Going to \"postReportResultToWorker\": \"" + workerId + "\", for assignments_" + assignmentRequestCounter + ((errorMsg != null) ? "\nError: " + errorMsg : "")); diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index b1ad81d..d098c68 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -61,6 +61,10 @@ public class FileUtils { @Autowired private FileDecompressor fileDecompressor; + @Value("${services.pdfaggregation.worker.port}") + private String workerPort; + + public enum UploadFullTextsResponse {successful, unsuccessful, databaseError} @@ -326,7 +330,7 @@ public class FileUtils { logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts (total is: " + numFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each)."); // Check if one full text is left out because of the division. Put it int the last batch. - String baseUrl = "http://" + workerIp + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/"; + String baseUrl = "http://" + workerIp + ":" + workerPort + "/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/"; // TODO - The worker should send the port in which it accepts requests, along with the current request. // TODO - The least we have to do it to expose the port-assignment somewhere more obvious like inside the "application.yml" file. diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 4527b7f..694fb5f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -34,7 +34,8 @@ services: bucketName: XA shouldEmptyBucket: false shouldShowAllS3Buckets: true - + worker: + port: 1881 bulk-import: baseBulkImportLocation: /mnt/bulk_import/