@ -9,7 +9,6 @@ import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.ControllerConstants ;
import eu.openaire.urls_controller.util.FileUtils ;
import eu.openaire.urls_controller.util.GenericUtils ;
import org.apache.commons.lang3.StringUtils ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.http.HttpStatus ;
@ -118,7 +117,7 @@ public class UrlController {
computeCurrentAssignmentsStatsPreparedStatement . execute ( ) ;
} catch ( SQLException sqle ) {
String errorMsg = dropCurrentAssignmentTable ( con ) ;
if ( errorMsg ! = null )
if ( errorMsg ! = null ) // The "databaseLock" is already unlocked.
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
errorMsg = ImpalaConnector . handlePreparedStatementException ( "computeCurrentAssignmentsStatsQuery" , computeCurrentAssignmentsStatsQuery , "computeCurrentAssignmentsStatsPreparedStatement" , computeCurrentAssignmentsStatsPreparedStatement , con , sqle ) ;
@ -137,7 +136,7 @@ public class UrlController {
getAssignmentsPreparedStatement = con . prepareStatement ( getAssignmentsQuery ) ;
} catch ( SQLException sqle ) {
String errorMsg = dropCurrentAssignmentTable ( con ) ;
if ( errorMsg ! = null )
if ( errorMsg ! = null ) // The "databaseLock" is already unlocked.
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
errorMsg = ImpalaConnector . handlePreparedStatementException ( "getAssignmentsQuery" , getAssignmentsQuery , "getAssignmentsPreparedStatement" , getAssignmentsPreparedStatement , con , sqle ) ;
@ -153,6 +152,7 @@ public class UrlController {
ImpalaConnector . databaseLock . unlock ( ) ;
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId ;
logger . error ( errorMsg ) ;
ImpalaConnector . closeConnection ( con ) ;
return ResponseEntity . status ( HttpStatus . NO_CONTENT ) . body ( errorMsg ) ;
} * /
@ -177,7 +177,7 @@ public class UrlController {
}
} catch ( Exception e ) {
String errorMsg = dropCurrentAssignmentTable ( con ) ;
if ( errorMsg ! = null )
if ( errorMsg ! = null ) // The "databaseLock" is already unlocked.
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n" ;
@ -195,7 +195,7 @@ public class UrlController {
int assignmentsSize = assignments . size ( ) ;
if ( assignmentsSize = = 0 ) {
String errorMsg = dropCurrentAssignmentTable ( con ) ;
if ( errorMsg ! = null )
if ( errorMsg ! = null ) // The "databaseLock" is already unlocked.
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId ;
@ -219,7 +219,7 @@ public class UrlController {
insertAssignmentsPreparedStatement . execute ( ) ;
} catch ( SQLException sqle ) {
String errorMsg = dropCurrentAssignmentTable ( con ) ;
if ( errorMsg ! = null )
if ( errorMsg ! = null ) // The "databaseLock" is already unlocked.
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
errorMsg = ImpalaConnector . handlePreparedStatementException ( "insertAssignmentsQuery" , insertAssignmentsQuery , "insertAssignmentsPreparedStatement" , insertAssignmentsPreparedStatement , con , sqle ) ;
@ -234,7 +234,7 @@ public class UrlController {
}
String errorMsg = dropCurrentAssignmentTable ( con ) ;
if ( errorMsg ! = null )
if ( errorMsg ! = null ) // The "databaseLock" is already unlocked.
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
logger . debug ( "Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table." ) ;
@ -288,40 +288,40 @@ public class UrlController {
long curReportAssignments = workerReport . getAssignmentRequestCounter ( ) ;
logger . info ( "Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports . size ( ) + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database." ) ;
ImpalaConnector . databaseLock . lock ( ) ;
Connection con = ImpalaConnector . getInstance ( ) . getConnection ( ) ;
if ( con = = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( "Problem when connecting with the Impala-database!" ) ;
}
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
if ( ! FileUtils . getAndUploadFullTexts ( urlReports , con , request , curReportAssignments , curWorkerId ) ) {
FileUtils . UploadFullTextsResponse uploadFullTextsResponse = FileUtils . getAndUploadFullTexts ( urlReports , request , curReportAssignments , curWorkerId ) ;
if ( uploadFullTextsResponse = = FileUtils . UploadFullTextsResponse . databaseError ) {
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( "Problem with the Impala-database!" ) ;
}
else if ( uploadFullTextsResponse = = FileUtils . UploadFullTextsResponse . unsuccessful ) {
logger . error ( "Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments ) ;
// The docUrls were still found! Just update ALL the fileLocations, sizes and hashes, to show that the files are not available, and continue with writing the attempts and the Payloads.
FileUtils . updateUrlReportsToHaveNoFullTextFiles ( urlReports ) ;
}
// The "databaseLock" was unlocked inside the "FileUtils.getAndUploadFullTexts" to avoid blocking the database while doing large irrelevant tasks like transferring files.
ImpalaConnector . databaseLock . lock ( ) ;
Connection con = ImpalaConnector . getInstance ( ) . getConnection ( ) ;
if ( con = = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( "Problem when connecting with the Impala-database!" ) ;
}
// Store the workerReport into the database.
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector . databaseName + ".payload (id, original_url, actual_url, `date`, mimetype, size, `hash`, `location`, provenance) VALUES " ;
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector . databaseName + ".attempt (id, original_url, `date`, status, error_class, error_message) VALUES " ;
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector . databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ";
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector . databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?) ";
String tempInsertQueryName = null ;
Statement insertPayloadStatement = null , i nsertAttemptStatement = null ;
PreparedStatement preparedInsertPayloadStatement = null , preparedI nsertAttemptStatement = null ;
try {
tempInsertQueryName = "insertIntoPayloadBaseQuery" ;
insertPayloadStatement = con . createStatement ( ) ;
preparedInsertPayloadStatement = con . prepareStatement ( insertIntoPayloadBaseQuery ) ;
tempInsertQueryName = "insertIntoAttemptBaseQuery" ;
insertAttemptStatement = con . createStatement ( ) ;
preparedInsertAttemptStatement = con . prepareStatement ( insertIntoAttemptBaseQuery ) ;
} catch ( SQLException sqle ) {
ImpalaConnector . databaseLock . unlock ( ) ;
String errorMsg = "Problem when creating the statement for \"" + tempInsertQueryName + "\"!\n" ;
String errorMsg = "Problem when creating the prepared statement for \"" + tempInsertQueryName + "\"!\n" ;
logger . error ( errorMsg + sqle . getMessage ( ) ) ;
closeStatements ( insertPayloadStatement, i nsertAttemptStatement, con ) ;
closeStatements ( preparedInsertPayloadStatement, preparedI nsertAttemptStatement, con ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
}
@ -331,7 +331,7 @@ public class UrlController {
ImpalaConnector . databaseLock . unlock ( ) ;
String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!\n" ;
logger . error ( errorMsg + sqle . getMessage ( ) ) ;
closeStatements ( insertPayloadStatement, i nsertAttemptStatement, con ) ;
closeStatements ( preparedInsertPayloadStatement, preparedI nsertAttemptStatement, con ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
}
@ -350,20 +350,26 @@ public class UrlController {
continue ;
}
try {
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
preparedInsertPayloadStatement . setString ( 1 , payload . getId ( ) ) ;
preparedInsertPayloadStatement . setString ( 2 , payload . getOriginal_url ( ) ) ;
preparedInsertPayloadStatement . setString ( 3 , payload . getActual_url ( ) ) ;
preparedInsertPayloadStatement . setTimestamp ( 4 , payload . getTimestamp_acquired ( ) ) ;
preparedInsertPayloadStatement . setString ( 5 , payload . getMime_type ( ) ) ;
// The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers.
String stringSize = null ;
Long size = payload . getSize ( ) ;
if ( size ! = null )
stringSize = String . valueOf ( size ) ;
String insertIntoPayloadFullQuery = insertIntoPayloadBaseQuery + "('" + payload . getId ( ) + "','" + payload . getOriginal_url ( ) + "','" + payload . getActual_url ( ) + "','"
+ payload . getTimestamp_acquired ( ) + "','" + payload . getMime_type ( ) + "','" + stringSize + "','" + payload . getHash ( ) + "','"
+ payload . getLocation ( ) + "','" + payload . getProvenance ( ) + "')" ;
insertPayloadStatement. execute ( insertIntoPayloadFullQuery ) ;
preparedInsertPayloadStatement. setString ( 6 , stringSize ) ;
preparedInsertPayloadStatement . setString ( 7 , payload . getHash ( ) ) ;
preparedInsertPayloadStatement . setString ( 8 , payload . getLocation ( ) ) ;
preparedInsertPayloadStatement . setString ( 9 , payload . getProvenance ( ) ) ;
preparedInsertPayloadStatement. executeUpdate ( ) ;
} catch ( SQLException sqle ) {
logger . error ( "Problem when executing the \"insertIntoPayload Full Query\": " + sqle . getMessage ( ) + "\n\n" ) ;
logger . error ( "Problem when executing the \"insertIntoPayload Base Query\": " + sqle . getMessage ( ) + "\n\n" ) ;
}
Error error = urlReport . getError ( ) ;
@ -372,15 +378,14 @@ public class UrlController {
error = new Error ( null , null ) ;
}
try {
String errorCause = error . getMessage ( ) ;
if ( errorCause ! = null )
errorCause = StringUtils . replace ( errorCause , "'" , "\\'" , - 1 ) ; // Escape single quotes in the error-cause-message.
String insertIntoAttemptFullQuery = insertIntoAttemptBaseQuery + "('" + payload . getId ( ) + "','" + payload . getOriginal_url ( ) + "','"
+ payload . getTimestamp_acquired ( ) + "','" + urlReport . getStatus ( ) . toString ( ) + "','" + error . getType ( ) + "','" + errorCause + "')" ;
insertAttemptStatement . execute ( insertIntoAttemptFullQuery ) ;
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
preparedInsertAttemptStatement . setString ( 1 , payload . getId ( ) ) ;
preparedInsertAttemptStatement . setString ( 2 , payload . getOriginal_url ( ) ) ;
preparedInsertAttemptStatement . setTimestamp ( 3 , payload . getTimestamp_acquired ( ) ) ;
preparedInsertAttemptStatement . setString ( 4 , urlReport . getStatus ( ) . toString ( ) ) ;
preparedInsertAttemptStatement . setString ( 5 , String . valueOf ( error . getType ( ) ) ) ; // This covers the case of "null".
preparedInsertAttemptStatement . setString ( 6 , error . getMessage ( ) ) ;
preparedInsertAttemptStatement . executeUpdate ( ) ;
} catch ( SQLException sqle ) {
logger . error ( "Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle . getMessage ( ) + "\n\n" ) ;
}
@ -390,11 +395,12 @@ public class UrlController {
con . commit ( ) ; // Commit all the insert-queries to the database (write them to disk).
} catch ( SQLException sqle ) {
ImpalaConnector . databaseLock . unlock ( ) ;
ImpalaConnector . closeConnection ( con ) ;
String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!" ;
logger . error ( errorMsg + "\n" + sqle . getMessage ( ) ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
} finally {
closeStatements ( insertPayloadStatement, i nsertAttemptStatement, null ) ; // Do not close the connection here !
closeStatements ( preparedInsertPayloadStatement, preparedI nsertAttemptStatement, null ) ; // Do not close the connection here , as we might move forward.
}
logger . debug ( "Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables." ) ;