|
|
|
@ -9,7 +9,6 @@ import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
|
|
|
|
|
import eu.openaire.urls_controller.util.ControllerConstants;
|
|
|
|
|
import eu.openaire.urls_controller.util.FileUtils;
|
|
|
|
|
import eu.openaire.urls_controller.util.GenericUtils;
|
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
import org.springframework.http.HttpStatus;
|
|
|
|
@ -118,7 +117,7 @@ public class UrlController {
|
|
|
|
|
computeCurrentAssignmentsStatsPreparedStatement.execute();
|
|
|
|
|
} catch (SQLException sqle) {
|
|
|
|
|
String errorMsg = dropCurrentAssignmentTable(con);
|
|
|
|
|
if ( errorMsg != null )
|
|
|
|
|
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
|
errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, "computeCurrentAssignmentsStatsPreparedStatement", computeCurrentAssignmentsStatsPreparedStatement, con, sqle);
|
|
|
|
@ -137,7 +136,7 @@ public class UrlController {
|
|
|
|
|
getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery);
|
|
|
|
|
} catch (SQLException sqle) {
|
|
|
|
|
String errorMsg = dropCurrentAssignmentTable(con);
|
|
|
|
|
if ( errorMsg != null )
|
|
|
|
|
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
|
errorMsg = ImpalaConnector.handlePreparedStatementException("getAssignmentsQuery", getAssignmentsQuery, "getAssignmentsPreparedStatement", getAssignmentsPreparedStatement, con, sqle);
|
|
|
|
@ -153,6 +152,7 @@ public class UrlController {
|
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
|
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId;
|
|
|
|
|
logger.error(errorMsg);
|
|
|
|
|
ImpalaConnector.closeConnection(con);
|
|
|
|
|
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
|
|
|
|
|
}*/
|
|
|
|
|
|
|
|
|
@ -177,7 +177,7 @@ public class UrlController {
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
String errorMsg = dropCurrentAssignmentTable(con);
|
|
|
|
|
if ( errorMsg != null )
|
|
|
|
|
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
|
errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
|
|
|
|
@ -195,7 +195,7 @@ public class UrlController {
|
|
|
|
|
int assignmentsSize = assignments.size();
|
|
|
|
|
if ( assignmentsSize == 0 ) {
|
|
|
|
|
String errorMsg = dropCurrentAssignmentTable(con);
|
|
|
|
|
if ( errorMsg != null )
|
|
|
|
|
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
|
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId;
|
|
|
|
@ -219,7 +219,7 @@ public class UrlController {
|
|
|
|
|
insertAssignmentsPreparedStatement.execute();
|
|
|
|
|
} catch (SQLException sqle) {
|
|
|
|
|
String errorMsg = dropCurrentAssignmentTable(con);
|
|
|
|
|
if ( errorMsg != null )
|
|
|
|
|
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
|
errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, "insertAssignmentsPreparedStatement", insertAssignmentsPreparedStatement, con, sqle);
|
|
|
|
@ -234,7 +234,7 @@ public class UrlController {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String errorMsg = dropCurrentAssignmentTable(con);
|
|
|
|
|
if ( errorMsg != null )
|
|
|
|
|
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
|
|
|
|
|
|
|
|
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
|
|
|
|
@ -288,40 +288,40 @@ public class UrlController {
|
|
|
|
|
long curReportAssignments = workerReport.getAssignmentRequestCounter();
|
|
|
|
|
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
|
|
|
|
|
|
|
|
|
|
ImpalaConnector.databaseLock.lock();
|
|
|
|
|
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
|
|
|
|
|
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = FileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
|
|
|
|
|
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
|
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!");
|
|
|
|
|
}
|
|
|
|
|
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
|
|
|
|
|
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
|
|
|
|
|
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
|
|
|
|
|
FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ImpalaConnector.databaseLock.lock();
|
|
|
|
|
Connection con = ImpalaConnector.getInstance().getConnection();
|
|
|
|
|
if ( con == null ) {
|
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
|
|
|
|
|
if ( ! FileUtils.getAndUploadFullTexts(urlReports, con, request, curReportAssignments, curWorkerId) ) {
|
|
|
|
|
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
|
|
|
|
|
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
|
|
|
|
|
FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// The "databaseLock" was unlocked inside the "FileUtils.getAndUploadFullTexts" to avoid blocking the database while doing large irrelevant tasks like transferring files.
|
|
|
|
|
ImpalaConnector.databaseLock.lock();
|
|
|
|
|
|
|
|
|
|
// Store the workerReport into the database.
|
|
|
|
|
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, `date`, mimetype, size, `hash`, `location`, provenance) VALUES ";
|
|
|
|
|
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, `date`, status, error_class, error_message) VALUES ";
|
|
|
|
|
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
|
|
|
|
|
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
|
|
|
|
|
|
|
|
|
|
String tempInsertQueryName = null;
|
|
|
|
|
Statement insertPayloadStatement = null, insertAttemptStatement = null;
|
|
|
|
|
PreparedStatement preparedInsertPayloadStatement = null, preparedInsertAttemptStatement = null;
|
|
|
|
|
try {
|
|
|
|
|
tempInsertQueryName = "insertIntoPayloadBaseQuery";
|
|
|
|
|
insertPayloadStatement = con.createStatement();
|
|
|
|
|
preparedInsertPayloadStatement = con.prepareStatement(insertIntoPayloadBaseQuery);
|
|
|
|
|
tempInsertQueryName = "insertIntoAttemptBaseQuery";
|
|
|
|
|
insertAttemptStatement = con.createStatement();
|
|
|
|
|
preparedInsertAttemptStatement = con.prepareStatement(insertIntoAttemptBaseQuery);
|
|
|
|
|
} catch (SQLException sqle) {
|
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
|
String errorMsg = "Problem when creating the statement for \"" + tempInsertQueryName + "\"!\n";
|
|
|
|
|
String errorMsg = "Problem when creating the prepared statement for \"" + tempInsertQueryName + "\"!\n";
|
|
|
|
|
logger.error(errorMsg + sqle.getMessage());
|
|
|
|
|
closeStatements(insertPayloadStatement, insertAttemptStatement, con);
|
|
|
|
|
closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
|
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -331,7 +331,7 @@ public class UrlController {
|
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
|
String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!\n";
|
|
|
|
|
logger.error(errorMsg + sqle.getMessage());
|
|
|
|
|
closeStatements(insertPayloadStatement, insertAttemptStatement, con);
|
|
|
|
|
closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
|
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -350,20 +350,26 @@ public class UrlController {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
|
|
|
|
|
preparedInsertPayloadStatement.setString(1, payload.getId());
|
|
|
|
|
preparedInsertPayloadStatement.setString(2, payload.getOriginal_url());
|
|
|
|
|
preparedInsertPayloadStatement.setString(3, payload.getActual_url());
|
|
|
|
|
preparedInsertPayloadStatement.setTimestamp(4, payload.getTimestamp_acquired());
|
|
|
|
|
preparedInsertPayloadStatement.setString(5, payload.getMime_type());
|
|
|
|
|
|
|
|
|
|
// The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers.
|
|
|
|
|
String stringSize = null;
|
|
|
|
|
Long size = payload.getSize();
|
|
|
|
|
if ( size != null )
|
|
|
|
|
stringSize = String.valueOf(size);
|
|
|
|
|
|
|
|
|
|
String insertIntoPayloadFullQuery = insertIntoPayloadBaseQuery + "('" + payload.getId() + "','" + payload.getOriginal_url() + "','" + payload.getActual_url() + "','"
|
|
|
|
|
+ payload.getTimestamp_acquired() + "','" + payload.getMime_type() + "','" + stringSize + "','" + payload.getHash() + "','"
|
|
|
|
|
+ payload.getLocation() + "','" + payload.getProvenance() + "')";
|
|
|
|
|
|
|
|
|
|
insertPayloadStatement.execute(insertIntoPayloadFullQuery);
|
|
|
|
|
preparedInsertPayloadStatement.setString(6, stringSize);
|
|
|
|
|
preparedInsertPayloadStatement.setString(7, payload.getHash());
|
|
|
|
|
preparedInsertPayloadStatement.setString(8, payload.getLocation());
|
|
|
|
|
preparedInsertPayloadStatement.setString(9, payload.getProvenance());
|
|
|
|
|
preparedInsertPayloadStatement.executeUpdate();
|
|
|
|
|
} catch (SQLException sqle) {
|
|
|
|
|
logger.error("Problem when executing the \"insertIntoPayloadFullQuery\": " + sqle.getMessage() + "\n\n");
|
|
|
|
|
logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": " + sqle.getMessage() + "\n\n");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Error error = urlReport.getError();
|
|
|
|
@ -372,15 +378,14 @@ public class UrlController {
|
|
|
|
|
error = new Error(null, null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
String errorCause = error.getMessage();
|
|
|
|
|
if ( errorCause != null )
|
|
|
|
|
errorCause = StringUtils.replace(errorCause, "'", "\\'", -1); // Escape single quotes in the error-cause-message.
|
|
|
|
|
|
|
|
|
|
String insertIntoAttemptFullQuery = insertIntoAttemptBaseQuery + "('" + payload.getId() + "','" + payload.getOriginal_url() + "','"
|
|
|
|
|
+ payload.getTimestamp_acquired() + "','" + urlReport.getStatus().toString() + "','" + error.getType() + "','" + errorCause + "')";
|
|
|
|
|
|
|
|
|
|
insertAttemptStatement.execute(insertIntoAttemptFullQuery);
|
|
|
|
|
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
|
|
|
|
|
preparedInsertAttemptStatement.setString(1, payload.getId());
|
|
|
|
|
preparedInsertAttemptStatement.setString(2, payload.getOriginal_url());
|
|
|
|
|
preparedInsertAttemptStatement.setTimestamp(3, payload.getTimestamp_acquired());
|
|
|
|
|
preparedInsertAttemptStatement.setString(4, urlReport.getStatus().toString());
|
|
|
|
|
preparedInsertAttemptStatement.setString(5, String.valueOf(error.getType())); // This covers the case of "null".
|
|
|
|
|
preparedInsertAttemptStatement.setString(6, error.getMessage());
|
|
|
|
|
preparedInsertAttemptStatement.executeUpdate();
|
|
|
|
|
} catch (SQLException sqle) {
|
|
|
|
|
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle.getMessage() + "\n\n");
|
|
|
|
|
}
|
|
|
|
@ -390,11 +395,12 @@ public class UrlController {
|
|
|
|
|
con.commit(); // Commit all the insert-queries to the database (write them to disk).
|
|
|
|
|
} catch (SQLException sqle) {
|
|
|
|
|
ImpalaConnector.databaseLock.unlock();
|
|
|
|
|
ImpalaConnector.closeConnection(con);
|
|
|
|
|
String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!";
|
|
|
|
|
logger.error(errorMsg + "\n" + sqle.getMessage());
|
|
|
|
|
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
|
|
|
|
} finally {
|
|
|
|
|
closeStatements(insertPayloadStatement, insertAttemptStatement, null); // Do not close the connection here!
|
|
|
|
|
closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, null); // Do not close the connection here, as we might move forward.
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables.");
|
|
|
|
|