From e9bede5c456c2361f77dcbd6c7175131c2ce6224 Mon Sep 17 00:00:00 2001 From: Antonis Lempesis Date: Tue, 1 Feb 2022 02:08:02 +0200 Subject: [PATCH] more fixes --- .../configuration/ImpalaConnector.java | 13 +++++--- .../controllers/UrlController.java | 33 ++++++++++--------- .../urls_controller/util/FileUtils.java | 14 +++++--- 3 files changed, 35 insertions(+), 25 deletions(-) diff --git a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java index 6f2b09f6..b0309e9f 100644 --- a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java +++ b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java @@ -19,13 +19,16 @@ public final class ImpalaConnector { @Autowired private JdbcTemplate jdbcTemplate; - @Value("${services.pdfaggregation.controller.db.oldDatabaseName}") - public static String oldDatabaseName; - @Value("${services.pdfaggregation.controller.db.databaseName}") - public static String databaseName; + private final String oldDatabaseName; + private final String databaseName; public static final Lock databaseLock = new ReentrantLock(true); // This lock is locking the threads trying to execute queries in the database. + public ImpalaConnector(@Value("${services.pdfaggregation.controller.db.oldDatabaseName}") String oldDatabaseName, + @Value("${services.pdfaggregation.controller.db.databaseName}") String databaseName) { + this.oldDatabaseName = oldDatabaseName; + this.databaseName = databaseName; + } @PostConstruct public void init() { logger.info("Max available memory to the Controller: " + Runtime.getRuntime().maxMemory() + " bytes."); @@ -60,7 +63,7 @@ public final class ImpalaConnector { jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet"); jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".assignment"); - jdbcTemplate.execute("DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE"); + jdbcTemplate.execute("DROP TABLE IF EXISTS " + databaseName + ".current_assignment PURGE"); jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet"); jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".attempt"); diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java index af7cf07f..0b9b3b9f 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java @@ -44,6 +44,9 @@ public class UrlController { @Value("${services.pdfaggregation.controller.assignmentLimit}") private int assignmentLimit; + @Value("${services.pdfaggregation.controller.db.databaseName}") + private String databaseName; + @GetMapping("") public ResponseEntity getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) { @@ -72,24 +75,24 @@ public class UrlController { String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" + "from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" + "select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" + - "from " + ImpalaConnector.databaseName + ".publication p\n" + - "join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" + - "join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" + - "left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" + + "from " + databaseName + ".publication p\n" + + "join " + databaseName + ".publication_urls pu on pu.id=p.id\n" + + "join " + databaseName + ".datasource d on d.id=p.datasourceid\n" + + "left outer join (select count(a.id) as counts, a.id from " + databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" + "left outer join (\n" + - " select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" + + " select a.id, a.original_url from " + databaseName + ".assignment a\n" + " union all\n" + - " select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" + - "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecord + " and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" + + " select pl.id, pl.original_url from " + databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" + + "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecord + " and not exists (select 1 from " + databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" + "limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" + "order by coalesce(attempt_count, 0), reverse(pubid), url\n" + "limit " + assignmentsLimit + ") as findAssignmentsQuery"; // The "order by" in the end makes sure the older attempted records will be re-attempted after a long time. - String createAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery; - String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment"; - String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment"; + String createAssignmentsQuery = "create table " + databaseName + ".current_assignment as \n" + findAssignmentsQuery; + String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + databaseName + ".current_assignment"; + String getAssignmentsQuery = "select * from " + databaseName + ".current_assignment"; List assignments = new ArrayList<>(); @@ -171,8 +174,8 @@ public class UrlController { // Write the Assignment details to the assignment-table. // The "timestamp" is generated from the Java-code, so it's in no way provided by a 3rd party. - String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n" - + "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data"; + String insertAssignmentsQuery = "insert into " + databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n" + + "from (\n select pubid, url from " + databaseName + ".current_assignment) as pub_data"; try { jdbcTemplate.execute(insertAssignmentsQuery); @@ -252,8 +255,8 @@ public class UrlController { ImpalaConnector.databaseLock.lock(); // Store the workerReport into the database. - String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; - String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)"; + String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + String insertIntoAttemptBaseQuery = "INSERT INTO " + databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)"; String payloadErrorMsg = null; int failedCount = 0; @@ -372,7 +375,7 @@ public class UrlController { } private String dropCurrentAssignmentTable() { - String dropCurrentAssignmentsQuery = "DROP TABLE " + ImpalaConnector.databaseName + ".current_assignment PURGE"; + String dropCurrentAssignmentsQuery = "DROP TABLE " + databaseName + ".current_assignment PURGE"; try { jdbcTemplate.execute(dropCurrentAssignmentsQuery); diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index 1e084768..9030a1ce 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -40,6 +40,10 @@ public class FileUtils { @Autowired private FileUnZipper fileUnZipper; + @Value("${services.pdfaggregation.controller.db.databaseName}") + private String databaseName; + + public enum UploadFullTextsResponse {successful, unsuccessful, databaseError} public FileUtils() throws RuntimeException { @@ -69,10 +73,10 @@ public class FileUtils { parameter = " '" + parameter + "'"; // This will be a "string-check". try { - jdbcTemplate.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName + " " + whereClause + parameter); - jdbcTemplate.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE"); - jdbcTemplate.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName); - jdbcTemplate.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName); + jdbcTemplate.execute("CREATE TABLE " + databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + databaseName + "." + tableName + " " + whereClause + parameter); + jdbcTemplate.execute("DROP TABLE " + databaseName + "." + tableName + " PURGE"); + jdbcTemplate.execute("ALTER TABLE " + databaseName + "." + tableName + "_tmp RENAME TO " + databaseName + "." + tableName); + jdbcTemplate.execute("COMPUTE STATS " + databaseName + "." + tableName); } catch (DataAccessException e) { errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n"; logger.error(errorMsg, e); @@ -104,7 +108,7 @@ public class FileUtils { ImpalaConnector.databaseLock.lock(); - String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ?" ; + String getFileLocationForHashQuery = "select `location` from " + databaseName + ".payload where `hash` = ?" ; // Get the file-locations. int numFullTextUrlsFound = 0;