diff --git a/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java b/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java index 4ff5cf6..b830f70 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java @@ -37,7 +37,7 @@ public class StatsController { logger.info("Received a \"getNumberOfAllPayloads\" request."); final String getAllPayloadsNumberQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload"; - return statsService.getNumberOfPayloads(getAllPayloadsNumberQuery, "all payloads"); + return statsService.getNumberOfPayloads(getAllPayloadsNumberQuery, "all payloads", 0); } @@ -51,7 +51,7 @@ public class StatsController { logger.info("Received a \"getNumberOfPayloadsAggregatedByServiceThroughCrawling\" request."); String getNumOfPayloadsAggregatedByServiceThroughCrawlingQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload_aggregated"; - return statsService.getNumberOfPayloads(getNumOfPayloadsAggregatedByServiceThroughCrawlingQuery, "payloads aggregated by the Service through crawling"); + return statsService.getNumberOfPayloads(getNumOfPayloadsAggregatedByServiceThroughCrawlingQuery, "payloads aggregated by the Service through crawling", 0); } @@ -65,7 +65,7 @@ public class StatsController { logger.info("Received a \"getNumberOfPayloadsAggregatedByServiceThroughBulkImport\" request."); String getNumOfPayloadsAggregatedByServiceThroughBulkImportQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload_bulk_import"; - return statsService.getNumberOfPayloads(getNumOfPayloadsAggregatedByServiceThroughBulkImportQuery, "payloads aggregated by the Service through BulkImport procedures"); + return statsService.getNumberOfPayloads(getNumOfPayloadsAggregatedByServiceThroughBulkImportQuery, "payloads aggregated by the Service through BulkImport procedures", 0); } @@ -83,7 +83,7 @@ public class StatsController { " union all\n" + " select id from " + ImpalaConnector.databaseName + ".payload_bulk_import)\n" + " as payloads_from_service"; - return statsService.getNumberOfPayloads(getNumOfPayloadsAggregatedByServiceQuery, "payloads aggregated by the Service, through both crawling and bulk-import procedures"); + return statsService.getNumberOfPayloads(getNumOfPayloadsAggregatedByServiceQuery, "payloads aggregated by the Service, through both crawling and bulk-import procedures", 0); } @@ -97,7 +97,7 @@ public class StatsController { logger.info("Received a \"getNumberOfLegacyPayloads\" request."); String getNumOfLegacyPayloadsQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload_legacy"; - return statsService.getNumberOfPayloads(getNumOfLegacyPayloadsQuery, "legacy payloads"); + return statsService.getNumberOfPayloads(getNumOfLegacyPayloadsQuery, "legacy payloads", 0); } @@ -114,7 +114,7 @@ public class StatsController { if ( logger.isTraceEnabled() ) logger.trace("getNumOfPayloadsForDatasourceQuery:\n" + getNumOfPayloadsForDatasourceQuery); - return statsService.getNumberOfPayloads(getNumOfPayloadsForDatasourceQuery, "payloads related to datasourceId \"" + datasourceId + "\""); + return statsService.getNumberOfPayloads(getNumOfPayloadsForDatasourceQuery, "payloads related to datasourceId \"" + datasourceId + "\"", 0); } @@ -152,7 +152,7 @@ public class StatsController { public ResponseEntity getNumberOfAllDistinctFullTexts() { logger.info("Received a \"getNumberOfAllDistinctFullTexts\" request."); final String getPayloadsNumberQuery = "select count(distinct `hash`) from " + ImpalaConnector.databaseName + ".payload"; - return statsService.getNumberOfPayloads(getPayloadsNumberQuery, "distinct full-text files"); + return statsService.getNumberOfPayloads(getPayloadsNumberQuery, "distinct full-text files", 0); } @@ -165,7 +165,7 @@ public class StatsController { if ( ! isCalledFromScheduler ) logger.info("Received a \"getNumberOfRecordsInspectedByServiceThroughCrawling\" request."); - return statsService.getNumberOfRecordsInspectedByServiceThroughCrawling(); + return statsService.getNumberOfRecordsInspectedByServiceThroughCrawling(0); } } diff --git a/src/main/java/eu/openaire/urls_controller/services/StatsService.java b/src/main/java/eu/openaire/urls_controller/services/StatsService.java index fe669b2..5fa3014 100644 --- a/src/main/java/eu/openaire/urls_controller/services/StatsService.java +++ b/src/main/java/eu/openaire/urls_controller/services/StatsService.java @@ -4,8 +4,8 @@ import org.springframework.http.ResponseEntity; public interface StatsService { - ResponseEntity getNumberOfPayloads(String getPayloadsNumberQuery, String extraMsg); + ResponseEntity getNumberOfPayloads(String getPayloadsNumberQuery, String extraMsg, int retryNum); - ResponseEntity getNumberOfRecordsInspectedByServiceThroughCrawling(); + ResponseEntity getNumberOfRecordsInspectedByServiceThroughCrawling(int retryNum); } diff --git a/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java index ebfca8b..df62f1d 100644 --- a/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java @@ -19,9 +19,19 @@ public class StatsServiceImpl implements StatsService { private JdbcTemplate jdbcTemplate; // No DB-lock is required for these READ-operations. + // BUT! The is an issue.. these queries may run while a "table-merging" operation is in progress.. thus resulting in "no table reference" and "no file found (fieName.parquet)" + // Thus, we need to have an "error-detection-and-retry" mechanism, in order to avoid returning error that we know will exist in certain times and we can overcome them. + // The final time-to-return of the results-retrieval methods may be somewhat large, but the alternative of returning predictable errors or locking the DB and slowing down the aggregation system are even worse. + + public ResponseEntity getNumberOfPayloads(String getNumberQuery, String message, int retryNum) + { + if ( retryNum > 10 ) { + String errorMsg = "Could not find the requested payload-type table in an non-merging state, after " + retryNum + " retries!"; + logger.error(errorMsg); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } - public ResponseEntity getNumberOfPayloads(String getNumberQuery, String message) { try { Object result = jdbcTemplate.queryForObject(getNumberQuery, Integer.class); if ( result != null ) { @@ -33,6 +43,12 @@ public class StatsServiceImpl implements StatsService { } catch (EmptyResultDataAccessException erdae) { return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The number of " + message + " could not be retrieved from the database \"" + ImpalaConnector.databaseName + "\" using the getNumberQuery: " + getNumberQuery); } catch (Exception e) { + String exMsg = e.getMessage(); + if ( (exMsg != null) && (exMsg.contains("Could not resolve table reference") || exMsg.contains("Failed to open HDFS file")) ) { + sleep1min(); + return getNumberOfPayloads(getNumberQuery, message, (++retryNum)); + } + String errorMsg = "Problem when executing \"getNumberQuery\": " + getNumberQuery; logger.error(errorMsg, e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); @@ -41,8 +57,14 @@ public class StatsServiceImpl implements StatsService { } - public ResponseEntity getNumberOfRecordsInspectedByServiceThroughCrawling() + public ResponseEntity getNumberOfRecordsInspectedByServiceThroughCrawling(int retryNum) { + if ( retryNum > 10 ) { + String errorMsg = "Could not find the requested attempt table in an non-merging state, after " + retryNum + " retries!"; + logger.error(errorMsg); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } + // Note that until all the records are inspected, the "attempt" table contains all the inspected records PLUS very few duplicates (id-url) which come from the publications-database. // After all the records are inspected, it contains duplicate records of more and more id-urls, as time goes on, since for every eligible record the Service re-attempts to get the full-text. // So in order to get the number of inspected records, we want the distinct number, which at some point it will remain stable, even though the Service will try again and again some records. @@ -60,6 +82,12 @@ public class StatsServiceImpl implements StatsService { } catch (EmptyResultDataAccessException erdae) { return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The inspected records' number could not be retrieved from the database \"" + ImpalaConnector.databaseName + "\" using the getInspectedRecordsNumberQuery: " + getInspectedRecordsNumberQuery); } catch (Exception e) { + String exMsg = e.getMessage(); + if ( (exMsg != null) && (exMsg.contains("Could not resolve table reference") || exMsg.contains("Failed to open HDFS file")) ) { + sleep1min(); + return getNumberOfRecordsInspectedByServiceThroughCrawling(++retryNum); + } + String errorMsg = "Problem when executing \"getInspectedRecordsNumberQuery\": " + getInspectedRecordsNumberQuery; logger.error(errorMsg, e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); @@ -70,4 +98,13 @@ public class StatsServiceImpl implements StatsService { // To get the human-friendly timestamp format from the BigInt in the database: // select from_timestamp(CAST(CAST(`date` as decimal(30,0))/1000 AS timestamp), "yyyy-MM-dd HH:mm:ss.SSS") from payload + + private void sleep1min() { + try { + Thread.sleep(60_000); // Sleep for 1 min. + } catch (InterruptedException ie) { + logger.warn("Sleeping was interrupted!"); + } + } + }