package eu.dnetlib.ariadneplus.workflows.nodes; import java.io.IOException; import java.net.ConnectException; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.http.HttpResponse; import org.apache.http.NameValuePair; import org.apache.http.client.HttpClient; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.HttpPost; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicNameValuePair; import org.springframework.beans.factory.annotation.Autowired; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; import eu.dnetlib.enabling.resultset.client.ResultSetClient; import eu.dnetlib.msro.workflows.graph.Arc; import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; import eu.dnetlib.msro.workflows.procs.Env; import eu.dnetlib.msro.workflows.util.WorkflowsConstants; import eu.dnetlib.rmi.common.ResultSet; import eu.dnetlib.rmi.manager.MSROException; public class PublishGraphDBJobNode extends AsyncJobNode { private static final Log log = LogFactory.getLog(PublishGraphDBJobNode.class); private String eprParam; @Autowired private ResultSetClient resultSetClient; private String publisherEndpoint; private String datasourceInterface; //for parallel requests to the publisher endpoint private int nThreads = 5; private ExecutorService executorService = Executors.newFixedThreadPool(nThreads); private List> resList = Lists.newArrayList(); @Override protected String execute(final Env env) throws Exception { final ResultSet rsIn = env.getAttribute(getEprParam(), ResultSet.class); if ((rsIn == null)) { throw new MSROException("EprParam (" + getEprParam() + ") not found in ENV"); } int countAll = 0; int countOk = 0; Map errors = Maps.newHashMap(); log.info("Publish endpoint: " + getPublishEndpoint()); for (String record : getResultSetClient().iter(rsIn, String.class)) { countAll++; Future res = executorService.submit( () -> { try { HttpPost post = new HttpPost(getPublishEndpoint()); List params = Lists.newArrayList(); params.add(new BasicNameValuePair("record", record)); UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8"); post.setEntity(ent); HttpClient client = HttpClients.createDefault(); HttpResponse responsePOST = client.execute(post); int statusCode = responsePOST.getStatusLine().getStatusCode(); switch (statusCode) { case 200: return statusCode; default: log.error(responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase()); log.error("Source record causing error: " + record); errors.merge(statusCode, 1, Integer::sum); return statusCode; } } catch (ConnectException ce) { throw new MSROException("unable to connect to Publisher endpoint" + getPublishEndpoint()); } catch (IOException e) { e.printStackTrace(); errors.merge(-1, 1, Integer::sum); } return -1; }); resList.add(res); } executorService.shutdown(); //now let's wait for the results. We can block ourselves here: we have nothing else to do log.info("Waiting for responses"); for(Future res : resList){ if(res.get() == 200) countOk++; } log.info(String.format("Got all responses. Ok responses: %s/%s", countOk, countAll)); env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countOk", countOk); env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countAll", countAll); env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "errorsMap", new Gson().toJson(errors)); log.info("publishing completed"); if (!errors.isEmpty()) { log.warn("Problems in publishing: "+countOk+"/"+countAll+" see error maps for details"); } if(countAll == 0) log.warn("0 resources to publish"); env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countOk", countOk); env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countAll", countAll); env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "errorsMap", new Gson().toJson(errors)); if (countOk > 0) { log.info("Feed provenance endpoint: " + getProvenanceFeedEndpoint()); try { HttpPost post = new HttpPost(getProvenanceFeedEndpoint()); List params = Lists.newArrayList(); String datasourceInterfaceValue = getDatasourceInterface(); log.info("feeding provenance for datasourceInterface " + datasourceInterfaceValue); params.add(new BasicNameValuePair("datasourceApi", datasourceInterfaceValue)); UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8"); post.setEntity(ent); HttpClient client = HttpClients.createDefault(); HttpResponse responsePOST = client.execute(post); int statusCode = responsePOST.getStatusLine().getStatusCode(); switch (statusCode) { case 200: log.info("feed provenance completed"); break; default: log.error("error feeding provenance " + responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase()); break; } } catch (ConnectException ce) { throw new MSROException("unable to connect to Publisher endpoint" + getPublishEndpoint()); } catch (IOException e) { log.error("error feeding provenance ", e); } } return Arc.DEFAULT_ARC; } public String getPublisherEndpoint() { return publisherEndpoint; } private String getPublishEndpoint() { return publisherEndpoint.concat("/publish"); } private String getProvenanceFeedEndpoint() { return publisherEndpoint.concat("/feedProvenance"); } public void setPublisherEndpoint(final String publisherEndpoint) { this.publisherEndpoint = publisherEndpoint; } public ResultSetClient getResultSetClient() { return resultSetClient; } public void setResultSetClient(final ResultSetClient resultSetClient) { this.resultSetClient = resultSetClient; } public String getEprParam() { return eprParam; } public void setEprParam(String eprParam) { this.eprParam = eprParam; } public String getDatasourceInterface() { return datasourceInterface; } public void setDatasourceInterface(String datasourceInterface) { this.datasourceInterface = datasourceInterface; } }