2021-11-09 22:59:27 +01:00
package eu.openaire.urls_controller.configuration ;
import com.zaxxer.hikari.HikariConfig ;
import com.zaxxer.hikari.HikariDataSource ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
2021-12-21 14:55:27 +01:00
2021-11-09 22:59:27 +01:00
import java.beans.PropertyVetoException ;
import java.io.File ;
import java.io.FileReader ;
import java.sql.* ;
import java.util.Properties ;
import java.util.concurrent.locks.Lock ;
import java.util.concurrent.locks.ReentrantLock ;
public final class ImpalaConnector {
// Logger of this class. It is declared first because the constructor, which runs during static initialization
// (see "singletonObject" below), already uses it.
private static final Logger logger = LoggerFactory . getLogger ( ImpalaConnector . class ) ;
// Class name of the Impala JDBC driver. Read from the properties file by the constructor and loaded with "Class.forName()".
public static String impalaDriver ;
// JDBC url of the Impala service. Read from the properties file by the constructor and given to the Hikari pool in "impalaDS()".
public static String impalaConnectionUrl ;
2022-01-28 06:24:42 +01:00
// Name of the database from which "createDatabase()" copies the "publication*" and "datasource" tables.
public static String oldDatabaseName ;
// Name of the database which is created (if missing) and populated by "createDatabase()".
public static String databaseName ;
2021-11-09 22:59:27 +01:00
// Settings of the Hikari connection pool. They are read from the "spring.datasource.hikari.*" properties by the constructor
// and are applied to the "HikariConfig" in "impalaDS()".
// NOTE(review): the time-related values are presumably in milliseconds, as expected by HikariConfig -- confirm against the properties file.
public static String poolName ;
public static int hikariMaxConnectionPoolSize ;
public static int hikariMinIdleConnections ;
public static int hikariConnectionTimeOut ;
public static int hikariIdleTimeOut ;
public static int hikariMaxLifetime ;
// The lock is created with the "fairness" flag set to "true". It is not acquired anywhere inside this class.
public static final Lock databaseLock = new ReentrantLock ( true ) ; // This lock is locking the threads trying to execute queries in the database.
// The connection pool. It is assigned by the constructor with the result of "impalaDS()" and it stays "null" if that call fails with a caught exception.
public static HikariDataSource hikariDataSource ;
// The single instance, created eagerly when the class is initialized.
// The constructor loads the settings, creates the pool and calls "createDatabase()".
private static final ImpalaConnector singletonObject = new ImpalaConnector ( ) ;
/**
 * Gives access to the single instance of this connector, which is created when the class is initialized.
 *
 * @return the shared {@code ImpalaConnector} instance
 */
public static ImpalaConnector getInstance() {
    return ImpalaConnector.singletonObject;
}
public ImpalaConnector ( )
{
logger . info ( " Max available memory to the Controller: " + Runtime . getRuntime ( ) . maxMemory ( ) + " bytes. " ) ;
try {
String dbSettingsPropertyFile = System . getProperty ( " user.dir " ) + File . separator + " src " + File . separator + " main " + File . separator + " resources " + File . separator + " application.properties " ;
FileReader fReader = new FileReader ( dbSettingsPropertyFile ) ;
Properties props = new Properties ( ) ;
props . load ( fReader ) ; // Load jdbc related properties.
// Get each property value.
impalaDriver = props . getProperty ( " spring.impala.driver-class-name " ) ;
if ( ! " " . equals ( impalaDriver ) ) { // If not "null" or empty.
Class . forName ( impalaDriver ) ;
impalaConnectionUrl = props . getProperty ( " spring.impala.url " ) ;
2022-01-28 06:24:42 +01:00
oldDatabaseName = props . getProperty ( " spring.impala.oldDatabaseName " ) ;
databaseName = props . getProperty ( " spring.impala.databaseName " ) ;
2021-11-09 22:59:27 +01:00
poolName = props . getProperty ( " spring.datasource.hikari.pool-name " ) ;
hikariMaxConnectionPoolSize = Integer . parseInt ( props . getProperty ( " spring.datasource.hikari.maximumPoolSize " ) ) ;
hikariMaxLifetime = Integer . parseInt ( props . getProperty ( " spring.datasource.hikari.maxLifetime " ) ) ;
hikariMinIdleConnections = Integer . parseInt ( props . getProperty ( " spring.datasource.hikari.minimumIdle " ) ) ;
hikariConnectionTimeOut = Integer . parseInt ( props . getProperty ( " spring.datasource.hikari.connectionTimeout " ) ) ;
hikariIdleTimeOut = Integer . parseInt ( props . getProperty ( " spring.datasource.hikari.idleTimeout " ) ) ;
} else
throw new RuntimeException ( " The \" impalaDriver \" was null or empty! " ) ;
} catch ( Exception e ) {
2021-11-30 12:26:19 +01:00
String errorMsg = " Error when loading the database properties! \ n " + e . getMessage ( ) ;
logger . error ( errorMsg , e ) ;
System . err . println ( errorMsg ) ;
2021-11-09 22:59:27 +01:00
System . exit ( 11 ) ;
}
try {
hikariDataSource = impalaDS ( ) ;
} catch ( SQLException | PropertyVetoException e ) {
2021-11-30 12:26:19 +01:00
logger . error ( " Problem when creating the Hikari connection pool! " , e ) ;
2021-11-09 22:59:27 +01:00
}
createDatabase ( ) ;
}
2021-11-30 12:26:19 +01:00
/**
 * Builds a new Hikari connection pool out of the static settings of this class.
 * <p>
 * Auto-commit is always enabled. The driver, the JDBC url, the pool name, the pool size limits
 * and the timeouts come from the corresponding static fields.
 *
 * @return a new {@link HikariDataSource} configured with those settings
 * @throws SQLException declared in the signature; no statement in this method throws it directly
 * @throws PropertyVetoException declared in the signature; no statement in this method throws it directly
 */
public HikariDataSource impalaDS() throws SQLException, PropertyVetoException {
    final HikariConfig poolSettings = new HikariConfig();

    poolSettings.setDriverClassName(impalaDriver);
    poolSettings.setAutoCommit(true);
    poolSettings.setJdbcUrl(impalaConnectionUrl);
    poolSettings.setPoolName(poolName);
    poolSettings.setMaximumPoolSize(hikariMaxConnectionPoolSize);
    poolSettings.setMaxLifetime(hikariMaxLifetime);
    poolSettings.setMinimumIdle(hikariMinIdleConnections);
    poolSettings.setConnectionTimeout(hikariConnectionTimeOut);
    poolSettings.setIdleTimeout(hikariIdleTimeOut);

    final HikariDataSource pool = new HikariDataSource(poolSettings);
    return pool;
}
2021-11-09 22:59:27 +01:00
public void createDatabase ( )
{
Connection con = getConnection ( ) ;
2021-11-30 12:26:19 +01:00
if ( con = = null )
2021-11-09 22:59:27 +01:00
System . exit ( 22 ) ;
try {
if ( ! con . getMetaData ( ) . supportsBatchUpdates ( ) )
logger . warn ( " The database does not support \" BatchUpdates \" ! " ) ;
} catch ( SQLException e ) {
2021-11-30 12:26:19 +01:00
logger . error ( e . getMessage ( ) , e ) ;
2021-11-09 22:59:27 +01:00
}
logger . info ( " Going to create the database and the tables, if they do not exist. Also will fill some tables with data from OpenAIRE. " ) ;
Statement statement = null ;
try {
statement = con . createStatement ( ) ;
} catch ( SQLException sqle ) {
logger . error ( " Problem when creating a connection-statement! \ n " + sqle . getMessage ( ) ) ;
2021-11-30 12:26:19 +01:00
ImpalaConnector . closeConnection ( con ) ;
2021-11-09 22:59:27 +01:00
System . exit ( 33 ) ;
}
try {
statement . execute ( " CREATE DATABASE IF NOT EXISTS " + databaseName ) ;
statement . execute ( " CREATE TABLE IF NOT EXISTS " + databaseName + " .publication stored as parquet as select * from " + oldDatabaseName + " .publication " ) ;
statement . execute ( " COMPUTE STATS " + databaseName + " .publication " ) ;
statement . execute ( " CREATE TABLE IF NOT EXISTS " + databaseName + " .publication_pids stored as parquet as select * from " + oldDatabaseName + " .publication_pids " ) ;
statement . execute ( " COMPUTE STATS " + databaseName + " .publication_pids " ) ;
statement . execute ( " CREATE TABLE IF NOT EXISTS " + databaseName + " .publication_urls stored as parquet as select * from " + oldDatabaseName + " .publication_urls " ) ;
statement . execute ( " COMPUTE STATS " + databaseName + " .publication_urls " ) ;
statement . execute ( " CREATE TABLE IF NOT EXISTS " + databaseName + " .datasource stored as parquet as select * from " + oldDatabaseName + " .datasource " ) ;
statement . execute ( " COMPUTE STATS " + databaseName + " .datasource " ) ;
statement . execute ( " CREATE TABLE IF NOT EXISTS " + databaseName + " .assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet " ) ;
statement . execute ( " COMPUTE STATS " + databaseName + " .assignment " ) ;
2022-01-19 00:37:47 +01:00
statement . execute ( " DROP TABLE IF EXISTS " + ImpalaConnector . databaseName + " .current_assignment PURGE " ) ;
2021-11-09 22:59:27 +01:00
statement . execute ( " CREATE TABLE IF NOT EXISTS " + databaseName + " .attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet " ) ;
statement . execute ( " COMPUTE STATS " + databaseName + " .attempt " ) ;
statement . execute ( " CREATE TABLE IF NOT EXISTS " + databaseName + " .payload (id string, original_url string, actual_url string, `date` timestamp, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet " ) ;
statement . execute ( " COMPUTE STATS " + databaseName + " .payload " ) ;
} catch ( SQLException sqle ) {
2021-11-30 12:26:19 +01:00
String errorMsg = " Problem when executing the \" create database and create tables queries! \ n " + sqle . getMessage ( ) + " \ nSQL state: " + sqle . getSQLState ( ) + " \ nError code: " + sqle . getErrorCode ( ) ;
logger . error ( errorMsg , sqle ) ;
System . err . println ( errorMsg ) ;
2021-11-09 22:59:27 +01:00
System . exit ( 44 ) ;
} finally {
try {
statement . close ( ) ;
con . close ( ) ;
} catch ( SQLException sqle2 ) {
logger . error ( " Could not close the connection with the Impala-database. \ n " + sqle2 . getMessage ( ) ) ;
}
}
logger . info ( " The database \" " + databaseName + " \" and its tables were created or validated. " ) ;
}
public Connection getConnection ( )
{
try {
return hikariDataSource . getConnection ( ) ;
//return DriverManager.getConnection(impalaConnectionUrl, null, null); // This is for non pooled connections.
} catch ( SQLException sqle ) {
logger . error ( " Problem when connecting with the Impala-database! \ n " + sqle . getMessage ( ) ) ;
return null ;
}
}
public boolean testDatabaseAccess ( )
{
logger . info ( " Going to test Impala access.. " ) ;
Connection con = getConnection ( ) ;
if ( con = = null )
return false ;
ResultSet res = null ;
try {
String tableName = " publication " ;
// show tables
String sql = " show tables ' " + tableName + " ' " ;
logger . debug ( " Running: " + sql ) ;
res = con . prepareStatement ( sql ) . executeQuery ( ) ;
if ( res . next ( ) ) {
logger . debug ( res . getString ( 1 ) ) ;
}
// describe table
sql = " describe " + tableName ;
logger . debug ( " Running: " + sql ) ;
res = con . prepareStatement ( sql ) . executeQuery ( ) ;
while ( res . next ( ) ) {
logger . debug ( res . getString ( 1 ) + " \ t " + res . getString ( 2 ) ) ;
}
// select * query
sql = " select * from " + tableName + " limit 3; " ;
logger . debug ( " Running: " + sql ) ;
res = con . prepareStatement ( sql ) . executeQuery ( ) ;
while ( res . next ( ) ) {
logger . debug ( res . getString ( 1 ) ) ;
}
// Get Assignments, only for testing here.
//UrlController urlController = new UrlController();
//ResponseEntity<?> responseEntity = urlController.getUrls("worker_1", ControllerConstants.ASSIGNMENTS_LIMIT);
//logger.debug(responseEntity.toString());
} catch ( SQLException sqle ) {
2021-11-30 12:26:19 +01:00
logger . error ( sqle . getMessage ( ) , sqle ) ;
2021-11-09 22:59:27 +01:00
return false ;
} finally {
try {
if ( res ! = null )
res . close ( ) ;
con . close ( ) ;
} catch ( SQLException sqle ) {
logger . error ( " Could not close the connection with the Impala-database. \ n " + sqle ) ;
}
}
return true ;
}
2021-11-30 12:26:19 +01:00
public static boolean closeConnection ( Connection con ) {
try {
if ( con ! = null )
con . close ( ) ; // It may have already closed and that's fine.
return true ;
} catch ( SQLException sqle ) {
logger . error ( " Could not close the connection with the Impala-database. \ n " + sqle . getMessage ( ) ) ;
return false ;
}
}
public static String handlePreparedStatementException ( String queryName , String query , String preparedStatementName , PreparedStatement preparedStatement , Connection con , Exception e )
{
String errorMsg = " Problem when creating " + ( ( ! queryName . startsWith ( " get " ) ) ? " and executing " : " " ) + " the prepared statement for \" " + queryName + " \" ! \ n " ;
logger . error ( errorMsg + " \ n \ n " + query + " \ n \ n " + e . getMessage ( ) , e ) ;
closeConnection ( con ) ;
return errorMsg ;
}
2021-11-09 22:59:27 +01:00
}