package eu.openaire.urls_controller.util;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo;
import org.apache.commons.io.FileDeleteStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Types;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


@Component
public class FileUtils {

    private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private S3ObjectStore s3ObjectStore;

    @Autowired
    private FileDecompressor fileDecompressor;

    public enum UploadFullTextsResponse {successful, unsuccessful, databaseError}

    public String baseFilesLocation;

    public static final String workingDir = System.getProperty("user.dir") + File.separator;

    private final boolean isTestEnvironment;


    public FileUtils(@Value("${services.pdfaggregation.controller.baseFilesLocation}") String baseFilesLocation,
                     @Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment) {
        if ( !baseFilesLocation.endsWith(File.separator) )
            baseFilesLocation += File.separator;

        if ( !baseFilesLocation.startsWith(File.separator) )
            baseFilesLocation = workingDir + baseFilesLocation;

        this.baseFilesLocation = baseFilesLocation;
        this.isTestEnvironment = isTestEnvironment;
    }


    /**
     * In each insertion, a new parquet-file is created, so we end up with millions of files. Parquet is great for fast selects, so we have to stick with it and merge those files.
     * This method creates a clone of the original table, in order to have only one parquet file in the end. It drops the original table
     * and renames the clone to the original's name.
     * Returns the errorMsg if an error appears, otherwise it returns "null".
     */
    public String mergeParquetFiles(String tableName, String whereClause, Object parameter) {
        String errorMsg;
        if ( (tableName == null) || tableName.isEmpty() ) {
            errorMsg = "No tableName was given. Do not know the tableName for which we should merge the underlying files!";
            logger.error(errorMsg);
            return errorMsg;    // Return the error-msg to indicate that something went wrong and pass it down to the Worker.
        }

        // Make sure the following are empty strings.
        whereClause = (whereClause != null) ? (whereClause + " ") : "";
(whereClause + " ") : ""; if ( parameter == null ) parameter = ""; else if ( parameter instanceof String ) parameter = "'" + parameter + "'"; // This will be a "string-check", thus the single-quotes. // Else it is a "long", it will be used as is. // Create a temp-table as a copy of the initial table. try { jdbcTemplate.execute("CREATE TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + DatabaseConnector.databaseName + "." + tableName + " " + whereClause + parameter); } catch (Exception e) { errorMsg = "Problem when copying the contents of \"" + tableName + "\" table to a newly created \"" + tableName + "_tmp\" table, when merging the parquet-files!\n"; logger.error(errorMsg, e); try { // Make sure we delete the possibly half-created temp-table. jdbcTemplate.execute("DROP TABLE IF EXISTS " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE"); // We cannot move on with merging, but no harm happened, since the "table_tmp" name is still reserved for future use (after it was dropped immediately).. } catch (Exception e1) { logger.error("Failed to drop the \"" + tableName + "_tmp\" table!", e1); // TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..! } return errorMsg; // We return only the initial error to the Worker, which is easily distinguished indie the "merge-queries". } // Drop the initial table. try { jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + " PURGE"); } catch (Exception e) { errorMsg = "Problem when dropping the initial \"" + tableName + "\" table, when merging the parquet-files!\n"; logger.error(errorMsg, e); // The original table could not be dropped, so the temp-table cannot be renamed to the original..! try { // Make sure we delete the already created temp-table, in order to be able to use it in the future. The merging has failed nevertheless. jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE"); } catch (Exception e1) { logger.error((errorMsg += "Failed to drop the \"" + tableName + "_tmp\" table!"), e1); // Add this error to the original, both are very important. } // Here, the original table is created. return errorMsg; // TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..! } // Rename the temp-table to have the initial-table's name. try { jdbcTemplate.execute("ALTER TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp RENAME TO " + DatabaseConnector.databaseName + "." + tableName); } catch (Exception e) { errorMsg = "Problem in renaming the \"" + tableName + "_tmp\" table to \"" + tableName + "\", when merging the parquet-files!\n"; logger.error(errorMsg, e); // At this point we only have a "temp-table", the original is already deleted.. // Try to create the original, as a copy of the temp-table. If that succeeds, then try to delete the temp-table. try { jdbcTemplate.execute("CREATE TABLE " + DatabaseConnector.databaseName + "." + tableName + " stored as parquet AS SELECT * FROM " + DatabaseConnector.databaseName + "." 
+ tableName + "_tmp"); } catch (Exception e1) { errorMsg = "Problem when copying the contents of \"" + tableName + "_tmp\" table to a newly created \"" + tableName + "\" table, when merging the parquet-files!\n"; logger.error(errorMsg, e1); // If the original table was not created, then we have to intervene manually, if it was created but without any data, then we can safely move on handling other assignments and workerReports, but the data will be lost! So this workerReport failed to be handled. try { // The below query normally returns a list, as it takes a "regex-pattern" as an input. BUT, we give just the table name, without wildcards. So the result is either the tableName itself or none (not any other table). jdbcTemplate.queryForObject("SHOW TABLES IN " + DatabaseConnector.databaseName + " LIKE '" + tableName + "'", List.class); } catch (EmptyResultDataAccessException erdae) { // The table does not exist, so it was not even half-created by the previous query. // Not having the original table anymore is a serious error. A manual action is needed! logger.error((errorMsg += "The original table \"" + tableName + "\" must be created manually! Serious problems may appear otherwise!")); // TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and fix it immediately to avoid other errors in the Service)..! } // Here, the original-table exists in the DB, BUT without any data inside! This worker Report failed to handled! (some of its data could not be loaded to the database, and all previous data was lost). return errorMsg; } // The creation of the original table was successful. Try to delete the temp-table. try { jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE"); } catch (Exception e2) { logger.error((errorMsg += "Problem when dropping the \"" + tableName + "_tmp\" table, when merging the parquet-files!\n"), e2); // Manual deletion should be performed! return errorMsg; // Return both errors here, as the second is so important that if it did not happen then we could move on with this workerReport. // TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..! } // Here the original table exists and the temp-table is deleted. We eventually have the same state as if the "ALTER TABLE" succeeded. } // Gather information to be used for queries-optimization. try { jdbcTemplate.execute("COMPUTE STATS " + DatabaseConnector.databaseName + "." + tableName); } catch (Exception e) { logger.error("Problem when gathering information from table \"" + tableName + "\" to be used for queries-optimization.", e); // In this case the error is not so important to the whole operation.. It's only that the performance of this specific table will be less optimal, only temporarily, unless every "COMPUTE STATS" query fails for future workerReports too. } return null; // No errorMsg, everything is fine. } public static DecimalFormat df = new DecimalFormat("0.00"); // The following regex might be usefull in a future scenario. It extracts the "plain-filename" and "file-ID" and the "file-extension". // Possible full-filenames are: "path1/path2/ID.pdf", "ID2.pdf", "path1/path2/ID(12).pdf", "ID2(25).pdf" public static final Pattern FILENAME_ID_EXTENSION = Pattern.compile("(?:([^.()]+)/)?((([^/()]+)[^./]*)(\\.[\\w]{2,10}))$"); private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames). 

    public static final ExecutorService hashMatchingExecutor = Executors.newFixedThreadPool(6);
    // TODO - Unify this ExecutorService with the hash-matching executorService. Since one will ALWAYS be called after the other. So why have two ExecServices to handle?


    public UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, int sizeOfUrlReports, long assignmentsBatchCounter, String workerId) throws RuntimeException {
        // The Controller has to request the files from the Worker, in order to upload them to the S3.
        // We UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.

        String workerIp = null;
        WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
        if ( workerInfo == null ) {
            logger.error("Could not retrieve the info of worker: " + workerId);
            return null;
        } else
            workerIp = workerInfo.getWorkerIP();    // This won't be null.

        // Get the file-locations.
        final AtomicInteger numFullTextsFound = new AtomicInteger();
        final AtomicInteger numFilesFoundFromPreviousAssignmentsBatches = new AtomicInteger();

        SetMultimap<String, Payload> allFileNamesWithPayloads = Multimaps.synchronizedSetMultimap(HashMultimap.create((sizeOfUrlReports / 5), 3));    // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.

        final String getFileLocationForHashQuery = "select `location` from " + DatabaseConnector.databaseName + ".payload" + (isTestEnvironment ? "_aggregated" : "") + " where `hash` = ? limit 1";
        final int[] hashArgType = new int[] {Types.VARCHAR};

        List<Callable<Void>> callableTasks = new ArrayList<>(6);

        for ( UrlReport urlReport : urlReports ) {
            callableTasks.add(() -> {
                Payload payload = urlReport.getPayload();
                if ( payload == null )
                    return null;

                String fileLocation = payload.getLocation();
                if ( fileLocation == null )
                    return null;    // The full-text was not retrieved, go to the next UrlReport.

                // Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
                // If no result is returned, then this record was not previously found, so go ahead and add it in the list of files to request from the worker.
                // If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker.
                String fileHash = payload.getHash();
                if ( fileHash != null ) {
                    String alreadyFoundFileLocation = null;
                    try {
                        alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[] {fileHash}, hashArgType, String.class);
                    } catch (EmptyResultDataAccessException erdae) {
                        // No fileLocation is found, it's ok. It will be null by default.
                    } catch (Exception e) {
                        logger.error("Error when executing or acquiring data from the \"getFileLocationForHashQuery\"!\n", e);
                        // TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
                        // TODO - The idea is that since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
                        // Unless we do what is said above, do not continue to the next UrlReport; this query-exception should not disrupt the normal full-text processing.
                    }

                    if ( alreadyFoundFileLocation != null ) {    // If the full-text of this record is already-found and uploaded.
                        payload.setLocation(alreadyFoundFileLocation);    // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
                        if ( logger.isTraceEnabled() )
                            logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\".");    // DEBUG!
                        numFilesFoundFromPreviousAssignmentsBatches.incrementAndGet();
                        numFullTextsFound.incrementAndGet();
                        return null;    // Do not request the file from the worker, it's already uploaded. Move on. The "location" will be filled by the "setFullTextForMultiplePayloads()" method, later.
                    }
                }

                // Extract the "fileNameWithExtension" to be added in the HashMultimap.
                Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileLocation);
                if ( ! matcher.matches() ) {
                    logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
                    return null;
                }

                String fileNameWithExtension = matcher.group(2);
                if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
                    logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
                    return null;
                }

                numFullTextsFound.incrementAndGet();
                allFileNamesWithPayloads.put(fileNameWithExtension, payload);    // The keys and the values are not duplicated.
                // Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same as pdf-url-1; thus, the ID-2 file was not downloaded again.

                return null;
            });
        }// end-for

        DatabaseConnector.databaseLock.lock();    // The execution uses the database.
        try {    // Invoke all the tasks and wait for them to finish before moving to the next batch.
            List<Future<Void>> futures = hashMatchingExecutor.invokeAll(callableTasks);
            for ( Future<Void> future : futures ) {
                try {
                    Void result = future.get();    // The result is always "null" as we have a "Void" type.
                } catch (Exception e) {
                    logger.error("", e);
                }
            }
        } catch (InterruptedException ie) {    // In this case, any unfinished tasks are cancelled.
            logger.warn("The current thread was interrupted when waiting for the worker-threads to finish checking for already-found file-hashes: " + ie.getMessage());
            // This is a very rare case. At the moment, we just move on with what we have so far.
        } catch (Exception e) {
            logger.error("Unexpected error when checking for already-found file-hashes in parallel!", e);
            return UploadFullTextsResponse.unsuccessful;
        } finally {
            DatabaseConnector.databaseLock.unlock();    // The remaining work of this function does not use the database.
        }

        logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextsFound.get() + " (out of " + sizeOfUrlReports + " | about " + df.format(numFullTextsFound.get() * 100.0 / sizeOfUrlReports) + "%).");
        logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches.get());

        ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet());
        int numAllFullTexts = allFileNames.size();
        if ( numAllFullTexts == 0 ) {
            logger.warn("No full-text files were retrieved for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
            return UploadFullTextsResponse.successful;    // It was handled, no error.
        }

        // Request the full-texts in batches, compressed in a zstd tar file.
        int numOfBatches = (numAllFullTexts / numOfFullTextsPerBatch);
        int remainingFiles = (numAllFullTexts % numOfFullTextsPerBatch);
        if ( remainingFiles > 0 ) {    // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
            numOfBatches++;
            logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts (total is: " + numFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
        } else
            logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts (total is: " + numFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
        // Check if one full-text is left out because of the division. Put it in the last batch.

        String baseUrl = "http://" + workerIp + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
        // TODO - The worker should send the port in which it accepts requests, along with the current request.
        // TODO - The least we have to do is to expose the port-assignment somewhere more obvious, like inside the "application.yml" file.

        String curAssignmentsBaseLocation = baseFilesLocation + "assignments_" + assignmentsBatchCounter + File.separator;
        // Note: the "curAssignmentsBaseLocation"-directory will be created once the first batch subdirectory is called for creation.

        int failedBatches = 0;
        for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) {
            // TODO - Currently, for big assignments (e.g. 10000), it takes 2 mins (actually 1.5 mins after using the Zstandard compression) for the worker to zstd the files and return them, FOR EACH BATCH.
            // Also, it takes around 3 mins for the Controller to process the received files FOR EACH BATCH.
            // So, for 24 batches, it takes around 24 * (2 + 3) = 120 mins to process all the full-texts for each assignments-batch.
            // TODO - What if we could spawn a new thread for each full-texts-batch, and make them "FIRE" one after the other?
            // TODO - So, the 1st thread with the first batch starts and waits for the first zstd file from the worker.
            //  Once it takes the zstd file it continues, but now the Worker is just sitting waiting.. So the 2nd thread fires and asks for the new zstd.
            //  So, the worker will never be left waiting and the Controller will not wait for the Worker either..!
            //  The worker will not have 2 parallel requests for zstd files, so the single CPU there will not be stressed to zstd many files in parallel.
            //  Yes, the Controller may end up in the situation in which, before finishing uploading the previously received files to S3, it receives the new zstd from the Worker.
            // TODO - BUT, we can make the new thread "WAIT" for the previous one to finish.

            String targetDirectory = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator;
            Path curBatchPath;
            try {    // Create this batch-directory.
                curBatchPath = Files.createDirectories(Paths.get(targetDirectory));    // The base-directory will be created along with the first batch-directory.
            } catch (Exception e) {
                logger.error("Could not create the \"curBatchPath\" directory: " + targetDirectory + "\n" + e.getMessage(), e);    // It shows the response body (after Spring v.2.5.6).
                failedBatches++;
                continue;
            }

            List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter);
            String zstdFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".tar.zstd";
            try {
                if ( ! getAndSaveFullTextBatch(fileNamesForCurBatch, baseUrl, assignmentsBatchCounter, batchCounter, numOfBatches, zstdFileFullPath, workerId) ) {
                    failedBatches++;
                    continue;
                }
            } catch (RuntimeException re) {
                failedBatches += (1 + (numOfBatches - batchCounter));    // The "failedBatches" will have the previously failedBatches + this one + the remaining batches which will likely fail too, thus, they will not be tested. Some initial batches may have succeeded.
                break;
            }

            if ( ! decompressAndUploadFullTexts(zstdFileFullPath, curBatchPath, targetDirectory, fileNamesForCurBatch, batchCounter, allFileNamesWithPayloads, assignmentsBatchCounter) )
                failedBatches++;
        }    // End of batches.

        updateUrlReportsToHaveNoFullTextFiles(urlReports, true);    // Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed).
        deleteDirectory(new File(curAssignmentsBaseLocation));

        if ( failedBatches == numOfBatches ) {
            logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
            return UploadFullTextsResponse.unsuccessful;
        } else
            return UploadFullTextsResponse.successful;
    }


    private boolean getAndSaveFullTextBatch(List<String> fileNamesForCurBatch, String baseUrl, long assignmentsBatchCounter, int batchCounter, int numOfBatches, String zstdFileFullPath, String workerId) throws RuntimeException {
        HttpURLConnection conn;
        try {
            if ( (conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId)) == null )
                return false;
        } catch (RuntimeException re) {
            // The "cause" was logged inside "getConnection()".
            throw re;
        }

        // Save and decompress the zstd file. Iterate over the PDFs and upload each one of them and get the S3-Url.
        File zstdFile = new File(zstdFileFullPath);
        if ( ! saveArchive(conn, zstdFile) )
            return false;
        //logger.trace("The zstd file has been saved: " + zstdFileFullPath);    // DEBUG!

        // We do not call "conn.disconnect()", since more requests are about to be made to the worker, in the near future.
        return true;
    }


    private boolean decompressAndUploadFullTexts(String zstdFileFullPath, Path curBatchPath, String targetDirectory, List<String> fileNamesForCurBatch, int batchCounter, SetMultimap<String, Payload> allFileNamesWithPayloads, long assignmentsBatchCounter) {
        try {
            fileDecompressor.decompressFiles(zstdFileFullPath, curBatchPath);
            String[] extractedFileNames = new File(targetDirectory).list();
            if ( (extractedFileNames == null) || (extractedFileNames.length <= 2) ) {    // The directory might have only two files, the "tar-file" and the "tar.zstd-file", if the full-texts failed to be decompressed or untarred..
logger.error("No full-texts' fleNames where extracted from directory: " + targetDirectory); return false; } else if ( (extractedFileNames.length - 2) != fileNamesForCurBatch.size() ) { logger.warn("The number of extracted files (" + (extractedFileNames.length - 2) + ") was not equal to the number of the current-batch's (" + batchCounter + ") files (" + fileNamesForCurBatch.size() + ")."); // We do NOT have to find and cross-reference the missing files with the urlReports, in order to set their locations to , // since, in the end of each assignments-batch, an iteration will be made and for all the non-retrieved and non-uploaded full-texts, the app will set them to null. } uploadFullTexts(extractedFileNames, targetDirectory, allFileNamesWithPayloads); return true; } catch (Exception e) { logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6). return false; } finally { deleteDirectory(curBatchPath.toFile()); } } private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List fileNamesForCurBatch, int totalBatches, String workerId) throws RuntimeException { baseUrl += batchNum + "/"; String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch); //logger.debug("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]"); try { HttpURLConnection conn = (HttpURLConnection) new URL(requestUrl).openConnection(); conn.setRequestMethod("GET"); conn.setRequestProperty("User-Agent", "UrlsController"); conn.connect(); int statusCode = conn.getResponseCode(); if ( statusCode == -1 ) { // Invalid HTTP-Response. logger.warn("Problem when getting the \"status-code\" for url: " + requestUrl); throw new RuntimeException(); // Avoid any other batches. } else if ( statusCode != 200 ) { logger.warn("HTTP-" + statusCode + ": " + getMessageFromResponseBody(conn, true) + "\n\nProblem when requesting the ZstdFile of batch_" + batchNum + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl); if ( (statusCode >= 500) && (statusCode <= 599) ) throw new RuntimeException(); // Throw an exception to indicate that the Worker has problems and all remaining batches will fail as well. return null; } else return conn; } catch (RuntimeException re) { throw re; } catch (Exception e) { String exMessage = e.getMessage(); logger.warn("Problem when requesting the ZstdFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + "\n" + exMessage); if ( exMessage.contains("Connection refused") ) { logger.error("Since we received a \"Connection refused\", from \"" + workerId + "\", all of the remaining batches (" + (totalBatches - batchNum) + ") will not be requested!"); throw new RuntimeException(); } return null; } } private void uploadFullTexts(String[] fileNames, String targetDirectory, SetMultimap allFileNamesWithPayloads) { // Iterate over the files and upload them to S3. //int numUploadedFiles = 0; for( String fileName : fileNames ) { if ( fileName.contains(".tar") ) // Exclude the tar-files from uploading (".tar" and ".tar.zstd"). 
                continue;

            // Check if this stored file is related to one or more Payloads from the Set. Defend against malicious file injection. It does not add more overhead, since we already need the "fileRelatedPayloads".
            Set<Payload> fileRelatedPayloads = allFileNamesWithPayloads.get(fileName);
            if ( fileRelatedPayloads.isEmpty() ) {    // In case the "fileName" is not inside the "allFileNamesWithPayloads" HashMultimap.
                logger.error("The stored file \"" + fileName + "\" is not related to any Payload returned from the Worker!");
                continue;
            }

            // Let's try to upload the file to S3 and update the payloads, either in successful file-uploads (right-away) or not (in the end).
            try {
                // Prepare the filename as: "datasourceid/publicationid::hash.pdf"
                // All related payloads point to this exact same file, BUT, may be related with different urlIDs, which in turn may be related with different datasourceIDs.
                // This file could have been found from different urlIds and thus be related to multiple datasourceIds.
                // BUT, since the filename contains a specific urlID, the datasourceId should be the one related to that specific urlID.
                // So, we extract this urlID, search the payload inside the "fileRelatedPayloads" and get the related datasourceID (instead of taking the first or a random datasourceID).
                Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileName);
                if ( !matcher.matches() ) {
                    logger.error("Failed to match the \"" + fileName + "\" with the regex: " + FILENAME_ID_EXTENSION);
                    continue;
                }
                // The "matcher.group(3)" returns the "filenameWithoutExtension", which is currently not used.
                String fileNameID = matcher.group(4);
                if ( (fileNameID == null) || fileNameID.isEmpty() ) {
                    logger.error("Failed to extract the \"fileNameID\" from \"" + fileName + "\".");
                    continue;
                }
                String dotFileExtension = matcher.group(5);
                if ( (dotFileExtension == null) || dotFileExtension.isEmpty() ) {
                    logger.error("Failed to extract the \"dotFileExtension\" from \"" + fileName + "\".");
                    continue;
                }

                // This file is related with some payloads, in the sense that these payloads have urls which lead to the same full-text url.
                // These payloads might have different IDs and sourceUrls. But, in the end, the different sourceUrls give the same full-text.
                // Below, we make sure we pick the "datasource" from the payload which has the same id as the full-text's name.
                // If there are multiple payloads with the same id, which point to the same file, then we can take whatever datasource we want from those payloads.
                // It is possible that payloads with the same IDs, but different sourceUrls pointing to the same full-text, can be related with different datasources
                // (especially for IDs of type: "doiboost_____::XXXXXXXXXXXXXXXXXXXXX").
                // It does not really matter, since the first-ever payload to give this full-text could very well be another one,
                // since the crawling happens in multiple threads which compete with each other for CPU time.

                String datasourceId = null;
                String hash = null;
                boolean isFound = false;
                for ( Payload payload : fileRelatedPayloads ) {
                    if ( fileNameID.equals(payload.getId()) ) {
                        datasourceId = payload.getDatasourceId();
                        hash = payload.getHash();
                        isFound = true;
                        break;
                    }
                }

                if ( !isFound ) {    // This should never normally happen. If it does, then something very bad will have taken place.
logger.error("The \"fileNameID\" (" + fileNameID + ") was not found inside the \"fileRelatedPayloads\" for fileName: " + fileName); continue; } String s3Url = constructFileNameAndUploadToS3(targetDirectory, fileName, fileNameID, dotFileExtension, datasourceId, hash); if ( s3Url == null ) continue; setFullTextForMultiplePayloads(fileRelatedPayloads, s3Url); //numUploadedFiles ++; } catch (Exception e) { logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore!", e); } // Else, the record will have its file-data set to "null", in the end of this method. } //logger.debug("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -2) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore."); // (fileNames.length -2) --> minus the zstd and the tar files. } public String constructFileNameAndUploadToS3(String fileDir, String fileName, String openAireID, String dotFileExtension, String datasourceId, String hash) throws Exception { if ( datasourceId == null ) { logger.error("The retrieved \"datasourceId\" was \"null\" for file: " + fileName); return null; } if ( hash == null ) { logger.error("The retrieved \"hash\" was \"null\" for file: " + fileName); return null; } String fileFullPath = fileDir + File.separator + fileName; // The fullPath to the local file. // Use the "fileNameID" and not the "filenameWithoutExtension", as we want to avoid keeping the possible "parenthesis" with the increasing number (about the duplication of ID-fileName). // Now we append the file-hash, so it is guaranteed that the filename will be unique. fileName = datasourceId + "/" + openAireID + "::" + hash + dotFileExtension; // This is the fileName to be used in the objectStore, not of the local file! return s3ObjectStore.uploadToS3(fileName, fileFullPath); } public String getMessageFromResponseBody(HttpURLConnection conn, boolean isError) { final StringBuilder msgStrB = new StringBuilder(500); try ( BufferedReader br = new BufferedReader(new InputStreamReader((isError ? conn.getErrorStream() : conn.getInputStream()))) ) { // Try-with-resources String inputLine; while ( (inputLine = br.readLine()) != null ) { if ( !inputLine.isEmpty() ) msgStrB.append(inputLine); } return (msgStrB.length() != 0) ? msgStrB.toString() : null; // Make sure we return a "null" on empty string, to better handle the case in the caller-function. } catch ( IOException ioe ) { logger.error("IOException when retrieving the response-body: " + ioe.getMessage()); return null; } catch ( Exception e ) { // This includes the case, where the "conn.getErrorStream()" returns < null >. logger.error("Could not extract the response-body!", e); return null; } } private List getFileNamesForBatch(List allFileNames, int numAllFullTexts, int curBatch) { int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch); int endingIndex = (curBatch * numOfFullTextsPerBatch); if ( endingIndex > numAllFullTexts ) // This might be the case, when the "numAllFullTexts" is too small. 
            endingIndex = numAllFullTexts;

        List<String> fileNamesOfCurBatch = new ArrayList<>(numOfFullTextsPerBatch);
        for ( int i = initialIndex; i < endingIndex; ++i ) {
            try {
                fileNamesOfCurBatch.add(allFileNames.get(i));
            } catch (IndexOutOfBoundsException ioobe) {
                logger.error("IOOBE for i=" + i + "\n" + ioobe.getMessage(), ioobe);
            }
        }
        return fileNamesOfCurBatch;
    }


    private String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch) {
        final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);
        sb.append(baseUrl);
        int numFullTextsCurBatch = fileNamesForCurBatch.size();
        for ( int j=0; j < numFullTextsCurBatch; ++j ) {
            sb.append(fileNamesForCurBatch.get(j));
            if ( j < (numFullTextsCurBatch -1) )
                sb.append(",");
        }
        return sb.toString();
    }


    public static final int tenMb = (10 * 1_048_576);

    public boolean saveArchive(HttpURLConnection conn, File zstdFile) {
        try ( BufferedInputStream inStream = new BufferedInputStream(conn.getInputStream(), tenMb);
              BufferedOutputStream outStream = new BufferedOutputStream(Files.newOutputStream(zstdFile.toPath()), tenMb) ) {
            int readBytes;
            while ( (readBytes = inStream.read()) != -1 ) {
                outStream.write(readBytes);
            }
            return true;
        } catch (Exception e) {
            logger.error("Could not save the zstd file \"" + zstdFile.getName() + "\": " + e.getMessage(), e);
            return false;
        }
    }


    /**
     * This method updates the UrlReports to not point to any downloaded fullText files.
     * This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails, and we don't want any "links" to locally stored files, which will be deleted.
     * If the "shouldCheckAndKeepS3UploadedFiles" is set to "true", then the payloads which have their file uploaded to the S3-ObjectStore are excluded.
     * @param urlReports
     * @param shouldCheckAndKeepS3UploadedFiles
     */
    public void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports, boolean shouldCheckAndKeepS3UploadedFiles) {
        for ( UrlReport urlReport : urlReports ) {
            Payload payload = urlReport.getPayload();
            if ( payload == null )
                continue;

            if ( shouldCheckAndKeepS3UploadedFiles ) {
                String fileLocation = payload.getLocation();
                if ( (fileLocation == null) || s3ObjectStore.isLocationInStore(fileLocation) )
                    continue;
            }

            // Mark this full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text ("actual_url") will be kept, for now.
            payload.setLocation(null);    // This will cause the payload to not be inserted into the "payload" table in the database. Only the "attempt" record will be inserted.
            payload.setHash(null);
            payload.setMime_type(null);
            payload.setSize(null);    // The id-url record will be called as a new assignment in the future.
        }
    }


    /**
     * Set the fileLocation for all those Payloads related to the File.
     * @param filePayloads
     * @param s3Url
     */
    public void setFullTextForMultiplePayloads(Set<Payload> filePayloads, String s3Url) {
        for ( Payload payload : filePayloads )
            if ( payload != null )
                payload.setLocation(s3Url);    // Update the file-location to the new S3-url. All the other file-data is already set from the Worker.
    }


    public boolean deleteDirectory(File directory) {
        try {
            org.apache.commons.io.FileUtils.deleteDirectory(directory);
            return true;    // Will return "true" also in case this directory does not exist. So, no Exception will be thrown for that case.
        } catch (IOException e) {
            logger.error("The following directory could not be deleted: " + directory.getName(), e);
            return false;
        } catch (IllegalArgumentException iae) {
            logger.error("This batch-dir does not exist: " + directory.getName());
            return false;
        }
    }


    public boolean deleteFile(String fileFullPathString) {
        try {
            FileDeleteStrategy.FORCE.delete(new File(fileFullPathString));
        } catch (IOException e) {
            logger.error("Error when deleting the file: " + fileFullPathString);
            return false;
        }
        return true;
    }


    private static final Lock fileWriteLock = new ReentrantLock(true);

    public String writeToFile(String fileFullPath, String stringToWrite, boolean shouldLockThreads) {
        if ( shouldLockThreads )    // In case multiple threads write to the same file. for ex. during the bulk-import procedure.
            fileWriteLock.lock();

        try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), FileUtils.tenMb) ) {
            bufferedWriter.write(stringToWrite);    // This will overwrite the file. If the new string is smaller, then it does not matter.
        } catch (Exception e) {
            String errorMsg = "Failed to create or acquire the file \"" + fileFullPath + "\"!";
            logger.error(errorMsg, e);
            return errorMsg;
        } finally {
            if ( shouldLockThreads )
                fileWriteLock.unlock();
        }
        return null;
    }

}