From b8d0e3f74149cf968f43302fc71660f7afbd1ab5 Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Thu, 7 Jan 2021 23:40:47 +0100 Subject: [PATCH] enrichment queries set is now splitted on workflow node side, because of timeout error on very large collection --- .../nodes/EnrichGraphDBContentJobNode.java | 70 ++++++++----------- 1 file changed, 30 insertions(+), 40 deletions(-) diff --git a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/EnrichGraphDBContentJobNode.java b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/EnrichGraphDBContentJobNode.java index 4f6feba..8cfebdd 100644 --- a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/EnrichGraphDBContentJobNode.java +++ b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/EnrichGraphDBContentJobNode.java @@ -1,5 +1,6 @@ package eu.dnetlib.ariadneplus.workflows.nodes; +import com.google.common.base.Splitter; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; @@ -12,6 +13,7 @@ import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider; import eu.dnetlib.msro.workflows.util.WorkflowsConstants; import eu.dnetlib.rmi.common.ResultSet; import eu.dnetlib.rmi.manager.MSROException; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.http.NameValuePair; @@ -44,59 +46,47 @@ public class EnrichGraphDBContentJobNode extends AsyncJobNode { private String datasource; //for parallel requests to the publisher endpoint - private int nThreads = 5; + private int nThreads = 1; @Override protected String execute(final Env env) throws Exception { int statusCode = -1; - String enrichResult = "noResult"; - log.info("Publisher endpoint: " + getPublisherEndpoint()); - log.info("Enrich Query Value: " + getSparqlUpdateQuery()); + String enrichResult = ""; PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); cm.setMaxTotal(nThreads); CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build(); - log.info("Enrich endpoint: " + getEnrichEndpoint()); CloseableHttpResponse responsePOST = null; - try { - HttpPost post = new HttpPost(getEnrichEndpoint()); - List params = Lists.newArrayList(); - String datasourceInterfaceValue = getDatasourceInterface(); - StringEntity entity = new StringEntity(getSparqlUpdateQuery()); - post.setEntity(entity); - responsePOST = client.execute(post); - statusCode = responsePOST.getStatusLine().getStatusCode(); - switch (statusCode) { - case 200: - log.info("enrich graphDB content completed"); - break; - default: - log.error("error enriching graphDB " + responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase()); - break; + String queryValue = getSparqlUpdateQuery(); + int countQueries = 0; + int countSuccess = 0; + String endpoint = getEnrichEndpoint(); + for(String query : Splitter.on(";").split(queryValue)){ + if (StringUtils.isNoneBlank(query)) { + countQueries++; + HttpPost post = new HttpPost(endpoint); + StringEntity entity = new StringEntity(query); + post.setEntity(entity); + responsePOST = client.execute(post); + statusCode = responsePOST.getStatusLine().getStatusCode(); + switch (statusCode) { + case 200: + log.info(String.format("Query %d executed: %s", countQueries, query)); + break; + default: + log.error("error enriching graphDB " + responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase()); + throw new MSROException(String.format("Cannot execute sparql from %s", query)); + } + countSuccess++; } - } catch (ConnectException ce) { - log.error(ce); - throw new MSROException("Unable to connect to Publisher endpoint" + getEnrichEndpoint()); } - catch (IOException e) { - log.error(e); - throw new MSROException("IO Error" + getEnrichEndpoint()); - } - finally{ - if(responsePOST != null) responsePOST.close(); - client.close(); - cm.shutdown(); - } - - env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "statusCode", Integer.toString(statusCode)); - env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "enrichResult", enrichResult); - + enrichResult = String.format("Queries committed with success %d/%d", countSuccess, countQueries); log.info(enrichResult); - if (statusCode!=200) { - throw new MSROException("Error from Publisher endpoint [ status code: " + statusCode + " ]"); - } - + if(responsePOST != null) responsePOST.close(); + client.close(); + cm.shutdown(); + env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "enrichResult", enrichResult); return Arc.DEFAULT_ARC; }