[#4] Fixes on the auth layer against the new core auth

Thanks @locusf for the original patch
amercader 2012-12-20 16:09:26 +00:00
parent 510e2d3725
commit 19cd80b264
7 changed files with 52 additions and 51 deletions


@ -3,7 +3,6 @@ from lxml import etree
from lxml.etree import XMLSyntaxError
from pylons.i18n import _
from ckan.authz import Authorizer
from ckan import model
from ckan.model.group import Group
@ -33,9 +32,9 @@ class ViewController(BaseController):
def _get_publishers(self):
groups = None
user = model.User.get(c.user)
if c.publisher_auth:
if Authorizer().is_sysadmin(c.user):
if user.sysadmin:
groups = Group.all(group_type='publisher')
elif c.userobj:
groups = c.userobj.get_groups('publisher')
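Review bot: `if user.sysadmin:` reads the attribute without checking that `model.User.get(c.user)` returned an object, so a request whose `c.user` matches no account (anonymous, or a stale name) would raise AttributeError here instead of reaching the `elif c.userobj:` branch. The read-side auth functions in this commit already use the safe form `if not user_obj or not user_obj.sysadmin:`; here `if user and user.sysadmin:` is enough. The same unguarded read appears in `_get_sources_for_user`, in the `harvest_job_create` and `harvest_job_create_all` variants that check publisher groups, and in the publisher-aware `harvest_source_delete`, where the existing `not user_obj` test now comes after `user_obj.sysadmin` and can no longer protect it. The rest of `_get_publishers` lies outside the hunk.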


@ -1,6 +1,5 @@
import logging
from sqlalchemy import or_, distinct
from ckan.authz import Authorizer
from ckan.model import User
import datetime
@ -284,26 +283,27 @@ def _get_sources_for_user(context,data_dict):
)
# Sysadmins will get all sources
if not Authorizer().is_sysadmin(user):
# This only applies to a non sysadmin user when using the
# publisher auth profile. When using the default profile,
# normal users will never arrive at this point, but even if they
# do, they will get an empty list.
if user:
user_obj = User.get(user)
if not user_obj.sysadmin:
# This only applies to a non sysadmin user when using the
# publisher auth profile. When using the default profile,
# normal users will never arrive at this point, but even if they
# do, they will get an empty list.
publisher_filters = []
publishers_for_the_user = user_obj.get_groups(u'publisher')
for publisher_id in [g.id for g in publishers_for_the_user]:
publisher_filters.append(HarvestSource.publisher_id==publisher_id)
publisher_filters = []
publishers_for_the_user = user_obj.get_groups(u'publisher')
for publisher_id in [g.id for g in publishers_for_the_user]:
publisher_filters.append(HarvestSource.publisher_id==publisher_id)
if len(publisher_filters):
query = query.filter(or_(*publisher_filters))
else:
# This user does not belong to a publisher yet, no sources for him/her
return []
if len(publisher_filters):
query = query.filter(or_(*publisher_filters))
else:
# This user does not belong to a publisher yet, no sources for him/her
return []
log.debug('User %s with publishers %r has Harvest Sources: %r',
user, publishers_for_the_user, [(hs.id, hs.url) for hs in query])
log.debug('User %s with publishers %r has Harvest Sources: %r',
user, publishers_for_the_user, [(hs.id, hs.url) for hs in query])
sources = query.all()
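Review bot: The new outer `if user:` appears to let a request with an empty `user` skip the publisher filter entirely and fall through to `sources = query.all()`, whereas the old `if not Authorizer().is_sysadmin(user):` sent everything that was not a sysadmin, presumably including an empty `user`, through the filter; indentation is lost on this page, so the nesting is inferred from the line order. The comment inside the block says non-sysadmins should at worst get an empty list, so the missing-user case should deny by default: `user_obj = User.get(user) if user else None` followed by `if not user_obj: return []`, placed before the `if not user_obj.sysadmin:` test. That also covers a name with no matching account. `_get_sources_for_user` starts before this hunk and continues after it.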


@ -1,29 +1,31 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
from ckan.model import User
def harvest_source_create(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
user = User.get(user)
if not user.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to create harvest sources') % str(user)}
else:
return {'success': True}
def harvest_job_create(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
user = User.get(user)
if not user.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to create harvest jobs') % str(user)}
else:
return {'success': True}
def harvest_job_create_all(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
user = User.get(user)
if not user.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to create harvest jobs for all sources') % str(user)}
else:
return {'success': True}
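Review bot: Rebinding `user` in `user = User.get(user)` changes what the denial message prints: `str(user)` now formats the model object rather than the user name taken from the context, and depending on how the model defines `__str__` that may put more than the name into the message. When no account matches, the message is never reached because `user.sysadmin` fails on None first. Keeping the two apart as the read-side functions in this commit do, `user_obj = User.get(user)` then `if not user_obj or not user_obj.sysadmin:`, fixes both. This applies to all three functions in this file and to `harvest_source_delete` in the next one.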


@ -1,11 +1,11 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
from ckan.model import User
def harvest_source_delete(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
user = User.get(user)
if not user.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to delete harvest sources') % str(user)}
else:
return {'success': True}


@ -1,11 +1,11 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
def harvest_source_show(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
user_obj = model.User.get(user)
if not user_obj or not user_obj.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to read this harvest source') % str(user)}
else:
return {'success': True}
@ -14,7 +14,8 @@ def harvest_source_list(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
user_obj = model.User.get(user)
if not user_obj or not user_obj.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to see the harvest sources') % str(user)}
else:
return {'success': True}
@ -24,7 +25,8 @@ def harvest_job_show(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
user_obj = model.User.get(user)
if not user_obj or not user_obj.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to read this harvest job') % str(user)}
else:
return {'success': True}
@ -33,7 +35,8 @@ def harvest_job_list(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
user_obj = model.User.get(user)
if not user_obj or not user_obj.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to see the harvest jobs') % str(user)}
else:
return {'success': True}
@ -48,7 +51,8 @@ def harvest_object_list(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
user_obj = model.User.get(user)
if not user_obj or not user_obj.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to see the harvest objects') % str(user)}
else:
return {'success': True}
@ -57,7 +61,8 @@ def harvesters_info_show(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
user_obj = model.User.get(user)
if not user_obj or not user_obj.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to see the harvesters information') % str(user)}
else:
return {'success': True}
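Review bot: The guard `if not user_obj or not user_obj.sysadmin:` is correct here, but it is now pasted into six functions in this file while other files in the same commit write the check without the None test. A single private helper, say `_is_sysadmin(context)`, that looks the user up and returns `bool(user_obj and user_obj.sysadmin)` would give every auth function the same deny-by-default behaviour and one place to document it. Each function would then reduce to `if not _is_sysadmin(context):` followed by its own message. Most of these functions begin outside their hunks, so only the changed condition is visible.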


@ -1,5 +1,4 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
from ckan.model import User
from ckanext.harvest.model import HarvestSource
@ -15,7 +14,7 @@ def harvest_source_create(context,data_dict):
# Sysadmins and the rest of logged users can create sources,
# as long as they belong to a publisher
user_obj = User.get(user)
if not user_obj or not Authorizer().is_sysadmin(user) and len(user_obj.get_groups(u'publisher')) == 0:
if not user_obj or not user_obj.sysadmin and len(user_obj.get_groups(u'publisher')) == 0:
return {'success': False, 'msg': _('User %s must belong to a publisher to create harvest sources') % str(user)}
else:
return {'success': True}
@ -28,11 +27,9 @@ def harvest_job_create(context,data_dict):
if not user:
return {'success': False, 'msg': _('Non-logged in users are not authorized to create harvest jobs')}
if Authorizer().is_sysadmin(user):
return {'success': True}
user_obj = User.get(user)
if user_obj.sysadmin:
return {'success': True}
source = HarvestSource.get(source_id)
if not source:
raise NotFound
@ -45,8 +42,8 @@ def harvest_job_create(context,data_dict):
def harvest_job_create_all(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
user_obj = User.get(user)
if not user_obj.sysadmin:
return {'success': False, 'msg': _('Only sysadmins can create harvest jobs for all sources') % str(user)}
else:
return {'success': True}
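Review bot: The denial branch of `harvest_job_create_all` cannot produce its message: `_('Only sysadmins can create harvest jobs for all sources') % str(user)` has no `%s` placeholder, so, assuming `_` returns an ordinary string, the `%` raises TypeError and the caller gets an exception instead of `{'success': False, ...}`. The line is unchanged context, but this commit rewrites the condition directly above it, so it is worth fixing in the same pass by dropping `% str(user)` or adding the placeholder. An auth function that raises on its deny path depends on every caller treating the exception as a refusal, which is not visible from here.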


@ -1,9 +1,9 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
from ckan.model import User
from ckanext.harvest.logic.auth import get_source_object
def harvest_source_delete(context,data_dict):
model = context['model']
user = context.get('user','')
@ -13,13 +13,11 @@ def harvest_source_delete(context,data_dict):
# Non-logged users cannot delete this source
if not user:
return {'success': False, 'msg': _('Non-logged in users are not authorized to delete harvest sources')}
# Sysadmins can delete the source
if Authorizer().is_sysadmin(user):
return {'success': True}
# Check if the source publisher id exists on the user's groups
user_obj = User.get(user)
# Sysadmins can delete the source
if user_obj.sysadmin:
return {'success': True}
if not user_obj or not source.publisher_id in [g.id for g in user_obj.get_groups(u'publisher')]:
return {'success': False, 'msg': _('User %s not authorized to delete harvest source %s') % (str(user),source.id)}
else: