- Show the workerIPs in the logs.

- Rename the "FullTexts"-files to "BulkImport".
This commit is contained in:
Lampros Smyrnaios 2023-05-29 12:12:08 +03:00
parent 3988eb3a48
commit 74ff31fc64
6 changed files with 32 additions and 32 deletions

View File

@ -1,6 +1,6 @@
package eu.openaire.urls_controller; package eu.openaire.urls_controller;
import eu.openaire.urls_controller.services.FullTextsServiceImpl; import eu.openaire.urls_controller.services.BulkImportServiceImpl;
import eu.openaire.urls_controller.services.UrlsServiceImpl; import eu.openaire.urls_controller.services.UrlsServiceImpl;
import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.UriBuilder; import eu.openaire.urls_controller.util.UriBuilder;
@ -70,8 +70,8 @@ public class Application {
shutdownThreads(UrlsServiceImpl.insertsExecutor); shutdownThreads(UrlsServiceImpl.insertsExecutor);
shutdownThreads(FileUtils.hashMatchingExecutor); shutdownThreads(FileUtils.hashMatchingExecutor);
shutdownThreads(FullTextsServiceImpl.backgroundExecutor); shutdownThreads(BulkImportServiceImpl.backgroundExecutor);
shutdownThreads(FullTextsServiceImpl.bulkImportExecutor); shutdownThreads(BulkImportServiceImpl.bulkImportExecutor);
logger.info("Exiting.."); logger.info("Exiting..");
} }

View File

@ -3,7 +3,7 @@ package eu.openaire.urls_controller.components;
import eu.openaire.urls_controller.Application; import eu.openaire.urls_controller.Application;
import eu.openaire.urls_controller.controllers.ShutdownController; import eu.openaire.urls_controller.controllers.ShutdownController;
import eu.openaire.urls_controller.controllers.UrlsController; import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.services.FullTextsServiceImpl; import eu.openaire.urls_controller.services.BulkImportServiceImpl;
import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils; import eu.openaire.urls_controller.util.GenericUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -48,7 +48,7 @@ public class ScheduledTasks {
//@Scheduled(initialDelay = 20_000, fixedDelay = 20_000) // Just for testing (every 20 secs). //@Scheduled(initialDelay = 20_000, fixedDelay = 20_000) // Just for testing (every 20 secs).
public void executeBackgroundTasks() public void executeBackgroundTasks()
{ {
List<Callable<Boolean>> tempList = new ArrayList<>(FullTextsServiceImpl.backgroundCallableTasks); // Copy the list in order to know what was executed. List<Callable<Boolean>> tempList = new ArrayList<>(BulkImportServiceImpl.backgroundCallableTasks); // Copy the list in order to know what was executed.
// So the items added while this execution happens, will be remain in the global-list, while the other will have already be deleted. // So the items added while this execution happens, will be remain in the global-list, while the other will have already be deleted.
int numOfTasks = tempList.size(); // Since the temp-list is a deep-copy and not a reference, new tasks that are added will not be executed. int numOfTasks = tempList.size(); // Since the temp-list is a deep-copy and not a reference, new tasks that are added will not be executed.
if ( numOfTasks == 0 ) if ( numOfTasks == 0 )
@ -56,13 +56,13 @@ public class ScheduledTasks {
// Immediately delete the selected tasks form the global list, so that if these tasks are not finished before the scheduler runs again, they will not be re-executed. // Immediately delete the selected tasks form the global list, so that if these tasks are not finished before the scheduler runs again, they will not be re-executed.
for ( Callable<Boolean> selectedTask : tempList ) { for ( Callable<Boolean> selectedTask : tempList ) {
FullTextsServiceImpl.backgroundCallableTasks.remove(selectedTask); BulkImportServiceImpl.backgroundCallableTasks.remove(selectedTask);
} }
logger.debug(numOfTasks + " background tasks were found inside the \"backgroundCallableTasks\" list and are about to be executed."); logger.debug(numOfTasks + " background tasks were found inside the \"backgroundCallableTasks\" list and are about to be executed.");
// Execute the tasks and wait for them to finish. // Execute the tasks and wait for them to finish.
try { try {
List<Future<Boolean>> futures = FullTextsServiceImpl.backgroundExecutor.invokeAll(tempList); List<Future<Boolean>> futures = BulkImportServiceImpl.backgroundExecutor.invokeAll(tempList);
int sizeOfFutures = futures.size(); int sizeOfFutures = futures.size();
for ( int i = 0; i < sizeOfFutures; ++i ) { for ( int i = 0; i < sizeOfFutures; ++i ) {
try { try {

View File

@ -2,8 +2,8 @@ package eu.openaire.urls_controller.controllers;
import eu.openaire.urls_controller.components.BulkImport; import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.models.BulkImportReport; import eu.openaire.urls_controller.models.BulkImportReport;
import eu.openaire.urls_controller.services.FullTextsService; import eu.openaire.urls_controller.services.BulkImportService;
import eu.openaire.urls_controller.services.FullTextsServiceImpl; import eu.openaire.urls_controller.services.BulkImportServiceImpl;
import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils; import eu.openaire.urls_controller.util.GenericUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -29,14 +29,14 @@ import java.util.regex.Pattern;
@RestController @RestController
@RequestMapping("") @RequestMapping("")
public class FullTextsController { public class BulkImportController {
private static final Logger logger = LoggerFactory.getLogger(FullTextsController.class); private static final Logger logger = LoggerFactory.getLogger(BulkImportController.class);
@Autowired @Autowired
private FileUtils fileUtils; private FileUtils fileUtils;
private final FullTextsService fullTextsService; private final BulkImportService bulkImportService;
private final String baseBulkImportLocation; private final String baseBulkImportLocation;
@ -48,7 +48,7 @@ public class FullTextsController {
public FullTextsController(FullTextsService fullTextsService, BulkImport bulkImport) public BulkImportController(BulkImportService bulkImportService, BulkImport bulkImport)
{ {
String bulkImportReportLocation1; String bulkImportReportLocation1;
this.baseBulkImportLocation = bulkImport.getBaseBulkImportLocation(); this.baseBulkImportLocation = bulkImport.getBaseBulkImportLocation();
@ -60,7 +60,7 @@ public class FullTextsController {
bulkImportReportLocation1 += "/"; bulkImportReportLocation1 += "/";
this.bulkImportReportLocation = bulkImportReportLocation1; this.bulkImportReportLocation = bulkImportReportLocation1;
this.fullTextsService = fullTextsService; this.bulkImportService = bulkImportService;
} }
@ -172,8 +172,8 @@ public class FullTextsController {
// Add this to a background job, since it will take a lot of time to be completed, and the caller will get a "read-timeout" at least and a socket-timeout at most (in case of a network failure during those hours). // Add this to a background job, since it will take a lot of time to be completed, and the caller will get a "read-timeout" at least and a socket-timeout at most (in case of a network failure during those hours).
String finalBulkImportDir = bulkImportDir; String finalBulkImportDir = bulkImportDir;
String finalRelativeBulkImportDir = relativeBulkImportDir; String finalRelativeBulkImportDir = relativeBulkImportDir;
FullTextsServiceImpl.backgroundCallableTasks.add(() -> BulkImportServiceImpl.backgroundCallableTasks.add(() ->
fullTextsService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish) bulkImportService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish)
); );
return ResponseEntity.ok().body(msg); return ResponseEntity.ok().body(msg);

View File

@ -3,7 +3,7 @@ package eu.openaire.urls_controller.controllers;
import eu.openaire.urls_controller.models.UrlReport; import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo; import eu.openaire.urls_controller.models.WorkerInfo;
import eu.openaire.urls_controller.payloads.requests.WorkerReport; import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.services.FullTextsServiceImpl; import eu.openaire.urls_controller.services.BulkImportServiceImpl;
import eu.openaire.urls_controller.services.UrlsService; import eu.openaire.urls_controller.services.UrlsService;
import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils; import eu.openaire.urls_controller.util.ParquetFileUtils;
@ -111,7 +111,7 @@ public class UrlsController {
workerInfo.setHasShutdown(false); workerInfo.setHasShutdown(false);
} }
} else { } else {
logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP."); logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP [" + remoteAddr + "].");
workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false)); workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false));
} }
@ -171,7 +171,7 @@ public class UrlsController {
// The above method will overwrite a possibly existing file. So in case of a crash, it's better to back up the reports before starting the Controller again (as the assignments-counter will start over, from 0). // The above method will overwrite a possibly existing file. So in case of a crash, it's better to back up the reports before starting the Controller again (as the assignments-counter will start over, from 0).
int finalSizeOUrlReports = sizeOUrlReports; int finalSizeOUrlReports = sizeOUrlReports;
FullTextsServiceImpl.backgroundCallableTasks.add(() -> BulkImportServiceImpl.backgroundCallableTasks.add(() ->
urlsService.addWorkerReport(curWorkerId, curReportAssignmentsCounter, urlReports, finalSizeOUrlReports) urlsService.addWorkerReport(curWorkerId, curReportAssignmentsCounter, urlReports, finalSizeOUrlReports)
); );

View File

@ -6,7 +6,7 @@ import eu.openaire.urls_controller.models.BulkImportReport;
import java.io.File; import java.io.File;
import java.util.List; import java.util.List;
public interface FullTextsService { public interface BulkImportService {
Boolean bulkImportFullTextsFromDirectory(BulkImportReport bulkImportReport, String relativeBulkImportDir, String bulkImportDirName, File bulkImportDir, String provenance, BulkImport.BulkImportSource bulkImportSource, boolean shouldDeleteFilesOnFinish); Boolean bulkImportFullTextsFromDirectory(BulkImportReport bulkImportReport, String relativeBulkImportDir, String bulkImportDirName, File bulkImportDir, String provenance, BulkImport.BulkImportSource bulkImportSource, boolean shouldDeleteFilesOnFinish);

View File

@ -4,7 +4,7 @@ package eu.openaire.urls_controller.services;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import eu.openaire.urls_controller.components.BulkImport; import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.configuration.ImpalaConnector; import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.controllers.FullTextsController; import eu.openaire.urls_controller.controllers.BulkImportController;
import eu.openaire.urls_controller.models.BulkImportReport; import eu.openaire.urls_controller.models.BulkImportReport;
import eu.openaire.urls_controller.models.DocFileData; import eu.openaire.urls_controller.models.DocFileData;
import eu.openaire.urls_controller.models.FileLocationData; import eu.openaire.urls_controller.models.FileLocationData;
@ -36,9 +36,9 @@ import java.util.stream.Stream;
@Service @Service
public class FullTextsServiceImpl implements FullTextsService { public class BulkImportServiceImpl implements BulkImportService {
private static final Logger logger = LoggerFactory.getLogger(FullTextsServiceImpl.class); private static final Logger logger = LoggerFactory.getLogger(BulkImportServiceImpl.class);
@Autowired @Autowired
@ -78,7 +78,7 @@ public class FullTextsServiceImpl implements FullTextsService {
logger.error(errorMsg); logger.error(errorMsg);
bulkImportReport.addEvent(errorMsg); bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName); BulkImportController.bulkImportDirs.remove(bulkImportDirName);
return false; return false;
} }
@ -86,7 +86,7 @@ public class FullTextsServiceImpl implements FullTextsService {
if ( fileLocations == null ) { if ( fileLocations == null ) {
bulkImportReport.addEvent("Could not retrieve the files for bulk-import!"); bulkImportReport.addEvent("Could not retrieve the files for bulk-import!");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName); BulkImportController.bulkImportDirs.remove(bulkImportDirName);
return false; return false;
} }
@ -96,7 +96,7 @@ public class FullTextsServiceImpl implements FullTextsService {
logger.warn(errorMsg); logger.warn(errorMsg);
bulkImportReport.addEvent(errorMsg); bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName); BulkImportController.bulkImportDirs.remove(bulkImportDirName);
return false; return false;
} }
@ -111,7 +111,7 @@ public class FullTextsServiceImpl implements FullTextsService {
logger.error(errorMsg, e); logger.error(errorMsg, e);
bulkImportReport.addEvent(errorMsg); bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName); BulkImportController.bulkImportDirs.remove(bulkImportDirName);
return false; return false;
} }
@ -122,7 +122,7 @@ public class FullTextsServiceImpl implements FullTextsService {
logger.error(errorMsg); logger.error(errorMsg);
bulkImportReport.addEvent(errorMsg); bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName); BulkImportController.bulkImportDirs.remove(bulkImportDirName);
return false; return false;
} }
@ -169,7 +169,7 @@ public class FullTextsServiceImpl implements FullTextsService {
logger.error(errorMsg, e); logger.error(errorMsg, e);
bulkImportReport.addEvent(errorMsg); bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName); BulkImportController.bulkImportDirs.remove(bulkImportDirName);
return false; return false;
} finally { } finally {
logger.debug("Deleting local parquet directory: " + localParquetDir); logger.debug("Deleting local parquet directory: " + localParquetDir);
@ -183,7 +183,7 @@ public class FullTextsServiceImpl implements FullTextsService {
logger.error(errorMsg); logger.error(errorMsg);
bulkImportReport.addEvent(errorMsg); bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName); BulkImportController.bulkImportDirs.remove(bulkImportDirName);
return false; return false;
} else if ( numFailedFiles > 0 ) { // Some failed, but not all. } else if ( numFailedFiles > 0 ) { // Some failed, but not all.
msg = numFailedFiles + " files" + (numFailedSegments > 0 ? (" and " + numFailedSegments + " whole segments") : "") + " failed to be bulk-imported, from the bulkImportDir: " + bulkImportDirName; msg = numFailedFiles + " files" + (numFailedSegments > 0 ? (" and " + numFailedSegments + " whole segments") : "") + " failed to be bulk-imported, from the bulkImportDir: " + bulkImportDirName;
@ -202,7 +202,7 @@ public class FullTextsServiceImpl implements FullTextsService {
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
bulkImportReport.addEvent(mergeErrorMsg); bulkImportReport.addEvent(mergeErrorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName); BulkImportController.bulkImportDirs.remove(bulkImportDirName);
return false; return false;
} }
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
@ -215,7 +215,7 @@ public class FullTextsServiceImpl implements FullTextsService {
// Also, we do not want to write the object in the end (in its final form), since we want the user to have the ability to request the report at any time, // Also, we do not want to write the object in the end (in its final form), since we want the user to have the ability to request the report at any time,
// after submitting the bulk-import request, to see its progress (since the number of file may be very large and the processing may take many hours). // after submitting the bulk-import request, to see its progress (since the number of file may be very large and the processing may take many hours).
FullTextsController.bulkImportDirs.remove(bulkImportDirName); BulkImportController.bulkImportDirs.remove(bulkImportDirName);
return true; return true;
} }