// UrlsController/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
package eu.openaire.urls_controller.util;
import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.UrlReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletRequest;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Types;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Component
public class FileUtils {
private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private S3ObjectStore s3ObjectStore;
@Autowired
private FileUnZipper fileUnZipper;
@Value("${services.pdfaggregation.controller.db.databaseName}")
private String databaseName;
public enum UploadFullTextsResponse {successful, unsuccessful, databaseError}
/**
* In each insertion, a new parquet-file is created, so we end up with millions of files. Parquet is great for fast SELECTs, so we have to stick with it and merge those files.
* This method creates a clone of the original table, in order to have only one parquet-file in the end, drops the original table and renames the clone to the original's name.
* Returns the errorMsg if an error appears, otherwise it returns "null".
* */
public String mergeParquetFiles(String tableName, String whereClause, String parameter) {
String errorMsg;
if ( (tableName == null) || tableName.isEmpty() ) {
errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!";
logger.error(errorMsg);
return errorMsg;
}
// Make sure the following are empty strings (in case another method calls this one in the future with a null value).
if ( whereClause == null )
whereClause = "";
if ( parameter == null )
parameter = "";
else
parameter = " '" + parameter + "'"; // This will be a "string-check", thus the single-quotes.
try {
jdbcTemplate.execute("CREATE TABLE " + databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + databaseName + "." + tableName + " " + whereClause + parameter);
jdbcTemplate.execute("DROP TABLE " + databaseName + "." + tableName + " PURGE");
jdbcTemplate.execute("ALTER TABLE " + databaseName + "." + tableName + "_tmp RENAME TO " + databaseName + "." + tableName);
jdbcTemplate.execute("COMPUTE STATS " + databaseName + "." + tableName);
} catch (DataAccessException e) {
errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n";
logger.error(errorMsg, e);
return errorMsg;
}
return null; // No errorMsg, everything is fine.
}
@Value("services.pdfaggregation.controller.baseTargetLocation")
private String baseTargetLocation;
public static DecimalFormat df = new DecimalFormat("0.00");
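// Note: "DecimalFormat" is not thread-safe; this shared instance is meant to be used by one thread at a time.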
private final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:()]+\\.[\\w]{2,10})$");
private final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
public UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId) {
// The Controller has to request the files from the Worker, in order to upload them to the S3.
// We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
if ( request == null ) {
logger.error("The \"HttpServletRequest\" is null!");
return UploadFullTextsResponse.unsuccessful;
}
String remoteAddr = request.getHeader("X-FORWARDED-FOR");
if ( remoteAddr == null || "".equals(remoteAddr) )
remoteAddr = request.getRemoteAddr();
// Get the file-locations.
int numFullTextsFound = 0;
int numFilesFoundFromPreviousAssignmentsBatches = 0;
int urlReportsSize = urlReports.size();
HashMultimap<String, Payload> allFileNamesWithPayloads = HashMultimap.create((urlReportsSize / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
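// The "expectedKeys" (urlReportsSize / 5) and "expectedValuesPerKey" (3) are rough estimates, since multiple payloads may share the same fileName(key).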
String getFileLocationForHashQuery = "select `location` from " + databaseName + ".payload where `hash` = ? limit 1" ;
final int[] hashArgType = new int[] {Types.VARCHAR};
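// The single query-argument is the file-hash, given as a VARCHAR.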
ImpalaConnector.databaseLock.lock();
for ( UrlReport urlReport : urlReports )
{
Payload payload = urlReport.getPayload();
if ( payload == null )
continue;
String fileLocation = payload.getLocation();
if ( fileLocation == null )
continue; // The full-text was not retrieved, go to the next UrlReport.
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
// If no result is returned, then this record was not handled before, so go ahead and add it to the list of files to request from the worker.
// If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker.
String fileHash = payload.getHash();
if ( fileHash != null ) {
String alreadyFoundFileLocation = null;
try {
alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[] {fileHash}, hashArgType, String.class);
} catch (EmptyResultDataAccessException erdae) {
// No fileLocation is found, it's ok. It will be null by default.
} catch (Exception e) {
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n", e);
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
// TODO - Since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
// TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not auto-closed here)and the Database connection.
// Unless we do what it is said above, do not continue to the next UrlReport, this query-exception should not disrupt the normal full-text processing.
}
if ( alreadyFoundFileLocation != null ) { // If the full-text of this record is already-found and uploaded.
payload.setLocation(alreadyFoundFileLocation); // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
//logger.debug("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches ++;
numFullTextsFound ++;
continue; // Do not request the file from the worker, it's already uploaded. Move on.
}
}
// Extract the "fileNameWithExtension" to be added in the HashMultimap.
Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation);
if ( ! matcher.matches() ) {
continue;
}
String fileNameWithExtension = matcher.group(1);
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
continue;
}
numFullTextsFound ++;
allFileNamesWithPayloads.put(fileNameWithExtension, payload); // Neither the keys nor the values are duplicated.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as pdf-url-2 might be the same as pdf-url-1; thus, the ID-2 file was not downloaded again.
}// end-for
ImpalaConnector.databaseLock.unlock(); // The remaining work of this function does not use the database.
logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextsFound + " (out of " + urlReportsSize + " | about " + df.format(numFullTextsFound * 100.0 / urlReportsSize) + "%).");
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet());
int numAllFullTexts = allFileNames.size();
if ( numAllFullTexts == 0 ) {
logger.warn("The retrieved files where < 0 > for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
return UploadFullTextsResponse.successful; // It was handled, no error.
}
// Request the full-texts in batches, compressed in zip.
int numOfBatches = (numAllFullTexts / numOfFullTextsPerBatch);
if ( (numAllFullTexts % numOfFullTextsPerBatch) > 0 ) // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
numOfBatches ++;
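// For example, 150 fullTexts with a batch-size of 70 give (150 / 70) = 2 batches, plus 1 extra batch for the remaining 10 files: 3 batches in total (70 + 70 + 10).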
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts. Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches.");
// Any full-texts left out by the integer division above are put into the extra (last) batch.
String baseUrl = "http://" + remoteAddr + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
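// The final request-url, completed inside "getConnection()" and "getRequestUrlForBatch()", has the form: <baseUrl><batchNum>/<fileName1>,<fileName2>,...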
String curAssignmentsBaseLocation = baseTargetLocation + "assignments_" + assignmentsBatchCounter + File.separator;
File curAssignmentsBaseDir = new File(curAssignmentsBaseLocation);
int failedBatches = 0;
for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter ) {
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter);
HttpURLConnection conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId);
if ( conn == null ) {
failedBatches ++;
continue; // To the next batch.
}
// Get the extracted files.
String targetDirectory = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator;
try {
// Create this batch-directory.
Path curBatchPath = Files.createDirectories(Paths.get(targetDirectory));
// Unzip the file. Iterate over the PDFs and upload each one of them and get the S3-Url
String zipFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".zip";
File zipFile = new File(zipFileFullPath);
if ( ! saveZipFile(conn, zipFile) ) {
failedBatches ++;
continue; // To the next batch.
}
//logger.debug("The zip file has been saved: " + zipFileFullPath); // DEBUG!
fileUnZipper.unzipFolder(Paths.get(zipFileFullPath), curBatchPath);
String[] fileNames = new File(targetDirectory).list();
if ( (fileNames == null) || (fileNames.length <= 1 ) ) { // The directory might have only one file, the "zip-file".
logger.error("No full-text fileNames where extracted from directory: " + targetDirectory);
failedBatches ++;
continue; // To the next batch.
}
// Iterate over the files and upload them to S3.
int numUploadedFiles = 0;
for ( String fileName : fileNames )
{
String fileFullPath = targetDirectory + fileName;
if ( fileFullPath.equals(zipFileFullPath) ) // Exclude the zip-file from uploading.
continue;
// Check if this stored file is related to one or more Payloads from the Set. Defend against malicious file injection. It does not add more overhead, since we already need the "fileRelatedPayloads".
Set<Payload> fileRelatedPayloads = allFileNamesWithPayloads.get(fileName);
if ( fileRelatedPayloads.isEmpty() ) { // In case the "fileName" is not inside the "allFileNamesWithPayloads" HashMultimap.
logger.error("The stored file \"" + fileName + "\" is not related to any Payload returned from the Worker!");
continue;
}
// Try to upload the file to S3 and update the related payloads: right away on success, or at the end of this method on failure.
try {
String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath);
setFullTextForMultiplePayloads(fileRelatedPayloads, s3Url);
numUploadedFiles ++;
} catch (Exception e) {
logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore, exception: " + e.getMessage(), e);
}
// Otherwise, at the end of this method, the record will have its file-data set to "null".
}
//logger.debug("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -1) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore.");
// (fileNames.length -1) --> minus the zip-file
} catch (Exception e) {
logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
failedBatches ++;
}
} // End of batches.
updateUrlReportsToHaveNoFullTextFiles(urlReports, true); // Make sure all records without an s3Url have < null > file-data (some batches or uploads might have failed).
deleteDirectory(curAssignmentsBaseDir);
if ( failedBatches == numOfBatches ) {
logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
return UploadFullTextsResponse.unsuccessful;
} else
return UploadFullTextsResponse.successful;
}
private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) {
baseUrl += batchNum + "/";
String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);
//logger.debug("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]");
HttpURLConnection conn = null;
try {
conn = (HttpURLConnection) new URL(requestUrl).openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty("User-Agent", "UrlsController");
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode != 200 ) {
logger.warn("HTTP-" + statusCode + ": " + getErrorMessageFromResponseBody(conn) + "\nProblem when requesting the ZipFile of batch_" + batchNum + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl);
return null;
}
} catch (Exception e) {
logger.warn("Problem when requesting the ZipFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + "\n" + e.getMessage());
return null;
}
return conn;
}
private String getErrorMessageFromResponseBody(HttpURLConnection conn) {
InputStream errorStream = conn.getErrorStream();
if ( errorStream == null )
return null; // The worker sent no error-body; avoid an NPE from "InputStreamReader".
StringBuilder errorMsgStrB = new StringBuilder(500);
try ( BufferedReader br = new BufferedReader(new InputStreamReader(errorStream)) ) { // Try-with-resources
String inputLine;
while ( (inputLine = br.readLine()) != null )
{
if ( !inputLine.isEmpty() )
errorMsgStrB.append(inputLine);
}
return (errorMsgStrB.length() != 0) ? errorMsgStrB.toString() : null; // Make sure we return a "null" on empty string, to better handle the case in the caller-function.
} catch ( IOException ioe ) {
logger.error("IOException when retrieving the error-message: " + ioe.getMessage());
return null;
} catch ( Exception e ) {
logger.error("Could not extract the errorMessage!", e);
return null;
}
}
private List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch) {
int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch);
int endingIndex = (curBatch * numOfFullTextsPerBatch);
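// For example, with "numOfFullTextsPerBatch" = 70, batch 1 covers the indexes [0, 70), batch 2 covers [70, 140), and so on.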
if ( endingIndex > numAllFullTexts ) // This might be the case, when the "numAllFullTexts" is too small.
endingIndex = numAllFullTexts;
List<String> fileNamesOfCurBatch = new ArrayList<>(numOfFullTextsPerBatch);
for ( int i = initialIndex; i < endingIndex; ++i ) {
try {
fileNamesOfCurBatch.add(allFileNames.get(i));
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for i=" + i + "\n" + ioobe.getMessage(), ioobe);
}
}
return fileNamesOfCurBatch;
}
private String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch) {
final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);
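// The StringBuilder is pre-sized assuming an average of 50 characters per fileName (including the separator).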
sb.append(baseUrl);
int numFullTextsCurBatch = fileNamesForCurBatch.size();
for ( int j=0; j < numFullTextsCurBatch; ++j ){
sb.append(fileNamesForCurBatch.get(j));
if ( j < (numFullTextsCurBatch -1) )
sb.append(",");
}
return sb.toString();
}
private final int bufferSize = 20971520; // 20 MB
public boolean saveZipFile(HttpURLConnection conn, File zipFile) {
// Use try-with-resources, so both streams are auto-closed, even in case of an exception.
try ( InputStream inStream = conn.getInputStream();
FileOutputStream outStream = new FileOutputStream(zipFile) ) {
byte[] byteBuffer = new byte[bufferSize]; // 20 MB
int bytesRead;
while ( (bytesRead = inStream.read(byteBuffer, 0, bufferSize)) != -1 ) {
outStream.write(byteBuffer, 0, bytesRead);
}
return true;
} catch (Exception e) {
logger.error("Could not save the zip file \"" + zipFile.getName() + "\": " + e.getMessage(), e);
return false;
}
}
/**
* This method updates the UrlReports to not point to any downloaded fullText files.
* This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails, and we don't want any "links" to locally stored files, which will be deleted.
* If the "shouldCheckAndKeepS3UploadedFiles" is set to "true", then the payloads which have their file uploaded to the S3-ObjectStore, are excluded.
* @param urlReports the list of UrlReports to update
* @param shouldCheckAndKeepS3UploadedFiles whether to keep the file-data of the payloads whose files were already uploaded to the S3-ObjectStore
*/
public void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports, boolean shouldCheckAndKeepS3UploadedFiles) {
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null )
continue;
if ( shouldCheckAndKeepS3UploadedFiles ) {
String fileLocation = payload.getLocation();
if ( (fileLocation == null) || s3ObjectStore.isLocationInStore(fileLocation) )
continue;
}
// Mark this full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text will be kept.
payload.setLocation(null);
payload.setHash(null);
payload.setMime_type(null);
payload.setSize(null);
}
}
/**
* Set the fileLocation for all those Payloads related to the File.
* @param filePayloads the payloads related to the given file
* @param s3Url the S3-location of the uploaded file
*/
public void setFullTextForMultiplePayloads(Set<Payload> filePayloads, String s3Url) {
for ( Payload payload : filePayloads )
if ( payload != null )
payload.setLocation(s3Url); // Update the file-location to the new S3-url. All the other file-data is already set from the Worker.
}
public boolean deleteDirectory(File curBatchDir) {
try {
org.apache.commons.io.FileUtils.deleteDirectory(curBatchDir);
return true;
} catch (IOException e) {
logger.error("The following directory could not be deleted: " + curBatchDir.getName(), e);
return false;
}
}
}