New workflow to import PeriodO data from URL

This commit is contained in:
Enrico Ottonello 2020-06-01 12:29:48 +02:00
parent 9814069fb2
commit 819ef88520
8 changed files with 277 additions and 5 deletions
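
In short, the new ImportPeriodoIntoGraphDBJobNode issues a URL-encoded form POST with two parameters, dataUrl and context, to the publisher's /feedFromURL endpoint, and fails the workflow unless the response status is 200. A minimal standalone sketch of that request, using the same Apache HttpClient API as the node (the endpoint and context defaults come from the profiles in this commit; the dataUrl value is a placeholder):

import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;

import java.util.Arrays;
import java.util.List;

public class FeedFromUrlExample {
	public static void main(String[] args) throws Exception {
		// Default endpoint taken from the workflow profiles in this commit.
		String publisherEndpoint = "http://localhost:8281/ariadneplus-graphdb";
		try (CloseableHttpClient client = HttpClients.createDefault()) {
			HttpPost post = new HttpPost(publisherEndpoint + "/feedFromURL");
			// dataUrl here is a placeholder; the workflow node passes its PARAM values instead.
			List<NameValuePair> params = Arrays.asList(
					new BasicNameValuePair("dataUrl", "https://example.org/periodo-dataset.json"),
					new BasicNameValuePair("context", "ariadneplus::datasourcename::periodo"));
			post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
			try (CloseableHttpResponse response = client.execute(post)) {
				// The job node treats anything other than 200 as a failure.
				System.out.println(response.getStatusLine());
			}
		}
	}
}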

@@ -81,10 +81,12 @@ public class EnrichGraphDBContentJobNode extends AsyncJobNode {
 					break;
 			}
 		} catch (ConnectException ce) {
-			throw new MSROException("unable to connect to Publisher endpoint" + getEnrichEndpoint());
+			log.error(ce);
+			throw new MSROException("Unable to connect to Publisher endpoint " + getEnrichEndpoint());
 		}
 		catch (IOException e) {
-			log.error("IO error enriching graphDB ", e);
+			log.error(e);
+			throw new MSROException("IO Error " + getEnrichEndpoint());
 		}
 		finally{
 			if(responsePOST != null) responsePOST.close();
@@ -95,7 +97,10 @@ public class EnrichGraphDBContentJobNode extends AsyncJobNode {
 		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "statusCode", Integer.toString(statusCode));
 		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "enrichResult", enrichResult);
 		log.info("enriching completed");
+		log.info(enrichResult);
+		if (statusCode!=200) {
+			throw new MSROException("Error from Publisher endpoint [ status code: " + statusCode + " ]");
+		}
 		return Arc.DEFAULT_ARC;
 	}

@@ -0,0 +1,119 @@
package eu.dnetlib.ariadneplus.workflows.nodes;

import com.google.common.collect.Lists;
import eu.dnetlib.msro.workflows.graph.Arc;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
import eu.dnetlib.msro.workflows.procs.Env;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
import eu.dnetlib.rmi.manager.MSROException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;

import java.io.IOException;
import java.net.ConnectException;
import java.util.List;

public class ImportPeriodoIntoGraphDBJobNode extends AsyncJobNode {

	private static final Log log = LogFactory.getLog(ImportPeriodoIntoGraphDBJobNode.class);

	private String dataUrl;
	private String context;
	private String publisherEndpoint;

	// for parallel requests to the publisher endpoint
	private int nThreads = 5;

	@Override
	protected String execute(final Env env) throws Exception {
		int statusCode = -1;
		String loadedResult = "noResult";
		log.info("Publisher endpoint: " + getPublisherEndpoint());
		log.info("Context: " + getContext());
		log.info("Data url: " + getDataUrl());

		PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
		cm.setMaxTotal(nThreads);
		CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build();

		log.info("Feed from url endpoint: " + getFeedFromUrlEndpoint());
		CloseableHttpResponse responsePOST = null;
		try {
			// POST the data url and the target context to the publisher's /feedFromURL endpoint
			HttpPost post = new HttpPost(getFeedFromUrlEndpoint());
			List<NameValuePair> params = Lists.newArrayList();
			params.add(new BasicNameValuePair("dataUrl", getDataUrl()));
			params.add(new BasicNameValuePair("context", getContext()));
			UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, "UTF-8");
			post.setEntity(entity);
			responsePOST = client.execute(post);
			statusCode = responsePOST.getStatusLine().getStatusCode();
			switch (statusCode) {
				case 200:
					log.info("data loading completed");
					loadedResult = "data loaded from url " + getDataUrl() + " into context " + getContext();
					break;
				default:
					log.error("error loading data into graphDB " + responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase());
					break;
			}
		} catch (ConnectException ce) {
			log.error(ce);
			throw new MSROException("Unable to connect to Publisher endpoint " + getFeedFromUrlEndpoint());
		} catch (IOException e) {
			log.error(e);
			throw new MSROException("IO error " + getFeedFromUrlEndpoint());
		} finally {
			// release the HTTP resources whether or not the feed succeeded
			if (responsePOST != null) responsePOST.close();
			client.close();
			cm.shutdown();
		}

		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "statusCode", Integer.toString(statusCode));
		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "loadedResult", loadedResult);
		log.info(loadedResult);
		if (statusCode != 200) {
			throw new MSROException("Error from Publisher endpoint [ status code: " + statusCode + " ]");
		}
		return Arc.DEFAULT_ARC;
	}

	public String getPublisherEndpoint() {
		return publisherEndpoint;
	}

	private String getFeedFromUrlEndpoint() {
		return publisherEndpoint.concat("/feedFromURL");
	}

	public void setPublisherEndpoint(final String publisherEndpoint) {
		this.publisherEndpoint = publisherEndpoint;
	}

	public String getDataUrl() {
		return dataUrl;
	}

	public void setDataUrl(String dataUrl) {
		this.dataUrl = dataUrl;
	}

	public String getContext() {
		return context;
	}

	public void setContext(String context) {
		this.context = context;
	}
}

@@ -15,4 +15,6 @@
 	<bean id="wfNodeElasticSearchIndex" class="eu.dnetlib.ariadneplus.workflows.nodes.ElasticSearchIndexJobNode" scope="prototype"/>
 	<bean id="wfNodeEnrichGraphDBContent" class="eu.dnetlib.ariadneplus.workflows.nodes.EnrichGraphDBContentJobNode" scope="prototype"/>
+	<bean id="wfNodeImportPeriodoIntoGraphDB" class="eu.dnetlib.ariadneplus.workflows.nodes.ImportPeriodoIntoGraphDBJobNode" scope="prototype"/>
 </beans>

@@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value=""/>
<RESOURCE_TYPE value="WorkflowDSResourceType"/>
<RESOURCE_KIND value="WorkflowDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value=""/>
</HEADER>
<BODY>
<WORKFLOW_NAME>$name$</WORKFLOW_NAME>
<WORKFLOW_DESCRIPTION>$desc$</WORKFLOW_DESCRIPTION>
<WORKFLOW_INFO />
<WORKFLOW_FAMILY>aggregator</WORKFLOW_FAMILY>
<WORKFLOW_PRIORITY>$priority$</WORKFLOW_PRIORITY>
<DATASOURCE id="$dsId$" interface="$interface$" />
<CONFIGURATION status="WAIT_SYS_SETTINGS" start="MANUAL">
<PARAMETERS>
<PARAM name="publisherEndpoint" description="AriadnePlus Publisher Endpoint" required="true" managedBy="user" type="string">http://localhost:8281/ariadneplus-graphdb</PARAM>
<PARAM name="context" description="GraphDB context related to periodo data" required="true" managedBy="user" type="string"></PARAM>
<PARAM name="dataUrl" description="Url of Periodo file to import" required="true" managedBy="user" type="string"></PARAM>
</PARAMETERS>
<WORKFLOW>
<NODE name="importPeriodoIntoGraphDB" type="LaunchWorkflowTemplate" isStart="true">
<DESCRIPTION>Import PeriodO data into GraphDB from URL</DESCRIPTION>
<PARAMETERS>
<PARAM name="wfTemplateId" value="f797c7b5-9f2e-4ae8-b908-2b910765fc2b_V29ya2Zsb3dUZW1wbGF0ZURTUmVzb3VyY2VzL1dvcmtmbG93VGVtcGxhdGVEU1Jlc291cmNlVHlwZQ=="/>
<PARAM name="wfTemplateParams">
<MAP>
<ENTRY key="publisherEndpoint" ref="publisherEndpoint" />
<ENTRY key="dataUrl" ref="dataUrl" />
<ENTRY key="context" ref="context" />
</MAP>
</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</WORKFLOW>
<DESTROY_WORKFLOW_TEMPLATE id="23ef4bb3-2383-45b4-9661-ab03472fcd52_V29ya2Zsb3dUZW1wbGF0ZURTUmVzb3VyY2VzL1dvcmtmbG93VGVtcGxhdGVEU1Jlc291cmNlVHlwZQ==">
<PARAMETERS/>
</DESTROY_WORKFLOW_TEMPLATE>
</CONFIGURATION>
<NOTIFICATIONS/>
<SCHEDULING enabled="false">
<CRON>9 9 9 ? * *</CRON>
<MININTERVAL>10080</MININTERVAL>
</SCHEDULING>
<STATUS/>
</BODY>
</RESOURCE_PROFILE>

@@ -12,7 +12,7 @@
 	<PARAM description="Datasource Name" name="dsName" required="true" type="string"/>
 	<PARAM description="Datasource Id" name="dsId" required="true" type="string"/>
 	<PARAM description="Datasource Interface" name="interface" required="true" type="string"/>
-	<PARAM description="AriadnePlus Publisher Endpoint" name="publisherEndpoint" required="true" type="string">http://localhost:8080/ariadneplus/publish</PARAM>
+	<PARAM description="AriadnePlus Publisher Endpoint" name="publisherEndpoint" required="true" type="string">http://localhost:8281/ariadneplus-graphdb</PARAM>
 	<PARAM description="Sparql Update Query" name="sparqlUpdateQuery" required="true" type="string"></PARAM>
 </PARAMETERS>
 <WORKFLOW>

@@ -0,0 +1,31 @@
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value="f797c7b5-9f2e-4ae8-b908-2b910765fc2b_V29ya2Zsb3dUZW1wbGF0ZURTUmVzb3VyY2VzL1dvcmtmbG93VGVtcGxhdGVEU1Jlc291cmNlVHlwZQ=="/>
<RESOURCE_TYPE value="WorkflowTemplateDSResourceType"/>
<RESOURCE_KIND value="WorkflowTemplateDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2020-05-28T16:53:35+02:00"/>
</HEADER>
<BODY>
<CONFIGURATION>
<PARAMETERS>
<PARAM description="AriadnePlus Publisher Endpoint" name="publisherEndpoint" required="true" type="string">http://localhost:8281/ariadneplus-graphdb</PARAM>
<PARAM description="Url of Periodo file to import" name="dataUrl" required="true" type="string"></PARAM>
<PARAM description="GraphDB context related to periodo data" name="context" required="true" type="string">ariadneplus::datasourcename::periodo</PARAM>
</PARAMETERS>
<WORKFLOW>
<NODE isStart="true" name="importPeriodoIntoGraphDB" type="ImportPeriodoIntoGraphDB">
<DESCRIPTION>Import PeriodO data into GraphDB from URL</DESCRIPTION>
<PARAMETERS>
<PARAM name="publisherEndpoint" ref="publisherEndpoint"/>
<PARAM name="dataUrl" ref="dataUrl"/>
<PARAM name="context" ref="context"/>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</WORKFLOW>
</CONFIGURATION>
</BODY>
</RESOURCE_PROFILE>

@@ -8,7 +8,7 @@
 	<DATE_OF_CREATION value="2020-05-28T12:00:00.0Z"/>
 </HEADER>
 <BODY>
-	<WORKFLOW_NAME>ENRICH GRAPHDB CONTENT</WORKFLOW_NAME>
+	<WORKFLOW_NAME>Enrich GraphDB Content</WORKFLOW_NAME>
 	<WORKFLOW_DESCRIPTION>Enrich GraphDB Content with sparql update query</WORKFLOW_DESCRIPTION>
 	<WORKFLOW_INFO>
 		<FIELD name="Action">Enrich</FIELD>

@@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value="6700c914-6f66-4a05-be61-32c0db32911f_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
<RESOURCE_TYPE value="WorkflowDSResourceType"/>
<RESOURCE_KIND value="WorkflowDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2020-05-28T12:00:00.0Z"/>
</HEADER>
<BODY>
<WORKFLOW_NAME>Import PeriodO data</WORKFLOW_NAME>
<WORKFLOW_DESCRIPTION>Import PeriodO data into GraphDB from URL</WORKFLOW_DESCRIPTION>
<WORKFLOW_INFO>
<FIELD name="Action">Import</FIELD>
<FIELD name="Datasource class">Content Provider</FIELD>
</WORKFLOW_INFO>
<WORKFLOW_FAMILY>REPO_HI</WORKFLOW_FAMILY>
<WORKFLOW_PRIORITY>20</WORKFLOW_PRIORITY>
<CONFIGURATION status="EXECUTABLE" start="MANUAL">
<PARAMETERS/>
<WORKFLOW>
<NODE name="VerifyDatasource" type="VerifyDatasource" isStart="true">
<DESCRIPTION>Verify if DS is pending</DESCRIPTION>
<PARAMETERS>
<PARAM name="expectedInterfaceTypologyPrefixes" value=""/>
<PARAM name="expectedCompliancePrefixes" value="metadata,native"/>
</PARAMETERS>
<ARCS>
<ARC to="registerImportPeriodoWf"/>
<ARC to="validateDs" name="validateDs"/>
</ARCS>
</NODE>
<NODE name="validateDs" type="ValidateDatasource">
<DESCRIPTION>Validate DS</DESCRIPTION>
<PARAMETERS/>
<ARCS>
<ARC to="registerImportPeriodoWf"/>
</ARCS>
</NODE>
<NODE name="registerImportPeriodoWf" type="RegisterWorkflowFromTemplate">
<DESCRIPTION>Create Workflow</DESCRIPTION>
<PARAMETERS>
<PARAM name="wfName" value="Import Periodo"/>
<PARAM name="wfTemplate" value="/eu/dnetlib/ariadneplus/workflows/repo-hi/import_periodo_into_graphdb_wf.xml.st"/>
<PARAM name="description" value="Import periodo data into GraphDB from url"/>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</WORKFLOW>
</CONFIGURATION>
<NOTIFICATIONS/>
<SCHEDULING enabled="false">
<CRON>9 9 9 ? * *</CRON>
<MININTERVAL>10080</MININTERVAL>
</SCHEDULING>
<STATUS/>
</BODY>
</RESOURCE_PROFILE>