diff --git a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
index beda9ea..9407e86 100644
--- a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
@@ -15,8 +15,6 @@ import org.apache.avro.generic.GenericData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.dao.EmptyResultDataAccessException;
-import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.stereotype.Service;
 
 import javax.xml.bind.DatatypeConverter;
@@ -27,10 +25,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.MessageDigest;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
@@ -44,18 +39,12 @@ public class BulkImportServiceImpl implements BulkImportService {
 
     private static final Logger logger = LoggerFactory.getLogger(BulkImportServiceImpl.class);
 
-
     @Autowired
     private FileUtils fileUtils;
 
-
     @Autowired
     private ParquetFileUtils parquetFileUtils;
 
-    @Autowired
-    private JdbcTemplate jdbcTemplate;
-
-
 
     /**
      * Given a directory with full-text-files, this method imports the full-texts files in the PDF Aggregation Service.
@@ -244,6 +233,43 @@ public class BulkImportServiceImpl implements BulkImportService {
 
         bulkImportReport.addEvent(msg);
         fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
+        HashMap<String, DocFileData> fileLocationsWithData = new HashMap<>(numOfFilesInSegment);    // Map the fileLocations to their file-data (which includes the hash), in order to quickly retrieve them later, when constructing the s3-file-location.
+        Set<String> fileHashes = new HashSet<>(numOfFilesInSegment);
+
+        // Get file-data for this segment's files.
+        for ( String fileLocation : fileLocationsSegment ) {
+            DocFileData docFileData = new DocFileData(new File(fileLocation), null, null, fileLocation);
+            docFileData.calculateAndSetHashAndSize();
+            String fileHash = docFileData.getHash();
+            if ( fileHash != null ) {    // We will not work with files for which we cannot produce the hash, since the s3-file-location requires it.
+                fileHashes.add(fileHash);
+                fileLocationsWithData.put(fileLocation, docFileData);
+            }
+        }
+        int fileHashesSetSize = fileHashes.size();
+        if ( fileHashesSetSize == 0 ) {
+            msg = "No fileHashes were retrieved for segment_" + segmentCounter + ", no files will be processed from it.";
+            logger.warn(msg + additionalLoggingMsg);
+            bulkImportReport.addEvent(msg);
+            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
+            return numOfFilesInSegment;    // All files of this segment failed.
+        }
+
+        // Get the "alreadyFound" fileLocations for the fileHashes which exist in the DB.
+        HashMap<String, String> hashWithExistingLocationMap = fileUtils.getHashLocationMap(fileHashes, fileHashesSetSize, segmentCounter, "segment");
+        int numAlreadyRetrievedFiles = hashWithExistingLocationMap.size();
+        if ( numAlreadyRetrievedFiles > 0 ) {
+            msg = numAlreadyRetrievedFiles + " files from segment_" + segmentCounter + " have already been retrieved in the past.";
+            logger.warn(msg + additionalLoggingMsg);
+            bulkImportReport.addEvent(msg);
+            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
+        }
+
+        // Generate the payload-records to be inserted in the DB.
+        // The OpenAireID, which is synthesised from the fileName, may be different from the ID related to the exact same file which was retrieved in the past.
+        // The same file may be related to multiple publications from different datasources. The deduplication is out of the scope of this service.
+        // All different OpenAIRE IDs should be saved, independently of the fact that their urls or files may be related to other publications as well.
+        List<GenericData.Record> payloadRecords = new ArrayList<>(numOfFilesInSegment);
 
 
         // Use a HashSet for the failed files, in order to not remove them in the end.
@@ -253,23 +279,30 @@ public class BulkImportServiceImpl implements BulkImportService {
         // Upload files to S3 and collect payloadRecords.
         for ( int i=0; i < numOfFilesInSegment; ++i ) {
             String fileLocation = fileLocationsSegment.get(i);
-            GenericData.Record record = null;
-            try {
-                record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg);
-            } catch (Exception e) {
-                String errorMsg = "Exception when uploading the files of segment_" + segmentCounter + " to the S3 Object Store. Will avoid uploading the rest of the files for this segment..";
-                logger.error(errorMsg + additionalLoggingMsg);
-                bulkImportReport.addEvent(errorMsg);
-                for ( int j=i; j < numOfFilesInSegment; ++j )
-                    failedFiles.add(fileLocationsSegment.get(j));    // The rest of the files are considered "failed".
-                break;
-            }
-
-            if ( record != null )
-                payloadRecords.add(record);
-            else {
-                bulkImportReport.addEvent("An error caused the file: " + fileLocation + " to not be imported!");
+            DocFileData docFileData = fileLocationsWithData.get(fileLocation);
+            if ( docFileData == null ) {
                 failedFiles.add(fileLocation);
+            } else {
+                String alreadyRetrievedFileLocation = hashWithExistingLocationMap.get(docFileData.getHash());
+                GenericData.Record record = null;
+                try {
+                    record = processBulkImportedFile(docFileData, alreadyRetrievedFileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg);
+                } catch (Exception e) {
+                    String errorMsg = "Exception when uploading the files of segment_" + segmentCounter + " to the S3 Object Store. Will avoid uploading the rest of the files for this segment.. " + e.getMessage();
+                    logger.error(errorMsg + additionalLoggingMsg);
+                    bulkImportReport.addEvent(errorMsg);
+                    fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
+                    for ( int j=i; j < numOfFilesInSegment; ++j )
+                        failedFiles.add(fileLocationsSegment.get(j));    // The rest of the files are considered "failed".
+                    break;
+                }
+
+                if ( record != null )
+                    payloadRecords.add(record);
+                else {
+                    bulkImportReport.addEvent("An error caused the file: " + fileLocation + " to not be imported!");
+                    failedFiles.add(fileLocation);
+                }
             }
 
             if ( ((++counter) % 150) == 0 ) {    // Every 150 files, report the status for this segment and write it to the file.
@@ -294,6 +327,7 @@ public class BulkImportServiceImpl implements BulkImportService {
             String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be bulk-imported, for segment-" + segmentCounter + " !";
             logger.warn(errorMsg + additionalLoggingMsg);
             bulkImportReport.addEvent(errorMsg);
+            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
         }
 
         // Construct the parquet file, upload it to HDFS and load it in the "payload_bulk_import" table.
@@ -362,71 +396,45 @@ public class BulkImportServiceImpl implements BulkImportService {
     }
 
 
-    private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg)
+    private GenericData.Record processBulkImportedFile(DocFileData docFileData, String alreadyRetrievedFileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg)
             throws ConnectException, UnknownHostException
     {
-        DocFileData docFileData = new DocFileData(new File(fileLocation), null, null, fileLocation);
-        docFileData.calculateAndSetHashAndSize();
-
-        // Check if this file is already found by crawling. Even though we started excluding this datasource from crawling, many full-texts have already been downloaded.
-        // Also, it may be the case that this file was downloaded by another datasource.
-
         FileLocationData fileLocationData;
         try {
-            fileLocationData = new FileLocationData(fileLocation);
+            fileLocationData = new FileLocationData(docFileData.getLocation());
         } catch (RuntimeException re) {
            logger.error(re.getMessage() + additionalLoggingMsg);
            return null;
        }
 
-        String fileHash = docFileData.getHash();
-        if ( fileHash == null )
-            return null;    // No check of past found full-text can be made nor the S3-fileName can be created.
-
-        String datasourceId = bulkImportSource.getDatasourceID();
-        String datasourcePrefix = bulkImportSource.getDatasourcePrefix();
         String fileNameID = fileLocationData.getFileNameID();
 
-        String openAireId = generateOpenaireId(fileNameID, datasourcePrefix, bulkImportSource.getIsAuthoritative());
+        String openAireId = generateOpenaireId(fileNameID, bulkImportSource.getDatasourcePrefix(), bulkImportSource.getIsAuthoritative());
         if ( openAireId == null )
             return null;
 
-        String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID);    // This string-concatenation, works with urls of Arvix. A different construction may be needed for other datasources.
-        String originalUrl = actualUrl;    // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link.
-
-        final String getFileLocationForHashQuery = "select `location` from " + DatabaseConnector.databaseName + ".payload where `hash` = ? limit 1";
-        final int[] hashArgType = new int[] {Types.VARCHAR};
-        String alreadyFoundFileLocation = null;
-        DatabaseConnector.databaseLock.lock();
-        try {
-            alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[]{fileHash}, hashArgType, String.class);
-        } catch (EmptyResultDataAccessException erdae) {
-            // No fileLocation is found, it's ok. It will be null by default.
-        } catch (Exception e) {
-            logger.error("Error when executing or acquiring data from the the 'getFileLocationForHashQuery'!" + additionalLoggingMsg, e);
-            // Continue with bulk-importing the file and uploading it to S3.
-        } finally {
-            DatabaseConnector.databaseLock.unlock();
-        }
+        String fileHash = docFileData.getHash();    // It's guaranteed to NOT be null at this point.
 
         String s3Url = null;
-
-        if ( alreadyFoundFileLocation != null )    // If the full-text of this record is already-found and uploaded.
+        if ( alreadyRetrievedFileLocation != null )    // If the full-text of this record is already-found and uploaded.
         {
             // This full-text was found to already be in the database.
             // If it has the same datasourceID, then it likely was crawled before from an ID belonging to this datasource.
-            // If also has the same ID, then the exact same record from that datasource was retrieved previously.
+            // If it also has the same ID, then the exact same record from that datasource was retrieved previously.
             // Else, the file was downloaded by another record of this datasource.
             // ELse if the datasourceID is not the same, then the same file was retrieved from another datasource.
             // The above analysis is educational, it does not need to take place and is not currently used.
-
-            s3Url = alreadyFoundFileLocation;
+            s3Url = alreadyRetrievedFileLocation;
         } else {
-            s3Url = fileUtils.constructS3FilenameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), openAireId, fileLocationData.getDotFileExtension(), datasourceId, fileHash);
+            s3Url = fileUtils.constructS3FilenameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), openAireId, fileLocationData.getDotFileExtension(), bulkImportSource.getDatasourceID(), fileHash);
             if ( s3Url == null )
                 return null;
         }
 
+        // TODO - If another url-schema is introduced for other datasources, have a "switch"-statement and perform the right "actualUrl"-creation based on current schema.
+        String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID);    // This string-concatenation works with the urls of arXiv. A different construction may be needed for other datasources.
+        String originalUrl = actualUrl;    // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link.
+
         return parquetFileUtils.getPayloadParquetRecord(openAireId, originalUrl, actualUrl, timeMillis, bulkImportSource.getMimeType(), docFileData.getSize(), fileHash, s3Url, provenance, true);    // It may return null.
     }
diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
index dce131b..ec8de7b 100644
--- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
+++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
@@ -121,7 +121,7 @@ public class FileUtils {
             return UploadFullTextsResponse.successful_without_fulltexts;    // It was handled, no error.
         }
 
-        HashMap<String, String> hashLocationMap = getHashLocationMap(fileHashes, fileHashesSetSize, assignmentsBatchCounter);
+        HashMap<String, String> hashLocationMap = getHashLocationMap(fileHashes, fileHashesSetSize, assignmentsBatchCounter, "assignments");
 
         for ( String fileHash : fileHashes )
         {
@@ -302,7 +302,7 @@ public class FileUtils {
     }
 
 
-    public HashMap<String, String> getHashLocationMap(Set<String> fileHashes, int fileHashesSetSize, long assignmentsBatchCounter)
+    public HashMap<String, String> getHashLocationMap(Set<String> fileHashes, int fileHashesSetSize, long batchCounter, String groupType)
     {
         // Prepare the "fileHashListString" to be used inside the "getHashLocationsQuery". Create the following string-pattern:
         // ("HASH_1", "HASH_2", ...)
@@ -323,9 +323,9 @@ public class FileUtils {
                 }
             });
         } catch (EmptyResultDataAccessException erdae) {
-            logger.warn("No previously-found hash-locations where found for assignments_" + assignmentsBatchCounter);
+            logger.warn("No previously-found hash-locations were found for " + groupType + "_" + batchCounter);
         } catch (Exception e) {
-            logger.error("Unexpected error when checking for already-found file-hashes!", e);
+            logger.error("Unexpected error when checking for already-found file-hashes, for " + groupType + "_" + batchCounter, e);
             // We will continue with storing the files, we do not want to lose them.
         } finally {
             DatabaseConnector.databaseLock.unlock();
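
Note on the core change: the bulk-import path no longer issues one "queryForObject" per file to find an already-known `location` for a `hash` (the block removed from processBulkImportedFile); it now collects the hashes of a whole segment and resolves them with a single call to FileUtils.getHashLocationMap(fileHashes, fileHashesSetSize, batchCounter, groupType). The body of getHashLocationMap is not shown in this diff (only its signature, its catch-blocks and the comment about building the ("HASH_1", "HASH_2", ...) pattern), so the following is a minimal, hypothetical sketch of the batched lookup it is assumed to perform. The table and column names are taken from the removed per-file query; the standalone class name and the databaseName constructor argument (standing in for DatabaseConnector.databaseName) are made up for the example, and the database lock handling is omitted for brevity.

import org.springframework.jdbc.core.JdbcTemplate;

import java.util.HashMap;
import java.util.Set;
import java.util.StringJoiner;

// Hypothetical, self-contained illustration of the batched hash-to-location lookup.
public class HashLocationLookupSketch {

    private final JdbcTemplate jdbcTemplate;
    private final String databaseName;    // Stand-in for DatabaseConnector.databaseName.

    public HashLocationLookupSketch(JdbcTemplate jdbcTemplate, String databaseName) {
        this.jdbcTemplate = jdbcTemplate;
        this.databaseName = databaseName;
    }

    // Resolve all given file-hashes to their already-known payload-locations in ONE query,
    // instead of one "queryForObject" round-trip per file (the approach removed by this diff).
    public HashMap<String, String> getHashLocationMap(Set<String> fileHashes, int fileHashesSetSize, long batchCounter, String groupType) {
        // Build the ("HASH_1", "HASH_2", ...) string-pattern mentioned in the FileUtils comments.
        StringJoiner fileHashListString = new StringJoiner("\", \"", "(\"", "\")");
        for ( String fileHash : fileHashes )
            fileHashListString.add(fileHash);

        String getHashLocationsQuery = "select `hash`, `location` from " + databaseName + ".payload where `hash` in " + fileHashListString;

        HashMap<String, String> hashLocationMap = new HashMap<>(fileHashesSetSize);
        try {
            jdbcTemplate.query(getHashLocationsQuery, rs -> {
                hashLocationMap.put(rs.getString("hash"), rs.getString("location"));    // Only the hashes which already exist in the DB end up in the map.
            });
        } catch (Exception e) {
            // Mirror the diff's behaviour: log and continue, so the files of this batch still get imported and uploaded.
            System.err.println("Unexpected error when checking for already-found file-hashes, for " + groupType + "_" + batchCounter + ": " + e);
        }
        return hashLocationMap;
    }
}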
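
Beyond the batching itself, the refactoring moves hash and size computation to the caller: processBulkImportedFile() now receives a ready DocFileData plus the (possibly null) alreadyRetrievedFileLocation, so it no longer needs JdbcTemplate, Types or EmptyResultDataAccessException, and the database lock appears to be taken once per segment (inside getHashLocationMap) rather than once per file. The extra fileUtils.writeToFile(...) calls flush the bulk-import report at the failure points as well, so partial progress should be visible while a segment is still running. The new groupType argument ("assignments" vs "segment") keeps the shared getHashLocationMap() log messages attributable to the right kind of batch.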