import logging
import datetime
import json

import pika
import sqlalchemy

from ckan.lib.base import config
from ckan.plugins import PluginImplementations
from ckan import model

from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError
from ckanext.harvest.interfaces import IHarvester

log = logging.getLogger(__name__)
assert not log.disabled

__all__ = ['get_gather_publisher', 'get_gather_consumer',
           'get_fetch_publisher', 'get_fetch_consumer',
           'get_harvester']

PORT = 5672
USERID = 'guest'
PASSWORD = 'guest'
HOSTNAME = 'localhost'
VIRTUAL_HOST = '/'
MQ_TYPE = 'amqp'
REDIS_PORT = 6379
REDIS_DB = 0

# settings for AMQP
EXCHANGE_TYPE = 'direct'
EXCHANGE_NAME = 'ckan.harvest'

# Format of the timestamps RedisConsumer.consume stores under the
# persistence keys (the result of str(datetime.datetime.now())).
_PENDING_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

# Age (in seconds) after which a consumed-but-unacknowledged message is
# put back on its queue by resubmit_jobs.
_FETCH_RESUBMIT_AFTER = 180    # 3 minutes for fetch and import max
_GATHER_RESUBMIT_AFTER = 7200  # 2 hours for a gather


def get_connection():
    '''Return a connection for the backend named by ``ckan.harvest.mq.type``.

    Raises ``Exception`` if the configured backend is neither AMQP nor
    Redis.
    '''
    backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
    if backend in ('amqp', 'ampq'):  # "ampq" is for compat with old typo
        return get_connection_amqp()
    if backend == 'redis':
        return get_connection_redis()
    raise Exception('not a valid queue type %s' % backend)


def get_connection_amqp():
    '''Open a blocking pika connection using the ``ckan.harvest.mq.*``
    settings, falling back to the module defaults.
    '''
    try:
        port = int(config.get('ckan.harvest.mq.port', PORT))
    except ValueError:
        # A non-numeric port in the config falls back to the default
        port = PORT
    userid = config.get('ckan.harvest.mq.user_id', USERID)
    password = config.get('ckan.harvest.mq.password', PASSWORD)
    hostname = config.get('ckan.harvest.mq.hostname', HOSTNAME)
    virtual_host = config.get('ckan.harvest.mq.virtual_host', VIRTUAL_HOST)

    credentials = pika.PlainCredentials(userid, password)
    parameters = pika.ConnectionParameters(host=hostname,
                                           port=port,
                                           virtual_host=virtual_host,
                                           credentials=credentials,
                                           frame_max=10000)
    log.debug("pika connection using %s" % parameters.__dict__)

    return pika.BlockingConnection(parameters)


def get_connection_redis():
    '''Return a ``redis.StrictRedis`` client built from the
    ``ckan.harvest.mq.*`` settings.
    '''
    # Imported here so that redis is only required when it is actually used
    import redis
    return redis.StrictRedis(
        host=config.get('ckan.harvest.mq.hostname', HOSTNAME),
        port=int(config.get('ckan.harvest.mq.port', REDIS_PORT)),
        db=int(config.get('ckan.harvest.mq.redis_db', REDIS_DB)))


def get_gather_queue_name():
    '''Return the site-specific name of the gather queue.'''
    return 'ckan.harvest.{0}.gather'.format(config.get('ckan.site_id',
                                                       'default'))


def get_fetch_queue_name():
    '''Return the site-specific name of the fetch queue.'''
    return 'ckan.harvest.{0}.fetch'.format(config.get('ckan.site_id',
                                                      'default'))


def get_gather_routing_key():
    '''Return the site-specific routing key of the gather queue.'''
    return 'ckanext-harvest:{0}:harvest_job_id'.format(
        config.get('ckan.site_id', 'default'))


def get_fetch_routing_key():
    '''Return the site-specific routing key of the fetch queue.'''
    return 'ckanext-harvest:{0}:harvest_object_id'.format(
        config.get('ckan.site_id', 'default'))


def purge_queues():
    '''Remove every message from the gather and fetch queues of the
    configured backend.
    '''
    backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
    connection = get_connection()
    if backend in ('amqp', 'ampq'):
        channel = connection.channel()
        channel.queue_purge(queue=get_gather_queue_name())
        log.info('AMQP queue purged: %s', get_gather_queue_name())
        channel.queue_purge(queue=get_fetch_queue_name())
        log.info('AMQP queue purged: %s', get_fetch_queue_name())
    elif backend == 'redis':
        get_gather_consumer().queue_purge()
        log.info('Redis gather queue purged')
        get_fetch_consumer().queue_purge()
        log.info('Redis fetch queue purged')


def _parse_pending_timestamp(value):
    '''Parse a timestamp written by RedisConsumer.consume.

    str(datetime) leaves out the fractional part when the microsecond
    field is exactly 0, so both forms are accepted.
    '''
    try:
        return datetime.datetime.strptime(value, _PENDING_TIMESTAMP_FORMAT)
    except ValueError:
        return datetime.datetime.strptime(value, "%Y-%m-%d %H:%M:%S")


def _resubmit_stale_messages(redis, routing_key, message_key,
                             max_age_seconds):
    '''Re-queue the messages of one queue whose persistence key is older
    than ``max_age_seconds``, removing the persistence key afterwards.
    '''
    for key in redis.keys(routing_key + ':*'):
        timestamp = redis.get(key)
        if timestamp is None:
            # The message was acknowledged (its key deleted) between the
            # keys() and get() calls - nothing left to resubmit.
            continue
        age = datetime.datetime.now() - _parse_pending_timestamp(timestamp)
        # total_seconds() rather than .seconds: the latter ignores whole
        # days, so very old entries would look young and never be retried.
        if age.total_seconds() > max_age_seconds:
            redis.rpush(routing_key,
                        json.dumps({message_key: key.split(':')[-1]}))
            redis.delete(key)


def resubmit_jobs():
    '''
    Examines the fetch and gather queues for items that are suspiciously old.
    These are removed from the queues and placed back on them afresh, to
    ensure the fetch & gather consumers are triggered to process them.

    Only applies to the Redis backend; does nothing otherwise.
    '''
    if config.get('ckan.harvest.mq.type', MQ_TYPE) != 'redis':
        return
    redis = get_connection()

    # fetch queue
    _resubmit_stale_messages(redis, get_fetch_routing_key(),
                             'harvest_object_id', _FETCH_RESUBMIT_AFTER)

    # gather queue
    _resubmit_stale_messages(redis, get_gather_routing_key(),
                             'harvest_job_id', _GATHER_RESUBMIT_AFTER)


class Publisher(object):
    '''Publishes JSON messages to an AMQP exchange.'''

    def __init__(self, connection, channel, exchange, routing_key):
        self.connection = connection
        self.channel = channel
        self.exchange = exchange
        self.routing_key = routing_key

    def send(self, body, **kw):
        '''Publish ``body`` (serialized as JSON) as a persistent message.'''
        return self.channel.basic_publish(
            self.exchange,
            self.routing_key,
            json.dumps(body),
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            ),
            **kw)

    def close(self):
        '''Close the underlying connection.'''
        self.connection.close()


class RedisPublisher(object):
    '''Publishes JSON messages by pushing them onto a Redis list.'''

    def __init__(self, redis, routing_key):
        self.redis = redis  # client used by send()
        self.routing_key = routing_key

    def send(self, body, **kw):
        '''Append ``body`` (serialized as JSON) to the queue.

        ``kw`` is accepted for compatibility with ``Publisher.send`` and
        is ignored.
        '''
        value = json.dumps(body)
        # remove if already there
        if self.routing_key == get_gather_routing_key():
            self.redis.lrem(self.routing_key, 0, value)
        self.redis.rpush(self.routing_key, value)

    def close(self):
        '''No-op, present for compatibility with ``Publisher.close``.'''
        return


def get_publisher(routing_key):
    '''Return a publisher for ``routing_key`` on the configured backend.'''
    connection = get_connection()
    backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
    if backend in ('amqp', 'ampq'):
        channel = connection.channel()
        channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
        return Publisher(connection,
                         channel,
                         EXCHANGE_NAME,
                         routing_key=routing_key)
    if backend == 'redis':
        return RedisPublisher(connection, routing_key)


class FakeMethod(object):
    ''' This is to act like the method returned by AMQP'''

    def __init__(self, message):
        self.delivery_tag = message


class RedisConsumer(object):
    '''Consumes messages from a Redis list, offering the subset of the
    AMQP channel interface that this module uses.
    '''

    def __init__(self, redis, routing_key):
        self.redis = redis
        # Routing keys are constructed with {site-id}:{message-key}, eg:
        # default:harvest_job_id or default:harvest_object_id
        self.routing_key = routing_key
        # Message keys are harvest_job_id for the gather consumer and
        # harvest_object_id for the fetch consumer
        self.message_key = routing_key.split(':')[-1]

    def consume(self, queue):
        '''Yield ``(method, consumer, body)`` tuples forever, blocking
        until a message is available.

        The ``queue`` parameter is not used. Each consumed message is
        recorded under its persistence key with the current time, until
        it is acknowledged with ``basic_ack``.
        '''
        while True:
            key, body = self.redis.blpop(self.routing_key)
            self.redis.set(self.persistance_key(body),
                           str(datetime.datetime.now()))
            yield (FakeMethod(body), self, body)

    def persistance_key(self, message):
        '''Return the Redis key that tracks the (JSON) ``message`` while
        it is being processed.
        '''
        # If you change this, make sure to update the script in `queue_purge`
        message = json.loads(message)
        return self.routing_key + ':' + message[self.message_key]

    def basic_ack(self, message):
        '''Acknowledge ``message`` by deleting its persistence key.'''
        self.redis.delete(self.persistance_key(message))

    def queue_purge(self, queue=None):
        '''
        Purge the consumer's queue.

        The ``queue`` parameter exists only for compatibility and is
        ignored.

        Returns the result of the purge script, which counts the
        messages it removed.
        '''
        # Use a script to make the operation atomic
        lua_code = b'''
            local routing_key = KEYS[1]
            local message_key = ARGV[1]
            local count = 0
            while true do
                local s = redis.call("lpop", routing_key)
                if s == false then
                    break
                end
                local value = cjson.decode(s)
                local id = value[message_key]
                local persistance_key = routing_key .. ":" .. id
                redis.call("del", persistance_key)
                count = count + 1
            end
            return count
        '''
        script = self.redis.register_script(lua_code)
        return script(keys=[self.routing_key], args=[self.message_key])

    def basic_get(self, queue):
        '''Pop one message without blocking.

        The ``queue`` parameter is not used. Returns a
        ``(method, consumer, body)`` tuple, where ``body`` is whatever
        ``lpop`` returned.
        '''
        body = self.redis.lpop(self.routing_key)
        return (FakeMethod(body), self, body)


def get_consumer(queue_name, routing_key):
    '''Return a consumer for the configured backend: an AMQP channel with
    the queue declared and bound, or a ``RedisConsumer``.
    '''
    connection = get_connection()
    backend = config.get('ckan.harvest.mq.type', MQ_TYPE)

    if backend in ('amqp', 'ampq'):
        channel = connection.channel()
        channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
        channel.queue_declare(queue=queue_name, durable=True)
        channel.queue_bind(queue=queue_name, exchange=EXCHANGE_NAME,
                           routing_key=routing_key)
        return channel
    if backend == 'redis':
        return RedisConsumer(connection, routing_key)


def gather_callback(channel, method, header, body):
    '''Handle one message from the gather queue.

    Runs the gather stage of the matching harvester for the job named in
    ``body`` and sends the resulting harvest object ids to the fetch
    queue.

    :param channel: consumer the message came from; used to acknowledge it
    :param method: object whose ``delivery_tag`` identifies the message
    :param header: not used
    :param body: JSON text containing a ``harvest_job_id`` key
    :returns: ``False`` when the message is discarded or the gather stage
        yields nothing to fetch, ``None`` otherwise
    '''
    try:
        job_id = json.loads(body)['harvest_job_id']
        log.debug('Received harvest job id: %s' % job_id)
    except KeyError:
        log.error('No harvest job id received')
        channel.basic_ack(method.delivery_tag)
        return False

    # Get a publisher for the fetch queue
    publisher = get_fetch_publisher()
    try:
        return _gather_and_publish(channel, method, job_id, publisher)
    finally:
        # Release the publisher on every exit path, including errors
        publisher.close()


def _gather_and_publish(channel, method, job_id, publisher):
    '''Do the work of gather_callback for one job id; the caller owns
    (and closes) ``publisher``.
    '''
    try:
        job = HarvestJob.get(job_id)
    except sqlalchemy.exc.OperationalError:
        # Occasionally we see: sqlalchemy.exc.OperationalError
        # "SSL connection has been closed unexpectedly"
        log.exception('Connection Error during gather of job %s',
                      job_id)
        # By not sending the ack, it will be retried later.
        # Try to clear the issue with a remove.
        model.Session.remove()
        return
    if not job:
        log.error('Harvest job does not exist: %s' % job_id)
        channel.basic_ack(method.delivery_tag)
        return False

    # Send the harvest job to the plugins that implement
    # the Harvester interface, only if the source type
    # matches
    harvester = get_harvester(job.source.type)

    if harvester:
        try:
            harvest_object_ids = gather_stage(harvester, job)
        except (Exception, KeyboardInterrupt):
            # Acknowledge before re-raising so the message is not retried
            channel.basic_ack(method.delivery_tag)
            raise

        if not isinstance(harvest_object_ids, list):
            log.error('Gather stage failed')
            channel.basic_ack(method.delivery_tag)
            return False

        if len(harvest_object_ids) == 0:
            log.info('No harvest objects to fetch')
            channel.basic_ack(method.delivery_tag)
            return False

        log.debug('Received from plugin gather_stage: {0} objects (first: {1} last: {2})'.format(
            len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:]))
        for object_id in harvest_object_ids:
            # Send the id to the fetch queue
            publisher.send({'harvest_object_id': object_id})
        log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids)))

    else:
        # This can occur if you:
        # * remove a harvester and it still has sources that are then refreshed
        # * add a new harvester and restart CKAN but not the gather queue.
        msg = 'System error - No harvester could be found for source type %s' % job.source.type
        err = HarvestGatherError(message=msg, job=job)
        err.save()
        log.error(msg)

    model.Session.remove()
    channel.basic_ack(method.delivery_tag)


def get_harvester(harvest_source_type):
    '''Return the first IHarvester plugin whose ``info()['name']`` equals
    ``harvest_source_type``, or ``None`` if there is none.
    '''
    for harvester in PluginImplementations(IHarvester):
        if harvester.info()['name'] == harvest_source_type:
            return harvester


def gather_stage(harvester, job):
    '''Calls the harvester's gather_stage, returning harvest object ids, with
    some error handling.

    This is split off from gather_callback so that tests can call it without
    dealing with queue stuff.
    '''
    job.gather_started = datetime.datetime.utcnow()

    try:
        harvest_object_ids = harvester.gather_stage(job)
    except (Exception, KeyboardInterrupt):
        # Delete the harvest objects of the failed job, then re-raise
        harvest_objects = model.Session.query(HarvestObject).filter_by(
            harvest_job_id=job.id
        )
        for harvest_object in harvest_objects:
            model.Session.delete(harvest_object)
        model.Session.commit()
        raise
    finally:
        job.gather_finished = datetime.datetime.utcnow()
        job.save()
    return harvest_object_ids


def fetch_callback(channel, method, header, body):
    '''Handle one message from the fetch queue.

    Runs the fetch and import stages of every matching harvester for the
    harvest object named in ``body``.

    :param channel: consumer the message came from; used to acknowledge it
    :param method: object whose ``delivery_tag`` identifies the message
    :param header: not used
    :param body: JSON text containing a ``harvest_object_id`` key
    :returns: ``False`` when the message is discarded, ``None`` otherwise
    '''
    try:
        object_id = json.loads(body)['harvest_object_id']
        log.info('Received harvest object id: %s' % object_id)
    except KeyError:
        log.error('No harvest object id received')
        channel.basic_ack(method.delivery_tag)
        return False

    try:
        obj = HarvestObject.get(object_id)
    except sqlalchemy.exc.OperationalError:
        # Occasionally we see: sqlalchemy.exc.OperationalError
        # "SSL connection has been closed unexpectedly"
        log.exception('Connection Error during fetch of job %s', object_id)
        # By not sending the ack, it will be retried later.
        # Try to clear the issue with a remove.
        model.Session.remove()
        return
    if not obj:
        log.error('Harvest object does not exist: %s' % object_id)
        channel.basic_ack(method.delivery_tag)
        return False

    obj.retry_times += 1
    obj.save()

    if obj.retry_times >= 5:
        obj.state = "ERROR"
        obj.save()
        log.error('Too many consecutive retries for object {0}'.format(obj.id))
        channel.basic_ack(method.delivery_tag)
        return False

    # Send the harvest object to the plugins that implement
    # the Harvester interface, only if the source type
    # matches
    for harvester in PluginImplementations(IHarvester):
        if harvester.info()['name'] == obj.source.type:
            fetch_and_import_stages(harvester, obj)

    model.Session.remove()
    channel.basic_ack(method.delivery_tag)


def fetch_and_import_stages(harvester, obj):
    '''Run the harvester's fetch stage and, if it succeeds, its import
    stage on ``obj``, recording timestamps, state and report status.
    '''
    obj.fetch_started = datetime.datetime.utcnow()
    obj.state = "FETCH"
    obj.save()
    success_fetch = harvester.fetch_stage(obj)
    obj.fetch_finished = datetime.datetime.utcnow()
    obj.save()
    if success_fetch is True:
        # If no errors where found, call the import method
        obj.import_started = datetime.datetime.utcnow()
        obj.state = "IMPORT"
        obj.save()
        success_import = harvester.import_stage(obj)
        obj.import_finished = datetime.datetime.utcnow()
        if success_import:
            obj.state = "COMPLETE"
            # Compare by value: identity of string objects is an
            # implementation detail and must not be relied upon.
            if success_import == 'unchanged':
                obj.report_status = 'not modified'
                obj.save()
                return
        else:
            obj.state = "ERROR"
        obj.save()
    elif success_fetch == 'unchanged':
        obj.state = 'COMPLETE'
        obj.report_status = 'not modified'
        obj.save()
        return
    else:
        obj.state = "ERROR"
        obj.save()

    if obj.state == 'ERROR':
        obj.report_status = 'errored'
    elif obj.current == False:  # noqa: E712 - a None value must not match
        obj.report_status = 'deleted'
    elif len(model.Session.query(HarvestObject)
             .filter_by(package_id=obj.package_id)
             .limit(2)
             .all()) == 2:
        # Another harvest object already exists for this package
        obj.report_status = 'updated'
    else:
        obj.report_status = 'added'
    obj.save()


def get_gather_consumer():
    '''Return a consumer for the gather queue.'''
    gather_routing_key = get_gather_routing_key()
    consumer = get_consumer(get_gather_queue_name(), gather_routing_key)
    log.debug('Gather queue consumer registered')
    return consumer


def get_fetch_consumer():
    '''Return a consumer for the fetch queue.'''
    fetch_routing_key = get_fetch_routing_key()
    consumer = get_consumer(get_fetch_queue_name(), fetch_routing_key)
    log.debug('Fetch queue consumer registered')
    return consumer


def get_gather_publisher():
    '''Return a publisher for the gather queue.'''
    gather_routing_key = get_gather_routing_key()
    return get_publisher(gather_routing_key)


def get_fetch_publisher():
    '''Return a publisher for the fetch queue.'''
    fetch_routing_key = get_fetch_routing_key()
    return get_publisher(fetch_routing_key)