package eu.openaire.urls_controller.services;

import com.google.common.collect.Lists;
import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.controllers.BulkImportController;
import eu.openaire.urls_controller.models.BulkImportReport;
import eu.openaire.urls_controller.models.DocFileData;
import eu.openaire.urls_controller.models.FileLocationData;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import org.apache.avro.generic.GenericData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import javax.xml.bind.DatatypeConverter;
import java.io.File;
import java.net.ConnectException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Service
public class BulkImportServiceImpl implements BulkImportService {

    private static final Logger logger = LoggerFactory.getLogger(BulkImportServiceImpl.class);

    @Autowired
    private FileUtils fileUtils;

    @Autowired
    private ParquetFileUtils parquetFileUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * Given a directory with full-text files, this method imports those files into the PDF Aggregation Service.
     * It also guarantees that the failed files will not be deleted! A file can "fail" if any of the expected operations fails (upload-to-S3, parquet-creation and upload, load-to-db, etc.).
     */
    public Boolean bulkImportFullTextsFromDirectory(BulkImportReport bulkImportReport, String relativeBulkImportDir, String bulkImportDirName, File bulkImportDir, String provenance, BulkImport.BulkImportSource bulkImportSource, boolean shouldDeleteFilesOnFinish)
    {
        String bulkImportReportLocation = bulkImportReport.getReportLocation();

        // Write to the bulkImport-report file.
        String msg = "Initializing the bulkImport " + provenance + " procedure with bulkImportDir: " + bulkImportDirName + ".";
        logger.info(msg);
        bulkImportReport.addEvent(msg);
        // Do not write immediately to the file; wait for the following checks.

        String additionalLoggingMsg = " | provenance: \"" + provenance + "\" | bulkImportDir: \"" + bulkImportDirName + "\"";

        if ( (ParquetFileUtils.payloadsSchema == null) // Parse the schema if it's not already parsed.
                && ((ParquetFileUtils.payloadsSchema = ParquetFileUtils.parseSchema(ParquetFileUtils.payloadSchemaFilePath)) == null) ) {
            String errorMsg = "The payloadsSchema could not be parsed!";
            logger.error(errorMsg + additionalLoggingMsg);
            bulkImportReport.addEvent(errorMsg);
            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
            BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
            return false;
        }
        List<String> fileLocations = getFileLocationsInsideDir(bulkImportDirName); // The error-msg has already been written.
        if ( fileLocations == null ) {
            String errorMsg = "Could not retrieve the files for bulk-import!";
            logger.error(errorMsg + additionalLoggingMsg);
            bulkImportReport.addEvent(errorMsg);
            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
            BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
            return false;
        }

        int numOfFiles = fileLocations.size();
        if ( numOfFiles == 0 ) {
            String errorMsg = "No files were found inside the bulkImportDir: " + bulkImportDirName;
            logger.warn(errorMsg);
            bulkImportReport.addEvent(errorMsg);
            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
            BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
            return false;
        }
        if ( logger.isTraceEnabled() )
            logger.trace("fileLocations: " + additionalLoggingMsg + GenericUtils.endOfLine + fileLocations);

        String localParquetDir = parquetFileUtils.parquetBaseLocalDirectoryPath + "bulk_import_" + provenance + File.separator + relativeBulkImportDir; // This ends with "/".
        try {
            Files.createDirectories(Paths.get(localParquetDir)); // No-op if it already exists.
        } catch (Exception e) {
            String errorMsg = "Could not create the local parquet-directory: " + localParquetDir;
            logger.error(errorMsg + additionalLoggingMsg, e);
            bulkImportReport.addEvent(errorMsg);
            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
            BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
            return false;
        }
        // Create a new directory on HDFS, with this bulkImportDir's name, so that no "load data" operation will fail because another thread has loaded that base-dir right before.
        String currentBulkImportHdfsDir = parquetFileUtils.parquetHDFSDirectoryPathPayloadsBulkImport + relativeBulkImportDir;
        if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + currentBulkImportHdfsDir + parquetFileUtils.mkDirsAndParams) ) { // No-op if it already exists. It is very quick.
            String errorMsg = "Could not create the remote HDFS-directory: " + currentBulkImportHdfsDir;
            logger.error(errorMsg + additionalLoggingMsg);
            bulkImportReport.addEvent(errorMsg);
            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
            BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
            return false;
        }
        long timeMillis = System.currentTimeMillis(); // Store it here, in order to use the same timestamp for all current records.

        List<Callable<Integer>> callableTasksForFileSegments = new ArrayList<>(numOfFiles);
        int sizeOfEachSegment = (numOfFiles / BulkImportController.numOfThreadsForBulkImportProcedures);
        if ( sizeOfEachSegment == 0 )
            sizeOfEachSegment = 1; // Guard against a zero partition-size (when there are fewer files than threads), which would make "Lists.partition" throw an "IllegalArgumentException".
        List<List<String>> subLists = Lists.partition(fileLocations, sizeOfEachSegment); // Divide the initial list into "numOfThreadsForBulkImportProcedures" subLists. The last one may have fewer files.
        int subListsSize = subLists.size();

        msg = "Going to bulk-import the " + numOfFiles + " files in parallel, after dividing them in " + subListsSize + " segments.";
        logger.debug(msg + additionalLoggingMsg);
        bulkImportReport.addEvent(msg);
        fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);

        for ( int i = 0; i < subListsSize; ++i ) {
            int finalI = i;
            callableTasksForFileSegments.add(() -> { // Create one callable-task per file-segment.
                return processBulkImportedFilesSegment(bulkImportReport, finalI, subLists.get(finalI), bulkImportDirName, localParquetDir, currentBulkImportHdfsDir, provenance, bulkImportSource, timeMillis, shouldDeleteFilesOnFinish, additionalLoggingMsg);
            });
        }
        int numFailedSegments = 0;
        int numFailedFilesForSegment = 0;
        int numAllFailedFiles = 0;
        try {
            List<Future<Integer>> futures = BulkImportController.bulkImportExecutor.invokeAll(callableTasksForFileSegments); // This waits for all tasks to finish.
            int sizeOfFutures = futures.size(); // This is the same as the "subListsSize".
            for ( int i = 0; i < sizeOfFutures; ++i )
            { // For each segment..
                try {
                    numFailedFilesForSegment = futures.get(i).get();
                    numAllFailedFiles += numFailedFilesForSegment;
                    if ( numFailedFilesForSegment == subLists.get(i).size() )
                        numFailedSegments++;
                    // In case all the files failed to be bulk-imported, we will detect it in the "numAllFailedFiles"-check later.
                    // The failed-to-be-imported files will not be deleted, even if the user requested that the directory be deleted.
                } catch (ExecutionException ee) {
                    String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors, like an "out of memory" exception (Java HEAP).
                    logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + additionalLoggingMsg + GenericUtils.endOfLine + stackTraceMessage);
                } catch (CancellationException ce) {
                    logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage() + additionalLoggingMsg);
                } catch (IndexOutOfBoundsException ioobe) {
                    logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage() + additionalLoggingMsg);
                }
            }
        } catch (Exception e) {
            String errorMsg = "An error occurred when trying to bulk-import data from bulkImportDir: " + bulkImportDirName;
            logger.error(errorMsg, e);
            bulkImportReport.addEvent(errorMsg);
            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
            BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
            return false;
        } finally {
            logger.debug("Deleting local parquet directory: " + localParquetDir);
            fileUtils.deleteDirectory(new File(localParquetDir)); // It may not exist at all, if none of the parquet files were created.
        }
        // Check the results.
        if ( numAllFailedFiles == numOfFiles ) {
            String errorMsg = "None of the files inside the bulkImportDir: " + bulkImportDirName + " were imported!";
            logger.error(errorMsg);
            bulkImportReport.addEvent(errorMsg);
            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
            BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
            return false;
        } else if ( numAllFailedFiles > 0 ) { // Some failed, but not all.
            msg = numAllFailedFiles + " files" + ((numFailedSegments > 0) ? (" and " + numFailedSegments + " whole segments") : "") + " failed to be bulk-imported, from the bulkImportDir: " + bulkImportDirName;
            logger.warn(msg);
        } else {
            msg = "All " + numOfFiles + " files, from bulkImportDir: " + bulkImportDirName + " were bulk-imported successfully.";
            logger.info(msg);
        }
        bulkImportReport.addEvent(msg);
        fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
        // Merge the parquet files inside the table "payload_bulk_import", to improve the performance of future operations.
        DatabaseConnector.databaseLock.lock();
        String mergeErrorMsg = fileUtils.mergeParquetFiles("payload_bulk_import", "", null); // The error-msg is already logged inside.
        if ( mergeErrorMsg != null ) {
            DatabaseConnector.databaseLock.unlock();
            bulkImportReport.addEvent(mergeErrorMsg);
            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
            BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
            return false;
        }
        DatabaseConnector.databaseLock.unlock();

        String successMsg = "Finished the bulk-import procedure for " + provenance + " and bulkImportDir: " + bulkImportDirName;
        logger.info(successMsg);
        bulkImportReport.addEvent(successMsg);
        fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
        // The report-file is overwritten every now and then, instead of appended-to, since we want to store an updated JSON report-object each time.
        // Also, we do not want to write the report only once at the end (in its final form), since the user should be able to request the report at any time
        // after submitting the bulk-import request, to see its progress (the number of files may be very large and the processing may take many hours).

        BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
        return true;
    }
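
    /**
     * Imports one segment of the bulk-import files: it uploads each file to S3, collects its payload-record,
     * writes all records of the segment to a parquet-file, uploads that file to HDFS and loads its data into the "payload_bulk_import" table.
     *
     * @return the number of failed files for this segment
     */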
    private int processBulkImportedFilesSegment(BulkImportReport bulkImportReport, int segmentCounter, List<String> fileLocationsSegment, String bulkImportDirName, String localParquetDir, String currentBulkImportHdfsDir,
                                                String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, boolean shouldDeleteFilesOnFinish, String additionalLoggingMsg)
    {
        // Inside this thread, process a segment of the files.
        String bulkImportReportLocation = bulkImportReport.getReportLocation();

        int numOfFilesInSegment = fileLocationsSegment.size();
        String msg = "Going to import " + numOfFilesInSegment + " files, for segment-" + segmentCounter;
        logger.debug(msg + additionalLoggingMsg);
        bulkImportReport.addEvent(msg);

        List<GenericData.Record> payloadRecords = new ArrayList<>(numOfFilesInSegment);

        // Use a HashSet for the failed files, in order to not remove them in the end.
        HashSet<String> failedFiles = new HashSet<>();
        int counter = 0;

        // Upload files to S3 and collect the payloadRecords.
        for ( int i = 0; i < numOfFilesInSegment; ++i ) {
            String fileLocation = fileLocationsSegment.get(i);
            GenericData.Record record = null;
            try {
                record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg);
            } catch (ConnectException ce) {
                String errorMsg = "ConnectException when uploading the files of segment_" + segmentCounter + " to the S3 Object Store. Will avoid uploading the rest of the files for this segment.";
                logger.error(errorMsg + additionalLoggingMsg);
                bulkImportReport.addEvent(errorMsg);
                for ( int j = i; j < numOfFilesInSegment; ++j )
                    failedFiles.add(fileLocationsSegment.get(j)); // The rest of the files are considered "failed".
                break;
            }

            if ( record != null )
                payloadRecords.add(record);
            else {
                bulkImportReport.addEvent("An error caused the file: " + fileLocation + " to not be imported!");
                failedFiles.add(fileLocation);
            }

            if ( ((++counter) % 150) == 0 ) { // Every 150 files, report the status for this segment.
                msg = "Progress for segment-" + segmentCounter + ": " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files.";
                if ( logger.isTraceEnabled() )
                    logger.trace(msg + additionalLoggingMsg);
                bulkImportReport.addEvent(msg);
                fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
            }
        }
        int numOfPayloadRecords = payloadRecords.size();
        if ( numOfPayloadRecords == 0 ) {
            String errorMsg = "No payload-records were generated for any of the files, of segment-" + segmentCounter + ", inside the bulkImportDir: " + bulkImportDirName;
            logger.warn(errorMsg);
            bulkImportReport.addEvent(errorMsg);
            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
            // None of the files of this segment will be deleted, in any case.
            return numOfFilesInSegment;
        } else if ( numOfPayloadRecords != numOfFilesInSegment ) {
            // Write this important note here, so that it will certainly be in the report, even if a parquet-file failure happens and the method exits early.
            String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be bulk-imported, for segment-" + segmentCounter + "!";
            logger.warn(errorMsg + additionalLoggingMsg);
            bulkImportReport.addEvent(errorMsg);
            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
        }
        // Construct the parquet file, upload it to HDFS and load it into the "payload_bulk_import" table.
        String parquetFileName = "payloads_" + segmentCounter + ".parquet";
        String fullLocalParquetFilePath = localParquetDir + parquetFileName;

        if ( logger.isTraceEnabled() )
            logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath + additionalLoggingMsg);

        if ( !parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) { // Any errorMsg is already logged inside.
            String errorMsg = "Could not write the payload-records for segment-" + segmentCounter + " to the parquet-file: " + parquetFileName + "!";
            logger.error(errorMsg + additionalLoggingMsg);
            bulkImportReport.addEvent(errorMsg);
            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
            // None of the files of this segment will be deleted, in any case.
            return numOfFilesInSegment; // All files of this segment have failed.
        }
        if ( logger.isTraceEnabled() )
            logger.trace("Going to upload the parquet file: " + fullLocalParquetFilePath + " to HDFS." + additionalLoggingMsg);

        // Upload and insert the data to the "payload" Impala table. (no database-locking is required)
        String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir); // The returned error-message is already logged inside.
        if ( errorMsg != null ) {
            errorMsg = "Could not upload the parquet-file " + parquetFileName + " to HDFS!";
            logger.error(errorMsg + additionalLoggingMsg);
            bulkImportReport.addEvent(errorMsg);
            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
            // None of the files of this segment will be deleted, in any case.
            return numOfFilesInSegment; // All files of this segment have failed.
        }
        if ( logger.isTraceEnabled() )
            logger.trace("Going to load the data of parquet-file: \"" + parquetFileName + "\" to the database-table: \"payload_bulk_import\"." + additionalLoggingMsg);

        DatabaseConnector.databaseLock.lock();
        if ( !parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import") ) {
            DatabaseConnector.databaseLock.unlock();
            errorMsg = "Could not load the payload-records to the database, for segment-" + segmentCounter + "!";
            logger.error(errorMsg + additionalLoggingMsg);
            bulkImportReport.addEvent(errorMsg);
            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
            // None of the files of this segment will be deleted, in any case.
            return numOfFilesInSegment; // All files of this segment have failed.
        }
        DatabaseConnector.databaseLock.unlock();
        String segmentSuccessMsg = "Finished importing " + numOfPayloadRecords + " files, out of " + numOfFilesInSegment + ", for segment-" + segmentCounter + ".";
        logger.info(segmentSuccessMsg + additionalLoggingMsg);
        bulkImportReport.addEvent(segmentSuccessMsg);

        if ( shouldDeleteFilesOnFinish ) {
            segmentSuccessMsg = "As the user requested, the successfully imported files of " + provenance + " procedure, of bulk-import segment-" + segmentCounter + ", from directory " + bulkImportDirName + ", will be deleted.";
            logger.info(segmentSuccessMsg);
            bulkImportReport.addEvent(segmentSuccessMsg);

            // Delete all files except the ones in the "failedFiles" set.
            for ( String fileLocation : fileLocationsSegment ) {
                if ( !failedFiles.contains(fileLocation) )
                    if ( !fileUtils.deleteFile(fileLocation) )
                        bulkImportReport.addEvent("The file " + fileLocation + " could not be deleted! Please make sure the WRITE-permission is granted.");
            }
        }

        return (numOfFilesInSegment - numOfPayloadRecords); // Return the number of failed files.
    }
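
    /**
     * Processes a single bulk-imported file: it calculates the file's hash and size, checks whether this full-text
     * was already retrieved in the past (by looking up its hash in the "payload" table), uploads it to S3 if it was not,
     * and builds the payload-record for it.
     *
     * @return the payload-record, or null in case of an error
     */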
    private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg)
            throws ConnectException
    {
        File fullTextFile = new File(fileLocation);
        DocFileData docFileData = new DocFileData(fullTextFile, null, null, null);
        docFileData.calculateAndSetHashAndSize();

        // Check if this file has already been found by crawling. Even though we started excluding this datasource from crawling, many full-texts have already been downloaded.
        // Also, it may be the case that this file was downloaded by another datasource.

        FileLocationData fileLocationData;
        try {
            fileLocationData = new FileLocationData(fileLocation);
        } catch (RuntimeException re) {
            logger.error(re.getMessage() + additionalLoggingMsg);
            return null;
        }

        String fileHash = docFileData.getHash();
        if ( fileHash == null )
            return null; // Without the hash, we can neither check for a previously found full-text nor create the S3 file-name.
        String datasourceId = bulkImportSource.getDatasourceID();
        String datasourcePrefix = bulkImportSource.getDatasourcePrefix();
        String fileNameID = fileLocationData.getFileNameID();

        String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID); // This string-concatenation works with urls of arXiv. A different construction may be needed for other datasources.
        String originalUrl = actualUrl; // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link.

        final String getFileLocationForHashQuery = "select `location` from " + DatabaseConnector.databaseName + ".payload where `hash` = ? limit 1";
        final int[] hashArgType = new int[] {Types.VARCHAR};
        String alreadyFoundFileLocation = null;

        DatabaseConnector.databaseLock.lock();
        try {
            alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[] {fileHash}, hashArgType, String.class);
        } catch (EmptyResultDataAccessException erdae) {
            // No fileLocation was found; that's ok. It will stay null by default.
        } catch (Exception e) {
            logger.error("Error when executing or acquiring data from the 'getFileLocationForHashQuery'!" + additionalLoggingMsg, e);
            // Continue with bulk-importing the file and uploading it to S3.
        } finally {
            DatabaseConnector.databaseLock.unlock();
        }
        String openAireId = generateOpenaireId(fileNameID, datasourcePrefix, bulkImportSource.getIsAuthoritative());
        if ( openAireId == null )
            return null;

        String s3Url = null;
        if ( alreadyFoundFileLocation != null ) // If the full-text of this record has already been found and uploaded.
        {
            // This full-text was found to already exist in the database.
            // If it has the same datasourceID, then it was likely crawled before, from an ID belonging to this datasource.
            // If it also has the same ID, then the exact same record from that datasource was retrieved previously.
            // Else, the file was downloaded by another record of this datasource.
            // Else, if the datasourceID is not the same, then the same file was retrieved from another datasource.
            // The above analysis is educational; it does not need to take place and is not currently used.
            s3Url = alreadyFoundFileLocation;
        } else
            s3Url = fileUtils.constructS3FilenameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), fileNameID, fileLocationData.getDotFileExtension(), datasourceId, fileHash);

        return parquetFileUtils.getPayloadParquetRecord(openAireId, originalUrl, actualUrl, timeMillis, bulkImportSource.getMimeType(),
                docFileData.getSize(), fileHash, s3Url, provenance, true); // It may return null.
    }
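
    /**
     * Walks the given directory recursively and collects the locations of all regular files found inside it.
     *
     * @return the list of file-locations, or null in case of an error
     */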
    public List<String> getFileLocationsInsideDir(String directory)
    {
        List<String> fileLocations = null;
        try ( Stream<Path> walkStream = Files.find(Paths.get(directory), Integer.MAX_VALUE, (filePath, fileAttr) -> fileAttr.isRegularFile()) )
            // In case we ever include other types of files inside the same directory, we need to add this filter: "&& !filePath.toString().endsWith("name.ext")"
        {
            fileLocations = walkStream.map(Path::toString).collect(Collectors.toList());
        } catch (Exception e) {
            String errorMsg = "Could not retrieve the files from directory: '" + directory + "'!";
            logger.error(errorMsg, e);
            return null;
        }
        return fileLocations;
    }
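
    /**
     * Returns the lowercase, hex-encoded MD5-hash of the given string, or null in case of an error.
     * For example, getMD5Hash("test") returns "098f6bcd4621d373cade4e832627b4f6".
     */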
    public String getMD5Hash(String string)
    {
        String md5 = null;
        try {
            MessageDigest md5MD = MessageDigest.getInstance("MD5"); // Create a new instance for every request. Otherwise, we would need to synchronize the use of that object among multiple threads.
            md5MD.update(string.getBytes());
            md5 = DatatypeConverter.printHexBinary(md5MD.digest()).toLowerCase();
        } catch (Exception e) {
            logger.error("Error when getting the MD5-hash for: " + string, e);
            return null;
        }
        return md5;
    }
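
    /**
     * Assembles the OpenAIRE-ID from the record-ID and the datasource-prefix, as described in the comments below.
     * For illustration only (the prefix here is hypothetical): generateOpenaireId("2106.01234", "arXiv_______", true)
     * would return "arXiv_______::" + md5("2106.01234").
     */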
    public String generateOpenaireId(String id, String datasourcePrefix, boolean isAuthoritative)
    {
        // If the "provenance" relates to an "authoritative" source, then its id has to be lowercased, before the md5() is applied to it.
        // general_openaire_id = <datasourcePrefix> + "::" + <md5(ID)>
        // authoritative_openaire_id = <datasourcePrefix> + "::" + <md5(lowercase(ID))>
        if ( isAuthoritative )
            id = id.toLowerCase();
        String idMd5Hash = getMD5Hash(id);
        if ( idMd5Hash == null )
            return null;
        return (datasourcePrefix + "::" + idMd5Hash);
    }

}