
// Extraction residue from the repository web view (not part of the source file): "415 lines", "23 KiB", "Raw Normal View History".

package eu.openaire.urls_worker.components;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import eu.openaire.publications_retriever.PublicationsRetriever;
import eu.openaire.publications_retriever.util.url.GenericUtils;
import eu.openaire.publications_retriever.util.url.UrlUtils;
import eu.openaire.urls_worker.UrlsWorkerApplication;
import eu.openaire.urls_worker.components.plugins.PublicationsRetrieverPlugin;
import eu.openaire.urls_worker.controllers.GeneralController;
import eu.openaire.urls_worker.models.Assignment;
import eu.openaire.urls_worker.models.UrlReport;
import eu.openaire.urls_worker.payloads.requests.AssignmentsRequest;
import eu.openaire.urls_worker.payloads.responces.WorkerReport;
import eu.openaire.urls_worker.util.FilesCompressor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import java.io.BufferedWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Requests batches of assignments from the Controller, hands them to the processing plugin and posts the resulting WorkerReport back to the Controller.
 * Much of the per-batch state is kept in public static fields, which are (re)initialized by the constructor.
 * NOTE(review): the file imports Spring's "Component" but no stereotype annotation is visible on this class in this listing — confirm against the original file.
 */
public class AssignmentsHandler {
private static final Logger logger = LoggerFactory.getLogger(AssignmentsHandler.class);
// The plugin which processes the assignments (see "handleAssignments()"). It is not assigned anywhere in the visible code.
// NOTE(review): presumably injected by Spring; the otherwise-unused "Autowired" import hints that an annotation was lost from this listing — confirm.
PublicationsRetrieverPlugin publicationsRetrieverPlugin;
// The following four values are set by the constructor, from the "info.*" application properties.
private final String workerId;
private final String controllerBaseUrl;
private final int maxAssignmentsLimitPerBatch;
private final int maxAssignmentsBatchesToHandleBeforeShutdown;
// The url-reports of the current batch. Allocated by the constructor and cleared at the end of "postWorkerReport()".
public static List<UrlReport> urlReports = null;
private static final int expectedDatasourcesPerRequest = 1400; // Per 10_000 assignments.
// The assignments of the current batch, keyed by their datasource-id (filled in "handleAssignments()").
public static Multimap<String, Assignment> assignmentsForPlugins = null;
private static final boolean askForTest = false; // Enable this only for testing.
// The full url used for requesting assignments. It is built by the constructor.
private static String requestUrl;
private static final Duration requestConnectTimeoutDuration = Duration.ofMinutes(1); // 1 minute.
// Used for requesting the assignments: 1 minute connect-timeout, 30 minutes read-timeout.
public static final RestTemplate restTemplateForRequest = new RestTemplateBuilder().setConnectTimeout(requestConnectTimeoutDuration).setReadTimeout(Duration.ofMinutes(30)).build();
// Used for posting the WorkerReport. It is built by the constructor, with a read-timeout that depends on the batch-size.
public static RestTemplate restTemplateForReport = null;
// Set to "true" by "requestAssignments()" when the HTTP-request fails. It is not reset anywhere in the visible code.
public static boolean hadConnectionErrorOnRequest = false;
public static long numHandledAssignmentsBatches = 0; // No need to be synchronized.
// The thresholds (in handled id-url pairs) and the counters below, drive the periodic clearing of the underlying program's data-structures, inside "handleAssignments()".
public static final long idUrlsToHandleBeforeClearingDomainAndPathBlockingData = 300_000;
public static long timesClearingDomainAndPathBlockingData = 0;
public static final long idUrlsToHandleBeforeClearingDomainAndPathTrackingData = 600_000;
public static long timesClearingDomainAndPathTrackingData = 0;
public static final long idUrlsToHandleBeforeClearingDuplicateUrlsData = 200_000;
public static long timesClearingDuplicateUrlsData = 0;
// The directory in which the report-files are written. The constructor makes sure it ends with "/".
public String workerReportsDirPath;
/**
 * Initializes the handler from the application properties and prepares the shared (static) state:
 * the "urlReports" list, the "assignmentsForPlugins" multimap, the assignments-request url and the RestTemplate used for posting the WorkerReport.
 * It also makes sure the worker-reports directory exists.
 *
 * @param workerId the id of this Worker, as sent to the Controller.
 * @param maxAssignmentsLimitPerBatch the maximum number of assignments to ask for, in each request.
 * @param maxAssignmentsBatchesToHandleBeforeShutdown the number of handled batches after which the Worker stops requesting more.
 * @param controllerBaseUrl the base-url of the Controller; the endpoint paths are appended to it as-is.
 * @param workerReportsDirPath the directory in which the report-files are written; a trailing "/" is added if missing.
 * @throws RuntimeException if the worker-reports directory cannot be created (the original exception is attached as the cause).
 */
public AssignmentsHandler(@Value("${info.workerId}") String workerId, @Value("${info.maxAssignmentsLimitPerBatch}") int maxAssignmentsLimitPerBatch,
        @Value("${info.maxAssignmentsBatchesToHandleBeforeShutdown}") int maxAssignmentsBatchesToHandleBeforeShutdown,
        @Value("${info.controllerBaseUrl}") String controllerBaseUrl,
        @Value("${workerReportsDirPath}") String workerReportsDirPath)
{
    this.workerId = workerId;
    this.maxAssignmentsLimitPerBatch = maxAssignmentsLimitPerBatch;
    this.maxAssignmentsBatchesToHandleBeforeShutdown = maxAssignmentsBatchesToHandleBeforeShutdown;
    this.controllerBaseUrl = controllerBaseUrl;

    urlReports = new ArrayList<>(this.maxAssignmentsLimitPerBatch);
    int expectedAssignmentsPerDatasource = (this.maxAssignmentsLimitPerBatch / expectedDatasourcesPerRequest);
    assignmentsForPlugins = HashMultimap.create(expectedDatasourcesPerRequest, expectedAssignmentsPerDatasource);

    requestUrl = this.controllerBaseUrl + (askForTest ? "test/" : "") + "urls?workerId=" + this.workerId + "&workerAssignmentsLimit=" + this.maxAssignmentsLimitPerBatch;

    long durationInHours = (long) Math.ceil((double) this.maxAssignmentsLimitPerBatch / 1000); // For example, for 10_000 assignments we wait at most 10 hours.
    logger.debug("Setting the max-connection duration for the \"post-worker-report\" to " + durationInHours + " hours.");
    restTemplateForReport = new RestTemplateBuilder().setConnectTimeout(requestConnectTimeoutDuration).setReadTimeout(Duration.ofHours(durationInHours)).build();
    // X hours. Time to wait for the data to get transferred over the network. Many workers may try to get assignments from the Controller, so each worker might have to wait some 10s of minutes for work.
    // When giving the assignments, the Controller has to retrieve the data from the database, then prepare them in memory, insert them in the "assignment"-table and, finally, return them to the worker.
    // When receiving the Worker-Report, the Controller has to check for existing fulltext files (by previous runs), request and get thousands of files from the Worker (in batches), upload them to S3, prepare and import the payload and the attempt records in the database and return to the Worker.

    if ( !workerReportsDirPath.endsWith("/") )
        workerReportsDirPath += "/";

    this.workerReportsDirPath = workerReportsDirPath;

    try {
        Files.createDirectories(Paths.get(this.workerReportsDirPath)); // No-op if it already exists.
    } catch (Exception e) {
        String errorMsg = "Could not create the \"workerReportsDirPath\": " + this.workerReportsDirPath;
        logger.error(errorMsg, e);
        throw new RuntimeException(errorMsg, e); // Keep the cause, so the caller can see why the directory could not be created.
    }
}
public AssignmentsRequest requestAssignments()
logger.info("Going to request " + this.maxAssignmentsLimitPerBatch + " assignments from the Controller: " + requestUrl);
AssignmentsRequest assignmentRequest = null;
try { // Here, the HTTP-request is executed.
assignmentRequest = restTemplateForRequest.getForObject(requestUrl, AssignmentsRequest.class);
} catch (RestClientException rce) {
logger.error("Could not retrieve the assignments!\n" + rce.getMessage()); // It shows the response body (from Spring v.2.5.6 onwards).
hadConnectionErrorOnRequest = true;
return null;
} catch (IllegalArgumentException iae) {
logger.error("Could not retrieve the assignments, as the provided Controller's url was malformed!\n" + iae.getMessage());
// We do not need to send a "ShutdownReport" to the Controller, since this error will appear upon the Worker's initialization and the Controller will not have any information about this Worker's existence.
//logger.debug(assignmentRequest.toString()); // DEBUG!
return assignmentRequest;
// Raised by "handleAssignments()" when the Worker should not request more assignments (a shutdown was requested or the batches-limit was reached).
public static boolean shouldNotRequestMore = false;
/**
 * Requests a batch of assignments, spaces them out by domain, groups them by datasource-id and processes them through the "publicationsRetrieverPlugin".
 * Afterwards, it updates the handled-batches counter, the counters of the periodic data-clearing and, when needed, the "shouldNotRequestMore" flag.
 *
 * NOTE(review): this listing appears to have lost its brace-only lines and several short statements (see the notes inside).
 * The code is kept exactly as found; confirm every marked gap against the original file.
 */
public void handleAssignments()
AssignmentsRequest assignmentsRequest = requestAssignments();
if ( assignmentsRequest == null )
// NOTE(review): the statement guarded by the above "if" is not visible; an early exit is expected here, since the next line dereferences the object — confirm.
Long assignmentRequestCounter = assignmentsRequest.getAssignmentsCounter();
List<Assignment> assignments = assignmentsRequest.getAssignments();
if ( assignments == null ) {
// The comparison below unboxes the "Long", so it would throw an NPE if the counter is null.
if ( assignmentRequestCounter == -1 )
logger.warn("The Controller could not retrieve and assignments from the database. It will increase the attempts-number and retry in the next request.");
// NOTE(review): as listed, the next warning is logged even when the counter is -1; an "else" may have been lost here — confirm.
logger.warn("The assignments were found to be null for assignmentRequestCounter = " + assignmentRequestCounter);
return; // The Worker will just request the assignments again, immediately.
int assignmentsSize = assignments.size();
if ( assignmentsSize == 0 ) {
logger.warn("The assignmentsSize was < 0 > for assignmentRequestCounter = " + assignmentRequestCounter);
// NOTE(review): no exit is visible after the above warning, although the next message states that the batch is ready to be processed — presumably an early "return" was lost; confirm.
logger.info("AssignmentRequest < " + assignmentRequestCounter + " > was received and it's ready to be processed. It contains " + assignmentsSize + " tasks.");
Instant startTime = Instant.now();
// Make sure there are no multiple occurrences of urls with the same domains are present, next to each other, inside the list.
// If the same domains appear too close in the list, then this means we have large waiting-times between url-connections, due to "politeness-delays" to avoid server-overloading.
assignments = getAssignmentsSpacedOutByDomain(assignments, assignmentsSize, false);
// Iterate over the tasks and add each task in its own list depending on the DATASOURCE in order to decide which plugin to use later.
for ( Assignment assignment : assignments ) {
// Add each task in its own HashSet.
try {
assignmentsForPlugins.put(assignment.getDatasource().getId(), assignment);
} catch (NullPointerException npe) {
// An assignment which is null or has no datasource, is only logged; it is not added to the multimap.
logger.warn("An NPE was thrown when splitting the assignments based on the datasource-types. The problematic assignment was: " + assignment); // Do not use "assignment.toString()", it may cause an NPE.
//countDatasourcesAndRecords(assignmentsSize); // Only for DEBUG! Keep it commented in normal run.
// TODO - Decide which tasks run with what plugin (depending on their datasource).
// First run -in parallel- the tasks which require some specific plugin.
// Then, after the above plugins are finished, run the remaining tasks in the generic plugin (which handles parallelism itself).
// TODO - If we have more than one plugin running at the same time, then make the "AssignmentsHandler.urlReports"-list thread-safe.
// For now, let's just run all tasks in the generic plugin.
try {
publicationsRetrieverPlugin.processAssignments(assignmentRequestCounter, assignmentsForPlugins.values());
} catch (Exception e) {
logger.error("Exception when processing the assignments_" + assignmentRequestCounter, e);
} // In this case, we will either have an empty WorkerReport or a half-filled one. Either way, we want to report back to the Controller.
PublicationsRetriever.calculateAndPrintElapsedTime(startTime, Instant.now(), "The processing of assignments_" + assignmentRequestCounter + " finished after: ");
if ( askForTest ) {
logger.debug("UrlReports:"); // DEBUG!
for ( UrlReport urlReport : urlReports )
// NOTE(review): the body of the above loop is not visible; presumably it logs each url-report — confirm.
} // Avoid posting the results in "askForTestUrls"-mode. We don't want for test-results to be written into the database by the controller.
// NOTE(review): the next comment refers to a "postWorkerReport()" call "above", which is not visible in this listing; presumably an else-branch posting the report was lost — confirm.
// The "postWorkerReport()" above, may fail, but the numbers below still stand, as they are affected by the results themselves, rather than the "posting" of them to the Controller.
numHandledAssignmentsBatches ++; // This is used later to stop this app, if a user-defined upper limit is set and reached.
// Every time we reach a "limit" of handled id-url clear some data-structures of the underlying "PublicationsRetriever" program.
// This helps with reducing the memory consumption over the period of weeks or months, and also give a 2nd chance to some domains which may be blocked due to a connectivity issues, but after a month they may be fine.
// The number of handled pairs is an estimate: it assumes that every batch was full.
long idUrlPairsHandled = (numHandledAssignmentsBatches * this.maxAssignmentsLimitPerBatch);
if ( idUrlPairsHandled >= ((timesClearingDuplicateUrlsData +1) * idUrlsToHandleBeforeClearingDuplicateUrlsData) ) {
// NOTE(review): only the counter-update is visible; the statement which actually clears the duplicate-urls data appears to be missing — confirm.
timesClearingDuplicateUrlsData ++;
if ( idUrlPairsHandled >= ((timesClearingDomainAndPathTrackingData +1) * idUrlsToHandleBeforeClearingDomainAndPathTrackingData) ) {
GenericUtils.clearTrackingData(); // This includes the "blocking data", we may say "if this condition is true, do not bother checking the just-blocking condition".
timesClearingDomainAndPathTrackingData ++;
timesClearingDomainAndPathBlockingData ++; // Increment this also, as we avoid the following check in this case, but the counter has to be increased nevertheless.
} else if ( idUrlPairsHandled >= ((timesClearingDomainAndPathBlockingData +1) * idUrlsToHandleBeforeClearingDomainAndPathBlockingData) ) {
// NOTE(review): only the counter-update is visible; the statement which actually clears the blocking-data appears to be missing — confirm.
timesClearingDomainAndPathBlockingData ++;
// Stop requesting, either on an external shutdown-request or when the configured number of batches has been handled.
if ( GeneralController.shouldShutdownWorker
|| (AssignmentsHandler.numHandledAssignmentsBatches == this.maxAssignmentsBatchesToHandleBeforeShutdown) )
logger.info("The worker will shutdown, after the full-texts are delivered to the Controller, as " + (GeneralController.shouldShutdownWorker
? "it received a \"shutdownWorker\" request!"
: "the maximum assignments-batches (" + this.maxAssignmentsBatchesToHandleBeforeShutdown + ") to be handled was reached!"));
// Here, just specify that we do not want to request for more assignments. A scheduling job will check if the fulltexts were delivered to the Controller and then shutdown the Worker.
shouldNotRequestMore = true;
// Note: Cannot call this method, here, retrospectively, as if it runs 100s of times, the memory-stack may break..
// The scheduler will handle calling it repetitively, in case the Worker is available for work..
public static HashSet<Long> assignmentsNumsHandled = new HashSet<>();
* Post the worker report and wait for the Controller to request the publication-files.
* Once the Controller finishes with uploading the files to the S3-ObjectStore, it returns an "HTTP-200-OK" response to the Worker.
* Afterwards, the Worker, even in case of an error, deletes the full-texts and the ".tar" and ".tar.zstd" files.
* */
public boolean postWorkerReport(Long assignmentRequestCounter)
String postUrl = this.controllerBaseUrl + "urls/addWorkerReport";
logger.info("Going to post the WorkerReport of assignments_" + assignmentRequestCounter + " to the controller-server: " + postUrl);
WorkerReport workerReport = new WorkerReport(this.workerId, assignmentRequestCounter, urlReports);
// Create the report file. It may be useful later, in case something goes wrong when sending the report to the Controller or the Controller cannot process it.
// The report-file is deleted, along with the full-texts) when the Controller posts that the processing of this report was successful.
writeToFile(this.workerReportsDirPath + this.workerId + "_assignments_" + assignmentRequestCounter + "_report.json", workerReport.getJsonReport(), false);
// The worker sends this "WorkerReport" to the Controller, which after some checks, it adds a job to a background thread and responds to the Worker with HTTP-200-OK.
try {
ResponseEntity<String> responseEntity = restTemplateForReport.postForEntity(postUrl, workerReport, String.class);
int responseCode = responseEntity.getStatusCodeValue();
if ( responseCode == HttpStatus.OK.value() ) {
logger.info("The submission of the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller, was successful.");
return true;
} else { // This does not include HTTP-5XX errors. For them an "HttpServerErrorException" is thrown.
logger.error("HTTP-Connection problem with the submission of the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller! Error-code was: " + responseCode);
return false;
} catch (HttpServerErrorException hsee) {
logger.error("The Controller failed to handle the WorkerReport of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage());
return false;
} catch (Exception e) {
logger.error("Error when submitting the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller: ", e);
return false;
} finally {
urlReports.clear(); // Reset, without de-allocating.
// The full-text files will be deleted after being transferred to the Controller.
// Note: It is possible that one or more full-texts-batches, are not sent to the Controller, or that the Controller failed to process them.
// In that case, the related "attempt"-records will keep their "success" state, but the related "payload" records will not be inserted into the database.
// When all the id-urls are processed at least one time, the Service will start reprocessing all the "couldRetry" records without a related "payload"-record.
public static List<Assignment> getAssignmentsSpacedOutByDomain(List<Assignment> assignments, int assignmentsSize, boolean shouldPrintDifference)
List<Assignment> spacedOutAssignments = new ArrayList<>(assignmentsSize);
// Check the order of urls' domain in the list. Same domain-urls should be far away from each other, to improve parallelism. (this should happen after the plugin-categorization)
HashMultimap<String, Assignment> domainsWithAssignments = HashMultimap.create(assignmentsSize/3, 3);
StringBuilder sb = null;
if ( shouldPrintDifference )
sb = new StringBuilder(assignmentsSize * 20);
for ( Assignment assignment : assignments ) {
if ( assignment != null ) {
String url = assignment.getOriginalUrl();
if ( url != null ) {
String domain = UrlUtils.getDomainStr(url, null);
if ( domain != null ) {
domain = UrlUtils.getTopThreeLevelDomain(domain); // This does not return null, only the param itself, in case of an error.
domainsWithAssignments.put(domain, assignment); // Each "domain" will have multiple assignments.
if ( sb != null )
sb.append(domain).append("\n"); // DEBUG!
if ( sb != null ) {
logger.debug("Before change:\n" + sb.toString()); // DEBUG!
sb.setLength(0); // Reset it without re-sizing it.
List<String> domains = new ArrayList<>(domainsWithAssignments.keySet());
int domainsSize = domains.size();
Integer domainsCounter = -1;
for ( int i = 0; i < assignmentsSize; ++i )
HashMap<Object, Integer> result = getFirstAvailableObjectForSpacedOutDomains(domains, domainsCounter, domainsWithAssignments, domainsSize, sb);
if ( result == null ) { // Check whether the recursive method was left without data.
logger.warn("the recursive method was asked to do more, using less data!");
Assignment nextAssignment = (Assignment) result.keySet().toArray()[0];
domainsCounter = result.get(nextAssignment);
if ( sb != null )
logger.debug("After change:\n" + sb.toString());
return spacedOutAssignments;
* This method uses recursion to go through the "domainsWithAssignments" multimap and get the nextAssignment.
* The recursion terminates when there is no more data for any domain.
* This method may return null, in case it is called more time than the number of assignments all the domains hold inside "domainsWithAssignments".
* */
public static HashMap<Object, Integer> getFirstAvailableObjectForSpacedOutDomains(List<String> domainsList, Integer domainsCounter, HashMultimap<String, ?> domainsWithAssignments, int domainsSize, StringBuilder sb)
HashMap<Object, Integer> result = new HashMap<>();
Object nextAssignment = null;
// Normally, this method does not need a recursion-break-safety, as the initial-caller method should call this method exactly N times, where N is the number of all the values of "domainsWithAssignments".
// Although, for extra-safety and re-usability, let's have this check here.
Set<String> domainsSet = domainsWithAssignments.keySet();
if ( domainsSet.isEmpty() )
return null; // Break recursion when the domains run-out.
if ( domainsCounter < (domainsSize -1) )
domainsCounter ++;
domainsCounter = 0; // Start over.
String currentDomain = domainsList.get(domainsCounter);
Set<?> assignmentsOfCurrentDomain = domainsWithAssignments.get(currentDomain);
if ( assignmentsOfCurrentDomain.isEmpty() ) {
// This domain is out of assignments, check the next available one.
result = getFirstAvailableObjectForSpacedOutDomains(domainsList, domainsCounter, domainsWithAssignments, domainsSize, sb);
} else {
nextAssignment = assignmentsOfCurrentDomain.toArray()[0];
result.put(nextAssignment, domainsCounter);
domainsWithAssignments.remove(currentDomain, nextAssignment);
if ( sb != null )
sb.append(currentDomain).append("\n"); // DEBUG!
return result;
Lock fileWriteLock = new ReentrantLock(true);
public String writeToFile(String fileFullPath, String stringToWrite, boolean shouldLockThreads)
if ( shouldLockThreads )
try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), FilesCompressor.bufferSize) )
bufferedWriter.write(stringToWrite); // This will overwrite the file. If the new string is smaller, then it does not matter.
} catch (Exception e) {
String errorMsg = "Failed to create or acquire the file \"" + fileFullPath + "\"!";
logger.error(errorMsg, e);
return errorMsg;
} finally {
if ( shouldLockThreads )
return null;
public static void countDatasourcesAndRecords(int assignmentsSize)
Set<String> datasources = assignmentsForPlugins.keySet();
int numDatasources = datasources.size();
logger.debug("Num of datasources: " + numDatasources);
for ( String datasource : datasources ) {
logger.debug("Num of records for datasource \"" + datasource + "\" is: " + assignmentsForPlugins.get(datasource).size() );
logger.debug("Average num of records per datasource: " + (assignmentsSize / numDatasources));