- Improve performance when requesting, processing and posting requests.

- Fix a bug, causing degraded performance when processing more than 3000 assignments.
- Fix the progress percentage shown in the logs.
- Avoid a potential NPE when processing a broken "Assignment" object.
- Update Spring to v.2.5.6.
- Code cleanup.
This commit is contained in:
Lampros Smyrnaios 2021-10-30 17:14:18 +03:00
parent 0f12a9305c
commit 3220c97373
4 changed files with 53 additions and 92 deletions

View File

@ -1,12 +1,5 @@
buildscript {
ext {
springBootVersion = "2.5.5"
springSecurityVersion = "5.5.2"
}
}
plugins {
id 'org.springframework.boot' version '2.5.5'
id 'org.springframework.boot' version '2.5.6'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
}
@ -25,14 +18,14 @@ repositories {
dependencies {
runtimeOnly "org.springframework.boot:spring-boot-devtools:${springBootVersion}"
runtimeOnly "org.springframework.boot:spring-boot-devtools"
implementation("org.springframework.boot:spring-boot-starter-web:${springBootVersion}")
implementation("org.springframework.boot:spring-boot-starter-security:${springBootVersion}")
implementation("org.springframework.boot:spring-boot-configuration-processor:${springBootVersion}")
implementation("org.springframework.security:spring-security-core:${springSecurityVersion}")
implementation("org.springframework.security:spring-security-web:${springSecurityVersion}")
implementation("org.springframework.security:spring-security-config:${springSecurityVersion}")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-security")
implementation("org.springframework.boot:spring-boot-configuration-processor")
implementation("org.springframework.security:spring-security-core")
implementation("org.springframework.security:spring-security-web")
implementation("org.springframework.security:spring-security-config")
//implementation("io.jsonwebtoken:jjwt:0.9.1") // Use this in case we use auth-tokens later on.
implementation "org.projectlombok:lombok:1.18.22"
@ -46,8 +39,8 @@ dependencies {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
testImplementation group: 'org.springframework.security', name: 'spring-security-test', version: springSecurityVersion
testImplementation "org.springframework.boot:spring-boot-starter-test:${springBootVersion}"
testImplementation group: 'org.springframework.security', name: 'spring-security-test'
testImplementation "org.springframework.boot:spring-boot-starter-test"
}
configurations {

View File

@ -15,6 +15,7 @@ import eu.openaire.urls_worker.models.Error;
import eu.openaire.urls_worker.models.Payload;
import eu.openaire.urls_worker.models.UrlReport;
import eu.openaire.urls_worker.util.AssignmentHandler;
import eu.openaire.urls_worker.util.WorkerConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,6 +52,7 @@ public class PublicationsRetrieverPlugin {
FileUtils.shouldDownloadDocFiles = true;
FileUtils.docFileNameType = FileUtils.DocFileNameType.idName;
PublicationsRetriever.targetUrlType = "docUrl";
FileUtils.jsonBatchSize = WorkerConstants.ASSIGNMENTS_LIMIT;
FileUtils.shouldUploadFilesToS3 = true;
new S3ObjectStoreMinIO(); // Check here on how to create the credentials-file: https://github.com/LSmyrnaios/PublicationsRetriever/blob/master/README.md
@ -60,6 +62,7 @@ public class PublicationsRetrieverPlugin {
PublicationsRetriever.executor = Executors.newFixedThreadPool(workerThreadsCount);
}
private static final List<Callable<Boolean>> callableTasks = new ArrayList<>(FileUtils.jsonBatchSize);
public static void processAssignments(Long assignmentRequestCounter, Collection<Assignment> assignments) throws RuntimeException, FileNotFoundException
{
@ -70,7 +73,6 @@ public class PublicationsRetrieverPlugin {
int tasksNumber = assignments.size();
int batchCount = 0;
int tasksCount = 0;
List<Callable<Boolean>> callableTasks = new ArrayList<>(FileUtils.jsonBatchSize);
// Start loading and checking urls.
for ( Assignment assignment : assignments )
@ -119,9 +121,10 @@ public class PublicationsRetrieverPlugin {
});
// Invoke the tasks every time we reach the "jsonBatchSize" tasks, or we are at the end of the list.
if ( ((++tasksCount) >= FileUtils.jsonBatchSize) || (tasksCount >= tasksNumber) )
tasksCount ++;
if ( (tasksCount == FileUtils.jsonBatchSize) || (tasksCount == tasksNumber) )
{
logger.info("Batch counter: " + (++batchCount) + " | progress: " + PublicationsRetriever.df.format(((batchCount-1) * tasksCount) * 100.0 / tasksNumber) + "% | every batch contains " + FileUtils.jsonBatchSize + " id-url pairs.");
logger.info("Batch counter: " + (++batchCount) + " | progress: " + PublicationsRetriever.df.format((batchCount * tasksCount) * 100.0 / tasksNumber) + "% | every batch contains " + FileUtils.jsonBatchSize + " id-url pairs.");
LoaderAndChecker.invokeAllTasksAndWait(callableTasks);
addUrlReportsToWorkerReport();
callableTasks.clear(); // Reset the thread-tasks-list for the next batch.

View File

@ -1,7 +1,5 @@
package eu.openaire.urls_worker.util;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import eu.openaire.urls_worker.UrlsWorkerApplication;
@ -15,12 +13,9 @@ import org.slf4j.LoggerFactory;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.web.client.ResponseErrorHandler;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.time.Duration;
import java.util.*;
@ -30,72 +25,28 @@ public class AssignmentHandler {
private static final Logger logger = LoggerFactory.getLogger(AssignmentHandler.class);
public static boolean isAvailableForWork = true;
public static List<UrlReport> urlReports = null;
public static final List<UrlReport> urlReports = new ArrayList<>(WorkerConstants.ASSIGNMENTS_LIMIT);
private static final int expectedDatasourcesPerRequest = 1400; // Per 10_000 assignments.
private static final int expectedAssignmentsPerDatasource = (WorkerConstants.ASSIGNMENTS_LIMIT / expectedDatasourcesPerRequest);
public static final Multimap<String, Assignment> assignmentsForPlugins = HashMultimap.create(expectedDatasourcesPerRequest, expectedAssignmentsPerDatasource);
private static final boolean askForTest = false; // Enable this only for testing.
private static final Duration requestConnectTimeoutDuration = Duration.ofMinutes(1); // 1 minute.
private static final Duration requestReadTimeoutDuration = Duration.ofMinutes(60); // 60 minutes. Time to wait for the data to get transferred over the network. Many workers may try to get assignments from the Worker, so each worker might have to wait some 10s of minutes for work.
// The controller has to retrieve the data from the database, then prepare them in memory, insert them in the "assignment"-table and, finally, return them to the worker.
private static boolean encounteredHTTPRequestError = false;
public static final RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(requestConnectTimeoutDuration).setReadTimeout(requestReadTimeoutDuration).build();
public static AssignmentRequest requestAssignments()
{
String requestUrl = UrlsWorkerApplication.controllerBaseUrl + "urls" + (askForTest ? "/test" : "") + "?workerId=" + UrlsWorkerApplication.workerId + "&workerAssignmentsLimit=" + WorkerConstants.ASSIGNMENTS_LIMIT;
logger.info("Going to request assignments from the controller-server: " + requestUrl);
RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(requestConnectTimeoutDuration).setReadTimeout(requestReadTimeoutDuration).build();
ResponseErrorHandler responseErrorHandler = new ResponseErrorHandler() {
@Override
public boolean hasError(ClientHttpResponse response) throws IOException {
if ( response.getRawStatusCode() != 200 ) {
encounteredHTTPRequestError = true;
return true;
} else {
encounteredHTTPRequestError = false; // Make sure the value is reset here, to avoid non-present errors from previous requests.
return false;
}
}
/**
* This is the HTTP-error-handler.
* */
@Override
public void handleError(ClientHttpResponse response) throws IOException {
int responseCode = response.getRawStatusCode();
String statusText = response.getStatusText();
String extraBodyMessage = "\n" + response.getBody();
String additionalErrorMessage = ((!"".equals(statusText)) ? statusText + extraBodyMessage : "The HTTP-response-code was: " + responseCode + extraBodyMessage);
if ( (responseCode >= 500) && responseCode <= 599 )
logger.error("The Controller encountered a problem! " + additionalErrorMessage);
else if ( (responseCode >= 400) && (responseCode <= 499) )
logger.error("There was a bad request to the Controller! " + additionalErrorMessage);
else if ( responseCode != 200 )
logger.error("There was an HTTP-error when requesting the assignments from the Controller! " + additionalErrorMessage);
}
};
restTemplate.setErrorHandler(responseErrorHandler);
String json = null;
try { // Here, the HTTP-request is executed.
json = restTemplate.getForObject(requestUrl, String.class);
} catch (RestClientException e) {
logger.error("Could not retrieve the assignments!\n" + e.getMessage());
return null;
}
if ( encounteredHTTPRequestError )
return null;
AssignmentRequest assignmentRequest = null;
try {
assignmentRequest = new ObjectMapper().readValue(json, AssignmentRequest.class);
} catch (JsonProcessingException jpe) {
String errorMessage = "Could not process/map the json to an \"assignmentRequest\"!\n" + jpe.getMessage();
logger.error(errorMessage);
System.err.println(errorMessage);
jpe.printStackTrace();
try { // Here, the HTTP-request is executed.
assignmentRequest = restTemplate.getForObject(requestUrl, AssignmentRequest.class);
} catch (RestClientException rce) {
logger.error("Could not retrieve the assignments!\n" + rce.getMessage()); // It shows the response body (after Spring v.2.5.6).
return null;
}
@ -107,10 +58,8 @@ public class AssignmentHandler {
public static void handleAssignments()
{
AssignmentRequest assignmentRequest = requestAssignments();
if ( assignmentRequest == null ) {
logger.error("The \"assignmentRequest\" was \"null\"!");
if ( assignmentRequest == null )
return;
}
Long assignmentRequestCounter = assignmentRequest.getAssignmentCounter();
List<Assignment> assignments = assignmentRequest.getAssignments();
@ -130,16 +79,18 @@ public class AssignmentHandler {
// Start handling the assignments, the worker is busy.
isAvailableForWork = false;
// Iterate over the tasks and add each task in it's own list depending on the DATASOURCE in order to decide which plugin to use later.
Multimap<String, Assignment> assignmentsForPlugins = HashMultimap.create();
// Iterate over the tasks and add each task in its own list depending on the DATASOURCE in order to decide which plugin to use later.
for ( Assignment assignment : assignments ) {
// Add each task in its own HashSet.
assignmentsForPlugins.put(assignment.getDatasource().getId(), assignment);
try {
assignmentsForPlugins.put(assignment.getDatasource().getId(), assignment);
} catch (NullPointerException npe) {
logger.warn("An NPE was thrown when splitting the assignments based on the datasource-types. The assignment was: " + assignment); // Do not use "assignment.toString()", it may cause an NPE.
}
}
urlReports = new ArrayList<>(assignments.size()); // Define the new UrlReport-list. It is reinitialized each time.
//countDatasourcesAndRecords(assignmentsSize); // Only for DEBUG! Keep it commented in normal run.
// TODO - Decide which tasks run with what plugin (depending on their datasource).
@ -155,8 +106,6 @@ public class AssignmentHandler {
logger.error(e.getMessage(), e);
}
isAvailableForWork = true; // State this before posting, to catch the soonest next scheduled request.
if ( askForTest ) {
logger.debug("UrlReports:"); // DEBUG!
for ( UrlReport urlReport : urlReports )
@ -165,6 +114,8 @@ public class AssignmentHandler {
else
postWorkerReport(assignmentRequestCounter);
isAvailableForWork = true; // State this after posting, to avoid breaking the "UrlReports" in the current or the next run.
// Note: Cannot call this method here retrospectively, as if it runs 100s of times, the memory-stack may break..
// The scheduler will handle calling it every half an hour, in case the Worker is available for work..
}
@ -176,20 +127,34 @@ public class AssignmentHandler {
logger.info("Going to post the WorkerReport of assignment_" + assignmentRequestCounter + " to the controller-server: " + postUrl);
try {
ResponseEntity<String> responseEntity = new RestTemplateBuilder().build().postForEntity(postUrl, new WorkerReport(UrlsWorkerApplication.workerId, assignmentRequestCounter, urlReports), String.class);
ResponseEntity<String> responseEntity = restTemplate.postForEntity(postUrl, new WorkerReport(UrlsWorkerApplication.workerId, assignmentRequestCounter, urlReports), String.class);
int responseCode = responseEntity.getStatusCodeValue();
if ( responseCode != HttpStatus.OK.value() ) {
logger.error("Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode);
logger.error("HTTP-Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode);
return false;
}
} catch (Exception e) {
logger.error("Error when submitting the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller: ", e);
e.printStackTrace();
return false;
} finally {
AssignmentHandler.urlReports.clear(); // Reset the UrlReports list for the next assignments-handling.
urlReports.clear(); // Reset, without de-allocating.
assignmentsForPlugins.clear();
}
return true;
}
public static void countDatasourcesAndRecords(int assignmentsSize)
{
Set<String> datasources = assignmentsForPlugins.keySet();
int numDatasources = datasources.size();
logger.debug("Num of datasources: " + numDatasources);
for ( String datasource : datasources ) {
logger.debug("Num of records for datasource \"" + datasource + "\" is: " + assignmentsForPlugins.get(datasource).size() );
}
logger.debug("Average num of records per datasource: " + (assignmentsSize / numDatasources));
}
}

View File

@ -2,6 +2,6 @@ package eu.openaire.urls_worker.util;
public interface WorkerConstants {
int ASSIGNMENTS_LIMIT = 10000;
int ASSIGNMENTS_LIMIT = 10_000;
}