From 780ed15ce224b83aa57145cd5892d172fe49350a Mon Sep 17 00:00:00 2001
From: LSmyrnaios
Date: Tue, 30 Nov 2021 13:26:19 +0200
Subject: [PATCH] - Fix a "databaseLock" bug, which could cause both the
 payload and attempt inserts and the "mergeParquetFiles" to fail, as the
 inserts could be executed concurrently with tables-compaction.
 - Fix the "null" representation of an "unknown" payload-size in the database.
 - Remove the obsolete thread-locking for the "CreateDatabase" operation. This
  code is guaranteed to run BEFORE any other operation in the database.
 - Implement the "handlePreparedStatementException" and "closeConnection"
  methods.
 - Improve error-logs.
 - Update dependencies.
 - Code cleanup.

---
 build.gradle                                  |  2 +-
 gradle/wrapper/gradle-wrapper.properties      |  2 +-
 installAndRun.sh                              |  2 +-
 .../configuration/ImpalaConnector.java        | 83 +++++++++++--------
 .../controllers/UrlController.java            | 54 +++++-------
 ...Response.java => AssignmentsResponse.java} | 22 ++---
 6 files changed, 85 insertions(+), 80 deletions(-)
 rename src/main/java/eu/openaire/urls_controller/payloads/responces/{AssignmentResponse.java => AssignmentsResponse.java} (57%)

diff --git a/build.gradle b/build.gradle
index 93b3729..5757916 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,5 +1,5 @@
 plugins {
-    id 'org.springframework.boot' version '2.5.6'
+    id 'org.springframework.boot' version '2.6.1'
     id 'io.spring.dependency-management' version '1.0.11.RELEASE'
     id 'java'
 }
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index ffed3a2..e750102 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.3-bin.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
diff --git a/installAndRun.sh b/installAndRun.sh
index ff6545a..a3bb3e3 100755
--- a/installAndRun.sh
+++ b/installAndRun.sh
@@ -8,7 +8,7 @@ elif [[ $# -gt 1 ]]; then
     echo -e "Wrong number of arguments given: ${#}\nPlease execute it like: script.sh "; exit 1
 fi
 
-gradleVersion="7.2"
+gradleVersion="7.3"
 
 if [[ justInstall -eq 0 ]]; then
diff --git a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java
index 265e0e5..eff4a6e 100644
--- a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java
+++ b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java
@@ -41,22 +41,6 @@ public final class ImpalaConnector {
     }
 
-    public HikariDataSource impalaDS() throws SQLException, PropertyVetoException
-    {
-        HikariConfig hikariConfig = new HikariConfig();
-        hikariConfig.setDriverClassName(ImpalaConnector.impalaDriver);
-        hikariConfig.setAutoCommit(true);
-        hikariConfig.setJdbcUrl(ImpalaConnector.impalaConnectionUrl);
-        hikariConfig.setPoolName(poolName);
-        hikariConfig.setMaximumPoolSize(hikariMaxConnectionPoolSize);
-        hikariConfig.setMaxLifetime(hikariMaxLifetime);
-        hikariConfig.setMinimumIdle(hikariMinIdleConnections);
-        hikariConfig.setConnectionTimeout(hikariConnectionTimeOut);
-        hikariConfig.setIdleTimeout(hikariIdleTimeOut);
-        return new HikariDataSource(hikariConfig);
-    }
-
-
     public ImpalaConnector()
     {
         logger.info("Max available memory to the Controller: " + Runtime.getRuntime().maxMemory() + " bytes.");
@@ -81,36 +65,49 @@ public final class ImpalaConnector {
             }
             else
                 throw new RuntimeException("The \"impalaDriver\" was null or empty!");
         } catch(Exception e) {
-            logger.error("Error when loading the database properties!\n" + e.getMessage());
-            e.printStackTrace();
+            String errorMsg = "Error when loading the database properties!\n" + e.getMessage();
+            logger.error(errorMsg, e);
+            System.err.println(errorMsg);
             System.exit(11);
         }
 
         try {
             hikariDataSource = impalaDS();
         } catch (SQLException | PropertyVetoException e) {
-            logger.error("Problem when creating the Hikari connection pool!");
-            e.printStackTrace();
+            logger.error("Problem when creating the Hikari connection pool!", e);
         }
 
         createDatabase();
     }
 
+    public HikariDataSource impalaDS() throws SQLException, PropertyVetoException
+    {
+        HikariConfig hikariConfig = new HikariConfig();
+        hikariConfig.setDriverClassName(ImpalaConnector.impalaDriver);
+        hikariConfig.setAutoCommit(true);
+        hikariConfig.setJdbcUrl(ImpalaConnector.impalaConnectionUrl);
+        hikariConfig.setPoolName(poolName);
+        hikariConfig.setMaximumPoolSize(hikariMaxConnectionPoolSize);
+        hikariConfig.setMaxLifetime(hikariMaxLifetime);
+        hikariConfig.setMinimumIdle(hikariMinIdleConnections);
+        hikariConfig.setConnectionTimeout(hikariConnectionTimeOut);
+        hikariConfig.setIdleTimeout(hikariIdleTimeOut);
+        return new HikariDataSource(hikariConfig);
+    }
+
+
     public void createDatabase()
     {
-        databaseLock.lock(); // Make sure the database and tables are created before the workers can request assignments.
         Connection con = getConnection();
-        if ( con == null ) {
-            databaseLock.unlock();
+        if ( con == null )
             System.exit(22);
-        }
 
         try {
             if ( !con.getMetaData().supportsBatchUpdates() )
                 logger.warn("The database does not support \"BatchUpdates\"!");
         } catch (SQLException e) {
-            e.printStackTrace();
+            logger.error(e.getMessage(), e);
         }
 
         logger.info("Going to create the database and the tables, if they do not exist. Also will fill some tables with data from OpenAIRE.");
@@ -119,12 +116,7 @@ public final class ImpalaConnector {
             statement = con.createStatement();
         } catch (SQLException sqle) {
             logger.error("Problem when creating a connection-statement!\n" + sqle.getMessage());
-            try {
-                con.close();
-            } catch (SQLException sqle2) {
-                logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage());
-            }
-            databaseLock.unlock();
+            ImpalaConnector.closeConnection(con);
             System.exit(33);
         }
 
@@ -152,11 +144,11 @@ public final class ImpalaConnector {
             statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload (id string, original_url string, actual_url string, `date` timestamp, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
             statement.execute("COMPUTE STATS " + databaseName + ".payload");
         } catch (SQLException sqle) {
-            logger.error("Problem when executing the \"create database and create tables queries!\n" + sqle.getMessage() + "\nSQL state: " + sqle.getSQLState() + "\nError code: " + sqle.getErrorCode());
-            sqle.printStackTrace();
+            String errorMsg = "Problem when executing the \"create database and create tables queries!\n" + sqle.getMessage() + "\nSQL state: " + sqle.getSQLState() + "\nError code: " + sqle.getErrorCode();
+            logger.error(errorMsg, sqle);
+            System.err.println(errorMsg);
             System.exit(44);
         } finally {
-            databaseLock.unlock();
             try {
                 statement.close();
                 con.close();
@@ -222,7 +214,7 @@ public final class ImpalaConnector {
 
             //logger.debug(responseEntity.toString());
         } catch (SQLException sqle) {
-            sqle.printStackTrace();
+            logger.error(sqle.getMessage(), sqle);
             return false;
         } finally {
             try {
@@ -236,4 +228,25 @@ public final class ImpalaConnector {
 
         return true;
     }
+
+    public static boolean closeConnection(Connection con) {
+        try {
+            if ( con != null )
+                con.close(); // It may have already closed and that's fine.
+            return true;
+        } catch (SQLException sqle) {
+            logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage());
+            return false;
+        }
+    }
+
+
+    public static String handlePreparedStatementException(String queryName, String query, String preparedStatementName, PreparedStatement preparedStatement, Connection con, Exception e)
+    {
+        String errorMsg = "Problem when creating " + (( ! queryName.startsWith("get")) ? "and executing " : "") + "the prepared statement for \"" + queryName + "\"!\n";
"and executing " : "") + "the prepared statement for \"" + queryName + "\"!\n"; + logger.error(errorMsg + "\n\n" + query + "\n\n" + e.getMessage(), e); + closeConnection(con); + return errorMsg; + } + } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java index 4e3803c..2e02835 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java @@ -5,10 +5,8 @@ import eu.openaire.urls_controller.configuration.ImpalaConnector; import eu.openaire.urls_controller.models.Error; import eu.openaire.urls_controller.models.*; import eu.openaire.urls_controller.payloads.requests.WorkerReport; -import eu.openaire.urls_controller.payloads.responces.AssignmentResponse; -import eu.openaire.urls_controller.util.ControllerConstants; -import eu.openaire.urls_controller.util.FileUtils; -import eu.openaire.urls_controller.util.GenericUtils; +import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse; +import eu.openaire.urls_controller.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; @@ -137,8 +135,7 @@ public class UrlController { datasource.setId(resultSet.getString(3)); datasource.setName(resultSet.getString(4)); } catch (SQLException sqle) { - logger.error("No value was able to be retrieved from one of the columns of row_" + resultSet.getRow()); - sqle.printStackTrace(); + logger.error("No value was able to be retrieved from one of the columns of row_" + resultSet.getRow(), sqle); continue; // This object is broken, move to the next row. } assignment.setDatasource(datasource); @@ -262,8 +259,8 @@ public class UrlController { closePreparedStatements(preparedInsertAssignmentStatement, null, con); } - logger.info("Sending batch_" + assignmentsBatchCounter.incrementAndGet() + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + "."); - return ResponseEntity.status(HttpStatus.OK).body(new AssignmentResponse(assignmentsBatchCounter.get(), assignments)); + logger.info("Sending batch-assignments_" + assignmentsBatchCounter.incrementAndGet() + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + "."); + return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(assignmentsBatchCounter.get(), assignments)); } @@ -286,9 +283,7 @@ public class UrlController { logger.info("Received the WorkerReport for batch_ " + workerReport.getAssignmentRequestCounter() + ", from the worker with id: " + workerReport.getWorkerId() + ". It contains " + urlReports.size() + " urlReports. Going to insert them into the database."); - // TODO - The Controller will have to request the files from the Worker, in order to upload them to the S3, in the future. - // We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database. - + ImpalaConnector.databaseLock.lock(); Connection con = ImpalaConnector.getInstance().getConnection(); if ( con == null ) @@ -313,7 +308,7 @@ public class UrlController { } try { - con.setAutoCommit(false); + con.setAutoCommit(false); // Avoid writing to disk for each insert. Write them all in the end. 
         } catch (SQLException sqle) {
             String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!";
             logger.error(errorMsg + "\n" + sqle.getMessage());
@@ -327,24 +322,26 @@ public class UrlController {
         for ( UrlReport urlReport : urlReports ) {
             Payload payload = urlReport.getPayload();
             if ( payload == null ) {
-                logger.error("Payload was \"null\" for a \"urlReport\"!");
-                // TODO - A bit rare to happen.. but what should we do? (otherwise an NPE will be thrown later)
-                // We can't just create an empty object and proceed, since the payload is as important as the whole row to be inserted..
-                // TODO - Just add it in an errorMessage to be returned in the end. Should it return HTTP-200 but with a small error message along?
+                logger.error("Payload was \"null\" for a \"urlReport\", in assignments_" + assignmentsBatchCounter);
                 payloadErrorMsg = (++failedCount) + " urlReports failed to be processed because they had no payload!";
                 continue;
             }
 
             String tempFullQueryString = null;
-
             try { // We use a "PreparedStatement" to do insertions, for security reasons.
                 preparedInsertPayloadStatement.setString(1, payload.getId());
                 preparedInsertPayloadStatement.setString(2, payload.getOriginal_url());
                 preparedInsertPayloadStatement.setString(3, payload.getActual_url());
                 preparedInsertPayloadStatement.setDate(4, payload.getDate_acquired());
                 preparedInsertPayloadStatement.setString(5, payload.getMime_type());
+
                 // The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers.
-                preparedInsertPayloadStatement.setString(6, String.valueOf(payload.getSize()));
+                String stringSize = null;
+                Long size = payload.getSize();
+                if ( size != null )
+                    stringSize = String.valueOf(size);
+
+                preparedInsertPayloadStatement.setString(6, stringSize);
                 preparedInsertPayloadStatement.setString(7, payload.getHash());
                 preparedInsertPayloadStatement.setString(8, payload.getLocation());
                 preparedInsertPayloadStatement.setString(9, payload.getProvenance());
@@ -374,13 +371,12 @@ public class UrlController {
             }
         }//end for-loop
 
-        ImpalaConnector.databaseLock.lock();
         try {
-            con.commit(); // Send all the insert-queries to the database.
+            con.commit(); // Commit all the insert-queries to the database (write them to disk).
         } catch (SQLException sqle) {
+            ImpalaConnector.databaseLock.unlock();
             String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!";
             logger.error(errorMsg + "\n" + sqle.getMessage());
-            ImpalaConnector.databaseLock.unlock();
             return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
         } finally {
             closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, null); // Do not close the connection here!
@@ -391,21 +387,19 @@ public class UrlController {
         String mergeErrorMsg = mergeParquetFiles("payload", con);
         if ( mergeErrorMsg != null ) {
             ImpalaConnector.databaseLock.unlock();
-            try { con.close(); }
-            catch (SQLException sqle) { logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage()); }
+            ImpalaConnector.closeConnection(con);
             return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
         }
 
         mergeErrorMsg = mergeParquetFiles("attempt", con);
         if ( mergeErrorMsg != null ) {
             ImpalaConnector.databaseLock.unlock();
-            try { con.close(); }
-            catch (SQLException sqle) { logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage()); }
+            ImpalaConnector.closeConnection(con);
             return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
         }
 
         try {
-            con.commit(); // Apply the merges.
+            con.commit(); // Apply the merges permanently (write them to disk).
             con.setAutoCommit(true); // Restore the "auto-commit" value for this connection of the pool.
         } catch (SQLException sqle) {
             String errorMsg = "Problem when committing changes to the database!";
@@ -414,8 +408,7 @@ public class UrlController {
             return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
         } finally {
             ImpalaConnector.databaseLock.unlock();
-            try { con.close(); }
-            catch (SQLException sqle) { logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage()); }
+            ImpalaConnector.closeConnection(con);
         }
 
         return ResponseEntity.status(HttpStatus.OK).body(payloadErrorMsg);
@@ -516,8 +509,7 @@ public class UrlController {
             if ( con != null )
                 con.close(); // It may have already closed and that's fine.
             return true;
-        }
-        catch (SQLException sqle) {
+        } catch (SQLException sqle) {
             logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage());
             return false;
         }
@@ -576,7 +568,7 @@ public class UrlController {
 
         logger.info("Sending batch_" + assignmentsBatchCounter.incrementAndGet() + " with " + assignments.size() + " assignments (" + FileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId);
 
-        return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentResponse(assignmentsBatchCounter.get(), assignments));
+        return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(assignmentsBatchCounter.get(), assignments));
     }
 
 }
diff --git a/src/main/java/eu/openaire/urls_controller/payloads/responces/AssignmentResponse.java b/src/main/java/eu/openaire/urls_controller/payloads/responces/AssignmentsResponse.java
similarity index 57%
rename from src/main/java/eu/openaire/urls_controller/payloads/responces/AssignmentResponse.java
rename to src/main/java/eu/openaire/urls_controller/payloads/responces/AssignmentsResponse.java
index e61babe..6401db1 100644
--- a/src/main/java/eu/openaire/urls_controller/payloads/responces/AssignmentResponse.java
+++ b/src/main/java/eu/openaire/urls_controller/payloads/responces/AssignmentsResponse.java
@@ -8,26 +8,26 @@ import java.util.List;
 
 
 @JsonInclude(JsonInclude.Include.NON_NULL)
-public class AssignmentResponse {
+public class AssignmentsResponse {
 
-    @JsonProperty("assignmentCounter")
-    private Long assignmentCounter;
+    @JsonProperty("assignmentsCounter")
+    private Long assignmentsCounter;
 
     @JsonProperty("assignments")
    private List<Assignment> assignments;
 
-    public AssignmentResponse(Long assignmentCounter, List<Assignment> assignments) {
-        this.assignmentCounter = assignmentCounter;
+    public AssignmentsResponse(Long assignmentCounter, List<Assignment> assignments) {
+        this.assignmentsCounter = assignmentCounter;
         this.assignments = assignments;
     }
 
-    public Long getAssignmentCounter() {
-        return assignmentCounter;
+    public Long getAssignmentsCounter() {
+        return assignmentsCounter;
     }
 
-    public void setAssignmentCounter(Long assignmentCounter) {
-        this.assignmentCounter = assignmentCounter;
+    public void setAssignmentsCounter(Long assignmentsCounter) {
+        this.assignmentsCounter = assignmentsCounter;
     }
 
     public List<Assignment> getAssignments() {
@@ -41,8 +41,8 @@ public class AssignmentResponse {
 
     @Override
     public String toString() {
-        return "AssignmentResponse{" +
-                "assignmentCounter=" + assignmentCounter +
+        return "AssignmentsResponse{" +
+                "assignmentsCounter=" + assignmentsCounter +
                 ", assignments=" + assignments +
                 '}';
     }
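
Illustrative note on the "databaseLock" fix (a sketch, not part of the patch): the first bullet of the commit message amounts to taking the shared table-lock before the batched inserts start and keeping it until the parquet-file compaction ("mergeParquetFiles") has finished, instead of locking only around the final commit. A minimal sketch of that pattern, assuming a hypothetical ReentrantLock named databaseLock and hypothetical insertUrlReports()/compactTables() helpers:

    import java.util.concurrent.locks.ReentrantLock;

    class DatabaseLockSketch {

        // Plays the role that ImpalaConnector.databaseLock plays in the real code.
        private static final ReentrantLock databaseLock = new ReentrantLock();

        void storeWorkerReport() {
            databaseLock.lock(); // Acquired BEFORE any insert is sent to the database.
            try {
                insertUrlReports(); // Batched inserts; auto-commit is off, a single commit at the end.
                compactTables();    // "mergeParquetFiles"-style compaction of the "payload" and "attempt" tables.
            } finally {
                databaseLock.unlock(); // Released on every path, success or failure.
            }
        }

        void insertUrlReports() { /* hypothetical placeholder */ }
        void compactTables() { /* hypothetical placeholder */ }
    }

With both the inserts and the compaction running under the same lock, a second request can no longer compact the tables while another batch of inserts is still in flight, which is the failure mode the commit message describes.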