@ -9,7 +9,6 @@ import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.ControllerConstants ;
import eu.openaire.urls_controller.util.ControllerConstants ;
import eu.openaire.urls_controller.util.FileUtils ;
import eu.openaire.urls_controller.util.FileUtils ;
import eu.openaire.urls_controller.util.GenericUtils ;
import eu.openaire.urls_controller.util.GenericUtils ;
import org.apache.commons.lang3.StringUtils ;
import org.slf4j.Logger ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.slf4j.LoggerFactory ;
import org.springframework.http.HttpStatus ;
import org.springframework.http.HttpStatus ;
@ -118,7 +117,7 @@ public class UrlController {
computeCurrentAssignmentsStatsPreparedStatement . execute ( ) ;
computeCurrentAssignmentsStatsPreparedStatement . execute ( ) ;
} catch ( SQLException sqle ) {
} catch ( SQLException sqle ) {
String errorMsg = dropCurrentAssignmentTable ( con ) ;
String errorMsg = dropCurrentAssignmentTable ( con ) ;
if ( errorMsg ! = null )
if ( errorMsg ! = null ) // The "databaseLock" is already unlocked.
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
errorMsg = ImpalaConnector . handlePreparedStatementException ( "computeCurrentAssignmentsStatsQuery" , computeCurrentAssignmentsStatsQuery , "computeCurrentAssignmentsStatsPreparedStatement" , computeCurrentAssignmentsStatsPreparedStatement , con , sqle ) ;
errorMsg = ImpalaConnector . handlePreparedStatementException ( "computeCurrentAssignmentsStatsQuery" , computeCurrentAssignmentsStatsQuery , "computeCurrentAssignmentsStatsPreparedStatement" , computeCurrentAssignmentsStatsPreparedStatement , con , sqle ) ;
@ -137,7 +136,7 @@ public class UrlController {
getAssignmentsPreparedStatement = con . prepareStatement ( getAssignmentsQuery ) ;
getAssignmentsPreparedStatement = con . prepareStatement ( getAssignmentsQuery ) ;
} catch ( SQLException sqle ) {
} catch ( SQLException sqle ) {
String errorMsg = dropCurrentAssignmentTable ( con ) ;
String errorMsg = dropCurrentAssignmentTable ( con ) ;
if ( errorMsg ! = null )
if ( errorMsg ! = null ) // The "databaseLock" is already unlocked.
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
errorMsg = ImpalaConnector . handlePreparedStatementException ( "getAssignmentsQuery" , getAssignmentsQuery , "getAssignmentsPreparedStatement" , getAssignmentsPreparedStatement , con , sqle ) ;
errorMsg = ImpalaConnector . handlePreparedStatementException ( "getAssignmentsQuery" , getAssignmentsQuery , "getAssignmentsPreparedStatement" , getAssignmentsPreparedStatement , con , sqle ) ;
@ -153,6 +152,7 @@ public class UrlController {
ImpalaConnector . databaseLock . unlock ( ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId ;
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId ;
logger . error ( errorMsg ) ;
logger . error ( errorMsg ) ;
ImpalaConnector . closeConnection ( con ) ;
return ResponseEntity . status ( HttpStatus . NO_CONTENT ) . body ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . NO_CONTENT ) . body ( errorMsg ) ;
} * /
} * /
@ -177,7 +177,7 @@ public class UrlController {
}
}
} catch ( Exception e ) {
} catch ( Exception e ) {
String errorMsg = dropCurrentAssignmentTable ( con ) ;
String errorMsg = dropCurrentAssignmentTable ( con ) ;
if ( errorMsg ! = null )
if ( errorMsg ! = null ) // The "databaseLock" is already unlocked.
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n" ;
errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n" ;
@ -195,7 +195,7 @@ public class UrlController {
int assignmentsSize = assignments . size ( ) ;
int assignmentsSize = assignments . size ( ) ;
if ( assignmentsSize = = 0 ) {
if ( assignmentsSize = = 0 ) {
String errorMsg = dropCurrentAssignmentTable ( con ) ;
String errorMsg = dropCurrentAssignmentTable ( con ) ;
if ( errorMsg ! = null )
if ( errorMsg ! = null ) // The "databaseLock" is already unlocked.
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId ;
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId ;
@ -219,7 +219,7 @@ public class UrlController {
insertAssignmentsPreparedStatement . execute ( ) ;
insertAssignmentsPreparedStatement . execute ( ) ;
} catch ( SQLException sqle ) {
} catch ( SQLException sqle ) {
String errorMsg = dropCurrentAssignmentTable ( con ) ;
String errorMsg = dropCurrentAssignmentTable ( con ) ;
if ( errorMsg ! = null )
if ( errorMsg ! = null ) // The "databaseLock" is already unlocked.
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
errorMsg = ImpalaConnector . handlePreparedStatementException ( "insertAssignmentsQuery" , insertAssignmentsQuery , "insertAssignmentsPreparedStatement" , insertAssignmentsPreparedStatement , con , sqle ) ;
errorMsg = ImpalaConnector . handlePreparedStatementException ( "insertAssignmentsQuery" , insertAssignmentsQuery , "insertAssignmentsPreparedStatement" , insertAssignmentsPreparedStatement , con , sqle ) ;
@ -234,7 +234,7 @@ public class UrlController {
}
}
String errorMsg = dropCurrentAssignmentTable ( con ) ;
String errorMsg = dropCurrentAssignmentTable ( con ) ;
if ( errorMsg ! = null )
if ( errorMsg ! = null ) // The "databaseLock" is already unlocked.
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
logger . debug ( "Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table." ) ;
logger . debug ( "Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table." ) ;
@ -288,40 +288,40 @@ public class UrlController {
long curReportAssignments = workerReport . getAssignmentRequestCounter ( ) ;
long curReportAssignments = workerReport . getAssignmentRequestCounter ( ) ;
logger . info ( "Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports . size ( ) + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database." ) ;
logger . info ( "Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports . size ( ) + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database." ) ;
ImpalaConnector . databaseLock . lock ( ) ;
Connection con = ImpalaConnector . getInstance ( ) . getConnection ( ) ;
if ( con = = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( "Problem when connecting with the Impala-database!" ) ;
}
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
if ( ! FileUtils . getAndUploadFullTexts ( urlReports , con , request , curReportAssignments , curWorkerId ) ) {
FileUtils . UploadFullTextsResponse uploadFullTextsResponse = FileUtils . getAndUploadFullTexts ( urlReports , request , curReportAssignments , curWorkerId ) ;
if ( uploadFullTextsResponse = = FileUtils . UploadFullTextsResponse . databaseError ) {
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( "Problem with the Impala-database!" ) ;
}
else if ( uploadFullTextsResponse = = FileUtils . UploadFullTextsResponse . unsuccessful ) {
logger . error ( "Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments ) ;
logger . error ( "Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments ) ;
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
FileUtils . updateUrlReportsToHaveNoFullTextFiles ( urlReports ) ;
FileUtils . updateUrlReportsToHaveNoFullTextFiles ( urlReports ) ;
}
}
// The "databaseLock" was unlocked inside the "FileUtils.getAndUploadFullTexts" to avoid blocking the database while doing large irrelevant tasks like transferring files.
ImpalaConnector . databaseLock . lock ( ) ;
ImpalaConnector . databaseLock . lock ( ) ;
Connection con = ImpalaConnector . getInstance ( ) . getConnection ( ) ;
if ( con = = null ) {
ImpalaConnector . databaseLock . unlock ( ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( "Problem when connecting with the Impala-database!" ) ;
}
// Store the workerReport into the database.
// Store the workerReport into the database.
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector . databaseName + ".payload (id, original_url, actual_url, `date`, mimetype, size, `hash`, `location`, provenance) VALUES " ;
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector . databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ";
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector . databaseName + ".attempt (id, original_url, `date`, status, error_class, error_message) VALUES " ;
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector . databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?) ";
String tempInsertQueryName = null ;
String tempInsertQueryName = null ;
Statement insertPayloadStatement = null , i nsertAttemptStatement = null ;
PreparedStatement preparedInsertPayloadStatement = null , preparedI nsertAttemptStatement = null ;
try {
try {
tempInsertQueryName = "insertIntoPayloadBaseQuery" ;
tempInsertQueryName = "insertIntoPayloadBaseQuery" ;
insertPayloadStatement = con . createStatement ( ) ;
preparedInsertPayloadStatement = con . prepareStatement ( insertIntoPayloadBaseQuery ) ;
tempInsertQueryName = "insertIntoAttemptBaseQuery" ;
tempInsertQueryName = "insertIntoAttemptBaseQuery" ;
insertAttemptStatement = con . createStatement ( ) ;
preparedInsertAttemptStatement = con . prepareStatement ( insertIntoAttemptBaseQuery ) ;
} catch ( SQLException sqle ) {
} catch ( SQLException sqle ) {
ImpalaConnector . databaseLock . unlock ( ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
String errorMsg = "Problem when creating the statement for \"" + tempInsertQueryName + "\"!\n" ;
String errorMsg = "Problem when creating the prepared statement for \"" + tempInsertQueryName + "\"!\n" ;
logger . error ( errorMsg + sqle . getMessage ( ) ) ;
logger . error ( errorMsg + sqle . getMessage ( ) ) ;
closeStatements ( insertPayloadStatement, i nsertAttemptStatement, con ) ;
closeStatements ( preparedInsertPayloadStatement, preparedI nsertAttemptStatement, con ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
}
}
@ -331,7 +331,7 @@ public class UrlController {
ImpalaConnector . databaseLock . unlock ( ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!\n" ;
String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!\n" ;
logger . error ( errorMsg + sqle . getMessage ( ) ) ;
logger . error ( errorMsg + sqle . getMessage ( ) ) ;
closeStatements ( insertPayloadStatement, i nsertAttemptStatement, con ) ;
closeStatements ( preparedInsertPayloadStatement, preparedI nsertAttemptStatement, con ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
}
}
@ -350,20 +350,26 @@ public class UrlController {
continue ;
continue ;
}
}
try {
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
preparedInsertPayloadStatement . setString ( 1 , payload . getId ( ) ) ;
preparedInsertPayloadStatement . setString ( 2 , payload . getOriginal_url ( ) ) ;
preparedInsertPayloadStatement . setString ( 3 , payload . getActual_url ( ) ) ;
preparedInsertPayloadStatement . setTimestamp ( 4 , payload . getTimestamp_acquired ( ) ) ;
preparedInsertPayloadStatement . setString ( 5 , payload . getMime_type ( ) ) ;
// The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers.
// The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers.
String stringSize = null ;
String stringSize = null ;
Long size = payload . getSize ( ) ;
Long size = payload . getSize ( ) ;
if ( size ! = null )
if ( size ! = null )
stringSize = String . valueOf ( size ) ;
stringSize = String . valueOf ( size ) ;
String insertIntoPayloadFullQuery = insertIntoPayloadBaseQuery + "('" + payload . getId ( ) + "','" + payload . getOriginal_url ( ) + "','" + payload . getActual_url ( ) + "','"
preparedInsertPayloadStatement. setString ( 6 , stringSize ) ;
+ payload . getTimestamp_acquired ( ) + "','" + payload . getMime_type ( ) + "','" + stringSize + "','" + payload . getHash ( ) + "','"
preparedInsertPayloadStatement . setString ( 7 , payload . getHash ( ) ) ;
+ payload . getLocation ( ) + "','" + payload . getProvenance ( ) + "')" ;
preparedInsertPayloadStatement . setString ( 8 , payload . getLocation ( ) ) ;
preparedInsertPayloadStatement . setString ( 9 , payload . getProvenance ( ) ) ;
insertPayloadStatement. execute ( insertIntoPayloadFullQuery ) ;
preparedInsertPayloadStatement. executeUpdate ( ) ;
} catch ( SQLException sqle ) {
} catch ( SQLException sqle ) {
logger . error ( "Problem when executing the \"insertIntoPayload Full Query\": " + sqle . getMessage ( ) + "\n\n" ) ;
logger . error ( "Problem when executing the \"insertIntoPayload Base Query\": " + sqle . getMessage ( ) + "\n\n" ) ;
}
}
Error error = urlReport . getError ( ) ;
Error error = urlReport . getError ( ) ;
@ -372,15 +378,14 @@ public class UrlController {
error = new Error ( null , null ) ;
error = new Error ( null , null ) ;
}
}
try {
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
String errorCause = error . getMessage ( ) ;
preparedInsertAttemptStatement . setString ( 1 , payload . getId ( ) ) ;
if ( errorCause ! = null )
preparedInsertAttemptStatement . setString ( 2 , payload . getOriginal_url ( ) ) ;
errorCause = StringUtils . replace ( errorCause , "'" , "\\'" , - 1 ) ; // Escape single quotes in the error-cause-message.
preparedInsertAttemptStatement . setTimestamp ( 3 , payload . getTimestamp_acquired ( ) ) ;
preparedInsertAttemptStatement . setString ( 4 , urlReport . getStatus ( ) . toString ( ) ) ;
String insertIntoAttemptFullQuery = insertIntoAttemptBaseQuery + "('" + payload . getId ( ) + "','" + payload . getOriginal_url ( ) + "','"
preparedInsertAttemptStatement . setString ( 5 , String . valueOf ( error . getType ( ) ) ) ; // This covers the case of "null".
+ payload . getTimestamp_acquired ( ) + "','" + urlReport . getStatus ( ) . toString ( ) + "','" + error . getType ( ) + "','" + errorCause + "')" ;
preparedInsertAttemptStatement . setString ( 6 , error . getMessage ( ) ) ;
preparedInsertAttemptStatement . executeUpdate ( ) ;
insertAttemptStatement . execute ( insertIntoAttemptFullQuery ) ;
} catch ( SQLException sqle ) {
} catch ( SQLException sqle ) {
logger . error ( "Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle . getMessage ( ) + "\n\n" ) ;
logger . error ( "Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle . getMessage ( ) + "\n\n" ) ;
}
}
@ -390,11 +395,12 @@ public class UrlController {
con . commit ( ) ; // Commit all the insert-queries to the database (write them to disk).
con . commit ( ) ; // Commit all the insert-queries to the database (write them to disk).
} catch ( SQLException sqle ) {
} catch ( SQLException sqle ) {
ImpalaConnector . databaseLock . unlock ( ) ;
ImpalaConnector . databaseLock . unlock ( ) ;
ImpalaConnector . closeConnection ( con ) ;
String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!" ;
String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!" ;
logger . error ( errorMsg + "\n" + sqle . getMessage ( ) ) ;
logger . error ( errorMsg + "\n" + sqle . getMessage ( ) ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
return ResponseEntity . status ( HttpStatus . INTERNAL_SERVER_ERROR ) . body ( errorMsg ) ;
} finally {
} finally {
closeStatements ( insertPayloadStatement, i nsertAttemptStatement, null ) ; // Do not close the connection here !
closeStatements ( preparedInsertPayloadStatement, preparedI nsertAttemptStatement, null ) ; // Do not close the connection here , as we might move forward.
}
}
logger . debug ( "Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables." ) ;
logger . debug ( "Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables." ) ;