forked from lsmyrnaios/UrlsController
- Handle the case when some results have been found from the "getAssignmentsQuery", but no data could be extracted from them.
- Code polishing.
This commit is contained in:
parent ede7ca5a89
commit 865926fbc3
@@ -199,8 +199,17 @@ public class UrlsServiceImpl implements UrlsService {
 			return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
 		}
 
-		int assignmentsSize = assignments.size();	// It will not be zero here! As in case of no results, the "EmptyResultDataAccessException" is thrown and handled.
-		if ( assignmentsSize < assignmentsLimit )
+		int assignmentsSize = assignments.size();
+		if ( assignmentsSize == 0 ) {
+			String tmpErrMsg = dropCurrentAssignmentTable();
+			DatabaseConnector.databaseLock.unlock();
+			errorMsg = "Some results were retrieved from the \"getAssignmentsQuery\", but no data could be extracted from them, for worker with id: " + workerId;
+			if ( tmpErrMsg != null )
+				errorMsg += "\n" + tmpErrMsg;
+			logger.error(errorMsg);
+			return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
+		}
+		else if ( assignmentsSize < assignmentsLimit )
 			logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + ", for the next requests.");
 
 		logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");
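Taken together, the new branch separates "the query returned rows but nothing usable was extracted" (now a hard error that drops the temporary table, releases the database lock and returns a 500) from "fewer rows than the limit" (still only a warning). A minimal, runnable sketch of that control flow; dropCurrentAssignmentTable, databaseLock and workerId come from the diff, while the scaffolding around them is hypothetical:

import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class AssignmentsGuardSketch {

    private static final ReentrantLock databaseLock = new ReentrantLock();

    // Hypothetical stand-in for dropCurrentAssignmentTable(): returns an error message, or null on success.
    private static String dropCurrentAssignmentTable() {
        return null;
    }

    // Returns an error string (standing in for the 500 ResponseEntity) or null when the assignments are usable.
    static String checkAssignments(List<String> assignments, int assignmentsLimit, String workerId) {
        int assignmentsSize = assignments.size();
        if ( assignmentsSize == 0 ) {   // Rows came back, but no data could be extracted from them.
            String tmpErrMsg = dropCurrentAssignmentTable();    // Clean up before reporting the failure.
            databaseLock.unlock();      // Release the lock taken before the query, as in the diff.
            String errorMsg = "Some results were retrieved from the \"getAssignmentsQuery\", but no data could be extracted from them, for worker with id: " + workerId;
            if ( tmpErrMsg != null )
                errorMsg += "\n" + tmpErrMsg;
            return errorMsg;
        }
        else if ( assignmentsSize < assignmentsLimit )
            System.out.println("Fewer results (" + assignmentsSize + ") than the limit (" + assignmentsLimit + ").");
        databaseLock.unlock();
        return null;    // Success path: the real code goes on to insert the assignments.
    }

    public static void main(String[] args) {
        databaseLock.lock();
        System.out.println(checkAssignments(List.of("a", "b"), 5, "worker_1"));
    }
}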
@@ -240,10 +249,9 @@ public class UrlsServiceImpl implements UrlsService {
 
 		final HashMap<String, Assignment> uniquePairsAndAssignments = new HashMap<>((int) (assignmentsLimit * 0.9));
 
-		for ( Assignment assignment : assignments ) {
+		for ( Assignment assignment : assignments )
 			uniquePairsAndAssignments.put(assignment.getId() + "_" + assignment.getOriginalUrl(), assignment);
 			// This will just update the duplicate record with another "assignment object", containing a different datasource.
-		}
 
 		List<Assignment> distinctAssignments = new ArrayList<>(uniquePairsAndAssignments.values());
 		int distinctAssignmentsSize = distinctAssignments.size();
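The loop now drops its braces: the body is the single put(...) call, and duplicate id + "_" + originalUrl keys simply overwrite the previous entry, so each (record-id, url) pair survives once, carrying whichever assignment (and datasource) was seen last. A self-contained sketch of that de-duplication, with a minimal hypothetical Assignment record standing in for the real class:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class DistinctAssignmentsSketch {

    // Minimal hypothetical stand-in for the real Assignment class.
    record Assignment(String id, String originalUrl, String datasource) {}

    public static void main(String[] args) {
        List<Assignment> assignments = List.of(
                new Assignment("1", "http://a.org/x", "ds1"),
                new Assignment("1", "http://a.org/x", "ds2"),   // Duplicate (id, url) pair: overwrites the previous entry.
                new Assignment("2", "http://b.org/y", "ds1"));

        HashMap<String, Assignment> uniquePairsAndAssignments = new HashMap<>();
        for ( Assignment assignment : assignments )
            uniquePairsAndAssignments.put(assignment.id() + "_" + assignment.originalUrl(), assignment);
            // A duplicate key just updates the entry with another "assignment object", containing a different datasource.

        List<Assignment> distinctAssignments = new ArrayList<>(uniquePairsAndAssignments.values());
        System.out.println(distinctAssignments.size() + " distinct pairs: " + distinctAssignments);  // Prints 2 pairs.
    }
}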
@@ -384,7 +392,7 @@ public class UrlsServiceImpl implements UrlsService {
 		}
 
 		// Delete the assignments each time, as they are bound to the "current" assignmentsCounter. Otherwise, they will never be deleted!
-		// If this method exits sooner, due tio an error, then the assignments are not deleted in order to wait for the schedulers to retry them and not be given to workers, to avoid reprocessing the urls.
+		// If this method exits sooner, due to an error, then the assignments are not deleted in order to wait for the schedulers to retry them and not be given to workers, to avoid reprocessing the urls.
 		DatabaseConnector.databaseLock.lock();
 		String deleteErrorMsg = deleteAssignmentsBatch(curReportAssignmentsCounter);
 		DatabaseConnector.databaseLock.unlock();
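For context, the delete runs between an explicit lock()/unlock() pair. The sketch below shows the same sequence under a try/finally guard, which keeps the lock from leaking if the delete throws; this is a common variant, not what the commit itself does, and deleteAssignmentsBatch is stubbed here:

import java.util.concurrent.locks.ReentrantLock;

public class DeleteAssignmentsSketch {

    private static final ReentrantLock databaseLock = new ReentrantLock();

    // Hypothetical stand-in for deleteAssignmentsBatch(): returns an error message, or null on success.
    private static String deleteAssignmentsBatch(long assignmentsCounter) {
        return null;
    }

    static String deleteUnderLock(long curReportAssignmentsCounter) {
        databaseLock.lock();
        try {
            // Delete the assignments bound to the "current" assignmentsCounter, as in the diff.
            return deleteAssignmentsBatch(curReportAssignmentsCounter);
        } finally {
            databaseLock.unlock();  // Guaranteed even if the delete throws; the diff unlocks explicitly instead.
        }
    }

    public static void main(String[] args) {
        System.out.println("deleteErrorMsg = " + deleteUnderLock(42));
    }
}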
@@ -297,9 +297,6 @@ public class FileUtils {
 			DatabaseConnector.databaseLock.unlock();	// The remaining work of this function does not use the database.
 		}
 
-		logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextsFound.get() + " (out of " + sizeOfUrlReports + " | about " + df.format(numFullTextsFound.get() * 100.0 / sizeOfUrlReports) + "%).");
-		logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches.get());
-
 		ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet());
 		int numAllFullTexts = allFileNames.size();
 		if ( numAllFullTexts == 0 ) {
@@ -307,6 +304,9 @@ public class FileUtils {
 			return UploadFullTextsResponse.successful;	// It was handled, no error.
 		}
 
+		logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextsFound.get() + " (out of " + sizeOfUrlReports + " | about " + df.format(numFullTextsFound.get() * 100.0 / sizeOfUrlReports) + "%).");
+		logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches.get());
+
 		// Request the full-texts in batches, compressed in a zstd tar file.
 		int numOfBatches = (numAllFullTexts / numOfFullTextsPerBatch);
 		int remainingFiles = (numAllFullTexts % numOfFullTextsPerBatch);
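These two hunks move the logging to after the early-return for the zero-full-texts case, and the two integers that follow split numAllFullTexts into full batches plus a remainder. A tiny sketch of the arithmetic with made-up numbers; the extra partial batch for the remainder is an assumption about the surrounding code, which the diff does not show:

public class BatchPartitionSketch {

    public static void main(String[] args) {
        // Hypothetical values: 2350 full-texts requested in batches of 500.
        int numAllFullTexts = 2350;
        int numOfFullTextsPerBatch = 500;

        int numOfBatches = (numAllFullTexts / numOfFullTextsPerBatch);     // 4 full batches
        int remainingFiles = (numAllFullTexts % numOfFullTextsPerBatch);   // 350 files left over
        if ( remainingFiles > 0 )
            numOfBatches++;     // One extra, partial batch for the remainder (an assumption, not shown in the diff).

        System.out.println("batches = " + numOfBatches + ", remainder = " + remainingFiles);
    }
}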
@@ -375,10 +375,10 @@ public class FileUtils {
 
 		// Check and warn about the number of failed payloads.
 		// Possible reasons: failed to check their hash in the DB, the file was not found inside the worker, whole batch failed to be delivered from the worker, files failed t be uploaded to S3
-		long finalPayloadsCounter = urlReports.parallelStream().filter(urlReport -> {
-			Payload payload = urlReport.getPayload();
-			return ((payload != null) && (payload.getLocation() != null));
-		}).count();
+		// Retrieve the payloads from the existing urlReports.
+		long finalPayloadsCounter = urlReports.parallelStream()
+				.map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null)))
+				.count();
 		int numInitialPayloads = numPayloadsToBeHandled.get();
 		long numFailedPayloads = (numInitialPayloads - finalPayloadsCounter);
 		if ( numFailedPayloads == numInitialPayloads ) {
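Both versions count the reports whose payload carries a location; the rewrite maps each report to its payload first, so the filter reads the intent directly and the lambda stays flat. A self-contained comparison using minimal hypothetical UrlReport/Payload records in place of the real classes:

import java.util.List;

public class PayloadCounterSketch {

    record Payload(String location) {}
    record UrlReport(Payload payload) {}    // Minimal hypothetical stand-ins for the real classes.

    public static void main(String[] args) {
        List<UrlReport> urlReports = List.of(
                new UrlReport(new Payload("s3://bucket/file1")),
                new UrlReport(new Payload(null)),       // Payload present, but no location: not counted.
                new UrlReport(null));                   // No payload at all: not counted.

        // Old form: filter on the report, extracting the payload inside the lambda.
        long before = urlReports.parallelStream().filter(urlReport -> {
            Payload payload = urlReport.payload();
            return ((payload != null) && (payload.location() != null));
        }).count();

        // New form: map to the payload first, then filter; same result, flatter lambda.
        long after = urlReports.parallelStream()
                .map(UrlReport::payload).filter(payload -> ((payload != null) && (payload.location() != null)))
                .count();

        System.out.println(before + " == " + after);    // Both print 1.
    }
}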
@@ -420,7 +420,7 @@ public class FileUtils {
 			logger.error("No full-texts' fleNames where extracted from directory: " + targetDirectory);
 			return false;
 		} else if ( (extractedFileNames.length - 2) != fileNamesForCurBatch.size() ) {
-			logger.warn("The number of extracted files (" + (extractedFileNames.length - 2) + ") was not equal to the number of the current-batch's (" + batchCounter + ") files (" + fileNamesForCurBatch.size() + ").");
+			logger.warn("The number of extracted files (" + (extractedFileNames.length - 2) + ") was not equal to the number of files (" + fileNamesForCurBatch.size() + ") of the current batch_" + batchCounter);
 			// We do NOT have to find and cross-reference the missing files with the urlReports, in order to set their locations to <null>,
 			// since, in the end of each assignments-batch, an iteration will be made and for all the non-retrieved and non-uploaded full-texts, the app will set them to null.
 		}
@@ -681,7 +681,7 @@ public class FileUtils {
 	 * @param filePayloads
 	 * @param s3Url
 	 */
-	public void setFullTextForMultiplePayloads(Set<Payload> filePayloads, String s3Url) {
+	public void setFullTextForMultiplePayloads(@NotNull Set<Payload> filePayloads, String s3Url) {
 		for ( Payload payload : filePayloads )
 			if ( payload != null )
 				payload.setLocation(s3Url);	// Update the file-location to the new S3-url. All the other file-data is already set from the Worker.
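@NotNull documents that callers must not pass a null set; it does not constrain the set's elements, which is why the per-element null-check stays. A sketch of the annotated method; the annotation provider (org.jetbrains.annotations here) is an assumption, since the diff does not show the project's imports, and a minimal Payload stand-in replaces the real class:

import java.util.HashSet;
import java.util.Set;

import org.jetbrains.annotations.NotNull;   // Assumed provider; the diff does not show which @NotNull the project imports.

public class SetFullTextSketch {

    // Minimal hypothetical stand-in for the real Payload class.
    static class Payload {
        private String location;
        void setLocation(String location) { this.location = location; }
        @Override public String toString() { return "Payload(location=" + location + ")"; }
    }

    // @NotNull covers the set reference only; it is enforced by tooling, not by the JVM at runtime,
    // so the element-level null-check below is still required.
    public static void setFullTextForMultiplePayloads(@NotNull Set<Payload> filePayloads, String s3Url) {
        for ( Payload payload : filePayloads )
            if ( payload != null )
                payload.setLocation(s3Url);     // Update the file-location to the new S3 URL.
    }

    public static void main(String[] args) {
        Set<Payload> payloads = new HashSet<>();
        payloads.add(new Payload());
        payloads.add(null);     // Tolerated by the element-level check.
        setFullTextForMultiplePayloads(payloads, "https://s3.example.org/file1");
        System.out.println(payloads);
    }
}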
@@ -736,4 +736,4 @@ public class FileUtils {
 		return null;
 	}
 
-}
+}