Bug fixes:

- Fix a bug, where, in case it took too long to get the assignments from the Controller (possible when there are too many workers requesting at the same time or if the database is responding slowly), the Worker's scheduler would request for new assignments, in the meantime.
- Fix a bug, where, if the "maxAssignmentsBatchesToHandleBeforeRestart" was set, the Worker's scheduler could request another batch, right before the Worker was about to shut down.
- Fix a bug, where the condition of when to clear the over-sized data-structures was based on the "assignmentRequestCounter" send by the Controller (which is increased on each request by any worker and not for each individual one), and not on the "numHandledAssignmentsBatches" kept by each individual worker. This would result in much earlier cleanup, relative to the number of the Workers.
This commit is contained in:
Lampros Smyrnaios 2022-02-19 17:09:02 +02:00
parent 3d1faf4a8a
commit b63ad87d00
2 changed files with 29 additions and 21 deletions

View File

@ -35,9 +35,6 @@ public class PublicationsRetrieverPlugin {
public static String assignmentsBasePath; public static String assignmentsBasePath;
public static final long idUrlsToHandleBeforeClearingDomainAndPathTrackingData = 10_000_000;
public static final long idUrlsToHandleBeforeClearingDuplicateUrlsData = 1_000_000;
public PublicationsRetrieverPlugin() { public PublicationsRetrieverPlugin() {
// Specify some configurations // Specify some configurations
@ -57,7 +54,7 @@ public class PublicationsRetrieverPlugin {
LoaderAndChecker.setCouldRetryRegex(); LoaderAndChecker.setCouldRetryRegex();
PublicationsRetriever.threadsMultiplier = 4; PublicationsRetriever.threadsMultiplier = 4;
int workerThreadsCount = Runtime.getRuntime().availableProcessors() * PublicationsRetriever.threadsMultiplier; int workerThreadsCount = (Runtime.getRuntime().availableProcessors() * PublicationsRetriever.threadsMultiplier);
logger.info("Use " + workerThreadsCount + " worker-threads."); logger.info("Use " + workerThreadsCount + " worker-threads.");
PublicationsRetriever.executor = Executors.newFixedThreadPool(workerThreadsCount); PublicationsRetriever.executor = Executors.newFixedThreadPool(workerThreadsCount);
} }
@ -151,15 +148,6 @@ public class PublicationsRetrieverPlugin {
UrlUtils.docOrDatasetUrlsWithIDs.clear(); // This HashTable is useful only for a single assignments-batch. UrlUtils.docOrDatasetUrlsWithIDs.clear(); // This HashTable is useful only for a single assignments-batch.
// In the next batch, the previously stored files might have been already uploaded by the Controller and deleted by the worker. Also, they will be stored in a different directory anyway. // In the next batch, the previously stored files might have been already uploaded by the Controller and deleted by the worker. Also, they will be stored in a different directory anyway.
// Every time we reach a "limit" of handled id-url clear some data-structures of the underlying "PublicationsRetriever" program.
// This helps with reducing the memory consumption over the period of weeks or months, and also give a 2nd chance to some domains which may be blocked due to a connectivity issues, but after a month they may be fine.
long idUrlPairsHandled = (assignmentRequestCounter * UrlsWorkerApplication.maxAssignmentsLimitPerBatch);
if ( idUrlPairsHandled >= idUrlsToHandleBeforeClearingDuplicateUrlsData )
UrlUtils.duplicateUrls.clear();
if ( idUrlPairsHandled >= idUrlsToHandleBeforeClearingDomainAndPathTrackingData )
GenericUtils.clearDomainAndPathTrackingData();
} }

View File

@ -2,6 +2,8 @@ package eu.openaire.urls_worker.util;
import com.google.common.collect.HashMultimap; import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap; import com.google.common.collect.Multimap;
import eu.openaire.publications_retriever.util.url.GenericUtils;
import eu.openaire.publications_retriever.util.url.UrlUtils;
import eu.openaire.urls_worker.UrlsWorkerApplication; import eu.openaire.urls_worker.UrlsWorkerApplication;
import eu.openaire.urls_worker.models.Assignment; import eu.openaire.urls_worker.models.Assignment;
import eu.openaire.urls_worker.models.UrlReport; import eu.openaire.urls_worker.models.UrlReport;
@ -40,6 +42,9 @@ public class AssignmentsHandler {
public static long numHandledAssignmentsBatches = 0; // No need to be synchronized. public static long numHandledAssignmentsBatches = 0; // No need to be synchronized.
public static final long idUrlsToHandleBeforeClearingDomainAndPathTrackingData = 10_000_000;
public static final long idUrlsToHandleBeforeClearingDuplicateUrlsData = 1_000_000;
public AssignmentsHandler() public AssignmentsHandler()
{ {
@ -51,6 +56,9 @@ public class AssignmentsHandler {
public static AssignmentsRequest requestAssignments() public static AssignmentsRequest requestAssignments()
{ {
// Go request the assignments, the worker is busy. This may take more time than the scheduled time to request new assignments.
isAvailableForWork = false;
String requestUrl = UrlsWorkerApplication.controllerBaseUrl + (askForTest ? "test/" : "") + "urls?workerId=" + UrlsWorkerApplication.workerId + "&workerAssignmentsLimit=" + UrlsWorkerApplication.maxAssignmentsLimitPerBatch; String requestUrl = UrlsWorkerApplication.controllerBaseUrl + (askForTest ? "test/" : "") + "urls?workerId=" + UrlsWorkerApplication.workerId + "&workerAssignmentsLimit=" + UrlsWorkerApplication.maxAssignmentsLimitPerBatch;
logger.info("Going to request assignments from the controller-server: " + requestUrl); logger.info("Going to request assignments from the controller-server: " + requestUrl);
@ -70,27 +78,28 @@ public class AssignmentsHandler {
public static void handleAssignments() public static void handleAssignments()
{ {
AssignmentsRequest assignmentsRequest = requestAssignments(); AssignmentsRequest assignmentsRequest = requestAssignments();
if ( assignmentsRequest == null ) if ( assignmentsRequest == null ) {
isAvailableForWork = true; // Make sure it is "true" both in case of a "RestClientException" and in case of a "null" object.
return; return;
}
Long assignmentRequestCounter = assignmentsRequest.getAssignmentsCounter(); Long assignmentRequestCounter = assignmentsRequest.getAssignmentsCounter();
List<Assignment> assignments = assignmentsRequest.getAssignments(); List<Assignment> assignments = assignmentsRequest.getAssignments();
if ( assignments == null ) { if ( assignments == null ) {
logger.warn("The assignments were found to be null for assignmentRequestCounter = " + assignmentRequestCounter); logger.warn("The assignments were found to be null for assignmentRequestCounter = " + assignmentRequestCounter);
isAvailableForWork = true;
return; return;
} }
int assignmentsSize = assignments.size(); int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 ) { if ( assignmentsSize == 0 ) {
logger.warn("The assignmentsSize was < 0 > for assignmentRequestCounter = " + assignmentRequestCounter); logger.warn("The assignmentsSize was < 0 > for assignmentRequestCounter = " + assignmentRequestCounter);
isAvailableForWork = true;
return; return;
} }
logger.info("AssignmentRequest < " + assignmentRequestCounter + " > was received and it's ready to be processed. It contains " + assignmentsSize + " tasks."); logger.info("AssignmentRequest < " + assignmentRequestCounter + " > was received and it's ready to be processed. It contains " + assignmentsSize + " tasks.");
// Start handling the assignments, the worker is busy.
isAvailableForWork = false;
// Iterate over the tasks and add each task in its own list depending on the DATASOURCE in order to decide which plugin to use later. // Iterate over the tasks and add each task in its own list depending on the DATASOURCE in order to decide which plugin to use later.
for ( Assignment assignment : assignments ) { for ( Assignment assignment : assignments ) {
@ -127,17 +136,28 @@ public class AssignmentsHandler {
numHandledAssignmentsBatches ++; // This is used later to stop this app, when a user-defined upper limit is reached. numHandledAssignmentsBatches ++; // This is used later to stop this app, when a user-defined upper limit is reached.
isAvailableForWork = true; // State this after posting, to avoid breaking the "UrlReports" in the current or the next run. // Every time we reach a "limit" of handled id-url clear some data-structures of the underlying "PublicationsRetriever" program.
// Also, since the worker has limited resources, it's better to finish sending the full-texts first and then request a new batch of assignments. // This helps with reducing the memory consumption over the period of weeks or months, and also give a 2nd chance to some domains which may be blocked due to a connectivity issues, but after a month they may be fine.
long idUrlPairsHandled = (numHandledAssignmentsBatches * UrlsWorkerApplication.maxAssignmentsLimitPerBatch);
if ( idUrlPairsHandled >= idUrlsToHandleBeforeClearingDuplicateUrlsData )
UrlUtils.duplicateUrls.clear();
if ( idUrlPairsHandled >= idUrlsToHandleBeforeClearingDomainAndPathTrackingData )
GenericUtils.clearDomainAndPathTrackingData();
// Note: Cannot call this method here retrospectively, as if it runs 100s of times, the memory-stack may break..
// The scheduler will handle calling it every 15 mins, in case the Worker is available for work..
if ( AssignmentsHandler.numHandledAssignmentsBatches == UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart ) if ( AssignmentsHandler.numHandledAssignmentsBatches == UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart )
{ {
logger.info("The maximum assignments-batches (" + UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart + ") to be handled was reached! Shut down, in order for the external Linux-service to restart on its own.."); logger.info("The maximum assignments-batches (" + UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart + ") to be handled was reached! Shut down, in order for the external Linux-service to restart on its own..");
UrlsWorkerApplication.gentleAppShutdown(); UrlsWorkerApplication.gentleAppShutdown();
// The "gentleAppShutdown()" will exit the app, so the "isAvailableForWork" will not be set below.
} }
isAvailableForWork = true; // State this after posting, to avoid breaking the "UrlReports" in the current or the next run.
// Also, since the worker has limited resources, it's better to finish sending the full-texts first and then request a new batch of assignments.
// Note: Cannot call this method, here, retrospectively, as if it runs 100s of times, the memory-stack may break..
// The scheduler will handle calling it every 15 mins, in case the Worker is available for work..
} }