Refactor the UrlsController: a) offload the business logic to the dedicated "UrlsService", and b) move the "checkParquetFilesSuccess()" method to "ParquetFileUtils".
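In short, the controller now keeps only the request validation and delegates everything else to the injected service. A minimal sketch of the resulting shape of the controller, based on the diff below (class-level annotations are omitted, and the condensed limit-check here is an illustration, not the exact committed code):

package eu.openaire.urls_controller.controllers;

import eu.openaire.urls_controller.services.UrlsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

public class UrlController {

    @Autowired
    private UrlsService urlsService;    // all database-, parquet- and full-text-related work now happens behind this interface

    @Value("${services.pdfaggregation.controller.assignmentLimit}")
    private int assignmentLimit;

    @GetMapping("")
    public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
        // Sanitize the requested limit against the controller's own limit (simplified here; see the diff for the real checks).
        int assignmentsLimit = workerAssignmentsLimit;
        if ( (assignmentsLimit == 0) || (assignmentsLimit > assignmentLimit) )
            assignmentsLimit = assignmentLimit;
        return urlsService.getUrls(workerId, assignmentsLimit);
    }
}

The "addWorkerReport" endpoint follows the same pattern: it extracts and validates the report fields and then returns the result of "urlsService.addWorkerReport(...)".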

Lampros Smyrnaios 2023-02-21 15:36:35 +02:00
parent a1c16ffc19
commit 8893662a81
4 changed files with 473 additions and 413 deletions

eu/openaire/urls_controller/controllers/UrlController.java

@@ -1,36 +1,18 @@
package eu.openaire.urls_controller.controllers;
- import eu.openaire.urls_controller.configuration.ImpalaConnector;
- import eu.openaire.urls_controller.models.*;
+ import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
- import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
- import eu.openaire.urls_controller.util.FileUtils;
- import eu.openaire.urls_controller.util.ParquetFileUtils;
+ import eu.openaire.urls_controller.services.UrlsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
- import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
- import java.io.File;
- import java.nio.file.Files;
- import java.nio.file.Paths;
- import java.sql.Connection;
- import java.sql.PreparedStatement;
- import java.sql.SQLException;
- import java.sql.Timestamp;
- import java.util.ArrayList;
import java.util.List;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
@@ -41,26 +23,14 @@ public class UrlController {
private static final Logger logger = LoggerFactory.getLogger(UrlController.class);
@Autowired
- private JdbcTemplate jdbcTemplate;
+ private UrlsService urlsService;
- @Autowired
- private FileUtils fileUtils;
- @Autowired
- private ParquetFileUtils parquetFileUtils;
- public static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");
@Value("${services.pdfaggregation.controller.assignmentLimit}")
private int assignmentLimit;
- private final AtomicInteger maxAttemptsPerRecordAtomic;
- public UrlController(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord) {
- maxAttemptsPerRecordAtomic = new AtomicInteger(maxAttemptsPerRecord);
- }
@GetMapping("")
public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
@@ -74,8 +44,6 @@ public class UrlController {
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + assignmentLimit);
- // Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
// Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server.
int assignmentsLimit = workerAssignmentsLimit;
if ( assignmentsLimit == 0 ) {
@@ -87,128 +55,10 @@ public class UrlController {
assignmentsLimit = assignmentLimit;
}
+ return urlsService.getUrls(workerId, assignmentsLimit);
- String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
"from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count\n" +
"from (\n" +
"select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
"from " + ImpalaConnector.databaseName + ".publication p\n" +
"join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
"join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
"left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts\n" +
"on attempts.id=p.id\n" +
"left outer join (select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
"union all\n" +
"select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing\n" +
"on existing.id=p.id and existing.original_url=pu.url\n" +
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() +
"\nand not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
"and pu.url != '' and pu.url is not null\n" + // Some IDs have empty-string urls, there are no "null" urls, but keep the relevant check for future-proofing.
"limit " + (assignmentsLimit * 10) +
")\nas non_distinct_results\n" +
"order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
"limit " + assignmentsLimit +
"\n) as findAssignmentsQuery";
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
//logger.debug("findAssignmentsQuery:\n" + findAssignmentsQuery); // DEBUG!
String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";
List<Assignment> assignments = new ArrayList<>(assignmentsLimit);
ImpalaConnector.databaseLock.lock();
String errorMsg = createAndInitializeCurrentAssignmentsTable(findAssignmentsQuery);
if ( errorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
long timestampMillis = System.currentTimeMillis();
Timestamp timestamp = new Timestamp(timestampMillis); // Store it here, in order to have the same for all current records.
try {
jdbcTemplate.query(getAssignmentsQuery, rs -> {
Assignment assignment = new Assignment();
assignment.setWorkerId(workerId);
assignment.setTimestamp(timestamp);
Datasource datasource = new Datasource();
try { // For each of the 4 columns returned. The indexing starts from 1
assignment.setId(rs.getString(1));
assignment.setOriginalUrl(rs.getString(2));
datasource.setId(rs.getString(3));
datasource.setName(rs.getString(4));
} catch (SQLException sqle) {
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
}
assignment.setDatasource(datasource);
assignments.add(assignment);
});
} catch (Exception e) {
errorMsg = ImpalaConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 ) {
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.";
logger.error(errorMsg);
String tmpErrMsg = dropCurrentAssignmentTable();
ImpalaConnector.databaseLock.unlock();
if ( tmpErrMsg != null ) {
errorMsg += "\n" + tmpErrMsg; // The additional error-msg is already logged.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} else
return ResponseEntity.status(HttpStatus.MULTI_STATUS).body(new AssignmentsResponse((long) -1, null));
} else if ( assignmentsSize < assignmentsLimit ) {
logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.");
}
logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");
// Write the Assignment details to the assignment-table.
String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', " + timestampMillis + "\n"
+ "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data";
try {
jdbcTemplate.execute(insertAssignmentsQuery);
} catch (Exception e) {
errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
errorMsg = dropCurrentAssignmentTable();
if ( errorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
String mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
ImpalaConnector.databaseLock.unlock();
long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
}
- public static final ExecutorService insertsExecutor = Executors.newFixedThreadPool(6);
@PostMapping("addWorkerReport")
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {
@@ -243,257 +93,7 @@ public class UrlController {
long curReportAssignments = workerReport.getAssignmentRequestCounter();
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + sizeOUrlReports + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
+ return urlsService.addWorkerReport(curWorkerId, curReportAssignments, urlReports, sizeOUrlReports, request);
- // Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!");
}
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignments);
// The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available.
fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false);
// We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store.
// We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
// The urls which give full-text (no matter if we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked-up to be checked again later.
}
else
logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments);
String currentParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignments + File.separator;
java.nio.file.Path parquetDirPath = Paths.get(currentParquetPath);
if ( !Files.isDirectory(parquetDirPath) ) {
try {
Files.createDirectories(parquetDirPath);
} catch (Exception e) {
logger.error("", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage());
}
}
logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignments);
List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOUrlReports, curReportAssignments, currentParquetPath, uploadFullTextsResponse);
boolean hasAttemptParquetFileProblem = false;
boolean hasPayloadParquetFileProblem = false;
try { // Invoke all the tasks and wait for them to finish before moving to the next batch.
List<Future<ParquetReport>> futures = insertsExecutor.invokeAll(callableTasks);
SumParquetSuccess sumParquetSuccess = checkParquetFilesSuccess(futures);
ResponseEntity<?> errorResponseEntity = sumParquetSuccess.getResponseEntity();
if ( errorResponseEntity != null ) {
return errorResponseEntity; // The related log is already shown.
}
hasAttemptParquetFileProblem = sumParquetSuccess.isAttemptParquetFileProblem();
hasPayloadParquetFileProblem = sumParquetSuccess.isPayloadParquetFileProblem();
if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
throw new RuntimeException("All of the parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database, for batch-assignments_" + curReportAssignments);
else {
if ( hasAttemptParquetFileProblem )
logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignments);
else if ( hasPayloadParquetFileProblem )
logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload\", for batch-assignments_" + curReportAssignments);
else
logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files.");
}
// Load all the parquet files of each type into its table.
ImpalaConnector.databaseLock.lock();
if ( ! hasAttemptParquetFileProblem )
hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts, "attempt");
if ( ! hasPayloadParquetFileProblem )
hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloads, "payload");
ImpalaConnector.databaseLock.unlock();
if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_" + curReportAssignments);
else if ( hasAttemptParquetFileProblem || hasPayloadParquetFileProblem )
logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload\" table, for batch-assignments_" + curReportAssignments);
else
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_" + curReportAssignments);
} catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled.
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());
// This is a very rare case. At the moment, we just move on with table-merging.
} catch (RuntimeException re) {
String errorMsg = re.getMessage();
ImpalaConnector.databaseLock.lock();
String assignmentErrorMsg = deleteWorkerAssignments(curWorkerId);
ImpalaConnector.databaseLock.unlock();
if ( assignmentErrorMsg != null ) {
errorMsg += "\n" + assignmentErrorMsg;
}
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} catch (Exception e) {
String errorMsg = "Unexpected error when inserting into the \"payload\" and \"attempt\" tables in parallel! " + e.getMessage();
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
logger.debug("Deleting parquet directory: " + currentParquetPath);
FileUtils.deleteDirectory(new File(currentParquetPath));
}
logger.debug("Going to merge the parquet files for the tables which were altered.");
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
String mergeErrorMsg;
ImpalaConnector.databaseLock.lock();
if ( ! hasAttemptParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
}
if ( ! hasPayloadParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
}
mergeErrorMsg = deleteWorkerAssignments(curWorkerId);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
ImpalaConnector.databaseLock.unlock();
logger.debug("Finished merging the database tables.");
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful )
return ResponseEntity.status(HttpStatus.MULTI_STATUS).body("The full-text files failed to be acquired from the worker!");
else
return ResponseEntity.status(HttpStatus.OK).build();
}
private String deleteWorkerAssignments(String curWorkerId)
{
// This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
// We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
return fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
}
private String createAndInitializeCurrentAssignmentsTable(String findAssignmentsQuery)
{
String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
try {
jdbcTemplate.execute(createCurrentAssignmentsQuery);
} catch (Exception e) {
String errorMsg = ImpalaConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable(); // The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened).
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
return errorMsg;
}
try {
jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
} catch (Exception e) {
String errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
return errorMsg;
}
return null; // All good.
}
private String dropCurrentAssignmentTable() {
String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE";
try {
jdbcTemplate.execute(dropCurrentAssignmentsQuery);
return null; // All good. No error-message.
} catch (Exception e) {
return ImpalaConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e); // The error is already logged inside.
}
}
private static SumParquetSuccess checkParquetFilesSuccess(List<Future<ParquetReport>> futures)
{
int numOfAllAttemptParquetFileCreations = 0;
int numOfFailedAttemptParquetFileCreations = 0;
int numOfAllPayloadParquetFileCreations = 0;
int numOfFailedPayloadParquetFileCreations = 0;
for ( Future<ParquetReport> future : futures )
{
ParquetReport parquetReport = null;
try {
parquetReport = future.get();
boolean hasProblems = (! parquetReport.isSuccessful());
ParquetReport.ParquetType parquetType = parquetReport.getParquetType();
if ( parquetType.equals(ParquetReport.ParquetType.attempt) ) {
numOfAllAttemptParquetFileCreations++;
if ( hasProblems )
numOfFailedAttemptParquetFileCreations++;
} else if ( parquetType.equals(ParquetReport.ParquetType.payload) ) {
numOfAllPayloadParquetFileCreations ++;
if ( hasProblems )
numOfFailedPayloadParquetFileCreations ++;
} else {
String errMsg = "An invalid \"ParquetReport.ParquetType\" was found: " + parquetType; // This should never happen, but anyway.
logger.error(errMsg);
return new SumParquetSuccess(false, false, ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errMsg));
}
} catch (Exception e) {
logger.error("", e);
// We do not know if the failed "future" refers to a "payload" or to a "attempt".
// So we cannot increase a specific counter. That's ok, the only drawback if that we may try to "load" the non-existent data and get an exception.
}
} // End-for
boolean hasAttemptParquetFileProblem = (numOfFailedAttemptParquetFileCreations == numOfAllAttemptParquetFileCreations);
boolean hasPayloadParquetFileProblem = (numOfFailedPayloadParquetFileCreations == numOfAllPayloadParquetFileCreations);
return new SumParquetSuccess(hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, null);
}
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException {
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
sb.append(baseInsertQuery);
for ( int i=1; i <= dataSize; ++i ) {
sb.append("(");
for ( int j=1; j <= numParamsPerRow; ++j ) {
sb.append("?");
if ( j < numParamsPerRow )
sb.append(",");
}
sb.append(")");
if ( i < dataSize )
sb.append(",");
}
PreparedStatement preparedInsertStatement;
try { // We use a "PreparedStatement" to do insertions, for security reasons.
preparedInsertStatement = con.prepareStatement(sb.toString());
} catch (SQLException sqle) {
String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n";
logger.error(errorMsg + sqle.getMessage());
throw new RuntimeException(errorMsg);
}
return preparedInsertStatement;
}
}

eu/openaire/urls_controller/services/UrlsService.java (new file)

@@ -0,0 +1,15 @@
package eu.openaire.urls_controller.services;
import eu.openaire.urls_controller.models.UrlReport;
import org.springframework.http.ResponseEntity;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
public interface UrlsService {
ResponseEntity<?> getUrls(String workerId, int assignmentsLimit);
ResponseEntity<?> addWorkerReport(String curWorkerId, long curReportAssignments, List<UrlReport> urlReports, int sizeOfUrlReports, HttpServletRequest request);
}

eu/openaire/urls_controller/services/UrlsServiceImpl.java (new file)

@@ -0,0 +1,402 @@
package eu.openaire.urls_controller.services;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.servlet.http.HttpServletRequest;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@Service
public class UrlsServiceImpl implements UrlsService {
private static final Logger logger = LoggerFactory.getLogger(UrlsServiceImpl.class);
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private FileUtils fileUtils;
@Autowired
private ParquetFileUtils parquetFileUtils;
public static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
private final AtomicInteger maxAttemptsPerRecordAtomic;
public static final ExecutorService insertsExecutor = Executors.newFixedThreadPool(6);
public UrlsServiceImpl(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord) {
maxAttemptsPerRecordAtomic = new AtomicInteger(maxAttemptsPerRecord);
}
public ResponseEntity<?> getUrls(String workerId, int assignmentsLimit)
{
// Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
"from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count\n" +
"from (\n" +
"select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
"from " + ImpalaConnector.databaseName + ".publication p\n" +
"join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
"join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
"left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts\n" +
"on attempts.id=p.id\n" +
"left outer join (select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
"union all\n" +
"select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing\n" +
"on existing.id=p.id and existing.original_url=pu.url\n" +
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() +
"\nand not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
"and pu.url != '' and pu.url is not null\n" + // Some IDs have empty-string urls, there are no "null" urls, but keep the relevant check for future-proofing.
"limit " + (assignmentsLimit * 10) +
")\nas non_distinct_results\n" +
"order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
"limit " + assignmentsLimit +
"\n) as findAssignmentsQuery";
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
//logger.debug("findAssignmentsQuery:\n" + findAssignmentsQuery); // DEBUG!
String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";
List<Assignment> assignments = new ArrayList<>(assignmentsLimit);
ImpalaConnector.databaseLock.lock();
String errorMsg = createAndInitializeCurrentAssignmentsTable(findAssignmentsQuery);
if ( errorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
long timestampMillis = System.currentTimeMillis();
Timestamp timestamp = new Timestamp(timestampMillis); // Store it here, in order to have the same for all current records.
try {
jdbcTemplate.query(getAssignmentsQuery, rs -> {
Assignment assignment = new Assignment();
assignment.setWorkerId(workerId);
assignment.setTimestamp(timestamp);
Datasource datasource = new Datasource();
try { // For each of the 4 columns returned. The indexing starts from 1
assignment.setId(rs.getString(1));
assignment.setOriginalUrl(rs.getString(2));
datasource.setId(rs.getString(3));
datasource.setName(rs.getString(4));
} catch (SQLException sqle) {
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
}
assignment.setDatasource(datasource);
assignments.add(assignment);
});
} catch (Exception e) {
errorMsg = ImpalaConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 ) {
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.";
logger.error(errorMsg);
String tmpErrMsg = dropCurrentAssignmentTable();
ImpalaConnector.databaseLock.unlock();
if ( tmpErrMsg != null ) {
errorMsg += "\n" + tmpErrMsg; // The additional error-msg is already logged.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} else
return ResponseEntity.status(HttpStatus.MULTI_STATUS).body(new AssignmentsResponse((long) -1, null));
} else if ( assignmentsSize < assignmentsLimit ) {
logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.");
}
logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");
// Write the Assignment details to the assignment-table.
String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', " + timestampMillis + "\n"
+ "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data";
try {
jdbcTemplate.execute(insertAssignmentsQuery);
} catch (Exception e) {
errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
errorMsg = dropCurrentAssignmentTable();
if ( errorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
String mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
ImpalaConnector.databaseLock.unlock();
long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
}
public ResponseEntity<?> addWorkerReport(String curWorkerId, long curReportAssignments, List<UrlReport> urlReports, int sizeOfUrlReports, HttpServletRequest request)
{
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!");
}
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignments);
// The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available.
fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false);
// We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store.
// We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
// The urls which give full-text (no matter if we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked-up to be checked again later.
}
else
logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments);
String currentParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignments + File.separator;
java.nio.file.Path parquetDirPath = Paths.get(currentParquetPath);
if ( !Files.isDirectory(parquetDirPath) ) {
try {
Files.createDirectories(parquetDirPath);
} catch (Exception e) {
logger.error("", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage());
}
}
logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignments);
List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignments, currentParquetPath, uploadFullTextsResponse);
boolean hasAttemptParquetFileProblem = false;
boolean hasPayloadParquetFileProblem = false;
try { // Invoke all the tasks and wait for them to finish before moving to the next batch.
List<Future<ParquetReport>> futures = insertsExecutor.invokeAll(callableTasks);
SumParquetSuccess sumParquetSuccess = parquetFileUtils.checkParquetFilesSuccess(futures);
ResponseEntity<?> errorResponseEntity = sumParquetSuccess.getResponseEntity();
if ( errorResponseEntity != null ) {
return errorResponseEntity; // The related log is already shown.
}
hasAttemptParquetFileProblem = sumParquetSuccess.isAttemptParquetFileProblem();
hasPayloadParquetFileProblem = sumParquetSuccess.isPayloadParquetFileProblem();
if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
throw new RuntimeException("All of the parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database, for batch-assignments_" + curReportAssignments);
else {
if ( hasAttemptParquetFileProblem )
logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignments);
else if ( hasPayloadParquetFileProblem )
logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload\", for batch-assignments_" + curReportAssignments);
else
logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files.");
}
// Load all the parquet files of each type into its table.
ImpalaConnector.databaseLock.lock();
if ( ! hasAttemptParquetFileProblem )
hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts, "attempt");
if ( ! hasPayloadParquetFileProblem )
hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloads, "payload");
ImpalaConnector.databaseLock.unlock();
if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_" + curReportAssignments);
else if ( hasAttemptParquetFileProblem || hasPayloadParquetFileProblem )
logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload\" table, for batch-assignments_" + curReportAssignments);
else
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_" + curReportAssignments);
} catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled.
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());
// This is a very rare case. At the moment, we just move on with table-merging.
} catch (RuntimeException re) {
String errorMsg = re.getMessage();
ImpalaConnector.databaseLock.lock();
String assignmentErrorMsg = deleteWorkerAssignments(curWorkerId);
ImpalaConnector.databaseLock.unlock();
if ( assignmentErrorMsg != null ) {
errorMsg += "\n" + assignmentErrorMsg;
}
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} catch (Exception e) {
String errorMsg = "Unexpected error when inserting into the \"payload\" and \"attempt\" tables in parallel! " + e.getMessage();
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
logger.debug("Deleting parquet directory: " + currentParquetPath);
FileUtils.deleteDirectory(new File(currentParquetPath));
}
logger.debug("Going to merge the parquet files for the tables which were altered.");
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
String mergeErrorMsg;
ImpalaConnector.databaseLock.lock();
if ( ! hasAttemptParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
}
if ( ! hasPayloadParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
}
mergeErrorMsg = deleteWorkerAssignments(curWorkerId);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
ImpalaConnector.databaseLock.unlock();
logger.debug("Finished merging the database tables.");
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful )
return ResponseEntity.status(HttpStatus.MULTI_STATUS).body("The full-text files failed to be acquired from the worker!");
else
return ResponseEntity.status(HttpStatus.OK).build();
}
private String createAndInitializeCurrentAssignmentsTable(String findAssignmentsQuery)
{
String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
try {
jdbcTemplate.execute(createCurrentAssignmentsQuery);
} catch (Exception e) {
String errorMsg = ImpalaConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable(); // The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened).
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
return errorMsg;
}
try {
jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
} catch (Exception e) {
String errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
return errorMsg;
}
return null; // All good.
}
private String dropCurrentAssignmentTable() {
String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE";
try {
jdbcTemplate.execute(dropCurrentAssignmentsQuery);
return null; // All good. No error-message.
} catch (Exception e) {
return ImpalaConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e); // The error is already logged inside.
}
}
private String deleteWorkerAssignments(String curWorkerId)
{
// This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
// We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
return fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
}
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException {
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
sb.append(baseInsertQuery);
for ( int i=1; i <= dataSize; ++i ) {
sb.append("(");
for ( int j=1; j <= numParamsPerRow; ++j ) {
sb.append("?");
if ( j < numParamsPerRow )
sb.append(",");
}
sb.append(")");
if ( i < dataSize )
sb.append(",");
}
PreparedStatement preparedInsertStatement;
try { // We use a "PreparedStatement" to do insertions, for security reasons.
preparedInsertStatement = con.prepareStatement(sb.toString());
} catch (SQLException sqle) {
String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n";
logger.error(errorMsg + sqle.getMessage());
throw new RuntimeException(errorMsg);
}
return preparedInsertStatement;
}
}

eu/openaire/urls_controller/util/ParquetFileUtils.java

@@ -2,11 +2,9 @@ package eu.openaire.urls_controller.util;
import com.google.common.collect.Lists;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
- import eu.openaire.urls_controller.controllers.UrlController;
import eu.openaire.urls_controller.models.Error;
- import eu.openaire.urls_controller.models.ParquetReport;
- import eu.openaire.urls_controller.models.Payload;
- import eu.openaire.urls_controller.models.UrlReport;
+ import eu.openaire.urls_controller.models.*;
+ import eu.openaire.urls_controller.services.UrlsServiceImpl;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -26,6 +24,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.ClassPathResource;
+ import org.springframework.http.HttpStatus;
+ import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
@@ -43,6 +43,7 @@ import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.Callable;
+ import java.util.concurrent.Future;
@Component
public class ParquetFileUtils {
@@ -139,7 +140,7 @@ public class ParquetFileUtils {
}
- public List<Callable<ParquetReport>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse)
+ public List<Callable<ParquetReport>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOfUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse)
{
// Split the "UrlReports" into some sub-lists.
List<List<UrlReport>> subLists;
@@ -148,7 +149,7 @@ public class ParquetFileUtils {
List<Callable<ParquetReport>> callableTasks = new ArrayList<>(6);
// One thread will handle the inserts to the "payload" table and the others to the "attempt" table. This way there will be as little blocking as possible (from the part of Impala).
- int sizeOfEachSubList = (int)(sizeOUrlReports * 0.2);
+ int sizeOfEachSubList = (int)(sizeOfUrlReports * 0.2);
if ( sizeOfEachSubList > 10 )
{
subLists = Lists.partition(urlReports, sizeOfEachSubList); // This needs the "sizeOfEachSubList" to be above < 0 >.
@@ -230,7 +231,7 @@ public class ParquetFileUtils {
return false;
}
- String fileName = UrlController.assignmentsBatchCounter.get() + "_attempts_" + attemptsIncNum + ".parquet";
+ String fileName = UrlsServiceImpl.assignmentsBatchCounter.get() + "_attempts_" + attemptsIncNum + ".parquet";
//logger.debug("Going to write " + recordsSize + " attempt-records to the parquet file: " + fileName); // DEBUG!
String fullFilePath = currentParquetPath + fileName;
@@ -290,7 +291,7 @@ public class ParquetFileUtils {
return false;
}
- String fileName = UrlController.assignmentsBatchCounter.get() + "_payloads.parquet";
+ String fileName = UrlsServiceImpl.assignmentsBatchCounter.get() + "_payloads.parquet";
//logger.debug("Going to write " + recordsSize + " payload-records to the parquet file: " + fileName); // DEBUG!
String fullFilePath = currentParquetPath + fileName;
@@ -577,6 +578,48 @@ public class ParquetFileUtils {
}
public SumParquetSuccess checkParquetFilesSuccess(List<Future<ParquetReport>> futures)
{
int numOfAllAttemptParquetFileCreations = 0;
int numOfFailedAttemptParquetFileCreations = 0;
int numOfAllPayloadParquetFileCreations = 0;
int numOfFailedPayloadParquetFileCreations = 0;
for ( Future<ParquetReport> future : futures )
{
ParquetReport parquetReport = null;
try {
parquetReport = future.get();
boolean hasProblems = (! parquetReport.isSuccessful());
ParquetReport.ParquetType parquetType = parquetReport.getParquetType();
if ( parquetType.equals(ParquetReport.ParquetType.attempt) ) {
numOfAllAttemptParquetFileCreations++;
if ( hasProblems )
numOfFailedAttemptParquetFileCreations++;
} else if ( parquetType.equals(ParquetReport.ParquetType.payload) ) {
numOfAllPayloadParquetFileCreations ++;
if ( hasProblems )
numOfFailedPayloadParquetFileCreations ++;
} else {
String errMsg = "An invalid \"ParquetReport.ParquetType\" was found: " + parquetType; // This should never happen, but anyway.
logger.error(errMsg);
return new SumParquetSuccess(false, false, ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errMsg));
}
} catch (Exception e) {
logger.error("", e);
// We do not know if the failed "future" refers to a "payload" or to a "attempt".
// So we cannot increase a specific counter. That's ok, the only drawback if that we may try to "load" the non-existent data and get an exception.
}
} // End-for
boolean hasAttemptParquetFileProblem = (numOfFailedAttemptParquetFileCreations == numOfAllAttemptParquetFileCreations);
boolean hasPayloadParquetFileProblem = (numOfFailedPayloadParquetFileCreations == numOfAllPayloadParquetFileCreations);
return new SumParquetSuccess(hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, null);
}
// Use this if we decide to delete undeleted files (probably due to failed "load" attempts). For now, it's better to leave them there, in order to fix potential problems more easily.
public String deleteFileFromHDFS(String fileLocation, String parquetFileName) throws Exception
{