
567 lines
32 KiB

package eu.openaire.urls_controller.controllers;
import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.ControllerConstants;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.sql.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
public class UrlController {
// Logger shared by all the endpoints of this controller.
private static final Logger logger = LoggerFactory.getLogger(UrlController.class);

// Numbers the batches of assignments which are handed out; it is incremented by both the "getUrls" and the "getTestUrls" endpoints.
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);

// Matches any input containing a single-quote, semicolon, backtick or double-quote. It is used with "matcher(..).matches()" to reject values which get concatenated into SQL.
// The DOTALL flag is required: without it, "." does not match line terminators, so an input such as "abc\n' or '1'='1" would NOT match as a whole and would slip through the check.
private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*", Pattern.DOTALL);
/**
 * Hands out a batch of assignments (publication-id and url pairs, along with their datasource) to a worker.
 *
 * Steps visible in this method:
 *  1) Reject a "workerId" matching MALICIOUS_INPUT_STRING (403) and a zero "workerAssignmentsLimit" (400); cap the limit at ControllerConstants.ASSIGNMENTS_LIMIT.
 *  2) Build the queries which materialize the candidate records in the "current_assignment" table, compute its stats and read it back.
 *  3) Build a query which copies the "pubid" and "url" of those records into the "assignment" table, tagged with the "workerId" and a single shared timestamp.
 *  4) Drop the "current_assignment" table, merge the parquet files of the "assignment" table and answer with 200 and an AssignmentsResponse.
 * Every failure path visible here answers with 500 and an error message, except for the "no results" case which answers with 204.
 *
 * NOTE(review): this copy of the method seems to have lost several lines (closing braces, the bodies of some "if" guards, the statement-execution calls and the column reads inside the result-set loop).
 * The notes below mark the places; confirm against the upstream file before relying on this text.
 *
 * @param workerId the id of the requesting worker; it gets concatenated into SQL, hence the pattern-check.
 * @param workerAssignmentsLimit the maximum number of assignments the worker asks for.
 * @return a ResponseEntity carrying either an AssignmentsResponse (200) or an error message.
 */
public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
// As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
if ( MALICIOUS_INPUT_STRING.matcher(workerId).matches() ) {
String errorMsg = "Possibly malicious \"workerId\" received: " + workerId;
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
// NOTE(review): the closing brace of the "if" above is not present in this copy; the logging below is presumably meant to run for the accepted worker-ids only.
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);
// Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
// Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server.
// NOTE(review): only zero and too-large values are handled below. A negative value is not rejected and would reach both the "limit" clauses of the query and "new ArrayList<>(assignmentsLimit)", which throws an IllegalArgumentException for a negative capacity.
int assignmentsLimit = workerAssignmentsLimit;
if ( assignmentsLimit == 0 ) {
String errorMsg = "The given \"workerAssignmentsLimit\" was ZERO!";
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
} else if ( assignmentsLimit > ControllerConstants.ASSIGNMENTS_LIMIT ) {
logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + ControllerConstants.ASSIGNMENTS_LIMIT + "). Will use the Controller's limit.");
assignmentsLimit = ControllerConstants.ASSIGNMENTS_LIMIT;
// Candidate selection: joins "publication", "publication_urls" and "datasource", left-joins the attempts-count per publication-id and the union of the id-url pairs which already exist in the "assignment" or the "payload" table.
// It keeps only the pairs of harvestable datasources which are in none of those two tables, whose attempts-count is at most MAX_ATTEMPTS_PER_RECORD and which have no attempt with error_class 'noRetry'.
// The inner query is limited to 10 times the requested amount, before the "distinct", the "order by" and the final "limit" reduce it to < assignmentsLimit > rows.
String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
"from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" +
"select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
"from " + ImpalaConnector.databaseName + ".publication p\n" +
"join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
"join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
"left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" +
"left outer join (\n" +
" select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
" union all\n" +
" select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" +
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + ControllerConstants.MAX_ATTEMPTS_PER_RECORD + " and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry')\n" +
"limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" +
"order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
"limit " + assignmentsLimit + ") as findAssignmentsQuery";
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
// The selected records are materialized in a temporary table, so that the same rows can be both returned to the worker and copied into the "assignment" table.
String createAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";
List<Assignment> assignments = new ArrayList<>(assignmentsLimit);
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null ) { // This is already logged in "getConnection()".
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
// All transactions in Impala automatically commit at the end of the statement. Currently, Impala does not support multi-statement transactions.
// https://impala.apache.org/docs/build/html/topics/impala_transactions.html
// We cannot use "savePoints" along with "autoCommit = false" to roll back to a previous state among multiple statements.
// Step 1: create the "current_assignment" table.
// NOTE(review): only the "prepareStatement" call is visible below; the call which executes the statement and the body of the null-guard in the "finally" (presumably a "close()") seem to be missing.
PreparedStatement createCurrentAssignmentsPreparedStatement = null;
try {
createCurrentAssignmentsPreparedStatement = con.prepareStatement(createAssignmentsQuery);
// We cannot set the "limits" and the MAX_ATTEMPTS_PER_RECORD as preparedStatements parameters, as we get a "java.sql.SQLException: [Simba][JDBC](11420) Error, parameter metadata not populated."
} catch (SQLException sqle) {
String errorMsg = ImpalaConnector.handlePreparedStatementException("createAssignmentsQuery", createAssignmentsQuery, "createCurrentAssignmentsPreparedStatement", createCurrentAssignmentsPreparedStatement, con, sqle);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
try {
if ( createCurrentAssignmentsPreparedStatement != null )
} catch (SQLException sqle2) {
logger.error("Failed to close the \"createCurrentAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
// Step 2: compute the stats of the new table. From this point on, each failure path first tries to drop the "current_assignment" table.
// If the drop fails too, then the error message of the drop is the one returned to the caller.
// NOTE(review): the execution call and the body of the null-guard in the "finally" seem to be missing here as well.
PreparedStatement computeCurrentAssignmentsStatsPreparedStatement = null;
try {
computeCurrentAssignmentsStatsPreparedStatement = con.prepareStatement(computeCurrentAssignmentsStatsQuery);
} catch (SQLException sqle) {
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, "computeCurrentAssignmentsStatsPreparedStatement", computeCurrentAssignmentsStatsPreparedStatement, con, sqle);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
try {
if ( computeCurrentAssignmentsStatsPreparedStatement != null )
} catch (SQLException sqle2) {
logger.error("Failed to close the \"computeCurrentAssignmentsStatsPreparedStatement\"!\n" + sqle2.getMessage());
// Step 3: read the selected records back, in order to build the list which is returned to the worker.
PreparedStatement getAssignmentsPreparedStatement = null;
try {
getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery);
} catch (SQLException sqle) {
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
errorMsg = ImpalaConnector.handlePreparedStatementException("getAssignmentsQuery", getAssignmentsQuery, "getAssignmentsPreparedStatement", getAssignmentsPreparedStatement, con, sqle);
// The "getAssignmentsPreparedStatement" will always be null here, so we do not close it.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
// NOTE(review): the block comment which opens two lines below the following "try" is never terminated in this copy (its last line seems to be missing), so, as written, everything after it is lexically commented out.
try ( ResultSet resultSet = getAssignmentsPreparedStatement.executeQuery() ) {
// Unfortunately, we cannot use the following as the used version of the Impala-driver does not support it.
/*if ( !resultSet.first() ) {
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId;
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
// The cursor is automatically before the first element in this configuration.
while ( resultSet.next() ) {
// The following few lines, cannot be outside the "while" loop, since the same object is added, despite that we update the inner-values.
Assignment assignment = new Assignment();
Datasource datasource = new Datasource();
// NOTE(review): the inner "try" below is empty in this copy. The reads of the 4 columns and the addition of the assignment to the "assignments" list are presumably missing; nothing visible here ever populates that list.
try { // For each of the 4 columns returned. The indexing starts from 1
} catch (SQLException sqle) {
logger.error("No value was able to be retrieved from one of the columns of row_" + resultSet.getRow(), sqle);
continue; // This object is broken, move to the next row.
} catch (Exception e) {
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
// NOTE(review): the inner "try" below has no body in this copy; judging by the logged message, the closing of the "getAssignmentsPreparedStatement" is presumably missing.
try {
} catch (SQLException sqle) {
logger.error("Failed to close the \"getAssignmentsPreparedStatement\"!\n" + sqle.getMessage());
// An empty list is answered with 204 (NO_CONTENT), after trying to drop the temporary table.
int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 ) {
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId;
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");
// Write the Assignment details to the assignment-table.
// The "timestamp" is generated from the Java-code, so it's in no way provided by a 3rd party.
// The "workerId" is concatenated into the query as-is; the pattern-check at the top of this method is the only guard for it.
String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n"
+ "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data";
// NOTE(review): as with the statements above, the execution call and the body of the null-guard in the "finally" seem to be missing.
PreparedStatement insertAssignmentsPreparedStatement = null;
try {
insertAssignmentsPreparedStatement = con.prepareStatement(insertAssignmentsQuery);
} catch (SQLException sqle) {
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, "insertAssignmentsPreparedStatement", insertAssignmentsPreparedStatement, con, sqle);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
try {
if ( insertAssignmentsPreparedStatement != null )
} catch (SQLException sqle2) {
logger.error("Failed to close the \"insertAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
// The temporary table is not needed anymore; a failure to drop it is reported to the caller with 500.
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
String mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con, "", null);
if ( mergeErrorMsg != null ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
// NOTE(review): no closing of the connection is visible on the success path of this copy - confirm whether the connection is released elsewhere or whether that line is missing.
long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
/**
 * Receives the report of a worker for a batch of assignments and stores its results in the database.
 *
 * Steps visible in this method:
 *  1) Validate the report: it must be non-null, carry a "workerId" which does not match MALICIOUS_INPUT_STRING and contain at least one UrlReport (400 or 403 otherwise).
 *  2) Ask FileUtils to get and upload the full-texts referenced by the UrlReports.
 *  3) For each UrlReport, build one insert-query for the "payload" table and one for the "attempt" table, with auto-commit turned off, and commit in the end.
 *  4) Merge the parquet files of the "payload", the "attempt" and the "assignment" tables; the last merge is given a " WHERE workerid != " clause along with the id of the current worker.
 *  5) Commit, restore auto-commit and answer with 200; the body is either null or a message counting the UrlReports which had no payload.
 *
 * NOTE(review): this copy of the method seems to have lost several lines (closing braces, the calls which execute the insert-queries, a "continue" for the null-payload case and the content of the last "finally").
 * The notes below mark the places; confirm against the upstream file before relying on this text.
 *
 * @param workerReport the report sent by the worker (request body).
 * @param request the incoming http-request; it is only passed on to "FileUtils.getAndUploadFullTexts".
 * @return a ResponseEntity with status 200 on success, or 400 / 403 / 500 along with an error message.
 */
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {
if ( workerReport == null ) {
String errorMsg = "No \"WorkerReport\" was given!";
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
String curWorkerId = workerReport.getWorkerId();
if ( curWorkerId == null ) {
String errorMsg = "No \"workerId\" was included inside the \"WorkerReport\"!";
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
// As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
if ( MALICIOUS_INPUT_STRING.matcher(curWorkerId).matches() ) {
String errorMsg = "Possibly malicious \"workerId\" received: " + curWorkerId;
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
List<UrlReport> urlReports = workerReport.getUrlReports();
if ( (urlReports == null) || urlReports.isEmpty() ) {
String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + curWorkerId + "\" was empty (without any UrlReports)!";
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
// The counter of the assignments-batch which this report answers to; it is used in the log-messages and it is passed on to "FileUtils.getAndUploadFullTexts".
long curReportAssignments = workerReport.getAssignmentRequestCounter();
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
if ( ! FileUtils.getAndUploadFullTexts(urlReports, con, request, curReportAssignments, curWorkerId) ) {
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
// NOTE(review): the call which performs the update described in the comment above is not visible in this copy; the method simply continues with the inserts.
// The "databaseLock" was unlocked inside the "FileUtils.getAndUploadFullTexts" to avoid blocking the database while doing large irrelevant tasks like transferring files.
// Store the workerReport into the database.
// The two base-queries below end with "VALUES "; the tuple of each record gets appended to them inside the loop.
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, `date`, mimetype, size, `hash`, `location`, provenance) VALUES ";
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, `date`, status, error_class, error_message) VALUES ";
// "tempInsertQueryName" tracks which of the two statements was being created, so that the catch-block can name it in the error message.
String tempInsertQueryName = null;
Statement insertPayloadStatement = null, insertAttemptStatement = null;
try {
tempInsertQueryName = "insertIntoPayloadBaseQuery";
insertPayloadStatement = con.createStatement();
tempInsertQueryName = "insertIntoAttemptBaseQuery";
insertAttemptStatement = con.createStatement();
} catch (SQLException sqle) {
String errorMsg = "Problem when creating the statement for \"" + tempInsertQueryName + "\"!\n";
logger.error(errorMsg + sqle.getMessage());
closeStatements(insertPayloadStatement, insertAttemptStatement, con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
try {
con.setAutoCommit(false); // Avoid writing to disk for each insert. Write them all in the end.
} catch (SQLException sqle) {
String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!\n";
logger.error(errorMsg + sqle.getMessage());
closeStatements(insertPayloadStatement, insertAttemptStatement, con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
// "payloadErrorMsg" stays null unless at least one UrlReport has no payload; it becomes the body of the final 200-response.
String payloadErrorMsg = null;
int failedCount = 0;
// TODO - Think about handling this loop with multiple threads..
// The Impala-server will handle the synchronization itself..
// Check online what happens with "statement.setPoolable()" does it improves speed? in multi or also in single thread?
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.error("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments);
payloadErrorMsg = (++failedCount) + " urlReports failed to be processed because they had no payload!";
// NOTE(review): a "continue" and the closing brace of the "if" above are presumably missing here; without them the code below would dereference the null payload.
try {
// The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers.
String stringSize = null;
Long size = payload.getSize();
if ( size != null )
stringSize = String.valueOf(size);
// NOTE(review): the payload fields are concatenated into the query without any escaping, although they come from the worker's report. Also, when "size" is null, the text 'null' (inside quotes) is what gets written.
String insertIntoPayloadFullQuery = insertIntoPayloadBaseQuery + "('" + payload.getId() + "','" + payload.getOriginal_url() + "','" + payload.getActual_url() + "','"
+ payload.getTimestamp_acquired() + "','" + payload.getMime_type() + "','" + stringSize + "','" + payload.getHash() + "','"
+ payload.getLocation() + "','" + payload.getProvenance() + "')";
// NOTE(review): the call which executes "insertIntoPayloadFullQuery" (presumably through "insertPayloadStatement") is not visible in this copy.
} catch (SQLException sqle) {
logger.error("Problem when executing the \"insertIntoPayloadFullQuery\": " + sqle.getMessage() + "\n\n");
Error error = urlReport.getError();
if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop).
logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
error = new Error(null, null);
try {
String errorCause = error.getMessage();
if ( errorCause != null )
errorCause = StringUtils.replace(errorCause, "'", "\\'", -1); // Escape single quotes in the error-cause-message.
// Only the error-message gets its single quotes escaped; the rest of the values are concatenated as they are.
String insertIntoAttemptFullQuery = insertIntoAttemptBaseQuery + "('" + payload.getId() + "','" + payload.getOriginal_url() + "','"
+ payload.getTimestamp_acquired() + "','" + urlReport.getStatus().toString() + "','" + error.getType() + "','" + errorCause + "')";
// NOTE(review): the call which executes "insertIntoAttemptFullQuery" (presumably through "insertAttemptStatement") is not visible in this copy.
} catch (SQLException sqle) {
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle.getMessage() + "\n\n");
}//end for-loop
// A failure of a single insert is only logged inside the loop; the commit below applies to whatever was executed.
try {
con.commit(); // Commit all the insert-queries to the database (write them to disk).
} catch (SQLException sqle) {
// NOTE(review): the message below also mentions setting AutoCommit to "true", but only "commit()" is called in this try-block.
String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!";
logger.error(errorMsg + "\n" + sqle.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
closeStatements(insertPayloadStatement, insertAttemptStatement, null); // Do not close the connection here!
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables.");
String mergeErrorMsg = FileUtils.mergeParquetFiles("payload", con, "", null);
if ( mergeErrorMsg != null ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
mergeErrorMsg = FileUtils.mergeParquetFiles("attempt", con, "", null);
if ( mergeErrorMsg != null ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
// This will delete the rows of the "assignment" table which refer to the curWorkerId. As we have non-kudu Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
// We do not need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con, " WHERE workerid != ", curWorkerId);
if ( mergeErrorMsg != null ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
// NOTE(review): the early returns of the merge-failures above leave the connection with auto-commit turned off; no restore or close is visible on those paths.
try {
con.commit(); // Apply the merges permanently (write them to disk).
con.setAutoCommit(true); // Restore the "auto-commit" value for this connection of the pool.
} catch (SQLException sqle) {
String errorMsg = "Problem when committing changes to the database!";
logger.error(errorMsg + "\n" + sqle.getMessage());
// The statements used in "mergeParquetFiles()" are already closed.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
// NOTE(review): the content and the closing brace of this "finally" seem to be missing. If the "return" below were really inside it, it would override the 500-response returned from the catch-block above.
return ResponseEntity.status(HttpStatus.OK).body(payloadErrorMsg);
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
for ( int i=1; i <= dataSize; ++i ) {
for ( int j=1; j <= numParamsPerRow; ++j ) {
if ( j < numParamsPerRow )
if ( i < dataSize )
PreparedStatement preparedInsertStatement;
try { // We use a "PreparedStatement" to do insertions, for security reasons.
preparedInsertStatement = con.prepareStatement(sb.toString());
} catch (SQLException sqle) {
String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n";
logger.error(errorMsg + sqle.getMessage());
throw new RuntimeException(errorMsg);
return preparedInsertStatement;
private boolean closeStatements(Statement statement1, Statement statement2, Connection con) {
try {
if ( statement1 != null )
if ( statement2 != null )
if ( con != null )
con.close(); // It may have already closed and that's fine.
return true;
} catch (SQLException sqle) {
logger.error("Could not close the statements or the connection with the Impala-database.\n" + sqle.getMessage());
return false;
/**
 * Test-endpoint which hands out assignments built from id-url pairs loaded through FileUtils, instead of the database.
 * Each assignment gets a made-up Datasource ("ID_n", "NAME_n"), the given "workerId" and a timestamp which is shared by the whole batch.
 *
 * NOTE(review): this copy of the method seems to have lost several lines (closing braces, the statements which exit the loops and the body of the scanner null-guard).
 * The notes below mark the places; confirm against the upstream file before relying on this text.
 *
 * @param workerId the id of the requesting worker; it is logged and stored in each Assignment.
 * @param workerAssignmentsLimit the maximum number of assignments to return. Unlike "getUrls", no cap by ControllerConstants.ASSIGNMENTS_LIMIT is applied here; the constant is only logged.
 * @return 200 with an AssignmentsResponse as json, or 500 if creating the FileUtils instance throws.
 */
public ResponseEntity<?> getTestUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " test-assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);
// The instance is created only for the side effects of its constructor (locating the input file, per the existing comment); the object itself is discarded.
try {
new FileUtils(); // Find the input file.
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The resource file, for the requested assignments, was not found.");
List<Assignment> assignments = new ArrayList<>();
HashMultimap<String, String> loadedIdUrlPairs;
boolean isFirstRun = true;
boolean assignmentsLimitReached = false;
Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
// Start loading urls.
while ( true ) {
loadedIdUrlPairs = FileUtils.getNextIdUrlPairBatchFromJson(); // Take urls from jsonFile.
// NOTE(review): the statement guarded by the following "if" is presumably a missing "break" (with the assignment below belonging to an "else"); as written, "isFirstRun" is cleared only when the loading has finished.
if ( FileUtils.isFinishedLoading(loadedIdUrlPairs.isEmpty(), isFirstRun) ) // Throws RuntimeException which is automatically passed on.
isFirstRun = false;
Set<Map.Entry<String, String>> pairs = loadedIdUrlPairs.entries();
for ( Map.Entry<String,String> pair : pairs )
if ( assignments.size() >= workerAssignmentsLimit ) {
assignmentsLimitReached = true;
// NOTE(review): a "break" and the closing brace of the "if" above are presumably missing here.
// The same number, obtained from "GenericUtils.getRandomNumber(1, 5)", is used for both the id and the name of the made-up datasource.
int randomNum = GenericUtils.getRandomNumber(1, 5);
assignments.add(new Assignment(pair.getKey(), pair.getValue(), new Datasource("ID_" + randomNum, "NAME_" + randomNum), workerId, timestamp));
}// end pairs-for-loop
if ( assignmentsLimitReached ) {
logger.debug("Done loading urls from the inputFile as the assignmentsLimit (" + workerAssignmentsLimit + ") was reached.");
// NOTE(review): a "break" out of the while-loop is presumably missing here.
}// end loading-while-loop
Scanner scanner = FileUtils.inputScanner.get();
// NOTE(review): the body of the following null-guard (presumably the closing of the scanner) is not visible in this copy.
if ( scanner != null ) // Check if the initial value is null.
long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
logger.info("Sending batch_" + curAssignmentsBatchCounter + " with " + assignments.size() + " assignments (" + FileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId);
return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
private String dropCurrentAssignmentTable(Connection con)
String dropCurrentAssignmentsQuery = "DROP TABLE " + ImpalaConnector.databaseName + ".current_assignment PURGE";
PreparedStatement dropCurrentAssignmentsPreparedStatement = null;
try {
dropCurrentAssignmentsPreparedStatement = con.prepareStatement(dropCurrentAssignmentsQuery);
return null;
} catch (SQLException sqle) {
return ImpalaConnector.handlePreparedStatementException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, "dropCurrentAssignmentsPreparedStatement", dropCurrentAssignmentsPreparedStatement, con, sqle);
} finally {
try {
if ( dropCurrentAssignmentsPreparedStatement != null )
} catch (SQLException sqle2) {
logger.error("Failed to close the \"dropCurrentAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());