Modify resource_show function to use the package_show function defined within the plugin

This commit is contained in:
Aitor Magán 2014-07-01 13:25:27 +02:00
parent 95cac2b653
commit bd7630a372
2 changed files with 80 additions and 2 deletions

View File

@ -11,7 +11,6 @@ from ckan.common import _, request
########################### AUTH FUNCTIONS ###########################
######################################################################
@tk.auth_allow_anonymous_access
def package_show(context, data_dict):
user = context.get('user')
user_obj = context.get('auth_user_obj')
@ -84,6 +83,34 @@ def package_update(context, data_dict):
return {'success': True}
@tk.auth_allow_anonymous_access
def resource_show(context, data_dict):
# This function is needed since CKAN resource_show function uses the default package_show
# function instead the one defined in the plugin.
# A bug is openend in order to be able to remove this function
# https://github.com/ckan/ckan/issues/1818
model = context['model']
user = context.get('user')
resource = logic_auth.get_resource_object(context, data_dict)
# check authentication against package
query = model.Session.query(model.Package)\
.join(model.ResourceGroup)\
.join(model.Resource)\
.filter(model.ResourceGroup.id == resource.resource_group_id)
pkg = query.first()
if not pkg:
raise tk.ObjectNotFound(_('No package found for this resource, cannot check auth.'))
pkg_dict = {'id': pkg.id}
authorized = package_show(context, pkg_dict).get('success')
if not authorized:
return {'success': False, 'msg': _('User %s not authorized to read resource %s') % (user, resource.id)}
else:
return {'success': True}
######################################################################
############################### CHECKER ##############################
######################################################################
@ -165,7 +192,8 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
def get_auth_functions(self):
return {'package_show': package_show,
'package_update': package_update}
'package_update': package_update,
'resource_show': resource_show}
######################################################################
############################ ICONFIGURER #############################

View File

@ -34,6 +34,9 @@ class PluginTest(unittest.TestCase):
plugin.new_authz = self._new_authz
plugin.tk = self._tk
if hasattr(self, '_package_show'):
plugin.package_show = self._package_show
def test_implementations(self):
self.assertTrue(plugin.p.IDatasetForm.implemented_by(plugin.PrivateDatasets))
self.assertTrue(plugin.p.IAuthFunctions.implemented_by(plugin.PrivateDatasets))
@ -151,10 +154,57 @@ class PluginTest(unittest.TestCase):
if creator_user_id != user_obj_id and owner_org:
plugin.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'update_dataset')
@parameterized.expand([
(True, True),
(True, False),
(False, False),
(False, False)
])
def test_auth_resource_show(self, exist_pkg=True, authorized_pkg=True):
#Recover the exception
plugin.tk.ObjectNotFound = self._tk.ObjectNotFound
# Mock the calls
package = MagicMock()
package.id = '1'
final_query = MagicMock()
final_query.first = MagicMock(return_value=package if exist_pkg else None)
second_join = MagicMock()
second_join.filter = MagicMock(return_value=final_query)
first_join = MagicMock()
first_join.join = MagicMock(return_value=second_join)
query = MagicMock()
query.join = MagicMock(return_value=first_join)
model = MagicMock()
session = MagicMock()
session.query = MagicMock(return_value=query)
model.Session = session
# Create the context
context = {}
context['model'] = model
# Mock the package_show function
self._package_show = plugin.package_show
success = True if authorized_pkg else False
plugin.package_show = MagicMock(return_value={'success': success})
if not exist_pkg:
self.assertRaises(self._tk.ObjectNotFound, plugin.resource_show, context, {})
else:
result = plugin.resource_show(context, {})
self.assertEquals(authorized_pkg, result['success'])
def test_auth_functions(self):
auth_functions = self.privateDatasets.get_auth_functions()
self.assertEquals(auth_functions['package_show'], plugin.package_show)
self.assertEquals(auth_functions['package_update'], plugin.package_update)
self.assertEquals(auth_functions['resource_show'], plugin.resource_show)
@parameterized.expand([
('/dataset', True), # Include ignore_capacity_check