UrlsController/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java

package eu.openaire.urls_controller.controllers;

import com.google.common.collect.Lists;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

@RestController
@RequestMapping("/urls")
public class UrlController {

    private static final Logger logger = LoggerFactory.getLogger(UrlController.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private FileUtils fileUtils;

    private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);

    // Matches any input containing single-quotes, double-quotes, backticks or semicolons, which could be used for SQL injection.
    private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");

    @Value("${services.pdfaggregation.controller.assignmentLimit}")
    private int assignmentLimit;

    private final AtomicInteger maxAttemptsPerRecordAtomic;

    public UrlController(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord) {
        maxAttemptsPerRecordAtomic = new AtomicInteger(maxAttemptsPerRecord);
    }
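
    /**
     * Gathers up to "assignmentsLimit" not-yet-handled id-url pairs from the database, records them in the
     * "assignment" table and returns them to the requesting Worker, along with an incremented batch-counter.
     */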
    @GetMapping("")
    public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {

        // As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
        if ( MALICIOUS_INPUT_STRING.matcher(workerId).matches() ) {
            String errorMsg = "Possibly malicious \"workerId\" received: " + workerId;
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
        }

        logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + assignmentLimit);

        // Create the Assignments from the id-urls stored in the database, up to the given "assignmentsLimit".
        // Sanitize the "assignmentsLimit", in order to avoid overloading the Controller or the Impala server.
        int assignmentsLimit = workerAssignmentsLimit;
        if ( assignmentsLimit == 0 ) {
            String errorMsg = "The given \"workerAssignmentsLimit\" was ZERO!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        } else if ( assignmentsLimit > assignmentLimit ) {
            logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + assignmentLimit + "). Will use the Controller's limit.");
            assignmentsLimit = assignmentLimit;
        }
        String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
                "from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count\n" +
                "from (\n" +
                "select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
                "from " + ImpalaConnector.databaseName + ".publication p\n" +
                "join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
                "join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
                "left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" +
                "left outer join (select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
                "union all\n" +
                "select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl)\n" +
                "as existing on existing.id=p.id and existing.original_url=pu.url\n" +
                "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() +
                "\nand not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
                "limit " + (assignmentsLimit * 10) + ")\n" +
                "as non_distinct_results\n" +
                "order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
                "limit " + assignmentsLimit + ")\n" +
                "as findAssignmentsQuery";
        // The "order by" at the end makes sure the older attempted records will be re-attempted after a long time.
        //logger.debug(findAssignmentsQuery);    // DEBUG!

        String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
        String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
        String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";
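
        // The new assignments are first materialized in the temporary "current_assignment" table (dropped again at the end),
        // so that the same result-set can be read back to the worker and bulk-inserted into the "assignment" table,
        // without rerunning the expensive query. "COMPUTE STATS" refreshes the table's statistics, which helps Impala plan the queries that read it.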
        List<Assignment> assignments = new ArrayList<>(assignmentsLimit);

        ImpalaConnector.databaseLock.lock();

        try {
            jdbcTemplate.execute(createCurrentAssignmentsQuery);
        } catch (Exception e) {
            String errorMsg = ImpalaConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e);
            String tmpErrMsg = dropCurrentAssignmentTable();    // The table may be partially created, e.g. in case of an "out of memory" error in the database-server during the creation, resulting in an empty table (yes, it has happened).
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        try {
            jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
        } catch (Exception e) {
            String errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e);
            String tmpErrMsg = dropCurrentAssignmentTable();
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }
        Timestamp timestamp = new Timestamp(System.currentTimeMillis());    // Create it once here, so that all the current records get the same timestamp.

        try {
            jdbcTemplate.query(getAssignmentsQuery, rs -> {
                Assignment assignment = new Assignment();
                assignment.setWorkerId(workerId);
                assignment.setTimestamp(timestamp);
                Datasource datasource = new Datasource();
                try {    // For each of the 4 columns returned. The indexing starts from 1.
                    assignment.setId(rs.getString(1));
                    assignment.setOriginalUrl(rs.getString(2));
                    datasource.setId(rs.getString(3));
                    datasource.setName(rs.getString(4));    // Note: the 4th column of "findAssignmentsQuery" is "datasourcetype", so the type-value is stored here; if the datasource-name is wanted instead, the query should select it.
                } catch (SQLException sqle) {
                    logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
                }
                assignment.setDatasource(datasource);
                assignments.add(assignment);
            });
        } catch (Exception e) {
            String errorMsg = ImpalaConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e);
            String tmpErrMsg = dropCurrentAssignmentTable();
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }
        int assignmentsSize = assignments.size();
        if ( assignmentsSize == 0 ) {
            String errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.";
            logger.error(errorMsg);
            String tmpErrMsg = dropCurrentAssignmentTable();
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        } else if ( assignmentsSize < assignmentsLimit ) {
            logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.");
        }

        logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");

        // Write the Assignment details to the "assignment" table.
        // The "timestamp" is generated by the Java-code, so it is in no way provided by a 3rd party.
        String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n"
                + "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data";

        try {
            jdbcTemplate.execute(insertAssignmentsQuery);
        } catch (Exception e) {
            String errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e);
            String tmpErrMsg = dropCurrentAssignmentTable();
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        String errorMsg = dropCurrentAssignmentTable();
        if ( errorMsg != null ) {
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");

        String mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", "", null);
        if ( mergeErrorMsg != null ) {
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
        }

        ImpalaConnector.databaseLock.unlock();

        long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
        logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
        return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
    }
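
    // A fixed pool for running the report-inserts in parallel; its size matches the number of tasks created per report (one "payload" task plus about five "attempt" sub-list tasks).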
    public static ExecutorService insertsExecutor = Executors.newFixedThreadPool(6);
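
    /**
     * Receives the WorkerReport for a batch of assignments: it uploads the full-text files from the Worker,
     * inserts the attempts and payloads into the database in parallel and finally merges the parquet files of the affected tables.
     */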
    @PostMapping("addWorkerReport")
    public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {

        if ( workerReport == null ) {
            String errorMsg = "No \"WorkerReport\" was given!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        }

        String curWorkerId = workerReport.getWorkerId();
        if ( curWorkerId == null ) {
            String errorMsg = "No \"workerId\" was included inside the \"WorkerReport\"!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        }

        // As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
        if ( MALICIOUS_INPUT_STRING.matcher(curWorkerId).matches() ) {
            String errorMsg = "Possibly malicious \"workerId\" received: " + curWorkerId;
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
        }

        int sizeOfUrlReports = 0;
        List<UrlReport> urlReports = workerReport.getUrlReports();
        if ( (urlReports == null) || ((sizeOfUrlReports = urlReports.size()) == 0) ) {
            String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + curWorkerId + "\" was empty (without any UrlReports)!";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
        }

        long curReportAssignments = workerReport.getAssignmentRequestCounter();
        logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + sizeOfUrlReports + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
        // Before continuing with the inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
        FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
        if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!");
        }
        else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
            logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignments);
            // The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available, and continue with writing the attempts and the payloads.
            fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false);
        }
        else
            logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments);

        // Store the workerReport into the database. We use "PreparedStatements" to do the insertions, for security and valid-SQL-syntax reasons.
        final String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
        final int[] payloadArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};

        final String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
        final int[] attemptArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};

        final AtomicInteger failedCount = new AtomicInteger(0);

        // Split the "UrlReports" into sub-lists, each one containing roughly 20% of the total amount, e.g. 1,000 urlReports give 5 sub-lists of 200 records each.
        int sizeOfEachSubList = (int)(sizeOfUrlReports * 0.2);
        if ( sizeOfEachSubList == 0 )
            sizeOfEachSubList = 1;    // Guard against very small reports; "Lists.partition()" rejects a sub-list size of zero.
        List<List<UrlReport>> subLists = Lists.partition(urlReports, sizeOfEachSubList);

        List<Callable<Void>> callableTasks = new ArrayList<>(6);
        // One task will handle the inserts to the "payload" table and the rest will each insert one sub-list into the "attempt" table. This way there will be as little blocking as possible (from the part of Impala).
        callableTasks.add(() -> {    // Handle the inserts to the "payload" table. It iterates over all the urlReports, but only the records with an uploaded full-text get inserted.
            for ( UrlReport urlReport : urlReports ) {
                Payload payload = urlReport.getPayload();
                if ( payload == null ) {
                    logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport);
                    continue;
                }
                String fileLocation = payload.getLocation();
                if ( fileLocation == null )    // We want only the records with uploaded full-texts in the "payload" table.
                    continue;
                try {
                    Long size = payload.getSize();
                    Object[] args = new Object[] {payload.getId(), payload.getOriginal_url(), payload.getActual_url(), payload.getTimestamp_acquired(),
                            payload.getMime_type(), (size != null) ? String.valueOf(size) : null, payload.getHash(), fileLocation, payload.getProvenance()};
                    jdbcTemplate.update(insertIntoPayloadBaseQuery, args, payloadArgTypes);
                } catch (Exception e) {
                    logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": " + e.getMessage());
                    failedCount.incrementAndGet();
                }
            }
            return null;
        });
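
        // Splitting the attempt-inserts into sub-lists lets multiple tasks insert concurrently, which reduces the total time of writing the report (see the pool-size of "insertsExecutor").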
        int subListsSize = subLists.size();
        for ( int i = 0; i < subListsSize; ++i ) {
            int finalI = i;
            callableTasks.add(() -> {    // Handle the inserts of one sub-list to the "attempt" table.
                runInsertsToAttemptTable(subLists.get(finalI), curReportAssignments, insertIntoAttemptBaseQuery, attemptArgTypes, failedCount);
                return null;
            });
        }

        ImpalaConnector.databaseLock.lock();
        try {    // Invoke all the tasks and wait for them to finish, before moving on to the table-merges.
            insertsExecutor.invokeAll(callableTasks);
        } catch (InterruptedException ie) {    // In this case, any unfinished tasks are cancelled.
            logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());
            // This is a very rare case. At the moment, we just move on with the table-merging.
        } catch (Exception e) {
            ImpalaConnector.databaseLock.unlock();
            String errorMsg = "Unexpected error when inserting into the \"payload\" and \"attempt\" tables in parallel! " + e.getMessage();
            logger.error(errorMsg, e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        int failedQueries = failedCount.get();
        String failedQueriesMsg = failedQueries + " out of " + (sizeOfUrlReports * 2) + " failed to be processed!";    // Each urlReport may produce up to two insert-queries: one payload and one attempt.
        logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables" + ((failedQueries > 0) ? (", although " + failedQueriesMsg) : ".")
                + " Going to merge the parquet files for those tables.");
        String mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null);
        if ( mergeErrorMsg != null ) {
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
        }

        mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
        if ( mergeErrorMsg != null ) {
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
        }

        // This will delete the rows of the "assignment" table which refer to the curWorkerId. As we have non-kudu Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
        // Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp-table becomes the "assignment" table.
        // We don't need to keep the assignment-info anymore; the "findAssignmentsQuery" checks the "payload" table for previously handled tasks.
        mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
        if ( mergeErrorMsg != null ) {
            ImpalaConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
        }

        ImpalaConnector.databaseLock.unlock();
        logger.debug("Finished merging the database tables.");
        return ResponseEntity.status(HttpStatus.OK).body(failedQueriesMsg);
    }
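
    /**
     * Drops the temporary "current_assignment" table.
     * @return "null" if the drop succeeded, otherwise the error-message.
     */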
    private String dropCurrentAssignmentTable() {
        String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE";
        try {
            jdbcTemplate.execute(dropCurrentAssignmentsQuery);
            return null;    // All good. No error-message.
        } catch (Exception e) {
            return ImpalaConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e);
        }
    }
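
    /**
     * Inserts the given sub-list of UrlReports into the "attempt" table, incrementing "failedCount" for every failed insertion.
     */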
    private void runInsertsToAttemptTable(List<UrlReport> urlReports, long curReportAssignments, String insertIntoAttemptBaseQuery, int[] attemptArgTypes, AtomicInteger failedCount)
    {
        for ( UrlReport urlReport : urlReports ) {
            Payload payload = urlReport.getPayload();
            if ( payload == null ) {
                logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport);
                continue;
            }
            Error error = urlReport.getError();
            if ( error == null ) {    // A bit rare to happen, but we should handle it (otherwise NPEs will be thrown for the rest of this loop).
                logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
                error = new Error(null, null);
            }
            try {    // We use a "PreparedStatement" to do the insertion, for security and valid-SQL-syntax reasons.
                Object[] args = new Object[] {payload.getId(), payload.getOriginal_url(), payload.getTimestamp_acquired(),
                        urlReport.getStatus().toString(), String.valueOf(error.getType()), error.getMessage()};
                jdbcTemplate.update(insertIntoAttemptBaseQuery, args, attemptArgTypes);
            } catch (Exception e) {
                logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + e.getMessage());
                failedCount.incrementAndGet();
            }
        }
    }
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException {
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
sb.append(baseInsertQuery);
for ( int i=1; i <= dataSize; ++i ) {
sb.append("(");
for ( int j=1; j <= numParamsPerRow; ++j ) {
sb.append("?");
if ( j < numParamsPerRow )
sb.append(",");
}
sb.append(")");
if ( i < dataSize )
sb.append(",");
}
PreparedStatement preparedInsertStatement;
try { // We use a "PreparedStatement" to do insertions, for security reasons.
preparedInsertStatement = con.prepareStatement(sb.toString());
} catch (SQLException sqle) {
String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n";
logger.error(errorMsg + sqle.getMessage());
throw new RuntimeException(errorMsg);
}
return preparedInsertStatement;
}
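
    /*
     * A minimal usage sketch for "constructLargeInsertQuery" (kept as a comment, since the current Impala JDBC driver
     * hits the "Out of memory"-error mentioned above). The base-query is assumed to end in "VALUES ", as the method
     * appends the "(?, ..., ?)" groups itself; the "connection" and "attemptRows" variables below are hypothetical:
     *
     *   String baseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES ";
     *   try ( PreparedStatement ps = constructLargeInsertQuery(connection, baseQuery, attemptRows.size(), 6) ) {
     *       int index = 1;
     *       for ( Object[] row : attemptRows )    // Bind the 6 parameters of each row, in order.
     *           for ( Object value : row )
     *               ps.setObject(index++, value);
     *       ps.executeUpdate();
     *   }
     */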
}