The node retrieves the resource identifier lists for collections and records, then loops over the two identifier lists and requests indexing of each identifier from the REST module

This commit is contained in:
Enrico Ottonello 2020-10-20 17:43:27 +02:00
parent 1398c2495e
commit 8073ac7131
1 changed files with 144 additions and 78 deletions

View File

@ -1,33 +1,27 @@
package eu.dnetlib.ariadneplus.workflows.nodes; package eu.dnetlib.ariadneplus.workflows.nodes;
import com.google.common.collect.Lists;
import eu.dnetlib.msro.workflows.graph.Arc; import eu.dnetlib.msro.workflows.graph.Arc;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
import eu.dnetlib.msro.workflows.nodes.is.ValidateProfilesJobNode;
import eu.dnetlib.msro.workflows.procs.Env; import eu.dnetlib.msro.workflows.procs.Env;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants; import eu.dnetlib.msro.workflows.procs.Token;
import eu.dnetlib.msro.workflows.util.ProgressProvider;
import eu.dnetlib.rmi.manager.MSROException; import eu.dnetlib.rmi.manager.MSROException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder; import org.apache.http.client.utils.URIBuilder;
import org.apache.http.config.SocketConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List; import java.util.List;
@ -38,88 +32,88 @@ public class IndexOnESJobNode extends AsyncJobNode {
private String publisherEndpoint; private String publisherEndpoint;
private String datasourceInterface; private String datasourceInterface;
private String datasource; private String datasource;
private int currentResourceToIndex = 0;
private int totalResourceToIndex = 0;
// @Override
// protected String execute(final Env env) throws Exception {
//
// int statusCode = -1;
// String response = "";
// log.info("IndexOnES endpoint: " + getIndexOnESEndpoint());
// HttpClient client = null;
// try {
// String[] splits = getDatasourceInterface().split("::");
// String datasource = splits[2];
// String collectionId = splits[3];
//
// URI getURI = new URIBuilder(getIndexOnESEndpoint())
// .addParameter("datasource", datasource)
// .addParameter("collectionId", collectionId)
// .build();
// client = HttpClients.createDefault();
// HttpResponse res = client.execute(new HttpGet(getURI));
// response = EntityUtils.toString(res.getEntity());
// if (res.getStatusLine()!=null) {
// statusCode = res.getStatusLine().getStatusCode();
// }
//
// }
// catch (Throwable t) {
// log.error(t);
// throw new MSROException("Indexing on Elastic Search: " + t.getMessage());
// }
//
// finally{
// }
//
// env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "statusCode", Integer.toString(statusCode));
// env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "response", response);
//
// if (statusCode!=200) {
// throw new MSROException("Error from Publisher endpoint [ status code: " + statusCode + " ]");
// }
//
// return Arc.DEFAULT_ARC;
// }
@Override @Override
protected String execute(final Env env) throws Exception { protected String execute(final Env env) throws Exception {
int statusCode = -1; final String collectionResourceType = "COLLECTION";
String response = ""; final String recordResourceType = "RECORD";
// String indexOnESResult = "noResult";
// log.info("Publisher endpoint: " + getPublisherEndpoint());
// SocketConfig socketConfig = SocketConfig.custom().setSoKeepAlive(true).build();
// CloseableHttpClient client = HttpClientBuilder.create()
// .setDefaultSocketConfig(socketConfig).build();
log.info("IndexOnES endpoint: " + getIndexOnESEndpoint());
// CloseableHttpResponse responsePOST = null;
HttpClient client = null;
try { try {
// HttpPost post = new HttpPost(getIndexOnESEndpoint());
// List<NameValuePair> params = Lists.newArrayList();
String[] splits = getDatasourceInterface().split("::"); String[] splits = getDatasourceInterface().split("::");
String datasource = splits[2]; String datasource = splits[2];
String collectionId = splits[3]; String collectionId = splits[3];
// params.add(new BasicNameValuePair("datasource", datasource));
// params.add(new BasicNameValuePair("collectionId", collectionId));
// UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8");
// post.setEntity(ent);
// log.info("Calling IndexOnES endpoint with params: "+getDatasource()+" "+getDatasourceInterface());
// responsePOST = client.execute(post);
URI getURI = new URIBuilder(getIndexOnESEndpoint()) List<String> collectionIdentifiers = selectIdentifiers(datasource, collectionId, collectionResourceType);
.addParameter("datasource", datasource) if (!collectionIdentifiers.isEmpty()) {
.addParameter("collectionId", collectionId) collectionIdentifiers.forEach(identifier -> {
.build(); try {
client = HttpClients.createDefault(); indexing(datasource, collectionId, collectionResourceType, cleanIdentifier(identifier));
HttpResponse res = client.execute(new HttpGet(getURI)); } catch (Throwable t) {
response = EntityUtils.toString(res.getEntity()); log.error(identifier+" "+t);
if (res.getStatusLine()!=null) { }
statusCode = res.getStatusLine().getStatusCode(); });
} }
// statusCode = responsePOST.getStatusLine().getStatusCode(); List<String> recordIdentifiers = selectIdentifiers(datasource, collectionId, recordResourceType);
// try(InputStream responseBody = responsePOST.getEntity().getContent()) { if (!recordIdentifiers.isEmpty()) {
// indexOnESResult = IOUtils.toString(responseBody, "UTF-8"); recordIdentifiers.forEach(identifier -> {
// } catch (Exception e) { try {
// log.error(e); indexing(datasource, collectionId, recordResourceType, cleanIdentifier(identifier));
// } } catch (Throwable t) {
// switch (statusCode) { log.error(identifier+" "+t);
// case 200: }
// log.info("index on ES completed"); });
// break; }
// default:
// log.error("error indexing on ES " + responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase());
// break;
// }
} }
// catch (ConnectException ce) {
// log.error(ce);
// throw new MSROException("Unable to connect to Publisher endpoint" + getIndexOnESEndpoint());
// }
// catch (IOException e) {
// log.error(e);
// throw new MSROException("IO Error" + getIndexOnESEndpoint());
// }
catch (Throwable t) { catch (Throwable t) {
log.error(t); log.error(t);
throw new MSROException("Indexing on Elastic Search: " + t.getMessage()); throw new MSROException("Indexing on Elastic Search: " + t.getMessage());
} }
finally{
// if(responsePOST != null) responsePOST.close();
// client.close();
// cm.shutdown();
}
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "statusCode", Integer.toString(statusCode));
// env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "indexResult", indexOnESResult);
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "response", response);
if (statusCode!=200) {
throw new MSROException("Error from Publisher endpoint [ status code: " + statusCode + " ]");
}
return Arc.DEFAULT_ARC; return Arc.DEFAULT_ARC;
} }
@ -128,7 +122,11 @@ public class IndexOnESJobNode extends AsyncJobNode {
} }
private String getIndexOnESEndpoint() { private String getIndexOnESEndpoint() {
return publisherEndpoint.concat("/indexOnES"); return publisherEndpoint.concat("/indexOnESByIdentifier");
}
private String getSelectIdentifiersEndpoint() {
return publisherEndpoint.concat("/selectIdentifiers");
} }
public void setPublisherEndpoint(final String publisherEndpoint) { public void setPublisherEndpoint(final String publisherEndpoint) {
@ -151,4 +149,72 @@ public class IndexOnESJobNode extends AsyncJobNode {
this.datasource = datasource; this.datasource = datasource;
} }
/**
 * Removes every {@code [}, {@code ]} and {@code "} character from an identifier.
 *
 * <p>Used on the comma-separated fragments produced by {@code selectIdentifiers},
 * which may still carry array brackets and quotes around each value.
 *
 * @param identifier the raw identifier fragment; may be {@code null}
 * @return the identifier without brackets and double quotes, or {@code null}
 *         when {@code identifier} is {@code null}
 */
private String cleanIdentifier(String identifier) {
    // Explicit null guard instead of catching (and silently swallowing) the
    // NullPointerException; the result for null input is still null.
    if (identifier == null) {
        return null;
    }
    return identifier
            .replace("[", "")
            .replace("]", "")
            .replace("\"", "");
}
/**
 * Requests the indexing of a single resource by sending a POST to the URL
 * returned by {@code getIndexOnESEndpoint()}, passing all arguments as query
 * parameters.
 *
 * @param datasource   value of the {@code datasource} query parameter
 * @param collectionId value of the {@code collectionId} query parameter
 * @param resourceType value of the {@code resourceType} query parameter
 * @param identifier   value of the {@code identifier} query parameter
 * @return the response body as a string, or an empty string when the response
 *         carries no entity
 * @throws IOException        on transport errors or when the endpoint answers
 *                            with a non-2xx status code
 * @throws URISyntaxException when the request URI cannot be built
 */
private String indexing(String datasource, String collectionId, String resourceType, String identifier) throws IOException, URISyntaxException {
    URI postURI = new URIBuilder(getIndexOnESEndpoint())
            .addParameter("datasource", datasource)
            .addParameter("collectionId", collectionId)
            .addParameter("resourceType", resourceType)
            .addParameter("identifier", identifier)
            .build();
    // Fully-qualified closeable types so the client and the response are
    // always released; one client is created per call, so leaking it would
    // leak a connection pool for every indexed identifier.
    try (org.apache.http.impl.client.CloseableHttpClient client = HttpClients.createDefault();
         org.apache.http.client.methods.CloseableHttpResponse res = client.execute(new HttpPost(postURI))) {
        int statusCode = res.getStatusLine() != null ? res.getStatusLine().getStatusCode() : -1;
        HttpEntity entity = res.getEntity();
        // EntityUtils.toString rejects a null entity, so guard body-less responses.
        String result = entity != null ? EntityUtils.toString(entity) : "";
        if (statusCode < 200 || statusCode >= 300) {
            // Surface the failure instead of discarding the status code.
            throw new IOException("Indexing request failed for " + resourceType + " " + identifier
                    + " [ status code: " + statusCode + " ]");
        }
        return result;
    }
}
/**
 * Retrieves the identifiers of the resources of one type by sending a GET to
 * the URL returned by {@code getSelectIdentifiersEndpoint()}.
 *
 * <p>The response body is split on commas; the resulting fragments are returned
 * as-is (brackets and quotes are not removed here).
 *
 * @param datasource   value of the {@code datasource} query parameter
 * @param collectionId value of the {@code collectionId} query parameter
 * @param resourceType value of the {@code resourceType} query parameter
 * @return the comma-separated fragments of the response body, or an empty list
 *         when the body is missing, blank, or contains only brackets
 * @throws Exception on URI/transport errors or when the endpoint answers with
 *                   a non-2xx status code
 */
private List<String> selectIdentifiers(String datasource, String collectionId, String resourceType) throws Exception {
    URI getURI = new URIBuilder(getSelectIdentifiersEndpoint())
            .addParameter("datasource", datasource)
            .addParameter("collectionId", collectionId)
            .addParameter("resourceType", resourceType)
            .build();
    String content;
    // Fully-qualified closeable types so the client and the response are always released.
    try (org.apache.http.impl.client.CloseableHttpClient client = HttpClients.createDefault();
         org.apache.http.client.methods.CloseableHttpResponse res = client.execute(new HttpGet(getURI))) {
        int statusCode = res.getStatusLine() != null ? res.getStatusLine().getStatusCode() : -1;
        if (statusCode < 200 || statusCode >= 300) {
            // Without this check an error body would be split on commas and
            // every fragment treated as an identifier to index.
            throw new IOException("Identifier selection failed for " + resourceType
                    + " [ status code: " + statusCode + " ]");
        }
        HttpEntity entity = res.getEntity();
        // EntityUtils.toString rejects a null entity, so guard body-less responses.
        content = entity != null ? EntityUtils.toString(entity) : "";
    }
    // "".split(",") yields a single empty string and "[]" a single "[]" token;
    // both would make the caller request indexing of an empty identifier.
    String withoutBrackets = content.replace("[", "").replace("]", "").trim();
    if (withoutBrackets.isEmpty()) {
        return java.util.Collections.emptyList();
    }
    return Arrays.asList(content.split(","));
}
/**
 * Installs a progress provider on the given token. Each time the provider is
 * queried it reads this node's {@code currentResourceToIndex} and
 * {@code totalResourceToIndex} fields and renders them as
 * {@code "<current> / <total>"}.
 *
 * @param token the token on which the progress provider is set
 */
@Override
protected void beforeStart(final Token token) {
    final ProgressProvider indexingProgress = new ProgressProvider() {
        @Override
        public String getProgressDescription() {
            // Read the counters at query time so the description reflects their current values.
            final int done = IndexOnESJobNode.this.currentResourceToIndex;
            final int total = IndexOnESJobNode.this.totalResourceToIndex;
            return done + " / " + total;
        }
    };
    token.setProgressProvider(indexingProgress);
}
} }