diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py
index f59102c..738ccae 100644
--- a/ckanext/harvest/commands/harvester.py
+++ b/ckanext/harvest/commands/harvester.py
@@ -65,9 +65,7 @@ class Harvester(CkanCommand):
 
       harvester purge_queues
        - removes all jobs from fetch and gather queue
-         WARNING: if using Redis, this command purges all data in the current
-         Redis database
-
+
      harvester clean_harvest_log
        - Clean-up mechanism for the harvest log table.
          You can configure the time frame through the configuration
@@ -531,4 +529,4 @@ class Harvester(CkanCommand):
         condition = datetime.utcnow() - timedelta(days=log_timeframe)
 
         # Delete logs older then the given date
-        clean_harvest_log(condition=condition)
\ No newline at end of file
+        clean_harvest_log(condition=condition)
diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py
index e4c098b..32bb895 100644
--- a/ckanext/harvest/queue.py
+++ b/ckanext/harvest/queue.py
@@ -76,19 +76,17 @@ def get_fetch_queue_name():
     return 'ckan.harvest.{0}.fetch'.format(config.get('ckan.site_id',
                                                       'default'))
 
-
 def get_gather_routing_key():
-    return '{0}:harvest_job_id'.format(config.get('ckan.site_id',
-                                                  'default'))
+    return 'ckanext-harvest:{0}:harvest_job_id'.format(
+        config.get('ckan.site_id', 'default'))
 
 
 def get_fetch_routing_key():
-    return '{0}:harvest_object_id'.format(config.get('ckan.site_id',
-                                                     'default'))
+    return 'ckanext-harvest:{0}:harvest_object_id'.format(
+        config.get('ckan.site_id', 'default'))
 
 
 def purge_queues():
-
     backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
     connection = get_connection()
     if backend in ('amqp', 'ampq'):
@@ -97,10 +95,12 @@ def purge_queues():
         channel = connection.channel()
         channel.queue_purge(queue=get_gather_queue_name())
         log.info('AMQP queue purged: %s', get_gather_queue_name())
         channel.queue_purge(queue=get_fetch_queue_name())
         log.info('AMQP queue purged: %s', get_fetch_queue_name())
-        return
-    if backend == 'redis':
-        connection.flushdb()
-        log.info('Redis database flushed')
+    elif backend == 'redis':
+        get_gather_consumer().queue_purge()
+        log.info('Redis gather queue purged')
+        get_fetch_consumer().queue_purge()
+        log.info('Redis fetch queue purged')
+
 
 def resubmit_jobs():
     '''
@@ -205,17 +205,40 @@ class RedisConsumer(object):
             yield (FakeMethod(body), self, body)
 
     def persistance_key(self, message):
-        # Persistance keys are constructed with
-        # {site-id}:{message-key}:{object-id}, eg:
-        # default:harvest_job_id:804f114a-8f68-4e7c-b124-3eb00f66202e
+        # If you change this, make sure to update the script in `queue_purge`
         message = json.loads(message)
         return self.routing_key + ':' + message[self.message_key]
 
     def basic_ack(self, message):
         self.redis.delete(self.persistance_key(message))
 
-    def queue_purge(self, queue):
-        self.redis.flushdb()
+    def queue_purge(self, queue=None):
+        '''
+        Purge the consumer's queue.
+
+        The ``queue`` parameter exists only for compatibility and is
+        ignored.
+        '''
+        # Use a script to make the operation atomic
+        lua_code = b'''
+            local routing_key = KEYS[1]
+            local message_key = ARGV[1]
+            local count = 0
+            while true do
+                local s = redis.call("lpop", routing_key)
+                if s == false then
+                    break
+                end
+                local value = cjson.decode(s)
+                local id = value[message_key]
+                local persistance_key = routing_key .. ":" .. id
+                redis.call("del", persistance_key)
+                count = count + 1
+            end
+            return count
+        '''
+        script = self.redis.register_script(lua_code)
+        return script(keys=[self.routing_key], args=[self.message_key])
 
     def basic_get(self, queue):
         body = self.redis.lpop(self.routing_key)
diff --git a/ckanext/harvest/tests/test_queue.py b/ckanext/harvest/tests/test_queue.py
index bb381c7..ff66e9a 100644
--- a/ckanext/harvest/tests/test_queue.py
+++ b/ckanext/harvest/tests/test_queue.py
@@ -10,7 +10,10 @@ from ckan.plugins.core import SingletonPlugin, implements
 import json
 import ckan.logic as logic
 from ckan import model
-from nose.tools import assert_equal
+from nose.tools import assert_equal, ok_
+from ckan.lib.base import config
+from nose.plugins.skip import SkipTest
+import uuid
 
 
 class MockHarvester(SingletonPlugin):
@@ -93,12 +96,10 @@ class TestHarvestQueue(object):
     def test_01_basic_harvester(self):
 
         ### make sure queues/exchanges are created first and are empty
-        consumer = queue.get_consumer('ckan.harvest.test.gather',
-                                      queue.get_gather_routing_key())
-        consumer_fetch = queue.get_consumer('ckan.harvest.test.fetch',
-                                            queue.get_fetch_routing_key())
-        consumer.queue_purge(queue='ckan.harvest.test.gather')
-        consumer_fetch.queue_purge(queue='ckan.harvest.test.fetch')
+        consumer = queue.get_gather_consumer()
+        consumer_fetch = queue.get_fetch_consumer()
+        consumer.queue_purge(queue=queue.get_gather_queue_name())
+        consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())
 
 
         user = logic.get_action('get_site_user')(
@@ -262,3 +263,41 @@ class TestHarvestQueue(object):
         assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1})
         assert_equal(harvest_source_dict['status']['total_datasets'], 2)
         assert_equal(harvest_source_dict['status']['job_count'], 2)
+
+
+    def test_redis_queue_purging(self):
+        '''
+        Test that Redis queue purging doesn't purge the wrong keys.
+        '''
+        if config.get('ckan.harvest.mq.type') != 'redis':
+            raise SkipTest()
+        redis = queue.get_connection()
+        try:
+            redis.set('ckanext-harvest:some-random-key', 'foobar')
+
+            # Create some fake jobs
+            gather_publisher = queue.get_gather_publisher()
+            gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
+            gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
+            fetch_publisher = queue.get_fetch_publisher()
+            fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
+            fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
+            num_keys = redis.dbsize()
+
+            # Create some fake objects
+            gather_consumer = queue.get_gather_consumer()
+            next(gather_consumer.consume(queue.get_gather_queue_name()))
+            fetch_consumer = queue.get_fetch_consumer()
+            next(fetch_consumer.consume(queue.get_fetch_queue_name()))
+
+            ok_(redis.dbsize() > num_keys)
+
+            queue.purge_queues()
+
+            assert_equal(redis.get('ckanext-harvest:some-random-key'),
+                         'foobar')
+            assert_equal(redis.dbsize(), num_keys)
+            assert_equal(redis.llen(queue.get_gather_routing_key()), 0)
+            assert_equal(redis.llen(queue.get_fetch_routing_key()), 0)
+        finally:
+            redis.delete('ckanext-harvest:some-random-key')
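Note (not part of the patch): the heart of the change is the Lua script in RedisConsumer.queue_purge(), which pops each pending message off the consumer's list and deletes the matching persistance key in one atomic server-side step, instead of flushing the whole Redis database. The standalone sketch below reproduces that pattern with plain redis-py against a scratch database so the behaviour can be tried outside CKAN; the queue name, payloads and database number are invented for illustration, and a local Redis server plus the redis package are assumed.

import json

import redis

# Hypothetical names, chosen to mirror the routing-key layout used by the patch.
ROUTING_KEY = 'example:harvest_job_id'
MESSAGE_KEY = 'harvest_job_id'

# Same pop-and-delete loop as RedisConsumer.queue_purge(): LPOP each queued
# message, decode it, and delete the '<routing_key>:<id>' key it points at.
PURGE_LUA = '''
    local routing_key = KEYS[1]
    local message_key = ARGV[1]
    local count = 0
    while true do
        local s = redis.call("lpop", routing_key)
        if s == false then
            break
        end
        local value = cjson.decode(s)
        redis.call("del", routing_key .. ":" .. value[message_key])
        count = count + 1
    end
    return count
'''

r = redis.StrictRedis(db=15)       # scratch database for the demo
r.set('unrelated-key', 'keep me')  # a key the purge must not touch

# Enqueue two fake jobs plus the per-message keys a consumer would create.
for job_id in ('job-1', 'job-2'):
    r.rpush(ROUTING_KEY, json.dumps({MESSAGE_KEY: job_id}))
    r.set('{0}:{1}'.format(ROUTING_KEY, job_id), 'queued')

purge = r.register_script(PURGE_LUA)
removed = purge(keys=[ROUTING_KEY], args=[MESSAGE_KEY])

assert removed == 2                          # both queued messages dropped
assert r.llen(ROUTING_KEY) == 0              # the queue list is empty
assert not r.exists(ROUTING_KEY + ':job-1')  # per-message keys deleted
assert r.exists('unrelated-key')             # untouched, unlike FLUSHDB

Because the loop runs inside EVAL, no other client can enqueue or ack between the LPOP and the DEL, which is the race a Python-level loop over lpop() would leave open; that is presumably why the patch registers a script rather than iterating from Python.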