diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 43bda6a..d022ce4 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -1,12 +1,14 @@ import sys import re +#import logging from pprint import pprint from ckan.lib.cli import CkanCommand from ckan.model import repo -#from ckanext.harvest.model import HarvestSource, HarvestingJob, HarvestedDocument -#from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject from ckanext.harvest.lib import * +from ckanext.harvest.queue import get_gather_consumer, get_fetch_consumer +#log = logging.getLogger(__name__) +#log.setLevel(logging.DEBUG) class Harvester(CkanCommand): '''Harvests remotely mastered metadata @@ -32,7 +34,13 @@ class Harvester(CkanCommand): harvester run - runs harvest jobs - + + harvester gather_consumer + - starts the consumer for the gathering queue + + harvester fetch_consumer + - starts the consumer for the fetching queue + harvester extents - creates or updates the extent geometry column for packages with a bounding box defined in extras @@ -52,12 +60,14 @@ class Harvester(CkanCommand): def command(self): self._load_config() - print "" + print '' if len(self.args) == 0: self.parser.print_usage() sys.exit(1) cmd = self.args[0] + #log.info(cmd) + #sys.exit(0) if cmd == 'source': self.create_harvest_source() elif cmd == "rmsource": @@ -74,6 +84,12 @@ class Harvester(CkanCommand): self.run_harvester() elif cmd == 'extents': self.update_extents() + elif cmd == 'gather_consumer': + consumer = get_gather_consumer() + consumer.wait() + elif cmd == 'fetch_consumer': + consumer = get_fetch_consumer() + consumer.wait() else: print 'Command %s not recognized' % cmd @@ -168,8 +184,16 @@ class Harvester(CkanCommand): self.print_harvest_jobs(jobs) self.print_there_are(what='harvest job', sequence=jobs) - #TODO: Move to lib and implement the queue system def run_harvester(self, *args, **kwds): + + jobs = run_harvest_jobs() + + print 'Sent %s jobs to the gather queue' % len(jobs) + + sys.exit(0) + + + #TODO: move all this stuff to ckanext-inspire from pylons.i18n.translation import _get_translator import pylons pylons.translator._push_object(_get_translator(pylons.config.get('lang'))) @@ -228,7 +252,7 @@ class Harvester(CkanCommand): def print_harvest_sources(self, sources): if sources: - print "" + print '' for source in sources: self.print_harvest_source(source) diff --git a/ckanext/harvest/interfaces.py b/ckanext/harvest/interfaces.py new file mode 100644 index 0000000..d421af0 --- /dev/null +++ b/ckanext/harvest/interfaces.py @@ -0,0 +1,66 @@ +from ckan.plugins.interfaces import Interface + +class IHarvester(Interface): + ''' + Common harvesting interface + + ''' + + def get_type(self): + ''' + Plugins must provide this method, which will return a string with the + Harvester type implemented by the plugin (e.g ``CSW``,``INSPIRE``, etc). + This will ensure that they only receive Harvest Jobs and Objects + relevant to them. + + returns: A string with the harvester type + ''' + + + def gather_stage(self, harvest_job): + ''' + The gather stage will recieve a HarvestJob object and will be + responsible for: + - gathering all the necessary objects to fetch on a later. + stage (e.g. for a CSW server, perform a GetRecords request) + - creating the necessary HarvestObjects in the database. + - creating and storing any suitable HarvestGatherErrors that may + occur. + - returning a list with all the ids of the created HarvestObjects. + + :param harvest_job: HarvestJob object + :returns: A list of HarvestObject ids + ''' + + def fetch_stage(self, harvest_object): + ''' + The fetch stage will receive a HarvestObject object and will be + responsible for: + - getting the contents of the remote object (e.g. for a CSW server, + perform a GetRecordById request). + - saving the content in the provided HarvestObject. + - update the fetch_started, fetch_finished and retry_times as + necessary. + - creating and storing any suitable HarvestObjectErrors that may + occur. + - returning True if everything went as expected, False otherwise. + + :param harvest_object: HarvestObject object + :returns: True if everything went right, False if errors were found + ''' + + def import_stage(self, harvest_object): + ''' + The import stage will receive a HarvestObject object and will be + responsible for: + - performing any necessary action with the fetched object (e.g + create a CKAN package). + - creatingg the HarvestObject - Package relation (if necessary) + - creating and storing any suitable HarvestObjectErrors that may + occur. + - returning True if everything went as expected, False otherwisie. + + :param harvest_object: HarvestObject object + :returns: True if everything went right, False if errors were found + ''' + diff --git a/ckanext/harvest/lib/__init__.py b/ckanext/harvest/lib/__init__.py index 7ca1289..97defb6 100644 --- a/ckanext/harvest/lib/__init__.py +++ b/ckanext/harvest/lib/__init__.py @@ -2,8 +2,8 @@ from ckan.model import Session from ckan.model import repo from ckan.lib.base import config -from ckanext.harvest.model import HarvestSource, HarvestJob - +from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject +from ckanext.harvest.queue import get_gather_publisher log = __import__("logging").getLogger(__name__) @@ -40,6 +40,21 @@ def _job_as_dict(job): return out +def _object_as_dict(obj): + out = obj.as_dict() + out['source'] = obj.source.as_dict() + out['job'] = obj.job.as_dict() + + if obj.package: + out['package'] = obj.package.as_dict() + + out['errors'] = [] + + for error in obj.errors: + out['errors'].append(error.as_dict()) + + return out + def get_harvest_source(id,default=Exception,attr=None): source = HarvestSource.get(id,default=default,attr=attr) @@ -127,6 +142,30 @@ def delete_harvest_job(job_id): return True +def run_harvest_jobs(): + # Check if there are pending harvest jobs + jobs = get_harvest_jobs(status=u'New') + if len(jobs) == 0: + raise Exception('There are no new harvesting jobs') + + # Send each job to the gather queue + publisher = get_gather_publisher() + for job in jobs: + publisher.send({'harvest_job_id': job['id']}) + log.info('Sent job %s to the gather queue' % job['id']) + + publisher.close() + return jobs + +def get_harvest_object(id,attr=None): + obj = HarvestObject.get(id,attr) + return _object_as_dict(obj) + +def get_harvest_objects(**kwds): + objects = HarvestObject.filter(**kwds).all() + return [_object_as_dict(obj) for obj in objects] + + #TODO: move to ckanext-?? for geo stuff def get_srid(crs): """Returns the SRID for the provided CRS definition diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py new file mode 100644 index 0000000..3eaa11e --- /dev/null +++ b/ckanext/harvest/queue.py @@ -0,0 +1,142 @@ +import logging + +from carrot.connection import BrokerConnection +from carrot.messaging import Publisher +from carrot.messaging import Consumer + +from ckan.lib.base import config +from ckan.plugins import PluginImplementations + +from ckanext.harvest.model import HarvestJob, HarvestObject +from ckanext.harvest.interfaces import IHarvester + +log = logging.getLogger(__name__) + +__all__ = ['get_gather_publisher', 'get_gather_consumer', \ + 'get_fetch_publisher', 'get_fetch_consumer'] + +PORT = 5672 +USERID = 'guest' +PASSWORD = 'guest' +HOSTNAME = 'localhost' +VIRTUAL_HOST = '/' + +# settings for AMQP +EXCHANGE_TYPE = 'direct' +EXCHANGE_NAME = 'ckan.harvest' + +def get_carrot_connection(): + backend = config.get('ckan.harvest.mq.library', 'pyamqplib') + log.info("Carrot connnection using %s backend" % backend) + try: + port = int(config.get('ckan.harvest.mq.port', PORT)) + except ValueError: + port = PORT + userid = config.get('ckan.harvest.mq.user_id', USERID) + password = config.get('ckan.harvest.mq.password', PASSWORD) + hostname = config.get('ckan.harvest.mq.hostname', HOSTNAME) + virtual_host = config.get('ckan.harvest.mq.virtual_host', VIRTUAL_HOST) + + backend_cls = 'carrot.backends.%s.Backend' % backend + return BrokerConnection(hostname=hostname, port=port, + userid=userid, password=password, + virtual_host=virtual_host, + backend_cls=backend_cls) + +def get_publisher(routing_key): + return Publisher(connection=get_carrot_connection(), + exchange=EXCHANGE_NAME, + exchange_type=EXCHANGE_TYPE, + routing_key=routing_key) + +def get_consumer(queue_name, routing_key): + return Consumer(connection=get_carrot_connection(), + queue=queue_name, + routing_key=routing_key, + exchange=EXCHANGE_NAME, + exchange_type=EXCHANGE_TYPE, + durable=True, auto_delete=False) + +def gather_callback(message_data,message): + try: + id = message_data['harvest_job_id'] + log.info('Received harvest job id: %s' % id) + + # Get a publisher for the fetch queue + publisher = get_fetch_publisher() + + try: + job = HarvestJob.get(id) + + # Send the harvest job to the plugins that implement + # the Harvester interface, only if the source type + # matches + for harvester in PluginImplementations(IHarvester): + if harvester.get_type() == job.source.type: + + # Get a list of harvest object ids from the plugin + harvest_object_ids = harvester.gather_stage(job) + + if len(harvest_object_ids) > 0: + for id in harvest_object_ids: + # Send the id to the fetch queue + publisher.send({'harvest_object_id':id}) + log.info('Sent object %s to the fetch queue' % id) + + except: + log.error('Harvest job does not exist: %s' % id) + + finally: + publisher.close() + + except KeyError: + log.error('No harvest job id received') + finally: + message.ack() + + +def fetch_callback(message_data,message): + try: + id = message_data['harvest_object_id'] + log.info('Received harvest object id: %s' % id) + + try: + obj = HarvestObject.get(id) + + # Send the harvest object to the plugins that implement + # the Harvester interface, only if the source type + # matches + for harvester in PluginImplementations(IHarvester): + if harvester.get_type() == obj.source.type: + + # See if the plugin can fetch the harvest object + success = harvester.fetch_stage(obj) + if success: + # If no errors where found, call the import method + harvester.import_stage(obj) + + + except: + log.error('Harvest object does not exist: %s' % id) + + except KeyError: + log.error('No harvest object id received') + finally: + message.ack() + +def get_gather_consumer(): + consumer = get_consumer('ckan.harvest.gather','harvest_job_id') + consumer.register_callback(gather_callback) + return consumer + +def get_fetch_consumer(): + consumer = get_consumer('ckan.harvert.fetch','harvest_object_id') + consumer.register_callback(fetch_callback) + return consumer + +def get_gather_publisher(): + return get_publisher('harvest_job_id') + +def get_fetch_publisher(): + return get_publisher('harvest_object_id') +