[#4] Refactor authorization functions

The authorization functions have been refactored to take into account
both the new organizaton based authorization on CKAN core and the
harvest source datasets.

Basically at the source level, authorization checks are forwarded to the
relevant package auth function (package_create, package_update, etc.)
wich will check for organizations membership, sysadmin, etc.

Also we only use functions available on the plugins toolkit whenever
possible.
This commit is contained in:
amercader 2013-01-09 17:26:48 +00:00
parent 1342463f8a
commit a866445023
5 changed files with 223 additions and 147 deletions

View File

@ -1,39 +1,39 @@
from ckan.logic import NotFound
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
from ckan.plugins import toolkit as pt
from ckanext.harvest import model as harvest_model
def user_is_sysadmin(context):
'''
Checks if the user defined in the context is a sysadmin
rtype: boolean
'''
model = context['model']
user = context['user']
user_obj = model.User.get(user)
if not user_obj:
raise pt.Objectpt.ObjectNotFound('User {0} not found').format(user)
return user_obj.sysadmin
def _get_object(context, data_dict, name, class_name):
'''
return the named item if in the data_dict, or get it from
model.class_name
'''
if not name in context:
id = data_dict.get('id', None)
obj = getattr(harvest_model, class_name).get(id)
if not obj:
raise pt.ObjectNotFound
else:
obj = context[name]
return obj
def get_source_object(context, data_dict = {}):
if not 'source' in context:
model = context['model']
id = data_dict.get('id',None)
source = HarvestSource.get(id)
if not source:
raise NotFound
else:
source = context['source']
return source
return _get_object(context, data_dict, 'source', 'HarvestSource')
def get_job_object(context, data_dict = {}):
if not 'job' in context:
model = context['model']
id = data_dict.get('id',None)
job = HarvestJob.get(id)
if not job:
raise NotFound
else:
job = context['job']
return job
return _get_object(context, data_dict, 'job', 'HarvestJob')
def get_obj_object(context, data_dict = {}):
if not 'obj' in context:
model = context['model']
id = data_dict.get('id',None)
obj = HarvestObject.get(id)
if not obj:
raise NotFound
else:
obj = context['obj']
return obj
return _get_object(context, data_dict, 'obj', 'HarvestObject')

View File

@ -1,32 +1,55 @@
from ckan.lib.base import _
from ckan.model import User
from ckan.plugins import toolkit as pt
from ckanext.harvest.logic.auth import user_is_sysadmin
def harvest_source_create(context,data_dict):
model = context['model']
def harvest_source_create(context, data_dict):
'''
Authorization check for harvest source creation
It forwards the checks to package_create, which will check for
organization membership, whether if sysadmin, etc according to the
instance configuration.
'''
user = context.get('user')
user = User.get(user)
if not user.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to create harvest sources') % str(user)}
try:
pt.check_access('package_create', context, data_dict)
return {'success': True}
except pt.Not_Authorized:
return {'success': False,
'msg': pt._('User {0} not authorized to create harvest sources').format(user)}
def harvest_job_create(context, data_dict):
'''
Authorization check for harvest job creation
It forwards the checks to package_update, ie the user can only create
new jobs if she is allowed to edit the harvest source dataset.
'''
model = context['model']
source_id = data_dict['source_id']
pkg = model.Package.get(source_id)
if not pkg:
raise pt.ObjectNotFound(pt._('Harvest source not found'))
context['package'] = pkg
try:
pt.check_access('package_update', context, data_dict)
return {'success': True}
except pt.Not_Authorized:
return {'success': False,
'msg': pt._('User not authorized to create a job for source {0}').format(source_id)}
def harvest_job_create_all(context, data_dict):
'''
Authorization check for creating new jobs for all sources
Only sysadmins can do it
'''
if not user_is_sysadmin(context):
return {'success': False, 'msg': pt._('Only sysadmins can create harvest jobs for all sources')}
else:
return {'success': True}
def harvest_job_create(context,data_dict):
model = context['model']
user = context.get('user')
user = User.get(user)
if not user.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to create harvest jobs') % str(user)}
else:
return {'success': True}
def harvest_job_create_all(context,data_dict):
model = context['model']
user = context.get('user')
user = User.get(user)
if not user.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to create harvest jobs for all sources') % str(user)}
else:
return {'success': True}

View File

@ -1,13 +1,27 @@
from ckan.lib.base import _
from ckan.model import User
from ckan.plugins import toolkit as pt
def harvest_source_delete(context,data_dict):
model = context['model']
def harvest_source_update(context, data_dict):
'''
Authorization check for harvest source deletion
It forwards the checks to package_delete, which will check for
organization membership, whether if sysadmin, etc according to the
instance configuration.
'''
model = context.get('model')
user = context.get('user')
user = User.get(user)
if not user.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to delete harvest sources') % str(user)}
else:
source_id = data_dict['id']
pkg = model.Package.get(source_id)
if not pkg:
raise pt.ObjectNotFound(pt._('Harvest source not found'))
context['package'] = pkg
try:
pt.check_access('package_delete', context, data_dict)
return {'success': True}
except pt.Not_Authorized:
return {'success': False,
'msg': pt._('User {0} not authorized to delete harvest source {1}').format(user, source_id)}

View File

@ -1,69 +1,86 @@
from ckan.lib.base import _
from ckan.plugins import toolkit as pt
from ckanext.harvest.logic.auth import get_job_object
def harvest_source_show(context,data_dict):
model = context['model']
def harvest_source_show(context, data_dict):
'''
Authorization check for getting the details of a harvest source
It forwards the checks to package_show, which will check for
organization membership, whether if sysadmin, etc according to the
instance configuration.
'''
model = context.get('model')
user = context.get('user')
source_id = data_dict['id']
user_obj = model.User.get(user)
if not user_obj or not user_obj.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to read this harvest source') % str(user)}
else:
pkg = model.Package.get(source_id)
if not pkg:
raise pt.ObjectNotFound(pt._('Harvest source not found'))
context['package'] = pkg
try:
pt.check_access('package_show', context, data_dict)
return {'success': True}
except pt.Not_Authorized:
return {'success': False,
'msg': pt._('User {0} not authorized to read harvest source {1}').format(user, source_id)}
def harvest_source_list(context,data_dict):
model = context['model']
user = context.get('user')
user_obj = model.User.get(user)
if not user_obj or not user_obj.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to see the harvest sources') % str(user)}
else:
def harvest_source_list(context, data_dict):
'''
Authorization check for getting a list of harveste sources
Everybody can do it
'''
return {'success': True}
def harvest_job_show(context,data_dict):
model = context['model']
user = context.get('user')
def harvest_job_show(context, data_dict):
'''
Authorization check for getting the details of a harvest job
user_obj = model.User.get(user)
if not user_obj or not user_obj.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to read this harvest job') % str(user)}
else:
It forwards the checks to harvest_source_show, ie if the user can get
the details for the parent source, she can get the details for the job
'''
job = get_job_object(context, data_dict)
return harvest_source_show(context, {'id': job.source.id})
def harvest_job_list(context, data_dict):
'''
Authorization check for getting a list of jobs for a source
It forwards the checks to harvest_source_show, ie if the user can get
the details for the parent source, she can get the list of jobs
'''
source_id = data_dict['source_id']
return harvest_source_show(context, {'id': source_id})
def harvest_object_show(context, data_dict):
'''
Authorization check for getting the contents of a harvest object
Everybody can do it
'''
return {'success': True}
def harvest_job_list(context,data_dict):
model = context['model']
user = context.get('user')
user_obj = model.User.get(user)
if not user_obj or not user_obj.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to see the harvest jobs') % str(user)}
else:
def harvest_object_list(context, data_dict):
'''
TODO: remove
'''
return {'success': True}
def harvest_object_show(context,data_dict):
model = context['model']
user = context.get('user')
def harvesters_info_show(context, data_dict):
'''
Authorization check for getting information about the available
harvesters
Everybody can do it
'''
return {'success': True}
def harvest_object_list(context,data_dict):
model = context['model']
user = context.get('user')
user_obj = model.User.get(user)
if not user_obj or not user_obj.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to see the harvest objects') % str(user)}
else:
return {'success': True}
def harvesters_info_show(context,data_dict):
model = context['model']
user = context.get('user')
user_obj = model.User.get(user)
if not user_obj or not user_obj.sysadmin:
return {'success': False, 'msg': _('User %s not authorized to see the harvesters information') % str(user)}
else:
return {'success': True}

View File

@ -1,30 +1,52 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
from ckan.plugins import toolkit as pt
from ckanext.harvest.logic.auth import user_is_sysadmin
def harvest_source_update(context,data_dict):
model = context['model']
def harvest_source_update(context, data_dict):
'''
Authorization check for harvest source update
It forwards the checks to package_update, which will check for
organization membership, whether if sysadmin, etc according to the
instance configuration.
'''
model = context.get('model')
user = context.get('user')
source_id = data_dict['id']
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to update harvest sources') % str(user)}
pkg = model.Package.get(source_id)
if not pkg:
raise pt.ObjectNotFound(pt._('Harvest source not found'))
context['package'] = pkg
try:
pt.check_access('package_update', context, data_dict)
return {'success': True}
except pt.Not_Authorized:
return {'success': False,
'msg': pt._('User {0} not authorized to update harvest source {1}').format(user, source_id)}
def harvest_objects_import(context, data_dict):
'''
Authorization check reimporting all harvest objects
Only sysadmins can do it
'''
if not user_is_sysadmin(context):
return {'success': False, 'msg': pt._('Only sysadmins can reimport all harvest objects')}
else:
return {'success': True}
def harvest_objects_import(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to reimport harvest objects') % str(user)}
def harvest_jobs_run(context, data_dict):
'''
Authorization check for running the pending harvest jobs
Only sysadmins can do it
'''
if not user_is_sysadmin(context):
return {'success': False, 'msg': pt._('Only sysadmins can run the pending harvest jobs')}
else:
return {'success': True}
def harvest_jobs_run(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to run the pending harvest jobs') % str(user)}
else:
return {'success': True}