package eu.openaire.urls_controller.services ;
import eu.openaire.urls_controller.components.BulkImport ;
import eu.openaire.urls_controller.configuration.ImpalaConnector ;
import eu.openaire.urls_controller.models.* ;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse ;
import eu.openaire.urls_controller.util.FileUtils ;
import eu.openaire.urls_controller.util.ParquetFileUtils ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.beans.factory.annotation.Value ;
import org.springframework.http.HttpStatus ;
import org.springframework.http.ResponseEntity ;
import org.springframework.jdbc.core.JdbcTemplate ;
import org.springframework.stereotype.Service ;
import javax.servlet.http.HttpServletRequest ;
import java.io.File ;
import java.nio.file.Files ;
import java.nio.file.Paths ;
import java.sql.Connection ;
import java.sql.PreparedStatement ;
import java.sql.SQLException ;
import java.sql.Timestamp ;
import java.util.ArrayList ;
import java.util.HashMap ;
import java.util.List ;
import java.util.concurrent.Callable ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Executors ;
import java.util.concurrent.Future ;
import java.util.concurrent.atomic.AtomicInteger ;
import java.util.concurrent.atomic.AtomicLong ;
@Service
public class UrlsServiceImpl implements UrlsService {
private static final Logger logger = LoggerFactory . getLogger ( UrlsServiceImpl . class ) ;
@Autowired
private JdbcTemplate jdbcTemplate ;
@Autowired
private FileUtils fileUtils ;
@Autowired
private ParquetFileUtils parquetFileUtils ;
public static final AtomicLong assignmentsBatchCounter = new AtomicLong ( 0 ) ;
private final AtomicInteger maxAttemptsPerRecordAtomic ;
2023-03-21 06:19:35 +01:00
private static String excludedDatasourceIDsStringList = null ;
2023-02-21 14:36:35 +01:00
public static final ExecutorService insertsExecutor = Executors . newFixedThreadPool ( 6 ) ;
2023-05-11 02:10:53 +02:00
// TODO - Unify this ExecutorService with the hash-matching executorService. Since one will ALWAYS be called after the other. So why having two ExecServices to handle?
2023-02-21 14:36:35 +01:00
/**
 * Initializes the "maxAttemptsPerRecord" counter and prepares the list of the datasourceIDs which are excluded from crawling.
 * The excluded datasources are the ones defined as bulkImport-sources, since their records are imported in bulk.
 *
 * @param maxAttemptsPerRecord the initial max number of attempts per record, as given by the configuration-property
 * @param bulkImport the component holding the configured bulkImport-sources
 * @throws RuntimeException if one of the bulkImport-sources has a null or empty datasourceID
 */
public UrlsServiceImpl(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord, BulkImport bulkImport)
{
    maxAttemptsPerRecordAtomic = new AtomicInteger(maxAttemptsPerRecord);

    HashMap<String, BulkImport.BulkImportSource> bulkImportSources = new HashMap<>(bulkImport.getBulkImportSources());
    // The "bulkImportSources" will not be null, as it will be defined inside the "application.yml" file.
    // In case no bulkImport Datasources are given, then the "bulkImportSources" list will just be empty.
    if (bulkImportSources.isEmpty()) {
        return; // So the "excludedDatasourceIDsStringList"-code should be placed last in this Constructor-method.
    }

    logger.trace("BulkImportSources:\n" + bulkImportSources);

    List<String> excludedIDs = new ArrayList<>(bulkImportSources.size());
    for (BulkImport.BulkImportSource source : bulkImportSources.values()) {
        String datasourceID = source.getDatasourceID();
        if ((datasourceID == null) || datasourceID.isEmpty()) {
            throw new RuntimeException("One of the bulk-imported datasourceIDs was not found! | source: " + source);
        }
        excludedIDs.add(datasourceID);
    }

    // Prepare the "excludedDatasourceIDsStringList" to be used inside the "findAssignmentsQuery". Create the following string-pattern:
    // ("ID_1", "ID_2", ...)
    // The list is not empty at this point, so the joined result always contains at least one quoted ID.
    excludedDatasourceIDsStringList = "(\"" + String.join("\", \"", excludedIDs) + "\")";

    logger.info("The following bulkImport-datasources will be excluded from crawling: " + excludedDatasourceIDsStringList);
}
2023-03-07 15:55:41 +01:00
public ResponseEntity < ? > getAssignments ( String workerId , int assignmentsLimit )
2023-02-21 14:36:35 +01:00
{
// Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
2023-03-21 06:19:35 +01:00
String findAssignmentsQuery =
2023-04-26 18:33:28 +02:00
" select pubid, url, datasourceid, datasourcename \ n " + // The datsourceName is currently not used. It may be used by the Worker, in the future to apply a datasource-specific aggregation plugin to take the full-texts quickly, instead of using the general crawling one.
2023-04-10 14:00:23 +02:00
" from (select distinct pubid, url, datasourceid, datasourcename, attempt_count, pub_year \ n " +
" from (select p.id as pubid, p.year as pub_year, pu.url as url, d.id as datasourceid, d.name as datasourcename, attempts.counts as attempt_count \ n " +
2023-03-21 06:19:35 +01:00
" from " + ImpalaConnector . databaseName + " .publication p \ n " +
" join " + ImpalaConnector . databaseName + " .publication_urls pu on pu.id=p.id \ n " +
" join " + ImpalaConnector . databaseName + " .datasource d on d.id=p.datasourceid \ n " +
" left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector . databaseName + " .attempt a group by a.id) as attempts \ n " +
" on attempts.id=p.id \ n " +
2023-04-26 18:33:28 +02:00
" left outer join ( \ n " +
" select a.id, a.original_url from " + ImpalaConnector . databaseName + " .assignment a \ n " +
" union all \ n " +
" select pl.id, pl.original_url from " + ImpalaConnector . databaseName + " .payload pl \ n " +
" ) as existing \ n " + // Here we access the payload-VIEW which includes the three payload-tables.
2023-03-21 06:19:35 +01:00
" on existing.id=p.id and existing.original_url=pu.url \ n " +
2023-04-26 18:33:28 +02:00
" where d.allow_harvest=true and existing.id is null \ n " +
2023-03-21 06:19:35 +01:00
( ( excludedDatasourceIDsStringList ! = null ) ? // If we have an exclusion-list, use it below.
2023-04-26 18:33:28 +02:00
( " and d.id not in " + excludedDatasourceIDsStringList + " \ n " ) : " " ) +
" and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic . get ( ) + " \ n " +
" and not exists (select 1 from " + ImpalaConnector . databaseName + " .attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1) \ n " +
" and pu.url != '' and pu.url is not null \ n " + // Some IDs have empty-string urls, there are no "null" urls, but keep the relevant check for future-proofing.
" limit " + ( assignmentsLimit * 10 ) + " ) \ n " +
2023-03-21 06:19:35 +01:00
" as non_distinct_results \ n " +
2023-04-10 14:00:23 +02:00
" order by coalesce(attempt_count, 0), coalesce(pub_year, 0) desc, reverse(pubid), url \ n " +
2023-03-21 06:19:35 +01:00
" limit " + assignmentsLimit + " ) \ n " +
" as findAssignmentsQuery " ;
2023-02-21 14:36:35 +01:00
2023-04-10 14:00:23 +02:00
2023-02-21 14:36:35 +01:00
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
2023-05-11 02:10:53 +02:00
logger . trace ( " findAssignmentsQuery: \ n " + findAssignmentsQuery ) ; // DEBUG!
2023-02-21 14:36:35 +01:00
2023-04-26 18:33:28 +02:00
final String getAssignmentsQuery = " select * from " + ImpalaConnector . databaseName + " .current_assignment " ;
2023-02-21 14:36:35 +01:00
List < Assignment > assignments = new ArrayList < > ( assignmentsLimit ) ;
ImpalaConnector . databaseLock . lock ( ) ;
String errorMsg = createAndInitializeCurrentAssignmentsTable ( findAssignmentsQuery ) ;
if ( errorMsg ! = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
}
long timestampMillis = System . currentTimeMillis ( ) ;
Timestamp timestamp = new Timestamp ( timestampMillis ) ; // Store it here, in order to have the same for all current records.
try {
jdbcTemplate . query ( getAssignmentsQuery , rs - > {
Assignment assignment = new Assignment ( ) ;
assignment . setWorkerId ( workerId ) ;
assignment . setTimestamp ( timestamp ) ;
Datasource datasource = new Datasource ( ) ;
try { // For each of the 4 columns returned. The indexing starts from 1
assignment . setId ( rs . getString ( 1 ) ) ;
assignment . setOriginalUrl ( rs . getString ( 2 ) ) ;
datasource . setId ( rs . getString ( 3 ) ) ;
datasource . setName ( rs . getString ( 4 ) ) ;
} catch ( SQLException sqle ) {
logger . error ( " No value was able to be retrieved from one of the columns of row_ " + rs . getRow ( ) , sqle ) ;
}
assignment . setDatasource ( datasource ) ;
assignments . add ( assignment ) ;
} ) ;
} catch ( Exception e ) {
errorMsg = ImpalaConnector . handleQueryException ( " getAssignmentsQuery " , getAssignmentsQuery , e ) ;
String tmpErrMsg = dropCurrentAssignmentTable ( ) ;
if ( tmpErrMsg ! = null )
errorMsg + = " \ n " + tmpErrMsg ;
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
}
int assignmentsSize = assignments . size ( ) ;
if ( assignmentsSize = = 0 ) {
errorMsg = " No results retrieved from the \" findAssignmentsQuery \" for worker with id: " + workerId + " . Will increase the \" maxAttempts \" to " + maxAttemptsPerRecordAtomic . incrementAndGet ( ) + " for the next requests. " ;
logger . error ( errorMsg ) ;
String tmpErrMsg = dropCurrentAssignmentTable ( ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
if ( tmpErrMsg ! = null ) {
errorMsg + = " \ n " + tmpErrMsg ; // The additional error-msg is already logged.
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
} else
return ResponseEntity . status ( HttpStatus . MULTI_STATUS ) . body ( new AssignmentsResponse ( ( long ) - 1 , null ) ) ;
2023-04-26 18:33:28 +02:00
} else if ( assignmentsSize < assignmentsLimit )
2023-02-21 14:36:35 +01:00
logger . warn ( " The retrieved results were fewer ( " + assignmentsSize + " ) than the \" assignmentsLimit \" ( " + assignmentsLimit + " ), for worker with id: " + workerId + " . Will increase the \" maxAttempts \" to " + maxAttemptsPerRecordAtomic . incrementAndGet ( ) + " for the next requests. " ) ;
logger . debug ( " Finished gathering " + assignmentsSize + " assignments for worker with id \" " + workerId + " \" . Going to insert them into the \" assignment \" table and then return them to the worker. " ) ;
// Write the Assignment details to the assignment-table.
String insertAssignmentsQuery = " insert into " + ImpalaConnector . databaseName + " .assignment \ n select pub_data.pubid, pub_data.url, ' " + workerId + " ', " + timestampMillis + " \ n "
+ " from ( \ n select pubid, url from " + ImpalaConnector . databaseName + " .current_assignment) as pub_data " ;
try {
jdbcTemplate . execute ( insertAssignmentsQuery ) ;
} catch ( Exception e ) {
errorMsg = ImpalaConnector . handleQueryException ( " insertAssignmentsQuery " , insertAssignmentsQuery , e ) ;
String tmpErrMsg = dropCurrentAssignmentTable ( ) ;
if ( tmpErrMsg ! = null )
errorMsg + = " \ n " + tmpErrMsg ;
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
}
errorMsg = dropCurrentAssignmentTable ( ) ;
if ( errorMsg ! = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
}
logger . debug ( " Finished inserting " + assignmentsSize + " assignments into the \" assignment \" -table. Going to merge the parquet files for this table. " ) ;
String mergeErrorMsg = fileUtils . mergeParquetFiles ( " assignment " , " " , null ) ;
if ( mergeErrorMsg ! = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( mergeErrorMsg ) ;
}
ImpalaConnector . databaseLock . unlock ( ) ;
long curAssignmentsBatchCounter = assignmentsBatchCounter . incrementAndGet ( ) ;
logger . info ( " Sending batch-assignments_ " + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + " . " ) ;
return ResponseEntity . status ( HttpStatus . OK ) . body ( new AssignmentsResponse ( curAssignmentsBatchCounter , assignments ) ) ;
}
public ResponseEntity < ? > addWorkerReport ( String curWorkerId , long curReportAssignments , List < UrlReport > urlReports , int sizeOfUrlReports , HttpServletRequest request )
{
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
FileUtils . UploadFullTextsResponse uploadFullTextsResponse = fileUtils . getAndUploadFullTexts ( urlReports , request , curReportAssignments , curWorkerId ) ;
if ( uploadFullTextsResponse = = FileUtils . UploadFullTextsResponse . databaseError ) {
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( " Problem with the Impala-database! " ) ;
}
else if ( uploadFullTextsResponse = = FileUtils . UploadFullTextsResponse . unsuccessful ) {
logger . error ( " Failed to get and/or upload the fullTexts for batch-assignments_ " + curReportAssignments ) ;
// The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available.
fileUtils . updateUrlReportsToHaveNoFullTextFiles ( urlReports , false ) ;
// We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store.
// We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
// The urls which give full-text (no matter if we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked-up to be checked again later.
2023-04-28 13:58:33 +02:00
} else
2023-02-21 14:36:35 +01:00
logger . debug ( " Finished uploading the full-texts from batch-assignments_ " + curReportAssignments ) ;
String currentParquetPath = parquetFileUtils . parquetBaseLocalDirectoryPath + " assignments_ " + curReportAssignments + File . separator ;
2023-04-28 13:58:33 +02:00
try {
Files . createDirectories ( Paths . get ( currentParquetPath ) ) ; // No-op if it already exists. It does not throw a "alreadyExistsException"
} catch ( Exception e ) {
String errorMsg = " Could not create the parquet-directory: " + currentParquetPath ;
logger . error ( errorMsg , e ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
2023-02-21 14:36:35 +01:00
}
logger . debug ( " Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_ " + curReportAssignments ) ;
List < Callable < ParquetReport > > callableTasks = parquetFileUtils . getTasksForCreatingAndUploadingParquetFiles ( urlReports , sizeOfUrlReports , curReportAssignments , currentParquetPath , uploadFullTextsResponse ) ;
boolean hasAttemptParquetFileProblem = false ;
boolean hasPayloadParquetFileProblem = false ;
2023-04-10 21:28:53 +02:00
try { // Invoke all the tasks and wait for them to finish.
2023-02-21 14:36:35 +01:00
List < Future < ParquetReport > > futures = insertsExecutor . invokeAll ( callableTasks ) ;
SumParquetSuccess sumParquetSuccess = parquetFileUtils . checkParquetFilesSuccess ( futures ) ;
ResponseEntity < ? > errorResponseEntity = sumParquetSuccess . getResponseEntity ( ) ;
if ( errorResponseEntity ! = null ) {
return errorResponseEntity ; // The related log is already shown.
}
hasAttemptParquetFileProblem = sumParquetSuccess . isAttemptParquetFileProblem ( ) ;
hasPayloadParquetFileProblem = sumParquetSuccess . isPayloadParquetFileProblem ( ) ;
if ( hasAttemptParquetFileProblem & & hasPayloadParquetFileProblem )
throw new RuntimeException ( " All of the parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database, for batch-assignments_ " + curReportAssignments ) ;
else {
if ( hasAttemptParquetFileProblem )
logger . error ( " All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \" attempt \" , for batch-assignments_ " + curReportAssignments ) ;
else if ( hasPayloadParquetFileProblem )
2023-04-10 14:55:50 +02:00
logger . error ( " The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \" payload_aggregated \" , for batch-assignments_ " + curReportAssignments ) ;
2023-02-21 14:36:35 +01:00
else
logger . debug ( " Going to execute \" load \" -requests on the database, for the uploaded parquet-files. " ) ;
}
// Load all the parquet files of each type into its table.
ImpalaConnector . databaseLock . lock ( ) ;
if ( ! hasAttemptParquetFileProblem )
hasAttemptParquetFileProblem = ! parquetFileUtils . loadParquetDataIntoTable ( parquetFileUtils . parquetHDFSDirectoryPathAttempts , " attempt " ) ;
if ( ! hasPayloadParquetFileProblem )
2023-04-10 14:55:50 +02:00
hasPayloadParquetFileProblem = ! parquetFileUtils . loadParquetDataIntoTable ( parquetFileUtils . parquetHDFSDirectoryPathPayloadsAggregated , " payload_aggregated " ) ;
2023-02-21 14:36:35 +01:00
ImpalaConnector . databaseLock . unlock ( ) ;
if ( hasAttemptParquetFileProblem & & hasPayloadParquetFileProblem )
2023-04-10 14:55:50 +02:00
throw new RuntimeException ( " The data from the HDFS parquet sub-directories COULD NOT be loaded into the \" attempt \" and the \" payload_aggregated \" tables, for batch-assignments_ " + curReportAssignments ) ;
2023-02-21 14:36:35 +01:00
else if ( hasAttemptParquetFileProblem | | hasPayloadParquetFileProblem )
2023-04-10 14:55:50 +02:00
logger . error ( " The data from the HDFS parquet sub-directories COULD NOT be loaded into the \" attempt \" or the \" payload_aggregated \" table, for batch-assignments_ " + curReportAssignments ) ;
2023-02-21 14:36:35 +01:00
else
2023-04-10 14:55:50 +02:00
logger . debug ( " The data from the HDFS parquet sub-directories was loaded into the \" attempt \" and the \" payload_aggregated \" tables, for batch-assignments_ " + curReportAssignments ) ;
2023-02-21 14:36:35 +01:00
} catch ( InterruptedException ie ) { // In this case, any unfinished tasks are cancelled.
logger . warn ( " The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie . getMessage ( ) ) ;
// This is a very rare case. At the moment, we just move on with table-merging.
} catch ( RuntimeException re ) {
String errorMsg = re . getMessage ( ) ;
ImpalaConnector . databaseLock . lock ( ) ;
String assignmentErrorMsg = deleteWorkerAssignments ( curWorkerId ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
if ( assignmentErrorMsg ! = null ) {
errorMsg + = " \ n " + assignmentErrorMsg ;
}
logger . error ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
} catch ( Exception e ) {
2023-04-10 14:55:50 +02:00
String errorMsg = " Unexpected error when inserting into the \" attempt \" and \" payload_aggregated \" tables in parallel! " + e . getMessage ( ) ;
2023-02-21 14:36:35 +01:00
logger . error ( errorMsg , e ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
} finally {
logger . debug ( " Deleting parquet directory: " + currentParquetPath ) ;
2023-05-11 02:07:55 +02:00
fileUtils . deleteDirectory ( new File ( currentParquetPath ) ) ;
2023-02-21 14:36:35 +01:00
}
logger . debug ( " Going to merge the parquet files for the tables which were altered. " ) ;
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
String mergeErrorMsg ;
ImpalaConnector . databaseLock . lock ( ) ;
if ( ! hasAttemptParquetFileProblem ) {
mergeErrorMsg = fileUtils . mergeParquetFiles ( " attempt " , " " , null ) ;
if ( mergeErrorMsg ! = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( mergeErrorMsg ) ;
}
}
if ( ! hasPayloadParquetFileProblem ) {
2023-04-10 14:55:50 +02:00
mergeErrorMsg = fileUtils . mergeParquetFiles ( " payload_aggregated " , " " , null ) ;
2023-02-21 14:36:35 +01:00
if ( mergeErrorMsg ! = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( mergeErrorMsg ) ;
}
}
mergeErrorMsg = deleteWorkerAssignments ( curWorkerId ) ;
if ( mergeErrorMsg ! = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( mergeErrorMsg ) ;
}
ImpalaConnector . databaseLock . unlock ( ) ;
logger . debug ( " Finished merging the database tables. " ) ;
if ( uploadFullTextsResponse = = FileUtils . UploadFullTextsResponse . unsuccessful )
return ResponseEntity . status ( HttpStatus . MULTI_STATUS ) . body ( " The full-text files failed to be acquired from the worker! " ) ;
else
return ResponseEntity . status ( HttpStatus . OK ) . build ( ) ;
}
private String createAndInitializeCurrentAssignmentsTable ( String findAssignmentsQuery )
{
String createCurrentAssignmentsQuery = " create table " + ImpalaConnector . databaseName + " .current_assignment as \ n " + findAssignmentsQuery ;
String computeCurrentAssignmentsStatsQuery = " COMPUTE STATS " + ImpalaConnector . databaseName + " .current_assignment " ;
try {
jdbcTemplate . execute ( createCurrentAssignmentsQuery ) ;
} catch ( Exception e ) {
String errorMsg = ImpalaConnector . handleQueryException ( " createCurrentAssignmentsQuery " , createCurrentAssignmentsQuery , e ) ;
String tmpErrMsg = dropCurrentAssignmentTable ( ) ; // The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened).
if ( tmpErrMsg ! = null )
errorMsg + = " \ n " + tmpErrMsg ;
return errorMsg ;
}
try {
jdbcTemplate . execute ( computeCurrentAssignmentsStatsQuery ) ;
} catch ( Exception e ) {
String errorMsg = ImpalaConnector . handleQueryException ( " computeCurrentAssignmentsStatsQuery " , computeCurrentAssignmentsStatsQuery , e ) ;
String tmpErrMsg = dropCurrentAssignmentTable ( ) ;
if ( tmpErrMsg ! = null )
errorMsg + = " \ n " + tmpErrMsg ;
return errorMsg ;
}
return null ; // All good.
}
/**
 * Drops the "current_assignment" table, if it exists.
 *
 * @return null on success, otherwise the error-message returned by "ImpalaConnector.handleQueryException"
 */
private String dropCurrentAssignmentTable() {
    // The database-name and the table-name form one qualified name, so there must be no whitespace around the dot.
    String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE";
    try {
        jdbcTemplate.execute(dropCurrentAssignmentsQuery);
    } catch (Exception e) {
        return ImpalaConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e); // The error is already logged inside.
    }
    return null; // All good. No error-message.
}
private String deleteWorkerAssignments ( String curWorkerId )
{
// This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
2023-04-10 14:55:50 +02:00
// We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the "payload_aggregated" table for previously handled tasks.
2023-02-21 14:36:35 +01:00
return fileUtils . mergeParquetFiles ( " assignment " , " WHERE workerid != " , curWorkerId ) ;
}
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery ( Connection con , String baseInsertQuery , int dataSize , int numParamsPerRow ) throws RuntimeException {
StringBuilder sb = new StringBuilder ( baseInsertQuery . length ( ) + ( dataSize * 6 * numParamsPerRow ) ) ; // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
sb . append ( baseInsertQuery ) ;
for ( int i = 1 ; i < = dataSize ; + + i ) {
sb . append ( " ( " ) ;
for ( int j = 1 ; j < = numParamsPerRow ; + + j ) {
sb . append ( " ? " ) ;
if ( j < numParamsPerRow )
sb . append ( " , " ) ;
}
sb . append ( " ) " ) ;
if ( i < dataSize )
sb . append ( " , " ) ;
}
PreparedStatement preparedInsertStatement ;
try { // We use a "PreparedStatement" to do insertions, for security reasons.
preparedInsertStatement = con . prepareStatement ( sb . toString ( ) ) ;
} catch ( SQLException sqle ) {
String errorMsg = " Problem when creating the prepared statement for the insertQuery: \" " + baseInsertQuery + " \" ...! \ n " ;
logger . error ( errorMsg + sqle . getMessage ( ) ) ;
throw new RuntimeException ( errorMsg ) ;
}
return preparedInsertStatement ;
}
}