Bug fixes and improvements:

- Fix an NPE, when the "getTestUrls"-endpoint is called. It was thrown because of an absent per-thread initialization of some thread-local variables.
- Fix JdbcTemplate error when querying the "getFileLocationForHashQuery".
- Fix the "S3ObjectStore.isLocationInStore" check.
- Fix not catching/handling some exceptions.
- Fix/improve log-messages.
- Optimize the "getFileLocationForHashQuery" to return only the first row. In the latest change, without this optimization, the query-result would cause non-handling the same-hash cases, because of an exception.
- Optimize the "ImpalaConnector.databaseLock.lock()" positioning.
- Update the "getTestUrls" api-path.
- Optimize list-allocation.
- Re-add the info-message about the successful emptying of the S3-bucket.
- Code cleanup.
This commit is contained in:
Lampros Smyrnaios 2022-02-02 20:19:46 +02:00
parent d1c86ff273
commit be4898e43e
7 changed files with 140 additions and 135 deletions

View File

@ -24,25 +24,27 @@ public class ImpalaConnector {
public static final Lock databaseLock = new ReentrantLock(true); // This lock is locking the threads trying to execute queries in the database.
public ImpalaConnector(@Value("${services.pdfaggregation.controller.db.oldDatabaseName}") String oldDatabaseName,
@Value("${services.pdfaggregation.controller.db.databaseName}") String databaseName) {
this.oldDatabaseName = oldDatabaseName;
this.databaseName = databaseName;
}
@PostConstruct
public void init() {
logger.info("Max available memory to the Controller: " + Runtime.getRuntime().maxMemory() + " bytes.");
try {
if ( jdbcTemplate.getDataSource().getConnection().getMetaData().supportsBatchUpdates() )
logger.warn("The database does not support \"BatchUpdates\"!");
boolean supportsBatchUpdates = jdbcTemplate.getDataSource().getConnection().getMetaData().supportsBatchUpdates();
logger.info("The database " + (supportsBatchUpdates ? "supports" : "does not support") + " \"BatchUpdates\"!");
} catch (Exception e) {
logger.error("Error testing if database supports batch updates", e);
logger.error("Error testing if database supports batch updates!", e);
}
createDatabase();
createDatabase(); // In case of an exception, the App will exit with the stacktrace.
}
private void createDatabase() {
logger.info("Going to create the database and the tables, if they do not exist. Also will fill some tables with data from OpenAIRE.");
@ -74,9 +76,11 @@ public class ImpalaConnector {
logger.info("The database \"" + databaseName + "\" and its tables were created or validated.");
}
public static String handlePreparedStatementException(String queryName, String query, Exception e) {
public static String handleQueryException(String queryName, String query, Exception e) {
String errorMsg = "Problem when creating " + (( ! queryName.startsWith("get")) ? "and executing " : "") + "the prepared statement for \"" + queryName + "\"!\n";
logger.error(errorMsg + "\n\n" + query + "\n\n", e);
return errorMsg;
}
}

View File

@ -21,4 +21,5 @@ public class GeneralController {
return ResponseEntity.ok().build();
}
}

View File

@ -13,32 +13,35 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.sql.Timestamp;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
@RestController
@RequestMapping("/test")
public class TestController extends GeneralController {
public class TestController {
private static final Logger logger = LoggerFactory.getLogger(TestController.class);
@Autowired
private TestFileUtils fileUtils;
private TestFileUtils testFileUtils;
@Value("${services.pdfaggregation.controller.assignmentLimit}")
private int assignmentLimit;
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); // Just for the "getTestUrls"-endpoint.
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
@GetMapping("test")
@GetMapping("urls")
public ResponseEntity<?> getTestUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
logger.info("Worker with id: \"" + workerId + "\", requested " + workerAssignmentsLimit + " test-assignments. The assignments-limit of the controller is: " + this.assignmentLimit);
logger.debug("Going to retrieve the data from the inputResourceFile: " + testFileUtils.testResource.getFilename());
List<Assignment> assignments = new ArrayList<>();
HashMultimap<String, String> loadedIdUrlPairs;
@ -48,9 +51,9 @@ public class TestController extends GeneralController {
// Start loading urls.
while ( true ) {
loadedIdUrlPairs = fileUtils.getNextIdUrlPairBatchFromJson(); // Take urls from jsonFile.
loadedIdUrlPairs = testFileUtils.getNextIdUrlPairBatchFromJson(); // Take urls from jsonFile.
if ( fileUtils.isFinishedLoading(loadedIdUrlPairs.isEmpty(), isFirstRun) ) // Throws RuntimeException which is automatically passed on.
if ( testFileUtils.isFinishedLoading(loadedIdUrlPairs.isEmpty(), isFirstRun) ) // Throws RuntimeException which is automatically passed on.
break;
else
isFirstRun = false;
@ -73,12 +76,13 @@ public class TestController extends GeneralController {
}
}// end loading-while-loop
Scanner scanner = fileUtils.inputScanner.get();
Scanner scanner = testFileUtils.inputScanner.get();
if ( scanner != null ) // Check if the initial value is null.
scanner.close();
long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet();
logger.info("Sending batch_" + curAssignmentsBatchCounter + " with " + assignments.size() + " assignments (" + fileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId);
logger.info("Sending batch_" + curAssignmentsBatchCounter + " with " + assignments.size() + " assignments (" + testFileUtils.duplicateIdUrlEntries.get() + " more assignments were discarded as duplicates), to worker with ID: " + workerId);
return ResponseEntity.status(HttpStatus.OK).header("Content-Type", "application/json").body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
}
}

View File

@ -13,7 +13,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
@ -23,6 +22,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
@RestController
@RequestMapping("/urls")
public class UrlController {
@ -35,7 +35,7 @@ public class UrlController {
@Autowired
private FileUtils fileUtils;
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); // Just for the "getTestUrls"-endpoint.
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");
@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}")
@ -47,6 +47,7 @@ public class UrlController {
@Value("${services.pdfaggregation.controller.db.databaseName}")
private String databaseName;
@GetMapping("")
public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
@ -90,78 +91,77 @@ public class UrlController {
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
String createAssignmentsQuery = "create table " + databaseName + ".current_assignment as \n" + findAssignmentsQuery;
String createCurrentAssignmentsQuery = "create table " + databaseName + ".current_assignment as \n" + findAssignmentsQuery;
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + databaseName + ".current_assignment";
String getAssignmentsQuery = "select * from " + databaseName + ".current_assignment";
List<Assignment> assignments = new ArrayList<>();
List<Assignment> assignments = new ArrayList<>(assignmentsLimit);
ImpalaConnector.databaseLock.lock();
try {
jdbcTemplate.execute(createAssignmentsQuery);
jdbcTemplate.execute(createCurrentAssignmentsQuery);
} catch (Exception sqle) {
ImpalaConnector.databaseLock.unlock();
String errorMsg = ImpalaConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, sqle);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
try {
jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
} catch (Exception sqle) {
String errorMsg = dropCurrentAssignmentTable();
ImpalaConnector.databaseLock.unlock();
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
ImpalaConnector.databaseLock.unlock();
errorMsg = ImpalaConnector.handlePreparedStatementException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, sqle);
errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, sqle);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
try {
jdbcTemplate.query(getAssignmentsQuery, new RowCallbackHandler() {
@Override
public void processRow(ResultSet rs) throws SQLException {
Assignment assignment = new Assignment();
assignment.setWorkerId(workerId);
assignment.setTimestamp(timestamp);
Datasource datasource = new Datasource();
try { // For each of the 4 columns returned. The indexing starts from 1
assignment.setId(rs.getString(1));
assignment.setOriginalUrl(rs.getString(2));
datasource.setId(rs.getString(3));
datasource.setName(rs.getString(4));
} catch (SQLException sqle) {
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
}
assignment.setDatasource(datasource);
assignments.add(assignment);
jdbcTemplate.query(getAssignmentsQuery, rs -> {
Assignment assignment = new Assignment();
assignment.setWorkerId(workerId);
assignment.setTimestamp(timestamp);
Datasource datasource = new Datasource();
try { // For each of the 4 columns returned. The indexing starts from 1
assignment.setId(rs.getString(1));
assignment.setOriginalUrl(rs.getString(2));
datasource.setId(rs.getString(3));
datasource.setName(rs.getString(4));
} catch (SQLException sqle) {
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
}
assignment.setDatasource(datasource);
assignments.add(assignment);
});
} catch (Exception e) {
String errorMsg = dropCurrentAssignmentTable();
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
ImpalaConnector.databaseLock.unlock();
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 ) {
String errorMsg = dropCurrentAssignmentTable();
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
ImpalaConnector.databaseLock.unlock();
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests.
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecord + " for the next requests.";
logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg);
} else if ( assignmentsSize < assignmentsLimit ) {
maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests.
@ -181,23 +181,26 @@ public class UrlController {
jdbcTemplate.execute(insertAssignmentsQuery);
} catch (Exception sqle) {
String errorMsg = dropCurrentAssignmentTable();
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
ImpalaConnector.databaseLock.unlock();
errorMsg = ImpalaConnector.handlePreparedStatementException("insertAssignmentsQuery", insertAssignmentsQuery, sqle);
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, sqle);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
String errorMsg = dropCurrentAssignmentTable();
if ( errorMsg != null ) // The "databaseLock" is already unlocked.
if ( errorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
}
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. Going to merge the parquet files for this table.");
String mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
@ -208,6 +211,7 @@ public class UrlController {
return ResponseEntity.status(HttpStatus.OK).body(new AssignmentsResponse(curAssignmentsBatchCounter, assignments));
}
@PostMapping("addWorkerReport")
public ResponseEntity<?> addWorkerReport(@RequestBody WorkerReport workerReport, HttpServletRequest request) {
@ -248,44 +252,38 @@ public class UrlController {
}
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
logger.error("Failed to get and/or upload the fullTexts for assignments_" + curReportAssignments);
// The docUrls were still found! Just update ALL the fileLocations. sizes and hashes, to show that the files are not available and continue with writing the attempts and the Payloads.
// The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available and continue with writing the attempts and the payloads.
fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports);
}
ImpalaConnector.databaseLock.lock();
// Store the workerReport into the database.
// Store the workerReport into the database. We use "PreparedStatements" to do insertions, for security and valid SQL syntax reasons.
String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
String insertIntoAttemptBaseQuery = "INSERT INTO " + databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
String payloadErrorMsg = null;
int failedCount = 0;
// TODO - Think about handling this loop with multiple threads..
// The Impala-server will handle the synchronization itself..
// Check online what happens with "statement.setPoolable()" does it improves speed? in multi or also in single thread?
ImpalaConnector.databaseLock.lock();
// TODO - Think about handling this loop with multiple threads.. The Impala-server will handle the synchronization itself..
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
if ( payload == null ) {
logger.warn("Payload was \"null\" for a \"urlReport\", in assignments_" + curReportAssignments);
payloadErrorMsg = (++failedCount) + " urlReports failed to be processed because they had no payload!";
continue;
}
try { // We use a "PreparedStatement" to do insertions, for security and valid SQL syntax reasons.
try {
Long size = payload.getSize();
Object[] args = new Object[] {
payload.getId(), payload.getOriginal_url(), payload.getActual_url(), payload.getTimestamp_acquired(),
payload.getMime_type(), payload.getSize() != null?String.valueOf(payload.getSize()):null, payload.getHash(),
payload.getMime_type(), (size != null) ? String.valueOf(size) : null, payload.getHash(),
payload.getLocation(), payload.getProvenance()};
int[] argTypes = new int[] {
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
jdbcTemplate.update(insertIntoPayloadBaseQuery, args, argTypes);
} catch (Exception sqle) {
logger.error("Problem when executing the \"insertIntoPayloadBaseQuery\": ", sqle);
}
@ -300,17 +298,14 @@ public class UrlController {
Object[] args = new Object[] {
payload.getId(), payload.getOriginal_url(), payload.getTimestamp_acquired(),
urlReport.getStatus().toString(), String.valueOf(error.getType()), error.getMessage()};
int[] argTypes = new int[] {
Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR};
int[] argTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
jdbcTemplate.update(insertIntoAttemptBaseQuery, args, argTypes);
} catch (Exception sqle) {
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": ", sqle.getMessage());
logger.error("Problem when executing the \"insertIntoAttemptBaseQuery\": " + sqle.getMessage());
}
}//end for-loop
if ( payloadErrorMsg != null )
logger.debug("Finished inserting the payloads and the attempts into the \"payload\" and \"attempt\" tables, although " + payloadErrorMsg + " Going to merge the parquet files for those tables.");
else
@ -319,33 +314,30 @@ public class UrlController {
String mergeErrorMsg = fileUtils.mergeParquetFiles("payload", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
// This will delete the rows of the "assignment" table which refer to the curWorkerId. As we have non-kudu Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
// We do not need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
// We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg);
}
ImpalaConnector.databaseLock.unlock();
logger.debug("Finished merging the database tables.");
return ResponseEntity.status(HttpStatus.OK).body(payloadErrorMsg);
}
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
private static PreparedStatement constructLargeInsertQuery(Connection con, String baseInsertQuery, int dataSize, int numParamsPerRow) throws RuntimeException {
@ -374,15 +366,15 @@ public class UrlController {
return preparedInsertStatement;
}
private String dropCurrentAssignmentTable() {
String dropCurrentAssignmentsQuery = "DROP TABLE " + databaseName + ".current_assignment PURGE";
try {
jdbcTemplate.execute(dropCurrentAssignmentsQuery);
return null;
} catch (Exception sqle) {
ImpalaConnector.databaseLock.unlock();
return ImpalaConnector.handlePreparedStatementException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, sqle);
} catch (Exception e) {
return ImpalaConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e);
}
}
}

View File

@ -4,12 +4,12 @@ import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.UrlReport;
import org.codehaus.groovy.syntax.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
@ -20,6 +20,7 @@ import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@ -46,8 +47,6 @@ public class FileUtils {
public enum UploadFullTextsResponse {successful, unsuccessful, databaseError}
public FileUtils() throws RuntimeException {
}
/**
* In each insertion, a new parquet-file is created, so we end up with millions of files. Parquet is great for fast-select, so have to stick with it and merge those files..
@ -70,7 +69,7 @@ public class FileUtils {
if ( parameter == null )
parameter = "";
else
parameter = " '" + parameter + "'"; // This will be a "string-check".
parameter = " '" + parameter + "'"; // This will be a "string-check", thus the single-quotes.
try {
jdbcTemplate.execute("CREATE TABLE " + databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + databaseName + "." + tableName + " " + whereClause + parameter);
@ -86,6 +85,7 @@ public class FileUtils {
return null; // No errorMsg, everything is fine.
}
private final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$");
private final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$");
@ -94,6 +94,7 @@ public class FileUtils {
private final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
public UploadFullTextsResponse getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, long assignmentsBatchCounter, String workerId) {
// The Controller have to request the files from the Worker, in order to upload them to the S3.
// We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
@ -106,14 +107,13 @@ public class FileUtils {
if ( remoteAddr == null || "".equals(remoteAddr) )
remoteAddr = request.getRemoteAddr();
ImpalaConnector.databaseLock.lock();
String getFileLocationForHashQuery = "select `location` from " + databaseName + ".payload where `hash` = ?" ;
// Get the file-locations.
int numFullTextUrlsFound = 0;
int numFilesFoundFromPreviousAssignmentsBatches = 0;
HashMultimap<String, String> allFileNamesWithIDsHashMap = HashMultimap.create((urlReports.size() / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
String getFileLocationForHashQuery = "select `location` from " + databaseName + ".payload where `hash` = ? limit 1" ;
ImpalaConnector.databaseLock.lock();
for ( UrlReport urlReport : urlReports ) {
UrlReport.StatusType statusType = urlReport.getStatus();
@ -126,7 +126,7 @@ public class FileUtils {
if ( payload == null )
continue;
String fileLocation;
String fileLocation = null;
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
// If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker.
@ -134,21 +134,23 @@ public class FileUtils {
// Use the same prepared-statement for all requests, to improve speed (just like when inserting similar thing to the DB).
String fileHash = payload.getHash();
if ( fileHash != null ) {
fileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[] {fileHash}, new int[] {Types.STRING}, String.class);
try {
fileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[] {fileHash}, new int[] {Types.VARCHAR}, String.class);
} catch (EmptyResultDataAccessException erdae) {
// No fileLocation is found, it's ok. It will be null by default.
} catch (Exception e) {
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n", e);
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
// TODO - Since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
// TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not auto-closed here)and the Database connection.
}
if ( fileLocation != null ) { // If the full-text of this record is already-found and uploaded.
payload.setLocation(fileLocation); // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
//logger.debug("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + fileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches ++;
continue; // Do not request the file from the worker, it's already uploaded. Move on.
}
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
// TODO - Since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
// TODO - In case we DO return, UNLOCK the database-lock and close the Prepared statement (it's not auto-closed here)and the Database connection.
}
// If the full-text of this record was not found by a previous batch...
@ -165,9 +167,9 @@ public class FileUtils {
allFileNamesWithIDsHashMap.put(fileNameWithExtension, payload.getId()); // The keys and the values are not duplicate. Task with ID-1 might have an "ID-1.pdf" file.
// While a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1, thus, the ID-2 file was not downloaded again.
}
}
}// end-for
ImpalaConnector.databaseLock.unlock(); // The rest work of this function does not use the database.
ImpalaConnector.databaseLock.unlock(); // The remaining work of this function does not use the database.
logger.info("NumFullTextUrlsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextUrlsFound + " (out of " + urlReports.size() + ").");
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
@ -259,10 +261,12 @@ public class FileUtils {
// At this point, we know that this file is related with one or more IDs of the payloads AND it has a valid fileName.
// Let's try to upload the file to S3 and update the payloads of all related IDs, either in successful upload or not.
String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath);
if ( s3Url != null ) {
setFullTextForMultipleIDs(fileRelatedIDs, payloadsHashMultimap, s3Url); // It checks weather (s3Url != null) and acts accordingly.
try {
String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath);
setFullTextForMultipleIDs(fileRelatedIDs, payloadsHashMultimap, s3Url);
numUploadedFiles ++;
} catch (Exception e) {
logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore, exception: " + e.getMessage(), e);
}
// Else, the record will have its file-data set to "null", in the end of this method.
}
@ -448,7 +452,7 @@ public class FileUtils {
if ( payload != null ) {
String fileLocation = payload.getLocation();
if ( (fileLocation != null) && (! s3ObjectStore.locationInStore(fileLocation)) )
if ( (fileLocation != null) && (! s3ObjectStore.isLocationInStore(fileLocation)) )
setUnretrievedFullText(payload);
}
}

View File

@ -50,11 +50,11 @@ public class S3ObjectStore {
}
// Make the bucket, if not exist.
if ( !bucketExists ) {
logger.info("Bucket \"" + bucketName + "\" does not exist! Going to create it..");
minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build());
} else
logger.debug("Bucket \"" + bucketName + "\" already exists.");
if ( !bucketExists ) {
logger.info("Bucket \"" + bucketName + "\" does not exist! Going to create it..");
minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build());
} else
logger.debug("Bucket \"" + bucketName + "\" already exists.");
if ( shouldShowAllS3Buckets ) {
List<Bucket> buckets = null;
@ -89,10 +89,10 @@ public class S3ObjectStore {
else {
if ( extension.equals("pdf") )
contentType = "application/pdf";
/*else if ( *//* TODO - other-extension-match *//* )
contentType = "application/pdf"; */
/*else if ( *//* TODO - other-extension-match *//* )
contentType = "application/EXTENSION"; */
else
contentType = "application/pdf";
contentType = "application/pdf"; // Default.
}
} else {
logger.warn("The file with key \"" + fileObjKeyName + "\" does not have a file-extension! Setting the \"pdf\"-mimeType.");
@ -104,8 +104,9 @@ public class S3ObjectStore {
.object(fileObjKeyName).filename(fileFullPath)
.contentType(contentType).build());
// TODO - What if the fileObjKeyName already exists?
// Right now it gets overwritten (unless we add versioning, which is irrelevant for different objects..)
// TODO - What if the fileObjKeyName already exists? Right now it gets overwritten (unless we add versioning0, which is not currently supported by our S3ObjectStore).
// Each Worker handles some of these cases, but in case of id-urls splitting between different workers or re-attempting some temporarily faulty urls later,
// duplicate fileNames may appear and cause file-overwriting from the part of S3ObjectStore.
String s3Url = s3Protocol + bucketName + "/" + fileObjKeyName; // Be aware: This url works only if the access to the bucket is public.
//logger.debug("Uploaded file \"" + fileObjKeyName + "\". The s3Url is: " + s3Url);
@ -130,10 +131,12 @@ public class S3ObjectStore {
// Lastly, delete the empty bucket.
minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
}
logger.info("Bucket " + bucketName + " was " + (shouldDeleteBucket ? "deleted!" : "emptied!"));
}
public boolean locationInStore(String location) {
return location.startsWith(endpoint);
public boolean isLocationInStore(String location) {
return location.startsWith(s3Protocol);
}
private void deleteFile(String fileObjKeyName, String bucketName) throws Exception {

View File

@ -4,14 +4,12 @@ import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.models.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.configurationprocessor.json.JSONException;
import org.springframework.boot.configurationprocessor.json.JSONObject;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Scanner;
@ -21,33 +19,30 @@ public class TestFileUtils {
private static final Logger logger = LoggerFactory.getLogger(TestFileUtils.class);
Resource testResource = new ClassPathResource("testInputFiles/orderedList1000.json");
public Resource testResource = new ClassPathResource("testInputFiles/orderedList1000.json");
public ThreadLocal<Integer> duplicateIdUrlEntries = new ThreadLocal<>();
public ThreadLocal<Scanner> inputScanner = new ThreadLocal<>(); // Every Thread has its own variable.
public ThreadLocal<Integer> duplicateIdUrlEntries;
public ThreadLocal<Scanner> inputScanner;
private final int jsonBatchSize = 3000;
private final ThreadLocal<Integer> fileIndex = new ThreadLocal<>();
private final ThreadLocal<Integer> unretrievableInputLines = new ThreadLocal<>();
private ThreadLocal<Integer> fileIndex;
private ThreadLocal<Integer> unretrievableInputLines;
private final String utf8Charset = "UTF-8";
public TestFileUtils() throws IOException {
String resourceFileName = "testInputFiles/orderedList1000.json";
public TestFileUtils() throws IOException {
InputStream inputStream = testResource.getInputStream();
if ( inputStream == null )
throw new RuntimeException("No resourceFile was found with name \"" + resourceFileName + "\".");
throw new RuntimeException("No resourceFile was found with name \"" + testResource.getFilename() + "\"!");
logger.debug("Going to retrieve the data from the inputResourceFile: " + resourceFileName);
inputScanner.set(new Scanner(inputStream, utf8Charset));
fileIndex.set(0);
unretrievableInputLines.set(0);
duplicateIdUrlEntries.set(0);
inputScanner = ThreadLocal.withInitial(() -> new Scanner(inputStream, utf8Charset));
fileIndex = ThreadLocal.withInitial(() -> 0);
unretrievableInputLines = ThreadLocal.withInitial(() -> 0);
duplicateIdUrlEntries = ThreadLocal.withInitial(() -> 0);
}
/**
* This method parses a Json file and extracts the urls, along with the IDs.
* @return HashMultimap<String, String>
@ -58,7 +53,6 @@ public class TestFileUtils {
int expectedIDsPerBatch = jsonBatchSize / expectedPathsPerID;
HashMultimap<String, String> idAndUrlMappedInput = HashMultimap.create(expectedIDsPerBatch, expectedPathsPerID);
int curBeginning = fileIndex.get();
while ( inputScanner.get().hasNextLine() && (fileIndex.get() < (curBeginning + jsonBatchSize)) )
@ -91,6 +85,7 @@ public class TestFileUtils {
return idAndUrlMappedInput;
}
/**
* This method decodes a Json String and returns its members.
* @param jsonLine String
@ -117,6 +112,7 @@ public class TestFileUtils {
return new Task(idStr, urlStr, null);
}
/**
* This method checks if there is no more input-data and returns true in that case.
* Otherwise, it returns false, if there is more input-data to be loaded.
@ -124,7 +120,6 @@ public class TestFileUtils {
* @param isEmptyOfData
* @param isFirstRun
* @return finished loading / not finished
* @throws RuntimeException
*/
public boolean isFinishedLoading(boolean isEmptyOfData, boolean isFirstRun) {
if ( isEmptyOfData ) {
@ -137,6 +132,7 @@ public class TestFileUtils {
return false;
}
/**
* This method returns the number of (non-heading, non-empty) lines we have read from the inputFile.
* @return loadedUrls
@ -144,4 +140,5 @@ public class TestFileUtils {
private int getCurrentlyLoadedUrls() { // In the end, it gives the total number of urls we have processed.
return fileIndex.get() - unretrievableInputLines.get();
}
}