[refactoring] Changes in the queue code and the IHarvester interface"

This commit is contained in:
Adrià Mercader 2011-04-07 16:59:11 +01:00
parent 90ae9d27db
commit b38fc57ec5
2 changed files with 31 additions and 21 deletions

View File

@ -23,7 +23,8 @@ class IHarvester(Interface):
responsible for:
- gathering all the necessary objects to fetch on a later.
stage (e.g. for a CSW server, perform a GetRecords request)
- creating the necessary HarvestObjects in the database.
- creating the necessary HarvestObjects in the database, specifying
the guid and a reference to its source and job.
- creating and storing any suitable HarvestGatherErrors that may
occur.
- returning a list with all the ids of the created HarvestObjects.
@ -39,8 +40,6 @@ class IHarvester(Interface):
- getting the contents of the remote object (e.g. for a CSW server,
perform a GetRecordById request).
- saving the content in the provided HarvestObject.
- update the fetch_started, fetch_finished and retry_times as
necessary.
- creating and storing any suitable HarvestObjectErrors that may
occur.
- returning True if everything went as expected, False otherwise.
@ -55,10 +54,10 @@ class IHarvester(Interface):
responsible for:
- performing any necessary action with the fetched object (e.g
create a CKAN package).
- creatingg the HarvestObject - Package relation (if necessary)
- creating the HarvestObject - Package relation (if necessary)
- creating and storing any suitable HarvestObjectErrors that may
occur.
- returning True if everything went as expected, False otherwisie.
- returning True if everything went as expected, False otherwise.
:param harvest_object: HarvestObject object
:returns: True if everything went right, False if errors were found

View File

@ -1,4 +1,5 @@
import logging
import datetime
from carrot.connection import BrokerConnection
from carrot.messaging import Publisher
@ -15,7 +16,7 @@ log = logging.getLogger(__name__)
__all__ = ['get_gather_publisher', 'get_gather_consumer', \
'get_fetch_publisher', 'get_fetch_consumer']
PORT = 5672
PORT = 5672
USERID = 'guest'
PASSWORD = 'guest'
HOSTNAME = 'localhost'
@ -36,7 +37,7 @@ def get_carrot_connection():
password = config.get('ckan.harvest.mq.password', PASSWORD)
hostname = config.get('ckan.harvest.mq.hostname', HOSTNAME)
virtual_host = config.get('ckan.harvest.mq.virtual_host', VIRTUAL_HOST)
backend_cls = 'carrot.backends.%s.Backend' % backend
return BrokerConnection(hostname=hostname, port=port,
userid=userid, password=password,
@ -51,23 +52,26 @@ def get_publisher(routing_key):
def get_consumer(queue_name, routing_key):
return Consumer(connection=get_carrot_connection(),
queue=queue_name,
queue=queue_name,
routing_key=routing_key,
exchange=EXCHANGE_NAME,
exchange_type=EXCHANGE_TYPE,
durable=True, auto_delete=False)
def gather_callback(message_data,message):
try:
id = message_data['harvest_job_id']
log.info('Received harvest job id: %s' % id)
log.debug('Received harvest job id: %s' % id)
# Get a publisher for the fetch queue
publisher = get_fetch_publisher()
try:
job = HarvestJob.get(id)
except:
log.error('Harvest job does not exist: %s' % id)
else:
# Send the harvest job to the plugins that implement
# the Harvester interface, only if the source type
# matches
@ -76,15 +80,15 @@ def gather_callback(message_data,message):
# Get a list of harvest object ids from the plugin
harvest_object_ids = harvester.gather_stage(job)
if len(harvest_object_ids) > 0:
log.debug('Received from plugin''s gather_stage: %r' % harvest_object_ids)
if harvest_object_ids and len(harvest_object_ids) > 0:
for id in harvest_object_ids:
# Send the id to the fetch queue
publisher.send({'harvest_object_id':id})
log.info('Sent object %s to the fetch queue' % id)
log.debug('Sent object %s to the fetch queue' % id)
except:
log.error('Harvest job does not exist: %s' % id)
job.status = u'Finished'
job.save()
finally:
publisher.close()
@ -102,22 +106,26 @@ def fetch_callback(message_data,message):
try:
obj = HarvestObject.get(id)
except:
log.error('Harvest object does not exist: %s' % id)
else:
# Send the harvest object to the plugins that implement
# the Harvester interface, only if the source type
# matches
for harvester in PluginImplementations(IHarvester):
if harvester.get_type() == obj.source.type:
# See if the plugin can fetch the harvest object
# See if the plugin can fetch the harvest object
obj.fetch_started = datetime.datetime.now()
success = harvester.fetch_stage(obj)
obj.fetch_finished = datetime.datetime.now()
obj.save()
#TODO: retry times?
if success:
# If no errors where found, call the import method
harvester.import_stage(obj)
except:
log.error('Harvest object does not exist: %s' % id)
except KeyError:
log.error('No harvest object id received')
@ -140,3 +148,6 @@ def get_gather_publisher():
def get_fetch_publisher():
return get_publisher('harvest_object_id')
# Get a publisher for the fetch queue
#fetch_publisher = get_fetch_publisher()