package eu.openaire.urls_worker.components; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import eu.openaire.publications_retriever.PublicationsRetriever; import eu.openaire.publications_retriever.util.url.GenericUtils; import eu.openaire.publications_retriever.util.url.UrlUtils; import eu.openaire.urls_worker.UrlsWorkerApplication; import eu.openaire.urls_worker.components.plugins.PublicationsRetrieverPlugin; import eu.openaire.urls_worker.controllers.GeneralController; import eu.openaire.urls_worker.models.Assignment; import eu.openaire.urls_worker.models.UrlReport; import eu.openaire.urls_worker.payloads.requests.AssignmentsRequest; import eu.openaire.urls_worker.payloads.responces.WorkerReport; import eu.openaire.urls_worker.util.FilesCompressor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.web.client.HttpServerErrorException; import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; import java.io.BufferedWriter; import java.nio.file.Files; import java.nio.file.Paths; import java.time.Duration; import java.time.Instant; import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @Component public class AssignmentsHandler { private static final Logger logger = LoggerFactory.getLogger(AssignmentsHandler.class); @Autowired PublicationsRetrieverPlugin publicationsRetrieverPlugin; private final String workerId; private final String controllerBaseUrl; private final int maxAssignmentsLimitPerBatch; private final int maxAssignmentsBatchesToHandleBeforeShutdown; public static List urlReports = null; private static final int expectedDatasourcesPerRequest = 1400; // Per 10_000 assignments. public static Multimap assignmentsForPlugins = null; private static final boolean askForTest = false; // Enable this only for testing. private static String requestUrl; public static final RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(Duration.ofMinutes(2)).setReadTimeout(Duration.ofHours(1)).build(); public static boolean hadConnectionErrorOnRequest = false; public static long numHandledAssignmentsBatches = 0; // No need to be synchronized. public static final long idUrlsToHandleBeforeClearingDomainAndPathBlockingData = 300_000; public static long timesClearingDomainAndPathBlockingData = 0; public static final long idUrlsToHandleBeforeClearingDomainAndPathTrackingData = 600_000; public static long timesClearingDomainAndPathTrackingData = 0; public static final long idUrlsToHandleBeforeClearingDuplicateUrlsData = 200_000; public static long timesClearingDuplicateUrlsData = 0; public String workerReportsDirPath; public AssignmentsHandler(@Value("${info.workerId}") String workerId, @Value("${info.maxAssignmentsLimitPerBatch}") int maxAssignmentsLimitPerBatch, @Value("${info.maxAssignmentsBatchesToHandleBeforeShutdown}") int maxAssignmentsBatchesToHandleBeforeShutdown, @Value("${info.controllerBaseUrl}") String controllerBaseUrl, @Value("${workerReportsDirPath}") String workerReportsDirPath) { this.workerId = workerId; this.maxAssignmentsLimitPerBatch = maxAssignmentsLimitPerBatch; this.maxAssignmentsBatchesToHandleBeforeShutdown = maxAssignmentsBatchesToHandleBeforeShutdown; this.controllerBaseUrl = controllerBaseUrl; urlReports = new ArrayList<>(this.maxAssignmentsLimitPerBatch); int expectedAssignmentsPerDatasource = (this.maxAssignmentsLimitPerBatch / expectedDatasourcesPerRequest); assignmentsForPlugins = HashMultimap.create(expectedDatasourcesPerRequest, expectedAssignmentsPerDatasource); requestUrl = this.controllerBaseUrl + (askForTest ? "test/" : "") + "urls?workerId=" + this.workerId + "&workerAssignmentsLimit=" + this.maxAssignmentsLimitPerBatch; if ( !workerReportsDirPath.endsWith("/") ) workerReportsDirPath += "/"; this.workerReportsDirPath = workerReportsDirPath; try { Files.createDirectories(Paths.get(this.workerReportsDirPath)); // No-op if it already exists. } catch (Exception e) { String errorMsg = "Could not create the \"workerReportsDirPath\": " + this.workerReportsDirPath; logger.error(errorMsg, e); throw new RuntimeException(errorMsg); } } public AssignmentsRequest requestAssignments() { logger.info("Going to request up to " + this.maxAssignmentsLimitPerBatch + " assignments from the Controller: " + requestUrl); AssignmentsRequest assignmentRequest = null; try { // Here, the HTTP-request is executed. assignmentRequest = restTemplate.getForObject(requestUrl, AssignmentsRequest.class); } catch (RestClientException rce) { logger.error("Could not retrieve the assignments!\n" + rce.getMessage()); // It shows the response body (from Spring v.2.5.6 onwards). hadConnectionErrorOnRequest = true; return null; } catch (IllegalArgumentException iae) { logger.error("Could not retrieve the assignments, as the provided Controller's url was malformed!\n" + iae.getMessage()); // We do not need to send a "ShutdownReport" to the Controller, since this error will appear upon the Worker's initialization and the Controller will not have any information about this Worker's existence. UrlsWorkerApplication.gentleAppShutdown(); } //logger.debug(assignmentRequest.toString()); // DEBUG! return assignmentRequest; } public static boolean shouldNotRequestMore = false; public void handleAssignments() { AssignmentsRequest assignmentsRequest = requestAssignments(); if ( assignmentsRequest == null ) return; Long assignmentRequestCounter = assignmentsRequest.getAssignmentsCounter(); List assignments = assignmentsRequest.getAssignments(); if ( assignments == null ) { if ( assignmentRequestCounter == -1 ) logger.warn("The Controller could not retrieve and assignments from the database. It will increase the attempts-number and retry in the next request."); else logger.warn("The assignments were found to be null for assignmentRequestCounter = " + assignmentRequestCounter); return; // The Worker will just request the assignments again, immediately. } int assignmentsSize = assignments.size(); if ( assignmentsSize == 0 ) { logger.warn("The assignmentsSize was < 0 > for assignmentRequestCounter = " + assignmentRequestCounter); return; } logger.info("AssignmentRequest < " + assignmentRequestCounter + " > was received and it's ready to be processed. It contains " + assignmentsSize + " assignments."); Instant startTime = Instant.now(); // Make sure there are no multiple occurrences of urls with the same domains are present, next to each other, inside the list. // If the same domains appear too close in the list, then this means we have large waiting-times between url-connections, due to "politeness-delays" to avoid server-overloading. assignments = getAssignmentsSpacedOutByDomain(assignments, assignmentsSize, false); // Iterate over the assignments and add each assignment in its own list depending on the DATASOURCE in order to decide which plugin to use later. for ( Assignment assignment : assignments ) { try { assignmentsForPlugins.put(assignment.getDatasource().getId(), assignment); } catch (NullPointerException npe) { logger.warn("An NPE was thrown when splitting the assignments based on the datasource-types. The problematic assignment was: " + assignment); // Do not use "assignment.toString()", it may cause an NPE. } } //countDatasourcesAndRecords(assignmentsSize); // Only for DEBUG! Keep it commented in normal run. // TODO - Decide which assignments should run with what plugin (depending on their datasource). // First run -in parallel- the assignments which require some specific plugin. // Then, after the above plugins are finished, run the remaining assignments in the generic plugin (which handles parallelism itself). // TODO - If we have more than one plugin running at the same time, then make the "AssignmentsHandler.urlReports"-list thread-safe. // For now, let's just run all assignments in the generic plugin. try { publicationsRetrieverPlugin.processAssignments(assignmentRequestCounter, assignmentsForPlugins.values()); } catch (Exception e) { logger.error("Exception when processing the assignments_" + assignmentRequestCounter, e); return; } // In this case, no assignments were processed. PublicationsRetriever.calculateAndPrintElapsedTime(startTime, Instant.now(), "The processing of assignments_" + assignmentRequestCounter + " finished after: "); if ( askForTest ) { logger.debug("UrlReports:"); // DEBUG! for ( UrlReport urlReport : urlReports ) logger.debug(urlReport.toString()); } // Avoid posting the results in "askForTestUrls"-mode. We don't want for test-results to be written into the database by the controller. else postWorkerReport(assignmentRequestCounter); // The "postWorkerReport()" above, may fail, but the numbers below still stand, as they are affected by the results themselves, rather than the "posting" of them to the Controller. numHandledAssignmentsBatches ++; // This is used later to stop this app, if a user-defined upper limit is set and reached. // Every time we reach a "limit" of handled id-url clear some data-structures of the underlying "PublicationsRetriever" program. // This helps with reducing the memory consumption over the period of weeks or months, and also give a 2nd chance to some domains which may be blocked due to a connectivity issues, but after a month they may be fine. long idUrlPairsHandled = (numHandledAssignmentsBatches * maxAssignmentsLimitPerBatch); if ( idUrlPairsHandled >= ((timesClearingDuplicateUrlsData +1) * idUrlsToHandleBeforeClearingDuplicateUrlsData) ) { UrlUtils.duplicateUrls.clear(); timesClearingDuplicateUrlsData ++; } if ( idUrlPairsHandled >= ((timesClearingDomainAndPathTrackingData +1) * idUrlsToHandleBeforeClearingDomainAndPathTrackingData) ) { GenericUtils.clearTrackingData(); // This includes the "blocking data", we may say "if this condition is true, do not bother checking the just-blocking condition". timesClearingDomainAndPathTrackingData ++; timesClearingDomainAndPathBlockingData ++; // Increment this also, as we avoid the following check in this case, but the counter has to be increased nevertheless. } else if ( idUrlPairsHandled >= ((timesClearingDomainAndPathBlockingData +1) * idUrlsToHandleBeforeClearingDomainAndPathBlockingData) ) { GenericUtils.clearBlockingData(); timesClearingDomainAndPathBlockingData ++; } if ( GeneralController.shouldShutdownWorker || (numHandledAssignmentsBatches == maxAssignmentsBatchesToHandleBeforeShutdown) ) { logger.info("The worker will shutdown, after the full-texts are delivered to the Controller, as " + (GeneralController.shouldShutdownWorker ? "it received a \"shutdownWorker\" request!" : "the maximum assignments-batches (" + maxAssignmentsBatchesToHandleBeforeShutdown + ") to be handled was reached!")); // Here, just specify that we do not want to request for more assignments. A scheduling job will check if the fulltexts were delivered to the Controller and then shutdown the Worker. shouldNotRequestMore = true; } // Note: Cannot call this method, here, retrospectively, as if it runs 100s of times, the memory-stack may break.. // The scheduler will handle calling it repetitively, in case the Worker is available for work.. } public static HashSet assignmentsNumsHandled = new HashSet<>(); /** * Post the worker report and wait for the Controller to request the publication-files. * Once the Controller finishes with uploading the files to the S3-ObjectStore, it returns an "HTTP-200-OK" response to the Worker. * Afterwards, the Worker, even in case of an error, deletes the full-texts and the ".tar" and ".tar.zstd" files. * */ public boolean postWorkerReport(Long assignmentRequestCounter) { String postUrl = this.controllerBaseUrl + "urls/addWorkerReport"; logger.info("Going to post the WorkerReport of assignments_" + assignmentRequestCounter + " to the controller-server: " + postUrl); WorkerReport workerReport = new WorkerReport(this.workerId, assignmentRequestCounter, urlReports); // Create the report file. It may be useful later, in case something goes wrong when sending the report to the Controller or the Controller cannot process it. // The report-file is deleted, along with the full-texts) when the Controller posts that the processing of this report was successful. writeToFile(this.workerReportsDirPath + this.workerId + "_assignments_" + assignmentRequestCounter + "_report.json", workerReport.getJsonReport(), false); // The worker sends this "WorkerReport" to the Controller, which after some checks, it adds a job to a background thread and responds to the Worker with HTTP-200-OK. try { ResponseEntity responseEntity = restTemplate.postForEntity(postUrl, workerReport, String.class); int responseCode = responseEntity.getStatusCodeValue(); if ( responseCode == HttpStatus.OK.value() ) { logger.info("The submission of the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller, was successful."); assignmentsNumsHandled.add(assignmentRequestCounter); return true; } else { // This does not include HTTP-5XX errors. For them an "HttpServerErrorException" is thrown. logger.error("HTTP-Connection problem with the submission of the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller! Error-code was: " + responseCode); return false; } } catch (HttpServerErrorException hsee) { logger.error("The Controller failed to handle the WorkerReport of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage()); return false; } catch (Exception e) { logger.error("Error when submitting the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller: ", e); return false; } finally { urlReports.clear(); // Reset, without de-allocating. assignmentsForPlugins.clear(); // The full-text files will be deleted after being transferred to the Controller. } // Note: It is possible that one or more full-texts-batches, are not sent to the Controller, or that the Controller failed to process them. // In that case, the related "attempt"-records will keep their "success" state, but the related "payload" records will not be inserted into the database. // When all the id-urls are processed at least one time, the Service will start reprocessing all the "couldRetry" records without a related "payload"-record. } public static List getAssignmentsSpacedOutByDomain(List assignments, int assignmentsSize, boolean shouldPrintDifference) { List spacedOutAssignments = new ArrayList<>(assignmentsSize); // Check the order of urls' domain in the list. Same domain-urls should be far away from each other, to improve parallelism. (this should happen after the plugin-categorization) HashMultimap domainsWithAssignments = HashMultimap.create(assignmentsSize/3, 3); StringBuilder sb = null; if ( shouldPrintDifference ) sb = new StringBuilder(assignmentsSize * 20); for ( Assignment assignment : assignments ) { if ( assignment != null ) { String url = assignment.getOriginalUrl(); if ( url != null ) { String domain = UrlUtils.getDomainStr(url, null); if ( domain != null ) { domain = UrlUtils.getTopThreeLevelDomain(domain); // This does not return null, only the param itself, in case of an error. domainsWithAssignments.put(domain, assignment); // Each "domain" will have multiple assignments. if ( sb != null ) sb.append(domain).append("\n"); // DEBUG! } } } } if ( sb != null ) { logger.debug("Before change:\n" + sb); // DEBUG! sb.setLength(0); // Reset it without re-sizing it. } List domains = new ArrayList<>(domainsWithAssignments.keySet()); int domainsSize = domains.size(); Integer domainsCounter = -1; for ( int i = 0; i < assignmentsSize; ++i ) { HashMap result = getFirstAvailableObjectForSpacedOutDomains(domains, domainsCounter, domainsWithAssignments, domainsSize, sb); if ( result == null ) { // Check whether the recursive method was left without data. logger.warn("the recursive method was asked to do more, using less data!"); break; } Assignment nextAssignment = (Assignment) result.keySet().toArray()[0]; domainsCounter = result.get(nextAssignment); spacedOutAssignments.add(nextAssignment); } if ( sb != null ) logger.debug("After change:\n" + sb); return spacedOutAssignments; } /** * This method uses recursion to go through the "domainsWithAssignments" multimap and get the nextAssignment. * The recursion terminates when there is no more data for any domain. * This method may return null, in case it is called more time than the number of assignments all the domains hold inside "domainsWithAssignments". * */ public static HashMap getFirstAvailableObjectForSpacedOutDomains(List domainsList, Integer domainsCounter, HashMultimap domainsWithAssignments, int domainsSize, StringBuilder sb) { // Normally, this method does not need a recursion-break-safety, as the initial-caller method should call this method exactly N times, where N is the number of all the values of "domainsWithAssignments". // Although, for extra-safety and re-usability, let's have this check here. if ( domainsWithAssignments.keySet().isEmpty() ) return null; // Break recursion when the domains run-out. if ( domainsCounter < (domainsSize -1) ) domainsCounter ++; else domainsCounter = 0; // Start over. String currentDomain = domainsList.get(domainsCounter); Set assignmentsOfCurrentDomain = domainsWithAssignments.get(currentDomain); if ( assignmentsOfCurrentDomain.isEmpty() ) // This domain is out of assignments, check the next available one. return getFirstAvailableObjectForSpacedOutDomains(domainsList, domainsCounter, domainsWithAssignments, domainsSize, sb); Object nextAssignment = assignmentsOfCurrentDomain.toArray()[0]; HashMap result = new HashMap<>(); result.put(nextAssignment, domainsCounter); domainsWithAssignments.remove(currentDomain, nextAssignment); if ( sb != null ) sb.append(currentDomain).append("\n"); // DEBUG! return result; } private static final Lock fileWriteLock = new ReentrantLock(true); public String writeToFile(String fileFullPath, String stringToWrite, boolean shouldLockThreads) { if ( shouldLockThreads ) fileWriteLock.lock(); try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), FilesCompressor.bufferSize) ) { bufferedWriter.write(stringToWrite); // This will overwrite the file. If the new string is smaller, then it does not matter. } catch (Exception e) { String errorMsg = "Failed to create or acquire the file \"" + fileFullPath + "\"!"; logger.error(errorMsg, e); return errorMsg; } finally { if ( shouldLockThreads ) fileWriteLock.unlock(); } return null; } public static void countDatasourcesAndRecords(int assignmentsSize) { Set datasources = assignmentsForPlugins.keySet(); int numDatasources = datasources.size(); logger.debug("Num of datasources: " + numDatasources); for ( String datasource : datasources ) { logger.debug("Num of records for datasource \"" + datasource + "\" is: " + assignmentsForPlugins.get(datasource).size() ); } logger.debug("Average num of records per datasource: " + (assignmentsSize / numDatasources)); } }