diff --git a/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java b/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java
index ddaeacf..741f9a1 100644
--- a/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java
+++ b/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java
@@ -35,9 +35,6 @@ public class PublicationsRetrieverPlugin {
 
 	public static String assignmentsBasePath;
 
-	public static final long idUrlsToHandleBeforeClearingDomainAndPathTrackingData = 10_000_000;
-	public static final long idUrlsToHandleBeforeClearingDuplicateUrlsData = 1_000_000;
-
 	public PublicationsRetrieverPlugin() {
 		// Specify some configurations
@@ -57,7 +54,7 @@ public class PublicationsRetrieverPlugin {
 		LoaderAndChecker.setCouldRetryRegex();
 
 		PublicationsRetriever.threadsMultiplier = 4;
-		int workerThreadsCount = Runtime.getRuntime().availableProcessors() * PublicationsRetriever.threadsMultiplier;
+		int workerThreadsCount = (Runtime.getRuntime().availableProcessors() * PublicationsRetriever.threadsMultiplier);
 		logger.info("Use " + workerThreadsCount + " worker-threads.");
 		PublicationsRetriever.executor = Executors.newFixedThreadPool(workerThreadsCount);
 	}
@@ -151,15 +148,6 @@ public class PublicationsRetrieverPlugin {
 		UrlUtils.docOrDatasetUrlsWithIDs.clear();	// This HashTable is useful only for a single assignments-batch.
 		// In the next batch, the previously stored files might have been already uploaded by the Controller and deleted by the worker. Also, they will be stored in a different directory anyway.
-
-		// Every time we reach a "limit" of handled id-url clear some data-structures of the underlying "PublicationsRetriever" program.
-		// This helps with reducing the memory consumption over the period of weeks or months, and also give a 2nd chance to some domains which may be blocked due to a connectivity issues, but after a month they may be fine.
-		long idUrlPairsHandled = (assignmentRequestCounter * UrlsWorkerApplication.maxAssignmentsLimitPerBatch);
-		if ( idUrlPairsHandled >= idUrlsToHandleBeforeClearingDuplicateUrlsData )
-			UrlUtils.duplicateUrls.clear();
-
-		if ( idUrlPairsHandled >= idUrlsToHandleBeforeClearingDomainAndPathTrackingData )
-			GenericUtils.clearDomainAndPathTrackingData();
 	}
diff --git a/src/main/java/eu/openaire/urls_worker/util/AssignmentsHandler.java b/src/main/java/eu/openaire/urls_worker/util/AssignmentsHandler.java
index b722f27..8c2cdef 100644
--- a/src/main/java/eu/openaire/urls_worker/util/AssignmentsHandler.java
+++ b/src/main/java/eu/openaire/urls_worker/util/AssignmentsHandler.java
@@ -2,6 +2,8 @@ package eu.openaire.urls_worker.util;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import eu.openaire.publications_retriever.util.url.GenericUtils;
+import eu.openaire.publications_retriever.util.url.UrlUtils;
 import eu.openaire.urls_worker.UrlsWorkerApplication;
 import eu.openaire.urls_worker.models.Assignment;
 import eu.openaire.urls_worker.models.UrlReport;
@@ -40,6 +42,9 @@ public class AssignmentsHandler {
 
 	public static long numHandledAssignmentsBatches = 0;	// No need to be synchronized.
 
+	public static final long idUrlsToHandleBeforeClearingDomainAndPathTrackingData = 10_000_000;
+	public static final long idUrlsToHandleBeforeClearingDuplicateUrlsData = 1_000_000;
+
 	public AssignmentsHandler() {
@@ -51,6 +56,9 @@ public class AssignmentsHandler {
 
 	public static AssignmentsRequest requestAssignments() {
+		// Go request the assignments; the worker is busy. This may take more time than the scheduled time for requesting new assignments.
+		isAvailableForWork = false;
+
 		String requestUrl = UrlsWorkerApplication.controllerBaseUrl + (askForTest ? "test/" : "") + "urls?workerId=" + UrlsWorkerApplication.workerId + "&workerAssignmentsLimit=" + UrlsWorkerApplication.maxAssignmentsLimitPerBatch;
 		logger.info("Going to request assignments from the controller-server: " + requestUrl);
@@ -70,27 +78,28 @@ public class AssignmentsHandler {
 
 	public static void handleAssignments() {
 		AssignmentsRequest assignmentsRequest = requestAssignments();
-		if ( assignmentsRequest == null )
+		if ( assignmentsRequest == null ) {
+			isAvailableForWork = true;	// Make sure it is "true" both in case of a "RestClientException" and in case of a "null" object.
 			return;
+		}
 
 		Long assignmentRequestCounter = assignmentsRequest.getAssignmentsCounter();
 		List<Assignment> assignments = assignmentsRequest.getAssignments();
 		if ( assignments == null ) {
 			logger.warn("The assignments were found to be null for assignmentRequestCounter = " + assignmentRequestCounter);
+			isAvailableForWork = true;
 			return;
 		}
 
 		int assignmentsSize = assignments.size();
 		if ( assignmentsSize == 0 ) {
 			logger.warn("The assignmentsSize was < 0 > for assignmentRequestCounter = " + assignmentRequestCounter);
+			isAvailableForWork = true;
 			return;
 		}
 
 		logger.info("AssignmentRequest < " + assignmentRequestCounter + " > was received and it's ready to be processed. It contains " + assignmentsSize + " tasks.");
 
-		// Start handling the assignments, the worker is busy.
-		isAvailableForWork = false;
-
 		// Iterate over the tasks and add each task in its own list depending on the DATASOURCE in order to decide which plugin to use later.
 		for ( Assignment assignment : assignments ) {
@@ -127,17 +136,28 @@ public class AssignmentsHandler {
 		numHandledAssignmentsBatches ++;	// This is used later to stop this app, when a user-defined upper limit is reached.
 
-		isAvailableForWork = true;	// State this after posting, to avoid breaking the "UrlReports" in the current or the next run.
-		// Also, since the worker has limited resources, it's better to finish sending the full-texts first and then request a new batch of assignments.
+		// Every time we reach a "limit" of handled id-url pairs, clear some data-structures of the underlying "PublicationsRetriever" program.
+		// This helps reduce the memory consumption over a period of weeks or months, and also gives a 2nd chance to some domains which may have been blocked due to connectivity issues, but may be fine again after a month.
+		long idUrlPairsHandled = (numHandledAssignmentsBatches * UrlsWorkerApplication.maxAssignmentsLimitPerBatch);
+		if ( idUrlPairsHandled >= idUrlsToHandleBeforeClearingDuplicateUrlsData )
+			UrlUtils.duplicateUrls.clear();
+
+		if ( idUrlPairsHandled >= idUrlsToHandleBeforeClearingDomainAndPathTrackingData )
+			GenericUtils.clearDomainAndPathTrackingData();
 
-		// Note: Cannot call this method here retrospectively, as if it runs 100s of times, the memory-stack may break..
-		// The scheduler will handle calling it every 15 mins, in case the Worker is available for work..
 		if ( AssignmentsHandler.numHandledAssignmentsBatches == UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart ) {
 			logger.info("The maximum assignments-batches (" + UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart + ") to be handled was reached! Shut down, in order for the external Linux-service to restart on its own..");
 			UrlsWorkerApplication.gentleAppShutdown();
+			// The "gentleAppShutdown()" will exit the app, so the "isAvailableForWork" will not be set below.
 		}
+
+		isAvailableForWork = true;	// State this after posting, to avoid breaking the "UrlReports" in the current or the next run.
+		// Also, since the worker has limited resources, it's better to finish sending the full-texts first and then request a new batch of assignments.
+
+		// Note: Cannot call this method recursively here, as if it ran 100s of times, the call-stack might break..
+		// The scheduler will handle calling it every 15 mins, in case the Worker is available for work..
 	}
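
Reviewer note: the clearing thresholds now key off the Worker's own numHandledAssignmentsBatches counter instead of the Controller-provided assignmentRequestCounter, and the logic lives in AssignmentsHandler rather than in PublicationsRetrieverPlugin. Below is a minimal, self-contained sketch of that arithmetic; the class name, the stand-in maps and the 10_000 batch limit are assumptions for the example, while the two thresholds and the clearing targets mirror the diff (UrlUtils.duplicateUrls and GenericUtils.clearDomainAndPathTrackingData() in the publications_retriever dependency).

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative sketch of the batch-based clearing now done at the end of handleAssignments().
    // The stand-in maps and the batch limit are hypothetical; the thresholds correspond to the diff.
    class TrackingSketch {

        static final long idUrlsToHandleBeforeClearingDuplicateUrlsData = 1_000_000;
        static final long idUrlsToHandleBeforeClearingDomainAndPathTrackingData = 10_000_000;

        static final long maxAssignmentsLimitPerBatch = 10_000;    // hypothetical value; configured in UrlsWorkerApplication
        static long numHandledAssignmentsBatches = 0;

        static final Map<String, String> duplicateUrls = new ConcurrentHashMap<>();              // stands in for UrlUtils.duplicateUrls
        static final Map<String, Integer> domainAndPathTrackingData = new ConcurrentHashMap<>(); // stands in for the data cleared by GenericUtils.clearDomainAndPathTrackingData()

        static void afterBatchWasPosted() {
            numHandledAssignmentsBatches++;
            // Upper bound of id-url pairs handled so far: each batch holds at most "maxAssignmentsLimitPerBatch" pairs.
            long idUrlPairsHandled = (numHandledAssignmentsBatches * maxAssignmentsLimitPerBatch);

            if ( idUrlPairsHandled >= idUrlsToHandleBeforeClearingDuplicateUrlsData )
                duplicateUrls.clear();              // cheaper to rebuild, so it is cleared at a lower threshold

            if ( idUrlPairsHandled >= idUrlsToHandleBeforeClearingDomainAndPathTrackingData )
                domainAndPathTrackingData.clear();  // also gives previously blocked domains a 2nd chance
        }
    }

Since idUrlPairsHandled only grows, once a threshold is crossed the corresponding structure is cleared after every subsequent batch; the code in the diff behaves the same way.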
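The other behavioural change is the "isAvailableForWork" lifecycle: the flag is now lowered inside requestAssignments() and has to be raised again on every early-return path as well as after the reports are posted (unless gentleAppShutdown() exits the app first). The outline below is a simplified, self-contained approximation of that flow; requestAssignments(), processBatch() and postReports() are stubs rather than the real methods, and the shutdown check is omitted.

    import java.util.Collections;
    import java.util.List;

    // Simplified outline of the availability-flag lifecycle; all methods are no-op stubs for the example.
    class AvailabilitySketch {

        static volatile boolean isAvailableForWork = true;  // assumed volatile here; polled by the scheduler

        static List<String> requestAssignments() {
            isAvailableForWork = false;     // lowered before contacting the Controller; handling may outlast the scheduler's interval
            return Collections.emptyList(); // stub result
        }

        static void handleAssignments() {
            List<String> assignments = requestAssignments();
            if ( (assignments == null) || assignments.isEmpty() ) {
                isAvailableForWork = true;  // every early return must make the worker available again
                return;
            }
            processBatch(assignments);      // stub: per-datasource plugin work
            postReports();                  // stub: post the UrlReports and the full-texts to the Controller
            isAvailableForWork = true;      // raised only after posting, so a new batch is not requested mid-upload
        }

        static void processBatch(List<String> assignments) { /* no-op stub */ }
        static void postReports() { /* no-op stub */ }
    }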