Improve performance of the hash-checking algorithm by using multithreading.

This commit is contained in:
Lampros Smyrnaios 2022-12-15 18:34:28 +02:00
parent 9cdbbdea67
commit e11afe5ab2
2 changed files with 103 additions and 58 deletions
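As context for the diff below, here is a minimal, self-contained sketch of the pattern this commit applies to the hash-checking step (the names HashCheckSketch and lookupLocationByHash are illustrative stand-ins, not part of the actual code): each per-record hash-check becomes a Callable task, the tasks are run on a fixed thread pool with invokeAll(), and the shared counter and multimap use thread-safe types.

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class HashCheckSketch {

    // Same pool-size as in the commit: six concurrent hash-checks.
    static final ExecutorService hashMatchingExecutor = Executors.newFixedThreadPool(6);

    // Hypothetical stand-in for the JdbcTemplate lookup: returns a previously stored
    // file-location for this hash, or null if the hash has not been seen before.
    static String lookupLocationByHash(String hash) { return null; }

    public static void main(String[] args) throws Exception {
        List<String> hashes = List.of("hash-1", "hash-2", "hash-3");

        AtomicInteger numFound = new AtomicInteger(); // thread-safe counter instead of a plain "int"
        SetMultimap<String, String> hashesWithLocations =
                Multimaps.synchronizedSetMultimap(HashMultimap.<String, String>create()); // thread-safe multimap

        List<Callable<Void>> callableTasks = new ArrayList<>(hashes.size());
        for ( String hash : hashes ) {
            callableTasks.add(() -> {
                String location = lookupLocationByHash(hash);
                if ( location != null ) {
                    hashesWithLocations.put(hash, location);
                    numFound.incrementAndGet();
                }
                return null; // A Callable<Void> has to return something; null is the convention.
            });
        }

        // Run all checks in parallel and wait for every task to finish before moving on.
        for ( Future<Void> future : hashMatchingExecutor.invokeAll(callableTasks) )
            future.get(); // re-throws any exception that was thrown inside a task

        System.out.println("Already-known hashes: " + numFound.get());
        hashMatchingExecutor.shutdown();
    }
}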


@@ -1,6 +1,7 @@
package eu.openaire.urls_controller;
import eu.openaire.urls_controller.controllers.UrlController;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.UriBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,6 +19,7 @@ import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import javax.annotation.PreDestroy;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@SpringBootApplication
@@ -44,19 +46,29 @@ public class Application {
@PreDestroy
public void closeThreads() {
public void preDestroy() {
logger.info("Shutting down the threads..");
UrlController.insertsExecutor.shutdown(); // Define that no new tasks will be scheduled.
shutdownThreads(UrlController.insertsExecutor);
shutdownThreads(FileUtils.hashMatchingExecutor);
logger.info("Exiting..");
}
private void shutdownThreads(ExecutorService executorService)
{
executorService.shutdown(); // Define that no new tasks will be scheduled.
try {
if ( ! UrlController.insertsExecutor.awaitTermination(1, TimeUnit.MINUTES) ) {
if ( ! executorService.awaitTermination(1, TimeUnit.MINUTES) ) {
logger.warn("The working threads did not finish on time! Stopping them immediately..");
UrlController.insertsExecutor.shutdownNow();
executorService.shutdownNow();
}
} catch (SecurityException se) {
logger.error("Could not shutdown the threads in any way..!", se);
} catch (InterruptedException ie) {
try {
UrlController.insertsExecutor.shutdownNow();
executorService.shutdownNow();
} catch (SecurityException se) {
logger.error("Could not shutdown the threads in any way..!", se);
}
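The new helper follows the standard ExecutorService termination recipe (shutdown, bounded awaitTermination, then shutdownNow). Below is a minimal standalone sketch of that recipe; the class ShutdownSketch and the method shutdownAndAwait are hypothetical and not part of this commit.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {

    // Standard graceful-shutdown recipe: stop accepting new tasks, wait a bounded time
    // for the running ones, then force-cancel whatever is still left.
    static void shutdownAndAwait(ExecutorService executorService) {
        executorService.shutdown(); // no new tasks will be scheduled
        try {
            if ( ! executorService.awaitTermination(1, TimeUnit.MINUTES) )
                executorService.shutdownNow(); // cancel the tasks which did not finish in time
        } catch (InterruptedException ie) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt-status for the caller
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> System.out.println("working.."));
        shutdownAndAwait(pool);
    }
}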


@@ -1,6 +1,8 @@
package eu.openaire.urls_controller.util;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.UrlReport;
@@ -24,6 +26,11 @@ import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -93,6 +100,9 @@ public class FileUtils {
private final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
public static final ExecutorService hashMatchingExecutor = Executors.newFixedThreadPool(6);
public UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId) {
// The Controller has to request the files from the Worker, in order to upload them to the S3.
// We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
@@ -106,73 +116,96 @@ public class FileUtils {
remoteAddr = request.getRemoteAddr();
// Get the file-locations.
int numFullTextsFound = 0;
int numFilesFoundFromPreviousAssignmentsBatches = 0;
AtomicInteger numFullTextsFound = new AtomicInteger();
AtomicInteger numFilesFoundFromPreviousAssignmentsBatches = new AtomicInteger();
int urlReportsSize = urlReports.size();
HashMultimap<String, Payload> allFileNamesWithPayloads = HashMultimap.create((urlReportsSize / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ? limit 1" ;
SetMultimap<String, Payload> allFileNamesWithPayloads = Multimaps.synchronizedSetMultimap(HashMultimap.create((urlReportsSize / 5), 3)); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
final String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ? limit 1";
final int[] hashArgType = new int[] {Types.VARCHAR};
ImpalaConnector.databaseLock.lock(); // The following loop uses the database.
List<Callable<Void>> callableTasks = new ArrayList<>(6);
for ( UrlReport urlReport : urlReports )
{
Payload payload = urlReport.getPayload();
if ( payload == null )
continue;
callableTasks.add(() -> {
Payload payload = urlReport.getPayload();
if ( payload == null )
return null;
String fileLocation = payload.getLocation();
if ( fileLocation == null )
continue; // The full-text was not retrieved, go to the next UrlReport.
String fileLocation = payload.getLocation();
if ( fileLocation == null )
return null; // The full-text was not retrieved, go to the next UrlReport.
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
// If no result is returned, then this record was not previously found, so go ahead and add it to the list of files to request from the worker.
// If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker.
String fileHash = payload.getHash();
if ( fileHash != null ) {
String alreadyFoundFileLocation = null;
try {
alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[] {fileHash}, hashArgType, String.class);
} catch (EmptyResultDataAccessException erdae) {
// No fileLocation is found, it's ok. It will be null by default.
} catch (Exception e) {
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n", e);
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
// TODO - Since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
// TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not auto-closed here) and the Database connection.
// Unless we do what is said above, do not continue to the next UrlReport; this query-exception should not disrupt the normal full-text processing.
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
// If no result is returned, then this record was not previously found, so go ahead and add it to the list of files to request from the worker.
// If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker.
String fileHash = payload.getHash();
if ( fileHash != null ) {
String alreadyFoundFileLocation = null;
try {
alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[] {fileHash}, hashArgType, String.class);
} catch (EmptyResultDataAccessException erdae) {
// No fileLocation is found, it's ok. It will be null by default.
} catch (Exception e) {
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n", e);
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
// TODO - The idea is that since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
// Unless we do what is said above, do not continue to the next UrlReport; this query-exception should not disrupt the normal full-text processing.
}
if ( alreadyFoundFileLocation != null ) { // If the full-text of this record is already-found and uploaded.
payload.setLocation(alreadyFoundFileLocation); // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
//logger.debug("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches.incrementAndGet();
numFullTextsFound.incrementAndGet();
return null; // Do not request the file from the worker, it's already uploaded. Move on. The "location" will be filled by the "setFullTextForMultiplePayloads()" method, later.
}
}
if ( alreadyFoundFileLocation != null ) { // If the full-text of this record is already-found and uploaded.
payload.setLocation(alreadyFoundFileLocation); // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
//logger.debug("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches ++;
numFullTextsFound ++;
continue; // Do not request the file from the worker, it's already uploaded. Move on. The "location" will be filled by the "setFullTextForMultiplePayloads()" method, later.
// Extract the "fileNameWithExtension" to be added in the HashMultimap.
Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileLocation);
if ( ! matcher.matches() ) {
logger.error("Failed to match the \"" + fileLocation + "\" with the regex: " + FILENAME_ID_EXTENSION);
return null;
}
String fileNameWithExtension = matcher.group(1);
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
logger.error("Failed to extract the \"fileNameWithExtension\" from \"" + fileLocation + "\".");
return null;
}
}
// Extract the "fileNameWithExtension" to be added in the HashMultimap.
Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileLocation);
if ( ! matcher.matches() ) {
logger.error("Failed to match the \"" + fileLocation + "\" with the regex: " + FILENAME_ID_EXTENSION);
continue;
}
String fileNameWithExtension = matcher.group(1);
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
logger.error("Failed to extract the \"fileNameWithExtension\" from \"" + fileLocation + "\".");
continue;
}
numFullTextsFound ++;
allFileNamesWithPayloads.put(fileNameWithExtension, payload); // The keys and the values are not duplicated.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as pdf-url-2 might be the same as pdf-url-1; thus, the ID-2 file was not downloaded again.
numFullTextsFound.incrementAndGet();
allFileNamesWithPayloads.put(fileNameWithExtension, payload); // The keys and the values are not duplicated.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as pdf-url-2 might be the same as pdf-url-1; thus, the ID-2 file was not downloaded again.
return null;
});
}// end-for
ImpalaConnector.databaseLock.unlock(); // The remaining work of this function does not use the database.
ImpalaConnector.databaseLock.lock(); // The execution uses the database.
try { // Invoke all the tasks and wait for them to finish before moving to the next batch.
List<Future<Void>> futures = hashMatchingExecutor.invokeAll(callableTasks);
for ( Future<Void> future : futures ) {
try {
Void result = future.get(); // The result is always "null" as we have a "Void" type.
} catch (Exception e) {
logger.error("", e);
}
}
} catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled.
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish checking for already-found file-hashes: " + ie.getMessage());
// This is a very rare case. At the moment, we just move on with what we have so far.
} catch (Exception e) {
String errorMsg = "Unexpected error when checking for already-found file-hashes in parallel! " + e.getMessage();
logger.error(errorMsg, e);
return UploadFullTextsResponse.unsuccessful;
} finally {
ImpalaConnector.databaseLock.unlock(); // The remaining work of this function does not use the database.
}
logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextsFound + " (out of " + urlReportsSize + " | about " + df.format(numFullTextsFound * 100.0 / urlReportsSize) + "%).");
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextsFound.get() + " (out of " + urlReportsSize + " | about " + df.format(numFullTextsFound.get() * 100.0 / urlReportsSize) + "%).");
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches.get());
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet());
int numAllFullTexts = allFileNames.size();
@@ -285,7 +318,7 @@ public class FileUtils {
}
private void uploadFullTexts(String[] fileNames, String targetDirectory, String zipFileFullPath, HashMultimap<String, Payload> allFileNamesWithPayloads)
private void uploadFullTexts(String[] fileNames, String targetDirectory, String zipFileFullPath, SetMultimap<String, Payload> allFileNamesWithPayloads)
{
// Iterate over the files and upload them to S3.
//int numUploadedFiles = 0;