2021-03-16 14:25:15 +01:00
package eu.openaire.urls_controller.controllers ;
2021-11-09 22:59:27 +01:00
import eu.openaire.urls_controller.configuration.ImpalaConnector ;
2022-12-09 11:46:06 +01:00
import eu.openaire.urls_controller.models.* ;
2021-06-22 04:38:48 +02:00
import eu.openaire.urls_controller.payloads.requests.WorkerReport ;
2021-11-30 12:26:19 +01:00
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse ;
2021-12-21 14:55:27 +01:00
import eu.openaire.urls_controller.util.FileUtils ;
2022-11-10 16:18:21 +01:00
import eu.openaire.urls_controller.util.ParquetFileUtils ;
2021-03-16 14:25:15 +01:00
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
2022-01-30 21:14:52 +01:00
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.beans.factory.annotation.Value ;
2021-06-22 04:38:48 +02:00
import org.springframework.http.HttpStatus ;
2021-03-16 14:25:15 +01:00
import org.springframework.http.ResponseEntity ;
2022-01-30 21:14:52 +01:00
import org.springframework.jdbc.core.JdbcTemplate ;
2021-06-22 04:38:48 +02:00
import org.springframework.web.bind.annotation.* ;
2021-03-16 14:25:15 +01:00
2021-11-30 17:23:27 +01:00
import javax.servlet.http.HttpServletRequest ;
2022-11-10 16:18:21 +01:00
import java.io.File ;
import java.nio.file.Files ;
import java.nio.file.Paths ;
import java.sql.Connection ;
import java.sql.PreparedStatement ;
import java.sql.SQLException ;
import java.sql.Timestamp ;
2022-01-30 21:14:52 +01:00
import java.util.ArrayList ;
import java.util.List ;
2022-02-04 13:48:22 +01:00
import java.util.concurrent.Callable ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Executors ;
2022-12-09 11:46:06 +01:00
import java.util.concurrent.Future ;
2022-02-04 13:48:22 +01:00
import java.util.concurrent.atomic.AtomicInteger ;
2021-07-05 13:04:39 +02:00
import java.util.concurrent.atomic.AtomicLong ;
2021-12-10 20:47:58 +01:00
import java.util.regex.Pattern ;
2021-03-16 14:25:15 +01:00
2022-02-02 19:19:46 +01:00
2021-03-16 14:25:15 +01:00
@RestController
@RequestMapping ( " /urls " )
public class UrlController {
private static final Logger logger = LoggerFactory . getLogger ( UrlController . class ) ;
2022-01-30 21:14:52 +01:00
@Autowired
private JdbcTemplate jdbcTemplate ;
@Autowired
private FileUtils fileUtils ;
2021-03-16 14:25:15 +01:00
2022-11-10 16:18:21 +01:00
@Autowired
private ParquetFileUtils parquetFileUtils ;
public static final AtomicLong assignmentsBatchCounter = new AtomicLong ( 0 ) ;
2021-12-10 20:47:58 +01:00
private static final Pattern MALICIOUS_INPUT_STRING = Pattern . compile ( " .*[';` \" ]+.* " ) ;
2022-01-31 12:49:14 +01:00
@Value ( " ${services.pdfaggregation.controller.assignmentLimit} " )
2022-01-30 21:14:52 +01:00
private int assignmentLimit ;
2021-03-16 14:25:15 +01:00
2022-02-22 12:54:16 +01:00
private final AtomicInteger maxAttemptsPerRecordAtomic ;
2022-02-07 12:57:09 +01:00
public UrlController ( @Value ( " ${services.pdfaggregation.controller.maxAttemptsPerRecord} " ) int maxAttemptsPerRecord ) {
maxAttemptsPerRecordAtomic = new AtomicInteger ( maxAttemptsPerRecord ) ;
}
2022-02-02 19:19:46 +01:00
2021-03-16 14:25:15 +01:00
@GetMapping ( " " )
2021-08-05 14:43:37 +02:00
public ResponseEntity < ? > getUrls ( @RequestParam String workerId , @RequestParam int workerAssignmentsLimit ) {
2021-03-16 14:25:15 +01:00
2021-12-10 20:47:58 +01:00
// As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
if ( MALICIOUS_INPUT_STRING . matcher ( workerId ) . matches ( ) ) {
String errorMsg = " Possibly malicious \" workerId \" received: " + workerId ;
logger . error ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . FORBIDDEN ) . body ( errorMsg ) ;
}
2022-01-30 21:14:52 +01:00
logger . info ( " Worker with id: \" " + workerId + " \" , requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + assignmentLimit ) ;
2021-03-16 14:25:15 +01:00
2021-11-09 22:59:27 +01:00
// Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
2021-08-05 14:43:37 +02:00
2021-11-09 22:59:27 +01:00
// Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server.
int assignmentsLimit = workerAssignmentsLimit ;
if ( assignmentsLimit = = 0 ) {
String errorMsg = " The given \" workerAssignmentsLimit \" was ZERO! " ;
logger . error ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . BAD_REQUEST ) . body ( errorMsg ) ;
2022-01-30 21:14:52 +01:00
} else if ( assignmentsLimit > assignmentLimit ) {
logger . warn ( " The given \" workerAssignmentsLimit \" ( " + workerAssignmentsLimit + " ) was larger than the Controller's limit ( " + assignmentLimit + " ). Will use the Controller's limit. " ) ;
assignmentsLimit = assignmentLimit ;
2021-11-09 22:59:27 +01:00
}
2021-08-05 14:43:37 +02:00
2021-11-30 18:59:46 +01:00
String findAssignmentsQuery = " select pubid, url, datasourceid, datasourcetype \ n " +
2022-02-22 12:54:16 +01:00
" from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count \ n " +
" from ( \ n " +
" select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count \ n " +
2022-04-08 16:39:45 +02:00
" from " + ImpalaConnector . databaseName + " .publication p \ n " +
" join " + ImpalaConnector . databaseName + " .publication_urls pu on pu.id=p.id \ n " +
" join " + ImpalaConnector . databaseName + " .datasource d on d.id=p.datasourceid \ n " +
" left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector . databaseName + " .attempt a group by a.id) as attempts on attempts.id=p.id \ n " +
" left outer join (select a.id, a.original_url from " + ImpalaConnector . databaseName + " .assignment a \ n " +
2022-02-22 12:54:16 +01:00
" union all \ n " +
2022-04-08 16:39:45 +02:00
" select pl.id, pl.original_url from " + ImpalaConnector . databaseName + " .payload pl) \ n " +
2022-02-22 12:54:16 +01:00
" as existing on existing.id=p.id and existing.original_url=pu.url \ n " +
" where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic . get ( ) +
2022-04-08 16:39:45 +02:00
" \ nand not exists (select 1 from " + ImpalaConnector . databaseName + " .attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1) \ n " +
2022-02-22 12:54:16 +01:00
" limit " + ( assignmentsLimit * 10 ) + " ) \ n " +
" as non_distinct_results \ n " +
" order by coalesce(attempt_count, 0), reverse(pubid), url \ n " +
" limit " + assignmentsLimit + " ) \ n " +
" as findAssignmentsQuery " ;
2021-11-09 22:59:27 +01:00
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
2022-02-07 12:57:09 +01:00
//logger.debug(findAssignmentsQuery); // DEBUG!
2021-11-09 22:59:27 +01:00
2022-04-08 16:39:45 +02:00
String getAssignmentsQuery = " select * from " + ImpalaConnector . databaseName + " .current_assignment " ;
2021-11-09 22:59:27 +01:00
2022-02-02 19:19:46 +01:00
List < Assignment > assignments = new ArrayList < > ( assignmentsLimit ) ;
2021-11-09 22:59:27 +01:00
ImpalaConnector . databaseLock . lock ( ) ;
2021-03-16 14:25:15 +01:00
2022-12-07 12:48:00 +01:00
String errorMsg = createAndInitializeCurrentAssignmentsTable ( findAssignmentsQuery ) ;
if ( errorMsg ! = null ) {
2022-02-02 19:19:46 +01:00
ImpalaConnector . databaseLock . unlock ( ) ;
2021-11-09 22:59:27 +01:00
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
}
2021-07-05 13:04:39 +02:00
2022-11-10 16:18:21 +01:00
long timestampMillis = System . currentTimeMillis ( ) ;
Timestamp timestamp = new Timestamp ( timestampMillis ) ; // Store it here, in order to have the same for all current records.
2021-11-09 22:59:27 +01:00
2022-01-30 21:14:52 +01:00
try {
2022-02-02 19:19:46 +01:00
jdbcTemplate . query ( getAssignmentsQuery , rs - > {
Assignment assignment = new Assignment ( ) ;
assignment . setWorkerId ( workerId ) ;
assignment . setTimestamp ( timestamp ) ;
Datasource datasource = new Datasource ( ) ;
try { // For each of the 4 columns returned. The indexing starts from 1
assignment . setId ( rs . getString ( 1 ) ) ;
assignment . setOriginalUrl ( rs . getString ( 2 ) ) ;
datasource . setId ( rs . getString ( 3 ) ) ;
datasource . setName ( rs . getString ( 4 ) ) ;
} catch ( SQLException sqle ) {
logger . error ( " No value was able to be retrieved from one of the columns of row_ " + rs . getRow ( ) , sqle ) ;
2021-11-09 22:59:27 +01:00
}
2022-02-02 19:19:46 +01:00
assignment . setDatasource ( datasource ) ;
assignments . add ( assignment ) ;
2022-01-30 21:14:52 +01:00
} ) ;
2021-11-09 22:59:27 +01:00
} catch ( Exception e ) {
2022-12-07 12:48:00 +01:00
errorMsg = ImpalaConnector . handleQueryException ( " getAssignmentsQuery " , getAssignmentsQuery , e ) ;
2022-02-07 12:57:09 +01:00
String tmpErrMsg = dropCurrentAssignmentTable ( ) ;
if ( tmpErrMsg ! = null )
errorMsg + = " \ n " + tmpErrMsg ;
2021-11-09 22:59:27 +01:00
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
}
2021-07-05 13:04:39 +02:00
2021-11-09 22:59:27 +01:00
int assignmentsSize = assignments . size ( ) ;
if ( assignmentsSize = = 0 ) {
2022-12-07 12:48:00 +01:00
errorMsg = " No results retrieved from the \" findAssignmentsQuery \" for worker with id: " + workerId + " . Will increase the \" maxAttempts \" to " + maxAttemptsPerRecordAtomic . incrementAndGet ( ) + " for the next requests. " ;
2021-11-09 22:59:27 +01:00
logger . error ( errorMsg ) ;
2022-02-07 12:57:09 +01:00
String tmpErrMsg = dropCurrentAssignmentTable ( ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
2022-12-05 15:44:00 +01:00
if ( tmpErrMsg ! = null ) {
errorMsg + = " \ n " + tmpErrMsg ; // The additional error-msg is already logged.
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
} else
return ResponseEntity . status ( HttpStatus . MULTI_STATUS ) . body ( new AssignmentsResponse ( ( long ) - 1 , null ) ) ;
2022-01-27 00:18:26 +01:00
} else if ( assignmentsSize < assignmentsLimit ) {
2022-02-07 12:57:09 +01:00
logger . warn ( " The retrieved results were fewer ( " + assignmentsSize + " ) than the \" assignmentsLimit \" ( " + assignmentsLimit + " ), for worker with id: " + workerId + " . Will increase the \" maxAttempts \" to " + maxAttemptsPerRecordAtomic . incrementAndGet ( ) + " for the next requests. " ) ;
2021-11-09 22:59:27 +01:00
}
2021-07-05 13:04:39 +02:00
2021-11-09 22:59:27 +01:00
logger . debug ( " Finished gathering " + assignmentsSize + " assignments for worker with id \" " + workerId + " \" . Going to insert them into the \" assignment \" table and then return them to the worker. " ) ;
2021-11-30 18:59:46 +01:00
// Write the Assignment details to the assignment-table.
2022-11-10 16:18:21 +01:00
String insertAssignmentsQuery = " insert into " + ImpalaConnector . databaseName + " .assignment \ n select pub_data.pubid, pub_data.url, ' " + workerId + " ', " + timestampMillis + " \ n "
+ " from ( \ n select pubid, url from " + ImpalaConnector . databaseName + " .current_assignment) as pub_data " ;
2021-11-09 22:59:27 +01:00
2021-11-30 18:59:46 +01:00
try {
2022-01-30 21:14:52 +01:00
jdbcTemplate . execute ( insertAssignmentsQuery ) ;
2022-02-07 12:57:09 +01:00
} catch ( Exception e ) {
2022-12-07 12:48:00 +01:00
errorMsg = ImpalaConnector . handleQueryException ( " insertAssignmentsQuery " , insertAssignmentsQuery , e ) ;
2022-02-07 12:57:09 +01:00
String tmpErrMsg = dropCurrentAssignmentTable ( ) ;
if ( tmpErrMsg ! = null )
errorMsg + = " \ n " + tmpErrMsg ;
2021-11-09 22:59:27 +01:00
ImpalaConnector . databaseLock . unlock ( ) ;
2021-11-30 18:59:46 +01:00
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
2021-11-09 22:59:27 +01:00
}
2021-03-16 14:25:15 +01:00
2022-12-07 12:48:00 +01:00
errorMsg = dropCurrentAssignmentTable ( ) ;
2022-02-02 19:19:46 +01:00
if ( errorMsg ! = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
2021-11-09 22:59:27 +01:00
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
2022-02-02 19:19:46 +01:00
}
2021-11-09 22:59:27 +01:00
logger . debug ( " Finished inserting " + assignmentsSize + " assignments into the \" assignment \" -table. Going to merge the parquet files for this table. " ) ;
2022-01-30 21:14:52 +01:00
String mergeErrorMsg = fileUtils . mergeParquetFiles ( " assignment " , " " , null ) ;
2021-11-09 22:59:27 +01:00
if ( mergeErrorMsg ! = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( mergeErrorMsg ) ;
}
2021-11-30 18:59:46 +01:00
ImpalaConnector . databaseLock . unlock ( ) ;
2021-11-09 22:59:27 +01:00
2021-12-06 19:18:30 +01:00
long curAssignmentsBatchCounter = assignmentsBatchCounter . incrementAndGet ( ) ;
logger . info ( " Sending batch-assignments_ " + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + " . " ) ;
return ResponseEntity . status ( HttpStatus . OK ) . body ( new AssignmentsResponse ( curAssignmentsBatchCounter , assignments ) ) ;
2021-03-16 14:25:15 +01:00
}
2022-02-02 19:19:46 +01:00
2022-12-15 13:04:22 +01:00
public static final ExecutorService insertsExecutor = Executors . newFixedThreadPool ( 6 ) ;
2022-02-04 13:48:22 +01:00
2021-06-22 04:38:48 +02:00
@PostMapping ( " addWorkerReport " )
2021-11-30 17:23:27 +01:00
public ResponseEntity < ? > addWorkerReport ( @RequestBody WorkerReport workerReport , HttpServletRequest request ) {
2021-06-22 04:38:48 +02:00
2021-11-09 22:59:27 +01:00
if ( workerReport = = null ) {
String errorMsg = " No \" WorkerReport \" was given! " ;
logger . error ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . BAD_REQUEST ) . body ( errorMsg ) ;
}
2021-12-10 20:47:58 +01:00
String curWorkerId = workerReport . getWorkerId ( ) ;
if ( curWorkerId = = null ) {
String errorMsg = " No \" workerId \" was included inside the \" WorkerReport \" ! " ;
logger . error ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . BAD_REQUEST ) . body ( errorMsg ) ;
}
// As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
if ( MALICIOUS_INPUT_STRING . matcher ( curWorkerId ) . matches ( ) ) {
String errorMsg = " Possibly malicious \" workerId \" received: " + curWorkerId ;
logger . error ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . FORBIDDEN ) . body ( errorMsg ) ;
}
2022-04-04 23:01:44 +02:00
int sizeOUrlReports = 0 ;
2021-11-09 22:59:27 +01:00
List < UrlReport > urlReports = workerReport . getUrlReports ( ) ;
2022-04-04 23:01:44 +02:00
if ( ( urlReports = = null ) | | ( ( sizeOUrlReports = urlReports . size ( ) ) = = 0 ) ) {
2021-12-10 20:47:58 +01:00
String errorMsg = " The given \" WorkerReport \" from worker with ID \" " + curWorkerId + " \" was empty (without any UrlReports)! " ;
2021-11-09 22:59:27 +01:00
logger . error ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . BAD_REQUEST ) . body ( errorMsg ) ;
}
2021-06-22 04:38:48 +02:00
2021-12-06 19:18:30 +01:00
long curReportAssignments = workerReport . getAssignmentRequestCounter ( ) ;
2022-04-04 23:01:44 +02:00
logger . info ( " Received the WorkerReport for batch-assignments_ " + curReportAssignments + " , from the worker with id: " + curWorkerId + " . It contains " + sizeOUrlReports + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database. " ) ;
2021-06-22 04:38:48 +02:00
2021-11-30 17:23:27 +01:00
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
2022-01-30 21:14:52 +01:00
FileUtils . UploadFullTextsResponse uploadFullTextsResponse = fileUtils . getAndUploadFullTexts ( urlReports , request , curReportAssignments , curWorkerId ) ;
2022-01-12 23:47:15 +01:00
if ( uploadFullTextsResponse = = FileUtils . UploadFullTextsResponse . databaseError ) {
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( " Problem with the Impala-database! " ) ;
}
else if ( uploadFullTextsResponse = = FileUtils . UploadFullTextsResponse . unsuccessful ) {
2022-04-04 23:01:44 +02:00
logger . error ( " Failed to get and/or upload the fullTexts for batch-assignments_ " + curReportAssignments ) ;
2022-09-28 21:34:33 +02:00
// The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available.
2022-02-08 14:02:13 +01:00
fileUtils . updateUrlReportsToHaveNoFullTextFiles ( urlReports , false ) ;
2022-09-28 21:34:33 +02:00
// We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store.
// We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
// The urls which give full-text (no matter if we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked-up to be checked again later.
2021-11-30 17:23:27 +01:00
}
2022-04-04 23:01:44 +02:00
else
logger . debug ( " Finished uploading the full-texts from batch-assignments_ " + curReportAssignments ) ;
2022-02-22 12:54:16 +01:00
2023-01-10 12:34:54 +01:00
String currentParquetPath = parquetFileUtils . parquetBaseLocalDirectoryPath + " assignments_ " + curReportAssignments + File . separator ;
2022-11-10 16:18:21 +01:00
java . nio . file . Path parquetDirPath = Paths . get ( currentParquetPath ) ;
if ( ! Files . isDirectory ( parquetDirPath ) ) {
try {
Files . createDirectories ( parquetDirPath ) ;
} catch ( Exception e ) {
logger . error ( " " , e ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( e . getMessage ( ) ) ;
}
}
2021-11-09 22:59:27 +01:00
2022-11-10 16:18:21 +01:00
logger . debug ( " Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_ " + curReportAssignments ) ;
2022-02-22 12:54:16 +01:00
2022-12-09 11:46:06 +01:00
List < Callable < ParquetReport > > callableTasks = parquetFileUtils . getTasksForCreatingAndUploadingParquetFiles ( urlReports , sizeOUrlReports , curReportAssignments , currentParquetPath , uploadFullTextsResponse ) ;
2021-11-09 22:59:27 +01:00
2022-12-09 11:46:06 +01:00
boolean hasAttemptParquetFileProblem = false ;
boolean hasPayloadParquetFileProblem = false ;
2022-02-04 13:48:22 +01:00
2022-11-10 16:18:21 +01:00
try { // Invoke all the tasks and wait for them to finish before moving to the next batch.
2022-12-09 11:46:06 +01:00
List < Future < ParquetReport > > futures = insertsExecutor . invokeAll ( callableTasks ) ;
SumParquetSuccess sumParquetSuccess = checkParquetFilesSuccess ( futures ) ;
ResponseEntity < ? > errorResponseEntity = sumParquetSuccess . getResponseEntity ( ) ;
if ( errorResponseEntity ! = null ) {
return errorResponseEntity ; // The related log is already shown.
}
hasAttemptParquetFileProblem = sumParquetSuccess . isAttemptParquetFileProblem ( ) ;
hasPayloadParquetFileProblem = sumParquetSuccess . isPayloadParquetFileProblem ( ) ;
if ( hasAttemptParquetFileProblem & & hasPayloadParquetFileProblem )
throw new RuntimeException ( " All of the parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database, for batch-assignments_ " + curReportAssignments ) ;
else {
if ( hasAttemptParquetFileProblem )
logger . error ( " All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \" attempt \" , for batch-assignments_ " + curReportAssignments ) ;
else if ( hasPayloadParquetFileProblem )
logger . error ( " The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \" payload \" , for batch-assignments_ " + curReportAssignments ) ;
else
logger . debug ( " Going to execute \" load \" -requests on the database, for the uploaded parquet-files. " ) ;
}
2022-02-17 15:27:40 +01:00
2022-11-10 16:18:21 +01:00
// Load all the parquet files of each type into its table.
ImpalaConnector . databaseLock . lock ( ) ;
2022-12-09 11:46:06 +01:00
if ( ! hasAttemptParquetFileProblem )
hasAttemptParquetFileProblem = ! parquetFileUtils . loadParquetDataIntoTable ( parquetFileUtils . parquetHDFSDirectoryPathAttempts , " attempt " ) ;
if ( ! hasPayloadParquetFileProblem )
hasPayloadParquetFileProblem = ! parquetFileUtils . loadParquetDataIntoTable ( parquetFileUtils . parquetHDFSDirectoryPathPayloads , " payload " ) ;
2022-11-10 16:18:21 +01:00
ImpalaConnector . databaseLock . unlock ( ) ;
2022-02-04 13:48:22 +01:00
2022-12-09 11:46:06 +01:00
if ( hasAttemptParquetFileProblem & & hasPayloadParquetFileProblem )
throw new RuntimeException ( " The data from the HDFS parquet sub-directories COULD NOT be loaded into the \" attempt \" and the \" payload \" tables, for batch-assignments_ " + curReportAssignments ) ;
else if ( hasAttemptParquetFileProblem | | hasPayloadParquetFileProblem )
logger . error ( " The data from the HDFS parquet sub-directories COULD NOT be loaded into the \" attempt \" or the \" payload \" table, for batch-assignments_ " + curReportAssignments ) ;
2022-11-10 16:18:21 +01:00
else
2022-12-09 11:46:06 +01:00
logger . debug ( " The data from the HDFS parquet sub-directories was loaded into the \" attempt \" and the \" payload \" tables, for batch-assignments_ " + curReportAssignments ) ;
2021-11-09 22:59:27 +01:00
2022-02-04 13:48:22 +01:00
} catch ( InterruptedException ie ) { // In this case, any unfinished tasks are cancelled.
logger . warn ( " The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie . getMessage ( ) ) ;
2022-11-10 16:18:21 +01:00
// This is a very rare case. At the moment, we just move on with table-merging.
2022-12-09 11:46:06 +01:00
} catch ( RuntimeException re ) {
String errorMsg = re . getMessage ( ) ;
ImpalaConnector . databaseLock . lock ( ) ;
String assignmentErrorMsg = deleteWorkerAssignments ( curWorkerId ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
if ( assignmentErrorMsg ! = null ) {
errorMsg + = " \ n " + assignmentErrorMsg ;
}
logger . error ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
2022-02-04 13:48:22 +01:00
} catch ( Exception e ) {
String errorMsg = " Unexpected error when inserting into the \" payload \" and \" attempt \" tables in parallel! " + e . getMessage ( ) ;
logger . error ( errorMsg , e ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
2022-11-10 16:18:21 +01:00
} finally {
2022-12-08 15:28:41 +01:00
logger . debug ( " Deleting directory: " + currentParquetPath ) ;
2022-12-15 17:29:51 +01:00
FileUtils . deleteDirectory ( new File ( currentParquetPath ) ) ;
2022-02-04 13:48:22 +01:00
}
2022-11-10 16:18:21 +01:00
logger . debug ( " Going to merge the parquet files for the tables which were altered. " ) ;
2022-12-08 15:28:41 +01:00
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
2021-11-09 22:59:27 +01:00
2022-11-10 16:18:21 +01:00
String mergeErrorMsg ;
ImpalaConnector . databaseLock . lock ( ) ;
2022-12-09 11:46:06 +01:00
if ( ! hasAttemptParquetFileProblem ) {
mergeErrorMsg = fileUtils . mergeParquetFiles ( " attempt " , " " , null ) ;
2022-11-10 16:18:21 +01:00
if ( mergeErrorMsg ! = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( mergeErrorMsg ) ;
}
2021-11-09 22:59:27 +01:00
}
2022-12-09 11:46:06 +01:00
if ( ! hasPayloadParquetFileProblem ) {
mergeErrorMsg = fileUtils . mergeParquetFiles ( " payload " , " " , null ) ;
2022-11-10 16:18:21 +01:00
if ( mergeErrorMsg ! = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( mergeErrorMsg ) ;
}
2021-12-10 20:47:58 +01:00
}
2022-12-09 11:46:06 +01:00
mergeErrorMsg = deleteWorkerAssignments ( curWorkerId ) ;
2021-11-09 22:59:27 +01:00
if ( mergeErrorMsg ! = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( mergeErrorMsg ) ;
}
2022-11-10 16:18:21 +01:00
2022-01-30 21:14:52 +01:00
ImpalaConnector . databaseLock . unlock ( ) ;
2022-02-04 13:48:22 +01:00
2022-01-19 00:37:47 +01:00
logger . debug ( " Finished merging the database tables. " ) ;
2022-09-28 21:34:33 +02:00
if ( uploadFullTextsResponse = = FileUtils . UploadFullTextsResponse . unsuccessful )
2022-11-10 16:18:21 +01:00
return ResponseEntity . status ( HttpStatus . MULTI_STATUS ) . body ( " The full-text files failed to be acquired from the worker! " ) ;
2022-09-28 21:34:33 +02:00
else
2022-11-10 16:18:21 +01:00
return ResponseEntity . status ( HttpStatus . OK ) . build ( ) ;
2021-11-09 22:59:27 +01:00
}
2022-02-02 19:19:46 +01:00
2022-12-09 11:46:06 +01:00
private String deleteWorkerAssignments ( String curWorkerId )
{
// This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
// We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
return fileUtils . mergeParquetFiles ( " assignment " , " WHERE workerid != " , curWorkerId ) ;
}
2022-12-07 12:48:00 +01:00
private String createAndInitializeCurrentAssignmentsTable ( String findAssignmentsQuery )
{
String createCurrentAssignmentsQuery = " create table " + ImpalaConnector . databaseName + " .current_assignment as \ n " + findAssignmentsQuery ;
String computeCurrentAssignmentsStatsQuery = " COMPUTE STATS " + ImpalaConnector . databaseName + " .current_assignment " ;
try {
jdbcTemplate . execute ( createCurrentAssignmentsQuery ) ;
} catch ( Exception e ) {
String errorMsg = ImpalaConnector . handleQueryException ( " createCurrentAssignmentsQuery " , createCurrentAssignmentsQuery , e ) ;
String tmpErrMsg = dropCurrentAssignmentTable ( ) ; // The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened).
if ( tmpErrMsg ! = null )
errorMsg + = " \ n " + tmpErrMsg ;
return errorMsg ;
}
try {
jdbcTemplate . execute ( computeCurrentAssignmentsStatsQuery ) ;
} catch ( Exception e ) {
String errorMsg = ImpalaConnector . handleQueryException ( " computeCurrentAssignmentsStatsQuery " , computeCurrentAssignmentsStatsQuery , e ) ;
String tmpErrMsg = dropCurrentAssignmentTable ( ) ;
if ( tmpErrMsg ! = null )
errorMsg + = " \ n " + tmpErrMsg ;
return errorMsg ;
}
return null ; // All good.
}
2022-02-14 11:36:00 +01:00
private String dropCurrentAssignmentTable ( ) {
2022-04-08 16:39:45 +02:00
String dropCurrentAssignmentsQuery = " DROP TABLE IF EXISTS " + ImpalaConnector . databaseName + " .current_assignment PURGE " ;
2022-02-14 11:36:00 +01:00
try {
jdbcTemplate . execute ( dropCurrentAssignmentsQuery ) ;
2022-04-04 23:01:44 +02:00
return null ; // All good. No error-message.
2022-02-14 11:36:00 +01:00
} catch ( Exception e ) {
2022-12-05 15:44:00 +01:00
return ImpalaConnector . handleQueryException ( " dropCurrentAssignmentsQuery " , dropCurrentAssignmentsQuery , e ) ; // The error is already logged inside.
2022-02-14 11:36:00 +01:00
}
}
2022-12-09 11:46:06 +01:00
private static SumParquetSuccess checkParquetFilesSuccess ( List < Future < ParquetReport > > futures )
{
int numOfAllAttemptParquetFileCreations = 0 ;
int numOfFailedAttemptParquetFileCreations = 0 ;
int numOfAllPayloadParquetFileCreations = 0 ;
int numOfFailedPayloadParquetFileCreations = 0 ;
for ( Future < ParquetReport > future : futures )
{
ParquetReport parquetReport = null ;
try {
parquetReport = future . get ( ) ;
boolean hasProblems = ( ! parquetReport . isSuccessful ( ) ) ;
ParquetReport . ParquetType parquetType = parquetReport . getParquetType ( ) ;
if ( parquetType . equals ( ParquetReport . ParquetType . attempt ) ) {
numOfAllAttemptParquetFileCreations + + ;
if ( hasProblems )
numOfFailedAttemptParquetFileCreations + + ;
} else if ( parquetType . equals ( ParquetReport . ParquetType . payload ) ) {
numOfAllPayloadParquetFileCreations + + ;
if ( hasProblems )
numOfFailedPayloadParquetFileCreations + + ;
} else {
String errMsg = " An invalid \" ParquetReport.ParquetType \" was found: " + parquetType ; // This should never happen, but anyway.
logger . error ( errMsg ) ;
return new SumParquetSuccess ( false , false , ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errMsg ) ) ;
}
} catch ( Exception e ) {
logger . error ( " " , e ) ;
// We do not know if the failed "future" refers to a "payload" or to a "attempt".
// So we cannot increase a specific counter. That's ok, the only drawback if that we may try to "load" the non-existent data and get an exception.
}
} // End-for
boolean hasAttemptParquetFileProblem = ( numOfFailedAttemptParquetFileCreations = = numOfAllAttemptParquetFileCreations ) ;
boolean hasPayloadParquetFileProblem = ( numOfFailedPayloadParquetFileCreations = = numOfAllPayloadParquetFileCreations ) ;
return new SumParquetSuccess ( hasAttemptParquetFileProblem , hasPayloadParquetFileProblem , null ) ;
}
2021-11-09 22:59:27 +01:00
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
2022-01-30 21:14:52 +01:00
private static PreparedStatement constructLargeInsertQuery ( Connection con , String baseInsertQuery , int dataSize , int numParamsPerRow ) throws RuntimeException {
2021-11-09 22:59:27 +01:00
StringBuilder sb = new StringBuilder ( baseInsertQuery . length ( ) + ( dataSize * 6 * numParamsPerRow ) ) ; // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
sb . append ( baseInsertQuery ) ;
for ( int i = 1 ; i < = dataSize ; + + i ) {
sb . append ( " ( " ) ;
for ( int j = 1 ; j < = numParamsPerRow ; + + j ) {
sb . append ( " ? " ) ;
if ( j < numParamsPerRow )
sb . append ( " , " ) ;
}
sb . append ( " ) " ) ;
if ( i < dataSize )
sb . append ( " , " ) ;
}
PreparedStatement preparedInsertStatement ;
try { // We use a "PreparedStatement" to do insertions, for security reasons.
preparedInsertStatement = con . prepareStatement ( sb . toString ( ) ) ;
} catch ( SQLException sqle ) {
String errorMsg = " Problem when creating the prepared statement for the insertQuery: \" " + baseInsertQuery + " \" ...! \ n " ;
logger . error ( errorMsg + sqle . getMessage ( ) ) ;
throw new RuntimeException ( errorMsg ) ;
}
return preparedInsertStatement ;
}
2021-03-16 14:25:15 +01:00
}