From a86644502350c43cd07fce9d115c2ac971a135cf Mon Sep 17 00:00:00 2001 From: amercader Date: Wed, 9 Jan 2013 17:26:48 +0000 Subject: [PATCH] [#4] Refactor authorization functions The authorization functions have been refactored to take into account both the new organizaton based authorization on CKAN core and the harvest source datasets. Basically at the source level, authorization checks are forwarded to the relevant package auth function (package_create, package_update, etc.) wich will check for organizations membership, sysadmin, etc. Also we only use functions available on the plugins toolkit whenever possible. --- ckanext/harvest/logic/auth/__init__.py | 64 ++++++------ ckanext/harvest/logic/auth/create.py | 79 +++++++++------ ckanext/harvest/logic/auth/delete.py | 34 +++++-- ckanext/harvest/logic/auth/get.py | 129 ++++++++++++++----------- ckanext/harvest/logic/auth/update.py | 64 ++++++++---- 5 files changed, 223 insertions(+), 147 deletions(-) diff --git a/ckanext/harvest/logic/auth/__init__.py b/ckanext/harvest/logic/auth/__init__.py index 2f2b306..d92d1ec 100644 --- a/ckanext/harvest/logic/auth/__init__.py +++ b/ckanext/harvest/logic/auth/__init__.py @@ -1,39 +1,39 @@ -from ckan.logic import NotFound -from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject +from ckan.plugins import toolkit as pt +from ckanext.harvest import model as harvest_model +def user_is_sysadmin(context): + ''' + Checks if the user defined in the context is a sysadmin + + rtype: boolean + ''' + model = context['model'] + user = context['user'] + user_obj = model.User.get(user) + if not user_obj: + raise pt.Objectpt.ObjectNotFound('User {0} not found').format(user) + + return user_obj.sysadmin + +def _get_object(context, data_dict, name, class_name): + ''' + return the named item if in the data_dict, or get it from + model.class_name + ''' + if not name in context: + id = data_dict.get('id', None) + obj = getattr(harvest_model, class_name).get(id) + if not obj: + raise pt.ObjectNotFound + else: + obj = context[name] + return obj def get_source_object(context, data_dict = {}): - if not 'source' in context: - model = context['model'] - id = data_dict.get('id',None) - source = HarvestSource.get(id) - if not source: - raise NotFound - else: - source = context['source'] - - return source + return _get_object(context, data_dict, 'source', 'HarvestSource') def get_job_object(context, data_dict = {}): - if not 'job' in context: - model = context['model'] - id = data_dict.get('id',None) - job = HarvestJob.get(id) - if not job: - raise NotFound - else: - job = context['job'] - - return job + return _get_object(context, data_dict, 'job', 'HarvestJob') def get_obj_object(context, data_dict = {}): - if not 'obj' in context: - model = context['model'] - id = data_dict.get('id',None) - obj = HarvestObject.get(id) - if not obj: - raise NotFound - else: - obj = context['obj'] - - return obj + return _get_object(context, data_dict, 'obj', 'HarvestObject') diff --git a/ckanext/harvest/logic/auth/create.py b/ckanext/harvest/logic/auth/create.py index eed9742..984f489 100644 --- a/ckanext/harvest/logic/auth/create.py +++ b/ckanext/harvest/logic/auth/create.py @@ -1,32 +1,55 @@ -from ckan.lib.base import _ -from ckan.model import User +from ckan.plugins import toolkit as pt +from ckanext.harvest.logic.auth import user_is_sysadmin -def harvest_source_create(context,data_dict): - model = context['model'] + +def harvest_source_create(context, data_dict): + ''' + Authorization check for harvest source creation + + It forwards the checks to package_create, which will check for + organization membership, whether if sysadmin, etc according to the + instance configuration. + ''' user = context.get('user') - user = User.get(user) - if not user.sysadmin: - return {'success': False, 'msg': _('User %s not authorized to create harvest sources') % str(user)} + try: + pt.check_access('package_create', context, data_dict) + return {'success': True} + except pt.Not_Authorized: + return {'success': False, + 'msg': pt._('User {0} not authorized to create harvest sources').format(user)} + + +def harvest_job_create(context, data_dict): + ''' + Authorization check for harvest job creation + + It forwards the checks to package_update, ie the user can only create + new jobs if she is allowed to edit the harvest source dataset. + ''' + model = context['model'] + source_id = data_dict['source_id'] + + pkg = model.Package.get(source_id) + if not pkg: + raise pt.ObjectNotFound(pt._('Harvest source not found')) + + context['package'] = pkg + + try: + pt.check_access('package_update', context, data_dict) + return {'success': True} + except pt.Not_Authorized: + return {'success': False, + 'msg': pt._('User not authorized to create a job for source {0}').format(source_id)} + + +def harvest_job_create_all(context, data_dict): + ''' + Authorization check for creating new jobs for all sources + + Only sysadmins can do it + ''' + if not user_is_sysadmin(context): + return {'success': False, 'msg': pt._('Only sysadmins can create harvest jobs for all sources')} else: return {'success': True} - - -def harvest_job_create(context,data_dict): - model = context['model'] - user = context.get('user') - user = User.get(user) - if not user.sysadmin: - return {'success': False, 'msg': _('User %s not authorized to create harvest jobs') % str(user)} - else: - return {'success': True} - - -def harvest_job_create_all(context,data_dict): - model = context['model'] - user = context.get('user') - user = User.get(user) - if not user.sysadmin: - return {'success': False, 'msg': _('User %s not authorized to create harvest jobs for all sources') % str(user)} - else: - return {'success': True} - diff --git a/ckanext/harvest/logic/auth/delete.py b/ckanext/harvest/logic/auth/delete.py index 03e7355..249fad4 100644 --- a/ckanext/harvest/logic/auth/delete.py +++ b/ckanext/harvest/logic/auth/delete.py @@ -1,13 +1,27 @@ -from ckan.lib.base import _ -from ckan.model import User +from ckan.plugins import toolkit as pt -def harvest_source_delete(context,data_dict): - model = context['model'] + +def harvest_source_update(context, data_dict): + ''' + Authorization check for harvest source deletion + + It forwards the checks to package_delete, which will check for + organization membership, whether if sysadmin, etc according to the + instance configuration. + ''' + model = context.get('model') user = context.get('user') - user = User.get(user) - if not user.sysadmin: - return {'success': False, 'msg': _('User %s not authorized to delete harvest sources') % str(user)} - else: + source_id = data_dict['id'] + + pkg = model.Package.get(source_id) + if not pkg: + raise pt.ObjectNotFound(pt._('Harvest source not found')) + + context['package'] = pkg + + try: + pt.check_access('package_delete', context, data_dict) return {'success': True} - - + except pt.Not_Authorized: + return {'success': False, + 'msg': pt._('User {0} not authorized to delete harvest source {1}').format(user, source_id)} diff --git a/ckanext/harvest/logic/auth/get.py b/ckanext/harvest/logic/auth/get.py index 0e396ac..f2119a6 100644 --- a/ckanext/harvest/logic/auth/get.py +++ b/ckanext/harvest/logic/auth/get.py @@ -1,69 +1,86 @@ -from ckan.lib.base import _ +from ckan.plugins import toolkit as pt +from ckanext.harvest.logic.auth import get_job_object -def harvest_source_show(context,data_dict): - model = context['model'] + +def harvest_source_show(context, data_dict): + ''' + Authorization check for getting the details of a harvest source + + It forwards the checks to package_show, which will check for + organization membership, whether if sysadmin, etc according to the + instance configuration. + ''' + model = context.get('model') user = context.get('user') + source_id = data_dict['id'] - user_obj = model.User.get(user) - if not user_obj or not user_obj.sysadmin: - return {'success': False, 'msg': _('User %s not authorized to read this harvest source') % str(user)} - else: - return {'success': True} - -def harvest_source_list(context,data_dict): - model = context['model'] - user = context.get('user') - - user_obj = model.User.get(user) - if not user_obj or not user_obj.sysadmin: - return {'success': False, 'msg': _('User %s not authorized to see the harvest sources') % str(user)} - else: + pkg = model.Package.get(source_id) + if not pkg: + raise pt.ObjectNotFound(pt._('Harvest source not found')) + + context['package'] = pkg + + try: + pt.check_access('package_show', context, data_dict) return {'success': True} + except pt.Not_Authorized: + return {'success': False, + 'msg': pt._('User {0} not authorized to read harvest source {1}').format(user, source_id)} -def harvest_job_show(context,data_dict): - model = context['model'] - user = context.get('user') - - user_obj = model.User.get(user) - if not user_obj or not user_obj.sysadmin: - return {'success': False, 'msg': _('User %s not authorized to read this harvest job') % str(user)} - else: - return {'success': True} - -def harvest_job_list(context,data_dict): - model = context['model'] - user = context.get('user') - - user_obj = model.User.get(user) - if not user_obj or not user_obj.sysadmin: - return {'success': False, 'msg': _('User %s not authorized to see the harvest jobs') % str(user)} - else: - return {'success': True} - -def harvest_object_show(context,data_dict): - model = context['model'] - user = context.get('user') +def harvest_source_list(context, data_dict): + ''' + Authorization check for getting a list of harveste sources + Everybody can do it + ''' return {'success': True} -def harvest_object_list(context,data_dict): - model = context['model'] - user = context.get('user') - user_obj = model.User.get(user) - if not user_obj or not user_obj.sysadmin: - return {'success': False, 'msg': _('User %s not authorized to see the harvest objects') % str(user)} - else: - return {'success': True} +def harvest_job_show(context, data_dict): + ''' + Authorization check for getting the details of a harvest job -def harvesters_info_show(context,data_dict): - model = context['model'] - user = context.get('user') + It forwards the checks to harvest_source_show, ie if the user can get + the details for the parent source, she can get the details for the job + ''' + job = get_job_object(context, data_dict) - user_obj = model.User.get(user) - if not user_obj or not user_obj.sysadmin: - return {'success': False, 'msg': _('User %s not authorized to see the harvesters information') % str(user)} - else: - return {'success': True} + return harvest_source_show(context, {'id': job.source.id}) + +def harvest_job_list(context, data_dict): + ''' + Authorization check for getting a list of jobs for a source + + It forwards the checks to harvest_source_show, ie if the user can get + the details for the parent source, she can get the list of jobs + ''' + source_id = data_dict['source_id'] + return harvest_source_show(context, {'id': source_id}) + + +def harvest_object_show(context, data_dict): + ''' + Authorization check for getting the contents of a harvest object + + Everybody can do it + ''' + return {'success': True} + + +def harvest_object_list(context, data_dict): + ''' + TODO: remove + ''' + return {'success': True} + + +def harvesters_info_show(context, data_dict): + ''' + Authorization check for getting information about the available + harvesters + + Everybody can do it + ''' + return {'success': True} diff --git a/ckanext/harvest/logic/auth/update.py b/ckanext/harvest/logic/auth/update.py index efe84c7..162299c 100644 --- a/ckanext/harvest/logic/auth/update.py +++ b/ckanext/harvest/logic/auth/update.py @@ -1,30 +1,52 @@ -from ckan.lib.base import _ -from ckan.authz import Authorizer +from ckan.plugins import toolkit as pt +from ckanext.harvest.logic.auth import user_is_sysadmin -def harvest_source_update(context,data_dict): - model = context['model'] + +def harvest_source_update(context, data_dict): + ''' + Authorization check for harvest source update + + It forwards the checks to package_update, which will check for + organization membership, whether if sysadmin, etc according to the + instance configuration. + ''' + model = context.get('model') user = context.get('user') + source_id = data_dict['id'] - if not Authorizer().is_sysadmin(user): - return {'success': False, 'msg': _('User %s not authorized to update harvest sources') % str(user)} + pkg = model.Package.get(source_id) + if not pkg: + raise pt.ObjectNotFound(pt._('Harvest source not found')) + + context['package'] = pkg + + try: + pt.check_access('package_update', context, data_dict) + return {'success': True} + except pt.Not_Authorized: + return {'success': False, + 'msg': pt._('User {0} not authorized to update harvest source {1}').format(user, source_id)} + + +def harvest_objects_import(context, data_dict): + ''' + Authorization check reimporting all harvest objects + + Only sysadmins can do it + ''' + if not user_is_sysadmin(context): + return {'success': False, 'msg': pt._('Only sysadmins can reimport all harvest objects')} else: return {'success': True} -def harvest_objects_import(context,data_dict): - model = context['model'] - user = context.get('user') - if not Authorizer().is_sysadmin(user): - return {'success': False, 'msg': _('User %s not authorized to reimport harvest objects') % str(user)} +def harvest_jobs_run(context, data_dict): + ''' + Authorization check for running the pending harvest jobs + + Only sysadmins can do it + ''' + if not user_is_sysadmin(context): + return {'success': False, 'msg': pt._('Only sysadmins can run the pending harvest jobs')} else: return {'success': True} - -def harvest_jobs_run(context,data_dict): - model = context['model'] - user = context.get('user') - - if not Authorizer().is_sysadmin(user): - return {'success': False, 'msg': _('User %s not authorized to run the pending harvest jobs') % str(user)} - else: - return {'success': True} -