From 74ff31fc64003b5081577421e2ddc71116011967 Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Mon, 29 May 2023 12:12:08 +0300 Subject: [PATCH] - Show the workerIPs in the logs. - Rename the "FullTexts"-files to "BulkImport". --- .../openaire/urls_controller/Application.java | 6 ++--- .../components/ScheduledTasks.java | 8 +++---- ...troller.java => BulkImportController.java} | 18 +++++++------- .../controllers/UrlsController.java | 6 ++--- ...xtsService.java => BulkImportService.java} | 2 +- ...ceImpl.java => BulkImportServiceImpl.java} | 24 +++++++++---------- 6 files changed, 32 insertions(+), 32 deletions(-) rename src/main/java/eu/openaire/urls_controller/controllers/{FullTextsController.java => BulkImportController.java} (93%) rename src/main/java/eu/openaire/urls_controller/services/{FullTextsService.java => BulkImportService.java} (93%) rename src/main/java/eu/openaire/urls_controller/services/{FullTextsServiceImpl.java => BulkImportServiceImpl.java} (96%) diff --git a/src/main/java/eu/openaire/urls_controller/Application.java b/src/main/java/eu/openaire/urls_controller/Application.java index dc2048a..ae2b59f 100644 --- a/src/main/java/eu/openaire/urls_controller/Application.java +++ b/src/main/java/eu/openaire/urls_controller/Application.java @@ -1,6 +1,6 @@ package eu.openaire.urls_controller; -import eu.openaire.urls_controller.services.FullTextsServiceImpl; +import eu.openaire.urls_controller.services.BulkImportServiceImpl; import eu.openaire.urls_controller.services.UrlsServiceImpl; import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.UriBuilder; @@ -70,8 +70,8 @@ public class Application { shutdownThreads(UrlsServiceImpl.insertsExecutor); shutdownThreads(FileUtils.hashMatchingExecutor); - shutdownThreads(FullTextsServiceImpl.backgroundExecutor); - shutdownThreads(FullTextsServiceImpl.bulkImportExecutor); + shutdownThreads(BulkImportServiceImpl.backgroundExecutor); + shutdownThreads(BulkImportServiceImpl.bulkImportExecutor); logger.info("Exiting.."); } diff --git a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java index ef355c3..949ec14 100644 --- a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java @@ -3,7 +3,7 @@ package eu.openaire.urls_controller.components; import eu.openaire.urls_controller.Application; import eu.openaire.urls_controller.controllers.ShutdownController; import eu.openaire.urls_controller.controllers.UrlsController; -import eu.openaire.urls_controller.services.FullTextsServiceImpl; +import eu.openaire.urls_controller.services.BulkImportServiceImpl; import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.GenericUtils; import org.slf4j.Logger; @@ -48,7 +48,7 @@ public class ScheduledTasks { //@Scheduled(initialDelay = 20_000, fixedDelay = 20_000) // Just for testing (every 20 secs). public void executeBackgroundTasks() { - List> tempList = new ArrayList<>(FullTextsServiceImpl.backgroundCallableTasks); // Copy the list in order to know what was executed. + List> tempList = new ArrayList<>(BulkImportServiceImpl.backgroundCallableTasks); // Copy the list in order to know what was executed. // So the items added while this execution happens, will be remain in the global-list, while the other will have already be deleted. int numOfTasks = tempList.size(); // Since the temp-list is a deep-copy and not a reference, new tasks that are added will not be executed. if ( numOfTasks == 0 ) @@ -56,13 +56,13 @@ public class ScheduledTasks { // Immediately delete the selected tasks form the global list, so that if these tasks are not finished before the scheduler runs again, they will not be re-executed. for ( Callable selectedTask : tempList ) { - FullTextsServiceImpl.backgroundCallableTasks.remove(selectedTask); + BulkImportServiceImpl.backgroundCallableTasks.remove(selectedTask); } logger.debug(numOfTasks + " background tasks were found inside the \"backgroundCallableTasks\" list and are about to be executed."); // Execute the tasks and wait for them to finish. try { - List> futures = FullTextsServiceImpl.backgroundExecutor.invokeAll(tempList); + List> futures = BulkImportServiceImpl.backgroundExecutor.invokeAll(tempList); int sizeOfFutures = futures.size(); for ( int i = 0; i < sizeOfFutures; ++i ) { try { diff --git a/src/main/java/eu/openaire/urls_controller/controllers/FullTextsController.java b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java similarity index 93% rename from src/main/java/eu/openaire/urls_controller/controllers/FullTextsController.java rename to src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java index 8b5cb5a..fbb3cdf 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/FullTextsController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java @@ -2,8 +2,8 @@ package eu.openaire.urls_controller.controllers; import eu.openaire.urls_controller.components.BulkImport; import eu.openaire.urls_controller.models.BulkImportReport; -import eu.openaire.urls_controller.services.FullTextsService; -import eu.openaire.urls_controller.services.FullTextsServiceImpl; +import eu.openaire.urls_controller.services.BulkImportService; +import eu.openaire.urls_controller.services.BulkImportServiceImpl; import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.GenericUtils; import org.slf4j.Logger; @@ -29,14 +29,14 @@ import java.util.regex.Pattern; @RestController @RequestMapping("") -public class FullTextsController { +public class BulkImportController { - private static final Logger logger = LoggerFactory.getLogger(FullTextsController.class); + private static final Logger logger = LoggerFactory.getLogger(BulkImportController.class); @Autowired private FileUtils fileUtils; - private final FullTextsService fullTextsService; + private final BulkImportService bulkImportService; private final String baseBulkImportLocation; @@ -48,7 +48,7 @@ public class FullTextsController { - public FullTextsController(FullTextsService fullTextsService, BulkImport bulkImport) + public BulkImportController(BulkImportService bulkImportService, BulkImport bulkImport) { String bulkImportReportLocation1; this.baseBulkImportLocation = bulkImport.getBaseBulkImportLocation(); @@ -60,7 +60,7 @@ public class FullTextsController { bulkImportReportLocation1 += "/"; this.bulkImportReportLocation = bulkImportReportLocation1; - this.fullTextsService = fullTextsService; + this.bulkImportService = bulkImportService; } @@ -172,8 +172,8 @@ public class FullTextsController { // Add this to a background job, since it will take a lot of time to be completed, and the caller will get a "read-timeout" at least and a socket-timeout at most (in case of a network failure during those hours). String finalBulkImportDir = bulkImportDir; String finalRelativeBulkImportDir = relativeBulkImportDir; - FullTextsServiceImpl.backgroundCallableTasks.add(() -> - fullTextsService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish) + BulkImportServiceImpl.backgroundCallableTasks.add(() -> + bulkImportService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish) ); return ResponseEntity.ok().body(msg); diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java index 2c32c9b..b6dca27 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java @@ -3,7 +3,7 @@ package eu.openaire.urls_controller.controllers; import eu.openaire.urls_controller.models.UrlReport; import eu.openaire.urls_controller.models.WorkerInfo; import eu.openaire.urls_controller.payloads.requests.WorkerReport; -import eu.openaire.urls_controller.services.FullTextsServiceImpl; +import eu.openaire.urls_controller.services.BulkImportServiceImpl; import eu.openaire.urls_controller.services.UrlsService; import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.ParquetFileUtils; @@ -111,7 +111,7 @@ public class UrlsController { workerInfo.setHasShutdown(false); } } else { - logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP."); + logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP [" + remoteAddr + "]."); workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false)); } @@ -171,7 +171,7 @@ public class UrlsController { // The above method will overwrite a possibly existing file. So in case of a crash, it's better to back up the reports before starting the Controller again (as the assignments-counter will start over, from 0). int finalSizeOUrlReports = sizeOUrlReports; - FullTextsServiceImpl.backgroundCallableTasks.add(() -> + BulkImportServiceImpl.backgroundCallableTasks.add(() -> urlsService.addWorkerReport(curWorkerId, curReportAssignmentsCounter, urlReports, finalSizeOUrlReports) ); diff --git a/src/main/java/eu/openaire/urls_controller/services/FullTextsService.java b/src/main/java/eu/openaire/urls_controller/services/BulkImportService.java similarity index 93% rename from src/main/java/eu/openaire/urls_controller/services/FullTextsService.java rename to src/main/java/eu/openaire/urls_controller/services/BulkImportService.java index 6cb9cb4..3e2430f 100644 --- a/src/main/java/eu/openaire/urls_controller/services/FullTextsService.java +++ b/src/main/java/eu/openaire/urls_controller/services/BulkImportService.java @@ -6,7 +6,7 @@ import eu.openaire.urls_controller.models.BulkImportReport; import java.io.File; import java.util.List; -public interface FullTextsService { +public interface BulkImportService { Boolean bulkImportFullTextsFromDirectory(BulkImportReport bulkImportReport, String relativeBulkImportDir, String bulkImportDirName, File bulkImportDir, String provenance, BulkImport.BulkImportSource bulkImportSource, boolean shouldDeleteFilesOnFinish); diff --git a/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java similarity index 96% rename from src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java rename to src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java index 49aad0d..aa7e3f7 100644 --- a/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java @@ -4,7 +4,7 @@ package eu.openaire.urls_controller.services; import com.google.common.collect.Lists; import eu.openaire.urls_controller.components.BulkImport; import eu.openaire.urls_controller.configuration.ImpalaConnector; -import eu.openaire.urls_controller.controllers.FullTextsController; +import eu.openaire.urls_controller.controllers.BulkImportController; import eu.openaire.urls_controller.models.BulkImportReport; import eu.openaire.urls_controller.models.DocFileData; import eu.openaire.urls_controller.models.FileLocationData; @@ -36,9 +36,9 @@ import java.util.stream.Stream; @Service -public class FullTextsServiceImpl implements FullTextsService { +public class BulkImportServiceImpl implements BulkImportService { - private static final Logger logger = LoggerFactory.getLogger(FullTextsServiceImpl.class); + private static final Logger logger = LoggerFactory.getLogger(BulkImportServiceImpl.class); @Autowired @@ -78,7 +78,7 @@ public class FullTextsServiceImpl implements FullTextsService { logger.error(errorMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); - FullTextsController.bulkImportDirs.remove(bulkImportDirName); + BulkImportController.bulkImportDirs.remove(bulkImportDirName); return false; } @@ -86,7 +86,7 @@ public class FullTextsServiceImpl implements FullTextsService { if ( fileLocations == null ) { bulkImportReport.addEvent("Could not retrieve the files for bulk-import!"); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); - FullTextsController.bulkImportDirs.remove(bulkImportDirName); + BulkImportController.bulkImportDirs.remove(bulkImportDirName); return false; } @@ -96,7 +96,7 @@ public class FullTextsServiceImpl implements FullTextsService { logger.warn(errorMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); - FullTextsController.bulkImportDirs.remove(bulkImportDirName); + BulkImportController.bulkImportDirs.remove(bulkImportDirName); return false; } @@ -111,7 +111,7 @@ public class FullTextsServiceImpl implements FullTextsService { logger.error(errorMsg, e); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); - FullTextsController.bulkImportDirs.remove(bulkImportDirName); + BulkImportController.bulkImportDirs.remove(bulkImportDirName); return false; } @@ -122,7 +122,7 @@ public class FullTextsServiceImpl implements FullTextsService { logger.error(errorMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); - FullTextsController.bulkImportDirs.remove(bulkImportDirName); + BulkImportController.bulkImportDirs.remove(bulkImportDirName); return false; } @@ -169,7 +169,7 @@ public class FullTextsServiceImpl implements FullTextsService { logger.error(errorMsg, e); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); - FullTextsController.bulkImportDirs.remove(bulkImportDirName); + BulkImportController.bulkImportDirs.remove(bulkImportDirName); return false; } finally { logger.debug("Deleting local parquet directory: " + localParquetDir); @@ -183,7 +183,7 @@ public class FullTextsServiceImpl implements FullTextsService { logger.error(errorMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); - FullTextsController.bulkImportDirs.remove(bulkImportDirName); + BulkImportController.bulkImportDirs.remove(bulkImportDirName); return false; } else if ( numFailedFiles > 0 ) { // Some failed, but not all. msg = numFailedFiles + " files" + (numFailedSegments > 0 ? (" and " + numFailedSegments + " whole segments") : "") + " failed to be bulk-imported, from the bulkImportDir: " + bulkImportDirName; @@ -202,7 +202,7 @@ public class FullTextsServiceImpl implements FullTextsService { ImpalaConnector.databaseLock.unlock(); bulkImportReport.addEvent(mergeErrorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); - FullTextsController.bulkImportDirs.remove(bulkImportDirName); + BulkImportController.bulkImportDirs.remove(bulkImportDirName); return false; } ImpalaConnector.databaseLock.unlock(); @@ -215,7 +215,7 @@ public class FullTextsServiceImpl implements FullTextsService { // Also, we do not want to write the object in the end (in its final form), since we want the user to have the ability to request the report at any time, // after submitting the bulk-import request, to see its progress (since the number of file may be very large and the processing may take many hours). - FullTextsController.bulkImportDirs.remove(bulkImportDirName); + BulkImportController.bulkImportDirs.remove(bulkImportDirName); return true; }