Add option keep-actual to clearsource_history command

This commit is contained in:
seitenbau-govdata 2021-12-15 16:02:56 +01:00
parent d84d847b09
commit 6b238f8d74
5 changed files with 276 additions and 28 deletions

View File

@ -108,8 +108,13 @@ def clear(ctx, id):
@source.command()
@click.argument(u"id", metavar=u"SOURCE_ID_OR_NAME", required=False)
@click.option(
"-k",
"--keep-actual",
default=False
)
@click.pass_context
def clear_history(ctx, id):
def clear_history(ctx, id, keep_actual):
"""If no source id is given the history for all harvest sources
(maximum is 1000) will be cleared.
@ -122,7 +127,7 @@ def clear_history(ctx, id):
flask_app = ctx.meta["flask_app"]
with flask_app.test_request_context():
result = utils.clear_harvest_source_history(id)
result = utils.clear_harvest_source_history(id, bool(keep_actual))
click.secho(result, fg="green")

View File

@ -190,6 +190,14 @@ class Harvester(CkanCommand):
will be aborted. You can use comma as a separator to provide multiple source_id's""",
)
self.parser.add_option(
"-k",
"--keep-actual",
dest="keep_actual",
default=False,
help="Do not delete relevant harvest objects",
)
def command(self):
self._load_config()
@ -316,11 +324,12 @@ class Harvester(CkanCommand):
print(result)
def clear_harvest_source_history(self):
keep_actual = bool(self.options.keep_actual)
source_id = None
if len(self.args) >= 2:
source_id = six.text_type(self.args[1])
print(utils.clear_harvest_source_history(source_id))
print(utils.clear_harvest_source_history(source_id, keep_actual))
def show_harvest_source(self):

View File

@ -322,6 +322,8 @@ def harvest_sources_job_history_clear(context, data_dict):
'''
check_access('harvest_sources_clear', context, data_dict)
keep_actual = data_dict.get('keep_actual', False)
job_history_clear_results = []
# We assume that the maximum of 1000 (hard limit) rows should be enough
result = logic.get_action('package_search')(context, {'fq': '+dataset_type:harvest', 'rows': 1000})
@ -329,7 +331,8 @@ def harvest_sources_job_history_clear(context, data_dict):
if harvest_packages:
for data_dict in harvest_packages:
try:
clear_result = get_action('harvest_source_job_history_clear')(context, {'id': data_dict['id']})
clear_result = get_action('harvest_source_job_history_clear')(
context, {'id': data_dict['id'], 'keep_actual': keep_actual})
job_history_clear_results.append(clear_result)
except NotFound:
# Ignoring not existent harvest sources because of a possibly corrupt search index
@ -352,6 +355,7 @@ def harvest_source_job_history_clear(context, data_dict):
check_access('harvest_source_clear', context, data_dict)
harvest_source_id = data_dict.get('id', None)
keep_actual = data_dict.get('keep_actual', False)
source = HarvestSource.get(harvest_source_id)
if not source:
@ -362,17 +366,51 @@ def harvest_source_job_history_clear(context, data_dict):
model = context['model']
sql = '''begin;
delete from harvest_object_error where harvest_object_id
in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object_extra where harvest_object_id
in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object where harvest_source_id = '{harvest_source_id}';
delete from harvest_gather_error where harvest_job_id
in (select id from harvest_job where source_id = '{harvest_source_id}');
delete from harvest_job where source_id = '{harvest_source_id}';
commit;
'''.format(harvest_source_id=harvest_source_id)
if keep_actual:
sql = '''BEGIN;
DELETE FROM harvest_object_error WHERE harvest_object_id
IN (SELECT id FROM harvest_object AS obj WHERE harvest_source_id = '{harvest_source_id}'
AND current != true
AND (NOT EXISTS (SELECT id FROM harvest_job WHERE id = obj.harvest_job_id
AND status = 'Running'))
AND (NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = obj.harvest_job_id
AND current = true))
);
DELETE FROM harvest_object_extra WHERE harvest_object_id
IN (SELECT id FROM harvest_object AS obj WHERE harvest_source_id = '{harvest_source_id}'
AND current != true
AND (NOT EXISTS (SELECT id FROM harvest_job WHERE id = obj.harvest_job_id
AND status = 'Running'))
AND (NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = obj.harvest_job_id
AND current = true))
);
DELETE FROM harvest_object AS obj WHERE harvest_source_id = '{harvest_source_id}'
AND current != true
AND (NOT EXISTS (SELECT id FROM harvest_job WHERE id = obj.harvest_job_id
AND status = 'Running'))
AND (NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = obj.harvest_job_id
AND current = true));
DELETE FROM harvest_gather_error WHERE harvest_job_id
IN (SELECT id FROM harvest_job AS job WHERE source_id = '{harvest_source_id}'
AND job.status != 'Running'
AND NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = job.id));
DELETE FROM harvest_job AS job WHERE source_id = '{harvest_source_id}'
AND job.status != 'Running'
AND NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = job.id);
COMMIT;
'''.format(harvest_source_id=harvest_source_id)
else:
sql = '''BEGIN;
DELETE FROM harvest_object_error WHERE harvest_object_id
IN (SELECT id FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}');
DELETE FROM harvest_object_extra WHERE harvest_object_id
IN (SELECT id FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}');
DELETE FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}';
DELETE FROM harvest_gather_error WHERE harvest_job_id
IN (SELECT id FROM harvest_job WHERE source_id = '{harvest_source_id}');
DELETE FROM harvest_job WHERE source_id = '{harvest_source_id}';
COMMIT;
'''.format(harvest_source_id=harvest_source_id)
model.Session.execute(sql)

View File

@ -258,6 +258,15 @@ class TestActions():
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])
data_dict = SOURCE_DICT.copy()
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
source2 = factories.HarvestSourceObj(**data_dict)
job2 = factories.HarvestJobObj(source=source2)
dataset2 = ckan_factories.Dataset()
object_2_ = factories.HarvestObjectObj(job=job2, source=source2,
package_id=dataset2['id'])
# execute
context = {'session': model.Session,
'ignore_auth': True, 'user': ''}
@ -266,13 +275,19 @@ class TestActions():
# verify
assert result == {'id': source.id}
source = harvest_model.HarvestSource.get(source.id)
assert source
assert harvest_model.HarvestSource.get(source.id)
assert harvest_model.HarvestJob.get(job.id) is None
assert harvest_model.HarvestObject.get(object_.id) is None
dataset_from_db = model.Package.get(dataset['id'])
assert dataset_from_db, 'is None'
assert dataset_from_db
assert dataset_from_db.id == dataset['id']
# source2 and related objects are untouched
assert harvest_model.HarvestSource.get(source2.id)
assert harvest_model.HarvestJob.get(job2.id)
assert harvest_model.HarvestObject.get(object_2_.id)
dataset_from_db_2 = model.Package.get(dataset2['id'])
assert dataset_from_db_2
assert dataset_from_db_2.id == dataset2['id']
def test_harvest_sources_job_history_clear(self):
# prepare
@ -300,21 +315,198 @@ class TestActions():
# verify
assert sorted(result, key=lambda item: item['id']) == sorted(
[{'id': source_1.id}, {'id': source_2.id}], key=lambda item: item['id'])
source_1 = harvest_model.HarvestSource.get(source_1.id)
assert source_1
assert harvest_model.HarvestSource.get(source_1.id)
assert harvest_model.HarvestJob.get(job_1.id) is None
assert harvest_model.HarvestObject.get(object_1_.id) is None
dataset_from_db_1 = model.Package.get(dataset_1['id'])
assert dataset_from_db_1, 'is None'
assert dataset_from_db_1
assert dataset_from_db_1.id == dataset_1['id']
source_2 = harvest_model.HarvestSource.get(source_1.id)
assert source_2
assert harvest_model.HarvestSource.get(source_2.id)
assert harvest_model.HarvestJob.get(job_2.id) is None
assert harvest_model.HarvestObject.get(object_2_.id) is None
dataset_from_db_2 = model.Package.get(dataset_2['id'])
assert dataset_from_db_2, 'is None'
assert dataset_from_db_2
assert dataset_from_db_2.id == dataset_2['id']
def test_harvest_sources_job_history_clear_keep_actual(self):
# prepare
data_dict = SOURCE_DICT.copy()
source_1 = factories.HarvestSourceObj(**data_dict)
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
source_2 = factories.HarvestSourceObj(**data_dict)
job_1 = factories.HarvestJobObj(source=source_1)
dataset_1 = ckan_factories.Dataset()
object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1,
package_id=dataset_1['id'])
job_2 = factories.HarvestJobObj(source=source_2)
# creating harvest_object with empty package_id
object_2_ = factories.HarvestObjectObj(job=job_2, source=source_2,
package_id=None)
setattr(object_1_, 'report_status', 'added')
setattr(object_1_, 'current', True)
model.Session.commit()
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_sources_job_history_clear')(
context, {'keep_actual': True})
# verify
assert sorted(result, key=lambda item: item['id']) == sorted(
[{'id': source_1.id}, {'id': source_2.id}], key=lambda item: item['id'])
# dataset, related source, object and job still persist!
assert harvest_model.HarvestSource.get(source_1.id)
assert harvest_model.HarvestJob.get(job_1.id)
assert harvest_model.HarvestObject.get(object_1_.id)
dataset_from_db_1 = model.Package.get(dataset_1['id'])
assert dataset_from_db_1
assert dataset_from_db_1.id == dataset_1['id']
# second source persist, but job and object was deleted
assert harvest_model.HarvestSource.get(source_2.id)
assert not harvest_model.HarvestJob.get(job_2.id)
assert not harvest_model.HarvestObject.get(object_2_.id)
def test_harvest_source_job_history_clear_keep_actual(self):
# prepare
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])
data_dict = SOURCE_DICT.copy()
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
source2 = factories.HarvestSourceObj(**data_dict)
job2 = factories.HarvestJobObj(source=source2)
dataset2 = ckan_factories.Dataset()
object_2_ = factories.HarvestObjectObj(job=job2, source=source2,
package_id=dataset2['id'])
setattr(object_, 'report_status', 'added')
setattr(object_, 'current', True)
model.Session.commit()
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_source_job_history_clear')(
context, {'id': source.id, 'keep_actual': True})
# verify
assert result == {'id': source.id}
assert harvest_model.HarvestSource.get(source.id)
assert harvest_model.HarvestJob.get(job.id)
assert harvest_model.HarvestObject.get(object_.id)
dataset_from_db = model.Package.get(dataset['id'])
assert dataset_from_db
assert dataset_from_db.id == dataset['id']
# source2 and related objects are untouched
assert harvest_model.HarvestSource.get(source2.id)
assert harvest_model.HarvestJob.get(job2.id)
assert harvest_model.HarvestObject.get(object_2_.id)
dataset_from_db_2 = model.Package.get(dataset2['id'])
assert dataset_from_db_2
assert dataset_from_db_2.id == dataset2['id']
def test_harvest_source_job_history_clear_keep_actual_finished_jobs(self):
# prepare
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
setattr(job, 'status', 'Finished')
setattr(job, 'finished', datetime.datetime.utcnow()-datetime.timedelta(days=2))
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])
job2 = factories.HarvestJobObj(source=source)
setattr(job2, 'finished', datetime.datetime.utcnow()-datetime.timedelta(days=1))
setattr(job2, 'status', 'Finished')
dataset2 = ckan_factories.Dataset()
object_2_ = factories.HarvestObjectObj(job=job2, source=source,
package_id=dataset2['id'])
setattr(object_2_, 'current', True)
model.Session.commit()
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_source_job_history_clear')(
context, {'id': source.id, 'keep_actual': True})
# verify
assert result == {'id': source.id}
assert harvest_model.HarvestSource.get(source.id)
assert not harvest_model.HarvestJob.get(job.id)
assert not harvest_model.HarvestObject.get(object_.id)
dataset_from_db = model.Package.get(dataset['id'])
assert dataset_from_db
assert dataset_from_db.id == dataset['id']
# job2 and related objects are untouched
assert harvest_model.HarvestJob.get(job2.id)
assert harvest_model.HarvestObject.get(object_2_.id)
dataset_from_db_2 = model.Package.get(dataset2['id'])
assert dataset_from_db_2
assert dataset_from_db_2.id == dataset2['id']
def test_harvest_source_job_history_clear_keep_actual_running_job(self):
# Both jobs contain current objects
# prepare
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job1 = factories.HarvestJobObj(source=source)
setattr(job1, 'status', 'Finished')
setattr(job1, 'finished', datetime.datetime.utcnow()-datetime.timedelta(days=1))
dataset1 = ckan_factories.Dataset()
dataset2 = ckan_factories.Dataset()
object_1_ = factories.HarvestObjectObj(job=job1, source=source,
package_id=dataset1['id'])
setattr(object_1_, 'current', False)
object_2_ = factories.HarvestObjectObj(job=job1, source=source,
package_id=dataset2['id'])
setattr(object_2_, 'current', True)
job2 = factories.HarvestJobObj(source=source)
setattr(job2, 'status', 'Running')
object_3_ = factories.HarvestObjectObj(job=job2, source=source,
package_id=dataset1['id'])
setattr(object_3_, 'current', True)
object_4_ = factories.HarvestObjectObj(job=job2, source=source,
package_id=dataset2['id'])
setattr(object_4_, 'current', False)
model.Session.commit()
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_source_job_history_clear')(
context, {'id': source.id, 'keep_actual': True})
# verify that both jobs still exists
assert result == {'id': source.id}
assert harvest_model.HarvestSource.get(source.id)
assert harvest_model.HarvestJob.get(job1.id)
assert harvest_model.HarvestObject.get(object_1_.id)
assert harvest_model.HarvestObject.get(object_2_.id)
dataset_from_db = model.Package.get(dataset1['id'])
assert dataset_from_db
assert dataset_from_db.id == dataset1['id']
assert harvest_model.HarvestJob.get(job2.id)
assert harvest_model.HarvestObject.get(object_3_.id)
assert harvest_model.HarvestObject.get(object_4_.id)
dataset_from_db_2 = model.Package.get(dataset2['id'])
assert dataset_from_db_2
assert dataset_from_db_2.id == dataset2['id']
def test_harvest_abort_failed_jobs_without_failed_jobs(self):
# prepare
data_dict = SOURCE_DICT.copy()

View File

@ -206,7 +206,7 @@ def clear_harvest_source(source_id_or_name):
tk.get_action("harvest_source_clear")(context, {"id": source["id"]})
def clear_harvest_source_history(source_id):
def clear_harvest_source_history(source_id, keep_actual):
context = {
"model": model,
@ -215,15 +215,19 @@ def clear_harvest_source_history(source_id):
}
if source_id is not None:
tk.get_action("harvest_source_job_history_clear")(context, {
"id": source_id
"id": source_id,
"keep_actual": keep_actual
})
return "Cleared job history of harvest source: {0}".format(source_id)
else:
# Purge queues, because we clean all harvest jobs and
# objects in the database.
purge_queues()
if not keep_actual:
purge_queues()
cleared_sources_dicts = tk.get_action(
"harvest_sources_job_history_clear")(context, {})
"harvest_sources_job_history_clear")(context, {
"keep_actual": keep_actual
})
return "Cleared job history for all harvest sources: {0} source(s)".format(
len(cleared_sources_dicts))