write nose tests

This commit is contained in:
calexandr 2020-03-26 15:27:51 +02:00
parent 02f9c60963
commit 6c70200a6e
1 changed files with 180 additions and 0 deletions

View File

@ -1,6 +1,7 @@
import json
import factories
import unittest
import datetime
from mock import patch
from nose.tools import assert_equal, assert_raises, assert_in
from nose.plugins.skip import SkipTest
@ -426,6 +427,185 @@ class TestActions(ActionBase):
assert dataset_from_db_2, 'is None'
assert_equal(dataset_from_db_2.id, dataset_2['id'])
def test_harvest_abort_failed_jobs_without_failed_jobs(self):
# prepare
data_dict = SOURCE_DICT.copy()
source = factories.HarvestSourceObj(**data_dict)
job = factories.HarvestJobObj(source=source)
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_abort_failed_jobs')(
context, {'life_span': 3})
job = harvest_model.HarvestJob.get(job.id)
assert_equal(job.status, 'New')
assert_equal(job.source_id, source.id)
assert_equal(result, 'There is no jobs to abort')
def test_harvest_abort_failed_jobs_with_failed_jobs(self):
# prepare
data_dict = SOURCE_DICT.copy()
source = factories.HarvestSourceObj(**data_dict)
job = factories.HarvestJobObj(source=source)
# Simulate running job created 4 days ago
setattr(job, 'status', 'Running')
setattr(job, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=4))
model.Session.commit()
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_abort_failed_jobs')(
context, {'life_span': 3})
job = harvest_model.HarvestJob.get(job.id)
assert_in(job.status, ('Finished', 'Aborted'))
assert_equal(job.source_id, source.id)
assert_in('Aborted jobs: 1', result)
def test_harvest_abort_failed_jobs_with_include_source(self):
# prepare
data_dict = SOURCE_DICT.copy()
source1 = factories.HarvestSourceObj(**data_dict)
job1 = factories.HarvestJobObj(source=source1)
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
source2 = factories.HarvestSourceObj(**data_dict)
job2 = factories.HarvestJobObj(source=source2)
# Simulate running job created 4 and 5 days ago
setattr(job1, 'status', 'Running')
setattr(job1, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=4))
setattr(job2, 'status', 'Running')
setattr(job2, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=5))
model.Session.commit()
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
# include first source with failed job so only job1 will be aborted
result = toolkit.get_action('harvest_abort_failed_jobs')(
context, {'life_span': 3, 'include': source1.id})
job1 = harvest_model.HarvestJob.get(job1.id)
job2 = harvest_model.HarvestJob.get(job2.id)
assert_in(job1.status, ('Finished', 'Aborted'))
assert_equal(job1.source_id, source1.id)
assert_equal(job2.status, 'Running')
assert_equal(job2.source_id, source2.id)
assert_in('Aborted jobs: 1', result)
def test_harvest_abort_failed_jobs_with_exclude_source(self):
# prepare
data_dict = SOURCE_DICT.copy()
source1 = factories.HarvestSourceObj(**data_dict)
job1 = factories.HarvestJobObj(source=source1)
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
source2 = factories.HarvestSourceObj(**data_dict)
job2 = factories.HarvestJobObj(source=source2)
# Simulate running job created 4 and 5 days ago
setattr(job1, 'status', 'Running')
setattr(job1, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=4))
setattr(job2, 'status', 'Running')
setattr(job2, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=5))
model.Session.commit()
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
# exclude first source with failed job so it's must still be Running
result = toolkit.get_action('harvest_abort_failed_jobs')(
context, {'life_span': 3, 'exclude': source1.id})
job1 = harvest_model.HarvestJob.get(job1.id)
job2 = harvest_model.HarvestJob.get(job2.id)
assert_equal(job1.status, 'Running')
assert_equal(job1.source_id, source1.id)
assert_in(job2.status, ('Finished', 'Aborted'))
assert_equal(job2.source_id, source2.id)
assert_in('Aborted jobs: 1', result)
def test_harvest_abort_failed_jobs_with_include_and_exclude(self):
# prepare
data_dict = SOURCE_DICT.copy()
source = factories.HarvestSourceObj(**data_dict)
job = factories.HarvestJobObj(source=source)
# Simulate running job created 4 days ago
setattr(job, 'status', 'Running')
setattr(job, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=4))
model.Session.commit()
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
# include must prevaild over exclude
result = toolkit.get_action('harvest_abort_failed_jobs')(
context, {'life_span': 3, 'exclude': source.id, 'include': source.id})
job = harvest_model.HarvestJob.get(job.id)
assert_in(job.status, ('Finished', 'Aborted'))
assert_equal(job.source_id, source.id)
assert_in('Aborted jobs: 1', result)
def test_harvest_abort_failed_jobs_with_source_frequency(self):
# prepare
data_dict = SOURCE_DICT.copy()
source = factories.HarvestSourceObj(**data_dict)
job = factories.HarvestJobObj(source=source)
# Simulate running job created 4 days ago
setattr(job, 'status', 'Running')
setattr(job, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=4))
# set source update frequency to biweekly
# job will be aborted if it's runs more then 2 weeks
setattr(source, 'frequency', 'BIWEEKLY')
model.Session.commit()
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_abort_failed_jobs')(
context, {'life_span': 3})
job = harvest_model.HarvestJob.get(job.id)
assert_equal(job.status, 'Running')
assert_equal(job.source_id, source.id)
assert_in('Aborted jobs: 0', result)
def test_harvest_abort_failed_jobs_with_unknown_frequency(self):
# prepare
data_dict = SOURCE_DICT.copy()
source = factories.HarvestSourceObj(**data_dict)
job = factories.HarvestJobObj(source=source)
# Simulate running job created 4 days ago
setattr(job, 'status', 'Running')
setattr(job, 'created', datetime.datetime.utcnow()-datetime.timedelta(days=4))
# set unknown update frequency
setattr(source, 'frequency', 'YEARLY')
model.Session.commit()
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
assert_raises(Exception,
toolkit.get_action('harvest_abort_failed_jobs'),
context, {'life_span': 3})
def test_harvest_source_create_twice_with_unique_url(self):
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)