[refactoring] Add code to handle queuing and the new IHarvester interface.

Add new commands to the CLI to start the queue consumers and fire the
harvesting process.
This commit is contained in:
Adrià Mercader 2011-04-06 12:45:00 +01:00
parent 4023bb7222
commit 56d5acc867
4 changed files with 279 additions and 8 deletions

View File

@ -1,12 +1,14 @@
import sys
import re
#import logging
from pprint import pprint
from ckan.lib.cli import CkanCommand
from ckan.model import repo
#from ckanext.harvest.model import HarvestSource, HarvestingJob, HarvestedDocument
#from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
from ckanext.harvest.lib import *
from ckanext.harvest.queue import get_gather_consumer, get_fetch_consumer
#log = logging.getLogger(__name__)
#log.setLevel(logging.DEBUG)
class Harvester(CkanCommand):
'''Harvests remotely mastered metadata
@ -32,7 +34,13 @@ class Harvester(CkanCommand):
harvester run
- runs harvest jobs
harvester gather_consumer
- starts the consumer for the gathering queue
harvester fetch_consumer
- starts the consumer for the fetching queue
harvester extents
- creates or updates the extent geometry column for packages with
a bounding box defined in extras
@ -52,12 +60,14 @@ class Harvester(CkanCommand):
def command(self):
self._load_config()
print ""
print ''
if len(self.args) == 0:
self.parser.print_usage()
sys.exit(1)
cmd = self.args[0]
#log.info(cmd)
#sys.exit(0)
if cmd == 'source':
self.create_harvest_source()
elif cmd == "rmsource":
@ -74,6 +84,12 @@ class Harvester(CkanCommand):
self.run_harvester()
elif cmd == 'extents':
self.update_extents()
elif cmd == 'gather_consumer':
consumer = get_gather_consumer()
consumer.wait()
elif cmd == 'fetch_consumer':
consumer = get_fetch_consumer()
consumer.wait()
else:
print 'Command %s not recognized' % cmd
@ -168,8 +184,16 @@ class Harvester(CkanCommand):
self.print_harvest_jobs(jobs)
self.print_there_are(what='harvest job', sequence=jobs)
#TODO: Move to lib and implement the queue system
def run_harvester(self, *args, **kwds):
jobs = run_harvest_jobs()
print 'Sent %s jobs to the gather queue' % len(jobs)
sys.exit(0)
#TODO: move all this stuff to ckanext-inspire
from pylons.i18n.translation import _get_translator
import pylons
pylons.translator._push_object(_get_translator(pylons.config.get('lang')))
@ -228,7 +252,7 @@ class Harvester(CkanCommand):
def print_harvest_sources(self, sources):
if sources:
print ""
print ''
for source in sources:
self.print_harvest_source(source)

View File

@ -0,0 +1,66 @@
from ckan.plugins.interfaces import Interface
class IHarvester(Interface):
    '''
    Common harvesting interface.

    Plugins implementing this interface are handed harvest jobs and
    harvest objects by the queue consumers, one stage at a time.
    '''

    def get_type(self):
        '''
        Plugins must provide this method, which will return a string with
        the Harvester type implemented by the plugin (e.g. ``CSW``,
        ``INSPIRE``, etc). This will ensure that they only receive Harvest
        Jobs and Objects relevant to them.

        :returns: A string with the harvester type
        '''

    def gather_stage(self, harvest_job):
        '''
        The gather stage will receive a HarvestJob object and will be
        responsible for:

        - gathering all the necessary objects to fetch on a later stage
          (e.g. for a CSW server, perform a GetRecords request).
        - creating the necessary HarvestObjects in the database.
        - creating and storing any suitable HarvestGatherErrors that may
          occur.
        - returning a list with all the ids of the created HarvestObjects.

        :param harvest_job: HarvestJob object
        :returns: A list of HarvestObject ids
        '''

    def fetch_stage(self, harvest_object):
        '''
        The fetch stage will receive a HarvestObject object and will be
        responsible for:

        - getting the contents of the remote object (e.g. for a CSW
          server, perform a GetRecordById request).
        - saving the content in the provided HarvestObject.
        - updating the fetch_started, fetch_finished and retry_times
          fields as necessary.
        - creating and storing any suitable HarvestObjectErrors that may
          occur.
        - returning True if everything went as expected, False otherwise.

        :param harvest_object: HarvestObject object
        :returns: True if everything went right, False if errors were found
        '''

    def import_stage(self, harvest_object):
        '''
        The import stage will receive a HarvestObject object and will be
        responsible for:

        - performing any necessary action with the fetched object (e.g.
          create a CKAN package).
        - creating the HarvestObject - Package relation (if necessary).
        - creating and storing any suitable HarvestObjectErrors that may
          occur.
        - returning True if everything went as expected, False otherwise.

        :param harvest_object: HarvestObject object
        :returns: True if everything went right, False if errors were found
        '''

View File

@ -2,8 +2,8 @@ from ckan.model import Session
from ckan.model import repo
from ckan.lib.base import config
from ckanext.harvest.model import HarvestSource, HarvestJob
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
from ckanext.harvest.queue import get_gather_publisher
log = __import__("logging").getLogger(__name__)
@ -40,6 +40,21 @@ def _job_as_dict(job):
return out
def _object_as_dict(obj):
out = obj.as_dict()
out['source'] = obj.source.as_dict()
out['job'] = obj.job.as_dict()
if obj.package:
out['package'] = obj.package.as_dict()
out['errors'] = []
for error in obj.errors:
out['errors'].append(error.as_dict())
return out
def get_harvest_source(id,default=Exception,attr=None):
source = HarvestSource.get(id,default=default,attr=attr)
@ -127,6 +142,30 @@ def delete_harvest_job(job_id):
return True
def run_harvest_jobs():
    """Send every harvest job with status ``New`` to the gather queue.

    One message of the form ``{'harvest_job_id': <id>}`` is published per
    job.

    :returns: the jobs that were sent, as returned by
        ``get_harvest_jobs(status=u'New')``
    :raises Exception: if there are no new harvest jobs
    """
    # Check if there are pending harvest jobs
    jobs = get_harvest_jobs(status=u'New')
    if not jobs:
        raise Exception('There are no new harvesting jobs')

    # Send each job to the gather queue
    publisher = get_gather_publisher()
    try:
        for job in jobs:
            publisher.send({'harvest_job_id': job['id']})
            log.info('Sent job %s to the gather queue', job['id'])
    finally:
        # Close the publisher even if sending one of the jobs fails
        publisher.close()

    return jobs
def get_harvest_object(id,attr=None):
    """Return a single harvest object as a dict (see ``_object_as_dict``).

    :param id: value used to look the harvest object up
    :param attr: forwarded to ``HarvestObject.get`` as ``attr``
        (presumably the attribute to match ``id`` against -- confirm
        against the model)
    :returns: dict representation of the harvest object
    """
    # Pass ``attr`` by keyword: the sibling call
    # ``HarvestSource.get(id, default=..., attr=...)`` shows that the second
    # positional parameter of ``get`` is ``default``, not ``attr``.
    obj = HarvestObject.get(id, attr=attr)
    return _object_as_dict(obj)
def get_harvest_objects(**kwds):
    """Return the harvest objects matching ``kwds`` as a list of dicts.

    The keyword arguments are handed unchanged to ``HarvestObject.filter``;
    every match is converted with ``_object_as_dict``.
    """
    matches = HarvestObject.filter(**kwds).all()
    return list(map(_object_as_dict, matches))
#TODO: move to ckanext-?? for geo stuff
def get_srid(crs):
"""Returns the SRID for the provided CRS definition

142
ckanext/harvest/queue.py Normal file
View File

@ -0,0 +1,142 @@
import logging
from carrot.connection import BrokerConnection
from carrot.messaging import Publisher
from carrot.messaging import Consumer
from ckan.lib.base import config
from ckan.plugins import PluginImplementations
from ckanext.harvest.model import HarvestJob, HarvestObject
from ckanext.harvest.interfaces import IHarvester
log = logging.getLogger(__name__)
__all__ = ['get_gather_publisher', 'get_gather_consumer', \
'get_fetch_publisher', 'get_fetch_consumer']
# Fallback broker connection settings. get_carrot_connection() uses each of
# these when the matching ``ckan.harvest.mq.*`` config option is not set
# (PORT is also used when the configured port is not a valid integer).
PORT = 5672
USERID = 'guest'
PASSWORD = 'guest'
HOSTNAME = 'localhost'
VIRTUAL_HOST = '/'
# settings for AMQP
# Exchange type and name handed to every Publisher and Consumer created by
# get_publisher() and get_consumer().
EXCHANGE_TYPE = 'direct'
EXCHANGE_NAME = 'ckan.harvest'
def get_carrot_connection():
    """Create a carrot ``BrokerConnection`` from the CKAN configuration.

    Every setting is read from a ``ckan.harvest.mq.*`` config option and
    falls back to the module-level default when the option is missing. A
    port that cannot be converted to an integer also falls back to
    ``PORT``.

    :returns: a new ``BrokerConnection``
    """
    backend = config.get('ckan.harvest.mq.library', 'pyamqplib')
    log.info("Carrot connnection using %s backend" % backend)

    try:
        port = int(config.get('ckan.harvest.mq.port', PORT))
    except ValueError:
        # Non-numeric port in the configuration: use the default one
        port = PORT

    settings = {
        'port': port,
        'userid': config.get('ckan.harvest.mq.user_id', USERID),
        'password': config.get('ckan.harvest.mq.password', PASSWORD),
        'hostname': config.get('ckan.harvest.mq.hostname', HOSTNAME),
        'virtual_host': config.get('ckan.harvest.mq.virtual_host',
                                   VIRTUAL_HOST),
        'backend_cls': 'carrot.backends.%s.Backend' % backend,
    }
    return BrokerConnection(**settings)
def get_publisher(routing_key):
    """Return a carrot ``Publisher`` on the harvest exchange.

    :param routing_key: routing key for the messages sent by the publisher
    :returns: a ``Publisher`` using a fresh connection from
        ``get_carrot_connection``
    """
    connection = get_carrot_connection()
    return Publisher(connection=connection,
                     exchange=EXCHANGE_NAME,
                     exchange_type=EXCHANGE_TYPE,
                     routing_key=routing_key)
def get_consumer(queue_name, routing_key):
    """Return a carrot ``Consumer`` on the harvest exchange.

    The consumer is created with ``durable=True`` and
    ``auto_delete=False``.

    :param queue_name: name of the queue to consume from
    :param routing_key: routing key the queue is declared with
    :returns: a ``Consumer`` using a fresh connection from
        ``get_carrot_connection``
    """
    connection = get_carrot_connection()
    return Consumer(connection=connection,
                    queue=queue_name,
                    routing_key=routing_key,
                    exchange=EXCHANGE_NAME,
                    exchange_type=EXCHANGE_TYPE,
                    durable=True,
                    auto_delete=False)
def gather_callback(message_data,message):
    """Handle a message from the gather queue.

    Looks up the harvest job named by ``message_data['harvest_job_id']``,
    runs ``gather_stage`` on every ``IHarvester`` plugin whose type matches
    the job's source type, and sends each returned harvest object id to the
    fetch queue as ``{'harvest_object_id': <id>}``.

    Failures are logged rather than raised, and the message is always
    acknowledged.

    :param message_data: decoded message body; expected to contain a
        ``harvest_job_id`` key
    :param message: the message object, acknowledged when processing ends
    """
    try:
        try:
            job_id = message_data['harvest_job_id']
        except KeyError:
            log.error('No harvest job id received')
            return
        log.info('Received harvest job id: %s', job_id)

        # Get a publisher for the fetch queue
        publisher = get_fetch_publisher()
        try:
            try:
                job = HarvestJob.get(job_id)
            except Exception:
                log.error('Harvest job does not exist: %s', job_id)
                return

            try:
                # Send the harvest job to the plugins that implement
                # the Harvester interface, only if the source type
                # matches
                for harvester in PluginImplementations(IHarvester):
                    if harvester.get_type() != job.source.type:
                        continue
                    # Get a list of harvest object ids from the plugin;
                    # a plugin returning None is treated as "no objects"
                    harvest_object_ids = harvester.gather_stage(job) or []
                    # Use a separate name for the object id so the job id
                    # is still available for error reporting
                    for harvest_object_id in harvest_object_ids:
                        # Send the id to the fetch queue
                        publisher.send(
                            {'harvest_object_id': harvest_object_id})
                        log.info('Sent object %s to the fetch queue',
                                 harvest_object_id)
            except Exception:
                # Keep the consumer alive, but record what actually went
                # wrong (with traceback) instead of blaming a missing job
                log.exception('Error while gathering harvest job %s',
                              job_id)
        finally:
            publisher.close()
    finally:
        message.ack()
def fetch_callback(message_data,message):
    """Handle a message from the fetch queue.

    Looks up the harvest object named by
    ``message_data['harvest_object_id']`` and, for every ``IHarvester``
    plugin whose type matches the object's source type, runs
    ``fetch_stage`` followed by ``import_stage`` when the fetch reported
    success.

    Failures are logged rather than raised, and the message is always
    acknowledged.

    :param message_data: decoded message body; expected to contain a
        ``harvest_object_id`` key
    :param message: the message object, acknowledged when processing ends
    """
    try:
        try:
            object_id = message_data['harvest_object_id']
        except KeyError:
            log.error('No harvest object id received')
            return
        log.info('Received harvest object id: %s', object_id)

        try:
            obj = HarvestObject.get(object_id)
        except Exception:
            log.error('Harvest object does not exist: %s', object_id)
            return

        try:
            # Send the harvest object to the plugins that implement
            # the Harvester interface, only if the source type
            # matches
            for harvester in PluginImplementations(IHarvester):
                if harvester.get_type() != obj.source.type:
                    continue
                # See if the plugin can fetch the harvest object; if no
                # errors were found, call the import method
                if harvester.fetch_stage(obj):
                    harvester.import_stage(obj)
        except Exception:
            # Keep the consumer alive, but record what actually went wrong
            # (with traceback) instead of blaming a missing object
            log.exception('Error while fetching or importing harvest '
                          'object %s', object_id)
    finally:
        message.ack()
def get_gather_consumer():
    """Return a consumer for the gather queue.

    The consumer reads the ``ckan.harvest.gather`` queue (routing key
    ``harvest_job_id``) and has ``gather_callback`` registered on it.
    """
    gather_consumer = get_consumer('ckan.harvest.gather','harvest_job_id')
    gather_consumer.register_callback(gather_callback)
    return gather_consumer
def get_fetch_consumer():
    """Return a consumer for the fetch queue.

    The consumer reads the ``ckan.harvest.fetch`` queue (routing key
    ``harvest_object_id``) and has ``fetch_callback`` registered on it.
    """
    # Queue name was misspelled as 'ckan.harvert.fetch'; it now follows the
    # same 'ckan.harvest.*' naming as the gather queue and the exchange.
    consumer = get_consumer('ckan.harvest.fetch','harvest_object_id')
    consumer.register_callback(fetch_callback)
    return consumer
def get_gather_publisher():
    """Return a publisher that sends with routing key ``harvest_job_id``."""
    gather_publisher = get_publisher('harvest_job_id')
    return gather_publisher
def get_fetch_publisher():
    """Return a publisher that sends with routing key ``harvest_object_id``."""
    fetch_publisher = get_publisher('harvest_object_id')
    return fetch_publisher