From db929d893177498695385cc55ee6ee41cc92e606 Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Mon, 30 Oct 2023 12:29:54 +0200 Subject: [PATCH] - Add a scheduling job to delete assignments older than 7 days. These may be left behind when the worker throws a "SocketTimeoutException" before it can receive the assignments and process them. No workerReport gets created for those assignments. - Improve some log-messages. - Code polishing. --- ...on.java => UrlsControllerApplication.java} | 6 ++-- .../components/ScheduledTasks.java | 30 ++++++++++++++----- .../services/UrlsServiceImpl.java | 20 +++++++++---- 3 files changed, 40 insertions(+), 16 deletions(-) rename src/main/java/eu/openaire/urls_controller/{Application.java => UrlsControllerApplication.java} (97%) diff --git a/src/main/java/eu/openaire/urls_controller/Application.java b/src/main/java/eu/openaire/urls_controller/UrlsControllerApplication.java similarity index 97% rename from src/main/java/eu/openaire/urls_controller/Application.java rename to src/main/java/eu/openaire/urls_controller/UrlsControllerApplication.java index d072610..4b6c4b7 100644 --- a/src/main/java/eu/openaire/urls_controller/Application.java +++ b/src/main/java/eu/openaire/urls_controller/UrlsControllerApplication.java @@ -31,9 +31,9 @@ import java.util.concurrent.TimeUnit; @SpringBootApplication @EnableScheduling -public class Application { +public class UrlsControllerApplication { - private static final Logger logger = LoggerFactory.getLogger(Application.class); + private static final Logger logger = LoggerFactory.getLogger(UrlsControllerApplication.class); @Autowired HikariDataSource hikariDataSource; @@ -42,7 +42,7 @@ public class Application { public static void main(String[] args) { - context = SpringApplication.run(Application.class, args); + context = SpringApplication.run(UrlsControllerApplication.class, args); } @Bean diff --git a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java index 4da7174..171d4a2 100644 --- a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java @@ -2,7 +2,7 @@ package eu.openaire.urls_controller.components; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; -import eu.openaire.urls_controller.Application; +import eu.openaire.urls_controller.UrlsControllerApplication; import eu.openaire.urls_controller.configuration.DatabaseConnector; import eu.openaire.urls_controller.controllers.BulkImportController; import eu.openaire.urls_controller.controllers.ShutdownController; @@ -24,10 +24,7 @@ import org.springframework.stereotype.Component; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -48,7 +45,7 @@ public class ScheduledTasks { UrlsServiceImpl urlsService; @Autowired - private Application application; + private UrlsControllerApplication urlsControllerApplication; private final StatsController statsController; @@ -173,7 +170,7 @@ public class ScheduledTasks { // Any left-over worker-reports are kept to be retried next time the Controller starts. - application.gentleAppShutdown(); + urlsControllerApplication.gentleAppShutdown(); } @@ -232,6 +229,25 @@ public class ScheduledTasks { } + @Scheduled(initialDelay = 604_800_000, fixedDelay = 604_800_000) // Run every 7 days. + //@Scheduled(initialDelay = 1_200_000, fixedDelay = 1_200_000) // Just for testing (every 1200 secs). + public void checkAndDeleteUnhandledAssignments() + { + // Remove the assignments having a "date" older than 7 days. + + // For some reason, sometimes, the worker cannot receive the assignments in time. + // In this case, no job is created for those assignments nd no workerReport gets stored in storage. + // The assignments just remain in the table, and the urls cannot be rechecked. + + Calendar calendar = Calendar.getInstance(); + calendar.add(Calendar.DAY_OF_MONTH, -7); // Subtract 7 days from current. + + DatabaseConnector.databaseLock.lock(); + urlsService.deleteAssignmentsWithOlderDate(calendar.getTimeInMillis()); // Any error-log is written inside. + DatabaseConnector.databaseLock.unlock(); + } + + // Scheduled Metrics for Prometheus. // Prometheus scrapes for metrics usually every 15 seconds, but that is an extremely short time-period for DB-statistics. diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java index 228e6d0..bc0abf4 100644 --- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java @@ -212,7 +212,7 @@ public class UrlsServiceImpl implements UrlsService { else if ( assignmentsSize < assignmentsLimit ) logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + ", for the next requests."); - logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker."); + logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker. | assignments_" + curAssignmentsBatchCounter); // Write the Assignment details to the assignment-table. String insertAssignmentsQuery = "insert into " + DatabaseConnector.databaseName + ".assignment \n select pubid, url, '" + workerId + "', " + curAssignmentsBatchCounter + ", " + timestampMillis @@ -236,7 +236,7 @@ public class UrlsServiceImpl implements UrlsService { // We do not need to "merge" the parquet files for the "assignment" table here, since this happens every time we delete the assignments of a specific batch. - logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table."); + logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. | curAssignmentsBatchCounter: " + curAssignmentsBatchCounter + " | worker: " + workerId); // Due to the fact that one publication with an id-url pair can be connected with multiple datasources, the results returned from the query may be duplicates. // So, we apply a post-processing step where we collect only one instance of each id-url pair and send it to the Worker. @@ -410,9 +410,7 @@ public class UrlsServiceImpl implements UrlsService { return false; } - // For every "numOfWorkers" assignment-batches that go to workers, we merge the tables, once a workerReport comes in. - // After the first few increases of "assignmentsBatchCounter" until all workers get assignment-batches, - // there will always be a time when the counter will be just before the "golden-value" and then one workerReport has to be processed here and the counter will be incremented by one and signal the merging-time. + // For every "numOfWorkers" assignment-batches that get processed, we merge the "attempts" and "payload_aggregated" tables. if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfWorkers.get()) == 0 ) // The workersNum will not be zero! if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem) ) // The "postReportResultToWorker()" was called inside. @@ -508,9 +506,19 @@ public class UrlsServiceImpl implements UrlsService { // This will delete the rows of the "assignment" table which refer to the "givenAssignmentsBatchCounter". // As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data. // Only the rows referring to OTHER "givenAssignmentsBatchCounter" get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table. - // We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the "payload" table for previously handled tasks. + // We don't need to keep the assignment-info forever, as the "findAssignmentsQuery" checks the "payload" table for previously handled tasks. return fileUtils.mergeParquetFiles("assignment", " WHERE assignments_batch_counter != ", givenAssignmentsBatchCounter); } + + + public String deleteAssignmentsWithOlderDate(long givenDate) + { + // This will delete the rows of the "assignment" table which are older than "givenDate". + // As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data. + // Only the rows referring to NEWER than "givenDate" get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table. + // We don't need to keep the assignment-info forever, as the "findAssignmentsQuery" checks the "payload" table for previously handled tasks. + return fileUtils.mergeParquetFiles("assignment", " WHERE `date` >= ", givenDate); + } private static final RestTemplate restTemplate = new RestTemplate();