package eu.openaire.urls_controller.util;

import com.google.common.collect.Lists;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.*;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.OutputFile;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.ClassPathResource;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

@Component
public class ParquetFileUtils {

    private static final Logger logger = LoggerFactory.getLogger(ParquetFileUtils.class);

    public final String parquetBaseLocalDirectoryPath;

    // The "@Autowired" annotation does not work in this case (it gives an NPE when the field is used).
    private final FileUtils fileUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Value("${hdfs.baseUrl}")
    public String webHDFSBaseUrl;

    private final String hdfsHttpAuthString;

    private final String hdfsUserName;


    private static final String attemptSchemaFilePath = "schemas/attempt.avsc";

    public static final String payloadSchemaFilePath = "schemas/payload.avsc";

    public static Schema payloadsSchema = null;
    public Schema attemptsSchema;

    public final String parquetHDFSDirectoryPathAttempts;

    // The "payloads_legacy" table is not handled by the Controller (no data is added to it); it just exists in the database and is taken into account when performing READ queries against the "payload" VIEW.
    public final String parquetHDFSDirectoryPathPayloadsAggregated;
    public final String parquetHDFSDirectoryPathPayloadsBulkImport;

    public String mkDirsAndParams;

    //public String setPermAndParams;

    public String rmDirsAndParams;

    public ParquetFileUtils(@Value("${hdfs.baseUrl}") String webHDFSBaseUrl,
                            @Value("${hdfs.httpAuth}") String hdfsHttpAuthString, @Value("${hdfs.userName}") String hdfsUserName, @Value("${hdfs.password}") String hdfsPassword, @Value("${services.pdfaggregation.controller.parquetLocalDirectoryPath}") String parquetBaseDirectoryPath,
                            @Value("${hdfs.parquetRemoteBaseDirectoryPath}") String hdfsParquetBaseDir,
                            @Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment, FileUtils fileUtils) throws IOException
    {
        if ( webHDFSBaseUrl.endsWith("/") ) // We don't want a trailing slash in the url (it causes problems when the file-path is appended).
            this.webHDFSBaseUrl = webHDFSBaseUrl.substring(0, (webHDFSBaseUrl.length() - 1));
        else
            this.webHDFSBaseUrl = webHDFSBaseUrl;

        if ( (hdfsUserName != null) && !hdfsUserName.isEmpty() )
            this.hdfsUserName = hdfsUserName; // Set it to be used later as a url-param.
        else
            throw new RuntimeException("No hdfs-username was given!");

        // If the "hdfsHttpAuthString" is given, then use it right away. In general, this is better from a privacy perspective.
        if ( (hdfsHttpAuthString != null) && !hdfsHttpAuthString.isEmpty() )
            this.hdfsHttpAuthString = hdfsHttpAuthString;
        else if ( (hdfsPassword != null) && !hdfsPassword.isEmpty() ) {
            String authValue = (this.hdfsUserName + ":" + hdfsPassword);
            this.hdfsHttpAuthString = ("Basic " + Base64.getEncoder().encodeToString(authValue.getBytes(StandardCharsets.UTF_8)));
        }
        else // At this point, all credential-checks have been made and there is no way the Controller can continue.
            throw new RuntimeException("No hdfs-credentials were given, in any form!");

        //logger.trace("\"hdfsHttpAuthString\": " + this.hdfsHttpAuthString); // DEBUG!

        if ( ! parquetBaseDirectoryPath.endsWith(File.separator) )
            this.parquetBaseLocalDirectoryPath = parquetBaseDirectoryPath + File.separator;
        else
            this.parquetBaseLocalDirectoryPath = parquetBaseDirectoryPath;

        // Create the local parquet-file base directory, if it does not exist.
        Files.createDirectories(Paths.get(this.parquetBaseLocalDirectoryPath)); // It's a no-op if the directory already exists; it does not throw an "AlreadyExistsException".

        // Create the remote directories for uploading the parquet-files, if those directories do not exist.
        // The limited-permissions user in use does not have permission to access directories created by other users, so we have to make sure it creates its own.

        if ( !hdfsParquetBaseDir.endsWith("/") )
            hdfsParquetBaseDir += "/";

        if ( isTestEnvironment ) // Make sure the hdfs-remote-dir is different when running tests, in order to not cause conflicts with production.
            hdfsParquetBaseDir += "test/";

        this.parquetHDFSDirectoryPathAttempts = hdfsParquetBaseDir + "attempts/";
        this.parquetHDFSDirectoryPathPayloadsAggregated = hdfsParquetBaseDir + "payloads_aggregated/";
        this.parquetHDFSDirectoryPathPayloadsBulkImport = hdfsParquetBaseDir + "payloads_bulk_import/";
        this.fileUtils = fileUtils;
        this.mkDirsAndParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName; // All permissions for user, group and others must be set, in order for this service's user to have access to the hdfs directory.
        //this.setPermAndParams = "?op=SETPERMISSION&permission=777&user.name=" + hdfsUserName;
        this.rmDirsAndParams = "?op=DELETE&recursive=true&user.name=" + hdfsUserName;
        createRemoteParquetDirectories(hdfsParquetBaseDir);
    }

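    /**
     * Parses an Avro schema from a classpath resource (e.g. "schemas/attempt.avsc" or "schemas/payload.avsc").
     * The attempt-schema is expected to declare at least the fields written further below ("id", "original_url", "date", "status", "error_class", "error_message").
     * Returns the parsed {@link Schema}, or null if the resource could not be read or parsed (the error is logged).
     */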
    public static Schema parseSchema(String schemaResourcePath) {
        try {
            return (new Schema.Parser()).parse(new ClassPathResource(schemaResourcePath).getInputStream());
        } catch (Throwable e) {
            logger.error("", e);
            return null;
        }
    }


    /**
     * The tasks created below must be executed inside a "database-lock" block.
     */
    public List<Callable<ParquetReport>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOfUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse)
    {
        if ( (attemptsSchema == null) // Parse the schema if it's not already parsed.
                && ((attemptsSchema = parseSchema(attemptSchemaFilePath)) == null ) ) {
            logger.error("Nothing can be done without the attemptsSchema! Exiting.."); // The cause is already logged inside the above method.
            System.exit(88); // Exit the whole app, as it cannot add the results to the database!
        }

        // Pre-define the tasks to run in multiple threads.
        final List<Callable<ParquetReport>> callableTasks = new ArrayList<>(8); // 5 threads will handle the attempts and 3 the payloads.

        if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.successful )
        {
            List<Payload> initialPayloads = urlReports.parallelStream()
                    .map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null)))
                    .collect(Collectors.toList());

            int numInitialPayloads = initialPayloads.size();
            if ( numInitialPayloads > 0 ) // If at least 1 payload was created by the processed records..
            { // (it's ok to have no payloads, if there were no full-texts available)
                // At this point we know there was no problem with the full-texts, but we do not know if at least one full-text was retrieved.
                if ( (payloadsSchema == null) // Parse the schema if it's not already parsed.
                        && ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) {
                    logger.error("Nothing can be done without the payloadsSchema! Exiting.."); // The cause is already logged inside the above method.
                    System.exit(89); // Exit the whole app, as it cannot add the results to the database!
                }

                // The UrlReports for the pre-filled payloads are added only here, since we do not want attempt-records for these.
                fileUtils.addUrlReportsByMatchingRecordsFromBacklog(urlReports, initialPayloads, numInitialPayloads, curReportAssignments); // This will add more objects to the "urlReports" list.
                // In case the above method returns an error, nothing happens. We just have only the initial payloads to insert to the DB.

                int sizeOfEachSubList = (int)(sizeOfUrlReports * 0.33); // We want 3 sub-lists for the payloads.
                // There may be 1 more sub-list with very few elements, since only the size of the splits can be controlled. Unfortunately, we cannot set the number of splits, only the size of most splits.
                if ( sizeOfEachSubList > 10 ) {
                    List<List<UrlReport>> finalSubLists = Lists.partition(urlReports, sizeOfEachSubList); // This needs the "sizeOfEachSubList" to be above < 0 >.
                    int numSubListsPayload = finalSubLists.size();
                    // We will run <numSubListsPayload> tasks for the payloads.
                    for ( int i = 0; i < numSubListsPayload; ++i ) {
                        int finalI = i;
                        callableTasks.add(() -> { // Handle inserts to the "payload" table. Each task covers around 33% of the total amount.
                            return new ParquetReport(ParquetReport.ParquetType.payload, createPayloadParquetDataAndUploadToHDFS(finalI, finalSubLists.get(finalI), curReportAssignments, currentParquetPath, (parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignments + "/")));
                        });
                    }
                } else {
                    // If the "urlReports" are so few that we cannot get big sub-lists, assign a single task to handle all the payloads (sizeOfEachSubList * 3).
                    callableTasks.add(() -> { // Handle inserts to the "payload" table.
                        return new ParquetReport(ParquetReport.ParquetType.payload, createPayloadParquetDataAndUploadToHDFS(0, urlReports, curReportAssignments, currentParquetPath, (parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignments + "/")));
                    });
                }
            }
        }

        int sizeOfEachSubList = (int)(sizeOfUrlReports * 0.2); // We want 5 sub-lists for the attempts.
        // There may be 1 more sub-list with very few elements, since only the size of the splits can be controlled. Unfortunately, we cannot set the number of splits, only the size of most splits.
        if ( sizeOfEachSubList > 10 ) {
            List<List<UrlReport>> finalSubLists = Lists.partition(urlReports, sizeOfEachSubList); // This needs the "sizeOfEachSubList" to be above < 0 >.
            // The above will create some sub-lists, each one containing 20% of the total amount.
            int numSubListsAttempt = finalSubLists.size();
            // We will run <numSubListsAttempt> tasks for the attempts.
            for ( int i = 0; i < numSubListsAttempt; ++i ) {
                int finalI = i;
                callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
                    return new ParquetReport(ParquetReport.ParquetType.attempt, createAttemptParquetDataAndUploadToHDFS(finalI, finalSubLists.get(finalI), curReportAssignments, currentParquetPath));
                });
            }
        } else {
            // If the "urlReports" are so few that we cannot get big sub-lists, assign a single task to handle all the attempts (sizeOfEachSubList * 5).
            callableTasks.add(() -> { // Handle inserts to the "attempt" table.
                return new ParquetReport(ParquetReport.ParquetType.attempt, createAttemptParquetDataAndUploadToHDFS(0, urlReports, curReportAssignments, currentParquetPath));
            });
        }

        return callableTasks;
    }

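    /**
     * Builds one Avro attempt-record per UrlReport which has a non-null payload ("id", "original_url", "date", "status", "error_class", "error_message"),
     * writes them to a local parquet file named "<curReportAssignmentsCounter>_attempts_<attemptsIncNum>.parquet"
     * and uploads that file to the "attempts" HDFS directory of the current assignments-batch.
     * Returns true on success, false otherwise (the error is already logged).
     */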
    public boolean createAttemptParquetDataAndUploadToHDFS(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignmentsCounter, String localParquetPath)
    {
        List<GenericData.Record> recordList = new ArrayList<>(urlReports.size());
        GenericData.Record record;

        for ( UrlReport urlReport : urlReports ) {
            Payload payload = urlReport.getPayload();
            if ( payload == null ) {
                logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignmentsCounter + GenericUtils.endOfLine + urlReport);
                continue;
            }

            Error error = urlReport.getError();
            if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop).
                logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
                error = new Error(null, null);
            }

            try {
                record = new GenericData.Record(attemptsSchema);
                record.put("id", payload.getId());
                record.put("original_url", payload.getOriginal_url());
                Timestamp timestamp = payload.getTimestamp_acquired();
                record.put("date", (timestamp != null) ? timestamp.getTime() : System.currentTimeMillis());
                record.put("status", urlReport.getStatus().toString());
                record.put("error_class", String.valueOf(error.getType()));
                record.put("error_message", error.getMessage());
                recordList.add(record);
            } catch (Exception e) {
                logger.error("Failed to create an attempt record!", e);
            }
        }

        int recordsSize = recordList.size();
        if ( recordsSize == 0 ) {
            logger.warn("No attempts are available to be inserted to the database!");
            return false;
        }

        String fileName = curReportAssignmentsCounter + "_attempts_" + attemptsIncNum + ".parquet";
        //logger.trace("Going to write " + recordsSize + " attempt-records to the parquet file: " + fileName); // DEBUG!

        String fullFilePath = localParquetPath + fileName;
        if ( writeToParquet(recordList, attemptsSchema, fullFilePath) ) {
            //logger.trace("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!

            // Upload and insert the data to the "attempt" Impala table.
            String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, (parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/"));
            return (errorMsg == null); // The possible error-message returned is already logged by the Controller.
        } else
            return false;

        // Multiple parquet files will be created and filled at the same time, before they are uploaded and moved into the database table.
        // Each thread serving a worker will have its own parquet subdirectory, which will be deleted after inserting the parquet data into the database.
    }

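    /**
     * Builds one Avro payload-record per UrlReport whose payload has an uploaded full-text (a non-null "location"),
     * writes them to a local parquet file named "<curReportAssignmentsCounter>_payloads_<payloadsCounter>.parquet"
     * and uploads that file to the given payloads HDFS directory.
     * Returns true on success, false otherwise (the error is already logged).
     */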
    public boolean createPayloadParquetDataAndUploadToHDFS(int payloadsCounter, List<UrlReport> urlReports, long curReportAssignmentsCounter, String localParquetPath, String parquetHDFSDirectoryPathPayloads)
    {
        List<GenericData.Record> recordList = new ArrayList<>((int) (urlReports.size() * 0.2));
        GenericData.Record record;

        for ( UrlReport urlReport : urlReports )
        {
            Payload payload = urlReport.getPayload();
            if ( payload == null ) {
                logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignmentsCounter + GenericUtils.endOfLine + urlReport);
                continue;
            }

            String fileLocation = payload.getLocation();
            if ( fileLocation == null ) // We want only the records with uploaded full-texts in the "payload" table.
                continue;

            Timestamp timestamp = payload.getTimestamp_acquired();
            record = getPayloadParquetRecord(payload.getId(), payload.getOriginal_url(), payload.getActual_url(),
                    (timestamp != null) ? timestamp.getTime() : System.currentTimeMillis(),
                    payload.getMime_type(), payload.getSize(), payload.getHash(), fileLocation, payload.getProvenance(), false);

            if ( record != null )
                recordList.add(record);
        }

        int recordsSize = recordList.size();
        if ( recordsSize == 0 ) {
            logger.warn("No payloads are available to be inserted to the database!");
            return false;
        }

        String fileName = curReportAssignmentsCounter + "_payloads_" + payloadsCounter + ".parquet";
        //logger.trace("Going to write " + recordsSize + " payload-records to the parquet file: " + fileName); // DEBUG!

        String fullFilePath = localParquetPath + fileName;
        if ( writeToParquet(recordList, payloadsSchema, fullFilePath) ) {
            //logger.trace("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!

            // Upload and insert the data to the "payload" Impala table.
            String errorMsg = uploadParquetFileToHDFS(fullFilePath, fileName, parquetHDFSDirectoryPathPayloads);
            return (errorMsg == null); // The possible error-message returned is already logged by the Controller.
        } else
            return false;

        // Multiple parquet files will be created and filled at the same time, before they are uploaded and moved into the database table.
        // Each thread serving a worker will have its own parquet subdirectory, which will be deleted after inserting the parquet data into the database.
    }

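    /**
     * Creates a single payload Avro record from the given values.
     * Bulk-imported records get the "bulk:" prefix in their "provenance" field, so they can be distinguished when looking at all records in the "payload" VIEW.
     * Returns the record, or null if its creation failed (the error is logged).
     */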
    public GenericData.Record getPayloadParquetRecord(String id, String original_url, String actual_url, long timeMillis, String mimetype, Long size,
                                                      String hash, String fileLocation, String provenance, boolean isForBulkImport)
    {
        GenericData.Record record;
        try {
            record = new GenericData.Record(payloadsSchema);
            record.put("id", id);
            record.put("original_url", original_url);
            record.put("actual_url", actual_url);
            record.put("date", timeMillis);
            record.put("mimetype", mimetype);
            record.put("size", ((size != null) ? String.valueOf(size) : null));
            record.put("hash", hash);
            record.put("location", fileLocation);
            record.put("provenance", (isForBulkImport ? "bulk:" : "") + provenance);
            // Add the "bulk:" prefix in order to make it clear that this record comes from bulkImport, when looking at all records in the "payload" VIEW.
            return record;
        } catch (Exception e) {
            logger.error("Failed to create a payload record!", e);
            return null;
        }
    }

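    /**
     * Writes the given Avro records to a local parquet file at "fullFilePath", using the given schema and the GZIP compression-codec.
     * Returns true on success, false if the output-file could not be created or the records could not be written (the error is logged).
     */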
    public boolean writeToParquet(List<GenericData.Record> recordList, Schema schema, String fullFilePath)
    {
        OutputFile outputFile;
        try { // TODO - Verify that this will create any directories which do not exist in the provided path. Currently we create the directories beforehand.
            outputFile = HadoopOutputFile.fromPath(new Path(fullFilePath), new Configuration());
            //logger.trace("Created the parquet " + outputFile); // DEBUG!
        } catch (Throwable e) { // A simple "Exception" may not be thrown here, but an "Error" may be thrown. "Throwable" catches EVERYTHING!
            logger.error("", e);
            return false;
        }

        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputFile).withSchema(schema)
                .withCompressionCodec(CompressionCodecName.GZIP).build())
        {
            // When the app runs inside a Docker Container, it is NOT guaranteed that all compression-types will work. For example, the "SNAPPY"-compression does NOT work, while "GZIP" works.
            // Also, we would prefer ZSTD over GZIP, but the old version of the Impala-Database does not support it.

            //logger.trace("Going to write to \"" + fullFilePath + "\" the record list: " + recordList); // DEBUG!
            for ( GenericRecord record : recordList ) {
                //logger.trace("Writing to \"" + fullFilePath + "\" the record: " + record); // DEBUG!
                writer.write(record);
            }
        } catch (Throwable e) { // A simple "Exception" may not be thrown here, but an "Error" may be thrown. "Throwable" catches EVERYTHING!
            String errorMsg = "Problem when creating the \"ParquetWriter\" object or when writing the records with it!";
            if ( e instanceof org.apache.hadoop.fs.FileAlreadyExistsException )
                logger.error(errorMsg + GenericUtils.endOfLine + e.getMessage());
            else
                logger.error(errorMsg, e);

            // At some point, I got a "NoSuchMethodError", because of a problem in the AvroSchema file: (java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;).
            // The error was with the schema: {"name": "date", "type" : ["null", {"type" : "long", "logicalType" : "timestamp-millis"}]},
            return false;
        }

        //logger.trace("Done writing to \"" + fullFilePath + "\""); // DEBUG!
        return true;
    }

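    /**
     * Uploads a local parquet file to HDFS through the WebHDFS REST-API, using the two-step "CREATE" operation illustrated in the comment below:
     * the first PUT request receives a "307 TEMPORARY_REDIRECT" whose "Location" header points to a DataNode,
     * and a second PUT request streams the file's bytes to that location, expecting a "201 Created" response.
     * Returns null on success, or the error-message on failure.
     */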
    public String uploadParquetFileToHDFS(String parquetFileFullLocalPath, String parquetFileName, String parquetRemoteDirectory)
    {
        /*
            Create the new file in path.
            curl -u <USERNAME> -i -X PUT "https://iis-cdh5-test-gw.ocean.icm.edu.pl/webhdfs/v1/tmp/parquet_uploads/newFile.parquet?op=CREATE&overwrite=false&blocksize=1048576&permission=644&buffersize=10485760&user.name=<USERNAME>"
            Gives a redirect to a DATANODE.
            LOCATION: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE...
            Create a new PUT request.
            curl -u <USERNAME> -i -X PUT -T <PATH_TO_LOCAL_FILE> "https://<DATANODE>:<PORT>/webhdfs/v1/tmp/parquet_uploads/newFile.parquet?op=CREATE&overwrite=false&blocksize=1048576&permission=644&buffersize=10485760&user.name=<USERNAME>"
        */

        try {
            // Check if the parquet file exists locally.
            File parquetFile = new File(parquetFileFullLocalPath);
            if ( ! parquetFile.isFile() ) {
                String errorMsg = "The local parquet file \"" + parquetFileFullLocalPath + "\" does not exist!";
                logger.error(errorMsg);
                return errorMsg;
            }

            // https://iis-cdh5-test-gw.ocean.icm.edu.pl/webhdfs/v1/tmp/parquet_uploads/<parquetFileName.parquet>?op=CREATE&overwrite=false&blocksize=1048576&permission=644&buffersize=10485760&user.name=<USERNAME>
            String parquetFileURI = webHDFSBaseUrl + parquetRemoteDirectory + parquetFileName;

            URL url = new URL(parquetFileURI + "?op=CREATE&overwrite=true&blocksize=1048576&permission=777&buffersize=10485760&user.name=" + hdfsUserName);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("PUT");
            conn.setRequestProperty("Authorization", hdfsHttpAuthString);
            conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
            conn.connect();

            int statusCode = conn.getResponseCode();
            if ( statusCode != 307 ) {
                String errorMsg = "We expected a \"307 TEMPORARY_REDIRECT\" response, but got: \"" + statusCode + "\" instead!";
                logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
                return errorMsg;
            }

            // We get the location to which we were redirected. It's a "datanode".
            String location = conn.getHeaderField("Location");
            if ( location == null ) {
                String errorMsg = "We expected a \"location\" from the last \"307 TEMPORARY_REDIRECT\" response, but got nothing..";
                logger.error(errorMsg + "\n\n" + conn.getHeaderFields());
                return errorMsg;
            }
            //logger.trace("The target location is: " + location + "\nWill do a silent redirect to HTTPS."); // DEBUG!

            // In case the WebHDFS uses https, then perform the offline redirect to https here (to avoid the live redirection).
            if ( webHDFSBaseUrl.startsWith("https:") )
                location = StringUtils.replace(location, "http:", "https:", 1);
            // Unless we handle this here, we have to either complicate the process by handling the https-redirect, or take a hit in performance by having one more step each time we want to upload a file.

            conn = (HttpURLConnection) (new URL(location)).openConnection(); // This already contains the "user.name" parameter.
            conn.setRequestMethod("PUT");
            conn.setRequestProperty("Authorization", hdfsHttpAuthString);
            conn.setRequestProperty("content-type", "application/octet-stream");
            conn.setDoOutput(true);
            conn.setInstanceFollowRedirects(true); // It is possible that the "location" was an intermediate one.
            conn.connect();

            // Upload the parquet file.
            try ( BufferedInputStream inputS = new BufferedInputStream(Files.newInputStream(parquetFile.toPath()), FileUtils.tenMb);
                  BufferedOutputStream outS = new BufferedOutputStream(conn.getOutputStream(), FileUtils.tenMb) )
            {
                int readByte = -1;
                while ( (readByte = inputS.read()) != -1 )
                    outS.write(readByte);
            } // Any exception will be caught in the end of this method.

            statusCode = conn.getResponseCode();
            if ( statusCode != 201 ) {
                String errorMsg = "We expected a \"201 Created\" response, but got: \"" + statusCode + "\" instead, for url: " + location;
                logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
                return errorMsg;
            }

            location = conn.getHeaderField("Location"); // The response should have a "location" header, as per the official documentation.
            if ( location == null ) {
                //logger.warn("We expected a \"location\" from the last \"201 Created\" response, but got nothing..\n" + conn.getHeaderFields() + "\n\nIt's fine."); // The "201-Created" response has no content inside.
                // Do not return here. The absence of the location is not critical. We can still create it on our own.
                location = parquetFileURI; // This location does not include the "user.name" parameter.
            }
            //logger.trace("The file \"" + parquetFileName + "\" was successfully uploaded. Its location is: " + location); // DEBUG!

            // Important note!
            // When using the "load data inpath" command, the files are MOVED, not copied! So we don't have to delete them afterwards.
            // See: https://docs.cloudera.com/documentation/enterprise/latest/topics/impala_load_data.html
        } catch (Throwable e) {
            String errorMsg = "Error while uploading parquet file \"" + parquetFileFullLocalPath + "\" to HDFS!\n";
            logger.error(errorMsg, e);
            return errorMsg + e.getMessage();
        }

        // The local parquet file will be deleted later.
        return null;
    }

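    /**
     * Loads the parquet files of the given remote HDFS directory into the given Impala table, with a query of the form:
     * "load data inpath '<remoteParquetDataLocation>' into table <databaseName>.<tableName>"
     * Returns true on success, false otherwise (the error is already logged).
     */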
    public boolean loadParquetDataIntoTable(String remoteParquetDataLocation, String tableName)
    {
        // Import the data from the parquet file into the database's table.
        String loadParquetInTableQuery = "load data inpath '" + remoteParquetDataLocation + "' into table " + DatabaseConnector.databaseName + "." + tableName;
        try {
            jdbcTemplate.execute(loadParquetInTableQuery);
        } catch (Exception e) {
            // Check if this error is related to the files being missing from the HDFS directory.
            Throwable cause = e.getCause();
            if ( cause instanceof SQLException ) { // In this case the "parent" exception is: "org.springframework.jdbc.UncategorizedSQLException".
                String errorMsg = cause.getMessage();
                if ( (errorMsg != null) && errorMsg.contains("contains no visible files") ) {
                    logger.error("The \"remoteParquetDataLocation\": \"" + remoteParquetDataLocation + "\" was found empty, when we tried to load its content into the \"" + tableName + "\" table!");
                    return false; // Since each thread is using a different subDir by design, this error is unacceptable.
                }
            }
            DatabaseConnector.handleQueryException("loadParquetInTableQuery", loadParquetInTableQuery, e); // It's already logged.
            return false;
        }
        //logger.trace("The data from \"" + remoteParquetDataLocation + "\" was loaded into the " + tableName + " table."); // DEBUG!
        return true;
    }

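    /**
     * Checks, with a single WebHDFS "LISTSTATUS" request, whether the remote parquet sub-directories ("attempts", "payloads_aggregated", "payloads_bulk_import")
     * exist under the given base directory, and creates the missing ones with "MKDIRS" requests.
     * The json-response which is parsed below has roughly the following (abbreviated) form:
     * {"FileStatuses": {"FileStatus": [ {"pathSuffix": "attempts", "type": "DIRECTORY", ...}, {"pathSuffix": "payloads_aggregated", ...}, ... ]}}
     * Returns true only if all required directories exist or were created successfully.
     */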
    public boolean createRemoteParquetDirectories(String parquetBaseRemoteDirectory)
    {
        // Check if the remote directories exist. If so, then return and continue with execution.
        // If the directories do not exist, then make them with a few requests.
        // The WebHDFS uses the "MKDIRS" operation, which creates all the non-existent directories in the specified path.
        // So with one request we will create the "parquet_uploads/" and the "parquet_uploads/attempts/" directories, and with the following requests, the payload sub-directories.

        logger.info("Going to check if the remote parquet directories exist.");
        String initErrMsg = "The remote parquet HDFS-directory ";

        String listMainDirectoryUrl = webHDFSBaseUrl + parquetBaseRemoteDirectory + "?op=LISTSTATUS&user.name=" + hdfsUserName;

        boolean attemptCreationSuccessful = true;
        boolean payloadAggregatedCreationSuccessful = true;
        boolean payloadBulkImportCreationSuccessful = true;

        // Get the "fileStatuses" of the directories (main and subdirectories) in one request.
        try {
            URL url = new URL(listMainDirectoryUrl);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            conn.setRequestProperty("Authorization", hdfsHttpAuthString);
            conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
            conn.connect();
            int statusCode = conn.getResponseCode();
            if ( (statusCode != 200) && (statusCode != 404) ) {
                String errorMsg = "We expected a \"200 OK\" response, but got: \"" + statusCode + "\" instead, for url: " + listMainDirectoryUrl;
                String retrievedMessage = fileUtils.getMessageFromResponseBody(conn, true);
                logger.error(errorMsg + "\n\n" + ((retrievedMessage != null) ? retrievedMessage : ""));
                return false;
            }

            if ( statusCode == 404 ) {
                logger.info(initErrMsg + "\"" + parquetBaseRemoteDirectory + "\" does not exist. We will create it, along with its sub-directories.");
                attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
                payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
                payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
                return (attemptCreationSuccessful && payloadAggregatedCreationSuccessful && payloadBulkImportCreationSuccessful);
                // We need all directories to be created in order for the app to function properly!
            }

            // Check the json-response, to see if all the subdirectories exist.

            // Take the json-response.
            String jsonResponse = fileUtils.getMessageFromResponseBody(conn, false);
            if ( (jsonResponse == null) || jsonResponse.isEmpty() ) {
                logger.error("The \"jsonResponse\" was not retrieved!");
                return false;
            }
            // Else, if an error message exists inside the response, then we will be alerted when parsing the json below.

            //logger.trace("\"jsonResponse\":\n" + jsonResponse); // DEBUG!

            boolean foundAttemptsDir = false;
            boolean foundPayloadsAggregatedDir = false;
            boolean foundPayloadsBulkImportDir = false;

            try { // Parse the jsonData.
                JSONObject jObj = new JSONObject(jsonResponse); // Construct a JSONObject from the retrieved jsonData.
                JSONObject entityObject = jObj.getJSONObject("FileStatuses");
                //logger.trace("EntityObject: " + entityObject.toString()); // DEBUG!

                JSONArray directoryStatuses = entityObject.getJSONArray("FileStatus");
                //logger.trace("directoryStatuses: " + directoryStatuses.toString()); // DEBUG!

                // In case no fileStatuses are found, the following for-loop will not run.
                for ( Object fileStatusObject : directoryStatuses ) {
                    JSONObject fileStatusJsonObject = (JSONObject) fileStatusObject;
                    //logger.trace("FileStatusJsonObject: " + fileStatusJsonObject.toString()); // DEBUG!

                    String dirPath = fileStatusJsonObject.getString("pathSuffix");
                    //logger.trace("DirPath: " + dirPath); // DEBUG!

                    if ( dirPath.equals("attempts") )
                        foundAttemptsDir = true;
                    else if ( dirPath.equals("payloads_aggregated") )
                        foundPayloadsAggregatedDir = true;
                    else if ( dirPath.equals("payloads_bulk_import") )
                        foundPayloadsBulkImportDir = true;
                    else if ( ! dirPath.equals("test") ) // The "test" directory helps with testing the service, without interfering with the production directories.
                        logger.warn("Unknown remote parquet HDFS-directory found: " + dirPath);
                }
            } catch (JSONException je) { // In case any of the above "json-keys" was not found.
                logger.warn("JSON Exception was thrown while trying to retrieve the subdirectories \"attempts\" and \"payloads\": " + je.getMessage() + "\n\nJsonResponse: " + jsonResponse);
                return false;
            }

            // IMPORTANT NOTE: It is possible that the ".../attempts" dir exists, but the ".../payloads" dir does not, and vice-versa (in case of a remote-filesystem failure or an accidental deletion by some other user).
            // Also, it is possible that the Controller was terminated before creating all the directories, or that in a previous execution the second "create"-request failed, resulting in the Controller's shut down.

            // For each missing subdirectory, run the mkDirs-request.
            if ( !foundAttemptsDir ) {
                logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathAttempts + "\" does not exist! Going to create it.");
                attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
            } else
                logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathAttempts + "\" exists.");

            if ( !foundPayloadsAggregatedDir ) {
                logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" does not exist! Going to create it.");
                payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
            } else
                logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" exists.");

            if ( !foundPayloadsBulkImportDir ) {
                logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" does not exist! Going to create it.");
                payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
            } else
                logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" exists.");

            return (attemptCreationSuccessful && payloadAggregatedCreationSuccessful && payloadBulkImportCreationSuccessful);
            // We need all directories to be created in order for the app to function properly!
        } catch (Exception e) {
            logger.error("", e);
            return false;
        }
    }

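    /**
     * Executes a single WebHDFS operation (a "MKDIRS" or a recursive "DELETE") against the given pre-built operation-url,
     * sending a "PUT" request for "MKDIRS" and a "DELETE" request for "DELETE", and expecting a "200 OK" response.
     * The equivalent curl-command for the "MKDIRS" case is roughly:
     * curl -u <USERNAME> -i -X PUT "<webHDFSBaseUrl><REMOTE_DIR>?op=MKDIRS&permission=777&user.name=<USERNAME>"
     * Returns true on success, false otherwise (the error is already logged).
     */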
    public boolean applyHDFOperation(String hdfsOperationUrl)
    {
        try {
            URL url = new URL(hdfsOperationUrl);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod(hdfsOperationUrl.contains("DELETE") ? "DELETE" : "PUT");
            conn.setRequestProperty("Authorization", hdfsHttpAuthString);
            conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
            conn.connect();
            int statusCode = conn.getResponseCode();
            if ( statusCode == -1 ) {
                logger.error("Problem when getting the \"status-code\" for url: " + hdfsOperationUrl);
                return false;
            }
            else if ( statusCode != 200 ) {
                String errorMsg = "We expected a \"200 OK\" response, but got: \"" + statusCode + "\" instead, for url: " + hdfsOperationUrl;
                logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
                return false;
            }
            if ( logger.isTraceEnabled() )
                logger.trace("The Operation was successful for hdfs-op-url: " + hdfsOperationUrl + GenericUtils.endOfLine + fileUtils.getMessageFromResponseBody(conn, false));
        } catch (Exception e) {
            logger.error("", e);
            return false;
        }
        return true;
    }

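    /**
     * Sums up the results of the parquet-creation tasks.
     * Each returned "problem"-flag is true only when every task of that type failed (this includes the case where no task of that type ran at all, since then "0 == 0" holds).
     * If an invalid ParquetType is found, an error-ResponseEntity is returned inside the SumParquetSuccess object.
     */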
    public SumParquetSuccess checkParquetFilesSuccess(List<Future<ParquetReport>> futures)
    {
        int numOfAllAttemptParquetFileCreations = 0;
        int numOfFailedAttemptParquetFileCreations = 0;

        int numOfAllPayloadParquetFileCreations = 0;
        int numOfFailedPayloadParquetFileCreations = 0;

        for ( Future<ParquetReport> future : futures )
        {
            ParquetReport parquetReport = null;
            try {
                parquetReport = future.get();
                boolean hasProblems = (! parquetReport.isSuccessful());
                ParquetReport.ParquetType parquetType = parquetReport.getParquetType();
                if ( parquetType.equals(ParquetReport.ParquetType.attempt) ) {
                    numOfAllAttemptParquetFileCreations++;
                    if ( hasProblems )
                        numOfFailedAttemptParquetFileCreations++;
                } else if ( parquetType.equals(ParquetReport.ParquetType.payload) ) {
                    numOfAllPayloadParquetFileCreations++;
                    if ( hasProblems )
                        numOfFailedPayloadParquetFileCreations++;
                } else {
                    String errMsg = "An invalid \"ParquetReport.ParquetType\" was found: " + parquetType; // This should never happen, but anyway.
                    logger.error(errMsg);
                    return new SumParquetSuccess(false, false, ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errMsg));
                }
            } catch (Exception e) {
                logger.error("", e);
                // We do not know if the failed "future" refers to a "payload" or to an "attempt".
                // So we cannot increase a specific counter. That's ok; the only drawback is that we may try to "load" the non-existent data and get an exception.
            }
        } // End-for

        boolean hasAttemptParquetFileProblem = (numOfFailedAttemptParquetFileCreations == numOfAllAttemptParquetFileCreations);
        boolean hasPayloadParquetFileProblem = (numOfFailedPayloadParquetFileCreations == numOfAllPayloadParquetFileCreations);

        return new SumParquetSuccess(hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, null);
    }

}