diff --git a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java
index 4857833..d0e934f 100644
--- a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java
+++ b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java
@@ -72,7 +72,6 @@ public class ParquetFileUtils {
 
 	public Schema payloadsSchema;
 	public Schema attemptsSchema;
-
 	public final String parquetHDFSDirectoryPathAttempts;
 	public final String parquetHDFSDirectoryPathPayloads;
 
@@ -80,7 +79,8 @@ public class ParquetFileUtils {
 	public ParquetFileUtils(@Value("${hdfs.baseUrl}") String webHDFSBaseUrl,
 							@Value("${hdfs.httpAuth}") String hdfsHttpAuthString, @Value("${hdfs.userName}") String hdfsUserName, @Value("${hdfs.password}") String hdfsPassword,
 							@Value("${output.parquetLocalDirectoryPath}") String parquetBaseDirectoryPath,
-							@Value("${hdfs.parquetRemoteBaseDirectoryPath}") String hdfsParquetBaseDir, FileUtils fileUtils) throws IOException
+							@Value("${hdfs.parquetRemoteBaseDirectoryPath}") String hdfsParquetBaseDir,
+							@Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment, FileUtils fileUtils) throws IOException
 	{
 		if ( webHDFSBaseUrl.endsWith("/") )	// We don't want an ending slash in the url (as it causes problems when the file-path is added).
 			this.webHDFSBaseUrl = webHDFSBaseUrl.substring(0, (webHDFSBaseUrl.length() -1));
@@ -119,6 +119,9 @@ public class ParquetFileUtils {
 		if ( !hdfsParquetBaseDir.endsWith("/") )
 			hdfsParquetBaseDir += "/";
 
+		if ( isTestEnvironment )	// Make sure the hdfs-remote-dir is different when running tests, in order to not cause conflicts with production.
+			hdfsParquetBaseDir += "test/";
+
 		this.parquetHDFSDirectoryPathPayloads = hdfsParquetBaseDir + "payloads/";
 		this.parquetHDFSDirectoryPathAttempts = hdfsParquetBaseDir + "attempts/";
 		this.fileUtils = fileUtils;
@@ -450,14 +453,17 @@ public class ParquetFileUtils {
 
 		// Check if the remote directories exist. If so, then return and continue with execution.
 		// If the directories do not exist, then make them in two requests.
 		// The WebHDFS uses the "mkdirs" operation, which creates all the non-existent directories in the specified path.
-		// So with one request we will create the "parquet_uploads/" and the "parquet_uploads/payloads/" and with the seconds request, the "parquet_uploads/attempts/" directory.
+		// So with one request we will create the "parquet_uploads/" and the "parquet_uploads/attempts/" directories, and with the second request, the "parquet_uploads/payloads/" directory.
 
-		String mkdirsParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName;
+		String mkDirsParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName;
 
 		logger.info("Going to check if the remote parquet directories exist.");
 		String listMainDirectoryUrl = webHDFSBaseUrl + parquetBaseRemoteDirectory + "?op=LISTSTATUS&user.name=" + hdfsUserName;
 
+		boolean payloadCreationSuccessful = true;
+		boolean attemptCreationSuccessful = true;
+
 		// Get the "fileStatuses" of the directories (main and subdirectories) in one request.
 		try {
 			URL url = new URL(listMainDirectoryUrl);
@@ -476,9 +482,8 @@ public class ParquetFileUtils {
 
 			if ( statusCode == 404 ) {
 				logger.info("The directory \"" + parquetBaseRemoteDirectory + "\" does not exist. We will create it, along with its sub-directories.");
-				createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkdirsParams);
-				createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkdirsParams);
-				return true;
+				attemptCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsParams);
+				payloadCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkDirsParams);
 			}
 			else {
 				// Check the json-response, to see if all the subdirectories exist.
@@ -522,19 +527,19 @@ public class ParquetFileUtils {
 					return false;
 				}
 
-				// IMPORTANT NOTE: It is possible that the ".../payloads" dir exists, but the ".../attempts" dir does not (in case of remote filesystem failure of by accidental deletion by some other user).
+				// IMPORTANT NOTE: It is possible that the ".../attempts" dir exists, but the ".../payloads" dir does not, and vice-versa (in case of remote filesystem failure, or accidental deletion by some other user).
 				// Also, it is possible that the Controller was terminated before creating all the directories, or that in the previous executions the second "create"-request failed, resulting in the Controller's shut-down.
-				// For each missing subdirectories, run the mkdirs-request.
+				// For each missing subdirectory, run the mkDirs-request.
 				if ( !foundAttemptsDir ) {
 					logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" does not exist! Going to create it.");
-					createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkdirsParams);
+					attemptCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsParams);
 				} else
 					logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" exists.");
 
 				if ( !foundPayloadsDir ) {
 					logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloads + "\" does not exist! Going to create it.");
-					createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkdirsParams);
+					payloadCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkDirsParams);
 				} else
 					logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloads + "\" exists.");
 			}
@@ -543,7 +548,8 @@ public class ParquetFileUtils {
 
 			return false;
 		}
 
-		return true;
+		// We need both directories to be created in order for the app to function properly!
+		return (attemptCreationSuccessful && payloadCreationSuccessful);
 	}
 
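
For reference, the createHDFSDirectory() helper whose boolean result this patch now propagates is not part of the diff. Below is a minimal, hypothetical sketch of such a helper, assuming plain java.net.HttpURLConnection and standard WebHDFS semantics: "MKDIRS" is an HTTP PUT operation which creates all non-existent directories in the path and answers with status 200 and a small json-body ({"boolean": true}) on success. The class name, timeouts and error-handling are illustrative; the real method in ParquetFileUtils may handle authentication, redirects and logging differently.

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

public class HdfsDirectorySketch {

	/**
	 * Hypothetical sketch of a "createHDFSDirectory"-style helper.
	 * WebHDFS' "MKDIRS" is an HTTP PUT which creates all the non-existent
	 * directories in the given path and returns 200 with {"boolean": true}.
	 */
	public static boolean createHDFSDirectory(String createDirectoryUrl) {
		HttpURLConnection conn = null;
		try {
			conn = (HttpURLConnection) new URL(createDirectoryUrl).openConnection();
			conn.setRequestMethod("PUT");	// "MKDIRS" is a PUT-operation in WebHDFS.
			conn.setConnectTimeout(60_000);
			conn.setReadTimeout(60_000);
			int statusCode = conn.getResponseCode();	// This sends the request.
			if ( statusCode != 200 ) {
				System.err.println("Failed to create the directory: \"" + createDirectoryUrl + "\". The statusCode was: " + statusCode);
				return false;
			}
			return true;
		} catch (IOException ioe) {
			System.err.println("Error when requesting the creation of the directory: \"" + createDirectoryUrl + "\"!\n" + ioe.getMessage());
			return false;
		} finally {
			if ( conn != null )
				conn.disconnect();
		}
	}
}

With a helper of this shape, a call looks exactly like the patched code, e.g. attemptCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsParams); and the method patched above can report failure when either of the two MKDIRS-requests fails, instead of unconditionally returning true as before.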