diff --git a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/PublishGraphDBJobNode.java b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/PublishGraphDBJobNode.java index 4c7f1b8..b7197c8 100644 --- a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/PublishGraphDBJobNode.java +++ b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/PublishGraphDBJobNode.java @@ -8,16 +8,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.gson.Gson; -import eu.dnetlib.enabling.resultset.client.ResultSetClient; -import eu.dnetlib.msro.workflows.graph.Arc; -import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; -import eu.dnetlib.msro.workflows.procs.Env; -import eu.dnetlib.msro.workflows.util.WorkflowsConstants; -import eu.dnetlib.rmi.common.ResultSet; -import eu.dnetlib.rmi.manager.MSROException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.http.HttpResponse; @@ -29,6 +19,18 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicNameValuePair; import org.springframework.beans.factory.annotation.Autowired; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.Gson; + +import eu.dnetlib.enabling.resultset.client.ResultSetClient; +import eu.dnetlib.msro.workflows.graph.Arc; +import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; +import eu.dnetlib.msro.workflows.procs.Env; +import eu.dnetlib.msro.workflows.util.WorkflowsConstants; +import eu.dnetlib.rmi.common.ResultSet; +import eu.dnetlib.rmi.manager.MSROException; + public class PublishGraphDBJobNode extends AsyncJobNode { @@ -40,6 +42,7 @@ public class PublishGraphDBJobNode extends AsyncJobNode { private ResultSetClient resultSetClient; private String publisherEndpoint; + private String datasourceInterface; //for parallel requests to the publisher endpoint private int nThreads = 5; @@ -55,13 +58,12 @@ public class PublishGraphDBJobNode extends AsyncJobNode { int countAll = 0; int countOk = 0; Map errors = Maps.newHashMap(); - log.info("Publisher endpoint: " + getPublisherEndpoint()); - + log.info("Publish endpoint: " + getPublishEndpoint()); for (String record : getResultSetClient().iter(rsIn, String.class)) { countAll++; Future res = executorService.submit( () -> { try { - HttpPost post = new HttpPost(getPublisherEndpoint()); + HttpPost post = new HttpPost(getPublishEndpoint()); List params = Lists.newArrayList(); params.add(new BasicNameValuePair("record", record)); UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8"); @@ -79,7 +81,7 @@ public class PublishGraphDBJobNode extends AsyncJobNode { return statusCode; } } catch (ConnectException ce) { - throw new MSROException("unable to connect to Publisher endpoint" + getPublisherEndpoint()); + throw new MSROException("unable to connect to Publisher endpoint" + getPublishEndpoint()); } catch (IOException e) { e.printStackTrace(); @@ -96,7 +98,7 @@ public class PublishGraphDBJobNode extends AsyncJobNode { for(Future res : resList){ if(res.get() == 200) countOk++; } - log.info(String.format("Got all responses. Ok %s/%s", countOk, countAll)); + log.info(String.format("Got all responses. Ok responses: %s/%s", countOk, countAll)); env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countOk", countOk); env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countAll", countAll); @@ -107,12 +109,54 @@ public class PublishGraphDBJobNode extends AsyncJobNode { log.warn("Problems in publishing: "+countOk+"/"+countAll+" see error maps for details"); } if(countAll == 0) log.warn("0 resources to publish"); + + env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countOk", countOk); + env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countAll", countAll); + env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "errorsMap", new Gson().toJson(errors)); + + if (countOk > 0) { + log.info("Feed provenance endpoint: " + getProvenanceFeedEndpoint()); + try { + HttpPost post = new HttpPost(getProvenanceFeedEndpoint()); + List params = Lists.newArrayList(); + String datasourceInterfaceValue = getDatasourceInterface(); + log.info("feeding provenance for datasourceInterface " + datasourceInterfaceValue); + params.add(new BasicNameValuePair("datasourceApi", datasourceInterfaceValue)); + UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8"); + post.setEntity(ent); + HttpClient client = HttpClients.createDefault(); + HttpResponse responsePOST = client.execute(post); + int statusCode = responsePOST.getStatusLine().getStatusCode(); + switch (statusCode) { + case 200: + log.info("feed provenance completed"); + break; + default: + log.error("error feeding provenance " + responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase()); + break; + } + } catch (ConnectException ce) { + throw new MSROException("unable to connect to Publisher endpoint" + getPublishEndpoint()); + } + catch (IOException e) { + log.error("error feeding provenance ", e); + } + } + return Arc.DEFAULT_ARC; } public String getPublisherEndpoint() { return publisherEndpoint; } + + private String getPublishEndpoint() { + return publisherEndpoint.concat("/publish"); + } + + private String getProvenanceFeedEndpoint() { + return publisherEndpoint.concat("/feedProvenance"); + } public void setPublisherEndpoint(final String publisherEndpoint) { this.publisherEndpoint = publisherEndpoint; @@ -133,5 +177,14 @@ public class PublishGraphDBJobNode extends AsyncJobNode { public void setEprParam(String eprParam) { this.eprParam = eprParam; } + + public String getDatasourceInterface() { + return datasourceInterface; + } + + + public void setDatasourceInterface(String datasourceInterface) { + this.datasourceInterface = datasourceInterface; + } } diff --git a/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/graphdb_template.xml b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/graphdb_template.xml index d85aaae..edab252 100644 --- a/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/graphdb_template.xml +++ b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/graphdb_template.xml @@ -30,6 +30,7 @@ +