From c7bfd759732c9bd9244e48dbae0d9843cb56d92b Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Tue, 23 May 2023 14:57:15 +0300 Subject: [PATCH] - Add the "getWorkersInfo" endpoint. - Improve startup speed, by using a faster remote server to get the host's machine public IP. This also reduces the risk of not being able to get the public IP at all. - Fix the detection of a different IP for a known worker. - Improve documentation. --- README.md | 15 +++++++++------ .../controllers/GeneralController.java | 10 ++++++++++ .../controllers/UrlsController.java | 3 +-- .../openaire/urls_controller/util/UriBuilder.java | 2 +- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 899e4af..693bcd5 100644 --- a/README.md +++ b/README.md @@ -2,17 +2,20 @@ The Controller's Application receives requests coming from the [Workers](https://code-repo.d4science.org/lsmyrnaios/UrlsWorker) , constructs an assignments-list with data received from a database and returns the list to the workers.
Then, it receives the "WorkerReports", it requests the full-texts from the workers, in batches, and uploads them on the S3-Object-Store. Finally, it writes the related reports, along with the updated file-locations into the database.
-The database used is the [Impala](https://impala.apache.org/).
+
+It can also process **Bulk-Import** requests, from compatible data sources, in which case it receives the full-text files immediately, without offloading crawling jobs to Workers.
+
+For interacting with the database we use [Impala](https://impala.apache.org/).

**Statistics API**: -- "**getNumberOfAllPayloads**" endpoint: **http://:/api/stats/getNumberOfAllPayloads**
+- "**getNumberOfAllPayloads**" endpoint: **http://\:/api/stats/getNumberOfAllPayloads**
This endpoint returns the total number of payloads existing in the database, independently of the way they were aggregated. This includes the payloads created by other pieces of software, before the PDF-Aggregation-Service was created. -- "**getNumberOfPayloadsAggregatedByService**" endpoint: **http://:/api/stats/getNumberOfPayloadsAggregatedByService**
+- "**getNumberOfPayloadsAggregatedByService**" endpoint: **http://\:/api/stats/getNumberOfPayloadsAggregatedByService**
This endpoint returns the number of payloads aggregated by the PDF-Aggregated-Service itself. It excludes the payloads aggregated by other methods, by applying a Date-filter for the records created in 2021 or later. -- "**getNumberOfPayloadsForDatasource**" endpoint: **http://:/api/stats/getNumberOfPayloadsForDatasource?datasourceId="givenDatasourceId"**
+- "**getNumberOfPayloadsForDatasource**" endpoint: **http://\:/api/stats/getNumberOfPayloadsForDatasource?datasourceId=\**
This endpoint returns the number of payloads which belong to the datasource specified by the given datasourceID. -- "**getNumberOfRecordsInspected**" endpoint: **http://:/api/stats/getNumberOfRecordsInspected**
+- "**getNumberOfRecordsInspected**" endpoint: **http://\:/api/stats/getNumberOfRecordsInspected**
This endpoint returns the number of records inspected by the PDF-Aggregation-Service.

@@ -27,4 +30,4 @@ If you want to build and run the app on a **Docker Container**, then run the scr Implementation notes: - For transferring the full-text files, we use Facebook's [**Zstandard**](https://facebook.github.io/zstd/) compression algorithm, which brings very big benefits in compression rate and speed. -- The uploaded full-text files follow this naming-scheme: "**datasourceID/recordId::fileHash.pdf**" +- The uploaded full-text files follow this naming-scheme: "**datasourceID/recordID::fileHash.pdf**" diff --git a/src/main/java/eu/openaire/urls_controller/controllers/GeneralController.java b/src/main/java/eu/openaire/urls_controller/controllers/GeneralController.java index bbd1909..af3b067 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/GeneralController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/GeneralController.java @@ -20,4 +20,14 @@ public class GeneralController { return ResponseEntity.ok().build(); } + + @GetMapping("getWorkersInfo") + public ResponseEntity getWorkersInfo() + { + if ( UrlsController.workersInfoMap.isEmpty() ) + return ResponseEntity.noContent().build(); + + return ResponseEntity.ok(UrlsController.workersInfoMap); + } + } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java index 109dc2a..2853380 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java @@ -36,7 +36,6 @@ public class UrlsController { @Value("${services.pdfaggregation.controller.assignmentLimit}") private int assignmentLimit; - public static final ConcurrentHashMap workersInfoMap = new ConcurrentHashMap<>(6); @@ -88,7 +87,7 @@ public class UrlsController { WorkerInfo workerInfo = workersInfoMap.get(workerId); if ( workerInfo != null ) { // This worker has already been identified. String savedWorkerIp = workerInfo.getWorkerIP(); - if ( savedWorkerIp.equals(remoteAddr) ) { + if ( !savedWorkerIp.equals(remoteAddr) ) { logger.warn("The worker with id \"" + workerId + "\" has changed IP from \"" + savedWorkerIp + "\" to \"" + remoteAddr + "\"."); workerInfo.setWorkerIP(remoteAddr); // Set the new IP. The update will be reflected in the map. } // In this case, the worker may has previously informed the Controller it has shutdown or it may have crashed. diff --git a/src/main/java/eu/openaire/urls_controller/util/UriBuilder.java b/src/main/java/eu/openaire/urls_controller/util/UriBuilder.java index 67a0255..53e32f2 100644 --- a/src/main/java/eu/openaire/urls_controller/util/UriBuilder.java +++ b/src/main/java/eu/openaire/urls_controller/util/UriBuilder.java @@ -55,7 +55,7 @@ public class UriBuilder { { String publicIpAddress = ""; HttpURLConnection conn = null; - String urlString = "https://api.ipify.org/"; + String urlString = "https://checkip.amazonaws.com/"; try { conn = (HttpURLConnection) new URL(urlString).openConnection(); conn.setConnectTimeout(60_000); // 1 minute