From 12756f9d41f6b12ba2afdd6439d2776016cedae7 Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Wed, 13 May 2020 16:11:40 +0200 Subject: [PATCH] multithread (4 threads) test to feed elastic search --- .../doiboost/orcid/ElasticSearchTest.java | 99 +++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/ElasticSearchTest.java b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/ElasticSearchTest.java index 0908299b0..b4b2c7844 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/ElasticSearchTest.java +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/ElasticSearchTest.java @@ -11,6 +11,18 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.junit.jupiter.api.Test; +import java.net.ConnectException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + public class ElasticSearchTest { private static final String BASE_CFG_URL = "http://%s:9200/%s/%s/%s?pretty"; @@ -21,6 +33,11 @@ public class ElasticSearchTest { private String record; private int readTimeout = 30000; + private int nThreads = 4; + private int nTasks = 150; + private ExecutorService executorService = Executors.newFixedThreadPool(nThreads); + private List> resList = Lists.newArrayList(); + public void setup() { indexHost = "ip-90-147-167-25.ct1.garrservices.it"; indexName = "orcid_update"; @@ -83,4 +100,86 @@ public class ElasticSearchTest { } } } + +// @Test + public void testMultiThreadFeed() throws Exception { + setup(); + int countAll = 0; + int countOk = 0; + int partial = 0; + String recordTemplate = "{\n" + + " \"timestamp\": 1540825815212,\n" + + " \"pid\": \"%s\",\n" + + " \"blob\": \"\"\n" + + " }"; + Map errors = Maps.newHashMap(); + PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); + cm.setMaxTotal(nThreads); + CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build(); + for (int i = 40000; i < 41000; i++) { + String orcidId = String.format("%s", i); + String record = String.format(recordTemplate, orcidId); + countAll++; + if(partial == nTasks) { + System.out.println("Waiting for tasks to complete before resubmitting to executor (countAll = "+countAll+") . . . "); + System.out.println("Getting replies"); + long startWait = System.currentTimeMillis(); + for(Future res : resList){ + if(res.get() == 200) countOk++; + } + resList.clear(); + partial = 0; + System.out.println(". . . Ready to submit again after "+(System.currentTimeMillis() - startWait)+" ms" ); + } + partial++; + Future res = executorService.submit( () -> { + CloseableHttpResponse responsePPOST = null; + try { + + String url = String.format(BASE_CFG_URL, indexHost, indexName, indexType, orcidId); + HttpPost post = new HttpPost(url); + post.setHeader("Accept", "application/json"); + post.setHeader("Content-type", "application/json"); + StringEntity entity = new StringEntity(record); + post.setEntity(entity); + responsePPOST = client.execute(post); + int statusCode = responsePPOST.getStatusLine().getStatusCode(); + switch (statusCode) { + case 200: + case 201: + return statusCode; + default: + System.out.println(responsePPOST.getStatusLine().getStatusCode() + ": " + responsePPOST.getStatusLine().getReasonPhrase()); + System.out.println("Source record causing error: " + record); + errors.merge(statusCode, 1, Integer::sum); + return statusCode; + } + } catch (ConnectException ce) { + throw ce; + } + catch (IOException e) { + e.printStackTrace(); + errors.merge(-1, 1, Integer::sum); + } + finally{ + if(responsePPOST != null) responsePPOST.close(); + } + return -1; + }); + resList.add(res); + } + executorService.shutdown(); + + //now let's wait for the results. We can block ourselves here: we have nothing else to do + System.out.println("Waiting for responses"); + for(Future res : resList){ + if(res.get() == 200) countOk++; + } + client.close(); + cm.shutdown(); + + System.out.println("countOk: "+countOk); + System.out.println("countAll: "+countAll); + System.out.println("errors count: "+errors.size()); + } }