From 90a864ea612074bf12e99992a23bfbe6b09103f9 Mon Sep 17 00:00:00 2001
From: LSmyrnaios
Date: Wed, 20 Sep 2023 17:50:10 +0300
Subject: [PATCH] Add more info in bulk-import logs.

---
 .../services/BulkImportServiceImpl.java       | 42 +++++++++----------
 1 file changed, 21 insertions(+), 21 deletions(-)

diff --git a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
index 4397b6c..7d426c1 100644
--- a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
@@ -102,7 +102,7 @@ public class BulkImportServiceImpl implements BulkImportService {
         }
 
         if ( logger.isTraceEnabled() )
-            logger.trace("fileLocations:\n" + fileLocations);
+            logger.trace("fileLocations: " + additionalLoggingMsg + "\n" + fileLocations);
 
         String localParquetDir = parquetFileUtils.parquetBaseLocalDirectoryPath + "bulk_import_" + provenance + File.separator + relativeBulkImportDir;    // This ends with "/".
         try {
@@ -142,7 +142,7 @@ public class BulkImportServiceImpl implements BulkImportService {
         for ( int i = 0; i < subListsSize; ++i ) {
             int finalI = i;
             callableTasksForFileSegments.add(() -> {    // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
-                return processBulkImportedFilesSegment(bulkImportReport, finalI, subLists.get(finalI), bulkImportDirName, localParquetDir, currentBulkImportHdfsDir, provenance, bulkImportSource, timeMillis, shouldDeleteFilesOnFinish);
+                return processBulkImportedFilesSegment(bulkImportReport, finalI, subLists.get(finalI), bulkImportDirName, localParquetDir, currentBulkImportHdfsDir, provenance, bulkImportSource, timeMillis, shouldDeleteFilesOnFinish, additionalLoggingMsg);
             });
         }
 
@@ -226,14 +226,14 @@ public class BulkImportServiceImpl implements BulkImportService {
 
     private int processBulkImportedFilesSegment(BulkImportReport bulkImportReport, int segmentCounter, List<String> fileLocationsSegment, String bulkImportDirName, String localParquetDir, String currentBulkImportHdfsDir,
-                                                String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, boolean shouldDeleteFilesOnFinish)
+                                                String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, boolean shouldDeleteFilesOnFinish, String additionalLoggingMsg)
     {
         // Inside this thread, process a segment of the files.
         String bulkImportReportLocation = bulkImportReport.getReportLocation();
 
         int numOfFilesInSegment = fileLocationsSegment.size();
-        String msg = "Going to import " + numOfFilesInSegment + " files, for segment-" + segmentCounter + ", of bulkImport procedure: " + provenance + " | dir: " + bulkImportDirName;
-        logger.debug(msg);
+        String msg = "Going to import " + numOfFilesInSegment + " files, for segment-" + segmentCounter;
+        logger.debug(msg + additionalLoggingMsg);
         bulkImportReport.addEvent(msg);
 
         List<GenericData.Record> payloadRecords = new ArrayList<>(numOfFilesInSegment);
 
@@ -244,7 +244,7 @@ public class BulkImportServiceImpl implements BulkImportService {
 
         // Upload files to S3 and collect payloadRecords.
         for ( String fileLocation: fileLocationsSegment ) {
-            GenericData.Record record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis);
+            GenericData.Record record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg);
             if ( record != null )
                 payloadRecords.add(record);
             else {
@@ -252,10 +252,10 @@ public class BulkImportServiceImpl implements BulkImportService {
                 failedFiles.add(fileLocation);
             }
 
-            if ( ((++counter) % 150) == 0 ) {    // Every 150 files, report the status.
+            if ( ((++counter) % 150) == 0 ) {    // Every 150 files, report the status for this segment.
                 msg = "Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files.";
                 if ( logger.isTraceEnabled() )
-                    logger.trace(msg);
+                    logger.trace(msg + additionalLoggingMsg);
                 bulkImportReport.addEvent(msg);
                 fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
             }
@@ -272,7 +272,7 @@ public class BulkImportServiceImpl implements BulkImportService {
         }
         else if ( numOfPayloadRecords != numOfFilesInSegment ) {    // Write this important note here, in order to certainly be in the report, even if a parquet-file failure happens and the method exits early.
             String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be bulk-imported, for segment-" + segmentCounter + " !";
-            logger.warn(errorMsg);
+            logger.warn(errorMsg + additionalLoggingMsg);
             bulkImportReport.addEvent(errorMsg);
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
         }
@@ -282,11 +282,11 @@ public class BulkImportServiceImpl implements BulkImportService {
         String fullLocalParquetFilePath = localParquetDir + parquetFileName;
 
         if ( logger.isTraceEnabled() )
-            logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath);    // DEBUG!
+            logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath + additionalLoggingMsg);    // DEBUG!
         if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) {    // Any errorMsg is already logged inside.
             String errorMsg = "Could not write the payload-records for segment-" + segmentCounter + " to the parquet-file: " + parquetFileName + " !";
-            logger.error(errorMsg);
+            logger.error(errorMsg + additionalLoggingMsg);
             bulkImportReport.addEvent(errorMsg);
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
             // None of the files of this segment will be deleted, in any case.
         }
@@ -294,13 +294,13 @@ public class BulkImportServiceImpl implements BulkImportService {
         }
 
         if ( logger.isTraceEnabled() )
-            logger.trace("Going to upload the parquet file: " + fullLocalParquetFilePath + " to HDFS.");    // DEBUG!
+            logger.trace("Going to upload the parquet file: " + fullLocalParquetFilePath + " to HDFS." + additionalLoggingMsg);    // DEBUG!
         // Upload and insert the data to the "payload" Impala table. (no database-locking is required)
         String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir);    // The returned message is already logged inside.
         if ( errorMsg != null ) {    // The possible error-message returned is already logged by the Controller.
errorMsg = "Could not upload the parquet-file " + parquetFileName + " to HDFS!"; - logger.error(errorMsg); + logger.error(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // None of the files of this segment will be deleted, in any case. @@ -308,13 +308,13 @@ public class BulkImportServiceImpl implements BulkImportService { } if ( logger.isTraceEnabled() ) - logger.trace("Going to load the data of parquet-file: \"" + parquetFileName + "\" to the database-table: \"payload_bulk_import\"."); // DEBUG! + logger.trace("Going to load the data of parquet-file: \"" + parquetFileName + "\" to the database-table: \"payload_bulk_import\"." + additionalLoggingMsg); // DEBUG! DatabaseConnector.databaseLock.lock(); if ( !parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import") ) { DatabaseConnector.databaseLock.unlock(); errorMsg = "Could not load the payload-records to the database, for segment-" + segmentCounter + "!"; - logger.error(errorMsg); + logger.error(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // None of the files of this segment will be deleted, in any case. @@ -323,7 +323,7 @@ public class BulkImportServiceImpl implements BulkImportService { DatabaseConnector.databaseLock.unlock(); String segmentSuccessMsg = "Finished importing " + numOfPayloadRecords + " files, out of " + numOfFilesInSegment + ", for segment-" + segmentCounter + "."; - logger.info(segmentSuccessMsg); + logger.info(segmentSuccessMsg + additionalLoggingMsg); bulkImportReport.addEvent(segmentSuccessMsg); if ( shouldDeleteFilesOnFinish ) { @@ -342,7 +342,7 @@ public class BulkImportServiceImpl implements BulkImportService { } - private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis) + private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg) { File fullTextFile = new File(fileLocation); DocFileData docFileData = new DocFileData(fullTextFile, null, null, null); @@ -355,7 +355,7 @@ public class BulkImportServiceImpl implements BulkImportService { try { fileLocationData = new FileLocationData(fileLocation); } catch (RuntimeException re) { - logger.error(re.getMessage()); + logger.error(re.getMessage() + additionalLoggingMsg); return null; } @@ -379,7 +379,7 @@ public class BulkImportServiceImpl implements BulkImportService { } catch (EmptyResultDataAccessException erdae) { // No fileLocation is found, it's ok. It will be null by default. } catch (Exception e) { - logger.error("Error when executing or acquiring data from the the 'getFileLocationForHashQuery'!\n", e); + logger.error("Error when executing or acquiring data from the the 'getFileLocationForHashQuery'!" + additionalLoggingMsg, e); // Continue with bulk-importing the file and uploading it to S3. } finally { DatabaseConnector.databaseLock.unlock(); @@ -410,7 +410,7 @@ public class BulkImportServiceImpl implements BulkImportService { if ( s3Url == null ) return null; // In case the 'datasourceID' or 'hash' is null. Which should never happen here, since both of them are checked before the execution reaches here. 
             } catch (Exception e) {
-                logger.error("Could not upload the file '" + fileLocationData.getFileName() + "' to the S3 ObjectStore!", e);
+                logger.error("Could not upload the file '" + fileLocationData.getFileName() + "' to the S3 ObjectStore!" + additionalLoggingMsg, e);
                 return null;
             }
         }
@@ -425,7 +425,7 @@ public class BulkImportServiceImpl implements BulkImportService {
         record.put("size", ((size != null) ? String.valueOf(size) : null));
         record.put("hash", fileHash);    // This is already checked and will not be null here.
         record.put("location", s3Url);
-        record.put("provenance", ("bulk:" + provenance));    // Add a prefix in order to be more clear that this record comes from bulkImport, when looking all records in the "payload" VIEW.
+        record.put("provenance", ("bulk:" + provenance));    // Add the "bulk:" prefix to make it clearer that this record comes from bulkImport, when looking at all records in the "payload" VIEW.
 
         return record;
     }
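
Note on the pattern: the patch threads a single "additionalLoggingMsg" from the bulk-import entry point down through processBulkImportedFilesSegment() and processBulkImportedFile(), appending it to the logger calls only, while bulkImportReport.addEvent() keeps the shorter base message. The report is already scoped to one request, so only the shared application log needs the extra context to tell concurrent imports apart. The sketch below shows one way a caller could compose such a suffix; it is an illustration only, as the patch shows the message being appended but never constructed, so the format and values are assumptions.

// Illustration only (not part of the patch): a possible way to compose the
// "additionalLoggingMsg" that is appended to each bulk-import log statement.
public class AdditionalLoggingMsgSketch {

    public static void main(String[] args) {
        String provenance = "exampleProvenance";                // hypothetical request parameters
        String bulkImportDirName = "/mnt/bulk_import/batch_1/";

        // One suffix per bulk-import request; appending it to every log line lets
        // concurrent imports of different directories be told apart in the shared log.
        String additionalLoggingMsg = " | provenance: " + provenance + " | dir: " + bulkImportDirName;

        int numOfFilesInSegment = 150;
        int segmentCounter = 2;
        String msg = "Going to import " + numOfFilesInSegment + " files, for segment-" + segmentCounter;

        // As in the patch: the log line carries the suffix, the report event does not.
        System.out.println(msg + additionalLoggingMsg);
        // -> Going to import 150 files, for segment-2 | provenance: exampleProvenance | dir: /mnt/bulk_import/batch_1/
    }
}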