diff --git a/README.md b/README.md
index 299ff06..37cfa1a 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@ Then, it receives the "WorkerReports", it requests the full-texts from the worke
It can also process **Bulk-Import** requests, from compatible data sources, in which case it receives the full-text files immediately, without offloading crawling jobs to Workers.
-For interacting with the database we use [**Impala**](https://impala.apache.org/).
+For managing and generating data, we use [**Impala**](https://impala.apache.org/) JDBC and WebHDFS.
diff --git a/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java
index e5fea56..f12eff4 100644
--- a/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java
@@ -19,7 +19,7 @@ public class StatsServiceImpl implements StatsService {
private JdbcTemplate jdbcTemplate;
// No DB-lock is required for these READ-operations.
- // BUT! The is an issue.. these queries may run while a "table-merging" operation is in progress.. thus resulting in "no table reference" and "no file found (fieName.parquet)"
+ // BUT! There is an issue.. these queries may run while a "table-merging" operation is in progress.. thus resulting in "no table reference" and "no file found (fieName.parquet)"
// Thus, we need to have an "error-detection-and-retry" mechanism, in order to avoid returning error that we know will exist in certain times and we can overcome them.
// The final time-to-return of the results-retrieval methods may be somewhat large, but the alternative of returning predictable errors or locking the DB and slowing down the aggregation system are even worse.
diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
index 82f023b..a36d105 100644
--- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
@@ -281,6 +281,8 @@ public class UrlsServiceImpl implements UrlsService {
logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + curReportAssignmentsCounter);
+ boolean hasFulltexts = true;
+
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, curWorkerId);
if ( uploadFullTextsResponse == null ) {
@@ -300,7 +302,9 @@ public class UrlsServiceImpl implements UrlsService {
// We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store.
// We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
// The urls which give full-text (no matter if we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked-up to be checked again later.
- } else
+ } else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.successful_without_fulltexts )
+ hasFulltexts = false;
+ else
logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignmentsCounter);
String localParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignmentsCounter + File.separator;
@@ -318,7 +322,7 @@ public class UrlsServiceImpl implements UrlsService {
// Create HDFS subDirs for these assignments. Other background threads handling other assignments will not interfere with loading of parquetFiles to the DB tables.
String endingMkDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.mkDirsAndParams;
if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingMkDirAndParams)
- || !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingMkDirAndParams) )
+ || (hasFulltexts && !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingMkDirAndParams)) )
{
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating the HDFS sub-directories for assignments_" + curReportAssignmentsCounter);
return false;
@@ -339,7 +343,7 @@ public class UrlsServiceImpl implements UrlsService {
SumParquetSuccess sumParquetSuccess = parquetFileUtils.checkParquetFilesSuccess(futures);
ResponseEntity> errorResponseEntity = sumParquetSuccess.getResponseEntity();
- if ( errorResponseEntity != null ) { // The related log is already shown.
+ if ( errorResponseEntity != null ) { // The related log is already shown in this case.
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating or uploading the parquet files!");
DatabaseConnector.databaseLock.unlock();
return false;
@@ -354,17 +358,16 @@ public class UrlsServiceImpl implements UrlsService {
if ( hasAttemptParquetFileProblem )
logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignmentsCounter);
else if ( hasPayloadParquetFileProblem )
- logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignmentsCounter);
+ logger.error("All of the payload-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignmentsCounter);
else
logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files, for batch-assignments_" + curReportAssignmentsCounter);
}
// Load all the parquet files of each type into its table.
-
if ( ! hasAttemptParquetFileProblem )
hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/", "attempt");
- if ( ! hasPayloadParquetFileProblem )
+ if ( hasFulltexts && ! hasPayloadParquetFileProblem )
hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignmentsCounter + "/", "payload_aggregated");
DatabaseConnector.databaseLock.unlock();
@@ -374,7 +377,7 @@ public class UrlsServiceImpl implements UrlsService {
else if ( hasAttemptParquetFileProblem || hasPayloadParquetFileProblem )
logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload_aggregated\" table, for batch-assignments_" + curReportAssignmentsCounter);
else
- logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignmentsCounter);
+ logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" " + (hasFulltexts ? "and the \"payload_aggregated\" tables" : "table") + ", for batch-assignments_" + curReportAssignmentsCounter);
} catch (InterruptedException ie) { // Thrown by "insertsExecutor.invokeAll()". In this case, any unfinished tasks are cancelled.
DatabaseConnector.databaseLock.unlock();
@@ -398,7 +401,7 @@ public class UrlsServiceImpl implements UrlsService {
// Delete the HDFS subDirs for this Report.
String endingRmDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.rmDirsAndParams;
if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingRmDirAndParams)
- || !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingRmDirAndParams) )
+ || (hasFulltexts && !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingRmDirAndParams)) )
{
logger.error("Error when deleting the HDFS sub-directories for assignments_" + curReportAssignmentsCounter); // A directory-specific log has already appeared.
// The failure to delete the assignments_subDirs is not that of a problem and should not erase the whole process. So all goes as planned (the worker deletes any remaining files).
@@ -418,7 +421,7 @@ public class UrlsServiceImpl implements UrlsService {
// For every "numOfWorkers" assignment-batches that get processed, we merge the "attempts" and "payload_aggregated" tables.
if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfWorkers.get()) == 0 ) // The workersNum will not be zero!
- if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem) )
+ if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, hasFulltexts) )
// The "postReportResultToWorker()" was called inside.
return false;
@@ -473,7 +476,7 @@ public class UrlsServiceImpl implements UrlsService {
}
- private boolean mergeWorkerRelatedTables(String curWorkerId, long curReportAssignmentsCounter, boolean hasAttemptParquetFileProblem, boolean hasPayloadParquetFileProblem)
+ private boolean mergeWorkerRelatedTables(String curWorkerId, long curReportAssignmentsCounter, boolean hasAttemptParquetFileProblem, boolean hasPayloadParquetFileProblem, boolean hasFulltexts)
{
logger.debug("Going to merge the parquet files for the tables which were altered, for batch-assignments_" + curReportAssignmentsCounter);
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
@@ -492,7 +495,7 @@ public class UrlsServiceImpl implements UrlsService {
}
}
- if ( ! hasPayloadParquetFileProblem ) {
+ if ( hasFulltexts && ! hasPayloadParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null);
if ( mergeErrorMsg != null ) {
DatabaseConnector.databaseLock.unlock();
diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
index 201a3f5..58551ca 100644
--- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
+++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
@@ -66,7 +66,7 @@ public class FileUtils {
- public enum UploadFullTextsResponse {successful, unsuccessful, databaseError}
+ public enum UploadFullTextsResponse {successful, successful_without_fulltexts, unsuccessful, databaseError}
public String baseFilesLocation;
@@ -220,9 +220,9 @@ public class FileUtils {
workerIp = workerInfo.getWorkerIP(); // This won't be null.
// Get the file-locations.
- final AtomicInteger numPayloadsToBeHandled = new AtomicInteger(0);
- final AtomicInteger numFullTextsFound = new AtomicInteger(0);
+ final AtomicInteger numValidFullTextsFound = new AtomicInteger(0);
final AtomicInteger numFilesFoundFromPreviousAssignmentsBatches = new AtomicInteger(0);
+ final AtomicInteger numFullTextsWithProblematicLocations = new AtomicInteger(0);
SetMultimap allFileNamesWithPayloads = Multimaps.synchronizedSetMultimap(HashMultimap.create((sizeOfUrlReports / 5), 3)); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
@@ -240,9 +240,7 @@ public class FileUtils {
String fileLocation = payload.getLocation();
if ( fileLocation == null )
- return null; // The full-text was not retrieved, go to the next UrlReport.
-
- numPayloadsToBeHandled.incrementAndGet();
+ return null; // The full-text was not retrieved for this UrlReport.
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
// If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker.
@@ -266,7 +264,7 @@ public class FileUtils {
if ( logger.isTraceEnabled() )
logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches.incrementAndGet();
- numFullTextsFound.incrementAndGet();
+ numValidFullTextsFound.incrementAndGet();
return null; // Do not request the file from the worker, it's already uploaded. Move on. The "location" will be filled my the "setFullTextForMultiplePayloads()" method, later.
}
}
@@ -275,15 +273,17 @@ public class FileUtils {
Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileLocation);
if ( ! matcher.matches() ) {
logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
+ numFullTextsWithProblematicLocations.incrementAndGet();
return null;
}
String fileNameWithExtension = matcher.group(2);
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
+ numFullTextsWithProblematicLocations.incrementAndGet();
return null;
}
- numFullTextsFound.incrementAndGet();
+ numValidFullTextsFound.incrementAndGet();
allFileNamesWithPayloads.put(fileNameWithExtension, payload); // The keys and the values are not duplicate.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1, thus, the ID-2 file was not downloaded again.
return null;
@@ -310,24 +310,35 @@ public class FileUtils {
DatabaseConnector.databaseLock.unlock(); // The remaining work of this function does not use the database.
}
- ArrayList allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet());
- int numAllFullTexts = allFileNames.size();
- if ( numAllFullTexts == 0 ) {
+ if ( numFullTextsWithProblematicLocations.get() > 0 )
+ logger.warn(numFullTextsWithProblematicLocations.get() + " files had problematic names.");
+
+ if ( numValidFullTextsFound.get() == 0 ) {
logger.warn("No full-text files were retrieved for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
- return UploadFullTextsResponse.successful; // It was handled, no error.
+ return UploadFullTextsResponse.successful_without_fulltexts; // It's not what we want, but it's not an error either.
}
- logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextsFound.get() + " (out of " + sizeOfUrlReports + " | about " + df.format(numFullTextsFound.get() * 100.0 / sizeOfUrlReports) + "%).");
+ ArrayList allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet()); // The number of fulltexts are lower than the number of payloads, since multiple payloads may lead to the same file.
+ int numFullTextsToBeRequested = allFileNames.size();
+ if ( numFullTextsToBeRequested == 0 ) {
+ logger.info(numValidFullTextsFound.get() + " fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\", but all of them have been retrieved before.");
+ return UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error.
+ }
+
+ logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numValidFullTextsFound.get() + " (out of " + sizeOfUrlReports + " | about " + df.format(numValidFullTextsFound.get() * 100.0 / sizeOfUrlReports) + "%).");
+
+ // TODO - Have a prometheus GAUGE to hold the value of the above percentage, so that we can track the success-rates over time..
+
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches.get());
// Request the full-texts in batches, compressed in a zstd tar file.
- int numOfBatches = (numAllFullTexts / numOfFullTextsPerBatch);
- int remainingFiles = (numAllFullTexts % numOfFullTextsPerBatch);
+ int numOfBatches = (numFullTextsToBeRequested / numOfFullTextsPerBatch);
+ int remainingFiles = (numFullTextsToBeRequested % numOfFullTextsPerBatch);
if ( remainingFiles > 0 ) { // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
numOfBatches++;
- logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts (total is: " + numFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
+ logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
} else
- logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts (total is: " + numFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
+ logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
// Check if one full text is left out because of the division. Put it int the last batch.
String baseUrl = "http://" + workerIp + ":" + workerPort + "/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
@@ -364,7 +375,7 @@ public class FileUtils {
continue;
}
- List fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter);
+ List fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numFullTextsToBeRequested, batchCounter);
String zstdFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".tar.zstd";
try {
if ( ! getAndSaveFullTextBatch(fileNamesForCurBatch, baseUrl, assignmentsBatchCounter, batchCounter, numOfBatches, zstdFileFullPath, workerId) ) {
@@ -392,7 +403,7 @@ public class FileUtils {
long finalPayloadsCounter = urlReports.parallelStream()
.map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null)))
.count();
- int numInitialPayloads = numPayloadsToBeHandled.get();
+ int numInitialPayloads = (numValidFullTextsFound.get() + numFullTextsWithProblematicLocations.get());
long numFailedPayloads = (numInitialPayloads - finalPayloadsCounter);
if ( numFailedPayloads == numInitialPayloads ) {
// This will also be the case if there was no DB failure, but all the batches have failed.
diff --git a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java
index 9fef898..2a41e97 100644
--- a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java
+++ b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java
@@ -178,7 +178,7 @@ public class ParquetFileUtils {
int numInitialPayloads = initialPayloads.size();
if ( numInitialPayloads > 0 ) // If at least 1 payload was created by the processed records..
{ // (it's ok to have no payloads, if there were no full-texts available)
- // At this point we know there was no problem with the full-texts, but we do not know if at least one full-text was retrieved.
+ // At this point we know there was no problem with the full-texts, but we do not know if at least one full-text was retrieved FROM the worker.
if ( (payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) {
logger.error("Nothing can be done without the payloadsSchema! Exiting.."); // The cause is already logged inside the above method.