Fixed typo: ampq -> amqp

This commit is contained in:
Samuele Santi 2013-09-19 11:43:03 +02:00
parent 584c340583
commit 611b9aab6d
1 changed files with 8 additions and 8 deletions

View File

@ -22,7 +22,7 @@ USERID = 'guest'
PASSWORD = 'guest'
HOSTNAME = 'localhost'
VIRTUAL_HOST = '/'
MQ_TYPE = 'ampq'
MQ_TYPE = 'amqp'
REDIS_PORT = 6379
REDIS_DB = 0
@ -32,13 +32,13 @@ EXCHANGE_NAME = 'ckan.harvest'
def get_connection():
backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
if backend == 'ampq':
return get_connection_ampq()
if backend in ('amqp', 'ampq'): # "ampq" is for compat with old typo
return get_connection_amqp()
if backend == 'redis':
return get_connection_redis()
raise Exception('not a valid queue type %s' % backend)
def get_connection_ampq():
def get_connection_amqp():
try:
port = int(config.get('ckan.harvest.mq.port', PORT))
except ValueError:
@ -66,7 +66,7 @@ def get_connection_redis():
def purge_queues():
connection = get_connection()
if config.get('ckan.harvest.mq.type') == 'ampq':
if config.get('ckan.harvest.mq.type') in ('amqp', 'ampq'):
channel = connection.channel()
channel.queue_purge(queue='ckan.harvest.gather')
channel.queue_purge(queue='ckan.harvest.fetch')
@ -132,7 +132,7 @@ class RedisPublisher(object):
def get_publisher(routing_key):
connection = get_connection()
backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
if backend == 'ampq':
if backend in ('amqp', 'ampq'):
channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
return Publisher(connection,
@ -144,7 +144,7 @@ def get_publisher(routing_key):
class FakeMethod(object):
''' This is to act like the method returned by ampq'''
''' This is to act like the method returned by AMQP'''
def __init__(self, message):
self.delivery_tag = message
@ -174,7 +174,7 @@ def get_consumer(queue_name, routing_key):
connection = get_connection()
backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
if backend == 'ampq':
if backend in ('amqp', 'ampq'):
channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
channel.queue_declare(queue=queue_name, durable=True)