Merge branch 'pika' into release-v2.0

This commit is contained in:
amercader 2012-10-30 17:31:30 +00:00
commit 61b99e8eff
3 changed files with 59 additions and 36 deletions

View File

@ -111,13 +111,13 @@ class Harvester(CkanCommand):
from ckanext.harvest.queue import get_gather_consumer from ckanext.harvest.queue import get_gather_consumer
logging.getLogger('amqplib').setLevel(logging.INFO) logging.getLogger('amqplib').setLevel(logging.INFO)
consumer = get_gather_consumer() consumer = get_gather_consumer()
consumer.wait() consumer.start_consuming()
elif cmd == 'fetch_consumer': elif cmd == 'fetch_consumer':
import logging import logging
logging.getLogger('amqplib').setLevel(logging.INFO) logging.getLogger('amqplib').setLevel(logging.INFO)
from ckanext.harvest.queue import get_fetch_consumer from ckanext.harvest.queue import get_fetch_consumer
consumer = get_fetch_consumer() consumer = get_fetch_consumer()
consumer.wait() consumer.start_consuming()
elif cmd == 'initdb': elif cmd == 'initdb':
self.initdb() self.initdb()
elif cmd == 'import': elif cmd == 'import':

View File

@ -1,9 +1,8 @@
import logging import logging
import datetime import datetime
import json
from carrot.connection import BrokerConnection import pika
from carrot.messaging import Publisher
from carrot.messaging import Consumer
from ckan.lib.base import config from ckan.lib.base import config
from ckan.plugins import PluginImplementations from ckan.plugins import PluginImplementations
@ -27,9 +26,8 @@ VIRTUAL_HOST = '/'
EXCHANGE_TYPE = 'direct' EXCHANGE_TYPE = 'direct'
EXCHANGE_NAME = 'ckan.harvest' EXCHANGE_NAME = 'ckan.harvest'
def get_carrot_connection(): def get_connection():
backend = config.get('ckan.harvest.mq.library', 'pyamqplib')
log.debug("Carrot connection using %s backend" % backend)
try: try:
port = int(config.get('ckan.harvest.mq.port', PORT)) port = int(config.get('ckan.harvest.mq.port', PORT))
except ValueError: except ValueError:
@ -39,30 +37,59 @@ def get_carrot_connection():
hostname = config.get('ckan.harvest.mq.hostname', HOSTNAME) hostname = config.get('ckan.harvest.mq.hostname', HOSTNAME)
virtual_host = config.get('ckan.harvest.mq.virtual_host', VIRTUAL_HOST) virtual_host = config.get('ckan.harvest.mq.virtual_host', VIRTUAL_HOST)
backend_cls = 'carrot.backends.%s.Backend' % backend credentials = pika.PlainCredentials(userid, password)
return BrokerConnection(hostname=hostname, port=port, parameters = pika.ConnectionParameters(host=hostname,
userid=userid, password=password, port=port,
virtual_host=virtual_host, virtual_host=virtual_host,
backend_cls=backend_cls) credentials=credentials,
frame_max=10000)
log.debug("pika connection using %s" % parameters.__dict__)
return pika.BlockingConnection(parameters)
class Publisher(object):
def __init__(self, connection, channel, exchange, routing_key):
self.connection = connection
self.channel = channel
self.exchange = exchange
self.routing_key = routing_key
def send(self, body, **kw):
return self.channel.basic_publish(self.exchange,
self.routing_key,
json.dumps(body),
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
),
**kw)
def close(self):
self.connection.close()
def get_publisher(routing_key): def get_publisher(routing_key):
return Publisher(connection=get_carrot_connection(), connection = get_connection()
exchange=EXCHANGE_NAME, channel = connection.channel()
exchange_type=EXCHANGE_TYPE, channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
return Publisher(connection,
channel,
EXCHANGE_NAME,
routing_key=routing_key) routing_key=routing_key)
def get_consumer(queue_name, routing_key): def get_consumer(queue_name, routing_key):
return Consumer(connection=get_carrot_connection(),
queue=queue_name, connection = get_connection()
routing_key=routing_key, channel = connection.channel()
exchange=EXCHANGE_NAME,
exchange_type=EXCHANGE_TYPE, channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
durable=True, auto_delete=False) channel.queue_declare(queue=queue_name, durable=True)
channel.queue_bind(queue=queue_name, exchange=EXCHANGE_NAME, routing_key=routing_key)
return channel
def gather_callback(message_data,message): def gather_callback(channel, method, header, body):
try: try:
id = message_data['harvest_job_id'] id = json.loads(body)['harvest_job_id']
log.debug('Received harvest job id: %s' % id) log.debug('Received harvest job id: %s' % id)
# Get a publisher for the fetch queue # Get a publisher for the fetch queue
@ -107,12 +134,12 @@ def gather_callback(message_data,message):
except KeyError: except KeyError:
log.error('No harvest job id received') log.error('No harvest job id received')
finally: finally:
message.ack() channel.basic_ack(method.delivery_tag)
def fetch_callback(message_data,message): def fetch_callback(channel, method, header, body):
try: try:
id = message_data['harvest_object_id'] id = json.loads(body)['harvest_object_id']
log.info('Received harvest object id: %s' % id) log.info('Received harvest object id: %s' % id)
try: try:
@ -141,17 +168,17 @@ def fetch_callback(message_data,message):
except KeyError: except KeyError:
log.error('No harvest object id received') log.error('No harvest object id received')
finally: finally:
message.ack() channel.basic_ack(method.delivery_tag)
def get_gather_consumer(): def get_gather_consumer():
consumer = get_consumer('ckan.harvest.gather','harvest_job_id') consumer = get_consumer('ckan.harvest.gather','harvest_job_id')
consumer.register_callback(gather_callback) consumer.basic_consume(gather_callback, queue='ckan.harvest.gather')
log.debug('Gather queue consumer registered') log.debug('Gather queue consumer registered')
return consumer return consumer
def get_fetch_consumer(): def get_fetch_consumer():
consumer = get_consumer('ckan.harvert.fetch','harvest_object_id') consumer = get_consumer('ckan.harvest.fetch','harvest_object_id')
consumer.register_callback(fetch_callback) consumer.basic_consume(fetch_callback, queue='ckan.harvest.fetch')
log.debug('Fetch queue consumer registered') log.debug('Fetch queue consumer registered')
return consumer return consumer

View File

@ -1,7 +1,3 @@
# I'm not quite sure how to structure all this, but have put carrot pika==0.9.6
# dependency here for buildbot. This will probably need changing
# to suit the packaging system.
carrot==0.10.1
ckanclient>=0.7 ckanclient>=0.7
lxml==2.2.4 lxml==2.2.4