forked from lsmyrnaios/UrlsController
- Fix not using the actual "currentAssignmentsBatch" of the workerReport itself, when creating the parquetFileNames and when reporting to the user the initialization of the "addition of the workerReport".
- Code polishing.
This commit is contained in:
parent 0f4b63c4a9
commit 4c3e2e6b6e
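For context, here is a minimal, self-contained sketch of the bug described in the commit message above. It is illustrative only: the wrapping class and main method are hypothetical, and assignmentsBatchCounter is assumed to be an AtomicLong (the original code reads it via ".get()"). The point is the difference between the service-wide batch counter, which keeps advancing as new assignments are handed out, and the counter value carried by the workerReport currently being processed, which is what the parquet file names should use.

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class; only the two file-naming strategies mirror the diff below.
public class ParquetNamingSketch {

    // Service-wide counter, advanced every time a new batch of assignments is handed out.
    // (Assumed to be an AtomicLong, as the ".get()" call in the original code suggests.)
    static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);

    public static void main(String[] args) {
        // Batch 1 is handed out to a worker; its report will carry this value.
        long curReportAssignmentsCounter = assignmentsBatchCounter.incrementAndGet();

        // Meanwhile, batch 2 is handed out to another worker.
        assignmentsBatchCounter.incrementAndGet();

        // Before the fix: the file is named after the latest batch (2), not the one being reported (1).
        String oldName = assignmentsBatchCounter.get() + "_attempts_0.parquet";

        // After the fix: the file is named after the batch the workerReport actually belongs to.
        String newName = curReportAssignmentsCounter + "_attempts_0.parquet";

        System.out.println(oldName); // 2_attempts_0.parquet
        System.out.println(newName); // 1_attempts_0.parquet
    }
}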
@@ -242,46 +242,46 @@ public class UrlsServiceImpl implements UrlsService {


 @Timed(value = "addWorkerReport.time", description = "Time taken to add the WorkerReport.")
-public Boolean addWorkerReport(String curWorkerId, long curReportAssignments, List<UrlReport> urlReports, int sizeOfUrlReports)
+public Boolean addWorkerReport(String curWorkerId, long curReportAssignmentsCounter, List<UrlReport> urlReports, int sizeOfUrlReports)
 {
-logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + assignmentsBatchCounter);
+logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + curReportAssignmentsCounter);

 // Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
-FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, curReportAssignments, curWorkerId);
+FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, curReportAssignmentsCounter, curWorkerId);
 if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
-postReportResultToWorker(curWorkerId, curReportAssignments, "Problem with the Impala-database!");
+postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Problem with the Impala-database!");
 return false;
 }
 else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
-logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignments);
+logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignmentsCounter);
 // The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available.
 fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false);
 // We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store.
 // We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
 // The urls which give full-text (no matter if we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked-up to be checked again later.
 } else
-logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments);
+logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignmentsCounter);

-String localParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignments + File.separator;
+String localParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignmentsCounter + File.separator;
 try {
 Files.createDirectories(Paths.get(localParquetPath)); // No-op if it already exists. It does not throw a "alreadyExistsException"
 } catch (Exception e) {
 String errorMsg = "Could not create the parquet-directory: " + localParquetPath;
 logger.error(errorMsg, e);
-postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg);
+postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg);
 return false;
 }

-logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignments);
+logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignmentsCounter);

-List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignments, localParquetPath, uploadFullTextsResponse);
+List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, localParquetPath, uploadFullTextsResponse);

 // Create HDFS subDirs for these assignments. Other background threads handling other assignments will not interfere with loading of parquetFiles to the DB tables.
-String endingMkDirAndParams = curReportAssignments + "/" + parquetFileUtils.mkDirsAndParams;
+String endingMkDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.mkDirsAndParams;
 if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingMkDirAndParams)
 || !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingMkDirAndParams) )
 {
-postReportResultToWorker(curWorkerId, curReportAssignments, "Error when creating the HDFS sub-directories for assignments_" + curReportAssignments);
+postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating the HDFS sub-directories for assignments_" + curReportAssignmentsCounter);
 return false;
 }

@@ -294,19 +294,19 @@ public class UrlsServiceImpl implements UrlsService {
 SumParquetSuccess sumParquetSuccess = parquetFileUtils.checkParquetFilesSuccess(futures);
 ResponseEntity<?> errorResponseEntity = sumParquetSuccess.getResponseEntity();
 if ( errorResponseEntity != null ) { // The related log is already shown.
-postReportResultToWorker(curWorkerId, curReportAssignments, "Error when creating or uploading the parquet files!");
+postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating or uploading the parquet files!");
 return false;
 }
 hasAttemptParquetFileProblem = sumParquetSuccess.isAttemptParquetFileProblem();
 hasPayloadParquetFileProblem = sumParquetSuccess.isPayloadParquetFileProblem();

 if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
-throw new RuntimeException("All of the parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database, for batch-assignments_" + curReportAssignments);
+throw new RuntimeException("All of the parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database, for batch-assignments_" + curReportAssignmentsCounter);
 else {
 if ( hasAttemptParquetFileProblem )
-logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignments);
+logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignmentsCounter);
 else if ( hasPayloadParquetFileProblem )
-logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignments);
+logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignmentsCounter);
 else
 logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files.");
 }

@@ -315,19 +315,19 @@ public class UrlsServiceImpl implements UrlsService {
 ImpalaConnector.databaseLock.lock();

 if ( ! hasAttemptParquetFileProblem )
-hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts + curReportAssignments + "/", "attempt");
+hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/", "attempt");

 if ( ! hasPayloadParquetFileProblem )
-hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignments + "/", "payload_aggregated");
+hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignmentsCounter + "/", "payload_aggregated");

 ImpalaConnector.databaseLock.unlock();

 if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
-throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignments);
+throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignmentsCounter);
 else if ( hasAttemptParquetFileProblem || hasPayloadParquetFileProblem )
-logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload_aggregated\" table, for batch-assignments_" + curReportAssignments);
+logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload_aggregated\" table, for batch-assignments_" + curReportAssignmentsCounter);
 else
-logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignments);
+logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignmentsCounter);

 } catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled.
 logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());

@@ -340,22 +340,22 @@ public class UrlsServiceImpl implements UrlsService {
 if ( assignmentErrorMsg != null )
 errorMsg += "\n" + assignmentErrorMsg;
 logger.error(errorMsg);
-postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg);
+postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg);
 return false;
 } catch (Exception e) {
 String errorMsg = "Unexpected error when inserting into the \"attempt\" and \"payload_aggregated\" tables in parallel! " + e.getMessage();
 logger.error(errorMsg, e);
-postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg);
+postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg);
 return false; // No tables-merging is happening.
 } finally {
 logger.debug("Deleting parquet directory: " + localParquetPath);
 fileUtils.deleteDirectory(new File(localParquetPath));
 // Delete the HDFS subDirs for this Report.
-String endingRmDirAndParams = curReportAssignments + "/" + parquetFileUtils.rmDirsAndParams;
+String endingRmDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.rmDirsAndParams;
 if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingRmDirAndParams)
 || !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingRmDirAndParams) )
 {
-logger.error("Error when deleting the HDFS sub-directories for assignments_" + curReportAssignments); // A directory-specific log has already appeared.
+logger.error("Error when deleting the HDFS sub-directories for assignments_" + curReportAssignmentsCounter); // A directory-specific log has already appeared.
 // The failure to delete the assignments_subDirs is not that of a problem and should not erase the whole process. So all goes as planned (the worker deletes any remaining files).
 // The worst case is that a few subDirs will be left back in the HDFS, although, without their parquetFiles, since they have already moved inside the DB tables.
 }

@@ -372,7 +372,7 @@ public class UrlsServiceImpl implements UrlsService {
 mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
 if ( mergeErrorMsg != null ) {
 ImpalaConnector.databaseLock.unlock();
-postReportResultToWorker(curWorkerId, curReportAssignments, mergeErrorMsg);
+postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
 return false;
 }
 }

@@ -381,7 +381,7 @@ public class UrlsServiceImpl implements UrlsService {
 mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null);
 if ( mergeErrorMsg != null ) {
 ImpalaConnector.databaseLock.unlock();
-postReportResultToWorker(curWorkerId, curReportAssignments, mergeErrorMsg);
+postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
 return false;
 }
 }

@@ -389,7 +389,7 @@ public class UrlsServiceImpl implements UrlsService {
 mergeErrorMsg = deleteWorkerAssignments(curWorkerId);
 if ( mergeErrorMsg != null ) {
 ImpalaConnector.databaseLock.unlock();
-postReportResultToWorker(curWorkerId, curReportAssignments, mergeErrorMsg);
+postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
 return false;
 }

@@ -397,12 +397,12 @@ public class UrlsServiceImpl implements UrlsService {

 logger.debug("Finished merging the database tables.");
 if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
-postReportResultToWorker(curWorkerId, curReportAssignments, "The full-text files failed to be acquired from the worker!");
+postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "The full-text files failed to be acquired from the worker!");
 return false;
 }

 // Notify the Worker that the processing of this report was successful, so that the Worker can delete the files.
-postReportResultToWorker(curWorkerId, curReportAssignments, null);
+postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, null);
 return true;
 }

@@ -205,7 +205,7 @@ public class ParquetFileUtils {
 }


-public boolean createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignments, String localParquetPath)
+public boolean createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignmentsCounter, String localParquetPath)
 {
 List<GenericData.Record> recordList = new ArrayList<>(urlReports.size());
 GenericData.Record record;

@@ -213,7 +213,7 @@ public class ParquetFileUtils {
 for ( UrlReport urlReport : urlReports ) {
 Payload payload = urlReport.getPayload();
 if ( payload == null ) {
-logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport);
+logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignmentsCounter + "\n" + urlReport);
 continue;
 }

@@ -244,7 +244,7 @@ public class ParquetFileUtils {
 return false;
 }

-String fileName = UrlsServiceImpl.assignmentsBatchCounter.get() + "_attempts_" + attemptsIncNum + ".parquet";
+String fileName = curReportAssignmentsCounter + "_attempts_" + attemptsIncNum + ".parquet";
 //logger.trace("Going to write " + recordsSize + " attempt-records to the parquet file: " + fileName); // DEBUG!

 String fullFilePath = localParquetPath + fileName;

@@ -252,7 +252,7 @@ public class ParquetFileUtils {
 //logger.trace("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!

 // Upload and insert the data to the "attempt" Impala table.
-String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, (parquetHDFSDirectoryPathAttempts + curReportAssignments + "/"));
+String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, (parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/"));
 return (errorMsg == null); // The possible error-message returned, is already logged by the Controller.
 } else
 return false;

@@ -262,7 +262,7 @@ public class ParquetFileUtils {
 }


-public boolean createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignments, String localParquetPath, String parquetHDFSDirectoryPathPayloads)
+public boolean createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignmentsCounter, String localParquetPath, String parquetHDFSDirectoryPathPayloads)
 {
 List<GenericData.Record> recordList = new ArrayList<>((int) (urlReports.size() * 0.2));
 GenericData.Record record;

@@ -271,7 +271,7 @@ public class ParquetFileUtils {
 {
 Payload payload = urlReport.getPayload();
 if ( payload == null ) {
-logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport);
+logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignmentsCounter + "\n" + urlReport);
 continue;
 }

@@ -304,7 +304,7 @@ public class ParquetFileUtils {
 return false;
 }

-String fileName = UrlsServiceImpl.assignmentsBatchCounter.get() + "_payloads.parquet";
+String fileName = curReportAssignmentsCounter + "_payloads.parquet";
 //logger.trace("Going to write " + recordsSize + " payload-records to the parquet file: " + fileName); // DEBUG!

 String fullFilePath = localParquetPath + fileName;
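Taken together, the UrlsServiceImpl and ParquetFileUtils changes make the per-batch HDFS sub-directory and the parquet file names agree on the same batch number. Below is a small sketch of the resulting layout; the base paths are hypothetical placeholders (the real ones come from the configured parquetHDFSDirectoryPathAttempts / parquetHDFSDirectoryPathPayloadsAggregated fields), and only the concatenation pattern is taken from the diff above.

// Hypothetical demo; prints the per-batch paths implied by the diff above.
public class ParquetLayoutSketch {

    public static void main(String[] args) {
        // Placeholder base directories; the real ones are configuration values.
        String parquetHDFSDirectoryPathAttempts = "/user/controller/parquet/attempts/";
        String parquetHDFSDirectoryPathPayloadsAggregated = "/user/controller/parquet/payloads_aggregated/";

        long curReportAssignmentsCounter = 7; // the batch number of one workerReport
        int attemptsIncNum = 0;               // index of one attempt-parquet sub-batch

        // One sub-directory per batch, created before the upload and removed in the "finally" block.
        String attemptsDir = parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/";
        String payloadsDir = parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignmentsCounter + "/";

        // The file names now carry the same batch number as the directory they are uploaded to.
        String attemptFile = curReportAssignmentsCounter + "_attempts_" + attemptsIncNum + ".parquet";
        String payloadFile = curReportAssignmentsCounter + "_payloads.parquet";

        System.out.println(attemptsDir + attemptFile); // /user/controller/parquet/attempts/7/7_attempts_0.parquet
        System.out.println(payloadsDir + payloadFile); // /user/controller/parquet/payloads_aggregated/7/7_payloads.parquet
    }
}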