- Read the Controller's url from a file, when starting the Application.
- Switch the "AssignmentsHandler.askForTest" to "false". - Get the size and the hash of a docFile which is previously downloaded by another ID in that batch. - Reset the "AssignmentHandler.urlReports" list after posting the results to the Controller. - Enhance logging and comments. - Add more guidelines in the README. - Disable the scheduled test-live job. - Code cleanup.
This commit is contained in:
parent
32aff8c44a
commit
61597d1627
16
README.md
16
README.md
|
@ -1,9 +1,15 @@
|
|||
# Urls_Worker
|
||||
# UrlsWorker
|
||||
|
||||
This is the Worker's Application.<br>
|
||||
It requests assignments from the [controller](https://code-repo.d4science.org/lsmyrnaios/UrlsController) and processes them.<br>
|
||||
[...] <br>
|
||||
|
||||
To install and run the application, run ```git clone``` and then execute the ```installAndRun.sh``` script.<br>
|
||||
That script, installs the [PublicationsRetriever](https://github.com/LSmyrnaios/PublicationsRetriever), as a library and then compiles and runs the whole Application.<br>
|
||||
It posts the results to the controller, which in turn, puts them in a database.<br>
|
||||
<br>
|
||||
|
||||
To install and run the application:
|
||||
- Run ```git clone``` and then ```cd UrlsWorker```.
|
||||
- Create the file ```controllerBaseUrl.txt``` which contains just one line with the controller's base api-url, for example: ```http://IP:PORT/api/```.
|
||||
- Execute the ```installAndRun.sh``` script.<br>
|
||||
|
||||
That script, installs the [PublicationsRetriever](https://github.com/LSmyrnaios/PublicationsRetriever), as a library and then compiles and runs the whole Application.<br>
|
||||
If you want to just run the app, then run the script with the argument "1": ```./installAndRun.sh 1```.<br>
|
||||
<br>
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package eu.openaire.urls_worker;
|
||||
|
||||
import eu.openaire.publications_retriever.PublicationsRetriever;
|
||||
import eu.openaire.publications_retriever.util.file.FileUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
|
@ -8,6 +9,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.io.File;
|
||||
import java.util.Scanner;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
|
@ -17,7 +20,13 @@ public class UrlsWorkerApplication {
|
|||
|
||||
private static final Logger logger = LoggerFactory.getLogger(UrlsWorkerApplication.class);
|
||||
|
||||
private static final String controllerBaseUrlFilePath = FileUtils.workingDir + "controllerBaseUrl.txt";
|
||||
public static String controllerBaseUrl = null; // BaseUrl template: "http://IP:PORT/api/"
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
setControllerBaseUrl(); // This may cause the Server to terminate early, in case the controllerBaseUrl cannot be found.
|
||||
|
||||
SpringApplication.run(UrlsWorkerApplication.class, args);
|
||||
}
|
||||
|
||||
|
@ -38,4 +47,44 @@ public class UrlsWorkerApplication {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private static void setControllerBaseUrl()
|
||||
{
|
||||
// Take the controllerBaseUrl from the file.
|
||||
Scanner myReader = null;
|
||||
try {
|
||||
File controllerBaseUrlFile = new File(controllerBaseUrlFilePath);
|
||||
if ( !controllerBaseUrlFile.exists() ) {
|
||||
String errorMsg = "controllerBaseUrlFile \"" + controllerBaseUrlFilePath + "\" does not exists!";
|
||||
logger.error(errorMsg);
|
||||
System.err.println(errorMsg);
|
||||
System.exit(60);
|
||||
}
|
||||
|
||||
myReader = new Scanner(controllerBaseUrlFile);
|
||||
if ( !myReader.hasNextLine() ) {
|
||||
String errorMsg = "The controllerBaseUrlFile is empty! No WorkerReports can be sent from this worker! Exiting..";
|
||||
logger.error(errorMsg);
|
||||
System.err.println(errorMsg);
|
||||
System.exit(61);
|
||||
}
|
||||
|
||||
controllerBaseUrl = myReader.nextLine().trim();
|
||||
if ( !controllerBaseUrl.endsWith("/") )
|
||||
controllerBaseUrl += "/"; // Make sure the whole urls will not break later.
|
||||
|
||||
logger.info("The controllerBaseUrl is: " + controllerBaseUrl);
|
||||
|
||||
} catch (Exception e) {
|
||||
String errorMsg = "An error prevented the retrieval of the controllerBaseUrl from the file: " + controllerBaseUrlFilePath + "\n" + e.getMessage();
|
||||
logger.error(errorMsg);
|
||||
System.err.println(errorMsg);
|
||||
e.printStackTrace();
|
||||
System.exit(62);
|
||||
} finally {
|
||||
if ( myReader != null )
|
||||
myReader.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ public class ScheduledTasks {
|
|||
|
||||
private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
|
||||
|
||||
@Scheduled(fixedRate = 600_000) // Every 10 mins: 600_000
|
||||
//@Scheduled(fixedRate = 600_000) // Every 10 mins: 600_000
|
||||
public void reportCurrentTime() {
|
||||
logger.info("Server is live! Time is now {}", dateFormat.format(new Date()));
|
||||
}
|
||||
|
@ -27,6 +27,8 @@ public class ScheduledTasks {
|
|||
public void handleAssignment() {
|
||||
if ( AssignmentHandler.isAvailableForWork )
|
||||
AssignmentHandler.handleAssignments();
|
||||
else
|
||||
logger.debug("The worker is not available for work at the moment..");
|
||||
}
|
||||
|
||||
//@Scheduled(fixedRate = 20_000) // Every 20 secs.
|
||||
|
|
|
@ -17,8 +17,7 @@ public class GeneralController {
|
|||
private static final Logger logger = LoggerFactory.getLogger(GeneralController.class);
|
||||
|
||||
|
||||
public GeneralController() {
|
||||
}
|
||||
public GeneralController() {}
|
||||
|
||||
@GetMapping("isAlive")
|
||||
public ResponseEntity<?> isWorkerAlive() {
|
||||
|
|
|
@ -1,65 +0,0 @@
|
|||
package eu.openaire.urls_worker.models;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
|
||||
|
||||
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
@JsonPropertyOrder({
|
||||
"id",
|
||||
"url",
|
||||
"datasource"
|
||||
})
|
||||
public class Task {
|
||||
|
||||
@JsonProperty("id")
|
||||
private String id;
|
||||
|
||||
@JsonProperty("url")
|
||||
private String url;
|
||||
|
||||
@JsonProperty("datasource")
|
||||
private Datasource datasource;
|
||||
|
||||
public Task() {}
|
||||
|
||||
public Task(String id, String url, Datasource datasource) {
|
||||
this.id = id;
|
||||
this.url = url;
|
||||
this.datasource = datasource;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public String getUrl() {
|
||||
return url;
|
||||
}
|
||||
|
||||
public void setUrl(String url) {
|
||||
this.url = url;
|
||||
}
|
||||
|
||||
public Datasource getDatasource() {
|
||||
return datasource;
|
||||
}
|
||||
|
||||
public void setDatasource(Datasource datasource) {
|
||||
this.datasource = datasource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Task{" +
|
||||
"id='" + id + '\'' +
|
||||
", url='" + url + '\'' +
|
||||
", datasource=" + datasource +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -70,9 +70,9 @@ public class PublicationsRetrieverPlugin {
|
|||
|
||||
FileUtils.setOutput(new FileOutputStream(assignmentsBasePath + "assignment_" + assignmentRequestCounter + "_generic_results.json"));
|
||||
|
||||
int tasksSize = assignments.size();
|
||||
int tasksNumber = assignments.size();
|
||||
int batchCount = 0;
|
||||
int taskCount = 0;
|
||||
int tasksCount = 0;
|
||||
List<Callable<Boolean>> callableTasks = new ArrayList<>(FileUtils.jsonBatchSize);
|
||||
|
||||
// Start loading and checking urls.
|
||||
|
@ -121,9 +121,10 @@ public class PublicationsRetrieverPlugin {
|
|||
return true;
|
||||
});
|
||||
|
||||
if ( ((++taskCount) >= FileUtils.jsonBatchSize) || (taskCount >= tasksSize) )
|
||||
// Invoke the tasks every time we reach the "jsonBatchSize" tasks, or we are at the end of the list.
|
||||
if ( ((++tasksCount) >= FileUtils.jsonBatchSize) || (tasksCount >= tasksNumber) )
|
||||
{
|
||||
logger.info("Batch counter: " + (++batchCount) + " | progress: " + PublicationsRetriever.df.format(((batchCount-1) * taskCount) * 100.0 / tasksSize) + "% | every batch contains " + FileUtils.jsonBatchSize + " id-url pairs.");
|
||||
logger.info("Batch counter: " + (++batchCount) + " | progress: " + PublicationsRetriever.df.format(((batchCount-1) * tasksCount) * 100.0 / tasksNumber) + "% | every batch contains " + FileUtils.jsonBatchSize + " id-url pairs.");
|
||||
LoaderAndChecker.invokeAllTasksAndWait(callableTasks);
|
||||
addUrlReportsToWorkerReport();
|
||||
callableTasks.clear(); // Reset the thread-tasks-list for the next batch.
|
||||
|
@ -138,7 +139,8 @@ public class PublicationsRetrieverPlugin {
|
|||
{
|
||||
for ( DataToBeLogged data : FileUtils.dataToBeLoggedList )
|
||||
{
|
||||
String status = null, fileLocation = null, comment = data.getComment(), mimeType = null;
|
||||
String status = null, fileLocation = null, comment = data.getComment(), mimeType = null, hash = data.getHash();
|
||||
Long size = data.getSize();
|
||||
Error error = null;
|
||||
|
||||
if ( data.getWasDocumentOrDatasetAccessible().equals("true") )
|
||||
|
@ -152,10 +154,13 @@ public class PublicationsRetrieverPlugin {
|
|||
for ( DataToBeLogged data_2 : FileUtils.dataToBeLoggedList ) {
|
||||
if ( data_2.getUrlId().equals(previousId) && data_2.getWasDocumentOrDatasetAccessible().equals("true") ) {
|
||||
fileLocation = data_2.getComment();
|
||||
mimeType = "application/pdf";
|
||||
size = data_2.getSize();
|
||||
hash = data_2.getHash();
|
||||
mimeType = "application/pdf"; // TODO - If support is added for other doc-formats other than "pdf", then make sure the "mime_type" is correctly specified.
|
||||
break;
|
||||
}
|
||||
}
|
||||
// TODO - The case where the "twin-ID" is not found, should "never" happen. But should we check? How to handle if that is the case..?
|
||||
}
|
||||
else if ( comment.contains(DocFileNotRetrievedExceptionName) )
|
||||
fileLocation = "File not retrieved";
|
||||
|
@ -177,7 +182,7 @@ public class PublicationsRetrieverPlugin {
|
|||
if ( docOrDatasetUrl.equals(UrlUtils.unreachableDocOrDatasetUrlIndicator) || docOrDatasetUrl.equals(UrlUtils.duplicateUrlIndicator) )
|
||||
docOrDatasetUrl = null;
|
||||
|
||||
Payload payload = new Payload(data.getUrlId(), data.getSourceUrl(), docOrDatasetUrl, new Date(), mimeType, data.getSize(), data.getHash(), fileLocation, "crawl:PublicationsRetriever");
|
||||
Payload payload = new Payload(data.getUrlId(), data.getSourceUrl(), docOrDatasetUrl, new Date(), mimeType, size, hash, fileLocation, "crawl:PublicationsRetriever");
|
||||
// TODO - If support is added for other doc-formats other than "pdf", then make sure the "mime_type" is correctly specified.
|
||||
|
||||
AssignmentHandler.urlReports.add(new UrlReport(status, payload, error));
|
||||
|
|
|
@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.HashMultimap;
|
||||
import com.google.common.collect.Multimap;
|
||||
import eu.openaire.urls_worker.UrlsWorkerApplication;
|
||||
import eu.openaire.urls_worker.models.Assignment;
|
||||
import eu.openaire.urls_worker.models.UrlReport;
|
||||
import eu.openaire.urls_worker.payloads.requests.AssignmentRequest;
|
||||
|
@ -14,7 +15,6 @@ import org.slf4j.LoggerFactory;
|
|||
import org.springframework.boot.web.client.RestTemplateBuilder;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
|
@ -27,25 +27,25 @@ public class AssignmentHandler {
|
|||
|
||||
public static List<UrlReport> urlReports = null;
|
||||
|
||||
private static final boolean askForTest = true; // Enable this only for testing.
|
||||
private static final boolean askForTest = false; // Enable this only for testing.
|
||||
|
||||
|
||||
public static AssignmentRequest requestAssignments()
|
||||
{
|
||||
RestTemplate restTemplate = new RestTemplateBuilder().build();
|
||||
String url = "http://localhost:1880/api/urls" + (askForTest ? "/test" : "") + "?workerId=" + WorkerConstants.WORKER_ID + "&workerAssignmentsLimit=" + WorkerConstants.ASSIGNMENTS_LIMIT;
|
||||
String requestUrl = UrlsWorkerApplication.controllerBaseUrl + "urls" + (askForTest ? "/test" : "") + "?workerId=" + WorkerConstants.WORKER_ID + "&workerAssignmentsLimit=" + WorkerConstants.ASSIGNMENTS_LIMIT;
|
||||
logger.info("Going to request assignments from the controller-server: " + requestUrl);
|
||||
|
||||
String json = null;
|
||||
try {
|
||||
json = restTemplate.getForObject(url, String.class);
|
||||
json = new RestTemplateBuilder().build().getForObject(requestUrl, String.class);
|
||||
} catch (Exception e) {
|
||||
logger.error("Could not retrieve the assignment: " + e.getMessage());
|
||||
logger.error("Could not retrieve the assignments!\n" + e.getMessage());
|
||||
return null;
|
||||
}
|
||||
|
||||
AssignmentRequest assignmentRequest = null;
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
try {
|
||||
assignmentRequest = objectMapper.readValue(json, AssignmentRequest.class);
|
||||
assignmentRequest = new ObjectMapper().readValue(json, AssignmentRequest.class);
|
||||
} catch (JsonProcessingException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
@ -111,17 +111,25 @@ public class AssignmentHandler {
|
|||
|
||||
isAvailableForWork = true; // State this before posting, to catch the soonest next scheduled request.
|
||||
|
||||
/*logger.debug("UrlReports:"); // DEBUG!
|
||||
for ( UrlReport urlReport : urlReports ) {
|
||||
logger.debug(urlReport.toString());
|
||||
}*/
|
||||
|
||||
postWorkerReport(assignmentRequestCounter);
|
||||
|
||||
// Note: Cannot call this method here retrospectively, as if it runs 100s of times, the memory may break..
|
||||
// The scheduler will handle calling it every half an hour, in case the Worker is available for work..
|
||||
}
|
||||
|
||||
|
||||
public static boolean postWorkerReport(Long assignmentRequestCounter)
|
||||
{
|
||||
RestTemplate restTemplate = new RestTemplateBuilder().build();
|
||||
String url = "http://localhost:1880/api/urls/addWorkerReport";
|
||||
String postUrl = UrlsWorkerApplication.controllerBaseUrl + "urls/addWorkerReport";
|
||||
logger.info("Going to post the WorkerReport to the controller-server: " + postUrl);
|
||||
|
||||
try {
|
||||
ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, new WorkerReport(WorkerConstants.WORKER_ID, assignmentRequestCounter, urlReports), String.class);
|
||||
ResponseEntity<String> responseEntity = new RestTemplateBuilder().build().postForEntity(postUrl, new WorkerReport(WorkerConstants.WORKER_ID, assignmentRequestCounter, urlReports), String.class);
|
||||
int responseCode = responseEntity.getStatusCode().value();
|
||||
if ( responseCode != HttpStatus.OK.value() ) {
|
||||
logger.error("Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode);
|
||||
|
@ -130,6 +138,8 @@ public class AssignmentHandler {
|
|||
} catch (Exception e) {
|
||||
logger.error("Error when submitting the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller: ", e);
|
||||
return false;
|
||||
} finally {
|
||||
AssignmentHandler.urlReports.clear(); // Reset the UrlReports list for the next assignments-handling.
|
||||
}
|
||||
|
||||
return true;
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
</encoder>
|
||||
</appender>
|
||||
|
||||
<root level="info">
|
||||
<root level="debug">
|
||||
<appender-ref ref="Console" />
|
||||
</root>
|
||||
|
||||
|
|
Loading…
Reference in New Issue