Drastically improve performance by applying a pre-processing algorithm for the assignments-list to open some "space" between assignments which have the same domain, which in return, causes the threads to block less during execution.

(The threads block, due to the mandatory "politeness-delay" before reconnecting with the same domain, in order to avoid overloading the remote servers.)
This commit is contained in:
Lampros Smyrnaios 2023-02-24 23:23:37 +02:00
parent 84a37bd4b7
commit 81b61b530f
2 changed files with 106 additions and 4 deletions

View File

@ -39,6 +39,8 @@ dependencies {
exclude group: 'io.minio' // This is not used in the Worker, since it's the Controller which uploads the full-texts to S3. It also includes an older "commons-compress" version which causes problems.
}
implementation group: 'com.google.guava', name: 'guava', version: '31.1-jre'
implementation 'org.apache.commons:commons-compress:1.22'
implementation 'com.github.luben:zstd-jni:1.5.4-1'

View File

@ -24,10 +24,7 @@ import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
@Component
@ -126,6 +123,11 @@ public class AssignmentsHandler {
logger.info("AssignmentRequest < " + assignmentRequestCounter + " > was received and it's ready to be processed. It contains " + assignmentsSize + " tasks.");
// Make sure there are no multiple occurrences of urls with the same domains next to each other on the list.
// If the same domains appear too close in the list, then this means we have large waiting-times between url-connections, due to "politeness-delays" to avoid server-overloading.
assignments = getAssignmentsSpacedOutByDomain(assignments, assignmentsSize, false);
// Iterate over the tasks and add each task in its own list depending on the DATASOURCE in order to decide which plugin to use later.
for ( Assignment assignment : assignments ) {
@ -249,6 +251,104 @@ public class AssignmentsHandler {
}
public static List<Assignment> getAssignmentsSpacedOutByDomain(List<Assignment> assignments, int assignmentsSize, boolean shouldPrintDifference)
{
List<Assignment> spacedOutAssignments = new ArrayList<>(assignmentsSize);
// Check the order of urls' domain in the list. Same domain-urls should be far away from each other, to improve parallelism. (this should happen after the plugin-categorization)
HashMultimap<String, Assignment> domainsWithAssignments = HashMultimap.create(assignmentsSize/3, 3);
StringBuilder sb = null;
if ( shouldPrintDifference )
sb = new StringBuilder(assignmentsSize * 20);
for ( Assignment assignment : assignments ) {
if ( assignment != null ) {
String url = assignment.getOriginalUrl();
if ( url != null ) {
String domain = UrlUtils.getDomainStr(url, null);
if ( domain != null ) {
domain = UrlUtils.getTopThreeLevelDomain(domain); // This does not return null, only the param itself, in case of an error.
domainsWithAssignments.put(domain, assignment); // Each "domain" will have multiple assignments.
if ( sb != null )
sb.append(domain).append("\n"); // DEBUG!
}
}
}
}
if ( sb != null ) {
logger.debug("Before change:\n" + sb.toString()); // DEBUG!
sb.setLength(0); // Reset it without re-sizing it.
}
List<String> domains = new ArrayList<>(domainsWithAssignments.keySet());
int domainsSize = domains.size();
Integer domainsCounter = -1;
for ( int i = 0; i < assignmentsSize; ++i )
{
HashMap<Object, Integer> result = getFirstAvailableObjectForSpacedOutDomains(domains, domainsCounter, domainsWithAssignments, domainsSize, sb);
if ( result == null ) { // Check whether the recursive method was left without data.
logger.warn("the recursive method was asked to do more, using less data!");
break;
}
Assignment nextAssignment = (Assignment) result.keySet().toArray()[0];
domainsCounter = result.get(nextAssignment);
spacedOutAssignments.add(nextAssignment);
}
// The HashMultimap is no longer needed.
domainsWithAssignments.clear();
if ( sb != null )
logger.debug("After change:\n" + sb.toString());
return spacedOutAssignments;
}
/**
* This method uses recursion to go through the "domainsWithAssignments" multimap and get the nextAssignment.
* The recursion terminates when there is no more data for any domain.
* This method may return null, in case it is called more time than the number of assignments all the domains hold inside "domainsWithAssignments".
* */
public static HashMap<Object, Integer> getFirstAvailableObjectForSpacedOutDomains(List<String> domainsList, Integer domainsCounter, HashMultimap<String, ?> domainsWithAssignments, int domainsSize, StringBuilder sb)
{
HashMap<Object, Integer> result = new HashMap<>();
Object nextAssignment = null;
// Normally, this method does not need a recursion-break-safety, as the initial-caller method should call this method exactly N times, where N is the number of all the values of "domainsWithAssignments".
// Although, for extra-safety and re-usability, let's have this check here.
Set<String> domainsSet = domainsWithAssignments.keySet();
if ( domainsSet.isEmpty() )
return null; // Break recursion when the domains run-out.
if ( domainsCounter < (domainsSize -1) )
domainsCounter ++;
else
domainsCounter = 0; // Start over.
String currentDomain = domainsList.get(domainsCounter);
Set<?> assignmentsOfCurrentDomain = domainsWithAssignments.get(currentDomain);
if ( assignmentsOfCurrentDomain.isEmpty() ) {
// This domain is out of assignments, check the next available one.
result = getFirstAvailableObjectForSpacedOutDomains(domainsList, domainsCounter, domainsWithAssignments, domainsSize, sb);
} else {
nextAssignment = assignmentsOfCurrentDomain.toArray()[0];
result.put(nextAssignment, domainsCounter);
domainsWithAssignments.remove(currentDomain, nextAssignment);
if ( sb != null )
sb.append(currentDomain).append("\n"); // DEBUG!
}
return result;
}
public static void countDatasourcesAndRecords(int assignmentsSize)
{
Set<String> datasources = assignmentsForPlugins.keySet();