Compare commits

...

2 Commits

3 changed files with 49 additions and 62 deletions

CHANGELOG.md

@@ -5,6 +5,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)
## [v4.0.0]
- Communicate with persistence through social networking service
- Use social client and remove dependency on social networking library
## [v3.1.0-SNAPSHOT]

pom.xml

@@ -52,23 +52,13 @@
			<!-- <scope>compile</scope> -->
		<!-- </dependency> -->
		<dependency>
			<groupId>org.gcube.portal</groupId>
			<artifactId>social-library-stubs</artifactId>
			<version>[1.0.0-SNAPSHOT, 2.0.0)</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<!-- needs to be provided -->
			<groupId>org.gcube.social-networking</groupId>
			<artifactId>social-service-client</artifactId>
			<version>[2.0.0-SNAPSHOT, 3.0.0)</version>
			<exclusions>
				<exclusion>
					<groupId>org.gcube.portal</groupId>
					<artifactId>social-networking-library</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<!-- needs to be provided -->
			<groupId>org.gcube.social-networking</groupId>
			<artifactId>social-service-model</artifactId>
			<version>[1.2.0-SNAPSHOT, 2.0.0)</version>

SocialDataIndexerPlugin.java

@@ -30,12 +30,11 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.portal.databook.server.DBCassandraAstyanaxImpl;
import org.gcube.portal.databook.server.DatabookStore;
import org.gcube.portal.databook.shared.Attachment;
import org.gcube.portal.databook.shared.Comment;
import org.gcube.portal.databook.shared.EnhancedFeed;
import org.gcube.portal.databook.shared.Feed;
import org.gcube.social_networking.social_networking_client_library.LibClient;
import org.gcube.social_networking.socialnetworking.model.shared.Attachment;
import org.gcube.social_networking.socialnetworking.model.shared.Comment;
import org.gcube.social_networking.socialnetworking.model.shared.EnhancedPost;
import org.gcube.social_networking.socialnetworking.model.shared.Post;
import org.gcube.socialnetworking.social_data_indexing_common.ex.BulkInsertionFailedException;
import org.gcube.socialnetworking.social_data_indexing_common.utils.ElasticSearchRunningCluster;
import org.gcube.socialnetworking.social_data_indexing_common.utils.IndexFields;
@@ -59,7 +58,10 @@ public class SocialDataIndexerPlugin extends Plugin {
	private List<Integer> portNumbers;
	private TransportClient client;
	private DatabookStore store;
	// possible bug: should the context be specified to the client?
	private LibClient libClient;
	private int count = 0;
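The note above asks whether a context has to be handed to the client. One possible answer, as a minimal sketch, assuming LibClient resolves the current gCube scope from the thread-local ScopeProvider (the client's actual context handling is not shown in this diff):

	// Assumption: LibClient reads the scope from ScopeProvider, so the scope
	// must be set on the calling thread before the client is constructed.
	ScopeProvider.instance.set(scope);      // scope read at plugin launch
	LibClient libClient = new LibClient();  // constructor as used in this diff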
@@ -82,7 +84,7 @@ public class SocialDataIndexerPlugin extends Plugin {
		logger.info("Scope read is " + scope);
		// connection to cassandra (remove the first / from the scope)
		store = new DBCassandraAstyanaxImpl(scope.replaceFirst("/", ""));
		libClient = new LibClient();
		// retrieve ElasticSearch endpoint and set hosts/port numbers (remove the first / from the scope)
		ElasticSearchRunningCluster elasticCluster = new ElasticSearchRunningCluster(scope.replaceFirst("/", ""));
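For context, the code elided after this hunk presumably turns the discovered hosts and ports into the TransportClient field. A hedged sketch against the ES 2.x API; clusterName and hostsToContact are assumed names, not taken from this diff:

	// Hedged sketch (ES 2.x): connect the TransportClient to the discovered nodes.
	Settings settings = Settings.settingsBuilder()
			.put("cluster.name", clusterName)   // assumed variable
			.build();
	client = TransportClient.builder().settings(settings).build();
	for (int i = 0; i < hostsToContact.size(); i++) {   // assumed list of hosts
		client.addTransportAddress(new InetSocketTransportAddress(
				InetAddress.getByName(hostsToContact.get(i)), portNumbers.get(i)));
	}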
@@ -129,17 +131,17 @@
		logger.info("Connection to ElasticSearch cluster done. Synchronization starts running...");
		final long init = System.currentTimeMillis();
		// we build an index reachable under the path /social/enhanced_feed
		List<String> vreIds = store.getAllVREIds();
		// we build an index reachable under the path /social/enhanced_post
		List<String> vreIds = libClient.getAllVREIdsLib();
		// save feeds & comments
		// save posts & comments
		for (String vreID : vreIds) {
			try{
				List<Feed> feeds = store.getAllFeedsByVRE(vreID);
				addEnhancedFeedsInBulk(feeds, init);
				logger.info("Number of indexed feeds is " + feeds.size() + " for vre " + vreID);
				List<Post> posts = libClient.getAllPostsByVRELib(vreID);
				addEnhancedPostsInBulk(posts, init);
				logger.info("Number of indexed posts is " + posts.size() + " for vre " + vreID);
			}catch(Exception e){
				logger.error("Exception while saving feeds/comments into the index for vre " + vreID, e);
				logger.error("Exception while saving posts/comments into the index for vre " + vreID, e);
				continue;
			}
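Taken together, the hunks in this file swap the Cassandra-backed store for the service client one-for-one; every name in the mapping below is taken from this diff:

	// DatabookStore (removed)          -> LibClient (added)
	// store.getAllVREIds()             -> libClient.getAllVREIdsLib()
	// store.getAllFeedsByVRE(vreID)    -> libClient.getAllPostsByVRELib(vreID)
	// store.getAttachmentsByFeedId(id) -> libClient.getAttachmentsByPostIdLib(id)
	// store.getAllCommentByFeed(id)    -> libClient.getAllCommentsByPostIdLib(id)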
@@ -166,23 +168,17 @@ public class SocialDataIndexerPlugin extends Plugin {
				logger.info("Closing connection to elasticsearch cluster. " + client.toString());
				client.close();
			}
			// close connection to cassandra
			if(store != null){
				logger.info("Closing connection to cassandra nodes. " + store.toString());
				store.closeConnection();
			}
		}
	}
	/**
	 * Add feeds into the elasticsearch index.
	 * @param feeds
	 * Add posts into the elasticsearch index.
	 * @param posts
	 * @param init is the timestamp that will be put in the document
	 * @throws BulkInsertionFailedException
	 */
	private void addEnhancedFeedsInBulk(List<Feed> feeds, final long init) throws BulkInsertionFailedException {
		logger.debug("Starting bulk insert enhanced feeds operation");
	private void addEnhancedPostsInBulk(List<Post> posts, final long init) throws BulkInsertionFailedException {
		logger.debug("Starting bulk insert enhanced posts operation");
		BulkProcessor bulkProcessor = BulkProcessor.builder(
				client,
				new BulkProcessor.Listener() {
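The listener body is elided from this hunk. For reference, a minimal sketch of the standard ES 2.x BulkProcessor.Listener wiring; the log messages and the bulk-actions threshold are assumptions, only the backoff line is taken from this diff:

	// Minimal sketch (ES 2.x); listener bodies and threshold are assumptions.
	BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
		@Override
		public void beforeBulk(long executionId, BulkRequest request) {
			logger.debug("Executing bulk of " + request.numberOfActions() + " actions");
		}
		@Override
		public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
			if (response.hasFailures())
				logger.warn("Bulk had failures: " + response.buildFailureMessage());
		}
		@Override
		public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
			logger.error("Bulk insert failed", failure);
		}
	})
	.setBulkActions(1000) // assumed threshold
	.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 8))
	.build();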
@@ -227,15 +223,15 @@ public class SocialDataIndexerPlugin extends Plugin {
					BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 8))
				.build();
		// save feeds
		for (Feed feed: feeds) {
		// save posts
		for (Post post: posts) {
			String enhFeedUUID = null;
			String enhPostUUID = null;
			try{
				// enhance and convert
				String json = enhanceAndConvertToJson(feed);
				enhFeedUUID = feed.getKey();
				IndexRequest ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, enhFeedUUID) // set timestamp
				String json = enhanceAndConvertToJson(post);
				enhPostUUID = post.getKey();
				IndexRequest ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, enhPostUUID) // set timestamp
					.timestamp(String.valueOf(init)) // add json object
					.source(json);
@@ -244,7 +240,7 @@ public class SocialDataIndexerPlugin extends Plugin {
				count++;
			}catch(Exception e){
				logger.error("Skip inserting feed with id " + enhFeedUUID, e);
				logger.error("Skip inserting post with id " + enhPostUUID, e);
			}
		}
@@ -257,48 +253,48 @@ public class SocialDataIndexerPlugin extends Plugin {
	}
	/**
	 * Build enhanced feed and convert to json
	 * @param feed to enhance and convert
	 * Build enhanced post and convert to json
	 * @param post to enhance and convert
	 * @return json object
	 * @throws Exception
	 */
	private String enhanceAndConvertToJson(Feed feed) throws Exception {
	private String enhanceAndConvertToJson(Post post) throws Exception {
		boolean isMultiFileUpload = feed.isMultiFileUpload();
		boolean isMultiFileUpload = post.isMultiFileUpload();
		// retrieve attachments
		ArrayList<Attachment> attachments = new ArrayList<Attachment>();
		if (isMultiFileUpload) {
			logger.debug("Retrieving attachments for feed with id="+feed.getKey());
			attachments = (ArrayList<Attachment>) store.getAttachmentsByFeedId(feed.getKey());
			logger.debug("Retrieving attachments for post with id="+post.getKey());
			attachments = (ArrayList<Attachment>) libClient.getAttachmentsByPostIdLib(post.getKey());
		}
		// retrieve comments
		ArrayList<Comment> comments = getAllCommentsByFeed(feed.getKey());
		ArrayList<Comment> comments = getAllCommentsByPost(post.getKey());
		// build enhanced feed
		EnhancedFeed enFeed = new EnhancedFeed(feed, false, false, comments, attachments);
		// build enhanced post
		EnhancedPost enPost = new EnhancedPost(post, false, false, comments, attachments);
		// convert to json
		ObjectMapper mapper = new ObjectMapper();
		return mapper.writeValueAsString(enFeed);
		return mapper.writeValueAsString(enPost);
	}
	/**
	 * retrieve and sort comments given a feed id
	 * @param feedid
	 * retrieve and sort comments given a post id
	 * @param postid
	 * @return sorted list of comments
	 */
	private ArrayList<Comment> getAllCommentsByFeed(String feedid) {
		// logger.debug("Asking comments for " + feedid);
		ArrayList<Comment> toReturn = (ArrayList<Comment>) store.getAllCommentByFeed(feedid);
	private ArrayList<Comment> getAllCommentsByPost(String postid) {
		// logger.debug("Asking comments for " + postid);
		ArrayList<Comment> toReturn = (ArrayList<Comment>) libClient.getAllCommentsByPostIdLib(postid);
		Collections.sort(toReturn);
		return toReturn;
	}
	/**
	 * Delete disabled feeds in the index
	 * @param timestamp delete feeds whose _timestamp is lower than timestamp
	 * Delete disabled posts in the index
	 * @param timestamp delete posts whose _timestamp is lower than timestamp
	 */
	public void deleteDocumentsWithTimestampLowerThan(long timestamp) {
@@ -332,7 +328,7 @@ public class SocialDataIndexerPlugin extends Plugin {
			}
			// more enhanced feeds requested
			// more enhanced posts requested
			scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
			// break condition: no hits are returned
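The surrounding method body is mostly elided here; a hedged sketch of the scan-and-scroll delete loop it appears to implement (ES 2.x API; the _timestamp range query and the page size are assumptions):

	// Hedged sketch: delete documents whose _timestamp is lower than `timestamp`.
	SearchResponse scrollResp = client.prepareSearch(IndexFields.INDEX_NAME)
			.setScroll(new TimeValue(60000))
			.setQuery(QueryBuilders.rangeQuery("_timestamp").lt(timestamp)) // assumed query
			.setSize(100) // assumed page size
			.execute().actionGet();
	while (true) {
		for (SearchHit hit : scrollResp.getHits().getHits()) {
			client.prepareDelete(IndexFields.INDEX_NAME, hit.getType(), hit.getId())
					.execute().actionGet();
		}
		// more enhanced posts requested
		scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
				.setScroll(new TimeValue(60000)).execute().actionGet();
		// break condition: no hits are returned
		if (scrollResp.getHits().getHits().length == 0) {
			break;
		}
	}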