Upgrade the "payload" table to a "payload" view, which consists of three separate payload tables: "payload_legacy", "payload_aggregated" and "payload_bulk_import".

This commit is contained in:
Lampros Smyrnaios 2023-04-10 15:55:50 +03:00
parent 37363100fd
commit c39fef2654
4 changed files with 63 additions and 28 deletions


@@ -69,10 +69,13 @@ public class ImpalaConnector {
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".datasource stored as parquet as select * from " + initialDatabaseName + ".datasource");
jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".datasource");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".payload_legacy stored as parquet as select * from " + initialDatabaseName + ".payload_legacy");
jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".payload_legacy");
databaseName = testDatabaseName;
} else {
logger.info("Going to create or validate the tables that are populated by the Controller, for the \"initialDatabase\" = \"" + initialDatabaseName + "\"");
// Note that for the "initialDatabase", the initial 4 tables are expected to be created either manually or by other pieces of software, as views of the contents of the Graph.
// Note that for the "initialDatabase", the initial 5 tables are expected to be created either manually or by other pieces of software, as views of the contents of the Graph.
databaseName = initialDatabaseName;
}
@@ -87,8 +90,24 @@ public class ImpalaConnector {
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` bigint, status string, error_class string, error_message string) stored as parquet");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".attempt");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload (id string, original_url string, actual_url string, `date` bigint, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload");
// Create a "payload" VIEW which consists of 3 different tables:
// 1) The "payload_legacy" table, which contains data older than 2022, imported by previous full-text aggregation processes.
// 2) The "payload_aggregated" table, which contains data from 2022 onwards, collected by the new PDF-Aggregation-Service.
// 3) The "payload_bulk_import" table, which contains data collected from bulk-imported content of datasources like "arXiv".
// So, each aggregation process will "load" its contents into the right table, but in order to get the "total" metrics, we can simply query the "payload" view.
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload_aggregated (id string, original_url string, actual_url string, `date` bigint, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload_aggregated");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload_bulk_import (id string, original_url string, actual_url string, `date` bigint, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload_bulk_import");
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + databaseName + ".payload " +
"AS SELECT * from " + databaseName + ".payload_legacy " +
"UNION ALL SELECT * FROM " + databaseName +".payload_aggregated " +
"UNION ALL SELECT * FROM " + databaseName + ".payload_bulk_import");
// We do not do the "compute stats" for the view, since we get the following error: "COMPUTE STATS not supported for view: pdfaggregationdatabase_payloads_view.payload".
logger.info("The " + (isTestEnvironment ? "TEST-" : "") + "database \"" + databaseName + "\" and its tables were created or validated.");
}
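As the comment block above notes, existing read queries keep working against the "payload" name, which is now a view over the three tables. Below is a minimal sketch of how a "total" metric could be read through the view, assuming the same Spring JdbcTemplate setup already used in ImpalaConnector; the class and method names are hypothetical and not part of this commit.

import org.springframework.jdbc.core.JdbcTemplate;

public class PayloadViewMetricsSketch {
    // Counts all payloads, regardless of which of the three underlying tables they live in,
    // by querying the "payload" view created above.
    public static long countAllPayloads(JdbcTemplate jdbcTemplate, String databaseName) {
        Long total = jdbcTemplate.queryForObject("SELECT count(id) FROM " + databaseName + ".payload", Long.class);
        return (total != null) ? total : 0L;
    }
}

Writes, by contrast, still have to target one of the concrete tables (e.g. "payload_aggregated"), since the view is only meant for reading.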


@@ -96,7 +96,7 @@ public class UrlsServiceImpl implements UrlsService {
" on attempts.id=p.id\n" +
" left outer join (select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
" union all\n" +
" select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing\n" +
" select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing\n" + // Here we access the payload-VIEW which includes the three payload-tables.
" on existing.id=p.id and existing.original_url=pu.url\n" +
" where d.allow_harvest=true and existing.id is null\n" +
((excludedDatasourceIDsStringList != null) ? // If we have an exclusion-list, use it below.
@@ -263,7 +263,7 @@ public class UrlsServiceImpl implements UrlsService {
if ( hasAttemptParquetFileProblem )
logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignments);
else if ( hasPayloadParquetFileProblem )
logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload\", for batch-assignments_" + curReportAssignments);
logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignments);
else
logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files.");
}
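For context, a "load"-request moves the uploaded parquet files from their HDFS staging directory into the target table. The exact statement issued by parquetFileUtils.loadParquetDataIntoTable() is not shown in this diff; the sketch below assumes it is a standard Impala "LOAD DATA INPATH" statement, and the class and method names are hypothetical.

import org.springframework.jdbc.core.JdbcTemplate;

public class LoadRequestSketch {
    // Moves the parquet files found under "hdfsParquetDirectory" into the given table.
    // After this, the data is visible to queries (e.g. through the "payload" view).
    public static void loadParquetIntoTable(JdbcTemplate jdbcTemplate, String databaseName,
                                            String hdfsParquetDirectory, String tableName) {
        jdbcTemplate.execute("LOAD DATA INPATH '" + hdfsParquetDirectory + "' INTO TABLE " + databaseName + "." + tableName);
    }
}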
@@ -275,16 +275,16 @@ public class UrlsServiceImpl implements UrlsService {
hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts, "attempt");
if ( ! hasPayloadParquetFileProblem )
hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloads, "payload");
hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated, "payload_aggregated");
ImpalaConnector.databaseLock.unlock();
if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_" + curReportAssignments);
throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignments);
else if ( hasAttemptParquetFileProblem || hasPayloadParquetFileProblem )
logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload\" table, for batch-assignments_" + curReportAssignments);
logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload_aggregated\" table, for batch-assignments_" + curReportAssignments);
else
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_" + curReportAssignments);
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignments);
} catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled.
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());
@@ -300,7 +300,7 @@ public class UrlsServiceImpl implements UrlsService {
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} catch (Exception e) {
String errorMsg = "Unexpected error when inserting into the \"payload\" and \"attempt\" tables in parallel! " + e.getMessage();
String errorMsg = "Unexpected error when inserting into the \"attempt\" and \"payload_aggregated\" tables in parallel! " + e.getMessage();
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
@@ -324,7 +324,7 @@ public class UrlsServiceImpl implements UrlsService {
}
if ( ! hasPayloadParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null);
mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
@@ -391,7 +391,7 @@ public class UrlsServiceImpl implements UrlsService {
{
// This will delete the rows of the "assignment" table which refer to the "curWorkerId". Since we have non-KUDU Impala tables, the delete operation can only succeed through a "merge" operation on the rest of the data.
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp-table becomes the "assignment" table.
// We don't need to keep the assignment-info anymore, since the "findAssignmentsQuery" checks the payload table for previously handled tasks.
// We don't need to keep the assignment-info anymore, since the "findAssignmentsQuery" checks the "payload_aggregated" table for previously handled tasks.
return fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
}
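The comments above describe the "merge" workaround for deleting rows from non-KUDU Impala tables. The actual logic lives in fileUtils.mergeParquetFiles(), which is not part of this diff; the sketch below only illustrates that approach, with hypothetical class and method names.

import org.springframework.jdbc.core.JdbcTemplate;

public class MergeDeleteSketch {
    // "Deletes" rows by rewriting the table without them: copy the rows to keep into a temp table,
    // drop the original table and rename the temp table to take its place.
    public static void deleteRowsByRewriting(JdbcTemplate jdbcTemplate, String databaseName,
                                             String tableName, String whereClauseToKeep) {
        String table = databaseName + "." + tableName;
        String tempTable = table + "_tmp";
        jdbcTemplate.execute("CREATE TABLE " + tempTable + " stored as parquet AS SELECT * FROM " + table + " " + whereClauseToKeep);
        jdbcTemplate.execute("DROP TABLE " + table + " PURGE");
        jdbcTemplate.execute("ALTER TABLE " + tempTable + " RENAME TO " + table);
        jdbcTemplate.execute("COMPUTE STATS " + table);
    }
}

For the case above, the call would be roughly: deleteRowsByRewriting(jdbcTemplate, databaseName, "assignment", "WHERE workerid != '" + curWorkerId + "'").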


@@ -75,7 +75,9 @@ public class ParquetFileUtils {
public final String parquetHDFSDirectoryPathAttempts;
public final String parquetHDFSDirectoryPathPayloads;
// The "payloads_legacy" are not handled by the Controller (no data is added there), they just exist in the database and are taken into account when performing READ queries to the "payload" VIEW.
public final String parquetHDFSDirectoryPathPayloadsAggregated;
public final String parquetHDFSDirectoryPathPayloadsBulkImport;
public ParquetFileUtils(@Value("${hdfs.baseUrl}") String webHDFSBaseUrl,
@@ -123,8 +125,9 @@ public class ParquetFileUtils {
if ( isTestEnvironment ) // Make sure the hdfs-remote-dir is different for running tests, in order to not cause conflicts with production.
hdfsParquetBaseDir += "test/";
this.parquetHDFSDirectoryPathPayloads = hdfsParquetBaseDir + "payloads/";
this.parquetHDFSDirectoryPathAttempts = hdfsParquetBaseDir + "attempts/";
this.parquetHDFSDirectoryPathPayloadsAggregated = hdfsParquetBaseDir + "payloads_aggregated/";
this.parquetHDFSDirectoryPathPayloadsBulkImport = hdfsParquetBaseDir + "payloads_bulk_import/";
this.fileUtils = fileUtils;
createRemoteParquetDirectories(hdfsParquetBaseDir);
}
@@ -178,7 +181,7 @@ public class ParquetFileUtils {
System.exit(88); // Exit the whole app, as it cannot add the results to the database!
}
callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount.
return new ParquetReport(ParquetReport.ParquetType.payload, createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath));
return new ParquetReport(ParquetReport.ParquetType.payload, createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath, parquetHDFSDirectoryPathPayloadsAggregated));
});
}
@@ -249,7 +252,7 @@ public class ParquetFileUtils {
}
public boolean createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath)
public boolean createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath, String parquetHDFSDirectoryPathPayloads)
{
List<GenericData.Record> recordList = new ArrayList<>((int) (urlReports.size() * 0.2));
GenericData.Record record;
@@ -462,8 +465,9 @@ public class ParquetFileUtils {
String listMainDirectoryUrl = webHDFSBaseUrl + parquetBaseRemoteDirectory + "?op=LISTSTATUS&user.name=" + hdfsUserName;
boolean payloadCreationSuccessful = true;
boolean attemptCreationSuccessful = true;
boolean payloadAggregatedCreationSuccessful = true;
boolean payloadBulkImportCreationSuccessful = true;
// Get the "fileStatuses" of the directories (main and subdirectories) in one request.
try {
@@ -484,7 +488,8 @@ public class ParquetFileUtils {
if ( statusCode == 404 ) {
logger.info("The directory \"" + parquetBaseRemoteDirectory + "\" does not exist. We will create it, along with its sub-directories.");
attemptCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsParams);
payloadCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkDirsParams);
payloadAggregatedCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsParams);
payloadBulkImportCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsParams);
}
else {
// Check the json-response, to see if all the subdirectories exist.
@@ -500,7 +505,8 @@ public class ParquetFileUtils {
//logger.debug("\"jsonResponse\":\n" + jsonResponse); // DEBUG!
boolean foundAttemptsDir = false;
boolean foundPayloadsDir = false;
boolean foundPayloadsAggregatedDir = false;
boolean foundPayloadsBulkImportDir = false;
try { // Parse the jsonData
JSONObject jObj = new JSONObject(jsonResponse); // Construct a JSONObject from the retrieved jsonData.
@@ -520,8 +526,12 @@ public class ParquetFileUtils {
if ( dirPath.equals("attempts") )
foundAttemptsDir = true;
else if ( dirPath.equals("payloads") )
foundPayloadsDir = true;
else if ( dirPath.equals("payloads_aggregated") )
foundPayloadsAggregatedDir = true;
else if ( dirPath.equals("payloads_bulk_import") )
foundPayloadsBulkImportDir = true;
else
logger.warn("Unknown remote parquet directory found: " + dirPath);
}
} catch ( JSONException je ) { // In case any of the above "json-keys" was not found.
logger.warn("JSON Exception was thrown while trying to retrieve the subdirectories \"attempts\" and \"payloads\": " + je.getMessage() + "\n\nJsonResponse: " + jsonResponse);
@@ -538,19 +548,25 @@ public class ParquetFileUtils {
} else
logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" exists.");
if ( !foundPayloadsDir ) {
logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloads + "\" does not exist! Going to create it.");
payloadCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkDirsParams);
if ( !foundPayloadsAggregatedDir ) {
logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" does not exist! Going to create it.");
payloadAggregatedCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsParams);
} else
logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloads + "\" exists.");
logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" exists.");
if ( !foundPayloadsBulkImportDir ) {
logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" does not exist! Going to create it.");
payloadBulkImportCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsParams);
} else
logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" exists.");
}
} catch (Exception e) {
logger.error("", e);
return false;
}
return (attemptCreationSuccessful && payloadCreationSuccessful);
// We need both to be created in order for the app to function properly!
return (attemptCreationSuccessful && payloadAggregatedCreationSuccessful && payloadBulkImportCreationSuccessful);
// We need all directories to be created in order for the app to function properly!
}
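The directory check above sends a single WebHDFS LISTSTATUS request and inspects the returned JSON for the expected sub-directories. Below is a minimal, self-contained sketch of that parsing step, assuming the standard WebHDFS response shape and the "pathSuffix" field for the directory name; the sample response and class name are illustrative only.

import org.json.JSONArray;
import org.json.JSONObject;

public class ListStatusParseSketch {
    public static void main(String[] args) {
        // Example of what a WebHDFS LISTSTATUS response may look like for the parquet base directory.
        String jsonResponse = "{\"FileStatuses\":{\"FileStatus\":["
                + "{\"pathSuffix\":\"attempts\",\"type\":\"DIRECTORY\"},"
                + "{\"pathSuffix\":\"payloads_aggregated\",\"type\":\"DIRECTORY\"},"
                + "{\"pathSuffix\":\"payloads_bulk_import\",\"type\":\"DIRECTORY\"}]}}";
        JSONArray fileStatuses = new JSONObject(jsonResponse).getJSONObject("FileStatuses").getJSONArray("FileStatus");
        for ( int i = 0; i < fileStatuses.length(); i++ ) {
            // Each entry's "pathSuffix" holds the sub-directory name checked by the code above
            // ("attempts", "payloads_aggregated", "payloads_bulk_import").
            String dirName = fileStatuses.getJSONObject(i).getString("pathSuffix");
            System.out.println("Found remote parquet sub-directory: " + dirName);
        }
    }
}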


@@ -14,7 +14,7 @@ services:
db:
initialDatabaseName: pdfaggregation_i
testDatabaseName: pdfaggregationdatabase_parquet_test_new
testDatabaseName: pdfaggregationdatabase_payloads_view
assignmentLimit: 10000
maxAttemptsPerRecord: 3