- Refactor the assignments-handling. In order to match with the database schema, now the AssignmentRequest returns a list of Assignments instead of a single assignment having a list of Tasks.

- Cleanup the members of the "Payload" model.
This commit is contained in:
Lampros Smyrnaios 2021-07-05 15:00:29 +03:00
parent f6e53ca289
commit 6307cda23a
10 changed files with 132 additions and 113 deletions

View File

@ -26,7 +26,7 @@ public class ScheduledTasks {
@Scheduled(fixedRate = 1800_000) // Every 30 mins: 1800_000
public void handleAssignment() {
if ( AssignmentHandler.isAvailableForWork )
AssignmentHandler.handleAssignment();
AssignmentHandler.handleAssignments();
}
//@Scheduled(fixedRate = 20_000) // Every 20 secs.

View File

@ -36,7 +36,7 @@ public class GeneralController {
if ( AssignmentHandler.isAvailableForWork ) {
logger.info("The worker is available for an assignment.");
return ResponseEntity.status(200).body(new WorkerResponse(WorkerConstants.WORKER_ID, WorkerConstants.TASKS_LIMIT));
return ResponseEntity.status(200).body(new WorkerResponse(WorkerConstants.WORKER_ID, WorkerConstants.ASSIGNMENTS_LIMIT));
}
else {
logger.info("The worker is busy with another assignment.");

View File

@ -5,53 +5,68 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import java.util.Date;
import java.util.List;
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
"assignmentId",
"tasks",
"id",
"original_url",
"datasource",
"workerId",
"date"
})
public class Assignment {
@JsonProperty("assignmentId")
private int assignmentId;
@JsonProperty("id")
private String id;
@JsonProperty("tasks")
private List<Task> tasks;
@JsonProperty("original_url")
private String originalUrl;
@JsonProperty("workerId")
@JsonProperty("datasource")
private Datasource datasource;
@JsonProperty("workerid")
private String workerId;
@JsonProperty("date")
private Date date;
public Assignment() {}
public Assignment(int assignmentId, List<Task> tasks, String workerId, Date date) {
this.assignmentId = assignmentId;
this.tasks = tasks;
public Assignment(String id, String originalUrl, Datasource datasource, String workerId, Date date) {
this.id = id;
this.originalUrl = originalUrl;
this.datasource = datasource;
this.workerId = workerId;
this.date = date;
}
public int getAssignmentId() {
return assignmentId;
public String getId() {
return id;
}
public void setAssignmentId(int assignmentId) {
this.assignmentId = assignmentId;
public void setId(String id) {
this.id = id;
}
public List<Task> getTasks() {
return tasks;
public String getOriginalUrl() {
return originalUrl;
}
public void setTasks(List<Task> tasks) {
this.tasks = tasks;
public void setOriginalUrl(String originalUrl) {
this.originalUrl = originalUrl;
}
public Datasource getDatasource() {
return datasource;
}
public void setDatasource(Datasource datasource) {
this.datasource = datasource;
}
public String getWorkerId() {
@ -70,11 +85,13 @@ public class Assignment {
this.date = date;
}
@Override
public String toString() {
return "Assignment{" +
"assignmentId=" + assignmentId +
", tasks=" + tasks +
"id='" + id + '\'' +
", originalUrl='" + originalUrl + '\'' +
", datasource=" + datasource +
", workerId='" + workerId + '\'' +
", date=" + date +
'}';

View File

@ -12,11 +12,10 @@ import java.util.Date;
"id",
"original_url",
"actual_url",
"date_acquired",
"date",
"mime_type",
"size",
"more_info",
"md5",
"hash",
"location",
"provenance"
})
@ -31,7 +30,7 @@ public class Payload {
@JsonProperty("actual_url")
private String actual_url;
@JsonProperty("date_acquired")
@JsonProperty("date")
private Date date_acquired;
@JsonProperty("mime_type")
@ -40,28 +39,24 @@ public class Payload {
@JsonProperty("size")
private Long size; // In bytes.
@JsonProperty("more_info")
private String more_info;
@JsonProperty("md5")
private String md5;
@JsonProperty("hash")
private String hash;
@JsonProperty("location")
private String location;
@JsonProperty("provenance")
private String provenance;
private String provenance; // "crawl:<PluginName>"
public Payload(String id, String original_url, String actual_url, Date date_acquired, String mime_type, Long size, String more_info, String md5, String location, String provenance) {
public Payload(String id, String original_url, String actual_url, Date date_acquired, String mime_type, Long size, String hash, String location, String provenance) {
this.id = id;
this.original_url = original_url;
this.actual_url = actual_url;
this.date_acquired = date_acquired;
this.mime_type = mime_type;
this.size = size;
this.more_info = more_info;
this.md5 = md5;
this.hash = hash;
this.location = location;
this.provenance = provenance;
}
@ -114,20 +109,12 @@ public class Payload {
this.size = size;
}
public String getMore_info() {
return more_info;
public String getHash() {
return hash;
}
public void setMore_info(String more_info) {
this.more_info = more_info;
}
public String getMd5() {
return md5;
}
public void setMd5(String md5) {
this.md5 = md5;
public void setHash(String hash) {
this.hash = hash;
}
public String getLocation() {
@ -155,8 +142,7 @@ public class Payload {
", date_acquired='" + date_acquired + '\'' +
", mime_type='" + mime_type + '\'' +
", size='" + size + '\'' +
", more_info='" + more_info + '\'' +
", md5='" + md5 + '\'' +
", hash='" + hash + '\'' +
", location='" + location + '\'' +
", provenance='" + provenance + '\'' +
'}';

View File

@ -4,31 +4,46 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import eu.openaire.urls_worker.models.Assignment;
import java.util.List;
@JsonInclude(JsonInclude.Include.NON_NULL)
public class AssignmentRequest {
@JsonProperty("assignment")
private Assignment assignment;
@JsonProperty("assignmentCounter")
private Long assignmentCounter;
@JsonProperty("assignments")
private List<Assignment> assignments;
public AssignmentRequest() { }
public AssignmentRequest(Assignment assignment) {
this.assignment = assignment;
public AssignmentRequest(Long assignmentCounter, List<Assignment> assignments) {
this.assignmentCounter = assignmentCounter;
this.assignments = assignments;
}
public Assignment getAssignment() {
return assignment;
public Long getAssignmentCounter() {
return assignmentCounter;
}
public void setAssignment(Assignment assignment) {
this.assignment = assignment;
public void setAssignmentCounter(Long assignmentCounter) {
this.assignmentCounter = assignmentCounter;
}
public List<Assignment> getAssignments() {
return assignments;
}
public void setAssignments(List<Assignment> assignments) {
this.assignments = assignments;
}
@Override
public String toString() {
return "AssignmentRequest{" +
"assignment=" + assignment +
"assignmentCounter=" + assignmentCounter +
", assignments=" + assignments +
'}';
}
}

View File

@ -19,15 +19,15 @@ public class WorkerReport {
@JsonProperty("workerId")
private String workerId;
@JsonProperty("assignmentId")
private int assignmentId;
@JsonProperty("assignmentRequestCounter")
private Long assignmentRequestCounter;
@JsonProperty("urlReports")
private List<UrlReport> urlReports;
public WorkerReport(String workerId, int assignmentId, List<UrlReport> urlReports) {
public WorkerReport(String workerId, Long assignmentRequestCounter, List<UrlReport> urlReports) {
this.workerId = workerId;
this.assignmentId = assignmentId;
this.assignmentRequestCounter = assignmentRequestCounter;
this.urlReports = urlReports;
}
@ -39,12 +39,12 @@ public class WorkerReport {
this.workerId = workerId;
}
public int getAssignmentId() {
return this.assignmentId;
public Long getAssignmentRequestCounter() {
return assignmentRequestCounter;
}
public void setAssignmentId(int assignmentId) {
this.assignmentId = assignmentId;
public void setAssignmentRequestCounter(Long assignmentRequestCounter) {
this.assignmentRequestCounter = assignmentRequestCounter;
}
public List<UrlReport> getUrlReports() {
@ -59,7 +59,7 @@ public class WorkerReport {
public String toString() {
return "WorkerReport{" +
"workerId='" + workerId + '\'' +
", assignmentId=" + assignmentId +
", assignmentRequestCounter=" + assignmentRequestCounter +
", urlReports=" + urlReports +
'}';
}

View File

@ -8,19 +8,19 @@ import com.fasterxml.jackson.annotation.JsonPropertyOrder;
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
"workerId",
"tasksLimit"
"assignmentsLimit"
})
public class WorkerResponse {
@JsonProperty("workerId")
private String workerId;
@JsonProperty("tasksLimit")
private int tasksLimit;
@JsonProperty("assignmentsLimit")
private int assignmentsLimit;
public WorkerResponse(String workerId, int tasksLimit) {
public WorkerResponse(String workerId, int assignmentsLimit) {
this.workerId = workerId;
this.tasksLimit = tasksLimit;
this.assignmentsLimit = assignmentsLimit;
}
public String getWorkerId() {
@ -32,18 +32,18 @@ public class WorkerResponse {
}
public int getTasksLimit() {
return tasksLimit;
return assignmentsLimit;
}
public void setTasksLimit(int tasksLimit) {
this.tasksLimit = tasksLimit;
public void setTasksLimit(int assignmentsLimit) {
this.assignmentsLimit = assignmentsLimit;
}
@Override
public String toString() {
return "WorkerRequest{" +
"id='" + workerId + '\'' +
", tasksLimit=" + tasksLimit +
", assignmentsLimit=" + assignmentsLimit +
'}';
}
}

View File

@ -10,6 +10,7 @@ import eu.openaire.publications_retriever.util.http.HttpConnUtils;
import eu.openaire.publications_retriever.util.url.DataToBeLogged;
import eu.openaire.publications_retriever.util.url.LoaderAndChecker;
import eu.openaire.publications_retriever.util.url.UrlUtils;
import eu.openaire.urls_worker.models.Assignment;
import eu.openaire.urls_worker.models.Payload;
import eu.openaire.urls_worker.models.Task;
import eu.openaire.urls_worker.models.UrlReport;
@ -57,25 +58,25 @@ public class PublicationsRetrieverPlugin {
}
public static void processTasks(int assignmentId, Collection<Task> tasks) throws RuntimeException, FileNotFoundException
public static void processAssginmets(Long assignmentRequestCounter, Collection<Assignment> assignments) throws RuntimeException, FileNotFoundException
{
ConnSupportUtils.setKnownMimeTypes();
FileUtils.storeDocFilesDir = assignmentsBaseFullTextsPath + "assignment_" + assignmentId + "_fullTexts" + File.separator; // It needs the last separator, because of how the docFiles are named and stored.
FileUtils.storeDocFilesDir = assignmentsBaseFullTextsPath + "assignment_" + assignmentRequestCounter + "_fullTexts" + File.separator; // It needs the last separator, because of how the docFiles are named and stored.
FileUtils.setOutput(new FileOutputStream(assignmentsBasePath + "assignment_" + assignmentId + "_generic_results.json"));
FileUtils.setOutput(new FileOutputStream(assignmentsBasePath + "assignment_" + assignmentRequestCounter + "_generic_results.json"));
int tasksSize = tasks.size();
int tasksSize = assignments.size();
int batchCount = 0;
int taskCount = 0;
List<Callable<Boolean>> callableTasks = new ArrayList<>(FileUtils.jsonBatchSize);
// Start loading and checking urls.
for ( Task task : tasks )
for ( Assignment assignment : assignments )
{
callableTasks.add(() -> {
String id = task.getId();
String url = task.getUrl();
String id = assignment.getId();
String url = assignment.getOriginalUrl();
if ( (url = LoaderAndChecker.handleUrlChecks(id, url)) == null ) {
return false;
@ -133,7 +134,7 @@ public class PublicationsRetrieverPlugin {
{
for ( DataToBeLogged data : FileUtils.dataToBeLoggedList )
{
String status = null, fileLocation = null, md5 = null;
String status = null, fileLocation = null, hash = null;
Long size = null;
if ( data.getWasDocumentOrDatasetAccessible().equals("true") )
{
@ -158,14 +159,14 @@ public class PublicationsRetrieverPlugin {
try {
File docFile = new File(fileLocation);
if ( docFile.isFile() ) {
md5 = Files.hash(docFile, Hashing.md5()).toString(); // These hashing functions are deprecated, but just to inform us that MD5 is not secure. Luckily, we use MD5 just to identify duplicate files.
//logger.debug("MD5 for file \"" + docFile.getName() + "\": " + md5); // DEBUG!
hash = Files.hash(docFile, Hashing.md5()).toString(); // These hashing functions are deprecated, but just to inform us that MD5 is not secure. Luckily, we use MD5 just to identify duplicate files.
//logger.debug("MD5 for file \"" + docFile.getName() + "\": " + hash); // DEBUG!
size = java.nio.file.Files.size(Paths.get(fileLocation));
//logger.debug("Size of file \"" + docFile.getName() + "\": " + size); // DEBUG!
} else
logger.error("No file was found with path: " + fileLocation);
} catch (Exception e) {
if ( md5 == null )
if ( hash == null )
logger.error("Could not retrieve the MD5-hash for the file: " + fileLocation);
if ( size == null )
@ -178,7 +179,7 @@ public class PublicationsRetrieverPlugin {
} else
status = "non-accessible";
Payload payload = new Payload(data.getUrlId(), data.getSourceUrl(), data.getDocOrDatasetUrl(), new Date(), "application/pdf", size, "more_info", md5, fileLocation, null);
Payload payload = new Payload(data.getUrlId(), data.getSourceUrl(), data.getDocOrDatasetUrl(), new Date(), "application/pdf", size, hash, fileLocation, "crawl:PublicationsRetriever");
AssignmentHandler.urlReports.add(new UrlReport(status, payload));
}
FileUtils.dataToBeLoggedList.clear(); // Empty the list, to be re-populated by the next batch / assignment.

View File

@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import eu.openaire.urls_worker.models.Assignment;
import eu.openaire.urls_worker.models.Task;
import eu.openaire.urls_worker.models.UrlReport;
import eu.openaire.urls_worker.payloads.requests.AssignmentRequest;
import eu.openaire.urls_worker.payloads.responces.WorkerReport;
@ -29,10 +28,10 @@ public class AssignmentHandler {
public static List<UrlReport> urlReports = null;
public static Assignment requestAssignment()
public static AssignmentRequest requestAssignments()
{
RestTemplate restTemplate = new RestTemplateBuilder().build();
String url = "http://localhost:8080/api/urls/test?workerId=" + WorkerConstants.WORKER_ID + "&tasksLimit=" + WorkerConstants.TASKS_LIMIT;
String url = "http://localhost:8080/api/urls/test?workerId=" + WorkerConstants.WORKER_ID + "&assignmentsLimit=" + WorkerConstants.ASSIGNMENTS_LIMIT;
String json = null;
try {
json = restTemplate.getForObject(url, String.class);
@ -58,37 +57,38 @@ public class AssignmentHandler {
//logger.debug(assignmentRequest.toString()); // DEBUG!
Assignment assignment = assignmentRequest.getAssignment();
logger.info("Assignment with id < " + assignment.getAssignmentId() + " > was received and it's ready to be processed. It contains " + assignment.getTasks().size() + " tasks.");
logger.info("AssignmentRequest < " + assignmentRequest.getAssignmentCounter() + " > was received and it's ready to be processed. It contains " + assignmentRequest.getAssignments().size() + " tasks.");
// TODO - Maybe create a HashSet with these IDs. It may be useful for the Worker to know and report which assignments (and how many) it has processed.
return assignment;
return assignmentRequest;
}
public static void handleAssignment()
public static void handleAssignments()
{
Assignment assignment = requestAssignment();
if ( assignment == null ) {
logger.warn("The assignment was found to be null!");
AssignmentRequest assignmentRequest = requestAssignments();
Long assignmentRequestCounter = assignmentRequest.getAssignmentCounter();
List<Assignment> assignments = assignmentRequest.getAssignments();
if ( assignments == null ) {
logger.warn("The assignments were found to be null for assignmentRequestCounter = " + assignmentRequestCounter);
return;
}
// Start handling the assignment, the worker is busy.
// Start handling the assignments, the worker is busy.
isAvailableForWork = false;
List<Task> tasks = assignment.getTasks();
// Iterate over the tasks and add each task in it's own list depending on the DATASOURCE in order to decide which plugin to use later.
Multimap<String, Task> tasksForPlugins = HashMultimap.create();
Multimap<String, Assignment> assignmentsForPlugins = HashMultimap.create();
for ( Task task : tasks ) {
for ( Assignment assignment : assignments ) {
// Add each task in its own HashSet.
tasksForPlugins.put(task.getDatasource().getId(), task);
assignmentsForPlugins.put(assignment.getDatasource().getId(), assignment);
}
urlReports = new ArrayList<>(tasks.size()); // Define the new UrlReport-list. It is reinitialized each time.
urlReports = new ArrayList<>(assignments.size()); // Define the new UrlReport-list. It is reinitialized each time.
// TODO - Decide which tasks run with what plugin (depending on their datasource).
@ -99,31 +99,31 @@ public class AssignmentHandler {
// For now, let's just run all tasks in the generic plugin.
try {
PublicationsRetrieverPlugin.processTasks(assignment.getAssignmentId(), tasksForPlugins.values());
PublicationsRetrieverPlugin.processAssginmets(assignmentRequestCounter, assignmentsForPlugins.values());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
postWorkerReport(assignment.getAssignmentId());
postWorkerReport(assignmentRequestCounter);
isAvailableForWork = true;
}
public static boolean postWorkerReport(int assignmentId)
public static boolean postWorkerReport(Long assignmentRequestCounter)
{
RestTemplate restTemplate = new RestTemplateBuilder().build();
String url = "http://localhost:8080/api/urls/addWorkerReport";
try {
ResponseEntity responseEntity = restTemplate.postForEntity(url, new WorkerReport(WorkerConstants.WORKER_ID, assignmentId, urlReports), String.class);
ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, new WorkerReport(WorkerConstants.WORKER_ID, assignmentRequestCounter, urlReports), String.class);
int responseCode = responseEntity.getStatusCode().value();
if ( responseCode != HttpStatus.OK.value() ) {
logger.error("Connection problem with the submission of the WorkerReport of assignment_" + assignmentId + " to the Controller. Error-code was: " + responseCode);
logger.error("Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode);
return false;
}
} catch (Exception e) {
logger.error("Error when submitting the WorkerReport of assignment_" + assignmentId + " to the Controller: ", e);
logger.error("Error when submitting the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller: ", e);
return false;
}

View File

@ -3,6 +3,6 @@ package eu.openaire.urls_worker.util;
public interface WorkerConstants {
String WORKER_ID = "worker_1"; // This should be different for every deployment of a Worker.
int TASKS_LIMIT = 10000;
int ASSIGNMENTS_LIMIT = 10000;
}