// File: UrlsController/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java

package eu.openaire.urls_controller.services;
import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import io.micrometer.core.annotation.Timed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.client.RestTemplate;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@Service
public class UrlsServiceImpl implements UrlsService {
private static final Logger logger = LoggerFactory.getLogger(UrlsServiceImpl.class);
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private FileUtils fileUtils;
@Autowired
private ParquetFileUtils parquetFileUtils;
@Value("${services.pdfaggregation.controller.workerReportsDirPath}")
private String workerReportsDirPath;
@Value("${services.pdfaggregation.worker.port}")
private String workerPort;
public static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
private final AtomicInteger maxAttemptsPerRecordAtomic;
private static String excludedDatasourceIDsStringList = null;
private static final String DOC_URL_FILTER = ".+(?:pdf|download|/doc|document|(?:/|[?]|&)file|/fulltext|attachment|/paper|view(?:file|doc)|/get|cgi/viewcontent.cgi\\?|t[ée]l[ée]charger|descargar).*";
// "DOC_URL_FILTER" works for lowerCase Strings (we use the "ignore-case" indicator in the "regexp_like()" method).
public static final ExecutorService insertsExecutor = Executors.newFixedThreadPool(6);
// TODO - Unify this ExecutorService with the hash-matching executorService, since one will ALWAYS be called right after the other; there is no reason to manage two separate ExecutorServices.
public UrlsServiceImpl(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord, BulkImport bulkImport)
{
maxAttemptsPerRecordAtomic = new AtomicInteger(maxAttemptsPerRecord);
HashMap<String, BulkImport.BulkImportSource> bulkImportSources = new HashMap<>(bulkImport.getBulkImportSources());
// The "bulkImportSources" will not be null, as it will be defined inside the "application.yml" file.
// In case no bulkImport Datasources are given, the "bulkImportSources" map will just be empty.
if ( bulkImportSources.isEmpty() )
return; // Because of this early-return, the code which builds the "excludedDatasourceIDsStringList" should be placed last in this Constructor-method.
if ( logger.isTraceEnabled() )
logger.trace("BulkImportSources:\n" + bulkImportSources);
List<String> excludedIDs = new ArrayList<>();
for ( BulkImport.BulkImportSource source : bulkImportSources.values() ) {
String datasourceID = source.getDatasourceID();
if ( (datasourceID == null) || datasourceID.isEmpty() )
throw new RuntimeException("One of the bulk-imported datasourceIDs was not found! | source: " + source);
excludedIDs.add(datasourceID);
}
int exclusionListSize = excludedIDs.size(); // This list will not be empty.
// Prepare the "excludedDatasourceIDsStringList" to be used inside the "findAssignmentsQuery". Create the following string-pattern:
// ("ID_1", "ID_2", ...)
int stringBuilderCapacity = ((exclusionListSize * 46) + (exclusionListSize -1) +2);
excludedDatasourceIDsStringList = FileUtils.getQueryListString(excludedIDs, exclusionListSize, stringBuilderCapacity);
logger.info("The following bulkImport data-sources will be excluded from crawling: " + excludedDatasourceIDsStringList);
}
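// Builds the "findAssignmentsQuery", materializes its results into the temporary "current_assignment" table, copies them into the "assignment" table,
// de-duplicates the id-url pairs and returns them to the requesting worker, wrapped in an "AssignmentsResponse".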
@Timed(value = "getAssignments.time", description = "Time taken to return the assignments.")
public ResponseEntity<?> getAssignments(String workerId, int assignmentsLimit)
{
int currentYear = Calendar.getInstance().get(Calendar.YEAR);
// Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
String findAssignmentsQuery =
"select pubid, url, datasourceid, datasourcename\n" + // Select the final sorted data with "assignmentsLimit".
"from (select distinct p.id as pubid, pu.url as url, pb.level as level, attempts.counts as attempt_count, p.year as pub_year, d.id as datasourceid, d.name as datasourcename\n" + // Select the distinct id-url data. Beware that this will return duplicate id-url pairs, wince one pair may be associated with multiple datasources.
" from " + DatabaseConnector.databaseName + ".publication_urls pu\n" +
" join " + DatabaseConnector.databaseName + ".publication p on p.id=pu.id\n" +
" join " + DatabaseConnector.databaseName + ".datasource d on d.id=p.datasourceid and d.allow_harvest=true"+
((excludedDatasourceIDsStringList != null) ? // If we have an exclusion-list, use it below.
(" and d.id not in " + excludedDatasourceIDsStringList + GenericUtils.endOfLine) : "") +
" left anti join (select a.original_url from " + DatabaseConnector.databaseName + ".assignment a\n" +
" union all\n" +
" select pl.original_url from " + DatabaseConnector.databaseName + ".payload pl\n" + // Here we access the payload-VIEW which includes the three payload-tables.
" ) as existing\n" +
" on existing.original_url=pu.url\n" +
" left outer join " + DatabaseConnector.databaseName + ".publication_boost pb\n" +
" on p.id=pb.id\n" +
" left outer join (select count(at.original_url) as counts, at.original_url from " + DatabaseConnector.databaseName + ".attempt at group by at.original_url) as attempts\n" +
" on attempts.original_url=pu.url\n" +
" where coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + GenericUtils.endOfLine +
" and not exists (select 1 from " + DatabaseConnector.databaseName + ".attempt a where a.original_url=pu.url and a.error_class = 'noRetry' limit 1)\n" +
" and (p.year <= " + currentYear + " or p.year > " + (currentYear + 5) + ")\n" + // Exclude the pubs which will be published in the next 5 years. They don't provide full-texts now. (We don't exclude all future pubs, since, some have invalid year, like "9999").
") as distinct_results\n" +
"order by coalesce(attempt_count, 0), coalesce(level, 0) desc, coalesce(pub_year, 0) desc, (case when regexp_like(url, '" + DOC_URL_FILTER + "', 'i') then 1 else 0 end) desc, reverse(pubid), url\n" + // We also order by reverse "pubid" and "url", in order to get the exactly same records for consecutive runs, all things being equal.
"limit " + assignmentsLimit;
// The datasourceID and datasourceName are currently not used during the processing in the Worker. They may be used by the Worker in the future, to apply a datasource-specific aggregation plugin which retrieves the full-texts quickly, instead of using the general crawling one.
// However, the "datasourceid" is useful for generating the fileNames for S3, without needing to perform additional select queries (with JOINs) at that phase.
// The "order by" at the end makes sure that older attempted records will be re-attempted after a long time.
//logger.trace("findAssignmentsQuery:\n" + findAssignmentsQuery); // DEBUG!
final String getAssignmentsQuery = "select * from " + DatabaseConnector.databaseName + ".current_assignment";
final List<Assignment> assignments = new ArrayList<>(assignmentsLimit);
long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
DatabaseConnector.databaseLock.lock();
// For performance reasons, we collect the returned assignments and create a temp-table with them, so that later they can be copied efficiently to the "a-bit-more-permanent" assignment table.
// This way, we avoid having to manually insert thousands of assignment records there. Instead, we create a new table AS the results from the "findAssignmentsQuery".
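// Roughly, the flow is: CREATE TABLE current_assignment AS <findAssignmentsQuery>  ->  SELECT * FROM current_assignment  ->  INSERT INTO assignment SELECT ... FROM current_assignment  ->  DROP TABLE current_assignment.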
String errorMsg = createAndInitializeCurrentAssignmentsTable(findAssignmentsQuery);
if ( errorMsg != null ) {
DatabaseConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
long timestampMillis = System.currentTimeMillis();
Timestamp timestamp = new Timestamp(timestampMillis); // Store it here, in order to have the same for all current records.
try {
jdbcTemplate.query(getAssignmentsQuery, rs -> {
Assignment assignment = new Assignment();
assignment.setWorkerId(workerId);
assignment.setAssignmentsBatchCounter(curAssignmentsBatchCounter);
assignment.setTimestamp(timestamp);
Datasource datasource = new Datasource();
try { // For each of the 4 columns returned, do the following. The column-indexing starts from 1.
assignment.setId(rs.getString(1));
assignment.setOriginalUrl(rs.getString(2));
datasource.setId(rs.getString(3));
datasource.setName(rs.getString(4));
} catch (SQLException sqle) {
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
}
assignment.setDatasource(datasource);
assignments.add(assignment);
});
} catch (EmptyResultDataAccessException erdae) {
String tmpErrMsg = dropCurrentAssignmentTable();
DatabaseConnector.databaseLock.unlock();
errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.";
logger.error(errorMsg);
if ( tmpErrMsg != null ) {
errorMsg += GenericUtils.endOfLine + tmpErrMsg; // The additional error-msg is already logged.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} else
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(new AssignmentsResponse((long) -1, null));
} catch (Exception e) {
String tmpErrMsg = dropCurrentAssignmentTable();
DatabaseConnector.databaseLock.unlock();
errorMsg = DatabaseConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e);
if ( tmpErrMsg != null )
errorMsg += GenericUtils.endOfLine + tmpErrMsg;
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 ) {
String tmpErrMsg = dropCurrentAssignmentTable();
DatabaseConnector.databaseLock.unlock();
errorMsg = "Some results were retrieved from the \"getAssignmentsQuery\", but no data could be extracted from them, for worker with id: " + workerId;
if ( tmpErrMsg != null )
errorMsg += GenericUtils.endOfLine + tmpErrMsg;
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
else if ( assignmentsSize < assignmentsLimit )
logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + ", for the next requests.");
logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker. | assignments_" + curAssignmentsBatchCounter);
// Write the Assignment details to the assignment-table.
String insertAssignmentsQuery = "insert into " + DatabaseConnector.databaseName + ".assignment \n select pubid, url, '" + workerId + "', " + curAssignmentsBatchCounter + ", " + timestampMillis
+ "\nfrom " + DatabaseConnector.databaseName + ".current_assignment";
try {
jdbcTemplate.execute(insertAssignmentsQuery);
} catch (Exception e) {
String tmpErrMsg = dropCurrentAssignmentTable();
DatabaseConnector.databaseLock.unlock();
errorMsg = DatabaseConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e);
if ( tmpErrMsg != null )
errorMsg += GenericUtils.endOfLine + tmpErrMsg;
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
errorMsg = dropCurrentAssignmentTable();
DatabaseConnector.databaseLock.unlock();
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
// We do not need to "merge" the parquet files for the "assignment" table here, since this happens every time we delete the assignments of a specific batch.
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. | curAssignmentsBatchCounter: " + curAssignmentsBatchCounter + " | worker: " + workerId);
// Because one publication (an id-url pair) can be connected with multiple datasources, the results returned from the query may contain duplicates.
// So, we apply a post-processing step where we collect only one instance of each id-url pair and send it to the Worker.
// We would certainly like the query to return assignments with unique id-url pairs, but for now there is no other solution.
// There is no logic-error though, since the worker requests a "LIMIT" of e.g. 5000, not "exactly 5000", and the exact number it ends up processing does not matter.
// So, it will ask for "up to 5000" and the Controller may return 4700.
// Since the average number of duplicates will be the same for each worker-request, none of the Workers will have an advantage over the others in the long run,
// so there will be no conflicting performance data between them.
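// For example, if the rows (id_1, url_1, datasource_A) and (id_1, url_1, datasource_B) are both returned, they collapse into a single map-entry with the key "id_1_url_1"; the row processed last overwrites the earlier one.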
final HashMap<String, Assignment> uniquePairsAndAssignments = new HashMap<>((int) (assignmentsLimit * 0.9));
for ( Assignment assignment : assignments )
uniquePairsAndAssignments.put(assignment.getId() + "_" + assignment.getOriginalUrl(), assignment);
// This will just update the duplicate record with another "assignment object", containing a different datasource.
List<Assignment> distinctAssignments = new ArrayList<>(uniquePairsAndAssignments.values());
int distinctAssignmentsSize = distinctAssignments.size();
if ( logger.isTraceEnabled() )
logger.trace("numDuplicates in returned assignments_" + curAssignmentsBatchCounter + " = " + (assignmentsSize - distinctAssignmentsSize));
logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + ", with " + distinctAssignmentsSize + " assignments, to worker with ID: " + workerId + ".");
return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, distinctAssignments));
}
public static final AtomicLong numOfWorkerReportsProcessed = new AtomicLong(0);
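// Processes a worker's report for the given assignments-batch: retrieves and uploads the full-texts, writes the "attempt" and "payload" records into parquet files,
// uploads them to HDFS and loads them into the database tables, deletes the assignments of this batch, periodically merges the worker-related tables
// and finally posts the processing-result back to the worker.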
@Timed(value = "addWorkerReport.time", description = "Time taken to add the WorkerReport.")
public Boolean addWorkerReport(String curWorkerId, long curReportAssignmentsCounter, List<UrlReport> urlReports, int sizeOfUrlReports)
{
long currentNumOfWorkerReportsProcessed = numOfWorkerReportsProcessed.incrementAndGet(); // Increment it when the report is actually being processed, not when it arrives at the endpoint.
logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + curReportAssignmentsCounter);
boolean hasFulltexts = true;
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, curWorkerId);
if ( uploadFullTextsResponse == null ) {
// Nothing to post to the Worker, since we do not have the worker's info.
// Rename the worker-report-file to indicate its "failure", so that the scheduler can pick it up and retry processing it.
String workerReportBaseName = this.workerReportsDirPath + File.separator + curWorkerId + File.separator + curWorkerId + "_assignments_" + curReportAssignmentsCounter + "_report";
renameAndGetWorkerReportFile(workerReportBaseName, new File(workerReportBaseName + ".json"), "No info was found for worker: " + curWorkerId); // It may return null.
return false;
} else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignmentsCounter);
// The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available.
fileUtils.removeUnretrievedFullTextsFromUrlReports(urlReports, false);
// We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store.
// We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
// The urls which lead to a full-text (even if we could not get it from the worker) are flagged as "couldRetry" anyway, so they will be picked up to be checked again later.
hasFulltexts = false;
} else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.successful_without_fulltexts )
hasFulltexts = false;
else
logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignmentsCounter);
String localParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignmentsCounter + File.separator;
try {
Files.createDirectories(Paths.get(localParquetPath)); // No-op if the directory already exists; it does not throw an "alreadyExistsException".
} catch (Exception e) {
String errorMsg = "Could not create the parquet-directory: " + localParquetPath;
logger.error(errorMsg, e);
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg);
return false;
}
logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignmentsCounter);
// Create HDFS subDirs for these assignments, so that other background threads, which handle other assignments, will not interfere with the loading of the parquetFiles into the DB tables.
String endingMkDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.mkDirsAndParams;
if ( !parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingMkDirAndParams)
|| (hasFulltexts && !parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingMkDirAndParams)) )
{
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating the HDFS sub-directories for assignments_" + curReportAssignmentsCounter);
return false;
}
boolean hasAttemptParquetFileProblem = false;
boolean hasPayloadParquetFileProblem = false;
DatabaseConnector.databaseLock.lock();
// Lock the DB here so the prefilled-Payloads which will be generated inside the "getTasksForCreatingAndUploadingParquetFiles()" method (using a dedicated query)
// will be synchronized with the insert of all attempt and payload records to the DB. This action is NOT a callable-task, so it runs during the execution of this method.
// This is important in order to avoid having workers take these records as assignments, when we know that payloads are ready to be inserted for them.
List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, localParquetPath, uploadFullTextsResponse);
try { // Invoke all the tasks and wait for them to finish.
List<Future<ParquetReport>> futures = insertsExecutor.invokeAll(callableTasks);
SumParquetSuccess sumParquetSuccess = parquetFileUtils.checkParquetFilesSuccess(futures);
ResponseEntity<?> errorResponseEntity = sumParquetSuccess.getResponseEntity();
if ( errorResponseEntity != null ) { // The related log is already shown in this case.
DatabaseConnector.databaseLock.unlock();
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating or uploading the parquet files!");
return false;
}
hasAttemptParquetFileProblem = sumParquetSuccess.isAttemptParquetFileProblem();
hasPayloadParquetFileProblem = sumParquetSuccess.isPayloadParquetFileProblem();
if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem ) {
DatabaseConnector.databaseLock.unlock();
throw new RuntimeException("All of the parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database, for batch-assignments_" + curReportAssignmentsCounter);
} else {
if ( hasAttemptParquetFileProblem )
logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignmentsCounter);
else if ( hasPayloadParquetFileProblem )
logger.error("All of the payload-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignmentsCounter);
else
logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files, for batch-assignments_" + curReportAssignmentsCounter);
}
// Load all the parquet files of each type into its table.
if ( ! hasAttemptParquetFileProblem )
hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/", "attempt");
if ( hasFulltexts && ! hasPayloadParquetFileProblem )
hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignmentsCounter + "/", "payload_aggregated");
DatabaseConnector.databaseLock.unlock();
if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem )
throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignmentsCounter);
else if ( hasAttemptParquetFileProblem || hasPayloadParquetFileProblem )
logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload_aggregated\" table, for batch-assignments_" + curReportAssignmentsCounter);
else
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" " + (hasFulltexts ? "and the \"payload_aggregated\" tables" : "table") + ", for batch-assignments_" + curReportAssignmentsCounter);
} catch (InterruptedException ie) { // Thrown by "insertsExecutor.invokeAll()". In this case, any unfinished tasks are cancelled.
DatabaseConnector.databaseLock.unlock();
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());
// This is a very rare case. At the moment, we just move on with table-merging.
} catch (RuntimeException re) {
// Only one of the RuntimeExceptions is thrown from inside the DB-locked code, and in that case the "unlock" happens right before the throw, so no unlock is needed here.
String errorMsg = re.getMessage();
logger.error(errorMsg);
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg);
return false;
} catch (Exception e) { // Thrown by "insertsExecutor.invokeAll()".
DatabaseConnector.databaseLock.unlock();
String errorMsg = "Unexpected error when inserting into the \"attempt\" and \"payload_aggregated\" tables in parallel! " + e.getMessage();
logger.error(errorMsg, e);
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, errorMsg);
return false; // No tables-merging is happening.
} finally {
logger.debug("Deleting parquet directory: " + localParquetPath);
fileUtils.deleteDirectory(new File(localParquetPath));
// Delete the HDFS subDirs for this Report.
String endingRmDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.rmDirsAndParams;
if ( !parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingRmDirAndParams)
|| (hasFulltexts && !parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingRmDirAndParams)) )
{
logger.error("Error when deleting the HDFS sub-directories for assignments_" + curReportAssignmentsCounter); // A directory-specific log has already appeared.
// A failure to delete the assignments-subDirs is not much of a problem and should not fail the whole process. So everything proceeds as planned (the worker deletes any remaining files).
// The worst case is that a few subDirs will be left behind in HDFS, although without their parquetFiles, since those have already been moved into the DB tables.
}
}
// Delete the assignments each time, as they are bound to the "current" assignmentsCounter. Otherwise, they will never be deleted!
// If this method exits earlier, due to an error, the assignments are NOT deleted: they stay in place so that the scheduler can retry the report, while also preventing these urls from being handed out to workers again and reprocessed in the meantime.
DatabaseConnector.databaseLock.lock();
String deleteErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter);
DatabaseConnector.databaseLock.unlock();
if ( deleteErrorMsg != null ) {
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, deleteErrorMsg);
return false;
}
// For every "numOfWorkers" assignment-batches that get processed, we merge the "attempts" and "payload_aggregated" tables.
if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfActiveWorkers.get()) == 0 ) // The workersNum will not be zero!
if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, hasFulltexts) )
// The "postReportResultToWorker()" was called inside.
return false;
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "The full-text files failed to be acquired from the worker!");
return false;
}
// Notify the Worker that the processing of this report was successful, so that the Worker can delete the files.
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, null);
return true;
}
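// Creates the temporary "current_assignment" table as the result of the given "findAssignmentsQuery" and computes its table-stats.
// Returns null on success, or an error-message otherwise (in which case the possibly half-created table is dropped).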
private String createAndInitializeCurrentAssignmentsTable(String findAssignmentsQuery)
{
String createCurrentAssignmentsQuery = "create table " + DatabaseConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
final String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + DatabaseConnector.databaseName + ".current_assignment";
try {
jdbcTemplate.execute(createCurrentAssignmentsQuery);
} catch (Exception e) {
String errorMsg = DatabaseConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable(); // The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened).
if ( tmpErrMsg != null )
errorMsg += GenericUtils.endOfLine + tmpErrMsg;
return errorMsg;
}
try {
jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
} catch (Exception e) {
String errorMsg = DatabaseConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null )
errorMsg += GenericUtils.endOfLine + tmpErrMsg;
return errorMsg;
}
return null; // All good.
}
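// Drops (and purges) the temporary "current_assignment" table. Returns null on success, or an error-message otherwise.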
private String dropCurrentAssignmentTable() {
final String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + DatabaseConnector.databaseName + ".current_assignment PURGE";
try {
jdbcTemplate.execute(dropCurrentAssignmentsQuery);
return null; // All good. No error-message.
} catch (Exception e) {
return DatabaseConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e); // The error is already logged inside.
}
}
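// Merges (compacts) the parquet files of the "attempt" and "payload_aggregated" tables, skipping a table if its parquet files had problems earlier
// (or, for "payload_aggregated", if no full-texts were retrieved). On a merge-error, the worker is notified and "false" is returned.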
private boolean mergeWorkerRelatedTables(String curWorkerId, long curReportAssignmentsCounter, boolean hasAttemptParquetFileProblem, boolean hasPayloadParquetFileProblem, boolean hasFulltexts)
{
logger.debug("Going to merge the parquet files for the tables which were altered, for batch-assignments_" + curReportAssignmentsCounter);
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
// This means that over time a table may have thousands of parquet files and the search through them will be very slow. Thus, we merge them regularly.
String mergeErrorMsg;
DatabaseConnector.databaseLock.lock();
if ( ! hasAttemptParquetFileProblem ) {
mergeErrorMsg = parquetFileUtils.mergeParquetFilesOfTable("attempt", "", null);
if ( mergeErrorMsg != null ) {
DatabaseConnector.databaseLock.unlock();
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
return false;
}
}
if ( hasFulltexts && ! hasPayloadParquetFileProblem ) {
mergeErrorMsg = parquetFileUtils.mergeParquetFilesOfTable("payload_aggregated", "", null);
if ( mergeErrorMsg != null ) {
DatabaseConnector.databaseLock.unlock();
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
return false;
}
}
DatabaseConnector.databaseLock.unlock();
logger.debug("Finished merging the database tables, for batch-assignments_" + curReportAssignmentsCounter);
return true;
}
public String deleteAssignmentsBatch(long givenAssignmentsBatchCounter)
{
// This will delete the rows of the "assignment" table which refer to the "givenAssignmentsBatchCounter".
// As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER batch-counters (different from the "givenAssignmentsBatchCounter") get stored in a temp-table, while the old "assignment" table gets deleted. Then, the temp-table becomes the new "assignment" table.
// We don't need to keep the assignment-info forever, as the "findAssignmentsQuery" checks the "payload" table for previously handled tasks.
return parquetFileUtils.mergeParquetFilesOfTable("assignment", " WHERE assignments_batch_counter != ", givenAssignmentsBatchCounter);
}
public String deleteAssignmentsWithOlderDate(long givenDate)
{
// This will delete the rows of the "assignment" table which are older than "givenDate".
// As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows with a date NEWER than (or equal to) the "givenDate" get stored in a temp-table, while the old "assignment" table gets deleted. Then, the temp-table becomes the new "assignment" table.
// We don't need to keep the assignment-info forever, as the "findAssignmentsQuery" checks the "payload" table for previously handled tasks.
return parquetFileUtils.mergeParquetFilesOfTable("assignment", " WHERE `date` >= ", givenDate);
}
private static final RestTemplate restTemplate = new RestTemplate();
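// Renames the worker-report file to indicate success or failure and posts the result (the error-message, or null on success) to the worker's "addReportResultToWorker" endpoint.
// If the worker is notified and there was no error-message, the report file is deleted. Returns "false" when the worker could not be found or notified.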
private boolean postReportResultToWorker(String workerId, long assignmentRequestCounter, String errorMsg)
{
// Rename the worker-report to indicate success or failure.
String workerReportBaseName = this.workerReportsDirPath + File.separator + workerId + File.separator + workerId + "_assignments_" + assignmentRequestCounter + "_report";
File workerReport = new File(workerReportBaseName + ".json");
File renamedWorkerReport = renameAndGetWorkerReportFile(workerReportBaseName, workerReport, errorMsg); // It may return null.
// Get the IP of this worker.
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
if ( workerInfo == null ) {
logger.error("Could not find any info for worker with id: \"" + workerId +"\".");
return false;
}
String url = "http://" + workerInfo.getWorkerIP() + ":" + workerPort + "/api/addReportResultToWorker/" + assignmentRequestCounter; // This workerIP will NOT be null.
if ( logger.isTraceEnabled() )
logger.trace("Going to \"postReportResultToWorker\": \"" + workerId + "\", for assignments_" + assignmentRequestCounter + ((errorMsg != null) ? "\nError: " + errorMsg : ""));
try {
ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, errorMsg, String.class); // We may pass a "null" entity.
int responseCode = responseEntity.getStatusCodeValue();
if ( responseCode != 200 ) {
logger.error("HTTP-Connection problem with the submission of the \"postReportResultToWorker\" of worker \"" + workerId + "\" and assignments_" + assignmentRequestCounter + "! Error-code was: " + responseCode);
return false;
} else if ( errorMsg == null ) // If the worker was notified, then go delete the successful workerReport.
fileUtils.deleteFile((renamedWorkerReport != null) ? renamedWorkerReport.getAbsolutePath() : workerReport.getAbsolutePath());
return true;
} catch (HttpServerErrorException hsee) {
logger.error("The Worker \"" + workerId + "\" failed to handle the \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage());
return false;
} catch (HttpClientErrorException hcee) {
logger.error("The Controller did something wrong when sending the report result to the Worker (" + workerId + "). | url: " + url + GenericUtils.endOfLine + hcee.getMessage());
return false;
} catch (Exception e) {
errorMsg = "Error for \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + " to the Worker: " + workerId;
Throwable cause = e.getCause();
String exMsg;
if ( (cause != null) && ((exMsg = cause.getMessage()) != null) && exMsg.contains("Connection refused") ) {
logger.error(errorMsg + " | The worker has probably crashed, since we received a \"Connection refused\"!");
workerInfo.setHasShutdown(true); // Avoid sending possible shutdown-Requests later on. Also show a specific message if this Worker requests new assignments in the future.
} else
logger.error(errorMsg, e);
return false;
}
}
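// Renames the worker-report file to "*_successful.json" or "*_failed.json", depending on whether an error-message is given.
// It also handles the case where the report was already renamed to "failed" during a previous try. Returns the renamed file, or null if the file is missing or the renaming fails.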
private static File renameAndGetWorkerReportFile(String workerReportBaseName, File workerReport, String errorMsg)
{
File renamedWorkerReport = null;
try {
// Check if the workerReport does not exist under this name, as it may have been renamed previously to "failed" and now is the 2nd try.
if ( !workerReport.isFile() ) {
// Then this is the 2nd try and the report has already been renamed to "failed" the 1st time.
workerReport = new File(workerReportBaseName + "_failed.json");
if ( !workerReport.isFile() ) { // In this case something has gone wrong, since this file was deleted prematurely.
logger.error("The workerReport file \"" + workerReport.getAbsolutePath() + "\" does not exist!");
// TODO - Do we need additional handling? This report may be either successful or failed but the file was deleted.
return null; // Do not proceed on renaming the non-existing file.
}
}
// Only if the file exists, proceed to renaming it.
renamedWorkerReport = new File(workerReportBaseName + ((errorMsg == null) ? "_successful.json" : "_failed.json"));
if ( ! workerReport.renameTo(renamedWorkerReport) ) {
logger.warn("There was a problem when renaming the workerReport: " + workerReport.getName());
return null;
}
} catch (Exception e) {
logger.error("There was a problem when renaming the workerReport: " + workerReport.getName(), e);
return null;
}
return renamedWorkerReport;
}
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException {
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow)); // TODO - If this is ever used, make it a global Thread-Local var. And then "clear" (reset) it after each use.
sb.append(baseInsertQuery);
for ( int i=1; i <= dataSize; ++i ) {
sb.append("(");
for ( int j=1; j <= numParamsPerRow; ++j ) {
sb.append("?");
if ( j < numParamsPerRow )
sb.append(",");
}
sb.append(")");
if ( i < dataSize )
sb.append(",");
}
PreparedStatement preparedInsertStatement;
try { // We use a "PreparedStatement" to do insertions, for security reasons.
preparedInsertStatement = con.prepareStatement(sb.toString());
} catch (SQLException sqle) {
String errorMsg = "Problem when creating the prepared statement for the insertQuery: \"" + baseInsertQuery + "\"...!\n";
logger.error(errorMsg + sqle.getMessage());
throw new RuntimeException(errorMsg);
}
return preparedInsertStatement;
}
}