From 6307cda23a6ccd5c0d50d5e7945c40372fe7f5de Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Mon, 5 Jul 2021 15:00:29 +0300 Subject: [PATCH] - Refactor the assignments-handling. In order to match with the database schema, now the AssignmentRequest returns a list of Assignments instead of a single assignment having a list of Tasks. - Cleanup the members of the "Payload" model. --- .../components/ScheduledTasks.java | 2 +- .../controllers/GeneralController.java | 2 +- .../urls_worker/models/Assignment.java | 59 ++++++++++++------- .../openaire/urls_worker/models/Payload.java | 40 ++++--------- .../payloads/requests/AssignmentRequest.java | 33 ++++++++--- .../payloads/responces/WorkerReport.java | 18 +++--- .../payloads/responces/WorkerResponse.java | 18 +++--- .../PublicationsRetrieverPlugin.java | 25 ++++---- .../urls_worker/util/AssignmentHandler.java | 46 +++++++-------- .../urls_worker/util/WorkerConstants.java | 2 +- 10 files changed, 132 insertions(+), 113 deletions(-) diff --git a/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java index 6db8359..62d1143 100644 --- a/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java @@ -26,7 +26,7 @@ public class ScheduledTasks { @Scheduled(fixedRate = 1800_000) // Every 30 mins: 1800_000 public void handleAssignment() { if ( AssignmentHandler.isAvailableForWork ) - AssignmentHandler.handleAssignment(); + AssignmentHandler.handleAssignments(); } //@Scheduled(fixedRate = 20_000) // Every 20 secs. diff --git a/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java b/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java index 7fa3204..06a3b69 100644 --- a/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java +++ b/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java @@ -36,7 +36,7 @@ public class GeneralController { if ( AssignmentHandler.isAvailableForWork ) { logger.info("The worker is available for an assignment."); - return ResponseEntity.status(200).body(new WorkerResponse(WorkerConstants.WORKER_ID, WorkerConstants.TASKS_LIMIT)); + return ResponseEntity.status(200).body(new WorkerResponse(WorkerConstants.WORKER_ID, WorkerConstants.ASSIGNMENTS_LIMIT)); } else { logger.info("The worker is busy with another assignment."); diff --git a/src/main/java/eu/openaire/urls_worker/models/Assignment.java b/src/main/java/eu/openaire/urls_worker/models/Assignment.java index 3d76695..ef00dee 100644 --- a/src/main/java/eu/openaire/urls_worker/models/Assignment.java +++ b/src/main/java/eu/openaire/urls_worker/models/Assignment.java @@ -5,53 +5,68 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyOrder; import java.util.Date; -import java.util.List; @JsonInclude(JsonInclude.Include.NON_NULL) @JsonPropertyOrder({ - "assignmentId", - "tasks", + "id", + "original_url", + "datasource", "workerId", "date" }) public class Assignment { - @JsonProperty("assignmentId") - private int assignmentId; + @JsonProperty("id") + private String id; - @JsonProperty("tasks") - private List tasks; + @JsonProperty("original_url") + private String originalUrl; - @JsonProperty("workerId") + @JsonProperty("datasource") + private Datasource datasource; + + @JsonProperty("workerid") private String workerId; @JsonProperty("date") private Date date; + public Assignment() {} - public Assignment(int assignmentId, List tasks, String workerId, Date date) { - this.assignmentId = assignmentId; - this.tasks = tasks; + + public Assignment(String id, String originalUrl, Datasource datasource, String workerId, Date date) { + this.id = id; + this.originalUrl = originalUrl; + this.datasource = datasource; this.workerId = workerId; this.date = date; } - public int getAssignmentId() { - return assignmentId; + + public String getId() { + return id; } - public void setAssignmentId(int assignmentId) { - this.assignmentId = assignmentId; + public void setId(String id) { + this.id = id; } - public List getTasks() { - return tasks; + public String getOriginalUrl() { + return originalUrl; } - public void setTasks(List tasks) { - this.tasks = tasks; + public void setOriginalUrl(String originalUrl) { + this.originalUrl = originalUrl; + } + + public Datasource getDatasource() { + return datasource; + } + + public void setDatasource(Datasource datasource) { + this.datasource = datasource; } public String getWorkerId() { @@ -70,11 +85,13 @@ public class Assignment { this.date = date; } + @Override public String toString() { return "Assignment{" + - "assignmentId=" + assignmentId + - ", tasks=" + tasks + + "id='" + id + '\'' + + ", originalUrl='" + originalUrl + '\'' + + ", datasource=" + datasource + ", workerId='" + workerId + '\'' + ", date=" + date + '}'; diff --git a/src/main/java/eu/openaire/urls_worker/models/Payload.java b/src/main/java/eu/openaire/urls_worker/models/Payload.java index 4c3a0e0..55c3526 100644 --- a/src/main/java/eu/openaire/urls_worker/models/Payload.java +++ b/src/main/java/eu/openaire/urls_worker/models/Payload.java @@ -12,11 +12,10 @@ import java.util.Date; "id", "original_url", "actual_url", - "date_acquired", + "date", "mime_type", "size", - "more_info", - "md5", + "hash", "location", "provenance" }) @@ -31,7 +30,7 @@ public class Payload { @JsonProperty("actual_url") private String actual_url; - @JsonProperty("date_acquired") + @JsonProperty("date") private Date date_acquired; @JsonProperty("mime_type") @@ -40,28 +39,24 @@ public class Payload { @JsonProperty("size") private Long size; // In bytes. - @JsonProperty("more_info") - private String more_info; - - @JsonProperty("md5") - private String md5; + @JsonProperty("hash") + private String hash; @JsonProperty("location") private String location; @JsonProperty("provenance") - private String provenance; + private String provenance; // "crawl:" - public Payload(String id, String original_url, String actual_url, Date date_acquired, String mime_type, Long size, String more_info, String md5, String location, String provenance) { + public Payload(String id, String original_url, String actual_url, Date date_acquired, String mime_type, Long size, String hash, String location, String provenance) { this.id = id; this.original_url = original_url; this.actual_url = actual_url; this.date_acquired = date_acquired; this.mime_type = mime_type; this.size = size; - this.more_info = more_info; - this.md5 = md5; + this.hash = hash; this.location = location; this.provenance = provenance; } @@ -114,20 +109,12 @@ public class Payload { this.size = size; } - public String getMore_info() { - return more_info; + public String getHash() { + return hash; } - public void setMore_info(String more_info) { - this.more_info = more_info; - } - - public String getMd5() { - return md5; - } - - public void setMd5(String md5) { - this.md5 = md5; + public void setHash(String hash) { + this.hash = hash; } public String getLocation() { @@ -155,8 +142,7 @@ public class Payload { ", date_acquired='" + date_acquired + '\'' + ", mime_type='" + mime_type + '\'' + ", size='" + size + '\'' + - ", more_info='" + more_info + '\'' + - ", md5='" + md5 + '\'' + + ", hash='" + hash + '\'' + ", location='" + location + '\'' + ", provenance='" + provenance + '\'' + '}'; diff --git a/src/main/java/eu/openaire/urls_worker/payloads/requests/AssignmentRequest.java b/src/main/java/eu/openaire/urls_worker/payloads/requests/AssignmentRequest.java index 08f8e50..b2ec1f7 100644 --- a/src/main/java/eu/openaire/urls_worker/payloads/requests/AssignmentRequest.java +++ b/src/main/java/eu/openaire/urls_worker/payloads/requests/AssignmentRequest.java @@ -4,31 +4,46 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import eu.openaire.urls_worker.models.Assignment; +import java.util.List; + @JsonInclude(JsonInclude.Include.NON_NULL) public class AssignmentRequest { - @JsonProperty("assignment") - private Assignment assignment; + @JsonProperty("assignmentCounter") + private Long assignmentCounter; + + @JsonProperty("assignments") + private List assignments; public AssignmentRequest() { } - public AssignmentRequest(Assignment assignment) { - this.assignment = assignment; + public AssignmentRequest(Long assignmentCounter, List assignments) { + this.assignmentCounter = assignmentCounter; + this.assignments = assignments; } - public Assignment getAssignment() { - return assignment; + public Long getAssignmentCounter() { + return assignmentCounter; } - public void setAssignment(Assignment assignment) { - this.assignment = assignment; + public void setAssignmentCounter(Long assignmentCounter) { + this.assignmentCounter = assignmentCounter; + } + + public List getAssignments() { + return assignments; + } + + public void setAssignments(List assignments) { + this.assignments = assignments; } @Override public String toString() { return "AssignmentRequest{" + - "assignment=" + assignment + + "assignmentCounter=" + assignmentCounter + + ", assignments=" + assignments + '}'; } } diff --git a/src/main/java/eu/openaire/urls_worker/payloads/responces/WorkerReport.java b/src/main/java/eu/openaire/urls_worker/payloads/responces/WorkerReport.java index ed3ab8f..c7893f9 100644 --- a/src/main/java/eu/openaire/urls_worker/payloads/responces/WorkerReport.java +++ b/src/main/java/eu/openaire/urls_worker/payloads/responces/WorkerReport.java @@ -19,15 +19,15 @@ public class WorkerReport { @JsonProperty("workerId") private String workerId; - @JsonProperty("assignmentId") - private int assignmentId; + @JsonProperty("assignmentRequestCounter") + private Long assignmentRequestCounter; @JsonProperty("urlReports") private List urlReports; - public WorkerReport(String workerId, int assignmentId, List urlReports) { + public WorkerReport(String workerId, Long assignmentRequestCounter, List urlReports) { this.workerId = workerId; - this.assignmentId = assignmentId; + this.assignmentRequestCounter = assignmentRequestCounter; this.urlReports = urlReports; } @@ -39,12 +39,12 @@ public class WorkerReport { this.workerId = workerId; } - public int getAssignmentId() { - return this.assignmentId; + public Long getAssignmentRequestCounter() { + return assignmentRequestCounter; } - public void setAssignmentId(int assignmentId) { - this.assignmentId = assignmentId; + public void setAssignmentRequestCounter(Long assignmentRequestCounter) { + this.assignmentRequestCounter = assignmentRequestCounter; } public List getUrlReports() { @@ -59,7 +59,7 @@ public class WorkerReport { public String toString() { return "WorkerReport{" + "workerId='" + workerId + '\'' + - ", assignmentId=" + assignmentId + + ", assignmentRequestCounter=" + assignmentRequestCounter + ", urlReports=" + urlReports + '}'; } diff --git a/src/main/java/eu/openaire/urls_worker/payloads/responces/WorkerResponse.java b/src/main/java/eu/openaire/urls_worker/payloads/responces/WorkerResponse.java index 29c0032..e00ead6 100644 --- a/src/main/java/eu/openaire/urls_worker/payloads/responces/WorkerResponse.java +++ b/src/main/java/eu/openaire/urls_worker/payloads/responces/WorkerResponse.java @@ -8,19 +8,19 @@ import com.fasterxml.jackson.annotation.JsonPropertyOrder; @JsonInclude(JsonInclude.Include.NON_NULL) @JsonPropertyOrder({ "workerId", - "tasksLimit" + "assignmentsLimit" }) public class WorkerResponse { @JsonProperty("workerId") private String workerId; - @JsonProperty("tasksLimit") - private int tasksLimit; + @JsonProperty("assignmentsLimit") + private int assignmentsLimit; - public WorkerResponse(String workerId, int tasksLimit) { + public WorkerResponse(String workerId, int assignmentsLimit) { this.workerId = workerId; - this.tasksLimit = tasksLimit; + this.assignmentsLimit = assignmentsLimit; } public String getWorkerId() { @@ -32,18 +32,18 @@ public class WorkerResponse { } public int getTasksLimit() { - return tasksLimit; + return assignmentsLimit; } - public void setTasksLimit(int tasksLimit) { - this.tasksLimit = tasksLimit; + public void setTasksLimit(int assignmentsLimit) { + this.assignmentsLimit = assignmentsLimit; } @Override public String toString() { return "WorkerRequest{" + "id='" + workerId + '\'' + - ", tasksLimit=" + tasksLimit + + ", assignmentsLimit=" + assignmentsLimit + '}'; } } diff --git a/src/main/java/eu/openaire/urls_worker/plugins/publicationsRetriever/PublicationsRetrieverPlugin.java b/src/main/java/eu/openaire/urls_worker/plugins/publicationsRetriever/PublicationsRetrieverPlugin.java index eb83ea8..a8a5877 100644 --- a/src/main/java/eu/openaire/urls_worker/plugins/publicationsRetriever/PublicationsRetrieverPlugin.java +++ b/src/main/java/eu/openaire/urls_worker/plugins/publicationsRetriever/PublicationsRetrieverPlugin.java @@ -10,6 +10,7 @@ import eu.openaire.publications_retriever.util.http.HttpConnUtils; import eu.openaire.publications_retriever.util.url.DataToBeLogged; import eu.openaire.publications_retriever.util.url.LoaderAndChecker; import eu.openaire.publications_retriever.util.url.UrlUtils; +import eu.openaire.urls_worker.models.Assignment; import eu.openaire.urls_worker.models.Payload; import eu.openaire.urls_worker.models.Task; import eu.openaire.urls_worker.models.UrlReport; @@ -57,25 +58,25 @@ public class PublicationsRetrieverPlugin { } - public static void processTasks(int assignmentId, Collection tasks) throws RuntimeException, FileNotFoundException + public static void processAssginmets(Long assignmentRequestCounter, Collection assignments) throws RuntimeException, FileNotFoundException { ConnSupportUtils.setKnownMimeTypes(); - FileUtils.storeDocFilesDir = assignmentsBaseFullTextsPath + "assignment_" + assignmentId + "_fullTexts" + File.separator; // It needs the last separator, because of how the docFiles are named and stored. + FileUtils.storeDocFilesDir = assignmentsBaseFullTextsPath + "assignment_" + assignmentRequestCounter + "_fullTexts" + File.separator; // It needs the last separator, because of how the docFiles are named and stored. - FileUtils.setOutput(new FileOutputStream(assignmentsBasePath + "assignment_" + assignmentId + "_generic_results.json")); + FileUtils.setOutput(new FileOutputStream(assignmentsBasePath + "assignment_" + assignmentRequestCounter + "_generic_results.json")); - int tasksSize = tasks.size(); + int tasksSize = assignments.size(); int batchCount = 0; int taskCount = 0; List> callableTasks = new ArrayList<>(FileUtils.jsonBatchSize); // Start loading and checking urls. - for ( Task task : tasks ) + for ( Assignment assignment : assignments ) { callableTasks.add(() -> { - String id = task.getId(); - String url = task.getUrl(); + String id = assignment.getId(); + String url = assignment.getOriginalUrl(); if ( (url = LoaderAndChecker.handleUrlChecks(id, url)) == null ) { return false; @@ -133,7 +134,7 @@ public class PublicationsRetrieverPlugin { { for ( DataToBeLogged data : FileUtils.dataToBeLoggedList ) { - String status = null, fileLocation = null, md5 = null; + String status = null, fileLocation = null, hash = null; Long size = null; if ( data.getWasDocumentOrDatasetAccessible().equals("true") ) { @@ -158,14 +159,14 @@ public class PublicationsRetrieverPlugin { try { File docFile = new File(fileLocation); if ( docFile.isFile() ) { - md5 = Files.hash(docFile, Hashing.md5()).toString(); // These hashing functions are deprecated, but just to inform us that MD5 is not secure. Luckily, we use MD5 just to identify duplicate files. - //logger.debug("MD5 for file \"" + docFile.getName() + "\": " + md5); // DEBUG! + hash = Files.hash(docFile, Hashing.md5()).toString(); // These hashing functions are deprecated, but just to inform us that MD5 is not secure. Luckily, we use MD5 just to identify duplicate files. + //logger.debug("MD5 for file \"" + docFile.getName() + "\": " + hash); // DEBUG! size = java.nio.file.Files.size(Paths.get(fileLocation)); //logger.debug("Size of file \"" + docFile.getName() + "\": " + size); // DEBUG! } else logger.error("No file was found with path: " + fileLocation); } catch (Exception e) { - if ( md5 == null ) + if ( hash == null ) logger.error("Could not retrieve the MD5-hash for the file: " + fileLocation); if ( size == null ) @@ -178,7 +179,7 @@ public class PublicationsRetrieverPlugin { } else status = "non-accessible"; - Payload payload = new Payload(data.getUrlId(), data.getSourceUrl(), data.getDocOrDatasetUrl(), new Date(), "application/pdf", size, "more_info", md5, fileLocation, null); + Payload payload = new Payload(data.getUrlId(), data.getSourceUrl(), data.getDocOrDatasetUrl(), new Date(), "application/pdf", size, hash, fileLocation, "crawl:PublicationsRetriever"); AssignmentHandler.urlReports.add(new UrlReport(status, payload)); } FileUtils.dataToBeLoggedList.clear(); // Empty the list, to be re-populated by the next batch / assignment. diff --git a/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java b/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java index bb155b7..ca75035 100644 --- a/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java +++ b/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java @@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import eu.openaire.urls_worker.models.Assignment; -import eu.openaire.urls_worker.models.Task; import eu.openaire.urls_worker.models.UrlReport; import eu.openaire.urls_worker.payloads.requests.AssignmentRequest; import eu.openaire.urls_worker.payloads.responces.WorkerReport; @@ -29,10 +28,10 @@ public class AssignmentHandler { public static List urlReports = null; - public static Assignment requestAssignment() + public static AssignmentRequest requestAssignments() { RestTemplate restTemplate = new RestTemplateBuilder().build(); - String url = "http://localhost:8080/api/urls/test?workerId=" + WorkerConstants.WORKER_ID + "&tasksLimit=" + WorkerConstants.TASKS_LIMIT; + String url = "http://localhost:8080/api/urls/test?workerId=" + WorkerConstants.WORKER_ID + "&assignmentsLimit=" + WorkerConstants.ASSIGNMENTS_LIMIT; String json = null; try { json = restTemplate.getForObject(url, String.class); @@ -58,37 +57,38 @@ public class AssignmentHandler { //logger.debug(assignmentRequest.toString()); // DEBUG! - Assignment assignment = assignmentRequest.getAssignment(); - logger.info("Assignment with id < " + assignment.getAssignmentId() + " > was received and it's ready to be processed. It contains " + assignment.getTasks().size() + " tasks."); + logger.info("AssignmentRequest < " + assignmentRequest.getAssignmentCounter() + " > was received and it's ready to be processed. It contains " + assignmentRequest.getAssignments().size() + " tasks."); // TODO - Maybe create a HashSet with these IDs. It may be useful for the Worker to know and report which assignments (and how many) it has processed. - return assignment; + return assignmentRequest; } - public static void handleAssignment() + public static void handleAssignments() { - Assignment assignment = requestAssignment(); - if ( assignment == null ) { - logger.warn("The assignment was found to be null!"); + AssignmentRequest assignmentRequest = requestAssignments(); + + Long assignmentRequestCounter = assignmentRequest.getAssignmentCounter(); + + List assignments = assignmentRequest.getAssignments(); + if ( assignments == null ) { + logger.warn("The assignments were found to be null for assignmentRequestCounter = " + assignmentRequestCounter); return; } - // Start handling the assignment, the worker is busy. + // Start handling the assignments, the worker is busy. isAvailableForWork = false; - List tasks = assignment.getTasks(); - // Iterate over the tasks and add each task in it's own list depending on the DATASOURCE in order to decide which plugin to use later. - Multimap tasksForPlugins = HashMultimap.create(); + Multimap assignmentsForPlugins = HashMultimap.create(); - for ( Task task : tasks ) { + for ( Assignment assignment : assignments ) { // Add each task in its own HashSet. - tasksForPlugins.put(task.getDatasource().getId(), task); + assignmentsForPlugins.put(assignment.getDatasource().getId(), assignment); } - urlReports = new ArrayList<>(tasks.size()); // Define the new UrlReport-list. It is reinitialized each time. + urlReports = new ArrayList<>(assignments.size()); // Define the new UrlReport-list. It is reinitialized each time. // TODO - Decide which tasks run with what plugin (depending on their datasource). @@ -99,31 +99,31 @@ public class AssignmentHandler { // For now, let's just run all tasks in the generic plugin. try { - PublicationsRetrieverPlugin.processTasks(assignment.getAssignmentId(), tasksForPlugins.values()); + PublicationsRetrieverPlugin.processAssginmets(assignmentRequestCounter, assignmentsForPlugins.values()); } catch (Exception e) { logger.error(e.getMessage(), e); } - postWorkerReport(assignment.getAssignmentId()); + postWorkerReport(assignmentRequestCounter); isAvailableForWork = true; } - public static boolean postWorkerReport(int assignmentId) + public static boolean postWorkerReport(Long assignmentRequestCounter) { RestTemplate restTemplate = new RestTemplateBuilder().build(); String url = "http://localhost:8080/api/urls/addWorkerReport"; try { - ResponseEntity responseEntity = restTemplate.postForEntity(url, new WorkerReport(WorkerConstants.WORKER_ID, assignmentId, urlReports), String.class); + ResponseEntity responseEntity = restTemplate.postForEntity(url, new WorkerReport(WorkerConstants.WORKER_ID, assignmentRequestCounter, urlReports), String.class); int responseCode = responseEntity.getStatusCode().value(); if ( responseCode != HttpStatus.OK.value() ) { - logger.error("Connection problem with the submission of the WorkerReport of assignment_" + assignmentId + " to the Controller. Error-code was: " + responseCode); + logger.error("Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode); return false; } } catch (Exception e) { - logger.error("Error when submitting the WorkerReport of assignment_" + assignmentId + " to the Controller: ", e); + logger.error("Error when submitting the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller: ", e); return false; } diff --git a/src/main/java/eu/openaire/urls_worker/util/WorkerConstants.java b/src/main/java/eu/openaire/urls_worker/util/WorkerConstants.java index b5e49a5..cc1728f 100644 --- a/src/main/java/eu/openaire/urls_worker/util/WorkerConstants.java +++ b/src/main/java/eu/openaire/urls_worker/util/WorkerConstants.java @@ -3,6 +3,6 @@ package eu.openaire.urls_worker.util; public interface WorkerConstants { String WORKER_ID = "worker_1"; // This should be different for every deployment of a Worker. - int TASKS_LIMIT = 10000; + int ASSIGNMENTS_LIMIT = 10000; }