- Improve handling of the case where no full-texts have been found, or where none of the found ones need to be requested from the worker, because they were already retrieved in the past.

- Show the number of files with problematic locations (if any of them exist).
- Code polishing.
Lampros Smyrnaios 2024-02-23 12:39:28 +02:00
parent 749172edd8
commit 43ea64758d
5 changed files with 47 additions and 33 deletions
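In outline, the diffs below introduce a new `successful_without_fulltexts` response value and thread a `hasFulltexts` flag through the report-processing path, so that all payload-related work is skipped when nothing new was uploaded. A condensed, hypothetical sketch of that control flow (the class and method are simplified stand-ins for the real `UrlsServiceImpl` and `FileUtils` code shown in the diffs):

```java
// Condensed sketch of the flow this commit introduces (not the real classes).
public class WorkerReportFlowSketch {

    // Mirrors FileUtils.UploadFullTextsResponse after this commit.
    enum UploadFullTextsResponse { successful, successful_without_fulltexts, unsuccessful, databaseError }

    static void processReport(UploadFullTextsResponse uploadResponse) {
        boolean hasFulltexts = true;
        if ( uploadResponse == UploadFullTextsResponse.unsuccessful ) {
            System.out.println("Continue, but write only the \"attempt\" records.");
        } else if ( uploadResponse == UploadFullTextsResponse.successful_without_fulltexts ) {
            hasFulltexts = false;   // Nothing new was uploaded to the S3-Object-Store for this batch.
        }

        // Payload-related work is now gated on "hasFulltexts":
        if ( hasFulltexts )
            System.out.println("Create the payload HDFS sub-dir, load \"payload_aggregated\", merge its table.");
        else
            System.out.println("Skip every \"payload_aggregated\" step; handle only the \"attempt\" table.");
    }

    public static void main(String[] args) {
        processReport(UploadFullTextsResponse.successful_without_fulltexts);
    }
}
```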

View File

@ -5,7 +5,7 @@ Then, it receives the "WorkerReports", it requests the full-texts from the worke
<br>
It can also process **Bulk-Import** requests, from compatible data sources, in which case it receives the full-text files immediately, without offloading crawling jobs to Workers.<br>
<br>
For interacting with the database we use [**Impala**](https://impala.apache.org/).<br>
For managing and generating data, we use [**Impala**](https://impala.apache.org/) JDBC and WebHDFS.<br>
<br>

View File

@ -19,7 +19,7 @@ public class StatsServiceImpl implements StatsService {
private JdbcTemplate jdbcTemplate;
// No DB-lock is required for these READ-operations.
// BUT! The is an issue.. these queries may run while a "table-merging" operation is in progress.. thus resulting in "no table reference" and "no file found (fieName.parquet)"
// BUT! There is an issue.. these queries may run while a "table-merging" operation is in progress.. thus resulting in "no table reference" and "no file found (fileName.parquet)"
// Thus, we need to have an "error-detection-and-retry" mechanism, in order to avoid returning errors that we know will occur at certain times and that we can overcome.
// The final time-to-return of the results-retrieval methods may be somewhat large, but the alternatives of returning predictable errors or locking the DB and slowing down the aggregation system are even worse.
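The comment above asks for an "error-detection-and-retry" mechanism, but that mechanism is not part of this diff; the following is only a minimal sketch of what such a wrapper could look like, where the retry count, the sleep interval and the matched error messages are illustrative assumptions rather than the project's actual implementation:

```java
import java.util.concurrent.Callable;

// Hypothetical retry wrapper for READ-queries that may transiently fail while a "table-merging"
// operation is in progress (e.g. "table not found" / "file not found (fileName.parquet)" errors).
public class RetryingReadQuery {

    static <T> T runWithRetries(Callable<T> query, int maxRetries, long sleepMillis) throws Exception {
        Exception lastError = null;
        for ( int attempt = 1; attempt <= maxRetries; attempt++ ) {
            try {
                return query.call();
            } catch (Exception e) {
                String msg = (e.getMessage() != null) ? e.getMessage() : "";
                // Retry only the errors we expect during a table-merge; rethrow anything else.
                boolean transientMergeError = msg.contains("Could not resolve table reference")
                        || msg.contains("No such file or directory");
                if ( !transientMergeError )
                    throw e;
                lastError = e;
                Thread.sleep(sleepMillis);  // Wait for the table-merge to finish before retrying.
            }
        }
        throw (lastError != null) ? lastError : new IllegalStateException("maxRetries must be >= 1");
    }

    // Example usage (names are illustrative):
    //   Long count = runWithRetries(() -> jdbcTemplate.queryForObject(countQuery, Long.class), 5, 2_000);
}
```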

View File

@ -281,6 +281,8 @@ public class UrlsServiceImpl implements UrlsService {
logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + curReportAssignmentsCounter);
boolean hasFulltexts = true;
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, curWorkerId);
if ( uploadFullTextsResponse == null ) {
@ -300,7 +302,9 @@ public class UrlsServiceImpl implements UrlsService {
// We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store.
// We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
// The urls which give a full-text (even if we could not get it from the worker) are flagged as "couldRetry" anyway, so they will be picked up to be checked again later.
} else
} else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.successful_without_fulltexts )
hasFulltexts = false;
else
logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignmentsCounter);
String localParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignmentsCounter + File.separator;
@ -318,7 +322,7 @@ public class UrlsServiceImpl implements UrlsService {
// Create HDFS subDirs for these assignments. Other background threads handling other assignments will not interfere with loading of parquetFiles to the DB tables.
String endingMkDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.mkDirsAndParams;
if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingMkDirAndParams)
|| !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingMkDirAndParams) )
|| (hasFulltexts && !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingMkDirAndParams)) )
{
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating the HDFS sub-directories for assignments_" + curReportAssignmentsCounter);
return false;
@ -339,7 +343,7 @@ public class UrlsServiceImpl implements UrlsService {
SumParquetSuccess sumParquetSuccess = parquetFileUtils.checkParquetFilesSuccess(futures);
ResponseEntity<?> errorResponseEntity = sumParquetSuccess.getResponseEntity();
if ( errorResponseEntity != null ) { // The related log is already shown.
if ( errorResponseEntity != null ) { // The related log is already shown in this case.
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating or uploading the parquet files!");
DatabaseConnector.databaseLock.unlock();
return false;
@ -354,17 +358,16 @@ public class UrlsServiceImpl implements UrlsService {
if ( hasAttemptParquetFileProblem )
logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignmentsCounter);
else if ( hasPayloadParquetFileProblem )
logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignmentsCounter);
logger.error("All of the payload-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignmentsCounter);
else
logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files, for batch-assignments_" + curReportAssignmentsCounter);
}
// Load all the parquet files of each type into its table.
if ( ! hasAttemptParquetFileProblem )
hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/", "attempt");
if ( ! hasPayloadParquetFileProblem )
if ( hasFulltexts && ! hasPayloadParquetFileProblem )
hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignmentsCounter + "/", "payload_aggregated");
DatabaseConnector.databaseLock.unlock();
@ -374,7 +377,7 @@ public class UrlsServiceImpl implements UrlsService {
else if ( hasAttemptParquetFileProblem || hasPayloadParquetFileProblem )
logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload_aggregated\" table, for batch-assignments_" + curReportAssignmentsCounter);
else
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignmentsCounter);
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" " + (hasFulltexts ? "and the \"payload_aggregated\" tables" : "table") + ", for batch-assignments_" + curReportAssignmentsCounter);
} catch (InterruptedException ie) { // Thrown by "insertsExecutor.invokeAll()". In this case, any unfinished tasks are cancelled.
DatabaseConnector.databaseLock.unlock();
@ -398,7 +401,7 @@ public class UrlsServiceImpl implements UrlsService {
// Delete the HDFS subDirs for this Report.
String endingRmDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.rmDirsAndParams;
if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingRmDirAndParams)
|| !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingRmDirAndParams) )
|| (hasFulltexts && !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingRmDirAndParams)) )
{
logger.error("Error when deleting the HDFS sub-directories for assignments_" + curReportAssignmentsCounter); // A directory-specific log has already appeared.
// The failure to delete the assignments_subDirs is not much of a problem and should not fail the whole process. So all goes as planned (the worker deletes any remaining files).
@ -418,7 +421,7 @@ public class UrlsServiceImpl implements UrlsService {
// For every "numOfWorkers" assignment-batches that get processed, we merge the "attempts" and "payload_aggregated" tables.
if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfWorkers.get()) == 0 ) // The workersNum will not be zero!
if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem) )
if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, hasFulltexts) )
// The "postReportResultToWorker()" was called inside.
return false;
@ -473,7 +476,7 @@ public class UrlsServiceImpl implements UrlsService {
}
private boolean mergeWorkerRelatedTables(String curWorkerId, long curReportAssignmentsCounter, boolean hasAttemptParquetFileProblem, boolean hasPayloadParquetFileProblem)
private boolean mergeWorkerRelatedTables(String curWorkerId, long curReportAssignmentsCounter, boolean hasAttemptParquetFileProblem, boolean hasPayloadParquetFileProblem, boolean hasFulltexts)
{
logger.debug("Going to merge the parquet files for the tables which were altered, for batch-assignments_" + curReportAssignmentsCounter);
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
@ -492,7 +495,7 @@ public class UrlsServiceImpl implements UrlsService {
}
}
if ( ! hasPayloadParquetFileProblem ) {
if ( hasFulltexts && ! hasPayloadParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null);
if ( mergeErrorMsg != null ) {
DatabaseConnector.databaseLock.unlock();
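Taken together, the hunks above gate the periodic table-merging in two places: it only runs once every numOfWorkers processed report-batches, and the "payload_aggregated" merge additionally requires hasFulltexts and a problem-free payload parquet. A rough, self-contained sketch of that gating (the counters and printed actions are simplified stand-ins, not the real UrlsServiceImpl):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of when the worker-related tables get merged.
public class MergeGatingSketch {

    static final AtomicInteger numOfWorkers = new AtomicInteger(4);            // Example value.
    static final AtomicInteger workerReportsProcessed = new AtomicInteger(0);

    static void afterReportProcessed(boolean hasAttemptProblem, boolean hasPayloadProblem, boolean hasFulltexts) {
        int processed = workerReportsProcessed.incrementAndGet();
        // Merge the tables only once every "numOfWorkers" processed report-batches.
        if ( (processed % numOfWorkers.get()) != 0 )
            return;
        if ( !hasAttemptProblem )
            System.out.println("Merge the \"attempt\" table.");
        // The payload-merge is skipped both on earlier parquet problems and when no new fulltexts exist.
        if ( hasFulltexts && !hasPayloadProblem )
            System.out.println("Merge the \"payload_aggregated\" table.");
    }

    public static void main(String[] args) {
        for ( int i = 0; i < 8; i++ )
            afterReportProcessed(false, false, (i % 2 == 0));
    }
}
```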

View File

@ -66,7 +66,7 @@ public class FileUtils {
public enum UploadFullTextsResponse {successful, unsuccessful, databaseError}
public enum UploadFullTextsResponse {successful, successful_without_fulltexts, unsuccessful, databaseError}
public String baseFilesLocation;
@ -220,9 +220,9 @@ public class FileUtils {
workerIp = workerInfo.getWorkerIP(); // This won't be null.
// Get the file-locations.
final AtomicInteger numPayloadsToBeHandled = new AtomicInteger(0);
final AtomicInteger numFullTextsFound = new AtomicInteger(0);
final AtomicInteger numValidFullTextsFound = new AtomicInteger(0);
final AtomicInteger numFilesFoundFromPreviousAssignmentsBatches = new AtomicInteger(0);
final AtomicInteger numFullTextsWithProblematicLocations = new AtomicInteger(0);
SetMultimap<String, Payload> allFileNamesWithPayloads = Multimaps.synchronizedSetMultimap(HashMultimap.create((sizeOfUrlReports / 5), 3)); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
@ -240,9 +240,7 @@ public class FileUtils {
String fileLocation = payload.getLocation();
if ( fileLocation == null )
return null; // The full-text was not retrieved, go to the next UrlReport.
numPayloadsToBeHandled.incrementAndGet();
return null; // The full-text was not retrieved for this UrlReport.
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
// If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker.
@ -266,7 +264,7 @@ public class FileUtils {
if ( logger.isTraceEnabled() )
logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches.incrementAndGet();
numFullTextsFound.incrementAndGet();
numValidFullTextsFound.incrementAndGet();
return null; // Do not request the file from the worker, it's already uploaded. Move on. The "location" will be filled by the "setFullTextForMultiplePayloads()" method, later.
}
}
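The check above depends on a per-record lookup of a previously-seen file-hash; the query itself is not part of this diff, so the following is only an indicative sketch of such a lookup with Spring's JdbcTemplate, where the table name, column names and SQL are assumptions made for illustration:

```java
import java.util.List;
import org.springframework.jdbc.core.JdbcTemplate;

// Indicative sketch of the "already retrieved in the past" check (schema and SQL are assumed).
public class FileHashLookupSketch {

    private final JdbcTemplate jdbcTemplate;

    public FileHashLookupSketch(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /** Returns the location of a previous record with the same file-hash, or null if none exists. */
    String findAlreadyUploadedLocation(String fileHash) {
        List<String> locations = jdbcTemplate.query(
                "select `location` from payload where `hash` = ? limit 1",   // Assumed table/columns.
                (rs, rowNum) -> rs.getString(1),
                fileHash);
        return locations.isEmpty() ? null : locations.get(0);
    }
}
```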
@ -275,15 +273,17 @@ public class FileUtils {
Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileLocation);
if ( ! matcher.matches() ) {
logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
numFullTextsWithProblematicLocations.incrementAndGet();
return null;
}
String fileNameWithExtension = matcher.group(2);
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
numFullTextsWithProblematicLocations.incrementAndGet();
return null;
}
numFullTextsFound.incrementAndGet();
numValidFullTextsFound.incrementAndGet();
allFileNamesWithPayloads.put(fileNameWithExtension, payload); // The keys and the values are not duplicate.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as pdf-url-2 might be the same as pdf-url-1, thus, the ID-2 file was not downloaded again.
return null;
@ -310,24 +310,35 @@ public class FileUtils {
DatabaseConnector.databaseLock.unlock(); // The remaining work of this function does not use the database.
}
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet());
int numAllFullTexts = allFileNames.size();
if ( numAllFullTexts == 0 ) {
if ( numFullTextsWithProblematicLocations.get() > 0 )
logger.warn(numFullTextsWithProblematicLocations.get() + " files had problematic names.");
if ( numValidFullTextsFound.get() == 0 ) {
logger.warn("No full-text files were retrieved for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
return UploadFullTextsResponse.successful; // It was handled, no error.
return UploadFullTextsResponse.successful_without_fulltexts; // It's not what we want, but it's not an error either.
}
logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextsFound.get() + " (out of " + sizeOfUrlReports + " | about " + df.format(numFullTextsFound.get() * 100.0 / sizeOfUrlReports) + "%).");
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet()); // The number of fulltexts are lower than the number of payloads, since multiple payloads may lead to the same file.
int numFullTextsToBeRequested = allFileNames.size();
if ( numFullTextsToBeRequested == 0 ) {
logger.info(numValidFullTextsFound.get() + " fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\", but all of them have been retrieved before.");
return UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error.
}
logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numValidFullTextsFound.get() + " (out of " + sizeOfUrlReports + " | about " + df.format(numValidFullTextsFound.get() * 100.0 / sizeOfUrlReports) + "%).");
// TODO - Have a prometheus GAUGE to hold the value of the above percentage, so that we can track the success-rates over time..
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches.get());
// Request the full-texts in batches, compressed in a zstd tar file.
int numOfBatches = (numAllFullTexts / numOfFullTextsPerBatch);
int remainingFiles = (numAllFullTexts % numOfFullTextsPerBatch);
int numOfBatches = (numFullTextsToBeRequested / numOfFullTextsPerBatch);
int remainingFiles = (numFullTextsToBeRequested % numOfFullTextsPerBatch);
if ( remainingFiles > 0 ) { // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
numOfBatches++;
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts (total is: " + numFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
} else
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts (total is: " + numFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
// Check if one full text is left out because of the division. Put it in the last batch.
String baseUrl = "http://" + workerIp + ":" + workerPort + "/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
@ -364,7 +375,7 @@ public class FileUtils {
continue;
}
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter);
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numFullTextsToBeRequested, batchCounter);
String zstdFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".tar.zstd";
try {
if ( ! getAndSaveFullTextBatch(fileNamesForCurBatch, baseUrl, assignmentsBatchCounter, batchCounter, numOfBatches, zstdFileFullPath, workerId) ) {
@ -392,7 +403,7 @@ public class FileUtils {
long finalPayloadsCounter = urlReports.parallelStream()
.map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null)))
.count();
int numInitialPayloads = numPayloadsToBeHandled.get();
int numInitialPayloads = (numValidFullTextsFound.get() + numFullTextsWithProblematicLocations.get());
long numFailedPayloads = (numInitialPayloads - finalPayloadsCounter);
if ( numFailedPayloads == numInitialPayloads ) {
// This will also be the case if there was no DB failure, but all the batches have failed.

View File

@ -178,7 +178,7 @@ public class ParquetFileUtils {
int numInitialPayloads = initialPayloads.size();
if ( numInitialPayloads > 0 ) // If at least 1 payload was created by the processed records..
{ // (it's ok to have no payloads, if there were no full-texts available)
// At this point we know there was no problem with the full-texts, but we do not know if at least one full-text was retrieved.
// At this point we know there was no problem with the full-texts, but we do not know if at least one full-text was retrieved FROM the worker.
if ( (payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) {
logger.error("Nothing can be done without the payloadsSchema! Exiting.."); // The cause is already logged inside the above method.