- Apply error-checking to the individual CallableTasks, as well as to whole task-batches, which create and upload the data for the "attempt" and "payload" tables. So, if no data could be uploaded for one or both tables, no "load"-queries will be executed for the affected table(s) (see the short sketch below).

- Catch the more general "Exception" inside "FileUtils.mergeParquetFiles()", to make sure that an "SQLException" is caught as well.
- Code polishing.
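
In short, the report-handling flow in "UrlController" now looks roughly like the following simplified sketch (it only mirrors the changes shown in the diff below; the error-responses and most of the logging are omitted):

List<Future<ParquetReport>> futures = insertsExecutor.invokeAll(callableTasks);
SumParquetSuccess sumParquetSuccess = checkParquetFilesSuccess(futures); // Counts how many attempt- and payload-parquet tasks failed.
boolean hasAttemptParquetFileProblem = sumParquetSuccess.isAttemptParquetFileProblem(); // "true" only if ALL attempt-files failed.
boolean hasPayloadParquetFileProblem = sumParquetSuccess.isPayloadParquetFileProblem(); // "true" only if the payload-file failed.
if ( ! hasAttemptParquetFileProblem ) // Otherwise, skip the "load"-query for the "attempt" table.
    hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts, "attempt");
if ( ! hasPayloadParquetFileProblem ) // Otherwise, skip the "load"-query for the "payload" table.
    hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloads, "payload");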
This commit is contained in:
Lampros Smyrnaios 2022-12-09 12:46:06 +02:00
parent 0209d24068
commit bfdf06bd09
5 changed files with 207 additions and 48 deletions

View File

@ -1,9 +1,7 @@
package eu.openaire.urls_controller.controllers;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Assignment;
import eu.openaire.urls_controller.models.Datasource;
import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.FileUtils;
@ -30,6 +28,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
@ -271,27 +270,64 @@ public class UrlController {
logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignments);
List<Callable<Void>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOUrlReports, curReportAssignments, currentParquetPath, uploadFullTextsResponse);
List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOUrlReports, curReportAssignments, currentParquetPath, uploadFullTextsResponse);
String errorMsgAttempts = null, errorMsgPayloads = null;
boolean hasAttemptParquetFileProblem = false;
boolean hasPayloadParquetFileProblem = false;
try { // Invoke all the tasks and wait for them to finish before moving to the next batch.
insertsExecutor.invokeAll(callableTasks);
List<Future<ParquetReport>> futures = insertsExecutor.invokeAll(callableTasks);
SumParquetSuccess sumParquetSuccess = checkParquetFilesSuccess(futures);
ResponseEntity<?> errorResponseEntity = sumParquetSuccess.getResponseEntity();
if ( errorResponseEntity != null ) {
return errorResponseEntity; // The related log is already shown.
}
hasAttemptParquetFileProblem = sumParquetSuccess.isAttemptParquetFileProblem();
hasPayloadParquetFileProblem = sumParquetSuccess.isPayloadParquetFileProblem();
if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
throw new RuntimeException("All of the parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database, for batch-assignments_" + curReportAssignments);
else {
if ( hasAttemptParquetFileProblem )
logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignments);
else if ( hasPayloadParquetFileProblem )
logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload\", for batch-assignments_" + curReportAssignments);
else
logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files.");
}
// Load all the parquet files of each type into its table.
ImpalaConnector.databaseLock.lock();
errorMsgAttempts = parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts, "attempt");
errorMsgPayloads = parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloads, "payload");
if ( ! hasAttemptParquetFileProblem )
hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts, "attempt");
if ( ! hasPayloadParquetFileProblem )
hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloads, "payload");
ImpalaConnector.databaseLock.unlock();
if ( (errorMsgAttempts == null) && (errorMsgPayloads == null) )
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_" + curReportAssignments);
if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_" + curReportAssignments);
else if ( hasAttemptParquetFileProblem || hasPayloadParquetFileProblem )
logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload\" table, for batch-assignments_" + curReportAssignments);
else
logger.warn("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or/and the \"payload\" tables, for batch-assignments_" + curReportAssignments);
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_" + curReportAssignments);
} catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled.
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());
// This is a very rare case. At the moment, we just move on with table-merging.
} catch (RuntimeException re) {
String errorMsg = re.getMessage();
ImpalaConnector.databaseLock.lock();
String assignmentErrorMsg = deleteWorkerAssignments(curWorkerId);
ImpalaConnector.databaseLock.unlock();
if ( assignmentErrorMsg != null ) {
errorMsg += "\n" + assignmentErrorMsg;
}
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} catch (Exception e) {
String errorMsg = "Unexpected error when inserting into the \"payload\" and \"attempt\" tables in parallel! " + e.getMessage();
logger.error(errorMsg, e);
@ -308,15 +344,7 @@ public class UrlController {
ImpalaConnector.databaseLock.lock();
if ( errorMsgPayloads == null ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
}
if ( errorMsgAttempts == null ) {
if ( ! hasAttemptParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
@ -324,10 +352,15 @@ public class UrlController {
}
}
// This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
// We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
if ( ! hasPayloadParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
}
mergeErrorMsg = deleteWorkerAssignments(curWorkerId);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
@ -343,6 +376,15 @@ public class UrlController {
}
private String deleteWorkerAssignments(String curWorkerId)
{
// This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
// We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
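// Roughly, the "mergeParquetFiles()" call below runs a "clone-drop-rename" sequence of queries like the following sketch
// (the exact queries live in "FileUtils"; the CREATE-AS-SELECT step is implied by the " WHERE workerid != " filter passed here):
//   CREATE TABLE <db>.assignment_tmp AS SELECT * FROM <db>.assignment WHERE workerid != '<curWorkerId>';
//   DROP TABLE <db>.assignment PURGE;
//   ALTER TABLE <db>.assignment_tmp RENAME TO <db>.assignment;
//   COMPUTE STATS <db>.assignment;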
return fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
}
private String createAndInitializeCurrentAssignmentsTable(String findAssignmentsQuery)
{
String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
@ -382,6 +424,48 @@ public class UrlController {
}
private static SumParquetSuccess checkParquetFilesSuccess(List<Future<ParquetReport>> futures)
{
int numOfAllAttemptParquetFileCreations = 0;
int numOfFailedAttemptParquetFileCreations = 0;
int numOfAllPayloadParquetFileCreations = 0;
int numOfFailedPayloadParquetFileCreations = 0;
for ( Future<ParquetReport> future : futures )
{
ParquetReport parquetReport = null;
try {
parquetReport = future.get();
boolean hasProblems = (! parquetReport.isSuccessful());
ParquetReport.ParquetType parquetType = parquetReport.getParquetType();
if ( parquetType.equals(ParquetReport.ParquetType.attempt) ) {
numOfAllAttemptParquetFileCreations++;
if ( hasProblems )
numOfFailedAttemptParquetFileCreations++;
} else if ( parquetType.equals(ParquetReport.ParquetType.payload) ) {
numOfAllPayloadParquetFileCreations ++;
if ( hasProblems )
numOfFailedPayloadParquetFileCreations ++;
} else {
String errMsg = "An invalid \"ParquetReport.ParquetType\" was found: " + parquetType; // This should never happen, but anyway.
logger.error(errMsg);
return new SumParquetSuccess(false, false, ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errMsg));
}
} catch (Exception e) {
logger.error("", e);
// We do not know if the failed "future" refers to a "payload" or to an "attempt".
// So we cannot increase a specific counter. That's ok, the only drawback is that we may try to "load" the non-existent data and get an exception.
}
} // End-for
boolean hasAttemptParquetFileProblem = (numOfFailedAttemptParquetFileCreations == numOfAllAttemptParquetFileCreations);
boolean hasPayloadParquetFileProblem = (numOfFailedPayloadParquetFileCreations == numOfAllPayloadParquetFileCreations);
return new SumParquetSuccess(hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, null);
}
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
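// For illustration, a rough sketch of such a "giant-query" (the method's body is not shown in this diff):
// for "dataSize" rows with "numParamsPerRow" values each, "(?,?,...,?)" groups are appended to the "baseInsertQuery",
// e.g. "INSERT INTO <table> VALUES (?,?,?), (?,?,?), ...", and the result is prepared as a single PreparedStatement on the given Connection.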
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException {

View File

@ -0,0 +1,34 @@
package eu.openaire.urls_controller.models;
public class ParquetReport {
public enum ParquetType {attempt, payload};
private ParquetType parquetType;
private boolean successful;
public ParquetReport(ParquetType parquetType, boolean successful) {
this.parquetType = parquetType;
this.successful = successful;
}
public ParquetType getParquetType() {
return parquetType;
}
public boolean isSuccessful() {
return successful;
}
@Override
public String toString() {
return "ParquetReport{" +
"parquetType=" + parquetType +
", wasSuccessful=" + successful +
'}';
}
}

View File

@ -0,0 +1,41 @@
package eu.openaire.urls_controller.models;
import org.springframework.http.ResponseEntity;
public class SumParquetSuccess {
private boolean attemptParquetFileProblem;
private boolean payloadParquetFileProblem;
private ResponseEntity<?> responseEntity;
public SumParquetSuccess(boolean attemptParquetFileProblem, boolean payloadParquetFileProblem, ResponseEntity<?> responseEntity) {
this.attemptParquetFileProblem = attemptParquetFileProblem;
this.payloadParquetFileProblem = payloadParquetFileProblem;
this.responseEntity = responseEntity;
}
public boolean isAttemptParquetFileProblem() {
return attemptParquetFileProblem;
}
public boolean isPayloadParquetFileProblem() {
return payloadParquetFileProblem;
}
public ResponseEntity<?> getResponseEntity() {
return responseEntity;
}
@Override
public String toString() {
return "SumParquetSuccess{" +
"attemptParquetFileProblem=" + attemptParquetFileProblem +
", payloadParquetFileProblem=" + payloadParquetFileProblem +
", responseEntity=" + responseEntity +
'}';
}
}

View File

@ -8,7 +8,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
@ -73,7 +72,7 @@ public class FileUtils {
jdbcTemplate.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE");
jdbcTemplate.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName);
jdbcTemplate.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName);
} catch (DataAccessException e) {
} catch (Exception e) {
errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n";
logger.error(errorMsg, e);
return errorMsg;

View File

@ -4,6 +4,7 @@ import com.google.common.collect.Lists;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.controllers.UrlController;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.ParquetReport;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.UrlReport;
import org.apache.avro.Schema;
@ -135,13 +136,13 @@ public class ParquetFileUtils {
}
public List<Callable<Void>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse)
public List<Callable<ParquetReport>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse)
{
// Split the "UrlReports" into some sub-lists.
List<List<UrlReport>> subLists;
// Pre-define the tasks to run.
List<Callable<Void>> callableTasks = new ArrayList<>(6);
List<Callable<ParquetReport>> callableTasks = new ArrayList<>(6);
// One thread will handle the inserts to the "payload" table and the others to the "attempt" table. This way there will be as little blocking as possible (from the part of Impala).
int sizeOfEachSubList = (int)(sizeOUrlReports * 0.2);
@ -153,19 +154,17 @@ public class ParquetFileUtils {
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
createAndLoadParquetDataIntoAttemptTable(finalI, subLists.get(finalI), curReportAssignments, currentParquetPath);
return null;
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(finalI, subLists.get(finalI), curReportAssignments, currentParquetPath));
});
}
} else {
// If the "urlReports" are so few, that we cannot get big "sublists", assign a single task to handle all the attempts.
callableTasks.add(() -> { // Handle inserts to the "attempt" table. A single task handles all of the "attempt" queries.
createAndLoadParquetDataIntoAttemptTable(0, urlReports, curReportAssignments, currentParquetPath);
return null;
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(0, urlReports, curReportAssignments, currentParquetPath));
});
}
if ( uploadFullTextsResponse != FileUtils.UploadFullTextsResponse.unsuccessful )
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.successful )
{
if ( (payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) {
@ -173,8 +172,7 @@ public class ParquetFileUtils {
System.exit(88); // Exit the whole app, as it cannot add the results to the database!
}
callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount.
createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath);
return null;
return new ParquetReport(ParquetReport.ParquetType.payload, createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath));
});
}
@ -188,7 +186,7 @@ public class ParquetFileUtils {
}
public void createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath)
public boolean createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath)
{
List<GenericData.Record> recordList = new ArrayList<>();
GenericData.Record record;
@ -223,7 +221,7 @@ public class ParquetFileUtils {
int recordsSize = recordList.size();
if ( recordsSize == 0 ) {
logger.warn("No attempts are available to be inserted to the database!");
return;
return false;
}
String fileName = UrlController.assignmentsBatchCounter.get() + "_attempts_" + attemptsIncNum + ".parquet";
@ -234,16 +232,17 @@ public class ParquetFileUtils {
//logger.debug("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!
// Upload and insert the data to the "attempt" Impala table.
uploadParquetFileToHDFS(fullFilePath, fileName, parquetHDFSDirectoryPathAttempts);
// The possible error-message returned from the above method, is already logged by the Controller.
}
String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, parquetHDFSDirectoryPathAttempts);
return (errorMsg == null); // The possible error-message returned is already logged by the Controller.
} else
return false;
// Multiple parquet files will be created and filled at the same time, before they are uploaded and moved into the database table.
// Each thread serving a worker, will have its own parquet subdirectory, which will be deleted after inserting the parquet data to the database.
}
public void createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath)
public boolean createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath)
{
List<GenericData.Record> recordList = new ArrayList<>();
GenericData.Record record;
@ -282,7 +281,7 @@ public class ParquetFileUtils {
int recordsSize = recordList.size();
if ( recordsSize == 0 ) {
logger.warn("No payloads are available to be inserted to the database!");
return;
return false;
}
String fileName = UrlController.assignmentsBatchCounter.get() + "_payloads.parquet";
@ -293,9 +292,10 @@ public class ParquetFileUtils {
//logger.debug("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!
// Upload and insert the data to the "payload" Impala table.
uploadParquetFileToHDFS(fullFilePath, fileName, parquetHDFSDirectoryPathPayloads);
// The possible error-message returned from the above method, is already logged by the Controller.
}
String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, parquetHDFSDirectoryPathPayloads);
return (errorMsg == null); // The possible error-message returned is already logged by the Controller.
} else
return false;
// Multiple parquet files will be created and filled at the same time, before they are uploaded and moved into the database table.
// Each thread serving a worker, will have its own parquet subdirectory, which will be deleted after inserting the parquet data to the database.
@ -424,17 +424,18 @@ public class ParquetFileUtils {
}
public String loadParquetDataIntoTable(String remoteParquetDataDirectory, String tableName)
public boolean loadParquetDataIntoTable(String remoteParquetDataDirectory, String tableName)
{
// Import the data from the parquet file into the database's table.
String loadParquetInTableQuery = "load data inpath '" + remoteParquetDataDirectory + "' into table " + ImpalaConnector.databaseName + "." + tableName;
try {
jdbcTemplate.execute(loadParquetInTableQuery);
} catch (Exception e) {
return ImpalaConnector.handleQueryException("loadParquetInTableQuery", loadParquetInTableQuery, e); // It's already logged.
ImpalaConnector.handleQueryException("loadParquetInTableQuery", loadParquetInTableQuery, e); // It's already logged.
return false;
}
//logger.debug("The data from \"" + remoteParquetDataDirectory + "\" was loaded into the " + tableName + " table."); // DEBUG!
return null;
return true;
}