UrlsController/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java

package eu.openaire.urls_controller.util;
import com.google.common.collect.Lists;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.*;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.OutputFile;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.ClassPathResource;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
@Component
public class ParquetFileUtils {
private static final Logger logger = LoggerFactory.getLogger(ParquetFileUtils.class);
public final String parquetBaseLocalDirectoryPath;
// The "@Autowire" does not work in this case (gives an NPE when it's used).
private final FileUtils fileUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@Value("${hdfs.baseUrl}")
public String webHDFSBaseUrl;
private final String hdfsHttpAuthString;
private final String hdfsUserName;
public static final String payloadSchemaFilePath = "schemas/payload.avsc";
private static final String attemptSchemaFilePath = "schemas/attempt.avsc";
public static Schema payloadsSchema = null;
public Schema attemptsSchema;
public final String parquetHDFSDirectoryPathAttempts;
// The "payloads_legacy" are not handled by the Controller (no data is added there), they just exist in the database and are taken into account when performing READ queries to the "payload" VIEW.
public final String parquetHDFSDirectoryPathPayloadsAggregated;
public final String parquetHDFSDirectoryPathPayloadsBulkImport;
public String mkDirsAndParams;
//public String setPermAndParams;
public String rmDirsAndParams;
public ParquetFileUtils(@Value("${hdfs.baseUrl}") String webHDFSBaseUrl,
@Value("${hdfs.httpAuth}") String hdfsHttpAuthString, @Value("${hdfs.userName}") String hdfsUserName, @Value("${hdfs.password}") String hdfsPassword, @Value("${services.pdfaggregation.controller.parquetLocalDirectoryPath}") String parquetBaseDirectoryPath,
@Value("${hdfs.parquetRemoteBaseDirectoryPath}") String hdfsParquetBaseDir,
@Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment, FileUtils fileUtils) throws IOException
{
this.mkDirsAndParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName;
if ( webHDFSBaseUrl.endsWith("/") ) // We don't want a trailing slash in the url (it causes problems when the file-path is appended).
this.webHDFSBaseUrl = webHDFSBaseUrl.substring(0, (webHDFSBaseUrl.length() -1));
else
this.webHDFSBaseUrl = webHDFSBaseUrl;
if ( (hdfsUserName != null) && !hdfsUserName.isEmpty() )
this.hdfsUserName = hdfsUserName; // Set it to be used later as a url-param.
else
throw new RuntimeException("No hdfs-username was given!");
// If the "hdfsHttpAuthString" is given, then use it right away. In general this is better from "privacy perspective".
if ( (hdfsHttpAuthString != null) && !hdfsHttpAuthString.isEmpty() )
this.hdfsHttpAuthString = hdfsHttpAuthString;
else if ( (hdfsPassword != null) && !hdfsPassword.isEmpty() ) {
String authValue = (this.hdfsUserName + ":" + hdfsPassword);
this.hdfsHttpAuthString = ("Basic " + Base64.getEncoder().encodeToString(authValue.getBytes(StandardCharsets.UTF_8)));
}
else // At this point, all credential-checks have been made and there is no way the Controller can continue.
throw new RuntimeException("No hdfs-credentials were given, in any form!");
//logger.trace("\"hdfsHttpAuthString\": " + this.hdfsHttpAuthString); // DEBUG!
if ( ! parquetBaseDirectoryPath.endsWith(File.separator) )
this.parquetBaseLocalDirectoryPath = parquetBaseDirectoryPath + File.separator;
else
this.parquetBaseLocalDirectoryPath = parquetBaseDirectoryPath;
// Create the local parquet file base directory, if it does not exist.
Files.createDirectories(Paths.get(this.parquetBaseLocalDirectoryPath)); // No-op if the dir exists; it does not throw an "AlreadyExistsException".
// Create the remote directories for uploading the parquet-files, if those directories do not exist.
// The limited-permissions user in use does not have permission to access directories created by other users, so we have to make sure it creates its own.
if ( !hdfsParquetBaseDir.endsWith("/") )
hdfsParquetBaseDir += "/";
if ( isTestEnvironment ) // Make sure the hdfs-remote-dir is different for running tests, in order to not cause conflicts with production.
hdfsParquetBaseDir += "test/";
this.parquetHDFSDirectoryPathAttempts = hdfsParquetBaseDir + "attempts/";
this.parquetHDFSDirectoryPathPayloadsAggregated = hdfsParquetBaseDir + "payloads_aggregated/";
this.parquetHDFSDirectoryPathPayloadsBulkImport = hdfsParquetBaseDir + "payloads_bulk_import/";
this.fileUtils = fileUtils;
this.mkDirsAndParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName; // All permissions for user, group and others must be set, in order for this service' user to have access to the hdfs directory.
//this.setPermAndParams = "?op=SETPERMISSION&permission=777&user.name=" + hdfsUserName;
this.rmDirsAndParams = "?op=DELETE&recursive=true&user.name=" + hdfsUserName;
createRemoteParquetDirectories(hdfsParquetBaseDir);
}
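// A minimal sketch of the kind of Avro schema parsed below (illustrative only, not the actual contents of "payload.avsc" / "attempt.avsc";
// the record-name and most field-names here are assumptions). The real schemas are known to declare the "date" field as:
// {"name": "date", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}]}
// so a record schema of that shape would look roughly like:
// {"type": "record", "name": "attempt", "fields": [ {"name": "id", "type": "string"}, {"name": "date", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}]} ]}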
public static Schema parseSchema(String schemaResourcePath) {
try {
return (new Schema.Parser()).parse(new ClassPathResource(schemaResourcePath).getInputStream());
} catch (Throwable e) {
logger.error("", e);
return null;
}
}
public List<Callable<ParquetReport>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOfUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse)
{
// Split the "UrlReports" into some sub-lists.
List<List<UrlReport>> subLists;
// Pre-define the tasks to run.
List<Callable<ParquetReport>> callableTasks = new ArrayList<>(6);
// One thread will handle the inserts to the "payload" table and the others to the "attempt" table. This way there will be as little blocking as possible (on the part of Impala).
int sizeOfEachSubList = (int)(sizeOfUrlReports * 0.2);
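// For example (hypothetical numbers): with 500 urlReports, "sizeOfEachSubList" is 100, so "Lists.partition()" below
// creates 5 sub-lists of 100 records each, and one "attempt"-insertion task is created per sub-list.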
if ( sizeOfEachSubList > 10 )
{
subLists = Lists.partition(urlReports, sizeOfEachSubList); // This requires the "sizeOfEachSubList" to be greater than 0.
// The above will create some sub-lists, each one containing roughly 20% of the total amount.
int subListsSize = subLists.size();
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(finalI, subLists.get(finalI), curReportAssignments, currentParquetPath));
});
}
} else {
// If the "urlReports" are so few, that we cannot get big "sublists", assign a single task to handle all the attempts.
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
return new ParquetReport(ParquetReport.ParquetType.attempt, createAndLoadParquetDataIntoAttemptTable(0, urlReports, curReportAssignments, currentParquetPath));
});
}
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.successful )
{
// At this point we know there was no problem with the full-texts, but we do not know if at least one full-text was retrieved.
if ( (payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) {
logger.error("Nothing can be done without the payloadsSchema! Exiting.."); // The cause is already logged inside the above method.
System.exit(88); // Exit the whole app, as it cannot add the results to the database!
}
callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount.
return new ParquetReport(ParquetReport.ParquetType.payload, createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath, (parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignments + "/")));
});
}
if ( (attemptsSchema == null) // Parse the schema if it's not already parsed.
&& ((attemptsSchema = parseSchema(attemptSchemaFilePath)) == null ) ) {
logger.error("Nothing can be done without the attemptsSchema! Exiting.."); // The cause is already logged inside the above method.
System.exit(89); // Exit the whole app, as it cannot add the results to the database!
}
return callableTasks;
}
public boolean createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignmentsCounter, String localParquetPath)
{
List<GenericData.Record> recordList = new ArrayList<>(urlReports.size());
GenericData.Record record;
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignmentsCounter + "\n" + urlReport);
continue;
}
Error error = urlReport.getError();
if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop).
logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
error = new Error(null, null);
}
try {
record = new GenericData.Record(attemptsSchema);
record.put("id", payload.getId());
record.put("original_url", payload.getOriginal_url());
Timestamp timestamp = payload.getTimestamp_acquired();
record.put("date", (timestamp != null) ? timestamp.getTime() : System.currentTimeMillis());
record.put("status", urlReport.getStatus().toString());
record.put("error_class", String.valueOf(error.getType()));
record.put("error_message", error.getMessage());
recordList.add(record);
} catch (Exception e) {
logger.error("Failed to create an attempt record!", e);
}
}
int recordsSize = recordList.size();
if ( recordsSize == 0 ) {
logger.warn("No attempts are available to be inserted to the database!");
return false;
}
String fileName = curReportAssignmentsCounter + "_attempts_" + attemptsIncNum + ".parquet";
//logger.trace("Going to write " + recordsSize + " attempt-records to the parquet file: " + fileName); // DEBUG!
String fullFilePath = localParquetPath + fileName;
if ( writeToParquet(recordList, attemptsSchema, fullFilePath) ) {
//logger.trace("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!
// Upload and insert the data to the "attempt" Impala table.
String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, (parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/"));
return (errorMsg == null); // Any error-message returned is already logged by the Controller.
} else
return false;
// Multiple parquet files will be created and filled at the same time, before they are uploaded and moved into the database table.
// Each thread serving a worker, will have its own parquet subdirectory, which will be deleted after inserting the parquet data to the database.
}
public boolean createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignmentsCounter, String localParquetPath, String parquetHDFSDirectoryPathPayloads)
{
List<GenericData.Record> recordList = new ArrayList<>((int) (urlReports.size() * 0.2));
GenericData.Record record;
for ( UrlReport urlReport : urlReports )
{
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignmentsCounter + "\n" + urlReport);
continue;
}
String fileLocation = payload.getLocation();
if ( fileLocation == null ) // We want only the records with uploaded full-texts in the "payload" table.
continue;
try {
record = new GenericData.Record(payloadsSchema);
record.put("id", payload.getId());
record.put("original_url", payload.getOriginal_url());
record.put("actual_url", payload.getActual_url());
Timestamp timestamp = payload.getTimestamp_acquired();
record.put("date", (timestamp != null) ? timestamp.getTime() : System.currentTimeMillis());
record.put("mimetype", payload.getMime_type());
Long size = payload.getSize();
record.put("size", ((size != null) ? String.valueOf(size) : null));
record.put("hash", payload.getHash());
record.put("location", fileLocation);
record.put("provenance", payload.getProvenance());
recordList.add(record);
} catch (Exception e) {
logger.error("Failed to create a payload record!", e);
}
}
int recordsSize = recordList.size();
if ( recordsSize == 0 ) {
logger.warn("No payloads are available to be inserted to the database!");
return false;
}
String fileName = curReportAssignmentsCounter + "_payloads.parquet";
//logger.trace("Going to write " + recordsSize + " payload-records to the parquet file: " + fileName); // DEBUG!
String fullFilePath = localParquetPath + fileName;
if ( writeToParquet(recordList, payloadsSchema, fullFilePath) ) {
//logger.trace("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!
// Upload and insert the data to the "payload" Impala table.
String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, parquetHDFSDirectoryPathPayloads);
return (errorMsg == null); // Any error-message returned is already logged by the Controller.
} else
return false;
// Multiple parquet files will be created and filled at the same time, before they are uploaded and moved into the database table.
// Each thread serving a worker, will have its own parquet subdirectory, which will be deleted after inserting the parquet data to the database.
}
public boolean writeToParquet(List<GenericData.Record> recordList, Schema schema, String fullFilePath)
{
OutputFile outputFile;
try { // TODO - Verify that this will create any directories which do not exist in the provided path. Currently we create the directories beforehand.
outputFile = HadoopOutputFile.fromPath(new Path(fullFilePath), new Configuration());
//logger.trace("Created the parquet " + outputFile); // DEBUG!
} catch (Throwable e) { // The simple "Exception" may not be thrown here, but an "Error" may be thrown. "Throwable" catches EVERYTHING!
logger.error("", e);
return false;
}
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputFile).withSchema(schema)
.withCompressionCodec(CompressionCodecName.GZIP).build())
{
// When the app runs inside a Docker Container, it is NOT guaranteed that all compression-types will work. For example, the "SNAPPY"-compression does NOT work, while the "GZIP" works.
// Also, we would prefer ZSTD over GZIP, but the old version of the Impala-Database does not support it..
//logger.trace("Going to write to \"" + fullFilePath + "\" the record list: " + recordList); // DEBUG!
for ( GenericRecord record : recordList ) {
//logger.trace("Writing to \"" + fullFilePath + "\" the record: " + record); // DEBUG!
writer.write(record);
}
} catch (Throwable e) { // The simple "Exception" may not be thrown here, but an "Error" may be thrown. "Throwable" catches EVERYTHING!
String errorMsg = "Problem when creating the \"ParquetWriter\" object or when writing the records with it!";
if ( e instanceof org.apache.hadoop.fs.FileAlreadyExistsException )
logger.error(errorMsg + "\n" + e.getMessage());
else
logger.error(errorMsg, e);
// At some point, I got an "NoSuchMethodError", because of a problem in the AvroSchema file: (java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;).
// The error was with the schema: {"name": "date", "type" : ["null", {"type" : "long", "logicalType" : "timestamp-millis"}]},
return false;
}
//logger.trace("Done writing to \"" + fullFilePath + "\""); // DEBUG!
return true;
}
public String uploadParquetFileToHDFS(String parquetFileFullLocalPath, String parquetFileName, String parquetRemoteDirectory)
{
/*
Create the new file in path.
curl -u <USERNAME> -i -X PUT "https://iis-cdh5-test-gw.ocean.icm.edu.pl/webhdfs/v1/tmp/parquet_uploads/newFile.parquet?op=CREATE&overwrite=false&blocksize=1048576&permission=644&buffersize=10485760&user.name=<USERNAME>"
Gives a redirect to a DATANODE.
LOCATION: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE...
Create a new PUT request.
curl -u <USERNAME> -i -X PUT -T <PATH_TO_LOCAL_FILE> "https://<DATANODE>:<PORT>/webhdfs/v1/tmp/parquet_uploads/newFile.parquet?op=CREATE&overwrite=false&blocksize=1048576&permission=644&buffersize=10485760&user.name=<USERNAME>"
*/
try {
// Check if the parquet file exists locally.
File parquetFile = new File(parquetFileFullLocalPath);
if ( ! parquetFile.isFile() ) {
String errorMsg = "The local parquet file \"" + parquetFileFullLocalPath + "\" does not exist!";
logger.error(errorMsg);
return errorMsg;
}
// https://iis-cdh5-test-gw.ocean.icm.edu.pl/webhdfs/v1/tmp/parquet_uploads/<parquetFileName.parquet>?op=CREATE&overwrite=false&blocksize=1048576&permission=644&buffersize=10485760&user.name=<USERNAME>
String parquetFileURI = webHDFSBaseUrl + parquetRemoteDirectory + parquetFileName;
URL url = new URL(parquetFileURI + "?op=CREATE&overwrite=true&blocksize=1048576&permission=777&buffersize=10485760&user.name=" + hdfsUserName);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("PUT");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode != 307 ) {
String errorMsg = "We expected a \"307 TEMPORARY_REDIRECT\" response, but got: \"" + statusCode + "\" instead!";
logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
return errorMsg;
}
// We get the location to which we were redirected; it is a "datanode".
String location = conn.getHeaderField("Location");
if ( location == null ) {
String errorMsg = "We expected a \"location\" from the last \"307 TEMPORARY_REDIRECT\" response, but got nothing..";
logger.error(errorMsg + "\n\n" + conn.getHeaderFields());
return errorMsg;
}
//logger.trace("The target location is: " + location + "\nWill do a silent redirect to HTTPS."); // DEBUG!
location = StringUtils.replace(location, "http:", "https:", 1);
// Unless we handle this here, we would either have to complicate the process by handling the https-redirect ourselves, or take a performance hit by adding one more request each time we want to upload a file.
conn = (HttpURLConnection) (new URL(location)).openConnection(); // This already contains the "user.name" parameter.
conn.setRequestMethod("PUT");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setRequestProperty("content-type", "application/octet-stream");
conn.setDoOutput(true);
conn.setInstanceFollowRedirects(true); // It is possible that the "location" was an intermediate one.
conn.connect();
// Upload the parquet file.
try ( BufferedInputStream inputS = new BufferedInputStream(Files.newInputStream(parquetFile.toPath()), FileUtils.tenMb);
BufferedOutputStream outS = new BufferedOutputStream(conn.getOutputStream(), FileUtils.tenMb) )
{
int readByte = -1;
while ( (readByte = inputS.read()) != -1 )
outS.write(readByte);
} // Any exception will be caught in the end of this method.
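// Note: the copy above is a simple single-byte loop, but both streams are wrapped in "FileUtils.tenMb"-sized buffers, so the underlying network/disk I/O still happens in large chunks.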
statusCode = conn.getResponseCode();
if ( statusCode != 201 ) {
String errorMsg = "We expected a \"201 Created\" response, but got: \"" + statusCode + "\" instead, for url: " + location;
logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
return errorMsg;
}
location = conn.getHeaderField("Location"); // The response should have a "location" header as per the official documentation.
if ( location == null ) {
//logger.warn("We expected a \"location\" from the last \"201 Created\" response, but got nothing..\n" + conn.getHeaderFields() + "\n\nIt's fine."); // The "201-Created" response has no content inside.
// Do not return here. The absence of the location is not critical. We can still create it on our own.
location = parquetFileURI; // This location does not include the "user.name" parameter.
}
//logger.trace("The file \"" + parquetFileName + "\" was successfully uploaded. It's location is: " + location); // DEBUG!
// Important note!
// Using the "load data inpath" command, he files are MOVED, not copied! So we don't have to delete them afterwards.
// See: https://docs.cloudera.com/documentation/enterprise/latest/topics/impala_load_data.html
} catch (Throwable e) {
String errorMsg = "Error while uploading parquet file \"" + parquetFileFullLocalPath + "\" to HDFS!\n" + e;
logger.error(errorMsg);
return errorMsg;
}
// The local parquet file will be deleted later.
return null;
}
public boolean loadParquetDataIntoTable(String remoteParquetDataLocation, String tableName)
{
// Import the data from the parquet file into the database's table.
String loadParquetInTableQuery = "load data inpath '" + remoteParquetDataLocation + "' into table " + DatabaseConnector.databaseName + "." + tableName;
try {
jdbcTemplate.execute(loadParquetInTableQuery);
} catch (Exception e) {
// Check if this error is related to the files being missing from the HDFS directory.
Throwable cause = e.getCause();
if ( cause instanceof SQLException ) { // In this case the "parent" exception is: "org.springframework.jdbc.UncategorizedSQLException".
String errorMsg = cause.getMessage();
if ( (errorMsg != null) && errorMsg.contains("contains no visible files") ) {
logger.error("The \"remoteParquetDataLocation\": \"" + remoteParquetDataLocation + "\" was found empty, when tried to load its content into the \"" + tableName + "\" table!");
return false; // Since each thread is using a different subDir, by design, This error is unacceptable.
}
}
DatabaseConnector.handleQueryException("loadParquetInTableQuery", loadParquetInTableQuery, e); // It's already logged.
return false;
}
//logger.trace("The data from \"" + remoteParquetDataLocation + "\" was loaded into the " + tableName + " table."); // DEBUG!
return true;
}
public boolean createRemoteParquetDirectories(String parquetBaseRemoteDirectory)
{
// Check if the remote directories exist. If so, then return and continue with execution.
// If the directories do not exist, then make them in two requests.
// The WebHDFS uses the "mkdirs" operations which creates all the non-existent directories in the specified path.
// So with one request we will create the "parquet_uploads/" and the "parquet_uploads/attempts/" and with the seconds request, the "parquet_uploads/payloads/" directory.
logger.info("Going to check if the remote parquet directories exist.");
String initErrMsg = "The remote parquet HDFS-directory ";
String listMainDirectoryUrl = webHDFSBaseUrl + parquetBaseRemoteDirectory + "?op=LISTSTATUS&user.name=" + hdfsUserName;
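// Illustrative resulting URL (placeholders only, following the curl examples above):
// https://<HOST>/webhdfs/v1/tmp/parquet_uploads/?op=LISTSTATUS&user.name=<USERNAME>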
boolean attemptCreationSuccessful = true;
boolean payloadAggregatedCreationSuccessful = true;
boolean payloadBulkImportCreationSuccessful = true;
// Get the "fileStatuses" of the directories (main and subdirectories) in one request.
try {
URL url = new URL(listMainDirectoryUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
conn.connect();
int statusCode = conn.getResponseCode();
if ( (statusCode != 200) && (statusCode != 404) ) {
String errorMsg = "We expected a \"200 OK\" response, but got: \"" + statusCode + "\" instead, for url: " + listMainDirectoryUrl;
String retrievedMessage = fileUtils.getMessageFromResponseBody(conn, true);
logger.error(errorMsg + "\n\n" + ((retrievedMessage != null) ? retrievedMessage : ""));
return false;
}
if ( statusCode == 404 ) {
logger.info(initErrMsg + "\"" + parquetBaseRemoteDirectory + "\" does not exist. We will create it, along with its sub-directories.");
attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
return (attemptCreationSuccessful && payloadAggregatedCreationSuccessful && payloadBulkImportCreationSuccessful);
// We need all directories to be created in order for the app to function properly!
}
// Check the json-response, to see if all the subdirectories exist.
// Take the json-response.
String jsonResponse = fileUtils.getMessageFromResponseBody(conn, false);
if ( (jsonResponse == null) || jsonResponse.isEmpty() ) {
logger.error("The \"jsonResponse\" was not retrieved!");
return false;
}
// Else, if an error message exists inside the response, we will be alerted when parsing the Json below.
//logger.trace("\"jsonResponse\":\n" + jsonResponse); // DEBUG!
boolean foundAttemptsDir = false;
boolean foundPayloadsAggregatedDir = false;
boolean foundPayloadsBulkImportDir = false;
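// The expected (abridged) shape of a successful WebHDFS LISTSTATUS response, which is parsed below:
// {"FileStatuses": {"FileStatus": [ {"pathSuffix": "attempts", "type": "DIRECTORY", ...}, {"pathSuffix": "payloads_aggregated", ...}, ... ]}}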
try { // Parse the jsonData
JSONObject jObj = new JSONObject(jsonResponse); // Construct a JSONObject from the retrieved jsonData.
JSONObject entityObject = jObj.getJSONObject("FileStatuses");
//logger.trace("EntityObject: " + entityObject.toString()); // DEBUG!
JSONArray directoryStatuses = entityObject.getJSONArray("FileStatus");
//logger.trace("directoryStatuses: " + directoryStatuses.toString()); // DEBUG!
// In case no fileStatuses are found, the following for-loop will not run.
for ( Object fileStatusObject : directoryStatuses ) {
JSONObject fileStatusJsonObject = (JSONObject) fileStatusObject;
//logger.trace("FileStatusJsonObject: " + fileStatusJsonObject.toString()); // DEBUG!
String dirPath = fileStatusJsonObject.getString("pathSuffix");
//logger.trace("DirPath: " + dirPath); // DEBUG!
if ( dirPath.equals("attempts") )
foundAttemptsDir = true;
else if ( dirPath.equals("payloads_aggregated") )
foundPayloadsAggregatedDir = true;
else if ( dirPath.equals("payloads_bulk_import") )
foundPayloadsBulkImportDir = true;
else
logger.warn("Unknown remote parquet HDFS-directory found: " + dirPath);
}
} catch (JSONException je) { // In case any of the above "json-keys" was not found.
logger.warn("JSON Exception was thrown while trying to retrieve the subdirectories \"attempts\" and \"payloads\": " + je.getMessage() + "\n\nJsonResponse: " + jsonResponse);
return false;
}
// IMPORTANT NOTE: It is possible that the ".../attempts" dir exists, but the ".../payloads" dir does not, and vice-versa (in case of a remote filesystem failure or an accidental deletion by some other user).
// Also, it is possible that the Controller was terminated before creating all the directories, or that in a previous execution the second "create"-request failed, resulting in the Controller's shut-down.
// For each missing subdirectory, run the mkDirs-request.
if ( !foundAttemptsDir ) {
logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathAttempts + "\" does not exist! Going to create it.");
attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
} else
logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathAttempts + "\" exists.");
if ( !foundPayloadsAggregatedDir ) {
logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" does not exist! Going to create it.");
payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
} else
logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" exists.");
if ( !foundPayloadsBulkImportDir ) {
logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" does not exist! Going to create it.");
payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
} else
logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" exists.");
return (attemptCreationSuccessful && payloadAggregatedCreationSuccessful && payloadBulkImportCreationSuccessful);
// We need all directories to be created in order for the app to function properly!
} catch (Exception e) {
logger.error("", e);
return false;
}
}
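// Applies a single WebHDFS operation (a MKDIRS- or DELETE-url, as built in the constructor) with one PUT or DELETE request.
// Per the WebHDFS REST API, a successful MKDIRS or DELETE responds with "200 OK" and a JSON body of the form: {"boolean": true}.
// Note that this method treats the "200" status itself as success; the JSON body is only logged at trace-level.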
public boolean applyHDFOperation(String hdfsOperationUrl)
{
try {
URL url = new URL(hdfsOperationUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod(hdfsOperationUrl.contains("DELETE") ? "DELETE" : "PUT");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode == -1 ) {
logger.error("Problem when getting the \"status-code\" for url: " + hdfsOperationUrl);
return false;
}
else if ( statusCode != 200 ) {
String errorMsg = "We expected a \"200 OK\" response, but got: \"" + statusCode + "\" instead, for url: " + hdfsOperationUrl;
logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
return false;
}
if ( logger.isTraceEnabled() )
logger.trace("The Operation was successful for hdfs-op-url: " + hdfsOperationUrl + "\n" + fileUtils.getMessageFromResponseBody(conn, false));
} catch (Exception e) {
logger.error("", e);
return false;
}
return true;
}
public SumParquetSuccess checkParquetFilesSuccess(List<Future<ParquetReport>> futures)
{
int numOfAllAttemptParquetFileCreations = 0;
int numOfFailedAttemptParquetFileCreations = 0;
int numOfAllPayloadParquetFileCreations = 0;
int numOfFailedPayloadParquetFileCreations = 0;
for ( Future<ParquetReport> future : futures )
{
ParquetReport parquetReport = null;
try {
parquetReport = future.get();
boolean hasProblems = (! parquetReport.isSuccessful());
ParquetReport.ParquetType parquetType = parquetReport.getParquetType();
if ( parquetType.equals(ParquetReport.ParquetType.attempt) ) {
numOfAllAttemptParquetFileCreations++;
if ( hasProblems )
numOfFailedAttemptParquetFileCreations++;
} else if ( parquetType.equals(ParquetReport.ParquetType.payload) ) {
numOfAllPayloadParquetFileCreations ++;
if ( hasProblems )
numOfFailedPayloadParquetFileCreations ++;
} else {
String errMsg = "An invalid \"ParquetReport.ParquetType\" was found: " + parquetType; // This should never happen, but anyway.
logger.error(errMsg);
return new SumParquetSuccess(false, false, ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errMsg));
}
} catch (Exception e) {
logger.error("", e);
// We do not know if the failed "future" refers to a "payload" or to an "attempt".
// So we cannot increase a specific counter. That's ok; the only drawback is that we may try to "load" the non-existent data and get an exception.
}
} // End-for
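// A category is counted as "problematic" only if every parquet-file creation of that category failed (note that this also holds when no tasks of that category ran at all, since then 0 == 0).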
boolean hasAttemptParquetFileProblem = (numOfFailedAttemptParquetFileCreations == numOfAllAttemptParquetFileCreations);
boolean hasPayloadParquetFileProblem = (numOfFailedPayloadParquetFileCreations == numOfAllPayloadParquetFileCreations);
return new SumParquetSuccess(hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, null);
}
}