- Process the WorkerReports in background jobs and post the report-results back to the Workers.

- Save the workerReports to JSON files until they are processed successfully.
- Show some custom metrics in Prometheus.
Lampros Smyrnaios 2023-05-24 13:52:28 +03:00
parent 0ea3e2de24
commit cd1fb0af88
10 changed files with 173 additions and 64 deletions


@ -19,6 +19,9 @@ services:
- type: bind
source: /mnt/bulk_import
target: /mnt/bulk_import
- type: bind
source: $HOME/workerReports
target: /workerReports
- type: bind
source: $HOME/bulkImportReports
target: /bulkImportReports


@ -4,6 +4,8 @@ import eu.openaire.urls_controller.services.FullTextsServiceImpl;
import eu.openaire.urls_controller.services.UrlsServiceImpl;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.UriBuilder;
import io.micrometer.core.aop.TimedAspect;
import io.micrometer.core.instrument.MeterRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
@ -101,4 +103,10 @@ public class Application {
return args -> new UriBuilder(environment, webServerAppCtxt);
}
@Bean
public TimedAspect timedAspect(MeterRegistry registry) {
return new TimedAspect(registry);
}
}
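
Registering the "TimedAspect" bean is what lets Micrometer intercept methods annotated with @Timed (such as "getAssignments" and "addWorkerReport" further down in this commit) and record their execution time in the MeterRegistry, from which a Prometheus registry can expose them. A minimal sketch of such a timed method, assuming the AOP starter and a Prometheus meter-registry are on the classpath (the class, method and metric names below are illustrative, not part of this commit):

import io.micrometer.core.annotation.Timed;
import org.springframework.stereotype.Service;

@Service
public class ExampleTimedService { // hypothetical class, for illustration only

    // With the TimedAspect bean registered, every call to this method is recorded in a timer
    // named "example.task.time"; a Prometheus registry can then expose it (e.g. via the
    // "/actuator/prometheus" endpoint, if the actuator is configured) as count, sum and max series.
    @Timed(value = "example.task.time", description = "Time taken to run the example task.")
    public void runExampleTask() {
        // ... the work to be measured ...
    }
}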


@ -163,7 +163,7 @@ public class FullTextsController {
BulkImportReport bulkImportReport = new BulkImportReport(provenance, bulkImportReportFullPath, bulkImportReportID);
bulkImportReport.addEvent(msg);
String errorMsg = fileUtils.writeToFile(bulkImportReportFullPath, bulkImportReport.getJsonReport());
String errorMsg = fileUtils.writeToFile(bulkImportReportFullPath, bulkImportReport.getJsonReport(), true);
if ( errorMsg != null )
return ResponseEntity.internalServerError().body(errorMsg);


@ -3,7 +3,9 @@ package eu.openaire.urls_controller.controllers;
import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.services.FullTextsServiceImpl;
import eu.openaire.urls_controller.services.UrlsService;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -14,6 +16,9 @@ import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
@ -28,6 +33,9 @@ public class UrlsController {
@Autowired
private UrlsService urlsService;
@Autowired
private FileUtils fileUtils;
@Autowired
private ParquetFileUtils parquetFileUtils;
@ -38,19 +46,16 @@ public class UrlsController {
public static final ConcurrentHashMap<String, WorkerInfo> workersInfoMap = new ConcurrentHashMap<>(6);
private String workerReportsDirPath;
// TODO - Implement an endpoint in the Controller to request the Controller to shut down everything.
// The Controller will make sure that it has finished requesting the full-texts and has sent a "shutDownRequest" to each worker (as we will have each worker's IP).
// (Some shutdowns may fail (for any reason), but that should not halt the process?)
// After the shutdown-requests have been sent, the endpoint returns a message that the shutdown process is in progress.
// TODO - Make another endpoint in the Controller to take POST requests from the workers about their shutdown-process.
// This endpoint will set "hasShutdown=true" in the workers' hashmap and check if all the workers have finished.
// So, if the currently shutting-down worker is the last one, then show a log-message and shut down the Controller.
// TODO - Will the "last one" be the "actual last one"? What if we had 7 workers, but one crashed and now we have 6 workers to shut down properly, while the 7th worker seems to still be working..?
// In that case, we can cross it out easily, as the Controller will get either a "Connection refused" or a "connection timeout", depending on the state of the worker.
public UrlsController(@Value("${services.pdfaggregation.controller.workerReportsDirPath}") String workerReportsDirPath)
{
if ( !workerReportsDirPath.endsWith("/") )
workerReportsDirPath += "/";
// TODO - Make the Worker send a POST request to the Controller to notify it that it has finished all work and is about to close.
this.workerReportsDirPath = workerReportsDirPath; // This dir will be created later.
}
@GetMapping("")
@ -124,7 +129,7 @@ public class UrlsController {
@PostMapping("addWorkerReport")
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport) {
if ( workerReport == null ) {
String errorMsg = "No \"WorkerReport\" was given!";
@ -154,10 +159,33 @@ public class UrlsController {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
}
long curReportAssignments = workerReport.getAssignmentRequestCounter();
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + sizeOUrlReports + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
long curReportAssignmentsCounter = workerReport.getAssignmentRequestCounter();
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignmentsCounter + ", from the worker with id: " + curWorkerId + ". It contains " + sizeOUrlReports + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
return urlsService.addWorkerReport(curWorkerId, curReportAssignments, urlReports, sizeOUrlReports, request);
// Make sure this worker's report directory is created.
Path currentWorkerReportLocationDir = Paths.get(this.workerReportsDirPath, curWorkerId);
try {
Files.createDirectories(currentWorkerReportLocationDir); // No-op if the dir exists; it does not throw a "FileAlreadyExistsException".
} catch (Exception e) {
String errorMsg = "Could nor create the \"currentWorkerReportLocationDir\" for worker \"" + curWorkerId + "\" : " + currentWorkerReportLocationDir;
logger.error(errorMsg, e);
return ResponseEntity.internalServerError().body(errorMsg);
}
// Create the report file and write the worker-report to it.
String workerReportFile = currentWorkerReportLocationDir + "/" + curWorkerId + "_assignments_" + curReportAssignmentsCounter + "_report.json";
logger.debug("Going to write the worker report to json-file: \"" + workerReportFile + "\".");
fileUtils.writeToFile(workerReportFile, workerReport.getJsonReport(), false); // Only one thread is writing to this specific file.
// The above method will overwrite a possibly existing file. So in case of a crash, it's better to back up the reports before starting the Controller again (as the assignments-counter will start over, from 0).
int finalSizeOUrlReports = sizeOUrlReports;
FullTextsServiceImpl.backgroundCallableTasks.add(() ->
urlsService.addWorkerReport(curWorkerId, curReportAssignmentsCounter, urlReports, finalSizeOUrlReports)
);
String msg = "The 'addWorkerReport' request for worker with id: '" + curWorkerId + "' and assignments_" + curReportAssignmentsCounter + " , was accepted and will be scheduled for execution.";
logger.info(msg);
return ResponseEntity.ok().body(msg);
}
}
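
The Callable queued in "FullTextsServiceImpl.backgroundCallableTasks" above is not executed by this endpoint; a separate scheduled job (not shown in this diff) is expected to drain the list and run the tasks on the "backgroundExecutor". A rough sketch of such a consumer, assuming a Spring @Scheduled method (the method name and the delay are assumptions, not part of this commit):

// Hypothetical consumer of the queued report-processing tasks (the real scheduler is outside this diff).
@Scheduled(fixedDelay = 60_000) // e.g. check for queued tasks every minute
public void executeBackgroundTasks() {
    List<Callable<Boolean>> tasksToRun;
    synchronized ( FullTextsServiceImpl.backgroundCallableTasks ) { // it is a synchronizedList, so copy its contents under its own lock
        if ( FullTextsServiceImpl.backgroundCallableTasks.isEmpty() )
            return;
        tasksToRun = new ArrayList<>(FullTextsServiceImpl.backgroundCallableTasks);
        FullTextsServiceImpl.backgroundCallableTasks.clear();
    }
    try {
        FullTextsServiceImpl.backgroundExecutor.invokeAll(tasksToRun); // blocks until all Callables have returned their Boolean result
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
    }
}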


@ -3,6 +3,7 @@ package eu.openaire.urls_controller.payloads.requests;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.google.gson.Gson;
import eu.openaire.urls_controller.models.UrlReport;
import java.util.List;
@ -16,6 +17,8 @@ import java.util.List;
})
public class WorkerReport {
private static final Gson gson = new Gson(); // This is "transient" by default; it won't be included in any JSON object.
@JsonProperty("workerId")
private String workerId;
@ -55,6 +58,11 @@ public class WorkerReport {
this.urlReports = urlReports;
}
public String getJsonReport() {
return gson.toJson(this);
}
@Override
public String toString() {
return "WorkerReport{" +


@ -51,7 +51,7 @@ public class FullTextsServiceImpl implements FullTextsService {
@Autowired
private JdbcTemplate jdbcTemplate;
public static final ExecutorService backgroundExecutor = Executors.newFixedThreadPool(2); // At most 2 threads will be used.
public static final ExecutorService backgroundExecutor = Executors.newFixedThreadPool(4); // At most 4 threads will be used.
public static final List<Callable<Boolean>> backgroundCallableTasks = Collections.synchronizedList(new ArrayList<>());
@ -77,7 +77,7 @@ public class FullTextsServiceImpl implements FullTextsService {
String errorMsg = "The 'payloadsSchema' could not be parsed!";
logger.error(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
}
@ -85,7 +85,7 @@ public class FullTextsServiceImpl implements FullTextsService {
List<String> fileLocations = getFileLocationsInsideDir(bulkImportDirName);
if ( fileLocations == null ) {
bulkImportReport.addEvent("Could not retrieve the files for bulk-import!");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
}
@ -95,7 +95,7 @@ public class FullTextsServiceImpl implements FullTextsService {
String errorMsg = "No files were found inside the bulkImportDir: " + bulkImportDirName;
logger.warn(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
}
@ -110,7 +110,7 @@ public class FullTextsServiceImpl implements FullTextsService {
String errorMsg = "Could not create the local parquet-directory: " + localParquetDir;
logger.error(errorMsg, e);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
}
@ -121,7 +121,7 @@ public class FullTextsServiceImpl implements FullTextsService {
String errorMsg = "Could not create the hdfs-directory: " + currentBulkImportHdfsDir;
logger.error(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
}
@ -133,7 +133,7 @@ public class FullTextsServiceImpl implements FullTextsService {
int subListsSize = subLists.size();
bulkImportReport.addEvent("Going to import the files in " + subListsSize + " segments, in parallel.");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
@ -168,7 +168,7 @@ public class FullTextsServiceImpl implements FullTextsService {
String errorMsg = "An error occurred when trying to bulk-import data from bulkImportDir: " + bulkImportDirName;
logger.error(errorMsg, e);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
} finally {
@ -182,7 +182,7 @@ public class FullTextsServiceImpl implements FullTextsService {
String errorMsg = "None of the files inside the bulkImportDir '" + bulkImportDirName + "' were imported!";
logger.error(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
} else if ( numFailedFiles > 0 ) { // Some failed, but not all.
@ -193,7 +193,7 @@ public class FullTextsServiceImpl implements FullTextsService {
logger.info(msg);
}
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// Merge the parquet files inside the table "payload_bulk_import", to improve performance of future operations.
ImpalaConnector.databaseLock.lock();
@ -201,7 +201,7 @@ public class FullTextsServiceImpl implements FullTextsService {
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
bulkImportReport.addEvent(mergeErrorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
}
@ -210,7 +210,10 @@ public class FullTextsServiceImpl implements FullTextsService {
String successMsg = "Finished the bulk-import procedure for '" + provenance + "' and bulkImportDir: " + bulkImportDirName;
logger.info(successMsg);
bulkImportReport.addEvent(successMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// The report-file will be overwritten every now and then, instead of appended, since we want to write an updated JSON report-object each time.
// Also, we do not want to write the object only at the end (in its final form), since we want the user to have the ability to request the report at any time
// after submitting the bulk-import request, to see its progress (since the number of files may be very large and the processing may take many hours).
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return true;
@ -246,7 +249,7 @@ public class FullTextsServiceImpl implements FullTextsService {
if ( ((++counter) % 100) == 0 ) { // Every 100 files, report the status.
bulkImportReport.addEvent("Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files.");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
}
}
@ -255,7 +258,7 @@ public class FullTextsServiceImpl implements FullTextsService {
String errorMsg = "No payload-records were generated for any of the files inside the bulkImportDir: " + bulkImportDirName;
logger.warn(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment;
} else if ( numOfPayloadRecords != numOfFilesInSegment ) {
@ -263,7 +266,7 @@ public class FullTextsServiceImpl implements FullTextsService {
String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be imported, for segment-" + segmentCounter + "!";
logger.warn(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
}
// Construct the parquet file, upload it to HDFS and load it into the "payload_bulk_import" table.
@ -275,7 +278,7 @@ public class FullTextsServiceImpl implements FullTextsService {
if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) {
bulkImportReport.addEvent("Could not write the payload-records to the parquet-file: '" + parquetFileName + "'!");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment;
}
@ -287,7 +290,7 @@ public class FullTextsServiceImpl implements FullTextsService {
String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir);
if ( errorMsg != null ) { // The possible error-message returned is already logged by the Controller.
bulkImportReport.addEvent("Could not upload the parquet-file '" + parquetFileName + "' to HDFS!");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment;
}
@ -299,7 +302,7 @@ public class FullTextsServiceImpl implements FullTextsService {
if ( !parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import") ) {
ImpalaConnector.databaseLock.unlock();
bulkImportReport.addEvent("Could not load the payload-records to the database!");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment;
}


@ -3,13 +3,12 @@ package eu.openaire.urls_controller.services;
import eu.openaire.urls_controller.models.UrlReport;
import org.springframework.http.ResponseEntity;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
public interface UrlsService {
ResponseEntity<?> getAssignments(String workerId, int assignmentsLimit);
ResponseEntity<?> addWorkerReport(String curWorkerId, long curReportAssignments, List<UrlReport> urlReports, int sizeOfUrlReports, HttpServletRequest request);
Boolean addWorkerReport(String curWorkerId, long curReportAssignments, List<UrlReport> urlReports, int sizeOfUrlReports);
}


@ -2,10 +2,12 @@ package eu.openaire.urls_controller.services;
import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import io.micrometer.core.annotation.Timed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -14,8 +16,9 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.client.RestTemplate;
import javax.servlet.http.HttpServletRequest;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
@ -48,6 +51,9 @@ public class UrlsServiceImpl implements UrlsService {
@Autowired
private ParquetFileUtils parquetFileUtils;
@Value("${services.pdfaggregation.controller.workerReportsDirPath}")
private String workerReportsDirPath;
public static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
private final AtomicInteger maxAttemptsPerRecordAtomic;
@ -97,6 +103,7 @@ public class UrlsServiceImpl implements UrlsService {
}
@Timed(value = "getAssignments.time", description = "Time taken to return the assignments.")
public ResponseEntity<?> getAssignments(String workerId, int assignmentsLimit)
{
// Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >.
@ -225,12 +232,16 @@ public class UrlsServiceImpl implements UrlsService {
}
public ResponseEntity<?> addWorkerReport(String curWorkerId, long curReportAssignments, List<UrlReport> urlReports, int sizeOfUrlReports, HttpServletRequest request)
@Timed(value = "addWorkerReport.time", description = "Time taken to add the WorkerReport.")
public Boolean addWorkerReport(String curWorkerId, long curReportAssignments, List<UrlReport> urlReports, int sizeOfUrlReports)
{
logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + assignmentsBatchCounter);
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, curReportAssignments, curWorkerId);
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!");
postReportResultToWorker(curWorkerId, curReportAssignments, "Problem with the Impala-database!");
return false;
}
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignments);
@ -248,7 +259,8 @@ public class UrlsServiceImpl implements UrlsService {
} catch (Exception e) {
String errorMsg = "Could not create the parquet-directory: " + localParquetPath;
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg);
return false;
}
logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignments);
@ -263,8 +275,9 @@ public class UrlsServiceImpl implements UrlsService {
SumParquetSuccess sumParquetSuccess = parquetFileUtils.checkParquetFilesSuccess(futures);
ResponseEntity<?> errorResponseEntity = sumParquetSuccess.getResponseEntity();
if ( errorResponseEntity != null ) {
return errorResponseEntity; // The related log is already shown.
if ( errorResponseEntity != null ) { // The related log is already shown.
postReportResultToWorker(curWorkerId, curReportAssignments, "Error when creating or uploading the parquet files!");
return false;
}
hasAttemptParquetFileProblem = sumParquetSuccess.isAttemptParquetFileProblem();
hasPayloadParquetFileProblem = sumParquetSuccess.isPayloadParquetFileProblem();
@ -309,11 +322,13 @@ public class UrlsServiceImpl implements UrlsService {
if ( assignmentErrorMsg != null )
errorMsg += "\n" + assignmentErrorMsg;
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg);
return false;
} catch (Exception e) {
String errorMsg = "Unexpected error when inserting into the \"attempt\" and \"payload_aggregated\" tables in parallel! " + e.getMessage();
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg);
return false;
} finally {
logger.debug("Deleting parquet directory: " + localParquetPath);
fileUtils.deleteDirectory(new File(localParquetPath));
@ -330,7 +345,8 @@ public class UrlsServiceImpl implements UrlsService {
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
postReportResultToWorker(curWorkerId, curReportAssignments, mergeErrorMsg);
return false;
}
}
@ -338,23 +354,29 @@ public class UrlsServiceImpl implements UrlsService {
mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
postReportResultToWorker(curWorkerId, curReportAssignments, mergeErrorMsg);
return false;
}
}
mergeErrorMsg = deleteWorkerAssignments(curWorkerId);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
postReportResultToWorker(curWorkerId, curReportAssignments, mergeErrorMsg);
return false;
}
ImpalaConnector.databaseLock.unlock();
logger.debug("Finished merging the database tables.");
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful )
return ResponseEntity.status(HttpStatus.MULTI_STATUS).body("The full-text files failed to be acquired from the worker!");
else
return ResponseEntity.status(HttpStatus.OK).build();
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
postReportResultToWorker(curWorkerId, curReportAssignments, "The full-text files failed to be acquired from the worker!");
return false;
}
// Notify the Worker that the processing of this report was successful, so that the Worker can delete the files.
postReportResultToWorker(curWorkerId, curReportAssignments, null);
return true;
}
@ -405,6 +427,39 @@ public class UrlsServiceImpl implements UrlsService {
// We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the "payload_aggregated" table for previously handled tasks.
return fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
}
private boolean postReportResultToWorker(String workerId, long assignmentRequestCounter, String errorMsg)
{
// Get the IP of this worker.
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
if ( workerInfo == null ) {
logger.error("Could not find any info for worker with id: \"" + workerId +"\".");
return false;
}
String url = "http://" + workerInfo.getWorkerIP() + ":1881/api/addReportResultToWorker/" + assignmentRequestCounter; // This workerIP will not be null.
if ( logger.isTraceEnabled() )
logger.trace("Going to \"postReportResultToWorker\": \"" + workerId + "\", for assignments_" + assignmentRequestCounter + ((errorMsg != null) ? "\nError: " + errorMsg : ""));
try {
ResponseEntity<String> responseEntity = new RestTemplate().postForEntity(url, errorMsg, String.class); // We may pass a "null" entity.
int responseCode = responseEntity.getStatusCodeValue();
if ( responseCode != HttpStatus.OK.value() ) {
logger.error("HTTP-Connection problem with the submission of the \"postReportResultToWorker\" of worker \"" + workerId + "\" and assignments_" + assignmentRequestCounter + "! Error-code was: " + responseCode);
return false;
} else {
fileUtils.deleteFile(workerReportsDirPath + "/" + workerId + "/" + workerId + "_assignments_" + assignmentRequestCounter + "_report.json");
return true;
}
} catch (HttpServerErrorException hsee) {
logger.error("The Worker \"" + workerId + "\" failed to handle the \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage());
return false;
} catch (Exception e) {
logger.error("Error for \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + " to the Worker: " + workerId, e);
return false;
}
}
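
The Worker-side endpoint targeted by this POST ("api/addReportResultToWorker/{assignmentRequestCounter}" on port 1881) lives in the Worker's own code-base and is not part of this commit; a rough sketch of what it might look like (all class, logger and message names below are assumptions):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

// Hypothetical Worker-side controller, matching the URL built in "postReportResultToWorker".
@RestController
@RequestMapping("api")
public class ReportResultController {

    private static final Logger logger = LoggerFactory.getLogger(ReportResultController.class);

    @PostMapping("addReportResultToWorker/{assignmentRequestCounter}")
    public ResponseEntity<?> addReportResultToWorker(@PathVariable long assignmentRequestCounter,
                                                     @RequestBody(required = false) String errorMsg) {
        if ( errorMsg == null ) {
            // The Controller processed the report successfully, so the Worker may now delete
            // the full-text files it kept for this assignments-batch.
            logger.info("The report for assignments_" + assignmentRequestCounter + " was processed successfully by the Controller.");
        } else
            logger.error("The Controller failed to process the report for assignments_" + assignmentRequestCounter + ": " + errorMsg);
        return ResponseEntity.ok().build(); // Acknowledge receipt in either case.
    }
}
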
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.


@ -4,8 +4,10 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo;
import org.apache.commons.io.FileDeleteStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -15,7 +17,6 @@ import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletRequest;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
@ -124,17 +125,17 @@ public class FileUtils {
// TODO - Unify this ExecutorService with the hash-matching executorService, since one will ALWAYS be called after the other. So why have two ExecServices to handle?
public UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId) {
public UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, long assignmentsBatchCounter, String workerId) {
// The Controller has to request the files from the Worker, in order to upload them to the S3.
// We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
if ( request == null ) {
logger.error("The \"HttpServletRequest\" is null!");
return UploadFullTextsResponse.unsuccessful;
}
String remoteAddr = request.getHeader("X-FORWARDED-FOR");
if ( (remoteAddr == null) || "".equals(remoteAddr) )
remoteAddr = request.getRemoteAddr();
String workerIp = null;
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
if ( workerInfo == null ) {
logger.error("Could not retrieve the info of worker: " + workerId);
return null;
} else
workerIp = workerInfo.getWorkerIP(); // This won't be null.
// Get the file-locations.
AtomicInteger numFullTextsFound = new AtomicInteger();
@ -246,7 +247,7 @@ public class FileUtils {
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts (total is: " + numFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
// Check if one full text is left out because of the division. Put it in the last batch.
String baseUrl = "http://" + remoteAddr + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
String baseUrl = "http://" + workerIp + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
// TODO - The worker should send the port on which it accepts requests, along with the current request.
// TODO - The least we have to do is to expose the port-assignment somewhere more obvious, like inside the "application.yml" file.
@ -606,9 +607,11 @@ public class FileUtils {
Lock fileWriteLock = new ReentrantLock(true);
public String writeToFile(String fileFullPath, String stringToWrite)
public String writeToFile(String fileFullPath, String stringToWrite, boolean shouldLockThreads)
{
fileWriteLock.lock();
if ( shouldLockThreads )
fileWriteLock.lock();
try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), FileUtils.tenMb) )
{
bufferedWriter.write(stringToWrite); // This will overwrite the file. If the new string is smaller, then it does not matter.
@ -617,7 +620,8 @@ public class FileUtils {
logger.error(errorMsg, e);
return errorMsg;
} finally {
fileWriteLock.unlock();
if ( shouldLockThreads )
fileWriteLock.unlock();
}
return null;
}
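
With the new "shouldLockThreads" flag, callers that may write the same report file from several threads (the bulk-import flow) keep using the global write-lock, while callers that own their file (the per-worker report written in "addWorkerReport") can skip it. The two call-modes, as used in this commit:

// Bulk-import report: several import-segments may update the same file concurrently, so serialize the writes.
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);

// Worker report: only one thread writes this specific file, so no locking is needed.
fileUtils.writeToFile(workerReportFile, workerReport.getJsonReport(), false);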


@ -20,6 +20,7 @@ services:
assignmentLimit: 10000
maxAttemptsPerRecord: 3
baseFilesLocation: tmp/
workerReportsDirPath: /workerReports/
parquetLocalDirectoryPath: ${services.pdfaggregation.controller.baseFilesLocation}parquetFiles/
s3:
endpoint: XA