diff --git a/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java index 3c25370..51309b5 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java @@ -1,5 +1,9 @@ package eu.openaire.urls_controller.controllers; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonParseException; +import com.google.gson.JsonParser; import eu.openaire.urls_controller.components.BulkImport; import eu.openaire.urls_controller.models.BulkImportReport; import eu.openaire.urls_controller.models.BulkImportResponse; @@ -213,12 +217,14 @@ public class BulkImportController { @GetMapping(value = "getBulkImportReport", produces = MediaType.APPLICATION_JSON_VALUE) - public ResponseEntity getBulkImportReport(@RequestParam("id") String bulkImportReportId) + public ResponseEntity getBulkImportReport(@RequestParam("id") String bulkImportReportId, @RequestParam(name = "pretty", defaultValue = "false") boolean prettyFormatting) { + logger.info("Received a \"getBulkImportReport\" request for \"bulkImportReportId\": \"" + bulkImportReportId + "\"." + (prettyFormatting ? " Will return the report pretty-formatted." : "")); + // Write the contents of the report-file to a string (efficiently!) and return the whole content as an HTTP-response. - StringBuilder stringBuilder = new StringBuilder(2_000); + final StringBuilder stringBuilder = new StringBuilder(25_000); String line; - try ( BufferedReader in = new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get(this.bulkImportReportLocation, bulkImportReportId + ".json"))), FileUtils.tenMb) ) { + try ( BufferedReader in = new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get(this.bulkImportReportLocation, bulkImportReportId + ".json"))), FileUtils.twentyFiveKb) ) { while ( (line = in.readLine()) != null ) stringBuilder.append(line).append("\n"); // The "readLine()" does not return the line-term char. } catch (NoSuchFileException nsfe) { @@ -230,7 +236,16 @@ public class BulkImportController { return ResponseEntity.internalServerError().body(errorMsg); // It's ok to give the file-path to the user, since the report already contains the file-path. } - return ResponseEntity.ok().body(stringBuilder.toString()); + String json = stringBuilder.toString().trim(); + if ( prettyFormatting ) { + final Gson gson = new GsonBuilder().setPrettyPrinting().create(); + try { + json = gson.toJson(JsonParser.parseString(json)); + } catch (JsonParseException jpe) { + logger.error("Problem when parsing the json-string: " + jpe.getMessage() + "\nIt is not a valid json!\n" + json); + } + } + return ResponseEntity.ok().body(json); } } diff --git a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java index cc234b2..4397b6c 100644 --- a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java @@ -64,13 +64,17 @@ public class BulkImportServiceImpl implements BulkImportService { String bulkImportReportLocation = bulkImportReport.getReportLocation(); // Write to bulkImport-report file. - bulkImportReport.addEvent("Initializing the bulkImport " + provenance + " procedure with bulkImportDir: " + bulkImportDirName + "."); + String msg = "Initializing the bulkImport " + provenance + " procedure with bulkImportDir: " + bulkImportDirName + "."; + logger.info(msg); + bulkImportReport.addEvent(msg); // Do not write immediately to the file, wait for the following checks. + String additionalLoggingMsg = " | provenance: \"" + provenance + "\" | bulkImportDir: \"" + bulkImportDirName + "\""; + if ( (ParquetFileUtils.payloadsSchema == null) // Parse the schema if it's not already parsed. && ((ParquetFileUtils.payloadsSchema = ParquetFileUtils.parseSchema(ParquetFileUtils.payloadSchemaFilePath)) == null ) ) { String errorMsg = "The payloadsSchema could not be parsed!"; - logger.error(errorMsg); + logger.error(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); @@ -79,7 +83,9 @@ public class BulkImportServiceImpl implements BulkImportService { List fileLocations = getFileLocationsInsideDir(bulkImportDirName); // the error-msg has already been written if ( fileLocations == null ) { - bulkImportReport.addEvent("Could not retrieve the files for bulk-import!"); + String errorMsg = "Could not retrieve the files for bulk-import!"; + logger.error(errorMsg + additionalLoggingMsg); + bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; @@ -103,7 +109,7 @@ public class BulkImportServiceImpl implements BulkImportService { Files.createDirectories(Paths.get(localParquetDir)); // No-op if it already exists. } catch (Exception e) { String errorMsg = "Could not create the local parquet-directory: " + localParquetDir; - logger.error(errorMsg, e); + logger.error(errorMsg + additionalLoggingMsg, e); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); @@ -114,7 +120,7 @@ public class BulkImportServiceImpl implements BulkImportService { String currentBulkImportHdfsDir = parquetFileUtils.parquetHDFSDirectoryPathPayloadsBulkImport + relativeBulkImportDir; if ( ! parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + currentBulkImportHdfsDir + parquetFileUtils.mkDirsAndParams) ) { // N0-op if it already exists. It is very quick. String errorMsg = "Could not create the remote HDFS-directory: " + currentBulkImportHdfsDir; - logger.error(errorMsg); + logger.error(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); @@ -128,7 +134,9 @@ public class BulkImportServiceImpl implements BulkImportService { List> subLists = Lists.partition(fileLocations, sizeOfEachSegment); // Divide the initial list to "numOfThreadsForBulkImportProcedures" subLists. The last one may have marginally fewer files. int subListsSize = subLists.size(); - bulkImportReport.addEvent("Going to import the files in parallel, after dividing them in " + subListsSize + " segments."); + msg = "Going to bulk-import the " + numOfFiles + " files in parallel, after dividing them in " + subListsSize + " segments."; + logger.debug(msg + additionalLoggingMsg); + bulkImportReport.addEvent(msg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); for ( int i = 0; i < subListsSize; ++i ) { @@ -155,11 +163,11 @@ public class BulkImportServiceImpl implements BulkImportService { // The failed-to-be-imported files, will not be deleted, even if the user specifies that he wants to delete the directory. } catch (ExecutionException ee) { String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP). - logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + "\n" + stackTraceMessage); + logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + additionalLoggingMsg + "\n" + stackTraceMessage); } catch (CancellationException ce) { - logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage()); + logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage() + additionalLoggingMsg); } catch (IndexOutOfBoundsException ioobe) { - logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage()); + logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage() + additionalLoggingMsg); } } } catch (Exception e) { @@ -175,7 +183,6 @@ public class BulkImportServiceImpl implements BulkImportService { } // Check the results. - String msg; if ( numAllFailedFiles == numOfFiles ) { String errorMsg = "None of the files inside the bulkImportDir: " + bulkImportDirName + " were imported!"; logger.error(errorMsg); @@ -196,7 +203,7 @@ public class BulkImportServiceImpl implements BulkImportService { // Merge the parquet files inside the table "payload_bulk_import", to improve performance of future operations. DatabaseConnector.databaseLock.lock(); String mergeErrorMsg = fileUtils.mergeParquetFiles("payload_bulk_import", "", null); // msg is already logged - if ( mergeErrorMsg != null ) { + if ( mergeErrorMsg != null ) { // the message in already logged DatabaseConnector.databaseLock.unlock(); bulkImportReport.addEvent(mergeErrorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); @@ -245,8 +252,11 @@ public class BulkImportServiceImpl implements BulkImportService { failedFiles.add(fileLocation); } - if ( ((++counter) % 100) == 0 ) { // Every 100 files, report the status. - bulkImportReport.addEvent("Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files."); + if ( ((++counter) % 150) == 0 ) { // Every 150 files, report the status. + msg = "Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files."; + if ( logger.isTraceEnabled() ) + logger.trace(msg); + bulkImportReport.addEvent(msg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); } } @@ -320,8 +330,7 @@ public class BulkImportServiceImpl implements BulkImportService { segmentSuccessMsg = "As the user requested, the successfully imported files of " + provenance + " procedure, of bulk-import segment-" + segmentCounter + ", from directory " + bulkImportDirName + ", will be deleted."; logger.info(segmentSuccessMsg); bulkImportReport.addEvent(segmentSuccessMsg); - - // Delete all files except the ones in the "failedHashSet" + // Delete all files except the ones in the "failedHashSet". for ( String fileLocation : fileLocationsSegment ) { if ( !failedFiles.contains(fileLocation) ) if ( !fileUtils.deleteFile(fileLocation) ) diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index cb1cf66..a226f81 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -609,6 +609,8 @@ public class FileUtils { } + public static final int twentyFiveKb = 25_600; // 25 Kb + public static final int halfMb = 524_288; // 0.5 Mb = 512 Kb public static final int tenMb = (10 * 1_048_576); public boolean saveArchive(HttpURLConnection conn, File zstdFile) @@ -703,7 +705,7 @@ public class FileUtils { if ( shouldLockThreads ) // In case multiple threads write to the same file. for ex. during the bulk-import procedure. fileWriteLock.lock(); - try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), FileUtils.tenMb) ) + try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), halfMb) ) { bufferedWriter.write(stringToWrite); // This will overwrite the file. If the new string is smaller, then it does not matter. } catch (Exception e) {