harvester-d4science/ckanext/harvest/queue.py


import logging
import datetime
import json
import pika
from ckan.lib.base import config
from ckan.plugins import PluginImplementations
from ckan import model
from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError
from ckanext.harvest.interfaces import IHarvester
log = logging.getLogger(__name__)
assert not log.disabled
__all__ = ['get_gather_publisher', 'get_gather_consumer',
           'get_fetch_publisher', 'get_fetch_consumer']
PORT = 5672
USERID = 'guest'
PASSWORD = 'guest'
HOSTNAME = 'localhost'
VIRTUAL_HOST = '/'
MQ_TYPE = 'amqp'

REDIS_PORT = 6379
REDIS_DB = 0
# settings for AMQP
EXCHANGE_TYPE = 'direct'
EXCHANGE_NAME = 'ckan.harvest'
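
# This module can talk to two message-queue backends, selected by the
# ``ckan.harvest.mq.type`` setting: ``amqp`` (RabbitMQ via pika, the default)
# or ``redis`` (plain Redis lists). The connection settings it reads are the
# ``ckan.harvest.mq.*`` options used by the functions below. A minimal
# illustrative CKAN ini snippet (the values shown are examples only):
#
#   ckan.harvest.mq.type = amqp
#   ckan.harvest.mq.hostname = localhost
#   ckan.harvest.mq.port = 5672
#   ckan.harvest.mq.user_id = guest
#   ckan.harvest.mq.password = guest
#   ckan.harvest.mq.virtual_host = /
#
#   # or, for the Redis backend:
#   # ckan.harvest.mq.type = redis
#   # ckan.harvest.mq.port = 6379
#   # ckan.harvest.mq.redis_db = 0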
def get_connection():
    backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
    if backend in ('amqp', 'ampq'):  # "ampq" is for compat with old typo
        return get_connection_amqp()
    if backend == 'redis':
        return get_connection_redis()
    raise Exception('not a valid queue type %s' % backend)

def get_connection_amqp():
    try:
        port = int(config.get('ckan.harvest.mq.port', PORT))
    except ValueError:
        port = PORT
    userid = config.get('ckan.harvest.mq.user_id', USERID)
    password = config.get('ckan.harvest.mq.password', PASSWORD)
    hostname = config.get('ckan.harvest.mq.hostname', HOSTNAME)
    virtual_host = config.get('ckan.harvest.mq.virtual_host', VIRTUAL_HOST)

    credentials = pika.PlainCredentials(userid, password)
    parameters = pika.ConnectionParameters(host=hostname,
                                           port=port,
                                           virtual_host=virtual_host,
                                           credentials=credentials,
                                           frame_max=10000)
    log.debug("pika connection using %s" % parameters.__dict__)

    return pika.BlockingConnection(parameters)

def get_connection_redis():
    import redis
    return redis.StrictRedis(host=config.get('ckan.harvest.mq.hostname', HOSTNAME),
                             port=int(config.get('ckan.harvest.mq.port', REDIS_PORT)),
                             db=int(config.get('ckan.harvest.mq.redis_db', REDIS_DB)))

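# Queue names are derived from the CKAN site id, so with the default
# ``ckan.site_id`` of 'default' the functions below return
# 'ckan.harvest.default.gather' and 'ckan.harvest.default.fetch'.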
def get_gather_queue_name():
    return 'ckan.harvest.{0}.gather'.format(config.get('ckan.site_id',
                                                       'default'))

def get_fetch_queue_name():
    return 'ckan.harvest.{0}.fetch'.format(config.get('ckan.site_id',
                                                      'default'))

def purge_queues():
    backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
    connection = get_connection()
    if backend in ('amqp', 'ampq'):
        channel = connection.channel()
        channel.queue_purge(queue=get_gather_queue_name())
        channel.queue_purge(queue=get_fetch_queue_name())
        return
    if backend == 'redis':
        connection.flushall()

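# resubmit_jobs() only applies to the Redis backend. RedisConsumer.consume()
# (further down) records a 'harvest_object_id:<id>' or 'harvest_job_id:<id>'
# key with a timestamp whenever it pops a message, and basic_ack() deletes
# that key. Anything whose timestamp is older than the timeouts below is
# assumed to have been lost by a dead worker and is pushed back onto its
# queue.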
def resubmit_jobs():
    if config.get('ckan.harvest.mq.type') != 'redis':
        return
    redis = get_connection()

    # Requeue harvest objects whose fetch/import has been pending too long
    harvest_object_pending = redis.keys('harvest_object_id:*')
    for key in harvest_object_pending:
        date_of_key = datetime.datetime.strptime(redis.get(key),
                                                 "%Y-%m-%d %H:%M:%S.%f")
        if (datetime.datetime.now() - date_of_key).seconds > 180:  # 3 minutes max for fetch and import
            redis.rpush('harvest_object_id',
                        json.dumps({'harvest_object_id': key.split(':')[-1]})
                        )
            redis.delete(key)

    # Requeue harvest jobs whose gather stage has been pending too long
    harvest_jobs_pending = redis.keys('harvest_job_id:*')
    for key in harvest_jobs_pending:
        date_of_key = datetime.datetime.strptime(redis.get(key),
                                                 "%Y-%m-%d %H:%M:%S.%f")
        if (datetime.datetime.now() - date_of_key).seconds > 7200:  # 2 hours max for a gather
            redis.rpush('harvest_job_id',
                        json.dumps({'harvest_job_id': key.split(':')[-1]})
                        )
            redis.delete(key)

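# Publisher wraps a pika channel: send() serialises the body to JSON and
# publishes it to the EXCHANGE_NAME exchange with delivery_mode=2 so the
# message survives a broker restart.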
class Publisher(object):
    def __init__(self, connection, channel, exchange, routing_key):
        self.connection = connection
        self.channel = channel
        self.exchange = exchange
        self.routing_key = routing_key

    def send(self, body, **kw):
        return self.channel.basic_publish(self.exchange,
                                          self.routing_key,
                                          json.dumps(body),
                                          properties=pika.BasicProperties(
                                              delivery_mode=2,  # make message persistent
                                          ),
                                          **kw)

    def close(self):
        self.connection.close()

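# RedisPublisher exposes the same send()/close() interface as Publisher, but
# simply RPUSHes the JSON body onto a Redis list named after the routing key.
# For 'harvest_job_id' it first LREMs any identical entry so the same job is
# never queued twice.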
class RedisPublisher(object):
    def __init__(self, redis, routing_key):
        self.redis = redis
        self.routing_key = routing_key

    def send(self, body, **kw):
        value = json.dumps(body)
        # remove the message first if it is already queued
        if self.routing_key == 'harvest_job_id':
            self.redis.lrem(self.routing_key, 0, value)
        self.redis.rpush(self.routing_key, value)

    def close(self):
        return

def get_publisher(routing_key):
    connection = get_connection()
    backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
    if backend in ('amqp', 'ampq'):
        channel = connection.channel()
        channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
        return Publisher(connection,
                         channel,
                         EXCHANGE_NAME,
                         routing_key=routing_key)
    if backend == 'redis':
        return RedisPublisher(connection, routing_key)

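# FakeMethod and RedisConsumer emulate just enough of the pika interface for
# the callbacks below to work unchanged with the Redis backend: FakeMethod
# carries the message as its delivery_tag, and RedisConsumer provides
# consume(), basic_get(), basic_ack() and queue_purge() on top of Redis lists.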
class FakeMethod(object):
    ''' This is to act like the method returned by AMQP. '''
    def __init__(self, message):
        self.delivery_tag = message

class RedisConsumer(object):
    def __init__(self, redis, routing_key):
        self.redis = redis
        self.routing_key = routing_key

    def consume(self, queue):
        while True:
            key, body = self.redis.blpop(self.routing_key)
            self.redis.set(self.persistance_key(body),
                           str(datetime.datetime.now()))
            yield (FakeMethod(body), self, body)

    def persistance_key(self, message):
        message = json.loads(message)
        return self.routing_key + ':' + message[self.routing_key]

    def basic_ack(self, message):
        self.redis.delete(self.persistance_key(message))

    def queue_purge(self, queue):
        self.redis.flushall()

    def basic_get(self, queue):
        body = self.redis.lpop(self.routing_key)
        return (FakeMethod(body), self, body)

def get_consumer(queue_name, routing_key):
    connection = get_connection()
    backend = config.get('ckan.harvest.mq.type', MQ_TYPE)

    if backend in ('amqp', 'ampq'):
        channel = connection.channel()
        channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
        channel.queue_declare(queue=queue_name, durable=True)
        channel.queue_bind(queue=queue_name, exchange=EXCHANGE_NAME, routing_key=routing_key)
        return channel
    if backend == 'redis':
        return RedisConsumer(connection, routing_key)

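# gather_callback handles one message from the gather queue: it loads the
# harvest job, runs the matching harvester's gather_stage(), and pushes each
# returned harvest object id onto the fetch queue before acknowledging the
# message.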
def gather_callback(channel, method, header, body):
    try:
        id = json.loads(body)['harvest_job_id']
        log.debug('Received harvest job id: %s' % id)
    except KeyError:
        log.error('No harvest job id received')
        channel.basic_ack(method.delivery_tag)
        return False

    # Get a publisher for the fetch queue
    publisher = get_fetch_publisher()

    job = HarvestJob.get(id)
    if not job:
        log.error('Harvest job does not exist: %s' % id)
        channel.basic_ack(method.delivery_tag)
        return False

    # Send the harvest job to the plugins that implement
    # the Harvester interface, only if the source type
    # matches
    harvester_found = False
    for harvester in PluginImplementations(IHarvester):
        if harvester.info()['name'] == job.source.type:
            harvester_found = True
            # Get a list of harvest object ids from the plugin
            job.gather_started = datetime.datetime.utcnow()
            try:
                harvest_object_ids = harvester.gather_stage(job)
            except (Exception, KeyboardInterrupt):
                channel.basic_ack(method.delivery_tag)
                harvest_objects = model.Session.query(HarvestObject).filter_by(
                    harvest_job_id=job.id
                )
                for harvest_object in harvest_objects:
                    model.Session.delete(harvest_object)
                model.Session.commit()
                raise
            finally:
                job.gather_finished = datetime.datetime.utcnow()
                job.save()

            if not isinstance(harvest_object_ids, list):
                log.error('Gather stage failed')
                publisher.close()
                channel.basic_ack(method.delivery_tag)
                return False

            if len(harvest_object_ids) == 0:
                log.info('No harvest objects to fetch')
                publisher.close()
                channel.basic_ack(method.delivery_tag)
                return False

            log.debug('Received from plugin gather_stage: {0} objects (first: {1} last: {2})'.format(
                len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:]))
            for id in harvest_object_ids:
                # Send the id to the fetch queue
                publisher.send({'harvest_object_id': id})
            log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids)))

    if not harvester_found:
        msg = 'No harvester could be found for source type %s' % job.source.type
        err = HarvestGatherError(message=msg, job=job)
        err.save()
        log.error(msg)

    model.Session.remove()
    publisher.close()
    channel.basic_ack(method.delivery_tag)

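# fetch_callback handles one message from the fetch queue: it loads the
# harvest object, gives up (state "ERROR") after 5 delivery attempts, and
# otherwise hands the object to the matching harvester's fetch and import
# stages.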
def fetch_callback(channel, method, header, body):
    try:
        id = json.loads(body)['harvest_object_id']
        log.info('Received harvest object id: %s' % id)
    except KeyError:
        log.error('No harvest object id received')
        channel.basic_ack(method.delivery_tag)
        return False

    obj = HarvestObject.get(id)
    if not obj:
        log.error('Harvest object does not exist: %s' % id)
        channel.basic_ack(method.delivery_tag)
        return False

    obj.retry_times += 1
    obj.save()

    if obj.retry_times >= 5:
        obj.state = "ERROR"
        obj.save()
        log.error('Too many consecutive retries for object {0}'.format(obj.id))
        channel.basic_ack(method.delivery_tag)
        return False

    # Send the harvest object to the plugins that implement
    # the Harvester interface, only if the source type
    # matches
    for harvester in PluginImplementations(IHarvester):
        if harvester.info()['name'] == obj.source.type:
            fetch_and_import_stages(harvester, obj)

    model.Session.remove()
    channel.basic_ack(method.delivery_tag)

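# fetch_and_import_stages drives a single object through the harvester's
# fetch_stage() and import_stage(), moving its state FETCH -> IMPORT ->
# COMPLETE (or ERROR), and then fills in report_status
# ('errored'/'deleted'/'updated'/'added') if the harvester did not set it.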
def fetch_and_import_stages(harvester, obj):
    obj.fetch_started = datetime.datetime.utcnow()
    obj.state = "FETCH"
    obj.save()
    success_fetch = harvester.fetch_stage(obj)
    obj.fetch_finished = datetime.datetime.utcnow()
    obj.save()
    if success_fetch:
        # If no errors were found, call the import method
        obj.import_started = datetime.datetime.utcnow()
        obj.state = "IMPORT"
        obj.save()
        success_import = harvester.import_stage(obj)
        obj.import_finished = datetime.datetime.utcnow()
        if success_import:
            obj.state = "COMPLETE"
        else:
            obj.state = "ERROR"
        obj.save()
    else:
        obj.state = "ERROR"
        obj.save()

    if obj.report_status:
        return
    if obj.state == 'ERROR':
        obj.report_status = 'errored'
    elif obj.current == False:
        obj.report_status = 'deleted'
    elif len(model.Session.query(HarvestObject)
             .filter_by(package_id=obj.package_id)
             .limit(2)
             .all()) == 2:
        obj.report_status = 'updated'
    else:
        obj.report_status = 'added'
    obj.save()

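# A rough, illustrative sketch of how a worker process could be wired up to
# these helpers (kept as a comment; the exact consumer call depends on the
# backend and pika version in use -- the real entry point is the harvester
# command in ckanext.harvest, not this snippet):
#
#   from ckanext.harvest.queue import (get_gather_consumer, gather_callback,
#                                      get_gather_queue_name)
#
#   consumer = get_gather_consumer()
#   for method, header, body in consumer.consume(queue=get_gather_queue_name()):
#       gather_callback(consumer, method, header, body)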
def get_gather_consumer():
    consumer = get_consumer(get_gather_queue_name(), 'harvest_job_id')
    log.debug('Gather queue consumer registered')
    return consumer

def get_fetch_consumer():
    consumer = get_consumer(get_fetch_queue_name(), 'harvest_object_id')
    log.debug('Fetch queue consumer registered')
    return consumer

def get_gather_publisher():
    return get_publisher('harvest_job_id')

def get_fetch_publisher():
    return get_publisher('harvest_object_id')

# Get a publisher for the fetch queue
#fetch_publisher = get_fetch_publisher()