|
|
|
@ -30,12 +30,11 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
|
|
|
|
|
import org.elasticsearch.index.query.QueryBuilders;
|
|
|
|
|
import org.elasticsearch.search.SearchHit;
|
|
|
|
|
import org.gcube.common.scope.api.ScopeProvider;
|
|
|
|
|
import org.gcube.portal.databook.server.DBCassandraAstyanaxImpl;
|
|
|
|
|
import org.gcube.portal.databook.server.DatabookStore;
|
|
|
|
|
import org.gcube.portal.databook.shared.Attachment;
|
|
|
|
|
import org.gcube.portal.databook.shared.Comment;
|
|
|
|
|
import org.gcube.portal.databook.shared.EnhancedFeed;
|
|
|
|
|
import org.gcube.portal.databook.shared.Feed;
|
|
|
|
|
import org.gcube.social_networking.social_networking_client_library.LibClient;
|
|
|
|
|
import org.gcube.social_networking.socialnetworking.model.shared.Attachment;
|
|
|
|
|
import org.gcube.social_networking.socialnetworking.model.shared.Comment;
|
|
|
|
|
import org.gcube.social_networking.socialnetworking.model.shared.EnhancedPost;
|
|
|
|
|
import org.gcube.social_networking.socialnetworking.model.shared.Post;
|
|
|
|
|
import org.gcube.socialnetworking.social_data_indexing_common.ex.BulkInsertionFailedException;
|
|
|
|
|
import org.gcube.socialnetworking.social_data_indexing_common.utils.ElasticSearchRunningCluster;
|
|
|
|
|
import org.gcube.socialnetworking.social_data_indexing_common.utils.IndexFields;
|
|
|
|
@ -59,7 +58,10 @@ public class SocialDataIndexerPlugin extends Plugin {
|
|
|
|
|
private List<Integer> portNumbers;
|
|
|
|
|
|
|
|
|
|
private TransportClient client;
|
|
|
|
|
private DatabookStore store;
|
|
|
|
|
|
|
|
|
|
//possible bug
|
|
|
|
|
//should the context be specified to the client??!
|
|
|
|
|
private LibClient libClient;
|
|
|
|
|
|
|
|
|
|
private int count = 0;
|
|
|
|
|
|
|
|
|
@ -82,7 +84,7 @@ public class SocialDataIndexerPlugin extends Plugin {
|
|
|
|
|
logger.info("Scope read is " + scope);
|
|
|
|
|
|
|
|
|
|
// connection to cassandra (remove the first / from the scope)
|
|
|
|
|
store = new DBCassandraAstyanaxImpl(scope.replaceFirst("/", ""));
|
|
|
|
|
libClient = new LibClient();
|
|
|
|
|
|
|
|
|
|
// retrieve ElasticSearch Endpoint and set hosts/port numbers (remove the first / from the scope)
|
|
|
|
|
ElasticSearchRunningCluster elasticCluster = new ElasticSearchRunningCluster(scope.replaceFirst("/", ""));
|
|
|
|
@ -129,17 +131,17 @@ public class SocialDataIndexerPlugin extends Plugin {
|
|
|
|
|
logger.info("Connection to ElasticSearch cluster done. Synchronization starts running...");
|
|
|
|
|
final long init = System.currentTimeMillis();
|
|
|
|
|
|
|
|
|
|
// we build a index reachable under the path /social/enhanced_feed
|
|
|
|
|
List<String> vreIds = store.getAllVREIds();
|
|
|
|
|
// we build a index reachable under the path /social/enhanced_post
|
|
|
|
|
List<String> vreIds = libClient.getAllVREIdsLib();
|
|
|
|
|
|
|
|
|
|
// save feeds & comments
|
|
|
|
|
// save posts & comments
|
|
|
|
|
for (String vreID : vreIds) {
|
|
|
|
|
try{
|
|
|
|
|
List<Feed> feeds = store.getAllFeedsByVRE(vreID);
|
|
|
|
|
addEnhancedFeedsInBulk(feeds, init);
|
|
|
|
|
logger.info("Number of indexed feeds is " + feeds.size() + " for vre " + vreID);
|
|
|
|
|
List<Post> posts = libClient.getAllPostsByVRELib(vreID);
|
|
|
|
|
addEnhancedPostsInBulk(posts, init);
|
|
|
|
|
logger.info("Number of indexed posts is " + posts.size() + " for vre " + vreID);
|
|
|
|
|
}catch(Exception e){
|
|
|
|
|
logger.error("Exception while saving feeds/comments into the index for vre " + vreID, e);
|
|
|
|
|
logger.error("Exception while saving posts/comments into the index for vre " + vreID, e);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -166,23 +168,17 @@ public class SocialDataIndexerPlugin extends Plugin {
|
|
|
|
|
logger.info("Closing connection to elasticsearch cluster. " + client.toString());
|
|
|
|
|
client.close();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// close connection to cassandra
|
|
|
|
|
if(store != null){
|
|
|
|
|
logger.info("Closing connection to cassandra nodes. " + store.toString());
|
|
|
|
|
store.closeConnection();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Add feeds into the elasticsearch index.
|
|
|
|
|
* @param feeds
|
|
|
|
|
* Add posts into the elasticsearch index.
|
|
|
|
|
* @param posts
|
|
|
|
|
* @param init is the timestamp that will be put in the document
|
|
|
|
|
* @throws BulkInsertionFailedException
|
|
|
|
|
*/
|
|
|
|
|
private void addEnhancedFeedsInBulk(List<Feed> feeds, final long init) throws BulkInsertionFailedException {
|
|
|
|
|
logger.debug("Starting bulk insert enhanced feeds operation");
|
|
|
|
|
private void addEnhancedPostsInBulk(List<Post> posts, final long init) throws BulkInsertionFailedException {
|
|
|
|
|
logger.debug("Starting bulk insert enhanced posts operation");
|
|
|
|
|
BulkProcessor bulkProcessor = BulkProcessor.builder(
|
|
|
|
|
client,
|
|
|
|
|
new BulkProcessor.Listener() {
|
|
|
|
@ -227,15 +223,15 @@ public class SocialDataIndexerPlugin extends Plugin {
|
|
|
|
|
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 8))
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
// save feeds
|
|
|
|
|
for (Feed feed: feeds) {
|
|
|
|
|
// save posts
|
|
|
|
|
for (Post post: posts) {
|
|
|
|
|
|
|
|
|
|
String enhFeedUUID = null;
|
|
|
|
|
String enhPostUUID = null;
|
|
|
|
|
try{
|
|
|
|
|
// enhance and convert
|
|
|
|
|
String json = enhanceAndConvertToJson(feed);
|
|
|
|
|
enhFeedUUID = feed.getKey();
|
|
|
|
|
IndexRequest ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, enhFeedUUID)// set timestamp
|
|
|
|
|
String json = enhanceAndConvertToJson(post);
|
|
|
|
|
enhPostUUID = post.getKey();
|
|
|
|
|
IndexRequest ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, enhPostUUID)// set timestamp
|
|
|
|
|
.timestamp(String.valueOf(init)) // add json object
|
|
|
|
|
.source(json);
|
|
|
|
|
|
|
|
|
@ -244,7 +240,7 @@ public class SocialDataIndexerPlugin extends Plugin {
|
|
|
|
|
|
|
|
|
|
count++;
|
|
|
|
|
}catch(Exception e){
|
|
|
|
|
logger.error("Skip inserting feed with id " + enhFeedUUID, e);
|
|
|
|
|
logger.error("Skip inserting post with id " + enhPostUUID, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -257,48 +253,48 @@ public class SocialDataIndexerPlugin extends Plugin {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Build enhanced feed and convert to json
|
|
|
|
|
* @param feed to enhanced and convert
|
|
|
|
|
* Build enhanced post and convert to json
|
|
|
|
|
* @param post to enhanced and convert
|
|
|
|
|
* @return json object
|
|
|
|
|
* @throws Exception
|
|
|
|
|
*/
|
|
|
|
|
private String enhanceAndConvertToJson(Feed feed) throws Exception {
|
|
|
|
|
private String enhanceAndConvertToJson(Post post) throws Exception {
|
|
|
|
|
|
|
|
|
|
boolean isMultiFileUpload = feed.isMultiFileUpload();
|
|
|
|
|
boolean isMultiFileUpload = post.isMultiFileUpload();
|
|
|
|
|
|
|
|
|
|
// retrieve attachments
|
|
|
|
|
ArrayList<Attachment> attachments = new ArrayList<Attachment>();
|
|
|
|
|
if (isMultiFileUpload) {
|
|
|
|
|
logger.debug("Retrieving attachments for feed with id="+feed.getKey());
|
|
|
|
|
attachments = (ArrayList<Attachment>) store.getAttachmentsByFeedId(feed.getKey());
|
|
|
|
|
logger.debug("Retrieving attachments for post with id="+post.getKey());
|
|
|
|
|
attachments = (ArrayList<Attachment>) libClient.getAttachmentsByPostIdLib(post.getKey());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// retrieve comments
|
|
|
|
|
ArrayList<Comment> comments = getAllCommentsByFeed(feed.getKey());
|
|
|
|
|
ArrayList<Comment> comments = getAllCommentsByPost(post.getKey());
|
|
|
|
|
|
|
|
|
|
// build enhanced feed
|
|
|
|
|
EnhancedFeed enFeed = new EnhancedFeed(feed, false, false, comments, attachments);
|
|
|
|
|
// build enhanced post
|
|
|
|
|
EnhancedPost enPost = new EnhancedPost(post, false, false, comments, attachments);
|
|
|
|
|
|
|
|
|
|
// convert to json
|
|
|
|
|
ObjectMapper mapper = new ObjectMapper();
|
|
|
|
|
return mapper.writeValueAsString(enFeed);
|
|
|
|
|
return mapper.writeValueAsString(enPost);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* retrieve and sort comments given a feed id
|
|
|
|
|
* @param feedid
|
|
|
|
|
* retrieve and sort comments given a post id
|
|
|
|
|
* @param postid
|
|
|
|
|
* @return
|
|
|
|
|
*/
|
|
|
|
|
private ArrayList<Comment> getAllCommentsByFeed(String feedid) {
|
|
|
|
|
// logger.debug("Asking comments for " + feedid);
|
|
|
|
|
ArrayList<Comment> toReturn = (ArrayList<Comment>) store.getAllCommentByFeed(feedid);
|
|
|
|
|
private ArrayList<Comment> getAllCommentsByPost(String postid) {
|
|
|
|
|
// logger.debug("Asking comments for " + postid);
|
|
|
|
|
ArrayList<Comment> toReturn = (ArrayList<Comment>) libClient.getAllCommentsByPostIdLib(postid);
|
|
|
|
|
Collections.sort(toReturn);
|
|
|
|
|
return toReturn;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Delete disabled feeds in the index
|
|
|
|
|
* @param timestamp delete feeds whose _timestamp is lower than timestamp
|
|
|
|
|
* Delete disabled posts in the index
|
|
|
|
|
* @param timestamp delete posts whose _timestamp is lower than timestamp
|
|
|
|
|
* @return
|
|
|
|
|
*/
|
|
|
|
|
public void deleteDocumentsWithTimestampLowerThan(long timestamp) {
|
|
|
|
@ -332,7 +328,7 @@ public class SocialDataIndexerPlugin extends Plugin {
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// more enhanced feeds requested
|
|
|
|
|
// more enhanced posts requested
|
|
|
|
|
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
|
|
|
|
|
|
|
|
|
|
//Break condition: No hits are returned
|
|
|
|
|