398 lines
22 KiB
Java
398 lines
22 KiB
Java
package eu.openaire.urls_worker.components;
|
|
|
|
import com.google.common.collect.HashMultimap;
|
|
import com.google.common.collect.Multimap;
|
|
import eu.openaire.publications_retriever.PublicationsRetriever;
|
|
import eu.openaire.publications_retriever.util.url.GenericUtils;
|
|
import eu.openaire.publications_retriever.util.url.UrlUtils;
|
|
import eu.openaire.urls_worker.UrlsWorkerApplication;
|
|
import eu.openaire.urls_worker.components.plugins.PublicationsRetrieverPlugin;
|
|
import eu.openaire.urls_worker.controllers.GeneralController;
|
|
import eu.openaire.urls_worker.models.Assignment;
|
|
import eu.openaire.urls_worker.models.UrlReport;
|
|
import eu.openaire.urls_worker.payloads.requests.AssignmentsRequest;
|
|
import eu.openaire.urls_worker.payloads.responces.WorkerReport;
|
|
import eu.openaire.urls_worker.util.FilesCompressor;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.boot.web.client.RestTemplateBuilder;
|
|
import org.springframework.http.HttpStatus;
|
|
import org.springframework.http.ResponseEntity;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.web.client.HttpServerErrorException;
|
|
import org.springframework.web.client.RestClientException;
|
|
import org.springframework.web.client.RestTemplate;
|
|
|
|
import java.io.BufferedWriter;
|
|
import java.nio.file.Files;
|
|
import java.nio.file.Paths;
|
|
import java.time.Duration;
|
|
import java.time.Instant;
|
|
import java.util.*;
|
|
import java.util.concurrent.locks.Lock;
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
@Component
|
|
public class AssignmentsHandler {
|
|
|
|
// Logger of this class.
private static final Logger logger = LoggerFactory.getLogger(AssignmentsHandler.class);
|
|
|
|
// The plugin which processes the assignments inside "handleAssignments()". It is injected through "@Autowired".
@Autowired
|
|
PublicationsRetrieverPlugin publicationsRetrieverPlugin;
|
|
|
|
// The following four values are assigned by the constructor, from its "@Value"-annotated parameters.
private final String workerId;
|
|
private final String controllerBaseUrl;
|
|
private final int maxAssignmentsLimitPerBatch;
|
|
private final int maxAssignmentsBatchesToHandleBeforeShutdown;
|
|
|
|
// The url-reports of the current batch. The list is created by the constructor, it is passed to the "WorkerReport" and it is cleared inside "postWorkerReport()".
public static List<UrlReport> urlReports = null;
|
|
// Used by the constructor only for pre-sizing the "assignmentsForPlugins" multimap.
private static final int expectedDatasourcesPerRequest = 1400; // Per 10_000 assignments.
|
|
// Maps a datasource-id to its assignments. It is created by the constructor, filled by "handleAssignments()" and cleared inside "postWorkerReport()".
public static Multimap<String, Assignment> assignmentsForPlugins = null;
|
|
// When true, the "test/" endpoint is requested and the results are only logged, instead of being posted to the Controller.
private static final boolean askForTest = false; // Enable this only for testing.
|
|
// The url used for requesting the assignments. It is built by the constructor.
private static String requestUrl;
|
|
|
|
// The shared http-client, having a connect-timeout of 2 minutes and a read-timeout of 1 hour.
public static final RestTemplate restTemplate = new RestTemplateBuilder().setConnectTimeout(Duration.ofMinutes(2)).setReadTimeout(Duration.ofHours(1)).build();
|
|
|
|
// It is set to true by "requestAssignments()", when a "RestClientException" is thrown. It is not reset inside this class.
public static boolean hadConnectionErrorOnRequest = false;
|
|
|
|
// The number of batches which went through the processing-step of "handleAssignments()".
public static long numHandledAssignmentsBatches = 0; // No need to be synchronized.
|
|
|
|
// The thresholds (in handled id-url pairs) and the counters used by "handleAssignments()" to periodically clear data of the underlying "PublicationsRetriever".
public static final long idUrlsToHandleBeforeClearingDomainAndPathBlockingData = 300_000;
|
|
public static long timesClearingDomainAndPathBlockingData = 0;
|
|
|
|
public static final long idUrlsToHandleBeforeClearingDomainAndPathTrackingData = 600_000;
|
|
public static long timesClearingDomainAndPathTrackingData = 0;
|
|
|
|
public static final long idUrlsToHandleBeforeClearingDuplicateUrlsData = 200_000;
|
|
public static long timesClearingDuplicateUrlsData = 0;
|
|
|
|
// The directory in which the report-files are written. The constructor makes sure it ends with "/" and that it exists.
public String workerReportsDirPath;
|
|
|
|
|
|
public AssignmentsHandler(@Value("${info.workerId}") String workerId, @Value("${info.maxAssignmentsLimitPerBatch}") int maxAssignmentsLimitPerBatch,
|
|
@Value("${info.maxAssignmentsBatchesToHandleBeforeShutdown}") int maxAssignmentsBatchesToHandleBeforeShutdown,
|
|
@Value("${info.controllerBaseUrl}") String controllerBaseUrl,
|
|
@Value("${workerReportsDirPath}") String workerReportsDirPath)
|
|
{
|
|
this.workerId = workerId;
|
|
this.maxAssignmentsLimitPerBatch = maxAssignmentsLimitPerBatch;
|
|
this.maxAssignmentsBatchesToHandleBeforeShutdown = maxAssignmentsBatchesToHandleBeforeShutdown;
|
|
this.controllerBaseUrl = controllerBaseUrl;
|
|
urlReports = new ArrayList<>(this.maxAssignmentsLimitPerBatch);
|
|
int expectedAssignmentsPerDatasource = (this.maxAssignmentsLimitPerBatch / expectedDatasourcesPerRequest);
|
|
assignmentsForPlugins = HashMultimap.create(expectedDatasourcesPerRequest, expectedAssignmentsPerDatasource);
|
|
requestUrl = this.controllerBaseUrl + (askForTest ? "test/" : "") + "urls?workerId=" + this.workerId + "&workerAssignmentsLimit=" + this.maxAssignmentsLimitPerBatch;
|
|
|
|
if ( !workerReportsDirPath.endsWith("/") )
|
|
workerReportsDirPath += "/";
|
|
|
|
this.workerReportsDirPath = workerReportsDirPath;
|
|
try {
|
|
Files.createDirectories(Paths.get(this.workerReportsDirPath)); // No-op if it already exists.
|
|
} catch (Exception e) {
|
|
String errorMsg = "Could not create the \"workerReportsDirPath\": " + this.workerReportsDirPath;
|
|
logger.error(errorMsg, e);
|
|
throw new RuntimeException(errorMsg);
|
|
}
|
|
}
|
|
|
|
|
|
public AssignmentsRequest requestAssignments()
|
|
{
|
|
logger.info("Going to request up to " + this.maxAssignmentsLimitPerBatch + " assignments from the Controller: " + requestUrl);
|
|
AssignmentsRequest assignmentRequest = null;
|
|
try { // Here, the HTTP-request is executed.
|
|
assignmentRequest = restTemplate.getForObject(requestUrl, AssignmentsRequest.class);
|
|
} catch (RestClientException rce) {
|
|
logger.error("Could not retrieve the assignments!\n" + rce.getMessage()); // It shows the response body (from Spring v.2.5.6 onwards).
|
|
hadConnectionErrorOnRequest = true;
|
|
return null;
|
|
} catch (IllegalArgumentException iae) {
|
|
logger.error("Could not retrieve the assignments, as the provided Controller's url was malformed!\n" + iae.getMessage());
|
|
// We do not need to send a "ShutdownReport" to the Controller, since this error will appear upon the Worker's initialization and the Controller will not have any information about this Worker's existence.
|
|
UrlsWorkerApplication.gentleAppShutdown();
|
|
}
|
|
//logger.debug(assignmentRequest.toString()); // DEBUG!
|
|
return assignmentRequest;
|
|
}
|
|
|
|
|
|
// It is set to true by "handleAssignments()", when a shutdown was requested or when the configured number of batches was handled. It signals that no more assignments should be requested.
public static boolean shouldNotRequestMore = false;
|
|
|
|
|
|
public void handleAssignments()
|
|
{
|
|
AssignmentsRequest assignmentsRequest = requestAssignments();
|
|
if ( assignmentsRequest == null )
|
|
return;
|
|
|
|
Long assignmentRequestCounter = assignmentsRequest.getAssignmentsCounter();
|
|
List<Assignment> assignments = assignmentsRequest.getAssignments();
|
|
if ( assignments == null ) {
|
|
if ( assignmentRequestCounter == -1 )
|
|
logger.warn("The Controller could not retrieve and assignments from the database. It will increase the attempts-number and retry in the next request.");
|
|
else
|
|
logger.warn("The assignments were found to be null for assignmentRequestCounter = " + assignmentRequestCounter);
|
|
return; // The Worker will just request the assignments again, immediately.
|
|
}
|
|
|
|
int assignmentsSize = assignments.size();
|
|
if ( assignmentsSize == 0 ) {
|
|
logger.warn("The assignmentsSize was < 0 > for assignmentRequestCounter = " + assignmentRequestCounter);
|
|
return;
|
|
}
|
|
|
|
logger.info("AssignmentRequest < " + assignmentRequestCounter + " > was received and it's ready to be processed. It contains " + assignmentsSize + " assignments.");
|
|
|
|
Instant startTime = Instant.now();
|
|
|
|
// Make sure there are no multiple occurrences of urls with the same domains are present, next to each other, inside the list.
|
|
// If the same domains appear too close in the list, then this means we have large waiting-times between url-connections, due to "politeness-delays" to avoid server-overloading.
|
|
|
|
assignments = getAssignmentsSpacedOutByDomain(assignments, assignmentsSize, false);
|
|
|
|
// Iterate over the assignments and add each assignment in its own list depending on the DATASOURCE in order to decide which plugin to use later.
|
|
for ( Assignment assignment : assignments ) {
|
|
try {
|
|
assignmentsForPlugins.put(assignment.getDatasource().getId(), assignment);
|
|
} catch (NullPointerException npe) {
|
|
logger.warn("An NPE was thrown when splitting the assignments based on the datasource-types. The problematic assignment was: " + assignment); // Do not use "assignment.toString()", it may cause an NPE.
|
|
}
|
|
}
|
|
|
|
//countDatasourcesAndRecords(assignmentsSize); // Only for DEBUG! Keep it commented in normal run.
|
|
|
|
// TODO - Decide which assignments should run with what plugin (depending on their datasource).
|
|
// First run -in parallel- the assignments which require some specific plugin.
|
|
// Then, after the above plugins are finished, run the remaining assignments in the generic plugin (which handles parallelism itself).
|
|
// TODO - If we have more than one plugin running at the same time, then make the "AssignmentsHandler.urlReports"-list thread-safe.
|
|
|
|
// For now, let's just run all assignments in the generic plugin.
|
|
try {
|
|
publicationsRetrieverPlugin.processAssignments(assignmentRequestCounter, assignmentsForPlugins.values());
|
|
} catch (Exception e) {
|
|
logger.error("Exception when processing the assignments_" + assignmentRequestCounter, e);
|
|
return;
|
|
} // In this case, no assignments were processed.
|
|
|
|
PublicationsRetriever.calculateAndPrintElapsedTime(startTime, Instant.now(), "The processing of assignments_" + assignmentRequestCounter + " (containing " + assignmentsSize + " assignments) finished after: ");
|
|
|
|
if ( askForTest ) {
|
|
logger.debug("UrlReports:"); // DEBUG!
|
|
for ( UrlReport urlReport : urlReports )
|
|
logger.debug(urlReport.toString());
|
|
} // Avoid posting the results in "askForTestUrls"-mode. We don't want for test-results to be written into the database by the controller.
|
|
else
|
|
postWorkerReport(assignmentRequestCounter);
|
|
|
|
// The "postWorkerReport()" above, may fail, but the numbers below still stand, as they are affected by the results themselves, rather than the "posting" of them to the Controller.
|
|
|
|
numHandledAssignmentsBatches ++; // This is used later to stop this app, if a user-defined upper limit is set and reached.
|
|
|
|
// Every time we reach a "limit" of handled id-url clear some data-structures of the underlying "PublicationsRetriever" program.
|
|
// This helps with reducing the memory consumption over the period of weeks or months, and also give a 2nd chance to some domains which may be blocked due to a connectivity issues, but after a month they may be fine.
|
|
long idUrlPairsHandled = (numHandledAssignmentsBatches * maxAssignmentsLimitPerBatch);
|
|
|
|
if ( idUrlPairsHandled >= ((timesClearingDuplicateUrlsData +1) * idUrlsToHandleBeforeClearingDuplicateUrlsData) ) {
|
|
UrlUtils.duplicateUrls.clear();
|
|
timesClearingDuplicateUrlsData ++;
|
|
}
|
|
|
|
if ( idUrlPairsHandled >= ((timesClearingDomainAndPathTrackingData +1) * idUrlsToHandleBeforeClearingDomainAndPathTrackingData) ) {
|
|
GenericUtils.clearTrackingData(); // This includes the "blocking data", we may say "if this condition is true, do not bother checking the just-blocking condition".
|
|
timesClearingDomainAndPathTrackingData ++;
|
|
timesClearingDomainAndPathBlockingData ++; // Increment this also, as we avoid the following check in this case, but the counter has to be increased nevertheless.
|
|
} else if ( idUrlPairsHandled >= ((timesClearingDomainAndPathBlockingData +1) * idUrlsToHandleBeforeClearingDomainAndPathBlockingData) ) {
|
|
GenericUtils.clearBlockingData();
|
|
timesClearingDomainAndPathBlockingData ++;
|
|
}
|
|
|
|
if ( GeneralController.shouldShutdownWorker
|
|
|| (numHandledAssignmentsBatches == maxAssignmentsBatchesToHandleBeforeShutdown) )
|
|
{
|
|
logger.info("The worker will shutdown, after the full-texts are delivered to the Controller, as " + (GeneralController.shouldShutdownWorker
|
|
? "it received a \"shutdownWorker\" request!"
|
|
: "the maximum assignments-batches (" + maxAssignmentsBatchesToHandleBeforeShutdown + ") to be handled was reached!"));
|
|
|
|
// Here, just specify that we do not want to request for more assignments. A scheduling job will check if the fulltexts were delivered to the Controller and then shutdown the Worker.
|
|
shouldNotRequestMore = true;
|
|
}
|
|
|
|
// Note: Cannot call this method, here, retrospectively, as if it runs 100s of times, the memory-stack may break..
|
|
// The scheduler will handle calling it repetitively, in case the Worker is available for work..
|
|
}
|
|
|
|
|
|
// The counters of the assignments-batches, for which the Controller answered with HTTP-200-OK to the submission of their "WorkerReport". It is filled by "postWorkerReport()".
public static HashSet<Long> assignmentsNumsHandled = new HashSet<>();
|
|
|
|
|
|
/**
|
|
* Post the worker report and wait for the Controller to request the publication-files.
|
|
* Once the Controller finishes with uploading the files to the S3-ObjectStore, it returns an "HTTP-200-OK" response to the Worker.
|
|
* Afterwards, the Worker, even in case of an error, deletes the full-texts and the ".tar" and ".tar.zstd" files.
|
|
* */
|
|
public boolean postWorkerReport(Long assignmentRequestCounter)
|
|
{
|
|
String postUrl = this.controllerBaseUrl + "urls/addWorkerReport";
|
|
logger.info("Going to post the WorkerReport of assignments_" + assignmentRequestCounter + " to the controller-server: " + postUrl);
|
|
WorkerReport workerReport = new WorkerReport(this.workerId, assignmentRequestCounter, urlReports);
|
|
|
|
// Create the report file. It may be useful later, in case something goes wrong when sending the report to the Controller or the Controller cannot process it.
|
|
// The report-file is deleted, along with the full-texts) when the Controller posts that the processing of this report was successful.
|
|
writeToFile(this.workerReportsDirPath + this.workerId + "_assignments_" + assignmentRequestCounter + "_report.json", workerReport.getJsonReport(), false);
|
|
|
|
// The worker sends this "WorkerReport" to the Controller, which after some checks, it adds a job to a background thread and responds to the Worker with HTTP-200-OK.
|
|
try {
|
|
ResponseEntity<String> responseEntity = restTemplate.postForEntity(postUrl, workerReport, String.class);
|
|
int responseCode = responseEntity.getStatusCodeValue();
|
|
if ( responseCode == HttpStatus.OK.value() ) {
|
|
logger.info("The submission of the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller, was successful.");
|
|
assignmentsNumsHandled.add(assignmentRequestCounter);
|
|
return true;
|
|
} else { // This does not include HTTP-5XX errors. For them an "HttpServerErrorException" is thrown.
|
|
logger.error("HTTP-Connection problem with the submission of the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller! Error-code was: " + responseCode);
|
|
return false;
|
|
}
|
|
} catch (HttpServerErrorException hsee) {
|
|
logger.error("The Controller failed to handle the WorkerReport of assignments_" + assignmentRequestCounter + ": " + hsee.getMessage());
|
|
return false;
|
|
} catch (Exception e) {
|
|
logger.error("Error when submitting the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller: ", e);
|
|
return false;
|
|
} finally {
|
|
urlReports.clear(); // Reset, without de-allocating.
|
|
assignmentsForPlugins.clear();
|
|
// The full-text files will be deleted after being transferred to the Controller.
|
|
}
|
|
|
|
// Note: It is possible that one or more full-texts-batches, are not sent to the Controller, or that the Controller failed to process them.
|
|
// In that case, the related "attempt"-records will keep their "success" state, but the related "payload" records will not be inserted into the database.
|
|
// When all the id-urls are processed at least one time, the Service will start reprocessing all the "couldRetry" records without a related "payload"-record.
|
|
}
|
|
|
|
|
|
public static List<Assignment> getAssignmentsSpacedOutByDomain(List<Assignment> assignments, int assignmentsSize, boolean shouldPrintDifference)
|
|
{
|
|
List<Assignment> spacedOutAssignments = new ArrayList<>(assignmentsSize);
|
|
|
|
// Check the order of urls' domain in the list. Same domain-urls should be far away from each other, to improve parallelism. (this should happen after the plugin-categorization)
|
|
HashMultimap<String, Assignment> domainsWithAssignments = HashMultimap.create(assignmentsSize/3, 3);
|
|
|
|
StringBuilder sb = null;
|
|
if ( shouldPrintDifference )
|
|
sb = new StringBuilder(assignmentsSize * 20);
|
|
|
|
for ( Assignment assignment : assignments ) {
|
|
if ( assignment != null ) {
|
|
String url = assignment.getOriginalUrl();
|
|
if ( url != null ) {
|
|
String domain = UrlUtils.getDomainStr(url, null);
|
|
if ( domain != null ) {
|
|
domain = UrlUtils.getTopThreeLevelDomain(domain); // This does not return null, only the param itself, in case of an error.
|
|
domainsWithAssignments.put(domain, assignment); // Each "domain" will have multiple assignments.
|
|
if ( sb != null )
|
|
sb.append(domain).append("\n"); // DEBUG!
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if ( sb != null ) {
|
|
logger.debug("Before change:\n" + sb); // DEBUG!
|
|
sb.setLength(0); // Reset it without re-sizing it.
|
|
}
|
|
|
|
List<String> domains = new ArrayList<>(domainsWithAssignments.keySet());
|
|
int domainsSize = domains.size();
|
|
Integer domainsCounter = -1;
|
|
|
|
for ( int i = 0; i < assignmentsSize; ++i )
|
|
{
|
|
HashMap<Object, Integer> result = getFirstAvailableObjectForSpacedOutDomains(domains, domainsCounter, domainsWithAssignments, domainsSize, sb);
|
|
if ( result == null ) { // Check whether the recursive method was left without data.
|
|
logger.warn("the recursive method was asked to do more, using less data!");
|
|
break;
|
|
}
|
|
Assignment nextAssignment = (Assignment) result.keySet().toArray()[0];
|
|
domainsCounter = result.get(nextAssignment);
|
|
spacedOutAssignments.add(nextAssignment);
|
|
}
|
|
|
|
if ( sb != null )
|
|
logger.debug("After change:\n" + sb);
|
|
|
|
return spacedOutAssignments;
|
|
}
|
|
|
|
|
|
/**
|
|
* This method uses recursion to go through the "domainsWithAssignments" multimap and get the nextAssignment.
|
|
* The recursion terminates when there is no more data for any domain.
|
|
* This method may return null, in case it is called more time than the number of assignments all the domains hold inside "domainsWithAssignments".
|
|
* */
|
|
public static HashMap<Object, Integer> getFirstAvailableObjectForSpacedOutDomains(List<String> domainsList, Integer domainsCounter, HashMultimap<String, ?> domainsWithAssignments, int domainsSize, StringBuilder sb)
|
|
{
|
|
// Normally, this method does not need a recursion-break-safety, as the initial-caller method should call this method exactly N times, where N is the number of all the values of "domainsWithAssignments".
|
|
// Although, for extra-safety and re-usability, let's have this check here.
|
|
if ( domainsWithAssignments.keySet().isEmpty() )
|
|
return null; // Break recursion when the domains run-out.
|
|
|
|
if ( domainsCounter < (domainsSize -1) )
|
|
domainsCounter ++;
|
|
else
|
|
domainsCounter = 0; // Start over.
|
|
|
|
String currentDomain = domainsList.get(domainsCounter);
|
|
Set<?> assignmentsOfCurrentDomain = domainsWithAssignments.get(currentDomain);
|
|
if ( assignmentsOfCurrentDomain.isEmpty() ) // This domain is out of assignments, check the next available one.
|
|
return getFirstAvailableObjectForSpacedOutDomains(domainsList, domainsCounter, domainsWithAssignments, domainsSize, sb);
|
|
|
|
Object nextAssignment = assignmentsOfCurrentDomain.toArray()[0];
|
|
HashMap<Object, Integer> result = new HashMap<>();
|
|
result.put(nextAssignment, domainsCounter);
|
|
domainsWithAssignments.remove(currentDomain, nextAssignment);
|
|
if ( sb != null )
|
|
sb.append(currentDomain).append("\n"); // DEBUG!
|
|
|
|
return result;
|
|
}
|
|
|
|
|
|
// A fair lock (it is created with the fairness-policy enabled), which is acquired by "writeToFile()" only when its "shouldLockThreads" parameter is true.
private static final Lock fileWriteLock = new ReentrantLock(true);
|
|
|
|
public String writeToFile(String fileFullPath, String stringToWrite, boolean shouldLockThreads)
|
|
{
|
|
if ( shouldLockThreads )
|
|
fileWriteLock.lock();
|
|
|
|
try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), FilesCompressor.bufferSize) )
|
|
{
|
|
bufferedWriter.write(stringToWrite); // This will overwrite the file. If the new string is smaller, then it does not matter.
|
|
} catch (Exception e) {
|
|
String errorMsg = "Failed to create or acquire the file \"" + fileFullPath + "\"!";
|
|
logger.error(errorMsg, e);
|
|
return errorMsg;
|
|
} finally {
|
|
if ( shouldLockThreads )
|
|
fileWriteLock.unlock();
|
|
}
|
|
return null;
|
|
}
|
|
|
|
|
|
public static void countDatasourcesAndRecords(int assignmentsSize)
|
|
{
|
|
Set<String> datasources = assignmentsForPlugins.keySet();
|
|
int numDatasources = datasources.size();
|
|
logger.debug("Num of datasources: " + numDatasources);
|
|
for ( String datasource : datasources ) {
|
|
logger.debug("Num of records for datasource \"" + datasource + "\" is: " + assignmentsForPlugins.get(datasource).size() );
|
|
}
|
|
logger.debug("Average num of records per datasource: " + (assignmentsSize / numDatasources));
|
|
}
|
|
|
|
}
|