- Fix Impala "broken pipe" error, by closing the connection when not in need. The connection is reopened later with minimal overhead, as a connection pool is used.

- Fix not closing the database-connection in case of a specific error (also in a commented error-case).
This commit is contained in:
Lampros Smyrnaios 2022-01-13 00:47:15 +02:00
parent 82bf11b9b3
commit 2cf25b0d26
2 changed files with 41 additions and 25 deletions

View File

@ -118,7 +118,7 @@ public class UrlController {
computeCurrentAssignmentsStatsPreparedStatement.execute(); computeCurrentAssignmentsStatsPreparedStatement.execute();
} catch (SQLException sqle) { } catch (SQLException sqle) {
String errorMsg = dropCurrentAssignmentTable(con); String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null ) if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, "computeCurrentAssignmentsStatsPreparedStatement", computeCurrentAssignmentsStatsPreparedStatement, con, sqle); errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, "computeCurrentAssignmentsStatsPreparedStatement", computeCurrentAssignmentsStatsPreparedStatement, con, sqle);
@ -137,7 +137,7 @@ public class UrlController {
getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery); getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery);
} catch (SQLException sqle) { } catch (SQLException sqle) {
String errorMsg = dropCurrentAssignmentTable(con); String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null ) if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
errorMsg = ImpalaConnector.handlePreparedStatementException("getAssignmentsQuery", getAssignmentsQuery, "getAssignmentsPreparedStatement", getAssignmentsPreparedStatement, con, sqle); errorMsg = ImpalaConnector.handlePreparedStatementException("getAssignmentsQuery", getAssignmentsQuery, "getAssignmentsPreparedStatement", getAssignmentsPreparedStatement, con, sqle);
@ -153,6 +153,7 @@ public class UrlController {
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId; String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId;
logger.error(errorMsg); logger.error(errorMsg);
ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg); return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
}*/ }*/
@ -177,7 +178,7 @@ public class UrlController {
} }
} catch (Exception e) { } catch (Exception e) {
String errorMsg = dropCurrentAssignmentTable(con); String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null ) if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n"; errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
@ -195,7 +196,7 @@ public class UrlController {
int assignmentsSize = assignments.size(); int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 ) { if ( assignmentsSize == 0 ) {
String errorMsg = dropCurrentAssignmentTable(con); String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null ) if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId; errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId;
@ -219,7 +220,7 @@ public class UrlController {
insertAssignmentsPreparedStatement.execute(); insertAssignmentsPreparedStatement.execute();
} catch (SQLException sqle) { } catch (SQLException sqle) {
String errorMsg = dropCurrentAssignmentTable(con); String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null ) if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, "insertAssignmentsPreparedStatement", insertAssignmentsPreparedStatement, con, sqle); errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, "insertAssignmentsPreparedStatement", insertAssignmentsPreparedStatement, con, sqle);
@ -234,7 +235,7 @@ public class UrlController {
} }
String errorMsg = dropCurrentAssignmentTable(con); String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null ) if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table."); logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
@ -288,23 +289,23 @@ public class UrlController {
long curReportAssignments = workerReport.getAssignmentRequestCounter(); long curReportAssignments = workerReport.getAssignmentRequestCounter();
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database."); logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
ImpalaConnector.databaseLock.lock();
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
}
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location". // Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
if ( ! FileUtils.getAndUploadFullTexts(urlReports, con, request, curReportAssignments, curWorkerId) ) { FileUtils.UploadFullTextsResponse uploadFullTextsResponse = FileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!");
}
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments); logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads. // The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports); FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
} }
// The "databaseLock" was unlocked inside the "FileUtils.getAndUploadFullTexts" to avoid blocking the database while doing large irrelevant tasks like transferring files.
ImpalaConnector.databaseLock.lock(); ImpalaConnector.databaseLock.lock();
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
}
// Store the workerReport into the database. // Store the workerReport into the database.
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, `date`, mimetype, size, `hash`, `location`, provenance) VALUES "; String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, `date`, mimetype, size, `hash`, `location`, provenance) VALUES ";
@ -390,6 +391,7 @@ public class UrlController {
con.commit(); // Commit all the insert-queries to the database (write them to disk). con.commit(); // Commit all the insert-queries to the database (write them to disk).
} catch (SQLException sqle) { } catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!"; String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!";
logger.error(errorMsg + "\n" + sqle.getMessage()); logger.error(errorMsg + "\n" + sqle.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);

View File

@ -106,25 +106,33 @@ public class FileUtils {
} }
public enum UploadFullTextsResponse {successful, unsuccessful, databaseError};
private static final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$"); private static final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$");
private static final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$"); private static final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$");
public static final String baseTargetLocation = System.getProperty("user.dir") + File.separator + "fullTexts" + File.separator; public static final String baseTargetLocation = System.getProperty("user.dir") + File.separator + "fullTexts" + File.separator;
private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames). private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
public static boolean getAndUploadFullTexts(List<UrlReport> urlReports, Connection con, HttpServletRequest request, long assignmentsBatchCounter, String workerId) public static UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId)
{ {
// The Controller have to request the files from the Worker, in order to upload them to the S3. // The Controller have to request the files from the Worker, in order to upload them to the S3.
// We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database. // We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
if ( request == null ) { if ( request == null ) {
logger.error("The \"HttpServletRequest\" is null!"); logger.error("The \"HttpServletRequest\" is null!");
ImpalaConnector.databaseLock.unlock(); return UploadFullTextsResponse.unsuccessful;
return false;
} }
String remoteAddr = request.getHeader("X-FORWARDED-FOR"); String remoteAddr = request.getHeader("X-FORWARDED-FOR");
if ( remoteAddr == null || "".equals(remoteAddr) ) if ( remoteAddr == null || "".equals(remoteAddr) )
remoteAddr = request.getRemoteAddr(); remoteAddr = request.getRemoteAddr();
ImpalaConnector.databaseLock.lock();
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null ) {
ImpalaConnector.databaseLock.unlock();
logger.error("Problem when creating the Impala-connection!");
return UploadFullTextsResponse.databaseError;
}
String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ?" ; String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ?" ;
PreparedStatement getFileLocationForHashPreparedStatement = null; PreparedStatement getFileLocationForHashPreparedStatement = null;
try { try {
@ -132,7 +140,7 @@ public class FileUtils {
} catch (SQLException sqle) { } catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
logger.error("Problem when creating the prepared statement for \"" + getFileLocationForHashQuery + "\"!\n" + sqle.getMessage()); logger.error("Problem when creating the prepared statement for \"" + getFileLocationForHashQuery + "\"!\n" + sqle.getMessage());
return false; return UploadFullTextsResponse.databaseError;
} }
// Get the file-locations. // Get the file-locations.
@ -177,6 +185,11 @@ public class FileUtils {
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n" + e.getMessage()); logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n" + e.getMessage());
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
// TODO - Since the database will have problems.. there is not point in trying to insert the payloads to Impala (handling it like we tried to insert and got an error).
// TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not autoclosed here)and the Database connection.
} }
} }
@ -205,6 +218,7 @@ public class FileUtils {
logger.error("Failed to close the \"getFileLocationForHashPreparedStatement\"!\n" + sqle.getMessage()); logger.error("Failed to close the \"getFileLocationForHashPreparedStatement\"!\n" + sqle.getMessage());
} finally { } finally {
ImpalaConnector.databaseLock.unlock(); // The rest work of this function does not use the database. ImpalaConnector.databaseLock.unlock(); // The rest work of this function does not use the database.
ImpalaConnector.closeConnection(con);
} }
logger.info("NumFullTextUrlsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextUrlsFound + " (out of " + urlReports.size() + ")."); logger.info("NumFullTextUrlsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextUrlsFound + " (out of " + urlReports.size() + ").");
@ -213,8 +227,8 @@ public class FileUtils {
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithIDsHashMap.keySet()); ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithIDsHashMap.keySet());
int numAllFullTexts = allFileNames.size(); int numAllFullTexts = allFileNames.size();
if ( numAllFullTexts == 0 ) { if ( numAllFullTexts == 0 ) {
logger.warn("The file retrieved by the Worker where < 0 > for assignments_" + assignmentsBatchCounter); logger.warn("The retrieved files where < 0 > for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
return true; // It was handled, no error. return UploadFullTextsResponse.successful; // It was handled, no error.
} }
// Request the full-texts in batches, compressed in zip. // Request the full-texts in batches, compressed in zip.
@ -322,10 +336,10 @@ public class FileUtils {
// Check if none of the batches were handled.. // Check if none of the batches were handled..
if ( failedBatches == numOfBatches ) { if ( failedBatches == numOfBatches ) {
logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId); logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
return false; return UploadFullTextsResponse.unsuccessful;
} else { } else {
replaceNotUploadedFileLocations(urlReports); // Make sure all records without an s3Url have null file-data. replaceNotUploadedFileLocations(urlReports); // Make sure all records without an s3Url have < null > file-data.
return true; return UploadFullTextsResponse.successful;
} }
} }