UrlsController/src/main/java/eu/openaire/urls_controller/util/FileUtils.java


package eu.openaire.urls_controller.util;
import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.Task;
import eu.openaire.urls_controller.models.UrlReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.configurationprocessor.json.JSONException;
import org.springframework.boot.configurationprocessor.json.JSONObject;
import javax.servlet.http.HttpServletRequest;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.*;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class FileUtils {
private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);
public static ThreadLocal<Scanner> inputScanner = new ThreadLocal<Scanner>(); // Every Thread has its own variable.
private static final ThreadLocal<Integer> fileIndex = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> unretrievableInputLines = new ThreadLocal<Integer>();
public static ThreadLocal<Integer> duplicateIdUrlEntries = new ThreadLocal<Integer>();
public static final int jsonBatchSize = 3000;
private static final String utf8Charset = "UTF-8";
public static String inputFileFullPath;
private static final String workingDir = System.getProperty("user.dir") + File.separator;
public FileUtils() throws RuntimeException
{
inputFileFullPath = workingDir + "src" + File.separator + "main" + File.separator + "resources";
String resourceFileName = "testInputFiles" + File.separator + "orderedList1000.json";
inputFileFullPath += File.separator + resourceFileName;
InputStream inputStream = getClass().getClassLoader().getResourceAsStream(resourceFileName);
if ( inputStream == null )
throw new RuntimeException("No resourceFile was found with name \"" + resourceFileName + "\".");
logger.debug("Going to retrieve the data from the inputResourceFile: " + resourceFileName);
FileUtils.inputScanner.set(new Scanner(inputStream, utf8Charset));
fileIndex.set(0); // Re-initialize the file-number-pointer.
unretrievableInputLines.set(0);
duplicateIdUrlEntries.set(0);
}
/**
* In each insertion, a new parquet-file is created, so we end up with millions of files. Parquet is great for fast-select, so we have to stick with it and merge those files.
* This method creates a clone of the original table, in order to end up with only one parquet file, drops the original table
* and renames the clone to the original's name.
* Returns the errorMsg if an error appears, otherwise it returns "null".
* */
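// Illustrative sketch of the queries executed below, assuming a hypothetical database "mydb" and tableName "payload":
//   CREATE TABLE mydb.payload_tmp stored as parquet AS SELECT * FROM mydb.payload <whereClause> <parameter>
//   DROP TABLE mydb.payload PURGE
//   ALTER TABLE mydb.payload_tmp RENAME TO mydb.payload
//   COMPUTE STATS mydb.payload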
public static String mergeParquetFiles(String tableName, Connection con, String whereClause, String parameter)
{
String errorMsg;
if ( tableName == null ) {
errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!";
logger.error(errorMsg);
return errorMsg;
}
// Make sure the following are empty strings (in case another method calls this one in the future with a null-value).
if ( whereClause == null )
whereClause = "";
if ( parameter == null )
parameter = "";
else
parameter = " '" + parameter + "'"; // This will be a "string-check".
Statement statement;
try {
statement = con.createStatement();
} catch (SQLException sqle) {
errorMsg = "Problem when creating a connection-statement!\n";
logger.error(errorMsg + sqle.getMessage());
return errorMsg;
}
try {
statement.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName + " " + whereClause + parameter);
statement.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE");
statement.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName);
statement.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName);
} catch (SQLException sqle) {
errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n";
logger.error(errorMsg + getCutBatchExceptionMessage(sqle.getMessage()), sqle);
return errorMsg;
} finally {
// Make sure we close the statement.
try { statement.close(); }
catch (SQLException sqle3) { logger.error("Could not close the statement for executing queries in the Impala-database.\n" + sqle3); }
}
return null; // No errorMsg, everything is fine.
}
public enum UploadFullTextsResponse {successful, unsuccessful, databaseError};
private static final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$");
private static final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$");
public static final String baseTargetLocation = System.getProperty("user.dir") + File.separator + "fullTexts" + File.separator;
private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
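/**
* Requests the full-text files of the given assignments-batch from the Worker (in smaller zip-batches), uploads them to the S3-ObjectStore
* and updates the "location" of the related payloads inside the given UrlReports.
* Files whose hash already exists in the "payload" table are not re-requested; their payloads get the already-uploaded S3-location instead.
* */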
public static UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId)
{
// The Controller has to request the files from the Worker, in order to upload them to S3.
// We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
if ( request == null ) {
logger.error("The \"HttpServletRequest\" is null!");
return UploadFullTextsResponse.unsuccessful;
}
String remoteAddr = request.getHeader("X-FORWARDED-FOR");
if ( remoteAddr == null || "".equals(remoteAddr) )
remoteAddr = request.getRemoteAddr();
ImpalaConnector.databaseLock.lock();
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null ) {
ImpalaConnector.databaseLock.unlock();
logger.error("Problem when creating the Impala-connection!");
return UploadFullTextsResponse.databaseError;
}
String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ?" ;
PreparedStatement getFileLocationForHashPreparedStatement = null;
try {
getFileLocationForHashPreparedStatement = con.prepareStatement(getFileLocationForHashQuery);
} catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock();
logger.error("Problem when creating the prepared statement for \"" + getFileLocationForHashQuery + "\"!\n" + sqle.getMessage());
return UploadFullTextsResponse.databaseError;
}
// Get the file-locations.
int numFullTextUrlsFound = 0;
int numFilesFoundFromPreviousAssignmentsBatches = 0;
HashMultimap<String, String> allFileNamesWithIDsHashMap = HashMultimap.create((urlReports.size() / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
for ( UrlReport urlReport : urlReports )
{
UrlReport.StatusType statusType = urlReport.getStatus();
if ( (statusType == null) || statusType.equals(UrlReport.StatusType.non_accessible) ) {
continue;
}
numFullTextUrlsFound ++;
Payload payload = urlReport.getPayload();
if ( payload == null )
continue;
String fileLocation = null;
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
// If no result is returned, then this record was not previously found, so go ahead and add it to the list of files to request from the worker.
// If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker.
// Use the same prepared-statement for all requests, to improve speed (just like when inserting similar thing to the DB).
String fileHash = payload.getHash();
if ( fileHash != null ) {
try {
getFileLocationForHashPreparedStatement.setString(1, fileHash);
} catch (SQLException sqle) {
logger.error("Error when setting the parameter in \"getFileLocationForHashQuery\"!\n" + sqle.getMessage());
}
try ( ResultSet resultSet = getFileLocationForHashPreparedStatement.executeQuery() ) {
if ( resultSet.next() ) { // Move the "cursor" to the first row. If there is any data, then take the first result (there should not be more, but we still want the first anyway).
fileLocation = resultSet.getString(1);
if ( fileLocation != null ) { // If the full-text of this record is already-found and uploaded.
payload.setLocation(fileLocation); // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
//logger.debug("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + fileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches ++;
continue; // Do not request the file from the worker, it's already uploaded. Move on.
}
}
} catch (Exception e) {
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n" + e.getMessage());
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
// TODO - Since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
// TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not auto-closed here) and the Database connection.
}
}
// If the full-text of this record was not found by a previous batch..
fileLocation = payload.getLocation();
if ( fileLocation != null ) { // If the docFile was downloaded (without an error)..
Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation);
if ( !matcher.matches() ) {
continue;
}
String fileNameWithExtension = matcher.group(1);
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
continue;
}
allFileNamesWithIDsHashMap.put(fileNameWithExtension, payload.getId()); // The keys and the values are not duplicates. A task with ID-1 might have an "ID-1.pdf" file.
// While a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same as pdf-url-1, and thus the ID-2 file was not downloaded again.
}
}
// Close the Prepared Statement.
try {
if ( getFileLocationForHashPreparedStatement != null )
getFileLocationForHashPreparedStatement.close();
} catch (SQLException sqle) {
logger.error("Failed to close the \"getFileLocationForHashPreparedStatement\"!\n" + sqle.getMessage());
} finally {
ImpalaConnector.databaseLock.unlock(); // The rest work of this function does not use the database.
ImpalaConnector.closeConnection(con);
}
logger.info("NumFullTextUrlsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextUrlsFound + " (out of " + urlReports.size() + ").");
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithIDsHashMap.keySet());
int numAllFullTexts = allFileNames.size();
if ( numAllFullTexts == 0 ) {
logger.warn("The retrieved files where < 0 > for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
return UploadFullTextsResponse.successful; // It was handled, no error.
}
// Request the full-texts in batches, compressed in zip.
int numOfBatches = (numAllFullTexts / numOfFullTextsPerBatch);
if ( (numAllFullTexts % numOfFullTextsPerBatch) > 0 ) // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
numOfBatches ++;
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts. Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches.");
// Check if one full-text is left out because of the division. Put it in the last batch.
String baseUrl = "http://" + remoteAddr + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
// Index all Payloads to be more efficiently searched later.
HashMultimap<String, Payload> payloadsHashMultimap = HashMultimap.create((urlReports.size() / 3), 3); // Holds multiple values for any key, if an ID (key) has many payloads (values) associated with it.
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload != null )
payloadsHashMultimap.put(payload.getId(), payload);
}
String curAssignmentsBaseLocation = baseTargetLocation + "assignments_" + assignmentsBatchCounter + File.separator;
File curAssignmentsBaseDir = new File(curAssignmentsBaseLocation);
int failedBatches = 0;
for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter )
{
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter);
HttpURLConnection conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId);
if ( conn == null ) {
updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMultimap, fileNamesForCurBatch);
failedBatches ++;
continue; // To the next batch.
}
// Get the extracted files.
String targetDirectory = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator;
try {
// Create this batch-directory.
Path curBatchPath = Files.createDirectories(Paths.get(targetDirectory));
// Unzip the file. Iterate over the PDFs and upload each one of them and get the S3-Url
String zipFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".zip";
File zipFile = new File(zipFileFullPath);
if ( ! saveZipFile(conn, zipFile) ) {
updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMultimap, fileNamesForCurBatch);
failedBatches ++;
continue; // To the next batch.
}
//logger.debug("The zip file has been saved: " + zipFileFullPath); // DEBUG!
FileUnZipper.unzipFolder(Paths.get(zipFileFullPath), curBatchPath);
String[] fileNames = new File(targetDirectory).list();
if ( (fileNames == null) || (fileNames.length <= 1 ) ) { // The directory might have only one file, the "zip-file".
logger.error("No full-text fileNames where extracted from directory: " + targetDirectory);
updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMultimap, fileNamesForCurBatch);
failedBatches ++;
continue; // To the next batch.
}
// Iterate over the files and upload them to S3.
int numUploadedFiles = 0;
for ( String fileName : fileNames )
{
String fileFullPath = targetDirectory + fileName;
if ( fileFullPath.equals(zipFileFullPath) ) // Exclude the zip-file from uploading.
continue;
// Check if this stored file is related to one or more IDs from the Set. Defend against malicious file injection. It does not add more overhead, since we already need the "fileRelatedIDs".
Set<String> fileRelatedIDs = allFileNamesWithIDsHashMap.get(fileName);
if ( fileRelatedIDs.isEmpty() ) { // In case the "fileName" is not inside the "allFileNamesWithIDsHashMap" HashMultimap.
logger.error("The stored file \"" + fileName + "\" is not related to any ID which had a file requested from the Worker!");
continue;
}
if ( isFileNameProblematic(fileName, payloadsHashMultimap) ) // Do some more checks.
continue;
// At this point, we know that this file is related with one or more IDs of the payloads AND it has a valid fileName.
// Let's try to upload the file to S3 and update the payloads of all related IDs, either in successful upload or not.
String s3Url = S3ObjectStoreMinIO.uploadToS3(fileName, fileFullPath);
if ( s3Url != null ) {
setFullTextForMultipleIDs(fileRelatedIDs, payloadsHashMultimap, s3Url); // Update the file-location (to the S3-url) for all payloads related to these IDs.
numUploadedFiles ++;
}
// Else, the record will have its file-data set to "null", in the end of this method.
}
logger.info("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -1) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore.");
// (fileNames.length -1) --> minus the zip-file
} catch (Exception e) {
logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMultimap, fileNamesForCurBatch);
failedBatches ++;
}
} // End of batches.
// Delete this assignments-num directory.
deleteDirectory(curAssignmentsBaseDir);
// Check if none of the batches were handled..
if ( failedBatches == numOfBatches ) {
logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
return UploadFullTextsResponse.unsuccessful;
} else {
replaceNotUploadedFileLocations(urlReports); // Make sure all records without an s3Url have < null > file-data.
return UploadFullTextsResponse.successful;
}
}
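/**
* Opens an HTTP-GET connection to the Worker for the given batch of fileNames and checks that the response-code is 200.
* Returns the open connection, or null in case of an error.
* */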
private static HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId)
{
baseUrl += batchNum + "/";
String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);
logger.info("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]");
HttpURLConnection conn = null;
try {
conn = (HttpURLConnection) new URL(requestUrl).openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty("User-Agent", "UrlsController");
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode != 200 ) {
logger.warn("HTTP-" + statusCode + ": " + getErrorMessageFromResponseBody(conn) + "\nProblem when requesting the ZipFile of batch_" + batchNum + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl);
return null;
}
} catch (Exception e) {
logger.warn("Problem when requesting the ZipFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + "\n" + e.getMessage());
return null;
}
return conn;
}
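/**
* Reads the error-stream of the given connection and returns its content as a single String, or null if no message could be retrieved.
* */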
private static String getErrorMessageFromResponseBody(HttpURLConnection conn)
{
StringBuilder errorMsgStrB = new StringBuilder(500);
try ( BufferedReader br = new BufferedReader(new InputStreamReader(conn.getErrorStream())) ) { // Try-with-resources
String inputLine;
while ( (inputLine = br.readLine()) != null )
{
if ( !inputLine.isEmpty() )
errorMsgStrB.append(inputLine);
}
return (errorMsgStrB.length() != 0) ? errorMsgStrB.toString() : null; // Make sure we return a "null" on empty string, to better handle the case in the caller-function.
} catch ( IOException ioe ) {
logger.error("IOException when retrieving the error-message: " + ioe.getMessage());
return null;
} catch ( Exception e ) {
logger.error("Could not extract the errorMessage!", e);
return null;
}
}
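/**
* Returns the sub-list of fileNames which belong to the given batch-number, based on the "numOfFullTextsPerBatch".
* */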
private static List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch)
{
int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch);
int endingIndex = (curBatch * numOfFullTextsPerBatch);
if ( endingIndex > numAllFullTexts ) // This might be the case, when the "numAllFullTexts" is too small.
endingIndex = numAllFullTexts;
List<String> fileNamesOfCurBatch = new ArrayList<>(numOfFullTextsPerBatch);
for ( int i = initialIndex; i < endingIndex; ++i ) {
try {
fileNamesOfCurBatch.add(allFileNames.get(i));
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for i=" + i + "\n" + ioobe.getMessage(), ioobe);
}
}
return fileNamesOfCurBatch;
}
private static final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);
// TODO - Make it THREAD-LOCAL, if we move to multi-thread batch requests.
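// Builds the request-url by appending the comma-separated fileNames to the given baseUrl.
// Illustrative result (hypothetical values): http://<workerHost>:1881/api/full-texts/getFullTexts/5/3/1/ID-1.pdf,ID-2.pdf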
private static String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch)
{
sb.append(baseUrl);
int numFullTextsCurBatch = fileNamesForCurBatch.size();
for ( int j=0; j < numFullTextsCurBatch; ++j ){
sb.append(fileNamesForCurBatch.get(j));
if ( j < (numFullTextsCurBatch -1) )
sb.append(",");
}
String requestUrl = sb.toString();
sb.setLength(0); // Reset for the next batch.
return requestUrl;
}
private static final int bufferSize = 20971520; // 20 MB
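/**
* Streams the response-body of the given connection into the given zip-file, using a 20 MB buffer.
* Returns true on success, false otherwise.
* */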
public static boolean saveZipFile(HttpURLConnection conn, File zipFile)
{
// Use a try-with-resources, so that both streams get closed, even if closing one of them throws an exception.
try ( InputStream inStream = conn.getInputStream();
FileOutputStream outStream = new FileOutputStream(zipFile) ) {
byte[] byteBuffer = new byte[bufferSize]; // 20 MB
int bytesRead;
while ( (bytesRead = inStream.read(byteBuffer, 0, bufferSize)) != -1 ) {
outStream.write(byteBuffer, 0, bytesRead);
}
return true;
} catch (Exception e) {
logger.error("Could not save the zip file \"" + zipFile.getName() + "\": " + e.getMessage(), e);
return false;
}
}
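/**
* Checks whether the given fileName can be matched with at least one of the payloads which were assigned to the Worker.
* Returns true if the fileName is problematic (and should be skipped), false otherwise.
* */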
private static boolean isFileNameProblematic(String fileName, HashMultimap<String, Payload> payloadsHashMultimap)
{
// Get the ID of the file.
Matcher matcher = FILENAME_ID.matcher(fileName);
if ( !matcher.matches() ) {
logger.error("The given fileName \"" + fileName + "\" was invalid! Could not be matched with matcher: " + matcher);
return true;
}
String fileID = matcher.group(1);
if ( (fileID == null) || fileID.isEmpty() ) {
logger.error("The given fileName \"" + fileName + "\" was invalid. No fileID was extracted!");
return true;
}
// Take the payloads which are related with this ID. An ID might have multiple original-urls, thus multiple payloads.
// The ID we have here is the one from the first record which led to this file.
// There might be other records pointing to this file. But, in order to mark this file as "valid", we have to match it with at least one of the records-IDs.
// We do this process to avoid handling and uploading irrelevant files which could find their way to the working directory (either because of a Worker's error or any other type of malfunction or even malicious action).
Set<Payload> payloads = payloadsHashMultimap.get(fileID);
if ( payloads.isEmpty() ) {
logger.error("The given fileID \"" + fileID + "\" was not part of the \"payloadsHashMultimap\"!");
return true;
}
// Search through the payloads to find at least one match, in order for this file to NOT be "problematic".
for ( Payload payload : payloads )
{
String location = payload.getLocation();
if ( (location != null) && location.endsWith(fileName) )
return false; // It's not problematic.
}
logger.error("None of the locations of the payloads matched with the ID \"" + fileID + "\" are ending with the filename \"" + fileName + "\" they were supposed to.");
return true;
}
/**
* This method updates the UrlReports to not point to any downloaded fullText files.
* This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails.
* In that case, we don't want any "links" to locally stored files, which will be deleted.
* @param urlReports the list of UrlReports whose payloads should have their file-data cleared
*/
public static void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports)
{
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload != null )
setUnretrievedFullText(payload);
}
}
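// For every payload which does not point to an S3-location (i.e. its file was not uploaded), clear its file-data.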
private static void replaceNotUploadedFileLocations(List<UrlReport> urlReports)
{
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload != null ) {
String fileLocation = payload.getLocation();
if ( (fileLocation != null) && (! fileLocation.startsWith(S3ObjectStoreMinIO.endpoint)) )
setUnretrievedFullText(payload);
}
}
}
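// For each fileName of the failed batch, extract its ID and clear the file-data of all payloads related to that ID.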
public static void updateUrlReportsForCurBatchTOHaveNoFullTextFiles(HashMultimap<String, Payload> payloadsHashMultimap, List<String> fileNames)
{
for ( String fileName : fileNames ) {
// Get the ID of the file.
Matcher matcher = FILENAME_ID.matcher(fileName);
if ( !matcher.matches() ) {
continue;
}
String id = matcher.group(1);
if ( (id == null) || id.isEmpty() ) {
continue;
}
Set<Payload> payloads = payloadsHashMultimap.get(id);
// Set for all payloads connected to this ID.
for ( Payload payload : payloads )
if ( payload != null )
setUnretrievedFullText(payload); // It changes the payload in the original UrlReport list.
}
}
public static void setUnretrievedFullText(Payload payload)
{
// Mark the full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text will be kept.
payload.setLocation(null);
payload.setHash(null);
payload.setMime_type(null);
payload.setSize(null);
}
/**
* Set the fileLocation for all those IDs related to the File. The IDs may have one or more payloads.
* @param fileIDs the IDs of the records related to the uploaded file
* @param payloadsHashMultimap the ID-to-payloads index
* @param s3Url the S3-location of the uploaded file
*/
public static void setFullTextForMultipleIDs(Set<String> fileIDs, HashMultimap<String, Payload> payloadsHashMultimap, String s3Url)
{
for ( String id : fileIDs ) {
Set<Payload> payloads = payloadsHashMultimap.get(id);
if ( payloads.isEmpty() ) {
logger.error("The given id \"" + id + "\" (coming from the \"allFileNamesWithIDsHashMap\"), is not found inside the \"payloadsHashMultimap\"!");
continue;
}
for ( Payload payload : payloads )
if ( payload.getHash() != null ) // Update only for the records which led to a file, not all the records of this ID (an ID might have multiple original_urls pointing to different directions).
payload.setLocation(s3Url); // Update the file-location to the new S3-url. All the other file-data is already set from the Worker.
}
}
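// Deletes the given directory recursively. Returns true on success, false otherwise.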
public static boolean deleteDirectory(File curBatchDir) {
try {
org.apache.commons.io.FileUtils.deleteDirectory(curBatchDir);
return true;
} catch (IOException e) {
logger.error("The following directory could not be deleted: " + curBatchDir.getName(), e);
return false;
}
}
private static String getCutBatchExceptionMessage(String sqleMessage)
{
// The sqleMessage contains the actual message followed by the long batch. This makes the logs unreadable. So we should shorten the message before logging.
int maxEnding = 1500;
if ( sqleMessage.length() > maxEnding )
return (sqleMessage.substring(0, maxEnding) + "...");
else
return sqleMessage;
}
// This is currently not used, but it may be useful in a future scenario.
private static long getInputFileLinesNum()
{
long numOfLines = 0;
try ( java.util.stream.Stream<String> linesStream = Files.lines(Paths.get(inputFileFullPath)) ) { // Use a try-with-resources, so the underlying file-handle gets closed.
numOfLines = linesStream.count();
logger.debug("The numOfLines in the inputFile is " + numOfLines);
} catch (IOException e) {
logger.error("Could not retrieve the numOfLines. " + e);
return -1;
}
return numOfLines;
}
/**
* This method decodes a Json String and returns its members as a Task.
* @param jsonLine the json-String to decode
* @return a Task with the extracted "id" and "url", or null if the jsonLine could not be parsed or the url was missing
*/
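// Example of an expected input-line (hypothetical values): {"id":"openaire_id_1234", "url":"https://example.org/article_1234.pdf"}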
public static Task jsonDecoder(String jsonLine)
{
// Get the id and the url from the json-object and use them to create a new Task.
String idStr = null;
String urlStr = null;
try {
JSONObject jObj = new JSONObject(jsonLine); // Construct a JSONObject from the retrieved jsonLine.
idStr = jObj.get("id").toString();
urlStr = jObj.get("url").toString();
} catch (JSONException je) {
logger.warn("JSONException caught when tried to parse and extract values from jsonLine: \t" + jsonLine, je);
return null;
}
if ( urlStr.isEmpty() ) {
if ( !idStr.isEmpty() ) // If we only have the id, then go and log it.
logger.warn("The url was not found for id: \"" + idStr + "\"");
return null;
}
return new Task(idStr, urlStr, null);
}
/**
* This method parses the next batch of lines from the Json inputFile and extracts the urls, along with their IDs.
* @return HashMultimap<String, String> mapping each ID to its urls
*/
public static HashMultimap<String, String> getNextIdUrlPairBatchFromJson()
{
Task inputIdUrlTuple;
int expectedPathsPerID = 5;
int expectedIDsPerBatch = jsonBatchSize / expectedPathsPerID;
HashMultimap<String, String> idAndUrlMappedInput = HashMultimap.create(expectedIDsPerBatch, expectedPathsPerID);
int curBeginning = fileIndex.get();
while ( inputScanner.get().hasNextLine() && (fileIndex.get() < (curBeginning + jsonBatchSize)) )
{// While (!EOF) and inside the current url-batch, iterate through lines.
//logger.debug("fileIndex: " + FileUtils.fileIndex.get()); // DEBUG!
// Take the next line from the inputFile.
String retrievedLineStr = inputScanner.get().nextLine();
//logger.debug("Loaded from inputFile: " + retrievedLineStr); // DEBUG!
fileIndex.set(fileIndex.get() +1);
if ( retrievedLineStr.isEmpty() ) {
unretrievableInputLines.set(unretrievableInputLines.get() +1);
continue;
}
if ( (inputIdUrlTuple = jsonDecoder(retrievedLineStr)) == null ) { // Decode the jsonLine and take the two attributes.
logger.warn("A problematic inputLine found: \t" + retrievedLineStr);
unretrievableInputLines.set(unretrievableInputLines.get() +1);
continue;
}
if ( !idAndUrlMappedInput.put(inputIdUrlTuple.getId(), inputIdUrlTuple.getUrl()) ) { // We have a duplicate url in the input.. log it here as we cannot pass it through the HashMultimap. It's possible that this as well as the original might be/give a docUrl.
duplicateIdUrlEntries.set(duplicateIdUrlEntries.get() +1);
}
}
return idAndUrlMappedInput;
}
/**
* This method returns the number of (non-heading, non-empty) lines we have read from the inputFile.
* @return loadedUrls
*/
public static int getCurrentlyLoadedUrls() // In the end, it gives the total number of urls we have processed.
{
return FileUtils.fileIndex.get() - FileUtils.unretrievableInputLines.get();
}
/**
* This method checks if there is no more input-data and returns true in that case.
* Otherwise, it returns false, if there is more input-data to be loaded.
* If no input-urls were retrieved at all (first run and empty), an error is logged.
* @param isEmptyOfData whether the last loading-attempt returned no data
* @param isFirstRun whether this is the first loading-attempt
* @return true if loading has finished, false otherwise
*/
public static boolean isFinishedLoading(boolean isEmptyOfData, boolean isFirstRun)
{
if ( isEmptyOfData ) {
if ( isFirstRun )
logger.error("Could not retrieve any urls from the inputFile!");
else
logger.debug("Done loading " + FileUtils.getCurrentlyLoadedUrls() + " urls from the inputFile.");
return true;
}
return false;
}
}