- Upgrade the results-loading process: instead of making thousands of SQL-insert requests to Impala, we now write the results to parquet files, upload them to HDFS and then import the data into the Impala tables with just 2 requests. This results in a huge performance improvement.

One side effect of using the parquet files is that the timestamps are now stored as BIGINT numbers instead of "Timestamp" objects, but converting them back to such objects is easy, if we ever need to do it (see the sketch after this list).
- Code polishing.
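A minimal sketch of that timestamp conversion, assuming the BIGINT values are the epoch-milliseconds written by the new parquet code (the class and method names below are illustrative and not part of this commit):

import java.sql.Timestamp;

// Hypothetical helper (not part of this commit): convert a BIGINT "date" value read from the
// "assignment"/"attempt"/"payload" tables back into a java.sql.Timestamp, and the other way around.
public class TimestampConversionSketch {

    public static Timestamp toTimestamp(long dateMillis) {
        return new Timestamp(dateMillis); // e.g. for a value read with resultSet.getLong("date")
    }

    public static long toMillis(Timestamp timestamp) {
        return timestamp.getTime(); // this is what the new parquet-writing code stores in the "date" fields
    }
}

The tables themselves keep the plain BIGINT column; any conversion back to a Timestamp would happen on the Java side, as above.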
Lampros Smyrnaios 2022-11-10 17:18:21 +02:00
parent 6a03103b79
commit 6226e2298d
11 changed files with 908 additions and 158 deletions


@@ -49,17 +49,70 @@ dependencies {
// https://mvnrepository.com/artifact/com.cloudera.impala/jdbc
implementation("com.cloudera.impala:jdbc:2.5.31") {
exclude group: 'org.apache.hive', module: 'hive-exec'
exclude group: 'com.twitter', module: 'parquet-hadoop-bundle'
exclude group: 'org.apache.parquet', module: 'parquet-avro'
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'org.apache.derby', module: 'derby'
exclude group: 'org.eclipse.jetty.aggregate', module: 'jetty-all'
exclude group: 'ch.qos.log4j', module: 'log4j'
exclude group: 'ch.qos.log4j', module: 'apache-log4j-extras'
// Vulnerable dependencies:
exclude group: 'log4j', module: 'log4j'
exclude group: 'log4j', module: 'apache-log4j-extras'
exclude group: 'org.apache.ant', module: 'ant'
exclude group: 'org.apache.thrift', module: 'libthrift' // This is an older version (we add the updated one later).
exclude group: 'org.apache.hive', module: 'hive-metastore'
// Avoid excluding 'org.apache.hive:hive-service', as it is needed; unfortunately, even adding a newer version separately introduces other vulnerable dependencies.
}
// Add back updated versions of the needed dependencies.
implementation 'org.apache.thrift:libthrift:0.17.0'
// https://mvnrepository.com/artifact/org.apache.parquet/parquet-avro
implementation('org.apache.parquet:parquet-avro:1.12.3')
// https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common
implementation('org.apache.hadoop:hadoop-common:3.3.4') {
exclude group: 'org.apache.parquet', module: 'parquet-avro'
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-api'
exclude group: 'org.slf4j', module: 'slf4j-reload4j'
exclude group: 'ch.qos.reload4j', module: 'reload4j'
// Vulnerable dependencies:
exclude group: 'com.google.protobuf', module: 'protobuf-java'
exclude group: 'org.codehaus.jackson', module: 'jackson-core-asl'
exclude group: 'org.codehaus.jackson', module: 'jackson-mapper-asl'
//exclude group: 'commons-collections', module: 'commons-collections' // This dependency is required in order for the program to run without errors.
}
// https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core
implementation('org.apache.hadoop:hadoop-mapreduce-client-core:3.3.4') {
exclude group: 'org.apache.parquet', module: 'parquet-avro'
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-api'
exclude group: 'org.slf4j', module: 'slf4j-reload4j'
exclude group: 'ch.qos.reload4j', module: 'reload4j'
// Vulnerable dependencies:
exclude group: 'com.google.protobuf', module: 'protobuf-java'
exclude group: 'io.netty', module: 'netty'
}
// https://mvnrepository.com/artifact/org.json/json
implementation 'org.json:json:20220924'
testImplementation group: 'org.springframework.security', name: 'spring-security-test'
testImplementation "org.springframework.boot:spring-boot-starter-test"
}
configurations.implementation {
exclude group: 'com.twitter', module: 'parquet-hadoop-bundle'
}
// Set increased lower and upper limits for the java-execution.
tasks.withType(JavaExec) {
jvmArgs = ['-Xms512m', '-Xmx8g']


@@ -39,7 +39,7 @@ public class ImpalaConnector {
@PostConstruct
public void init() {
logger.info("Max available memory to the Controller: " + Runtime.getRuntime().maxMemory() + " bytes.");
logger.info("Max available memory to the Controller: " + (Runtime.getRuntime().maxMemory() / 1024) + " Mb.");
try {
boolean supportsBatchUpdates = jdbcTemplate.getDataSource().getConnection().getMetaData().supportsBatchUpdates();
logger.info("The database " + (supportsBatchUpdates ? "supports" : "does not support") + " \"BatchUpdates\"!");
@@ -79,13 +79,13 @@ public class ImpalaConnector {
// Drop the "current_assignment" table. It is a temporary table which is created on-demand during execution.
jdbcTemplate.execute("DROP TABLE IF EXISTS " + databaseName + ".current_assignment PURGE");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".assignment (id string, original_url string, workerid string, `date` bigint) stored as parquet");
jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".assignment");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` bigint, status string, error_class string, error_message string) stored as parquet");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".attempt");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload (id string, original_url string, actual_url string, `date` timestamp, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload (id string, original_url string, actual_url string, `date` bigint, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload");
logger.info("The " + (isTestEnvironment ? "test-" : "") + "database \"" + databaseName + "\" and its tables were created or validated.");


@@ -1,43 +0,0 @@
package eu.openaire.urls_controller.controllers;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* This controller will test the connectivity with the database and return statistics!
*/
@RestController
@RequestMapping("/impala")
public class ImpalaController {
private static final Logger logger = LoggerFactory.getLogger(ImpalaController.class);
@Autowired
private JdbcTemplate jdbcTemplate;
@GetMapping("get10PublicationIdsTest")
public ResponseEntity<?> get10PublicationIdsTest() {
String query = "SELECT id FROM " + ImpalaConnector.databaseName + ".publication LIMIT 10;";
try {
List<String> publications = jdbcTemplate.queryForList(query, String.class);
return new ResponseEntity<>(publications.toString(), HttpStatus.OK);
} catch (Exception e) {
String errorMsg = "Problem when executing \"getAssignmentsQuery\": " + query;
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
}
}


@@ -1,10 +1,12 @@
package eu.openaire.urls_controller.controllers;
import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Assignment;
import eu.openaire.urls_controller.models.Datasource;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.GenericUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import eu.openaire.urls_controller.util.TestFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -12,11 +14,13 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.File;
import java.sql.Timestamp;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
@@ -28,12 +32,19 @@ public class TestController {
private static final Logger logger = LoggerFactory.getLogger(TestController.class);
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private ParquetFileUtils parquetFileUtils;
@Autowired
private TestFileUtils testFileUtils;
@Value("${services.pdfaggregation.controller.assignmentLimit}")
private int assignmentLimit;
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
@@ -85,4 +96,36 @@ public class TestController {
return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
}
@GetMapping("get10PublicationIdsTest")
public ResponseEntity<?> get10PublicationIdsTest() {
String query = "SELECT id FROM " + ImpalaConnector.databaseName + ".publication LIMIT 10;";
try {
List<String> publications = jdbcTemplate.queryForList(query, String.class);
return new ResponseEntity<>(publications.toString(), HttpStatus.OK);
} catch (Exception e) {
String errorMsg = "Problem when executing \"getAssignmentsQuery\": " + query;
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
}
@GetMapping("parquet_upload")
public ResponseEntity<?> uploadParquetFile() {
logger.debug("We got a \"parquet_upload\" request.");
String parquetFileName = "1_attempts_0.parquet";
String parquetFileFullPath = System.getProperty("user.dir") + File.separator + parquetFileName;
String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(parquetFileFullPath, parquetFileName, parquetFileUtils.parquetHDFSDirectoryPathAttempts);
if ( errorMsg != null ) // The error-message is already logged by the Controller.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
return ResponseEntity.ok().build();
}
}


@@ -1,12 +1,14 @@
package eu.openaire.urls_controller.controllers;
import com.google.common.collect.Lists;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.models.Assignment;
import eu.openaire.urls_controller.models.Datasource;
import eu.openaire.urls_controller.models.UrlReport;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import org.apache.commons.io.FileDeleteStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -17,7 +19,14 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.sql.*;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
@@ -40,7 +49,10 @@ public class UrlController {
@Autowired
private FileUtils fileUtils;
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
@Autowired
private ParquetFileUtils parquetFileUtils;
public static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");
@Value("${services.pdfaggregation.controller.assignmentLimit}")
@@ -131,7 +143,8 @@ public class UrlController {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
long timestampMillis = System.currentTimeMillis();
Timestamp timestamp = new Timestamp(timestampMillis); // Store it here, in order to have the same for all current records.
try {
jdbcTemplate.query(getAssignmentsQuery, rs -> {
@@ -174,12 +187,9 @@ public class UrlController {
logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");
// Write the Assignment details to the assignment-table.
// The "timestamp" is generated from the Java-code, so it's in no way provided by a 3rd party.
String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n"
+ "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data";
String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', " + timestampMillis + "\n"
+ "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data";
try {
jdbcTemplate.execute(insertAssignmentsQuery);
@@ -266,87 +276,77 @@ public class UrlController {
else
logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments);
// Store the workerReport into the database. We use "PreparedStatements" to do insertions, for security and valid SQL syntax reasons.
final String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
final int[] payloadArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
final String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
final int[] attemptArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
final AtomicInteger failedQueriesCount = new AtomicInteger(0);
// Split the "UrlReports" into some sub-lists
int sizeOfEachSubList = (int)(sizeOUrlReports * 0.2);
List<List<UrlReport>> subLists = Lists.partition(urlReports, sizeOfEachSubList);
// The above will create some sub-lists, each one containing 20% of total amount.
List<Callable<Void>> callableTasks = new ArrayList<>(6);
// One thread will handle the inserts to the "payload" table and the other to the "attempt" table. This way there will be as little blocking as possible (on the part of Impala).
callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount.
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport);
continue;
}
String fileLocation = payload.getLocation();
if ( fileLocation == null ) // We want only the records with uploaded full-texts in the "payload" table.
continue;
try {
Long size = payload.getSize();
Object[] args = new Object[] {payload.getId(), payload.getOriginal_url(), payload.getActual_url(), payload.getTimestamp_acquired(),
payload.getMime_type(), (size != null) ? String.valueOf(size) : null, payload.getHash(), fileLocation, payload.getProvenance()};
jdbcTemplate.update(insertIntoPayloadBaseQuery, args, payloadArgTypes);
} catch (Exception e) {
logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": " + e.getMessage());
failedCount.incrementAndGet();
}
String currentParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + assignmentsBatchCounter.get() + File.separator;
java.nio.file.Path parquetDirPath = Paths.get(currentParquetPath);
if ( !Files.isDirectory(parquetDirPath) ) {
try {
Files.createDirectories(parquetDirPath);
} catch (Exception e) {
logger.error("", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage());
}
return null;
});
int subListsSize = subLists.size();
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
runInsertsToAttemptTable(subLists.get(finalI), curReportAssignments, insertIntoAttemptBaseQuery, attemptArgTypes, failedCount);
return null;
});
}
ImpalaConnector.databaseLock.lock();
logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_" + curReportAssignments);
List<Callable<Void>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOUrlReports, curReportAssignments, currentParquetPath, uploadFullTextsResponse);
String errorMsgAttempts = null, errorMsgPayloads = null;
try { // Invoke all the tasks and wait for them to finish before moving to the next batch.
insertsExecutor.invokeAll(callableTasks);
// Load all the parquet files of each type into its table.
ImpalaConnector.databaseLock.lock();
// Important note: It may be possible for a thread to acquire the lock and load its own and another thread's data into the table.
// So when the other thread acquires the lock, it will load ZERO data.
// That's ok, and we do not need to check whether the remote data exists, since this process happens in milliseconds (so only a few milliseconds are wasted when there is no data).
errorMsgAttempts = parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts, "attempt");
errorMsgPayloads = parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloads, "payload");
ImpalaConnector.databaseLock.unlock();
if ( (errorMsgAttempts == null) && (errorMsgPayloads == null) )
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_" + curReportAssignments);
else
logger.warn("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or/and the \"payload\" tables, for batch-assignments_" + curReportAssignments);
} catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled.
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());
// This is a very rare casa. At the moment, we just move on with table-merging.
// This is a very rare case. At the moment, we just move on with table-merging.
} catch (Exception e) {
ImpalaConnector.databaseLock.unlock();
String errorMsg = "Unexpected error when inserting into the \"payload\" and \"attempt\" tables in parallel! " + e.getMessage();
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
try {
logger.debug("Deleting directory: " + currentParquetPath);
FileDeleteStrategy.FORCE.delete(new File(currentParquetPath));
} catch (IOException e) {
logger.error("", e);
}
}
int failedQueries = failedQueriesCount.get();
String failedQueriesMsg = failedQueries + " out of " + (sizeOUrlReports *2) + " failed to be processed!";
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables" + ((failedQueries > 0) ? (", although " + failedQueriesMsg) : ".")
+ " Going to merge the parquet files for those tables.");
logger.debug("Going to merge the parquet files for the tables which were altered.");
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
String mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
String mergeErrorMsg;
ImpalaConnector.databaseLock.lock();
if ( errorMsgPayloads == null ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
}
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
if ( errorMsgAttempts == null ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
}
// This will delete the rows of the "assignment" table which refer to the curWorkerId. As we have non-kudu Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
@@ -357,14 +357,14 @@ public class UrlController {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
ImpalaConnector.databaseLock.unlock();
logger.debug("Finished merging the database tables.");
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful )
return ResponseEntity.status(HttpStatus.MULTI_STATUS).body("The full-text files failed to be acquired from the worker!\n" + failedQueriesMsg);
return ResponseEntity.status(HttpStatus.MULTI_STATUS).body("The full-text files failed to be acquired from the worker!");
else
return ResponseEntity.status(HttpStatus.OK).body(failedQueriesMsg);
return ResponseEntity.status(HttpStatus.OK).build();
}
@@ -379,33 +379,6 @@ public class UrlController {
}
private void runInsertsToAttemptTable(List<UrlReport> urlReports, long curReportAssignments, String insertIntoAttemptBaseQuery, int[] attemptArgTypes, AtomicInteger failedCount )
{
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport);
continue;
}
Error error = urlReport.getError();
if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop).
logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
error = new Error(null, null);
}
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
Object[] args = new Object[] {payload.getId(), payload.getOriginal_url(), payload.getTimestamp_acquired(),
urlReport.getStatus().toString(), String.valueOf(error.getType()), error.getMessage()};
jdbcTemplate.update(insertIntoAttemptBaseQuery, args, attemptArgTypes);
} catch (Exception e) {
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + e.getMessage());
failedCount.incrementAndGet();
}
}
}
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException {


@@ -0,0 +1,87 @@
package eu.openaire.urls_controller.models;
public class Attempt {
private String id;
private String original_url;
private long dateMillis;
private String status;
private String error_class;
private String error_message;
public Attempt(String id, String original_url, long dateMillis, String status, String error_class, String error_message) {
this.id = id;
this.original_url = original_url;
this.dateMillis = dateMillis;
this.status = status;
this.error_class = error_class;
this.error_message = error_message;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getOriginal_url() {
return original_url;
}
public void setOriginal_url(String original_url) {
this.original_url = original_url;
}
public long getDateMillis() {
return dateMillis;
}
public void setDateMillis(long dateMillis) {
this.dateMillis = dateMillis;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getError_class() {
return error_class;
}
public void setError_class(String error_class) {
this.error_class = error_class;
}
public String getError_message() {
return error_message;
}
public void setError_message(String error_message) {
this.error_message = error_message;
}
@Override
public String toString() {
return "Attempt{" +
"id='" + id + '\'' +
", original_url='" + original_url + '\'' +
", dateMillis=" + dateMillis +
", status='" + status + '\'' +
", error_class='" + error_class + '\'' +
", error_message='" + error_message + '\'' +
'}';
}
}


@@ -88,7 +88,7 @@ public class FileUtils {
public static DecimalFormat df = new DecimalFormat("0.00");
// The following regex might be useful in a future scenario. It extracts the "plain-filename" and "file-ID" and the "file-extension". TODO - It may even be merged with the above regex.
// The following regex might be useful in a future scenario. It extracts the "plain-filename" and "file-ID" and the "file-extension".
// Possible full-filenames are: "path1/path2/ID.pdf", "ID2.pdf", "path1/path2/ID(12).pdf", "ID2(25).pdf"
private static final Pattern FILENAME_ID_EXTENSION = Pattern.compile("(?:[^.()]+/)?((([^./()]+)[^./]*)(\\.[\\w]{2,10}))$");
@@ -149,7 +149,7 @@ public class FileUtils {
//logger.debug("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches ++;
numFullTextsFound ++;
continue; // Do not request the file from the worker, it's already uploaded. Move on.
continue; // Do not request the file from the worker, it's already uploaded. Move on. The "location" will be filled by the "setFullTextForMultiplePayloads()" method, later.
}
}
@@ -282,7 +282,7 @@ public class FileUtils {
// This file is related with some payloads, in a sense that these payloads have urls which lead to the same full-text url.
// These payloads might have different IDs and sourceUrls. But, in the end, the different sourceUrls give the same full-text.
// Below, we make sure we pick the database from the payload which has the same id as the full-text's name.
// Below, we make sure we pick the "datasource" from the payload, which has the same id as the full-text's name.
// If there are multiple payloads with the same id, which point to the same file, then we can take whatever datasource we want from those payloads.
// It is possible that payloads with same IDs, but different sourceUrls pointing to the same full-text, can be related with different datasources
// (especially for IDs of type: "doiboost_____::XXXXXXXXXXXXXXXXXXXXX")
@@ -369,7 +369,7 @@ public class FileUtils {
public String getMessageFromResponseBody(HttpURLConnection conn, boolean isError) {
StringBuilder msgStrB = new StringBuilder(500);
final StringBuilder msgStrB = new StringBuilder(500);
try ( BufferedReader br = new BufferedReader(new InputStreamReader((isError ? conn.getErrorStream() : conn.getInputStream()))) ) { // Try-with-resources
String inputLine;
while ( (inputLine = br.readLine()) != null )


@@ -0,0 +1,589 @@
package eu.openaire.urls_controller.util;
import com.google.common.collect.Lists;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.controllers.UrlController;
import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.UrlReport;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.OutputFile;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.Callable;
@Component
public class ParquetFileUtils {
private static final Logger logger = LoggerFactory.getLogger(ParquetFileUtils.class);
public final String parquetBaseLocalDirectoryPath;
// The "@Autowire" does not work in this case (gives an NPE when it's used).
private final FileUtils fileUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@Value("${hdfs.baseUrl}")
private String webHDFSBaseUrl;
private final String hdfsHttpAuthString;
private final String hdfsUserName;
@Value("${schema.payload.filePath}")
String payloadSchemaFilePath;
@Value("${schema.attempt.filePath}")
String attemptSchemaFilePath;
public Schema payloadsSchema;
public Schema attemptsSchema;
public final String parquetHDFSDirectoryPathAttempts;
public final String parquetHDFSDirectoryPathPayloads;
public ParquetFileUtils(@Value("${hdfs.baseUrl}") String webHDFSBaseUrl,
@Value("${hdfs.httpAuth}") String hdfsHttpAuthString, @Value("${hdfs.userName}") String hdfsUserName, @Value("${hdfs.password}") String hdfsPassword, @Value("${output.parquetLocalDirectoryPath}") String parquetBaseDirectoryPath,
@Value("${hdfs.parquetRemoteBaseDirectoryPath}") String hdfsParquetBaseDir, FileUtils fileUtils) throws IOException
{
if ( webHDFSBaseUrl.endsWith("/") ) // We don't want an ending slash in the url (as it causes problems when the file-path is added).
this.webHDFSBaseUrl = webHDFSBaseUrl.substring(0, (webHDFSBaseUrl.length() -1));
else
this.webHDFSBaseUrl = webHDFSBaseUrl;
if ( (hdfsUserName != null) && !hdfsUserName.isEmpty() )
this.hdfsUserName = hdfsUserName; // Set it to be used later as a url-param.
else
throw new RuntimeException("No hdfs-username was given!");
// If the "hdfsHttpAuthString" is given, then use it right away. In general this is better from "privacy perspective".
if ( (hdfsHttpAuthString != null) && !hdfsHttpAuthString.isEmpty() )
this.hdfsHttpAuthString = hdfsHttpAuthString;
else if ( (hdfsPassword != null) && !hdfsPassword.isEmpty() ) {
String authValue = (this.hdfsUserName + ":" + hdfsPassword);
this.hdfsHttpAuthString = ("Basic " + Base64.getEncoder().encodeToString(authValue.getBytes(StandardCharsets.UTF_8)));
}
else // At this point, all credential-checks have been made and there is no way the Controller can continue.
throw new RuntimeException("No hdfs-credentials were given, in any form!");
//logger.debug("\"hdfsHttpAuthString\": " + this.hdfsHttpAuthString); // DEBUG!
if ( ! parquetBaseDirectoryPath.endsWith(File.separator) )
this.parquetBaseLocalDirectoryPath = parquetBaseDirectoryPath + File.separator;
else
this.parquetBaseLocalDirectoryPath = parquetBaseDirectoryPath;
java.nio.file.Path parquetDirPath = Paths.get(this.parquetBaseLocalDirectoryPath);
if ( !Files.isDirectory(parquetDirPath) ) {
Files.createDirectories(parquetDirPath);
}
// Create the remote directories for uploading the parquet-files, if those directories do not exist.
// The limited-permissions user in use does not have permission to access other users' created directories, so we have to make sure it creates its own.
if ( !hdfsParquetBaseDir.endsWith("/") )
hdfsParquetBaseDir += "/";
this.parquetHDFSDirectoryPathPayloads = hdfsParquetBaseDir + "payloads/";
this.parquetHDFSDirectoryPathAttempts = hdfsParquetBaseDir + "attempts/";
this.fileUtils = fileUtils;
createRemoteParquetDirectories(hdfsParquetBaseDir);
}
public Schema parseSchema(String schemaFilePath) {
try {
return (new Schema.Parser()).parse(Files.newInputStream(Paths.get(schemaFilePath)));
} catch (Throwable e) {
logger.error("", e);
return null;
}
}
public List<Callable<Void>> getTasksForCreatingAndUploadingParquetFiles(List<UrlReport> urlReports, int sizeOUrlReports, long curReportAssignments, String currentParquetPath, FileUtils.UploadFullTextsResponse uploadFullTextsResponse)
{
// Split the "UrlReports" into some sub-lists.
List<List<UrlReport>> subLists;
// Pre-define the tasks to run.
List<Callable<Void>> callableTasks = new ArrayList<>(6);
// One thread will handle the inserts to the "payload" table and the others to the "attempt" table. This way there will be as little blocking as possible (on the part of Impala).
int sizeOfEachSubList = (int)(sizeOUrlReports * 0.2);
if ( sizeOfEachSubList > 10 )
{
subLists = Lists.partition(urlReports, sizeOfEachSubList); // This requires the "sizeOfEachSubList" to be greater than zero.
// The above will create some sub-lists, each one containing 20% of total amount.
int subListsSize = subLists.size();
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
createAndLoadParquetDataIntoAttemptTable(finalI, subLists.get(finalI), curReportAssignments, currentParquetPath);
return null;
});
}
} else {
// If the "urlReports" are so few, that we cannot get big "sublists", assign a single task to handle all the attempts.
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
createAndLoadParquetDataIntoAttemptTable(0, urlReports, curReportAssignments, currentParquetPath);
return null;
});
}
if ( uploadFullTextsResponse != FileUtils.UploadFullTextsResponse.unsuccessful )
{
if ( (payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) {
logger.error("Nothing can be done without the payloadsSchema. Exiting.."); // The cause is already logged inside the above method.
System.exit(88); // Exit the whole app, as it cannot add the results to the database!
}
callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount.
createAndLoadParquetDataIntoPayloadTable(urlReports, curReportAssignments, currentParquetPath);
return null;
});
}
if ( (attemptsSchema == null) // Parse the schema if it's not already parsed.
&& ((attemptsSchema = parseSchema(attemptSchemaFilePath)) == null ) ) {
logger.error("Nothing can be done without the attemptsSchema. Exiting.."); // The cause is already logged inside the above method.
System.exit(89); // Exit the whole app, as it cannot add the results to the database!
}
return callableTasks;
}
public void createAndLoadParquetDataIntoAttemptTable(int attemptsIncNum, List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath)
{
List<GenericData.Record> recordList = new ArrayList<>();
GenericData.Record record;
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport);
continue;
}
Error error = urlReport.getError();
if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop).
logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
error = new Error(null, null);
}
try {
record = new GenericData.Record(attemptsSchema);
record.put("id",payload.getId());
record.put("original_url", payload.getOriginal_url());
Timestamp timestamp = payload.getTimestamp_acquired();
record.put("date", (timestamp != null) ? timestamp.getTime() : System.currentTimeMillis());
record.put("status", urlReport.getStatus().toString());
record.put("error_class", String.valueOf(error.getType()));
record.put("error_message", error.getMessage());
recordList.add(record);
} catch (Exception e) {
logger.error("Failed to create an attempt record!", e);
}
}
int recordsSize = recordList.size();
if ( recordsSize == 0 ) {
logger.warn("No attempts are available to be inserted to the database!");
return;
}
String fileName = UrlController.assignmentsBatchCounter.get() + "_attempts_" + attemptsIncNum + ".parquet";
//logger.debug("Going to write " + recordsSize + " attempt-records to the parquet file: " + fileName); // DEBUG!
String fullFilePath = currentParquetPath + fileName;
if ( writeToParquet(recordList, attemptsSchema, fullFilePath) ) {
//logger.debug("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!
// Upload and insert the data to the "attempt" Impala table.
uploadParquetFileToHDFS(fullFilePath, fileName, parquetHDFSDirectoryPathAttempts);
// The possible error-message returned from the above method, is already logged by the Controller.
}
// Multiple parquet files will be created and filled at the same time, before they are uploaded and moved into the database table.
// Each thread serving a worker, will have its own parquet subdirectory, which will be deleted after inserting the parquet data to the database.
}
public void createAndLoadParquetDataIntoPayloadTable(List<UrlReport> urlReports, long curReportAssignments, String currentParquetPath)
{
List<GenericData.Record> recordList = new ArrayList<>();
GenericData.Record record;
for ( UrlReport urlReport : urlReports )
{
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments + "\n" + urlReport);
continue;
}
String fileLocation = payload.getLocation();
if ( fileLocation == null ) // We want only the records with uploaded full-texts in the "payload" table.
continue;
try {
record = new GenericData.Record(payloadsSchema);
record.put("id", payload.getId());
record.put("original_url", payload.getOriginal_url());
record.put("actual_url", payload.getActual_url());
Timestamp timestamp = payload.getTimestamp_acquired();
record.put("date", (timestamp != null) ? timestamp.getTime() : System.currentTimeMillis());
record.put("mimetype", payload.getMime_type());
Long size = payload.getSize();
record.put("size", ((size != null) ? String.valueOf(size) : null));
record.put("hash", payload.getHash());
record.put("location", fileLocation);
record.put("provenance", payload.getProvenance());
recordList.add(record);
} catch (Exception e) {
logger.error("Failed to create a payload record!", e);
}
}
int recordsSize = recordList.size();
if ( recordsSize == 0 ) {
logger.warn("No payloads are available to be inserted to the database!");
return;
}
String fileName = UrlController.assignmentsBatchCounter.get() + "_payloads.parquet";
//logger.debug("Going to write " + recordsSize + " payload-records to the parquet file: " + fileName); // DEBUG!
String fullFilePath = currentParquetPath + fileName;
if ( writeToParquet(recordList, payloadsSchema, fullFilePath) ) {
//logger.debug("Parquet file \"" + fileName + "\" was created and filled."); // DEBUG!
// Upload and insert the data to the "payload" Impala table.
uploadParquetFileToHDFS(fullFilePath, fileName, parquetHDFSDirectoryPathPayloads);
// The possible error-message returned from the above method, is already logged by the Controller.
}
// Multiple parquet files will be created and filled at the same time, before they are uploaded and moved into the database table.
// Each thread serving a worker, will have its own parquet subdirectory, which will be deleted after inserting the parquet data to the database.
}
public boolean writeToParquet(List<GenericData.Record> recordList, Schema schema, String fullFilePath)
{
OutputFile outputFile;
try {
outputFile = HadoopOutputFile.fromPath(new Path(fullFilePath), new Configuration());
//logger.debug("Created the parquet " + outputFile); // DEBUG!
} catch (Throwable e) {
logger.error("", e);
return false;
}
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputFile).withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY).build())
{
//logger.debug("Going to write to \"" + fullFilePath + "\" the record list: " + recordList); // DEBUG!
for ( GenericRecord record : recordList ) {
//logger.debug("Writing to \"" + fullFilePath + "\" the record: " + record); // DEBUG!
writer.write(record);
}
} catch (Throwable e) { // The simple "Exception" may not be thrown here, but an "Error" may be thrown. "Throwable" catches EVERYTHING!
logger.error("Problem when creating the \"ParquetWriter\" object or when writing a record with it!", e); // Last time, I got an "NoSuchMethodError", because of a problem in the AvroSchema file: (java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;).
// {"name": "date", "type" : ["null", {"type" : "long", "logicalType" : "timestamp-millis"}]},
return false;
}
//logger.debug("Done writing to \"" + fullFilePath + "\""); // DEBUG!
return true;
}
public String uploadParquetFileToHDFS(String parquetFileFullLocalPath, String parquetFileName, String parquetRemoteDirectory)
{
/*
Create the new file in path.
curl -u <USERNAME> -i -X PUT "https://iis-cdh5-test-gw.ocean.icm.edu.pl/webhdfs/v1/tmp/parquet_uploads/newFile.parquet?op=CREATE&overwrite=false&blocksize=1048576&permission=644&buffersize=10485760&user.name=<USERNAME>"
Gives a redirect to a DATANODE.
LOCATION: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE...
Create a new PUT request.
curl -u <USERNAME> -i -X PUT -T <PATH_TO_LOCAL_FILE> "https://<DATANODE>:<PORT>/webhdfs/v1/tmp/parquet_uploads/newFile.parquet?op=CREATE&overwrite=false&blocksize=1048576&permission=644&buffersize=10485760&user.name=<USERNAME>"
*/
try {
// Check if the parquet file exists locally.
File parquetFile = new File(parquetFileFullLocalPath);
if ( ! parquetFile.isFile() ) {
String errorMsg = "The parquet file \"" + parquetFileFullLocalPath + "\" does not exist!";
logger.error(errorMsg);
return errorMsg;
}
// https://iis-cdh5-test-gw.ocean.icm.edu.pl/webhdfs/v1/tmp/parquet_uploads/<parquetFileName.parquet>?op=CREATE&overwrite=false&blocksize=1048576&permission=644&buffersize=10485760&user.name=<USERNAME>
String parquetFileURI = webHDFSBaseUrl + parquetRemoteDirectory + parquetFileName;
URL url = new URL(parquetFileURI + "?op=CREATE&overwrite=true&blocksize=1048576&permission=777&buffersize=10485760&user.name=" + hdfsUserName);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("PUT");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode != 307 ) {
String errorMsg = "We expected a \"307 TEMPORARY_REDIRECT\" response, but got: \"" + statusCode + "\" instead!";
logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
return errorMsg;
}
// We get the location in which we were redirected to. It's a "datanode".
String location = conn.getHeaderField("Location");
if ( location == null ) {
String errorMsg = "We expected a \"location\" from the last \"307 TEMPORARY_REDIRECT\" response, but got nothing..";
logger.error(errorMsg + "\n\n" + conn.getHeaderFields());
return errorMsg;
}
//logger.debug("The target location is: " + location + "\nWill do a silent redirect to HTTPS."); // DEBUG!
location = StringUtils.replace(location, "http:", "https:", 1);
// Unless we handle this here, we either have to complicate the process by handling the https-redirect ourselves, or take a performance hit by adding one more step each time we want to upload a file.
conn = (HttpURLConnection) (new URL(location)).openConnection(); // This already contains the "user.name" parameter.
conn.setRequestMethod("PUT");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setRequestProperty("content-type", "application/octet-stream");
conn.setDoOutput(true);
conn.setInstanceFollowRedirects(true); // It is possible that the "location" redirects again.
conn.connect();
// Write the parquet file.
try ( InputStream inputS = Files.newInputStream(parquetFile.toPath()); OutputStream outS = conn.getOutputStream()) {
int readByte = -1; while ( (readByte = inputS.read()) != -1 ) outS.write(readByte);
}
statusCode = conn.getResponseCode();
if ( statusCode != 201 ) {
String errorMsg = "We expected a \"201 Created\" response, but got: \"" + statusCode + "\" instead, for url: " + location;
logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
return errorMsg;
}
location = conn.getHeaderField("Location"); // The response should have a "location" header as per the official documentation.
if ( location == null ) {
//logger.warn("We expected a \"location\" from the last \"201 Created\" response, but got nothing..\n" + conn.getHeaderFields() + "\n\nIt's fine."); // The "201-Created" response has no content inside.
// Do not return here. The absence of the location is not critical. We can still create it on our own.
location = parquetFileURI; // This location does not include the "user.name" parameter.
}
//logger.debug("The file \"" + parquetFileName + "\" was successfully uploaded. It's location is: " + location); // DEBUG!
// Important note!
// Using the "load data inpath" command, he files are MOVED, not copied! So we don't have to delete them afterwards.
// See: https://docs.cloudera.com/documentation/enterprise/latest/topics/impala_load_data.html
} catch (Throwable e) {
String errorMsg = "Error while importing parquet data into the Impala Database, through WebHDFS!\n" + e;
logger.error(errorMsg);
return errorMsg;
}
return null;
}
public String loadParquetDataIntoTable(String remoteParquetDataDirectory, String tableName)
{
// Import the data from the parquet file into the database's table.
String loadParquetInTableQuery = "load data inpath '" + remoteParquetDataDirectory + "' into table " + ImpalaConnector.databaseName + "." + tableName;
try {
jdbcTemplate.execute(loadParquetInTableQuery);
} catch (Exception e) {
return ImpalaConnector.handleQueryException("loadParquetInTableQuery", loadParquetInTableQuery, e); // It's already logged.
}
//logger.debug("The data from \"" + remoteParquetDataDirectory + "\" was loaded into the " + tableName + " table."); // DEBUG!
return null;
}
public boolean createRemoteParquetDirectories(String parquetBaseRemoteDirectory)
{
// Check if the remote directories exist. If so, then return and continue with execution.
// If the directories do not exist, then make them in two requests.
// The WebHDFS uses the "mkdirs" operation, which creates all the non-existent directories in the specified path.
// So with one request we will create the "parquet_uploads/" and the "parquet_uploads/payloads/" directories, and with the second request, the "parquet_uploads/attempts/" directory.
String mkdirsParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName;
logger.info("Going to check if the remote parquet directories exist.");
String listMainDirectoryUrl = webHDFSBaseUrl + parquetBaseRemoteDirectory + "?op=LISTSTATUS&user.name=" + hdfsUserName;
// Get the "fileStatuses" of the directories (main and subdirectories) in one request.
try {
URL url = new URL(listMainDirectoryUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
conn.connect();
int statusCode = conn.getResponseCode();
if ( (statusCode != 200) && (statusCode != 404) ) {
String errorMsg = "We expected a \"200 OK\" response, but got: \"" + statusCode + "\" instead, for url: " + listMainDirectoryUrl;
String retrievedMessage = fileUtils.getMessageFromResponseBody(conn, true);
logger.error(errorMsg + "\n\n" + ((retrievedMessage != null) ? retrievedMessage : ""));
return false;
}
if ( statusCode == 404 ) {
logger.info("The directory \"" + parquetBaseRemoteDirectory + "\" does not exist. We will create it, along with its sub-directories.");
createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkdirsParams);
createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkdirsParams);
return true;
}
else {
// Check the json-response, to see if all the subdirectories exist.
// Take the json-response.
String jsonResponse = fileUtils.getMessageFromResponseBody(conn, false);
if ( (jsonResponse == null) || jsonResponse.isEmpty() ) {
logger.error("The \"jsonResponse\" was not retrieved!");
return false;
}
// Else, if an error message exists inside the response, then we will be alerted when parsing the Json below.
//logger.debug("\"jsonResponse\":\n" + jsonResponse); // DEBUG!
boolean foundAttemptsDir = false;
boolean foundPayloadsDir = false;
try { // Parse the jsonData
JSONObject jObj = new JSONObject(jsonResponse); // Construct a JSONObject from the retrieved jsonData.
JSONObject entityObject = jObj.getJSONObject("FileStatuses");
//logger.debug("EntityObject: " + entityObject.toString()); // DEBUG!
JSONArray directoryStatuses = entityObject.getJSONArray("FileStatus");
//logger.debug("directoryStatuses: " + directoryStatuses.toString()); // DEBUG!
// In case no fileStatuses are found, the following for-loop will not run.
for ( Object fileStatusObject : directoryStatuses ) {
JSONObject fileStatusJsonObject = (JSONObject) fileStatusObject;
//logger.debug("FileStatusJsonObject: " + fileStatusJsonObject.toString()); // DEBUG!
String dirPath = fileStatusJsonObject.getString("pathSuffix");
//logger.debug("DirPath: " + dirPath); // DEBUG!
if ( dirPath.equals("attempts") )
foundAttemptsDir = true;
else if ( dirPath.equals("payloads") )
foundPayloadsDir = true;
}
} catch ( JSONException je ) { // In case any of the above "json-keys" was not found.
logger.warn("JSON Exception was thrown while trying to retrieve the subdirectories \"attempts\" and \"payloads\": " + je.getMessage() + "\n\nJsonResponse: " + jsonResponse);
return false;
}
// IMPORTANT NOTE: It is possible that the ".../payloads" dir exists, but the ".../attempts" dir does not (in case of a remote filesystem failure or an accidental deletion by some other user).
// Also, it is possible that the Controller was terminated before creating all the directories, or that in a previous execution the second "create"-request failed, resulting in the Controller's shut-down.
// For each missing subdirectories, run the mkdirs-request.
if ( !foundAttemptsDir ) {
logger.debug("The \"" + parquetHDFSDirectoryPathAttempts + "\" was not found! Going to create it.");
createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkdirsParams);
} else
logger.info("The \"" + parquetHDFSDirectoryPathAttempts + "\" was found.");
if ( !foundPayloadsDir ) {
logger.debug("The \"" + parquetHDFSDirectoryPathPayloads + "\" was not found! Going to create it.");
createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloads + mkdirsParams);
} else
logger.info("The \"" + parquetHDFSDirectoryPathPayloads + "\" was found.");
}
} catch (Exception e) {
logger.error("", e);
return false;
}
return true;
}
public boolean createHDFSDirectory(String createDirectoryUrl)
{
try {
URL url = new URL(createDirectoryUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("PUT");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.setInstanceFollowRedirects(false); // We will handle the redirection ourselves.
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode != 200 ) {
String errorMsg = "We expected a \"200 OK\" response, but got: \"" + statusCode + "\" instead, for url: " + createDirectoryUrl;
logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
return false;
}
} catch (Exception e) {
logger.error("", e);
return false;
}
return true;
}
// Use this if we decide to delete undeleted files (probably due to failed "load" attempts). For now, it's better to leave them there, in order to fix potential problems more easily.
public String deleteFileFromHDFS(String fileLocation, String parquetFileName) throws Exception
{
// Delete the file from the temporal storage on HDFS.
HttpURLConnection conn = (HttpURLConnection) (new URL(fileLocation + "?op=DELETE&user.name=" + hdfsUserName)).openConnection();
conn.setRequestMethod("DELETE");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode == 200 ) {
logger.debug("The file \"" + parquetFileName + "\" was successfully deleted."); // DEBUG!
} else {
String errorMsg = "The file \"" + parquetFileName + "\" could not be deleted! Response-code: " + statusCode;
logger.error(errorMsg);
return errorMsg;
}
return null;
}
}
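For reference, a hedged sketch of how the pieces above chain together for the "attempt" table, mirroring the flow used in this commit; the wrapper class and its parameters are illustrative, while the called methods and public fields are the ones defined in ParquetFileUtils above:

import eu.openaire.urls_controller.util.ParquetFileUtils;
import org.apache.avro.generic.GenericData;
import java.util.List;

// Hypothetical wrapper (not part of this commit): write the records to a local parquet file,
// upload it to HDFS through WebHDFS, and then issue the single "LOAD DATA INPATH" request
// which moves the uploaded file(s) into the Impala table.
public class AttemptLoadSketch {

    public static String writeUploadAndLoad(ParquetFileUtils parquetFileUtils, List<GenericData.Record> attemptRecords,
                                            String currentParquetPath, long curAssignmentsBatchCounter)
    {
        String fileName = curAssignmentsBatchCounter + "_attempts_0.parquet";
        String fullFilePath = currentParquetPath + fileName;
        if ( ! parquetFileUtils.writeToParquet(attemptRecords, parquetFileUtils.attemptsSchema, fullFilePath) )
            return "Failed to write the local parquet file: " + fullFilePath;
        String uploadErrorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullFilePath, fileName, parquetFileUtils.parquetHDFSDirectoryPathAttempts);
        if ( uploadErrorMsg != null )
            return uploadErrorMsg; // Already logged by "uploadParquetFileToHDFS()".
        return parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts, "attempt"); // "null" means success.
    }
}

The "payload" flow is identical, using "payloadsSchema" and "parquetHDFSDirectoryPathPayloads" instead.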


@@ -26,6 +26,7 @@ services.pdfaggregation.controller.s3.bucketName = xa
services.pdfaggregation.controller.s3.shouldEmptyBucket = false
services.pdfaggregation.controller.s3.shouldShowAllS3Buckets = true
# Database
spring.datasource.url=jdbc:impala://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/
spring.datasource.username=
@@ -43,9 +44,27 @@ spring.datasource.hikari.idleTimeout=600000
logging.level.root=INFO
logging.level.org.springframework.web=INFO
logging.level.org.springframework.security=WARN
logging.level.org.apache.hadoop.io.compress=WARN
logging.level.eu.openaire.urls_controller=DEBUG
spring.output.ansi.enabled=always
# Parquet settings
hdfs.baseUrl=https://iis-cdh5-test-gw.ocean.icm.edu.pl/webhdfs/v1
# HTTP-Authorization --> Authorization: Basic Base64Encode(username:password)
# Give the credentials either by providing the Http-Auth-string AND the username (used as a parameter in the WebHdfs-requests),
# or by providing the username AND the password, in order for the program to create the auth-String programmatically.
# The first approach is intended for more privacy, while the second for more ease. Either way, all three properties should stay uncommented, no matter which ones are used.
hdfs.httpAuth=
hdfs.userName=
hdfs.password=
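# A hedged example of the two accepted forms (the values below are hypothetical placeholders, not real credentials):
#   Form 1 (more private):    hdfs.httpAuth=Basic dXNlcjpwYXNz   plus   hdfs.userName=user
#   Form 2 (more convenient): hdfs.userName=user   plus   hdfs.password=pass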
schema.payload.filePath=src/main/resources/schemas/payload.avsc
schema.attempt.filePath=src/main/resources/schemas/attempt.avsc
output.parquetLocalDirectoryPath=parquetFiles/
hdfs.parquetRemoteBaseDirectoryPath=/tmp/parquet_uploads/
## MULTIPART (MultipartProperties)
# Enable multipart uploads


@@ -0,0 +1,13 @@
{
"type": "record",
"namespace": "UrlsController",
"name": "Attempt",
"fields": [
{"name": "id", "type": "string"},
{"name": "original_url", "type": "string"},
{"name": "date", "type" : {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "status", "type": "string"},
{"name": "error_class", "type": ["null","string"]},
{"name": "error_message", "type": ["null","string"]}
]
}


@@ -0,0 +1,16 @@
{
"type": "record",
"namespace": "UrlsController",
"name": "Payload",
"fields": [
{"name": "id", "type": "string"},
{"name": "original_url", "type": "string"},
{"name": "actual_url", "type": "string"}, // This should not be null, since only the "found" pdf-publications are processed in parquet.
{"name": "date", "type" : {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "mimetype", "type": "string"},
{"name": "size", "type": ["null","string"]},
{"name": "hash", "type": "string"},
{"name": "location", "type": "string"}, // This is not null, a check is added before processing any record.
{"name": "provenance", "type": "string"}
]
}