- Implement the "getUrls" and "addWorkerReport" endpoints with full database-handling.

- Add connectivity with an Impala-database and create a dedicated Controller for future statistics-requests.
- Optimize the "getTestUrls"-endpoint.
- Disable the "reportCurrentTime()" scheduled-task.
- Update dependencies and bump project's version to '1.0.0-SNAPSHOT'.
- Set the logging-appender to "File".
- Code cleanup.
This commit is contained in:
Lampros Smyrnaios 2021-11-09 23:59:27 +02:00
parent 0d47c33a08
commit d100af35d0
9 changed files with 839 additions and 43 deletions

View File

@ -5,11 +5,20 @@ plugins {
}
group = 'eu.openaire.urls_controller'
version = '0.0.1-SNAPSHOT'
version = '1.0.0-SNAPSHOT'
sourceCompatibility = '1.8'
repositories {
mavenCentral()
maven {
name "icm"
url "http://maven.icm.edu.pl/artifactory/repo/"
allowInsecureProtocol = true
}
maven {
name "pentaho-repo"
url "https://public.nexus.pentaho.org/content/groups/omni/"
}
}
dependencies {
@ -32,13 +41,18 @@ dependencies {
implementation "org.projectlombok:lombok:1.18.22"
// https://mvnrepository.com/artifact/com.google.guava/guava
implementation group: 'com.google.guava', name: 'guava', version: '30.1.1-jre'
implementation group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
implementation "org.projectlombok:lombok:1.18.20"
implementation group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final'
// https://mvnrepository.com/artifact/com.cloudera.impala/jdbc
implementation group: 'com.cloudera.impala', name: 'jdbc', version: '2.5.31'
testImplementation group: 'org.springframework.security', name: 'spring-security-test', version: springSecurityVersion
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation group: 'org.springframework.security', name: 'spring-security-test'
testImplementation "org.springframework.boot:spring-boot-starter-test"
}
configurations {
// Eliminates slf4j-log4j12
all*.exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
test {

View File

@ -1,6 +1,7 @@
package eu.openaire.urls_controller;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.util.UriBuilder;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
@ -12,6 +13,7 @@ import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.CorsConfigurationSource;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import javax.annotation.PreDestroy;
import java.util.Arrays;
import java.util.Collections;
@ -36,6 +38,14 @@ public class Application {
}
@PreDestroy
public static void preDestroy()
{
if ( ImpalaConnector.hikariDataSource != null )
ImpalaConnector.hikariDataSource.close();
}
@Bean
public CommandLineRunner setServerBaseUrl(Environment environment)
{

View File

@ -16,7 +16,7 @@ public class ScheduledTasks {
private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
@Scheduled(fixedRate = 600_000) // TODO - Change to every 10 mins: 600_000
//@Scheduled(fixedRate = 600_000) // Run every 10 mins: 600_000
public void reportCurrentTime() {
logger.info("Server is live! Time is now {}", dateFormat.format(new Date()));
}

View File

@ -0,0 +1,239 @@
package eu.openaire.urls_controller.configuration;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.beans.PropertyVetoException;
import java.io.File;
import java.io.FileReader;
import java.sql.*;
import java.util.Properties;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public final class ImpalaConnector {
private static final Logger logger = LoggerFactory.getLogger(ImpalaConnector.class);
public static String impalaDriver;
public static String impalaConnectionUrl;
public static String poolName;
public static int hikariMaxConnectionPoolSize;
public static int hikariMinIdleConnections;
public static int hikariConnectionTimeOut;
public static int hikariIdleTimeOut;
public static int hikariMaxLifetime;
public static String oldDatabaseName = "pdfaggregation_i";
public static String databaseName = "pdfAggregationDatabase";
public static final Lock databaseLock = new ReentrantLock(true); // This lock is locking the threads trying to execute queries in the database.
public static HikariDataSource hikariDataSource;
private static final ImpalaConnector singletonObject = new ImpalaConnector();
public static ImpalaConnector getInstance()
{
return singletonObject;
}
public HikariDataSource impalaDS() throws SQLException, PropertyVetoException
{
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setDriverClassName(ImpalaConnector.impalaDriver);
hikariConfig.setAutoCommit(true);
hikariConfig.setJdbcUrl(ImpalaConnector.impalaConnectionUrl);
hikariConfig.setPoolName(poolName);
hikariConfig.setMaximumPoolSize(hikariMaxConnectionPoolSize);
hikariConfig.setMaxLifetime(hikariMaxLifetime);
hikariConfig.setMinimumIdle(hikariMinIdleConnections);
hikariConfig.setConnectionTimeout(hikariConnectionTimeOut);
hikariConfig.setIdleTimeout(hikariIdleTimeOut);
return new HikariDataSource(hikariConfig);
}
public ImpalaConnector()
{
logger.info("Max available memory to the Controller: " + Runtime.getRuntime().maxMemory() + " bytes.");
try {
String dbSettingsPropertyFile = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "application.properties";
FileReader fReader = new FileReader(dbSettingsPropertyFile);
Properties props = new Properties();
props.load(fReader); // Load jdbc related properties.
// Get each property value.
impalaDriver = props.getProperty("spring.impala.driver-class-name");
if ( !"".equals(impalaDriver) ) { // If not "null" or empty.
Class.forName(impalaDriver);
impalaConnectionUrl = props.getProperty("spring.impala.url");
poolName = props.getProperty("spring.datasource.hikari.pool-name");
hikariMaxConnectionPoolSize = Integer.parseInt(props.getProperty("spring.datasource.hikari.maximumPoolSize"));
hikariMaxLifetime = Integer.parseInt(props.getProperty("spring.datasource.hikari.maxLifetime"));
hikariMinIdleConnections = Integer.parseInt(props.getProperty("spring.datasource.hikari.minimumIdle"));
hikariConnectionTimeOut = Integer.parseInt(props.getProperty("spring.datasource.hikari.connectionTimeout"));
hikariIdleTimeOut = Integer.parseInt(props.getProperty("spring.datasource.hikari.idleTimeout"));
} else
throw new RuntimeException("The \"impalaDriver\" was null or empty!");
} catch(Exception e) {
logger.error("Error when loading the database properties!\n" + e.getMessage());
e.printStackTrace();
System.exit(11);
}
try {
hikariDataSource = impalaDS();
} catch (SQLException | PropertyVetoException e) {
logger.error("Problem when creating the Hikari connection pool!");
e.printStackTrace();
}
createDatabase();
}
public void createDatabase()
{
databaseLock.lock(); // Make sure the database and tables are created before the workers can request assignments.
Connection con = getConnection();
if ( con == null ) {
databaseLock.unlock();
System.exit(22);
}
try {
if ( !con.getMetaData().supportsBatchUpdates() )
logger.warn("The database does not support \"BatchUpdates\"!");
} catch (SQLException e) {
e.printStackTrace();
}
logger.info("Going to create the database and the tables, if they do not exist. Also will fill some tables with data from OpenAIRE.");
Statement statement = null;
try {
statement = con.createStatement();
} catch (SQLException sqle) {
logger.error("Problem when creating a connection-statement!\n" + sqle.getMessage());
try {
con.close();
} catch (SQLException sqle2) {
logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage());
}
databaseLock.unlock();
System.exit(33);
}
try {
statement.execute("CREATE DATABASE IF NOT EXISTS " + databaseName);
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication stored as parquet as select * from " + oldDatabaseName + ".publication");
statement.execute("COMPUTE STATS " + databaseName + ".publication");
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_pids stored as parquet as select * from " + oldDatabaseName + ".publication_pids");
statement.execute("COMPUTE STATS " + databaseName + ".publication_pids");
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_urls stored as parquet as select * from " + oldDatabaseName + ".publication_urls");
statement.execute("COMPUTE STATS " + databaseName + ".publication_urls");
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".datasource stored as parquet as select * from " + oldDatabaseName + ".datasource");
statement.execute("COMPUTE STATS " + databaseName + ".datasource");
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet");
statement.execute("COMPUTE STATS " + databaseName + ".assignment");
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet");
statement.execute("COMPUTE STATS " + databaseName + ".attempt");
statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload (id string, original_url string, actual_url string, `date` timestamp, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
statement.execute("COMPUTE STATS " + databaseName + ".payload");
} catch (SQLException sqle) {
logger.error("Problem when executing the \"create database and create tables queries!\n" + sqle.getMessage() + "\nSQL state: " + sqle.getSQLState() + "\nError code: " + sqle.getErrorCode());
sqle.printStackTrace();
System.exit(44);
} finally {
databaseLock.unlock();
try {
statement.close();
con.close();
} catch (SQLException sqle2) {
logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage());
}
}
logger.info("The database \"" + databaseName + "\" and its tables were created or validated.");
}
public Connection getConnection()
{
try {
return hikariDataSource.getConnection();
//return DriverManager.getConnection(impalaConnectionUrl, null, null); // This is for non pooled connections.
} catch (SQLException sqle) {
logger.error("Problem when connecting with the Impala-database!\n" + sqle.getMessage());
return null;
}
}
public boolean testDatabaseAccess()
{
logger.info("Going to test Impala access..");
Connection con = getConnection();
if ( con == null )
return false;
ResultSet res = null;
try {
String tableName = "publication";
// show tables
String sql = "show tables '" + tableName + "'";
logger.debug("Running: " + sql);
res = con.prepareStatement(sql).executeQuery();
if ( res.next() ) {
logger.debug(res.getString(1));
}
// describe table
sql = "describe " + tableName;
logger.debug("Running: " + sql);
res = con.prepareStatement(sql).executeQuery();
while ( res.next() ) {
logger.debug(res.getString(1) + "\t" + res.getString(2));
}
// select * query
sql = "select * from " + tableName + " limit 3;";
logger.debug("Running: " + sql);
res = con.prepareStatement(sql).executeQuery();
while ( res.next() ) {
logger.debug(res.getString(1));
}
// Get Assignments, only for testing here.
//UrlController urlController = new UrlController();
//ResponseEntity<?> responseEntity = urlController.getUrls("worker_1", ControllerConstants.ASSIGNMENTS_LIMIT);
//logger.debug(responseEntity.toString());
} catch (SQLException sqle) {
sqle.printStackTrace();
return false;
} finally {
try {
if ( res != null )
res.close();
con.close();
} catch (SQLException sqle) {
logger.error("Could not close the connection with the Impala-database.\n" + sqle);
}
}
return true;
}
}

View File

@ -0,0 +1,64 @@
package eu.openaire.urls_controller.controllers;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
@RestController
@RequestMapping("/impala")
public class ImpalaController {
// This controller will test the connectivity with the database and return statistics!
private static final Logger logger = LoggerFactory.getLogger(ImpalaController.class);
@GetMapping("get10PublicationIdsTest")
public ResponseEntity<?> get10PublicationIdsTest() {
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
String query = "SELECT id FROM publication LIMIT 10;";
try ( ResultSet res = con.prepareStatement(query).executeQuery()) {
if ( !res.first() ) {
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\"!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
List<String> publications = new ArrayList<>();
do {
publications.add(res.getString(0));
} while ( res.next() );
return new ResponseEntity<String>(publications.toString(), HttpStatus.OK);
} catch (Exception e) {
String errorMsg = "Problem when executing \"getAssignmentsQuery\": " + query;
logger.error(errorMsg);
e.printStackTrace();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
try {
con.close();
} catch (SQLException sqle) {
logger.error("Could not close the connection with the Impala-database.\n" + sqle);
}
}
}
}

View File

@ -1,12 +1,11 @@
package eu.openaire.urls_controller.controllers;
import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.models.Assignment;
import eu.openaire.urls_controller.models.Datasource;
import eu.openaire.urls_controller.models.Task;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.payloads.responces.AssignmentResponse;
//import eu.openaire.urls_controller.repositories.AssignmentRepository;
import eu.openaire.urls_controller.util.ControllerConstants;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
@ -16,6 +15,9 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.sql.*;
import java.sql.Date;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
@ -25,72 +27,520 @@ public class UrlController {
private static final Logger logger = LoggerFactory.getLogger(UrlController.class);
private static AtomicLong assignmentCounter = new AtomicLong(0); // Just for the "getTestUrls"-endpoint.
}
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); // Just for the "getTestUrls"-endpoint.
@GetMapping("")
public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
List<Task> tasks = new ArrayList<>();
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);
// TODO - Create the Assignment from the id-urls stored in the database up to the tasks-limit.
// Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
// TODO - Make sure the Date is the same for all entries!
Date date = new Date(); // Store it here, in order to have the same for sure.
// Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server.
int assignmentsLimit = workerAssignmentsLimit;
if ( assignmentsLimit == 0 ) {
String errorMsg = "The given \"workerAssignmentsLimit\" was ZERO!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
} else if ( assignmentsLimit > ControllerConstants.ASSIGNMENTS_LIMIT ) {
logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + ControllerConstants.ASSIGNMENTS_LIMIT + "). Will use the Controller's limit.");
assignmentsLimit = ControllerConstants.ASSIGNMENTS_LIMIT;
}
int assignmentsLimit = ControllerConstants.ASSIGNMENTS_LIMIT;
if ( assignmentsLimit > workerAssignmentsLimit )
assignmentsLimit = workerAssignmentsLimit;
String getAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
"from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" +
"select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
"from " + ImpalaConnector.databaseName + ".publication p\n" +
"join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
"join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
"left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" +
"left outer join (\n" +
" select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
" union all\n" +
" select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" +
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= ? and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry')\n" +
") as non_distinct_results\n" +
"order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
"limit ?) as getAssignmentsQuery";
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
int tasksLimitForAssignment = ControllerConstants.ASSIGNMENTS_LIMIT;
if ( tasksLimitForAssignment > workerTasksLimit )
tasksLimitForAssignment = workerTasksLimit;
// TODO - If we add more limits it could be faster.. Inner queries could have a limit of e.g. < assignmentsLimit ^ 2 >
// The LIMIT of < assignmentsLimit > should be kept in the end, as we want 10_000 of distinct results.
List<Assignment> assignments = null; // TODO -> // assignmentRepository.getNewAssignments(tasksLimitForAssignment);
// This is just for tests without the attempts, payloads and the assignments
/*String getAssignmentsQuery = "select * from (select distinct pubid, url, datasourceid, datasourcetype from (\n" +
"select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype\n" +
"from " + ImpalaConnector.databaseName + ".publication p\n" +
"join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
"join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
"where d.allow_harvest=true " +
"order by reverse(p.id), pu.url) as distinct_results\n" +
"limit ? ) as getAssignmentsQuery";*/
//Assignment assignment = new Assignment(assignmentId, tasks, workerId, date);
List<Assignment> assignments = new ArrayList<>(assignmentsLimit);
// TODO - Write the Assignment details to the database and then send it to the worker.
ImpalaConnector.databaseLock.lock();
logger.info("Sending assignment_" + assignmentCounter.incrementAndGet() + " to worker with ID: " + workerId);
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null ) { // This is already logged in "getConnection()".
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
}
return ResponseEntity.status(200).header("Content-Type", "application/json").body(new AssignmentResponse(assignmentCounter.get(), assignments));
PreparedStatement getAssignmentsPreparedStatement = null;
try {
getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery);
getAssignmentsPreparedStatement.setInt(1, ControllerConstants.MAX_ATTEMPTS_PER_RECORD);
getAssignmentsPreparedStatement.setInt(2, assignmentsLimit);
} catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock();
String errorMsg = "Problem when creating the prepared statement for \"getAssignmentsQuery\"!\n";
logger.error(errorMsg + sqle.getMessage());
try {
if ( getAssignmentsPreparedStatement != null )
getAssignmentsPreparedStatement.close();
} catch (SQLException sqle2) {
logger.error("Could not close the \"getAssignmentsPreparedStatement\".\n" + sqle2.getMessage());
}
try {
con.close();
} catch (SQLException sqle2) {
logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage());
}
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
Date date = new Date(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
try ( ResultSet resultSet = getAssignmentsPreparedStatement.executeQuery() ) {
// Unfortunately, we cannot use the following as the used version of the Impala-driver does not support it.
/*if ( !resultSet.first() ) {
ImpalaConnector.databaseLock.unlock();
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId;
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}*/
// The cursor is automatically before the first element in this configuration.
while ( resultSet.next() ) {
// The following few lines, cannot be outside the "while" loop, since the same record is returned, despite that we update the inner-values.
Assignment assignment = new Assignment();
assignment.setWorkerId(workerId);
assignment.setDate(date);
Datasource datasource = new Datasource();
try { // For each of the 4 columns returned. The indexing starts from 1
assignment.setId(resultSet.getString(1));
assignment.setOriginalUrl(resultSet.getString(2));
datasource.setId(resultSet.getString(3));
datasource.setName(resultSet.getString(4));
} catch (SQLException sqle) {
logger.error("No value was able to be retrieved from one of the columns of row_" + resultSet.getRow());
sqle.printStackTrace();
continue; // This object is broken, move to the next row.
}
assignment.setDatasource(datasource);
assignments.add(assignment);
}
} catch (Exception e) {
ImpalaConnector.databaseLock.unlock();
String errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
logger.error(errorMsg, e);
try {
con.close();
} catch (SQLException sqle2) {
logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage());
}
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
try {
getAssignmentsPreparedStatement.close();
} catch (SQLException sqle) {
logger.error("Failed to close the \"getAssignmentsPreparedStatement\"!\n" + sqle.getMessage());
}
}
int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 ) {
ImpalaConnector.databaseLock.unlock();
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId;
logger.error(errorMsg);
try {
con.close();
} catch (SQLException sqle2) {
logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage());
}
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");
// The following is a test of inserting multiple rows with a singme insert-query. If applied with a preparedStatement, then the JDBC fails with "OutOfMemory"-Error.
/*String testInsert = "INSERT INTO assignment (id,original_url,workerid,`date`) VALUES ( 'doiboost____::4e8b1f12ac3ba5a9d8fbff9872000000', 'http://dx.doi.org/10.17267/2596-3368dentistry.v6i2.586', 'worker_1', CAST('2021-10-01' AS TIMESTAMP) ) , ( 'doiboost____::4e8b1f12ac3ba5a9d8fbff9872000000', 'https://academic.microsoft.com/#/detail/2887540302', 'worker_1', CAST('2021-10-01' AS TIMESTAMP) );";
try (Statement insertStatement = con.createStatement()) {
insertStatement.execute(testInsert);
} catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock();
String mergeErrorMsg = "Problem when executing the testInsert statement for \"" + testInsert + "\"";
logger.error(mergeErrorMsg + sqle.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}*/
// Write the Assignment details to the database and then send it to the worker.
String insertIntoAssignmentBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".assignment (id, original_url, workerid, date) VALUES (?, ?, ?, ?)";
PreparedStatement preparedInsertAssignmentStatement;
try { // We use a "PreparedStatement" to do insertions, for security and performance reasons.
preparedInsertAssignmentStatement = con.prepareStatement(insertIntoAssignmentBaseQuery);
} catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock();
String errorMsg = "Problem when creating the prepared statement for \"insertIntoAssignmentBaseQuery\"!\n";
logger.error(errorMsg + sqle.getMessage());
try {
con.close();
} catch (SQLException sqle2) {
logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage());
}
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
// Before, we wanted to execute the getAssignmentQuery and take the assignments immediately, but now it's more efficient to commit all the inserts in the end.
try {
con.setAutoCommit(false);
} catch (SQLException sqle) { // There is a database-error. The latter actions will probably fail as well.
ImpalaConnector.databaseLock.unlock();
String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!";
logger.error(errorMsg + "\n" + sqle.getMessage());
closePreparedStatements(preparedInsertAssignmentStatement, null, con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
String tempFullQueryString = null;
for ( Assignment assignment : assignments ) {
try {
preparedInsertAssignmentStatement.setString(1, assignment.getId());
preparedInsertAssignmentStatement.setString(2, assignment.getOriginalUrl());
preparedInsertAssignmentStatement.setString(3, workerId);
preparedInsertAssignmentStatement.setDate(4, date);
tempFullQueryString = getAssignmentsPreparedStatement.toString();
preparedInsertAssignmentStatement.executeUpdate();
} catch (SQLException sqle) {
logger.error("Problem when executing the \"insertIntoAssignmentQuery\":\n" + tempFullQueryString + "\n" + sqle.getMessage() + "\n\n");
}
}//end for-loop
try {
con.commit(); // Send all the insert-queries to the database.
} catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock();
String errorMsg = "Problem when committing changes to the database!";
logger.error(errorMsg + "\n" + sqle.getMessage());
closePreparedStatements(preparedInsertAssignmentStatement, null, con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
String mergeErrorMsg = mergeParquetFiles("assignment", con);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
closePreparedStatements(preparedInsertAssignmentStatement, null, con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
try {
con.commit(); // Apply the merge.
con.setAutoCommit(true); // Restore the "auto-commit" value for this connection of the pool.
} catch (SQLException sqle) {
String errorMsg = "Problem when committing changes to the database!";
logger.error(errorMsg , sqle);//+ "\n" + sqle.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
ImpalaConnector.databaseLock.unlock();
closePreparedStatements(preparedInsertAssignmentStatement, null, con);
}
logger.info("Sending batch_" + assignmentsBatchCounter.incrementAndGet() + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
return ResponseEntity.status(HttpStatus.OK).body(new AssignmentResponse(assignmentsBatchCounter.get(), assignments));
}
@PostMapping("addWorkerReport")
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport) {
if ( workerReport == null )
return ResponseEntity.status(HttpStatus.BAD_REQUEST).build();
if ( workerReport == null ) {
String errorMsg = "No \"WorkerReport\" was given!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
}
logger.debug("Received the WorkerReport:\n" + workerReport.toString());
List<UrlReport> urlReports = workerReport.getUrlReports();
if ( (urlReports == null) || urlReports.isEmpty() ) {
String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + workerReport.getWorkerId() + "\" was empty!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
}
// TODO - Store the workerReport into the database.
logger.info("Received the WorkerReport for batch_ " + workerReport.getAssignmentRequestCounter() + ", from the worker with id: " + workerReport.getWorkerId() + ". It contains " + urlReports.size() + " urlReports. Going to insert them into the database.");
return ResponseEntity.status(HttpStatus.OK).build();
// TODO - The Controller will have to request the files from the Worker, in order to upload them to the S3, in the future.
// We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
// Store the workerReport into the database.
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
String tempInsertQueryName = null;
PreparedStatement preparedInsertPayloadStatement = null, preparedInsertAttemptStatement = null;
try {
tempInsertQueryName = "insertIntoPayloadBaseQuery";
preparedInsertPayloadStatement = con.prepareStatement(insertIntoPayloadBaseQuery);
tempInsertQueryName = "insertIntoAttemptBaseQuery";
preparedInsertAttemptStatement = con.prepareStatement(insertIntoAttemptBaseQuery);
} catch (SQLException sqle) {
String errorMsg = "Problem when creating the prepared statement for \"" + tempInsertQueryName + "\"!\n";
logger.error(errorMsg + sqle.getMessage());
closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
try {
con.setAutoCommit(false);
} catch (SQLException sqle) {
String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!";
logger.error(errorMsg + "\n" + sqle.getMessage());
closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
String payloadErrorMsg = null;
int failedCount = 0;
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.error("Payload was \"null\" for a \"urlReport\"!");
// TODO - A bit rare to happen.. but what should we do? (otherwise an NPE will be thrown later)
// We can't just create an empty object and proceed, since the payload is as important as the whole row to be inserted..
// TODO - Just add it in an errorMessage to be returned in the end. Should it return HTTP-200 but with a small error message along?
payloadErrorMsg = (++failedCount) + " urlReports failed to be processed because they had no payload!";
continue;
}
String tempFullQueryString = null;
try { // We use a "PreparedStatement" to do insertions, for security reasons.
preparedInsertPayloadStatement.setString(1, payload.getId());
preparedInsertPayloadStatement.setString(2, payload.getOriginal_url());
preparedInsertPayloadStatement.setString(3, payload.getActual_url());
preparedInsertPayloadStatement.setDate(4, payload.getDate_acquired());
preparedInsertPayloadStatement.setString(5, payload.getMime_type());
// The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers.
preparedInsertPayloadStatement.setString(6, String.valueOf(payload.getSize()));
preparedInsertPayloadStatement.setString(7, payload.getHash());
preparedInsertPayloadStatement.setString(8, payload.getLocation());
preparedInsertPayloadStatement.setString(9, payload.getProvenance());
tempFullQueryString = preparedInsertPayloadStatement.toString();
preparedInsertPayloadStatement.executeUpdate();
} catch (SQLException sqle) {
logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\":\n" + tempFullQueryString + "\n" + sqle.getMessage() + "\n\n");
}
Error error = urlReport.getError();
if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of the loop)
logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
error = new Error(null, null);
}
try { // We use a "PreparedStatement" to do insertions, for security reasons.
preparedInsertAttemptStatement.setString(1, payload.getId());
preparedInsertAttemptStatement.setString(2, payload.getOriginal_url());
preparedInsertAttemptStatement.setDate(3, payload.getDate_acquired());
preparedInsertAttemptStatement.setString(4, urlReport.getStatus().toString());
preparedInsertAttemptStatement.setString(5, String.valueOf(error.getType())); // This covers the case of "null".
preparedInsertAttemptStatement.setString(6, error.getMessage());
tempFullQueryString = preparedInsertAttemptStatement.toString();
preparedInsertAttemptStatement.executeUpdate();
} catch (SQLException sqle) {
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\":\n" + tempFullQueryString + "\n" + sqle.getMessage() + "\n\n");
}
}//end for-loop
ImpalaConnector.databaseLock.lock();
try {
con.commit(); // Send all the insert-queries to the database.
} catch (SQLException sqle) {
String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!";
logger.error(errorMsg + "\n" + sqle.getMessage());
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, null); // Do not close the connection here!
}
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables.");
String mergeErrorMsg = mergeParquetFiles("payload", con);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
try { con.close(); }
catch (SQLException sqle) { logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage()); }
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
mergeErrorMsg = mergeParquetFiles("attempt", con);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
try { con.close(); }
catch (SQLException sqle) { logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage()); }
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
try {
con.commit(); // Apply the merges.
con.setAutoCommit(true); // Restore the "auto-commit" value for this connection of the pool.
} catch (SQLException sqle) {
String errorMsg = "Problem when committing changes to the database!";
logger.error(errorMsg + "\n" + sqle.getMessage());
// The statements used in "mergeParquetFiles()" are already closed.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
ImpalaConnector.databaseLock.unlock();
try { con.close(); }
catch (SQLException sqle) { logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage()); }
}
return ResponseEntity.status(HttpStatus.OK).body(payloadErrorMsg);
}
/**
* In each insertion, a new parquet-file is created, so we end up with millions of files. Parquet is great for fast-select, so have to stick with it and merge those files..
* This method, creates a clone of the original table in order to have only one parquet file in the end. Drops the original table.
* Renames the clone to the original's name.
* Returns the errorMsg, if an error appears, otherwise is returns "null".
* */
private static String mergeParquetFiles(String tableName, Connection con)
{
String errorMsg;
if ( tableName == null ) {
errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!";
logger.error(errorMsg);
return errorMsg;
}
Statement statement;
try {
statement = con.createStatement();
} catch (SQLException sqle) {
errorMsg = "Problem when creating a connection-statement!\n";
logger.error(errorMsg + sqle.getMessage());
return errorMsg;
}
try {
statement.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName);
statement.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE");
statement.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName);
statement.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName);
} catch (SQLException sqle) {
errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n";
logger.error(errorMsg + getCutBatchExceptionMessage(sqle.getMessage()));
sqle.printStackTrace();
return errorMsg;
} finally {
// Make sure we close the statement.
try { statement.close(); }
catch (SQLException sqle3) { logger.error("Could not close the statement for executing queries in the Impala-database.\n" + sqle3); }
}
return null; // No errorMsg, everything is fine.
}
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException
{
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
sb.append(baseInsertQuery);
for ( int i=1; i <= dataSize; ++i ) {
sb.append("(");
for ( int j=1; j <= numParamsPerRow; ++j ) {
sb.append("?");
if ( j < numParamsPerRow )
sb.append(",");
}
sb.append(")");
if ( i < dataSize )
sb.append(",");
}
PreparedStatement preparedInsertStatement;
try { // We use a "PreparedStatement" to do insertions, for security reasons.
preparedInsertStatement = con.prepareStatement(sb.toString());
} catch (SQLException sqle) {
String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n";
logger.error(errorMsg + sqle.getMessage());
throw new RuntimeException(errorMsg);
}
return preparedInsertStatement;
}
private static String getCutBatchExceptionMessage(String sqleMessage)
{
// The sqleMessage contains the actual message followed by the long batch. This makes the logs unreadable. So we should shorten the message before logging.
int maxEnding = 1500;
if ( sqleMessage.length() > maxEnding )
return (sqleMessage.substring(0, maxEnding) + "...");
else
return sqleMessage;
}
private boolean closePreparedStatements(PreparedStatement preparedStatement1, PreparedStatement preparedStatement2, Connection con) {
try {
if ( preparedStatement1 != null )
preparedStatement1.close();
if ( preparedStatement2 != null )
preparedStatement2.close();
if ( con != null )
con.close(); // It may have already closed and that's fine.
return true;
}
catch (SQLException sqle) {
logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage());
return false;
}
}
@GetMapping("test")
public ResponseEntity<?> getTestUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " test-assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);
try {
new FileUtils(); // Find the input file.
} catch (Exception e) {
logger.error(e.getMessage());
return ResponseEntity.status(500).body("The resource file, for the requested assignments, was not found.");
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The resource file, for the requested assignments, was not found.");
}
List<Assignment> assignments = new ArrayList<>();
HashMultimap<String, String> loadedIdUrlPairs;
boolean isFirstRun = true;
boolean assignmentsLimitReached = false;
Date date = new Date();
Date date = new Date(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
// Start loading urls.
while ( true ) {
@ -120,12 +570,13 @@ public class UrlController {
}
}// end loading-while-loop
if ( FileUtils.inputScanner.get() != null ) // Check if the initial value is null.
FileUtils.inputScanner.get().close();
Scanner scanner = FileUtils.inputScanner.get();
if ( scanner != null ) // Check if the initial value is null.
scanner.close();
logger.info("Sending AssignmentResponse_" + assignmentCounter.incrementAndGet() + " with " + assignments.size() + " assignments (" + FileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId);
logger.info("Sending batch_" + assignmentsBatchCounter.incrementAndGet() + " with " + assignments.size() + " assignments (" + FileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId);
return ResponseEntity.status(200).header("Content-Type", "application/json").body(new AssignmentResponse(assignmentCounter.get(), assignments));
return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentResponse(assignmentsBatchCounter.get(), assignments));
}
}

View File

@ -3,6 +3,8 @@ package eu.openaire.urls_controller.util;
public interface ControllerConstants {
int ASSIGNMENTS_LIMIT = 10000; // The general assignments-limit the Controller will get. If the worker cannot handle them, then the worker's limit will be applied.
int ASSIGNMENTS_LIMIT = 100_000; // The upper assignments-limit the Controller can handle. If the worker's limit is above this one, then the controller's limit is used. Otherwise, the worker's limit will be applied.
int MAX_ATTEMPTS_PER_RECORD = 3; // The maximum times a record can be processed, if each of the previous times failed with a "couldRetry" Error-Class.
}

View File

@ -15,6 +15,22 @@ server.port = 1880
# Server api path
server.servlet.context-path=/api
# Database
spring.impala.url = jdbc:impala://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/
# Note: The "UseNativeQuery" does not work with the PreparedStatements! Also, the url does not work without the ending "/"
# The username and the password do not matter, since this app is always run in an pre-authenticated machine.
spring.impala.driver-class-name = com.cloudera.impala.jdbc41.Driver
spring.datasource.hikari.pool-name=ControllerPool
spring.datasource.hikari.maximumPoolSize=20
spring.datasource.hikari.maxLifetime=1800000
spring.datasource.hikari.minimumIdle=4
spring.datasource.hikari.connectionTimeout=30000
spring.datasource.hikari.idleTimeout=600000
# LOGGING LEVELS
logging.level.root=INFO
logging.level.org.springframework.web=INFO

View File

@ -24,7 +24,7 @@
</appender>
<root level="debug">
<appender-ref ref="Console" /> <!-- TODO - Change it to "File" in production! -->
<appender-ref ref="File" />
</root>
</configuration>