Split the full-texts-batch's main handling-code to two separate methods, which can be used in parallel by two threads, in the future.

This commit is contained in:
Lampros Smyrnaios 2023-06-14 17:16:38 +03:00
parent e2776c50d0
commit c37f157f51
1 changed files with 79 additions and 49 deletions

View File

@ -325,63 +325,44 @@ public class FileUtils {
int failedBatches = 0; int failedBatches = 0;
for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) { for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) {
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter); // TODO - Currently, for big assignments (e.g. 10000), it takes 2 mins (actually 1,5 mins after using the Zstandard compression) for the worker to zstd the files and return them FOR EACH BATCH
HttpURLConnection conn; // Also it takes around 3 mins for the Controller to process the received files FOR EACH BATCH
try { // So, for 24 batches, it takes around 24 * 2 * 3 = 144 mins to process all the full-texts for each assignments-batch.
conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId);
if ( conn == null ) { // TODO - What if we could passing a new thread for each full-texts-batch, and make them "FIRE" one after the other.
// TODO - So, the 1st thread with the first batch starts and waits for the the first zstd file from the worker,
// Once it takes the zstd file it continues, but now the Worker is just sitting waiting.. So the 2nd thread fires and asks the new zstd.
// So, the worker will never be left waiting and the Controller will not wait for the Worker either..!
// The worker will not have 2 parallel requests for zstd files, so the single CPU there will not be stressed to zstd many files in parallel.
// Yes the Controller may have the situation in which before finishing uploading the previously receive files to S3, it receives the new zstd from the Worker.
// TODO - BUT, we can make the new thread "WAIT" for the previous to finish.
String targetDirectory = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator;
Path curBatchPath;
try { // Create this batch-directory.
curBatchPath = Files.createDirectories(Paths.get(targetDirectory));
// The base-directory will be created along with the first batch-directory.
} catch (Exception e) {
logger.error("Could not create the \"curBatchPath\" directory: " + targetDirectory + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
failedBatches ++; failedBatches ++;
continue; // To the next batch. continue;
}
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter);
String zstdFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".tar.zstd";
try {
if ( ! getAndSaveFullTextBatch(fileNamesForCurBatch, baseUrl, assignmentsBatchCounter, batchCounter, numOfBatches, zstdFileFullPath, workerId) ) {
failedBatches ++;
continue;
} }
} catch (RuntimeException re) { } catch (RuntimeException re) {
// The "cause" was logged inside "getConnection()". failedBatches += (1 + (numOfBatches - batchCounter)); // The "failedBatches" will have the previously failedBatches + this one + the remaining batches which will likely fail too, thus, they will not be tested. Some initial batches may have succeeded.
failedBatches += (1 + (numOfBatches - batchCounter)); // The "failedBatches" will have the previously failedBatches + this one + the remaining batches which will likely fail too, thus, they will not be tested. break;
break; // The rest of the batches will likely fail as well.
} }
// Get the extracted files. if ( ! decompressAndUploadFullTexts(zstdFileFullPath, curBatchPath, targetDirectory, fileNamesForCurBatch, batchCounter, allFileNamesWithPayloads, assignmentsBatchCounter) )
String targetDirectory = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator;
Path curBatchPath = null;
try {
// Create this batch-directory.
curBatchPath = Files.createDirectories(Paths.get(targetDirectory));
// The base-directory will be created along with the first batch directory.
// Save and decompress the zstd file. Iterate over the PDFs and upload each one of them and get the S3-Url.
String zstdFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".tar.zstd";
File zstdFile = new File(zstdFileFullPath);
if ( ! saveArchive(conn, zstdFile) ) {
failedBatches ++; failedBatches ++;
continue; // To the next batch.
}
//logger.debug("The zstd file has been saved: " + zstdFileFullPath); // DEBUG!
// We do not call "conn.disconnect()", since more request are about to be made to the worker, in the near future.
fileDecompressor.decompressFiles(zstdFileFullPath, curBatchPath);
String[] extractedFileNames = new File(targetDirectory).list();
if ( (extractedFileNames == null) || (extractedFileNames.length <= 2) ) { // The directory might have only two files, the "tar-file" and the "tar.zstd-file", if the full-texts failed to be decompressed or untarred..
logger.error("No full-texts' fleNames where extracted from directory: " + targetDirectory);
failedBatches ++;
continue; // To the next batch.
}
else if ( (extractedFileNames.length -2) != fileNamesForCurBatch.size() ) {
logger.warn("The number of extracted files (" + (extractedFileNames.length -2) + ") was not equal to the number of the current-batch's (" + batchCounter + ") files (" + fileNamesForCurBatch.size() + ").");
// We do NOT have to find and cross-reference the missing files with the urlReports, in order to set their locations to <null>,
// since, in the end of each assignments-batch, an iteration will be made and for all the non-retrieved and non-uploaded full-texts, the app will set them to null.
}
uploadFullTexts(extractedFileNames, targetDirectory, allFileNamesWithPayloads);
} catch (Exception e) {
logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
failedBatches ++;
} finally {
if ( curBatchPath != null )
deleteDirectory(curBatchPath.toFile());
}
} // End of batches. } // End of batches.
updateUrlReportsToHaveNoFullTextFiles(urlReports, true); // Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed). updateUrlReportsToHaveNoFullTextFiles(urlReports, true); // Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed).
@ -395,6 +376,55 @@ public class FileUtils {
} }
private boolean getAndSaveFullTextBatch(List<String> fileNamesForCurBatch, String baseUrl, long assignmentsBatchCounter, int batchCounter, int numOfBatches,
String zstdFileFullPath, String workerId) throws RuntimeException
{
HttpURLConnection conn;
try {
if ( (conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId)) == null )
return false;
} catch (RuntimeException re) {
// The "cause" was logged inside "getConnection()".
throw re;
}
// Save and decompress the zstd file. Iterate over the PDFs and upload each one of them and get the S3-Url.
File zstdFile = new File(zstdFileFullPath);
if ( ! saveArchive(conn, zstdFile) )
return false;
//logger.trace("The zstd file has been saved: " + zstdFileFullPath); // DEBUG!
// We do not call "conn.disconnect()", since more request are about to be made to the worker, in the near future.
return true;
}
private boolean decompressAndUploadFullTexts(String zstdFileFullPath, Path curBatchPath, String targetDirectory, List<String> fileNamesForCurBatch, int batchCounter,
SetMultimap<String, Payload> allFileNamesWithPayloads, long assignmentsBatchCounter)
{
try {
fileDecompressor.decompressFiles(zstdFileFullPath, curBatchPath);
String[] extractedFileNames = new File(targetDirectory).list();
if ( (extractedFileNames == null) || (extractedFileNames.length <= 2) ) { // The directory might have only two files, the "tar-file" and the "tar.zstd-file", if the full-texts failed to be decompressed or untarred..
logger.error("No full-texts' fleNames where extracted from directory: " + targetDirectory);
return false;
} else if ( (extractedFileNames.length - 2) != fileNamesForCurBatch.size() ) {
logger.warn("The number of extracted files (" + (extractedFileNames.length - 2) + ") was not equal to the number of the current-batch's (" + batchCounter + ") files (" + fileNamesForCurBatch.size() + ").");
// We do NOT have to find and cross-reference the missing files with the urlReports, in order to set their locations to <null>,
// since, in the end of each assignments-batch, an iteration will be made and for all the non-retrieved and non-uploaded full-texts, the app will set them to null.
}
uploadFullTexts(extractedFileNames, targetDirectory, allFileNamesWithPayloads);
return true;
} catch (Exception e) {
logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
return false;
} finally {
deleteDirectory(curBatchPath.toFile());
}
}
private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) throws RuntimeException private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) throws RuntimeException
{ {
baseUrl += batchNum + "/"; baseUrl += batchNum + "/";