diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index 4816522..de5edf6 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -325,63 +325,44 @@ public class FileUtils { int failedBatches = 0; for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) { + // TODO - Currently, for big assignments (e.g. 10000), it takes 2 mins (actually 1,5 mins after using the Zstandard compression) for the worker to zstd the files and return them FOR EACH BATCH + // Also it takes around 3 mins for the Controller to process the received files FOR EACH BATCH + // So, for 24 batches, it takes around 24 * 2 * 3 = 144 mins to process all the full-texts for each assignments-batch. + + // TODO - What if we could passing a new thread for each full-texts-batch, and make them "FIRE" one after the other. + // TODO - So, the 1st thread with the first batch starts and waits for the the first zstd file from the worker, + // Once it takes the zstd file it continues, but now the Worker is just sitting waiting.. So the 2nd thread fires and asks the new zstd. + // So, the worker will never be left waiting and the Controller will not wait for the Worker either..! + + // The worker will not have 2 parallel requests for zstd files, so the single CPU there will not be stressed to zstd many files in parallel. + // Yes the Controller may have the situation in which before finishing uploading the previously receive files to S3, it receives the new zstd from the Worker. + // TODO - BUT, we can make the new thread "WAIT" for the previous to finish. + + String targetDirectory = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator; + Path curBatchPath; + try { // Create this batch-directory. + curBatchPath = Files.createDirectories(Paths.get(targetDirectory)); + // The base-directory will be created along with the first batch-directory. + } catch (Exception e) { + logger.error("Could not create the \"curBatchPath\" directory: " + targetDirectory + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6). + failedBatches ++; + continue; + } + List fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter); - HttpURLConnection conn; + String zstdFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".tar.zstd"; try { - conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId); - if ( conn == null ) { + if ( ! getAndSaveFullTextBatch(fileNamesForCurBatch, baseUrl, assignmentsBatchCounter, batchCounter, numOfBatches, zstdFileFullPath, workerId) ) { failedBatches ++; - continue; // To the next batch. + continue; } } catch (RuntimeException re) { - // The "cause" was logged inside "getConnection()". - failedBatches += (1 + (numOfBatches - batchCounter)); // The "failedBatches" will have the previously failedBatches + this one + the remaining batches which will likely fail too, thus, they will not be tested. - break; // The rest of the batches will likely fail as well. + failedBatches += (1 + (numOfBatches - batchCounter)); // The "failedBatches" will have the previously failedBatches + this one + the remaining batches which will likely fail too, thus, they will not be tested. Some initial batches may have succeeded. + break; } - // Get the extracted files. - String targetDirectory = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator; - Path curBatchPath = null; - try { - // Create this batch-directory. - curBatchPath = Files.createDirectories(Paths.get(targetDirectory)); - // The base-directory will be created along with the first batch directory. - - // Save and decompress the zstd file. Iterate over the PDFs and upload each one of them and get the S3-Url. - String zstdFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".tar.zstd"; - File zstdFile = new File(zstdFileFullPath); - - if ( ! saveArchive(conn, zstdFile) ) { - failedBatches ++; - continue; // To the next batch. - } - //logger.debug("The zstd file has been saved: " + zstdFileFullPath); // DEBUG! - - // We do not call "conn.disconnect()", since more request are about to be made to the worker, in the near future. - - fileDecompressor.decompressFiles(zstdFileFullPath, curBatchPath); - - String[] extractedFileNames = new File(targetDirectory).list(); - if ( (extractedFileNames == null) || (extractedFileNames.length <= 2) ) { // The directory might have only two files, the "tar-file" and the "tar.zstd-file", if the full-texts failed to be decompressed or untarred.. - logger.error("No full-texts' fleNames where extracted from directory: " + targetDirectory); - failedBatches ++; - continue; // To the next batch. - } - else if ( (extractedFileNames.length -2) != fileNamesForCurBatch.size() ) { - logger.warn("The number of extracted files (" + (extractedFileNames.length -2) + ") was not equal to the number of the current-batch's (" + batchCounter + ") files (" + fileNamesForCurBatch.size() + ")."); - // We do NOT have to find and cross-reference the missing files with the urlReports, in order to set their locations to , - // since, in the end of each assignments-batch, an iteration will be made and for all the non-retrieved and non-uploaded full-texts, the app will set them to null. - } - - uploadFullTexts(extractedFileNames, targetDirectory, allFileNamesWithPayloads); - - } catch (Exception e) { - logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6). + if ( ! decompressAndUploadFullTexts(zstdFileFullPath, curBatchPath, targetDirectory, fileNamesForCurBatch, batchCounter, allFileNamesWithPayloads, assignmentsBatchCounter) ) failedBatches ++; - } finally { - if ( curBatchPath != null ) - deleteDirectory(curBatchPath.toFile()); - } } // End of batches. updateUrlReportsToHaveNoFullTextFiles(urlReports, true); // Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed). @@ -395,6 +376,55 @@ public class FileUtils { } + private boolean getAndSaveFullTextBatch(List fileNamesForCurBatch, String baseUrl, long assignmentsBatchCounter, int batchCounter, int numOfBatches, + String zstdFileFullPath, String workerId) throws RuntimeException + { + HttpURLConnection conn; + try { + if ( (conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId)) == null ) + return false; + } catch (RuntimeException re) { + // The "cause" was logged inside "getConnection()". + throw re; + } + + // Save and decompress the zstd file. Iterate over the PDFs and upload each one of them and get the S3-Url. + File zstdFile = new File(zstdFileFullPath); + if ( ! saveArchive(conn, zstdFile) ) + return false; + + //logger.trace("The zstd file has been saved: " + zstdFileFullPath); // DEBUG! + + // We do not call "conn.disconnect()", since more request are about to be made to the worker, in the near future. + return true; + } + + + private boolean decompressAndUploadFullTexts(String zstdFileFullPath, Path curBatchPath, String targetDirectory, List fileNamesForCurBatch, int batchCounter, + SetMultimap allFileNamesWithPayloads, long assignmentsBatchCounter) + { + try { + fileDecompressor.decompressFiles(zstdFileFullPath, curBatchPath); + String[] extractedFileNames = new File(targetDirectory).list(); + if ( (extractedFileNames == null) || (extractedFileNames.length <= 2) ) { // The directory might have only two files, the "tar-file" and the "tar.zstd-file", if the full-texts failed to be decompressed or untarred.. + logger.error("No full-texts' fleNames where extracted from directory: " + targetDirectory); + return false; + } else if ( (extractedFileNames.length - 2) != fileNamesForCurBatch.size() ) { + logger.warn("The number of extracted files (" + (extractedFileNames.length - 2) + ") was not equal to the number of the current-batch's (" + batchCounter + ") files (" + fileNamesForCurBatch.size() + ")."); + // We do NOT have to find and cross-reference the missing files with the urlReports, in order to set their locations to , + // since, in the end of each assignments-batch, an iteration will be made and for all the non-retrieved and non-uploaded full-texts, the app will set them to null. + } + uploadFullTexts(extractedFileNames, targetDirectory, allFileNamesWithPayloads); + return true; + } catch (Exception e) { + logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6). + return false; + } finally { + deleteDirectory(curBatchPath.toFile()); + } + } + + private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List fileNamesForCurBatch, int totalBatches, String workerId) throws RuntimeException { baseUrl += batchNum + "/";