use generator to consume
This commit is contained in:
parent
33d5e09722
commit
c9c1eb4848
|
@ -111,16 +111,18 @@ class Harvester(CkanCommand):
|
|||
self.run_harvester()
|
||||
elif cmd == 'gather_consumer':
|
||||
import logging
|
||||
from ckanext.harvest.queue import get_gather_consumer
|
||||
from ckanext.harvest.queue import get_gather_consumer, gather_callback
|
||||
logging.getLogger('amqplib').setLevel(logging.INFO)
|
||||
consumer = get_gather_consumer()
|
||||
consumer.start_consuming()
|
||||
for method, header, body in consumer.consume(queue='ckan.harvest.fetch'):
|
||||
gather_callback(consumer, method, header, body)
|
||||
elif cmd == 'fetch_consumer':
|
||||
import logging
|
||||
logging.getLogger('amqplib').setLevel(logging.INFO)
|
||||
from ckanext.harvest.queue import get_fetch_consumer
|
||||
from ckanext.harvest.queue import get_fetch_consumer, fetch_callback
|
||||
consumer = get_fetch_consumer()
|
||||
consumer.start_consuming()
|
||||
for method, header, body in consumer.consume(queue='ckan.harvest.fetch'):
|
||||
fetch_callback(consumer, method, header, body)
|
||||
elif cmd == 'purge_queues':
|
||||
from ckanext.harvest.queue import purge_queues
|
||||
purge_queues()
|
||||
|
|
|
@ -190,13 +190,11 @@ def fetch_callback(channel, method, header, body):
|
|||
|
||||
def get_gather_consumer():
|
||||
consumer = get_consumer('ckan.harvest.gather','harvest_job_id')
|
||||
consumer.basic_consume(gather_callback, queue='ckan.harvest.gather')
|
||||
log.debug('Gather queue consumer registered')
|
||||
return consumer
|
||||
|
||||
def get_fetch_consumer():
|
||||
consumer = get_consumer('ckan.harvest.fetch','harvest_object_id')
|
||||
consumer.basic_consume(fetch_callback, queue='ckan.harvest.fetch')
|
||||
log.debug('Fetch queue consumer registered')
|
||||
return consumer
|
||||
|
||||
|
|
Loading…
Reference in New Issue