- Add a scheduled job to delete assignments older than 7 days. These may be left behind when the worker throws a "SocketTimeoutException" before it can receive and process the assignments. No workerReport gets created for those assignments.

- Improve some log-messages.
- Code polishing.
This commit is contained in:
Lampros Smyrnaios 2023-10-30 12:29:54 +02:00
parent 856c62887d
commit db929d8931
3 changed files with 40 additions and 16 deletions

View File

@ -31,9 +31,9 @@ import java.util.concurrent.TimeUnit;
@SpringBootApplication
@EnableScheduling
public class Application {
public class UrlsControllerApplication {
private static final Logger logger = LoggerFactory.getLogger(Application.class);
private static final Logger logger = LoggerFactory.getLogger(UrlsControllerApplication.class);
@Autowired
HikariDataSource hikariDataSource;
@ -42,7 +42,7 @@ public class Application {
public static void main(String[] args) {
context = SpringApplication.run(Application.class, args);
context = SpringApplication.run(UrlsControllerApplication.class, args);
}
@Bean

View File

@ -2,7 +2,7 @@ package eu.openaire.urls_controller.components;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import eu.openaire.urls_controller.Application;
import eu.openaire.urls_controller.UrlsControllerApplication;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.controllers.BulkImportController;
import eu.openaire.urls_controller.controllers.ShutdownController;
@ -24,10 +24,7 @@ import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -48,7 +45,7 @@ public class ScheduledTasks {
UrlsServiceImpl urlsService;
@Autowired
private Application application;
private UrlsControllerApplication urlsControllerApplication;
private final StatsController statsController;
@ -173,7 +170,7 @@ public class ScheduledTasks {
// Any left-over worker-reports are kept to be retried next time the Controller starts.
application.gentleAppShutdown();
urlsControllerApplication.gentleAppShutdown();
}
@ -232,6 +229,25 @@ public class ScheduledTasks {
}
@Scheduled(initialDelay = 604_800_000, fixedDelay = 604_800_000) // Run every 7 days.
//@Scheduled(initialDelay = 1_200_000, fixedDelay = 1_200_000) // Just for testing (every 1200 secs).
public void checkAndDeleteUnhandledAssignments()
{
    // Remove the assignments having a "date" older than 7 days.
    // For some reason, sometimes, the worker cannot receive the assignments in time.
    // In this case, no job is created for those assignments and no workerReport gets stored in storage.
    // The assignments just remain in the table, and the urls cannot be rechecked.
    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.DAY_OF_MONTH, -7); // Subtract 7 days from the current date.
    DatabaseConnector.databaseLock.lock();
    try {
        urlsService.deleteAssignmentsWithOlderDate(calendar.getTimeInMillis()); // Any error-log is written inside.
    } finally {
        // Always release the lock, even if the deletion throws, so other DB operations are not dead-locked.
        DatabaseConnector.databaseLock.unlock();
    }
}
// Scheduled Metrics for Prometheus.
// Prometheus scrapes for metrics usually every 15 seconds, but that is an extremely short time-period for DB-statistics.

View File

@ -212,7 +212,7 @@ public class UrlsServiceImpl implements UrlsService {
else if ( assignmentsSize < assignmentsLimit )
logger.warn("The retrieved results were fewer (" + assignmentsSize + ") than the \"assignmentsLimit\" (" + assignmentsLimit + "), for worker with id: " + workerId + ". Will increase the \"maxAttempts\" to " + maxAttemptsPerRecordAtomic.incrementAndGet() + ", for the next requests.");
logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker.");
logger.debug("Finished gathering " + assignmentsSize + " assignments for worker with id \"" + workerId + "\". Going to insert them into the \"assignment\" table and then return them to the worker. | assignments_" + curAssignmentsBatchCounter);
// Write the Assignment details to the assignment-table.
String insertAssignmentsQuery = "insert into " + DatabaseConnector.databaseName + ".assignment \n select pubid, url, '" + workerId + "', " + curAssignmentsBatchCounter + ", " + timestampMillis
@ -236,7 +236,7 @@ public class UrlsServiceImpl implements UrlsService {
// We do not need to "merge" the parquet files for the "assignment" table here, since this happens every time we delete the assignments of a specific batch.
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table.");
logger.debug("Finished inserting " + assignmentsSize + " assignments into the \"assignment\"-table. | curAssignmentsBatchCounter: " + curAssignmentsBatchCounter + " | worker: " + workerId);
// Due to the fact that one publication with an id-url pair can be connected with multiple datasources, the results returned from the query may be duplicates.
// So, we apply a post-processing step where we collect only one instance of each id-url pair and send it to the Worker.
@ -410,9 +410,7 @@ public class UrlsServiceImpl implements UrlsService {
return false;
}
// For every "numOfWorkers" assignment-batches that go to workers, we merge the tables, once a workerReport comes in.
// After the first few increases of "assignmentsBatchCounter" until all workers get assignment-batches,
// there will always be a time when the counter will be just before the "golden-value" and then one workerReport has to be processed here and the counter will be incremented by one and signal the merging-time.
// For every "numOfWorkers" assignment-batches that get processed, we merge the "attempts" and "payload_aggregated" tables.
if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfWorkers.get()) == 0 ) // The workersNum will not be zero!
if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem) )
// The "postReportResultToWorker()" was called inside.
@ -508,9 +506,19 @@ public class UrlsServiceImpl implements UrlsService {
// This will delete the rows of the "assignment" table which refer to the "givenAssignmentsBatchCounter".
// As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER "givenAssignmentsBatchCounter" get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
// We don't need to keep the assignment-info anymore, the "findAssignmentsQuery" checks the "payload" table for previously handled tasks.
// We don't need to keep the assignment-info forever, as the "findAssignmentsQuery" checks the "payload" table for previously handled tasks.
return fileUtils.mergeParquetFiles("assignment", " WHERE assignments_batch_counter != ", givenAssignmentsBatchCounter);
}
/**
 * Deletes every row of the "assignment" table whose "date" is older than {@code givenDate}.
 * Since the non-KUDU Impala tables do not support a row-level DELETE, the deletion is done
 * as a "merge": only the rows to KEEP (date newer-than-or-equal-to "givenDate") are copied
 * into a temp-table, the "assignment" table is dropped and the temp-table takes its place.
 * We don't need to keep the assignment-info forever, as the "findAssignmentsQuery" checks
 * the "payload" table for previously handled tasks.
 *
 * @param givenDate epoch-millis cutoff; rows older than this are removed.
 * @return the result-message produced by the merge operation (error-message on failure).
 */
public String deleteAssignmentsWithOlderDate(long givenDate)
{
    final String keepNewerRowsClause = " WHERE `date` >= ";
    return fileUtils.mergeParquetFiles("assignment", keepNewerRowsClause, givenDate);
}
private static final RestTemplate restTemplate = new RestTemplate();