- Improve performance in "FileUtils.addUrlReportsByMatchingRecordsFromBacklog()".

- Make sure we remove the assignments of all "not-successful", old worker-reports: both those which failed to be renamed to indicate success or failure, and those which the background threads never executed (and which therefore never reached the renaming stage).
Lampros Smyrnaios 2023-10-23 12:21:42 +03:00
parent a7581335f1
commit 0c7bf6357b
3 changed files with 12 additions and 12 deletions


@@ -357,7 +357,7 @@ public class ScheduledTasks {
     logger.warn("The workerReport \"" + workerReportName + "\" was accessed " + elapsedDays + " days ago (passed the " + daysToWaitBeforeDeletion + " days limit) and will be deleted.");
     numWorkerReportsToBeHandled ++;
     if ( fileUtils.deleteFile(workerReportFile.getAbsolutePath())  // Either successful or failed.
-            && workerReportName.contains("failed")  // If this has failed, then delete the assignment-records. For the successful, they have already been deleted.
+            && !workerReportName.contains("successful")  // If this has failed or its state is unknown (it was never renamed), then delete the assignment-records. For the successful, they have already been deleted.
             && extractAssignmentsCounterAndDeleteRelatedAssignmentRecords(workerReportName) )
         numWorkerReportsHandled ++;
 }
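
The widened check matters because a worker-report file can be in one of three states: renamed to contain "successful", renamed to contain "failed", or never renamed at all (e.g. when its background thread never ran). Only the last two need their assignment-records deleted. A minimal sketch of the predicate change, using hypothetical report-file names (the exact naming scheme is an assumption, not taken from this repository):

    // Hypothetical file names; the real naming scheme may differ.
    String[] reportNames = { "report_1_successful.json", "report_2_failed.json", "report_3.json" };
    for ( String name : reportNames ) {
        boolean oldCheck = name.contains("failed");        // misses the never-renamed "report_3"
        boolean newCheck = !name.contains("successful");   // also cleans up the unknown-state "report_3"
        System.out.println(name + " -> old=" + oldCheck + " | new=" + newCheck);
    }
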
@@ -456,6 +456,8 @@ public class ScheduledTasks {
         return false;
     }
     logger.debug("Will delete the assignments of the old, not-successful, workerReport: " + workerReportName);
+    DatabaseConnector.databaseLock.lock();
     urlsService.deleteAssignmentsBatch(curReportAssignmentsCounter);  // Any error-log is written inside.
+    DatabaseConnector.databaseLock.unlock();
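
Note that if deleteAssignmentsBatch() were ever to throw, the lock would stay held. A common hardening, sketched here under the assumption that DatabaseConnector.databaseLock is a java.util.concurrent.locks.Lock, is to release it in a finally-block:

    DatabaseConnector.databaseLock.lock();
    try {
        urlsService.deleteAssignmentsBatch(curReportAssignmentsCounter);  // Any error-log is written inside.
    } finally {
        DatabaseConnector.databaseLock.unlock();  // Released even if the batch-delete throws.
    }
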


@@ -732,13 +732,12 @@ public class FileUtils {
  * Then, the program automatically generates "attempt" and "payload" records for these additional UrlReport-records.
  * It must be executed inside the same "database-locked" block of code, along with the inserts of the attempt and payload records.
  * */
-public boolean addUrlReportsByMatchingRecordsFromBacklog(List<UrlReport> urlReports, List<Payload> initialPayloads, long assignmentsBatchCounter)
+public boolean addUrlReportsByMatchingRecordsFromBacklog(List<UrlReport> urlReports, List<Payload> initialPayloads, int numInitialPayloads, long assignmentsBatchCounter)
 {
-    int numInitialPayloads = initialPayloads.size();
     logger.debug("numInitialPayloads: " + numInitialPayloads + " | assignmentsBatchCounter: " + assignmentsBatchCounter);
     // Create a HashMultimap, containing the "original_url" or "actual_url" as the key and the related "payload" objects as its values.
-    final SetMultimap<String, Payload> urlToPayloadsMultimap = Multimaps.synchronizedSetMultimap(HashMultimap.create((numInitialPayloads / 3), 3));  // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
+    final HashMultimap<String, Payload> urlToPayloadsMultimap = HashMultimap.create((numInitialPayloads / 3), 3);  // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
     for ( Payload payload : initialPayloads ) {
         String original_url = payload.getOriginal_url();
         String actual_url = payload.getActual_url();
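
Two details drive the performance gain here: the payload-count is now passed in by the caller (which has already computed it), and the Multimaps.synchronizedSetMultimap() wrapper is dropped, which is safe as long as the multimap is only filled by the sequential loop above while the method runs inside the database-locked block. For reference, Guava's two-argument factory pre-sizes the backing map; a small sketch (the 3-values-per-key estimate is the code's own heuristic):

    import com.google.common.collect.HashMultimap;

    // Pre-size for ~numInitialPayloads/3 distinct URLs with ~3 payloads each,
    // avoiding rehashes while the single-threaded loop fills the map.
    HashMultimap<String, Payload> multimap = HashMultimap.create((numInitialPayloads / 3), 3);
    multimap.put(payload.getOriginal_url(), payload);  // multiple payloads may share one URL
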
@@ -791,7 +790,7 @@ public class FileUtils {
     }
     Set<Payload> foundPayloads = urlToPayloadsMultimap.get(original_url);
-    // Select a random "foundPayload" to use its data to fill the "prefilledPayload" (in a "Set" the first element is pseudo-random)
+    // Select a random "foundPayload" to use its data to fill the "prefilledPayload" (in a "Set" the first element is pseudo-random).
     Optional<Payload> optPayload = foundPayloads.stream().findFirst();
     if ( !optPayload.isPresent() ) {
         logger.error("Could not retrieve any payload for the \"original_url\": " + original_url);
@@ -803,8 +802,7 @@ public class FileUtils {
         prefilledPayloads.add(prefilledPayload);
     });
 } catch (EmptyResultDataAccessException erdae) {
-    String errorMsg = "No results retrieved from the \"getDataForPayloadPrefillQuery\", when trying to prefill payloads, from assignment_" + assignmentsBatchCounter + ".";
-    logger.error(errorMsg);
+    logger.error("No results retrieved from the \"getDataForPayloadPrefillQuery\", when trying to prefill payloads, from assignment_" + assignmentsBatchCounter + ".");
     return false;
 } catch (Exception e) {
     DatabaseConnector.handleQueryException("getDataForPayloadPrefillQuery", getDataForPayloadPrefillQuery, e);
@@ -813,8 +811,7 @@ public class FileUtils {
 int numPrefilledPayloads = prefilledPayloads.size();
 if ( numPrefilledPayloads == 0 ) {
-    String errorMsg = "Some results were retrieved from the \"getDataForPayloadPrefillQuery\", but no data could be extracted from them, when trying to prefill payloads, from assignment_" + assignmentsBatchCounter + ".";
-    logger.error(errorMsg);
+    logger.error("Some results were retrieved from the \"getDataForPayloadPrefillQuery\", but no data could be extracted from them, when trying to prefill payloads, from assignment_" + assignmentsBatchCounter + ".");
     return false;
 }


@@ -170,11 +170,12 @@ public class ParquetFileUtils {
 if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.successful )
 {
-    List<Payload> currentPayloads = urlReports.parallelStream()
+    List<Payload> initialPayloads = urlReports.parallelStream()
             .map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null)))
             .collect(Collectors.toList());
-    if ( currentPayloads.size() > 0 )  // If at least 1 payload was created by the processed records..
+    int numInitialPayloads = initialPayloads.size();
+    if ( numInitialPayloads > 0 )  // If at least 1 payload was created by the processed records..
     {  // (it's ok to have no payloads, if there were no full-texts available)
         // At this point we know there was no problem with the full-texts, but we do not know if at least one full-text was retrieved.
         if ( (payloadsSchema == null)  // Parse the schema if it's not already parsed.
@@ -184,7 +185,7 @@ public class ParquetFileUtils {
     }
     // The UrlsReports for the pre-filled are added only here, since we do not want attempt records for these.
-    fileUtils.addUrlReportsByMatchingRecordsFromBacklog(urlReports, currentPayloads, curReportAssignments);  // This will add more Objects to the "urlReports" list.
+    fileUtils.addUrlReportsByMatchingRecordsFromBacklog(urlReports, initialPayloads, numInitialPayloads, curReportAssignments);  // This will add more Objects to the "urlReports" list.
     // In case the above method returns an error, nothing happens. We just have only the initial payloads to insert to the DB.
     int sizeOfEachSubList = (int)(sizeOfUrlReports * 0.33);  // We want 3 sub-lists for the payloads.
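
As the last line shows, the (possibly grown) urlReports list is later split into roughly 3 sub-lists for the payload parquet-files. A sketch of one way such a split behaves, using Guava's Lists.partition (an assumption; the diff only shows the size computation):

    import com.google.common.collect.Lists;

    int sizeOfEachSubList = (int)(sizeOfUrlReports * 0.33);  // ~1/3 of the records per sub-list
    List<List<UrlReport>> subLists = Lists.partition(urlReports, Math.max(1, sizeOfEachSubList));
    // Since 0.33 * size rounds down, partition() can yield a small 4th chunk holding
    // the remainder; Math.max guards the tiny-list edge case (an assumption).
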