diff --git a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java
index 19c05f5..ebfc9c0 100644
--- a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java
+++ b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java
@@ -19,16 +19,21 @@ public class ImpalaConnector {
     @Autowired
     private JdbcTemplate jdbcTemplate;
 
+    private final boolean isTestEnvironment;
     private final String initialDatabaseName;
-    private final String databaseName;
+    private final String testDatabaseName;
+
+    public static String databaseName;
 
     public static final Lock databaseLock = new ReentrantLock(true);  // This lock is locking the threads trying to execute queries in the database.
 
-    public ImpalaConnector(@Value("${services.pdfaggregation.controller.db.initialDatabaseName}") String initialDatabaseName,
-                           @Value("${services.pdfaggregation.controller.db.databaseName}") String databaseName) {
+    public ImpalaConnector(@Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment,
+                           @Value("${services.pdfaggregation.controller.db.initialDatabaseName}") String initialDatabaseName,
+                           @Value("${services.pdfaggregation.controller.db.testDatabaseName}") String testDatabaseName) {
+        this.isTestEnvironment = isTestEnvironment;
         this.initialDatabaseName = initialDatabaseName;
-        this.databaseName = databaseName;
+        this.testDatabaseName = testDatabaseName;
     }
@@ -45,26 +50,34 @@ public class ImpalaConnector {
     }
 
-    private void createDatabase() {
-        logger.info("Going to create (if not exist) the database \"" + databaseName + "\" and its tables. Also will fill some tables with data from database \"" + initialDatabaseName + "\".");
-
-        jdbcTemplate.execute("CREATE DATABASE IF NOT EXISTS " + databaseName);
-
-        jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication stored as parquet as select * from " + initialDatabaseName + ".publication");
-        jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication");
-
-        jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_pids stored as parquet as select * from " + initialDatabaseName + ".publication_pids");
-        jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication_pids");
-
-        jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_urls stored as parquet as select * from " + initialDatabaseName + ".publication_urls");
-        jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication_urls");
-
-        jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".datasource stored as parquet as select * from " + initialDatabaseName + ".datasource");
-        jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".datasource");
-
-        jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet");
-        jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".assignment");
+    private void createDatabase()
+    {
+        if ( isTestEnvironment ) {
+            logger.info("Going to create (if not exist) the database \"" + testDatabaseName + "\" and its tables. Also will fill some tables with data from database \"" + initialDatabaseName + "\".");
+
+            jdbcTemplate.execute("CREATE DATABASE IF NOT EXISTS " + testDatabaseName);
+
+            jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".publication stored as parquet as select * from " + initialDatabaseName + ".publication");
+            jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".publication");
+
+            jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".publication_pids stored as parquet as select * from " + initialDatabaseName + ".publication_pids");
+            jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".publication_pids");
+
+            jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".publication_urls stored as parquet as select * from " + initialDatabaseName + ".publication_urls");
+            jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".publication_urls");
+
+            jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".datasource stored as parquet as select * from " + initialDatabaseName + ".datasource");
+            jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".datasource");
+
+            jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet");
+            jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".assignment");
+
+            databaseName = testDatabaseName;  // For the rest of the queries.
+        } else
+            databaseName = initialDatabaseName;
+
+        // For both cases, we check and create the tables which will be populated by the Controller.
+
+        // Drop the "current_assignment" table. It is a temporary table which is created on-demand during execution.
        jdbcTemplate.execute("DROP TABLE IF EXISTS " + databaseName + ".current_assignment PURGE");
 
        jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet");
@@ -73,7 +86,7 @@ public class ImpalaConnector {
        jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload (id string, original_url string, actual_url string, `date` timestamp, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
        jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload");
 
-        logger.info("The database \"" + databaseName + "\" and its tables were created or validated.");
+        logger.info("The " + (isTestEnvironment ? "test-" : "") + "database \"" + databaseName + "\" and its tables were created or validated.");
     }
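The key change above is that the effective database name is resolved once, inside `createDatabase()`, and published through the static `ImpalaConnector.databaseName` field instead of being injected separately into each class. Queries against that database are serialized through the fair `databaseLock`. A minimal sketch of the locking pattern the consumer classes follow (the `runGuardedQuery` helper and its `COMPUTE STATS` query are illustrative only, not part of this patch):

```java
import org.springframework.jdbc.core.JdbcTemplate;
import eu.openaire.urls_controller.configuration.ImpalaConnector;

// Illustrative helper showing the lock-guarded query pattern used across the Controller.
public class GuardedQueryExample {
    static void runGuardedQuery(JdbcTemplate jdbcTemplate) {
        ImpalaConnector.databaseLock.lock();  // Fair lock: waiting threads acquire it in arrival order.
        try {
            jdbcTemplate.execute("COMPUTE STATS " + ImpalaConnector.databaseName + ".payload");
        } finally {
            ImpalaConnector.databaseLock.unlock();  // Always release, even if the query throws.
        }
    }
}
```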
jdbcTemplate.execute("DROP TABLE IF EXISTS " + databaseName + ".current_assignment PURGE"); jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet"); @@ -73,7 +86,7 @@ public class ImpalaConnector { jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload (id string, original_url string, actual_url string, `date` timestamp, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet"); jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload"); - logger.info("The database \"" + databaseName + "\" and its tables were created or validated."); + logger.info("The " + (isTestEnvironment ? "test-" : "") + "database \"" + databaseName + "\" and its tables were created or validated."); } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java b/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java index 1b53130..8e849e4 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java @@ -1,9 +1,9 @@ package eu.openaire.urls_controller.controllers; +import eu.openaire.urls_controller.configuration.ImpalaConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.jdbc.core.JdbcTemplate; @@ -24,13 +24,11 @@ public class ImpalaController { @Autowired private JdbcTemplate jdbcTemplate; - @Value("${services.pdfaggregation.controller.db.databaseName}") - private String databaseName; @GetMapping("get10PublicationIdsTest") public ResponseEntity get10PublicationIdsTest() { - String query = "SELECT id FROM " + databaseName + ".publication LIMIT 10;"; + String query = "SELECT id FROM " + ImpalaConnector.databaseName + ".publication LIMIT 10;"; try { List publications = jdbcTemplate.queryForList(query, String.class); diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java index 7422a96..b4e8f2d 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java @@ -46,12 +46,8 @@ public class UrlController { @Value("${services.pdfaggregation.controller.assignmentLimit}") private int assignmentLimit; - @Value("${services.pdfaggregation.controller.db.databaseName}") - private String databaseName; - private final AtomicInteger maxAttemptsPerRecordAtomic; - public UrlController(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord) { maxAttemptsPerRecordAtomic = new AtomicInteger(maxAttemptsPerRecord); } @@ -86,16 +82,16 @@ public class UrlController { "from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count\n" + "from (\n" + "select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" + - "from " + databaseName + ".publication p\n" + - "join " + databaseName + ".publication_urls pu on pu.id=p.id\n" + - "join " + databaseName + ".datasource d on d.id=p.datasourceid\n" + - "left 
outer join (select count(a.id) as counts, a.id from " + databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" + - "left outer join (select a.id, a.original_url from " + databaseName + ".assignment a\n" + + "from " + ImpalaConnector.databaseName + ".publication p\n" + + "join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" + + "join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" + + "left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" + + "left outer join (select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" + "union all\n" + - "select pl.id, pl.original_url from " + databaseName + ".payload pl)\n" + + "select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl)\n" + "as existing on existing.id=p.id and existing.original_url=pu.url\n" + "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + - "\nand not exists (select 1 from " + databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" + + "\nand not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" + "limit " + (assignmentsLimit * 10) + ")\n" + "as non_distinct_results\n" + "order by coalesce(attempt_count, 0), reverse(pubid), url\n" + @@ -105,9 +101,9 @@ public class UrlController { // The "order by" in the end makes sure the older attempted records will be re-attempted after a long time. //logger.debug(findAssignmentsQuery); // DEBUG! - String createCurrentAssignmentsQuery = "create table " + databaseName + ".current_assignment as \n" + findAssignmentsQuery; - String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + databaseName + ".current_assignment"; - String getAssignmentsQuery = "select * from " + databaseName + ".current_assignment"; + String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery; + String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment"; + String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment"; List assignments = new ArrayList<>(assignmentsLimit); @@ -182,8 +178,8 @@ public class UrlController { // Write the Assignment details to the assignment-table. // The "timestamp" is generated from the Java-code, so it's in no way provided by a 3rd party. - String insertAssignmentsQuery = "insert into " + databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n" - + "from (\n select pubid, url from " + databaseName + ".current_assignment) as pub_data"; + String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n" + + "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data"; try { jdbcTemplate.execute(insertAssignmentsQuery); @@ -268,10 +264,10 @@ public class UrlController { logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments); // Store the workerReport into the database. 
We use "PreparedStatements" to do insertions, for security and valid SQL syntax reasons. - final String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + final String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; final int[] payloadArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}; - final String insertIntoAttemptBaseQuery = "INSERT INTO " + databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)"; + final String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)"; final int[] attemptArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}; final AtomicInteger failedCount = new AtomicInteger(0); @@ -367,7 +363,7 @@ public class UrlController { private String dropCurrentAssignmentTable() { - String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + databaseName + ".current_assignment PURGE"; + String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE"; try { jdbcTemplate.execute(dropCurrentAssignmentsQuery); return null; // All good. No error-message. diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUnZipper.java b/src/main/java/eu/openaire/urls_controller/util/FileUnZipper.java index 287ce6a..e943d6a 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUnZipper.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUnZipper.java @@ -24,20 +24,20 @@ public class FileUnZipper { ZipEntry zipEntry = zis.getNextEntry(); while ( zipEntry != null ) { Path targetPath = zipSlipProtect(zipEntry, target); - if ( zipEntry.getName().endsWith(File.separator) ) // If we have a directory. Files.createDirectories(targetPath); else { - // Some zip -files store only the file-paths and not separate directories. We need to create parent directories, e.g data/folder/file.txt + // Some zip-files store only the file-paths and not separate directories. We need to create parent directories, e.g data/folder/file.txt Path parentPath = targetPath.getParent(); if ( (parentPath != null) && Files.notExists(parentPath) ) { Files.createDirectories(parentPath); } Files.copy(zis, targetPath, StandardCopyOption.REPLACE_EXISTING); // Copy an individual entry. } + zis.closeEntry(); zipEntry = zis.getNextEntry(); } - zis.closeEntry(); + // Here the "zipEntry" will always be "null", so no "current ZIP entry" will be open, (so the "closeEntry()" is not needed). 
diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
index 4137774..95053fc 100644
--- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
+++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
@@ -42,10 +42,6 @@ public class FileUtils {
     @Autowired
     private FileUnZipper fileUnZipper;
 
-    @Value("${services.pdfaggregation.controller.db.databaseName}")
-    private String databaseName;
-
-
     public enum UploadFullTextsResponse {successful, unsuccessful, databaseError}
@@ -73,10 +69,10 @@ public class FileUtils {
             parameter = " '" + parameter + "'";  // This will be a "string-check", thus the single-quotes.
 
         try {
-            jdbcTemplate.execute("CREATE TABLE " + databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + databaseName + "." + tableName + " " + whereClause + parameter);
-            jdbcTemplate.execute("DROP TABLE " + databaseName + "." + tableName + " PURGE");
-            jdbcTemplate.execute("ALTER TABLE " + databaseName + "." + tableName + "_tmp RENAME TO " + databaseName + "." + tableName);
-            jdbcTemplate.execute("COMPUTE STATS " + databaseName + "." + tableName);
+            jdbcTemplate.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName + " " + whereClause + parameter);
+            jdbcTemplate.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE");
+            jdbcTemplate.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName);
+            jdbcTemplate.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName);
         } catch (DataAccessException e) {
             errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n";
             logger.error(errorMsg, e);
@@ -115,7 +111,7 @@ public class FileUtils {
         int numFilesFoundFromPreviousAssignmentsBatches = 0;
         int urlReportsSize = urlReports.size();
         HashMultimap<String, String> allFileNamesWithPayloads = HashMultimap.create((urlReportsSize / 5), 3);  // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
-        String getFileLocationForHashQuery = "select `location` from " + databaseName + ".payload where `hash` = ? limit 1";
+        String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ? limit 1";
         final int[] hashArgType = new int[] {Types.VARCHAR};
 
         ImpalaConnector.databaseLock.lock();
@@ -202,10 +198,17 @@ public class FileUtils {
         int failedBatches = 0;
         for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) {
             List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter);
-            HttpURLConnection conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId);
-            if ( conn == null ) {
-                failedBatches ++;
-                continue;  // To the next batch.
+            HttpURLConnection conn;
+            try {
+                conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId);
+                if ( conn == null ) {
+                    failedBatches ++;
+                    continue;  // To the next batch.
+                }
+            } catch (RuntimeException re) {
+                // The "cause" was logged inside "getConnection()".
+                failedBatches += (1 + (numOfBatches - batchCounter));  // Add this and all the remaining batches as failed.
+                break;
             }
 
             // Get the extracted files.
@@ -277,6 +280,15 @@ public class FileUtils {
                 continue;
             }
 
+            // This file is related to some payloads, in the sense that those payloads have urls which lead to the same full-text url.
+            // These payloads might have different IDs and sourceUrls, but in the end, the different sourceUrls give the same full-text.
+            // Below, we make sure we pick the datasource from the payload which has the same id as the full-text's name.
+            // If there are multiple payloads with the same id, which point to the same file, then we can take whatever datasource we want from those payloads.
+            // It is possible that payloads with the same IDs, but different sourceUrls pointing to the same full-text, are related to different datasources
+            // (especially for IDs of type: "doiboost_____::XXXXXXXXXXXXXXXXXXXXX").
+            // It does not really matter, since the first-ever payload to give this full-text could very well be another one,
+            // since the crawling happens in multiple threads which compete with each other for CPU time.
+
             String datasourceId = null;
             String hash = null;
             boolean isFound = false;
@@ -325,7 +337,8 @@ public class FileUtils {
     }
 
-    private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) {
+    private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) throws RuntimeException
+    {
         baseUrl += batchNum + "/";
         String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);
         //logger.debug("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]");
@@ -341,7 +354,12 @@ public class FileUtils {
                 return null;
             }
         } catch (Exception e) {
-            logger.warn("Problem when requesting the ZipFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + "\n" + e.getMessage());
+            String exMessage = e.getMessage();
+            logger.warn("Problem when requesting the ZipFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + "\n" + exMessage);
+            if ( exMessage.contains("Connection refused") ) {
+                logger.error("Since we received a \"Connection refused\", all of the remaining batches (" + (totalBatches - batchNum) + ") will not be requested!");
+                throw new RuntimeException();
+            }
             return null;
         }
         return conn;
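The batch loop now distinguishes a single failed batch (`conn == null`: count one and move on) from a dead Worker (a `RuntimeException` thrown on "Connection refused": count the current batch plus all remaining ones and stop). A self-contained illustration of that accounting, with demo values only:

```java
// Demonstrates the fail-fast batch accounting (not project code).
public class FailFastDemo {
    public static void main(String[] args) {
        int numOfBatches = 5;
        int failedBatches = 0;
        for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) {
            boolean connectionRefused = (batchCounter == 3);  // Suppose the Worker dies at batch 3.
            if ( connectionRefused ) {
                failedBatches += (1 + (numOfBatches - batchCounter));  // 1 + (5 - 3) = 3.
                break;  // No point requesting the remaining batches from a dead Worker.
            }
        }
        System.out.println("failedBatches = " + failedBatches);  // Prints 3: batches 3, 4 and 5.
    }
}
```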
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index f8e1f5a..33ec900 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -3,9 +3,16 @@ server.port = 1880
 
 # Server api path
 server.servlet.context-path=/api
 
-#Service config
+# Service config
+
+services.pdfaggregation.controller.isTestEnvironment = false
+# In case the "isTestEnvironment" is "true", the "testDatabase" below and its tables are created (if they do not already exist).
+# The tables "datasource", "publication", "publication_pids" and "publication_urls" are filled with the data from the same tables existing in the "initialDatabase".
+# In case the "isTestEnvironment" is "false", the "initialDatabase" is used. The Controller assumes that the above 4 tables are present, and only creates the following tables:
+# "assignment", "attempt" and "payload", which are populated during execution.
+
 services.pdfaggregation.controller.db.initialDatabaseName = pdfaggregation_i
-services.pdfaggregation.controller.db.databaseName = pdfaggregationdatabase_new_s3_names
+services.pdfaggregation.controller.db.testDatabaseName = pdfaggregationdatabase_new_s3_names
 
 services.pdfaggregation.controller.baseTargetLocation = /tmp/
 services.pdfaggregation.controller.maxAttemptsPerRecord = 3
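For a test deployment, one would flip the flag and point `testDatabaseName` at a scratch database, so the production tables are never touched. A hypothetical example (the test database name here is a placeholder):

```properties
# Hypothetical test-deployment override:
services.pdfaggregation.controller.isTestEnvironment = true
services.pdfaggregation.controller.db.initialDatabaseName = pdfaggregation_i
services.pdfaggregation.controller.db.testDatabaseName = pdfaggregation_test
```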