- Allow to easily change the por used by workers.

- Show the number of active background-tasks and bulkImportDirs, which delay the Service's shutdown.
- Update dependencies.
- Code polishing.
master 2.6.1
Lampros Smyrnaios 4 months ago
parent d90ad51609
commit 2e60128084

@ -52,7 +52,7 @@ dependencies {
implementation("org.apache.commons:commons-compress:1.25.0") {
exclude group: 'com.github.luben', module: 'zstd-jni'
implementation 'com.github.luben:zstd-jni:1.5.5-10' // Even though this is part of the above dependency, the Apache commons rarely updates it, while the zstd team makes improvements very often.
implementation 'com.github.luben:zstd-jni:1.5.5-11' // Even though this is part of the above dependency, the Apache commons rarely updates it, while the zstd team makes improvements very often.
implementation 'io.minio:minio:8.5.7'
@ -120,7 +120,7 @@ dependencies {
// https://mvnrepository.com/artifact/io.micrometer/micrometer-registry-prometheus
runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.12.0'
runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.12.1'
testImplementation 'org.springframework.security:spring-security-test'
testImplementation "org.springframework.boot:spring-boot-starter-test"

@ -1,6 +1,6 @@

@ -26,7 +26,7 @@ if [[ justRun -eq 1 && shouldRunInDocker -eq 1 ]]; then
if [[ justRun -eq 0 ]]; then

@ -60,6 +60,7 @@ public class UrlsControllerApplication {
public void gentleAppShutdown()
logger.info("Shutting down the app..");
int exitCode = 0;
try {

@ -129,7 +129,7 @@ public class ScheduledTasks {
logger.error("IOOBE for background task_" + i + " in the futures-list! " + ioobe.getMessage());
// Only here, the "future" will be null.
} finally {
if ( future != null )
if ( future != null ) // It may be null in case we have a IOBE.
futuresToDelete.add(future); // Do not delete them directly here, as the indexes will get messed up and we will get "IOOBE".
@ -152,8 +152,11 @@ public class ScheduledTasks {
return; // Either the service was never instructed to shut down, or the user canceled the request.
// Check whether there are still background tasks to be processed. Either workerReport or Bulk-import requests.
if ( UrlsController.futuresOfBackgroundTasks.size() > 0 )
int numOfFutures = UrlsController.futuresOfBackgroundTasks.size();
if ( numOfFutures > 0 ) {
logger.debug("There are still " + numOfFutures + " backgroundTasks waiting to be executed or have their status checked..");
// Here, the above may have given a result of < 0 >, but a new task may be asked for execution right next and still await for execution..
// The crawling-jobs can be safely finish, by avoiding to shut-down as long as at least one worker is still running (waiting for the Controller to verify that the assignments-batch is completed).
@ -161,8 +164,11 @@ public class ScheduledTasks {
// So the Controller will now shut down if either of takes-types have not finished.
// Check whether there are any active bulk-import procedures.
if ( BulkImportController.bulkImportDirsUnderProcessing.size() > 0 )
int numOfBulkImportDirsUnderProcessing = BulkImportController.bulkImportDirsUnderProcessing.size();
if ( numOfBulkImportDirsUnderProcessing > 0 ) {
logger.debug("There are still " + numOfBulkImportDirsUnderProcessing + " bulkImportDirsUnderProcessing..");
// Check whether the workers have not shutdown yet, which means that they either crawl assignments or/and they are waiting for the Controller to process the WorkerReport and then shutdown.
Set<String> workerIds = UrlsController.workersInfoMap.keySet();
@ -249,7 +255,7 @@ public class ScheduledTasks {
// The assignments just remain in the table, and the urls cannot be rechecked.
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.DAY_OF_MONTH, - 3); // Subtract <daysToWaitBeforeDeletion> from current Date.
calendar.add(Calendar.DAY_OF_MONTH, - 3); // Subtract 3 from current Date.
urlsService.deleteAssignmentsWithOlderDate(calendar.getTimeInMillis()); // Any error-log is written inside.

@ -96,7 +96,7 @@ public class UrlsController {
if ( ShutdownController.shouldShutdownService ) {
// There might be the case that the Controller has not sent shutDown requests to the Workers yet, or it has, BUT:
// 1) A worker requests for new assignments before the shutDown request is handled by its side.
// 1) A worker requests for new assignments, before it can handle the shutDown request given to it.
// 2) A new Worker joins the Service (unexpected, but anyway).
String warnMsg = "The Service is about to shutdown, after all under-processing assignments and/or bulkImport requests are handled. No new requests are accepted!";
logger.warn(warnMsg); // It's likely not an actual error, but still it's not accepted.

@ -5,6 +5,7 @@ import eu.openaire.urls_controller.models.WorkerInfo;
import eu.openaire.urls_controller.util.UriBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
@ -25,6 +26,10 @@ public class ShutdownServiceImpl implements ShutdownService {
private static final Pattern PRIVATE_IP_ADDRESSES_RFC_1918 = Pattern.compile("(?:10.|172.(?:1[6-9]|2[0-9]|3[0-1])|192.168.)[0-9.]+");
private String workerPort;
public ResponseEntity<?> passSecurityChecks(String remoteAddr, String initMsg)
// In case the Controller is running inside a docker container, and we want to send the "shutdownServiceRequest" from the terminal (with curl), without entering inside the container,
@ -45,7 +50,7 @@ public class ShutdownServiceImpl implements ShutdownService {
if ( ! workerInfo.getHasShutdown() ) // A worker may have shutdown on its own (by sending it a shutDown request manually), so it will have told the Controller when it shut down. In case of a Worker-crash, the Controller will not know about it.
postShutdownOrCancelRequestToWorker(workerId, workerInfo.getWorkerIP(), shouldCancel);
logger.warn("Will not post " + (shouldCancel ? "Cancel-" : "") + " ShutdownRequest to Worker \"" + workerId + "\", since is it has already shutdown.");
logger.warn("Will not post " + (shouldCancel ? "Cancel-" : "") + "ShutdownRequest to Worker \"" + workerId + "\", since is it has already shutdown.");
@ -54,7 +59,7 @@ public class ShutdownServiceImpl implements ShutdownService {
public boolean postShutdownOrCancelRequestToWorker(String workerId, String workerIp, boolean shouldCancel)
String url = "http://" + workerIp + ":1881/api/" + (shouldCancel ? "cancelShutdownWorker" : "shutdownWorker");
String url = "http://" + workerIp + ":" + workerPort + "/api/" + (shouldCancel ? "cancelShutdownWorker" : "shutdownWorker");
try {
ResponseEntity<?> responseEntity = restTemplate.postForEntity(url, null, String.class);
int responseCode = responseEntity.getStatusCodeValue();

@ -93,6 +93,7 @@ public class StatsServiceImpl implements StatsService {
// To get the human-friendly timestamp format from the BigInt in the database:
// select from_timestamp(CAST(CAST(`date` as decimal(30,0))/1000 AS timestamp), "yyyy-MM-dd HH:mm:ss.SSS") from payload
// Or simpler: select from_timestamp(CAST((`date`/1000) AS timestamp), "yyyy-MM-dd HH:mm:ss.SSS") from payload

@ -58,6 +58,9 @@ public class UrlsServiceImpl implements UrlsService {
private String workerReportsDirPath;
private String workerPort;
public static final AtomicLong assignmentsBatchCounter = new AtomicLong(0);
private final AtomicInteger maxAttemptsPerRecordAtomic;
@ -534,7 +537,7 @@ public class UrlsServiceImpl implements UrlsService {
logger.error("Could not find any info for worker with id: \"" + workerId +"\".");
return false;
String url = "http://" + workerInfo.getWorkerIP() + ":1881/api/addReportResultToWorker/" + assignmentRequestCounter; // This workerIP will NOT be null.
String url = "http://" + workerInfo.getWorkerIP() + ":" + workerPort + "/api/addReportResultToWorker/" + assignmentRequestCounter; // This workerIP will NOT be null.
if ( logger.isTraceEnabled() )
logger.trace("Going to \"postReportResultToWorker\": \"" + workerId + "\", for assignments_" + assignmentRequestCounter + ((errorMsg != null) ? "\nError: " + errorMsg : ""));

@ -61,6 +61,10 @@ public class FileUtils {
private FileDecompressor fileDecompressor;
private String workerPort;
public enum UploadFullTextsResponse {successful, unsuccessful, databaseError}
@ -326,7 +330,7 @@ public class FileUtils {
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts (total is: " + numFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
// Check if one full text is left out because of the division. Put it int the last batch.
String baseUrl = "http://" + workerIp + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
String baseUrl = "http://" + workerIp + ":" + workerPort + "/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
// TODO - The worker should send the port in which it accepts requests, along with the current request.
// TODO - The least we have to do it to expose the port-assignment somewhere more obvious like inside the "application.yml" file.

@ -34,7 +34,8 @@ services:
bucketName: XA
shouldEmptyBucket: false
shouldShowAllS3Buckets: true
port: 1881
baseBulkImportLocation: /mnt/bulk_import/
