534 lines
32 KiB
Java
534 lines
32 KiB
Java
package eu.openaire.urls_controller.services;
|
|
|
|
import eu.openaire.urls_controller.components.BulkImport;
|
|
import eu.openaire.urls_controller.configuration.ImpalaConnector;
|
|
import eu.openaire.urls_controller.controllers.UrlsController;
|
|
import eu.openaire.urls_controller.models.*;
|
|
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
|
|
import eu.openaire.urls_controller.util.FileUtils;
|
|
import eu.openaire.urls_controller.util.ParquetFileUtils;
|
|
import io.micrometer.core.annotation.Timed;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.dao.EmptyResultDataAccessException;
|
|
import org.springframework.http.HttpStatus;
|
|
import org.springframework.http.ResponseEntity;
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.web.client.HttpServerErrorException;
|
|
import org.springframework.web.client.RestTemplate;
|
|
|
|
import java.io.File;
|
|
import java.nio.file.Files;
|
|
import java.nio.file.Paths;
|
|
import java.sql.Connection;
|
|
import java.sql.PreparedStatement;
|
|
import java.sql.SQLException;
|
|
import java.sql.Timestamp;
|
|
import java.util.ArrayList;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.concurrent.Callable;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Future;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
@Service
|
|
public class UrlsServiceImpl implements UrlsService {
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(UrlsServiceImpl.class);
|
|
|
|
@Autowired
|
|
private JdbcTemplate jdbcTemplate;
|
|
|
|
@Autowired
|
|
private FileUtils fileUtils;
|
|
|
|
@Autowired
|
|
private ParquetFileUtils parquetFileUtils;
|
|
|
|
@Value("${services.pdfaggregation.controller.workerReportsDirPath}")
|
|
private String workerReportsDirPath;
|
|
|
|
public static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
|
|
|
|
private final AtomicInteger maxAttemptsPerRecordAtomic;
|
|
|
|
private static String excludedDatasourceIDsStringList = null;
|
|
|
|
public static final ExecutorService insertsExecutor = Executors.newFixedThreadPool(6);
|
|
// TODO - Unify this ExecutorService with the hash-matching executorService. Since one will ALWAYS be called after the other. So why having two ExecServices to handle?
|
|
|
|
|
|
/**
 * Creates the service, stores the initial "maxAttemptsPerRecord" and prepares the static exclusion-list
 * of the bulk-imported datasources, which is later embedded inside the "findAssignmentsQuery".
 *
 * @param maxAttemptsPerRecord the initial limit of attempts per record, taken from the configuration
 * @param bulkImport the component which provides the configured bulkImport-sources
 * @throws RuntimeException if any of the bulkImport-sources has a null or empty datasourceID
 */
public UrlsServiceImpl(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord, BulkImport bulkImport)
{
    maxAttemptsPerRecordAtomic = new AtomicInteger(maxAttemptsPerRecord);

    // The sources are expected to be defined inside the "application.yml" file, so the given map should not be null. It may be empty though.
    HashMap<String, BulkImport.BulkImportSource> bulkImportSources = new HashMap<>(bulkImport.getBulkImportSources());
    if ( bulkImportSources.isEmpty() )
        return; // Nothing to exclude. This early exit is why the exclusion-list code must stay LAST in this constructor.

    if ( logger.isTraceEnabled() )
        logger.trace("BulkImportSources:\n" + bulkImportSources);

    // Collect each datasourceID already wrapped in double-quotes, validating it on the way.
    List<String> quotedDatasourceIDs = new ArrayList<>(bulkImportSources.size());
    for ( BulkImport.BulkImportSource source : bulkImportSources.values() ) {
        String datasourceID = source.getDatasourceID();
        if ( (datasourceID == null) || datasourceID.isEmpty() )
            throw new RuntimeException("One of the bulk-imported datasourceIDs was not found! | source: " + source);
        quotedDatasourceIDs.add("\"" + datasourceID + "\"");
    }

    // Produce the string-pattern: ("ID_1", "ID_2", ...)
    excludedDatasourceIDsStringList = "(" + String.join(", ", quotedDatasourceIDs) + ")";
    logger.info("The following bulkImport-datasources will be excluded from crawling: " + excludedDatasourceIDsStringList);
}
|
|
|
|
|
|
@Timed(value = "getAssignments.time", description = "Time taken to return the assignments.")
|
|
public ResponseEntity<?> getAssignments(String workerId, int assignmentsLimit)
|
|
{
|
|
// Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
|
|
String findAssignmentsQuery =
|
|
"select pubid, url, datasourceid, datasourcename\n" + // The datsourceName is currently not used. It may be used by the Worker, in the future to apply a datasource-specific aggregation plugin to take the full-texts quickly, instead of using the general crawling one.
|
|
"from (select distinct pubid, url, datasourceid, datasourcename, attempt_count, pub_year\n" +
|
|
" from (select p.id as pubid, p.year as pub_year, pu.url as url, pb.level as level, d.id as datasourceid, d.name as datasourcename, attempts.counts as attempt_count\n" +
|
|
" from " + ImpalaConnector.databaseName + ".publication p\n" +
|
|
" join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
|
|
" join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
|
|
" left outer join " + ImpalaConnector.databaseName + ".publication_boost pb\n" +
|
|
" on p.id=pb.id\n" +
|
|
" left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts\n" +
|
|
" on attempts.id=p.id\n" +
|
|
" left outer join (\n" +
|
|
" select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
|
|
" union all\n" +
|
|
" select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl\n" + // Here we access the payload-VIEW which includes the three payload-tables.
|
|
" ) as existing\n" +
|
|
" on existing.id=p.id and existing.original_url=pu.url\n" +
|
|
" where d.allow_harvest=true and existing.id is null\n" + // For records not found on existing, the "existing.id" will be null.
|
|
((excludedDatasourceIDsStringList != null) ? // If we have an exclusion-list, use it below.
|
|
(" and d.id not in " + excludedDatasourceIDsStringList + "\n") : "") +
|
|
" and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + "\n" +
|
|
" and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
|
|
" and pu.url != '' and pu.url is not null\n" + // Some IDs have empty-string urls, there are no "null" urls, but keep the relevant check for future-proofing.
|
|
" order by coalesce(level, -1000) desc\n" +
|
|
" limit " + (assignmentsLimit * 10) + ")\n" +
|
|
" as non_distinct_results\n" +
|
|
" order by coalesce(attempt_count, 0), coalesce(pub_year, 0) desc, reverse(pubid), url\n" + // We also order by "id" and "url", in order to get the exactly same records for consecutive runs, all things being equal.
|
|
" limit " + assignmentsLimit + ")\n" +
|
|
"as findAssignmentsQuery";
|
|
|
|
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
|
|
//logger.trace("findAssignmentsQuery:\n" + findAssignmentsQuery); // DEBUG!
|
|
|
|
final String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";
|
|
|
|
List<Assignment> assignments = new ArrayList<>(assignmentsLimit);
|
|
|
|
long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
|
|
|
|
ImpalaConnector.databaseLock.lock();
|
|
|
|
String errorMsg = createAndInitializeCurrentAssignmentsTable(findAssignmentsQuery);
|
|
if ( errorMsg != null ) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
}
|
|
|
|
long timestampMillis = System.currentTimeMillis();
|
|
Timestamp timestamp = new Timestamp(timestampMillis); // Store it here, in order to have the same for all current records.
|
|
|
|
try {
|
|
jdbcTemplate.query(getAssignmentsQuery, rs -> {
|
|
Assignment assignment = new Assignment();
|
|
assignment.setWorkerId(workerId);
|
|
assignment.setAssignmentsBatchCounter(curAssignmentsBatchCounter);
|
|
assignment.setTimestamp(timestamp);
|
|
Datasource datasource = new Datasource();
|
|
try { // For each of the 4 columns returned. The indexing starts from 1
|
|
assignment.setId(rs.getString(1));
|
|
assignment.setOriginalUrl(rs.getString(2));
|
|
datasource.setId(rs.getString(3));
|
|
datasource.setName(rs.getString(4));
|
|
} catch (SQLException sqle) {
|
|
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
|
|
}
|
|
assignment.setDatasource(datasource);
|
|
assignments.add(assignment);
|
|
});
|
|
} catch (EmptyResultDataAccessException erdae) {
|
|
errorMsg = "No results were returned for \"getAssignmentsQuery\":\n" + getAssignmentsQuery;
|
|
String tmpErrMsg = dropCurrentAssignmentTable();
|
|
ImpalaConnector.databaseLock.unlock();
|
|
if ( tmpErrMsg != null )
|
|
errorMsg += "\n" + tmpErrMsg;
|
|
logger.warn(errorMsg);
|
|
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
|
|
} catch (Exception e) {
|
|
errorMsg = ImpalaConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e);
|
|
String tmpErrMsg = dropCurrentAssignmentTable();
|
|
ImpalaConnector.databaseLock.unlock();
|
|
if ( tmpErrMsg != null )
|
|
errorMsg += "\n" + tmpErrMsg;
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
}
|
|
|
|
int assignmentsSize = assignments.size();
|
|
if ( assignmentsSize == 0 ) {
|
|
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.";
|
|
logger.error(errorMsg);
|
|
String tmpErrMsg = dropCurrentAssignmentTable();
|
|
ImpalaConnector.databaseLock.unlock();
|
|
if ( tmpErrMsg != null ) {
|
|
errorMsg += "\n" + tmpErrMsg; // The additional error-msg is already logged.
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
} else
|
|
return ResponseEntity.status(HttpStatus.MULTI_STATUS).body(new AssignmentsResponse((long) -1, null));
|
|
} else if ( assignmentsSize < assignmentsLimit )
|
|
logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + ", for the next requests.");
|
|
|
|
logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");
|
|
|
|
// Write the Assignment details to the assignment-table.
|
|
String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pubid, url, '" + workerId + "', " + curAssignmentsBatchCounter + ", " + timestampMillis
|
|
+ "\nfrom " + ImpalaConnector.databaseName + ".current_assignment";
|
|
|
|
try {
|
|
jdbcTemplate.execute(insertAssignmentsQuery);
|
|
} catch (Exception e) {
|
|
errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e);
|
|
String tmpErrMsg = dropCurrentAssignmentTable();
|
|
if ( tmpErrMsg != null )
|
|
errorMsg += "\n" + tmpErrMsg;
|
|
ImpalaConnector.databaseLock.unlock();
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
}
|
|
|
|
errorMsg = dropCurrentAssignmentTable();
|
|
if ( errorMsg != null ) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
}
|
|
|
|
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
|
|
|
|
String mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", "", null);
|
|
if ( mergeErrorMsg != null ) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
|
|
}
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + ".");
|
|
return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
|
|
}
|
|
|
|
|
|
@Timed(value = "addWorkerReport.time", description = "Time taken to add the WorkerReport.")
|
|
public Boolean addWorkerReport(String curWorkerId, long curReportAssignmentsCounter, List<UrlReport> urlReports, int sizeOfUrlReports)
|
|
{
|
|
logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + curReportAssignmentsCounter);
|
|
|
|
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
|
|
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, curReportAssignmentsCounter, curWorkerId);
|
|
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
|
|
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Problem with the Impala-database!");
|
|
return false;
|
|
}
|
|
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
|
|
logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignmentsCounter);
|
|
// The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available.
|
|
fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false);
|
|
// We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store.
|
|
// We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
|
|
// The urls which give full-text (no matter if we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked-up to be checked again later.
|
|
} else
|
|
logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignmentsCounter);
|
|
|
|
String localParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignmentsCounter + File.separator;
|
|
try {
|
|
Files.createDirectories(Paths.get(localParquetPath)); // No-op if it already exists. It does not throw a "alreadyExistsException"
|
|
} catch (Exception e) {
|
|
String errorMsg = "Could not create the parquet-directory: " + localParquetPath;
|
|
logger.error(errorMsg, e);
|
|
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg);
|
|
return false;
|
|
}
|
|
|
|
logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignmentsCounter);
|
|
|
|
List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, localParquetPath, uploadFullTextsResponse);
|
|
|
|
// Create HDFS subDirs for these assignments. Other background threads handling other assignments will not interfere with loading of parquetFiles to the DB tables.
|
|
String endingMkDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.mkDirsAndParams;
|
|
if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingMkDirAndParams)
|
|
|| !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingMkDirAndParams) )
|
|
{
|
|
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating the HDFS sub-directories for assignments_" + curReportAssignmentsCounter);
|
|
return false;
|
|
}
|
|
|
|
boolean hasAttemptParquetFileProblem = false;
|
|
boolean hasPayloadParquetFileProblem = false;
|
|
|
|
try { // Invoke all the tasks and wait for them to finish.
|
|
List<Future<ParquetReport>> futures = insertsExecutor.invokeAll(callableTasks);
|
|
|
|
SumParquetSuccess sumParquetSuccess = parquetFileUtils.checkParquetFilesSuccess(futures);
|
|
ResponseEntity<?> errorResponseEntity = sumParquetSuccess.getResponseEntity();
|
|
if ( errorResponseEntity != null ) { // The related log is already shown.
|
|
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating or uploading the parquet files!");
|
|
return false;
|
|
}
|
|
hasAttemptParquetFileProblem = sumParquetSuccess.isAttemptParquetFileProblem();
|
|
hasPayloadParquetFileProblem = sumParquetSuccess.isPayloadParquetFileProblem();
|
|
|
|
if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
|
|
throw new RuntimeException("All of the parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database, for batch-assignments_" + curReportAssignmentsCounter);
|
|
else {
|
|
if ( hasAttemptParquetFileProblem )
|
|
logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignmentsCounter);
|
|
else if ( hasPayloadParquetFileProblem )
|
|
logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignmentsCounter);
|
|
else
|
|
logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files.");
|
|
}
|
|
|
|
// Load all the parquet files of each type into its table.
|
|
ImpalaConnector.databaseLock.lock();
|
|
|
|
if ( ! hasAttemptParquetFileProblem )
|
|
hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/", "attempt");
|
|
|
|
if ( ! hasPayloadParquetFileProblem )
|
|
hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignmentsCounter + "/", "payload_aggregated");
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
|
|
throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignmentsCounter);
|
|
else if ( hasAttemptParquetFileProblem || hasPayloadParquetFileProblem )
|
|
logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload_aggregated\" table, for batch-assignments_" + curReportAssignmentsCounter);
|
|
else
|
|
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignmentsCounter);
|
|
|
|
} catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled.
|
|
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());
|
|
// This is a very rare case. At the moment, we just move on with table-merging.
|
|
} catch (RuntimeException re) {
|
|
ImpalaConnector.databaseLock.lock();
|
|
String assignmentErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter);
|
|
ImpalaConnector.databaseLock.unlock();
|
|
String errorMsg = re.getMessage();
|
|
if ( assignmentErrorMsg != null )
|
|
errorMsg += "\n" + assignmentErrorMsg;
|
|
logger.error(errorMsg);
|
|
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg);
|
|
return false;
|
|
} catch (Exception e) {
|
|
String errorMsg = "Unexpected error when inserting into the \"attempt\" and \"payload_aggregated\" tables in parallel! " + e.getMessage();
|
|
logger.error(errorMsg, e);
|
|
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg);
|
|
return false; // No tables-merging is happening.
|
|
} finally {
|
|
logger.debug("Deleting parquet directory: " + localParquetPath);
|
|
fileUtils.deleteDirectory(new File(localParquetPath));
|
|
// Delete the HDFS subDirs for this Report.
|
|
String endingRmDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.rmDirsAndParams;
|
|
if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingRmDirAndParams)
|
|
|| !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingRmDirAndParams) )
|
|
{
|
|
logger.error("Error when deleting the HDFS sub-directories for assignments_" + curReportAssignmentsCounter); // A directory-specific log has already appeared.
|
|
// The failure to delete the assignments_subDirs is not that of a problem and should not erase the whole process. So all goes as planned (the worker deletes any remaining files).
|
|
// The worst case is that a few subDirs will be left back in the HDFS, although, without their parquetFiles, since they have already moved inside the DB tables.
|
|
}
|
|
}
|
|
|
|
logger.debug("Going to merge the parquet files for the tables which were altered.");
|
|
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
|
|
|
|
String mergeErrorMsg;
|
|
|
|
ImpalaConnector.databaseLock.lock();
|
|
|
|
if ( ! hasAttemptParquetFileProblem ) {
|
|
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
|
|
if ( mergeErrorMsg != null ) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
if ( ! hasPayloadParquetFileProblem ) {
|
|
mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null);
|
|
if ( mergeErrorMsg != null ) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
mergeErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter);
|
|
if ( mergeErrorMsg != null ) {
|
|
ImpalaConnector.databaseLock.unlock();
|
|
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
|
|
return false;
|
|
}
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
logger.debug("Finished merging the database tables.");
|
|
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
|
|
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "The full-text files failed to be acquired from the worker!");
|
|
return false;
|
|
}
|
|
|
|
// Notify the Worker that the processing of this report was successful, so that the Worker can delete the files.
|
|
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, null);
|
|
return true;
|
|
}
|
|
|
|
|
|
private String createAndInitializeCurrentAssignmentsTable(String findAssignmentsQuery)
|
|
{
|
|
final String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
|
|
final String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
|
|
|
|
try {
|
|
jdbcTemplate.execute(createCurrentAssignmentsQuery);
|
|
} catch (Exception e) {
|
|
String errorMsg = ImpalaConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e);
|
|
String tmpErrMsg = dropCurrentAssignmentTable(); // The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened).
|
|
if ( tmpErrMsg != null )
|
|
errorMsg += "\n" + tmpErrMsg;
|
|
return errorMsg;
|
|
}
|
|
|
|
try {
|
|
jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
|
|
} catch (Exception e) {
|
|
String errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e);
|
|
String tmpErrMsg = dropCurrentAssignmentTable();
|
|
if ( tmpErrMsg != null )
|
|
errorMsg += "\n" + tmpErrMsg;
|
|
return errorMsg;
|
|
}
|
|
|
|
return null; // All good.
|
|
}
|
|
|
|
|
|
private String dropCurrentAssignmentTable() {
|
|
String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE";
|
|
try {
|
|
jdbcTemplate.execute(dropCurrentAssignmentsQuery);
|
|
return null; // All good. No error-message.
|
|
} catch (Exception e) {
|
|
return ImpalaConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e); // The error is already logged inside.
|
|
}
|
|
}
|
|
|
|
|
|
private String deleteAssignmentsBatch(long givenAssignmentsBatchCounter)
|
|
{
|
|
// This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
|
|
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
|
|
// We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the "payload_aggregated" table for previously handled tasks.
|
|
return fileUtils.mergeParquetFiles("assignment", " WHERE assignments_batch_counter != ", givenAssignmentsBatchCounter);
|
|
}
|
|
|
|
|
|
private static final RestTemplate restTemplate = new RestTemplate();
|
|
|
|
private boolean postReportResultToWorker(String workerId, long assignmentRequestCounter, String errorMsg)
|
|
{
|
|
// Get the IP of this worker.
|
|
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
|
|
if ( workerInfo == null ) {
|
|
logger.error("Could not find any info for worker with id: \"" + workerId +"\".");
|
|
return false;
|
|
}
|
|
String url = "http://" + workerInfo.getWorkerIP() + ":1881/api/addReportResultToWorker/" + assignmentRequestCounter; // This workerIP will not be null.
|
|
|
|
if ( logger.isTraceEnabled() )
|
|
logger.trace("Going to \"postReportResultToWorker\": \"" + workerId + "\", for assignments_" + assignmentRequestCounter + ((errorMsg != null) ? "\nError: " + errorMsg : ""));
|
|
|
|
try {
|
|
ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, errorMsg, String.class); // We may pass a "null" entity.
|
|
int responseCode = responseEntity.getStatusCodeValue();
|
|
if ( responseCode != HttpStatus.OK.value() ) {
|
|
logger.error("HTTP-Connection problem with the submission of the \"postReportResultToWorker\" of worker \"" + workerId + "\" and assignments_" + assignmentRequestCounter + "! Error-code was: " + responseCode);
|
|
return false;
|
|
} else {
|
|
fileUtils.deleteFile(workerReportsDirPath + "/" + workerId + "/" + workerId + "_assignments_" + assignmentRequestCounter + "_report.json");
|
|
return true;
|
|
}
|
|
} catch (HttpServerErrorException hsee) {
|
|
logger.error("The Worker \"" + workerId + "\" failed to handle the \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage());
|
|
return false;
|
|
} catch (Exception e) {
|
|
errorMsg = "Error for \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + " to the Worker: " + workerId;
|
|
Throwable cause = e.getCause();
|
|
String exMsg;
|
|
if ( (cause != null) && ((exMsg = cause.getMessage()) != null) && exMsg.contains("Connection refused") ) {
|
|
logger.error(errorMsg + " | The worker has probably crashed, since we received a \"Connection refused\"!");
|
|
workerInfo.setHasShutdown(true); // Avoid sending possible shutdown-Requests later on. Also show a specific message if this Worker requests new assignments in the future.
|
|
} else
|
|
logger.error(errorMsg, e);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
|
|
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
|
|
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException {
|
|
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - If this is ever used, make it a global Thread-Local var. And then "clear" (reset) it after each use.
|
|
sb.append(baseInsertQuery);
|
|
for ( int i=1; i <= dataSize; ++i ) {
|
|
sb.append("(");
|
|
for ( int j=1; j <= numParamsPerRow; ++j ) {
|
|
sb.append("?");
|
|
if ( j < numParamsPerRow )
|
|
sb.append(",");
|
|
}
|
|
sb.append(")");
|
|
if ( i < dataSize )
|
|
sb.append(",");
|
|
}
|
|
|
|
PreparedStatement preparedInsertStatement;
|
|
try { // We use a "PreparedStatement" to do insertions, for security reasons.
|
|
preparedInsertStatement = con.prepareStatement(sb.toString());
|
|
} catch (SQLException sqle) {
|
|
String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n";
|
|
logger.error(errorMsg + sqle.getMessage());
|
|
throw new RuntimeException(errorMsg);
|
|
}
|
|
return preparedInsertStatement;
|
|
}
|
|
|
|
}
|