From 199105f7f1282d45fe64963c458b262f65c55827 Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Mon, 4 Sep 2023 16:33:27 +0300 Subject: [PATCH] Fix not writing some bulk-import error-messages to the logs. Instead, they were only written to the json-reports. --- .../services/BulkImportServiceImpl.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java index 07f5568..cc234b2 100644 --- a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java @@ -77,7 +77,7 @@ public class BulkImportServiceImpl implements BulkImportService { return false; } - List fileLocations = getFileLocationsInsideDir(bulkImportDirName); + List fileLocations = getFileLocationsInsideDir(bulkImportDirName); // the error-msg has already been written if ( fileLocations == null ) { bulkImportReport.addEvent("Could not retrieve the files for bulk-import!"); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); @@ -195,7 +195,7 @@ public class BulkImportServiceImpl implements BulkImportService { // Merge the parquet files inside the table "payload_bulk_import", to improve performance of future operations. DatabaseConnector.databaseLock.lock(); - String mergeErrorMsg = fileUtils.mergeParquetFiles("payload_bulk_import", "", null); + String mergeErrorMsg = fileUtils.mergeParquetFiles("payload_bulk_import", "", null); // msg is already logged if ( mergeErrorMsg != null ) { DatabaseConnector.databaseLock.unlock(); bulkImportReport.addEvent(mergeErrorMsg); @@ -274,8 +274,10 @@ public class BulkImportServiceImpl implements BulkImportService { if ( logger.isTraceEnabled() ) logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath); // DEBUG! - if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) { - bulkImportReport.addEvent("Could not write the payload-records for segment-" + segmentCounter + " to the parquet-file: " + parquetFileName + " !"); + if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) { // Any errorMsg is already logged inside. + String errorMsg = "Could not write the payload-records for segment-" + segmentCounter + " to the parquet-file: " + parquetFileName + " !"; + logger.error(errorMsg); + bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // None of the files of this segment will be deleted, in any case. return numOfFilesInSegment; // All files of this segment have failed. @@ -285,9 +287,11 @@ public class BulkImportServiceImpl implements BulkImportService { logger.trace("Going to upload the parquet file: " + fullLocalParquetFilePath + " to HDFS."); // DEBUG! // Upload and insert the data to the "payload" Impala table. (no database-locking is required) - String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir); + String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir); // The returned message is already logged inside. if ( errorMsg != null ) { // The possible error-message returned, is already logged by the Controller. - bulkImportReport.addEvent("Could not upload the parquet-file " + parquetFileName + " to HDFS!"); + errorMsg = "Could not upload the parquet-file " + parquetFileName + " to HDFS!"; + logger.error(errorMsg); + bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // None of the files of this segment will be deleted, in any case. return numOfFilesInSegment; // All files of this segment have failed. @@ -299,7 +303,9 @@ public class BulkImportServiceImpl implements BulkImportService { DatabaseConnector.databaseLock.lock(); if ( !parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import") ) { DatabaseConnector.databaseLock.unlock(); - bulkImportReport.addEvent("Could not load the payload-records to the database, for segment-" + segmentCounter + "!"); + errorMsg = "Could not load the payload-records to the database, for segment-" + segmentCounter + "!"; + logger.error(errorMsg); + bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // None of the files of this segment will be deleted, in any case. return numOfFilesInSegment; // All files of this segment have failed.