From 3ca7f7d00bf8d024bc99f0679308024ba436683f Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 14 Jan 2025 11:48:13 +0100 Subject: [PATCH] added parallelism for indexing --- .../directindex/tasks/ScheduledActions.java | 92 ++++++++++++------- src/main/resources/application.properties | 3 + .../controllers/LegacyApiControllerTest.java | 2 +- 3 files changed, 63 insertions(+), 34 deletions(-) diff --git a/src/main/java/eu/dnetlib/app/directindex/tasks/ScheduledActions.java b/src/main/java/eu/dnetlib/app/directindex/tasks/ScheduledActions.java index a80869d..c4258ab 100644 --- a/src/main/java/eu/dnetlib/app/directindex/tasks/ScheduledActions.java +++ b/src/main/java/eu/dnetlib/app/directindex/tasks/ScheduledActions.java @@ -8,6 +8,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -19,6 +21,7 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import eu.dnetlib.app.directindex.input.ResultEntry; import eu.dnetlib.app.directindex.mapping.SolrRecordMapper; @@ -44,58 +47,81 @@ public class ScheduledActions { @Autowired private PendingActionRepository pendingActionRepository; + @Value("${dnet.directindex.scheduling.nThreads}") + private int nThreads; + + @Value("${dnet.directindex.scheduling.maxActionsForThread}") + private int maxActionsForThread; + @Scheduled(initialDelay = 1, fixedDelay = 5, timeUnit = TimeUnit.MINUTES) - public synchronized void indexRecords() { + public synchronized void indexRecords() throws InterruptedException { if (!enabled) { log.info("SKIP"); return; } - try { - log.info("Indexing new records..."); - final Instant start = Instant.now(); + final Instant start = Instant.now(); - final List list = pendingActionRepository.findInsertOrUpdateOperations(); + final List list = pendingActionRepository.findInsertOrUpdateOperations(); + + if (list.isEmpty()) { return; } + + log.info("Start Indexing new records, size=" + list.size()); + + final ExecutorService executor = Executors.newFixedThreadPool(nThreads); + Lists.partition(list, maxActionsForThread).forEach(subList -> executor.execute(() -> indexRecords(subList))); + executor.shutdown(); + + if (executor.awaitTermination(20, TimeUnit.MINUTES)) { + final long timeElapsed = Duration.between(start, Instant.now()).toSeconds() + 1; // I ADD 1 TO AVOID DIVISION BY 0 + log.info(String.format("Indexed %s records in %d seconds (%.3f records/s)", list.size(), timeElapsed, (float) list.size() / timeElapsed)); + } else { + log.warn("Some threads continue to be running"); + } + } + + private void indexRecords(final List list) { + + log.info("(THREAD) Start indexing..."); + try { + final Instant start = Instant.now(); final Map invalids = new HashMap<>(); - if (list.size() > 0) { - final SolrIndexClient solr = solrIndexClientFactory.getClient(); + final SolrIndexClient solr = solrIndexClientFactory.getClient(); - final ObjectMapper objectMapper = new ObjectMapper(); + final ObjectMapper objectMapper = new ObjectMapper(); - final Iterator iter = list.stream() - .map(pendingAction -> { - final String id = pendingAction.getId(); - final String body = pendingAction.getBody(); + final Iterator iter = list.stream() + .map(pendingAction -> { + final String id = pendingAction.getId(); + final String body = pendingAction.getBody(); - try { - final ResultEntry resultEntry = objectMapper.readValue(body, ResultEntry.class); - final SolrInputDocument doc = solrRecordMapper.toSolrInputRecord(resultEntry); - return doc; - } catch (final Throwable e) { - invalids.put(id, e.getClass().getName() + ": " + e.getMessage()); - log.error(e); - return null; - } - }) - .filter(Objects::nonNull) - .iterator(); + try { + final ResultEntry resultEntry = objectMapper.readValue(body, ResultEntry.class); + final SolrInputDocument doc = solrRecordMapper.toSolrInputRecord(resultEntry); + return doc; + } catch (final Throwable e) { + invalids.put(id, e.getClass().getName() + ": " + e.getMessage()); + log.error(e); + return null; + } + }) + .filter(Objects::nonNull) + .iterator(); - solr.addRecords(iter); - solr.commit(); + solr.addRecords(iter); + solr.commit(); - updateExecutionDate(list, invalids); - } + updateExecutionDate(list, invalids); - final Instant finish = Instant.now(); - final long timeElapsed = Duration.between(start, finish).toSeconds() + 1; // I ADD 1 TO AVOID DIVISION BY 0 + final long timeElapsed = Duration.between(start, Instant.now()).toSeconds() + 1; // I ADD 1 TO AVOID DIVISION BY 0 + + log.info(String.format("(THREAD) Indexed %s records in %d seconds (%.3f records/s)", list.size(), timeElapsed, (float) list.size() / timeElapsed)); - log.info(String.format("Indexed %s records in %d seconds (%.3f records/s)", list.size(), timeElapsed, (float) list.size() / timeElapsed)); } catch (final Throwable e) { - log.error("The scheduled task is failed", e); + log.error("(THREAD) Error indexing records", e); } - } @Scheduled(initialDelay = 10, fixedDelay = 30, timeUnit = TimeUnit.MINUTES) diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 745563d..efd1618 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -9,6 +9,9 @@ dnet.directindex.scheduling.enabled = true dnet.directindex.legacy.enabled = true dnet.directindex.sword.enabled = false +dnet.directindex.scheduling.nThreads = 10 +dnet.directindex.scheduling.maxActionsForThread = 100 + dnet.directindex.solr.urls = http://localhost:8981/solr,http://localhost:8982/solr,http://localhost:8983/solr dnet.directindex.solr.collection = DMF-index-openaire dnet.directindex.solr.client = LBHTTP2 diff --git a/src/test/java/eu/dnetlib/app/directindex/controllers/LegacyApiControllerTest.java b/src/test/java/eu/dnetlib/app/directindex/controllers/LegacyApiControllerTest.java index 71aa9f1..1fe615f 100644 --- a/src/test/java/eu/dnetlib/app/directindex/controllers/LegacyApiControllerTest.java +++ b/src/test/java/eu/dnetlib/app/directindex/controllers/LegacyApiControllerTest.java @@ -20,7 +20,7 @@ import eu.dnetlib.app.directindex.tasks.ScheduledActions; @Disabled public class LegacyApiControllerTest { - private static final int MAX_RESULTS = 1000; + private static final int MAX_RESULTS = 5423; @Autowired private LegacyApiController controller;