UrlsWorker/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java

190 lines
9.0 KiB
Java

package eu.openaire.urls_worker.util;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import eu.openaire.urls_worker.UrlsWorkerApplication;
import eu.openaire.urls_worker.models.Assignment;
import eu.openaire.urls_worker.models.UrlReport;
import eu.openaire.urls_worker.payloads.requests.AssignmentRequest;
import eu.openaire.urls_worker.payloads.responces.WorkerReport;
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.web.client.ResponseErrorHandler;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.time.Duration;
import java.util.*;
/**
 * Requests a batch of assignments from the Controller, hands them to the
 * {@link PublicationsRetrieverPlugin} and posts the resulting {@link WorkerReport} back to the Controller.
 * <p>
 * All state is kept in static fields. {@link #isAvailableForWork} is set to {@code false} while a batch is
 * being processed and back to {@code true} once processing has ended (successfully or not).
 */
public class AssignmentHandler {

    private static final Logger logger = LoggerFactory.getLogger(AssignmentHandler.class);

    /** {@code false} only while {@link #handleAssignments()} is processing a batch. */
    public static boolean isAvailableForWork = true;

    /** The UrlReports of the batch currently being processed. It is re-created for every batch. */
    public static List<UrlReport> urlReports = null;

    private static final boolean askForTest = false;	// Enable this only for testing.

    private static final Duration requestConnectTimeoutDuration = Duration.ofMinutes(1);	// 1 minute.

    // 60 minutes. Time to wait for the data to get transferred over the network.
    // Many workers may try to get assignments from the Controller, so each worker might have to wait some 10s of minutes for work.
    // The controller has to retrieve the data from the database, then prepare them in memory, insert them in the "assignment"-table and, finally, return them to the worker.
    private static final Duration requestReadTimeoutDuration = Duration.ofMinutes(60);

    // Re-used for every response, instead of building a new mapper on each request.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    // Written by the ResponseErrorHandler of "requestAssignments()", read right after the request returns.
    private static boolean encounteredHTTPRequestError = false;


    /**
     * Asks the Controller for the next batch of assignments.
     *
     * @return the parsed {@link AssignmentRequest}, or {@code null} if the request failed, the Controller
     *         answered with a non-200 status, the body was empty, or the body could not be mapped.
     */
    public static AssignmentRequest requestAssignments()
    {
        String requestUrl = UrlsWorkerApplication.controllerBaseUrl + "urls" + (askForTest ? "/test" : "") + "?workerId=" + UrlsWorkerApplication.workerId + "&workerAssignmentsLimit=" + WorkerConstants.ASSIGNMENTS_LIMIT;
        logger.info("Going to request assignments from the controller-server: " + requestUrl);

        RestTemplate restTemplate = new RestTemplateBuilder()
                .setConnectTimeout(requestConnectTimeoutDuration)
                .setReadTimeout(requestReadTimeoutDuration)
                .build();

        // This handler only records and logs the error; it does not throw. The caller checks the flag afterwards.
        restTemplate.setErrorHandler(new ResponseErrorHandler() {
            @Override
            public boolean hasError(ClientHttpResponse response) throws IOException {
                // Any status other than 200 is treated as an error. The flag is overwritten on every response.
                encounteredHTTPRequestError = (response.getRawStatusCode() != 200);
                return encounteredHTTPRequestError;
            }

            @Override
            public void handleError(ClientHttpResponse response) throws IOException {
                int responseCode = response.getRawStatusCode();
                String statusText = response.getStatusText();
                String additionalErrorMessage = ((!"".equals(statusText)) ? statusText : "The HTTP-response-code was: " + responseCode);
                if ( (responseCode >= 500) && (responseCode <= 599) )
                    logger.error("The Controller encountered a problem! " + additionalErrorMessage);
                else if ( (responseCode >= 400) && (responseCode <= 499) )
                    logger.error("There was a bad request to the Controller! " + additionalErrorMessage);
                else if ( responseCode != 200 )
                    logger.error("There was an HTTP-error when requesting the assignments from the Controller! " + additionalErrorMessage);
            }
        });

        encounteredHTTPRequestError = false;	// Start clean, so a value from a previous request can never leak into this one.

        String json;
        try {	// Here, the HTTP-request is executed.
            json = restTemplate.getForObject(requestUrl, String.class);
        } catch (RestClientException e) {
            logger.error("Could not retrieve the assignments!", e);
            return null;
        }

        if ( encounteredHTTPRequestError )
            return null;

        if ( json == null ) {	// An empty body would make the mapper throw an "IllegalArgumentException".
            logger.error("The Controller returned an empty response-body for the assignments-request!");
            return null;
        }

        AssignmentRequest assignmentRequest;
        try {
            assignmentRequest = objectMapper.readValue(json, AssignmentRequest.class);
        } catch (JsonProcessingException jpe) {
            logger.error("Could not process/map the json to an \"assignmentRequest\"!", jpe);
            return null;
        }

        if ( assignmentRequest == null ) {	// The mapper returns null for the json-literal "null".
            logger.error("The json of the Controller was mapped to a \"null\" assignmentRequest!");
            return null;
        }

        //logger.debug(assignmentRequest.toString());	// DEBUG!

        // The assignments-list may be missing from the json; the caller handles that case, so do not fail here.
        List<Assignment> assignments = assignmentRequest.getAssignments();
        int numOfTasks = ((assignments != null) ? assignments.size() : 0);
        logger.info("AssignmentRequest < " + assignmentRequest.getAssignmentCounter() + " > was received and it's ready to be processed. It contains " + numOfTasks + " tasks.");
        return assignmentRequest;
    }


    /**
     * Requests a batch of assignments, processes it with the generic plugin and posts the results to the Controller.
     * In "askForTest"-mode the results are only logged and never posted.
     */
    public static void handleAssignments()
    {
        AssignmentRequest assignmentRequest = requestAssignments();
        if ( assignmentRequest == null ) {
            logger.error("The \"assignmentRequest\" was \"null\"!");
            return;
        }

        Long assignmentRequestCounter = assignmentRequest.getAssignmentCounter();
        List<Assignment> assignments = assignmentRequest.getAssignments();
        if ( assignments == null ) {
            logger.warn("The assignments were found to be null for assignmentRequestCounter = " + assignmentRequestCounter);
            return;
        }

        // Start handling the assignments, the worker is busy.
        isAvailableForWork = false;

        urlReports = new ArrayList<>(assignments.size());	// Define the new UrlReport-list. It is reinitialized each time.

        List<UrlReport> reportsOfThisBatch;
        try {
            // Iterate over the tasks and add each task in its own list depending on the DATASOURCE in order to decide which plugin to use later.
            Multimap<String, Assignment> assignmentsForPlugins = HashMultimap.create();
            for ( Assignment assignment : assignments )
                assignmentsForPlugins.put(assignment.getDatasource().getId(), assignment);	// Add each task in its own HashSet.

            // TODO - Decide which tasks run with what plugin (depending on their datasource).
            //  First run -in parallel- the tasks which require some specific plugin.
            //  Then run the remaining tasks in the generic plugin (which handles parallelism itself).
            //  For now, let's just run all tasks in the generic plugin.
            PublicationsRetrieverPlugin.processAssignments(assignmentRequestCounter, assignmentsForPlugins.values());
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            // Keep a reference to the list of THIS batch before announcing availability,
            // since the next batch replaces the static "urlReports" as soon as it starts.
            reportsOfThisBatch = urlReports;
            isAvailableForWork = true;	// State this before posting, to catch the soonest next scheduled request. Also guarantees the worker never stays "busy" after a failure.
        }

        if ( askForTest ) {
            logger.debug("UrlReports:");	// DEBUG!
            for ( UrlReport urlReport : reportsOfThisBatch )
                logger.debug(urlReport.toString());
        }	// Avoid posting the results in "askForTestUrls"-mode. We don't want for test-results to be written into the database by the controller.
        else
            postWorkerReport(assignmentRequestCounter, reportsOfThisBatch);

        // Note: Cannot call this method here retrospectively, as if it runs 100s of times, the memory-stack may break..
        // The scheduler will handle calling it every half an hour, in case the Worker is available for work..
    }


    /**
     * Posts the current {@link #urlReports} to the Controller and clears that list afterwards.
     *
     * @param assignmentRequestCounter the counter of the assignments-batch the report belongs to
     * @return {@code true} if the Controller answered with HTTP 200, otherwise {@code false}
     */
    public static boolean postWorkerReport(Long assignmentRequestCounter)
    {
        return postWorkerReport(assignmentRequestCounter, urlReports);
    }


    /**
     * Posts the given reports to the Controller and clears the given list afterwards, whatever the outcome.
     *
     * @param assignmentRequestCounter the counter of the assignments-batch the report belongs to
     * @param reports the reports to post; may be {@code null}, in which case nothing is cleared
     * @return {@code true} if the Controller answered with HTTP 200, otherwise {@code false}
     */
    private static boolean postWorkerReport(Long assignmentRequestCounter, List<UrlReport> reports)
    {
        String postUrl = UrlsWorkerApplication.controllerBaseUrl + "urls/addWorkerReport";
        logger.info("Going to post the WorkerReport to the controller-server: " + postUrl);

        try {
            WorkerReport workerReport = new WorkerReport(UrlsWorkerApplication.workerId, assignmentRequestCounter, reports);
            ResponseEntity<String> responseEntity = new RestTemplateBuilder().build().postForEntity(postUrl, workerReport, String.class);
            int responseCode = responseEntity.getStatusCodeValue();
            if ( responseCode != HttpStatus.OK.value() ) {
                logger.error("Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode);
                return false;
            }
        } catch (Exception e) {
            logger.error("Error when submitting the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller: ", e);
            return false;
        } finally {
            // Release the reports of the posted batch only; never touch the list of a batch which may have started in the meantime.
            if ( reports != null )
                reports.clear();
        }
        return true;
    }
}