Compare commits

2 Commits

3 changed files with 49 additions and 62 deletions

CHANGELOG.md

@@ -5,6 +5,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 ## [v4.0.0]
 - Communicate with persistence through social networking service
+- Use social client and remove dependency on social networking library
 ## [v3.1.0-SNAPSHOT]

pom.xml

@@ -52,23 +52,13 @@
 		<!-- <scope>compile</scope> -->
 	<!-- </dependency> -->
 	<dependency>
-		<groupId>org.gcube.portal</groupId>
-		<artifactId>social-library-stubs</artifactId>
-		<version>[1.0.0-SNAPSHOT, 2.0.0)</version>
-		<scope>compile</scope>
-	</dependency>
-	<dependency>
+		<!--needs to be provided -->
 		<groupId>org.gcube.social-networking</groupId>
 		<artifactId>social-service-client</artifactId>
 		<version>[2.0.0-SNAPSHOT, 3.0.0)</version>
-		<exclusions>
-			<exclusion>
-				<groupId>org.gcube.portal</groupId>
-				<artifactId>social-networking-library</artifactId>
-			</exclusion>
-		</exclusions>
 	</dependency>
 	<dependency>
+		<!--needs to be provided -->
 		<groupId>org.gcube.social-networking</groupId>
 		<artifactId>social-service-model</artifactId>
 		<version>[1.2.0-SNAPSHOT, 2.0.0)</version>
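Taken together, the dependency changes above drop the Cassandra-backed social-networking-library stack in favour of the HTTP client for the social-networking service. A minimal before/after sketch of what this means for calling code, using only types and calls that appear in the source diff below:

    // Before: direct Cassandra access via the social-networking library
    // DatabookStore store = new DBCassandraAstyanaxImpl(scope.replaceFirst("/", ""));
    // List<String> vreIds = store.getAllVREIds();

    // After: the same data is fetched through the social service client
    LibClient libClient = new LibClient();
    List<String> vreIds = libClient.getAllVREIdsLib();

The version ranges such as [2.0.0-SNAPSHOT, 3.0.0) are standard Maven range syntax: any version from 2.0.0-SNAPSHOT inclusive up to, but excluding, 3.0.0.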

SocialDataIndexerPlugin.java

@@ -30,12 +30,11 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchHit;
 import org.gcube.common.scope.api.ScopeProvider;
-import org.gcube.portal.databook.server.DBCassandraAstyanaxImpl;
-import org.gcube.portal.databook.server.DatabookStore;
-import org.gcube.portal.databook.shared.Attachment;
-import org.gcube.portal.databook.shared.Comment;
-import org.gcube.portal.databook.shared.EnhancedFeed;
-import org.gcube.portal.databook.shared.Feed;
+import org.gcube.social_networking.social_networking_client_library.LibClient;
+import org.gcube.social_networking.socialnetworking.model.shared.Attachment;
+import org.gcube.social_networking.socialnetworking.model.shared.Comment;
+import org.gcube.social_networking.socialnetworking.model.shared.EnhancedPost;
+import org.gcube.social_networking.socialnetworking.model.shared.Post;
 import org.gcube.socialnetworking.social_data_indexing_common.ex.BulkInsertionFailedException;
 import org.gcube.socialnetworking.social_data_indexing_common.utils.ElasticSearchRunningCluster;
 import org.gcube.socialnetworking.social_data_indexing_common.utils.IndexFields;
@@ -59,7 +58,10 @@ public class SocialDataIndexerPlugin extends Plugin {
     private List<Integer> portNumbers;
     private TransportClient client;
-    private DatabookStore store;
+    //possible bug
+    //should the context be specified to the client??!
+    private LibClient libClient;
     private int count = 0;
@@ -82,7 +84,7 @@ public class SocialDataIndexerPlugin extends Plugin {
         logger.info("Scope read is " + scope);
         // connection to cassandra (remove the first / from the scope)
-        store = new DBCassandraAstyanaxImpl(scope.replaceFirst("/", ""));
+        libClient = new LibClient();
         // retrieve ElasticSearch Endpoint and set hosts/port numbers (remove the first / from the scope)
         ElasticSearchRunningCluster elasticCluster = new ElasticSearchRunningCluster(scope.replaceFirst("/", ""));
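The "possible bug" comment added with the new field refers to this spot: DBCassandraAstyanaxImpl received the scope explicitly, while new LibClient() takes no argument. gCube clients commonly resolve the call context from the ScopeProvider thread-local (already imported in this file), so a defensive sketch, assuming LibClient does read ScopeProvider (to be verified against the client's documentation):

    // Assumption: LibClient picks up the target context from ScopeProvider,
    // so the scope should be set on this thread before the client is created.
    ScopeProvider.instance.set(scope);
    libClient = new LibClient();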
@@ -129,17 +131,17 @@ public class SocialDataIndexerPlugin extends Plugin {
         logger.info("Connection to ElasticSearch cluster done. Synchronization starts running...");
         final long init = System.currentTimeMillis();
-        // we build a index reachable under the path /social/enhanced_feed
-        List<String> vreIds = store.getAllVREIds();
-        // save feeds & comments
+        // we build a index reachable under the path /social/enhanced_post
+        List<String> vreIds = libClient.getAllVREIdsLib();
+        // save posts & comments
         for (String vreID : vreIds) {
             try{
-                List<Feed> feeds = store.getAllFeedsByVRE(vreID);
-                addEnhancedFeedsInBulk(feeds, init);
-                logger.info("Number of indexed feeds is " + feeds.size() + " for vre " + vreID);
+                List<Post> posts = libClient.getAllPostsByVRELib(vreID);
+                addEnhancedPostsInBulk(posts, init);
+                logger.info("Number of indexed posts is " + posts.size() + " for vre " + vreID);
             }catch(Exception e){
-                logger.error("Exception while saving feeds/comments into the index for vre " + vreID, e);
+                logger.error("Exception while saving posts/comments into the index for vre " + vreID, e);
                 continue;
             }
@@ -166,23 +168,17 @@ public class SocialDataIndexerPlugin extends Plugin {
             logger.info("Closing connection to elasticsearch cluster. " + client.toString());
             client.close();
         }
-        // close connection to cassandra
-        if(store != null){
-            logger.info("Closing connection to cassandra nodes. " + store.toString());
-            store.closeConnection();
-        }
     }
 }
 /**
- * Add feeds into the elasticsearch index.
- * @param feeds
+ * Add posts into the elasticsearch index.
+ * @param posts
  * @param init is the timestamp that will be put in the document
  * @throws BulkInsertionFailedException
  */
-private void addEnhancedFeedsInBulk(List<Feed> feeds, final long init) throws BulkInsertionFailedException {
-    logger.debug("Starting bulk insert enhanced feeds operation");
+private void addEnhancedPostsInBulk(List<Post> posts, final long init) throws BulkInsertionFailedException {
+    logger.debug("Starting bulk insert enhanced posts operation");
     BulkProcessor bulkProcessor = BulkProcessor.builder(
         client,
         new BulkProcessor.Listener() {
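The listener body itself is unchanged and elided between these two hunks. For reference, BulkProcessor.Listener in this generation of the Elasticsearch transport client defines three callbacks; a minimal sketch of a typical implementation (not necessarily the project's actual one; it assumes the imports org.elasticsearch.action.bulk.BulkRequest and BulkResponse):

    new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            // invoked just before each bulk request is executed
            logger.debug("Executing bulk of " + request.numberOfActions() + " actions");
        }
        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // invoked after a bulk request completes; check per-item failures here
            if (response.hasFailures()) {
                logger.error("Bulk had failures: " + response.buildFailureMessage());
            }
        }
        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            // invoked when the whole bulk request fails with an exception
            logger.error("Bulk request failed", failure);
        }
    }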
@@ -227,15 +223,15 @@ public class SocialDataIndexerPlugin extends Plugin {
             BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 8))
         .build();
-    // save feeds
-    for (Feed feed: feeds) {
-        String enhFeedUUID = null;
+    // save posts
+    for (Post post: posts) {
+        String enhPostUUID = null;
         try{
             // enhance and convert
-            String json = enhanceAndConvertToJson(feed);
-            enhFeedUUID = feed.getKey();
-            IndexRequest ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, enhFeedUUID)// set timestamp
+            String json = enhanceAndConvertToJson(post);
+            enhPostUUID = post.getKey();
+            IndexRequest ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, enhPostUUID)// set timestamp
                 .timestamp(String.valueOf(init)) // add json object
                 .source(json);
@@ -244,7 +240,7 @@ public class SocialDataIndexerPlugin extends Plugin {
             count++;
         }catch(Exception e){
-            logger.error("Skip inserting feed with id " + enhFeedUUID, e);
+            logger.error("Skip inserting post with id " + enhPostUUID, e);
         }
     }
@@ -257,48 +253,48 @@ public class SocialDataIndexerPlugin extends Plugin {
     }
     /**
-     * Build enhanced feed and convert to json
-     * @param feed to enhanced and convert
+     * Build enhanced post and convert to json
+     * @param post to enhanced and convert
      * @return json object
      * @throws Exception
      */
-    private String enhanceAndConvertToJson(Feed feed) throws Exception {
-        boolean isMultiFileUpload = feed.isMultiFileUpload();
+    private String enhanceAndConvertToJson(Post post) throws Exception {
+        boolean isMultiFileUpload = post.isMultiFileUpload();
         // retrieve attachments
         ArrayList<Attachment> attachments = new ArrayList<Attachment>();
         if (isMultiFileUpload) {
-            logger.debug("Retrieving attachments for feed with id="+feed.getKey());
-            attachments = (ArrayList<Attachment>) store.getAttachmentsByFeedId(feed.getKey());
+            logger.debug("Retrieving attachments for post with id="+post.getKey());
+            attachments = (ArrayList<Attachment>) libClient.getAttachmentsByPostIdLib(post.getKey());
         }
         // retrieve comments
-        ArrayList<Comment> comments = getAllCommentsByFeed(feed.getKey());
-        // build enhanced feed
-        EnhancedFeed enFeed = new EnhancedFeed(feed, false, false, comments, attachments);
+        ArrayList<Comment> comments = getAllCommentsByPost(post.getKey());
+        // build enhanced post
+        EnhancedPost enPost = new EnhancedPost(post, false, false, comments, attachments);
         // convert to json
         ObjectMapper mapper = new ObjectMapper();
-        return mapper.writeValueAsString(enFeed);
+        return mapper.writeValueAsString(enPost);
     }
     /**
-     * retrieve and sort comments given a feed id
-     * @param feedid
+     * retrieve and sort comments given a post id
+     * @param postid
      * @return
      */
-    private ArrayList<Comment> getAllCommentsByFeed(String feedid) {
-        // logger.debug("Asking comments for " + feedid);
-        ArrayList<Comment> toReturn = (ArrayList<Comment>) store.getAllCommentByFeed(feedid);
+    private ArrayList<Comment> getAllCommentsByPost(String postid) {
+        // logger.debug("Asking comments for " + postid);
+        ArrayList<Comment> toReturn = (ArrayList<Comment>) libClient.getAllCommentsByPostIdLib(postid);
         Collections.sort(toReturn);
         return toReturn;
     }
     /**
-     * Delete disabled feeds in the index
-     * @param timestamp delete feeds whose _timestamp is lower than timestamp
+     * Delete disabled posts in the index
+     * @param timestamp delete posts whose _timestamp is lower than timestamp
      * @return
      */
     public void deleteDocumentsWithTimestampLowerThan(long timestamp) {
@@ -332,7 +328,7 @@ public class SocialDataIndexerPlugin extends Plugin {
             }
-            // more enhanced feeds requested
+            // more enhanced posts requested
             scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
             //Break condition: No hits are returned
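For context, deleteDocumentsWithTimestampLowerThan is a standard scan-and-scroll pass over the index. A condensed sketch of the pattern visible in this hunk, built on the transport-client API already in use in this file; the query details and page size are assumptions, and SearchResponse requires the import org.elasticsearch.action.search.SearchResponse:

    // select documents whose _timestamp is older than the given cutoff
    SearchResponse scrollResp = client.prepareSearch(IndexFields.INDEX_NAME)
        .setScroll(new TimeValue(60000))
        .setQuery(QueryBuilders.rangeQuery("_timestamp").lt(timestamp))
        .setSize(100)
        .execute().actionGet();
    while (true) {
        for (SearchHit hit : scrollResp.getHits().getHits()) {
            // delete each stale document by id
            client.prepareDelete(IndexFields.INDEX_NAME, hit.getType(), hit.getId())
                .execute().actionGet();
        }
        // fetch the next page of hits for this scroll id
        scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
            .setScroll(new TimeValue(60000)).execute().actionGet();
        // break condition: no hits are returned
        if (scrollResp.getHits().getHits().length == 0) {
            break;
        }
    }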