Minor bugfixes. Add plugin tests

This commit is contained in:
Aitor Magán 2014-06-26 16:37:27 +02:00
parent c8da0fb6bb
commit ff4bdabf1d
2 changed files with 317 additions and 40 deletions

View File

@ -21,39 +21,43 @@ def package_show(context, data_dict):
if package and user_obj and package.creator_user_id == user_obj.id:
return {'success': True}
# anyone can see a public package
if not package.private and package.state == 'active':
return {'success': True}
# Not active packages can only be seen by its owners
if package.state == 'active':
# anyone can see a public package
if not package.private:
return {'success': True}
# if the user has rights to read in the organization or in the group
if package.owner_org:
authorized = new_authz.has_user_permission_for_group_or_org(
package.owner_org, user, 'read')
# if the user has rights to read in the organization or in the group
if package.owner_org:
authorized = new_authz.has_user_permission_for_group_or_org(
package.owner_org, user, 'read')
else:
authorized = False
# if the user is not authorized yet, we should check if the
# user is in the allowed_users object
if not authorized:
if hasattr(package, 'extras') and 'allowed_users' in package.extras:
allowed_users = package.extras['allowed_users'].split(',')
if user in allowed_users:
authorized = True
if not authorized:
# Show a flash message with the URL to adquire the dataset
# This message only can be shown when the user tries to access the dataset via its URL (/dataset/...)
# The message cannot be displayed in other pages that uses the package_show function such as
# the user profile page
if hasattr(package, 'extras') and 'adquire_url' in package.extras and request.path.startswith('/dataset/'):
helpers.flash_notice(_('This private dataset can be adquired. To do so, please click ' +
'<a target="_blank" href="%s">here</a>') % package.extras['adquire_url'],
allow_html=True)
return {'success': False, 'msg': _('User %s not authorized to read package %s') % (user, package.id)}
else:
return {'success': True}
else:
authorized = False
# if the user is not authorized yet, we should check if the
# user is in the allowed_users object
if not authorized:
if hasattr(package, 'extras') and 'allowed_users' in package.extras:
allowed_users = package.extras['allowed_users'].split(',')
if user in allowed_users:
authorized = True
if not authorized:
# Show a flash message with the URL to adquire the dataset
# This message only can be shown when the user tries to access the dataset via its URL (/dataset/...)
# The message cannot be displayed in other pages that uses the package_show function such as
# the user profile page
if hasattr(package, 'extras') and 'adquire_url' in package.extras and request.path.startswith('/dataset/'):
helpers.flash_notice(_('This private dataset can be adquired. To do so, please click ' +
'<a target="_blank" href="%s">here</a>') % package.extras['adquire_url'],
allow_html=True)
return {'success': False, 'msg': _('User %s not authorized to read package %s') % (user, package.id)}
else:
return {'success': True}
def package_update(context, data_dict):
@ -110,6 +114,9 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
def _modify_package_schema(self):
return {
# remove datasets_with_no_organization_cannot_be_private validator
'private': [tk.get_validator('ignore_missing'),
tk.get_validator('boolean_validator')],
'allowed_users': [tk.get_validator('ignore_missing'),
private_datasets_metadata_checker,
tk.get_converter('convert_to_extras')],
@ -121,22 +128,12 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
def create_package_schema(self):
# grab the default schema in our plugin
schema = super(PrivateDatasets, self).create_package_schema()
# remove datasets_with_no_organization_cannot_be_private validator
schema.update({
'private': [tk.get_validator('ignore_missing'),
tk.get_validator('boolean_validator')]
})
schema.update(self._modify_package_schema())
return schema
def update_package_schema(self):
# grab the default schema in our plugin
schema = super(PrivateDatasets, self).update_package_schema()
# remove datasets_with_no_organization_cannot_be_private validator
schema.update({
'private': [tk.get_validator('ignore_missing'),
tk.get_validator('boolean_validator')]
})
schema.update(self._modify_package_schema())
return schema

View File

@ -0,0 +1,280 @@
import unittest
import ckanext.privatedatasets.plugin as plugin
from mock import MagicMock, ANY
from nose_parameterized import parameterized
class PluginTest(unittest.TestCase):
def setUp(self):
# Create the plugin
self.privateDatasets = plugin.PrivateDatasets()
# Create mocks
self._logic_auth = plugin.logic_auth
plugin.logic_auth = MagicMock()
self._request = plugin.request
plugin.request = MagicMock()
self._helpers = plugin.helpers
plugin.helpers = MagicMock()
self._new_authz = plugin.new_authz
plugin.new_authz = MagicMock()
self._tk = plugin.tk
plugin.tk = MagicMock()
def tearDown(self):
plugin.logic_auth = self._logic_auth
plugin.request = self._request
plugin.helpers = self._helpers
plugin.new_authz = self._new_authz
plugin.tk = self._tk
def test_implementations(self):
self.assertTrue(plugin.p.IDatasetForm.implemented_by(plugin.PrivateDatasets))
self.assertTrue(plugin.p.IAuthFunctions.implemented_by(plugin.PrivateDatasets))
self.assertTrue(plugin.p.IConfigurer.implemented_by(plugin.PrivateDatasets))
self.assertTrue(plugin.p.IRoutes.implemented_by(plugin.PrivateDatasets))
self.assertTrue(plugin.p.IActions.implemented_by(plugin.PrivateDatasets))
@parameterized.expand([
# Anonymous user (public)
(None, None, None, False, 'active', None, None, None, None, None, True),
# Anonymous user (private)
(None, None, None, True, 'active', None, None, None, None, '/', False),
# Anonymous user (private). Buy URL not shown
(None, None, None, True, 'active', None, None, None, 'google.es', '/', False),
# Anonymous user (private). Buy URL shown
(None, None, None, True, 'active', None, None, None, 'google.es', '/dataset/testds', False),
# The creator can always see the dataset
(1, 1, None, False, 'active', None, None, None, None, None, True),
(1, 1, None, True, 'active', None, None, None, None, None, True),
(1, 1, None, False, 'draft', None, None, None, None, None, True),
# Other user (no organizations)
(1, 2, 'test', False, 'active', None, None, None, None, None, True),
(1, 2, 'test', True, 'active', None, None, None, 'google.es', '/', False), # Buy MSG not shown
(1, 2, 'test', True, 'active', None, None, None, None, '/dataset/testds', False), # Buy MSG not shown
(1, 2, 'test', True, 'active', None, None, None, 'google.es', '/dataset/testds', False), # Buy MSG shown
(1, 2, 'test', False, 'draft', None, None, None, None, None, False),
# Other user but authorized in the list of authorized users
(1, 2, 'test', True, 'active', None, None, 'some,another,test,other', None, None, True),
(1, 2, 'test', True, 'active', None, None, 'test', None, None, True),
# Other user and not authorized in the list of authorized users
(1, 2, 'test', True, 'active', None, None, 'some,another,other', 'google.es', '/', False),
(1, 2, 'test', True, 'active', None, None, 'some,another,other', 'google.es', '/dataset/testds', False),
# Other user with organizations
(1, 2, 'test', False, 'active', 'conwet', False, None, None, None, True),
(1, 2, 'test', True, 'active', 'conwet', False, None, None, None, False),
(1, 2, 'test', True, 'active', 'conwet', True, None, None, None, True),
(1, 2, 'test', True, 'draft', 'conwet', True, None, None, None, False)
])
def test_auth_package_show(self, creator_user_id, user_obj_id, user, private, state, owner_org,
owner_member, allowed_users, adquire_url, request_path, authorized):
# Configure the mocks
returned_package = MagicMock()
returned_package.creator_user_id = creator_user_id
returned_package.private = private
returned_package.state = state
returned_package.owner_org = owner_org
returned_package.extras = {}
if allowed_users:
returned_package.extras['allowed_users'] = allowed_users
if adquire_url:
returned_package.extras['adquire_url'] = adquire_url
plugin.logic_auth.get_package_object = MagicMock(return_value=returned_package)
plugin.new_authz.has_user_permission_for_group_or_org = MagicMock(return_value=owner_member)
plugin.request.path = MagicMock(return_value=request_path)
# Prepare the context
context = {}
if user:
context['user'] = user
if user_obj_id:
context['auth_user_obj'] = MagicMock()
context['auth_user_obj'].id = user_obj_id
# Function to be tested
result = plugin.package_show(context, {})
# Check the result
self.assertEquals(authorized, result['success'])
# Check that the mocks has been called properly
if private and owner_org and state == 'active':
plugin.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'read')
# Conditions to buy a dataset; It should be private, active and should not belong to any organization
if not authorized and state == 'active' and not owner_org and request_path.startswith('/dataset/'):
plugin.helpers.flash_error.assert_called_once()
@parameterized.expand([
(None, None, None, None, None, False), # Anonymous user
(1, 1, None, None, None, True), # A user can edit its dataset
(1, 2, None, None, None, False), # A user cannot edit a dataset belonging to another user
(1, 2, 'test', 'conwet', False, False), # User without rights to update a dataset
(1, 2, 'test', 'conwet', True, True), # User with rights to update a dataset
])
def test_auth_package_update(self, creator_user_id, user_obj_id, user, owner_org, owner_member, authorized):
# Configure the mocks
returned_package = MagicMock()
returned_package.creator_user_id = creator_user_id
returned_package.owner_org = owner_org
plugin.logic_auth.get_package_object = MagicMock(return_value=returned_package)
plugin.new_authz.has_user_permission_for_group_or_org = MagicMock(return_value=owner_member)
# Prepare the context
context = {}
if user:
context['user'] = user
if user_obj_id:
context['auth_user_obj'] = MagicMock()
context['auth_user_obj'].id = user_obj_id
# Function to be tested
result = plugin.package_update(context, {})
# Check the result
self.assertEquals(authorized, result['success'])
# Check that the mock has been called properly
if creator_user_id != user_obj_id and owner_org:
plugin.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'update_dataset')
def test_auth_functions(self):
auth_functions = self.privateDatasets.get_auth_functions()
self.assertEquals(auth_functions['package_show'], plugin.package_show)
self.assertEquals(auth_functions['package_update'], plugin.package_update)
@parameterized.expand([
('/dataset', True), # Include ignore_capacity_check
('/', False), # Not include ignore_capacity_check
('/datasets', False), # Not include ignore_capacity_check
('/api/rest/dataset', False) # Not include ignore_capacity_check. TODO: Maybe in the future this must change
])
def test_package_seach_modified(self, request_path, include_ignore_capacity):
# Mock the default actions
package_search_old = MagicMock()
plugin.tk.get_action = MagicMock(return_value=package_search_old)
# Mock request
plugin.request.path = request_path
# Get the actions returned by the plugin
actions = self.privateDatasets.get_actions()
# Call the function
context = {'id': 'test', 'another_test': 'test_value'}
data_dict = {'example': 'test', 'key': 'value'}
actions['package_search'](context, data_dict)
# Test if the default function has been called properly
package_search_old.assert_called_once_with(ANY, data_dict)
context_called = package_search_old.call_args_list[0][0][0] # First call, first argument
expected_context = context.copy()
if include_ignore_capacity:
expected_context.update({'ignore_capacity_check': True})
self.assertEquals(expected_context, context_called)
def test_update_config(self):
# Call the method
config = {'test': 1234, 'another': 'value'}
self.privateDatasets.update_config(config)
# Test that functions are called as expected
plugin.tk.add_template_directory.assert_called_once_with(config, 'templates')
plugin.tk.add_resource('fanstatic', 'privatedatasets')
def test_map(self):
# Call the method
m = MagicMock()
self.privateDatasets.after_map(m)
# Test that the connect method has been called
m.connect.assert_called_once_with('/dataset_adquired',
controller='ckanext.privatedatasets.controller:AdquiredDatasetsController',
action='add_user', conditions=dict(method=['POST']))
@parameterized.expand([
('create_package_schema'),
('update_package_schema'),
])
def test_schema_create_update(self, function_name):
function = getattr(self.privateDatasets, function_name)
returned_schema = function()
self.assertTrue(plugin.tk.get_validator('ignore_missing') in returned_schema['private'])
self.assertTrue(plugin.tk.get_validator('boolean_validator') in returned_schema['private'])
self.assertEquals(2, len(returned_schema['private']))
fields = ['allowed_users', 'adquire_url']
for field in fields:
self.assertTrue(plugin.tk.get_validator('ignore_missing') in returned_schema[field])
self.assertTrue(plugin.tk.get_converter('convert_to_extras') in returned_schema[field])
self.assertTrue(plugin.private_datasets_metadata_checker in returned_schema[field])
self.assertEquals(3, len(returned_schema[field]))
def test_schema_show(self):
returned_schema = self.privateDatasets.show_package_schema()
fields = ['allowed_users', 'adquire_url']
for field in fields:
self.assertTrue(plugin.tk.get_validator('ignore_missing') in returned_schema[field])
self.assertTrue(plugin.tk.get_converter('convert_from_extras') in returned_schema[field])
self.assertEquals(2, len(returned_schema[field]))
@parameterized.expand([
# When no data is present, no errors should be returned
(True, 'conwet', '', False),
('True', 'conwet', '', False),
(False, 'conwet', '', False),
('False', 'conwet', '', False),
(True, None, '', False),
('True', None, '', False),
(False, None, '', False),
('False', None, '', False),
# When data is present, the field is only valid when the
# organization is not set and the private field is set to true
(True, 'conwet', 'test', True),
('True', 'conwet', 'test', True),
(False, 'conwet', 'test', True),
('False', 'conwet', 'test', True),
(True, None, 'test', False),
('True', None, 'test', False),
(False, None, 'test', True),
('False', None, 'test', True),
])
def test_metadata_checker(self, private, owner_org, metada_val, error_set):
# TODO: Maybe this test should be refactored since the function should be refactored
KEY = ('test')
errors = {}
errors[KEY] = []
data = {}
data[('private',)] = private
data[('owner_org',)] = owner_org
data[KEY] = metada_val
plugin.private_datasets_metadata_checker(KEY, data, errors, {})
if (error_set):
self.assertEquals(1, len(errors[KEY]))
else:
self.assertEquals(0, len(errors[KEY]))