UrlsController/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java

package eu.openaire.urls_controller.services;
import com.google.common.collect.Lists;
import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.controllers.BulkImportController;
import eu.openaire.urls_controller.models.BulkImportReport;
import eu.openaire.urls_controller.models.DocFileData;
import eu.openaire.urls_controller.models.FileLocationData;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import org.apache.avro.generic.GenericData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.xml.bind.DatatypeConverter;
import java.io.File;
import java.net.ConnectException;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Service
public class BulkImportServiceImpl implements BulkImportService {
private static final Logger logger = LoggerFactory.getLogger(BulkImportServiceImpl.class);
@Autowired
private FileUtils fileUtils;
@Autowired
private ParquetFileUtils parquetFileUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* Given a directory with full-text files, this method imports those files into the PDF Aggregation Service.
* It also guarantees that failed files will not be deleted. A file "fails" if any of the expected steps fails (upload-to-S3, parquet creation and upload, load-to-db, etc.).
*/
public Boolean bulkImportFullTextsFromDirectory(BulkImportReport bulkImportReport, String relativeBulkImportDir, String bulkImportDirName, File bulkImportDir, String provenance, BulkImport.BulkImportSource bulkImportSource, boolean shouldDeleteFilesOnFinish)
{
String bulkImportReportLocation = bulkImportReport.getReportLocation();
// Write to bulkImport-report file.
String msg = "Initializing the bulkImport " + provenance + " procedure with bulkImportDir: " + bulkImportDirName + ".";
logger.info(msg);
bulkImportReport.addEvent(msg);
// Do not write immediately to the file, wait for the following checks.
String additionalLoggingMsg = " | provenance: \"" + provenance + "\" | bulkImportDir: \"" + bulkImportDirName + "\"";
if ( (ParquetFileUtils.payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((ParquetFileUtils.payloadsSchema = ParquetFileUtils.parseSchema(ParquetFileUtils.payloadSchemaFilePath)) == null ) ) {
String errorMsg = "The payloadsSchema could not be parsed!";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
List<String> fileLocations = getFileLocationsInsideDir(bulkImportDirName); // The error-message has already been logged.
if ( fileLocations == null ) {
String errorMsg = "Could not retrieve the files for bulk-import!";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
int numOfFiles = fileLocations.size();
if ( numOfFiles == 0 ) {
String errorMsg = "No files were found inside the bulkImportDir: " + bulkImportDirName;
logger.warn(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
if ( logger.isTraceEnabled() )
logger.trace("fileLocations: " + additionalLoggingMsg + GenericUtils.endOfLine + fileLocations);
String localParquetDir = parquetFileUtils.parquetBaseLocalDirectoryPath + "bulk_import_" + provenance + File.separator + relativeBulkImportDir; // This ends with "/".
try {
Files.createDirectories(Paths.get(localParquetDir)); // No-op if it already exists.
} catch (Exception e) {
String errorMsg = "Could not create the local parquet-directory: " + localParquetDir;
logger.error(errorMsg + additionalLoggingMsg, e);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
// Create a new directory on HDFS with this bulkImportDir's name, so that no "load data" operation fails because another thread loaded that base-dir just beforehand.
String currentBulkImportHdfsDir = parquetFileUtils.parquetHDFSDirectoryPathPayloadsBulkImport + relativeBulkImportDir;
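// Note: "applyHDFOperation" with "mkDirsAndParams" presumably issues a WebHDFS MKDIRS request (an HTTP PUT of the form "<webHDFSBaseUrl><currentBulkImportHdfsDir>?op=MKDIRS&<params>"); the exact parameters are defined in "parquetFileUtils.mkDirsAndParams".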
if ( ! parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + currentBulkImportHdfsDir + parquetFileUtils.mkDirsAndParams) ) { // No-op if it already exists. It is very quick.
String errorMsg = "Could not create the remote HDFS-directory: " + currentBulkImportHdfsDir;
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
long timeMillis = System.currentTimeMillis(); // Store it here, in order to have the same for all current records.
List<Callable<Integer>> callableTasksForFileSegments = new ArrayList<>(numOfFiles);
int sizeOfEachSegment = (numOfFiles / BulkImportController.numOfThreadsForBulkImportProcedures);
if ( sizeOfEachSegment == 0 )
sizeOfEachSegment = 1; // Guard against a zero segment-size, when there are fewer files than threads; "Lists.partition" rejects a non-positive size.
List<List<String>> subLists = Lists.partition(fileLocations, sizeOfEachSegment); // Divide the initial list into consecutive sublists of that size. Depending on the remainder, there may be more sublists than threads, and the last one may hold fewer files.
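// Worked example (hypothetical numbers): with 1000 files and 6 threads, "sizeOfEachSegment" is 166 and "Lists.partition" yields 7 sublists: six holding 166 files each and a last one holding the remaining 4 files.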
int subListsSize = subLists.size();
msg = "Going to bulk-import the " + numOfFiles + " files in parallel, after dividing them in " + subListsSize + " segments.";
logger.debug(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
callableTasksForFileSegments.add(() -> { // Process this segment's files: upload them to S3, build the parquet-file and load it into the database.
return processBulkImportedFilesSegment(bulkImportReport, finalI, subLists.get(finalI), bulkImportDirName, localParquetDir, currentBulkImportHdfsDir, provenance, bulkImportSource, timeMillis, shouldDeleteFilesOnFinish, additionalLoggingMsg);
});
}
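// Each callable returns the number of files that failed in its segment (see "processBulkImportedFilesSegment"); a returned value equal to the segment's size means the whole segment failed.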
int numFailedSegments = 0;
int numFailedFilesForSegment = 0;
int numAllFailedFiles = 0;
try {
List<Future<Integer>> futures = BulkImportController.bulkImportExecutor.invokeAll(callableTasksForFileSegments); // This waits for all tasks to finish.
int sizeOfFutures = futures.size(); // This is the same as the "subListsSize".
for ( int i = 0; i < sizeOfFutures; ++i )
{ // For each segment..
try {
numFailedFilesForSegment = futures.get(i).get();
numAllFailedFiles += numFailedFilesForSegment;
if ( numFailedFilesForSegment == subLists.get(i).size() )
numFailedSegments++;
// In case all the files failed to be bulk-imported, we will detect it in the "numAllFailedFiles == numOfFiles"-check later.
// The files which failed to be imported will not be deleted, even if the user has requested deletion of the directory.
} catch (ExecutionException ee) { // These can be serious errors like an "out of memory exception" (Java HEAP).
numFailedSegments ++;
logger.error(GenericUtils.getSelectedStackTraceForCausedException(ee, "Task_" + i + " failed with: ", additionalLoggingMsg, 15));
} catch (CancellationException ce) {
numFailedSegments ++;
logger.error("Task_" + i + " was cancelled: " + ce.getMessage() + additionalLoggingMsg);
} catch (InterruptedException ie) {
numFailedSegments ++;
logger.error("Task_" + i + " was interrupted: " + ie.getMessage());
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage() + additionalLoggingMsg);
}
}
} catch (Exception e) {
String errorMsg = "An error occurred when trying to bulk-import data from bulkImportDir: " + bulkImportDirName;
logger.error(errorMsg, e);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
} finally {
logger.debug("Deleting local parquet directory: " + localParquetDir);
fileUtils.deleteDirectory(new File(localParquetDir)); // It may not exist at all, if none of the parquet files were created.
}
// Check the results.
if ( numAllFailedFiles == numOfFiles ) {
String errorMsg = "None of the files inside the bulkImportDir: " + bulkImportDirName + " were imported!";
logger.error(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
} else if ( numAllFailedFiles > 0 ) { // Some failed, but not all.
msg = numAllFailedFiles + " files" + (numFailedSegments > 0 ? (" and " + numFailedSegments + " whole segments") : "") + " failed to be bulk-imported, from the bulkImportDir: " + bulkImportDirName;
logger.warn(msg);
} else {
msg = "All " + numOfFiles + " files, from bulkImportDir: " + bulkImportDirName + " were bulkImported successfully.";
logger.info(msg);
}
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// Merge the parquet files inside the table "payload_bulk_import", to improve performance of future operations.
DatabaseConnector.databaseLock.lock();
String mergeErrorMsg = fileUtils.mergeParquetFiles("payload_bulk_import", "", null); // Any error-message is already logged inside.
if ( mergeErrorMsg != null ) {
DatabaseConnector.databaseLock.unlock();
bulkImportReport.addEvent(mergeErrorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
DatabaseConnector.databaseLock.unlock();
String successMsg = "Finished the bulk-import procedure for " + provenance + " and bulkImportDir: " + bulkImportDirName;
logger.info(successMsg);
bulkImportReport.addEvent(successMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// The report-file is overwritten every now and then, instead of appended to, since we want to write an updated JSON report-object each time.
// Also, we do not write the object only at the end (in its final form), since we want the user to be able to request the report at any time
// after submitting the bulk-import request, to see its progress (the number of files may be very large and the processing may take many hours).
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return true;
}
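// Note on the calling-convention, as implied by the code above (not enforced here): the caller is expected to have added "bulkImportDirName" to "BulkImportController.bulkImportDirsUnderProcessing" before invoking this method; this method only removes that entry, on every exit-path (success or failure), and keeps updating the report-file so that progress-requests can be served at any time.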
private int processBulkImportedFilesSegment(BulkImportReport bulkImportReport, int segmentCounter, List<String> fileLocationsSegment, String bulkImportDirName, String localParquetDir, String currentBulkImportHdfsDir,
String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, boolean shouldDeleteFilesOnFinish, String additionalLoggingMsg)
{
// Inside this thread, process a segment of the files.
String bulkImportReportLocation = bulkImportReport.getReportLocation();
int numOfFilesInSegment = fileLocationsSegment.size();
String msg = "Going to import " + numOfFilesInSegment + " files, for segment-" + segmentCounter;
logger.debug(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
List<GenericData.Record> payloadRecords = new ArrayList<>(numOfFilesInSegment);
// Use a HashSet for the failed files, so that they will not be deleted at the end.
HashSet<String> failedFiles = new HashSet<>();
int counter = 0;
// Upload files to S3 and collect payloadRecords.
for ( int i=0; i < numOfFilesInSegment; ++i ) {
String fileLocation = fileLocationsSegment.get(i);
GenericData.Record record = null;
try {
record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg);
} catch (Exception e) {
String errorMsg = "Exception when uploading the files of segment_" + segmentCounter + " to the S3 Object Store. Will avoid uploading any file for this segment..";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
for ( int j=i; j < numOfFilesInSegment; ++j )
failedFiles.add(fileLocationsSegment.get(j)); // The rest of the files are considered "failed".
break;
}
if ( record != null )
payloadRecords.add(record);
else {
bulkImportReport.addEvent("An error caused the file: " + fileLocation + " to not be imported!");
failedFiles.add(fileLocation);
}
if ( ((++counter) % 150) == 0 ) { // Every 150 files, report the status for this segment.
msg = "Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files.";
if ( logger.isTraceEnabled() )
logger.trace(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
}
}
int numOfPayloadRecords = payloadRecords.size();
if ( numOfPayloadRecords == 0 ) {
String errorMsg = "No payload-records were generated for any of the files, of segment-" + segmentCounter + ", inside the bulkImportDir: " + bulkImportDirName;
logger.warn(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment;
} else if ( numOfPayloadRecords != numOfFilesInSegment ) {
// Write this important note here, in order for it to certainly be in the report, even if a parquet-file failure happens and the method exits early.
String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be bulk-imported, for segment-" + segmentCounter + " !";
logger.warn(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
}
// Construct the parquet file, upload it to HDFS and load it in the "payload_bulk_import" table.
String parquetFileName = "payloads_" + segmentCounter + ".parquet";
String fullLocalParquetFilePath = localParquetDir + parquetFileName;
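// For example, segment-3 writes to "<localParquetDir>payloads_3.parquet" ("localParquetDir" already ends with the file-separator, as noted above).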
if ( logger.isTraceEnabled() )
logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath + additionalLoggingMsg); // DEBUG!
if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) { // Any errorMsg is already logged inside.
String errorMsg = "Could not write the payload-records for segment-" + segmentCounter + " to the parquet-file: " + parquetFileName + " !";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment; // All files of this segment have failed.
}
if ( logger.isTraceEnabled() )
logger.trace("Going to upload the parquet file: " + fullLocalParquetFilePath + " to HDFS." + additionalLoggingMsg); // DEBUG!
// Upload the parquet-file to HDFS; no database-locking is required for this step. The data is then loaded into the "payload_bulk_import" Impala table, right below.
String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir); // The returned message is already logged inside.
if ( errorMsg != null ) { // Replace the internal error-message with a user-facing one for the report.
errorMsg = "Could not upload the parquet-file " + parquetFileName + " to HDFS!";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment; // All files of this segment have failed.
}
if ( logger.isTraceEnabled() )
logger.trace("Going to load the data of parquet-file: \"" + parquetFileName + "\" to the database-table: \"payload_bulk_import\"." + additionalLoggingMsg); // DEBUG!
DatabaseConnector.databaseLock.lock();
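// "loadParquetDataIntoTable" presumably issues an Impala "LOAD DATA INPATH '<hdfs-parquet-file>' INTO TABLE payload_bulk_import" statement; the database-lock serializes it with other load- and merge-operations on that table.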
if ( !parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import") ) {
DatabaseConnector.databaseLock.unlock();
errorMsg = "Could not load the payload-records to the database, for segment-" + segmentCounter + "!";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment; // All files of this segment have failed.
}
DatabaseConnector.databaseLock.unlock();
String segmentSuccessMsg = "Finished importing " + numOfPayloadRecords + " files, out of " + numOfFilesInSegment + ", for segment-" + segmentCounter + ".";
logger.info(segmentSuccessMsg + additionalLoggingMsg);
bulkImportReport.addEvent(segmentSuccessMsg);
if ( shouldDeleteFilesOnFinish ) {
segmentSuccessMsg = "As the user requested, the successfully imported files of " + provenance + " procedure, of bulk-import segment-" + segmentCounter + ", from directory " + bulkImportDirName + ", will be deleted.";
logger.info(segmentSuccessMsg);
bulkImportReport.addEvent(segmentSuccessMsg);
// Delete all files except the ones in the "failedFiles" set.
for ( String fileLocation : fileLocationsSegment ) {
if ( !failedFiles.contains(fileLocation) )
if ( !fileUtils.deleteFile(fileLocation) )
bulkImportReport.addEvent("The file " + fileLocation + " could not be deleted! Please make sure you have provided the WRITE-permission.");
}
}
return (numOfFilesInSegment - numOfPayloadRecords); // Return the numOfFailedFiles.
}
private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg)
throws ConnectException, UnknownHostException
{
File fullTextFile = new File(fileLocation);
DocFileData docFileData = new DocFileData(fullTextFile, null, null, null);
docFileData.calculateAndSetHashAndSize();
// Check whether this file has already been found by crawling. Even though we started excluding this datasource from crawling, many full-texts have already been downloaded.
// Also, it may be the case that this file was downloaded by another datasource.
FileLocationData fileLocationData;
try {
fileLocationData = new FileLocationData(fileLocation);
} catch (RuntimeException re) {
logger.error(re.getMessage() + additionalLoggingMsg);
return null;
}
String fileHash = docFileData.getHash();
if ( fileHash == null )
return null; // Neither the check for a previously-found full-text nor the creation of the S3-fileName can be done without the hash.
String datasourceId = bulkImportSource.getDatasourceID();
String datasourcePrefix = bulkImportSource.getDatasourcePrefix();
String fileNameID = fileLocationData.getFileNameID();
String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID); // This string-concatenation works with arXiv URLs. A different construction may be needed for other datasources.
String originalUrl = actualUrl; // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link.
final String getFileLocationForHashQuery = "select `location` from " + DatabaseConnector.databaseName + ".payload where `hash` = ? limit 1";
final int[] hashArgType = new int[] {Types.VARCHAR};
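// This looks up any one already-known location of a file with the same hash, so that a full-text which was already uploaded (e.g. found earlier by crawling) is not uploaded to S3 again; its existing location is reused below.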
String alreadyFoundFileLocation = null;
DatabaseConnector.databaseLock.lock();
try {
alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[]{fileHash}, hashArgType, String.class);
} catch (EmptyResultDataAccessException erdae) {
// No fileLocation was found; that's fine, it stays null by default.
} catch (Exception e) {
logger.error("Error when executing or acquiring data from the the 'getFileLocationForHashQuery'!" + additionalLoggingMsg, e);
// Continue with bulk-importing the file and uploading it to S3.
} finally {
DatabaseConnector.databaseLock.unlock();
}
String openAireId = generateOpenaireId(fileNameID, datasourcePrefix, bulkImportSource.getIsAuthoritative());
if ( openAireId == null )
return null;
String s3Url = null;
if ( alreadyFoundFileLocation != null ) // If the full-text of this record has already been found and uploaded.
{
// This full-text was found to already be in the database.
// If it has the same datasourceID, then it was likely crawled before, from an ID belonging to this datasource.
// If it also has the same ID, then the exact same record from that datasource was retrieved previously.
// Else, the file was downloaded by another record of this datasource.
// Else, if the datasourceID is not the same, then the same file was retrieved from another datasource.
// The above analysis is educational; it does not need to take place and is not currently used.
s3Url = alreadyFoundFileLocation;
} else
s3Url = fileUtils.constructS3FilenameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), fileNameID, fileLocationData.getDotFileExtension(), datasourceId, fileHash);
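// "constructS3FilenameAndUploadToS3" is expected to build the S3 object-name from the given file-name parts, the datasourceId and the hash, upload the file and return its S3 location (which becomes the payload's "location" below); this is an assumption based on its name and usage, not on its implementation.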
return parquetFileUtils.getPayloadParquetRecord(openAireId, originalUrl, actualUrl, timeMillis, bulkImportSource.getMimeType(),
docFileData.getSize(), fileHash, s3Url, provenance, true); // It may return null.
}
public List<String> getFileLocationsInsideDir(String directory)
{
List<String> fileLocations = null;
try ( Stream<Path> walkStream = Files.find(Paths.get(directory), Integer.MAX_VALUE, (filePath, fileAttr) -> fileAttr.isRegularFile()) )
// In case we ever include other types of files inside the same directory, we will need to add a filter like: "&& !filePath.toString().endsWith("name.ext")"
{
fileLocations = walkStream.map(Path::toString).collect(Collectors.toList());
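// Note: the walk is recursive with unbounded depth (Integer.MAX_VALUE), so a call like getFileLocationsInsideDir("/storage/bulk_import/arxiv_batch_1") (hypothetical path, for illustration) collects every regular file in all nested subdirectories.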
} catch (Exception e) {
String errorMsg = "Could not retrieve the files from directory: '" + directory + "'!";
logger.error(errorMsg, e);
return null;
}
return fileLocations;
}
public String getMD5Hash(String string)
{
String md5 = null;
try {
MessageDigest md5MD = MessageDigest.getInstance("MD5"); // Create a new instance for each request; otherwise, we would need to synchronize the use of a shared instance among multiple threads.
md5MD.update(string.getBytes());
md5 = DatatypeConverter.printHexBinary(md5MD.digest()).toLowerCase();
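// The digest is printed as a 32-character lowercase hex string; e.g. getMD5Hash("The quick brown fox jumps over the lazy dog") returns "9e107d9d372bb6826bd81d3542a419d6".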
} catch (Exception e) {
logger.error("Error when getting the MD5-hash for: " + string, e);
return null;
}
return md5;
}
public String generateOpenaireId(String id, String datasourcePrefix, boolean isAuthoritative)
{
// If the "provenance" relates to an "authoritative" source, then its id has to be lowercase, before the md5() is applied to it.
// general_openaire_id = <datasourcePrefix> + "::" + <md5(ID)>
// authoritative_openaire_id = <datasourcePrefix> + "::" + <md5(lowercase(ID))>
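// Worked example (the prefix is hypothetical, for illustration): generateOpenaireId("2104.01255", "arXiv_______", true) -> "arXiv_______::" + md5("2104.01255"), i.e. the prefix, a double-colon separator and a 32-character hex digest.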
if ( isAuthoritative )
id = id.toLowerCase();
String idMd5Hash = getMD5Hash(id);
if ( idMd5Hash == null )
return null;
return (datasourcePrefix + "::" + idMd5Hash);
}
}