- Avoid getting and uploading (to S3), full-texts which are already uploaded by previous assignments-batches.

- Fix not updating the fileLocation with the s3Url for records which share the same full-text.
- Set only one delete-order for each assignments-batch-files, not one (or more, by mistake) per zip-batch.
- Set the HttpStatus to "204 - NO_CONTENT", when no assignments are available to be returned to the Worker.
- Fix not unlocking the "dataBaseLock" in case of a "dataBase-connection"-error, in "addWorkerReport()".
- Improve some log-messages.
- Change the log-level for the "S3-bucket already exists" message.
- Update Gradle.
- Optimize imports.
- Code cleanup.
This commit is contained in:
Lampros Smyrnaios 2021-12-21 15:55:27 +02:00
parent 0178e44574
commit 33ba3e8d91
9 changed files with 230 additions and 117 deletions

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.1-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.2-bin.zip
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists

View File

@ -8,7 +8,7 @@ elif [[ $# -gt 1 ]]; then
echo -e "Wrong number of arguments given: ${#}\nPlease execute it like: script.sh <justInstall: 0 | 1>"; exit 1 echo -e "Wrong number of arguments given: ${#}\nPlease execute it like: script.sh <justInstall: 0 | 1>"; exit 1
fi fi
gradleVersion="7.3.1" gradleVersion="7.3.2"
if [[ justInstall -eq 0 ]]; then if [[ justInstall -eq 0 ]]; then

View File

@ -8,8 +8,8 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.cors.CorsConfiguration; import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.CorsConfigurationSource; import org.springframework.web.cors.CorsConfigurationSource;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource; import org.springframework.web.cors.UrlBasedCorsConfigurationSource;

View File

@ -2,7 +2,7 @@ package eu.openaire.urls_controller.components;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled; //import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;

View File

@ -4,6 +4,7 @@ import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.beans.PropertyVetoException; import java.beans.PropertyVetoException;
import java.io.File; import java.io.File;
import java.io.FileReader; import java.io.FileReader;

View File

@ -6,7 +6,9 @@ import eu.openaire.urls_controller.models.Error;
import eu.openaire.urls_controller.models.*; import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.requests.WorkerReport; import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse; import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
import eu.openaire.urls_controller.util.*; import eu.openaire.urls_controller.util.ControllerConstants;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
@ -15,7 +17,6 @@ import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import java.sql.*; import java.sql.*;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -151,7 +152,7 @@ public class UrlController {
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId; String errorMsg = "No results retrieved from the \"getAssignmentsQuery\" for worker with id: " + workerId;
logger.error(errorMsg); logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
}*/ }*/
// The cursor is automatically before the first element in this configuration. // The cursor is automatically before the first element in this configuration.
@ -199,7 +200,7 @@ public class UrlController {
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId; errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId;
logger.error(errorMsg); logger.error(errorMsg);
ImpalaConnector.closeConnection(con); ImpalaConnector.closeConnection(con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
} }
logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker."); logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");
@ -286,19 +287,24 @@ public class UrlController {
long curReportAssignments = workerReport.getAssignmentRequestCounter(); long curReportAssignments = workerReport.getAssignmentRequestCounter();
logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database."); logger.info("Received the WorkerReport for batch-assignments_" + curReportAssignments + ", from the worker with id: " + curWorkerId + ". It contains " + urlReports.size() + " urlReports. Going to request the fullTexts from the Worker and insert the UrlReports into the database.");
ImpalaConnector.databaseLock.lock();
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
}
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location". // Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
if ( ! FileUtils.getAndUploadFullTexts(urlReports, request, curReportAssignments, curWorkerId) ) { if ( ! FileUtils.getAndUploadFullTexts(urlReports, con, request, curReportAssignments, curWorkerId) ) {
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments); logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads. // The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports); FileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
} }
// The "databaseLock" was unlocked inside the "FileUtils.getAndUploadFullTexts" to avoid blocking the database while doing large irrelevant tasks like transferring files.
ImpalaConnector.databaseLock.lock(); ImpalaConnector.databaseLock.lock();
Connection con = ImpalaConnector.getInstance().getConnection();
if ( con == null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Problem when connecting with the Impala-database!");
// Store the workerReport into the database. // Store the workerReport into the database.
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)"; String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
@ -322,8 +328,8 @@ public class UrlController {
con.setAutoCommit(false); // Avoid writing to disk for each insert. Write them all in the end. con.setAutoCommit(false); // Avoid writing to disk for each insert. Write them all in the end.
} catch (SQLException sqle) { } catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!"; String errorMsg = "Problem when setting Connection.AutoCommit to \"false\"!\n";
logger.error(errorMsg + "\n" + sqle.getMessage()); logger.error(errorMsg + sqle.getMessage());
closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con); closePreparedStatements(preparedInsertPayloadStatement, preparedInsertAttemptStatement, con);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} }
@ -362,7 +368,7 @@ public class UrlController {
} }
Error error = urlReport.getError(); Error error = urlReport.getError();
if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of the loop) if ( error == null ) { // A bit rare to happen, but we should fix it (otherwise NPEs will be thrown for the rest of this loop).
logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members."); logger.warn("Error was \"null\" for \"urlReport\": " + urlReport + "\nSetting an empty object with \"null\" members.");
error = new Error(null, null); error = new Error(null, null);
} }
@ -409,6 +415,7 @@ public class UrlController {
// This will delete the rows of the "assignment" table which refer to the curWorkerId. As we have non-kudu Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data. // This will delete the rows of the "assignment" table which refer to the curWorkerId. As we have non-kudu Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table. // Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
// We do not need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con, " WHERE workerid != ", curWorkerId); mergeErrorMsg = FileUtils.mergeParquetFiles("assignment", con, " WHERE workerid != ", curWorkerId);
if ( mergeErrorMsg != null ) { if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();

View File

@ -20,7 +20,7 @@ public class FileUnZipper {
public static void unzipFolder(Path source, Path target) throws Exception public static void unzipFolder(Path source, Path target) throws Exception
{ {
try (ZipInputStream zis = new ZipInputStream(new FileInputStream(source.toFile()))) try ( ZipInputStream zis = new ZipInputStream(new FileInputStream(source.toFile())) )
{ {
// Iterate over the files in zip and un-zip them. // Iterate over the files in zip and un-zip them.
ZipEntry zipEntry = zis.getNextEntry(); ZipEntry zipEntry = zis.getNextEntry();
@ -31,13 +31,12 @@ public class FileUnZipper {
if ( zipEntry.getName().endsWith(File.separator) ) // If we have a directory. if ( zipEntry.getName().endsWith(File.separator) ) // If we have a directory.
Files.createDirectories(targetPath); Files.createDirectories(targetPath);
else { else {
// Some zip stored file path only, need create parent directories, e.g data/folder/file.txt // Some zip -files store only the file-paths and not separate directories. We need to create parent directories, e.g data/folder/file.txt
if ( targetPath.getParent() != null ) { Path parentPath = targetPath.getParent();
if ( Files.notExists(targetPath.getParent()) ) { if ( (parentPath != null) && Files.notExists(parentPath) ) {
Files.createDirectories(targetPath.getParent()); Files.createDirectories(parentPath);
}
} }
Files.copy(zis, targetPath, StandardCopyOption.REPLACE_EXISTING); Files.copy(zis, targetPath, StandardCopyOption.REPLACE_EXISTING); // Copy an individual entry.
} }
zipEntry = zis.getNextEntry(); zipEntry = zis.getNextEntry();
} }

View File

@ -17,13 +17,8 @@ import java.net.URL;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.sql.Connection; import java.sql.*;
import java.sql.SQLException; import java.util.*;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Scanner;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -116,29 +111,77 @@ public class FileUtils {
public static final String baseTargetLocation = System.getProperty("user.dir") + File.separator + "fullTexts" + File.separator; public static final String baseTargetLocation = System.getProperty("user.dir") + File.separator + "fullTexts" + File.separator;
private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames). private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
public static boolean getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId) public static boolean getAndUploadFullTexts(List<UrlReport> urlReports, Connection con, HttpServletRequest request, long assignmentsBatchCounter, String workerId)
{ {
// The Controller have to request the files from the Worker, in order to upload them to the S3. // The Controller have to request the files from the Worker, in order to upload them to the S3.
// We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database. // We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
if ( request == null ) { if ( request == null ) {
logger.error("The \"HttpServletRequest\" is null!"); logger.error("The \"HttpServletRequest\" is null!");
ImpalaConnector.databaseLock.unlock();
return false; return false;
} }
String remoteAddr = request.getHeader("X-FORWARDED-FOR"); String remoteAddr = request.getHeader("X-FORWARDED-FOR");
if ( remoteAddr == null || "".equals(remoteAddr) ) if ( remoteAddr == null || "".equals(remoteAddr) )
remoteAddr = request.getRemoteAddr(); remoteAddr = request.getRemoteAddr();
String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ?" ;
PreparedStatement getFileLocationForHashPreparedStatement = null;
try {
getFileLocationForHashPreparedStatement = con.prepareStatement(getFileLocationForHashQuery);
} catch (SQLException sqle) {
ImpalaConnector.databaseLock.unlock();
logger.error("Problem when creating the prepared statement for \"" + getFileLocationForHashQuery + "\"!\n" + sqle.getMessage());
return false;
}
// Get the file-locations. // Get the file-locations.
List<String> allFileNames = new ArrayList<>(urlReports.size()/2); int numFullTextUrlsFound = 0;
for ( UrlReport urlReport : urlReports ) { int numFilesFoundFromPreviousAssignmentsBatches = 0;
HashMultimap<String, String> allFileNamesWithIDsHashMap = HashMultimap.create((urlReports.size() / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
for ( UrlReport urlReport : urlReports )
{
UrlReport.StatusType statusType = urlReport.getStatus(); UrlReport.StatusType statusType = urlReport.getStatus();
if ( (statusType == null) || statusType.equals(UrlReport.StatusType.non_accessible) ) { if ( (statusType == null) || statusType.equals(UrlReport.StatusType.non_accessible) ) {
continue; continue;
} }
numFullTextUrlsFound ++;
Payload payload = urlReport.getPayload(); Payload payload = urlReport.getPayload();
if ( payload != null ) { if ( payload != null )
String fileLocation = payload.getLocation(); {
String fileLocation = null;
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
// If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker.
// If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker.
// Use the same prepared-statement for all requests, to improve speed (just like when inserting similar thing to the DB).
String fileHash = payload.getHash();
if ( fileHash != null ) {
try {
getFileLocationForHashPreparedStatement.setString(1, fileHash);
} catch (SQLException sqle) {
logger.error("Error when setting the parameter in \"getFileLocationForHashQuery\"!\n" + sqle.getMessage());
}
try ( ResultSet resultSet = getFileLocationForHashPreparedStatement.executeQuery() ) {
if ( resultSet.next() ) { // Move the "cursor" to the first row. If there is any data..
fileLocation = resultSet.getString(1);
if ( fileLocation != null ) { // If the full-text of this record is already-found.
payload.setLocation(fileLocation); // Set the location to the older identical file, which was uploaded to S3.
logger.debug("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + fileLocation + "\".");
numFilesFoundFromPreviousAssignmentsBatches ++;
continue;
}
}
} catch (Exception e) {
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n" + e.getMessage());
}
}
// If the full-text of this record was not found by a previous batch..
fileLocation = payload.getLocation();
if ( fileLocation != null ) { // If the docFile was downloaded (without an error).. if ( fileLocation != null ) { // If the docFile was downloaded (without an error)..
Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation); Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation);
if ( !matcher.matches() ) { if ( !matcher.matches() ) {
@ -148,11 +191,26 @@ public class FileUtils {
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) { if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
continue; continue;
} }
allFileNames.add(fileNameWithExtension); allFileNamesWithIDsHashMap.put(fileNameWithExtension, payload.getId()); // The keys and the values are not duplicate. Task with ID-1 might have an "ID-1.pdf" file.
// While a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1.
} }
} }
} }
// Close the Prepared Statement.
try {
if ( getFileLocationForHashPreparedStatement != null )
getFileLocationForHashPreparedStatement.close();
} catch (SQLException sqle) {
logger.error("Failed to close the \"getFileLocationForHashPreparedStatement\"!\n" + sqle.getMessage());
} finally {
ImpalaConnector.databaseLock.unlock(); // The rest work of this function does not use the database.
}
logger.info("NumFullTextUrlsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextUrlsFound + " (out of " + urlReports.size() + ").");
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithIDsHashMap.keySet());
int numAllFullTexts = allFileNames.size(); int numAllFullTexts = allFileNames.size();
if ( numAllFullTexts == 0 ) { if ( numAllFullTexts == 0 ) {
logger.warn("The file retrieved by the Worker where < 0 > for assignments_" + assignmentsBatchCounter); logger.warn("The file retrieved by the Worker where < 0 > for assignments_" + assignmentsBatchCounter);
@ -164,17 +222,17 @@ public class FileUtils {
if ( (numAllFullTexts % numOfFullTextsPerBatch) > 0 ) // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are. if ( (numAllFullTexts % numOfFullTextsPerBatch) > 0 ) // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
numOfBatches ++; numOfBatches ++;
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " fullTexts. Going to request them from the Worker, in " + numOfBatches + " batches."); logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts. Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches.");
// Check if one full text is left out because of the division. Put it int the last batch. // Check if one full text is left out because of the division. Put it int the last batch.
String baseUrl = "http://" + remoteAddr + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/"; String baseUrl = "http://" + remoteAddr + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
// Index all UrlReports to be more efficiently searched later. // Index all Payloads to be more efficiently searched later.
HashMap<String, Payload> payloadsHashMap = new HashMap<>(urlReports.size()); HashMultimap<String, Payload> payloadsHashMultimap = HashMultimap.create((urlReports.size() / 3), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
for ( UrlReport urlReport : urlReports ) { for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload(); Payload payload = urlReport.getPayload();
if ( payload != null ) if ( payload != null )
payloadsHashMap.put(payload.getId(), payload); payloadsHashMultimap.put(payload.getId(), payload);
} }
String curAssignmentsBaseLocation = baseTargetLocation + "assignments_" + assignmentsBatchCounter + File.separator; String curAssignmentsBaseLocation = baseTargetLocation + "assignments_" + assignmentsBatchCounter + File.separator;
@ -186,37 +244,35 @@ public class FileUtils {
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter); List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter);
HttpURLConnection conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId); HttpURLConnection conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId);
if ( conn == null ) { if ( conn == null ) {
updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMap, fileNamesForCurBatch); updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMultimap, fileNamesForCurBatch);
failedBatches ++; failedBatches ++;
continue; // To the next batch. continue; // To the next batch.
} }
String targetLocation = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator; // Get the extracted files.
File curBatchDir = new File(targetLocation); String targetDirectory = curAssignmentsBaseLocation + "batch_" + batchCounter + File.separator;
try { try {
// Get the extracted files., // Create this batch-directory.
Path targetPath = Files.createDirectories(Paths.get(targetLocation)); Path curBatchPath = Files.createDirectories(Paths.get(targetDirectory));
// Unzip the file. Iterate over the PDFs and upload each one of them and get the S3-Url // Unzip the file. Iterate over the PDFs and upload each one of them and get the S3-Url
String zipFileFullPath = targetLocation + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".zip"; String zipFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".zip";
File zipFile = new File(zipFileFullPath); File zipFile = new File(zipFileFullPath);
if ( ! saveZipFile(conn, zipFile) ) { if ( ! saveZipFile(conn, zipFile) ) {
updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMap, fileNamesForCurBatch); updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMultimap, fileNamesForCurBatch);
deleteDirectory(curBatchDir);
failedBatches ++; failedBatches ++;
continue; // To the next batch. continue; // To the next batch.
} }
//logger.debug("The zip file has been saved: " + zipFileFullPath); // DEBUG! //logger.debug("The zip file has been saved: " + zipFileFullPath); // DEBUG!
FileUnZipper.unzipFolder(Paths.get(zipFileFullPath), targetPath); FileUnZipper.unzipFolder(Paths.get(zipFileFullPath), curBatchPath);
String[] fileNames = curBatchDir.list(); String[] fileNames = new File(targetDirectory).list();
if ( (fileNames == null) || (fileNames.length == 0) ) { if ( (fileNames == null) || (fileNames.length <= 1 ) ) { // The directory might have only one file, the "zip-file".
logger.error("No filenames where extracted from directory: " + targetLocation); logger.error("No full-text fileNames where extracted from directory: " + targetDirectory);
updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMap, fileNamesForCurBatch); updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMultimap, fileNamesForCurBatch);
deleteDirectory(curBatchDir);
failedBatches ++; failedBatches ++;
continue; // To the next batch. continue; // To the next batch.
} }
@ -225,48 +281,38 @@ public class FileUtils {
int numUploadedFiles = 0; int numUploadedFiles = 0;
for ( String fileName : fileNames ) for ( String fileName : fileNames )
{ {
String fileFullPath = targetLocation + fileName; String fileFullPath = targetDirectory + fileName;
if ( fileFullPath.equals(zipFileFullPath) ) { // Exclude the zip-file from uploading. if ( fileFullPath.equals(zipFileFullPath) ) // Exclude the zip-file from uploading.
continue;
// Check if this stored file is related to one or more IDs from the Set.
Set<String> fileRelatedIDs = allFileNamesWithIDsHashMap.get(fileName);
if ( fileRelatedIDs.isEmpty() ) { // In case the "fileName" is not inside the "allFileNamesWithIDsHashMap" HashMultimap.
logger.error("The stored file \"" + fileName + "\" is not related to any ID which had a file requested from the Worker!");
continue; continue;
} }
// Get the ID of the file.
Matcher matcher = FILENAME_ID.matcher(fileName); if ( isFileNameProblematic(fileName, payloadsHashMultimap) ) // Do some more checks.
if ( !matcher.matches() ) {
continue; continue;
}
String id = matcher.group(1); // At this point, we know that this file is related with one or more IDs of the payloads AND it has a valid fileName.
if ( (id == null) || id.isEmpty() ) { // Let's try to upload the file to S3 and update the payloads of all related IDs, either in successful upload or not.
continue;
}
Payload payload = payloadsHashMap.get(id);
if ( payload == null ) {
continue;
}
String location = payload.getLocation();
if ( location == null ) {
continue;
}
if ( ! location.endsWith(fileName) ) { // That should NEVER happen...
logger.error("The location \"" + location + "\" of the payload matched with the ID \"" + id + "\" is not ending with the filename it was supposed to \"" + fileName + "\"");
continue;
}
String s3Url = S3ObjectStoreMinIO.uploadToS3(fileName, fileFullPath); String s3Url = S3ObjectStoreMinIO.uploadToS3(fileName, fileFullPath);
if ( s3Url != null ) { if ( s3Url != null ) {
payload.setLocation(s3Url); // Update the file-location to the new S3-url. setFullTextForMultipleIDs(payloadsHashMultimap, fileRelatedIDs, s3Url); // It checks weather (s3Url != null) and acts accordingly.
numUploadedFiles ++; numUploadedFiles++;
} else }
setUnretrievedFullText(payload); // Else, the record will have its file-data set to "null", in the end of this method.
} }
logger.info("Finished uploading " + numUploadedFiles + " full-texts of assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore."); logger.info("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -1) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore.");
// (fileNames.length -1) --> minus the zip-file
} catch (Exception e) { } catch (Exception e) {
logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6). logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMap, fileNamesForCurBatch); updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMultimap, fileNamesForCurBatch);
failedBatches ++; failedBatches ++;
} finally {
deleteDirectory(curBatchDir); // Delete the files of this batch (including the zip-file).
} }
} // End of batches. } // End of batches.
@ -278,7 +324,7 @@ public class FileUtils {
logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId); logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
return false; return false;
} else { } else {
replaceNotUploadedFileLocations(urlReports); replaceNotUploadedFileLocations(urlReports); // Make sure all records without an s3Url have null file-data.
return true; return true;
} }
} }
@ -315,9 +361,8 @@ public class FileUtils {
String inputLine; String inputLine;
while ( (inputLine = br.readLine()) != null ) while ( (inputLine = br.readLine()) != null )
{ {
if ( !inputLine.isEmpty() ) { if ( !inputLine.isEmpty() )
errorMsgStrB.append(inputLine); errorMsgStrB.append(inputLine);
}
} }
return (errorMsgStrB.length() != 0) ? errorMsgStrB.toString() : null; // Make sure we return a "null" on empty string, to better handle the case in the caller-function. return (errorMsgStrB.length() != 0) ? errorMsgStrB.toString() : null; // Make sure we return a "null" on empty string, to better handle the case in the caller-function.
} catch ( IOException ioe ) { } catch ( IOException ioe ) {
@ -367,6 +412,74 @@ public class FileUtils {
} }
private static final int bufferSize = 20971520; // 20 MB
public static boolean saveZipFile(HttpURLConnection conn, File zipFile)
{
InputStream inStream = null;
FileOutputStream outStream = null;
try {
inStream = conn.getInputStream();
outStream = new FileOutputStream(zipFile);
byte[] byteBuffer = new byte[bufferSize]; // 20 MB
int bytesRead = -1;
while ( (bytesRead = inStream.read(byteBuffer, 0, bufferSize)) != -1 ) {
outStream.write(byteBuffer, 0, bytesRead);
}
return true;
} catch (Exception e) {
logger.error("Could not save the zip file \"" + zipFile.getName() + "\": " + e.getMessage(), e);
return false;
} finally {
try {
if ( inStream != null )
inStream.close();
if ( outStream != null )
outStream.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
private static boolean isFileNameProblematic(String fileName, HashMultimap<String, Payload> payloadsHashMultimap)
{
// Get the ID of the file.
Matcher matcher = FILENAME_ID.matcher(fileName);
if ( !matcher.matches() ) {
logger.error("The given fileName \"" + fileName + "\" was invalid! Could not be matched with matcher: " + matcher);
return true;
}
String fileID = matcher.group(1);
if ( (fileID == null) || fileID.isEmpty() ) {
logger.error("The given fileName \"" + fileName + "\" was invalid. No fileID was extracted!");
return true;
}
// Take the payloads which are related with this ID. An ID might have multiple original-urls, thus multiple payloads.
// The ID we have here, is the one from the first record which reached to this file.
// There might be other records pointing to this file. But, in order to mark this file as "valid", we have to match it with at least one of the records-IDs.
// We do this process to avoid handling and uploading irrelevant files which could find their way to the working directory (either because of a Worker's error or any other type of malfunction or even malicious action).
Set<Payload> payloads = payloadsHashMultimap.get(fileID);
if ( payloads.isEmpty() ) {
logger.error("The given fileID \"" + fileID + "\" was not part of the \"payloadsHashMultimap\"!");
return true;
}
// Search through the payloads to find at least one match, in order for this file to NOT be "problematic".
for ( Payload payload : payloads )
{
String location = payload.getLocation();
if ( (location != null) && location.endsWith(fileName) )
return false; // It's not problematic.
}
logger.error("None of the locations of the payloads matched with the ID \"" + fileID + "\" are ending with the filename \"" + fileName + "\" they were supposed to.");
return true;
}
/** /**
* This method updates the UrlReports to not point to any downloaded fullText files. * This method updates the UrlReports to not point to any downloaded fullText files.
* This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails. * This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails.
@ -397,7 +510,7 @@ public class FileUtils {
} }
public static void updateUrlReportsForCurBatchTOHaveNoFullTextFiles(HashMap<String, Payload> payloadsHashMap, List<String> fileNames) public static void updateUrlReportsForCurBatchTOHaveNoFullTextFiles(HashMultimap<String, Payload> payloadsHashMultimap, List<String> fileNames)
{ {
for ( String fileName : fileNames ) { for ( String fileName : fileNames ) {
// Get the ID of the file. // Get the ID of the file.
@ -409,9 +522,11 @@ public class FileUtils {
if ( (id == null) || id.isEmpty() ) { if ( (id == null) || id.isEmpty() ) {
continue; continue;
} }
Payload payload = payloadsHashMap.get(id); Set<Payload> payloads = payloadsHashMultimap.get(id);
if ( payload != null ) // Set for all payloads connected to this ID.
setUnretrievedFullText(payload); // It changes the payload in the original UrlReport list. for ( Payload payload : payloads )
if ( payload != null )
setUnretrievedFullText(payload); // It changes the payload in the original UrlReport list.
} }
} }
@ -426,33 +541,24 @@ public class FileUtils {
} }
private static final int bufferSize = 20971520; // 20 MB /**
public static boolean saveZipFile(HttpURLConnection conn, File zipFile) * Set the fileLocation for all those IDs related to the File. The IDs may have one or more payloads.
* @param payloadsHashMultimap
* @param fileIDs
* @param s3Url
*/
public static void setFullTextForMultipleIDs(HashMultimap<String, Payload> payloadsHashMultimap, Set<String> fileIDs, String s3Url)
{ {
FileOutputStream outStream = null; for ( String id : fileIDs ) {
InputStream inStream = null; Set<Payload> payloads = payloadsHashMultimap.get(id);
try { if ( payloads.isEmpty() ) {
inStream = conn.getInputStream(); logger.error("The given id \"" + id + "\" (coming from the \"allFileNamesWithIDsHashMap\"), is not found inside the \"payloadsHashMultimap\"!");
outStream = new FileOutputStream(zipFile); continue;
}
byte[] byteBuffer = new byte[bufferSize]; // 20 MB for ( Payload payload : payloads )
int bytesRead = -1; if ( payload.getHash() != null ) // Update only for the records which led to a file, not all the records of this ID (an ID might have multiple original_urls pointing to different directions).
while ( (bytesRead = inStream.read(byteBuffer, 0, bufferSize)) != -1 ) { payload.setLocation(s3Url); // Update the file-location to the new S3-url. All the other file-data is already set from the Worker.
outStream.write(byteBuffer, 0, bytesRead);
}
return true;
} catch (Exception e) {
logger.error("Could not save the zip file \"" + zipFile.getName() + "\": " + e.getMessage(), e);
return false;
} finally {
try {
if ( inStream != null )
inStream.close();
if ( outStream != null )
outStream.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} }
} }

View File

@ -97,7 +97,7 @@ public class S3ObjectStoreMinIO {
minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build()); minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build());
} }
else else
logger.warn("Bucket \"" + bucketName + "\" already exists."); logger.debug("Bucket \"" + bucketName + "\" already exists.");
} catch (Exception e) { } catch (Exception e) {
String errorMsg = "Could not create the bucket \"" + bucketName + "\"!"; String errorMsg = "Could not create the bucket \"" + bucketName + "\"!";
logger.error(errorMsg ,e); logger.error(errorMsg ,e);