From b38fc57ec5ca1957c91b21db7e70670fb438661f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Mercader?= Date: Thu, 7 Apr 2011 16:59:11 +0100 Subject: [PATCH] [refactoring] Changes in the queue code and the IHarvester interface" --- ckanext/harvest/interfaces.py | 9 ++++---- ckanext/harvest/queue.py | 43 ++++++++++++++++++++++------------- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/ckanext/harvest/interfaces.py b/ckanext/harvest/interfaces.py index d421af0..da647fb 100644 --- a/ckanext/harvest/interfaces.py +++ b/ckanext/harvest/interfaces.py @@ -23,7 +23,8 @@ class IHarvester(Interface): responsible for: - gathering all the necessary objects to fetch on a later. stage (e.g. for a CSW server, perform a GetRecords request) - - creating the necessary HarvestObjects in the database. + - creating the necessary HarvestObjects in the database, specifying + the guid and a reference to its source and job. - creating and storing any suitable HarvestGatherErrors that may occur. - returning a list with all the ids of the created HarvestObjects. @@ -39,8 +40,6 @@ class IHarvester(Interface): - getting the contents of the remote object (e.g. for a CSW server, perform a GetRecordById request). - saving the content in the provided HarvestObject. - - update the fetch_started, fetch_finished and retry_times as - necessary. - creating and storing any suitable HarvestObjectErrors that may occur. - returning True if everything went as expected, False otherwise. @@ -55,10 +54,10 @@ class IHarvester(Interface): responsible for: - performing any necessary action with the fetched object (e.g create a CKAN package). - - creatingg the HarvestObject - Package relation (if necessary) + - creating the HarvestObject - Package relation (if necessary) - creating and storing any suitable HarvestObjectErrors that may occur. - - returning True if everything went as expected, False otherwisie. + - returning True if everything went as expected, False otherwise. :param harvest_object: HarvestObject object :returns: True if everything went right, False if errors were found diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index 3eaa11e..8b264e2 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -1,4 +1,5 @@ import logging +import datetime from carrot.connection import BrokerConnection from carrot.messaging import Publisher @@ -15,7 +16,7 @@ log = logging.getLogger(__name__) __all__ = ['get_gather_publisher', 'get_gather_consumer', \ 'get_fetch_publisher', 'get_fetch_consumer'] -PORT = 5672 +PORT = 5672 USERID = 'guest' PASSWORD = 'guest' HOSTNAME = 'localhost' @@ -36,7 +37,7 @@ def get_carrot_connection(): password = config.get('ckan.harvest.mq.password', PASSWORD) hostname = config.get('ckan.harvest.mq.hostname', HOSTNAME) virtual_host = config.get('ckan.harvest.mq.virtual_host', VIRTUAL_HOST) - + backend_cls = 'carrot.backends.%s.Backend' % backend return BrokerConnection(hostname=hostname, port=port, userid=userid, password=password, @@ -51,23 +52,26 @@ def get_publisher(routing_key): def get_consumer(queue_name, routing_key): return Consumer(connection=get_carrot_connection(), - queue=queue_name, + queue=queue_name, routing_key=routing_key, exchange=EXCHANGE_NAME, exchange_type=EXCHANGE_TYPE, durable=True, auto_delete=False) + def gather_callback(message_data,message): try: id = message_data['harvest_job_id'] - log.info('Received harvest job id: %s' % id) + log.debug('Received harvest job id: %s' % id) # Get a publisher for the fetch queue publisher = get_fetch_publisher() try: job = HarvestJob.get(id) - + except: + log.error('Harvest job does not exist: %s' % id) + else: # Send the harvest job to the plugins that implement # the Harvester interface, only if the source type # matches @@ -76,15 +80,15 @@ def gather_callback(message_data,message): # Get a list of harvest object ids from the plugin harvest_object_ids = harvester.gather_stage(job) - - if len(harvest_object_ids) > 0: + log.debug('Received from plugin''s gather_stage: %r' % harvest_object_ids) + if harvest_object_ids and len(harvest_object_ids) > 0: for id in harvest_object_ids: # Send the id to the fetch queue publisher.send({'harvest_object_id':id}) - log.info('Sent object %s to the fetch queue' % id) + log.debug('Sent object %s to the fetch queue' % id) - except: - log.error('Harvest job does not exist: %s' % id) + job.status = u'Finished' + job.save() finally: publisher.close() @@ -102,22 +106,26 @@ def fetch_callback(message_data,message): try: obj = HarvestObject.get(id) - + except: + log.error('Harvest object does not exist: %s' % id) + else: # Send the harvest object to the plugins that implement # the Harvester interface, only if the source type # matches for harvester in PluginImplementations(IHarvester): if harvester.get_type() == obj.source.type: - # See if the plugin can fetch the harvest object + # See if the plugin can fetch the harvest object + obj.fetch_started = datetime.datetime.now() success = harvester.fetch_stage(obj) + obj.fetch_finished = datetime.datetime.now() + obj.save() + #TODO: retry times? if success: # If no errors where found, call the import method harvester.import_stage(obj) - - - except: - log.error('Harvest object does not exist: %s' % id) + + except KeyError: log.error('No harvest object id received') @@ -140,3 +148,6 @@ def get_gather_publisher(): def get_fetch_publisher(): return get_publisher('harvest_object_id') +# Get a publisher for the fetch queue +#fetch_publisher = get_fetch_publisher() +