Add complex tests to Controller test. Change add_user function name to add_users
This commit is contained in:
parent
cb4c8ea314
commit
3af8917175
|
@ -19,7 +19,7 @@ class AdquiredDatasetsController(base.BaseController):
|
|||
environ['pylons.status_code_redirect'] = True
|
||||
return base.BaseController.__call__(self, environ, start_response)
|
||||
|
||||
def add_user(self):
|
||||
def add_users(self):
|
||||
|
||||
log.info('Notification Request received')
|
||||
|
||||
|
@ -55,7 +55,7 @@ class AdquiredDatasetsController(base.BaseController):
|
|||
dataset = plugins.toolkit.get_action('package_show')({'ignore_auth': True}, {'id': dataset_id})
|
||||
|
||||
# Generate default set of users if it does not exist
|
||||
if 'allowed_users' not in dataset:
|
||||
if 'allowed_users' not in dataset or dataset['allowed_users'] is None:
|
||||
dataset['allowed_users'] = ''
|
||||
|
||||
# Only new users will be inserted
|
||||
|
|
|
@ -185,7 +185,7 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
|
|||
# DataSet adquired notification
|
||||
m.connect('/dataset_adquired',
|
||||
controller='ckanext.privatedatasets.controller:AdquiredDatasetsController',
|
||||
action='add_user', conditions=dict(method=['POST']))
|
||||
action='add_users', conditions=dict(method=['POST']))
|
||||
|
||||
return m
|
||||
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import unittest
|
||||
import ckanext.privatedatasets.controller as controller
|
||||
import json
|
||||
import unittest
|
||||
|
||||
from mock import MagicMock
|
||||
from nose_parameterized import parameterized
|
||||
|
@ -62,7 +63,7 @@ class ControllerTest(unittest.TestCase):
|
|||
return_value=package if path_exist else None)
|
||||
|
||||
# Call the function
|
||||
result = self.instance.add_user()
|
||||
result = self.instance.add_users()
|
||||
|
||||
# Checks
|
||||
self.assertEquals(expected_error, result)
|
||||
|
@ -71,7 +72,7 @@ class ControllerTest(unittest.TestCase):
|
|||
if expected_error:
|
||||
self.assertEquals(400, controller.response.status_int)
|
||||
|
||||
def configure_mocks(self, parse_result, ds_not_found=None, error_update=None, dataset=None):
|
||||
def configure_mocks(self, parse_result, datasets_not_found=[], not_updatable_datasets=[], allowed_users=None):
|
||||
|
||||
controller.config = {PARSER_CONFIG_PROP: 'valid.path:%s' % CLASS_NAME}
|
||||
|
||||
|
@ -87,10 +88,18 @@ class ControllerTest(unittest.TestCase):
|
|||
controller.plugins.toolkit.ObjectNotFound = self._plugins.toolkit.ObjectNotFound
|
||||
controller.plugins.toolkit.ValidationError = self._plugins.toolkit.ValidationError
|
||||
|
||||
package_show = MagicMock(side_effect=controller.plugins.toolkit.ObjectNotFound if ds_not_found else None,
|
||||
return_value=dataset)
|
||||
package_update = MagicMock(side_effect=controller.plugins.toolkit.ValidationError(
|
||||
{'allowed_users': [ADD_USERS_ERROR]}) if error_update else None)
|
||||
def _package_show(context, data_dict):
|
||||
if data_dict['id'] in datasets_not_found:
|
||||
raise controller.plugins.toolkit.ObjectNotFound()
|
||||
else:
|
||||
return {'id': data_dict['id'], 'allowed_users': allowed_users}
|
||||
|
||||
def _package_update(context, data_dict):
|
||||
if data_dict['id'] in not_updatable_datasets:
|
||||
raise controller.plugins.toolkit.ValidationError({'allowed_users': [ADD_USERS_ERROR]})
|
||||
|
||||
package_show = MagicMock(side_effect=_package_show)
|
||||
package_update = MagicMock(side_effect=_package_update)
|
||||
|
||||
def _get_action(action):
|
||||
if action == 'package_update':
|
||||
|
@ -112,7 +121,7 @@ class ControllerTest(unittest.TestCase):
|
|||
package_search, package_update = self.configure_mocks(parse_result)
|
||||
|
||||
# Call the function
|
||||
result = self.instance.add_user()
|
||||
result = self.instance.add_users()
|
||||
|
||||
# Checks
|
||||
self.assertEquals(0, package_search.call_count)
|
||||
|
@ -121,59 +130,68 @@ class ControllerTest(unittest.TestCase):
|
|||
self.assertEquals(400, controller.response.status_int)
|
||||
|
||||
@parameterized.expand([
|
||||
(False, False, None),
|
||||
(False, False, ''),
|
||||
(False, False, 'another_user'),
|
||||
(False, False, 'another_user,another_one'),
|
||||
(False, False, 'another_user,user_name'),
|
||||
(True, False),
|
||||
(False, True, None),
|
||||
(False, True, ''),
|
||||
(False, True, 'another_user'),
|
||||
(False, True, 'another_user,another_one'),
|
||||
# Simple Test: one user and one dataset
|
||||
({'user_name': ['ds1']}, [], [], None),
|
||||
({'user_name': ['ds1']}, [], [], ''),
|
||||
({'user_name': ['ds1']}, [], [], 'another_user'),
|
||||
({'user_name': ['ds1']}, [], [], 'another_user,another_one'),
|
||||
({'user_name': ['ds1']}, [], [], 'another_user,user_name'),
|
||||
({'user_name': ['ds1']}, ['ds1'], [], None),
|
||||
({'user_name': ['ds1']}, [], ['ds1'], ''),
|
||||
({'user_name': ['ds1']}, [], ['ds1'], 'another_user'),
|
||||
({'user_name': ['ds1']}, [], ['ds1'], 'another_user,another_one'),
|
||||
({'user_name': ['ds1']}, [], ['ds1'], 'another_user,user_name'),
|
||||
|
||||
# Complex test: some users and some datasets
|
||||
({'user1': ['ds1', 'ds2', 'ds3', 'ds4'], 'user2': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], ''),
|
||||
({'user1': ['ds1', 'ds2', 'ds3', 'ds4'], 'user2': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], 'another_user'),
|
||||
({'user1': ['ds1', 'ds2', 'ds3', 'ds4'], 'user2': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], 'another_user,another_one'),
|
||||
({'user1': ['ds1', 'ds2', 'ds3', 'ds4'], 'user2': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], 'another_user,another_one,user1')
|
||||
])
|
||||
def test_without_errors_one_user_one_ds(self, ds_not_found, error_update, allowed_users=None):
|
||||
def test_add_users(self, users_info, datasets_not_found, not_updatable_datasets, allowed_users=''):
|
||||
|
||||
user_name = 'user_name'
|
||||
dataset_id = 'ds1'
|
||||
parse_result = {'users_datasets': [{'user': user_name, 'datasets': [dataset_id]}]}
|
||||
parse_result = {'users_datasets': []}
|
||||
datasets_ids = []
|
||||
|
||||
# Expected allowed users: allowed_users + ',' + user_name
|
||||
# If the user_name is in the list, it should not be included again
|
||||
# If the list is empty, we should not a comma
|
||||
expected_allowed_users = allowed_users if allowed_users else ''
|
||||
if user_name not in expected_allowed_users:
|
||||
if not allowed_users or allowed_users == '':
|
||||
expected_allowed_users += user_name
|
||||
else:
|
||||
expected_allowed_users += ',' + user_name
|
||||
for user in users_info:
|
||||
for dataset_id in users_info[user]:
|
||||
if dataset_id not in datasets_ids:
|
||||
datasets_ids.append(dataset_id)
|
||||
|
||||
dataset = {'id': dataset_id}
|
||||
if allowed_users:
|
||||
dataset['allowed_users'] = allowed_users
|
||||
parse_result['users_datasets'].append({'user': user, 'datasets': users_info[user]})
|
||||
|
||||
package_show, package_update = self.configure_mocks(parse_result, ds_not_found, error_update, dataset)
|
||||
package_show, package_update = self.configure_mocks(parse_result, datasets_not_found, not_updatable_datasets, allowed_users)
|
||||
|
||||
# Call the function
|
||||
result = self.instance.add_user()
|
||||
result = self.instance.add_users()
|
||||
|
||||
# Check the result
|
||||
if not error_update and not ds_not_found:
|
||||
self.assertEquals(None, result)
|
||||
elif error_update:
|
||||
self.assertEquals('{"warns": ["Dataset %s: %s"]}' % (dataset_id, ADD_USERS_ERROR), result)
|
||||
elif ds_not_found:
|
||||
self.assertEquals('{"warns": ["Dataset %s was not found in this instance"]}' % dataset_id, result)
|
||||
# Calculate the list of warns
|
||||
warns = []
|
||||
for user_datasets in parse_result['users_datasets']:
|
||||
for dataset_id in user_datasets['datasets']:
|
||||
if dataset_id in datasets_not_found:
|
||||
warns.append('Dataset %s was not found in this instance' % dataset_id)
|
||||
elif dataset_id in not_updatable_datasets and allowed_users is not None and user_datasets['user'] not in allowed_users:
|
||||
warns.append('Dataset %s: %s' % (dataset_id, ADD_USERS_ERROR))
|
||||
|
||||
# The show function is always called
|
||||
package_show.assert_called_once_with({'ignore_auth': True}, {'id': dataset_id})
|
||||
|
||||
# The update function is called only when the show function does not throw an exception and
|
||||
# when it's needed to add the user (if the user is already in the list we mustn't add it)
|
||||
if not ds_not_found and allowed_users != expected_allowed_users:
|
||||
new_allowed_users = package_update.call_args[0][1]['allowed_users']
|
||||
self.assertEquals(expected_allowed_users, new_allowed_users)
|
||||
if len(warns) > 0:
|
||||
expected_result = json.dumps({'warns': warns})
|
||||
else:
|
||||
self.assertEquals(0, package_update.call_count)
|
||||
expected_result = None
|
||||
|
||||
# Check that the returned result is as expected
|
||||
self.assertEquals(expected_result, result)
|
||||
|
||||
for user_datasets in parse_result['users_datasets']:
|
||||
for dataset_id in user_datasets['datasets']:
|
||||
# The show function is always called
|
||||
package_show.assert_any_call({'ignore_auth': True}, {'id': dataset_id})
|
||||
|
||||
# The update function is called only when the show function does not throw an exception and
|
||||
# when the user is not in the list of allowed users.
|
||||
if dataset_id not in datasets_not_found and allowed_users is not None and user_datasets['user'] not in allowed_users:
|
||||
# Calculate the list of allowed_users
|
||||
expected_allowed_users = allowed_users
|
||||
expected_allowed_users += ',' + user_datasets['user'] if expected_allowed_users != '' else user_datasets['user']
|
||||
|
||||
package_update.assert_any_call({'ignore_auth': True}, {'id': dataset_id, 'allowed_users': expected_allowed_users})
|
||||
|
|
|
@ -204,7 +204,7 @@ class PluginTest(unittest.TestCase):
|
|||
# Test that the connect method has been called
|
||||
m.connect.assert_called_once_with('/dataset_adquired',
|
||||
controller='ckanext.privatedatasets.controller:AdquiredDatasetsController',
|
||||
action='add_user', conditions=dict(method=['POST']))
|
||||
action='add_users', conditions=dict(method=['POST']))
|
||||
|
||||
@parameterized.expand([
|
||||
('create_package_schema'),
|
||||
|
|
Loading…
Reference in New Issue