From cec2531737d295e952aa628261dadd0fad7d7185 Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Fri, 21 Jul 2023 11:45:50 +0300 Subject: [PATCH] - Increase the "numOfBackgroundThreads" to 8. - Make the "numOfBackgroundThreads" and "numOfThreadsPerBulkImportProcedure" configurable from the "application.yml" file. - Code polishing. --- .../urls_controller/components/BulkImport.java | 11 +++++++++++ .../controllers/BulkImportController.java | 8 ++++++++ .../urls_controller/controllers/UrlsController.java | 10 +++++++--- .../services/BulkImportServiceImpl.java | 12 ++++++------ .../urls_controller/services/UrlsServiceImpl.java | 6 ++---- src/main/resources/application.yml | 5 +++++ 6 files changed, 39 insertions(+), 13 deletions(-) diff --git a/src/main/java/eu/openaire/urls_controller/components/BulkImport.java b/src/main/java/eu/openaire/urls_controller/components/BulkImport.java index 1d6e5ea..fec6c09 100644 --- a/src/main/java/eu/openaire/urls_controller/components/BulkImport.java +++ b/src/main/java/eu/openaire/urls_controller/components/BulkImport.java @@ -14,6 +14,8 @@ public class BulkImport { private String bulkImportReportLocation; + private int numOfThreadsPerBulkImportProcedure; + private Map bulkImportSources; public BulkImport() { @@ -35,6 +37,14 @@ public class BulkImport { this.bulkImportReportLocation = bulkImportReportLocation; } + public int getNumOfThreadsPerBulkImportProcedure() { + return numOfThreadsPerBulkImportProcedure; + } + + public void setNumOfThreadsPerBulkImportProcedure(int numOfThreadsPerBulkImportProcedure) { + this.numOfThreadsPerBulkImportProcedure = numOfThreadsPerBulkImportProcedure; + } + public Map getBulkImportSources() { return bulkImportSources; } @@ -48,6 +58,7 @@ public class BulkImport { return "BulkImport{" + "baseBulkImportLocation='" + baseBulkImportLocation + '\'' + ", bulkImportReportLocation='" + bulkImportReportLocation + '\'' + + ", numOfThreadsPerBulkImportProcedure=" + numOfThreadsPerBulkImportProcedure + ", bulkImportSources=" + bulkImportSources + '}'; } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java index 8e63cbf..75b9648 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java @@ -23,6 +23,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -45,6 +47,8 @@ public class BulkImportController { public static final Set bulkImportDirs = Collections.newSetFromMap(new ConcurrentHashMap()); + public static int numOfThreadsPerBulkImportProcedure; + public static ExecutorService bulkImportExecutor; public BulkImportController(BulkImportService bulkImportService, BulkImport bulkImport) @@ -60,6 +64,10 @@ public class BulkImportController { this.bulkImportReportLocation = bulkImportReportLocation1; this.bulkImportService = bulkImportService; + + numOfThreadsPerBulkImportProcedure = bulkImport.getNumOfThreadsPerBulkImportProcedure(); + logger.info("Will use " + numOfThreadsPerBulkImportProcedure + " threads per bulk-import procedure."); + bulkImportExecutor = Executors.newFixedThreadPool(numOfThreadsPerBulkImportProcedure); // At most < numOfThreadsPerBulkImportProcedure > threads will be used per bulk-import procedure.. } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java index 1f081a6..dceb77e 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java @@ -48,19 +48,23 @@ public class UrlsController { public static final ConcurrentHashMap workersInfoMap = new ConcurrentHashMap<>(6); - public static final ExecutorService backgroundExecutor = Executors.newFixedThreadPool(4); // At most 4 threads will be used. + public static ExecutorService backgroundExecutor; public static final List> backgroundCallableTasks = Collections.synchronizedList(new ArrayList<>()); private final String workerReportsDirPath; - public UrlsController(@Value("${services.pdfaggregation.controller.workerReportsDirPath}") String workerReportsDirPath) + public UrlsController(@Value("${services.pdfaggregation.controller.workerReportsDirPath}") String workerReportsDirPath, + @Value("${services.pdfaggregation.controller.numOfBackgroundThreads}") int numOfBackgroundThreads) { if ( !workerReportsDirPath.endsWith("/") ) workerReportsDirPath += "/"; this.workerReportsDirPath = workerReportsDirPath; // This dir will be created later. + + logger.info("Will use " + numOfBackgroundThreads + " threads for background tasks, such as processing worker-reports or bulk-import procedures."); + backgroundExecutor = Executors.newFixedThreadPool(numOfBackgroundThreads); // At most < numOfBackgroundThreads > tasks will be running in parallel. } @@ -74,7 +78,7 @@ public class UrlsController { return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg); } - logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + assignmentLimit); + logger.info("Worker with id: \"" + workerId + "\", requested up to " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + assignmentLimit); // Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server. int assignmentsLimit = workerAssignmentsLimit; diff --git a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java index 17cfdec..16f115b 100644 --- a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java @@ -29,7 +29,10 @@ import java.sql.Types; import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -51,9 +54,6 @@ public class BulkImportServiceImpl implements BulkImportService { private JdbcTemplate jdbcTemplate; - private static final int numOfBulkImportThreads = 4; - public static final ExecutorService bulkImportExecutor = Executors.newFixedThreadPool(numOfBulkImportThreads); // At most 4 threads will be used. - /** * Given a directory with full-text-files, this method imports the full-texts files in the PDF Aggregation Service. @@ -124,7 +124,7 @@ public class BulkImportServiceImpl implements BulkImportService { long timeMillis = System.currentTimeMillis(); // Store it here, in order to have the same for all current records. List> callables = new ArrayList<>(numOfFiles); - List> subLists = Lists.partition(fileLocations, numOfBulkImportThreads); // Divide the initial list to "numOfBulkImportThreads" subLists. The last one may have marginally fewer files. + List> subLists = Lists.partition(fileLocations, BulkImportController.numOfThreadsPerBulkImportProcedure); // Divide the initial list to "numOfBulkImportThreads" subLists. The last one may have marginally fewer files. int subListsSize = subLists.size(); bulkImportReport.addEvent("Going to import the files in " + subListsSize + " segments, in parallel."); @@ -140,7 +140,7 @@ public class BulkImportServiceImpl implements BulkImportService { int numFailedSegments = 0; int numFailedFiles = 0; try { - List> futures = bulkImportExecutor.invokeAll(callables); // This waits for all tasks to finish. + List> futures = BulkImportController.bulkImportExecutor.invokeAll(callables); // This waits for all tasks to finish. int sizeOfFutures = futures.size(); for ( int i = 0; i < sizeOfFutures; ++i ) { try { diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java index ab6670f..441bbf4 100644 --- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java @@ -264,10 +264,8 @@ public class UrlsServiceImpl implements UrlsService { List distinctAssignments = new ArrayList<>(uniquePairsAndAssignments.values()); int distinctAssignmentsSize = distinctAssignments.size(); - if ( logger.isTraceEnabled() ) { - logger.trace("numDuplicates in returned assignments = " + (assignmentsSize - distinctAssignmentsSize)); - logger.trace("Size of \"distinctAssignments\": " + distinctAssignments.size()); - } + if ( logger.isTraceEnabled() ) + logger.trace("numDuplicates in returned assignments_" + curAssignmentsBatchCounter + " = " + (assignmentsSize - distinctAssignmentsSize)); logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + distinctAssignmentsSize + " assignments to worker with ID: " + workerId + "."); return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, distinctAssignments)); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 4279d4a..2855f0d 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -19,6 +19,10 @@ services: assignmentLimit: 10000 maxAttemptsPerRecord: 3 + + numOfBackgroundThreads: 8 + # This refers to the number of threads running in the background and processing workerReports and bulkImport procedures. + baseFilesLocation: /tmp/ workerReportsDirPath: /reports/workerReports/ parquetLocalDirectoryPath: ${services.pdfaggregation.controller.baseFilesLocation}parquetFiles/ @@ -35,6 +39,7 @@ services: bulk-import: baseBulkImportLocation: /mnt/bulk_import/ bulkImportReportLocation: /reports/bulkImportReports/ + numOfThreadsPerBulkImportProcedure: 4 bulkImportSources: # These sources are accepted for bulk-import requests and are excluded from crawling. arxivImport: datasourceID: opendoar____::6f4922f45568161a8cdf4ad2299f6d23