Documents older than a certain timestamp will now be automatically removed after newer posts (or updates to them) have been indexed

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/social-networking/social-data-indexer-se-plugin@124391 82a268e6-3cf1-43bd-a215-b396298e98cf
Costantino Perciante 2016-02-19 17:42:45 +00:00
parent bc2bb8d0a9
commit 983ee1d6c4
1 changed file with 55 additions and 17 deletions
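In short, each synchronization run stamps every document it (re)indexes with the run's start time and then deletes whatever still carries an older timestamp. Below is a minimal sketch of that pattern, assuming an Elasticsearch 1.x/2.x client with the _timestamp field enabled in the index mapping; the client, index, and type parameters mirror what the diff below works with, while the class name, method name, and document map used here are illustrative only, not part of the plugin.

import java.util.Map;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

public class TimestampBasedCleanupSketch {

    /** Re-index the given JSON documents with the run start time as _timestamp,
     *  then purge every document whose _timestamp predates this run. */
    public static void syncAndPurge(Client client, String index, String type,
                                    Map<String, String> jsonDocsById) {
        long runStart = System.currentTimeMillis();

        // 1) (Re)index: every document written during this run carries the same timestamp.
        for (Map.Entry<String, String> doc : jsonDocsById.entrySet()) {
            client.index(new IndexRequest(index, type, doc.getKey())
                        .timestamp(Long.toString(runStart)) // needs _timestamp enabled in the mapping
                        .source(doc.getValue()))
                  .actionGet();
        }

        // 2) Purge: documents still stamped with an older timestamp were not seen in this run,
        //    so they correspond to content removed (or disabled) at the source.
        SearchResponse scroll = client.prepareSearch(index)
                .setTypes(type)
                .setQuery(QueryBuilders.rangeQuery("_timestamp").lt(runStart))
                .setSize(100)
                .setScroll(new TimeValue(60000))
                .execute().actionGet();

        while (scroll.getHits().getHits().length > 0) {
            for (SearchHit hit : scroll.getHits().getHits()) {
                client.prepareDelete(index, type, hit.getId()).get();
            }
            scroll = client.prepareSearchScroll(scroll.getScrollId())
                    .setScroll(new TimeValue(60000))
                    .execute().actionGet();
        }
    }
}

With this scheme the DISABLED-feed branch removed in the diff is no longer needed: a disabled or deleted feed is simply never re-indexed, so its stale copy keeps the old timestamp and is purged at the end of the run.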


@@ -17,6 +17,7 @@ import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.transport.NoNodeAvailableException;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
@@ -24,13 +25,15 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
 import org.gcube.portal.databook.server.DBCassandraAstyanaxImpl;
 import org.gcube.portal.databook.server.DatabookStore;
 import org.gcube.portal.databook.shared.Attachment;
 import org.gcube.portal.databook.shared.Comment;
 import org.gcube.portal.databook.shared.EnhancedFeed;
 import org.gcube.portal.databook.shared.Feed;
-import org.gcube.portal.databook.shared.FeedType;
 import org.gcube.socialnetworking.social_data_indexing_common.ex.BulkInsertionFailedException;
 import org.gcube.socialnetworking.social_data_indexing_common.utils.ElasticSearchRunningCluster;
 import org.gcube.socialnetworking.social_data_indexing_common.utils.IndexFields;
@@ -139,7 +142,7 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
 		for (String vreID : vreIds) {
 			try{
 				List<Feed> feeds = store.getAllFeedsByVRE(vreID);
-				addEnhancedFeedsInBulk(feeds);
+				addEnhancedFeedsInBulk(feeds, init);
 				logger.debug("Number of indexed feeds is " + feeds.size() + " for vre " + vreID);
 			}catch(Exception e){
 				logger.debug("Exception while saving feeds/comments into the index for vre " + vreID, e);
@@ -148,6 +151,9 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
 		}
+		// delete documents with timestamp less than init
+		deleteDocumentWithTimestampLowerThan(init);
 		long end = System.currentTimeMillis();
 		logger.debug("Synchronization thread ends running. It took " + (end - init) + " milliseconds " +
 				" that is " + (double)(end - init)/(1000.0 * 60.0) + " minutes.");
@@ -172,9 +178,10 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
 	/**
 	 * Add feeds into the elasticsearch index.
 	 * @param feeds
+	 * @param init is the timestamp that will be put in the document
 	 * @throws BulkInsertionFailedException
 	 */
-	private void addEnhancedFeedsInBulk(List<Feed> feeds) throws BulkInsertionFailedException {
+	private void addEnhancedFeedsInBulk(List<Feed> feeds, long init) throws BulkInsertionFailedException {
 		logger.debug("Starting bulk insert enhanced feeds operation");
 		BulkProcessor bulkProcessor = BulkProcessor.builder(
 				client,
@@ -223,19 +230,15 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
 		// save feeds
 		for (Feed feed: feeds) {
-			// skip disabled feeds but delete from the index (they could be present)
-			if(feed.getType().equals(FeedType.DISABLED)){
-				deleteDocument(feed.getKey());
-				continue;
-			}
 			String enhFeedUUID = null;
 			try{
 				// enhance and convert
 				String json = enhanceAndConvertToJson(feed);
 				enhFeedUUID = feed.getKey();
 				IndexRequest ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, enhFeedUUID)
-						.source(json);
+						.timestamp(Long.toString(init)) // set timestamp
+						.source(json); // add json object
 				bulkProcessor.add(ind);
 			}catch(Exception e){
 				logger.error("Skipping insert feed with id " + enhFeedUUID, e);
@@ -292,13 +295,48 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
 	 * @param docID
 	 * @return
 	 */
-	private boolean deleteDocument(String docID) {
-		if(docID == null || docID.isEmpty())
-			return false;
-		logger.debug("Removing doc with id " + docID);
-		DeleteResponse response = client.prepareDelete(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, docID).get();
-		logger.debug("doc found? " + response.isFound());
-		return response.isFound();
+	public void deleteDocumentWithTimestampLowerThan(long timestamp) {
+
+		logger.debug("Removing docs with timestamp lower than " + timestamp);
+
+		// query on timestamp field
+		BoolQueryBuilder filter = QueryBuilders.boolQuery();
+		filter.must(QueryBuilders.matchAllQuery());
+		filter.filter(QueryBuilders.rangeQuery("_timestamp").gte(0).lt(timestamp));
+		logger.debug(filter.toString());
+
+		SearchResponse scrollResp = client.prepareSearch(IndexFields.INDEX_NAME)
+				.setSize(100)
+				.setScroll(new TimeValue(60000))
+				.setQuery(filter)
+				.execute()
+				.actionGet();
+
+		int deleteDocs = 0;
+
+		// Scroll until no hits are returned
+		while (true) {
+			for (SearchHit hit : scrollResp.getHits().getHits()) {
+				String docID = hit.getId();
+				DeleteResponse response = client.prepareDelete(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, docID).get();
+				logger.debug("deleting doc with id = " + docID + "...found? " + response.isFound());
+				if(response.isFound())
+					deleteDocs ++;
+			}
+			scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
+			// Break condition: no hits are returned
+			if (scrollResp.getHits().getHits().length == 0) {
+				logger.debug("No more hits to delete");
+				break;
+			}
+		}
+
+		logger.debug("Number of deleted documents is " + deleteDocs);
 	}
 /**{@inheritDoc}*/