From 8f9786de09ae9d8e5f06eb8b6dafdd91602938ec Mon Sep 17 00:00:00 2001
From: LSmyrnaios
Date: Wed, 13 Mar 2024 11:28:37 +0200
Subject: [PATCH] Upgrade the algorithm for finding the previously-found
 fulltexts, based on their md5hash:

- Use a single query with a list of the fileHashes, instead of thousands of
  single-md5hash-check queries (run at most 6 in parallel), which require a
  lot of I/O.
- Avoid checking the same fileHash multiple times, in case it is related to
  multiple payloads.
- In case of a database error, avoid losing that worker's full-texts
  entirely; instead, continue processing them.
---
 .../services/UrlsServiceImpl.java             |   6 +-
 .../urls_controller/util/FileUtils.java       | 201 +++++++++---------
 2 files changed, 106 insertions(+), 101 deletions(-)

diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
index 8ebdafd..362d682 100644
--- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
@@ -284,11 +284,7 @@ public class UrlsServiceImpl implements UrlsService {
 			String workerReportBaseName = this.workerReportsDirPath + File.separator + curWorkerId + File.separator + curWorkerId + "_assignments_" + curReportAssignmentsCounter + "_report";
 			renameAndGetWorkerReportFile(workerReportBaseName, new File(workerReportBaseName + ".json"), "No info was found for worker: " + curWorkerId);	// It may return null.
 			return false;
-		} else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
-			postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Problem with the Impala-database!");
-			return false;
-		}
-		else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
+		} else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
 			logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignmentsCounter);
 			// The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available.
 			fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false);
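Aside: the core of the FileUtils change below is replacing thousands of per-hash
lookups with one set-based query. A minimal, self-contained sketch of that pattern,
assuming a Spring JdbcTemplate; buildInListString() is a hypothetical stand-in for
the repo's own getQueryListString() helper, and the table/column names are copied
from the patch:

    import org.springframework.jdbc.core.JdbcTemplate;
    import java.util.*;

    public class HashLookupSketch {

        // Builds the ("HASH_1", "HASH_2", ...) pattern used in the IN-clause.
        static String buildInListString(Collection<String> hashes) {
            StringJoiner joiner = new StringJoiner(", ", "(", ")");
            for ( String hash : hashes )
                joiner.add("\"" + hash + "\"");
            return joiner.toString();
        }

        // One query for all hashes, instead of one query per hash.
        static Map<String, String> findExistingLocations(JdbcTemplate jdbcTemplate, Set<String> fileHashes, String databaseName) {
            Map<String, String> hashToLocation = new HashMap<>(fileHashes.size());
            String query = "select distinct `hash`, `location` from " + databaseName
                    + ".payload where `hash` in " + buildInListString(fileHashes);
            jdbcTemplate.query(query, rs -> {
                hashToLocation.put(rs.getString(1), rs.getString(2));   // hash -> location
            });
            return hashToLocation;
        }
    }

Concatenating the hashes into the SQL string avoids the per-row round-trips; since
the md5 hex strings are produced internally, there is no injection surface here,
though a very large IN-list may still need chunking, depending on the database's
statement-size limits.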
diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
index 24b455e..893977d 100644
--- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
+++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
@@ -1,7 +1,6 @@
 package eu.openaire.urls_controller.util;
 
 import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimaps;
 import com.google.common.collect.SetMultimap;
 import eu.openaire.urls_controller.configuration.DatabaseConnector;
 import eu.openaire.urls_controller.controllers.UrlsController;
@@ -29,17 +28,10 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.SQLException;
-import java.sql.Types;
 import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Callable;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Matcher;
@@ -66,7 +58,7 @@ public class FileUtils {
 
-	public enum UploadFullTextsResponse {successful, successful_without_fulltexts, unsuccessful, databaseError}
+	public enum UploadFullTextsResponse {successful, successful_without_fulltexts, unsuccessful}
 
 	public String baseFilesLocation;
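Aside: in the large hunk that follows, payloads are first grouped by their
file-hash in a Guava HashMultimap, so each distinct hash is checked against the
database exactly once, no matter how many payloads share it. A small sketch of
that grouping (Payload is reduced to a hypothetical record here; the real model
class lives in the project):

    import com.google.common.collect.HashMultimap;
    import java.util.Set;

    public class HashGroupingSketch {

        record Payload(String id, String hash) {}   // simplified stand-in

        public static void main(String[] args) {
            HashMultimap<String, Payload> hashesWithPayloads = HashMultimap.create();
            hashesWithPayloads.put("abc123", new Payload("id-1", "abc123"));
            hashesWithPayloads.put("abc123", new Payload("id-2", "abc123"));    // same file, different record
            hashesWithPayloads.put("ffe901", new Payload("id-3", "ffe901"));

            Set<String> fileHashes = hashesWithPayloads.keySet();   // 2 distinct hashes, not 3 payloads
            for ( String hash : fileHashes )
                System.out.println(hash + " -> " + hashesWithPayloads.get(hash));
        }
    }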
@@ -220,100 +212,117 @@ public class FileUtils {
 		workerIp = workerInfo.getWorkerIP();	// This won't be null.
 
 		// Get the file-locations.
-		final AtomicInteger numValidFullTextsFound = new AtomicInteger(0);
-		final AtomicInteger numFilesFoundFromPreviousAssignmentsBatches = new AtomicInteger(0);
-		final AtomicInteger numFullTextsWithProblematicLocations = new AtomicInteger(0);
+		int numValidFullTextsFound = 0;
+		int numFilesFoundFromPreviousAssignmentsBatches = 0;
+		int numFullTextsWithProblematicLocations = 0;
 
-		SetMultimap<String, Payload> allFileNamesWithPayloads = Multimaps.synchronizedSetMultimap(HashMultimap.create((sizeOfUrlReports / 5), 3));	// Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
+		HashMultimap<String, Payload> allFileNamesWithPayloads = HashMultimap.create((sizeOfUrlReports / 5), 3);	// Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
 
-		final String getFileLocationForHashQuery = "select `location` from " + DatabaseConnector.databaseName + ".payload" + (isTestEnvironment ? "_aggregated" : "") + " where `hash` = ? limit 1";
-		final int[] hashArgType = new int[] {Types.VARCHAR};
-
-		final List<Callable<Void>> callableTasks = new ArrayList<>(6);
+		HashMultimap<String, Payload> hashesWithPayloads = HashMultimap.create((sizeOfUrlReports / 5), 3);	// Holds multiple payloads for the same fileHash.
+		// The "Hash" part of the multimap helps with avoiding duplicate fileHashes.
 
 		for ( UrlReport urlReport : urlReports )
 		{
-			callableTasks.add(() -> {
-				Payload payload = urlReport.getPayload();
-				if ( payload == null )
-					return null;
+			Payload payload = urlReport.getPayload();
+			if ( payload == null )
+				continue;
 
-				String fileLocation = payload.getLocation();
-				if ( fileLocation == null )
-					return null;	// The full-text was not retrieved for this UrlReport.
+			String fileLocation = payload.getLocation();
+			if ( fileLocation == null )
+				continue;	// The full-text was not retrieved for this UrlReport.
 
-				// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
-				// If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker.
-				// If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker.
-				String fileHash = payload.getHash();
-				if ( fileHash != null ) {
-					String alreadyFoundFileLocation = null;
-					try {
-						alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[] {fileHash}, hashArgType, String.class);
-					} catch (EmptyResultDataAccessException erdae) {
-						// No fileLocation is found, it's ok. It will be null by default.
-					} catch (Exception e) {
-						logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n", e);
-						// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
-						// TODO - The idea is that since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
-						// Unless we do what it is said above, do not continue to the next UrlReport, this query-exception should not disrupt the normal full-text processing.
-					}
+			// Collect the fileHashes of all records, in order to query the payload-table ONCE, for the fileLocations of PREVIOUS RECORDS WITH THE SAME FILE-HASH.
+			// If no result is returned for a hash, then its record was not previously found, so go ahead and add it in the list of files to request from the worker.
+			// If a file-location IS returned (for a hash), then that file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker.
+			String fileHash = payload.getHash();
+			if ( fileHash != null )
+			{
+				hashesWithPayloads.put(fileHash, payload);	// Hold multiple payloads per fileHash.
+				// There are 2 cases which contribute to that:
+				// 1) Different publication-IDs end up giving the same full-text-url, resulting in the same file. Those duplicates are not saved; instead, the location, hash and size of the file are copied to the other payload.
+				// 2) Different publication-IDs end up giving different full-text-urls which point to the same file. Although very rare, in this case the file is downloaded again by the Worker and has a different name.
 
-				if ( alreadyFoundFileLocation != null ) {	// If the full-text of this record is already-found and uploaded.
-					payload.setLocation(alreadyFoundFileLocation);	// Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
-					if ( logger.isTraceEnabled() )
-						logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\".");	// DEBUG!
-					numFilesFoundFromPreviousAssignmentsBatches.incrementAndGet();
-					numValidFullTextsFound.incrementAndGet();
-					return null;	// Do not request the file from the worker, it's already uploaded. Move on. The "location" will be filled my the "setFullTextForMultiplePayloads()" method, later.
-				}
-			}
+				// In either case, the duplicate file will not be transferred to the Controller, but in the 2nd case it takes up extra space, at least for some time.
+				// TODO - Implement a fileHash-check algorithm on the Worker's side ("PublicationsRetriever"), to avoid keeping those files in storage.
 
-				// Extract the "fileNameWithExtension" to be added in the HashMultimap.
-				Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileLocation);
-				if ( ! matcher.matches() ) {
-					logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
-					numFullTextsWithProblematicLocations.incrementAndGet();
-					return null;
-				}
-				String fileNameWithExtension = matcher.group(2);
-				if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
-					logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
-					numFullTextsWithProblematicLocations.incrementAndGet();
-					return null;
-				}
-
-				numValidFullTextsFound.incrementAndGet();
-				allFileNamesWithPayloads.put(fileNameWithExtension, payload);	// The keys and the values are not duplicate.
-				// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1, thus, the ID-2 file was not downloaded again.
-				return null;
-			});
+			} else	// This should never happen..
+				logger.error("Payload: " + payload + " has a null fileHash!");
 		}// end-for
 
-		DatabaseConnector.databaseLock.lock();	// The execution uses the database.
-		try {	// Invoke all the tasks and wait for them to finish before moving to the next batch.
-			List<Future<Void>> futures = hashMatchingExecutor.invokeAll(callableTasks);
-			for ( Future<Void> future : futures ) {
-				try {
-					Void result = future.get();	// The result is always "null" as we have a "Void" type.
-				} catch (Exception e) {
-					logger.error("", e);
-				}
-			}
-		} catch (InterruptedException ie) {	// In this case, any unfinished tasks are cancelled.
-			logger.warn("The current thread was interrupted when waiting for the worker-threads to finish checking for already-found file-hashes: " + ie.getMessage());
-			// This is a very rare case. At the moment, we just move on with what we have so far.
-		} catch (Exception e) {
-			logger.error("Unexpected error when checking for already-found file-hashes in parallel!", e);
-			return UploadFullTextsResponse.unsuccessful;
-		} finally {
-			DatabaseConnector.databaseLock.unlock();	// The remaining work of this function does not use the database.
+		Set<String> fileHashes = hashesWithPayloads.keySet();
+		int fileHashesSetSize = fileHashes.size();	// Get the size of the keySet, instead of the whole multimap.
+		if ( fileHashesSetSize == 0 ) {
+			logger.warn("No fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\".");
+			return UploadFullTextsResponse.successful_without_fulltexts;	// It was handled, no error.
 		}
 
-		if ( numFullTextsWithProblematicLocations.get() > 0 )
-			logger.warn(numFullTextsWithProblematicLocations.get() + " files had problematic names.");
+		// Prepare the "fileHashListString" to be used inside the "getHashLocationsQuery". Create the following string-pattern:
+		// ("HASH_1", "HASH_2", ...)
+		int stringBuilderCapacity = ((fileHashesSetSize * 32) + (fileHashesSetSize -1) +2);
 
-		if ( numValidFullTextsFound.get() == 0 ) {
+		String getHashLocationsQuery = "select distinct `hash`, `location` from " + DatabaseConnector.databaseName + ".payload where `hash` in "
+				+ getQueryListString(new ArrayList<>(fileHashes), fileHashesSetSize, stringBuilderCapacity);
+
+		HashMap<String, String> hashLocationMap = new HashMap<>(fileHashesSetSize/2);	// No multimap is needed since only one location is returned for each fileHash.
+
+		DatabaseConnector.databaseLock.lock();	// The execution uses the database.
+		try {
+			jdbcTemplate.query(getHashLocationsQuery, rs -> {
+				try {	// For each of the 2 columns returned, do the following. The column-indexing starts from 1.
+					hashLocationMap.put(rs.getString(1), rs.getString(2));
+				} catch (SQLException sqle) {
+					logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
+				}
+			});
+		} catch (EmptyResultDataAccessException erdae) {
+			logger.warn("No previously-found hash-locations were found for assignments_" + assignmentsBatchCounter);
+		} catch (Exception e) {
+			logger.error("Unexpected error when checking for already-found file-hashes!", e);
+			// We will continue with storing the files; we do not want to lose them.
+		} finally {
+			DatabaseConnector.databaseLock.unlock();
+		}
+
+		for ( String fileHash : fileHashes )
+		{
+			for ( Payload payload : hashesWithPayloads.get(fileHash) )
+			{
+				String alreadyFoundFileLocation = hashLocationMap.get(fileHash);	// Only one location has been retrieved per fileHash.
+				if ( alreadyFoundFileLocation != null ) {
+					// Fill the payloads with locations from the "previously-found-hashes".
+					payload.setLocation(alreadyFoundFileLocation);
+					if ( logger.isTraceEnabled() )
+						logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\".");	// DEBUG!
+					numFilesFoundFromPreviousAssignmentsBatches ++;
+					numValidFullTextsFound ++;	// We trust that the location is valid..
+				}
+				else {	// This file has not been found before..
+					// Extract the "fileNameWithExtension" to be added in the HashMultimap.
+					String fileLocation = payload.getLocation();
+					Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileLocation);
+					if ( ! matcher.matches() ) {
+						logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
+						numFullTextsWithProblematicLocations ++;
+						continue;
+					}
+					String fileNameWithExtension = matcher.group(2);
+					if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
+						logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
+						numFullTextsWithProblematicLocations ++;
+						continue;
+					}
+
+					numValidFullTextsFound ++;
+					allFileNamesWithPayloads.put(fileNameWithExtension, payload);	// The keys and the values are not duplicate.
+					// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as pdf-url-2 might be the same as pdf-url-1, thus the ID-2 file was not downloaded again.
+				}
+			}
+		}
+
+		if ( numFullTextsWithProblematicLocations > 0 )
+			logger.warn(numFullTextsWithProblematicLocations + " files had problematic names.");
+
+		if ( numValidFullTextsFound == 0 ) {
 			logger.warn("No full-text files were retrieved for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
 			return UploadFullTextsResponse.successful_without_fulltexts;	// It's not what we want, but it's not an error either.
 		}
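Aside: the else-branch above depends on the FILENAME_ID_EXTENSION regex (defined
elsewhere in FileUtils) to extract the file name from a stored location, via
matcher.group(2). A simplified stand-in showing the same group-based extraction;
the pattern below is illustrative, not the project's actual one:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class FilenameExtractionSketch {

        // Group 1: parent path (optional); group 2: file name with extension.
        static final Pattern FILENAME_WITH_EXTENSION = Pattern.compile("(.*/)?([^/]+\\.[a-zA-Z0-9]+)$");

        public static void main(String[] args) {
            String fileLocation = "/mnt/fulltexts/batch_12/id-1.pdf";   // hypothetical location
            Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation);
            if ( matcher.matches() )
                System.out.println(matcher.group(2));   // prints "id-1.pdf"
            else
                System.out.println("Problematic location: " + fileLocation);
        }
    }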
@@ -321,24 +330,24 @@ public class FileUtils {
 		ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet());	// The number of fulltexts are lower than the number of payloads, since multiple payloads may lead to the same file.
 		int numFullTextsToBeRequested = allFileNames.size();
 		if ( numFullTextsToBeRequested == 0 ) {
-			logger.info(numValidFullTextsFound.get() + " fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\", but all of them have been retrieved before.");
+			logger.info(numValidFullTextsFound + " fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\", but all of them have been retrieved before.");
 			return UploadFullTextsResponse.successful_without_fulltexts;	// It was handled, no error.
 		}
 
-		logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numValidFullTextsFound.get() + " (out of " + sizeOfUrlReports + " | about " + df.format(numValidFullTextsFound.get() * 100.0 / sizeOfUrlReports) + "%).");
+		logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numValidFullTextsFound + " (out of " + sizeOfUrlReports + " | about " + df.format(numValidFullTextsFound * 100.0 / sizeOfUrlReports) + "%).");
 		// TODO - Have a prometheus GAUGE to hold the value of the above percentage, so that we can track the success-rates over time..
 
-		logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches.get());
+		logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
 
 		// Request the full-texts in batches, compressed in a zstd tar file.
 		int numOfBatches = (numFullTextsToBeRequested / numOfFullTextsPerBatch);
 		int remainingFiles = (numFullTextsToBeRequested % numOfFullTextsPerBatch);
 		if ( remainingFiles > 0 ) {	// Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
 			numOfBatches++;
-			logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
+			logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
 		} else
-			logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
+			logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
 
 		// Check if one full text is left out because of the division. Put it int the last batch.
 		String baseUrl = "http://" + workerIp + ":" + workerPort + "/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
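Aside: the batch arithmetic above guarantees at least one batch whenever any files
remain. Worked through with assumed numbers (numOfFullTextsPerBatch is configuration;
70 is only an example value):

    public class BatchMathSketch {
        public static void main(String[] args) {
            int numFullTextsToBeRequested = 2505;
            int numOfFullTextsPerBatch = 70;                                        // assumed config value
            int numOfBatches = numFullTextsToBeRequested / numOfFullTextsPerBatch;  // 35 full batches
            int remainingFiles = numFullTextsToBeRequested % numOfFullTextsPerBatch;// 55 left over
            if ( remainingFiles > 0 )
                numOfBatches++;     // 36: the last batch carries the remaining 55 files.
            System.out.println(numOfBatches + " batches, last one with " + remainingFiles + " files");
        }
    }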
@@ -403,7 +412,7 @@ public class FileUtils {
 		long finalPayloadsCounter = urlReports.parallelStream()
 				.map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null)))
 				.count();
-		int numInitialPayloads = (numValidFullTextsFound.get() + numFullTextsWithProblematicLocations.get());
+		int numInitialPayloads = (numValidFullTextsFound + numFullTextsWithProblematicLocations);
 		long numFailedPayloads = (numInitialPayloads - finalPayloadsCounter);
 		if ( numFailedPayloads == numInitialPayloads ) {
 			// This will also be the case if there was no DB failure, but all the batches have failed.
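Aside: the final hunk's failure accounting counts the payloads that still carry a
location after the upload phase and treats the shortfall from the initial count as
failures. A minimal sketch of the same computation, with Payload again reduced to a
hypothetical record:

    import java.util.List;

    public class FailureAccountingSketch {

        record Payload(String id, String location) {}   // simplified stand-in

        public static void main(String[] args) {
            List<Payload> payloads = List.of(
                    new Payload("id-1", "s3://bucket/id-1.pdf"),
                    new Payload("id-2", null),                      // upload failed: location cleared
                    new Payload("id-3", "s3://bucket/id-3.pdf"));

            long finalPayloadsCounter = payloads.parallelStream()
                    .filter(p -> (p != null) && (p.location() != null))
                    .count();                                       // 2
            int numInitialPayloads = 3;                             // valid fulltexts + problematic locations
            long numFailedPayloads = numInitialPayloads - finalPayloadsCounter;     // 1
            System.out.println(numFailedPayloads + " of " + numInitialPayloads + " payloads failed");
        }
    }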