2023-01-25 17:33:49 +01:00
package eu.openaire.urls_worker.components ;
2021-05-20 02:28:48 +02:00
import com.google.common.collect.HashMultimap ;
import com.google.common.collect.Multimap ;
2023-03-02 16:34:44 +01:00
import eu.openaire.publications_retriever.PublicationsRetriever ;
Bug fixes:
- Fix a bug, where, in case it took too long to get the assignments from the Controller (possible when there are too many workers requesting at the same time or if the database is responding slowly), the Worker's scheduler would request for new assignments, in the meantime.
- Fix a bug, where, if the "maxAssignmentsBatchesToHandleBeforeRestart" was set, the Worker's scheduler could request another batch, right before the Worker was about to shut down.
- Fix a bug, where the condition for when to clear the over-sized data-structures was based on the "assignmentRequestCounter" sent by the Controller (which is increased on each request by any worker and not for each individual one), and not on the "numHandledAssignmentsBatches" kept by each individual worker. This would result in a much earlier cleanup, relative to the number of the Workers.
2022-02-19 16:09:02 +01:00
import eu.openaire.publications_retriever.util.url.GenericUtils ;
import eu.openaire.publications_retriever.util.url.UrlUtils ;
2021-09-21 15:21:39 +02:00
import eu.openaire.urls_worker.UrlsWorkerApplication ;
2023-01-25 17:33:49 +01:00
import eu.openaire.urls_worker.components.plugins.PublicationsRetrieverPlugin ;
2022-06-22 17:53:27 +02:00
import eu.openaire.urls_worker.controllers.GeneralController ;
2021-05-20 02:28:48 +02:00
import eu.openaire.urls_worker.models.Assignment ;
2021-06-22 04:58:07 +02:00
import eu.openaire.urls_worker.models.UrlReport ;
2021-11-27 01:37:33 +01:00
import eu.openaire.urls_worker.payloads.requests.AssignmentsRequest ;
2021-06-22 04:58:07 +02:00
import eu.openaire.urls_worker.payloads.responces.WorkerReport ;
2023-05-23 21:19:41 +02:00
import eu.openaire.urls_worker.util.FilesCompressor ;
2021-05-20 02:28:48 +02:00
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
2023-05-23 21:19:41 +02:00
import org.springframework.beans.factory.annotation.Autowired ;
2023-01-25 17:33:49 +01:00
import org.springframework.beans.factory.annotation.Value ;
2021-05-20 02:28:48 +02:00
import org.springframework.boot.web.client.RestTemplateBuilder ;
2021-06-22 04:58:07 +02:00
import org.springframework.http.HttpStatus ;
import org.springframework.http.ResponseEntity ;
2023-01-25 17:33:49 +01:00
import org.springframework.stereotype.Component ;
2023-01-16 14:22:32 +01:00
import org.springframework.web.client.HttpServerErrorException ;
2021-09-23 15:23:49 +02:00
import org.springframework.web.client.RestClientException ;
import org.springframework.web.client.RestTemplate ;
2021-05-20 02:28:48 +02:00
2023-05-23 21:19:41 +02:00
import java.io.BufferedWriter ;
import java.nio.file.Files ;
import java.nio.file.Paths ;
2021-09-23 15:23:49 +02:00
import java.time.Duration ;
2023-03-02 16:34:44 +01:00
import java.time.Instant ;
2023-02-24 22:23:37 +01:00
import java.util.* ;
2023-08-31 16:52:52 +02:00
import java.util.concurrent.ConcurrentHashMap ;
2023-05-23 21:19:41 +02:00
import java.util.concurrent.locks.Lock ;
import java.util.concurrent.locks.ReentrantLock ;
2021-05-20 02:28:48 +02:00
2023-01-25 17:33:49 +01:00
@Component
2021-11-27 01:37:33 +01:00
public class AssignmentsHandler {
2021-05-20 02:28:48 +02:00
2021-11-27 01:37:33 +01:00
private static final Logger logger = LoggerFactory . getLogger ( AssignmentsHandler . class ) ;
2021-05-20 02:28:48 +02:00
2023-05-23 21:19:41 +02:00
@Autowired
PublicationsRetrieverPlugin publicationsRetrieverPlugin ;
2023-01-25 17:33:49 +01:00
private final String workerId ;
private final String controllerBaseUrl ;
private final int maxAssignmentsLimitPerBatch ;
private final int maxAssignmentsBatchesToHandleBeforeShutdown ;
2021-12-06 23:52:40 +01:00
public static List < UrlReport > urlReports = null ;
2021-10-30 16:14:18 +02:00
private static final int expectedDatasourcesPerRequest = 1400 ; // Per 10_000 assignments.
2021-12-06 23:52:40 +01:00
public static Multimap < String , Assignment > assignmentsForPlugins = null ;
2021-09-21 15:21:39 +02:00
private static final boolean askForTest = false ; // Enable this only for testing.
2022-02-21 11:48:21 +01:00
private static String requestUrl ;
2021-08-05 14:09:28 +02:00
2023-10-27 17:39:10 +02:00
public static final RestTemplate restTemplate = new RestTemplateBuilder ( ) . setConnectTimeout ( Duration . ofMinutes ( 2 ) ) . setReadTimeout ( Duration . ofHours ( 2 ) ) . build ( ) ;
2023-04-29 16:24:16 +02:00
2022-02-21 11:48:21 +01:00
public static boolean hadConnectionErrorOnRequest = false ;
2021-06-22 04:58:07 +02:00
2021-12-23 23:12:34 +01:00
public static long numHandledAssignmentsBatches = 0 ; // No need to be synchronized.
2022-09-28 18:10:01 +02:00
public static final long idUrlsToHandleBeforeClearingDomainAndPathBlockingData = 300_000 ;
public static long timesClearingDomainAndPathBlockingData = 0 ;
public static final long idUrlsToHandleBeforeClearingDomainAndPathTrackingData = 600_000 ;
public static long timesClearingDomainAndPathTrackingData = 0 ;
2022-07-04 17:42:05 +02:00
public static final long idUrlsToHandleBeforeClearingDuplicateUrlsData = 200_000 ;
2022-09-28 18:10:01 +02:00
public static long timesClearingDuplicateUrlsData = 0 ;
Bug fixes:
- Fix a bug, where, in case it took too long to get the assignments from the Controller (possible when there are too many workers requesting at the same time or if the database is responding slowly), the Worker's scheduler would request for new assignments, in the meantime.
- Fix a bug, where, if the "maxAssignmentsBatchesToHandleBeforeRestart" was set, the Worker's scheduler could request another batch, right before the Worker was about to shut down.
- Fix a bug, where the condition of when to clear the over-sized data-structures was based on the "assignmentRequestCounter" send by the Controller (which is increased on each request by any worker and not for each individual one), and not on the "numHandledAssignmentsBatches" kept by each individual worker. This would result in much earlier cleanup, relative to the number of the Workers.
2022-02-19 16:09:02 +01:00
2023-05-23 21:19:41 +02:00
public String workerReportsDirPath ;
2021-12-06 23:52:40 +01:00
2023-01-25 17:33:49 +01:00
public AssignmentsHandler ( @Value ( " ${info.workerId} " ) String workerId , @Value ( " ${info.maxAssignmentsLimitPerBatch} " ) int maxAssignmentsLimitPerBatch ,
@Value ( " ${info.maxAssignmentsBatchesToHandleBeforeShutdown} " ) int maxAssignmentsBatchesToHandleBeforeShutdown ,
2023-05-23 21:19:41 +02:00
@Value ( " ${info.controllerBaseUrl} " ) String controllerBaseUrl ,
@Value ( " ${workerReportsDirPath} " ) String workerReportsDirPath )
2021-12-06 23:52:40 +01:00
{
2023-01-25 17:33:49 +01:00
this . workerId = workerId ;
this . maxAssignmentsLimitPerBatch = maxAssignmentsLimitPerBatch ;
this . maxAssignmentsBatchesToHandleBeforeShutdown = maxAssignmentsBatchesToHandleBeforeShutdown ;
this . controllerBaseUrl = controllerBaseUrl ;
urlReports = new ArrayList < > ( this . maxAssignmentsLimitPerBatch ) ;
int expectedAssignmentsPerDatasource = ( this . maxAssignmentsLimitPerBatch / expectedDatasourcesPerRequest ) ;
2021-12-06 23:52:40 +01:00
assignmentsForPlugins = HashMultimap . create ( expectedDatasourcesPerRequest , expectedAssignmentsPerDatasource ) ;
2023-01-25 17:33:49 +01:00
requestUrl = this . controllerBaseUrl + ( askForTest ? " test/ " : " " ) + " urls?workerId= " + this . workerId + " &workerAssignmentsLimit= " + this . maxAssignmentsLimitPerBatch ;
2023-05-16 18:08:59 +02:00
2023-05-23 21:19:41 +02:00
if ( ! workerReportsDirPath . endsWith ( " / " ) )
workerReportsDirPath + = " / " ;
this . workerReportsDirPath = workerReportsDirPath ;
try {
Files . createDirectories ( Paths . get ( this . workerReportsDirPath ) ) ; // No-op if it already exists.
} catch ( Exception e ) {
String errorMsg = " Could not create the \" workerReportsDirPath \" : " + this . workerReportsDirPath ;
logger . error ( errorMsg , e ) ;
throw new RuntimeException ( errorMsg ) ;
}
2021-12-06 23:52:40 +01:00
}
2021-12-17 07:24:09 +01:00
2023-01-25 17:33:49 +01:00
public AssignmentsRequest requestAssignments ( )
2021-06-10 13:29:20 +02:00
{
2023-07-27 16:46:17 +02:00
logger . info ( " Going to request up to " + this . maxAssignmentsLimitPerBatch + " assignments from the Controller: " + requestUrl ) ;
2021-11-27 01:37:33 +01:00
AssignmentsRequest assignmentRequest = null ;
2021-10-30 16:14:18 +02:00
try { // Here, the HTTP-request is executed.
2023-05-29 13:15:55 +02:00
assignmentRequest = restTemplate . getForObject ( requestUrl , AssignmentsRequest . class ) ;
2021-10-30 16:14:18 +02:00
} catch ( RestClientException rce ) {
2023-10-27 17:39:10 +02:00
logger . error ( " Could not retrieve the assignments! " + rce . getMessage ( ) ) ; // The exMsg also shows the response body of the response (from Spring v.2.5.6 onwards).
hadConnectionErrorOnRequest = true ;
2021-09-23 15:23:49 +02:00
return null ;
2023-02-21 14:22:49 +01:00
} catch ( IllegalArgumentException iae ) {
2023-10-27 16:36:54 +02:00
logger . error ( " Could not retrieve the assignments, as the provided Controller's url was malformed! " + iae . getMessage ( ) ) ;
2023-05-23 21:22:57 +02:00
// We do not need to send a "ShutdownReport" to the Controller, since this error will appear upon the Worker's initialization and the Controller will not have any information about this Worker's existence.
2023-02-21 14:22:49 +01:00
UrlsWorkerApplication . gentleAppShutdown ( ) ;
2021-05-20 02:28:48 +02:00
}
2021-06-09 04:45:07 +02:00
//logger.debug(assignmentRequest.toString()); // DEBUG!
2021-07-05 14:00:29 +02:00
return assignmentRequest ;
2021-05-20 02:28:48 +02:00
}
2023-05-23 21:19:41 +02:00
// It is set to "true" by "handleAssignments()", once a shutdown was requested or the maximum number of batches was handled.
// NOTE(review): presumably this is read by a scheduling job outside this class, in order to stop requesting assignments - confirm.
public static boolean shouldNotRequestMore = false ;
2023-01-25 17:33:49 +01:00
public void handleAssignments ( )
2021-06-10 13:29:20 +02:00
{
2021-11-27 01:37:33 +01:00
AssignmentsRequest assignmentsRequest = requestAssignments ( ) ;
2022-02-21 11:48:21 +01:00
if ( assignmentsRequest = = null )
2021-08-05 14:09:28 +02:00
return ;
2021-07-05 14:00:29 +02:00
2021-11-27 01:37:33 +01:00
Long assignmentRequestCounter = assignmentsRequest . getAssignmentsCounter ( ) ;
List < Assignment > assignments = assignmentsRequest . getAssignments ( ) ;
2021-07-05 14:00:29 +02:00
if ( assignments = = null ) {
2022-12-05 15:47:15 +01:00
if ( assignmentRequestCounter = = - 1 )
logger . warn ( " The Controller could not retrieve and assignments from the database. It will increase the attempts-number and retry in the next request. " ) ;
else
logger . warn ( " The assignments were found to be null for assignmentRequestCounter = " + assignmentRequestCounter ) ;
return ; // The Worker will just request the assignments again, immediately.
2021-05-20 02:28:48 +02:00
}
2021-06-11 12:44:33 +02:00
2021-10-14 02:03:47 +02:00
int assignmentsSize = assignments . size ( ) ;
if ( assignmentsSize = = 0 ) {
logger . warn ( " The assignmentsSize was < 0 > for assignmentRequestCounter = " + assignmentRequestCounter ) ;
return ;
}
2023-07-27 16:46:17 +02:00
logger . info ( " AssignmentRequest < " + assignmentRequestCounter + " > was received and it's ready to be processed. It contains " + assignmentsSize + " assignments. " ) ;
2021-10-14 02:03:47 +02:00
2023-03-02 16:34:44 +01:00
Instant startTime = Instant . now ( ) ;
// Make sure there are no multiple occurrences of urls with the same domains are present, next to each other, inside the list.
2023-02-24 22:23:37 +01:00
// If the same domains appear too close in the list, then this means we have large waiting-times between url-connections, due to "politeness-delays" to avoid server-overloading.
assignments = getAssignmentsSpacedOutByDomain ( assignments , assignmentsSize , false ) ;
2023-07-27 16:46:17 +02:00
// Iterate over the assignments and add each assignment in its own list depending on the DATASOURCE in order to decide which plugin to use later.
2021-07-05 14:00:29 +02:00
for ( Assignment assignment : assignments ) {
2021-10-30 16:14:18 +02:00
try {
assignmentsForPlugins . put ( assignment . getDatasource ( ) . getId ( ) , assignment ) ;
} catch ( NullPointerException npe ) {
2023-05-22 20:25:22 +02:00
logger . warn ( " An NPE was thrown when splitting the assignments based on the datasource-types. The problematic assignment was: " + assignment ) ; // Do not use "assignment.toString()", it may cause an NPE.
2021-10-30 16:14:18 +02:00
}
2021-05-20 02:28:48 +02:00
}
2021-10-30 16:14:18 +02:00
//countDatasourcesAndRecords(assignmentsSize); // Only for DEBUG! Keep it commented in normal run.
2021-06-22 04:58:07 +02:00
2023-07-27 16:46:17 +02:00
// TODO - Decide which assignments should run with what plugin (depending on their datasource).
// First run -in parallel- the assignments which require some specific plugin.
// Then, after the above plugins are finished, run the remaining assignments in the generic plugin (which handles parallelism itself).
2023-03-02 16:34:44 +01:00
// TODO - If we have more than one plugin running at the same time, then make the "AssignmentsHandler.urlReports"-list thread-safe.
2021-05-20 02:28:48 +02:00
2023-07-27 16:46:17 +02:00
// For now, let's just run all assignments in the generic plugin.
2021-06-22 04:58:07 +02:00
try {
2023-05-23 21:19:41 +02:00
publicationsRetrieverPlugin . processAssignments ( assignmentRequestCounter , assignmentsForPlugins . values ( ) ) ;
2021-06-22 04:58:07 +02:00
} catch ( Exception e ) {
2021-12-03 03:09:40 +01:00
logger . error ( " Exception when processing the assignments_ " + assignmentRequestCounter , e ) ;
2023-05-31 14:25:36 +02:00
return ;
} // In this case, no assignments were processed.
2021-06-22 04:58:07 +02:00
2023-08-28 15:11:26 +02:00
PublicationsRetriever . calculateAndPrintElapsedTime ( startTime , Instant . now ( ) , " The processing of assignments_ " + assignmentRequestCounter + " (containing " + assignmentsSize + " assignments) finished after: " ) ;
2022-01-17 22:46:15 +01:00
2021-09-23 15:23:49 +02:00
if ( askForTest ) {
logger . debug ( " UrlReports: " ) ; // DEBUG!
for ( UrlReport urlReport : urlReports )
logger . debug ( urlReport . toString ( ) ) ;
} // Avoid posting the results in "askForTestUrls"-mode. We don't want for test-results to be written into the database by the controller.
else
postWorkerReport ( assignmentRequestCounter ) ;
2021-09-21 15:21:39 +02:00
2022-09-28 21:41:43 +02:00
// The "postWorkerReport()" above, may fail, but the numbers below still stand, as they are affected by the results themselves, rather than the "posting" of them to the Controller.
2022-06-22 17:53:27 +02:00
numHandledAssignmentsBatches + + ; // This is used later to stop this app, if a user-defined upper limit is set and reached.
2021-12-23 23:12:34 +01:00
Bug fixes:
- Fix a bug, where, in case it took too long to get the assignments from the Controller (possible when there are too many workers requesting at the same time or if the database is responding slowly), the Worker's scheduler would request for new assignments, in the meantime.
- Fix a bug, where, if the "maxAssignmentsBatchesToHandleBeforeRestart" was set, the Worker's scheduler could request another batch, right before the Worker was about to shut down.
- Fix a bug, where the condition of when to clear the over-sized data-structures was based on the "assignmentRequestCounter" send by the Controller (which is increased on each request by any worker and not for each individual one), and not on the "numHandledAssignmentsBatches" kept by each individual worker. This would result in much earlier cleanup, relative to the number of the Workers.
2022-02-19 16:09:02 +01:00
// Every time we reach a "limit" of handled id-url clear some data-structures of the underlying "PublicationsRetriever" program.
// This helps with reducing the memory consumption over the period of weeks or months, and also give a 2nd chance to some domains which may be blocked due to a connectivity issues, but after a month they may be fine.
2023-07-06 12:22:09 +02:00
long idUrlPairsHandled = ( numHandledAssignmentsBatches * maxAssignmentsLimitPerBatch ) ;
2022-09-28 18:10:01 +02:00
if ( idUrlPairsHandled > = ( ( timesClearingDuplicateUrlsData + 1 ) * idUrlsToHandleBeforeClearingDuplicateUrlsData ) ) {
Bug fixes:
- Fix a bug, where, in case it took too long to get the assignments from the Controller (possible when there are too many workers requesting at the same time or if the database is responding slowly), the Worker's scheduler would request for new assignments, in the meantime.
- Fix a bug, where, if the "maxAssignmentsBatchesToHandleBeforeRestart" was set, the Worker's scheduler could request another batch, right before the Worker was about to shut down.
- Fix a bug, where the condition of when to clear the over-sized data-structures was based on the "assignmentRequestCounter" send by the Controller (which is increased on each request by any worker and not for each individual one), and not on the "numHandledAssignmentsBatches" kept by each individual worker. This would result in much earlier cleanup, relative to the number of the Workers.
2022-02-19 16:09:02 +01:00
UrlUtils . duplicateUrls . clear ( ) ;
2022-09-28 18:10:01 +02:00
timesClearingDuplicateUrlsData + + ;
}
Bug fixes:
- Fix a bug, where, in case it took too long to get the assignments from the Controller (possible when there are too many workers requesting at the same time or if the database is responding slowly), the Worker's scheduler would request for new assignments, in the meantime.
- Fix a bug, where, if the "maxAssignmentsBatchesToHandleBeforeRestart" was set, the Worker's scheduler could request another batch, right before the Worker was about to shut down.
- Fix a bug, where the condition of when to clear the over-sized data-structures was based on the "assignmentRequestCounter" send by the Controller (which is increased on each request by any worker and not for each individual one), and not on the "numHandledAssignmentsBatches" kept by each individual worker. This would result in much earlier cleanup, relative to the number of the Workers.
2022-02-19 16:09:02 +01:00
2022-09-28 18:10:01 +02:00
if ( idUrlPairsHandled > = ( ( timesClearingDomainAndPathTrackingData + 1 ) * idUrlsToHandleBeforeClearingDomainAndPathTrackingData ) ) {
2023-01-17 17:25:49 +01:00
GenericUtils . clearTrackingData ( ) ; // This includes the "blocking data", we may say "if this condition is true, do not bother checking the just-blocking condition".
2022-09-28 18:10:01 +02:00
timesClearingDomainAndPathTrackingData + + ;
timesClearingDomainAndPathBlockingData + + ; // Increment this also, as we avoid the following check in this case, but the counter has to be increased nevertheless.
} else if ( idUrlPairsHandled > = ( ( timesClearingDomainAndPathBlockingData + 1 ) * idUrlsToHandleBeforeClearingDomainAndPathBlockingData ) ) {
2023-01-18 15:55:59 +01:00
GenericUtils . clearBlockingData ( ) ;
2022-09-28 18:10:01 +02:00
timesClearingDomainAndPathBlockingData + + ;
}
2021-10-30 16:14:18 +02:00
2022-06-22 17:53:27 +02:00
if ( GeneralController . shouldShutdownWorker
2023-07-06 12:22:09 +02:00
| | ( numHandledAssignmentsBatches = = maxAssignmentsBatchesToHandleBeforeShutdown ) )
2021-12-31 02:51:58 +01:00
{
2023-05-23 21:19:41 +02:00
logger . info ( " The worker will shutdown, after the full-texts are delivered to the Controller, as " + ( GeneralController . shouldShutdownWorker
2022-06-22 17:53:27 +02:00
? " it received a \" shutdownWorker \" request! "
2023-07-06 12:22:09 +02:00
: " the maximum assignments-batches ( " + maxAssignmentsBatchesToHandleBeforeShutdown + " ) to be handled was reached! " ) ) ;
2023-01-17 17:25:49 +01:00
2023-05-23 21:19:41 +02:00
// Here, just specify that we do not want to request for more assignments. A scheduling job will check if the fulltexts were delivered to the Controller and then shutdown the Worker.
shouldNotRequestMore = true ;
2021-12-31 02:51:58 +01:00
}
Bug fixes:
- Fix a bug, where, in case it took too long to get the assignments from the Controller (possible when there are too many workers requesting at the same time or if the database is responding slowly), the Worker's scheduler would request for new assignments, in the meantime.
- Fix a bug, where, if the "maxAssignmentsBatchesToHandleBeforeRestart" was set, the Worker's scheduler could request another batch, right before the Worker was about to shut down.
- Fix a bug, where the condition of when to clear the over-sized data-structures was based on the "assignmentRequestCounter" send by the Controller (which is increased on each request by any worker and not for each individual one), and not on the "numHandledAssignmentsBatches" kept by each individual worker. This would result in much earlier cleanup, relative to the number of the Workers.
2022-02-19 16:09:02 +01:00
// Note: Cannot call this method, here, retrospectively, as if it runs 100s of times, the memory-stack may break..
2023-05-22 20:25:22 +02:00
// The scheduler will handle calling it repetitively, in case the Worker is available for work..
2021-06-22 04:58:07 +02:00
}
2021-05-20 02:28:48 +02:00
2023-05-22 20:25:22 +02:00
2023-08-31 16:52:52 +02:00
// The counters of the assignments-batches, whose WorkerReport was accepted by the Controller with an HTTP-200-OK (they are added by "postWorkerReport()").
// The Set is backed by a "ConcurrentHashMap".
public static final Set < Long > handledAssignmentsCounters = Collections . newSetFromMap ( new ConcurrentHashMap < Long , Boolean > ( ) ) ;
2022-12-07 11:29:05 +01:00
2021-05-20 02:28:48 +02:00
2022-06-22 17:53:27 +02:00
/ * *
* Post the worker report and wait for the Controller to request the publication - files .
* Once the Controller finishes with uploading the files to the S3 - ObjectStore , it returns an " HTTP-200-OK " response to the Worker .
2023-03-07 15:25:10 +01:00
* Afterwards , the Worker , even in case of an error , deletes the full - texts and the " .tar " and " .tar.zstd " files .
2022-06-22 17:53:27 +02:00
* * /
2023-01-25 17:33:49 +01:00
public boolean postWorkerReport ( Long assignmentRequestCounter )
2021-06-22 04:58:07 +02:00
{
2023-01-25 17:33:49 +01:00
String postUrl = this . controllerBaseUrl + " urls/addWorkerReport " ;
2023-01-16 14:22:32 +01:00
logger . info ( " Going to post the WorkerReport of assignments_ " + assignmentRequestCounter + " to the controller-server: " + postUrl ) ;
2023-05-23 21:19:41 +02:00
WorkerReport workerReport = new WorkerReport ( this . workerId , assignmentRequestCounter , urlReports ) ;
2022-12-07 11:29:05 +01:00
2023-05-23 21:19:41 +02:00
// Create the report file. It may be useful later, in case something goes wrong when sending the report to the Controller or the Controller cannot process it.
// The report-file is deleted, along with the full-texts) when the Controller posts that the processing of this report was successful.
writeToFile ( this . workerReportsDirPath + this . workerId + " _assignments_ " + assignmentRequestCounter + " _report.json " , workerReport . getJsonReport ( ) , false ) ;
2022-12-07 11:29:05 +01:00
2023-05-23 21:19:41 +02:00
// The worker sends this "WorkerReport" to the Controller, which after some checks, it adds a job to a background thread and responds to the Worker with HTTP-200-OK.
try {
2023-05-29 13:15:55 +02:00
ResponseEntity < String > responseEntity = restTemplate . postForEntity ( postUrl , workerReport , String . class ) ;
2021-10-11 12:27:40 +02:00
int responseCode = responseEntity . getStatusCodeValue ( ) ;
2021-12-23 23:12:34 +01:00
if ( responseCode = = HttpStatus . OK . value ( ) ) {
2023-05-23 21:19:41 +02:00
logger . info ( " The submission of the WorkerReport of assignments_ " + assignmentRequestCounter + " to the Controller, was successful. " ) ;
2023-08-31 16:52:52 +02:00
handledAssignmentsCounters . add ( assignmentRequestCounter ) ;
2021-12-23 23:12:34 +01:00
return true ;
2023-01-16 14:22:32 +01:00
} else { // This does not include HTTP-5XX errors. For them an "HttpServerErrorException" is thrown.
logger . error ( " HTTP-Connection problem with the submission of the WorkerReport of assignments_ " + assignmentRequestCounter + " to the Controller! Error-code was: " + responseCode ) ;
2021-06-22 04:58:07 +02:00
return false ;
}
2023-01-16 14:22:32 +01:00
} catch ( HttpServerErrorException hsee ) {
logger . error ( " The Controller failed to handle the WorkerReport of assignments_ " + assignmentRequestCounter + " : " + hsee . getMessage ( ) ) ;
return false ;
2021-06-22 04:58:07 +02:00
} catch ( Exception e ) {
2023-01-16 14:22:32 +01:00
logger . error ( " Error when submitting the WorkerReport of assignments_ " + assignmentRequestCounter + " to the Controller: " , e ) ;
2021-06-22 04:58:07 +02:00
return false ;
2021-09-21 15:21:39 +02:00
} finally {
2021-10-30 16:14:18 +02:00
urlReports . clear ( ) ; // Reset, without de-allocating.
assignmentsForPlugins . clear ( ) ;
2023-05-23 21:19:41 +02:00
// The full-text files will be deleted after being transferred to the Controller.
2021-05-20 02:28:48 +02:00
}
2023-01-18 15:55:59 +01:00
// Note: It is possible that one or more full-texts-batches, are not sent to the Controller, or that the Controller failed to process them.
// In that case, the related "attempt"-records will keep their "success" state, but the related "payload" records will not be inserted into the database.
2023-05-22 20:25:22 +02:00
// When all the id-urls are processed at least one time, the Service will start reprocessing all the "couldRetry" records without a related "payload"-record.
2021-05-20 02:28:48 +02:00
}
2021-10-30 16:14:18 +02:00
2023-02-24 22:23:37 +01:00
public static List < Assignment > getAssignmentsSpacedOutByDomain ( List < Assignment > assignments , int assignmentsSize , boolean shouldPrintDifference )
{
List < Assignment > spacedOutAssignments = new ArrayList < > ( assignmentsSize ) ;
// Check the order of urls' domain in the list. Same domain-urls should be far away from each other, to improve parallelism. (this should happen after the plugin-categorization)
HashMultimap < String , Assignment > domainsWithAssignments = HashMultimap . create ( assignmentsSize / 3 , 3 ) ;
StringBuilder sb = null ;
if ( shouldPrintDifference )
sb = new StringBuilder ( assignmentsSize * 20 ) ;
for ( Assignment assignment : assignments ) {
if ( assignment ! = null ) {
String url = assignment . getOriginalUrl ( ) ;
if ( url ! = null ) {
String domain = UrlUtils . getDomainStr ( url , null ) ;
if ( domain ! = null ) {
domain = UrlUtils . getTopThreeLevelDomain ( domain ) ; // This does not return null, only the param itself, in case of an error.
domainsWithAssignments . put ( domain , assignment ) ; // Each "domain" will have multiple assignments.
if ( sb ! = null )
sb . append ( domain ) . append ( " \ n " ) ; // DEBUG!
}
}
}
}
if ( sb ! = null ) {
2023-07-06 12:22:09 +02:00
logger . debug ( " Before change: \ n " + sb ) ; // DEBUG!
2023-02-24 22:23:37 +01:00
sb . setLength ( 0 ) ; // Reset it without re-sizing it.
}
List < String > domains = new ArrayList < > ( domainsWithAssignments . keySet ( ) ) ;
int domainsSize = domains . size ( ) ;
Integer domainsCounter = - 1 ;
for ( int i = 0 ; i < assignmentsSize ; + + i )
{
HashMap < Object , Integer > result = getFirstAvailableObjectForSpacedOutDomains ( domains , domainsCounter , domainsWithAssignments , domainsSize , sb ) ;
if ( result = = null ) { // Check whether the recursive method was left without data.
logger . warn ( " the recursive method was asked to do more, using less data! " ) ;
break ;
}
Assignment nextAssignment = ( Assignment ) result . keySet ( ) . toArray ( ) [ 0 ] ;
domainsCounter = result . get ( nextAssignment ) ;
spacedOutAssignments . add ( nextAssignment ) ;
}
if ( sb ! = null )
2023-07-06 12:22:09 +02:00
logger . debug ( " After change: \ n " + sb ) ;
2023-02-24 22:23:37 +01:00
return spacedOutAssignments ;
}
/ * *
* This method uses recursion to go through the " domainsWithAssignments " multimap and get the nextAssignment .
* The recursion terminates when there is no more data for any domain .
* This method may return null , in case it is called more time than the number of assignments all the domains hold inside " domainsWithAssignments " .
* * /
public static HashMap < Object , Integer > getFirstAvailableObjectForSpacedOutDomains ( List < String > domainsList , Integer domainsCounter , HashMultimap < String , ? > domainsWithAssignments , int domainsSize , StringBuilder sb )
{
// Normally, this method does not need a recursion-break-safety, as the initial-caller method should call this method exactly N times, where N is the number of all the values of "domainsWithAssignments".
// Although, for extra-safety and re-usability, let's have this check here.
2023-07-06 12:22:09 +02:00
if ( domainsWithAssignments . keySet ( ) . isEmpty ( ) )
2023-02-24 22:23:37 +01:00
return null ; // Break recursion when the domains run-out.
if ( domainsCounter < ( domainsSize - 1 ) )
domainsCounter + + ;
else
domainsCounter = 0 ; // Start over.
String currentDomain = domainsList . get ( domainsCounter ) ;
Set < ? > assignmentsOfCurrentDomain = domainsWithAssignments . get ( currentDomain ) ;
2023-07-06 12:22:09 +02:00
if ( assignmentsOfCurrentDomain . isEmpty ( ) ) // This domain is out of assignments, check the next available one.
return getFirstAvailableObjectForSpacedOutDomains ( domainsList , domainsCounter , domainsWithAssignments , domainsSize , sb ) ;
Object nextAssignment = assignmentsOfCurrentDomain . toArray ( ) [ 0 ] ;
HashMap < Object , Integer > result = new HashMap < > ( ) ;
result . put ( nextAssignment , domainsCounter ) ;
domainsWithAssignments . remove ( currentDomain , nextAssignment ) ;
if ( sb ! = null )
sb . append ( currentDomain ) . append ( " \ n " ) ; // DEBUG!
2023-02-24 22:23:37 +01:00
return result ;
}
2023-07-06 12:22:09 +02:00
// A fair lock ("true" enables the fairness-policy), which is acquired by "writeToFile()", when it is asked to lock the threads.
private static final Lock fileWriteLock = new ReentrantLock ( true ) ;
2023-05-23 21:19:41 +02:00
/**
     * Writes the given string to the given file, overwriting any previous content.
     *
     * @param fileFullPath the full-path of the file to write to
     * @param stringToWrite the content to write
     * @param shouldLockThreads whether the "fileWriteLock" should be held while writing
     * @return "null" on success, otherwise the error-message which was also logged
     */
    public String writeToFile(String fileFullPath, String stringToWrite, boolean shouldLockThreads)
    {
        if ( shouldLockThreads )
            fileWriteLock.lock();

        String failureMsg = null;   // It stays "null", unless the writing fails.
        try ( BufferedWriter fileWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), FilesCompressor.bufferSize) )
        {
            fileWriter.write(stringToWrite);   // This will overwrite the file. If the new string is smaller, then it does not matter.
        } catch (Exception e) {
            failureMsg = " Failed to create or acquire the file \" " + fileFullPath + " \" ! ";
            logger.error(failureMsg, e);
        } finally {
            if ( shouldLockThreads )
                fileWriteLock.unlock();
        }
        return failureMsg;
    }
2021-10-30 16:14:18 +02:00
/**
     * Logs (in debug-level) the number of datasources held inside the "assignmentsForPlugins", the number of records of each datasource
     * and the average number of records per datasource. It is meant only for debugging.
     *
     * @param assignmentsSize the total number of assignments, used for computing the average
     */
    public static void countDatasourcesAndRecords(int assignmentsSize)
    {
        Set<String> datasources = assignmentsForPlugins.keySet();
        int numDatasources = datasources.size();
        logger.debug(" Num of datasources: " + numDatasources);
        if ( numDatasources == 0 )
            return;   // There are no records to report and the average below would cause a division by zero.

        for ( String datasource : datasources ) {
            logger.debug(" Num of records for datasource \" " + datasource + " \" is: " + assignmentsForPlugins.get(datasource).size());
        }
        logger.debug(" Average num of records per datasource: " + (assignmentsSize / numDatasources));
    }
2021-05-20 02:28:48 +02:00
}