diff --git a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java index 195fad3..63ec6f7 100644 --- a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java +++ b/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java @@ -78,7 +78,7 @@ public class ImpalaConnector { public static String handleQueryException(String queryName, String query, Exception e) { - String errorMsg = "Problem when creating " + (( ! queryName.startsWith("get")) ? "and executing " : "") + "the prepared statement for \"" + queryName + "\"!\n"; + String errorMsg = "Problem when executing the query \"" + queryName + "\"!\n"; logger.error(errorMsg + "\n\n" + query + "\n\n", e); return errorMsg; } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java b/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java index ba29794..1b53130 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/ImpalaController.java @@ -3,6 +3,7 @@ package eu.openaire.urls_controller.controllers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.jdbc.core.JdbcTemplate; @@ -23,10 +24,13 @@ public class ImpalaController { @Autowired private JdbcTemplate jdbcTemplate; + @Value("${services.pdfaggregation.controller.db.databaseName}") + private String databaseName; + @GetMapping("get10PublicationIdsTest") public ResponseEntity get10PublicationIdsTest() { - String query = "SELECT id FROM publication LIMIT 10;"; + String query = "SELECT id FROM " + databaseName + ".publication LIMIT 10;"; try { List publications = jdbcTemplate.queryForList(query, String.class); @@ -38,4 +42,5 @@ public class ImpalaController { return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } } + } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java index eb47805..fd092c3 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java @@ -42,15 +42,19 @@ public class UrlController { private static final AtomicLong assignmentsBatchCounter = new AtomicLong(0); private static final Pattern MALICIOUS_INPUT_STRING = Pattern.compile(".*[';`\"]+.*"); - @Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") - private int maxAttemptsPerRecord; - @Value("${services.pdfaggregation.controller.assignmentLimit}") private int assignmentLimit; @Value("${services.pdfaggregation.controller.db.databaseName}") private String databaseName; + private AtomicInteger maxAttemptsPerRecordAtomic; + + + public UrlController(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord) { + maxAttemptsPerRecordAtomic = new AtomicInteger(maxAttemptsPerRecord); + } + @GetMapping("") public ResponseEntity getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) { @@ -88,12 +92,13 @@ public class UrlController { " select a.id, a.original_url from " + databaseName + ".assignment a\n" + " union all\n" + " select pl.id, pl.original_url from " + databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" + - "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecord + " and not exists (select 1 from " + databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" + + "where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + " and not exists (select 1 from " + databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" + "limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" + "order by coalesce(attempt_count, 0), reverse(pubid), url\n" + "limit " + assignmentsLimit + ") as findAssignmentsQuery"; // The "order by" in the end makes sure the older attempted records will be re-attempted after a long time. + //logger.debug(findAssignmentsQuery); // DEBUG! String createCurrentAssignmentsQuery = "create table " + databaseName + ".current_assignment as \n" + findAssignmentsQuery; String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + databaseName + ".current_assignment"; @@ -113,14 +118,12 @@ public class UrlController { try { jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery); - } catch (Exception sqle) { - String errorMsg = dropCurrentAssignmentTable(); + } catch (Exception e) { + String errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e); + String tmpErrMsg = dropCurrentAssignmentTable(); + if ( tmpErrMsg != null ) + errorMsg += "\n" + tmpErrMsg; ImpalaConnector.databaseLock.unlock(); - - if ( errorMsg != null ) - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - - errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, sqle); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } @@ -144,32 +147,25 @@ public class UrlController { assignments.add(assignment); }); } catch (Exception e) { - String errorMsg = dropCurrentAssignmentTable(); + String errorMsg = ImpalaConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e); + String tmpErrMsg = dropCurrentAssignmentTable(); + if ( tmpErrMsg != null ) + errorMsg += "\n" + tmpErrMsg; ImpalaConnector.databaseLock.unlock(); - - if ( errorMsg != null ) - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - - errorMsg = "Problem when executing the \"getAssignmentsQuery\"!\n"; - logger.error(errorMsg, e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } int assignmentsSize = assignments.size(); if ( assignmentsSize == 0 ) { - String errorMsg = dropCurrentAssignmentTable(); - ImpalaConnector.databaseLock.unlock(); - - if ( errorMsg != null ) - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - - maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests. - errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecord + " for the next requests."; + String errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests."; logger.error(errorMsg); - return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg); + String tmpErrMsg = dropCurrentAssignmentTable(); + if ( tmpErrMsg != null ) + errorMsg += "\n" + tmpErrMsg; + ImpalaConnector.databaseLock.unlock(); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } else if ( assignmentsSize < assignmentsLimit ) { - maxAttemptsPerRecord += 2; // Increase the max-attempts to try again some very old records, in the next requests. - logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecord + " for the next requests."); + logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests."); } logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker."); @@ -183,14 +179,12 @@ public class UrlController { try { jdbcTemplate.execute(insertAssignmentsQuery); - } catch (Exception sqle) { - String errorMsg = dropCurrentAssignmentTable(); + } catch (Exception e) { + String errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e); + String tmpErrMsg = dropCurrentAssignmentTable(); + if ( tmpErrMsg != null ) + errorMsg += "\n" + tmpErrMsg; ImpalaConnector.databaseLock.unlock(); - - if ( errorMsg != null ) - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); - - errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, sqle); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } @@ -263,11 +257,11 @@ public class UrlController { } // Store the workerReport into the database. We use "PreparedStatements" to do insertions, for security and valid SQL syntax reasons. - String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; - int[] payloadArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}; + final String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + final int[] payloadArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}; - String insertIntoAttemptBaseQuery = "INSERT INTO " + databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)"; - int[] attemptArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}; + final String insertIntoAttemptBaseQuery = "INSERT INTO " + databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)"; + final int[] attemptArgTypes = new int[] {Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR}; final AtomicInteger failedCount = new AtomicInteger(0); diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index 476bf88..09306b2 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -56,7 +56,7 @@ public class FileUtils { * */ public String mergeParquetFiles(String tableName, String whereClause, String parameter) { String errorMsg; - if ( tableName == null ) { + if ( (tableName == null) || tableName.isEmpty() ) { errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!"; logger.error(errorMsg); return errorMsg; @@ -86,8 +86,9 @@ public class FileUtils { } - private final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$"); - private final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$"); + private final String numAndExtension = "(?:\\([\\d]+\\))?\\.[\\w]{2,10}"; + private final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)" + numAndExtension + "$"); + private final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+" + numAndExtension + ")$"); @Value("services.pdfaggregation.controller.baseTargetLocation") private String baseTargetLocation; @@ -157,7 +158,7 @@ public class FileUtils { fileLocation = payload.getLocation(); if ( fileLocation != null ) { // If the docFile was downloaded (without an error).. Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation); - if ( !matcher.matches() ) { + if ( ! matcher.matches() ) { continue; } String fileNameWithExtension = matcher.group(1); @@ -263,7 +264,7 @@ public class FileUtils { try { String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath); - setFullTextForMultipleIDs(fileRelatedIDs, payloadsHashMultimap, s3Url); + setFullTextForMultipleIDs(fileRelatedIDs, payloadsHashMultimap, s3Url, fileName); numUploadedFiles ++; } catch (Exception e) { logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore, exception: " + e.getMessage(), e); @@ -354,7 +355,6 @@ public class FileUtils { private String getRequestUrlForBatch(String baseUrl, List fileNamesForCurBatch) { final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 50); - sb.append(baseUrl); int numFullTextsCurBatch = fileNamesForCurBatch.size(); for ( int j=0; j < numFullTextsCurBatch; ++j ){ @@ -426,7 +426,7 @@ public class FileUtils { return false; // It's not problematic. } - logger.error("None of the locations of the payloads matched with the ID \"" + fileID + "\" are ending with the filename \"" + fileName + "\" they were supposed to."); + logger.error("None of the locations of the payloads matched with the ID \"" + fileID + "\" are ending with the filename \"" + fileName + "\", as it was supposed to.\nThe related payloads are: " + payloads); return true; } @@ -488,8 +488,9 @@ public class FileUtils { * @param fileIDs * @param payloadsHashMultimap * @param s3Url + * @param fileNameWithExt */ - public void setFullTextForMultipleIDs(Set fileIDs, HashMultimap payloadsHashMultimap, String s3Url) { + public void setFullTextForMultipleIDs(Set fileIDs, HashMultimap payloadsHashMultimap, String s3Url, String fileNameWithExt) { for ( String id : fileIDs ) { Set payloads = payloadsHashMultimap.get(id); if ( payloads.isEmpty() ) { @@ -497,9 +498,22 @@ public class FileUtils { continue; } - for ( Payload payload : payloads ) - if ( payload.getHash() != null ) // Update only for the records which led to a file, not all the records of this ID (an ID might have multiple original_urls pointing to different directions). + for ( Payload payload : payloads ) { + // Update only for the records which led to a file, not all the records of this ID (an ID might have multiple original_urls pointing to different directions). + String currentFileLoc = payload.getLocation(); + if ( currentFileLoc != null ) { + // Check that the current payload does not have a different file waiting to be uploaded. It is possible that multiple Payloads with the same ID, point to different files (because of different sourceUrls). + Matcher matcher = FILENAME_WITH_EXTENSION.matcher(currentFileLoc); + if ( matcher.matches() ) { + String curFileNameWithExtension = matcher.group(1); + if ( (curFileNameWithExtension != null) && !curFileNameWithExtension.isEmpty() + && ! curFileNameWithExtension.equals(fileNameWithExt) ) { // If the file of this payload is NOT that same with the given one, then do NOT update it. + continue; // This different file, is waiting its upload-time, in the loop, where this method was called. + } + } payload.setLocation(s3Url); // Update the file-location to the new S3-url. All the other file-data is already set from the Worker. + } + } } }