- Make sure the test-environment uses a dedicated hdfs-parquet-directory.

- Block app-execution in case the hdfs parquet directories fail to be created.
- Code polishing.
Lampros Smyrnaios 2023-01-18 13:38:05 +02:00
parent b0b00c8aed
commit d8773e6ebb
1 changed file with 18 additions and 12 deletions

@@ -72,7 +72,6 @@ public class ParquetFileUtils {
public Schema payloadsSchema;
public Schema attemptsSchema;
public final String parquetHDFSDirectoryPathAttempts;
public final String parquetHDFSDirectoryPathPayloads;
@@ -80,7 +79,8 @@ public class ParquetFileUtils {
public ParquetFileUtils(@Value("${hdfs.baseUrl}") String webHDFSBaseUrl,
@Value("${hdfs.httpAuth}") String hdfsHttpAuthString, @Value("${hdfs.userName}") String hdfsUserName, @Value("${hdfs.password}") String hdfsPassword, @Value("${output.parquetLocalDirectoryPath}") String parquetBaseDirectoryPath,
@Value("${hdfs.parquetRemoteBaseDirectoryPath}") String hdfsParquetBaseDir, FileUtils fileUtils) throws IOException
@Value("${hdfs.parquetRemoteBaseDirectoryPath}") String hdfsParquetBaseDir,
@Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment, FileUtils fileUtils) throws IOException
{
if ( webHDFSBaseUrl.endsWith("/") ) // We don't want an ending slash in the url (as it causes problems when the file-path is added).
this.webHDFSBaseUrl = webHDFSBaseUrl.substring(0, (webHDFSBaseUrl.length() -1));
@@ -119,6 +119,9 @@ public class ParquetFileUtils {
if ( !hdfsParquetBaseDir.endsWith("/") )
hdfsParquetBaseDir += "/";
+ if ( isTestEnvironment ) // Make sure the hdfs-remote-dir is different for running tests, in order to not cause conflicts with production.
+ hdfsParquetBaseDir += "test/";
this.parquetHDFSDirectoryPathPayloads = hdfsParquetBaseDir + "payloads/";
this.parquetHDFSDirectoryPathAttempts = hdfsParquetBaseDir + "attempts/";
this.fileUtils = fileUtils;
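
To illustrate the effect of the new flag, here is a minimal sketch (the base-directory value "parquet_uploads/" is a hypothetical configuration value, not taken from this commit):

	String hdfsParquetBaseDir = "parquet_uploads/";	// Hypothetical value of "hdfs.parquetRemoteBaseDirectoryPath".
	if ( isTestEnvironment )
		hdfsParquetBaseDir += "test/";
	// Production: "parquet_uploads/payloads/" and "parquet_uploads/attempts/"
	// Test:       "parquet_uploads/test/payloads/" and "parquet_uploads/test/attempts/"

This way, test runs never write under the production "payloads/" and "attempts/" directories.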
@@ -450,14 +453,17 @@ public class ParquetFileUtils {
// Check if the remote directories exist. If so, then return and continue with execution.
// If the directories do not exist, then make them in two requests.
// The WebHDFS uses the "mkdirs" operation, which creates all the non-existent directories in the specified path.
- // So with one request we will create the "parquet_uploads/" and the "parquet_uploads/payloads/" and with the second request, the "parquet_uploads/attempts/" directory.
+ // So with one request we will create the "parquet_uploads/" and the "parquet_uploads/attempts/" and with the second request, the "parquet_uploads/payloads/" directory.
- String mkdirsParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName;
+ String mkDirsParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName;
logger.info("Going to check if the remote parquet directories exist.");
String listMainDirectoryUrl = webHDFSBaseUrl + parquetBaseRemoteDirectory + "?op=LISTSTATUS&user.name=" + hdfsUserName;
+ boolean payloadCreationSuccessful = true;
+ boolean attemptCreationSuccessful = true;
// Get the "fileStatuses" of the directories (main and subdirectories) in one request.
try {
URL url = new URL(listMainDirectoryUrl);
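
For reference, both operations used here are standard WebHDFS REST calls; assuming a hypothetical namenode base-url, the two request types look like this:

	GET  http://namenode:9870/webhdfs/v1/parquet_uploads/?op=LISTSTATUS&user.name=<user>
	PUT  http://namenode:9870/webhdfs/v1/parquet_uploads/attempts/?op=MKDIRS&permission=777&user.name=<user>

"MKDIRS" behaves like "mkdir -p": it creates every missing directory along the path and returns {"boolean":true} on success.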
@@ -476,9 +482,8 @@ public class ParquetFileUtils {
if ( statusCode == 404 ) {
logger.info("The directory \"" + parquetBaseRemoteDirectory + "\" does not exist. We will create it, along with its sub-directories.");
- createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkdirsParams);
- createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkdirsParams);
- return true;
+ attemptCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsParams);
+ payloadCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkDirsParams);
}
else {
// Check the json-response, to see if all the subdirectories exist.
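
For context, a LISTSTATUS response in which both sub-directories exist looks roughly like this (trimmed to the fields that matter here; the full field-set is defined by the WebHDFS REST API):

	{"FileStatuses":{"FileStatus":[
		{"pathSuffix":"attempts","type":"DIRECTORY"},
		{"pathSuffix":"payloads","type":"DIRECTORY"}
	]}}

The code elided between these hunks presumably sets "foundAttemptsDir" and "foundPayloadsDir" by scanning these "pathSuffix" entries.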
@@ -522,19 +527,19 @@ public class ParquetFileUtils {
return false;
}
- // IMPORTANT NOTE: It is possible that the ".../payloads" dir exists, but the ".../attempts" dir does not (in case of remote filesystem failure or by accidental deletion by some other user).
+ // IMPORTANT NOTE: It is possible that the ".../attempts" dir exists, but the ".../payloads" dir does not, and vice-versa (in case of remote filesystem failure or by accidental deletion by some other user).
// Also, it is possible that the Controller was terminated before creating all the directories, or that in a previous execution the second "create"-request failed, resulting in the Controller's shutdown.
- // For each missing subdirectory, run the mkdirs-request.
+ // For each missing subdirectory, run the mkDirs-request.
if ( !foundAttemptsDir ) {
logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" does not exist! Going to create it.");
- createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkdirsParams);
+ attemptCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsParams);
} else
logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" exists.");
if ( !foundPayloadsDir ) {
logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloads + "\" does not exist! Going to create it.");
- createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkdirsParams);
+ payloadCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkDirsParams);
} else
logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloads + "\" exists.");
}
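
Note that "createHDFSDirectory()" itself is not part of this diff; for the new boolean results to be meaningful, it has to report success or failure, roughly along these lines (a sketch under that assumption, not the actual implementation):

	private boolean createHDFSDirectory(String createDirectoryUrl)
	{
		try {
			HttpURLConnection conn = (HttpURLConnection) new URL(createDirectoryUrl).openConnection();	// Assumes the class's existing java.net imports.
			conn.setRequestMethod("PUT");	// The WebHDFS "MKDIRS" operation is an HTTP PUT.
			int statusCode = conn.getResponseCode();
			if ( statusCode != 200 ) {
				logger.error("Could not create the directory: " + createDirectoryUrl + " | statusCode: " + statusCode);
				return false;
			}
		} catch (Exception e) {
			logger.error("Error when creating the directory: " + createDirectoryUrl, e);
			return false;
		}
		return true;
	}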
@@ -543,7 +548,8 @@ public class ParquetFileUtils {
return false;
}
- return true;
+ return (attemptCreationSuccessful && payloadCreationSuccessful);
+ // We need both to be created in order for the app to function properly!
}
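
Per the commit message, a "false" return must now block app-execution. The call-site is outside this diff, so the following is only a sketch of the intended contract (the method- and variable-names are hypothetical):

	// Hypothetical caller; the real call-site is not shown in this commit.
	if ( !parquetFileUtils.createRemoteParquetDirectories() ) {
		logger.error("The remote parquet directories could not be created! Shutting down..");
		System.exit(1);	// Block app-execution, per this commit's intent.
	}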