From 1ddfd34236adaf33e7d3c71ebc2c7f20aceda2eb Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Fri, 24 Dec 2021 00:12:34 +0200 Subject: [PATCH] - Allow the user to set a maximum number of assignments-batches for the Worker to handle. After handling those batches, the Worker will shut down. A number of < 0 > indicates an infinite number of batches. - Avoid converting the zero fileSize to < null >. Now, the default value is < null >, so the zero-value will indicate a zero-byte file. - Update dependencies. - Code cleanup. --- build.gradle | 2 +- gradle/wrapper/gradle-wrapper.properties | 2 +- installAndRun.sh | 2 +- .../urls_worker/UrlsWorkerApplication.java | 32 +++++++++++++++---- .../components/ScheduledTasks.java | 12 +++++-- .../plugins/PublicationsRetrieverPlugin.java | 9 ++---- .../urls_worker/util/AssignmentsHandler.java | 10 ++++-- 7 files changed, 49 insertions(+), 20 deletions(-) diff --git a/build.gradle b/build.gradle index e9c664c..1647769 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,5 @@ plugins { - id 'org.springframework.boot' version '2.6.1' + id 'org.springframework.boot' version '2.6.2' id 'io.spring.dependency-management' version '1.0.11.RELEASE' id 'java' } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index d2880ba..2e6e589 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.3-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/installAndRun.sh b/installAndRun.sh index 07c6fae..97f1a22 100755 --- a/installAndRun.sh +++ b/installAndRun.sh @@ -28,7 +28,7 @@ if [[ ! -f $inputDataFile ]]; then echo -e "\n\n" fi -gradleVersion="7.3.2" +gradleVersion="7.3.3" if [[ justInstall -eq 0 ]]; then diff --git a/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java b/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java index d282875..e29565a 100644 --- a/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java +++ b/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java @@ -12,6 +12,7 @@ import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.core.env.Environment; import org.springframework.scheduling.annotation.EnableScheduling; @@ -39,8 +40,11 @@ public class UrlsWorkerApplication { private static final String inputDataFilePath = FileUtils.workingDir + "inputData.txt"; public static String workerId = null; public static int maxAssignmentsLimitPerBatch = 0; + public static int maxAssignmentsBatchesToHandleBeforeRestart = -1; // Default value: -1 = argument-absent, 0 = infinite-batches public static String controllerBaseUrl = null; // BaseUrl template: "http://IP:PORT/api/" + private static ConfigurableApplicationContext context; + public static void main(String[] args) { @@ -48,7 +52,7 @@ public class UrlsWorkerApplication { new PublicationsRetrieverPlugin(); new AssignmentsHandler(); - SpringApplication.run(UrlsWorkerApplication.class, args); + context = SpringApplication.run(UrlsWorkerApplication.class, args); Runtime javaRuntime = Runtime.getRuntime(); logger.debug("HeapSize: " + javaRuntime.totalMemory()); @@ -57,12 +61,19 @@ public class UrlsWorkerApplication { } + public static void gentleShutdown() + { + int exitCode = SpringApplication.exit(context, () -> 0); // The "PreDestroy" method will be called. + System.exit(exitCode); + } + + @PreDestroy public static void preDestroy() { if ( PublicationsRetriever.executor != null ) { - logger.info("Shutting down the threads.."); + logger.info("Shutting down the threads used by \"PublicationsRetriever\"-plugin.."); PublicationsRetriever.executor.shutdown(); // Define that no new tasks will be scheduled. try { if ( !PublicationsRetriever.executor.awaitTermination(1, TimeUnit.MINUTES) ) { @@ -115,7 +126,7 @@ public class UrlsWorkerApplication { myReader = new Scanner(inputDataFile); if ( myReader.hasNextLine() ) { String[] data = myReader.nextLine().split(","); - if ( data.length < 3 ) { + if ( data.length < 4 ) { String errorMsg = "Not all data were retrieved from file \"" + inputDataFilePath + "\"!"; logger.error(errorMsg); System.err.println(errorMsg); @@ -129,7 +140,14 @@ public class UrlsWorkerApplication { logger.warn("The given \"maxAssignmentsLimitPerBatch\" (" + maxAssignmentsLimitStr + ") was not a number! Will use the default one: " + WorkerConstants.ASSIGNMENTS_LIMIT); maxAssignmentsLimitPerBatch = WorkerConstants.ASSIGNMENTS_LIMIT; } - controllerBaseUrl = data[2].trim(); + String maxAssignmentsBatchesStr = data[2].trim(); + try { + maxAssignmentsBatchesToHandleBeforeRestart = Integer.parseInt(maxAssignmentsBatchesStr); + } catch (NumberFormatException nfe) { + logger.warn("The given \"maxAssignmentsBatchesToHandleBeforeRestart\" (" + maxAssignmentsBatchesStr + ") was not a number! Will handle an infinite number of batches!"); + maxAssignmentsBatchesToHandleBeforeRestart = 0; + } + controllerBaseUrl = data[3].trim(); try { new URL(controllerBaseUrl); } catch (MalformedURLException mue) { @@ -142,14 +160,14 @@ public class UrlsWorkerApplication { controllerBaseUrl += "/"; // Make sure the other urls will not break later. } - if ( (workerId == null) || (maxAssignmentsLimitPerBatch == 0) || (controllerBaseUrl == null) ) { - String errorMsg = "No \"workerId\" or/and \"maxAssignmentsLimitPerBatch\" or/and \"controllerBaseUrl\" could be retrieved from the file: " + inputDataFilePath; + if ( (workerId == null) || (maxAssignmentsLimitPerBatch == 0) || (maxAssignmentsBatchesToHandleBeforeRestart == -1) || (controllerBaseUrl == null) ) { + String errorMsg = "No \"workerId\" or/and \"maxAssignmentsLimitPerBatch\" or/and \"maxAssignmentsBatchesToHandleBeforeRestart\" or/and \"controllerBaseUrl\" could be retrieved from the file: " + inputDataFilePath; logger.error(errorMsg); System.err.println(errorMsg); System.exit(63); } - logger.info("workerId: " + workerId + ", maxAssignmentsLimitPerBatch: " + maxAssignmentsLimitPerBatch + ", controllerBaseUrl: " + controllerBaseUrl); // It's safe and helpful to show them in the logs. + logger.info("workerId: " + workerId + ", maxAssignmentsLimitPerBatch: " + maxAssignmentsLimitPerBatch + ", maxAssignmentsBatchesToHandleBeforeRestart: " + maxAssignmentsBatchesToHandleBeforeRestart + ", controllerBaseUrl: " + controllerBaseUrl); // It's safe and helpful to show them in the logs. } catch (Exception e) { String errorMsg = "An error prevented the retrieval of the workerId and the controllerBaseUrl from the file: " + inputDataFilePath + "\n" + e.getMessage(); diff --git a/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java index 5091e5d..dc515bd 100644 --- a/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java @@ -1,5 +1,6 @@ package eu.openaire.urls_worker.components; +import eu.openaire.urls_worker.UrlsWorkerApplication; import eu.openaire.urls_worker.controllers.FullTextsController; import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin; import eu.openaire.urls_worker.util.AssignmentsHandler; @@ -32,8 +33,15 @@ public class ScheduledTasks { @Scheduled(fixedRate = 900_000) // Every 15 mins: 900_000 public void handleNewAssignments() { if ( AssignmentsHandler.isAvailableForWork ) - AssignmentsHandler.handleAssignments(); - else { + { + if ( (UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart == 0) // Infinite batches. + || (AssignmentsHandler.numHandledAssignmentsBatches < UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart) ) + AssignmentsHandler.handleAssignments(); + else { + logger.info("The maximum assignments-batches (" + UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart + ") to be handled was reached! Shut down, in order for the external Linux-service to restart on its own.."); + UrlsWorkerApplication.gentleShutdown(); + } + } else { //logger.debug("The worker is not available for work at the moment.."); // JUST FOR DEBUG! } } diff --git a/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java b/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java index b2e0b33..13ff694 100644 --- a/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java +++ b/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java @@ -151,7 +151,7 @@ public class PublicationsRetrieverPlugin { if ( "true".equals(data.getWasDocumentOrDatasetAccessible()) ) // The reversed order defends against a potential NPE. { status = UrlReport.StatusType.accessible; - if ( comment.contains(UrlUtils.alreadyDownloadedByIDMessage) ) { + if ( comment.startsWith(UrlUtils.alreadyDownloadedByIDMessage, 0) ) { // The file of this docUrl was already downloaded by another docUrl. String initialId = comment.substring(UrlUtils.alreadyDownloadedByIDMessage.length()); // The fileName starts right after the "message". //logger.debug("initialId: " + initialId); // DEBUG! @@ -171,7 +171,7 @@ public class PublicationsRetrieverPlugin { fileLocation = comment; // This is the full-file-path. mimeType = "application/pdf"; } else // Else the file was not retrieved, so all file-related data are kept "null". - error = new Error(Error.ErrorType.couldRetry, comment); // We can still try to download it in the future. + error = new Error(Error.ErrorType.couldRetry, comment); // We can still try to download it from the found docUrl, in the future. if ( error == null ) // If the file was retrieved, in any time. error = new Error(Error.ErrorType.couldRetry, null); // We do not want to send a "null" object, since it just adds more complicated handling in the controller.. @@ -188,10 +188,7 @@ public class PublicationsRetrieverPlugin { if ( docOrDatasetUrl.equals(UrlUtils.unreachableDocOrDatasetUrlIndicator) || docOrDatasetUrl.equals(UrlUtils.duplicateUrlIndicator) ) docOrDatasetUrl = null; - // Cleanup some data. - if ( (size != null) && (size == 0L) ) - size = null; - + // Convert "null" strings to actual < null > if ( (hash != null) && (hash.equals("null")) ) hash = null; diff --git a/src/main/java/eu/openaire/urls_worker/util/AssignmentsHandler.java b/src/main/java/eu/openaire/urls_worker/util/AssignmentsHandler.java index 16bf88a..b828a78 100644 --- a/src/main/java/eu/openaire/urls_worker/util/AssignmentsHandler.java +++ b/src/main/java/eu/openaire/urls_worker/util/AssignmentsHandler.java @@ -38,6 +38,8 @@ public class AssignmentsHandler { public static final RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(requestConnectTimeoutDuration).setReadTimeout(requestReadTimeoutDuration).build(); + public static long numHandledAssignmentsBatches = 0; // No need to be synchronized. + public AssignmentsHandler() { @@ -124,6 +126,8 @@ public class AssignmentsHandler { else postWorkerReport(assignmentRequestCounter); + numHandledAssignmentsBatches ++; // This is used later to stop this app, when a user-defined upper limit is reached. + isAvailableForWork = true; // State this after posting, to avoid breaking the "UrlReports" in the current or the next run. // Also, since the worker has limited resources, it's better to finish sending the full-texts first and then request a new batch of assignments. @@ -139,7 +143,10 @@ public class AssignmentsHandler { try { ResponseEntity responseEntity = restTemplate.postForEntity(postUrl, new WorkerReport(UrlsWorkerApplication.workerId, assignmentRequestCounter, urlReports), String.class); int responseCode = responseEntity.getStatusCodeValue(); - if ( responseCode != HttpStatus.OK.value() ) { + if ( responseCode == HttpStatus.OK.value() ) { + logger.info("The submission of the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller, and the full-text delivering, were successful!"); + return true; + } else { logger.error("HTTP-Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode); return false; } @@ -150,7 +157,6 @@ public class AssignmentsHandler { urlReports.clear(); // Reset, without de-allocating. assignmentsForPlugins.clear(); } - return true; }