UrlsController/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java

package eu.openaire.urls_controller.services;
import com.google.common.collect.Lists;
import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.controllers.BulkImportController;
import eu.openaire.urls_controller.models.BulkImportReport;
import eu.openaire.urls_controller.models.DocFileData;
import eu.openaire.urls_controller.models.FileLocationData;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import org.apache.avro.generic.GenericData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.xml.bind.DatatypeConverter;
import java.io.File;
import java.net.ConnectException;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
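/**
* Implements the bulk-import workflow: it walks a given directory of full-text files, computes their hashes,
* uploads the not-already-known files to the S3 Object Store, writes payload-records to parquet files,
* uploads those to HDFS and loads them into the "payload_bulk_import" Impala table, while keeping a JSON
* progress-report updated on disk.
* <p>
* Illustrative invocation (the argument values below are assumptions made for the example, not taken from the actual controller):
* <pre>{@code
* bulkImportService.bulkImportFullTextsFromDirectory(report, "arxiv_batch_1/", "arxiv_batch_1",
*         new File("/mnt/bulk_import/arxiv_batch_1"), "arxiv_import", bulkImportSource, false);
* }</pre>
* */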
@Service
public class BulkImportServiceImpl implements BulkImportService {
private static final Logger logger = LoggerFactory.getLogger(BulkImportServiceImpl.class);
@Autowired
private FileUtils fileUtils;
@Autowired
private ParquetFileUtils parquetFileUtils;
/**
* Given a directory with full-text files, this method imports those files into the PDF Aggregation Service.
* It also guarantees that failed files will not be deleted. A file can "fail" if any of the expected steps fails (upload-to-S3, parquet-creation and upload, load-to-db, etc.).
* */
public Boolean bulkImportFullTextsFromDirectory(BulkImportReport bulkImportReport, String relativeBulkImportDir, String bulkImportDirName, File bulkImportDir, String provenance, BulkImport.BulkImportSource bulkImportSource, boolean shouldDeleteFilesOnFinish)
{
String bulkImportReportLocation = bulkImportReport.getReportLocation();
// Write to bulkImport-report file.
String msg = "Initializing the bulkImport " + provenance + " procedure with bulkImportDir: " + bulkImportDirName + ".";
logger.info(msg);
bulkImportReport.addEvent(msg);
// Do not write immediately to the file, wait for the following checks.
String additionalLoggingMsg = " | provenance: \"" + provenance + "\" | bulkImportDir: \"" + bulkImportDirName + "\"";
if ( (ParquetFileUtils.payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((ParquetFileUtils.payloadsSchema = ParquetFileUtils.parseSchema(ParquetFileUtils.payloadSchemaFilePath)) == null ) ) {
String errorMsg = "The payloadsSchema could not be parsed!";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
List<String> fileLocations = getFileLocationsInsideDir(bulkImportDirName); // the error-msg has already been written
if ( fileLocations == null ) {
String errorMsg = "Could not retrieve the files for bulk-import!";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
int numOfFiles = fileLocations.size();
if ( numOfFiles == 0 ) {
String errorMsg = "No files were found inside the bulkImportDir: " + bulkImportDirName;
logger.warn(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
if ( logger.isTraceEnabled() )
logger.trace("fileLocations: " + additionalLoggingMsg + GenericUtils.endOfLine + fileLocations);
String localParquetDir = parquetFileUtils.parquetBaseLocalDirectoryPath + "bulk_import_" + provenance + File.separator + relativeBulkImportDir; // This ends with "/".
try {
Files.createDirectories(Paths.get(localParquetDir)); // No-op if it already exists.
} catch (Exception e) {
String errorMsg = "Could not create the local parquet-directory: " + localParquetDir;
logger.error(errorMsg + additionalLoggingMsg, e);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
// Create a new directory on HDFS with this bulkImportDir's name, so that no "load data" operation fails because another thread loaded that base-dir right before.
String currentBulkImportHdfsDir = parquetFileUtils.parquetHDFSDirectoryPathPayloadsBulkImport + relativeBulkImportDir;
if ( ! parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + currentBulkImportHdfsDir + parquetFileUtils.mkDirsAndParams) ) { // No-op if it already exists. It is very quick.
String errorMsg = "Could not create the remote HDFS-directory: " + currentBulkImportHdfsDir;
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
long timeMillis = System.currentTimeMillis(); // Store it here, in order to have the same for all current records.
List<Callable<Integer>> callableTasksForFileSegments = new ArrayList<>(numOfFiles);
int sizeOfEachSegment = (numOfFiles / BulkImportController.numOfThreadsForBulkImportProcedures);
if ( sizeOfEachSegment == 0 ) // In case the number of files is fewer than the number of threads handling them.
sizeOfEachSegment = 1;
List<List<String>> subLists = Lists.partition(fileLocations, sizeOfEachSegment); // Divide the initial list into "numOfThreadsForBulkImportProcedures" subLists. The last one may have marginally fewer files.
int subListsSize = subLists.size();
msg = "Going to bulk-import the " + numOfFiles + " files in parallel, after dividing them in " + subListsSize + " segments.";
logger.debug(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
callableTasksForFileSegments.add(() -> { // Process this segment: upload its files to S3, write their payload-records to a parquet-file and load it into the "payload_bulk_import" table.
return processBulkImportedFilesSegment(bulkImportReport, finalI, subLists.get(finalI), bulkImportDirName, localParquetDir, currentBulkImportHdfsDir, provenance, bulkImportSource, timeMillis, shouldDeleteFilesOnFinish, additionalLoggingMsg);
});
}
int numFailedSegments = 0;
int numFailedFilesForSegment = 0;
int numAllFailedFiles = 0;
try {
List<Future<Integer>> futures = BulkImportController.bulkImportExecutor.invokeAll(callableTasksForFileSegments); // This waits for all tasks to finish.
int sizeOfFutures = futures.size(); // This is the same as the "subListsSize".
for ( int i = 0; i < sizeOfFutures; ++i )
{ // For each segment..
try {
numFailedFilesForSegment = futures.get(i).get();
numAllFailedFiles += numFailedFilesForSegment;
if ( numFailedFilesForSegment == subLists.get(i).size() )
numFailedSegments++;
// In case all the files failed to be bulk-imported, we will detect it in the "numAllFailedFiles"-check later.
// The failed-to-be-imported files will not be deleted, even if the user has requested deletion of the directory.
} catch (ExecutionException ee) { // These can be serious errors like an "out of memory exception" (Java HEAP).
numFailedSegments ++;
numAllFailedFiles += subLists.get(i).size(); // We assume all files of this segment failed, since they all pass through the same code; any serious exception should arise while the first files are being processed, and the rest will be skipped.
logger.error(GenericUtils.getSelectedStackTraceForCausedException(ee, "Task_" + i + " failed with: ", additionalLoggingMsg, 20));
bulkImportReport.addEvent("Segment_" + i + " failed with: " + ee.getCause().getMessage());
} catch (CancellationException ce) {
numFailedSegments ++;
numAllFailedFiles += subLists.get(i).size(); // We assume all files have failed.
logger.error("Task_" + i + " was cancelled: " + ce.getMessage() + additionalLoggingMsg);
bulkImportReport.addEvent("Segment_" + i + " failed with: " + ce.getMessage());
} catch (InterruptedException ie) {
numFailedSegments ++;
numAllFailedFiles += (subLists.get(i).size() / 3); // In this case, only some of the files will have failed (do not know how many).
logger.error("Task_" + i + " was interrupted: " + ie.getMessage());
bulkImportReport.addEvent("Segment_" + i + " failed with: " + ie.getMessage());
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage() + additionalLoggingMsg);
}
}
} catch (Exception e) {
String errorMsg = "An error occurred when trying to bulk-import data from bulkImportDir: " + bulkImportDirName;
logger.error(errorMsg, e);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
} finally {
logger.debug("Deleting local parquet directory: " + localParquetDir);
fileUtils.deleteDirectory(new File(localParquetDir)); // It may not exist at all, if none of the parquet files were created.
}
// Check the results.
if ( numAllFailedFiles == numOfFiles ) {
String errorMsg = "None of the files inside the bulkImportDir: " + bulkImportDirName + " were imported!";
logger.error(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
} else if ( numAllFailedFiles > 0 ) { // Some failed, but not all.
msg = numAllFailedFiles + " files" + (numFailedSegments > 0 ? (" and " + numFailedSegments + " whole segments") : "") + " failed to be bulk-imported, from the bulkImportDir: " + bulkImportDirName;
logger.warn(msg);
} else {
msg = "All " + numOfFiles + " files, from bulkImportDir: " + bulkImportDirName + " were bulkImported successfully.";
logger.info(msg);
}
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
// Merge the parquet files inside the table "payload_bulk_import", to improve performance of future operations.
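// The global database-lock serializes this Impala operation with queries from other worker-threads; it is assumed that "mergeParquetFilesOfTable" should not run concurrently with other table-modifying statements.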
DatabaseConnector.databaseLock.lock();
String mergeErrorMsg = parquetFileUtils.mergeParquetFilesOfTable("payload_bulk_import", "", null); // msg is already logged
DatabaseConnector.databaseLock.unlock();
if ( mergeErrorMsg != null ) { // the message is already logged
bulkImportReport.addEvent(mergeErrorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
String successMsg = "Finished the bulk-import procedure for " + provenance + " and bulkImportDir: " + bulkImportDirName;
logger.info(successMsg);
bulkImportReport.addEvent(successMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
// The report-file is overwritten every now and then, instead of appended to, since we want to write an updated JSON report-object each time.
// Also, we do not want to write the object only at the end (in its final form), since the user should be able to request the report at any time
// after submitting the bulk-import request, to see its progress (the number of files may be very large and the processing may take many hours).
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return true;
}
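/**
* Processes one segment of the bulk-imported files, inside its own worker-thread:
* it hashes the segment's files, checks which hashes already exist in the database, uploads the new files to S3,
* writes the segment's payload-records to a parquet file, uploads it to HDFS and loads it into the "payload_bulk_import" table.
* If requested, the successfully imported files are deleted afterwards.
* @return the number of files of this segment which failed to be imported
* */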
private int processBulkImportedFilesSegment(BulkImportReport bulkImportReport, int segmentCounter, List<String> fileLocationsSegment, String bulkImportDirName, String localParquetDir, String currentBulkImportHdfsDir,
String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, boolean shouldDeleteFilesOnFinish, String additionalLoggingMsg)
{
// Inside this thread, process a segment of the files.
String bulkImportReportLocation = bulkImportReport.getReportLocation();
int numOfFilesInSegment = fileLocationsSegment.size();
String msg = "Going to import " + numOfFilesInSegment + " files, for segment-" + segmentCounter;
logger.debug(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
HashMap<String, DocFileData> fileLocationsWithData = new HashMap<>(numOfFilesInSegment); // Map the fileLocations with hashes in order to quickly retrieve them later, when constructing the s3-file-location.
Set<String> fileHashes = new HashSet<>(numOfFilesInSegment);
// Get file-data for this segment's files.
for ( String fileLocation : fileLocationsSegment ) {
DocFileData docFileData = new DocFileData(new File(fileLocation), null, null, fileLocation);
docFileData.calculateAndSetHashAndSize();
String fileHash = docFileData.getHash();
if ( fileHash != null ) { // We will not work with files for which we cannot produce the hash, since the s3-file-location requires it.
fileHashes.add(fileHash);
fileLocationsWithData.put(fileLocation, docFileData);
}
}
int fileHashesSetSize = fileHashes.size();
if ( fileHashesSetSize == 0 ) {
msg = "No fileHashes were retrieved for segment_" + segmentCounter + ", no files will be processed from it.";
logger.warn(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
return numOfFilesInSegment; // All files of this segment failed.
}
// Get the "alreadyFound" fileLocations for the fileHashes which exist in the DB.
HashMap<String, String> hashWithExistingLocationMap = fileUtils.getHashLocationMap(fileHashes, fileHashesSetSize, segmentCounter, "segment");
int numAlreadyRetrievedFiles = hashWithExistingLocationMap.size();
if ( numAlreadyRetrievedFiles > 0 ) {
msg = numAlreadyRetrievedFiles + " files from segment_" + segmentCounter + " have already been retrieved in the past.";
logger.warn(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
}
// Generate the payload-records to be inserted in the DB.
// The OpenAireID, which is synthesised from the fileName, may be different from the ID related to the exact same file which was retrieved in the past.
// The same file may be related with multiple publications from different datasources. The deduplication is out of the scope of this service.
// All different OpenAIRE IDs should be saved, independently of the fact that their urls or files may be related with other publications as well.
List<GenericData.Record> payloadRecords = new ArrayList<>(numOfFilesInSegment);
// Use a HashSet for the failed files, in order to not remove them in the end.
HashSet<String> failedFiles = new HashSet<>();
int counter = 0;
// Upload files to S3 and collect payloadRecords.
for ( int i=0; i < numOfFilesInSegment; ++i ) {
String fileLocation = fileLocationsSegment.get(i);
DocFileData docFileData = fileLocationsWithData.get(fileLocation);
if ( docFileData == null ) {
failedFiles.add(fileLocation);
} else {
String alreadyRetrievedFileLocation = hashWithExistingLocationMap.get(docFileData.getHash());
GenericData.Record record = null;
try {
record = processBulkImportedFile(docFileData, alreadyRetrievedFileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg);
} catch (Exception e) {
String errorMsg = "Exception when uploading the files of segment_" + segmentCounter + " to the S3 Object Store. Will avoid uploading the rest of the files for this segment.. " + e.getMessage();
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
for ( int j=i; j < numOfFilesInSegment; ++j )
failedFiles.add(fileLocationsSegment.get(j)); // The rest of the files are considered "failed".
break;
}
if ( record != null )
payloadRecords.add(record);
else {
bulkImportReport.addEvent("An error caused the file: " + fileLocation + " to not be imported!");
failedFiles.add(fileLocation);
}
}
if ( ((++counter) % 150) == 0 ) { // Every 150 files, report the status for this segment and write it to the file.
msg = "Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files.";
if ( logger.isTraceEnabled() )
logger.trace(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
}
} // End of processing files for this segment.
int numOfPayloadRecords = payloadRecords.size();
if ( numOfPayloadRecords == 0 ) {
String errorMsg = "No payload-records were generated for any of the files, of segment-" + segmentCounter + ", inside the bulkImportDir: " + bulkImportDirName;
logger.warn(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment;
} else if ( numOfPayloadRecords != numOfFilesInSegment ) {
// Write this important note here, so that it is certainly included in the report, even if a parquet-file failure happens and the method exits early.
String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be bulk-imported, for segment-" + segmentCounter + " !";
logger.warn(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
}
// Construct the parquet file, upload it to HDFS and load it in the "payload_bulk_import" table.
String parquetFileName = "payloads_" + segmentCounter + ".parquet";
String fullLocalParquetFilePath = localParquetDir + parquetFileName;
if ( logger.isTraceEnabled() )
logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath + additionalLoggingMsg); // DEBUG!
if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) { // Any errorMsg is already logged inside.
String errorMsg = "Could not write the payload-records for segment-" + segmentCounter + " to the parquet-file: " + parquetFileName + " !";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment; // All files of this segment have failed.
}
if ( logger.isTraceEnabled() )
logger.trace("Going to upload the parquet file: " + fullLocalParquetFilePath + " to HDFS." + additionalLoggingMsg); // DEBUG!
// Upload the parquet-file to HDFS; its data will then be loaded into the "payload_bulk_import" Impala table. (no database-locking is required for the upload itself)
String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir); // The returned message is already logged inside.
if ( errorMsg != null ) { // The possible error-message returned, is already logged by the Controller.
errorMsg = "Could not upload the parquet-file " + parquetFileName + " to HDFS!";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment; // All files of this segment have failed.
}
if ( logger.isTraceEnabled() )
logger.trace("Going to load the data of parquet-file: \"" + parquetFileName + "\" to the database-table: \"payload_bulk_import\"." + additionalLoggingMsg); // DEBUG!
DatabaseConnector.databaseLock.lock();
boolean parquetDataLoaded = parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import");
DatabaseConnector.databaseLock.unlock();
if ( !parquetDataLoaded ) {
errorMsg = "Could not load the payload-records to the database, for segment-" + segmentCounter + "!";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment; // All files of this segment have failed.
}
String segmentSuccessMsg = "Finished importing " + numOfPayloadRecords + " files, out of " + numOfFilesInSegment + ", for segment-" + segmentCounter + ".";
logger.info(segmentSuccessMsg + additionalLoggingMsg);
bulkImportReport.addEvent(segmentSuccessMsg);
if ( shouldDeleteFilesOnFinish ) {
segmentSuccessMsg = "As the user requested, the successfully imported files of " + provenance + " procedure, of bulk-import segment-" + segmentCounter + ", from directory " + bulkImportDirName + ", will be deleted.";
logger.info(segmentSuccessMsg);
bulkImportReport.addEvent(segmentSuccessMsg);
// Delete all files except the ones in the "failedFiles" set.
for ( String fileLocation : fileLocationsSegment ) {
if ( !failedFiles.contains(fileLocation) )
if ( !fileUtils.deleteFile(fileLocation) ) // The "error-log-message" is shown inside.
bulkImportReport.addEvent("The file " + fileLocation + " could not be deleted! Please make sure you have provided the WRITE-permission.");
}
}
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
return (numOfFilesInSegment - numOfPayloadRecords); // Return the numOfFailedFiles.
}
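/**
* Builds the payload parquet-record for a single bulk-imported file.
* If the file's hash is already known to the database, the previously stored S3 location is reused;
* otherwise, the file is uploaded to the S3 Object Store.
* @return the payload-record, or null if the file could not be processed
* */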
private GenericData.Record processBulkImportedFile(DocFileData docFileData, String alreadyRetrievedFileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg)
throws ConnectException, UnknownHostException
{
FileLocationData fileLocationData;
try {
fileLocationData = new FileLocationData(docFileData.getLocation());
} catch (RuntimeException re) {
logger.error(re.getMessage() + additionalLoggingMsg);
return null;
}
String fileNameID = fileLocationData.getFileNameID();
String openAireId = generateOpenaireId(fileNameID, bulkImportSource.getDatasourcePrefix(), bulkImportSource.getIsAuthoritative());
if ( openAireId == null )
return null;
String fileHash = docFileData.getHash(); // It's guaranteed to NOT be null at this point.
String s3Url = null;
if ( alreadyRetrievedFileLocation != null ) // If the full-text of this record is already-found and uploaded.
{
// This full-text was found to already be in the database.
// If it has the same datasourceID, then it was likely crawled before from an ID belonging to this datasource.
// If it also has the same ID, then the exact same record from that datasource was retrieved previously.
// Otherwise, the file was downloaded for another record of this datasource.
// Else, if the datasourceID is not the same, then the same file was retrieved from another datasource.
// The above analysis is informational only; it does not need to take place and is not currently used.
s3Url = alreadyRetrievedFileLocation;
} else {
s3Url = fileUtils.constructS3FilenameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), openAireId, fileLocationData.getDotFileExtension(), bulkImportSource.getDatasourceID(), fileHash);
if ( s3Url == null )
return null;
}
// TODO - If another url-schema is introduced for other datasources, have a "switch"-statement and perform the right "actualUrl"-creation based on current schema.
String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID); // This string-concatenation works with arXiv urls. A different construction may be needed for other datasources.
String originalUrl = actualUrl; // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link.
return parquetFileUtils.getPayloadParquetRecord(openAireId, originalUrl, actualUrl, timeMillis, bulkImportSource.getMimeType(),
docFileData.getSize(), fileHash, s3Url, provenance, true); // It may return null.
}
public List<String> getFileLocationsInsideDir(String directory)
{
List<String> fileLocations = null;
try ( Stream<Path> walkStream = Files.find(Paths.get(directory), Integer.MAX_VALUE, (filePath, fileAttr) -> fileAttr.isRegularFile()) )
// In case we ever include other type-of-Files inside the same directory, we need to add this filter: "&& !filePath.toString().endsWith("name.ext")"
{
fileLocations = walkStream.map(Path::toString).collect(Collectors.toList());
} catch (Exception e) {
String errorMsg = "Could not retrieve the files from directory: '" + directory + "'!";
logger.error(errorMsg, e);
return null;
}
return fileLocations;
}
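/**
* Returns the lowercase hexadecimal MD5-hash of the given string, or null in case of an error.
* For example, getMD5Hash("test") returns "098f6bcd4621d373cade4e832627b4f6".
* */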
public String getMD5Hash(String string)
{
String md5 = null;
try {
MessageDigest md5MD = MessageDigest.getInstance("MD5"); // Create a new instance for each request; otherwise, we would have to synchronize the use of a shared instance among multiple threads.
md5MD.update(string.getBytes());
md5 = DatatypeConverter.printHexBinary(md5MD.digest()).toLowerCase();
} catch (Exception e) {
logger.error("Error when getting the MD5-hash for: " + string, e);
return null;
}
return md5;
}
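/**
* Generates the OpenAIRE-id for the given record-id and datasource-prefix (see the formula in the comments below).
* For example (with a hypothetical datasource-prefix): generateOpenaireId("ABC", "doajarticles", true)
* returns "doajarticles::900150983cd24fb0d6963f7d28e17f72", since the id is lowercased to "abc" and md5("abc") is "900150983cd24fb0d6963f7d28e17f72".
* */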
public String generateOpenaireId(String id, String datasourcePrefix, boolean isAuthoritative)
{
// If the "provenance" relates to an "authoritative" source, then its id has to be lowercase, before the md5() is applied to it.
// general_openaire_id = <datasourcePrefix> + "::" + <md5(ID)>
// authoritative_openaire_id = <datasourcePrefix> + "::" + <md5(lowercase(ID))>
if ( isAuthoritative )
id = id.toLowerCase();
String idMd5Hash = getMD5Hash(id);
if ( idMd5Hash == null )
return null;
return (datasourcePrefix + "::" + idMd5Hash);
}
}