From c9626de120ec227f850864fcca110e885de08d25 Mon Sep 17 00:00:00 2001
From: LSmyrnaios
Date: Wed, 4 Oct 2023 13:01:13 +0300
Subject: [PATCH] Handle the case when the "upload-file-to-S3" operation fails
 with a "ConnectException". In this case, all remaining upload operations for
 the files of that particular batch or segment are canceled.

---
 .../services/BulkImportServiceImpl.java |  30 ++--
 .../urls_controller/util/FileUtils.java | 148 ++++++++++--------
 2 files changed, 102 insertions(+), 76 deletions(-)

diff --git a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
index bb930f1..698d1c4 100644
--- a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
@@ -21,6 +21,7 @@ import org.springframework.stereotype.Service;
 
 import javax.xml.bind.DatatypeConverter;
 import java.io.File;
+import java.net.ConnectException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -243,8 +244,20 @@ public class BulkImportServiceImpl implements BulkImportService {
         int counter = 0;
 
         // Upload files to S3 and collect payloadRecords.
-        for ( String fileLocation: fileLocationsSegment ) {
-            GenericData.Record record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg);
+        for ( int i=0; i < numOfFilesInSegment; ++i ) {
+            String fileLocation = fileLocationsSegment.get(i);
+            GenericData.Record record = null;
+            try {
+                record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg);
+            } catch (ConnectException ce) {
+                String errorMsg = "ConnectException when uploading the files of segment_" + segmentCounter + " to the S3 Object Store. The remaining files of this segment will not be uploaded.";
+                logger.error(errorMsg + additionalLoggingMsg);
+                bulkImportReport.addEvent(errorMsg);
+                for ( int j=i; j < numOfFilesInSegment; ++j )
+                    failedFiles.add(fileLocationsSegment.get(j));  // The rest of the files are considered "failed".
+                break;
+            }
+
             if ( record != null )
                 payloadRecords.add(record);
             else {
@@ -343,6 +356,7 @@ public class BulkImportServiceImpl implements BulkImportService {
 
     private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg)
+            throws ConnectException
     {
         File fullTextFile = new File(fileLocation);
         DocFileData docFileData = new DocFileData(fullTextFile, null, null, null);
@@ -401,16 +415,8 @@ public class BulkImportServiceImpl implements BulkImportService {
             // The above analysis is educational, it does not need to take place and is not currently used.
             s3Url = alreadyFoundFileLocation;
-        } else {
-            try {
-                s3Url = fileUtils.constructFileNameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), openAireId, fileLocationData.getDotFileExtension(), datasourceId, fileHash);  // This throws Exception, in case the uploading failed.
-                if ( s3Url == null )
-                    return null;  // In case the 'datasourceID' or 'hash' is null. Which should never happen here, since both of them are checked before the execution reaches here.
-            } catch (Exception e) {
-                logger.error("Could not upload the file '" + fileLocationData.getFileName() + "' to the S3 ObjectStore!" + additionalLoggingMsg, e);
-                return null;
-            }
-        }
+        } else
+            s3Url = fileUtils.constructS3FilenameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), openAireId, fileLocationData.getDotFileExtension(), datasourceId, fileHash);
 
         GenericData.Record record = new GenericData.Record(ParquetFileUtils.payloadsSchema);
         record.put("id", openAireId);
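For context on the hunks above: on a ConnectException, the segment loop now fails fast, marking every not-yet-uploaded file of the segment as failed instead of attempting it. Below is a minimal, self-contained sketch of that cancellation pattern, assuming hypothetical names (uploadOne, segment, failedFiles); it is not the project's API.

    import java.net.ConnectException;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class SegmentUploadSketch {

        // Stand-in for the real per-file upload step (the patch uses "processBulkImportedFile").
        static String uploadOne(String fileLocation) throws ConnectException {
            if ( fileLocation.startsWith("b") )
                throw new ConnectException("object store unreachable");  // simulate an outage mid-segment
            return "s3://bucket/" + fileLocation;
        }

        public static void main(String[] args) {
            List<String> segment = List.of("a.pdf", "b.pdf", "c.pdf");
            List<String> uploadedUrls = new ArrayList<>();
            Set<String> failedFiles = new HashSet<>();

            for ( int i = 0; i < segment.size(); ++i ) {
                try {
                    uploadedUrls.add(uploadOne(segment.get(i)));
                } catch (ConnectException ce) {
                    // The store is unreachable; trying the remaining files would repeat
                    // the same failure, so mark them all as failed and stop this segment.
                    for ( int j = i; j < segment.size(); ++j )
                        failedFiles.add(segment.get(j));
                    break;
                }
            }
            System.out.println("uploaded=" + uploadedUrls + " failed=" + failedFiles);
        }
    }

The early break matters because a ConnectException signals that the store itself is unreachable, so every further attempt in the same segment would fail for the same reason.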
diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
index cbd0e94..3b91c04 100644
--- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
+++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
@@ -18,6 +18,7 @@ import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.stereotype.Component;
 
 import java.io.*;
+import java.net.ConnectException;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.file.Files;
@@ -487,66 +488,68 @@ public class FileUtils {
             }
 
             // Let's try to upload the file to S3 and update the payloads, either in successful file-uploads (right-away) or not (in the end).
-            try {
-                // Prepare the filename as: "datasourceid/publicationid::hash.pdf"
-                // All related payloads point to this exact same file, BUT, may be related with different urlIDs, which in turn be related with different datasourceIDs.
-                // This file could have been found from different urlIds and thus be related to multiple datasourceIds.
-                // BUT, since the filename contains a specific urlID, the datasourceId should be the one related to that specific urlID.
-                // So, we extract this urlID, search the payload inside the "fileRelatedPayloads" and get the related datasourceID (instead of taking the first or a random datasourceID).
-                Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileName);
-                if ( !matcher.matches() ) {
-                    logger.error("Failed to match the \"" + fileName + "\" with the regex: " + FILENAME_ID_EXTENSION);
-                    continue;
-                }
-                // The "matcher.group(3)" returns the "filenameWithoutExtension", which is currently not used.
-                String fileNameID = matcher.group(4);
-                if ( (fileNameID == null) || fileNameID.isEmpty() ) {
-                    logger.error("Failed to extract the \"fileNameID\" from \"" + fileName + "\".");
-                    continue;
-                }
-                String dotFileExtension = matcher.group(5);
-                if ( (dotFileExtension == null) || dotFileExtension.isEmpty() ) {
-                    logger.error("Failed to extract the \"dotFileExtension\" from \"" + fileName + "\".");
-                    continue;
-                }
-
-                // This file is related with some payloads, in a sense that these payloads have urls which lead to the same full-text url.
-                // These payloads might have different IDs and sourceUrls. But, in the end, the different sourceUrls give the same full-text.
-                // Below, we make sure we pick the "datasource" from the payload, which has the same id as the full-text's name.
-                // If there are multiple payloads with the same id, which point to the same file, then we can take whatever datasource we want from those payloads.
-                // It is possible that payloads with same IDs, but different sourceUrls pointing to the same full-text, can be related with different datasources
-                // (especially for IDs of type: "doiboost_____::XXXXXXXXXXXXXXXXXXXXX").
-                // It does not really matter, since the first-ever payload to give this full-text could very well be another one,
-                // since the crawling happens in multiple threads which compete with each other for CPU time.
-
-                String datasourceId = null;
-                String hash = null;
-                boolean isFound = false;
-                for ( Payload payload : fileRelatedPayloads ) {
-                    if ( fileNameID.equals(payload.getId()) ) {
-                        datasourceId = payload.getDatasourceId();
-                        hash = payload.getHash();
-                        isFound = true;
-                        break;
-                    }
-                }
-
-                if ( !isFound ) {  // This should never normally happen. If it does, then a very bad change will have taken place.
-                    logger.error("The \"fileNameID\" (" + fileNameID + ") was not found inside the \"fileRelatedPayloads\" for fileName: " + fileName);
-                    continue;
-                }
-
-                String s3Url = constructFileNameAndUploadToS3(targetDirectory, fileName, fileNameID, dotFileExtension, datasourceId, hash);
-                if ( s3Url == null )
-                    continue;
-
-                setFullTextForMultiplePayloads(fileRelatedPayloads, s3Url);
-                //numUploadedFiles ++;
-            } catch (Exception e) {
-                logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore!", e);
-            }
-            // Else, the record will have its file-data set to "null", in the end of this method.
+            // Prepare the filename as: "datasourceid/publicationid::hash.pdf"
+            // All related payloads point to this exact same file, BUT, may be related with different urlIDs, which in turn may be related with different datasourceIDs.
+            // This file could have been found from different urlIds and thus be related to multiple datasourceIds.
+            // BUT, since the filename contains a specific urlID, the datasourceId should be the one related to that specific urlID.
+            // So, we extract this urlID, search the payload inside the "fileRelatedPayloads" and get the related datasourceID (instead of taking the first or a random datasourceID).
+
+            Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileName);
+            if ( !matcher.matches() ) {
+                logger.error("Failed to match the \"" + fileName + "\" with the regex: " + FILENAME_ID_EXTENSION);
+                continue;
+            }
+            // The "matcher.group(3)" returns the "filenameWithoutExtension", which is currently not used.
+            // Use the "fileNameID" and not the "filenameWithoutExtension", as we want to avoid keeping the possible "parenthesis" with the increasing number (about the duplication of ID-fileName).
+            String fileNameID = matcher.group(4);
+            if ( (fileNameID == null) || fileNameID.isEmpty() ) {
+                logger.error("Failed to extract the \"fileNameID\" from \"" + fileName + "\".");
+                continue;
+            }
+            String dotFileExtension = matcher.group(5);
+            if ( (dotFileExtension == null) || dotFileExtension.isEmpty() ) {
+                logger.error("Failed to extract the \"dotFileExtension\" from \"" + fileName + "\".");
+                continue;
+            }
+
+            // This file is related to some payloads, in the sense that these payloads have urls which lead to the same full-text url.
+            // These payloads might have different IDs and sourceUrls. But, in the end, the different sourceUrls give the same full-text.
+            // Below, we make sure we pick the "datasource" from the payload, which has the same id as the full-text's name.
+            // If there are multiple payloads with the same id, which point to the same file, then we can take whatever datasource we want from those payloads.
+            // It is possible that payloads with same IDs, but different sourceUrls pointing to the same full-text, can be related with different datasources
+            // (especially for IDs of type: "doiboost_____::XXXXXXXXXXXXXXXXXXXXX").
+            // It does not really matter, since the first-ever payload to give this full-text could very well be another one,
+            // since the crawling happens in multiple threads which compete with each other for CPU time.
+
+            String datasourceId = null;
+            String hash = null;
+            boolean isFound = false;
+            for ( Payload payload : fileRelatedPayloads ) {
+                if ( fileNameID.equals(payload.getId()) ) {
+                    datasourceId = payload.getDatasourceId();
+                    hash = payload.getHash();
+                    isFound = true;
+                    break;
+                }
+            }
+
+            if ( !isFound ) {  // This should never normally happen. If it does, then a very bad change will have taken place.
+                logger.error("The \"fileNameID\" (" + fileNameID + ") was not found inside the \"fileRelatedPayloads\" for fileName: " + fileName);
+                continue;
+            }
+
+            try {
+                String s3Url = constructS3FilenameAndUploadToS3(targetDirectory, fileName, fileNameID, dotFileExtension, datasourceId, hash);
+                if (s3Url != null) {
+                    setFullTextForMultiplePayloads(fileRelatedPayloads, s3Url);
+                    //numUploadedFiles ++;
+                }
+            } catch (ConnectException ce) {
+                logger.error("The S3 Object Store could not be reached. Aborting the upload of the remaining files of this batch.");
+                break;
+            }
+            // Else, the record will have its file-data set to "null", at the end of the caller method (as it will not have an s3Url as its location).
         }//end filenames-for-loop
 
         //logger.debug("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -2) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore.");
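The loop above delegates the error split to constructS3FilenameAndUploadToS3(), defined in the next hunk: a ConnectException propagates so the whole batch stops, while any other upload failure only skips the current file. Here is a compact sketch of that contract, with made-up names (rawUpload, uploadOrNull); the real low-level call is s3ObjectStore.uploadToS3(...).

    import java.net.ConnectException;

    public class UploadErrorSplitSketch {

        // Hypothetical low-level upload call; here it simulates the store being down.
        static String rawUpload(String s3Key, String localPath) throws Exception {
            throw new ConnectException("connection refused");
        }

        // Mirrors the patched method's contract: ConnectException is rethrown so the caller
        // can abort the whole batch; any other failure is absorbed and skips just this file.
        static String uploadOrNull(String s3Key, String localPath) throws ConnectException {
            try {
                return rawUpload(s3Key, localPath);
            } catch (ConnectException ce) {
                throw ce;       // fatal for the batch
            } catch (Exception e) {
                return null;    // per-file failure only
            }
        }

        public static void main(String[] args) {
            try {
                String s3Url = uploadOrNull("datasourceId/openAireId::hash.pdf", "/tmp/file.pdf");
                System.out.println(s3Url == null ? "skipped this file" : s3Url);
            } catch (ConnectException ce) {
                System.out.println("aborting the batch: " + ce.getMessage());
            }
        }
    }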
@@ -554,7 +557,29 @@ public class FileUtils {
     }
 
-    public String constructFileNameAndUploadToS3(String fileDir, String fileName, String openAireID, String dotFileExtension, String datasourceId, String hash) throws Exception
+    public String constructS3FilenameAndUploadToS3(String targetDirectory, String fileName, String fileNameID,
+                                                   String dotFileExtension, String datasourceId, String hash) throws ConnectException
+    {
+        String filenameForS3 = constructS3FileName(fileName, fileNameID, dotFileExtension, datasourceId, hash);  // This name is for the uploaded file, in the S3 Object Store.
+        if ( filenameForS3 == null )  // The error is logged inside.
+            return null;
+
+        String fileFullPath = targetDirectory + File.separator + fileName;  // The fullPath to the local file (which has the previous name).
+        String s3Url = null;
+        try {
+            s3Url = s3ObjectStore.uploadToS3(filenameForS3, fileFullPath);
+        } catch (ConnectException ce) {
+            logger.error("Could not connect to the S3 Object Store! " + ce.getMessage());
+            throw ce;
+        } catch (Exception e) {
+            logger.error("Could not upload the local-file \"" + fileFullPath + "\" to the S3 ObjectStore, with S3-filename: \"" + filenameForS3 + "\"!", e);
+            return null;
+        }
+        return s3Url;
+    }
+
+
+    public String constructS3FileName(String fileName, String openAireID, String dotFileExtension, String datasourceId, String hash)
     {
         if ( datasourceId == null ) {
             logger.error("The retrieved \"datasourceId\" was \"null\" for file: " + fileName);
             return null;
         }
@@ -565,13 +590,8 @@ public class FileUtils {
             return null;
         }
 
-        String fileFullPath = fileDir + File.separator + fileName;  // The fullPath to the local file.
-
-        // Use the "fileNameID" and not the "filenameWithoutExtension", as we want to avoid keeping the possible "parenthesis" with the increasing number (about the duplication of ID-fileName).
         // Now we append the file-hash, so it is guaranteed that the filename will be unique.
-        fileName = datasourceId + "/" + openAireID + "::" + hash + dotFileExtension;  // This is the fileName to be used in the objectStore, not of the local file!
-
-        return s3ObjectStore.uploadToS3(fileName, fileFullPath);
+        return datasourceId + "/" + openAireID + "::" + hash + dotFileExtension;  // This is the fileName to be used in the objectStore, not of the local file!
     }
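For reference, the key scheme that the extracted constructS3FileName() produces is "datasourceId/openAireID::hash.extension"; appending the content hash keeps keys unique even when one ID yields several files. A standalone sketch follows; the identifier and hash values are made up for illustration.

    public class S3KeySketch {

        // Same shape as the patched helper: "datasourceId/openAireID::hash.extension".
        static String constructS3FileName(String openAireID, String dotFileExtension, String datasourceId, String hash) {
            if ( (datasourceId == null) || (hash == null) )
                return null;  // mirrors the null-guards kept in the patched method
            return datasourceId + "/" + openAireID + "::" + hash + dotFileExtension;
        }

        public static void main(String[] args) {
            // Illustrative values only, not real OpenAIRE identifiers.
            System.out.println(constructS3FileName("od______1234::0123abcd", ".pdf", "opendoar____::2659", "5f4dcc3b5aa765d6"));
            // -> opendoar____::2659/od______1234::0123abcd::5f4dcc3b5aa765d6.pdf
        }
    }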