diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index e9b44f3..818530e 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -111,13 +111,13 @@ class Harvester(CkanCommand): from ckanext.harvest.queue import get_gather_consumer logging.getLogger('amqplib').setLevel(logging.INFO) consumer = get_gather_consumer() - consumer.wait() + consumer.start_consuming() elif cmd == 'fetch_consumer': import logging logging.getLogger('amqplib').setLevel(logging.INFO) from ckanext.harvest.queue import get_fetch_consumer consumer = get_fetch_consumer() - consumer.wait() + consumer.start_consuming() elif cmd == 'initdb': self.initdb() elif cmd == 'import': diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index 819e790..2750a18 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -1,9 +1,8 @@ import logging import datetime +import json -from carrot.connection import BrokerConnection -from carrot.messaging import Publisher -from carrot.messaging import Consumer +import pika from ckan.lib.base import config from ckan.plugins import PluginImplementations @@ -27,9 +26,8 @@ VIRTUAL_HOST = '/' EXCHANGE_TYPE = 'direct' EXCHANGE_NAME = 'ckan.harvest' -def get_carrot_connection(): - backend = config.get('ckan.harvest.mq.library', 'pyamqplib') - log.debug("Carrot connection using %s backend" % backend) +def get_connection(): + try: port = int(config.get('ckan.harvest.mq.port', PORT)) except ValueError: @@ -39,30 +37,59 @@ def get_carrot_connection(): hostname = config.get('ckan.harvest.mq.hostname', HOSTNAME) virtual_host = config.get('ckan.harvest.mq.virtual_host', VIRTUAL_HOST) - backend_cls = 'carrot.backends.%s.Backend' % backend - return BrokerConnection(hostname=hostname, port=port, - userid=userid, password=password, - virtual_host=virtual_host, - backend_cls=backend_cls) + credentials = pika.PlainCredentials(userid, password) + parameters = pika.ConnectionParameters(host=hostname, + port=port, + virtual_host=virtual_host, + credentials=credentials, + frame_max=10000) + log.debug("pika connection using %s" % parameters.__dict__) + + return pika.BlockingConnection(parameters) + + +class Publisher(object): + def __init__(self, connection, channel, exchange, routing_key): + self.connection = connection + self.channel = channel + self.exchange = exchange + self.routing_key = routing_key + def send(self, body, **kw): + return self.channel.basic_publish(self.exchange, + self.routing_key, + json.dumps(body), + properties=pika.BasicProperties( + delivery_mode = 2, # make message persistent + ), + **kw) + def close(self): + self.connection.close() def get_publisher(routing_key): - return Publisher(connection=get_carrot_connection(), - exchange=EXCHANGE_NAME, - exchange_type=EXCHANGE_TYPE, + connection = get_connection() + channel = connection.channel() + channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True) + return Publisher(connection, + channel, + EXCHANGE_NAME, routing_key=routing_key) + def get_consumer(queue_name, routing_key): - return Consumer(connection=get_carrot_connection(), - queue=queue_name, - routing_key=routing_key, - exchange=EXCHANGE_NAME, - exchange_type=EXCHANGE_TYPE, - durable=True, auto_delete=False) + + connection = get_connection() + channel = connection.channel() + + channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True) + channel.queue_declare(queue=queue_name, durable=True) + channel.queue_bind(queue=queue_name, exchange=EXCHANGE_NAME, routing_key=routing_key) + + return channel -def gather_callback(message_data,message): +def gather_callback(channel, method, header, body): try: - id = message_data['harvest_job_id'] + id = json.loads(body)['harvest_job_id'] log.debug('Received harvest job id: %s' % id) # Get a publisher for the fetch queue @@ -107,12 +134,12 @@ def gather_callback(message_data,message): except KeyError: log.error('No harvest job id received') finally: - message.ack() + channel.basic_ack(method.delivery_tag) -def fetch_callback(message_data,message): +def fetch_callback(channel, method, header, body): try: - id = message_data['harvest_object_id'] + id = json.loads(body)['harvest_object_id'] log.info('Received harvest object id: %s' % id) try: @@ -141,17 +168,17 @@ def fetch_callback(message_data,message): except KeyError: log.error('No harvest object id received') finally: - message.ack() + channel.basic_ack(method.delivery_tag) def get_gather_consumer(): consumer = get_consumer('ckan.harvest.gather','harvest_job_id') - consumer.register_callback(gather_callback) + consumer.basic_consume(gather_callback, queue='ckan.harvest.gather') log.debug('Gather queue consumer registered') return consumer def get_fetch_consumer(): - consumer = get_consumer('ckan.harvert.fetch','harvest_object_id') - consumer.register_callback(fetch_callback) + consumer = get_consumer('ckan.harvest.fetch','harvest_object_id') + consumer.basic_consume(fetch_callback, queue='ckan.harvest.fetch') log.debug('Fetch queue consumer registered') return consumer diff --git a/pip-requirements.txt b/pip-requirements.txt index 5f4afbd..b376e1e 100644 --- a/pip-requirements.txt +++ b/pip-requirements.txt @@ -1,7 +1,3 @@ -# I'm not quite sure how to structure all this, but have put carrot -# dependency here for buildbot. This will probably need changing -# to suit the packaging system. - -carrot==0.10.1 +pika==0.9.6 ckanclient>=0.7 lxml==2.2.4