UrlsController/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java

package eu.openaire.urls_controller.util;
import com.google.common.collect.Lists;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.*;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.OutputFile;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.ClassPathResource;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@Component
public class ParquetFileUtils {
private static final Logger logger = LoggerFactory.getLogger(ParquetFileUtils.class);
public final String parquetBaseLocalDirectoryPath;
// The "@Autowire" does not work in this case (gives an NPE when it's used).
private final FileUtils fileUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@Value("${hdfs.baseUrl}")
public String webHDFSBaseUrl;
private final String hdfsHttpAuthString;
private final String hdfsUserName;
private static final String attemptSchemaFilePath = "schemas/attempt.avsc";
public static final String payloadSchemaFilePath = "schemas/payload.avsc";
public static Schema payloadsSchema = null;
public Schema attemptsSchema;
public final String parquetHDFSDirectoryPathAttempts;
// The "payloads_legacy" are not handled by the Controller (no data is added there), they just exist in the database and are taken into account when performing READ queries to the "payload" VIEW.
public final String parquetHDFSDirectoryPathPayloadsAggregated;
public final String parquetHDFSDirectoryPathPayloadsBulkImport;
public String mkDirsAndParams;
//public String setPermAndParams;
public String rmDirsAndParams;
public ParquetFileUtils(@Value("${hdfs.baseUrl}") String webHDFSBaseUrl,
@Value("${hdfs.httpAuth}") String hdfsHttpAuthString, @Value("${hdfs.userName}") String hdfsUserName, @Value("${hdfs.password}") String hdfsPassword, @Value("${services.pdfaggregation.controller.parquetLocalDirectoryPath}") String parquetBaseDirectoryPath,
@Value("${hdfs.parquetRemoteBaseDirectoryPath}") String hdfsParquetBaseDir,
@Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment, FileUtils fileUtils) throws IOException
{
this.mkDirsAndParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName;
if ( webHDFSBaseUrl.endsWith("/") ) // We don't want a trailing slash in the url (it causes problems when the file-path is appended).
this.webHDFSBaseUrl = webHDFSBaseUrl.substring(0, (webHDFSBaseUrl.length() -1));
else
this.webHDFSBaseUrl = webHDFSBaseUrl;
if ( (hdfsUserName != null) && !hdfsUserName.isEmpty() )
this.hdfsUserName = hdfsUserName; // Set it to be used later as a url-param.
else
throw new RuntimeException("No hdfs-username was given!");
// If the "hdfsHttpAuthString" is given, then use it right away. In general this is better from "privacy perspective".
if ( (hdfsHttpAuthString != null) && !hdfsHttpAuthString.isEmpty() )
this.hdfsHttpAuthString = hdfsHttpAuthString;
else if ( (hdfsPassword != null) && !hdfsPassword.isEmpty() ) {
String authValue = (this.hdfsUserName + ":" + hdfsPassword);
this.hdfsHttpAuthString = ("Basic " + Base64.getEncoder().encodeToString(authValue.getBytes(StandardCharsets.UTF_8)));
}
else // At this point, all credential-checks have been made and there is no way the Controller can continue.
throw new RuntimeException("No hdfs-credentials were given, in any form!");
//logger.trace("\"hdfsHttpAuthString\": " + this.hdfsHttpAuthString); // DEBUG!
if ( ! parquetBaseDirectoryPath.endsWith(File.separator) )
this.parquetBaseLocalDirectoryPath = parquetBaseDirectoryPath + File.separator;
else
this.parquetBaseLocalDirectoryPath = parquetBaseDirectoryPath;
// Create the local parquet file base directory, if it does not exist.
Files.createDirectories(Paths.get(this.parquetBaseLocalDirectoryPath)); // No-op if the dir exists; it does not throw a "FileAlreadyExistsException".
// Create the remote directories for uploading the parquet-files, if those directories do not exist.
// The limited-permissions user in use does not have permission to access directories created by other users, so we have to make sure it creates its own.
if ( !hdfsParquetBaseDir.endsWith("/") )
hdfsParquetBaseDir += "/";
if ( isTestEnvironment ) // Make sure the hdfs-remote-dir is different for running tests, in order to not cause conflicts with production.
hdfsParquetBaseDir += "test/";
this.parquetHDFSDirectoryPathAttempts = hdfsParquetBaseDir + "attempts/";
this.parquetHDFSDirectoryPathPayloadsAggregated = hdfsParquetBaseDir + "payloads_aggregated/";
this.parquetHDFSDirectoryPathPayloadsBulkImport = hdfsParquetBaseDir + "payloads_bulk_import/";
this.fileUtils = fileUtils;
this.mkDirsAndParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName; // All permissions for user, group and others must be set, in order for this service' user to have access to the hdfs directory.
//this.setPermAndParams = "?op=SETPERMISSION&permission=777&user.name=" + hdfsUserName;
this.rmDirsAndParams = "?op=DELETE&recursive=true&user.name=" + hdfsUserName;
createRemoteParquetDirectories(hdfsParquetBaseDir);
}
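/**
* Parses an Avro schema from a classpath resource (e.g. one of the ".avsc" files under "schemas/").
* Returns the parsed Schema, or "null" if the resource could not be read or parsed (the error is logged here).
*/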
public static Schema parseSchema(String schemaResourcePath) {
try {
return (new Schema.Parser()).parse(new ClassPathResource(schemaResourcePath).getInputStream());
} catch (Throwable e) {
logger.error("", e);
return null;
}
}
/**
* The tasks created below must be executed inside a "database-lock" block.
* */
public List<Callable<ParquetReport>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOfUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse)
{
if ( (attemptsSchema == null) // Parse the schema if it's not already parsed.
&& ((attemptsSchema = parseSchema(attemptSchemaFilePath)) == null ) ) {
logger.error("Nothing can be done without the attemptsSchema! Exiting.."); // The cause is already logged inside the above method.
System.exit(88); // Exit the whole app, as it cannot add the results to the database!
}
// Pre-define the tasks to run in multiple threads.
final List<Callable<ParquetReport>> callableTasks = new ArrayList<>(8); // 5 threads will handle the attempts and 3 the payloads.
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.successful )
{
List<Payload> initialPayloads = urlReports.parallelStream()
.map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null)))
.collect(Collectors.toList());
int numInitialPayloads = initialPayloads.size();
if ( numInitialPayloads > 0 ) // If at least 1 payload was created by the processed records..
{ // (it's ok to have no payloads, if there were no full-texts available)
// At this point we know there was no problem with the full-texts, but we do not know if at least one full-text was retrieved FROM the worker.
if ( (payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) {
logger.error("Nothing can be done without the payloadsSchema! Exiting.."); // The cause is already logged inside the above method.
System.exit(89); // Exit the whole app, as it cannot add the results to the database!
}
// The UrlsReports for the pre-filled payloads are added only here, since we do not want attempt-records for these.
fileUtils.addUrlReportsByMatchingRecordsFromBacklog(urlReports, initialPayloads, numInitialPayloads, curReportAssignments); // This adds more objects to the "urlReports" list.
// In case the above method returns an error, nothing happens. We just have only the initial payloads to insert to the DB.
int sizeOfEachSubList = (int)(sizeOfUrlReports * 0.33); // We want 3 sub-lists for the payloads.
if ( sizeOfEachSubList == 0 ) // If the "sizeOfUrlReports" is <= 3.
sizeOfEachSubList = 1;
// There may be one more sub-list with very few elements, since the list may not divide evenly. Unfortunately, we cannot set the number of splits, only the size of most splits.
if ( sizeOfEachSubList > 10 ) {
List<List<UrlReport>> finalSubLists = Lists.partition(urlReports, sizeOfEachSubList); // This needs the "sizeOfEachSubList" to be greater than 0.
int numSubListsPayload = finalSubLists.size();
// We will run <numSubListsPayload> tasks for the payloads.
// Since the payloads are not evenly distributed in the urlReports, there may be some sub-lists of urlReports without any payloads inside.
for ( int i = 0; i < numSubListsPayload; ++i ) {
int finalI = i;
callableTasks.add(() -> { // Handle inserts to the "payload" table, for one sub-list of the urlReports.
return new ParquetReport(ParquetReport.ParquetType.payload, createPayloadParquetDataAndUploadToHDFS(finalI, finalSubLists.get(finalI), curReportAssignments, currentParquetPath, (parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignments + "/")));
});
}
} else {
// If the "urlReports" are so few, that we cannot get big "sublists", assign a single task to handle all the payload (sizeOfEachSubList * 5).
callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount.
return new ParquetReport(ParquetReport.ParquetType.payload, createPayloadParquetDataAndUploadToHDFS(0, urlReports, curReportAssignments, currentParquetPath, (parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignments + "/")));
});
}
}
}
int sizeOfEachSubList = (int)(sizeOfUrlReports * 0.2); // We want 5 sub-lists for the attempts.
// There may be one more sub-list with very few elements, since the list may not divide evenly. Unfortunately, we cannot set the number of splits, only the size of most splits.
if ( sizeOfEachSubList > 10 ) {
List<List<UrlReport>> finalSubLists = Lists.partition(urlReports, sizeOfEachSubList); // This needs the "sizeOfEachSubList" to be greater than 0.
// The above will create some sub-lists, each one containing about 20% of the total amount.
int numSubListsAttempt = finalSubLists.size();
// We will run <numSubListsAttempt> tasks for the attempts.
for ( int i = 0; i < numSubListsAttempt; ++i ) {
int finalI = i;
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
return new ParquetReport(ParquetReport.ParquetType.attempt, createAttemptParquetDataAndUploadToHDFS(finalI, finalSubLists.get(finalI), curReportAssignments, currentParquetPath));
});
}
} else {
// If the "urlReports" are so few, that we cannot get big "sublists", assign a single task to handle all the attempts (sizeOfEachSubList * 5).
callableTasks.add(() -> { // Handle inserts to the "attempt" table.
return new ParquetReport(ParquetReport.ParquetType.attempt, createAttemptParquetDataAndUploadToHDFS(0, urlReports, curReportAssignments, currentParquetPath));
});
}
return callableTasks;
}
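/**
* Builds one Avro record per urlReport (using the "attemptsSchema"), writes them to a local parquet file
* named "<curReportAssignmentsCounter>_attempts_<attemptsIncNum>.parquet" and uploads it to the HDFS "attempts" directory of this assignments-batch.
* Returns "true" on success, or "false" if no records could be created or if the write/upload failed.
*/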
public boolean createAttemptParquetDataAndUploadToHDFS(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignmentsCounter, String localParquetPath)
{
List<GenericData.Record> recordList = new ArrayList<>(urlReports.size());
GenericData.Record record;
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignmentsCounter + GenericUtils.endOfLine + urlReport);
continue;
}
Error error = urlReport.getError();
if ( error == null ) { // This is rare, but we have to handle it (otherwise NPEs will be thrown in the rest of this loop).
logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
error = new Error(null, null);
}
try {
record = new GenericData.Record(attemptsSchema);
record.put("id", payload.getId());
record.put("original_url", payload.getOriginal_url());
Timestamp timestamp = payload.getTimestamp_acquired();
record.put("date", (timestamp != null) ? timestamp.getTime() : System.currentTimeMillis());
record.put("status", urlReport.getStatus().toString());
record.put("error_class", String.valueOf(error.getType()));
record.put("error_message", error.getMessage());
recordList.add(record);
} catch (Exception e) {
logger.error("Failed to create an attempt record!", e);
}
}
int recordsSize = recordList.size();
if ( recordsSize == 0 ) {
logger.error("No attempt-parquet-records could be created in order to be inserted to the database!");
return false;
}
String fileName = curReportAssignmentsCounter + "_attempts_" + attemptsIncNum + ".parquet";
//logger.trace("Going to write " + recordsSize + " attempt-records to the parquet file: " + fileName); // DEBUG!
String fullFilePath = localParquetPath + fileName;
if ( writeToParquet(recordList, attemptsSchema, fullFilePath) ) {
//logger.trace("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!
// Upload and insert the data to the "attempt" Impala table.
String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, (parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/"));
return (errorMsg == null); // The possible error-message returned, is already logged by the Controller.
} else
return false;
// Multiple parquet files will be created and filled at the same time, before they are uploaded and moved into the database table.
// Each thread serving a worker, will have its own parquet subdirectory, which will be deleted after inserting the parquet data to the database.
}
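/**
* Builds one Avro record per urlReport that has an uploaded full-text (a non-null payload-location), writes them to a local parquet file
* named "<curReportAssignmentsCounter>_payloads_<payloadsCounter>.parquet" and uploads it to the given HDFS payloads-directory.
* Returns "true" if the sub-list contained no payloads at all (nothing to do) or if the write and upload succeeded; "false" otherwise.
*/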
public boolean createPayloadParquetDataAndUploadToHDFS(int payloadsCounter, List<UrlReport> urlReports, long curReportAssignmentsCounter, String localParquetPath, String parquetHDFSDirectoryPathPayloads)
{
List<GenericData.Record> recordList = new ArrayList<>((int) (urlReports.size() * 0.2));
GenericData.Record record;
int numPayloadsInsideUrlReports = 0;
for ( UrlReport urlReport : urlReports )
{
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignmentsCounter + GenericUtils.endOfLine + urlReport);
continue;
}
String fileLocation = payload.getLocation();
if ( fileLocation == null ) // We want only the records with uploaded full-texts in the "payload" table.
continue;
numPayloadsInsideUrlReports ++;
Timestamp timestamp = payload.getTimestamp_acquired();
record = getPayloadParquetRecord(payload.getId(), payload.getOriginal_url(), payload.getActual_url(),
(timestamp != null) ? timestamp.getTime() : System.currentTimeMillis(),
payload.getMime_type(), payload.getSize(), payload.getHash(), fileLocation, payload.getProvenance(), false);
if ( record != null )
recordList.add(record);
}
if ( numPayloadsInsideUrlReports == 0 )
return true; // This urlReports-sublist does not have any payloads inside to use. That's fine.
int recordsSize = recordList.size();
if ( recordsSize == 0 ) {
logger.error("No payload-parquet-records could be created in order to be inserted to the database!");
return false;
}
String fileName = curReportAssignmentsCounter + "_payloads_" + payloadsCounter + ".parquet";
//logger.trace("Going to write " + recordsSize + " payload-records to the parquet file: " + fileName); // DEBUG!
String fullFilePath = localParquetPath + fileName;
if ( writeToParquet(recordList, payloadsSchema, fullFilePath) ) {
//logger.trace("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!
// Upload and insert the data to the "payload" Impala table.
String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, parquetHDFSDirectoryPathPayloads);
return (errorMsg == null); // The possible error-message returned, is already logged by the Controller.
} else
return false;
// Multiple parquet files will be created and filled at the same time, before they are uploaded and moved into the database table.
// Each thread serving a worker, will have its own parquet subdirectory, which will be deleted after inserting the parquet data to the database.
}
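/**
* Creates a single "payload" Avro record from the given fields, following the "payloadsSchema".
* Returns the record, or "null" if its creation failed (the error is logged here).
*/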
public GenericData.Record getPayloadParquetRecord(String id, String original_url, String actual_url, long timeMillis, String mimetype, Long size,
String hash, String fileLocation, String provenance, boolean isForBulkImport)
{
GenericData.Record record;
try {
record = new GenericData.Record(payloadsSchema);
record.put("id", id);
record.put("original_url", original_url);
record.put("actual_url", actual_url);
record.put("date", timeMillis);
record.put("mimetype", mimetype);
record.put("size", ((size != null) ? String.valueOf(size) : null));
record.put("hash", hash);
record.put("location", fileLocation);
record.put("provenance", (isForBulkImport ? "bulk:" : "") + provenance);
// Add the "bulk:" prefix in order to be more clear that this record comes from bulkImport, when looking all records in the "payload" VIEW.
return record;
} catch (Exception e) {
logger.error("Failed to create a payload record!", e);
return null;
}
}
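/**
* Writes the given Avro records to a GZIP-compressed parquet file at "fullFilePath", using the given schema.
* Returns "true" on success, or "false" if the output-file could not be created or the writing failed.
*/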
public boolean writeToParquet(List<GenericData.Record> recordList, Schema schema, String fullFilePath)
{
OutputFile outputFile;
try { // TODO - Verify that this will create any directories which do not exist in the provided path. Currently we create the directories beforehand.
outputFile = HadoopOutputFile.fromPath(new Path(fullFilePath), new Configuration());
//logger.trace("Created the parquet " + outputFile); // DEBUG!
} catch (Throwable e) { // The simple "Exception" may not be thrown here, but an "Error" may be thrown. "Throwable" catches EVERYTHING!
logger.error("", e);
return false;
}
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputFile).withSchema(schema)
.withCompressionCodec(CompressionCodecName.GZIP).build())
{
// When the app runs inside a Docker Container, it is NOT guaranteed that all compression-types will work. For example, the "SNAPPY"-compression does NOT work, while the "GZIP" works.
// Also, we would prefer ZSTD over GZIP, but the old version of the Impala-Database does not support it..
//logger.trace("Going to write to \"" + fullFilePath + "\" the record list: " + recordList); // DEBUG!
for ( GenericRecord record : recordList ) {
//logger.trace("Writing to \"" + fullFilePath + "\" the record: " + record); // DEBUG!
writer.write(record);
}
} catch (Throwable e) { // The simple "Exception" may not be thrown here, but an "Error" may be thrown. "Throwable" catches EVERYTHING!
String errorMsg = "Problem when creating the \"ParquetWriter\" object or when writing the records with it!";
if ( e instanceof org.apache.hadoop.fs.FileAlreadyExistsException )
logger.error(errorMsg + GenericUtils.endOfLine + e.getMessage());
else
logger.error(errorMsg, e);
// At some point, I got an "NoSuchMethodError", because of a problem in the AvroSchema file: (java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;).
// The error was with the schema: {"name": "date", "type" : ["null", {"type" : "long", "logicalType" : "timestamp-millis"}]},
return false;
}
//logger.trace("Done writing to \"" + fullFilePath + "\""); // DEBUG!
return true;
}
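/**
* Uploads a local parquet file to HDFS through the WebHDFS REST-API, using the two-step "CREATE" flow shown in the comment below:
* first a "PUT ...?op=CREATE" request to the NameNode, which responds with a "307" redirect to a DataNode,
* and then a second "PUT" request to that DataNode, which carries the file-bytes and should return "201 Created".
* Returns "null" on success, or an error-message on failure.
*/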
public String uploadParquetFileToHDFS(String parquetFileFullLocalPath, String parquetFileName, String parquetRemoteDirectory)
{
/*
Create the new file in path.
curl -u <USERNAME> -i -X PUT "https://iis-cdh5-test-gw.ocean.icm.edu.pl/webhdfs/v1/tmp/parquet_uploads/newFile.parquet?op=CREATE&overwrite=false&blocksize=1048576&permission=644&buffersize=10485760&user.name=<USERNAME>"
Gives a redirect to a DATANODE.
LOCATION: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE...
Create a new PUT request.
curl -u <USERNAME> -i -X PUT -T <PATH_TO_LOCAL_FILE> "https://<DATANODE>:<PORT>/webhdfs/v1/tmp/parquet_uploads/newFile.parquet?op=CREATE&overwrite=false&blocksize=1048576&permission=644&buffersize=10485760&user.name=<USERNAME>"
*/
try {
// Check if the parquet file exists locally.
File parquetFile = new File(parquetFileFullLocalPath);
if ( ! parquetFile.isFile() ) {
String errorMsg = "The local parquet file \"" + parquetFileFullLocalPath + "\" does not exist!";
logger.error(errorMsg);
return errorMsg;
}
// https://iis-cdh5-test-gw.ocean.icm.edu.pl/webhdfs/v1/tmp/parquet_uploads/<parquetFileName.parquet>?op=CREATE&overwrite=false&blocksize=1048576&permission=644&buffersize=10485760&user.name=<USERNAME>
String parquetFileURI = webHDFSBaseUrl + parquetRemoteDirectory + parquetFileName;
URL url = new URL(parquetFileURI + "?op=CREATE&overwrite=true&blocksize=1048576&permission=777&buffersize=10485760&user.name=" + hdfsUserName);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("PUT");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode != 307 ) {
String errorMsg = "We expected a \"307 TEMPORARY_REDIRECT\" response, but got: \"" + statusCode + "\" instead!";
logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
return errorMsg;
}
// We get the location in which we were redirected to. It's a "datanode".
String location = conn.getHeaderField("Location");
if ( location == null ) {
String errorMsg = "We expected a \"location\" from the last \"307 TEMPORARY_REDIRECT\" response, but got nothing..";
logger.error(errorMsg + "\n\n" + conn.getHeaderFields());
return errorMsg;
}
//logger.trace("The target location is: " + location + "\nWill do a silent redirect to HTTPS."); // DEBUG!
// In case the WebHDFS uses https, perform the offline redirect to https here (to avoid the live redirection).
if ( webHDFSBaseUrl.startsWith("https:") )
location = StringUtils.replace(location, "http:", "https:", 1);
// Unless we handle this here, we either have to complicate the process by handling the https-redirect live, or take a performance hit by having one more redirect step each time we want to upload a file.
conn = (HttpURLConnection) (new URL(location)).openConnection(); // This already contains the "user.name" parameter.
conn.setRequestMethod("PUT");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setRequestProperty("content-type", "application/octet-stream");
conn.setDoOutput(true);
conn.setInstanceFollowRedirects(true); // It is possible that the "location" was an intermediate one.
conn.connect();
// Upload the parquet file.
try ( BufferedInputStream inputS = new BufferedInputStream(Files.newInputStream(parquetFile.toPath()), FileUtils.tenMb);
BufferedOutputStream outS = new BufferedOutputStream(conn.getOutputStream(), FileUtils.tenMb) )
{
int readByte = -1;
while ( (readByte = inputS.read()) != -1 )
outS.write(readByte);
} // Any exception will be caught in the end of this method.
statusCode = conn.getResponseCode();
if ( statusCode != 201 ) {
String errorMsg = "We expected a \"201 Created\" response, but got: \"" + statusCode + "\" instead, for url: " + location;
logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
return errorMsg;
}
location = conn.getHeaderField("Location"); // The response should have a "location" header as per the official documentation.
if ( location == null ) {
//logger.warn("We expected a \"location\" from the last \"201 Created\" response, but got nothing..\n" + conn.getHeaderFields() + "\n\nIt's fine."); // The "201-Created" response has no content inside.
// Do not return here. The absence of the location is not critical. We can still create it on our own.
location = parquetFileURI; // This location does not include the "user.name" parameter.
}
//logger.trace("The file \"" + parquetFileName + "\" was successfully uploaded. It's location is: " + location); // DEBUG!
// Important note!
// Using the "load data inpath" command, he files are MOVED, not copied! So we don't have to delete them afterwards.
// See: https://docs.cloudera.com/documentation/enterprise/latest/topics/impala_load_data.html
} catch (Throwable e) {
String errorMsg = "Error while uploading parquet file \"" + parquetFileFullLocalPath + "\" to HDFS!\n";
logger.error(errorMsg, e);
return errorMsg + e.getMessage();
}
// The local parquet file will be deleted later.
return null;
}
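/**
* Moves the uploaded parquet files from the given HDFS location into the given Impala table. The generated query looks roughly like this
* (the database-name comes from "DatabaseConnector.databaseName"; the path and names below are just placeholders, not real values):
* load data inpath '/tmp/parquet_uploads/attempts/1234/' into table some_database.attempt
* Since "load data inpath" MOVES the files, no cleanup of the HDFS directory is needed afterwards.
* Returns "true" on success, "false" otherwise.
*/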
public boolean loadParquetDataIntoTable(String remoteParquetDataLocation, String tableName)
{
// Import the data from the parquet file into the database's table.
String loadParquetInTableQuery = "load data inpath '" + remoteParquetDataLocation + "' into table " + DatabaseConnector.databaseName + "." + tableName;
try {
jdbcTemplate.execute(loadParquetInTableQuery);
} catch (Exception e) {
// Check if this error is related to the files being missing from the HDFS directory.
Throwable cause = e.getCause();
if ( cause instanceof SQLException ) { // In this case the "parent" exception is: "org.springframework.jdbc.UncategorizedSQLException".
String errorMsg = cause.getMessage();
if ( (errorMsg != null) && errorMsg.contains("contains no visible files") ) {
logger.error("The \"remoteParquetDataLocation\": \"" + remoteParquetDataLocation + "\" was found empty, when tried to load its content into the \"" + tableName + "\" table!");
return false; // Since each thread uses a different sub-directory by design, this error is unacceptable.
}
}
DatabaseConnector.handleQueryException("loadParquetInTableQuery", loadParquetInTableQuery, e); // It's already logged.
return false;
}
//logger.trace("The data from \"" + remoteParquetDataLocation + "\" was loaded into the " + tableName + " table."); // DEBUG!
return true;
}
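/**
* Checks (with a single "LISTSTATUS" request) whether the remote parquet base-directory and its "attempts", "payloads_aggregated"
* and "payloads_bulk_import" sub-directories exist in HDFS, and creates any missing ones with "MKDIRS" requests.
* Returns "true" only if all required directories exist (or were created) in the end.
*/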
public boolean createRemoteParquetDirectories(String parquetBaseRemoteDirectory)
{
// Check if the remote directories exist. If so, then return and continue with execution.
// If the directories do not exist, then create them with separate requests.
// The WebHDFS uses the "MKDIRS" operation, which creates all the non-existent directories in the specified path.
// So the first request creates the "parquet_uploads/" and "parquet_uploads/attempts/" directories, while the next requests create the "parquet_uploads/payloads_aggregated/" and "parquet_uploads/payloads_bulk_import/" directories.
logger.info("Going to check if the remote parquet directories exist.");
String initErrMsg = "The remote parquet HDFS-directory ";
String listMainDirectoryUrl = webHDFSBaseUrl + parquetBaseRemoteDirectory + "?op=LISTSTATUS&user.name=" + hdfsUserName;
boolean attemptCreationSuccessful = true;
boolean payloadAggregatedCreationSuccessful = true;
boolean payloadBulkImportCreationSuccessful = true;
// Get the "fileStatuses" of the directories (main and subdirectories) in one request.
try {
URL url = new URL(listMainDirectoryUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
conn.connect();
int statusCode = conn.getResponseCode();
if ( (statusCode != 200) && (statusCode != 404) ) {
String errorMsg = "We expected a \"200 OK\" response, but got: \"" + statusCode + "\" instead, for url: " + listMainDirectoryUrl;
String retrievedMessage = fileUtils.getMessageFromResponseBody(conn, true);
logger.error(errorMsg + "\n\n" + ((retrievedMessage != null) ? retrievedMessage : ""));
return false;
}
if ( statusCode == 404 ) {
logger.info(initErrMsg + "\"" + parquetBaseRemoteDirectory + "\" does not exist. We will create it, along with its sub-directories.");
attemptCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
payloadAggregatedCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
payloadBulkImportCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
return (attemptCreationSuccessful && payloadAggregatedCreationSuccessful && payloadBulkImportCreationSuccessful);
// We need all directories to be created in order for the app to function properly!
}
// Check the json-response, to see if all the subdirectories exist.
// Take the json-response.
String jsonResponse = fileUtils.getMessageFromResponseBody(conn, false);
if ( (jsonResponse == null) || jsonResponse.isEmpty() ) {
logger.error("The \"jsonResponse\" was not retrieved!");
return false;
}
// Else, if an error message exists inside the response, then we will be alerted when parsing the Json below.
//logger.trace("\"jsonResponse\":\n" + jsonResponse); // DEBUG!
boolean foundAttemptsDir = false;
boolean foundPayloadsAggregatedDir = false;
boolean foundPayloadsBulkImportDir = false;
try { // Parse the jsonData
JSONObject jObj = new JSONObject(jsonResponse); // Construct a JSONObject from the retrieved jsonData.
JSONObject entityObject = jObj.getJSONObject("FileStatuses");
//logger.trace("EntityObject: " + entityObject.toString()); // DEBUG!
JSONArray directoryStatuses = entityObject.getJSONArray("FileStatus");
//logger.trace("directoryStatuses: " + directoryStatuses.toString()); // DEBUG!
// In case no fileStatuses are found, the following for-loop will not run.
for ( Object fileStatusObject : directoryStatuses ) {
JSONObject fileStatusJsonObject = (JSONObject) fileStatusObject;
//logger.trace("FileStatusJsonObject: " + fileStatusJsonObject.toString()); // DEBUG!
String dirPath = fileStatusJsonObject.getString("pathSuffix");
//logger.trace("DirPath: " + dirPath); // DEBUG!
if ( dirPath.equals("attempts") )
foundAttemptsDir = true;
else if ( dirPath.equals("payloads_aggregated") )
foundPayloadsAggregatedDir = true;
else if ( dirPath.equals("payloads_bulk_import") )
foundPayloadsBulkImportDir = true;
else if ( ! dirPath.equals("test") ) // The "test" directory helps with testing the service, without interfering with the production directories.
logger.warn("Unknown remote parquet HDFS-directory found: " + dirPath);
}
} catch (JSONException je) { // In case any of the above "json-keys" was not found.
logger.warn("JSON Exception was thrown while trying to retrieve the subdirectories \"attempts\" and \"payloads\": " + je.getMessage() + "\n\nJsonResponse: " + jsonResponse);
return false;
}
// IMPORTANT NOTE: It is possible that the ".../attempts" dir exists, but the ".../payloads" dirs do not, and vice-versa (in case of a remote filesystem failure or an accidental deletion by some other user).
// Also, it is possible that the Controller was terminated before creating all the directories, or that in a previous execution the second "create"-request failed, resulting in the Controller's shutdown.
// For each missing subdirectory, run the mkDirs-request.
if ( !foundAttemptsDir ) {
logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathAttempts + "\" does not exist! Going to create it.");
attemptCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
} else
logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathAttempts + "\" exists.");
if ( !foundPayloadsAggregatedDir ) {
logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" does not exist! Going to create it.");
payloadAggregatedCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
} else
logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" exists.");
if ( !foundPayloadsBulkImportDir ) {
logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" does not exist! Going to create it.");
payloadBulkImportCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
} else
logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" exists.");
return (attemptCreationSuccessful && payloadAggregatedCreationSuccessful && payloadBulkImportCreationSuccessful);
// We need all directories to be created in order for the app to function properly!
} catch (Exception e) {
logger.error("", e);
return false;
}
}
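/**
* Executes a single WebHDFS operation ("MKDIRS" or "DELETE"), given its full pre-built url, which looks roughly like this
* (placeholders, not real values):
* https://<HDFS_HOST>/webhdfs/v1/tmp/parquet_uploads/attempts/?op=MKDIRS&permission=777&user.name=<USERNAME>
* The http-method is chosen from the url ("DELETE" for delete-operations, "PUT" otherwise) and a "200 OK" response is expected.
* Returns "true" on success, "false" otherwise.
*/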
public boolean applyHDFSOperation(String hdfsOperationUrl)
{
try {
URL url = new URL(hdfsOperationUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod(hdfsOperationUrl.contains("DELETE") ? "DELETE" : "PUT");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode == -1 ) {
logger.error("Problem when getting the \"status-code\" for url: " + hdfsOperationUrl);
return false;
}
else if ( statusCode != 200 ) {
String errorMsg = "We expected a \"200 OK\" response, but got: \"" + statusCode + "\" instead, for url: " + hdfsOperationUrl;
logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
return false;
}
if ( logger.isTraceEnabled() )
logger.trace("The Operation was successful for hdfs-op-url: " + hdfsOperationUrl + GenericUtils.endOfLine + fileUtils.getMessageFromResponseBody(conn, false));
} catch (Exception e) {
logger.error("", e);
return false;
}
return true;
}
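/**
* Inspects the futures of the parquet-creation tasks and counts, per parquet-type, how many tasks ran and how many failed.
* The returned "SumParquetSuccess" marks a parquet-type as problematic only when all of its tasks failed (or none ran at all);
* if an invalid parquet-type is found, an error-response is returned inside the "SumParquetSuccess" instead.
*/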
public SumParquetSuccess checkParquetFilesSuccess(List<Future<ParquetReport>> futures)
{
int numOfAllAttemptParquetFileCreations = 0;
int numOfFailedAttemptParquetFileCreations = 0;
int numOfAllPayloadParquetFileCreations = 0;
int numOfFailedPayloadParquetFileCreations = 0;
int sizeOfFutures = futures.size();
for ( int i = 0; i < sizeOfFutures; ++i )
{
try {
ParquetReport parquetReport = futures.get(i).get();
boolean hasProblems = (! parquetReport.isSuccessful());
ParquetReport.ParquetType parquetType = parquetReport.getParquetType();
if ( parquetType.equals(ParquetReport.ParquetType.attempt) ) {
numOfAllAttemptParquetFileCreations++;
if ( hasProblems )
numOfFailedAttemptParquetFileCreations++;
} else if ( parquetType.equals(ParquetReport.ParquetType.payload) ) {
numOfAllPayloadParquetFileCreations ++;
if ( hasProblems )
numOfFailedPayloadParquetFileCreations ++;
} else {
String errMsg = "An invalid \"ParquetReport.ParquetType\" was found: " + parquetType; // This should never happen, but anyway.
logger.error(errMsg);
return new SumParquetSuccess(false, false, ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errMsg));
}
} catch (ExecutionException ee) { // These can be serious errors like an "out of memory exception" (Java HEAP).
logger.error(GenericUtils.getSelectedStackTraceForCausedException(ee, "Parquet_task_" + i + " failed with: ", null, 15));
} catch (CancellationException ce) {
logger.error("Parquet_task_" + i + " was cancelled: " + ce.getMessage());
} catch (InterruptedException ie) {
logger.error("Parquet_task_" + i + " was interrupted: " + ie.getMessage());
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for parquet_task_" + i + " in the futures-list! " + ioobe.getMessage());
}
// We do not know if the failed "future" refers to a "payload" or to an "attempt".
// So we cannot increase a specific counter. That's ok; the only drawback is that we may try to "load" the non-existent data and get an exception.
} // End-for
boolean hasAttemptParquetFileProblem = (numOfFailedAttemptParquetFileCreations == numOfAllAttemptParquetFileCreations);
boolean hasPayloadParquetFileProblem = (numOfFailedPayloadParquetFileCreations == numOfAllPayloadParquetFileCreations);
return new SumParquetSuccess(hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, null);
}
/**
* In each insertion a new parquet-file is created, so we end up with millions of files. Parquet is great for fast SELECTs, so we have to stick with it and merge those files.
* This method creates a clone of the original table, in order to have only one parquet file in the end. It drops the original table
* and renames the clone to the original's name.
* Returns the errorMsg if an error appears; otherwise it returns "null".
*/
public String mergeParquetFilesOfTable(String tableName, String whereClause, Object parameter) {
String errorMsg;
if ( (tableName == null) || tableName.isEmpty() ) {
errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!";
logger.error(errorMsg);
return errorMsg; // Return the error-msg to indicate that something went wrong and pass it down to the Worker.
}
// Make sure the following are non-null (use empty strings if they were not given).
whereClause = (whereClause != null) ? (whereClause + " ") : "";
if ( parameter == null )
parameter = "";
else if ( parameter instanceof String )
parameter = "'" + parameter + "'"; // This will be a "string-check", thus the single-quotes.
// Else it is a "long", it will be used as is.
// Create a temp-table as a copy of the initial table.
try {
jdbcTemplate.execute("CREATE TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + DatabaseConnector.databaseName + "." + tableName + " " + whereClause + parameter);
} catch (Exception e) {
errorMsg = "Problem when copying the contents of \"" + tableName + "\" table to a newly created \"" + tableName + "_tmp\" table, when merging the parquet-files!\n";
logger.error(errorMsg, e);
try { // Make sure we delete the possibly half-created temp-table.
jdbcTemplate.execute("DROP TABLE IF EXISTS " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE");
// We cannot move on with merging, but no harm was done; the "table_tmp" name is free again for future use (since it was dropped immediately).
} catch (Exception e1) {
logger.error("Failed to drop the \"" + tableName + "_tmp\" table!", e1);
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
}
return errorMsg; // We return only the initial error to the Worker, as it is easily distinguished inside the "merge-queries".
}
// Drop the initial table.
try {
jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + " PURGE");
} catch (Exception e) {
errorMsg = "Problem when dropping the initial \"" + tableName + "\" table, when merging the parquet-files!\n";
logger.error(errorMsg, e);
// The original table could not be dropped, so the temp-table cannot be renamed to the original..!
try { // Make sure we delete the already created temp-table, in order to be able to use it in the future. The merging has failed nevertheless.
jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE");
} catch (Exception e1) {
logger.error((errorMsg += "Failed to drop the \"" + tableName + "_tmp\" table!"), e1); // Add this error to the original, both are very important.
}
// Here, the original table still exists (it was never dropped).
return errorMsg;
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
}
// Rename the temp-table to have the initial-table's name.
try {
jdbcTemplate.execute("ALTER TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp RENAME TO " + DatabaseConnector.databaseName + "." + tableName);
} catch (Exception e) {
errorMsg = "Problem in renaming the \"" + tableName + "_tmp\" table to \"" + tableName + "\", when merging the parquet-files!\n";
logger.error(errorMsg, e);
// At this point we only have a "temp-table", the original is already deleted..
// Try to create the original, as a copy of the temp-table. If that succeeds, then try to delete the temp-table.
try {
jdbcTemplate.execute("CREATE TABLE " + DatabaseConnector.databaseName + "." + tableName + " stored as parquet AS SELECT * FROM " + DatabaseConnector.databaseName + "." + tableName + "_tmp");
} catch (Exception e1) {
errorMsg = "Problem when copying the contents of \"" + tableName + "_tmp\" table to a newly created \"" + tableName + "\" table, when merging the parquet-files!\n";
logger.error(errorMsg, e1);
// If the original table was not created, then we have to intervene manually. If it was created but without any data, then we can safely move on to handling other assignments and workerReports, but the data will be lost! So this workerReport failed to be handled.
try { // The below query normally returns a list, as it takes a "regex-pattern" as an input. BUT, we give just the table name, without wildcards. So the result is either the tableName itself or none (not any other table).
jdbcTemplate.queryForObject("SHOW TABLES IN " + DatabaseConnector.databaseName + " LIKE '" + tableName + "'", List.class);
} catch (EmptyResultDataAccessException erdae) {
// The table does not exist, so it was not even half-created by the previous query.
// Not having the original table anymore is a serious error. A manual action is needed!
logger.error((errorMsg += "The original table \"" + tableName + "\" must be created manually! Serious problems may appear otherwise!"));
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and fix it immediately to avoid other errors in the Service)..!
}
// Here, the original-table exists in the DB, BUT without any data inside! This workerReport failed to be handled! (some of its data could not be loaded to the database, and all previous data was lost).
return errorMsg;
}
// The creation of the original table was successful. Try to delete the temp-table.
try {
jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE");
} catch (Exception e2) {
logger.error((errorMsg += "Problem when dropping the \"" + tableName + "_tmp\" table, when merging the parquet-files!\n"), e2);
// Manual deletion should be performed!
return errorMsg; // Return both errors here; the second one is important, since if it had not happened we could have moved on with this workerReport.
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
}
// Here the original table exists and the temp-table is deleted. We eventually have the same state as if the "ALTER TABLE" succeeded.
}
// Gather information to be used for queries-optimization.
try {
jdbcTemplate.execute("COMPUTE STATS " + DatabaseConnector.databaseName + "." + tableName);
} catch (Exception e) {
logger.error("Problem when gathering information from table \"" + tableName + "\" to be used for queries-optimization.", e);
// In this case the error is not so important to the whole operation.. It's only that the performance of this specific table will be less optimal, only temporarily, unless every "COMPUTE STATS" query fails for future workerReports too.
}
return null; // No errorMsg, everything is fine.
}
}