forked from lsmyrnaios/UrlsController
When all the data has been processed, increase the "max-attempts" limit so that some very old records are retried in subsequent requests.
This commit is contained in:
parent
3c9f8870d1
commit
a01e11eef0
|
@ -30,6 +30,7 @@ public class UrlController {
|
||||||
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); // Just for the "getTestUrls"-endpoint.
|
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); // Just for the "getTestUrls"-endpoint.
|
||||||
|
|
||||||
private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");
|
private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");
|
||||||
|
private static int maxAttemptsPerRecord = ControllerConstants.MAX_ATTEMPTS_PER_RECORD;
|
||||||
|
|
||||||
|
|
||||||
@GetMapping("")
|
@GetMapping("")
|
||||||
|
@ -68,7 +69,7 @@ public class UrlController {
|
||||||
" select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
|
" select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
|
||||||
" union all\n" +
|
" union all\n" +
|
||||||
" select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" +
|
" select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" +
|
||||||
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + ControllerConstants.MAX_ATTEMPTS_PER_RECORD + " and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
|
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecord + " and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
|
||||||
"limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" +
|
"limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" +
|
||||||
"order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
|
"order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
|
||||||
"limit " + assignmentsLimit + ") as findAssignmentsQuery";
|
"limit " + assignmentsLimit + ") as findAssignmentsQuery";
|
||||||
|
@ -156,7 +157,8 @@ public class UrlController {
|
||||||
}*/
|
}*/
|
||||||
|
|
||||||
// The cursor is automatically before the first element in this configuration.
|
// The cursor is automatically before the first element in this configuration.
|
||||||
while ( resultSet.next() ) {
|
while ( resultSet.next() ) { // Move the cursor forward.
|
||||||
|
// If the resultSet is empty, control never enters the loop.
|
||||||
// The following few lines, cannot be outside the "while" loop, since the same object is added, despite that we update the inner-values.
|
// The following few lines, cannot be outside the "while" loop, since the same object is added, despite that we update the inner-values.
|
||||||
Assignment assignment = new Assignment();
|
Assignment assignment = new Assignment();
|
||||||
assignment.setWorkerId(workerId);
|
assignment.setWorkerId(workerId);
|
||||||
|
@ -197,10 +199,14 @@ public class UrlController {
|
||||||
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
||||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||||
ImpalaConnector.databaseLock.unlock();
|
ImpalaConnector.databaseLock.unlock();
|
||||||
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId;
|
maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests.
|
||||||
|
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecord + " for the next requests.";
|
||||||
logger.error(errorMsg);
|
logger.error(errorMsg);
|
||||||
ImpalaConnector.closeConnection(con);
|
ImpalaConnector.closeConnection(con);
|
||||||
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
|
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
|
||||||
|
} else if ( assignmentsSize < assignmentsLimit ) {
|
||||||
|
maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests.
|
||||||
|
logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecord + " for the next requests.");
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");
|
logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");
|
||||||
|
|
Loading…
Reference in New Issue