UrlsController/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java

package eu.openaire.urls_controller.services;

import eu.openaire.urls_controller.configuration.DatabaseConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class StatsServiceImpl implements StatsService {

    private static final Logger logger = LoggerFactory.getLogger(StatsServiceImpl.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // No DB-lock is required for these READ-operations.
    // There is an issue, though: these queries may run while a "table-merging" operation is in progress, resulting in "no table reference" and "no file found (fileName.parquet)" errors.
    // Thus, we need an "error-detection-and-retry" mechanism, in order to avoid returning errors which we know will occur at certain times and which we can overcome.
    // The final time-to-return of the results-retrieval methods may be somewhat large, but the alternatives of returning predictable errors, or locking the DB and slowing down the aggregation system, are even worse.
    public ResponseEntity<?> getNumberOfPayloads(String getNumberQuery, String message, int retryCount)
    {
        if ( retryCount > 10 ) {
            String errorMsg = "Could not find the requested payload-type table in a non-merging state, after " + (retryCount - 1) + " retries!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        try {
            Object result = jdbcTemplate.queryForObject(getNumberQuery, Integer.class);
            if ( result != null ) {
                int numOfPayloads = (int) result;
                logger.info("The number of " + message + " in the database \"" + DatabaseConnector.databaseName + "\" is " + numOfPayloads);
                return ResponseEntity.ok(Integer.toString(numOfPayloads));
            } else
                return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The number of " + message + " could not be retrieved (it was null) from the database \"" + DatabaseConnector.databaseName + "\" using the getNumberQuery: " + getNumberQuery);
        } catch (EmptyResultDataAccessException erdae) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The number of " + message + " could not be retrieved (empty result) from the database \"" + DatabaseConnector.databaseName + "\" using the getNumberQuery: " + getNumberQuery);
        } catch (Exception e) {
            String exMsg = e.getMessage();
            if ( (exMsg != null) && (exMsg.contains("Could not resolve table reference") || exMsg.contains("Failed to open HDFS file")) ) {
                sleep1min();
                return getNumberOfPayloads(getNumberQuery, message, (++retryCount));
            }
            // We may get a "ClassCastException", in case Impala returns a non-integer value.
            String errorMsg = "Problem when executing \"getNumberQuery\": " + getNumberQuery;
            logger.error(errorMsg, e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }
    }
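
    // Illustrative usage sketch (hypothetical caller, not part of this class): a controller which injects
    // this service could request the total payload-count with a call like the following. The exact query
    // string and the "payload records" label are assumptions made for demonstration only.
    //   statsService.getNumberOfPayloads(
    //           "select count(id) from " + DatabaseConnector.databaseName + ".payload",
    //           "payload records", 0);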

    public ResponseEntity<?> getNumberOfRecordsInspectedByServiceThroughCrawling(int retryCount)
    {
        if ( retryCount > 10 ) {
            String errorMsg = "Could not find the requested attempt table in a non-merging state, after " + (retryCount - 1) + " retries!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        // Note that until all the records are inspected, the "attempt" table contains all the inspected records PLUS very few duplicates (id-url) which come from the publications-database.
        // After all the records are inspected, it contains duplicate records of more and more id-urls as time goes on, since for every eligible record the Service re-attempts to get the full-text.
        // So, in order to get the number of inspected records, we want the distinct count, which at some point will remain stable, even though the Service will keep retrying some records.
        // Before all the records are inspected, this endpoint will report all the inspected records MINUS the duplicate records which come straight from the "publication" table.
        final String getInspectedRecordsNumberQuery = "select count(dist.id) from (select distinct id, original_url from " + DatabaseConnector.databaseName + ".attempt) as dist";

        try {
            Object result = jdbcTemplate.queryForObject(getInspectedRecordsNumberQuery, Integer.class);
            if ( result != null ) {
                int numOfInspectedRecords = (int) result;
                logger.info("The number of crawling-inspected records from the database \"" + DatabaseConnector.databaseName + "\" is " + numOfInspectedRecords);
                return ResponseEntity.ok(Integer.toString(numOfInspectedRecords));
            } else
                return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The inspected records' number could not be retrieved (it was null) from the database \"" + DatabaseConnector.databaseName + "\" using the getInspectedRecordsNumberQuery: " + getInspectedRecordsNumberQuery);
        } catch (EmptyResultDataAccessException erdae) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The inspected records' number could not be retrieved (empty result) from the database \"" + DatabaseConnector.databaseName + "\" using the getInspectedRecordsNumberQuery: " + getInspectedRecordsNumberQuery);
        } catch (Exception e) {
            String exMsg = e.getMessage();
            if ( (exMsg != null) && (exMsg.contains("Could not resolve table reference") || exMsg.contains("Failed to open HDFS file")) ) {
                sleep1min();
                return getNumberOfRecordsInspectedByServiceThroughCrawling(++retryCount);
            }
            // We may get a "ClassCastException", in case Impala returns a non-integer value.
            String errorMsg = "Problem when executing \"getInspectedRecordsNumberQuery\": " + getInspectedRecordsNumberQuery;
            logger.error(errorMsg, e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }
    }

    // To get the human-friendly timestamp format from the BigInt stored in the database:
    //   select from_timestamp(CAST(CAST(`date` as decimal(30,0))/1000 AS timestamp), "yyyy-MM-dd HH:mm:ss.SSS") from payload
    // Or, simpler:
    //   select from_timestamp(CAST((`date`/1000) AS timestamp), "yyyy-MM-dd HH:mm:ss.SSS") from payload
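
    // Illustrative sketch (not part of the original class): how the conversion above could be run through
    // the same JdbcTemplate. The database-name prefix and the row-limit are assumptions made purely for
    // demonstration; no such endpoint is exposed by this service.
    private java.util.List<String> getHumanFriendlyPayloadDates(int limit) {
        final String query = "select from_timestamp(CAST((`date`/1000) AS timestamp), \"yyyy-MM-dd HH:mm:ss.SSS\") "
                + "from " + DatabaseConnector.databaseName + ".payload limit " + limit;
        // "queryForList" maps each single-column row to a String.
        return jdbcTemplate.queryForList(query, String.class);
    }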

    private void sleep1min() {
        try {
            Thread.sleep(60_000); // Sleep for 1 min.
        } catch (InterruptedException ie) {
            logger.warn("Sleeping was interrupted!");
            Thread.currentThread().interrupt(); // Restore the interrupt-flag, so that callers can detect it.
        }
    }
}