diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 7158c8c..c0592e4 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -45,6 +45,8 @@ class Harvester(CkanCommand): harvester purge_queues - removes all jobs from fetch and gather queue + WARNING: if using Redis, this command purges all data in the current + Redis database harvester [-j] [-o] [--segments={segments}] import [{source-id}] - perform the import stage with the last fetched objects, for a certain diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index c523936..76b0a35 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -82,10 +82,13 @@ def purge_queues(): if backend in ('amqp', 'ampq'): channel = connection.channel() channel.queue_purge(queue=get_gather_queue_name()) + log.info('AMQP queue purged: %s', get_gather_queue_name()) channel.queue_purge(queue=get_fetch_queue_name()) + log.info('AMQP queue purged: %s', get_fetch_queue_name()) return if backend == 'redis': - connection.flushall() + connection.flushdb() + log.info('Redis database flushed') def resubmit_jobs(): if config.get('ckan.harvest.mq.type') != 'redis': @@ -95,7 +98,7 @@ def resubmit_jobs(): for key in harvest_object_pending: date_of_key = datetime.datetime.strptime(redis.get(key), "%Y-%m-%d %H:%M:%S.%f") - if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minuites for fetch and import max + if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minutes for fetch and import max redis.rpush('harvest_object_id', json.dumps({'harvest_object_id': key.split(':')[-1]}) ) @@ -177,7 +180,7 @@ class RedisConsumer(object): def basic_ack(self, message): self.redis.delete(self.persistance_key(message)) def queue_purge(self, queue): - self.redis.flushall() + self.redis.flushdb() def basic_get(self, queue): body = self.redis.lpop(self.routing_key) return (FakeMethod(body), self, body)