package eu.openaire.urls_controller.services;

import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import io.micrometer.core.annotation.Timed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.client.RestTemplate;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

@Service
public class UrlsServiceImpl implements UrlsService {

    private static final Logger logger = LoggerFactory.getLogger(UrlsServiceImpl.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private FileUtils fileUtils;

    @Autowired
    private ParquetFileUtils parquetFileUtils;

    @Value("${services.pdfaggregation.controller.workerReportsDirPath}")
    private String workerReportsDirPath;

    public static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);

    private final AtomicInteger maxAttemptsPerRecordAtomic;

    private static String excludedDatasourceIDsStringList = null;

    public static final ExecutorService insertsExecutor = Executors.newFixedThreadPool(6);
    // TODO - Unify this ExecutorService with the hash-matching executorService, since one will ALWAYS be called right after the other. So why keep two ExecutorServices around?

    public UrlsServiceImpl(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord, BulkImport bulkImport)
    {
        maxAttemptsPerRecordAtomic = new AtomicInteger(maxAttemptsPerRecord);

        HashMap<String, BulkImport.BulkImportSource> bulkImportSources = new HashMap<>(bulkImport.getBulkImportSources());
        // The "bulkImportSources" will not be null, as it is defined inside the "application.yml" file.
        // In case no bulkImport datasources are given, the "bulkImportSources" map will just be empty.
        if ( bulkImportSources.isEmpty() )
            return; // Because of this early return, the "excludedDatasourceIDsStringList"-code must be placed last in this constructor.

        if ( logger.isTraceEnabled() )
            logger.trace("BulkImportSources:\n" + bulkImportSources);

        List<String> excludedIDs = new ArrayList<>();
        for ( BulkImport.BulkImportSource source : bulkImportSources.values() ) {
            String datasourceID = source.getDatasourceID();
            if ( (datasourceID == null) || datasourceID.isEmpty() )
                throw new RuntimeException("One of the bulk-imported datasourceIDs was not found! | source: " + source);
            excludedIDs.add(datasourceID);
        }

        int exclusionListSize = excludedIDs.size(); // This list will not be empty.

        // Prepare the "excludedDatasourceIDsStringList" to be used inside the "findAssignmentsQuery". Create the following string-pattern:
        // ("ID_1", "ID_2", ...)
        final StringBuilder sb = new StringBuilder((exclusionListSize * 46) + (exclusionListSize - 1) + 2);
        sb.append("(");
        for ( int i = 0; i < exclusionListSize; ++i ) {
            sb.append("\"").append(excludedIDs.get(i)).append("\"");
            if ( i < (exclusionListSize - 1) )
                sb.append(", ");
        }
        sb.append(")");

        excludedDatasourceIDsStringList = sb.toString();
        logger.info("The following bulkImport-datasources will be excluded from crawling: " + excludedDatasourceIDsStringList);
    }
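
    /**
     * Builds and returns a batch of up to "assignmentsLimit" assignments for the given worker.
     * Summary (derived from the code below): a temporary "current_assignment" table is created from the "findAssignmentsQuery",
     * its rows are mapped to Assignment objects and also copied into the "assignment" table, that table's parquet files are merged,
     * and the batch is returned to the worker together with the incremented batch-counter.
     */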
    @Timed(value = "getAssignments.time", description = "Time taken to return the assignments.")
    public ResponseEntity<?> getAssignments(String workerId, int assignmentsLimit)
    {
        // Create the Assignments from the id-urls stored in the database, up to the <assignmentsLimit>.

        String findAssignmentsQuery =
                "select pubid, url, datasourceid, datasourcename\n" + // The datasourceName is currently not used. It may be used by the Worker in the future, to apply a datasource-specific aggregation plugin to fetch the full-texts quickly, instead of using the general crawling one.
                "from (select distinct pubid, url, datasourceid, datasourcename, attempt_count, pub_year\n" +
                "from (select p.id as pubid, p.year as pub_year, pu.url as url, d.id as datasourceid, d.name as datasourcename, attempts.counts as attempt_count\n" +
                "from " + ImpalaConnector.databaseName + ".publication p\n" +
                "join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
                "join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
                "left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts\n" +
                "on attempts.id=p.id\n" +
                "left outer join (\n" +
                "select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
                "union all\n" +
                "select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl\n" + // Here we access the payload-VIEW, which includes the three payload-tables.
                ") as existing\n" +
                "on existing.id=p.id and existing.original_url=pu.url\n" +
                "where d.allow_harvest=true and existing.id is null\n" + // For records not found in "existing", the "existing.id" will be null.
                ((excludedDatasourceIDsStringList != null) ? // If we have an exclusion-list, use it below.
                    ("and d.id not in " + excludedDatasourceIDsStringList + "\n") : "") +
                "and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + "\n" +
                "and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
                "and pu.url != '' and pu.url is not null\n" + // Some IDs have empty-string urls; there are no "null" urls, but keep the relevant check for future-proofing.
                "limit " + (assignmentsLimit * 10) + ")\n" +
                "as non_distinct_results\n" +
                "order by coalesce(attempt_count, 0), coalesce(pub_year, 0) desc, reverse(pubid), url\n" +
                "limit " + assignmentsLimit + ")\n" +
                "as findAssignmentsQuery";

        // The "order by" near the end makes sure the older attempted records will be re-attempted after a long time.
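        // Added summary of the query above: the innermost select gathers up to (assignmentsLimit * 10) candidate records which are not
        // already assigned, have no payload yet, come from harvestable (and non-bulkImport) datasources and have not exhausted their attempts;
        // the outer select then orders them (fewest attempts first, newest publications first) and keeps only <assignmentsLimit> of them.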

        //logger.trace("findAssignmentsQuery:\n" + findAssignmentsQuery); // DEBUG!

        final String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";

        List<Assignment> assignments = new ArrayList<>(assignmentsLimit);

        ImpalaConnector.databaseLock.lock();

        String errorMsg = createAndInitializeCurrentAssignmentsTable(findAssignmentsQuery);
        if ( errorMsg != null ) {
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        long timestampMillis = System.currentTimeMillis();
        Timestamp timestamp = new Timestamp(timestampMillis); // Store it here, in order to have the same timestamp for all current records.

        try {
            jdbcTemplate.query(getAssignmentsQuery, rs -> {
                Assignment assignment = new Assignment();
                assignment.setWorkerId(workerId);
                assignment.setTimestamp(timestamp);
                Datasource datasource = new Datasource();
                try { // For each of the 4 columns returned; the indexing starts from 1.
                    assignment.setId(rs.getString(1));
                    assignment.setOriginalUrl(rs.getString(2));
                    datasource.setId(rs.getString(3));
                    datasource.setName(rs.getString(4));
                } catch (SQLException sqle) {
                    logger.error("No value could be retrieved from one of the columns of row_" + rs.getRow(), sqle);
                }
                assignment.setDatasource(datasource);
                assignments.add(assignment);
            });
        } catch (EmptyResultDataAccessException erdae) {
            errorMsg = "No results were returned for \"getAssignmentsQuery\":\n" + getAssignmentsQuery;
            String tmpErrMsg = dropCurrentAssignmentTable();
            ImpalaConnector.databaseLock.unlock();
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            logger.warn(errorMsg);
            return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
        } catch (Exception e) {
            errorMsg = ImpalaConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e);
            String tmpErrMsg = dropCurrentAssignmentTable();
            ImpalaConnector.databaseLock.unlock();
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        int assignmentsSize = assignments.size();
        if ( assignmentsSize == 0 ) {
            errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.";
            logger.error(errorMsg);
            String tmpErrMsg = dropCurrentAssignmentTable();
            ImpalaConnector.databaseLock.unlock();
            if ( tmpErrMsg != null ) {
                errorMsg += "\n" + tmpErrMsg; // The additional error-msg is already logged.
                return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
            } else
                return ResponseEntity.status(HttpStatus.MULTI_STATUS).body(new AssignmentsResponse((long) -1, null));
        } else if ( assignmentsSize < assignmentsLimit )
            logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.");

        logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");

        // Write the Assignment details to the assignment-table.
        String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', " + timestampMillis + "\n"
                + "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data";

        try {
            jdbcTemplate.execute(insertAssignmentsQuery);
        } catch (Exception e) {
            errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e);
            String tmpErrMsg = dropCurrentAssignmentTable();
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        errorMsg = dropCurrentAssignmentTable();
        if ( errorMsg != null ) {
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
        String mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", "", null);
        if ( mergeErrorMsg != null ) {
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
        }

        ImpalaConnector.databaseLock.unlock();

        long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
        logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
        return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
    }
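
    /**
     * Processes a worker's report for one assignments-batch.
     * Summary (derived from the code below): the full-texts are fetched from the worker and uploaded to the S3-Object-Store,
     * the "attempt" and "payload_aggregated" records are written to parquet files which are uploaded to HDFS and loaded into the
     * database tables, the altered tables are compacted, the handled rows of the "assignment" table are removed and, finally,
     * the worker is notified about the outcome. Returns true only if every step succeeded.
     */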
    @Timed(value = "addWorkerReport.time", description = "Time taken to add the WorkerReport.")
    public Boolean addWorkerReport(String curWorkerId, long curReportAssignments, List<UrlReport> urlReports, int sizeOfUrlReports)
    {
        logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + assignmentsBatchCounter);

        // Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
        FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, curReportAssignments, curWorkerId);
        if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
            postReportResultToWorker(curWorkerId, curReportAssignments, "Problem with the Impala-database!");
            return false;
        }
        else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
            logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignments);
            // The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available.
            fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false);
            // We write only the payloads which are connected with retrieved full-texts, uploaded to the S3-Object-Store.
            // We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
            // The urls which give a full-text (no matter that we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked up to be checked again later.
        } else
            logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments);

        String localParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignments + File.separator;
        try {
            Files.createDirectories(Paths.get(localParquetPath)); // No-op if it already exists; it does not throw an "alreadyExistsException".
        } catch (Exception e) {
            String errorMsg = "Could not create the parquet-directory: " + localParquetPath;
            logger.error(errorMsg, e);
            postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg);
            return false;
        }

        logger.debug("Going to write the results in the parquet files, then upload them to HDFS and then load them into the database's tables. For batch-assignments_" + curReportAssignments);

        List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignments, localParquetPath, uploadFullTextsResponse);

        // Create HDFS subDirs for these assignments. Other background threads handling other assignments will not interfere with the loading of these parquetFiles into the DB tables.
        String endingMkDirAndParams = curReportAssignments + "/" + parquetFileUtils.mkDirsAndParams;
        if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingMkDirAndParams)
            || !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingMkDirAndParams) )
        {
            postReportResultToWorker(curWorkerId, curReportAssignments, "Error when creating the HDFS sub-directories for assignments_" + curReportAssignments);
            return false;
        }

        boolean hasAttemptParquetFileProblem = false;
        boolean hasPayloadParquetFileProblem = false;

        try { // Invoke all the tasks and wait for them to finish.
            List<Future<ParquetReport>> futures = insertsExecutor.invokeAll(callableTasks);

            SumParquetSuccess sumParquetSuccess = parquetFileUtils.checkParquetFilesSuccess(futures);
            ResponseEntity<?> errorResponseEntity = sumParquetSuccess.getResponseEntity();
            if ( errorResponseEntity != null ) { // The related log is already shown.
                postReportResultToWorker(curWorkerId, curReportAssignments, "Error when creating or uploading the parquet files!");
                return false;
            }
            hasAttemptParquetFileProblem = sumParquetSuccess.isAttemptParquetFileProblem();
            hasPayloadParquetFileProblem = sumParquetSuccess.isPayloadParquetFileProblem();

            if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
                throw new RuntimeException("All of the parquet files failed to be created or uploaded! Will avoid executing load-requests into the database, for batch-assignments_" + curReportAssignments);
            else {
                if ( hasAttemptParquetFileProblem )
                    logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid executing load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignments);
                else if ( hasPayloadParquetFileProblem )
                    logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid executing load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignments);
                else
                    logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files.");
            }

            // Load all the parquet files of each type into its table.
            ImpalaConnector.databaseLock.lock();

            if ( !hasAttemptParquetFileProblem )
                hasAttemptParquetFileProblem = !parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts + curReportAssignments + "/", "attempt");

            if ( !hasPayloadParquetFileProblem )
                hasPayloadParquetFileProblem = !parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignments + "/", "payload_aggregated");

            ImpalaConnector.databaseLock.unlock();

            if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
                throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignments);
            else if ( hasAttemptParquetFileProblem || hasPayloadParquetFileProblem )
                logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload_aggregated\" table, for batch-assignments_" + curReportAssignments);
            else
                logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignments);

        } catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled.
            logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());
            // This is a very rare case. At the moment, we just move on with table-merging.
        } catch (RuntimeException re) {
            ImpalaConnector.databaseLock.lock();
            String assignmentErrorMsg = deleteWorkerAssignments(curWorkerId);
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = re.getMessage();
            if ( assignmentErrorMsg != null )
                errorMsg += "\n" + assignmentErrorMsg;
            logger.error(errorMsg);
            postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg);
            return false;
        } catch (Exception e) {
            String errorMsg = "Unexpected error when inserting into the \"attempt\" and \"payload_aggregated\" tables in parallel! " + e.getMessage();
            logger.error(errorMsg, e);
            postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg);
            return false; // No table-merging is happening in this case.
        } finally {
            logger.debug("Deleting parquet directory: " + localParquetPath);
            fileUtils.deleteDirectory(new File(localParquetPath));

            // Delete the HDFS subDirs for this Report.
            String endingRmDirAndParams = curReportAssignments + "/" + parquetFileUtils.rmDirsAndParams;
            if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingRmDirAndParams)
                || !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingRmDirAndParams) )
            {
                logger.error("Error when deleting the HDFS sub-directories for assignments_" + curReportAssignments); // A directory-specific log has already appeared.
                // The failure to delete the assignments-subDirs is not much of a problem and should not stop the whole process; everything proceeds as planned (the worker deletes any remaining files).
                // The worst case is that a few subDirs will be left behind in HDFS, though without their parquetFiles, since those have already been moved into the DB tables.
            }
        }

        logger.debug("Going to merge the parquet files for the tables which were altered.");
        // When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.

        String mergeErrorMsg;

        ImpalaConnector.databaseLock.lock();

        if ( !hasAttemptParquetFileProblem ) {
            mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
            if ( mergeErrorMsg != null ) {
                ImpalaConnector.databaseLock.unlock();
                postReportResultToWorker(curWorkerId, curReportAssignments, mergeErrorMsg);
                return false;
            }
        }

        if ( !hasPayloadParquetFileProblem ) {
            mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null);
            if ( mergeErrorMsg != null ) {
                ImpalaConnector.databaseLock.unlock();
                postReportResultToWorker(curWorkerId, curReportAssignments, mergeErrorMsg);
                return false;
            }
        }

        mergeErrorMsg = deleteWorkerAssignments(curWorkerId);
        if ( mergeErrorMsg != null ) {
            ImpalaConnector.databaseLock.unlock();
            postReportResultToWorker(curWorkerId, curReportAssignments, mergeErrorMsg);
            return false;
        }

        ImpalaConnector.databaseLock.unlock();
        logger.debug("Finished merging the database tables.");

        if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
            postReportResultToWorker(curWorkerId, curReportAssignments, "The full-text files failed to be acquired from the worker!");
            return false;
        }

        // Notify the Worker that the processing of this report was successful, so that the Worker can delete the files.
        postReportResultToWorker(curWorkerId, curReportAssignments, null);
        return true;
    }
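
    /**
     * Creates the temporary "current_assignment" table from the given "findAssignmentsQuery" and computes its table-statistics.
     * Returns null on success, or an error-message otherwise (summary derived from the code below).
     */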
    private String createAndInitializeCurrentAssignmentsTable(String findAssignmentsQuery)
    {
        final String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
        final String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";

        try {
            jdbcTemplate.execute(createCurrentAssignmentsQuery);
        } catch (Exception e) {
            String errorMsg = ImpalaConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e);
            String tmpErrMsg = dropCurrentAssignmentTable(); // The table may be partially created, e.g. in case of an "out of memory" error in the database-server during the creation, resulting in an empty table (yes, it has happened).
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            return errorMsg;
        }

        try {
            jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
        } catch (Exception e) {
            String errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e);
            String tmpErrMsg = dropCurrentAssignmentTable();
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            return errorMsg;
        }

        return null; // All good.
    }
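
    /**
     * Drops the temporary "current_assignment" table.
     * Added note: the "PURGE" clause should make Impala remove the table's data files immediately, instead of moving them to the HDFS trashcan.
     */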
    private String dropCurrentAssignmentTable() {
        String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE";
        try {
            jdbcTemplate.execute(dropCurrentAssignmentsQuery);
            return null; // All good. No error-message.
        } catch (Exception e) {
            return ImpalaConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e); // The error is already logged inside.
        }
    }

    private String deleteWorkerAssignments(String curWorkerId)
    {
        // This will delete the rows of the "assignment" table which refer to the "curWorkerId". Since we have non-KUDU Impala tables, the delete operation can only succeed through a "merge" operation of the rest of the data.
        // Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp-table becomes the new "assignment" table.
        // We don't need to keep the assignment-info anymore; the "findAssignmentsQuery" checks the "payload_aggregated" table for previously handled tasks.
        return fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
    }

    private static final RestTemplate restTemplate = new RestTemplate();
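
    /**
     * Posts the report-processing result to the worker's "addReportResultToWorker" endpoint; a null "errorMsg" signals success.
     * On success, the locally stored worker-report json file is deleted as well (summary derived from the code below).
     */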
    private boolean postReportResultToWorker(String workerId, long assignmentRequestCounter, String errorMsg)
    {
        // Get the IP of this worker.
        WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
        if ( workerInfo == null ) {
            logger.error("Could not find any info for worker with id: \"" + workerId + "\".");
            return false;
        }

        String url = "http://" + workerInfo.getWorkerIP() + ":1881/api/addReportResultToWorker/" + assignmentRequestCounter; // This workerIP will not be null.

        if ( logger.isTraceEnabled() )
            logger.trace("Going to \"postReportResultToWorker\": \"" + workerId + "\", for assignments_" + assignmentRequestCounter + ((errorMsg != null) ? "\nError: " + errorMsg : ""));

        try {
            ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, errorMsg, String.class); // We may pass a "null" entity.
            int responseCode = responseEntity.getStatusCodeValue();
            if ( responseCode != HttpStatus.OK.value() ) {
                logger.error("HTTP-Connection problem with the submission of the \"postReportResultToWorker\" of worker \"" + workerId + "\" and assignments_" + assignmentRequestCounter + "! Error-code was: " + responseCode);
                return false;
            } else {
                fileUtils.deleteFile(workerReportsDirPath + "/" + workerId + "/" + workerId + "_assignments_" + assignmentRequestCounter + "_report.json");
                return true;
            }
        } catch (HttpServerErrorException hsee) {
            logger.error("The Worker \"" + workerId + "\" failed to handle the \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage());
            return false;
        } catch (Exception e) {
            errorMsg = "Error for \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + " to the Worker: " + workerId;
            Throwable cause = e.getCause();
            String exMsg;
            if ( (cause != null) && ((exMsg = cause.getMessage()) != null) && exMsg.contains("Connection refused") ) {
                logger.error(errorMsg + " | The worker has probably crashed, since we received a \"Connection refused\"!");
                workerInfo.setHasShutdown(true); // Avoid sending possible shutdown-requests later on. Also, a specific message will be shown if this Worker requests new assignments in the future.
            } else
                logger.error(errorMsg, e);
            return false;
        }
    }

    // The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
    // Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested again.
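    // For example (illustrative values): for dataSize = 2 and numParamsPerRow = 3, the values-clause appended to the "baseInsertQuery" would be: (?, ?, ?), (?, ?, ?)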
    private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException {
        StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - If this is ever used, make it a global Thread-Local var and then "clear" (reset) it after each use.
        sb.append(baseInsertQuery);
        for ( int i = 1; i <= dataSize; ++i ) {
            sb.append("(");
            for ( int j = 1; j <= numParamsPerRow; ++j ) {
                sb.append("?");
                if ( j < numParamsPerRow )
                    sb.append(", ");
            }
            sb.append(")");
            if ( i < dataSize )
                sb.append(", ");
        }

        PreparedStatement preparedInsertStatement;
        try { // We use a "PreparedStatement" to do insertions, for security reasons.
            preparedInsertStatement = con.prepareStatement(sb.toString());
        } catch (SQLException sqle) {
            String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n";
            logger.error(errorMsg + sqle.getMessage());
            throw new RuntimeException(errorMsg);
        }
        return preparedInsertStatement;
    }
}