@ -16,7 +16,6 @@ import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest ;
import java.sql.* ;
import java.sql.Date ;
import java.util.* ;
import java.util.concurrent.atomic.AtomicLong ;
@ -47,8 +46,7 @@ public class UrlController {
assignmentsLimit = ControllerConstants . ASSIGNMENTS_LIMIT ;
}
String getAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
"from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" +
"select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
"from " + ImpalaConnector . databaseName + ".publication p\n" +
@ -59,25 +57,16 @@ public class UrlController {
" select a.id, a.original_url from " + ImpalaConnector . databaseName + ".assignment a\n" +
" union all\n" +
" select pl.id, pl.original_url from " + ImpalaConnector . databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" +
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= ? and not exists (select 1 from " + ImpalaConnector . databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry')\n" +
" ) as non_distinct_results\n" +
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + ControllerConstants . MAX_ATTEMPTS_PER_RECORD + " and not exists (select 1 from " + ImpalaConnector . databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry')\n" +
" limit " + ( assignmentsLimit * 10 ) + " ) as non_distinct_results\n" +
"order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
"limit ?) as get AssignmentsQuery";
"limit " + assignmentsLimit + ") as find AssignmentsQuery";
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
// TODO - If we add more limits it could be faster.. Inner queries could have a limit of e.g. < assignmentsLimit ^ 2 >
// The LIMIT of < assignmentsLimit > should be kept in the end, as we want 10_000 of distinct results.
// This is just for tests without the attempts, payloads and the assignments
/ * String getAssignmentsQuery = "select * from (select distinct pubid, url, datasourceid, datasourcetype from (\n" +
"select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype\n" +
"from " + ImpalaConnector . databaseName + ".publication p\n" +
"join " + ImpalaConnector . databaseName + ".publication_urls pu on pu.id=p.id\n" +
"join " + ImpalaConnector . databaseName + ".datasource d on d.id=p.datasourceid\n" +
"where d.allow_harvest=true " +
"order by reverse(p.id), pu.url) as distinct_results\n" +
"limit ? ) as getAssignmentsQuery" ; * /
String createAssignmentsQuery = "create table " + ImpalaConnector . databaseName + ".current_assignment as \n" + findAssignmentsQuery ;
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector . databaseName + ".current_assignment" ;
String getAssignmentsQuery = "select * from " + ImpalaConnector . databaseName + ".current_assignment" ;
List < Assignment > assignments = new ArrayList < > ( assignmentsLimit ) ;
@ -89,30 +78,51 @@ public class UrlController {
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( "Problem when connecting with the Impala-database!" ) ;
}
PreparedStatement ge tAssignmentsPreparedStatement = null ;
PreparedStatement createCurren tAssignmentsPreparedStatement = null ;
try {
getAssignmentsPreparedStatement = con . prepareStatement ( getAssignmentsQuery ) ;
getAssignmentsPreparedStatement . setInt ( 1 , ControllerConstants . MAX_ATTEMPTS_PER_RECORD ) ;
getAssignmentsPreparedStatement . setInt ( 2 , assignmentsLimit ) ;
createCurrentAssignmentsPreparedStatement = con . prepareStatement ( createAssignmentsQuery ) ;
createCurrentAssignmentsPreparedStatement . execute ( ) ;
} catch ( SQLException sqle ) {
ImpalaConnector . databaseLock . unlock ( ) ;
String errorMsg = "Problem when creating the prepared statement for \"getAssignmentsQuery\"!\n" ;
logger . error ( errorMsg + sqle . getMessage ( ) ) ;
String errorMsg = ImpalaConnector . handlePreparedStatementException ( "createAssignmentsQuery" , createAssignmentsQuery , "createCurrentAssignmentsPreparedStatement" , createCurrentAssignmentsPreparedStatement , con , sqle ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
} finally {
try {
if ( ge tAssignmentsPreparedStatement ! = null )
ge tAssignmentsPreparedStatement. close ( ) ;
if ( createCurren tAssignmentsPreparedStatement ! = null )
createCurren tAssignmentsPreparedStatement. close ( ) ;
} catch ( SQLException sqle2 ) {
logger . error ( " Could not close the \"getAssignmentsPreparedStatement\". \n" + sqle2 . getMessage ( ) ) ;
logger . error ( " Failed to close the \"createCurrentAssignmentsPreparedStatement\"! \n" + sqle2 . getMessage ( ) ) ;
}
}
PreparedStatement computeCurrentAssignmentsStatsPreparedStatement = null ;
try {
computeCurrentAssignmentsStatsPreparedStatement = con . prepareStatement ( computeCurrentAssignmentsStatsQuery ) ;
computeCurrentAssignmentsStatsPreparedStatement . execute ( ) ;
} catch ( SQLException sqle ) {
ImpalaConnector . databaseLock . unlock ( ) ;
String errorMsg = ImpalaConnector . handlePreparedStatementException ( "computeCurrentAssignmentsStatsQuery" , computeCurrentAssignmentsStatsQuery , "computeCurrentAssignmentsStatsPreparedStatement" , computeCurrentAssignmentsStatsPreparedStatement , con , sqle ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
} finally {
try {
con . close ( ) ;
if ( computeCurrentAssignmentsStatsPreparedStatement ! = null )
computeCurrentAssignmentsStatsPreparedStatement . close ( ) ;
} catch ( SQLException sqle2 ) {
logger . error ( "Could not close the connection with the Impala-database.\n" + sqle2 . getMessage ( ) ) ;
logger . error ( " Failed to close the \"computeCurrentAssignmentsStatsPreparedStatement\"! \n" + sqle2 . getMessage ( ) ) ;
}
}
PreparedStatement getAssignmentsPreparedStatement = null ;
try {
getAssignmentsPreparedStatement = con . prepareStatement ( getAssignmentsQuery ) ;
} catch ( SQLException sqle ) {
ImpalaConnector . databaseLock . unlock ( ) ;
String errorMsg = ImpalaConnector . handlePreparedStatementException ( "getAssignmentsQuery" , getAssignmentsQuery , "getAssignmentsPreparedStatement" , getAssignmentsPreparedStatement , con , sqle ) ;
// The "getAssignmentsPreparedStatement" will always be null here, so we do not close it.
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
}
Date date = new Date ( System . currentTimeMillis ( ) ) ; // Store it here, in order to have the same for all current records.
Timestamp timestamp = new Timestamp ( System . currentTimeMillis ( ) ) ; // Store it here, in order to have the same for all current records.
try ( ResultSet resultSet = getAssignmentsPreparedStatement . executeQuery ( ) ) {
// Unfortunately, we cannot use the following as the used version of the Impala-driver does not support it.
@ -128,7 +138,7 @@ public class UrlController {
// The following few lines, cannot be outside the "while" loop, since the same record is returned, despite that we update the inner-values.
Assignment assignment = new Assignment ( ) ;
assignment . setWorkerId ( workerId ) ;
assignment . set Date( date ) ;
assignment . set Timestamp( timestamp ) ;
Datasource datasource = new Datasource ( ) ;
try { // For each of the 4 columns returned. The indexing starts from 1
assignment . setId ( resultSet . getString ( 1 ) ) ;
@ -159,7 +169,7 @@ public class UrlController {
int assignmentsSize = assignments . size ( ) ;
if ( assignmentsSize = = 0 ) {
ImpalaConnector . databaseLock . unlock ( ) ;
String errorMsg = "No results retrieved from the \" get AssignmentsQuery\" for worker with id: " + workerId ;
String errorMsg = "No results retrieved from the \" find AssignmentsQuery\" for worker with id: " + workerId ;
logger . error ( errorMsg ) ;
ImpalaConnector . closeConnection ( con ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
@ -167,68 +177,45 @@ public class UrlController {
logger . debug ( "Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker." ) ;
// The following is a test of inserting multiple rows with a singme insert-query. If applied with a preparedStatement, then the JDBC fails with "OutOfMemory"-Error.
/ * String testInsert = "INSERT INTO assignment (id,original_url,workerid,`date`) VALUES ( 'doiboost____::4e8b1f12ac3ba5a9d8fbff9872000000', 'http://dx.doi.org/10.17267/2596-3368dentistry.v6i2.586', 'worker_1', CAST('2021-10-01' AS TIMESTAMP) ) , ( 'doiboost____::4e8b1f12ac3ba5a9d8fbff9872000000', 'https://academic.microsoft.com/#/detail/2887540302', 'worker_1', CAST('2021-10-01' AS TIMESTAMP) );" ;
try ( Statement insertStatement = con . createStatement ( ) ) {
insertStatement . execute ( testInsert ) ;
} catch ( SQLException sqle ) {
ImpalaConnector . databaseLock . unlock ( ) ;
String mergeErrorMsg = "Problem when executing the testInsert statement for \"" + testInsert + "\"" ;
logger . error ( mergeErrorMsg + sqle . getMessage ( ) ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( mergeErrorMsg ) ;
} * /
// Write the Assignment details to the database and then send it to the worker.
String insertIntoAssignmentBaseQuery = "INSERT INTO " + ImpalaConnector . databaseName + ".assignment (id, original_url, workerid, date) VALUES (?, ?, ?, ?)" ;
// Write the Assignment details to the assignment-table.
String insertAssignmentsQuery = "insert into " + ImpalaConnector . databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n"
+ "from (\n select pubid, url from " + ImpalaConnector . databaseName + ".current_assignment) as pub_data" ;
PreparedStatement preparedInsertAssignmentStatement ;
try { // We use a "PreparedStatement" to do insertions, for security and performance reasons.
preparedInsertAssignmentStatement = con . prepareStatement ( insertIntoAssignmentBaseQuery ) ;
PreparedStatement insertAssignmentsPreparedStatement = null ;
try {
insertAssignmentsPreparedStatement = con . prepareStatement ( insertAssignmentsQuery ) ;
insertAssignmentsPreparedStatement . execute ( ) ;
} catch ( SQLException sqle ) {
ImpalaConnector . databaseLock . unlock ( ) ;
String errorMsg = "Problem when creating the prepared statement for \"insertIntoAssignmentBaseQuery\"!\n" ;
logger . error ( errorMsg + sqle . getMessage ( ) ) ;
String errorMsg = ImpalaConnector . handlePreparedStatementException ( "insertAssignmentsQuery" , insertAssignmentsQuery , "insertAssignmentsPreparedStatement" , insertAssignmentsPreparedStatement , con , sqle ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
} finally {
try {
con . close ( ) ;
if ( insertAssignmentsPreparedStatement ! = null )
insertAssignmentsPreparedStatement . close ( ) ;
} catch ( SQLException sqle2 ) {
logger . error ( " Could not close the connection with the Impala-database. \n" + sqle2 . getMessage ( ) ) ;
logger . error ( " Failed to close the \"insertAssignmentsPreparedStatement\"! \n" + sqle2 . getMessage ( ) ) ;
}
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
}
// Before, we wanted to execute the getAssignmentQuery and take the assignments immediately, but now it's more efficient to commit all the inserts in the end.
String dropCurrentAssignmentsQuery = "DROP TABLE " + ImpalaConnector . databaseName + ".current_assignment PURGE" ;
PreparedStatement dropCurrentAssignmentsPreparedStatement = null ;
try {
con . setAutoCommit ( false ) ;
} catch ( SQLException sqle ) { // There is a database-error. The latter actions will probably fail as well.
dropCurrentAssignmentsPreparedStatement = con . prepareStatement ( dropCurrentAssignmentsQuery ) ;
dropCurrentAssignmentsPreparedStatement . execute ( ) ;
} catch ( SQLException sqle ) {
ImpalaConnector . databaseLock . unlock ( ) ;
String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!" ;
logger . error ( errorMsg + "\n" + sqle . getMessage ( ) ) ;
closePreparedStatements ( preparedInsertAssignmentStatement , null , con ) ;
String errorMsg = ImpalaConnector . handlePreparedStatementException ( "dropCurrentAssignmentsQuery" , dropCurrentAssignmentsQuery , "dropCurrentAssignmentsPreparedStatement" , dropCurrentAssignmentsPreparedStatement , con , sqle ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
}
String tempFullQueryString = null ;
for ( Assignment assignment : assignments ) {
} finally {
try {
preparedInsertAssignmentStatement . setString ( 1 , assignment . getId ( ) ) ;
preparedInsertAssignmentStatement . setString ( 2 , assignment . getOriginalUrl ( ) ) ;
preparedInsertAssignmentStatement . setString ( 3 , workerId ) ;
preparedInsertAssignmentStatement . setDate ( 4 , date ) ;
tempFullQueryString = getAssignmentsPreparedStatement . toString ( ) ;
preparedInsertAssignmentStatement . executeUpdate ( ) ;
} catch ( SQLException sqle ) {
logger . error ( "Problem when executing the \"insertIntoAssignmentQuery\":\n" + tempFullQueryString + "\n" + sqle . getMessage ( ) + "\n\n" ) ;
if ( dropCurrentAssignmentsPreparedStatement ! = null )
dropCurrentAssignmentsPreparedStatement . close ( ) ;
} catch ( SQLException sqle2 ) {
logger . error ( "Failed to close the \"dropCurrentAssignmentsPreparedStatement\"!\n" + sqle2 . getMessage ( ) ) ;
}
} //end for-loop
try {
con . commit ( ) ; // Send all the insert-queries to the database.
} catch ( SQLException sqle ) {
ImpalaConnector . databaseLock . unlock ( ) ;
String errorMsg = "Problem when committing changes to the database!" ;
logger . error ( errorMsg + "\n" + sqle . getMessage ( ) ) ;
closePreparedStatements ( preparedInsertAssignmentStatement , null , con ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
}
logger . debug ( "Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table." ) ;
@ -236,21 +223,12 @@ public class UrlController {
String mergeErrorMsg = FileUtils . mergeParquetFiles ( "assignment" , con ) ;
if ( mergeErrorMsg ! = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
closePreparedStatements( preparedInsertAssignmentStatement , null , con ) ;
ImpalaConnector. closeConnection ( con ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( mergeErrorMsg ) ;
}
try {
con . commit ( ) ; // Apply the merge.
con . setAutoCommit ( true ) ; // Restore the "auto-commit" value for this connection of the pool.
} catch ( SQLException sqle ) {
String errorMsg = "Problem when committing changes to the database!" ;
logger . error ( errorMsg , sqle ) ; //+ "\n" + sqle.getMessage());
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
} finally {
ImpalaConnector . databaseLock . unlock ( ) ;
closePreparedStatements ( preparedInsertAssignmentStatement , null , con ) ;
}
ImpalaConnector . closeConnection ( con ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
logger . info ( "Sending batch-assignments_" + assignmentsBatchCounter . incrementAndGet ( ) + " with " + assignmentsSize + " assignments to worker with ID: " + workerId + "." ) ;
return ResponseEntity . status ( HttpStatus . OK ) . body ( new AssignmentsResponse ( assignmentsBatchCounter . get ( ) , assignments ) ) ;
@ -331,7 +309,7 @@ public class UrlController {
preparedInsertPayloadStatement . setString ( 1 , payload . getId ( ) ) ;
preparedInsertPayloadStatement . setString ( 2 , payload . getOriginal_url ( ) ) ;
preparedInsertPayloadStatement . setString ( 3 , payload . getActual_url ( ) ) ;
preparedInsertPayloadStatement . set Date( 4 , payload . getDate _acquired( ) ) ;
preparedInsertPayloadStatement . set Timestamp( 4 , payload . getTimestamp _acquired( ) ) ;
preparedInsertPayloadStatement . setString ( 5 , payload . getMime_type ( ) ) ;
// The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers.
@ -359,7 +337,7 @@ public class UrlController {
try { // We use a "PreparedStatement" to do insertions, for security reasons.
preparedInsertAttemptStatement . setString ( 1 , payload . getId ( ) ) ;
preparedInsertAttemptStatement . setString ( 2 , payload . getOriginal_url ( ) ) ;
preparedInsertAttemptStatement . set Date( 3 , payload . getDate _acquired( ) ) ;
preparedInsertAttemptStatement . set Timestamp( 3 , payload . getTimestamp _acquired( ) ) ;
preparedInsertAttemptStatement . setString ( 4 , urlReport . getStatus ( ) . toString ( ) ) ;
preparedInsertAttemptStatement . setString ( 5 , String . valueOf ( error . getType ( ) ) ) ; // This covers the case of "null".
preparedInsertAttemptStatement . setString ( 6 , error . getMessage ( ) ) ;
@ -476,7 +454,7 @@ public class UrlController {
HashMultimap < String , String > loadedIdUrlPairs ;
boolean isFirstRun = true ;
boolean assignmentsLimitReached = false ;
Date date = new Date ( System . currentTimeMillis ( ) ) ; // Store it here, in order to have the same for all current records.
Timestamp timestamp = new Timestamp ( System . currentTimeMillis ( ) ) ; // Store it here, in order to have the same for all current records.
// Start loading urls.
while ( true ) {
@ -497,7 +475,7 @@ public class UrlController {
}
int randomNum = GenericUtils . getRandomNumber ( 1 , 5 ) ;
assignments . add ( new Assignment ( pair . getKey ( ) , pair . getValue ( ) , new Datasource ( "ID_" + randomNum , "NAME_" + randomNum ) , workerId , da te) ) ;
assignments . add ( new Assignment ( pair . getKey ( ) , pair . getValue ( ) , new Datasource ( "ID_" + randomNum , "NAME_" + randomNum ) , workerId , tim estamp ) ) ;
} // end pairs-for-loop
if ( assignmentsLimitReached ) {