diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a0bb373..d4b87cd 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,4 +3,5 @@ Changelog v0.0.6 `2016-??-??` ------------------- -Includes i18n directory in package. \ No newline at end of file +- Includes i18n directory in package. +- Adds a new `clearsource_history` command/operation. diff --git a/README.rst b/README.rst index d61aefe..fe47bec 100644 --- a/README.rst +++ b/README.rst @@ -189,6 +189,14 @@ The following operations can be run from the command line as described underneat - clears all datasets, jobs and objects related to a harvest source, but keeps the source itself + harvester clearsource_history [{source-id}] + - If no source id is given the history for all harvest sources (maximum is 1000) + will be cleared. + Clears all jobs and objects related to a harvest source, but keeps the source + itself. The datasets imported from the harvest source will **NOT** be deleted!!! + If a source id is given, it only clears the history of the harvest source with + the given source id. + harvester sources [all] - lists harvest sources If 'all' is defined, it also shows the Inactive sources diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 738ccae..5c515b9 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -29,6 +29,12 @@ class Harvester(CkanCommand): - clears all datasets, jobs and objects related to a harvest source, but keeps the source itself + harvester clearsource_history [{source-id}] + - If no source id is given the history for all harvest sources (maximum is 1000) will be cleared. + Clears all jobs and objects related to a harvest source, but keeps the source itself. + The datasets imported from the harvest source will NOT be deleted!!! + If a source id is given, it only clears the history of the harvest source with the given source id. + harvester sources [all] - lists harvest sources If 'all' is defined, it also shows the Inactive sources @@ -153,6 +159,8 @@ class Harvester(CkanCommand): self.remove_harvest_source() elif cmd == 'clearsource': self.clear_harvest_source() + elif cmd == 'clearsource_history': + self.clear_harvest_source_history() elif cmd == 'sources': self.list_harvest_sources() elif cmd == 'job': @@ -182,8 +190,7 @@ class Harvester(CkanCommand): for method, header, body in consumer.consume(queue=get_fetch_queue_name()): fetch_callback(consumer, method, header, body) elif cmd == 'purge_queues': - from ckanext.harvest.queue import purge_queues - purge_queues() + self.purge_queues() elif cmd == 'initdb': self.initdb() elif cmd == 'import': @@ -288,6 +295,29 @@ class Harvester(CkanCommand): print str(e.error_dict) raise e + def clear_harvest_source_history(self): + source_id = None + if len(self.args) >= 2: + source_id = unicode(self.args[1]) + + context = { + 'model': model, + 'user': self.admin_user['name'], + 'session': model.Session + } + if source_id is not None: + get_action('harvest_source_job_history_clear')(context,{'id':source_id}) + print 'Cleared job history of harvest source: %s' % source_id + else: + ''' + Purge queues, because we clean all harvest jobs and + objects in the database. + ''' + self.purge_queues() + cleared_sources_dicts = get_action('harvest_sources_job_history_clear')(context,{}) + print 'Cleared job history for all harvest sources: %s source(s)' % len(cleared_sources_dicts) + + def show_harvest_source(self): if len(self.args) >= 2: @@ -465,6 +495,9 @@ class Harvester(CkanCommand): context = {'model': model, 'user': self.admin_user['name']} get_action('harvest_sources_reindex')(context,{}) + def purge_queues(self): + from ckanext.harvest.queue import purge_queues + purge_queues() def print_harvest_sources(self, sources): if sources: diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index e4f2041..7728fec 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -225,6 +225,73 @@ def harvest_source_clear(context, data_dict): return {'id': harvest_source_id} +def harvest_sources_job_history_clear(context, data_dict): + ''' + Clears the history for all active harvest sources. All jobs and objects related to a harvest source will + be cleared, but keeps the source itself. + This is useful to clean history of long running harvest sources to start again fresh. + The datasets imported from the harvest source will NOT be deleted!!! + + ''' + check_access('harvest_sources_clear', context, data_dict) + + job_history_clear_results = [] + # We assume that the maximum of 1000 (hard limit) rows should be enough + result = logic.get_action('package_search')(context, {'fq': '+dataset_type:harvest', 'rows': 1000}) + harvest_packages = result['results'] + if harvest_packages: + for data_dict in harvest_packages: + try: + clear_result = get_action('harvest_source_job_history_clear')(context, {'id': data_dict['id']}) + job_history_clear_results.append(clear_result) + except NotFound: + # Ignoring not existent harvest sources because of a possibly corrupt search index + # Logging was already done in called function + pass + + return job_history_clear_results + + +def harvest_source_job_history_clear(context, data_dict): + ''' + Clears all jobs and objects related to a harvest source, but keeps the source itself. + This is useful to clean history of long running harvest sources to start again fresh. + The datasets imported from the harvest source will NOT be deleted!!! + + :param id: the id of the harvest source to clear + :type id: string + + ''' + check_access('harvest_source_clear', context, data_dict) + + harvest_source_id = data_dict.get('id', None) + + source = HarvestSource.get(harvest_source_id) + if not source: + log.error('Harvest source %s does not exist', harvest_source_id) + raise NotFound('Harvest source %s does not exist' % harvest_source_id) + + harvest_source_id = source.id + + model = context['model'] + + sql = '''begin; + delete from harvest_object_error where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}'); + delete from harvest_object_extra where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}'); + delete from harvest_object where harvest_source_id = '{harvest_source_id}'; + delete from harvest_gather_error where harvest_job_id in (select id from harvest_job where source_id = '{harvest_source_id}'); + delete from harvest_job where source_id = '{harvest_source_id}'; + commit; + '''.format(harvest_source_id=harvest_source_id) + + model.Session.execute(sql) + + # Refresh the index for this source to update the status object + get_action('harvest_source_reindex')(context, {'id': harvest_source_id}) + + return {'id': harvest_source_id} + + def harvest_source_index_clear(context, data_dict): ''' Clears all datasets, jobs and objects related to a harvest source, but diff --git a/ckanext/harvest/logic/auth/update.py b/ckanext/harvest/logic/auth/update.py index 2bd70b9..3a4a75e 100644 --- a/ckanext/harvest/logic/auth/update.py +++ b/ckanext/harvest/logic/auth/update.py @@ -27,6 +27,17 @@ def harvest_source_update(context, data_dict): return {'success': False, 'msg': pt._('User {0} not authorized to update harvest source {1}').format(user, source_id)} +def harvest_sources_clear(context, data_dict): + ''' + Authorization check for clearing history for all harvest sources + + Only sysadmins can do it + ''' + if not user_is_sysadmin(context): + return {'success': False, 'msg': pt._('Only sysadmins can clear history for all harvest jobs')} + else: + return {'success': True} + def harvest_source_clear(context, data_dict): ''' Authorization check for clearing a harvest source diff --git a/ckanext/harvest/tests/test_action.py b/ckanext/harvest/tests/test_action.py index 79e91ca..9cc407d 100644 --- a/ckanext/harvest/tests/test_action.py +++ b/ckanext/harvest/tests/test_action.py @@ -354,7 +354,7 @@ class TestHarvestSourceActionPatch(HarvestSourceFixtureMixin, class TestActions(ActionBase): def test_harvest_source_clear(self): - source = factories.HarvestSourceObj(**SOURCE_DICT) + source = factories.HarvestSourceObj(**SOURCE_DICT.copy()) job = factories.HarvestJobObj(source=source) dataset = ckan_factories.Dataset() object_ = factories.HarvestObjectObj(job=job, source=source, @@ -372,50 +372,106 @@ class TestActions(ActionBase): assert_equal(harvest_model.HarvestObject.get(object_.id), None) assert_equal(model.Package.get(dataset['id']), None) + def test_harvest_source_job_history_clear(self): + # prepare + source = factories.HarvestSourceObj(**SOURCE_DICT.copy()) + job = factories.HarvestJobObj(source=source) + dataset = ckan_factories.Dataset() + object_ = factories.HarvestObjectObj(job=job, source=source, + package_id=dataset['id']) + + # execute + context = {'model': model, 'session': model.Session, + 'ignore_auth': True, 'user': ''} + result = toolkit.get_action('harvest_source_job_history_clear')( + context, {'id': source.id}) + + # verify + assert_equal(result, {'id': source.id}) + source = harvest_model.HarvestSource.get(source.id) + assert source + assert_equal(harvest_model.HarvestJob.get(job.id), None) + assert_equal(harvest_model.HarvestObject.get(object_.id), None) + dataset_from_db = model.Package.get(dataset['id']) + assert dataset_from_db, 'is None' + assert_equal(dataset_from_db.id, dataset['id']) + + def test_harvest_sources_job_history_clear(self): + # prepare + data_dict = SOURCE_DICT.copy() + source_1 = factories.HarvestSourceObj(**data_dict) + data_dict['name'] = 'another-source' + data_dict['url'] = 'http://another-url' + source_2 = factories.HarvestSourceObj(**data_dict) + + job_1 = factories.HarvestJobObj(source=source_1) + dataset_1 = ckan_factories.Dataset() + object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1, + package_id=dataset_1['id']) + job_2 = factories.HarvestJobObj(source=source_2) + dataset_2 = ckan_factories.Dataset() + object_2_ = factories.HarvestObjectObj(job=job_2, source=source_2, + package_id=dataset_2['id']) + + # execute + context = {'model': model, 'session': model.Session, + 'ignore_auth': True, 'user': ''} + result = toolkit.get_action('harvest_sources_job_history_clear')( + context, {}) + + # verify + assert_equal( + sorted(result), + sorted([{'id': source_1.id}, {'id': source_2.id}])) + source_1 = harvest_model.HarvestSource.get(source_1.id) + assert source_1 + assert_equal(harvest_model.HarvestJob.get(job_1.id), None) + assert_equal(harvest_model.HarvestObject.get(object_1_.id), None) + dataset_from_db_1 = model.Package.get(dataset_1['id']) + assert dataset_from_db_1, 'is None' + assert_equal(dataset_from_db_1.id, dataset_1['id']) + source_2 = harvest_model.HarvestSource.get(source_1.id) + assert source_2 + assert_equal(harvest_model.HarvestJob.get(job_2.id), None) + assert_equal(harvest_model.HarvestObject.get(object_2_.id), None) + dataset_from_db_2 = model.Package.get(dataset_2['id']) + assert dataset_from_db_2, 'is None' + assert_equal(dataset_from_db_2.id, dataset_2['id']) + def test_harvest_source_create_twice_with_unique_url(self): - # don't use factory because it looks for the existing source - data_dict = SOURCE_DICT + data_dict = SOURCE_DICT.copy() + factories.HarvestSourceObj(**data_dict) site_user = toolkit.get_action('get_site_user')( {'model': model, 'ignore_auth': True}, {})['name'] - - toolkit.get_action('harvest_source_create')( - {'user': site_user}, data_dict) - - data_dict['name'] = 'another-source1' + data_dict['name'] = 'another-source' data_dict['url'] = 'http://another-url' toolkit.get_action('harvest_source_create')( {'user': site_user}, data_dict) def test_harvest_source_create_twice_with_same_url(self): - # don't use factory because it looks for the existing source - data_dict = SOURCE_DICT + data_dict = SOURCE_DICT.copy() + factories.HarvestSourceObj(**data_dict) + site_user = toolkit.get_action('get_site_user')( {'model': model, 'ignore_auth': True}, {})['name'] - - toolkit.get_action('harvest_source_create')( - {'user': site_user}, data_dict) - - data_dict['name'] = 'another-source2' + data_dict['name'] = 'another-source' assert_raises(toolkit.ValidationError, toolkit.get_action('harvest_source_create'), {'user': site_user}, data_dict) def test_harvest_source_create_twice_with_unique_url_and_config(self): - # don't use factory because it looks for the existing source - data_dict = SOURCE_DICT + data_dict = SOURCE_DICT.copy() + factories.HarvestSourceObj(**data_dict) + site_user = toolkit.get_action('get_site_user')( {'model': model, 'ignore_auth': True}, {})['name'] - - toolkit.get_action('harvest_source_create')( - {'user': site_user}, data_dict) - - data_dict['name'] = 'another-source3' + data_dict['name'] = 'another-source' data_dict['config'] = '{"something": "new"}' toolkit.get_action('harvest_source_create')( {'user': site_user}, data_dict) def test_harvest_job_create_as_sysadmin(self): - source = factories.HarvestSource(**SOURCE_DICT) + source = factories.HarvestSource(**SOURCE_DICT.copy()) site_user = toolkit.get_action('get_site_user')( {'model': model, 'ignore_auth': True}, {})['name'] @@ -538,4 +594,4 @@ class TestHarvestDBLog(unittest.TestCase): per_page = 1 data = toolkit.get_action('harvest_log_list')(context, {'level': 'info', 'per_page': per_page}) self.assertEqual(len(data), per_page) - self.assertEqual(data[0]['level'], 'INFO') \ No newline at end of file + self.assertEqual(data[0]['level'], 'INFO')