// UrlController — assignment distribution and worker-report ingestion for the PDF-aggregation service.
package eu.openaire.urls_controller.controllers;
|
|
|
|
import eu.openaire.urls_controller.configuration.ImpalaConnector;
|
|
import eu.openaire.urls_controller.models.Error;
|
|
import eu.openaire.urls_controller.models.*;
|
|
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
|
|
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
|
|
import eu.openaire.urls_controller.util.FileUtils;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.http.HttpStatus;
|
|
import org.springframework.http.ResponseEntity;
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
import org.springframework.jdbc.core.RowCallbackHandler;
|
|
import org.springframework.web.bind.annotation.*;
|
|
|
|
import javax.servlet.http.HttpServletRequest;
|
|
import java.sql.*;
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.regex.Pattern;
|
|
|
|
@RestController
|
|
@RequestMapping("/urls")
|
|
public class UrlController {
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(UrlController.class);

// Spring-injected JDBC access to the Impala database.
@Autowired
private JdbcTemplate jdbcTemplate;

// Helper for full-text retrieval/upload and parquet-file merging.
@Autowired
private FileUtils fileUtils;

// Monotonic counter identifying each assignments-batch handed to a worker.
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); // Just for the "getTestUrls"-endpoint.

// Matches any input containing quote, backtick or semicolon-like characters; used to reject
// possibly-malicious "workerId" values, since they get concatenated into SQL (the Impala driver
// does not reliably support parameterized queries — see the comments at the call sites).
private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");

// Records with more attempts than this are excluded from new assignments.
// NOTE(review): this field is mutated at runtime (incremented by 2 when few/no results are found),
// without synchronization — confirm single-writer assumptions if multiple workers poll concurrently.
@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}")
private int maxAttemptsPerRecord;

// Upper bound on the number of assignments handed out per request, regardless of what a worker asks for.
@Value("${services.pdfaggregation.controller.assignmentLimit}")
private int assignmentLimit;

// Name of the Impala database holding the publication/assignment/payload/attempt tables.
@Value("${services.pdfaggregation.controller.db.databaseName}")
private String databaseName;
|
|
|
|
@GetMapping("")
|
|
public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
|
|
|
|
// As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
|
|
if ( MALICIOUS_INPUT_STRING.matcher(workerId).matches() ) {
|
|
String errorMsg = "Possibly malicious \"workerId\" received: " + workerId;
|
|
logger.error(errorMsg);
|
|
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
|
|
}
|
|
|
|
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + assignmentLimit);
|
|
|
|
// Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
|
|
|
|
// Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server.
|
|
int assignmentsLimit = workerAssignmentsLimit;
|
|
if ( assignmentsLimit == 0 ) {
|
|
String errorMsg = "The given \"workerAssignmentsLimit\" was ZERO!";
|
|
logger.error(errorMsg);
|
|
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
|
|
} else if ( assignmentsLimit > assignmentLimit ) {
|
|
logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + assignmentLimit + "). Will use the Controller's limit.");
|
|
assignmentsLimit = assignmentLimit;
|
|
}
|
|
|
|
String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
|
|
"from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" +
|
|
"select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
|
|
"from " + databaseName + ".publication p\n" +
|
|
"join " + databaseName + ".publication_urls pu on pu.id=p.id\n" +
|
|
"join " + databaseName + ".datasource d on d.id=p.datasourceid\n" +
|
|
"left outer join (select count(a.id) as counts, a.id from " + databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" +
|
|
"left outer join (\n" +
|
|
" select a.id, a.original_url from " + databaseName + ".assignment a\n" +
|
|
" union all\n" +
|
|
" select pl.id, pl.original_url from " + databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" +
|
|
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecord + " and not exists (select 1 from " + databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
|
|
"limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" +
|
|
"order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
|
|
"limit " + assignmentsLimit + ") as findAssignmentsQuery";
|
|
|
|
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
|
|
|
|
String createAssignmentsQuery = "create table " + databaseName + ".current_assignment as \n" + findAssignmentsQuery;
|
|
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + databaseName + ".current_assignment";
|
|
String getAssignmentsQuery = "select * from " + databaseName + ".current_assignment";
|
|
|
|
List<Assignment> assignments = new ArrayList<>();
|
|
|
|
ImpalaConnector.databaseLock.lock();
|
|
|
|
try {
|
|
jdbcTemplate.execute(createAssignmentsQuery);
|
|
} catch (Exception sqle) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
}
|
|
|
|
try {
|
|
jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
|
|
} catch (Exception sqle) {
|
|
String errorMsg = dropCurrentAssignmentTable();
|
|
|
|
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, sqle);
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
}
|
|
|
|
Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
|
|
|
|
try {
|
|
jdbcTemplate.query(getAssignmentsQuery, new RowCallbackHandler() {
|
|
@Override
|
|
public void processRow(ResultSet rs) throws SQLException {
|
|
Assignment assignment = new Assignment();
|
|
assignment.setWorkerId(workerId);
|
|
assignment.setTimestamp(timestamp);
|
|
Datasource datasource = new Datasource();
|
|
try { // For each of the 4 columns returned. The indexing starts from 1
|
|
assignment.setId(rs.getString(1));
|
|
assignment.setOriginalUrl(rs.getString(2));
|
|
datasource.setId(rs.getString(3));
|
|
datasource.setName(rs.getString(4));
|
|
} catch (SQLException sqle) {
|
|
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
|
|
}
|
|
assignment.setDatasource(datasource);
|
|
assignments.add(assignment);
|
|
}
|
|
});
|
|
} catch (Exception e) {
|
|
String errorMsg = dropCurrentAssignmentTable();
|
|
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
|
|
logger.error(errorMsg, e);
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
}
|
|
|
|
int assignmentsSize = assignments.size();
|
|
if ( assignmentsSize == 0 ) {
|
|
String errorMsg = dropCurrentAssignmentTable();
|
|
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
ImpalaConnector.databaseLock.unlock();
|
|
maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests.
|
|
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecord + " for the next requests.";
|
|
logger.error(errorMsg);
|
|
|
|
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
|
|
} else if ( assignmentsSize < assignmentsLimit ) {
|
|
maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests.
|
|
logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecord + " for the next requests.");
|
|
}
|
|
|
|
logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");
|
|
|
|
|
|
// Write the Assignment details to the assignment-table.
|
|
|
|
// The "timestamp" is generated from the Java-code, so it's in no way provided by a 3rd party.
|
|
String insertAssignmentsQuery = "insert into " + databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n"
|
|
+ "from (\n select pubid, url from " + databaseName + ".current_assignment) as pub_data";
|
|
|
|
try {
|
|
jdbcTemplate.execute(insertAssignmentsQuery);
|
|
} catch (Exception sqle) {
|
|
String errorMsg = dropCurrentAssignmentTable();
|
|
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
ImpalaConnector.databaseLock.unlock();
|
|
errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, sqle);
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
}
|
|
|
|
String errorMsg = dropCurrentAssignmentTable();
|
|
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
|
|
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
|
|
|
|
String mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", "", null);
|
|
if ( mergeErrorMsg != null ) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
|
|
}
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
|
|
logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
|
|
return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
|
|
}
|
|
|
|
@PostMapping("addWorkerReport")
|
|
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {
|
|
|
|
if ( workerReport == null ) {
|
|
String errorMsg = "No \"WorkerReport\" was given!";
|
|
logger.error(errorMsg);
|
|
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
|
|
}
|
|
|
|
String curWorkerId = workerReport.getWorkerId();
|
|
if ( curWorkerId == null ) {
|
|
String errorMsg = "No \"workerId\" was included inside the \"WorkerReport\"!";
|
|
logger.error(errorMsg);
|
|
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
|
|
}
|
|
|
|
// As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
|
|
if ( MALICIOUS_INPUT_STRING.matcher(curWorkerId).matches() ) {
|
|
String errorMsg = "Possibly malicious \"workerId\" received: " + curWorkerId;
|
|
logger.error(errorMsg);
|
|
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
|
|
}
|
|
|
|
List<UrlReport> urlReports = workerReport.getUrlReports();
|
|
if ( (urlReports == null) || urlReports.isEmpty() ) {
|
|
String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + curWorkerId + "\" was empty (without any UrlReports)!";
|
|
logger.error(errorMsg);
|
|
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
|
|
}
|
|
|
|
long curReportAssignments = workerReport.getAssignmentRequestCounter();
|
|
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
|
|
|
|
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
|
|
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
|
|
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!");
|
|
}
|
|
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
|
|
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
|
|
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
|
|
fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
|
|
}
|
|
|
|
ImpalaConnector.databaseLock.lock();
|
|
|
|
// Store the workerReport into the database.
|
|
String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
|
|
String insertIntoAttemptBaseQuery = "INSERT INTO " + databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
|
|
|
|
String payloadErrorMsg = null;
|
|
int failedCount = 0;
|
|
|
|
// TODO - Think about handling this loop with multiple threads..
|
|
// The Impala-server will handle the synchronization itself..
|
|
// Check online what happens with "statement.setPoolable()" does it improves speed? in multi or also in single thread?
|
|
|
|
for ( UrlReport urlReport : urlReports ) {
|
|
Payload payload = urlReport.getPayload();
|
|
|
|
if ( payload == null ) {
|
|
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments);
|
|
payloadErrorMsg = (++failedCount) + " urlReports failed to be processed because they had no payload!";
|
|
|
|
continue;
|
|
}
|
|
|
|
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
|
|
Object[] args = new Object[] {
|
|
payload.getId(), payload.getOriginal_url(), payload.getActual_url(), payload.getTimestamp_acquired(),
|
|
payload.getMime_type(), payload.getSize() != null?String.valueOf(payload.getSize()):null, payload.getHash(),
|
|
payload.getLocation(), payload.getProvenance()};
|
|
int[] argTypes = new int[] {
|
|
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
|
|
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
|
|
|
|
jdbcTemplate.update(insertIntoPayloadBaseQuery, args, argTypes);
|
|
|
|
} catch (Exception sqle) {
|
|
logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": ", sqle);
|
|
}
|
|
|
|
Error error = urlReport.getError();
|
|
if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop).
|
|
logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
|
|
error = new Error(null, null);
|
|
}
|
|
|
|
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
|
|
Object[] args = new Object[] {
|
|
payload.getId(), payload.getOriginal_url(), payload.getTimestamp_acquired(),
|
|
urlReport.getStatus().toString(), String.valueOf(error.getType()), error.getMessage()};
|
|
int[] argTypes = new int[] {
|
|
Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
|
|
Types.VARCHAR};
|
|
|
|
jdbcTemplate.update(insertIntoAttemptBaseQuery, args, argTypes);
|
|
} catch (Exception sqle) {
|
|
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": ", sqle.getMessage());
|
|
}
|
|
}//end for-loop
|
|
|
|
|
|
if ( payloadErrorMsg != null )
|
|
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables, although " + payloadErrorMsg + " Going to merge the parquet files for those tables.");
|
|
else
|
|
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables.");
|
|
|
|
String mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null);
|
|
if ( mergeErrorMsg != null ) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
|
|
}
|
|
|
|
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
|
|
if ( mergeErrorMsg != null ) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
|
|
}
|
|
|
|
// This will delete the rows of the "assignment" table which refer to the curWorkerId. As we have non-kudu Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
|
|
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
|
|
// We do not need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
|
|
mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
|
|
if ( mergeErrorMsg != null ) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
|
|
}
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
logger.debug("Finished merging the database tables.");
|
|
return ResponseEntity.status(HttpStatus.OK).body(payloadErrorMsg);
|
|
}
|
|
|
|
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
|
|
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
|
|
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException {
|
|
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
|
|
sb.append(baseInsertQuery);
|
|
for ( int i=1; i <= dataSize; ++i ) {
|
|
sb.append("(");
|
|
for ( int j=1; j <= numParamsPerRow; ++j ) {
|
|
sb.append("?");
|
|
if ( j < numParamsPerRow )
|
|
sb.append(",");
|
|
}
|
|
sb.append(")");
|
|
if ( i < dataSize )
|
|
sb.append(",");
|
|
}
|
|
|
|
PreparedStatement preparedInsertStatement;
|
|
try { // We use a "PreparedStatement" to do insertions, for security reasons.
|
|
preparedInsertStatement = con.prepareStatement(sb.toString());
|
|
} catch (SQLException sqle) {
|
|
String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n";
|
|
logger.error(errorMsg + sqle.getMessage());
|
|
throw new RuntimeException(errorMsg);
|
|
}
|
|
return preparedInsertStatement;
|
|
}
|
|
|
|
private String dropCurrentAssignmentTable() {
|
|
String dropCurrentAssignmentsQuery = "DROP TABLE " + databaseName + ".current_assignment PURGE";
|
|
|
|
try {
|
|
jdbcTemplate.execute(dropCurrentAssignmentsQuery);
|
|
return null;
|
|
} catch (Exception sqle) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
return ImpalaConnector.handlePreparedStatementException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, sqle);
|
|
}
|
|
}
|
|
}
|