package eu.openaire.urls_controller.services;
import com.google.common.collect.Lists;
import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.controllers.BulkImportController;
import eu.openaire.urls_controller.models.BulkImportReport;
import eu.openaire.urls_controller.models.DocFileData;
import eu.openaire.urls_controller.models.FileLocationData;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import org.apache.avro.generic.GenericData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import javax.xml.bind.DatatypeConverter;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class BulkImportServiceImpl implements BulkImportService {
private static final Logger logger = LoggerFactory.getLogger(BulkImportServiceImpl.class);
private FileUtils fileUtils;
private ParquetFileUtils parquetFileUtils;
private JdbcTemplate jdbcTemplate;
* Given a directory with full-text-files, this method imports the full-texts files in the PDF Aggregation Service.
* Also, it provides the guarantee that the failed files will not be deleted! A file can "fail" if any of the expected results fail (upload-to-S3, parquet-creation and upload, load-to-db, ect)
* */
public Boolean bulkImportFullTextsFromDirectory(BulkImportReport bulkImportReport, String relativeBulkImportDir, String bulkImportDirName, File bulkImportDir, String provenance, BulkImport.BulkImportSource bulkImportSource, boolean shouldDeleteFilesOnFinish)
String bulkImportReportLocation = bulkImportReport.getReportLocation();
// Write to bulkImport-report file.
bulkImportReport.addEvent("Initializing the bulkImport " + provenance + " procedure with bulkImportDir: " + bulkImportDirName + ".");
// Do not write immediately to the file, wait for the following checks.
if ( (ParquetFileUtils.payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((ParquetFileUtils.payloadsSchema = ParquetFileUtils.parseSchema(ParquetFileUtils.payloadSchemaFilePath)) == null ) ) {
String errorMsg = "The payloadsSchema could not be parsed!";
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
return false;
List<String> fileLocations = getFileLocationsInsideDir(bulkImportDirName);
if ( fileLocations == null ) {
bulkImportReport.addEvent("Could not retrieve the files for bulk-import!");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
return false;
int numOfFiles = fileLocations.size();
if ( numOfFiles == 0 ) {
String errorMsg = "No files were found inside the bulkImportDir: " + bulkImportDirName;
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
return false;
if ( logger.isTraceEnabled() )
logger.trace("fileLocations:\n" + fileLocations);
String localParquetDir = parquetFileUtils.parquetBaseLocalDirectoryPath + "bulk_import_" + provenance + File.separator + relativeBulkImportDir; // This ends with "/".
try {
Files.createDirectories(Paths.get(localParquetDir)); // No-op if it already exists.
} catch (Exception e) {
String errorMsg = "Could not create the local parquet-directory: " + localParquetDir;
logger.error(errorMsg, e);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
return false;
// Create a new directory on HDFS, with this bulkImportDir name. So, that there will not be any "load data" operation to fail because another thread has loaded that base-dir right before.
String currentBulkImportHdfsDir = parquetFileUtils.parquetHDFSDirectoryPathPayloadsBulkImport + relativeBulkImportDir;
if ( ! parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + currentBulkImportHdfsDir + parquetFileUtils.mkDirsAndParams) ) { // N0-op if it already exists. It is very quick.
String errorMsg = "Could not create the remote HDFS-directory: " + currentBulkImportHdfsDir;
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
return false;
long timeMillis = System.currentTimeMillis(); // Store it here, in order to have the same for all current records.
List<Callable<Integer>> callableTasksForFileSegments = new ArrayList<>(numOfFiles);
List<List<String>> subLists = Lists.partition(fileLocations, BulkImportController.numOfThreadsPerBulkImportProcedure); // Divide the initial list to "numOfThreadsPerBulkImportProcedure" subLists. The last one may have marginally fewer files.
int subListsSize = subLists.size();
bulkImportReport.addEvent("Going to import the files in " + subListsSize + " segments, in parallel.");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
callableTasksForFileSegments.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
return processBulkImportedFilesSegment(bulkImportReport, finalI, subLists.get(finalI), bulkImportDirName, localParquetDir, currentBulkImportHdfsDir, provenance, bulkImportSource, timeMillis, shouldDeleteFilesOnFinish);
int numFailedSegments = 0;
int numFailedFiles = 0;
try {
List<Future<Integer>> futures = BulkImportController.bulkImportExecutor.invokeAll(callableTasksForFileSegments); // This waits for all tasks to finish.
int sizeOfFutures = futures.size();
for ( int i = 0; i < sizeOfFutures; ++i ) {
try {
numFailedFiles += futures.get(i).get();
if ( numFailedFiles == subLists.get(i).size() ) { // Get and see if it was successfully or not, or if an exception is thrown..
// In case all the files failed to be bulk-imported, then we will detect it in the "numSuccessfulSegments"-check later.
// The failed-to-be-imported files, will not be deleted, even if the user specifies that he wants to delete the directory.
} catch (ExecutionException ee) {
String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP).
logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + "\n" + stackTraceMessage);
} catch (CancellationException ce) {
logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage());
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage());
} catch (Exception e) {
String errorMsg = "An error occurred when trying to bulk-import data from bulkImportDir: " + bulkImportDirName;
logger.error(errorMsg, e);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
return false;
} finally {
logger.debug("Deleting local parquet directory: " + localParquetDir);
fileUtils.deleteDirectory(new File(localParquetDir)); // It may not exist at all, if none of the parquet files were created.
// Check the results.
String msg;
if ( numFailedFiles == numOfFiles ) {
String errorMsg = "None of the files inside the bulkImportDir: " + bulkImportDirName + " were imported!";
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
return false;
} else if ( numFailedFiles > 0 ) { // Some failed, but not all.
msg = numFailedFiles + " files" + (numFailedSegments > 0 ? (" and " + numFailedSegments + " whole segments") : "") + " failed to be bulk-imported, from the bulkImportDir: " + bulkImportDirName;
} else {
msg = "All " + numOfFiles + " files, from bulkImportDir: " + bulkImportDirName + " were bulkImported successfully.";
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// Merge the parquet files inside the table "payload_bulk_import", to improve performance of future operations.
String mergeErrorMsg = fileUtils.mergeParquetFiles("payload_bulk_import", "", null);
if ( mergeErrorMsg != null ) {
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
return false;
String successMsg = "Finished the bulk-import procedure for " + provenance + " and bulkImportDir: " + bulkImportDirName;
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// The report-file will be overwritten every now and then, instead of appended, since we want to add an update new JSON report-object each time.
// Also, we do not want to write the object in the end (in its final form), since we want the user to have the ability to request the report at any time,
// after submitting the bulk-import request, to see its progress (since the number of file may be very large and the processing may take many hours).
return true;
private int processBulkImportedFilesSegment(BulkImportReport bulkImportReport, int segmentCounter, List<String> fileLocationsSegment, String bulkImportDirName, String localParquetDir, String currentBulkImportHdfsDir,
String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, boolean shouldDeleteFilesOnFinish)
// Inside this thread, process a segment of the files.
String bulkImportReportLocation = bulkImportReport.getReportLocation();
int numOfFilesInSegment = fileLocationsSegment.size();
String msg = "Going to import " + numOfFilesInSegment + " files for segment-" + segmentCounter + " , of bulkImport procedure: " + provenance + " | dir: " + bulkImportDirName;
List<GenericData.Record> payloadRecords = new ArrayList<>(numOfFilesInSegment);
// Use a HashSet for the failed files, in order to not remove them in the end.
HashSet<String> failedFiles = new HashSet<>();
int counter = 0;
// Upload files to S3 and collect payloadRecords.
for ( String fileLocation: fileLocationsSegment ) {
GenericData.Record record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis);
if ( record != null )
else {
bulkImportReport.addEvent("An error caused the file: " + fileLocation + " to not be imported!");
if ( ((++counter) % 100) == 0 ) { // Every 100 files, report the status.
bulkImportReport.addEvent("Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files.");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
int numOfPayloadRecords = payloadRecords.size();
if ( numOfPayloadRecords == 0 ) {
String errorMsg = "No payload-records were generated for any of the files inside the bulkImportDir: " + bulkImportDirName;
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment;
} else if ( numOfPayloadRecords != numOfFilesInSegment ) {
// Write this important note here, in order to certainly be in the report, even if a parquet-file failure happens and the method exists early.
String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be imported, for segment-" + segmentCounter + " !";
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// Construct the parquet file, upload it to HDFS and load them it in the "payload_bulk_import" table.
String parquetFileName = "payloads_" + segmentCounter + ".parquet";
String fullLocalParquetFilePath = localParquetDir + parquetFileName;
if ( logger.isTraceEnabled() )
logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath); // DEBUG!
if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) {
bulkImportReport.addEvent("Could not write the payload-records to the parquet-file: " + parquetFileName + " !");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment;
if ( logger.isTraceEnabled() )
logger.trace("Going to upload the parquet file: " + fullLocalParquetFilePath + " to HDFS."); // DEBUG!
// Upload and insert the data to the "payload" Impala table. (no database-locking is required)
String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir);
if ( errorMsg != null ) { // The possible error-message returned, is already logged by the Controller.
bulkImportReport.addEvent("Could not upload the parquet-file " + parquetFileName + " to HDFS!");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment;
if ( logger.isTraceEnabled() )
logger.trace("Going to load the data of parquet-file: \"" + parquetFileName + "\" to the database-table: \"payload_bulk_import\"."); // DEBUG!
if ( !parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import") ) {
bulkImportReport.addEvent("Could not load the payload-records to the database!");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment;
String segmentSuccessMsg = "Finished importing " + numOfPayloadRecords + " files, out of " + numOfFilesInSegment + " , for segment-" + segmentCounter + ".";
if ( shouldDeleteFilesOnFinish ) {
segmentSuccessMsg = "As the user requested, the successfully imported files of " + provenance + " procedure, of bulk-import segment-" + segmentCounter + ", from directory " + bulkImportDirName + ", will be deleted.";
// Delete all files except the ones in the "failedHashSet"
for ( String fileLocation : fileLocationsSegment ) {
if ( !failedFiles.contains(fileLocation) )
if ( !fileUtils.deleteFile(fileLocation) )
bulkImportReport.addEvent("The file " + fileLocation + " could not be deleted! Please make sure you have provided the WRITE-permission.");
return (numOfFilesInSegment - numOfPayloadRecords); // Return the numOfFailedFiles.
private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis)
File fullTextFile = new File(fileLocation);
DocFileData docFileData = new DocFileData(fullTextFile, null, null, null);
// Check if this file is already found by crawling. Even though we started excluding this datasource from crawling, many full-texts have already been downloaded.
// Also, it may be the case that this file was downloaded by another datasource.
FileLocationData fileLocationData;
try {
fileLocationData = new FileLocationData(fileLocation);
} catch (RuntimeException re) {
return null;
String fileHash = docFileData.getHash();
if ( fileHash == null )
return null; // No check of past found full-text can be made nor the S3-fileName can be created.
String datasourceId = bulkImportSource.getDatasourceID();
String datasourcePrefix = bulkImportSource.getDatasourcePrefix();
String fileNameID = fileLocationData.getFileNameID();
String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID); // This string-concatenation, works with urls of Arvix. A different construction may be needed for other datasources.
String originalUrl = actualUrl; // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link.
final String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ? limit 1";
final int[] hashArgType = new int[] {Types.VARCHAR};
String alreadyFoundFileLocation = null;
try {
alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[]{fileHash}, hashArgType, String.class);
} catch (EmptyResultDataAccessException erdae) {
// No fileLocation is found, it's ok. It will be null by default.
} catch (Exception e) {
logger.error("Error when executing or acquiring data from the the 'getFileLocationForHashQuery'!\n", e);
// Continue with bulk-importing the file and uploading it to S3.
} finally {
String idMd5hash = getMD5hash(fileNameID.toLowerCase());
if ( idMd5hash == null )
return null;
// openaire id = <datasourcePrefix> + "::" + <md5(lowercase(arxivId))>
String openAireId = (datasourcePrefix + "::" + idMd5hash);
String s3Url = null;
if ( alreadyFoundFileLocation != null ) // If the full-text of this record is already-found and uploaded.
// This full-text was found to already be in the database.
// If it has the same datasourceID, then it likely was crawled before from an ID belonging to this datasource.
// If also has the same ID, then the exact same record from that datasource was retrieved previously.
// Else, the file was downloaded by another record of this datasource.
// ELse if the datasourceID is not the same, then the same file was retrieved from another datasource.
// The above analysis is educational, it does not need to take place and is not currently used.
s3Url = alreadyFoundFileLocation;
} else {
try {
s3Url = fileUtils.constructFileNameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), openAireId, fileLocationData.getDotFileExtension(), datasourceId, fileHash); // This throws Exception, in case the uploading failed.
if ( s3Url == null )
return null; // In case the 'datasourceID' or 'hash' is null. Which should never happen here, since both of them are checked before the execution reaches here.
} catch (Exception e) {
logger.error("Could not upload the file '" + fileLocationData.getFileName() + "' to the S3 ObjectStore!", e);
return null;
GenericData.Record record = new GenericData.Record(ParquetFileUtils.payloadsSchema);
record.put("id", openAireId);
record.put("original_url", originalUrl);
record.put("actual_url", actualUrl);
record.put("date", timeMillis);
record.put("mimetype", bulkImportSource.getMimeType());
Long size = docFileData.getSize();
record.put("size", ((size != null) ? String.valueOf(size) : null));
record.put("hash", fileHash); // This is already checked and will not be null here.
record.put("location", s3Url);
record.put("provenance", provenance);
return record;
public List<String> getFileLocationsInsideDir(String directory)
List<String> fileLocations = null;
try ( Stream<Path> walkStream = Files.find(Paths.get(directory), Integer.MAX_VALUE, (filePath, fileAttr) -> fileAttr.isRegularFile()) )
// In case we ever include other type-of-Files inside the same directory, we need to add this filter: "&& !filePath.toString().endsWith("name.ext")"
fileLocations = walkStream.map(Path::toString).collect(Collectors.toList());
} catch (Exception e) {
String errorMsg = "Could not retrieve the files from directory: '" + directory + "'!";
logger.error(errorMsg, e);
return null;
return fileLocations;
public String getMD5hash(String string)
String md5 = null;
try {
MessageDigest md5MD = MessageDigest.getInstance("MD5"); // New instance for any new request. Otherwise, we need to synchronize the use of that object among multiple threads.
md5 = DatatypeConverter.printHexBinary(md5MD.digest()).toLowerCase();
} catch (Exception e) {
logger.error("Error when getting the MD5-hash for: " + string, e);
return null;
return md5;