2023-05-11 02:07:55 +02:00
package eu.openaire.urls_controller.controllers ;
2023-09-11 16:24:39 +02:00
import com.google.gson.Gson ;
import com.google.gson.GsonBuilder ;
import com.google.gson.JsonParseException ;
import com.google.gson.JsonParser ;
2023-05-11 02:07:55 +02:00
import eu.openaire.urls_controller.components.BulkImport ;
import eu.openaire.urls_controller.models.BulkImportReport ;
2023-07-25 10:59:47 +02:00
import eu.openaire.urls_controller.models.BulkImportResponse ;
2023-05-29 11:12:08 +02:00
import eu.openaire.urls_controller.services.BulkImportService ;
2023-05-11 02:07:55 +02:00
import eu.openaire.urls_controller.util.FileUtils ;
import eu.openaire.urls_controller.util.GenericUtils ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.http.HttpStatus ;
2023-07-25 10:59:47 +02:00
import org.springframework.http.MediaType ;
2023-05-11 02:07:55 +02:00
import org.springframework.http.ResponseEntity ;
2023-07-25 10:59:47 +02:00
import org.springframework.web.bind.MissingServletRequestParameterException ;
import org.springframework.web.bind.annotation.* ;
2023-05-11 02:07:55 +02:00
import java.io.BufferedReader ;
import java.io.File ;
import java.io.InputStreamReader ;
import java.nio.file.* ;
import java.util.Collections ;
import java.util.HashMap ;
import java.util.Set ;
import java.util.concurrent.ConcurrentHashMap ;
2023-07-21 10:45:50 +02:00
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Executors ;
2023-10-09 16:23:59 +02:00
import java.util.concurrent.RejectedExecutionException ;
2023-05-11 02:07:55 +02:00
import java.util.regex.Matcher ;
import java.util.regex.Pattern ;
@RestController
@RequestMapping ( " " )
2023-05-29 11:12:08 +02:00
public class BulkImportController {
2023-05-11 02:07:55 +02:00
2023-05-29 11:12:08 +02:00
private static final Logger logger = LoggerFactory . getLogger ( BulkImportController . class ) ;
2023-05-11 02:07:55 +02:00
@Autowired
private FileUtils fileUtils ;
2023-05-29 11:12:08 +02:00
private final BulkImportService bulkImportService ;
2023-05-11 02:07:55 +02:00
private final String baseBulkImportLocation ;
private final String bulkImportReportLocation ;
private final HashMap < String , BulkImport . BulkImportSource > bulkImportSources ;
2023-08-21 17:19:53 +02:00
public static final Set < String > bulkImportDirsUnderProcessing = Collections . newSetFromMap ( new ConcurrentHashMap < String , Boolean > ( ) ) ;
2023-05-11 02:07:55 +02:00
2023-08-21 17:19:53 +02:00
public static int numOfThreadsForBulkImportProcedures ;
2023-07-21 10:45:50 +02:00
public static ExecutorService bulkImportExecutor ;
2023-05-11 02:07:55 +02:00
2023-05-29 11:12:08 +02:00
public BulkImportController ( BulkImportService bulkImportService , BulkImport bulkImport )
2023-05-11 02:07:55 +02:00
{
2023-10-04 15:17:12 +02:00
String bulkImportReportLocationTemp ;
2023-05-11 02:07:55 +02:00
this . baseBulkImportLocation = bulkImport . getBaseBulkImportLocation ( ) ;
this . bulkImportSources = new HashMap < > ( bulkImport . getBulkImportSources ( ) ) ;
2023-10-04 15:17:12 +02:00
bulkImportReportLocationTemp = bulkImport . getBulkImportReportLocation ( ) ;
if ( ! bulkImportReportLocationTemp . endsWith ( " / " ) )
bulkImportReportLocationTemp + = " / " ;
this . bulkImportReportLocation = bulkImportReportLocationTemp ;
2023-05-11 02:07:55 +02:00
2023-05-29 11:12:08 +02:00
this . bulkImportService = bulkImportService ;
2023-07-21 10:45:50 +02:00
2023-08-21 17:19:53 +02:00
numOfThreadsForBulkImportProcedures = bulkImport . getNumOfThreadsForBulkImportProcedures ( ) ;
2023-08-30 13:02:54 +02:00
if ( numOfThreadsForBulkImportProcedures < = 0 )
throw new RuntimeException ( " The given \" numOfThreadsForBulkImportProcedures \" is not a positive number: " + numOfThreadsForBulkImportProcedures ) ;
2023-08-23 15:55:23 +02:00
logger . info ( " Will use " + numOfThreadsForBulkImportProcedures + " threads for the bulk-import procedures. " ) ;
bulkImportExecutor = Executors . newFixedThreadPool ( numOfThreadsForBulkImportProcedures ) ; // At most < numOfThreadsForBulkImportProcedures > threads will be used per bulk-import procedure..
2023-05-11 02:07:55 +02:00
}
2023-07-25 10:59:47 +02:00
private static final Pattern LAST_DIR_REGEX = Pattern . compile ( " ^.*/([^/]+/)$ " ) ;
2023-05-11 02:07:55 +02:00
2023-07-25 10:59:47 +02:00
2023-07-25 14:36:54 +02:00
// This method shows the parameters which are missing when dealing with the bulk-import API.
2023-07-25 10:59:47 +02:00
// Spring Boot does not show any specific messages to the user (like stacktraces), for security reasons.
@ExceptionHandler ( MissingServletRequestParameterException . class )
public ResponseEntity < ? > handleMissingParams ( MissingServletRequestParameterException ex ) {
return ResponseEntity . badRequest ( ) . body ( String . format ( " Missing parameter: %s \ n " , ex . getParameterName ( ) ) ) ;
}
@PostMapping ( " bulkImportFullTexts " )
2023-10-04 15:17:12 +02:00
public ResponseEntity < ? > bulkImportFullTexts ( @RequestParam String provenance , @RequestParam String bulkImportDir , @RequestParam boolean shouldDeleteFilesOnFinish )
{
2023-05-11 02:07:55 +02:00
BulkImport . BulkImportSource bulkImportSource = bulkImportSources . get ( provenance ) ;
if ( bulkImportSource = = null ) {
String errorMsg = " The provided provenance \" " + provenance + " \" is not in the list of the bulk-imported sources, so no configuration-rules are available! " ;
logger . error ( errorMsg ) ;
return ResponseEntity . badRequest ( ) . body ( errorMsg ) ; // It's the user's fault that gave an unsupported provenance.
}
// Check if the given directory parameter exists.
if ( bulkImportDir . isEmpty ( ) ) {
String errorMsg = " The \" bulkImportDir \" was missing from the request! " ;
logger . error ( errorMsg ) ;
return ResponseEntity . badRequest ( ) . body ( errorMsg ) ;
}
2023-07-25 10:59:47 +02:00
String givenBulkDir = bulkImportDir ; // Keep the given value here, to not expose the full-path, in case the user has not provided an absolute path.
2023-05-11 02:07:55 +02:00
// Make sure the whole path ends with "/", so that we can easily append file-names later.
if ( ! bulkImportDir . endsWith ( " / " ) )
bulkImportDir + = " / " ;
String relativeBulkImportDir = null ;
// Check if we have "relative-path" so that we can append it to the "baseBulkImportLocation".
if ( ! bulkImportDir . startsWith ( " / " ) ) {
// A relative path was given.
relativeBulkImportDir = bulkImportDir ;
bulkImportDir = baseBulkImportLocation + bulkImportDir ;
} else {
String errMsg = " The bulkImportDir \" " + bulkImportDir + " \" was problematic! " ;
Matcher matcher = LAST_DIR_REGEX . matcher ( bulkImportDir ) ;
if ( ! matcher . matches ( ) ) {
logger . error ( errMsg ) ;
return ResponseEntity . badRequest ( ) . body ( errMsg ) ;
}
relativeBulkImportDir = matcher . group ( 1 ) ;
if ( ( relativeBulkImportDir = = null ) | | relativeBulkImportDir . isEmpty ( ) ) {
logger . error ( errMsg ) ;
return ResponseEntity . badRequest ( ) . body ( errMsg ) ;
}
}
// The "relativeBulkImportDir" should always be guaranteed to end with "/"! Otherwise, the import-procedure will fail.
2023-07-25 10:59:47 +02:00
2023-05-11 02:07:55 +02:00
logger . info ( " Received a \" bulkImportFullTexts \" request for \" " + provenance + " \" procedure and bulkImportDir: \" " + bulkImportDir + " \" . " ) ;
// Check whether the given directory is accessible.
File givenDir = new File ( bulkImportDir ) ;
if ( ! givenDir . isDirectory ( ) ) {
String errorMsg = " The bulkImportDir \" " + bulkImportDir + " \" is invalid! " ;
logger . error ( errorMsg ) ;
return ResponseEntity . badRequest ( ) . body ( errorMsg ) ;
}
// Efficiently check if the dir is empty, without loading all the file-entries in memory.
try ( DirectoryStream < Path > directory = Files . newDirectoryStream ( givenDir . toPath ( ) ) ) {
if ( ! directory . iterator ( ) . hasNext ( ) ) {
String errorMsg = " The givenDir \" " + givenDir + " \" is empty! " ;
logger . warn ( errorMsg ) ;
return ResponseEntity . badRequest ( ) . body ( errorMsg ) ;
}
// The above check does not catch the case were the directory has at least one subdirectory, but no full-texts files.
2023-08-21 17:19:53 +02:00
// The "iterator()" will have a "next" entry, but no full-text file will exist. Although, that case will be rare and will be caught later on, after this procedure has been accepted.
2023-05-11 02:07:55 +02:00
} catch ( Exception e ) {
String errorMsg = " Error when checking if the givenDir \" " + givenDir + " \" is empty! " ;
logger . error ( errorMsg ) ;
return ResponseEntity . internalServerError ( ) . body ( errorMsg ) ;
}
2023-07-21 15:19:00 +02:00
// After applying all the logic-checks and informing the user of any mistake, then we check and inform if the Service is about to shut down.
if ( ShutdownController . shouldShutdownService ) {
String warnMsg = " The Service is about to shutdown, after all under-processing assignments and/or bulkImport requests are handled. No new requests are accepted! " ;
logger . warn ( warnMsg ) ;
return ResponseEntity . status ( HttpStatus . CONFLICT ) . body ( warnMsg ) ;
}
2023-05-11 02:07:55 +02:00
// Detect if the same directory is scheduled for being processed. In that case, return a 429.
2023-08-21 17:19:53 +02:00
if ( ! bulkImportDirsUnderProcessing . add ( bulkImportDir ) ) {
2023-05-11 02:07:55 +02:00
// We allow multiple jobs for the same provenance, running at the same time, but not multiple jobs for the same bulkImportDirectory.
String errorMsg = " There is a bulk-import request for the directory \" " + bulkImportDir + " \" that is being handled at the moment. Please wait until it's finished being processed, before making another request. " ;
logger . error ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . TOO_MANY_REQUESTS ) . body ( errorMsg ) ;
}
2023-07-25 10:59:47 +02:00
Path currentBulkImportReportLocationDir = Paths . get ( this . bulkImportReportLocation , provenance , relativeBulkImportDir ) ;
2023-05-11 02:07:55 +02:00
try {
Files . createDirectories ( currentBulkImportReportLocationDir ) ; // No-op if dir exists. It does not throw a "alreadyExistsException"
} catch ( Exception e ) {
2023-08-21 17:19:53 +02:00
String errorMsg = " Could not create the \" bulkImportReportLocation \" for provenance \" " + provenance + " \" : " + currentBulkImportReportLocationDir ;
2023-05-11 02:07:55 +02:00
logger . error ( errorMsg , e ) ;
2023-08-21 17:19:53 +02:00
bulkImportDirsUnderProcessing . remove ( bulkImportDir ) ;
2023-05-11 02:07:55 +02:00
return ResponseEntity . internalServerError ( ) . body ( errorMsg ) ;
}
2023-07-25 10:59:47 +02:00
// Generate the "bulkImportReportID". We are removing the ending "slash" ("/") from the "relativeBulkImportDir".
2023-08-21 17:19:53 +02:00
String bulkImportReportID = provenance + " / " + relativeBulkImportDir + " report_ " + GenericUtils . getRandomNumber ( 10000 , 99999 ) ;
2023-05-11 02:07:55 +02:00
String bulkImportReportFullPath = this . bulkImportReportLocation + bulkImportReportID + " .json " ;
2023-10-26 10:44:23 +02:00
String msg = " The bulkImportFullTexts request for " + provenance + " procedure and bulkImportDir: " + givenBulkDir + " was accepted and will be submitted for execution. "
2023-05-11 02:07:55 +02:00
+ ( shouldDeleteFilesOnFinish ? " The successfully imported files will be deleted. " : " All files will remain inside the directory after processing. " )
2023-07-25 10:59:47 +02:00
+ " You can request a report at any moment, using the reportID. " ;
2023-05-11 02:07:55 +02:00
BulkImportReport bulkImportReport = new BulkImportReport ( provenance , bulkImportReportFullPath , bulkImportReportID ) ;
bulkImportReport . addEvent ( msg ) ;
2023-05-24 12:52:28 +02:00
String errorMsg = fileUtils . writeToFile ( bulkImportReportFullPath , bulkImportReport . getJsonReport ( ) , true ) ;
2023-08-21 17:19:53 +02:00
if ( errorMsg ! = null ) {
bulkImportDirsUnderProcessing . remove ( bulkImportDir ) ;
2023-05-11 02:07:55 +02:00
return ResponseEntity . internalServerError ( ) . body ( errorMsg ) ;
2023-08-21 17:19:53 +02:00
}
2023-05-11 02:07:55 +02:00
2023-07-25 10:59:47 +02:00
logger . info ( msg + " \" bulkImportReportID \" : " + bulkImportReportID ) ;
2023-05-11 02:07:55 +02:00
// Add this to a background job, since it will take a lot of time to be completed, and the caller will get a "read-timeout" at least and a socket-timeout at most (in case of a network failure during those hours).
String finalBulkImportDir = bulkImportDir ;
String finalRelativeBulkImportDir = relativeBulkImportDir ;
2023-10-09 16:23:59 +02:00
try {
UrlsController . futuresOfBackgroundTasks . add (
UrlsController . backgroundExecutor . submit (
( ) - > bulkImportService . bulkImportFullTextsFromDirectory ( bulkImportReport , finalRelativeBulkImportDir , finalBulkImportDir , givenDir , provenance , bulkImportSource , shouldDeleteFilesOnFinish )
)
) ;
} catch ( RejectedExecutionException ree ) {
errorMsg = " The bulkImport request for bulkImportReportLocation \" " + bulkImportReportLocation + " \" and provenance " + provenance + " has failed to be executed! " ;
bulkImportReport . addEvent ( msg ) ;
logger . error ( errorMsg , ree ) ;
return ResponseEntity . internalServerError ( ) . body ( errorMsg ) ;
}
2023-05-11 02:07:55 +02:00
2023-08-21 17:19:53 +02:00
// This directory, will be removed from "bulkImportDirsUnderProcessing", when the background job finishes.
2023-09-26 17:01:55 +02:00
return ResponseEntity . ok ( ) . body ( new BulkImportResponse ( msg , bulkImportReportID ) ) ; // The response is automatically serialized to json, and it has the type "application/json".
2023-05-11 02:07:55 +02:00
}
2023-07-25 10:59:47 +02:00
@GetMapping ( value = " getBulkImportReport " , produces = MediaType . APPLICATION_JSON_VALUE )
2023-09-11 16:24:39 +02:00
public ResponseEntity < ? > getBulkImportReport ( @RequestParam ( " id " ) String bulkImportReportId , @RequestParam ( name = " pretty " , defaultValue = " false " ) boolean prettyFormatting )
2023-05-11 02:07:55 +02:00
{
2023-09-11 16:24:39 +02:00
logger . info ( " Received a \" getBulkImportReport \" request for \" bulkImportReportId \" : \" " + bulkImportReportId + " \" . " + ( prettyFormatting ? " Will return the report pretty-formatted. " : " " ) ) ;
2023-10-09 16:23:59 +02:00
// Even if the Service is set to shut down soon, we allow this endpoint to return the report up to the last minute,
// since the Service may be up for another hour, running a bulk-import procedure, for which we want to check its progress.
2023-05-11 02:07:55 +02:00
// Write the contents of the report-file to a string (efficiently!) and return the whole content as an HTTP-response.
2023-09-11 16:24:39 +02:00
final StringBuilder stringBuilder = new StringBuilder ( 25_000 ) ;
2023-05-11 02:07:55 +02:00
String line ;
2023-09-15 10:54:32 +02:00
FileUtils . fileAccessLock . lock ( ) ;
2023-09-11 16:24:39 +02:00
try ( BufferedReader in = new BufferedReader ( new InputStreamReader ( Files . newInputStream ( Paths . get ( this . bulkImportReportLocation , bulkImportReportId + " .json " ) ) ) , FileUtils . twentyFiveKb ) ) {
2023-05-11 02:07:55 +02:00
while ( ( line = in . readLine ( ) ) ! = null )
2023-10-04 15:17:12 +02:00
stringBuilder . append ( line ) . append ( GenericUtils . endOfLine ) ; // The "readLine()" does not return the line-term char.
2023-05-11 02:07:55 +02:00
} catch ( NoSuchFileException nsfe ) {
logger . warn ( " The requested report-file with ID: \" " + bulkImportReportId + " \" was not found! " ) ;
return ResponseEntity . notFound ( ) . build ( ) ;
} catch ( Exception e ) {
String errorMsg = " Failed to read the contents of report-file with ID: " + bulkImportReportId ;
logger . error ( errorMsg , e ) ;
return ResponseEntity . internalServerError ( ) . body ( errorMsg ) ; // It's ok to give the file-path to the user, since the report already contains the file-path.
2023-09-15 10:54:32 +02:00
} finally {
FileUtils . fileAccessLock . unlock ( ) ;
2023-05-11 02:07:55 +02:00
}
2023-09-11 16:24:39 +02:00
String json = stringBuilder . toString ( ) . trim ( ) ;
if ( prettyFormatting ) {
final Gson gson = new GsonBuilder ( ) . setPrettyPrinting ( ) . create ( ) ;
try {
json = gson . toJson ( JsonParser . parseString ( json ) ) ;
} catch ( JsonParseException jpe ) {
logger . error ( " Problem when parsing the json-string: " + jpe . getMessage ( ) + " \ nIt is not a valid json! \ n " + json ) ;
}
}
return ResponseEntity . ok ( ) . body ( json ) ;
2023-05-11 02:07:55 +02:00
}
}