From 8073ac7131903bf948ae0939c38f7102310b599f Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Tue, 20 Oct 2020 17:43:27 +0200 Subject: [PATCH] the node retrieves resources identifier list for collection and record, then loops on the 2 identifiers lists and request indexing to rest module --- .../workflows/nodes/IndexOnESJobNode.java | 224 ++++++++++++------ 1 file changed, 145 insertions(+), 79 deletions(-) diff --git a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/IndexOnESJobNode.java b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/IndexOnESJobNode.java index 69e81d8..05a6564 100644 --- a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/IndexOnESJobNode.java +++ b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/IndexOnESJobNode.java @@ -1,33 +1,27 @@ package eu.dnetlib.ariadneplus.workflows.nodes; -import com.google.common.collect.Lists; import eu.dnetlib.msro.workflows.graph.Arc; import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; +import eu.dnetlib.msro.workflows.nodes.is.ValidateProfilesJobNode; import eu.dnetlib.msro.workflows.procs.Env; -import eu.dnetlib.msro.workflows.util.WorkflowsConstants; +import eu.dnetlib.msro.workflows.procs.Token; +import eu.dnetlib.msro.workflows.util.ProgressProvider; import eu.dnetlib.rmi.manager.MSROException; -import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; -import org.apache.http.NameValuePair; import org.apache.http.client.HttpClient; -import org.apache.http.client.entity.UrlEncodedFormEntity; -import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.utils.URIBuilder; -import org.apache.http.config.SocketConfig; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; -import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import java.io.IOException; -import java.io.InputStream; -import java.net.ConnectException; import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; import java.util.List; @@ -38,88 +32,88 @@ public class IndexOnESJobNode extends AsyncJobNode { private String publisherEndpoint; private String datasourceInterface; private String datasource; + private int currentResourceToIndex = 0; + private int totalResourceToIndex = 0; + +// @Override +// protected String execute(final Env env) throws Exception { +// +// int statusCode = -1; +// String response = ""; +// log.info("IndexOnES endpoint: " + getIndexOnESEndpoint()); +// HttpClient client = null; +// try { +// String[] splits = getDatasourceInterface().split("::"); +// String datasource = splits[2]; +// String collectionId = splits[3]; +// +// URI getURI = new URIBuilder(getIndexOnESEndpoint()) +// .addParameter("datasource", datasource) +// .addParameter("collectionId", collectionId) +// .build(); +// client = HttpClients.createDefault(); +// HttpResponse res = client.execute(new HttpGet(getURI)); +// response = EntityUtils.toString(res.getEntity()); +// if (res.getStatusLine()!=null) { +// statusCode = res.getStatusLine().getStatusCode(); +// } +// +// } +// catch (Throwable t) { +// log.error(t); +// throw new MSROException("Indexing on Elastic Search: " + t.getMessage()); +// } +// +// finally{ +// } +// +// env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "statusCode", Integer.toString(statusCode)); +// env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "response", response); +// +// if (statusCode!=200) { +// throw new MSROException("Error from Publisher endpoint [ status code: " + statusCode + " ]"); +// } +// +// return Arc.DEFAULT_ARC; +// } @Override protected String execute(final Env env) throws Exception { - int statusCode = -1; - String response = ""; -// String indexOnESResult = "noResult"; -// log.info("Publisher endpoint: " + getPublisherEndpoint()); -// SocketConfig socketConfig = SocketConfig.custom().setSoKeepAlive(true).build(); -// CloseableHttpClient client = HttpClientBuilder.create() -// .setDefaultSocketConfig(socketConfig).build(); - log.info("IndexOnES endpoint: " + getIndexOnESEndpoint()); -// CloseableHttpResponse responsePOST = null; - HttpClient client = null; + final String collectionResourceType = "COLLECTION"; + final String recordResourceType = "RECORD"; try { -// HttpPost post = new HttpPost(getIndexOnESEndpoint()); -// List params = Lists.newArrayList(); String[] splits = getDatasourceInterface().split("::"); String datasource = splits[2]; String collectionId = splits[3]; -// params.add(new BasicNameValuePair("datasource", datasource)); -// params.add(new BasicNameValuePair("collectionId", collectionId)); -// UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8"); -// post.setEntity(ent); -// log.info("Calling IndexOnES endpoint with params: "+getDatasource()+" "+getDatasourceInterface()); -// responsePOST = client.execute(post); - - URI getURI = new URIBuilder(getIndexOnESEndpoint()) - .addParameter("datasource", datasource) - .addParameter("collectionId", collectionId) - .build(); - client = HttpClients.createDefault(); - HttpResponse res = client.execute(new HttpGet(getURI)); - response = EntityUtils.toString(res.getEntity()); - if (res.getStatusLine()!=null) { - statusCode = res.getStatusLine().getStatusCode(); + + List collectionIdentifiers = selectIdentifiers(datasource, collectionId, collectionResourceType); + if (!collectionIdentifiers.isEmpty()) { + collectionIdentifiers.forEach(identifier -> { + try { + indexing(datasource, collectionId, collectionResourceType, cleanIdentifier(identifier)); + } catch (Throwable t) { + log.error(identifier+" "+t); + } + }); } -// statusCode = responsePOST.getStatusLine().getStatusCode(); -// try(InputStream responseBody = responsePOST.getEntity().getContent()) { -// indexOnESResult = IOUtils.toString(responseBody, "UTF-8"); -// } catch (Exception e) { -// log.error(e); -// } -// switch (statusCode) { -// case 200: -// log.info("index on ES completed"); -// break; -// default: -// log.error("error indexing on ES " + responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase()); -// break; -// } + List recordIdentifiers = selectIdentifiers(datasource, collectionId, recordResourceType); + if (!recordIdentifiers.isEmpty()) { + recordIdentifiers.forEach(identifier -> { + try { + indexing(datasource, collectionId, recordResourceType, cleanIdentifier(identifier)); + } catch (Throwable t) { + log.error(identifier+" "+t); + } + }); + } } -// catch (ConnectException ce) { -// log.error(ce); -// throw new MSROException("Unable to connect to Publisher endpoint" + getIndexOnESEndpoint()); -// } -// catch (IOException e) { -// log.error(e); -// throw new MSROException("IO Error" + getIndexOnESEndpoint()); -// } catch (Throwable t) { log.error(t); throw new MSROException("Indexing on Elastic Search: " + t.getMessage()); } - finally{ -// if(responsePOST != null) responsePOST.close(); -// client.close(); -// cm.shutdown(); - - } - - env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "statusCode", Integer.toString(statusCode)); -// env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "indexResult", indexOnESResult); - - env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "response", response); - - if (statusCode!=200) { - throw new MSROException("Error from Publisher endpoint [ status code: " + statusCode + " ]"); - } - return Arc.DEFAULT_ARC; } @@ -128,7 +122,11 @@ public class IndexOnESJobNode extends AsyncJobNode { } private String getIndexOnESEndpoint() { - return publisherEndpoint.concat("/indexOnES"); + return publisherEndpoint.concat("/indexOnESByIdentifier"); + } + + private String getSelectIdentifiersEndpoint() { + return publisherEndpoint.concat("/selectIdentifiers"); } public void setPublisherEndpoint(final String publisherEndpoint) { @@ -151,4 +149,72 @@ public class IndexOnESJobNode extends AsyncJobNode { this.datasource = datasource; } + private String cleanIdentifier(String identifier) { + String cleaned = identifier; + try { + cleaned = identifier + .replace("[", "") + .replace("]", "") + .replace("\"", ""); + } + catch (Exception e) { + + } + return cleaned; + } + + private String indexing(String datasource, String collectionId, String resourceType, String identifier) throws IOException, URISyntaxException { + int statusCode = -1; + String response = ""; + String result = ""; + String endpoint = getIndexOnESEndpoint(); + HttpClient client = null; + URI postURI = new URIBuilder(endpoint) + .addParameter("datasource", datasource) + .addParameter("collectionId", collectionId) + .addParameter("resourceType", resourceType) + .addParameter("identifier", identifier) + .build(); + client = HttpClients.createDefault(); + HttpResponse res = client.execute(new HttpPost(postURI)); + if (res.getStatusLine()!=null) { + statusCode = res.getStatusLine().getStatusCode(); + } + HttpEntity entity = res.getEntity(); + result = EntityUtils.toString(entity); + return result; + } + + private List selectIdentifiers(String datasource, String collectionId, String resourceType) throws Exception { + int statusCode = -1; + String response = ""; + List identifiers = null; + String endpoint = getSelectIdentifiersEndpoint(); + HttpClient client = null; + URI getURI = new URIBuilder(endpoint) + .addParameter("datasource", datasource) + .addParameter("collectionId", collectionId) + .addParameter("resourceType", resourceType) + .build(); + client = HttpClients.createDefault(); + HttpResponse res = client.execute(new HttpGet(getURI)); + if (res.getStatusLine()!=null) { + statusCode = res.getStatusLine().getStatusCode(); + } + HttpEntity entity = res.getEntity(); + String content = EntityUtils.toString(entity); + String[] identifiersStr = content.split(","); + identifiers = Arrays.asList(identifiersStr); + return identifiers; + } + + @Override + protected void beforeStart(final Token token) { + token.setProgressProvider(new ProgressProvider() { + @Override + public String getProgressDescription() { + return IndexOnESJobNode.this.currentResourceToIndex + " / " + IndexOnESJobNode.this.totalResourceToIndex; + } + }); + } }