Merge branch 'fix/rename-ampq-to-amqp' of git://github.com/opendatatrentino/ckanext-harvest into opendatatrentino-fix/rename-ampq-to-amqp

This commit is contained in:
amercader 2013-10-04 17:24:53 +01:00
commit f89f12203c
1 changed files with 8 additions and 8 deletions

View File

@ -22,7 +22,7 @@ USERID = 'guest'
PASSWORD = 'guest' PASSWORD = 'guest'
HOSTNAME = 'localhost' HOSTNAME = 'localhost'
VIRTUAL_HOST = '/' VIRTUAL_HOST = '/'
MQ_TYPE = 'ampq' MQ_TYPE = 'amqp'
REDIS_PORT = 6379 REDIS_PORT = 6379
REDIS_DB = 0 REDIS_DB = 0
@ -32,13 +32,13 @@ EXCHANGE_NAME = 'ckan.harvest'
def get_connection(): def get_connection():
backend = config.get('ckan.harvest.mq.type', MQ_TYPE) backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
if backend == 'ampq': if backend in ('amqp', 'ampq'): # "ampq" is for compat with old typo
return get_connection_ampq() return get_connection_amqp()
if backend == 'redis': if backend == 'redis':
return get_connection_redis() return get_connection_redis()
raise Exception('not a valid queue type %s' % backend) raise Exception('not a valid queue type %s' % backend)
def get_connection_ampq(): def get_connection_amqp():
try: try:
port = int(config.get('ckan.harvest.mq.port', PORT)) port = int(config.get('ckan.harvest.mq.port', PORT))
except ValueError: except ValueError:
@ -66,7 +66,7 @@ def get_connection_redis():
def purge_queues(): def purge_queues():
connection = get_connection() connection = get_connection()
if config.get('ckan.harvest.mq.type') == 'ampq': if config.get('ckan.harvest.mq.type') in ('amqp', 'ampq'):
channel = connection.channel() channel = connection.channel()
channel.queue_purge(queue='ckan.harvest.gather') channel.queue_purge(queue='ckan.harvest.gather')
channel.queue_purge(queue='ckan.harvest.fetch') channel.queue_purge(queue='ckan.harvest.fetch')
@ -132,7 +132,7 @@ class RedisPublisher(object):
def get_publisher(routing_key): def get_publisher(routing_key):
connection = get_connection() connection = get_connection()
backend = config.get('ckan.harvest.mq.type', MQ_TYPE) backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
if backend == 'ampq': if backend in ('amqp', 'ampq'):
channel = connection.channel() channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True) channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
return Publisher(connection, return Publisher(connection,
@ -144,7 +144,7 @@ def get_publisher(routing_key):
class FakeMethod(object): class FakeMethod(object):
''' This is to act like the method returned by ampq''' ''' This is to act like the method returned by AMQP'''
def __init__(self, message): def __init__(self, message):
self.delivery_tag = message self.delivery_tag = message
@ -174,7 +174,7 @@ def get_consumer(queue_name, routing_key):
connection = get_connection() connection = get_connection()
backend = config.get('ckan.harvest.mq.type', MQ_TYPE) backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
if backend == 'ampq': if backend in ('amqp', 'ampq'):
channel = connection.channel() channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True) channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
channel.queue_declare(queue=queue_name, durable=True) channel.queue_declare(queue=queue_name, durable=True)