From 2602de9094812f7175569f74c8543cc1b37bec9c Mon Sep 17 00:00:00 2001
From: Florian Brucker
Date: Wed, 20 Jul 2016 14:14:33 +0200
Subject: [PATCH] [#257] Purge only our own Redis data.

Previously, purging the queues on the Redis backend flushed the whole
Redis database, which made it hard to share that database with other
parts of CKAN. With this commit, only the keys that belong to
ckanext-harvest and the current CKAN instance are purged, and the
purging is done atomically via a Lua script.
---
 ckanext/harvest/commands/harvester.py |  6 +--
 ckanext/harvest/queue.py              | 53 +++++++++++++++++++--------
 ckanext/harvest/tests/test_queue.py   | 53 +++++++++++++++++++++++----
 3 files changed, 86 insertions(+), 26 deletions(-)

diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py
index f59102c..738ccae 100644
--- a/ckanext/harvest/commands/harvester.py
+++ b/ckanext/harvest/commands/harvester.py
@@ -65,9 +65,7 @@ class Harvester(CkanCommand):
 
       harvester purge_queues
         - removes all jobs from fetch and gather queue
-          WARNING: if using Redis, this command purges all data in the current
-          Redis database
-
+
       harvester clean_harvest_log
         - Clean-up mechanism for the harvest log table.
           You can configure the time frame through the configuration
@@ -531,4 +529,4 @@ class Harvester(CkanCommand):
         condition = datetime.utcnow() - timedelta(days=log_timeframe)
 
         # Delete logs older then the given date
-        clean_harvest_log(condition=condition)
\ No newline at end of file
+        clean_harvest_log(condition=condition)
diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py
index e4c098b..32bb895 100644
--- a/ckanext/harvest/queue.py
+++ b/ckanext/harvest/queue.py
@@ -76,19 +76,17 @@ def get_fetch_queue_name():
     return 'ckan.harvest.{0}.fetch'.format(config.get('ckan.site_id',
                                                       'default'))
 
-
 def get_gather_routing_key():
-    return '{0}:harvest_job_id'.format(config.get('ckan.site_id',
-                                                  'default'))
+    return 'ckanext-harvest:{0}:harvest_job_id'.format(
+        config.get('ckan.site_id', 'default'))
 
 
 def get_fetch_routing_key():
-    return '{0}:harvest_object_id'.format(config.get('ckan.site_id',
-                                                     'default'))
+    return 'ckanext-harvest:{0}:harvest_object_id'.format(
+        config.get('ckan.site_id', 'default'))
 
 
 def purge_queues():
-
     backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
     connection = get_connection()
     if backend in ('amqp', 'ampq'):
@@ -97,10 +95,12 @@ def purge_queues():
         channel.queue_purge(queue=get_gather_queue_name())
         log.info('AMQP queue purged: %s', get_gather_queue_name())
         channel.queue_purge(queue=get_fetch_queue_name())
         log.info('AMQP queue purged: %s', get_fetch_queue_name())
-        return
-    if backend == 'redis':
-        connection.flushdb()
-        log.info('Redis database flushed')
+    elif backend == 'redis':
+        get_gather_consumer().queue_purge()
+        log.info('Redis gather queue purged')
+        get_fetch_consumer().queue_purge()
+        log.info('Redis fetch queue purged')
+
 
 def resubmit_jobs():
     '''
@@ -205,17 +205,40 @@ class RedisConsumer(object):
             yield (FakeMethod(body), self, body)
 
     def persistance_key(self, message):
-        # Persistance keys are constructed with
-        # {site-id}:{message-key}:{object-id}, eg:
-        # default:harvest_job_id:804f114a-8f68-4e7c-b124-3eb00f66202e
+        # If you change this, make sure to update the script in `queue_purge`
         message = json.loads(message)
         return self.routing_key + ':' + message[self.message_key]
 
     def basic_ack(self, message):
         self.redis.delete(self.persistance_key(message))
 
-    def queue_purge(self, queue):
-        self.redis.flushdb()
+    def queue_purge(self, queue=None):
+        '''
+        Purge the consumer's queue.
+
+        The ``queue`` parameter exists only for compatibility and is
+        ignored.
+        '''
+        # Use a script to make the operation atomic
+        lua_code = b'''
+            local routing_key = KEYS[1]
+            local message_key = ARGV[1]
+            local count = 0
+            while true do
+                local s = redis.call("lpop", routing_key)
+                if s == false then
+                    break
+                end
+                local value = cjson.decode(s)
+                local id = value[message_key]
+                local persistance_key = routing_key .. ":" .. id
+                redis.call("del", persistance_key)
+                count = count + 1
+            end
+            return count
+        '''
+        script = self.redis.register_script(lua_code)
+        return script(keys=[self.routing_key], args=[self.message_key])
 
     def basic_get(self, queue):
         body = self.redis.lpop(self.routing_key)
diff --git a/ckanext/harvest/tests/test_queue.py b/ckanext/harvest/tests/test_queue.py
index bb381c7..ff66e9a 100644
--- a/ckanext/harvest/tests/test_queue.py
+++ b/ckanext/harvest/tests/test_queue.py
@@ -10,7 +10,10 @@ from ckan.plugins.core import SingletonPlugin, implements
 import json
 import ckan.logic as logic
 from ckan import model
-from nose.tools import assert_equal
+from nose.tools import assert_equal, ok_
+from ckan.lib.base import config
+from nose.plugins.skip import SkipTest
+import uuid
 
 
 class MockHarvester(SingletonPlugin):
@@ -93,12 +96,10 @@ class TestHarvestQueue(object):
     def test_01_basic_harvester(self):
 
         ### make sure queues/exchanges are created first and are empty
-        consumer = queue.get_consumer('ckan.harvest.test.gather',
-                                      queue.get_gather_routing_key())
-        consumer_fetch = queue.get_consumer('ckan.harvest.test.fetch',
-                                            queue.get_fetch_routing_key())
-        consumer.queue_purge(queue='ckan.harvest.test.gather')
-        consumer_fetch.queue_purge(queue='ckan.harvest.test.fetch')
+        consumer = queue.get_gather_consumer()
+        consumer_fetch = queue.get_fetch_consumer()
+        consumer.queue_purge(queue=queue.get_gather_queue_name())
+        consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())
 
 
         user = logic.get_action('get_site_user')(
@@ -262,3 +263,41 @@ class TestHarvestQueue(object):
         assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1})
         assert_equal(harvest_source_dict['status']['total_datasets'], 2)
         assert_equal(harvest_source_dict['status']['job_count'], 2)
+
+
+    def test_redis_queue_purging(self):
+        '''
+        Test that Redis queue purging doesn't purge the wrong keys.
+        '''
+        if config.get('ckan.harvest.mq.type') != 'redis':
+            raise SkipTest()
+        redis = queue.get_connection()
+        try:
+            redis.set('ckanext-harvest:some-random-key', 'foobar')
+
+            # Create some fake jobs
+            gather_publisher = queue.get_gather_publisher()
+            gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
+            gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
+            fetch_publisher = queue.get_fetch_publisher()
+            fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
+            fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
+            num_keys = redis.dbsize()
+
+            # Create some fake objects
+            gather_consumer = queue.get_gather_consumer()
+            next(gather_consumer.consume(queue.get_gather_queue_name()))
+            fetch_consumer = queue.get_fetch_consumer()
+            next(fetch_consumer.consume(queue.get_fetch_queue_name()))
+
+            ok_(redis.dbsize() > num_keys)
+
+            queue.purge_queues()
+
+            assert_equal(redis.get('ckanext-harvest:some-random-key'),
+                         'foobar')
+            assert_equal(redis.dbsize(), num_keys)
+            assert_equal(redis.llen(queue.get_gather_routing_key()), 0)
+            assert_equal(redis.llen(queue.get_fetch_routing_key()), 0)
+        finally:
+            redis.delete('ckanext-harvest:some-random-key')
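
As a standalone illustration for reviewers (not part of the patch): the
sketch below reproduces the atomic purge pattern that
`RedisConsumer.queue_purge` uses above. It is a minimal sketch, assuming
a Redis server on localhost and the `redis` Python package; the routing
key and the job id are made up for this example and only mirror the
`ckanext-harvest:{site-id}:{message-key}` layout introduced by the
patch.

    import json

    import redis

    # Connect to a local Redis server (assumption: default host/port).
    r = redis.StrictRedis()

    # Hypothetical routing key in the patch's key layout.
    routing_key = 'ckanext-harvest:default:harvest_job_id'
    message_key = 'harvest_job_id'

    # Enqueue one fake message plus its persistance key, the same shape
    # the gather publisher/consumer pair produces.
    body = json.dumps({message_key: 'job-1'})
    r.rpush(routing_key, body)
    r.set(routing_key + ':job-1', body)

    # Same loop as the patch's Lua script: pop every queued message and
    # delete its persistance key, all inside one server-side script so
    # nothing can interleave.
    lua_code = b'''
        local routing_key = KEYS[1]
        local message_key = ARGV[1]
        local count = 0
        while true do
            local s = redis.call("lpop", routing_key)
            if s == false then
                break
            end
            local value = cjson.decode(s)
            redis.call("del", routing_key .. ":" .. value[message_key])
            count = count + 1
        end
        return count
    '''
    script = r.register_script(lua_code)
    print(script(keys=[routing_key], args=[message_key]))  # prints 1

Redis executes a registered script atomically, so no other client can
push or pop between the `lpop` and the `del`; that is why `queue_purge`
registers Lua code instead of looping over `lpop` in Python.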