145 lines
8.2 KiB
Java
145 lines
8.2 KiB
Java
package eu.openaire.urls_controller.configuration;
|
|
|
|
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import javax.annotation.PostConstruct;
import java.sql.Connection;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
@Repository
|
|
public class DatabaseConnector {
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(DatabaseConnector.class);
|
|
|
|
@Autowired
|
|
private JdbcTemplate jdbcTemplate;
|
|
|
|
private final boolean isTestEnvironment;
|
|
private final String initialDatabaseName;
|
|
private final String testDatabaseName;
|
|
|
|
public static String databaseName;
|
|
|
|
public static final Lock databaseLock = new ReentrantLock(true); // This lock is locking the threads trying to execute queries in the database.
|
|
|
|
|
|
public DatabaseConnector(@Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment,
|
|
@Value("${services.pdfaggregation.controller.db.initialDatabaseName}") String initialDatabaseName,
|
|
@Value("${services.pdfaggregation.controller.db.testDatabaseName}") String testDatabaseName) {
|
|
this.isTestEnvironment = isTestEnvironment;
|
|
this.initialDatabaseName = initialDatabaseName;
|
|
this.testDatabaseName = testDatabaseName;
|
|
}
|
|
|
|
|
|
@PostConstruct
|
|
public void init() {
|
|
int OneMb = 1048576;
|
|
logger.info("Max available memory to the Controller: " + (Runtime.getRuntime().maxMemory() / OneMb) + " Mb.");
|
|
try {
|
|
boolean supportsBatchUpdates = jdbcTemplate.getDataSource().getConnection().getMetaData().supportsBatchUpdates();
|
|
logger.info("The database " + (supportsBatchUpdates ? "supports" : "does not support") + " \"BatchUpdates\"!");
|
|
} catch (Exception e) {
|
|
logger.error("Error testing if database supports batch updates!", e);
|
|
}
|
|
|
|
createDatabase(); // In case of an exception, the App will exit with the stacktrace.
|
|
}
|
|
|
|
|
|
private void createDatabase()
|
|
{
|
|
if ( isTestEnvironment ) {
|
|
logger.info("Going to create (if not exist) the TEST-database \"" + testDatabaseName + "\" and its tables. Also will fill some tables with data from the initial-database \"" + initialDatabaseName + "\".");
|
|
jdbcTemplate.execute("CREATE DATABASE IF NOT EXISTS " + testDatabaseName);
|
|
|
|
try { // Metastore takes some time to recognize the DB has been created, in order to use it later..
|
|
Thread.sleep(1000);
|
|
} catch (InterruptedException ignore) {}
|
|
|
|
jdbcTemplate.update("INVALIDATE METADATA");
|
|
|
|
try { // Metastore takes some time to recognize the DB has been created, in order to use it later..
|
|
Thread.sleep(1000);
|
|
} catch (InterruptedException ignore) {}
|
|
|
|
// Create VIEWs of the original data. We just READ from it, so it's safe for our testing environment..
|
|
|
|
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".publication as select * from " + initialDatabaseName + ".publication");
|
|
|
|
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".publication_pids as select * from " + initialDatabaseName + ".publication_pids");
|
|
|
|
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".publication_urls as select * from " + initialDatabaseName + ".publication_urls");
|
|
|
|
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".publication_boost as select * from " + initialDatabaseName + ".publication_boost");
|
|
|
|
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".datasource as select * from " + initialDatabaseName + ".datasource");
|
|
|
|
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".payload_legacy as select * from " + initialDatabaseName + ".payload_legacy");
|
|
|
|
databaseName = testDatabaseName;
|
|
} else {
|
|
logger.info("Going to create or validate the tables that are populated by the Controller, for the \"initialDatabase\" = \"" + initialDatabaseName + "\"");
|
|
// Note that for the "initialDatabase", the initial 5 tables are expected to be created either manually or by other pieces of software, as views of the contents of the Graph.
|
|
databaseName = initialDatabaseName;
|
|
}
|
|
|
|
// For both cases, got check and create the tables which will be populated by the Controller.
|
|
|
|
// Drop the "current_assignment" table. It is a temporary table which is created on-demand during execution.
|
|
jdbcTemplate.execute("DROP TABLE IF EXISTS " + databaseName + ".current_assignment PURGE");
|
|
|
|
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, assignments_batch_counter bigint, `date` bigint) stored as parquet");
|
|
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".assignment");
|
|
|
|
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` bigint, status string, error_class string, error_message string) stored as parquet");
|
|
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".attempt");
|
|
|
|
// Create a VIEW "payload" which consists of 3 different tables:
|
|
// 1) The "payload_legacy" table, which contains data older than 2022, which is imported by previous full-text aggregation processes.
|
|
// 2) The "payload_aggregated" table, which contains data from 2022 onwards, collected by the new PDF-Aggregation-Service.
|
|
// 3) The "payload_bulk_import", which contains data collected from the bulk-imported content from datasources like "arXiv".
|
|
|
|
// So, each aggregation process will "load" its contents to the right table, but in order to get the "total" metrics, we can just query the "payload" view.
|
|
|
|
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload_aggregated (id string, original_url string, actual_url string, `date` bigint, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
|
|
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload_aggregated");
|
|
|
|
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload_bulk_import (id string, original_url string, actual_url string, `date` bigint, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
|
|
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload_bulk_import");
|
|
|
|
|
|
try { // Metastore takes some time to recognize the tables have been created, in order to use them in the view.
|
|
Thread.sleep(1000);
|
|
} catch (InterruptedException ignore) {}
|
|
|
|
jdbcTemplate.update("INVALIDATE METADATA");
|
|
|
|
try { // Metastore takes some time to recognize the tables have been created, in order to use them in the view.
|
|
Thread.sleep(1000);
|
|
} catch (InterruptedException ignore) {}
|
|
|
|
|
|
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + databaseName + ".payload\n" +
|
|
"AS SELECT * from " + databaseName + ".payload_legacy\n" +
|
|
"UNION ALL SELECT * FROM " + databaseName +".payload_aggregated\n" +
|
|
"UNION ALL SELECT * FROM " + databaseName + ".payload_bulk_import");
|
|
// We do not do the "compute stats" for the view, since we get the following error: "COMPUTE STATS not supported for view: pdfaggregationdatabase_payloads_view.payload".
|
|
|
|
logger.info("The " + (isTestEnvironment ? "TEST-" : "") + "database \"" + databaseName + "\" and its tables were created or validated.");
|
|
}
|
|
|
|
|
|
public static String handleQueryException(String queryName, String query, Exception e) {
|
|
String errorMsg = "Problem when executing the query \"" + queryName + "\"!\n";
|
|
logger.error(errorMsg + "\n\n" + query + "\n\n", e);
|
|
return errorMsg;
|
|
}
|
|
|
|
}
|