- Handle the case where multiple threads may load the same HDFS directory into a database table, thus causing the "directory contains no visible files" SQLException (a sketch of the scenario follows the commit metadata below).

- Improve the delay values of some scheduledTasks.
- Improve elapsed time precision for the "lastAccessedOn" metadata of the workerReports.
- Code polishing.
Lampros Smyrnaios 2023-05-25 00:34:36 +03:00
parent 164245cb53
commit 2b50e08bf6
2 changed files with 30 additions and 15 deletions
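For context on the first bullet: Impala's "load data inpath" statement moves the files out of the source HDFS directory, so when two threads issue it for the same directory, the loser finds the directory already drained and the JDBC driver raises the "contains no visible files" SQLException. The following is a minimal, self-contained sketch of the tolerant handling this commit introduces; the directory and table names are hypothetical, and the real implementation is loadParquetDataIntoTable in the second diff below.

import java.sql.SQLException;
import org.springframework.jdbc.core.JdbcTemplate;

public class TolerantLoadSketch {
	// Returns true when the data was loaded, or when another thread already loaded it.
	static boolean loadDirIntoTable(JdbcTemplate jdbcTemplate, String hdfsDir, String table) {
		try {
			jdbcTemplate.execute("load data inpath '" + hdfsDir + "' into table " + table);
		} catch (Exception e) {
			// Spring wraps the driver's SQLException in an UncategorizedSQLException,
			// so the cause must be inspected, not the top-level exception.
			Throwable cause = e.getCause();
			if ( (cause instanceof SQLException) && (cause.getMessage() != null)
					&& cause.getMessage().contains("contains no visible files") ) {
				return true; // The directory was already drained by a concurrent load; not an error.
			}
			return false; // Any other failure is a real one.
		}
		return true;
	}
}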

ScheduledTasks.java

@@ -44,8 +44,8 @@ public class ScheduledTasks {
 	}
-	@Scheduled(initialDelay = 3_600_000, fixedDelay = 3_600_000) // Execute this method 1 hour from the start and 1 hour after each last execution, in order for some tasks to have been gathered.
-	//@Scheduled(initialDelay = 3_600_000, fixedDelay = 20_000) // Just for testing (every 20 secs).
+	@Scheduled(initialDelay = 1_800_000, fixedDelay = 1_800_000) // Execute this method 30 mins from the start and 30 mins after each last execution, in order for some tasks to have been gathered.
+	//@Scheduled(initialDelay = 20_000, fixedDelay = 20_000) // Just for testing (every 20 secs).
 	public void executeBackgroundTasks()
 	{
 		List<Callable<Boolean>> tempList = new ArrayList<>(FullTextsServiceImpl.backgroundCallableTasks); // Copy the list in order to know what was executed and delete only that data later.
@@ -97,15 +97,14 @@ public class ScheduledTasks {
 	}
-	private static final int daysToWaitBeforeDeletion = 7;
+	private static final double daysToWaitBeforeDeletion = 7.0;
-	@Scheduled(initialDelay = 120_000, fixedDelay = 604_800_000) // Run every 7 days.
-	//@Scheduled(initialDelay = 120_000, fixedDelay = 20_000) // Just for testing (every 20 secs).
+	@Scheduled(initialDelay = 604_800_000, fixedDelay = 604_800_000) // Run every 7 days.
+	//@Scheduled(initialDelay = 1_200_000, fixedDelay = 1_200_000) // Just for testing (every 1200 secs).
 	public void checkAndDeleteUnsuccessfulWorkerReports()
 	{
 		logger.debug("Going to check and remove any unsuccessful workerReports, which are more than 7 days old.");
-		int usableDirsNum = 0;
 		try {
 			File workerReportsDir = new File(workerReportsDirPath);
 			if ( !workerReportsDir.isDirectory() ) {
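For reference, the millisecond literals in the @Scheduled annotations above decode as 30 minutes, 7 days and (in the first hunk) 1 hour. A small sketch, not part of the commit, deriving them via java.util.concurrent.TimeUnit instead of hand-computed constants:

import java.util.concurrent.TimeUnit;

public class DelayValues {
	public static void main(String[] args) {
		System.out.println(TimeUnit.MINUTES.toMillis(30)); // 1800000   -> the new 30-min background-task cycle
		System.out.println(TimeUnit.HOURS.toMillis(1));    // 3600000   -> the previous 1-hour cycle
		System.out.println(TimeUnit.DAYS.toMillis(7));     // 604800000 -> the 7-day report-cleanup cycle
	}
}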
@@ -118,9 +117,7 @@ public class ScheduledTasks {
 				logger.error("There was an error when getting the subDirs of \"workerReportsDir\": " + workerReportsDir);
 				return;
 			}
-			usableDirsNum = workerReports.length;
-			if ( usableDirsNum == 0 ) {
+			else if ( workerReports.length == 0 ) {
 				logger.debug("The \"workerReportsDir\" is empty, so there is nothing to delete.");
 				return;
 			}
@@ -135,11 +132,11 @@ public class ScheduledTasks {
 				logger.trace("The workerReport \"" + workerReport.getName() + "\" was last accessed in: " + new Date(lastModified));
 				// Get the difference in days: /1000 to get seconds, /60 to get minutes, /60 to get hours and /24 to get days.
-				long elapsedWeeks = (currentTime - lastModified) / (1000 * 60 * 60 * 24);
-				if ( elapsedWeeks > daysToWaitBeforeDeletion ) {
+				double elapsedDays = (double) (currentTime - lastModified) / (1000 * 60 * 60 * 24);
+				if ( elapsedDays > daysToWaitBeforeDeletion ) {
 					// Enough time has passed, the directory should be deleted immediately.
 					String workerReportName = workerReport.getName();
-					logger.warn("The workerReport \"" + workerReportName + "\" was accessed " + elapsedWeeks + " days ago (passed the " + daysToWaitBeforeDeletion + " days limit) and will be deleted.");
+					logger.warn("The workerReport \"" + workerReportName + "\" was accessed " + elapsedDays + " days ago (passed the " + daysToWaitBeforeDeletion + " days limit) and will be deleted.");
 					fileUtils.deleteFile(workerReport.getAbsolutePath());
 				}
 			}
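The long-to-double switch in the hunk above matters because integer division truncates: a report last touched 7.9 days ago yields 7 under long division, and "7 > 7.0" is false, so its deletion would be postponed by up to a full extra day (the old variable was also misnamed elapsedWeeks). A small sketch of the difference, using the same 7.0-day threshold:

public class ElapsedPrecision {
	private static final double daysToWaitBeforeDeletion = 7.0;
	private static final long MILLIS_PER_DAY = 1000L * 60 * 60 * 24;

	public static void main(String[] args) {
		long elapsedMillis = (long) (7.9 * MILLIS_PER_DAY); // a report last modified 7.9 days ago

		long truncatedDays = elapsedMillis / MILLIS_PER_DAY;        // 7   (the fraction is lost)
		double exactDays = (double) elapsedMillis / MILLIS_PER_DAY; // 7.9

		System.out.println(truncatedDays > daysToWaitBeforeDeletion); // false -> deletion postponed
		System.out.println(exactDays > daysToWaitBeforeDeletion);     // true  -> deleted on schedule
	}
}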

ParquetFileUtils.java

@@ -38,6 +38,7 @@ import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Base64;
@@ -452,17 +453,34 @@ public class ParquetFileUtils {
 	}
-	public boolean loadParquetDataIntoTable(String remoteParquetDataDirectory, String tableName)
+	public boolean loadParquetDataIntoTable(String remoteParquetDataLocation, String tableName)
 	{
 		// Import the data from the parquet file into the database's table.
-		String loadParquetInTableQuery = "load data inpath '" + remoteParquetDataDirectory + "' into table " + ImpalaConnector.databaseName + "." + tableName;
+		String loadParquetInTableQuery = "load data inpath '" + remoteParquetDataLocation + "' into table " + ImpalaConnector.databaseName + "." + tableName;
 		try {
 			jdbcTemplate.execute(loadParquetInTableQuery);
 		} catch (Exception e) {
+			/*
+			// TODO - We will make a new sub-dir for each assignments-counter, which will be deleted afterwards.
+			// These subDirs will replace the "worker-subDirs", since we will no longer need them.
+			// In case a subDir fails to be deleted, we do not mark the whole process as failed; instead, we just log the issue and move on.
+			// It is no big deal to have a few "leftover" subDirs.
+			// Upon a Service restart (where the counter resets), these leftover dirs will be used as if they were new (any leftover parquet files will be overwritten by the new ones of the same name).
+			*/
+			Throwable cause = e.getCause();
+			if ( cause instanceof SQLException ) { // In this case the "parent" exception is: "org.springframework.jdbc.UncategorizedSQLException".
+				String errorMsg = cause.getMessage();
+				if ( (errorMsg != null) && errorMsg.contains("contains no visible files") ) {
+					logger.warn("The \"remoteParquetDataLocation\": \"" + remoteParquetDataLocation + "\" was found empty when trying to load its content into the \"" + tableName + "\" table. Most likely, another thread loaded all content before this one got a chance. Continuing as normal.");
+					return true;
+				}
+			}
 			ImpalaConnector.handleQueryException("loadParquetInTableQuery", loadParquetInTableQuery, e); // It's already logged.
 			return false;
 		}
-		//logger.trace("The data from \"" + remoteParquetDataDirectory + "\" was loaded into the " + tableName + " table."); // DEBUG!
+		//logger.trace("The data from \"" + remoteParquetDataLocation + "\" was loaded into the " + tableName + " table."); // DEBUG!
 		return true;
 	}
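A hypothetical usage example of the new behavior (the directory, the "payload" table name and the parquetFileUtils instance are made up for illustration): two workers flushing the same HDFS directory now both report success, whereas previously the thread that found the directory already drained returned false and failed its whole batch.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentLoadExample {
	static void loadSameDirTwice(ParquetFileUtils parquetFileUtils) throws Exception {
		ExecutorService pool = Executors.newFixedThreadPool(2);
		Future<Boolean> first = pool.submit(() ->
				parquetFileUtils.loadParquetDataIntoTable("/tmp/parquet_uploads/batch_42", "payload"));
		Future<Boolean> second = pool.submit(() ->
				parquetFileUtils.loadParquetDataIntoTable("/tmp/parquet_uploads/batch_42", "payload"));
		System.out.println(first.get() && second.get()); // with this commit, expected: true
		pool.shutdown();
	}
}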