Improve efficiency and performance when processing the full-texts.

This commit is contained in:
Lampros Smyrnaios 2022-02-08 15:02:13 +02:00
parent 5819bf584b
commit d2ed9cd9ed
2 changed files with 43 additions and 144 deletions

View File

@ -253,7 +253,7 @@ public class UrlController {
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
// The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available and continue with writing the attempts and the payloads.
fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false);
}
// Store the workerReport into the database. We use "PreparedStatements" to do insertions, for security and valid SQL syntax reasons.

View File

@ -86,16 +86,12 @@ public class FileUtils {
}
private final String numAndExtension = "(?:\\([\\d]+\\))?\\.[\\w]{2,10}";
private final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)" + numAndExtension + "$");
private final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+" + numAndExtension + ")$");
@Value("services.pdfaggregation.controller.baseTargetLocation")
private String baseTargetLocation;
private final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:()]+\\.[\\w]{2,10})$");
private final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
public UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId) {
// The Controller have to request the files from the Worker, in order to upload them to the S3.
// We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
@ -111,7 +107,7 @@ public class FileUtils {
// Get the file-locations.
int numFullTextUrlsFound = 0;
int numFilesFoundFromPreviousAssignmentsBatches = 0;
HashMultimap<String, String> allFileNamesWithIDsHashMap = HashMultimap.create((urlReports.size() / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
HashMultimap<String, Payload> allFileNamesWithPayloads = HashMultimap.create((urlReports.size() / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
String getFileLocationForHashQuery = "select `location` from " + databaseName + ".payload where `hash` = ? limit 1" ;
ImpalaConnector.databaseLock.lock();
@ -132,7 +128,6 @@ public class FileUtils {
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
// If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker.
// If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker.
// Use the same prepared-statement for all requests, to improve speed (just like when inserting similar thing to the DB).
String fileHash = payload.getHash();
if ( fileHash != null ) {
try {
@ -144,6 +139,7 @@ public class FileUtils {
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
// TODO - Since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
// TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not auto-closed here)and the Database connection.
// Unless we do what it is said above, do not continue to the next UrlReport, this query-exception should not disrupt the normal full-text processing.
}
if ( fileLocation != null ) { // If the full-text of this record is already-found and uploaded.
@ -165,8 +161,8 @@ public class FileUtils {
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
continue;
}
allFileNamesWithIDsHashMap.put(fileNameWithExtension, payload.getId()); // The keys and the values are not duplicate. Task with ID-1 might have an "ID-1.pdf" file.
// While a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1, thus, the ID-2 file was not downloaded again.
allFileNamesWithPayloads.put(fileNameWithExtension, payload); // The keys and the values are not duplicate.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1, thus, the ID-2 file was not downloaded again.
}
}// end-for
@ -175,7 +171,7 @@ public class FileUtils {
logger.info("NumFullTextUrlsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextUrlsFound + " (out of " + urlReports.size() + ").");
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithIDsHashMap.keySet());
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet());
int numAllFullTexts = allFileNames.size();
if ( numAllFullTexts == 0 ) {
logger.warn("The retrieved files where < 0 > for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
@ -192,14 +188,6 @@ public class FileUtils {
// Check if one full text is left out because of the division. Put it int the last batch.
String baseUrl = "http://" + remoteAddr + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
// Index all Payloads to be more efficiently searched later.
HashMultimap<String, Payload> payloadsHashMultimap = HashMultimap.create((urlReports.size() / 3), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload != null )
payloadsHashMultimap.put(payload.getId(), payload);
}
String curAssignmentsBaseLocation = baseTargetLocation + "assignments_" + assignmentsBatchCounter + File.separator;
File curAssignmentsBaseDir = new File(curAssignmentsBaseLocation);
@ -208,7 +196,6 @@ public class FileUtils {
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter);
HttpURLConnection conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId);
if ( conn == null ) {
updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMultimap, fileNamesForCurBatch);
failedBatches ++;
continue; // To the next batch.
}
@ -224,11 +211,9 @@ public class FileUtils {
File zipFile = new File(zipFileFullPath);
if ( ! saveZipFile(conn, zipFile) ) {
updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMultimap, fileNamesForCurBatch);
failedBatches ++;
continue; // To the next batch.
}
//logger.debug("The zip file has been saved: " + zipFileFullPath); // DEBUG!
fileUnZipper.unzipFolder(Paths.get(zipFileFullPath), curBatchPath);
@ -236,7 +221,6 @@ public class FileUtils {
String[] fileNames = new File(targetDirectory).list();
if ( (fileNames == null) || (fileNames.length <= 1 ) ) { // The directory might have only one file, the "zip-file".
logger.error("No full-text fileNames where extracted from directory: " + targetDirectory);
updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMultimap, fileNamesForCurBatch);
failedBatches ++;
continue; // To the next batch.
}
@ -249,22 +233,17 @@ public class FileUtils {
if ( fileFullPath.equals(zipFileFullPath) ) // Exclude the zip-file from uploading.
continue;
// Check if this stored file is related to one or more IDs from the Set. Defend against malicious file injection. It does not add more overhead, since we already need the "fileRelatedIDs".
Set<String> fileRelatedIDs = allFileNamesWithIDsHashMap.get(fileName);
if ( fileRelatedIDs.isEmpty() ) { // In case the "fileName" is not inside the "allFileNamesWithIDsHashMap" HashMultimap.
logger.error("The stored file \"" + fileName + "\" is not related to any ID which had a file requested from the Worker!");
// Check if this stored file is related to one or more Payloads from the Set. Defend against malicious file injection. It does not add more overhead, since we already need the "fileRelatedPayloads".
Set<Payload> fileRelatedPayloads = allFileNamesWithPayloads.get(fileName);
if ( fileRelatedPayloads.isEmpty() ) { // In case the "fileName" is not inside the "allFileNamesWithPayloads" HashMultimap.
logger.error("The stored file \"" + fileName + "\" is not related to any Payload returned from the Worker!");
continue;
}
if ( isFileNameProblematic(fileName, payloadsHashMultimap) ) // Do some more checks.
continue;
// At this point, we know that this file is related with one or more IDs of the payloads AND it has a valid fileName.
// Let's try to upload the file to S3 and update the payloads of all related IDs, either in successful upload or not.
// Let's try to upload the file to S3 and update the payloads, either in successful file-uploads (right-away) or not (in the end).
try {
String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath);
setFullTextForMultipleIDs(fileRelatedIDs, payloadsHashMultimap, s3Url, fileName);
setFullTextForMultiplePayloads(fileRelatedPayloads, s3Url);
numUploadedFiles ++;
} catch (Exception e) {
logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore, exception: " + e.getMessage(), e);
@ -277,24 +256,21 @@ public class FileUtils {
} catch (Exception e) {
logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMultimap, fileNamesForCurBatch);
failedBatches ++;
}
} // End of batches.
// Delete this assignments-num directory.
updateUrlReportsToHaveNoFullTextFiles(urlReports, true); // Make sure all records without an s3Url have < null > file-data (some batches or uploads might have failed).
deleteDirectory(curAssignmentsBaseDir);
// Check if none of the batches were handled..
if ( failedBatches == numOfBatches ) {
logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
return UploadFullTextsResponse.unsuccessful;
} else {
replaceNotUploadedFileLocations(urlReports); // Make sure all records without an s3Url have < null > file-data.
} else
return UploadFullTextsResponse.successful;
}
}
private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) {
baseUrl += batchNum + "/";
String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);
@ -317,6 +293,7 @@ public class FileUtils {
return conn;
}
private String getErrorMessageFromResponseBody(HttpURLConnection conn) {
StringBuilder errorMsgStrB = new StringBuilder(500);
try ( BufferedReader br = new BufferedReader(new InputStreamReader(conn.getErrorStream())) ) { // Try-with-resources
@ -336,6 +313,7 @@ public class FileUtils {
}
}
private List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch) {
int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch);
int endingIndex = (curBatch * numOfFullTextsPerBatch);
@ -353,6 +331,7 @@ public class FileUtils {
return fileNamesOfCurBatch;
}
private String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch) {
final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);
sb.append(baseUrl);
@ -365,6 +344,7 @@ public class FileUtils {
return sb.toString();
}
private final int bufferSize = 20971520; // 20 MB
public boolean saveZipFile(HttpURLConnection conn, File zipFile) {
@ -394,129 +374,47 @@ public class FileUtils {
}
}
private boolean isFileNameProblematic(String fileName, HashMultimap<String, Payload> payloadsHashMultimap) {
// Get the ID of the file.
Matcher matcher = FILENAME_ID.matcher(fileName);
if ( !matcher.matches() ) {
logger.error("The given fileName \"" + fileName + "\" was invalid! Could not be matched with matcher: " + matcher);
return true;
}
String fileID = matcher.group(1);
if ( (fileID == null) || fileID.isEmpty() ) {
logger.error("The given fileName \"" + fileName + "\" was invalid. No fileID was extracted!");
return true;
}
// Take the payloads which are related with this ID. An ID might have multiple original-urls, thus multiple payloads.
// The ID we have here, is the one from the first record which reached to this file.
// There might be other records pointing to this file. But, in order to mark this file as "valid", we have to match it with at least one of the records-IDs.
// We do this process to avoid handling and uploading irrelevant files which could find their way to the working directory (either because of a Worker's error or any other type of malfunction or even malicious action).
Set<Payload> payloads = payloadsHashMultimap.get(fileID);
if ( payloads.isEmpty() ) {
logger.error("The given fileID \"" + fileID + "\" was not part of the \"payloadsHashMultimap\"!");
return true;
}
// Search through the payloads to find at least one match, in order for this file to NOT be "problematic".
for ( Payload payload : payloads )
{
String location = payload.getLocation();
if ( (location != null) && location.endsWith(fileName) )
return false; // It's not problematic.
}
logger.error("None of the locations of the payloads matched with the ID \"" + fileID + "\" are ending with the filename \"" + fileName + "\", as it was supposed to.\nThe related payloads are: " + payloads);
return true;
}
/**
* This method updates the UrlReports to not point to any downloaded fullText files.
* This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails.
* Then, we don't want any "links" to locally stored files, which will be deleted.
* This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails, and we don't want any "links" to locally stored files, which will be deleted.
* If the "shouldCheckAndKeepS3UploadedFiles" is set to "true", then the payloads which have their file uploaded to the S3-ObjectStore, are excluded.
* @param urlReports
* @param shouldCheckAndKeepS3UploadedFiles
*/
public void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports) {
public void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports, boolean shouldCheckAndKeepS3UploadedFiles) {
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload != null )
setUnretrievedFullText(payload);
}
}
if ( payload == null )
continue;
private void replaceNotUploadedFileLocations(List<UrlReport> urlReports) {
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload != null ) {
if ( shouldCheckAndKeepS3UploadedFiles ) {
String fileLocation = payload.getLocation();
if ( (fileLocation != null) && (! s3ObjectStore.isLocationInStore(fileLocation)) )
setUnretrievedFullText(payload);
if ( (fileLocation == null) || s3ObjectStore.isLocationInStore(fileLocation) )
continue;
}
// Mark this full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text will be kept.
payload.setLocation(null);
payload.setHash(null);
payload.setMime_type(null);
payload.setSize(null);
}
}
public void updateUrlReportsForCurBatchTOHaveNoFullTextFiles(HashMultimap<String, Payload> payloadsHashMultimap, List<String> fileNames) {
for ( String fileName : fileNames ) {
// Get the ID of the file.
Matcher matcher = FILENAME_ID.matcher(fileName);
if ( !matcher.matches() ) {
continue;
}
String id = matcher.group(1);
if ( (id == null) || id.isEmpty() ) {
continue;
}
Set<Payload> payloads = payloadsHashMultimap.get(id);
// Set for all payloads connected to this ID.
for ( Payload payload : payloads )
if ( payload != null )
setUnretrievedFullText(payload); // It changes the payload in the original UrlReport list.
}
}
public void setUnretrievedFullText(Payload payload) {
// Mark the full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text will be kept.
payload.setLocation(null);
payload.setHash(null);
payload.setMime_type(null);
payload.setSize(null);
}
/**
* Set the fileLocation for all those IDs related to the File. The IDs may have one or more payloads.
* @param fileIDs
* @param payloadsHashMultimap
* Set the fileLocation for all those Payloads related to the File.
* @param filePayloads
* @param s3Url
* @param fileNameWithExt
*/
public void setFullTextForMultipleIDs(Set<String> fileIDs, HashMultimap<String, Payload> payloadsHashMultimap, String s3Url, String fileNameWithExt) {
for ( String id : fileIDs ) {
Set<Payload> payloads = payloadsHashMultimap.get(id);
if ( payloads.isEmpty() ) {
logger.error("The given id \"" + id + "\" (coming from the \"allFileNamesWithIDsHashMap\"), is not found inside the \"payloadsHashMultimap\"!");
continue;
}
for ( Payload payload : payloads ) {
// Update only for the records which led to a file, not all the records of this ID (an ID might have multiple original_urls pointing to different directions).
String currentFileLoc = payload.getLocation();
if ( currentFileLoc != null ) {
// Check that the current payload does not have a different file waiting to be uploaded. It is possible that multiple Payloads with the same ID, point to different files (because of different sourceUrls).
Matcher matcher = FILENAME_WITH_EXTENSION.matcher(currentFileLoc);
if ( matcher.matches() ) {
String curFileNameWithExtension = matcher.group(1);
if ( (curFileNameWithExtension != null) && !curFileNameWithExtension.isEmpty()
&& ! curFileNameWithExtension.equals(fileNameWithExt) ) { // If the file of this payload is NOT that same with the given one, then do NOT update it.
continue; // This different file, is waiting its upload-time, in the loop, where this method was called.
}
}
payload.setLocation(s3Url); // Update the file-location to the new S3-url. All the other file-data is already set from the Worker.
}
}
}
public void setFullTextForMultiplePayloads(Set<Payload> filePayloads, String s3Url) {
for ( Payload payload : filePayloads )
if ( payload != null )
payload.setLocation(s3Url); // Update the file-location to the new S3-url. All the other file-data is already set from the Worker.
}
public boolean deleteDirectory(File curBatchDir) {
try {
org.apache.commons.io.FileUtils.deleteDirectory(curBatchDir);
@ -526,4 +424,5 @@ public class FileUtils {
return false;
}
}
}