From 0c7bf6357baf5f412bb02fe9c765855193133c0a Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Mon, 23 Oct 2023 12:21:42 +0300 Subject: [PATCH] - Improve performance in "FileUtils.addUrlReportsByMatchingRecordsFromBacklog()". - Make sure we remove the assignments of all "not-successful", old, worker-reports, even for the ones which failed to be renamed to indicate success or failure, or failed to be executed by the background threads (and thus never reached the renaming stage). --- .../urls_controller/components/ScheduledTasks.java | 4 +++- .../eu/openaire/urls_controller/util/FileUtils.java | 13 +++++-------- .../urls_controller/util/ParquetFileUtils.java | 7 ++++--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java index 866d134..9a3d333 100644 --- a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java @@ -357,7 +357,7 @@ public class ScheduledTasks { logger.warn("The workerReport \"" + workerReportName + "\" was accessed " + elapsedDays + " days ago (passed the " + daysToWaitBeforeDeletion + " days limit) and will be deleted."); numWorkerReportsToBeHandled ++; if ( fileUtils.deleteFile(workerReportFile.getAbsolutePath()) // Either successful or failed. - && workerReportName.contains("failed") // If this has failed, then delete the assignment-records. For the successful, they have already been deleted. + && !workerReportName.contains("successful") // If this has failed or its state is unknown (it was never renamed), then delete the assignment-records. For the successful, they have already been deleted. 
&& extractAssignmentsCounterAndDeleteRelatedAssignmentRecords(workerReportName) ) numWorkerReportsHandled ++; } @@ -456,6 +456,8 @@ public class ScheduledTasks { return false; } + logger.debug("Will delete the assignments of the old, not-successful, workerReport: " + workerReportName); + DatabaseConnector.databaseLock.lock(); urlsService.deleteAssignmentsBatch(curReportAssignmentsCounter); // Any error-log is written inside. DatabaseConnector.databaseLock.unlock(); diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index f9a0e0f..dddc042 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -732,13 +732,12 @@ public class FileUtils { * Then, the program automatically generates "attempt" and "payload" records for these additional UrlReport-records. * It must be executed inside the same "database-locked" block of code, along with the inserts of the attempt and payload records. * */ - public boolean addUrlReportsByMatchingRecordsFromBacklog(List<UrlReport> urlReports, List<Payload> initialPayloads, long assignmentsBatchCounter) + public boolean addUrlReportsByMatchingRecordsFromBacklog(List<UrlReport> urlReports, List<Payload> initialPayloads, int numInitialPayloads, long assignmentsBatchCounter) { - int numInitialPayloads = initialPayloads.size(); logger.debug("numInitialPayloads: " + numInitialPayloads + " | assignmentsBatchCounter: " + assignmentsBatchCounter); // Create a HashMultimap, containing the "original_url" or "actual_url" as the key and the related "payload" objects as its values. - final SetMultimap<String, Payload> urlToPayloadsMultimap = Multimaps.synchronizedSetMultimap(HashMultimap.create((numInitialPayloads / 3), 3)); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it. 
+ final HashMultimap<String, Payload> urlToPayloadsMultimap = HashMultimap.create((numInitialPayloads / 3), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it. for ( Payload payload : initialPayloads ) { String original_url = payload.getOriginal_url(); String actual_url = payload.getActual_url(); @@ -791,7 +790,7 @@ public class FileUtils { } Set<Payload> foundPayloads = urlToPayloadsMultimap.get(original_url); - // Select a random "foundPayload" to use its data to fill the "prefilledPayload" (in a "Set" the first element is pseudo-random) + // Select a random "foundPayload" to use its data to fill the "prefilledPayload" (in a "Set" the first element is pseudo-random). Optional<Payload> optPayload = foundPayloads.stream().findFirst(); if ( !optPayload.isPresent() ) { logger.error("Could not retrieve any payload for the \"original_url\": " + original_url); @@ -803,8 +802,7 @@ public class FileUtils { prefilledPayloads.add(prefilledPayload); }); } catch (EmptyResultDataAccessException erdae) { - String errorMsg = "No results retrieved from the \"getDataForPayloadPrefillQuery\", when trying to prefill payloads, from assignment_" + assignmentsBatchCounter + "."; - logger.error(errorMsg); + logger.error("No results retrieved from the \"getDataForPayloadPrefillQuery\", when trying to prefill payloads, from assignment_" + assignmentsBatchCounter + "."); return false; } catch (Exception e) { DatabaseConnector.handleQueryException("getDataForPayloadPrefillQuery", getDataForPayloadPrefillQuery, e); @@ -813,8 +811,7 @@ public class FileUtils { int numPrefilledPayloads = prefilledPayloads.size(); if ( numPrefilledPayloads == 0 ) { - String errorMsg = "Some results were retrieved from the \"getDataForPayloadPrefillQuery\", but no data could be extracted from them, when trying to prefill payloads, from assignment_" + assignmentsBatchCounter + "."; - logger.error(errorMsg); + logger.error("Some results were retrieved from the \"getDataForPayloadPrefillQuery\", but 
no data could be extracted from them, when trying to prefill payloads, from assignment_" + assignmentsBatchCounter + "."); return false; } diff --git a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java index 0702eb5..fccfc62 100644 --- a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java @@ -170,11 +170,12 @@ public class ParquetFileUtils { if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.successful ) { - List<Payload> currentPayloads = urlReports.parallelStream() + List<Payload> initialPayloads = urlReports.parallelStream() .map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null))) .collect(Collectors.toList()); - if ( currentPayloads.size() > 0 ) // If at least 1 payload was created by the processed records.. + int numInitialPayloads = initialPayloads.size(); + if ( numInitialPayloads > 0 ) // If at least 1 payload was created by the processed records.. { // (it's ok to have no payloads, if there were no full-texts available) // At this point we know there was no problem with the full-texts, but we do not know if at least one full-text was retrieved. if ( (payloadsSchema == null) // Parse the schema if it's not already parsed. @@ -184,7 +185,7 @@ public class ParquetFileUtils { } // The UrlsReports for the pre-filled are added only here, since we do not want attempt records fo these. - fileUtils.addUrlReportsByMatchingRecordsFromBacklog(urlReports, currentPayloads, curReportAssignments); // This will add more Object in the update the "urlReports" list. + fileUtils.addUrlReportsByMatchingRecordsFromBacklog(urlReports, initialPayloads, numInitialPayloads, curReportAssignments); // This will add more Objects in the "urlReports" list. // In case the above method returns an error, nothing happens. 
We just have only the initial payloads to insert to the DB. int sizeOfEachSubList = (int)(sizeOfUrlReports * 0.33); // We want 3 sub-lists for the payloads.