- Move the "FileUtils.mergeParquetFiles()" method to "ParquetFileUtils.mergeParquetFilesOfTable()".
- Fix a typo.
This commit is contained in:
parent
724eae1514
commit
56d233d38e
|
@ -120,7 +120,7 @@ public class BulkImportServiceImpl implements BulkImportService {
|
|||
|
||||
// Create a new directory on HDFS, with this bulkImportDir name. So, that there will not be any "load data" operation to fail because another thread has loaded that base-dir right before.
|
||||
String currentBulkImportHdfsDir = parquetFileUtils.parquetHDFSDirectoryPathPayloadsBulkImport + relativeBulkImportDir;
|
||||
if ( ! parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + currentBulkImportHdfsDir + parquetFileUtils.mkDirsAndParams) ) { // N0-op if it already exists. It is very quick.
|
||||
if ( ! parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + currentBulkImportHdfsDir + parquetFileUtils.mkDirsAndParams) ) { // N0-op if it already exists. It is very quick.
|
||||
String errorMsg = "Could not create the remote HDFS-directory: " + currentBulkImportHdfsDir;
|
||||
logger.error(errorMsg + additionalLoggingMsg);
|
||||
bulkImportReport.addEvent(errorMsg);
|
||||
|
@ -208,7 +208,7 @@ public class BulkImportServiceImpl implements BulkImportService {
|
|||
|
||||
// Merge the parquet files inside the table "payload_bulk_import", to improve performance of future operations.
|
||||
DatabaseConnector.databaseLock.lock();
|
||||
String mergeErrorMsg = fileUtils.mergeParquetFiles("payload_bulk_import", "", null); // msg is already logged
|
||||
String mergeErrorMsg = parquetFileUtils.mergeParquetFilesOfTable("payload_bulk_import", "", null); // msg is already logged
|
||||
DatabaseConnector.databaseLock.unlock();
|
||||
if ( mergeErrorMsg != null ) { // the message in already logged
|
||||
bulkImportReport.addEvent(mergeErrorMsg);
|
||||
|
|
|
@ -311,8 +311,8 @@ public class UrlsServiceImpl implements UrlsService {
|
|||
|
||||
// Create HDFS subDirs for these assignments. Other background threads handling other assignments will not interfere with loading of parquetFiles to the DB tables.
|
||||
String endingMkDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.mkDirsAndParams;
|
||||
if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingMkDirAndParams)
|
||||
|| (hasFulltexts && !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingMkDirAndParams)) )
|
||||
if ( !parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingMkDirAndParams)
|
||||
|| (hasFulltexts && !parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingMkDirAndParams)) )
|
||||
{
|
||||
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating the HDFS sub-directories for assignments_" + curReportAssignmentsCounter);
|
||||
return false;
|
||||
|
@ -390,8 +390,8 @@ public class UrlsServiceImpl implements UrlsService {
|
|||
fileUtils.deleteDirectory(new File(localParquetPath));
|
||||
// Delete the HDFS subDirs for this Report.
|
||||
String endingRmDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.rmDirsAndParams;
|
||||
if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingRmDirAndParams)
|
||||
|| (hasFulltexts && !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingRmDirAndParams)) )
|
||||
if ( !parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingRmDirAndParams)
|
||||
|| (hasFulltexts && !parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingRmDirAndParams)) )
|
||||
{
|
||||
logger.error("Error when deleting the HDFS sub-directories for assignments_" + curReportAssignmentsCounter); // A directory-specific log has already appeared.
|
||||
// The failure to delete the assignments_subDirs is not that of a problem and should not erase the whole process. So all goes as planned (the worker deletes any remaining files).
|
||||
|
@ -477,7 +477,7 @@ public class UrlsServiceImpl implements UrlsService {
|
|||
DatabaseConnector.databaseLock.lock();
|
||||
|
||||
if ( ! hasAttemptParquetFileProblem ) {
|
||||
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
|
||||
mergeErrorMsg = parquetFileUtils.mergeParquetFilesOfTable("attempt", "", null);
|
||||
if ( mergeErrorMsg != null ) {
|
||||
DatabaseConnector.databaseLock.unlock();
|
||||
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
|
||||
|
@ -486,7 +486,7 @@ public class UrlsServiceImpl implements UrlsService {
|
|||
}
|
||||
|
||||
if ( hasFulltexts && ! hasPayloadParquetFileProblem ) {
|
||||
mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null);
|
||||
mergeErrorMsg = parquetFileUtils.mergeParquetFilesOfTable("payload_aggregated", "", null);
|
||||
if ( mergeErrorMsg != null ) {
|
||||
DatabaseConnector.databaseLock.unlock();
|
||||
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
|
||||
|
@ -506,7 +506,7 @@ public class UrlsServiceImpl implements UrlsService {
|
|||
// As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
|
||||
// Only the rows referring to OTHER "givenAssignmentsBatchCounter" get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
|
||||
// We don't need to keep the assignment-info forever, as the "findAssignmentsQuery" checks the "payload" table for previously handled tasks.
|
||||
return fileUtils.mergeParquetFiles("assignment", " WHERE assignments_batch_counter != ", givenAssignmentsBatchCounter);
|
||||
return parquetFileUtils.mergeParquetFilesOfTable("assignment", " WHERE assignments_batch_counter != ", givenAssignmentsBatchCounter);
|
||||
}
|
||||
|
||||
|
||||
|
@ -516,7 +516,7 @@ public class UrlsServiceImpl implements UrlsService {
|
|||
// As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
|
||||
// Only the rows referring to NEWER than "givenDate" get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
|
||||
// We don't need to keep the assignment-info forever, as the "findAssignmentsQuery" checks the "payload" table for previously handled tasks.
|
||||
return fileUtils.mergeParquetFiles("assignment", " WHERE `date` >= ", givenDate);
|
||||
return parquetFileUtils.mergeParquetFilesOfTable("assignment", " WHERE `date` >= ", givenDate);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -80,112 +80,6 @@ public class FileUtils {
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* In each insertion, a new parquet-file is created, so we end up with millions of files. Parquet is great for fast-select, so have to stick with it and merge those files..
|
||||
* This method, creates a clone of the original table in order to have only one parquet file in the end. Drops the original table.
|
||||
* Renames the clone to the original's name.
|
||||
* Returns the errorMsg, if an error appears, otherwise is returns "null".
|
||||
*/
|
||||
public String mergeParquetFiles(String tableName, String whereClause, Object parameter) {
|
||||
String errorMsg;
|
||||
if ( (tableName == null) || tableName.isEmpty() ) {
|
||||
errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!";
|
||||
logger.error(errorMsg);
|
||||
return errorMsg; // Return the error-msg to indicate that something went wrong and pass it down to the Worker.
|
||||
}
|
||||
|
||||
// Make sure the following are empty strings.
|
||||
whereClause = (whereClause != null) ? (whereClause + " ") : "";
|
||||
|
||||
if ( parameter == null )
|
||||
parameter = "";
|
||||
else if ( parameter instanceof String )
|
||||
parameter = "'" + parameter + "'"; // This will be a "string-check", thus the single-quotes.
|
||||
// Else it is a "long", it will be used as is.
|
||||
|
||||
// Create a temp-table as a copy of the initial table.
|
||||
try {
|
||||
jdbcTemplate.execute("CREATE TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + DatabaseConnector.databaseName + "." + tableName + " " + whereClause + parameter);
|
||||
} catch (Exception e) {
|
||||
errorMsg = "Problem when copying the contents of \"" + tableName + "\" table to a newly created \"" + tableName + "_tmp\" table, when merging the parquet-files!\n";
|
||||
logger.error(errorMsg, e);
|
||||
try { // Make sure we delete the possibly half-created temp-table.
|
||||
jdbcTemplate.execute("DROP TABLE IF EXISTS " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE");
|
||||
// We cannot move on with merging, but no harm happened, since the "table_tmp" name is still reserved for future use (after it was dropped immediately)..
|
||||
} catch (Exception e1) {
|
||||
logger.error("Failed to drop the \"" + tableName + "_tmp\" table!", e1);
|
||||
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
|
||||
}
|
||||
return errorMsg; // We return only the initial error to the Worker, which is easily distinguished indie the "merge-queries".
|
||||
}
|
||||
|
||||
// Drop the initial table.
|
||||
try {
|
||||
jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + " PURGE");
|
||||
} catch (Exception e) {
|
||||
errorMsg = "Problem when dropping the initial \"" + tableName + "\" table, when merging the parquet-files!\n";
|
||||
logger.error(errorMsg, e);
|
||||
// The original table could not be dropped, so the temp-table cannot be renamed to the original..!
|
||||
try { // Make sure we delete the already created temp-table, in order to be able to use it in the future. The merging has failed nevertheless.
|
||||
jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE");
|
||||
} catch (Exception e1) {
|
||||
logger.error((errorMsg += "Failed to drop the \"" + tableName + "_tmp\" table!"), e1); // Add this error to the original, both are very important.
|
||||
}
|
||||
// Here, the original table is created.
|
||||
return errorMsg;
|
||||
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
|
||||
}
|
||||
|
||||
// Rename the temp-table to have the initial-table's name.
|
||||
try {
|
||||
jdbcTemplate.execute("ALTER TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp RENAME TO " + DatabaseConnector.databaseName + "." + tableName);
|
||||
} catch (Exception e) {
|
||||
errorMsg = "Problem in renaming the \"" + tableName + "_tmp\" table to \"" + tableName + "\", when merging the parquet-files!\n";
|
||||
logger.error(errorMsg, e);
|
||||
// At this point we only have a "temp-table", the original is already deleted..
|
||||
// Try to create the original, as a copy of the temp-table. If that succeeds, then try to delete the temp-table.
|
||||
try {
|
||||
jdbcTemplate.execute("CREATE TABLE " + DatabaseConnector.databaseName + "." + tableName + " stored as parquet AS SELECT * FROM " + DatabaseConnector.databaseName + "." + tableName + "_tmp");
|
||||
} catch (Exception e1) {
|
||||
errorMsg = "Problem when copying the contents of \"" + tableName + "_tmp\" table to a newly created \"" + tableName + "\" table, when merging the parquet-files!\n";
|
||||
logger.error(errorMsg, e1);
|
||||
// If the original table was not created, then we have to intervene manually, if it was created but without any data, then we can safely move on handling other assignments and workerReports, but the data will be lost! So this workerReport failed to be handled.
|
||||
try { // The below query normally returns a list, as it takes a "regex-pattern" as an input. BUT, we give just the table name, without wildcards. So the result is either the tableName itself or none (not any other table).
|
||||
jdbcTemplate.queryForObject("SHOW TABLES IN " + DatabaseConnector.databaseName + " LIKE '" + tableName + "'", List.class);
|
||||
} catch (EmptyResultDataAccessException erdae) {
|
||||
// The table does not exist, so it was not even half-created by the previous query.
|
||||
// Not having the original table anymore is a serious error. A manual action is needed!
|
||||
logger.error((errorMsg += "The original table \"" + tableName + "\" must be created manually! Serious problems may appear otherwise!"));
|
||||
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and fix it immediately to avoid other errors in the Service)..!
|
||||
}
|
||||
// Here, the original-table exists in the DB, BUT without any data inside! This workerReport failed to be handled! (some of its data could not be loaded to the database, and all previous data was lost).
|
||||
return errorMsg;
|
||||
}
|
||||
|
||||
// The creation of the original table was successful. Try to delete the temp-table.
|
||||
try {
|
||||
jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE");
|
||||
} catch (Exception e2) {
|
||||
logger.error((errorMsg += "Problem when dropping the \"" + tableName + "_tmp\" table, when merging the parquet-files!\n"), e2);
|
||||
// Manual deletion should be performed!
|
||||
return errorMsg; // Return both errors here, as the second is so important that if it did not happen then we could move on with this workerReport.
|
||||
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
|
||||
}
|
||||
// Here the original table exists and the temp-table is deleted. We eventually have the same state as if the "ALTER TABLE" succeeded.
|
||||
}
|
||||
|
||||
// Gather information to be used for queries-optimization.
|
||||
try {
|
||||
jdbcTemplate.execute("COMPUTE STATS " + DatabaseConnector.databaseName + "." + tableName);
|
||||
} catch (Exception e) {
|
||||
logger.error("Problem when gathering information from table \"" + tableName + "\" to be used for queries-optimization.", e);
|
||||
// In this case the error is not so important to the whole operation.. It's only that the performance of this specific table will be less optimal, only temporarily, unless every "COMPUTE STATS" query fails for future workerReports too.
|
||||
}
|
||||
|
||||
return null; // No errorMsg, everything is fine.
|
||||
}
|
||||
|
||||
|
||||
public static final DecimalFormat df = new DecimalFormat("0.00");
|
||||
|
||||
// The following regex might be useful in a future scenario. It extracts the "plain-filename" and "file-ID" and the "file-extension".
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
|
|||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
import org.springframework.dao.EmptyResultDataAccessException;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
@ -567,9 +568,9 @@ public class ParquetFileUtils {
|
|||
|
||||
if ( statusCode == 404 ) {
|
||||
logger.info(initErrMsg + "\"" + parquetBaseRemoteDirectory + "\" does not exist. We will create it, along with its sub-directories.");
|
||||
attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
|
||||
payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
|
||||
payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
|
||||
attemptCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
|
||||
payloadAggregatedCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
|
||||
payloadBulkImportCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
|
||||
return (attemptCreationSuccessful && payloadAggregatedCreationSuccessful && payloadBulkImportCreationSuccessful);
|
||||
// We need all directories to be created in order for the app to function properly!
|
||||
}
|
||||
|
@ -626,19 +627,19 @@ public class ParquetFileUtils {
|
|||
// For each missing subdirectories, run the mkDirs-request.
|
||||
if ( !foundAttemptsDir ) {
|
||||
logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathAttempts + "\" does not exist! Going to create it.");
|
||||
attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
|
||||
attemptCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
|
||||
} else
|
||||
logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathAttempts + "\" exists.");
|
||||
|
||||
if ( !foundPayloadsAggregatedDir ) {
|
||||
logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" does not exist! Going to create it.");
|
||||
payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
|
||||
payloadAggregatedCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
|
||||
} else
|
||||
logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" exists.");
|
||||
|
||||
if ( !foundPayloadsBulkImportDir ) {
|
||||
logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" does not exist! Going to create it.");
|
||||
payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
|
||||
payloadBulkImportCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
|
||||
} else
|
||||
logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" exists.");
|
||||
|
||||
|
@ -651,7 +652,7 @@ public class ParquetFileUtils {
|
|||
}
|
||||
|
||||
|
||||
public boolean applyHDFOperation(String hdfsOperationUrl)
|
||||
public boolean applyHDFSOperation(String hdfsOperationUrl)
|
||||
{
|
||||
try {
|
||||
URL url = new URL(hdfsOperationUrl);
|
||||
|
@ -727,4 +728,110 @@ public class ParquetFileUtils {
|
|||
return new SumParquetSuccess(hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, null);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* In each insertion, a new parquet-file is created, so we end up with millions of files. Parquet is great for fast-select, so have to stick with it and merge those files..
|
||||
* This method, creates a clone of the original table in order to have only one parquet file in the end. Drops the original table.
|
||||
* Renames the clone to the original's name.
|
||||
* Returns the errorMsg, if an error appears, otherwise is returns "null".
|
||||
*/
|
||||
public String mergeParquetFilesOfTable(String tableName, String whereClause, Object parameter) {
|
||||
String errorMsg;
|
||||
if ( (tableName == null) || tableName.isEmpty() ) {
|
||||
errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!";
|
||||
logger.error(errorMsg);
|
||||
return errorMsg; // Return the error-msg to indicate that something went wrong and pass it down to the Worker.
|
||||
}
|
||||
|
||||
// Make sure the following are empty strings.
|
||||
whereClause = (whereClause != null) ? (whereClause + " ") : "";
|
||||
|
||||
if ( parameter == null )
|
||||
parameter = "";
|
||||
else if ( parameter instanceof String )
|
||||
parameter = "'" + parameter + "'"; // This will be a "string-check", thus the single-quotes.
|
||||
// Else it is a "long", it will be used as is.
|
||||
|
||||
// Create a temp-table as a copy of the initial table.
|
||||
try {
|
||||
jdbcTemplate.execute("CREATE TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + DatabaseConnector.databaseName + "." + tableName + " " + whereClause + parameter);
|
||||
} catch (Exception e) {
|
||||
errorMsg = "Problem when copying the contents of \"" + tableName + "\" table to a newly created \"" + tableName + "_tmp\" table, when merging the parquet-files!\n";
|
||||
logger.error(errorMsg, e);
|
||||
try { // Make sure we delete the possibly half-created temp-table.
|
||||
jdbcTemplate.execute("DROP TABLE IF EXISTS " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE");
|
||||
// We cannot move on with merging, but no harm happened, since the "table_tmp" name is still reserved for future use (after it was dropped immediately)..
|
||||
} catch (Exception e1) {
|
||||
logger.error("Failed to drop the \"" + tableName + "_tmp\" table!", e1);
|
||||
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
|
||||
}
|
||||
return errorMsg; // We return only the initial error to the Worker, which is easily distinguished indie the "merge-queries".
|
||||
}
|
||||
|
||||
// Drop the initial table.
|
||||
try {
|
||||
jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + " PURGE");
|
||||
} catch (Exception e) {
|
||||
errorMsg = "Problem when dropping the initial \"" + tableName + "\" table, when merging the parquet-files!\n";
|
||||
logger.error(errorMsg, e);
|
||||
// The original table could not be dropped, so the temp-table cannot be renamed to the original..!
|
||||
try { // Make sure we delete the already created temp-table, in order to be able to use it in the future. The merging has failed nevertheless.
|
||||
jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE");
|
||||
} catch (Exception e1) {
|
||||
logger.error((errorMsg += "Failed to drop the \"" + tableName + "_tmp\" table!"), e1); // Add this error to the original, both are very important.
|
||||
}
|
||||
// Here, the original table is created.
|
||||
return errorMsg;
|
||||
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
|
||||
}
|
||||
|
||||
// Rename the temp-table to have the initial-table's name.
|
||||
try {
|
||||
jdbcTemplate.execute("ALTER TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp RENAME TO " + DatabaseConnector.databaseName + "." + tableName);
|
||||
} catch (Exception e) {
|
||||
errorMsg = "Problem in renaming the \"" + tableName + "_tmp\" table to \"" + tableName + "\", when merging the parquet-files!\n";
|
||||
logger.error(errorMsg, e);
|
||||
// At this point we only have a "temp-table", the original is already deleted..
|
||||
// Try to create the original, as a copy of the temp-table. If that succeeds, then try to delete the temp-table.
|
||||
try {
|
||||
jdbcTemplate.execute("CREATE TABLE " + DatabaseConnector.databaseName + "." + tableName + " stored as parquet AS SELECT * FROM " + DatabaseConnector.databaseName + "." + tableName + "_tmp");
|
||||
} catch (Exception e1) {
|
||||
errorMsg = "Problem when copying the contents of \"" + tableName + "_tmp\" table to a newly created \"" + tableName + "\" table, when merging the parquet-files!\n";
|
||||
logger.error(errorMsg, e1);
|
||||
// If the original table was not created, then we have to intervene manually, if it was created but without any data, then we can safely move on handling other assignments and workerReports, but the data will be lost! So this workerReport failed to be handled.
|
||||
try { // The below query normally returns a list, as it takes a "regex-pattern" as an input. BUT, we give just the table name, without wildcards. So the result is either the tableName itself or none (not any other table).
|
||||
jdbcTemplate.queryForObject("SHOW TABLES IN " + DatabaseConnector.databaseName + " LIKE '" + tableName + "'", List.class);
|
||||
} catch (EmptyResultDataAccessException erdae) {
|
||||
// The table does not exist, so it was not even half-created by the previous query.
|
||||
// Not having the original table anymore is a serious error. A manual action is needed!
|
||||
logger.error((errorMsg += "The original table \"" + tableName + "\" must be created manually! Serious problems may appear otherwise!"));
|
||||
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and fix it immediately to avoid other errors in the Service)..!
|
||||
}
|
||||
// Here, the original-table exists in the DB, BUT without any data inside! This workerReport failed to be handled! (some of its data could not be loaded to the database, and all previous data was lost).
|
||||
return errorMsg;
|
||||
}
|
||||
|
||||
// The creation of the original table was successful. Try to delete the temp-table.
|
||||
try {
|
||||
jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE");
|
||||
} catch (Exception e2) {
|
||||
logger.error((errorMsg += "Problem when dropping the \"" + tableName + "_tmp\" table, when merging the parquet-files!\n"), e2);
|
||||
// Manual deletion should be performed!
|
||||
return errorMsg; // Return both errors here, as the second is so important that if it did not happen then we could move on with this workerReport.
|
||||
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
|
||||
}
|
||||
// Here the original table exists and the temp-table is deleted. We eventually have the same state as if the "ALTER TABLE" succeeded.
|
||||
}
|
||||
|
||||
// Gather information to be used for queries-optimization.
|
||||
try {
|
||||
jdbcTemplate.execute("COMPUTE STATS " + DatabaseConnector.databaseName + "." + tableName);
|
||||
} catch (Exception e) {
|
||||
logger.error("Problem when gathering information from table \"" + tableName + "\" to be used for queries-optimization.", e);
|
||||
// In this case the error is not so important to the whole operation.. It's only that the performance of this specific table will be less optimal, only temporarily, unless every "COMPUTE STATS" query fails for future workerReports too.
|
||||
}
|
||||
|
||||
return null; // No errorMsg, everything is fine.
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue