write pytests
This commit is contained in:
parent
6c70200a6e
commit
556136d71d
|
@ -315,6 +315,185 @@ class TestActions():
|
|||
assert dataset_from_db_2, 'is None'
|
||||
assert dataset_from_db_2.id == dataset_2['id']
|
||||
|
||||
def test_harvest_abort_failed_jobs_without_failed_jobs(self):
|
||||
# prepare
|
||||
data_dict = SOURCE_DICT.copy()
|
||||
source = factories.HarvestSourceObj(**data_dict)
|
||||
job = factories.HarvestJobObj(source=source)
|
||||
|
||||
context = {'model': model, 'session': model.Session,
|
||||
'ignore_auth': True, 'user': ''}
|
||||
result = get_action('harvest_abort_failed_jobs')(
|
||||
context, {'life_span': 3})
|
||||
|
||||
job = harvest_model.HarvestJob.get(job.id)
|
||||
|
||||
assert job.status == 'New'
|
||||
assert job.source_id == source.id
|
||||
assert result == 'There is no jobs to abort'
|
||||
|
||||
def test_harvest_abort_failed_jobs_with_failed_jobs(self):
|
||||
# prepare
|
||||
data_dict = SOURCE_DICT.copy()
|
||||
source = factories.HarvestSourceObj(**data_dict)
|
||||
job = factories.HarvestJobObj(source=source)
|
||||
|
||||
# Simulate running job created 4 days ago
|
||||
setattr(job, 'status', 'Running')
|
||||
setattr(job, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=4))
|
||||
|
||||
model.Session.commit()
|
||||
|
||||
context = {'model': model, 'session': model.Session,
|
||||
'ignore_auth': True, 'user': ''}
|
||||
result = get_action('harvest_abort_failed_jobs')(
|
||||
context, {'life_span': 3})
|
||||
|
||||
job = harvest_model.HarvestJob.get(job.id)
|
||||
|
||||
assert job.status in ('Finished', 'Aborted')
|
||||
assert job.source_id == source.id
|
||||
assert 'Aborted jobs: 1' in result
|
||||
|
||||
def test_harvest_abort_failed_jobs_with_include_source(self):
|
||||
# prepare
|
||||
data_dict = SOURCE_DICT.copy()
|
||||
source1 = factories.HarvestSourceObj(**data_dict)
|
||||
job1 = factories.HarvestJobObj(source=source1)
|
||||
|
||||
data_dict['name'] = 'another-source'
|
||||
data_dict['url'] = 'http://another-url'
|
||||
source2 = factories.HarvestSourceObj(**data_dict)
|
||||
job2 = factories.HarvestJobObj(source=source2)
|
||||
|
||||
# Simulate running job created 4 and 5 days ago
|
||||
setattr(job1, 'status', 'Running')
|
||||
setattr(job1, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=4))
|
||||
setattr(job2, 'status', 'Running')
|
||||
setattr(job2, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=5))
|
||||
|
||||
model.Session.commit()
|
||||
|
||||
context = {'model': model, 'session': model.Session,
|
||||
'ignore_auth': True, 'user': ''}
|
||||
|
||||
# include first source with failed job so only job1 will be aborted
|
||||
result = get_action('harvest_abort_failed_jobs')(
|
||||
context, {'life_span': 3, 'include': source1.id})
|
||||
|
||||
job1 = harvest_model.HarvestJob.get(job1.id)
|
||||
job2 = harvest_model.HarvestJob.get(job2.id)
|
||||
|
||||
assert job1.status in ('Finished', 'Aborted')
|
||||
assert job1.source_id == source1.id
|
||||
assert job2.status in 'Running'
|
||||
assert job2.source_id == source2.id
|
||||
assert 'Aborted jobs: 1' in result
|
||||
|
||||
def test_harvest_abort_failed_jobs_with_exclude_source(self):
|
||||
# prepare
|
||||
data_dict = SOURCE_DICT.copy()
|
||||
source1 = factories.HarvestSourceObj(**data_dict)
|
||||
job1 = factories.HarvestJobObj(source=source1)
|
||||
|
||||
data_dict['name'] = 'another-source'
|
||||
data_dict['url'] = 'http://another-url'
|
||||
source2 = factories.HarvestSourceObj(**data_dict)
|
||||
job2 = factories.HarvestJobObj(source=source2)
|
||||
|
||||
# Simulate running job created 4 and 5 days ago
|
||||
setattr(job1, 'status', 'Running')
|
||||
setattr(job1, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=4))
|
||||
setattr(job2, 'status', 'Running')
|
||||
setattr(job2, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=5))
|
||||
|
||||
model.Session.commit()
|
||||
|
||||
context = {'model': model, 'session': model.Session,
|
||||
'ignore_auth': True, 'user': ''}
|
||||
|
||||
# exclude first source with failed job so it's must still be Running
|
||||
result = get_action('harvest_abort_failed_jobs')(
|
||||
context, {'life_span': 3, 'exclude': source1.id})
|
||||
|
||||
job1 = harvest_model.HarvestJob.get(job1.id)
|
||||
job2 = harvest_model.HarvestJob.get(job2.id)
|
||||
|
||||
assert job1.status == 'Running'
|
||||
assert job1.source_id == source1.id
|
||||
assert job2.status in ('Finished', 'Aborted')
|
||||
assert job2.source_id == source2.id
|
||||
assert 'Aborted jobs: 1' in result
|
||||
|
||||
def test_harvest_abort_failed_jobs_with_include_and_exclude(self):
|
||||
# prepare
|
||||
data_dict = SOURCE_DICT.copy()
|
||||
source = factories.HarvestSourceObj(**data_dict)
|
||||
job = factories.HarvestJobObj(source=source)
|
||||
|
||||
# Simulate running job created 4 days ago
|
||||
setattr(job, 'status', 'Running')
|
||||
setattr(job, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=4))
|
||||
|
||||
model.Session.commit()
|
||||
|
||||
context = {'model': model, 'session': model.Session,
|
||||
'ignore_auth': True, 'user': ''}
|
||||
# include must prevaild over exclude
|
||||
result = get_action('harvest_abort_failed_jobs')(
|
||||
context, {'life_span': 3, 'exclude': source.id, 'include': source.id})
|
||||
|
||||
job = harvest_model.HarvestJob.get(job.id)
|
||||
|
||||
assert job.status in ('Finished', 'Aborted')
|
||||
assert job.source_id == source.id
|
||||
assert 'Aborted jobs: 1' in result
|
||||
|
||||
def test_harvest_abort_failed_jobs_with_source_frequency(self):
|
||||
# prepare
|
||||
data_dict = SOURCE_DICT.copy()
|
||||
source = factories.HarvestSourceObj(**data_dict)
|
||||
job = factories.HarvestJobObj(source=source)
|
||||
|
||||
# Simulate running job created 4 days ago
|
||||
setattr(job, 'status', 'Running')
|
||||
setattr(job, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=4))
|
||||
# set source update frequency to biweekly
|
||||
# job will be aborted if it's runs more then 2 weeks
|
||||
setattr(source, 'frequency', 'BIWEEKLY')
|
||||
model.Session.commit()
|
||||
|
||||
context = {'model': model, 'session': model.Session,
|
||||
'ignore_auth': True, 'user': ''}
|
||||
result = get_action('harvest_abort_failed_jobs')(
|
||||
context, {'life_span': 3})
|
||||
|
||||
job = harvest_model.HarvestJob.get(job.id)
|
||||
|
||||
assert job.status == 'Running'
|
||||
assert job.source_id == source.id
|
||||
assert 'Aborted jobs: 0' in result
|
||||
|
||||
def test_harvest_abort_failed_jobs_with_unknown_frequency(self):
|
||||
# prepare
|
||||
data_dict = SOURCE_DICT.copy()
|
||||
source = factories.HarvestSourceObj(**data_dict)
|
||||
job = factories.HarvestJobObj(source=source)
|
||||
|
||||
# Simulate running job created 4 days ago
|
||||
setattr(job, 'status', 'Running')
|
||||
setattr(job, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=4))
|
||||
# set unknown update frequency
|
||||
setattr(source, 'frequency', 'YEARLY')
|
||||
model.Session.commit()
|
||||
|
||||
context = {'model': model, 'session': model.Session,
|
||||
'ignore_auth': True, 'user': ''}
|
||||
|
||||
with pytest.raises(Exception) as e:
|
||||
toolkit.get_action('harvest_abort_failed_jobs')(
|
||||
context, {'life_span': 3})
|
||||
|
||||
def test_harvest_source_create_twice_with_unique_url(self):
|
||||
data_dict = SOURCE_DICT.copy()
|
||||
factories.HarvestSourceObj(**data_dict)
|
||||
|
|
Loading…
Reference in New Issue