From c39fef26549b76b89362d1b604a3945ff35b6dce Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Mon, 10 Apr 2023 15:55:50 +0300 Subject: [PATCH] Upgrade payload-table to payload-view which consists of three separate payload tables: "payload_legacy", "payload_aggregated" and "payload_bulk_import". --- .../configuration/ImpalaConnector.java | 25 ++++++++-- .../services/UrlsServiceImpl.java | 18 ++++---- .../util/ParquetFileUtils.java | 46 +++++++++++++------ src/main/resources/application.yml | 2 +- 4 files changed, 63 insertions(+), 28 deletions(-) diff --git a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java index 751461c..bc580e3 100644 --- a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java +++ b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java @@ -69,10 +69,13 @@ public class ImpalaConnector { jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".datasource stored as parquet as select * from " + initialDatabaseName + ".datasource"); jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".datasource"); + jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".payload_legacy stored as parquet as select * from " + initialDatabaseName + ".payload_legacy"); + jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".payload_legacy"); + databaseName = testDatabaseName; } else { logger.info("Going to create or validate the tables that are populated by the Controller, for the \"initialDatabase\" = \"" + initialDatabaseName + "\""); - // Note that for the "initialDatabase", the initial 4 tables are expected to be created either manually or by other pieces of software, as views of the contents of the Graph. + // Note that for the "initialDatabase", the initial 5 tables are expected to be created either manually or by other pieces of software, as views of the contents of the Graph. databaseName = initialDatabaseName; } @@ -87,8 +90,24 @@ public class ImpalaConnector { jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` bigint, status string, error_class string, error_message string) stored as parquet"); jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".attempt"); - jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload (id string, original_url string, actual_url string, `date` bigint, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet"); - jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload"); + // Create a VIEW "payload" which consists of 3 different tables: + // 1) The "payload_legacy" table, which contains data older than 2022, which is imported by previous full-text aggregation processes. + // 2) The "payload_aggregated" table, which contains data from 2022 onwards, collected by the new PDF-Aggregation-Service. + // 3) The "payload_bulk_import", which contains data collected from the bulk-imported content from datasources like "arXiv". + + // So, each aggregation process will "load" its contents to the right table, but in order to get the "total" metrics, we can just query the "payload" view. 
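+		// As an illustrative example (assuming standard Impala SQL; "<databaseName>" stands for the value substituted at runtime, as in the statements below),
+		// the "total" metrics can then be taken straight from the view:
+		//   SELECT provenance, count(*) AS num_payloads FROM <databaseName>.payload GROUP BY provenance;
+		// while any write (INSERT or LOAD DATA) has to target one of the three underlying tables directly, since Impala views are read-only.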
+ + jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload_aggregated (id string, original_url string, actual_url string, `date` bigint, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet"); + jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload_aggregated"); + + jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload_bulk_import (id string, original_url string, actual_url string, `date` bigint, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet"); + jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload_bulk_import"); + + jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + databaseName + ".payload " + + "AS SELECT * from " + databaseName + ".payload_legacy " + + "UNION ALL SELECT * FROM " + databaseName +".payload_aggregated " + + "UNION ALL SELECT * FROM " + databaseName + ".payload_bulk_import"); + // We do not do the "compute stats" for the view, since we get the following error: "COMPUTE STATS not supported for view: pdfaggregationdatabase_payloads_view.payload". logger.info("The " + (isTestEnvironment ? "TEST-" : "") + "database \"" + databaseName + "\" and its tables were created or validated."); } diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java index 24add38..a71e53c 100644 --- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java @@ -96,7 +96,7 @@ public class UrlsServiceImpl implements UrlsService { " on attempts.id=p.id\n" + " left outer join (select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" + " union all\n" + - " select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing\n" + + " select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing\n" + // Here we access the payload-VIEW which includes the three payload-tables. " on existing.id=p.id and existing.original_url=pu.url\n" + " where d.allow_harvest=true and existing.id is null\n" + ((excludedDatasourceIDsStringList != null) ? // If we have an exclusion-list, use it below. @@ -263,7 +263,7 @@ public class UrlsServiceImpl implements UrlsService { if ( hasAttemptParquetFileProblem ) logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignments); else if ( hasPayloadParquetFileProblem ) - logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload\", for batch-assignments_" + curReportAssignments); + logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignments); else logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files."); } @@ -275,16 +275,16 @@ public class UrlsServiceImpl implements UrlsService { hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts, "attempt"); if ( ! hasPayloadParquetFileProblem ) - hasPayloadParquetFileProblem = ! 
parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloads, "payload"); + hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated, "payload_aggregated"); ImpalaConnector.databaseLock.unlock(); if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem ) - throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_" + curReportAssignments); + throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignments); else if ( hasAttemptParquetFileProblem || hasPayloadParquetFileProblem ) - logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload\" table, for batch-assignments_" + curReportAssignments); + logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload_aggregated\" table, for batch-assignments_" + curReportAssignments); else - logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_" + curReportAssignments); + logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignments); } catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled. logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage()); @@ -300,7 +300,7 @@ public class UrlsServiceImpl implements UrlsService { logger.error(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } catch (Exception e) { - String errorMsg = "Unexpected error when inserting into the \"payload\" and \"attempt\" tables in parallel! " + e.getMessage(); + String errorMsg = "Unexpected error when inserting into the \"attempt\" and \"payload_aggregated\" tables in parallel! " + e.getMessage(); logger.error(errorMsg, e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } finally { @@ -324,7 +324,7 @@ public class UrlsServiceImpl implements UrlsService { } if ( ! hasPayloadParquetFileProblem ) { - mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null); + mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null); if ( mergeErrorMsg != null ) { ImpalaConnector.databaseLock.unlock(); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg); @@ -391,7 +391,7 @@ public class UrlsServiceImpl implements UrlsService { { // This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data. // Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table. - // We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks. 
+ // We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the "payload_aggregated" table for previously handled tasks. return fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId); } diff --git a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java index 8fe3d9c..d8c6788 100644 --- a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java @@ -75,7 +75,9 @@ public class ParquetFileUtils { public final String parquetHDFSDirectoryPathAttempts; - public final String parquetHDFSDirectoryPathPayloads; + // The "payloads_legacy" are not handled by the Controller (no data is added there), they just exist in the database and are taken into account when performing READ queries to the "payload" VIEW. + public final String parquetHDFSDirectoryPathPayloadsAggregated; + public final String parquetHDFSDirectoryPathPayloadsBulkImport; public ParquetFileUtils(@Value("${hdfs.baseUrl}") String webHDFSBaseUrl, @@ -123,8 +125,9 @@ public class ParquetFileUtils { if ( isTestEnvironment ) // Make sure the hdfs-remote-dir is different for running tests, in order to not cause conflicts with production. hdfsParquetBaseDir += "test/"; - this.parquetHDFSDirectoryPathPayloads = hdfsParquetBaseDir + "payloads/"; this.parquetHDFSDirectoryPathAttempts = hdfsParquetBaseDir + "attempts/"; + this.parquetHDFSDirectoryPathPayloadsAggregated = hdfsParquetBaseDir + "payloads_aggregated/"; + this.parquetHDFSDirectoryPathPayloadsBulkImport = hdfsParquetBaseDir + "payloads_bulk_import/"; this.fileUtils = fileUtils; createRemoteParquetDirectories(hdfsParquetBaseDir); } @@ -178,7 +181,7 @@ public class ParquetFileUtils { System.exit(88); // Exit the whole app, as it cannot add the results to the database! } callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount. - return new ParquetReport(ParquetReport.ParquetType.payload, createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath)); + return new ParquetReport(ParquetReport.ParquetType.payload, createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath, parquetHDFSDirectoryPathPayloadsAggregated)); }); } @@ -249,7 +252,7 @@ public class ParquetFileUtils { } - public boolean createAndLoadParquetDataIntoPayloadTable(List urlReports, long curReportAssignments, String currentParquetPath) + public boolean createAndLoadParquetDataIntoPayloadTable(List urlReports, long curReportAssignments, String currentParquetPath, String parquetHDFSDirectoryPathPayloads) { List recordList = new ArrayList<>((int) (urlReports.size() * 0.2)); GenericData.Record record; @@ -462,8 +465,9 @@ public class ParquetFileUtils { String listMainDirectoryUrl = webHDFSBaseUrl + parquetBaseRemoteDirectory + "?op=LISTSTATUS&user.name=" + hdfsUserName; - boolean payloadCreationSuccessful = true; boolean attemptCreationSuccessful = true; + boolean payloadAggregatedCreationSuccessful = true; + boolean payloadBulkImportCreationSuccessful = true; // Get the "fileStatuses" of the directories (main and subdirectories) in one request. try { @@ -484,7 +488,8 @@ public class ParquetFileUtils { if ( statusCode == 404 ) { logger.info("The directory \"" + parquetBaseRemoteDirectory + "\" does not exist. 
We will create it, along with its sub-directories."); attemptCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsParams); - payloadCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkDirsParams); + payloadAggregatedCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsParams); + payloadBulkImportCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsParams); } else { // Check the json-response, to see if all the subdirectories exist. @@ -500,7 +505,8 @@ public class ParquetFileUtils { //logger.debug("\"jsonResponse\":\n" + jsonResponse); // DEBUG! boolean foundAttemptsDir = false; - boolean foundPayloadsDir = false; + boolean foundPayloadsAggregatedDir = false; + boolean foundPayloadsBulkImportDir = false; try { // Parse the jsonData JSONObject jObj = new JSONObject(jsonResponse); // Construct a JSONObject from the retrieved jsonData. @@ -520,8 +526,12 @@ public class ParquetFileUtils { if ( dirPath.equals("attempts") ) foundAttemptsDir = true; - else if ( dirPath.equals("payloads") ) - foundPayloadsDir = true; + else if ( dirPath.equals("payloads_aggregated") ) + foundPayloadsAggregatedDir = true; + else if ( dirPath.equals("payloads_bulk_import") ) + foundPayloadsBulkImportDir = true; + else + logger.warn("Unknown remote parquet directory found: " + dirPath); } } catch ( JSONException je ) { // In case any of the above "json-keys" was not found. logger.warn("JSON Exception was thrown while trying to retrieve the subdirectories \"attempts\" and \"payloads\": " + je.getMessage() + "\n\nJsonResponse: " + jsonResponse); @@ -538,19 +548,25 @@ public class ParquetFileUtils { } else logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" exists."); - if ( !foundPayloadsDir ) { - logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloads + "\" does not exist! Going to create it."); - payloadCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkDirsParams); + if ( !foundPayloadsAggregatedDir ) { + logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" does not exist! Going to create it."); + payloadAggregatedCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsParams); } else - logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloads + "\" exists."); + logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" exists."); + + if ( !foundPayloadsBulkImportDir ) { + logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" does not exist! Going to create it."); + payloadBulkImportCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsParams); + } else + logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" exists."); } } catch (Exception e) { logger.error("", e); return false; } - return (attemptCreationSuccessful && payloadCreationSuccessful); - // We need both to be created in order for the app to function properly! + return (attemptCreationSuccessful && payloadAggregatedCreationSuccessful && payloadBulkImportCreationSuccessful); + // We need all directories to be created in order for the app to function properly! 
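+		// For reference, a sketch of the LISTSTATUS json-response this method expects (based on the standard WebHDFS REST API; most fields are omitted here):
+		//   {"FileStatuses":{"FileStatus":[
+		//     {"pathSuffix":"attempts","type":"DIRECTORY", ...},
+		//     {"pathSuffix":"payloads_aggregated","type":"DIRECTORY", ...},
+		//     {"pathSuffix":"payloads_bulk_import","type":"DIRECTORY", ...} ]}}
+		// The parsing above walks this structure and compares each sub-directory name against the three expected ones.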
} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index ab3c509..b8a732e 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -14,7 +14,7 @@ services: db: initialDatabaseName: pdfaggregation_i - testDatabaseName: pdfaggregationdatabase_parquet_test_new + testDatabaseName: pdfaggregationdatabase_payloads_view assignmentLimit: 10000 maxAttemptsPerRecord: 3
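
Note on the "load"-requests (an illustrative sketch, not taken from this patch): the loadParquetDataIntoTable() calls presumably end up issuing Impala statements of the form

    LOAD DATA INPATH '<hdfsParquetBaseDir>/payloads_aggregated/' INTO TABLE <databaseName>.payload_aggregated;
    LOAD DATA INPATH '<hdfsParquetBaseDir>/attempts/'            INTO TABLE <databaseName>.attempt;

where <hdfsParquetBaseDir> and <databaseName> are placeholders for the runtime values. With this change, only the directory/table pair switches from "payloads"/"payload" to "payloads_aggregated"/"payload_aggregated"; the "payload" view itself is never used as a load target.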