- Simplify the creation of local directories.

- Improve exception messages.
Lampros Smyrnaios 2023-04-28 14:58:33 +03:00
parent 55ea5118ac
commit 49662319a1
2 changed files with 18 additions and 16 deletions

UrlsServiceImpl.java

@@ -224,19 +224,16 @@ public class UrlsServiceImpl implements UrlsService {
 				// We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store.
 				// We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
 				// The urls which give full-text (no matter if we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked up to be checked again later.
-			}
-			else
+			} else
 				logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments);
 
 			String currentParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignments + File.separator;
-			java.nio.file.Path parquetDirPath = Paths.get(currentParquetPath);
-			if ( !Files.isDirectory(parquetDirPath) ) {
-				try {
-					Files.createDirectories(parquetDirPath);
-				} catch (Exception e) {
-					logger.error("", e);
-					return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage());
-				}
+			try {
+				Files.createDirectories(Paths.get(currentParquetPath)); // No-op if it already exists. It does not throw an "alreadyExistsException".
+			} catch (Exception e) {
+				String errorMsg = "Could not create the parquet-directory: " + currentParquetPath;
+				logger.error(errorMsg, e);
+				return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
 			}
 
 			logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignments);

ParquetFileUtils.java

@@ -113,9 +113,7 @@ public class ParquetFileUtils {
 		this.parquetBaseLocalDirectoryPath = parquetBaseDirectoryPath;
 
 		// Create the local parquet file base directory, if it does not exist.
-		java.nio.file.Path parquetDirPath = Paths.get(this.parquetBaseLocalDirectoryPath);
-		if ( !Files.isDirectory(parquetDirPath) )
-			Files.createDirectories(parquetDirPath);
+		Files.createDirectories(Paths.get(this.parquetBaseLocalDirectoryPath)); // No-op if the dir exists. It does not throw an "alreadyExistsException".
 
 		// Create the remote directories for uploading the parquet-files, if those directories do not exist.
 		// The limited-permissions user in use does not have permission to access directories created by other users, so we have to make sure it creates its own.
@@ -318,8 +316,8 @@ public class ParquetFileUtils {
 		OutputFile outputFile;
 		try {
 			outputFile = HadoopOutputFile.fromPath(new Path(fullFilePath), new Configuration());
-			//logger.debug("Created the parquet " + outputFile); // DEBUG!
-		} catch (Throwable e) {
+			//logger.trace("Created the parquet " + outputFile); // DEBUG!
+		} catch (Throwable e) { // The simple "Exception" may not be thrown here, but an "Error" may be thrown. "Throwable" catches EVERYTHING!
 			logger.error("", e);
 			return false;
 		}
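
Note: catching Throwable here is deliberate: JVM Errors such as NoSuchMethodError (mentioned in the next hunk) are not subclasses of Exception, so a plain "catch (Exception e)" would let them escape. A minimal sketch of the difference, with a hypothetical method standing in for the Hadoop/Parquet calls:

public class ThrowableDemo {

    // Hypothetical stand-in for a call that fails with an Error instead of an Exception.
    static void mightFail() {
        throw new NoSuchMethodError("org.apache.avro.Schema.getLogicalType()");
    }

    public static void main(String[] args) {
        try {
            mightFail();
        } catch (Exception e) { // never reached: NoSuchMethodError is an Error, not an Exception
            System.err.println("Caught Exception: " + e);
        } catch (Throwable t) { // catches Exceptions AND Errors
            System.err.println("Caught Throwable: " + t);
        }
    }
}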
@@ -336,7 +334,12 @@ public class ParquetFileUtils {
 				writer.write(record);
 			}
 		} catch (Throwable e) { // The simple "Exception" may not be thrown here, but an "Error" may be thrown. "Throwable" catches EVERYTHING!
-			logger.error("Problem when creating the \"ParquetWriter\" object or when writing the records with it!", e);
+			String errorMsg = "Problem when creating the \"ParquetWriter\" object or when writing the records with it!";
+			if ( e instanceof org.apache.hadoop.fs.FileAlreadyExistsException )
+				logger.error(errorMsg + "\n" + e.getMessage());
+			else
+				logger.error(errorMsg, e);
+
 			// At some point, I got a "NoSuchMethodError", because of a problem in the AvroSchema file: (java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;).
 			// The error was with the schema: {"name": "date", "type" : ["null", {"type" : "long", "logicalType" : "timestamp-millis"}]},
 			return false;
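
Note: the improved handler logs the full stack trace only for unexpected failures; for a FileAlreadyExistsException (presumably a leftover target file on HDFS, see the comments further below) the message alone is enough. The pattern as a standalone sketch; the slf4j logger setup is an assumption, not taken from this file:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SelectiveErrorLogging {

    private static final Logger logger = LoggerFactory.getLogger(SelectiveErrorLogging.class);

    // Expected failures get a one-line message; unexpected ones keep the full stack trace.
    static void logWriteFailure(Throwable e) {
        String errorMsg = "Problem when creating the \"ParquetWriter\" object or when writing the records with it!";
        if ( e instanceof org.apache.hadoop.fs.FileAlreadyExistsException )
            logger.error(errorMsg + "\n" + e.getMessage()); // the stack trace adds no information here
        else
            logger.error(errorMsg, e);
    }
}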
@@ -434,6 +437,7 @@ public class ParquetFileUtils {
 			return errorMsg;
 		}
 
+		// The local parquet file will be deleted later.
 		return null;
 	}
@@ -638,6 +642,7 @@ public class ParquetFileUtils {
 
 	// Use this if we decide to delete undeleted files (probably due to failed "load" attempts). For now, it's better to leave them there, in order to fix potential problems more easily.
+	// Also, the leftover files will automatically be loaded into the table in the next "load" attempt, since we make one "load" operation for the whole directory, rather than multiple loads, one for each file.
 	public String deleteFileFromHDFS(String fileLocation, String parquetFileName) throws Exception
 	{
 		// Delete the file from the temporary storage on HDFS.
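
Note: the body of deleteFileFromHDFS() is not shown in this hunk. For reference, a minimal sketch of deleting a single file from HDFS with the Hadoop FileSystem API; the actual implementation in this class may use a different mechanism (e.g. WebHDFS REST calls), so treat this as an assumption rather than the repo's code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsDeleteSketch {

    // Mirrors the String-returning convention above: null on success, an error message on failure.
    public static String deleteFile(String fileLocation) {
        try {
            Configuration conf = new Configuration(); // picks up core-site.xml / hdfs-site.xml from the classpath
            FileSystem fs = FileSystem.newInstance(conf); // non-cached instance, safe to close
            try {
                if ( !fs.delete(new Path(fileLocation), false) ) // "false" = non-recursive, since it is a single file
                    return "Could not delete the file: " + fileLocation;
            } finally {
                fs.close();
            }
        } catch (Throwable e) {
            return "Error when deleting the file: " + fileLocation + " | " + e.getMessage();
        }
        return null; // success
    }
}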