2011-04-06 13:45:00 +02:00
|
|
|
import logging
|
2011-04-07 17:59:11 +02:00
|
|
|
import datetime
|
2011-04-06 13:45:00 +02:00
|
|
|
|
|
|
|
from carrot.connection import BrokerConnection
|
|
|
|
from carrot.messaging import Publisher
|
|
|
|
from carrot.messaging import Consumer
|
|
|
|
|
|
|
|
from ckan.lib.base import config
|
|
|
|
from ckan.plugins import PluginImplementations
|
|
|
|
|
|
|
|
from ckanext.harvest.model import HarvestJob, HarvestObject
|
|
|
|
from ckanext.harvest.interfaces import IHarvester
|
|
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
__all__ = ['get_gather_publisher', 'get_gather_consumer', \
|
|
|
|
'get_fetch_publisher', 'get_fetch_consumer']
|
|
|
|
|
2011-04-07 17:59:11 +02:00
|
|
|
PORT = 5672
|
2011-04-06 13:45:00 +02:00
|
|
|
USERID = 'guest'
|
|
|
|
PASSWORD = 'guest'
|
|
|
|
HOSTNAME = 'localhost'
|
|
|
|
VIRTUAL_HOST = '/'
|
|
|
|
|
|
|
|
# settings for AMQP
|
|
|
|
EXCHANGE_TYPE = 'direct'
|
|
|
|
EXCHANGE_NAME = 'ckan.harvest'
|
|
|
|
|
|
|
|
def get_carrot_connection():
|
|
|
|
backend = config.get('ckan.harvest.mq.library', 'pyamqplib')
|
|
|
|
log.info("Carrot connnection using %s backend" % backend)
|
|
|
|
try:
|
|
|
|
port = int(config.get('ckan.harvest.mq.port', PORT))
|
|
|
|
except ValueError:
|
|
|
|
port = PORT
|
|
|
|
userid = config.get('ckan.harvest.mq.user_id', USERID)
|
|
|
|
password = config.get('ckan.harvest.mq.password', PASSWORD)
|
|
|
|
hostname = config.get('ckan.harvest.mq.hostname', HOSTNAME)
|
|
|
|
virtual_host = config.get('ckan.harvest.mq.virtual_host', VIRTUAL_HOST)
|
2011-04-07 17:59:11 +02:00
|
|
|
|
2011-04-06 13:45:00 +02:00
|
|
|
backend_cls = 'carrot.backends.%s.Backend' % backend
|
|
|
|
return BrokerConnection(hostname=hostname, port=port,
|
|
|
|
userid=userid, password=password,
|
|
|
|
virtual_host=virtual_host,
|
|
|
|
backend_cls=backend_cls)
|
|
|
|
|
|
|
|
def get_publisher(routing_key):
|
|
|
|
return Publisher(connection=get_carrot_connection(),
|
|
|
|
exchange=EXCHANGE_NAME,
|
|
|
|
exchange_type=EXCHANGE_TYPE,
|
|
|
|
routing_key=routing_key)
|
|
|
|
|
|
|
|
def get_consumer(queue_name, routing_key):
|
|
|
|
return Consumer(connection=get_carrot_connection(),
|
2011-04-07 17:59:11 +02:00
|
|
|
queue=queue_name,
|
2011-04-06 13:45:00 +02:00
|
|
|
routing_key=routing_key,
|
|
|
|
exchange=EXCHANGE_NAME,
|
|
|
|
exchange_type=EXCHANGE_TYPE,
|
|
|
|
durable=True, auto_delete=False)
|
|
|
|
|
2011-04-07 17:59:11 +02:00
|
|
|
|
2011-04-06 13:45:00 +02:00
|
|
|
def gather_callback(message_data,message):
|
|
|
|
try:
|
|
|
|
id = message_data['harvest_job_id']
|
2011-04-07 17:59:11 +02:00
|
|
|
log.debug('Received harvest job id: %s' % id)
|
2011-04-06 13:45:00 +02:00
|
|
|
|
|
|
|
# Get a publisher for the fetch queue
|
|
|
|
publisher = get_fetch_publisher()
|
|
|
|
|
|
|
|
try:
|
|
|
|
job = HarvestJob.get(id)
|
2011-04-07 17:59:11 +02:00
|
|
|
except:
|
|
|
|
log.error('Harvest job does not exist: %s' % id)
|
|
|
|
else:
|
2011-04-06 13:45:00 +02:00
|
|
|
# Send the harvest job to the plugins that implement
|
|
|
|
# the Harvester interface, only if the source type
|
|
|
|
# matches
|
|
|
|
for harvester in PluginImplementations(IHarvester):
|
|
|
|
if harvester.get_type() == job.source.type:
|
|
|
|
|
|
|
|
# Get a list of harvest object ids from the plugin
|
|
|
|
harvest_object_ids = harvester.gather_stage(job)
|
2011-04-07 17:59:11 +02:00
|
|
|
log.debug('Received from plugin''s gather_stage: %r' % harvest_object_ids)
|
|
|
|
if harvest_object_ids and len(harvest_object_ids) > 0:
|
2011-04-06 13:45:00 +02:00
|
|
|
for id in harvest_object_ids:
|
|
|
|
# Send the id to the fetch queue
|
|
|
|
publisher.send({'harvest_object_id':id})
|
2011-04-07 17:59:11 +02:00
|
|
|
log.debug('Sent object %s to the fetch queue' % id)
|
2011-04-06 13:45:00 +02:00
|
|
|
|
2011-04-07 17:59:11 +02:00
|
|
|
job.status = u'Finished'
|
|
|
|
job.save()
|
2011-04-06 13:45:00 +02:00
|
|
|
|
|
|
|
finally:
|
|
|
|
publisher.close()
|
|
|
|
|
|
|
|
except KeyError:
|
|
|
|
log.error('No harvest job id received')
|
|
|
|
finally:
|
|
|
|
message.ack()
|
|
|
|
|
|
|
|
|
|
|
|
def fetch_callback(message_data,message):
|
|
|
|
try:
|
|
|
|
id = message_data['harvest_object_id']
|
|
|
|
log.info('Received harvest object id: %s' % id)
|
|
|
|
|
|
|
|
try:
|
|
|
|
obj = HarvestObject.get(id)
|
2011-04-07 17:59:11 +02:00
|
|
|
except:
|
|
|
|
log.error('Harvest object does not exist: %s' % id)
|
|
|
|
else:
|
2011-04-06 13:45:00 +02:00
|
|
|
# Send the harvest object to the plugins that implement
|
|
|
|
# the Harvester interface, only if the source type
|
|
|
|
# matches
|
|
|
|
for harvester in PluginImplementations(IHarvester):
|
|
|
|
if harvester.get_type() == obj.source.type:
|
|
|
|
|
2011-04-07 17:59:11 +02:00
|
|
|
# See if the plugin can fetch the harvest object
|
|
|
|
obj.fetch_started = datetime.datetime.now()
|
2011-04-06 13:45:00 +02:00
|
|
|
success = harvester.fetch_stage(obj)
|
2011-04-07 17:59:11 +02:00
|
|
|
obj.fetch_finished = datetime.datetime.now()
|
|
|
|
obj.save()
|
|
|
|
#TODO: retry times?
|
2011-04-06 13:45:00 +02:00
|
|
|
if success:
|
|
|
|
# If no errors where found, call the import method
|
|
|
|
harvester.import_stage(obj)
|
2011-04-07 17:59:11 +02:00
|
|
|
|
|
|
|
|
2011-04-06 13:45:00 +02:00
|
|
|
|
|
|
|
except KeyError:
|
|
|
|
log.error('No harvest object id received')
|
|
|
|
finally:
|
|
|
|
message.ack()
|
|
|
|
|
|
|
|
def get_gather_consumer():
|
|
|
|
consumer = get_consumer('ckan.harvest.gather','harvest_job_id')
|
|
|
|
consumer.register_callback(gather_callback)
|
|
|
|
return consumer
|
|
|
|
|
|
|
|
def get_fetch_consumer():
|
|
|
|
consumer = get_consumer('ckan.harvert.fetch','harvest_object_id')
|
|
|
|
consumer.register_callback(fetch_callback)
|
|
|
|
return consumer
|
|
|
|
|
|
|
|
def get_gather_publisher():
|
|
|
|
return get_publisher('harvest_job_id')
|
|
|
|
|
|
|
|
def get_fetch_publisher():
|
|
|
|
return get_publisher('harvest_object_id')
|
|
|
|
|
2011-04-07 17:59:11 +02:00
|
|
|
# Get a publisher for the fetch queue
|
|
|
|
#fetch_publisher = get_fetch_publisher()
|
|
|
|
|