- Allow the user to set the "maxAssignmentsLimitPerBatch" value.
- Set increased lower and upper limits for the Java Heap Size. - Update the "ServerBaseURL" to the Public IP Address of the machine which is running the app. - Improve two log-messages.
This commit is contained in:
parent
ce49bff50e
commit
fd5b56e3c6
|
@ -48,6 +48,11 @@ configurations {
|
||||||
all*.exclude group: 'ch.qos.logback'
|
all*.exclude group: 'ch.qos.logback'
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set increased lower and upper limits for the java-execution.
|
||||||
|
tasks.withType(JavaExec) {
|
||||||
|
jvmArgs = ['-Xms512m', '-Xmx8g']
|
||||||
|
}
|
||||||
|
|
||||||
test {
|
test {
|
||||||
useJUnitPlatform()
|
useJUnitPlatform()
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,11 +17,14 @@ if [[ ! -f $inputDataFile ]]; then
|
||||||
echo "Give the ID of this worker:"
|
echo "Give the ID of this worker:"
|
||||||
read -r workerId
|
read -r workerId
|
||||||
|
|
||||||
|
echo -e "\nGive the max-assignments-limit-per-batch for the Worker to handle: "
|
||||||
|
read -r maxAssignmentsLimitPerBatch
|
||||||
|
|
||||||
echo -e "\nGive the baseUrl of the controller (e.g.: http://IP:PORT/api/):"
|
echo -e "\nGive the baseUrl of the controller (e.g.: http://IP:PORT/api/):"
|
||||||
read -r controllerBaseUrl
|
read -r controllerBaseUrl
|
||||||
|
|
||||||
touch $inputDataFile
|
touch $inputDataFile
|
||||||
echo "$workerId,$controllerBaseUrl" >> $inputDataFile
|
echo "$workerId,$maxAssignmentsLimitPerBatch,$controllerBaseUrl" >> $inputDataFile
|
||||||
echo -e "\n\n"
|
echo -e "\n\n"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,9 @@ package eu.openaire.urls_worker;
|
||||||
import eu.openaire.publications_retriever.PublicationsRetriever;
|
import eu.openaire.publications_retriever.PublicationsRetriever;
|
||||||
import eu.openaire.publications_retriever.util.file.FileUtils;
|
import eu.openaire.publications_retriever.util.file.FileUtils;
|
||||||
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
|
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
|
||||||
|
import eu.openaire.urls_worker.util.AssignmentsHandler;
|
||||||
import eu.openaire.urls_worker.util.UriBuilder;
|
import eu.openaire.urls_worker.util.UriBuilder;
|
||||||
|
import eu.openaire.urls_worker.util.WorkerConstants;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.boot.CommandLineRunner;
|
import org.springframework.boot.CommandLineRunner;
|
||||||
|
@ -34,6 +36,7 @@ public class UrlsWorkerApplication {
|
||||||
|
|
||||||
private static final String inputDataFilePath = FileUtils.workingDir + "inputData.txt";
|
private static final String inputDataFilePath = FileUtils.workingDir + "inputData.txt";
|
||||||
public static String workerId = null;
|
public static String workerId = null;
|
||||||
|
public static int maxAssignmentsLimitPerBatch = 0;
|
||||||
public static String controllerBaseUrl = null; // BaseUrl template: "http://IP:PORT/api/"
|
public static String controllerBaseUrl = null; // BaseUrl template: "http://IP:PORT/api/"
|
||||||
|
|
||||||
|
|
||||||
|
@ -41,6 +44,7 @@ public class UrlsWorkerApplication {
|
||||||
|
|
||||||
setInputData(); // This may cause the Server to terminate early, in case the workerId or the controllerBaseUrl cannot be found.
|
setInputData(); // This may cause the Server to terminate early, in case the workerId or the controllerBaseUrl cannot be found.
|
||||||
new PublicationsRetrieverPlugin();
|
new PublicationsRetrieverPlugin();
|
||||||
|
new AssignmentsHandler();
|
||||||
|
|
||||||
SpringApplication.run(UrlsWorkerApplication.class, args);
|
SpringApplication.run(UrlsWorkerApplication.class, args);
|
||||||
|
|
||||||
|
@ -109,26 +113,33 @@ public class UrlsWorkerApplication {
|
||||||
myReader = new Scanner(inputDataFile);
|
myReader = new Scanner(inputDataFile);
|
||||||
if ( myReader.hasNextLine() ) {
|
if ( myReader.hasNextLine() ) {
|
||||||
String[] data = myReader.nextLine().split(",");
|
String[] data = myReader.nextLine().split(",");
|
||||||
if ( data.length < 2 ) {
|
if ( data.length < 3 ) {
|
||||||
String errorMsg = "Not all data were retrieved from file \"" + inputDataFilePath + "\"!";
|
String errorMsg = "Not all data were retrieved from file \"" + inputDataFilePath + "\"!";
|
||||||
logger.error(errorMsg);
|
logger.error(errorMsg);
|
||||||
System.err.println(errorMsg);
|
System.err.println(errorMsg);
|
||||||
System.exit(61);
|
System.exit(61);
|
||||||
}
|
}
|
||||||
workerId = data[0].trim();
|
workerId = data[0].trim();
|
||||||
controllerBaseUrl = data[1].trim();
|
String maxAssignmentsLimitStr = data[1].trim();
|
||||||
|
try {
|
||||||
|
maxAssignmentsLimitPerBatch = Integer.parseInt(maxAssignmentsLimitStr);
|
||||||
|
} catch (NumberFormatException nfe) {
|
||||||
|
logger.warn("The given \"maxAssignmentsLimitPerBatch\" (" + maxAssignmentsLimitStr + ") was not a number! Will use the default one: " + WorkerConstants.ASSIGNMENTS_LIMIT);
|
||||||
|
maxAssignmentsLimitPerBatch = WorkerConstants.ASSIGNMENTS_LIMIT;
|
||||||
|
}
|
||||||
|
controllerBaseUrl = data[2].trim();
|
||||||
if ( !controllerBaseUrl.endsWith("/") )
|
if ( !controllerBaseUrl.endsWith("/") )
|
||||||
controllerBaseUrl += "/"; // Make sure the whole urls will not break later.
|
controllerBaseUrl += "/"; // Make sure the other urls will not break later.
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( (workerId == null) || (controllerBaseUrl == null) ) {
|
if ( (workerId == null) || (maxAssignmentsLimitPerBatch == 0) || (controllerBaseUrl == null) ) {
|
||||||
String errorMsg = "No \"workerId\" or/and \"controllerBaseUrl\" could be retrieved from the file: " + inputDataFilePath;
|
String errorMsg = "No \"workerId\" or/and \"maxAssignmentsLimitPerBatch\" or/and \"controllerBaseUrl\" could be retrieved from the file: " + inputDataFilePath;
|
||||||
logger.error(errorMsg);
|
logger.error(errorMsg);
|
||||||
System.err.println(errorMsg);
|
System.err.println(errorMsg);
|
||||||
System.exit(62);
|
System.exit(62);
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info("workerId: " + workerId + ", controllerBaseUrl: " + controllerBaseUrl); // It's safe and helpful to show them in the logs.
|
logger.info("workerId: " + workerId + ", maxAssignmentsLimitPerBatch: " + maxAssignmentsLimitPerBatch + ", controllerBaseUrl: " + controllerBaseUrl); // It's safe and helpful to show them in the logs.
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String errorMsg = "An error prevented the retrieval of the workerId and the controllerBaseUrl from the file: " + inputDataFilePath + "\n" + e.getMessage();
|
String errorMsg = "An error prevented the retrieval of the workerId and the controllerBaseUrl from the file: " + inputDataFilePath + "\n" + e.getMessage();
|
||||||
|
|
|
@ -50,7 +50,7 @@ public class FullTextsController {
|
||||||
return ResponseEntity.badRequest().body(errorMsg);
|
return ResponseEntity.badRequest().body(errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info("Received a \"getMultipleFullTexts\" request for returning a zip-file containing " + fileNamesListNum + " full-texts, from assignments_" + assignmentsCounter + ", for batch_" + zipBatchCounter);
|
logger.info("Received a \"getMultipleFullTexts\" request for returning a zip-file containing " + fileNamesListNum + " full-texts, from assignments_" + assignmentsCounter + ", for batch_" + zipBatchCounter + " (out of " + totalZipBatches + ").");
|
||||||
|
|
||||||
String currentAssignmentsBaseFullTextsPath = assignmentsBaseDir + "assignments_" + assignmentsCounter + "_fullTexts" + File.separator;
|
String currentAssignmentsBaseFullTextsPath = assignmentsBaseDir + "assignments_" + assignmentsCounter + "_fullTexts" + File.separator;
|
||||||
|
|
||||||
|
@ -82,7 +82,7 @@ public class FullTextsController {
|
||||||
// If this is the last batch for this assignments-count, then make sure it is deleted in the next scheduled delete-operation.
|
// If this is the last batch for this assignments-count, then make sure it is deleted in the next scheduled delete-operation.
|
||||||
if ( zipBatchCounter == totalZipBatches ) {
|
if ( zipBatchCounter == totalZipBatches ) {
|
||||||
assignmentsNumsHandledAndLocallyDeleted.put(assignmentsCounter, false);
|
assignmentsNumsHandledAndLocallyDeleted.put(assignmentsCounter, false);
|
||||||
logger.debug("Will return the last batch of Assignments_" + assignmentsCounter + " to the Controller and these assignments will be deleted later.");
|
logger.debug("Will return the last batch (" + zipBatchCounter + ") of Assignments_" + assignmentsCounter + " to the Controller and these assignments will be deleted later.");
|
||||||
}
|
}
|
||||||
|
|
||||||
String contentType = request.getServletContext().getMimeType(zipFileFullPath);
|
String contentType = request.getServletContext().getMimeType(zipFileFullPath);
|
||||||
|
|
|
@ -40,7 +40,7 @@ public class GeneralController {
|
||||||
|
|
||||||
if ( AssignmentsHandler.isAvailableForWork ) {
|
if ( AssignmentsHandler.isAvailableForWork ) {
|
||||||
logger.info("The worker is available for an assignment.");
|
logger.info("The worker is available for an assignment.");
|
||||||
return ResponseEntity.status(200).body(new WorkerResponse(UrlsWorkerApplication.workerId, WorkerConstants.ASSIGNMENTS_LIMIT));
|
return ResponseEntity.status(200).body(new WorkerResponse(UrlsWorkerApplication.workerId, UrlsWorkerApplication.maxAssignmentsLimitPerBatch));
|
||||||
} else {
|
} else {
|
||||||
logger.info("The worker is busy with another assignment.");
|
logger.info("The worker is busy with another assignment.");
|
||||||
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
|
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
|
||||||
|
|
|
@ -9,13 +9,13 @@ import eu.openaire.publications_retriever.util.http.HttpConnUtils;
|
||||||
import eu.openaire.publications_retriever.util.url.DataToBeLogged;
|
import eu.openaire.publications_retriever.util.url.DataToBeLogged;
|
||||||
import eu.openaire.publications_retriever.util.url.LoaderAndChecker;
|
import eu.openaire.publications_retriever.util.url.LoaderAndChecker;
|
||||||
import eu.openaire.publications_retriever.util.url.UrlUtils;
|
import eu.openaire.publications_retriever.util.url.UrlUtils;
|
||||||
|
import eu.openaire.urls_worker.UrlsWorkerApplication;
|
||||||
import eu.openaire.urls_worker.models.Assignment;
|
import eu.openaire.urls_worker.models.Assignment;
|
||||||
import eu.openaire.urls_worker.models.Error;
|
import eu.openaire.urls_worker.models.Error;
|
||||||
import eu.openaire.urls_worker.models.Payload;
|
import eu.openaire.urls_worker.models.Payload;
|
||||||
import eu.openaire.urls_worker.models.UrlReport;
|
import eu.openaire.urls_worker.models.UrlReport;
|
||||||
import eu.openaire.urls_worker.services.FileStorageService;
|
import eu.openaire.urls_worker.services.FileStorageService;
|
||||||
import eu.openaire.urls_worker.util.AssignmentsHandler;
|
import eu.openaire.urls_worker.util.AssignmentsHandler;
|
||||||
import eu.openaire.urls_worker.util.WorkerConstants;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -42,7 +42,7 @@ public class PublicationsRetrieverPlugin {
|
||||||
FileUtils.shouldDownloadDocFiles = true;
|
FileUtils.shouldDownloadDocFiles = true;
|
||||||
FileUtils.docFileNameType = FileUtils.DocFileNameType.idName;
|
FileUtils.docFileNameType = FileUtils.DocFileNameType.idName;
|
||||||
PublicationsRetriever.targetUrlType = "docUrl";
|
PublicationsRetriever.targetUrlType = "docUrl";
|
||||||
FileUtils.jsonBatchSize = WorkerConstants.ASSIGNMENTS_LIMIT;
|
FileUtils.jsonBatchSize = UrlsWorkerApplication.maxAssignmentsLimitPerBatch;
|
||||||
|
|
||||||
assignmentsBasePath = FileStorageService.assignmentsLocation.toString();
|
assignmentsBasePath = FileStorageService.assignmentsLocation.toString();
|
||||||
if ( !assignmentsBasePath.endsWith(File.separator) )
|
if ( !assignmentsBasePath.endsWith(File.separator) )
|
||||||
|
|
|
@ -25,10 +25,10 @@ public class AssignmentsHandler {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(AssignmentsHandler.class);
|
private static final Logger logger = LoggerFactory.getLogger(AssignmentsHandler.class);
|
||||||
|
|
||||||
public static boolean isAvailableForWork = true;
|
public static boolean isAvailableForWork = true;
|
||||||
public static final List<UrlReport> urlReports = new ArrayList<>(WorkerConstants.ASSIGNMENTS_LIMIT);
|
public static List<UrlReport> urlReports = null;
|
||||||
private static final int expectedDatasourcesPerRequest = 1400; // Per 10_000 assignments.
|
private static final int expectedDatasourcesPerRequest = 1400; // Per 10_000 assignments.
|
||||||
private static final int expectedAssignmentsPerDatasource = (WorkerConstants.ASSIGNMENTS_LIMIT / expectedDatasourcesPerRequest);
|
private static int expectedAssignmentsPerDatasource = 0;
|
||||||
public static final Multimap<String, Assignment> assignmentsForPlugins = HashMultimap.create(expectedDatasourcesPerRequest, expectedAssignmentsPerDatasource);
|
public static Multimap<String, Assignment> assignmentsForPlugins = null;
|
||||||
private static final boolean askForTest = false; // Enable this only for testing.
|
private static final boolean askForTest = false; // Enable this only for testing.
|
||||||
|
|
||||||
private static final Duration requestConnectTimeoutDuration = Duration.ofMinutes(1); // 1 minute.
|
private static final Duration requestConnectTimeoutDuration = Duration.ofMinutes(1); // 1 minute.
|
||||||
|
@ -37,9 +37,17 @@ public class AssignmentsHandler {
|
||||||
|
|
||||||
public static final RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(requestConnectTimeoutDuration).setReadTimeout(requestReadTimeoutDuration).build();
|
public static final RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(requestConnectTimeoutDuration).setReadTimeout(requestReadTimeoutDuration).build();
|
||||||
|
|
||||||
|
|
||||||
|
public AssignmentsHandler()
|
||||||
|
{
|
||||||
|
urlReports = new ArrayList<>(UrlsWorkerApplication.maxAssignmentsLimitPerBatch);
|
||||||
|
expectedAssignmentsPerDatasource = (UrlsWorkerApplication.maxAssignmentsLimitPerBatch / expectedDatasourcesPerRequest);
|
||||||
|
assignmentsForPlugins = HashMultimap.create(expectedDatasourcesPerRequest, expectedAssignmentsPerDatasource);
|
||||||
|
}
|
||||||
|
|
||||||
public static AssignmentsRequest requestAssignments()
|
public static AssignmentsRequest requestAssignments()
|
||||||
{
|
{
|
||||||
String requestUrl = UrlsWorkerApplication.controllerBaseUrl + "urls" + (askForTest ? "/test" : "") + "?workerId=" + UrlsWorkerApplication.workerId + "&workerAssignmentsLimit=" + WorkerConstants.ASSIGNMENTS_LIMIT;
|
String requestUrl = UrlsWorkerApplication.controllerBaseUrl + "urls" + (askForTest ? "/test" : "") + "?workerId=" + UrlsWorkerApplication.workerId + "&workerAssignmentsLimit=" + UrlsWorkerApplication.maxAssignmentsLimitPerBatch;
|
||||||
logger.info("Going to request assignments from the controller-server: " + requestUrl);
|
logger.info("Going to request assignments from the controller-server: " + requestUrl);
|
||||||
|
|
||||||
AssignmentsRequest assignmentRequest = null;
|
AssignmentsRequest assignmentRequest = null;
|
||||||
|
|
|
@ -5,7 +5,11 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.core.env.Environment;
|
import org.springframework.core.env.Environment;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
|
import java.net.MalformedURLException;
|
||||||
|
import java.net.URL;
|
||||||
|
|
||||||
public class UriBuilder {
|
public class UriBuilder {
|
||||||
|
|
||||||
|
@ -25,7 +29,10 @@ public class UriBuilder {
|
||||||
|
|
||||||
baseUrl += "://";
|
baseUrl += "://";
|
||||||
|
|
||||||
String hostName = InetAddress.getLoopbackAddress().getHostName(); // Non-null.
|
String hostName = getPublicIP();
|
||||||
|
if ( hostName == null )
|
||||||
|
hostName = InetAddress.getLoopbackAddress().getHostName(); // Non-null.
|
||||||
|
|
||||||
baseUrl += hostName;
|
baseUrl += hostName;
|
||||||
|
|
||||||
String serverPort = environment.getProperty("server.port");
|
String serverPort = environment.getProperty("server.port");
|
||||||
|
@ -36,11 +43,11 @@ public class UriBuilder {
|
||||||
baseUrl += ":" + serverPort;
|
baseUrl += ":" + serverPort;
|
||||||
|
|
||||||
String baseInternalPath = environment.getProperty("server.servlet.context-path");
|
String baseInternalPath = environment.getProperty("server.servlet.context-path");
|
||||||
if (baseInternalPath != null) {
|
if ( baseInternalPath != null ) {
|
||||||
if (!baseInternalPath.startsWith("/"))
|
if ( !baseInternalPath.startsWith("/") )
|
||||||
baseUrl += "/";
|
baseUrl += "/";
|
||||||
baseUrl += baseInternalPath;
|
baseUrl += baseInternalPath;
|
||||||
if (!baseInternalPath.endsWith("/"))
|
if ( !baseInternalPath.endsWith("/") )
|
||||||
baseUrl += "/";
|
baseUrl += "/";
|
||||||
} else {
|
} else {
|
||||||
logger.warn("No property \"server.servlet.context-path\" was found in \"application.properties\"!"); // Yes it's expected.
|
logger.warn("No property \"server.servlet.context-path\" was found in \"application.properties\"!"); // Yes it's expected.
|
||||||
|
@ -50,6 +57,25 @@ public class UriBuilder {
|
||||||
logger.debug("ServerBaseURL: " + baseUrl);
|
logger.debug("ServerBaseURL: " + baseUrl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String getPublicIP()
|
||||||
|
{
|
||||||
|
String publicIpAddress = "";
|
||||||
|
URL url_name;
|
||||||
|
try {
|
||||||
|
url_name = new URL("https://api.ipify.org/");
|
||||||
|
} catch (MalformedURLException mue) {
|
||||||
|
logger.warn(mue.getMessage());
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try ( BufferedReader bf = new BufferedReader(new InputStreamReader(url_name.openStream()))) {
|
||||||
|
publicIpAddress = bf.readLine().trim();
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("Cannot get the publicIP address for this machine!", e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return publicIpAddress;
|
||||||
|
}
|
||||||
|
|
||||||
public static String getBaseUrl() {
|
public static String getBaseUrl() {
|
||||||
return baseUrl;
|
return baseUrl;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue