- Increase the "numOfBackgroundThreads" to 8.

- Make the "numOfBackgroundThreads" and "numOfThreadsPerBulkImportProcedure" configurable from the "application.yml" file.
- Code polishing.
This commit is contained in:
Lampros Smyrnaios 2023-07-21 11:45:50 +03:00
parent fd1cf56863
commit cec2531737
6 changed files with 39 additions and 13 deletions

View File

@ -14,6 +14,8 @@ public class BulkImport {
private String bulkImportReportLocation; private String bulkImportReportLocation;
private int numOfThreadsPerBulkImportProcedure;
private Map<String, BulkImportSource> bulkImportSources; private Map<String, BulkImportSource> bulkImportSources;
public BulkImport() { public BulkImport() {
@ -35,6 +37,14 @@ public class BulkImport {
this.bulkImportReportLocation = bulkImportReportLocation; this.bulkImportReportLocation = bulkImportReportLocation;
} }
/**
 * Returns the number of threads to be used per bulk-import procedure.
 *
 * @return the current value of the {@code numOfThreadsPerBulkImportProcedure} field
 */
public int getNumOfThreadsPerBulkImportProcedure() {
    return this.numOfThreadsPerBulkImportProcedure;
}
/**
 * Sets the number of threads to be used per bulk-import procedure.
 * <p>
 * Non-positive values are rejected here, since this value is later used as the size of a fixed thread-pool,
 * which cannot be created with zero or negative size. Failing at this point names the offending property.
 *
 * @param numOfThreadsPerBulkImportProcedure the number of threads; must be greater than zero
 * @throws IllegalArgumentException if the given value is zero or negative
 */
public void setNumOfThreadsPerBulkImportProcedure(int numOfThreadsPerBulkImportProcedure) {
    if ( numOfThreadsPerBulkImportProcedure <= 0 ) {
        throw new IllegalArgumentException("The \"numOfThreadsPerBulkImportProcedure\" must be a positive number, but it was: " + numOfThreadsPerBulkImportProcedure);
    }
    this.numOfThreadsPerBulkImportProcedure = numOfThreadsPerBulkImportProcedure;
}
public Map<String, BulkImportSource> getBulkImportSources() { public Map<String, BulkImportSource> getBulkImportSources() {
return bulkImportSources; return bulkImportSources;
} }
@ -48,6 +58,7 @@ public class BulkImport {
return "BulkImport{" + return "BulkImport{" +
"baseBulkImportLocation='" + baseBulkImportLocation + '\'' + "baseBulkImportLocation='" + baseBulkImportLocation + '\'' +
", bulkImportReportLocation='" + bulkImportReportLocation + '\'' + ", bulkImportReportLocation='" + bulkImportReportLocation + '\'' +
", numOfThreadsPerBulkImportProcedure=" + numOfThreadsPerBulkImportProcedure +
", bulkImportSources=" + bulkImportSources + ", bulkImportSources=" + bulkImportSources +
'}'; '}';
} }

View File

@ -23,6 +23,8 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -45,6 +47,8 @@ public class BulkImportController {
public static final Set<String> bulkImportDirs = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); public static final Set<String> bulkImportDirs = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
public static int numOfThreadsPerBulkImportProcedure;
public static ExecutorService bulkImportExecutor;
public BulkImportController(BulkImportService bulkImportService, BulkImport bulkImport) public BulkImportController(BulkImportService bulkImportService, BulkImport bulkImport)
@ -60,6 +64,10 @@ public class BulkImportController {
this.bulkImportReportLocation = bulkImportReportLocation1; this.bulkImportReportLocation = bulkImportReportLocation1;
this.bulkImportService = bulkImportService; this.bulkImportService = bulkImportService;
numOfThreadsPerBulkImportProcedure = bulkImport.getNumOfThreadsPerBulkImportProcedure();
logger.info("Will use " + numOfThreadsPerBulkImportProcedure + " threads per bulk-import procedure.");
bulkImportExecutor = Executors.newFixedThreadPool(numOfThreadsPerBulkImportProcedure); // At most < numOfThreadsPerBulkImportProcedure > threads will be used per bulk-import procedure..
} }

View File

@ -48,19 +48,23 @@ public class UrlsController {
public static final ConcurrentHashMap<String, WorkerInfo> workersInfoMap = new ConcurrentHashMap<>(6); public static final ConcurrentHashMap<String, WorkerInfo> workersInfoMap = new ConcurrentHashMap<>(6);
public static final ExecutorService backgroundExecutor = Executors.newFixedThreadPool(4); // At most 4 threads will be used. public static ExecutorService backgroundExecutor;
public static final List<Callable<Boolean>> backgroundCallableTasks = Collections.synchronizedList(new ArrayList<>()); public static final List<Callable<Boolean>> backgroundCallableTasks = Collections.synchronizedList(new ArrayList<>());
private final String workerReportsDirPath; private final String workerReportsDirPath;
public UrlsController(@Value("${services.pdfaggregation.controller.workerReportsDirPath}") String workerReportsDirPath) public UrlsController(@Value("${services.pdfaggregation.controller.workerReportsDirPath}") String workerReportsDirPath,
@Value("${services.pdfaggregation.controller.numOfBackgroundThreads}") int numOfBackgroundThreads)
{ {
if ( !workerReportsDirPath.endsWith("/") ) if ( !workerReportsDirPath.endsWith("/") )
workerReportsDirPath += "/"; workerReportsDirPath += "/";
this.workerReportsDirPath = workerReportsDirPath; // This dir will be created later. this.workerReportsDirPath = workerReportsDirPath; // This dir will be created later.
logger.info("Will use " + numOfBackgroundThreads + " threads for background tasks, such as processing worker-reports or bulk-import procedures.");
backgroundExecutor = Executors.newFixedThreadPool(numOfBackgroundThreads); // At most < numOfBackgroundThreads > tasks will be running in parallel.
} }
@ -74,7 +78,7 @@ public class UrlsController {
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg); return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
} }
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + assignmentLimit); logger.info("Worker with id: \"" + workerId + "\", requested up to " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + assignmentLimit);
// Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server. // Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server.
int assignmentsLimit = workerAssignmentsLimit; int assignmentsLimit = workerAssignmentsLimit;

View File

@ -29,7 +29,10 @@ import java.sql.Types;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.concurrent.*; import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -51,9 +54,6 @@ public class BulkImportServiceImpl implements BulkImportService {
private JdbcTemplate jdbcTemplate; private JdbcTemplate jdbcTemplate;
private static final int numOfBulkImportThreads = 4;
public static final ExecutorService bulkImportExecutor = Executors.newFixedThreadPool(numOfBulkImportThreads); // At most 4 threads will be used.
/** /**
* Given a directory with full-text-files, this method imports the full-texts files in the PDF Aggregation Service. * Given a directory with full-text-files, this method imports the full-texts files in the PDF Aggregation Service.
@ -124,7 +124,7 @@ public class BulkImportServiceImpl implements BulkImportService {
long timeMillis = System.currentTimeMillis(); // Store it here, in order to have the same for all current records. long timeMillis = System.currentTimeMillis(); // Store it here, in order to have the same for all current records.
List<Callable<Integer>> callables = new ArrayList<>(numOfFiles); List<Callable<Integer>> callables = new ArrayList<>(numOfFiles);
List<List<String>> subLists = Lists.partition(fileLocations, numOfBulkImportThreads); // Divide the initial list to "numOfBulkImportThreads" subLists. The last one may have marginally fewer files. List<List<String>> subLists = Lists.partition(fileLocations, BulkImportController.numOfThreadsPerBulkImportProcedure); // Divide the initial list to "numOfBulkImportThreads" subLists. The last one may have marginally fewer files.
int subListsSize = subLists.size(); int subListsSize = subLists.size();
bulkImportReport.addEvent("Going to import the files in " + subListsSize + " segments, in parallel."); bulkImportReport.addEvent("Going to import the files in " + subListsSize + " segments, in parallel.");
@ -140,7 +140,7 @@ public class BulkImportServiceImpl implements BulkImportService {
int numFailedSegments = 0; int numFailedSegments = 0;
int numFailedFiles = 0; int numFailedFiles = 0;
try { try {
List<Future<Integer>> futures = bulkImportExecutor.invokeAll(callables); // This waits for all tasks to finish. List<Future<Integer>> futures = BulkImportController.bulkImportExecutor.invokeAll(callables); // This waits for all tasks to finish.
int sizeOfFutures = futures.size(); int sizeOfFutures = futures.size();
for ( int i = 0; i < sizeOfFutures; ++i ) { for ( int i = 0; i < sizeOfFutures; ++i ) {
try { try {

View File

@ -264,10 +264,8 @@ public class UrlsServiceImpl implements UrlsService {
List<Assignment> distinctAssignments = new ArrayList<>(uniquePairsAndAssignments.values()); List<Assignment> distinctAssignments = new ArrayList<>(uniquePairsAndAssignments.values());
int distinctAssignmentsSize = distinctAssignments.size(); int distinctAssignmentsSize = distinctAssignments.size();
if ( logger.isTraceEnabled() ) { if ( logger.isTraceEnabled() )
logger.trace("numDuplicates in returned assignments = " + (assignmentsSize - distinctAssignmentsSize)); logger.trace("numDuplicates in returned assignments_" + curAssignmentsBatchCounter + " = " + (assignmentsSize - distinctAssignmentsSize));
logger.trace("Size of \"distinctAssignments\": " + distinctAssignments.size());
}
logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + distinctAssignmentsSize + " assignments to worker with ID: " + workerId + "."); logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + distinctAssignmentsSize + " assignments to worker with ID: " + workerId + ".");
return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, distinctAssignments)); return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, distinctAssignments));

View File

@ -19,6 +19,10 @@ services:
assignmentLimit: 10000 assignmentLimit: 10000
maxAttemptsPerRecord: 3 maxAttemptsPerRecord: 3
numOfBackgroundThreads: 8
# This refers to the number of threads running in the background and processing workerReports and bulkImport procedures.
baseFilesLocation: /tmp/ baseFilesLocation: /tmp/
workerReportsDirPath: /reports/workerReports/ workerReportsDirPath: /reports/workerReports/
parquetLocalDirectoryPath: ${services.pdfaggregation.controller.baseFilesLocation}parquetFiles/ parquetLocalDirectoryPath: ${services.pdfaggregation.controller.baseFilesLocation}parquetFiles/
@ -35,6 +39,7 @@ services:
bulk-import: bulk-import:
baseBulkImportLocation: /mnt/bulk_import/ baseBulkImportLocation: /mnt/bulk_import/
bulkImportReportLocation: /reports/bulkImportReports/ bulkImportReportLocation: /reports/bulkImportReports/
numOfThreadsPerBulkImportProcedure: 4
bulkImportSources: # These sources are accepted for bulk-import requests and are excluded from crawling. bulkImportSources: # These sources are accepted for bulk-import requests and are excluded from crawling.
arxivImport: arxivImport:
datasourceID: opendoar____::6f4922f45568161a8cdf4ad2299f6d23 datasourceID: opendoar____::6f4922f45568161a8cdf4ad2299f6d23