diff --git a/build.gradle b/build.gradle index 0c1a315..a8d9f23 100644 --- a/build.gradle +++ b/build.gradle @@ -39,6 +39,8 @@ dependencies { exclude group: 'io.minio' // This is not used in the Worker, since it's the Controller which uploads the full-texts to S3. It also includes an older "commons-compress" version which causes problems. } + implementation group: 'com.google.guava', name: 'guava', version: '31.1-jre' + implementation 'org.apache.commons:commons-compress:1.22' implementation 'com.github.luben:zstd-jni:1.5.4-1' diff --git a/src/main/java/eu/openaire/urls_worker/components/AssignmentsHandler.java b/src/main/java/eu/openaire/urls_worker/components/AssignmentsHandler.java index c0781c8..cc8327b 100644 --- a/src/main/java/eu/openaire/urls_worker/components/AssignmentsHandler.java +++ b/src/main/java/eu/openaire/urls_worker/components/AssignmentsHandler.java @@ -24,10 +24,7 @@ import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; import java.time.Duration; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; @Component @@ -126,6 +123,11 @@ public class AssignmentsHandler { logger.info("AssignmentRequest < " + assignmentRequestCounter + " > was received and it's ready to be processed. It contains " + assignmentsSize + " tasks."); + // Make sure there are no multiple occurrences of urls with the same domains next to each other on the list. + // If the same domains appear too close in the list, then this means we have large waiting-times between url-connections, due to "politeness-delays" to avoid server-overloading. + + assignments = getAssignmentsSpacedOutByDomain(assignments, assignmentsSize, false); + // Iterate over the tasks and add each task in its own list depending on the DATASOURCE in order to decide which plugin to use later. for ( Assignment assignment : assignments ) { @@ -249,6 +251,104 @@ public class AssignmentsHandler { } + public static List getAssignmentsSpacedOutByDomain(List assignments, int assignmentsSize, boolean shouldPrintDifference) + { + List spacedOutAssignments = new ArrayList<>(assignmentsSize); + + // Check the order of urls' domain in the list. Same domain-urls should be far away from each other, to improve parallelism. (this should happen after the plugin-categorization) + HashMultimap domainsWithAssignments = HashMultimap.create(assignmentsSize/3, 3); + + StringBuilder sb = null; + if ( shouldPrintDifference ) + sb = new StringBuilder(assignmentsSize * 20); + + for ( Assignment assignment : assignments ) { + if ( assignment != null ) { + String url = assignment.getOriginalUrl(); + if ( url != null ) { + String domain = UrlUtils.getDomainStr(url, null); + if ( domain != null ) { + domain = UrlUtils.getTopThreeLevelDomain(domain); // This does not return null, only the param itself, in case of an error. + domainsWithAssignments.put(domain, assignment); // Each "domain" will have multiple assignments. + if ( sb != null ) + sb.append(domain).append("\n"); // DEBUG! + } + } + } + } + + if ( sb != null ) { + logger.debug("Before change:\n" + sb.toString()); // DEBUG! + sb.setLength(0); // Reset it without re-sizing it. + } + + List domains = new ArrayList<>(domainsWithAssignments.keySet()); + int domainsSize = domains.size(); + + Integer domainsCounter = -1; + + for ( int i = 0; i < assignmentsSize; ++i ) + { + HashMap result = getFirstAvailableObjectForSpacedOutDomains(domains, domainsCounter, domainsWithAssignments, domainsSize, sb); + if ( result == null ) { // Check whether the recursive method was left without data. + logger.warn("the recursive method was asked to do more, using less data!"); + break; + } + Assignment nextAssignment = (Assignment) result.keySet().toArray()[0]; + domainsCounter = result.get(nextAssignment); + spacedOutAssignments.add(nextAssignment); + } + + // The HashMultimap is no longer needed. + domainsWithAssignments.clear(); + + if ( sb != null ) + logger.debug("After change:\n" + sb.toString()); + + return spacedOutAssignments; + } + + + /** + * This method uses recursion to go through the "domainsWithAssignments" multimap and get the nextAssignment. + * The recursion terminates when there is no more data for any domain. + * This method may return null, in case it is called more time than the number of assignments all the domains hold inside "domainsWithAssignments". + * */ + public static HashMap getFirstAvailableObjectForSpacedOutDomains(List domainsList, Integer domainsCounter, HashMultimap domainsWithAssignments, int domainsSize, StringBuilder sb) + { + HashMap result = new HashMap<>(); + Object nextAssignment = null; + + // Normally, this method does not need a recursion-break-safety, as the initial-caller method should call this method exactly N times, where N is the number of all the values of "domainsWithAssignments". + // Although, for extra-safety and re-usability, let's have this check here. + Set domainsSet = domainsWithAssignments.keySet(); + if ( domainsSet.isEmpty() ) + return null; // Break recursion when the domains run-out. + + if ( domainsCounter < (domainsSize -1) ) + domainsCounter ++; + else + domainsCounter = 0; // Start over. + + String currentDomain = domainsList.get(domainsCounter); + Set assignmentsOfCurrentDomain = domainsWithAssignments.get(currentDomain); + if ( assignmentsOfCurrentDomain.isEmpty() ) { + // This domain is out of assignments, check the next available one. + result = getFirstAvailableObjectForSpacedOutDomains(domainsList, domainsCounter, domainsWithAssignments, domainsSize, sb); + } else { + nextAssignment = assignmentsOfCurrentDomain.toArray()[0]; + result.put(nextAssignment, domainsCounter); + + domainsWithAssignments.remove(currentDomain, nextAssignment); + + if ( sb != null ) + sb.append(currentDomain).append("\n"); // DEBUG! + } + + return result; + } + + public static void countDatasourcesAndRecords(int assignmentsSize) { Set datasources = assignmentsForPlugins.keySet();