You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
UrlsController/src/main/java/eu/openaire/urls_controller/util/FileUtils.java

722 lines
36 KiB
Java

package eu.openaire.urls_controller.util;
import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.Task;
import eu.openaire.urls_controller.models.UrlReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.configurationprocessor.json.JSONException;
import org.springframework.boot.configurationprocessor.json.JSONObject;
import javax.servlet.http.HttpServletRequest;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.*;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Static helpers used by the Controller for:
 * <ul>
 *   <li>merging the many small parquet files of an Impala table into a single one ({@link #mergeParquetFiles}),</li>
 *   <li>requesting the full-text files from a Worker, uploading them to the S3-ObjectStore and updating the related payloads ({@link #getAndUploadFullTexts}),</li>
 *   <li>reading the test input-file of id-url pairs in batches ({@link #getNextIdUrlPairBatchFromJson}).</li>
 * </ul>
 */
public class FileUtils {

    private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);

    // Input-reading state. These are ThreadLocal, so each thread which constructs a "FileUtils" gets its own scanner and counters.
    public static ThreadLocal<Scanner> inputScanner = new ThreadLocal<Scanner>();
    private static final ThreadLocal<Integer> fileIndex = new ThreadLocal<Integer>();
    private static final ThreadLocal<Integer> unretrievableInputLines = new ThreadLocal<Integer>();
    public static ThreadLocal<Integer> duplicateIdUrlEntries = new ThreadLocal<Integer>();

    public static final int jsonBatchSize = 3000;

    private static final String utf8Charset = "UTF-8";

    public static String inputFileFullPath;
    private static final String workingDir = System.getProperty("user.dir") + File.separator;


    /**
     * Opens the bundled test input-file ("testInputFiles/orderedList1000.json" on the classpath)
     * and (re)initializes the calling thread's scanner and counters.
     *
     * @throws RuntimeException if the resource-file cannot be found on the classpath.
     */
    public FileUtils() throws RuntimeException
    {
        // Classpath resource names are always '/'-separated, regardless of the OS ("File.separator" would break on Windows).
        String resourceFileName = "testInputFiles/orderedList1000.json";
        inputFileFullPath = workingDir + "src" + File.separator + "main" + File.separator + "resources"
                + File.separator + "testInputFiles" + File.separator + "orderedList1000.json";

        InputStream inputStream = getClass().getClassLoader().getResourceAsStream(resourceFileName);
        if ( inputStream == null )
            throw new RuntimeException("No resourceFile was found with name \"" + resourceFileName + "\".");

        logger.debug("Going to retrieve the data from the inputResourceFile: " + resourceFileName);

        // A previous construction on this thread left a scanner behind; close it, otherwise its stream leaks.
        Scanner previousScanner = FileUtils.inputScanner.get();
        if ( previousScanner != null )
            previousScanner.close();

        FileUtils.inputScanner.set(new Scanner(inputStream, utf8Charset));
        fileIndex.set(0);	// Re-initialize the file-number-pointer.
        unretrievableInputLines.set(0);
        duplicateIdUrlEntries.set(0);
    }


    /**
     * In each insertion, a new parquet-file is created, so we end up with millions of files. Parquet is great for fast-select, so we have to stick with it and merge those files.
     * This method creates a clone of the original table (optionally filtered by "whereClause + parameter"), in order to have only one parquet file in the end.
     * Then it drops the original table and renames the clone to the original's name. Finally, it computes the stats of the new table.
     *
     * @param tableName the table (inside "ImpalaConnector.databaseName") to merge.
     * @param con an open database-connection; it is NOT closed by this method.
     * @param whereClause an optional raw SQL suffix (e.g. a "WHERE" clause); it is inserted as-is, so it must never come from untrusted input.
     * @param parameter an optional value appended to the "whereClause" as an escaped string-literal.
     * @return the errorMsg, if an error appears, otherwise "null".
     */
    public static String mergeParquetFiles(String tableName, Connection con, String whereClause, String parameter)
    {
        String errorMsg;
        if ( tableName == null ) {
            errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!";
            logger.error(errorMsg);
            return errorMsg;
        }
        if ( con == null ) {
            errorMsg = "No database-connection was given. Cannot merge the underlying files of table \"" + tableName + "\"!";
            logger.error(errorMsg);
            return errorMsg;
        }

        // Make sure the following are empty strings (in case another method calls this one in the future with a null-value).
        if ( whereClause == null )
            whereClause = "";

        if ( parameter == null )
            parameter = "";
        else	// Turn the value into an Impala string-literal. Escape backslashes and single-quotes, so the value cannot break out of the literal.
            parameter = " '" + parameter.replace("\\", "\\\\").replace("'", "\\'") + "'";

        Statement statement;
        try {
            statement = con.createStatement();
        } catch (SQLException sqle) {
            errorMsg = "Problem when creating a connection-statement!\n";
            logger.error(errorMsg + sqle.getMessage(), sqle);
            return errorMsg;
        }

        String fullTableName = ImpalaConnector.databaseName + "." + tableName;
        try {
            statement.execute("CREATE TABLE " + fullTableName + "_tmp stored as parquet AS SELECT * FROM " + fullTableName + " " + whereClause + parameter);
            statement.execute("DROP TABLE " + fullTableName + " PURGE");
            statement.execute("ALTER TABLE " + fullTableName + "_tmp RENAME TO " + fullTableName);
            statement.execute("COMPUTE STATS " + fullTableName);
        } catch (SQLException sqle) {
            errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n";
            logger.error(errorMsg + getCutBatchExceptionMessage(sqle.getMessage()), sqle);
            return errorMsg;
        } finally {
            // Make sure we close the statement. A failure here does not invalidate the already-executed queries.
            try { statement.close(); }
            catch (SQLException sqle3) { logger.error("Could not close the statement for executing queries in the Impala-database.\n" + sqle3); }
        }

        return null;	// No errorMsg, everything is fine.
    }


    public enum UploadFullTextsResponse {successful, unsuccessful, databaseError}

    private static final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$");
    private static final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$");

    public static final String baseTargetLocation = System.getProperty("user.dir") + File.separator + "fullTexts" + File.separator;

    private static final int numOfFullTextsPerBatch = 70;	// The HTTP-headers cannot be too large (It failed with 100 fileNames).


    /**
     * Requests the full-text files of the given UrlReports from the Worker that sent the request, uploads them to the S3-ObjectStore
     * and updates the "location" of the related payloads.
     *
     * <p>First, for every payload with a hash, the "payload" table is queried for an already-uploaded file with the same hash. If one is found,
     * the payload is pointed to it and that file is not requested again. The remaining distinct fileNames are requested in batches of
     * {@value #numOfFullTextsPerBatch}, each returned as a zip-file, which is extracted and whose files are uploaded to S3.
     * The payloads of failed batches, and (on overall success) all payloads not pointing to S3, get their file-data set to "null".
     *
     * @return {@code databaseError} if the database could not be used,
     *         {@code unsuccessful} if the request/urlReports are null or none of the batches could be handled,
     *         {@code successful} otherwise (including when there were no files to request).
     */
    public static UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId)
    {
        // The Controller has to request the files from the Worker, in order to upload them to the S3.
        // We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.

        if ( request == null ) {
            logger.error("The \"HttpServletRequest\" is null!");
            return UploadFullTextsResponse.unsuccessful;
        }
        if ( urlReports == null ) {
            logger.error("The \"urlReports\" list is null!");
            return UploadFullTextsResponse.unsuccessful;
        }

        String remoteAddr = getWorkerHost(request);

        HashMultimap<String, String> allFileNamesWithIDsHashMap = getFileNamesToRequestFromWorker(urlReports, assignmentsBatchCounter);
        if ( allFileNamesWithIDsHashMap == null )
            return UploadFullTextsResponse.databaseError;

        ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithIDsHashMap.keySet());
        int numAllFullTexts = allFileNames.size();
        if ( numAllFullTexts == 0 ) {
            logger.warn("The retrieved files where < 0 > for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
            return UploadFullTextsResponse.successful;	// It was handled, no error.
        }

        // Request the full-texts in batches, compressed in zip.
        int numOfBatches = (numAllFullTexts / numOfFullTextsPerBatch);
        if ( (numAllFullTexts % numOfFullTextsPerBatch) > 0 )	// Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
            numOfBatches ++;

        logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts. Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches.");

        String baseUrl = "http://" + remoteAddr + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";

        // Index all Payloads by their ID, to be more efficiently searched later. An ID may have multiple payloads.
        HashMultimap<String, Payload> payloadsHashMultimap = HashMultimap.create((urlReports.size() / 3), 3);
        for ( UrlReport urlReport : urlReports ) {
            Payload payload = urlReport.getPayload();
            if ( payload != null )
                payloadsHashMultimap.put(payload.getId(), payload);
        }

        String curAssignmentsBaseLocation = baseTargetLocation + "assignments_" + assignmentsBatchCounter + File.separator;
        File curAssignmentsBaseDir = new File(curAssignmentsBaseLocation);

        int failedBatches = 0;
        for ( int batchCounter = 1; batchCounter <= numOfBatches; ++batchCounter )
        {
            List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter);
            HttpURLConnection conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId);
            boolean wasBatchHandled = false;
            if ( conn != null ) {
                try {
                    wasBatchHandled = extractAndUploadBatch(conn, curAssignmentsBaseLocation, allFileNamesWithIDsHashMap, payloadsHashMultimap, assignmentsBatchCounter, batchCounter);
                } finally {
                    conn.disconnect();	// Release the socket, even if the response-stream was never consumed.
                }
            }
            if ( !wasBatchHandled ) {
                updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMultimap, fileNamesForCurBatch);
                failedBatches ++;
            }
        }	// End of batches.

        // Delete this assignments-num directory.
        deleteDirectory(curAssignmentsBaseDir);

        // Check if none of the batches were handled..
        if ( failedBatches == numOfBatches ) {
            logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
            return UploadFullTextsResponse.unsuccessful;
        } else {
            replaceNotUploadedFileLocations(urlReports);	// Make sure all records without an s3Url have < null > file-data.
            return UploadFullTextsResponse.successful;
        }
    }


    /**
     * Returns the host of the Worker, usable inside a URL.
     * It takes the first (originating) entry of the "X-FORWARDED-FOR" header, falling back to the request's remote address.
     * IPv6 literals are wrapped in brackets, as required inside a URL.
     * NOTE(review): "X-FORWARDED-FOR" is client-controlled; any caller can make the Controller issue requests to an arbitrary host.
     */
    private static String getWorkerHost(HttpServletRequest request)
    {
        String remoteAddr = request.getHeader("X-FORWARDED-FOR");
        if ( remoteAddr != null ) {
            int commaIndex = remoteAddr.indexOf(',');	// The header may hold a chain: "client, proxy1, proxy2".
            if ( commaIndex != -1 )
                remoteAddr = remoteAddr.substring(0, commaIndex);
            remoteAddr = remoteAddr.trim();
        }
        if ( (remoteAddr == null) || remoteAddr.isEmpty() )
            remoteAddr = request.getRemoteAddr();

        // Two or more colons mean an IPv6 literal (an IPv4 "host:port" has only one).
        if ( (remoteAddr != null) && (remoteAddr.indexOf(':') != remoteAddr.lastIndexOf(':')) && !remoteAddr.startsWith("[") )
            remoteAddr = "[" + remoteAddr + "]";
        return remoteAddr;
    }


    /**
     * For every payload of an accessible UrlReport, checks whether a file with the same hash was already uploaded by a previous assignments-batch.
     * If so, the payload's location is set to that file's location. Otherwise, the fileName of the payload's local location is collected,
     * so that it can be requested from the Worker.
     * The database-lock is held for the whole lookup and is always released, and the connection is always closed.
     *
     * @return a multimap of fileName (with extension) to the IDs of the payloads pointing to it, or "null" if the database could not be used.
     */
    private static HashMultimap<String, String> getFileNamesToRequestFromWorker(List<UrlReport> urlReports, long assignmentsBatchCounter)
    {
        int numFullTextUrlsFound = 0;
        int numFilesFoundFromPreviousAssignmentsBatches = 0;
        // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
        HashMultimap<String, String> allFileNamesWithIDsHashMap = HashMultimap.create((urlReports.size() / 5), 3);
        String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ?" ;

        ImpalaConnector.databaseLock.lock();
        Connection con = null;
        try {
            con = ImpalaConnector.getInstance().getConnection();
            if ( con == null ) {
                logger.error("Problem when creating the Impala-connection!");
                return null;
            }

            // Use the same prepared-statement for all requests, to improve speed.
            PreparedStatement getFileLocationForHashPreparedStatement;
            try {
                getFileLocationForHashPreparedStatement = con.prepareStatement(getFileLocationForHashQuery);
            } catch (SQLException sqle) {
                logger.error("Problem when creating the prepared statement for \"" + getFileLocationForHashQuery + "\"!\n" + sqle.getMessage());
                return null;
            }

            try {
                for ( UrlReport urlReport : urlReports )
                {
                    UrlReport.StatusType statusType = urlReport.getStatus();
                    if ( (statusType == null) || statusType.equals(UrlReport.StatusType.non_accessible) )
                        continue;

                    numFullTextUrlsFound ++;

                    Payload payload = urlReport.getPayload();
                    if ( payload == null )
                        continue;

                    // If a file-location IS returned for this hash, then this file is already uploaded to the S3.
                    // Point the record to that file-location and do not request that file from the Worker.
                    String fileHash = payload.getHash();
                    if ( fileHash != null ) {
                        String previousFileLocation = getLocationOfFileWithHash(getFileLocationForHashPreparedStatement, fileHash);
                        if ( previousFileLocation != null ) {
                            payload.setLocation(previousFileLocation);	// Set the location to the older identical file, which was uploaded to S3.
                            numFilesFoundFromPreviousAssignmentsBatches ++;
                            continue;
                        }
                    }

                    // The full-text of this record was not found by a previous batch..
                    String fileLocation = payload.getLocation();
                    if ( fileLocation == null )	// The docFile was not downloaded (or it had an error).
                        continue;

                    Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation);
                    if ( !matcher.matches() )
                        continue;

                    String fileNameWithExtension = matcher.group(1);
                    if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() )
                        continue;

                    // The keys and the values are not duplicate. Task with ID-1 might have an "ID-1.pdf" file.
                    // While a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1.
                    allFileNamesWithIDsHashMap.put(fileNameWithExtension, payload.getId());
                }
            } finally {
                try {
                    getFileLocationForHashPreparedStatement.close();
                } catch (SQLException sqle) {
                    logger.error("Failed to close the \"getFileLocationForHashPreparedStatement\"!\n" + sqle.getMessage());
                }
            }
        } finally {
            ImpalaConnector.databaseLock.unlock();	// The rest of the work does not use the database.
            if ( con != null )
                ImpalaConnector.closeConnection(con);
        }

        logger.info("NumFullTextUrlsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextUrlsFound + " (out of " + urlReports.size() + ").");
        logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
        return allFileNamesWithIDsHashMap;
    }


    /**
     * Queries the "payload" table for the location of a previous record with the given file-hash.
     *
     * @return that location, or "null" if no such record exists or the lookup failed.
     */
    private static String getLocationOfFileWithHash(PreparedStatement getFileLocationForHashPreparedStatement, String fileHash)
    {
        try {
            getFileLocationForHashPreparedStatement.setString(1, fileHash);
        } catch (SQLException sqle) {
            // Do NOT execute the query: it would still be bound to the previous hash and return a wrong location.
            logger.error("Error when setting the parameter in \"getFileLocationForHashQuery\"!\n" + sqle.getMessage());
            return null;
        }

        try ( ResultSet resultSet = getFileLocationForHashPreparedStatement.executeQuery() ) {
            if ( resultSet.next() )	// Move the "cursor" to the first row. If there is any data..
                return resultSet.getString(1);
        } catch (Exception e) {
            logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n" + e.getMessage());
            // TODO - Should we return a "UploadFullTextsResponse.databaseError" and force the caller to not even insert the payloads to the database?
        }
        return null;
    }


    /**
     * Saves the zip-file of one batch, extracts it and uploads every valid extracted file to S3,
     * pointing the related payloads to the resulting S3-url.
     *
     * @return true if the batch was handled (even if some individual uploads failed), false if it has to be treated as failed.
     */
    private static boolean extractAndUploadBatch(HttpURLConnection conn, String curAssignmentsBaseLocation, HashMultimap<String, String> allFileNamesWithIDsHashMap,
                                                 HashMultimap<String, Payload> payloadsHashMultimap, long assignmentsBatchCounter, int batchCounter)
    {
        String targetDirectory = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator;
        try {
            // Create this batch-directory.
            Path curBatchPath = Files.createDirectories(Paths.get(targetDirectory));

            String zipFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".zip";
            File zipFile = new File(zipFileFullPath);
            if ( ! saveZipFile(conn, zipFile) )
                return false;

            FileUnZipper.unzipFolder(Paths.get(zipFileFullPath), curBatchPath);

            String[] fileNames = new File(targetDirectory).list();
            if ( (fileNames == null) || (fileNames.length <= 1) ) {	// The directory might have only one file, the "zip-file".
                logger.error("No full-text fileNames where extracted from directory: " + targetDirectory);
                return false;
            }

            // Iterate over the files and upload them to S3.
            int numUploadedFiles = 0;
            for ( String fileName : fileNames )
            {
                String fileFullPath = targetDirectory + fileName;
                if ( fileFullPath.equals(zipFileFullPath) )	// Exclude the zip-file from uploading.
                    continue;

                // Check if this stored file is related to one or more IDs from the Set.
                Set<String> fileRelatedIDs = allFileNamesWithIDsHashMap.get(fileName);
                if ( fileRelatedIDs.isEmpty() ) {	// In case the "fileName" is not inside the "allFileNamesWithIDsHashMap" HashMultimap.
                    logger.error("The stored file \"" + fileName + "\" is not related to any ID which had a file requested from the Worker!");
                    continue;
                }

                if ( isFileNameProblematic(fileName, payloadsHashMultimap) )	// Do some more checks.
                    continue;

                // At this point, we know that this file is related with one or more IDs of the payloads AND it has a valid fileName.
                String s3Url = S3ObjectStoreMinIO.uploadToS3(fileName, fileFullPath);
                if ( s3Url != null ) {
                    setFullTextForMultipleIDs(payloadsHashMultimap, fileRelatedIDs, s3Url);
                    numUploadedFiles ++;
                }
                // Else, the record will have its file-data set to "null", in the end of "getAndUploadFullTexts()".
            }

            // (fileNames.length -1) --> minus the zip-file
            logger.info("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -1) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore.");
            return true;
        } catch (Exception e) {
            logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + "\n" + e.getMessage(), e);	// It shows the response body (after Spring v.2.5.6).
            return false;
        }
    }


    /**
     * Opens a GET connection to the Worker for the given batch of fileNames.
     *
     * @return the connected HttpURLConnection with an HTTP-200 response, or "null" on any failure (the connection is then disconnected).
     */
    private static HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId)
    {
        baseUrl += batchNum + "/";
        String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);
        logger.info("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]");

        HttpURLConnection conn = null;
        try {
            conn = (HttpURLConnection) new URL(requestUrl).openConnection();
            conn.setRequestMethod("GET");
            conn.setRequestProperty("User-Agent", "UrlsController");
            conn.connect();
            int statusCode = conn.getResponseCode();
            if ( statusCode != 200 ) {
                logger.warn("HTTP-" + statusCode + ": " + getErrorMessageFromResponseBody(conn) + "\nProblem when requesting the ZipFile of batch_" + batchNum + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl);
                conn.disconnect();
                return null;
            }
            return conn;
        } catch (Exception e) {
            logger.warn("Problem when requesting the ZipFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + "\n" + e.getMessage());
            if ( conn != null )
                conn.disconnect();
            return null;
        }
    }


    /**
     * Reads the body of an error-response, joining its non-empty lines.
     *
     * @return the message, or "null" if there is no error-body, it is empty, or it could not be read.
     */
    private static String getErrorMessageFromResponseBody(HttpURLConnection conn)
    {
        InputStream errorStream = conn.getErrorStream();
        if ( errorStream == null )	// There is no error-body to read (e.g. the server sent no data).
            return null;

        StringBuilder errorMsgStrB = new StringBuilder(500);
        try ( BufferedReader br = new BufferedReader(new InputStreamReader(errorStream, utf8Charset)) ) {	// Try-with-resources
            String inputLine;
            while ( (inputLine = br.readLine()) != null )
            {
                if ( !inputLine.isEmpty() )
                    errorMsgStrB.append(inputLine);
            }
            return (errorMsgStrB.length() != 0) ? errorMsgStrB.toString() : null;	// Make sure we return a "null" on empty string, to better handle the case in the caller-function.
        } catch ( IOException ioe ) {
            logger.error("IOException when retrieving the error-message: " + ioe.getMessage());
            return null;
        } catch ( Exception e ) {
            logger.error("Could not extract the errorMessage!", e);
            return null;
        }
    }


    /**
     * Returns the fileNames belonging to the given 1-based batch number: at most {@value #numOfFullTextsPerBatch} consecutive entries.
     * An out-of-range batch gives an empty list.
     */
    private static List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch)
    {
        int initialIndex = Math.max(0, ((curBatch - 1) * numOfFullTextsPerBatch));
        // The last batch may be smaller. Also never go past the actual list-size.
        int endingIndex = Math.min((curBatch * numOfFullTextsPerBatch), Math.min(numAllFullTexts, allFileNames.size()));
        if ( initialIndex >= endingIndex )
            return new ArrayList<>(0);
        return new ArrayList<>(allFileNames.subList(initialIndex, endingIndex));
    }


    /**
     * Builds "baseUrl + comma-separated fileNames".
     * It uses no shared state, so concurrent requests (from different Workers) cannot corrupt each other's URLs.
     */
    private static String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch)
    {
        return baseUrl + String.join(",", fileNamesForCurBatch);
    }


    // The buffer is allocated on every call, so keep it modest (it used to be 20 MB); socket reads typically return far fewer bytes per call anyway.
    private static final int bufferSize = 1048576;	// 1 MiB

    /**
     * Writes the response-body of the given connection into the given file.
     *
     * @return true if the whole body was written and both streams were closed successfully, false otherwise.
     */
    public static boolean saveZipFile(HttpURLConnection conn, File zipFile)
    {
        // Try-with-resources guarantees both streams get closed, even if closing one of them fails.
        try ( InputStream inStream = conn.getInputStream();
              FileOutputStream outStream = new FileOutputStream(zipFile) )
        {
            byte[] byteBuffer = new byte[bufferSize];
            int bytesRead;
            while ( (bytesRead = inStream.read(byteBuffer, 0, bufferSize)) != -1 )
                outStream.write(byteBuffer, 0, bytesRead);
            return true;
        } catch (Exception e) {	// Includes a failure on closing the output-file, which means the saved file may be incomplete.
            logger.error("Could not save the zip file \"" + zipFile.getName() + "\": " + e.getMessage(), e);
            return false;
        }
    }


    /**
     * Checks that the given stored fileName is valid and belongs to at least one payload whose location ends with it.
     * This avoids handling and uploading irrelevant files which could find their way to the working directory
     * (either because of a Worker's error or any other type of malfunction or even malicious action).
     *
     * @return true if the file must NOT be uploaded.
     */
    private static boolean isFileNameProblematic(String fileName, HashMultimap<String, Payload> payloadsHashMultimap)
    {
        // Get the ID of the file.
        Matcher matcher = FILENAME_ID.matcher(fileName);
        if ( !matcher.matches() ) {
            logger.error("The given fileName \"" + fileName + "\" was invalid! Could not be matched with matcher: " + matcher);
            return true;
        }
        String fileID = matcher.group(1);
        if ( (fileID == null) || fileID.isEmpty() ) {
            logger.error("The given fileName \"" + fileName + "\" was invalid. No fileID was extracted!");
            return true;
        }

        // Take the payloads which are related with this ID. An ID might have multiple original-urls, thus multiple payloads.
        // The ID we have here is the one from the first record which reached this file.
        // There might be other records pointing to this file, but in order to mark this file as "valid", we have to match it with at least one of the record-IDs.
        Set<Payload> payloads = payloadsHashMultimap.get(fileID);
        if ( payloads.isEmpty() ) {
            logger.error("The given fileID \"" + fileID + "\" was not part of the \"payloadsHashMultimap\"!");
            return true;
        }

        // Search through the payloads to find at least one match, in order for this file to NOT be "problematic".
        for ( Payload payload : payloads )
        {
            String location = payload.getLocation();
            if ( (location != null) && location.endsWith(fileName) )
                return false;	// It's not problematic.
        }
        logger.error("None of the locations of the payloads matched with the ID \"" + fileID + "\" are ending with the filename \"" + fileName + "\" they were supposed to.");
        return true;
    }


    /**
     * Updates the UrlReports to not point to any downloaded fullText files.
     * This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails:
     * we don't want any "links" to locally stored files, which will be deleted.
     *
     * @param urlReports the reports whose payloads (if any) get their file-data set to "null".
     */
    public static void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports)
    {
        for ( UrlReport urlReport : urlReports ) {
            Payload payload = urlReport.getPayload();
            if ( payload != null )
                setUnretrievedFullText(payload);
        }
    }


    /**
     * Sets the file-data to "null" for every payload whose location does not start with the S3-ObjectStore endpoint.
     */
    private static void replaceNotUploadedFileLocations(List<UrlReport> urlReports)
    {
        for ( UrlReport urlReport : urlReports ) {
            Payload payload = urlReport.getPayload();
            if ( payload != null ) {
                String fileLocation = payload.getLocation();
                if ( (fileLocation != null) && (! fileLocation.startsWith(S3ObjectStoreMinIO.endpoint)) )
                    setUnretrievedFullText(payload);
            }
        }
    }


    /**
     * For every given fileName, extracts its ID and sets the file-data to "null" for all payloads having that ID.
     * FileNames which do not match the expected "ID.extension" pattern are skipped.
     */
    public static void updateUrlReportsForCurBatchTOHaveNoFullTextFiles(HashMultimap<String, Payload> payloadsHashMultimap, List<String> fileNames)
    {
        for ( String fileName : fileNames ) {
            // Get the ID of the file.
            Matcher matcher = FILENAME_ID.matcher(fileName);
            if ( !matcher.matches() )
                continue;
            String id = matcher.group(1);
            if ( (id == null) || id.isEmpty() )
                continue;

            // Set for all payloads connected to this ID.
            for ( Payload payload : payloadsHashMultimap.get(id) ) {
                if ( payload != null )
                    setUnretrievedFullText(payload);	// It changes the payload in the original UrlReport list.
            }
        }
    }


    /**
     * Marks the full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text is kept.
     */
    public static void setUnretrievedFullText(Payload payload)
    {
        payload.setLocation(null);
        payload.setHash(null);
        payload.setMime_type(null);
        payload.setSize(null);
    }


    /**
     * Sets the fileLocation for all those IDs related to the File. The IDs may have one or more payloads;
     * only payloads with a non-null hash (the ones which actually led to a file) are updated.
     *
     * @param payloadsHashMultimap the payloads, indexed by their ID.
     * @param fileIDs the IDs related to the uploaded file.
     * @param s3Url the new location of the file.
     */
    public static void setFullTextForMultipleIDs(HashMultimap<String, Payload> payloadsHashMultimap, Set<String> fileIDs, String s3Url)
    {
        for ( String id : fileIDs ) {
            Set<Payload> payloads = payloadsHashMultimap.get(id);
            if ( payloads.isEmpty() ) {
                logger.error("The given id \"" + id + "\" (coming from the \"allFileNamesWithIDsHashMap\"), is not found inside the \"payloadsHashMultimap\"!");
                continue;
            }

            for ( Payload payload : payloads ) {
                // Update only for the records which led to a file, not all the records of this ID (an ID might have multiple original_urls pointing to different directions).
                if ( payload.getHash() != null )
                    payload.setLocation(s3Url);	// All the other file-data is already set from the Worker.
            }
        }
    }


    /**
     * Recursively deletes the given directory.
     *
     * @return true on success, false if an IOException occurred (it is logged).
     */
    public static boolean deleteDirectory(File curBatchDir) {
        try {
            org.apache.commons.io.FileUtils.deleteDirectory(curBatchDir);
            return true;
        } catch (IOException e) {
            logger.error("The following directory could not be deleted: " + curBatchDir.getName(), e);
            return false;
        }
    }


    /**
     * The sqleMessage contains the actual message followed by the long batch. This makes the logs unreadable,
     * so the message is cut to at most 1500 characters (plus "...") before logging. A "null" message is returned as-is.
     */
    private static String getCutBatchExceptionMessage(String sqleMessage)
    {
        int maxEnding = 1500;
        if ( (sqleMessage != null) && (sqleMessage.length() > maxEnding) )
            return (sqleMessage.substring(0, maxEnding) + "...");
        else
            return sqleMessage;
    }


    /**
     * Counts the lines of the input-file.
     * This is currently not used, but it may be useful in a future scenario.
     *
     * @return the number of lines, or -1 if they could not be counted.
     */
    private static long getInputFileLinesNum()
    {
        long numOfLines;
        // The stream holds an open file-handle, so it must be closed.
        try ( java.util.stream.Stream<String> lines = Files.lines(Paths.get(inputFileFullPath)) ) {
            numOfLines = lines.count();
            logger.debug("The numOfLines in the inputFile is " + numOfLines);
        } catch (IOException | UncheckedIOException e) {	// "UncheckedIOException" is thrown while reading (e.g. on malformed input).
            logger.error("Could not retrieve the numOfLines. " + e);
            return -1;
        }
        return numOfLines;
    }


    /**
     * Decodes a Json line and returns its "id" and "url" members as a Task.
     *
     * @param jsonLine String
     * @return a Task with the id and url, or "null" if the line could not be parsed or it has an empty url.
     */
    public static Task jsonDecoder(String jsonLine)
    {
        String idStr;
        String urlStr;
        try {
            JSONObject jObj = new JSONObject(jsonLine);	// Construct a JSONObject from the retrieved jsonLine.
            idStr = jObj.get("id").toString();
            urlStr = jObj.get("url").toString();
        } catch (JSONException je) {
            logger.warn("JSONException caught when tried to parse and extract values from jsonLine: \t" + jsonLine, je);
            return null;
        }

        if ( urlStr.isEmpty() ) {
            if ( !idStr.isEmpty() )	// If we only have the id, then go and log it.
                logger.warn("The url was not found for id: \"" + idStr + "\"");
            return null;
        }
        return new Task(idStr, urlStr, null);
    }


    /**
     * Reads up to {@value #jsonBatchSize} lines of the current thread's input-file and extracts the urls, along with the IDs.
     * Empty or undecodable lines are counted as "unretrievable"; duplicate id-url pairs are counted as duplicates.
     *
     * @return HashMultimap<String, String> of id to urls; empty when there is no more input,
     *         or when the current thread has no input-scanner (i.e. no "FileUtils" was constructed on it).
     */
    public static HashMultimap<String, String> getNextIdUrlPairBatchFromJson()
    {
        Task inputIdUrlTuple;
        int expectedPathsPerID = 5;
        int expectedIDsPerBatch = jsonBatchSize / expectedPathsPerID;

        HashMultimap<String, String> idAndUrlMappedInput = HashMultimap.create(expectedIDsPerBatch, expectedPathsPerID);

        Scanner scanner = inputScanner.get();
        if ( scanner == null ) {
            logger.error("No input-scanner was initialized for the current thread! Construct a \"FileUtils\" on this thread first.");
            return idAndUrlMappedInput;
        }

        int curBeginning = fileIndex.get();

        while ( scanner.hasNextLine() && (fileIndex.get() < (curBeginning + jsonBatchSize)) )
        {	// While (!EOF) and inside the current url-batch, iterate through lines.
            String retrievedLineStr = scanner.nextLine();
            fileIndex.set(fileIndex.get() +1);

            if ( retrievedLineStr.isEmpty() ) {
                unretrievableInputLines.set(unretrievableInputLines.get() +1);
                continue;
            }

            if ( (inputIdUrlTuple = jsonDecoder(retrievedLineStr)) == null ) {	// Decode the jsonLine and take the two attributes.
                logger.warn("A problematic inputLine found: \t" + retrievedLineStr);
                unretrievableInputLines.set(unretrievableInputLines.get() +1);
                continue;
            }

            // A duplicate id-url pair cannot be stored twice in the HashMultimap, so count it here.
            if ( !idAndUrlMappedInput.put(inputIdUrlTuple.getId(), inputIdUrlTuple.getUrl()) )
                duplicateIdUrlEntries.set(duplicateIdUrlEntries.get() +1);
        }

        return idAndUrlMappedInput;
    }


    /**
     * Returns the number of (non-empty, decodable) lines we have read from the inputFile, on the current thread.
     * In the end, it gives the total number of urls we have processed.
     * @return loadedUrls
     */
    public static int getCurrentlyLoadedUrls()
    {
        return FileUtils.fileIndex.get() - FileUtils.unretrievableInputLines.get();
    }


    /**
     * Checks whether there is no more input-data.
     * If there is none on the first run, an error is logged (no exception is thrown).
     *
     * @param isEmptyOfData whether the last loaded batch was empty.
     * @param isFirstRun whether this is the first batch being loaded.
     * @return true if loading has finished (no more data), false if there is more input-data to be loaded.
     */
    public static boolean isFinishedLoading(boolean isEmptyOfData, boolean isFirstRun)
    {
        if ( isEmptyOfData ) {
            if ( isFirstRun )
                logger.error("Could not retrieve any urls from the inputFile!");
            else
                logger.debug("Done loading " + FileUtils.getCurrentlyLoadedUrls() + " urls from the inputFile.");
            return true;
        }
        return false;
    }
}