Merge pull request #268 from GovDataOfficial/clearsource-history-command

Add clearsource history command
This commit is contained in:
Adrià Mercader 2016-11-23 14:35:45 +02:00 committed by GitHub
commit 3836fcfd65
6 changed files with 203 additions and 27 deletions

View File

@ -3,4 +3,5 @@ Changelog
v0.0.6 `2016-??-??`
-------------------
Includes i18n directory in package.
- Includes i18n directory in package.
- Adds a new `clearsource_history` command/operation.

View File

@ -189,6 +189,14 @@ The following operations can be run from the command line as described underneat
- clears all datasets, jobs and objects related to a harvest source,
but keeps the source itself
harvester clearsource_history [{source-id}]
- If no source id is given the history for all harvest sources (maximum is 1000)
will be cleared.
Clears all jobs and objects related to a harvest source, but keeps the source
itself. The datasets imported from the harvest source will **NOT** be deleted!!!
If a source id is given, it only clears the history of the harvest source with
the given source id.
harvester sources [all]
- lists harvest sources
If 'all' is defined, it also shows the Inactive sources

View File

@ -29,6 +29,12 @@ class Harvester(CkanCommand):
- clears all datasets, jobs and objects related to a harvest source,
but keeps the source itself
harvester clearsource_history [{source-id}]
- If no source id is given the history for all harvest sources (maximum is 1000) will be cleared.
Clears all jobs and objects related to a harvest source, but keeps the source itself.
The datasets imported from the harvest source will NOT be deleted!!!
If a source id is given, it only clears the history of the harvest source with the given source id.
harvester sources [all]
- lists harvest sources
If 'all' is defined, it also shows the Inactive sources
@ -153,6 +159,8 @@ class Harvester(CkanCommand):
self.remove_harvest_source()
elif cmd == 'clearsource':
self.clear_harvest_source()
elif cmd == 'clearsource_history':
self.clear_harvest_source_history()
elif cmd == 'sources':
self.list_harvest_sources()
elif cmd == 'job':
@ -182,8 +190,7 @@ class Harvester(CkanCommand):
for method, header, body in consumer.consume(queue=get_fetch_queue_name()):
fetch_callback(consumer, method, header, body)
elif cmd == 'purge_queues':
from ckanext.harvest.queue import purge_queues
purge_queues()
self.purge_queues()
elif cmd == 'initdb':
self.initdb()
elif cmd == 'import':
@ -288,6 +295,29 @@ class Harvester(CkanCommand):
print str(e.error_dict)
raise e
def clear_harvest_source_history(self):
source_id = None
if len(self.args) >= 2:
source_id = unicode(self.args[1])
context = {
'model': model,
'user': self.admin_user['name'],
'session': model.Session
}
if source_id is not None:
get_action('harvest_source_job_history_clear')(context,{'id':source_id})
print 'Cleared job history of harvest source: %s' % source_id
else:
'''
Purge queues, because we clean all harvest jobs and
objects in the database.
'''
self.purge_queues()
cleared_sources_dicts = get_action('harvest_sources_job_history_clear')(context,{})
print 'Cleared job history for all harvest sources: %s source(s)' % len(cleared_sources_dicts)
def show_harvest_source(self):
if len(self.args) >= 2:
@ -465,6 +495,9 @@ class Harvester(CkanCommand):
context = {'model': model, 'user': self.admin_user['name']}
get_action('harvest_sources_reindex')(context,{})
def purge_queues(self):
from ckanext.harvest.queue import purge_queues
purge_queues()
def print_harvest_sources(self, sources):
if sources:

View File

@ -225,6 +225,73 @@ def harvest_source_clear(context, data_dict):
return {'id': harvest_source_id}
def harvest_sources_job_history_clear(context, data_dict):
'''
Clears the history for all active harvest sources. All jobs and objects related to a harvest source will
be cleared, but keeps the source itself.
This is useful to clean history of long running harvest sources to start again fresh.
The datasets imported from the harvest source will NOT be deleted!!!
'''
check_access('harvest_sources_clear', context, data_dict)
job_history_clear_results = []
# We assume that the maximum of 1000 (hard limit) rows should be enough
result = logic.get_action('package_search')(context, {'fq': '+dataset_type:harvest', 'rows': 1000})
harvest_packages = result['results']
if harvest_packages:
for data_dict in harvest_packages:
try:
clear_result = get_action('harvest_source_job_history_clear')(context, {'id': data_dict['id']})
job_history_clear_results.append(clear_result)
except NotFound:
# Ignoring not existent harvest sources because of a possibly corrupt search index
# Logging was already done in called function
pass
return job_history_clear_results
def harvest_source_job_history_clear(context, data_dict):
'''
Clears all jobs and objects related to a harvest source, but keeps the source itself.
This is useful to clean history of long running harvest sources to start again fresh.
The datasets imported from the harvest source will NOT be deleted!!!
:param id: the id of the harvest source to clear
:type id: string
'''
check_access('harvest_source_clear', context, data_dict)
harvest_source_id = data_dict.get('id', None)
source = HarvestSource.get(harvest_source_id)
if not source:
log.error('Harvest source %s does not exist', harvest_source_id)
raise NotFound('Harvest source %s does not exist' % harvest_source_id)
harvest_source_id = source.id
model = context['model']
sql = '''begin;
delete from harvest_object_error where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object_extra where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object where harvest_source_id = '{harvest_source_id}';
delete from harvest_gather_error where harvest_job_id in (select id from harvest_job where source_id = '{harvest_source_id}');
delete from harvest_job where source_id = '{harvest_source_id}';
commit;
'''.format(harvest_source_id=harvest_source_id)
model.Session.execute(sql)
# Refresh the index for this source to update the status object
get_action('harvest_source_reindex')(context, {'id': harvest_source_id})
return {'id': harvest_source_id}
def harvest_source_index_clear(context, data_dict):
'''
Clears all datasets, jobs and objects related to a harvest source, but

View File

@ -27,6 +27,17 @@ def harvest_source_update(context, data_dict):
return {'success': False,
'msg': pt._('User {0} not authorized to update harvest source {1}').format(user, source_id)}
def harvest_sources_clear(context, data_dict):
'''
Authorization check for clearing history for all harvest sources
Only sysadmins can do it
'''
if not user_is_sysadmin(context):
return {'success': False, 'msg': pt._('Only sysadmins can clear history for all harvest jobs')}
else:
return {'success': True}
def harvest_source_clear(context, data_dict):
'''
Authorization check for clearing a harvest source

View File

@ -354,7 +354,7 @@ class TestHarvestSourceActionPatch(HarvestSourceFixtureMixin,
class TestActions(ActionBase):
def test_harvest_source_clear(self):
source = factories.HarvestSourceObj(**SOURCE_DICT)
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
@ -372,50 +372,106 @@ class TestActions(ActionBase):
assert_equal(harvest_model.HarvestObject.get(object_.id), None)
assert_equal(model.Package.get(dataset['id']), None)
def test_harvest_source_job_history_clear(self):
# prepare
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_source_job_history_clear')(
context, {'id': source.id})
# verify
assert_equal(result, {'id': source.id})
source = harvest_model.HarvestSource.get(source.id)
assert source
assert_equal(harvest_model.HarvestJob.get(job.id), None)
assert_equal(harvest_model.HarvestObject.get(object_.id), None)
dataset_from_db = model.Package.get(dataset['id'])
assert dataset_from_db, 'is None'
assert_equal(dataset_from_db.id, dataset['id'])
def test_harvest_sources_job_history_clear(self):
# prepare
data_dict = SOURCE_DICT.copy()
source_1 = factories.HarvestSourceObj(**data_dict)
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
source_2 = factories.HarvestSourceObj(**data_dict)
job_1 = factories.HarvestJobObj(source=source_1)
dataset_1 = ckan_factories.Dataset()
object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1,
package_id=dataset_1['id'])
job_2 = factories.HarvestJobObj(source=source_2)
dataset_2 = ckan_factories.Dataset()
object_2_ = factories.HarvestObjectObj(job=job_2, source=source_2,
package_id=dataset_2['id'])
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_sources_job_history_clear')(
context, {})
# verify
assert_equal(
sorted(result),
sorted([{'id': source_1.id}, {'id': source_2.id}]))
source_1 = harvest_model.HarvestSource.get(source_1.id)
assert source_1
assert_equal(harvest_model.HarvestJob.get(job_1.id), None)
assert_equal(harvest_model.HarvestObject.get(object_1_.id), None)
dataset_from_db_1 = model.Package.get(dataset_1['id'])
assert dataset_from_db_1, 'is None'
assert_equal(dataset_from_db_1.id, dataset_1['id'])
source_2 = harvest_model.HarvestSource.get(source_1.id)
assert source_2
assert_equal(harvest_model.HarvestJob.get(job_2.id), None)
assert_equal(harvest_model.HarvestObject.get(object_2_.id), None)
dataset_from_db_2 = model.Package.get(dataset_2['id'])
assert dataset_from_db_2, 'is None'
assert_equal(dataset_from_db_2.id, dataset_2['id'])
def test_harvest_source_create_twice_with_unique_url(self):
# don't use factory because it looks for the existing source
data_dict = SOURCE_DICT
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
data_dict['name'] = 'another-source1'
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
def test_harvest_source_create_twice_with_same_url(self):
# don't use factory because it looks for the existing source
data_dict = SOURCE_DICT
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
data_dict['name'] = 'another-source2'
data_dict['name'] = 'another-source'
assert_raises(toolkit.ValidationError,
toolkit.get_action('harvest_source_create'),
{'user': site_user}, data_dict)
def test_harvest_source_create_twice_with_unique_url_and_config(self):
# don't use factory because it looks for the existing source
data_dict = SOURCE_DICT
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
data_dict['name'] = 'another-source3'
data_dict['name'] = 'another-source'
data_dict['config'] = '{"something": "new"}'
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
def test_harvest_job_create_as_sysadmin(self):
source = factories.HarvestSource(**SOURCE_DICT)
source = factories.HarvestSource(**SOURCE_DICT.copy())
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
@ -538,4 +594,4 @@ class TestHarvestDBLog(unittest.TestCase):
per_page = 1
data = toolkit.get_action('harvest_log_list')(context, {'level': 'info', 'per_page': per_page})
self.assertEqual(len(data), per_page)
self.assertEqual(data[0]['level'], 'INFO')
self.assertEqual(data[0]['level'], 'INFO')