Compare commits
No commits in common. "83f40a23d94a0f414a886dbff82a16c846fc6c52" and "82bf11b9b39a37f333c4e19b8b805d5d77277d40" have entirely different histories.
83f40a23d9
...
82bf11b9b3
|
@ -9,6 +9,7 @@ import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
|
|||
import eu.openaire.urls_controller.util.ControllerConstants;
|
||||
import eu.openaire.urls_controller.util.FileUtils;
|
||||
import eu.openaire.urls_controller.util.GenericUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.HttpStatus;
|
||||
|
@ -117,7 +118,7 @@ public class UrlController {
|
|||
computeCurrentAssignmentsStatsPreparedStatement.execute();
|
||||
} catch (SQLException sqle) {
|
||||
String errorMsg = dropCurrentAssignmentTable(con);
|
||||
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
||||
if ( errorMsg != null )
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, "computeCurrentAssignmentsStatsPreparedStatement", computeCurrentAssignmentsStatsPreparedStatement, con, sqle);
|
||||
|
@ -136,7 +137,7 @@ public class UrlController {
|
|||
getAssignmentsPreparedStatement = con.prepareStatement(getAssignmentsQuery);
|
||||
} catch (SQLException sqle) {
|
||||
String errorMsg = dropCurrentAssignmentTable(con);
|
||||
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
||||
if ( errorMsg != null )
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
errorMsg = ImpalaConnector.handlePreparedStatementException("getAssignmentsQuery", getAssignmentsQuery, "getAssignmentsPreparedStatement", getAssignmentsPreparedStatement, con, sqle);
|
||||
|
@ -152,7 +153,6 @@ public class UrlController {
|
|||
ImpalaConnector.databaseLock.unlock();
|
||||
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId;
|
||||
logger.error(errorMsg);
|
||||
ImpalaConnector.closeConnection(con);
|
||||
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
|
||||
}*/
|
||||
|
||||
|
@ -177,7 +177,7 @@ public class UrlController {
|
|||
}
|
||||
} catch (Exception e) {
|
||||
String errorMsg = dropCurrentAssignmentTable(con);
|
||||
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
||||
if ( errorMsg != null )
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
|
||||
|
@ -195,7 +195,7 @@ public class UrlController {
|
|||
int assignmentsSize = assignments.size();
|
||||
if ( assignmentsSize == 0 ) {
|
||||
String errorMsg = dropCurrentAssignmentTable(con);
|
||||
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
||||
if ( errorMsg != null )
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId;
|
||||
|
@ -219,7 +219,7 @@ public class UrlController {
|
|||
insertAssignmentsPreparedStatement.execute();
|
||||
} catch (SQLException sqle) {
|
||||
String errorMsg = dropCurrentAssignmentTable(con);
|
||||
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
||||
if ( errorMsg != null )
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, "insertAssignmentsPreparedStatement", insertAssignmentsPreparedStatement, con, sqle);
|
||||
|
@ -234,7 +234,7 @@ public class UrlController {
|
|||
}
|
||||
|
||||
String errorMsg = dropCurrentAssignmentTable(con);
|
||||
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
|
||||
if ( errorMsg != null )
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
|
||||
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
|
||||
|
@ -288,40 +288,40 @@ public class UrlController {
|
|||
long curReportAssignments = workerReport.getAssignmentRequestCounter();
|
||||
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
|
||||
|
||||
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
|
||||
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = FileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId);
|
||||
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem with the Impala-database!");
|
||||
}
|
||||
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
|
||||
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
|
||||
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
|
||||
FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
|
||||
}
|
||||
|
||||
ImpalaConnector.databaseLock.lock();
|
||||
|
||||
Connection con = ImpalaConnector.getInstance().getConnection();
|
||||
if ( con == null ) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
|
||||
}
|
||||
|
||||
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
|
||||
if ( ! FileUtils.getAndUploadFullTexts(urlReports, con, request, curReportAssignments, curWorkerId) ) {
|
||||
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
|
||||
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
|
||||
FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
|
||||
}
|
||||
|
||||
// The "databaseLock" was unlocked inside the "FileUtils.getAndUploadFullTexts" to avoid blocking the database while doing large irrelevant tasks like transferring files.
|
||||
ImpalaConnector.databaseLock.lock();
|
||||
|
||||
// Store the workerReport into the database.
|
||||
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
|
||||
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
|
||||
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, `date`, mimetype, size, `hash`, `location`, provenance) VALUES ";
|
||||
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, `date`, status, error_class, error_message) VALUES ";
|
||||
|
||||
String tempInsertQueryName = null;
|
||||
PreparedStatement preparedInsertPayloadStatement = null, preparedInsertAttemptStatement = null;
|
||||
Statement insertPayloadStatement = null, insertAttemptStatement = null;
|
||||
try {
|
||||
tempInsertQueryName = "insertIntoPayloadBaseQuery";
|
||||
preparedInsertPayloadStatement = con.prepareStatement(insertIntoPayloadBaseQuery);
|
||||
insertPayloadStatement = con.createStatement();
|
||||
tempInsertQueryName = "insertIntoAttemptBaseQuery";
|
||||
preparedInsertAttemptStatement = con.prepareStatement(insertIntoAttemptBaseQuery);
|
||||
insertAttemptStatement = con.createStatement();
|
||||
} catch (SQLException sqle) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
String errorMsg = "Problem when creating the prepared statement for \"" + tempInsertQueryName + "\"!\n";
|
||||
String errorMsg = "Problem when creating the statement for \"" + tempInsertQueryName + "\"!\n";
|
||||
logger.error(errorMsg + sqle.getMessage());
|
||||
closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
|
||||
closeStatements(insertPayloadStatement, insertAttemptStatement, con);
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
}
|
||||
|
||||
|
@ -331,7 +331,7 @@ public class UrlController {
|
|||
ImpalaConnector.databaseLock.unlock();
|
||||
String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!\n";
|
||||
logger.error(errorMsg + sqle.getMessage());
|
||||
closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
|
||||
closeStatements(insertPayloadStatement, insertAttemptStatement, con);
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
}
|
||||
|
||||
|
@ -350,26 +350,20 @@ public class UrlController {
|
|||
continue;
|
||||
}
|
||||
|
||||
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
|
||||
preparedInsertPayloadStatement.setString(1, payload.getId());
|
||||
preparedInsertPayloadStatement.setString(2, payload.getOriginal_url());
|
||||
preparedInsertPayloadStatement.setString(3, payload.getActual_url());
|
||||
preparedInsertPayloadStatement.setTimestamp(4, payload.getTimestamp_acquired());
|
||||
preparedInsertPayloadStatement.setString(5, payload.getMime_type());
|
||||
|
||||
try {
|
||||
// The column "size" in the table is of type "String" so we cast the Long to String. The Parquet-format in the database does not work well with integers.
|
||||
String stringSize = null;
|
||||
Long size = payload.getSize();
|
||||
if ( size != null )
|
||||
stringSize = String.valueOf(size);
|
||||
|
||||
preparedInsertPayloadStatement.setString(6, stringSize);
|
||||
preparedInsertPayloadStatement.setString(7, payload.getHash());
|
||||
preparedInsertPayloadStatement.setString(8, payload.getLocation());
|
||||
preparedInsertPayloadStatement.setString(9, payload.getProvenance());
|
||||
preparedInsertPayloadStatement.executeUpdate();
|
||||
String insertIntoPayloadFullQuery = insertIntoPayloadBaseQuery + "('" + payload.getId() + "','" + payload.getOriginal_url() + "','" + payload.getActual_url() + "','"
|
||||
+ payload.getTimestamp_acquired() + "','" + payload.getMime_type() + "','" + stringSize + "','" + payload.getHash() + "','"
|
||||
+ payload.getLocation() + "','" + payload.getProvenance() + "')";
|
||||
|
||||
insertPayloadStatement.execute(insertIntoPayloadFullQuery);
|
||||
} catch (SQLException sqle) {
|
||||
logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": " + sqle.getMessage() + "\n\n");
|
||||
logger.error("Problem when executing the \"insertIntoPayloadFullQuery\": " + sqle.getMessage() + "\n\n");
|
||||
}
|
||||
|
||||
Error error = urlReport.getError();
|
||||
|
@ -378,14 +372,15 @@ public class UrlController {
|
|||
error = new Error(null, null);
|
||||
}
|
||||
|
||||
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
|
||||
preparedInsertAttemptStatement.setString(1, payload.getId());
|
||||
preparedInsertAttemptStatement.setString(2, payload.getOriginal_url());
|
||||
preparedInsertAttemptStatement.setTimestamp(3, payload.getTimestamp_acquired());
|
||||
preparedInsertAttemptStatement.setString(4, urlReport.getStatus().toString());
|
||||
preparedInsertAttemptStatement.setString(5, String.valueOf(error.getType())); // This covers the case of "null".
|
||||
preparedInsertAttemptStatement.setString(6, error.getMessage());
|
||||
preparedInsertAttemptStatement.executeUpdate();
|
||||
try {
|
||||
String errorCause = error.getMessage();
|
||||
if ( errorCause != null )
|
||||
errorCause = StringUtils.replace(errorCause, "'", "\\'", -1); // Escape single quotes in the error-cause-message.
|
||||
|
||||
String insertIntoAttemptFullQuery = insertIntoAttemptBaseQuery + "('" + payload.getId() + "','" + payload.getOriginal_url() + "','"
|
||||
+ payload.getTimestamp_acquired() + "','" + urlReport.getStatus().toString() + "','" + error.getType() + "','" + errorCause + "')";
|
||||
|
||||
insertAttemptStatement.execute(insertIntoAttemptFullQuery);
|
||||
} catch (SQLException sqle) {
|
||||
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle.getMessage() + "\n\n");
|
||||
}
|
||||
|
@ -395,12 +390,11 @@ public class UrlController {
|
|||
con.commit(); // Commit all the insert-queries to the database (write them to disk).
|
||||
} catch (SQLException sqle) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
ImpalaConnector.closeConnection(con);
|
||||
String errorMsg = "Problem when committing changes to the database or when setting Connection.AutoCommit to \"true\"!";
|
||||
logger.error(errorMsg + "\n" + sqle.getMessage());
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
|
||||
} finally {
|
||||
closeStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, null); // Do not close the connection here, as we might move forward.
|
||||
closeStatements(insertPayloadStatement, insertAttemptStatement, null); // Do not close the connection here!
|
||||
}
|
||||
|
||||
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables. Going to merge the parquet files for those tables.");
|
||||
|
|
|
@ -106,33 +106,25 @@ public class FileUtils {
|
|||
}
|
||||
|
||||
|
||||
public enum UploadFullTextsResponse {successful, unsuccessful, databaseError};
|
||||
private static final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$");
|
||||
private static final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$");
|
||||
public static final String baseTargetLocation = System.getProperty("user.dir") + File.separator + "fullTexts" + File.separator;
|
||||
private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
|
||||
|
||||
public static UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId)
|
||||
public static boolean getAndUploadFullTexts(List<UrlReport> urlReports, Connection con, HttpServletRequest request, long assignmentsBatchCounter, String workerId)
|
||||
{
|
||||
// The Controller have to request the files from the Worker, in order to upload them to the S3.
|
||||
// We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
|
||||
|
||||
if ( request == null ) {
|
||||
logger.error("The \"HttpServletRequest\" is null!");
|
||||
return UploadFullTextsResponse.unsuccessful;
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
return false;
|
||||
}
|
||||
String remoteAddr = request.getHeader("X-FORWARDED-FOR");
|
||||
if ( remoteAddr == null || "".equals(remoteAddr) )
|
||||
remoteAddr = request.getRemoteAddr();
|
||||
|
||||
ImpalaConnector.databaseLock.lock();
|
||||
Connection con = ImpalaConnector.getInstance().getConnection();
|
||||
if ( con == null ) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
logger.error("Problem when creating the Impala-connection!");
|
||||
return UploadFullTextsResponse.databaseError;
|
||||
}
|
||||
|
||||
String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ?" ;
|
||||
PreparedStatement getFileLocationForHashPreparedStatement = null;
|
||||
try {
|
||||
|
@ -140,7 +132,7 @@ public class FileUtils {
|
|||
} catch (SQLException sqle) {
|
||||
ImpalaConnector.databaseLock.unlock();
|
||||
logger.error("Problem when creating the prepared statement for \"" + getFileLocationForHashQuery + "\"!\n" + sqle.getMessage());
|
||||
return UploadFullTextsResponse.databaseError;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Get the file-locations.
|
||||
|
@ -185,11 +177,6 @@ public class FileUtils {
|
|||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n" + e.getMessage());
|
||||
|
||||
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
|
||||
// TODO - Since the database will have problems.. there is not point in trying to insert the payloads to Impala (handling it like we tried to insert and got an error).
|
||||
// TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not autoclosed here)and the Database connection.
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -218,7 +205,6 @@ public class FileUtils {
|
|||
logger.error("Failed to close the \"getFileLocationForHashPreparedStatement\"!\n" + sqle.getMessage());
|
||||
} finally {
|
||||
ImpalaConnector.databaseLock.unlock(); // The rest work of this function does not use the database.
|
||||
ImpalaConnector.closeConnection(con);
|
||||
}
|
||||
|
||||
logger.info("NumFullTextUrlsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextUrlsFound + " (out of " + urlReports.size() + ").");
|
||||
|
@ -227,8 +213,8 @@ public class FileUtils {
|
|||
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithIDsHashMap.keySet());
|
||||
int numAllFullTexts = allFileNames.size();
|
||||
if ( numAllFullTexts == 0 ) {
|
||||
logger.warn("The retrieved files where < 0 > for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
|
||||
return UploadFullTextsResponse.successful; // It was handled, no error.
|
||||
logger.warn("The file retrieved by the Worker where < 0 > for assignments_" + assignmentsBatchCounter);
|
||||
return true; // It was handled, no error.
|
||||
}
|
||||
|
||||
// Request the full-texts in batches, compressed in zip.
|
||||
|
@ -336,10 +322,10 @@ public class FileUtils {
|
|||
// Check if none of the batches were handled..
|
||||
if ( failedBatches == numOfBatches ) {
|
||||
logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
|
||||
return UploadFullTextsResponse.unsuccessful;
|
||||
return false;
|
||||
} else {
|
||||
replaceNotUploadedFileLocations(urlReports); // Make sure all records without an s3Url have < null > file-data.
|
||||
return UploadFullTextsResponse.successful;
|
||||
replaceNotUploadedFileLocations(urlReports); // Make sure all records without an s3Url have null file-data.
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue