- Allow "pretty-printing" the JSON response of the "getBulkImportReport" endpoint, through the optional "pretty" request parameter (default: "false").

- Add useful log messages for the various bulk-import stages and improve the existing ones.
- Optimize reading and writing the reports, by using smaller buffers (25 KB for reading and 512 KB for writing, instead of 10 MB for each).
This commit is contained in:
Lampros Smyrnaios 2023-09-11 17:24:39 +03:00
parent 6944678391
commit ee2df19ce1
3 changed files with 46 additions and 20 deletions

View File

@ -1,5 +1,9 @@
package eu.openaire.urls_controller.controllers;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.models.BulkImportReport;
import eu.openaire.urls_controller.models.BulkImportResponse;
@ -213,12 +217,14 @@ public class BulkImportController {
@GetMapping(value = "getBulkImportReport", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<?> getBulkImportReport(@RequestParam("id") String bulkImportReportId)
public ResponseEntity<?> getBulkImportReport(@RequestParam("id") String bulkImportReportId, @RequestParam(name = "pretty", defaultValue = "false") boolean prettyFormatting)
{
logger.info("Received a \"getBulkImportReport\" request for \"bulkImportReportId\": \"" + bulkImportReportId + "\"." + (prettyFormatting ? " Will return the report pretty-formatted." : ""));
// Write the contents of the report-file to a string (efficiently!) and return the whole content as an HTTP-response.
StringBuilder stringBuilder = new StringBuilder(2_000);
final StringBuilder stringBuilder = new StringBuilder(25_000);
String line;
try ( BufferedReader in = new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get(this.bulkImportReportLocation, bulkImportReportId + ".json"))), FileUtils.tenMb) ) {
try ( BufferedReader in = new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get(this.bulkImportReportLocation, bulkImportReportId + ".json"))), FileUtils.twentyFiveKb) ) {
while ( (line = in.readLine()) != null )
stringBuilder.append(line).append("\n"); // The "readLine()" does not return the line-term char.
} catch (NoSuchFileException nsfe) {
@ -230,7 +236,16 @@ public class BulkImportController {
return ResponseEntity.internalServerError().body(errorMsg); // It's ok to give the file-path to the user, since the report already contains the file-path.
}
return ResponseEntity.ok().body(stringBuilder.toString());
String json = stringBuilder.toString().trim();
if ( prettyFormatting ) {
final Gson gson = new GsonBuilder().setPrettyPrinting().create();
try {
json = gson.toJson(JsonParser.parseString(json));
} catch (JsonParseException jpe) {
logger.error("Problem when parsing the json-string: " + jpe.getMessage() + "\nIt is not a valid json!\n" + json);
}
}
return ResponseEntity.ok().body(json);
}
}

View File

@ -64,13 +64,17 @@ public class BulkImportServiceImpl implements BulkImportService {
String bulkImportReportLocation = bulkImportReport.getReportLocation();
// Write to bulkImport-report file.
bulkImportReport.addEvent("Initializing the bulkImport " + provenance + " procedure with bulkImportDir: " + bulkImportDirName + ".");
String msg = "Initializing the bulkImport " + provenance + " procedure with bulkImportDir: " + bulkImportDirName + ".";
logger.info(msg);
bulkImportReport.addEvent(msg);
// Do not write immediately to the file, wait for the following checks.
String additionalLoggingMsg = " | provenance: \"" + provenance + "\" | bulkImportDir: \"" + bulkImportDirName + "\"";
if ( (ParquetFileUtils.payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((ParquetFileUtils.payloadsSchema = ParquetFileUtils.parseSchema(ParquetFileUtils.payloadSchemaFilePath)) == null ) ) {
String errorMsg = "The payloadsSchema could not be parsed!";
logger.error(errorMsg);
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
@ -79,7 +83,9 @@ public class BulkImportServiceImpl implements BulkImportService {
List<String> fileLocations = getFileLocationsInsideDir(bulkImportDirName); // the error-msg has already been written
if ( fileLocations == null ) {
bulkImportReport.addEvent("Could not retrieve the files for bulk-import!");
String errorMsg = "Could not retrieve the files for bulk-import!";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
@ -103,7 +109,7 @@ public class BulkImportServiceImpl implements BulkImportService {
Files.createDirectories(Paths.get(localParquetDir)); // No-op if it already exists.
} catch (Exception e) {
String errorMsg = "Could not create the local parquet-directory: " + localParquetDir;
logger.error(errorMsg, e);
logger.error(errorMsg + additionalLoggingMsg, e);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
@ -114,7 +120,7 @@ public class BulkImportServiceImpl implements BulkImportService {
String currentBulkImportHdfsDir = parquetFileUtils.parquetHDFSDirectoryPathPayloadsBulkImport + relativeBulkImportDir;
if ( ! parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + currentBulkImportHdfsDir + parquetFileUtils.mkDirsAndParams) ) { // N0-op if it already exists. It is very quick.
String errorMsg = "Could not create the remote HDFS-directory: " + currentBulkImportHdfsDir;
logger.error(errorMsg);
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
@ -128,7 +134,9 @@ public class BulkImportServiceImpl implements BulkImportService {
List<List<String>> subLists = Lists.partition(fileLocations, sizeOfEachSegment); // Divide the initial list to "numOfThreadsForBulkImportProcedures" subLists. The last one may have marginally fewer files.
int subListsSize = subLists.size();
bulkImportReport.addEvent("Going to import the files in parallel, after dividing them in " + subListsSize + " segments.");
msg = "Going to bulk-import the " + numOfFiles + " files in parallel, after dividing them in " + subListsSize + " segments.";
logger.debug(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
for ( int i = 0; i < subListsSize; ++i ) {
@ -155,11 +163,11 @@ public class BulkImportServiceImpl implements BulkImportService {
// The failed-to-be-imported files, will not be deleted, even if the user specifies that he wants to delete the directory.
} catch (ExecutionException ee) {
String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP).
logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + "\n" + stackTraceMessage);
logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + additionalLoggingMsg + "\n" + stackTraceMessage);
} catch (CancellationException ce) {
logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage());
logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage() + additionalLoggingMsg);
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage());
logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage() + additionalLoggingMsg);
}
}
} catch (Exception e) {
@ -175,7 +183,6 @@ public class BulkImportServiceImpl implements BulkImportService {
}
// Check the results.
String msg;
if ( numAllFailedFiles == numOfFiles ) {
String errorMsg = "None of the files inside the bulkImportDir: " + bulkImportDirName + " were imported!";
logger.error(errorMsg);
@ -196,7 +203,7 @@ public class BulkImportServiceImpl implements BulkImportService {
// Merge the parquet files inside the table "payload_bulk_import", to improve performance of future operations.
DatabaseConnector.databaseLock.lock();
String mergeErrorMsg = fileUtils.mergeParquetFiles("payload_bulk_import", "", null); // msg is already logged
if ( mergeErrorMsg != null ) {
if ( mergeErrorMsg != null ) { // the message in already logged
DatabaseConnector.databaseLock.unlock();
bulkImportReport.addEvent(mergeErrorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
@ -245,8 +252,11 @@ public class BulkImportServiceImpl implements BulkImportService {
failedFiles.add(fileLocation);
}
if ( ((++counter) % 100) == 0 ) { // Every 100 files, report the status.
bulkImportReport.addEvent("Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files.");
if ( ((++counter) % 150) == 0 ) { // Every 150 files, report the status.
msg = "Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files.";
if ( logger.isTraceEnabled() )
logger.trace(msg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
}
}
@ -320,8 +330,7 @@ public class BulkImportServiceImpl implements BulkImportService {
segmentSuccessMsg = "As the user requested, the successfully imported files of " + provenance + " procedure, of bulk-import segment-" + segmentCounter + ", from directory " + bulkImportDirName + ", will be deleted.";
logger.info(segmentSuccessMsg);
bulkImportReport.addEvent(segmentSuccessMsg);
// Delete all files except the ones in the "failedHashSet"
// Delete all files except the ones in the "failedHashSet".
for ( String fileLocation : fileLocationsSegment ) {
if ( !failedFiles.contains(fileLocation) )
if ( !fileUtils.deleteFile(fileLocation) )

View File

@ -609,6 +609,8 @@ public class FileUtils {
}
// Commonly used size constants, expressed as binary multiples (1 KB = 1024, 1 MB = 1024 * 1024).
public static final int twentyFiveKb = 25_600; // 25 KB = 25 * 1024
public static final int halfMb = 524_288; // 0.5 MB = 512 KB = 512 * 1024
public static final int tenMb = (10 * 1_048_576); // 10 MB = 10 * 1024 * 1024
public boolean saveArchive(HttpURLConnection conn, File zstdFile)
@ -703,7 +705,7 @@ public class FileUtils {
if ( shouldLockThreads ) // In case multiple threads write to the same file. for ex. during the bulk-import procedure.
fileWriteLock.lock();
try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), FileUtils.tenMb) )
try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), halfMb) )
{
bufferedWriter.write(stringToWrite); // This will overwrite the file. If the new string is smaller, then it does not matter.
} catch (Exception e) {