diff --git a/README.rst b/README.rst index 59c6883..9b4a733 100644 --- a/README.rst +++ b/README.rst @@ -196,6 +196,18 @@ That way, all CKAN Users who are declared as Sysadmins will receive the Error em If you don't specify this setting, the default will be False. +Set a timeout for a harvest job (optional) +================================================ + +IF you want to set a timeout for harvest jobs, you can add this configuration option to the ini file: + + ckan.harvest.timeout = 1440 + +The timeout value is in minutes, so 1440 represents 24 hours. +Any jobs which are timed out will create an error message for the user to see. + +If you don't specify this setting, the default will be False and there will be no timeout on harvest jobs. + Command line interface ====================== diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 62fb0a6..05be25c 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -30,7 +30,7 @@ from ckanext.harvest.utils import ( from ckanext.harvest.queue import ( get_gather_publisher, resubmit_jobs, resubmit_objects) -from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject +from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject, HarvestGatherError, HarvestObjectError from ckanext.harvest.logic import HarvestJobExists from ckanext.harvest.logic.dictization import harvest_job_dictize @@ -500,6 +500,10 @@ def harvest_jobs_run(context, data_dict): Runs scheduled jobs, checks if any jobs need marking as finished, and resubmits queue items if needed. + If ckanext.harvest.timeout is set: + Check if the duration of the job is longer than ckanext.harvest.timeout, + then mark that job as finished as there is probably an underlying issue with the harvest process. + This should be called every few minutes (e.g. by a cron), or else jobs will never show as finished. @@ -514,6 +518,7 @@ def harvest_jobs_run(context, data_dict): ''' log.info('Harvest job run: %r', data_dict) check_access('harvest_jobs_run', context, data_dict) + timeout = config.get('ckan.harvest.timeout') session = context['session'] @@ -530,6 +535,24 @@ def harvest_jobs_run(context, data_dict): context, {'source_id': source_id, 'status': u'Running'}) if len(jobs): for job in jobs: + if timeout: + created = datetime.datetime.strptime(job['created'], '%Y-%m-%d %H:%M:%S.%f') + now = datetime.datetime.now() + if now - created > datetime.timedelta(minutes=int(timeout)): + msg = 'Job timeout: %s is taking longer than %s minutes' % (job['id'], timeout) + log.error(msg) + + job_obj = HarvestJob.get(job['id']) + job_obj.status = u'Finished' + job_obj.finished = now + job_obj.save() + + err = HarvestGatherError(message=msg, job=job_obj) + err.save() + log.info('Marking job as finished due to error: %s %s', + job_obj.source.url, job_obj.id) + continue + if job['gather_finished']: num_objects_in_progress = \ session.query(HarvestObject.id) \ diff --git a/ckanext/harvest/tests/nose/test_action.py b/ckanext/harvest/tests/nose/test_action.py index f08264e..7cac877 100644 --- a/ckanext/harvest/tests/nose/test_action.py +++ b/ckanext/harvest/tests/nose/test_action.py @@ -496,6 +496,103 @@ class TestActions(ActionBase): assert_equal(job['gather_started'], None) assert_in('stats', job.keys()) + @patch('ckanext.harvest.logic.action.update.log.error') + def test_harvest_jobs_run_times_out(self, mock_error_log): + harvest_source = factories.HarvestSourceObj(**SOURCE_DICT.copy()) + harvest_job = factories.HarvestJobObj( + source=harvest_source, + run=True + ) + # date in the past, ckan.harvest.timeout has been set to 5 minutes in test-nose.ini + harvest_job.created = '2020-05-29 10:00:00.0' + harvest_job.save() + + context = {'model': model, 'session': model.Session, + 'ignore_auth': True, 'user': ''} + + data_dict = { + 'guid': 'guid', + 'content': 'content', + 'job_id': harvest_job.id, + 'source_id': harvest_source.id + } + + job = toolkit.get_action('harvest_jobs_run')( + context, data_dict) + + msg, = mock_error_log.call_args[0] + + assert mock_error_log.called + assert msg == 'Job timeout: {} is taking longer than 5 minutes'.format(harvest_job.id) + + status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source.id}) + assert status['last_job']['status'] == 'Finished' + assert status['last_job']['stats']['errored'] == 1 + + @patch('ckanext.harvest.logic.action.update.log.error') + def test_harvest_jobs_run_does_not_timeout_if_within_time(self, mock_error_log): + harvest_source = factories.HarvestSourceObj(**SOURCE_DICT.copy()) + harvest_job = factories.HarvestJobObj( + source=harvest_source, + run=True + ) + # job has just been created, so no timeout expected + + context = {'model': model, 'session': model.Session, + 'ignore_auth': True, 'user': ''} + + data_dict = { + 'guid': 'guid', + 'content': 'content', + 'job_id': harvest_job.id, + 'source_id': harvest_source.id + } + + job_obj = HarvestJob.get(harvest_job.id) + + job = toolkit.get_action('harvest_jobs_run')( + context, data_dict) + + assert not mock_error_log.called + + status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source.id}) + assert status['last_job']['status'] == 'Running' + assert status['last_job']['stats']['errored'] == 0 + + @patch.dict('ckanext.harvest.logic.action.update.config', + {'ckan.harvest.timeout': None}) + @patch('ckanext.harvest.logic.action.update.log.error') + def test_harvest_jobs_run_does_not_timeout_if_timeout_not_set(self, mock_error_log): + harvest_source = factories.HarvestSourceObj(**SOURCE_DICT.copy()) + harvest_job = factories.HarvestJobObj( + source=harvest_source, + run=True + ) + # date in the past, assumes ckan.harvest.timeout has been set to 5 minutes + harvest_job.created = '2020-05-29 10:00:00.0' + harvest_job.save() + + context = {'model': model, 'session': model.Session, + 'ignore_auth': True, 'user': ''} + + data_dict = { + 'guid': 'guid', + 'content': 'content', + 'job_id': harvest_job.id, + 'source_id': harvest_source.id + } + + job_obj = HarvestJob.get(harvest_job.id) + + job = toolkit.get_action('harvest_jobs_run')( + context, data_dict) + + assert not mock_error_log.called + + status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source.id}) + assert status['last_job']['status'] == 'Running' + assert status['last_job']['stats']['errored'] == 0 + class TestHarvestObject(unittest.TestCase): @classmethod diff --git a/ckanext/harvest/tests/nose/test_queue.py b/ckanext/harvest/tests/nose/test_queue.py index 6d5d549..503ec5b 100644 --- a/ckanext/harvest/tests/nose/test_queue.py +++ b/ckanext/harvest/tests/nose/test_queue.py @@ -264,6 +264,121 @@ class TestHarvestQueue(object): assert_equal(harvest_source_dict['status']['total_datasets'], 2) assert_equal(harvest_source_dict['status']['job_count'], 2) + def test_fetch_doesnt_process_remaining_objects_if_job_status_finished(self): + + # make sure queues/exchanges are created first and are empty + consumer = queue.get_gather_consumer() + consumer_fetch = queue.get_fetch_consumer() + consumer.queue_purge(queue=queue.get_gather_queue_name()) + consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name()) + + user = logic.get_action('get_site_user')( + {'model': model, 'ignore_auth': True}, {} + )['name'] + + context = {'model': model, 'session': model.Session, + 'user': user, 'api_version': 3, 'ignore_auth': True} + + source_dict = { + 'title': 'Test Job Finished', + 'name': 'test-job-finished', + 'url': 'basic_test_1', + 'source_type': 'test-nose', + } + + harvest_source = logic.get_action('harvest_source_create')( + context, + source_dict + ) + + assert harvest_source['source_type'] == 'test-nose', harvest_source + assert harvest_source['url'] == 'basic_test_1', harvest_source + + harvest_job = logic.get_action('harvest_job_create')( + context, + {'source_id': harvest_source['id'], 'run': True} + ) + + job_id = harvest_job['id'] + + assert harvest_job['source_id'] == harvest_source['id'], harvest_job + + assert harvest_job['status'] == u'Running' + + assert logic.get_action('harvest_job_show')( + context, + {'id': job_id} + )['status'] == u'Running' + + # pop on item off the queue and run the callback + reply = consumer.basic_get(queue='ckan.harvest.gather') + + queue.gather_callback(consumer, *reply) + + all_objects = model.Session.query(HarvestObject).filter( + HarvestObject.harvest_job_id == harvest_job['id'] + ).all() + + assert len(all_objects) == 3 + assert all_objects[0].state == 'WAITING' + assert all_objects[1].state == 'WAITING' + assert all_objects[2].state == 'WAITING' + + # artificially set the job to finished to simulate a job abort or timeout + job_obj = HarvestJob.get(harvest_job['id']) + job_obj.status = 'Finished' + job_obj.save() + + original_dataset_count = model.Session.query(model.Package) \ + .filter(model.Package.type == 'dataset') \ + .count() + + # do three times as three harvest objects + reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch') + queue.fetch_callback(consumer_fetch, *reply) + reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch') + queue.fetch_callback(consumer_fetch, *reply) + reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch') + queue.fetch_callback(consumer_fetch, *reply) + + all_objects = model.Session.query(HarvestObject).filter( + HarvestObject.harvest_job_id == harvest_job['id'] + ).all() + + assert len(all_objects) == 3 + assert all_objects[0].state == 'ERROR' + assert all_objects[1].state == 'ERROR' + assert all_objects[2].state == 'ERROR' + + count = model.Session.query(model.Package) \ + .filter(model.Package.type == 'dataset') \ + .count() + assert count == original_dataset_count + + # fire run again to check if job is set to Finished + logic.get_action('harvest_jobs_run')( + context, + {'source_id': harvest_source['id']} + ) + + harvest_job = logic.get_action('harvest_job_show')( + context, + {'id': job_id} + ) + + assert_equal(harvest_job['status'], u'Finished') + assert_equal(harvest_job['stats'], {'added': 0, 'updated': 0, 'not modified': 0, 'errored': 3, 'deleted': 0}) + + harvest_source_dict = logic.get_action('harvest_source_show')( + context, + {'id': harvest_source['id']} + ) + + assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 0, + 'not modified': 0, 'errored': 3, 'deleted': 0}) + assert_equal(harvest_source_dict['status']['total_datasets'], 0) + assert_equal(harvest_source_dict['status']['job_count'], 1) + def test_redis_queue_purging(self): ''' Test that Redis queue purging doesn't purge the wrong keys. diff --git a/test-nose.ini b/test-nose.ini index 1c53e07..08a3737 100644 --- a/test-nose.ini +++ b/test-nose.ini @@ -17,6 +17,7 @@ use = config:../ckan/test-core.ini # run fast. ckan.plugins = harvest ckan_harvester test_nose_harvester test_nose_harvester2 test_nose_action_harvester ckan.harvest.mq.type = redis +ckan.harvest.timeout = 5 ckan.legacy_templates = false # NB: other test configuration should go in test-core.ini, which is # what the postgres tests use. diff --git a/test.ini b/test.ini index ffbd4f5..5c410da 100644 --- a/test.ini +++ b/test.ini @@ -17,6 +17,7 @@ use = config:../ckan/test-core.ini # run fast. ckan.plugins = harvest ckan_harvester test_harvester test_harvester2 test_action_harvester ckan.harvest.mq.type = redis +ckan.harvest.timeout = 5 ckan.legacy_templates = false # NB: other test configuration should go in test-core.ini, which is # what the postgres tests use.