Split plugin test. Minor bug fixes

This commit is contained in:
Aitor Magán 2014-07-14 17:05:46 +02:00
parent bdbc1372d5
commit 414bc46e59
7 changed files with 556 additions and 295 deletions

View File

@ -12,7 +12,7 @@ def private_datasets_metadata_checker(key, data, errors, context):
private_val = data.get(('private',))
# If the private field is not included in the data dict, we must check the current value
if not private_val and dataset_id:
if private_val is None and dataset_id:
dataset_dict = toolkit.get_action('package_show')({'ignore_auth': True}, {'id': dataset_id})
private_val = dataset_dict.get('private')
@ -33,7 +33,7 @@ def allowed_users_convert(key, data, errors, context):
current_index = max([int(k[1]) for k in data.keys() if len(k) == 2 and k[0] == constants.ALLOWED_USERS] + [-1])
for num, allowed_user in zip(count(current_index + 1), allowed_users):
data[(constants.ALLOWED_USERS, num)] = allowed_user
data[(constants.ALLOWED_USERS, num)] = allowed_user.strip()
def get_allowed_users(key, data, errors, context):

View File

@ -8,7 +8,10 @@ def is_adquired(pkg_dict):
if db.package_allowed_users_table is None:
db.init_db(model)
return len(db.AllowedUser.get(package_id=pkg_dict['id'], user_name=tk.c.user)) > 0
if tk.c.user:
return len(db.AllowedUser.get(package_id=pkg_dict['id'], user_name=tk.c.user)) > 0
else:
return False
def is_owner(pkg_dict):
@ -21,3 +24,5 @@ def is_owner(pkg_dict):
def get_allowed_users_str(users):
if users:
return ','.join([user for user in users])
else:
return ''

View File

@ -210,7 +210,7 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
return pkg_dict
######################################################################
########################## ITEMPLATESHELER ###########################
######################### ITEMPLATESHELPER ###########################
######################################################################
def get_helpers(self):

View File

@ -0,0 +1,210 @@
import unittest
import ckanext.privatedatasets.auth as auth
from mock import MagicMock
from nose_parameterized import parameterized
class AuthTest(unittest.TestCase):
def setUp(self):
# Create mocks
self._logic_auth = auth.logic_auth
auth.logic_auth = MagicMock()
self._request = auth.request
auth.request = MagicMock()
self._helpers = auth.helpers
auth.helpers = MagicMock()
self._new_authz = auth.new_authz
auth.new_authz = MagicMock()
self._tk = auth.tk
auth.tk = MagicMock()
self._db = auth.db
auth.db = MagicMock()
def tearDown(self):
auth.logic_auth = self._logic_auth
auth.request = self._request
auth.helpers = self._helpers
auth.new_authz = self._new_authz
auth.tk = self._tk
auth.db = self._db
if hasattr(self, '_package_show'):
auth.package_show = self._package_show
def test_decordators(self):
self.assertEquals(True, getattr(auth.package_show, 'auth_allow_anonymous_access', False))
self.assertEquals(True, getattr(auth.resource_show, 'auth_allow_anonymous_access', False))
@parameterized.expand([
# Anonymous user (public)
(None, None, None, False, 'active', None, None, None, None, None, True),
# Anonymous user (private)
(None, None, None, True, 'active', None, None, None, None, '/', False),
(None, None, '', True, 'active', None, None, '', None, '/', False),
# Anonymous user (private). Buy URL not shown
(None, None, None, True, 'active', None, None, None, 'google.es', '/', False),
# Anonymous user (private). Buy URL show
(None, None, None, True, 'active', None, None, None, 'google.es', '/dataset/testds', False),
# The creator can always see the dataset
(1, 1, None, False, 'active', None, None, None, None, None, True),
(1, 1, None, True, 'active', None, None, None, None, None, True),
(1, 1, None, False, 'draft', None, None, None, None, None, True),
# Other user (no organizations)
(1, 2, 'test', False, 'active', None, None, None, None, None, True),
(1, 2, 'test', True, 'active', None, None, None, 'google.es', '/', False), # Buy MSG not shown
(1, 2, 'test', True, 'active', None, None, None, None, '/dataset/testds', False), # Buy MSG not shown
(1, 2, 'test', True, 'active', None, None, None, 'google.es', '/dataset/testds', False), # Buy MSG shown
(1, 2, 'test', False, 'draft', None, None, None, None, None, False),
# Other user but authorized in the list of authorized users
(1, 2, 'test', True, 'active', None, None, True, None, None, True),
# Other user and not authorized in the list of authorized users
(1, 2, 'test', True, 'active', None, None, False, 'google.es', '/', False),
(1, 2, 'test', True, 'active', None, None, False, 'google.es', '/dataset/testds', False),
# Other user with organizations
(1, 2, 'test', False, 'active', 'conwet', False, None, None, None, True),
(1, 2, 'test', True, 'active', 'conwet', False, None, None, None, False),
(1, 2, 'test', True, 'active', 'conwet', True, None, None, None, True),
(1, 2, 'test', True, 'draft', 'conwet', True, None, None, None, False),
# Other user with organizations (user is not in the organization)
(1, 2, 'test', True, 'active', 'conwet', False, True, None, None, True),
(1, 2, 'test', True, 'active', 'conwet', False, False, None, None, False),
(1, 2, 'test', True, 'active', 'conwet', False, False, 'google.es', '/dataset/testds', False),
(1, 2, 'test', True, 'active', 'conwet', False, False, 'google.es', '/', False)
])
def test_auth_package_show(self, creator_user_id, user_obj_id, user, private, state, owner_org,
owner_member, db_auth, adquire_url, request_path, authorized):
# Configure the mocks
returned_package = MagicMock()
returned_package.creator_user_id = creator_user_id
returned_package.private = private
returned_package.state = state
returned_package.owner_org = owner_org
returned_package.extras = {}
# Configure the database
db_response = []
if db_auth is True:
out = auth.db.AllowedUser()
out.package_id = 'package_id'
out.user_name = user
db_response.append(out)
auth.db.AllowedUser.get = MagicMock(return_value=db_response)
if adquire_url:
returned_package.extras['adquire_url'] = adquire_url
auth.logic_auth.get_package_object = MagicMock(return_value=returned_package)
auth.new_authz.has_user_permission_for_group_or_org = MagicMock(return_value=owner_member)
auth.request.path = MagicMock(return_value=request_path)
# Prepare the context
context = {'model': MagicMock()}
if user is not None:
context['user'] = user
if user_obj_id is not None:
context['auth_user_obj'] = MagicMock()
context['auth_user_obj'].id = user_obj_id
# Function to be tested
result = auth.package_show(context, {})
# Check the result
self.assertEquals(authorized, result['success'])
# Check that the mocks has been called properly
if private and owner_org and state == 'active':
auth.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'read')
# Conditions to buy a dataset; It should be private, active and should not belong to any organization
if not authorized and state == 'active' and not owner_org and request_path.startswith('/dataset/'):
auth.helpers.flash_error.assert_called_once()
@parameterized.expand([
(None, None, None, None, None, False), # Anonymous user
(1, 1, None, None, None, True), # A user can edit its dataset
(1, 2, None, None, None, False), # A user cannot edit a dataset belonging to another user
(1, 2, 'test', 'conwet', False, False), # User without rights to update a dataset
(1, 2, 'test', 'conwet', True, True), # User with rights to update a dataset
])
def test_auth_package_update(self, creator_user_id, user_obj_id, user, owner_org, owner_member, authorized):
# Configure the mocks
returned_package = MagicMock()
returned_package.creator_user_id = creator_user_id
returned_package.owner_org = owner_org
auth.logic_auth.get_package_object = MagicMock(return_value=returned_package)
auth.new_authz.has_user_permission_for_group_or_org = MagicMock(return_value=owner_member)
# Prepare the context
context = {}
if user is not None:
context['user'] = user
if user_obj_id is not None:
context['auth_user_obj'] = MagicMock()
context['auth_user_obj'].id = user_obj_id
# Function to be tested
result = auth.package_update(context, {})
# Check the result
self.assertEquals(authorized, result['success'])
# Check that the mock has been called properly
if creator_user_id != user_obj_id and owner_org:
auth.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'update_dataset')
@parameterized.expand([
(True, True),
(True, False),
(False, False),
(False, False)
])
def test_auth_resource_show(self, exist_pkg=True, authorized_pkg=True):
#Recover the exception
auth.tk.ObjectNotFound = self._tk.ObjectNotFound
# Mock the calls
package = MagicMock()
package.id = '1'
final_query = MagicMock()
final_query.first = MagicMock(return_value=package if exist_pkg else None)
second_join = MagicMock()
second_join.filter = MagicMock(return_value=final_query)
first_join = MagicMock()
first_join.join = MagicMock(return_value=second_join)
query = MagicMock()
query.join = MagicMock(return_value=first_join)
model = MagicMock()
session = MagicMock()
session.query = MagicMock(return_value=query)
model.Session = session
# Create the context
context = {}
context['model'] = model
# Mock the package_show function
self._package_show = auth.package_show
success = True if authorized_pkg else False
auth.package_show = MagicMock(return_value={'success': success})
if not exist_pkg:
self.assertRaises(self._tk.ObjectNotFound, auth.resource_show, context, {})
else:
result = auth.resource_show(context, {})
self.assertEquals(authorized_pkg, result['success'])

View File

@ -0,0 +1,147 @@
import unittest
import ckanext.privatedatasets.converters_validators as conv_val
from mock import MagicMock
from nose_parameterized import parameterized
class ConvertersValidatorsTest(unittest.TestCase):
def setUp(self):
# Create mocks
self._toolkit = conv_val.toolkit
conv_val.toolkit = MagicMock()
self._db = conv_val.db
conv_val.db = MagicMock()
def tearDown(self):
conv_val.db = self._db
conv_val.toolkit = self._toolkit
@parameterized.expand([
# When no data is present, no errors should be returned
(True, True, 'conwet', '', False),
('True', True, 'conwet', '', False),
(False, True, 'conwet', '', False),
('False', True, 'conwet', '', False),
(None, True, 'conwet', '', False),
(None, False, 'conwet', '', False),
(True, True, None, '', False),
('True', True, None, '', False),
(False, True, None, '', False),
('False', True, None, '', False),
(None, True, None, '', False),
(None, False, None, '', False),
# When data is present, the field is only valid when the
# the private field is set to true (organization should
# not be taken into account anymore)
(True, True, 'conwet', 'test', False),
('True', True, 'conwet', 'test', False),
(False, True, 'conwet', 'test', True),
('False', True, 'conwet', 'test', True),
(None, True, 'conwet', 'test', False),
(None, False, 'conwet', 'test', True),
(True, True, None, 'test', False),
('True', True, None, 'test', False),
(False, True, None, 'test', True),
('False', True, None, 'test', True),
(None, True, None, 'test', False),
(None, False, None, 'test', True),
])
def test_metadata_checker(self, received_private, package_private, owner_org, metada_val, error_set):
# Configure the mocks
package_show = MagicMock(return_value={'private': package_private, 'id': 'package_id'})
conv_val.toolkit.get_action = MagicMock(return_value=package_show)
KEY = ('test',)
errors = {}
errors[KEY] = []
data = {}
data[('id',)] = 'package_id'
data[('owner_org',)] = owner_org
if received_private is not None:
data[('private',)] = received_private
data[KEY] = metada_val
conv_val.private_datasets_metadata_checker(KEY, data, errors, {})
if error_set:
self.assertEquals(1, len(errors[KEY]))
else:
self.assertEquals(0, len(errors[KEY]))
@parameterized.expand([
('', 0, []),
('', 2, []),
('a', 0, ['a']),
('a', 2, ['a']),
('a,z, d', 0, ['a', 'z', 'd']),
('a,z, d', 2, ['a', 'z', 'd'])
])
def test_allowed_user_convert(self, users_str, previous_users, expected_users):
key_str = 'allowed_users_str'
key = 'allowed_users'
data = {(key_str,): users_str}
for i in range(0, previous_users):
data[(key, i)] = i
# Call the function
conv_val.allowed_users_convert((key_str,), data, {}, {})
# Check that the users are set properly
for i in range(previous_users, previous_users + len(expected_users)):
self.assertEquals(expected_users[i - previous_users], data[(key, i)])
@parameterized.expand([
([], True),
(['a'], True),
(['a', 'b'], True),
(['a', 'b', 'c'], True),
(['a', 'b', 'c', 'd', 'e'], True),
([], False),
(['a'], False),
(['a', 'b'], False),
(['a', 'b', 'c'], False),
(['a', 'b', 'c', 'd', 'e'], False)
])
def test_get_allowed_users(self, users, table_initialized):
key = 'allowed_users'
data = {('id',): 'package_id'}
# Table init
conv_val.db.package_allowed_users_table = None if not table_initialized else MagicMock()
# Each time 'AllowedUser' is called, we must get a new instance
# and this is the way to get this behaviour
def constructor():
return MagicMock()
conv_val.db.AllowedUser = MagicMock(side_effect=constructor)
# Create the users
db_res = []
for user in users:
db_row = conv_val.db.AllowedUser()
db_row.package_id = 'package_id'
db_row.user_name = user
db_res.append(db_row)
conv_val.db.AllowedUser.get = MagicMock(return_value=db_res)
# Call the function
context = {'model': MagicMock()}
conv_val.get_allowed_users((key,), data, {}, context)
# Check that the users are set properly
for i, user in enumerate(users):
self.assertEquals(user, data[(key, i)])
# Check that the table has been initialized properly
if not table_initialized:
conv_val.db.init_db.assert_called_once_with(context['model'])
else:
self.assertEquals(0, conv_val.db.init_db.call_count)

View File

@ -0,0 +1,82 @@
import unittest
import ckanext.privatedatasets.helpers as helpers
from mock import MagicMock
from nose_parameterized import parameterized
class HelpersTest(unittest.TestCase):
def setUp(self):
# Create mocks
self._model = helpers.model
helpers.model = MagicMock()
self._tk = helpers.tk
helpers.tk = MagicMock()
self._db = helpers.db
helpers.db = MagicMock()
def tearDown(self):
helpers.model = self._model
helpers.tk = self._tk
helpers.db = self._db
@parameterized.expand([
(False, False, 'user', False),
(True, False, 'user', True),
(False, True, 'user', False),
(True, True, 'user', True),
(False, False, None, False),
(True, False, None, False),
(False, True, None, False),
(True, True, None, False),
])
def test_is_adquired(self, db_auth, table_initialized, user, adquired):
# Configure test
helpers.tk.c.user = user
pkg_dict = {'id': 'package_id'}
helpers.db.package_allowed_users_table = None if not table_initialized else MagicMock()
db_response = []
if db_auth is True:
out = helpers.db.AllowedUser()
out.package_id = 'package_id'
out.user_name = user
db_response.append(out)
helpers.db.AllowedUser.get = MagicMock(return_value=db_response)
# Check the function returns the expected result
self.assertEquals(adquired, helpers.is_adquired(pkg_dict))
if not table_initialized:
helpers.db.init_db.assert_called_once_with(helpers.model)
else:
self.assertEquals(0, helpers.db.init_db.call_count)
@parameterized.expand([
(1, 1, True),
(1, 2, False),
(1, None, False)
])
def test_is_owner(self, creator_user_id, user_id, owner):
# Configure test
user = MagicMock()
user.id = user_id
helpers.tk.c.userobj = user
pkg_dict = {'creator_user_id': creator_user_id}
# Check that the functions return the expected result
self.assertEquals(owner, helpers.is_owner(pkg_dict))
@parameterized.expand([
([], ''),
(['a'], 'a'),
(['a', 'b'], 'a,b'),
(['a', 'b', 'c', 'd'], 'a,b,c,d'),
])
def test_get_allowed_users_str(self, allowed_users, expected_result):
self.assertEquals(expected_result, helpers.get_allowed_users_str(allowed_users))

View File

@ -12,212 +12,30 @@ class PluginTest(unittest.TestCase):
self.privateDatasets = plugin.PrivateDatasets()
# Create mocks
self._logic_auth = plugin.logic_auth
plugin.logic_auth = MagicMock()
self._request = plugin.request
plugin.request = MagicMock()
self._helpers = plugin.helpers
plugin.helpers = MagicMock()
self._new_authz = plugin.new_authz
plugin.new_authz = MagicMock()
self._tk = plugin.tk
plugin.tk = MagicMock()
def tearDown(self):
plugin.logic_auth = self._logic_auth
plugin.request = self._request
plugin.helpers = self._helpers
plugin.new_authz = self._new_authz
plugin.tk = self._tk
if hasattr(self, '_package_show'):
plugin.package_show = self._package_show
@parameterized.expand([
(plugin.p.IDatasetForm,),
(plugin.p.IAuthFunctions,),
(plugin.p.IConfigurer,),
(plugin.p.IRoutes,),
(plugin.p.IActions,),
(plugin.p.IPackageController,),
(plugin.p.ITemplateHelpers,)
])
def test_implementations(self, interface):
self.assertTrue(interface.implemented_by(plugin.PrivateDatasets))
def test_decordators(self):
self.assertEquals(True, getattr(plugin.package_show, 'auth_allow_anonymous_access', False))
self.assertEquals(True, getattr(plugin.resource_show, 'auth_allow_anonymous_access', False))
@parameterized.expand([
# Anonymous user (public)
(None, None, None, False, 'active', None, None, None, None, None, True),
# Anonymous user (private)
(None, None, None, True, 'active', None, None, None, None, '/', False),
(None, None, '', True, 'active', None, None, '', None, '/', False),
# Anonymous user (private). Buy URL not shown
(None, None, None, True, 'active', None, None, None, 'google.es', '/', False),
# Anonymous user (private). Buy URL shown
(None, None, None, True, 'active', None, None, None, 'google.es', '/dataset/testds', False),
# The creator can always see the dataset
(1, 1, None, False, 'active', None, None, None, None, None, True),
(1, 1, None, True, 'active', None, None, None, None, None, True),
(1, 1, None, False, 'draft', None, None, None, None, None, True),
# Other user (no organizations)
(1, 2, 'test', False, 'active', None, None, None, None, None, True),
(1, 2, 'test', True, 'active', None, None, None, 'google.es', '/', False), # Buy MSG not shown
(1, 2, 'test', True, 'active', None, None, None, None, '/dataset/testds', False), # Buy MSG not shown
(1, 2, 'test', True, 'active', None, None, None, 'google.es', '/dataset/testds', False), # Buy MSG shown
(1, 2, 'test', False, 'draft', None, None, None, None, None, False),
# Other user but authorized in the list of authorized users
(1, 2, 'test', True, 'active', None, None, 'some,another,test,other', None, None, True),
(1, 2, 'test', True, 'active', None, None, 'test', None, None, True),
# Other user and not authorized in the list of authorized users
(1, 2, 'test', True, 'active', None, None, 'some,another,other', 'google.es', '/', False),
(1, 2, 'test', True, 'active', None, None, 'some,another,other', 'google.es', '/dataset/testds', False),
# Other user with organizations
(1, 2, 'test', False, 'active', 'conwet', False, None, None, None, True),
(1, 2, 'test', True, 'active', 'conwet', False, None, None, None, False),
(1, 2, 'test', True, 'active', 'conwet', True, None, None, None, True),
(1, 2, 'test', True, 'draft', 'conwet', True, None, None, None, False),
# Other user with organizations (user is not in the organization)
(1, 2, 'test', True, 'active', 'conwet', False, 'test', None, None, True),
(1, 2, 'test', True, 'active', 'conwet', False, 'some,another,other', None, None, False),
(1, 2, 'test', True, 'active', 'conwet', False, 'some,another,other', 'google.es', '/dataset/testds', False),
(1, 2, 'test', True, 'active', 'conwet', False, 'some,another,other', 'google.es', '/', False)
])
def test_auth_package_show(self, creator_user_id, user_obj_id, user, private, state, owner_org,
owner_member, allowed_users, adquire_url, request_path, authorized):
# Configure the mocks
returned_package = MagicMock()
returned_package.creator_user_id = creator_user_id
returned_package.private = private
returned_package.state = state
returned_package.owner_org = owner_org
returned_package.extras = {}
if allowed_users is not None:
returned_package.extras['allowed_users'] = allowed_users
if adquire_url:
returned_package.extras['adquire_url'] = adquire_url
plugin.logic_auth.get_package_object = MagicMock(return_value=returned_package)
plugin.new_authz.has_user_permission_for_group_or_org = MagicMock(return_value=owner_member)
plugin.request.path = MagicMock(return_value=request_path)
# Prepare the context
context = {}
if user is not None:
context['user'] = user
if user_obj_id is not None:
context['auth_user_obj'] = MagicMock()
context['auth_user_obj'].id = user_obj_id
# Function to be tested
result = plugin.package_show(context, {})
# Check the result
self.assertEquals(authorized, result['success'])
# Check that the mocks has been called properly
if private and owner_org and state == 'active':
plugin.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'read')
# Conditions to buy a dataset; It should be private, active and should not belong to any organization
if not authorized and state == 'active' and not owner_org and request_path.startswith('/dataset/'):
plugin.helpers.flash_error.assert_called_once()
@parameterized.expand([
(None, None, None, None, None, False), # Anonymous user
(1, 1, None, None, None, True), # A user can edit its dataset
(1, 2, None, None, None, False), # A user cannot edit a dataset belonging to another user
(1, 2, 'test', 'conwet', False, False), # User without rights to update a dataset
(1, 2, 'test', 'conwet', True, True), # User with rights to update a dataset
])
def test_auth_package_update(self, creator_user_id, user_obj_id, user, owner_org, owner_member, authorized):
# Configure the mocks
returned_package = MagicMock()
returned_package.creator_user_id = creator_user_id
returned_package.owner_org = owner_org
plugin.logic_auth.get_package_object = MagicMock(return_value=returned_package)
plugin.new_authz.has_user_permission_for_group_or_org = MagicMock(return_value=owner_member)
# Prepare the context
context = {}
if user is not None:
context['user'] = user
if user_obj_id is not None:
context['auth_user_obj'] = MagicMock()
context['auth_user_obj'].id = user_obj_id
# Function to be tested
result = plugin.package_update(context, {})
# Check the result
self.assertEquals(authorized, result['success'])
# Check that the mock has been called properly
if creator_user_id != user_obj_id and owner_org:
plugin.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'update_dataset')
@parameterized.expand([
(True, True),
(True, False),
(False, False),
(False, False)
])
def test_auth_resource_show(self, exist_pkg=True, authorized_pkg=True):
#Recover the exception
plugin.tk.ObjectNotFound = self._tk.ObjectNotFound
# Mock the calls
package = MagicMock()
package.id = '1'
final_query = MagicMock()
final_query.first = MagicMock(return_value=package if exist_pkg else None)
second_join = MagicMock()
second_join.filter = MagicMock(return_value=final_query)
first_join = MagicMock()
first_join.join = MagicMock(return_value=second_join)
query = MagicMock()
query.join = MagicMock(return_value=first_join)
model = MagicMock()
session = MagicMock()
session.query = MagicMock(return_value=query)
model.Session = session
# Create the context
context = {}
context['model'] = model
# Mock the package_show function
self._package_show = plugin.package_show
success = True if authorized_pkg else False
plugin.package_show = MagicMock(return_value={'success': success})
if not exist_pkg:
self.assertRaises(self._tk.ObjectNotFound, plugin.resource_show, context, {})
else:
result = plugin.resource_show(context, {})
self.assertEquals(authorized_pkg, result['success'])
def test_auth_functions(self):
auth_functions = self.privateDatasets.get_auth_functions()
self.assertEquals(auth_functions['package_show'], plugin.package_show)
self.assertEquals(auth_functions['package_update'], plugin.package_update)
self.assertEquals(auth_functions['resource_show'], plugin.resource_show)
self.assertEquals(auth_functions['package_show'], plugin.auth.package_show)
self.assertEquals(auth_functions['package_update'], plugin.auth.package_update)
self.assertEquals(auth_functions['resource_show'], plugin.auth.resource_show)
self.assertEquals(auth_functions['package_adquired'], plugin.auth.package_adquired)
def test_update_config(self):
# Call the method
@ -234,85 +52,13 @@ class PluginTest(unittest.TestCase):
self.privateDatasets.after_map(m)
# Test that the connect method has been called
m.connect.assert_any_call('/dataset_adquired',
controller='ckanext.privatedatasets.controllers.api_controller:AdquiredDatasetsControllerAPI',
action='add_users', conditions=dict(method=['POST']))
m.connect.assert_any_call('user_adquired_datasets', '/dashboad/adquired', ckan_icon='shopping-cart',
controller='ckanext.privatedatasets.controllers.ui_controller:AdquiredDatasetsControllerUI',
action='user_adquired_datasets', conditions=dict(method=['GET']))
@parameterized.expand([
('create_package_schema'),
('update_package_schema'),
])
def test_schema_create_update(self, function_name):
function = getattr(self.privateDatasets, function_name)
returned_schema = function()
self.assertTrue(plugin.tk.get_validator('ignore_missing') in returned_schema['private'])
self.assertTrue(plugin.tk.get_validator('boolean_validator') in returned_schema['private'])
self.assertEquals(2, len(returned_schema['private']))
fields = ['allowed_users', 'adquire_url']
for field in fields:
self.assertTrue(plugin.tk.get_validator('ignore_missing') in returned_schema[field])
self.assertTrue(plugin.tk.get_converter('convert_to_extras') in returned_schema[field])
self.assertTrue(plugin.private_datasets_metadata_checker in returned_schema[field])
self.assertEquals(3, len(returned_schema[field]))
def test_schema_show(self):
returned_schema = self.privateDatasets.show_package_schema()
fields = ['allowed_users', 'adquire_url']
for field in fields:
self.assertTrue(plugin.tk.get_validator('ignore_missing') in returned_schema[field])
self.assertTrue(plugin.tk.get_converter('convert_from_extras') in returned_schema[field])
self.assertEquals(2, len(returned_schema[field]))
@parameterized.expand([
# When no data is present, no errors should be returned
(True, 'conwet', '', False),
('True', 'conwet', '', False),
(False, 'conwet', '', False),
('False', 'conwet', '', False),
(True, None, '', False),
('True', None, '', False),
(False, None, '', False),
('False', None, '', False),
# When data is present, the field is only valid when the
# organization is not set and the private field is set to true
(True, 'conwet', 'test', False),
('True', 'conwet', 'test', False),
(False, 'conwet', 'test', True),
('False', 'conwet', 'test', True),
(True, None, 'test', False),
('True', None, 'test', False),
(False, None, 'test', True),
('False', None, 'test', True),
])
def test_metadata_checker(self, private, owner_org, metada_val, error_set):
# TODO: Maybe this test should be refactored since the function should be refactored
KEY = ('test',)
errors = {}
errors[KEY] = []
data = {}
data[('private',)] = private
data[('owner_org',)] = owner_org
data[KEY] = metada_val
plugin.private_datasets_metadata_checker(KEY, data, errors, {})
if error_set:
self.assertEquals(1, len(errors[KEY]))
else:
self.assertEquals(0, len(errors[KEY]))
def test_actions_functions(self):
actions = self.privateDatasets.get_actions()
self.assertEquals(actions['package_adquired'], plugin.actions.package_adquired)
def test_fallback(self):
self.assertEquals(True, self.privateDatasets.is_fallback())
@ -320,28 +66,113 @@ class PluginTest(unittest.TestCase):
def test_package_types(self):
self.assertEquals([], self.privateDatasets.package_types())
######################################################################
############################## SCHEMAS ###############################
######################################################################
def _check_fields(self, schema, fields):
for field in fields:
for checker_validator in fields[field]:
self.assertTrue(checker_validator in schema[field])
self.assertEquals(len(fields[field]), len(schema[field]))
@parameterized.expand([
('create_package_schema'),
('update_package_schema'),
])
def test_schema_create_update(self, function_name):
function = getattr(self.privateDatasets, function_name)
returned_schema = function()
fields = {
'private': [plugin.tk.get_validator('ignore_missing'), plugin.tk.get_validator('boolean_validator')],
'adquire_url': [plugin.tk.get_validator('ignore_missing'), plugin.tk.get_converter('convert_to_extras'),
plugin.conv_val.private_datasets_metadata_checker],
'searchable': [plugin.tk.get_validator('ignore_missing'), plugin.tk.get_validator('boolean_validator'),
plugin.tk.get_converter('convert_to_extras'), plugin.conv_val.private_datasets_metadata_checker],
'allowed_users_str': [plugin.tk.get_validator('ignore_missing'), plugin.conv_val.allowed_users_convert,
plugin.conv_val.private_datasets_metadata_checker],
'allowed_users': [plugin.tk.get_validator('ignore_missing'), plugin.conv_val.private_datasets_metadata_checker]
}
self._check_fields(returned_schema, fields)
def test_schema_show(self):
returned_schema = self.privateDatasets.show_package_schema()
fields = ['searchable', 'adquire_url']
fields = {
'adquire_url': [plugin.tk.get_validator('ignore_missing'), plugin.tk.get_converter('convert_from_extras')],
'searchable': [plugin.tk.get_validator('ignore_missing'), plugin.tk.get_converter('convert_from_extras')],
'allowed_users': [plugin.tk.get_validator('ignore_missing'), plugin.conv_val.get_allowed_users]
}
self._check_fields(returned_schema, fields)
######################################################################
############################## PACKAGE ###############################
######################################################################
@parameterized.expand([
('after_create',),
('after_update',),
('after_show',),
('after_delete',),
('after_create', 'False'),
('after_update', 'False'),
('after_show', 'False'),
('after_delete', 'False')
])
def test_packagecontroller_after(self, function, private='True'):
pkg_dict = {'test': 'a', 'private': private, 'allowed_users': 'a,b,c'}
def test_packagecontroller_after_delete(self, function, private='True'):
pkg_dict = {'test': 'a', 'private': private, 'allowed_users': ['a', 'b', 'c']}
expected_pkg_dict = pkg_dict.copy()
result = getattr(self.privateDatasets, function)({}, pkg_dict) # Call the function
self.assertEquals(expected_pkg_dict, result) # Check the result
result = self.privateDatasets.after_delete({}, pkg_dict) # Call the function
self.assertEquals(expected_pkg_dict, result) # Check the result
def test_packagecontroller_after_search(self):
search_res = {'test': 'a', 'private': 'a', 'allowed_users': 'a,b,c'}
search_res = {'test': 'a', 'private': 'a', 'allowed_users': ['a', 'b', 'c']}
expected_search_res = search_res.copy()
result = getattr(self.privateDatasets, 'after_search')(search_res, {}) # Call the function
self.assertEquals(expected_search_res, result) # Check the result
@parameterized.expand([
(True, 1, 1, False, True),
(True, 1, 2, False, True),
(True, 1, 1, True, True),
(True, 1, 2, True, True),
(True, 1, None, None, True),
(True, 1, 1, None, True),
(True, 1, None, True, True),
(True, 1, None, False, True),
(False, 1, 1, False, True),
(False, 1, 2, False, False),
(False, 1, 1, True, True),
(False, 1, 2, True, True),
(False, 1, None, None, False),
(False, 1, 1, None, True),
(False, 1, None, True, True),
(False, 1, None, False, False),
])
def test_packagecontroller_after_show(self, update_via_api, creator_id, user_id, sysadmin, fields_expected):
context = {'updating_via_cb': update_via_api, }
if creator_id is not None or sysadmin is not None:
user = MagicMock()
user.id = user_id
user.sysadmin = sysadmin
context['auth_user_obj'] = user
pkg_dict = {'creator_user_id': creator_id, 'allowed_users': ['a', 'b', 'c'], 'searchable': True, 'adquire_url': 'http://google.es'}
# Call the function
result = self.privateDatasets.after_show(context, pkg_dict) # Call the function
# Check the final result
fields = ['allowed_users', 'searchable', 'adquire_url']
for field in fields:
if fields_expected:
self.assertTrue(field in result)
else:
self.assertFalse(field in result)
@parameterized.expand([
('before_search',),
('before_view',),
@ -357,7 +188,7 @@ class PluginTest(unittest.TestCase):
('delete', 'False')
])
def test_before_and_CRUD(self, function, private='True'):
pkg_dict = {'test': 'a', 'private': private, 'allowed_users': 'a,b,c'}
pkg_dict = {'test': 'a', 'private': private, 'allowed_users': ['a', 'b', 'c']}
expected_pkg_dict = pkg_dict.copy()
result = getattr(self.privateDatasets, function)(pkg_dict) # Call the function
self.assertEquals(expected_pkg_dict, result) # Check the result
@ -382,22 +213,8 @@ class PluginTest(unittest.TestCase):
def test_helpers_functions(self):
helpers_functions = self.privateDatasets.get_helpers()
self.assertEquals(helpers_functions['privatedatasets_adquired'], plugin.adquired)
self.assertEquals(helpers_functions['privatedatasets_adquired'], plugin.helpers.is_adquired)
# TODO: Test after create, after update...
@parameterized.expand([
(False, None, 'user', False),
(True, '', 'user', False),
(True, None, 'user', False),
(True, 'user', 'user', True),
(True, 'another_user,user', 'user', True),
(True, 'another_user,user2', 'user', False),
])
def test_adquired(self, include_allowed_users, allowed_users, user, adquired):
# Configure test
plugin.tk.c.user = user
pkg_dict = {}
if include_allowed_users:
pkg_dict['allowed_users'] = allowed_users
# Check the function returns the expected result
self.assertEquals(adquired, plugin.adquired(pkg_dict))