diff --git a/build.gradle b/build.gradle
index b8a2df8..2addcba 100644
--- a/build.gradle
+++ b/build.gradle
@@ -106,7 +106,7 @@ dependencies {
 
     // https://mvnrepository.com/artifact/org.json/json
     implementation 'org.json:json:20220924'
 
-    testImplementation group: 'org.springframework.security', name: 'spring-security-test'
+    testImplementation 'org.springframework.security:spring-security-test'
     testImplementation "org.springframework.boot:spring-boot-starter-test"
 }
diff --git a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java
index 17bd4d4..adc110d 100644
--- a/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java
+++ b/src/main/java/eu/openaire/urls_controller/controllers/UrlController.java
@@ -113,32 +113,14 @@ public class UrlController {
         // The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
         //logger.debug(findAssignmentsQuery);  // DEBUG!
 
-        String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
-        String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
         String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";
 
         List<Assignment> assignments = new ArrayList<>(assignmentsLimit);
 
         ImpalaConnector.databaseLock.lock();
 
-        try {
-            jdbcTemplate.execute(createCurrentAssignmentsQuery);
-        } catch (Exception e) {
-            String errorMsg = ImpalaConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e);
-            String tmpErrMsg = dropCurrentAssignmentTable();  // The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened).
-            if ( tmpErrMsg != null )
-                errorMsg += "\n" + tmpErrMsg;
-            ImpalaConnector.databaseLock.unlock();
-            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
-        }
-
-        try {
-            jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
-        } catch (Exception e) {
-            String errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e);
-            String tmpErrMsg = dropCurrentAssignmentTable();
-            if ( tmpErrMsg != null )
-                errorMsg += "\n" + tmpErrMsg;
+        String errorMsg = createAndInitializeCurrentAssignmentsTable(findAssignmentsQuery);
+        if ( errorMsg != null ) {
             ImpalaConnector.databaseLock.unlock();
             return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
         }
@@ -164,7 +146,7 @@ public class UrlController {
                 assignments.add(assignment);
             });
         } catch (Exception e) {
-            String errorMsg = ImpalaConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e);
+            errorMsg = ImpalaConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e);
             String tmpErrMsg = dropCurrentAssignmentTable();
             if ( tmpErrMsg != null )
                 errorMsg += "\n" + tmpErrMsg;
@@ -174,7 +156,7 @@ public class UrlController {
 
         int assignmentsSize = assignments.size();
         if ( assignmentsSize == 0 ) {
-            String errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.";
+            errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests.";
             logger.error(errorMsg);
             String tmpErrMsg = dropCurrentAssignmentTable();
             ImpalaConnector.databaseLock.unlock();
@@ -196,7 +178,7 @@ public class UrlController {
         try {
             jdbcTemplate.execute(insertAssignmentsQuery);
         } catch (Exception e) {
-            String errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e);
+            errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e);
             String tmpErrMsg = dropCurrentAssignmentTable();
             if ( tmpErrMsg != null )
                 errorMsg += "\n" + tmpErrMsg;
@@ -204,7 +186,7 @@ public class UrlController {
             return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
         }
 
-        String errorMsg = dropCurrentAssignmentTable();
+        errorMsg = dropCurrentAssignmentTable();
         if ( errorMsg != null ) {
             ImpalaConnector.databaseLock.unlock();
             return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
         }
@@ -300,9 +282,6 @@ public class UrlController {
 
         // Load all the parquet files of each type into its table.
         ImpalaConnector.databaseLock.lock();
-        // Important note: It may be possible for a thread to acquire the lock and load its own and another thread's data into the table.
-        // So when the other thread acquire the lock, it will load ZERO data.
-        // That's ok, and we do not need to add any check if the remote data exist, since this process happens in milliseconds. (so a few milliseconds will be wasted for no data)
         errorMsgAttempts = parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts, "attempt");
         errorMsgPayloads = parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloads, "payload");
         ImpalaConnector.databaseLock.unlock();
@@ -351,7 +330,7 @@ public class UrlController {
             }
         }
 
-        // This will delete the rows of the "assignment" table which refer to the curWorkerId. As we have non-kudu Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
+        // This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
         // Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
         // We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the payload table for previously handled tasks.
         mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
@@ -370,6 +349,34 @@ public class UrlController {
 
     }
 
+    private String createAndInitializeCurrentAssignmentsTable(String findAssignmentsQuery)
+    {
+        String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
+        String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
+
+        try {
+            jdbcTemplate.execute(createCurrentAssignmentsQuery);
+        } catch (Exception e) {
+            String errorMsg = ImpalaConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e);
+            String tmpErrMsg = dropCurrentAssignmentTable();  // The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened).
+            if ( tmpErrMsg != null )
+                errorMsg += "\n" + tmpErrMsg;
+            return errorMsg;
+        }
+
+        try {
+            jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery);
+        } catch (Exception e) {
+            String errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e);
+            String tmpErrMsg = dropCurrentAssignmentTable();
+            if ( tmpErrMsg != null )
+                errorMsg += "\n" + tmpErrMsg;
+            return errorMsg;
+        }
+
+        return null;  // All good.
+    }
+
     private String dropCurrentAssignmentTable() {
         String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE";
         try {
diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUnZipper.java b/src/main/java/eu/openaire/urls_controller/util/FileUnZipper.java
index 3a459a2..40d22dc 100644
--- a/src/main/java/eu/openaire/urls_controller/util/FileUnZipper.java
+++ b/src/main/java/eu/openaire/urls_controller/util/FileUnZipper.java
@@ -1,7 +1,5 @@
 package eu.openaire.urls_controller.util;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import java.io.File;
@@ -15,11 +13,10 @@ import java.util.zip.ZipInputStream;
 @Component
 public class FileUnZipper {
 
-    private static final Logger logger = LoggerFactory.getLogger(FileUnZipper.class);
 
     public void unzipFolder(Path source, Path target) throws Exception {
         try ( ZipInputStream zis = new ZipInputStream(Files.newInputStream(source.toFile().toPath())) ) {
-            // Iterate over the files in zip and un-zip them.
+            // Iterate over the files in zip and unzip them.
             ZipEntry zipEntry = zis.getNextEntry();
             while ( zipEntry != null ) {
                 Path targetPath = zipSlipProtect(zipEntry, target);
diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
index b71dd15..c57c5c4 100644
--- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
+++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
@@ -114,7 +114,7 @@ public class FileUtils {
 
         String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ? limit 1" ;
         final int[] hashArgType = new int[] {Types.VARCHAR};
 
-        ImpalaConnector.databaseLock.lock();
+        ImpalaConnector.databaseLock.lock();  // The following loop uses the database.
 
         for ( UrlReport urlReport : urlReports ) {
@@ -230,7 +230,7 @@ public class FileUtils {
                 fileUnZipper.unzipFolder(Paths.get(zipFileFullPath), curBatchPath);
 
                 String[] fileNames = new File(targetDirectory).list();
-                if ( (fileNames == null) || (fileNames.length <= 1) ) {  // The directory might have only one file, the "zip-file".
+                if ( (fileNames == null) || (fileNames.length <= 1) ) {  // The directory might have only one file, the "zip-file", if the full-texts failed to be unzipped..
                     logger.error("No full-text fileNames where extracted from directory: " + targetDirectory);
                     failedBatches ++;
                     continue;  // To the next batch.
@@ -489,13 +489,17 @@ public class FileUtils {
 
     }
 
-    public boolean deleteDirectory(File curBatchDir) {
+    public boolean deleteDirectory(File curBatchDir)
+    {
         try {
             org.apache.commons.io.FileUtils.deleteDirectory(curBatchDir);
             return true;
         } catch (IOException e) {
             logger.error("The following directory could not be deleted: " + curBatchDir.getName(), e);
             return false;
+        } catch (IllegalArgumentException iae) {
+            logger.error("This batch-dir does not exist: " + curBatchDir.getName());
+            return false;
         }
     }
 
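
Note: for quick reference, a minimal caller-side sketch of the flow introduced in UrlController by the extracted createAndInitializeCurrentAssignmentsTable() helper. The enclosing endpoint method is not shown in this patch; jdbcTemplate, ImpalaConnector, HttpStatus and ResponseEntity are the names already used in the class, and everything else below is illustrative rather than part of the patch:

    // Acquire the database lock before touching the "current_assignment" table.
    ImpalaConnector.databaseLock.lock();

    // The helper runs the "create table ... as <findAssignmentsQuery>" and "COMPUTE STATS"
    // statements. It returns null on success, or an aggregated error message (including any
    // failure of the cleanup "DROP TABLE ... PURGE") on failure.
    String errorMsg = createAndInitializeCurrentAssignmentsTable(findAssignmentsQuery);
    if ( errorMsg != null ) {
        ImpalaConnector.databaseLock.unlock();  // Never return while still holding the lock.
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
    }

    // ... proceed with "getAssignmentsQuery" while the lock is still held ...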