diff --git a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/PublishGraphDBJobNode.java b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/PublishGraphDBJobNode.java
index b7197c8..19af614 100644
--- a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/PublishGraphDBJobNode.java
+++ b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/PublishGraphDBJobNode.java
@@ -14,8 +14,11 @@ import org.apache.http.HttpResponse;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.message.BasicNameValuePair;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -46,6 +49,7 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
 
 	//for parallel requests to the publisher endpoint
 	private int nThreads = 5;
+	private int nTasks = 150;
 	private ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
 	private List<Future<Integer>> resList = Lists.newArrayList();
 
@@ -57,19 +61,39 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
 
 		int countAll = 0;
 		int countOk = 0;
+		int partial = 0;
 		Map<Integer, Integer> errors = Maps.newHashMap();
-		log.info("Publish endpoint: " + getPublishEndpoint());
+		log.info("Publisher endpoint: " + getPublisherEndpoint());
+		PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+		cm.setMaxTotal(nThreads);
+
+		CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build();
+		//need to slow down the producer to avoid OOM errors due to many tasks in the queue of the executor
+		//see for example here: https://stackoverflow.com/questions/42108351/executorservice-giving-out-of-memory-error
+		//let's stop and wait after submission of nTasks tasks
 		for (String record : getResultSetClient().iter(rsIn, String.class)) {
 			countAll++;
+			if(partial == nTasks) {
+				log.debug("Waiting for tasks to complete before resubmitting to executor (countAll = "+countAll+") . . . ");
+				log.debug("Getting replies");
+				long startWait = System.currentTimeMillis();
+				for(Future<Integer> res : resList){
+					if(res.get() == 200) countOk++;
+				}
+				resList.clear();
+				partial = 0;
+				log.debug(". . . Ready to submit again after "+(System.currentTimeMillis() - startWait)+" ms" );
+			}
+			partial++;
 			Future<Integer> res = executorService.submit( () -> {
+				CloseableHttpResponse responsePOST = null;
 				try {
 					HttpPost post = new HttpPost(getPublishEndpoint());
 					List<NameValuePair> params = Lists.newArrayList();
 					params.add(new BasicNameValuePair("record", record));
 					UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8");
 					post.setEntity(ent);
-					HttpClient client = HttpClients.createDefault();
-					HttpResponse responsePOST = client.execute(post);
+					responsePOST = client.execute(post);
 					int statusCode = responsePOST.getStatusLine().getStatusCode();
 					switch (statusCode) {
 					case 200:
@@ -87,6 +111,9 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
 					e.printStackTrace();
 					errors.merge(-1, 1, Integer::sum);
 				}
+				finally{
+					if(responsePOST != null) responsePOST.close();
+				}
 				return -1;
 			});
 			resList.add(res);
@@ -98,6 +125,7 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
 		for(Future<Integer> res : resList){
 			if(res.get() == 200) countOk++;
 		}
+		client.close();
 		log.info(String.format("Got all responses. Ok responses: %s/%s", countOk, countAll));
 
 		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countOk", countOk);
@@ -108,14 +136,14 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
 		if (!errors.isEmpty()) {
 			log.warn("Problems in publishing: "+countOk+"/"+countAll+" see error maps for details");
 		}
-		if(countAll == 0) log.warn("0 resources to publish");
-
-		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countOk", countOk);
-		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countAll", countAll);
-		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "errorsMap", new Gson().toJson(errors));
+		if(countAll == 0) {
+			log.warn("0 resources to publish");
+			cm.shutdown();
+		}
 
 		if (countOk > 0) {
 			log.info("Feed provenance endpoint: " + getProvenanceFeedEndpoint());
+			CloseableHttpResponse responsePOST = null;
 			try {
 				HttpPost post = new HttpPost(getProvenanceFeedEndpoint());
 				List<NameValuePair> params = Lists.newArrayList();
@@ -124,8 +152,7 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
 				params.add(new BasicNameValuePair("datasourceApi", datasourceInterfaceValue));
 				UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8");
 				post.setEntity(ent);
-				HttpClient client = HttpClients.createDefault();
-				HttpResponse responsePOST = client.execute(post);
+				responsePOST = client.execute(post);
 				int statusCode = responsePOST.getStatusLine().getStatusCode();
 				switch (statusCode) {
 				case 200:
@@ -141,8 +168,12 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
 			catch (IOException e) {
 				log.error("error feeding provenance ", e);
 			}
+			finally{
+				if(responsePOST != null) responsePOST.close();
+				cm.shutdown();
+			}
 		}
-
+
 		return Arc.DEFAULT_ARC;
 	}
 
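
Note on the throttling change: the patch bounds the executor's queue by draining resList after every nTasks (150) submissions, blocking the producer until the in-flight batch completes. A minimal, self-contained sketch of the same pattern, with a fake post() helper standing in for the real HTTP call (N_THREADS, N_TASKS, and post() are illustrative names, not from the patch):

// Batched submission: block the producer and drain pending futures every
// N_TASKS submissions so the executor queue stays bounded.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchedSubmitSketch {

	private static final int N_THREADS = 5;
	private static final int N_TASKS = 150;

	// stand-in for the HTTP POST of one record; always "succeeds" here
	private static Integer post(int record) {
		return 200;
	}

	public static void main(String[] args) throws Exception {
		ExecutorService executor = Executors.newFixedThreadPool(N_THREADS);
		List<Future<Integer>> pending = new ArrayList<>();
		int ok = 0;
		for (int i = 0; i < 1000; i++) {
			if (pending.size() == N_TASKS) {
				for (Future<Integer> f : pending) {   // wait for the whole batch
					if (f.get() == 200) ok++;
				}
				pending.clear();                      // at most N_TASKS futures are ever queued
			}
			final int record = i;
			pending.add(executor.submit(() -> post(record)));
		}
		for (Future<Integer> f : pending) {           // drain the final partial batch
			if (f.get() == 200) ok++;
		}
		executor.shutdown();
		System.out.println("ok = " + ok + "/1000");
	}
}

Blocking the producer at batch boundaries trades a little throughput for a hard bound of N_TASKS pending futures, which is what prevents the OOM described in the Stack Overflow question linked in the patch.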
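Note on the connection-pool change: instead of building a fresh client per task with HttpClients.createDefault(), the patch shares one CloseableHttpClient backed by a PoolingHttpClientConnectionManager capped at nThreads connections, and closes each response in a finally block. A minimal sketch of that setup, assuming Apache HttpClient 4.x; the URL is a placeholder, and the setDefaultMaxPerRoute call is an assumption not present in the patch (the pool's per-route default is 2, which would otherwise cap concurrency against a single endpoint):

// Shared pooled client (Apache HttpClient 4.x). URL and limits are placeholders.
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

public class PooledClientSketch {
	public static void main(String[] args) throws Exception {
		PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
		cm.setMaxTotal(5);            // total connections across all routes
		cm.setDefaultMaxPerRoute(5);  // default is 2; raised here for a single-endpoint workload
		CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build();

		CloseableHttpResponse response = null;
		try {
			response = client.execute(new HttpPost("http://localhost:8080/publish"));
			System.out.println(response.getStatusLine().getStatusCode());
		} finally {
			if (response != null) response.close(); // returns the leased connection to the pool
			client.close();                         // with a non-shared manager (the default), this also shuts the pool down
		}
	}
}

Closing the response, not the client, is what returns a leased connection to the pool; that is why the per-request finally blocks in the patch close responsePOST while the single client.close()/cm.shutdown() happens once at the end.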