From 82e12655e79df6953ef4236fe297e9d87be18291 Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Thu, 20 May 2021 03:28:48 +0300 Subject: [PATCH] - Add an "AssignmentHandler", which retrieves the assignment from the controller and categorises the tasks using their datasource. In the future, it will execute the tasks of the assignment, using different plugins. It runs upon the Application start and also every 30 mins (if no other job is in execution). - Add the "isWorkerAvailableForWork-endpoint. --- build.gradle | 2 +- .../urls_worker/UrlsWorkerApplication.java | 6 ++ .../components/ScheduledTasks.java | 9 +- .../controllers/GeneralController.java | 20 ++++ .../urls_worker/util/AssignmentHandler.java | 94 +++++++++++++++++++ 5 files changed, 129 insertions(+), 2 deletions(-) create mode 100644 src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java diff --git a/build.gradle b/build.gradle index 314bedc..4eabaa9 100644 --- a/build.gradle +++ b/build.gradle @@ -38,7 +38,7 @@ dependencies { implementation group: 'commons-io', name: 'commons-io', version: '2.8.0' // https://mvnrepository.com/artifact/com.google.guava/guava - // implementation group: 'com.google.guava', name: 'guava', version: '30.1.1-jre' // It will be usefull later.. + implementation group: 'com.google.guava', name: 'guava', version: '30.1.1-jre' // It will be usefull later.. testImplementation group: 'org.springframework.security', name: 'spring-security-test', version: springSecurityVersion testImplementation 'org.springframework.boot:spring-boot-starter-test' diff --git a/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java b/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java index f035034..86e0d72 100644 --- a/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java +++ b/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java @@ -1,13 +1,19 @@ package eu.openaire.urls_worker; +import eu.openaire.urls_worker.util.AssignmentHandler; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; + @SpringBootApplication +@EnableScheduling public class UrlsWorkerApplication { public static void main(String[] args) { SpringApplication.run(UrlsWorkerApplication.class, args); + + AssignmentHandler.handleAssignment(); } } diff --git a/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java index 83ddd27..fb7899d 100644 --- a/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java @@ -1,5 +1,6 @@ package eu.openaire.urls_worker.components; +import eu.openaire.urls_worker.util.AssignmentHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; @@ -16,8 +17,14 @@ public class ScheduledTasks { private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); - @Scheduled(fixedRate = 600_000) // TODO - Change to every 10 mins: 600_000 + @Scheduled(fixedRate = 600_000) // Every 10 mins: 600_000 public void reportCurrentTime() { logger.info("Server is live! Time is now {}", dateFormat.format(new Date())); } + + @Scheduled(fixedRate = 1800_000) // Every 30 mins: 1800_000 + public void handleAssignment() { + if ( AssignmentHandler.isAvailableForWork ) + AssignmentHandler.handleAssignment(); + } } \ No newline at end of file diff --git a/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java b/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java index 659d91e..7fa3204 100644 --- a/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java +++ b/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java @@ -1,7 +1,11 @@ package eu.openaire.urls_worker.controllers; +import eu.openaire.urls_worker.payloads.responces.WorkerResponse; +import eu.openaire.urls_worker.util.AssignmentHandler; +import eu.openaire.urls_worker.util.WorkerConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; @@ -24,4 +28,20 @@ public class GeneralController { return ResponseEntity.ok().build(); } + + @GetMapping("isAvailableForWork") + public ResponseEntity isWorkerAvailableForWork() { + + logger.info("Received an \"isWorkerAvailableForWork\" request."); + + if ( AssignmentHandler.isAvailableForWork ) { + logger.info("The worker is available for an assignment."); + return ResponseEntity.status(200).body(new WorkerResponse(WorkerConstants.WORKER_ID, WorkerConstants.TASKS_LIMIT)); + } + else { + logger.info("The worker is busy with another assignment."); + return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build(); + } + } + } diff --git a/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java b/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java new file mode 100644 index 0000000..7d73873 --- /dev/null +++ b/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java @@ -0,0 +1,94 @@ +package eu.openaire.urls_worker.util; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import eu.openaire.urls_worker.models.Assignment; +import eu.openaire.urls_worker.models.Task; +import eu.openaire.urls_worker.payloads.requests.AssignmentRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.web.client.RestTemplate; + +import java.util.*; + + +public class AssignmentHandler { + + private static final Logger logger = LoggerFactory.getLogger(AssignmentHandler.class); + + public static boolean isAvailableForWork = false; + + public static Assignment requestAssignment() { + + RestTemplate restTemplate = new RestTemplateBuilder().build(); + String url = "http://localhost:8080/api/urls/test?workerId=" + WorkerConstants.WORKER_ID + "&tasksLimit=" + WorkerConstants.TASKS_LIMIT; + String json = null; + try { + json = restTemplate.getForObject(url, String.class); + } catch (Exception e) { + logger.error("Could not retrieve the assignment: " + e.getMessage()); + return null; + } + + AssignmentRequest assignmentRequest = null; + ObjectMapper objectMapper = new ObjectMapper(); + try { + assignmentRequest = objectMapper.readValue(json, AssignmentRequest.class); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + + if ( assignmentRequest == null ) { + String errorMessage = "Could not map the json to an \"assignmentRequest\"!"; + System.err.println(errorMessage); + logger.error(errorMessage); + System.exit(1); + } + + logger.debug(assignmentRequest.toString()); + + return assignmentRequest.getAssignment(); + } + + + public static void handleAssignment() { + + Assignment assignment = requestAssignment(); + if ( assignment == null ) { + logger.warn("The assignment was found to be null!"); + return; + } + isAvailableForWork = false; + + List tasks = assignment.getTasks(); + + // Iterate over the tasks and add each task in it's own list depending on the DATASOURCE in order to decide which plugin to use later. + + Multimap tasksForPlugins = HashMultimap.create(); + + for ( Task task : tasks ) { + // Add each task in its own HashSet. + tasksForPlugins.put(task.getDatasource().getId(), task); + } + + // First run -in parallel- the tasks which require some specific plugin. + + // Then run the remaining tasks in the generic plugin (which handles parallelism itself). + + + + + // Check availability-endpoint using Thread-sleep here. DEBUG! + try { + Thread.sleep(30_000); // 30 seconds + } catch (InterruptedException ie) { + logger.warn("Sleeping interrupted!"); + } + + isAvailableForWork = true; + } + +}