From 61597d162766d9d521a7ec5b72d16bf05d598c5f Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Tue, 21 Sep 2021 16:21:39 +0300 Subject: [PATCH] - Read the Controller's url from a file, when starting the Application. - Switch the "AssignmentsHandler.askForTest" to "false". - Get the size and the hash of a docFile which is previously downloaded by another ID in that batch. - Reset the "AssignmentHandler.urlReports" list after posting the results to the Controller. - Enhance logging and comments. - Add more guidelines in the README. - Disable the scheduled test-live job. - Code cleanup. --- README.md | 16 +++-- .../urls_worker/UrlsWorkerApplication.java | 49 ++++++++++++++ .../components/ScheduledTasks.java | 4 +- .../controllers/GeneralController.java | 3 +- .../eu/openaire/urls_worker/models/Task.java | 65 ------------------- .../plugins/PublicationsRetrieverPlugin.java | 19 ++++-- .../urls_worker/util/AssignmentHandler.java | 32 +++++---- src/main/resources/logback-spring.xml | 2 +- 8 files changed, 98 insertions(+), 92 deletions(-) delete mode 100644 src/main/java/eu/openaire/urls_worker/models/Task.java diff --git a/README.md b/README.md index 303a150..6e8cc73 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,15 @@ -# Urls_Worker +# UrlsWorker This is the Worker's Application.
It requests assignments from the [controller](https://code-repo.d4science.org/lsmyrnaios/UrlsController) and processes them.
-[...]
- -To install and run the application, run ```git clone``` and then execute the ```installAndRun.sh``` script.
-That script, installs the [PublicationsRetriever](https://github.com/LSmyrnaios/PublicationsRetriever), as a library and then compiles and runs the whole Application.
+It posts the results to the controller, which in turn, puts them in a database.
+
+ +To install and run the application: +- Run ```git clone``` and then ```cd UrlsWorker```. +- Create the file ```controllerBaseUrl.txt``` which contains just one line with the controller's base api-url, for example: ```http://IP:PORT/api/```. +- Execute the ```installAndRun.sh``` script.
+ +That script, installs the [PublicationsRetriever](https://github.com/LSmyrnaios/PublicationsRetriever), as a library and then compiles and runs the whole Application.
+If you want to just run the app, then run the script with the argument "1": ```./installAndRun.sh 1```.

diff --git a/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java b/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java index 2481066..d2ce51d 100644 --- a/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java +++ b/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java @@ -1,6 +1,7 @@ package eu.openaire.urls_worker; import eu.openaire.publications_retriever.PublicationsRetriever; +import eu.openaire.publications_retriever.util.file.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; @@ -8,6 +9,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; import javax.annotation.PreDestroy; +import java.io.File; +import java.util.Scanner; import java.util.concurrent.TimeUnit; @@ -17,7 +20,13 @@ public class UrlsWorkerApplication { private static final Logger logger = LoggerFactory.getLogger(UrlsWorkerApplication.class); + private static final String controllerBaseUrlFilePath = FileUtils.workingDir + "controllerBaseUrl.txt"; + public static String controllerBaseUrl = null; // BaseUrl template: "http://IP:PORT/api/" + public static void main(String[] args) { + + setControllerBaseUrl(); // This may cause the Server to terminate early, in case the controllerBaseUrl cannot be found. + SpringApplication.run(UrlsWorkerApplication.class, args); } @@ -38,4 +47,44 @@ public class UrlsWorkerApplication { } } + + private static void setControllerBaseUrl() + { + // Take the controllerBaseUrl from the file. + Scanner myReader = null; + try { + File controllerBaseUrlFile = new File(controllerBaseUrlFilePath); + if ( !controllerBaseUrlFile.exists() ) { + String errorMsg = "controllerBaseUrlFile \"" + controllerBaseUrlFilePath + "\" does not exists!"; + logger.error(errorMsg); + System.err.println(errorMsg); + System.exit(60); + } + + myReader = new Scanner(controllerBaseUrlFile); + if ( !myReader.hasNextLine() ) { + String errorMsg = "The controllerBaseUrlFile is empty! No WorkerReports can be sent from this worker! Exiting.."; + logger.error(errorMsg); + System.err.println(errorMsg); + System.exit(61); + } + + controllerBaseUrl = myReader.nextLine().trim(); + if ( !controllerBaseUrl.endsWith("/") ) + controllerBaseUrl += "/"; // Make sure the whole urls will not break later. + + logger.info("The controllerBaseUrl is: " + controllerBaseUrl); + + } catch (Exception e) { + String errorMsg = "An error prevented the retrieval of the controllerBaseUrl from the file: " + controllerBaseUrlFilePath + "\n" + e.getMessage(); + logger.error(errorMsg); + System.err.println(errorMsg); + e.printStackTrace(); + System.exit(62); + } finally { + if ( myReader != null ) + myReader.close(); + } + } + } diff --git a/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java index dacc379..a3112f5 100644 --- a/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java @@ -18,7 +18,7 @@ public class ScheduledTasks { private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); - @Scheduled(fixedRate = 600_000) // Every 10 mins: 600_000 + //@Scheduled(fixedRate = 600_000) // Every 10 mins: 600_000 public void reportCurrentTime() { logger.info("Server is live! Time is now {}", dateFormat.format(new Date())); } @@ -27,6 +27,8 @@ public class ScheduledTasks { public void handleAssignment() { if ( AssignmentHandler.isAvailableForWork ) AssignmentHandler.handleAssignments(); + else + logger.debug("The worker is not available for work at the moment.."); } //@Scheduled(fixedRate = 20_000) // Every 20 secs. diff --git a/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java b/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java index 06a3b69..d6b0416 100644 --- a/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java +++ b/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java @@ -17,8 +17,7 @@ public class GeneralController { private static final Logger logger = LoggerFactory.getLogger(GeneralController.class); - public GeneralController() { - } + public GeneralController() {} @GetMapping("isAlive") public ResponseEntity isWorkerAlive() { diff --git a/src/main/java/eu/openaire/urls_worker/models/Task.java b/src/main/java/eu/openaire/urls_worker/models/Task.java deleted file mode 100644 index f2ae0e6..0000000 --- a/src/main/java/eu/openaire/urls_worker/models/Task.java +++ /dev/null @@ -1,65 +0,0 @@ -package eu.openaire.urls_worker.models; - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonPropertyOrder; - - -@JsonInclude(JsonInclude.Include.NON_NULL) -@JsonPropertyOrder({ - "id", - "url", - "datasource" -}) -public class Task { - - @JsonProperty("id") - private String id; - - @JsonProperty("url") - private String url; - - @JsonProperty("datasource") - private Datasource datasource; - - public Task() {} - - public Task(String id, String url, Datasource datasource) { - this.id = id; - this.url = url; - this.datasource = datasource; - } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public String getUrl() { - return url; - } - - public void setUrl(String url) { - this.url = url; - } - - public Datasource getDatasource() { - return datasource; - } - - public void setDatasource(Datasource datasource) { - this.datasource = datasource; - } - - @Override - public String toString() { - return "Task{" + - "id='" + id + '\'' + - ", url='" + url + '\'' + - ", datasource=" + datasource + - '}'; - } -} diff --git a/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java b/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java index e956ebb..a9591ee 100644 --- a/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java +++ b/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java @@ -70,9 +70,9 @@ public class PublicationsRetrieverPlugin { FileUtils.setOutput(new FileOutputStream(assignmentsBasePath + "assignment_" + assignmentRequestCounter + "_generic_results.json")); - int tasksSize = assignments.size(); + int tasksNumber = assignments.size(); int batchCount = 0; - int taskCount = 0; + int tasksCount = 0; List> callableTasks = new ArrayList<>(FileUtils.jsonBatchSize); // Start loading and checking urls. @@ -121,9 +121,10 @@ public class PublicationsRetrieverPlugin { return true; }); - if ( ((++taskCount) >= FileUtils.jsonBatchSize) || (taskCount >= tasksSize) ) + // Invoke the tasks every time we reach the "jsonBatchSize" tasks, or we are at the end of the list. + if ( ((++tasksCount) >= FileUtils.jsonBatchSize) || (tasksCount >= tasksNumber) ) { - logger.info("Batch counter: " + (++batchCount) + " | progress: " + PublicationsRetriever.df.format(((batchCount-1) * taskCount) * 100.0 / tasksSize) + "% | every batch contains " + FileUtils.jsonBatchSize + " id-url pairs."); + logger.info("Batch counter: " + (++batchCount) + " | progress: " + PublicationsRetriever.df.format(((batchCount-1) * tasksCount) * 100.0 / tasksNumber) + "% | every batch contains " + FileUtils.jsonBatchSize + " id-url pairs."); LoaderAndChecker.invokeAllTasksAndWait(callableTasks); addUrlReportsToWorkerReport(); callableTasks.clear(); // Reset the thread-tasks-list for the next batch. @@ -138,7 +139,8 @@ public class PublicationsRetrieverPlugin { { for ( DataToBeLogged data : FileUtils.dataToBeLoggedList ) { - String status = null, fileLocation = null, comment = data.getComment(), mimeType = null; + String status = null, fileLocation = null, comment = data.getComment(), mimeType = null, hash = data.getHash(); + Long size = data.getSize(); Error error = null; if ( data.getWasDocumentOrDatasetAccessible().equals("true") ) @@ -152,10 +154,13 @@ public class PublicationsRetrieverPlugin { for ( DataToBeLogged data_2 : FileUtils.dataToBeLoggedList ) { if ( data_2.getUrlId().equals(previousId) && data_2.getWasDocumentOrDatasetAccessible().equals("true") ) { fileLocation = data_2.getComment(); - mimeType = "application/pdf"; + size = data_2.getSize(); + hash = data_2.getHash(); + mimeType = "application/pdf"; // TODO - If support is added for other doc-formats other than "pdf", then make sure the "mime_type" is correctly specified. break; } } + // TODO - The case where the "twin-ID" is not found, should "never" happen. But should we check? How to handle if that is the case..? } else if ( comment.contains(DocFileNotRetrievedExceptionName) ) fileLocation = "File not retrieved"; @@ -177,7 +182,7 @@ public class PublicationsRetrieverPlugin { if ( docOrDatasetUrl.equals(UrlUtils.unreachableDocOrDatasetUrlIndicator) || docOrDatasetUrl.equals(UrlUtils.duplicateUrlIndicator) ) docOrDatasetUrl = null; - Payload payload = new Payload(data.getUrlId(), data.getSourceUrl(), docOrDatasetUrl, new Date(), mimeType, data.getSize(), data.getHash(), fileLocation, "crawl:PublicationsRetriever"); + Payload payload = new Payload(data.getUrlId(), data.getSourceUrl(), docOrDatasetUrl, new Date(), mimeType, size, hash, fileLocation, "crawl:PublicationsRetriever"); // TODO - If support is added for other doc-formats other than "pdf", then make sure the "mime_type" is correctly specified. AssignmentHandler.urlReports.add(new UrlReport(status, payload, error)); diff --git a/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java b/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java index 562dd10..f6ebe84 100644 --- a/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java +++ b/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import eu.openaire.urls_worker.UrlsWorkerApplication; import eu.openaire.urls_worker.models.Assignment; import eu.openaire.urls_worker.models.UrlReport; import eu.openaire.urls_worker.payloads.requests.AssignmentRequest; @@ -14,7 +15,6 @@ import org.slf4j.LoggerFactory; import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import org.springframework.web.client.RestTemplate; import java.util.*; @@ -27,25 +27,25 @@ public class AssignmentHandler { public static List urlReports = null; - private static final boolean askForTest = true; // Enable this only for testing. + private static final boolean askForTest = false; // Enable this only for testing. public static AssignmentRequest requestAssignments() { - RestTemplate restTemplate = new RestTemplateBuilder().build(); - String url = "http://localhost:1880/api/urls" + (askForTest ? "/test" : "") + "?workerId=" + WorkerConstants.WORKER_ID + "&workerAssignmentsLimit=" + WorkerConstants.ASSIGNMENTS_LIMIT; + String requestUrl = UrlsWorkerApplication.controllerBaseUrl + "urls" + (askForTest ? "/test" : "") + "?workerId=" + WorkerConstants.WORKER_ID + "&workerAssignmentsLimit=" + WorkerConstants.ASSIGNMENTS_LIMIT; + logger.info("Going to request assignments from the controller-server: " + requestUrl); + String json = null; try { - json = restTemplate.getForObject(url, String.class); + json = new RestTemplateBuilder().build().getForObject(requestUrl, String.class); } catch (Exception e) { - logger.error("Could not retrieve the assignment: " + e.getMessage()); + logger.error("Could not retrieve the assignments!\n" + e.getMessage()); return null; } AssignmentRequest assignmentRequest = null; - ObjectMapper objectMapper = new ObjectMapper(); try { - assignmentRequest = objectMapper.readValue(json, AssignmentRequest.class); + assignmentRequest = new ObjectMapper().readValue(json, AssignmentRequest.class); } catch (JsonProcessingException e) { e.printStackTrace(); } @@ -111,17 +111,25 @@ public class AssignmentHandler { isAvailableForWork = true; // State this before posting, to catch the soonest next scheduled request. + /*logger.debug("UrlReports:"); // DEBUG! + for ( UrlReport urlReport : urlReports ) { + logger.debug(urlReport.toString()); + }*/ + postWorkerReport(assignmentRequestCounter); + + // Note: Cannot call this method here retrospectively, as if it runs 100s of times, the memory may break.. + // The scheduler will handle calling it every half an hour, in case the Worker is available for work.. } public static boolean postWorkerReport(Long assignmentRequestCounter) { - RestTemplate restTemplate = new RestTemplateBuilder().build(); - String url = "http://localhost:1880/api/urls/addWorkerReport"; + String postUrl = UrlsWorkerApplication.controllerBaseUrl + "urls/addWorkerReport"; + logger.info("Going to post the WorkerReport to the controller-server: " + postUrl); try { - ResponseEntity responseEntity = restTemplate.postForEntity(url, new WorkerReport(WorkerConstants.WORKER_ID, assignmentRequestCounter, urlReports), String.class); + ResponseEntity responseEntity = new RestTemplateBuilder().build().postForEntity(postUrl, new WorkerReport(WorkerConstants.WORKER_ID, assignmentRequestCounter, urlReports), String.class); int responseCode = responseEntity.getStatusCode().value(); if ( responseCode != HttpStatus.OK.value() ) { logger.error("Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode); @@ -130,6 +138,8 @@ public class AssignmentHandler { } catch (Exception e) { logger.error("Error when submitting the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller: ", e); return false; + } finally { + AssignmentHandler.urlReports.clear(); // Reset the UrlReports list for the next assignments-handling. } return true; diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml index d490c59..a92c683 100644 --- a/src/main/resources/logback-spring.xml +++ b/src/main/resources/logback-spring.xml @@ -23,7 +23,7 @@ - +