daje
This commit is contained in:
parent
b352a2ef16
commit
975b7d7e29
Binary file not shown.
|
@ -1,354 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
package org.gcube.socialnetworking.socialdataindexer;
|
|
||||||
import java.net.InetAddress;
|
|
||||||
import java.net.UnknownHostException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
|
||||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
|
||||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
|
||||||
import org.elasticsearch.action.bulk.BulkProcessor;
|
|
||||||
import org.elasticsearch.action.bulk.BulkRequest;
|
|
||||||
import org.elasticsearch.action.bulk.BulkResponse;
|
|
||||||
import org.elasticsearch.action.delete.DeleteResponse;
|
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
|
||||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
|
||||||
import org.elasticsearch.client.transport.TransportClient;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
|
||||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
|
||||||
import org.elasticsearch.search.SearchHit;
|
|
||||||
import org.gcube.common.scope.api.ScopeProvider;
|
|
||||||
import org.gcube.portal.databook.server.DBCassandraAstyanaxImpl;
|
|
||||||
import org.gcube.portal.databook.server.DatabookStore;
|
|
||||||
import org.gcube.portal.databook.shared.Attachment;
|
|
||||||
import org.gcube.portal.databook.shared.Comment;
|
|
||||||
import org.gcube.portal.databook.shared.EnhancedFeed;
|
|
||||||
import org.gcube.portal.databook.shared.Feed;
|
|
||||||
import org.gcube.socialnetworking.social_data_indexing_common.ex.BulkInsertionFailedException;
|
|
||||||
import org.gcube.socialnetworking.social_data_indexing_common.utils.ElasticSearchRunningCluster;
|
|
||||||
import org.gcube.socialnetworking.social_data_indexing_common.utils.IndexFields;
|
|
||||||
import org.gcube.vremanagement.executor.plugin.Plugin;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The SocialDataIndexerPlugin synchronizes and indexes data coming from the cassandra
|
|
||||||
* cluster in the elasticsearch engine.
|
|
||||||
* @author Costantino Perciante at ISTI-CNR
|
|
||||||
* @author Luca Frosini (ISTI-CNR)
|
|
||||||
*/
|
|
||||||
public class SocialDataIndexerPlugin extends Plugin {
|
|
||||||
|
|
||||||
//Logger
|
|
||||||
private static Logger logger = LoggerFactory.getLogger(SocialDataIndexerPlugin.class);
|
|
||||||
|
|
||||||
private String clusterName;
|
|
||||||
private List<String> hostsToContact;
|
|
||||||
private List<Integer> portNumbers;
|
|
||||||
|
|
||||||
private TransportClient client;
|
|
||||||
private DatabookStore store;
|
|
||||||
|
|
||||||
private int count = 0;
|
|
||||||
|
|
||||||
public SocialDataIndexerPlugin(){
|
|
||||||
super();
|
|
||||||
logger.debug("{} contructor", this.getClass().getSimpleName());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**{@inheritDoc}
|
|
||||||
* @throws Exception */
|
|
||||||
@Override
|
|
||||||
public void launch(Map<String, Object> inputs) throws Exception{
|
|
||||||
|
|
||||||
try{
|
|
||||||
|
|
||||||
logger.info("Reading scope from ScopeProvider");
|
|
||||||
|
|
||||||
String scope = ScopeProvider.instance.get();
|
|
||||||
|
|
||||||
logger.info("Scope read is " + scope);
|
|
||||||
|
|
||||||
// connection to cassandra (remove the first / from the scope)
|
|
||||||
store = new DBCassandraAstyanaxImpl(scope.replaceFirst("/", ""));
|
|
||||||
|
|
||||||
// retrieve ElasticSearch Endpoint and set hosts/port numbers (remove the first / from the scope)
|
|
||||||
ElasticSearchRunningCluster elasticCluster = new ElasticSearchRunningCluster(scope.replaceFirst("/", ""));
|
|
||||||
|
|
||||||
// save info
|
|
||||||
clusterName = elasticCluster.getClusterName();
|
|
||||||
hostsToContact = elasticCluster.getHosts();
|
|
||||||
portNumbers = elasticCluster.getPorts();
|
|
||||||
|
|
||||||
logger.info("Creating elasticsearch client connection for hosts = " + hostsToContact + ", ports = " + portNumbers + " and "
|
|
||||||
+ " cluster's name = " + clusterName);
|
|
||||||
|
|
||||||
// set cluster's name to check and the sniff property to true.
|
|
||||||
// Cluster's name: each node must have this name.
|
|
||||||
// Sniff property: allows the client to recover cluster's structure.
|
|
||||||
// Look at https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html
|
|
||||||
Settings settings = Settings.settingsBuilder()
|
|
||||||
.put("cluster.name", this.clusterName) // force unique cluster's name check
|
|
||||||
.put("client.transport.sniff", true)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// build the client
|
|
||||||
client = TransportClient.builder().settings(settings).build();
|
|
||||||
|
|
||||||
// add the nodes to contact
|
|
||||||
int reachableHosts = 0;
|
|
||||||
for (int i = 0; i < hostsToContact.size(); i++){
|
|
||||||
try {
|
|
||||||
client.addTransportAddress(
|
|
||||||
new InetSocketTransportAddress(
|
|
||||||
InetAddress.getByName(hostsToContact.get(i)), portNumbers.get(i))
|
|
||||||
);
|
|
||||||
reachableHosts ++;
|
|
||||||
} catch (UnknownHostException e) {
|
|
||||||
logger.error("Error while adding " + hostsToContact.get(i) + ":" + portNumbers.get(i) + " as host to be contacted.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if(reachableHosts == 0){
|
|
||||||
logger.error("Unable to reach elasticsearch cluster. Exiting ...");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info("Connection to ElasticSearch cluster done. Synchronization starts running...");
|
|
||||||
final long init = System.currentTimeMillis();
|
|
||||||
|
|
||||||
// we build a index reachable under the path /social/enhanced_feed
|
|
||||||
List<String> vreIds = store.getAllVREIds();
|
|
||||||
|
|
||||||
// save feeds & comments
|
|
||||||
for (String vreID : vreIds) {
|
|
||||||
try{
|
|
||||||
List<Feed> feeds = store.getAllFeedsByVRE(vreID);
|
|
||||||
addEnhancedFeedsInBulk(feeds, init);
|
|
||||||
logger.info("Number of indexed feeds is " + feeds.size() + " for vre " + vreID);
|
|
||||||
}catch(Exception e){
|
|
||||||
logger.error("Exception while saving feeds/comments into the index for vre " + vreID, e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info("Inserted " + count + " docs into the index");
|
|
||||||
|
|
||||||
// refresh data (note that the index must be refreshed according to the new value of _timestamp)
|
|
||||||
client.admin().indices().prepareRefresh().execute().actionGet();
|
|
||||||
|
|
||||||
// delete documents with timestamp lower than init
|
|
||||||
deleteDocumentsWithTimestampLowerThan(init);
|
|
||||||
|
|
||||||
long end = System.currentTimeMillis();
|
|
||||||
logger.info("Synchronization thread ends running. It took " + (end - init) + " milliseconds " +
|
|
||||||
" that is " + (double)(end - init)/(1000.0 * 60.0) + " minutes.");
|
|
||||||
|
|
||||||
}catch(Exception e){
|
|
||||||
logger.error("Error while synchronizing data.", e);
|
|
||||||
throw e;
|
|
||||||
}finally{
|
|
||||||
// close connection to elasticsearch
|
|
||||||
if(client != null){
|
|
||||||
logger.info("Closing connection to elasticsearch cluster. " + client.toString());
|
|
||||||
client.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
// close connection to cassandra
|
|
||||||
if(store != null){
|
|
||||||
logger.info("Closing connection to cassandra nodes. " + store.toString());
|
|
||||||
store.closeConnection();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add feeds into the elasticsearch index.
|
|
||||||
* @param feeds
|
|
||||||
* @param init is the timestamp that will be put in the document
|
|
||||||
* @throws BulkInsertionFailedException
|
|
||||||
*/
|
|
||||||
private void addEnhancedFeedsInBulk(List<Feed> feeds, final long init) throws BulkInsertionFailedException {
|
|
||||||
logger.debug("Starting bulk insert enhanced feeds operation");
|
|
||||||
BulkProcessor bulkProcessor = BulkProcessor.builder(
|
|
||||||
client,
|
|
||||||
new BulkProcessor.Listener() {
|
|
||||||
@Override
|
|
||||||
public void beforeBulk(long executionId,
|
|
||||||
BulkRequest request) {
|
|
||||||
logger.debug("Going to execute new bulk composed of {} actions", request.numberOfActions());
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public void afterBulk(long executionId,
|
|
||||||
BulkRequest request,
|
|
||||||
BulkResponse response) {
|
|
||||||
logger.debug("Executed bulk composed of {} actions", request.numberOfActions());
|
|
||||||
if (response.hasFailures()) {
|
|
||||||
logger.warn("There was failures while executing bulk", response.buildFailureMessage());
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
for (BulkItemResponse item : response.getItems()) {
|
|
||||||
if (item.isFailed()) {
|
|
||||||
logger.debug("Error for {}/{}/{} for {} operation: {}", item.getIndex(),
|
|
||||||
item.getType(), item.getId(), item.getOpType(), item.getFailureMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void afterBulk(long executionId,
|
|
||||||
BulkRequest request,
|
|
||||||
Throwable failure) {
|
|
||||||
logger.error("Error executing bulk", failure);
|
|
||||||
if(failure instanceof NoNodeAvailableException){
|
|
||||||
throw new RuntimeException("No node available. Exiting...");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.setBulkActions(1000)
|
|
||||||
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
|
|
||||||
.setFlushInterval(TimeValue.timeValueSeconds(5))
|
|
||||||
.setConcurrentRequests(0)
|
|
||||||
.setBackoffPolicy(
|
|
||||||
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 8))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// save feeds
|
|
||||||
for (Feed feed: feeds) {
|
|
||||||
|
|
||||||
String enhFeedUUID = null;
|
|
||||||
try{
|
|
||||||
// enhance and convert
|
|
||||||
String json = enhanceAndConvertToJson(feed);
|
|
||||||
enhFeedUUID = feed.getKey();
|
|
||||||
IndexRequest ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, enhFeedUUID)// set timestamp
|
|
||||||
.timestamp(String.valueOf(init)) // add json object
|
|
||||||
.source(json);
|
|
||||||
|
|
||||||
// add
|
|
||||||
bulkProcessor.add(ind);
|
|
||||||
|
|
||||||
count++;
|
|
||||||
}catch(Exception e){
|
|
||||||
logger.error("Skip inserting feed with id " + enhFeedUUID, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// close bulk operations
|
|
||||||
try {
|
|
||||||
bulkProcessor.awaitClose(60000, TimeUnit.MILLISECONDS);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
logger.debug("Interrupted while waiting for awaitClose()", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Build enhanced feed and convert to json
|
|
||||||
* @param feed to enhanced and convert
|
|
||||||
* @return json object
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
private String enhanceAndConvertToJson(Feed feed) throws Exception {
|
|
||||||
|
|
||||||
boolean isMultiFileUpload = feed.isMultiFileUpload();
|
|
||||||
|
|
||||||
// retrieve attachments
|
|
||||||
ArrayList<Attachment> attachments = new ArrayList<Attachment>();
|
|
||||||
if (isMultiFileUpload) {
|
|
||||||
logger.debug("Retrieving attachments for feed with id="+feed.getKey());
|
|
||||||
attachments = (ArrayList<Attachment>) store.getAttachmentsByFeedId(feed.getKey());
|
|
||||||
}
|
|
||||||
|
|
||||||
// retrieve comments
|
|
||||||
ArrayList<Comment> comments = getAllCommentsByFeed(feed.getKey());
|
|
||||||
|
|
||||||
// build enhanced feed
|
|
||||||
EnhancedFeed enFeed = new EnhancedFeed(feed, false, false, comments, attachments);
|
|
||||||
|
|
||||||
// convert to json
|
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
|
||||||
return mapper.writeValueAsString(enFeed);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* retrieve and sort comments given a feed id
|
|
||||||
* @param feedid
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
private ArrayList<Comment> getAllCommentsByFeed(String feedid) {
|
|
||||||
// logger.debug("Asking comments for " + feedid);
|
|
||||||
ArrayList<Comment> toReturn = (ArrayList<Comment>) store.getAllCommentByFeed(feedid);
|
|
||||||
Collections.sort(toReturn);
|
|
||||||
return toReturn;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Delete disabled feeds in the index
|
|
||||||
* @param timestamp delete feeds whose _timestamp is lower than timestamp
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
public void deleteDocumentsWithTimestampLowerThan(long timestamp) {
|
|
||||||
|
|
||||||
logger.debug("Removing docs with timestamp lower than " + timestamp);
|
|
||||||
|
|
||||||
// query on _timestamp field
|
|
||||||
BoolQueryBuilder filter = QueryBuilders.boolQuery();
|
|
||||||
filter.must(QueryBuilders.matchAllQuery());
|
|
||||||
filter.filter(QueryBuilders.rangeQuery("_timestamp").gte(0).lt(timestamp));
|
|
||||||
|
|
||||||
SearchResponse scrollResp = client.prepareSearch(IndexFields.INDEX_NAME)
|
|
||||||
.setSize(100) // get 100 elements at most at time
|
|
||||||
.setScroll(new TimeValue(60000)) // keep alive this query for 1 minute
|
|
||||||
.setQuery(filter)
|
|
||||||
.execute()
|
|
||||||
.actionGet();
|
|
||||||
|
|
||||||
int deleteDocs = 0;
|
|
||||||
|
|
||||||
//Scroll until no more hits are returned
|
|
||||||
while (true) {
|
|
||||||
|
|
||||||
for (SearchHit hit : scrollResp.getHits().getHits()) {
|
|
||||||
|
|
||||||
String docID = hit.getId();
|
|
||||||
DeleteResponse response = client.prepareDelete(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, docID).get();
|
|
||||||
//logger.debug("deleting doc with id = " + docID + "...found? " + response.isFound()); // found of course..
|
|
||||||
if(response.isFound())
|
|
||||||
deleteDocs ++;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// more enhanced feeds requested
|
|
||||||
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
|
|
||||||
|
|
||||||
//Break condition: No hits are returned
|
|
||||||
if (scrollResp.getHits().getHits().length == 0) {
|
|
||||||
logger.debug("No more hits to delete");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info("Number of delete documents is " + deleteDocs);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**{@inheritDoc}*/
|
|
||||||
@Override
|
|
||||||
protected void onStop() throws Exception {
|
|
||||||
logger.debug("onStop()");
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
Binary file not shown.
|
@ -1,176 +0,0 @@
|
||||||
package org.gcube.socialnetworking.socialdataindexer.utils;
|
|
||||||
|
|
||||||
import static org.gcube.resources.discovery.icclient.ICFactory.client;
|
|
||||||
import static org.gcube.resources.discovery.icclient.ICFactory.queryFor;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.HttpURLConnection;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.http.Header;
|
|
||||||
import org.apache.http.HttpResponse;
|
|
||||||
import org.apache.http.client.ClientProtocolException;
|
|
||||||
import org.apache.http.client.methods.HttpPost;
|
|
||||||
import org.apache.http.entity.ContentType;
|
|
||||||
import org.apache.http.entity.StringEntity;
|
|
||||||
import org.apache.http.impl.client.CloseableHttpClient;
|
|
||||||
import org.apache.http.impl.client.HttpClientBuilder;
|
|
||||||
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
|
|
||||||
import org.gcube.common.resources.gcore.GCoreEndpoint;
|
|
||||||
import org.gcube.common.scope.api.ScopeProvider;
|
|
||||||
import org.gcube.resources.discovery.client.api.DiscoveryClient;
|
|
||||||
import org.gcube.resources.discovery.client.queries.api.SimpleQuery;
|
|
||||||
import org.gcube.vremanagement.executor.plugin.PluginStateEvolution;
|
|
||||||
import org.gcube.vremanagement.executor.plugin.PluginStateNotification;
|
|
||||||
import org.json.simple.JSONObject;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Send a notification to the interested person.
|
|
||||||
* @author Costantino Perciante at ISTI-CNR (costantino.perciante@isti.cnr.it)
|
|
||||||
*/
|
|
||||||
public class SendNotification extends PluginStateNotification {
|
|
||||||
|
|
||||||
private Map<String, String> pluginInputs;
|
|
||||||
private static Logger logger = LoggerFactory.getLogger(SendNotification.class);
|
|
||||||
private static final String NOTIFY_METHOD = "2/notifications/notify-job-status";
|
|
||||||
private static final String resource = "jersey-servlet";
|
|
||||||
private static final String serviceName = "SocialNetworking";
|
|
||||||
private static final String serviceClass = "Portal";
|
|
||||||
|
|
||||||
// user to contact on exception via social-networking ws
|
|
||||||
private final static String RECIPIENT_KEY = "recipient";
|
|
||||||
|
|
||||||
// service name
|
|
||||||
private final static String SERVICE_NAME = "Smart-Executor";
|
|
||||||
|
|
||||||
public SendNotification(Map<String, String> inputs) {
|
|
||||||
super(inputs);
|
|
||||||
this.pluginInputs = inputs;
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Override
|
|
||||||
public void pluginStateEvolution(PluginStateEvolution pluginStateEvolution,
|
|
||||||
Exception exception) throws Exception {
|
|
||||||
|
|
||||||
switch(pluginStateEvolution.getPluginState()){
|
|
||||||
|
|
||||||
// case DONE:
|
|
||||||
case STOPPED:
|
|
||||||
case FAILED:
|
|
||||||
case DISCARDED:
|
|
||||||
|
|
||||||
// check what happened
|
|
||||||
String recipient = pluginInputs.get(RECIPIENT_KEY);
|
|
||||||
String basePath = discoverEndPoint();
|
|
||||||
logger.info("Recipient of the notification is " + recipient + ". Base path found for the notification service is " + basePath);
|
|
||||||
|
|
||||||
if(basePath != null && recipient != null){
|
|
||||||
|
|
||||||
basePath = basePath.endsWith("/") ? basePath : basePath + "/";
|
|
||||||
basePath += NOTIFY_METHOD + "?gcube-token=" + SecurityTokenProvider.instance.get();
|
|
||||||
basePath = basePath.trim();
|
|
||||||
|
|
||||||
try(CloseableHttpClient httpClient = HttpClientBuilder.create().build()){
|
|
||||||
JSONObject obj = new JSONObject();
|
|
||||||
obj.put("job_id", pluginStateEvolution.getUUID().toString());
|
|
||||||
obj.put("recipient", recipient);
|
|
||||||
obj.put("job_name", pluginStateEvolution.getPluginDefinition().getName());
|
|
||||||
obj.put("service_name", SERVICE_NAME);
|
|
||||||
// if(pluginStateEvolution.getPluginState().equals(PluginState.DONE))
|
|
||||||
// obj.put("status", "SUCCEEDED");
|
|
||||||
// else{
|
|
||||||
obj.put("status", "FAILED");
|
|
||||||
obj.put("status_message", "original status reported by " + SERVICE_NAME + " was " + pluginStateEvolution.getPluginState()
|
|
||||||
+ ". Exception is " + exception != null ? exception.getMessage() : null);
|
|
||||||
// }
|
|
||||||
logger.debug("Request json is going to be " + obj.toJSONString());
|
|
||||||
|
|
||||||
HttpResponse response = performRequest(httpClient, basePath, obj.toJSONString());
|
|
||||||
|
|
||||||
logger.info(response.getStatusLine().getStatusCode() + " and response message is " + response.getStatusLine().getReasonPhrase());
|
|
||||||
|
|
||||||
int status = response.getStatusLine().getStatusCode();
|
|
||||||
|
|
||||||
if(status == HttpURLConnection.HTTP_OK){
|
|
||||||
logger.info("Notification sent");
|
|
||||||
}
|
|
||||||
else if(status == HttpURLConnection.HTTP_MOVED_TEMP
|
|
||||||
|| status == HttpURLConnection.HTTP_MOVED_PERM
|
|
||||||
|| status == HttpURLConnection.HTTP_SEE_OTHER){
|
|
||||||
|
|
||||||
// redirect -> fetch new location
|
|
||||||
Header[] locations = response.getHeaders("Location");
|
|
||||||
Header lastLocation = locations[locations.length - 1];
|
|
||||||
String realLocation = lastLocation.getValue();
|
|
||||||
logger.info("New location is " + realLocation);
|
|
||||||
|
|
||||||
response = performRequest(httpClient, realLocation, obj.toJSONString());
|
|
||||||
|
|
||||||
logger.info(" " + response.getStatusLine().getStatusCode() + " and response message is " + response.getStatusLine().getReasonPhrase());
|
|
||||||
}else
|
|
||||||
logger.warn(" " + response.getStatusLine().getStatusCode() + " and response message is " + response.getStatusLine().getReasonPhrase());
|
|
||||||
}catch (Exception e) {
|
|
||||||
logger.warn("Something failed when trying to notify the user", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default: logger.info("No notification is going to be sent, because the status of the plugin execution is " + pluginStateEvolution.getPluginState().name());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Perform the post/json request
|
|
||||||
* @param httpClient
|
|
||||||
* @param path
|
|
||||||
* @param params
|
|
||||||
* @return
|
|
||||||
* @throws ClientProtocolException
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private static HttpResponse performRequest(CloseableHttpClient httpClient, String path, String params) throws ClientProtocolException, IOException{
|
|
||||||
|
|
||||||
HttpPost request = new HttpPost(path);
|
|
||||||
StringEntity paramsEntity = new StringEntity(params, ContentType.APPLICATION_JSON);
|
|
||||||
request.setEntity(paramsEntity);
|
|
||||||
return httpClient.execute(request);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Discover the social networking service base path.
|
|
||||||
* @return base path of the service.
|
|
||||||
*/
|
|
||||||
private static String discoverEndPoint(){
|
|
||||||
String context = ScopeProvider.instance.get();
|
|
||||||
String basePath = null;
|
|
||||||
try{
|
|
||||||
|
|
||||||
SimpleQuery query = queryFor(GCoreEndpoint.class);
|
|
||||||
query.addCondition(String.format("$resource/Profile/ServiceClass/text() eq '%s'",serviceClass));
|
|
||||||
query.addCondition("$resource/Profile/DeploymentData/Status/text() eq 'ready'");
|
|
||||||
query.addCondition(String.format("$resource/Profile/ServiceName/text() eq '%s'",serviceName));
|
|
||||||
query.setResult("$resource/Profile/AccessPoint/RunningInstanceInterfaces//Endpoint[@EntryName/string() eq \""+resource+"\"]/text()");
|
|
||||||
|
|
||||||
DiscoveryClient<String> client = client();
|
|
||||||
List<String> endpoints = client.submit(query);
|
|
||||||
if (endpoints == null || endpoints.isEmpty())
|
|
||||||
throw new Exception("Cannot retrieve the GCoreEndpoint serviceName: "+serviceName +", serviceClass: " +serviceClass +", in scope: "+context);
|
|
||||||
|
|
||||||
basePath = endpoints.get(0);
|
|
||||||
if(basePath==null)
|
|
||||||
throw new Exception("Endpoint:"+resource+", is null for serviceName: "+serviceName +", serviceClass: " +serviceClass +", in scope: "+context);
|
|
||||||
|
|
||||||
logger.info("found entryname "+basePath+" for ckanResource: "+resource);
|
|
||||||
|
|
||||||
}catch(Exception e){
|
|
||||||
logger.error("Unable to retrieve such service endpoint information!", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return basePath;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
Binary file not shown.
|
@ -1,34 +0,0 @@
|
||||||
package org.gcube.socialnetworking.socialdataindexer;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
public class Tests {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Logger
|
|
||||||
*/
|
|
||||||
private static Logger logger = LoggerFactory.getLogger(Tests.class);
|
|
||||||
|
|
||||||
// @Before
|
|
||||||
public void beforeTest(){
|
|
||||||
}
|
|
||||||
|
|
||||||
// @Test
|
|
||||||
public void testLaunch() {
|
|
||||||
logger.debug("Starting to test launch");
|
|
||||||
Map<String, Object> inputs = new HashMap<String, Object>();
|
|
||||||
inputs.put("scope", "gcube");
|
|
||||||
@SuppressWarnings("unused")
|
|
||||||
SocialDataIndexerPlugin plugin = new SocialDataIndexerPlugin();
|
|
||||||
//uncomment for testing purpose
|
|
||||||
//plugin.launch(inputs);
|
|
||||||
logger.debug("-------------- launch test finished");
|
|
||||||
}
|
|
||||||
|
|
||||||
// @After
|
|
||||||
public void after(){
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue