Append site id to queue name

This allows multiple CKAN sites to share the same RabbitMQ exchange
(For the Redis backend this is handled via different Redis databases)
This commit is contained in:
amercader 2015-06-01 17:54:22 +01:00
parent 3e21ea4f82
commit 9f8aae3a18
1 changed files with 15 additions and 4 deletions

View File

@ -64,14 +64,25 @@ def get_connection_redis():
port=int(config.get('ckan.harvest.mq.port', REDIS_PORT)), port=int(config.get('ckan.harvest.mq.port', REDIS_PORT)),
db=int(config.get('ckan.harvest.mq.redis_db', REDIS_DB))) db=int(config.get('ckan.harvest.mq.redis_db', REDIS_DB)))
def get_gather_queue_name():
return 'ckan.harvest.{0}.gather'.format(config.get('ckan.site_id',
'default'))
def get_fetch_queue_name():
return 'ckan.harvest.{0}.fetch'.format(config.get('ckan.site_id',
'default'))
def purge_queues(): def purge_queues():
backend = config.get('ckan.harvest.mq.type', MQ_TYPE) backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
connection = get_connection() connection = get_connection()
if backend in ('amqp', 'ampq'): if backend in ('amqp', 'ampq'):
channel = connection.channel() channel = connection.channel()
channel.queue_purge(queue='ckan.harvest.gather') channel.queue_purge(queue=get_gather_queue_name())
channel.queue_purge(queue='ckan.harvest.fetch') channel.queue_purge(queue=get_fetch_queue_name())
return return
if backend == 'redis': if backend == 'redis':
connection.flushall() connection.flushall()
@ -334,12 +345,12 @@ def fetch_and_import_stages(harvester, obj):
obj.save() obj.save()
def get_gather_consumer(): def get_gather_consumer():
consumer = get_consumer('ckan.harvest.gather','harvest_job_id') consumer = get_consumer(get_gather_queue_name(), 'harvest_job_id')
log.debug('Gather queue consumer registered') log.debug('Gather queue consumer registered')
return consumer return consumer
def get_fetch_consumer(): def get_fetch_consumer():
consumer = get_consumer('ckan.harvest.fetch','harvest_object_id') consumer = get_consumer(get_fetch_queue_name(), 'harvest_object_id')
log.debug('Fetch queue consumer registered') log.debug('Fetch queue consumer registered')
return consumer return consumer