harvester-d4science/ckanext/harvest/tests/test_action.py

619 lines
22 KiB
Python
Raw Normal View History

2012-11-30 18:02:06 +01:00
import json
import factories
import unittest
from nose.tools import assert_equal, assert_raises
from nose.plugins.skip import SkipTest
try:
from ckan.tests import factories as ckan_factories
from ckan.tests.helpers import (_get_test_app, reset_db,
FunctionalTestBase, assert_in)
except ImportError:
from ckan.new_tests import factories as ckan_factories
from ckan.new_tests.helpers import (_get_test_app, reset_db,
2015-12-10 17:36:27 +01:00
FunctionalTestBase)
try:
from ckan.new_tests.helpers import assert_in
except ImportError:
# for ckan 2.2
try:
from nose.tools import assert_in
except ImportError:
# Python 2.6 doesn't have it
def assert_in(a, b, msg=None):
assert a in b, msg or '%r was not in %r' % (a, b)
2015-12-10 17:36:27 +01:00
from ckan import plugins as p
from ckan.plugins import toolkit
from ckan import model
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.model as harvest_model
def call_action_api(action, apikey=None, status=200, **kwargs):
'''POST an HTTP request to the CKAN API and return the result.
Any additional keyword arguments that you pass to this function as **kwargs
are posted as params to the API.
Usage:
package_dict = call_action_api('package_create', apikey=apikey,
name='my_package')
assert package_dict['name'] == 'my_package'
num_followers = post(app, 'user_follower_count', id='annafan')
If you are expecting an error from the API and want to check the contents
of the error dict, you have to use the status param otherwise an exception
will be raised:
error_dict = call_action_api('group_activity_list', status=403,
id='invalid_id')
assert error_dict['message'] == 'Access Denied'
:param action: the action to post to, e.g. 'package_create'
:type action: string
:param apikey: the API key to put in the Authorization header of the post
(optional, default: None)
:type apikey: string
:param status: the HTTP status code expected in the response from the CKAN
API, e.g. 403, if a different status code is received an exception will
be raised (optional, default: 200)
:type status: int
:param **kwargs: any other keyword arguments passed to this function will
be posted to the API as params
:raises paste.fixture.AppError: if the HTTP status code of the response
from the CKAN API is different from the status param passed to this
function
:returns: the 'result' or 'error' dictionary from the CKAN API response
:rtype: dictionary
'''
params = json.dumps(kwargs)
app = _get_test_app()
response = app.post('/api/action/{0}'.format(action), params=params,
extra_environ={'Authorization': str(apikey)},
status=status)
if status in (200,):
assert response.json['success'] is True
return response.json['result']
else:
assert response.json['success'] is False
return response.json['error']
class MockHarvesterForActionTests(p.SingletonPlugin):
p.implements(IHarvester)
def info(self):
return {'name': 'test-for-action',
'title': 'Test for action',
'description': 'test'}
def validate_config(self, config):
if not config:
return config
try:
config_obj = json.loads(config)
if 'custom_option' in config_obj:
if not isinstance(config_obj['custom_option'], list):
raise ValueError('custom_option must be a list')
except ValueError, e:
raise e
return config
def gather_stage(self, harvest_job):
return []
def fetch_stage(self, harvest_object):
return True
def import_stage(self, harvest_object):
return True
SOURCE_DICT = {
"url": "http://test.action.com",
"name": "test-source-action",
"title": "Test source action",
"notes": "Test source action desc",
"source_type": "test-for-action",
"frequency": "MANUAL",
"config": json.dumps({"custom_option": ["a", "b"]})
}
class ActionBase(object):
@classmethod
def setup_class(cls):
if not p.plugin_loaded('test_action_harvester'):
p.load('test_action_harvester')
2015-11-04 00:19:05 +01:00
def setup(self):
reset_db()
harvest_model.setup()
@classmethod
def teardown_class(cls):
p.unload('test_action_harvester')
class HarvestSourceActionBase(FunctionalTestBase):
@classmethod
def setup_class(cls):
super(HarvestSourceActionBase, cls).setup_class()
harvest_model.setup()
if not p.plugin_loaded('test_action_harvester'):
p.load('test_action_harvester')
@classmethod
def teardown_class(cls):
super(HarvestSourceActionBase, cls).teardown_class()
p.unload('test_action_harvester')
def _get_source_dict(self):
return {
"url": "http://test.action.com",
"name": "test-source-action",
"title": "Test source action",
"notes": "Test source action desc",
"source_type": "test-for-action",
"frequency": "MANUAL",
"config": json.dumps({"custom_option": ["a", "b"]})
}
def test_invalid_missing_values(self):
source_dict = {}
test_data = self._get_source_dict()
if 'id' in test_data:
source_dict['id'] = test_data['id']
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
for key in ('name', 'title', 'url', 'source_type'):
assert_equal(result[key], [u'Missing value'])
def test_invalid_unknown_type(self):
source_dict = self._get_source_dict()
source_dict['source_type'] = 'unknown'
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'source_type' in result
assert u'Unknown harvester type' in result['source_type'][0]
def test_invalid_unknown_frequency(self):
wrong_frequency = 'ANNUALLY'
source_dict = self._get_source_dict()
source_dict['frequency'] = wrong_frequency
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'frequency' in result
assert u'Frequency {0} not recognised'.format(wrong_frequency) in result['frequency'][0]
2012-11-30 18:02:06 +01:00
def test_invalid_wrong_configuration(self):
source_dict = self._get_source_dict()
2012-11-30 18:02:06 +01:00
source_dict['config'] = 'not_json'
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
2012-11-30 18:02:06 +01:00
assert 'config' in result
assert u'Error parsing the configuration options: No JSON object could be decoded' in result['config'][0]
2012-11-30 18:02:06 +01:00
source_dict['config'] = json.dumps({'custom_option': 'not_a_list'})
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
2012-11-30 18:02:06 +01:00
assert 'config' in result
assert u'Error parsing the configuration options: custom_option must be a list' in result['config'][0]
2012-11-30 18:02:06 +01:00
class TestHarvestSourceActionCreate(HarvestSourceActionBase):
def __init__(self):
self.action = 'harvest_source_create'
def test_create(self):
source_dict = self._get_source_dict()
sysadmin = ckan_factories.Sysadmin()
result = call_action_api('harvest_source_create',
apikey=sysadmin['apikey'], **source_dict)
for key in source_dict.keys():
assert_equal(source_dict[key], result[key])
# Check that source was actually created
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
assert_equal(source.type, source_dict['source_type'])
# Trying to create a source with the same URL fails
source_dict = self._get_source_dict()
source_dict['name'] = 'test-source-action-new'
result = call_action_api('harvest_source_create',
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'url' in result
assert u'There already is a Harvest Source for this URL' in result['url'][0]
class HarvestSourceFixtureMixin(object):
def _get_source_dict(self):
'''Not only returns a source_dict, but creates the HarvestSource object
as well - suitable for testing update actions.
'''
source = HarvestSourceActionBase._get_source_dict(self)
source = factories.HarvestSource(**source)
# delete status because it gets in the way of the status supplied to
# call_action_api later on. It is only a generated value, not affecting
# the update/patch anyway.
del source['status']
return source
class TestHarvestSourceActionUpdate(HarvestSourceFixtureMixin,
HarvestSourceActionBase):
def __init__(self):
self.action = 'harvest_source_update'
def test_update(self):
source_dict = self._get_source_dict()
source_dict.update({
"url": "http://test.action.updated.com",
"name": "test-source-action-updated",
"title": "Test source action updated",
"notes": "Test source action desc updated",
"source_type": "test",
"frequency": "MONTHLY",
"config": json.dumps({"custom_option": ["c", "d"]})
})
sysadmin = ckan_factories.Sysadmin()
result = call_action_api('harvest_source_update',
apikey=sysadmin['apikey'], **source_dict)
for key in set(('url', 'name', 'title', 'notes', 'source_type',
'frequency', 'config')):
assert_equal(source_dict[key], result[key], "Key: %s" % key)
# Check that source was actually updated
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
assert_equal(source.type, source_dict['source_type'])
class TestHarvestSourceActionPatch(HarvestSourceFixtureMixin,
HarvestSourceActionBase):
def __init__(self):
self.action = 'harvest_source_patch'
if toolkit.check_ckan_version(max_version='2.2.99'):
# harvest_source_patch only came in with ckan 2.3
raise SkipTest()
def test_invalid_missing_values(self):
pass
def test_patch(self):
source_dict = self._get_source_dict()
patch_dict = {
"id": source_dict['id'],
"name": "test-source-action-patched",
"url": "http://test.action.patched.com",
"config": json.dumps({"custom_option": ["pat", "ched"]})
}
sysadmin = ckan_factories.Sysadmin()
result = call_action_api('harvest_source_patch',
apikey=sysadmin['apikey'], **patch_dict)
source_dict.update(patch_dict)
for key in set(('url', 'name', 'title', 'notes', 'source_type',
'frequency', 'config')):
assert_equal(source_dict[key], result[key], "Key: %s" % key)
# Check that source was actually updated
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
assert_equal(source.type, source_dict['source_type'])
class TestActions(ActionBase):
def test_harvest_source_clear(self):
source = factories.HarvestSourceObj(**SOURCE_DICT)
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_source_clear')(
context, {'id': source.id})
assert_equal(result, {'id': source.id})
source = harvest_model.HarvestSource.get(source.id)
assert source
assert_equal(harvest_model.HarvestJob.get(job.id), None)
assert_equal(harvest_model.HarvestObject.get(object_.id), None)
assert_equal(model.Package.get(dataset['id']), None)
def test_harvest_source_job_history_clear(self):
# prepare
source = factories.HarvestSourceObj(**SOURCE_DICT)
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_source_job_history_clear')(
context, {'id': source.id})
# verify
assert_equal(result, {'id': source.id})
source = harvest_model.HarvestSource.get(source.id)
assert source
assert_equal(harvest_model.HarvestJob.get(job.id), None)
assert_equal(harvest_model.HarvestObject.get(object_.id), None)
dataset_from_db = model.Package.get(dataset['id'])
assert dataset_from_db, 'is None'
2016-11-15 15:50:03 +01:00
assert_equal(dataset_from_db.id, dataset['id'])
def test_harvest_sources_job_history_clear(self):
# prepare
# don't use factory because it looks for the existing source
data_dict = SOURCE_DICT
data_dict['name'] = 'job-history-clear-source'
data_dict['url'] = 'http://job-history-clear-url'
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
source_1_dict = toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
source_1 = harvest_model.HarvestSource.get(source_1_dict['id'])
data_dict['name'] = 'another-job-history-clear-source'
data_dict['url'] = 'http://another-job-history-clear-url'
source_2_dict = toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
source_2 = harvest_model.HarvestSource.get(source_2_dict['id'])
job_1 = factories.HarvestJobObj(source=source_1)
dataset_1 = ckan_factories.Dataset()
object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1,
package_id=dataset_1['id'])
job_2 = factories.HarvestJobObj(source=source_2)
dataset_2 = ckan_factories.Dataset()
object_2_ = factories.HarvestObjectObj(job=job_2, source=source_2,
package_id=dataset_2['id'])
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_sources_job_history_clear')(
context, {})
# verify
2016-11-16 00:25:19 +01:00
assert_equal(
sorted(result),
sorted([{'id': source_1.id}, {'id': source_2.id}]))
source_1 = harvest_model.HarvestSource.get(source_1.id)
assert source_1
assert_equal(harvest_model.HarvestJob.get(job_1.id), None)
assert_equal(harvest_model.HarvestObject.get(object_1_.id), None)
dataset_from_db_1 = model.Package.get(dataset_1['id'])
assert dataset_from_db_1, 'is None'
assert_equal(dataset_from_db_1.id, dataset_1['id'])
source_2 = harvest_model.HarvestSource.get(source_1.id)
assert source_2
assert_equal(harvest_model.HarvestJob.get(job_2.id), None)
assert_equal(harvest_model.HarvestObject.get(object_2_.id), None)
2016-11-16 00:34:07 +01:00
dataset_from_db_2 = model.Package.get(dataset_2['id'])
assert dataset_from_db_2, 'is None'
2016-11-15 15:50:03 +01:00
assert_equal(dataset_from_db_2.id, dataset_2['id'])
2015-11-04 00:19:05 +01:00
def test_harvest_source_create_twice_with_unique_url(self):
# don't use factory because it looks for the existing source
data_dict = SOURCE_DICT
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
data_dict['name'] = 'another-source1'
data_dict['url'] = 'http://another-url'
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
def test_harvest_source_create_twice_with_same_url(self):
# don't use factory because it looks for the existing source
data_dict = SOURCE_DICT
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
data_dict['name'] = 'another-source2'
assert_raises(toolkit.ValidationError,
toolkit.get_action('harvest_source_create'),
{'user': site_user}, data_dict)
def test_harvest_source_create_twice_with_unique_url_and_config(self):
# don't use factory because it looks for the existing source
data_dict = SOURCE_DICT
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
data_dict['name'] = 'another-source3'
data_dict['config'] = '{"something": "new"}'
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
def test_harvest_job_create_as_sysadmin(self):
source = factories.HarvestSource(**SOURCE_DICT)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict = {
'source_id': source['id'],
'run': True
}
job = toolkit.get_action('harvest_job_create')(
{'user': site_user}, data_dict)
assert_equal(job['source_id'], source['id'])
assert_equal(job['status'], 'Running')
assert_equal(job['gather_started'], None)
assert_in('stats', job.keys())
def test_harvest_job_create_as_admin(self):
# as if an admin user presses 'refresh'
user = ckan_factories.User()
user['capacity'] = 'admin'
org = ckan_factories.Organization(users=[user])
source_dict = dict(SOURCE_DICT.items() +
[('publisher_id', org['id'])])
source = factories.HarvestSource(**source_dict)
data_dict = {
'source_id': source['id'],
'run': True
}
job = toolkit.get_action('harvest_job_create')(
{'user': user['name']}, data_dict)
assert_equal(job['source_id'], source['id'])
assert_equal(job['status'], 'Running')
assert_equal(job['gather_started'], None)
assert_in('stats', job.keys())
class TestHarvestObject(unittest.TestCase):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
def test_create(self):
job = factories.HarvestJobObj()
context = {
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': job.id,
'extras': {'a key': 'a value'},
}
harvest_object = toolkit.get_action('harvest_object_create')(
context, data_dict)
# fetch the object from database to check it was created
created_object = harvest_model.HarvestObject.get(harvest_object['id'])
assert created_object.guid == harvest_object['guid'] == data_dict['guid']
def test_create_bad_parameters(self):
source_a = factories.HarvestSourceObj()
job = factories.HarvestJobObj()
context = {
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data_dict = {
'job_id': job.id,
'source_id': source_a.id,
'extras': 1
}
harvest_object_create = toolkit.get_action('harvest_object_create')
self.assertRaises(toolkit.ValidationError, harvest_object_create,
context, data_dict)
data_dict['extras'] = {'test': 1}
self.assertRaises(toolkit.ValidationError, harvest_object_create,
context, data_dict)
class TestHarvestDBLog(unittest.TestCase):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
def test_harvest_db_logger(self):
# Create source and check if harvest_log table is populated
data_dict = SOURCE_DICT.copy()
data_dict['source_type'] = 'test'
source = factories.HarvestSourceObj(**data_dict)
content = 'Harvest source created: %s' % source.id
log = harvest_model.Session.query(harvest_model.HarvestLog).\
filter(harvest_model.HarvestLog.content==content).first()
self.assertIsNotNone(log)
self.assertEqual(log.level, 'INFO')
context = {
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data = toolkit.get_action('harvest_log_list')(context, {})
self.assertTrue(len(data) > 0)
self.assertIn('level', data[0])
self.assertIn('content', data[0])
self.assertIn('created', data[0])
self.assertTrue(data[0]['created'] > data[1]['created'])
per_page = 1
data = toolkit.get_action('harvest_log_list')(context, {'level': 'info', 'per_page': per_page})
self.assertEqual(len(data), per_page)
self.assertEqual(data[0]['level'], 'INFO')