From dcfd201cdd7403fbca84f6c84c6b37c23596eb07 Mon Sep 17 00:00:00 2001 From: kindly Date: Sun, 21 Apr 2013 17:04:57 +0100 Subject: [PATCH] [#32] redis queue support --- ckanext/harvest/commands/harvester.py | 2 +- ckanext/harvest/logic/action/update.py | 4 +- ckanext/harvest/queue.py | 116 +++++++++++++++++++++---- 3 files changed, 105 insertions(+), 17 deletions(-) diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 8abde60..df35965 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -118,7 +118,7 @@ class Harvester(CkanCommand): logging.getLogger('amqplib').setLevel(logging.INFO) consumer = get_gather_consumer() for method, header, body in consumer.consume(queue='ckan.harvest.gather'): - gather_callback(consumer, method, header, body) + gather_callback(consumer, method, header, body) elif cmd == 'fetch_consumer': import logging logging.getLogger('amqplib').setLevel(logging.INFO) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 59550ff..f57b988 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -20,7 +20,7 @@ from ckan import logic from ckan.logic import NotFound, check_access from ckanext.harvest.plugin import DATASET_TYPE_NAME -from ckanext.harvest.queue import get_gather_publisher +from ckanext.harvest.queue import get_gather_publisher, resubmit_jobs from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject from ckanext.harvest.logic import HarvestJobExists @@ -321,6 +321,8 @@ def harvest_jobs_run(context,data_dict): if package_dict: package_index.index_package(package_dict) + # resubmit old redis tasks + resubmit_jobs() # Check if there are pending harvest jobs jobs = harvest_job_list(context,{'source_id':source_id,'status':u'New'}) diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index fba5e96..2e2c462 100644 --- a/ckanext/harvest/queue.py 
+++ b/ckanext/harvest/queue.py @@ -22,13 +22,23 @@ USERID = 'guest' PASSWORD = 'guest' HOSTNAME = 'localhost' VIRTUAL_HOST = '/' +MQ_TYPE = 'ampq' +REDIS_PORT = 6379 +REDIS_DB = 0 # settings for AMQP EXCHANGE_TYPE = 'direct' EXCHANGE_NAME = 'ckan.harvest' def get_connection(): + backend = config.get('ckan.harvest.mq.type', MQ_TYPE) + if backend == 'ampq': + return get_connection_ampq() + if backend == 'redis': + return get_connection_redis() + raise Exception('not a valid queue type %s' % backend) +def get_connection_ampq(): try: port = int(config.get('ckan.harvest.mq.port', PORT)) except ValueError: @@ -48,11 +58,45 @@ def get_connection(): return pika.BlockingConnection(parameters) +def get_connection_redis(): + import redis + return redis.StrictRedis(host=config.get('ckan.harvest.mq.hostname', HOSTNAME), + port=int(config.get('ckan.harvest.mq.port', REDIS_PORT)), + db=int(config.get('ckan.harvest.mq.redis_db', REDIS_DB))) + def purge_queues(): connection = get_connection() - channel = connection.channel() - channel.queue_purge(queue='ckan.harvest.gather') - channel.queue_purge(queue='ckan.harvest.fetch') + if config.get('ckan.harvest.mq.type') == 'ampq': + channel = connection.channel() + channel.queue_purge(queue='ckan.harvest.gather') + channel.queue_purge(queue='ckan.harvest.fetch') + return + if config.get('ckan.harvest.mq.type') == 'redis': + connection.flushall() + +def resubmit_jobs(): + if config.get('ckan.harvest.mq.type') != 'redis': + return + redis = get_connection() + harvest_object_pending = redis.keys('harvest_object_id:*') + for key in harvest_object_pending: + date_of_key = datetime.datetime.strptime(redis.get(key), + "%Y-%m-%d %H:%M:%S.%f") + if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minutes for fetch and import max + redis.rpush('harvest_object_id', + json.dumps({'harvest_object_id': key.split(':')[-1]}) + ) + redis.delete(key) + + harvest_jobs_pending = redis.keys('harvest_job_id:*') + for key in harvest_jobs_pending: + 
date_of_key = datetime.datetime.strptime(redis.get(key), + "%Y-%m-%d %H:%M:%S.%f") + if (datetime.datetime.now() - date_of_key).seconds > 7200: # 2 hours for a gather + redis.rpush('harvest_job_id', + json.dumps({'harvest_job_id': key.split(':')[-1]}) + ) + redis.delete(key) class Publisher(object): def __init__(self, connection, channel, exchange, routing_key): @@ -71,26 +115,68 @@ class Publisher(object): def close(self): self.connection.close() +class RedisPublisher(object): + def __init__(self, redis, routing_key): + self.redis = redis ## used by send() + self.routing_key = routing_key + def send(self, body, **kw): + value = json.dumps(body) + # remove if already there + if self.routing_key == 'harvest_job_id': + self.redis.lrem(self.routing_key, 0, value) + self.redis.rpush(self.routing_key, value) + def close(self): + return + def get_publisher(routing_key): connection = get_connection() - channel = connection.channel() - channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True) - return Publisher(connection, - channel, - EXCHANGE_NAME, - routing_key=routing_key) + backend = config.get('ckan.harvest.mq.type', MQ_TYPE) + if backend == 'ampq': + channel = connection.channel() + channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True) + return Publisher(connection, + channel, + EXCHANGE_NAME, + routing_key=routing_key) + if backend == 'redis': + return RedisPublisher(connection, routing_key) + + +class FakeMethod(object): + ''' This is to act like the method returned by ampq''' + def __init__(self, message): + self.delivery_tag = message + +class RedisConsumer(object): + def __init__(self, redis, routing_key): + self.redis = redis + self.routing_key = routing_key + def consume(self, queue): + while True: + key, body = self.redis.blpop(self.routing_key) + self.redis.set(self.persistance_key(body), + str(datetime.datetime.now())) + yield (FakeMethod(body), self, body) + def persistance_key(self, message): + message = json.loads(message) + return 
self.routing_key + ':' + message[self.routing_key] + def basic_ack(self, message): + self.redis.delete(self.persistance_key(message)) def get_consumer(queue_name, routing_key): connection = get_connection() - channel = connection.channel() + backend = config.get('ckan.harvest.mq.type', MQ_TYPE) - channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True) - channel.queue_declare(queue=queue_name, durable=True) - channel.queue_bind(queue=queue_name, exchange=EXCHANGE_NAME, routing_key=routing_key) - - return channel + if backend == 'ampq': + channel = connection.channel() + channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True) + channel.queue_declare(queue=queue_name, durable=True) + channel.queue_bind(queue=queue_name, exchange=EXCHANGE_NAME, routing_key=routing_key) + return channel + if backend == 'redis': + return RedisConsumer(connection, routing_key) def gather_callback(channel, method, header, body):