- Add support for more than one full-text per id. Allow recognizing fileName additions: "id(1).pdf", "id(2).pdf", etc.

- Fix not giving the databaseName in the "ImpalaController.get10PublicationIdsTest()".
- Improve consistency in the "maxAttemptsPerRecord" value, among different threads. Also, reduce the value-increase by one.
- Check if the tableName string is empty, in the "mergeParquetFiles".
- Improve error-logging.
- Set some local variables to "final", optimizing code-execution by the JVM.
This commit is contained in:
Lampros Smyrnaios 2022-02-07 13:57:09 +02:00
parent 5d70e82504
commit 1111c850b9
4 changed files with 65 additions and 52 deletions

View File

@ -78,7 +78,7 @@ public class ImpalaConnector {
public static String handleQueryException(String queryName, String query, Exception e) { public static String handleQueryException(String queryName, String query, Exception e) {
String errorMsg = "Problem when creating " + (( ! queryName.startsWith("get")) ? "and executing " : "") + "the prepared statement for \"" + queryName + "\"!\n"; String errorMsg = "Problem when executing the query \"" + queryName + "\"!\n";
logger.error(errorMsg + "\n\n" + query + "\n\n", e); logger.error(errorMsg + "\n\n" + query + "\n\n", e);
return errorMsg; return errorMsg;
} }

View File

@ -3,6 +3,7 @@ package eu.openaire.urls_controller.controllers;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
@ -23,10 +24,13 @@ public class ImpalaController {
@Autowired @Autowired
private JdbcTemplate jdbcTemplate; private JdbcTemplate jdbcTemplate;
@Value("${services.pdfaggregation.controller.db.databaseName}")
private String databaseName;
@GetMapping("get10PublicationIdsTest") @GetMapping("get10PublicationIdsTest")
public ResponseEntity<?> get10PublicationIdsTest() { public ResponseEntity<?> get10PublicationIdsTest() {
String query = "SELECT id FROM publication LIMIT 10;"; String query = "SELECT id FROM " + databaseName + ".publication LIMIT 10;";
try { try {
List<String> publications = jdbcTemplate.queryForList(query, String.class); List<String> publications = jdbcTemplate.queryForList(query, String.class);
@ -38,4 +42,5 @@ public class ImpalaController {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} }
} }
} }

View File

@ -42,15 +42,19 @@ public class UrlController {
private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*"); private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*");
@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}")
private int maxAttemptsPerRecord;
@Value("${services.pdfaggregation.controller.assignmentLimit}") @Value("${services.pdfaggregation.controller.assignmentLimit}")
private int assignmentLimit; private int assignmentLimit;
@Value("${services.pdfaggregation.controller.db.databaseName}") @Value("${services.pdfaggregation.controller.db.databaseName}")
private String databaseName; private String databaseName;
private AtomicInteger maxAttemptsPerRecordAtomic;
public UrlController(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord) {
maxAttemptsPerRecordAtomic = new AtomicInteger(maxAttemptsPerRecord);
}
@GetMapping("") @GetMapping("")
public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) { public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
@ -88,12 +92,13 @@ public class UrlController {
" select a.id, a.original_url from " + databaseName + ".assignment a\n" + " select a.id, a.original_url from " + databaseName + ".assignment a\n" +
" union all\n" + " union all\n" +
" select pl.id, pl.original_url from " + databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" + " select pl.id, pl.original_url from " + databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" +
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecord + " and not exists (select 1 from " + databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" + "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + " and not exists (select 1 from " + databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
"limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" + "limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" +
"order by coalesce(attempt_count, 0), reverse(pubid), url\n" + "order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
"limit " + assignmentsLimit + ") as findAssignmentsQuery"; "limit " + assignmentsLimit + ") as findAssignmentsQuery";
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time. // The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
//logger.debug(findAssignmentsQuery); // DEBUG!
String createCurrentAssignmentsQuery = "create table " + databaseName + ".current_assignment as \n" + findAssignmentsQuery; String createCurrentAssignmentsQuery = "create table " + databaseName + ".current_assignment as \n" + findAssignmentsQuery;
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + databaseName + ".current_assignment"; String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + databaseName + ".current_assignment";
@ -113,14 +118,12 @@ public class UrlController {
try { try {
jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery); jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
} catch (Exception sqle) { } catch (Exception e) {
String errorMsg = dropCurrentAssignmentTable(); String errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, sqle);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} }
@ -144,32 +147,25 @@ public class UrlController {
assignments.add(assignment); assignments.add(assignment);
}); });
} catch (Exception e) { } catch (Exception e) {
String errorMsg = dropCurrentAssignmentTable(); String errorMsg = ImpalaConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n";
logger.error(errorMsg, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} }
int assignmentsSize = assignments.size(); int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 ) { if ( assignmentsSize == 0 ) {
String errorMsg = dropCurrentAssignmentTable(); String errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.";
ImpalaConnector.databaseLock.unlock();
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests.
errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecord + " for the next requests.";
logger.error(errorMsg); logger.error(errorMsg);
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg); String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
ImpalaConnector.databaseLock.unlock();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} else if ( assignmentsSize < assignmentsLimit ) { } else if ( assignmentsSize < assignmentsLimit ) {
maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests. logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.");
logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecord + " for the next requests.");
} }
logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker."); logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");
@ -183,14 +179,12 @@ public class UrlController {
try { try {
jdbcTemplate.execute(insertAssignmentsQuery); jdbcTemplate.execute(insertAssignmentsQuery);
} catch (Exception sqle) { } catch (Exception e) {
String errorMsg = dropCurrentAssignmentTable(); String errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e);
String tmpErrMsg = dropCurrentAssignmentTable();
if ( tmpErrMsg != null )
errorMsg += "\n" + tmpErrMsg;
ImpalaConnector.databaseLock.unlock(); ImpalaConnector.databaseLock.unlock();
if ( errorMsg != null )
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, sqle);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} }
@ -263,11 +257,11 @@ public class UrlController {
} }
// Store the workerReport into the database. We use "PreparedStatements" to do insertions, for security and valid SQL syntax reasons. // Store the workerReport into the database. We use "PreparedStatements" to do insertions, for security and valid SQL syntax reasons.
String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; final String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
int[] payloadArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}; final int[] payloadArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
String insertIntoAttemptBaseQuery = "INSERT INTO " + databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)"; final String insertIntoAttemptBaseQuery = "INSERT INTO " + databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
int[] attemptArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}; final int[] attemptArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
final AtomicInteger failedCount = new AtomicInteger(0); final AtomicInteger failedCount = new AtomicInteger(0);

View File

@ -56,7 +56,7 @@ public class FileUtils {
* */ * */
public String mergeParquetFiles(String tableName, String whereClause, String parameter) { public String mergeParquetFiles(String tableName, String whereClause, String parameter) {
String errorMsg; String errorMsg;
if ( tableName == null ) { if ( (tableName == null) || tableName.isEmpty() ) {
errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!"; errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!";
logger.error(errorMsg); logger.error(errorMsg);
return errorMsg; return errorMsg;
@ -86,8 +86,9 @@ public class FileUtils {
} }
private final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$"); private final String numAndExtension = "(?:\\([\\d]+\\))?\\.[\\w]{2,10}";
private final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$"); private final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)" + numAndExtension + "$");
private final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+" + numAndExtension + ")$");
@Value("services.pdfaggregation.controller.baseTargetLocation") @Value("services.pdfaggregation.controller.baseTargetLocation")
private String baseTargetLocation; private String baseTargetLocation;
@ -157,7 +158,7 @@ public class FileUtils {
fileLocation = payload.getLocation(); fileLocation = payload.getLocation();
if ( fileLocation != null ) { // If the docFile was downloaded (without an error).. if ( fileLocation != null ) { // If the docFile was downloaded (without an error)..
Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation); Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation);
if ( !matcher.matches() ) { if ( ! matcher.matches() ) {
continue; continue;
} }
String fileNameWithExtension = matcher.group(1); String fileNameWithExtension = matcher.group(1);
@ -263,7 +264,7 @@ public class FileUtils {
try { try {
String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath); String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath);
setFullTextForMultipleIDs(fileRelatedIDs, payloadsHashMultimap, s3Url); setFullTextForMultipleIDs(fileRelatedIDs, payloadsHashMultimap, s3Url, fileName);
numUploadedFiles ++; numUploadedFiles ++;
} catch (Exception e) { } catch (Exception e) {
logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore, exception: " + e.getMessage(), e); logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore, exception: " + e.getMessage(), e);
@ -354,7 +355,6 @@ public class FileUtils {
private String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch) { private String getRequestUrlForBatch(String baseUrl, List<String> fileNamesForCurBatch) {
final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50); final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50);
sb.append(baseUrl); sb.append(baseUrl);
int numFullTextsCurBatch = fileNamesForCurBatch.size(); int numFullTextsCurBatch = fileNamesForCurBatch.size();
for ( int j=0; j < numFullTextsCurBatch; ++j ){ for ( int j=0; j < numFullTextsCurBatch; ++j ){
@ -426,7 +426,7 @@ public class FileUtils {
return false; // It's not problematic. return false; // It's not problematic.
} }
logger.error("None of the locations of the payloads matched with the ID \"" + fileID + "\" are ending with the filename \"" + fileName + "\" they were supposed to."); logger.error("None of the locations of the payloads matched with the ID \"" + fileID + "\" are ending with the filename \"" + fileName + "\", as it was supposed to.\nThe related payloads are: " + payloads);
return true; return true;
} }
@ -488,8 +488,9 @@ public class FileUtils {
* @param fileIDs * @param fileIDs
* @param payloadsHashMultimap * @param payloadsHashMultimap
* @param s3Url * @param s3Url
* @param fileNameWithExt
*/ */
public void setFullTextForMultipleIDs(Set<String> fileIDs, HashMultimap<String, Payload> payloadsHashMultimap, String s3Url) { public void setFullTextForMultipleIDs(Set<String> fileIDs, HashMultimap<String, Payload> payloadsHashMultimap, String s3Url, String fileNameWithExt) {
for ( String id : fileIDs ) { for ( String id : fileIDs ) {
Set<Payload> payloads = payloadsHashMultimap.get(id); Set<Payload> payloads = payloadsHashMultimap.get(id);
if ( payloads.isEmpty() ) { if ( payloads.isEmpty() ) {
@ -497,9 +498,22 @@ public class FileUtils {
continue; continue;
} }
for ( Payload payload : payloads ) for ( Payload payload : payloads ) {
if ( payload.getHash() != null ) // Update only for the records which led to a file, not all the records of this ID (an ID might have multiple original_urls pointing to different directions). // Update only for the records which led to a file, not all the records of this ID (an ID might have multiple original_urls pointing to different directions).
String currentFileLoc = payload.getLocation();
if ( currentFileLoc != null ) {
// Check that the current payload does not have a different file waiting to be uploaded. It is possible that multiple Payloads with the same ID, point to different files (because of different sourceUrls).
Matcher matcher = FILENAME_WITH_EXTENSION.matcher(currentFileLoc);
if ( matcher.matches() ) {
String curFileNameWithExtension = matcher.group(1);
if ( (curFileNameWithExtension != null) && !curFileNameWithExtension.isEmpty()
&& ! curFileNameWithExtension.equals(fileNameWithExt) ) { // If the file of this payload is NOT that same with the given one, then do NOT update it.
continue; // This different file, is waiting its upload-time, in the loop, where this method was called.
}
}
payload.setLocation(s3Url); // Update the file-location to the new S3-url. All the other file-data is already set from the Worker. payload.setLocation(s3Url); // Update the file-location to the new S3-url. All the other file-data is already set from the Worker.
}
}
} }
} }