// UrlsController/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java

package eu.openaire.urls_controller.util;
import com.google.common.collect.Lists;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.controllers.UrlController;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.UrlReport;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.OutputFile;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.Callable;
@Component
public class ParquetFileUtils {
private static final Logger logger = LoggerFactory.getLogger(ParquetFileUtils.class);
public final String parquetBaseLocalDirectoryPath;
// The "@Autowire" does not work in this case (gives an NPE when it's used).
private final FileUtils fileUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@Value("${hdfs.baseUrl}")
private String webHDFSBaseUrl;
private final String hdfsHttpAuthString;
private final String hdfsUserName;
@Value("${schema.payload.filePath}")
String payloadSchemaFilePath;
@Value("${schema.attempt.filePath}")
String attemptSchemaFilePath;
public Schema payloadsSchema;
public Schema attemptsSchema;
public final String parquetHDFSDirectoryPathAttempts;
public final String parquetHDFSDirectoryPathPayloads;
public ParquetFileUtils(@Value("${hdfs.baseUrl}") String webHDFSBaseUrl,
@Value("${hdfs.httpAuth}") String hdfsHttpAuthString, @Value("${hdfs.userName}") String hdfsUserName, @Value("${hdfs.password}") String hdfsPassword, @Value("${output.parquetLocalDirectoryPath}") String parquetBaseDirectoryPath,
@Value("${hdfs.parquetRemoteBaseDirectoryPath}") String hdfsParquetBaseDir, FileUtils fileUtils) throws IOException
{
if ( webHDFSBaseUrl.endsWith("/") ) // We don't want an ending slash in the url (it causes problems when the file-path is added).
this.webHDFSBaseUrl = webHDFSBaseUrl.substring(0, (webHDFSBaseUrl.length() -1));
else
this.webHDFSBaseUrl = webHDFSBaseUrl;
if ( (hdfsUserName != null) && !hdfsUserName.isEmpty() )
this.hdfsUserName = hdfsUserName; // Set it to be used later as a url-param.
else
throw new RuntimeException("No hdfs-username was given!");
// If the "hdfsHttpAuthString" is given, then use it right away. In general this is better from "privacy perspective".
if ( (hdfsHttpAuthString != null) && !hdfsHttpAuthString.isEmpty() )
this.hdfsHttpAuthString = hdfsHttpAuthString;
else if ( (hdfsPassword != null) && !hdfsPassword.isEmpty() ) {
String authValue = (this.hdfsUserName + ":" + hdfsPassword);
this.hdfsHttpAuthString = ("Basic " + Base64.getEncoder().encodeToString(authValue.getBytes(StandardCharsets.UTF_8)));
}
else // At this point, all credential-checks have been made and there is no way the Controller can continue.
throw new RuntimeException("No hdfs-credentials were given, in any form!");
//logger.debug("\"hdfsHttpAuthString\": " + this.hdfsHttpAuthString); // DEBUG!
if ( ! parquetBaseDirectoryPath.endsWith(File.separator) )
this.parquetBaseLocalDirectoryPath = parquetBaseDirectoryPath + File.separator;
else
this.parquetBaseLocalDirectoryPath = parquetBaseDirectoryPath;
java.nio.file.Path parquetDirPath = Paths.get(this.parquetBaseLocalDirectoryPath);
if ( !Files.isDirectory(parquetDirPath) ) {
Files.createDirectories(parquetDirPath);
}
// Create the remote directories for uploading the parquet-files, if those directories do not exist.
// The limited-permissions user in use does not have permission to access directories created by other users, so we have to make sure it creates its own.
if ( !hdfsParquetBaseDir.endsWith("/") )
hdfsParquetBaseDir += "/";
this.parquetHDFSDirectoryPathPayloads = hdfsParquetBaseDir + "payloads/";
this.parquetHDFSDirectoryPathAttempts = hdfsParquetBaseDir + "attempts/";
this.fileUtils = fileUtils;
createRemoteParquetDirectories(hdfsParquetBaseDir);
}
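/*
Illustrative configuration sketch. The property-names are taken from the "@Value" annotations in this class;
the values below are placeholders, NOT the real deployment settings. Either "hdfs.httpAuth" or "hdfs.password"
must be provided (see the credential-checks in the constructor above):

hdfs.baseUrl = https://<HOST>/webhdfs/v1
hdfs.userName = <USERNAME>
hdfs.password = <PASSWORD>
hdfs.httpAuth =
hdfs.parquetRemoteBaseDirectoryPath = /tmp/parquet_uploads/
output.parquetLocalDirectoryPath = <LOCAL_PARQUET_DIR>/
schema.payload.filePath = <PATH_TO>/payload.avsc
schema.attempt.filePath = <PATH_TO>/attempt.avsc
*/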
public Schema parseSchema(String schemaFilePath) {
try {
return (new Schema.Parser()).parse(Files.newInputStream(Paths.get(schemaFilePath)));
} catch (Throwable e) {
logger.error("", e);
return null;
}
}
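/*
For illustration, the "attempt" Avro-schema file is expected to look roughly like the sketch below.
The field-names are taken from the record-building code in "createAndLoadParquetDataIntoAttemptTable()";
the exact types, defaults and namespace of the real ".avsc" files may differ:

{
  "type": "record",
  "name": "attempt",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "original_url", "type": "string"},
    {"name": "date", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}]},
    {"name": "status", "type": "string"},
    {"name": "error_class", "type": ["null", "string"]},
    {"name": "error_message", "type": ["null", "string"]}
  ]
}
*/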
public List<Callable<Void>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse)
{
// Split the "UrlReports" into some sub-lists.
List<List<UrlReport>> subLists;
// Pre-define the tasks to run.
List<Callable<Void>> callableTasks = new ArrayList<>(6);
// One thread will handle the inserts to the "payload" table and the others to the "attempt" table. This way there will be as little blocking as possible (on the part of Impala).
int sizeOfEachSubList = (int)(sizeOUrlReports * 0.2);
if ( sizeOfEachSubList > 10 )
{
subLists = Lists.partition(urlReports, sizeOfEachSubList); // This requires the "sizeOfEachSubList" to be greater than zero.
// The above will create some sub-lists, each one containing 20% of the total amount.
int subListsSize = subLists.size();
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
createAndLoadParquetDataIntoAttemptTable(finalI, subLists.get(finalI), curReportAssignments, currentParquetPath);
return null;
});
}
} else {
// If the "urlReports" are so few, that we cannot get big "sublists", assign a single task to handle all the attempts.
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
createAndLoadParquetDataIntoAttemptTable(0, urlReports, curReportAssignments, currentParquetPath);
return null;
});
}
if ( uploadFullTextsResponse != FileUtils.UploadFullTextsResponse.unsuccessful )
{
if ( (payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) {
logger.error("Nothing can be done without the payloadsSchema. Exiting.."); // The cause is already logged inside the above method.
System.exit(88); // Exit the whole app, as it cannot add the results to the database!
}
callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount.
createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath);
return null;
});
}
if ( (attemptsSchema == null) // Parse the schema if it's not already parsed.
&& ((attemptsSchema = parseSchema(attemptSchemaFilePath)) == null ) ) {
logger.error("Nothing can be done without the attemptsSchema. Exiting.."); // The cause is already logged inside the above method.
System.exit(89); // Exit the whole app, as it cannot add the results to the database!
}
return callableTasks;
}
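/*
Minimal usage sketch for the returned tasks (assuming the caller owns an ExecutorService named "insertsExecutor";
the actual invocation in "UrlController" may differ):

List<Callable<Void>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, urlReports.size(), curReportAssignments, currentParquetPath, uploadFullTextsResponse);
try {
    insertsExecutor.invokeAll(callableTasks); // Blocks until every parquet file is written and uploaded.
} catch (InterruptedException ie) {
    Thread.currentThread().interrupt();
}
*/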
public void createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath)
{
List<GenericData.Record> recordList = new ArrayList<>();
GenericData.Record record;
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport);
continue;
}
Error error = urlReport.getError();
if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop).
logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
error = new Error(null, null);
}
try {
record = new GenericData.Record(attemptsSchema);
record.put("id",payload.getId());
record.put("original_url", payload.getOriginal_url());
Timestamp timestamp = payload.getTimestamp_acquired();
record.put("date", (timestamp != null) ? timestamp.getTime() : System.currentTimeMillis());
record.put("status", urlReport.getStatus().toString());
record.put("error_class", String.valueOf(error.getType()));
record.put("error_message", error.getMessage());
recordList.add(record);
} catch (Exception e) {
logger.error("Failed to create an attempt record!", e);
}
}
int recordsSize = recordList.size();
if ( recordsSize == 0 ) {
logger.warn("No attempts are available to be inserted to the database!");
return;
}
String fileName = UrlController.assignmentsBatchCounter.get() + "_attempts_" + attemptsIncNum + ".parquet";
//logger.debug("Going to write " + recordsSize + " attempt-records to the parquet file: " + fileName); // DEBUG!
String fullFilePath = currentParquetPath + fileName;
if ( writeToParquet(recordList, attemptsSchema, fullFilePath) ) {
//logger.debug("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!
// Upload and insert the data to the "attempt" Impala table.
uploadParquetFileToHDFS(fullFilePath, fileName, parquetHDFSDirectoryPathAttempts);
// The possible error-message returned from the above method is already logged by the Controller.
}
// Multiple parquet files will be created and filled at the same time, before they are uploaded and moved into the database table.
// Each thread serving a worker, will have its own parquet subdirectory, which will be deleted after inserting the parquet data to the database.
}
public void createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath)
{
List<GenericData.Record> recordList = new ArrayList<>();
GenericData.Record record;
for ( UrlReport urlReport : urlReports )
{
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport);
continue;
}
String fileLocation = payload.getLocation();
if ( fileLocation == null ) // We want only the records with uploaded full-texts in the "payload" table.
continue;
try {
record = new GenericData.Record(payloadsSchema);
record.put("id", payload.getId());
record.put("original_url", payload.getOriginal_url());
record.put("actual_url", payload.getActual_url());
Timestamp timestamp = payload.getTimestamp_acquired();
record.put("date", (timestamp != null) ? timestamp.getTime() : System.currentTimeMillis());
record.put("mimetype", payload.getMime_type());
Long size = payload.getSize();
record.put("size", ((size != null) ? String.valueOf(size) : null));
record.put("hash", payload.getHash());
record.put("location", fileLocation);
record.put("provenance", payload.getProvenance());
recordList.add(record);
} catch (Exception e) {
logger.error("Failed to create a payload record!", e);
}
}
int recordsSize = recordList.size();
if ( recordsSize == 0 ) {
logger.warn("No payloads are available to be inserted to the database!");
return;
}
String fileName = UrlController.assignmentsBatchCounter.get() + "_payloads.parquet";
//logger.debug("Going to write " + recordsSize + " payload-records to the parquet file: " + fileName); // DEBUG!
String fullFilePath = currentParquetPath + fileName;
if ( writeToParquet(recordList, payloadsSchema, fullFilePath) ) {
//logger.debug("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!
// Upload and insert the data to the "payload" Impala table.
uploadParquetFileToHDFS(fullFilePath, fileName, parquetHDFSDirectoryPathPayloads);
// The possible error-message returned from the above method is already logged by the Controller.
}
// Multiple parquet files will be created and filled at the same time, before they are uploaded and moved into the database table.
// Each thread serving a worker, will have its own parquet subdirectory, which will be deleted after inserting the parquet data to the database.
}
public boolean writeToParquet(List<GenericData.Record> recordList, Schema schema, String fullFilePath)
{
OutputFile outputFile;
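// Note: with a default (empty) Hadoop "Configuration" and no "core-site.xml" on the classpath, "fs.defaultFS" resolves to the local filesystem ("file:///"),
// so the parquet file is first written locally and only afterwards uploaded to HDFS through "uploadParquetFileToHDFS()".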
try {
outputFile = HadoopOutputFile.fromPath(new Path(fullFilePath), new Configuration());
//logger.debug("Created the parquet " + outputFile); // DEBUG!
} catch (Throwable e) {
logger.error("", e);
return false;
}
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputFile).withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY).build())
{
//logger.debug("Going to write to \"" + fullFilePath + "\" the record list: " + recordList); // DEBUG!
for ( GenericRecord record : recordList ) {
//logger.debug("Writing to \"" + fullFilePath + "\" the record: " + record); // DEBUG!
writer.write(record);
}
} catch (Throwable e) { // The simple "Exception" may not be thrown here, but an "Error" may be thrown. "Throwable" catches EVERYTHING!
logger.error("Problem when creating the \"ParquetWriter\" object or when writing a record with it!", e); // Last time, I got an "NoSuchMethodError", because of a problem in the AvroSchema file: (java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;).
// {"name": "date", "type" : ["null", {"type" : "long", "logicalType" : "timestamp-millis"}]},
return false;
}
//logger.debug("Done writing to \"" + fullFilePath + "\""); // DEBUG!
return true;
}
public String uploadParquetFileToHDFS(String parquetFileFullLocalPath, String parquetFileName, String parquetRemoteDirectory)
{
/*
Create the new file in path.
curl -u <USERNAME> -i -X PUT "https://iis-cdh5-test-gw.ocean.icm.edu.pl/webhdfs/v1/tmp/parquet_uploads/newFile.parquet?op=CREATE&overwrite=false&blocksize=1048576&permission=644&buffersize=10485760&user.name=<USERNAME>"
Gives a redirect to a DATANODE.
LOCATION: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE...
Create a new PUT request.
curl -u <USERNAME> -i -X PUT -T <PATH_TO_LOCAL_FILE> "https://<DATANODE>:<PORT>/webhdfs/v1/tmp/parquet_uploads/newFile.parquet?op=CREATE&overwrite=false&blocksize=1048576&permission=644&buffersize=10485760&user.name=<USERNAME>"
*/
try {
// Check if the parquet file exists locally.
File parquetFile = new File(parquetFileFullLocalPath);
if ( ! parquetFile.isFile() ) {
String errorMsg = "The parquet file \"" + parquetFileFullLocalPath + "\" does not exist!";
logger.error(errorMsg);
return errorMsg;
}
// https://iis-cdh5-test-gw.ocean.icm.edu.pl/webhdfs/v1/tmp/parquet_uploads/<parquetFileName.parquet>?op=CREATE&overwrite=false&blocksize=1048576&permission=644&buffersize=10485760&user.name=<USERNAME>
String parquetFileURI = webHDFSBaseUrl + parquetRemoteDirectory + parquetFileName;
URL url = new URL(parquetFileURI + "?op=CREATE&overwrite=true&blocksize=1048576&permission=777&buffersize=10485760&user.name=" + hdfsUserName);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("PUT");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode != 307 ) {
String errorMsg = "We expected a \"307 TEMPORARY_REDIRECT\" response, but got: \"" + statusCode + "\" instead!";
logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
return errorMsg;
}
// We get the location to which we were redirected. It's a "datanode".
String location = conn.getHeaderField("Location");
if ( location == null ) {
String errorMsg = "We expected a \"location\" from the last \"307 TEMPORARY_REDIRECT\" response, but got nothing..";
logger.error(errorMsg + "\n\n" + conn.getHeaderFields());
return errorMsg;
}
//logger.debug("The target location is: " + location + "\nWill do a silent redirect to HTTPS."); // DEBUG!
location = StringUtils.replace(location, "http:", "https:", 1);
// Unless we handle this here, we would have to either complicate the process by handling the https-redirect ourselves, or take a performance hit by making one more request each time we want to upload a file.
conn = (HttpURLConnection) (new URL(location)).openConnection(); // This already contains the "user.name" parameter.
conn.setRequestMethod("PUT");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setRequestProperty("content-type", "application/octet-stream");
conn.setDoOutput(true);
conn.setInstanceFollowRedirects(true); // It is possible that the "location" redirects us once more.
conn.connect();
// Write the parquet file.
try ( InputStream inputS = Files.newInputStream(parquetFile.toPath()); OutputStream outS = conn.getOutputStream()) {
byte[] buffer = new byte[10485760]; // Copy in chunks (matching the "buffersize" url-param), instead of byte-by-byte.
int bytesRead;
while ( (bytesRead = inputS.read(buffer)) != -1 ) { outS.write(buffer, 0, bytesRead); }
}
statusCode = conn.getResponseCode();
if ( statusCode != 201 ) {
String errorMsg = "We expected a \"201 Created\" response, but got: \"" + statusCode + "\" instead, for url: " + location;
logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
return errorMsg;
}
location = conn.getHeaderField("Location"); // The response should have a "location" header as per the official documentation.
if ( location == null ) {
//logger.warn("We expected a \"location\" from the last \"201 Created\" response, but got nothing..\n" + conn.getHeaderFields() + "\n\nIt's fine."); // The "201-Created" response has no content inside.
// Do not return here. The absence of the location is not critical. We can still create it on our own.
location = parquetFileURI; // This location does not include the "user.name" parameter.
}
//logger.debug("The file \"" + parquetFileName + "\" was successfully uploaded. It's location is: " + location); // DEBUG!
// Important note!
// Using the "load data inpath" command, he files are MOVED, not copied! So we don't have to delete them afterwards.
// See: https://docs.cloudera.com/documentation/enterprise/latest/topics/impala_load_data.html
} catch (Throwable e) {
String errorMsg = "Error while importing parquet data into the Impala Database, through WebHDFS!\n" + e;
logger.error(errorMsg);
return errorMsg;
}
return null;
}
public String loadParquetDataIntoTable(String remoteParquetDataDirectory, String tableName)
{
// Import the data from the parquet file into the database's table.
String loadParquetInTableQuery = "load data inpath '" + remoteParquetDataDirectory + "' into table " + ImpalaConnector.databaseName + "." + tableName;
try {
jdbcTemplate.execute(loadParquetInTableQuery);
} catch (Exception e) {
return ImpalaConnector.handleQueryException("loadParquetInTableQuery", loadParquetInTableQuery, e); // It's already logged.
}
//logger.debug("The data from \"" + remoteParquetDataDirectory + "\" was loaded into the " + tableName + " table."); // DEBUG!
return null;
}
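/*
For example, for the "attempt" table, the above method ends up executing a query of the following form
(the database-name comes from "ImpalaConnector.databaseName" and the directory is illustrative):

load data inpath '/tmp/parquet_uploads/attempts/' into table <databaseName>.attempt

As noted in "uploadParquetFileToHDFS()", "load data inpath" MOVES the files into the table's storage, so no cleanup of the HDFS directory is needed.
*/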
public boolean createRemoteParquetDirectories(String parquetBaseRemoteDirectory)
{
// Check if the remote directories exist. If so, then return and continue with execution.
// If the directories do not exist, then make them in two requests.
// The WebHDFS uses the "mkdirs" operations which creates all the non-existent directories in the specified path.
// So with one request we will create the "parquet_uploads/" and the "parquet_uploads/payloads/" and with the seconds request, the "parquet_uploads/attempts/" directory.
String mkdirsParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName;
logger.info("Going to check if the remote parquet directories exist.");
String listMainDirectoryUrl = webHDFSBaseUrl + parquetBaseRemoteDirectory + "?op=LISTSTATUS&user.name=" + hdfsUserName;
// Get the "fileStatuses" of the directories (main and subdirectories) in one request.
try {
URL url = new URL(listMainDirectoryUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
conn.connect();
int statusCode = conn.getResponseCode();
if ( (statusCode != 200) && (statusCode != 404) ) {
String errorMsg = "We expected a \"200 OK\" response, but got: \"" + statusCode + "\" instead, for url: " + listMainDirectoryUrl;
String retrievedMessage = fileUtils.getMessageFromResponseBody(conn, true);
logger.error(errorMsg + "\n\n" + ((retrievedMessage != null) ? retrievedMessage : ""));
return false;
}
if ( statusCode == 404 ) {
logger.info("The directory \"" + parquetBaseRemoteDirectory + "\" does not exist. We will create it, along with its sub-directories.");
createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkdirsParams);
createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkdirsParams);
return true;
}
else {
// Check the json-response, to see if all the subdirectories exist.
// Take the json-response.
String jsonResponse = fileUtils.getMessageFromResponseBody(conn, false);
if ( (jsonResponse == null) || jsonResponse.isEmpty() ) {
logger.error("The \"jsonResponse\" was not retrieved!");
return false;
}
// Else, if an error message exists inside the response, then we will be alerted when parsing the Json below.
//logger.debug("\"jsonResponse\":\n" + jsonResponse); // DEBUG!
boolean foundAttemptsDir = false;
boolean foundPayloadsDir = false;
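// Expected shape of the "LISTSTATUS" json-response (trimmed to the fields used below):
// { "FileStatuses": { "FileStatus": [
//     { "pathSuffix": "attempts", "type": "DIRECTORY", ... },
//     { "pathSuffix": "payloads", "type": "DIRECTORY", ... }
// ] } }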
try { // Parse the jsonData
JSONObject jObj = new JSONObject(jsonResponse); // Construct a JSONObject from the retrieved jsonData.
JSONObject entityObject = jObj.getJSONObject("FileStatuses");
//logger.debug("EntityObject: " + entityObject.toString()); // DEBUG!
JSONArray directoryStatuses = entityObject.getJSONArray("FileStatus");
//logger.debug("directoryStatuses: " + directoryStatuses.toString()); // DEBUG!
// In case no fileStatuses are found, the following for-loop will not run.
for ( Object fileStatusObject : directoryStatuses ) {
JSONObject fileStatusJsonObject = (JSONObject) fileStatusObject;
//logger.debug("FileStatusJsonObject: " + fileStatusJsonObject.toString()); // DEBUG!
String dirPath = fileStatusJsonObject.getString("pathSuffix");
//logger.debug("DirPath: " + dirPath); // DEBUG!
if ( dirPath.equals("attempts") )
foundAttemptsDir = true;
else if ( dirPath.equals("payloads") )
foundPayloadsDir = true;
}
} catch ( JSONException je ) { // In case any of the above "json-keys" was not found.
logger.warn("JSON Exception was thrown while trying to retrieve the subdirectories \"attempts\" and \"payloads\": " + je.getMessage() + "\n\nJsonResponse: " + jsonResponse);
return false;
}
// IMPORTANT NOTE: It is possible that the ".../payloads" dir exists, but the ".../attempts" dir does not (in case of a remote filesystem failure or an accidental deletion by some other user).
// Also, it is possible that the Controller was terminated before creating all the directories, or that in a previous execution the second "create"-request failed, resulting in the Controller shutting down.
// For each missing subdirectory, run the mkdirs-request.
if ( !foundAttemptsDir ) {
logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" does not exist! Going to create it.");
createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkdirsParams);
} else
logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" exists.");
if ( !foundPayloadsDir ) {
logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloads + "\" does not exist! Going to create it.");
createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkdirsParams);
} else
logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloads + "\" exists.");
}
} catch (Exception e) {
logger.error("", e);
return false;
}
return true;
}
public boolean createHDFSDirectory(String createDirectoryUrl)
{
try {
URL url = new URL(createDirectoryUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("PUT");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode != 200 ) {
String errorMsg = "We expected a \"200 OK\" response, but got: \"" + statusCode + "\" instead, for url: " + createDirectoryUrl;
logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
return false;
}
} catch (Exception e) {
logger.error("", e);
return false;
}
return true;
}
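/*
Example of a full MKDIRS request-url, as built by "createRemoteParquetDirectories()" (illustrative host and username):

https://<HOST>/webhdfs/v1/tmp/parquet_uploads/attempts/?op=MKDIRS&permission=777&user.name=<USERNAME>
*/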
// Use this if we decide to delete leftover files (probably remaining after failed "load" attempts). For now, it's better to leave them there, in order to fix potential problems more easily.
public String deleteFileFromHDFS(String fileLocation, String parquetFileName) throws Exception
{
// Delete the file from the temporary storage on HDFS.
HttpURLConnection conn = (HttpURLConnection) (new URL(fileLocation + "?op=DELETE&user.name=" + hdfsUserName)).openConnection();
conn.setRequestMethod("DELETE");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode == 200 ) {
logger.debug("The file \"" + parquetFileName + "\" was successfully deleted."); // DEBUG!
} else {
String errorMsg = "The file \"" + parquetFileName + "\" could not be deleted! Response-code: " + statusCode;
logger.error(errorMsg);
return errorMsg;
}
return null;
}
}