package eu.openaire.urls_controller.configuration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import javax.annotation.PostConstruct;
import javax.sql.DataSource;
import java.sql.Connection;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Prepares the database used by the Controller at start-up.
 *
 * <p>After construction, {@link #init()} logs some diagnostics and then creates (if missing) the
 * database tables. In the test environment, a separate test-database is created and some of its
 * tables are filled with the data of the "initial" database. The name of the database which is
 * finally selected is published through {@link #databaseName}.
 */
@Repository
public class ImpalaConnector {

    private static final Logger logger = LoggerFactory.getLogger(ImpalaConnector.class);

    /** The tables which, in the test environment, are copied from the initial database into the test-database. */
    private static final String[] TABLES_COPIED_FROM_INITIAL_DATABASE = {"publication", "publication_pids", "publication_urls", "datasource"};

    @Autowired
    private JdbcTemplate jdbcTemplate;

    private final boolean isTestEnvironment;
    private final String initialDatabaseName;
    private final String testDatabaseName;

    /** The database to use for the rest of the queries. It is assigned by {@link #init()}, during start-up. */
    public static String databaseName;

    /** This lock is locking the threads trying to execute queries in the database. It uses the "fair" ordering policy. */
    public static final Lock databaseLock = new ReentrantLock(true);


    /**
     * @param isTestEnvironment   whether the test-database should be created and used, instead of the initial one
     * @param initialDatabaseName the name of the initial database
     * @param testDatabaseName    the name of the test-database (it is used only in the test environment)
     */
    public ImpalaConnector(@Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment,
                           @Value("${services.pdfaggregation.controller.db.initialDatabaseName}") String initialDatabaseName,
                           @Value("${services.pdfaggregation.controller.db.testDatabaseName}") String testDatabaseName) {
        this.isTestEnvironment = isTestEnvironment;
        this.initialDatabaseName = initialDatabaseName;
        this.testDatabaseName = testDatabaseName;
    }


    /**
     * Logs the max available memory and whether the database supports batch updates, then creates or validates
     * the database and its tables.
     */
    @PostConstruct
    public void init() {
        logger.info("Max available memory to the Controller: {} bytes.", Runtime.getRuntime().maxMemory());
        logBatchUpdatesSupport();
        createDatabase();   // In case of an exception, the App will exit with the stacktrace.
    }


    /**
     * Logs whether the database supports "BatchUpdates". This is only a diagnostic, so any failure is logged
     * and the initialization goes on.
     */
    private void logBatchUpdatesSupport() {
        DataSource dataSource = jdbcTemplate.getDataSource();
        if ( dataSource == null ) {
            logger.error("Error testing if database supports batch updates! The JdbcTemplate has no DataSource.");
            return;
        }

        // The connection is closed right after the check, so that it does not leak.
        try ( Connection connection = dataSource.getConnection() ) {
            boolean supportsBatchUpdates = connection.getMetaData().supportsBatchUpdates();
            logger.info("The database {} \"BatchUpdates\"!", (supportsBatchUpdates ? "supports" : "does not support"));
        } catch (Exception e) {  // Deliberately broad: a failure of this best-effort check must not stop the start-up.
            logger.error("Error testing if database supports batch updates!", e);
        }
    }


    /**
     * Selects the database to work with (and creates and fills the test-database, in the test environment), then
     * creates, if missing, the tables which are populated by the Controller. Any exception thrown by the
     * executed statements is propagated to the caller.
     */
    private void createDatabase() {
        if ( isTestEnvironment ) {
            logger.info("Going to create (if not exist) the test-database \"{}\" and its tables. Also will fill some tables with data from database \"{}\".",
                    testDatabaseName, initialDatabaseName);
            jdbcTemplate.execute("CREATE DATABASE IF NOT EXISTS " + testDatabaseName);

            for ( String tableName : TABLES_COPIED_FROM_INITIAL_DATABASE )
                copyTableFromInitialDatabase(tableName);

            createTableAndComputeStats(testDatabaseName, "assignment", "id string, original_url string, workerid string, `date` timestamp");

            databaseName = testDatabaseName;    // For the rest of the queries.
        } else {
            logger.info("Going to create or validate the tables that are populated by the Controller, for the \"initialDatabase\" = \"{}\"", initialDatabaseName);
            databaseName = initialDatabaseName;
        }

        // For both cases, go check and create the tables which will be populated by the Controller.

        // Drop the "current_assignment" table. It is a temporary table which is created on-demand during execution.
        jdbcTemplate.execute("DROP TABLE IF EXISTS " + databaseName + ".current_assignment PURGE");

        createTableAndComputeStats(databaseName, "attempt",
                "id string, original_url string, `date` timestamp, status string, error_class string, error_message string");

        createTableAndComputeStats(databaseName, "payload",
                "id string, original_url string, actual_url string, `date` timestamp, mimetype string, size string, `hash` string, `location` string, provenance string");

        logger.info("The {}database \"{}\" and its tables were created or validated.", (isTestEnvironment ? "test-" : ""), databaseName);
    }


    /** Creates (if missing) the given table in the test-database, as a copy of the same table of the initial database, and computes its stats. */
    private void copyTableFromInitialDatabase(String tableName) {
        jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + "." + tableName
                + " stored as parquet as select * from " + initialDatabaseName + "." + tableName);
        jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + "." + tableName);
    }


    /** Creates (if missing) a parquet table with the given column definitions, in the given database, and computes its stats. */
    private void createTableAndComputeStats(String database, String tableName, String columnDefinitions) {
        jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + database + "." + tableName + " (" + columnDefinitions + ") stored as parquet");
        jdbcTemplate.execute("COMPUTE STATS " + database + "." + tableName);
    }


    /**
     * Logs the failure of a query, along with the query itself and the exception.
     *
     * @param queryName the name of the query, which is included in the error-message
     * @param query     the text of the query, which is written only in the logs
     * @param e         the exception which was thrown by the query
     * @return the error-message (without the query and the exception)
     */
    public static String handleQueryException(String queryName, String query, Exception e) {
        String errorMsg = "Problem when executing the query \"" + queryName + "\"!\n";
        logger.error(errorMsg + "\n\n" + query + "\n\n", e);
        return errorMsg;
    }
}