Update Bulk-Import API:

- Increase the "numOfThreadsPerBulkImportProcedure" to 6.
- Fix Bulk import not working from a second-level subdirectory; the report-subDirectory was not created.
- Fix not returning the bulk-import-report as "application/json".
- Add useful messages for missing parameters.
- Change the HTTP-method for the "bulkImportFullTexts" endpoint to "POST".
- Show a structured json-response for the "bulkImportFullTexts" endpoint.
- Fix uncommon date-format.
- Remove single quotes from json-report, since they are returned as bytes, not characters.
- Optimize the generation of the json-bulkImport-report.
This commit is contained in:
Lampros Smyrnaios 2023-07-25 11:59:47 +03:00
parent 8d8a387ff2
commit 66a5b3c7da
6 changed files with 96 additions and 34 deletions

View File

@ -2,6 +2,7 @@ package eu.openaire.urls_controller.controllers;
import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.models.BulkImportReport;
import eu.openaire.urls_controller.models.BulkImportResponse;
import eu.openaire.urls_controller.services.BulkImportService;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
@ -9,11 +10,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.MissingServletRequestParameterException;
import org.springframework.web.bind.annotation.*;
import java.io.BufferedReader;
import java.io.File;
@ -71,9 +71,18 @@ public class BulkImportController {
}
private static final Pattern LAST_DIR_REGEX = Pattern.compile("^.*/([^/]+[/]?)$");
private static final Pattern LAST_DIR_REGEX = Pattern.compile("^.*/([^/]+/)$");
@GetMapping("bulkImportFullTexts")
// THis method shows the parameters which are missing when dealing with the bulk-import API.
// Spring Boot does not show any specific messages to the user (like stacktraces), for security reasons.
@ExceptionHandler(MissingServletRequestParameterException.class)
public ResponseEntity<?> handleMissingParams(MissingServletRequestParameterException ex) {
return ResponseEntity.badRequest().body(String.format("Missing parameter: %s\n", ex.getParameterName()));
}
@PostMapping("bulkImportFullTexts")
public ResponseEntity<?> bulkImportFullTexts(@RequestParam String provenance, @RequestParam String bulkImportDir, @RequestParam boolean shouldDeleteFilesOnFinish) {
BulkImport.BulkImportSource bulkImportSource = bulkImportSources.get(provenance);
@ -90,7 +99,7 @@ public class BulkImportController {
return ResponseEntity.badRequest().body(errorMsg);
}
String givenBulkDir = bulkImportDir; // Keep the given value here, to not expose the full-path, in case the user has not provided an absolut path.
String givenBulkDir = bulkImportDir; // Keep the given value here, to not expose the full-path, in case the user has not provided an absolute path.
// Make sure the whole path ends with "/", so that we can easily append file-names later.
if ( !bulkImportDir.endsWith("/") )
@ -118,6 +127,7 @@ public class BulkImportController {
}
// The "relativeBulkImportDir" should always be guaranteed to end with "/"! Otherwise, the import-procedure will fail.
logger.info("Received a \"bulkImportFullTexts\" request for \"" + provenance + "\" procedure and bulkImportDir: \"" + bulkImportDir + "\".");
// Check whether the given directory is accessible.
@ -136,7 +146,7 @@ public class BulkImportController {
return ResponseEntity.badRequest().body(errorMsg);
}
// The above check does not catch the case were the directory has at least one subdirectory, but no full-texts files.
// The "iterator()" will have a "next" entry, but no full-text file will exist. Although, that case will be rare.
// The "iterator()" will have a "next" entry, but no full-text file will exist. Although, that case will be rare and will be caught later on, after this procedure being accepted.
} catch (Exception e) {
String errorMsg = "Error when checking if the givenDir \"" + givenDir + "\" is empty!";
logger.error(errorMsg);
@ -158,7 +168,7 @@ public class BulkImportController {
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(errorMsg);
}
Path currentBulkImportReportLocationDir = Paths.get(this.bulkImportReportLocation, provenance);
Path currentBulkImportReportLocationDir = Paths.get(this.bulkImportReportLocation, provenance, relativeBulkImportDir);
try {
Files.createDirectories(currentBulkImportReportLocationDir); // No-op if dir exists. It does not throw a "alreadyExistsException"
} catch (Exception e) {
@ -167,12 +177,13 @@ public class BulkImportController {
return ResponseEntity.internalServerError().body(errorMsg);
}
// Generate the "bulkImportReportID". We are removing the ending "slash" ("/") from the "relativeBulkImportDir".
String bulkImportReportID = provenance + "/" + relativeBulkImportDir.substring(0, (relativeBulkImportDir.length() -1)) + "_report_" + GenericUtils.getRandomNumber(10000, 99999);
String bulkImportReportFullPath = this.bulkImportReportLocation + bulkImportReportID + ".json";
String msg = "The 'bulkImportFullTexts' request for '" + provenance + "' procedure and bulkImportDir: '" + givenBulkDir + "' was accepted and will be scheduled for execution. "
String msg = "The bulkImportFullTexts request for " + provenance + " procedure and bulkImportDir: " + givenBulkDir + " was accepted and will be scheduled for execution. "
+ (shouldDeleteFilesOnFinish ? "The successfully imported files will be deleted." : "All files will remain inside the directory after processing.")
+ " You can request a report at any moment, using this reportFileID: " + bulkImportReportID;
+ " You can request a report at any moment, using the reportID.";
BulkImportReport bulkImportReport = new BulkImportReport(provenance, bulkImportReportFullPath, bulkImportReportID);
bulkImportReport.addEvent(msg);
@ -181,7 +192,7 @@ public class BulkImportController {
if ( errorMsg != null )
return ResponseEntity.internalServerError().body(errorMsg);
logger.info(msg);
logger.info(msg + " \"bulkImportReportID\": " + bulkImportReportID);
// Add this to a background job, since it will take a lot of time to be completed, and the caller will get a "read-timeout" at least and a socket-timeout at most (in case of a network failure during those hours).
String finalBulkImportDir = bulkImportDir;
@ -190,11 +201,11 @@ public class BulkImportController {
bulkImportService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish)
);
return ResponseEntity.ok().body(msg);
return ResponseEntity.ok().body(new BulkImportResponse(msg, bulkImportReportID));
}
@GetMapping("getBulkImportReport")
@GetMapping(value = "getBulkImportReport", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<?> getBulkImportReport(@RequestParam("id") String bulkImportReportId)
{
// Write the contents of the report-file to a string (efficiently!) and return the whole content as an HTTP-response.

View File

@ -50,7 +50,7 @@ public class BulkImportReport {
{
//Convert the LinkedHashMultiMap<String, String> to Map<String, Collection<String>>, since Gson cannot serialize Multimaps.
eventsMap = eventsMultimap.asMap();
return gson.toJson(this);
return gson.toJson(this, BulkImportReport.class);
}
public String getProvenance() {

View File

@ -0,0 +1,51 @@
package eu.openaire.urls_controller.models;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
"message",
"reportID"
})
public class BulkImportResponse {
@JsonProperty("message")
String message;
@JsonProperty("reportID")
String reportID;
public BulkImportResponse() {}
public BulkImportResponse(String message, String bulkImportReportID) {
this.message = message;
this.reportID = bulkImportReportID;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public String getReportID() {
return reportID;
}
public void setReportID(String reportID) {
this.reportID = reportID;
}
@Override
public String toString() {
return "BulkImportResponse{" +
"message='" + message + '\'' +
", reportID='" + reportID + '\'' +
'}';
}
}

View File

@ -64,12 +64,12 @@ public class BulkImportServiceImpl implements BulkImportService {
String bulkImportReportLocation = bulkImportReport.getReportLocation();
// Write to bulkImport-report file.
bulkImportReport.addEvent("Initializing the bulkImport '" + provenance + "' procedure with bulkImportDir '" + bulkImportDirName + "'.");
bulkImportReport.addEvent("Initializing the bulkImport " + provenance + " procedure with bulkImportDir: " + bulkImportDirName + ".");
// Do not write immediately to the file, wait for the following checks.
if ( (ParquetFileUtils.payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((ParquetFileUtils.payloadsSchema = ParquetFileUtils.parseSchema(ParquetFileUtils.payloadSchemaFilePath)) == null ) ) {
String errorMsg = "The 'payloadsSchema' could not be parsed!";
String errorMsg = "The payloadsSchema could not be parsed!";
logger.error(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
@ -113,7 +113,7 @@ public class BulkImportServiceImpl implements BulkImportService {
// Create a new directory on HDFS, with this bulkImportDir name. So, that there will not be any "load data" operation to fail because another thread has loaded that base-dir right before.
String currentBulkImportHdfsDir = parquetFileUtils.parquetHDFSDirectoryPathPayloadsBulkImport + relativeBulkImportDir;
if ( ! parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + currentBulkImportHdfsDir + parquetFileUtils.mkDirsAndParams) ) { // N0-op if it already exists. It is very quick.
String errorMsg = "Could not create the hdfs-directory: " + currentBulkImportHdfsDir;
String errorMsg = "Could not create the remote HDFS-directory: " + currentBulkImportHdfsDir;
logger.error(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
@ -123,8 +123,8 @@ public class BulkImportServiceImpl implements BulkImportService {
long timeMillis = System.currentTimeMillis(); // Store it here, in order to have the same for all current records.
List<Callable<Integer>> callables = new ArrayList<>(numOfFiles);
List<List<String>> subLists = Lists.partition(fileLocations, BulkImportController.numOfThreadsPerBulkImportProcedure); // Divide the initial list to "numOfBulkImportThreads" subLists. The last one may have marginally fewer files.
List<Callable<Integer>> callableTasksForFileSegments = new ArrayList<>(numOfFiles);
List<List<String>> subLists = Lists.partition(fileLocations, BulkImportController.numOfThreadsPerBulkImportProcedure); // Divide the initial list to "numOfThreadsPerBulkImportProcedure" subLists. The last one may have marginally fewer files.
int subListsSize = subLists.size();
bulkImportReport.addEvent("Going to import the files in " + subListsSize + " segments, in parallel.");
@ -132,7 +132,7 @@ public class BulkImportServiceImpl implements BulkImportService {
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
callables.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
callableTasksForFileSegments.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
return processBulkImportedFilesSegment(bulkImportReport, finalI, subLists.get(finalI), bulkImportDirName, localParquetDir, currentBulkImportHdfsDir, provenance, bulkImportSource, timeMillis, shouldDeleteFilesOnFinish);
});
}
@ -140,7 +140,7 @@ public class BulkImportServiceImpl implements BulkImportService {
int numFailedSegments = 0;
int numFailedFiles = 0;
try {
List<Future<Integer>> futures = BulkImportController.bulkImportExecutor.invokeAll(callables); // This waits for all tasks to finish.
List<Future<Integer>> futures = BulkImportController.bulkImportExecutor.invokeAll(callableTasksForFileSegments); // This waits for all tasks to finish.
int sizeOfFutures = futures.size();
for ( int i = 0; i < sizeOfFutures; ++i ) {
try {
@ -174,7 +174,7 @@ public class BulkImportServiceImpl implements BulkImportService {
// Check the results.
String msg;
if ( numFailedFiles == numOfFiles ) {
String errorMsg = "None of the files inside the bulkImportDir '" + bulkImportDirName + "' were imported!";
String errorMsg = "None of the files inside the bulkImportDir: " + bulkImportDirName + " were imported!";
logger.error(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
@ -184,7 +184,7 @@ public class BulkImportServiceImpl implements BulkImportService {
msg = numFailedFiles + " files" + (numFailedSegments > 0 ? (" and " + numFailedSegments + " whole segments") : "") + " failed to be bulk-imported, from the bulkImportDir: " + bulkImportDirName;
logger.warn(msg);
} else {
msg = "All " + numOfFiles + " files, from bulkImportDir '" + bulkImportDirName + "' were bulkImported successfully.";
msg = "All " + numOfFiles + " files, from bulkImportDir: " + bulkImportDirName + " were bulkImported successfully.";
logger.info(msg);
}
bulkImportReport.addEvent(msg);
@ -202,7 +202,7 @@ public class BulkImportServiceImpl implements BulkImportService {
}
ImpalaConnector.databaseLock.unlock();
String successMsg = "Finished the bulk-import procedure for '" + provenance + "' and bulkImportDir: " + bulkImportDirName;
String successMsg = "Finished the bulk-import procedure for " + provenance + " and bulkImportDir: " + bulkImportDirName;
logger.info(successMsg);
bulkImportReport.addEvent(successMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
@ -222,7 +222,7 @@ public class BulkImportServiceImpl implements BulkImportService {
String bulkImportReportLocation = bulkImportReport.getReportLocation();
int numOfFilesInSegment = fileLocationsSegment.size();
String msg = "Going to import " + numOfFilesInSegment + " files for segment-" + segmentCounter + " , of bulkImport procedure '" + provenance + "' | dir: '" + bulkImportDirName + "'..";
String msg = "Going to import " + numOfFilesInSegment + " files for segment-" + segmentCounter + " , of bulkImport procedure: " + provenance + " | dir: " + bulkImportDirName;
logger.debug(msg);
bulkImportReport.addEvent(msg);
@ -238,7 +238,7 @@ public class BulkImportServiceImpl implements BulkImportService {
if ( record != null )
payloadRecords.add(record);
else {
bulkImportReport.addEvent("An error caused the file: '" + fileLocation + "' to not be imported!");
bulkImportReport.addEvent("An error caused the file: " + fileLocation + " to not be imported!");
failedFiles.add(fileLocation);
}
@ -272,7 +272,7 @@ public class BulkImportServiceImpl implements BulkImportService {
logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath); // DEBUG!
if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) {
bulkImportReport.addEvent("Could not write the payload-records to the parquet-file: '" + parquetFileName + "'!");
bulkImportReport.addEvent("Could not write the payload-records to the parquet-file: " + parquetFileName + " !");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment;
@ -284,7 +284,7 @@ public class BulkImportServiceImpl implements BulkImportService {
// Upload and insert the data to the "payload" Impala table. (no database-locking is required)
String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir);
if ( errorMsg != null ) { // The possible error-message returned, is already logged by the Controller.
bulkImportReport.addEvent("Could not upload the parquet-file '" + parquetFileName + "' to HDFS!");
bulkImportReport.addEvent("Could not upload the parquet-file " + parquetFileName + " to HDFS!");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment;
@ -308,7 +308,7 @@ public class BulkImportServiceImpl implements BulkImportService {
bulkImportReport.addEvent(segmentSuccessMsg);
if ( shouldDeleteFilesOnFinish ) {
segmentSuccessMsg = "As the user requested, the successfully imported files of '" + provenance + "' procedure, of bulk-import segment-" + segmentCounter + ", from directory '" + bulkImportDirName + "', will be deleted.";
segmentSuccessMsg = "As the user requested, the successfully imported files of " + provenance + " procedure, of bulk-import segment-" + segmentCounter + ", from directory " + bulkImportDirName + ", will be deleted.";
logger.info(segmentSuccessMsg);
bulkImportReport.addEvent(segmentSuccessMsg);
@ -316,7 +316,7 @@ public class BulkImportServiceImpl implements BulkImportService {
for ( String fileLocation : fileLocationsSegment ) {
if ( !failedFiles.contains(fileLocation) )
if ( !fileUtils.deleteFile(fileLocation) )
bulkImportReport.addEvent("The file '" + fileLocation + "' could not be deleted! Please make sure you have provided the WRITE-permission.");
bulkImportReport.addEvent("The file " + fileLocation + " could not be deleted! Please make sure you have provided the WRITE-permission.");
}
}

View File

@ -6,7 +6,7 @@ import java.util.Date;
public class GenericUtils {
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss.SSS z");
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z");
public static String getReadableCurrentTimeAndZone() {
return (simpleDateFormat.format(new Date(System.currentTimeMillis())));

View File

@ -39,7 +39,7 @@ services:
bulk-import:
baseBulkImportLocation: /mnt/bulk_import/
bulkImportReportLocation: /reports/bulkImportReports/
numOfThreadsPerBulkImportProcedure: 4
numOfThreadsPerBulkImportProcedure: 6
bulkImportSources: # These sources are accepted for bulk-import requests and are excluded from crawling.
arxivImport:
datasourceID: opendoar____::6f4922f45568161a8cdf4ad2299f6d23