package eu.openaire.urls_controller.controllers;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
public class UrlController {
private static final Logger logger = LoggerFactory.getLogger(UrlController.class);
private JdbcTemplate jdbcTemplate;
private FileUtils fileUtils;
private ParquetFileUtils parquetFileUtils;
public static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");
private int assignmentLimit;
private final AtomicInteger maxAttemptsPerRecordAtomic;
public UrlController(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord) {
maxAttemptsPerRecordAtomic = new AtomicInteger(maxAttemptsPerRecord);
public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
// As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
if ( MALICIOUS_INPUT_STRING.matcher(workerId).matches() ) {
String errorMsg = "Possibly malicious \"workerId\" received: " + workerId;
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " assignments. The assignments-limit of the controller is: " + assignmentLimit);
// Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
// Sanitize the "assignmentsLimit". Do not let an overload happen in the Controller's or the Impala's server.
int assignmentsLimit = workerAssignmentsLimit;
if ( assignmentsLimit == 0 ) {
String errorMsg = "The given \"workerAssignmentsLimit\" was ZERO!";
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
} else if ( assignmentsLimit > assignmentLimit ) {
logger.warn("The given \"workerAssignmentsLimit\" (" + workerAssignmentsLimit + ") was larger than the Controller's limit (" + assignmentLimit + "). Will use the Controller's limit.");
assignmentsLimit = assignmentLimit;
String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
"from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count\n" +
"from (\n" +
"select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
"from " + ImpalaConnector.databaseName + ".publication p\n" +
"join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
"join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
"left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" +
"left outer join (select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
"union all\n" +
"select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl)\n" +
"as existing on existing.id=p.id and existing.original_url=pu.url\n" +
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() +
"\nand not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
"limit " + (assignmentsLimit * 10) + ")\n" +
"as non_distinct_results\n" +
"order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
"limit " + assignmentsLimit + ")\n" +
"as findAssignmentsQuery";
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
//logger.debug(findAssignmentsQuery); // DEBUG!
String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";
List<Assignment> assignments = new ArrayList<>(assignmentsLimit);
String errorMsg = createAndInitializeCurrentAssignmentsTable(findAssignmentsQuery);
if ( errorMsg != null ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
long timestampMillis = System.currentTimeMillis();
Timestamp timestamp = new Timestamp(timestampMillis); // Store it here, in order to have the same for all current records.
try {
jdbcTemplate.query(getAssignmentsQuery, rs -> {
Assignment assignment = new Assignment();
Datasource datasource = new Datasource();
try { // For each of the 4 columns returned. The indexing starts from 1
} catch (SQLException sqle) {
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
} catch (Exception e) {
errorMsg = ImpalaConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 ) {
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.";
String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null ) {
errorMsg += "\n" + tmpErrMsg; // The additional error-msg is already logged.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} else
return ResponseEntity.status(HttpStatus.MULTI_STATUS).body(new AssignmentsResponse((long) -1, null));
} else if ( assignmentsSize < assignmentsLimit ) {
logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.");
logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");
// Write the Assignment details to the assignment-table.
String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', " + timestampMillis + "\n"
+ "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data";
try {
} catch (Exception e) {
errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
errorMsg = dropCurrentAssignmentTable();
if ( errorMsg != null ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
String mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", "", null);
if ( mergeErrorMsg != null ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
public static final ExecutorService insertsExecutor = Executors.newFixedThreadPool(6);
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {
if ( workerReport == null ) {
String errorMsg = "No \"WorkerReport\" was given!";
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
String curWorkerId = workerReport.getWorkerId();
if ( curWorkerId == null ) {
String errorMsg = "No \"workerId\" was included inside the \"WorkerReport\"!";
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
// As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
if ( MALICIOUS_INPUT_STRING.matcher(curWorkerId).matches() ) {
String errorMsg = "Possibly malicious \"workerId\" received: " + curWorkerId;
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
int sizeOUrlReports = 0;
List<UrlReport> urlReports = workerReport.getUrlReports();
if ( (urlReports == null) || ((sizeOUrlReports = urlReports.size()) == 0) ) {
String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + curWorkerId + "\" was empty (without any UrlReports)!";
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
long curReportAssignments = workerReport.getAssignmentRequestCounter();
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + sizeOUrlReports + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!");
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignments);
// The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available.
fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false);
// We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store.
// We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
// The urls which give full-text (no matter if we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked-up to be checked again later.
logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments);
String currentParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + assignmentsBatchCounter.get() + File.separator;
java.nio.file.Path parquetDirPath = Paths.get(currentParquetPath);
if ( !Files.isDirectory(parquetDirPath) ) {
try {
} catch (Exception e) {
logger.error("", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage());
logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignments);
List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOUrlReports, curReportAssignments, currentParquetPath, uploadFullTextsResponse);
boolean hasAttemptParquetFileProblem = false;
boolean hasPayloadParquetFileProblem = false;
try { // Invoke all the tasks and wait for them to finish before moving to the next batch.
List<Future<ParquetReport>> futures = insertsExecutor.invokeAll(callableTasks);
SumParquetSuccess sumParquetSuccess = checkParquetFilesSuccess(futures);
ResponseEntity<?> errorResponseEntity = sumParquetSuccess.getResponseEntity();
if ( errorResponseEntity != null ) {
return errorResponseEntity; // The related log is already shown.
hasAttemptParquetFileProblem = sumParquetSuccess.isAttemptParquetFileProblem();
hasPayloadParquetFileProblem = sumParquetSuccess.isPayloadParquetFileProblem();
if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
throw new RuntimeException("All of the parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database, for batch-assignments_" + curReportAssignments);
else {
if ( hasAttemptParquetFileProblem )
logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignments);
else if ( hasPayloadParquetFileProblem )
logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload\", for batch-assignments_" + curReportAssignments);
logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files.");
// Load all the parquet files of each type into its table.
if ( ! hasAttemptParquetFileProblem )
hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts, "attempt");
if ( ! hasPayloadParquetFileProblem )
hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloads, "payload");
if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_" + curReportAssignments);
else if ( hasAttemptParquetFileProblem || hasPayloadParquetFileProblem )
logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload\" table, for batch-assignments_" + curReportAssignments);
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_" + curReportAssignments);
} catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled.
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());
// This is a very rare case. At the moment, we just move on with table-merging.
} catch (RuntimeException re) {
String errorMsg = re.getMessage();
String assignmentErrorMsg = deleteWorkerAssignments(curWorkerId);
if ( assignmentErrorMsg != null ) {
errorMsg += "\n" + assignmentErrorMsg;
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} catch (Exception e) {
String errorMsg = "Unexpected error when inserting into the \"payload\" and \"attempt\" tables in parallel! " + e.getMessage();
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
logger.debug("Deleting directory: " + currentParquetPath);
FileUtils.deleteDirectory(new File(currentParquetPath));
logger.debug("Going to merge the parquet files for the tables which were altered.");
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
String mergeErrorMsg;
if ( ! hasAttemptParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
if ( mergeErrorMsg != null ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
if ( ! hasPayloadParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null);
if ( mergeErrorMsg != null ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
mergeErrorMsg = deleteWorkerAssignments(curWorkerId);
if ( mergeErrorMsg != null ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
logger.debug("Finished merging the database tables.");
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful )
return ResponseEntity.status(HttpStatus.MULTI_STATUS).body("The full-text files failed to be acquired from the worker!");
return ResponseEntity.status(HttpStatus.OK).build();
private String deleteWorkerAssignments(String curWorkerId)
// This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
// We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
return fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
private String createAndInitializeCurrentAssignmentsTable(String findAssignmentsQuery)
String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
try {
} catch (Exception e) {
String errorMsg = ImpalaConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable(); // The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened).
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
return errorMsg;
try {
} catch (Exception e) {
String errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
return errorMsg;
return null; // All good.
private String dropCurrentAssignmentTable() {
String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE";
try {
return null; // All good. No error-message.
} catch (Exception e) {
return ImpalaConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e); // The error is already logged inside.
private static SumParquetSuccess checkParquetFilesSuccess(List<Future<ParquetReport>> futures)
int numOfAllAttemptParquetFileCreations = 0;
int numOfFailedAttemptParquetFileCreations = 0;
int numOfAllPayloadParquetFileCreations = 0;
int numOfFailedPayloadParquetFileCreations = 0;
for ( Future<ParquetReport> future : futures )
ParquetReport parquetReport = null;
try {
parquetReport = future.get();
boolean hasProblems = (! parquetReport.isSuccessful());
ParquetReport.ParquetType parquetType = parquetReport.getParquetType();
if ( parquetType.equals(ParquetReport.ParquetType.attempt) ) {
if ( hasProblems )
} else if ( parquetType.equals(ParquetReport.ParquetType.payload) ) {
numOfAllPayloadParquetFileCreations ++;
if ( hasProblems )
numOfFailedPayloadParquetFileCreations ++;
} else {
String errMsg = "An invalid \"ParquetReport.ParquetType\" was found: " + parquetType; // This should never happen, but anyway.
return new SumParquetSuccess(false, false, ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errMsg));
} catch (Exception e) {
logger.error("", e);
// We do not know if the failed "future" refers to a "payload" or to a "attempt".
// So we cannot increase a specific counter. That's ok, the only drawback if that we may try to "load" the non-existent data and get an exception.
} // End-for
boolean hasAttemptParquetFileProblem = (numOfFailedAttemptParquetFileCreations == numOfAllAttemptParquetFileCreations);
boolean hasPayloadParquetFileProblem = (numOfFailedPayloadParquetFileCreations == numOfAllPayloadParquetFileCreations);
return new SumParquetSuccess(hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, null);
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException {
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
for ( int i=1; i <= dataSize; ++i ) {
for ( int j=1; j <= numParamsPerRow; ++j ) {
if ( j < numParamsPerRow )
if ( i < dataSize )
PreparedStatement preparedInsertStatement;
try { // We use a "PreparedStatement" to do insertions, for security reasons.
preparedInsertStatement = con.prepareStatement(sb.toString());
} catch (SQLException sqle) {
String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n";
logger.error(errorMsg + sqle.getMessage());
throw new RuntimeException(errorMsg);
return preparedInsertStatement;