// Fragment (review): non-contiguous excerpt. Line 1 is the try-body (CTAS compacting <tableName> into
// "<tableName>_tmp", stored as parquet); lines 2-3 belong to the matching catch-block (try/catch lines elided).
// NOTE(review): the SQL is built by string concatenation — presumably "tableName", "whereClause" and
// "parameter" come from trusted internal config, not user input; confirm at the call site.
jdbcTemplate.execute("CREATE TABLE "+ImpalaConnector.databaseName+"."+tableName+"_tmp stored as parquet AS SELECT * FROM "+ImpalaConnector.databaseName+"."+tableName+""+whereClause+parameter);
errorMsg="Problem when copying the contents of \""+tableName+"\" table to a newly created \""+tableName+"_tmp\" table, when merging the parquet-files!\n";
logger.error(errorMsg,e);
// Best-effort cleanup: make sure we delete the possibly half-created temp-table.
// BUGFIX: the original SQL literal was "DROP TABLE IF EXISTS" with NO trailing space, producing the
// invalid statement "DROP TABLE IF EXISTS<db>.<table>_tmp PURGE" — so this cleanup always failed.
// (Sibling literals such as "CREATE TABLE " keep their in-string spaces, confirming this was a real bug.)
try {
    jdbcTemplate.execute("DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + "." + tableName + "_tmp PURGE");
    // We cannot move on with merging, but no harm happened, since the "table_tmp" name is still reserved for future use (after it was dropped immediately)..
} catch (Exception e1) {
    logger.error("Failed to drop the \"" + tableName + "_tmp\" table!", e1);
    // TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
}
returnerrorMsg;// We return only the initial error to the Worker, which is easily distinguished inside the "merge-queries".
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
}
// Rename the temp-table to have the initial-table's name.
// NOTE(review): at this point the original table has already been dropped (out of view above), so a crash
// between the drop and this rename leaves only "<tableName>_tmp" behind — the catch below tries to recover.
try{
jdbcTemplate.execute("ALTER TABLE "+ImpalaConnector.databaseName+"."+tableName+"_tmp RENAME TO "+ImpalaConnector.databaseName+"."+tableName);
}catch(Exceptione){
errorMsg="Problem in renaming the \""+tableName+"_tmp\" table to \""+tableName+"\", when merging the parquet-files!\n";
logger.error(errorMsg,e);
// At this point we only have a "temp-table", the original is already deleted..
// Try to create the original, as a copy of the temp-table. If that succeeds, then try to delete the temp-table.
// Recovery path: recreate the original table as a copy of the temp-table; if even that fails,
// probe whether the original table exists at all, so the log tells the operator what manual action is needed.
try{
jdbcTemplate.execute("CREATE TABLE "+ImpalaConnector.databaseName+"."+tableName+" stored as parquet AS SELECT * FROM "+ImpalaConnector.databaseName+"."+tableName+"_tmp");
}catch(Exceptione1){
errorMsg="Problem when copying the contents of \""+tableName+"_tmp\" table to a newly created \""+tableName+"\" table, when merging the parquet-files!\n";
logger.error(errorMsg,e1);
// If the original table was not created, then we have to intervene manually, if it was created but without any data, then we can safely move on handling other assignments and workerReports, but the data will be lost! So this workerReport failed to be handled.
try{// The below query normally returns a list, as it takes a "regex-pattern" as an input. BUT, we give just the table name, without wildcards. So the result is either the tableName itself or none (not any other table).
// NOTE(review): "queryForObject" expects exactly one row; it throws EmptyResultDataAccessException when the
// table is absent (the case handled below) — presumably the single-row result maps to List.class here; verify.
jdbcTemplate.queryForObject("SHOW TABLES IN "+ImpalaConnector.databaseName+" LIKE '"+tableName+"'",List.class);
}catch(EmptyResultDataAccessExceptionerdae){
// The table does not exist, so it was not even half-created by the previous query.
// Not having the original table anymore is a serious error. A manual action is needed!
// NOTE(review): "errorMsg+=" deliberately mutates the outer errorMsg inside the log call,
// so the appended manual-action text is also part of the value returned below.
logger.error((errorMsg+="The original table \""+tableName+"\" must be created manually! Serious problems may appear otherwise!"));
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
}
// Here, the original-table exists in the DB, BUT without any data inside! This worker Report failed to be handled! (some of its data could not be loaded to the database, and all previous data was lost).
returnerrorMsg;
}
// Fragment (review): the lines below come from at least three DIFFERENT elided contexts:
// (a) L39-41: tail of the rename-recovery / "COMPUTE STATS" error handling,
// (b) L42-44: setup of the hash-matching structures for already-uploaded full-texts,
// (c) L45-49: body of the per-record hash-lookup callable. Read them independently.
// The creation of the original table was successful. Try to delete the temp-table.
logger.error("Problem when gathering information from table \""+tableName+"\" to be used for queries-optimization.",e);
// In this case the error is not so important to the whole operation.. It's only that the performance of this specific table will be less optimal, only temporarily, unless every "COMPUTE STATS" query fails for future workerReports too.
// TODO - Unify this ExecutorService with the hash-matching executorService. Since one will ALWAYS be called after the other. So why having two ExecServices to handle?
SetMultimap<String,Payload>allFileNamesWithPayloads=Multimaps.synchronizedSetMultimap(HashMultimap.create((urlReportsSize/5),3));// Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
// NOTE(review): this lookup correctly uses a parameterized "?" placeholder — unlike the concatenated DDL above.
finalStringgetFileLocationForHashQuery="select `location` from "+ImpalaConnector.databaseName+".payload"+(isTestEnvironment?"_aggregated":"")+" where `hash` = ? limit 1";
returnnull;// The full-text was not retrieved, go to the next UrlReport.
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
// If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker.
// If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker.
// No fileLocation is found, it's ok. It will be null by default.
// Catch for the hash-lookup query (its try opens out of view). The exception is intentionally
// swallowed: a failed duplicate-check only means the file will be re-requested from the Worker.
}catch(Exceptione){
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n",e);
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
// TODO - The idea is that since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
// Unless we do what it is said above, do not continue to the next UrlReport, this query-exception should not disrupt the normal full-text processing.
}
// Fragment (review): L56-59 are the "duplicate file already on S3" branch; L60-61 belong to elided
// regex-matching failure branches for "fileLocation"; L62-63 register the fileName->payload mapping.
if(alreadyFoundFileLocation!=null){// If the full-text of this record is already-found and uploaded.
payload.setLocation(alreadyFoundFileLocation);// Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
logger.trace("The record with ID \""+payload.getId()+"\" has an \"alreadyRetrieved\" file, with hash \""+fileHash+"\" and location \""+alreadyFoundFileLocation+"\".");// DEBUG!
returnnull;// Do not request the file from the worker, it's already uploaded. Move on. The "location" will be filled by the "setFullTextForMultiplePayloads()" method, later.
logger.error("Failed to match the \"fileLocation\": \""+fileLocation+"\" of id: \""+payload.getId()+"\", originalUrl: \""+payload.getOriginal_url()+"\", using this regex: "+FILENAME_ID_EXTENSION);
logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \""+fileLocation+"\", of id: \""+payload.getId()+"\", originalUrl: \""+payload.getOriginal_url()+"\", using this regex: "+FILENAME_ID_EXTENSION);
allFileNamesWithPayloads.put(fileNameWithExtension,payload);// The keys and the values are not duplicate.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1, thus, the ID-2 file was not downloaded again.
// Waits on each hash-check task; the Callable is typed Void, so get() only propagates exceptions.
Voidresult=future.get();// The result is always "null" as we have a "Void" type.
}catch(Exceptione){
// NOTE(review): empty log message — give it context (e.g. which future/batch failed) so this
// entry is findable in the logs. The stacktrace alone is logged via the throwable argument.
logger.error("",e);
}
}
// catch/finally tail of the parallel hash-checking (the try opens out of view).
// NOTE(review): on InterruptedException the interrupt status should be restored with
// Thread.currentThread().interrupt() before moving on — otherwise the interrupt is silently lost.
}catch(InterruptedExceptionie){// In this case, any unfinished tasks are cancelled.
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish checking for already-found file-hashes: "+ie.getMessage());
// This is a very rare case. At the moment, we just move on with what we have so far.
}catch(Exceptione){
StringerrorMsg="Unexpected error when checking for already-found file-hashes in parallel! "+e.getMessage();
logger.error(errorMsg,e);
returnUploadFullTextsResponse.unsuccessful;
}finally{
// Unlock in "finally" so the DB lock is released on every exit path above.
ImpalaConnector.databaseLock.unlock();// The remaining work of this function does not use the database.
logger.info("NumFullTextsFound by assignments_"+assignmentsBatchCounter+" = "+numFullTextsFound.get()+" (out of "+urlReportsSize+" | about "+df.format(numFullTextsFound.get()*100.0/urlReportsSize)+"%).");
// Batch planning: the two debug lines are the two branches (with / without a final partial batch);
// the "else" between them is elided from this view.
if(remainingFiles>0){// Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
logger.debug("The assignments_"+assignmentsBatchCounter+" have "+numAllFullTexts+" distinct non-already-uploaded fullTexts (total is: "+numFullTextsFound.get()+"). Going to request them from the Worker \""+workerId+"\", in "+numOfBatches+" batches ("+numOfFullTextsPerBatch+" files each, except for the final batch, which will have "+remainingFiles+" files).");
logger.debug("The assignments_"+assignmentsBatchCounter+" have "+numAllFullTexts+" distinct non-already-uploaded fullTexts (total is: "+numFullTextsFound.get()+"). Going to request them from the Worker \""+workerId+"\", in "+numOfBatches+" batches ("+numOfFullTextsPerBatch+" files each).");
// TODO - Currently, for big assignments (e.g. 10000), it takes 2 mins (actually 1,5 mins after using the Zstandard compression) for the worker to zstd the files and return them FOR EACH BATCH
// Also it takes around 3 mins for the Controller to process the received files FOR EACH BATCH
// So, for 24 batches, it takes around 24 * 2 * 3 = 144 mins to process all the full-texts for each assignments-batch.
// The worker will not have 2 parallel requests for zstd files, so the single CPU there will not be stressed to zstd many files in parallel.
// Yes the Controller may have the situation in which before finishing uploading the previously receive files to S3, it receives the new zstd from the Worker.
// TODO - BUT, we can make the new thread "WAIT" for the previous to finish.
// The base-directory will be created along with the first batch-directory.
// Fragment (review): failure handling for batch-directory creation (L89-92), extraction sanity-checks
// (L93-97, "fleNames" typo is in the runtime log string) and batch-level error/warn logging (L98-100).
// The branches between these lines are elided from this view.
}catch(Exceptione){
logger.error("Could not create the \"curBatchPath\" directory: "+targetDirectory+"\n"+e.getMessage(),e);// It shows the response body (after Spring v.2.5.6).
failedBatches+=(1+(numOfBatches-batchCounter));// The "failedBatches" will have the previously failedBatches + this one + the remaining batches which will likely fail too, thus, they will not be tested. Some initial batches may have succeeded.
updateUrlReportsToHaveNoFullTextFiles(urlReports,true);// Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed).
if((extractedFileNames==null)||(extractedFileNames.length<=2)){// The directory might have only two files, the "tar-file" and the "tar.zstd-file", if the full-texts failed to be decompressed or untarred..
logger.error("No full-texts' fleNames where extracted from directory: "+targetDirectory);
logger.warn("The number of extracted files ("+(extractedFileNames.length-2)+") was not equal to the number of the current-batch's ("+batchCounter+") files ("+fileNamesForCurBatch.size()+").");
// We do NOT have to find and cross-reference the missing files with the urlReports, in order to set their locations to <null>,
// since, in the end of each assignments-batch, an iteration will be made and for all the non-retrieved and non-uploaded full-texts, the app will set them to null.
logger.error("Could not extract and upload the full-texts for batch_"+batchCounter+" of assignments_"+assignmentsBatchCounter+"\n"+e.getMessage(),e);// It shows the response body (after Spring v.2.5.6).
//logger.debug("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]");
logger.warn("HTTP-"+statusCode+": "+getMessageFromResponseBody(conn,true)+"\n\nProblem when requesting the ZstdFile of batch_"+batchNum+" from the Worker with ID \""+workerId+"\" and requestUrl: "+requestUrl);
// A 5xx response means the Worker itself has problems, so all remaining batches will fail as well — abort.
// Give the exception a message: a bare "new RuntimeException()" leaves nothing actionable in the
// catcher's logs. (Type kept as RuntimeException so existing catch-sites are unaffected.)
if ((statusCode >= 500) && (statusCode <= 599))
    throw new RuntimeException("The Worker responded with an HTTP-" + statusCode + " server-error; the remaining batches are expected to fail as well.");
logger.warn("Problem when requesting the ZstdFile of batch_"+batchNum+" of assignments_"+assignmentsBatchCounter+" from the Worker with ID \""+workerId+"\" and requestUrl: "+requestUrl+"\n"+exMessage);
// Check if this stored file is related to one or more Payloads from the Set. Defend against malicious file injection. It does not add more overhead, since we already need the "fileRelatedPayloads".
if(fileRelatedPayloads.isEmpty()){// In case the "fileName" is not inside the "allFileNamesWithPayloads" HashMultimap.
logger.error("The stored file \""+fileName+"\" is not related to any Payload returned from the Worker!");
// Skip unexpected files entirely — they were never requested, so nothing references them.
continue;
}
// Let's try to upload the file to S3 and update the payloads, either in successful file-uploads (right-away) or not (in the end).
// Fragment (review): the try below does not close within this view; L116-117 are a guard from a
// deeper, elided null-check on the extracted "datasourceId".
try{
// Prepare the filename as: "datasourceid/publicationid::hash.pdf"
// All related payloads point to this exact same file, BUT, may be related with different urlIDs, which in turn be related with different datasourceIDs.
// This file could have been found from different urlIds and thus be related to multiple datasourceIds.
// BUT, since the filename contains a specific urlID, the datasourceId should be the one related to that specific urlID.
// So, we extract this urlID, search the payload inside the "fileRelatedPayloads" and get the related datasourceID (instead of taking the first or a random datasourceID).
logger.error("The retrieved \"datasourceId\" was \"null\" for file: "+fileName);
returnnull;
}
// Guard: without the file-hash the unique objectStore name below cannot be built — skip this file.
if(hash==null){
logger.error("The retrieved \"hash\" was \"null\" for file: "+fileName);
returnnull;
}
StringfileFullPath=fileDir+File.separator+fileName;// The fullPath to the local file.
// Use the "fileNameID" and not the "filenameWithoutExtension", as we want to avoid keeping the possible "parenthesis" with the increasing number (about the duplication of ID-fileName).
// Now we append the file-hash, so it is guaranteed that the filename will be unique.
fileName=datasourceId+"/"+openAireID+"::"+hash+dotFileExtension;// This is the fileName to be used in the objectStore, not of the local file!
// Mark this full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text ("actual_url") will be kept, for now.
payload.setLocation(null);// This will cause the payload to not be inserted into the "payload" table in the database. Only the "attempt" record will be inserted.