diff --git a/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java b/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java index 360042e..c405640 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java @@ -11,7 +11,6 @@ import org.springframework.web.bind.annotation.RestController; import java.sql.Connection; import java.sql.ResultSet; -import java.sql.SQLException; import java.util.ArrayList; import java.util.List; @@ -49,15 +48,10 @@ public class ImpalaController { } catch (Exception e) { String errorMsg = "Problem when executing \"getAssignmentsQuery\": " + query; - logger.error(errorMsg); - e.printStackTrace(); + logger.error(errorMsg, e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } finally { - try { - con.close(); - } catch (SQLException sqle) { - logger.error("Could not close the connection with the Impala-database.\n" + sqle); - } + ImpalaConnector.closeConnection(con); } } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java index 3a6b0ab..8484a9a 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java @@ -16,7 +16,6 @@ import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import java.sql.*; -import java.sql.Date; import java.util.*; import java.util.concurrent.atomic.AtomicLong; @@ -47,8 +46,7 @@ public class UrlController { assignmentsLimit = ControllerConstants.ASSIGNMENTS_LIMIT; } - - String getAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" + + String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" + "from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" + "select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" + "from " + ImpalaConnector.databaseName + ".publication p\n" + @@ -59,25 +57,16 @@ public class UrlController { " select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" + " union all\n" + " select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" + - "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= ? and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry')\n" + - ") as non_distinct_results\n" + + "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + ControllerConstants.MAX_ATTEMPTS_PER_RECORD + " and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry')\n" + + "limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" + "order by coalesce(attempt_count, 0), reverse(pubid), url\n" + - "limit ?) as getAssignmentsQuery"; + "limit " + assignmentsLimit + ") as findAssignmentsQuery"; // The "order by" in the end makes sure the older attempted records will be re-attempted after a long time. - // TODO - If we add more limits it could be faster.. Inner queries could have a limit of e.g. < assignmentsLimit ^ 2 > - // The LIMIT of < assignmentsLimit > should be kept in the end, as we want 10_000 of distinct results. - - // This is just for tests without the attempts, payloads and the assignments - /*String getAssignmentsQuery = "select * from (select distinct pubid, url, datasourceid, datasourcetype from (\n" + - "select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype\n" + - "from " + ImpalaConnector.databaseName + ".publication p\n" + - "join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" + - "join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" + - "where d.allow_harvest=true " + - "order by reverse(p.id), pu.url) as distinct_results\n" + - "limit ? ) as getAssignmentsQuery";*/ + String createAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery; + String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment"; + String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment"; List assignments = new ArrayList<>(assignmentsLimit); @@ -89,30 +78,51 @@ public class UrlController { return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!"); } + PreparedStatement createCurrentAssignmentsPreparedStatement = null; + try { + createCurrentAssignmentsPreparedStatement = con.prepareStatement(createAssignmentsQuery); + createCurrentAssignmentsPreparedStatement.execute(); + } catch (SQLException sqle) { + ImpalaConnector.databaseLock.unlock(); + String errorMsg = ImpalaConnector.handlePreparedStatementException("createAssignmentsQuery", createAssignmentsQuery, "createCurrentAssignmentsPreparedStatement", createCurrentAssignmentsPreparedStatement, con, sqle); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } finally { + try { + if ( createCurrentAssignmentsPreparedStatement != null ) + createCurrentAssignmentsPreparedStatement.close(); + } catch (SQLException sqle2) { + logger.error("Failed to close the \"createCurrentAssignmentsPreparedStatement\"!\n" + sqle2.getMessage()); + } + } + + PreparedStatement computeCurrentAssignmentsStatsPreparedStatement = null; + try { + computeCurrentAssignmentsStatsPreparedStatement = con.prepareStatement(computeCurrentAssignmentsStatsQuery); + computeCurrentAssignmentsStatsPreparedStatement.execute(); + } catch (SQLException sqle) { + ImpalaConnector.databaseLock.unlock(); + String errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, "computeCurrentAssignmentsStatsPreparedStatement", computeCurrentAssignmentsStatsPreparedStatement, con, sqle); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } finally { + try { + if ( computeCurrentAssignmentsStatsPreparedStatement != null ) + computeCurrentAssignmentsStatsPreparedStatement.close(); + } catch (SQLException sqle2) { + logger.error("Failed to close the \"computeCurrentAssignmentsStatsPreparedStatement\"!\n" + sqle2.getMessage()); + } + } + PreparedStatement getAssignmentsPreparedStatement = null; try { getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery); - getAssignmentsPreparedStatement.setInt(1, ControllerConstants.MAX_ATTEMPTS_PER_RECORD); - getAssignmentsPreparedStatement.setInt(2, assignmentsLimit); } catch (SQLException sqle) { ImpalaConnector.databaseLock.unlock(); - String errorMsg = "Problem when creating the prepared statement for \"getAssignmentsQuery\"!\n"; - logger.error(errorMsg + sqle.getMessage()); - try { - if ( getAssignmentsPreparedStatement != null ) - getAssignmentsPreparedStatement.close(); - } catch (SQLException sqle2) { - logger.error("Could not close the \"getAssignmentsPreparedStatement\".\n" + sqle2.getMessage()); - } - try { - con.close(); - } catch (SQLException sqle2) { - logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage()); - } + String errorMsg = ImpalaConnector.handlePreparedStatementException("getAssignmentsQuery", getAssignmentsQuery, "getAssignmentsPreparedStatement", getAssignmentsPreparedStatement, con, sqle); + // The "getAssignmentsPreparedStatement" will always be null here, so we do not close it. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } - Date date = new Date(System.currentTimeMillis()); // Store it here, in order to have the same for all current records. + Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records. try ( ResultSet resultSet = getAssignmentsPreparedStatement.executeQuery() ) { // Unfortunately, we cannot use the following as the used version of the Impala-driver does not support it. @@ -128,7 +138,7 @@ public class UrlController { // The following few lines, cannot be outside the "while" loop, since the same record is returned, despite that we update the inner-values. Assignment assignment = new Assignment(); assignment.setWorkerId(workerId); - assignment.setDate(date); + assignment.setTimestamp(timestamp); Datasource datasource = new Datasource(); try { // For each of the 4 columns returned. The indexing starts from 1 assignment.setId(resultSet.getString(1)); @@ -159,7 +169,7 @@ public class UrlController { int assignmentsSize = assignments.size(); if ( assignmentsSize == 0 ) { ImpalaConnector.databaseLock.unlock(); - String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId; + String errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId; logger.error(errorMsg); ImpalaConnector.closeConnection(con); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); @@ -167,68 +177,45 @@ public class UrlController { logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker."); - // The following is a test of inserting multiple rows with a singme insert-query. If applied with a preparedStatement, then the JDBC fails with "OutOfMemory"-Error. - /*String testInsert = "INSERT INTO assignment (id,original_url,workerid,`date`) VALUES ( 'doiboost____::4e8b1f12ac3ba5a9d8fbff9872000000', 'http://dx.doi.org/10.17267/2596-3368dentistry.v6i2.586', 'worker_1', CAST('2021-10-01' AS TIMESTAMP) ) , ( 'doiboost____::4e8b1f12ac3ba5a9d8fbff9872000000', 'https://academic.microsoft.com/#/detail/2887540302', 'worker_1', CAST('2021-10-01' AS TIMESTAMP) );"; - try (Statement insertStatement = con.createStatement()) { - insertStatement.execute(testInsert); + + // Write the Assignment details to the assignment-table. + + String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n" + + "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data"; + + PreparedStatement insertAssignmentsPreparedStatement = null; + try { + insertAssignmentsPreparedStatement = con.prepareStatement(insertAssignmentsQuery); + insertAssignmentsPreparedStatement.execute(); } catch (SQLException sqle) { ImpalaConnector.databaseLock.unlock(); - String mergeErrorMsg = "Problem when executing the testInsert statement for \"" + testInsert + "\""; - logger.error(mergeErrorMsg + sqle.getMessage()); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg); - }*/ - - // Write the Assignment details to the database and then send it to the worker. - String insertIntoAssignmentBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".assignment (id, original_url, workerid, date) VALUES (?, ?, ?, ?)"; - - PreparedStatement preparedInsertAssignmentStatement; - try { // We use a "PreparedStatement" to do insertions, for security and performance reasons. - preparedInsertAssignmentStatement = con.prepareStatement(insertIntoAssignmentBaseQuery); - } catch (SQLException sqle) { - ImpalaConnector.databaseLock.unlock(); - String errorMsg = "Problem when creating the prepared statement for \"insertIntoAssignmentBaseQuery\"!\n"; - logger.error(errorMsg + sqle.getMessage()); + String errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, "insertAssignmentsPreparedStatement", insertAssignmentsPreparedStatement, con, sqle); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } finally { try { - con.close(); + if ( insertAssignmentsPreparedStatement != null ) + insertAssignmentsPreparedStatement.close(); } catch (SQLException sqle2) { - logger.error("Could not close the connection with the Impala-database.\n" + sqle2.getMessage()); + logger.error("Failed to close the \"insertAssignmentsPreparedStatement\"!\n" + sqle2.getMessage()); } - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } - // Before, we wanted to execute the getAssignmentQuery and take the assignments immediately, but now it's more efficient to commit all the inserts in the end. + String dropCurrentAssignmentsQuery = "DROP TABLE " + ImpalaConnector.databaseName + ".current_assignment PURGE"; + PreparedStatement dropCurrentAssignmentsPreparedStatement = null; try { - con.setAutoCommit(false); - } catch (SQLException sqle) { // There is a database-error. The latter actions will probably fail as well. - ImpalaConnector.databaseLock.unlock(); - String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!"; - logger.error(errorMsg + "\n" + sqle.getMessage()); - closePreparedStatements(preparedInsertAssignmentStatement, null, con); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - } - - String tempFullQueryString = null; - for ( Assignment assignment : assignments ) { - try { - preparedInsertAssignmentStatement.setString(1, assignment.getId()); - preparedInsertAssignmentStatement.setString(2, assignment.getOriginalUrl()); - preparedInsertAssignmentStatement.setString(3, workerId); - preparedInsertAssignmentStatement.setDate(4, date); - tempFullQueryString = getAssignmentsPreparedStatement.toString(); - preparedInsertAssignmentStatement.executeUpdate(); - } catch (SQLException sqle) { - logger.error("Problem when executing the \"insertIntoAssignmentQuery\":\n" + tempFullQueryString + "\n" + sqle.getMessage() + "\n\n"); - } - }//end for-loop - - try { - con.commit(); // Send all the insert-queries to the database. + dropCurrentAssignmentsPreparedStatement = con.prepareStatement(dropCurrentAssignmentsQuery); + dropCurrentAssignmentsPreparedStatement.execute(); } catch (SQLException sqle) { ImpalaConnector.databaseLock.unlock(); - String errorMsg = "Problem when committing changes to the database!"; - logger.error(errorMsg + "\n" + sqle.getMessage()); - closePreparedStatements(preparedInsertAssignmentStatement, null, con); + String errorMsg = ImpalaConnector.handlePreparedStatementException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, "dropCurrentAssignmentsPreparedStatement", dropCurrentAssignmentsPreparedStatement, con, sqle); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); + } finally { + try { + if ( dropCurrentAssignmentsPreparedStatement != null ) + dropCurrentAssignmentsPreparedStatement.close(); + } catch (SQLException sqle2) { + logger.error("Failed to close the \"dropCurrentAssignmentsPreparedStatement\"!\n" + sqle2.getMessage()); + } } logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table."); @@ -236,21 +223,12 @@ public class UrlController { String mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con); if ( mergeErrorMsg != null ) { ImpalaConnector.databaseLock.unlock(); - closePreparedStatements(preparedInsertAssignmentStatement, null, con); + ImpalaConnector.closeConnection(con); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg); } - try { - con.commit(); // Apply the merge. - con.setAutoCommit(true); // Restore the "auto-commit" value for this connection of the pool. - } catch (SQLException sqle) { - String errorMsg = "Problem when committing changes to the database!"; - logger.error(errorMsg , sqle);//+ "\n" + sqle.getMessage()); - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - } finally { - ImpalaConnector.databaseLock.unlock(); - closePreparedStatements(preparedInsertAssignmentStatement, null, con); - } + ImpalaConnector.closeConnection(con); + ImpalaConnector.databaseLock.unlock(); logger.info("Sending batch-assignments_" + assignmentsBatchCounter.incrementAndGet() + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + "."); return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(assignmentsBatchCounter.get(), assignments)); @@ -331,7 +309,7 @@ public class UrlController { preparedInsertPayloadStatement.setString(1, payload.getId()); preparedInsertPayloadStatement.setString(2, payload.getOriginal_url()); preparedInsertPayloadStatement.setString(3, payload.getActual_url()); - preparedInsertPayloadStatement.setDate(4, payload.getDate_acquired()); + preparedInsertPayloadStatement.setTimestamp(4, payload.getTimestamp_acquired()); preparedInsertPayloadStatement.setString(5, payload.getMime_type()); // The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers. @@ -359,7 +337,7 @@ public class UrlController { try { // We use a "PreparedStatement" to do insertions, for security reasons. preparedInsertAttemptStatement.setString(1, payload.getId()); preparedInsertAttemptStatement.setString(2, payload.getOriginal_url()); - preparedInsertAttemptStatement.setDate(3, payload.getDate_acquired()); + preparedInsertAttemptStatement.setTimestamp(3, payload.getTimestamp_acquired()); preparedInsertAttemptStatement.setString(4, urlReport.getStatus().toString()); preparedInsertAttemptStatement.setString(5, String.valueOf(error.getType())); // This covers the case of "null". preparedInsertAttemptStatement.setString(6, error.getMessage()); @@ -476,7 +454,7 @@ public class UrlController { HashMultimap loadedIdUrlPairs; boolean isFirstRun = true; boolean assignmentsLimitReached = false; - Date date = new Date(System.currentTimeMillis()); // Store it here, in order to have the same for all current records. + Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records. // Start loading urls. while ( true ) { @@ -497,7 +475,7 @@ public class UrlController { } int randomNum = GenericUtils.getRandomNumber(1, 5); - assignments.add(new Assignment(pair.getKey(), pair.getValue(), new Datasource("ID_" + randomNum, "NAME_" + randomNum), workerId, date)); + assignments.add(new Assignment(pair.getKey(), pair.getValue(), new Datasource("ID_" + randomNum, "NAME_" + randomNum), workerId, timestamp)); }// end pairs-for-loop if ( assignmentsLimitReached ) { diff --git a/src/main/java/eu/openaire/urls_controller/models/Assignment.java b/src/main/java/eu/openaire/urls_controller/models/Assignment.java index 46d6d59..b002f6d 100644 --- a/src/main/java/eu/openaire/urls_controller/models/Assignment.java +++ b/src/main/java/eu/openaire/urls_controller/models/Assignment.java @@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyOrder; -import java.util.Date; +import java.sql.Timestamp; @JsonInclude(JsonInclude.Include.NON_NULL) @@ -13,7 +13,7 @@ import java.util.Date; "original_url", "datasource", "workerId", - "date" + "timestamp" }) public class Assignment { @@ -29,19 +29,18 @@ public class Assignment { @JsonProperty("workerid") private String workerId; - @JsonProperty("date") - private Date date; + @JsonProperty("timestamp") + private Timestamp timestamp; public Assignment() {} - - public Assignment(String id, String originalUrl, Datasource datasource, String workerId, Date date) { + public Assignment(String id, String originalUrl, Datasource datasource, String workerId, Timestamp timestamp) { this.id = id; this.originalUrl = originalUrl; this.datasource = datasource; this.workerId = workerId; - this.date = date; + this.timestamp = timestamp; } @@ -77,12 +76,12 @@ public class Assignment { this.workerId = workerId; } - public Date getDate() { - return date; + public Timestamp getTimestamp() { + return timestamp; } - public void setDate(Date date) { - this.date = date; + public void setTimestamp(Timestamp timestamp) { + this.timestamp = timestamp; } @Override @@ -92,7 +91,7 @@ public class Assignment { ", originalUrl='" + originalUrl + '\'' + ", datasource=" + datasource + ", workerId='" + workerId + '\'' + - ", date=" + date + + ", timestamp=" + timestamp + '}'; } } diff --git a/src/main/java/eu/openaire/urls_controller/models/Payload.java b/src/main/java/eu/openaire/urls_controller/models/Payload.java index ca887ad..f87aa84 100644 --- a/src/main/java/eu/openaire/urls_controller/models/Payload.java +++ b/src/main/java/eu/openaire/urls_controller/models/Payload.java @@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyOrder; -import java.sql.Date; +import java.sql.Timestamp; @JsonInclude(JsonInclude.Include.NON_NULL) @@ -12,7 +12,7 @@ import java.sql.Date; "id", "original_url", "actual_url", - "date", + "timestamp_acquired", "mime_type", "size", "hash", @@ -30,8 +30,8 @@ public class Payload { @JsonProperty("actual_url") private String actual_url; - @JsonProperty("date") - private Date date_acquired; + @JsonProperty("timestamp_acquired") + private Timestamp timestamp_acquired; @JsonProperty("mime_type") private String mime_type; @@ -50,11 +50,11 @@ public class Payload { public Payload() {} - public Payload(String id, String original_url, String actual_url, Date date_acquired, String mime_type, Long size, String hash, String location, String provenance) { + public Payload(String id, String original_url, String actual_url, Timestamp timestamp_acquired, String mime_type, Long size, String hash, String location, String provenance) { this.id = id; this.original_url = original_url; this.actual_url = actual_url; - this.date_acquired = date_acquired; + this.timestamp_acquired = timestamp_acquired; this.mime_type = mime_type; this.size = size; this.hash = hash; @@ -86,12 +86,12 @@ public class Payload { this.actual_url = actual_url; } - public Date getDate_acquired() { - return date_acquired; + public Timestamp getTimestamp_acquired() { + return timestamp_acquired; } - public void setDate_acquired(Date date_acquired) { - this.date_acquired = date_acquired; + public void setTimestamp_acquired(Timestamp timestamp_acquired) { + this.timestamp_acquired = timestamp_acquired; } public String getMime_type() { @@ -140,7 +140,7 @@ public class Payload { "id='" + id + '\'' + ", original_url='" + original_url + '\'' + ", actual_url='" + actual_url + '\'' + - ", date_acquired='" + date_acquired + '\'' + + ", timestamp_acquired='" + timestamp_acquired + '\'' + ", mime_type='" + mime_type + '\'' + ", size='" + size + '\'' + ", md5='" + hash + '\'' +