package eu.openaire.urls_controller.services;

import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import io.micrometer.core.annotation.Timed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.client.RestTemplate;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;


@Service
public class UrlsServiceImpl implements UrlsService {

    private static final Logger logger = LoggerFactory.getLogger(UrlsServiceImpl.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private FileUtils fileUtils;

    @Autowired
    private ParquetFileUtils parquetFileUtils;

    @Value("${services.pdfaggregation.controller.workerReportsDirPath}")
    private String workerReportsDirPath;

    public static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);

    private final AtomicInteger maxAttemptsPerRecordAtomic;

    private static String excludedDatasourceIDsStringList = null;

    public static final ExecutorService insertsExecutor = Executors.newFixedThreadPool(6);
    // TODO - Unify this ExecutorService with the hash-matching executorService, since one will ALWAYS be called after the other. So why have two ExecServices to manage?


    public UrlsServiceImpl(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord, BulkImport bulkImport)
    {
        maxAttemptsPerRecordAtomic = new AtomicInteger(maxAttemptsPerRecord);

        HashMap<String, BulkImport.BulkImportSource> bulkImportSources = new HashMap<>(bulkImport.getBulkImportSources());
        // The "bulkImportSources" map will not be null, as it is defined inside the "application.yml" file.
        // In case no bulkImport datasources are given, the "bulkImportSources" map will just be empty.
        if ( bulkImportSources.isEmpty() )
            return; // So the "excludedDatasourceIDsStringList"-code should be placed last in this constructor.
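        // Added illustrative note (not from the original source): assuming two hypothetical datasource-IDs,
        // e.g. "doar::1234" and "doar::5678", were configured as bulkImport-sources, the loop below would
        // collect them and build the exclusion-string:
        //     ("doar::1234", "doar::5678")
        // which is later injected into the "findAssignmentsQuery" as:  ... and d.id not in ("doar::1234", "doar::5678")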
        if ( logger.isTraceEnabled() )
            logger.trace("BulkImportSources:\n" + bulkImportSources);

        List<String> excludedIDs = new ArrayList<>();
        for ( BulkImport.BulkImportSource source : bulkImportSources.values() ) {
            String datasourceID = source.getDatasourceID();
            if ( (datasourceID == null) || datasourceID.isEmpty() )
                throw new RuntimeException("One of the bulk-imported datasourceIDs was not found! | source: " + source);
            excludedIDs.add(datasourceID);
        }

        int exclusionListSize = excludedIDs.size();  // This list will not be empty.

        // Prepare the "excludedDatasourceIDsStringList" to be used inside the "findAssignmentsQuery". Create the following string-pattern:
        // ("ID_1", "ID_2", ...)
        final StringBuilder sb = new StringBuilder((exclusionListSize * 46) + (exclusionListSize -1) +2 );
        sb.append("(");
        for ( int i=0; i < exclusionListSize; ++i ) {
            sb.append("\"").append(excludedIDs.get(i)).append("\"");
            if ( i < (exclusionListSize -1) )
                sb.append(", ");
        }
        sb.append(")");

        excludedDatasourceIDsStringList = sb.toString();
        logger.info("The following bulkImport-datasources will be excluded from crawling: " + excludedDatasourceIDsStringList);
    }


    @Timed(value = "getAssignments.time", description = "Time taken to return the assignments.")
    public ResponseEntity<?> getAssignments(String workerId, int assignmentsLimit)
    {
        // Create the Assignments from the id-urls stored in the database, up to the < assignmentsLimit >.
        String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcename\n" +    // Select the final sorted data with "assignmentsLimit".
                "from (select distinct pubid, url, datasourceid, datasourcename, attempt_count, pub_year\n" +    // Select the distinct id-url data. Beware that this will return duplicate id-url pairs, since one pair may be associated with multiple datasources.
                " from (select p.id as pubid, pu.url as url, pb.level as level, attempts.counts as attempt_count, p.year as pub_year, d.id as datasourceid, d.name as datasourcename\n" +    // Select all needed columns from the JOINs, order by "boost.level" and limit them to (assignmentsLimit * 10).
                " from " + DatabaseConnector.databaseName + ".publication p\n" +
                " join " + DatabaseConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
                " join " + DatabaseConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +    // This is needed for the "d.allow_harvest=true" check later on.
                " left outer join " + DatabaseConnector.databaseName + ".publication_boost pb\n" +
                " on p.id=pb.id\n" +
                " left outer join (select count(a.id) as counts, a.id from " + DatabaseConnector.databaseName + ".attempt a group by a.id) as attempts\n" +
                " on attempts.id=p.id\n" +
                " left outer join (\n" +
                " select a.id, a.original_url from " + DatabaseConnector.databaseName + ".assignment a\n" +
                " union all\n" +
                " select pl.id, pl.original_url from " + DatabaseConnector.databaseName + ".payload pl\n" +    // Here we access the payload-VIEW which includes the three payload-tables.
                " ) as existing\n" +
                " on existing.id=p.id and existing.original_url=pu.url\n" +
                " where d.allow_harvest=true and existing.id is null\n" +    // For records not found on "existing", the "existing.id" will be null.
                ((excludedDatasourceIDsStringList != null) ?    // If we have an exclusion-list, use it below.
(" and d.id not in " + excludedDatasourceIDsStringList + "\n") : "") + " and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + "\n" + " and not exists (select 1 from " + DatabaseConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" + " and pu.url != '' and pu.url is not null\n" + // Some IDs have empty-string urls, there are no "null" urls, but keep the relevant check for future-proofing. " order by coalesce(level, -1000) desc\n" + " limit " + (assignmentsLimit * 10) + "\n" + " ) as non_distinct_results\n" + " order by coalesce(attempt_count, 0), coalesce(pub_year, 0) desc, reverse(pubid), url\n" + // We also order by reverse "pubid" and "url", in order to get the exactly same records for consecutive runs, all things being equal. " limit " + assignmentsLimit + "\n" + ") as findAssignmentsQuery"; // The datasourceID and datasourceName are currently not used during the processing in the Worker. They may be used by the Worker, in the future to apply a datasource-specific aggregation plugin to take the full-texts quickly, instead of using the general crawling one. // However, the "datasourceid" is useful to be able to generate the fileNames for the S3, without needing to perform additional select queries (with JOINs) at that phase. // The "order by" in the end makes sure the older attempted records will be re-attempted after a long time. //logger.trace("findAssignmentsQuery:\n" + findAssignmentsQuery); // DEBUG! final String getAssignmentsQuery = "select * from " + DatabaseConnector.databaseName + ".current_assignment"; List assignments = new ArrayList<>(assignmentsLimit); long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet(); DatabaseConnector.databaseLock.lock(); // For performance reasons, we collect the returned assignments and create a temp-table with them, so that later they can be copied efficiently to the "a-bit-more-permanent" assignment table. // This way, we avoid having to manually insert thousands of assignment records there. Instead, we create a new table AS the results from the "findAssignmentsQuery". String errorMsg = createAndInitializeCurrentAssignmentsTable(findAssignmentsQuery); if ( errorMsg != null ) { DatabaseConnector.databaseLock.unlock(); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } long timestampMillis = System.currentTimeMillis(); Timestamp timestamp = new Timestamp(timestampMillis); // Store it here, in order to have the same for all current records. try { jdbcTemplate.query(getAssignmentsQuery, rs -> { Assignment assignment = new Assignment(); assignment.setWorkerId(workerId); assignment.setAssignmentsBatchCounter(curAssignmentsBatchCounter); assignment.setTimestamp(timestamp); Datasource datasource = new Datasource(); try { // For each of the 4 columns returned, do the following. 
                    // The column-indexing starts from 1.
                    assignment.setId(rs.getString(1));
                    assignment.setOriginalUrl(rs.getString(2));
                    datasource.setId(rs.getString(3));
                    datasource.setName(rs.getString(4));
                } catch (SQLException sqle) {
                    logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
                }
                assignment.setDatasource(datasource);
                assignments.add(assignment);
            });
        } catch (EmptyResultDataAccessException erdae) {
            errorMsg = "No results were returned for \"getAssignmentsQuery\":\n" + getAssignmentsQuery;
            String tmpErrMsg = dropCurrentAssignmentTable();
            DatabaseConnector.databaseLock.unlock();
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            logger.warn(errorMsg);
            return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
        } catch (Exception e) {
            errorMsg = DatabaseConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e);
            String tmpErrMsg = dropCurrentAssignmentTable();
            DatabaseConnector.databaseLock.unlock();
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        int assignmentsSize = assignments.size();
        if ( assignmentsSize == 0 ) {
            errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.";
            logger.error(errorMsg);
            String tmpErrMsg = dropCurrentAssignmentTable();
            DatabaseConnector.databaseLock.unlock();
            if ( tmpErrMsg != null ) {
                errorMsg += "\n" + tmpErrMsg;    // The additional error-msg is already logged.
                return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
            } else
                return ResponseEntity.status(HttpStatus.MULTI_STATUS).body(new AssignmentsResponse((long) -1, null));
        } else if ( assignmentsSize < assignmentsLimit )
            logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + ", for the next requests.");

        logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");

        // Write the Assignment details to the assignment-table.
        String insertAssignmentsQuery = "insert into " + DatabaseConnector.databaseName + ".assignment \n select pubid, url, '" + workerId + "', " + curAssignmentsBatchCounter + ", " + timestampMillis
                + "\nfrom " + DatabaseConnector.databaseName + ".current_assignment";

        try {
            jdbcTemplate.execute(insertAssignmentsQuery);
        } catch (Exception e) {
            errorMsg = DatabaseConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e);
            String tmpErrMsg = dropCurrentAssignmentTable();
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            DatabaseConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        errorMsg = dropCurrentAssignmentTable();
        if ( errorMsg != null ) {
            DatabaseConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
        }

        logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. "
                + "Going to merge the parquet files for this table.");

        String mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", "", null);
        if ( mergeErrorMsg != null ) {
            DatabaseConnector.databaseLock.unlock();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
        }

        DatabaseConnector.databaseLock.unlock();

        // Because one publication (id-url pair) can be connected with multiple datasources, the results returned from the query may contain duplicates.
        // So, we apply a post-processing step where we collect only one instance of each id-url pair and send it to the Worker.
        // We would certainly like the query to return assignments with unique id-url pairs, but for now there is no other solution.
        // There is no logic-error though, since the worker requests a "LIMIT" of 5000, not "exactly 5000", nor does the actual number it will process matter.
        // So, it will ask for "up to 5000" and the Controller may return 4700.
        // Since the average number of duplicates will be the same for each worker-request, none of the Workers will have any advantage over the others in the long run,
        // so there will be no conflicting performance data between them.
        final HashMap<String, Assignment> uniquePairsAndAssignments = new HashMap<>((int) (assignmentsLimit * 0.9));

        for ( Assignment assignment : assignments ) {
            uniquePairsAndAssignments.put(assignment.getId() + "_" + assignment.getOriginalUrl(), assignment);
            // This will just update the duplicate record with another "assignment object", containing a different datasource.
        }

        List<Assignment> distinctAssignments = new ArrayList<>(uniquePairsAndAssignments.values());
        int distinctAssignmentsSize = distinctAssignments.size();
        if ( logger.isTraceEnabled() )
            logger.trace("numDuplicates in returned assignments_" + curAssignmentsBatchCounter + " = " + (assignmentsSize - distinctAssignmentsSize));

        logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + distinctAssignmentsSize + " assignments to worker with ID: " + workerId + ".");
        return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, distinctAssignments));
    }


    public static final AtomicLong numOfWorkerReportsProcessed = new AtomicLong(0);

    @Timed(value = "addWorkerReport.time", description = "Time taken to add the WorkerReport.")
    public Boolean addWorkerReport(String curWorkerId, long curReportAssignmentsCounter, List<UrlReport> urlReports, int sizeOfUrlReports)
    {
        long currentNumOfWorkerReportsProcessed = numOfWorkerReportsProcessed.incrementAndGet();
        // Increment it when the report is actually being processed, not when it arrives at the endpoint.

        logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + curReportAssignmentsCounter);

        // Before continuing with the inserts, retrieve and upload the fullTexts from the Worker. Also, update the file-"location".
        FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, curReportAssignmentsCounter, curWorkerId);
        if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
            postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Problem with the Impala-database!");
            return false;
        }
        else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
            logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignmentsCounter);
            // The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available.
            fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false);
            // We write only the payloads which are connected with retrieved full-texts, uploaded to the S3-Object-Store.
            // We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
            // The urls which give a full-text (no matter that we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked up to be checked again later.
        } else
            logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignmentsCounter);

        String localParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignmentsCounter + File.separator;
        try {
            Files.createDirectories(Paths.get(localParquetPath));    // No-op if it already exists; it does not throw an "alreadyExistsException".
        } catch (Exception e) {
            String errorMsg = "Could not create the parquet-directory: " + localParquetPath;
            logger.error(errorMsg, e);
            postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg);
            return false;
        }

        logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignmentsCounter);

        // Create the HDFS subDirs for these assignments. Other background threads handling other assignments will not interfere with the loading of parquetFiles into the DB tables.
        String endingMkDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.mkDirsAndParams;
        if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingMkDirAndParams)
                || !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingMkDirAndParams) )
        {
            postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating the HDFS sub-directories for assignments_" + curReportAssignmentsCounter);
            return false;
        }

        List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, localParquetPath, uploadFullTextsResponse);

        boolean hasAttemptParquetFileProblem = false;
        boolean hasPayloadParquetFileProblem = false;

        try {    // Invoke all the tasks and wait for them to finish.
            List<Future<ParquetReport>> futures = insertsExecutor.invokeAll(callableTasks);

            SumParquetSuccess sumParquetSuccess = parquetFileUtils.checkParquetFilesSuccess(futures);
            ResponseEntity<?> errorResponseEntity = sumParquetSuccess.getResponseEntity();
            if ( errorResponseEntity != null ) {    // The related log is already shown.
                postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating or uploading the parquet files!");
                return false;
            }
            hasAttemptParquetFileProblem = sumParquetSuccess.isAttemptParquetFileProblem();
            hasPayloadParquetFileProblem = sumParquetSuccess.isPayloadParquetFileProblem();

            if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
                throw new RuntimeException("All of the parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database, for batch-assignments_" + curReportAssignmentsCounter);
            else {
                if ( hasAttemptParquetFileProblem )
                    logger.error("All of the attempt-parquet files failed to be created or uploaded! "
                            + "Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignmentsCounter);
                else if ( hasPayloadParquetFileProblem )
                    logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignmentsCounter);
                else
                    logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files.");
            }

            // Load all the parquet files of each type into its table.
            DatabaseConnector.databaseLock.lock();

            if ( ! hasAttemptParquetFileProblem )
                hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/", "attempt");

            if ( ! hasPayloadParquetFileProblem )
                hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignmentsCounter + "/", "payload_aggregated");

            DatabaseConnector.databaseLock.unlock();

            if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
                throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignmentsCounter);
            else if ( hasAttemptParquetFileProblem || hasPayloadParquetFileProblem )
                logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload_aggregated\" table, for batch-assignments_" + curReportAssignmentsCounter);
            else
                logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignmentsCounter);

        } catch (InterruptedException ie) {    // In this case, any unfinished tasks are cancelled.
            logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());
            // This is a very rare case. At the moment, we just move on with table-merging.
        } catch (RuntimeException re) {
            DatabaseConnector.databaseLock.lock();
            String assignmentErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter);
            DatabaseConnector.databaseLock.unlock();
            String errorMsg = re.getMessage();
            if ( assignmentErrorMsg != null )
                errorMsg += "\n" + assignmentErrorMsg;
            logger.error(errorMsg);
            postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg);
            return false;
        } catch (Exception e) {
            String errorMsg = "Unexpected error when inserting into the \"attempt\" and \"payload_aggregated\" tables in parallel! " + e.getMessage();
            logger.error(errorMsg, e);
            postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg);
            return false;    // No tables-merging is happening.
        } finally {
            logger.debug("Deleting parquet directory: " + localParquetPath);
            fileUtils.deleteDirectory(new File(localParquetPath));

            // Delete the HDFS subDirs for this Report.
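            // Added illustrative note (assumption, not from the original source): with the standard WebHDFS REST API,
            // the delete-request built below would look roughly like:
            //     http://<namenode-host>:<port>/webhdfs/v1/<parquet-dir>/assignments_<counter>/?op=DELETE&recursive=true&user.name=<user>
            // The exact base-url and parameters come from "ParquetFileUtils" ("webHDFSBaseUrl", "rmDirsAndParams"), so treat this only as a sketch of the call-shape.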
            String endingRmDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.rmDirsAndParams;
            if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingRmDirAndParams)
                    || !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingRmDirAndParams) )
            {
                logger.error("Error when deleting the HDFS sub-directories for assignments_" + curReportAssignmentsCounter);    // A directory-specific log has already appeared.
                // The failure to delete the assignments-subDirs is not that big of a problem and should not fail the whole process. So all goes as planned (the worker deletes any remaining files).
                // The worst case is that a few subDirs will be left behind in HDFS, although without their parquetFiles, since those have already been moved into the DB tables.
            }
        }

        // For every "numOfWorkers" assignment-batches that go to workers, we merge the tables, once a workerReport comes in.
        // After the first few increases of the "assignmentsBatchCounter", until all workers get assignment-batches,
        // there will always be a point when the counter is just before the "golden-value"; then one workerReport gets processed here, the counter is incremented by one and signals the merging-time.
        if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfWorkers.get()) == 0 )
            if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem) )
                return false;

        if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
            postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "The full-text files failed to be acquired from the worker!");
            return false;
        }

        // Notify the Worker that the processing of this report was successful, so that the Worker can delete the files.
        postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, null);
        return true;
    }


    private String createAndInitializeCurrentAssignmentsTable(String findAssignmentsQuery)
    {
        final String createCurrentAssignmentsQuery = "create table " + DatabaseConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
        final String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + DatabaseConnector.databaseName + ".current_assignment";

        try {
            jdbcTemplate.execute(createCurrentAssignmentsQuery);
        } catch (Exception e) {
            String errorMsg = DatabaseConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e);
            String tmpErrMsg = dropCurrentAssignmentTable();
            // The table may be partially created, e.g. in case of an "out of memory" error in the database-server during the creation, resulting in an empty table (yes, it has happened).
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            return errorMsg;
        }

        try {
            jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
        } catch (Exception e) {
            String errorMsg = DatabaseConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e);
            String tmpErrMsg = dropCurrentAssignmentTable();
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            return errorMsg;
        }

        return null;    // All good.
    }


    private String dropCurrentAssignmentTable()
    {
        String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + DatabaseConnector.databaseName + ".current_assignment PURGE";

        try {
            jdbcTemplate.execute(dropCurrentAssignmentsQuery);
            return null;    // All good. No error-message.
        } catch (Exception e) {
            return DatabaseConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e);    // The error is already logged inside.
        }
    }


    private boolean mergeWorkerRelatedTables(String curWorkerId, long curReportAssignmentsCounter, boolean hasAttemptParquetFileProblem, boolean hasPayloadParquetFileProblem)
    {
        logger.debug("Going to merge the parquet files for the tables which were altered.");
        // When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
        // This means that over time a table may accumulate thousands of parquet files and searching through them will be very slow. Thus, we merge them every now and then.

        String mergeErrorMsg;

        DatabaseConnector.databaseLock.lock();

        if ( ! hasAttemptParquetFileProblem ) {
            mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
            if ( mergeErrorMsg != null ) {
                DatabaseConnector.databaseLock.unlock();
                postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
                return false;
            }
        }

        if ( ! hasPayloadParquetFileProblem ) {
            mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null);
            if ( mergeErrorMsg != null ) {
                DatabaseConnector.databaseLock.unlock();
                postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
                return false;
            }
        }

        mergeErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter);
        if ( mergeErrorMsg != null ) {
            DatabaseConnector.databaseLock.unlock();
            postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
            return false;
        }

        DatabaseConnector.databaseLock.unlock();

        logger.debug("Finished merging the database tables.");
        return true;
    }


    private String deleteAssignmentsBatch(long givenAssignmentsBatchCounter)
    {
        // This deletes the rows of the "assignment" table which refer to the given "assignmentsBatchCounter". As we have non-KUDU Impala tables, the delete operation can only succeed through a "merge" operation of the rest of the data.
        // Only the rows referring to OTHER batch-counters get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp-table becomes the "assignment" table.
        // We don't need to keep the assignment-info anymore; the "findAssignmentsQuery" checks the "payload_aggregated" table for previously handled tasks.
        return fileUtils.mergeParquetFiles("assignment", " WHERE assignments_batch_counter != ", givenAssignmentsBatchCounter);
    }


    private static final RestTemplate restTemplate = new RestTemplate();

    private boolean postReportResultToWorker(String workerId, long assignmentRequestCounter, String errorMsg)
    {
        // Get the IP of this worker.
        WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
        if ( workerInfo == null ) {
            logger.error("Could not find any info for worker with id: \"" + workerId + "\".");
            return false;
        }

        String url = "http://" + workerInfo.getWorkerIP() + ":1881/api/addReportResultToWorker/" + assignmentRequestCounter;    // This workerIP will not be null.

        if ( logger.isTraceEnabled() )
            logger.trace("Going to \"postReportResultToWorker\": \"" + workerId + "\", for assignments_" + assignmentRequestCounter + ((errorMsg != null) ? "\nError: " + errorMsg : ""));

        try {
            ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, errorMsg, String.class);    // We may pass a "null" entity.
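            // Added note (not from the original source): "errorMsg" is sent as the raw request-body; a null body is how a successful
            // report-processing is signaled to the Worker. Only the HTTP status-code of the Worker's response is checked below;
            // any response-body is ignored.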
            int responseCode = responseEntity.getStatusCodeValue();
            if ( responseCode != HttpStatus.OK.value() ) {
                logger.error("HTTP-Connection problem with the submission of the \"postReportResultToWorker\" of worker \"" + workerId + "\" and assignments_" + assignmentRequestCounter + "! Error-code was: " + responseCode);
                return false;
            } else {
                fileUtils.deleteFile(workerReportsDirPath + "/" + workerId + "/" + workerId + "_assignments_" + assignmentRequestCounter + "_report.json");
                return true;
            }
        } catch (HttpServerErrorException hsee) {
            logger.error("The Worker \"" + workerId + "\" failed to handle the \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage());
            return false;
        } catch (Exception e) {
            errorMsg = "Error for \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + " to the Worker: " + workerId;
            Throwable cause = e.getCause();
            String exMsg;
            if ( (cause != null) && ((exMsg = cause.getMessage()) != null) && exMsg.contains("Connection refused") ) {
                logger.error(errorMsg + " | The worker has probably crashed, since we received a \"Connection refused\"!");
                workerInfo.setHasShutdown(true);
                // Avoid sending possible shutdown-requests later on. Also, show a specific message if this Worker requests new assignments in the future.
            } else
                logger.error(errorMsg, e);
            return false;
        }
    }


    // The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
    // Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
    private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException
    {
        StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow));
        // TODO - If this is ever used, make it a global Thread-Local var and "clear" (reset) it after each use.
        sb.append(baseInsertQuery);

        for ( int i=1; i <= dataSize; ++i ) {
            sb.append("(");
            for ( int j=1; j <= numParamsPerRow; ++j ) {
                sb.append("?");
                if ( j < numParamsPerRow )
                    sb.append(",");
            }
            sb.append(")");
            if ( i < dataSize )
                sb.append(",");
        }

        PreparedStatement preparedInsertStatement;
        try {    // We use a "PreparedStatement" to do the insertions, for security reasons.
            preparedInsertStatement = con.prepareStatement(sb.toString());
        } catch (SQLException sqle) {
            String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n";
            logger.error(errorMsg + sqle.getMessage());
            throw new RuntimeException(errorMsg);
        }

        return preparedInsertStatement;
    }

}
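/*
 * Added illustrative usage sketch for "constructLargeInsertQuery" (not part of the original class; the table,
 * the columns and the data below are hypothetical, and a plain JDBC "dataSource" is assumed to be available).
 * It only documents the intended call-pattern: build the multi-row VALUES statement, then fill the "?"
 * placeholders row by row and execute it. As noted above, this path currently hits an "Out of memory" error
 * in the Impala JDBC driver, so this is a sketch rather than working production code.
 *
 *     String baseInsertQuery = "INSERT INTO mydb.attempt_example (id, original_url, error_class) VALUES ";    // hypothetical
 *     List<String[]> rows = Arrays.asList(
 *             new String[]{"id_1", "http://example.org/1", "couldRetry"},    // hypothetical data
 *             new String[]{"id_2", "http://example.org/2", "noRetry"});
 *
 *     try ( Connection con = dataSource.getConnection();    // "dataSource" is an assumption.
 *           PreparedStatement ps = constructLargeInsertQuery(con, baseInsertQuery, rows.size(), 3) ) {
 *         int paramIndex = 1;
 *         for ( String[] row : rows )
 *             for ( String value : row )
 *                 ps.setString(paramIndex++, value);    // Parameter-indexing starts from 1.
 *         ps.executeUpdate();
 *     } catch (SQLException sqle) {
 *         logger.error("Error when executing the large insert-query.", sqle);
 *     }
 */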