2021-11-09 22:59:27 +01:00
package eu.openaire.urls_controller.configuration ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
2022-01-30 21:14:52 +01:00
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.beans.factory.annotation.Value ;
import org.springframework.jdbc.core.JdbcTemplate ;
import org.springframework.stereotype.Repository ;
2021-12-21 14:55:27 +01:00
2022-01-30 21:14:52 +01:00
import javax.annotation.PostConstruct ;
2021-11-09 22:59:27 +01:00
import java.sql.Connection;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
2022-01-30 21:14:52 +01:00
@Repository
2022-02-01 15:57:28 +01:00
public class ImpalaConnector {
2021-11-09 22:59:27 +01:00
private static final Logger logger = LoggerFactory . getLogger ( ImpalaConnector . class ) ;
2022-01-30 21:14:52 +01:00
@Autowired
private JdbcTemplate jdbcTemplate ;
2021-11-09 22:59:27 +01:00
2022-04-08 16:39:45 +02:00
private final boolean isTestEnvironment ;
2022-04-04 23:01:44 +02:00
private final String initialDatabaseName ;
2022-04-08 16:39:45 +02:00
private final String testDatabaseName ;
public static String databaseName ;
2021-11-09 22:59:27 +01:00
public static final Lock databaseLock = new ReentrantLock ( true ) ; // This lock is locking the threads trying to execute queries in the database.
2022-02-02 19:19:46 +01:00
2022-04-08 16:39:45 +02:00
public ImpalaConnector ( @Value ( " ${services.pdfaggregation.controller.isTestEnvironment} " ) boolean isTestEnvironment ,
@Value ( " ${services.pdfaggregation.controller.db.initialDatabaseName} " ) String initialDatabaseName ,
@Value ( " ${services.pdfaggregation.controller.db.testDatabaseName} " ) String testDatabaseName ) {
this . isTestEnvironment = isTestEnvironment ;
2022-04-04 23:01:44 +02:00
this . initialDatabaseName = initialDatabaseName ;
2022-04-08 16:39:45 +02:00
this . testDatabaseName = testDatabaseName ;
2022-02-01 01:08:02 +01:00
}
2022-02-02 19:19:46 +01:00
2022-01-30 21:14:52 +01:00
@PostConstruct
public void init ( ) {
2021-11-09 22:59:27 +01:00
logger . info ( " Max available memory to the Controller: " + Runtime . getRuntime ( ) . maxMemory ( ) + " bytes. " ) ;
try {
2022-02-02 19:19:46 +01:00
boolean supportsBatchUpdates = jdbcTemplate . getDataSource ( ) . getConnection ( ) . getMetaData ( ) . supportsBatchUpdates ( ) ;
logger . info ( " The database " + ( supportsBatchUpdates ? " supports " : " does not support " ) + " \" BatchUpdates \" ! " ) ;
2022-01-30 21:14:52 +01:00
} catch ( Exception e ) {
2022-02-02 19:19:46 +01:00
logger . error ( " Error testing if database supports batch updates! " , e ) ;
2021-11-09 22:59:27 +01:00
}
2022-02-02 19:19:46 +01:00
createDatabase ( ) ; // In case of an exception, the App will exit with the stacktrace.
2021-11-09 22:59:27 +01:00
}
2022-02-02 19:19:46 +01:00
2022-04-08 16:39:45 +02:00
private void createDatabase ( )
{
if ( isTestEnvironment ) {
2022-05-26 14:43:59 +02:00
logger . info ( " Going to create (if not exist) the test-database \" " + testDatabaseName + " \" and its tables. Also will fill some tables with data from initial-database \" " + initialDatabaseName + " \" . " ) ;
2022-04-08 16:39:45 +02:00
jdbcTemplate . execute ( " CREATE DATABASE IF NOT EXISTS " + testDatabaseName ) ;
jdbcTemplate . execute ( " CREATE TABLE IF NOT EXISTS " + testDatabaseName + " .publication stored as parquet as select * from " + initialDatabaseName + " .publication " ) ;
jdbcTemplate . execute ( " COMPUTE STATS " + testDatabaseName + " .publication " ) ;
2021-11-09 22:59:27 +01:00
2022-04-08 16:39:45 +02:00
jdbcTemplate . execute ( " CREATE TABLE IF NOT EXISTS " + testDatabaseName + " .publication_pids stored as parquet as select * from " + initialDatabaseName + " .publication_pids " ) ;
jdbcTemplate . execute ( " COMPUTE STATS " + testDatabaseName + " .publication_pids " ) ;
2021-11-09 22:59:27 +01:00
2022-04-08 16:39:45 +02:00
jdbcTemplate . execute ( " CREATE TABLE IF NOT EXISTS " + testDatabaseName + " .publication_urls stored as parquet as select * from " + initialDatabaseName + " .publication_urls " ) ;
jdbcTemplate . execute ( " COMPUTE STATS " + testDatabaseName + " .publication_urls " ) ;
2021-11-09 22:59:27 +01:00
2022-04-08 16:39:45 +02:00
jdbcTemplate . execute ( " CREATE TABLE IF NOT EXISTS " + testDatabaseName + " .datasource stored as parquet as select * from " + initialDatabaseName + " .datasource " ) ;
jdbcTemplate . execute ( " COMPUTE STATS " + testDatabaseName + " .datasource " ) ;
2021-11-09 22:59:27 +01:00
2022-04-08 16:39:45 +02:00
jdbcTemplate . execute ( " CREATE TABLE IF NOT EXISTS " + testDatabaseName + " .assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet " ) ;
jdbcTemplate . execute ( " COMPUTE STATS " + testDatabaseName + " .assignment " ) ;
2021-11-09 22:59:27 +01:00
2022-04-08 16:39:45 +02:00
databaseName = testDatabaseName ; // For the rest of the queries.
2022-04-11 20:15:22 +02:00
} else {
logger . info ( " Going to create or validate the tables that are populated by the Controller, for the \" initialDatabase \" = \" " + initialDatabaseName + " \" " ) ;
2022-04-08 16:39:45 +02:00
databaseName = initialDatabaseName ;
2022-04-11 20:15:22 +02:00
}
2021-11-09 22:59:27 +01:00
2022-04-08 16:39:45 +02:00
// For both cases, got check and create the tables which will be populated by the Controller.
2021-11-09 22:59:27 +01:00
2022-04-08 16:39:45 +02:00
// Drop the "current_assignment" table. It is a temporary table which is created on-demand during execution.
2022-02-01 01:08:02 +01:00
jdbcTemplate . execute ( " DROP TABLE IF EXISTS " + databaseName + " .current_assignment PURGE " ) ;
2022-01-19 00:37:47 +01:00
2022-01-30 21:14:52 +01:00
jdbcTemplate . execute ( " CREATE TABLE IF NOT EXISTS " + databaseName + " .attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet " ) ;
jdbcTemplate . execute ( " COMPUTE STATS " + databaseName + " .attempt " ) ;
2021-11-09 22:59:27 +01:00
2022-01-30 21:14:52 +01:00
jdbcTemplate . execute ( " CREATE TABLE IF NOT EXISTS " + databaseName + " .payload (id string, original_url string, actual_url string, `date` timestamp, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet " ) ;
jdbcTemplate . execute ( " COMPUTE STATS " + databaseName + " .payload " ) ;
2021-11-09 22:59:27 +01:00
2022-04-08 16:39:45 +02:00
logger . info ( " The " + ( isTestEnvironment ? " test- " : " " ) + " database \" " + databaseName + " \" and its tables were created or validated. " ) ;
2021-11-09 22:59:27 +01:00
}
2022-02-02 19:19:46 +01:00
public static String handleQueryException ( String queryName , String query , Exception e ) {
2022-02-07 12:57:09 +01:00
String errorMsg = " Problem when executing the query \" " + queryName + " \" ! \ n " ;
2022-01-30 21:14:52 +01:00
logger . error ( errorMsg + " \ n \ n " + query + " \ n \ n " , e ) ;
2021-11-30 12:26:19 +01:00
return errorMsg ;
}
2022-02-02 19:19:46 +01:00
2021-11-09 22:59:27 +01:00
}