From a23c918a42d5d9da2d19736044b2c0b5bea91da3 Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Tue, 5 Apr 2022 00:01:44 +0300 Subject: [PATCH] - Fix a "@JsonProperty" annotation inside "Payload.java". - Fix a "@Value" annotation inside "FileUtils.java". - Add a new database and show its name along with the initial's name in the logs. - Code cleanup and improvement. --- .../configuration/ImpalaConnector.java | 16 +++++++------- .../controllers/UrlController.java | 22 ++++++++++--------- .../urls_controller/models/Payload.java | 4 ++-- .../urls_controller/util/FileUtils.java | 9 ++++---- src/main/resources/application.properties | 5 +++-- 5 files changed, 30 insertions(+), 26 deletions(-) diff --git a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java index 63ec6f7..19c05f5 100644 --- a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java +++ b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java @@ -19,15 +19,15 @@ public class ImpalaConnector { @Autowired private JdbcTemplate jdbcTemplate; - private final String oldDatabaseName; + private final String initialDatabaseName; private final String databaseName; public static final Lock databaseLock = new ReentrantLock(true); // This lock is locking the threads trying to execute queries in the database. 
- public ImpalaConnector(@Value("${services.pdfaggregation.controller.db.oldDatabaseName}") String oldDatabaseName, + public ImpalaConnector(@Value("${services.pdfaggregation.controller.db.initialDatabaseName}") String initialDatabaseName, @Value("${services.pdfaggregation.controller.db.databaseName}") String databaseName) { - this.oldDatabaseName = oldDatabaseName; + this.initialDatabaseName = initialDatabaseName; this.databaseName = databaseName; } @@ -46,20 +46,20 @@ public class ImpalaConnector { private void createDatabase() { - logger.info("Going to create the database and the tables, if they do not exist. Also will fill some tables with data from OpenAIRE."); + logger.info("Going to create (if not exist) the database \"" + databaseName + "\" and its tables. Also will fill some tables with data from database \"" + initialDatabaseName + "\"."); jdbcTemplate.execute("CREATE DATABASE IF NOT EXISTS " + databaseName); - jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication stored as parquet as select * from " + oldDatabaseName + ".publication"); + jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication stored as parquet as select * from " + initialDatabaseName + ".publication"); jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication"); - jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_pids stored as parquet as select * from " + oldDatabaseName + ".publication_pids"); + jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_pids stored as parquet as select * from " + initialDatabaseName + ".publication_pids"); jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication_pids"); - jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_urls stored as parquet as select * from " + oldDatabaseName + ".publication_urls"); + jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_urls 
stored as parquet as select * from " + initialDatabaseName + ".publication_urls"); jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication_urls"); - jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".datasource stored as parquet as select * from " + oldDatabaseName + ".datasource"); + jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".datasource stored as parquet as select * from " + initialDatabaseName + ".datasource"); jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".datasource"); jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet"); diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java index 36e2ed1..7422a96 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java @@ -243,15 +243,16 @@ public class UrlController { return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg); } + int sizeOUrlReports = 0; List urlReports = workerReport.getUrlReports(); - if ( (urlReports == null) || urlReports.isEmpty() ) { + if ( (urlReports == null) || ((sizeOUrlReports = urlReports.size()) == 0) ) { String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + curWorkerId + "\" was empty (without any UrlReports)!"; logger.error(errorMsg); return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg); } long curReportAssignments = workerReport.getAssignmentRequestCounter(); - logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. 
Going to request the fullTexts from the Worker and insert the UrlReports into the database."); + logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + sizeOUrlReports + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database."); // Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location". FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId); @@ -259,12 +260,12 @@ public class UrlController { return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!"); } else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) { - logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments); + logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignments); // The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available and continue with writing the attempts and the payloads. fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false); } - - logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments); + else + logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments); // Store the workerReport into the database. We use "PreparedStatements" to do insertions, for security and valid SQL syntax reasons. 
final String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; @@ -276,7 +277,7 @@ public class UrlController { final AtomicInteger failedCount = new AtomicInteger(0); // Split the "UrlReports" into some sub-lists - int sizeOfEachSubList = (int)(urlReports.size() * 0.2); + int sizeOfEachSubList = (int)(sizeOUrlReports * 0.2); List> subLists = Lists.partition(urlReports, sizeOfEachSubList); // The above will create some sub-lists, each one containing 20% of total amount. @@ -309,7 +310,8 @@ public class UrlController { return null; }); - for ( int i = 0; i < subLists.size(); ++i ) { + int subListsSize = subLists.size(); + for ( int i = 0; i < subListsSize; ++i ) { int finalI = i; callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries. runInsertsToAttemptTable(subLists.get(finalI), curReportAssignments, insertIntoAttemptBaseQuery, attemptArgTypes, failedCount); @@ -323,7 +325,7 @@ public class UrlController { insertsExecutor.invokeAll(callableTasks); } catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled. logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage()); - // TODO - This is a very rare case, but what should be done..? + // This is a very rare case. At the moment, we just move on with table-merging. } catch (Exception e) { ImpalaConnector.databaseLock.unlock(); String errorMsg = "Unexpected error when inserting into the \"payload\" and \"attempt\" tables in parallel! 
" + e.getMessage(); @@ -332,7 +334,7 @@ public class UrlController { } int failedQueries = failedCount.get(); - String failedQueriesMsg = failedQueries + " out of " + (urlReports.size() *2) + " failed to be processed!"; + String failedQueriesMsg = failedQueries + " out of " + (sizeOUrlReports *2) + " failed to be processed!"; logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables" + ((failedQueries > 0) ? (", although " + failedQueriesMsg) : ".") + " Going to merge the parquet files for those tables."); @@ -368,7 +370,7 @@ public class UrlController { String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + databaseName + ".current_assignment PURGE"; try { jdbcTemplate.execute(dropCurrentAssignmentsQuery); - return null; + return null; // All good. No error-message. } catch (Exception e) { return ImpalaConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e); } diff --git a/src/main/java/eu/openaire/urls_controller/models/Payload.java b/src/main/java/eu/openaire/urls_controller/models/Payload.java index 82560c1..c2e8cf9 100644 --- a/src/main/java/eu/openaire/urls_controller/models/Payload.java +++ b/src/main/java/eu/openaire/urls_controller/models/Payload.java @@ -49,8 +49,8 @@ public class Payload { @JsonProperty("provenance") private String provenance; // "crawl:" - @JsonProperty("provenance") - private String datasourceId; // "crawl:" + @JsonProperty("datasourceId") + private String datasourceId; // This is NOT inserted into the "payload"-table. 
public Payload() {} diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index 9883533..7f70805 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -87,8 +87,9 @@ public class FileUtils { } - @Value("services.pdfaggregation.controller.baseTargetLocation") + @Value("${services.pdfaggregation.controller.baseTargetLocation}") private String baseTargetLocation; + public static DecimalFormat df = new DecimalFormat("0.00"); private static final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:()]+\\.[\\w]{2,10})$"); @@ -233,7 +234,7 @@ public class FileUtils { } // Iterate over the files and upload them to S3. - int numUploadedFiles = 0; + //int numUploadedFiles = 0; for ( String fileName : fileNames ) { String fileFullPath = targetDirectory + fileName; @@ -260,7 +261,7 @@ public class FileUtils { String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath); setFullTextForMultiplePayloads(fileRelatedPayloads, s3Url); - numUploadedFiles ++; + //numUploadedFiles ++; } catch (Exception e) { logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore, exception: " + e.getMessage(), e); } @@ -276,7 +277,7 @@ public class FileUtils { } } // End of batches. - updateUrlReportsToHaveNoFullTextFiles(urlReports, true); // Make sure all records without an s3Url have < null > file-data (some batches or uploads might have failed). + updateUrlReportsToHaveNoFullTextFiles(urlReports, true); // Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed). 
deleteDirectory(curAssignmentsBaseDir); if ( failedBatches == numOfBatches ) { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index a058e9f..f8e1f5a 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -4,8 +4,9 @@ server.port = 1880 server.servlet.context-path=/api #Service config -services.pdfaggregation.controller.db.oldDatabaseName = pdfaggregation_i -services.pdfaggregation.controller.db.databaseName = pdfAggregationDatabase +services.pdfaggregation.controller.db.initialDatabaseName = pdfaggregation_i +services.pdfaggregation.controller.db.databaseName = pdfaggregationdatabase_new_s3_names + services.pdfaggregation.controller.baseTargetLocation = /tmp/ services.pdfaggregation.controller.maxAttemptsPerRecord = 3 services.pdfaggregation.controller.assignmentLimit = 10000