UrlsController/src/main/java/eu/openaire/urls_controller/configuration/DatabaseConnector.java

package eu.openaire.urls_controller.configuration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import javax.annotation.PostConstruct;
import java.sql.Connection;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Repository
public class DatabaseConnector {

    private static final Logger logger = LoggerFactory.getLogger(DatabaseConnector.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    private final boolean isTestEnvironment;
    private final String initialDatabaseName;
    private final String testDatabaseName;

    public static String databaseName;
    public static final Lock databaseLock = new ReentrantLock(true); // A fair lock which serializes the threads that execute queries against the database.

    public DatabaseConnector(@Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment,
                             @Value("${services.pdfaggregation.controller.db.initialDatabaseName}") String initialDatabaseName,
                             @Value("${services.pdfaggregation.controller.db.testDatabaseName}") String testDatabaseName) {
        this.isTestEnvironment = isTestEnvironment;
        this.initialDatabaseName = initialDatabaseName;
        this.testDatabaseName = testDatabaseName;
    }

    @PostConstruct
    public void init() {
        final int oneMiB = 1_048_576;
        logger.info("Max available memory to the Controller: " + (Runtime.getRuntime().maxMemory() / oneMiB) + " MiB.");
        try ( Connection con = jdbcTemplate.getDataSource().getConnection() ) {    // Use try-with-resources, so the connection is returned to the pool afterwards.
            boolean supportsBatchUpdates = con.getMetaData().supportsBatchUpdates();
            logger.info("The database " + (supportsBatchUpdates ? "supports" : "does not support") + " \"BatchUpdates\"!");
        } catch (Exception e) {
            logger.error("Error when checking if the database supports batch updates!", e);
        }

        createDatabase();   // In case of an exception, the app will exit with the stacktrace.
    }

    private void createDatabase()
    {
        if ( isTestEnvironment ) {
logger.info("Going to create (if not exist) the TEST-database \"" + testDatabaseName + "\" and its tables. Also will fill some tables with data from the initial-database \"" + initialDatabaseName + "\".");
jdbcTemplate.execute("CREATE DATABASE IF NOT EXISTS " + testDatabaseName);

            try {   // The Metastore needs some time to register the newly created DB, before it can be used.
                Thread.sleep(1000);
            } catch (InterruptedException ignore) {}

            jdbcTemplate.update("INVALIDATE METADATA");

            try {   // Allow some more time for the metadata-invalidation to complete.
                Thread.sleep(1000);
            } catch (InterruptedException ignore) {}

            // Create VIEWs over the original data. We only READ from them, so this is safe for the testing environment.
            jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".publication as select * from " + initialDatabaseName + ".publication");
            jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".publication_pids as select * from " + initialDatabaseName + ".publication_pids");
            jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".publication_urls as select * from " + initialDatabaseName + ".publication_urls");
            jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".publication_boost as select * from " + initialDatabaseName + ".publication_boost");
            jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".datasource as select * from " + initialDatabaseName + ".datasource");
            jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".payload_legacy as select * from " + initialDatabaseName + ".payload_legacy");

            databaseName = testDatabaseName;
        } else {
            logger.info("Going to create or validate the tables that are populated by the Controller, for the \"initialDatabase\" = \"" + initialDatabaseName + "\"");
            // Note that for the "initialDatabase", the initial 5 tables are expected to be created either manually or by other pieces of software, as views of the contents of the Graph.
            databaseName = initialDatabaseName;
        }

        // In both cases, go check and create the tables which will be populated by the Controller.

        // Drop the "current_assignment" table. It is a temporary table which is created on-demand during execution.
        jdbcTemplate.execute("DROP TABLE IF EXISTS " + databaseName + ".current_assignment PURGE");

        jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, assignments_batch_counter bigint, `date` bigint) stored as parquet");
        jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".assignment");

        jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` bigint, status string, error_class string, error_message string) stored as parquet");
        jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".attempt");
// Create a VIEW "payload" which consists of 3 different tables:
// 1) The "payload_legacy" table, which contains data older than 2022, which is imported by previous full-text aggregation processes.
// 2) The "payload_aggregated" table, which contains data from 2022 onwards, collected by the new PDF-Aggregation-Service.
// 3) The "payload_bulk_import", which contains data collected from the bulk-imported content from datasources like "arXiv".
// So, each aggregation process will "load" its contents to the right table, but in order to get the "total" metrics, we can just query the "payload" view.
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload_aggregated (id string, original_url string, actual_url string, `date` bigint, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload_aggregated");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload_bulk_import (id string, original_url string, actual_url string, `date` bigint, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload_bulk_import");

        try {   // The Metastore needs some time to register the newly created tables, before they can be used in the view.
            Thread.sleep(1000);
        } catch (InterruptedException ignore) {}

        jdbcTemplate.update("INVALIDATE METADATA");

        try {   // Allow some more time for the metadata-invalidation to complete.
            Thread.sleep(1000);
        } catch (InterruptedException ignore) {}

jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + databaseName + ".payload\n" +
"AS SELECT * from " + databaseName + ".payload_legacy\n" +
"UNION ALL SELECT * FROM " + databaseName +".payload_aggregated\n" +
"UNION ALL SELECT * FROM " + databaseName + ".payload_bulk_import");
// We do not do the "compute stats" for the view, since we get the following error: "COMPUTE STATS not supported for view: pdfaggregationdatabase_payloads_view.payload".
logger.info("The " + (isTestEnvironment ? "TEST-" : "") + "database \"" + databaseName + "\" and its tables were created or validated.");
}

    public static String handleQueryException(String queryName, String query, Exception e) {
        String errorMsg = "Problem when executing the query \"" + queryName + "\"!\n";
        logger.error(errorMsg + "\n\n" + query + "\n\n", e);
        return errorMsg;
    }
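
    /**
     * Illustrative sketch (not part of the original class): how callers are expected to combine the
     * "databaseLock", the JdbcTemplate and "handleQueryException()". The "assignment" table exists in the
     * schema created above, but this method itself is a hypothetical example and is not used anywhere.
     */
    public Long getAssignmentsCountExample() {
        final String getCountQuery = "SELECT count(id) FROM " + databaseName + ".assignment";
        databaseLock.lock();    // Serialize access to the database, as the comment on the lock describes.
        try {
            return jdbcTemplate.queryForObject(getCountQuery, Long.class);
        } catch (Exception e) {
            handleQueryException("getCountQuery", getCountQuery, e);    // Log the failure in a uniform way.
            return null;
        } finally {
            databaseLock.unlock();  // Always release the lock, even when the query fails.
        }
    }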
}