From 819ef88520c97080dc970cbff0411e6beedb641e Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Mon, 1 Jun 2020 12:29:48 +0200 Subject: [PATCH] new workflow to import periodo from url --- .../nodes/EnrichGraphDBContentJobNode.java | 11 +- .../ImportPeriodoIntoGraphDBJobNode.java | 119 ++++++++++++++++++ ...licationContext-ariadneplus-msro-nodes.xml | 2 + .../import_periodo_into_graphdb_wf.xml.st | 55 ++++++++ .../workflows/enrich_graphdb_template.xml | 2 +- .../import_periodo_into_graphdb_template.xml | 31 +++++ .../workflows/repo_hi_enrich_graphdb.xml | 2 +- .../repo_hi_import_periodo_into_graphdb.xml | 60 +++++++++ 8 files changed, 277 insertions(+), 5 deletions(-) create mode 100644 dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/ImportPeriodoIntoGraphDBJobNode.java create mode 100644 dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/repo-hi/import_periodo_into_graphdb_wf.xml.st create mode 100644 dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/import_periodo_into_graphdb_template.xml create mode 100644 dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_import_periodo_into_graphdb.xml diff --git a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/EnrichGraphDBContentJobNode.java b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/EnrichGraphDBContentJobNode.java index fb4d727..0e99fc7 100644 --- a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/EnrichGraphDBContentJobNode.java +++ b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/EnrichGraphDBContentJobNode.java @@ -81,10 +81,12 @@ public class EnrichGraphDBContentJobNode extends AsyncJobNode { break; } } catch (ConnectException ce) { - throw new MSROException("unable to connect to Publisher endpoint" + getEnrichEndpoint()); + log.error(ce); + throw new MSROException("Unable to connect to Publisher endpoint" + getEnrichEndpoint()); } catch (IOException e) { - log.error("IO error enriching graphDB ", e); + log.error(e); + throw new MSROException("IO Error" + getEnrichEndpoint()); } finally{ if(responsePOST != null) responsePOST.close(); @@ -95,7 +97,10 @@ public class EnrichGraphDBContentJobNode extends AsyncJobNode { env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "statusCode", Integer.toString(statusCode)); env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "enrichResult", enrichResult); - log.info("enriching completed"); + log.info(enrichResult); + if (statusCode!=200) { + throw new MSROException("Error from Publisher endpoint [ status code: " + statusCode + " ]"); + } return Arc.DEFAULT_ARC; } diff --git a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/ImportPeriodoIntoGraphDBJobNode.java b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/ImportPeriodoIntoGraphDBJobNode.java new file mode 100644 index 0000000..58359d4 --- /dev/null +++ b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/ImportPeriodoIntoGraphDBJobNode.java @@ -0,0 +1,119 @@ +package eu.dnetlib.ariadneplus.workflows.nodes; + +import com.google.common.collect.Lists; +import eu.dnetlib.msro.workflows.graph.Arc; +import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; +import eu.dnetlib.msro.workflows.procs.Env; +import eu.dnetlib.msro.workflows.util.WorkflowsConstants; +import eu.dnetlib.rmi.manager.MSROException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.NameValuePair; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.message.BasicNameValuePair; + +import java.io.IOException; +import java.net.ConnectException; +import java.util.List; + + +public class ImportPeriodoIntoGraphDBJobNode extends AsyncJobNode { + + private static final Log log = LogFactory.getLog(ImportPeriodoIntoGraphDBJobNode.class); + + private String dataUrl; + private String context; + private String publisherEndpoint; + + //for parallel requests to the publisher endpoint + private int nThreads = 5; + + @Override + protected String execute(final Env env) throws Exception { + + int statusCode = -1; + String loadedResult = "noResult"; + log.info("Publisher endpoint: " + getPublisherEndpoint()); + log.info("Context: " + getContext()); + log.info("Data url: " + getDataUrl()); + PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); + cm.setMaxTotal(nThreads); + CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build(); + + log.info("Feed from url endpoint: " + getFeedFromUrlEndpoint()); + CloseableHttpResponse responsePOST = null; + try { + HttpPost post = new HttpPost(getFeedFromUrlEndpoint()); + List params = Lists.newArrayList(); + params.add(new BasicNameValuePair("dataUrl", getDataUrl())); + params.add(new BasicNameValuePair("context", getContext())); + UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, "UTF-8"); + post.setEntity(entity); + responsePOST = client.execute(post); + statusCode = responsePOST.getStatusLine().getStatusCode(); + switch (statusCode) { + case 200: + log.info("data loaded completed"); + loadedResult = "data loaded from url "+ getDataUrl() + " into context "+ getContext(); + break; + default: + log.error("error loading data into graphDB " + responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase()); + break; + } + } catch (ConnectException ce) { + log.error(ce); + throw new MSROException("Unable to connect to Publisher endpoint" + getFeedFromUrlEndpoint()); + } + catch (IOException e) { + log.error(e); + throw new MSROException("IO error " + getFeedFromUrlEndpoint()); + } + finally{ + if(responsePOST != null) responsePOST.close(); + client.close(); + cm.shutdown(); + } + + env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "statusCode", Integer.toString(statusCode)); + env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "loadedResult", loadedResult); + + log.info(loadedResult); + if (statusCode!=200) { + throw new Exception("Error from Publisher endpoint [ status code: " + statusCode + " ]"); + } + return Arc.DEFAULT_ARC; + } + + public String getPublisherEndpoint() { + return publisherEndpoint; + } + + private String getFeedFromUrlEndpoint() { + return publisherEndpoint.concat("/feedFromURL"); + } + + public void setPublisherEndpoint(final String publisherEndpoint) { + this.publisherEndpoint = publisherEndpoint; + } + + public String getDataUrl() { + return dataUrl; + } + + public void setDataUrl(String dataUrl) { + this.dataUrl = dataUrl; + } + + public String getContext() { + return context; + } + + public void setContext(String context) { + this.context = context; + } +} diff --git a/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/nodes/applicationContext-ariadneplus-msro-nodes.xml b/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/nodes/applicationContext-ariadneplus-msro-nodes.xml index 45b5715..cca0863 100644 --- a/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/nodes/applicationContext-ariadneplus-msro-nodes.xml +++ b/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/nodes/applicationContext-ariadneplus-msro-nodes.xml @@ -15,4 +15,6 @@ + + diff --git a/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/repo-hi/import_periodo_into_graphdb_wf.xml.st b/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/repo-hi/import_periodo_into_graphdb_wf.xml.st new file mode 100644 index 0000000..85cac47 --- /dev/null +++ b/dnet-ariadneplus/src/main/resources/eu/dnetlib/ariadneplus/workflows/repo-hi/import_periodo_into_graphdb_wf.xml.st @@ -0,0 +1,55 @@ + + +
+ + + + + +
+ + $name$ + $desc$ + + aggregator + $priority$ + + + + + http://localhost:8281/ariadneplus-graphdb + + + + + + Import periodo data into GraphDB from url + + + + + + + + + + + + + + + + + + + + + + + + 9 9 9 ? * * + 10080 + + + +
diff --git a/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/enrich_graphdb_template.xml b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/enrich_graphdb_template.xml index 30137dc..34dcf03 100644 --- a/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/enrich_graphdb_template.xml +++ b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/enrich_graphdb_template.xml @@ -12,7 +12,7 @@ - http://localhost:8080/ariadneplus/publish + http://localhost:8281/ariadneplus-graphdb diff --git a/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/import_periodo_into_graphdb_template.xml b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/import_periodo_into_graphdb_template.xml new file mode 100644 index 0000000..5d201bf --- /dev/null +++ b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/import_periodo_into_graphdb_template.xml @@ -0,0 +1,31 @@ + +
+ + + + + +
+ + + + http://localhost:8281/ariadneplus-graphdb + + ariadneplus::datasourcename::periodo + + + + Import periodo data into GraphDB from url + + + + + + + + + + + + +
\ No newline at end of file diff --git a/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_enrich_graphdb.xml b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_enrich_graphdb.xml index 2449cf8..fd23df7 100644 --- a/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_enrich_graphdb.xml +++ b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_enrich_graphdb.xml @@ -8,7 +8,7 @@ - ENRICH GRAPHDB CONTENT + Enrich GraphDB Content Enrich GraphDB Content with sparql update query Enrich diff --git a/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_import_periodo_into_graphdb.xml b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_import_periodo_into_graphdb.xml new file mode 100644 index 0000000..32d09a5 --- /dev/null +++ b/dnet-ariadneplus/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/repo_hi_import_periodo_into_graphdb.xml @@ -0,0 +1,60 @@ + + +
+ + + + + +
+ + Import periodo data + Import periodo data into GraphDB from url + + Import + Content Provider + + REPO_HI + 20 + + + + + Verify if DS is pending + + + + + + + + + + + Validate DS + + + + + + + Create Workflow + + + + + + + + + + + + + + 9 9 9 ? * * + 10080 + + + +