diff --git a/docker-compose.yml b/docker-compose.yml index a3c1499..b5091ef 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,6 +19,9 @@ services: - type: bind source: /mnt/bulk_import target: /mnt/bulk_import + - type: bind + source: $HOME/workerReports + target: /workerReports - type: bind source: $HOME/bulkImportReports target: /bulkImportReports diff --git a/src/main/java/eu/openaire/urls_controller/Application.java b/src/main/java/eu/openaire/urls_controller/Application.java index 20579e7..dc2048a 100644 --- a/src/main/java/eu/openaire/urls_controller/Application.java +++ b/src/main/java/eu/openaire/urls_controller/Application.java @@ -4,6 +4,8 @@ import eu.openaire.urls_controller.services.FullTextsServiceImpl; import eu.openaire.urls_controller.services.UrlsServiceImpl; import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.UriBuilder; +import io.micrometer.core.aop.TimedAspect; +import io.micrometer.core.instrument.MeterRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; @@ -101,4 +103,10 @@ public class Application { return args -> new UriBuilder(environment, webServerAppCtxt); } + + @Bean + public TimedAspect timedAspect(MeterRegistry registry) { + return new TimedAspect(registry); + } + } \ No newline at end of file diff --git a/src/main/java/eu/openaire/urls_controller/controllers/FullTextsController.java b/src/main/java/eu/openaire/urls_controller/controllers/FullTextsController.java index 6522bd2..8b5cb5a 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/FullTextsController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/FullTextsController.java @@ -163,7 +163,7 @@ public class FullTextsController { BulkImportReport bulkImportReport = new BulkImportReport(provenance, bulkImportReportFullPath, bulkImportReportID); bulkImportReport.addEvent(msg); - String errorMsg = fileUtils.writeToFile(bulkImportReportFullPath, bulkImportReport.getJsonReport()); + String errorMsg = fileUtils.writeToFile(bulkImportReportFullPath, bulkImportReport.getJsonReport(), true); if ( errorMsg != null ) return ResponseEntity.internalServerError().body(errorMsg); diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java index 2200e26..f7eb76b 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java @@ -3,7 +3,9 @@ package eu.openaire.urls_controller.controllers; import eu.openaire.urls_controller.models.UrlReport; import eu.openaire.urls_controller.models.WorkerInfo; import eu.openaire.urls_controller.payloads.requests.WorkerReport; +import eu.openaire.urls_controller.services.FullTextsServiceImpl; import eu.openaire.urls_controller.services.UrlsService; +import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.ParquetFileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,6 +16,9 @@ import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; @@ -28,6 +33,9 @@ public class UrlsController { @Autowired private UrlsService urlsService; + @Autowired + private FileUtils fileUtils; + @Autowired private ParquetFileUtils parquetFileUtils; @@ -38,19 +46,16 @@ public class UrlsController { public static final ConcurrentHashMap workersInfoMap = new ConcurrentHashMap<>(6); + private String workerReportsDirPath; - // TODO - Implement an endpoint in the Controller to request the Controller to shutdown everything. - // The controller will make sure that it has finished with requesting the full-texts and sent a "shutDownRequest" to each worker (as we will have its IP) - // (some shutdown may fail (for any reason), but that should not halt the process ?) - // after the shut-Down-request have been sent the endpoint return the message that the shutdown process is in progress. - // TODO - Make another endpoint in the Controller to take POST requests from the workers about their shutdown-process. - // This endpoint will assign to the worker-s hashmap the value of "hashShutdown=true", and check if all the workers have finished. - // So, if we have the info that the current shutdown worker in the last one, then show a log-message and shutdown the Controller. - // TODO - Will the "last one" be the "actual last one" ? What if we had 7 workers but one crashed and now we have 6 workers to shutdown properly but the 7th worker seems to be still working..? - // In that case, we can cross it out easily, as the Controller will get either a "Connection refused" or a "connection timeout", depending on the state of the worker. + public UrlsController(@Value("${services.pdfaggregation.controller.workerReportsDirPath}") String workerReportsDirPath) + { + if ( !workerReportsDirPath.endsWith("/") ) + workerReportsDirPath += "/"; - // TODO - Make the Worker to sent a POST request to the Controller to notify it that is has finished all work and it is about to close. + this.workerReportsDirPath = workerReportsDirPath; // This dir will be created later. + } @GetMapping("") @@ -124,7 +129,7 @@ public class UrlsController { @PostMapping("addWorkerReport") - public ResponseEntity addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) { + public ResponseEntity addWorkerReport(@RequestBody WorkerReport workerReport) { if ( workerReport == null ) { String errorMsg = "No \"WorkerReport\" was given!"; @@ -154,10 +159,33 @@ public class UrlsController { return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg); } - long curReportAssignments = workerReport.getAssignmentRequestCounter(); - logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + sizeOUrlReports + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database."); + long curReportAssignmentsCounter = workerReport.getAssignmentRequestCounter(); + logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignmentsCounter + ", from the worker with id: " + curWorkerId + ". It contains " + sizeOUrlReports + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database."); - return urlsService.addWorkerReport(curWorkerId, curReportAssignments, urlReports, sizeOUrlReports, request); + // Make sure this worker's report directory is created. + Path currentWorkerReportLocationDir = Paths.get(this.workerReportsDirPath, curWorkerId); + try { + Files.createDirectories(currentWorkerReportLocationDir); // No-op if dir exists. It does not throw a "alreadyExistsException" + } catch (Exception e) { + String errorMsg = "Could nor create the \"currentWorkerReportLocationDir\" for worker \"" + curWorkerId + "\" : " + currentWorkerReportLocationDir; + logger.error(errorMsg, e); + return ResponseEntity.internalServerError().body(errorMsg); + } + + // Create the report file and write the worker-report to it. + String workerReportFile = currentWorkerReportLocationDir + "/" + curWorkerId + "_assignments_" + curReportAssignmentsCounter + "_report.json"; + logger.debug("Going to write the worker report to json-file: \"" + workerReportFile + "\"."); + fileUtils.writeToFile(workerReportFile, workerReport.getJsonReport(), false); // Only one thread is writing to this specific file. + // The above method will overwrite a possibly existing file. So in case of a crash, it's better to back up the reports before starting the Controller again (as the assignments-counter will start over, from 0). + + int finalSizeOUrlReports = sizeOUrlReports; + FullTextsServiceImpl.backgroundCallableTasks.add(() -> + urlsService.addWorkerReport(curWorkerId, curReportAssignmentsCounter, urlReports, finalSizeOUrlReports) + ); + + String msg = "The 'addWorkerReport' request for worker with id: '" + curWorkerId + "' and assignments_" + curReportAssignmentsCounter + " , was accepted and will be scheduled for execution."; + logger.info(msg); + return ResponseEntity.ok().body(msg); } } diff --git a/src/main/java/eu/openaire/urls_controller/payloads/requests/WorkerReport.java b/src/main/java/eu/openaire/urls_controller/payloads/requests/WorkerReport.java index 013842c..c54d5e9 100644 --- a/src/main/java/eu/openaire/urls_controller/payloads/requests/WorkerReport.java +++ b/src/main/java/eu/openaire/urls_controller/payloads/requests/WorkerReport.java @@ -3,6 +3,7 @@ package eu.openaire.urls_controller.payloads.requests; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.google.gson.Gson; import eu.openaire.urls_controller.models.UrlReport; import java.util.List; @@ -16,6 +17,8 @@ import java.util.List; }) public class WorkerReport { + private static final Gson gson = new Gson(); // This is "transient" be default. It won't be included in any json object. + @JsonProperty("workerId") private String workerId; @@ -55,6 +58,11 @@ public class WorkerReport { this.urlReports = urlReports; } + + public String getJsonReport() { + return gson.toJson(this); + } + @Override public String toString() { return "WorkerReport{" + diff --git a/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java index 359292d..49aad0d 100644 --- a/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java @@ -51,7 +51,7 @@ public class FullTextsServiceImpl implements FullTextsService { @Autowired private JdbcTemplate jdbcTemplate; - public static final ExecutorService backgroundExecutor = Executors.newFixedThreadPool(2); // At most 2 threads will be used. + public static final ExecutorService backgroundExecutor = Executors.newFixedThreadPool(4); // At most 4 threads will be used. public static final List> backgroundCallableTasks = Collections.synchronizedList(new ArrayList<>()); @@ -77,7 +77,7 @@ public class FullTextsServiceImpl implements FullTextsService { String errorMsg = "The 'payloadsSchema' could not be parsed!"; logger.error(errorMsg); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); FullTextsController.bulkImportDirs.remove(bulkImportDirName); return false; } @@ -85,7 +85,7 @@ public class FullTextsServiceImpl implements FullTextsService { List fileLocations = getFileLocationsInsideDir(bulkImportDirName); if ( fileLocations == null ) { bulkImportReport.addEvent("Could not retrieve the files for bulk-import!"); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); FullTextsController.bulkImportDirs.remove(bulkImportDirName); return false; } @@ -95,7 +95,7 @@ public class FullTextsServiceImpl implements FullTextsService { String errorMsg = "No files were found inside the bulkImportDir: " + bulkImportDirName; logger.warn(errorMsg); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); FullTextsController.bulkImportDirs.remove(bulkImportDirName); return false; } @@ -110,7 +110,7 @@ public class FullTextsServiceImpl implements FullTextsService { String errorMsg = "Could not create the local parquet-directory: " + localParquetDir; logger.error(errorMsg, e); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); FullTextsController.bulkImportDirs.remove(bulkImportDirName); return false; } @@ -121,7 +121,7 @@ public class FullTextsServiceImpl implements FullTextsService { String errorMsg = "Could not create the hdfs-directory: " + currentBulkImportHdfsDir; logger.error(errorMsg); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); FullTextsController.bulkImportDirs.remove(bulkImportDirName); return false; } @@ -133,7 +133,7 @@ public class FullTextsServiceImpl implements FullTextsService { int subListsSize = subLists.size(); bulkImportReport.addEvent("Going to import the files in " + subListsSize + " segments, in parallel."); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); for ( int i = 0; i < subListsSize; ++i ) { int finalI = i; @@ -168,7 +168,7 @@ public class FullTextsServiceImpl implements FullTextsService { String errorMsg = "An error occurred when trying to bulk-import data from bulkImportDir: " + bulkImportDirName; logger.error(errorMsg, e); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); FullTextsController.bulkImportDirs.remove(bulkImportDirName); return false; } finally { @@ -182,7 +182,7 @@ public class FullTextsServiceImpl implements FullTextsService { String errorMsg = "None of the files inside the bulkImportDir '" + bulkImportDirName + "' were imported!"; logger.error(errorMsg); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); FullTextsController.bulkImportDirs.remove(bulkImportDirName); return false; } else if ( numFailedFiles > 0 ) { // Some failed, but not all. @@ -193,7 +193,7 @@ public class FullTextsServiceImpl implements FullTextsService { logger.info(msg); } bulkImportReport.addEvent(msg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // Merge the parquet files inside the table "payload_bulk_import", to improve performance of future operations. ImpalaConnector.databaseLock.lock(); @@ -201,7 +201,7 @@ public class FullTextsServiceImpl implements FullTextsService { if ( mergeErrorMsg != null ) { ImpalaConnector.databaseLock.unlock(); bulkImportReport.addEvent(mergeErrorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); FullTextsController.bulkImportDirs.remove(bulkImportDirName); return false; } @@ -210,7 +210,10 @@ public class FullTextsServiceImpl implements FullTextsService { String successMsg = "Finished the bulk-import procedure for '" + provenance + "' and bulkImportDir: " + bulkImportDirName; logger.info(successMsg); bulkImportReport.addEvent(successMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); + // The report-file will be overwritten every now and then, instead of appended, since we want to add an update new JSON report-object each time. + // Also, we do not want to write the object in the end (in its final form), since we want the user to have the ability to request the report at any time, + // after submitting the bulk-import request, to see its progress (since the number of file may be very large and the processing may take many hours). FullTextsController.bulkImportDirs.remove(bulkImportDirName); return true; @@ -246,7 +249,7 @@ public class FullTextsServiceImpl implements FullTextsService { if ( ((++counter) % 100) == 0 ) { // Every 100 files, report the status. bulkImportReport.addEvent("Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files."); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); } } @@ -255,7 +258,7 @@ public class FullTextsServiceImpl implements FullTextsService { String errorMsg = "No payload-records were generated for any of the files inside the bulkImportDir: " + bulkImportDirName; logger.warn(errorMsg); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // None of the files of this segment will be deleted, in any case. return numOfFilesInSegment; } else if ( numOfPayloadRecords != numOfFilesInSegment ) { @@ -263,7 +266,7 @@ public class FullTextsServiceImpl implements FullTextsService { String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be imported, for segment-" + segmentCounter + "!"; logger.warn(errorMsg); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); } // Construct the parquet file, upload it to HDFS and load them it in the "payload_bulk_import" table. @@ -275,7 +278,7 @@ public class FullTextsServiceImpl implements FullTextsService { if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) { bulkImportReport.addEvent("Could not write the payload-records to the parquet-file: '" + parquetFileName + "'!"); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // None of the files of this segment will be deleted, in any case. return numOfFilesInSegment; } @@ -287,7 +290,7 @@ public class FullTextsServiceImpl implements FullTextsService { String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir); if ( errorMsg != null ) { // The possible error-message returned, is already logged by the Controller. bulkImportReport.addEvent("Could not upload the parquet-file '" + parquetFileName + "' to HDFS!"); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // None of the files of this segment will be deleted, in any case. return numOfFilesInSegment; } @@ -299,7 +302,7 @@ public class FullTextsServiceImpl implements FullTextsService { if ( !parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import") ) { ImpalaConnector.databaseLock.unlock(); bulkImportReport.addEvent("Could not load the payload-records to the database!"); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // None of the files of this segment will be deleted, in any case. return numOfFilesInSegment; } diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsService.java b/src/main/java/eu/openaire/urls_controller/services/UrlsService.java index 7593321..d6e3974 100644 --- a/src/main/java/eu/openaire/urls_controller/services/UrlsService.java +++ b/src/main/java/eu/openaire/urls_controller/services/UrlsService.java @@ -3,13 +3,12 @@ package eu.openaire.urls_controller.services; import eu.openaire.urls_controller.models.UrlReport; import org.springframework.http.ResponseEntity; -import javax.servlet.http.HttpServletRequest; import java.util.List; public interface UrlsService { ResponseEntity getAssignments(String workerId, int assignmentsLimit); - ResponseEntity addWorkerReport(String curWorkerId, long curReportAssignments, List urlReports, int sizeOfUrlReports, HttpServletRequest request); + Boolean addWorkerReport(String curWorkerId, long curReportAssignments, List urlReports, int sizeOfUrlReports); } diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java index 7dbb969..d9ee265 100644 --- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java @@ -2,10 +2,12 @@ package eu.openaire.urls_controller.services; import eu.openaire.urls_controller.components.BulkImport; import eu.openaire.urls_controller.configuration.ImpalaConnector; +import eu.openaire.urls_controller.controllers.UrlsController; import eu.openaire.urls_controller.models.*; import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse; import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.ParquetFileUtils; +import io.micrometer.core.annotation.Timed; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -14,8 +16,9 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; +import org.springframework.web.client.HttpServerErrorException; +import org.springframework.web.client.RestTemplate; -import javax.servlet.http.HttpServletRequest; import java.io.File; import java.nio.file.Files; import java.nio.file.Paths; @@ -48,6 +51,9 @@ public class UrlsServiceImpl implements UrlsService { @Autowired private ParquetFileUtils parquetFileUtils; + @Value("${services.pdfaggregation.controller.workerReportsDirPath}") + private String workerReportsDirPath; + public static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); private final AtomicInteger maxAttemptsPerRecordAtomic; @@ -97,6 +103,7 @@ public class UrlsServiceImpl implements UrlsService { } + @Timed(value = "getAssignments.time", description = "Time taken to return the assignments.") public ResponseEntity getAssignments(String workerId, int assignmentsLimit) { // Create the Assignments from the id-urls stored in the database up to the < assignmentsLimit >. @@ -225,12 +232,16 @@ public class UrlsServiceImpl implements UrlsService { } - public ResponseEntity addWorkerReport(String curWorkerId, long curReportAssignments, List urlReports, int sizeOfUrlReports, HttpServletRequest request) + @Timed(value = "addWorkerReport.time", description = "Time taken to add the WorkerReport.") + public Boolean addWorkerReport(String curWorkerId, long curReportAssignments, List urlReports, int sizeOfUrlReports) { + logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + assignmentsBatchCounter); + // Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location". - FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId); + FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, curReportAssignments, curWorkerId); if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) { - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!"); + postReportResultToWorker(curWorkerId, curReportAssignments, "Problem with the Impala-database!"); + return false; } else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) { logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignments); @@ -248,7 +259,8 @@ public class UrlsServiceImpl implements UrlsService { } catch (Exception e) { String errorMsg = "Could not create the parquet-directory: " + localParquetPath; logger.error(errorMsg, e); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg); + return false; } logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignments); @@ -263,8 +275,9 @@ public class UrlsServiceImpl implements UrlsService { SumParquetSuccess sumParquetSuccess = parquetFileUtils.checkParquetFilesSuccess(futures); ResponseEntity errorResponseEntity = sumParquetSuccess.getResponseEntity(); - if ( errorResponseEntity != null ) { - return errorResponseEntity; // The related log is already shown. + if ( errorResponseEntity != null ) { // The related log is already shown. + postReportResultToWorker(curWorkerId, curReportAssignments, "Error when creating or uploading the parquet files!"); + return false; } hasAttemptParquetFileProblem = sumParquetSuccess.isAttemptParquetFileProblem(); hasPayloadParquetFileProblem = sumParquetSuccess.isPayloadParquetFileProblem(); @@ -309,11 +322,13 @@ public class UrlsServiceImpl implements UrlsService { if ( assignmentErrorMsg != null ) errorMsg += "\n" + assignmentErrorMsg; logger.error(errorMsg); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg); + return false; } catch (Exception e) { String errorMsg = "Unexpected error when inserting into the \"attempt\" and \"payload_aggregated\" tables in parallel! " + e.getMessage(); logger.error(errorMsg, e); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + postReportResultToWorker(curWorkerId, curReportAssignments, errorMsg); + return false; } finally { logger.debug("Deleting parquet directory: " + localParquetPath); fileUtils.deleteDirectory(new File(localParquetPath)); @@ -330,7 +345,8 @@ public class UrlsServiceImpl implements UrlsService { mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null); if ( mergeErrorMsg != null ) { ImpalaConnector.databaseLock.unlock(); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg); + postReportResultToWorker(curWorkerId, curReportAssignments, mergeErrorMsg); + return false; } } @@ -338,23 +354,29 @@ public class UrlsServiceImpl implements UrlsService { mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null); if ( mergeErrorMsg != null ) { ImpalaConnector.databaseLock.unlock(); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg); + postReportResultToWorker(curWorkerId, curReportAssignments, mergeErrorMsg); + return false; } } mergeErrorMsg = deleteWorkerAssignments(curWorkerId); if ( mergeErrorMsg != null ) { ImpalaConnector.databaseLock.unlock(); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg); + postReportResultToWorker(curWorkerId, curReportAssignments, mergeErrorMsg); + return false; } ImpalaConnector.databaseLock.unlock(); logger.debug("Finished merging the database tables."); - if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) - return ResponseEntity.status(HttpStatus.MULTI_STATUS).body("The full-text files failed to be acquired from the worker!"); - else - return ResponseEntity.status(HttpStatus.OK).build(); + if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) { + postReportResultToWorker(curWorkerId, curReportAssignments, "The full-text files failed to be acquired from the worker!"); + return false; + } + + // Notify the Worker that the processing of this report was successful, so that the Worker can delete the files. + postReportResultToWorker(curWorkerId, curReportAssignments, null); + return true; } @@ -405,6 +427,39 @@ public class UrlsServiceImpl implements UrlsService { // We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the "payload_aggregated" table for previously handled tasks. return fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId); } + + + private boolean postReportResultToWorker(String workerId, long assignmentRequestCounter, String errorMsg) + { + // Get the IP of this worker. + WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId); + if ( workerInfo == null ) { + logger.error("Could not find any info for worker with id: \"" + workerId +"\"."); + return false; + } + String url = "http://" + workerInfo.getWorkerIP() + ":1881/api/addReportResultToWorker/" + assignmentRequestCounter; // This workerIP will not be null. + + if ( logger.isTraceEnabled() ) + logger.trace("Going to \"postReportResultToWorker\": \"" + workerId + "\", for assignments_" + assignmentRequestCounter + ((errorMsg != null) ? "\nError: " + errorMsg : "")); + + try { + ResponseEntity responseEntity = new RestTemplate().postForEntity(url, errorMsg, String.class); // We may pass a "null" entity. + int responseCode = responseEntity.getStatusCodeValue(); + if ( responseCode != HttpStatus.OK.value() ) { + logger.error("HTTP-Connection problem with the submission of the \"postReportResultToWorker\" of worker \"" + workerId + "\" and assignments_" + assignmentRequestCounter + "! Error-code was: " + responseCode); + return false; + } else { + fileUtils.deleteFile(workerReportsDirPath + "/" + workerId + "/" + workerId + "_assignments_" + assignmentRequestCounter + "_report.json"); + return true; + } + } catch (HttpServerErrorException hsee) { + logger.error("The Worker \"" + workerId + "\" failed to handle the \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage()); + return false; + } catch (Exception e) { + logger.error("Error for \"postReportResultToWorker\", of assignments_" + assignmentRequestCounter + " to the Worker: " + workerId, e); + return false; + } + } // The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution. diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index afb5e0b..ffbddc4 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -4,8 +4,10 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimaps; import com.google.common.collect.SetMultimap; import eu.openaire.urls_controller.configuration.ImpalaConnector; +import eu.openaire.urls_controller.controllers.UrlsController; import eu.openaire.urls_controller.models.Payload; import eu.openaire.urls_controller.models.UrlReport; +import eu.openaire.urls_controller.models.WorkerInfo; import org.apache.commons.io.FileDeleteStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,7 +17,6 @@ import org.springframework.dao.EmptyResultDataAccessException; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; -import javax.servlet.http.HttpServletRequest; import java.io.*; import java.net.HttpURLConnection; import java.net.URL; @@ -124,17 +125,17 @@ public class FileUtils { // TODO - Unify this ExecutorService with the hash-matching executorService. Since one will ALWAYS be called after the other. So why having two ExecServices to handle? - public UploadFullTextsResponse getAndUploadFullTexts(List urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId) { + public UploadFullTextsResponse getAndUploadFullTexts(List urlReports, long assignmentsBatchCounter, String workerId) { // The Controller have to request the files from the Worker, in order to upload them to the S3. // We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database. - if ( request == null ) { - logger.error("The \"HttpServletRequest\" is null!"); - return UploadFullTextsResponse.unsuccessful; - } - String remoteAddr = request.getHeader("X-FORWARDED-FOR"); - if ( (remoteAddr == null) || "".equals(remoteAddr) ) - remoteAddr = request.getRemoteAddr(); + String workerIp = null; + WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId); + if ( workerInfo == null ) { + logger.error("Could not retrieve the info of worker: " + workerId); + return null; + } else + workerIp = workerInfo.getWorkerIP(); // This won't be null. // Get the file-locations. AtomicInteger numFullTextsFound = new AtomicInteger(); @@ -246,7 +247,7 @@ public class FileUtils { logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts (total is: " + numFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each)."); // Check if one full text is left out because of the division. Put it int the last batch. - String baseUrl = "http://" + remoteAddr + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/"; + String baseUrl = "http://" + workerIp + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/"; // TODO - The worker should send the port in which it accepts requests, along with the current request. // TODO - The least we have to do it to expose the port-assignment somewhere more obvious like inside the "application.yml" file. @@ -606,9 +607,11 @@ public class FileUtils { Lock fileWriteLock = new ReentrantLock(true); - public String writeToFile(String fileFullPath, String stringToWrite) + public String writeToFile(String fileFullPath, String stringToWrite, boolean shouldLockThreads) { - fileWriteLock.lock(); + if ( shouldLockThreads ) + fileWriteLock.lock(); + try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), FileUtils.tenMb) ) { bufferedWriter.write(stringToWrite); // This will overwrite the file. If the new string is smaller, then it does not matter. @@ -617,7 +620,8 @@ public class FileUtils { logger.error(errorMsg, e); return errorMsg; } finally { - fileWriteLock.unlock(); + if ( shouldLockThreads ) + fileWriteLock.unlock(); } return null; } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index d071076..44fc8b2 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -20,6 +20,7 @@ services: assignmentLimit: 10000 maxAttemptsPerRecord: 3 baseFilesLocation: tmp/ + workerReportsDirPath: /workerReports/ parquetLocalDirectoryPath: ${services.pdfaggregation.controller.baseFilesLocation}parquetFiles/ s3: endpoint: XA