New JobNode and workflow to enrich content on GraphDB

This commit is contained in:
Enrico Ottonello 2020-05-29 16:19:55 +02:00
parent 739dcc6b82
commit 30b3fa2140
8 changed files with 320 additions and 18 deletions

View File

@ -241,7 +241,7 @@ public class GraphDBClient {
this.repository = repository;
}
public String executeSparql(final String queryValue) throws AriadnePlusPublisherException{
public String updateSparql(final String queryValue) throws AriadnePlusPublisherException{
try {
String result = new String("");
log.debug("init connection to graphDBServerUrl " + this.graphDBServerUrl);

View File

@ -68,16 +68,8 @@ public class AriadnePlusPublisherController {
this.ariadneplusPublisherHelper = ariadneplusPublisherHelper;
}
@RequestMapping(value = "/executeSparql", method = RequestMethod.POST)
public String executeSparql(@RequestBody final String queryValue) throws AriadnePlusPublisherException {
// queryValue = "PREFIX aocat: <https://www.ariadne-infrastructure.eu/resource/ao/cat/1.1/>\n" +
// " PREFIX skos:<http://www.w3.org/2004/02/skos/core#>\n" +
// " INSERT { GRAPH <https://ariadne-infrastructure.eu/api_________::ariadne_plus::ads::271> { <https://ariadne-infrastructure.eu/aocat/Resource/0D02D6C0-E687-342E-891D-82B39A880F4E> aocat:has_title \" inserito da controller rest\" } }\n" +
// " WHERE{\n" +
// " GRAPH <https://ariadne-infrastructure.eu/api_________::ariadne_plus::ads::271> {\n" +
// " { <https://ariadne-infrastructure.eu/aocat/Resource/0D02D6C0-E687-342E-891D-82B39A880F4E> aocat:has_title ?title } .\n" +
// " }\n" +
// " };";
return getAriadnePlusPublisherHelper().executeSparql(queryValue, getTarget(DEFAULT_TARGET_ENDPOINT));
@RequestMapping(value = "/updateSparql", method = RequestMethod.POST)
public String updateSparql(@RequestBody final String queryValue) throws AriadnePlusPublisherException {
return getAriadnePlusPublisherHelper().updateSparql(queryValue, getTarget(DEFAULT_TARGET_ENDPOINT));
}
}

View File

@ -66,11 +66,11 @@ public class AriadnePlusPublisherHelper {
return res;
}
public String executeSparql(final String queryValue, final AriadnePlusTargets target) throws AriadnePlusPublisherException {
public String updateSparql(final String queryValue, final AriadnePlusTargets target) throws AriadnePlusPublisherException {
String res;
switch(target){
case GRAPHDB:
res = executeSparqlGraphDB(queryValue);
res = updateSparqlGraphDB(queryValue);
break;
default: throw new AriadnePlusPublisherException("Target "+target+" not supported yet");
}
@ -102,9 +102,9 @@ public class AriadnePlusPublisherHelper {
return 0;
}
private String executeSparqlGraphDB(final String queryValue) throws AriadnePlusPublisherException {
log.info("executeSparqlGraphDB "+queryValue);
private String updateSparqlGraphDB(final String queryValue) throws AriadnePlusPublisherException {
log.info("updateSparqlGraphDB "+queryValue);
GraphDBClient graphDBClient = this.graphdbClientFactory.getGraphDBClient();
return graphDBClient.executeSparql(queryValue);
return graphDBClient.updateSparql(queryValue);
}
}

View File

@ -0,0 +1,160 @@
package eu.dnetlib.ariadneplus.workflows.nodes;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
import eu.dnetlib.msro.workflows.graph.Arc;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
import eu.dnetlib.msro.workflows.procs.Env;
import eu.dnetlib.msro.workflows.procs.Token;
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
import eu.dnetlib.rmi.common.ResultSet;
import eu.dnetlib.rmi.manager.MSROException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
import java.net.ConnectException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * Async workflow job node that enriches the GraphDB content by POSTing a
 * SPARQL update query to the AriadnePlus publisher's {@code /updateSparql}
 * endpoint. The HTTP status code and a textual result are recorded in the
 * workflow {@link Env} under the {@code WorkflowsConstants.MAIN_LOG_PREFIX}
 * attributes.
 */
public class EnrichGraphDBContentJobNode extends AsyncJobNode {

    private static final Log log = LogFactory.getLog(EnrichGraphDBContentJobNode.class);

    private String eprParam;

    @Autowired
    private ResultSetClient resultSetClient;

    private String sparqlUpdateQuery;
    private String publisherEndpoint;
    private String datasourceInterface;
    private String datasource;

    // Upper bound on pooled connections towards the publisher endpoint.
    private int nThreads = 5;

    /**
     * Executes the enrichment: sends {@link #getSparqlUpdateQuery()} as the
     * request body to {@link #getEnrichEndpoint()}.
     *
     * @param env workflow environment; receives "statusCode" and
     *            "enrichResult" attributes describing the outcome
     * @return {@link Arc#DEFAULT_ARC} (I/O errors other than connection
     *         failures are logged, not rethrown)
     * @throws MSROException if the publisher endpoint cannot be reached
     */
    @Override
    protected String execute(final Env env) throws Exception {
        int statusCode = -1;
        String enrichResult = "noResult";
        log.info("Publisher endpoint: " + getPublisherEndpoint());
        log.info("Enrich Query Value: " + getSparqlUpdateQuery());

        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        cm.setMaxTotal(nThreads);
        CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build();
        log.info("Enrich endpoint: " + getEnrichEndpoint());
        CloseableHttpResponse responsePOST = null;
        try {
            HttpPost post = new HttpPost(getEnrichEndpoint());
            post.setEntity(new StringEntity(getSparqlUpdateQuery()));
            responsePOST = client.execute(post);
            statusCode = responsePOST.getStatusLine().getStatusCode();
            if (statusCode == 200) {
                // Fix: record a meaningful result instead of leaving "noResult".
                enrichResult = "enrich graphDB content completed";
                log.info(enrichResult);
            } else {
                log.error("error enriching graphDB " + statusCode + ": "
                        + responsePOST.getStatusLine().getReasonPhrase());
            }
        } catch (ConnectException ce) {
            // Preserve the cause and add the missing space before the URL.
            throw new MSROException("unable to connect to Publisher endpoint " + getEnrichEndpoint(), ce);
        } catch (IOException e) {
            log.error("IO error enriching graphDB ", e);
        } finally {
            if (responsePOST != null) responsePOST.close();
            client.close();
            cm.shutdown();
        }
        env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "statusCode", Integer.toString(statusCode));
        env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "enrichResult", enrichResult);
        log.info("enriching completed");
        return Arc.DEFAULT_ARC;
    }

    public String getPublisherEndpoint() {
        return publisherEndpoint;
    }

    /** Full URL of the publisher's SPARQL-update REST operation. */
    private String getEnrichEndpoint() {
        return publisherEndpoint.concat("/updateSparql");
    }

    public void setPublisherEndpoint(final String publisherEndpoint) {
        this.publisherEndpoint = publisherEndpoint;
    }

    public ResultSetClient getResultSetClient() {
        return resultSetClient;
    }

    public void setResultSetClient(final ResultSetClient resultSetClient) {
        this.resultSetClient = resultSetClient;
    }

    public String getEprParam() {
        return eprParam;
    }

    public void setEprParam(String eprParam) {
        this.eprParam = eprParam;
    }

    public String getDatasourceInterface() {
        return datasourceInterface;
    }

    public void setDatasourceInterface(String datasourceInterface) {
        this.datasourceInterface = datasourceInterface;
    }

    @Override
    protected void beforeStart(Token token) {
        // Progress is tracked against the resultset referenced by eprParam.
        token.setProgressProvider(new ResultsetProgressProvider(
                token.getEnv().getAttribute(getEprParam(), ResultSet.class), this.resultSetClient));
    }

    public String getDatasource() {
        return datasource;
    }

    public void setDatasource(String datasource) {
        this.datasource = datasource;
    }

    public String getSparqlUpdateQuery() {
        return sparqlUpdateQuery;
    }

    public void setSparqlUpdateQuery(String sparqlUpdateQuery) {
        this.sparqlUpdateQuery = sparqlUpdateQuery;
    }
}

View File

@ -13,6 +13,6 @@
<bean id="wfNodeX3MTransformAriadnePlus" class="eu.dnetlib.ariadneplus.workflows.nodes.X3MTransformAriadnePlusJobNode" scope="prototype"/>
<bean id="wfNodeElasticSearchIndex" class="eu.dnetlib.ariadneplus.workflows.nodes.ElasticSearchIndexJobNode" scope="prototype"/>
<bean id="wfNodeEnrichGraphDBContent" class="eu.dnetlib.ariadneplus.workflows.nodes.EnrichGraphDBContentJobNode" scope="prototype"/>
</beans>

View File

@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value=""/>
<RESOURCE_TYPE value="WorkflowDSResourceType"/>
<RESOURCE_KIND value="WorkflowDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value=""/>
</HEADER>
<BODY>
<WORKFLOW_NAME>$name$</WORKFLOW_NAME>
<WORKFLOW_DESCRIPTION>$desc$</WORKFLOW_DESCRIPTION>
<WORKFLOW_INFO />
<WORKFLOW_FAMILY>aggregator</WORKFLOW_FAMILY>
<WORKFLOW_PRIORITY>$priority$</WORKFLOW_PRIORITY>
<DATASOURCE id="$dsId$" interface="$interface$" />
<CONFIGURATION status="WAIT_SYS_SETTINGS" start="MANUAL">
<PARAMETERS>
<PARAM name="publisherEndpoint" description="AriadnePlus Publisher Endpoint" required="true" managedBy="user" type="string">http://localhost:8080/ariadneplus/publish</PARAM>
<PARAM name="sparqlUpdateQuery" description="Sparql update query to enrich GraphDB content" required="true" managedBy="user" type="string"></PARAM>
</PARAMETERS>
<WORKFLOW>
<NODE name="enrichGraphDB" type="LaunchWorkflowTemplate" isStart="true">
<DESCRIPTION>Enrich GraphDB with sparql update query</DESCRIPTION>
<PARAMETERS>
<PARAM name="wfTemplateId" value="f780a64d-bb00-4c9b-8393-f738846945f3_V29ya2Zsb3dUZW1wbGF0ZURTUmVzb3VyY2VzL1dvcmtmbG93VGVtcGxhdGVEU1Jlc291cmNlVHlwZQ=="/>
<PARAM name="wfTemplateParams">
<MAP>
<ENTRY key="dsId" value="$dsId$" />
<ENTRY key="dsName" value="$dsName$" />
<ENTRY key="interface" value="$interface$" />
<ENTRY key="publisherEndpoint" ref="publisherEndpoint" />
<ENTRY key="sparqlUpdateQuery" ref="sparqlUpdateQuery" />
</MAP>
</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</WORKFLOW>
<DESTROY_WORKFLOW_TEMPLATE id="23ef4bb3-2383-45b4-9661-ab03472fcd52_V29ya2Zsb3dUZW1wbGF0ZURTUmVzb3VyY2VzL1dvcmtmbG93VGVtcGxhdGVEU1Jlc291cmNlVHlwZQ==">
<PARAMETERS/>
</DESTROY_WORKFLOW_TEMPLATE>
</CONFIGURATION>
<NOTIFICATIONS/>
<SCHEDULING enabled="false">
<CRON>9 9 9 ? * *</CRON>
<MININTERVAL>10080</MININTERVAL>
</SCHEDULING>
<STATUS/>
</BODY>
</RESOURCE_PROFILE>

View File

@ -0,0 +1,34 @@
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value="f780a64d-bb00-4c9b-8393-f738846945f3_V29ya2Zsb3dUZW1wbGF0ZURTUmVzb3VyY2VzL1dvcmtmbG93VGVtcGxhdGVEU1Jlc291cmNlVHlwZQ=="/>
<RESOURCE_TYPE value="WorkflowTemplateDSResourceType"/>
<RESOURCE_KIND value="WorkflowTemplateDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2020-05-28T16:53:35+02:00"/>
</HEADER>
<BODY>
<CONFIGURATION>
<PARAMETERS>
<PARAM description="Datasource Name" name="dsName" required="true" type="string"/>
<PARAM description="Datasource Id" name="dsId" required="true" type="string"/>
<PARAM description="Datasource Interface" name="interface" required="true" type="string"/>
<PARAM description="AriadnePlus Publisher Endpoint" name="publisherEndpoint" required="true" type="string">http://localhost:8080/ariadneplus/publish</PARAM>
<PARAM description="Sparql Update Query" name="sparqlUpdateQuery" required="true" type="string"></PARAM>
</PARAMETERS>
<WORKFLOW>
<NODE isStart="true" name="enrichGraphDB" type="EnrichGraphDBContent">
<DESCRIPTION>Enrich GraphDB Content with a sparql update query</DESCRIPTION>
<PARAMETERS>
<PARAM name="publisherEndpoint" ref="publisherEndpoint"/>
<PARAM name="datasourceInterface" ref="interface"/>
<PARAM name="datasource" ref="dsName"/>
<PARAM name="sparqlUpdateQuery" ref="sparqlUpdateQuery"/>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</WORKFLOW>
</CONFIGURATION>
</BODY>
</RESOURCE_PROFILE>

View File

@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value="8161893d-e0a4-4d56-ad9e-0681eda9eb54_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
<RESOURCE_TYPE value="WorkflowDSResourceType"/>
<RESOURCE_KIND value="WorkflowDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2020-05-28T12:00:00.0Z"/>
</HEADER>
<BODY>
<WORKFLOW_NAME>ENRICH GRAPHDB CONTENT</WORKFLOW_NAME>
<WORKFLOW_DESCRIPTION>Enrich GraphDB Content with sparql update query</WORKFLOW_DESCRIPTION>
<WORKFLOW_INFO>
<FIELD name="Action">Enrich</FIELD>
<FIELD name="Datasource class">Content Provider</FIELD>
</WORKFLOW_INFO>
<WORKFLOW_FAMILY>REPO_HI</WORKFLOW_FAMILY>
<WORKFLOW_PRIORITY>20</WORKFLOW_PRIORITY>
<CONFIGURATION status="EXECUTABLE" start="MANUAL">
<PARAMETERS/>
<WORKFLOW>
<NODE name="VerifyDatasource" type="VerifyDatasource" isStart="true">
<DESCRIPTION>Verify if DS is pending</DESCRIPTION>
<PARAMETERS>
<PARAM name="expectedInterfaceTypologyPrefixes" value=""/>
<PARAM name="expectedCompliancePrefixes" value="metadata,native"/>
</PARAMETERS>
<ARCS>
<ARC to="registerEnrichWf"/>
<ARC to="validateDs" name="validateDs"/>
</ARCS>
</NODE>
<NODE name="validateDs" type="ValidateDatasource">
<DESCRIPTION>Validate DS</DESCRIPTION>
<PARAMETERS/>
<ARCS>
<ARC to="registerEnrichWf"/>
</ARCS>
</NODE>
<NODE name="registerEnrichWf" type="RegisterWorkflowFromTemplate">
<DESCRIPTION>Create Workflow</DESCRIPTION>
<PARAMETERS>
<PARAM name="wfName" value="Enrich GraphDB Content"/>
<PARAM name="wfTemplate" value="/eu/dnetlib/ariadneplus/workflows/repo-hi/enrich_graphdb_wf.xml.st"/>
<PARAM name="description" value="Enrich GraphDB Content with sparql update query"/>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</WORKFLOW>
</CONFIGURATION>
<NOTIFICATIONS/>
<SCHEDULING enabled="false">
<CRON>9 9 9 ? * *</CRON>
<MININTERVAL>10080</MININTERVAL>
</SCHEDULING>
<STATUS/>
</BODY>
</RESOURCE_PROFILE>