diff --git a/build.gradle b/build.gradle index d5c8dfb..bb8d274 100644 --- a/build.gradle +++ b/build.gradle @@ -49,10 +49,17 @@ dependencies { implementation 'io.minio:minio:8.3.5' // https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttp - implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3' // This is required by the minio, as Spring uses a version which is not supported by minio. + implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3' + // This is required by MinIO, as Spring pulls in an okhttp version which MinIO does not support. // https://mvnrepository.com/artifact/com.cloudera.impala/jdbc - implementation group: 'com.cloudera.impala', name: 'jdbc', version: '2.5.31' + implementation("com.cloudera.impala:jdbc:2.5.31") { + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'org.apache.derby', module: 'derby' + exclude group: 'org.eclipse.jetty.aggregate', module: 'jetty-all' + exclude group: 'log4j', module: 'log4j' + exclude group: 'log4j', module: 'apache-log4j-extras' + } testImplementation group: 'org.springframework.security', name: 'spring-security-test' testImplementation "org.springframework.boot:spring-boot-starter-test" diff --git a/src/main/java/eu/openaire/urls_controller/Application.java b/src/main/java/eu/openaire/urls_controller/Application.java index 4b06804..1803e0b 100644 --- a/src/main/java/eu/openaire/urls_controller/Application.java +++ b/src/main/java/eu/openaire/urls_controller/Application.java @@ -1,8 +1,5 @@ package eu.openaire.urls_controller; - -import eu.openaire.urls_controller.configuration.ImpalaConnector; -import eu.openaire.urls_controller.util.S3ObjectStoreMinIO; import eu.openaire.urls_controller.util.UriBuilder; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; @@ -14,7 +11,6 @@ import org.springframework.web.cors.CorsConfiguration; import org.springframework.web.cors.CorsConfigurationSource; import org.springframework.web.cors.UrlBasedCorsConfigurationSource; -import javax.annotation.PreDestroy; import java.util.Arrays; import java.util.Collections; @@ -23,7 +19,6 @@ public class Application { public static void main(String[] args) { - new S3ObjectStoreMinIO(); SpringApplication.run(Application.class, args); } @@ -38,20 +33,10 @@ public class Application { source.registerCorsConfiguration("/**", configuration); return source; } - - - @PreDestroy - public static void preDestroy() - { - if ( ImpalaConnector.hikariDataSource != null ) - ImpalaConnector.hikariDataSource.close(); - } - - - @Bean - public CommandLineRunner setServerBaseUrl(Environment environment) - { - return args -> new UriBuilder(environment); - } +// +// @Bean +// public CommandLineRunner setServerBaseUrl(Environment environment) { +// return args -> new UriBuilder(environment); +// } } \ No newline at end of file diff --git a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java index c0085e6..9d33dc4 100644 --- a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java +++ b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java @@ -1,255 +1,80 @@ package eu.openaire.urls_controller.configuration; -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; -import java.beans.PropertyVetoException; -import java.io.File; -import java.io.FileReader; -import java.sql.*; -import java.util.Properties; +import javax.annotation.PostConstruct; +import java.sql.Connection; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; - +@Repository public final class ImpalaConnector { private static final Logger logger = LoggerFactory.getLogger(ImpalaConnector.class); - public static String impalaDriver; - public static String impalaConnectionUrl; - public static String poolName; - public static int hikariMaxConnectionPoolSize; - public static int hikariMinIdleConnections; - public static int hikariConnectionTimeOut; - public static int hikariIdleTimeOut; - public static int hikariMaxLifetime; + @Autowired + private JdbcTemplate jdbcTemplate; - public static String oldDatabaseName = "pdfaggregation_i"; - public static String databaseName = "pdfAggregationDatabase"; + // Spring does not inject "@Value" into static fields, so the values are applied through instance-setters, which run before "init()". + public static String oldDatabaseName; + public static String databaseName; + + @Value("${services.pdfaggregation.controller.db.oldDatabaseName:pdfaggregation_i}") + public void setOldDatabaseName(String name) { oldDatabaseName = name; } + + @Value("${services.pdfaggregation.controller.db.databaseName:pdfAggregationDatabase}") + public void setDatabaseName(String name) { databaseName = name; } public static final Lock databaseLock = new ReentrantLock(true); // This lock is locking the threads trying to execute queries in the database. - public static HikariDataSource hikariDataSource; - - private static final ImpalaConnector singletonObject = new ImpalaConnector(); - - public static ImpalaConnector getInstance() - { - return singletonObject; - } - - - public ImpalaConnector() - { + @PostConstruct + public void init() { logger.info("Max available memory to the Controller: " + Runtime.getRuntime().maxMemory() + " bytes."); try { - String dbSettingsPropertyFile = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "application.properties"; - FileReader fReader = new FileReader(dbSettingsPropertyFile); - Properties props = new Properties(); - props.load(fReader); // Load jdbc related properties. - - // Get each property value. - impalaDriver = props.getProperty("spring.impala.driver-class-name"); - if ( !"".equals(impalaDriver) ) { // If not "null" or empty. 
- Class.forName(impalaDriver); - impalaConnectionUrl = props.getProperty("spring.impala.url"); - poolName = props.getProperty("spring.datasource.hikari.pool-name"); - hikariMaxConnectionPoolSize = Integer.parseInt(props.getProperty("spring.datasource.hikari.maximumPoolSize")); - hikariMaxLifetime = Integer.parseInt(props.getProperty("spring.datasource.hikari.maxLifetime")); - hikariMinIdleConnections = Integer.parseInt(props.getProperty("spring.datasource.hikari.minimumIdle")); - hikariConnectionTimeOut = Integer.parseInt(props.getProperty("spring.datasource.hikari.connectionTimeout")); - hikariIdleTimeOut = Integer.parseInt(props.getProperty("spring.datasource.hikari.idleTimeout")); - } else - throw new RuntimeException("The \"impalaDriver\" was null or empty!"); - } catch(Exception e) { - String errorMsg = "Error when loading the database properties!\n" + e.getMessage(); - logger.error(errorMsg, e); - System.err.println(errorMsg); - System.exit(11); - } - - try { - hikariDataSource = impalaDS(); - } catch (SQLException | PropertyVetoException e) { - logger.error("Problem when creating the Hikari connection pool!", e); + try ( Connection con = jdbcTemplate.getDataSource().getConnection() ) { // try-with-resources, so the pooled connection is returned right after the check. + if ( !con.getMetaData().supportsBatchUpdates() ) + logger.warn("The database does not support \"BatchUpdates\"!"); + } + } catch (Exception e) { + logger.error("Error when testing if the database supports batch-updates!", e); } createDatabase(); } - - public HikariDataSource impalaDS() throws SQLException, PropertyVetoException - { - HikariConfig hikariConfig = new HikariConfig(); - hikariConfig.setDriverClassName(ImpalaConnector.impalaDriver); - hikariConfig.setAutoCommit(true); - hikariConfig.setJdbcUrl(ImpalaConnector.impalaConnectionUrl); - hikariConfig.setPoolName(poolName); - hikariConfig.setMaximumPoolSize(hikariMaxConnectionPoolSize); - hikariConfig.setMaxLifetime(hikariMaxLifetime); - hikariConfig.setMinimumIdle(hikariMinIdleConnections); - hikariConfig.setConnectionTimeout(hikariConnectionTimeOut); - hikariConfig.setIdleTimeout(hikariIdleTimeOut); - return new HikariDataSource(hikariConfig); - } - - - public void createDatabase() - { - Connection con = getConnection(); - if ( con == null ) - System.exit(22); - - try { - if ( !con.getMetaData().supportsBatchUpdates() ) - logger.warn("The database does not support \"BatchUpdates\"!"); - } catch (SQLException e) { - logger.error(e.getMessage(), e); - } - + private void createDatabase() { logger.info("Going to create the database and the tables, if they do not exist. 
Also will fill some tables with data from OpenAIRE."); - Statement statement = null; - try { - statement = con.createStatement(); - } catch (SQLException sqle) { - logger.error("Problem when creating a connection-statement!\n" + sqle.getMessage()); - ImpalaConnector.closeConnection(con); - System.exit(33); - } - try { - statement.execute("CREATE DATABASE IF NOT EXISTS " + databaseName); + jdbcTemplate.execute("CREATE DATABASE IF NOT EXISTS " + databaseName); - statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication stored as parquet as select * from " + oldDatabaseName + ".publication"); - statement.execute("COMPUTE STATS " + databaseName + ".publication"); + jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication stored as parquet as select * from " + oldDatabaseName + ".publication"); + jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication"); - statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_pids stored as parquet as select * from " + oldDatabaseName + ".publication_pids"); - statement.execute("COMPUTE STATS " + databaseName + ".publication_pids"); + jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_pids stored as parquet as select * from " + oldDatabaseName + ".publication_pids"); + jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication_pids"); - statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_urls stored as parquet as select * from " + oldDatabaseName + ".publication_urls"); - statement.execute("COMPUTE STATS " + databaseName + ".publication_urls"); + jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_urls stored as parquet as select * from " + oldDatabaseName + ".publication_urls"); + jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication_urls"); - statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".datasource stored as parquet as select * from " + oldDatabaseName + ".datasource"); - statement.execute("COMPUTE STATS " + databaseName + ".datasource"); + jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".datasource stored as parquet as select * from " + oldDatabaseName + ".datasource"); + jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".datasource"); - statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet"); - statement.execute("COMPUTE STATS " + databaseName + ".assignment"); + jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet"); + jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".assignment"); - statement.execute("DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE"); + jdbcTemplate.execute("DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE"); - statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet"); - statement.execute("COMPUTE STATS " + databaseName + ".attempt"); + jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet"); + 
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".attempt"); - statement.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload (id string, original_url string, actual_url string, `date` timestamp, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet"); - statement.execute("COMPUTE STATS " + databaseName + ".payload"); - } catch (SQLException sqle) { - String errorMsg = "Problem when executing the \"create database and create tables queries!\n" + sqle.getMessage() + "\nSQL state: " + sqle.getSQLState() + "\nError code: " + sqle.getErrorCode(); - logger.error(errorMsg, sqle); - System.err.println(errorMsg); - System.exit(44); - } finally { - try { - statement.close(); - con.close(); - } catch (SQLException sqle2) { - logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage()); - } - } + jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload (id string, original_url string, actual_url string, `date` timestamp, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet"); + jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload"); logger.info("The database \"" + databaseName + "\" and its tables were created or validated."); } - - public Connection getConnection() - { - try { - return hikariDataSource.getConnection(); - //return DriverManager.getConnection(impalaConnectionUrl, null, null); // This is for non pooled connections. - } catch (SQLException sqle) { - logger.error("Problem when connecting with the Impala-database!\n" + sqle.getMessage()); - return null; - } - } - - - public boolean testDatabaseAccess() - { - logger.info("Going to test Impala access.."); - Connection con = getConnection(); - if ( con == null ) - return false; - - ResultSet res = null; - try { - String tableName = "publication"; - - // show tables - String sql = "show tables '" + tableName + "'"; - logger.debug("Running: " + sql); - res = con.prepareStatement(sql).executeQuery(); - if ( res.next() ) { - logger.debug(res.getString(1)); - } - - // describe table - sql = "describe " + tableName; - logger.debug("Running: " + sql); - res = con.prepareStatement(sql).executeQuery(); - while ( res.next() ) { - logger.debug(res.getString(1) + "\t" + res.getString(2)); - } - - // select * query - sql = "select * from " + tableName + " limit 3;"; - logger.debug("Running: " + sql); - res = con.prepareStatement(sql).executeQuery(); - while ( res.next() ) { - logger.debug(res.getString(1)); - } - - // Get Assignments, only for testing here. - //UrlController urlController = new UrlController(); - //ResponseEntity responseEntity = urlController.getUrls("worker_1", ControllerConstants.ASSIGNMENTS_LIMIT); - //logger.debug(responseEntity.toString()); - - } catch (SQLException sqle) { - logger.error(sqle.getMessage(), sqle); - return false; - } finally { - try { - if ( res != null ) - res.close(); - con.close(); - } catch (SQLException sqle) { - logger.error("Could not close the connection with the Impala-database.\n" + sqle); - } - } - return true; - } - - - public static boolean closeConnection(Connection con) { - try { - if ( con != null ) - con.close(); // It may have already closed and that's fine. 
- return true; - } catch (SQLException sqle) { - logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage()); - return false; - } - } - - - public static String handlePreparedStatementException(String queryName, String query, String preparedStatementName, PreparedStatement preparedStatement, Connection con, Exception e) + public static String handlePreparedStatementException(String queryName, String query, Exception e) { String errorMsg = "Problem when creating " + (( ! queryName.startsWith("get")) ? "and executing " : "") + "the prepared statement for \"" + queryName + "\"!\n"; - logger.error(errorMsg + "\n\n" + query + "\n\n" + e.getMessage(), e); - closeConnection(con); + logger.error(errorMsg + "\n\n" + query + "\n\n", e); return errorMsg; } - } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java b/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java index c405640..ba29794 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java @@ -1,58 +1,41 @@ package eu.openaire.urls_controller.controllers; -import eu.openaire.urls_controller.configuration.ImpalaConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import java.sql.Connection; -import java.sql.ResultSet; -import java.util.ArrayList; import java.util.List; +/** + * This controller will test the connectivity with the database and return statistics! + */ @RestController @RequestMapping("/impala") public class ImpalaController { - - // This controller will test the connectivity with the database and return statistics! 
- private static final Logger logger = LoggerFactory.getLogger(ImpalaController.class); + @Autowired + private JdbcTemplate jdbcTemplate; @GetMapping("get10PublicationIdsTest") public ResponseEntity<?> get10PublicationIdsTest() { - Connection con = ImpalaConnector.getInstance().getConnection(); - if ( con == null ) - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!"); - String query = "SELECT id FROM publication LIMIT 10;"; - try ( ResultSet res = con.prepareStatement(query).executeQuery()) { - if ( !res.first() ) { - String errorMsg = "No results retrieved from the \"getAssignmentsQuery\"!"; - logger.error(errorMsg); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - } - - List publications = new ArrayList<>(); - do { - publications.add(res.getString(0)); - } while ( res.next() ); - - return new ResponseEntity(publications.toString(), HttpStatus.OK); + try { + List<String> publications = jdbcTemplate.queryForList(query, String.class); + return new ResponseEntity<>(publications.toString(), HttpStatus.OK); } catch (Exception e) { String errorMsg = "Problem when executing \"getAssignmentsQuery\": " + query; logger.error(errorMsg, e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - } finally { - ImpalaConnector.closeConnection(con); } } - } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/TestController.java b/src/main/java/eu/openaire/urls_controller/controllers/TestController.java new file mode 100644 index 0000000..433931e --- /dev/null +++ b/src/main/java/eu/openaire/urls_controller/controllers/TestController.java @@ -0,0 +1,82 @@ +package eu.openaire.urls_controller.controllers; + +import com.google.common.collect.HashMultimap; +import eu.openaire.urls_controller.models.Assignment; +import eu.openaire.urls_controller.models.Datasource; +import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse; +import eu.openaire.urls_controller.util.GenericUtils; +import eu.openaire.urls_controller.util.TestFileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.sql.Timestamp; +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; + +@RestController +public class TestController extends GeneralController { + + private static final Logger logger = LoggerFactory.getLogger(TestController.class); + + @Autowired + private TestFileUtils fileUtils; + + @Value("${services.pdfaggregation.controller.assignmentLimit}") + private int assignmentLimit; + + private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); // Just for the "getTestUrls"-endpoint. + + @GetMapping("test") + public ResponseEntity<?> getTestUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) { + + logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " test-assignments. 
The assignments-limit of the controller is: " + this.assignmentLimit); + + List<Assignment> assignments = new ArrayList<>(); + HashMultimap<String, String> loadedIdUrlPairs; + boolean isFirstRun = true; + boolean assignmentsLimitReached = false; + Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records. + + // Start loading urls. + while ( true ) { + loadedIdUrlPairs = fileUtils.getNextIdUrlPairBatchFromJson(); // Take urls from jsonFile. + + if ( fileUtils.isFinishedLoading(loadedIdUrlPairs.isEmpty(), isFirstRun) ) // Throws RuntimeException which is automatically passed on. + break; + else + isFirstRun = false; + + Set<Map.Entry<String, String>> pairs = loadedIdUrlPairs.entries(); + + for ( Map.Entry<String, String> pair : pairs ) { + if ( assignments.size() >= workerAssignmentsLimit ) { + assignmentsLimitReached = true; + break; + } + + int randomNum = GenericUtils.getRandomNumber(1, 5); + assignments.add(new Assignment(pair.getKey(), pair.getValue(), new Datasource("ID_" + randomNum, "NAME_" + randomNum), workerId, timestamp)); + }// end pairs-for-loop + + if ( assignmentsLimitReached ) { + logger.debug("Done loading urls from the inputFile as the assignmentsLimit (" + workerAssignmentsLimit + ") was reached."); + break; + } + }// end loading-while-loop + + Scanner scanner = fileUtils.inputScanner.get(); + if ( scanner != null ) // Check if the initial value is null. + scanner.close(); + + long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet(); + logger.info("Sending batch_" + curAssignmentsBatchCounter + " with " + assignments.size() + " assignments (" + fileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId); + return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments)); + } +} diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java index 59dff72..78a894b 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java @@ -1,23 +1,25 @@ package eu.openaire.urls_controller.controllers; -import com.google.common.collect.HashMultimap; import eu.openaire.urls_controller.configuration.ImpalaConnector; import eu.openaire.urls_controller.models.Error; import eu.openaire.urls_controller.models.*; import eu.openaire.urls_controller.payloads.requests.WorkerReport; import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse; -import eu.openaire.urls_controller.util.ControllerConstants; import eu.openaire.urls_controller.util.FileUtils; -import eu.openaire.urls_controller.util.GenericUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowCallbackHandler; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import java.sql.*; -import java.util.*; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; @@ -27,11 +29,20 @@ public class UrlController { private static 
final Logger logger = LoggerFactory.getLogger(UrlController.class); + @Autowired + private JdbcTemplate jdbcTemplate; + + @Autowired + private FileUtils fileUtils; + private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); // Just for the "getTestUrls"-endpoint. - private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*"); - private static int maxAttemptsPerRecord = ControllerConstants.MAX_ATTEMPTS_PER_RECORD; + @Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") + private int maxAttemptsPerRecord; + + @Value("${services.pdfaggregation.controller.assignmentLimit}") + private int assignmentLimit; @GetMapping("") public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) { @@ -43,7 +54,7 @@ public class UrlController { return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg); } - logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT); + logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + assignmentLimit); // Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >. @@ -53,9 +64,9 @@ public class UrlController { String errorMsg = "The given \"workerAssignmentsLimit\" was ZERO!"; logger.error(errorMsg); return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg); - } else if ( assignmentsLimit > ControllerConstants.ASSIGNMENTS_LIMIT ) { - logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + ControllerConstants.ASSIGNMENTS_LIMIT + "). Will use the Controller's limit."); - assignmentsLimit = ControllerConstants.ASSIGNMENTS_LIMIT; + } else if ( assignmentsLimit > assignmentLimit ) { + logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + assignmentLimit + "). Will use the Controller's limit."); + assignmentsLimit = assignmentLimit; } String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" + @@ -80,129 +91,74 @@ String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment"; String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment"; - List<Assignment> assignments = new ArrayList<>(assignmentsLimit); + List<Assignment> assignments = new ArrayList<>(); ImpalaConnector.databaseLock.lock(); - Connection con = ImpalaConnector.getInstance().getConnection(); - if ( con == null ) { // This is already logged in "getConnection()". + + try { + jdbcTemplate.execute(createAssignmentsQuery); + } catch (Exception sqle) { ImpalaConnector.databaseLock.unlock(); + String errorMsg = ImpalaConnector.handlePreparedStatementException("createAssignmentsQuery", createAssignmentsQuery, sqle); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!"); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } - // All transactions in Impala automatically commit at the end of the statement. Currently, Impala does not support multi-statement transactions. - // https://impala.apache.org/docs/build/html/topics/impala_transactions.html - // We cannot use "savePoints" along with "autoCommit = false" to roll back to a previous state among multiple statements. 
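+ // Since Impala offers no multi-statement transactions, the JVM-level "databaseLock" is what makes the statement-sequence in this method atomic.
+ // A minimal sketch of that pattern (illustrative only; a try/finally form like this one guarantees the unlock on every exit-path):
+ //     ImpalaConnector.databaseLock.lock();
+ //     try {
+ //         jdbcTemplate.execute(createAssignmentsQuery);               // each statement auto-commits on its own
+ //         jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);  // so the lock, not the DB, provides the atomicity
+ //     } finally {
+ //         ImpalaConnector.databaseLock.unlock();
+ //     }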
- - PreparedStatement createCurrentAssignmentsPreparedStatement = null; try { - createCurrentAssignmentsPreparedStatement = con.prepareStatement(createAssignmentsQuery); - // We cannot set the "limits" and the MAX_ATTEMPTS_PER_RECORD as preparedStatements parameters, as we get a "java.sql.SQLException: [Simba][JDBC](11420) Error, parameter metadata not populated." - createCurrentAssignmentsPreparedStatement.execute(); - } catch (SQLException sqle) { - ImpalaConnector.databaseLock.unlock(); - String errorMsg = ImpalaConnector.handlePreparedStatementException("createAssignmentsQuery", createAssignmentsQuery, "createCurrentAssignmentsPreparedStatement", createCurrentAssignmentsPreparedStatement, con, sqle); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - } finally { - try { - if ( createCurrentAssignmentsPreparedStatement != null ) - createCurrentAssignmentsPreparedStatement.close(); - } catch (SQLException sqle2) { - logger.error("Failed to close the \"createCurrentAssignmentsPreparedStatement\"!\n" + sqle2.getMessage()); - } - } + jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery); + } catch (Exception sqle) { + String errorMsg = dropCurrentAssignmentTable(); - PreparedStatement computeCurrentAssignmentsStatsPreparedStatement = null; - try { - computeCurrentAssignmentsStatsPreparedStatement = con.prepareStatement(computeCurrentAssignmentsStatsQuery); - computeCurrentAssignmentsStatsPreparedStatement.execute(); - } catch (SQLException sqle) { - String errorMsg = dropCurrentAssignmentTable(con); if ( errorMsg != null ) // The "databaseLock" is already unlocked. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - ImpalaConnector.databaseLock.unlock(); - errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, "computeCurrentAssignmentsStatsPreparedStatement", computeCurrentAssignmentsStatsPreparedStatement, con, sqle); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - } finally { - try { - if ( computeCurrentAssignmentsStatsPreparedStatement != null ) - computeCurrentAssignmentsStatsPreparedStatement.close(); - } catch (SQLException sqle2) { - logger.error("Failed to close the \"computeCurrentAssignmentsStatsPreparedStatement\"!\n" + sqle2.getMessage()); - } - } - PreparedStatement getAssignmentsPreparedStatement = null; - try { - getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery); - } catch (SQLException sqle) { - String errorMsg = dropCurrentAssignmentTable(con); - if ( errorMsg != null ) // The "databaseLock" is already unlocked. - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); ImpalaConnector.databaseLock.unlock(); - errorMsg = ImpalaConnector.handlePreparedStatementException("getAssignmentsQuery", getAssignmentsQuery, "getAssignmentsPreparedStatement", getAssignmentsPreparedStatement, con, sqle); - // The "getAssignmentsPreparedStatement" will always be null here, so we do not close it. + errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, sqle); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records. 
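+ // "RowCallbackHandler" is a single-method interface, so the anonymous class used below could equally be written as a lambda.
+ // A behaviorally-identical sketch, shown only for reference:
+ //     jdbcTemplate.query(getAssignmentsQuery, rs -> {
+ //         ... build the Assignment from "rs" and add it to "assignments" ...
+ //     });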
- try ( ResultSet resultSet = getAssignmentsPreparedStatement.executeQuery() ) { - // Unfortunately, we cannot use the following as the used version of the Impala-driver does not support it. - /*if ( !resultSet.first() ) { - ImpalaConnector.databaseLock.unlock(); - String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId; - logger.error(errorMsg); - ImpalaConnector.closeConnection(con); - return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg); - }*/ - - // The cursor is automatically before the first element in this configuration. - while ( resultSet.next() ) { // Move the cursor forward. - // If the resultsSet is empty, then the control will never get inside the loop. - // The following few lines, cannot be outside the "while" loop, since the same object is added, despite that we update the inner-values. - Assignment assignment = new Assignment(); - assignment.setWorkerId(workerId); - assignment.setTimestamp(timestamp); - Datasource datasource = new Datasource(); - try { // For each of the 4 columns returned. The indexing starts from 1 - assignment.setId(resultSet.getString(1)); - assignment.setOriginalUrl(resultSet.getString(2)); - datasource.setId(resultSet.getString(3)); - datasource.setName(resultSet.getString(4)); - } catch (SQLException sqle) { - logger.error("No value was able to be retrieved from one of the columns of row_" + resultSet.getRow(), sqle); - continue; // This object is broken, move to the next row. + try { + jdbcTemplate.query(getAssignmentsQuery, new RowCallbackHandler() { + @Override + public void processRow(ResultSet rs) throws SQLException { + Assignment assignment = new Assignment(); + assignment.setWorkerId(workerId); + assignment.setTimestamp(timestamp); + Datasource datasource = new Datasource(); + try { // For each of the 4 columns returned. The indexing starts from 1 + assignment.setId(rs.getString(1)); + assignment.setOriginalUrl(rs.getString(2)); + datasource.setId(rs.getString(3)); + datasource.setName(rs.getString(4)); + } catch (SQLException sqle) { + logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle); + return; // This row's object is broken; skip it, just like the old "continue" did. + } + assignment.setDatasource(datasource); + assignments.add(assignment); } - assignment.setDatasource(datasource); - assignments.add(assignment); - } + }); } catch (Exception e) { - String errorMsg = dropCurrentAssignmentTable(con); + String errorMsg = dropCurrentAssignmentTable(); if ( errorMsg != null ) // The "databaseLock" is already unlocked. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + ImpalaConnector.databaseLock.unlock(); + errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n"; logger.error(errorMsg, e); - ImpalaConnector.closeConnection(con); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - } finally { - try { - getAssignmentsPreparedStatement.close(); - } catch (SQLException sqle) { - logger.error("Failed to close the \"getAssignmentsPreparedStatement\"!\n" + sqle.getMessage()); - } } int assignmentsSize = assignments.size(); if ( assignmentsSize == 0 ) { - String errorMsg = dropCurrentAssignmentTable(con); + String errorMsg = dropCurrentAssignmentTable(); if ( errorMsg != null ) // The "databaseLock" is already unlocked. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); ImpalaConnector.databaseLock.unlock(); maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests. 
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecord + " for the next requests."; logger.error(errorMsg); - ImpalaConnector.closeConnection(con); + return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg); } else if ( assignmentsSize < assignmentsLimit ) { maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests. @@ -218,48 +174,37 @@ public class UrlController { String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n" + "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data"; - PreparedStatement insertAssignmentsPreparedStatement = null; try { - insertAssignmentsPreparedStatement = con.prepareStatement(insertAssignmentsQuery); - insertAssignmentsPreparedStatement.execute(); - } catch (SQLException sqle) { - String errorMsg = dropCurrentAssignmentTable(con); + jdbcTemplate.execute(insertAssignmentsQuery); + } catch (Exception sqle) { + String errorMsg = dropCurrentAssignmentTable(); if ( errorMsg != null ) // The "databaseLock" is already unlocked. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); ImpalaConnector.databaseLock.unlock(); - errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, "insertAssignmentsPreparedStatement", insertAssignmentsPreparedStatement, con, sqle); + errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, sqle); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - } finally { - try { - if ( insertAssignmentsPreparedStatement != null ) - insertAssignmentsPreparedStatement.close(); - } catch (SQLException sqle2) { - logger.error("Failed to close the \"insertAssignmentsPreparedStatement\"!\n" + sqle2.getMessage()); - } } - String errorMsg = dropCurrentAssignmentTable(con); + String errorMsg = dropCurrentAssignmentTable(); if ( errorMsg != null ) // The "databaseLock" is already unlocked. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. 
Going to merge the parquet files for this table."); - String mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con, "", null); + String mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", "", null); if ( mergeErrorMsg != null ) { ImpalaConnector.databaseLock.unlock(); - ImpalaConnector.closeConnection(con); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg); } ImpalaConnector.databaseLock.unlock(); - ImpalaConnector.closeConnection(con); long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet(); logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + "."); return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments)); } - @PostMapping("addWorkerReport") public ResponseEntity addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) { @@ -294,52 +239,22 @@ public class UrlController { logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database."); // Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location". - FileUtils.UploadFullTextsResponse uploadFullTextsResponse = FileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId); + FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId); if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) { return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!"); } else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) { logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments); // The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads. - FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports); + fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports); } ImpalaConnector.databaseLock.lock(); - Connection con = ImpalaConnector.getInstance().getConnection(); - if ( con == null ) { - ImpalaConnector.databaseLock.unlock(); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!"); - } // Store the workerReport into the database. 
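+ // Note: "jdbcTemplate.update(sql, args, argTypes)" still runs through a JDBC PreparedStatement internally,
+ // so the "?"-placeholders in the queries below keep the same injection-safety as the removed hand-built PreparedStatements.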
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)"; - String tempInsertQueryName = null; - PreparedStatement preparedInsertPayloadStatement = null, preparedInsertAttemptStatement = null; - try { - tempInsertQueryName = "insertIntoPayloadBaseQuery"; - preparedInsertPayloadStatement = con.prepareStatement(insertIntoPayloadBaseQuery); - tempInsertQueryName = "insertIntoAttemptBaseQuery"; - preparedInsertAttemptStatement = con.prepareStatement(insertIntoAttemptBaseQuery); - } catch (SQLException sqle) { - ImpalaConnector.databaseLock.unlock(); - String errorMsg = "Problem when creating the prepared statement for \"" + tempInsertQueryName + "\"!\n"; - logger.error(errorMsg + sqle.getMessage()); - closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - } - - try { - con.setAutoCommit(false); // Avoid writing to disk for each insert. Write them all in the end. - } catch (SQLException sqle) { - ImpalaConnector.databaseLock.unlock(); - String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!\n"; - logger.error(errorMsg + sqle.getMessage()); - closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - } - String payloadErrorMsg = null; int failedCount = 0; @@ -349,32 +264,27 @@ for ( UrlReport urlReport : urlReports ) { Payload payload = urlReport.getPayload(); + if ( payload == null ) { - logger.error("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments); + logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments); payloadErrorMsg = (++failedCount) + " urlReports failed to be processed because they had no payload!"; + continue; } try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons. - preparedInsertPayloadStatement.setString(1, payload.getId()); - preparedInsertPayloadStatement.setString(2, payload.getOriginal_url()); - preparedInsertPayloadStatement.setString(3, payload.getActual_url()); - preparedInsertPayloadStatement.setTimestamp(4, payload.getTimestamp_acquired()); - preparedInsertPayloadStatement.setString(5, payload.getMime_type()); + Object[] args = new Object[] { + payload.getId(), payload.getOriginal_url(), payload.getActual_url(), payload.getTimestamp_acquired(), + payload.getMime_type(), (payload.getSize() != null) ? String.valueOf(payload.getSize()) : null, payload.getHash(), + payload.getLocation(), payload.getProvenance()}; + int[] argTypes = new int[] { + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}; - // The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers. 
- String sizeStr = null; - Long size = payload.getSize(); - if ( size != null ) - sizeStr = String.valueOf(size); + jdbcTemplate.update(insertIntoPayloadBaseQuery, args, argTypes); - preparedInsertPayloadStatement.setString(6, sizeStr); - preparedInsertPayloadStatement.setString(7, payload.getHash()); - preparedInsertPayloadStatement.setString(8, payload.getLocation()); - preparedInsertPayloadStatement.setString(9, payload.getProvenance()); - preparedInsertPayloadStatement.executeUpdate(); - } catch (SQLException sqle) { - logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": " + sqle.getMessage() + "\n\n"); + } catch (Exception sqle) { + logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": ", sqle); } Error error = urlReport.getError(); @@ -384,81 +294,58 @@ } try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons. - preparedInsertAttemptStatement.setString(1, payload.getId()); - preparedInsertAttemptStatement.setString(2, payload.getOriginal_url()); - preparedInsertAttemptStatement.setTimestamp(3, payload.getTimestamp_acquired()); - preparedInsertAttemptStatement.setString(4, urlReport.getStatus().toString()); - preparedInsertAttemptStatement.setString(5, String.valueOf(error.getType())); // This covers the case of "null" too. - preparedInsertAttemptStatement.setString(6, error.getMessage()); - preparedInsertAttemptStatement.executeUpdate(); - } catch (SQLException sqle) { - logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle.getMessage() + "\n\n"); + Object[] args = new Object[] { + payload.getId(), payload.getOriginal_url(), payload.getTimestamp_acquired(), + urlReport.getStatus().toString(), String.valueOf(error.getType()), error.getMessage()}; + int[] argTypes = new int[] { + Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR}; + + jdbcTemplate.update(insertIntoAttemptBaseQuery, args, argTypes); + } catch (Exception sqle) { + logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": ", sqle); } }//end for-loop - try { - con.commit(); // Commit all the insert-queries to the database (write them to disk). - } catch (SQLException sqle) { - ImpalaConnector.databaseLock.unlock(); - ImpalaConnector.closeConnection(con); - String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!"; - logger.error(errorMsg + "\n" + sqle.getMessage()); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - } finally { - closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, null); // Do not close the connection here, as we might move forward. - } if ( payloadErrorMsg != null ) logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables, although " + payloadErrorMsg + " Going to merge the parquet files for those tables."); else logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. 
Going to merge the parquet files for those tables."); - String mergeErrorMsg = FileUtils.mergeParquetFiles("payload", con, "", null); + String mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null); if ( mergeErrorMsg != null ) { ImpalaConnector.databaseLock.unlock(); - ImpalaConnector.closeConnection(con); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg); } - mergeErrorMsg = FileUtils.mergeParquetFiles("attempt", con, "", null); + mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null); if ( mergeErrorMsg != null ) { ImpalaConnector.databaseLock.unlock(); - ImpalaConnector.closeConnection(con); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg); } // This will delete the rows of the "assignment" table which refer to the curWorkerId. As we have non-kudu Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data. // Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table. // We do not need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks. - mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con, " WHERE workerid != ", curWorkerId); + mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId); if ( mergeErrorMsg != null ) { ImpalaConnector.databaseLock.unlock(); - ImpalaConnector.closeConnection(con); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg); } - try { - con.commit(); // Apply the merges permanently (write them to disk). - con.setAutoCommit(true); // Restore the "auto-commit" value for this connection of the pool. - } catch (SQLException sqle) { - String errorMsg = "Problem when committing changes to the database!"; - logger.error(errorMsg + "\n" + sqle.getMessage()); - // The statements used in "mergeParquetFiles()" are already closed. - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - } finally { - ImpalaConnector.databaseLock.unlock(); - ImpalaConnector.closeConnection(con); - } + ImpalaConnector.databaseLock.unlock(); logger.debug("Finished merging the database tables."); return ResponseEntity.status(HttpStatus.OK).body(payloadErrorMsg); } - // The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution. // Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested. - private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException - { + private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException { StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use. 
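+ // For example, with numParamsPerRow = 3 and dataSize = 2, the builder below is meant to extend the base query
+ // into the multi-row form (a sketch, not the literal output): "INSERT INTO ... VALUES (?, ?, ?), (?, ?, ?)".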
sb.append(baseInsertQuery); for ( int i=1; i <= dataSize; ++i ) { @@ -484,98 +371,15 @@ return preparedInsertStatement; } - - private boolean closeStatements(Statement statement1, Statement statement2, Connection con) { - try { - if ( statement1 != null ) - statement1.close(); - if ( statement2 != null ) - statement2.close(); - if ( con != null ) - con.close(); // It may have already closed and that's fine. - return true; - } catch (SQLException sqle) { - logger.error("Could not close the statements or the connection with the Impala-database.\n" + sqle.getMessage()); - return false; - } - } - - - @GetMapping("test") - public ResponseEntity getTestUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) { - - logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " test-assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT); - - try { - new FileUtils(); // Find the input file. - } catch (Exception e) { - logger.error(e.getMessage()); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The resource file, for the requested assignments, was not found."); - } - - List<Assignment> assignments = new ArrayList<>(); - HashMultimap<String, String> loadedIdUrlPairs; - boolean isFirstRun = true; - boolean assignmentsLimitReached = false; - Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records. - - // Start loading urls. - while ( true ) { - loadedIdUrlPairs = FileUtils.getNextIdUrlPairBatchFromJson(); // Take urls from jsonFile. - - if ( FileUtils.isFinishedLoading(loadedIdUrlPairs.isEmpty(), isFirstRun) ) // Throws RuntimeException which is automatically passed on. - break; - else - isFirstRun = false; - - Set<Map.Entry<String, String>> pairs = loadedIdUrlPairs.entries(); - - for ( Map.Entry<String, String> pair : pairs ) - { - if ( assignments.size() >= workerAssignmentsLimit ) { - assignmentsLimitReached = true; - break; - } - - int randomNum = GenericUtils.getRandomNumber(1, 5); - assignments.add(new Assignment(pair.getKey(), pair.getValue(), new Datasource("ID_" + randomNum, "NAME_" + randomNum), workerId, timestamp)); - }// end pairs-for-loop - - if ( assignmentsLimitReached ) { - logger.debug("Done loading urls from the inputFile as the assignmentsLimit (" + workerAssignmentsLimit + ") was reached."); - break; - } - }// end loading-while-loop - - Scanner scanner = FileUtils.inputScanner.get(); - if ( scanner != null ) // Check if the initial value is null. 
- scanner.close(); - - long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet(); - logger.info("Sending batch_" + curAssignmentsBatchCounter + " with " + assignments.size() + " assignments (" + FileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId); - return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments)); - } - - - private String dropCurrentAssignmentTable(Connection con) - { + private String dropCurrentAssignmentTable() { String dropCurrentAssignmentsQuery = "DROP TABLE " + ImpalaConnector.databaseName + ".current_assignment PURGE"; - PreparedStatement dropCurrentAssignmentsPreparedStatement = null; + try { - dropCurrentAssignmentsPreparedStatement = con.prepareStatement(dropCurrentAssignmentsQuery); - dropCurrentAssignmentsPreparedStatement.execute(); + jdbcTemplate.execute(dropCurrentAssignmentsQuery); return null; - } catch (SQLException sqle) { + } catch (Exception sqle) { ImpalaConnector.databaseLock.unlock(); - return ImpalaConnector.handlePreparedStatementException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, "dropCurrentAssignmentsPreparedStatement", dropCurrentAssignmentsPreparedStatement, con, sqle); - } finally { - try { - if ( dropCurrentAssignmentsPreparedStatement != null ) - dropCurrentAssignmentsPreparedStatement.close(); - } catch (SQLException sqle2) { - logger.error("Failed to close the \"dropCurrentAssignmentsPreparedStatement\"!\n" + sqle2.getMessage()); - } + return ImpalaConnector.handlePreparedStatementException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, sqle); } } - } diff --git a/src/main/java/eu/openaire/urls_controller/util/ControllerConstants.java b/src/main/java/eu/openaire/urls_controller/util/ControllerConstants.java deleted file mode 100644 index ffe86c8..0000000 --- a/src/main/java/eu/openaire/urls_controller/util/ControllerConstants.java +++ /dev/null @@ -1,10 +0,0 @@ -package eu.openaire.urls_controller.util; - - -public interface ControllerConstants { - - int ASSIGNMENTS_LIMIT = 100_000; // The upper assignments-limit the Controller can handle. If the worker's limit is above this one, then the controller's limit is used. Otherwise, the worker's limit will be applied. - - int MAX_ATTEMPTS_PER_RECORD = 3; // The maximum times a record can be processed, if each of the previous times failed with a "couldRetry" Error-Class. 
- -} diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUnZipper.java b/src/main/java/eu/openaire/urls_controller/util/FileUnZipper.java index ed88cda..287ce6a 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUnZipper.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUnZipper.java @@ -2,6 +2,7 @@ package eu.openaire.urls_controller.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import java.io.File; import java.io.FileInputStream; @@ -12,20 +13,16 @@ import java.nio.file.StandardCopyOption; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; - +@Component public class FileUnZipper { private static final Logger logger = LoggerFactory.getLogger(FileUnZipper.class); - - public static void unzipFolder(Path source, Path target) throws Exception - { - try ( ZipInputStream zis = new ZipInputStream(new FileInputStream(source.toFile())) ) - { + public void unzipFolder(Path source, Path target) throws Exception { + try ( ZipInputStream zis = new ZipInputStream(new FileInputStream(source.toFile())) ) { // Iterate over the files in zip and un-zip them. ZipEntry zipEntry = zis.getNextEntry(); - while ( zipEntry != null ) - { + while ( zipEntry != null ) { Path targetPath = zipSlipProtect(zipEntry, target); if ( zipEntry.getName().endsWith(File.separator) ) // If we have a directory. @@ -44,10 +41,8 @@ public class FileUnZipper { } } - // Protect from a Zip Slip attack: https://snyk.io/research/zip-slip-vulnerability - public static Path zipSlipProtect(ZipEntry zipEntry, Path targetDir) throws IOException - { + public Path zipSlipProtect(ZipEntry zipEntry, Path targetDir) throws IOException { Path targetDirResolved = targetDir.resolve(zipEntry.getName()); // Make sure normalized file still has targetDir as its prefix, else throw an exception. 
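+ // e.g. a malicious archive-entry named "../../evil.sh" would otherwise resolve outside "targetDir".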
Path normalizePath = targetDirResolved.normalize(); @@ -56,5 +51,4 @@ public class FileUnZipper { } return normalizePath; } - -} +} \ No newline at end of file diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index 01f66d8..8320524 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -3,12 +3,15 @@ package eu.openaire.urls_controller.util; import com.google.common.collect.HashMultimap; import eu.openaire.urls_controller.configuration.ImpalaConnector; import eu.openaire.urls_controller.models.Payload; -import eu.openaire.urls_controller.models.Task; import eu.openaire.urls_controller.models.UrlReport; +import java.sql.Types; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.boot.configurationprocessor.json.JSONException; -import org.springframework.boot.configurationprocessor.json.JSONObject; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.dao.DataAccessException; +import org.springframework.dao.EmptyResultDataAccessException; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; import javax.servlet.http.HttpServletRequest; import java.io.*; @@ -17,51 +20,38 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.sql.*; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; +@Component public class FileUtils { private static final Logger logger = LoggerFactory.getLogger(FileUtils.class); - public static ThreadLocal<Scanner> inputScanner = new ThreadLocal<>(); // Every Thread has its own variable. - private static final ThreadLocal<Integer> fileIndex = new ThreadLocal<>(); - private static final ThreadLocal<Integer> unretrievableInputLines = new ThreadLocal<>(); - public static ThreadLocal<Integer> duplicateIdUrlEntries = new ThreadLocal<>(); - public static final int jsonBatchSize = 3000; - private static final String utf8Charset = "UTF-8"; - public static String inputFileFullPath; - private static final String workingDir = System.getProperty("user.dir") + File.separator; + @Autowired + private JdbcTemplate jdbcTemplate; + @Autowired + private S3ObjectStore s3ObjectStore; - public FileUtils() throws RuntimeException - { - inputFileFullPath = workingDir + "src" + File.separator + "main" + File.separator + "resources"; - String resourceFileName = "testInputFiles" + File.separator + "orderedList1000.json"; - inputFileFullPath += File.separator + resourceFileName; - InputStream inputStream = getClass().getClassLoader().getResourceAsStream(resourceFileName); - if ( inputStream == null ) - throw new RuntimeException("No resourceFile was found with name \"" + resourceFileName + "\"."); + @Autowired + private FileUnZipper fileUnZipper; - logger.debug("Going to retrieve the data from the inputResourceFile: " + resourceFileName); + public enum UploadFullTextsResponse {successful, unsuccessful, databaseError} - FileUtils.inputScanner.set(new Scanner(inputStream, utf8Charset)); - fileIndex.set(0); // Re-initialize the file-number-pointer. - unretrievableInputLines.set(0); - duplicateIdUrlEntries.set(0); + public FileUtils() throws RuntimeException { } - } - /** * In each insertion, a new parquet-file is created, so we end up with millions of files. 
	 * Parquet is great for fast-select, so we have to stick with it and merge those files.
	 * This method creates a clone of the original table, in order to have only one parquet file in the end. Drops the original table.
	 * Renames the clone to the original's name.
	 * Returns the errorMsg if an error appears, otherwise it returns null.
	 */
-	public static String mergeParquetFiles(String tableName, Connection con, String whereClause, String parameter)
-	{
+	public String mergeParquetFiles(String tableName, String whereClause, String parameter) {
		String errorMsg;
		if ( tableName == null ) {
			errorMsg = "No tableName was given. Do not know the tableName for which we should merge the underlying files!";
@@ -78,42 +68,29 @@ public class FileUtils {
		else
			parameter = " '" + parameter + "'";	// This will be a "string-check".

-		Statement statement;
		try {
-			statement = con.createStatement();
-		} catch (SQLException sqle) {
-			errorMsg = "Problem when creating a connection-statement!\n";
-			logger.error(errorMsg + sqle.getMessage());
-			return errorMsg;
-		}
-
-		try {
-			statement.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName + " " + whereClause + parameter);
-			statement.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE");
-			statement.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName);
-			statement.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName);
-		} catch (SQLException sqle) {
+			jdbcTemplate.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName + " " + whereClause + parameter);
+			jdbcTemplate.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE");
+			jdbcTemplate.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName);
+			jdbcTemplate.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName);
+		} catch (DataAccessException e) {
			errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n";
-			logger.error(errorMsg + getCutBatchExceptionMessage(sqle.getMessage()), sqle);
+			logger.error(errorMsg, e);
			return errorMsg;
-		} finally {
-			// Make sure we close the statement.
-			try { statement.close(); }
-			catch (SQLException sqle3) { logger.error("Could not close the statement for executing queries in the Impala-database.\n" + sqle3); }
		}

		return null;	// No errorMsg, everything is fine.
	}

+	private final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$");
+	private final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$");

-	public enum UploadFullTextsResponse {successful, unsuccessful, databaseError};
-	private static final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$");
-	private static final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$");
-	public static final String baseTargetLocation = System.getProperty("user.dir") + File.separator + "fullTexts" + File.separator;
-	private static final int numOfFullTextsPerBatch = 70;	// The HTTP-headers cannot be too large (It failed with 100 fileNames).
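A usage sketch for the reworked merge-method (the arguments are illustrative; callers are expected to hold "ImpalaConnector.databaseLock" while the "clone-drop-rename" queries run):

	String errorMsg = fileUtils.mergeParquetFiles("payload", "", null);	// Merge the whole (hypothetical) "payload" table, with no WHERE-clause.
	if ( errorMsg != null )
		return UploadFullTextsResponse.databaseError;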
+ @Value("services.pdfaggregation.controller.baseTargetLocation") + private String baseTargetLocation; - public static UploadFullTextsResponse getAndUploadFullTexts(List urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId) - { + private final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames). + + public UploadFullTextsResponse getAndUploadFullTexts(List urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId) { // The Controller have to request the files from the Worker, in order to upload them to the S3. // We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database. @@ -126,30 +103,15 @@ public class FileUtils { remoteAddr = request.getRemoteAddr(); ImpalaConnector.databaseLock.lock(); - Connection con = ImpalaConnector.getInstance().getConnection(); - if ( con == null ) { - ImpalaConnector.databaseLock.unlock(); - logger.error("Problem when creating the Impala-connection!"); - return UploadFullTextsResponse.databaseError; - } String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ?" ; - PreparedStatement getFileLocationForHashPreparedStatement = null; - try { - getFileLocationForHashPreparedStatement = con.prepareStatement(getFileLocationForHashQuery); - } catch (SQLException sqle) { - ImpalaConnector.databaseLock.unlock(); - logger.error("Problem when creating the prepared statement for \"" + getFileLocationForHashQuery + "\"!\n" + sqle.getMessage()); - return UploadFullTextsResponse.databaseError; - } // Get the file-locations. int numFullTextUrlsFound = 0; int numFilesFoundFromPreviousAssignmentsBatches = 0; HashMultimap allFileNamesWithIDsHashMap = HashMultimap.create((urlReports.size() / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it. - for ( UrlReport urlReport : urlReports ) - { + for ( UrlReport urlReport : urlReports ) { UrlReport.StatusType statusType = urlReport.getStatus(); if ( (statusType == null) || statusType.equals(UrlReport.StatusType.non_accessible) ) { continue; @@ -160,7 +122,7 @@ public class FileUtils { if ( payload == null ) continue; - String fileLocation = null; + String fileLocation; // Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH. // If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker. @@ -168,32 +130,24 @@ public class FileUtils { // Use the same prepared-statement for all requests, to improve speed (just like when inserting similar thing to the DB). String fileHash = payload.getHash(); if ( fileHash != null ) { - try { - getFileLocationForHashPreparedStatement.setString(1, fileHash); - } catch (SQLException sqle) { - logger.error("Error when setting the parameter in \"getFileLocationForHashQuery\"!\n" + sqle.getMessage()); + + fileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[] {fileHash}, new int[] {Types.STRING}, String.class); + + if ( fileLocation != null ) { // If the full-text of this record is already-found and uploaded. + payload.setLocation(fileLocation); // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical. 
+ + //logger.debug("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + fileLocation + "\"."); // DEBUG! + numFilesFoundFromPreviousAssignmentsBatches ++; + + continue; // Do not request the file from the worker, it's already uploaded. Move on. } - try ( ResultSet resultSet = getFileLocationForHashPreparedStatement.executeQuery() ) { - if ( resultSet.next() ) { // Move the "cursor" to the first row. If there is any data, then take the first result (there should not be more, but we still want the first anyway). - fileLocation = resultSet.getString(1); - if ( fileLocation != null ) { // If the full-text of this record is already-found and uploaded. - payload.setLocation(fileLocation); // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical. - //logger.debug("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + fileLocation + "\"."); // DEBUG! - numFilesFoundFromPreviousAssignmentsBatches ++; - continue; // Do not request the file from the worker, it's already uploaded. Move on. - } - } - } catch (Exception e) { - logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n" + e.getMessage()); - - // TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database?? - // TODO - Since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error). - // TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not auto-closed here)and the Database connection. - } + // TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database?? + // TODO - Since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error). + // TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not auto-closed here)and the Database connection. } - // If the full-text of this record was not found by a previous batch.. + // If the full-text of this record was not found by a previous batch... fileLocation = payload.getLocation(); if ( fileLocation != null ) { // If the docFile was downloaded (without an error).. Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation); @@ -209,16 +163,7 @@ public class FileUtils { } } - // Close the Prepared Statement. - try { - if ( getFileLocationForHashPreparedStatement != null ) - getFileLocationForHashPreparedStatement.close(); - } catch (SQLException sqle) { - logger.error("Failed to close the \"getFileLocationForHashPreparedStatement\"!\n" + sqle.getMessage()); - } finally { - ImpalaConnector.databaseLock.unlock(); // The rest work of this function does not use the database. - ImpalaConnector.closeConnection(con); - } + ImpalaConnector.databaseLock.unlock(); // The rest work of this function does not use the database. 
logger.info("NumFullTextUrlsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextUrlsFound + " (out of " + urlReports.size() + ")."); logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches); @@ -252,8 +197,7 @@ public class FileUtils { File curAssignmentsBaseDir = new File(curAssignmentsBaseLocation); int failedBatches = 0; - for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) - { + for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) { List fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter); HttpURLConnection conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId); if ( conn == null ) { @@ -280,7 +224,7 @@ public class FileUtils { //logger.debug("The zip file has been saved: " + zipFileFullPath); // DEBUG! - FileUnZipper.unzipFolder(Paths.get(zipFileFullPath), curBatchPath); + fileUnZipper.unzipFolder(Paths.get(zipFileFullPath), curBatchPath); String[] fileNames = new File(targetDirectory).list(); if ( (fileNames == null) || (fileNames.length <= 1 ) ) { // The directory might have only one file, the "zip-file". @@ -311,7 +255,7 @@ public class FileUtils { // At this point, we know that this file is related with one or more IDs of the payloads AND it has a valid fileName. // Let's try to upload the file to S3 and update the payloads of all related IDs, either in successful upload or not. - String s3Url = S3ObjectStoreMinIO.uploadToS3(fileName, fileFullPath); + String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath); if ( s3Url != null ) { setFullTextForMultipleIDs(fileRelatedIDs, payloadsHashMultimap, s3Url); // It checks weather (s3Url != null) and acts accordingly. numUploadedFiles ++; @@ -342,9 +286,7 @@ public class FileUtils { } } - - private static HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List fileNamesForCurBatch, int totalBatches, String workerId) - { + private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List fileNamesForCurBatch, int totalBatches, String workerId) { baseUrl += batchNum + "/"; String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch); logger.info("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]"); @@ -366,9 +308,7 @@ public class FileUtils { return conn; } - - private static String getErrorMessageFromResponseBody(HttpURLConnection conn) - { + private String getErrorMessageFromResponseBody(HttpURLConnection conn) { StringBuilder errorMsgStrB = new StringBuilder(500); try ( BufferedReader br = new BufferedReader(new InputStreamReader(conn.getErrorStream())) ) { // Try-with-resources String inputLine; @@ -387,9 +327,7 @@ public class FileUtils { } } - - private static List getFileNamesForBatch(List allFileNames, int numAllFullTexts, int curBatch) - { + private List getFileNamesForBatch(List allFileNames, int numAllFullTexts, int curBatch) { int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch); int endingIndex = (curBatch * numOfFullTextsPerBatch); if ( endingIndex > numAllFullTexts ) // This might be the case, when the "numAllFullTexts" is too small. 
@@ -406,12 +344,9 @@ public class FileUtils {
		return fileNamesOfCurBatch;
	}

+	private String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch) {
+		final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);

-	private static final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);
-	// TODO - Make it THREAD-LOCAL, if we move to multi-thread batch requests.
-
-	private static String getRequestUrlForBatch(String baseUrl, List fileNamesForCurBatch)
-	{
		sb.append(baseUrl);
		int numFullTextsCurBatch = fileNamesForCurBatch.size();
		for ( int j=0; j < numFullTextsCurBatch; ++j ){
@@ -420,14 +355,13 @@ public class FileUtils {
				sb.append(",");
		}
		String requestUrl = sb.toString();
-		sb.setLength(0);	// Reset for the next batch.
+
		return requestUrl;
	}

+	private final int bufferSize = 20971520;	// 20 MB

-	private static final int bufferSize = 20971520;	// 20 MB
-	public static boolean saveZipFile(HttpURLConnection conn, File zipFile)
-	{
+	public boolean saveZipFile(HttpURLConnection conn, File zipFile) {
		InputStream inStream = null;
		FileOutputStream outStream = null;
		try {
@@ -454,9 +388,7 @@ public class FileUtils {
		}
	}

-
-	private static boolean isFileNameProblematic(String fileName, HashMultimap payloadsHashMultimap)
-	{
+	private boolean isFileNameProblematic(String fileName, HashMultimap<String, Payload> payloadsHashMultimap) {
		// Get the ID of the file.
		Matcher matcher = FILENAME_ID.matcher(fileName);
		if ( !matcher.matches() ) {
@@ -492,16 +424,13 @@ public class FileUtils {
		return true;
	}

-
	/**
	 * This method updates the UrlReports to not point to any downloaded fullText files.
	 * This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails.
	 * Then, we don't want any "links" to locally stored files, which will be deleted.
	 * @param urlReports
-	 * @return
	 */
-	public static void updateUrlReportsToHaveNoFullTextFiles(List urlReports)
-	{
+	public void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports) {
		for ( UrlReport urlReport : urlReports ) {
			Payload payload = urlReport.getPayload();
			if ( payload != null )
@@ -509,22 +438,18 @@ public class FileUtils {
		}
	}

-
-	private static void replaceNotUploadedFileLocations(List urlReports)
-	{
+	private void replaceNotUploadedFileLocations(List<UrlReport> urlReports) {
		for ( UrlReport urlReport : urlReports ) {
			Payload payload = urlReport.getPayload();
			if ( payload != null ) {
				String fileLocation = payload.getLocation();
-				if ( (fileLocation != null) && (! fileLocation.startsWith(S3ObjectStoreMinIO.endpoint)) )
+				if ( (fileLocation != null) && (! s3ObjectStore.locationInStore(fileLocation)) )
					setUnretrievedFullText(payload);
			}
		}
	}

-
-	public static void updateUrlReportsForCurBatchTOHaveNoFullTextFiles(HashMultimap payloadsHashMultimap, List fileNames)
-	{
+	public void updateUrlReportsForCurBatchTOHaveNoFullTextFiles(HashMultimap<String, Payload> payloadsHashMultimap, List<String> fileNames) {
		for ( String fileName : fileNames ) {
			// Get the ID of the file.
			Matcher matcher = FILENAME_ID.matcher(fileName);
@@ -543,9 +468,7 @@ public class FileUtils {
		}
	}

-
-	public static void setUnretrievedFullText(Payload payload)
-	{
+	public void setUnretrievedFullText(Payload payload) {
		// Mark the full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text will be kept.
		payload.setLocation(null);
		payload.setHash(null);
@@ -553,15 +476,13 @@ public class FileUtils {
		payload.setSize(null);
	}

-
	/**
	 * Set the fileLocation for all those IDs related to the File. The IDs may have one or more payloads.
	 * @param fileIDs
	 * @param payloadsHashMultimap
	 * @param s3Url
	 */
-	public static void setFullTextForMultipleIDs(Set fileIDs, HashMultimap payloadsHashMultimap, String s3Url)
-	{
+	public void setFullTextForMultipleIDs(Set<String> fileIDs, HashMultimap<String, Payload> payloadsHashMultimap, String s3Url) {
		for ( String id : fileIDs ) {
			Set<Payload> payloads = payloadsHashMultimap.get(id);
			if ( payloads.isEmpty() ) {
@@ -575,8 +496,7 @@ public class FileUtils {
		}
	}

-
-	public static boolean deleteDirectory(File curBatchDir) {
+	public boolean deleteDirectory(File curBatchDir) {
		try {
			org.apache.commons.io.FileUtils.deleteDirectory(curBatchDir);
			return true;
@@ -585,136 +505,4 @@ public class FileUtils {
			return false;
		}
	}
-
-
-	private static String getCutBatchExceptionMessage(String sqleMessage)
-	{
-		// The sqleMessage contains the actual message followed by the long batch. This makes the logs unreadable. So we should shorten the message before logging.
-		int maxEnding = 1500;
-		if ( sqleMessage.length() > maxEnding )
-			return (sqleMessage.substring(0, maxEnding) + "...");
-		else
-			return sqleMessage;
-	}
-
-
-	// This is currently not used, but it may be useful in a future scenario.
-	private static long getInputFileLinesNum()
-	{
-		long numOfLines = 0;
-		try {
-			numOfLines = Files.lines(Paths.get(inputFileFullPath)).count();
-			logger.debug("The numOfLines in the inputFile is " + numOfLines);
-		} catch (IOException e) {
-			logger.error("Could not retrieve the numOfLines. " + e);
-			return -1;
-		}
-		return numOfLines;
-	}
-
-
-	/**
-	 * This method decodes a Json String and returns its members.
-	 * @param jsonLine String
-	 * @return HashMap
-	 */
-	public static Task jsonDecoder(String jsonLine)
-	{
-		// Get ID and url and put them in the HashMap
-		String idStr = null;
-		String urlStr = null;
-		try {
-			JSONObject jObj = new JSONObject(jsonLine);	// Construct a JSONObject from the retrieved jsonLine.
-			idStr = jObj.get("id").toString();
-			urlStr = jObj.get("url").toString();
-		} catch (JSONException je) {
-			logger.warn("JSONException caught when tried to parse and extract values from jsonLine: \t" + jsonLine, je);
-			return null;
-		}
-
-		if ( urlStr.isEmpty() ) {
-			if ( !idStr.isEmpty() )	// If we only have the id, then go and log it.
-				logger.warn("The url was not found for id: \"" + idStr + "\"");
-			return null;
-		}
-		return new Task(idStr, urlStr, null);
-	}
-
-
-	/**
-	 * This method parses a Json file and extracts the urls, along with the IDs.
-	 * @return HashMultimap
-	 */
-	public static HashMultimap getNextIdUrlPairBatchFromJson()
-	{
-		Task inputIdUrlTuple;
-		int expectedPathsPerID = 5;
-		int expectedIDsPerBatch = jsonBatchSize / expectedPathsPerID;
-
-		HashMultimap idAndUrlMappedInput = HashMultimap.create(expectedIDsPerBatch, expectedPathsPerID);
-
-		int curBeginning = fileIndex.get();
-
-		while ( inputScanner.get().hasNextLine() && (fileIndex.get() < (curBeginning + jsonBatchSize)) )
-		{// While (!EOF) and inside the current url-batch, iterate through lines.
-
-			//logger.debug("fileIndex: " + FileUtils.fileIndex.get());	// DEBUG!
-
-			// Take each line, remove potential double quotes.
-			String retrievedLineStr = inputScanner.get().nextLine();
-			//logger.debug("Loaded from inputFile: " + retrievedLineStr);	// DEBUG!
-
-			fileIndex.set(fileIndex.get() +1);
-
-			if ( retrievedLineStr.isEmpty() ) {
-				unretrievableInputLines.set(unretrievableInputLines.get() +1);
-				continue;
-			}
-
-			if ( (inputIdUrlTuple = jsonDecoder(retrievedLineStr)) == null ) {	// Decode the jsonLine and take the two attributes.
- logger.warn("A problematic inputLine found: \t" + retrievedLineStr); - unretrievableInputLines.set(unretrievableInputLines.get() +1); - continue; - } - - if ( !idAndUrlMappedInput.put(inputIdUrlTuple.getId(), inputIdUrlTuple.getUrl()) ) { // We have a duplicate url in the input.. log it here as we cannot pass it through the HashMultimap. It's possible that this as well as the original might be/give a docUrl. - duplicateIdUrlEntries.set(duplicateIdUrlEntries.get() +1); - } - } - - return idAndUrlMappedInput; - } - - - /** - * This method returns the number of (non-heading, non-empty) lines we have read from the inputFile. - * @return loadedUrls - */ - public static int getCurrentlyLoadedUrls() // In the end, it gives the total number of urls we have processed. - { - return FileUtils.fileIndex.get() - FileUtils.unretrievableInputLines.get(); - } - - - /** - * This method checks if there is no more input-data and returns true in that case. - * Otherwise, it returns false, if there is more input-data to be loaded. - * A "RuntimeException" is thrown if no input-urls were retrieved in general. - * @param isEmptyOfData - * @param isFirstRun - * @return finished loading / not finished - * @throws RuntimeException - */ - public static boolean isFinishedLoading(boolean isEmptyOfData, boolean isFirstRun) - { - if ( isEmptyOfData ) { - if ( isFirstRun ) - logger.error("Could not retrieve any urls from the inputFile!"); - else - logger.debug("Done loading " + FileUtils.getCurrentlyLoadedUrls() + " urls from the inputFile."); - return true; - } - return false; - } - -} +} \ No newline at end of file diff --git a/src/main/java/eu/openaire/urls_controller/util/S3ObjectStore.java b/src/main/java/eu/openaire/urls_controller/util/S3ObjectStore.java new file mode 100644 index 0000000..c62c41d --- /dev/null +++ b/src/main/java/eu/openaire/urls_controller/util/S3ObjectStore.java @@ -0,0 +1,141 @@ +package eu.openaire.urls_controller.util; + +import io.minio.*; +import io.minio.messages.Bucket; +import io.minio.messages.Item; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@Component +public class S3ObjectStore { + + private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class); + + @Value("services.pdfaggregation.controller.s3.endpoint") + private String endpoint = null; // This is useful to be "public", to test file-locations. + @Value("services.pdfaggregation.controller.s3.accessKey") + private String accessKey = null; + @Value("services.pdfaggregation.controller.s3.secretKey") + private String secretKey = null; + @Value("services.pdfaggregation.controller.s3.region") + private String region = null; + @Value("services.pdfaggregation.controller.s3.bucketName") + private String bucketName = null; + + @Value("services.pdfaggregation.controller.s3.shouldEmptyBucket") + private boolean shouldEmptyBucket = false; // Set true only for testing! 
+ @Value("services.pdfaggregation.controller.s3.shouldShowAllS3Buckets") + private boolean shouldShowAllS3Buckets = false; + + private MinioClient minioClient; + + @PostConstruct + public void init() throws Exception { + this.minioClient = MinioClient.builder().endpoint(endpoint).credentials(accessKey, secretKey).region(region).build(); + + boolean bucketExists = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build()); + + // Keep this commented-out to avoid objects-deletion by accident. The code is open-sourced, so it's easy to enable this ability if we really want it (e.g. for testing). + if ( bucketExists && shouldEmptyBucket ) { + emptyBucket(bucketName, false); + //throw new RuntimeException("stop just for test!"); + } + + // Make the bucket, if not exist. + if ( !bucketExists ) { + logger.info("Bucket \"" + bucketName + "\" does not exist! Going to create it.."); + minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build()); + } else + logger.debug("Bucket \"" + bucketName + "\" already exists."); + + if ( shouldShowAllS3Buckets ) { + List buckets = null; + try { + buckets = minioClient.listBuckets(); + logger.debug("The buckets in the S3 ObjectStore are:"); + for ( Bucket bucket : buckets ) { + logger.debug(bucket.name()); + } + } catch (Exception e) { + logger.warn("Could not listBuckets: " + e.getMessage()); + } + } + } + + private final Pattern EXTENSION_PATTERN = Pattern.compile("(\\.[^.]+)$"); + + /** + * @param fileObjKeyName = "**File object key name**"; + * @param fileFullPath = "**Path of the file to upload**"; + * @return the url of the uploaded file + */ + public String uploadToS3(String fileObjKeyName, String fileFullPath) throws Exception { + String contentType = null; + + // Take the Matcher to retrieve the extension. + Matcher extensionMatcher = EXTENSION_PATTERN.matcher(fileFullPath); + if ( extensionMatcher.find() ) { + String extension = null; + if ( (extension = extensionMatcher.group(0)) == null ) + contentType = "application/pdf"; + else { + if ( extension.equals("pdf") ) + contentType = "application/pdf"; + /*else if ( *//* TODO - other-extension-match *//* ) + contentType = "application/pdf"; */ + else + contentType = "application/pdf"; + } + } else { + logger.warn("The file with key \"" + fileObjKeyName + "\" does not have a file-extension! Setting the \"pdf\"-mimeType."); + contentType = "application/pdf"; + } + + minioClient.uploadObject(UploadObjectArgs.builder() + .bucket(bucketName) + .object(fileObjKeyName).filename(fileFullPath) + .contentType(contentType).build()); + + // TODO - What if the fileObjKeyName already exists? + // Right now it gets overwritten (unless we add versioning, which is irrelevant for different objects..) + + String s3Url = endpoint + "/" + bucketName + "/" + fileObjKeyName; // Be aware: This url works only if the access to the bucket is public. + //logger.debug("Uploaded file \"" + fileObjKeyName + "\". The s3Url is: " + s3Url); + return s3Url; + } + + public void emptyBucket(String bucketName, boolean shouldDeleteBucket) throws Exception { + logger.warn("Going to " + (shouldDeleteBucket ? "delete" : "empty") + " bucket \"" + bucketName + "\""); + + // First list the objects of the bucket. + Iterable> results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).build()); + + // Then, delete the objects. 
+		for ( Result<Item> resultItem : results )
+			try {
+				deleteFile(resultItem.get().objectName(), bucketName);
+			} catch (Exception e) {
+				logger.warn("Could not remove an object from bucket \"" + bucketName + "\"!", e);
+			}
+
+		if ( shouldDeleteBucket ) {
+			// Lastly, delete the empty bucket.
+			minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
+		}
+	}
+
+	public boolean locationInStore(String location) {
+		return location.startsWith(endpoint);
+	}
+
+	private void deleteFile(String fileObjKeyName, String bucketName) throws Exception {
+		minioClient.removeObject(RemoveObjectArgs.builder().bucket(bucketName).object(fileObjKeyName).build());
+	}
+}
diff --git a/src/main/java/eu/openaire/urls_controller/util/S3ObjectStoreMinIO.java b/src/main/java/eu/openaire/urls_controller/util/S3ObjectStoreMinIO.java
deleted file mode 100644
index 642e803..0000000
--- a/src/main/java/eu/openaire/urls_controller/util/S3ObjectStoreMinIO.java
+++ /dev/null
@@ -1,226 +0,0 @@
-package eu.openaire.urls_controller.util;
-
-import io.minio.*;
-import io.minio.messages.Bucket;
-import io.minio.messages.Item;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.List;
-import java.util.Scanner;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-
-public class S3ObjectStoreMinIO {
-
-	private static final Logger logger = LoggerFactory.getLogger(S3ObjectStoreMinIO.class);
-
-	public static String endpoint = null;	// This is useful to be "public", to test file-locations.
-	private static String accessKey = null;
-	private static String secretKey = null;
-	private static String region = null;
-	private static String bucketName = null;
-
-	private static MinioClient minioClient;
-
-	public static final boolean shouldEmptyBucket = false;	// Set true only for testing!
-	public static final String credentialsFilePath = System.getProperty("user.dir") + File.separator + "S3_minIO_credentials.txt";
-	private static final boolean shouldShowAllS3Buckets = false;
-
-
-	/**
-	 * This must be called before any other methods.
-	 * */
-	public S3ObjectStoreMinIO()
-	{
-		// Take the credentials from the file.
- Scanner myReader = null; - try { - File credentialsFile = new File(credentialsFilePath); - if ( !credentialsFile.exists() ) { - throw new RuntimeException("credentialsFile \"" + credentialsFilePath + "\" does not exists!"); - } - myReader = new Scanner(credentialsFile); - if ( myReader.hasNextLine() ) { - String[] credentials = myReader.nextLine().split(","); - if ( credentials.length < 5 ) { - throw new RuntimeException("Not all credentials were retrieved from file \"" + credentialsFilePath + "\"!"); - } - endpoint = credentials[0].trim(); - accessKey = credentials[1].trim(); - secretKey = credentials[2].trim(); - region = credentials[3].trim(); - bucketName = credentials[4].trim(); - } - } catch (Exception e) { - String errorMsg = "An error prevented the retrieval of the minIO credentials from the file: " + credentialsFilePath + "\n" + e.getMessage(); - logger.error(errorMsg, e); - System.err.println(errorMsg); - System.exit(53); - } finally { - if ( myReader != null ) - myReader.close(); - } - - if ( (endpoint == null) || (accessKey == null) || (secretKey == null) || (region == null) || (bucketName == null) ) { - String errorMsg = "No \"endpoint\" or/and \"accessKey\" or/and \"secretKey\" or/and \"region\" or/and \"bucketName\" could be retrieved from the file: " + credentialsFilePath; - logger.error(errorMsg); - System.err.println(errorMsg); - System.exit(54); - } - // It's not safe, nor helpful to show the credentials in the logs. - - minioClient = MinioClient.builder().endpoint(endpoint).credentials(accessKey, secretKey).region(region).build(); - - boolean bucketExists = false; - try { - bucketExists = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build()); - } catch (Exception e) { - String errorMsg = "There was a problem while checking if the bucket \"" + bucketName + "\" exists!\n" + e.getMessage(); - logger.error(errorMsg); - System.err.println(errorMsg); - System.exit(55); - } - - // Keep this commented-out to avoid objects-deletion by accident. The code is open-sourced, so it's easy to enable this ability if we really want it (e.g. for testing). -/* if ( bucketExists && shouldEmptyBucket ) { - emptyBucket(bucketName, false); - //throw new RuntimeException("stop just for test!"); - }*/ - - // Make the bucket, if not exist. - try { - if ( !bucketExists ) { - logger.info("Bucket \"" + bucketName + "\" does not exist! Going to create it.."); - minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build()); - } - else - logger.debug("Bucket \"" + bucketName + "\" already exists."); - } catch (Exception e) { - String errorMsg = "Could not create the bucket \"" + bucketName + "\"!"; - logger.error(errorMsg ,e); - System.err.println(errorMsg); - System.exit(56); - } - - if ( shouldShowAllS3Buckets ) { - List buckets = null; - try { - buckets = minioClient.listBuckets(); - logger.debug("The buckets in the S3 ObjectStore are:"); - for ( Bucket bucket : buckets ) { - logger.debug(bucket.name()); - } - } catch (Exception e) { - logger.warn("Could not listBuckets: " + e.getMessage()); - } - } - } - - - public static final Pattern EXTENSION_PATTERN = Pattern.compile("(\\.[^.]+)$"); - - /** - * @param fileObjKeyName = "**File object key name**"; - * @param fileFullPath = "**Path of the file to upload**"; - * @return - */ - public static String uploadToS3(String fileObjKeyName, String fileFullPath) - { - String contentType = null; - - // Take the Matcher to retrieve the extension. 
- Matcher extensionMatcher = EXTENSION_PATTERN.matcher(fileFullPath); - if ( extensionMatcher.find() ) { - String extension = null; - if ( (extension = extensionMatcher.group(0)) == null ) - contentType = "application/pdf"; - else { - if ( extension.equals("pdf") ) - contentType = "application/pdf"; - /*else if ( *//* TODO - other-extension-match *//* ) - contentType = "application/pdf"; */ - else - contentType = "application/pdf"; - } - } else { - logger.warn("The file with key \"" + fileObjKeyName + "\" does not have a file-extension! Setting the \"pdf\"-mimeType."); - contentType = "application/pdf"; - } - - ObjectWriteResponse response; - try { - response = minioClient.uploadObject(UploadObjectArgs.builder() - .bucket(bucketName) - .object(fileObjKeyName).filename(fileFullPath) - .contentType(contentType).build()); - - // TODO - What if the fileObjKeyName already exists? - // Right now it gets overwritten (unless we add versioning, which is irrelevant for different objects..) - - } catch (Exception e) { - logger.error("Could not upload the file \"" + fileObjKeyName + "\" to the S3 ObjectStore, exception: " + e.getMessage(), e); - return null; - } - - String s3Url = endpoint + "/" + bucketName + "/" + fileObjKeyName; // Be aware: This url works only if the access to the bucket is public. - //logger.debug("Uploaded file \"" + fileObjKeyName + "\". The s3Url is: " + s3Url); - return s3Url; - } - - - public static boolean emptyBucket(String bucketName, boolean shouldDeleteBucket) - { - logger.warn("Going to " + (shouldDeleteBucket ? "delete" : "empty") + " bucket \"" + bucketName + "\""); - - // First list the objects of the bucket. - Iterable> results; - try { - results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).build()); - } catch (Exception e) { - logger.error("Could not retrieve the list of objects of bucket \"" + bucketName + "\"!"); - return false; - } - - // Then, delete the objects. - for ( Result resultItem : results ) { - try { - if ( !deleteFile(resultItem.get().objectName(), bucketName) ) { - logger.error("Cannot proceed with bucket deletion, since only an empty bucket can be removed!"); - return false; - } - } catch (Exception e) { - logger.error("Error getting the object from resultItem: " + resultItem.toString() + "\nThe bucket \"" + bucketName + "\" will not be able to be deleted! Exception message: " + e.getMessage()); - return false; - } - } - - if ( shouldDeleteBucket ) { - // Lastly, delete the empty bucket. 
-			try {
-				minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
-			} catch (Exception e) {
-				logger.error("Could not delete the bucket \"" + bucketName + "\" from the S3 ObjectStore, exception: " + e.getMessage(), e);
-				return false;
-			}
-		}
-
-		return true;
-	}
-
-
-	public static boolean deleteFile(String fileObjKeyName, String bucketName)
-	{
-		try {
-			minioClient.removeObject(RemoveObjectArgs.builder().bucket(bucketName).object(fileObjKeyName).build());
-		} catch (Exception e) {
-			logger.error("Could not delete the file \"" + fileObjKeyName + "\" from the S3 ObjectStore, exception: " + e.getMessage(), e);
-			return false;
-		}
-		return true;
-	}
-
-
-}
diff --git a/src/main/java/eu/openaire/urls_controller/util/TestFileUtils.java b/src/main/java/eu/openaire/urls_controller/util/TestFileUtils.java
new file mode 100644
index 0000000..8645ed3
--- /dev/null
+++ b/src/main/java/eu/openaire/urls_controller/util/TestFileUtils.java
@@ -0,0 +1,147 @@
+package eu.openaire.urls_controller.util;
+
+import com.google.common.collect.HashMultimap;
+import eu.openaire.urls_controller.models.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import org.springframework.boot.configurationprocessor.json.JSONObject;
+import org.springframework.core.io.Resource;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Scanner;
+
+@Component
+public class TestFileUtils {
+
+	private static final Logger logger = LoggerFactory.getLogger(TestFileUtils.class);
+
+	@Value("classpath:testInputFiles/orderedList1000.json")
+	Resource testResource;
+
+	public ThreadLocal<Integer> duplicateIdUrlEntries = new ThreadLocal<>();
+	public ThreadLocal<Scanner> inputScanner = new ThreadLocal<>();	// Every Thread has its own variable.
+
+	private final int jsonBatchSize = 3000;
+	private final ThreadLocal<Integer> fileIndex = new ThreadLocal<>();
+	private final ThreadLocal<Integer> unretrievableInputLines = new ThreadLocal<>();
+
+	private final String utf8Charset = "UTF-8";
+
+	@PostConstruct	// Runs after field-injection; the "testResource" is not yet available inside a constructor.
+	public void init() throws IOException {
+		String resourceFileName = "testInputFiles/orderedList1000.json";
+
+		InputStream inputStream = testResource.getInputStream();
+		if ( inputStream == null )
+			throw new RuntimeException("No resourceFile was found with name \"" + resourceFileName + "\".");
+
+		logger.debug("Going to retrieve the data from the inputResourceFile: " + resourceFileName);
+
+		inputScanner.set(new Scanner(inputStream, utf8Charset));
+
+		fileIndex.set(0);
+		unretrievableInputLines.set(0);
+		duplicateIdUrlEntries.set(0);
+	}
+
+	/**
+	 * This method parses a Json file and extracts the urls, along with the IDs.
+	 * @return HashMultimap
+	 */
+	public HashMultimap<String, String> getNextIdUrlPairBatchFromJson() {
+		Task inputIdUrlTuple;
+		int expectedPathsPerID = 5;
+		int expectedIDsPerBatch = jsonBatchSize / expectedPathsPerID;
+
+		HashMultimap<String, String> idAndUrlMappedInput = HashMultimap.create(expectedIDsPerBatch, expectedPathsPerID);
+
+		int curBeginning = fileIndex.get();
+
+		while ( inputScanner.get().hasNextLine() && (fileIndex.get() < (curBeginning + jsonBatchSize)) )
+		{// While (!EOF) and inside the current url-batch, iterate through lines.
+
+			//logger.debug("fileIndex: " + fileIndex.get());	// DEBUG!
+
+			// Take each line.
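+			// An input-line is expected to look like (illustrative): {"id":"openaire:12345", "url":"https://example.org/some.pdf"}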
+			String retrievedLineStr = inputScanner.get().nextLine();
+			//logger.debug("Loaded from inputFile: " + retrievedLineStr);	// DEBUG!
+
+			fileIndex.set(fileIndex.get() +1);
+
+			if ( retrievedLineStr.isEmpty() ) {
+				unretrievableInputLines.set(unretrievableInputLines.get() +1);
+				continue;
+			}
+
+			if ( (inputIdUrlTuple = jsonDecoder(retrievedLineStr)) == null ) {	// Decode the jsonLine and take the two attributes.
+				logger.warn("A problematic inputLine found: \t" + retrievedLineStr);
+				unretrievableInputLines.set(unretrievableInputLines.get() +1);
+				continue;
+			}
+
+			if ( !idAndUrlMappedInput.put(inputIdUrlTuple.getId(), inputIdUrlTuple.getUrl()) ) {	// We have a duplicate url in the input; log it here, as we cannot pass it through the HashMultimap. It's possible that this, as well as the original, might be/give a docUrl.
+				duplicateIdUrlEntries.set(duplicateIdUrlEntries.get() +1);
+			}
+		}
+
+		return idAndUrlMappedInput;
+	}
+
+	/**
+	 * This method decodes a Json String and returns a Task with its members.
+	 * @param jsonLine String
+	 * @return the Task, or null in case of an error
+	 */
+	private Task jsonDecoder(String jsonLine) {
+		// Get the ID and the url and put them in the Task.
+		String idStr = null;
+		String urlStr = null;
+		try {
+			JSONObject jObj = new JSONObject(jsonLine);	// Construct a JSONObject from the retrieved jsonLine.
+			idStr = jObj.get("id").toString();
+			urlStr = jObj.get("url").toString();
+		} catch (JSONException je) {
+			logger.warn("JSONException caught when trying to parse and extract values from jsonLine: \t" + jsonLine, je);
+			return null;
+		}
+
+		if ( urlStr.isEmpty() ) {
+			if ( !idStr.isEmpty() )	// If we only have the id, then go and log it.
+				logger.warn("The url was not found for id: \"" + idStr + "\"");
+			return null;
+		}
+		return new Task(idStr, urlStr, null);
+	}
+
+	/**
+	 * This method checks if there is no more input-data and returns true in that case.
+	 * Otherwise, it returns false, if there is more input-data to be loaded.
+	 * @param isEmptyOfData
+	 * @param isFirstRun
+	 * @return finished loading / not finished
+	 */
+	public boolean isFinishedLoading(boolean isEmptyOfData, boolean isFirstRun) {
+		if ( isEmptyOfData ) {
+			if ( isFirstRun )
+				logger.error("Could not retrieve any urls from the inputFile!");
+			else
+				logger.debug("Done loading " + getCurrentlyLoadedUrls() + " urls from the inputFile.");
+			return true;
+		}
+		return false;
+	}
+
+	/**
+	 * This method returns the number of (non-heading, non-empty) lines we have read from the inputFile.
+	 * @return loadedUrls
+	 */
+	private int getCurrentlyLoadedUrls() {	// In the end, it gives the total number of urls we have processed.
+		return fileIndex.get() - unretrievableInputLines.get();
+	}
+}
diff --git a/src/main/java/eu/openaire/urls_controller/util/UriBuilder.java b/src/main/java/eu/openaire/urls_controller/util/UriBuilder.java
index d47155a..c1da4a4 100644
--- a/src/main/java/eu/openaire/urls_controller/util/UriBuilder.java
+++ b/src/main/java/eu/openaire/urls_controller/util/UriBuilder.java
@@ -13,73 +13,72 @@ import java.net.URL;

public class UriBuilder {

-	private static final Logger logger = LoggerFactory.getLogger(UriBuilder.class);
-
-	public static String baseUrl = null;
-
-	public UriBuilder(Environment environment)
-	{
-		baseUrl = "http";
-		String sslEnabled = environment.getProperty("server.ssl.enabled");
-		if (sslEnabled == null) {	// It's expected to not exist if there is no SSL-configuration.
- logger.warn("No property \"server.ssl.enabled\" was found in \"application.properties\". Continuing with plain HTTP.."); - sslEnabled = "false"; - } - baseUrl += sslEnabled.equals("true") ? "s" : ""; - baseUrl += "://"; - - String hostName = getPublicIP(); - if ( hostName == null ) - hostName = InetAddress.getLoopbackAddress().getHostName(); // Non-null. - - baseUrl += hostName; - String serverPort = environment.getProperty("server.port"); - if (serverPort == null) { // This is unacceptable! - logger.error("No property \"server.port\" was found in \"application.properties\"!"); - System.exit(-1); // Well, I guess the Spring Boot would not start in this case anyway. - } - baseUrl += ":" + serverPort; - - String baseInternalPath = environment.getProperty("server.servlet.context-path"); - if ( baseInternalPath != null ) { - if ( !baseInternalPath.startsWith("/") ) - baseUrl += "/"; - baseUrl += baseInternalPath; - if ( !baseInternalPath.endsWith("/") ) - baseUrl += "/"; - } else { - logger.warn("No property \"server.servlet.context-path\" was found in \"application.properties\"!"); // Yes it's expected. - baseUrl += "/"; - } - - logger.debug("ServerBaseURL: " + baseUrl); - } - - private static String getPublicIP() - { - String publicIpAddress = ""; - URL url_name; - try { - url_name = new URL("https://api.ipify.org/"); - } catch (MalformedURLException mue) { - logger.warn(mue.getMessage()); - return null; - } - try (BufferedReader bf = new BufferedReader(new InputStreamReader(url_name.openStream()))) { - publicIpAddress = bf.readLine().trim(); - } catch (Exception e) { - logger.warn("Cannot get the publicIP address for this machine!", e); - return null; - } - return publicIpAddress; - } - - public static String getBaseUrl() { - return baseUrl; - } - - public static void setBaseUrl(String baseUrl) { - UriBuilder.baseUrl = baseUrl; - } +// private static final Logger logger = LoggerFactory.getLogger(UriBuilder.class); +// +// public static String baseUrl = null; +// +// public UriBuilder(Environment environment) { +// baseUrl = "http"; +// String sslEnabled = environment.getProperty("server.ssl.enabled"); +// if (sslEnabled == null) { // It's expected to not exist if there is no SSL-configuration. +// logger.warn("No property \"server.ssl.enabled\" was found in \"application.properties\". Continuing with plain HTTP.."); +// sslEnabled = "false"; +// } +// baseUrl += sslEnabled.equals("true") ? "s" : ""; +// baseUrl += "://"; +// +// String hostName = getPublicIP(); +// if ( hostName == null ) +// hostName = InetAddress.getLoopbackAddress().getHostName(); // Non-null. +// +// baseUrl += hostName; +// String serverPort = environment.getProperty("server.port"); +// if (serverPort == null) { // This is unacceptable! +// logger.error("No property \"server.port\" was found in \"application.properties\"!"); +// System.exit(-1); // Well, I guess the Spring Boot would not start in this case anyway. +// } +// baseUrl += ":" + serverPort; +// +// String baseInternalPath = environment.getProperty("server.servlet.context-path"); +// if ( baseInternalPath != null ) { +// if ( !baseInternalPath.startsWith("/") ) +// baseUrl += "/"; +// baseUrl += baseInternalPath; +// if ( !baseInternalPath.endsWith("/") ) +// baseUrl += "/"; +// } else { +// logger.warn("No property \"server.servlet.context-path\" was found in \"application.properties\"!"); // Yes it's expected. 
+// baseUrl += "/"; +// } +// +// logger.debug("ServerBaseURL: " + baseUrl); +// } +// +// private String getPublicIP() +// { +// String publicIpAddress = ""; +// URL url_name; +// try { +// url_name = new URL("https://api.ipify.org/"); +// } catch (MalformedURLException mue) { +// logger.warn(mue.getMessage()); +// return null; +// } +// try (BufferedReader bf = new BufferedReader(new InputStreamReader(url_name.openStream()))) { +// publicIpAddress = bf.readLine().trim(); +// } catch (Exception e) { +// logger.warn("Cannot get the publicIP address for this machine!", e); +// return null; +// } +// return publicIpAddress; +// } +// +// public static String getBaseUrl() { +// return baseUrl; +// } +// +// public static void setBaseUrl(String baseUrl) { +// UriBuilder.baseUrl = baseUrl; +// } } \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 04f472c..c5ac15f 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -15,13 +15,27 @@ server.port = 1880 # Server api path server.servlet.context-path=/api +#Service config +services.pdfaggregation.controller.db.oldDatabaseName = pdfaggregation_i +services.pdfaggregation.controller.db.databaseName = pdfAggregationDatabase +services.pdfaggregation.controller.baseTargetLocation = /tmp/ +services.pdfaggregation.controller.maxAttemptsPerRecord = 3 +services.pdfaggregation.controller.assignmentLimit = 10000 + +services.pdfaggregation.controller.s3.endpoint = xa +services.pdfaggregation.controller.s3.accessKey = xa +services.pdfaggregation.controller.s3.secretKey = xa +services.pdfaggregation.controller.s3.region = xa +services.pdfaggregation.controller.s3.bucketName = xa +services.pdfaggregation.controller.s3.shouldEmptyBucket = false +services.pdfaggregation.controller.s3.shouldShowAllS3Buckets = true + + # Database - -spring.impala.url = jdbc:impala://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/ -# Note: The "UseNativeQuery" does not work with the PreparedStatements! Also, the url does not work without the ending "/" -# The username and the password do not matter, since this app is always run in an pre-authenticated machine. - -spring.impala.driver-class-name = com.cloudera.impala.jdbc41.Driver +spring.datasource.url=jdbc:impala://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/ +spring.datasource.username= +spring.datasource.password= +spring.datasource.driver-class-name=com.cloudera.impala.jdbc41.Driver spring.datasource.hikari.pool-name=ControllerPool spring.datasource.hikari.maximumPoolSize=20