// UrlsController/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java

package eu.openaire.urls_controller.controllers;

import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.sql.*;
import java.sql.Date;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

@RestController
@RequestMapping("/urls")
public class UrlController {

    private static final Logger logger = LoggerFactory.getLogger(UrlController.class);

    private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); // Counts the batches of assignments sent to the workers (used by both the "getUrls"- and the "getTestUrls"-endpoints).
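
    /**
     * Gathers up to < workerAssignmentsLimit > assignments (id-url pairs) from the database, records them in the "assignment" table
     * and sends them to the requesting worker.
     * Example request (the host and port are hypothetical):
     * curl "http://localhost:8080/urls?workerId=worker_1&workerAssignmentsLimit=5000"
     */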
@GetMapping("")
public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);
// Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
// Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server.
int assignmentsLimit = workerAssignmentsLimit;
if ( assignmentsLimit == 0 ) {
String errorMsg = "The given \"workerAssignmentsLimit\" was ZERO!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
} else if ( assignmentsLimit > ControllerConstants.ASSIGNMENTS_LIMIT ) {
logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + ControllerConstants.ASSIGNMENTS_LIMIT + "). Will use the Controller's limit.");
assignmentsLimit = ControllerConstants.ASSIGNMENTS_LIMIT;
}

        String getAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
                "from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" +
                "select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
                "from " + ImpalaConnector.databaseName + ".publication p\n" +
                "join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
                "join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
                "left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" +
                "left outer join (\n" +
                " select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
                " union all\n" +
                " select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" +
                "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= ? and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry')\n" +
                ") as non_distinct_results\n" +
                "order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
                "limit ?) as getAssignmentsQuery";

        // The "order by" at the end makes sure the older attempted records will be re-attempted only after a long time.
        // TODO - If we add more limits, it could be faster. The inner queries could have a limit of e.g. < assignmentsLimit ^ 2 >.
        // The LIMIT of < assignmentsLimit > should be kept at the end, as we want that many distinct results.
        // This is just for tests, without the attempts, payloads and the assignments:
        /*String getAssignmentsQuery = "select * from (select distinct pubid, url, datasourceid, datasourcetype from (\n" +
                "select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype\n" +
                "from " + ImpalaConnector.databaseName + ".publication p\n" +
                "join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
                "join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
                "where d.allow_harvest=true " +
                "order by reverse(p.id), pu.url) as distinct_results\n" +
                "limit ? ) as getAssignmentsQuery";*/

        List<Assignment> assignments = new ArrayList<>(assignmentsLimit);

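        // Hold the database-lock for the whole "select the assignments, then insert them into the assignment-table" sequence,
        // so that no two workers can be handed the same urls.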
        ImpalaConnector.databaseLock.lock();

        Connection con = ImpalaConnector.getInstance().getConnection();
        if ( con == null ) { // This is already logged in "getConnection()".
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
        }

        PreparedStatement getAssignmentsPreparedStatement = null;
        try {
            getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery);
            getAssignmentsPreparedStatement.setInt(1, ControllerConstants.MAX_ATTEMPTS_PER_RECORD);
            getAssignmentsPreparedStatement.setInt(2, assignmentsLimit);
        } catch (SQLException sqle) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = "Problem when creating the prepared statement for \"getAssignmentsQuery\"!\n";
            logger.error(errorMsg + sqle.getMessage());
            try {
                if ( getAssignmentsPreparedStatement != null )
                    getAssignmentsPreparedStatement.close();
            } catch (SQLException sqle2) {
                logger.error("Could not close the \"getAssignmentsPreparedStatement\".\n" + sqle2.getMessage());
            }
            try {
                con.close();
            } catch (SQLException sqle2) {
                logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage());
            }
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        Date date = new Date(System.currentTimeMillis()); // Store it here, in order to have the same timestamp for all the current records.
        try ( ResultSet resultSet = getAssignmentsPreparedStatement.executeQuery() ) {
            // Unfortunately, we cannot use the following, as the used version of the Impala-driver does not support it:
            /*if ( !resultSet.first() ) {
                ImpalaConnector.databaseLock.unlock();
                String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId;
                logger.error(errorMsg);
                return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
            }*/

            // The cursor is automatically positioned before the first row in this configuration.
            while ( resultSet.next() ) {
                // A new object has to be created for each row; if one object were reused outside the "while"-loop,
                // every list-entry would point to the same, last-updated record.
                Assignment assignment = new Assignment();
                assignment.setWorkerId(workerId);
                assignment.setDate(date);
                Datasource datasource = new Datasource();
                try { // For each of the 4 returned columns. The indexing starts from 1.
                    assignment.setId(resultSet.getString(1));
                    assignment.setOriginalUrl(resultSet.getString(2));
                    datasource.setId(resultSet.getString(3));
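                    // Note: the 4th returned column is the "datasourcetype"; the code stores it in the "name" field of the Datasource model.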
                    datasource.setName(resultSet.getString(4));
                } catch (SQLException sqle) {
                    logger.error("No value was able to be retrieved from one of the columns of row_" + resultSet.getRow(), sqle);
                    continue; // This object is broken, move to the next row.
                }
                assignment.setDatasource(datasource);
                assignments.add(assignment);
            }
        } catch (Exception e) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
            logger.error(errorMsg, e);
            try {
                con.close();
            } catch (SQLException sqle2) {
                logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage());
            }
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        } finally {
            try {
                getAssignmentsPreparedStatement.close();
            } catch (SQLException sqle) {
                logger.error("Failed to close the \"getAssignmentsPreparedStatement\"!\n" + sqle.getMessage());
            }
        }

        int assignmentsSize = assignments.size();
        if ( assignmentsSize == 0 ) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId;
            logger.error(errorMsg);
            try {
                con.close();
            } catch (SQLException sqle2) {
                logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage());
            }
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");

        // The following is a test of inserting multiple rows with a single insert-query. If applied with a preparedStatement, then the JDBC-driver fails with an "OutOfMemory"-Error.
        /*String testInsert = "INSERT INTO assignment (id,original_url,workerid,`date`) VALUES ( 'doiboost____::4e8b1f12ac3ba5a9d8fbff9872000000', 'http://dx.doi.org/10.17267/2596-3368dentistry.v6i2.586', 'worker_1', CAST('2021-10-01' AS TIMESTAMP) ) , ( 'doiboost____::4e8b1f12ac3ba5a9d8fbff9872000000', 'https://academic.microsoft.com/#/detail/2887540302', 'worker_1', CAST('2021-10-01' AS TIMESTAMP) );";
        try (Statement insertStatement = con.createStatement()) {
            insertStatement.execute(testInsert);
        } catch (SQLException sqle) {
            ImpalaConnector.databaseLock.unlock();
            String mergeErrorMsg = "Problem when executing the testInsert statement for \"" + testInsert + "\"";
            logger.error(mergeErrorMsg + sqle.getMessage());
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
        }*/

        // Write the Assignment details to the database and then send them to the worker.
        String insertIntoAssignmentBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".assignment (id, original_url, workerid, date) VALUES (?, ?, ?, ?)";
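        // The assignment-rows are inserted one-by-one, since the "batchExecute" does not work with the currently-used version
        // of the Impala JDBC driver (see the note above "constructLargeInsertQuery()" below).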
        PreparedStatement preparedInsertAssignmentStatement;
        try { // We use a "PreparedStatement" to do insertions, for security and performance reasons.
            preparedInsertAssignmentStatement = con.prepareStatement(insertIntoAssignmentBaseQuery);
        } catch (SQLException sqle) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = "Problem when creating the prepared statement for \"insertIntoAssignmentBaseQuery\"!\n";
            logger.error(errorMsg + sqle.getMessage());
            try {
                con.close();
            } catch (SQLException sqle2) {
                logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage());
            }
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        // Before, we wanted to execute the "getAssignmentsQuery" and return the assignments immediately, but now it's more efficient to commit all the inserts at the end.
        try {
            con.setAutoCommit(false);
        } catch (SQLException sqle) { // There is a database-error. The subsequent actions will probably fail as well.
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!";
            logger.error(errorMsg + "\n" + sqle.getMessage());
            closePreparedStatements(preparedInsertAssignmentStatement, null, con);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        String tempFullQueryString = null;
        for ( Assignment assignment : assignments ) {
            try {
                preparedInsertAssignmentStatement.setString(1, assignment.getId());
                preparedInsertAssignmentStatement.setString(2, assignment.getOriginalUrl());
                preparedInsertAssignmentStatement.setString(3, workerId);
                preparedInsertAssignmentStatement.setDate(4, date);
                tempFullQueryString = preparedInsertAssignmentStatement.toString();
                preparedInsertAssignmentStatement.executeUpdate();
            } catch (SQLException sqle) {
                logger.error("Problem when executing the \"insertIntoAssignmentQuery\":\n" + tempFullQueryString + "\n" + sqle.getMessage() + "\n\n");
            }
        }//end for-loop

        try {
            con.commit(); // Send all the insert-queries to the database.
        } catch (SQLException sqle) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = "Problem when committing changes to the database!";
            logger.error(errorMsg + "\n" + sqle.getMessage());
            closePreparedStatements(preparedInsertAssignmentStatement, null, con);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");

        String mergeErrorMsg = mergeParquetFiles("assignment", con);
        if ( mergeErrorMsg != null ) {
            ImpalaConnector.databaseLock.unlock();
            closePreparedStatements(preparedInsertAssignmentStatement, null, con);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
        }

        try {
            con.commit(); // Apply the merge.
            con.setAutoCommit(true); // Restore the "auto-commit" value for this connection of the pool.
        } catch (SQLException sqle) {
            String errorMsg = "Problem when committing changes to the database!";
            logger.error(errorMsg, sqle);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        } finally {
            ImpalaConnector.databaseLock.unlock();
            closePreparedStatements(preparedInsertAssignmentStatement, null, con);
        }

        long curAssignmentsBatch = assignmentsBatchCounter.incrementAndGet();
        logger.info("Sending batch-assignments_" + curAssignmentsBatch + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
        return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatch, assignments));
    }
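
    /**
     * Receives the report of a worker, for a batch of processed assignments, and stores its results
     * into the "payload" and "attempt" tables.
     * Example request (the host, port and file are hypothetical):
     * curl -X POST -H "Content-Type: application/json" -d @workerReport.json "http://localhost:8080/urls/addWorkerReport"
     */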
@PostMapping("addWorkerReport")
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport) {
if ( workerReport == null ) {
String errorMsg = "No \"WorkerReport\" was given!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
}
List<UrlReport> urlReports = workerReport.getUrlReports();
if ( (urlReports == null) || urlReports.isEmpty() ) {
String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + workerReport.getWorkerId() + "\" was empty!";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
}
logger.info("Received the WorkerReport for batch_ " + workerReport.getAssignmentRequestCounter() + ", from the worker with id: " + workerReport.getWorkerId() + ". It contains " + urlReports.size() + " urlReports. Going to insert them into the database.");
ImpalaConnector.databaseLock.lock();
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
// Store the workerReport into the database.
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";

        String tempInsertQueryName = null;
        PreparedStatement preparedInsertPayloadStatement = null, preparedInsertAttemptStatement = null;
        try {
            tempInsertQueryName = "insertIntoPayloadBaseQuery";
            preparedInsertPayloadStatement = con.prepareStatement(insertIntoPayloadBaseQuery);
            tempInsertQueryName = "insertIntoAttemptBaseQuery";
            preparedInsertAttemptStatement = con.prepareStatement(insertIntoAttemptBaseQuery);
        } catch (SQLException sqle) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = "Problem when creating the prepared statement for \"" + tempInsertQueryName + "\"!\n";
            logger.error(errorMsg + sqle.getMessage());
            closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        try {
            con.setAutoCommit(false); // Avoid writing to disk for each insert. Write them all at the end.
        } catch (SQLException sqle) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!";
            logger.error(errorMsg + "\n" + sqle.getMessage());
            closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        String payloadErrorMsg = null;
        int failedCount = 0;

        for ( UrlReport urlReport : urlReports ) {
            Payload payload = urlReport.getPayload();
            if ( payload == null ) {
                logger.error("Payload was \"null\" for a \"urlReport\", in batch_" + workerReport.getAssignmentRequestCounter());
                payloadErrorMsg = (++failedCount) + " urlReports failed to be processed because they had no payload!";
                continue;
            }
            String tempFullQueryString = null;
            try { // We use a "PreparedStatement" to do insertions, for security reasons.
                preparedInsertPayloadStatement.setString(1, payload.getId());
                preparedInsertPayloadStatement.setString(2, payload.getOriginal_url());
                preparedInsertPayloadStatement.setString(3, payload.getActual_url());
                preparedInsertPayloadStatement.setDate(4, payload.getDate_acquired());
                preparedInsertPayloadStatement.setString(5, payload.getMime_type());

                // The column "size" in the table is of type "String", so we cast the Long to a String. The Parquet-format in the database does not work well with integers.
                String stringSize = null;
                Long size = payload.getSize();
                if ( size != null )
                    stringSize = String.valueOf(size);
                preparedInsertPayloadStatement.setString(6, stringSize);

                preparedInsertPayloadStatement.setString(7, payload.getHash());
                preparedInsertPayloadStatement.setString(8, payload.getLocation());
                preparedInsertPayloadStatement.setString(9, payload.getProvenance());
                tempFullQueryString = preparedInsertPayloadStatement.toString();
                preparedInsertPayloadStatement.executeUpdate();
            } catch (SQLException sqle) {
                logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\":\n" + tempFullQueryString + "\n" + sqle.getMessage() + "\n\n");
            }
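
            // A failed insert is logged but does not abort the processing of the whole report; the loop carries on with the next statements and records.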
            Error error = urlReport.getError();
            if ( error == null ) { // This is rare, but if it happens, NPEs would be thrown for the rest of the loop, so set an empty object with "null" members.
                logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
                error = new Error(null, null);
            }
            try { // We use a "PreparedStatement" to do insertions, for security reasons.
                preparedInsertAttemptStatement.setString(1, payload.getId());
                preparedInsertAttemptStatement.setString(2, payload.getOriginal_url());
                preparedInsertAttemptStatement.setDate(3, payload.getDate_acquired());
                preparedInsertAttemptStatement.setString(4, urlReport.getStatus().toString());
                preparedInsertAttemptStatement.setString(5, String.valueOf(error.getType())); // This covers the case of "null".
                preparedInsertAttemptStatement.setString(6, error.getMessage());
                tempFullQueryString = preparedInsertAttemptStatement.toString();
                preparedInsertAttemptStatement.executeUpdate();
            } catch (SQLException sqle) {
                logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\":\n" + tempFullQueryString + "\n" + sqle.getMessage() + "\n\n");
            }
        }//end for-loop

        try {
            con.commit(); // Commit all the insert-queries to the database (write them to disk).
        } catch (SQLException sqle) {
            ImpalaConnector.databaseLock.unlock();
            ImpalaConnector.closeConnection(con);
            String errorMsg = "Problem when committing changes to the database!";
            logger.error(errorMsg + "\n" + sqle.getMessage());
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        } finally {
            closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, null); // Do not close the connection here; it is needed for the merges below.
        }
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables.");
String mergeErrorMsg = mergeParquetFiles("payload", con);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
mergeErrorMsg = mergeParquetFiles("attempt", con);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
try {
con.commit(); // Apply the merges permanently (write them to disk).
con.setAutoCommit(true); // Restore the "auto-commit" value for this connection of the pool.
} catch (SQLException sqle) {
String errorMsg = "Problem when committing changes to the database!";
logger.error(errorMsg + "\n" + sqle.getMessage());
// The statements used in "mergeParquetFiles()" are already closed.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
}
return ResponseEntity.status(HttpStatus.OK).body(payloadErrorMsg);
}

    /**
     * On every insertion a new parquet-file is created, so we end up with millions of files. Parquet is great for fast SELECTs, so we have to stick with it and merge those files.
     * This method creates a clone of the original table, in order to have only one parquet-file in the end, drops the original table
     * and renames the clone to the original's name.
     * It returns the errorMsg if an error appears, otherwise it returns "null".
     * */
    private static String mergeParquetFiles(String tableName, Connection con)
    {
        String errorMsg;
        if ( tableName == null ) {
            errorMsg = "No tableName was given. We do not know the table for which the underlying files should be merged!";
            logger.error(errorMsg);
            return errorMsg;
        }

        Statement statement;
        try {
            statement = con.createStatement();
        } catch (SQLException sqle) {
            errorMsg = "Problem when creating a connection-statement!\n";
            logger.error(errorMsg + sqle.getMessage());
            return errorMsg;
        }

        try {
            statement.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName);
            statement.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE");
            statement.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName);
            statement.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName);
        } catch (SQLException sqle) {
            errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n";
            logger.error(errorMsg + getCutBatchExceptionMessage(sqle.getMessage()));
            sqle.printStackTrace();
            return errorMsg;
        } finally {
            // Make sure we close the statement.
            try { statement.close(); }
            catch (SQLException sqle3) { logger.error("Could not close the statement for executing queries in the Impala-database.\n" + sqle3); }
        }

        return null; // No errorMsg, everything is fine.
    }
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException
{
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
sb.append(baseInsertQuery);
for ( int i=1; i <= dataSize; ++i ) {
sb.append("(");
for ( int j=1; j <= numParamsPerRow; ++j ) {
sb.append("?");
if ( j < numParamsPerRow )
sb.append(",");
}
sb.append(")");
if ( i < dataSize )
sb.append(",");
}
PreparedStatement preparedInsertStatement;
try { // We use a "PreparedStatement" to do insertions, for security reasons.
preparedInsertStatement = con.prepareStatement(sb.toString());
} catch (SQLException sqle) {
String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n";
logger.error(errorMsg + sqle.getMessage());
throw new RuntimeException(errorMsg);
}
return preparedInsertStatement;
}
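
    // Example (hypothetical arguments): for a "baseInsertQuery" ending in "VALUES ", a "dataSize" of 2 and a "numParamsPerRow" of 3,
    // the above method prepares a statement whose query ends in "VALUES (?,?,?),(?,?,?)".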

    private static String getCutBatchExceptionMessage(String sqleMessage)
    {
        // The sqleMessage contains the actual message followed by the long batch. This makes the logs unreadable. So we should shorten the message before logging.
        int maxEnding = 1500;
        if ( sqleMessage.length() > maxEnding )
            return (sqleMessage.substring(0, maxEnding) + "...");
        else
            return sqleMessage;
    }
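
    /**
     * Closes the given prepared statements and, if it is not "null", the given connection as well.
     * Returns "true" on success, or "false" if an SQLException occurred.
     */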
    private boolean closePreparedStatements(PreparedStatement preparedStatement1, PreparedStatement preparedStatement2, Connection con) {
        try {
            if ( preparedStatement1 != null )
                preparedStatement1.close();
            if ( preparedStatement2 != null )
                preparedStatement2.close();
            if ( con != null )
                con.close(); // It may have already closed and that's fine.
            return true;
        } catch (SQLException sqle) {
            logger.error("Could not close a statement or the connection with the Impala-database.\n" + sqle.getMessage());
            return false;
        }
    }
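
    /**
     * Returns test-assignments taken from a local resource-file, instead of the database.
     * Example request (the host and port are hypothetical):
     * curl "http://localhost:8080/urls/test?workerId=worker_1&workerAssignmentsLimit=100"
     */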
@GetMapping("test")
public ResponseEntity<?> getTestUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " test-assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);
try {
new FileUtils(); // Find the input file.
} catch (Exception e) {
logger.error(e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The resource file, for the requested assignments, was not found.");
}
List<Assignment> assignments = new ArrayList<>();
HashMultimap<String, String> loadedIdUrlPairs;
boolean isFirstRun = true;
boolean assignmentsLimitReached = false;
Date date = new Date(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
// Start loading urls.
while ( true ) {
loadedIdUrlPairs = FileUtils.getNextIdUrlPairBatchFromJson(); // Take urls from jsonFile.
if ( FileUtils.isFinishedLoading(loadedIdUrlPairs.isEmpty(), isFirstRun) ) // Throws RuntimeException which is automatically passed on.
break;
else
isFirstRun = false;
Set<Map.Entry<String, String>> pairs = loadedIdUrlPairs.entries();
for ( Map.Entry<String,String> pair : pairs )
{
if ( assignments.size() >= workerAssignmentsLimit ) {
assignmentsLimitReached = true;
break;
}
int randomNum = GenericUtils.getRandomNumber(1, 5);
assignments.add(new Assignment(pair.getKey(), pair.getValue(), new Datasource("ID_" + randomNum, "NAME_" + randomNum), workerId, date));
}// end pairs-for-loop
if ( assignmentsLimitReached ) {
logger.debug("Done loading urls from the inputFile as the assignmentsLimit (" + workerAssignmentsLimit + ") was reached.");
break;
}
}// end loading-while-loop
Scanner scanner = FileUtils.inputScanner.get();
if ( scanner != null ) // Check if the initial value is null.
scanner.close();
logger.info("Sending batch_" + assignmentsBatchCounter.incrementAndGet() + " with " + assignments.size() + " assignments (" + FileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId);
return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(assignmentsBatchCounter.get(), assignments));
}
}