From 30b3fa2140f2c01be2e00c5efb160f060ac02717 Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Fri, 29 May 2020 16:19:55 +0200 Subject: [PATCH] New JobNode and workflow to enrich content on GraphDB --- .../ariadneplus/graphdb/GraphDBClient.java | 2 +- .../AriadnePlusPublisherController.java | 14 +- .../publisher/AriadnePlusPublisherHelper.java | 10 +- .../nodes/EnrichGraphDBContentJobNode.java | 160 ++++++++++++++++++ ...licationContext-ariadneplus-msro-nodes.xml | 2 +- .../repo-hi/enrich_graphdb_wf.xml.st | 56 ++++++ .../workflows/enrich_graphdb_template.xml | 34 ++++ .../workflows/repo_hi_enrich_graphdb.xml | 60 +++++++ 8 files changed, 320 insertions(+), 18 deletions(-) create mode 100644 dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/EnrichGraphDBContentJobNode.java create mode 100644 dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/repo-hi/enrich_graphdb_wf.xml.st create mode 100644 dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/enrich_graphdb_template.xml create mode 100644 dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_enrich_graphdb.xml diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java index 3619f37..d65d9b3 100644 --- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java +++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java @@ -241,7 +241,7 @@ public class GraphDBClient { this.repository = repository; } - public String executeSparql(final String queryValue) throws AriadnePlusPublisherException{ + public String updateSparql(final String queryValue) throws AriadnePlusPublisherException{ try { String result = new String(""); log.debug("init connection to graphDBServerUrl " + this.graphDBServerUrl); diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherController.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherController.java index 2a45cf8..353b7b2 100644 --- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherController.java +++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherController.java @@ -68,16 +68,8 @@ public class AriadnePlusPublisherController { this.ariadneplusPublisherHelper = ariadneplusPublisherHelper; } - @RequestMapping(value = "/executeSparql", method = RequestMethod.POST) - public String executeSparql(@RequestBody final String queryValue) throws AriadnePlusPublisherException { -// queryValue = "PREFIX aocat: \n" + -// " PREFIX skos:\n" + -// " INSERT { GRAPH { aocat:has_title \" inserito da controller rest\" } }\n" + -// " WHERE{\n" + -// " GRAPH {\n" + -// " { aocat:has_title ?title } .\n" + -// " }\n" + -// " };"; - return getAriadnePlusPublisherHelper().executeSparql(queryValue, getTarget(DEFAULT_TARGET_ENDPOINT)); + @RequestMapping(value = "/updateSparql", method = RequestMethod.POST) + public String updateSparql(@RequestBody final String queryValue) throws AriadnePlusPublisherException { + return getAriadnePlusPublisherHelper().updateSparql(queryValue, getTarget(DEFAULT_TARGET_ENDPOINT)); } } \ No newline at end of file diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherHelper.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherHelper.java index 6d053d4..db55703 100644 --- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherHelper.java +++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherHelper.java @@ -66,11 +66,11 @@ public class AriadnePlusPublisherHelper { return res; } - public String executeSparql(final String queryValue, final AriadnePlusTargets target) throws AriadnePlusPublisherException { + public String updateSparql(final String queryValue, final AriadnePlusTargets target) throws AriadnePlusPublisherException { String res; switch(target){ case GRAPHDB: - res = executeSparqlGraphDB(queryValue); + res = updateSparqlGraphDB(queryValue); break; default: throw new AriadnePlusPublisherException("Target "+target+" not supported yet"); } @@ -102,9 +102,9 @@ public class AriadnePlusPublisherHelper { return 0; } - private String executeSparqlGraphDB(final String queryValue) throws AriadnePlusPublisherException { - log.info("executeSparqlGraphDB "+queryValue); + private String updateSparqlGraphDB(final String queryValue) throws AriadnePlusPublisherException { + log.info("updateSparqlGraphDB "+queryValue); GraphDBClient graphDBClient = this.graphdbClientFactory.getGraphDBClient(); - return graphDBClient.executeSparql(queryValue); + return graphDBClient. updateSparql(queryValue); } } diff --git a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/EnrichGraphDBContentJobNode.java b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/EnrichGraphDBContentJobNode.java new file mode 100644 index 0000000..fb4d727 --- /dev/null +++ b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/EnrichGraphDBContentJobNode.java @@ -0,0 +1,160 @@ +package eu.dnetlib.ariadneplus.workflows.nodes; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import eu.dnetlib.enabling.resultset.client.ResultSetClient; +import eu.dnetlib.msro.workflows.graph.Arc; +import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; +import eu.dnetlib.msro.workflows.procs.Env; +import eu.dnetlib.msro.workflows.procs.Token; +import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider; +import eu.dnetlib.msro.workflows.util.WorkflowsConstants; +import eu.dnetlib.rmi.common.ResultSet; +import eu.dnetlib.rmi.manager.MSROException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.NameValuePair; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.message.BasicNameValuePair; +import org.springframework.beans.factory.annotation.Autowired; + +import java.io.IOException; +import java.net.ConnectException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + + +public class EnrichGraphDBContentJobNode extends AsyncJobNode { + + private static final Log log = LogFactory.getLog(EnrichGraphDBContentJobNode.class); + + private String eprParam; + + @Autowired + private ResultSetClient resultSetClient; + + private String sparqlUpdateQuery; + private String publisherEndpoint; + private String datasourceInterface; + private String datasource; + + //for parallel requests to the publisher endpoint + private int nThreads = 5; + + @Override + protected String execute(final Env env) throws Exception { + + int statusCode = -1; + String enrichResult = "noResult"; + log.info("Publisher endpoint: " + getPublisherEndpoint()); + log.info("Enrich Query Value: " + getSparqlUpdateQuery()); + PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); + cm.setMaxTotal(nThreads); + CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build(); + + log.info("Enrich endpoint: " + getEnrichEndpoint()); + CloseableHttpResponse responsePOST = null; + try { + HttpPost post = new HttpPost(getEnrichEndpoint()); + List params = Lists.newArrayList(); + String datasourceInterfaceValue = getDatasourceInterface(); + StringEntity entity = new StringEntity(getSparqlUpdateQuery()); + post.setEntity(entity); + responsePOST = client.execute(post); + statusCode = responsePOST.getStatusLine().getStatusCode(); + switch (statusCode) { + case 200: + log.info("enrich graphDB content completed"); + break; + default: + log.error("error enriching graphDB " + responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase()); + break; + } + } catch (ConnectException ce) { + throw new MSROException("unable to connect to Publisher endpoint" + getEnrichEndpoint()); + } + catch (IOException e) { + log.error("IO error enriching graphDB ", e); + } + finally{ + if(responsePOST != null) responsePOST.close(); + client.close(); + cm.shutdown(); + } + + env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "statusCode", Integer.toString(statusCode)); + env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "enrichResult", enrichResult); + + log.info("enriching completed"); + + return Arc.DEFAULT_ARC; + } + + public String getPublisherEndpoint() { + return publisherEndpoint; + } + + private String getEnrichEndpoint() { + return publisherEndpoint.concat("/updateSparql"); + } + + public void setPublisherEndpoint(final String publisherEndpoint) { + this.publisherEndpoint = publisherEndpoint; + } + + public ResultSetClient getResultSetClient() { + return resultSetClient; + } + + public void setResultSetClient(final ResultSetClient resultSetClient) { + this.resultSetClient = resultSetClient; + } + + public String getEprParam() { + return eprParam; + } + + public void setEprParam(String eprParam) { + this.eprParam = eprParam; + } + + public String getDatasourceInterface() { + return datasourceInterface; + } + + + public void setDatasourceInterface(String datasourceInterface) { + this.datasourceInterface = datasourceInterface; + } + + @Override + protected void beforeStart(Token token) { + token.setProgressProvider(new ResultsetProgressProvider(token.getEnv().getAttribute(getEprParam(), ResultSet.class), this.resultSetClient)); + } + + public String getDatasource() { + return datasource; + } + + public void setDatasource(String datasource) { + this.datasource = datasource; + } + + public String getSparqlUpdateQuery() { + return sparqlUpdateQuery; + } + + public void setSparqlUpdateQuery(String sparqlUpdateQuery) { + this.sparqlUpdateQuery = sparqlUpdateQuery; + } +} diff --git a/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/nodes/applicationContext-ariadneplus-msro-nodes.xml b/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/nodes/applicationContext-ariadneplus-msro-nodes.xml index 187bc05..45b5715 100644 --- a/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/nodes/applicationContext-ariadneplus-msro-nodes.xml +++ b/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/nodes/applicationContext-ariadneplus-msro-nodes.xml @@ -13,6 +13,6 @@ - + diff --git a/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/repo-hi/enrich_graphdb_wf.xml.st b/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/repo-hi/enrich_graphdb_wf.xml.st new file mode 100644 index 0000000..b5c778a --- /dev/null +++ b/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/repo-hi/enrich_graphdb_wf.xml.st @@ -0,0 +1,56 @@ + + +
+ + + + + +
+ + $name$ + $desc$ + + aggregator + $priority$ + + + + + http://localhost:8080/ariadneplus/publish + + + + + Enrich GraphDB with sparql update query + + + + + + + + + + + + + + + + + + + + + + + + + + 9 9 9 ? * * + 10080 + + + +
diff --git a/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/enrich_graphdb_template.xml b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/enrich_graphdb_template.xml new file mode 100644 index 0000000..30137dc --- /dev/null +++ b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/enrich_graphdb_template.xml @@ -0,0 +1,34 @@ + +
+ + + + + +
+ + + + + + + http://localhost:8080/ariadneplus/publish + + + + + Enrich GraphDB Content with a sparql update query + + + + + + + + + + + + + +
\ No newline at end of file diff --git a/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_enrich_graphdb.xml b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_enrich_graphdb.xml new file mode 100644 index 0000000..2449cf8 --- /dev/null +++ b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_enrich_graphdb.xml @@ -0,0 +1,60 @@ + + +
+ + + + + +
+ + ENRICH GRAPHDB CONTENT + Enrich GraphDB Content with sparql update query + + Enrich + Content Provider + + REPO_HI + 20 + + + + + Verify if DS is pending + + + + + + + + + + + Validate DS + + + + + + + Create Workflow + + + + + + + + + + + + + + 9 9 9 ? * * + 10080 + + + +