[#21] Improve gather stage error handling
See issue for full details. Basically we don't want to catch any exception at the queue.py level, as they prevent debugging. Harvesters should deal with them and return a list of ids or an empty list if no objects need to be fetched. Also improved the debug messages.
This commit is contained in:
parent
91f18bffab
commit
d77f16aba9
|
@ -97,54 +97,64 @@ def gather_callback(channel, method, header, body):
|
||||||
try:
|
try:
|
||||||
id = json.loads(body)['harvest_job_id']
|
id = json.loads(body)['harvest_job_id']
|
||||||
log.debug('Received harvest job id: %s' % id)
|
log.debug('Received harvest job id: %s' % id)
|
||||||
|
|
||||||
# Get a publisher for the fetch queue
|
|
||||||
publisher = get_fetch_publisher()
|
|
||||||
|
|
||||||
try:
|
|
||||||
job = HarvestJob.get(id)
|
|
||||||
except:
|
|
||||||
log.error('Harvest job does not exist: %s' % id)
|
|
||||||
else:
|
|
||||||
# Send the harvest job to the plugins that implement
|
|
||||||
# the Harvester interface, only if the source type
|
|
||||||
# matches
|
|
||||||
harvester_found = False
|
|
||||||
for harvester in PluginImplementations(IHarvester):
|
|
||||||
if harvester.info()['name'] == job.source.type:
|
|
||||||
harvester_found = True
|
|
||||||
# Get a list of harvest object ids from the plugin
|
|
||||||
job.gather_started = datetime.datetime.now()
|
|
||||||
try:
|
|
||||||
harvest_object_ids = harvester.gather_stage(job)
|
|
||||||
except Exception, e:
|
|
||||||
log.error('Gather stage failed unexpectedly: %s' % e)
|
|
||||||
job.status = 'Errored'
|
|
||||||
job.save()
|
|
||||||
continue
|
|
||||||
job.gather_finished = datetime.datetime.now()
|
|
||||||
job.save()
|
|
||||||
log.debug('Received from plugin''s gather_stage: %r' % harvest_object_ids)
|
|
||||||
if harvest_object_ids and len(harvest_object_ids) > 0:
|
|
||||||
for id in harvest_object_ids:
|
|
||||||
# Send the id to the fetch queue
|
|
||||||
publisher.send({'harvest_object_id':id})
|
|
||||||
log.debug('Sent object %s to the fetch queue' % id)
|
|
||||||
|
|
||||||
if not harvester_found:
|
|
||||||
msg = 'No harvester could be found for source type %s' % job.source.type
|
|
||||||
err = HarvestGatherError(message=msg,job=job)
|
|
||||||
err.save()
|
|
||||||
log.error(msg)
|
|
||||||
|
|
||||||
finally:
|
|
||||||
publisher.close()
|
|
||||||
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.error('No harvest job id received')
|
log.error('No harvest job id received')
|
||||||
finally:
|
|
||||||
model.Session.remove()
|
|
||||||
channel.basic_ack(method.delivery_tag)
|
channel.basic_ack(method.delivery_tag)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Get a publisher for the fetch queue
|
||||||
|
publisher = get_fetch_publisher()
|
||||||
|
|
||||||
|
job = HarvestJob.get(id)
|
||||||
|
|
||||||
|
if not job:
|
||||||
|
log.error('Harvest job does not exist: %s' % id)
|
||||||
|
channel.basic_ack(method.delivery_tag)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Send the harvest job to the plugins that implement
|
||||||
|
# the Harvester interface, only if the source type
|
||||||
|
# matches
|
||||||
|
harvester_found = False
|
||||||
|
for harvester in PluginImplementations(IHarvester):
|
||||||
|
if harvester.info()['name'] == job.source.type:
|
||||||
|
harvester_found = True
|
||||||
|
# Get a list of harvest object ids from the plugin
|
||||||
|
job.gather_started = datetime.datetime.now()
|
||||||
|
|
||||||
|
harvest_object_ids = harvester.gather_stage(job)
|
||||||
|
|
||||||
|
job.gather_finished = datetime.datetime.now()
|
||||||
|
job.save()
|
||||||
|
|
||||||
|
if not isinstance(harvest_object_ids, list):
|
||||||
|
log.error('Gather stage failed')
|
||||||
|
publisher.close()
|
||||||
|
channel.basic_ack(method.delivery_tag)
|
||||||
|
return False
|
||||||
|
|
||||||
|
if len(harvest_object_ids) == 0:
|
||||||
|
log.info('No harvest objects to fetch')
|
||||||
|
publisher.close()
|
||||||
|
channel.basic_ack(method.delivery_tag)
|
||||||
|
return False
|
||||||
|
|
||||||
|
log.debug('Received from plugin gather_stage: {0} objects (first: {1} last: {2})'.format(
|
||||||
|
len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:]))
|
||||||
|
for id in harvest_object_ids:
|
||||||
|
# Send the id to the fetch queue
|
||||||
|
publisher.send({'harvest_object_id':id})
|
||||||
|
log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids)))
|
||||||
|
|
||||||
|
if not harvester_found:
|
||||||
|
msg = 'No harvester could be found for source type %s' % job.source.type
|
||||||
|
err = HarvestGatherError(message=msg,job=job)
|
||||||
|
err.save()
|
||||||
|
log.error(msg)
|
||||||
|
|
||||||
|
model.Session.remove()
|
||||||
|
publisher.close()
|
||||||
|
channel.basic_ack(method.delivery_tag)
|
||||||
|
|
||||||
|
|
||||||
def fetch_callback(channel, method, header, body):
|
def fetch_callback(channel, method, header, body):
|
||||||
|
|
Loading…
Reference in New Issue