From 44459c86810fe01ebe12ee96e9ebc588b55eb51a Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Wed, 23 Aug 2023 16:55:23 +0300 Subject: [PATCH] - Rename "ImpalaConnector.java" to "DatabaseConnector.java". - Update dependencies. - Code polishing. --- build.gradle | 10 +-- gradle/wrapper/gradle-wrapper.properties | 2 +- installAndRun.sh | 2 +- .../components/ScheduledTasks.java | 7 +- ...aConnector.java => DatabaseConnector.java} | 10 +-- .../controllers/BulkImportController.java | 4 +- .../controllers/StatsController.java | 23 +++--- .../controllers/TestController.java | 4 +- .../services/BulkImportServiceImpl.java | 22 +++--- .../services/StatsServiceImpl.java | 16 ++-- .../services/UrlsServiceImpl.java | 76 +++++++++---------- .../urls_controller/util/FileUtils.java | 26 +++---- .../util/ParquetFileUtils.java | 6 +- 13 files changed, 103 insertions(+), 105 deletions(-) rename src/main/java/eu/openaire/urls_controller/configuration/{ImpalaConnector.java => DatabaseConnector.java} (94%) diff --git a/build.gradle b/build.gradle index c64195f..34362c5 100644 --- a/build.gradle +++ b/build.gradle @@ -1,6 +1,6 @@ plugins { id 'org.springframework.boot' version '2.7.14' - id 'io.spring.dependency-management' version '1.1.2' + id 'io.spring.dependency-management' version '1.1.3' id 'java' } @@ -43,10 +43,10 @@ dependencies { //implementation group: 'jakarta.validation', name: 'jakarta.validation-api', version: '3.0.2' // https://mvnrepository.com/artifact/com.google.guava/guava - implementation group: 'com.google.guava', name: 'guava', version: '32.1.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: '32.1.2-jre' // https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 - implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' + implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.13.0' // https://mvnrepository.com/artifact/org.apache.commons/commons-compress implementation("org.apache.commons:commons-compress:1.23.0") { @@ -54,7 +54,7 @@ dependencies { } implementation 'com.github.luben:zstd-jni:1.5.5-5' // Even though this is part of the above dependency, the Apache commons rarely updates it, while the zstd team makes improvements very often. 
- implementation 'io.minio:minio:8.5.4' + implementation 'io.minio:minio:8.5.5' // https://mvnrepository.com/artifact/com.cloudera.impala/jdbc implementation("com.cloudera.impala:jdbc:2.5.31") { @@ -120,7 +120,7 @@ dependencies { // https://mvnrepository.com/artifact/io.micrometer/micrometer-registry-prometheus - runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.11.2' + runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.11.3' testImplementation 'org.springframework.security:spring-security-test' testImplementation "org.springframework.boot:spring-boot-starter-test" diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 9f4197d..ac72c34 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.2.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/installAndRun.sh b/installAndRun.sh index 9f712ad..ad95bce 100755 --- a/installAndRun.sh +++ b/installAndRun.sh @@ -26,7 +26,7 @@ if [[ justInstall -eq 1 && shouldRunInDocker -eq 1 ]]; then justInstall=0 fi -gradleVersion="8.2.1" +gradleVersion="8.3" if [[ justInstall -eq 0 ]]; then diff --git a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java index 3df8adc..05655ef 100644 --- a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java @@ -72,7 +72,8 @@ public class ScheduledTasks { public void executeBackgroundTasks() { List> tempList = new ArrayList<>(UrlsController.backgroundCallableTasks); // Copy the list in order to know what was executed. - // So the items added while this execution happens, will be remain in the global-list, while the other will have already be deleted. + // So the items added while this execution happens, will remain in the global-list, while the others will have already been deleted. + // Also, the "Executor.invokeAll()" requires an "unchanged" list, otherwise there will be "undefined results". int numOfTasks = tempList.size(); // Since the temp-list is a deep-copy and not a reference, new tasks that are added will not be executed. 
if ( numOfTasks == 0 ) return; @@ -150,15 +151,13 @@ public class ScheduledTasks { if ( workerReports == null ) { logger.error("There was an error when getting the subDirs of \"workerReportsDir\": " + workerReportsDir); return; - } - else if ( workerReports.length == 0 ) { + } else if ( workerReports.length == 0 ) { logger.debug("The \"workerReportsDir\" is empty, so there is nothing to delete."); return; } long currentTime = System.currentTimeMillis(); - // Loop through the array and print only the directories for ( File workerReport : workerReports ) { long lastModified = workerReport.lastModified(); diff --git a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java b/src/main/java/eu/openaire/urls_controller/configuration/DatabaseConnector.java similarity index 94% rename from src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java rename to src/main/java/eu/openaire/urls_controller/configuration/DatabaseConnector.java index b5bcbfd..cc9b9c8 100644 --- a/src/main/java/eu/openaire/urls_controller/configuration/ImpalaConnector.java +++ b/src/main/java/eu/openaire/urls_controller/configuration/DatabaseConnector.java @@ -12,9 +12,9 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @Repository -public class ImpalaConnector { +public class DatabaseConnector { - private static final Logger logger = LoggerFactory.getLogger(ImpalaConnector.class); + private static final Logger logger = LoggerFactory.getLogger(DatabaseConnector.class); @Autowired private JdbcTemplate jdbcTemplate; @@ -28,9 +28,9 @@ public class ImpalaConnector { public static final Lock databaseLock = new ReentrantLock(true); // This lock is locking the threads trying to execute queries in the database. - public ImpalaConnector(@Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment, - @Value("${services.pdfaggregation.controller.db.initialDatabaseName}") String initialDatabaseName, - @Value("${services.pdfaggregation.controller.db.testDatabaseName}") String testDatabaseName) { + public DatabaseConnector(@Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment, + @Value("${services.pdfaggregation.controller.db.initialDatabaseName}") String initialDatabaseName, + @Value("${services.pdfaggregation.controller.db.testDatabaseName}") String testDatabaseName) { this.isTestEnvironment = isTestEnvironment; this.initialDatabaseName = initialDatabaseName; this.testDatabaseName = testDatabaseName; diff --git a/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java index 4be541b..26075dd 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java @@ -66,8 +66,8 @@ public class BulkImportController { this.bulkImportService = bulkImportService; numOfThreadsForBulkImportProcedures = bulkImport.getNumOfThreadsForBulkImportProcedures(); - logger.info("Will use " + numOfThreadsForBulkImportProcedures + " threads per bulk-import procedure."); - bulkImportExecutor = Executors.newFixedThreadPool(numOfThreadsForBulkImportProcedures); // At most < numOfThreadsPerBulkImportProcedure > threads will be used per bulk-import procedure.. 
+ logger.info("Will use " + numOfThreadsForBulkImportProcedures + " threads for the bulk-import procedures."); + bulkImportExecutor = Executors.newFixedThreadPool(numOfThreadsForBulkImportProcedures); // At most < numOfThreadsForBulkImportProcedures > threads will be used per bulk-import procedure.. } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java b/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java index 6ba555b..d780fe1 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java @@ -1,7 +1,7 @@ package eu.openaire.urls_controller.controllers; -import eu.openaire.urls_controller.configuration.ImpalaConnector; +import eu.openaire.urls_controller.configuration.DatabaseConnector; import eu.openaire.urls_controller.services.StatsService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +43,7 @@ public class StatsController { if ( ! isCalledFromScheduler ) logger.info("Received a \"getNumberOfAllPayloads\" request."); - final String getAllPayloadsNumberQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload"; + final String getAllPayloadsNumberQuery = "select count(id) from " + DatabaseConnector.databaseName + ".payload"; return statsService.getNumberOfPayloads(getAllPayloadsNumberQuery, "all payloads", 0); } @@ -57,7 +57,7 @@ public class StatsController { if ( ! isCalledFromScheduler ) logger.info("Received a \"getNumberOfPayloadsAggregatedByServiceThroughCrawling\" request."); - String getNumOfPayloadsAggregatedByServiceThroughCrawlingQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload_aggregated"; + String getNumOfPayloadsAggregatedByServiceThroughCrawlingQuery = "select count(id) from " + DatabaseConnector.databaseName + ".payload_aggregated"; return statsService.getNumberOfPayloads(getNumOfPayloadsAggregatedByServiceThroughCrawlingQuery, "payloads aggregated by the Service through crawling", 0); } @@ -71,7 +71,7 @@ public class StatsController { if ( ! isCalledFromScheduler ) logger.info("Received a \"getNumberOfPayloadsAggregatedByServiceThroughBulkImport\" request."); - String getNumOfPayloadsAggregatedByServiceThroughBulkImportQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload_bulk_import"; + String getNumOfPayloadsAggregatedByServiceThroughBulkImportQuery = "select count(id) from " + DatabaseConnector.databaseName + ".payload_bulk_import"; return statsService.getNumberOfPayloads(getNumOfPayloadsAggregatedByServiceThroughBulkImportQuery, "payloads aggregated by the Service through BulkImport procedures", 0); } @@ -86,9 +86,9 @@ public class StatsController { logger.info("Received a \"getNumberOfPayloadsAggregatedByService\" request."); String getNumOfPayloadsAggregatedByServiceQuery = "select count(id) from\n" + - " (select id from " + ImpalaConnector.databaseName + ".payload_aggregated\n" + + " (select id from " + DatabaseConnector.databaseName + ".payload_aggregated\n" + " union all\n" + - " select id from " + ImpalaConnector.databaseName + ".payload_bulk_import)\n" + + " select id from " + DatabaseConnector.databaseName + ".payload_bulk_import)\n" + " as payloads_from_service"; return statsService.getNumberOfPayloads(getNumOfPayloadsAggregatedByServiceQuery, "payloads aggregated by the Service, through both crawling and bulk-import procedures", 0); } @@ -103,7 +103,7 @@ public class StatsController { if ( ! 
isCalledFromScheduler ) logger.info("Received a \"getNumberOfLegacyPayloads\" request."); - String getNumOfLegacyPayloadsQuery = "select count(id) from " + ImpalaConnector.databaseName + ".payload_legacy"; + String getNumOfLegacyPayloadsQuery = "select count(id) from " + DatabaseConnector.databaseName + ".payload_legacy"; return statsService.getNumberOfPayloads(getNumOfLegacyPayloadsQuery, "legacy payloads", 0); } @@ -115,8 +115,8 @@ public class StatsController { public ResponseEntity getNumberOfPayloadsForDatasource(@RequestParam String datasourceId) { logger.info("Received a \"getNumberOfPayloadsForDatasource\" request."); final String getNumOfPayloadsForDatasourceQuery = - "select count(p.id) from " + ImpalaConnector.databaseName + ".payload p\n" + - " join " + ImpalaConnector.databaseName + ".publication pu on pu.id=p.id and pu.datasourceid=\"" + datasourceId + "\""; + "select count(p.id) from " + DatabaseConnector.databaseName + ".payload p\n" + + " join " + DatabaseConnector.databaseName + ".publication pu on pu.id=p.id and pu.datasourceid=\"" + datasourceId + "\""; if ( logger.isTraceEnabled() ) logger.trace("getNumOfPayloadsForDatasourceQuery:\n" + getNumOfPayloadsForDatasourceQuery); @@ -148,8 +148,7 @@ public class StatsController { // TODO - Add an endpoint to return the number of payloads found for each publication-year, in descending order.. // For example the number of payloads for publications published in 2016 is // --//-- the number for 2017 is - // Add a "limit" parameter for the user to specify that wants only the last 5 years (2019-2023) - + // Add a "limit" parameter for the user to specify that wants only the last 5 years (2019-2023). /** @@ -158,7 +157,7 @@ public class StatsController { @GetMapping(value = "getNumberOfAllDistinctFullTexts", produces = MediaType.TEXT_PLAIN_VALUE) public ResponseEntity getNumberOfAllDistinctFullTexts() { logger.info("Received a \"getNumberOfAllDistinctFullTexts\" request."); - final String getPayloadsNumberQuery = "select count(distinct `hash`) from " + ImpalaConnector.databaseName + ".payload"; + final String getPayloadsNumberQuery = "select count(distinct `hash`) from " + DatabaseConnector.databaseName + ".payload"; return statsService.getNumberOfPayloads(getPayloadsNumberQuery, "distinct full-text files", 0); } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/TestController.java b/src/main/java/eu/openaire/urls_controller/controllers/TestController.java index d06391d..05ee5e7 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/TestController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/TestController.java @@ -1,7 +1,7 @@ package eu.openaire.urls_controller.controllers; import com.google.common.collect.HashMultimap; -import eu.openaire.urls_controller.configuration.ImpalaConnector; +import eu.openaire.urls_controller.configuration.DatabaseConnector; import eu.openaire.urls_controller.models.Assignment; import eu.openaire.urls_controller.models.Datasource; import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse; @@ -97,7 +97,7 @@ public class TestController { @GetMapping("get10PublicationIdsTest") public ResponseEntity get10PublicationIdsTest() { - String query = "SELECT id FROM " + ImpalaConnector.databaseName + ".publication LIMIT 10;"; + String query = "SELECT id FROM " + DatabaseConnector.databaseName + ".publication LIMIT 10;"; try { List publications = jdbcTemplate.queryForList(query, String.class); diff --git 
a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java index 9ab38c1..07f5568 100644 --- a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java @@ -3,7 +3,7 @@ package eu.openaire.urls_controller.services; import com.google.common.collect.Lists; import eu.openaire.urls_controller.components.BulkImport; -import eu.openaire.urls_controller.configuration.ImpalaConnector; +import eu.openaire.urls_controller.configuration.DatabaseConnector; import eu.openaire.urls_controller.controllers.BulkImportController; import eu.openaire.urls_controller.models.BulkImportReport; import eu.openaire.urls_controller.models.DocFileData; @@ -125,7 +125,7 @@ public class BulkImportServiceImpl implements BulkImportService { List> callableTasksForFileSegments = new ArrayList<>(numOfFiles); int sizeOfEachSegment = (numOfFiles / BulkImportController.numOfThreadsForBulkImportProcedures); - List> subLists = Lists.partition(fileLocations, sizeOfEachSegment); // Divide the initial list to "numOfThreadsPerBulkImportProcedure" subLists. The last one may have marginally fewer files. + List> subLists = Lists.partition(fileLocations, sizeOfEachSegment); // Divide the initial list to "numOfThreadsForBulkImportProcedures" subLists. The last one may have marginally fewer files. int subListsSize = subLists.size(); bulkImportReport.addEvent("Going to import the files in parallel, after dividing them in " + subListsSize + " segments."); @@ -194,16 +194,16 @@ public class BulkImportServiceImpl implements BulkImportService { fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // Merge the parquet files inside the table "payload_bulk_import", to improve performance of future operations. - ImpalaConnector.databaseLock.lock(); + DatabaseConnector.databaseLock.lock(); String mergeErrorMsg = fileUtils.mergeParquetFiles("payload_bulk_import", "", null); if ( mergeErrorMsg != null ) { - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); bulkImportReport.addEvent(mergeErrorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); String successMsg = "Finished the bulk-import procedure for " + provenance + " and bulkImportDir: " + bulkImportDirName; logger.info(successMsg); @@ -296,15 +296,15 @@ public class BulkImportServiceImpl implements BulkImportService { if ( logger.isTraceEnabled() ) logger.trace("Going to load the data of parquet-file: \"" + parquetFileName + "\" to the database-table: \"payload_bulk_import\"."); // DEBUG! - ImpalaConnector.databaseLock.lock(); + DatabaseConnector.databaseLock.lock(); if ( !parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import") ) { - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); bulkImportReport.addEvent("Could not load the payload-records to the database, for segment-" + segmentCounter + "!"); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // None of the files of this segment will be deleted, in any case. return numOfFilesInSegment; // All files of this segment have failed. 
} - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); String segmentSuccessMsg = "Finished importing " + numOfPayloadRecords + " files, out of " + numOfFilesInSegment + ", for segment-" + segmentCounter + "."; logger.info(segmentSuccessMsg); @@ -355,10 +355,10 @@ public class BulkImportServiceImpl implements BulkImportService { String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID); // This string-concatenation, works with urls of Arvix. A different construction may be needed for other datasources. String originalUrl = actualUrl; // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link. - final String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ? limit 1"; + final String getFileLocationForHashQuery = "select `location` from " + DatabaseConnector.databaseName + ".payload where `hash` = ? limit 1"; final int[] hashArgType = new int[] {Types.VARCHAR}; String alreadyFoundFileLocation = null; - ImpalaConnector.databaseLock.lock(); + DatabaseConnector.databaseLock.lock(); try { alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[]{fileHash}, hashArgType, String.class); } catch (EmptyResultDataAccessException erdae) { @@ -367,7 +367,7 @@ public class BulkImportServiceImpl implements BulkImportService { logger.error("Error when executing or acquiring data from the the 'getFileLocationForHashQuery'!\n", e); // Continue with bulk-importing the file and uploading it to S3. } finally { - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); } String idMd5hash = getMD5hash(fileNameID.toLowerCase()); diff --git a/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java index cbb033d..2d497c4 100644 --- a/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java @@ -1,6 +1,6 @@ package eu.openaire.urls_controller.services; -import eu.openaire.urls_controller.configuration.ImpalaConnector; +import eu.openaire.urls_controller.configuration.DatabaseConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -36,12 +36,12 @@ public class StatsServiceImpl implements StatsService { Object result = jdbcTemplate.queryForObject(getNumberQuery, Integer.class); if ( result != null ) { int numOfPayloads = (int) result; - logger.info("The number of " + message + " in the database \"" + ImpalaConnector.databaseName + "\" is " + numOfPayloads); + logger.info("The number of " + message + " in the database \"" + DatabaseConnector.databaseName + "\" is " + numOfPayloads); return ResponseEntity.ok(Integer.toString(numOfPayloads)); } else - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The number of " + message + " could not be retrieved (it was null) from the database \"" + ImpalaConnector.databaseName + "\" using the getNumberQuery: " + getNumberQuery); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The number of " + message + " could not be retrieved (it was null) from the database \"" + DatabaseConnector.databaseName + "\" using the getNumberQuery: " + getNumberQuery); } catch (EmptyResultDataAccessException erdae) { - return 
ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The number of " + message + " could not be retrieved (empty result) from the database \"" + ImpalaConnector.databaseName + "\" using the getNumberQuery: " + getNumberQuery); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The number of " + message + " could not be retrieved (empty result) from the database \"" + DatabaseConnector.databaseName + "\" using the getNumberQuery: " + getNumberQuery); } catch (Exception e) { String exMsg = e.getMessage(); if ( (exMsg != null) && (exMsg.contains("Could not resolve table reference") || exMsg.contains("Failed to open HDFS file")) ) { @@ -69,17 +69,17 @@ public class StatsServiceImpl implements StatsService { // So in order to get the number of inspected records, we want the distinct number, which at some point it will remain stable, even though the Service will try again and again some records. // Before all the records are inspected, this endpoint will report all the inspected records MINUS the duplicate records which come straight from the "publication" table. - final String getInspectedRecordsNumberQuery = "select count(dist.id) from (select distinct id, original_url from " + ImpalaConnector.databaseName + ".attempt) as dist"; + final String getInspectedRecordsNumberQuery = "select count(dist.id) from (select distinct id, original_url from " + DatabaseConnector.databaseName + ".attempt) as dist"; try { Object result = jdbcTemplate.queryForObject(getInspectedRecordsNumberQuery, Integer.class); if ( result != null ) { int numOfInspectedRecords = (int) result; - logger.info("Number of crawling-inspected records from the database \"" + ImpalaConnector.databaseName + "\" is " + numOfInspectedRecords); + logger.info("Number of crawling-inspected records from the database \"" + DatabaseConnector.databaseName + "\" is " + numOfInspectedRecords); return ResponseEntity.ok(Integer.toString(numOfInspectedRecords)); } else - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The inspected records' number could not be retrieved from the database \"" + ImpalaConnector.databaseName + "\" using the getInspectedRecordsNumberQuery: " + getInspectedRecordsNumberQuery); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The inspected records' number could not be retrieved from the database \"" + DatabaseConnector.databaseName + "\" using the getInspectedRecordsNumberQuery: " + getInspectedRecordsNumberQuery); } catch (EmptyResultDataAccessException erdae) { - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The inspected records' number could not be retrieved from the database \"" + ImpalaConnector.databaseName + "\" using the getInspectedRecordsNumberQuery: " + getInspectedRecordsNumberQuery); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("The inspected records' number could not be retrieved from the database \"" + DatabaseConnector.databaseName + "\" using the getInspectedRecordsNumberQuery: " + getInspectedRecordsNumberQuery); } catch (Exception e) { String exMsg = e.getMessage(); if ( (exMsg != null) && (exMsg.contains("Could not resolve table reference") || exMsg.contains("Failed to open HDFS file")) ) { diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java index 119e879..6071ffc 100644 --- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java +++ 
b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java @@ -1,7 +1,7 @@ package eu.openaire.urls_controller.services; import eu.openaire.urls_controller.components.BulkImport; -import eu.openaire.urls_controller.configuration.ImpalaConnector; +import eu.openaire.urls_controller.configuration.DatabaseConnector; import eu.openaire.urls_controller.controllers.UrlsController; import eu.openaire.urls_controller.models.*; import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse; @@ -112,24 +112,24 @@ public class UrlsServiceImpl implements UrlsService { "select pubid, url, datasourceid, datasourcename\n" + // Select the final sorted data with "assignmentsLimit". "from (select distinct pubid, url, datasourceid, datasourcename, attempt_count, pub_year\n" + // Select the distinct id-url data. Beware that this will return duplicate id-url paris, wince one pair may be associated with multiple datasources. " from (select p.id as pubid, pu.url as url, pb.level as level, attempts.counts as attempt_count, p.year as pub_year, d.id as datasourceid, d.name as datasourcename\n" + // Select all needed columns frm JOINs, order by "boost.level" and limit them to (assignmentsLimit * 10) - " from " + ImpalaConnector.databaseName + ".publication p\n" + - " join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" + - " join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" + // This is needed for the "d.allow_harvest=true" check later on. - " left outer join " + ImpalaConnector.databaseName + ".publication_boost pb\n" + + " from " + DatabaseConnector.databaseName + ".publication p\n" + + " join " + DatabaseConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" + + " join " + DatabaseConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" + // This is needed for the "d.allow_harvest=true" check later on. + " left outer join " + DatabaseConnector.databaseName + ".publication_boost pb\n" + " on p.id=pb.id\n" + - " left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts\n" + + " left outer join (select count(a.id) as counts, a.id from " + DatabaseConnector.databaseName + ".attempt a group by a.id) as attempts\n" + " on attempts.id=p.id\n" + " left outer join (\n" + - " select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" + + " select a.id, a.original_url from " + DatabaseConnector.databaseName + ".assignment a\n" + " union all\n" + - " select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl\n" + // Here we access the payload-VIEW which includes the three payload-tables. + " select pl.id, pl.original_url from " + DatabaseConnector.databaseName + ".payload pl\n" + // Here we access the payload-VIEW which includes the three payload-tables. " ) as existing\n" + " on existing.id=p.id and existing.original_url=pu.url\n" + " where d.allow_harvest=true and existing.id is null\n" + // For records not found on existing, the "existing.id" will be null. ((excludedDatasourceIDsStringList != null) ? // If we have an exclusion-list, use it below. 
(" and d.id not in " + excludedDatasourceIDsStringList + "\n") : "") + " and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecordAtomic.get() + "\n" + - " and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" + + " and not exists (select 1 from " + DatabaseConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" + " and pu.url != '' and pu.url is not null\n" + // Some IDs have empty-string urls, there are no "null" urls, but keep the relevant check for future-proofing. " order by coalesce(level, -1000) desc\n" + " limit " + (assignmentsLimit * 10) + "\n" + @@ -144,19 +144,19 @@ public class UrlsServiceImpl implements UrlsService { // The "order by" in the end makes sure the older attempted records will be re-attempted after a long time. //logger.trace("findAssignmentsQuery:\n" + findAssignmentsQuery); // DEBUG! - final String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment"; + final String getAssignmentsQuery = "select * from " + DatabaseConnector.databaseName + ".current_assignment"; List assignments = new ArrayList<>(assignmentsLimit); long curAssignmentsBatchCounter = assignmentsBatchCounter.incrementAndGet(); - ImpalaConnector.databaseLock.lock(); + DatabaseConnector.databaseLock.lock(); // For performance reasons, we collect the returned assignments and create a temp-table with them, so that later they can be copied efficiently to the "a-bit-more-permanent" assignment table. // This way, we avoid having to manually insert thousands of assignment records there. Instead, we create a new table AS the results from the "findAssignmentsQuery". String errorMsg = createAndInitializeCurrentAssignmentsTable(findAssignmentsQuery); if ( errorMsg != null ) { - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } @@ -184,15 +184,15 @@ public class UrlsServiceImpl implements UrlsService { } catch (EmptyResultDataAccessException erdae) { errorMsg = "No results were returned for \"getAssignmentsQuery\":\n" + getAssignmentsQuery; String tmpErrMsg = dropCurrentAssignmentTable(); - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); if ( tmpErrMsg != null ) errorMsg += "\n" + tmpErrMsg; logger.warn(errorMsg); return ResponseEntity.status(HttpStatus.NO_CONTENT).body(errorMsg); } catch (Exception e) { - errorMsg = ImpalaConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e); + errorMsg = DatabaseConnector.handleQueryException("getAssignmentsQuery", getAssignmentsQuery, e); String tmpErrMsg = dropCurrentAssignmentTable(); - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); if ( tmpErrMsg != null ) errorMsg += "\n" + tmpErrMsg; return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); @@ -203,7 +203,7 @@ public class UrlsServiceImpl implements UrlsService { errorMsg = "No results retrieved from the \"findAssignmentsQuery\" for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + " for the next requests."; logger.error(errorMsg); String tmpErrMsg = dropCurrentAssignmentTable(); - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); if ( tmpErrMsg != null ) { errorMsg += "\n" + tmpErrMsg; // The additional error-msg is already logged. 
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); @@ -215,23 +215,23 @@ public class UrlsServiceImpl implements UrlsService { logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker."); // Write the Assignment details to the assignment-table. - String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pubid, url, '" + workerId + "', " + curAssignmentsBatchCounter + ", " + timestampMillis - + "\nfrom " + ImpalaConnector.databaseName + ".current_assignment"; + String insertAssignmentsQuery = "insert into " + DatabaseConnector.databaseName + ".assignment \n select pubid, url, '" + workerId + "', " + curAssignmentsBatchCounter + ", " + timestampMillis + + "\nfrom " + DatabaseConnector.databaseName + ".current_assignment"; try { jdbcTemplate.execute(insertAssignmentsQuery); } catch (Exception e) { - errorMsg = ImpalaConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e); + errorMsg = DatabaseConnector.handleQueryException("insertAssignmentsQuery", insertAssignmentsQuery, e); String tmpErrMsg = dropCurrentAssignmentTable(); if ( tmpErrMsg != null ) errorMsg += "\n" + tmpErrMsg; - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } errorMsg = dropCurrentAssignmentTable(); if ( errorMsg != null ) { - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } @@ -239,11 +239,11 @@ public class UrlsServiceImpl implements UrlsService { String mergeErrorMsg = fileUtils.mergeParquetFiles("assignment", "", null); if ( mergeErrorMsg != null ) { - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(mergeErrorMsg); } - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); // Due to the fact that one publication with an id-url pair can be connected with multiple datasources, the results returned from the query may be duplicates. // So, we apply a post-processing step where we collect only one instance of each id-url pair and send it to the Worker. @@ -349,7 +349,7 @@ public class UrlsServiceImpl implements UrlsService { } // Load all the parquet files of each type into its table. - ImpalaConnector.databaseLock.lock(); + DatabaseConnector.databaseLock.lock(); if ( ! hasAttemptParquetFileProblem ) hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/", "attempt"); @@ -357,7 +357,7 @@ public class UrlsServiceImpl implements UrlsService { if ( ! hasPayloadParquetFileProblem ) hasPayloadParquetFileProblem = ! 
parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignmentsCounter + "/", "payload_aggregated"); - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); if ( hasAttemptParquetFileProblem && hasPayloadParquetFileProblem ) throw new RuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignmentsCounter); @@ -370,9 +370,9 @@ public class UrlsServiceImpl implements UrlsService { logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: " + ie.getMessage()); // This is a very rare case. At the moment, we just move on with table-merging. } catch (RuntimeException re) { - ImpalaConnector.databaseLock.lock(); + DatabaseConnector.databaseLock.lock(); String assignmentErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter); - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); String errorMsg = re.getMessage(); if ( assignmentErrorMsg != null ) errorMsg += "\n" + assignmentErrorMsg; @@ -418,13 +418,13 @@ public class UrlsServiceImpl implements UrlsService { private String createAndInitializeCurrentAssignmentsTable(String findAssignmentsQuery) { - final String createCurrentAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery; - final String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment"; + final String createCurrentAssignmentsQuery = "create table " + DatabaseConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery; + final String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + DatabaseConnector.databaseName + ".current_assignment"; try { jdbcTemplate.execute(createCurrentAssignmentsQuery); } catch (Exception e) { - String errorMsg = ImpalaConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e); + String errorMsg = DatabaseConnector.handleQueryException("createCurrentAssignmentsQuery", createCurrentAssignmentsQuery, e); String tmpErrMsg = dropCurrentAssignmentTable(); // The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened). if ( tmpErrMsg != null ) errorMsg += "\n" + tmpErrMsg; @@ -434,7 +434,7 @@ public class UrlsServiceImpl implements UrlsService { try { jdbcTemplate.execute(computeCurrentAssignmentsStatsQuery); } catch (Exception e) { - String errorMsg = ImpalaConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e); + String errorMsg = DatabaseConnector.handleQueryException("computeCurrentAssignmentsStatsQuery", computeCurrentAssignmentsStatsQuery, e); String tmpErrMsg = dropCurrentAssignmentTable(); if ( tmpErrMsg != null ) errorMsg += "\n" + tmpErrMsg; @@ -446,12 +446,12 @@ public class UrlsServiceImpl implements UrlsService { private String dropCurrentAssignmentTable() { - String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE"; + String dropCurrentAssignmentsQuery = "DROP TABLE IF EXISTS " + DatabaseConnector.databaseName + ".current_assignment PURGE"; try { jdbcTemplate.execute(dropCurrentAssignmentsQuery); return null; // All good. 
No error-message. } catch (Exception e) { - return ImpalaConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e); // The error is already logged inside. + return DatabaseConnector.handleQueryException("dropCurrentAssignmentsQuery", dropCurrentAssignmentsQuery, e); // The error is already logged inside. } } @@ -464,12 +464,12 @@ public class UrlsServiceImpl implements UrlsService { String mergeErrorMsg; - ImpalaConnector.databaseLock.lock(); + DatabaseConnector.databaseLock.lock(); if ( ! hasAttemptParquetFileProblem ) { mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null); if ( mergeErrorMsg != null ) { - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg); return false; } @@ -478,7 +478,7 @@ public class UrlsServiceImpl implements UrlsService { if ( ! hasPayloadParquetFileProblem ) { mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null); if ( mergeErrorMsg != null ) { - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg); return false; } @@ -486,12 +486,12 @@ public class UrlsServiceImpl implements UrlsService { mergeErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter); if ( mergeErrorMsg != null ) { - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg); return false; } - ImpalaConnector.databaseLock.unlock(); + DatabaseConnector.databaseLock.unlock(); logger.debug("Finished merging the database tables."); return true; } diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index c6472c3..7d72f38 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -3,7 +3,7 @@ package eu.openaire.urls_controller.util; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimaps; import com.google.common.collect.SetMultimap; -import eu.openaire.urls_controller.configuration.ImpalaConnector; +import eu.openaire.urls_controller.configuration.DatabaseConnector; import eu.openaire.urls_controller.controllers.UrlsController; import eu.openaire.urls_controller.models.Payload; import eu.openaire.urls_controller.models.UrlReport; @@ -100,12 +100,12 @@ public class FileUtils { // Create a temp-table as a copy of the initial table. try { - jdbcTemplate.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName + " " + whereClause + parameter); + jdbcTemplate.execute("CREATE TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + DatabaseConnector.databaseName + "." + tableName + " " + whereClause + parameter); } catch (Exception e) { errorMsg = "Problem when copying the contents of \"" + tableName + "\" table to a newly created \"" + tableName + "_tmp\" table, when merging the parquet-files!\n"; logger.error(errorMsg, e); try { // Make sure we delete the possibly half-created temp-table. - jdbcTemplate.execute("DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + "." 
+ tableName + "_tmp PURGE"); + jdbcTemplate.execute("DROP TABLE IF EXISTS " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE"); // We cannot move on with merging, but no harm happened, since the "table_tmp" name is still reserved for future use (after it was dropped immediately).. } catch (Exception e1) { logger.error("Failed to drop the \"" + tableName + "_tmp\" table!", e1); @@ -116,13 +116,13 @@ public class FileUtils { // Drop the initial table. try { - jdbcTemplate.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE"); + jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + " PURGE"); } catch (Exception e) { errorMsg = "Problem when dropping the initial \"" + tableName + "\" table, when merging the parquet-files!\n"; logger.error(errorMsg, e); // The original table could not be dropped, so the temp-table cannot be renamed to the original..! try { // Make sure we delete the already created temp-table, in order to be able to use it in the future. The merging has failed nevertheless. - jdbcTemplate.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp PURGE"); + jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE"); } catch (Exception e1) { logger.error((errorMsg += "Failed to drop the \"" + tableName + "_tmp\" table!"), e1); // Add this error to the original, both are very important. } @@ -133,20 +133,20 @@ public class FileUtils { // Rename the temp-table to have the initial-table's name. try { - jdbcTemplate.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName); + jdbcTemplate.execute("ALTER TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp RENAME TO " + DatabaseConnector.databaseName + "." + tableName); } catch (Exception e) { errorMsg = "Problem in renaming the \"" + tableName + "_tmp\" table to \"" + tableName + "\", when merging the parquet-files!\n"; logger.error(errorMsg, e); // At this point we only have a "temp-table", the original is already deleted.. // Try to create the original, as a copy of the temp-table. If that succeeds, then try to delete the temp-table. try { - jdbcTemplate.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + " stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName + "_tmp"); + jdbcTemplate.execute("CREATE TABLE " + DatabaseConnector.databaseName + "." + tableName + " stored as parquet AS SELECT * FROM " + DatabaseConnector.databaseName + "." + tableName + "_tmp"); } catch (Exception e1) { errorMsg = "Problem when copying the contents of \"" + tableName + "_tmp\" table to a newly created \"" + tableName + "\" table, when merging the parquet-files!\n"; logger.error(errorMsg, e1); // If the original table was not created, then we have to intervene manually, if it was created but without any data, then we can safely move on handling other assignments and workerReports, but the data will be lost! So this workerReport failed to be handled. try { // The below query normally returns a list, as it takes a "regex-pattern" as an input. BUT, we give just the table name, without wildcards. So the result is either the tableName itself or none (not any other table). 
- jdbcTemplate.queryForObject("SHOW TABLES IN " + ImpalaConnector.databaseName + " LIKE '" + tableName + "'", List.class); + jdbcTemplate.queryForObject("SHOW TABLES IN " + DatabaseConnector.databaseName + " LIKE '" + tableName + "'", List.class); } catch (EmptyResultDataAccessException erdae) { // The table does not exist, so it was not even half-created by the previous query. // Not having the original table anymore is a serious error. A manual action is needed! @@ -159,7 +159,7 @@ public class FileUtils { // The creation of the original table was successful. Try to delete the temp-table. try { - jdbcTemplate.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp PURGE"); + jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE"); } catch (Exception e2) { logger.error((errorMsg += "Problem when dropping the \"" + tableName + "_tmp\" table, when merging the parquet-files!\n"), e2); // Manual deletion should be performed! @@ -171,7 +171,7 @@ public class FileUtils { // Gather information to be used for queries-optimization. try { - jdbcTemplate.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName); + jdbcTemplate.execute("COMPUTE STATS " + DatabaseConnector.databaseName + "." + tableName); } catch (Exception e) { logger.error("Problem when gathering information from table \"" + tableName + "\" to be used for queries-optimization.", e); // In this case the error is not so important to the whole operation.. It's only that the performance of this specific table will be less optimal, only temporarily, unless every "COMPUTE STATS" query fails for future workerReports too. @@ -212,7 +212,7 @@ public class FileUtils { SetMultimap allFileNamesWithPayloads = Multimaps.synchronizedSetMultimap(HashMultimap.create((urlReportsSize / 5), 3)); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it. - final String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload" + (isTestEnvironment ? "_aggregated" : "") + " where `hash` = ? limit 1"; + final String getFileLocationForHashQuery = "select `location` from " + DatabaseConnector.databaseName + ".payload" + (isTestEnvironment ? "_aggregated" : "") + " where `hash` = ? limit 1"; final int[] hashArgType = new int[] {Types.VARCHAR}; List> callableTasks = new ArrayList<>(6); @@ -274,7 +274,7 @@ public class FileUtils { }); }// end-for - ImpalaConnector.databaseLock.lock(); // The execution uses the database. + DatabaseConnector.databaseLock.lock(); // The execution uses the database. try { // Invoke all the tasks and wait for them to finish before moving to the next batch. List> futures = hashMatchingExecutor.invokeAll(callableTasks); for ( Future future : futures ) { @@ -292,7 +292,7 @@ public class FileUtils { logger.error(errorMsg, e); return UploadFullTextsResponse.unsuccessful; } finally { - ImpalaConnector.databaseLock.unlock(); // The remaining work of this function does not use the database. + DatabaseConnector.databaseLock.unlock(); // The remaining work of this function does not use the database. 
} logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextsFound.get() + " (out of " + urlReportsSize + " | about " + df.format(numFullTextsFound.get() * 100.0 / urlReportsSize) + "%)."); diff --git a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java index 1086fb7..7e09c1c 100644 --- a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java @@ -1,7 +1,7 @@ package eu.openaire.urls_controller.util; import com.google.common.collect.Lists; -import eu.openaire.urls_controller.configuration.ImpalaConnector; +import eu.openaire.urls_controller.configuration.DatabaseConnector; import eu.openaire.urls_controller.models.Error; import eu.openaire.urls_controller.models.*; import org.apache.avro.Schema; @@ -458,7 +458,7 @@ public class ParquetFileUtils { public boolean loadParquetDataIntoTable(String remoteParquetDataLocation, String tableName) { // Import the data from the parquet file into the database's table. - String loadParquetInTableQuery = "load data inpath '" + remoteParquetDataLocation + "' into table " + ImpalaConnector.databaseName + "." + tableName; + String loadParquetInTableQuery = "load data inpath '" + remoteParquetDataLocation + "' into table " + DatabaseConnector.databaseName + "." + tableName; try { jdbcTemplate.execute(loadParquetInTableQuery); } catch (Exception e) { @@ -471,7 +471,7 @@ public class ParquetFileUtils { return false; // Since each thread is using a different subDir, by design, This error is unacceptable. } } - ImpalaConnector.handleQueryException("loadParquetInTableQuery", loadParquetInTableQuery, e); // It's already logged. + DatabaseConnector.handleQueryException("loadParquetInTableQuery", loadParquetInTableQuery, e); // It's already logged. return false; } //logger.trace("The data from \"" + remoteParquetDataLocation + "\" was loaded into the " + tableName + " table."); // DEBUG!