package eu.dnetlib.ariadneplus.workflows.nodes;

import java.io.IOException;
import java.net.ConnectException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.springframework.beans.factory.annotation.Autowired;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;

import eu.dnetlib.enabling.resultset.client.ResultSetClient;
import eu.dnetlib.msro.workflows.graph.Arc;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
import eu.dnetlib.msro.workflows.procs.Env;
import eu.dnetlib.msro.workflows.procs.Token;
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
import eu.dnetlib.rmi.common.ResultSet;
import eu.dnetlib.rmi.manager.MSROException;

/**
 * Workflow node that streams records from a result set and POSTs each of them to the
 * GraphDB publisher endpoint, in parallel, then (on at least one success) notifies the
 * publisher so it can record provenance for the configured datasource/interface.
 *
 * <p>Flow: drop the existing datasourceApis partition info, publish every record via a
 * fixed-size thread pool (throttled in batches of {@code nTasks} to keep the executor
 * queue bounded), collect per-status error counts into the workflow ENV, then feed
 * provenance. Any 4xx reply from the publisher aborts the whole job.
 */
public class PublishGraphDBJobNode extends AsyncJobNode {

    private static final Log log = LogFactory.getLog(PublishGraphDBJobNode.class);

    /** ENV attribute name holding the input {@link ResultSet}. */
    private String eprParam;

    @Autowired
    private ResultSetClient resultSetClient;

    private String publisherEndpoint;
    private String datasourceInterface;
    private String datasource;

    // for parallel requests to the publisher endpoint
    private int nThreads = 5;
    // how many tasks are submitted before the producer stops and drains the futures;
    // this bounds the executor queue and avoids OOM errors, see
    // https://stackoverflow.com/questions/42108351/executorservice-giving-out-of-memory-error
    private int nTasks = 150;

    private ExecutorService executorService = Executors.newFixedThreadPool(nThreads);

    /** Futures of publish tasks submitted but not yet drained. */
    private List<Future<Integer>> resList = Lists.newArrayList();

    @Override
    protected String execute(final Env env) throws Exception {
        final ResultSet rsIn = env.getAttribute(getEprParam(), ResultSet.class);
        if (rsIn == null) {
            throw new MSROException("EprParam (" + getEprParam() + ") not found in ENV");
        }

        int countAll = 0;
        int countOk = 0;
        int partial = 0;
        // status code -> occurrences; concurrent because publish tasks update it from pool threads
        final Map<Integer, Integer> errors = new ConcurrentHashMap<>();

        log.info("Publisher endpoint: " + getPublisherEndpoint());
        final PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        cm.setMaxTotal(nThreads);
        final CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build();
        try {
            dropDatasourceApisPartitionInfo(client);

            for (final String record : getResultSetClient().iter(rsIn, String.class)) {
                countAll++;
                if (partial == nTasks) {
                    // throttle the producer: wait for the outstanding batch before resubmitting
                    log.debug("Waiting for tasks to complete before resubmitting to executor (countAll = " + countAll + ") . . . ");
                    log.debug("Getting replies");
                    final long startWait = System.currentTimeMillis();
                    countOk += drainBatch(true);
                    partial = 0;
                    log.debug(". . . Ready to submit again after " + (System.currentTimeMillis() - startWait) + " ms");
                }
                partial++;
                resList.add(executorService.submit(() -> publishRecord(record, client, errors)));
            }

            executorService.shutdown();
            // now let's wait for the results; we can block here, there is nothing else to do
            log.info("Waiting for responses");
            countOk += drainBatch(false);
            log.info(String.format("Got all responses. Ok responses: %s/%s", countOk, countAll));

            env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countOk", countOk);
            env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countAll", countAll);
            env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "errorsMap", new Gson().toJson(errors));
            log.info("publishing completed");
            if (!errors.isEmpty()) {
                log.warn("Problems in publishing: " + countOk + "/" + countAll + " see error maps for details");
            }
            if (countAll == 0) {
                log.warn("0 resources to publish");
            }

            if (countOk > 0) {
                feedProvenance(client);
            }
        } finally {
            // always release the HTTP client and its connection pool, even when nothing was published
            client.close();
            cm.shutdown();
        }
        return Arc.DEFAULT_ARC;
    }

    /**
     * Waits for every outstanding publish future and clears the batch.
     *
     * @param failOnClientError when true, a 4xx status aborts the job with an {@link MSROException}
     * @return the number of 200 responses in the batch
     */
    private int drainBatch(final boolean failOnClientError) throws Exception {
        int ok = 0;
        for (final Future<Integer> res : resList) {
            final int status = res.get();
            if (status == 200) {
                ok++;
            } else if (failOnClientError && status >= 400 && status <= 404) {
                executorService.shutdownNow();
                throw new MSROException("Client error " + status);
            }
        }
        resList.clear();
        return ok;
    }

    /**
     * POSTs one record to the publish endpoint.
     *
     * @return the HTTP status code, or -1 on I/O failure; non-200 statuses are counted in {@code errors}
     * @throws MSROException when the publisher endpoint is unreachable
     */
    private int publishRecord(final String record, final CloseableHttpClient client, final Map<Integer, Integer> errors) throws Exception {
        CloseableHttpResponse response = null;
        try {
            final HttpPost post = new HttpPost(getPublishEndpoint());
            final List<NameValuePair> params = Lists.newArrayList();
            params.add(new BasicNameValuePair("record", record));
            post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
            response = client.execute(post);
            final int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode != 200) {
                log.error(statusCode + ": " + response.getStatusLine().getReasonPhrase());
                log.error("Source record causing error: " + record);
                errors.merge(statusCode, 1, Integer::sum);
            }
            return statusCode;
        } catch (ConnectException ce) {
            throw new MSROException("unable to connect to Publisher endpoint " + getPublishEndpoint());
        } catch (IOException e) {
            log.error("error publishing record ", e);
            errors.merge(-1, 1, Integer::sum);
            return -1;
        } finally {
            if (response != null) {
                response.close();
            }
        }
    }

    /** Asks the publisher to drop the datasourceApis partition info for the configured interface. */
    private void dropDatasourceApisPartitionInfo(final CloseableHttpClient client) throws Exception {
        log.info("DropDatasourceApisPartitionInfo endpoint: " + getDropDatasourceApisPartitionInfoEndpoint());
        final HttpPost post = new HttpPost(getDropDatasourceApisPartitionInfoEndpoint());
        final List<NameValuePair> params = Lists.newArrayList();
        final String datasourceInterfaceValue = getDatasourceInterface();
        log.info("drop datasourceApis partition info for datasourceInterface " + datasourceInterfaceValue);
        params.add(new BasicNameValuePair("datasourceApi", datasourceInterfaceValue));
        post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
        try (CloseableHttpResponse response = client.execute(post)) {
            final int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode == 200) {
                log.info("drop datasourceApis partition info completed");
            } else {
                log.error("error dropping datasourceApis partition info " + statusCode + ": "
                        + response.getStatusLine().getReasonPhrase());
            }
        } catch (ConnectException ce) {
            throw new MSROException("unable to connect to Publisher endpoint " + getDropDatasourceApisPartitionInfoEndpoint());
        } catch (IOException e) {
            log.error("error dropping datasourceApis partition info ", e);
        }
    }

    /** Notifies the publisher that records were published, so it records provenance for the datasource. */
    private void feedProvenance(final CloseableHttpClient client) throws Exception {
        log.info("Feed provenance endpoint: " + getProvenanceFeedEndpoint());
        final HttpPost post = new HttpPost(getProvenanceFeedEndpoint());
        final List<NameValuePair> params = Lists.newArrayList();
        final String datasourceInterfaceValue = getDatasourceInterface();
        log.info("feeding provenance for datasourceInterface " + datasourceInterfaceValue);
        params.add(new BasicNameValuePair("datasourceApi", datasourceInterfaceValue));
        final String datasourceValue = getDatasource();
        log.info("feeding provenance for datasource " + datasourceValue);
        params.add(new BasicNameValuePair("datasource", datasourceValue));
        post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
        try (CloseableHttpResponse response = client.execute(post)) {
            final int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode == 200) {
                log.info("feed provenance completed");
            } else {
                log.error("error feeding provenance " + statusCode + ": "
                        + response.getStatusLine().getReasonPhrase());
            }
        } catch (ConnectException ce) {
            throw new MSROException("unable to connect to Publisher endpoint " + getProvenanceFeedEndpoint());
        } catch (IOException e) {
            log.error("error feeding provenance ", e);
        }
    }

    public String getPublisherEndpoint() {
        return publisherEndpoint;
    }

    private String getPublishEndpoint() {
        return publisherEndpoint.concat("/publish");
    }

    private String getProvenanceFeedEndpoint() {
        return publisherEndpoint.concat("/feedProvenance");
    }

    private String getDropDatasourceApisPartitionInfoEndpoint() {
        return publisherEndpoint.concat("/dropDatasourceApiGraph");
    }

    public void setPublisherEndpoint(final String publisherEndpoint) {
        this.publisherEndpoint = publisherEndpoint;
    }

    public ResultSetClient getResultSetClient() {
        return resultSetClient;
    }

    public void setResultSetClient(final ResultSetClient resultSetClient) {
        this.resultSetClient = resultSetClient;
    }

    public String getEprParam() {
        return eprParam;
    }

    public void setEprParam(String eprParam) {
        this.eprParam = eprParam;
    }

    public String getDatasourceInterface() {
        return datasourceInterface;
    }

    public void setDatasourceInterface(String datasourceInterface) {
        this.datasourceInterface = datasourceInterface;
    }

    @Override
    protected void beforeStart(Token token) {
        token.setProgressProvider(new ResultsetProgressProvider(token.getEnv().getAttribute(getEprParam(), ResultSet.class), this.resultSetClient));
    }

    public String getDatasource() {
        return datasource;
    }

    public void setDatasource(String datasource) {
        this.datasource = datasource;
    }
}