Handle the case where the "stats" queries are executed while some tables of the DB are in a "merge" state. In this case, the queries fail and the Controller retries up to 10 times.

This commit is contained in:
Lampros Smyrnaios 2023-07-06 18:29:13 +03:00
parent e8644cb64f
commit d5c139c410
3 changed files with 49 additions and 12 deletions

View File

@ -37,7 +37,7 @@ public class StatsController {
logger.info("Received a \"getNumberOfAllPayloads\" request.");
final String getAllPayloadsNumberQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload";
return statsService.getNumberOfPayloads(getAllPayloadsNumberQuery, "all payloads");
return statsService.getNumberOfPayloads(getAllPayloadsNumberQuery, "all payloads", 0);
}
@ -51,7 +51,7 @@ public class StatsController {
logger.info("Received a \"getNumberOfPayloadsAggregatedByServiceThroughCrawling\" request.");
String getNumOfPayloadsAggregatedByServiceThroughCrawlingQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload_aggregated";
return statsService.getNumberOfPayloads(getNumOfPayloadsAggregatedByServiceThroughCrawlingQuery, "payloads aggregated by the Service through crawling");
return statsService.getNumberOfPayloads(getNumOfPayloadsAggregatedByServiceThroughCrawlingQuery, "payloads aggregated by the Service through crawling", 0);
}
@ -65,7 +65,7 @@ public class StatsController {
logger.info("Received a \"getNumberOfPayloadsAggregatedByServiceThroughBulkImport\" request.");
String getNumOfPayloadsAggregatedByServiceThroughBulkImportQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload_bulk_import";
return statsService.getNumberOfPayloads(getNumOfPayloadsAggregatedByServiceThroughBulkImportQuery, "payloads aggregated by the Service through BulkImport procedures");
return statsService.getNumberOfPayloads(getNumOfPayloadsAggregatedByServiceThroughBulkImportQuery, "payloads aggregated by the Service through BulkImport procedures", 0);
}
@ -83,7 +83,7 @@ public class StatsController {
" union all\n" +
" select id from " + ImpalaConnector.databaseName + ".payload_bulk_import)\n" +
" as payloads_from_service";
return statsService.getNumberOfPayloads(getNumOfPayloadsAggregatedByServiceQuery, "payloads aggregated by the Service, through both crawling and bulk-import procedures");
return statsService.getNumberOfPayloads(getNumOfPayloadsAggregatedByServiceQuery, "payloads aggregated by the Service, through both crawling and bulk-import procedures", 0);
}
@ -97,7 +97,7 @@ public class StatsController {
logger.info("Received a \"getNumberOfLegacyPayloads\" request.");
String getNumOfLegacyPayloadsQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload_legacy";
return statsService.getNumberOfPayloads(getNumOfLegacyPayloadsQuery, "legacy payloads");
return statsService.getNumberOfPayloads(getNumOfLegacyPayloadsQuery, "legacy payloads", 0);
}
@ -114,7 +114,7 @@ public class StatsController {
if ( logger.isTraceEnabled() )
logger.trace("getNumOfPayloadsForDatasourceQuery:\n" + getNumOfPayloadsForDatasourceQuery);
return statsService.getNumberOfPayloads(getNumOfPayloadsForDatasourceQuery, "payloads related to datasourceId \"" + datasourceId + "\"");
return statsService.getNumberOfPayloads(getNumOfPayloadsForDatasourceQuery, "payloads related to datasourceId \"" + datasourceId + "\"", 0);
}
@ -152,7 +152,7 @@ public class StatsController {
public ResponseEntity<?> getNumberOfAllDistinctFullTexts() {
logger.info("Received a \"getNumberOfAllDistinctFullTexts\" request.");
final String getPayloadsNumberQuery = "select count(distinct `hash`) from " + ImpalaConnector.databaseName + ".payload";
return statsService.getNumberOfPayloads(getPayloadsNumberQuery, "distinct full-text files");
return statsService.getNumberOfPayloads(getPayloadsNumberQuery, "distinct full-text files", 0);
}
@ -165,7 +165,7 @@ public class StatsController {
if ( ! isCalledFromScheduler )
logger.info("Received a \"getNumberOfRecordsInspectedByServiceThroughCrawling\" request.");
return statsService.getNumberOfRecordsInspectedByServiceThroughCrawling();
return statsService.getNumberOfRecordsInspectedByServiceThroughCrawling(0);
}
}

View File

@ -4,8 +4,8 @@ import org.springframework.http.ResponseEntity;
public interface StatsService {
ResponseEntity<?> getNumberOfPayloads(String getPayloadsNumberQuery, String extraMsg);
ResponseEntity<?> getNumberOfPayloads(String getPayloadsNumberQuery, String extraMsg, int retryNum);
ResponseEntity<?> getNumberOfRecordsInspectedByServiceThroughCrawling();
ResponseEntity<?> getNumberOfRecordsInspectedByServiceThroughCrawling(int retryNum);
}

View File

@ -19,9 +19,19 @@ public class StatsServiceImpl implements StatsService {
private JdbcTemplate jdbcTemplate;
// No DB-lock is required for these READ-operations.
// BUT! There is an issue.. these queries may run while a "table-merging" operation is in progress.. thus resulting in "no table reference" and "no file found (fileName.parquet)" errors.
// Thus, we need to have an "error-detection-and-retry" mechanism, in order to avoid returning errors that we know will occur at certain times and which we can overcome.
// The final time-to-return of the results-retrieval methods may be somewhat large, but the alternative of returning predictable errors or locking the DB and slowing down the aggregation system are even worse.
public ResponseEntity<?> getNumberOfPayloads(String getNumberQuery, String message, int retryNum)
{
if ( retryNum > 10 ) {
String errorMsg = "Could not find the requested payload-type table in an non-merging state, after " + retryNum + " retries!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
public ResponseEntity<?> getNumberOfPayloads(String getNumberQuery, String message) {
try {
Object result = jdbcTemplate.queryForObject(getNumberQuery, Integer.class);
if ( result != null ) {
@ -33,6 +43,12 @@ public class StatsServiceImpl implements StatsService {
} catch (EmptyResultDataAccessException erdae) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The number of " + message + " could not be retrieved from the database \"" + ImpalaConnector.databaseName + "\" using the getNumberQuery: " + getNumberQuery);
} catch (Exception e) {
String exMsg = e.getMessage();
if ( (exMsg != null) && (exMsg.contains("Could not resolve table reference") || exMsg.contains("Failed to open HDFS file")) ) {
sleep1min();
return getNumberOfPayloads(getNumberQuery, message, (++retryNum));
}
String errorMsg = "Problem when executing \"getNumberQuery\": " + getNumberQuery;
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
@ -41,8 +57,14 @@ public class StatsServiceImpl implements StatsService {
}
public ResponseEntity<?> getNumberOfRecordsInspectedByServiceThroughCrawling()
public ResponseEntity<?> getNumberOfRecordsInspectedByServiceThroughCrawling(int retryNum)
{
if ( retryNum > 10 ) {
String errorMsg = "Could not find the requested attempt table in an non-merging state, after " + retryNum + " retries!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
// Note that until all the records are inspected, the "attempt" table contains all the inspected records PLUS very few duplicates (id-url) which come from the publications-database.
// After all the records are inspected, it contains duplicate records of more and more id-urls, as time goes on, since for every eligible record the Service re-attempts to get the full-text.
// So in order to get the number of inspected records, we want the distinct number, which at some point it will remain stable, even though the Service will try again and again some records.
@ -60,6 +82,12 @@ public class StatsServiceImpl implements StatsService {
} catch (EmptyResultDataAccessException erdae) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The inspected records' number could not be retrieved from the database \"" + ImpalaConnector.databaseName + "\" using the getInspectedRecordsNumberQuery: " + getInspectedRecordsNumberQuery);
} catch (Exception e) {
String exMsg = e.getMessage();
if ( (exMsg != null) && (exMsg.contains("Could not resolve table reference") || exMsg.contains("Failed to open HDFS file")) ) {
sleep1min();
return getNumberOfRecordsInspectedByServiceThroughCrawling(++retryNum);
}
String errorMsg = "Problem when executing \"getInspectedRecordsNumberQuery\": " + getInspectedRecordsNumberQuery;
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
@ -70,4 +98,13 @@ public class StatsServiceImpl implements StatsService {
// To get the human-friendly timestamp format from the BigInt in the database:
// select from_timestamp(CAST(CAST(`date` as decimal(30,0))/1000 AS timestamp), "yyyy-MM-dd HH:mm:ss.SSS") from payload
/**
 * Pauses the calling thread for one minute, giving an in-progress table-merge
 * operation time to finish before the caller retries its failed query.
 * If the sleep is interrupted, the thread's interrupt-status is restored,
 * so that callers (and thread pools) can still observe the interruption.
 */
private void sleep1min() {
    try {
        Thread.sleep(60_000); // Sleep for 1 min.
    } catch (InterruptedException ie) {
        logger.warn("Sleeping was interrupted!");
        // Re-assert the interrupt flag instead of swallowing it, per standard Java practice.
        Thread.currentThread().interrupt();
    }
}
}