From c8baf5a5fc7ad558bfa6112de149f46d47e6cd07 Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Thu, 8 Dec 2022 12:16:05 +0200 Subject: [PATCH] - Fix not finding the parquet-schema files when the app was run inside a Docker Container. - Update the "namespaces" and the "names" inside the parquet schemas. - Code polishing. --- .../configuration/ImpalaConnector.java | 2 +- .../urls_controller/util/FileUtils.java | 10 ++++---- .../util/ParquetFileUtils.java | 24 +++++++++---------- src/main/resources/application.properties | 2 -- src/main/resources/schemas/attempt.avsc | 4 ++-- src/main/resources/schemas/payload.avsc | 4 ++-- 6 files changed, 22 insertions(+), 24 deletions(-) diff --git a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java index fe1e7a6..12b51ca 100644 --- a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java +++ b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java @@ -54,7 +54,7 @@ public class ImpalaConnector { private void createDatabase() { if ( isTestEnvironment ) { - logger.info("Going to create (if not exist) the test-database \"" + testDatabaseName + "\" and its tables. Also will fill some tables with data from initial-database \"" + initialDatabaseName + "\"."); + logger.info("Going to create (if not exist) the test-database \"" + testDatabaseName + "\" and its tables. Also will fill some tables with data from the initial-database \"" + initialDatabaseName + "\"."); jdbcTemplate.execute("CREATE DATABASE IF NOT EXISTS " + testDatabaseName); jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".publication stored as parquet as select * from " + initialDatabaseName + ".publication"); diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index c57c5c4..d951e1b 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -184,10 +184,12 @@ public class FileUtils { // Request the full-texts in batches, compressed in zip. int numOfBatches = (numAllFullTexts / numOfFullTextsPerBatch); - if ( (numAllFullTexts % numOfFullTextsPerBatch) > 0 ) // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are. - numOfBatches ++; - - logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts. Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches."); + int remainingFiles = (numAllFullTexts % numOfFullTextsPerBatch); + if ( remainingFiles > 0 ) { // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are. + numOfBatches++; + logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts. Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files)."); + } else + logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts. Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each)."); // Check if one full text is left out because of the division. Put it int the last batch. String baseUrl = "http://" + remoteAddr + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/"; diff --git a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java index 7002dc8..ae945e0 100644 --- a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.io.ClassPathResource; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; @@ -63,11 +64,9 @@ public class ParquetFileUtils { private final String hdfsUserName; - @Value("${schema.payload.filePath}") - String payloadSchemaFilePath; + private final String payloadSchemaFilePath = "schemas/payload.avsc"; - @Value("${schema.attempt.filePath}") - String attemptSchemaFilePath; + private final String attemptSchemaFilePath = "schemas/attempt.avsc"; public Schema payloadsSchema; public Schema attemptsSchema; @@ -78,7 +77,6 @@ public class ParquetFileUtils { public final String parquetHDFSDirectoryPathPayloads; - public ParquetFileUtils(@Value("${hdfs.baseUrl}") String webHDFSBaseUrl, @Value("${hdfs.httpAuth}") String hdfsHttpAuthString, @Value("${hdfs.userName}") String hdfsUserName, @Value("${hdfs.password}") String hdfsPassword, @Value("${output.parquetLocalDirectoryPath}") String parquetBaseDirectoryPath, @Value("${hdfs.parquetRemoteBaseDirectoryPath}") String hdfsParquetBaseDir, FileUtils fileUtils) throws IOException @@ -111,9 +109,8 @@ public class ParquetFileUtils { this.parquetBaseLocalDirectoryPath = parquetBaseDirectoryPath; java.nio.file.Path parquetDirPath = Paths.get(this.parquetBaseLocalDirectoryPath); - if ( !Files.isDirectory(parquetDirPath) ) { + if ( !Files.isDirectory(parquetDirPath) ) Files.createDirectories(parquetDirPath); - } // Create the remote directories for uploading the parquet-files, if those directories do not exist. // The limited-permissions user in use, does not have permission to acces other users' created directories, so we have to make sure it creates its own. @@ -128,9 +125,9 @@ public class ParquetFileUtils { } - public Schema parseSchema(String schemaFilePath) { + public Schema parseSchema(String schemaResourcePath) { try { - return (new Schema.Parser()).parse(Files.newInputStream(Paths.get(schemaFilePath))); + return (new Schema.Parser()).parse(new ClassPathResource(schemaResourcePath).getInputStream()); } catch (Throwable e) { logger.error("", e); return null; @@ -172,7 +169,7 @@ public class ParquetFileUtils { { if ( (payloadsSchema == null) // Parse the schema if it's not already parsed. && ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) { - logger.error("Nothing can be done without the payloadsSchema. Exiting.."); // The cause is already logged inside the above method. + logger.error("Nothing can be done without the payloadsSchema! Exiting.."); // The cause is already logged inside the above method. System.exit(88); // Exit the whole app, as it cannot add the results to the database! } callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount. @@ -183,7 +180,7 @@ public class ParquetFileUtils { if ( (attemptsSchema == null) // Parse the schema if it's not already parsed. && ((attemptsSchema = parseSchema(attemptSchemaFilePath)) == null ) ) { - logger.error("Nothing can be done without the attemptsSchema. Exiting.."); // The cause is already logged inside the above method. + logger.error("Nothing can be done without the attemptsSchema! Exiting.."); // The cause is already logged inside the above method. System.exit(89); // Exit the whole app, as it cannot add the results to the database! } @@ -325,8 +322,9 @@ public class ParquetFileUtils { writer.write(record); } } catch (Throwable e) { // The simple "Exception" may not be thrown here, but an "Error" may be thrown. "Throwable" catches EVERYTHING! - logger.error("Problem when creating the \"ParquetWriter\" object or when writing a record with it!", e); // Last time, I got an "NoSuchMethodError", because of a problem in the AvroSchema file: (java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;). - // {"name": "date", "type" : ["null", {"type" : "long", "logicalType" : "timestamp-millis"}]}, + logger.error("Problem when creating the \"ParquetWriter\" object or when writing a record with it!", e); + // At some point, I got an "NoSuchMethodError", because of a problem in the AvroSchema file: (java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;). + // The error was with the schema: {"name": "date", "type" : ["null", {"type" : "long", "logicalType" : "timestamp-millis"}]}, return false; } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 0c32dc6..6131be9 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -60,8 +60,6 @@ hdfs.httpAuth= hdfs.userName= hdfs.password= -schema.payload.filePath=src/main/resources/schemas/payload.avsc -schema.attempt.filePath=src/main/resources/schemas/attempt.avsc output.parquetLocalDirectoryPath=parquetFiles/ hdfs.parquetRemoteBaseDirectoryPath=/tmp/parquet_uploads/ diff --git a/src/main/resources/schemas/attempt.avsc b/src/main/resources/schemas/attempt.avsc index cd6fe8a..34a1387 100644 --- a/src/main/resources/schemas/attempt.avsc +++ b/src/main/resources/schemas/attempt.avsc @@ -1,7 +1,7 @@ { "type": "record", - "namespace": "UrlsController", - "name": "Attempt", + "namespace": "eu.openaire.urls_controller", + "name": "eu.openaire.urls_controller.Attempt", "fields": [ {"name": "id", "type": "string"}, {"name": "original_url", "type": "string"}, diff --git a/src/main/resources/schemas/payload.avsc b/src/main/resources/schemas/payload.avsc index 07b90a3..52cef93 100644 --- a/src/main/resources/schemas/payload.avsc +++ b/src/main/resources/schemas/payload.avsc @@ -1,7 +1,7 @@ { "type": "record", - "namespace": "UrlsController", - "name": "Payload", + "namespace": "eu.openaire.urls_controller", + "name": "eu.openaire.urls_controller.Payload", "fields": [ {"name": "id", "type": "string"}, {"name": "original_url", "type": "string"},