diff --git a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java index fb892f0..c0085e6 100644 --- a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java +++ b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java @@ -139,6 +139,8 @@ public final class ImpalaConnector { statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet"); statement.execute("COMPUTE STATS " + databaseName + ".assignment"); + statement.execute("DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE"); + statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet"); statement.execute("COMPUTE STATS " + databaseName + ".attempt"); diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java index d8a90dc..2b4f23b 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java @@ -82,7 +82,6 @@ public class UrlController { List assignments = new ArrayList<>(assignmentsLimit); ImpalaConnector.databaseLock.lock(); - Connection con = ImpalaConnector.getInstance().getConnection(); if ( con == null ) { // This is already logged in "getConnection()". ImpalaConnector.databaseLock.unlock(); @@ -358,12 +357,12 @@ public class UrlController { preparedInsertPayloadStatement.setString(5, payload.getMime_type()); // The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers. 
- String stringSize = null; + String sizeStr = null; Long size = payload.getSize(); if ( size != null ) - stringSize = String.valueOf(size); + sizeStr = String.valueOf(size); - preparedInsertPayloadStatement.setString(6, stringSize); + preparedInsertPayloadStatement.setString(6, sizeStr); preparedInsertPayloadStatement.setString(7, payload.getHash()); preparedInsertPayloadStatement.setString(8, payload.getLocation()); preparedInsertPayloadStatement.setString(9, payload.getProvenance()); @@ -383,7 +382,7 @@ public class UrlController { preparedInsertAttemptStatement.setString(2, payload.getOriginal_url()); preparedInsertAttemptStatement.setTimestamp(3, payload.getTimestamp_acquired()); preparedInsertAttemptStatement.setString(4, urlReport.getStatus().toString()); - preparedInsertAttemptStatement.setString(5, String.valueOf(error.getType())); // This covers the case of "null". + preparedInsertAttemptStatement.setString(5, String.valueOf(error.getType())); // This covers the case of "null" too. preparedInsertAttemptStatement.setString(6, error.getMessage()); preparedInsertAttemptStatement.executeUpdate(); } catch (SQLException sqle) { @@ -403,7 +402,10 @@ public class UrlController { closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, null); // Do not close the connection here, as we might move forward. } - logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables."); + if ( payloadErrorMsg != null ) + logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables, although " + payloadErrorMsg + " Going to merge the parquet files for those tables."); + else + logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. 
Going to merge the parquet files for those tables."); String mergeErrorMsg = FileUtils.mergeParquetFiles("payload", con, "", null); if ( mergeErrorMsg != null ) { @@ -442,6 +444,7 @@ public class UrlController { ImpalaConnector.closeConnection(con); } + logger.debug("Finished merging the database tables."); return ResponseEntity.status(HttpStatus.OK).body(payloadErrorMsg); } diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index 345718a..01f66d8 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -157,56 +157,55 @@ public class FileUtils { numFullTextUrlsFound ++; Payload payload = urlReport.getPayload(); - if ( payload != null ) - { - String fileLocation = null; + if ( payload == null ) + continue; - // Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH. - // If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker. - // If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker. - // Use the same prepared-statement for all requests, to improve speed (just like when inserting similar thing to the DB). - String fileHash = payload.getHash(); - if ( fileHash != null ) { - try { - getFileLocationForHashPreparedStatement.setString(1, fileHash); - } catch (SQLException sqle) { - logger.error("Error when setting the parameter in \"getFileLocationForHashQuery\"!\n" + sqle.getMessage()); - } + String fileLocation = null; - try ( ResultSet resultSet = getFileLocationForHashPreparedStatement.executeQuery() ) { - if ( resultSet.next() ) { // Move the "cursor" to the first row. If there is any data.. 
- fileLocation = resultSet.getString(1); - if ( fileLocation != null ) { // If the full-text of this record is already-found and uploaded. - payload.setLocation(fileLocation); // Set the location to the older identical file, which was uploaded to S3. - //logger.debug("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + fileLocation + "\"."); // DEBUG! - numFilesFoundFromPreviousAssignmentsBatches ++; - continue; - } + // Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH. + // If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker. + // If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker. + // Use the same prepared-statement for all requests, to improve speed (just like when inserting similar thing to the DB). + String fileHash = payload.getHash(); + if ( fileHash != null ) { + try { + getFileLocationForHashPreparedStatement.setString(1, fileHash); + } catch (SQLException sqle) { + logger.error("Error when setting the parameter in \"getFileLocationForHashQuery\"!\n" + sqle.getMessage()); + } + + try ( ResultSet resultSet = getFileLocationForHashPreparedStatement.executeQuery() ) { + if ( resultSet.next() ) { // Move the "cursor" to the first row. If there is any data, then take the first result (there should not be more, but we still want the first anyway). + fileLocation = resultSet.getString(1); + if ( fileLocation != null ) { // If the full-text of this record is already-found and uploaded. + payload.setLocation(fileLocation); // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical. 
+ //logger.debug("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + fileLocation + "\"."); // DEBUG! + numFilesFoundFromPreviousAssignmentsBatches ++; + continue; // Do not request the file from the worker, it's already uploaded. Move on. } - } catch (Exception e) { - logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n" + e.getMessage()); - - // TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database?? - // TODO - Since the database will have problems.. there is not point in trying to insert the payloads to Impala (handling it like we tried to insert and got an error). - // TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not autoclosed here)and the Database connection. - } + } catch (Exception e) { + logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n" + e.getMessage()); + + // TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database?? + // TODO - Since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error). + // TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not auto-closed here) and the Database connection. } + } - // If the full-text of this record was not found by a previous batch.. - fileLocation = payload.getLocation(); - if ( fileLocation != null ) { // If the docFile was downloaded (without an error)..
- Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation); - if ( !matcher.matches() ) { - continue; - } - String fileNameWithExtension = matcher.group(1); - if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) { - continue; - } - allFileNamesWithIDsHashMap.put(fileNameWithExtension, payload.getId()); // The keys and the values are not duplicate. Task with ID-1 might have an "ID-1.pdf" file. - // While a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1. + // If the full-text of this record was not found by a previous batch.. + fileLocation = payload.getLocation(); + if ( fileLocation != null ) { // If the docFile was downloaded (without an error).. + Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation); + if ( !matcher.matches() ) { + continue; } + String fileNameWithExtension = matcher.group(1); + if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) { + continue; + } + allFileNamesWithIDsHashMap.put(fileNameWithExtension, payload.getId()); // The keys and the values are not duplicate. Task with ID-1 might have an "ID-1.pdf" file. + // While a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1, thus, the ID-2 file was not downloaded again. } } @@ -299,7 +298,7 @@ public class FileUtils { if ( fileFullPath.equals(zipFileFullPath) ) // Exclude the zip-file from uploading. continue; - // Check if this stored file is related to one or more IDs from the Set. + // Check if this stored file is related to one or more IDs from the Set. Defend against malicious file injection. It does not add more overhead, since we already need the "fileRelatedIDs". Set fileRelatedIDs = allFileNamesWithIDsHashMap.get(fileName); if ( fileRelatedIDs.isEmpty() ) { // In case the "fileName" is not inside the "allFileNamesWithIDsHashMap" HashMultimap. 
logger.error("The stored file \"" + fileName + "\" is not related to any ID which had a file requested from the Worker!"); @@ -314,8 +313,8 @@ public class FileUtils { String s3Url = S3ObjectStoreMinIO.uploadToS3(fileName, fileFullPath); if ( s3Url != null ) { - setFullTextForMultipleIDs(payloadsHashMultimap, fileRelatedIDs, s3Url); // It checks weather (s3Url != null) and acts accordingly. - numUploadedFiles++; + setFullTextForMultipleIDs(fileRelatedIDs, payloadsHashMultimap, s3Url); // It checks whether (s3Url != null) and acts accordingly. + numUploadedFiles ++; } // Else, the record will have its file-data set to "null", in the end of this method. } @@ -556,12 +555,12 @@ public class FileUtils { /** - * Set the fileLocation for all those IDs related to the File. The IDs may have one or more payloads. - * @param payloadsHashMultimap + * Set the fileLocation for all those IDs related to the File. The IDs may have one or more payloads. * @param fileIDs + * @param payloadsHashMultimap * @param s3Url */ - public static void setFullTextForMultipleIDs(HashMultimap payloadsHashMultimap, Set fileIDs, String s3Url) + public static void setFullTextForMultipleIDs(Set fileIDs, HashMultimap payloadsHashMultimap, String s3Url) { for ( String id : fileIDs ) { Set payloads = payloadsHashMultimap.get(id);