- Use a separate HDFS sub-dir for every assignments-batch, in order to avoid any disrruptancies from multiple threads moving parquet-files from the same sub-dir. Multiple batches from the same worker may be processed at the same time. These sub-dirs are deleted afterwards.

- Treat the "contains no visible files" situation as an error. In which case the assignments-data is presumed to not have been inserted to the database tables.
- Code polishing/cleanup.
This commit is contained in:
Lampros Smyrnaios 2023-05-27 02:36:05 +03:00
parent 02cee097d4
commit 3988eb3a48
5 changed files with 43 additions and 61 deletions

View File

@ -90,6 +90,8 @@ public class ScheduledTasks {
if ( ! ShutdownController.shouldShutdownService )
return; // Either the service was never instructed to shut down, or the user canceled the request.
// If the workers have shutdown on their own, without been instructed to by the Controller, then the Controller will keep running.
for ( String workerId : UrlsController.workersInfoMap.keySet() ) {
if ( ! UrlsController.workersInfoMap.get(workerId).getHasShutdown() ) // The workerId is certainly inside the map and has a workerInfo value.
return; // If at least 1 worker is still active, then do not shut down the server.

View File

@ -45,9 +45,7 @@ public class ShutdownController {
shutdownService.postShutdownOrCancelRequestToWorker(workerId, UrlsController.workersInfoMap.get(workerId).getWorkerIP(), false);
// That's it for now. The workers may take some hours to finish their work (including delivering the full-text files).
// TODO - Add a scheduler to monitor the "HasShutdown" values for all workers.
// TODO - Once all have the value "true", gently shutdown the Controller, just like we do for the worker.
// TODO - Each worker, upon "shutdown" should send a "workerShutdownReport" to the Controller.
// A scheduler monitors the shutdown of the workers. Once all worker have shutdown, the Controller shuts down as well.
}
finalMsg += "The service will shutdown, after finishing current work.";

View File

@ -111,17 +111,8 @@ public class UrlsController {
workerInfo.setHasShutdown(false);
}
} else {
logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP and create the remote parquet subdirectories (in HDFS).");
logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP.");
workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false));
// Create extra subdirectories in HDFS parquet-directories, so that the parquet directory does not become empty right before "loading" the data to the DB, in case another worker loaded multiple-worker's data to the DB.
String endingDirAndParams = workerId + "/" + parquetFileUtils.mkDirsAndParams;
if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingDirAndParams)
|| !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingDirAndParams) )
{
String errorMsg = "Error when creating the HDFS sub-directories for worker with id: " + workerId;
logger.error(errorMsg);
return ResponseEntity.internalServerError().body(errorMsg);
}
}
return urlsService.getAssignments(workerId, assignmentsLimit);
@ -129,8 +120,8 @@ public class UrlsController {
@PostMapping("addWorkerReport")
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport) {
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport)
{
if ( workerReport == null ) {
String errorMsg = "No \"WorkerReport\" was given!";
logger.error(errorMsg);

View File

@ -265,7 +265,16 @@ public class UrlsServiceImpl implements UrlsService {
logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignments);
List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignments, localParquetPath, uploadFullTextsResponse, curWorkerId);
List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignments, localParquetPath, uploadFullTextsResponse);
// Create HDFS subDirs for these assignments. Other background threads handling other assignments will not interfere with loading of parquetFiles to the DB tables.
String endingMkDirAndParams = curReportAssignments + "/" + parquetFileUtils.mkDirsAndParams;
if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingMkDirAndParams)
|| !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingMkDirAndParams) )
{
postReportResultToWorker(curWorkerId, curReportAssignments, "Error when creating the HDFS sub-directories for assignments_" + curReportAssignments);
return false;
}
boolean hasAttemptParquetFileProblem = false;
boolean hasPayloadParquetFileProblem = false;
@ -297,10 +306,10 @@ public class UrlsServiceImpl implements UrlsService {
ImpalaConnector.databaseLock.lock();
if ( ! hasAttemptParquetFileProblem )
hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts + curWorkerId + "/", "attempt");
hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts + curReportAssignments + "/", "attempt");
if ( ! hasPayloadParquetFileProblem )
hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + curWorkerId + "/", "payload_aggregated");
hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignments + "/", "payload_aggregated");
ImpalaConnector.databaseLock.unlock();
@ -328,10 +337,19 @@ public class UrlsServiceImpl implements UrlsService {
String errorMsg = "Unexpected error when inserting into the \"attempt\" and \"payload_aggregated\" tables in parallel! " + e.getMessage();
logger.error(errorMsg, e);
postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg);
return false;
return false; // No tables-merging is happening.
} finally {
logger.debug("Deleting parquet directory: " + localParquetPath);
fileUtils.deleteDirectory(new File(localParquetPath));
// Delete the HDFS subDirs for this Report.
String endingRmDirAndParams = curReportAssignments + "/" + parquetFileUtils.rmDirsAndParams;
if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingRmDirAndParams)
|| !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingRmDirAndParams) )
{
logger.error("Error when deleting the HDFS sub-directories for assignments_" + curReportAssignments); // A directory-specific log has already appeared.
// The failure to delete the assignments_subDirs is not that of a problem and should not erase the whole process. So all goes as planned (the worker deletes any remaining files).
// The worst case is that a few subDirs will be left back in the HDFS, although, without their parquetFiles, since they have already moved inside the DB tables.
}
}
logger.debug("Going to merge the parquet files for the tables which were altered.");
@ -465,7 +483,7 @@ public class UrlsServiceImpl implements UrlsService {
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException {
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - If this is ever used, make it a global Thread-Local var. And then "clear" (reset) it after each use.
sb.append(baseInsertQuery);
for ( int i=1; i <= dataSize; ++i ) {
sb.append("(");

View File

@ -84,6 +84,9 @@ public class ParquetFileUtils {
//public String setPermAndParams;
public String rmDirsAndParams;
public ParquetFileUtils(@Value("${hdfs.baseUrl}") String webHDFSBaseUrl,
@Value("${hdfs.httpAuth}") String hdfsHttpAuthString, @Value("${hdfs.userName}") String hdfsUserName, @Value("${hdfs.password}") String hdfsPassword, @Value("${services.pdfaggregation.controller.parquetLocalDirectoryPath}") String parquetBaseDirectoryPath,
@ -136,6 +139,7 @@ public class ParquetFileUtils {
this.fileUtils = fileUtils;
this.mkDirsAndParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName; // All permissions for user, group and others must be set, in order for this service' user to have access to the hdfs directory.
//this.setPermAndParams = "?op=SETPERMISSION&permission=777&user.name=" + hdfsUserName;
this.rmDirsAndParams = "?op=DELETE&recursive=true&user.name=" + hdfsUserName;
createRemoteParquetDirectories(hdfsParquetBaseDir);
}
@ -150,7 +154,7 @@ public class ParquetFileUtils {
}
public List<Callable<ParquetReport>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOfUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse, String workerId)
public List<Callable<ParquetReport>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOfUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse)
{
// Split the "UrlReports" into some sub-lists.
List<List<UrlReport>> subLists;
@ -168,13 +172,13 @@ public class ParquetFileUtils {
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(finalI, subLists.get(finalI), curReportAssignments, currentParquetPath, workerId));
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(finalI, subLists.get(finalI), curReportAssignments, currentParquetPath));
});
}
} else {
// If the "urlReports" are so few, that we cannot get big "sublists", assign a single task to handle all the attempts.
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(0, urlReports, curReportAssignments, currentParquetPath, workerId));
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(0, urlReports, curReportAssignments, currentParquetPath));
});
}
@ -187,7 +191,7 @@ public class ParquetFileUtils {
System.exit(88); // Exit the whole app, as it cannot add the results to the database!
}
callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount.
return new ParquetReport(ParquetReport.ParquetType.payload, createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath, (parquetHDFSDirectoryPathPayloadsAggregated + workerId + "/")));
return new ParquetReport(ParquetReport.ParquetType.payload, createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath, (parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignments + "/")));
});
}
@ -201,7 +205,7 @@ public class ParquetFileUtils {
}
public boolean createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignments, String localParquetPath, String workerId)
public boolean createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignments, String localParquetPath)
{
List<GenericData.Record> recordList = new ArrayList<>(urlReports.size());
GenericData.Record record;
@ -248,7 +252,7 @@ public class ParquetFileUtils {
//logger.trace("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!
// Upload and insert the data to the "attempt" Impala table.
String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, (parquetHDFSDirectoryPathAttempts + workerId + "/"));
String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, (parquetHDFSDirectoryPathAttempts + curReportAssignments + "/"));
return (errorMsg == null); // The possible error-message returned, is already logged by the Controller.
} else
return false;
@ -441,7 +445,6 @@ public class ParquetFileUtils {
// Important note!
// Using the "load data inpath" command, he files are MOVED, not copied! So we don't have to delete them afterwards.
// See: https://docs.cloudera.com/documentation/enterprise/latest/topics/impala_load_data.html
} catch (Throwable e) {
String errorMsg = "Error while uploading parquet file \"" + parquetFileFullLocalPath + "\" to HDFS!\n" + e;
logger.error(errorMsg);
@ -460,23 +463,15 @@ public class ParquetFileUtils {
try {
jdbcTemplate.execute(loadParquetInTableQuery);
} catch (Exception e) {
/*
// TODO - We will make a new sub-dir for each assignments-counter, which will be deleted afterwards.
// These subDirs will replace the "worker-subDirs", since we do not need them then.
// In case a subDir fails to be Deleted, we do not mark the whole process as failed, but instead, just log the issue and move on.
// There is no big deal to have a few "leftover" subDirs.
// Upon a Service restart (were the counter resets) these leftover dirs will be used as if they were new (any leftwover parquet files will be overwrite by the new ones of the same name).
*/
// Check if this error is related to the files be missing from the HDFS directory.
Throwable cause = e.getCause();
if ( cause instanceof SQLException ) { // In this case the "parent" exception is: "org.springframework.jdbc.UncategorizedSQLException".
String errorMsg = cause.getMessage();
if ( (errorMsg != null) && errorMsg.contains("contains no visible files") ) {
logger.warn("The \"remoteParquetDataLocation\": \"" + remoteParquetDataLocation + "\" was found empty, when tried to load its content into the \"" + tableName + "\" table. Most likely, another thread loaded all content before this one got a chance. Continuing as normal.");
return true;
logger.error("The \"remoteParquetDataLocation\": \"" + remoteParquetDataLocation + "\" was found empty, when tried to load its content into the \"" + tableName + "\" table!");
return false; // Since each thread is using a different subDir, by design, This error is unacceptable.
}
}
ImpalaConnector.handleQueryException("loadParquetInTableQuery", loadParquetInTableQuery, e); // It's already logged.
return false;
}
@ -607,7 +602,7 @@ public class ParquetFileUtils {
try {
URL url = new URL(hdfsOperationUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("PUT");
conn.setRequestMethod(hdfsOperationUrl.contains("DELETE") ? "DELETE" : "PUT");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
conn.connect();
@ -672,26 +667,4 @@ public class ParquetFileUtils {
return new SumParquetSuccess(hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, null);
}
// Use this if we decide to delete undeleted files (probably due to failed "load" attempts). For now, it's better to leave them there, in order to fix potential problems more easily.
// Also, the leftover files will be automatically be loaded to the table in the next "load" attempt, since we make one "load" operation with the whole directory and multiple loads, one for each file.
public String deleteFileFromHDFS(String fileLocation, String parquetFileName) throws Exception
{
// Delete the file from the temporal storage on HDFS.
HttpURLConnection conn = (HttpURLConnection) (new URL(fileLocation + "?op=DELETE&user.name=" + hdfsUserName)).openConnection();
conn.setRequestMethod("DELETE");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode == 200 ) {
logger.debug("The file \"" + parquetFileName + "\" was successfully deleted.");
} else {
String errorMsg = "The file \"" + parquetFileName + "\" could not be deleted! Response-code: " + statusCode;
logger.error(errorMsg);
return errorMsg;
}
return null;
}
}