UrlsController/src/main/java/eu/openaire/urls_controller/util/FilesHandler.java

package eu.openaire.urls_controller.util;
import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
@Component
public class FilesHandler {
private static final Logger logger = LoggerFactory.getLogger(FilesHandler.class);
@Autowired
private FileUtils fileUtils;
@Autowired
private FileDecompressor fileDecompressor;
@Autowired
private S3ObjectStore s3ObjectStore;
@Value("${services.pdfaggregation.worker.port}")
private String workerPort;
public static final DecimalFormat df = new DecimalFormat("0.00");
private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
public String baseFilesLocation;
public FilesHandler (@Value("${services.pdfaggregation.controller.baseFilesLocation}") String baseFilesLocation) {
if ( !baseFilesLocation.endsWith(File.separator) )
baseFilesLocation += File.separator;
if ( !baseFilesLocation.startsWith(File.separator) )
baseFilesLocation = FileUtils.workingDir + baseFilesLocation;
this.baseFilesLocation = baseFilesLocation;
}
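// For example (just an illustration of the normalization above, with a hypothetical path): a relative value like "fulltexts" becomes
// FileUtils.workingDir + "fulltexts" + File.separator, while an absolute "/mnt/data/fulltexts/" is kept as-is.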
private HttpURLConnection getConnectionForFullTextBatch(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) throws RuntimeException
{
baseUrl += batchNum + "/";
String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);
//logger.debug("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]");
try {
HttpURLConnection conn = (HttpURLConnection) new URL(requestUrl).openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty("User-Agent", "UrlsController");
// TODO - Write the fileNames in the RequestBody, so that we can include as many as we want in each request.
// Right now, we can include only up to 70-80 fileNames in the url-string.
// TODO - We need to add the fileNames in the requestBody BEFORE we connect. So we will need to refactor the code to work in that order.
// A minimal sketch of that approach (assumed, not yet implemented): switch to POST, enable output and write the comma-separated fileNames before connecting.
/*conn.setRequestMethod("POST");
conn.setDoOutput(true);
String fileNamesForCurBatch_separated_by_comma = String.join(",", fileNamesForCurBatch);
OutputStream os = conn.getOutputStream();
OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8);
osw.write(fileNamesForCurBatch_separated_by_comma);
osw.flush();
osw.close();
os.close();*/
// TODO - The above update will also enable us to improve edge-case management, like making sure we do not create a whole new batch for just a few files.
// Check this case for example, where we get one extra batch along with its network, compression-decompression, transfer and uploading overhead:
// 2023-02-06 12:17:26.893 [http-nio-1880-exec-8] DEBUG e.o.urls_controller.util.FileUtils.getAndUploadFullTexts(@235) - The assignments_12 have 211 distinct non-already-uploaded fullTexts.
// Going to request them from the Worker "worker_X", in 4 batches (70 files each, except for the final batch, which will have 1 files).
// If we are not limited by the url-length, we can easily say that if fewer than 10 files remain for the last batch, they get added to the previous batch (e.g. the last batch will have 79 files).
// If 10 or more files remain, then we will make an extra batch.
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode == -1 ) { // Invalid HTTP-Response.
logger.warn("Problem when getting the \"status-code\" for url: " + requestUrl);
throw new RuntimeException(); // Avoid any other batches.
} else if ( statusCode != 200 ) {
String errMsg = fileUtils.getMessageFromResponseBody(conn, true);
logger.warn("HTTP-" + statusCode + ": " + errMsg + "\n\nProblem when requesting the ZstdFile of batch_" + batchNum + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl);
if ( ((statusCode >= 500) && (statusCode <= 599))
|| ((statusCode == 400) && ((errMsg != null) && errMsg.contains("The base directory for assignments_" + assignmentsBatchCounter + " was not found"))) )
throw new RuntimeException(); // Throw an exception to indicate that the Worker has problems and all remaining batches will fail as well.
return null;
} else
return conn;
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
String exMessage = e.getMessage();
logger.warn("Problem when requesting the ZstdFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + GenericUtils.endOfLine + exMessage);
if ( (exMessage != null) && exMessage.contains("Connection refused") ) { // Guard against a null message, to avoid an NPE.
logger.error("Since we received a \"Connection refused\", from \"" + workerId + "\", all of the remaining batches (" + (totalBatches - batchNum) + ") will not be requested!");
throw new RuntimeException();
}
return null;
}
}
public FileUtils.UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, int sizeOfUrlReports, long assignmentsBatchCounter, String workerId) throws RuntimeException
{
// The Controller has to request the files from the Worker, in order to upload them to S3.
// We UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
String workerIp = null;
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
if ( workerInfo == null ) {
logger.error("Could not retrieve the info of worker: " + workerId);
return null;
} else
workerIp = workerInfo.getWorkerIP(); // This won't be null.
// Get the file-locations.
int numValidFullTextsFound = 0;
int numFilesFoundFromPreviousAssignmentsBatches = 0;
int numFullTextsWithProblematicLocations = 0;
HashMultimap<String, Payload> hashesWithPayloads = fileUtils.getHashesWithPayloads(urlReports, sizeOfUrlReports); // Holds multiple payloads for the same fileHash.
Set<String> fileHashes = hashesWithPayloads.keySet();
int fileHashesSetSize = fileHashes.size(); // Get the size of the keysSet, instead of the whole multimap.
if ( fileHashesSetSize == 0 ) {
logger.warn("No fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\".");
return FileUtils.UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error.
}
HashMap<String, String> hashLocationMap = fileUtils.getHashLocationMap(fileHashes, fileHashesSetSize, assignmentsBatchCounter, "assignments");
HashMultimap<String, Payload> allFileNamesWithPayloads = HashMultimap.create((sizeOfUrlReports / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
for ( String fileHash : fileHashes )
{
for ( Payload payload : hashesWithPayloads.get(fileHash) )
{
String alreadyFoundFileLocation = hashLocationMap.get(fileHash); // Only one location has been retrieved per fileHash.
if ( alreadyFoundFileLocation != null ) {
// Fill the payloads with locations from the "previously-found-hashes."
payload.setLocation(alreadyFoundFileLocation);
if ( logger.isTraceEnabled() )
logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches ++;
numValidFullTextsFound ++; // We trust the location being valid..
}
else { // This file has not been found before..
// Extract the "fileNameWithExtension" to be added in the HashMultimap.
String fileLocation = payload.getLocation();
Matcher matcher = FileUtils.FILEPATH_ID_EXTENSION.matcher(fileLocation);
if ( ! matcher.matches() ) {
logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FileUtils.FILEPATH_ID_EXTENSION);
numFullTextsWithProblematicLocations ++;
continue;
}
String fileNameWithExtension = matcher.group(2);
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FileUtils.FILEPATH_ID_EXTENSION);
numFullTextsWithProblematicLocations ++;
continue;
}
numValidFullTextsFound ++;
allFileNamesWithPayloads.put(fileNameWithExtension, payload); // Duplicate key-value pairs are not stored.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also point to the same "ID-1.pdf" file, as pdf-url-2 might be the same as pdf-url-1; thus, the file was not downloaded again for ID-2.
}
}
}
if ( numFullTextsWithProblematicLocations > 0 )
logger.warn(numFullTextsWithProblematicLocations + " files had problematic locations.");
if ( numValidFullTextsFound == 0 ) {
logger.warn("No full-text files were retrieved for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
return FileUtils.UploadFullTextsResponse.successful_without_fulltexts; // It's not what we want, but it's not an error either.
}
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet()); // The number of fulltexts is lower than (or equal to) the number of payloads, since multiple payloads may lead to the same file.
int numFullTextsToBeRequested = allFileNames.size();
if ( numFullTextsToBeRequested == 0 ) {
logger.info(numValidFullTextsFound + " fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\", but all of them have been retrieved before.");
return FileUtils.UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error.
}
logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numValidFullTextsFound + " (out of " + sizeOfUrlReports + " | about " + df.format(numValidFullTextsFound * 100.0 / sizeOfUrlReports) + "%).");
// TODO - Have a prometheus GAUGE to hold the value of the above percentage, so that we can track the success-rates over time..
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
// Request the full-texts in batches, compressed in a zstd tar file.
int numOfBatches = (numFullTextsToBeRequested / numOfFullTextsPerBatch);
int remainingFiles = (numFullTextsToBeRequested % numOfFullTextsPerBatch);
if ( remainingFiles > 0 ) { // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
numOfBatches++;
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
} else
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
// Any files left over by the integer division above are covered by the extra (last) batch.
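// Worked example, using the numbers from the log-excerpt above: 211 distinct fullTexts with 70 files per batch
// give (211 / 70) = 3 full batches, plus (211 % 70) = 1 remaining file, so "numOfBatches" becomes 4.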
String baseUrl = "http://" + workerIp + ":" + workerPort + "/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
// TODO - The worker should send the port on which it accepts requests, along with the current request.
// TODO - The least we have to do is to expose the port-assignment somewhere more obvious, like inside the "application.yml" file.
String curAssignmentsBaseLocation = baseFilesLocation + "assignments_" + assignmentsBatchCounter + File.separator;
// Note: the "curAssignmentsBaseLocation"-directory will be created once the first batch subdirectory is called for creation.
int failedBatches = 0;
for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) {
// TODO - Currently, for big assignments (e.g. 10000), it takes 2 mins (actually 1.5 mins after using the Zstandard compression) for the worker to zstd the files and return them, FOR EACH BATCH.
// Also, it takes around 3 mins for the Controller to process the received files, FOR EACH BATCH.
// So, for 24 batches, it takes around 24 * (2 + 3) = 120 mins to process all the full-texts of each assignments-batch.
// TODO - What if we could spawn a new thread for each full-texts-batch, and make them "fire" one after the other?
// TODO - So, the 1st thread starts with the first batch and waits for the first zstd file from the worker.
// Once it gets the zstd file it continues, but now the Worker is just sitting idle.. So the 2nd thread fires and requests the next zstd.
// This way, the worker is never left waiting and the Controller does not wait for the Worker either!
// The worker will not have 2 parallel requests for zstd files, so the single CPU there will not be stressed to zstd many files in parallel.
// Yes, the Controller may end up receiving the new zstd from the Worker before it has finished uploading the previously received files to S3.
// TODO - BUT, we can make the new thread "WAIT" for the previous one to finish.
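// A rough sketch of that pipelining idea (an assumption, not part of the current implementation): a single-thread
// executor pre-fetches the zstd of the next batch, while the current batch is being decompressed and uploaded.
// The helper "requestAndSaveZstdForBatch()" is hypothetical; it would wrap the "getAndSaveFullTextBatch()" call used below.
/*ExecutorService zstdPrefetcher = Executors.newSingleThreadExecutor();
Future<Boolean> nextZstdFuture = zstdPrefetcher.submit(() -> requestAndSaveZstdForBatch(batchCounter + 1));
// ... decompress and upload the zstd-file of the current batch here ...
Boolean nextZstdWasSaved = nextZstdFuture.get(); // Blocks only if the pre-fetch has not finished yet.*/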
String targetDirectory = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator;
Path curBatchPath;
try { // Create this batch-directory.
curBatchPath = Files.createDirectories(Paths.get(targetDirectory));
// The base-directory will be created along with the first batch-directory.
} catch (Exception e) {
logger.error("Could not create the \"curBatchPath\" directory: " + targetDirectory + GenericUtils.endOfLine + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
failedBatches ++;
continue;
}
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numFullTextsToBeRequested, batchCounter);
String zstdFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".tar.zstd";
try {
if ( ! getAndSaveFullTextBatch(fileNamesForCurBatch, baseUrl, assignmentsBatchCounter, batchCounter, numOfBatches, zstdFileFullPath, workerId) ) {
failedBatches ++;
continue;
}
} catch (RuntimeException re) {
failedBatches += (1 + (numOfBatches - batchCounter)); // The "failedBatches" will hold the previously failed batches, plus this one, plus the remaining batches, which will likely fail too and thus will not be attempted. Some initial batches may have succeeded.
break;
}
if ( ! fileDecompressor.decompressAndUploadFullTexts(zstdFileFullPath, curBatchPath, targetDirectory, fileNamesForCurBatch, batchCounter, allFileNamesWithPayloads, assignmentsBatchCounter) )
failedBatches ++;
} // End of batches.
if ( failedBatches == numOfBatches )
logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
removeUnretrievedFullTextsFromUrlReports(urlReports, true); // Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed).
fileUtils.deleteDirectory(new File(curAssignmentsBaseLocation));
// Check and warn about the number of failed payloads.
// Possible reasons: the hash-check in the DB failed, the file was not found inside the worker, the whole batch failed to be delivered from the worker, or files failed to be uploaded to S3.
// Retrieve the payloads from the existing urlReports.
long finalPayloadsCounter = urlReports.parallelStream()
.map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null)))
.count();
int numInitialPayloads = (numValidFullTextsFound + numFullTextsWithProblematicLocations);
long numFailedPayloads = (numInitialPayloads - finalPayloadsCounter);
if ( numFailedPayloads == numInitialPayloads ) {
// This will also be the case if there was no DB failure, but all the batches have failed.
logger.error("None of the " + numInitialPayloads + " payloads could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
return FileUtils.UploadFullTextsResponse.unsuccessful;
} else if ( numFailedPayloads > 0 )
logger.warn(numFailedPayloads + " payloads (out of " + numInitialPayloads + ") failed to be processed for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
return FileUtils.UploadFullTextsResponse.successful;
}
private boolean getAndSaveFullTextBatch(List<String> fileNamesForCurBatch, String baseUrl, long assignmentsBatchCounter, int batchCounter, int numOfBatches,
String zstdFileFullPath, String workerId) throws RuntimeException
{
HttpURLConnection conn;
InputStream inputStream;
try {
if ( (conn = getConnectionForFullTextBatch(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId)) == null )
return false;
inputStream = conn.getInputStream();
} catch (RuntimeException re) {
// The "cause" was logged inside "getConnection()".
throw re;
} catch (IOException ioe) {
throw new RuntimeException(ioe.getMessage());
}
// Save and decompress the zstd file. Iterate over the PDFs and upload each one of them and get the S3-Url.
return fileUtils.saveArchive(inputStream, new File(zstdFileFullPath));
// We do not call "conn.disconnect()", since more requests are about to be made to the Worker in the near future.
//private static final RestTemplate restTemplate = new RestTemplate();
//baseUrl += batchCounter + "/";
//String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);
// Define a response extractor that returns an input stream
//ResponseExtractor<InputStream> responseExtractor = HttpInputMessage::getBody;
/*ResponseExtractor<InputStream> responseExtractor = new ResponseExtractor<InputStream>() {
@Override
public InputStream extractData(ClientHttpResponse response) throws IOException {
return response.getBody();
}
};
try {
//ResponseEntity<Object> responseEntity = restTemplate.exchange(requestUrl, HttpMethod.GET, null, Object.class);
InputStream inputStream = restTemplate.exchange(requestUrl, HttpMethod.GET, null, responseExtractor);
//InputStream inputStream = (InputStream) responseEntity.getBody();
return saveArchive(inputStream, new File(zstdFileFullPath));
} catch (Exception e) {
logger.error("Could not get the file from the response body!", e);
return false;
}*/
}
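// For illustration, assuming a batch containing the files "file_1.pdf" and "file_2.pdf", the request-url built below looks like:
// http://<workerIp>:<workerPort>/api/full-texts/getFullTexts/<assignmentsBatchCounter>/<totalBatches>/<batchNum>/file_1.pdf,file_2.pdf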
private String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch)
{
final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);
sb.append(baseUrl);
int numFullTextsCurBatch = fileNamesForCurBatch.size();
for ( int j=0; j < numFullTextsCurBatch; ++j ){
sb.append(fileNamesForCurBatch.get(j));
if ( j < (numFullTextsCurBatch -1) )
sb.append(",");
}
return sb.toString();
}
private List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch)
{
int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch);
int endingIndex = (curBatch * numOfFullTextsPerBatch);
if ( endingIndex > numAllFullTexts ) // This might be the case, when the "numAllFullTexts" is too small.
endingIndex = numAllFullTexts;
final List<String> fileNamesOfCurBatch = new ArrayList<>(numOfFullTextsPerBatch);
for ( int i = initialIndex; i < endingIndex; ++i ) {
try {
fileNamesOfCurBatch.add(allFileNames.get(i));
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for i=" + i + GenericUtils.endOfLine + ioobe.getMessage(), ioobe);
}
}
return fileNamesOfCurBatch;
}
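// Note: assuming (0 <= initialIndex <= endingIndex <= allFileNames.size()), the above loop is equivalent to:
// "new ArrayList<>(allFileNames.subList(initialIndex, endingIndex))";
// the manual loop is kept only to log a potential IndexOutOfBoundsException per element.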
/**
* This method updates the UrlReports to not point to any downloaded fullText files.
* This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails, and we don't want any "links" to locally stored files, which will be deleted.
* If the "shouldCheckAndKeepS3UploadedFiles" is set to "true", then the payloads which have their file uploaded to the S3-ObjectStore, are excluded.
* @param urlReports
* @param shouldCheckAndKeepS3UploadedFiles
*/
public void removeUnretrievedFullTextsFromUrlReports(List<UrlReport> urlReports, boolean shouldCheckAndKeepS3UploadedFiles)
{
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null )
continue;
if ( shouldCheckAndKeepS3UploadedFiles ) {
String fileLocation = payload.getLocation();
if ( (fileLocation == null) || s3ObjectStore.isLocationInStore(fileLocation) )
continue;
}
// Mark this full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text ("actual_url") will be kept, for now.
payload.setLocation(null); // This will cause the payload to not be inserted into the "payload" table in the database. Only the "attempt" record will be inserted.
payload.setHash(null);
payload.setMime_type(null);
payload.setSize(null);
// The id-url record will be called as a new assignment in the future.
}
}
}