diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index b8c4131..bb1d63c 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -64,14 +64,25 @@ def get_connection_redis(): port=int(config.get('ckan.harvest.mq.port', REDIS_PORT)), db=int(config.get('ckan.harvest.mq.redis_db', REDIS_DB))) + +def get_gather_queue_name(): + return 'ckan.harvest.{0}.gather'.format(config.get('ckan.site_id', + 'default')) + + +def get_fetch_queue_name(): + return 'ckan.harvest.{0}.fetch'.format(config.get('ckan.site_id', + 'default')) + + def purge_queues(): backend = config.get('ckan.harvest.mq.type', MQ_TYPE) connection = get_connection() if backend in ('amqp', 'ampq'): channel = connection.channel() - channel.queue_purge(queue='ckan.harvest.gather') - channel.queue_purge(queue='ckan.harvest.fetch') + channel.queue_purge(queue=get_gather_queue_name()) + channel.queue_purge(queue=get_fetch_queue_name()) return if backend == 'redis': connection.flushall() @@ -334,12 +345,12 @@ def fetch_and_import_stages(harvester, obj): obj.save() def get_gather_consumer(): - consumer = get_consumer('ckan.harvest.gather','harvest_job_id') + consumer = get_consumer(get_gather_queue_name(), 'harvest_job_id') log.debug('Gather queue consumer registered') return consumer def get_fetch_consumer(): - consumer = get_consumer('ckan.harvest.fetch','harvest_object_id') + consumer = get_consumer(get_fetch_queue_name(), 'harvest_object_id') log.debug('Fetch queue consumer registered') return consumer