package eu.openaire.urls_controller.services; import com.google.common.collect.Lists; import eu.openaire.urls_controller.components.BulkImport; import eu.openaire.urls_controller.configuration.DatabaseConnector; import eu.openaire.urls_controller.controllers.BulkImportController; import eu.openaire.urls_controller.models.BulkImportReport; import eu.openaire.urls_controller.models.DocFileData; import eu.openaire.urls_controller.models.FileLocationData; import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.GenericUtils; import eu.openaire.urls_controller.util.ParquetFileUtils; import org.apache.avro.generic.GenericData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.EmptyResultDataAccessException; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; import javax.xml.bind.DatatypeConverter; import java.io.File; import java.net.ConnectException; import java.net.UnknownHostException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.security.MessageDigest; import java.sql.Types; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.stream.Collectors; import java.util.stream.Stream; @Service public class BulkImportServiceImpl implements BulkImportService { private static final Logger logger = LoggerFactory.getLogger(BulkImportServiceImpl.class); @Autowired private FileUtils fileUtils; @Autowired private ParquetFileUtils parquetFileUtils; @Autowired private JdbcTemplate jdbcTemplate; /** * Given a directory with full-text-files, this method imports the full-texts files in the PDF Aggregation Service. * Also, it provides the guarantee that the failed files will not be deleted! A file can "fail" if any of the expected results fail (upload-to-S3, parquet-creation and upload, load-to-db, ect) * */ public Boolean bulkImportFullTextsFromDirectory(BulkImportReport bulkImportReport, String relativeBulkImportDir, String bulkImportDirName, File bulkImportDir, String provenance, BulkImport.BulkImportSource bulkImportSource, boolean shouldDeleteFilesOnFinish) { String bulkImportReportLocation = bulkImportReport.getReportLocation(); // Write to bulkImport-report file. String msg = "Initializing the bulkImport " + provenance + " procedure with bulkImportDir: " + bulkImportDirName + "."; logger.info(msg); bulkImportReport.addEvent(msg); // Do not write immediately to the file, wait for the following checks. String additionalLoggingMsg = " | provenance: \"" + provenance + "\" | bulkImportDir: \"" + bulkImportDirName + "\""; if ( (ParquetFileUtils.payloadsSchema == null) // Parse the schema if it's not already parsed. && ((ParquetFileUtils.payloadsSchema = ParquetFileUtils.parseSchema(ParquetFileUtils.payloadSchemaFilePath)) == null ) ) { String errorMsg = "The payloadsSchema could not be parsed!"; logger.error(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } List fileLocations = getFileLocationsInsideDir(bulkImportDirName); // the error-msg has already been written if ( fileLocations == null ) { String errorMsg = "Could not retrieve the files for bulk-import!"; logger.error(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } int numOfFiles = fileLocations.size(); if ( numOfFiles == 0 ) { String errorMsg = "No files were found inside the bulkImportDir: " + bulkImportDirName; logger.warn(errorMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } if ( logger.isTraceEnabled() ) logger.trace("fileLocations: " + additionalLoggingMsg + GenericUtils.endOfLine + fileLocations); String localParquetDir = parquetFileUtils.parquetBaseLocalDirectoryPath + "bulk_import_" + provenance + File.separator + relativeBulkImportDir; // This ends with "/". try { Files.createDirectories(Paths.get(localParquetDir)); // No-op if it already exists. } catch (Exception e) { String errorMsg = "Could not create the local parquet-directory: " + localParquetDir; logger.error(errorMsg + additionalLoggingMsg, e); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } // Create a new directory on HDFS, with this bulkImportDir name. So, that there will not be any "load data" operation to fail because another thread has loaded that base-dir right before. String currentBulkImportHdfsDir = parquetFileUtils.parquetHDFSDirectoryPathPayloadsBulkImport + relativeBulkImportDir; if ( ! parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + currentBulkImportHdfsDir + parquetFileUtils.mkDirsAndParams) ) { // N0-op if it already exists. It is very quick. String errorMsg = "Could not create the remote HDFS-directory: " + currentBulkImportHdfsDir; logger.error(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } long timeMillis = System.currentTimeMillis(); // Store it here, in order to have the same for all current records. List> callableTasksForFileSegments = new ArrayList<>(numOfFiles); int sizeOfEachSegment = (numOfFiles / BulkImportController.numOfThreadsForBulkImportProcedures); List> subLists = Lists.partition(fileLocations, sizeOfEachSegment); // Divide the initial list to "numOfThreadsForBulkImportProcedures" subLists. The last one may have marginally fewer files. int subListsSize = subLists.size(); msg = "Going to bulk-import the " + numOfFiles + " files in parallel, after dividing them in " + subListsSize + " segments."; logger.debug(msg + additionalLoggingMsg); bulkImportReport.addEvent(msg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); for ( int i = 0; i < subListsSize; ++i ) { int finalI = i; callableTasksForFileSegments.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries. return processBulkImportedFilesSegment(bulkImportReport, finalI, subLists.get(finalI), bulkImportDirName, localParquetDir, currentBulkImportHdfsDir, provenance, bulkImportSource, timeMillis, shouldDeleteFilesOnFinish, additionalLoggingMsg); }); } int numFailedSegments = 0; int numFailedFilesForSegment = 0; int numAllFailedFiles = 0; try { List> futures = BulkImportController.bulkImportExecutor.invokeAll(callableTasksForFileSegments); // This waits for all tasks to finish. int sizeOfFutures = futures.size(); // This is the same as the "subListsSize". for ( int i = 0; i < sizeOfFutures; ++i ) { // For each segment.. try { numFailedFilesForSegment = futures.get(i).get(); numAllFailedFiles += numFailedFilesForSegment; if ( numFailedFilesForSegment == subLists.get(i).size() ) numFailedSegments++; // In case all the files failed to be bulk-imported, then we will detect it in the "numSuccessfulSegments"-check later. // The failed-to-be-imported files, will not be deleted, even if the user specifies that he wants to delete the directory. } catch (ExecutionException ee) { // These can be serious errors like an "out of memory exception" (Java HEAP). numFailedSegments ++; logger.error(GenericUtils.getSelectedStackTraceForCausedException(ee, "Task_" + i + " failed with: ", additionalLoggingMsg, 15)); } catch (CancellationException ce) { numFailedSegments ++; logger.error("Task_" + i + " was cancelled: " + ce.getMessage() + additionalLoggingMsg); } catch (InterruptedException ie) { numFailedSegments ++; logger.error("Task_" + i + " was interrupted: " + ie.getMessage()); } catch (IndexOutOfBoundsException ioobe) { logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage() + additionalLoggingMsg); } } } catch (Exception e) { String errorMsg = "An error occurred when trying to bulk-import data from bulkImportDir: " + bulkImportDirName; logger.error(errorMsg, e); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } finally { logger.debug("Deleting local parquet directory: " + localParquetDir); fileUtils.deleteDirectory(new File(localParquetDir)); // It may not exist at all, if none of the parquet files were created. } // Check the results. if ( numAllFailedFiles == numOfFiles ) { String errorMsg = "None of the files inside the bulkImportDir: " + bulkImportDirName + " were imported!"; logger.error(errorMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } else if ( numAllFailedFiles > 0 ) { // Some failed, but not all. msg = numAllFailedFiles + " files" + (numFailedSegments > 0 ? (" and " + numFailedSegments + " whole segments") : "") + " failed to be bulk-imported, from the bulkImportDir: " + bulkImportDirName; logger.warn(msg); } else { msg = "All " + numOfFiles + " files, from bulkImportDir: " + bulkImportDirName + " were bulkImported successfully."; logger.info(msg); } bulkImportReport.addEvent(msg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); // Merge the parquet files inside the table "payload_bulk_import", to improve performance of future operations. DatabaseConnector.databaseLock.lock(); String mergeErrorMsg = parquetFileUtils.mergeParquetFilesOfTable("payload_bulk_import", "", null); // msg is already logged DatabaseConnector.databaseLock.unlock(); if ( mergeErrorMsg != null ) { // the message in already logged bulkImportReport.addEvent(mergeErrorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } String successMsg = "Finished the bulk-import procedure for " + provenance + " and bulkImportDir: " + bulkImportDirName; logger.info(successMsg); bulkImportReport.addEvent(successMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); // The report-file will be overwritten every now and then, instead of appended, since we want to add an update new JSON report-object each time. // Also, we do not want to write the object in the end (in its final form), since we want the user to have the ability to request the report at any time, // after submitting the bulk-import request, to see its progress (since the number of file may be very large and the processing may take many hours). BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return true; } private int processBulkImportedFilesSegment(BulkImportReport bulkImportReport, int segmentCounter, List fileLocationsSegment, String bulkImportDirName, String localParquetDir, String currentBulkImportHdfsDir, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, boolean shouldDeleteFilesOnFinish, String additionalLoggingMsg) { // Inside this thread, process a segment of the files. String bulkImportReportLocation = bulkImportReport.getReportLocation(); int numOfFilesInSegment = fileLocationsSegment.size(); String msg = "Going to import " + numOfFilesInSegment + " files, for segment-" + segmentCounter; logger.debug(msg + additionalLoggingMsg); bulkImportReport.addEvent(msg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); List payloadRecords = new ArrayList<>(numOfFilesInSegment); // Use a HashSet for the failed files, in order to not remove them in the end. HashSet failedFiles = new HashSet<>(); int counter = 0; // Upload files to S3 and collect payloadRecords. for ( int i=0; i < numOfFilesInSegment; ++i ) { String fileLocation = fileLocationsSegment.get(i); GenericData.Record record = null; try { record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg); } catch (Exception e) { String errorMsg = "Exception when uploading the files of segment_" + segmentCounter + " to the S3 Object Store. Will avoid uploading the rest of the files for this segment.."; logger.error(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); for ( int j=i; j < numOfFilesInSegment; ++j ) failedFiles.add(fileLocationsSegment.get(j)); // The rest of the files are considered "failed". break; } if ( record != null ) payloadRecords.add(record); else { bulkImportReport.addEvent("An error caused the file: " + fileLocation + " to not be imported!"); failedFiles.add(fileLocation); } if ( ((++counter) % 150) == 0 ) { // Every 150 files, report the status for this segment and right it to the file. msg = "Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files."; if ( logger.isTraceEnabled() ) logger.trace(msg + additionalLoggingMsg); bulkImportReport.addEvent(msg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); } } // End of processing files for this segment. int numOfPayloadRecords = payloadRecords.size(); if ( numOfPayloadRecords == 0 ) { String errorMsg = "No payload-records were generated for any of the files, of segment-" + segmentCounter + ", inside the bulkImportDir: " + bulkImportDirName; logger.warn(errorMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // None of the files of this segment will be deleted, in any case. return numOfFilesInSegment; } else if ( numOfPayloadRecords != numOfFilesInSegment ) { // Write this important note here, in order to certainly be in the report, even if a parquet-file failure happens and the method exists early. String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be bulk-imported, for segment-" + segmentCounter + " !"; logger.warn(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); } // Construct the parquet file, upload it to HDFS and load it in the "payload_bulk_import" table. String parquetFileName = "payloads_" + segmentCounter + ".parquet"; String fullLocalParquetFilePath = localParquetDir + parquetFileName; if ( logger.isTraceEnabled() ) logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath + additionalLoggingMsg); // DEBUG! if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) { // Any errorMsg is already logged inside. String errorMsg = "Could not write the payload-records for segment-" + segmentCounter + " to the parquet-file: " + parquetFileName + " !"; logger.error(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // None of the files of this segment will be deleted, in any case. return numOfFilesInSegment; // All files of this segment have failed. } if ( logger.isTraceEnabled() ) logger.trace("Going to upload the parquet file: " + fullLocalParquetFilePath + " to HDFS." + additionalLoggingMsg); // DEBUG! // Upload and insert the data to the "payload" Impala table. (no database-locking is required) String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir); // The returned message is already logged inside. if ( errorMsg != null ) { // The possible error-message returned, is already logged by the Controller. errorMsg = "Could not upload the parquet-file " + parquetFileName + " to HDFS!"; logger.error(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // None of the files of this segment will be deleted, in any case. return numOfFilesInSegment; // All files of this segment have failed. } if ( logger.isTraceEnabled() ) logger.trace("Going to load the data of parquet-file: \"" + parquetFileName + "\" to the database-table: \"payload_bulk_import\"." + additionalLoggingMsg); // DEBUG! DatabaseConnector.databaseLock.lock(); boolean parquetDataLoaded = parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import"); DatabaseConnector.databaseLock.unlock(); if ( !parquetDataLoaded ) { errorMsg = "Could not load the payload-records to the database, for segment-" + segmentCounter + "!"; logger.error(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); // None of the files of this segment will be deleted, in any case. return numOfFilesInSegment; // All files of this segment have failed. } String segmentSuccessMsg = "Finished importing " + numOfPayloadRecords + " files, out of " + numOfFilesInSegment + ", for segment-" + segmentCounter + "."; logger.info(segmentSuccessMsg + additionalLoggingMsg); bulkImportReport.addEvent(segmentSuccessMsg); if ( shouldDeleteFilesOnFinish ) { segmentSuccessMsg = "As the user requested, the successfully imported files of " + provenance + " procedure, of bulk-import segment-" + segmentCounter + ", from directory " + bulkImportDirName + ", will be deleted."; logger.info(segmentSuccessMsg); bulkImportReport.addEvent(segmentSuccessMsg); // Delete all files except the ones in the "failedHashSet". for ( String fileLocation : fileLocationsSegment ) { if ( !failedFiles.contains(fileLocation) ) if ( !fileUtils.deleteFile(fileLocation) ) // The "error-log-message" is shown inside. bulkImportReport.addEvent("The file " + fileLocation + " could not be deleted! Please make sure you have provided the WRITE-permission."); } } fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); return (numOfFilesInSegment - numOfPayloadRecords); // Return the numOfFailedFiles. } private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg) throws ConnectException, UnknownHostException { File fullTextFile = new File(fileLocation); DocFileData docFileData = new DocFileData(fullTextFile, null, null, null); docFileData.calculateAndSetHashAndSize(); // Check if this file is already found by crawling. Even though we started excluding this datasource from crawling, many full-texts have already been downloaded. // Also, it may be the case that this file was downloaded by another datasource. FileLocationData fileLocationData; try { fileLocationData = new FileLocationData(fileLocation); } catch (RuntimeException re) { logger.error(re.getMessage() + additionalLoggingMsg); return null; } String fileHash = docFileData.getHash(); if ( fileHash == null ) return null; // No check of past found full-text can be made nor the S3-fileName can be created. String datasourceId = bulkImportSource.getDatasourceID(); String datasourcePrefix = bulkImportSource.getDatasourcePrefix(); String fileNameID = fileLocationData.getFileNameID(); String openAireId = generateOpenaireId(fileNameID, datasourcePrefix, bulkImportSource.getIsAuthoritative()); if ( openAireId == null ) return null; String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID); // This string-concatenation, works with urls of Arvix. A different construction may be needed for other datasources. String originalUrl = actualUrl; // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link. final String getFileLocationForHashQuery = "select `location` from " + DatabaseConnector.databaseName + ".payload where `hash` = ? limit 1"; final int[] hashArgType = new int[] {Types.VARCHAR}; String alreadyFoundFileLocation = null; DatabaseConnector.databaseLock.lock(); try { alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[]{fileHash}, hashArgType, String.class); } catch (EmptyResultDataAccessException erdae) { // No fileLocation is found, it's ok. It will be null by default. } catch (Exception e) { logger.error("Error when executing or acquiring data from the the 'getFileLocationForHashQuery'!" + additionalLoggingMsg, e); // Continue with bulk-importing the file and uploading it to S3. } finally { DatabaseConnector.databaseLock.unlock(); } String s3Url = null; if ( alreadyFoundFileLocation != null ) // If the full-text of this record is already-found and uploaded. { // This full-text was found to already be in the database. // If it has the same datasourceID, then it likely was crawled before from an ID belonging to this datasource. // If also has the same ID, then the exact same record from that datasource was retrieved previously. // Else, the file was downloaded by another record of this datasource. // ELse if the datasourceID is not the same, then the same file was retrieved from another datasource. // The above analysis is educational, it does not need to take place and is not currently used. s3Url = alreadyFoundFileLocation; } else s3Url = fileUtils.constructS3FilenameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), fileNameID, fileLocationData.getDotFileExtension(), datasourceId, fileHash); return parquetFileUtils.getPayloadParquetRecord(openAireId, originalUrl, actualUrl, timeMillis, bulkImportSource.getMimeType(), docFileData.getSize(), fileHash, s3Url, provenance, true); // It may return null. } public List getFileLocationsInsideDir(String directory) { List fileLocations = null; try ( Stream walkStream = Files.find(Paths.get(directory), Integer.MAX_VALUE, (filePath, fileAttr) -> fileAttr.isRegularFile()) ) // In case we ever include other type-of-Files inside the same directory, we need to add this filter: "&& !filePath.toString().endsWith("name.ext")" { fileLocations = walkStream.map(Path::toString).collect(Collectors.toList()); } catch (Exception e) { String errorMsg = "Could not retrieve the files from directory: '" + directory + "'!"; logger.error(errorMsg, e); return null; } return fileLocations; } public String getMD5Hash(String string) { String md5 = null; try { MessageDigest md5MD = MessageDigest.getInstance("MD5"); // New instance for any new request. Otherwise, we need to synchronize the use of that object among multiple threads. md5MD.update(string.getBytes()); md5 = DatatypeConverter.printHexBinary(md5MD.digest()).toLowerCase(); } catch (Exception e) { logger.error("Error when getting the MD5-hash for: " + string, e); return null; } return md5; } public String generateOpenaireId(String id, String datasourcePrefix, boolean isAuthoritative) { // If the "provenance" relates to an "authoritative" source, then its id has to be lowercase, before the md5() is applied to it. // general_openaire_id = + "::" + // authoritative_openaire_id = + "::" + if ( isAuthoritative ) id = id.toLowerCase(); String idMd5Hash = getMD5Hash(id); if ( idMd5Hash == null ) return null; return (datasourcePrefix + "::" + idMd5Hash); } }