diff --git a/build.gradle b/build.gradle index 151579d..93b3729 100644 --- a/build.gradle +++ b/build.gradle @@ -5,11 +5,20 @@ plugins { } group = 'eu.openaire.urls_controller' -version = '0.0.1-SNAPSHOT' +version = '1.0.0-SNAPSHOT' sourceCompatibility = '1.8' repositories { mavenCentral() + maven { + name "icm" + url "http://maven.icm.edu.pl/artifactory/repo/" + allowInsecureProtocol = true + } + maven { + name "pentaho-repo" + url "https://public.nexus.pentaho.org/content/groups/omni/" + } } dependencies { @@ -32,13 +41,18 @@ dependencies { implementation "org.projectlombok:lombok:1.18.22" // https://mvnrepository.com/artifact/com.google.guava/guava - implementation group: 'com.google.guava', name: 'guava', version: '30.1.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: '31.0.1-jre' - implementation "org.projectlombok:lombok:1.18.20" - implementation group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final' + // https://mvnrepository.com/artifact/com.cloudera.impala/jdbc + implementation group: 'com.cloudera.impala', name: 'jdbc', version: '2.5.31' - testImplementation group: 'org.springframework.security', name: 'spring-security-test', version: springSecurityVersion - testImplementation 'org.springframework.boot:spring-boot-starter-test' + testImplementation group: 'org.springframework.security', name: 'spring-security-test' + testImplementation "org.springframework.boot:spring-boot-starter-test" +} + +configurations { + // Eliminates slf4j-log4j12 + all*.exclude group: 'org.slf4j', module: 'slf4j-log4j12' } test { diff --git a/src/main/java/eu/openaire/urls_controller/Application.java b/src/main/java/eu/openaire/urls_controller/Application.java index 8fb2bef..9dbc78e 100644 --- a/src/main/java/eu/openaire/urls_controller/Application.java +++ b/src/main/java/eu/openaire/urls_controller/Application.java @@ -1,6 +1,7 @@ package eu.openaire.urls_controller; +import eu.openaire.urls_controller.configuration.ImpalaConnector; import eu.openaire.urls_controller.util.UriBuilder; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; @@ -12,6 +13,7 @@ import org.springframework.web.cors.CorsConfiguration; import org.springframework.web.cors.CorsConfigurationSource; import org.springframework.web.cors.UrlBasedCorsConfigurationSource; +import javax.annotation.PreDestroy; import java.util.Arrays; import java.util.Collections; @@ -36,6 +38,14 @@ public class Application { } + @PreDestroy + public static void preDestroy() + { + if ( ImpalaConnector.hikariDataSource != null ) + ImpalaConnector.hikariDataSource.close(); + } + + @Bean public CommandLineRunner setServerBaseUrl(Environment environment) { diff --git a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java index 5fd3181..311ebca 100644 --- a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java @@ -16,7 +16,7 @@ public class ScheduledTasks { private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); - @Scheduled(fixedRate = 600_000) // TODO - Change to every 10 mins: 600_000 + //@Scheduled(fixedRate = 600_000) // Run every 10 mins: 600_000 public void reportCurrentTime() { logger.info("Server is live! Time is now {}", dateFormat.format(new Date())); } diff --git a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java new file mode 100644 index 0000000..265e0e5 --- /dev/null +++ b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java @@ -0,0 +1,239 @@ +package eu.openaire.urls_controller.configuration; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.beans.PropertyVetoException; +import java.io.File; +import java.io.FileReader; +import java.sql.*; +import java.util.Properties; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +public final class ImpalaConnector { + + private static final Logger logger = LoggerFactory.getLogger(ImpalaConnector.class); + + public static String impalaDriver; + public static String impalaConnectionUrl; + public static String poolName; + public static int hikariMaxConnectionPoolSize; + public static int hikariMinIdleConnections; + public static int hikariConnectionTimeOut; + public static int hikariIdleTimeOut; + public static int hikariMaxLifetime; + + public static String oldDatabaseName = "pdfaggregation_i"; + public static String databaseName = "pdfAggregationDatabase"; + + public static final Lock databaseLock = new ReentrantLock(true); // This lock is locking the threads trying to execute queries in the database. + + public static HikariDataSource hikariDataSource; + + private static final ImpalaConnector singletonObject = new ImpalaConnector(); + + public static ImpalaConnector getInstance() + { + return singletonObject; + } + + + public HikariDataSource impalaDS() throws SQLException, PropertyVetoException + { + HikariConfig hikariConfig = new HikariConfig(); + hikariConfig.setDriverClassName(ImpalaConnector.impalaDriver); + hikariConfig.setAutoCommit(true); + hikariConfig.setJdbcUrl(ImpalaConnector.impalaConnectionUrl); + hikariConfig.setPoolName(poolName); + hikariConfig.setMaximumPoolSize(hikariMaxConnectionPoolSize); + hikariConfig.setMaxLifetime(hikariMaxLifetime); + hikariConfig.setMinimumIdle(hikariMinIdleConnections); + hikariConfig.setConnectionTimeout(hikariConnectionTimeOut); + hikariConfig.setIdleTimeout(hikariIdleTimeOut); + return new HikariDataSource(hikariConfig); + } + + + public ImpalaConnector() + { + logger.info("Max available memory to the Controller: " + Runtime.getRuntime().maxMemory() + " bytes."); + + try { + String dbSettingsPropertyFile = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "application.properties"; + FileReader fReader = new FileReader(dbSettingsPropertyFile); + Properties props = new Properties(); + props.load(fReader); // Load jdbc related properties. + + // Get each property value. + impalaDriver = props.getProperty("spring.impala.driver-class-name"); + if ( !"".equals(impalaDriver) ) { // If not "null" or empty. + Class.forName(impalaDriver); + impalaConnectionUrl = props.getProperty("spring.impala.url"); + poolName = props.getProperty("spring.datasource.hikari.pool-name"); + hikariMaxConnectionPoolSize = Integer.parseInt(props.getProperty("spring.datasource.hikari.maximumPoolSize")); + hikariMaxLifetime = Integer.parseInt(props.getProperty("spring.datasource.hikari.maxLifetime")); + hikariMinIdleConnections = Integer.parseInt(props.getProperty("spring.datasource.hikari.minimumIdle")); + hikariConnectionTimeOut = Integer.parseInt(props.getProperty("spring.datasource.hikari.connectionTimeout")); + hikariIdleTimeOut = Integer.parseInt(props.getProperty("spring.datasource.hikari.idleTimeout")); + } else + throw new RuntimeException("The \"impalaDriver\" was null or empty!"); + } catch(Exception e) { + logger.error("Error when loading the database properties!\n" + e.getMessage()); + e.printStackTrace(); + System.exit(11); + } + + try { + hikariDataSource = impalaDS(); + } catch (SQLException | PropertyVetoException e) { + logger.error("Problem when creating the Hikari connection pool!"); + e.printStackTrace(); + } + + createDatabase(); + } + + + public void createDatabase() + { + databaseLock.lock(); // Make sure the database and tables are created before the workers can request assignments. + Connection con = getConnection(); + if ( con == null ) { + databaseLock.unlock(); + System.exit(22); + } + + try { + if ( !con.getMetaData().supportsBatchUpdates() ) + logger.warn("The database does not support \"BatchUpdates\"!"); + } catch (SQLException e) { + e.printStackTrace(); + } + + logger.info("Going to create the database and the tables, if they do not exist. Also will fill some tables with data from OpenAIRE."); + Statement statement = null; + try { + statement = con.createStatement(); + } catch (SQLException sqle) { + logger.error("Problem when creating a connection-statement!\n" + sqle.getMessage()); + try { + con.close(); + } catch (SQLException sqle2) { + logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage()); + } + databaseLock.unlock(); + System.exit(33); + } + + try { + statement.execute("CREATE DATABASE IF NOT EXISTS " + databaseName); + + statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication stored as parquet as select * from " + oldDatabaseName + ".publication"); + statement.execute("COMPUTE STATS " + databaseName + ".publication"); + + statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_pids stored as parquet as select * from " + oldDatabaseName + ".publication_pids"); + statement.execute("COMPUTE STATS " + databaseName + ".publication_pids"); + + statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_urls stored as parquet as select * from " + oldDatabaseName + ".publication_urls"); + statement.execute("COMPUTE STATS " + databaseName + ".publication_urls"); + + statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".datasource stored as parquet as select * from " + oldDatabaseName + ".datasource"); + statement.execute("COMPUTE STATS " + databaseName + ".datasource"); + + statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet"); + statement.execute("COMPUTE STATS " + databaseName + ".assignment"); + + statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet"); + statement.execute("COMPUTE STATS " + databaseName + ".attempt"); + + statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload (id string, original_url string, actual_url string, `date` timestamp, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet"); + statement.execute("COMPUTE STATS " + databaseName + ".payload"); + } catch (SQLException sqle) { + logger.error("Problem when executing the \"create database and create tables queries!\n" + sqle.getMessage() + "\nSQL state: " + sqle.getSQLState() + "\nError code: " + sqle.getErrorCode()); + sqle.printStackTrace(); + System.exit(44); + } finally { + databaseLock.unlock(); + try { + statement.close(); + con.close(); + } catch (SQLException sqle2) { + logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage()); + } + } + + logger.info("The database \"" + databaseName + "\" and its tables were created or validated."); + } + + + public Connection getConnection() + { + try { + return hikariDataSource.getConnection(); + //return DriverManager.getConnection(impalaConnectionUrl, null, null); // This is for non pooled connections. + } catch (SQLException sqle) { + logger.error("Problem when connecting with the Impala-database!\n" + sqle.getMessage()); + return null; + } + } + + + public boolean testDatabaseAccess() + { + logger.info("Going to test Impala access.."); + Connection con = getConnection(); + if ( con == null ) + return false; + + ResultSet res = null; + try { + String tableName = "publication"; + + // show tables + String sql = "show tables '" + tableName + "'"; + logger.debug("Running: " + sql); + res = con.prepareStatement(sql).executeQuery(); + if ( res.next() ) { + logger.debug(res.getString(1)); + } + + // describe table + sql = "describe " + tableName; + logger.debug("Running: " + sql); + res = con.prepareStatement(sql).executeQuery(); + while ( res.next() ) { + logger.debug(res.getString(1) + "\t" + res.getString(2)); + } + + // select * query + sql = "select * from " + tableName + " limit 3;"; + logger.debug("Running: " + sql); + res = con.prepareStatement(sql).executeQuery(); + while ( res.next() ) { + logger.debug(res.getString(1)); + } + + // Get Assignments, only for testing here. + //UrlController urlController = new UrlController(); + //ResponseEntity responseEntity = urlController.getUrls("worker_1", ControllerConstants.ASSIGNMENTS_LIMIT); + //logger.debug(responseEntity.toString()); + + } catch (SQLException sqle) { + sqle.printStackTrace(); + return false; + } finally { + try { + if ( res != null ) + res.close(); + con.close(); + } catch (SQLException sqle) { + logger.error("Could not close the connection with the Impala-database.\n" + sqle); + } + } + return true; + } + +} diff --git a/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java b/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java new file mode 100644 index 0000000..360042e --- /dev/null +++ b/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java @@ -0,0 +1,64 @@ +package eu.openaire.urls_controller.controllers; + +import eu.openaire.urls_controller.configuration.ImpalaConnector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +@RestController +@RequestMapping("/impala") +public class ImpalaController { + + // This controller will test the connectivity with the database and return statistics! + + private static final Logger logger = LoggerFactory.getLogger(ImpalaController.class); + + + @GetMapping("get10PublicationIdsTest") + public ResponseEntity get10PublicationIdsTest() { + + Connection con = ImpalaConnector.getInstance().getConnection(); + if ( con == null ) + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!"); + + String query = "SELECT id FROM publication LIMIT 10;"; + + try ( ResultSet res = con.prepareStatement(query).executeQuery()) { + if ( !res.first() ) { + String errorMsg = "No results retrieved from the \"getAssignmentsQuery\"!"; + logger.error(errorMsg); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } + + List publications = new ArrayList<>(); + do { + publications.add(res.getString(0)); + } while ( res.next() ); + + return new ResponseEntity(publications.toString(), HttpStatus.OK); + + } catch (Exception e) { + String errorMsg = "Problem when executing \"getAssignmentsQuery\": " + query; + logger.error(errorMsg); + e.printStackTrace(); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } finally { + try { + con.close(); + } catch (SQLException sqle) { + logger.error("Could not close the connection with the Impala-database.\n" + sqle); + } + } + } + +} diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java index d9cbaa6..4e3803c 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java @@ -1,12 +1,11 @@ package eu.openaire.urls_controller.controllers; import com.google.common.collect.HashMultimap; -import eu.openaire.urls_controller.models.Assignment; -import eu.openaire.urls_controller.models.Datasource; -import eu.openaire.urls_controller.models.Task; +import eu.openaire.urls_controller.configuration.ImpalaConnector; +import eu.openaire.urls_controller.models.Error; +import eu.openaire.urls_controller.models.*; import eu.openaire.urls_controller.payloads.requests.WorkerReport; import eu.openaire.urls_controller.payloads.responces.AssignmentResponse; -//import eu.openaire.urls_controller.repositories.AssignmentRepository; import eu.openaire.urls_controller.util.ControllerConstants; import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.GenericUtils; @@ -16,6 +15,9 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; +import java.sql.*; + +import java.sql.Date; import java.util.*; import java.util.concurrent.atomic.AtomicLong; @@ -25,72 +27,520 @@ public class UrlController { private static final Logger logger = LoggerFactory.getLogger(UrlController.class); - - private static AtomicLong assignmentCounter = new AtomicLong(0); // Just for the "getTestUrls"-endpoint. - - } + private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); // Just for the "getTestUrls"-endpoint. @GetMapping("") public ResponseEntity getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) { - List tasks = new ArrayList<>(); + logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT); - // TODO - Create the Assignment from the id-urls stored in the database up to the tasks-limit. + // Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >. - // TODO - Make sure the Date is the same for all entries! - Date date = new Date(); // Store it here, in order to have the same for sure. + // Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server. + int assignmentsLimit = workerAssignmentsLimit; + if ( assignmentsLimit == 0 ) { + String errorMsg = "The given \"workerAssignmentsLimit\" was ZERO!"; + logger.error(errorMsg); + return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg); + } else if ( assignmentsLimit > ControllerConstants.ASSIGNMENTS_LIMIT ) { + logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + ControllerConstants.ASSIGNMENTS_LIMIT + "). Will use the Controller's limit."); + assignmentsLimit = ControllerConstants.ASSIGNMENTS_LIMIT; + } - int assignmentsLimit = ControllerConstants.ASSIGNMENTS_LIMIT; - if ( assignmentsLimit > workerAssignmentsLimit ) - assignmentsLimit = workerAssignmentsLimit; + String getAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" + + "from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" + + "select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" + + "from " + ImpalaConnector.databaseName + ".publication p\n" + + "join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" + + "join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" + + "left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" + + "left outer join (\n" + + " select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" + + " union all\n" + + " select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" + + "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= ? and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry')\n" + + ") as non_distinct_results\n" + + "order by coalesce(attempt_count, 0), reverse(pubid), url\n" + + "limit ?) as getAssignmentsQuery"; + // The "order by" in the end makes sure the older attempted records will be re-attempted after a long time. - int tasksLimitForAssignment = ControllerConstants.ASSIGNMENTS_LIMIT; - if ( tasksLimitForAssignment > workerTasksLimit ) - tasksLimitForAssignment = workerTasksLimit; + // TODO - If we add more limits it could be faster.. Inner queries could have a limit of e.g. < assignmentsLimit ^ 2 > + // The LIMIT of < assignmentsLimit > should be kept in the end, as we want 10_000 of distinct results. - List assignments = null; // TODO -> // assignmentRepository.getNewAssignments(tasksLimitForAssignment); + // This is just for tests without the attempts, payloads and the assignments + /*String getAssignmentsQuery = "select * from (select distinct pubid, url, datasourceid, datasourcetype from (\n" + + "select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype\n" + + "from " + ImpalaConnector.databaseName + ".publication p\n" + + "join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" + + "join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" + + "where d.allow_harvest=true " + + "order by reverse(p.id), pu.url) as distinct_results\n" + + "limit ? ) as getAssignmentsQuery";*/ - //Assignment assignment = new Assignment(assignmentId, tasks, workerId, date); + List assignments = new ArrayList<>(assignmentsLimit); - // TODO - Write the Assignment details to the database and then send it to the worker. + ImpalaConnector.databaseLock.lock(); - logger.info("Sending assignment_" + assignmentCounter.incrementAndGet() + " to worker with ID: " + workerId); + Connection con = ImpalaConnector.getInstance().getConnection(); + if ( con == null ) { // This is already logged in "getConnection()". + ImpalaConnector.databaseLock.unlock(); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!"); + } - return ResponseEntity.status(200).header("Content-Type", "application/json").body(new AssignmentResponse(assignmentCounter.get(), assignments)); + PreparedStatement getAssignmentsPreparedStatement = null; + try { + getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery); + getAssignmentsPreparedStatement.setInt(1, ControllerConstants.MAX_ATTEMPTS_PER_RECORD); + getAssignmentsPreparedStatement.setInt(2, assignmentsLimit); + } catch (SQLException sqle) { + ImpalaConnector.databaseLock.unlock(); + String errorMsg = "Problem when creating the prepared statement for \"getAssignmentsQuery\"!\n"; + logger.error(errorMsg + sqle.getMessage()); + try { + if ( getAssignmentsPreparedStatement != null ) + getAssignmentsPreparedStatement.close(); + } catch (SQLException sqle2) { + logger.error("Could not close the \"getAssignmentsPreparedStatement\".\n" + sqle2.getMessage()); + } + try { + con.close(); + } catch (SQLException sqle2) { + logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage()); + } + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } + + Date date = new Date(System.currentTimeMillis()); // Store it here, in order to have the same for all current records. + + try ( ResultSet resultSet = getAssignmentsPreparedStatement.executeQuery() ) { + // Unfortunately, we cannot use the following as the used version of the Impala-driver does not support it. + /*if ( !resultSet.first() ) { + ImpalaConnector.databaseLock.unlock(); + String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId; + logger.error(errorMsg); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + }*/ + + // The cursor is automatically before the first element in this configuration. + while ( resultSet.next() ) { + // The following few lines, cannot be outside the "while" loop, since the same record is returned, despite that we update the inner-values. + Assignment assignment = new Assignment(); + assignment.setWorkerId(workerId); + assignment.setDate(date); + Datasource datasource = new Datasource(); + try { // For each of the 4 columns returned. The indexing starts from 1 + assignment.setId(resultSet.getString(1)); + assignment.setOriginalUrl(resultSet.getString(2)); + datasource.setId(resultSet.getString(3)); + datasource.setName(resultSet.getString(4)); + } catch (SQLException sqle) { + logger.error("No value was able to be retrieved from one of the columns of row_" + resultSet.getRow()); + sqle.printStackTrace(); + continue; // This object is broken, move to the next row. + } + assignment.setDatasource(datasource); + assignments.add(assignment); + } + } catch (Exception e) { + ImpalaConnector.databaseLock.unlock(); + String errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n"; + logger.error(errorMsg, e); + try { + con.close(); + } catch (SQLException sqle2) { + logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage()); + } + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } finally { + try { + getAssignmentsPreparedStatement.close(); + } catch (SQLException sqle) { + logger.error("Failed to close the \"getAssignmentsPreparedStatement\"!\n" + sqle.getMessage()); + } + } + + int assignmentsSize = assignments.size(); + if ( assignmentsSize == 0 ) { + ImpalaConnector.databaseLock.unlock(); + String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId; + logger.error(errorMsg); + try { + con.close(); + } catch (SQLException sqle2) { + logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage()); + } + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } + + logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker."); + + // The following is a test of inserting multiple rows with a singme insert-query. If applied with a preparedStatement, then the JDBC fails with "OutOfMemory"-Error. + /*String testInsert = "INSERT INTO assignment (id,original_url,workerid,`date`) VALUES ( 'doiboost____::4e8b1f12ac3ba5a9d8fbff9872000000', 'http://dx.doi.org/10.17267/2596-3368dentistry.v6i2.586', 'worker_1', CAST('2021-10-01' AS TIMESTAMP) ) , ( 'doiboost____::4e8b1f12ac3ba5a9d8fbff9872000000', 'https://academic.microsoft.com/#/detail/2887540302', 'worker_1', CAST('2021-10-01' AS TIMESTAMP) );"; + try (Statement insertStatement = con.createStatement()) { + insertStatement.execute(testInsert); + } catch (SQLException sqle) { + ImpalaConnector.databaseLock.unlock(); + String mergeErrorMsg = "Problem when executing the testInsert statement for \"" + testInsert + "\""; + logger.error(mergeErrorMsg + sqle.getMessage()); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg); + }*/ + + // Write the Assignment details to the database and then send it to the worker. + String insertIntoAssignmentBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".assignment (id, original_url, workerid, date) VALUES (?, ?, ?, ?)"; + + PreparedStatement preparedInsertAssignmentStatement; + try { // We use a "PreparedStatement" to do insertions, for security and performance reasons. + preparedInsertAssignmentStatement = con.prepareStatement(insertIntoAssignmentBaseQuery); + } catch (SQLException sqle) { + ImpalaConnector.databaseLock.unlock(); + String errorMsg = "Problem when creating the prepared statement for \"insertIntoAssignmentBaseQuery\"!\n"; + logger.error(errorMsg + sqle.getMessage()); + try { + con.close(); + } catch (SQLException sqle2) { + logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage()); + } + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } + + // Before, we wanted to execute the getAssignmentQuery and take the assignments immediately, but now it's more efficient to commit all the inserts in the end. + try { + con.setAutoCommit(false); + } catch (SQLException sqle) { // There is a database-error. The latter actions will probably fail as well. + ImpalaConnector.databaseLock.unlock(); + String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!"; + logger.error(errorMsg + "\n" + sqle.getMessage()); + closePreparedStatements(preparedInsertAssignmentStatement, null, con); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } + + String tempFullQueryString = null; + for ( Assignment assignment : assignments ) { + try { + preparedInsertAssignmentStatement.setString(1, assignment.getId()); + preparedInsertAssignmentStatement.setString(2, assignment.getOriginalUrl()); + preparedInsertAssignmentStatement.setString(3, workerId); + preparedInsertAssignmentStatement.setDate(4, date); + tempFullQueryString = getAssignmentsPreparedStatement.toString(); + preparedInsertAssignmentStatement.executeUpdate(); + } catch (SQLException sqle) { + logger.error("Problem when executing the \"insertIntoAssignmentQuery\":\n" + tempFullQueryString + "\n" + sqle.getMessage() + "\n\n"); + } + }//end for-loop + + try { + con.commit(); // Send all the insert-queries to the database. + } catch (SQLException sqle) { + ImpalaConnector.databaseLock.unlock(); + String errorMsg = "Problem when committing changes to the database!"; + logger.error(errorMsg + "\n" + sqle.getMessage()); + closePreparedStatements(preparedInsertAssignmentStatement, null, con); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } + + logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table."); + + String mergeErrorMsg = mergeParquetFiles("assignment", con); + if ( mergeErrorMsg != null ) { + ImpalaConnector.databaseLock.unlock(); + closePreparedStatements(preparedInsertAssignmentStatement, null, con); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg); + } + + try { + con.commit(); // Apply the merge. + con.setAutoCommit(true); // Restore the "auto-commit" value for this connection of the pool. + } catch (SQLException sqle) { + String errorMsg = "Problem when committing changes to the database!"; + logger.error(errorMsg , sqle);//+ "\n" + sqle.getMessage()); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } finally { + ImpalaConnector.databaseLock.unlock(); + closePreparedStatements(preparedInsertAssignmentStatement, null, con); + } + + logger.info("Sending batch_" + assignmentsBatchCounter.incrementAndGet() + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + "."); + return ResponseEntity.status(HttpStatus.OK).body(new AssignmentResponse(assignmentsBatchCounter.get(), assignments)); } + @PostMapping("addWorkerReport") public ResponseEntity addWorkerReport(@RequestBody WorkerReport workerReport) { - if ( workerReport == null ) - return ResponseEntity.status(HttpStatus.BAD_REQUEST).build(); + if ( workerReport == null ) { + String errorMsg = "No \"WorkerReport\" was given!"; + logger.error(errorMsg); + return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg); + } - logger.debug("Received the WorkerReport:\n" + workerReport.toString()); + List urlReports = workerReport.getUrlReports(); + if ( (urlReports == null) || urlReports.isEmpty() ) { + String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + workerReport.getWorkerId() + "\" was empty!"; + logger.error(errorMsg); + return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg); + } - // TODO - Store the workerReport into the database. + logger.info("Received the WorkerReport for batch_ " + workerReport.getAssignmentRequestCounter() + ", from the worker with id: " + workerReport.getWorkerId() + ". It contains " + urlReports.size() + " urlReports. Going to insert them into the database."); - return ResponseEntity.status(HttpStatus.OK).build(); + + // TODO - The Controller will have to request the files from the Worker, in order to upload them to the S3, in the future. + // We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database. + + + Connection con = ImpalaConnector.getInstance().getConnection(); + if ( con == null ) + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!"); + + // Store the workerReport into the database. + String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)"; + + String tempInsertQueryName = null; + PreparedStatement preparedInsertPayloadStatement = null, preparedInsertAttemptStatement = null; + try { + tempInsertQueryName = "insertIntoPayloadBaseQuery"; + preparedInsertPayloadStatement = con.prepareStatement(insertIntoPayloadBaseQuery); + tempInsertQueryName = "insertIntoAttemptBaseQuery"; + preparedInsertAttemptStatement = con.prepareStatement(insertIntoAttemptBaseQuery); + } catch (SQLException sqle) { + String errorMsg = "Problem when creating the prepared statement for \"" + tempInsertQueryName + "\"!\n"; + logger.error(errorMsg + sqle.getMessage()); + closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } + + try { + con.setAutoCommit(false); + } catch (SQLException sqle) { + String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!"; + logger.error(errorMsg + "\n" + sqle.getMessage()); + closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } + + String payloadErrorMsg = null; + int failedCount = 0; + + for ( UrlReport urlReport : urlReports ) { + Payload payload = urlReport.getPayload(); + if ( payload == null ) { + logger.error("Payload was \"null\" for a \"urlReport\"!"); + // TODO - A bit rare to happen.. but what should we do? (otherwise an NPE will be thrown later) + // We can't just create an empty object and proceed, since the payload is as important as the whole row to be inserted.. + // TODO - Just add it in an errorMessage to be returned in the end. Should it return HTTP-200 but with a small error message along? + payloadErrorMsg = (++failedCount) + " urlReports failed to be processed because they had no payload!"; + continue; + } + + String tempFullQueryString = null; + + try { // We use a "PreparedStatement" to do insertions, for security reasons. + preparedInsertPayloadStatement.setString(1, payload.getId()); + preparedInsertPayloadStatement.setString(2, payload.getOriginal_url()); + preparedInsertPayloadStatement.setString(3, payload.getActual_url()); + preparedInsertPayloadStatement.setDate(4, payload.getDate_acquired()); + preparedInsertPayloadStatement.setString(5, payload.getMime_type()); + // The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers. + preparedInsertPayloadStatement.setString(6, String.valueOf(payload.getSize())); + preparedInsertPayloadStatement.setString(7, payload.getHash()); + preparedInsertPayloadStatement.setString(8, payload.getLocation()); + preparedInsertPayloadStatement.setString(9, payload.getProvenance()); + tempFullQueryString = preparedInsertPayloadStatement.toString(); + preparedInsertPayloadStatement.executeUpdate(); + } catch (SQLException sqle) { + logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\":\n" + tempFullQueryString + "\n" + sqle.getMessage() + "\n\n"); + } + + Error error = urlReport.getError(); + if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of the loop) + logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members."); + error = new Error(null, null); + } + + try { // We use a "PreparedStatement" to do insertions, for security reasons. + preparedInsertAttemptStatement.setString(1, payload.getId()); + preparedInsertAttemptStatement.setString(2, payload.getOriginal_url()); + preparedInsertAttemptStatement.setDate(3, payload.getDate_acquired()); + preparedInsertAttemptStatement.setString(4, urlReport.getStatus().toString()); + preparedInsertAttemptStatement.setString(5, String.valueOf(error.getType())); // This covers the case of "null". + preparedInsertAttemptStatement.setString(6, error.getMessage()); + tempFullQueryString = preparedInsertAttemptStatement.toString(); + preparedInsertAttemptStatement.executeUpdate(); + } catch (SQLException sqle) { + logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\":\n" + tempFullQueryString + "\n" + sqle.getMessage() + "\n\n"); + } + }//end for-loop + + ImpalaConnector.databaseLock.lock(); + try { + con.commit(); // Send all the insert-queries to the database. + } catch (SQLException sqle) { + String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!"; + logger.error(errorMsg + "\n" + sqle.getMessage()); + ImpalaConnector.databaseLock.unlock(); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } finally { + closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, null); // Do not close the connection here! + } + + logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables."); + + String mergeErrorMsg = mergeParquetFiles("payload", con); + if ( mergeErrorMsg != null ) { + ImpalaConnector.databaseLock.unlock(); + try { con.close(); } + catch (SQLException sqle) { logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage()); } + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg); + } + + mergeErrorMsg = mergeParquetFiles("attempt", con); + if ( mergeErrorMsg != null ) { + ImpalaConnector.databaseLock.unlock(); + try { con.close(); } + catch (SQLException sqle) { logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage()); } + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg); + } + + try { + con.commit(); // Apply the merges. + con.setAutoCommit(true); // Restore the "auto-commit" value for this connection of the pool. + } catch (SQLException sqle) { + String errorMsg = "Problem when committing changes to the database!"; + logger.error(errorMsg + "\n" + sqle.getMessage()); + // The statements used in "mergeParquetFiles()" are already closed. + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } finally { + ImpalaConnector.databaseLock.unlock(); + try { con.close(); } + catch (SQLException sqle) { logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage()); } + } + + return ResponseEntity.status(HttpStatus.OK).body(payloadErrorMsg); + } + + + /** + * In each insertion, a new parquet-file is created, so we end up with millions of files. Parquet is great for fast-select, so have to stick with it and merge those files.. + * This method, creates a clone of the original table in order to have only one parquet file in the end. Drops the original table. + * Renames the clone to the original's name. + * Returns the errorMsg, if an error appears, otherwise is returns "null". + * */ + private static String mergeParquetFiles(String tableName, Connection con) + { + String errorMsg; + if ( tableName == null ) { + errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!"; + logger.error(errorMsg); + return errorMsg; + } + + Statement statement; + try { + statement = con.createStatement(); + } catch (SQLException sqle) { + errorMsg = "Problem when creating a connection-statement!\n"; + logger.error(errorMsg + sqle.getMessage()); + return errorMsg; + } + + try { + statement.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName); + statement.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE"); + statement.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName); + statement.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName); + } catch (SQLException sqle) { + errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n"; + logger.error(errorMsg + getCutBatchExceptionMessage(sqle.getMessage())); + sqle.printStackTrace(); + return errorMsg; + } finally { + // Make sure we close the statement. + try { statement.close(); } + catch (SQLException sqle3) { logger.error("Could not close the statement for executing queries in the Impala-database.\n" + sqle3); } + } + + return null; // No errorMsg, everything is fine. + } + + + // The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution. + // Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested. + private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException + { + StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use. + sb.append(baseInsertQuery); + for ( int i=1; i <= dataSize; ++i ) { + sb.append("("); + for ( int j=1; j <= numParamsPerRow; ++j ) { + sb.append("?"); + if ( j < numParamsPerRow ) + sb.append(","); + } + sb.append(")"); + if ( i < dataSize ) + sb.append(","); + } + + PreparedStatement preparedInsertStatement; + try { // We use a "PreparedStatement" to do insertions, for security reasons. + preparedInsertStatement = con.prepareStatement(sb.toString()); + } catch (SQLException sqle) { + String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n"; + logger.error(errorMsg + sqle.getMessage()); + throw new RuntimeException(errorMsg); + } + return preparedInsertStatement; + } + + + private static String getCutBatchExceptionMessage(String sqleMessage) + { + // The sqleMessage contains the actual message followed by the long batch. This makes the logs unreadable. So we should shorten the message before logging. + int maxEnding = 1500; + if ( sqleMessage.length() > maxEnding ) + return (sqleMessage.substring(0, maxEnding) + "..."); + else + return sqleMessage; + } + + + private boolean closePreparedStatements(PreparedStatement preparedStatement1, PreparedStatement preparedStatement2, Connection con) { + try { + if ( preparedStatement1 != null ) + preparedStatement1.close(); + if ( preparedStatement2 != null ) + preparedStatement2.close(); + if ( con != null ) + con.close(); // It may have already closed and that's fine. + return true; + } + catch (SQLException sqle) { + logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage()); + return false; + } } @GetMapping("test") public ResponseEntity getTestUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) { + logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " test-assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT); + try { new FileUtils(); // Find the input file. } catch (Exception e) { logger.error(e.getMessage()); - return ResponseEntity.status(500).body("The resource file, for the requested assignments, was not found."); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The resource file, for the requested assignments, was not found."); } List assignments = new ArrayList<>(); HashMultimap loadedIdUrlPairs; boolean isFirstRun = true; boolean assignmentsLimitReached = false; - Date date = new Date(); + Date date = new Date(System.currentTimeMillis()); // Store it here, in order to have the same for all current records. // Start loading urls. while ( true ) { @@ -120,12 +570,13 @@ public class UrlController { } }// end loading-while-loop - if ( FileUtils.inputScanner.get() != null ) // Check if the initial value is null. - FileUtils.inputScanner.get().close(); + Scanner scanner = FileUtils.inputScanner.get(); + if ( scanner != null ) // Check if the initial value is null. + scanner.close(); - logger.info("Sending AssignmentResponse_" + assignmentCounter.incrementAndGet() + " with " + assignments.size() + " assignments (" + FileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId); + logger.info("Sending batch_" + assignmentsBatchCounter.incrementAndGet() + " with " + assignments.size() + " assignments (" + FileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId); - return ResponseEntity.status(200).header("Content-Type", "application/json").body(new AssignmentResponse(assignmentCounter.get(), assignments)); + return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentResponse(assignmentsBatchCounter.get(), assignments)); } } diff --git a/src/main/java/eu/openaire/urls_controller/util/ControllerConstants.java b/src/main/java/eu/openaire/urls_controller/util/ControllerConstants.java index 3ae2aca..ffe86c8 100644 --- a/src/main/java/eu/openaire/urls_controller/util/ControllerConstants.java +++ b/src/main/java/eu/openaire/urls_controller/util/ControllerConstants.java @@ -3,6 +3,8 @@ package eu.openaire.urls_controller.util; public interface ControllerConstants { - int ASSIGNMENTS_LIMIT = 10000; // The general assignments-limit the Controller will get. If the worker cannot handle them, then the worker's limit will be applied. + int ASSIGNMENTS_LIMIT = 100_000; // The upper assignments-limit the Controller can handle. If the worker's limit is above this one, then the controller's limit is used. Otherwise, the worker's limit will be applied. + + int MAX_ATTEMPTS_PER_RECORD = 3; // The maximum times a record can be processed, if each of the previous times failed with a "couldRetry" Error-Class. } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index b65d501..04f472c 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -15,6 +15,22 @@ server.port = 1880 # Server api path server.servlet.context-path=/api +# Database + +spring.impala.url = jdbc:impala://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/ +# Note: The "UseNativeQuery" does not work with the PreparedStatements! Also, the url does not work without the ending "/" +# The username and the password do not matter, since this app is always run in an pre-authenticated machine. + +spring.impala.driver-class-name = com.cloudera.impala.jdbc41.Driver + +spring.datasource.hikari.pool-name=ControllerPool +spring.datasource.hikari.maximumPoolSize=20 +spring.datasource.hikari.maxLifetime=1800000 +spring.datasource.hikari.minimumIdle=4 +spring.datasource.hikari.connectionTimeout=30000 +spring.datasource.hikari.idleTimeout=600000 + + # LOGGING LEVELS logging.level.root=INFO logging.level.org.springframework.web=INFO diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml index c5b711e..a1972a0 100644 --- a/src/main/resources/logback-spring.xml +++ b/src/main/resources/logback-spring.xml @@ -24,7 +24,7 @@ - + \ No newline at end of file