diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java index 5790df6..50e1167 100644 --- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java @@ -6,6 +6,7 @@ import eu.openaire.urls_controller.controllers.UrlsController; import eu.openaire.urls_controller.models.*; import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse; import eu.openaire.urls_controller.util.FileUtils; +import eu.openaire.urls_controller.util.FilesHandler; import eu.openaire.urls_controller.util.GenericUtils; import eu.openaire.urls_controller.util.ParquetFileUtils; import io.micrometer.core.annotation.Timed; @@ -49,6 +50,9 @@ public class UrlsServiceImpl implements UrlsService { @Autowired private JdbcTemplate jdbcTemplate; + @Autowired + private FilesHandler filesHandler; + @Autowired private FileUtils fileUtils; @@ -277,7 +281,7 @@ public class UrlsServiceImpl implements UrlsService { boolean hasFulltexts = true; // Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location". - FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, curWorkerId); + FileUtils.UploadFullTextsResponse uploadFullTextsResponse = filesHandler.getAndUploadFullTexts(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, curWorkerId); if ( uploadFullTextsResponse == null ) { // Nothing to post to the Worker, since we do not have the worker's info. // Rename the worker-report-file to indicate its "failure", so that the scheduler can pick it up and retry processing it. @@ -287,7 +291,7 @@ public class UrlsServiceImpl implements UrlsService { } else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) { logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignmentsCounter); // The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available. - fileUtils.removeUnretrievedFullTextsFromUrlReports(urlReports, false); + filesHandler.removeUnretrievedFullTextsFromUrlReports(urlReports, false); // We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store. // We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later. // The urls which give full-text (no matter if we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked-up to be checked again later. 
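For reference, the getAndUploadFullTexts(...) call delegated above (now implemented in the new FilesHandler, further down in this patch) splits the distinct full-text fileNames into URL-length-bounded batches of numOfFullTextsPerBatch (70 in the patch). A minimal, self-contained sketch of that arithmetic follows; the class and method names here are illustrative only, not part of the project:

import java.util.List;

public class BatchingSketch {
    private static final int numOfFullTextsPerBatch = 70; // same limit as in the patch (bounded by the request-URL length)

    // How many batches are needed: integer division, plus one extra batch when a remainder exists.
    static int numOfBatches(int numFullTextsToBeRequested) {
        int batches = (numFullTextsToBeRequested / numOfFullTextsPerBatch);
        if ( (numFullTextsToBeRequested % numOfFullTextsPerBatch) > 0 )
            batches ++;
        return batches;
    }

    // The fileNames belonging to the 1-based batch "curBatch"; the last batch may be smaller than the others.
    static List<String> fileNamesForBatch(List<String> allFileNames, int curBatch) {
        int fromIndex = ((curBatch - 1) * numOfFullTextsPerBatch);
        int toIndex = Math.min((curBatch * numOfFullTextsPerBatch), allFileNames.size());
        return allFileNames.subList(fromIndex, toIndex);
    }
}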
diff --git a/src/main/java/eu/openaire/urls_controller/util/FileDecompressor.java b/src/main/java/eu/openaire/urls_controller/util/FileDecompressor.java index 1ec746a..560b35b 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileDecompressor.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileDecompressor.java @@ -1,29 +1,64 @@ package eu.openaire.urls_controller.util; +import com.google.common.collect.SetMultimap; +import eu.openaire.urls_controller.models.Payload; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.File; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.List; @Component public class FileDecompressor { private static final Logger logger = LoggerFactory.getLogger(FileDecompressor.class); + @Autowired + private FileUtils fileUtils; + + + boolean decompressAndUploadFullTexts(String zstdFileFullPath, Path curBatchPath, String targetDirectory, List<String> fileNamesForCurBatch, int batchCounter, SetMultimap<String, Payload> allFileNamesWithPayloads, long assignmentsBatchCounter) + { + try { + decompressFiles(zstdFileFullPath, curBatchPath); + String[] extractedFileNames = new File(targetDirectory).list(); + if ( extractedFileNames == null ) { + logger.error("There was an error when acquiring the list of extracted files of directory: " + targetDirectory); + return false; + } else if ( extractedFileNames.length == 0 ) { // The directory might have only two files, the "tar-file" and the "tar.zstd-file", if the full-texts failed to be decompressed or untarred. + logger.error("No full-texts' fileNames were extracted from directory: " + targetDirectory); + return false; + } else if ( extractedFileNames.length != fileNamesForCurBatch.size() ) { + logger.warn("The number of extracted files (" + extractedFileNames.length + ") was not equal to the number of files (" + fileNamesForCurBatch.size() + ") of the current batch_" + batchCounter); + // We do NOT have to find and cross-reference the missing files with the urlReports, in order to set their locations to <null>, + // since, at the end of each assignments-batch, an iteration will be made and for all the non-retrieved and non-uploaded full-texts, the app will set them to null. + } + fileUtils.uploadFullTexts(extractedFileNames, targetDirectory, allFileNamesWithPayloads, batchCounter); + return true; + } catch (Exception e) { + logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + GenericUtils.endOfLine + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6). + return false; + } finally { + fileUtils.deleteDirectory(curBatchPath.toFile()); + } + } public void decompressFiles(String zstdSource, Path targetDir) throws Exception { // Decompress the zstd file. - Path tarPath = Paths.get(StringUtils.replace(zstdSource, ".zstd", "", 1)); // Remove the ".zstd" extension. + String tarPathStr = StringUtils.replace(zstdSource, ".zstd", "", 1); // Remove the ".zstd" extension.
+ Path tarPath = Paths.get(tarPathStr); int readByte = -1; try ( ZstdCompressorInputStream zsIn = new ZstdCompressorInputStream(new BufferedInputStream(Files.newInputStream(Paths.get(zstdSource)), FileUtils.tenMb)); @@ -31,6 +66,8 @@ public class FileDecompressor { { while ( (readByte = zsIn.read()) != -1 ) out.write(readByte); + } finally { + fileUtils.deleteFile(zstdSource); // Delete the initial zstd file. } // Now we have a decompressed tar-file, which we will Un-tar, in order to extract the full-text files. @@ -46,6 +83,8 @@ public class FileDecompressor { } // The exception will be given to the caller method. // No need to close the tarEntry (no "close()" method is provided). } + } finally { + fileUtils.deleteFile(tarPathStr); // Delete the decompressed tar file. } // Now we have a batch-directory which contains the tar-file along with the extracted full-text files. diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index 2b18cc4..5afc57b 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -3,11 +3,9 @@ package eu.openaire.urls_controller.util; import com.google.common.collect.HashMultimap; import com.google.common.collect.SetMultimap; import eu.openaire.urls_controller.configuration.DatabaseConnector; -import eu.openaire.urls_controller.controllers.UrlsController; import eu.openaire.urls_controller.models.Error; import eu.openaire.urls_controller.models.Payload; import eu.openaire.urls_controller.models.UrlReport; -import eu.openaire.urls_controller.models.WorkerInfo; import org.apache.commons.io.FileDeleteStrategy; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; @@ -21,15 +19,15 @@ import org.springframework.stereotype.Component; import java.io.*; import java.net.ConnectException; import java.net.HttpURLConnection; -import java.net.URL; import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.nio.file.Path; import java.nio.file.Paths; import java.sql.SQLException; -import java.text.DecimalFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; @@ -50,219 +48,29 @@ public class FileUtils { @Autowired private S3ObjectStore s3ObjectStore; - @Autowired - private FileDecompressor fileDecompressor; - - @Value("${services.pdfaggregation.worker.port}") - private String workerPort; - - public enum UploadFullTextsResponse {successful, successful_without_fulltexts, unsuccessful} - public String baseFilesLocation; public static final String workingDir = System.getProperty("user.dir") + File.separator; private final boolean isTestEnvironment; - public FileUtils (@Value("${services.pdfaggregation.controller.baseFilesLocation}") String baseFilesLocation, @Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment) { - if ( !baseFilesLocation.endsWith(File.separator) ) - baseFilesLocation += File.separator; - - if ( !baseFilesLocation.startsWith(File.separator) ) - baseFilesLocation = workingDir + baseFilesLocation; - - this.baseFilesLocation = baseFilesLocation; + public FileUtils (@Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment) { this.isTestEnvironment = 
isTestEnvironment; } - public static final DecimalFormat df = new DecimalFormat("0.00"); - // The following regex might be useful in a future scenario. It extracts the "plain-filename" and "file-ID" and the "file-extension". // Possible full-filenames are: "path1/path2/ID.pdf", "ID2.pdf", "path1/path2/ID(12).pdf", "ID2(25).pdf" public static final Pattern FILEPATH_ID_EXTENSION = Pattern.compile("([^.()]+/)?((([^/()]+)[^./]*)(\\.[\\w]{2,10}))$"); - private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames). - public static final ExecutorService hashMatchingExecutor = Executors.newFixedThreadPool(6); // TODO - Unify this ExecutorService with the hash-matching executorService. Since one will ALWAYS be called after the other. So why having two ExecServices to handle? - public UploadFullTextsResponse getAndUploadFullTexts(List urlReports, int sizeOfUrlReports, long assignmentsBatchCounter, String workerId) throws RuntimeException - { - // The Controller have to request the files from the Worker, in order to upload them to the S3. - // We UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database. - - String workerIp = null; - WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId); - if ( workerInfo == null ) { - logger.error("Could not retrieve the info of worker: " + workerId); - return null; - } else - workerIp = workerInfo.getWorkerIP(); // This won't be null. - - // Get the file-locations. - int numValidFullTextsFound = 0; - int numFilesFoundFromPreviousAssignmentsBatches = 0; - int numFullTextsWithProblematicLocations = 0; - - HashMultimap hashesWithPayloads = getHashesWithPayloads(urlReports, sizeOfUrlReports); // Holds multiple payloads for the same fileHash. - Set fileHashes = hashesWithPayloads.keySet(); - int fileHashesSetSize = fileHashes.size(); // Get the size of the keysSet, instead of the whole multimap. - if ( fileHashesSetSize == 0 ) { - logger.warn("No fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\"."); - return UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error. - } - - HashMap hashLocationMap = getHashLocationMap(fileHashes, fileHashesSetSize, assignmentsBatchCounter, "assignments"); - HashMultimap allFileNamesWithPayloads = HashMultimap.create((sizeOfUrlReports / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it. - - for ( String fileHash : fileHashes ) - { - for ( Payload payload : hashesWithPayloads.get(fileHash) ) - { - String alreadyFoundFileLocation = hashLocationMap.get(fileHash); // Only one location has been retrieved per fileHash. - if ( alreadyFoundFileLocation != null ) { - // Fill the payloads with locations from the "previously-found-hashes." - payload.setLocation(alreadyFoundFileLocation); - if ( logger.isTraceEnabled() ) - logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG! - numFilesFoundFromPreviousAssignmentsBatches ++; - numValidFullTextsFound ++; // We trust the location being valid.. - } - else { // This file has not been found before.. - // Extract the "fileNameWithExtension" to be added in the HashMultimap. - String fileLocation = payload.getLocation(); - Matcher matcher = FILEPATH_ID_EXTENSION.matcher(fileLocation); - if ( ! 
matcher.matches() ) { - logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILEPATH_ID_EXTENSION); - numFullTextsWithProblematicLocations ++; - continue; - } - String fileNameWithExtension = matcher.group(2); - if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) { - logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILEPATH_ID_EXTENSION); - numFullTextsWithProblematicLocations ++; - continue; - } - - numValidFullTextsFound ++; - allFileNamesWithPayloads.put(fileNameWithExtension, payload); // The keys and the values are not duplicate. - // Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1, thus, the ID-2 file was not downloaded again. - } - } - } - - if ( numFullTextsWithProblematicLocations > 0 ) - logger.warn(numFullTextsWithProblematicLocations + " files had problematic names."); - - if ( numValidFullTextsFound == 0 ) { - logger.warn("No full-text files were retrieved for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId); - return UploadFullTextsResponse.successful_without_fulltexts; // It's not what we want, but it's not an error either. - } - - ArrayList allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet()); // The number of fulltexts are lower than the number of payloads, since multiple payloads may lead to the same file. - int numFullTextsToBeRequested = allFileNames.size(); - if ( numFullTextsToBeRequested == 0 ) { - logger.info(numValidFullTextsFound + " fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\", but all of them have been retrieved before."); - return UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error. - } - - logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numValidFullTextsFound + " (out of " + sizeOfUrlReports + " | about " + df.format(numValidFullTextsFound * 100.0 / sizeOfUrlReports) + "%)."); - - // TODO - Have a prometheus GAUGE to hold the value of the above percentage, so that we can track the success-rates over time.. - - logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches); - - // Request the full-texts in batches, compressed in a zstd tar file. - int numOfBatches = (numFullTextsToBeRequested / numOfFullTextsPerBatch); - int remainingFiles = (numFullTextsToBeRequested % numOfFullTextsPerBatch); - if ( remainingFiles > 0 ) { // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are. - numOfBatches++; - logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). 
Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files)."); - } else - logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each)."); - - // Check if one full text is left out because of the division. Put it int the last batch. - String baseUrl = "http://" + workerIp + ":" + workerPort + "/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/"; - - // TODO - The worker should send the port in which it accepts requests, along with the current request. - // TODO - The least we have to do it to expose the port-assignment somewhere more obvious like inside the "application.yml" file. - - String curAssignmentsBaseLocation = baseFilesLocation + "assignments_" + assignmentsBatchCounter + File.separator; - // Note: the "curAssignmentsBaseLocation"-directory will be created once the first batch subdirectory is called for creation. - - int failedBatches = 0; - for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) { - // TODO - Currently, for big assignments (e.g. 10000), it takes 2 mins (actually 1,5 mins after using the Zstandard compression) for the worker to zstd the files and return them FOR EACH BATCH - // Also it takes around 3 mins for the Controller to process the received files FOR EACH BATCH - // So, for 24 batches, it takes around 24 * 2 * 3 = 144 mins to process all the full-texts for each assignments-batch. - - // TODO - What if we could passing a new thread for each full-texts-batch, and make them "FIRE" one after the other. - // TODO - So, the 1st thread with the first batch starts and waits for the the first zstd file from the worker, - // Once it takes the zstd file it continues, but now the Worker is just sitting waiting.. So the 2nd thread fires and asks the new zstd. - // So, the worker will never be left waiting and the Controller will not wait for the Worker either..! - - // The worker will not have 2 parallel requests for zstd files, so the single CPU there will not be stressed to zstd many files in parallel. - // Yes the Controller may have the situation in which before finishing uploading the previously receive files to S3, it receives the new zstd from the Worker. - // TODO - BUT, we can make the new thread "WAIT" for the previous to finish. - - String targetDirectory = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator; - Path curBatchPath; - try { // Create this batch-directory. - curBatchPath = Files.createDirectories(Paths.get(targetDirectory)); - // The base-directory will be created along with the first batch-directory. - } catch (Exception e) { - logger.error("Could not create the \"curBatchPath\" directory: " + targetDirectory + GenericUtils.endOfLine + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6). - failedBatches ++; - continue; - } - - List fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numFullTextsToBeRequested, batchCounter); - String zstdFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".tar.zstd"; - try { - if ( ! 
getAndSaveFullTextBatch(fileNamesForCurBatch, baseUrl, assignmentsBatchCounter, batchCounter, numOfBatches, zstdFileFullPath, workerId) ) { - failedBatches ++; - continue; - } - } catch (RuntimeException re) { - failedBatches += (1 + (numOfBatches - batchCounter)); // The "failedBatches" will have the previously failedBatches + this one + the remaining batches which will likely fail too, thus, they will not be tested. Some initial batches may have succeeded. - break; - } - - if ( ! decompressAndUploadFullTexts(zstdFileFullPath, curBatchPath, targetDirectory, fileNamesForCurBatch, batchCounter, allFileNamesWithPayloads, assignmentsBatchCounter) ) - failedBatches ++; - } // End of batches. - - if ( failedBatches == numOfBatches ) - logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId); - - removeUnretrievedFullTextsFromUrlReports(urlReports, true); // Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed). - deleteDirectory(new File(curAssignmentsBaseLocation)); - - // Check and warn about the number of failed payloads. - // Possible reasons: failed to check their hash in the DB, the file was not found inside the worker, whole batch failed to be delivered from the worker, files failed t be uploaded to S3 - // Retrieve the payloads from the existing urlReports. - long finalPayloadsCounter = urlReports.parallelStream() - .map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null))) - .count(); - int numInitialPayloads = (numValidFullTextsFound + numFullTextsWithProblematicLocations); - long numFailedPayloads = (numInitialPayloads - finalPayloadsCounter); - if ( numFailedPayloads == numInitialPayloads ) { - // This will also be the case if there was no DB failure, but all the batches have failed. - logger.error("None of the " + numInitialPayloads + " payloads could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId); - return UploadFullTextsResponse.unsuccessful; - } else if ( numFailedPayloads > 0 ) - logger.warn(numFailedPayloads + " payloads (out of " + numInitialPayloads + ") failed to be processed for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId); - - return UploadFullTextsResponse.successful; - } - - public HashMultimap getHashesWithPayloads(List urlReports, int sizeOfUrlReports) { HashMultimap hashesWithPayloads = HashMultimap.create((sizeOfUrlReports / 5), 3); // Holds multiple payloads for the same fileHash. @@ -333,113 +141,20 @@ public class FileUtils { } - private boolean getAndSaveFullTextBatch(List fileNamesForCurBatch, String baseUrl, long assignmentsBatchCounter, int batchCounter, int numOfBatches, - String zstdFileFullPath, String workerId) throws RuntimeException - { - HttpURLConnection conn; - try { - if ( (conn = getConnectionForFullTextBatch(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId)) == null ) - return false; - } catch (RuntimeException re) { - // The "cause" was logged inside "getConnection()". - throw re; - } - - // Save and decompress the zstd file. Iterate over the PDFs and upload each one of them and get the S3-Url. - return saveArchive(conn, new File(zstdFileFullPath)); - // We do not call "conn.disconnect()", since more request are about to be made to the worker, in the near future. 
- } - - - private boolean decompressAndUploadFullTexts(String zstdFileFullPath, Path curBatchPath, String targetDirectory, List fileNamesForCurBatch, int batchCounter, - SetMultimap allFileNamesWithPayloads, long assignmentsBatchCounter) - { - try { - fileDecompressor.decompressFiles(zstdFileFullPath, curBatchPath); - String[] extractedFileNames = new File(targetDirectory).list(); - if ( (extractedFileNames == null) || (extractedFileNames.length <= 2) ) { // The directory might have only two files, the "tar-file" and the "tar.zstd-file", if the full-texts failed to be decompressed or untarred.. - logger.error("No full-texts' fleNames where extracted from directory: " + targetDirectory); - return false; - } else if ( (extractedFileNames.length - 2) != fileNamesForCurBatch.size() ) { - logger.warn("The number of extracted files (" + (extractedFileNames.length - 2) + ") was not equal to the number of files (" + fileNamesForCurBatch.size() + ") of the current batch_" + batchCounter); - // We do NOT have to find and cross-reference the missing files with the urlReports, in order to set their locations to , - // since, in the end of each assignments-batch, an iteration will be made and for all the non-retrieved and non-uploaded full-texts, the app will set them to null. - } - uploadFullTexts(extractedFileNames, targetDirectory, allFileNamesWithPayloads, batchCounter); - return true; - } catch (Exception e) { - logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + GenericUtils.endOfLine + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6). - return false; - } finally { - deleteDirectory(curBatchPath.toFile()); - } - } - - - private HttpURLConnection getConnectionForFullTextBatch(String baseUrl, long assignmentsBatchCounter, int batchNum, List fileNamesForCurBatch, int totalBatches, String workerId) throws RuntimeException - { - baseUrl += batchNum + "/"; - String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch); - //logger.debug("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]"); - try { - HttpURLConnection conn = (HttpURLConnection) new URL(requestUrl).openConnection(); - conn.setRequestMethod("GET"); - conn.setRequestProperty("User-Agent", "UrlsController"); - - // TODO - Write the fileNames in the RequestBody, so that we can include as many as we want in each request. - // Right now, we can include only up to 70-80 fileNames in the url-string. - // TODO - We need to add the fileNames in the requestBody BEFORE we connect. So we will need to refactor the code to work in that order. - /*OutputStream os = conn.getOutputStream(); - OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8); - osw.write(fileNamesForCurBatch_separated_by_comma); - osw.flush(); - osw.close(); - os.close();*/ - - - // TODO - The above update will also enable is to improve edge-case management like making sure we do not create a whole new batch for just a few files.. 
- // Check this case for example, where we have one extra batch with the network, compression-decompression, transfer, uploading ect overhead: - // 2023-02-06 12:17:26.893 [http-nio-1880-exec-8] DEBUG e.o.urls_controller.util.FileUtils.getAndUploadFullTexts(@235) - The assignments_12 have 211 distinct non-already-uploaded fullTexts. - // Going to request them from the Worker "worker_X", in 4 batches (70 files each, except for the final batch, which will have 1 files). - - // If we are not limited by the url-length we can easily say that if less than 10 files remain for the last batch, then add them to the previous batch (eg. the last batch will have 79 files) - // If equal to 10 or more files remain, then we will make an extra batch. - - conn.connect(); - int statusCode = conn.getResponseCode(); - if ( statusCode == -1 ) { // Invalid HTTP-Response. - logger.warn("Problem when getting the \"status-code\" for url: " + requestUrl); - throw new RuntimeException(); // Avoid any other batches. - } else if ( statusCode != 200 ) { - String errMsg = getMessageFromResponseBody(conn, true); - logger.warn("HTTP-" + statusCode + ": " + errMsg + "\n\nProblem when requesting the ZstdFile of batch_" + batchNum + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl); - if ( ((statusCode >= 500) && (statusCode <= 599)) - || ((statusCode == 400) && ((errMsg != null) && errMsg.contains("The base directory for assignments_" + assignmentsBatchCounter + " was not found"))) ) - throw new RuntimeException(); // Throw an exception to indicate that the Worker has problems and all remaining batches will fail as well. - return null; - } else - return conn; - } catch (RuntimeException re) { - throw re; - } catch (Exception e) { - String exMessage = e.getMessage(); - logger.warn("Problem when requesting the ZstdFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + GenericUtils.endOfLine + exMessage); - if ( exMessage.contains("Connection refused") ) { - logger.error("Since we received a \"Connection refused\", from \"" + workerId + "\", all of the remaining batches (" + (totalBatches - batchNum) + ") will not be requested!"); - throw new RuntimeException(); - } - return null; - } - } - - - private void uploadFullTexts(String[] fileNames, String targetDirectory, SetMultimap<String, Payload> allFileNamesWithPayloads, int batchCounter) + void uploadFullTexts(String[] fileNames, String targetDirectory, SetMultimap<String, Payload> allFileNamesWithPayloads, int batchCounter) { // Iterate over the files and upload them to S3. + + // TODO - Make the uploads run in parallel, using at most 4 threads. + // There might also be uploads from other assignment-batches (from other workers) running at the same time. + // But still, there are currently at most 3 (?) upload operations, at the same time, each using 1 thread. + // It's most common to have just 1 or 2 of them. So each can easily use 2 or 4 threads to upload its own files. + //int numUploadedFiles = 0; for ( String fileName : fileNames ) { - if ( fileName.contains(".tar") ) // Exclude the tar-files from uploading (".tar" and ".tar.zstd"). + // Check if any of the ".tar" files (".tar" and ".tar.zstd") are still in the directory (in case their deletion failed) and exclude them from being uploaded. + if ( fileName.contains(".tar") ) continue; // Check if this stored file is related to one or more Payloads from the Set. Defend against malicious file injection.
It does not add more overhead, since we already need the "fileRelatedPayloads". @@ -514,8 +229,7 @@ public class FileUtils { // Else, the record will have its file-data set to "null", in the end of the caller method (as it will not have an s3Url as its location). }//end filenames-for-loop - //logger.debug("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -2) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore."); - // (fileNames.length -2) --> minus the zstd and the tar files. + //logger.debug("Finished uploading " + numUploadedFiles + " full-texts (out of " + fileNames.length + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore."); } @@ -581,46 +295,13 @@ public class FileUtils { } - private List getFileNamesForBatch(List allFileNames, int numAllFullTexts, int curBatch) - { - int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch); - int endingIndex = (curBatch * numOfFullTextsPerBatch); - if ( endingIndex > numAllFullTexts ) // This might be the case, when the "numAllFullTexts" is too small. - endingIndex = numAllFullTexts; - - final List fileNamesOfCurBatch = new ArrayList<>(numOfFullTextsPerBatch); - for ( int i = initialIndex; i < endingIndex; ++i ) { - try { - fileNamesOfCurBatch.add(allFileNames.get(i)); - } catch (IndexOutOfBoundsException ioobe) { - logger.error("IOOBE for i=" + i + GenericUtils.endOfLine + ioobe.getMessage(), ioobe); - } - } - return fileNamesOfCurBatch; - } - - - private String getRequestUrlForBatch(String baseUrl, List fileNamesForCurBatch) - { - final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50); - sb.append(baseUrl); - int numFullTextsCurBatch = fileNamesForCurBatch.size(); - for ( int j=0; j < numFullTextsCurBatch; ++j ){ - sb.append(fileNamesForCurBatch.get(j)); - if ( j < (numFullTextsCurBatch -1) ) - sb.append(","); - } - return sb.toString(); - } - - public static final int twentyFiveKb = 25_600; // 25 Kb public static final int halfMb = 524_288; // 0.5 Mb = 512 Kb = 524_288 bytes public static final int tenMb = (10 * 1_048_576); - public boolean saveArchive(HttpURLConnection conn, File zstdFile) + public boolean saveArchive(InputStream inputStream, File zstdFile) { - try ( BufferedInputStream inStream = new BufferedInputStream(conn.getInputStream(), tenMb); + try ( BufferedInputStream inStream = new BufferedInputStream(inputStream, tenMb); BufferedOutputStream outStream = new BufferedOutputStream(Files.newOutputStream(zstdFile.toPath()), tenMb) ) { int readBytes; @@ -635,36 +316,6 @@ public class FileUtils { } - /** - * This method updates the UrlReports to not point to any downloaded fullText files. - * This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails, and we don't want any "links" to locally stored files, which will be deleted. - * If the "shouldCheckAndKeepS3UploadedFiles" is set to "true", then the payloads which have their file uploaded to the S3-ObjectStore, are excluded. 
- * @param urlReports - * @param shouldCheckAndKeepS3UploadedFiles - */ - public void removeUnretrievedFullTextsFromUrlReports(List urlReports, boolean shouldCheckAndKeepS3UploadedFiles) - { - for ( UrlReport urlReport : urlReports ) { - Payload payload = urlReport.getPayload(); - if ( payload == null ) - continue; - - if ( shouldCheckAndKeepS3UploadedFiles ) { - String fileLocation = payload.getLocation(); - if ( (fileLocation == null) || s3ObjectStore.isLocationInStore(fileLocation) ) - continue; - } - - // Mark this full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text ("actual_url") will be kept, for now. - payload.setLocation(null); // This will cause the payload to not be inserted into the "payload" table in the database. Only the "attempt" record will be inserted. - payload.setHash(null); - payload.setMime_type(null); - payload.setSize(null); - // The id-url record will be called as a new assignment in the future. - } - } - - /** * This method searches the backlog of publications for the ones that have the same "original_url" or their "original_url" is equal to the "actual_url" or an existing payload. * Then, it generates a new "UrlReport" object, which has as a payload a previously generated one, which has different "id", "original_url". diff --git a/src/main/java/eu/openaire/urls_controller/util/FilesHandler.java b/src/main/java/eu/openaire/urls_controller/util/FilesHandler.java new file mode 100644 index 0000000..ab9dd12 --- /dev/null +++ b/src/main/java/eu/openaire/urls_controller/util/FilesHandler.java @@ -0,0 +1,404 @@ +package eu.openaire.urls_controller.util; + +import com.google.common.collect.HashMultimap; +import eu.openaire.urls_controller.controllers.UrlsController; +import eu.openaire.urls_controller.models.Payload; +import eu.openaire.urls_controller.models.UrlReport; +import eu.openaire.urls_controller.models.WorkerInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.regex.Matcher; + +@Component +public class FilesHandler { + + private static final Logger logger = LoggerFactory.getLogger(FilesHandler.class); + + @Autowired + private FileUtils fileUtils; + + @Autowired + private FileDecompressor fileDecompressor; + + @Autowired + private S3ObjectStore s3ObjectStore; + + @Value("${services.pdfaggregation.worker.port}") + private String workerPort; + + public static final DecimalFormat df = new DecimalFormat("0.00"); + + + private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames). 
+ + public String baseFilesLocation; + + + public FilesHandler (@Value("${services.pdfaggregation.controller.baseFilesLocation}") String baseFilesLocation) { + if ( !baseFilesLocation.endsWith(File.separator) ) + baseFilesLocation += File.separator; + + if ( !baseFilesLocation.startsWith(File.separator) ) + baseFilesLocation = FileUtils.workingDir + baseFilesLocation; + + this.baseFilesLocation = baseFilesLocation; + } + + + private HttpURLConnection getConnectionForFullTextBatch(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) throws RuntimeException + { + baseUrl += batchNum + "/"; + String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch); + //logger.debug("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]"); + try { + HttpURLConnection conn = (HttpURLConnection) new URL(requestUrl).openConnection(); + conn.setRequestMethod("GET"); + conn.setRequestProperty("User-Agent", "UrlsController"); + + // TODO - Write the fileNames in the RequestBody, so that we can include as many as we want in each request. + // Right now, we can include only up to 70-80 fileNames in the url-string. + // TODO - We need to add the fileNames in the requestBody BEFORE we connect. So we will need to refactor the code to work in that order. + /*OutputStream os = conn.getOutputStream(); + OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8); + osw.write(fileNamesForCurBatch_separated_by_comma); + osw.flush(); + osw.close(); + os.close();*/ + + + // TODO - The above update will also enable us to improve edge-case management, like making sure we do not create a whole new batch for just a few files.. + // Check this case for example, where we have one extra batch with the network, compression-decompression, transfer, uploading etc. overhead: + // 2023-02-06 12:17:26.893 [http-nio-1880-exec-8] DEBUG e.o.urls_controller.util.FileUtils.getAndUploadFullTexts(@235) - The assignments_12 have 211 distinct non-already-uploaded fullTexts. + // Going to request them from the Worker "worker_X", in 4 batches (70 files each, except for the final batch, which will have 1 files). + + // If we are not limited by the url-length we can easily say that if less than 10 files remain for the last batch, then add them to the previous batch (e.g. the last batch will have 79 files) + // If equal to 10 or more files remain, then we will make an extra batch. + + conn.connect(); + int statusCode = conn.getResponseCode(); + if ( statusCode == -1 ) { // Invalid HTTP-Response. + logger.warn("Problem when getting the \"status-code\" for url: " + requestUrl); + throw new RuntimeException(); // Avoid any other batches.
+ } else if ( statusCode != 200 ) { + String errMsg = fileUtils.getMessageFromResponseBody(conn, true); + logger.warn("HTTP-" + statusCode + ": " + errMsg + "\n\nProblem when requesting the ZstdFile of batch_" + batchNum + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl); + if ( ((statusCode >= 500) && (statusCode <= 599)) + || ((statusCode == 400) && ((errMsg != null) && errMsg.contains("The base directory for assignments_" + assignmentsBatchCounter + " was not found"))) ) + throw new RuntimeException(); // Throw an exception to indicate that the Worker has problems and all remaining batches will fail as well. + return null; + } else + return conn; + } catch (RuntimeException re) { + throw re; + } catch (Exception e) { + String exMessage = e.getMessage(); + logger.warn("Problem when requesting the ZstdFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + GenericUtils.endOfLine + exMessage); + if ( exMessage.contains("Connection refused") ) { + logger.error("Since we received a \"Connection refused\", from \"" + workerId + "\", all of the remaining batches (" + (totalBatches - batchNum) + ") will not be requested!"); + throw new RuntimeException(); + } + return null; + } + } + + + public FileUtils.UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, int sizeOfUrlReports, long assignmentsBatchCounter, String workerId) throws RuntimeException + { + // The Controller has to request the files from the Worker, in order to upload them to S3. + // We UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database. + + String workerIp = null; + WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId); + if ( workerInfo == null ) { + logger.error("Could not retrieve the info of worker: " + workerId); + return null; + } else + workerIp = workerInfo.getWorkerIP(); // This won't be null. + + // Get the file-locations. + int numValidFullTextsFound = 0; + int numFilesFoundFromPreviousAssignmentsBatches = 0; + int numFullTextsWithProblematicLocations = 0; + + HashMultimap<String, Payload> hashesWithPayloads = fileUtils.getHashesWithPayloads(urlReports, sizeOfUrlReports); // Holds multiple payloads for the same fileHash. + Set<String> fileHashes = hashesWithPayloads.keySet(); + int fileHashesSetSize = fileHashes.size(); // Get the size of the keysSet, instead of the whole multimap. + if ( fileHashesSetSize == 0 ) { + logger.warn("No fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\"."); + return FileUtils.UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error. + } + + HashMap<String, String> hashLocationMap = fileUtils.getHashLocationMap(fileHashes, fileHashesSetSize, assignmentsBatchCounter, "assignments"); + HashMultimap<String, Payload> allFileNamesWithPayloads = HashMultimap.create((sizeOfUrlReports / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it. + + for ( String fileHash : fileHashes ) + { + for ( Payload payload : hashesWithPayloads.get(fileHash) ) + { + String alreadyFoundFileLocation = hashLocationMap.get(fileHash); // Only one location has been retrieved per fileHash. + if ( alreadyFoundFileLocation != null ) { + // Fill the payloads with locations from the "previously-found-hashes."
+ payload.setLocation(alreadyFoundFileLocation); + if ( logger.isTraceEnabled() ) + logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG! + numFilesFoundFromPreviousAssignmentsBatches ++; + numValidFullTextsFound ++; // We trust that the location is valid. + } + else { // This file has not been found before.. + // Extract the "fileNameWithExtension" to be added to the HashMultimap. + String fileLocation = payload.getLocation(); + Matcher matcher = FileUtils.FILEPATH_ID_EXTENSION.matcher(fileLocation); + if ( ! matcher.matches() ) { + logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FileUtils.FILEPATH_ID_EXTENSION); + numFullTextsWithProblematicLocations ++; + continue; + } + String fileNameWithExtension = matcher.group(2); + if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) { + logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FileUtils.FILEPATH_ID_EXTENSION); + numFullTextsWithProblematicLocations ++; + continue; + } + + numValidFullTextsFound ++; + allFileNamesWithPayloads.put(fileNameWithExtension, payload); // The keys and the values are not duplicated. + // Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as pdf-url-2 might be the same as pdf-url-1; thus, the ID-2 file was not downloaded again. + } + } + } + + if ( numFullTextsWithProblematicLocations > 0 ) + logger.warn(numFullTextsWithProblematicLocations + " files had problematic names."); + + if ( numValidFullTextsFound == 0 ) { + logger.warn("No full-text files were retrieved for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId); + return FileUtils.UploadFullTextsResponse.successful_without_fulltexts; // It's not what we want, but it's not an error either. + } + + ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet()); // The number of fulltexts is lower than the number of payloads, since multiple payloads may lead to the same file. + int numFullTextsToBeRequested = allFileNames.size(); + if ( numFullTextsToBeRequested == 0 ) { + logger.info(numValidFullTextsFound + " fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\", but all of them have been retrieved before."); + return FileUtils.UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error. + } + + logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numValidFullTextsFound + " (out of " + sizeOfUrlReports + " | about " + df.format(numValidFullTextsFound * 100.0 / sizeOfUrlReports) + "%)."); + + // TODO - Have a prometheus GAUGE to hold the value of the above percentage, so that we can track the success-rates over time.. + + logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches); + + // Request the full-texts in batches, compressed in a zstd tar file.
+ int numOfBatches = (numFullTextsToBeRequested / numOfFullTextsPerBatch); + int remainingFiles = (numFullTextsToBeRequested % numOfFullTextsPerBatch); + if ( remainingFiles > 0 ) { // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are. + numOfBatches++; + logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files)."); + } else + logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each)."); + + // Check if one full text is left out because of the division. Put it in the last batch. + String baseUrl = "http://" + workerIp + ":" + workerPort + "/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/"; + + // TODO - The worker should send the port on which it accepts requests, along with the current request. + // TODO - The least we have to do is to expose the port-assignment somewhere more obvious, like inside the "application.yml" file. + + String curAssignmentsBaseLocation = baseFilesLocation + "assignments_" + assignmentsBatchCounter + File.separator; + // Note: the "curAssignmentsBaseLocation"-directory will be created once the first batch subdirectory is called for creation. + + int failedBatches = 0; + for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) { + // TODO - Currently, for big assignments (e.g. 10000), it takes 2 mins (actually 1.5 mins after using the Zstandard compression) for the worker to zstd the files and return them FOR EACH BATCH + // Also it takes around 3 mins for the Controller to process the received files FOR EACH BATCH + // So, for 24 batches, it takes around 24 * (2 + 3) = 120 mins to process all the full-texts for each assignments-batch. + + // TODO - What if we could spawn a new thread for each full-texts-batch, and make them "FIRE" one after the other. + // TODO - So, the 1st thread with the first batch starts and waits for the first zstd file from the worker, + // Once it takes the zstd file it continues, but now the Worker is just sitting waiting.. So the 2nd thread fires and asks for the new zstd. + // So, the worker will never be left waiting and the Controller will not wait for the Worker either..! + + // The worker will not have 2 parallel requests for zstd files, so the single CPU there will not be stressed to zstd many files in parallel. + // Yes, the Controller may end up in the situation where, before it finishes uploading the previously received files to S3, it receives the new zstd from the Worker. + // TODO - BUT, we can make the new thread "WAIT" for the previous to finish. + + String targetDirectory = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator; + Path curBatchPath; + try { // Create this batch-directory. + curBatchPath = Files.createDirectories(Paths.get(targetDirectory)); + // The base-directory will be created along with the first batch-directory.
+ } catch (Exception e) { + logger.error("Could not create the \"curBatchPath\" directory: " + targetDirectory + GenericUtils.endOfLine + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6). + failedBatches ++; + continue; + } + + List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numFullTextsToBeRequested, batchCounter); + String zstdFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".tar.zstd"; + try { + if ( ! getAndSaveFullTextBatch(fileNamesForCurBatch, baseUrl, assignmentsBatchCounter, batchCounter, numOfBatches, zstdFileFullPath, workerId) ) { + failedBatches ++; + continue; + } + } catch (RuntimeException re) { + failedBatches += (1 + (numOfBatches - batchCounter)); // The "failedBatches" will have the previously failedBatches + this one + the remaining batches which will likely fail too, thus, they will not be tested. Some initial batches may have succeeded. + break; + } + + if ( ! fileDecompressor.decompressAndUploadFullTexts(zstdFileFullPath, curBatchPath, targetDirectory, fileNamesForCurBatch, batchCounter, allFileNamesWithPayloads, assignmentsBatchCounter) ) + failedBatches ++; + } // End of batches. + + if ( failedBatches == numOfBatches ) + logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId); + + removeUnretrievedFullTextsFromUrlReports(urlReports, true); // Make sure all records without an S3-Url have <null> file-data (some batches or uploads might have failed). + fileUtils.deleteDirectory(new File(curAssignmentsBaseLocation)); + + // Check and warn about the number of failed payloads. + // Possible reasons: failed to check their hash in the DB, the file was not found inside the worker, the whole batch failed to be delivered from the worker, files failed to be uploaded to S3 + // Retrieve the payloads from the existing urlReports. + long finalPayloadsCounter = urlReports.parallelStream() + .map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null))) + .count(); + int numInitialPayloads = (numValidFullTextsFound + numFullTextsWithProblematicLocations); + long numFailedPayloads = (numInitialPayloads - finalPayloadsCounter); + if ( numFailedPayloads == numInitialPayloads ) { + // This will also be the case if there was no DB failure, but all the batches have failed. + logger.error("None of the " + numInitialPayloads + " payloads could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId); + return FileUtils.UploadFullTextsResponse.unsuccessful; + } else if ( numFailedPayloads > 0 ) + logger.warn(numFailedPayloads + " payloads (out of " + numInitialPayloads + ") failed to be processed for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId); + + return FileUtils.UploadFullTextsResponse.successful; + } + + + private boolean getAndSaveFullTextBatch(List<String> fileNamesForCurBatch, String baseUrl, long assignmentsBatchCounter, int batchCounter, int numOfBatches, + String zstdFileFullPath, String workerId) throws RuntimeException + { + HttpURLConnection conn; + InputStream inputStream; + try { + if ( (conn = getConnectionForFullTextBatch(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId)) == null ) + return false; + + inputStream = conn.getInputStream(); + } catch (RuntimeException re) { + // The "cause" was logged inside "getConnection()".
+ throw re; + } catch (IOException ioe) { + throw new RuntimeException(ioe.getMessage()); + } + + // Save and decompress the zstd file. Iterate over the PDFs and upload each one of them and get the S3-Url. + return fileUtils.saveArchive(inputStream, new File(zstdFileFullPath)); + // We do not call "conn.disconnect()", since more requests are about to be made to the worker, in the near future. + + //private static final RestTemplate restTemplate = new RestTemplate(); + + + //baseUrl += batchCounter + "/"; + //String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch); + + // Define a response extractor that returns an input stream + //ResponseExtractor<InputStream> responseExtractor = HttpInputMessage::getBody; + + /*ResponseExtractor<InputStream> responseExtractor = new ResponseExtractor<InputStream>() { + @Override + public InputStream extractData(ClientHttpResponse response) throws IOException { + return response.getBody(); + } + }; + + try { + //ResponseEntity<Object> responseEntity = restTemplate.exchange(requestUrl, HttpMethod.GET, null, Object.class); + InputStream inputStream = restTemplate.exchange(requestUrl, HttpMethod.GET, null, responseExtractor); + //InputStream inputStream = (InputStream) responseEntity.getBody(); + return saveArchive(inputStream, new File(zstdFileFullPath)); + } catch (Exception e) { + logger.error("Could not get the file from the response body!", e); + return false; + }*/ + } + + + private String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch) + { + final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50); + sb.append(baseUrl); + int numFullTextsCurBatch = fileNamesForCurBatch.size(); + for ( int j=0; j < numFullTextsCurBatch; ++j ){ + sb.append(fileNamesForCurBatch.get(j)); + if ( j < (numFullTextsCurBatch -1) ) + sb.append(","); + } + return sb.toString(); + } + + + private List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch) + { + int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch); + int endingIndex = (curBatch * numOfFullTextsPerBatch); + if ( endingIndex > numAllFullTexts ) // This might be the case when the "numAllFullTexts" is too small. + endingIndex = numAllFullTexts; + + final List<String> fileNamesOfCurBatch = new ArrayList<>(numOfFullTextsPerBatch); + for ( int i = initialIndex; i < endingIndex; ++i ) { + try { + fileNamesOfCurBatch.add(allFileNames.get(i)); + } catch (IndexOutOfBoundsException ioobe) { + logger.error("IOOBE for i=" + i + GenericUtils.endOfLine + ioobe.getMessage(), ioobe); + } + } + return fileNamesOfCurBatch; + } + + + /** + * This method updates the UrlReports to not point to any downloaded fullText files. + * This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails, and we don't want any "links" to locally stored files, which will be deleted. + * If the "shouldCheckAndKeepS3UploadedFiles" is set to "true", then the payloads which have their file uploaded to the S3-ObjectStore, are excluded.
+ * @param urlReports + * @param shouldCheckAndKeepS3UploadedFiles + */ + public void removeUnretrievedFullTextsFromUrlReports(List<UrlReport> urlReports, boolean shouldCheckAndKeepS3UploadedFiles) + { + for ( UrlReport urlReport : urlReports ) { + Payload payload = urlReport.getPayload(); + if ( payload == null ) + continue; + + if ( shouldCheckAndKeepS3UploadedFiles ) { + String fileLocation = payload.getLocation(); + if ( (fileLocation == null) || s3ObjectStore.isLocationInStore(fileLocation) ) + continue; + } + + // Mark this full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text ("actual_url") will be kept, for now. + payload.setLocation(null); // This will cause the payload to not be inserted into the "payload" table in the database. Only the "attempt" record will be inserted. + payload.setHash(null); + payload.setMime_type(null); + payload.setSize(null); + // The id-url record will be called as a new assignment in the future. + } + } + +}
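For readers unfamiliar with the commons-compress classes that FileDecompressor imports, the following standalone sketch shows the same two-step flow that decompressFiles() performs: zstd-decompress into a tar file, then un-tar the full-text files into the batch directory. It is a simplified illustration under assumptions, not the project's code: paths and error handling are reduced, and Zstandard support in commons-compress requires the zstd-jni library to be on the classpath.

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class ZstdTarSketch {

    public static void decompressAndUntar(String zstdSource, Path targetDir) throws Exception {
        // e.g. "fullTexts_1_1.tar.zstd" -> "fullTexts_1_1.tar" (simplified extension handling)
        Path tarPath = Paths.get(zstdSource.replace(".zstd", ""));

        // 1) zstd -> tar
        try ( InputStream zsIn = new ZstdCompressorInputStream(new BufferedInputStream(Files.newInputStream(Paths.get(zstdSource)))) ) {
            Files.copy(zsIn, tarPath);
        }

        // 2) tar -> individual full-text files inside "targetDir"
        try ( TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(Files.newInputStream(tarPath))) ) {
            TarArchiveEntry entry;
            while ( (entry = tarIn.getNextTarEntry()) != null ) {
                if ( !entry.isDirectory() )
                    Files.copy(tarIn, targetDir.resolve(entry.getName()));  // reads only the current entry's bytes
            }
        }
    }
}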
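Similarly, the FILEPATH_ID_EXTENSION pattern that stays in FileUtils is what getAndUploadFullTexts() relies on to pull the "fileNameWithExtension" (capture group 2) out of each payload's reported file-location. A small sanity-check sketch of that usage, with made-up sample locations:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FilenameRegexSketch {
    // Same pattern as FileUtils.FILEPATH_ID_EXTENSION in the patch.
    static final Pattern FILEPATH_ID_EXTENSION = Pattern.compile("([^.()]+/)?((([^/()]+)[^./]*)(\\.[\\w]{2,10}))$");

    public static void main(String[] args) {
        for ( String location : new String[]{"path1/path2/ID.pdf", "ID2(25).pdf"} ) {  // hypothetical sample locations
            Matcher matcher = FILEPATH_ID_EXTENSION.matcher(location);
            if ( matcher.matches() )
                System.out.println(location + " -> fileNameWithExtension = " + matcher.group(2));  // "ID.pdf", "ID2(25).pdf"
            else
                System.out.println(location + " -> no match (counted as a problematic location)");
        }
    }
}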