Upgrade the algorithm for finding previously-found full-texts, based on their md5 hash:

- Use a single query with the list of fileHashes, instead of thousands of single-md5hash-check queries (previously run at most 6 in parallel), which required a lot of I/O (a sketch of the batched lookup follows this list).
- Avoid checking the same fileHash multiple times, in case it is related to multiple payloads.
- In case of a database error, avoid losing all the full-texts of that worker; instead, continue processing them.
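
A minimal sketch of the batched lookup described above, assuming Spring's JdbcTemplate and the "hash"/"location" columns of the payload table shown in the diff below; the class name, method name and parameters are illustrative and not part of the commit:

import org.springframework.jdbc.core.JdbcTemplate;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class HashLocationLookupSketch {

    // Illustrative sketch: one IN-list query for all fileHashes, instead of one
    // "where `hash` = ?" query per payload. Not the exact code of this commit.
    public static Map<String, String> getExistingLocations(JdbcTemplate jdbcTemplate,
                                                           String databaseName,
                                                           Set<String> fileHashes) {
        if ( fileHashes.isEmpty() )
            return new HashMap<>(); // Nothing to look up.

        // Build the ("HASH_1", "HASH_2", ...) list. An md5-hex hash is 32 chars, plus quotes and separators.
        StringBuilder sb = new StringBuilder((fileHashes.size() * 36) + 2);
        sb.append('(');
        int counter = 0;
        for ( String fileHash : fileHashes ) {
            if ( counter++ > 0 )
                sb.append(", ");
            sb.append('"').append(fileHash).append('"');
        }
        sb.append(')');

        String getHashLocationsQuery = "select distinct `hash`, `location` from " + databaseName
                + ".payload where `hash` in " + sb;

        // One location per hash is enough; payloads sharing a hash reuse the same already-uploaded file.
        Map<String, String> hashLocationMap = new HashMap<>(fileHashes.size());
        jdbcTemplate.query(getHashLocationsQuery, rs -> {
            hashLocationMap.put(rs.getString(1), rs.getString(2));
        });
        return hashLocationMap;
    }
}

This turns thousands of point lookups into a single round-trip per worker-report; the returned map can then fill the location of every payload whose hash was already uploaded, so only the remaining file names need to be requested from the Worker.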
Lampros Smyrnaios 2024-03-13 11:28:37 +02:00
parent e4540e7f3c
commit 8f9786de09
2 changed files with 106 additions and 101 deletions


@@ -284,11 +284,7 @@ public class UrlsServiceImpl implements UrlsService {
String workerReportBaseName = this.workerReportsDirPath + File.separator + curWorkerId + File.separator + curWorkerId + "_assignments_" + curReportAssignmentsCounter + "_report";
renameAndGetWorkerReportFile(workerReportBaseName, new File(workerReportBaseName + ".json"), "No info was found for worker: " + curWorkerId); // It may return null.
return false;
} else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Problem with the Impala-database!");
return false;
}
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
} else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignmentsCounter);
// The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available.
fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false);


@@ -1,7 +1,6 @@
package eu.openaire.urls_controller.util;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.controllers.UrlsController;
@@ -29,17 +28,10 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.sql.Types;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
@@ -66,7 +58,7 @@ public class FileUtils {
public enum UploadFullTextsResponse {successful, successful_without_fulltexts, unsuccessful, databaseError}
public enum UploadFullTextsResponse {successful, successful_without_fulltexts, unsuccessful}
public String baseFilesLocation;
@@ -220,100 +212,117 @@ public class FileUtils {
workerIp = workerInfo.getWorkerIP(); // This won't be null.
// Get the file-locations.
final AtomicInteger numValidFullTextsFound = new AtomicInteger(0);
final AtomicInteger numFilesFoundFromPreviousAssignmentsBatches = new AtomicInteger(0);
final AtomicInteger numFullTextsWithProblematicLocations = new AtomicInteger(0);
int numValidFullTextsFound = 0;
int numFilesFoundFromPreviousAssignmentsBatches = 0;
int numFullTextsWithProblematicLocations = 0;
SetMultimap<String, Payload> allFileNamesWithPayloads = Multimaps.synchronizedSetMultimap(HashMultimap.create((sizeOfUrlReports / 5), 3)); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
HashMultimap<String, Payload> allFileNamesWithPayloads = HashMultimap.create((sizeOfUrlReports / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
final String getFileLocationForHashQuery = "select `location` from " + DatabaseConnector.databaseName + ".payload" + (isTestEnvironment ? "_aggregated" : "") + " where `hash` = ? limit 1";
final int[] hashArgType = new int[] {Types.VARCHAR};
final List<Callable<Void>> callableTasks = new ArrayList<>(6);
HashMultimap<String, Payload> hashesWithPayloads = HashMultimap.create((sizeOfUrlReports / 5), 3); // Holds multiple payloads for the same fileHash.
// The "Hash" part of the multimap helps with avoiding duplicate fileHashes.
for ( UrlReport urlReport : urlReports )
{
callableTasks.add(() -> {
Payload payload = urlReport.getPayload();
if ( payload == null )
return null;
Payload payload = urlReport.getPayload();
if ( payload == null )
continue;
String fileLocation = payload.getLocation();
if ( fileLocation == null )
return null; // The full-text was not retrieved for this UrlReport.
String fileLocation = payload.getLocation();
if ( fileLocation == null )
continue; // The full-text was not retrieved for this UrlReport.
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
// If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker.
// If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker.
String fileHash = payload.getHash();
if ( fileHash != null ) {
String alreadyFoundFileLocation = null;
try {
alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[] {fileHash}, hashArgType, String.class);
} catch (EmptyResultDataAccessException erdae) {
// No fileLocation is found, it's ok. It will be null by default.
} catch (Exception e) {
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n", e);
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
// TODO - The idea is that since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
// Unless we do what it is said above, do not continue to the next UrlReport, this query-exception should not disrupt the normal full-text processing.
}
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
// If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker.
// If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker.
String fileHash = payload.getHash();
if ( fileHash != null )
{
hashesWithPayloads.put(fileHash, payload); // Hold multiple payloads per fileHash.
// There are 2 cases, which contribute to that:
// 1) Different publication-IDs end up giving the same full-text-url, resulting in the same file. Those duplicates are not saved, but instead, the location, hash and size of the file are copied to the other payload.
// 2) Different publication-IDs end up giving different full-text-urls which point to the same file. Although very rare, in this case, the file is downloaded again by the Worker and has a different name.
if ( alreadyFoundFileLocation != null ) { // If the full-text of this record is already-found and uploaded.
payload.setLocation(alreadyFoundFileLocation); // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
if ( logger.isTraceEnabled() )
logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches.incrementAndGet();
numValidFullTextsFound.incrementAndGet();
return null; // Do not request the file from the worker, it's already uploaded. Move on. The "location" will be filled by the "setFullTextForMultiplePayloads()" method, later.
}
}
// In either case, the duplicate file will not be transferred to the Controller, but in the 2nd one it takes up extra space, at least for some time.
// TODO - Implement a fileHash-check algorithm on the Worker's side ("PublicationsRetriever"), to avoid keeping those files in storage.
// Extract the "fileNameWithExtension" to be added in the HashMultimap.
Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileLocation);
if ( ! matcher.matches() ) {
logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
numFullTextsWithProblematicLocations.incrementAndGet();
return null;
}
String fileNameWithExtension = matcher.group(2);
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
numFullTextsWithProblematicLocations.incrementAndGet();
return null;
}
numValidFullTextsFound.incrementAndGet();
allFileNamesWithPayloads.put(fileNameWithExtension, payload); // The keys and the values are not duplicate.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same as pdf-url-1, thus, the ID-2 file was not downloaded again.
return null;
});
} else // This should never happen..
logger.error("Payload: " + payload + " has a null fileHash!");
}// end-for
DatabaseConnector.databaseLock.lock(); // The execution uses the database.
try { // Invoke all the tasks and wait for them to finish before moving to the next batch.
List<Future<Void>> futures = hashMatchingExecutor.invokeAll(callableTasks);
for ( Future<Void> future : futures ) {
try {
Void result = future.get(); // The result is always "null" as we have a "Void" type.
} catch (Exception e) {
logger.error("", e);
}
}
} catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled.
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish checking for already-found file-hashes: " + ie.getMessage());
// This is a very rare case. At the moment, we just move on with what we have so far.
} catch (Exception e) {
logger.error("Unexpected error when checking for already-found file-hashes in parallel!", e);
return UploadFullTextsResponse.unsuccessful;
} finally {
DatabaseConnector.databaseLock.unlock(); // The remaining work of this function does not use the database.
Set<String> fileHashes = hashesWithPayloads.keySet();
int fileHashesSetSize = fileHashes.size(); // Get the size of the keysSet, instead of the whole multimap.
if ( fileHashesSetSize == 0 ) {
logger.warn("No fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\".");
return UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error.
}
if ( numFullTextsWithProblematicLocations.get() > 0 )
logger.warn(numFullTextsWithProblematicLocations.get() + " files had problematic names.");
// Prepare the "fileHashListString" to be used inside the "getHashLocationsQuery". Create the following string-pattern:
// ("HASH_1", "HASH_2", ...)
int stringBuilderCapacity = ((fileHashesSetSize * 32) + (fileHashesSetSize -1) +2);
if ( numValidFullTextsFound.get() == 0 ) {
String getHashLocationsQuery = "select distinct `hash`, `location` from " + DatabaseConnector.databaseName + ".payload where `hash` in "
+ getQueryListString(new ArrayList<>(fileHashes), fileHashesSetSize, stringBuilderCapacity);
HashMap<String, String> hashLocationMap = new HashMap<>(fileHashesSetSize/2); // No multimap is needed since only one location is returned for each fileHash.
DatabaseConnector.databaseLock.lock(); // The execution uses the database.
try {
jdbcTemplate.query(getHashLocationsQuery, rs -> {
try { // For each of the 2 columns returned ("hash", "location"), do the following. The column-indexing starts from 1.
hashLocationMap.put(rs.getString(1), rs.getString(2));
} catch (SQLException sqle) {
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
}
});
} catch (EmptyResultDataAccessException erdae) {
logger.warn("No previously-found hash-locations where found for assignments_" + assignmentsBatchCounter);
} catch (Exception e) {
logger.error("Unexpected error when checking for already-found file-hashes!", e);
// We will continue with storing the files, we do not want to lose them.
} finally {
DatabaseConnector.databaseLock.unlock();
}
for ( String fileHash : fileHashes )
{
for ( Payload payload : hashesWithPayloads.get(fileHash) )
{
String alreadyFoundFileLocation = hashLocationMap.get(fileHash); // Only one location has been retrieved per fileHash.
if ( alreadyFoundFileLocation != null ) {
// Fill the payloads with locations from the "previously-found-hashes."
payload.setLocation(alreadyFoundFileLocation);
if ( logger.isTraceEnabled() )
logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches ++;
numValidFullTextsFound ++; // We trust that the location is valid.
}
else { // This file has not been found before..
// Extract the "fileNameWithExtension" to be added in the HashMultimap.
String fileLocation = payload.getLocation();
Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileLocation);
if ( ! matcher.matches() ) {
logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
numFullTextsWithProblematicLocations ++;
continue;
}
String fileNameWithExtension = matcher.group(2);
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
numFullTextsWithProblematicLocations ++;
continue;
}
numValidFullTextsFound ++;
allFileNamesWithPayloads.put(fileNameWithExtension, payload); // The keys and the values are not duplicate.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same as pdf-url-1, thus, the ID-2 file was not downloaded again.
}
}
}
if ( numFullTextsWithProblematicLocations > 0 )
logger.warn(numFullTextsWithProblematicLocations + " files had problematic names.");
if ( numValidFullTextsFound == 0 ) {
logger.warn("No full-text files were retrieved for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
return UploadFullTextsResponse.successful_without_fulltexts; // It's not what we want, but it's not an error either.
}
@@ -321,24 +330,24 @@ public class FileUtils {
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet()); // The number of fulltexts are lower than the number of payloads, since multiple payloads may lead to the same file.
int numFullTextsToBeRequested = allFileNames.size();
if ( numFullTextsToBeRequested == 0 ) {
logger.info(numValidFullTextsFound.get() + " fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\", but all of them have been retrieved before.");
logger.info(numValidFullTextsFound + " fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\", but all of them have been retrieved before.");
return UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error.
}
logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numValidFullTextsFound.get() + " (out of " + sizeOfUrlReports + " | about " + df.format(numValidFullTextsFound.get() * 100.0 / sizeOfUrlReports) + "%).");
logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numValidFullTextsFound + " (out of " + sizeOfUrlReports + " | about " + df.format(numValidFullTextsFound * 100.0 / sizeOfUrlReports) + "%).");
// TODO - Have a prometheus GAUGE to hold the value of the above percentage, so that we can track the success-rates over time..
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches.get());
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
// Request the full-texts in batches, compressed in a zstd tar file.
int numOfBatches = (numFullTextsToBeRequested / numOfFullTextsPerBatch);
int remainingFiles = (numFullTextsToBeRequested % numOfFullTextsPerBatch);
if ( remainingFiles > 0 ) { // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
numOfBatches++;
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
} else
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
// Check if one full text is left out because of the division. Put it into the last batch.
String baseUrl = "http://" + workerIp + ":" + workerPort + "/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
@@ -403,7 +412,7 @@ public class FileUtils {
long finalPayloadsCounter = urlReports.parallelStream()
.map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null)))
.count();
int numInitialPayloads = (numValidFullTextsFound.get() + numFullTextsWithProblematicLocations.get());
int numInitialPayloads = (numValidFullTextsFound + numFullTextsWithProblematicLocations);
long numFailedPayloads = (numInitialPayloads - finalPayloadsCounter);
if ( numFailedPayloads == numInitialPayloads ) {
// This will also be the case if there was no DB failure, but all the batches have failed.