2021-05-20 02:28:48 +02:00
package eu.openaire.urls_worker.util ;
import com.fasterxml.jackson.core.JsonProcessingException ;
import com.fasterxml.jackson.databind.ObjectMapper ;
import com.google.common.collect.HashMultimap ;
import com.google.common.collect.Multimap ;
import eu.openaire.urls_worker.models.Assignment ;
import eu.openaire.urls_worker.models.Task ;
import eu.openaire.urls_worker.payloads.requests.AssignmentRequest ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.boot.web.client.RestTemplateBuilder ;
import org.springframework.web.client.RestTemplate ;
import java.util.* ;
public class AssignmentHandler {
private static final Logger logger = LoggerFactory . getLogger ( AssignmentHandler . class ) ;
2021-06-11 12:44:33 +02:00
public static boolean isAvailableForWork = true ;
2021-05-20 02:28:48 +02:00
2021-06-10 13:29:20 +02:00
public static Assignment requestAssignment ( )
{
2021-05-20 02:28:48 +02:00
RestTemplate restTemplate = new RestTemplateBuilder ( ) . build ( ) ;
String url = " http://localhost:8080/api/urls/test?workerId= " + WorkerConstants . WORKER_ID + " &tasksLimit= " + WorkerConstants . TASKS_LIMIT ;
String json = null ;
try {
json = restTemplate . getForObject ( url , String . class ) ;
} catch ( Exception e ) {
logger . error ( " Could not retrieve the assignment: " + e . getMessage ( ) ) ;
return null ;
}
AssignmentRequest assignmentRequest = null ;
ObjectMapper objectMapper = new ObjectMapper ( ) ;
try {
assignmentRequest = objectMapper . readValue ( json , AssignmentRequest . class ) ;
} catch ( JsonProcessingException e ) {
e . printStackTrace ( ) ;
}
if ( assignmentRequest = = null ) {
String errorMessage = " Could not map the json to an \" assignmentRequest \" ! " ;
System . err . println ( errorMessage ) ;
logger . error ( errorMessage ) ;
System . exit ( 1 ) ;
}
2021-06-09 04:45:07 +02:00
//logger.debug(assignmentRequest.toString()); // DEBUG!
2021-05-20 02:28:48 +02:00
2021-06-09 04:45:07 +02:00
Assignment assignment = assignmentRequest . getAssignment ( ) ;
2021-06-10 13:29:20 +02:00
logger . info ( " Assignment with id < " + assignment . getAssignmentId ( ) + " > was received and it's ready to be processed. It contains " + assignment . getTasks ( ) . size ( ) + " tasks. " ) ;
2021-06-09 04:45:07 +02:00
// TODO - Maybe create a HashSet with these IDs. It may be useful for the Worker to know and report which assignments (and how many) it has processed.
return assignment ;
2021-05-20 02:28:48 +02:00
}
2021-06-10 13:29:20 +02:00
public static void handleAssignment ( )
{
2021-05-20 02:28:48 +02:00
Assignment assignment = requestAssignment ( ) ;
if ( assignment = = null ) {
logger . warn ( " The assignment was found to be null! " ) ;
return ;
}
2021-06-11 12:44:33 +02:00
// Start handling the assignment, the worker is busy.
2021-05-20 02:28:48 +02:00
isAvailableForWork = false ;
List < Task > tasks = assignment . getTasks ( ) ;
// Iterate over the tasks and add each task in it's own list depending on the DATASOURCE in order to decide which plugin to use later.
Multimap < String , Task > tasksForPlugins = HashMultimap . create ( ) ;
for ( Task task : tasks ) {
// Add each task in its own HashSet.
tasksForPlugins . put ( task . getDatasource ( ) . getId ( ) , task ) ;
}
// First run -in parallel- the tasks which require some specific plugin.
// Then run the remaining tasks in the generic plugin (which handles parallelism itself).
// Check availability-endpoint using Thread-sleep here. DEBUG!
try {
Thread . sleep ( 30_000 ) ; // 30 seconds
} catch ( InterruptedException ie ) {
logger . warn ( " Sleeping interrupted! " ) ;
}
isAvailableForWork = true ;
}
}