UrlsController/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java


package eu.openaire.urls_controller.controllers;

import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.sql.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
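
/**
 * Handles the url-assignment workflow: it hands out batches of (id, url) assignments to workers
 * and ingests the "WorkerReports" they send back. All database work is serialized through
 * "ImpalaConnector.databaseLock", since the queries create and drop a shared helper-table
 * ("current_assignment").
 */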
@RestController
@RequestMapping("/urls")
public class UrlController {

    private static final Logger logger = LoggerFactory.getLogger(UrlController.class);

    private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); // Used by both the "getUrls" and the "getTestUrls" endpoints.
@GetMapping("")
public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);
// Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
// Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server.
int assignmentsLimit = workerAssignmentsLimit;
if ( assignmentsLimit == 0 ) {
String errorMsg = "The given \"workerAssignmentsLimit\" was ZERO!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
} else if ( assignmentsLimit > ControllerConstants.ASSIGNMENTS_LIMIT ) {
logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + ControllerConstants.ASSIGNMENTS_LIMIT + "). Will use the Controller's limit.");
assignmentsLimit = ControllerConstants.ASSIGNMENTS_LIMIT;
}

        String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
                "from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" +
                "select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
                "from " + ImpalaConnector.databaseName + ".publication p\n" +
                "join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
                "join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
                "left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" +
                "left outer join (\n" +
                "    select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
                "    union all\n" +
                "    select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" +
                "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + ControllerConstants.MAX_ATTEMPTS_PER_RECORD + " and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry')\n" +
                "limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" +
                "order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
                "limit " + assignmentsLimit + ") as findAssignmentsQuery";
        // The final "order by" puts the least-attempted records first, so records which have already been attempted are only retried after a long time.

        String createAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
        String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
        String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";

        List<Assignment> assignments = new ArrayList<>(assignmentsLimit);

        ImpalaConnector.databaseLock.lock();
        Connection con = ImpalaConnector.getInstance().getConnection();
        if ( con == null ) { // This is already logged in "getConnection()".
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
        }

        PreparedStatement createCurrentAssignmentsPreparedStatement = null;
        try {
            createCurrentAssignmentsPreparedStatement = con.prepareStatement(createAssignmentsQuery);
            createCurrentAssignmentsPreparedStatement.execute();
        } catch (SQLException sqle) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = ImpalaConnector.handlePreparedStatementException("createAssignmentsQuery", createAssignmentsQuery, "createCurrentAssignmentsPreparedStatement", createCurrentAssignmentsPreparedStatement, con, sqle);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        } finally {
            try {
                if ( createCurrentAssignmentsPreparedStatement != null )
                    createCurrentAssignmentsPreparedStatement.close();
            } catch (SQLException sqle2) {
                logger.error("Failed to close the \"createCurrentAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
            }
        }

        PreparedStatement computeCurrentAssignmentsStatsPreparedStatement = null;
        try {
            computeCurrentAssignmentsStatsPreparedStatement = con.prepareStatement(computeCurrentAssignmentsStatsQuery);
            computeCurrentAssignmentsStatsPreparedStatement.execute();
        } catch (SQLException sqle) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, "computeCurrentAssignmentsStatsPreparedStatement", computeCurrentAssignmentsStatsPreparedStatement, con, sqle);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        } finally {
            try {
                if ( computeCurrentAssignmentsStatsPreparedStatement != null )
                    computeCurrentAssignmentsStatsPreparedStatement.close();
            } catch (SQLException sqle2) {
                logger.error("Failed to close the \"computeCurrentAssignmentsStatsPreparedStatement\"!\n" + sqle2.getMessage());
            }
        }
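        // "COMPUTE STATS" collects table- and column-statistics, which Impala's planner uses to optimize
        // the reads of the freshly created "current_assignment" table in the queries that follow.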

        PreparedStatement getAssignmentsPreparedStatement = null;
        try {
            getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery);
        } catch (SQLException sqle) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = ImpalaConnector.handlePreparedStatementException("getAssignmentsQuery", getAssignmentsQuery, "getAssignmentsPreparedStatement", getAssignmentsPreparedStatement, con, sqle);
            // The "getAssignmentsPreparedStatement" will always be null here, so we do not close it.
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to use the same timestamp for all current records.

        try ( ResultSet resultSet = getAssignmentsPreparedStatement.executeQuery() ) {
            // Unfortunately, we cannot use the following check, as the currently used version of the Impala-driver does not support it.
            /*if ( !resultSet.first() ) {
                ImpalaConnector.databaseLock.unlock();
                String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId;
                logger.error(errorMsg);
                return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
            }*/

            // In this configuration, the cursor starts right before the first row.
            while ( resultSet.next() ) {
                // These objects must be created inside the loop; if they were reused, the same record would be returned for every row, even though the inner values get updated.
                Assignment assignment = new Assignment();
                assignment.setWorkerId(workerId);
                assignment.setTimestamp(timestamp);
                Datasource datasource = new Datasource();
                try { // Retrieve the 4 returned columns. The JDBC indexing starts from 1.
                    assignment.setId(resultSet.getString(1));
                    assignment.setOriginalUrl(resultSet.getString(2));
                    datasource.setId(resultSet.getString(3));
                    datasource.setName(resultSet.getString(4)); // Column 4 holds the "datasourcetype"; it is stored in the "name" field of this model.
                } catch (SQLException sqle) {
                    logger.error("Could not retrieve a value from one of the columns of row_" + resultSet.getRow(), sqle);
                    continue; // This object is broken, move on to the next row.
                }
                assignment.setDatasource(datasource);
                assignments.add(assignment);
            }
        } catch (Exception e) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
            logger.error(errorMsg, e);
            ImpalaConnector.closeConnection(con);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        } finally {
            try {
                getAssignmentsPreparedStatement.close();
            } catch (SQLException sqle) {
                logger.error("Failed to close the \"getAssignmentsPreparedStatement\"!\n" + sqle.getMessage());
            }
        }

        int assignmentsSize = assignments.size();
        if ( assignmentsSize == 0 ) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId;
            logger.error(errorMsg);
            ImpalaConnector.closeConnection(con);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");

        // Write the assignment details to the "assignment"-table.
        String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n"
                + "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data";

        PreparedStatement insertAssignmentsPreparedStatement = null;
        try {
            insertAssignmentsPreparedStatement = con.prepareStatement(insertAssignmentsQuery);
            insertAssignmentsPreparedStatement.execute();
        } catch (SQLException sqle) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, "insertAssignmentsPreparedStatement", insertAssignmentsPreparedStatement, con, sqle);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        } finally {
            try {
                if ( insertAssignmentsPreparedStatement != null )
                    insertAssignmentsPreparedStatement.close();
            } catch (SQLException sqle2) {
                logger.error("Failed to close the \"insertAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
            }
        }

        String dropCurrentAssignmentsQuery = "DROP TABLE " + ImpalaConnector.databaseName + ".current_assignment PURGE";
        PreparedStatement dropCurrentAssignmentsPreparedStatement = null;
        try {
            dropCurrentAssignmentsPreparedStatement = con.prepareStatement(dropCurrentAssignmentsQuery);
            dropCurrentAssignmentsPreparedStatement.execute();
        } catch (SQLException sqle) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = ImpalaConnector.handlePreparedStatementException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, "dropCurrentAssignmentsPreparedStatement", dropCurrentAssignmentsPreparedStatement, con, sqle);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        } finally {
            try {
                if ( dropCurrentAssignmentsPreparedStatement != null )
                    dropCurrentAssignmentsPreparedStatement.close();
            } catch (SQLException sqle2) {
                logger.error("Failed to close the \"dropCurrentAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
            }
        }

        logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
        String mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con);
        if ( mergeErrorMsg != null ) {
            ImpalaConnector.databaseLock.unlock();
            ImpalaConnector.closeConnection(con);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
        }

        ImpalaConnector.closeConnection(con);
        ImpalaConnector.databaseLock.unlock();

        long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
        logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
        return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
    }
@PostMapping("addWorkerReport")
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {
if ( workerReport == null ) {
String errorMsg = "No \"WorkerReport\" was given!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
}
List<UrlReport> urlReports = workerReport.getUrlReports();
if ( (urlReports == null) || urlReports.isEmpty() ) {
String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + workerReport.getWorkerId() + "\" was empty!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
}
long curReportAssignments = workerReport.getAssignmentRequestCounter();
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + workerReport.getWorkerId() + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
if ( ! FileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, workerReport.getWorkerId()) ) {
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
}

        ImpalaConnector.databaseLock.lock();
        Connection con = ImpalaConnector.getInstance().getConnection();
        if ( con == null ) { // This is already logged in "getConnection()".
            ImpalaConnector.databaseLock.unlock(); // Make sure the lock is not leaked on this error-path.
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
        }

        // Store the workerReport into the database.
        String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
        String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";

        String tempInsertQueryName = null;
        PreparedStatement preparedInsertPayloadStatement = null, preparedInsertAttemptStatement = null;
        try {
            tempInsertQueryName = "insertIntoPayloadBaseQuery";
            preparedInsertPayloadStatement = con.prepareStatement(insertIntoPayloadBaseQuery);
            tempInsertQueryName = "insertIntoAttemptBaseQuery";
            preparedInsertAttemptStatement = con.prepareStatement(insertIntoAttemptBaseQuery);
        } catch (SQLException sqle) {
            ImpalaConnector.databaseLock.unlock(); // Make sure the lock is not leaked on this error-path.
            String errorMsg = "Problem when creating the prepared statement for \"" + tempInsertQueryName + "\"!\n";
            logger.error(errorMsg + sqle.getMessage());
            closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        try {
            con.setAutoCommit(false); // Avoid writing to disk for each insert; commit them all at the end.
        } catch (SQLException sqle) {
            ImpalaConnector.databaseLock.unlock(); // Make sure the lock is not leaked on this error-path.
            String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!";
            logger.error(errorMsg + "\n" + sqle.getMessage());
            closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        String payloadErrorMsg = null;
        int failedCount = 0;

        for ( UrlReport urlReport : urlReports ) {
            Payload payload = urlReport.getPayload();
            if ( payload == null ) {
                logger.error("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments);
                payloadErrorMsg = (++failedCount) + " urlReports failed to be processed because they had no payload!";
                continue;
            }
            try { // We use a "PreparedStatement" to do insertions, for security reasons.
                preparedInsertPayloadStatement.setString(1, payload.getId());
                preparedInsertPayloadStatement.setString(2, payload.getOriginal_url());
                preparedInsertPayloadStatement.setString(3, payload.getActual_url());
                preparedInsertPayloadStatement.setTimestamp(4, payload.getTimestamp_acquired());
                preparedInsertPayloadStatement.setString(5, payload.getMime_type());

                // The column "size" in the table is of type "String", so we cast the Long to a String. The Parquet-format in the database does not work well with integers.
                String stringSize = null;
                Long size = payload.getSize();
                if ( size != null )
                    stringSize = String.valueOf(size);
                preparedInsertPayloadStatement.setString(6, stringSize);

                preparedInsertPayloadStatement.setString(7, payload.getHash());
                preparedInsertPayloadStatement.setString(8, payload.getLocation());
                preparedInsertPayloadStatement.setString(9, payload.getProvenance());
                preparedInsertPayloadStatement.executeUpdate();
            } catch (SQLException sqle) {
                logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": " + sqle.getMessage() + "\n\n");
            }

            Error error = urlReport.getError();
            if ( error == null ) { // This is rare, but it has to be handled; otherwise NPEs would be thrown in the rest of the loop.
                logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
                error = new Error(null, null);
            }
            try { // We use a "PreparedStatement" to do insertions, for security reasons.
                preparedInsertAttemptStatement.setString(1, payload.getId());
                preparedInsertAttemptStatement.setString(2, payload.getOriginal_url());
                preparedInsertAttemptStatement.setTimestamp(3, payload.getTimestamp_acquired());
                preparedInsertAttemptStatement.setString(4, urlReport.getStatus().toString());
                preparedInsertAttemptStatement.setString(5, String.valueOf(error.getType())); // "String.valueOf()" also covers the case of a "null" error-type.
                preparedInsertAttemptStatement.setString(6, error.getMessage());
                preparedInsertAttemptStatement.executeUpdate();
            } catch (SQLException sqle) {
                logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle.getMessage() + "\n\n");
            }
        }//end for-loop
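        // Note that SQL-failures inside the loop above are only logged and do not abort the report:
        // each payload/attempt row is inserted on a best-effort basis and the rest of the batch still proceeds.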

        try {
            con.commit(); // Commit all the insert-queries to the database (write them to disk).
        } catch (SQLException sqle) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = "Problem when committing the insert-queries to the database!";
            logger.error(errorMsg + "\n" + sqle.getMessage());
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        } finally {
            closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, null); // Do not close the connection here; it is still needed below!
        }
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables.");
String mergeErrorMsg = FileUtils.mergeParquetFiles("payload", con);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
mergeErrorMsg = FileUtils.mergeParquetFiles("attempt", con);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
try {
con.commit(); // Apply the merges permanently (write them to disk).
con.setAutoCommit(true); // Restore the "auto-commit" value for this connection of the pool.
} catch (SQLException sqle) {
String errorMsg = "Problem when committing changes to the database!";
logger.error(errorMsg + "\n" + sqle.getMessage());
// The statements used in "mergeParquetFiles()" are already closed.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
}
return ResponseEntity.status(HttpStatus.OK).body(payloadErrorMsg);
}
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException
{
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
sb.append(baseInsertQuery);
for ( int i=1; i <= dataSize; ++i ) {
sb.append("(");
for ( int j=1; j <= numParamsPerRow; ++j ) {
sb.append("?");
if ( j < numParamsPerRow )
sb.append(",");
}
sb.append(")");
if ( i < dataSize )
sb.append(",");
}
PreparedStatement preparedInsertStatement;
try { // We use a "PreparedStatement" to do insertions, for security reasons.
preparedInsertStatement = con.prepareStatement(sb.toString());
} catch (SQLException sqle) {
String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n";
logger.error(errorMsg + sqle.getMessage());
throw new RuntimeException(errorMsg);
}
return preparedInsertStatement;
}
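
    /* For reference, a (hypothetical) call like:
     *     constructLargeInsertQuery(con, "INSERT INTO mydb.attempt (id, original_url, date) VALUES ", 2, 3);
     * would prepare the statement:
     *     INSERT INTO mydb.attempt (id, original_url, date) VALUES (?,?,?),(?,?,?)
     * i.e. one parenthesized placeholder-group per row, so a whole batch can be bound to a single statement.
     */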

    private boolean closePreparedStatements(PreparedStatement preparedStatement1, PreparedStatement preparedStatement2, Connection con) {
        try {
            if ( preparedStatement1 != null )
                preparedStatement1.close();
            if ( preparedStatement2 != null )
                preparedStatement2.close();
            if ( con != null )
                con.close(); // It may have already been closed and that's fine.
            return true;
        } catch (SQLException sqle) {
            logger.error("Could not close the prepared statements or the connection with the Impala-database.\n" + sqle.getMessage());
            return false;
        }
    }
@GetMapping("test")
public ResponseEntity<?> getTestUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " test-assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);
try {
new FileUtils(); // Find the input file.
} catch (Exception e) {
logger.error(e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The resource file, for the requested assignments, was not found.");
}
List<Assignment> assignments = new ArrayList<>();
HashMultimap<String, String> loadedIdUrlPairs;
boolean isFirstRun = true;
boolean assignmentsLimitReached = false;
Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
// Start loading urls.
while ( true ) {
loadedIdUrlPairs = FileUtils.getNextIdUrlPairBatchFromJson(); // Take urls from jsonFile.
if ( FileUtils.isFinishedLoading(loadedIdUrlPairs.isEmpty(), isFirstRun) ) // Throws RuntimeException which is automatically passed on.
break;
else
isFirstRun = false;
Set<Map.Entry<String, String>> pairs = loadedIdUrlPairs.entries();
for ( Map.Entry<String,String> pair : pairs )
{
if ( assignments.size() >= workerAssignmentsLimit ) {
assignmentsLimitReached = true;
break;
}
int randomNum = GenericUtils.getRandomNumber(1, 5);
assignments.add(new Assignment(pair.getKey(), pair.getValue(), new Datasource("ID_" + randomNum, "NAME_" + randomNum), workerId, timestamp));
}// end pairs-for-loop
if ( assignmentsLimitReached ) {
logger.debug("Done loading urls from the inputFile as the assignmentsLimit (" + workerAssignmentsLimit + ") was reached.");
break;
}
}// end loading-while-loop
Scanner scanner = FileUtils.inputScanner.get();
if ( scanner != null ) // Check if the initial value is null.
scanner.close();
long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
logger.info("Sending batch_" + curAssignmentsBatchCounter + " with " + assignments.size() + " assignments (" + FileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId);
return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
}
}