Set job to finished if job is longer than timeout

This commit is contained in:
Ken Tsang 2020-05-19 11:08:22 +01:00
parent 19da5c5768
commit 3619dbc0ad
6 changed files with 250 additions and 1 deletions

View File

@ -196,6 +196,18 @@ That way, all CKAN Users who are declared as Sysadmins will receive the Error em
If you don't specify this setting, the default will be False.
Set a timeout for a harvest job (optional)
================================================
IF you want to set a timeout for harvest jobs, you can add this configuration option to the ini file:
ckan.harvest.timeout = 1440
The timeout value is in minutes, so 1440 represents 24 hours.
Any jobs which are timed out will create an error message for the user to see.
If you don't specify this setting, the default will be False and there will be no timeout on harvest jobs.
Command line interface
======================

View File

@ -30,7 +30,7 @@ from ckanext.harvest.utils import (
from ckanext.harvest.queue import (
get_gather_publisher, resubmit_jobs, resubmit_objects)
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject, HarvestGatherError, HarvestObjectError
from ckanext.harvest.logic import HarvestJobExists
from ckanext.harvest.logic.dictization import harvest_job_dictize
@ -500,6 +500,10 @@ def harvest_jobs_run(context, data_dict):
Runs scheduled jobs, checks if any jobs need marking as finished, and
resubmits queue items if needed.
If ckanext.harvest.timeout is set:
Check if the duration of the job is longer than ckanext.harvest.timeout,
then mark that job as finished as there is probably an underlying issue with the harvest process.
This should be called every few minutes (e.g. by a cron), or else jobs
will never show as finished.
@ -514,6 +518,7 @@ def harvest_jobs_run(context, data_dict):
'''
log.info('Harvest job run: %r', data_dict)
check_access('harvest_jobs_run', context, data_dict)
timeout = config.get('ckan.harvest.timeout')
session = context['session']
@ -530,6 +535,24 @@ def harvest_jobs_run(context, data_dict):
context, {'source_id': source_id, 'status': u'Running'})
if len(jobs):
for job in jobs:
if timeout:
created = datetime.datetime.strptime(job['created'], '%Y-%m-%d %H:%M:%S.%f')
now = datetime.datetime.now()
if now - created > datetime.timedelta(minutes=int(timeout)):
msg = 'Job timeout: %s is taking longer than %s minutes' % (job['id'], timeout)
log.error(msg)
job_obj = HarvestJob.get(job['id'])
job_obj.status = u'Finished'
job_obj.finished = now
job_obj.save()
err = HarvestGatherError(message=msg, job=job_obj)
err.save()
log.info('Marking job as finished due to error: %s %s',
job_obj.source.url, job_obj.id)
continue
if job['gather_finished']:
num_objects_in_progress = \
session.query(HarvestObject.id) \

View File

@ -496,6 +496,103 @@ class TestActions(ActionBase):
assert_equal(job['gather_started'], None)
assert_in('stats', job.keys())
@patch('ckanext.harvest.logic.action.update.log.error')
def test_harvest_jobs_run_times_out(self, mock_error_log):
harvest_source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
harvest_job = factories.HarvestJobObj(
source=harvest_source,
run=True
)
# date in the past, ckan.harvest.timeout has been set to 5 minutes in test-nose.ini
harvest_job.created = '2020-05-29 10:00:00.0'
harvest_job.save()
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': harvest_job.id,
'source_id': harvest_source.id
}
job = toolkit.get_action('harvest_jobs_run')(
context, data_dict)
msg, = mock_error_log.call_args[0]
assert mock_error_log.called
assert msg == 'Job timeout: {} is taking longer than 5 minutes'.format(harvest_job.id)
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source.id})
assert status['last_job']['status'] == 'Finished'
assert status['last_job']['stats']['errored'] == 1
@patch('ckanext.harvest.logic.action.update.log.error')
def test_harvest_jobs_run_does_not_timeout_if_within_time(self, mock_error_log):
harvest_source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
harvest_job = factories.HarvestJobObj(
source=harvest_source,
run=True
)
# job has just been created, so no timeout expected
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': harvest_job.id,
'source_id': harvest_source.id
}
job_obj = HarvestJob.get(harvest_job.id)
job = toolkit.get_action('harvest_jobs_run')(
context, data_dict)
assert not mock_error_log.called
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source.id})
assert status['last_job']['status'] == 'Running'
assert status['last_job']['stats']['errored'] == 0
@patch.dict('ckanext.harvest.logic.action.update.config',
{'ckan.harvest.timeout': None})
@patch('ckanext.harvest.logic.action.update.log.error')
def test_harvest_jobs_run_does_not_timeout_if_timeout_not_set(self, mock_error_log):
harvest_source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
harvest_job = factories.HarvestJobObj(
source=harvest_source,
run=True
)
# date in the past, assumes ckan.harvest.timeout has been set to 5 minutes
harvest_job.created = '2020-05-29 10:00:00.0'
harvest_job.save()
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': harvest_job.id,
'source_id': harvest_source.id
}
job_obj = HarvestJob.get(harvest_job.id)
job = toolkit.get_action('harvest_jobs_run')(
context, data_dict)
assert not mock_error_log.called
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source.id})
assert status['last_job']['status'] == 'Running'
assert status['last_job']['stats']['errored'] == 0
class TestHarvestObject(unittest.TestCase):
@classmethod

View File

@ -264,6 +264,121 @@ class TestHarvestQueue(object):
assert_equal(harvest_source_dict['status']['total_datasets'], 2)
assert_equal(harvest_source_dict['status']['job_count'], 2)
def test_fetch_doesnt_process_remaining_objects_if_job_status_finished(self):
# make sure queues/exchanges are created first and are empty
consumer = queue.get_gather_consumer()
consumer_fetch = queue.get_fetch_consumer()
consumer.queue_purge(queue=queue.get_gather_queue_name())
consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())
user = logic.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
context = {'model': model, 'session': model.Session,
'user': user, 'api_version': 3, 'ignore_auth': True}
source_dict = {
'title': 'Test Job Finished',
'name': 'test-job-finished',
'url': 'basic_test_1',
'source_type': 'test-nose',
}
harvest_source = logic.get_action('harvest_source_create')(
context,
source_dict
)
assert harvest_source['source_type'] == 'test-nose', harvest_source
assert harvest_source['url'] == 'basic_test_1', harvest_source
harvest_job = logic.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True}
)
job_id = harvest_job['id']
assert harvest_job['source_id'] == harvest_source['id'], harvest_job
assert harvest_job['status'] == u'Running'
assert logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'
# pop on item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
all_objects = model.Session.query(HarvestObject).filter(
HarvestObject.harvest_job_id == harvest_job['id']
).all()
assert len(all_objects) == 3
assert all_objects[0].state == 'WAITING'
assert all_objects[1].state == 'WAITING'
assert all_objects[2].state == 'WAITING'
# artificially set the job to finished to simulate a job abort or timeout
job_obj = HarvestJob.get(harvest_job['id'])
job_obj.status = 'Finished'
job_obj.save()
original_dataset_count = model.Session.query(model.Package) \
.filter(model.Package.type == 'dataset') \
.count()
# do three times as three harvest objects
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
all_objects = model.Session.query(HarvestObject).filter(
HarvestObject.harvest_job_id == harvest_job['id']
).all()
assert len(all_objects) == 3
assert all_objects[0].state == 'ERROR'
assert all_objects[1].state == 'ERROR'
assert all_objects[2].state == 'ERROR'
count = model.Session.query(model.Package) \
.filter(model.Package.type == 'dataset') \
.count()
assert count == original_dataset_count
# fire run again to check if job is set to Finished
logic.get_action('harvest_jobs_run')(
context,
{'source_id': harvest_source['id']}
)
harvest_job = logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)
assert_equal(harvest_job['status'], u'Finished')
assert_equal(harvest_job['stats'], {'added': 0, 'updated': 0, 'not modified': 0, 'errored': 3, 'deleted': 0})
harvest_source_dict = logic.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 0,
'not modified': 0, 'errored': 3, 'deleted': 0})
assert_equal(harvest_source_dict['status']['total_datasets'], 0)
assert_equal(harvest_source_dict['status']['job_count'], 1)
def test_redis_queue_purging(self):
'''
Test that Redis queue purging doesn't purge the wrong keys.

View File

@ -17,6 +17,7 @@ use = config:../ckan/test-core.ini
# run fast.
ckan.plugins = harvest ckan_harvester test_nose_harvester test_nose_harvester2 test_nose_action_harvester
ckan.harvest.mq.type = redis
ckan.harvest.timeout = 5
ckan.legacy_templates = false
# NB: other test configuration should go in test-core.ini, which is
# what the postgres tests use.

View File

@ -17,6 +17,7 @@ use = config:../ckan/test-core.ini
# run fast.
ckan.plugins = harvest ckan_harvester test_harvester test_harvester2 test_action_harvester
ckan.harvest.mq.type = redis
ckan.harvest.timeout = 5
ckan.legacy_templates = false
# NB: other test configuration should go in test-core.ini, which is
# what the postgres tests use.