- Add check for remaining "bulkImportDirsUnderProcessing", before shutting down the Service.

- Code polishing.
This commit is contained in:
Lampros Smyrnaios 2023-10-05 13:43:47 +03:00
parent 96c11ba4b8
commit b2ce6393c1
1 changed files with 11 additions and 6 deletions

View File

@ -4,6 +4,7 @@ import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import eu.openaire.urls_controller.Application;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.controllers.BulkImportController;
import eu.openaire.urls_controller.controllers.ShutdownController;
import eu.openaire.urls_controller.controllers.StatsController;
import eu.openaire.urls_controller.controllers.UrlsController;
@ -69,7 +70,6 @@ public class ScheduledTasks {
{
if ( !workerReportsDirPath.endsWith("/") )
workerReportsDirPath += "/";
this.workerReportsDirPath = workerReportsDirPath; // This dir will be created later.
this.statsController = statsController;
this.urlsController = urlsController;
@ -89,9 +89,9 @@ public class ScheduledTasks {
// The initial delay is larger, because we have to wait some time for at least one worker to finish retrieving the full-texts from thousands of publications, whereas, later we will have a lot of workerReports waiting to be processed.
public void executeBackgroundTasks()
{
List<Callable<Boolean>> tempList = new ArrayList<>(UrlsController.backgroundCallableTasks); // Copy the list in order to know what was executed.
List<Callable<Boolean>> tempList = new ArrayList<>(UrlsController.backgroundCallableTasks); // Deep-copy the list in order to know what was executed.
// So the items added while this execution happens, will remain in the global-list, while the others will have already been deleted.
// Also, the "Executor.invokeAll()" requires an "unchanged" list, otherwise there will be "undefined results".
// Also, the "Executor.invokeAll()" requires an "unchanged" list, hence the "tempList", otherwise there will be "undefined results".
int numOfTasks = tempList.size(); // Since the temp-list is a deep-copy and not a reference, new tasks that are added will not be executed.
if ( numOfTasks == 0 )
return;
@ -105,15 +105,15 @@ public class ScheduledTasks {
try {
List<Future<Boolean>> futures = UrlsController.backgroundExecutor.invokeAll(tempList);
int sizeOfFutures = futures.size();
for ( int i = 0; i < sizeOfFutures; ++i ) {
for ( int i=0; i < sizeOfFutures; ++i ) {
try {
Boolean value = futures.get(i).get(); // Get and see if an exception is thrown..
// Add check for the value, if wanted.. (we don't care at the moment)
} catch (ExecutionException ee) {
String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP).
logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + GenericUtils.endOfLine + stackTraceMessage);
logger.error("Task_" + i + " failed with: " + ee.getMessage() + GenericUtils.endOfLine + stackTraceMessage);
} catch (CancellationException ce) {
logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage());
logger.error("Task_" + i + " was cancelled: " + ce.getMessage());
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage());
}
@ -135,6 +135,11 @@ public class ScheduledTasks {
if ( UrlsController.backgroundCallableTasks.size() > 0 )
return;
// Check whether there are any active bulk-import procedures.
if ( BulkImportController.bulkImportDirsUnderProcessing.size() > 0 )
return;
// Check whether the workers have not shutdown yet, which means that they either crawl assignments or/and they are waiting for the Controller to process the WorkerReport and then shutdown.
Set<String> workerIds = UrlsController.workersInfoMap.keySet();
if ( workerIds.size() > 0 ) {
for ( String workerId : workerIds )