diff --git a/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java
index 2864c6d..182d90b 100644
--- a/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java
@@ -278,6 +278,8 @@ public class FullTextsServiceImpl implements FullTextsService {
 			}
 			//logger.trace("Parquet file '" + parquetFileName + "' was created and filled."); // DEBUG!
 
+			logger.trace("Going to upload the parquet file: " + fullLocalParquetFilePath + " to HDFS."); // DEBUG!
+
 			// Upload and insert the data to the "payload" Impala table. (no database-locking is required)
 			String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir);
 			if ( errorMsg != null ) { // The possible error-message returned, is already logged by the Controller.
@@ -287,6 +289,8 @@ public class FullTextsServiceImpl implements FullTextsService {
 				return numOfFilesInSegment;
 			}
 
+			logger.trace("Going to load the data of the parquet-file: \"" + parquetFileName + "\" into the database-table: \"payload_bulk_import\"."); // DEBUG!
+
 			ImpalaConnector.databaseLock.lock();
 			if ( !parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import") ) {
 				ImpalaConnector.databaseLock.unlock();
@@ -327,10 +331,6 @@ public class FullTextsServiceImpl implements FullTextsService {
 		// Check if this file is already found by crawling. Even though we started excluding this datasource from crawling, many full-texts have already been downloaded.
 		// Also, it may be the case that this file was downloaded by another datasource.
 
-		String fileHash = docFileData.getHash();
-		if ( fileHash == null )
-			return null;	// No check of past found full-text can be made nor the S3-fileName can be created.
-
 		FileLocationData fileLocationData;
 		try {
 			fileLocationData = new FileLocationData(fileLocation);
@@ -339,12 +339,16 @@ public class FullTextsServiceImpl implements FullTextsService {
 			return null;
 		}
 
+		String fileHash = docFileData.getHash();
+		if ( fileHash == null )
+			return null;	// No check of past found full-text can be made nor the S3-fileName can be created.
+
 		String datasourceId = bulkImportSource.getDatasourceID();
 		String datasourcePrefix = bulkImportSource.getDatasourcePrefix();
 		String fileNameID = fileLocationData.getFileNameID();
 
-		String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID); // This is the urls with the ArvixId.
+		String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID); // This string-concatenation works with arXiv urls. A different construction may be needed for other datasources.
 
 		String originalUrl = actualUrl; // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link.
 
 		final String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ? limit 1";
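Note on the last hunk above: the prepared getFileLocationForHashQuery drives the duplicate-check described in the comments. A minimal, hypothetical usage sketch with Spring's JdbcTemplate (the query and variable names come from the diff; the helper itself, and the assumption that the service holds a JdbcTemplate, are illustrative):

    import org.springframework.dao.EmptyResultDataAccessException;
    import org.springframework.jdbc.core.JdbcTemplate;

    // Hypothetical helper, not part of this change: resolve the S3-location of an
    // already-imported full-text by its file-hash; null means the hash is new.
    static String getExistingFileLocation(JdbcTemplate jdbcTemplate, String getFileLocationForHashQuery, String fileHash) {
        try {
            return jdbcTemplate.queryForObject(getFileLocationForHashQuery, String.class, fileHash);
        } catch (EmptyResultDataAccessException erdae) {
            return null; // No payload-row with this hash exists; the file must be uploaded to S3.
        }
    }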
diff --git a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java
index 4733fd5..2352285 100644
--- a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java
+++ b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java
@@ -29,10 +29,10 @@ import org.springframework.http.ResponseEntity;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.stereotype.Component;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
@@ -372,7 +372,7 @@
 		// Check if the parquet file exists locally.
 		File parquetFile = new File(parquetFileFullLocalPath);
 		if ( ! parquetFile.isFile() ) {
-			String errorMsg = "The parquet file \"" + parquetFileFullLocalPath + "\" does not exist!";
+			String errorMsg = "The local parquet file \"" + parquetFileFullLocalPath + "\" does not exist!";
 			logger.error(errorMsg);
 			return errorMsg;
 		}
@@ -411,12 +411,16 @@
 		conn.setRequestProperty("Authorization", hdfsHttpAuthString);
 		conn.setRequestProperty("content-type", "application/octet-stream");
 		conn.setDoOutput(true);
-		conn.setInstanceFollowRedirects(true); // It is possible that the "location was"
+		conn.setInstanceFollowRedirects(true); // It is possible that the "location" was an intermediate one.
 		conn.connect();
 
 		// Write the parquet file.
-		try ( InputStream inputS = Files.newInputStream(parquetFile.toPath()); OutputStream outS = conn.getOutputStream()) {
-			int readByte = -1; while ( (readByte = inputS.read()) != -1 ) outS.write(readByte);
+		try ( BufferedInputStream inputS = new BufferedInputStream(Files.newInputStream(parquetFile.toPath()), FileUtils.tenMb);
+				BufferedOutputStream outS = new BufferedOutputStream(conn.getOutputStream(), FileUtils.tenMb) )
+		{
+			int readByte = -1;
+			while ( (readByte = inputS.read()) != -1 )
+				outS.write(readByte);
 		} // Any exception will be caught in the end of this method.
 
 		statusCode = conn.getResponseCode();
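The byte-copy in the hunk above is correct, and with the 10-MB buffers each read()/write() now hits memory instead of the disk or the socket. On Java 9+ the same copy can be collapsed into a single transferTo() call; a sketch under that assumption, with bufferSize standing in for the project's FileUtils.tenMb constant:

    import java.io.BufferedInputStream;
    import java.io.BufferedOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;

    // Sketch only: the same upload-copy expressed with InputStream.transferTo (Java 9+).
    static void copyFileToStream(Path parquetPath, OutputStream connOutput, int bufferSize) throws IOException {
        try ( BufferedInputStream inputS = new BufferedInputStream(Files.newInputStream(parquetPath), bufferSize);
                BufferedOutputStream outS = new BufferedOutputStream(connOutput, bufferSize) )
        {
            inputS.transferTo(outS); // Block-copies internally, avoiding one method call per byte.
        }
    }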
@@ -500,83 +504,84 @@ public class ParquetFileUtils {
 				attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
 				payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
 				payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
+				return (attemptCreationSuccessful && payloadAggregatedCreationSuccessful && payloadBulkImportCreationSuccessful);
+				// We need all directories to be created in order for the app to function properly!
 			}
-			else {
-				// Check the json-response, to see if all the subdirectories exist.
-				// Take the json-response.
-				String jsonResponse = fileUtils.getMessageFromResponseBody(conn, false);
-				if ( (jsonResponse == null) || jsonResponse.isEmpty() ) {
-					logger.error("The \"jsonResponse\" was not retrieved!");
-					return false;
-				}
-				// Else, if an error message exists inside the response, then we will be alerted when parsing the Json bellow.
+			// Check the json-response, to see if all the subdirectories exist.
-				//logger.trace("\"jsonResponse\":\n" + jsonResponse); // DEBUG!
-
-				boolean foundAttemptsDir = false;
-				boolean foundPayloadsAggregatedDir = false;
-				boolean foundPayloadsBulkImportDir = false;
-
-				try { // Parse the jsonData
-					JSONObject jObj = new JSONObject(jsonResponse); // Construct a JSONObject from the retrieved jsonData.
-					JSONObject entityObject = jObj.getJSONObject("FileStatuses");
-					//logger.trace("EntityObject: " + entityObject.toString()); // DEBUG!
-
-					JSONArray directoryStatuses = entityObject.getJSONArray("FileStatus");
-					//logger.trace("directoryStatuses: " + directoryStatuses.toString()); // DEBUG!
-
-					// In case no fileStatuses are found, the follow for-loop will not run.
-					for ( Object fileStatusObject : directoryStatuses ) {
-						JSONObject fileStatusJsonObject = (JSONObject) fileStatusObject;
-						//logger.trace("FileStatusJsonObject: " + fileStatusJsonObject.toString()); // DEBUG!
-
-						String dirPath = fileStatusJsonObject.getString("pathSuffix");
-						//logger.trace("DirPath: " + dirPath); // DEBUG!
-
-						if ( dirPath.equals("attempts") )
-							foundAttemptsDir = true;
-						else if ( dirPath.equals("payloads_aggregated") )
-							foundPayloadsAggregatedDir = true;
-						else if ( dirPath.equals("payloads_bulk_import") )
-							foundPayloadsBulkImportDir = true;
-						else
-							logger.warn("Unknown remote parquet directory found: " + dirPath);
-					}
-				} catch ( JSONException je ) { // In case any of the above "json-keys" was not found.
-					logger.warn("JSON Exception was thrown while trying to retrieve the subdirectories \"attempts\" and \"payloads\": " + je.getMessage() + "\n\nJsonResponse: " + jsonResponse);
-					return false;
-				}
-
-				// IMPORTANT NOTE: It is possible that the ".../attempts" dir exists, but the ".../payloads" dir does not and vise-versa (in case of remote filesystem failure of by accidental deletion by some other user).
-				// Also, it is possible that the Controller was terminated before creating all the directories, or that in the previous executions the second "create"-request failed, resulting in Controller's shut down.
-
-				// For each missing subdirectories, run the mkDirs-request.
-				if ( !foundAttemptsDir ) {
-					logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" does not exist! Going to create it.");
-					attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
-				} else
-					logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" exists.");
-
-				if ( !foundPayloadsAggregatedDir ) {
-					logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" does not exist! Going to create it.");
-					payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
-				} else
-					logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" exists.");
-
-				if ( !foundPayloadsBulkImportDir ) {
-					logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" does not exist! Going to create it.");
-					payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
-				} else
-					logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" exists.");
-			}
+			// Take the json-response.
+			String jsonResponse = fileUtils.getMessageFromResponseBody(conn, false);
+			if ( (jsonResponse == null) || jsonResponse.isEmpty() ) {
+				logger.error("The \"jsonResponse\" was not retrieved!");
+				return false;
+			}
+			// Else, if an error message exists inside the response, then we will be alerted when parsing the Json below.
+
+			//logger.trace("\"jsonResponse\":\n" + jsonResponse); // DEBUG!
+
+			boolean foundAttemptsDir = false;
+			boolean foundPayloadsAggregatedDir = false;
+			boolean foundPayloadsBulkImportDir = false;
+
+			try { // Parse the jsonData
+				JSONObject jObj = new JSONObject(jsonResponse); // Construct a JSONObject from the retrieved jsonData.
+				JSONObject entityObject = jObj.getJSONObject("FileStatuses");
+				//logger.trace("EntityObject: " + entityObject.toString()); // DEBUG!
+
+				JSONArray directoryStatuses = entityObject.getJSONArray("FileStatus");
+				//logger.trace("directoryStatuses: " + directoryStatuses.toString()); // DEBUG!
+
+				// In case no fileStatuses are found, the following for-loop will not run.
+				for ( Object fileStatusObject : directoryStatuses ) {
+					JSONObject fileStatusJsonObject = (JSONObject) fileStatusObject;
+					//logger.trace("FileStatusJsonObject: " + fileStatusJsonObject.toString()); // DEBUG!
+
+					String dirPath = fileStatusJsonObject.getString("pathSuffix");
+					//logger.trace("DirPath: " + dirPath); // DEBUG!
+
+					if ( dirPath.equals("attempts") )
+						foundAttemptsDir = true;
+					else if ( dirPath.equals("payloads_aggregated") )
+						foundPayloadsAggregatedDir = true;
+					else if ( dirPath.equals("payloads_bulk_import") )
+						foundPayloadsBulkImportDir = true;
+					else
+						logger.warn("Unknown remote parquet directory found: " + dirPath);
+				}
+			} catch ( JSONException je ) { // In case any of the above "json-keys" was not found.
+				logger.warn("JSON Exception was thrown while trying to retrieve the subdirectories \"attempts\" and \"payloads\": " + je.getMessage() + "\n\nJsonResponse: " + jsonResponse);
+				return false;
+			}
+
+			// IMPORTANT NOTE: It is possible that the ".../attempts" dir exists, but the ".../payloads" dir does not and vice-versa (in case of remote filesystem failure or of accidental deletion by some other user).
+			// Also, it is possible that the Controller was terminated before creating all the directories, or that in the previous executions the second "create"-request failed, resulting in the Controller's shutdown.
+
+			// For each missing subdirectory, run the mkDirs-request.
+			if ( !foundAttemptsDir ) {
+				logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" does not exist! Going to create it.");
+				attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
+			} else
+				logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" exists.");
+
+			if ( !foundPayloadsAggregatedDir ) {
+				logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" does not exist! Going to create it.");
+				payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
+			} else
+				logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" exists.");
+
+			if ( !foundPayloadsBulkImportDir ) {
+				logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" does not exist! Going to create it.");
+				payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
+			} else
+				logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" exists.");
+
+			return (attemptCreationSuccessful && payloadAggregatedCreationSuccessful && payloadBulkImportCreationSuccessful);
+			// We need all directories to be created in order for the app to function properly!
 		} catch (Exception e) {
 			logger.error("", e);
 			return false;
 		}
-
-		return (attemptCreationSuccessful && payloadAggregatedCreationSuccessful && payloadBulkImportCreationSuccessful);
-		// We need all directories to be created in order for the app to function properly!
 	}
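For reference, the restructured block above parses a WebHDFS LISTSTATUS response, whose documented shape is {"FileStatuses":{"FileStatus":[ ... ]}} with one "pathSuffix" per entry. A self-contained sketch of that parse, using the same org.json calls as the diff (the response literal is illustrative):

    import org.json.JSONArray;
    import org.json.JSONObject;

    public class ListStatusParseSketch {
        public static void main(String[] args) {
            // Illustrative LISTSTATUS response; real responses carry more fields per FileStatus entry.
            String jsonResponse = "{\"FileStatuses\":{\"FileStatus\":["
                    + "{\"pathSuffix\":\"attempts\",\"type\":\"DIRECTORY\"},"
                    + "{\"pathSuffix\":\"payloads_aggregated\",\"type\":\"DIRECTORY\"}]}}";
            JSONArray directoryStatuses = new JSONObject(jsonResponse)
                    .getJSONObject("FileStatuses").getJSONArray("FileStatus");
            for ( int i = 0; i < directoryStatuses.length(); i++ )
                System.out.println(directoryStatuses.getJSONObject(i).getString("pathSuffix")); // attempts, payloads_aggregated
        }
    }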