2021-05-20 02:28:48 +02:00
package eu.openaire.urls_worker.util ;
import com.fasterxml.jackson.core.JsonProcessingException ;
import com.fasterxml.jackson.databind.ObjectMapper ;
import com.google.common.collect.HashMultimap ;
import com.google.common.collect.Multimap ;
2021-09-21 15:21:39 +02:00
import eu.openaire.urls_worker.UrlsWorkerApplication ;
2021-05-20 02:28:48 +02:00
import eu.openaire.urls_worker.models.Assignment ;
2021-06-22 04:58:07 +02:00
import eu.openaire.urls_worker.models.UrlReport ;
2021-05-20 02:28:48 +02:00
import eu.openaire.urls_worker.payloads.requests.AssignmentRequest ;
2021-06-22 04:58:07 +02:00
import eu.openaire.urls_worker.payloads.responces.WorkerReport ;
2021-07-29 08:01:53 +02:00
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin ;
2021-05-20 02:28:48 +02:00
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.boot.web.client.RestTemplateBuilder ;
2021-06-22 04:58:07 +02:00
import org.springframework.http.HttpStatus ;
import org.springframework.http.ResponseEntity ;
2021-05-20 02:28:48 +02:00
import java.util.* ;
public class AssignmentHandler {
private static final Logger logger = LoggerFactory . getLogger ( AssignmentHandler . class ) ;
2021-06-11 12:44:33 +02:00
public static boolean isAvailableForWork = true ;
2021-05-20 02:28:48 +02:00
2021-06-22 04:58:07 +02:00
public static List < UrlReport > urlReports = null ;
2021-09-21 15:21:39 +02:00
private static final boolean askForTest = false ; // Enable this only for testing.
2021-08-05 14:09:28 +02:00
2021-06-22 04:58:07 +02:00
2021-07-05 14:00:29 +02:00
public static AssignmentRequest requestAssignments ( )
2021-06-10 13:29:20 +02:00
{
2021-09-22 15:36:48 +02:00
String requestUrl = UrlsWorkerApplication . controllerBaseUrl + " urls " + ( askForTest ? " /test " : " " ) + " ?workerId= " + UrlsWorkerApplication . workerId + " &workerAssignmentsLimit= " + WorkerConstants . ASSIGNMENTS_LIMIT ;
2021-09-21 15:21:39 +02:00
logger . info ( " Going to request assignments from the controller-server: " + requestUrl ) ;
2021-05-20 02:28:48 +02:00
String json = null ;
try {
2021-09-21 15:21:39 +02:00
json = new RestTemplateBuilder ( ) . build ( ) . getForObject ( requestUrl , String . class ) ;
2021-05-20 02:28:48 +02:00
} catch ( Exception e ) {
2021-09-21 15:21:39 +02:00
logger . error ( " Could not retrieve the assignments! \ n " + e . getMessage ( ) ) ;
2021-05-20 02:28:48 +02:00
return null ;
}
AssignmentRequest assignmentRequest = null ;
try {
2021-09-21 15:21:39 +02:00
assignmentRequest = new ObjectMapper ( ) . readValue ( json , AssignmentRequest . class ) ;
2021-05-20 02:28:48 +02:00
} catch ( JsonProcessingException e ) {
e . printStackTrace ( ) ;
}
if ( assignmentRequest = = null ) {
String errorMessage = " Could not map the json to an \" assignmentRequest \" ! " ;
System . err . println ( errorMessage ) ;
logger . error ( errorMessage ) ;
System . exit ( 1 ) ;
}
2021-06-09 04:45:07 +02:00
//logger.debug(assignmentRequest.toString()); // DEBUG!
2021-05-20 02:28:48 +02:00
2021-07-05 14:00:29 +02:00
logger . info ( " AssignmentRequest < " + assignmentRequest . getAssignmentCounter ( ) + " > was received and it's ready to be processed. It contains " + assignmentRequest . getAssignments ( ) . size ( ) + " tasks. " ) ;
2021-06-09 04:45:07 +02:00
2021-07-05 14:00:29 +02:00
return assignmentRequest ;
2021-05-20 02:28:48 +02:00
}
2021-07-05 14:00:29 +02:00
public static void handleAssignments ( )
2021-06-10 13:29:20 +02:00
{
2021-07-05 14:00:29 +02:00
AssignmentRequest assignmentRequest = requestAssignments ( ) ;
2021-08-05 14:09:28 +02:00
if ( assignmentRequest = = null ) {
logger . error ( " The \" assignmentRequest \" was \" null \" ! " ) ;
return ;
}
2021-07-05 14:00:29 +02:00
Long assignmentRequestCounter = assignmentRequest . getAssignmentCounter ( ) ;
List < Assignment > assignments = assignmentRequest . getAssignments ( ) ;
if ( assignments = = null ) {
logger . warn ( " The assignments were found to be null for assignmentRequestCounter = " + assignmentRequestCounter ) ;
2021-05-20 02:28:48 +02:00
return ;
}
2021-06-11 12:44:33 +02:00
2021-07-05 14:00:29 +02:00
// Start handling the assignments, the worker is busy.
2021-05-20 02:28:48 +02:00
isAvailableForWork = false ;
// Iterate over the tasks and add each task in it's own list depending on the DATASOURCE in order to decide which plugin to use later.
2021-07-05 14:00:29 +02:00
Multimap < String , Assignment > assignmentsForPlugins = HashMultimap . create ( ) ;
2021-05-20 02:28:48 +02:00
2021-07-05 14:00:29 +02:00
for ( Assignment assignment : assignments ) {
2021-05-20 02:28:48 +02:00
// Add each task in its own HashSet.
2021-07-05 14:00:29 +02:00
assignmentsForPlugins . put ( assignment . getDatasource ( ) . getId ( ) , assignment ) ;
2021-05-20 02:28:48 +02:00
}
2021-07-05 14:00:29 +02:00
urlReports = new ArrayList < > ( assignments . size ( ) ) ; // Define the new UrlReport-list. It is reinitialized each time.
2021-06-22 04:58:07 +02:00
// TODO - Decide which tasks run with what plugin (depending on their datasource).
2021-05-20 02:28:48 +02:00
// First run -in parallel- the tasks which require some specific plugin.
// Then run the remaining tasks in the generic plugin (which handles parallelism itself).
2021-06-22 04:58:07 +02:00
// For now, let's just run all tasks in the generic plugin.
try {
2021-09-08 04:02:14 +02:00
PublicationsRetrieverPlugin . processAssignments ( assignmentRequestCounter , assignmentsForPlugins . values ( ) ) ;
2021-06-22 04:58:07 +02:00
} catch ( Exception e ) {
logger . error ( e . getMessage ( ) , e ) ;
}
2021-09-08 04:02:14 +02:00
isAvailableForWork = true ; // State this before posting, to catch the soonest next scheduled request.
2021-06-22 04:58:07 +02:00
2021-09-21 15:21:39 +02:00
/ * logger . debug ( " UrlReports: " ) ; // DEBUG!
for ( UrlReport urlReport : urlReports ) {
logger . debug ( urlReport . toString ( ) ) ;
} * /
2021-09-08 04:02:14 +02:00
postWorkerReport ( assignmentRequestCounter ) ;
2021-09-21 15:21:39 +02:00
// Note: Cannot call this method here retrospectively, as if it runs 100s of times, the memory may break..
// The scheduler will handle calling it every half an hour, in case the Worker is available for work..
2021-06-22 04:58:07 +02:00
}
2021-05-20 02:28:48 +02:00
2021-07-05 14:00:29 +02:00
public static boolean postWorkerReport ( Long assignmentRequestCounter )
2021-06-22 04:58:07 +02:00
{
2021-09-21 15:21:39 +02:00
String postUrl = UrlsWorkerApplication . controllerBaseUrl + " urls/addWorkerReport " ;
logger . info ( " Going to post the WorkerReport to the controller-server: " + postUrl ) ;
2021-05-20 02:28:48 +02:00
try {
2021-09-22 15:36:48 +02:00
ResponseEntity < String > responseEntity = new RestTemplateBuilder ( ) . build ( ) . postForEntity ( postUrl , new WorkerReport ( UrlsWorkerApplication . workerId , assignmentRequestCounter , urlReports ) , String . class ) ;
2021-06-22 04:58:07 +02:00
int responseCode = responseEntity . getStatusCode ( ) . value ( ) ;
if ( responseCode ! = HttpStatus . OK . value ( ) ) {
2021-07-05 14:00:29 +02:00
logger . error ( " Connection problem with the submission of the WorkerReport of assignment_ " + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode ) ;
2021-06-22 04:58:07 +02:00
return false ;
}
} catch ( Exception e ) {
2021-07-05 14:00:29 +02:00
logger . error ( " Error when submitting the WorkerReport of assignment_ " + assignmentRequestCounter + " to the Controller: " , e ) ;
2021-06-22 04:58:07 +02:00
return false ;
2021-09-21 15:21:39 +02:00
} finally {
AssignmentHandler . urlReports . clear ( ) ; // Reset the UrlReports list for the next assignments-handling.
2021-05-20 02:28:48 +02:00
}
2021-06-22 04:58:07 +02:00
return true ;
2021-05-20 02:28:48 +02:00
}
}