Upgrade the "processBulkImportedFilesSegment" code:

1) Pre-calculate the file-hashes for all files of the segment and perform a single "getHashLocationsQuery" instead of thousands of per-hash queries (a sketch of the query change follows below).
2) Write some important events to the bulkImportReport file as soon as they are added to the list.
This commit is contained in:
Lampros Smyrnaios 2024-04-02 14:35:19 +03:00
parent bd323ad69a
commit e2d43a9af0
2 changed files with 78 additions and 70 deletions
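For context, the heart of change 1 is the shape of the database lookup: previously one "getFileLocationForHashQuery" was executed per file, now a single batched query is executed per segment. A minimal sketch of the two query shapes follows; the old query text is taken verbatim from the removed code, while the batched query's exact text and column list are assumptions inferred from the new "getHashLocationMap()" and its "fileHashListString" comment, not the verbatim implementation.

// Old: executed once per file (potentially thousands of times per segment).
final String getFileLocationForHashQuery =
        "select `location` from " + DatabaseConnector.databaseName + ".payload where `hash` = ? limit 1";

// New (assumed shape): built and executed once per segment, inside "getHashLocationMap()".
// "fileHashListString" is the ("HASH_1", "HASH_2", ...) list described in FileUtils.
final String getHashLocationsQuery =
        "select `hash`, `location` from " + DatabaseConnector.databaseName + ".payload where `hash` in " + fileHashListString;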

View File

@@ -15,8 +15,6 @@ import org.apache.avro.generic.GenericData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.xml.bind.DatatypeConverter;
@@ -27,10 +25,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
@@ -44,18 +39,12 @@ public class BulkImportServiceImpl implements BulkImportService {
private static final Logger logger = LoggerFactory.getLogger(BulkImportServiceImpl.class);
@Autowired
private FileUtils fileUtils;
@Autowired
private ParquetFileUtils parquetFileUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* Given a directory with full-text files, this method imports the full-text files into the PDF Aggregation Service.
@@ -244,6 +233,43 @@ public class BulkImportServiceImpl implements BulkImportService {
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
HashMap<String, DocFileData> fileLocationsWithData = new HashMap<>(numOfFilesInSegment); // Map each fileLocation to its file-data (including the hash), in order to quickly retrieve it later, when constructing the s3-file-location.
Set<String> fileHashes = new HashSet<>(numOfFilesInSegment);
// Get file-data for this segment's files.
for ( String fileLocation : fileLocationsSegment ) {
DocFileData docFileData = new DocFileData(new File(fileLocation), null, null, fileLocation);
docFileData.calculateAndSetHashAndSize();
String fileHash = docFileData.getHash();
if ( fileHash != null ) { // We will not work with files for which we cannot produce the hash, since the s3-file-location requires it.
fileHashes.add(fileHash);
fileLocationsWithData.put(fileLocation, docFileData);
}
}
int fileHashesSetSize = fileHashes.size();
if ( fileHashesSetSize == 0 ) {
msg = "No fileHashes were retrieved for segment_" + segmentCounter + ", no files will be processed from it.";
logger.warn(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
return numOfFilesInSegment; // All files of this segment failed.
}
// Get the "alreadyFound" fileLocations for the fileHashes which exist in the DB.
HashMap<String, String> hashWithExistingLocationMap = fileUtils.getHashLocationMap(fileHashes, fileHashesSetSize, segmentCounter, "segment");
int numAlreadyRetrievedFiles = hashWithExistingLocationMap.size();
if ( numAlreadyRetrievedFiles > 0 ) {
msg = numAlreadyRetrievedFiles + " files from segment_" + segmentCounter + " have already been retrieved in the past.";
logger.warn(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
}
// Generate the payload-records to be inserted in the DB.
// The OpenAireID, which is synthesised from the fileName, may be different from the ID related to the exact same file which was retrieved in the past.
// The same file may be related to multiple publications from different datasources. Deduplication is out of the scope of this service.
// All different OpenAIRE IDs should be saved, regardless of whether their urls or files may be related to other publications as well.
List<GenericData.Record> payloadRecords = new ArrayList<>(numOfFilesInSegment);
// Use a HashSet for the failed files, in order to not remove them in the end.
@@ -253,23 +279,30 @@
// Upload files to S3 and collect payloadRecords.
for ( int i=0; i < numOfFilesInSegment; ++i ) {
String fileLocation = fileLocationsSegment.get(i);
GenericData.Record record = null;
try {
record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg);
} catch (Exception e) {
String errorMsg = "Exception when uploading the files of segment_" + segmentCounter + " to the S3 Object Store. Will avoid uploading the rest of the files for this segment..";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
for ( int j=i; j < numOfFilesInSegment; ++j )
failedFiles.add(fileLocationsSegment.get(j)); // The rest of the files are considered "failed".
break;
}
if ( record != null )
payloadRecords.add(record);
else {
bulkImportReport.addEvent("An error caused the file: " + fileLocation + " to not be imported!");
DocFileData docFileData = fileLocationsWithData.get(fileLocation);
if ( docFileData == null ) {
failedFiles.add(fileLocation);
} else {
String alreadyRetrievedFileLocation = hashWithExistingLocationMap.get(docFileData.getHash());
GenericData.Record record = null;
try {
record = processBulkImportedFile(docFileData, alreadyRetrievedFileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg);
} catch (Exception e) {
String errorMsg = "Exception when uploading the files of segment_" + segmentCounter + " to the S3 Object Store. Will avoid uploading the rest of the files for this segment.. " + e.getMessage();
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
for ( int j=i; j < numOfFilesInSegment; ++j )
failedFiles.add(fileLocationsSegment.get(j)); // The rest of the files are considered "failed".
break;
}
if ( record != null )
payloadRecords.add(record);
else {
bulkImportReport.addEvent("An error caused the file: " + fileLocation + " to not be imported!");
failedFiles.add(fileLocation);
}
}
if ( ((++counter) % 150) == 0 ) { // Every 150 files, report the status for this segment and write it to the file.
@@ -294,6 +327,7 @@ public class BulkImportServiceImpl implements BulkImportService {
String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be bulk-imported, for segment-" + segmentCounter + " !";
logger.warn(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
}
// Construct the parquet file, upload it to HDFS and load it in the "payload_bulk_import" table.
@@ -362,71 +396,45 @@
}
private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg)
private GenericData.Record processBulkImportedFile(DocFileData docFileData, String alreadyRetrievedFileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg)
throws ConnectException, UnknownHostException
{
DocFileData docFileData = new DocFileData(new File(fileLocation), null, null, fileLocation);
docFileData.calculateAndSetHashAndSize();
// Check if this file is already found by crawling. Even though we started excluding this datasource from crawling, many full-texts have already been downloaded.
// Also, it may be the case that this file was downloaded by another datasource.
FileLocationData fileLocationData;
try {
fileLocationData = new FileLocationData(fileLocation);
fileLocationData = new FileLocationData(docFileData.getLocation());
} catch (RuntimeException re) {
logger.error(re.getMessage() + additionalLoggingMsg);
return null;
}
String fileHash = docFileData.getHash();
if ( fileHash == null )
return null; // No check of past found full-text can be made nor the S3-fileName can be created.
String datasourceId = bulkImportSource.getDatasourceID();
String datasourcePrefix = bulkImportSource.getDatasourcePrefix();
String fileNameID = fileLocationData.getFileNameID();
String openAireId = generateOpenaireId(fileNameID, datasourcePrefix, bulkImportSource.getIsAuthoritative());
String openAireId = generateOpenaireId(fileNameID, bulkImportSource.getDatasourcePrefix(), bulkImportSource.getIsAuthoritative());
if ( openAireId == null )
return null;
String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID); // This string-concatenation works with urls of arXiv. A different construction may be needed for other datasources.
String originalUrl = actualUrl; // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link.
final String getFileLocationForHashQuery = "select `location` from " + DatabaseConnector.databaseName + ".payload where `hash` = ? limit 1";
final int[] hashArgType = new int[] {Types.VARCHAR};
String alreadyFoundFileLocation = null;
DatabaseConnector.databaseLock.lock();
try {
alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[]{fileHash}, hashArgType, String.class);
} catch (EmptyResultDataAccessException erdae) {
// No fileLocation is found, it's ok. It will be null by default.
} catch (Exception e) {
logger.error("Error when executing or acquiring data from the the 'getFileLocationForHashQuery'!" + additionalLoggingMsg, e);
// Continue with bulk-importing the file and uploading it to S3.
} finally {
DatabaseConnector.databaseLock.unlock();
}
String fileHash = docFileData.getHash(); // It's guaranteed to NOT be null at this point.
String s3Url = null;
if ( alreadyFoundFileLocation != null ) // If the full-text of this record is already-found and uploaded.
if ( alreadyRetrievedFileLocation != null ) // If the full-text of this record is already-found and uploaded.
{
// This full-text was found to already be in the database.
// If it has the same datasourceID, then it likely was crawled before from an ID belonging to this datasource.
// If also has the same ID, then the exact same record from that datasource was retrieved previously.
// If it also has the same ID, then the exact same record from that datasource was retrieved previously.
// Else, the file was downloaded by another record of this datasource.
// Else, if the datasourceID is not the same, then the same file was retrieved from another datasource.
// The above analysis is educational; it does not need to take place and is not currently used.
s3Url = alreadyFoundFileLocation;
s3Url = alreadyRetrievedFileLocation;
} else {
s3Url = fileUtils.constructS3FilenameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), openAireId, fileLocationData.getDotFileExtension(), datasourceId, fileHash);
s3Url = fileUtils.constructS3FilenameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), openAireId, fileLocationData.getDotFileExtension(), bulkImportSource.getDatasourceID(), fileHash);
if ( s3Url == null )
return null;
}
// TODO - If another url-schema is introduced for other datasources, have a "switch"-statement and perform the right "actualUrl"-creation based on the current schema.
String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID); // This string-concatenation works with urls of arXiv. A different construction may be needed for other datasources.
String originalUrl = actualUrl; // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link.
return parquetFileUtils.getPayloadParquetRecord(openAireId, originalUrl, actualUrl, timeMillis, bulkImportSource.getMimeType(),
docFileData.getSize(), fileHash, s3Url, provenance, true); // It may return null.
}
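The new segment loop relies on DocFileData.calculateAndSetHashAndSize(), whose body is not part of this diff. Given the MessageDigest and DatatypeConverter imports kept in this class, the hash calculation plausibly looks like the sketch below; the MD5 algorithm, the whole-file read and the helper's shape are assumptions for illustration, not the project's actual code.

// Hypothetical helper: hash a file's bytes and return a hex string, or null on failure.
private static String calculateFileHash(File file) {
    try {
        MessageDigest md = MessageDigest.getInstance("MD5"); // assumed algorithm
        md.update(Files.readAllBytes(file.toPath())); // fine for a sketch; a streaming read suits very large files better
        return DatatypeConverter.printHexBinary(md.digest()).toLowerCase();
    } catch (Exception e) {
        return null; // The caller skips files without a hash, since the s3-file-location requires it.
    }
}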

View File

@@ -121,7 +121,7 @@ public class FileUtils {
return UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error.
}
HashMap<String, String> hashLocationMap = getHashLocationMap(fileHashes, fileHashesSetSize, assignmentsBatchCounter);
HashMap<String, String> hashLocationMap = getHashLocationMap(fileHashes, fileHashesSetSize, assignmentsBatchCounter, "assignments");
for ( String fileHash : fileHashes )
{
@@ -302,7 +302,7 @@ public class FileUtils {
}
public HashMap<String, String> getHashLocationMap(Set<String> fileHashes, int fileHashesSetSize, long assignmentsBatchCounter)
public HashMap<String, String> getHashLocationMap(Set<String> fileHashes, int fileHashesSetSize, long batchCounter, String groupType)
{
// Prepare the "fileHashListString" to be used inside the "getHashLocationsQuery". Create the following string-pattern:
// ("HASH_1", "HASH_2", ...)
@@ -323,9 +323,9 @@
}
});
} catch (EmptyResultDataAccessException erdae) {
logger.warn("No previously-found hash-locations where found for assignments_" + assignmentsBatchCounter);
logger.warn("No previously-found hash-locations where found for " + groupType + "_" + batchCounter);
} catch (Exception e) {
logger.error("Unexpected error when checking for already-found file-hashes!", e);
logger.error("Unexpected error when checking for already-found file-hashes, for " + groupType + "_" + batchCounter, e);
// We will continue with storing the files; we do not want to lose them.
} finally {
DatabaseConnector.databaseLock.unlock();
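Only fragments of getHashLocationMap()'s body appear in this hunk, so the sketch below reconstructs what the batched lookup plausibly looks like end to end. It is an assumption-based sketch, not the verbatim method: the query text, the use of an injected JdbcTemplate in FileUtils, and the anonymous RowCallbackHandler (suggested by the "});" fragment above, and assuming the usual RowCallbackHandler/ResultSet imports) are inferred, while DatabaseConnector.databaseName and the payload table's hash/location columns come from the per-file query removed in BulkImportServiceImpl.

public HashMap<String, String> getHashLocationMap(Set<String> fileHashes, int fileHashesSetSize, long batchCounter, String groupType)
{
    // Build the ("HASH_1", "HASH_2", ...) string-pattern described in the comment above.
    StringBuilder fileHashListString = new StringBuilder(fileHashesSetSize * 36);
    fileHashListString.append("(");
    int counter = 0;
    for ( String fileHash : fileHashes ) {
        if ( counter++ > 0 )
            fileHashListString.append(", ");
        fileHashListString.append("\"").append(fileHash).append("\"");
    }
    fileHashListString.append(")");

    // Assumed query text: one IN-clause lookup instead of one query per hash.
    String getHashLocationsQuery = "select `hash`, `location` from " + DatabaseConnector.databaseName
            + ".payload where `hash` in " + fileHashListString;

    final HashMap<String, String> hashLocationMap = new HashMap<>(fileHashesSetSize);
    DatabaseConnector.databaseLock.lock();
    try {
        jdbcTemplate.query(getHashLocationsQuery, new RowCallbackHandler() {
            @Override
            public void processRow(ResultSet rs) throws SQLException {
                hashLocationMap.put(rs.getString(1), rs.getString(2)); // hash -> already-found file-location
            }
        });
    } catch (EmptyResultDataAccessException erdae) {
        logger.warn("No previously-found hash-locations were found for " + groupType + "_" + batchCounter);
    } catch (Exception e) {
        logger.error("Unexpected error when checking for already-found file-hashes, for " + groupType + "_" + batchCounter, e);
        // We will continue with storing the files; we do not want to lose them.
    } finally {
        DatabaseConnector.databaseLock.unlock();
    }
    return hashLocationMap;
}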