UrlsWorker/src/main/java/eu/openaire/urls_worker/util/AssignmentsHandler.java

package eu.openaire.urls_worker.util;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;

import eu.openaire.publications_retriever.util.http.HttpConnUtils;
import eu.openaire.publications_retriever.util.url.GenericUtils;
import eu.openaire.publications_retriever.util.url.UrlUtils;
import eu.openaire.urls_worker.UrlsWorkerApplication;
import eu.openaire.urls_worker.controllers.GeneralController;
import eu.openaire.urls_worker.models.Assignment;
import eu.openaire.urls_worker.models.UrlReport;
import eu.openaire.urls_worker.payloads.requests.AssignmentsRequest;
import eu.openaire.urls_worker.payloads.responces.WorkerReport;
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;

import java.net.CookieStore;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

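/**
 * Handles the life-cycle of an assignments-batch: it requests the assignments from the Controller, groups them by datasource,
 * lets the chosen plugin process them and, finally, posts the resulting WorkerReport back to the Controller.
 * The "handleAssignments()" method is expected to be called periodically by a scheduler, as noted at the end of that method.
 * */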
public class AssignmentsHandler {

    private static final Logger logger = LoggerFactory.getLogger(AssignmentsHandler.class);

    public static List<UrlReport> urlReports = null;

    private static final int expectedDatasourcesPerRequest = 1400; // Per 10_000 assignments.

    public static Multimap<String, Assignment> assignmentsForPlugins = null;

    private static final boolean askForTest = false;   // Enable this only for testing.

    private static String requestUrl;

    private static final Duration requestConnectTimeoutDuration = Duration.ofMinutes(1);   // 1 minute.
    private static final Duration requestReadTimeoutDuration = Duration.ofHours(3);    // 3 hours. Time to wait for the data to get transferred over the network. Many workers may request assignments from the Controller at the same time, so each worker might have to wait some tens of minutes for its work.
    // The Controller has to retrieve the data from the database, prepare it in memory, insert it into the "assignment"-table and, finally, return it to the Worker.

    public static final RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(requestConnectTimeoutDuration).setReadTimeout(requestReadTimeoutDuration).build();

    public static boolean hadConnectionErrorOnRequest = false;

    public static long numHandledAssignmentsBatches = 0;   // No need to be synchronized.

    public static final long idUrlsToHandleBeforeClearingDomainAndPathTrackingData = 10_000_000;
    public static final long idUrlsToHandleBeforeClearingDuplicateUrlsData = 1_000_000;

    private static CookieStore cookieStore = null;

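    /**
     * Initializes the url-reports list and the assignments-multimap with capacity-hints,
     * builds the assignments-request url and takes the cookie-store of the underlying "PublicationsRetriever" library.
     * */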
    public AssignmentsHandler()
    {
        urlReports = new ArrayList<>(UrlsWorkerApplication.maxAssignmentsLimitPerBatch);
        int expectedAssignmentsPerDatasource = (UrlsWorkerApplication.maxAssignmentsLimitPerBatch / expectedDatasourcesPerRequest);
        assignmentsForPlugins = HashMultimap.create(expectedDatasourcesPerRequest, expectedAssignmentsPerDatasource);
        requestUrl = UrlsWorkerApplication.controllerBaseUrl + (askForTest ? "test/" : "") + "urls?workerId=" + UrlsWorkerApplication.workerId + "&workerAssignmentsLimit=" + UrlsWorkerApplication.maxAssignmentsLimitPerBatch;
        cookieStore = HttpConnUtils.cookieManager.getCookieStore();
    }

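    /**
     * Requests a new batch of assignments from the Controller, through an HTTP-GET request.
     * In case of a connection-error, the "hadConnectionErrorOnRequest" flag is set and null is returned.
     * */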
    public static AssignmentsRequest requestAssignments()
    {
        logger.info("Going to request assignments from the controller-server: " + requestUrl);
        AssignmentsRequest assignmentRequest = null;
        try {   // Here, the HTTP-request is executed.
            assignmentRequest = restTemplate.getForObject(requestUrl, AssignmentsRequest.class);
        } catch (RestClientException rce) {
            logger.error("Could not retrieve the assignments!\n" + rce.getMessage()); // It shows the response-body (from Spring v.2.5.6 onwards).
            hadConnectionErrorOnRequest = true;
            return null;
        }
        //logger.debug(assignmentRequest.toString());   // DEBUG!
        return assignmentRequest;
    }

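    /**
     * Requests a batch of assignments, groups them by datasource and processes them with the "PublicationsRetrieverPlugin".
     * Then it posts the WorkerReport to the Controller (unless in test-mode), clears some tracking data-structures once enough id-url pairs have been handled
     * and shuts the Worker down, if a shutdown was requested or the batches-limit was reached.
     * */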
    public static void handleAssignments()
    {
        AssignmentsRequest assignmentsRequest = requestAssignments();
        if ( assignmentsRequest == null )
            return;

        Long assignmentRequestCounter = assignmentsRequest.getAssignmentsCounter();
        List<Assignment> assignments = assignmentsRequest.getAssignments();
        if ( assignments == null ) {
            logger.warn("The assignments were found to be null for assignmentRequestCounter = " + assignmentRequestCounter);
            return;
        }

        int assignmentsSize = assignments.size();
        if ( assignmentsSize == 0 ) {
            logger.warn("The assignmentsSize was < 0 > for assignmentRequestCounter = " + assignmentRequestCounter);
            return;
        }

        logger.info("AssignmentRequest < " + assignmentRequestCounter + " > was received and it's ready to be processed. It contains " + assignmentsSize + " tasks.");

        // Iterate over the tasks and add each one to its own list, depending on its DATASOURCE, in order to decide which plugin to use later.
        for ( Assignment assignment : assignments ) {
            // Add each task to its own HashSet.
            try {
                assignmentsForPlugins.put(assignment.getDatasource().getId(), assignment);
            } catch (NullPointerException npe) {
                logger.warn("An NPE was thrown when splitting the assignments based on the datasource-types. The assignment was: " + assignment);   // Do not use "assignment.toString()", it may cause an NPE.
            }
        }
        //countDatasourcesAndRecords(assignmentsSize);  // Only for DEBUG! Keep it commented in normal run.

        // TODO - Decide which tasks run with which plugin (depending on their datasource).
        // First run -in parallel- the tasks which require some specific plugin.
        // Then run the remaining tasks in the generic plugin (which handles parallelism itself).

        // For now, let's just run all tasks in the generic plugin.
        try {
            PublicationsRetrieverPlugin.processAssignments(assignmentRequestCounter, assignmentsForPlugins.values());
        } catch (Exception e) {
            logger.error("Exception when processing the assignments_" + assignmentRequestCounter, e);
        }   // In this case, we will either have an empty WorkerReport or a half-filled one. Either way, we want to report back to the Controller.

        // TODO - If we have more than one plugin running at the same time, then make the "AssignmentsHandler.urlReports"-list thread-safe.

        if ( askForTest ) {
            logger.debug("UrlReports:");    // DEBUG!
            for ( UrlReport urlReport : urlReports )
                logger.debug(urlReport.toString());
        }   // Avoid posting the results in "askForTestUrls"-mode. We don't want the test-results to be written into the database by the Controller.
        else
            postWorkerReport(assignmentRequestCounter);

        numHandledAssignmentsBatches ++;    // This is used later to stop this app, if a user-defined upper limit is set and reached.

        // Every time we reach a "limit" of handled id-url pairs, clear some data-structures of the underlying "PublicationsRetriever" program.
        // This helps reduce the memory consumption over a period of weeks or months, and also gives a 2nd chance to some domains which may have been blocked due to connectivity issues, but may be fine again after a month.
        long idUrlPairsHandled = (numHandledAssignmentsBatches * UrlsWorkerApplication.maxAssignmentsLimitPerBatch);

        if ( idUrlPairsHandled >= idUrlsToHandleBeforeClearingDuplicateUrlsData )
            UrlUtils.duplicateUrls.clear();

        if ( idUrlPairsHandled >= idUrlsToHandleBeforeClearingDomainAndPathTrackingData )
            GenericUtils.clearDomainAndPathTrackingData();

        if ( GeneralController.shouldShutdownWorker
                || (AssignmentsHandler.numHandledAssignmentsBatches == UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeShutdown) )
        {
            logger.info("The worker will now shut down, as " + (GeneralController.shouldShutdownWorker
                    ? "it received a \"shutdownWorker\" request!"
                    : "the maximum number of assignments-batches (" + UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeShutdown + ") to be handled was reached!"));

            UrlsWorkerApplication.gentleAppShutdown();
            // The "gentleAppShutdown()" will exit the app, so the "isAvailableForWork" will not be set below.
        }

        // Note: Cannot call this method recursively here, as after it runs 100s of times, the call-stack may overflow.
        // The scheduler will handle calling it every 15 mins, in case the Worker is available for work.
    }

    /**
     * Posts the WorkerReport and waits for the Controller to request the publication-files.
     * Once the Controller finishes uploading the files to the S3-ObjectStore, it returns an "HTTP-200-OK" response to the Worker.
     * */
    public static boolean postWorkerReport(Long assignmentRequestCounter)
    {
        String postUrl = UrlsWorkerApplication.controllerBaseUrl + "urls/addWorkerReport";
        logger.info("Going to post the WorkerReport of assignment_" + assignmentRequestCounter + " to the controller-server: " + postUrl);

        try {
            ResponseEntity<String> responseEntity = restTemplate.postForEntity(postUrl, new WorkerReport(UrlsWorkerApplication.workerId, assignmentRequestCounter, urlReports), String.class);
            int responseCode = responseEntity.getStatusCodeValue();
            if ( responseCode == HttpStatus.OK.value() ) {
                logger.info("The submission of the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller, and the delivery of the full-texts, were successful!");
                return true;
            } else {
                logger.error("HTTP-Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode);
                return false;
            }
        } catch (Exception e) {
            logger.error("Error when submitting the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller: ", e);
            return false;
        } finally {
            urlReports.clear(); // Reset, without de-allocating.
            assignmentsForPlugins.clear();
            logger.debug("The number of cookies is: " + cookieStore.getCookies().size());
            boolean cookiesDeleted = cookieStore.removeAll();
            logger.debug(cookiesDeleted ? "The cookies were removed!" : "No cookies were removed!");
        }
    }

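    /**
     * Logs the number of datasources of the current assignments-batch, the number of records per datasource
     * and the average number of records per datasource. It is meant to be used only for debugging.
     * */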
    public static void countDatasourcesAndRecords(int assignmentsSize)
    {
        Set<String> datasources = assignmentsForPlugins.keySet();
        int numDatasources = datasources.size();
        logger.debug("Num of datasources: " + numDatasources);
        for ( String datasource : datasources ) {
            logger.debug("Num of records for datasource \"" + datasource + "\" is: " + assignmentsForPlugins.get(datasource).size() );
        }
        logger.debug("Average num of records per datasource: " + (assignmentsSize / numDatasources));
    }

}