// File metadata (extraction artifact — not part of the original source): 560 lines, 32 KiB, Java.
package eu.openaire.urls_controller.controllers;
|
|
|
|
import com.google.common.collect.HashMultimap;
|
|
import eu.openaire.urls_controller.configuration.ImpalaConnector;
|
|
import eu.openaire.urls_controller.models.Error;
|
|
import eu.openaire.urls_controller.models.*;
|
|
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
|
|
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
|
|
import eu.openaire.urls_controller.util.*;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.http.HttpStatus;
|
|
import org.springframework.http.ResponseEntity;
|
|
import org.springframework.web.bind.annotation.*;
|
|
|
|
import javax.servlet.http.HttpServletRequest;
|
|
import java.sql.*;
|
|
|
|
import java.util.*;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.regex.Pattern;
|
|
|
|
@RestController
|
|
@RequestMapping("/urls")
|
|
public class UrlController {
|
|
|
|
// SLF4J logger for this controller.
private static final Logger logger = LoggerFactory.getLogger(UrlController.class);

// Monotonic counter tagging each batch of assignments handed out.
// NOTE(review): despite the original comment, this is incremented by BOTH "getUrls" and "getTestUrls".
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);

// Matches inputs containing quote/semicolon/backtick characters, which could be used for SQL-injection.
// Needed because the Impala JDBC driver cannot reliably parameterize some prepared statements.
private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");
|
|
|
|
|
|
/**
 * Hands out a batch of crawl-assignments (publication id-url pairs) to the requesting Worker.
 * <p>
 * Flow (all DB work happens under the global {@code ImpalaConnector.databaseLock}, because Impala
 * does not support multi-statement transactions):
 * <ol>
 *   <li>Sanitize {@code workerId} and clamp {@code workerAssignmentsLimit} to the controller's limit.</li>
 *   <li>Materialize candidate assignments into a temp table {@code current_assignment} (CTAS), COMPUTE STATS on it.</li>
 *   <li>Read the rows back into {@link Assignment} objects.</li>
 *   <li>Copy them into the permanent {@code assignment} table, drop the temp table, merge parquet files.</li>
 * </ol>
 * Error paths: {@code dropCurrentAssignmentTable(con)} UNLOCKS the database-lock itself when it fails
 * (returns non-null), which is why several branches return immediately after it without unlocking.
 *
 * @param workerId               id of the requesting worker (rejected with 403 if it matches MALICIOUS_INPUT_STRING)
 * @param workerAssignmentsLimit how many assignments the worker asked for; 0 is rejected with 400.
 *                               NOTE(review): a NEGATIVE limit is not rejected here — confirm upstream validation.
 * @return 200 with an {@link AssignmentsResponse}, or 4xx/5xx with an error message body
 */
@GetMapping("")
public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {

    // As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
    if ( MALICIOUS_INPUT_STRING.matcher(workerId).matches() ) {
        String errorMsg = "Possibly malicious \"workerId\" received: " + workerId;
        logger.error(errorMsg);
        return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
    }

    logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);

    // Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.

    // Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server.
    int assignmentsLimit = workerAssignmentsLimit;
    if ( assignmentsLimit == 0 ) {
        String errorMsg = "The given \"workerAssignmentsLimit\" was ZERO!";
        logger.error(errorMsg);
        return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
    } else if ( assignmentsLimit > ControllerConstants.ASSIGNMENTS_LIMIT ) {
        logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + ControllerConstants.ASSIGNMENTS_LIMIT + "). Will use the Controller's limit.");
        assignmentsLimit = ControllerConstants.ASSIGNMENTS_LIMIT;
    }

    // Candidate selection: urls not already assigned and not already delivered (payload),
    // from harvestable datasources, with fewer than MAX_ATTEMPTS_PER_RECORD attempts and no "noRetry" error.
    // The inner "limit assignmentsLimit * 10" bounds the pre-dedup working set for performance.
    String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
        "from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" +
        "select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
        "from " + ImpalaConnector.databaseName + ".publication p\n" +
        "join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
        "join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
        "left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" +
        "left outer join (\n" +
        " select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
        " union all\n" +
        " select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" +
        "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + ControllerConstants.MAX_ATTEMPTS_PER_RECORD + " and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry')\n" +
        "limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" +
        "order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
        "limit " + assignmentsLimit + ") as findAssignmentsQuery";

    // The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.

    String createAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
    String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
    String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";

    List<Assignment> assignments = new ArrayList<>(assignmentsLimit);

    // Global lock: serializes all DB-mutating endpoints, since Impala has no multi-statement transactions.
    ImpalaConnector.databaseLock.lock();

    Connection con = ImpalaConnector.getInstance().getConnection();
    if ( con == null ) { // This is already logged in "getConnection()".
        ImpalaConnector.databaseLock.unlock();
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
    }

    // All transactions in Impala automatically commit at the end of the statement. Currently, Impala does not support multi-statement transactions.
    // https://impala.apache.org/docs/build/html/topics/impala_transactions.html
    // We cannot use "savePoints" along with "autoCommit = false" to roll back to a previous state among multiple statements.

    // Step 1: materialize the candidate set into the temp table "current_assignment".
    PreparedStatement createCurrentAssignmentsPreparedStatement = null;
    try {
        createCurrentAssignmentsPreparedStatement = con.prepareStatement(createAssignmentsQuery);
        // We cannot set the "limits" and the MAX_ATTEMPTS_PER_RECORD as preparedStatements parameters, as we get a "java.sql.SQLException: [Simba][JDBC](11420) Error, parameter metadata not populated."
        createCurrentAssignmentsPreparedStatement.execute();
    } catch (SQLException sqle) {
        ImpalaConnector.databaseLock.unlock();
        String errorMsg = ImpalaConnector.handlePreparedStatementException("createAssignmentsQuery", createAssignmentsQuery, "createCurrentAssignmentsPreparedStatement", createCurrentAssignmentsPreparedStatement, con, sqle);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    } finally {
        try {
            if ( createCurrentAssignmentsPreparedStatement != null )
                createCurrentAssignmentsPreparedStatement.close();
        } catch (SQLException sqle2) {
            logger.error("Failed to close the \"createCurrentAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
        }
    }

    // Step 2: COMPUTE STATS so Impala can plan the subsequent reads/joins efficiently.
    PreparedStatement computeCurrentAssignmentsStatsPreparedStatement = null;
    try {
        computeCurrentAssignmentsStatsPreparedStatement = con.prepareStatement(computeCurrentAssignmentsStatsQuery);
        computeCurrentAssignmentsStatsPreparedStatement.execute();
    } catch (SQLException sqle) {
        // If the drop itself fails, "dropCurrentAssignmentTable" has already unlocked the database-lock.
        String errorMsg = dropCurrentAssignmentTable(con);
        if ( errorMsg != null )
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);

        ImpalaConnector.databaseLock.unlock();
        errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, "computeCurrentAssignmentsStatsPreparedStatement", computeCurrentAssignmentsStatsPreparedStatement, con, sqle);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    } finally {
        try {
            if ( computeCurrentAssignmentsStatsPreparedStatement != null )
                computeCurrentAssignmentsStatsPreparedStatement.close();
        } catch (SQLException sqle2) {
            logger.error("Failed to close the \"computeCurrentAssignmentsStatsPreparedStatement\"!\n" + sqle2.getMessage());
        }
    }

    // Step 3: read the materialized assignments back.
    PreparedStatement getAssignmentsPreparedStatement = null;
    try {
        getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery);
    } catch (SQLException sqle) {
        String errorMsg = dropCurrentAssignmentTable(con);
        if ( errorMsg != null )
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);

        ImpalaConnector.databaseLock.unlock();
        errorMsg = ImpalaConnector.handlePreparedStatementException("getAssignmentsQuery", getAssignmentsQuery, "getAssignmentsPreparedStatement", getAssignmentsPreparedStatement, con, sqle);
        // The "getAssignmentsPreparedStatement" will always be null here, so we do not close it.
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    }

    Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.

    try ( ResultSet resultSet = getAssignmentsPreparedStatement.executeQuery() ) {
        // Unfortunately, we cannot use the following as the used version of the Impala-driver does not support it.
        /*if ( !resultSet.first() ) {
        ImpalaConnector.databaseLock.unlock();
        String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId;
        logger.error(errorMsg);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }*/

        // The cursor is automatically before the first element in this configuration.
        while ( resultSet.next() ) {
            // The following few lines, cannot be outside the "while" loop, since the same object is added, despite that we update the inner-values.
            Assignment assignment = new Assignment();
            assignment.setWorkerId(workerId);
            assignment.setTimestamp(timestamp);
            Datasource datasource = new Datasource();
            try { // For each of the 4 columns returned. The indexing starts from 1
                assignment.setId(resultSet.getString(1));
                assignment.setOriginalUrl(resultSet.getString(2));
                datasource.setId(resultSet.getString(3));
                // NOTE(review): column 4 of the query is "datasourcetype", yet it is stored via setName() — confirm this is intended.
                datasource.setName(resultSet.getString(4));
            } catch (SQLException sqle) {
                logger.error("No value was able to be retrieved from one of the columns of row_" + resultSet.getRow(), sqle);
                continue; // This object is broken, move to the next row.
            }
            assignment.setDatasource(datasource);
            assignments.add(assignment);
        }
    } catch (Exception e) {
        String errorMsg = dropCurrentAssignmentTable(con);
        if ( errorMsg != null )
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);

        ImpalaConnector.databaseLock.unlock();
        errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
        logger.error(errorMsg, e);
        ImpalaConnector.closeConnection(con);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    } finally {
        try {
            getAssignmentsPreparedStatement.close();
        } catch (SQLException sqle) {
            logger.error("Failed to close the \"getAssignmentsPreparedStatement\"!\n" + sqle.getMessage());
        }
    }

    int assignmentsSize = assignments.size();
    if ( assignmentsSize == 0 ) {
        // Nothing to hand out: clean up the temp table and report a 500 to the worker.
        String errorMsg = dropCurrentAssignmentTable(con);
        if ( errorMsg != null )
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);

        ImpalaConnector.databaseLock.unlock();
        errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId;
        logger.error(errorMsg);
        ImpalaConnector.closeConnection(con);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    }

    logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");

    // Step 4: write the Assignment details to the assignment-table.

    // The "timestamp" is generated from the Java-code, so it's in no way provided by a 3rd party.
    String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n"
        + "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data";

    PreparedStatement insertAssignmentsPreparedStatement = null;
    try {
        insertAssignmentsPreparedStatement = con.prepareStatement(insertAssignmentsQuery);
        insertAssignmentsPreparedStatement.execute();
    } catch (SQLException sqle) {
        String errorMsg = dropCurrentAssignmentTable(con);
        if ( errorMsg != null )
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);

        ImpalaConnector.databaseLock.unlock();
        errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, "insertAssignmentsPreparedStatement", insertAssignmentsPreparedStatement, con, sqle);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    } finally {
        try {
            if ( insertAssignmentsPreparedStatement != null )
                insertAssignmentsPreparedStatement.close();
        } catch (SQLException sqle2) {
            logger.error("Failed to close the \"insertAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
        }
    }

    // The temp table has served its purpose; drop it (it unlocks the DB-lock itself on failure).
    String errorMsg = dropCurrentAssignmentTable(con);
    if ( errorMsg != null )
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);

    logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");

    // Compact the many small parquet files the insert produced.
    String mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con, "", null);
    if ( mergeErrorMsg != null ) {
        ImpalaConnector.databaseLock.unlock();
        ImpalaConnector.closeConnection(con);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
    }

    ImpalaConnector.databaseLock.unlock();
    ImpalaConnector.closeConnection(con);

    long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
    logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
    return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
}
|
|
|
|
|
|
@PostMapping("addWorkerReport")
|
|
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {
|
|
|
|
if ( workerReport == null ) {
|
|
String errorMsg = "No \"WorkerReport\" was given!";
|
|
logger.error(errorMsg);
|
|
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
|
|
}
|
|
|
|
String curWorkerId = workerReport.getWorkerId();
|
|
if ( curWorkerId == null ) {
|
|
String errorMsg = "No \"workerId\" was included inside the \"WorkerReport\"!";
|
|
logger.error(errorMsg);
|
|
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
|
|
}
|
|
|
|
// As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
|
|
if ( MALICIOUS_INPUT_STRING.matcher(curWorkerId).matches() ) {
|
|
String errorMsg = "Possibly malicious \"workerId\" received: " + curWorkerId;
|
|
logger.error(errorMsg);
|
|
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
|
|
}
|
|
|
|
List<UrlReport> urlReports = workerReport.getUrlReports();
|
|
if ( (urlReports == null) || urlReports.isEmpty() ) {
|
|
String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + curWorkerId + "\" was empty (without any UrlReports)!";
|
|
logger.error(errorMsg);
|
|
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
|
|
}
|
|
|
|
long curReportAssignments = workerReport.getAssignmentRequestCounter();
|
|
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
|
|
|
|
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
|
|
if ( ! FileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId) ) {
|
|
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
|
|
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
|
|
FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
|
|
}
|
|
|
|
ImpalaConnector.databaseLock.lock();
|
|
|
|
Connection con = ImpalaConnector.getInstance().getConnection();
|
|
if ( con == null )
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
|
|
|
|
// Store the workerReport into the database.
|
|
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
|
|
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
|
|
|
|
String tempInsertQueryName = null;
|
|
PreparedStatement preparedInsertPayloadStatement = null, preparedInsertAttemptStatement = null;
|
|
try {
|
|
tempInsertQueryName = "insertIntoPayloadBaseQuery";
|
|
preparedInsertPayloadStatement = con.prepareStatement(insertIntoPayloadBaseQuery);
|
|
tempInsertQueryName = "insertIntoAttemptBaseQuery";
|
|
preparedInsertAttemptStatement = con.prepareStatement(insertIntoAttemptBaseQuery);
|
|
} catch (SQLException sqle) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
String errorMsg = "Problem when creating the prepared statement for \"" + tempInsertQueryName + "\"!\n";
|
|
logger.error(errorMsg + sqle.getMessage());
|
|
closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
}
|
|
|
|
try {
|
|
con.setAutoCommit(false); // Avoid writing to disk for each insert. Write them all in the end.
|
|
} catch (SQLException sqle) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!";
|
|
logger.error(errorMsg + "\n" + sqle.getMessage());
|
|
closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
}
|
|
|
|
String payloadErrorMsg = null;
|
|
int failedCount = 0;
|
|
|
|
for ( UrlReport urlReport : urlReports ) {
|
|
Payload payload = urlReport.getPayload();
|
|
if ( payload == null ) {
|
|
logger.error("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments);
|
|
payloadErrorMsg = (++failedCount) + " urlReports failed to be processed because they had no payload!";
|
|
continue;
|
|
}
|
|
|
|
try { // We use a "PreparedStatement" to do insertions, for security reasons.
|
|
preparedInsertPayloadStatement.setString(1, payload.getId());
|
|
preparedInsertPayloadStatement.setString(2, payload.getOriginal_url());
|
|
preparedInsertPayloadStatement.setString(3, payload.getActual_url());
|
|
preparedInsertPayloadStatement.setTimestamp(4, payload.getTimestamp_acquired());
|
|
preparedInsertPayloadStatement.setString(5, payload.getMime_type());
|
|
|
|
// The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers.
|
|
String stringSize = null;
|
|
Long size = payload.getSize();
|
|
if ( size != null )
|
|
stringSize = String.valueOf(size);
|
|
|
|
preparedInsertPayloadStatement.setString(6, stringSize);
|
|
preparedInsertPayloadStatement.setString(7, payload.getHash());
|
|
preparedInsertPayloadStatement.setString(8, payload.getLocation());
|
|
preparedInsertPayloadStatement.setString(9, payload.getProvenance());
|
|
preparedInsertPayloadStatement.executeUpdate();
|
|
} catch (SQLException sqle) {
|
|
logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": " + sqle.getMessage() + "\n\n");
|
|
}
|
|
|
|
Error error = urlReport.getError();
|
|
if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of the loop)
|
|
logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
|
|
error = new Error(null, null);
|
|
}
|
|
|
|
try { // We use a "PreparedStatement" to do insertions, for security reasons.
|
|
preparedInsertAttemptStatement.setString(1, payload.getId());
|
|
preparedInsertAttemptStatement.setString(2, payload.getOriginal_url());
|
|
preparedInsertAttemptStatement.setTimestamp(3, payload.getTimestamp_acquired());
|
|
preparedInsertAttemptStatement.setString(4, urlReport.getStatus().toString());
|
|
preparedInsertAttemptStatement.setString(5, String.valueOf(error.getType())); // This covers the case of "null".
|
|
preparedInsertAttemptStatement.setString(6, error.getMessage());
|
|
preparedInsertAttemptStatement.executeUpdate();
|
|
} catch (SQLException sqle) {
|
|
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle.getMessage() + "\n\n");
|
|
}
|
|
}//end for-loop
|
|
|
|
try {
|
|
con.commit(); // Commit all the insert-queries to the database (write them to disk).
|
|
} catch (SQLException sqle) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!";
|
|
logger.error(errorMsg + "\n" + sqle.getMessage());
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
} finally {
|
|
closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, null); // Do not close the connection here!
|
|
}
|
|
|
|
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables.");
|
|
|
|
String mergeErrorMsg = FileUtils.mergeParquetFiles("payload", con, "", null);
|
|
if ( mergeErrorMsg != null ) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
ImpalaConnector.closeConnection(con);
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
|
|
}
|
|
|
|
mergeErrorMsg = FileUtils.mergeParquetFiles("attempt", con, "", null);
|
|
if ( mergeErrorMsg != null ) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
ImpalaConnector.closeConnection(con);
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
|
|
}
|
|
|
|
// This will delete the rows of the "assignment" table which refer to the curWorkerId. As we have non-kudu Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
|
|
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
|
|
mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con, " WHERE workerid != ", curWorkerId);
|
|
if ( mergeErrorMsg != null ) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
ImpalaConnector.closeConnection(con);
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
|
|
}
|
|
|
|
try {
|
|
con.commit(); // Apply the merges permanently (write them to disk).
|
|
con.setAutoCommit(true); // Restore the "auto-commit" value for this connection of the pool.
|
|
} catch (SQLException sqle) {
|
|
String errorMsg = "Problem when committing changes to the database!";
|
|
logger.error(errorMsg + "\n" + sqle.getMessage());
|
|
// The statements used in "mergeParquetFiles()" are already closed.
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
} finally {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
ImpalaConnector.closeConnection(con);
|
|
}
|
|
|
|
return ResponseEntity.status(HttpStatus.OK).body(payloadErrorMsg);
|
|
}
|
|
|
|
|
|
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
|
|
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
|
|
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException
|
|
{
|
|
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
|
|
sb.append(baseInsertQuery);
|
|
for ( int i=1; i <= dataSize; ++i ) {
|
|
sb.append("(");
|
|
for ( int j=1; j <= numParamsPerRow; ++j ) {
|
|
sb.append("?");
|
|
if ( j < numParamsPerRow )
|
|
sb.append(",");
|
|
}
|
|
sb.append(")");
|
|
if ( i < dataSize )
|
|
sb.append(",");
|
|
}
|
|
|
|
PreparedStatement preparedInsertStatement;
|
|
try { // We use a "PreparedStatement" to do insertions, for security reasons.
|
|
preparedInsertStatement = con.prepareStatement(sb.toString());
|
|
} catch (SQLException sqle) {
|
|
String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n";
|
|
logger.error(errorMsg + sqle.getMessage());
|
|
throw new RuntimeException(errorMsg);
|
|
}
|
|
return preparedInsertStatement;
|
|
}
|
|
|
|
|
|
private boolean closePreparedStatements(PreparedStatement preparedStatement1, PreparedStatement preparedStatement2, Connection con) {
|
|
try {
|
|
if ( preparedStatement1 != null )
|
|
preparedStatement1.close();
|
|
if ( preparedStatement2 != null )
|
|
preparedStatement2.close();
|
|
if ( con != null )
|
|
con.close(); // It may have already closed and that's fine.
|
|
return true;
|
|
} catch (SQLException sqle) {
|
|
logger.error("Could not close the connection with the Impala-database.\n" + sqle.getMessage());
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
@GetMapping("test")
|
|
public ResponseEntity<?> getTestUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
|
|
|
|
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " test-assignments. The assignments-limit of the controller is: " + ControllerConstants.ASSIGNMENTS_LIMIT);
|
|
|
|
try {
|
|
new FileUtils(); // Find the input file.
|
|
} catch (Exception e) {
|
|
logger.error(e.getMessage());
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The resource file, for the requested assignments, was not found.");
|
|
}
|
|
|
|
List<Assignment> assignments = new ArrayList<>();
|
|
HashMultimap<String, String> loadedIdUrlPairs;
|
|
boolean isFirstRun = true;
|
|
boolean assignmentsLimitReached = false;
|
|
Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
|
|
|
|
// Start loading urls.
|
|
while ( true ) {
|
|
loadedIdUrlPairs = FileUtils.getNextIdUrlPairBatchFromJson(); // Take urls from jsonFile.
|
|
|
|
if ( FileUtils.isFinishedLoading(loadedIdUrlPairs.isEmpty(), isFirstRun) ) // Throws RuntimeException which is automatically passed on.
|
|
break;
|
|
else
|
|
isFirstRun = false;
|
|
|
|
Set<Map.Entry<String, String>> pairs = loadedIdUrlPairs.entries();
|
|
|
|
for ( Map.Entry<String,String> pair : pairs )
|
|
{
|
|
if ( assignments.size() >= workerAssignmentsLimit ) {
|
|
assignmentsLimitReached = true;
|
|
break;
|
|
}
|
|
|
|
int randomNum = GenericUtils.getRandomNumber(1, 5);
|
|
assignments.add(new Assignment(pair.getKey(), pair.getValue(), new Datasource("ID_" + randomNum, "NAME_" + randomNum), workerId, timestamp));
|
|
}// end pairs-for-loop
|
|
|
|
if ( assignmentsLimitReached ) {
|
|
logger.debug("Done loading urls from the inputFile as the assignmentsLimit (" + workerAssignmentsLimit + ") was reached.");
|
|
break;
|
|
}
|
|
}// end loading-while-loop
|
|
|
|
Scanner scanner = FileUtils.inputScanner.get();
|
|
if ( scanner != null ) // Check if the initial value is null.
|
|
scanner.close();
|
|
|
|
long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
|
|
logger.info("Sending batch_" + curAssignmentsBatchCounter + " with " + assignments.size() + " assignments (" + FileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId);
|
|
return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
|
|
}
|
|
|
|
|
|
/**
 * Drops the temporary "current_assignment" table (PURGE removes the data files immediately,
 * bypassing the trash).
 * <p>
 * CONTRACT: must be called while holding {@code ImpalaConnector.databaseLock}. On FAILURE this
 * method releases that lock itself (inside the catch) — callers that receive a non-null return
 * value must therefore return WITHOUT unlocking again.
 *
 * @param con open connection to the Impala database
 * @return null on success; otherwise the error message produced by
 *         {@code ImpalaConnector.handlePreparedStatementException}
 */
private String dropCurrentAssignmentTable(Connection con)
{
    String dropCurrentAssignmentsQuery = "DROP TABLE " + ImpalaConnector.databaseName + ".current_assignment PURGE";
    PreparedStatement dropCurrentAssignmentsPreparedStatement = null;
    try {
        dropCurrentAssignmentsPreparedStatement = con.prepareStatement(dropCurrentAssignmentsQuery);
        dropCurrentAssignmentsPreparedStatement.execute();
        return null;
    } catch (SQLException sqle) {
        // Unlock here so every caller can simply propagate the error message and return.
        ImpalaConnector.databaseLock.unlock();
        return ImpalaConnector.handlePreparedStatementException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, "dropCurrentAssignmentsPreparedStatement", dropCurrentAssignmentsPreparedStatement, con, sqle);
    } finally {
        // Runs before either return above completes; closes the statement in both outcomes.
        try {
            if ( dropCurrentAssignmentsPreparedStatement != null )
                dropCurrentAssignmentsPreparedStatement.close();
        } catch (SQLException sqle2) {
            logger.error("Failed to close the \"dropCurrentAssignmentsPreparedStatement\"!\n" + sqle2.getMessage());
        }
    }
}
|
|
|
|
}
|