From d1f84295f8726d6a83f28585ca76ea0a1487f9a1 Mon Sep 17 00:00:00 2001 From: David Read Date: Wed, 21 Oct 2015 16:12:40 +0000 Subject: [PATCH 1/2] purge_queues command now has warning about impact of Redis flushall, plus add some (log) output when you run a purge. --- ckanext/harvest/commands/harvester.py | 2 ++ ckanext/harvest/queue.py | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 7158c8c..03a6bb8 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -45,6 +45,8 @@ class Harvester(CkanCommand): harvester purge_queues - removes all jobs from fetch and gather queue + WARNING: if using Redis, this command purges any other data you have + in Redis too! harvester [-j] [-o] [--segments={segments}] import [{source-id}] - perform the import stage with the last fetched objects, for a certain diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index bb1d63c..7e56a31 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -82,10 +82,13 @@ def purge_queues(): if backend in ('amqp', 'ampq'): channel = connection.channel() channel.queue_purge(queue=get_gather_queue_name()) + log.info('AMQP queue purged: %s', get_gather_queue_name()) channel.queue_purge(queue=get_fetch_queue_name()) + log.info('AMQP queue purged: %s', get_fetch_queue_name()) return if backend == 'redis': connection.flushall() + log.info('Redis flushed') def resubmit_jobs(): if config.get('ckan.harvest.mq.type') != 'redis': @@ -95,7 +98,7 @@ def resubmit_jobs(): for key in harvest_object_pending: date_of_key = datetime.datetime.strptime(redis.get(key), "%Y-%m-%d %H:%M:%S.%f") - if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minuites for fetch and import max + if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minutes for fetch and import max redis.rpush('harvest_object_id', json.dumps({'harvest_object_id': key.split(':')[-1]}) ) From 3c6cc55be078431b43681c30e24b097ce8485a90 Mon Sep 17 00:00:00 2001 From: amercader Date: Fri, 23 Oct 2015 11:52:22 +0100 Subject: [PATCH 2/2] Only flush keys on the current Redis database --- ckanext/harvest/commands/harvester.py | 4 ++-- ckanext/harvest/queue.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 03a6bb8..c0592e4 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -45,8 +45,8 @@ class Harvester(CkanCommand): harvester purge_queues - removes all jobs from fetch and gather queue - WARNING: if using Redis, this command purges any other data you have - in Redis too! + WARNING: if using Redis, this command purges all data in the current + Redis database harvester [-j] [-o] [--segments={segments}] import [{source-id}] - perform the import stage with the last fetched objects, for a certain diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index e1a1180..76b0a35 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -87,8 +87,8 @@ def purge_queues(): log.info('AMQP queue purged: %s', get_fetch_queue_name()) return if backend == 'redis': - connection.flushall() - log.info('Redis flushed') + connection.flushdb() + log.info('Redis database flushed') def resubmit_jobs(): if config.get('ckan.harvest.mq.type') != 'redis': @@ -180,7 +180,7 @@ class RedisConsumer(object): def basic_ack(self, message): self.redis.delete(self.persistance_key(message)) def queue_purge(self, queue): - self.redis.flushall() + self.redis.flushdb() def basic_get(self, queue): body = self.redis.lpop(self.routing_key) return (FakeMethod(body), self, body)