diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 6ea3c76..864d6c4 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -40,6 +40,9 @@ class Harvester(CkanCommand): harvester fetch_consumer - starts the consumer for the fetching queue + harvester perge_queues + - removes all jobs from fetch and gather queue + harvester [-j] [--segments={segments}] import [{source-id}] - perform the import stage with the last fetched objects, optionally belonging to a certain source. Please note that no objects will be fetched from the remote server. It will only affect @@ -118,6 +121,9 @@ class Harvester(CkanCommand): from ckanext.harvest.queue import get_fetch_consumer consumer = get_fetch_consumer() consumer.start_consuming() + elif cmd == 'purge_queues': + from ckanext.harvest.queue import purge_queues + purge_queues() elif cmd == 'initdb': self.initdb() elif cmd == 'import': diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index 2860510..7eeb6ca 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -47,6 +47,11 @@ def get_connection(): return pika.BlockingConnection(parameters) +def purge_queues(): + connection = get_connection() + channel = connection.channel() + channel.queue_purge(queue='ckan.harvest.gather') + channel.queue_purge(queue='ckan.harvest.fetch') class Publisher(object): def __init__(self, connection, channel, exchange, routing_key):