add perge queues command

This commit is contained in:
kindly 2012-11-07 09:51:25 +00:00
parent 6db65b5826
commit 28e5e9137a
2 changed files with 11 additions and 0 deletions

View File

@ -40,6 +40,9 @@ class Harvester(CkanCommand):
harvester fetch_consumer harvester fetch_consumer
- starts the consumer for the fetching queue - starts the consumer for the fetching queue
harvester perge_queues
- removes all jobs from fetch and gather queue
harvester [-j] [--segments={segments}] import [{source-id}] harvester [-j] [--segments={segments}] import [{source-id}]
- perform the import stage with the last fetched objects, optionally belonging to a certain source. - perform the import stage with the last fetched objects, optionally belonging to a certain source.
Please note that no objects will be fetched from the remote server. It will only affect Please note that no objects will be fetched from the remote server. It will only affect
@ -118,6 +121,9 @@ class Harvester(CkanCommand):
from ckanext.harvest.queue import get_fetch_consumer from ckanext.harvest.queue import get_fetch_consumer
consumer = get_fetch_consumer() consumer = get_fetch_consumer()
consumer.start_consuming() consumer.start_consuming()
elif cmd == 'purge_queues':
from ckanext.harvest.queue import purge_queues
purge_queues()
elif cmd == 'initdb': elif cmd == 'initdb':
self.initdb() self.initdb()
elif cmd == 'import': elif cmd == 'import':

View File

@ -47,6 +47,11 @@ def get_connection():
return pika.BlockingConnection(parameters) return pika.BlockingConnection(parameters)
def purge_queues():
connection = get_connection()
channel = connection.channel()
channel.queue_purge(queue='ckan.harvest.gather')
channel.queue_purge(queue='ckan.harvest.fetch')
class Publisher(object): class Publisher(object):
def __init__(self, connection, channel, exchange, routing_key): def __init__(self, connection, channel, exchange, routing_key):