From d821ae398f539828432afd8c1e5ae70b1b9a1dad Mon Sep 17 00:00:00 2001
From: LSmyrnaios
Date: Mon, 24 Jul 2023 20:28:41 +0300
Subject: [PATCH] Improve performance by applying the merging-procedure for
 the parquet files of the database tables less often, while keeping the
 benefits of having a relatively small maximum number of parquet files in
 search operations.

---
 .../controllers/UrlsController.java          |  5 ++
 .../services/UrlsServiceImpl.java            | 85 ++++++++++++-------
 2 files changed, 57 insertions(+), 33 deletions(-)

diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java
index 38e6dbf..ff724fe 100644
--- a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java
+++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 
@@ -47,6 +48,8 @@ public class UrlsController {
 
     public static final ConcurrentHashMap<String, WorkerInfo> workersInfoMap = new ConcurrentHashMap<>(6);
 
+    public static final AtomicInteger numOfWorkers = new AtomicInteger(0);
+
     public static ExecutorService backgroundExecutor;
 
 
@@ -117,10 +120,12 @@ public class UrlsController {
             if ( workerInfo.getHasShutdown() ) {
                 logger.info("The worker with id \"" + workerId + "\" was restarted.");
                 workerInfo.setHasShutdown(false);
+                numOfWorkers.decrementAndGet();
             }
         } else {
             logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP [" + remoteAddr + "] in memory.");
             workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false));
+            numOfWorkers.incrementAndGet();
         }
 
         return urlsService.getAssignments(workerId, assignmentsLimit);
diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
index 441bbf4..ce13c1b 100644
--- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
@@ -272,9 +272,15 @@ public class UrlsServiceImpl implements UrlsService {
     }
 
 
+    public static final AtomicLong numOfWorkerReportsProcessed = new AtomicLong(0);
+
+
     @Timed(value = "addWorkerReport.time", description = "Time taken to add the WorkerReport.")
     public Boolean addWorkerReport(String curWorkerId, long curReportAssignmentsCounter, List<UrlReport> urlReports, int sizeOfUrlReports)
     {
+        long currentNumOfWorkerReportsProcessed = numOfWorkerReportsProcessed.incrementAndGet();    // Increment this only when the report is actually being processed, not when it arrives at the endpoint.
+
         logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + curReportAssignmentsCounter);
 
         // Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
@@ -392,41 +398,13 @@
             }
         }
 
-        logger.debug("Going to merge the parquet files for the tables which were altered.");
-        // When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
-
-        String mergeErrorMsg;
-
-        ImpalaConnector.databaseLock.lock();
-
-        if ( ! hasAttemptParquetFileProblem ) {
-            mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
-            if ( mergeErrorMsg != null ) {
-                ImpalaConnector.databaseLock.unlock();
-                postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
-                return false;
-            }
-        }
-
-        if ( ! hasPayloadParquetFileProblem ) {
-            mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null);
-            if ( mergeErrorMsg != null ) {
-                ImpalaConnector.databaseLock.unlock();
-                postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
-                return false;
-            }
-        }
-
-        mergeErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter);
-        if ( mergeErrorMsg != null ) {
-            ImpalaConnector.databaseLock.unlock();
-            postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
-            return false;
-        }
-
-        ImpalaConnector.databaseLock.unlock();
-
-        logger.debug("Finished merging the database tables.");
+        // Merge the tables only once every "numOfWorkers" processed worker-reports, instead of on every report.
+        // After the first few increases of "assignmentsBatchCounter", once every worker has received assignment-batches,
+        // each full round of processed worker-reports brings the counter to a multiple of "numOfWorkers" and signals the merging-time.
+        if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfWorkers.get()) == 0 )
+            if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem) )
+                return false;
 
         if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
             postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "The full-text files failed to be acquired from the worker!");
             return false;
         }
@@ -478,6 +456,47 @@
     }
 
 
+    private boolean mergeWorkerRelatedTables(String curWorkerId, long curReportAssignmentsCounter, boolean hasAttemptParquetFileProblem, boolean hasPayloadParquetFileProblem)
+    {
+        logger.debug("Going to merge the parquet files for the tables which were altered.");
+        // When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
+        // This means that, over time, a table may accumulate thousands of parquet files and searching through them will be very slow. Thus, we merge them every now and then.
+
+        String mergeErrorMsg;
+
+        ImpalaConnector.databaseLock.lock();
+
+        if ( ! hasAttemptParquetFileProblem ) {
+            mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
+            if ( mergeErrorMsg != null ) {
+                ImpalaConnector.databaseLock.unlock();
+                postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
+                return false;
+            }
+        }
+
+        if ( ! hasPayloadParquetFileProblem ) {
+            mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null);
+            if ( mergeErrorMsg != null ) {
+                ImpalaConnector.databaseLock.unlock();
+                postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
+                return false;
+            }
+        }
+
+        mergeErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter);
+        if ( mergeErrorMsg != null ) {
+            ImpalaConnector.databaseLock.unlock();
+            postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
+            return false;
+        }
+
+        ImpalaConnector.databaseLock.unlock();
+        logger.debug("Finished merging the database tables.");
+        return true;
+    }
+
+
     private String deleteAssignmentsBatch(long givenAssignmentsBatchCounter)
     {
         // This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
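
Note on the core pattern: the patch replaces the unconditional per-report merging with a modulo-gate, so the expensive parquet compaction runs roughly once per round of worker-reports. Below is a minimal, standalone Java sketch of that gating pattern, not code from this repository: the class and method names (MergeGate, handleReport, mergeTables) are illustrative, and the zero-workers guard is an added precaution absent from the patch (the patch decrements numOfWorkers when a worker restarts without a matching increment, so the count could in principle reach zero and make the modulo throw an ArithmeticException).

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.ReentrantLock;

    public class MergeGate {

        // Mirrors UrlsController.numOfWorkers: incremented when a worker registers for the first time.
        static final AtomicInteger numOfWorkers = new AtomicInteger(0);

        // Mirrors UrlsServiceImpl.numOfWorkerReportsProcessed: incremented once per processed report.
        static final AtomicLong numOfWorkerReportsProcessed = new AtomicLong(0);

        // Stands in for ImpalaConnector.databaseLock.
        static final ReentrantLock databaseLock = new ReentrantLock();

        // Called once per worker-report, after its parquet files have been loaded into the tables.
        static boolean handleReport() {
            long processedSoFar = numOfWorkerReportsProcessed.incrementAndGet();
            int workers = numOfWorkers.get();
            // The "workers > 0" guard is an added precaution (not in the patch);
            // "x % 0" throws an ArithmeticException in Java.
            if ( (workers > 0) && ((processedSoFar % workers) == 0) )
                return mergeTables();   // Merge once per "round" of worker-reports.
            return true;    // The other (numOfWorkers - 1) reports of each round skip the merge.
        }

        static boolean mergeTables() {
            databaseLock.lock();
            try {
                // Placeholder for the real work: merging the parquet files of the "attempt" and
                // "payload_aggregated" tables and deleting the processed assignments-batch.
                System.out.println("Merging after report " + numOfWorkerReportsProcessed.get());
                return true;
            } finally {
                databaseLock.unlock();
            }
        }

        public static void main(String[] args) {
            for ( int i = 0; i < 3; i++ )
                numOfWorkers.incrementAndGet();     // Three workers register.
            for ( int report = 1; report <= 9; report++ )
                handleReport();     // Merges fire after reports 3, 6 and 9.
        }
    }

For example, with 4 registered workers the 4th, 8th, 12th, ... processed reports trigger the merge: each table is compacted once per round instead of once per report, while the number of parquet files per table stays bounded at roughly one round's worth of uploads.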