UrlsController/src/main/java/eu/openaire/urls_controller/util/FileUtils.java

package eu.openaire.urls_controller.util;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo;
import org.apache.commons.io.FileDeleteStrategy;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.io.*;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.text.DecimalFormat;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Component
public class FileUtils {
private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private S3ObjectStore s3ObjectStore;
@Autowired
private FileDecompressor fileDecompressor;
@Value("${services.pdfaggregation.worker.port}")
private String workerPort;
public enum UploadFullTextsResponse {successful, successful_without_fulltexts, unsuccessful}
public String baseFilesLocation;
public static final String workingDir = System.getProperty("user.dir") + File.separator;
private final boolean isTestEnvironment;
public FileUtils (@Value("${services.pdfaggregation.controller.baseFilesLocation}") String baseFilesLocation, @Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment) {
if ( !baseFilesLocation.endsWith(File.separator) )
baseFilesLocation += File.separator;
if ( !baseFilesLocation.startsWith(File.separator) )
baseFilesLocation = workingDir + baseFilesLocation;
this.baseFilesLocation = baseFilesLocation;
this.isTestEnvironment = isTestEnvironment;
}
public static final DecimalFormat df = new DecimalFormat("0.00");
// The following regex might be useful in a future scenario. It extracts the "plain-filename" and "file-ID" and the "file-extension".
// Possible full-filenames are: "path1/path2/ID.pdf", "ID2.pdf", "path1/path2/ID(12).pdf", "ID2(25).pdf"
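// For example, for "path1/path2/ID(12).pdf" the captured groups are: group(1)="path1/path2/", group(2)="ID(12).pdf", group(3)="ID(12)", group(4)="ID" (the plain-filename / file-ID) and group(5)=".pdf" (the dot-file-extension).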
public static final Pattern FILEPATH_ID_EXTENSION = Pattern.compile("([^.()]+/)?((([^/()]+)[^./]*)(\\.[\\w]{2,10}))$");
private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
public static final ExecutorService hashMatchingExecutor = Executors.newFixedThreadPool(6);
// TODO - Unify this ExecutorService with the hash-matching ExecutorService, since one is ALWAYS called right after the other; there is no reason to maintain two separate ExecutorServices.
public UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, int sizeOfUrlReports, long assignmentsBatchCounter, String workerId) throws RuntimeException
{
// The Controller has to request the files from the Worker, in order to upload them to the S3.
// We UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
String workerIp = null;
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
if ( workerInfo == null ) {
logger.error("Could not retrieve the info of worker: " + workerId);
return null;
} else
workerIp = workerInfo.getWorkerIP(); // This won't be null.
// Get the file-locations.
int numValidFullTextsFound = 0;
int numFilesFoundFromPreviousAssignmentsBatches = 0;
int numFullTextsWithProblematicLocations = 0;
HashMultimap<String, Payload> hashesWithPayloads = getHashesWithPayloads(urlReports, sizeOfUrlReports); // Holds multiple payloads for the same fileHash.
Set<String> fileHashes = hashesWithPayloads.keySet();
int fileHashesSetSize = fileHashes.size(); // Get the size of the keysSet, instead of the whole multimap.
if ( fileHashesSetSize == 0 ) {
logger.warn("No fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\".");
return UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error.
}
HashMap<String, String> hashLocationMap = getHashLocationMap(fileHashes, fileHashesSetSize, assignmentsBatchCounter, "assignments");
HashMultimap<String, Payload> allFileNamesWithPayloads = HashMultimap.create((sizeOfUrlReports / 5), 3); // Holds multiple values for any key, if a fileName (key) has many payloads (values) associated with it.
for ( String fileHash : fileHashes )
{
for ( Payload payload : hashesWithPayloads.get(fileHash) )
{
String alreadyFoundFileLocation = hashLocationMap.get(fileHash); // Only one location has been retrieved per fileHash.
if ( alreadyFoundFileLocation != null ) {
// Fill the payloads with locations from the "previously-found-hashes."
payload.setLocation(alreadyFoundFileLocation);
if ( logger.isTraceEnabled() )
logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches ++;
numValidFullTextsFound ++; // We trust the location being valid..
}
else { // This file has not been found before..
// Extract the "fileNameWithExtension" to be added in the HashMultimap.
String fileLocation = payload.getLocation();
Matcher matcher = FILEPATH_ID_EXTENSION.matcher(fileLocation);
if ( ! matcher.matches() ) {
logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILEPATH_ID_EXTENSION);
numFullTextsWithProblematicLocations ++;
continue;
}
String fileNameWithExtension = matcher.group(2);
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILEPATH_ID_EXTENSION);
numFullTextsWithProblematicLocations ++;
continue;
}
numValidFullTextsFound ++;
allFileNamesWithPayloads.put(fileNameWithExtension, payload); // The same key-value pair is not stored twice.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as pdf-url-2 might be the same as pdf-url-1; thus, the ID-2 file was not downloaded again.
}
}
}
if ( numFullTextsWithProblematicLocations > 0 )
logger.warn(numFullTextsWithProblematicLocations + " files had problematic names.");
if ( numValidFullTextsFound == 0 ) {
logger.warn("No full-text files were retrieved for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
return UploadFullTextsResponse.successful_without_fulltexts; // It's not what we want, but it's not an error either.
}
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet()); // The number of fulltexts is lower than (or equal to) the number of payloads, since multiple payloads may lead to the same file.
int numFullTextsToBeRequested = allFileNames.size();
if ( numFullTextsToBeRequested == 0 ) {
logger.info(numValidFullTextsFound + " fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\", but all of them have been retrieved before.");
return UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error.
}
logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numValidFullTextsFound + " (out of " + sizeOfUrlReports + " | about " + df.format(numValidFullTextsFound * 100.0 / sizeOfUrlReports) + "%).");
// TODO - Have a prometheus GAUGE to hold the value of the above percentage, so that we can track the success-rates over time..
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
// Request the full-texts in batches, compressed in a zstd tar file.
int numOfBatches = (numFullTextsToBeRequested / numOfFullTextsPerBatch);
int remainingFiles = (numFullTextsToBeRequested % numOfFullTextsPerBatch);
if ( remainingFiles > 0 ) { // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
numOfBatches++;
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
} else
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
// Check if any full-texts are left out because of the division; put them in the last batch.
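// Worked example: 211 fullTexts with 70 per batch give (211 / 70) = 3 full batches and (211 % 70) = 1 remaining file, so 4 batches are requested in total (the last one holding just 1 file).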
String baseUrl = "http://" + workerIp + ":" + workerPort + "/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
// TODO - The worker should send the port in which it accepts requests, along with the current request.
// TODO - The least we have to do is to expose the port-assignment somewhere more obvious, like inside the "application.yml" file.
String curAssignmentsBaseLocation = baseFilesLocation + "assignments_" + assignmentsBatchCounter + File.separator;
// Note: the "curAssignmentsBaseLocation"-directory will be created once the first batch subdirectory is called for creation.
int failedBatches = 0;
for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) {
// TODO - Currently, for big assignments (e.g. 10000), it takes 2 mins (actually 1.5 mins after using the Zstandard compression) for the worker to zstd the files and return them, FOR EACH BATCH.
// Also, it takes around 3 mins for the Controller to process the received files, FOR EACH BATCH.
// So, for 24 batches, it takes around 24 * (2 + 3) = 120 mins to process all the full-texts of each assignments-batch.
// TODO - What if we spawned a new thread for each full-texts-batch and made them "FIRE" one after the other?
// So, the 1st thread starts with the first batch and waits for the first zstd file from the worker.
// Once it receives the zstd file it continues, but now the Worker is just sitting idle; so the 2nd thread fires and requests the next zstd.
// This way, the worker is never left waiting and the Controller does not wait for the Worker either!
// The worker will not have 2 parallel requests for zstd files, so its single CPU will not be stressed to zstd many files in parallel.
// Yes, the Controller may end up receiving the new zstd from the Worker before finishing the upload of the previously received files to S3.
// TODO - BUT, we can make the new thread "WAIT" for the previous one to finish.
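// A minimal commented-out sketch of that pipelining idea (hypothetical: "downloadZstdForBatch" and "decompressAndUpload" are placeholder names, not existing methods):
/*
ExecutorService batchProcessor = Executors.newSingleThreadExecutor();
Future<?> previousBatchProcessing = null;
try {
    for ( int batch = 1; batch <= numOfBatches; ++batch ) {
        File zstdFile = downloadZstdForBatch(batch); // Request the next zstd from the Worker, while the previous batch may still be processed below.
        if ( previousBatchProcessing != null )
            previousBatchProcessing.get(); // Wait for the previous batch to finish decompressing and uploading, so batches are processed in order.
        final File fileToProcess = zstdFile;
        previousBatchProcessing = batchProcessor.submit(() -> decompressAndUpload(fileToProcess)); // Process in the background, while the next download starts.
    }
    if ( previousBatchProcessing != null )
        previousBatchProcessing.get();
} catch (Exception e) {
    logger.error("The pipelined batch-processing failed!", e);
} finally {
    batchProcessor.shutdown();
}
*/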
String targetDirectory = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator;
Path curBatchPath;
try { // Create this batch-directory.
curBatchPath = Files.createDirectories(Paths.get(targetDirectory));
// The base-directory will be created along with the first batch-directory.
} catch (Exception e) {
logger.error("Could not create the \"curBatchPath\" directory: " + targetDirectory + GenericUtils.endOfLine + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
failedBatches ++;
continue;
}
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numFullTextsToBeRequested, batchCounter);
String zstdFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".tar.zstd";
try {
if ( ! getAndSaveFullTextBatch(fileNamesForCurBatch, baseUrl, assignmentsBatchCounter, batchCounter, numOfBatches, zstdFileFullPath, workerId) ) {
failedBatches ++;
continue;
}
} catch (RuntimeException re) {
failedBatches += (1 + (numOfBatches - batchCounter)); // The "failedBatches" will have the previously failedBatches + this one + the remaining batches which will likely fail too, thus, they will not be tested. Some initial batches may have succeeded.
break;
}
if ( ! decompressAndUploadFullTexts(zstdFileFullPath, curBatchPath, targetDirectory, fileNamesForCurBatch, batchCounter, allFileNamesWithPayloads, assignmentsBatchCounter) )
failedBatches ++;
} // End of batches.
if ( failedBatches == numOfBatches )
logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
removeUnretrievedFullTextsFromUrlReports(urlReports, true); // Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed).
deleteDirectory(new File(curAssignmentsBaseLocation));
// Check and warn about the number of failed payloads.
// Possible reasons: their hash could not be checked in the DB, the file was not found inside the worker, the whole batch failed to be delivered from the worker, or the files failed to be uploaded to S3.
// Retrieve the payloads from the existing urlReports.
long finalPayloadsCounter = urlReports.parallelStream()
.map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null)))
.count();
int numInitialPayloads = (numValidFullTextsFound + numFullTextsWithProblematicLocations);
long numFailedPayloads = (numInitialPayloads - finalPayloadsCounter);
if ( numFailedPayloads == numInitialPayloads ) {
// This will also be the case if there was no DB failure, but all the batches have failed.
logger.error("None of the " + numInitialPayloads + " payloads could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
return UploadFullTextsResponse.unsuccessful;
} else if ( numFailedPayloads > 0 )
logger.warn(numFailedPayloads + " payloads (out of " + numInitialPayloads + ") failed to be processed for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
return UploadFullTextsResponse.successful;
}
public HashMultimap<String, Payload> getHashesWithPayloads(List<UrlReport> urlReports, int sizeOfUrlReports)
{
HashMultimap<String, Payload> hashesWithPayloads = HashMultimap.create((sizeOfUrlReports / 5), 3); // Holds multiple payloads for the same fileHash.
// The "Hash" part of the multimap helps with avoiding duplicate fileHashes.
for ( UrlReport urlReport : urlReports )
{
Payload payload = urlReport.getPayload();
if ( payload == null )
continue;
String fileLocation = payload.getLocation();
if ( fileLocation == null )
continue; // The full-text was not retrieved for this UrlReport.
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
// If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker.
// If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker.
String fileHash = payload.getHash();
if ( fileHash != null )
{
hashesWithPayloads.put(fileHash, payload); // Hold multiple payloads per fileHash.
// There are 2 cases, which contribute to that:
// 1) Different publication-IDs end up giving the same full-text-url, resulting in the same file. Those duplicates are not saved, but instead, the location, hash and size of the file is copied to the other payload.
// 2) Different publication-IDs end up giving different full-text-urls which point to the same file. Although very rare, in this case, the file is downloaded again by the Worker and has a different name.
// In either case, the duplicate file will not be transferred to the Controller, but in the 2nd one it takes up extra space, at least for some time.
// TODO - Implement a fileHash-check algorithm in the Worker's side ("PublicationsRetriever"), to avoid keeping those files in storage.
} else // This should never happen..
logger.error("Payload: " + payload + " has a null fileHash!");
}// end-for
return hashesWithPayloads;
}
public HashMap<String, String> getHashLocationMap(Set<String> fileHashes, int fileHashesSetSize, long batchCounter, String groupType)
{
// Prepare the "fileHashListString" to be used inside the "getHashLocationsQuery". Create the following string-pattern:
// ("HASH_1", "HASH_2", ...)
int stringBuilderCapacity = ((fileHashesSetSize * 32) + (fileHashesSetSize -1) +2);
String getHashLocationsQuery = "select distinct `hash`, `location` from " + DatabaseConnector.databaseName + ".payload where `hash` in "
+ getQueryListString(new ArrayList<>(fileHashes), fileHashesSetSize, stringBuilderCapacity);
HashMap<String, String> hashLocationMap = new HashMap<>(fileHashesSetSize/2); // No multimap is needed since only one location is returned for each fileHash.
DatabaseConnector.databaseLock.lock(); // The execution uses the database.
try {
jdbcTemplate.query(getHashLocationsQuery, rs -> {
try { // For each of the 2 columns returned, do the following. The column-indexing starts from 1.
hashLocationMap.put(rs.getString(1), rs.getString(2));
} catch (SQLException sqle) {
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
}
});
} catch (EmptyResultDataAccessException erdae) {
logger.warn("No previously-found hash-locations where found for " + groupType + "_" + batchCounter);
} catch (Exception e) {
logger.error("Unexpected error when checking for already-found file-hashes, for " + groupType + "_" + batchCounter, e);
// We will continue with storing the files, we do not want to lose them.
} finally {
DatabaseConnector.databaseLock.unlock();
}
return hashLocationMap;
}
private boolean getAndSaveFullTextBatch(List<String> fileNamesForCurBatch, String baseUrl, long assignmentsBatchCounter, int batchCounter, int numOfBatches,
String zstdFileFullPath, String workerId) throws RuntimeException
{
HttpURLConnection conn;
try {
if ( (conn = getConnectionForFullTextBatch(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId)) == null )
return false;
} catch (RuntimeException re) {
// The "cause" was logged inside "getConnection()".
throw re;
}
// Save and decompress the zstd file. Iterate over the PDFs and upload each one of them and get the S3-Url.
return saveArchive(conn, new File(zstdFileFullPath));
// We do not call "conn.disconnect()", since more request are about to be made to the worker, in the near future.
}
private boolean decompressAndUploadFullTexts(String zstdFileFullPath, Path curBatchPath, String targetDirectory, List<String> fileNamesForCurBatch, int batchCounter,
SetMultimap<String, Payload> allFileNamesWithPayloads, long assignmentsBatchCounter)
{
try {
fileDecompressor.decompressFiles(zstdFileFullPath, curBatchPath);
String[] extractedFileNames = new File(targetDirectory).list();
if ( (extractedFileNames == null) || (extractedFileNames.length <= 2) ) { // The directory might have only two files, the "tar-file" and the "tar.zstd-file", if the full-texts failed to be decompressed or untarred..
logger.error("No full-texts' fleNames where extracted from directory: " + targetDirectory);
return false;
} else if ( (extractedFileNames.length - 2) != fileNamesForCurBatch.size() ) {
logger.warn("The number of extracted files (" + (extractedFileNames.length - 2) + ") was not equal to the number of files (" + fileNamesForCurBatch.size() + ") of the current batch_" + batchCounter);
// We do NOT have to find and cross-reference the missing files with the urlReports, in order to set their locations to <null>,
// since, in the end of each assignments-batch, an iteration will be made and for all the non-retrieved and non-uploaded full-texts, the app will set them to null.
}
uploadFullTexts(extractedFileNames, targetDirectory, allFileNamesWithPayloads, batchCounter);
return true;
} catch (Exception e) {
logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + GenericUtils.endOfLine + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
return false;
} finally {
deleteDirectory(curBatchPath.toFile());
}
}
private HttpURLConnection getConnectionForFullTextBatch(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) throws RuntimeException
{
baseUrl += batchNum + "/";
String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);
//logger.debug("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]");
try {
HttpURLConnection conn = (HttpURLConnection) new URL(requestUrl).openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty("User-Agent", "UrlsController");
// TODO - Write the fileNames in the RequestBody, so that we can include as many as we want in each request.
// Right now, we can include only up to 70-80 fileNames in the url-string.
// TODO - We need to add the fileNames in the requestBody BEFORE we connect. So we will need to refactor the code to work in that order.
/*OutputStream os = conn.getOutputStream();
OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8);
osw.write(fileNamesForCurBatch_separated_by_comma);
osw.flush();
osw.close();
os.close();*/
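// Note: that sketch would also require calling "conn.setDoOutput(true)" before obtaining the output-stream; otherwise "getOutputStream()" throws a ProtocolException.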
// TODO - The above update will also enable us to improve edge-case management, like making sure we do not create a whole new batch for just a few files.
// Check this case for example, where we have one extra batch, with all the network, compression-decompression, transfer and uploading overhead it brings:
// 2023-02-06 12:17:26.893 [http-nio-1880-exec-8] DEBUG e.o.urls_controller.util.FileUtils.getAndUploadFullTexts(@235) - The assignments_12 have 211 distinct non-already-uploaded fullTexts.
// Going to request them from the Worker "worker_X", in 4 batches (70 files each, except for the final batch, which will have 1 files).
// If we are not limited by the url-length, we can easily say that if fewer than 10 files remain for the last batch, then they are added to the previous batch (e.g. the last batch will have 79 files).
// If equal to 10 or more files remain, then we will make an extra batch.
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode == -1 ) { // Invalid HTTP-Response.
logger.warn("Problem when getting the \"status-code\" for url: " + requestUrl);
throw new RuntimeException(); // Avoid any other batches.
} else if ( statusCode != 200 ) {
String errMsg = getMessageFromResponseBody(conn, true);
logger.warn("HTTP-" + statusCode + ": " + errMsg + "\n\nProblem when requesting the ZstdFile of batch_" + batchNum + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl);
if ( ((statusCode >= 500) && (statusCode <= 599))
|| ((statusCode == 400) && ((errMsg != null) && errMsg.contains("The base directory for assignments_" + assignmentsBatchCounter + " was not found"))) )
throw new RuntimeException(); // Throw an exception to indicate that the Worker has problems and all remaining batches will fail as well.
return null;
} else
return conn;
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
String exMessage = e.getMessage();
logger.warn("Problem when requesting the ZstdFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + GenericUtils.endOfLine + exMessage);
if ( (exMessage != null) && exMessage.contains("Connection refused") ) { // Guard against a null exception-message, before checking its content.
logger.error("Since we received a \"Connection refused\", from \"" + workerId + "\", all of the remaining batches (" + (totalBatches - batchNum) + ") will not be requested!");
throw new RuntimeException();
}
return null;
}
}
private void uploadFullTexts(String[] fileNames, String targetDirectory, SetMultimap<String, Payload> allFileNamesWithPayloads, int batchCounter)
{
// Iterate over the files and upload them to S3.
//int numUploadedFiles = 0;
for ( String fileName : fileNames )
{
if ( fileName.contains(".tar") ) // Exclude the tar-files from uploading (".tar" and ".tar.zstd").
continue;
// Check if this stored file is related to one or more Payloads from the Set. Defend against malicious file injection. It does not add more overhead, since we already need the "fileRelatedPayloads".
Set<Payload> fileRelatedPayloads = allFileNamesWithPayloads.get(fileName);
if ( fileRelatedPayloads.isEmpty() ) { // In case the "fileName" is not inside the "allFileNamesWithPayloads" HashMultimap.
logger.error("The stored file \"" + fileName + "\" is not related to any Payload returned from the Worker!");
continue;
}
// Let's try to upload the file to S3 and update the payloads, either in successful file-uploads (right-away) or not (in the end).
// Prepare the filename as: "datasourceid/publicationid::hash.pdf"
// All related payloads point to this exact same file, BUT they may be related with different urlIDs, which in turn may be related with different datasourceIDs.
// This file could have been found from different urlIds and thus be related to multiple datasourceIds.
// BUT, since the filename contains a specific urlID, the datasourceId should be the one related to that specific urlID.
// So, we extract this urlID, search the payload inside the "fileRelatedPayloads" and get the related datasourceID (instead of taking the first or a random datasourceID).
Matcher matcher = FILEPATH_ID_EXTENSION.matcher(fileName);
if ( !matcher.matches() ) {
logger.error("Failed to match the \"" + fileName + "\" with the regex: " + FILEPATH_ID_EXTENSION);
continue;
}
// The "matcher.group(3)" returns the "filenameWithoutExtension", which is currently not used.
// Use the "fileNameID" and not the "filenameWithoutExtension", as we want to avoid keeping the possible "parenthesis" with the increasing number (about the duplication of ID-fileName).
String fileNameID = matcher.group(4); // The "fileNameID" is the OpenAIRE_ID for this file.
if ( (fileNameID == null) || fileNameID.isEmpty() ) {
logger.error("Failed to extract the \"fileNameID\" from \"" + fileName + "\".");
continue;
}
String dotFileExtension = matcher.group(5);
if ( (dotFileExtension == null) || dotFileExtension.isEmpty() ) {
logger.error("Failed to extract the \"dotFileExtension\" from \"" + fileName + "\".");
continue;
}
// This file is related with some payloads, in a sense that these payloads have urls which lead to the same full-text url.
// These payloads might have different IDs and sourceUrls. But, in the end, the different sourceUrls give the same full-text.
// Below, we make sure we pick the "datasource" from the payload, which has the same id as the full-text's name.
// If there are multiple payloads with the same id, which point to the same file, then we can take whatever datasource we want from those payloads.
// It is possible that payloads with same IDs, but different sourceUrls pointing to the same full-text, can be related with different datasources
// (especially for IDs of type: "doiboost_____::XXXXXXXXXXXXXXXXXXXXX").
// It does not really matter, since the first-ever payload to give this full-text could very well be another one,
// since the crawling happens in multiple threads which compete with each other for CPU time.
String datasourceId = null;
String hash = null;
boolean isFound = false;
for ( Payload payload : fileRelatedPayloads ) {
if ( fileNameID.equals(payload.getId()) ) {
datasourceId = payload.getDatasourceId();
hash = payload.getHash();
isFound = true;
break;
}
}
if ( !isFound ) { // This should never normally happen. If it does, then something will have gone very wrong.
logger.error("The \"fileNameID\" (" + fileNameID + ") was not found inside the \"fileRelatedPayloads\" for fileName: " + fileName);
continue;
}
try {
String s3Url = constructS3FilenameAndUploadToS3(targetDirectory, fileName, fileNameID, dotFileExtension, datasourceId, hash);
if ( s3Url != null ) {
setFullTextForMultiplePayloads(fileRelatedPayloads, s3Url);
//numUploadedFiles ++;
}
} catch (Exception e) {
logger.error("Avoid uploading the rest of the files of batch_" + batchCounter + " | " + e.getMessage());
break;
}
// Else, the record will have its file-data set to "null", in the end of the caller method (as it will not have an s3Url as its location).
}//end filenames-for-loop
//logger.debug("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -2) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore.");
// (fileNames.length -2) --> minus the zstd and the tar files.
}
public String constructS3FilenameAndUploadToS3(String targetDirectory, String fileName, String openAireId,
String dotFileExtension, String datasourceId, String hash) throws ConnectException, UnknownHostException
{
String filenameForS3 = constructS3FileName(fileName, openAireId, dotFileExtension, datasourceId, hash); // This name is for the uploaded file, in the S3 Object Store.
if ( filenameForS3 == null ) // The error is logged inside.
return null;
String fileFullPath = targetDirectory + fileName; // The fullPath to the local file (which has the previous name).
String s3Url = null;
try {
s3Url = s3ObjectStore.uploadToS3(filenameForS3, fileFullPath);
} catch (ConnectException ce) {
logger.error("Could not connect with the S3 Object Store! " + ce.getMessage());
throw ce;
} catch (UnknownHostException uhe) {
logger.error("The S3 Object Store could not be found! " + uhe.getMessage());
throw uhe;
} catch (Exception e) {
logger.error("Could not upload the local-file \"" + fileFullPath + "\" to the S3 ObjectStore, with S3-filename: \"" + filenameForS3 + "\"!", e);
return null;
}
return s3Url;
}
public String constructS3FileName(String fileName, String openAireID, String dotFileExtension, String datasourceId, String hash)
{
if ( datasourceId == null ) {
logger.error("The retrieved \"datasourceId\" was \"null\" for file: " + fileName);
return null;
}
if ( hash == null ) {
logger.error("The retrieved \"hash\" was \"null\" for file: " + fileName);
return null;
}
// Now we append the file-hash, so it is guaranteed that the filename will be unique.
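// Example (with placeholder values): a datasourceId of "datasource_1", an openAireID of "id_1", a hash of "abc123" and a ".pdf" extension give: "datasource_1/id_1::abc123.pdf".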
return datasourceId + "/" + openAireID + "::" + hash + dotFileExtension; // This is the fileName to be used in the objectStore, not of the local file!
}
public String getMessageFromResponseBody(HttpURLConnection conn, boolean isError)
{
final StringBuilder msgStrB = new StringBuilder(500);
try ( BufferedReader br = new BufferedReader(new InputStreamReader((isError ? conn.getErrorStream() : conn.getInputStream()), StandardCharsets.UTF_8)) ) {
String inputLine;
while ( (inputLine = br.readLine()) != null )
{
if ( !inputLine.isEmpty() )
msgStrB.append(inputLine);
}
return (msgStrB.length() != 0) ? msgStrB.toString() : null; // Make sure we return a "null" on empty string, to better handle the case in the caller-function.
} catch ( IOException ioe ) {
logger.error("IOException when retrieving the response-body: " + ioe.getMessage());
return null;
} catch ( Exception e ) { // This includes the case, where the "conn.getErrorStream()" returns < null >.
logger.error("Could not extract the response-body!", e);
return null;
}
}
private List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch)
{
int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch);
int endingIndex = (curBatch * numOfFullTextsPerBatch);
if ( endingIndex > numAllFullTexts ) // This might be the case, when the "numAllFullTexts" is too small.
endingIndex = numAllFullTexts;
final List<String> fileNamesOfCurBatch = new ArrayList<>(numOfFullTextsPerBatch);
for ( int i = initialIndex; i < endingIndex; ++i ) {
try {
fileNamesOfCurBatch.add(allFileNames.get(i));
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for i=" + i + GenericUtils.endOfLine + ioobe.getMessage(), ioobe);
}
}
return fileNamesOfCurBatch;
}
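// The request-url for a batch is the "baseUrl" with the comma-separated fileNames appended.
// For example (with placeholder names): "http://<workerIp>:<workerPort>/api/full-texts/getFullTexts/<assignmentsCounter>/<numOfBatches>/<batchNum>/file_1.pdf,file_2.pdf".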
private String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch)
{
final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);
sb.append(baseUrl);
int numFullTextsCurBatch = fileNamesForCurBatch.size();
for ( int j=0; j < numFullTextsCurBatch; ++j ){
sb.append(fileNamesForCurBatch.get(j));
if ( j < (numFullTextsCurBatch -1) )
sb.append(",");
}
return sb.toString();
}
public static final int twentyFiveKb = 25_600; // 25 KiB
public static final int halfMb = 524_288; // 0.5 MiB = 512 KiB = 524_288 bytes
public static final int tenMb = (10 * 1_048_576);
public boolean saveArchive(HttpURLConnection conn, File zstdFile)
{
try ( BufferedInputStream inStream = new BufferedInputStream(conn.getInputStream(), tenMb);
BufferedOutputStream outStream = new BufferedOutputStream(Files.newOutputStream(zstdFile.toPath()), tenMb) )
{
byte[] buffer = new byte[twentyFiveKb]; // Copy the stream in chunks, instead of one byte at a time.
int bytesRead;
while ( (bytesRead = inStream.read(buffer)) != -1 ) {
outStream.write(buffer, 0, bytesRead);
}
return true;
} catch (Exception e) {
logger.error("Could not save the zstd file \"" + zstdFile.getName() + "\": " + e.getMessage(), e);
return false;
}
}
/**
* This method updates the UrlReports to not point to any downloaded fullText files.
* This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails, and we don't want any "links" to locally stored files, which will be deleted.
* If the "shouldCheckAndKeepS3UploadedFiles" is set to "true", then the payloads which have their file uploaded to the S3-ObjectStore, are excluded.
* @param urlReports
* @param shouldCheckAndKeepS3UploadedFiles
*/
public void removeUnretrievedFullTextsFromUrlReports(List<UrlReport> urlReports, boolean shouldCheckAndKeepS3UploadedFiles)
{
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null )
continue;
if ( shouldCheckAndKeepS3UploadedFiles ) {
String fileLocation = payload.getLocation();
if ( (fileLocation == null) || s3ObjectStore.isLocationInStore(fileLocation) )
continue;
}
// Mark this full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text ("actual_url") will be kept, for now.
payload.setLocation(null); // This will cause the payload to not be inserted into the "payload" table in the database. Only the "attempt" record will be inserted.
payload.setHash(null);
payload.setMime_type(null);
payload.setSize(null);
// The id-url record will be called as a new assignment in the future.
}
}
/**
* This method searches the backlog of publications for the ones whose "original_url" matches either the "original_url" or the "actual_url" of an existing payload.
* Then, for each match, it generates a new "UrlReport" object, whose payload is a clone of the previously generated one, with only the "id" and the "original_url" changed.
* Then, the program automatically generates "attempt" and "payload" records for these additional UrlReport-records.
* It must be executed inside the same "database-locked" block of code, along with the inserts of the attempt and payload records.
* */
public boolean addUrlReportsByMatchingRecordsFromBacklog(List<UrlReport> urlReports, List<Payload> initialPayloads, int numInitialPayloads, long assignmentsBatchCounter)
{
logger.debug("numInitialPayloads: " + numInitialPayloads + " | assignmentsBatchCounter: " + assignmentsBatchCounter);
// Create a HashMultimap, containing the "original_url" or "actual_url" as the key and the related "payload" objects as its values.
final HashMultimap<String, Payload> urlToPayloadsMultimap = HashMultimap.create((numInitialPayloads / 3), 3); // Holds multiple values for any key, if a url (key) has many payloads (values) associated with it.
for ( Payload payload : initialPayloads ) {
String original_url = payload.getOriginal_url();
String actual_url = payload.getActual_url();
// Link this payload with both the original and actual urls (in case they are different).
urlToPayloadsMultimap.put(original_url, payload);
if ( ! actual_url.equals(original_url) )
urlToPayloadsMultimap.put(actual_url, payload);
}
// A url may be related to different payloads, in the urlReports. For example, in one payload the url was the original_url
// but in another payload the url was only the actual-url (the original was different)
// Gather the original and actual urls of the current payloads and add them in a list usable by the query.
List<String> urlsToRetrieveRelatedIDs = initialPayloads.parallelStream()
.flatMap(payload -> Stream.of(payload.getOriginal_url(), payload.getActual_url())) // Add both "original_url" and "actual_url" in the final results.
.collect(Collectors.toList());
// Prepare the "urlsToRetrieveRelatedIDs" to be used inside the "getDataForPayloadPrefillQuery". Create the following string-pattern: ("URL_1", "URL_2", ...)
int urlsToRetrieveRelatedIDsSize = urlsToRetrieveRelatedIDs.size();
int stringBuilderCapacity = (urlsToRetrieveRelatedIDsSize * 100);
// Get the id and url of any publication-record which has not already been processed (i.e. is not in the "attempt", "payload" or "assignment" tables) and whose url matches one of the urls of the current payloads.
String getDataForPayloadPrefillQuery = "select distinct pu.id, pu.url\n" +
"from " + DatabaseConnector.databaseName + ".publication_urls pu\n" +
// Exclude the "already-processed" pairs.
"left anti join " + DatabaseConnector.databaseName + ".attempt a on a.id=pu.id and a.original_url=pu.url\n" +
"left anti join " + DatabaseConnector.databaseName + ".payload p on p.id=pu.id and p.original_url=pu.url\n" +
"left anti join " + DatabaseConnector.databaseName + ".assignment asgn on asgn.id=pu.id and asgn.original_url=pu.url\n" +
// Limit the urls to the ones matching to the payload-urls found for the current assignments.
"where pu.url in " + getQueryListString(urlsToRetrieveRelatedIDs, urlsToRetrieveRelatedIDsSize, stringBuilderCapacity);
//logger.trace("getDataForPayloadPrefillQuery:\n" + getDataForPayloadPrefillQuery);
final List<Payload> prefilledPayloads = new ArrayList<>(1000);
try {
jdbcTemplate.query(getDataForPayloadPrefillQuery, rs -> {
String id;
String original_url;
try { // For each of the 2 columns returned, do the following. The column-indexing starts from 1.
id = rs.getString(1);
original_url = rs.getString(2);
} catch (SQLException sqle) {
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
return; // Move to the next payload.
}
Set<Payload> foundPayloads = urlToPayloadsMultimap.get(original_url);
if ( foundPayloads.isEmpty() ) { // Guava's "HashMultimap.get()" returns an empty collection (never null) for an absent key.
logger.error("No payloads associated with \"original_url\" = \"" + original_url + "\"!");
return;
}
// Select a random "foundPayload" to use its data to fill the "prefilledPayload" (in a "Set" the first element is pseudo-random).
Optional<Payload> optPayload = foundPayloads.stream().findFirst();
if ( !optPayload.isPresent() ) {
logger.error("Could not retrieve any payload for the \"original_url\": " + original_url);
return; // Move to the next payload.
}
Payload prefilledPayload = optPayload.get().clone(); // We take a clone of the original payload and change just the id and the original_url.
prefilledPayload.setId(id);
prefilledPayload.setOriginal_url(original_url);
prefilledPayloads.add(prefilledPayload);
});
} catch (EmptyResultDataAccessException erdae) {
logger.error("No results retrieved from the \"getDataForPayloadPrefillQuery\", when trying to prefill payloads, from assignment_" + assignmentsBatchCounter + ".");
return false;
} catch (Exception e) {
DatabaseConnector.handleQueryException("getDataForPayloadPrefillQuery", getDataForPayloadPrefillQuery, e);
return false;
}
int numPrefilledPayloads = prefilledPayloads.size();
if ( numPrefilledPayloads == 0 ) {
logger.error("Some results were retrieved from the \"getDataForPayloadPrefillQuery\", but no data could be extracted from them, when trying to prefill payloads, from assignment_" + assignmentsBatchCounter + ".");
return false;
}
logger.debug("numPrefilledPayloads: " + numPrefilledPayloads + " | assignmentsBatchCounter: " + assignmentsBatchCounter);
// Add the prefilled to the UrlReports.
final Error noError = new Error(null, null);
for ( Payload prefilledPayload : prefilledPayloads )
{
urlReports.add(new UrlReport(UrlReport.StatusType.accessible, prefilledPayload, noError));
}
logger.debug("Final number of UrlReports is " + urlReports.size() + " | assignmentsBatchCounter: " + assignmentsBatchCounter);
// In order to avoid assigning these "prefill" records to workers before they are inserted in the "attempt" and "payload" tables,
// we have to make sure this method is called inside a "DB-locked" code block and the "DB-unlock" happens only after all records are loaded into the DB-tables.
return true;
}
/**
* Set the fileLocation for all those Payloads related to the File.
* @param filePayloads
* @param s3Url
*/
public void setFullTextForMultiplePayloads(@NotNull Set<Payload> filePayloads, String s3Url) {
for ( Payload payload : filePayloads )
if ( payload != null )
payload.setLocation(s3Url); // Update the file-location to the new S3-url. All the other file-data is already set from the Worker.
}
public boolean deleteDirectory(File directory)
{
try {
org.apache.commons.io.FileUtils.deleteDirectory(directory);
return true; // Will return "true" also in case this directory does not exist. So, no Exception will be thrown for that case.
} catch (IOException e) {
logger.error("The following directory could not be deleted: " + directory.getName(), e);
return false;
} catch (IllegalArgumentException iae) {
logger.error("This batch-dir does not exist: " + directory.getName());
return false;
}
}
public boolean deleteFile(String fileFullPathString)
{
try {
FileDeleteStrategy.FORCE.delete(new File(fileFullPathString));
} catch (IOException e) {
logger.error("Error when deleting the file: " + fileFullPathString);
return false;
}
return true;
}
public static final Lock fileAccessLock = new ReentrantLock(true);
public String writeToFile(String fileFullPath, String stringToWrite, boolean shouldLockThreads)
{
if ( shouldLockThreads ) // In case multiple threads write to the same file. for ex. during the bulk-import procedure.
fileAccessLock.lock();
// TODO - Make this method synchronize on the specific file, not globally.
// Right now, multiple bulkImport procedures (with different directories) are blocked, even though they write to DIFFERENT files.
try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), halfMb) )
{
bufferedWriter.write(stringToWrite); // This will overwrite the file. If the new string is smaller, then it does not matter.
} catch (Exception e) {
String errorMsg = "Failed to create or acquire the file \"" + fileFullPath + "\"!";
logger.error(errorMsg, e);
return errorMsg;
} finally {
if ( shouldLockThreads )
fileAccessLock.unlock();
}
return null;
}
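// Builds the string-list used inside SQL "in (...)" clauses.
// For example, the list ["hash_1", "hash_2", "hash_3"] gives the string: ("hash_1", "hash_2", "hash_3") -- with each value double-quoted.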
public static String getQueryListString(List<String> list, int fileHashesListSize, int stringBuilderCapacity) {
StringBuilder sb = new StringBuilder(stringBuilderCapacity);
sb.append("(");
for ( int i=0; i < fileHashesListSize; ++i ) {
sb.append("\"").append(list.get(i)).append("\"");
if ( i < (fileHashesListSize -1) )
sb.append(", ");
}
sb.append(")");
return sb.toString();
}
}