Re-add deleted old nose tests

This commit is contained in:
amercader 2020-02-14 13:54:27 +01:00
parent caa2c71526
commit d89815d664
4 changed files with 1429 additions and 0 deletions

View File

@ -0,0 +1,94 @@
import factory
import ckanext.harvest.model as harvest_model
from ckantoolkit.tests.factories import _get_action_user_name
from ckan.plugins import toolkit
class HarvestSource(factory.Factory):
FACTORY_FOR = harvest_model.HarvestSource
_return_type = 'dict'
name = factory.Sequence(lambda n: 'test_source_{n}'.format(n=n))
title = factory.Sequence(lambda n: 'test title {n}'.format(n=n))
url = factory.Sequence(lambda n: 'http://{n}.test.com'.format(n=n))
source_type = 'test' # defined in test_queue.py
id = '{0}_id'.format(name).lower()
@classmethod
def _create(cls, target_class, *args, **kwargs):
if args:
assert False, "Positional args aren't supported, use keyword args."
context = {'user': _get_action_user_name(kwargs)}
# If there is an existing source for this URL, and we can't create
# another source with that URL, just return the original one.
try:
source_dict = toolkit.get_action('harvest_source_show')(
context, dict(url=kwargs['url']))
except toolkit.ObjectNotFound:
source_dict = toolkit.get_action('harvest_source_create')(
context, kwargs)
if cls._return_type == 'dict':
return source_dict
else:
return cls.FACTORY_FOR.get(source_dict['id'])
class HarvestSourceObj(HarvestSource):
_return_type = 'obj'
class HarvestJob(factory.Factory):
FACTORY_FOR = harvest_model.HarvestJob
_return_type = 'dict'
source = factory.SubFactory(HarvestSourceObj)
@classmethod
def _create(cls, target_class, *args, **kwargs):
if args:
assert False, "Positional args aren't supported, use keyword args."
context = {'user': _get_action_user_name(kwargs)}
if 'source_id' not in kwargs:
kwargs['source_id'] = kwargs['source'].id
if 'run' not in kwargs:
kwargs['run'] = False
job_dict = toolkit.get_action('harvest_job_create')(
context, kwargs)
if cls._return_type == 'dict':
return job_dict
else:
return cls.FACTORY_FOR.get(job_dict['id'])
class HarvestJobObj(HarvestJob):
_return_type = 'obj'
class HarvestObject(factory.Factory):
FACTORY_FOR = harvest_model.HarvestObject
_return_type = 'dict'
# source = factory.SubFactory(HarvestSourceObj)
job = factory.SubFactory(HarvestJobObj)
@classmethod
def _create(cls, target_class, *args, **kwargs):
if args:
assert False, "Positional args aren't supported, use keyword args."
context = {'user': _get_action_user_name(kwargs)}
if 'job_id' not in kwargs:
kwargs['job_id'] = kwargs['job'].id
kwargs['source_id'] = kwargs['job'].source.id
# Remove 'job' to avoid it getting added as a HarvestObjectExtra
if 'job' in kwargs:
kwargs.pop('job')
job_dict = toolkit.get_action('harvest_object_create')(
context, kwargs)
if cls._return_type == 'dict':
return job_dict
else:
return cls.FACTORY_FOR.get(job_dict['id'])
class HarvestObjectObj(HarvestObject):
_return_type = 'obj'

View File

@ -0,0 +1,807 @@
import json
import factories
import unittest
from mock import patch
from nose.tools import assert_equal, assert_raises, assert_in
from nose.plugins.skip import SkipTest
from ckantoolkit.tests import factories as ckan_factories
from ckantoolkit.tests.helpers import _get_test_app, reset_db, FunctionalTestBase
from ckan import plugins as p
from ckan.plugins import toolkit
from ckan import model
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.model as harvest_model
from ckanext.harvest.model import HarvestGatherError, HarvestObjectError, HarvestObject, HarvestJob
from ckanext.harvest.logic import HarvestJobExists
from ckanext.harvest.logic.action.update import send_error_mail
def call_action_api(action, apikey=None, status=200, **kwargs):
'''POST an HTTP request to the CKAN API and return the result.
Any additional keyword arguments that you pass to this function as **kwargs
are posted as params to the API.
Usage:
package_dict = call_action_api('package_create', apikey=apikey,
name='my_package')
assert package_dict['name'] == 'my_package'
num_followers = post(app, 'user_follower_count', id='annafan')
If you are expecting an error from the API and want to check the contents
of the error dict, you have to use the status param otherwise an exception
will be raised:
error_dict = call_action_api('group_activity_list', status=403,
id='invalid_id')
assert error_dict['message'] == 'Access Denied'
:param action: the action to post to, e.g. 'package_create'
:type action: string
:param apikey: the API key to put in the Authorization header of the post
(optional, default: None)
:type apikey: string
:param status: the HTTP status code expected in the response from the CKAN
API, e.g. 403, if a different status code is received an exception will
be raised (optional, default: 200)
:type status: int
:param **kwargs: any other keyword arguments passed to this function will
be posted to the API as params
:raises paste.fixture.AppError: if the HTTP status code of the response
from the CKAN API is different from the status param passed to this
function
:returns: the 'result' or 'error' dictionary from the CKAN API response
:rtype: dictionary
'''
params = json.dumps(kwargs)
app = _get_test_app()
response = app.post('/api/action/{0}'.format(action), params=params,
extra_environ={'Authorization': str(apikey)},
status=status)
if status in (200,):
assert response.json['success'] is True
return response.json['result']
else:
assert response.json['success'] is False
return response.json['error']
class MockHarvesterForActionTests(p.SingletonPlugin):
p.implements(IHarvester)
def info(self):
return {'name': 'test-for-action',
'title': 'Test for action',
'description': 'test'}
def validate_config(self, config):
if not config:
return config
try:
config_obj = json.loads(config)
if 'custom_option' in config_obj:
if not isinstance(config_obj['custom_option'], list):
raise ValueError('custom_option must be a list')
except ValueError, e:
raise e
return config
def gather_stage(self, harvest_job):
return []
def fetch_stage(self, harvest_object):
return True
def import_stage(self, harvest_object):
return True
SOURCE_DICT = {
"url": "http://test.action.com",
"name": "test-source-action",
"title": "Test source action",
"notes": "Test source action desc",
"source_type": "test-for-action",
"frequency": "MANUAL",
"config": json.dumps({"custom_option": ["a", "b"]})
}
class ActionBase(object):
@classmethod
def setup_class(cls):
if not p.plugin_loaded('test_action_harvester'):
p.load('test_action_harvester')
def setup(self):
reset_db()
harvest_model.setup()
@classmethod
def teardown_class(cls):
p.unload('test_action_harvester')
class HarvestSourceActionBase(FunctionalTestBase):
@classmethod
def setup_class(cls):
super(HarvestSourceActionBase, cls).setup_class()
harvest_model.setup()
if not p.plugin_loaded('test_action_harvester'):
p.load('test_action_harvester')
@classmethod
def teardown_class(cls):
super(HarvestSourceActionBase, cls).teardown_class()
p.unload('test_action_harvester')
def _get_source_dict(self):
return {
"url": "http://test.action.com",
"name": "test-source-action",
"title": "Test source action",
"notes": "Test source action desc",
"source_type": "test-for-action",
"frequency": "MANUAL",
"config": json.dumps({"custom_option": ["a", "b"]})
}
def test_invalid_missing_values(self):
source_dict = {}
test_data = self._get_source_dict()
if 'id' in test_data:
source_dict['id'] = test_data['id']
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
for key in ('name', 'title', 'url', 'source_type'):
assert_equal(result[key], [u'Missing value'])
def test_invalid_unknown_type(self):
source_dict = self._get_source_dict()
source_dict['source_type'] = 'unknown'
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'source_type' in result
assert u'Unknown harvester type' in result['source_type'][0]
def test_invalid_unknown_frequency(self):
wrong_frequency = 'ANNUALLY'
source_dict = self._get_source_dict()
source_dict['frequency'] = wrong_frequency
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'frequency' in result
assert u'Frequency {0} not recognised'.format(wrong_frequency) in result['frequency'][0]
def test_invalid_wrong_configuration(self):
source_dict = self._get_source_dict()
source_dict['config'] = 'not_json'
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'config' in result
assert u'Error parsing the configuration options: No JSON object could be decoded' in result['config'][0]
source_dict['config'] = json.dumps({'custom_option': 'not_a_list'})
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'config' in result
assert u'Error parsing the configuration options: custom_option must be a list' in result['config'][0]
class TestHarvestSourceActionCreate(HarvestSourceActionBase):
def __init__(self):
self.action = 'harvest_source_create'
def test_create(self):
source_dict = self._get_source_dict()
sysadmin = ckan_factories.Sysadmin()
result = call_action_api('harvest_source_create',
apikey=sysadmin['apikey'], **source_dict)
for key in source_dict.keys():
assert_equal(source_dict[key], result[key])
# Check that source was actually created
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
assert_equal(source.type, source_dict['source_type'])
# Trying to create a source with the same URL fails
source_dict = self._get_source_dict()
source_dict['name'] = 'test-source-action-new'
result = call_action_api('harvest_source_create',
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'url' in result
assert u'There already is a Harvest Source for this URL' in result['url'][0]
class HarvestSourceFixtureMixin(object):
def _get_source_dict(self):
'''Not only returns a source_dict, but creates the HarvestSource object
as well - suitable for testing update actions.
'''
source = HarvestSourceActionBase._get_source_dict(self)
source = factories.HarvestSource(**source)
# delete status because it gets in the way of the status supplied to
# call_action_api later on. It is only a generated value, not affecting
# the update/patch anyway.
del source['status']
return source
class TestHarvestSourceActionUpdate(HarvestSourceFixtureMixin,
HarvestSourceActionBase):
def __init__(self):
self.action = 'harvest_source_update'
def test_update(self):
source_dict = self._get_source_dict()
source_dict.update({
"url": "http://test.action.updated.com",
"name": "test-source-action-updated",
"title": "Test source action updated",
"notes": "Test source action desc updated",
"source_type": "test",
"frequency": "MONTHLY",
"config": json.dumps({"custom_option": ["c", "d"]})
})
sysadmin = ckan_factories.Sysadmin()
result = call_action_api('harvest_source_update',
apikey=sysadmin['apikey'], **source_dict)
for key in set(('url', 'name', 'title', 'notes', 'source_type',
'frequency', 'config')):
assert_equal(source_dict[key], result[key], "Key: %s" % key)
# Check that source was actually updated
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
assert_equal(source.type, source_dict['source_type'])
class TestHarvestSourceActionPatch(HarvestSourceFixtureMixin,
HarvestSourceActionBase):
def __init__(self):
self.action = 'harvest_source_patch'
if toolkit.check_ckan_version(max_version='2.2.99'):
# harvest_source_patch only came in with ckan 2.3
raise SkipTest()
def test_invalid_missing_values(self):
pass
def test_patch(self):
source_dict = self._get_source_dict()
patch_dict = {
"id": source_dict['id'],
"name": "test-source-action-patched",
"url": "http://test.action.patched.com",
"config": json.dumps({"custom_option": ["pat", "ched"]})
}
sysadmin = ckan_factories.Sysadmin()
result = call_action_api('harvest_source_patch',
apikey=sysadmin['apikey'], **patch_dict)
source_dict.update(patch_dict)
for key in set(('url', 'name', 'title', 'notes', 'source_type',
'frequency', 'config')):
assert_equal(source_dict[key], result[key], "Key: %s" % key)
# Check that source was actually updated
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
assert_equal(source.type, source_dict['source_type'])
class TestActions(ActionBase):
def test_harvest_source_clear(self):
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_source_clear')(
context, {'id': source.id})
assert_equal(result, {'id': source.id})
source = harvest_model.HarvestSource.get(source.id)
assert source
assert_equal(harvest_model.HarvestJob.get(job.id), None)
assert_equal(harvest_model.HarvestObject.get(object_.id), None)
assert_equal(model.Package.get(dataset['id']), None)
def test_harvest_source_job_history_clear(self):
# prepare
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_source_job_history_clear')(
context, {'id': source.id})
# verify
assert_equal(result, {'id': source.id})
source = harvest_model.HarvestSource.get(source.id)
assert source
assert_equal(harvest_model.HarvestJob.get(job.id), None)
assert_equal(harvest_model.HarvestObject.get(object_.id), None)
dataset_from_db = model.Package.get(dataset['id'])
assert dataset_from_db, 'is None'
assert_equal(dataset_from_db.id, dataset['id'])
def test_harvest_sources_job_history_clear(self):
# prepare
data_dict = SOURCE_DICT.copy()
source_1 = factories.HarvestSourceObj(**data_dict)
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
source_2 = factories.HarvestSourceObj(**data_dict)
job_1 = factories.HarvestJobObj(source=source_1)
dataset_1 = ckan_factories.Dataset()
object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1,
package_id=dataset_1['id'])
job_2 = factories.HarvestJobObj(source=source_2)
dataset_2 = ckan_factories.Dataset()
object_2_ = factories.HarvestObjectObj(job=job_2, source=source_2,
package_id=dataset_2['id'])
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_sources_job_history_clear')(
context, {})
# verify
assert_equal(
sorted(result),
sorted([{'id': source_1.id}, {'id': source_2.id}]))
source_1 = harvest_model.HarvestSource.get(source_1.id)
assert source_1
assert_equal(harvest_model.HarvestJob.get(job_1.id), None)
assert_equal(harvest_model.HarvestObject.get(object_1_.id), None)
dataset_from_db_1 = model.Package.get(dataset_1['id'])
assert dataset_from_db_1, 'is None'
assert_equal(dataset_from_db_1.id, dataset_1['id'])
source_2 = harvest_model.HarvestSource.get(source_1.id)
assert source_2
assert_equal(harvest_model.HarvestJob.get(job_2.id), None)
assert_equal(harvest_model.HarvestObject.get(object_2_.id), None)
dataset_from_db_2 = model.Package.get(dataset_2['id'])
assert dataset_from_db_2, 'is None'
assert_equal(dataset_from_db_2.id, dataset_2['id'])
def test_harvest_source_create_twice_with_unique_url(self):
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
def test_harvest_source_create_twice_with_same_url(self):
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict['name'] = 'another-source'
assert_raises(toolkit.ValidationError,
toolkit.get_action('harvest_source_create'),
{'user': site_user}, data_dict)
def test_harvest_source_create_twice_with_unique_url_and_config(self):
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict['name'] = 'another-source'
data_dict['config'] = '{"something": "new"}'
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
def test_harvest_job_create_as_sysadmin(self):
source = factories.HarvestSource(**SOURCE_DICT.copy())
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict = {
'source_id': source['id'],
'run': True
}
job = toolkit.get_action('harvest_job_create')(
{'user': site_user}, data_dict)
assert_equal(job['source_id'], source['id'])
assert_equal(job['status'], 'Running')
assert_equal(job['gather_started'], None)
assert_in('stats', job.keys())
def test_harvest_job_create_as_admin(self):
# as if an admin user presses 'refresh'
user = ckan_factories.User()
user['capacity'] = 'admin'
org = ckan_factories.Organization(users=[user])
source_dict = dict(SOURCE_DICT.items() +
[('publisher_id', org['id'])])
source = factories.HarvestSource(**source_dict)
data_dict = {
'source_id': source['id'],
'run': True
}
job = toolkit.get_action('harvest_job_create')(
{'user': user['name']}, data_dict)
assert_equal(job['source_id'], source['id'])
assert_equal(job['status'], 'Running')
assert_equal(job['gather_started'], None)
assert_in('stats', job.keys())
class TestHarvestObject(unittest.TestCase):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
def test_create(self):
job = factories.HarvestJobObj()
context = {
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': job.id,
'extras': {'a key': 'a value'},
}
harvest_object = toolkit.get_action('harvest_object_create')(
context, data_dict)
# fetch the object from database to check it was created
created_object = harvest_model.HarvestObject.get(harvest_object['id'])
assert created_object.guid == harvest_object['guid'] == data_dict['guid']
def test_create_bad_parameters(self):
source_a = factories.HarvestSourceObj()
job = factories.HarvestJobObj()
context = {
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data_dict = {
'job_id': job.id,
'source_id': source_a.id,
'extras': 1
}
harvest_object_create = toolkit.get_action('harvest_object_create')
self.assertRaises(toolkit.ValidationError, harvest_object_create,
context, data_dict)
data_dict['extras'] = {'test': 1}
self.assertRaises(toolkit.ValidationError, harvest_object_create,
context, data_dict)
class TestHarvestErrorMail(FunctionalTestBase):
@classmethod
def setup_class(cls):
super(TestHarvestErrorMail, cls).setup_class()
reset_db()
harvest_model.setup()
@classmethod
def teardown_class(cls):
super(TestHarvestErrorMail, cls).teardown_class()
reset_db()
def _create_harvest_source_and_job_if_not_existing(self):
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
context = {
'user': site_user,
'model': model,
'session': model.Session,
'ignore_auth': True,
}
source_dict = {
'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test',
}
try:
harvest_source = toolkit.get_action('harvest_source_create')(
context,
source_dict
)
except toolkit.ValidationError:
harvest_source = toolkit.get_action('harvest_source_show')(
context,
{'id': source_dict['name']}
)
pass
try:
job = toolkit.get_action('harvest_job_create')(context, {
'source_id': harvest_source['id'], 'run': True})
except HarvestJobExists:
job = toolkit.get_action('harvest_job_show')(context, {
'id': harvest_source['status']['last_job']['id']})
pass
toolkit.get_action('harvest_jobs_run')(context, {})
toolkit.get_action('harvest_source_reindex')(context, {'id': harvest_source['id']})
return context, harvest_source, job
def _create_harvest_source_with_owner_org_and_job_if_not_existing(self):
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
context = {
'user': site_user,
'model': model,
'session': model.Session,
'ignore_auth': True,
}
test_org = ckan_factories.Organization()
test_other_org = ckan_factories.Organization()
org_admin_user = ckan_factories.User()
org_member_user = ckan_factories.User()
other_org_admin_user = ckan_factories.User()
toolkit.get_action('organization_member_create')(
context.copy(),
{
'id': test_org['id'],
'username': org_admin_user['name'],
'role': 'admin'
}
)
toolkit.get_action('organization_member_create')(
context.copy(),
{
'id': test_org['id'],
'username': org_member_user['name'],
'role': 'member'
}
)
toolkit.get_action('organization_member_create')(
context.copy(),
{
'id': test_other_org['id'],
'username': other_org_admin_user['name'],
'role': 'admin'
}
)
source_dict = {
'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test',
'owner_org': test_org['id'],
'run': True
}
try:
harvest_source = toolkit.get_action('harvest_source_create')(
context.copy(),
source_dict
)
except toolkit.ValidationError:
harvest_source = toolkit.get_action('harvest_source_show')(
context.copy(),
{'id': source_dict['name']}
)
pass
try:
job = toolkit.get_action('harvest_job_create')(context.copy(), {
'source_id': harvest_source['id'], 'run': True})
except HarvestJobExists:
job = toolkit.get_action('harvest_job_show')(context.copy(), {
'id': harvest_source['status']['last_job']['id']})
pass
toolkit.get_action('harvest_jobs_run')(context.copy(), {})
toolkit.get_action('harvest_source_reindex')(context.copy(), {'id': harvest_source['id']})
return context, harvest_source, job
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_not_sent(self, mock_mailer_mail_recipient):
context, harvest_source, job = self._create_harvest_source_and_job_if_not_existing()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(0, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.not_called
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_sent(self, mock_mailer_mail_recipient):
context, harvest_source, job = self._create_harvest_source_and_job_if_not_existing()
# create a HarvestGatherError
job_model = HarvestJob.get(job['id'])
msg = 'System error - No harvester could be found for source type %s' % job_model.source.type
err = HarvestGatherError(message=msg, job=job_model)
err.save()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(1, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.called
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_sent_with_object_error(self, mock_mailer_mail_recipient):
context, harvest_source, harvest_job = self._create_harvest_source_and_job_if_not_existing()
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': harvest_job['id'],
'extras': {'a key': 'a value'},
'source_id': harvest_source['id']
}
harvest_object = toolkit.get_action('harvest_object_create')(
context, data_dict)
harvest_object_model = HarvestObject.get(harvest_object['id'])
# create a HarvestObjectError
msg = 'HarvestObjectError occured: %s' % harvest_job['id']
harvest_object_error = HarvestObjectError(message=msg, object=harvest_object_model)
harvest_object_error.save()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(1, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.called
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_sent_with_org(self, mock_mailer_mail_recipient):
context, harvest_source, job = self._create_harvest_source_with_owner_org_and_job_if_not_existing()
# create a HarvestGatherError
job_model = HarvestJob.get(job['id'])
msg = 'System error - No harvester could be found for source type %s' % job_model.source.type
err = HarvestGatherError(message=msg, job=job_model)
err.save()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(1, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.called
assert_equal(2, mock_mailer_mail_recipient.call_count)
# Skip for now as the Harvest DB log doesn't work on CKAN 2.9
class XXTestHarvestDBLog(unittest.TestCase):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
def xxtest_harvest_db_logger(self):
# Create source and check if harvest_log table is populated
data_dict = SOURCE_DICT.copy()
data_dict['source_type'] = 'test'
source = factories.HarvestSourceObj(**data_dict)
content = 'Harvest source created: %s' % source.id
log = harvest_model.Session.query(harvest_model.HarvestLog).\
filter(harvest_model.HarvestLog.content == content).first()
self.assertIsNotNone(log)
self.assertEqual(log.level, 'INFO')
context = {
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data = toolkit.get_action('harvest_log_list')(context, {})
self.assertTrue(len(data) > 0)
self.assertIn('level', data[0])
self.assertIn('content', data[0])
self.assertIn('created', data[0])
self.assertTrue(data[0]['created'] > data[1]['created'])
per_page = 1
data = toolkit.get_action('harvest_log_list')(context, {'level': 'info', 'per_page': per_page})
self.assertEqual(len(data), per_page)
self.assertEqual(data[0]['level'], 'INFO')

View File

@ -0,0 +1,348 @@
from mock import patch
from ckantoolkit.tests.helpers import reset_db
import ckanext.harvest.model as harvest_model
from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.queue as queue
from ckan.plugins.core import SingletonPlugin, implements
import json
import ckan.logic as logic
from ckan import model
from nose.tools import assert_equal, ok_
from ckan.lib.base import config
from nose.plugins.skip import SkipTest
import uuid
class MockHarvester(SingletonPlugin):
implements(IHarvester)
def info(self):
return {'name': 'test', 'title': 'test', 'description': 'test'}
def gather_stage(self, harvest_job):
if harvest_job.source.url.startswith('basic_test'):
obj = HarvestObject(guid='test1', job=harvest_job)
obj.extras.append(HarvestObjectExtra(key='key', value='value'))
obj2 = HarvestObject(guid='test2', job=harvest_job)
obj3 = HarvestObject(guid='test_to_delete', job=harvest_job)
obj.add()
obj2.add()
obj3.save() # this will commit both
return [obj.id, obj2.id, obj3.id]
return []
def fetch_stage(self, harvest_object):
assert_equal(harvest_object.state, "FETCH")
assert harvest_object.fetch_started is not None
harvest_object.content = json.dumps({'name': harvest_object.guid})
harvest_object.save()
return True
def import_stage(self, harvest_object):
assert_equal(harvest_object.state, "IMPORT")
assert harvest_object.fetch_finished is not None
assert harvest_object.import_started is not None
user = logic.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
package = json.loads(harvest_object.content)
name = package['name']
package_object = model.Package.get(name)
if package_object:
logic_function = 'package_update'
else:
logic_function = 'package_create'
package_dict = logic.get_action(logic_function)(
{'model': model, 'session': model.Session,
'user': user, 'api_version': 3, 'ignore_auth': True},
json.loads(harvest_object.content)
)
# set previous objects to not current
previous_object = model.Session.query(HarvestObject) \
.filter(HarvestObject.guid == harvest_object.guid) \
.filter(
HarvestObject.current == True # noqa: E712
).first()
if previous_object:
previous_object.current = False
previous_object.save()
# delete test_to_delete package on second run
harvest_object.package_id = package_dict['id']
harvest_object.current = True
if package_dict['name'] == 'test_to_delete' and package_object:
harvest_object.current = False
package_object.state = 'deleted'
package_object.save()
harvest_object.save()
return True
class TestHarvestQueue(object):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
def test_01_basic_harvester(self):
# make sure queues/exchanges are created first and are empty
consumer = queue.get_gather_consumer()
consumer_fetch = queue.get_fetch_consumer()
consumer.queue_purge(queue=queue.get_gather_queue_name())
consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())
user = logic.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
context = {'model': model, 'session': model.Session,
'user': user, 'api_version': 3, 'ignore_auth': True}
source_dict = {
'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test',
}
harvest_source = logic.get_action('harvest_source_create')(
context,
source_dict
)
assert harvest_source['source_type'] == 'test', harvest_source
assert harvest_source['url'] == 'basic_test', harvest_source
harvest_job = logic.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True}
)
job_id = harvest_job['id']
assert harvest_job['source_id'] == harvest_source['id'], harvest_job
assert harvest_job['status'] == u'Running'
assert logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'
# pop on item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
all_objects = model.Session.query(HarvestObject).all()
assert len(all_objects) == 3
assert all_objects[0].state == 'WAITING'
assert all_objects[1].state == 'WAITING'
assert all_objects[2].state == 'WAITING'
assert len(model.Session.query(HarvestObject).all()) == 3
assert len(model.Session.query(HarvestObjectExtra).all()) == 1
# do three times as three harvest objects
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
count = model.Session.query(model.Package) \
.filter(model.Package.type == 'dataset') \
.count()
assert count == 3
all_objects = model.Session.query(HarvestObject).filter_by(current=True).all()
assert_equal(len(all_objects), 3)
assert_equal(all_objects[0].state, 'COMPLETE')
assert_equal(all_objects[0].report_status, 'added')
assert_equal(all_objects[1].state, 'COMPLETE')
assert_equal(all_objects[1].report_status, 'added')
assert_equal(all_objects[2].state, 'COMPLETE')
assert_equal(all_objects[2].report_status, 'added')
# fire run again to check if job is set to Finished
logic.get_action('harvest_jobs_run')(
context,
{'source_id': harvest_source['id']}
)
harvest_job = logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)
assert_equal(harvest_job['status'], u'Finished')
assert_equal(harvest_job['stats'], {'added': 3, 'updated': 0, 'not modified': 0, 'errored': 0, 'deleted': 0})
harvest_source_dict = logic.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 3, 'updated': 0,
'not modified': 0, 'errored': 0, 'deleted': 0})
assert_equal(harvest_source_dict['status']['total_datasets'], 3)
assert_equal(harvest_source_dict['status']['job_count'], 1)
# Second run
harvest_job = logic.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True}
)
job_id = harvest_job['id']
assert logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'
# pop on item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
all_objects = model.Session.query(HarvestObject).all()
assert len(all_objects) == 6
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
count = model.Session.query(model.Package) \
.filter(model.Package.type == 'dataset') \
.count()
assert_equal(count, 3)
all_objects = model.Session.query(HarvestObject).filter_by(report_status='added').all()
assert_equal(len(all_objects), 3)
all_objects = model.Session.query(HarvestObject).filter_by(report_status='updated').all()
assert_equal(len(all_objects), 2)
all_objects = model.Session.query(HarvestObject).filter_by(report_status='deleted').all()
assert_equal(len(all_objects), 1)
# run to make sure job is marked as finshed
logic.get_action('harvest_jobs_run')(
context,
{'source_id': harvest_source['id']}
)
harvest_job = logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)
assert_equal(harvest_job['stats'], {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1})
harvest_source_dict = logic.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 2,
'not modified': 0, 'errored': 0, 'deleted': 1})
assert_equal(harvest_source_dict['status']['total_datasets'], 2)
assert_equal(harvest_source_dict['status']['job_count'], 2)
def test_redis_queue_purging(self):
'''
Test that Redis queue purging doesn't purge the wrong keys.
'''
if config.get('ckan.harvest.mq.type') != 'redis':
raise SkipTest()
redis = queue.get_connection()
try:
redis.set('ckanext-harvest:some-random-key', 'foobar')
# Create some fake jobs
gather_publisher = queue.get_gather_publisher()
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
fetch_publisher = queue.get_fetch_publisher()
fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
num_keys = redis.dbsize()
# Create some fake objects
gather_consumer = queue.get_gather_consumer()
next(gather_consumer.consume(queue.get_gather_queue_name()))
fetch_consumer = queue.get_fetch_consumer()
next(fetch_consumer.consume(queue.get_fetch_queue_name()))
ok_(redis.dbsize() > num_keys)
queue.purge_queues()
assert_equal(redis.get('ckanext-harvest:some-random-key'),
'foobar')
assert_equal(redis.dbsize(), num_keys)
assert_equal(redis.llen(queue.get_gather_routing_key()), 0)
assert_equal(redis.llen(queue.get_fetch_routing_key()), 0)
finally:
redis.delete('ckanext-harvest:some-random-key')
class TestHarvestCorruptRedis(object):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
@patch('ckanext.harvest.queue.log.error')
def test_redis_corrupt(self, mock_log_error):
'''
Test that corrupt Redis doesn't stop harvest process and still processes other jobs.
'''
if config.get('ckan.harvest.mq.type') != 'redis':
raise SkipTest()
redis = queue.get_connection()
try:
redis.set('ckanext-harvest:some-random-key-2', 'foobar')
# make sure queues/exchanges are created first and are empty
gather_consumer = queue.get_gather_consumer()
fetch_consumer = queue.get_fetch_consumer()
gather_consumer.queue_purge(queue=queue.get_gather_queue_name())
fetch_consumer.queue_purge(queue=queue.get_fetch_queue_name())
# Create some fake jobs and objects with no harvest_job_id
gather_publisher = queue.get_gather_publisher()
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
fetch_publisher = queue.get_fetch_publisher()
fetch_publisher.send({'harvest_object_id': None})
h_obj_id = str(uuid.uuid4())
fetch_publisher.send({'harvest_object_id': h_obj_id})
# Create some fake objects
next(gather_consumer.consume(queue.get_gather_queue_name()))
_, _, body = next(fetch_consumer.consume(queue.get_fetch_queue_name()))
json_obj = json.loads(body)
assert json_obj['harvest_object_id'] == h_obj_id
assert mock_log_error.call_count == 1
args, _ = mock_log_error.call_args_list[0]
assert "cannot concatenate 'str' and 'NoneType' objects" in args[1]
finally:
redis.delete('ckanext-harvest:some-random-key-2')

View File

@ -0,0 +1,180 @@
'''Tests elements of queue.py, but doesn't use the queue subsystem
(redis/rabbitmq)
'''
import json
from nose.tools import assert_equal
from ckantoolkit.tests.helpers import reset_db
from ckan import model
from ckan import plugins as p
from ckan.plugins import toolkit
from ckanext.harvest.tests.factories import (HarvestObjectObj)
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.model as harvest_model
from ckanext.harvest.tests.lib import run_harvest
class MockHarvester(p.SingletonPlugin):
p.implements(IHarvester)
@classmethod
def _set_test_params(cls, guid, **test_params):
cls._guid = guid
cls._test_params = test_params
def info(self):
return {'name': 'test2', 'title': 'test', 'description': 'test'}
def gather_stage(self, harvest_job):
obj = HarvestObjectObj(guid=self._guid, job=harvest_job)
return [obj.id]
def fetch_stage(self, harvest_object):
if self._test_params.get('fetch_object_unchanged'):
return 'unchanged'
harvest_object.content = json.dumps({'name': harvest_object.guid})
harvest_object.save()
return True
def import_stage(self, harvest_object):
user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
package = json.loads(harvest_object.content)
name = package['name']
package_object = model.Package.get(name)
if package_object:
logic_function = 'package_update'
else:
logic_function = 'package_create'
package_dict = toolkit.get_action(logic_function)(
{'model': model, 'session': model.Session,
'user': user},
json.loads(harvest_object.content)
)
if self._test_params.get('object_error'):
return False
# successful, so move 'current' to this object
previous_object = model.Session.query(harvest_model.HarvestObject) \
.filter_by(guid=harvest_object.guid) \
.filter_by(current=True) \
.first()
if previous_object:
previous_object.current = False
previous_object.save()
harvest_object.package_id = package_dict['id']
harvest_object.current = True
if self._test_params.get('delete'):
# 'current=False' is the key step in getting report_status to be
# set as 'deleted'
harvest_object.current = False
package_object.save()
harvest_object.save()
if self._test_params.get('import_object_unchanged'):
return 'unchanged'
return True
class TestEndStates(object):
def setup(self):
reset_db()
harvest_model.setup()
def test_create_dataset(self):
guid = 'obj-create'
MockHarvester._set_test_params(guid=guid)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'added')
assert_equal(result['errors'], [])
def test_update_dataset(self):
guid = 'obj-update'
MockHarvester._set_test_params(guid=guid)
# create the original harvest_object and dataset
run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
# update it
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'updated')
assert_equal(result['errors'], [])
def test_delete_dataset(self):
guid = 'obj-delete'
MockHarvester._set_test_params(guid=guid)
# create the original harvest_object and dataset
run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
MockHarvester._set_test_params(guid=guid, delete=True)
# delete it
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'deleted')
assert_equal(result['errors'], [])
def test_obj_error(self):
guid = 'obj-error'
MockHarvester._set_test_params(guid=guid, object_error=True)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'ERROR')
assert_equal(result['report_status'], 'errored')
assert_equal(result['errors'], [])
def test_fetch_unchanged(self):
guid = 'obj-error'
MockHarvester._set_test_params(guid=guid, fetch_object_unchanged=True)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'not modified')
assert_equal(result['errors'], [])
def test_import_unchanged(self):
guid = 'obj-error'
MockHarvester._set_test_params(guid=guid, import_object_unchanged=True)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'not modified')
assert_equal(result['errors'], [])