diff --git a/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java b/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java
index d549c5e..902ef6f 100644
--- a/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java
+++ b/src/main/java/eu/openaire/urls_controller/controllers/StatsController.java
@@ -58,7 +58,10 @@ public class StatsController {
         final String getPayloadsNumberForDatasourceQuery = "select count(p.id) from " + ImpalaConnector.databaseName + ".payload p\n" +
                 "    join " + ImpalaConnector.databaseName + ".publication pu on pu.id=p.id and pu.datasourceid=\"" + datasourceId + "\"";
-        logger.trace("getPayloadsNumberForDatasourceQuery:\n" + getPayloadsNumberForDatasourceQuery);
+
+        if ( logger.isTraceEnabled() )
+            logger.trace("getPayloadsNumberForDatasourceQuery:\n" + getPayloadsNumberForDatasourceQuery);
+
         return statsService.getNumberOfPayloads(getPayloadsNumberForDatasourceQuery, "payloads related to datasourceId \"" + datasourceId + "\"");
     }
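A note on the recurring pattern in this patch: with string-concatenated messages, the argument is fully built before trace() is even invoked, so the concatenation cost is paid even when the TRACE level is disabled. Below is a minimal sketch of the guard, assuming SLF4J (which the logger.isTraceEnabled() / logger.trace() calls in these hunks match); the class name and query value are illustrative:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class TraceGuardExample {

        private static final Logger logger = LoggerFactory.getLogger(TraceGuardExample.class);

        public static void main(String[] args) {
            String query = "select count(p.id) from payload p";  // Illustrative value.

            // Guarded form, as used in this patch: the message string is only
            // concatenated when TRACE is actually enabled.
            if ( logger.isTraceEnabled() )
                logger.trace("query:\n" + query);

            // Parameterized form: SLF4J defers the final formatting until after
            // its own level check, so simple messages need no explicit guard.
            logger.trace("query:\n{}", query);
        }
    }

Note that the parameterized form only defers the final string formatting; an argument that is itself expensive to compute, such as the fileUtils.getMessageFromResponseBody(conn, false) call guarded in the ParquetFileUtils hunk further down, still needs the explicit isTraceEnabled() check.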
diff --git a/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java
index 182d90b..359292d 100644
--- a/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java
@@ -100,7 +100,8 @@ public class FullTextsServiceImpl implements FullTextsService {
             return false;
         }
-        logger.trace("fileLocations:\n" + fileLocations);
+        if ( logger.isTraceEnabled() )
+            logger.trace("fileLocations:\n" + fileLocations);
 
         String localParquetDir = parquetFileUtils.parquetBaseLocalDirectoryPath + "bulk_import_" + provenance + File.separator + relativeBulkImportDir;  // This ends with "/".
         try {
@@ -268,7 +269,9 @@ public class FullTextsServiceImpl implements FullTextsService {
         // Construct the parquet file, upload it to HDFS and load it into the "payload_bulk_import" table.
         String parquetFileName = "payloads_" + segmentCounter + ".parquet";
         String fullLocalParquetFilePath = localParquetDir + parquetFileName;
-        logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath);  // DEBUG!
+
+        if ( logger.isTraceEnabled() )
+            logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath);  // DEBUG!
 
         if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) {
             bulkImportReport.addEvent("Could not write the payload-records to the parquet-file: '" + parquetFileName + "'!");
@@ -276,9 +279,9 @@ public class FullTextsServiceImpl implements FullTextsService {
             // None of the files of this segment will be deleted, in any case.
             return numOfFilesInSegment;
         }
-        //logger.trace("Parquet file '" + parquetFileName + "' was created and filled.");  // DEBUG!
 
-        logger.trace("Going to upload the parquet file: " + fullLocalParquetFilePath + " to HDFS.");  // DEBUG!
+        if ( logger.isTraceEnabled() )
+            logger.trace("Going to upload the parquet file: " + fullLocalParquetFilePath + " to HDFS.");  // DEBUG!
 
         // Upload and insert the data to the "payload" Impala table. (no database-locking is required)
         String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir);
@@ -289,7 +292,8 @@ public class FullTextsServiceImpl implements FullTextsService {
             return numOfFilesInSegment;
         }
-        logger.trace("Going to load the data of parquet-file: \"" + parquetFileName + "\" to the database-table: \"payload_bulk_import\".");  // DEBUG!
+        if ( logger.isTraceEnabled() )
+            logger.trace("Going to load the data of parquet-file: \"" + parquetFileName + "\" to the database-table: \"payload_bulk_import\".");  // DEBUG!
 
         ImpalaConnector.databaseLock.lock();
         if ( !parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import") ) {
@@ -371,7 +375,6 @@ public class FullTextsServiceImpl implements FullTextsService {
         // openaire id = <datasourcePrefix> + "::" + <idMd5hash>
         String openAireId = (datasourcePrefix + "::" + idMd5hash);
-        //logger.trace("openAireId: " + openAireId);
 
         String s3Url = null;
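The FullTextsServiceImpl hunks above all sit inside the per-segment bulk-import flow: write the payload-records to a local parquet file, upload that file to HDFS, then load it into the "payload_bulk_import" table, with any failed step counting the entire segment as failed. A compact sketch of that fail-fast shape, with hypothetical placeholder methods standing in for the real ParquetFileUtils calls:

    import java.io.File;

    // A minimal sketch (hypothetical names) of the fail-fast flow shown above:
    // every step that fails makes the whole segment count as failed.
    public class SegmentImportSketch {

        int importSegment(int segmentCounter, int numOfFilesInSegment, String localParquetDir) {
            String parquetFileName = "payloads_" + segmentCounter + ".parquet";
            String fullLocalParquetFilePath = localParquetDir + File.separator + parquetFileName;

            if ( !writeToParquet(fullLocalParquetFilePath) )
                return numOfFilesInSegment;  // Nothing was imported; all files of the segment failed.

            if ( uploadToHdfs(fullLocalParquetFilePath) != null )  // A non-null error-message means the upload failed.
                return numOfFilesInSegment;

            if ( !loadIntoTable(parquetFileName, "payload_bulk_import") )
                return numOfFilesInSegment;

            return 0;  // No failed files in this segment.
        }

        // Placeholders standing in for the real ParquetFileUtils methods.
        boolean writeToParquet(String path) { return true; }
        String uploadToHdfs(String path) { return null; }
        boolean loadIntoTable(String fileName, String table) { return true; }
    }

Returning numOfFilesInSegment on every failure path keeps the caller's bookkeeping simple: summing the returned values yields the total number of files that failed to import.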
diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
index d1a5027..eb8487b 100644
--- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
@@ -68,7 +68,8 @@ public class UrlsServiceImpl implements UrlsService {
         if ( bulkImportSources.isEmpty() )
             return;  // So the "excludedDatasourceIDsStringList"-code should be placed last in this Constructor-method.
-        logger.trace("BulkImportSources:\n" + bulkImportSources);
+        if ( logger.isTraceEnabled() )
+            logger.trace("BulkImportSources:\n" + bulkImportSources);
 
         List<String> excludedIDs = new ArrayList<>();
         for ( BulkImport.BulkImportSource source : bulkImportSources.values() ) {
@@ -128,7 +129,7 @@ public class UrlsServiceImpl implements UrlsService {
         // The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
-        logger.trace("findAssignmentsQuery:\n" + findAssignmentsQuery);  // DEBUG!
+        //logger.trace("findAssignmentsQuery:\n" + findAssignmentsQuery);  // DEBUG!
 
         final String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";
diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
index f197f23..009f6f1 100644
--- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
+++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
@@ -58,8 +58,10 @@ public class FileUtils {
     public static final String workingDir = System.getProperty("user.dir") + File.separator;
 
+    private boolean isTestEnvironment;
 
-    public FileUtils (@Value("${services.pdfaggregation.controller.baseFilesLocation}") String baseFilesLocation) {
+
+    public FileUtils (@Value("${services.pdfaggregation.controller.baseFilesLocation}") String baseFilesLocation, @Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment) {
         if ( !baseFilesLocation.endsWith(File.separator) )
             baseFilesLocation += File.separator;
@@ -67,6 +69,8 @@ public class FileUtils {
             baseFilesLocation = workingDir + baseFilesLocation;
 
         this.baseFilesLocation = baseFilesLocation;
+
+        this.isTestEnvironment = isTestEnvironment;
     }
@@ -139,7 +143,7 @@ public class FileUtils {
         SetMultimap<String, Payload> allFileNamesWithPayloads = Multimaps.synchronizedSetMultimap(HashMultimap.create((urlReportsSize / 5), 3));  // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
 
-        final String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ? limit 1";
+        final String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload" + (isTestEnvironment ? "_aggregated" : "") + " where `hash` = ? limit 1";
         final int[] hashArgType = new int[] {Types.VARCHAR};
 
         List<Callable<Void>> callableTasks = new ArrayList<>(6);
@@ -174,7 +178,8 @@ public class FileUtils {
             if ( alreadyFoundFileLocation != null ) {  // If the full-text of this record is already-found and uploaded.
                 payload.setLocation(alreadyFoundFileLocation);  // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
-                //logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\".");  // DEBUG!
+                if ( logger.isTraceEnabled() )
+                    logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\".");  // DEBUG!
                 numFilesFoundFromPreviousAssignmentsBatches.incrementAndGet();
                 numFullTextsFound.incrementAndGet();
                 return null;  // Do not request the file from the worker, it's already uploaded. Move on. The "location" will be filled by the "setFullTextForMultiplePayloads()" method, later.
@@ -447,7 +452,6 @@ public class FileUtils {
             logger.error("The retrieved \"datasourceId\" was \"null\" for file: " + fileName);
             return null;
         }
-
         if ( hash == null ) {
             logger.error("The retrieved \"hash\" was \"null\" for file: " + fileName);
             return null;
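The getFileLocationForHashQuery change above makes the deduplication lookup table depend on the environment: in the test environment it reads "payload_aggregated" instead of "payload". Judging by the "?" placeholder and the Types.VARCHAR argument-type array, the query is presumably executed through Spring's JdbcTemplate; here is a minimal sketch of such a hash lookup (the method name, the "mydb" database and the exception handling are assumptions, not code from this patch):

    import java.sql.Types;

    import org.springframework.dao.EmptyResultDataAccessException;
    import org.springframework.jdbc.core.JdbcTemplate;

    public class HashLookupSketch {

        // Returns the S3 location of an already-uploaded file with the same hash,
        // or null if this hash has not been seen in a previous assignments-batch.
        static String getLocationForHash(JdbcTemplate jdbcTemplate, String fileHash, boolean isTestEnvironment) {
            final String query = "select `location` from mydb.payload"
                    + (isTestEnvironment ? "_aggregated" : "")  // The test environment reads the "payload_aggregated" table.
                    + " where `hash` = ? limit 1";
            try {
                return jdbcTemplate.queryForObject(query, new Object[]{fileHash}, new int[]{Types.VARCHAR}, String.class);
            } catch (EmptyResultDataAccessException e) {
                return null;  // No previously-uploaded file carries this hash.
            }
        }
    }

Looking the hash up before contacting a worker is what allows the code above to increment numFilesFoundFromPreviousAssignmentsBatches and skip re-downloading and re-uploading a full-text that is already in S3.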
diff --git a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java
index 2352285..a6e0296 100644
--- a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java
+++ b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java
@@ -414,7 +414,7 @@ public class ParquetFileUtils {
         conn.setInstanceFollowRedirects(true);  // It is possible that the "location" was an intermediate one.
         conn.connect();
 
-        // Write the parquet file.
+        // Upload the parquet file.
         try ( BufferedInputStream inputS = new BufferedInputStream(Files.newInputStream(parquetFile.toPath()), FileUtils.tenMb);
               BufferedOutputStream outS = new BufferedOutputStream(conn.getOutputStream(), FileUtils.tenMb) ) {
@@ -604,7 +604,8 @@ public class ParquetFileUtils {
             logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
             return false;
         }
-        logger.trace("The Operation was successful for hdfs-op-url: " + hdfsOperationUrl + "\n" + fileUtils.getMessageFromResponseBody(conn, false));
+        if ( logger.isTraceEnabled() )
+            logger.trace("The Operation was successful for hdfs-op-url: " + hdfsOperationUrl + "\n" + fileUtils.getMessageFromResponseBody(conn, false));
     } catch (Exception e) {
         logger.error("", e);
         return false;
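The first ParquetFileUtils hunk corrects a stale comment ("Upload", not "Write") on a plain HttpURLConnection streaming upload. For reference, a self-contained sketch of that pattern, under the assumption of a WebHDFS-style endpoint that accepts a PUT (the buffer size mirrors FileUtils.tenMb, and the constant's value is assumed; URL and method names are illustrative):

    import java.io.BufferedInputStream;
    import java.io.BufferedOutputStream;
    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class HdfsUploadSketch {

        static final int TEN_MB = 10 * 1_048_576;  // Assumed value of FileUtils.tenMb.

        static int uploadFile(Path parquetFile, String uploadUrl) throws IOException {
            HttpURLConnection conn = (HttpURLConnection) new URL(uploadUrl).openConnection();
            conn.setRequestMethod("PUT");           // WebHDFS file-creation uses PUT (assumption for this sketch).
            conn.setDoOutput(true);
            conn.setInstanceFollowRedirects(true);  // The given "location" may be an intermediate one.
            conn.connect();

            // Buffered copy from the local file into the connection's output stream.
            try ( BufferedInputStream inputS = new BufferedInputStream(Files.newInputStream(parquetFile), TEN_MB);
                  BufferedOutputStream outS = new BufferedOutputStream(conn.getOutputStream(), TEN_MB) ) {
                byte[] buffer = new byte[8192];
                int bytesRead;
                while ( (bytesRead = inputS.read(buffer)) != -1 )
                    outS.write(buffer, 0, bytesRead);
            }
            return conn.getResponseCode();  // The caller decides which status codes count as success.
        }
    }

The try-with-resources block guarantees both streams are closed even if the copy fails midway, matching the structure of the try ( BufferedInputStream ... ) block in the hunk above.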