UrlsWorker/src/main/java/eu/openaire/urls_worker/util/AssignmentsHandler.java

242 lines
14 KiB
Java

package eu.openaire.urls_worker.util;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import eu.openaire.publications_retriever.util.url.GenericUtils;
import eu.openaire.publications_retriever.util.url.UrlUtils;
import eu.openaire.urls_worker.UrlsWorkerApplication;
import eu.openaire.urls_worker.controllers.FullTextsController;
import eu.openaire.urls_worker.controllers.GeneralController;
import eu.openaire.urls_worker.models.Assignment;
import eu.openaire.urls_worker.models.UrlReport;
import eu.openaire.urls_worker.payloads.requests.AssignmentsRequest;
import eu.openaire.urls_worker.payloads.responces.WorkerReport;
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * Requests batches of assignments from the Controller, passes them to the {@link PublicationsRetrieverPlugin}
 * and posts the resulting {@link WorkerReport} back to the Controller.
 * <p>
 * All the state is static, but part of it ("urlReports", "assignmentsForPlugins" and the request-url) is initialized
 * inside the constructor. So, an instance has to be created, before any of the static methods is used.
 * The shared collections are not thread-safe.
 */
public class AssignmentsHandler {

    private static final Logger logger = LoggerFactory.getLogger(AssignmentsHandler.class);

    /** The reports of the current batch. It gets cleared after each attempt to post the WorkerReport. */
    public static List<UrlReport> urlReports = null;

    private static final int expectedDatasourcesPerRequest = 1400; // Per 10_000 assignments.

    /** The assignments of the current batch, grouped by their datasource-id. */
    public static Multimap<String, Assignment> assignmentsForPlugins = null;

    private static final boolean askForTest = false; // Enable this only for testing.

    private static String requestUrl;

    private static final Duration requestConnectTimeoutDuration = Duration.ofMinutes(1); // 1 minute.

    // 5 hours. Time to wait for the data to get transferred over the network.
    // Many workers may try to get assignments from the Controller, so each worker might have to wait some 10s of minutes for work.
    // The controller has to retrieve the data from the database, then prepare them in memory, insert them in the "assignment"-table and, finally, return them to the worker.
    private static final Duration requestReadTimeoutDuration = Duration.ofHours(5);

    public static final RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(requestConnectTimeoutDuration).setReadTimeout(requestReadTimeoutDuration).build();

    /** It is set to "true" when the assignments-request fails. This class never sets it back to "false". */
    public static boolean hadConnectionErrorOnRequest = false;

    public static long numHandledAssignmentsBatches = 0; // No need to be synchronized.

    public static final long idUrlsToHandleBeforeClearingDomainAndPathBlockingData = 300_000;
    public static long timesClearingDomainAndPathBlockingData = 0;

    public static final long idUrlsToHandleBeforeClearingDomainAndPathTrackingData = 600_000;
    public static long timesClearingDomainAndPathTrackingData = 0;

    public static final long idUrlsToHandleBeforeClearingDuplicateUrlsData = 200_000;
    public static long timesClearingDuplicateUrlsData = 0;

    /** The counters of the assignments-batches, for which the Controller answered with "HTTP-200-OK" to the WorkerReport. */
    public static HashSet<Long> assignmentsNumsHandled = new HashSet<>();


    /**
     * Initializes the static data-structures and the request-url, based on the configuration found in {@link UrlsWorkerApplication}.
     */
    public AssignmentsHandler()
    {
        urlReports = new ArrayList<>(UrlsWorkerApplication.maxAssignmentsLimitPerBatch);
        int expectedAssignmentsPerDatasource = (UrlsWorkerApplication.maxAssignmentsLimitPerBatch / expectedDatasourcesPerRequest);
        assignmentsForPlugins = HashMultimap.create(expectedDatasourcesPerRequest, expectedAssignmentsPerDatasource);
        requestUrl = UrlsWorkerApplication.controllerBaseUrl + (askForTest ? "test/" : "") + "urls?workerId=" + UrlsWorkerApplication.workerId + "&workerAssignmentsLimit=" + UrlsWorkerApplication.maxAssignmentsLimitPerBatch;
    }


    /**
     * Requests a new batch of assignments from the Controller.
     *
     * @return the received request-object, or {@code null} if the HTTP-request failed
     *         (in which case "hadConnectionErrorOnRequest" is set to "true").
     */
    public static AssignmentsRequest requestAssignments()
    {
        logger.info("Going to request {} assignments from the controller-server: {}", UrlsWorkerApplication.maxAssignmentsLimitPerBatch, requestUrl);
        try { // Here, the HTTP-request is executed.
            return restTemplate.getForObject(requestUrl, AssignmentsRequest.class);
        } catch (RestClientException rce) {
            logger.error("Could not retrieve the assignments!\n{}", rce.getMessage()); // It shows the response body (from Spring v.2.5.6 onwards).
            hadConnectionErrorOnRequest = true;
            return null;
        }
    }


    /**
     * Handles one batch of assignments: requests it, groups the assignments by datasource, processes them with the plugin,
     * posts the WorkerReport (unless in test-mode), updates the counters, clears the periodic data and checks for a shutdown.
     * If no assignments are received, it returns without doing anything else.
     */
    public static void handleAssignments()
    {
        AssignmentsRequest assignmentsRequest = requestAssignments();
        if ( assignmentsRequest == null )
            return;

        Long assignmentRequestCounter = assignmentsRequest.getAssignmentsCounter();
        List<Assignment> assignments = assignmentsRequest.getAssignments();
        if ( assignments == null ) {
            // The counter is a boxed value, so guard against "null" before comparing it with a primitive.
            if ( (assignmentRequestCounter != null) && (assignmentRequestCounter == -1) )
                logger.warn("The Controller could not retrieve any assignments from the database. It will increase the attempts-number and retry in the next request.");
            else
                logger.warn("The assignments were found to be null for assignmentRequestCounter = {}", assignmentRequestCounter);
            return; // The Worker will just request the assignments again, immediately.
        }

        int assignmentsSize = assignments.size();
        if ( assignmentsSize == 0 ) {
            logger.warn("The assignmentsSize was < 0 > for assignmentRequestCounter = {}", assignmentRequestCounter);
            return;
        }

        logger.info("AssignmentRequest < {} > was received and it's ready to be processed. It contains {} tasks.", assignmentRequestCounter, assignmentsSize);

        splitAssignmentsByDatasource(assignments);

        //countDatasourcesAndRecords(assignmentsSize); // Only for DEBUG! Keep it commented in normal run.

        // TODO - Decide which tasks run with what plugin (depending on their datasource).
        // First run -in parallel- the tasks which require some specific plugin.
        // Then run the remaining tasks in the generic plugin (which handles parallelism itself).
        // For now, let's just run all tasks in the generic plugin.
        try {
            PublicationsRetrieverPlugin.processAssignments(assignmentRequestCounter, assignmentsForPlugins.values());
        } catch (Exception e) {
            logger.error("Exception when processing the assignments_{}", assignmentRequestCounter, e);
        } // In this case, we will either have an empty WorkerReport or a half-filled one. Either way, we want to report back to the Controller.

        // TODO - If we have more than one plugin running at the same time, then make the "AssignmentsHandler.urlReports"-list thread-safe.

        if ( askForTest ) {
            logger.debug("UrlReports:"); // DEBUG!
            for ( UrlReport urlReport : urlReports )
                logger.debug("{}", urlReport);
            // Avoid posting the results in "askForTestUrls"-mode. We don't want for test-results to be written into the database by the controller.
            // The "postWorkerReport()" is the one which normally resets the batch-data, so reset them here, in order for the next batch to start clean.
            urlReports.clear();
            assignmentsForPlugins.clear();
        } else
            postWorkerReport(assignmentRequestCounter);

        // The "postWorkerReport()" above, may fail, but the numbers below still stand, as they are affected by the results themselves, rather than the "posting" of them to the Controller.
        numHandledAssignmentsBatches ++; // This is used later to stop this app, if a user-defined upper limit is set and reached.

        clearRetrieverDataWhenLimitsAreReached();

        shutdownIfRequestedOrLimitReached();

        // Note: Cannot call this method, here, recursively, as if it runs 100s of times, the memory-stack may break..
        // The scheduler will handle calling it every 15 mins, in case the Worker is available for work..
    }


    /**
     * Adds each assignment to the "assignmentsForPlugins", under the id of its datasource, in order to decide which plugin to use later.
     * Assignments which are null or have no datasource, are skipped with a warning.
     */
    private static void splitAssignmentsByDatasource(List<Assignment> assignments)
    {
        for ( Assignment assignment : assignments ) {
            if ( (assignment == null) || (assignment.getDatasource() == null) ) {
                logger.warn("Skipping an assignment which is null or has no datasource, when splitting the assignments based on the datasource-types. The assignment was: {}", assignment);
                continue;
            }
            assignmentsForPlugins.put(assignment.getDatasource().getId(), assignment);
        }
    }


    /**
     * Every time we reach a "limit" of handled id-urls, clear some data-structures of the underlying "PublicationsRetriever" program.
     * This helps with reducing the memory consumption over the period of weeks or months, and also give a 2nd chance to some domains
     * which may be blocked due to a connectivity issues, but after a month they may be fine.
     */
    private static void clearRetrieverDataWhenLimitsAreReached()
    {
        long idUrlPairsHandled = (numHandledAssignmentsBatches * UrlsWorkerApplication.maxAssignmentsLimitPerBatch);

        if ( idUrlPairsHandled >= ((timesClearingDuplicateUrlsData +1) * idUrlsToHandleBeforeClearingDuplicateUrlsData) ) {
            UrlUtils.duplicateUrls.clear();
            timesClearingDuplicateUrlsData ++;
        }

        if ( idUrlPairsHandled >= ((timesClearingDomainAndPathTrackingData +1) * idUrlsToHandleBeforeClearingDomainAndPathTrackingData) ) {
            GenericUtils.clearTrackingData(); // This includes the "blocking data", we may say "if this condition is true, do not bother checking the just-blocking condition".
            timesClearingDomainAndPathTrackingData ++;
            timesClearingDomainAndPathBlockingData ++; // Increment this also, as we avoid the following check in this case, but the counter has to be increased nevertheless.
        } else if ( idUrlPairsHandled >= ((timesClearingDomainAndPathBlockingData +1) * idUrlsToHandleBeforeClearingDomainAndPathBlockingData) ) {
            GenericUtils.clearDomainAndPathBlockingData();
            timesClearingDomainAndPathBlockingData ++;
        }
    }


    /**
     * Shuts down the app, if a "shutdownWorker" request was received or if the number of handled batches
     * is equal to the configured maximum.
     */
    private static void shutdownIfRequestedOrLimitReached()
    {
        boolean shutdownWasRequested = GeneralController.shouldShutdownWorker;
        if ( !shutdownWasRequested
                && (numHandledAssignmentsBatches != UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeShutdown) )
            return;

        String reason = (shutdownWasRequested
                ? "it received a \"shutdownWorker\" request!"
                : "the maximum assignments-batches (" + UrlsWorkerApplication.maxAssignmentsBatchesToHandleBeforeShutdown + ") to be handled was reached!");
        logger.info("The worker will now shutdown, as {}", reason);
        // TODO - Should this worker inform the Controller that will shutdown?
        UrlsWorkerApplication.gentleAppShutdown(); // This will exit the app.
    }


    /**
     * Post the worker report and wait for the Controller to request the publication-files.
     * Once the Controller finishes with uploading the files to the S3-ObjectStore, it returns an "HTTP-200-OK" response to the Worker.
     * Afterwards, the Worker, even in case of an error, deletes the fulltext and zip files.
     * In every case, the "urlReports" and the "assignmentsForPlugins" are cleared, before this method returns.
     *
     * @param assignmentRequestCounter the counter of the assignments-batch, which the report refers to
     * @return {@code true} if the Controller answered with "HTTP-200-OK" or "HTTP-207-MULTI_STATUS", otherwise {@code false}
     * */
    public static boolean postWorkerReport(Long assignmentRequestCounter)
    {
        String postUrl = UrlsWorkerApplication.controllerBaseUrl + "urls/addWorkerReport";
        logger.info("Going to post the WorkerReport of assignments_{} to the controller-server: {}", assignmentRequestCounter, postUrl);
        try {
            ResponseEntity<String> responseEntity = restTemplate.postForEntity(postUrl, new WorkerReport(UrlsWorkerApplication.workerId, assignmentRequestCounter, urlReports), String.class);
            // The worker sends the "WorkerReport" and before this "POST"-request returns here, the Controller, after analyzing the report, opens new request to the Worker in order to receive the full-texts.
            // After the report and the full-texts are received and uploaded to the database/S3, the Controller returns a response to the Worker.
            int responseCode = responseEntity.getStatusCodeValue();
            if ( responseCode == HttpStatus.OK.value() ) {
                logger.info("The submission of the WorkerReport of assignments_{} to the Controller, and the full-text delivering, were successful.", assignmentRequestCounter);
                assignmentsNumsHandled.add(assignmentRequestCounter);
                return true;
            } else if ( responseCode == HttpStatus.MULTI_STATUS.value() ) {
                logger.warn("The submission of the WorkerReport of assignments_{} to the Controller was successful, but the full-texts' delivering failed!", assignmentRequestCounter);
                return true;
            } else { // This does not include HTTP-5XX errors. For them an "HttpServerErrorException" is thrown.
                logger.error("HTTP-Connection problem with the submission of the WorkerReport of assignments_{} to the Controller! Error-code was: {}", assignmentRequestCounter, responseCode);
                return false;
            }
        } catch (HttpServerErrorException hsee) {
            logger.error("The Controller failed to handle the WorkerReport of assignments_{}: {}", assignmentRequestCounter, hsee.getMessage());
            return false;
        } catch (Exception e) {
            logger.error("Error when submitting the WorkerReport of assignments_{} to the Controller: ", assignmentRequestCounter, e);
            return false;
        } finally {
            urlReports.clear(); // Reset, without de-allocating.
            assignmentsForPlugins.clear();
            // It is possible that one or more full-texts-batches, are not sent to the Controller, or that the Controller failed to process them.
            // In that case, the related "attempt"-records will keep their "success" state, but the related "payload" records will not be inserted into the database.
            // When all the id-urls are processed at least one time, the Controller will start returning all the "couldRetry" records without a related "payload"-record.
            FullTextsController.deleteDirectory(assignmentRequestCounter);
        }
    }


    /**
     * Logs (in debug-level) the number of datasources, the number of records per datasource and the average number of records per datasource.
     * When there are no datasources, only their (zero) number is logged.
     *
     * @param assignmentsSize the total number of assignments, used to compute the average
     */
    public static void countDatasourcesAndRecords(int assignmentsSize)
    {
        Set<String> datasources = assignmentsForPlugins.keySet();
        int numDatasources = datasources.size();
        logger.debug("Num of datasources: {}", numDatasources);
        if ( numDatasources == 0 )
            return; // Nothing more to report. This also avoids a division by zero below.

        for ( String datasource : datasources ) {
            logger.debug("Num of records for datasource \"{}\" is: {}", datasource, assignmentsForPlugins.get(datasource).size());
        }
        logger.debug("Average num of records per datasource: {}", (assignmentsSize / numDatasources));
    }
}