UrlsWorker/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java

134 lines
5.1 KiB
Java

package eu.openaire.urls_worker.util;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import eu.openaire.urls_worker.models.Assignment;
import eu.openaire.urls_worker.models.Task;
import eu.openaire.urls_worker.models.UrlReport;
import eu.openaire.urls_worker.payloads.requests.AssignmentRequest;
import eu.openaire.urls_worker.payloads.responces.WorkerReport;
import eu.openaire.urls_worker.plugins.publicationsRetriever.PublicationsRetrieverPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
import java.util.*;
public class AssignmentHandler {
private static final Logger logger = LoggerFactory.getLogger(AssignmentHandler.class);
public static boolean isAvailableForWork = true;
public static List<UrlReport> urlReports = null;
public static Assignment requestAssignment()
{
RestTemplate restTemplate = new RestTemplateBuilder().build();
String url = "http://localhost:8080/api/urls/test?workerId=" + WorkerConstants.WORKER_ID + "&tasksLimit=" + WorkerConstants.TASKS_LIMIT;
String json = null;
try {
json = restTemplate.getForObject(url, String.class);
} catch (Exception e) {
logger.error("Could not retrieve the assignment: " + e.getMessage());
return null;
}
AssignmentRequest assignmentRequest = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
assignmentRequest = objectMapper.readValue(json, AssignmentRequest.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
if ( assignmentRequest == null ) {
String errorMessage = "Could not map the json to an \"assignmentRequest\"!";
System.err.println(errorMessage);
logger.error(errorMessage);
System.exit(1);
}
//logger.debug(assignmentRequest.toString()); // DEBUG!
Assignment assignment = assignmentRequest.getAssignment();
logger.info("Assignment with id < " + assignment.getAssignmentId() + " > was received and it's ready to be processed. It contains " + assignment.getTasks().size() + " tasks.");
// TODO - Maybe create a HashSet with these IDs. It may be useful for the Worker to know and report which assignments (and how many) it has processed.
return assignment;
}
public static void handleAssignment()
{
Assignment assignment = requestAssignment();
if ( assignment == null ) {
logger.warn("The assignment was found to be null!");
return;
}
// Start handling the assignment, the worker is busy.
isAvailableForWork = false;
List<Task> tasks = assignment.getTasks();
// Iterate over the tasks and add each task in it's own list depending on the DATASOURCE in order to decide which plugin to use later.
Multimap<String, Task> tasksForPlugins = HashMultimap.create();
for ( Task task : tasks ) {
// Add each task in its own HashSet.
tasksForPlugins.put(task.getDatasource().getId(), task);
}
urlReports = new ArrayList<>(tasks.size()); // Define the new UrlReport-list. It is reinitialized each time.
// TODO - Decide which tasks run with what plugin (depending on their datasource).
// First run -in parallel- the tasks which require some specific plugin.
// Then run the remaining tasks in the generic plugin (which handles parallelism itself).
// For now, let's just run all tasks in the generic plugin.
try {
PublicationsRetrieverPlugin.processTasks(assignment.getAssignmentId(), tasksForPlugins.values());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
postWorkerReport(assignment.getAssignmentId());
isAvailableForWork = true;
}
public static boolean postWorkerReport(int assignmentId)
{
RestTemplate restTemplate = new RestTemplateBuilder().build();
String url = "http://localhost:8080/api/urls/addWorkerReport";
try {
ResponseEntity responseEntity = restTemplate.postForEntity(url, new WorkerReport(WorkerConstants.WORKER_ID, assignmentId, urlReports), String.class);
int responseCode = responseEntity.getStatusCode().value();
if ( responseCode != HttpStatus.OK.value() ) {
logger.error("Connection problem with the submission of the WorkerReport of assignment_" + assignmentId + " to the Controller. Error-code was: " + responseCode);
return false;
}
} catch (Exception e) {
logger.error("Error when submitting the WorkerReport of assignment_" + assignmentId + " to the Controller: ", e);
return false;
}
return true;
}
}