[#32] redis queue support

parent 0ce59a29b6
commit dcfd201cdd
@@ -118,7 +118,7 @@ class Harvester(CkanCommand):
             logging.getLogger('amqplib').setLevel(logging.INFO)
             consumer = get_gather_consumer()
             for method, header, body in consumer.consume(queue='ckan.harvest.gather'):
                 gather_callback(consumer, method, header, body)
         elif cmd == 'fetch_consumer':
             import logging
             logging.getLogger('amqplib').setLevel(logging.INFO)
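Whichever backend is configured, the gather_consumer command stays the same pull-and-dispatch loop: take the next message off the queue and hand it to gather_callback. A minimal sketch of that pattern against a plain Redis list, assuming a local Redis on the default port and a hypothetical handle() function standing in for the real callback (not part of this commit):

    import json

    import redis

    # hypothetical stand-in for gather_callback, which in the extension
    # starts the gather stage for the job named in the message
    def handle(body):
        print('received gather task:', json.loads(body))

    r = redis.StrictRedis(host='localhost', port=6379, db=0)
    while True:
        # blpop blocks until something is pushed onto the list and returns
        # a (list_name, value) pair -- the same primitive RedisConsumer uses
        _list_name, body = r.blpop('harvest_job_id')
        handle(body)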
@@ -20,7 +20,7 @@ from ckan import logic
 from ckan.logic import NotFound, check_access

 from ckanext.harvest.plugin import DATASET_TYPE_NAME
-from ckanext.harvest.queue import get_gather_publisher
+from ckanext.harvest.queue import get_gather_publisher, resubmit_jobs

 from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
 from ckanext.harvest.logic import HarvestJobExists
@@ -321,6 +321,8 @@ def harvest_jobs_run(context,data_dict):
                 if package_dict:
                     package_index.index_package(package_dict)

+    # resubmit old redis tasks
+    resubmit_jobs()

     # Check if there are pending harvest jobs
     jobs = harvest_job_list(context,{'source_id':source_id,'status':u'New'})
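harvest_jobs_run() now calls resubmit_jobs() before looking for pending jobs, so work that stalled in Redis is pushed back onto the queue first. The call is safe under the default backend because resubmit_jobs() returns immediately unless ckan.harvest.mq.type is 'redis'; a tiny illustrative sketch of that guard, with a plain dict standing in for the CKAN config (hypothetical, not the extension's code):

    def resubmit_jobs_sketch(config):
        # mirror of the guard at the top of resubmit_jobs()
        if config.get('ckan.harvest.mq.type') != 'redis':
            return 'nothing to do'
        return 'requeue stale harvest_object_id:* / harvest_job_id:* keys'

    print(resubmit_jobs_sketch({'ckan.harvest.mq.type': 'ampq'}))   # nothing to do
    print(resubmit_jobs_sketch({'ckan.harvest.mq.type': 'redis'}))  # requeue ...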
@@ -22,13 +22,23 @@ USERID = 'guest'
 PASSWORD = 'guest'
 HOSTNAME = 'localhost'
 VIRTUAL_HOST = '/'
+MQ_TYPE = 'ampq'
+REDIS_PORT = 6379
+REDIS_DB = 0

 # settings for AMQP
 EXCHANGE_TYPE = 'direct'
 EXCHANGE_NAME = 'ckan.harvest'

 def get_connection():
+    backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
+    if backend == 'ampq':
+        return get_connection_ampq()
+    if backend == 'redis':
+        return get_connection_redis()
+    raise Exception('not a valid queue type %s' % backend)

+def get_connection_ampq():
     try:
         port = int(config.get('ckan.harvest.mq.port', PORT))
     except ValueError:
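get_connection() now dispatches on the ckan.harvest.mq.type setting, defaulting to 'ampq'. A minimal sketch of the same switch with a plain dict standing in for the CKAN config object (the config key and backend names come from this commit; the dict and prints are illustrative only):

    MQ_TYPE = 'ampq'  # default backend name

    def pick_backend(config):
        backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
        if backend not in ('ampq', 'redis'):
            raise Exception('not a valid queue type %s' % backend)
        return backend

    print(pick_backend({'ckan.harvest.mq.type': 'redis'}))  # -> redis
    print(pick_backend({}))                                 # -> ampq (the default)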
@@ -48,11 +58,45 @@ def get_connection():

     return pika.BlockingConnection(parameters)

+def get_connection_redis():
+    import redis
+    return redis.StrictRedis(host=config.get('ckan.harvest.mq.hostname', HOSTNAME),
+                             port=int(config.get('ckan.harvest.mq.port', REDIS_PORT)),
+                             db=int(config.get('ckan.harvest.mq.redis_db', REDIS_DB)))
+
 def purge_queues():
     connection = get_connection()
-    channel = connection.channel()
-    channel.queue_purge(queue='ckan.harvest.gather')
-    channel.queue_purge(queue='ckan.harvest.fetch')
+    if config.get('ckan.harvest.mq.type') == 'ampq':
+        channel = connection.channel()
+        channel.queue_purge(queue='ckan.harvest.gather')
+        channel.queue_purge(queue='ckan.harvest.fetch')
+        return
+    if config.get('ckan.harvest.mq.type') == 'redis':
+        connection.flushall()
+
+def resubmit_jobs():
+    if config.get('ckan.harvest.mq.type') != 'redis':
+        return
+    redis = get_connection()
+    harvest_object_pending = redis.keys('harvest_object_id:*')
+    for key in harvest_object_pending:
+        date_of_key = datetime.datetime.strptime(redis.get(key),
+                                                 "%Y-%m-%d %H:%M:%S.%f")
+        if (datetime.datetime.now() - date_of_key).seconds > 180:  # 3 minutes max for fetch and import
+            redis.rpush('harvest_object_id',
+                        json.dumps({'harvest_object_id': key.split(':')[-1]})
+            )
+            redis.delete(key)
+
+    harvest_jobs_pending = redis.keys('harvest_job_id:*')
+    for key in harvest_jobs_pending:
+        date_of_key = datetime.datetime.strptime(redis.get(key),
+                                                 "%Y-%m-%d %H:%M:%S.%f")
+        if (datetime.datetime.now() - date_of_key).seconds > 7200:  # 3 hours for a gather
+            redis.rpush('harvest_job_id',
+                        json.dumps({'harvest_job_id': key.split(':')[-1]})
+            )
+            redis.delete(key)
+
 class Publisher(object):
     def __init__(self, connection, channel, exchange, routing_key):
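resubmit_jobs() works off a timestamp key per in-flight item (harvest_object_id:<id> and harvest_job_id:<id>) and requeues anything older than the threshold. A standalone sketch of that age check, assuming a local Redis and a Python 3 redis client (the object id is made up):

    import datetime
    import json

    import redis

    r = redis.StrictRedis(host='localhost', port=6379, db=0)

    # mark a (made-up) harvest object as in-flight, as the consumer does
    r.set('harvest_object_id:1234', str(datetime.datetime.now()))

    # later: push anything that has been in-flight too long back onto the queue
    for key in r.keys('harvest_object_id:*'):
        started = datetime.datetime.strptime(r.get(key).decode(),
                                             "%Y-%m-%d %H:%M:%S.%f")
        if (datetime.datetime.now() - started).seconds > 180:  # same 3 minute threshold
            r.rpush('harvest_object_id',
                    json.dumps({'harvest_object_id': key.decode().split(':')[-1]}))
            r.delete(key)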
@@ -71,26 +115,68 @@ class Publisher(object):
     def close(self):
         self.connection.close()

+class RedisPublisher(object):
+    def __init__(self, redis, routing_key):
+        self.redis = redis  ## not used
+        self.routing_key = routing_key
+    def send(self, body, **kw):
+        value = json.dumps(body)
+        # remove if already there
+        if self.routing_key == 'harvest_job_id':
+            self.redis.lrem(self.routing_key, 0, value)
+        self.redis.rpush(self.routing_key, value)
+    def close(self):
+        return
+
 def get_publisher(routing_key):
     connection = get_connection()
-    channel = connection.channel()
-    channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
-    return Publisher(connection,
-                     channel,
-                     EXCHANGE_NAME,
-                     routing_key=routing_key)
+    backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
+    if backend == 'ampq':
+        channel = connection.channel()
+        channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
+        return Publisher(connection,
+                         channel,
+                         EXCHANGE_NAME,
+                         routing_key=routing_key)
+    if backend == 'redis':
+        return RedisPublisher(connection, routing_key)
+
+
+class FakeMethod(object):
+    ''' This is to act like the method returned by ampq'''
+    def __init__(self, message):
+        self.delivery_tag = message
+
+class RedisConsumer(object):
+    def __init__(self, redis, routing_key):
+        self.redis = redis
+        self.routing_key = routing_key
+    def consume(self, queue):
+        while True:
+            key, body = self.redis.blpop(self.routing_key)
+            self.redis.set(self.persistance_key(body),
+                           str(datetime.datetime.now()))
+            yield (FakeMethod(body), self, body)
+    def persistance_key(self, message):
+        message = json.loads(message)
+        return self.routing_key + ':' + message[self.routing_key]
+    def basic_ack(self, message):
+        self.redis.delete(self.persistance_key(message))
+
+
 def get_consumer(queue_name, routing_key):

     connection = get_connection()
-    channel = connection.channel()
-
-    channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
-    channel.queue_declare(queue=queue_name, durable=True)
-    channel.queue_bind(queue=queue_name, exchange=EXCHANGE_NAME, routing_key=routing_key)
-
-    return channel
+    backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
+
+    if backend == 'ampq':
+        channel = connection.channel()
+        channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
+        channel.queue_declare(queue=queue_name, durable=True)
+        channel.queue_bind(queue=queue_name, exchange=EXCHANGE_NAME, routing_key=routing_key)
+        return channel
+    if backend == 'redis':
+        return RedisConsumer(connection, routing_key)
+
+
 def gather_callback(channel, method, header, body):
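RedisPublisher and RedisConsumer implement the queue as a Redis list plus a per-message timestamp key while the message is being worked on. A round-trip sketch of that protocol (publish, consume, mark in-flight, ack), assuming a local Redis and a made-up job id (illustrative, not the extension's code):

    import datetime
    import json

    import redis

    r = redis.StrictRedis(host='localhost', port=6379, db=0)
    routing_key = 'harvest_job_id'

    # publish: drop any duplicate first, then append (as RedisPublisher.send does)
    value = json.dumps({'harvest_job_id': 'abc123'})
    r.lrem(routing_key, 0, value)
    r.rpush(routing_key, value)

    # consume: block for the next message and record when work started
    # (as RedisConsumer.consume does)
    _list_name, body = r.blpop(routing_key)
    persistance_key = routing_key + ':' + json.loads(body)[routing_key]
    r.set(persistance_key, str(datetime.datetime.now()))

    # ... gather/fetch work happens here ...

    # ack: clear the in-flight marker (as RedisConsumer.basic_ack does)
    r.delete(persistance_key)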