diff --git a/build.gradle b/build.gradle index 314bedc..4eabaa9 100644 --- a/build.gradle +++ b/build.gradle @@ -38,7 +38,7 @@ dependencies { implementation group: 'commons-io', name: 'commons-io', version: '2.8.0' // https://mvnrepository.com/artifact/com.google.guava/guava - // implementation group: 'com.google.guava', name: 'guava', version: '30.1.1-jre' // It will be usefull later.. + implementation group: 'com.google.guava', name: 'guava', version: '30.1.1-jre' // It will be usefull later.. testImplementation group: 'org.springframework.security', name: 'spring-security-test', version: springSecurityVersion testImplementation 'org.springframework.boot:spring-boot-starter-test' diff --git a/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java b/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java index f035034..86e0d72 100644 --- a/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java +++ b/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java @@ -1,13 +1,19 @@ package eu.openaire.urls_worker; +import eu.openaire.urls_worker.util.AssignmentHandler; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; + @SpringBootApplication +@EnableScheduling public class UrlsWorkerApplication { public static void main(String[] args) { SpringApplication.run(UrlsWorkerApplication.class, args); + + AssignmentHandler.handleAssignment(); } } diff --git a/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java index 83ddd27..fb7899d 100644 --- a/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java @@ -1,5 +1,6 @@ package eu.openaire.urls_worker.components; +import eu.openaire.urls_worker.util.AssignmentHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; @@ -16,8 +17,14 @@ public class ScheduledTasks { private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); - @Scheduled(fixedRate = 600_000) // TODO - Change to every 10 mins: 600_000 + @Scheduled(fixedRate = 600_000) // Every 10 mins: 600_000 public void reportCurrentTime() { logger.info("Server is live! Time is now {}", dateFormat.format(new Date())); } + + @Scheduled(fixedRate = 1800_000) // Every 30 mins: 1800_000 + public void handleAssignment() { + if ( AssignmentHandler.isAvailableForWork ) + AssignmentHandler.handleAssignment(); + } } \ No newline at end of file diff --git a/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java b/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java index 659d91e..7fa3204 100644 --- a/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java +++ b/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java @@ -1,7 +1,11 @@ package eu.openaire.urls_worker.controllers; +import eu.openaire.urls_worker.payloads.responces.WorkerResponse; +import eu.openaire.urls_worker.util.AssignmentHandler; +import eu.openaire.urls_worker.util.WorkerConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; @@ -24,4 +28,20 @@ public class GeneralController { return ResponseEntity.ok().build(); } + + @GetMapping("isAvailableForWork") + public ResponseEntity isWorkerAvailableForWork() { + + logger.info("Received an \"isWorkerAvailableForWork\" request."); + + if ( AssignmentHandler.isAvailableForWork ) { + logger.info("The worker is available for an assignment."); + return ResponseEntity.status(200).body(new WorkerResponse(WorkerConstants.WORKER_ID, WorkerConstants.TASKS_LIMIT)); + } + else { + logger.info("The worker is busy with another assignment."); + return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build(); + } + } + } diff --git a/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java b/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java new file mode 100644 index 0000000..7d73873 --- /dev/null +++ b/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java @@ -0,0 +1,94 @@ +package eu.openaire.urls_worker.util; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import eu.openaire.urls_worker.models.Assignment; +import eu.openaire.urls_worker.models.Task; +import eu.openaire.urls_worker.payloads.requests.AssignmentRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.web.client.RestTemplate; + +import java.util.*; + + +public class AssignmentHandler { + + private static final Logger logger = LoggerFactory.getLogger(AssignmentHandler.class); + + public static boolean isAvailableForWork = false; + + public static Assignment requestAssignment() { + + RestTemplate restTemplate = new RestTemplateBuilder().build(); + String url = "http://localhost:8080/api/urls/test?workerId=" + WorkerConstants.WORKER_ID + "&tasksLimit=" + WorkerConstants.TASKS_LIMIT; + String json = null; + try { + json = restTemplate.getForObject(url, String.class); + } catch (Exception e) { + logger.error("Could not retrieve the assignment: " + e.getMessage()); + return null; + } + + AssignmentRequest assignmentRequest = null; + ObjectMapper objectMapper = new ObjectMapper(); + try { + assignmentRequest = objectMapper.readValue(json, AssignmentRequest.class); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + + if ( assignmentRequest == null ) { + String errorMessage = "Could not map the json to an \"assignmentRequest\"!"; + System.err.println(errorMessage); + logger.error(errorMessage); + System.exit(1); + } + + logger.debug(assignmentRequest.toString()); + + return assignmentRequest.getAssignment(); + } + + + public static void handleAssignment() { + + Assignment assignment = requestAssignment(); + if ( assignment == null ) { + logger.warn("The assignment was found to be null!"); + return; + } + isAvailableForWork = false; + + List tasks = assignment.getTasks(); + + // Iterate over the tasks and add each task in it's own list depending on the DATASOURCE in order to decide which plugin to use later. + + Multimap tasksForPlugins = HashMultimap.create(); + + for ( Task task : tasks ) { + // Add each task in its own HashSet. + tasksForPlugins.put(task.getDatasource().getId(), task); + } + + // First run -in parallel- the tasks which require some specific plugin. + + // Then run the remaining tasks in the generic plugin (which handles parallelism itself). + + + + + // Check availability-endpoint using Thread-sleep here. DEBUG! + try { + Thread.sleep(30_000); // 30 seconds + } catch (InterruptedException ie) { + logger.warn("Sleeping interrupted!"); + } + + isAvailableForWork = true; + } + +}