jdbcTemplate.execute("CREATE TABLE "+ImpalaConnector.databaseName+"."+tableName+"_tmp stored as parquet AS SELECT * FROM "+ImpalaConnector.databaseName+"."+tableName+""+whereClause+parameter);
// TODO - Unify this ExecutorService with the hash-matching executorService. Since one will ALWAYS be called after the other. So why have two ExecServices to handle?
SetMultimap<String,Payload>allFileNamesWithPayloads=Multimaps.synchronizedSetMultimap(HashMultimap.create((urlReportsSize/5),3));// Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
finalStringgetFileLocationForHashQuery="select `location` from "+ImpalaConnector.databaseName+".payload"+(isTestEnvironment?"_aggregated":"")+" where `hash` = ? limit 1";
returnnull;// The full-text was not retrieved, go to the next UrlReport.
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
// If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker.
// If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker.
// No fileLocation is found, it's ok. It will be null by default.
}catch(Exceptione){
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n",e);
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
// TODO - The idea is that since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
// Unless we do what is said above, we should continue to the next UrlReport; this query-exception should not disrupt the normal full-text processing.
}
if(alreadyFoundFileLocation!=null){// If the full-text of this record is already-found and uploaded.
payload.setLocation(alreadyFoundFileLocation);// Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
logger.trace("The record with ID \""+payload.getId()+"\" has an \"alreadyRetrieved\" file, with hash \""+fileHash+"\" and location \""+alreadyFoundFileLocation+"\".");// DEBUG!
returnnull;// Do not request the file from the worker, it's already uploaded. Move on. The "location" will be filled by the "setFullTextForMultiplePayloads()" method, later.
logger.error("Failed to match the \"fileLocation\": \""+fileLocation+"\" of id: \""+payload.getId()+"\", originalUrl: \""+payload.getOriginal_url()+"\", using this regex: "+FILENAME_ID_EXTENSION);
logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \""+fileLocation+"\", of id: \""+payload.getId()+"\", originalUrl: \""+payload.getOriginal_url()+"\", using this regex: "+FILENAME_ID_EXTENSION);
allFileNamesWithPayloads.put(fileNameWithExtension,payload);// The keys and the values are not duplicate.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1, thus, the ID-2 file was not downloaded again.
Voidresult=future.get();// The result is always "null" as we have a "Void" type.
}catch(Exceptione){
logger.error("",e);
}
}
}catch(InterruptedExceptionie){// In this case, any unfinished tasks are cancelled.
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish checking for already-found file-hashes: "+ie.getMessage());
// This is a very rare case. At the moment, we just move on with what we have so far.
}catch(Exceptione){
StringerrorMsg="Unexpected error when checking for already-found file-hashes in parallel! "+e.getMessage();
logger.error(errorMsg,e);
returnUploadFullTextsResponse.unsuccessful;
}finally{
ImpalaConnector.databaseLock.unlock();// The remaining work of this function does not use the database.
logger.info("NumFullTextsFound by assignments_"+assignmentsBatchCounter+" = "+numFullTextsFound.get()+" (out of "+urlReportsSize+" | about "+df.format(numFullTextsFound.get()*100.0/urlReportsSize)+"%).");
if(remainingFiles>0){// Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
logger.debug("The assignments_"+assignmentsBatchCounter+" have "+numAllFullTexts+" distinct non-already-uploaded fullTexts (total is: "+numFullTextsFound.get()+"). Going to request them from the Worker \""+workerId+"\", in "+numOfBatches+" batches ("+numOfFullTextsPerBatch+" files each, except for the final batch, which will have "+remainingFiles+" files).");
logger.debug("The assignments_"+assignmentsBatchCounter+" have "+numAllFullTexts+" distinct non-already-uploaded fullTexts (total is: "+numFullTextsFound.get()+"). Going to request them from the Worker \""+workerId+"\", in "+numOfBatches+" batches ("+numOfFullTextsPerBatch+" files each).");
failedBatches+=(1+(numOfBatches-batchCounter));// The "failedBatches" will have the previously failedBatches + this one + the remaining batches which will likely fail too, thus, they will not be tested.
if((extractedFileNames==null)||(extractedFileNames.length<=2)){// The directory might have only two files, the "tar-file" and the "tar.zstd-file", if the full-texts failed to be decompressed or untarred..
logger.error("No full-texts' fleNames where extracted from directory: "+targetDirectory);
logger.warn("The number of extracted files ("+(extractedFileNames.length-2)+") was not equal to the number of the current-batch's ("+batchCounter+") files ("+fileNamesForCurBatch.size()+").");
// We do NOT have to find and cross-reference the missing files with the urlReports, in order to set their locations to <null>,
// since, in the end of each assignments-batch, an iteration will be made and for all the non-retrieved and non-uploaded full-texts, the app will set them to null.
logger.error("Could not extract and upload the full-texts for batch_"+batchCounter+" of assignments_"+assignmentsBatchCounter+"\n"+e.getMessage(),e);// It shows the response body (after Spring v.2.5.6).
updateUrlReportsToHaveNoFullTextFiles(urlReports,true);// Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed).
//logger.debug("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]");
logger.warn("HTTP-"+statusCode+": "+getMessageFromResponseBody(conn,true)+"\n\nProblem when requesting the ZstdFile of batch_"+batchNum+" from the Worker with ID \""+workerId+"\" and requestUrl: "+requestUrl);
if((statusCode>=500)&&(statusCode<=599))
thrownewRuntimeException();// Throw an exception to indicate that the Worker has problems and all remaining batches will fail as well.
logger.warn("Problem when requesting the ZstdFile of batch_"+batchNum+" of assignments_"+assignmentsBatchCounter+" from the Worker with ID \""+workerId+"\" and requestUrl: "+requestUrl+"\n"+exMessage);
// Check if this stored file is related to one or more Payloads from the Set. Defend against malicious file injection. It does not add more overhead, since we already need the "fileRelatedPayloads".
if(fileRelatedPayloads.isEmpty()){// In case the "fileName" is not inside the "allFileNamesWithPayloads" HashMultimap.
logger.error("The stored file \""+fileName+"\" is not related to any Payload returned from the Worker!");
continue;
}
// Let's try to upload the file to S3 and update the payloads, either in successful file-uploads (right-away) or not (in the end).
try{
// Prepare the filename as: "datasourceid/publicationid::hash.pdf"
// All related payloads point to this exact same file, BUT, may be related with different urlIDs, which in turn be related with different datasourceIDs.
// This file could have been found from different urlIds and thus be related to multiple datasourceIds.
// BUT, since the filename contains a specific urlID, the datasourceId should be the one related to that specific urlID.
// So, we extract this urlID, search the payload inside the "fileRelatedPayloads" and get the related datasourceID (instead of taking the first or a random datasourceID).
logger.error("The retrieved \"datasourceId\" was \"null\" for file: "+fileName);
returnnull;
}
if(hash==null){
logger.error("The retrieved \"hash\" was \"null\" for file: "+fileName);
returnnull;
}
StringfileFullPath=fileDir+File.separator+fileName;// The fullPath to the local file.
// Use the "fileNameID" and not the "filenameWithoutExtension", as we want to avoid keeping the possible "parenthesis" with the increasing number (about the duplication of ID-fileName).
// Now we append the file-hash, so it is guaranteed that the filename will be unique.
fileName=datasourceId+"/"+openAireID+"::"+hash+dotFileExtension;// This is the fileName to be used in the objectStore, not of the local file!
// Mark this full-text as not-retrieved, since it will be deleted from local-storage. The retrieved link to the full-text ("actual_url") will be kept, for now.
payload.setLocation(null);// This will cause the payload to not be inserted into the "payload" table in the database. Only the "attempt" record will be inserted.