Add more info in bulk-import logs.

Lampros Smyrnaios 2023-09-20 17:50:10 +03:00
parent 0f5d4dac78
commit 90a864ea61
1 changed file with 21 additions and 21 deletions
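The diff below threads a new additionalLoggingMsg parameter through the segment- and file-level helpers of BulkImportServiceImpl, appending it to logger calls only; the shorter msg is still passed to bulkImportReport.addEvent(msg), since each report is already scoped to a single bulk-import request. The commit does not show how the caller builds this string; the sketch below is a hypothetical, self-contained illustration of the pattern (the class name, example values, and suffix format are assumptions, not code from this repository):

// Hypothetical sketch: build the request-context suffix once, append it to each log line.
public class AdditionalLoggingMsgSketch {
    public static void main(String[] args) {
        String provenance = "openaire";             // example value; real values come from the bulk-import request
        String bulkImportDirName = "/mnt/bulk/";    // example value
        // Assumed suffix format; the diff only shows the suffix being appended, not built.
        String additionalLoggingMsg = " | bulkImport: " + provenance + " | dir: " + bulkImportDirName;
        String msg = "Going to import 100 files, for segment-3";
        System.out.println(msg + additionalLoggingMsg);  // what a log line would carry after this change
        // -> Going to import 100 files, for segment-3 | bulkImport: openaire | dir: /mnt/bulk/
    }
}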


@@ -102,7 +102,7 @@ public class BulkImportServiceImpl implements BulkImportService {
         }
         if ( logger.isTraceEnabled() )
-            logger.trace("fileLocations:\n" + fileLocations);
+            logger.trace("fileLocations: " + additionalLoggingMsg + "\n" + fileLocations);
         String localParquetDir = parquetFileUtils.parquetBaseLocalDirectoryPath + "bulk_import_" + provenance + File.separator + relativeBulkImportDir; // This ends with "/".
         try {
@@ -142,7 +142,7 @@ public class BulkImportServiceImpl implements BulkImportService {
         for ( int i = 0; i < subListsSize; ++i ) {
             int finalI = i;
             callableTasksForFileSegments.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
-                return processBulkImportedFilesSegment(bulkImportReport, finalI, subLists.get(finalI), bulkImportDirName, localParquetDir, currentBulkImportHdfsDir, provenance, bulkImportSource, timeMillis, shouldDeleteFilesOnFinish);
+                return processBulkImportedFilesSegment(bulkImportReport, finalI, subLists.get(finalI), bulkImportDirName, localParquetDir, currentBulkImportHdfsDir, provenance, bulkImportSource, timeMillis, shouldDeleteFilesOnFinish, additionalLoggingMsg);
             });
         }
@@ -226,14 +226,14 @@ public class BulkImportServiceImpl implements BulkImportService {
     private int processBulkImportedFilesSegment(BulkImportReport bulkImportReport, int segmentCounter, List<String> fileLocationsSegment, String bulkImportDirName, String localParquetDir, String currentBulkImportHdfsDir,
-            String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, boolean shouldDeleteFilesOnFinish)
+            String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, boolean shouldDeleteFilesOnFinish, String additionalLoggingMsg)
     {
         // Inside this thread, process a segment of the files.
         String bulkImportReportLocation = bulkImportReport.getReportLocation();
         int numOfFilesInSegment = fileLocationsSegment.size();
-        String msg = "Going to import " + numOfFilesInSegment + " files, for segment-" + segmentCounter + ", of bulkImport procedure: " + provenance + " | dir: " + bulkImportDirName;
-        logger.debug(msg);
+        String msg = "Going to import " + numOfFilesInSegment + " files, for segment-" + segmentCounter;
+        logger.debug(msg + additionalLoggingMsg);
         bulkImportReport.addEvent(msg);
         List<GenericData.Record> payloadRecords = new ArrayList<>(numOfFilesInSegment);
@@ -244,7 +244,7 @@ public class BulkImportServiceImpl implements BulkImportService {
         // Upload files to S3 and collect payloadRecords.
         for ( String fileLocation: fileLocationsSegment ) {
-            GenericData.Record record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis);
+            GenericData.Record record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg);
             if ( record != null )
                 payloadRecords.add(record);
             else {
@@ -252,10 +252,10 @@ public class BulkImportServiceImpl implements BulkImportService {
                 failedFiles.add(fileLocation);
             }
-            if ( ((++counter) % 150) == 0 ) { // Every 150 files, report the status.
+            if ( ((++counter) % 150) == 0 ) { // Every 150 files, report the status for this segment.
                 msg = "Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files.";
                 if ( logger.isTraceEnabled() )
-                    logger.trace(msg);
+                    logger.trace(msg + additionalLoggingMsg);
                 bulkImportReport.addEvent(msg);
                 fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
             }
@@ -272,7 +272,7 @@ public class BulkImportServiceImpl implements BulkImportService {
         } else if ( numOfPayloadRecords != numOfFilesInSegment ) {
             // Write this important note here, in order to certainly be in the report, even if a parquet-file failure happens and the method exists early.
             String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be bulk-imported, for segment-" + segmentCounter + " !";
-            logger.warn(errorMsg);
+            logger.warn(errorMsg + additionalLoggingMsg);
             bulkImportReport.addEvent(errorMsg);
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
         }
@@ -282,11 +282,11 @@ public class BulkImportServiceImpl implements BulkImportService {
         String fullLocalParquetFilePath = localParquetDir + parquetFileName;
         if ( logger.isTraceEnabled() )
-            logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath); // DEBUG!
+            logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath + additionalLoggingMsg); // DEBUG!
         if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) { // Any errorMsg is already logged inside.
             String errorMsg = "Could not write the payload-records for segment-" + segmentCounter + " to the parquet-file: " + parquetFileName + " !";
-            logger.error(errorMsg);
+            logger.error(errorMsg + additionalLoggingMsg);
             bulkImportReport.addEvent(errorMsg);
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
             // None of the files of this segment will be deleted, in any case.
@@ -294,13 +294,13 @@ public class BulkImportServiceImpl implements BulkImportService {
         }
         if ( logger.isTraceEnabled() )
-            logger.trace("Going to upload the parquet file: " + fullLocalParquetFilePath + " to HDFS."); // DEBUG!
+            logger.trace("Going to upload the parquet file: " + fullLocalParquetFilePath + " to HDFS." + additionalLoggingMsg); // DEBUG!
         // Upload and insert the data to the "payload" Impala table. (no database-locking is required)
         String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir); // The returned message is already logged inside.
         if ( errorMsg != null ) { // The possible error-message returned, is already logged by the Controller.
             errorMsg = "Could not upload the parquet-file " + parquetFileName + " to HDFS!";
-            logger.error(errorMsg);
+            logger.error(errorMsg + additionalLoggingMsg);
             bulkImportReport.addEvent(errorMsg);
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
             // None of the files of this segment will be deleted, in any case.
@@ -308,13 +308,13 @@ public class BulkImportServiceImpl implements BulkImportService {
         }
         if ( logger.isTraceEnabled() )
-            logger.trace("Going to load the data of parquet-file: \"" + parquetFileName + "\" to the database-table: \"payload_bulk_import\"."); // DEBUG!
+            logger.trace("Going to load the data of parquet-file: \"" + parquetFileName + "\" to the database-table: \"payload_bulk_import\"." + additionalLoggingMsg); // DEBUG!
         DatabaseConnector.databaseLock.lock();
         if ( !parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import") ) {
             DatabaseConnector.databaseLock.unlock();
             errorMsg = "Could not load the payload-records to the database, for segment-" + segmentCounter + "!";
-            logger.error(errorMsg);
+            logger.error(errorMsg + additionalLoggingMsg);
             bulkImportReport.addEvent(errorMsg);
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
             // None of the files of this segment will be deleted, in any case.
@@ -323,7 +323,7 @@ public class BulkImportServiceImpl implements BulkImportService {
         DatabaseConnector.databaseLock.unlock();
         String segmentSuccessMsg = "Finished importing " + numOfPayloadRecords + " files, out of " + numOfFilesInSegment + ", for segment-" + segmentCounter + ".";
-        logger.info(segmentSuccessMsg);
+        logger.info(segmentSuccessMsg + additionalLoggingMsg);
         bulkImportReport.addEvent(segmentSuccessMsg);
         if ( shouldDeleteFilesOnFinish ) {
@@ -342,7 +342,7 @@ public class BulkImportServiceImpl implements BulkImportService {
     }
-    private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis)
+    private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg)
     {
         File fullTextFile = new File(fileLocation);
         DocFileData docFileData = new DocFileData(fullTextFile, null, null, null);
@@ -355,7 +355,7 @@ public class BulkImportServiceImpl implements BulkImportService {
         try {
             fileLocationData = new FileLocationData(fileLocation);
         } catch (RuntimeException re) {
-            logger.error(re.getMessage());
+            logger.error(re.getMessage() + additionalLoggingMsg);
             return null;
         }
@@ -379,7 +379,7 @@ public class BulkImportServiceImpl implements BulkImportService {
         } catch (EmptyResultDataAccessException erdae) {
             // No fileLocation is found, it's ok. It will be null by default.
         } catch (Exception e) {
-            logger.error("Error when executing or acquiring data from the the 'getFileLocationForHashQuery'!\n", e);
+            logger.error("Error when executing or acquiring data from the the 'getFileLocationForHashQuery'!" + additionalLoggingMsg, e);
             // Continue with bulk-importing the file and uploading it to S3.
         } finally {
             DatabaseConnector.databaseLock.unlock();
@@ -410,7 +410,7 @@ public class BulkImportServiceImpl implements BulkImportService {
             if ( s3Url == null )
                return null; // In case the 'datasourceID' or 'hash' is null. Which should never happen here, since both of them are checked before the execution reaches here.
         } catch (Exception e) {
-            logger.error("Could not upload the file '" + fileLocationData.getFileName() + "' to the S3 ObjectStore!", e);
+            logger.error("Could not upload the file '" + fileLocationData.getFileName() + "' to the S3 ObjectStore!" + additionalLoggingMsg, e);
             return null;
         }
     }
@@ -425,7 +425,7 @@ public class BulkImportServiceImpl implements BulkImportService {
         record.put("size", ((size != null) ? String.valueOf(size) : null));
         record.put("hash", fileHash); // This is already checked and will not be null here.
         record.put("location", s3Url);
-        record.put("provenance", ("bulk:" + provenance)); // Add a prefix in order to be more clear that this record comes from bulkImport, when looking all records in the "payload" VIEW.
+        record.put("provenance", ("bulk:" + provenance)); // Add the "bulk:" prefix in order to be more clear that this record comes from bulkImport, when looking all records in the "payload" VIEW.
         return record;
     }