- Move the "uploadFullTexts" code into its own method.
- Code polishing.
parent b0c57d79a5
commit f183df276b
@@ -46,11 +46,11 @@ public class FileUtils {
 
 
 	/**
 	 * In each insertion, a new parquet-file is created, so we end up with millions of files. Parquet is great for fast-select, so we have to stick with it and merge those files.
 	 * This method creates a clone of the original table, in order to have only one parquet file in the end. Drops the original table.
 	 * Renames the clone to the original's name.
 	 * Returns the errorMsg if an error appears; otherwise it returns "null".
-	 * */
+	 */
 	public String mergeParquetFiles(String tableName, String whereClause, String parameter) {
 		String errorMsg;
 		if ( (tableName == null) || tableName.isEmpty() ) {
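
For context, the clone-and-swap approach described in the Javadoc above boils down to three SQL statements. Below is a minimal sketch of that idea over JDBC; the temp-table name, the PURGE clause and the exact statement texts are assumptions for illustration, not code from this commit:

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class ParquetMergeSketch {

        // Sketch only: rewriting a table via CREATE-AS-SELECT compacts its many
        // small parquet files into few; the copy then takes the original's place.
        public static void mergeByCloning(Connection con, String tableName) throws SQLException {
            try ( Statement st = con.createStatement() ) {
                // 1) Clone: the CREATE-AS-SELECT rewrites the data into fresh, merged files.
                st.execute("CREATE TABLE " + tableName + "_tmp STORED AS parquet AS SELECT * FROM " + tableName);
                // 2) Drop the original, which still holds the millions of small files ("PURGE" is an assumed option).
                st.execute("DROP TABLE " + tableName + " PURGE");
                // 3) Rename the clone to the original's name.
                st.execute("ALTER TABLE " + tableName + "_tmp RENAME TO " + tableName);
            }
        }
    }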
@@ -187,7 +187,7 @@ public class FileUtils {
 		if ( (numAllFullTexts % numOfFullTextsPerBatch) > 0 )	// Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
 			numOfBatches ++;
 
 		logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts. Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches.");
 
 		// Check if one full text is left out because of the division. Put it in the last batch.
 		String baseUrl = "http://" + remoteAddr + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
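
The lines above implement a plain ceiling division. A worked sketch with assumed example values (the real batch size is configured elsewhere in the project):

    public class BatchMathSketch {
        public static void main(String[] args) {
            int numOfFullTextsPerBatch = 70;   // assumption: illustrative batch size
            int numAllFullTexts = 345;         // assumption: illustrative file count

            int numOfBatches = numAllFullTexts / numOfFullTextsPerBatch;   // 345 / 70 = 4 (integer division)
            if ( (numAllFullTexts % numOfFullTextsPerBatch) > 0 )          // remainder 65 > 0
                numOfBatches ++;                                           // -> 5 batches; the last one carries the remainder

            System.out.println(numOfBatches);   // prints 5
        }
    }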
@@ -208,7 +208,7 @@ public class FileUtils {
 			} catch (RuntimeException re) {
 				// The "cause" was logged inside "getConnection()".
 				failedBatches += (1 + (numOfBatches - batchCounter));	// The "failedBatches" will have the previously failedBatches + this one + the remaining batches which will likely fail too, thus, they will not be tested.
-				break;
+				break;	// The rest of the batches will likely fail as well.
 			}
 
 			// Get the extracted files.
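
The accounting on that break is compact; a worked example with assumed values shows what the expression adds:

    public class FailedBatchesSketch {
        public static void main(String[] args) {
            int numOfBatches = 5;    // assumption: illustrative totals
            int batchCounter = 3;    // the batch whose connection just threw
            int failedBatches = 1;   // one earlier batch had already failed

            // This batch counts as failed, and batches 4 and 5 are skipped since they would likely fail too:
            failedBatches += (1 + (numOfBatches - batchCounter));   // 1 (earlier) + 1 (this) + 2 (skipped) = 4

            System.out.println(failedBatches + " / " + numOfBatches + " batches failed");   // 4 / 5
        }
    }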
@@ -230,97 +230,13 @@ public class FileUtils {
 			fileUnZipper.unzipFolder(Paths.get(zipFileFullPath), curBatchPath);
 
 			String[] fileNames = new File(targetDirectory).list();
-			if ( (fileNames == null) || (fileNames.length <= 1 ) ) { // The directory might have only one file, the "zip-file".
+			if ( (fileNames == null) || (fileNames.length <= 1) ) { // The directory might have only one file, the "zip-file".
 				logger.error("No full-text fileNames were extracted from directory: " + targetDirectory);
 				failedBatches ++;
 				continue; // To the next batch.
 			}
 
-			// Iterate over the files and upload them to S3.
-			//int numUploadedFiles = 0;
-			for ( String fileName : fileNames )
-			{
-				String fileFullPath = targetDirectory + fileName;
-				if ( fileFullPath.equals(zipFileFullPath) ) // Exclude the zip-file from uploading.
-					continue;
-
-				// Check if this stored file is related to one or more Payloads from the Set. Defend against malicious file injection. It does not add more overhead, since we already need the "fileRelatedPayloads".
-				Set<Payload> fileRelatedPayloads = allFileNamesWithPayloads.get(fileName);
-				if ( fileRelatedPayloads.isEmpty() ) { // In case the "fileName" is not inside the "allFileNamesWithPayloads" HashMultimap.
-					logger.error("The stored file \"" + fileName + "\" is not related to any Payload returned from the Worker!");
-					continue;
-				}
-
-				// Let's try to upload the file to S3 and update the payloads, either in successful file-uploads (right-away) or not (in the end).
-				try {
-					// Prepare the filename as: "datasourceid/publicationid::hash.pdf"
-					// All related payloads point to this exact same file, BUT, may be related with different urlIDs, which in turn be related with different datasourceIDs.
-					// This file could have been found from different urlIds and thus be related to multiple datasourceIds.
-					// BUT, since the filename contains a specific urlID, the datasourceId should be the one related to that specific urlID.
-					// So, we extract this urlID, search the payload inside the "fileRelatedPayloads" and get the related datasourceID (instead of taking the first or a random datasourceID).
-
-					Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileName);
-					if ( ! matcher.matches() ) {
-						logger.error("Failed to match the \"" + fileName + "\" with the regex: " + FILENAME_ID_EXTENSION);
-						continue;
-					}
-					String filenameWithoutExtension = matcher.group(2);
-					if ( (filenameWithoutExtension == null) || filenameWithoutExtension.isEmpty() ) {
-						logger.error("Failed to extract the \"filenameWithoutExtension\" from \"" + fileName + "\".");
-						continue;
-					}
-					String fileNameID = matcher.group(3);
-					if ( (fileNameID == null) || fileNameID.isEmpty() ) {
-						logger.error("Failed to extract the \"fileNameID\" from \"" + fileName + "\".");
-						continue;
-					}
-					String fileExtension = matcher.group(4);
-					if ( (fileExtension == null) || fileExtension.isEmpty() ) {
-						logger.error("Failed to extract the \"fileExtension\" from \"" + fileName + "\".");
-						continue;
-					}
-
-					// This file is related with some payloads, in a sense that these payloads have urls which lead to the same full-text url.
-					// These payloads might have different IDs and sourceUrls. But, in the end, the different sourceUrls give the same full-text.
-					// Below, we make sure we pick the "datasource" from the payload, which has the same id as the full-text's name.
-					// If there are multiple payloads with the same id, which point to the same file, then we can take whatever datasource we want from those payloads.
-					// It is possible that payloads with same IDs, but different sourceUrls pointing to the same full-text, can be related with different datasources
-					// (especially for IDs of type: "doiboost_____::XXXXXXXXXXXXXXXXXXXXX")
-					// It does not really matter, since the first-ever payload to give this full-text could very well be another one,
-					// since the crawling happens in multiple threads which compete with each other for CPU time.
-
-					String datasourceId = null;
-					String hash = null;
-					boolean isFound = false;
-					for ( Payload payload : fileRelatedPayloads ) {
-						if ( fileNameID.equals(payload.getId()) ) {
-							datasourceId = payload.getDatasourceId();
-							hash = payload.getHash();
-							isFound = true;
-							break;
-						}
-					}
-
-					if ( ! isFound ) { // This should never normally happen. If it does, then a very bad change will have taken place.
-						logger.error("The \"fileNameID\" (" + fileNameID + ") was not found inside the \"fileRelatedPayloads\" for fileName: " + fileName);
-						continue;
-					}
-
-					// Use the "fileID" and not the "filenameWithoutExtension", as we want to avoid keeping the possible "parenthesis" with the increasing number (about the duplication of ID-fileName).
-					// Now we append the file-hash, so it is guaranteed that the filename will be unique.
-					fileName = datasourceId + "/" + fileNameID + "::" + hash + fileExtension;
-
-					String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath);
-					setFullTextForMultiplePayloads(fileRelatedPayloads, s3Url);
-					//numUploadedFiles ++;
-				} catch (Exception e) {
-					logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore, exception: " + e.getMessage(), e);
-				}
-				// Else, the record will have its file-data set to "null", in the end of this method.
-			}
-
-			//logger.debug("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -1) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore.");
-			// (fileNames.length -1) --> minus the zip-file
+			uploadFullTexts(fileNames, targetDirectory, zipFileFullPath, allFileNamesWithPayloads);
 
 		} catch (Exception e) {
 			logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + "\n" + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
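
The FILENAME_ID_EXTENSION pattern itself is defined elsewhere in FileUtils and is not part of this diff. The sketch below uses an assumed pattern that merely reproduces the group numbering the code relies on: group(2) for the name without extension, group(3) for the ID, group(4) for the dot-extension:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class FilenameParseSketch {
        // Assumed reconstruction of FILENAME_ID_EXTENSION; the real pattern lives
        // outside this diff. An optional "(n)" suffix marks duplicated ID-fileNames.
        private static final Pattern FILENAME_ID_EXTENSION =
                Pattern.compile("((([\\w:]+)(?:\\(\\d+\\))?)(\\.\\w+))");

        public static void main(String[] args) {
            Matcher matcher = FILENAME_ID_EXTENSION.matcher("doiboost_____::abc123(2).pdf");
            if ( matcher.matches() ) {
                System.out.println(matcher.group(2));   // doiboost_____::abc123(2)  (name without extension)
                System.out.println(matcher.group(3));   // doiboost_____::abc123     (the "fileNameID")
                System.out.println(matcher.group(4));   // .pdf                      (the "dotFileExtension")
            }
        }
    }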
@@ -332,7 +248,7 @@ public class FileUtils {
 		deleteDirectory(curAssignmentsBaseDir);
 
 		if ( failedBatches == numOfBatches ) {
 			logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
 			return UploadFullTextsResponse.unsuccessful;
 		} else
 			return UploadFullTextsResponse.successful;
@@ -368,6 +284,92 @@ public class FileUtils {
 	}
 
 
+	private void uploadFullTexts(String[] fileNames, String targetDirectory, String zipFileFullPath, HashMultimap<String, Payload> allFileNamesWithPayloads)
+	{
+		// Iterate over the files and upload them to S3.
+		//int numUploadedFiles = 0;
+		for ( String fileName : fileNames )
+		{
+			String fileFullPath = targetDirectory + fileName;
+			if ( fileFullPath.equals(zipFileFullPath) ) // Exclude the zip-file from uploading.
+				continue;
+
+			// Check if this stored file is related to one or more Payloads from the Set. Defend against malicious file injection. It does not add more overhead, since we already need the "fileRelatedPayloads".
+			Set<Payload> fileRelatedPayloads = allFileNamesWithPayloads.get(fileName);
+			if ( fileRelatedPayloads.isEmpty() ) { // In case the "fileName" is not inside the "allFileNamesWithPayloads" HashMultimap.
+				logger.error("The stored file \"" + fileName + "\" is not related to any Payload returned from the Worker!");
+				continue;
+			}
+
+			// Let's try to upload the file to S3 and update the payloads, either in successful file-uploads (right away) or not (in the end).
+			try {
+				// Prepare the filename as: "datasourceid/publicationid::hash.pdf"
+				// All related payloads point to this exact same file, BUT they may be related with different urlIDs, which in turn may be related with different datasourceIDs.
+				// This file could have been found from different urlIds and thus be related to multiple datasourceIds.
+				// BUT, since the filename contains a specific urlID, the datasourceId should be the one related to that specific urlID.
+				// So, we extract this urlID, search the payload inside the "fileRelatedPayloads" and get the related datasourceID (instead of taking the first or a random datasourceID).
+
+				Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileName);
+				if ( !matcher.matches() ) {
+					logger.error("Failed to match the \"" + fileName + "\" with the regex: " + FILENAME_ID_EXTENSION);
+					continue;
+				}
+				// The "matcher.group(2)" returns the "filenameWithoutExtension", which is currently not used.
+				String fileNameID = matcher.group(3);
+				if ( (fileNameID == null) || fileNameID.isEmpty() ) {
+					logger.error("Failed to extract the \"fileNameID\" from \"" + fileName + "\".");
+					continue;
+				}
+				String dotFileExtension = matcher.group(4);
+				if ( (dotFileExtension == null) || dotFileExtension.isEmpty() ) {
+					logger.error("Failed to extract the \"dotFileExtension\" from \"" + fileName + "\".");
+					continue;
+				}
+
+				// This file is related with some payloads, in the sense that these payloads have urls which lead to the same full-text url.
+				// These payloads might have different IDs and sourceUrls. But, in the end, the different sourceUrls give the same full-text.
+				// Below, we make sure we pick the "datasource" from the payload which has the same id as the full-text's name.
+				// If there are multiple payloads with the same id, which point to the same file, then we can take whatever datasource we want from those payloads.
+				// It is possible that payloads with the same IDs, but different sourceUrls pointing to the same full-text, can be related with different datasources
+				// (especially for IDs of type: "doiboost_____::XXXXXXXXXXXXXXXXXXXXX").
+				// It does not really matter, since the first-ever payload to give this full-text could very well be another one,
+				// since the crawling happens in multiple threads which compete with each other for CPU time.
+
+				String datasourceId = null;
+				String hash = null;
+				boolean isFound = false;
+				for ( Payload payload : fileRelatedPayloads ) {
+					if ( fileNameID.equals(payload.getId()) ) {
+						datasourceId = payload.getDatasourceId();
+						hash = payload.getHash();
+						isFound = true;
+						break;
+					}
+				}
+
+				if ( !isFound ) { // This should never normally happen. If it does, then a very bad change will have taken place.
+					logger.error("The \"fileNameID\" (" + fileNameID + ") was not found inside the \"fileRelatedPayloads\" for fileName: " + fileName);
+					continue;
+				}
+
+				// Use the "fileNameID" and not the "filenameWithoutExtension", as we want to avoid keeping the possible "parenthesis" with the increasing number (about the duplication of the ID-fileName).
+				// Now we append the file-hash, so it is guaranteed that the filename will be unique.
+				fileName = datasourceId + "/" + fileNameID + "::" + hash + dotFileExtension;
+
+				String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath);
+				setFullTextForMultiplePayloads(fileRelatedPayloads, s3Url);
+				//numUploadedFiles ++;
+			} catch (Exception e) {
+				logger.error("Could not upload the file \"" + fileName + "\" to the S3 ObjectStore!", e);
+			}
+			// Else, the record will have its file-data set to "null", at the end of this method.
+		}
+
+		//logger.debug("Finished uploading " + numUploadedFiles + " full-texts (out of " + (fileNames.length -1) + " distinct files) from assignments_" + assignmentsBatchCounter + ", batch_" + batchCounter + " on S3-ObjectStore.");
+		// (fileNames.length -1) --> minus the zip-file
+	}
+
+
 	public String getMessageFromResponseBody(HttpURLConnection conn, boolean isError) {
 		final StringBuilder msgStrB = new StringBuilder(500);
 		try ( BufferedReader br = new BufferedReader(new InputStreamReader((isError ? conn.getErrorStream() : conn.getInputStream()))) ) { // Try-with-resources
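
A note on the new method's last parameter: allFileNamesWithPayloads is a Guava HashMultimap, which is what lets one stored filename map to several payloads and what makes the isEmpty() check above safe. A minimal, self-contained sketch, with payloads simplified to strings since the Payload class is outside this diff:

    import com.google.common.collect.HashMultimap;
    import java.util.Set;

    public class MultimapSketch {
        public static void main(String[] args) {
            // One file can be reached from several source URLs, hence several payloads per name.
            HashMultimap<String, String> allFileNamesWithPayloads = HashMultimap.create();
            allFileNamesWithPayloads.put("id123.pdf", "payload-from-sourceUrl-A");   // illustrative values
            allFileNamesWithPayloads.put("id123.pdf", "payload-from-sourceUrl-B");

            Set<String> fileRelatedPayloads = allFileNamesWithPayloads.get("id123.pdf");
            System.out.println(fileRelatedPayloads.size());   // 2

            // A name that was never put() yields an empty set, never null --
            // which is why the method checks isEmpty() instead of null.
            System.out.println(allFileNamesWithPayloads.get("unknown.pdf").isEmpty());   // true
        }
    }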