Compare commits

...

2 Commits

  1. 90
      src/main/java/eu/openaire/urls_controller/controllers/UrlController.java
  2. 32
      src/main/java/eu/openaire/urls_controller/util/FileUtils.java

90
src/main/java/eu/openaire/urls_controller/controllers/UrlController.java

@ -9,7 +9,6 @@ import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.ControllerConstants;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
@ -118,7 +117,7 @@ public class UrlController {
computeCurrentAssignmentsStatsPreparedStatement.execute();
} catch (SQLException sqle) {
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null )
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
ImpalaConnector.databaseLock.unlock();
errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, "computeCurrentAssignmentsStatsPreparedStatement", computeCurrentAssignmentsStatsPreparedStatement, con, sqle);
@ -137,7 +136,7 @@ public class UrlController {
getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery);
} catch (SQLException sqle) {
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null )
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
ImpalaConnector.databaseLock.unlock();
errorMsg = ImpalaConnector.handlePreparedStatementException("getAssignmentsQuery", getAssignmentsQuery, "getAssignmentsPreparedStatement", getAssignmentsPreparedStatement, con, sqle);
@ -153,6 +152,7 @@ public class UrlController {
ImpalaConnector.databaseLock.unlock();
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId;
logger.error(errorMsg);
ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
}*/
@ -177,7 +177,7 @@ public class UrlController {
}
} catch (Exception e) {
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null )
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
ImpalaConnector.databaseLock.unlock();
errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
@ -195,7 +195,7 @@ public class UrlController {
int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 ) {
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null )
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
ImpalaConnector.databaseLock.unlock();
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId;
@ -219,7 +219,7 @@ public class UrlController {
insertAssignmentsPreparedStatement.execute();
} catch (SQLException sqle) {
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null )
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
ImpalaConnector.databaseLock.unlock();
errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, "insertAssignmentsPreparedStatement", insertAssignmentsPreparedStatement, con, sqle);
@ -234,7 +234,7 @@ public class UrlController {
}
String errorMsg = dropCurrentAssignmentTable(con);
if ( errorMsg != null )
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
@ -288,40 +288,40 @@ public class UrlController {
long curReportAssignments = workerReport.getAssignmentRequestCounter();
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
ImpalaConnector.databaseLock.lock();
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
}
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
if ( ! FileUtils.getAndUploadFullTexts(urlReports, con, request, curReportAssignments, curWorkerId) ) {
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = FileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!");
}
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
}
// The "databaseLock" was unlocked inside the "FileUtils.getAndUploadFullTexts" to avoid blocking the database while doing large irrelevant tasks like transferring files.
ImpalaConnector.databaseLock.lock();
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
}
// Store the workerReport into the database.
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, `date`, mimetype, size, `hash`, `location`, provenance) VALUES ";
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, `date`, status, error_class, error_message) VALUES ";
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
String tempInsertQueryName = null;
Statement insertPayloadStatement = null, insertAttemptStatement = null;
PreparedStatement preparedInsertPayloadStatement = null, preparedInsertAttemptStatement = null;
try {
tempInsertQueryName = "insertIntoPayloadBaseQuery";
insertPayloadStatement = con.createStatement();
preparedInsertPayloadStatement = con.prepareStatement(insertIntoPayloadBaseQuery);
tempInsertQueryName = "insertIntoAttemptBaseQuery";
insertAttemptStatement = con.createStatement();
preparedInsertAttemptStatement = con.prepareStatement(insertIntoAttemptBaseQuery);
} catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock();
String errorMsg = "Problem when creating the statement for \"" + tempInsertQueryName + "\"!\n";
String errorMsg = "Problem when creating the prepared statement for \"" + tempInsertQueryName + "\"!\n";
logger.error(errorMsg + sqle.getMessage());
closeStatements(insertPayloadStatement, insertAttemptStatement, con);
closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
@ -331,7 +331,7 @@ public class UrlController {
ImpalaConnector.databaseLock.unlock();
String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!\n";
logger.error(errorMsg + sqle.getMessage());
closeStatements(insertPayloadStatement, insertAttemptStatement, con);
closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
@ -350,20 +350,26 @@ public class UrlController {
continue;
}
try {
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
preparedInsertPayloadStatement.setString(1, payload.getId());
preparedInsertPayloadStatement.setString(2, payload.getOriginal_url());
preparedInsertPayloadStatement.setString(3, payload.getActual_url());
preparedInsertPayloadStatement.setTimestamp(4, payload.getTimestamp_acquired());
preparedInsertPayloadStatement.setString(5, payload.getMime_type());
// The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers.
String stringSize = null;
Long size = payload.getSize();
if ( size != null )
stringSize = String.valueOf(size);
String insertIntoPayloadFullQuery = insertIntoPayloadBaseQuery + "('" + payload.getId() + "','" + payload.getOriginal_url() + "','" + payload.getActual_url() + "','"
+ payload.getTimestamp_acquired() + "','" + payload.getMime_type() + "','" + stringSize + "','" + payload.getHash() + "','"
+ payload.getLocation() + "','" + payload.getProvenance() + "')";
insertPayloadStatement.execute(insertIntoPayloadFullQuery);
preparedInsertPayloadStatement.setString(6, stringSize);
preparedInsertPayloadStatement.setString(7, payload.getHash());
preparedInsertPayloadStatement.setString(8, payload.getLocation());
preparedInsertPayloadStatement.setString(9, payload.getProvenance());
preparedInsertPayloadStatement.executeUpdate();
} catch (SQLException sqle) {
logger.error("Problem when executing the \"insertIntoPayloadFullQuery\": " + sqle.getMessage() + "\n\n");
logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": " + sqle.getMessage() + "\n\n");
}
Error error = urlReport.getError();
@ -372,15 +378,14 @@ public class UrlController {
error = new Error(null, null);
}
try {
String errorCause = error.getMessage();
if ( errorCause != null )
errorCause = StringUtils.replace(errorCause, "'", "\\'", -1); // Escape single quotes in the error-cause-message.
String insertIntoAttemptFullQuery = insertIntoAttemptBaseQuery + "('" + payload.getId() + "','" + payload.getOriginal_url() + "','"
+ payload.getTimestamp_acquired() + "','" + urlReport.getStatus().toString() + "','" + error.getType() + "','" + errorCause + "')";
insertAttemptStatement.execute(insertIntoAttemptFullQuery);
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
preparedInsertAttemptStatement.setString(1, payload.getId());
preparedInsertAttemptStatement.setString(2, payload.getOriginal_url());
preparedInsertAttemptStatement.setTimestamp(3, payload.getTimestamp_acquired());
preparedInsertAttemptStatement.setString(4, urlReport.getStatus().toString());
preparedInsertAttemptStatement.setString(5, String.valueOf(error.getType())); // This covers the case of "null".
preparedInsertAttemptStatement.setString(6, error.getMessage());
preparedInsertAttemptStatement.executeUpdate();
} catch (SQLException sqle) {
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle.getMessage() + "\n\n");
}
@ -390,11 +395,12 @@ public class UrlController {
con.commit(); // Commit all the insert-queries to the database (write them to disk).
} catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock();
ImpalaConnector.closeConnection(con);
String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!";
logger.error(errorMsg + "\n" + sqle.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
closeStatements(insertPayloadStatement, insertAttemptStatement, null); // Do not close the connection here!
closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, null); // Do not close the connection here, as we might move forward.
}
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables.");

32
src/main/java/eu/openaire/urls_controller/util/FileUtils.java

@ -106,25 +106,33 @@ public class FileUtils {
}
public enum UploadFullTextsResponse {successful, unsuccessful, databaseError};
private static final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$");
private static final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$");
public static final String baseTargetLocation = System.getProperty("user.dir") + File.separator + "fullTexts" + File.separator;
private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
public static boolean getAndUploadFullTexts(List<UrlReport> urlReports, Connection con, HttpServletRequest request, long assignmentsBatchCounter, String workerId)
public static UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId)
{
// The Controller have to request the files from the Worker, in order to upload them to the S3.
// We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
if ( request == null ) {
logger.error("The \"HttpServletRequest\" is null!");
ImpalaConnector.databaseLock.unlock();
return false;
return UploadFullTextsResponse.unsuccessful;
}
String remoteAddr = request.getHeader("X-FORWARDED-FOR");
if ( remoteAddr == null || "".equals(remoteAddr) )
remoteAddr = request.getRemoteAddr();
ImpalaConnector.databaseLock.lock();
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null ) {
ImpalaConnector.databaseLock.unlock();
logger.error("Problem when creating the Impala-connection!");
return UploadFullTextsResponse.databaseError;
}
String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ?" ;
PreparedStatement getFileLocationForHashPreparedStatement = null;
try {
@ -132,7 +140,7 @@ public class FileUtils {
} catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock();
logger.error("Problem when creating the prepared statement for \"" + getFileLocationForHashQuery + "\"!\n" + sqle.getMessage());
return false;
return UploadFullTextsResponse.databaseError;
}
// Get the file-locations.
@ -177,6 +185,11 @@ public class FileUtils {
}
} catch (Exception e) {
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n" + e.getMessage());
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
// TODO - Since the database will have problems.. there is not point in trying to insert the payloads to Impala (handling it like we tried to insert and got an error).
// TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not autoclosed here)and the Database connection.
}
}
@ -205,6 +218,7 @@ public class FileUtils {
logger.error("Failed to close the \"getFileLocationForHashPreparedStatement\"!\n" + sqle.getMessage());
} finally {
ImpalaConnector.databaseLock.unlock(); // The rest work of this function does not use the database.
ImpalaConnector.closeConnection(con);
}
logger.info("NumFullTextUrlsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextUrlsFound + " (out of " + urlReports.size() + ").");
@ -213,8 +227,8 @@ public class FileUtils {
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithIDsHashMap.keySet());
int numAllFullTexts = allFileNames.size();
if ( numAllFullTexts == 0 ) {
logger.warn("The file retrieved by the Worker where < 0 > for assignments_" + assignmentsBatchCounter);
return true; // It was handled, no error.
logger.warn("The retrieved files where < 0 > for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
return UploadFullTextsResponse.successful; // It was handled, no error.
}
// Request the full-texts in batches, compressed in zip.
@ -322,10 +336,10 @@ public class FileUtils {
// Check if none of the batches were handled..
if ( failedBatches == numOfBatches ) {
logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
return false;
return UploadFullTextsResponse.unsuccessful;
} else {
replaceNotUploadedFileLocations(urlReports); // Make sure all records without an s3Url have null file-data.
return true;
replaceNotUploadedFileLocations(urlReports); // Make sure all records without an s3Url have < null > file-data.
return UploadFullTextsResponse.successful;
}
}

Loading…
Cancel
Save