diff --git a/build.gradle b/build.gradle index 89e1fb4..994c6b3 100644 --- a/build.gradle +++ b/build.gradle @@ -1,12 +1,5 @@ -buildscript { - ext { - springBootVersion = "2.5.5" - springSecurityVersion = "5.5.2" - } -} - plugins { - id 'org.springframework.boot' version '2.5.5' + id 'org.springframework.boot' version '2.5.6' id 'io.spring.dependency-management' version '1.0.11.RELEASE' id 'java' } @@ -25,14 +18,14 @@ repositories { dependencies { - runtimeOnly "org.springframework.boot:spring-boot-devtools:${springBootVersion}" + runtimeOnly "org.springframework.boot:spring-boot-devtools" - implementation("org.springframework.boot:spring-boot-starter-web:${springBootVersion}") - implementation("org.springframework.boot:spring-boot-starter-security:${springBootVersion}") - implementation("org.springframework.boot:spring-boot-configuration-processor:${springBootVersion}") - implementation("org.springframework.security:spring-security-core:${springSecurityVersion}") - implementation("org.springframework.security:spring-security-web:${springSecurityVersion}") - implementation("org.springframework.security:spring-security-config:${springSecurityVersion}") + implementation("org.springframework.boot:spring-boot-starter-web") + implementation("org.springframework.boot:spring-boot-starter-security") + implementation("org.springframework.boot:spring-boot-configuration-processor") + implementation("org.springframework.security:spring-security-core") + implementation("org.springframework.security:spring-security-web") + implementation("org.springframework.security:spring-security-config") //implementation("io.jsonwebtoken:jjwt:0.9.1") // Use this in case we use auth-tokens later on. implementation "org.projectlombok:lombok:1.18.22" @@ -46,8 +39,8 @@ dependencies { exclude group: 'org.slf4j', module: 'slf4j-api' } - testImplementation group: 'org.springframework.security', name: 'spring-security-test', version: springSecurityVersion - testImplementation "org.springframework.boot:spring-boot-starter-test:${springBootVersion}" + testImplementation group: 'org.springframework.security', name: 'spring-security-test' + testImplementation "org.springframework.boot:spring-boot-starter-test" } configurations { diff --git a/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java b/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java index 122b854..cd933f7 100644 --- a/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java +++ b/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java @@ -15,6 +15,7 @@ import eu.openaire.urls_worker.models.Error; import eu.openaire.urls_worker.models.Payload; import eu.openaire.urls_worker.models.UrlReport; import eu.openaire.urls_worker.util.AssignmentHandler; +import eu.openaire.urls_worker.util.WorkerConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +52,7 @@ public class PublicationsRetrieverPlugin { FileUtils.shouldDownloadDocFiles = true; FileUtils.docFileNameType = FileUtils.DocFileNameType.idName; PublicationsRetriever.targetUrlType = "docUrl"; + FileUtils.jsonBatchSize = WorkerConstants.ASSIGNMENTS_LIMIT; FileUtils.shouldUploadFilesToS3 = true; new S3ObjectStoreMinIO(); // Check here on how to create the credentials-file: https://github.com/LSmyrnaios/PublicationsRetriever/blob/master/README.md @@ -60,6 +62,7 @@ public class PublicationsRetrieverPlugin { PublicationsRetriever.executor = Executors.newFixedThreadPool(workerThreadsCount); } + private static final List> callableTasks = new ArrayList<>(FileUtils.jsonBatchSize); public static void processAssignments(Long assignmentRequestCounter, Collection assignments) throws RuntimeException, FileNotFoundException { @@ -70,7 +73,6 @@ public class PublicationsRetrieverPlugin { int tasksNumber = assignments.size(); int batchCount = 0; int tasksCount = 0; - List> callableTasks = new ArrayList<>(FileUtils.jsonBatchSize); // Start loading and checking urls. for ( Assignment assignment : assignments ) @@ -119,9 +121,10 @@ public class PublicationsRetrieverPlugin { }); // Invoke the tasks every time we reach the "jsonBatchSize" tasks, or we are at the end of the list. - if ( ((++tasksCount) >= FileUtils.jsonBatchSize) || (tasksCount >= tasksNumber) ) + tasksCount ++; + if ( (tasksCount == FileUtils.jsonBatchSize) || (tasksCount == tasksNumber) ) { - logger.info("Batch counter: " + (++batchCount) + " | progress: " + PublicationsRetriever.df.format(((batchCount-1) * tasksCount) * 100.0 / tasksNumber) + "% | every batch contains " + FileUtils.jsonBatchSize + " id-url pairs."); + logger.info("Batch counter: " + (++batchCount) + " | progress: " + PublicationsRetriever.df.format((batchCount * tasksCount) * 100.0 / tasksNumber) + "% | every batch contains " + FileUtils.jsonBatchSize + " id-url pairs."); LoaderAndChecker.invokeAllTasksAndWait(callableTasks); addUrlReportsToWorkerReport(); callableTasks.clear(); // Reset the thread-tasks-list for the next batch. diff --git a/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java b/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java index fccc3f9..daa1aef 100644 --- a/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java +++ b/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java @@ -1,7 +1,5 @@ package eu.openaire.urls_worker.util; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import eu.openaire.urls_worker.UrlsWorkerApplication; @@ -15,12 +13,9 @@ import org.slf4j.LoggerFactory; import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import org.springframework.http.client.ClientHttpResponse; -import org.springframework.web.client.ResponseErrorHandler; import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; -import java.io.IOException; import java.time.Duration; import java.util.*; @@ -30,72 +25,28 @@ public class AssignmentHandler { private static final Logger logger = LoggerFactory.getLogger(AssignmentHandler.class); public static boolean isAvailableForWork = true; - public static List urlReports = null; + public static final List urlReports = new ArrayList<>(WorkerConstants.ASSIGNMENTS_LIMIT); + private static final int expectedDatasourcesPerRequest = 1400; // Per 10_000 assignments. + private static final int expectedAssignmentsPerDatasource = (WorkerConstants.ASSIGNMENTS_LIMIT / expectedDatasourcesPerRequest); + public static final Multimap assignmentsForPlugins = HashMultimap.create(expectedDatasourcesPerRequest, expectedAssignmentsPerDatasource); private static final boolean askForTest = false; // Enable this only for testing. private static final Duration requestConnectTimeoutDuration = Duration.ofMinutes(1); // 1 minute. private static final Duration requestReadTimeoutDuration = Duration.ofMinutes(60); // 60 minutes. Time to wait for the data to get transferred over the network. Many workers may try to get assignments from the Worker, so each worker might have to wait some 10s of minutes for work. // The controller has to retrieve the data from the database, then prepare them in memory, insert them in the "assignment"-table and, finally, return them to the worker. - private static boolean encounteredHTTPRequestError = false; + public static final RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(requestConnectTimeoutDuration).setReadTimeout(requestReadTimeoutDuration).build(); public static AssignmentRequest requestAssignments() { String requestUrl = UrlsWorkerApplication.controllerBaseUrl + "urls" + (askForTest ? "/test" : "") + "?workerId=" + UrlsWorkerApplication.workerId + "&workerAssignmentsLimit=" + WorkerConstants.ASSIGNMENTS_LIMIT; logger.info("Going to request assignments from the controller-server: " + requestUrl); - RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(requestConnectTimeoutDuration).setReadTimeout(requestReadTimeoutDuration).build(); - - ResponseErrorHandler responseErrorHandler = new ResponseErrorHandler() { - @Override - public boolean hasError(ClientHttpResponse response) throws IOException { - if ( response.getRawStatusCode() != 200 ) { - encounteredHTTPRequestError = true; - return true; - } else { - encounteredHTTPRequestError = false; // Make sure the value is reset here, to avoid non-present errors from previous requests. - return false; - } - } - - /** - * This is the HTTP-error-handler. - * */ - @Override - public void handleError(ClientHttpResponse response) throws IOException { - int responseCode = response.getRawStatusCode(); - String statusText = response.getStatusText(); - String extraBodyMessage = "\n" + response.getBody(); - String additionalErrorMessage = ((!"".equals(statusText)) ? statusText + extraBodyMessage : "The HTTP-response-code was: " + responseCode + extraBodyMessage); - if ( (responseCode >= 500) && responseCode <= 599 ) - logger.error("The Controller encountered a problem! " + additionalErrorMessage); - else if ( (responseCode >= 400) && (responseCode <= 499) ) - logger.error("There was a bad request to the Controller! " + additionalErrorMessage); - else if ( responseCode != 200 ) - logger.error("There was an HTTP-error when requesting the assignments from the Controller! " + additionalErrorMessage); - } - }; - restTemplate.setErrorHandler(responseErrorHandler); - - String json = null; - try { // Here, the HTTP-request is executed. - json = restTemplate.getForObject(requestUrl, String.class); - } catch (RestClientException e) { - logger.error("Could not retrieve the assignments!\n" + e.getMessage()); - return null; - } - - if ( encounteredHTTPRequestError ) - return null; - AssignmentRequest assignmentRequest = null; - try { - assignmentRequest = new ObjectMapper().readValue(json, AssignmentRequest.class); - } catch (JsonProcessingException jpe) { - String errorMessage = "Could not process/map the json to an \"assignmentRequest\"!\n" + jpe.getMessage(); - logger.error(errorMessage); - System.err.println(errorMessage); - jpe.printStackTrace(); + try { // Here, the HTTP-request is executed. + assignmentRequest = restTemplate.getForObject(requestUrl, AssignmentRequest.class); + } catch (RestClientException rce) { + logger.error("Could not retrieve the assignments!\n" + rce.getMessage()); // It shows the response body (after Spring v.2.5.6). return null; } @@ -107,10 +58,8 @@ public class AssignmentHandler { public static void handleAssignments() { AssignmentRequest assignmentRequest = requestAssignments(); - if ( assignmentRequest == null ) { - logger.error("The \"assignmentRequest\" was \"null\"!"); + if ( assignmentRequest == null ) return; - } Long assignmentRequestCounter = assignmentRequest.getAssignmentCounter(); List assignments = assignmentRequest.getAssignments(); @@ -130,16 +79,18 @@ public class AssignmentHandler { // Start handling the assignments, the worker is busy. isAvailableForWork = false; - // Iterate over the tasks and add each task in it's own list depending on the DATASOURCE in order to decide which plugin to use later. - - Multimap assignmentsForPlugins = HashMultimap.create(); + // Iterate over the tasks and add each task in its own list depending on the DATASOURCE in order to decide which plugin to use later. for ( Assignment assignment : assignments ) { // Add each task in its own HashSet. - assignmentsForPlugins.put(assignment.getDatasource().getId(), assignment); + try { + assignmentsForPlugins.put(assignment.getDatasource().getId(), assignment); + } catch (NullPointerException npe) { + logger.warn("An NPE was thrown when splitting the assignments based on the datasource-types. The assignment was: " + assignment); // Do not use "assignment.toString()", it may cause an NPE. + } } - urlReports = new ArrayList<>(assignments.size()); // Define the new UrlReport-list. It is reinitialized each time. + //countDatasourcesAndRecords(assignmentsSize); // Only for DEBUG! Keep it commented in normal run. // TODO - Decide which tasks run with what plugin (depending on their datasource). @@ -155,8 +106,6 @@ public class AssignmentHandler { logger.error(e.getMessage(), e); } - isAvailableForWork = true; // State this before posting, to catch the soonest next scheduled request. - if ( askForTest ) { logger.debug("UrlReports:"); // DEBUG! for ( UrlReport urlReport : urlReports ) @@ -165,6 +114,8 @@ public class AssignmentHandler { else postWorkerReport(assignmentRequestCounter); + isAvailableForWork = true; // State this after posting, to avoid breaking the "UrlReports" in the current or the next run. + // Note: Cannot call this method here retrospectively, as if it runs 100s of times, the memory-stack may break.. // The scheduler will handle calling it every half an hour, in case the Worker is available for work.. } @@ -176,20 +127,34 @@ public class AssignmentHandler { logger.info("Going to post the WorkerReport of assignment_" + assignmentRequestCounter + " to the controller-server: " + postUrl); try { - ResponseEntity responseEntity = new RestTemplateBuilder().build().postForEntity(postUrl, new WorkerReport(UrlsWorkerApplication.workerId, assignmentRequestCounter, urlReports), String.class); + ResponseEntity responseEntity = restTemplate.postForEntity(postUrl, new WorkerReport(UrlsWorkerApplication.workerId, assignmentRequestCounter, urlReports), String.class); int responseCode = responseEntity.getStatusCodeValue(); if ( responseCode != HttpStatus.OK.value() ) { - logger.error("Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode); + logger.error("HTTP-Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode); return false; } } catch (Exception e) { logger.error("Error when submitting the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller: ", e); + e.printStackTrace(); return false; } finally { - AssignmentHandler.urlReports.clear(); // Reset the UrlReports list for the next assignments-handling. + urlReports.clear(); // Reset, without de-allocating. + assignmentsForPlugins.clear(); } return true; } + + public static void countDatasourcesAndRecords(int assignmentsSize) + { + Set datasources = assignmentsForPlugins.keySet(); + int numDatasources = datasources.size(); + logger.debug("Num of datasources: " + numDatasources); + for ( String datasource : datasources ) { + logger.debug("Num of records for datasource \"" + datasource + "\" is: " + assignmentsForPlugins.get(datasource).size() ); + } + logger.debug("Average num of records per datasource: " + (assignmentsSize / numDatasources)); + } + } diff --git a/src/main/java/eu/openaire/urls_worker/util/WorkerConstants.java b/src/main/java/eu/openaire/urls_worker/util/WorkerConstants.java index 4093ca3..4933c1a 100644 --- a/src/main/java/eu/openaire/urls_worker/util/WorkerConstants.java +++ b/src/main/java/eu/openaire/urls_worker/util/WorkerConstants.java @@ -2,6 +2,6 @@ package eu.openaire.urls_worker.util; public interface WorkerConstants { - int ASSIGNMENTS_LIMIT = 10000; + int ASSIGNMENTS_LIMIT = 10_000; }