diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java index 2618406..0f05a73 100644 --- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java @@ -242,46 +242,46 @@ public class UrlsServiceImpl implements UrlsService { @Timed(value = "addWorkerReport.time", description = "Time taken to add the WorkerReport.") - public Boolean addWorkerReport(String curWorkerId, long curReportAssignments, List urlReports, int sizeOfUrlReports) + public Boolean addWorkerReport(String curWorkerId, long curReportAssignmentsCounter, List urlReports, int sizeOfUrlReports) { - logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + assignmentsBatchCounter); + logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + curReportAssignmentsCounter); // Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location". 
- FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, curReportAssignments, curWorkerId); + FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, curReportAssignmentsCounter, curWorkerId); if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) { - postReportResultToWorker(curWorkerId, curReportAssignments, "Problem with the Impala-database!"); + postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Problem with the Impala-database!"); return false; } else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) { - logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignments); + logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignmentsCounter); // The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available. fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false); // We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store. // We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later. // The urls which give full-text (no matter if we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked-up to be checked again later. 
} else - logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments); + logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignmentsCounter); - String localParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignments + File.separator; + String localParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignmentsCounter + File.separator; try { Files.createDirectories(Paths.get(localParquetPath)); // No-op if it already exists. It does not throw an "alreadyExistsException" } catch (Exception e) { String errorMsg = "Could not create the parquet-directory: " + localParquetPath; logger.error(errorMsg, e); - postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg); + postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg); return false; } - logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignments); + logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignmentsCounter); - List> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignments, localParquetPath, uploadFullTextsResponse); + List> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, localParquetPath, uploadFullTextsResponse); // Create HDFS subDirs for these assignments. Other background threads handling other assignments will not interfere with loading of parquetFiles to the DB tables. 
- String endingMkDirAndParams = curReportAssignments + "/" + parquetFileUtils.mkDirsAndParams; + String endingMkDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.mkDirsAndParams; if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingMkDirAndParams) || !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingMkDirAndParams) ) { - postReportResultToWorker(curWorkerId, curReportAssignments, "Error when creating the HDFS sub-directories for assignments_" + curReportAssignments); + postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating the HDFS sub-directories for assignments_" + curReportAssignmentsCounter); return false; } @@ -294,19 +294,19 @@ public class UrlsServiceImpl implements UrlsService { SumParquetSuccess sumParquetSuccess = parquetFileUtils.checkParquetFilesSuccess(futures); ResponseEntity errorResponseEntity = sumParquetSuccess.getResponseEntity(); if ( errorResponseEntity != null ) { // The related log is already shown. - postReportResultToWorker(curWorkerId, curReportAssignments, "Error when creating or uploading the parquet files!"); + postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating or uploading the parquet files!"); return false; } hasAttemptParquetFileProblem = sumParquetSuccess.isAttemptParquetFileProblem(); hasPayloadParquetFileProblem = sumParquetSuccess.isPayloadParquetFileProblem(); if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem ) - throw new RuntimeException("All of the parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database, for batch-assignments_" + curReportAssignments); + throw new RuntimeException("All of the parquet files failed to be created or uploaded! 
Will avoid to execute load-requests into the database, for batch-assignments_" + curReportAssignmentsCounter); else { if ( hasAttemptParquetFileProblem ) - logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignments); + logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignmentsCounter); else if ( hasPayloadParquetFileProblem ) - logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignments); + logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignmentsCounter); else logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files."); } @@ -315,19 +315,19 @@ public class UrlsServiceImpl implements UrlsService { ImpalaConnector.databaseLock.lock(); if ( ! hasAttemptParquetFileProblem ) - hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts + curReportAssignments + "/", "attempt"); + hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/", "attempt"); if ( ! hasPayloadParquetFileProblem ) - hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignments + "/", "payload_aggregated"); + hasPayloadParquetFileProblem = ! 
parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignmentsCounter + "/", "payload_aggregated"); ImpalaConnector.databaseLock.unlock(); if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem ) - throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignments); + throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignmentsCounter); else if ( hasAttemptParquetFileProblem || hasPayloadParquetFileProblem ) - logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload_aggregated\" table, for batch-assignments_" + curReportAssignments); + logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload_aggregated\" table, for batch-assignments_" + curReportAssignmentsCounter); else - logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignments); + logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignmentsCounter); } catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled. 
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage()); @@ -340,22 +340,22 @@ public class UrlsServiceImpl implements UrlsService { if ( assignmentErrorMsg != null ) errorMsg += "\n" + assignmentErrorMsg; logger.error(errorMsg); - postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg); + postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg); return false; } catch (Exception e) { String errorMsg = "Unexpected error when inserting into the \"attempt\" and \"payload_aggregated\" tables in parallel! " + e.getMessage(); logger.error(errorMsg, e); - postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg); + postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg); return false; // No tables-merging is happening. } finally { logger.debug("Deleting parquet directory: " + localParquetPath); fileUtils.deleteDirectory(new File(localParquetPath)); // Delete the HDFS subDirs for this Report. - String endingRmDirAndParams = curReportAssignments + "/" + parquetFileUtils.rmDirsAndParams; + String endingRmDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.rmDirsAndParams; if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingRmDirAndParams) || !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingRmDirAndParams) ) { - logger.error("Error when deleting the HDFS sub-directories for assignments_" + curReportAssignments); // A directory-specific log has already appeared. + logger.error("Error when deleting the HDFS sub-directories for assignments_" + curReportAssignmentsCounter); // A directory-specific log has already appeared. // The failure to delete the assignments_subDirs is not that of a problem and should not erase the whole process. 
So all goes as planned (the worker deletes any remaining files). // The worst case is that a few subDirs will be left back in the HDFS, although, without their parquetFiles, since they have already moved inside the DB tables. } @@ -372,7 +372,7 @@ public class UrlsServiceImpl implements UrlsService { mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null); if ( mergeErrorMsg != null ) { ImpalaConnector.databaseLock.unlock(); - postReportResultToWorker(curWorkerId, curReportAssignments, mergeErrorMsg); + postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg); return false; } } @@ -381,7 +381,7 @@ public class UrlsServiceImpl implements UrlsService { mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null); if ( mergeErrorMsg != null ) { ImpalaConnector.databaseLock.unlock(); - postReportResultToWorker(curWorkerId, curReportAssignments, mergeErrorMsg); + postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg); return false; } } @@ -389,7 +389,7 @@ public class UrlsServiceImpl implements UrlsService { mergeErrorMsg = deleteWorkerAssignments(curWorkerId); if ( mergeErrorMsg != null ) { ImpalaConnector.databaseLock.unlock(); - postReportResultToWorker(curWorkerId, curReportAssignments, mergeErrorMsg); + postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg); return false; } @@ -397,12 +397,12 @@ public class UrlsServiceImpl implements UrlsService { logger.debug("Finished merging the database tables."); if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) { - postReportResultToWorker(curWorkerId, curReportAssignments, "The full-text files failed to be acquired from the worker!"); + postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "The full-text files failed to be acquired from the worker!"); return false; } // Notify the Worker that the processing of this report was successful, so that the Worker can delete the files. 
- postReportResultToWorker(curWorkerId, curReportAssignments, null); + postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, null); return true; } diff --git a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java index 85ba4bc..cd8f502 100644 --- a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java @@ -205,7 +205,7 @@ public class ParquetFileUtils { } - public boolean createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List urlReports, long curReportAssignments, String localParquetPath) + public boolean createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List urlReports, long curReportAssignmentsCounter, String localParquetPath) { List recordList = new ArrayList<>(urlReports.size()); GenericData.Record record; @@ -213,7 +213,7 @@ public class ParquetFileUtils { for ( UrlReport urlReport : urlReports ) { Payload payload = urlReport.getPayload(); if ( payload == null ) { - logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport); + logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignmentsCounter + "\n" + urlReport); continue; } @@ -244,7 +244,7 @@ public class ParquetFileUtils { return false; } - String fileName = UrlsServiceImpl.assignmentsBatchCounter.get() + "_attempts_" + attemptsIncNum + ".parquet"; + String fileName = curReportAssignmentsCounter + "_attempts_" + attemptsIncNum + ".parquet"; //logger.trace("Going to write " + recordsSize + " attempt-records to the parquet file: " + fileName); // DEBUG! String fullFilePath = localParquetPath + fileName; @@ -252,7 +252,7 @@ public class ParquetFileUtils { //logger.trace("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG! // Upload and insert the data to the "attempt" Impala table. 
- String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, (parquetHDFSDirectoryPathAttempts + curReportAssignments + "/")); + String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, (parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/")); return (errorMsg == null); // The possible error-message returned, is already logged by the Controller. } else return false; @@ -262,7 +262,7 @@ public class ParquetFileUtils { } - public boolean createAndLoadParquetDataIntoPayloadTable(List urlReports, long curReportAssignments, String localParquetPath, String parquetHDFSDirectoryPathPayloads) + public boolean createAndLoadParquetDataIntoPayloadTable(List urlReports, long curReportAssignmentsCounter, String localParquetPath, String parquetHDFSDirectoryPathPayloads) { List recordList = new ArrayList<>((int) (urlReports.size() * 0.2)); GenericData.Record record; @@ -271,7 +271,7 @@ public class ParquetFileUtils { { Payload payload = urlReport.getPayload(); if ( payload == null ) { - logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport); + logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignmentsCounter + "\n" + urlReport); continue; } @@ -304,7 +304,7 @@ public class ParquetFileUtils { return false; } - String fileName = UrlsServiceImpl.assignmentsBatchCounter.get() + "_payloads.parquet"; + String fileName = curReportAssignmentsCounter + "_payloads.parquet"; //logger.trace("Going to write " + recordsSize + " payload-records to the parquet file: " + fileName); // DEBUG! String fullFilePath = localParquetPath + fileName;