From c9c1eb48489646e0ac45316e999faf7013bd11b2 Mon Sep 17 00:00:00 2001 From: kindly Date: Thu, 15 Nov 2012 14:14:55 +0000 Subject: [PATCH] use generator to consume --- ckanext/harvest/commands/harvester.py | 10 ++++++---- ckanext/harvest/queue.py | 2 -- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 594858a..02a2d11 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -111,16 +111,18 @@ class Harvester(CkanCommand): self.run_harvester() elif cmd == 'gather_consumer': import logging - from ckanext.harvest.queue import get_gather_consumer + from ckanext.harvest.queue import get_gather_consumer, gather_callback logging.getLogger('amqplib').setLevel(logging.INFO) consumer = get_gather_consumer() - consumer.start_consuming() + for method, header, body in consumer.consume(queue='ckan.harvest.fetch'): + gather_callback(consumer, method, header, body) elif cmd == 'fetch_consumer': import logging logging.getLogger('amqplib').setLevel(logging.INFO) - from ckanext.harvest.queue import get_fetch_consumer + from ckanext.harvest.queue import get_fetch_consumer, fetch_callback consumer = get_fetch_consumer() - consumer.start_consuming() + for method, header, body in consumer.consume(queue='ckan.harvest.fetch'): + fetch_callback(consumer, method, header, body) elif cmd == 'purge_queues': from ckanext.harvest.queue import purge_queues purge_queues() diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index 4ac7b57..1b76530 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -190,13 +190,11 @@ def fetch_callback(channel, method, header, body): def get_gather_consumer(): consumer = get_consumer('ckan.harvest.gather','harvest_job_id') - consumer.basic_consume(gather_callback, queue='ckan.harvest.gather') log.debug('Gather queue consumer registered') return consumer def get_fetch_consumer(): consumer = get_consumer('ckan.harvest.fetch','harvest_object_id') - consumer.basic_consume(fetch_callback, queue='ckan.harvest.fetch') log.debug('Fetch queue consumer registered') return consumer