diff --git a/ckanext/privatedatasets/converters_validators.py b/ckanext/privatedatasets/converters_validators.py index 708dd76..4f7a1d5 100644 --- a/ckanext/privatedatasets/converters_validators.py +++ b/ckanext/privatedatasets/converters_validators.py @@ -12,7 +12,7 @@ def private_datasets_metadata_checker(key, data, errors, context): private_val = data.get(('private',)) # If the private field is not included in the data dict, we must check the current value - if not private_val and dataset_id: + if private_val is None and dataset_id: dataset_dict = toolkit.get_action('package_show')({'ignore_auth': True}, {'id': dataset_id}) private_val = dataset_dict.get('private') @@ -33,7 +33,7 @@ def allowed_users_convert(key, data, errors, context): current_index = max([int(k[1]) for k in data.keys() if len(k) == 2 and k[0] == constants.ALLOWED_USERS] + [-1]) for num, allowed_user in zip(count(current_index + 1), allowed_users): - data[(constants.ALLOWED_USERS, num)] = allowed_user + data[(constants.ALLOWED_USERS, num)] = allowed_user.strip() def get_allowed_users(key, data, errors, context): diff --git a/ckanext/privatedatasets/helpers.py b/ckanext/privatedatasets/helpers.py index 2f1b851..162ccb8 100644 --- a/ckanext/privatedatasets/helpers.py +++ b/ckanext/privatedatasets/helpers.py @@ -8,7 +8,10 @@ def is_adquired(pkg_dict): if db.package_allowed_users_table is None: db.init_db(model) - return len(db.AllowedUser.get(package_id=pkg_dict['id'], user_name=tk.c.user)) > 0 + if tk.c.user: + return len(db.AllowedUser.get(package_id=pkg_dict['id'], user_name=tk.c.user)) > 0 + else: + return False def is_owner(pkg_dict): @@ -21,3 +24,5 @@ def is_owner(pkg_dict): def get_allowed_users_str(users): if users: return ','.join([user for user in users]) + else: + return '' diff --git a/ckanext/privatedatasets/plugin.py b/ckanext/privatedatasets/plugin.py index 43d1ae5..2ed5d5b 100644 --- a/ckanext/privatedatasets/plugin.py +++ b/ckanext/privatedatasets/plugin.py @@ -210,7 +210,7 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm): return pkg_dict ###################################################################### - ########################## ITEMPLATESHELER ########################### + ######################### ITEMPLATESHELPER ########################### ###################################################################### def get_helpers(self): diff --git a/ckanext/privatedatasets/tests/test_auth.py b/ckanext/privatedatasets/tests/test_auth.py new file mode 100644 index 0000000..0b0bd62 --- /dev/null +++ b/ckanext/privatedatasets/tests/test_auth.py @@ -0,0 +1,210 @@ +import unittest +import ckanext.privatedatasets.auth as auth + +from mock import MagicMock +from nose_parameterized import parameterized + + +class AuthTest(unittest.TestCase): + + def setUp(self): + # Create mocks + self._logic_auth = auth.logic_auth + auth.logic_auth = MagicMock() + + self._request = auth.request + auth.request = MagicMock() + + self._helpers = auth.helpers + auth.helpers = MagicMock() + + self._new_authz = auth.new_authz + auth.new_authz = MagicMock() + + self._tk = auth.tk + auth.tk = MagicMock() + + self._db = auth.db + auth.db = MagicMock() + + def tearDown(self): + auth.logic_auth = self._logic_auth + auth.request = self._request + auth.helpers = self._helpers + auth.new_authz = self._new_authz + auth.tk = self._tk + auth.db = self._db + + if hasattr(self, '_package_show'): + auth.package_show = self._package_show + + def test_decordators(self): + self.assertEquals(True, getattr(auth.package_show, 'auth_allow_anonymous_access', False)) + self.assertEquals(True, getattr(auth.resource_show, 'auth_allow_anonymous_access', False)) + + @parameterized.expand([ + # Anonymous user (public) + (None, None, None, False, 'active', None, None, None, None, None, True), + # Anonymous user (private) + (None, None, None, True, 'active', None, None, None, None, '/', False), + (None, None, '', True, 'active', None, None, '', None, '/', False), + # Anonymous user (private). Buy URL not shown + (None, None, None, True, 'active', None, None, None, 'google.es', '/', False), + # Anonymous user (private). Buy URL show + (None, None, None, True, 'active', None, None, None, 'google.es', '/dataset/testds', False), + # The creator can always see the dataset + (1, 1, None, False, 'active', None, None, None, None, None, True), + (1, 1, None, True, 'active', None, None, None, None, None, True), + (1, 1, None, False, 'draft', None, None, None, None, None, True), + # Other user (no organizations) + (1, 2, 'test', False, 'active', None, None, None, None, None, True), + (1, 2, 'test', True, 'active', None, None, None, 'google.es', '/', False), # Buy MSG not shown + (1, 2, 'test', True, 'active', None, None, None, None, '/dataset/testds', False), # Buy MSG not shown + (1, 2, 'test', True, 'active', None, None, None, 'google.es', '/dataset/testds', False), # Buy MSG shown + (1, 2, 'test', False, 'draft', None, None, None, None, None, False), + # Other user but authorized in the list of authorized users + (1, 2, 'test', True, 'active', None, None, True, None, None, True), + # Other user and not authorized in the list of authorized users + (1, 2, 'test', True, 'active', None, None, False, 'google.es', '/', False), + (1, 2, 'test', True, 'active', None, None, False, 'google.es', '/dataset/testds', False), + # Other user with organizations + (1, 2, 'test', False, 'active', 'conwet', False, None, None, None, True), + (1, 2, 'test', True, 'active', 'conwet', False, None, None, None, False), + (1, 2, 'test', True, 'active', 'conwet', True, None, None, None, True), + (1, 2, 'test', True, 'draft', 'conwet', True, None, None, None, False), + # Other user with organizations (user is not in the organization) + (1, 2, 'test', True, 'active', 'conwet', False, True, None, None, True), + (1, 2, 'test', True, 'active', 'conwet', False, False, None, None, False), + (1, 2, 'test', True, 'active', 'conwet', False, False, 'google.es', '/dataset/testds', False), + (1, 2, 'test', True, 'active', 'conwet', False, False, 'google.es', '/', False) + ]) + def test_auth_package_show(self, creator_user_id, user_obj_id, user, private, state, owner_org, + owner_member, db_auth, adquire_url, request_path, authorized): + + # Configure the mocks + returned_package = MagicMock() + returned_package.creator_user_id = creator_user_id + returned_package.private = private + returned_package.state = state + returned_package.owner_org = owner_org + returned_package.extras = {} + + # Configure the database + db_response = [] + if db_auth is True: + out = auth.db.AllowedUser() + out.package_id = 'package_id' + out.user_name = user + db_response.append(out) + + auth.db.AllowedUser.get = MagicMock(return_value=db_response) + + if adquire_url: + returned_package.extras['adquire_url'] = adquire_url + + auth.logic_auth.get_package_object = MagicMock(return_value=returned_package) + auth.new_authz.has_user_permission_for_group_or_org = MagicMock(return_value=owner_member) + auth.request.path = MagicMock(return_value=request_path) + + # Prepare the context + context = {'model': MagicMock()} + if user is not None: + context['user'] = user + if user_obj_id is not None: + context['auth_user_obj'] = MagicMock() + context['auth_user_obj'].id = user_obj_id + + # Function to be tested + result = auth.package_show(context, {}) + + # Check the result + self.assertEquals(authorized, result['success']) + + # Check that the mocks has been called properly + if private and owner_org and state == 'active': + auth.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'read') + + # Conditions to buy a dataset; It should be private, active and should not belong to any organization + if not authorized and state == 'active' and not owner_org and request_path.startswith('/dataset/'): + auth.helpers.flash_error.assert_called_once() + + @parameterized.expand([ + (None, None, None, None, None, False), # Anonymous user + (1, 1, None, None, None, True), # A user can edit its dataset + (1, 2, None, None, None, False), # A user cannot edit a dataset belonging to another user + (1, 2, 'test', 'conwet', False, False), # User without rights to update a dataset + (1, 2, 'test', 'conwet', True, True), # User with rights to update a dataset + ]) + def test_auth_package_update(self, creator_user_id, user_obj_id, user, owner_org, owner_member, authorized): + + # Configure the mocks + returned_package = MagicMock() + returned_package.creator_user_id = creator_user_id + returned_package.owner_org = owner_org + + auth.logic_auth.get_package_object = MagicMock(return_value=returned_package) + auth.new_authz.has_user_permission_for_group_or_org = MagicMock(return_value=owner_member) + + # Prepare the context + context = {} + if user is not None: + context['user'] = user + if user_obj_id is not None: + context['auth_user_obj'] = MagicMock() + context['auth_user_obj'].id = user_obj_id + + # Function to be tested + result = auth.package_update(context, {}) + + # Check the result + self.assertEquals(authorized, result['success']) + + # Check that the mock has been called properly + if creator_user_id != user_obj_id and owner_org: + auth.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'update_dataset') + + @parameterized.expand([ + (True, True), + (True, False), + (False, False), + (False, False) + ]) + def test_auth_resource_show(self, exist_pkg=True, authorized_pkg=True): + #Recover the exception + auth.tk.ObjectNotFound = self._tk.ObjectNotFound + + # Mock the calls + package = MagicMock() + package.id = '1' + + final_query = MagicMock() + final_query.first = MagicMock(return_value=package if exist_pkg else None) + + second_join = MagicMock() + second_join.filter = MagicMock(return_value=final_query) + + first_join = MagicMock() + first_join.join = MagicMock(return_value=second_join) + + query = MagicMock() + query.join = MagicMock(return_value=first_join) + + model = MagicMock() + session = MagicMock() + session.query = MagicMock(return_value=query) + model.Session = session + + # Create the context + context = {} + context['model'] = model + + # Mock the package_show function + self._package_show = auth.package_show + success = True if authorized_pkg else False + auth.package_show = MagicMock(return_value={'success': success}) + + if not exist_pkg: + self.assertRaises(self._tk.ObjectNotFound, auth.resource_show, context, {}) + else: + result = auth.resource_show(context, {}) + self.assertEquals(authorized_pkg, result['success']) diff --git a/ckanext/privatedatasets/tests/test_converters_validators.py b/ckanext/privatedatasets/tests/test_converters_validators.py new file mode 100644 index 0000000..0381e9a --- /dev/null +++ b/ckanext/privatedatasets/tests/test_converters_validators.py @@ -0,0 +1,147 @@ +import unittest +import ckanext.privatedatasets.converters_validators as conv_val + +from mock import MagicMock +from nose_parameterized import parameterized + + +class ConvertersValidatorsTest(unittest.TestCase): + + def setUp(self): + # Create mocks + self._toolkit = conv_val.toolkit + conv_val.toolkit = MagicMock() + + self._db = conv_val.db + conv_val.db = MagicMock() + + def tearDown(self): + conv_val.db = self._db + conv_val.toolkit = self._toolkit + + @parameterized.expand([ + # When no data is present, no errors should be returned + (True, True, 'conwet', '', False), + ('True', True, 'conwet', '', False), + (False, True, 'conwet', '', False), + ('False', True, 'conwet', '', False), + (None, True, 'conwet', '', False), + (None, False, 'conwet', '', False), + (True, True, None, '', False), + ('True', True, None, '', False), + (False, True, None, '', False), + ('False', True, None, '', False), + (None, True, None, '', False), + (None, False, None, '', False), + # When data is present, the field is only valid when the + # the private field is set to true (organization should + # not be taken into account anymore) + (True, True, 'conwet', 'test', False), + ('True', True, 'conwet', 'test', False), + (False, True, 'conwet', 'test', True), + ('False', True, 'conwet', 'test', True), + (None, True, 'conwet', 'test', False), + (None, False, 'conwet', 'test', True), + (True, True, None, 'test', False), + ('True', True, None, 'test', False), + (False, True, None, 'test', True), + ('False', True, None, 'test', True), + (None, True, None, 'test', False), + (None, False, None, 'test', True), + ]) + def test_metadata_checker(self, received_private, package_private, owner_org, metada_val, error_set): + + # Configure the mocks + package_show = MagicMock(return_value={'private': package_private, 'id': 'package_id'}) + conv_val.toolkit.get_action = MagicMock(return_value=package_show) + + KEY = ('test',) + errors = {} + errors[KEY] = [] + + data = {} + data[('id',)] = 'package_id' + data[('owner_org',)] = owner_org + if received_private is not None: + data[('private',)] = received_private + data[KEY] = metada_val + + conv_val.private_datasets_metadata_checker(KEY, data, errors, {}) + + if error_set: + self.assertEquals(1, len(errors[KEY])) + else: + self.assertEquals(0, len(errors[KEY])) + + @parameterized.expand([ + ('', 0, []), + ('', 2, []), + ('a', 0, ['a']), + ('a', 2, ['a']), + ('a,z, d', 0, ['a', 'z', 'd']), + ('a,z, d', 2, ['a', 'z', 'd']) + ]) + def test_allowed_user_convert(self, users_str, previous_users, expected_users): + key_str = 'allowed_users_str' + key = 'allowed_users' + + data = {(key_str,): users_str} + for i in range(0, previous_users): + data[(key, i)] = i + + # Call the function + conv_val.allowed_users_convert((key_str,), data, {}, {}) + + # Check that the users are set properly + for i in range(previous_users, previous_users + len(expected_users)): + self.assertEquals(expected_users[i - previous_users], data[(key, i)]) + + @parameterized.expand([ + ([], True), + (['a'], True), + (['a', 'b'], True), + (['a', 'b', 'c'], True), + (['a', 'b', 'c', 'd', 'e'], True), + ([], False), + (['a'], False), + (['a', 'b'], False), + (['a', 'b', 'c'], False), + (['a', 'b', 'c', 'd', 'e'], False) + ]) + def test_get_allowed_users(self, users, table_initialized): + key = 'allowed_users' + data = {('id',): 'package_id'} + + # Table init + conv_val.db.package_allowed_users_table = None if not table_initialized else MagicMock() + + # Each time 'AllowedUser' is called, we must get a new instance + # and this is the way to get this behaviour + def constructor(): + return MagicMock() + + conv_val.db.AllowedUser = MagicMock(side_effect=constructor) + + # Create the users + db_res = [] + for user in users: + db_row = conv_val.db.AllowedUser() + db_row.package_id = 'package_id' + db_row.user_name = user + db_res.append(db_row) + + conv_val.db.AllowedUser.get = MagicMock(return_value=db_res) + + # Call the function + context = {'model': MagicMock()} + conv_val.get_allowed_users((key,), data, {}, context) + + # Check that the users are set properly + for i, user in enumerate(users): + self.assertEquals(user, data[(key, i)]) + + # Check that the table has been initialized properly + if not table_initialized: + conv_val.db.init_db.assert_called_once_with(context['model']) + else: + self.assertEquals(0, conv_val.db.init_db.call_count) diff --git a/ckanext/privatedatasets/tests/test_helpers.py b/ckanext/privatedatasets/tests/test_helpers.py new file mode 100644 index 0000000..6a7981f --- /dev/null +++ b/ckanext/privatedatasets/tests/test_helpers.py @@ -0,0 +1,82 @@ +import unittest +import ckanext.privatedatasets.helpers as helpers + +from mock import MagicMock +from nose_parameterized import parameterized + + +class HelpersTest(unittest.TestCase): + + def setUp(self): + # Create mocks + self._model = helpers.model + helpers.model = MagicMock() + + self._tk = helpers.tk + helpers.tk = MagicMock() + + self._db = helpers.db + helpers.db = MagicMock() + + def tearDown(self): + helpers.model = self._model + helpers.tk = self._tk + helpers.db = self._db + + @parameterized.expand([ + (False, False, 'user', False), + (True, False, 'user', True), + (False, True, 'user', False), + (True, True, 'user', True), + (False, False, None, False), + (True, False, None, False), + (False, True, None, False), + (True, True, None, False), + ]) + def test_is_adquired(self, db_auth, table_initialized, user, adquired): + # Configure test + helpers.tk.c.user = user + pkg_dict = {'id': 'package_id'} + helpers.db.package_allowed_users_table = None if not table_initialized else MagicMock() + + db_response = [] + if db_auth is True: + out = helpers.db.AllowedUser() + out.package_id = 'package_id' + out.user_name = user + db_response.append(out) + + helpers.db.AllowedUser.get = MagicMock(return_value=db_response) + + # Check the function returns the expected result + self.assertEquals(adquired, helpers.is_adquired(pkg_dict)) + + if not table_initialized: + helpers.db.init_db.assert_called_once_with(helpers.model) + else: + self.assertEquals(0, helpers.db.init_db.call_count) + + @parameterized.expand([ + (1, 1, True), + (1, 2, False), + (1, None, False) + ]) + def test_is_owner(self, creator_user_id, user_id, owner): + # Configure test + user = MagicMock() + user.id = user_id + helpers.tk.c.userobj = user + + pkg_dict = {'creator_user_id': creator_user_id} + + # Check that the functions return the expected result + self.assertEquals(owner, helpers.is_owner(pkg_dict)) + + @parameterized.expand([ + ([], ''), + (['a'], 'a'), + (['a', 'b'], 'a,b'), + (['a', 'b', 'c', 'd'], 'a,b,c,d'), + ]) + def test_get_allowed_users_str(self, allowed_users, expected_result): + self.assertEquals(expected_result, helpers.get_allowed_users_str(allowed_users)) diff --git a/ckanext/privatedatasets/tests/test_plugin.py b/ckanext/privatedatasets/tests/test_plugin.py index 5249609..7afd727 100644 --- a/ckanext/privatedatasets/tests/test_plugin.py +++ b/ckanext/privatedatasets/tests/test_plugin.py @@ -12,212 +12,30 @@ class PluginTest(unittest.TestCase): self.privateDatasets = plugin.PrivateDatasets() # Create mocks - self._logic_auth = plugin.logic_auth - plugin.logic_auth = MagicMock() - - self._request = plugin.request - plugin.request = MagicMock() - - self._helpers = plugin.helpers - plugin.helpers = MagicMock() - - self._new_authz = plugin.new_authz - plugin.new_authz = MagicMock() - self._tk = plugin.tk plugin.tk = MagicMock() def tearDown(self): - plugin.logic_auth = self._logic_auth - plugin.request = self._request - plugin.helpers = self._helpers - plugin.new_authz = self._new_authz plugin.tk = self._tk - if hasattr(self, '_package_show'): - plugin.package_show = self._package_show - @parameterized.expand([ (plugin.p.IDatasetForm,), (plugin.p.IAuthFunctions,), (plugin.p.IConfigurer,), (plugin.p.IRoutes,), + (plugin.p.IActions,), (plugin.p.IPackageController,), (plugin.p.ITemplateHelpers,) ]) def test_implementations(self, interface): self.assertTrue(interface.implemented_by(plugin.PrivateDatasets)) - def test_decordators(self): - self.assertEquals(True, getattr(plugin.package_show, 'auth_allow_anonymous_access', False)) - self.assertEquals(True, getattr(plugin.resource_show, 'auth_allow_anonymous_access', False)) - - @parameterized.expand([ - # Anonymous user (public) - (None, None, None, False, 'active', None, None, None, None, None, True), - # Anonymous user (private) - (None, None, None, True, 'active', None, None, None, None, '/', False), - (None, None, '', True, 'active', None, None, '', None, '/', False), - # Anonymous user (private). Buy URL not shown - (None, None, None, True, 'active', None, None, None, 'google.es', '/', False), - # Anonymous user (private). Buy URL shown - (None, None, None, True, 'active', None, None, None, 'google.es', '/dataset/testds', False), - # The creator can always see the dataset - (1, 1, None, False, 'active', None, None, None, None, None, True), - (1, 1, None, True, 'active', None, None, None, None, None, True), - (1, 1, None, False, 'draft', None, None, None, None, None, True), - # Other user (no organizations) - (1, 2, 'test', False, 'active', None, None, None, None, None, True), - (1, 2, 'test', True, 'active', None, None, None, 'google.es', '/', False), # Buy MSG not shown - (1, 2, 'test', True, 'active', None, None, None, None, '/dataset/testds', False), # Buy MSG not shown - (1, 2, 'test', True, 'active', None, None, None, 'google.es', '/dataset/testds', False), # Buy MSG shown - (1, 2, 'test', False, 'draft', None, None, None, None, None, False), - # Other user but authorized in the list of authorized users - (1, 2, 'test', True, 'active', None, None, 'some,another,test,other', None, None, True), - (1, 2, 'test', True, 'active', None, None, 'test', None, None, True), - # Other user and not authorized in the list of authorized users - (1, 2, 'test', True, 'active', None, None, 'some,another,other', 'google.es', '/', False), - (1, 2, 'test', True, 'active', None, None, 'some,another,other', 'google.es', '/dataset/testds', False), - # Other user with organizations - (1, 2, 'test', False, 'active', 'conwet', False, None, None, None, True), - (1, 2, 'test', True, 'active', 'conwet', False, None, None, None, False), - (1, 2, 'test', True, 'active', 'conwet', True, None, None, None, True), - (1, 2, 'test', True, 'draft', 'conwet', True, None, None, None, False), - # Other user with organizations (user is not in the organization) - (1, 2, 'test', True, 'active', 'conwet', False, 'test', None, None, True), - (1, 2, 'test', True, 'active', 'conwet', False, 'some,another,other', None, None, False), - (1, 2, 'test', True, 'active', 'conwet', False, 'some,another,other', 'google.es', '/dataset/testds', False), - (1, 2, 'test', True, 'active', 'conwet', False, 'some,another,other', 'google.es', '/', False) - ]) - def test_auth_package_show(self, creator_user_id, user_obj_id, user, private, state, owner_org, - owner_member, allowed_users, adquire_url, request_path, authorized): - - # Configure the mocks - returned_package = MagicMock() - returned_package.creator_user_id = creator_user_id - returned_package.private = private - returned_package.state = state - returned_package.owner_org = owner_org - returned_package.extras = {} - - if allowed_users is not None: - returned_package.extras['allowed_users'] = allowed_users - - if adquire_url: - returned_package.extras['adquire_url'] = adquire_url - - plugin.logic_auth.get_package_object = MagicMock(return_value=returned_package) - plugin.new_authz.has_user_permission_for_group_or_org = MagicMock(return_value=owner_member) - plugin.request.path = MagicMock(return_value=request_path) - - # Prepare the context - context = {} - if user is not None: - context['user'] = user - if user_obj_id is not None: - context['auth_user_obj'] = MagicMock() - context['auth_user_obj'].id = user_obj_id - - # Function to be tested - result = plugin.package_show(context, {}) - - # Check the result - self.assertEquals(authorized, result['success']) - - # Check that the mocks has been called properly - if private and owner_org and state == 'active': - plugin.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'read') - - # Conditions to buy a dataset; It should be private, active and should not belong to any organization - if not authorized and state == 'active' and not owner_org and request_path.startswith('/dataset/'): - plugin.helpers.flash_error.assert_called_once() - - @parameterized.expand([ - (None, None, None, None, None, False), # Anonymous user - (1, 1, None, None, None, True), # A user can edit its dataset - (1, 2, None, None, None, False), # A user cannot edit a dataset belonging to another user - (1, 2, 'test', 'conwet', False, False), # User without rights to update a dataset - (1, 2, 'test', 'conwet', True, True), # User with rights to update a dataset - ]) - def test_auth_package_update(self, creator_user_id, user_obj_id, user, owner_org, owner_member, authorized): - - # Configure the mocks - returned_package = MagicMock() - returned_package.creator_user_id = creator_user_id - returned_package.owner_org = owner_org - - plugin.logic_auth.get_package_object = MagicMock(return_value=returned_package) - plugin.new_authz.has_user_permission_for_group_or_org = MagicMock(return_value=owner_member) - - # Prepare the context - context = {} - if user is not None: - context['user'] = user - if user_obj_id is not None: - context['auth_user_obj'] = MagicMock() - context['auth_user_obj'].id = user_obj_id - - # Function to be tested - result = plugin.package_update(context, {}) - - # Check the result - self.assertEquals(authorized, result['success']) - - # Check that the mock has been called properly - if creator_user_id != user_obj_id and owner_org: - plugin.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'update_dataset') - - @parameterized.expand([ - (True, True), - (True, False), - (False, False), - (False, False) - ]) - def test_auth_resource_show(self, exist_pkg=True, authorized_pkg=True): - #Recover the exception - plugin.tk.ObjectNotFound = self._tk.ObjectNotFound - - # Mock the calls - package = MagicMock() - package.id = '1' - - final_query = MagicMock() - final_query.first = MagicMock(return_value=package if exist_pkg else None) - - second_join = MagicMock() - second_join.filter = MagicMock(return_value=final_query) - - first_join = MagicMock() - first_join.join = MagicMock(return_value=second_join) - - query = MagicMock() - query.join = MagicMock(return_value=first_join) - - model = MagicMock() - session = MagicMock() - session.query = MagicMock(return_value=query) - model.Session = session - - # Create the context - context = {} - context['model'] = model - - # Mock the package_show function - self._package_show = plugin.package_show - success = True if authorized_pkg else False - plugin.package_show = MagicMock(return_value={'success': success}) - - if not exist_pkg: - self.assertRaises(self._tk.ObjectNotFound, plugin.resource_show, context, {}) - else: - result = plugin.resource_show(context, {}) - self.assertEquals(authorized_pkg, result['success']) - def test_auth_functions(self): auth_functions = self.privateDatasets.get_auth_functions() - self.assertEquals(auth_functions['package_show'], plugin.package_show) - self.assertEquals(auth_functions['package_update'], plugin.package_update) - self.assertEquals(auth_functions['resource_show'], plugin.resource_show) + self.assertEquals(auth_functions['package_show'], plugin.auth.package_show) + self.assertEquals(auth_functions['package_update'], plugin.auth.package_update) + self.assertEquals(auth_functions['resource_show'], plugin.auth.resource_show) + self.assertEquals(auth_functions['package_adquired'], plugin.auth.package_adquired) def test_update_config(self): # Call the method @@ -234,85 +52,13 @@ class PluginTest(unittest.TestCase): self.privateDatasets.after_map(m) # Test that the connect method has been called - m.connect.assert_any_call('/dataset_adquired', - controller='ckanext.privatedatasets.controllers.api_controller:AdquiredDatasetsControllerAPI', - action='add_users', conditions=dict(method=['POST'])) m.connect.assert_any_call('user_adquired_datasets', '/dashboad/adquired', ckan_icon='shopping-cart', controller='ckanext.privatedatasets.controllers.ui_controller:AdquiredDatasetsControllerUI', action='user_adquired_datasets', conditions=dict(method=['GET'])) - @parameterized.expand([ - ('create_package_schema'), - ('update_package_schema'), - ]) - def test_schema_create_update(self, function_name): - - function = getattr(self.privateDatasets, function_name) - returned_schema = function() - - self.assertTrue(plugin.tk.get_validator('ignore_missing') in returned_schema['private']) - self.assertTrue(plugin.tk.get_validator('boolean_validator') in returned_schema['private']) - self.assertEquals(2, len(returned_schema['private'])) - - fields = ['allowed_users', 'adquire_url'] - - for field in fields: - self.assertTrue(plugin.tk.get_validator('ignore_missing') in returned_schema[field]) - self.assertTrue(plugin.tk.get_converter('convert_to_extras') in returned_schema[field]) - self.assertTrue(plugin.private_datasets_metadata_checker in returned_schema[field]) - self.assertEquals(3, len(returned_schema[field])) - - def test_schema_show(self): - - returned_schema = self.privateDatasets.show_package_schema() - - fields = ['allowed_users', 'adquire_url'] - - for field in fields: - self.assertTrue(plugin.tk.get_validator('ignore_missing') in returned_schema[field]) - self.assertTrue(plugin.tk.get_converter('convert_from_extras') in returned_schema[field]) - self.assertEquals(2, len(returned_schema[field])) - - @parameterized.expand([ - # When no data is present, no errors should be returned - (True, 'conwet', '', False), - ('True', 'conwet', '', False), - (False, 'conwet', '', False), - ('False', 'conwet', '', False), - (True, None, '', False), - ('True', None, '', False), - (False, None, '', False), - ('False', None, '', False), - # When data is present, the field is only valid when the - # organization is not set and the private field is set to true - (True, 'conwet', 'test', False), - ('True', 'conwet', 'test', False), - (False, 'conwet', 'test', True), - ('False', 'conwet', 'test', True), - (True, None, 'test', False), - ('True', None, 'test', False), - (False, None, 'test', True), - ('False', None, 'test', True), - ]) - def test_metadata_checker(self, private, owner_org, metada_val, error_set): - - # TODO: Maybe this test should be refactored since the function should be refactored - - KEY = ('test',) - errors = {} - errors[KEY] = [] - - data = {} - data[('private',)] = private - data[('owner_org',)] = owner_org - data[KEY] = metada_val - - plugin.private_datasets_metadata_checker(KEY, data, errors, {}) - - if error_set: - self.assertEquals(1, len(errors[KEY])) - else: - self.assertEquals(0, len(errors[KEY])) + def test_actions_functions(self): + actions = self.privateDatasets.get_actions() + self.assertEquals(actions['package_adquired'], plugin.actions.package_adquired) def test_fallback(self): self.assertEquals(True, self.privateDatasets.is_fallback()) @@ -320,28 +66,113 @@ class PluginTest(unittest.TestCase): def test_package_types(self): self.assertEquals([], self.privateDatasets.package_types()) + ###################################################################### + ############################## SCHEMAS ############################### + ###################################################################### + + def _check_fields(self, schema, fields): + for field in fields: + for checker_validator in fields[field]: + self.assertTrue(checker_validator in schema[field]) + self.assertEquals(len(fields[field]), len(schema[field])) + + @parameterized.expand([ + ('create_package_schema'), + ('update_package_schema'), + ]) + def test_schema_create_update(self, function_name): + + function = getattr(self.privateDatasets, function_name) + returned_schema = function() + + fields = { + 'private': [plugin.tk.get_validator('ignore_missing'), plugin.tk.get_validator('boolean_validator')], + 'adquire_url': [plugin.tk.get_validator('ignore_missing'), plugin.tk.get_converter('convert_to_extras'), + plugin.conv_val.private_datasets_metadata_checker], + 'searchable': [plugin.tk.get_validator('ignore_missing'), plugin.tk.get_validator('boolean_validator'), + plugin.tk.get_converter('convert_to_extras'), plugin.conv_val.private_datasets_metadata_checker], + 'allowed_users_str': [plugin.tk.get_validator('ignore_missing'), plugin.conv_val.allowed_users_convert, + plugin.conv_val.private_datasets_metadata_checker], + 'allowed_users': [plugin.tk.get_validator('ignore_missing'), plugin.conv_val.private_datasets_metadata_checker] + } + + self._check_fields(returned_schema, fields) + + def test_schema_show(self): + + returned_schema = self.privateDatasets.show_package_schema() + + fields = ['searchable', 'adquire_url'] + + fields = { + 'adquire_url': [plugin.tk.get_validator('ignore_missing'), plugin.tk.get_converter('convert_from_extras')], + 'searchable': [plugin.tk.get_validator('ignore_missing'), plugin.tk.get_converter('convert_from_extras')], + 'allowed_users': [plugin.tk.get_validator('ignore_missing'), plugin.conv_val.get_allowed_users] + } + + self._check_fields(returned_schema, fields) + + ###################################################################### + ############################## PACKAGE ############################### + ###################################################################### + @parameterized.expand([ - ('after_create',), - ('after_update',), - ('after_show',), ('after_delete',), - ('after_create', 'False'), - ('after_update', 'False'), - ('after_show', 'False'), ('after_delete', 'False') ]) - def test_packagecontroller_after(self, function, private='True'): - pkg_dict = {'test': 'a', 'private': private, 'allowed_users': 'a,b,c'} + def test_packagecontroller_after_delete(self, function, private='True'): + pkg_dict = {'test': 'a', 'private': private, 'allowed_users': ['a', 'b', 'c']} expected_pkg_dict = pkg_dict.copy() - result = getattr(self.privateDatasets, function)({}, pkg_dict) # Call the function - self.assertEquals(expected_pkg_dict, result) # Check the result + result = self.privateDatasets.after_delete({}, pkg_dict) # Call the function + self.assertEquals(expected_pkg_dict, result) # Check the result def test_packagecontroller_after_search(self): - search_res = {'test': 'a', 'private': 'a', 'allowed_users': 'a,b,c'} + search_res = {'test': 'a', 'private': 'a', 'allowed_users': ['a', 'b', 'c']} expected_search_res = search_res.copy() result = getattr(self.privateDatasets, 'after_search')(search_res, {}) # Call the function self.assertEquals(expected_search_res, result) # Check the result + @parameterized.expand([ + (True, 1, 1, False, True), + (True, 1, 2, False, True), + (True, 1, 1, True, True), + (True, 1, 2, True, True), + (True, 1, None, None, True), + (True, 1, 1, None, True), + (True, 1, None, True, True), + (True, 1, None, False, True), + (False, 1, 1, False, True), + (False, 1, 2, False, False), + (False, 1, 1, True, True), + (False, 1, 2, True, True), + (False, 1, None, None, False), + (False, 1, 1, None, True), + (False, 1, None, True, True), + (False, 1, None, False, False), + ]) + def test_packagecontroller_after_show(self, update_via_api, creator_id, user_id, sysadmin, fields_expected): + + context = {'updating_via_cb': update_via_api, } + + if creator_id is not None or sysadmin is not None: + user = MagicMock() + user.id = user_id + user.sysadmin = sysadmin + context['auth_user_obj'] = user + + pkg_dict = {'creator_user_id': creator_id, 'allowed_users': ['a', 'b', 'c'], 'searchable': True, 'adquire_url': 'http://google.es'} + + # Call the function + result = self.privateDatasets.after_show(context, pkg_dict) # Call the function + + # Check the final result + fields = ['allowed_users', 'searchable', 'adquire_url'] + for field in fields: + if fields_expected: + self.assertTrue(field in result) + else: + self.assertFalse(field in result) + @parameterized.expand([ ('before_search',), ('before_view',), @@ -357,7 +188,7 @@ class PluginTest(unittest.TestCase): ('delete', 'False') ]) def test_before_and_CRUD(self, function, private='True'): - pkg_dict = {'test': 'a', 'private': private, 'allowed_users': 'a,b,c'} + pkg_dict = {'test': 'a', 'private': private, 'allowed_users': ['a', 'b', 'c']} expected_pkg_dict = pkg_dict.copy() result = getattr(self.privateDatasets, function)(pkg_dict) # Call the function self.assertEquals(expected_pkg_dict, result) # Check the result @@ -382,22 +213,8 @@ class PluginTest(unittest.TestCase): def test_helpers_functions(self): helpers_functions = self.privateDatasets.get_helpers() - self.assertEquals(helpers_functions['privatedatasets_adquired'], plugin.adquired) + self.assertEquals(helpers_functions['privatedatasets_adquired'], plugin.helpers.is_adquired) + + # TODO: Test after create, after update... - @parameterized.expand([ - (False, None, 'user', False), - (True, '', 'user', False), - (True, None, 'user', False), - (True, 'user', 'user', True), - (True, 'another_user,user', 'user', True), - (True, 'another_user,user2', 'user', False), - ]) - def test_adquired(self, include_allowed_users, allowed_users, user, adquired): - # Configure test - plugin.tk.c.user = user - pkg_dict = {} - if include_allowed_users: - pkg_dict['allowed_users'] = allowed_users - # Check the function returns the expected result - self.assertEquals(adquired, plugin.adquired(pkg_dict))