Improve performance by applying the merging-procedure for the parquet files of the database tables less often, while keeping the benefits of having a relatively small maximum number of parquet files in search operations.

This commit is contained in:
Lampros Smyrnaios 2023-07-24 20:28:41 +03:00
parent 7dc72e242e
commit d821ae398f
2 changed files with 57 additions and 33 deletions

View File

@ -25,6 +25,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -47,6 +48,8 @@ public class UrlsController {
public static final ConcurrentHashMap<String, WorkerInfo> workersInfoMap = new ConcurrentHashMap<>(6); public static final ConcurrentHashMap<String, WorkerInfo> workersInfoMap = new ConcurrentHashMap<>(6);
public static final AtomicInteger numOfWorkers = new AtomicInteger(0);
public static ExecutorService backgroundExecutor; public static ExecutorService backgroundExecutor;
@ -117,10 +120,12 @@ public class UrlsController {
if ( workerInfo.getHasShutdown() ) { if ( workerInfo.getHasShutdown() ) {
logger.info("The worker with id \"" + workerId + "\" was restarted."); logger.info("The worker with id \"" + workerId + "\" was restarted.");
workerInfo.setHasShutdown(false); workerInfo.setHasShutdown(false);
numOfWorkers.decrementAndGet();
} }
} else { } else {
logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP [" + remoteAddr + "] in memory."); logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP [" + remoteAddr + "] in memory.");
workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false)); workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false));
numOfWorkers.incrementAndGet();
} }
return urlsService.getAssignments(workerId, assignmentsLimit); return urlsService.getAssignments(workerId, assignmentsLimit);

View File

@ -272,9 +272,15 @@ public class UrlsServiceImpl implements UrlsService {
} }
public static final AtomicLong numOfWorkerReportsProcessed = new AtomicLong(0);
@Timed(value = "addWorkerReport.time", description = "Time taken to add the WorkerReport.") @Timed(value = "addWorkerReport.time", description = "Time taken to add the WorkerReport.")
public Boolean addWorkerReport(String curWorkerId, long curReportAssignmentsCounter, List<UrlReport> urlReports, int sizeOfUrlReports) public Boolean addWorkerReport(String curWorkerId, long curReportAssignmentsCounter, List<UrlReport> urlReports, int sizeOfUrlReports)
{ {
long currentNumOfWorkerReportsProcessed = numOfWorkerReportsProcessed.incrementAndGet(); // Increment it when it is actually being pressed, not when it arrives at the endpoint.
logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + curReportAssignmentsCounter); logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + curReportAssignmentsCounter);
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location". // Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
@ -392,41 +398,13 @@ public class UrlsServiceImpl implements UrlsService {
} }
} }
logger.debug("Going to merge the parquet files for the tables which were altered."); // For every "numOfWorkers" assignment-batches that go to workers, we merge the tables, once a workerReport comes in.
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table. // After the first few increases of "assignmentsBatchCounter" until all workers get assignment-batches,
// there will always be a time when the counter will be just before the "golden-value" and then one workerReport has to be processed here and the counter will be incremented by one and signal the merging-time.
String mergeErrorMsg; if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfWorkers.get()) == 0 )
if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem) )
ImpalaConnector.databaseLock.lock();
if ( ! hasAttemptParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
return false; return false;
}
}
if ( ! hasPayloadParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
return false;
}
}
mergeErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
return false;
}
ImpalaConnector.databaseLock.unlock();
logger.debug("Finished merging the database tables.");
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) { if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "The full-text files failed to be acquired from the worker!"); postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "The full-text files failed to be acquired from the worker!");
return false; return false;
@ -478,6 +456,47 @@ public class UrlsServiceImpl implements UrlsService {
} }
private boolean mergeWorkerRelatedTables(String curWorkerId, long curReportAssignmentsCounter, boolean hasAttemptParquetFileProblem, boolean hasPayloadParquetFileProblem)
{
logger.debug("Going to merge the parquet files for the tables which were altered.");
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
// This means that over time a table may have thousand of parquet files and the search through them will be very slow. Thus, we merge them every now and then.
String mergeErrorMsg;
ImpalaConnector.databaseLock.lock();
if ( ! hasAttemptParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
return false;
}
}
if ( ! hasPayloadParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
return false;
}
}
mergeErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
return false;
}
ImpalaConnector.databaseLock.unlock();
logger.debug("Finished merging the database tables.");
return true;
}
private String deleteAssignmentsBatch(long givenAssignmentsBatchCounter) private String deleteAssignmentsBatch(long givenAssignmentsBatchCounter)
{ {
// This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data. // This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.