forked from lsmyrnaios/UrlsController
- Fix not finding the parquet-schema files when the app was run inside a Docker Container.
- Update the "namespaces" and the "names" inside the parquet schemas. - Code polishing.
This commit is contained in:
parent
95c38c4a24
commit
c8baf5a5fc
|
@ -54,7 +54,7 @@ public class ImpalaConnector {
|
||||||
private void createDatabase()
|
private void createDatabase()
|
||||||
{
|
{
|
||||||
if ( isTestEnvironment ) {
|
if ( isTestEnvironment ) {
|
||||||
logger.info("Going to create (if not exist) the test-database \"" + testDatabaseName + "\" and its tables. Also will fill some tables with data from initial-database \"" + initialDatabaseName + "\".");
|
logger.info("Going to create (if not exist) the test-database \"" + testDatabaseName + "\" and its tables. Also will fill some tables with data from the initial-database \"" + initialDatabaseName + "\".");
|
||||||
jdbcTemplate.execute("CREATE DATABASE IF NOT EXISTS " + testDatabaseName);
|
jdbcTemplate.execute("CREATE DATABASE IF NOT EXISTS " + testDatabaseName);
|
||||||
|
|
||||||
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".publication stored as parquet as select * from " + initialDatabaseName + ".publication");
|
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".publication stored as parquet as select * from " + initialDatabaseName + ".publication");
|
||||||
|
|
|
@ -184,10 +184,12 @@ public class FileUtils {
|
||||||
|
|
||||||
// Request the full-texts in batches, compressed in zip.
|
// Request the full-texts in batches, compressed in zip.
|
||||||
int numOfBatches = (numAllFullTexts / numOfFullTextsPerBatch);
|
int numOfBatches = (numAllFullTexts / numOfFullTextsPerBatch);
|
||||||
if ( (numAllFullTexts % numOfFullTextsPerBatch) > 0 ) // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
|
int remainingFiles = (numAllFullTexts % numOfFullTextsPerBatch);
|
||||||
numOfBatches ++;
|
if ( remainingFiles > 0 ) { // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
|
||||||
|
numOfBatches++;
|
||||||
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts. Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches.");
|
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts. Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
|
||||||
|
} else
|
||||||
|
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts. Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
|
||||||
|
|
||||||
// Check if one full text is left out because of the division. Put it in the last batch.
|
// Check if one full text is left out because of the division. Put it in the last batch.
|
||||||
String baseUrl = "http://" + remoteAddr + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
|
String baseUrl = "http://" + remoteAddr + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.core.io.ClassPathResource;
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@ -63,11 +64,9 @@ public class ParquetFileUtils {
|
||||||
private final String hdfsUserName;
|
private final String hdfsUserName;
|
||||||
|
|
||||||
|
|
||||||
@Value("${schema.payload.filePath}")
|
private final String payloadSchemaFilePath = "schemas/payload.avsc";
|
||||||
String payloadSchemaFilePath;
|
|
||||||
|
|
||||||
@Value("${schema.attempt.filePath}")
|
private final String attemptSchemaFilePath = "schemas/attempt.avsc";
|
||||||
String attemptSchemaFilePath;
|
|
||||||
|
|
||||||
public Schema payloadsSchema;
|
public Schema payloadsSchema;
|
||||||
public Schema attemptsSchema;
|
public Schema attemptsSchema;
|
||||||
|
@ -78,7 +77,6 @@ public class ParquetFileUtils {
|
||||||
public final String parquetHDFSDirectoryPathPayloads;
|
public final String parquetHDFSDirectoryPathPayloads;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public ParquetFileUtils(@Value("${hdfs.baseUrl}") String webHDFSBaseUrl,
|
public ParquetFileUtils(@Value("${hdfs.baseUrl}") String webHDFSBaseUrl,
|
||||||
@Value("${hdfs.httpAuth}") String hdfsHttpAuthString, @Value("${hdfs.userName}") String hdfsUserName, @Value("${hdfs.password}") String hdfsPassword, @Value("${output.parquetLocalDirectoryPath}") String parquetBaseDirectoryPath,
|
@Value("${hdfs.httpAuth}") String hdfsHttpAuthString, @Value("${hdfs.userName}") String hdfsUserName, @Value("${hdfs.password}") String hdfsPassword, @Value("${output.parquetLocalDirectoryPath}") String parquetBaseDirectoryPath,
|
||||||
@Value("${hdfs.parquetRemoteBaseDirectoryPath}") String hdfsParquetBaseDir, FileUtils fileUtils) throws IOException
|
@Value("${hdfs.parquetRemoteBaseDirectoryPath}") String hdfsParquetBaseDir, FileUtils fileUtils) throws IOException
|
||||||
|
@ -111,9 +109,8 @@ public class ParquetFileUtils {
|
||||||
this.parquetBaseLocalDirectoryPath = parquetBaseDirectoryPath;
|
this.parquetBaseLocalDirectoryPath = parquetBaseDirectoryPath;
|
||||||
|
|
||||||
java.nio.file.Path parquetDirPath = Paths.get(this.parquetBaseLocalDirectoryPath);
|
java.nio.file.Path parquetDirPath = Paths.get(this.parquetBaseLocalDirectoryPath);
|
||||||
if ( !Files.isDirectory(parquetDirPath) ) {
|
if ( !Files.isDirectory(parquetDirPath) )
|
||||||
Files.createDirectories(parquetDirPath);
|
Files.createDirectories(parquetDirPath);
|
||||||
}
|
|
||||||
|
|
||||||
// Create the remote directories for uploading the parquet-files, if those directories do not exist.
|
// Create the remote directories for uploading the parquet-files, if those directories do not exist.
|
||||||
// The limited-permissions user in use, does not have permission to access other users' created directories, so we have to make sure it creates its own.
|
// The limited-permissions user in use, does not have permission to access other users' created directories, so we have to make sure it creates its own.
|
||||||
|
@ -128,9 +125,9 @@ public class ParquetFileUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Schema parseSchema(String schemaFilePath) {
|
public Schema parseSchema(String schemaResourcePath) {
|
||||||
try {
|
try {
|
||||||
return (new Schema.Parser()).parse(Files.newInputStream(Paths.get(schemaFilePath)));
|
return (new Schema.Parser()).parse(new ClassPathResource(schemaResourcePath).getInputStream());
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
logger.error("", e);
|
logger.error("", e);
|
||||||
return null;
|
return null;
|
||||||
|
@ -172,7 +169,7 @@ public class ParquetFileUtils {
|
||||||
{
|
{
|
||||||
if ( (payloadsSchema == null) // Parse the schema if it's not already parsed.
|
if ( (payloadsSchema == null) // Parse the schema if it's not already parsed.
|
||||||
&& ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) {
|
&& ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) {
|
||||||
logger.error("Nothing can be done without the payloadsSchema. Exiting.."); // The cause is already logged inside the above method.
|
logger.error("Nothing can be done without the payloadsSchema! Exiting.."); // The cause is already logged inside the above method.
|
||||||
System.exit(88); // Exit the whole app, as it cannot add the results to the database!
|
System.exit(88); // Exit the whole app, as it cannot add the results to the database!
|
||||||
}
|
}
|
||||||
callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount.
|
callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount.
|
||||||
|
@ -183,7 +180,7 @@ public class ParquetFileUtils {
|
||||||
|
|
||||||
if ( (attemptsSchema == null) // Parse the schema if it's not already parsed.
|
if ( (attemptsSchema == null) // Parse the schema if it's not already parsed.
|
||||||
&& ((attemptsSchema = parseSchema(attemptSchemaFilePath)) == null ) ) {
|
&& ((attemptsSchema = parseSchema(attemptSchemaFilePath)) == null ) ) {
|
||||||
logger.error("Nothing can be done without the attemptsSchema. Exiting.."); // The cause is already logged inside the above method.
|
logger.error("Nothing can be done without the attemptsSchema! Exiting.."); // The cause is already logged inside the above method.
|
||||||
System.exit(89); // Exit the whole app, as it cannot add the results to the database!
|
System.exit(89); // Exit the whole app, as it cannot add the results to the database!
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -325,8 +322,9 @@ public class ParquetFileUtils {
|
||||||
writer.write(record);
|
writer.write(record);
|
||||||
}
|
}
|
||||||
} catch (Throwable e) { // The simple "Exception" may not be thrown here, but an "Error" may be thrown. "Throwable" catches EVERYTHING!
|
} catch (Throwable e) { // The simple "Exception" may not be thrown here, but an "Error" may be thrown. "Throwable" catches EVERYTHING!
|
||||||
logger.error("Problem when creating the \"ParquetWriter\" object or when writing a record with it!", e); // Last time, I got an "NoSuchMethodError", because of a problem in the AvroSchema file: (java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;).
|
logger.error("Problem when creating the \"ParquetWriter\" object or when writing a record with it!", e);
|
||||||
// {"name": "date", "type" : ["null", {"type" : "long", "logicalType" : "timestamp-millis"}]},
|
// At some point, I got an "NoSuchMethodError", because of a problem in the AvroSchema file: (java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;).
|
||||||
|
// The error was with the schema: {"name": "date", "type" : ["null", {"type" : "long", "logicalType" : "timestamp-millis"}]},
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -60,8 +60,6 @@ hdfs.httpAuth=
|
||||||
hdfs.userName=
|
hdfs.userName=
|
||||||
hdfs.password=
|
hdfs.password=
|
||||||
|
|
||||||
schema.payload.filePath=src/main/resources/schemas/payload.avsc
|
|
||||||
schema.attempt.filePath=src/main/resources/schemas/attempt.avsc
|
|
||||||
output.parquetLocalDirectoryPath=parquetFiles/
|
output.parquetLocalDirectoryPath=parquetFiles/
|
||||||
hdfs.parquetRemoteBaseDirectoryPath=/tmp/parquet_uploads/
|
hdfs.parquetRemoteBaseDirectoryPath=/tmp/parquet_uploads/
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
{
|
{
|
||||||
"type": "record",
|
"type": "record",
|
||||||
"namespace": "UrlsController",
|
"namespace": "eu.openaire.urls_controller",
|
||||||
"name": "Attempt",
|
"name": "eu.openaire.urls_controller.Attempt",
|
||||||
"fields": [
|
"fields": [
|
||||||
{"name": "id", "type": "string"},
|
{"name": "id", "type": "string"},
|
||||||
{"name": "original_url", "type": "string"},
|
{"name": "original_url", "type": "string"},
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
{
|
{
|
||||||
"type": "record",
|
"type": "record",
|
||||||
"namespace": "UrlsController",
|
"namespace": "eu.openaire.urls_controller",
|
||||||
"name": "Payload",
|
"name": "eu.openaire.urls_controller.Payload",
|
||||||
"fields": [
|
"fields": [
|
||||||
{"name": "id", "type": "string"},
|
{"name": "id", "type": "string"},
|
||||||
{"name": "original_url", "type": "string"},
|
{"name": "original_url", "type": "string"},
|
||||||
|
|
Loading…
Reference in New Issue