diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index 49fd4d9..cc23417 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -22,7 +22,7 @@ USERID = 'guest' PASSWORD = 'guest' HOSTNAME = 'localhost' VIRTUAL_HOST = '/' -MQ_TYPE = 'ampq' +MQ_TYPE = 'amqp' REDIS_PORT = 6379 REDIS_DB = 0 @@ -32,13 +32,13 @@ EXCHANGE_NAME = 'ckan.harvest' def get_connection(): backend = config.get('ckan.harvest.mq.type', MQ_TYPE) - if backend == 'ampq': - return get_connection_ampq() + if backend in ('amqp', 'ampq'): # "ampq" is for compat with old typo + return get_connection_amqp() if backend == 'redis': return get_connection_redis() raise Exception('not a valid queue type %s' % backend) -def get_connection_ampq(): +def get_connection_amqp(): try: port = int(config.get('ckan.harvest.mq.port', PORT)) except ValueError: @@ -66,7 +66,7 @@ def get_connection_redis(): def purge_queues(): connection = get_connection() - if config.get('ckan.harvest.mq.type') == 'ampq': + if config.get('ckan.harvest.mq.type') in ('amqp', 'ampq'): channel = connection.channel() channel.queue_purge(queue='ckan.harvest.gather') channel.queue_purge(queue='ckan.harvest.fetch') @@ -132,7 +132,7 @@ class RedisPublisher(object): def get_publisher(routing_key): connection = get_connection() backend = config.get('ckan.harvest.mq.type', MQ_TYPE) - if backend == 'ampq': + if backend in ('amqp', 'ampq'): channel = connection.channel() channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True) return Publisher(connection, @@ -144,7 +144,7 @@ def get_publisher(routing_key): class FakeMethod(object): - ''' This is to act like the method returned by ampq''' + ''' This is to act like the method returned by AMQP''' def __init__(self, message): self.delivery_tag = message @@ -174,7 +174,7 @@ def get_consumer(queue_name, routing_key): connection = get_connection() backend = config.get('ckan.harvest.mq.type', MQ_TYPE) - if backend == 'ampq': + if backend in ('amqp', 'ampq'): channel = connection.channel() channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True) channel.queue_declare(queue=queue_name, durable=True)