- Fix a "@JsonProperty" annotation inside "Payload.java".

- Fix a "@Value" annotation inside "FileUtils.java".
- Add a new database and show its name along with the initial's name in the logs.
- Code cleanup and improvement.
This commit is contained in:
Lampros Smyrnaios 2022-04-05 00:01:44 +03:00
parent 5e4fad2479
commit a23c918a42
5 changed files with 30 additions and 26 deletions

View File

@ -19,15 +19,15 @@ public class ImpalaConnector {
@Autowired @Autowired
private JdbcTemplate jdbcTemplate; private JdbcTemplate jdbcTemplate;
private final String oldDatabaseName; private final String initialDatabaseName;
private final String databaseName; private final String databaseName;
public static final Lock databaseLock = new ReentrantLock(true); // This lock is locking the threads trying to execute queries in the database. public static final Lock databaseLock = new ReentrantLock(true); // This lock is locking the threads trying to execute queries in the database.
public ImpalaConnector(@Value("${services.pdfaggregation.controller.db.oldDatabaseName}") String oldDatabaseName, public ImpalaConnector(@Value("${services.pdfaggregation.controller.db.initialDatabaseName}") String initialDatabaseName,
@Value("${services.pdfaggregation.controller.db.databaseName}") String databaseName) { @Value("${services.pdfaggregation.controller.db.databaseName}") String databaseName) {
this.oldDatabaseName = oldDatabaseName; this.initialDatabaseName = initialDatabaseName;
this.databaseName = databaseName; this.databaseName = databaseName;
} }
@ -46,20 +46,20 @@ public class ImpalaConnector {
private void createDatabase() { private void createDatabase() {
logger.info("Going to create the database and the tables, if they do not exist. Also will fill some tables with data from OpenAIRE."); logger.info("Going to create (if not exist) the database \"" + databaseName + "\" and its tables. Also will fill some tables with data from database \"" + initialDatabaseName + "\".");
jdbcTemplate.execute("CREATE DATABASE IF NOT EXISTS " + databaseName); jdbcTemplate.execute("CREATE DATABASE IF NOT EXISTS " + databaseName);
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication stored as parquet as select * from " + oldDatabaseName + ".publication"); jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication stored as parquet as select * from " + initialDatabaseName + ".publication");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication"); jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_pids stored as parquet as select * from " + oldDatabaseName + ".publication_pids"); jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_pids stored as parquet as select * from " + initialDatabaseName + ".publication_pids");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication_pids"); jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication_pids");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_urls stored as parquet as select * from " + oldDatabaseName + ".publication_urls"); jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".publication_urls stored as parquet as select * from " + initialDatabaseName + ".publication_urls");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication_urls"); jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".publication_urls");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".datasource stored as parquet as select * from " + oldDatabaseName + ".datasource"); jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".datasource stored as parquet as select * from " + initialDatabaseName + ".datasource");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".datasource"); jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".datasource");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet"); jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet");

View File

@ -243,15 +243,16 @@ public class UrlController {
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg); return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorMsg);
} }
int sizeOUrlReports = 0;
List<UrlReport> urlReports = workerReport.getUrlReports(); List<UrlReport> urlReports = workerReport.getUrlReports();
if ( (urlReports == null) || urlReports.isEmpty() ) { if ( (urlReports == null) || ((sizeOUrlReports = urlReports.size()) == 0) ) {
String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + curWorkerId + "\" was empty (without any UrlReports)!"; String errorMsg = "The given \"WorkerReport\" from worker with ID \"" + curWorkerId + "\" was empty (without any UrlReports)!";
logger.error(errorMsg); logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg); return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMsg);
} }
long curReportAssignments = workerReport.getAssignmentRequestCounter(); long curReportAssignments = workerReport.getAssignmentRequestCounter();
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database."); logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + sizeOUrlReports + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location". // Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId); FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
@ -259,12 +260,12 @@ public class UrlController {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!"); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!");
} }
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) { else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments); logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignments);
// The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available and continue with writing the attempts and the payloads. // The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available and continue with writing the attempts and the payloads.
fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false); fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false);
} }
else
logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments); logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignments);
// Store the workerReport into the database. We use "PreparedStatements" to do insertions, for security and valid SQL syntax reasons. // Store the workerReport into the database. We use "PreparedStatements" to do insertions, for security and valid SQL syntax reasons.
final String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; final String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
@ -276,7 +277,7 @@ public class UrlController {
final AtomicInteger failedCount = new AtomicInteger(0); final AtomicInteger failedCount = new AtomicInteger(0);
// Split the "UrlReports" into some sub-lists // Split the "UrlReports" into some sub-lists
int sizeOfEachSubList = (int)(urlReports.size() * 0.2); int sizeOfEachSubList = (int)(sizeOUrlReports * 0.2);
List<List<UrlReport>> subLists = Lists.partition(urlReports, sizeOfEachSubList); List<List<UrlReport>> subLists = Lists.partition(urlReports, sizeOfEachSubList);
// The above will create some sub-lists, each one containing 20% of total amount. // The above will create some sub-lists, each one containing 20% of total amount.
@ -309,7 +310,8 @@ public class UrlController {
return null; return null;
}); });
for ( int i = 0; i < subLists.size(); ++i ) { int subListsSize = subLists.size();
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i; int finalI = i;
callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries. callableTasks.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
runInsertsToAttemptTable(subLists.get(finalI), curReportAssignments, insertIntoAttemptBaseQuery, attemptArgTypes, failedCount); runInsertsToAttemptTable(subLists.get(finalI), curReportAssignments, insertIntoAttemptBaseQuery, attemptArgTypes, failedCount);
@ -323,7 +325,7 @@ public class UrlController {
insertsExecutor.invokeAll(callableTasks); insertsExecutor.invokeAll(callableTasks);
} catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled. } catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled.
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage()); logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage());
// TODO - This is a very rare case, but what should be done..? // This is a very rare casa. At the moment, we just move on with table-merging.
} catch (Exception e) { } catch (Exception e) {
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
String errorMsg = "Unexpected error when inserting into the \"payload\" and \"attempt\" tables in parallel! " + e.getMessage(); String errorMsg = "Unexpected error when inserting into the \"payload\" and \"attempt\" tables in parallel! " + e.getMessage();
@ -332,7 +334,7 @@ public class UrlController {
} }
int failedQueries = failedCount.get(); int failedQueries = failedCount.get();
String failedQueriesMsg = failedQueries + " out of " + (urlReports.size() *2) + " failed to be processed!"; String failedQueriesMsg = failedQueries + " out of " + (sizeOUrlReports *2) + " failed to be processed!";
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables" + ((failedQueries > 0) ? (", although " + failedQueriesMsg) : ".") logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables" + ((failedQueries > 0) ? (", although " + failedQueriesMsg) : ".")
+ " Going to merge the parquet files for those tables."); + " Going to merge the parquet files for those tables.");
@ -368,7 +370,7 @@ public class UrlController {
String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + databaseName + ".current_assignment PURGE"; String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + databaseName + ".current_assignment PURGE";
try { try {
jdbcTemplate.execute(dropCurrentAssignmentsQuery); jdbcTemplate.execute(dropCurrentAssignmentsQuery);
return null; return null; // All good. No error-message.
} catch (Exception e) { } catch (Exception e) {
return ImpalaConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e); return ImpalaConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e);
} }

View File

@ -49,8 +49,8 @@ public class Payload {
@JsonProperty("provenance") @JsonProperty("provenance")
private String provenance; // "crawl:<PluginName>" private String provenance; // "crawl:<PluginName>"
@JsonProperty("provenance") @JsonProperty("datasourceId")
private String datasourceId; // "crawl:<PluginName>" private String datasourceId; // This is NOT inserted into the "payload"-table.
public Payload() {} public Payload() {}

View File

@ -87,8 +87,9 @@ public class FileUtils {
} }
@Value("services.pdfaggregation.controller.baseTargetLocation") @Value("${services.pdfaggregation.controller.baseTargetLocation}")
private String baseTargetLocation; private String baseTargetLocation;
public static DecimalFormat df = new DecimalFormat("0.00"); public static DecimalFormat df = new DecimalFormat("0.00");
private static final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:()]+\\.[\\w]{2,10})$"); private static final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:()]+\\.[\\w]{2,10})$");
@ -233,7 +234,7 @@ public class FileUtils {
} }
// Iterate over the files and upload them to S3. // Iterate over the files and upload them to S3.
int numUploadedFiles = 0; //int numUploadedFiles = 0;
for ( String fileName : fileNames ) for ( String fileName : fileNames )
{ {
String fileFullPath = targetDirectory + fileName; String fileFullPath = targetDirectory + fileName;
@ -260,7 +261,7 @@ public class FileUtils {
String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath); String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath);
setFullTextForMultiplePayloads(fileRelatedPayloads, s3Url); setFullTextForMultiplePayloads(fileRelatedPayloads, s3Url);
numUploadedFiles ++; //numUploadedFiles ++;
} catch (Exception e) { } catch (Exception e) {
logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore, exception: " + e.getMessage(), e); logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore, exception: " + e.getMessage(), e);
} }
@ -276,7 +277,7 @@ public class FileUtils {
} }
} // End of batches. } // End of batches.
updateUrlReportsToHaveNoFullTextFiles(urlReports, true); // Make sure all records without an s3Url have < null > file-data (some batches or uploads might have failed). updateUrlReportsToHaveNoFullTextFiles(urlReports, true); // Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed).
deleteDirectory(curAssignmentsBaseDir); deleteDirectory(curAssignmentsBaseDir);
if ( failedBatches == numOfBatches ) { if ( failedBatches == numOfBatches ) {

View File

@ -4,8 +4,9 @@ server.port = 1880
server.servlet.context-path=/api server.servlet.context-path=/api
#Service config #Service config
services.pdfaggregation.controller.db.oldDatabaseName = pdfaggregation_i services.pdfaggregation.controller.db.initialDatabaseName = pdfaggregation_i
services.pdfaggregation.controller.db.databaseName = pdfAggregationDatabase services.pdfaggregation.controller.db.databaseName = pdfaggregationdatabase_new_s3_names
services.pdfaggregation.controller.baseTargetLocation = /tmp/ services.pdfaggregation.controller.baseTargetLocation = /tmp/
services.pdfaggregation.controller.maxAttemptsPerRecord = 3 services.pdfaggregation.controller.maxAttemptsPerRecord = 3
services.pdfaggregation.controller.assignmentLimit = 10000 services.pdfaggregation.controller.assignmentLimit = 10000