- Refactor the scheduling of the "handleNewAssignments()" task. Spring already waits for the previous task to finish before running the next one (unless "Async" is explicitly enabled), so the "isAvailableForWork" check had no effect (thus, the bug described in a previous commit could never have appeared). Also, the next assignments-batch is now requested immediately after the previous one finishes (instead of after 15 mins), while still handling potential continuous connection-errors.

- Avoid running the "deleteHandledAssignmentsFullTexts()" scheduled task on the application's start.
- Optimize assignment of "requestUrl".
- Make the scheduled tasks clearer by using "fixedDelay" instead of "fixedRate", to signify that the specified time is counted from the moment the previous task finishes (even though, without enabling "Async", there is no "danger" of running them in parallel).
- Code cleanup.
This commit is contained in:
Lampros Smyrnaios 2022-02-21 12:48:21 +02:00
parent 0d2f0b8b01
commit edbf6461d5
5 changed files with 18 additions and 54 deletions

View File

@ -1,6 +1,5 @@
package eu.openaire.urls_worker.components;
import eu.openaire.urls_worker.UrlsWorkerApplication;
import eu.openaire.urls_worker.controllers.FullTextsController;
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
import eu.openaire.urls_worker.util.AssignmentsHandler;
@ -13,7 +12,6 @@ import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Set;
@ -25,22 +23,24 @@ public class ScheduledTasks {
private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
//@Scheduled(fixedRate = 600_000) // Every 10 mins: 600_000
/** Logs a liveness message that contains the current time, as formatted by the class-level "dateFormat" ("HH:mm:ss"). */
public void reportCurrentTime() {
	String currentTime = dateFormat.format(new Date());
	logger.info("Server is live! Time is now {}", currentTime);
}
@Scheduled(fixedRate = 900_000) // Every 15 mins: 900_000
@Scheduled(fixedDelay = 1) // Request the next batch immediately after the last one finishes.
public void handleNewAssignments() {
if ( AssignmentsHandler.isAvailableForWork )
AssignmentsHandler.handleAssignments();
else {
//logger.debug("The worker is not available for work at the moment.."); // JUST FOR DEBUG!
// Back off after a failed assignments-request: this task is re-scheduled almost immediately after it returns,
// so sleeping here stalls the retries and gives the Controller time to recover.
if ( AssignmentsHandler.hadConnectionErrorOnRequest ) {
	try {
		Thread.sleep(900_000); // 15 mins.
	} catch (InterruptedException ie) {
		logger.warn("Sleeping was interrupted!");
		Thread.currentThread().interrupt(); // Restore the interrupt-status, so the code which owns this thread can still observe the interruption.
		return; // Do not start requesting and processing a whole new batch on an interrupted thread.
	} finally {
		AssignmentsHandler.hadConnectionErrorOnRequest = false; // Reset in every case, so a single error causes only a single back-off.
	}
}
AssignmentsHandler.handleAssignments();
}
@Scheduled(fixedRate = 43_200_000) // Every 12 hours.
@Scheduled(fixedDelay = 43_200_000, initialDelay = 43_200_000) // Every 12 hours, after 12 hours from the start of this app.
public static void deleteHandledAssignmentsFullTexts()
{
Set<Map.Entry<Long, Boolean>> entrySet = FullTextsController.assignmentsNumsHandledAndLocallyDeleted.entrySet();
@ -73,11 +73,4 @@ public class ScheduledTasks {
}
}
//@Scheduled(fixedRate = 20_000) // Every 20 secs.
/** Passes a fixed Zenodo record-url to the connection-test of the "PublicationsRetrieverPlugin". */
public void testUrlConnection() {
	PublicationsRetrieverPlugin.connectWithUrlTest("https://zenodo.org/record/1145726");
}
}

View File

@ -1,11 +1,7 @@
package eu.openaire.urls_worker.controllers;
import eu.openaire.urls_worker.UrlsWorkerApplication;
import eu.openaire.urls_worker.payloads.responces.WorkerResponse;
import eu.openaire.urls_worker.util.AssignmentsHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
@ -34,21 +30,6 @@ public class GeneralController {
}
/**
 * Reports whether this worker is currently flagged as available for a new assignment.
 *
 * @return HTTP 200 with a "WorkerResponse" (built from the worker-ID and the max-assignments-limit per batch) when the worker is available,
 *         otherwise HTTP 503 (Service Unavailable) with no body.
 */
@GetMapping("isAvailableForWork")
public ResponseEntity<?> isWorkerAvailableForWork() {
	logger.info("Received an \"isWorkerAvailableForWork\" request.");

	// Handle the "busy" case first, so the "available" path reads straight through.
	if ( !AssignmentsHandler.isAvailableForWork ) {
		logger.info("The worker is busy with another assignment.");
		return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
	}

	logger.info("The worker is available for an assignment.");
	WorkerResponse workerResponse = new WorkerResponse(UrlsWorkerApplication.workerId, UrlsWorkerApplication.maxAssignmentsLimitPerBatch);
	return ResponseEntity.status(200).body(workerResponse);
}
@GetMapping("getHandledAssignmentsCounts")
public ResponseEntity<?> getHandledAssignmentsCounts()
{

View File

@ -6,7 +6,6 @@ import eu.openaire.publications_retriever.util.file.FileUtils;
import eu.openaire.publications_retriever.util.http.ConnSupportUtils;
import eu.openaire.publications_retriever.util.http.HttpConnUtils;
import eu.openaire.publications_retriever.util.url.DataToBeLogged;
import eu.openaire.publications_retriever.util.url.GenericUtils;
import eu.openaire.publications_retriever.util.url.LoaderAndChecker;
import eu.openaire.publications_retriever.util.url.UrlUtils;
import eu.openaire.urls_worker.UrlsWorkerApplication;

View File

@ -28,17 +28,18 @@ public class AssignmentsHandler {
// Shared static state and configuration of the "AssignmentsHandler".
private static final Logger logger = LoggerFactory.getLogger(AssignmentsHandler.class);
public static boolean isAvailableForWork = true;
public static List<UrlReport> urlReports = null;
private static final int expectedDatasourcesPerRequest = 1400; // Per 10_000 assignments.
public static Multimap<String, Assignment> assignmentsForPlugins = null;
private static final boolean askForTest = false; // Enable this only for testing. When "true", the "test/" prefix is added to the request-url.
// The url used to request assignments from the Controller. It is built from the Controller's base-url, the worker-ID and the max-assignments-limit per batch.
private static String requestUrl;
private static final Duration requestConnectTimeoutDuration = Duration.ofMinutes(1); // 1 minute.
private static final Duration requestReadTimeoutDuration = Duration.ofMinutes(60); // 60 minutes. Time to wait for the data to get transferred over the network. Many workers may try to get assignments from the Controller, so each worker might have to wait some 10s of minutes for work.
// The controller has to retrieve the data from the database, then prepare them in memory, insert them in the "assignment"-table and, finally, return them to the worker.
public static final RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(requestConnectTimeoutDuration).setReadTimeout(requestReadTimeoutDuration).build();
// Set to "true" when requesting the assignments fails with a "RestClientException". The scheduled task reads it, in order to back off before the next request, and then resets it.
public static boolean hadConnectionErrorOnRequest = false;
public static long numHandledAssignmentsBatches = 0; // No need to be synchronized.
@ -51,12 +52,12 @@ public class AssignmentsHandler {
urlReports = new ArrayList<>(UrlsWorkerApplication.maxAssignmentsLimitPerBatch);
int expectedAssignmentsPerDatasource = (UrlsWorkerApplication.maxAssignmentsLimitPerBatch / expectedDatasourcesPerRequest);
assignmentsForPlugins = HashMultimap.create(expectedDatasourcesPerRequest, expectedAssignmentsPerDatasource);
requestUrl = UrlsWorkerApplication.controllerBaseUrl + (askForTest ? "test/" : "") + "urls?workerId=" + UrlsWorkerApplication.workerId + "&workerAssignmentsLimit=" + UrlsWorkerApplication.maxAssignmentsLimitPerBatch;
}
public static AssignmentsRequest requestAssignments()
{
String requestUrl = UrlsWorkerApplication.controllerBaseUrl + (askForTest ? "test/" : "") + "urls?workerId=" + UrlsWorkerApplication.workerId + "&workerAssignmentsLimit=" + UrlsWorkerApplication.maxAssignmentsLimitPerBatch;
logger.info("Going to request assignments from the controller-server: " + requestUrl);
AssignmentsRequest assignmentRequest = null;
@ -64,6 +65,7 @@ public class AssignmentsHandler {
assignmentRequest = restTemplate.getForObject(requestUrl, AssignmentsRequest.class);
} catch (RestClientException rce) {
logger.error("Could not retrieve the assignments!\n" + rce.getMessage()); // It shows the response body (after Spring v.2.5.6).
hadConnectionErrorOnRequest = true;
return null;
}
@ -74,27 +76,20 @@ public class AssignmentsHandler {
public static void handleAssignments()
{
// Go request the assignments, the worker is busy. This may take more time than the scheduled time to request new assignments.
isAvailableForWork = false;
AssignmentsRequest assignmentsRequest = requestAssignments();
if ( assignmentsRequest == null ) {
isAvailableForWork = true; // Make sure it is "true" both in case of a "RestClientException" and in case of a "null" object.
if ( assignmentsRequest == null )
return;
}
Long assignmentRequestCounter = assignmentsRequest.getAssignmentsCounter();
List<Assignment> assignments = assignmentsRequest.getAssignments();
if ( assignments == null ) {
logger.warn("The assignments were found to be null for assignmentRequestCounter = " + assignmentRequestCounter);
isAvailableForWork = true;
return;
}
int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 ) {
logger.warn("The assignmentsSize was < 0 > for assignmentRequestCounter = " + assignmentRequestCounter);
isAvailableForWork = true;
return;
}
@ -145,7 +140,6 @@ public class AssignmentsHandler {
if ( idUrlPairsHandled >= idUrlsToHandleBeforeClearingDomainAndPathTrackingData )
GenericUtils.clearDomainAndPathTrackingData();
if ( AssignmentsHandler.numHandledAssignmentsBatches == UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart )
{
logger.info("The maximum assignments-batches (" + UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart + ") to be handled was reached! Shut down, in order for the external Linux-service to restart on its own..");
@ -153,9 +147,6 @@ public class AssignmentsHandler {
// The "gentleAppShutdown()" will exit the app, so the "isAvailableForWork" will not be set below.
}
isAvailableForWork = true; // State this after posting, to avoid breaking the "UrlReports" in the current or the next run.
// Also, since the worker has limited resources, it's better to finish sending the full-texts first and then request a new batch of assignments.
// Note: Cannot call this method, here, retrospectively, as if it runs 100s of times, the memory-stack may break..
// The scheduler will handle calling it every 15 mins, in case the Worker is available for work..
}