Code polishing.

This commit is contained in:
Lampros Smyrnaios 2023-05-29 12:21:48 +03:00
parent 03bf4294b8
commit a38d6ace79
5 changed files with 17 additions and 14 deletions

View File

@ -1,5 +1,6 @@
package eu.openaire.urls_controller;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.services.BulkImportServiceImpl;
import eu.openaire.urls_controller.services.UrlsServiceImpl;
import eu.openaire.urls_controller.util.FileUtils;
@ -70,7 +71,7 @@ public class Application {
shutdownThreads(UrlsServiceImpl.insertsExecutor);
shutdownThreads(FileUtils.hashMatchingExecutor);
shutdownThreads(BulkImportServiceImpl.backgroundExecutor);
shutdownThreads(UrlsController.backgroundExecutor);
shutdownThreads(BulkImportServiceImpl.bulkImportExecutor);
logger.info("Exiting..");

View File

@ -3,7 +3,6 @@ package eu.openaire.urls_controller.components;
import eu.openaire.urls_controller.Application;
import eu.openaire.urls_controller.controllers.ShutdownController;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.services.BulkImportServiceImpl;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import org.slf4j.Logger;
@ -48,7 +47,7 @@ public class ScheduledTasks {
//@Scheduled(initialDelay = 20_000, fixedDelay = 20_000) // Just for testing (every 20 secs).
public void executeBackgroundTasks()
{
List<Callable<Boolean>> tempList = new ArrayList<>(BulkImportServiceImpl.backgroundCallableTasks); // Copy the list in order to know what was executed.
List<Callable<Boolean>> tempList = new ArrayList<>(UrlsController.backgroundCallableTasks); // Copy the list in order to know what was executed.
// So the items added while this execution happens, will be remain in the global-list, while the other will have already be deleted.
int numOfTasks = tempList.size(); // Since the temp-list is a deep-copy and not a reference, new tasks that are added will not be executed.
if ( numOfTasks == 0 )
@ -56,13 +55,13 @@ public class ScheduledTasks {
// Immediately delete the selected tasks form the global list, so that if these tasks are not finished before the scheduler runs again, they will not be re-executed.
for ( Callable<Boolean> selectedTask : tempList ) {
BulkImportServiceImpl.backgroundCallableTasks.remove(selectedTask);
UrlsController.backgroundCallableTasks.remove(selectedTask);
}
logger.debug(numOfTasks + " background tasks were found inside the \"backgroundCallableTasks\" list and are about to be executed.");
// Execute the tasks and wait for them to finish.
try {
List<Future<Boolean>> futures = BulkImportServiceImpl.backgroundExecutor.invokeAll(tempList);
List<Future<Boolean>> futures = UrlsController.backgroundExecutor.invokeAll(tempList);
int sizeOfFutures = futures.size();
for ( int i = 0; i < sizeOfFutures; ++i ) {
try {

View File

@ -3,7 +3,6 @@ package eu.openaire.urls_controller.controllers;
import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.models.BulkImportReport;
import eu.openaire.urls_controller.services.BulkImportService;
import eu.openaire.urls_controller.services.BulkImportServiceImpl;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import org.slf4j.Logger;
@ -172,7 +171,7 @@ public class BulkImportController {
// Add this to a background job, since it will take a lot of time to be completed, and the caller will get a "read-timeout" at least and a socket-timeout at most (in case of a network failure during those hours).
String finalBulkImportDir = bulkImportDir;
String finalRelativeBulkImportDir = relativeBulkImportDir;
BulkImportServiceImpl.backgroundCallableTasks.add(() ->
UrlsController.backgroundCallableTasks.add(() ->
bulkImportService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish)
);

View File

@ -3,7 +3,6 @@ package eu.openaire.urls_controller.controllers;
import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.services.BulkImportServiceImpl;
import eu.openaire.urls_controller.services.UrlsService;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
@ -19,8 +18,13 @@ import javax.servlet.http.HttpServletRequest;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;
@ -46,6 +50,11 @@ public class UrlsController {
public static final ConcurrentHashMap<String, WorkerInfo> workersInfoMap = new ConcurrentHashMap<>(6);
public static final ExecutorService backgroundExecutor = Executors.newFixedThreadPool(4); // At most 4 threads will be used.
public static final List<Callable<Boolean>> backgroundCallableTasks = Collections.synchronizedList(new ArrayList<>());
private final String workerReportsDirPath;
@ -171,7 +180,7 @@ public class UrlsController {
// The above method will overwrite a possibly existing file. So in case of a crash, it's better to back up the reports before starting the Controller again (as the assignments-counter will start over, from 0).
int finalSizeOUrlReports = sizeOUrlReports;
BulkImportServiceImpl.backgroundCallableTasks.add(() ->
UrlsController.backgroundCallableTasks.add(() ->
urlsService.addWorkerReport(curWorkerId, curReportAssignmentsCounter, urlReports, finalSizeOUrlReports)
);

View File

@ -27,7 +27,6 @@ import java.nio.file.Paths;
import java.security.MessageDigest;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.*;
@ -51,10 +50,6 @@ public class BulkImportServiceImpl implements BulkImportService {
@Autowired
private JdbcTemplate jdbcTemplate;
public static final ExecutorService backgroundExecutor = Executors.newFixedThreadPool(4); // At most 4 threads will be used.
public static final List<Callable<Boolean>> backgroundCallableTasks = Collections.synchronizedList(new ArrayList<>());
private static final int numOfBulkImportThreads = 4;
public static final ExecutorService bulkImportExecutor = Executors.newFixedThreadPool(numOfBulkImportThreads); // At most 4 threads will be used.