Handle the case when the "upload-file-to-S3" operation fails with a "ConnectException". In this case, all remaining upload operations for the files of that particular batch or segment, are canceled.

This commit is contained in:
Lampros Smyrnaios 2023-10-04 13:01:13 +03:00
parent 865926fbc3
commit c9626de120
2 changed files with 102 additions and 76 deletions

View File

@ -21,6 +21,7 @@ import org.springframework.stereotype.Service;
import javax.xml.bind.DatatypeConverter; import javax.xml.bind.DatatypeConverter;
import java.io.File; import java.io.File;
import java.net.ConnectException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
@ -243,8 +244,20 @@ public class BulkImportServiceImpl implements BulkImportService {
int counter = 0; int counter = 0;
// Upload files to S3 and collect payloadRecords. // Upload files to S3 and collect payloadRecords.
for ( String fileLocation: fileLocationsSegment ) { for ( int i=0; i < numOfFilesInSegment; ++i ) {
GenericData.Record record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg); String fileLocation = fileLocationsSegment.get(i);
GenericData.Record record = null;
try {
record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg);
} catch (ConnectException ce) {
String errorMsg = "ConnectException when uploading the files of segment_" + segmentCounter + " to the S3 Object Store. Will avoid uploading any file for this segment..";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
for ( int j=i; j < numOfFilesInSegment; ++j )
failedFiles.add(fileLocationsSegment.get(j)); // The rest of the files are considered "failed".
break;
}
if ( record != null ) if ( record != null )
payloadRecords.add(record); payloadRecords.add(record);
else { else {
@ -343,6 +356,7 @@ public class BulkImportServiceImpl implements BulkImportService {
private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg) private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg)
throws ConnectException
{ {
File fullTextFile = new File(fileLocation); File fullTextFile = new File(fileLocation);
DocFileData docFileData = new DocFileData(fullTextFile, null, null, null); DocFileData docFileData = new DocFileData(fullTextFile, null, null, null);
@ -401,16 +415,8 @@ public class BulkImportServiceImpl implements BulkImportService {
// The above analysis is educational, it does not need to take place and is not currently used. // The above analysis is educational, it does not need to take place and is not currently used.
s3Url = alreadyFoundFileLocation; s3Url = alreadyFoundFileLocation;
} else { } else
try { s3Url = fileUtils.constructS3FilenameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), fileNameID, fileLocationData.getDotFileExtension(), datasourceId, fileHash);
s3Url = fileUtils.constructFileNameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), openAireId, fileLocationData.getDotFileExtension(), datasourceId, fileHash); // This throws Exception, in case the uploading failed.
if ( s3Url == null )
return null; // In case the 'datasourceID' or 'hash' is null. Which should never happen here, since both of them are checked before the execution reaches here.
} catch (Exception e) {
logger.error("Could not upload the file '" + fileLocationData.getFileName() + "' to the S3 ObjectStore!" + additionalLoggingMsg, e);
return null;
}
}
GenericData.Record record = new GenericData.Record(ParquetFileUtils.payloadsSchema); GenericData.Record record = new GenericData.Record(ParquetFileUtils.payloadsSchema);
record.put("id", openAireId); record.put("id", openAireId);

View File

@ -18,6 +18,7 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.*; import java.io.*;
import java.net.ConnectException;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.nio.file.Files; import java.nio.file.Files;
@ -487,7 +488,7 @@ public class FileUtils {
} }
// Let's try to upload the file to S3 and update the payloads, either in successful file-uploads (right-away) or not (in the end). // Let's try to upload the file to S3 and update the payloads, either in successful file-uploads (right-away) or not (in the end).
try {
// Prepare the filename as: "datasourceid/publicationid::hash.pdf" // Prepare the filename as: "datasourceid/publicationid::hash.pdf"
// All related payloads point to this exact same file, BUT, may be related with different urlIDs, which in turn be related with different datasourceIDs. // All related payloads point to this exact same file, BUT, may be related with different urlIDs, which in turn be related with different datasourceIDs.
// This file could have been found from different urlIds and thus be related to multiple datasourceIds. // This file could have been found from different urlIds and thus be related to multiple datasourceIds.
@ -500,6 +501,7 @@ public class FileUtils {
continue; continue;
} }
// The "matcher.group(3)" returns the "filenameWithoutExtension", which is currently not used. // The "matcher.group(3)" returns the "filenameWithoutExtension", which is currently not used.
// Use the "fileNameID" and not the "filenameWithoutExtension", as we want to avoid keeping the possible "parenthesis" with the increasing number (about the duplication of ID-fileName).
String fileNameID = matcher.group(4); String fileNameID = matcher.group(4);
if ( (fileNameID == null) || fileNameID.isEmpty() ) { if ( (fileNameID == null) || fileNameID.isEmpty() ) {
logger.error("Failed to extract the \"fileNameID\" from \"" + fileName + "\"."); logger.error("Failed to extract the \"fileNameID\" from \"" + fileName + "\".");
@ -537,16 +539,17 @@ public class FileUtils {
continue; continue;
} }
String s3Url = constructFileNameAndUploadToS3(targetDirectory, fileName, fileNameID, dotFileExtension, datasourceId, hash); try {
if ( s3Url == null ) String s3Url = constructS3FilenameAndUploadToS3(targetDirectory, fileName, fileNameID, dotFileExtension, datasourceId, hash);
continue; if (s3Url != null) {
setFullTextForMultiplePayloads(fileRelatedPayloads, s3Url); setFullTextForMultiplePayloads(fileRelatedPayloads, s3Url);
//numUploadedFiles ++; //numUploadedFiles ++;
} catch (Exception e) {
logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore!", e);
} }
// Else, the record will have its file-data set to "null", in the end of this method. } catch (ConnectException ce) {
logger.error("Avoid uploading the rest of the files of this batch..");
break;
}
// Else, the record will have its file-data set to "null", in the end of the caller method (as it will not have an s3Url as its location).
}//end filenames-for-loop }//end filenames-for-loop
//logger.debug("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -2) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore."); //logger.debug("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -2) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore.");
@ -554,7 +557,29 @@ public class FileUtils {
} }
public String constructFileNameAndUploadToS3(String fileDir, String fileName, String openAireID, String dotFileExtension, String datasourceId, String hash) throws Exception public String constructS3FilenameAndUploadToS3(String targetDirectory, String fileName, String fileNameID,
String dotFileExtension, String datasourceId, String hash) throws ConnectException
{
String filenameForS3 = constructS3FileName(fileName, fileNameID, dotFileExtension, datasourceId, hash); // This name is for the uploaded file, in the S3 Object Store.
if ( filenameForS3 == null ) // The error is logged inside.
return null;
String fileFullPath = targetDirectory + File.separator + fileName; // The fullPath to the local file (which has the previous name).
String s3Url = null;
try {
s3Url = s3ObjectStore.uploadToS3(filenameForS3, fileFullPath);
} catch (ConnectException ce) {
logger.error("Could not connect with the S3 Object Store! " + ce.getMessage());
throw ce;
} catch (Exception e) {
logger.error("Could not upload the local-file \"" + fileFullPath + "\" to the S3 ObjectStore, with S3-filename: \"" + filenameForS3 + "\"!", e);
return null;
}
return s3Url;
}
public String constructS3FileName(String fileName, String openAireID, String dotFileExtension, String datasourceId, String hash)
{ {
if ( datasourceId == null ) { if ( datasourceId == null ) {
logger.error("The retrieved \"datasourceId\" was \"null\" for file: " + fileName); logger.error("The retrieved \"datasourceId\" was \"null\" for file: " + fileName);
@ -565,13 +590,8 @@ public class FileUtils {
return null; return null;
} }
String fileFullPath = fileDir + File.separator + fileName; // The fullPath to the local file.
// Use the "fileNameID" and not the "filenameWithoutExtension", as we want to avoid keeping the possible "parenthesis" with the increasing number (about the duplication of ID-fileName).
// Now we append the file-hash, so it is guaranteed that the filename will be unique. // Now we append the file-hash, so it is guaranteed that the filename will be unique.
fileName = datasourceId + "/" + openAireID + "::" + hash + dotFileExtension; // This is the fileName to be used in the objectStore, not of the local file! return datasourceId + "/" + openAireID + "::" + hash + dotFileExtension; // This is the fileName to be used in the objectStore, not of the local file!
return s3ObjectStore.uploadToS3(fileName, fileFullPath);
} }