harvester-d4science/ckanext/harvest/queue.py

240 lines
8.2 KiB
Python

import logging
import datetime
import json
import pika
from ckan.lib.base import config
from ckan.plugins import PluginImplementations
from ckan import model
from ckanext.harvest.model import HarvestJob, HarvestObject,HarvestGatherError
from ckanext.harvest.interfaces import IHarvester
log = logging.getLogger(__name__)
assert not log.disabled
__all__ = ['get_gather_publisher', 'get_gather_consumer', \
'get_fetch_publisher', 'get_fetch_consumer']
PORT = 5672
USERID = 'guest'
PASSWORD = 'guest'
HOSTNAME = 'localhost'
VIRTUAL_HOST = '/'
# settings for AMQP
EXCHANGE_TYPE = 'direct'
EXCHANGE_NAME = 'ckan.harvest'
def get_connection():
try:
port = int(config.get('ckan.harvest.mq.port', PORT))
except ValueError:
port = PORT
userid = config.get('ckan.harvest.mq.user_id', USERID)
password = config.get('ckan.harvest.mq.password', PASSWORD)
hostname = config.get('ckan.harvest.mq.hostname', HOSTNAME)
virtual_host = config.get('ckan.harvest.mq.virtual_host', VIRTUAL_HOST)
credentials = pika.PlainCredentials(userid, password)
parameters = pika.ConnectionParameters(host=hostname,
port=port,
virtual_host=virtual_host,
credentials=credentials,
frame_max=10000)
log.debug("pika connection using %s" % parameters.__dict__)
return pika.BlockingConnection(parameters)
def purge_queues():
connection = get_connection()
channel = connection.channel()
channel.queue_purge(queue='ckan.harvest.gather')
channel.queue_purge(queue='ckan.harvest.fetch')
class Publisher(object):
def __init__(self, connection, channel, exchange, routing_key):
self.connection = connection
self.channel = channel
self.exchange = exchange
self.routing_key = routing_key
def send(self, body, **kw):
return self.channel.basic_publish(self.exchange,
self.routing_key,
json.dumps(body),
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
),
**kw)
def close(self):
self.connection.close()
def get_publisher(routing_key):
connection = get_connection()
channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
return Publisher(connection,
channel,
EXCHANGE_NAME,
routing_key=routing_key)
def get_consumer(queue_name, routing_key):
connection = get_connection()
channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE_NAME, durable=True)
channel.queue_declare(queue=queue_name, durable=True)
channel.queue_bind(queue=queue_name, exchange=EXCHANGE_NAME, routing_key=routing_key)
return channel
def gather_callback(channel, method, header, body):
try:
id = json.loads(body)['harvest_job_id']
log.debug('Received harvest job id: %s' % id)
# Get a publisher for the fetch queue
publisher = get_fetch_publisher()
try:
job = HarvestJob.get(id)
except:
log.error('Harvest job does not exist: %s' % id)
else:
# Send the harvest job to the plugins that implement
# the Harvester interface, only if the source type
# matches
harvester_found = False
for harvester in PluginImplementations(IHarvester):
if harvester.info()['name'] == job.source.type:
harvester_found = True
# Get a list of harvest object ids from the plugin
job.gather_started = datetime.datetime.now()
try:
harvest_object_ids = harvester.gather_stage(job)
except Exception, e:
log.error('Gather stage failed unexpectedly: %s' % e)
job.status = 'Errored'
job.save()
continue
job.gather_finished = datetime.datetime.now()
job.save()
log.debug('Received from plugin''s gather_stage: %r' % harvest_object_ids)
if harvest_object_ids and len(harvest_object_ids) > 0:
for id in harvest_object_ids:
# Send the id to the fetch queue
publisher.send({'harvest_object_id':id})
log.debug('Sent object %s to the fetch queue' % id)
if not harvester_found:
msg = 'No harvester could be found for source type %s' % job.source.type
err = HarvestGatherError(message=msg,job=job)
err.save()
log.error(msg)
finally:
publisher.close()
except KeyError:
log.error('No harvest job id received')
finally:
model.Session.remove()
channel.basic_ack(method.delivery_tag)
def fetch_callback(channel, method, header, body):
try:
id = json.loads(body)['harvest_object_id']
log.info('Received harvest object id: %s' % id)
except KeyError:
log.error('No harvest object id received')
channel.basic_ack(method.delivery_tag)
return False
obj = HarvestObject.get(id)
if not obj:
log.error('Harvest object does not exist: %s' % id)
channel.basic_ack(method.delivery_tag)
return False
obj.retry_times += 1
obj.save()
if obj.retry_times >= 5:
obj.state = "ERROR"
obj.save()
log.error('Too many consecutive retries for object {0}'.format(obj.id))
channel.basic_ack(method.delivery_tag)
return False
# Send the harvest object to the plugins that implement
# the Harvester interface, only if the source type
# matches
for harvester in PluginImplementations(IHarvester):
if harvester.info()['name'] == obj.source.type:
# See if the plugin can fetch the harvest object
obj.fetch_started = datetime.datetime.now()
obj.state = "FETCH"
obj.save()
success_fetch = harvester.fetch_stage(obj)
obj.fetch_finished = datetime.datetime.now()
obj.save()
if success_fetch:
# If no errors where found, call the import method
obj.import_started = datetime.datetime.now()
obj.state = "IMPORT"
obj.save()
success_import = harvester.import_stage(obj)
obj.import_finished = datetime.datetime.now()
if success_import:
obj.state = "COMPLETE"
else:
obj.state = "ERROR"
obj.save()
else:
obj.state = "ERROR"
obj.save()
if obj.report_status:
continue
if obj.state == 'ERROR':
obj.report_status = 'errored'
elif obj.current == False:
obj.report_status = 'deleted'
elif len(model.Session.query(HarvestObject)
.filter_by(package_id = obj.package_id)
.limit(2)
.all()) == 2:
obj.report_status = 'updated'
else:
obj.report_status = 'added'
obj.save()
model.Session.remove()
channel.basic_ack(method.delivery_tag)
def get_gather_consumer():
consumer = get_consumer('ckan.harvest.gather','harvest_job_id')
log.debug('Gather queue consumer registered')
return consumer
def get_fetch_consumer():
consumer = get_consumer('ckan.harvest.fetch','harvest_object_id')
log.debug('Fetch queue consumer registered')
return consumer
def get_gather_publisher():
return get_publisher('harvest_job_id')
def get_fetch_publisher():
return get_publisher('harvest_object_id')
# Get a publisher for the fetch queue
#fetch_publisher = get_fetch_publisher()