package eu.openaire.urls_worker.util; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import eu.openaire.urls_worker.models.Assignment; import eu.openaire.urls_worker.models.UrlReport; import eu.openaire.urls_worker.payloads.requests.AssignmentRequest; import eu.openaire.urls_worker.payloads.responces.WorkerReport; import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.client.RestTemplate; import java.util.*; public class AssignmentHandler { private static final Logger logger = LoggerFactory.getLogger(AssignmentHandler.class); public static boolean isAvailableForWork = true; public static List urlReports = null; public static AssignmentRequest requestAssignments() { RestTemplate restTemplate = new RestTemplateBuilder().build(); String url = "http://localhost:1880/api/urls/test?workerId=" + WorkerConstants.WORKER_ID + "&workerAssignmentsLimit=" + WorkerConstants.ASSIGNMENTS_LIMIT; String json = null; try { json = restTemplate.getForObject(url, String.class); } catch (Exception e) { logger.error("Could not retrieve the assignment: " + e.getMessage()); return null; } AssignmentRequest assignmentRequest = null; ObjectMapper objectMapper = new ObjectMapper(); try { assignmentRequest = objectMapper.readValue(json, AssignmentRequest.class); } catch (JsonProcessingException e) { e.printStackTrace(); } if ( assignmentRequest == null ) { String errorMessage = "Could not map the json to an \"assignmentRequest\"!"; System.err.println(errorMessage); logger.error(errorMessage); System.exit(1); } //logger.debug(assignmentRequest.toString()); // DEBUG! logger.info("AssignmentRequest < " + assignmentRequest.getAssignmentCounter() + " > was received and it's ready to be processed. It contains " + assignmentRequest.getAssignments().size() + " tasks."); // TODO - Maybe create a HashSet with these IDs. It may be useful for the Worker to know and report which assignments (and how many) it has processed. return assignmentRequest; } public static void handleAssignments() { AssignmentRequest assignmentRequest = requestAssignments(); Long assignmentRequestCounter = assignmentRequest.getAssignmentCounter(); List assignments = assignmentRequest.getAssignments(); if ( assignments == null ) { logger.warn("The assignments were found to be null for assignmentRequestCounter = " + assignmentRequestCounter); return; } // Start handling the assignments, the worker is busy. isAvailableForWork = false; // Iterate over the tasks and add each task in it's own list depending on the DATASOURCE in order to decide which plugin to use later. Multimap assignmentsForPlugins = HashMultimap.create(); for ( Assignment assignment : assignments ) { // Add each task in its own HashSet. assignmentsForPlugins.put(assignment.getDatasource().getId(), assignment); } urlReports = new ArrayList<>(assignments.size()); // Define the new UrlReport-list. It is reinitialized each time. // TODO - Decide which tasks run with what plugin (depending on their datasource). // First run -in parallel- the tasks which require some specific plugin. // Then run the remaining tasks in the generic plugin (which handles parallelism itself). // For now, let's just run all tasks in the generic plugin. try { PublicationsRetrieverPlugin.processAssginmets(assignmentRequestCounter, assignmentsForPlugins.values()); } catch (Exception e) { logger.error(e.getMessage(), e); } postWorkerReport(assignmentRequestCounter); isAvailableForWork = true; } public static boolean postWorkerReport(Long assignmentRequestCounter) { RestTemplate restTemplate = new RestTemplateBuilder().build(); String url = "http://localhost:1880/api/urls/addWorkerReport"; try { ResponseEntity responseEntity = restTemplate.postForEntity(url, new WorkerReport(WorkerConstants.WORKER_ID, assignmentRequestCounter, urlReports), String.class); int responseCode = responseEntity.getStatusCode().value(); if ( responseCode != HttpStatus.OK.value() ) { logger.error("Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode); return false; } } catch (Exception e) { logger.error("Error when submitting the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller: ", e); return false; } return true; } }