Merge pull request #258 from torfsen/257-allow-sharing-of-redis-database

[#257] Purge only our own Redis data.
This commit is contained in:
David Read 2016-07-20 15:58:17 +01:00 committed by GitHub
commit 2208da6669
3 changed files with 86 additions and 26 deletions

View File

@ -65,9 +65,7 @@ class Harvester(CkanCommand):
harvester purge_queues harvester purge_queues
- removes all jobs from fetch and gather queue - removes all jobs from fetch and gather queue
WARNING: if using Redis, this command purges all data in the current
Redis database
harvester clean_harvest_log harvester clean_harvest_log
- Clean-up mechanism for the harvest log table. - Clean-up mechanism for the harvest log table.
You can configure the time frame through the configuration You can configure the time frame through the configuration
@ -531,4 +529,4 @@ class Harvester(CkanCommand):
condition = datetime.utcnow() - timedelta(days=log_timeframe) condition = datetime.utcnow() - timedelta(days=log_timeframe)
# Delete logs older then the given date # Delete logs older then the given date
clean_harvest_log(condition=condition) clean_harvest_log(condition=condition)

View File

@ -76,19 +76,17 @@ def get_fetch_queue_name():
return 'ckan.harvest.{0}.fetch'.format(config.get('ckan.site_id', return 'ckan.harvest.{0}.fetch'.format(config.get('ckan.site_id',
'default')) 'default'))
def get_gather_routing_key(): def get_gather_routing_key():
return '{0}:harvest_job_id'.format(config.get('ckan.site_id', return 'ckanext-harvest:{0}:harvest_job_id'.format(
'default')) config.get('ckan.site_id', 'default'))
def get_fetch_routing_key(): def get_fetch_routing_key():
return '{0}:harvest_object_id'.format(config.get('ckan.site_id', return 'ckanext-harvest:{0}:harvest_object_id'.format(
'default')) config.get('ckan.site_id', 'default'))
def purge_queues(): def purge_queues():
backend = config.get('ckan.harvest.mq.type', MQ_TYPE) backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
connection = get_connection() connection = get_connection()
if backend in ('amqp', 'ampq'): if backend in ('amqp', 'ampq'):
@ -97,10 +95,12 @@ def purge_queues():
log.info('AMQP queue purged: %s', get_gather_queue_name()) log.info('AMQP queue purged: %s', get_gather_queue_name())
channel.queue_purge(queue=get_fetch_queue_name()) channel.queue_purge(queue=get_fetch_queue_name())
log.info('AMQP queue purged: %s', get_fetch_queue_name()) log.info('AMQP queue purged: %s', get_fetch_queue_name())
return elif backend == 'redis':
if backend == 'redis': get_gather_consumer().queue_purge()
connection.flushdb() log.info('Redis gather queue purged')
log.info('Redis database flushed') get_fetch_consumer().queue_purge()
log.info('Redis fetch queue purged')
def resubmit_jobs(): def resubmit_jobs():
''' '''
@ -205,17 +205,40 @@ class RedisConsumer(object):
yield (FakeMethod(body), self, body) yield (FakeMethod(body), self, body)
def persistance_key(self, message): def persistance_key(self, message):
# Persistance keys are constructed with # If you change this, make sure to update the script in `queue_purge`
# {site-id}:{message-key}:{object-id}, eg:
# default:harvest_job_id:804f114a-8f68-4e7c-b124-3eb00f66202e
message = json.loads(message) message = json.loads(message)
return self.routing_key + ':' + message[self.message_key] return self.routing_key + ':' + message[self.message_key]
def basic_ack(self, message): def basic_ack(self, message):
self.redis.delete(self.persistance_key(message)) self.redis.delete(self.persistance_key(message))
def queue_purge(self, queue): def queue_purge(self, queue=None):
self.redis.flushdb() '''
Purge the consumer's queue.
The ``queue`` parameter exists only for compatibility and is
ignored.
'''
# Use a script to make the operation atomic
lua_code = b'''
local routing_key = KEYS[1]
local message_key = ARGV[1]
local count = 0
while true do
local s = redis.call("lpop", routing_key)
if s == false then
break
end
local value = cjson.decode(s)
local id = value[message_key]
local persistance_key = routing_key .. ":" .. id
redis.call("del", persistance_key)
count = count + 1
end
return count
'''
script = self.redis.register_script(lua_code)
return script(keys=[self.routing_key], args=[self.message_key])
def basic_get(self, queue): def basic_get(self, queue):
body = self.redis.lpop(self.routing_key) body = self.redis.lpop(self.routing_key)

View File

@ -10,7 +10,10 @@ from ckan.plugins.core import SingletonPlugin, implements
import json import json
import ckan.logic as logic import ckan.logic as logic
from ckan import model from ckan import model
from nose.tools import assert_equal from nose.tools import assert_equal, ok_
from ckan.lib.base import config
from nose.plugins.skip import SkipTest
import uuid
class MockHarvester(SingletonPlugin): class MockHarvester(SingletonPlugin):
@ -93,12 +96,10 @@ class TestHarvestQueue(object):
def test_01_basic_harvester(self): def test_01_basic_harvester(self):
### make sure queues/exchanges are created first and are empty ### make sure queues/exchanges are created first and are empty
consumer = queue.get_consumer('ckan.harvest.test.gather', consumer = queue.get_gather_consumer()
queue.get_gather_routing_key()) consumer_fetch = queue.get_fetch_consumer()
consumer_fetch = queue.get_consumer('ckan.harvest.test.fetch', consumer.queue_purge(queue=queue.get_gather_queue_name())
queue.get_fetch_routing_key()) consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())
consumer.queue_purge(queue='ckan.harvest.test.gather')
consumer_fetch.queue_purge(queue='ckan.harvest.test.fetch')
user = logic.get_action('get_site_user')( user = logic.get_action('get_site_user')(
@ -262,3 +263,41 @@ class TestHarvestQueue(object):
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1}) assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1})
assert_equal(harvest_source_dict['status']['total_datasets'], 2) assert_equal(harvest_source_dict['status']['total_datasets'], 2)
assert_equal(harvest_source_dict['status']['job_count'], 2) assert_equal(harvest_source_dict['status']['job_count'], 2)
def test_redis_queue_purging(self):
'''
Test that Redis queue purging doesn't purge the wrong keys.
'''
if config.get('ckan.harvest.mq.type') != 'redis':
raise SkipTest()
redis = queue.get_connection()
try:
redis.set('ckanext-harvest:some-random-key', 'foobar')
# Create some fake jobs
gather_publisher = queue.get_gather_publisher()
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
fetch_publisher = queue.get_fetch_publisher()
fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
num_keys = redis.dbsize()
# Create some fake objects
gather_consumer = queue.get_gather_consumer()
next(gather_consumer.consume(queue.get_gather_queue_name()))
fetch_consumer = queue.get_fetch_consumer()
next(fetch_consumer.consume(queue.get_fetch_queue_name()))
ok_(redis.dbsize() > num_keys)
queue.purge_queues()
assert_equal(redis.get('ckanext-harvest:some-random-key'),
'foobar')
assert_equal(redis.dbsize(), num_keys)
assert_equal(redis.llen(queue.get_gather_routing_key()), 0)
assert_equal(redis.llen(queue.get_fetch_routing_key()), 0)
finally:
redis.delete('ckanext-harvest:some-random-key')