- Use a separate HDFS subdirectory for each worker, in order to avoid "empty hdfs directory" exceptions when "loading" the data into the database. These occurred because one worker could load data generated by multiple workers (since a single load operation covers multiple parquet files), leaving the shared directory empty for the others.

- Store each worker's info in a hash-table, so that we can efficiently tell whether new HDFS subdirectories need to be created for it. This will also help, in the future, to issue "shutdown" requests to the workers and to know which workers have shut down. (A minimal sketch of this registration pattern is given below.)
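
A minimal sketch of this registration pattern, assuming hypothetical names (the "WorkerRegistry" class, the namenode address and the directory paths are stand-ins; the real code uses the "workersInfoMap" in UrlsController together with ParquetFileUtils and its WebHDFS helpers):

import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.ConcurrentHashMap;

public class WorkerRegistry {

    private final ConcurrentHashMap<String, String> workerIpById = new ConcurrentHashMap<>(6);

    private static final String WEBHDFS_BASE = "http://namenode:50070/webhdfs/v1"; // hypothetical namenode address

    /** Returns true if the worker was already known, false if it was registered just now. */
    public boolean registerIfAbsent(String workerId, String remoteAddr) throws Exception {
        String previousIp = workerIpById.putIfAbsent(workerId, remoteAddr); // atomic "first contact" check
        if ( previousIp != null )
            return true; // Already registered, so its subdirectories already exist.

        // First contact: create a per-worker subdirectory under each parquet base-directory, so that one
        // worker's "load" operation can never empty a directory which another worker's files still need.
        createRemoteDir("/parquet/attempts/" + workerId);
        createRemoteDir("/parquet/payloads_aggregated/" + workerId);
        return false;
    }

    private void createRemoteDir(String hdfsPath) throws Exception {
        // WebHDFS "MKDIRS" is an idempotent HTTP PUT; it also succeeds if the directory already exists.
        HttpURLConnection conn = (HttpURLConnection) new URL(WEBHDFS_BASE + hdfsPath + "?op=MKDIRS&user.name=hdfs").openConnection();
        conn.setRequestMethod("PUT");
        if ( conn.getResponseCode() != 200 )
            throw new RuntimeException("WebHDFS MKDIRS failed for: " + hdfsPath);
        conn.disconnect();
    }
}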
Lampros Smyrnaios 2023-05-15 13:12:20 +03:00
parent 9412391903
commit f51a34138f
4 changed files with 114 additions and 17 deletions

View File

@@ -1,8 +1,10 @@
package eu.openaire.urls_controller.controllers;
import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.models.WorkerInfo;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.services.UrlsService;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -13,6 +15,7 @@ import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
@@ -25,6 +28,8 @@ public class UrlsController {
@Autowired
private UrlsService urlsService;
@Autowired
private ParquetFileUtils parquetFileUtils;
private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");
@@ -32,8 +37,25 @@ public class UrlsController {
private int assignmentLimit;
public static final ConcurrentHashMap<String, WorkerInfo> workersInfoMap = new ConcurrentHashMap<>(6);
// TODO - Implement an endpoint in the Controller to request the Controller to shut down everything.
// The Controller will make sure that it has finished requesting the full-texts and has sent a "shutDownRequest" to each worker (as we will have its IP).
// (Some shutdown-requests may fail, for any reason, but that should not halt the process.)
// After the shutdown-requests have been sent, the endpoint returns a message saying that the shutdown process is in progress.
// TODO - Make another endpoint in the Controller to take POST requests from the workers about their shutdown-process.
// This endpoint will set "hasShutdown=true" in the workers' hashmap and check if all the workers have finished (a rough sketch is given below these TODOs).
// So, if the worker that just shut down is the last one, then show a log-message and shut down the Controller.
// TODO - Will the "last one" be the "actual last one"? What if we had 7 workers, but one crashed, and now we have 6 workers shutting down properly while the 7th still appears to be working?
// In that case, we can cross it out easily, as the Controller will get either a "Connection refused" or a "connection timeout", depending on the state of the worker.
// TODO - Make the Worker send a POST request to the Controller to notify it that it has finished all its work and is about to close.
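// A rough, commented-out sketch (an assumption for illustration, not code from this commit) of what the planned
// shutdown-report endpoint from the TODOs above could look like; its mapping-path and method name are hypothetical:
//
//   @PostMapping("workerShutdownReport")
//   public ResponseEntity<?> workerShutdownReport(@RequestParam String workerId) {
//       WorkerInfo workerInfo = workersInfoMap.get(workerId);
//       if ( workerInfo == null )
//           return ResponseEntity.badRequest().body("The worker \"" + workerId + "\" is unknown to the Controller!");
//       workerInfo.setHasShutdown(true);
//       if ( workersInfoMap.values().stream().allMatch(WorkerInfo::getHasShutdown) )
//           logger.info("All workers have now shut down; the Controller may shut down as well.");
//       return ResponseEntity.ok().build();
//   }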
@GetMapping("")
public ResponseEntity<?> getAssignments(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
public ResponseEntity<?> getAssignments(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit, HttpServletRequest request) {
// As the Impala-driver is buggy and struggles to support parameterized queries in some types of prepared-statements, we have to sanitize the "workerId" ourselves.
if ( MALICIOUS_INPUT_STRING.matcher(workerId).matches() ) {
@@ -55,6 +77,35 @@ public class UrlsController {
assignmentsLimit = assignmentLimit;
}
if ( request == null ) {
logger.error("The \"HttpServletRequest\" is null!");
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
String remoteAddr = request.getHeader("X-FORWARDED-FOR");
if ( (remoteAddr == null) || "".equals(remoteAddr) )
remoteAddr = request.getRemoteAddr();
WorkerInfo workerInfo = workersInfoMap.get(workerId);
if ( workerInfo != null ) { // This worker has already been identified.
String savedWorkerIp = workerInfo.getWorkerIP();
if ( !savedWorkerIp.equals(remoteAddr) ) { // Only act when the IP has actually changed.
logger.warn("The worker with id \"" + workerId + "\" has changed IP from \"" + savedWorkerIp + "\" to \"" + remoteAddr + "\".");
workerInfo.setWorkerIP(remoteAddr); // Set the new IP. The update will be reflected in the map.
}
} else {
logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP and create the remote parquet subdirectories (in HDFS).");
workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false));
// Create extra subdirectories inside the HDFS parquet-directories, so that a parquet directory does not become empty right before "loading" the data into the DB, in case another worker has already loaded multiple workers' data into the DB.
String endingDirAndParams = workerId + "/" + parquetFileUtils.mkDirsAndParams;
if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingDirAndParams)
|| !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingDirAndParams) )
{
String errorMsg = "Error when creating the HDFS sub-directories for worker with id: " + workerId;
logger.error(errorMsg);
return ResponseEntity.internalServerError().body(errorMsg);
}
}
return urlsService.getAssignments(workerId, assignmentsLimit);
}

View File

@@ -0,0 +1,47 @@
package eu.openaire.urls_controller.models;
public class WorkerInfo {
String workerIP;
boolean hasShutdown;
// TODO - Add other info
public WorkerInfo() {
}
public WorkerInfo(String workerIP, boolean hasShutdown) {
this.workerIP = workerIP;
this.hasShutdown = hasShutdown;
}
public String getWorkerIP() {
return workerIP;
}
public void setWorkerIP(String workerIP) {
this.workerIP = workerIP;
}
public boolean getHasShutdown() {
return hasShutdown;
}
public void setHasShutdown(boolean hasShutdown) {
this.hasShutdown = hasShutdown;
}
@Override
public String toString() {
return "WorkerInfo{" +
"workerIP='" + workerIP + '\'' +
", hasShutdown=" + hasShutdown +
'}';
}
}

View File

@@ -242,18 +242,18 @@ public class UrlsServiceImpl implements UrlsService {
} else
logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments);
String currentParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignments + File.separator;
String localParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignments + File.separator;
try {
Files.createDirectories(Paths.get(currentParquetPath)); // No-op if it already exists. It does not throw an "alreadyExistsException"
Files.createDirectories(Paths.get(localParquetPath)); // No-op if it already exists. It does not throw an "alreadyExistsException"
} catch (Exception e) {
String errorMsg = "Could not create the parquet-directory: " + currentParquetPath;
String errorMsg = "Could not create the parquet-directory: " + localParquetPath;
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignments);
List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignments, currentParquetPath, uploadFullTextsResponse);
List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignments, localParquetPath, uploadFullTextsResponse, curWorkerId);
boolean hasAttemptParquetFileProblem = false;
boolean hasPayloadParquetFileProblem = false;
@@ -316,8 +316,8 @@ public class UrlsServiceImpl implements UrlsService {
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
logger.debug("Deleting parquet directory: " + currentParquetPath);
fileUtils.deleteDirectory(new File(currentParquetPath));
logger.debug("Deleting parquet directory: " + localParquetPath);
fileUtils.deleteDirectory(new File(localParquetPath));
}
logger.debug("Going to merge the parquet files for the tables which were altered.");

View File

@@ -149,7 +149,7 @@ public class ParquetFileUtils {
}
public List<Callable<ParquetReport>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOfUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse)
public List<Callable<ParquetReport>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOfUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse, String workerId)
{
// Split the "UrlReports" into some sub-lists.
List<List<UrlReport>> subLists;
@@ -167,27 +167,26 @@ public class ParquetFileUtils {
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(finalI, subLists.get(finalI), curReportAssignments, currentParquetPath));
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(finalI, subLists.get(finalI), curReportAssignments, currentParquetPath, workerId));
});
}
} else {
// If the "urlReports" are so few that we cannot get big "sublists", assign a single task to handle all the attempts.
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(0, urlReports, curReportAssignments, currentParquetPath));
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(0, urlReports, curReportAssignments, currentParquetPath, workerId));
});
}
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.successful )
{
// At this point we know there was no problem with the full-texts, but we do not know if at least one full-text was retrieved.
if ( (payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) {
logger.error("Nothing can be done without the payloadsSchema! Exiting.."); // The cause is already logged inside the above method.
System.exit(88); // Exit the whole app, as it cannot add the results to the database!
}
callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount.
return new ParquetReport(ParquetReport.ParquetType.payload, createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath, parquetHDFSDirectoryPathPayloadsAggregated));
return new ParquetReport(ParquetReport.ParquetType.payload, createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath, (parquetHDFSDirectoryPathPayloadsAggregated + workerId + "/")));
});
}
@@ -201,7 +200,7 @@ public class ParquetFileUtils {
}
public boolean createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath)
public boolean createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignments, String localParquetPath, String workerId)
{
List<GenericData.Record> recordList = new ArrayList<>(urlReports.size());
GenericData.Record record;
@@ -243,12 +242,12 @@ public class ParquetFileUtils {
String fileName = UrlsServiceImpl.assignmentsBatchCounter.get() + "_attempts_" + attemptsIncNum + ".parquet";
//logger.trace("Going to write " + recordsSize + " attempt-records to the parquet file: " + fileName); // DEBUG!
String fullFilePath = currentParquetPath + fileName;
String fullFilePath = localParquetPath + fileName;
if ( writeToParquet(recordList, attemptsSchema, fullFilePath) ) {
//logger.trace("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!
// Upload and insert the data to the "attempt" Impala table.
String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, parquetHDFSDirectoryPathAttempts);
String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, (parquetHDFSDirectoryPathAttempts + workerId + "/"));
return (errorMsg == null); // The possible error-message returned is already logged by the Controller.
} else
return false;
@@ -258,7 +257,7 @@ public class ParquetFileUtils {
}
public boolean createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath, String parquetHDFSDirectoryPathPayloads)
public boolean createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignments, String localParquetPath, String parquetHDFSDirectoryPathPayloads)
{
List<GenericData.Record> recordList = new ArrayList<>((int) (urlReports.size() * 0.2));
GenericData.Record record;
@@ -303,7 +302,7 @@ public class ParquetFileUtils {
String fileName = UrlsServiceImpl.assignmentsBatchCounter.get() + "_payloads.parquet";
//logger.trace("Going to write " + recordsSize + " payload-records to the parquet file: " + fileName); // DEBUG!
String fullFilePath = currentParquetPath + fileName;
String fullFilePath = localParquetPath + fileName;
if ( writeToParquet(recordList, payloadsSchema, fullFilePath) ) {
//logger.trace("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!