diff --git a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java index 47837fe..b5bcbfd 100644 --- a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java +++ b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java @@ -88,7 +88,7 @@ public class ImpalaConnector { // Drop the "current_assignment" table. It is a temporary table which is created on-demand during execution. jdbcTemplate.execute("DROP TABLE IF EXISTS " + databaseName + ".current_assignment PURGE"); - jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` bigint) stored as parquet"); + jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, assignments_batch_counter bigint, `date` bigint) stored as parquet"); jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".assignment"); jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` bigint, status string, error_class string, error_message string) stored as parquet"); diff --git a/src/main/java/eu/openaire/urls_controller/controllers/TestController.java b/src/main/java/eu/openaire/urls_controller/controllers/TestController.java index 57f8efb..f30bda1 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/TestController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/TestController.java @@ -59,6 +59,7 @@ public class TestController { boolean isFirstRun = true; boolean assignmentsLimitReached = false; Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records. + long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet(); // Start loading urls. while ( true ) { @@ -78,7 +79,7 @@ public class TestController { } int randomNum = GenericUtils.getRandomNumber(1, 5); - assignments.add(new Assignment(pair.getKey(), pair.getValue(), new Datasource("ID_" + randomNum, "NAME_" + randomNum), workerId, timestamp)); + assignments.add(new Assignment(pair.getKey(), pair.getValue(), new Datasource("ID_" + randomNum, "NAME_" + randomNum), workerId, curAssignmentsBatchCounter, timestamp)); }// end pairs-for-loop if ( assignmentsLimitReached ) { @@ -91,7 +92,6 @@ public class TestController { if ( scanner != null ) // Check if the initial value is null. scanner.close(); - long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet(); logger.info("Sending batch_" + curAssignmentsBatchCounter + " with " + assignments.size() + " assignments (" + testFileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId); return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments)); } diff --git a/src/main/java/eu/openaire/urls_controller/models/Assignment.java b/src/main/java/eu/openaire/urls_controller/models/Assignment.java index b002f6d..116befd 100644 --- a/src/main/java/eu/openaire/urls_controller/models/Assignment.java +++ b/src/main/java/eu/openaire/urls_controller/models/Assignment.java @@ -13,6 +13,7 @@ import java.sql.Timestamp; "original_url", "datasource", "workerId", + "assignments_batch_counter", "timestamp" }) public class Assignment { @@ -29,17 +30,22 @@ public class Assignment { @JsonProperty("workerid") private String workerId; + @JsonProperty("assignments_batch_counter") + private long assignmentsBatchCounter; + @JsonProperty("timestamp") private Timestamp timestamp; public Assignment() {} - public Assignment(String id, String originalUrl, Datasource datasource, String workerId, Timestamp timestamp) { + + public Assignment(String id, String originalUrl, Datasource datasource, String workerId, long assignmentsBatchCounter, Timestamp timestamp) { this.id = id; this.originalUrl = originalUrl; this.datasource = datasource; this.workerId = workerId; + this.assignmentsBatchCounter = assignmentsBatchCounter; this.timestamp = timestamp; } @@ -76,6 +82,14 @@ public class Assignment { this.workerId = workerId; } + public long getAssignmentsBatchCounter() { + return assignmentsBatchCounter; + } + + public void setAssignmentsBatchCounter(long assignmentsBatchCounter) { + this.assignmentsBatchCounter = assignmentsBatchCounter; + } + public Timestamp getTimestamp() { return timestamp; } @@ -91,7 +105,9 @@ public class Assignment { ", originalUrl='" + originalUrl + '\'' + ", datasource=" + datasource + ", workerId='" + workerId + '\'' + + ", assignmentsBatchCounter=" + assignmentsBatchCounter + ", timestamp=" + timestamp + '}'; } + } diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java index 2e86409..eea0020 100644 --- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java @@ -145,6 +145,8 @@ public class UrlsServiceImpl implements UrlsService { List assignments = new ArrayList<>(assignmentsLimit); + long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet(); + ImpalaConnector.databaseLock.lock(); String errorMsg = createAndInitializeCurrentAssignmentsTable(findAssignmentsQuery); @@ -160,6 +162,7 @@ public class UrlsServiceImpl implements UrlsService { jdbcTemplate.query(getAssignmentsQuery, rs -> { Assignment assignment = new Assignment(); assignment.setWorkerId(workerId); + assignment.setAssignmentsBatchCounter(curAssignmentsBatchCounter); assignment.setTimestamp(timestamp); Datasource datasource = new Datasource(); try { // For each of the 4 columns returned. The indexing starts from 1 @@ -207,7 +210,7 @@ public class UrlsServiceImpl implements UrlsService { logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker."); // Write the Assignment details to the assignment-table. - String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pubid, url, '" + workerId + "', " + timestampMillis + String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pubid, url, '" + workerId + "', " + curAssignmentsBatchCounter + ", " + timestampMillis + "\nfrom " + ImpalaConnector.databaseName + ".current_assignment"; try { @@ -237,7 +240,6 @@ public class UrlsServiceImpl implements UrlsService { ImpalaConnector.databaseLock.unlock(); - long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet(); logger.info("Sending batch-assignments_" + curAssignmentsBatchCounter + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + "."); return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments)); } @@ -336,7 +338,7 @@ public class UrlsServiceImpl implements UrlsService { // This is a very rare case. At the moment, we just move on with table-merging. } catch (RuntimeException re) { ImpalaConnector.databaseLock.lock(); - String assignmentErrorMsg = deleteWorkerAssignments(curWorkerId); + String assignmentErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter); ImpalaConnector.databaseLock.unlock(); String errorMsg = re.getMessage(); if ( assignmentErrorMsg != null ) @@ -388,7 +390,7 @@ public class UrlsServiceImpl implements UrlsService { } } - mergeErrorMsg = deleteWorkerAssignments(curWorkerId); + mergeErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter); if ( mergeErrorMsg != null ) { ImpalaConnector.databaseLock.unlock(); postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg); @@ -449,12 +451,12 @@ public class UrlsServiceImpl implements UrlsService { } - private String deleteWorkerAssignments(String curWorkerId) + private String deleteAssignmentsBatch(long givenAssignmentsBatchCounter) { // This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data. // Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table. // We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the "payload_aggregated" table for previously handled tasks. - return fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId); + return fileUtils.mergeParquetFiles("assignment", " WHERE assignments_batch_counter != ", givenAssignmentsBatchCounter); } diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index 6f2abff..f1f8c55 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -81,7 +81,7 @@ public class FileUtils { * Renames the clone to the original's name. * Returns the errorMsg, if an error appears, otherwise is returns "null". */ - public String mergeParquetFiles(String tableName, String whereClause, String parameter) { + public String mergeParquetFiles(String tableName, String whereClause, Object parameter) { String errorMsg; if ( (tableName == null) || tableName.isEmpty() ) { errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!"; @@ -89,14 +89,14 @@ public class FileUtils { return errorMsg; // Return the error-msg to indicate that something went wrong and pass it down to the Worker. } - // Make sure the following are empty strings (in case another method call this one in the future with a null-value). - if ( whereClause == null ) - whereClause = ""; + // Make sure the following are empty strings. + whereClause = (whereClause != null) ? (whereClause + " ") : ""; if ( parameter == null ) parameter = ""; - else - parameter = " '" + parameter + "'"; // This will be a "string-check", thus the single-quotes. + else if ( parameter instanceof String ) + parameter = "'" + parameter + "'"; // This will be a "string-check", thus the single-quotes. + // Else it is a "long", it will be used as is. // Create a temp-table as a copy of the initial table. try {