added parallelism for indexing

This commit is contained in:
Michele Artini 2025-01-14 11:48:13 +01:00
parent 87d0cecf38
commit 3ca7f7d00b
3 changed files with 63 additions and 34 deletions

View File

@ -8,6 +8,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
@ -19,6 +21,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.app.directindex.input.ResultEntry;
import eu.dnetlib.app.directindex.mapping.SolrRecordMapper;
@ -44,58 +47,81 @@ public class ScheduledActions {
@Autowired
private PendingActionRepository pendingActionRepository;
@Value("${dnet.directindex.scheduling.nThreads}")
private int nThreads;
@Value("${dnet.directindex.scheduling.maxActionsForThread}")
private int maxActionsForThread;
/**
 * Scheduled entry point: loads the pending insert/update actions and indexes them in parallel.
 *
 * The first run starts 1 minute after startup; later runs start 5 minutes after the previous
 * one has finished (see the {@code @Scheduled} attributes). The method is declared
 * {@code synchronized}, so two invocations on the same instance cannot overlap.
 *
 * The list of pending actions is split into chunks of at most {@code maxActionsForThread}
 * elements; each chunk is handed to a fixed pool of {@code nThreads} threads that runs the
 * private {@code indexRecords(List)} overload.
 *
 * NOTE(review): this listing is a rendered diff whose +/- markers were lost, so removed and
 * added lines appear side by side (two signatures, duplicated {@code start}/{@code list}
 * declarations, a {@code try} without a visible catch). Confirm against the committed file
 * which variant is the live one.
 *
 * @throws InterruptedException
 *             declared by the second signature variant; {@code executor.awaitTermination}
 *             is the call in this body that can raise it
 */
@Scheduled(initialDelay = 1, fixedDelay = 5, timeUnit = TimeUnit.MINUTES)
public synchronized void indexRecords() {
public synchronized void indexRecords() throws InterruptedException {
// Nothing is done when the `enabled` flag is false: only "SKIP" is logged.
if (!enabled) {
log.info("SKIP");
return;
}
try {
log.info("Indexing new records...");
// Start time, used below to report the overall throughput.
final Instant start = Instant.now();
final Instant start = Instant.now();
// All pending insert/update actions, as returned by the repository.
final List<PendingAction> list = pendingActionRepository.findInsertOrUpdateOperations();
final List<PendingAction> list = pendingActionRepository.findInsertOrUpdateOperations();
// No pending action: return before creating any thread pool.
if (list.isEmpty()) { return; }
log.info("Start Indexing new records, size=" + list.size());
// A new fixed-size pool is created for every run, sized by the nThreads property.
final ExecutorService executor = Executors.newFixedThreadPool(nThreads);
// One task per chunk of at most maxActionsForThread actions.
// NOTE(review): Guava's Lists.partition rejects a size <= 0 - verify the property is always positive.
Lists.partition(list, maxActionsForThread).forEach(subList -> executor.execute(() -> indexRecords(subList)));
// Stop accepting new tasks; the tasks already submitted keep running.
executor.shutdown();
// Wait up to 20 minutes for all chunks to complete.
if (executor.awaitTermination(20, TimeUnit.MINUTES)) {
final long timeElapsed = Duration.between(start, Instant.now()).toSeconds() + 1; // I ADD 1 TO AVOID DIVISION BY 0
log.info(String.format("Indexed %s records in %d seconds (%.3f records/s)", list.size(), timeElapsed, (float) list.size() / timeElapsed));
} else {
// Timeout: only a warning is logged.
// NOTE(review): the pool is not force-stopped here (no shutdownNow), so its tasks
// presumably keep running after this method returns - confirm this is intended.
log.warn("Some threads continue to be running");
}
}
/**
 * Indexes one chunk of pending actions; this is the task body submitted to the executor by
 * the scheduled {@code indexRecords()} method.
 *
 * Each action body is parsed as a {@code ResultEntry}, converted to a {@code SolrInputDocument},
 * sent to Solr and committed; afterwards {@code updateExecutionDate} is called with the chunk
 * and the map of per-record failures. Every {@code Throwable} raised here is caught and logged,
 * so nothing is propagated to the calling thread.
 *
 * NOTE(review): this listing is a rendered diff whose +/- markers were lost; the repeated
 * statements below are the before/after variants of the same lines (mostly re-indentation).
 * Confirm against the committed file which variant is the live one.
 *
 * @param list
 *            the pending actions to index in this task
 */
private void indexRecords(final List<PendingAction> list) {
log.info("(THREAD) Start indexing...");
try {
// Start time of this chunk, used for the per-thread throughput log line.
final Instant start = Instant.now();
// Action id -> error description, for the records that could not be converted.
// The map is local to this call, so each task fills its own instance.
final Map<String, String> invalids = new HashMap<>();
if (list.size() > 0) {
final SolrIndexClient solr = solrIndexClientFactory.getClient();
final SolrIndexClient solr = solrIndexClientFactory.getClient();
// A new mapper is created on every call of this method.
final ObjectMapper objectMapper = new ObjectMapper();
final ObjectMapper objectMapper = new ObjectMapper();
// The stream is exposed as an iterator, which is evaluated lazily: the mapping below
// (and the filling of `invalids`) presumably happens while solr.addRecords consumes it.
final Iterator<SolrInputDocument> iter = list.stream()
.map(pendingAction -> {
final String id = pendingAction.getId();
final String body = pendingAction.getBody();
final Iterator<SolrInputDocument> iter = list.stream()
.map(pendingAction -> {
final String id = pendingAction.getId();
final String body = pendingAction.getBody();
try {
final ResultEntry resultEntry = objectMapper.readValue(body, ResultEntry.class);
final SolrInputDocument doc = solrRecordMapper.toSolrInputRecord(resultEntry);
return doc;
} catch (final Throwable e) {
// A record that cannot be converted is recorded in `invalids` and mapped to null,
// so it does not abort the rest of the chunk.
invalids.put(id, e.getClass().getName() + ": " + e.getMessage());
log.error(e);
return null;
}
})
// Drop the records whose conversion failed.
.filter(Objects::nonNull)
.iterator();
try {
final ResultEntry resultEntry = objectMapper.readValue(body, ResultEntry.class);
final SolrInputDocument doc = solrRecordMapper.toSolrInputRecord(resultEntry);
return doc;
} catch (final Throwable e) {
invalids.put(id, e.getClass().getName() + ": " + e.getMessage());
log.error(e);
return null;
}
})
.filter(Objects::nonNull)
.iterator();
// Send the converted documents to Solr and commit them.
solr.addRecords(iter);
solr.commit();
solr.addRecords(iter);
solr.commit();
// Called with the whole chunk plus the failures collected above.
updateExecutionDate(list, invalids);
}
updateExecutionDate(list, invalids);
final Instant finish = Instant.now();
final long timeElapsed = Duration.between(start, finish).toSeconds() + 1; // I ADD 1 TO AVOID DIVISION BY 0
final long timeElapsed = Duration.between(start, Instant.now()).toSeconds() + 1; // I ADD 1 TO AVOID DIVISION BY 0
log.info(String.format("(THREAD) Indexed %s records in %d seconds (%.3f records/s)", list.size(), timeElapsed, (float) list.size() / timeElapsed));
log.info(String.format("Indexed %s records in %d seconds (%.3f records/s)", list.size(), timeElapsed, (float) list.size() / timeElapsed));
} catch (final Throwable e) {
// Catch-all: a failure in this chunk is only logged and not rethrown.
log.error("The scheduled task is failed", e);
log.error("(THREAD) Error indexing records", e);
}
}
@Scheduled(initialDelay = 10, fixedDelay = 30, timeUnit = TimeUnit.MINUTES)

View File

@ -9,6 +9,9 @@ dnet.directindex.scheduling.enabled = true
dnet.directindex.legacy.enabled = true
dnet.directindex.sword.enabled = false
# Size of the fixed thread pool that ScheduledActions.indexRecords() creates for each indexing run
dnet.directindex.scheduling.nThreads = 10
# Maximum number of pending actions in each chunk submitted to that pool as a single task
dnet.directindex.scheduling.maxActionsForThread = 100
dnet.directindex.solr.urls = http://localhost:8981/solr,http://localhost:8982/solr,http://localhost:8983/solr
dnet.directindex.solr.collection = DMF-index-openaire
dnet.directindex.solr.client = LBHTTP2

View File

@ -20,7 +20,7 @@ import eu.dnetlib.app.directindex.tasks.ScheduledActions;
@Disabled
public class LegacyApiControllerTest {
private static final int MAX_RESULTS = 1000;
private static final int MAX_RESULTS = 5423;
@Autowired
private LegacyApiController controller;