- Add an "AssignmentHandler", which retrieves the assignment from the controller and categorises the tasks using their datasource. In the future, it will execute the tasks of the assignment, using different plugins. It runs upon the Application start and also every 30 mins (if no other job is in execution).

- Add the "isWorkerAvailableForWork-endpoint.
This commit is contained in:
Lampros Smyrnaios 2021-05-20 03:28:48 +03:00
parent a4c97dffbf
commit 82e12655e7
5 changed files with 129 additions and 2 deletions

View File

@ -38,7 +38,7 @@ dependencies {
implementation group: 'commons-io', name: 'commons-io', version: '2.8.0'
// https://mvnrepository.com/artifact/com.google.guava/guava
// implementation group: 'com.google.guava', name: 'guava', version: '30.1.1-jre' // It will be usefull later..
implementation group: 'com.google.guava', name: 'guava', version: '30.1.1-jre' // It will be usefull later..
testImplementation group: 'org.springframework.security', name: 'spring-security-test', version: springSecurityVersion
testImplementation 'org.springframework.boot:spring-boot-starter-test'

View File

@ -1,13 +1,19 @@
package eu.openaire.urls_worker;
import eu.openaire.urls_worker.util.AssignmentHandler;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class UrlsWorkerApplication {
public static void main(String[] args) {
SpringApplication.run(UrlsWorkerApplication.class, args);
AssignmentHandler.handleAssignment();
}
}

View File

@ -1,5 +1,6 @@
package eu.openaire.urls_worker.components;
import eu.openaire.urls_worker.util.AssignmentHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
@ -16,8 +17,14 @@ public class ScheduledTasks {
private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
@Scheduled(fixedRate = 600_000) // TODO - Change to every 10 mins: 600_000
@Scheduled(fixedRate = 600_000) // Every 10 mins: 600_000
public void reportCurrentTime() {
logger.info("Server is live! Time is now {}", dateFormat.format(new Date()));
}
@Scheduled(fixedRate = 1800_000) // Every 30 mins: 1800_000
public void handleAssignment() {
if ( AssignmentHandler.isAvailableForWork )
AssignmentHandler.handleAssignment();
}
}

View File

@ -1,7 +1,11 @@
package eu.openaire.urls_worker.controllers;
import eu.openaire.urls_worker.payloads.responces.WorkerResponse;
import eu.openaire.urls_worker.util.AssignmentHandler;
import eu.openaire.urls_worker.util.WorkerConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@ -24,4 +28,20 @@ public class GeneralController {
return ResponseEntity.ok().build();
}
@GetMapping("isAvailableForWork")
public ResponseEntity<?> isWorkerAvailableForWork() {
logger.info("Received an \"isWorkerAvailableForWork\" request.");
if ( AssignmentHandler.isAvailableForWork ) {
logger.info("The worker is available for an assignment.");
return ResponseEntity.status(200).body(new WorkerResponse(WorkerConstants.WORKER_ID, WorkerConstants.TASKS_LIMIT));
}
else {
logger.info("The worker is busy with another assignment.");
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
}
}
}

View File

@ -0,0 +1,94 @@
package eu.openaire.urls_worker.util;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import eu.openaire.urls_worker.models.Assignment;
import eu.openaire.urls_worker.models.Task;
import eu.openaire.urls_worker.payloads.requests.AssignmentRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.web.client.RestTemplate;
import java.util.*;
public class AssignmentHandler {
private static final Logger logger = LoggerFactory.getLogger(AssignmentHandler.class);
public static boolean isAvailableForWork = false;
public static Assignment requestAssignment() {
RestTemplate restTemplate = new RestTemplateBuilder().build();
String url = "http://localhost:8080/api/urls/test?workerId=" + WorkerConstants.WORKER_ID + "&tasksLimit=" + WorkerConstants.TASKS_LIMIT;
String json = null;
try {
json = restTemplate.getForObject(url, String.class);
} catch (Exception e) {
logger.error("Could not retrieve the assignment: " + e.getMessage());
return null;
}
AssignmentRequest assignmentRequest = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
assignmentRequest = objectMapper.readValue(json, AssignmentRequest.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
if ( assignmentRequest == null ) {
String errorMessage = "Could not map the json to an \"assignmentRequest\"!";
System.err.println(errorMessage);
logger.error(errorMessage);
System.exit(1);
}
logger.debug(assignmentRequest.toString());
return assignmentRequest.getAssignment();
}
public static void handleAssignment() {
Assignment assignment = requestAssignment();
if ( assignment == null ) {
logger.warn("The assignment was found to be null!");
return;
}
isAvailableForWork = false;
List<Task> tasks = assignment.getTasks();
// Iterate over the tasks and add each task in it's own list depending on the DATASOURCE in order to decide which plugin to use later.
Multimap<String, Task> tasksForPlugins = HashMultimap.create();
for ( Task task : tasks ) {
// Add each task in its own HashSet.
tasksForPlugins.put(task.getDatasource().getId(), task);
}
// First run -in parallel- the tasks which require some specific plugin.
// Then run the remaining tasks in the generic plugin (which handles parallelism itself).
// Check availability-endpoint using Thread-sleep here. DEBUG!
try {
Thread.sleep(30_000); // 30 seconds
} catch (InterruptedException ie) {
logger.warn("Sleeping interrupted!");
}
isAvailableForWork = true;
}
}