UrlsWorker/src/main/java/eu/openaire/urls_worker/components/AssignmentsHandler.java

409 lines
23 KiB
Java

package eu.openaire.urls_worker.components;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import eu.openaire.publications_retriever.PublicationsRetriever;
import eu.openaire.publications_retriever.util.url.GenericUtils;
import eu.openaire.publications_retriever.util.url.UrlUtils;
import eu.openaire.urls_worker.UrlsWorkerApplication;
import eu.openaire.urls_worker.components.plugins.PublicationsRetrieverPlugin;
import eu.openaire.urls_worker.controllers.GeneralController;
import eu.openaire.urls_worker.models.Assignment;
import eu.openaire.urls_worker.models.UrlReport;
import eu.openaire.urls_worker.payloads.requests.AssignmentsRequest;
import eu.openaire.urls_worker.payloads.responces.WorkerReport;
import eu.openaire.urls_worker.util.FilesCompressor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import java.io.BufferedWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Requests batches of assignments (id-url pairs) from the Controller, hands them to the processing plugin
 * and posts the resulting {@link WorkerReport} back to the Controller.
 * <p>
 * NOTE(review): most of the state below is {@code static} and is mutated without synchronization
 * (e.g. {@link #urlReports}, {@link #assignmentsForPlugins}); this assumes that only one batch is handled at a time —
 * confirm with the scheduler which calls {@link #handleAssignments()}.
 */
@Component
public class AssignmentsHandler {

    private static final Logger logger = LoggerFactory.getLogger(AssignmentsHandler.class);

    @Autowired
    PublicationsRetrieverPlugin publicationsRetrieverPlugin;

    private final String workerId;
    private final String controllerBaseUrl;
    private final int maxAssignmentsLimitPerBatch;
    private final int maxAssignmentsBatchesToHandleBeforeShutdown;

    /** The url-reports of the batch currently being handled (filled during processing). Cleared after each batch. */
    public static List<UrlReport> urlReports = null;

    private static final int expectedDatasourcesPerRequest = 1400; // Per 10_000 assignments.

    /** The assignments of the current batch, grouped by the id of their datasource. Cleared after each batch. */
    public static Multimap<String, Assignment> assignmentsForPlugins = null;

    private static final boolean askForTest = false; // Enable this only for testing.

    private static String requestUrl;

    public static final RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(Duration.ofMinutes(2)).setReadTimeout(Duration.ofHours(1)).build();

    /** Set when a request for assignments failed with a descriptive error-message (e.g. connection refused/timed out). */
    public static boolean hadConnectionErrorOnRequest = false;

    public static long numHandledAssignmentsBatches = 0; // No need to be synchronized.

    public static final long idUrlsToHandleBeforeClearingDomainAndPathBlockingData = 300_000;
    public static long timesClearingDomainAndPathBlockingData = 0;

    public static final long idUrlsToHandleBeforeClearingDomainAndPathTrackingData = 600_000;
    public static long timesClearingDomainAndPathTrackingData = 0;

    public static final long idUrlsToHandleBeforeClearingDuplicateUrlsData = 200_000;
    public static long timesClearingDuplicateUrlsData = 0;

    public String workerReportsDirPath;

    /** Set to {@code true} once the Worker should stop requesting new assignments (shutdown requested or batch-limit reached). */
    public static boolean shouldNotRequestMore = false;

    /** The counters of the assignments-batches whose WorkerReport was successfully accepted by the Controller. */
    public static final Set<Long> handledAssignmentsCounters = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());

    private static final Lock fileWriteLock = new ReentrantLock(true);


    /**
     * Initializes the shared data-structures, builds the assignments-request url and makes sure that the
     * directory for the worker-reports exists.
     *
     * @throws RuntimeException if the "workerReportsDirPath" directory could not be created (the cause is preserved).
     */
    public AssignmentsHandler(@Value("${info.workerId}") String workerId, @Value("${info.maxAssignmentsLimitPerBatch}") int maxAssignmentsLimitPerBatch,
                              @Value("${info.maxAssignmentsBatchesToHandleBeforeShutdown}") int maxAssignmentsBatchesToHandleBeforeShutdown,
                              @Value("${info.controllerBaseUrl}") String controllerBaseUrl,
                              @Value("${workerReportsDirPath}") String workerReportsDirPath)
    {
        this.workerId = workerId;
        this.maxAssignmentsLimitPerBatch = maxAssignmentsLimitPerBatch;
        this.maxAssignmentsBatchesToHandleBeforeShutdown = maxAssignmentsBatchesToHandleBeforeShutdown;
        this.controllerBaseUrl = controllerBaseUrl;

        urlReports = new ArrayList<>(this.maxAssignmentsLimitPerBatch);

        // For batch-limits below "expectedDatasourcesPerRequest" the division yields 0; keep a sizing-hint of at least 1.
        int expectedAssignmentsPerDatasource = Math.max(1, (this.maxAssignmentsLimitPerBatch / expectedDatasourcesPerRequest));
        assignmentsForPlugins = HashMultimap.create(expectedDatasourcesPerRequest, expectedAssignmentsPerDatasource);

        requestUrl = this.controllerBaseUrl + (askForTest ? "test/" : "") + "urls?workerId=" + this.workerId + "&workerAssignmentsLimit=" + this.maxAssignmentsLimitPerBatch;

        if ( !workerReportsDirPath.endsWith("/") )
            workerReportsDirPath += "/";
        this.workerReportsDirPath = workerReportsDirPath;

        try {
            Files.createDirectories(Paths.get(this.workerReportsDirPath)); // No-op if it already exists.
        } catch (Exception e) {
            String errorMsg = "Could not create the \"workerReportsDirPath\": " + this.workerReportsDirPath;
            logger.error(errorMsg, e);
            throw new RuntimeException(errorMsg, e); // Keep the cause, so that the startup-failure can be diagnosed.
        }
    }


    /**
     * Requests up to "maxAssignmentsLimitPerBatch" assignments from the Controller.
     *
     * @return the received {@link AssignmentsRequest}, or {@code null} if the request failed (the error is logged),
     *         or if the Controller's url is malformed (in which case the Worker is also shut down).
     */
    public AssignmentsRequest requestAssignments()
    {
        logger.info("Going to request up to {} assignments from the Controller: {}", this.maxAssignmentsLimitPerBatch, requestUrl);
        AssignmentsRequest assignmentRequest;
        try { // Here, the HTTP-request is executed.
            assignmentRequest = restTemplate.getForObject(requestUrl, AssignmentsRequest.class);
        } catch (RestClientException rce) {
            final String errorMsg = "Could not retrieve the assignments! ";
            String exceptionMsg = rce.getMessage(); // It also shows the response body of the response (from Spring v.2.5.6 onwards).
            if ( (exceptionMsg != null) && !exceptionMsg.isEmpty() ) {
                hadConnectionErrorOnRequest = true;
                logger.error(errorMsg + exceptionMsg);
            } else {
                // Otherwise, it's an undefined error, which occurs randomly
                // and does not mean that the Controller has some problem, or the Worker requested something in a wrong way,
                // or that the firewall disallows the connection (in that case we get "connection refused/timed out").
                // Try again immediately, do not wait 15 mins. The Controller will take some minutes to prepare the data, before it sends them anyway.
                logger.error(errorMsg, rce);
            }
            return null;
        } catch (IllegalArgumentException iae) {
            logger.error("Could not retrieve the assignments, as the provided Controller's url was malformed! " + iae.getMessage());
            // We do not need to send a "ShutdownReport" to the Controller, since this error will appear upon the Worker's initialization and the Controller will not have any information about this Worker's existence.
            UrlsWorkerApplication.gentleAppShutdown();
            return null;
        }
        return assignmentRequest;
    }


    /**
     * Handles one batch of assignments: requests them from the Controller, spaces them out by domain,
     * groups them by datasource, processes them with the plugin and posts the resulting report to the Controller.
     * Afterwards it may clear some "PublicationsRetriever" data-structures and may mark the Worker to stop
     * requesting more assignments (when a shutdown was requested or the batches-limit was reached).
     * <p>
     * Failures of the request or of the processing are logged and end the handling of this batch early;
     * the per-batch data is always discarded, so nothing leaks into the next batch.
     */
    public void handleAssignments()
    {
        AssignmentsRequest assignmentsRequest = requestAssignments();
        if ( assignmentsRequest == null )
            return;

        Long assignmentRequestCounter = assignmentsRequest.getAssignmentsCounter();
        List<Assignment> assignments = assignmentsRequest.getAssignments();
        if ( assignments == null ) {
            // Use "equals()", so that a missing (null) counter does not cause an NPE through unboxing.
            if ( Long.valueOf(-1L).equals(assignmentRequestCounter) )
                logger.warn("The Controller could not retrieve any assignments from the database. It will increase the attempts-number and retry in the next request.");
            else
                logger.warn("The assignments were found to be null for assignmentRequestCounter = {}", assignmentRequestCounter);
            return; // The Worker will just request the assignments again, immediately.
        }

        int assignmentsSize = assignments.size();
        if ( assignmentsSize == 0 ) {
            logger.warn("The assignmentsSize was < 0 > for assignmentRequestCounter = {}", assignmentRequestCounter);
            return;
        }

        logger.info("AssignmentRequest < {} > was received and it's ready to be processed. It contains {} assignments.", assignmentRequestCounter, assignmentsSize);

        Instant startTime = Instant.now();

        // Make sure that urls with the same domain do not appear next to each other inside the list.
        // If the same domains appear too close in the list, then we get large waiting-times between url-connections, due to "politeness-delays" to avoid server-overloading.
        assignments = getAssignmentsSpacedOutByDomain(assignments, assignmentsSize, false);

        groupAssignmentsByDatasource(assignments);

        //countDatasourcesAndRecords(assignmentsSize); // Only for DEBUG! Keep it commented in normal run.

        // TODO - Decide which assignments should run with what plugin (depending on their datasource).
        // First run -in parallel- the assignments which require some specific plugin.
        // Then, after the above plugins are finished, run the remaining assignments in the generic plugin (which handles parallelism itself).
        // TODO - If we have more than one plugin running at the same time, then make the "AssignmentsHandler.urlReports"-list thread-safe.

        // For now, let's just run all assignments in the generic plugin.
        try {
            publicationsRetrieverPlugin.processAssignments(assignmentRequestCounter, assignmentsForPlugins.values());
        } catch (Exception e) {
            logger.error("Exception when processing the assignments_" + assignmentRequestCounter, e);
            // No report will be posted for this batch, so discard its data. Otherwise, its assignments would be re-processed
            // with the next batch and its (partial) url-reports would be posted under the next batch's counter.
            clearBatchData();
            return;
        }

        PublicationsRetriever.calculateAndPrintElapsedTime(startTime, Instant.now(), "The processing of assignments_" + assignmentRequestCounter + " (containing " + assignmentsSize + " assignments) finished after: ");

        if ( askForTest ) {
            // Avoid posting the results in "askForTest"-mode. We don't want test-results to be written into the database by the controller.
            logger.debug("UrlReports:"); // DEBUG!
            for ( UrlReport urlReport : urlReports )
                logger.debug("{}", urlReport);
            clearBatchData(); // "postWorkerReport()" (which normally resets the batch-data) is not called in this mode.
        } else
            postWorkerReport(assignmentRequestCounter);

        // The "postWorkerReport()" above may fail, but the numbers below still stand, as they are affected by the results themselves, rather than by the "posting" of them to the Controller.
        numHandledAssignmentsBatches ++; // This is used later to stop this app, if a user-defined upper limit is set and reached.

        clearPublicationsRetrieverDataIfNeeded();

        if ( GeneralController.shouldShutdownWorker
                || (numHandledAssignmentsBatches == maxAssignmentsBatchesToHandleBeforeShutdown) )
        {
            logger.info("The worker will shutdown, after the full-texts are delivered to the Controller, as " + (GeneralController.shouldShutdownWorker
                    ? "it received a \"shutdownWorker\" request!"
                    : "the maximum assignments-batches (" + maxAssignmentsBatchesToHandleBeforeShutdown + ") to be handled was reached!"));

            // Here, just specify that we do not want to request for more assignments. A scheduling job will check if the fulltexts were delivered to the Controller and then shutdown the Worker.
            shouldNotRequestMore = true;
        }

        // Note: Cannot call this method here, recursively, as if it runs 100s of times, the memory-stack may break.
        // The scheduler will handle calling it repetitively, in case the Worker is available for work.
    }


    /**
     * Adds each assignment to "assignmentsForPlugins", under the id of its datasource,
     * in order to decide later which plugin should process it.
     * Assignments without a datasource are skipped, with a warning.
     */
    private static void groupAssignmentsByDatasource(List<Assignment> assignments)
    {
        for ( Assignment assignment : assignments ) {
            // Check explicitly, instead of catching an NPE. The parameterized logging is safe even if "toString()" fails.
            if ( (assignment == null) || (assignment.getDatasource() == null) ) {
                logger.warn("An assignment without a datasource was found when splitting the assignments based on the datasource-types. The problematic assignment was: {}", assignment);
                continue;
            }
            assignmentsForPlugins.put(assignment.getDatasource().getId(), assignment);
        }
    }


    /**
     * Every time a "limit" of handled id-url pairs is reached, clears some data-structures of the underlying "PublicationsRetriever" program.
     * This helps with reducing the memory consumption over periods of weeks or months, and also gives a 2nd chance to some domains
     * which may have been blocked due to connectivity issues, but after a month they may be fine.
     */
    private void clearPublicationsRetrieverDataIfNeeded()
    {
        // NOTE: this is an upper-bound estimate, as a batch may contain fewer than "maxAssignmentsLimitPerBatch" assignments.
        long idUrlPairsHandled = (numHandledAssignmentsBatches * maxAssignmentsLimitPerBatch);

        if ( idUrlPairsHandled >= ((timesClearingDuplicateUrlsData +1) * idUrlsToHandleBeforeClearingDuplicateUrlsData) ) {
            UrlUtils.duplicateUrls.clear();
            timesClearingDuplicateUrlsData ++;
        }

        if ( idUrlPairsHandled >= ((timesClearingDomainAndPathTrackingData +1) * idUrlsToHandleBeforeClearingDomainAndPathTrackingData) ) {
            GenericUtils.clearTrackingData(); // This includes the "blocking data", so the just-blocking check below is skipped in this case.
            timesClearingDomainAndPathTrackingData ++;
            timesClearingDomainAndPathBlockingData ++; // Increment this also, as we avoid the following check in this case, but the counter has to be increased nevertheless.
        } else if ( idUrlPairsHandled >= ((timesClearingDomainAndPathBlockingData +1) * idUrlsToHandleBeforeClearingDomainAndPathBlockingData) ) {
            GenericUtils.clearBlockingData();
            timesClearingDomainAndPathBlockingData ++;
        }
    }


    /** Resets the per-batch shared data-structures ("urlReports" keeps its capacity). */
    private static void clearBatchData()
    {
        urlReports.clear();
        assignmentsForPlugins.clear();
    }


    /**
     * Posts the worker report to the Controller and waits for its response.
     * Before posting, the report is also written to a json-file inside "workerReportsDirPath", which may be useful later,
     * in case something goes wrong when sending the report, or the Controller cannot process it.
     * In every case, the per-batch data ("urlReports" and "assignmentsForPlugins") is reset afterwards.
     * The full-text files are deleted elsewhere, after being transferred to the Controller.
     *
     * @param assignmentRequestCounter the counter of the assignments-batch this report belongs to.
     * @return {@code true} if the Controller responded with HTTP-200-OK, {@code false} otherwise.
     */
    public boolean postWorkerReport(Long assignmentRequestCounter)
    {
        String postUrl = this.controllerBaseUrl + "urls/addWorkerReport";
        logger.info("Going to post the WorkerReport of assignments_{} to the controller-server: {}", assignmentRequestCounter, postUrl);

        WorkerReport workerReport = new WorkerReport(this.workerId, assignmentRequestCounter, urlReports);

        // Create the report file. It may be useful later, in case something goes wrong when sending the report to the Controller or the Controller cannot process it.
        // The report-file is deleted (along with the full-texts) when the Controller posts that the processing of this report was successful.
        writeToFile(this.workerReportsDirPath + this.workerId + "_assignments_" + assignmentRequestCounter + "_report.json", workerReport.getJsonReport(), false);

        // The worker sends this "WorkerReport" to the Controller, which after some checks, adds a job to a background thread and responds to the Worker with HTTP-200-OK.
        try {
            ResponseEntity<String> responseEntity = restTemplate.postForEntity(postUrl, workerReport, String.class);
            int responseCode = responseEntity.getStatusCodeValue();
            if ( responseCode == HttpStatus.OK.value() ) {
                logger.info("The submission of the WorkerReport of assignments_{} to the Controller, was successful.", assignmentRequestCounter);
                handledAssignmentsCounters.add(assignmentRequestCounter);
                return true;
            } else { // This does not include HTTP-5XX errors. For them an "HttpServerErrorException" is thrown.
                logger.error("HTTP-Connection problem with the submission of the WorkerReport of assignments_{} to the Controller! Error-code was: {}", assignmentRequestCounter, responseCode);
                return false;
            }
        } catch (HttpServerErrorException hsee) {
            logger.error("The Controller failed to handle the WorkerReport of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage());
            return false;
        } catch (Exception e) {
            logger.error("Error when submitting the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller: ", e);
            return false;
        } finally {
            clearBatchData(); // Reset, without de-allocating the reports-list.
            // The full-text files will be deleted after being transferred to the Controller.
        }

        // Note: It is possible that one or more full-texts-batches, are not sent to the Controller, or that the Controller failed to process them.
        // In that case, the related "attempt"-records will keep their "success" state, but the related "payload" records will not be inserted into the database.
        // When all the id-urls are processed at least one time, the Service will start reprocessing all the "couldRetry" records without a related "payload"-record.
    }


    /**
     * Re-orders the given assignments so that urls of the same (top-three-level) domain are spread apart,
     * by picking one assignment per domain in a round-robin fashion.
     * Assignments which are null, have a null url, or whose domain cannot be extracted are left out (a warning reports how many).
     *
     * @param assignments the assignments to re-order.
     * @param assignmentsSize the number of assignments, used for pre-sizing.
     * @param shouldPrintDifference if {@code true}, the domain-order before and after the change is logged at debug level.
     * @return a new list with the spaced-out assignments.
     */
    public static List<Assignment> getAssignmentsSpacedOutByDomain(List<Assignment> assignments, int assignmentsSize, boolean shouldPrintDifference)
    {
        List<Assignment> spacedOutAssignments = new ArrayList<>(assignmentsSize);

        // Check the order of urls' domain in the list. Same domain-urls should be far away from each other, to improve parallelism. (this should happen after the plugin-categorization)
        HashMultimap<String, Assignment> domainsWithAssignments = HashMultimap.create(assignmentsSize/3, 3);

        StringBuilder sb = (shouldPrintDifference ? new StringBuilder(assignmentsSize * 20) : null);

        for ( Assignment assignment : assignments ) {
            if ( assignment == null )
                continue;
            String url = assignment.getOriginalUrl();
            if ( url == null )
                continue;
            String domain = UrlUtils.getDomainStr(url, null);
            if ( domain == null )
                continue;
            domain = UrlUtils.getTopThreeLevelDomain(domain); // This does not return null, only the param itself, in case of an error.
            domainsWithAssignments.put(domain, assignment); // Each "domain" will have multiple assignments.
            if ( sb != null )
                sb.append(domain).append("\n"); // DEBUG!
        }

        if ( sb != null ) {
            logger.debug("Before change:\n" + sb); // DEBUG!
            sb.setLength(0); // Reset it without re-sizing it.
        }

        List<String> domains = new ArrayList<>(domainsWithAssignments.keySet());
        int domainsSize = domains.size();
        // Only the assignments which were grouped above can be spaced-out (the multimap's size is its number of key-value pairs).
        int numAssignmentsToSpaceOut = domainsWithAssignments.size();
        Integer domainsCounter = -1;
        for ( int i = 0; i < numAssignmentsToSpaceOut; ++i )
        {
            HashMap<Object, Integer> result = getFirstAvailableObjectForSpacedOutDomains(domains, domainsCounter, domainsWithAssignments, domainsSize, sb);
            if ( result == null ) { // Safety-net: the domains ran out of assignments earlier than expected.
                logger.warn("The domain-spacing ran out of assignments earlier than expected!");
                break;
            }
            Map.Entry<Object, Integer> nextEntry = result.entrySet().iterator().next();
            spacedOutAssignments.add((Assignment) nextEntry.getKey());
            domainsCounter = nextEntry.getValue();
        }

        if ( sb != null )
            logger.debug("After change:\n" + sb);

        int numLeftOut = (assignments.size() - spacedOutAssignments.size());
        if ( numLeftOut > 0 )
            logger.warn("{} assignments were left out while spacing them out by domain, as they were null, had no (extractable) domain, or were duplicates.", numLeftOut);

        return spacedOutAssignments;
    }


    /**
     * Returns the next available value of "domainsWithAssignments", starting from the domain right after the one at
     * index "domainsCounter" in "domainsList" (wrapping around) and skipping domains which have no values left.
     * The returned value is removed from "domainsWithAssignments".
     * <p>
     * This is a bounded loop (at most one pass over the domains), so it cannot overflow the stack
     * and terminates even if "domainsList" and "domainsWithAssignments" are inconsistent.
     *
     * @param domainsList the domains, in a fixed order.
     * @param domainsCounter the index of the previously used domain ({@code -1} or {@code null} to start from the first one).
     * @param domainsWithAssignments the remaining values per domain; it is modified by this method.
     * @param domainsSize the number of domains in "domainsList" to cycle through.
     * @param sb if not {@code null}, the chosen domain is appended to it (for debugging).
     * @return a single-entry map of the chosen value to the index of its domain, or {@code null} if no domain has values left.
     */
    public static HashMap<Object, Integer> getFirstAvailableObjectForSpacedOutDomains(List<String> domainsList, Integer domainsCounter, HashMultimap<String, ?> domainsWithAssignments, int domainsSize, StringBuilder sb)
    {
        if ( domainsWithAssignments.isEmpty() )
            return null; // The domains have run out of values.

        int numDomains = Math.min(domainsSize, domainsList.size());
        int counter = (domainsCounter == null) ? -1 : domainsCounter;

        for ( int attempt = 0; attempt < numDomains; ++attempt )
        {
            counter = (counter < (numDomains -1)) ? (counter + 1) : 0; // Start over, after the last domain.

            String currentDomain = domainsList.get(counter);
            Set<?> assignmentsOfCurrentDomain = domainsWithAssignments.get(currentDomain);
            if ( assignmentsOfCurrentDomain.isEmpty() )
                continue; // This domain is out of values, check the next one.

            // Take the first value, without copying the whole set into an array.
            Object nextAssignment = assignmentsOfCurrentDomain.iterator().next();

            HashMap<Object, Integer> result = new HashMap<>(2);
            result.put(nextAssignment, counter);
            domainsWithAssignments.remove(currentDomain, nextAssignment);

            if ( sb != null )
                sb.append(currentDomain).append("\n"); // DEBUG!
            return result;
        }
        return null; // None of the listed domains has values left.
    }


    /**
     * Writes the given string to the given file, overwriting any previous content.
     *
     * @param fileFullPath the path of the file to (re)create.
     * @param stringToWrite the content to write.
     * @param shouldLockThreads if {@code true}, the write happens while holding a shared (fair) lock.
     * @return {@code null} on success, otherwise the (already logged) error-message.
     */
    public String writeToFile(String fileFullPath, String stringToWrite, boolean shouldLockThreads)
    {
        if ( shouldLockThreads )
            fileWriteLock.lock();

        try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), FilesCompressor.bufferSize) )
        {
            bufferedWriter.write(stringToWrite); // This will overwrite the file. If the new string is smaller, then it does not matter.
        } catch (Exception e) {
            String errorMsg = "Failed to create or acquire the file \"" + fileFullPath + "\"!";
            logger.error(errorMsg, e);
            return errorMsg;
        } finally {
            if ( shouldLockThreads )
                fileWriteLock.unlock();
        }
        return null;
    }


    /**
     * DEBUG helper: logs the number of datasources in "assignmentsForPlugins", the number of records per datasource,
     * and the average number of records per datasource (skipped when there are no datasources, to avoid a division by zero).
     */
    public static void countDatasourcesAndRecords(int assignmentsSize)
    {
        Set<String> datasources = assignmentsForPlugins.keySet();
        int numDatasources = datasources.size();
        logger.debug("Num of datasources: {}", numDatasources);
        for ( String datasource : datasources ) {
            logger.debug("Num of records for datasource \"{}\" is: {}", datasource, assignmentsForPlugins.get(datasource).size());
        }
        if ( numDatasources > 0 )
            logger.debug("Average num of records per datasource: {}", (assignmentsSize / numDatasources));
    }

}