You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
UrlsWorker/src/main/java/eu/openaire/urls_worker/util/AssignmentsHandler.java

182 lines
9.4 KiB
Java

package eu.openaire.urls_worker.util;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import eu.openaire.urls_worker.UrlsWorkerApplication;
import eu.openaire.urls_worker.components.ScheduledTasks;
import eu.openaire.urls_worker.models.Assignment;
import eu.openaire.urls_worker.models.UrlReport;
import eu.openaire.urls_worker.payloads.requests.AssignmentsRequest;
import eu.openaire.urls_worker.payloads.responces.WorkerReport;
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class AssignmentsHandler {
private static final Logger logger = LoggerFactory.getLogger(AssignmentsHandler.class);
public static boolean isAvailableForWork = true;
public static List<UrlReport> urlReports = null;
private static final int expectedDatasourcesPerRequest = 1400; // Per 10_000 assignments.
public static Multimap<String, Assignment> assignmentsForPlugins = null;
private static final boolean askForTest = false; // Enable this only for testing.
private static final Duration requestConnectTimeoutDuration = Duration.ofMinutes(1); // 1 minute.
private static final Duration requestReadTimeoutDuration = Duration.ofMinutes(60); // 60 minutes. Time to wait for the data to get transferred over the network. Many workers may try to get assignments from the Worker, so each worker might have to wait some 10s of minutes for work.
// The controller has to retrieve the data from the database, then prepare them in memory, insert them in the "assignment"-table and, finally, return them to the worker.
public static final RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(requestConnectTimeoutDuration).setReadTimeout(requestReadTimeoutDuration).build();
public static long numHandledAssignmentsBatches = 0; // No need to be synchronized.
public AssignmentsHandler()
{
urlReports = new ArrayList<>(UrlsWorkerApplication.maxAssignmentsLimitPerBatch);
int expectedAssignmentsPerDatasource = (UrlsWorkerApplication.maxAssignmentsLimitPerBatch / expectedDatasourcesPerRequest);
assignmentsForPlugins = HashMultimap.create(expectedDatasourcesPerRequest, expectedAssignmentsPerDatasource);
}
public static AssignmentsRequest requestAssignments()
{
String requestUrl = UrlsWorkerApplication.controllerBaseUrl + "urls" + (askForTest ? "/test" : "") + "?workerId=" + UrlsWorkerApplication.workerId + "&workerAssignmentsLimit=" + UrlsWorkerApplication.maxAssignmentsLimitPerBatch;
logger.info("Going to request assignments from the controller-server: " + requestUrl);
AssignmentsRequest assignmentRequest = null;
try { // Here, the HTTP-request is executed.
assignmentRequest = restTemplate.getForObject(requestUrl, AssignmentsRequest.class);
} catch (RestClientException rce) {
logger.error("Could not retrieve the assignments!\n" + rce.getMessage()); // It shows the response body (after Spring v.2.5.6).
return null;
}
//logger.debug(assignmentRequest.toString()); // DEBUG!
return assignmentRequest;
}
public static void handleAssignments()
{
AssignmentsRequest assignmentsRequest = requestAssignments();
if ( assignmentsRequest == null )
return;
Long assignmentRequestCounter = assignmentsRequest.getAssignmentsCounter();
List<Assignment> assignments = assignmentsRequest.getAssignments();
if ( assignments == null ) {
logger.warn("The assignments were found to be null for assignmentRequestCounter = " + assignmentRequestCounter);
return;
}
int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 ) {
logger.warn("The assignmentsSize was < 0 > for assignmentRequestCounter = " + assignmentRequestCounter);
return;
}
logger.info("AssignmentRequest < " + assignmentRequestCounter + " > was received and it's ready to be processed. It contains " + assignmentsSize + " tasks.");
// Start handling the assignments, the worker is busy.
isAvailableForWork = false;
// Iterate over the tasks and add each task in its own list depending on the DATASOURCE in order to decide which plugin to use later.
for ( Assignment assignment : assignments ) {
// Add each task in its own HashSet.
try {
assignmentsForPlugins.put(assignment.getDatasource().getId(), assignment);
} catch (NullPointerException npe) {
logger.warn("An NPE was thrown when splitting the assignments based on the datasource-types. The assignment was: " + assignment); // Do not use "assignment.toString()", it may cause an NPE.
}
}
//countDatasourcesAndRecords(assignmentsSize); // Only for DEBUG! Keep it commented in normal run.
// TODO - Decide which tasks run with what plugin (depending on their datasource).
// First run -in parallel- the tasks which require some specific plugin.
// Then run the remaining tasks in the generic plugin (which handles parallelism itself).
// For now, let's just run all tasks in the generic plugin.
try {
PublicationsRetrieverPlugin.processAssignments(assignmentRequestCounter, assignmentsForPlugins.values());
} catch (Exception e) {
logger.error("Exception when processing the assignments_" + assignmentRequestCounter, e);
} // In this case, we will either have an empty WorkerReport or a half-filled one. Either way, we want to report back to the Controller.
if ( askForTest ) {
logger.debug("UrlReports:"); // DEBUG!
for ( UrlReport urlReport : urlReports )
logger.debug(urlReport.toString());
} // Avoid posting the results in "askForTestUrls"-mode. We don't want for test-results to be written into the database by the controller.
else
postWorkerReport(assignmentRequestCounter);
numHandledAssignmentsBatches ++; // This is used later to stop this app, when a user-defined upper limit is reached.
isAvailableForWork = true; // State this after posting, to avoid breaking the "UrlReports" in the current or the next run.
// Also, since the worker has limited resources, it's better to finish sending the full-texts first and then request a new batch of assignments.
// Note: Cannot call this method here retrospectively, as if it runs 100s of times, the memory-stack may break..
// The scheduler will handle calling it every 15 mins, in case the Worker is available for work..
if ( AssignmentsHandler.numHandledAssignmentsBatches == UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart )
{
logger.info("The maximum assignments-batches (" + UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart + ") to be handled was reached! Shut down, in order for the external Linux-service to restart on its own..");
UrlsWorkerApplication.gentleAppShutdown();
}
}
public static boolean postWorkerReport(Long assignmentRequestCounter)
{
String postUrl = UrlsWorkerApplication.controllerBaseUrl + "urls/addWorkerReport";
logger.info("Going to post the WorkerReport of assignment_" + assignmentRequestCounter + " to the controller-server: " + postUrl);
try {
ResponseEntity<String> responseEntity = restTemplate.postForEntity(postUrl, new WorkerReport(UrlsWorkerApplication.workerId, assignmentRequestCounter, urlReports), String.class);
int responseCode = responseEntity.getStatusCodeValue();
if ( responseCode == HttpStatus.OK.value() ) {
logger.info("The submission of the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller, and the full-text delivering, were successful!");
return true;
} else {
logger.error("HTTP-Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode);
return false;
}
} catch (Exception e) {
logger.error("Error when submitting the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller: ", e);
return false;
} finally {
urlReports.clear(); // Reset, without de-allocating.
assignmentsForPlugins.clear();
}
}
public static void countDatasourcesAndRecords(int assignmentsSize)
{
Set<String> datasources = assignmentsForPlugins.keySet();
int numDatasources = datasources.size();
logger.debug("Num of datasources: " + numDatasources);
for ( String datasource : datasources ) {
logger.debug("Num of records for datasource \"" + datasource + "\" is: " + assignmentsForPlugins.get(datasource).size() );
}
logger.debug("Average num of records per datasource: " + (assignmentsSize / numDatasources));
}
}