99 lines
3.5 KiB
Java
99 lines
3.5 KiB
Java
package eu.openaire.urls_worker.util;
|
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import com.google.common.collect.HashMultimap;
|
|
import com.google.common.collect.Multimap;
|
|
import eu.openaire.urls_worker.models.Assignment;
|
|
import eu.openaire.urls_worker.models.Task;
|
|
import eu.openaire.urls_worker.payloads.requests.AssignmentRequest;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.boot.web.client.RestTemplateBuilder;
|
|
import org.springframework.web.client.RestTemplate;
|
|
|
|
import java.util.*;
|
|
|
|
|
|
public class AssignmentHandler {
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(AssignmentHandler.class);
|
|
|
|
public static boolean isAvailableForWork = false;
|
|
|
|
public static Assignment requestAssignment()
|
|
{
|
|
RestTemplate restTemplate = new RestTemplateBuilder().build();
|
|
String url = "http://localhost:8080/api/urls/test?workerId=" + WorkerConstants.WORKER_ID + "&tasksLimit=" + WorkerConstants.TASKS_LIMIT;
|
|
String json = null;
|
|
try {
|
|
json = restTemplate.getForObject(url, String.class);
|
|
} catch (Exception e) {
|
|
logger.error("Could not retrieve the assignment: " + e.getMessage());
|
|
return null;
|
|
}
|
|
|
|
AssignmentRequest assignmentRequest = null;
|
|
ObjectMapper objectMapper = new ObjectMapper();
|
|
try {
|
|
assignmentRequest = objectMapper.readValue(json, AssignmentRequest.class);
|
|
} catch (JsonProcessingException e) {
|
|
e.printStackTrace();
|
|
}
|
|
|
|
if ( assignmentRequest == null ) {
|
|
String errorMessage = "Could not map the json to an \"assignmentRequest\"!";
|
|
System.err.println(errorMessage);
|
|
logger.error(errorMessage);
|
|
System.exit(1);
|
|
}
|
|
|
|
//logger.debug(assignmentRequest.toString()); // DEBUG!
|
|
|
|
Assignment assignment = assignmentRequest.getAssignment();
|
|
logger.info("Assignment with id < " + assignment.getAssignmentId() + " > was received and it's ready to be processed. It contains " + assignment.getTasks().size() + " tasks.");
|
|
// TODO - Maybe create a HashSet with these IDs. It may be useful for the Worker to know and report which assignments (and how many) it has processed.
|
|
|
|
return assignment;
|
|
}
|
|
|
|
|
|
public static void handleAssignment()
|
|
{
|
|
Assignment assignment = requestAssignment();
|
|
if ( assignment == null ) {
|
|
logger.warn("The assignment was found to be null!");
|
|
return;
|
|
}
|
|
isAvailableForWork = false;
|
|
|
|
List<Task> tasks = assignment.getTasks();
|
|
|
|
// Iterate over the tasks and add each task in it's own list depending on the DATASOURCE in order to decide which plugin to use later.
|
|
|
|
Multimap<String, Task> tasksForPlugins = HashMultimap.create();
|
|
|
|
for ( Task task : tasks ) {
|
|
// Add each task in its own HashSet.
|
|
tasksForPlugins.put(task.getDatasource().getId(), task);
|
|
}
|
|
|
|
// First run -in parallel- the tasks which require some specific plugin.
|
|
|
|
// Then run the remaining tasks in the generic plugin (which handles parallelism itself).
|
|
|
|
|
|
|
|
|
|
// Check availability-endpoint using Thread-sleep here. DEBUG!
|
|
try {
|
|
Thread.sleep(30_000); // 30 seconds
|
|
} catch (InterruptedException ie) {
|
|
logger.warn("Sleeping interrupted!");
|
|
}
|
|
|
|
isAvailableForWork = true;
|
|
}
|
|
|
|
}
|