Resend awaiting objects in the DB that are not present in Redis
This happens when there is an ongoing harvest job and the keys on redis are deleted for some reason (eg not persistent container)
This commit is contained in:
parent
5ad6d866ed
commit
5ffe6d4bfa
|
@ -22,7 +22,8 @@ from ckan.plugins import toolkit
|
|||
from ckan.logic import NotFound, check_access
|
||||
|
||||
from ckanext.harvest.plugin import DATASET_TYPE_NAME
|
||||
from ckanext.harvest.queue import get_gather_publisher, resubmit_jobs
|
||||
from ckanext.harvest.queue import (
|
||||
get_gather_publisher, resubmit_jobs, resubmit_objects)
|
||||
|
||||
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
|
||||
from ckanext.harvest.logic import HarvestJobExists
|
||||
|
@ -565,10 +566,14 @@ def harvest_jobs_run(context, data_dict):
|
|||
else:
|
||||
log.debug('Ongoing job:%s source:%s',
|
||||
job['id'], job['source_id'])
|
||||
log.debug('No jobs to send to the gather queue')
|
||||
|
||||
# resubmit old redis tasks
|
||||
# Resubmit old redis tasks
|
||||
resubmit_jobs()
|
||||
|
||||
# Resubmit pending objects missing from Redis
|
||||
resubmit_objects()
|
||||
|
||||
return [] # merely for backwards compatibility
|
||||
|
||||
|
||||
|
|
|
@ -146,6 +146,25 @@ def resubmit_jobs():
|
|||
redis.delete(key)
|
||||
|
||||
|
||||
def resubmit_objects():
|
||||
'''
|
||||
Resubmit all WAITING objects on the DB that are not present in Redis
|
||||
'''
|
||||
if config.get('ckan.harvest.mq.type') != 'redis':
|
||||
return
|
||||
redis = get_connection()
|
||||
publisher = get_fetch_publisher()
|
||||
|
||||
waiting_objects = model.Session.query(HarvestObject.id) \
|
||||
.filter_by(state='WAITING') \
|
||||
.all()
|
||||
|
||||
for object_id in waiting_objects:
|
||||
if not redis.get(object_id):
|
||||
log.debug('Re-sent object {} to the fetch queue'.format(object_id[0]))
|
||||
publisher.send({'harvest_object_id': object_id[0]})
|
||||
|
||||
|
||||
class Publisher(object):
|
||||
def __init__(self, connection, channel, exchange, routing_key):
|
||||
self.connection = connection
|
||||
|
|
Loading…
Reference in New Issue