From d1f84295f8726d6a83f28585ca76ea0a1487f9a1 Mon Sep 17 00:00:00 2001 From: David Read Date: Wed, 21 Oct 2015 16:12:40 +0000 Subject: [PATCH] purge_queues command now has warning about impact of Redis flushall, plus add some (log) output when you run a purge. --- ckanext/harvest/commands/harvester.py | 2 ++ ckanext/harvest/queue.py | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 7158c8c..03a6bb8 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -45,6 +45,8 @@ class Harvester(CkanCommand): harvester purge_queues - removes all jobs from fetch and gather queue + WARNING: if using Redis, this command purges any other data you have + in Redis too! harvester [-j] [-o] [--segments={segments}] import [{source-id}] - perform the import stage with the last fetched objects, for a certain diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index bb1d63c..7e56a31 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -82,10 +82,13 @@ def purge_queues(): if backend in ('amqp', 'ampq'): channel = connection.channel() channel.queue_purge(queue=get_gather_queue_name()) + log.info('AMQP queue purged: %s', get_gather_queue_name()) channel.queue_purge(queue=get_fetch_queue_name()) + log.info('AMQP queue purged: %s', get_fetch_queue_name()) return if backend == 'redis': connection.flushall() + log.info('Redis flushed') def resubmit_jobs(): if config.get('ckan.harvest.mq.type') != 'redis': @@ -95,7 +98,7 @@ def resubmit_jobs(): for key in harvest_object_pending: date_of_key = datetime.datetime.strptime(redis.get(key), "%Y-%m-%d %H:%M:%S.%f") - if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minuites for fetch and import max + if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minutes for fetch and import max redis.rpush('harvest_object_id', json.dumps({'harvest_object_id': key.split(':')[-1]}) )