diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java
index ef1a3cb..0de7c2d 100644
--- a/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java
+++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlsController.java
@@ -1,8 +1,10 @@
 package eu.openaire.urls_controller.controllers;
 
 import eu.openaire.urls_controller.models.UrlReport;
+import eu.openaire.urls_controller.models.WorkerInfo;
 import eu.openaire.urls_controller.payloads.requests.WorkerReport;
 import eu.openaire.urls_controller.services.UrlsService;
+import eu.openaire.urls_controller.util.ParquetFileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -13,6 +15,7 @@ import org.springframework.web.bind.annotation.*;
 
 import javax.servlet.http.HttpServletRequest;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
@@ -25,6 +28,8 @@ public class UrlsController {
 
     @Autowired
     private UrlsService urlsService;
 
+    @Autowired
+    private ParquetFileUtils parquetFileUtils;
 
     private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");
@@ -32,8 +37,25 @@ public class UrlsController {
     private int assignmentLimit;
 
+    public static final ConcurrentHashMap<String, WorkerInfo> workersInfoMap = new ConcurrentHashMap<>(6);
+
+
+    // TODO - Implement an endpoint in the Controller with which we can request the Controller to shut everything down.
+    //  The Controller will make sure it has finished requesting the full-texts and then send a "shutDownRequest" to each worker (as we will have its IP).
+    //  (Some of those shutdown-requests may fail, for any reason, but that should not halt the process.)
+    //  After the shutdown-requests have been sent, the endpoint returns a message saying that the shutdown-process is in progress.
+
+    // TODO - Make another endpoint in the Controller to take POST requests from the workers about their shutdown-process.
+    //  This endpoint will set "hasShutdown=true" for that worker in the workers' hashmap and check whether all the workers have finished.
+    //  So, if the currently shutting-down worker is the last one, log a message and shut down the Controller as well.
+    // TODO - Will the "last one" be the "actual last one"? What if we had 7 workers, one crashed, and now 6 workers shut down properly while the 7th still appears to be working?
+    //  In that case we can cross it out easily, as the Controller will get either a "Connection refused" or a "connection timeout", depending on the state of that worker.
+
+    // TODO - Make the Worker send a POST request to the Controller, to notify it that it has finished all work and is about to close.
+
+
     @GetMapping("")
-    public ResponseEntity<?> getAssignments(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
+    public ResponseEntity<?> getAssignments(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit, HttpServletRequest request) {
         // As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
         if ( MALICIOUS_INPUT_STRING.matcher(workerId).matches() ) {
@@ -55,6 +77,35 @@ public class UrlsController {
             assignmentsLimit = assignmentLimit;
         }
 
+        if ( request == null ) {
+            logger.error("The \"HttpServletRequest\" is null!");
+            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
+        }
+        String remoteAddr = request.getHeader("X-FORWARDED-FOR");
+        if ( (remoteAddr == null) || "".equals(remoteAddr) )
+            remoteAddr = request.getRemoteAddr();
+
+        WorkerInfo workerInfo = workersInfoMap.get(workerId);
+        if ( workerInfo != null ) {  // This worker has already been identified.
+            String savedWorkerIp = workerInfo.getWorkerIP();
+            if ( !savedWorkerIp.equals(remoteAddr) ) {  // Warn and update only when the IP has actually changed.
+                logger.warn("The worker with id \"" + workerId + "\" has changed IP from \"" + savedWorkerIp + "\" to \"" + remoteAddr + "\".");
+                workerInfo.setWorkerIP(remoteAddr);  // Set the new IP. The update will be reflected in the map.
+            }
+        } else {
+            logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP and create the remote parquet subdirectories (in HDFS).");
+            workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false));
+            // Create worker-specific subdirectories inside the HDFS parquet-directories, so that a parquet-directory does not become empty right before its data gets "loaded" into the DB, in case another worker has just loaded multiple workers' data into the DB.
+            String endingDirAndParams = workerId + "/" + parquetFileUtils.mkDirsAndParams;
+            if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingDirAndParams)
+                || !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingDirAndParams) )
+            {
+                String errorMsg = "Error when creating the HDFS sub-directories for worker with id: " + workerId;
+                logger.error(errorMsg);
+                return ResponseEntity.internalServerError().body(errorMsg);
+            }
+        }
+
         return urlsService.getAssignments(workerId, assignmentsLimit);
     }
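Note that the first-time registration above is a check-then-act sequence ("get", then "put"), so two concurrent first requests from the same worker could, in principle, both take the else-branch. A hypothetical alternative sketch using ConcurrentHashMap.computeIfAbsent() to make the registration atomic; it reuses the names from the patch, but is an illustration only, not part of this change:

```java
final String workerIp = remoteAddr;  // "remoteAddr" is re-assigned above, so capture a final copy for the lambda.
final boolean[] firstTime = { false };
WorkerInfo workerInfo = workersInfoMap.computeIfAbsent(workerId, id -> {
    firstTime[0] = true;
    return new WorkerInfo(workerIp, false);  // Created exactly once, even under concurrent requests.
});
if ( firstTime[0] ) {
    // First registration: create the worker-specific HDFS subdirectories here (runs exactly once per workerId).
} else if ( !workerInfo.getWorkerIP().equals(workerIp) ) {
    logger.warn("The worker with id \"" + workerId + "\" has changed IP from \"" + workerInfo.getWorkerIP() + "\" to \"" + workerIp + "\".");
    workerInfo.setWorkerIP(workerIp);
}
```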
diff --git a/src/main/java/eu/openaire/urls_controller/models/WorkerInfo.java b/src/main/java/eu/openaire/urls_controller/models/WorkerInfo.java
new file mode 100644
index 0000000..d4636d9
--- /dev/null
+++ b/src/main/java/eu/openaire/urls_controller/models/WorkerInfo.java
@@ -0,0 +1,47 @@
+package eu.openaire.urls_controller.models;
+
+public class WorkerInfo {
+
+
+    String workerIP;
+
+    boolean hasShutdown;
+
+    // TODO - Add other info
+
+
+    public WorkerInfo() {
+    }
+
+
+    public WorkerInfo(String workerIP, boolean hasShutdown) {
+        this.workerIP = workerIP;
+        this.hasShutdown = hasShutdown;
+    }
+
+    public String getWorkerIP() {
+        return workerIP;
+    }
+
+    public void setWorkerIP(String workerIP) {
+        this.workerIP = workerIP;
+    }
+
+
+    public boolean getHasShutdown() {
+        return hasShutdown;
+    }
+
+    public void setHasShutdown(boolean hasShutdown) {
+        this.hasShutdown = hasShutdown;
+    }
+
+
+    @Override
+    public String toString() {
+        return "WorkerInfo{" +
+                "workerIP='" + workerIP + '\'' +
+                ", hasShutdown=" + hasShutdown +
+                '}';
+    }
+}
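The "hasShutdown" flag in WorkerInfo is what the planned shutdown-notification endpoint (see the TODOs in UrlsController above) would flip. As an illustration only, that endpoint could look roughly like the sketch below; the mapping-path, response bodies and the "last worker" check are assumptions, not part of this patch:

```java
@PostMapping("workerShutdownReport")
public ResponseEntity<?> workerShutdownReport(@RequestParam String workerId) {
    WorkerInfo workerInfo = workersInfoMap.get(workerId);
    if ( workerInfo == null )
        return ResponseEntity.badRequest().body("The worker \"" + workerId + "\" is unknown to the Controller!");

    workerInfo.setHasShutdown(true);  // Mark this worker as shut down.

    for ( WorkerInfo info : workersInfoMap.values() ) {
        if ( !info.getHasShutdown() )
            return ResponseEntity.ok().build();  // At least one worker is still running; nothing more to do.
    }
    // All known workers have reported their shutdown; the Controller itself may now shut down.
    logger.info("All workers have shut down. The Controller may shut down as well.");
    return ResponseEntity.ok().build();
}
```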
diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
index eb8487b..89d7ef2 100644
--- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
@@ -242,18 +242,18 @@ public class UrlsServiceImpl implements UrlsService {
         } else
             logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments);
 
-        String currentParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignments + File.separator;
+        String localParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignments + File.separator;
         try {
-            Files.createDirectories(Paths.get(currentParquetPath));  // No-op if it already exists. It does not throw a "alreadyExistsException"
+            Files.createDirectories(Paths.get(localParquetPath));  // No-op if it already exists. It does not throw a "FileAlreadyExistsException".
         } catch (Exception e) {
-            String errorMsg = "Could not create the parquet-directory: " + currentParquetPath;
+            String errorMsg = "Could not create the parquet-directory: " + localParquetPath;
             logger.error(errorMsg, e);
             return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
         }
 
         logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignments);
 
-        List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignments, currentParquetPath, uploadFullTextsResponse);
+        List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignments, localParquetPath, uploadFullTextsResponse, curWorkerId);
 
         boolean hasAttemptParquetFileProblem = false;
         boolean hasPayloadParquetFileProblem = false;
@@ -316,8 +316,8 @@ public class UrlsServiceImpl implements UrlsService {
             logger.error(errorMsg, e);
             return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
         } finally {
-            logger.debug("Deleting parquet directory: " + currentParquetPath);
-            fileUtils.deleteDirectory(new File(currentParquetPath));
+            logger.debug("Deleting parquet directory: " + localParquetPath);
+            fileUtils.deleteDirectory(new File(localParquetPath));
         }
 
         logger.debug("Going to merge the parquet files for the tables which were altered.");
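The tasks returned by getTasksForCreatingAndUploadingParquetFiles() are executed between the two hunks above. A hedged sketch of that execution step, assuming an ExecutorService and ParquetReport accessors named "getParquetType()" and "isSuccessful()" (illustrative names; the project's actual API may differ):

```java
ExecutorService insertsExecutor = Executors.newFixedThreadPool(6);  // Hypothetical pool; the real one is configured elsewhere.
try {
    // "invokeAll()" blocks until every submitted task has finished (or failed).
    for ( Future<ParquetReport> future : insertsExecutor.invokeAll(callableTasks) ) {
        ParquetReport report = future.get();
        if ( !report.isSuccessful() ) {  // Fold each failure into the matching per-table flag.
            if ( report.getParquetType() == ParquetReport.ParquetType.attempt )
                hasAttemptParquetFileProblem = true;
            else
                hasPayloadParquetFileProblem = true;
        }
    }
} catch (InterruptedException | ExecutionException e) {
    logger.error("Error when executing or gathering the parquet-tasks!", e);
}
```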
diff --git a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java
index a6e0296..5439f56 100644
--- a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java
+++ b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java
@@ -149,7 +149,7 @@ public class ParquetFileUtils {
     }
 
-    public List<Callable<ParquetReport>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOfUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse)
+    public List<Callable<ParquetReport>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOfUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse, String workerId)
     {
         // Split the "UrlReports" into some sub-lists.
         List<List<UrlReport>> subLists;
@@ -167,27 +167,26 @@ public class ParquetFileUtils {
             for ( int i = 0; i < subListsSize; ++i ) {
                 int finalI = i;
                 callableTasks.add(() -> {  // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
-                    return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(finalI, subLists.get(finalI), curReportAssignments, currentParquetPath));
+                    return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(finalI, subLists.get(finalI), curReportAssignments, currentParquetPath, workerId));
                 });
             }
         } else {  // If the "urlReports" are so few that we cannot get big "sublists", assign a single task to handle all the attempts.
             callableTasks.add(() -> {  // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
-                return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(0, urlReports, curReportAssignments, currentParquetPath));
+                return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(0, urlReports, curReportAssignments, currentParquetPath, workerId));
             });
         }
 
         if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.successful ) {  // At this point we know there was no problem with the full-texts, but we do not know if at least one full-text was retrieved.
-
             if ( (payloadsSchema == null)  // Parse the schema if it's not already parsed.
                     && ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) {
                 logger.error("Nothing can be done without the payloadsSchema! Exiting..");  // The cause is already logged inside the above method.
                 System.exit(88);  // Exit the whole app, as it cannot add the results to the database!
             }
             callableTasks.add(() -> {  // Handle inserts to the "payload" table. Around 20% of the total amount.
-                return new ParquetReport(ParquetReport.ParquetType.payload, createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath, parquetHDFSDirectoryPathPayloadsAggregated));
+                return new ParquetReport(ParquetReport.ParquetType.payload, createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath, (parquetHDFSDirectoryPathPayloadsAggregated + workerId + "/")));
             });
         }
@@ -201,7 +200,7 @@ public class ParquetFileUtils {
     }
 
-    public boolean createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath)
+    public boolean createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignments, String localParquetPath, String workerId)
     {
         List<GenericData.Record> recordList = new ArrayList<>(urlReports.size());
         GenericData.Record record;
@@ -243,12 +242,12 @@ public class ParquetFileUtils {
         String fileName = UrlsServiceImpl.assignmentsBatchCounter.get() + "_attempts_" + attemptsIncNum + ".parquet";
         //logger.trace("Going to write " + recordsSize + " attempt-records to the parquet file: " + fileName);  // DEBUG!
 
-        String fullFilePath = currentParquetPath + fileName;
+        String fullFilePath = localParquetPath + fileName;
 
         if ( writeToParquet(recordList, attemptsSchema, fullFilePath) ) {
             //logger.trace("Parquet file \"" + fileName + "\" was created and filled.");  // DEBUG!
             // Upload and insert the data to the "attempt" Impala table.
-            String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, parquetHDFSDirectoryPathAttempts);
+            String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, (parquetHDFSDirectoryPathAttempts + workerId + "/"));
             return (errorMsg == null);  // The possible error-message returned is already logged by the Controller.
         } else
             return false;
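Both call-sites above now upload into a per-worker HDFS subdirectory; the Controller creates those subdirectories through applyHDFOperation() (see UrlsController above). For readers unfamiliar with WebHDFS, a minimal sketch of such a directory-creating operation, assuming a plain "MKDIRS" REST call and hypothetical auth handling:

```java
import java.net.HttpURLConnection;
import java.net.URL;

// Minimal sketch, assuming the given URL already contains the WebHDFS operation-parameters,
// e.g. "http://<namenode>:50070/webhdfs/v1/<dir-path>?op=MKDIRS&permission=777".
public static boolean applyMkDirsOperation(String hdfsOperationUrl) {
    try {
        HttpURLConnection conn = (HttpURLConnection) new URL(hdfsOperationUrl).openConnection();
        conn.setRequestMethod("PUT");  // WebHDFS "MKDIRS" is an idempotent PUT request.
        conn.setRequestProperty("Authorization", "Basic <base64-credentials>");  // Hypothetical; depends on the cluster's auth-setup.
        int statusCode = conn.getResponseCode();
        conn.disconnect();
        return (statusCode == 200);  // On success, WebHDFS responds with 200 and {"boolean": true}.
    } catch (Exception e) {
        return false;
    }
}
```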
@@ -258,7 +257,7 @@
     }
 
-    public boolean createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath, String parquetHDFSDirectoryPathPayloads)
+    public boolean createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignments, String localParquetPath, String parquetHDFSDirectoryPathPayloads)
     {
         List<GenericData.Record> recordList = new ArrayList<>((int) (urlReports.size() * 0.2));
         GenericData.Record record;
@@ -303,7 +302,7 @@
         String fileName = UrlsServiceImpl.assignmentsBatchCounter.get() + "_payloads.parquet";
         //logger.trace("Going to write " + recordsSize + " payload-records to the parquet file: " + fileName);  // DEBUG!
 
-        String fullFilePath = currentParquetPath + fileName;
+        String fullFilePath = localParquetPath + fileName;
 
         if ( writeToParquet(recordList, payloadsSchema, fullFilePath) ) {
             //logger.trace("Parquet file \"" + fileName + "\" was created and filled.");  // DEBUG!
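Both table-loaders delegate the local file-creation to writeToParquet(recordList, schema, fullFilePath). As a rough, hedged sketch of what such a method typically looks like with the parquet-avro API (the compression-codec and error-handling are assumptions, not this project's exact implementation):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import java.util.List;

public static boolean writeRecordsToParquet(List<GenericData.Record> recordList, Schema schema, String fullFilePath) {
    try (ParquetWriter<GenericData.Record> writer
                 = AvroParquetWriter.<GenericData.Record>builder(new Path(fullFilePath))
                       .withSchema(schema)
                       .withCompressionCodec(CompressionCodecName.SNAPPY)  // Codec choice is an assumption.
                       .build()) {
        for ( GenericData.Record record : recordList )
            writer.write(record);  // Records are buffered and flushed to the local parquet file on close.
        return true;
    } catch (Exception e) {
        return false;  // The caller treats "false" as a parquet-file problem for this table.
    }
}
```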