- Allow the user to set a maximum number of assignments-batches for the Worker to handle. After handling those batches, the Worker will shut down. A number of < 0 > indicates an infinite number of batches.

- Avoid converting the zero fileSize to < null >. Now, the default value is < null >, so the zero-value will indicate a zero-byte file.
- Update dependencies.
- Code cleanup.
This commit is contained in:
Lampros Smyrnaios 2021-12-24 00:12:34 +02:00
parent a8e2ddcf54
commit 1ddfd34236
7 changed files with 49 additions and 20 deletions

View File

@ -1,5 +1,5 @@
plugins { plugins {
id 'org.springframework.boot' version '2.6.1' id 'org.springframework.boot' version '2.6.2'
id 'io.spring.dependency-management' version '1.0.11.RELEASE' id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java' id 'java'
} }

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.2-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.3-bin.zip
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists

View File

@ -28,7 +28,7 @@ if [[ ! -f $inputDataFile ]]; then
echo -e "\n\n" echo -e "\n\n"
fi fi
gradleVersion="7.3.2" gradleVersion="7.3.3"
if [[ justInstall -eq 0 ]]; then if [[ justInstall -eq 0 ]]; then

View File

@ -12,6 +12,7 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.EnableScheduling;
@ -39,8 +40,11 @@ public class UrlsWorkerApplication {
private static final String inputDataFilePath = FileUtils.workingDir + "inputData.txt"; private static final String inputDataFilePath = FileUtils.workingDir + "inputData.txt";
public static String workerId = null; public static String workerId = null;
public static int maxAssignmentsLimitPerBatch = 0; public static int maxAssignmentsLimitPerBatch = 0;
public static int maxAssignmentsBatchesToHandleBeforeRestart = -1; // Default value: -1 = argument-absent, 0 = infinite-batches
public static String controllerBaseUrl = null; // BaseUrl template: "http://IP:PORT/api/" public static String controllerBaseUrl = null; // BaseUrl template: "http://IP:PORT/api/"
private static ConfigurableApplicationContext context;
public static void main(String[] args) { public static void main(String[] args) {
@ -48,7 +52,7 @@ public class UrlsWorkerApplication {
new PublicationsRetrieverPlugin(); new PublicationsRetrieverPlugin();
new AssignmentsHandler(); new AssignmentsHandler();
SpringApplication.run(UrlsWorkerApplication.class, args); context = SpringApplication.run(UrlsWorkerApplication.class, args);
Runtime javaRuntime = Runtime.getRuntime(); Runtime javaRuntime = Runtime.getRuntime();
logger.debug("HeapSize: " + javaRuntime.totalMemory()); logger.debug("HeapSize: " + javaRuntime.totalMemory());
@ -57,12 +61,19 @@ public class UrlsWorkerApplication {
} }
public static void gentleShutdown()
{
int exitCode = SpringApplication.exit(context, () -> 0); // The "PreDestroy" method will be called.
System.exit(exitCode);
}
@PreDestroy @PreDestroy
public static void preDestroy() public static void preDestroy()
{ {
if ( PublicationsRetriever.executor != null ) if ( PublicationsRetriever.executor != null )
{ {
logger.info("Shutting down the threads.."); logger.info("Shutting down the threads used by \"PublicationsRetriever\"-plugin..");
PublicationsRetriever.executor.shutdown(); // Define that no new tasks will be scheduled. PublicationsRetriever.executor.shutdown(); // Define that no new tasks will be scheduled.
try { try {
if ( !PublicationsRetriever.executor.awaitTermination(1, TimeUnit.MINUTES) ) { if ( !PublicationsRetriever.executor.awaitTermination(1, TimeUnit.MINUTES) ) {
@ -115,7 +126,7 @@ public class UrlsWorkerApplication {
myReader = new Scanner(inputDataFile); myReader = new Scanner(inputDataFile);
if ( myReader.hasNextLine() ) { if ( myReader.hasNextLine() ) {
String[] data = myReader.nextLine().split(","); String[] data = myReader.nextLine().split(",");
if ( data.length < 3 ) { if ( data.length < 4 ) {
String errorMsg = "Not all data were retrieved from file \"" + inputDataFilePath + "\"!"; String errorMsg = "Not all data were retrieved from file \"" + inputDataFilePath + "\"!";
logger.error(errorMsg); logger.error(errorMsg);
System.err.println(errorMsg); System.err.println(errorMsg);
@ -129,7 +140,14 @@ public class UrlsWorkerApplication {
logger.warn("The given \"maxAssignmentsLimitPerBatch\" (" + maxAssignmentsLimitStr + ") was not a number! Will use the default one: " + WorkerConstants.ASSIGNMENTS_LIMIT); logger.warn("The given \"maxAssignmentsLimitPerBatch\" (" + maxAssignmentsLimitStr + ") was not a number! Will use the default one: " + WorkerConstants.ASSIGNMENTS_LIMIT);
maxAssignmentsLimitPerBatch = WorkerConstants.ASSIGNMENTS_LIMIT; maxAssignmentsLimitPerBatch = WorkerConstants.ASSIGNMENTS_LIMIT;
} }
controllerBaseUrl = data[2].trim(); String maxAssignmentsBatchesStr = data[2].trim();
try {
maxAssignmentsBatchesToHandleBeforeRestart = Integer.parseInt(maxAssignmentsBatchesStr);
} catch (NumberFormatException nfe) {
logger.warn("The given \"maxAssignmentsBatchesToHandleBeforeRestart\" (" + maxAssignmentsBatchesStr + ") was not a number! Will handle an infinite number of batches!");
maxAssignmentsBatchesToHandleBeforeRestart = 0;
}
controllerBaseUrl = data[3].trim();
try { try {
new URL(controllerBaseUrl); new URL(controllerBaseUrl);
} catch (MalformedURLException mue) { } catch (MalformedURLException mue) {
@ -142,14 +160,14 @@ public class UrlsWorkerApplication {
controllerBaseUrl += "/"; // Make sure the other urls will not break later. controllerBaseUrl += "/"; // Make sure the other urls will not break later.
} }
if ( (workerId == null) || (maxAssignmentsLimitPerBatch == 0) || (controllerBaseUrl == null) ) { if ( (workerId == null) || (maxAssignmentsLimitPerBatch == 0) || (maxAssignmentsBatchesToHandleBeforeRestart == -1) || (controllerBaseUrl == null) ) {
String errorMsg = "No \"workerId\" or/and \"maxAssignmentsLimitPerBatch\" or/and \"controllerBaseUrl\" could be retrieved from the file: " + inputDataFilePath; String errorMsg = "No \"workerId\" or/and \"maxAssignmentsLimitPerBatch\" or/and \"maxAssignmentsBatchesToHandleBeforeRestart\" or/and \"controllerBaseUrl\" could be retrieved from the file: " + inputDataFilePath;
logger.error(errorMsg); logger.error(errorMsg);
System.err.println(errorMsg); System.err.println(errorMsg);
System.exit(63); System.exit(63);
} }
logger.info("workerId: " + workerId + ", maxAssignmentsLimitPerBatch: " + maxAssignmentsLimitPerBatch + ", controllerBaseUrl: " + controllerBaseUrl); // It's safe and helpful to show them in the logs. logger.info("workerId: " + workerId + ", maxAssignmentsLimitPerBatch: " + maxAssignmentsLimitPerBatch + ", maxAssignmentsBatchesToHandleBeforeRestart: " + maxAssignmentsBatchesToHandleBeforeRestart + ", controllerBaseUrl: " + controllerBaseUrl); // It's safe and helpful to show them in the logs.
} catch (Exception e) { } catch (Exception e) {
String errorMsg = "An error prevented the retrieval of the workerId and the controllerBaseUrl from the file: " + inputDataFilePath + "\n" + e.getMessage(); String errorMsg = "An error prevented the retrieval of the workerId and the controllerBaseUrl from the file: " + inputDataFilePath + "\n" + e.getMessage();

View File

@ -1,5 +1,6 @@
package eu.openaire.urls_worker.components; package eu.openaire.urls_worker.components;
import eu.openaire.urls_worker.UrlsWorkerApplication;
import eu.openaire.urls_worker.controllers.FullTextsController; import eu.openaire.urls_worker.controllers.FullTextsController;
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin; import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
import eu.openaire.urls_worker.util.AssignmentsHandler; import eu.openaire.urls_worker.util.AssignmentsHandler;
@ -32,8 +33,15 @@ public class ScheduledTasks {
@Scheduled(fixedRate = 900_000) // Every 15 mins: 900_000 @Scheduled(fixedRate = 900_000) // Every 15 mins: 900_000
public void handleNewAssignments() { public void handleNewAssignments() {
if ( AssignmentsHandler.isAvailableForWork ) if ( AssignmentsHandler.isAvailableForWork )
AssignmentsHandler.handleAssignments(); {
else { if ( (UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart == 0) // Infinite batches.
|| (AssignmentsHandler.numHandledAssignmentsBatches < UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart) )
AssignmentsHandler.handleAssignments();
else {
logger.info("The maximum assignments-batches (" + UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeRestart + ") to be handled was reached! Shut down, in order for the external Linux-service to restart on its own..");
UrlsWorkerApplication.gentleShutdown();
}
} else {
//logger.debug("The worker is not available for work at the moment.."); // JUST FOR DEBUG! //logger.debug("The worker is not available for work at the moment.."); // JUST FOR DEBUG!
} }
} }

View File

@ -151,7 +151,7 @@ public class PublicationsRetrieverPlugin {
if ( "true".equals(data.getWasDocumentOrDatasetAccessible()) ) // The reversed order defends against a potential NPE. if ( "true".equals(data.getWasDocumentOrDatasetAccessible()) ) // The reversed order defends against a potential NPE.
{ {
status = UrlReport.StatusType.accessible; status = UrlReport.StatusType.accessible;
if ( comment.contains(UrlUtils.alreadyDownloadedByIDMessage) ) { if ( comment.startsWith(UrlUtils.alreadyDownloadedByIDMessage, 0) ) {
// The file of this docUrl was already downloaded by another docUrl. // The file of this docUrl was already downloaded by another docUrl.
String initialId = comment.substring(UrlUtils.alreadyDownloadedByIDMessage.length()); // The fileName starts right after the "message". String initialId = comment.substring(UrlUtils.alreadyDownloadedByIDMessage.length()); // The fileName starts right after the "message".
//logger.debug("initialId: " + initialId); // DEBUG! //logger.debug("initialId: " + initialId); // DEBUG!
@ -171,7 +171,7 @@ public class PublicationsRetrieverPlugin {
fileLocation = comment; // This is the full-file-path. fileLocation = comment; // This is the full-file-path.
mimeType = "application/pdf"; mimeType = "application/pdf";
} else // Else the file was not retrieved, so all file-related data are kept "null". } else // Else the file was not retrieved, so all file-related data are kept "null".
error = new Error(Error.ErrorType.couldRetry, comment); // We can still try to download it in the future. error = new Error(Error.ErrorType.couldRetry, comment); // We can still try to download it from the found docUrl, in the future.
if ( error == null ) // If the file was retrieved, in any time. if ( error == null ) // If the file was retrieved, in any time.
error = new Error(Error.ErrorType.couldRetry, null); // We do not want to send a "null" object, since it just adds more complicated handling in the controller.. error = new Error(Error.ErrorType.couldRetry, null); // We do not want to send a "null" object, since it just adds more complicated handling in the controller..
@ -188,10 +188,7 @@ public class PublicationsRetrieverPlugin {
if ( docOrDatasetUrl.equals(UrlUtils.unreachableDocOrDatasetUrlIndicator) || docOrDatasetUrl.equals(UrlUtils.duplicateUrlIndicator) ) if ( docOrDatasetUrl.equals(UrlUtils.unreachableDocOrDatasetUrlIndicator) || docOrDatasetUrl.equals(UrlUtils.duplicateUrlIndicator) )
docOrDatasetUrl = null; docOrDatasetUrl = null;
// Cleanup some data. // Convert "null" strings to actual < null >
if ( (size != null) && (size == 0L) )
size = null;
if ( (hash != null) && (hash.equals("null")) ) if ( (hash != null) && (hash.equals("null")) )
hash = null; hash = null;

View File

@ -38,6 +38,8 @@ public class AssignmentsHandler {
public static final RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(requestConnectTimeoutDuration).setReadTimeout(requestReadTimeoutDuration).build(); public static final RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(requestConnectTimeoutDuration).setReadTimeout(requestReadTimeoutDuration).build();
public static long numHandledAssignmentsBatches = 0; // No need to be synchronized.
public AssignmentsHandler() public AssignmentsHandler()
{ {
@ -124,6 +126,8 @@ public class AssignmentsHandler {
else else
postWorkerReport(assignmentRequestCounter); postWorkerReport(assignmentRequestCounter);
numHandledAssignmentsBatches ++; // This is used later to stop this app, when a user-defined upper limit is reached.
isAvailableForWork = true; // State this after posting, to avoid breaking the "UrlReports" in the current or the next run. isAvailableForWork = true; // State this after posting, to avoid breaking the "UrlReports" in the current or the next run.
// Also, since the worker has limited resources, it's better to finish sending the full-texts first and then request a new batch of assignments. // Also, since the worker has limited resources, it's better to finish sending the full-texts first and then request a new batch of assignments.
@ -139,7 +143,10 @@ public class AssignmentsHandler {
try { try {
ResponseEntity<String> responseEntity = restTemplate.postForEntity(postUrl, new WorkerReport(UrlsWorkerApplication.workerId, assignmentRequestCounter, urlReports), String.class); ResponseEntity<String> responseEntity = restTemplate.postForEntity(postUrl, new WorkerReport(UrlsWorkerApplication.workerId, assignmentRequestCounter, urlReports), String.class);
int responseCode = responseEntity.getStatusCodeValue(); int responseCode = responseEntity.getStatusCodeValue();
if ( responseCode != HttpStatus.OK.value() ) { if ( responseCode == HttpStatus.OK.value() ) {
logger.info("The submission of the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller, and the full-text delivering, were successful!");
return true;
} else {
logger.error("HTTP-Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode); logger.error("HTTP-Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode);
return false; return false;
} }
@ -150,7 +157,6 @@ public class AssignmentsHandler {
urlReports.clear(); // Reset, without de-allocating. urlReports.clear(); // Reset, without de-allocating.
assignmentsForPlugins.clear(); assignmentsForPlugins.clear();
} }
return true;
} }