pika now used as queue library
This commit is contained in:
parent
c1f83e0d3e
commit
da125cdcc2
|
@ -111,13 +111,13 @@ class Harvester(CkanCommand):
|
|||
from ckanext.harvest.queue import get_gather_consumer
|
||||
logging.getLogger('amqplib').setLevel(logging.INFO)
|
||||
consumer = get_gather_consumer()
|
||||
consumer.wait()
|
||||
consumer.start_consuming()
|
||||
elif cmd == 'fetch_consumer':
|
||||
import logging
|
||||
logging.getLogger('amqplib').setLevel(logging.INFO)
|
||||
from ckanext.harvest.queue import get_fetch_consumer
|
||||
consumer = get_fetch_consumer()
|
||||
consumer.wait()
|
||||
consumer.start_consuming()
|
||||
elif cmd == 'initdb':
|
||||
self.initdb()
|
||||
elif cmd == 'import':
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
import logging
|
||||
import datetime
|
||||
import json
|
||||
|
||||
from carrot.connection import BrokerConnection
|
||||
from carrot.messaging import Publisher
|
||||
from carrot.messaging import Consumer
|
||||
import pika
|
||||
|
||||
from ckan.lib.base import config
|
||||
from ckan.plugins import PluginImplementations
|
||||
|
@ -28,8 +27,7 @@ EXCHANGE_TYPE = 'direct'
|
|||
EXCHANGE_NAME = 'ckan.harvest'
|
||||
|
||||
def get_carrot_connection():
|
||||
backend = config.get('ckan.harvest.mq.library', 'pyamqplib')
|
||||
log.debug("Carrot connection using %s backend" % backend)
|
||||
|
||||
try:
|
||||
port = int(config.get('ckan.harvest.mq.port', PORT))
|
||||
except ValueError:
|
||||
|
@ -39,30 +37,59 @@ def get_carrot_connection():
|
|||
hostname = config.get('ckan.harvest.mq.hostname', HOSTNAME)
|
||||
virtual_host = config.get('ckan.harvest.mq.virtual_host', VIRTUAL_HOST)
|
||||
|
||||
backend_cls = 'carrot.backends.%s.Backend' % backend
|
||||
return BrokerConnection(hostname=hostname, port=port,
|
||||
userid=userid, password=password,
|
||||
virtual_host=virtual_host,
|
||||
backend_cls=backend_cls)
|
||||
credentials = pika.PlainCredentials(userid, password)
|
||||
parameters = pika.ConnectionParameters(host=hostname,
|
||||
port=port,
|
||||
virtual_host=virtual_host,
|
||||
credentials=credentials,
|
||||
frame_max=10000)
|
||||
log.debug("pika connection using %s" % parameters.__dict__)
|
||||
|
||||
return pika.BlockingConnection(parameters)
|
||||
|
||||
|
||||
class Publisher(object):
|
||||
def __init__(self, connection, channel, exchange, routing_key):
|
||||
self.connection = connection
|
||||
self.channel = channel
|
||||
self.exchange = exchange
|
||||
self.routing_key = routing_key
|
||||
def send(self, body, **kw):
|
||||
return self.channel.basic_publish(self.exchange,
|
||||
self.routing_key,
|
||||
json.dumps(body),
|
||||
properties=pika.BasicProperties(
|
||||
delivery_mode = 2, # make message persistent
|
||||
),
|
||||
**kw)
|
||||
def close(self):
|
||||
self.connection.close()
|
||||
|
||||
def get_publisher(routing_key):
|
||||
return Publisher(connection=get_carrot_connection(),
|
||||
exchange=EXCHANGE_NAME,
|
||||
exchange_type=EXCHANGE_TYPE,
|
||||
connection = get_carrot_connection()
|
||||
channel = connection.channel()
|
||||
channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
|
||||
return Publisher(connection,
|
||||
channel,
|
||||
EXCHANGE_NAME,
|
||||
routing_key=routing_key)
|
||||
|
||||
|
||||
def get_consumer(queue_name, routing_key):
|
||||
return Consumer(connection=get_carrot_connection(),
|
||||
queue=queue_name,
|
||||
routing_key=routing_key,
|
||||
exchange=EXCHANGE_NAME,
|
||||
exchange_type=EXCHANGE_TYPE,
|
||||
durable=True, auto_delete=False)
|
||||
|
||||
connection = get_carrot_connection()
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
|
||||
channel.queue_declare(queue=queue_name, durable=True)
|
||||
channel.queue_bind(queue=queue_name, exchange=EXCHANGE_NAME, routing_key=routing_key)
|
||||
|
||||
return channel
|
||||
|
||||
|
||||
def gather_callback(message_data,message):
|
||||
def gather_callback(channel, method, header, body):
|
||||
try:
|
||||
id = message_data['harvest_job_id']
|
||||
id = json.loads(body)['harvest_job_id']
|
||||
log.debug('Received harvest job id: %s' % id)
|
||||
|
||||
# Get a publisher for the fetch queue
|
||||
|
@ -107,12 +134,12 @@ def gather_callback(message_data,message):
|
|||
except KeyError:
|
||||
log.error('No harvest job id received')
|
||||
finally:
|
||||
message.ack()
|
||||
channel.basic_ack(method.delivery_tag)
|
||||
|
||||
|
||||
def fetch_callback(message_data,message):
|
||||
def fetch_callback(channel, method, header, body):
|
||||
try:
|
||||
id = message_data['harvest_object_id']
|
||||
id = json.loads(body)['harvest_object_id']
|
||||
log.info('Received harvest object id: %s' % id)
|
||||
|
||||
try:
|
||||
|
@ -141,17 +168,17 @@ def fetch_callback(message_data,message):
|
|||
except KeyError:
|
||||
log.error('No harvest object id received')
|
||||
finally:
|
||||
message.ack()
|
||||
channel.basic_ack(method.delivery_tag)
|
||||
|
||||
def get_gather_consumer():
|
||||
consumer = get_consumer('ckan.harvest.gather','harvest_job_id')
|
||||
consumer.register_callback(gather_callback)
|
||||
consumer.basic_consume(gather_callback, queue='ckan.harvest.gather')
|
||||
log.debug('Gather queue consumer registered')
|
||||
return consumer
|
||||
|
||||
def get_fetch_consumer():
|
||||
consumer = get_consumer('ckan.harvert.fetch','harvest_object_id')
|
||||
consumer.register_callback(fetch_callback)
|
||||
consumer = get_consumer('ckan.harvest.fetch','harvest_object_id')
|
||||
consumer.basic_consume(fetch_callback, queue='ckan.harvest.fetch')
|
||||
log.debug('Fetch queue consumer registered')
|
||||
return consumer
|
||||
|
||||
|
|
Loading…
Reference in New Issue