You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
UrlsController/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java

576 lines
34 KiB
Java

package eu.openaire.urls_controller.controllers;
import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.ControllerConstants;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.sql.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
@RestController
@RequestMapping("/urls")
public class UrlController {
// Logger shared by all the endpoints of this controller.
private static final Logger logger = LoggerFactory.getLogger(UrlController.class);
// Numbers the assignment-batches which are handed out; it is incremented by both the "getUrls" and the "getTestUrls" endpoints.
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
// Describes a string which contains at least one single-quote, semicolon, back-tick or double-quote character.
// NOTE(review): the '.' of this pattern does not match line terminators, so "Matcher.matches()" returns false for any input which contains a line-break,
// even when that input also contains one of the flagged characters. Check the input with "Matcher.find()" or compile this pattern with "Pattern.DOTALL".
private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");
/**
 * Hands out a batch of assignments (id-url pairs, together with their datasource) to the requesting worker.
 * <p>
 * The candidate records are materialised in the temporary "current_assignment" table, they are read back into {@link Assignment} objects,
 * they are copied into the "assignment" table and finally the temporary table gets dropped.
 * The database-work is performed while holding the {@code ImpalaConnector.databaseLock}.
 *
 * @param workerId the id of the requesting worker. It is rejected when it contains a quote, a back-tick or a semicolon, as it gets concatenated into an SQL query.
 * @param workerAssignmentsLimit the number of assignments requested by the worker. It has to be positive and it is capped to {@code ControllerConstants.ASSIGNMENTS_LIMIT}.
 * @return 200 along with an {@link AssignmentsResponse} on success, 204 when no assignments were found, 400 for a non-positive limit,
 * 403 for a suspicious "workerId" and 500 when a database operation failed.
 */
@GetMapping("")
public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {

    // As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
    // We use "find()" instead of "matches()", since the '.' of the pattern does not match line-terminators,
    // thus "matches()" would accept any value containing a line-break, even if it also contained quotes or semicolons.
    if ( MALICIOUS_INPUT_STRING.matcher(workerId).find() ) {
        String errorMsg = "Possibly malicious \"workerId\" received: " + workerId;
        logger.error(errorMsg);
        return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
    }

    logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);

    // Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.

    // Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server.
    // A negative value is rejected as well, otherwise it would end up in the "limit"-clauses of the query and in the capacity of the assignments-list.
    int assignmentsLimit = workerAssignmentsLimit;
    if ( assignmentsLimit <= 0 ) {
        String errorMsg = (assignmentsLimit == 0)
                ? "The given \"workerAssignmentsLimit\" was ZERO!"
                : "The given \"workerAssignmentsLimit\" was negative: " + workerAssignmentsLimit;
        logger.error(errorMsg);
        return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
    } else if ( assignmentsLimit > ControllerConstants.ASSIGNMENTS_LIMIT ) {
        logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + ControllerConstants.ASSIGNMENTS_LIMIT + "). Will use the Controller's limit.");
        assignmentsLimit = ControllerConstants.ASSIGNMENTS_LIMIT;
    }

    String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
            "from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" +
            "select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
            "from " + ImpalaConnector.databaseName + ".publication p\n" +
            "join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
            "join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
            "left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" +
            "left outer join (\n" +
            " select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
            " union all\n" +
            " select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" +
            "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + ControllerConstants.MAX_ATTEMPTS_PER_RECORD + " and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
            "limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" +
            "order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
            "limit " + assignmentsLimit + ") as findAssignmentsQuery";

    // The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.

    String createAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
    String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
    String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";

    List<Assignment> assignments = new ArrayList<>(assignmentsLimit);

    ImpalaConnector.databaseLock.lock();
    Connection con = ImpalaConnector.getInstance().getConnection();
    if ( con == null ) { // This is already logged in "getConnection()".
        ImpalaConnector.databaseLock.unlock();
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
    }

    // All transactions in Impala automatically commit at the end of the statement. Currently, Impala does not support multi-statement transactions.
    // https://impala.apache.org/docs/build/html/topics/impala_transactions.html
    // We cannot use "savePoints" along with "autoCommit = false" to roll back to a previous state among multiple statements.

    // Step 1: create the temporary "current_assignment" table.
    PreparedStatement createCurrentAssignmentsPreparedStatement = null;
    try {
        createCurrentAssignmentsPreparedStatement = con.prepareStatement(createAssignmentsQuery);
        // We cannot set the "limits" and the MAX_ATTEMPTS_PER_RECORD as preparedStatements parameters, as we get a "java.sql.SQLException: [Simba][JDBC](11420) Error, parameter metadata not populated."
        createCurrentAssignmentsPreparedStatement.execute();
    } catch (SQLException sqle) {
        ImpalaConnector.databaseLock.unlock();
        String errorMsg = ImpalaConnector.handlePreparedStatementException("createAssignmentsQuery", createAssignmentsQuery, "createCurrentAssignmentsPreparedStatement", createCurrentAssignmentsPreparedStatement, con, sqle);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    } finally {
        try {
            if ( createCurrentAssignmentsPreparedStatement != null )
                createCurrentAssignmentsPreparedStatement.close();
        } catch (SQLException sqle2) {
            logger.error("Failed to close the \"createCurrentAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
        }
    }

    // Step 2: compute the statistics of the temporary table. From now on, every failure has to drop the temporary table before returning.
    PreparedStatement computeCurrentAssignmentsStatsPreparedStatement = null;
    try {
        computeCurrentAssignmentsStatsPreparedStatement = con.prepareStatement(computeCurrentAssignmentsStatsQuery);
        computeCurrentAssignmentsStatsPreparedStatement.execute();
    } catch (SQLException sqle) {
        String errorMsg = dropCurrentAssignmentTable(con);
        if ( errorMsg != null ) // The "databaseLock" is already unlocked.
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        ImpalaConnector.databaseLock.unlock();
        errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, "computeCurrentAssignmentsStatsPreparedStatement", computeCurrentAssignmentsStatsPreparedStatement, con, sqle);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    } finally {
        try {
            if ( computeCurrentAssignmentsStatsPreparedStatement != null )
                computeCurrentAssignmentsStatsPreparedStatement.close();
        } catch (SQLException sqle2) {
            logger.error("Failed to close the \"computeCurrentAssignmentsStatsPreparedStatement\"!\n" + sqle2.getMessage());
        }
    }

    // Step 3: read the rows of the temporary table and turn them into "Assignment" objects.
    PreparedStatement getAssignmentsPreparedStatement = null;
    try {
        getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery);
    } catch (SQLException sqle) {
        String errorMsg = dropCurrentAssignmentTable(con);
        if ( errorMsg != null ) // The "databaseLock" is already unlocked.
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        ImpalaConnector.databaseLock.unlock();
        errorMsg = ImpalaConnector.handlePreparedStatementException("getAssignmentsQuery", getAssignmentsQuery, "getAssignmentsPreparedStatement", getAssignmentsPreparedStatement, con, sqle);
        // The "getAssignmentsPreparedStatement" will always be null here, so we do not close it.
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    }

    Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.

    try ( ResultSet resultSet = getAssignmentsPreparedStatement.executeQuery() ) {
        // Unfortunately, we cannot check for an empty result with "resultSet.first()", as the used version of the Impala-driver does not support it.
        // The empty-result case is handled after the loop, by checking the size of the "assignments" list.

        // The cursor is automatically before the first element in this configuration.
        while ( resultSet.next() ) {
            // The following few lines, cannot be outside the "while" loop, since the same object is added, despite that we update the inner-values.
            Assignment assignment = new Assignment();
            assignment.setWorkerId(workerId);
            assignment.setTimestamp(timestamp);
            Datasource datasource = new Datasource();
            try { // For each of the 4 columns returned. The indexing starts from 1
                assignment.setId(resultSet.getString(1));
                assignment.setOriginalUrl(resultSet.getString(2));
                datasource.setId(resultSet.getString(3));
                datasource.setName(resultSet.getString(4));
            } catch (SQLException sqle) {
                logger.error("No value was able to be retrieved from one of the columns of row_" + resultSet.getRow(), sqle);
                continue; // This object is broken, move to the next row.
            }
            assignment.setDatasource(datasource);
            assignments.add(assignment);
        }
    } catch (Exception e) {
        String errorMsg = dropCurrentAssignmentTable(con);
        if ( errorMsg != null ) // The "databaseLock" is already unlocked.
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        ImpalaConnector.databaseLock.unlock();
        errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
        logger.error(errorMsg, e);
        ImpalaConnector.closeConnection(con);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    } finally {
        try {
            getAssignmentsPreparedStatement.close();
        } catch (SQLException sqle) {
            logger.error("Failed to close the \"getAssignmentsPreparedStatement\"!\n" + sqle.getMessage());
        }
    }

    int assignmentsSize = assignments.size();
    if ( assignmentsSize == 0 ) {
        String errorMsg = dropCurrentAssignmentTable(con);
        if ( errorMsg != null ) // The "databaseLock" is already unlocked.
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        ImpalaConnector.databaseLock.unlock();
        errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId;
        logger.error(errorMsg);
        ImpalaConnector.closeConnection(con);
        return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
    }

    logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");

    // Step 4: write the Assignment details to the assignment-table.
    // The "workerId" was sanitized in the beginning of this method.
    // The "timestamp" is generated from the Java-code, so it's in no way provided by a 3rd party.
    String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n"
            + "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data";

    PreparedStatement insertAssignmentsPreparedStatement = null;
    try {
        insertAssignmentsPreparedStatement = con.prepareStatement(insertAssignmentsQuery);
        insertAssignmentsPreparedStatement.execute();
    } catch (SQLException sqle) {
        String errorMsg = dropCurrentAssignmentTable(con);
        if ( errorMsg != null ) // The "databaseLock" is already unlocked.
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        ImpalaConnector.databaseLock.unlock();
        errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, "insertAssignmentsPreparedStatement", insertAssignmentsPreparedStatement, con, sqle);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    } finally {
        try {
            if ( insertAssignmentsPreparedStatement != null )
                insertAssignmentsPreparedStatement.close();
        } catch (SQLException sqle2) {
            logger.error("Failed to close the \"insertAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
        }
    }

    // Step 5: the temporary table is not needed anymore.
    String errorMsg = dropCurrentAssignmentTable(con);
    if ( errorMsg != null ) // The "databaseLock" is already unlocked.
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);

    logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");

    String mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con, "", null);
    if ( mergeErrorMsg != null ) {
        ImpalaConnector.databaseLock.unlock();
        ImpalaConnector.closeConnection(con);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
    }

    ImpalaConnector.databaseLock.unlock();
    ImpalaConnector.closeConnection(con);

    long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
    logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
    return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
}
/**
 * Receives the report of a worker for a batch of assignments and stores its results in the database.
 * <p>
 * After validating the report, the full-texts are requested and uploaded through {@code FileUtils.getAndUploadFullTexts()},
 * then a row is inserted in the "payload" and in the "attempt" table for each urlReport which has a payload,
 * the parquet files of the "payload", "attempt" and "assignment" tables are merged
 * and the rows of the "assignment" table which refer to this worker are removed through the last merge.
 * The database-work is performed while holding the {@code ImpalaConnector.databaseLock}.
 *
 * @param workerReport the report sent by the worker; it has to contain a "workerId" and at least one urlReport
 * @param request the servlet-request, which is passed on to {@code FileUtils.getAndUploadFullTexts()}
 * @return 200 on success (the body carries a message when some urlReports had no payload, otherwise it is null),
 * 400 for an invalid report, 403 for a suspicious "workerId" and 500 when a database operation failed.
 */
@PostMapping("addWorkerReport")
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {

    if ( workerReport == null ) {
        String errorMsg = "No \"WorkerReport\" was given!";
        logger.error(errorMsg);
        return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
    }

    String curWorkerId = workerReport.getWorkerId();
    if ( curWorkerId == null ) {
        String errorMsg = "No \"workerId\" was included inside the \"WorkerReport\"!";
        logger.error(errorMsg);
        return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
    }

    // As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
    // We use "find()" instead of "matches()", since the '.' of the pattern does not match line-terminators,
    // thus "matches()" would accept any value containing a line-break, even if it also contained quotes or semicolons.
    if ( MALICIOUS_INPUT_STRING.matcher(curWorkerId).find() ) {
        String errorMsg = "Possibly malicious \"workerId\" received: " + curWorkerId;
        logger.error(errorMsg);
        return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
    }

    List<UrlReport> urlReports = workerReport.getUrlReports();
    if ( (urlReports == null) || urlReports.isEmpty() ) {
        String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + curWorkerId + "\" was empty (without any UrlReports)!";
        logger.error(errorMsg);
        return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
    }

    long curReportAssignments = workerReport.getAssignmentRequestCounter();
    logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");

    // Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
    FileUtils.UploadFullTextsResponse uploadFullTextsResponse = FileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
    if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!");
    }
    else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
        logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
        // The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
        FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
    }

    ImpalaConnector.databaseLock.lock();
    Connection con = ImpalaConnector.getInstance().getConnection();
    if ( con == null ) {
        ImpalaConnector.databaseLock.unlock();
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
    }

    // Store the workerReport into the database.
    String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
    String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";

    String tempInsertQueryName = null; // Keeps the name of the query which is being prepared, in order to report it in case of a failure.
    PreparedStatement preparedInsertPayloadStatement = null, preparedInsertAttemptStatement = null;
    try {
        tempInsertQueryName = "insertIntoPayloadBaseQuery";
        preparedInsertPayloadStatement = con.prepareStatement(insertIntoPayloadBaseQuery);
        tempInsertQueryName = "insertIntoAttemptBaseQuery";
        preparedInsertAttemptStatement = con.prepareStatement(insertIntoAttemptBaseQuery);
    } catch (SQLException sqle) {
        ImpalaConnector.databaseLock.unlock();
        String errorMsg = "Problem when creating the prepared statement for \"" + tempInsertQueryName + "\"!\n";
        logger.error(errorMsg + sqle.getMessage());
        closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    }

    try {
        con.setAutoCommit(false); // Avoid writing to disk for each insert. Write them all in the end.
    } catch (SQLException sqle) {
        ImpalaConnector.databaseLock.unlock();
        String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!\n";
        logger.error(errorMsg + sqle.getMessage());
        closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    }

    String payloadErrorMsg = null;
    int failedCount = 0;

    // TODO - Think about handling this loop with multiple threads..
    // The Impala-server will handle the synchronization itself..
    // Check online what happens with "statement.setPoolable()" does it improves speed? in multi or also in single thread?

    for ( UrlReport urlReport : urlReports ) {
        Payload payload = urlReport.getPayload();
        if ( payload == null ) {
            logger.error("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments);
            payloadErrorMsg = (++failedCount) + " urlReports failed to be processed because they had no payload!";
            continue;
        }

        try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
            preparedInsertPayloadStatement.setString(1, payload.getId());
            preparedInsertPayloadStatement.setString(2, payload.getOriginal_url());
            preparedInsertPayloadStatement.setString(3, payload.getActual_url());
            preparedInsertPayloadStatement.setTimestamp(4, payload.getTimestamp_acquired());
            preparedInsertPayloadStatement.setString(5, payload.getMime_type());

            // The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers.
            String sizeStr = null;
            Long size = payload.getSize();
            if ( size != null )
                sizeStr = String.valueOf(size);

            preparedInsertPayloadStatement.setString(6, sizeStr);
            preparedInsertPayloadStatement.setString(7, payload.getHash());
            preparedInsertPayloadStatement.setString(8, payload.getLocation());
            preparedInsertPayloadStatement.setString(9, payload.getProvenance());
            preparedInsertPayloadStatement.executeUpdate();
        } catch (SQLException sqle) {
            // A failed payload-insert does not stop the processing; the attempt of this urlReport is still written.
            logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": " + sqle.getMessage() + "\n\n");
        }

        Error error = urlReport.getError();
        if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop).
            logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
            error = new Error(null, null);
        }

        try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
            preparedInsertAttemptStatement.setString(1, payload.getId());
            preparedInsertAttemptStatement.setString(2, payload.getOriginal_url());
            preparedInsertAttemptStatement.setTimestamp(3, payload.getTimestamp_acquired());
            // "String.valueOf()" covers the case of a "null" status too. An NPE here would not be caught, so the "databaseLock" would never be released.
            preparedInsertAttemptStatement.setString(4, String.valueOf(urlReport.getStatus()));
            preparedInsertAttemptStatement.setString(5, String.valueOf(error.getType())); // This covers the case of "null" too.
            preparedInsertAttemptStatement.setString(6, error.getMessage());
            preparedInsertAttemptStatement.executeUpdate();
        } catch (SQLException sqle) {
            logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle.getMessage() + "\n\n");
        }
    }//end for-loop

    try {
        con.commit(); // Commit all the insert-queries to the database (write them to disk).
    } catch (SQLException sqle) {
        ImpalaConnector.databaseLock.unlock();
        ImpalaConnector.closeConnection(con);
        String errorMsg = "Problem when committing changes to the database!";
        logger.error(errorMsg + "\n" + sqle.getMessage());
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    } finally {
        closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, null); // Do not close the connection here, as we might move forward.
    }

    if ( payloadErrorMsg != null )
        logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables, although " + payloadErrorMsg + " Going to merge the parquet files for those tables.");
    else
        logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables.");

    String mergeErrorMsg = FileUtils.mergeParquetFiles("payload", con, "", null);
    if ( mergeErrorMsg != null ) {
        ImpalaConnector.databaseLock.unlock();
        ImpalaConnector.closeConnection(con);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
    }

    mergeErrorMsg = FileUtils.mergeParquetFiles("attempt", con, "", null);
    if ( mergeErrorMsg != null ) {
        ImpalaConnector.databaseLock.unlock();
        ImpalaConnector.closeConnection(con);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
    }

    // This will delete the rows of the "assignment" table which refer to the curWorkerId. As we have non-kudu Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
    // Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
    // We do not need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
    mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con, " WHERE workerid != ", curWorkerId);
    if ( mergeErrorMsg != null ) {
        ImpalaConnector.databaseLock.unlock();
        ImpalaConnector.closeConnection(con);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
    }

    try {
        con.commit(); // Apply the merges permanently (write them to disk).
        con.setAutoCommit(true); // Restore the "auto-commit" value for this connection of the pool.
    } catch (SQLException sqle) {
        String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!";
        logger.error(errorMsg + "\n" + sqle.getMessage());
        // The statements used in "mergeParquetFiles()" are already closed.
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    } finally {
        ImpalaConnector.databaseLock.unlock();
        ImpalaConnector.closeConnection(con);
    }

    logger.debug("Finished merging the database tables.");
    return ResponseEntity.status(HttpStatus.OK).body(payloadErrorMsg);
}
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
/**
 * Builds a single insert-statement which carries the value-groups of many rows, in the form of {@code (?,?,..),(?,?,..),..}.
 *
 * @param con the connection used to prepare the statement
 * @param baseInsertQuery the beginning of the insert-query; the value-groups are appended right after it (presumably it has to end with "VALUES " - verify against the caller)
 * @param dataSize the number of rows, thus the number of value-groups to append
 * @param numParamsPerRow the number of "?" placeholders inside each value-group
 * @return the prepared statement, with no parameters set yet
 * @throws RuntimeException when the statement could not be prepared; the original {@link SQLException} is kept as its cause
 */
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException
{
    StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
    sb.append(baseInsertQuery);
    for ( int i=1; i <= dataSize; ++i ) {
        sb.append("(");
        for ( int j=1; j <= numParamsPerRow; ++j ) {
            sb.append("?");
            if ( j < numParamsPerRow ) // No comma after the last placeholder of the group.
                sb.append(",");
        }
        sb.append(")");
        if ( i < dataSize ) // No comma after the last group.
            sb.append(",");
    }

    PreparedStatement preparedInsertStatement;
    try { // We use a "PreparedStatement" to do insertions, for security reasons.
        preparedInsertStatement = con.prepareStatement(sb.toString());
    } catch (SQLException sqle) {
        String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n";
        logger.error(errorMsg + sqle.getMessage());
        throw new RuntimeException(errorMsg, sqle); // Keep the cause, so that the caller has access to the SQL-state and the stacktrace.
    }
    return preparedInsertStatement;
}
/**
 * Closes the given statements and the given connection. Any of the arguments may be null, in which case it is skipped.
 * Each resource is closed independently, so a failure to close one of them does not leave the following ones open.
 *
 * @param statement1 the first statement to close, or null
 * @param statement2 the second statement to close, or null
 * @param con the connection to close, or null when the connection has to stay open
 * @return true when every non-null resource was closed without an exception, otherwise false
 */
private boolean closeStatements(Statement statement1, Statement statement2, Connection con) {
    boolean allClosed = true;

    if ( statement1 != null ) {
        try {
            statement1.close();
        } catch (SQLException sqle) {
            logger.error("Could not close the statements or the connection with the Impala-database.\n" + sqle.getMessage());
            allClosed = false;
        }
    }

    if ( statement2 != null ) {
        try {
            statement2.close();
        } catch (SQLException sqle) {
            logger.error("Could not close the statements or the connection with the Impala-database.\n" + sqle.getMessage());
            allClosed = false;
        }
    }

    if ( con != null ) {
        try {
            con.close(); // It may have already closed and that's fine.
        } catch (SQLException sqle) {
            logger.error("Could not close the statements or the connection with the Impala-database.\n" + sqle.getMessage());
            allClosed = false;
        }
    }

    return allClosed;
}
/**
 * Test-endpoint which hands out assignments built from the id-url pairs which "FileUtils" loads from its input, instead of querying the database.
 * Every assignment gets a dummy datasource ("ID_x", "NAME_x"), where "x" is the number returned by {@code GenericUtils.getRandomNumber(1, 5)}.
 *
 * @param workerId the id of the requesting worker; it is set in every returned assignment
 * @param workerAssignmentsLimit the maximum number of assignments to return
 * @return 200 along with an {@link AssignmentsResponse} in json, or 500 when the construction of "FileUtils" failed
 */
@GetMapping("test")
public ResponseEntity<?> getTestUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {

    logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " test-assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);

    try {
        new FileUtils(); // Find the input file.
    } catch (Exception e) {
        logger.error(e.getMessage());
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The resource file, for the requested assignments, was not found.");
    }

    List<Assignment> testAssignments = new ArrayList<>();
    boolean firstBatch = true;
    Timestamp batchTimestamp = new Timestamp(System.currentTimeMillis()); // The same timestamp is shared by all the records of this batch.

    // Keep loading batches of id-url pairs, until the input is exhausted or the requested limit is reached.
    loading:
    while ( true ) {
        HashMultimap<String, String> idUrlBatch = FileUtils.getNextIdUrlPairBatchFromJson(); // Take urls from jsonFile.

        if ( FileUtils.isFinishedLoading(idUrlBatch.isEmpty(), firstBatch) ) // Throws RuntimeException which is automatically passed on.
            break;
        firstBatch = false;

        for ( Map.Entry<String, String> idUrlPair : idUrlBatch.entries() ) {
            if ( testAssignments.size() >= workerAssignmentsLimit ) {
                logger.debug("Done loading urls from the inputFile as the assignmentsLimit (" + workerAssignmentsLimit + ") was reached.");
                break loading; // Leave both the pairs-loop and the loading-loop.
            }
            int datasourceNum = GenericUtils.getRandomNumber(1, 5);
            Datasource dummyDatasource = new Datasource("ID_" + datasourceNum, "NAME_" + datasourceNum);
            testAssignments.add(new Assignment(idUrlPair.getKey(), idUrlPair.getValue(), dummyDatasource, workerId, batchTimestamp));
        }
    }

    Scanner inputScanner = FileUtils.inputScanner.get();
    if ( inputScanner != null ) // The initial value may be null.
        inputScanner.close();

    long batchNumber = assignmentsBatchCounter.incrementAndGet();
    logger.info("Sending batch_" + batchNumber + " with " + testAssignments.size() + " assignments (" + FileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId);
    return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(batchNumber, testAssignments));
}
/**
 * Drops (with "PURGE") the temporary "current_assignment" table.
 * In case of a failure, the {@code ImpalaConnector.databaseLock} is unlocked by this method, before the error gets handled by
 * {@code ImpalaConnector.handlePreparedStatementException()}, so the caller must not unlock it again.
 *
 * @param con the connection to run the drop-query on
 * @return null when the table was dropped, otherwise the error-message returned by {@code ImpalaConnector.handlePreparedStatementException()}
 */
private String dropCurrentAssignmentTable(Connection con)
{
    final String dropQuery = "DROP TABLE " + ImpalaConnector.databaseName + ".current_assignment PURGE";
    String failureMsg = null; // Stays null when the drop succeeds.
    PreparedStatement dropStatement = null;
    try {
        dropStatement = con.prepareStatement(dropQuery);
        dropStatement.execute();
    } catch (SQLException sqle) {
        ImpalaConnector.databaseLock.unlock();
        failureMsg = ImpalaConnector.handlePreparedStatementException("dropCurrentAssignmentsQuery", dropQuery, "dropCurrentAssignmentsPreparedStatement", dropStatement, con, sqle);
    } finally {
        if ( dropStatement != null ) {
            try {
                dropStatement.close();
            } catch (SQLException closeException) {
                logger.error("Failed to close the \"dropCurrentAssignmentsPreparedStatement\"!\n" + closeException.getMessage());
            }
        }
    }
    return failureMsg;
}
}