Merge pull request #4 from conwetlab/no_extras
Store users without using CKAN extras
commit c775a7937f
|
@ -21,14 +21,12 @@ Since each service can send notifications in a different way, the extension allo
|
|||
|
||||
If you want to create your own parser, you have to:
|
||||
|
||||
1. Create a class with a method called `parse_notification`
|
||||
2. Import `request` from `ckan.common` in order to be able to read the notification: `from ckan.common import request`.
|
||||
3. Parse the notification as you like. You can read the body by accessing `request.body`.
|
||||
4. Return a dictionary with the following structure. The `errors` field contains the list of errors raised when the notification was parsed, while `users_datasets` is the list of datasets available for each user (each element of this list is a dictionary with two fields: `user` and `datasets`). If the `errors` field is present and it is **not** empty, the `users_datasets` field will **not** be processed.
|
||||
1. Create a class with a method called `parse_notification`. This method will receive one argument that will include the notification body.
|
||||
2. Parse the notification as you like. You can raise one of CKAN's default exceptions (`ValidationError`, `ObjectNotFound`, `NotAuthorized`, `SearchError`, `SearchQueryError` or `SearchIndexError`) if you find an error while parsing the notification.
|
||||
3. Return a dictionary with the structure attached below. The `users_datasets` field is the list of datasets available for each user (each element of this list is a dictionary with two fields: `user` and `datasets`).
|
||||
|
||||
```
|
||||
{'errors': ['...', '...', '...']
|
||||
'users_datasets': [{'user': 'user_name', 'datasets': ['ds1', 'ds2', ...]},
|
||||
{'users_datasets': [{'user': 'user_name', 'datasets': ['ds1', 'ds2', ...]},
|
||||
{'user': 'user_name2', 'datasets': ['ds1', 'ds4', ...] }]}
|
||||
```
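A minimal parser following these steps could look like the sketch below. The module, class, and notification field names (`customer_name`, `datasets`) are illustrative, not part of the extension; only the `parse_notification` method and the returned structure are fixed:

```
# my_parser.py -- illustrative module; reference it in the CKAN config as
# ckan.privatedatasets.parser = ckanext.myext.my_parser:ExampleNotificationParser
import ckan.plugins.toolkit as tk


class ExampleNotificationParser(object):

    def parse_notification(self, request_data):
        # request_data contains the notification body
        if 'customer_name' not in request_data or 'datasets' not in request_data:
            raise tk.ValidationError({'message': 'customer_name and datasets are required'})

        user_name = request_data['customer_name']
        datasets = request_data['datasets']

        if not isinstance(datasets, list):
            raise tk.ValidationError({'message': 'Invalid datasets format'})

        return {'users_datasets': [{'user': user_name, 'datasets': datasets}]}
```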
|
||||
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
import ckan.plugins as plugins
|
||||
import ckanext.privatedatasets.constants as constants
|
||||
import importlib
|
||||
import logging
|
||||
import pylons.config as config
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
PARSER_CONFIG_PROP = 'ckan.privatedatasets.parser'
|
||||
|
||||
|
||||
def package_adquired(context, request_data):
|
||||
|
||||
log.info('Notification Request received')
|
||||
|
||||
# Check access
|
||||
plugins.toolkit.check_access(constants.PACKAGE_ADQUIRED, context, request_data)
|
||||
|
||||
# Get the parser from the configuration
|
||||
class_path = config.get(PARSER_CONFIG_PROP, '')
|
||||
|
||||
if class_path != '':
|
||||
try:
|
||||
class_package = class_path.split(':')[0]
|
||||
class_name = class_path.split(':')[1]
|
||||
parser_cls = getattr(importlib.import_module(class_package), class_name)
|
||||
parser = parser_cls()
|
||||
except Exception as e:
|
||||
raise plugins.toolkit.ValidationError({'message': '%s: %s' % (type(e).__name__, str(e))})
|
||||
else:
|
||||
raise plugins.toolkit.ValidationError({'message': '%s not configured' % PARSER_CONFIG_PROP})
|
||||
|
||||
# Parse the result using the parser set in the configuration
|
||||
# Expected result: {'errors': ["...", "...", ...]
|
||||
# 'users_datasets': [{'user': 'user_name', 'datasets': ['ds1', 'ds2', ...]}, ...]}
|
||||
result = parser.parse_notification(request_data)
|
||||
|
||||
warns = []
|
||||
|
||||
# Introduce the users into the datasets
|
||||
for user_info in result['users_datasets']:
|
||||
for dataset_id in user_info['datasets']:
|
||||
try:
|
||||
|
||||
dataset = plugins.toolkit.get_action('package_show')({'ignore_auth': True, constants.CONTEXT_CALLBACK: True}, {'id': dataset_id})
|
||||
|
||||
# Create the array if it does not exist
|
||||
if constants.ALLOWED_USERS not in dataset or dataset[constants.ALLOWED_USERS] is None:
|
||||
dataset[constants.ALLOWED_USERS] = []
|
||||
|
||||
# Add the user only if it is not in the list
|
||||
if user_info['user'] not in dataset[constants.ALLOWED_USERS]:
|
||||
dataset[constants.ALLOWED_USERS].append(user_info['user'])
|
||||
plugins.toolkit.get_action('package_update')({'ignore_auth': True}, dataset)
|
||||
else:
|
||||
log.warn('The user %s is already allowed to access the %s dataset' % (user_info['user'], dataset_id))
|
||||
|
||||
except plugins.toolkit.ObjectNotFound:
|
||||
# If a dataset does not exist in the instance, an error message will be returned to the user.
|
||||
# However, the process won't stop and will continue with the remaining datasets.
|
||||
log.warn('Dataset %s was not found in this instance' % dataset_id)
|
||||
warns.append('Dataset %s was not found in this instance' % dataset_id)
|
||||
except plugins.toolkit.ValidationError as e:
|
||||
# Some datasets do not allow the list of allowed users to be set, since this property is
|
||||
# only valid for private datasets outside an organization. In this case, a warning will be returned
|
||||
# but the process will continue
|
||||
warns.append('%s(%s): %s' % (dataset_id, constants.ALLOWED_USERS, e.error_dict[constants.ALLOWED_USERS][0]))
|
||||
|
||||
# Return warnings that inform about non-existing datasets
|
||||
if len(warns) > 0:
|
||||
return {'warns': warns}
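Since `package_adquired` is registered as an action (see the `IActions` implementation in `plugin.py`), the notification can be sent through CKAN's standard action API. Below is a rough client-side sketch, assuming the bundled FiWare parser, an example host, and that the `requests` library is available; the API key is a placeholder:

```
import json
import requests

notification = {
    'customer_name': 'user_name',
    'resources': [{'url': 'http://ckan.example.org/dataset/ds1'}],
}

response = requests.post(
    'http://ckan.example.org/api/action/package_adquired',
    data=json.dumps(notification),
    headers={'Content-Type': 'application/json',
             'Authorization': 'your-api-key'},
)
print(response.json())
```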
|
|
@ -0,0 +1,116 @@
|
|||
import ckan.lib.helpers as helpers
|
||||
import ckan.logic.auth as logic_auth
|
||||
import ckan.plugins.toolkit as tk
|
||||
import ckan.new_authz as new_authz
|
||||
import db
|
||||
|
||||
from ckan.common import _, request
|
||||
|
||||
|
||||
@tk.auth_allow_anonymous_access
|
||||
def package_show(context, data_dict):
|
||||
user = context.get('user')
|
||||
user_obj = context.get('auth_user_obj')
|
||||
package = logic_auth.get_package_object(context, data_dict)
|
||||
|
||||
# datasets can be read by their creator
|
||||
if package and user_obj and package.creator_user_id == user_obj.id:
|
||||
return {'success': True}
|
||||
|
||||
# Non-active packages can only be seen by their owners
|
||||
if package.state == 'active':
|
||||
# anyone can see a public package
|
||||
if not package.private:
|
||||
return {'success': True}
|
||||
|
||||
# if the user has rights to read in the organization or in the group
|
||||
if package.owner_org:
|
||||
authorized = new_authz.has_user_permission_for_group_or_org(
|
||||
package.owner_org, user, 'read')
|
||||
else:
|
||||
authorized = False
|
||||
|
||||
# if the user is not authorized yet, we should check if the
|
||||
# user is in the allowed_users object
|
||||
if not authorized:
|
||||
# Init the model
|
||||
db.init_db(context['model'])
|
||||
|
||||
# Branch not executed if the database returns an empty list
|
||||
if db.AllowedUser.get(package_id=package.id, user_name=user):
|
||||
authorized = True
|
||||
|
||||
if not authorized:
|
||||
# Show a flash message with the URL to adquire the dataset
|
||||
# This message can only be shown when the user tries to access the dataset via its URL (/dataset/...)
|
||||
# The message cannot be displayed in other pages that use the package_show function, such as
|
||||
# the user profile page
|
||||
|
||||
if hasattr(package, 'extras') and 'adquire_url' in package.extras and request.path.startswith('/dataset/')\
|
||||
and package.extras['adquire_url'] != '':
|
||||
helpers.flash_notice(_('This private dataset can be adquired. To do so, please click ' +
|
||||
'<a target="_blank" href="%s">here</a>') % package.extras['adquire_url'],
|
||||
allow_html=True)
|
||||
|
||||
return {'success': False, 'msg': _('User %s not authorized to read package %s') % (user, package.id)}
|
||||
else:
|
||||
return {'success': True}
|
||||
else:
|
||||
return {'success': False, 'msg': _('User %s not authorized to read package %s') % (user, package.id)}
|
||||
|
||||
|
||||
def package_update(context, data_dict):
|
||||
user = context.get('user')
|
||||
user_obj = context.get('auth_user_obj')
|
||||
package = logic_auth.get_package_object(context, data_dict)
|
||||
|
||||
# Only the package creator can update it
|
||||
if package and user_obj and package.creator_user_id == user_obj.id:
|
||||
return {'success': True}
|
||||
|
||||
# if the user has rights to update a dataset in the organization or in the group
|
||||
if package and package.owner_org:
|
||||
authorized = new_authz.has_user_permission_for_group_or_org(
|
||||
package.owner_org, user, 'update_dataset')
|
||||
else:
|
||||
authorized = False
|
||||
|
||||
if not authorized:
|
||||
return {'success': False, 'msg': _('User %s is not authorized to edit package %s') % (user, package.id)}
|
||||
else:
|
||||
return {'success': True}
|
||||
|
||||
|
||||
@tk.auth_allow_anonymous_access
|
||||
def resource_show(context, data_dict):
|
||||
# This function is needed since CKAN resource_show function uses the default package_show
|
||||
# function instead of the one defined in the plugin.
|
||||
# A bug was opened so that this function can be removed:
|
||||
# https://github.com/ckan/ckan/issues/1818
|
||||
# It's fixed now, so this function can be deleted when the new version is released.
|
||||
_model = context['model']
|
||||
user = context.get('user')
|
||||
resource = logic_auth.get_resource_object(context, data_dict)
|
||||
|
||||
# check authentication against package
|
||||
query = _model.Session.query(_model.Package)\
|
||||
.join(_model.ResourceGroup)\
|
||||
.join(_model.Resource)\
|
||||
.filter(_model.ResourceGroup.id == resource.resource_group_id)
|
||||
pkg = query.first()
|
||||
if not pkg:
|
||||
raise tk.ObjectNotFound(_('No package found for this resource, cannot check auth.'))
|
||||
|
||||
pkg_dict = {'id': pkg.id}
|
||||
authorized = package_show(context, pkg_dict).get('success')
|
||||
|
||||
if not authorized:
|
||||
return {'success': False, 'msg': _('User %s not authorized to read resource %s') % (user, resource.id)}
|
||||
else:
|
||||
return {'success': True}
|
||||
|
||||
|
||||
@tk.auth_allow_anonymous_access
|
||||
def package_adquired(context, data_dict):
|
||||
# TODO: Improve security
|
||||
return {'success': True}
|
|
@ -0,0 +1,6 @@
|
|||
ALLOWED_USERS = 'allowed_users'
|
||||
ALLOWED_USERS_STR = 'allowed_users_str'
|
||||
SEARCHABLE = 'searchable'
|
||||
ADQUIRE_URL = 'adquire_url'
|
||||
CONTEXT_CALLBACK = 'updating_via_cb'
|
||||
PACKAGE_ADQUIRED = 'package_adquired'
|
|
@ -1,91 +0,0 @@
|
|||
import ckan.lib.base as base
|
||||
import ckan.lib.helpers as helpers
|
||||
import ckan.plugins as plugins
|
||||
import importlib
|
||||
import logging
|
||||
import pylons.config as config
|
||||
|
||||
from ckan.common import response
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
PARSER_CONFIG_PROP = 'ckan.privatedatasets.parser'
|
||||
|
||||
|
||||
######################################################################
|
||||
############################ API CONTROLLER ##########################
|
||||
######################################################################
|
||||
|
||||
class AdquiredDatasetsControllerAPI(base.BaseController):
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
# avoid status_code_redirect intercepting error responses
|
||||
environ['pylons.status_code_redirect'] = True
|
||||
return base.BaseController.__call__(self, environ, start_response)
|
||||
|
||||
def add_users(self):
|
||||
|
||||
log.info('Notification Request received')
|
||||
|
||||
# Get the parser from the configuration
|
||||
class_path = config.get(PARSER_CONFIG_PROP, '')
|
||||
|
||||
if class_path != '':
|
||||
try:
|
||||
class_package = class_path.split(':')[0]
|
||||
class_name = class_path.split(':')[1]
|
||||
parser = getattr(importlib.import_module(class_package), class_name)
|
||||
# Parse the result using the parser set in the configuration
|
||||
result = parser().parse_notification()
|
||||
except Exception as e:
|
||||
result = {'errors': [type(e).__name__ + ': ' + str(e)]}
|
||||
else:
|
||||
result = {'errors': ['%s not configured' % PARSER_CONFIG_PROP]}
|
||||
|
||||
# Introduce the users into the datasets
|
||||
# Expected result: {'errors': ["...", "...", ...]
|
||||
# 'users_datasets': [{'user': 'user_name', 'datasets': ['ds1', 'ds2', ...]}, ...]}
|
||||
warns = []
|
||||
|
||||
if 'errors' in result and len(result['errors']) > 0:
|
||||
log.warn('Errors arised parsing the request: ' + str(result['errors']))
|
||||
response.status_int = 400
|
||||
return helpers.json.dumps({'errors': result['errors']})
|
||||
elif 'users_datasets' in result:
|
||||
for user_info in result['users_datasets']:
|
||||
for dataset_id in user_info['datasets']:
|
||||
try:
|
||||
# Get dataset data
|
||||
dataset = plugins.toolkit.get_action('package_show')({'ignore_auth': True}, {'id': dataset_id})
|
||||
|
||||
# Generate default set of users if it does not exist
|
||||
if 'allowed_users' not in dataset or dataset['allowed_users'] is None:
|
||||
dataset['allowed_users'] = ''
|
||||
|
||||
# Only new users will be inserted
|
||||
allowed_users = dataset['allowed_users'].split(',')
|
||||
if user_info['user'] not in allowed_users:
|
||||
|
||||
# Comma is only introduced when there are more than one user in the list of allowed users
|
||||
separator = '' if dataset['allowed_users'] == '' else ','
|
||||
dataset['allowed_users'] += separator + user_info['user']
|
||||
|
||||
# Update dataset data
|
||||
plugins.toolkit.get_action('package_update')({'ignore_auth': True}, dataset)
|
||||
else:
|
||||
log.warn('The user %s is already allowed to access the %s dataset' % (user_info['user'], dataset_id))
|
||||
|
||||
except plugins.toolkit.ObjectNotFound:
|
||||
# If a dataset does not exist in the instance, an error message will be returned to the user.
|
||||
# However the process won't stop and the process will continue with the remaining datasets.
|
||||
log.warn('Dataset %s was not found in this instance' % dataset_id)
|
||||
warns.append('Dataset %s was not found in this instance' % dataset_id)
|
||||
except plugins.toolkit.ValidationError as e:
|
||||
# Some datasets does not allow to introduce the list of allowed users since this property is
|
||||
# only valid for private datasets outside an organization. In this case, a wanr will return
|
||||
# but the process will continue
|
||||
warns.append('Dataset %s: %s' % (dataset_id, e.error_dict['allowed_users'][0]))
|
||||
|
||||
# Return warnings that inform about non-existing datasets
|
||||
if len(warns) > 0:
|
||||
return helpers.json.dumps({'warns': warns})
|
|
@ -1,6 +1,7 @@
|
|||
import ckan.lib.base as base
|
||||
import ckan.model as model
|
||||
import ckan.plugins as plugins
|
||||
import ckanext.privatedatasets.db as db
|
||||
import logging
|
||||
|
||||
from ckan.common import _
|
||||
|
@ -12,6 +13,8 @@ class AdquiredDatasetsControllerUI(base.BaseController):
|
|||
|
||||
def user_adquired_datasets(self):
|
||||
|
||||
db.init_db(model)
|
||||
|
||||
c = plugins.toolkit.c
|
||||
context = {
|
||||
'model': model,
|
||||
|
@ -29,13 +32,7 @@ class AdquiredDatasetsControllerUI(base.BaseController):
|
|||
plugins.toolkit.abort(401, _('Not authorized to see this page'))
|
||||
|
||||
# Get the datasets adquired by the user
|
||||
query = model.Session.query(model.PackageExtra).filter(
|
||||
# Select only the allowed_users key
|
||||
'package_extra.key=\'%s\' AND package_extra.value!=\'\' ' % 'allowed_users' +
|
||||
# Select only when the state is 'active'
|
||||
'AND package_extra.state=\'%s\' ' % 'active' +
|
||||
# The user name should be contained in the list
|
||||
'AND regexp_split_to_array(package_extra.value,\',\') @> ARRAY[\'%s\']' % context['user'])
|
||||
query = db.AllowedUser.get(user_name=context['user'])
|
||||
|
||||
# Get the datasets
|
||||
for dataset in query:
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
import constants
|
||||
import db
|
||||
|
||||
from ckan.plugins import toolkit
|
||||
from ckan.common import _
|
||||
from itertools import count
|
||||
|
||||
|
||||
def private_datasets_metadata_checker(key, data, errors, context):
|
||||
|
||||
dataset_id = data.get(('id',))
|
||||
private_val = data.get(('private',))
|
||||
|
||||
# Avoid missing value
|
||||
if not isinstance(private_val, basestring) and not isinstance(private_val, bool):
|
||||
private_val = None
|
||||
|
||||
# If the private field is not included in the data dict, we must check the current value
|
||||
if private_val is None and dataset_id:
|
||||
dataset_dict = toolkit.get_action('package_show')({'ignore_auth': True}, {'id': dataset_id})
|
||||
private_val = dataset_dict.get('private')
|
||||
|
||||
private = private_val is True if isinstance(private_val, bool) else private_val == "True"
|
||||
metadata_value = data[key]
|
||||
|
||||
# If allowed users are included and the dataset is not private outside an organization, an error will be raised.
|
||||
if metadata_value and not private:
|
||||
errors[key].append(_('This field is only valid when you create a private dataset'))
|
||||
|
||||
|
||||
def allowed_users_convert(key, data, errors, context):
|
||||
|
||||
# By default, all the fields are in the data dictionary even if they contain nothing. In this case,
|
||||
# the value is 'ckan.lib.navl.dictization_functions.Missing' and for this reason the type is checked
|
||||
|
||||
# Get the allowed user list
|
||||
if (constants.ALLOWED_USERS,) in data and isinstance(data[(constants.ALLOWED_USERS,)], list):
|
||||
allowed_users = data[(constants.ALLOWED_USERS,)]
|
||||
elif (constants.ALLOWED_USERS_STR,) in data and isinstance(data[(constants.ALLOWED_USERS_STR,)], basestring):
|
||||
allowed_users_str = data[(constants.ALLOWED_USERS_STR,)].strip()
|
||||
allowed_users = [allowed_user for allowed_user in allowed_users_str.split(',') if allowed_user.strip() != '']
|
||||
else:
|
||||
allowed_users = None
|
||||
|
||||
if allowed_users is not None:
|
||||
current_index = max([int(k[1]) for k in data.keys() if len(k) == 2 and k[0] == key[0]] + [-1])
|
||||
|
||||
if len(allowed_users) == 0:
|
||||
data[(constants.ALLOWED_USERS,)] = []
|
||||
else:
|
||||
for num, allowed_user in zip(count(current_index + 1), allowed_users):
|
||||
allowed_user = allowed_user.strip()
|
||||
toolkit.get_validator('name_validator')(allowed_user, context) # User name should be validated
|
||||
data[(key[0], num)] = allowed_user
|
||||
|
||||
|
||||
def get_allowed_users(key, data, errors, context):
|
||||
pkg_id = data[('id',)]
|
||||
|
||||
db.init_db(context['model'])
|
||||
|
||||
users = db.AllowedUser.get(package_id=pkg_id)
|
||||
counter = 0
|
||||
|
||||
for user in users:
|
||||
data[(key[0], counter)] = user.user_name
|
||||
counter += 1
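As a rough illustration of what `allowed_users_convert` does with CKAN's flattened data dict (the user names below are examples), the comma-separated form field ends up as indexed tuple keys, which `get_allowed_users` later rebuilds from the `package_allowed_users` table when a package is shown:

```
# Shapes of the flattened data dict before and after
# allowed_users_convert((constants.ALLOWED_USERS,), data, errors, context):
before = {
    ('allowed_users_str',): 'alice, bob',   # value typed in the form field
}
after = {
    ('allowed_users_str',): 'alice, bob',
    ('allowed_users', 0): 'alice',          # one indexed key per user name
    ('allowed_users', 1): 'bob',
}
```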
|
|
@ -0,0 +1,29 @@
|
|||
import sqlalchemy as sa
|
||||
|
||||
AllowedUser = None
|
||||
|
||||
|
||||
def init_db(model):
|
||||
|
||||
global AllowedUser
|
||||
if AllowedUser is None:
|
||||
|
||||
class _AllowedUser(model.DomainObject):
|
||||
|
||||
@classmethod
|
||||
def get(cls, **kw):
|
||||
'''Finds all the instances required.'''
|
||||
query = model.Session.query(cls).autoflush(False)
|
||||
return query.filter_by(**kw).all()
|
||||
|
||||
AllowedUser = _AllowedUser
|
||||
|
||||
package_allowed_users_table = sa.Table('package_allowed_users', model.meta.metadata,
|
||||
sa.Column('package_id', sa.types.UnicodeText, primary_key=True, default=u''),
|
||||
sa.Column('user_name', sa.types.UnicodeText, primary_key=True, default=u''),
|
||||
)
|
||||
|
||||
# Create the table only if it does not exist
|
||||
package_allowed_users_table.create(checkfirst=True)
|
||||
|
||||
model.meta.mapper(AllowedUser, package_allowed_users_table,)
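For reference, this lazily initialised model is queried elsewhere in the plugin roughly as follows (package and user names are placeholders):

```
import ckan.model as model
import ckanext.privatedatasets.db as db

# Create the AllowedUser class and its table (if needed) before querying
db.init_db(model)

# All allowed users of a given package (as used by the converters)
allowed = db.AllowedUser.get(package_id='some-package-id')

# Whether a concrete user may read a concrete package (as used by auth.package_show)
is_allowed = len(db.AllowedUser.get(package_id='some-package-id',
                                    user_name='some-user')) > 0
```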
|
|
@ -14,17 +14,16 @@ this.ckan.module('allowed-users', function ($, _) {
|
|||
var ds_private = $('#field-private').val();
|
||||
|
||||
if (ds_private == "True") {
|
||||
$('#field-allowed_users').prop('disabled', false); //Enable
|
||||
$('#field-allowed_users_str').prop('disabled', false); //Enable
|
||||
$('#field-adquire_url').prop('disabled', false); //Enable
|
||||
$('#field-searchable').prop('disabled', false); //Enable
|
||||
} else {
|
||||
$('#field-allowed_users').prop('disabled', true); //Disable
|
||||
$('#field-allowed_users_str').prop('disabled', true); //Disable
|
||||
$('#field-adquire_url').prop('disabled', true); //Disable
|
||||
$('#field-searchable').prop('disabled', true); //Disable
|
||||
|
||||
//Remove previous values
|
||||
$('#s2id_field-allowed_users .select2-search-choice').remove();
|
||||
$('#field-allowed_users').val('');
|
||||
$('#field-allowed_users_str').select2('val', '');
|
||||
$('#field-adquire_url').val('');
|
||||
$('#field-searchable').val('True');
|
||||
}
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
.label-adquired {
|
||||
background-color: #55a1ce;
|
||||
}
|
||||
|
||||
.label-owner {
|
||||
background-color: #e0051e;
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
import ckan.model as model
|
||||
import ckan.plugins.toolkit as tk
|
||||
import db
|
||||
|
||||
|
||||
def is_adquired(pkg_dict):
|
||||
|
||||
db.init_db(model)
|
||||
|
||||
if tk.c.user:
|
||||
return len(db.AllowedUser.get(package_id=pkg_dict['id'], user_name=tk.c.user)) > 0
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def is_owner(pkg_dict):
|
||||
if tk.c.userobj is not None:
|
||||
return tk.c.userobj.id == pkg_dict['creator_user_id']
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def get_allowed_users_str(users):
|
||||
if users:
|
||||
return ','.join([user for user in users])
|
||||
else:
|
||||
return ''
|
|
@ -1,4 +1,4 @@
|
|||
import ckan.lib.helpers as helpers
|
||||
import ckan.plugins.toolkit as tk
|
||||
import re
|
||||
|
||||
from urlparse import urlparse
|
||||
|
@ -7,19 +7,29 @@ from ckan.common import request
|
|||
|
||||
class FiWareNotificationParser(object):
|
||||
|
||||
def parse_notification(self):
|
||||
def parse_notification(self, request_data):
|
||||
|
||||
my_host = request.host
|
||||
|
||||
fields = ['customer_name', 'resources']
|
||||
|
||||
for field in fields:
|
||||
if not field in request_data:
|
||||
raise tk.ValidationError({'message': '%s not found in the request' % field})
|
||||
|
||||
# Parse the body
|
||||
content = helpers.json.loads(request.body, encoding='utf-8')
|
||||
resources = content['resources']
|
||||
user_name = content['customer_name']
|
||||
resources = request_data['resources']
|
||||
user_name = request_data['customer_name']
|
||||
datasets = []
|
||||
errors = []
|
||||
|
||||
if not isinstance(user_name, basestring):
|
||||
raise tk.ValidationError({'message': 'Invalid customer_name format'})
|
||||
|
||||
if not isinstance(resources, list):
|
||||
raise tk.ValidationError({'message': 'Invalid resources format'})
|
||||
|
||||
for resource in resources:
|
||||
if 'url' in resource:
|
||||
if isinstance(resource, dict) and 'url' in resource:
|
||||
parsed_url = urlparse(resource['url'])
|
||||
dataset_name = re.findall('^/dataset/([^/]+).*$', parsed_url.path)
|
||||
|
||||
|
@ -27,8 +37,9 @@ class FiWareNotificationParser(object):
|
|||
if parsed_url.netloc == my_host:
|
||||
datasets.append(dataset_name[0])
|
||||
else:
|
||||
errors.append('Dataset %s is associated with the CKAN instance located at %s' % (dataset_name[0], parsed_url.netloc))
|
||||
raise tk.ValidationError({'message': 'Dataset %s is associated with the CKAN instance located at %s'
|
||||
% (dataset_name[0], parsed_url.netloc)})
|
||||
else:
|
||||
raise tk.ValidationError({'message': 'Invalid resource format'})
|
||||
|
||||
return {'errors': errors,
|
||||
'users_datasets': [{'user': user_name, 'datasets': datasets}]
|
||||
}
|
||||
return {'users_datasets': [{'user': user_name, 'datasets': datasets}]}
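With the new signature, the FiWare parser expects `request_data` to be a dict roughly like the sketch below; the host must match the CKAN instance receiving the notification, and the user and dataset names are examples:

```
request_data = {
    'customer_name': 'some_user',
    'resources': [
        {'url': 'http://ckan.example.org/dataset/ds1'},
        {'url': 'http://ckan.example.org/dataset/ds2'},
    ],
}
```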
|
||||
|
|
|
@ -1,148 +1,12 @@
|
|||
import ckan.lib.helpers as helpers
|
||||
import ckan.logic.auth as logic_auth
|
||||
import ckan.lib.search as search
|
||||
import ckan.plugins as p
|
||||
import ckan.plugins.toolkit as tk
|
||||
import ckan.new_authz as new_authz
|
||||
|
||||
from ckan.common import _, request
|
||||
|
||||
|
||||
######################################################################
|
||||
########################### AUTH FUNCTIONS ###########################
|
||||
######################################################################
|
||||
|
||||
@tk.auth_allow_anonymous_access
|
||||
def package_show(context, data_dict):
|
||||
user = context.get('user')
|
||||
user_obj = context.get('auth_user_obj')
|
||||
package = logic_auth.get_package_object(context, data_dict)
|
||||
|
||||
# datasets can be readed by it creator
|
||||
if package and user_obj and package.creator_user_id == user_obj.id:
|
||||
return {'success': True}
|
||||
|
||||
# Not active packages can only be seen by its owners
|
||||
if package.state == 'active':
|
||||
# anyone can see a public package
|
||||
if not package.private:
|
||||
return {'success': True}
|
||||
|
||||
# if the user has rights to read in the organization or in the group
|
||||
if package.owner_org:
|
||||
authorized = new_authz.has_user_permission_for_group_or_org(
|
||||
package.owner_org, user, 'read')
|
||||
else:
|
||||
authorized = False
|
||||
|
||||
# if the user is not authorized yet, we should check if the
|
||||
# user is in the allowed_users object
|
||||
if not authorized:
|
||||
if hasattr(package, 'extras') and 'allowed_users' in package.extras:
|
||||
allowed_users = package.extras['allowed_users']
|
||||
if allowed_users != '': # ''.split(',') ==> ['']
|
||||
allowed_users_list = allowed_users.split(',')
|
||||
if user in allowed_users_list:
|
||||
authorized = True
|
||||
|
||||
if not authorized:
|
||||
# Show a flash message with the URL to adquire the dataset
|
||||
# This message only can be shown when the user tries to access the dataset via its URL (/dataset/...)
|
||||
# The message cannot be displayed in other pages that uses the package_show function such as
|
||||
# the user profile page
|
||||
|
||||
if hasattr(package, 'extras') and 'adquire_url' in package.extras and request.path.startswith('/dataset/')\
|
||||
and package.extras['adquire_url'] != '':
|
||||
helpers.flash_notice(_('This private dataset can be adquired. To do so, please click ' +
|
||||
'<a target="_blank" href="%s">here</a>') % package.extras['adquire_url'],
|
||||
allow_html=True)
|
||||
|
||||
return {'success': False, 'msg': _('User %s not authorized to read package %s') % (user, package.id)}
|
||||
else:
|
||||
return {'success': True}
|
||||
else:
|
||||
return {'success': False, 'msg': _('User %s not authorized to read package %s') % (user, package.id)}
|
||||
|
||||
|
||||
def package_update(context, data_dict):
|
||||
user = context.get('user')
|
||||
user_obj = context.get('auth_user_obj')
|
||||
package = logic_auth.get_package_object(context, data_dict)
|
||||
|
||||
# Only the package creator can update it
|
||||
if package and user_obj and package.creator_user_id == user_obj.id:
|
||||
return {'success': True}
|
||||
|
||||
# if the user has rights to update a dataset in the organization or in the group
|
||||
if package and package.owner_org:
|
||||
authorized = new_authz.has_user_permission_for_group_or_org(
|
||||
package.owner_org, user, 'update_dataset')
|
||||
else:
|
||||
authorized = False
|
||||
|
||||
if not authorized:
|
||||
return {'success': False, 'msg': _('User %s is not authorized to edit package %s') % (user, package.id)}
|
||||
else:
|
||||
return {'success': True}
|
||||
|
||||
|
||||
@tk.auth_allow_anonymous_access
|
||||
def resource_show(context, data_dict):
|
||||
# This function is needed since CKAN resource_show function uses the default package_show
|
||||
# function instead the one defined in the plugin.
|
||||
# A bug is openend in order to be able to remove this function
|
||||
# https://github.com/ckan/ckan/issues/1818
|
||||
model = context['model']
|
||||
user = context.get('user')
|
||||
resource = logic_auth.get_resource_object(context, data_dict)
|
||||
|
||||
# check authentication against package
|
||||
query = model.Session.query(model.Package)\
|
||||
.join(model.ResourceGroup)\
|
||||
.join(model.Resource)\
|
||||
.filter(model.ResourceGroup.id == resource.resource_group_id)
|
||||
pkg = query.first()
|
||||
if not pkg:
|
||||
raise tk.ObjectNotFound(_('No package found for this resource, cannot check auth.'))
|
||||
|
||||
pkg_dict = {'id': pkg.id}
|
||||
authorized = package_show(context, pkg_dict).get('success')
|
||||
|
||||
if not authorized:
|
||||
return {'success': False, 'msg': _('User %s not authorized to read resource %s') % (user, resource.id)}
|
||||
else:
|
||||
return {'success': True}
|
||||
|
||||
|
||||
######################################################################
|
||||
############################### CHECKER ##############################
|
||||
######################################################################
|
||||
|
||||
def private_datasets_metadata_checker(key, data, errors, context):
|
||||
|
||||
# TODO: In some cases, we will need to retireve all the dataset information if it isn't present...
|
||||
|
||||
private_val = data.get(('private',))
|
||||
private = private_val is True if isinstance(private_val, bool) else private_val == "True"
|
||||
metadata_value = data[key]
|
||||
|
||||
# If allowed users are included and the dataset is not private outside and organization, an error will be raised.
|
||||
if metadata_value != '' and not private:
|
||||
errors[key].append(_('This field is only valid when you create a private dataset outside an organization'))
|
||||
|
||||
|
||||
######################################################################
|
||||
############################### ADQUIRED #############################
|
||||
######################################################################
|
||||
|
||||
def adquired(pkg_dict):
|
||||
|
||||
adquired = False
|
||||
if 'allowed_users' in pkg_dict and pkg_dict['allowed_users'] != '' and pkg_dict['allowed_users'] is not None:
|
||||
allowed_users = pkg_dict['allowed_users'].split(',')
|
||||
if tk.c.user in allowed_users:
|
||||
adquired = True
|
||||
|
||||
return adquired
|
||||
import auth
|
||||
import actions
|
||||
import constants
|
||||
import converters_validators as conv_val
|
||||
import db
|
||||
import helpers as helpers
|
||||
|
||||
|
||||
class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
|
||||
|
@ -151,6 +15,7 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
|
|||
p.implements(p.IAuthFunctions)
|
||||
p.implements(p.IConfigurer)
|
||||
p.implements(p.IRoutes, inherit=True)
|
||||
p.implements(p.IActions)
|
||||
p.implements(p.IPackageController)
|
||||
p.implements(p.ITemplateHelpers)
|
||||
|
||||
|
@ -158,21 +23,26 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
|
|||
############################ DATASET FORM ############################
|
||||
######################################################################
|
||||
|
||||
def __init__(self, name=None):
|
||||
self.indexer = search.PackageSearchIndex()
|
||||
|
||||
def _modify_package_schema(self):
|
||||
return {
|
||||
# remove datasets_with_no_organization_cannot_be_private validator
|
||||
'private': [tk.get_validator('ignore_missing'),
|
||||
tk.get_validator('boolean_validator')],
|
||||
'allowed_users': [tk.get_validator('ignore_missing'),
|
||||
private_datasets_metadata_checker,
|
||||
tk.get_converter('convert_to_extras')],
|
||||
'adquire_url': [tk.get_validator('ignore_missing'),
|
||||
private_datasets_metadata_checker,
|
||||
tk.get_converter('convert_to_extras')],
|
||||
'searchable': [tk.get_validator('ignore_missing'),
|
||||
private_datasets_metadata_checker,
|
||||
tk.get_converter('convert_to_extras'),
|
||||
tk.get_validator('boolean_validator')]
|
||||
constants.ALLOWED_USERS_STR: [tk.get_validator('ignore_missing'),
|
||||
conv_val.private_datasets_metadata_checker],
|
||||
constants.ALLOWED_USERS: [conv_val.allowed_users_convert,
|
||||
tk.get_validator('ignore_missing'),
|
||||
conv_val.private_datasets_metadata_checker],
|
||||
constants.ADQUIRE_URL: [tk.get_validator('ignore_missing'),
|
||||
conv_val.private_datasets_metadata_checker,
|
||||
tk.get_converter('convert_to_extras')],
|
||||
constants.SEARCHABLE: [tk.get_validator('ignore_missing'),
|
||||
conv_val.private_datasets_metadata_checker,
|
||||
tk.get_converter('convert_to_extras'),
|
||||
tk.get_validator('boolean_validator')]
|
||||
}
|
||||
|
||||
def create_package_schema(self):
|
||||
|
@ -190,12 +60,12 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
|
|||
def show_package_schema(self):
|
||||
schema = super(PrivateDatasets, self).show_package_schema()
|
||||
schema.update({
|
||||
'allowed_users': [tk.get_converter('convert_from_extras'),
|
||||
tk.get_validator('ignore_missing')],
|
||||
'adquire_url': [tk.get_converter('convert_from_extras'),
|
||||
tk.get_validator('ignore_missing')],
|
||||
'searchable': [tk.get_converter('convert_from_extras'),
|
||||
tk.get_validator('ignore_missing')]
|
||||
constants.ALLOWED_USERS: [conv_val.get_allowed_users,
|
||||
tk.get_validator('ignore_missing')],
|
||||
constants.ADQUIRE_URL: [tk.get_converter('convert_from_extras'),
|
||||
tk.get_validator('ignore_missing')],
|
||||
constants.SEARCHABLE: [tk.get_converter('convert_from_extras'),
|
||||
tk.get_validator('ignore_missing')]
|
||||
})
|
||||
return schema
|
||||
|
||||
|
@ -214,9 +84,10 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
|
|||
######################################################################
|
||||
|
||||
def get_auth_functions(self):
|
||||
return {'package_show': package_show,
|
||||
'package_update': package_update,
|
||||
'resource_show': resource_show}
|
||||
return {'package_show': auth.package_show,
|
||||
'package_update': auth.package_update,
|
||||
'resource_show': auth.resource_show,
|
||||
constants.PACKAGE_ADQUIRED: auth.package_adquired}
|
||||
|
||||
######################################################################
|
||||
############################ ICONFIGURER #############################
|
||||
|
@ -231,27 +102,31 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
|
|||
tk.add_resource('fanstatic', 'privatedatasets')
|
||||
|
||||
######################################################################
|
||||
############################### ROUTES ###############################
|
||||
############################## IROUTES ###############################
|
||||
######################################################################
|
||||
|
||||
def after_map(self, m):
|
||||
# DataSet adquired notification
|
||||
m.connect('/dataset_adquired',
|
||||
controller='ckanext.privatedatasets.controllers.api_controller:AdquiredDatasetsControllerAPI',
|
||||
action='add_users', conditions=dict(method=['POST']))
|
||||
m.connect('user_adquired_datasets', '/dashboad/adquired', ckan_icon='shopping-cart',
|
||||
controller='ckanext.privatedatasets.controllers.ui_controller:AdquiredDatasetsControllerUI',
|
||||
action='user_adquired_datasets', conditions=dict(method=['GET']))
|
||||
|
||||
return m
|
||||
|
||||
######################################################################
|
||||
############################## IACTIONS ##############################
|
||||
######################################################################
|
||||
|
||||
def get_actions(self):
|
||||
return {constants.PACKAGE_ADQUIRED: actions.package_adquired}
|
||||
|
||||
######################################################################
|
||||
######################### IPACKAGECONTROLLER #########################
|
||||
######################################################################
|
||||
|
||||
def before_index(self, pkg_dict):
|
||||
|
||||
if 'extras_searchable' in pkg_dict:
|
||||
if 'extras_' + constants.SEARCHABLE in pkg_dict:
|
||||
if pkg_dict['extras_searchable'] == 'False':
|
||||
pkg_dict['capacity'] = 'private'
|
||||
else:
|
||||
|
@ -278,12 +153,71 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
|
|||
return pkg_dict
|
||||
|
||||
def after_create(self, context, pkg_dict):
|
||||
session = context['session']
|
||||
update_cache = False
|
||||
|
||||
db.init_db(context['model'])
|
||||
|
||||
# Get the users and the package ID
|
||||
if constants.ALLOWED_USERS in pkg_dict:
|
||||
|
||||
allowed_users = pkg_dict[constants.ALLOWED_USERS]
|
||||
package_id = pkg_dict['id']
|
||||
|
||||
# Get current users
|
||||
users = db.AllowedUser.get(package_id=package_id)
|
||||
|
||||
# Delete users and save the list of current users
|
||||
current_users = []
|
||||
for user in users:
|
||||
current_users.append(user.user_name)
|
||||
if user.user_name not in allowed_users:
|
||||
session.delete(user)
|
||||
update_cache = True
|
||||
|
||||
# Add non existing users
|
||||
for user_name in allowed_users:
|
||||
if user_name not in current_users:
|
||||
out = db.AllowedUser()
|
||||
out.package_id = package_id
|
||||
out.user_name = user_name
|
||||
out.save()
|
||||
session.add(out)
|
||||
update_cache = True
|
||||
|
||||
session.commit()
|
||||
|
||||
# The cache should be updated. Otherwise, the system may return
|
||||
# outdated information in future requests
|
||||
if update_cache:
|
||||
new_pkg_dict = tk.get_action('package_show')(
|
||||
{'model': context['model'],
|
||||
'ignore_auth': True,
|
||||
'validate': False,
|
||||
'use_cache': False},
|
||||
{'id': package_id})
|
||||
self.indexer.update_dict(new_pkg_dict)
|
||||
|
||||
return pkg_dict
|
||||
|
||||
def after_update(self, context, pkg_dict):
|
||||
return pkg_dict
|
||||
return self.after_create(context, pkg_dict)
|
||||
|
||||
def after_show(self, context, pkg_dict):
|
||||
|
||||
user_obj = context.get('auth_user_obj')
|
||||
updating_via_api = context.get(constants.CONTEXT_CALLBACK, False)
|
||||
|
||||
# allowed_users, searchable and adquire_url fields can only be viewed by:
|
||||
# * the dataset creator
|
||||
# * the sysadmin
|
||||
# * users allowed to update the allowed_users list via the notification API
|
||||
if not updating_via_api and (not user_obj or (pkg_dict['creator_user_id'] != user_obj.id and not user_obj.sysadmin)):
|
||||
attrs = [constants.ALLOWED_USERS, constants.SEARCHABLE, constants.ADQUIRE_URL]
|
||||
for attr in attrs:
|
||||
if attr in pkg_dict:
|
||||
del pkg_dict[attr]
|
||||
|
||||
return pkg_dict
|
||||
|
||||
def after_search(self, search_results, search_params):
|
||||
|
@ -293,8 +227,10 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
|
|||
return pkg_dict
|
||||
|
||||
######################################################################
|
||||
########################## ITEMPLATESHELER ###########################
|
||||
######################### ITEMPLATESHELPER ###########################
|
||||
######################################################################
|
||||
|
||||
def get_helpers(self):
|
||||
return {'privatedatasets_adquired': adquired}
|
||||
return {'privatedatasets_adquired': helpers.is_adquired,
|
||||
'get_allowed_users_str': helpers.get_allowed_users_str,
|
||||
'is_owner': helpers.is_owner}
|
||||
|
|
|
@ -69,7 +69,7 @@
|
|||
<span class="info-block info-inline">
|
||||
<i class="icon-info-sign"></i>
|
||||
{% trans %}
|
||||
When true, the dataset will be shown in search. Otherwise, it will only be accesible entering its URL directly.
|
||||
When true, the dataset will be shown in searches. Otherwise, it will only be accessible by entering its URL directly.
|
||||
{% endtrans %}
|
||||
</span>
|
||||
</div>
|
||||
|
@ -83,7 +83,7 @@
|
|||
{% endif %}
|
||||
|
||||
{% set users_attrs = {'data-module': 'autocomplete', 'data-module-tags': '', 'data-module-source': '/api/2/util/user/autocomplete?q=?'} %}
|
||||
{{ form.input('allowed_users', label=_('Allowed Users'), id='field-allowed_users', placeholder=_('Allowed Users'), value=data.allowed_users, error=errors.custom_text, classes=['control-full'], attrs=users_attrs) }}
|
||||
{{ form.input('allowed_users_str', label=_('Allowed Users'), id='field-allowed_users_str', placeholder=_('Allowed Users'), value=h.get_allowed_users_str(data.allowed_users), error=errors.custom_text, classes=['control-full'], attrs=users_attrs) }}
|
||||
|
||||
{{ form.input('adquire_url', label=_('Adquire URL'), id='field-adquire_url', placeholder=_('http://example.com/adquire/'), value=data.adquire_url, error=errors.custom_text, classes=['control-medium']) }}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ Example:
|
|||
{% set title = package.title or package.name %}
|
||||
{% set notes = h.markdown_extract(package.notes, extract_length=truncate) %}
|
||||
{% set adquired = h.privatedatasets_adquired(package) %}
|
||||
{% set owner = h.is_owner(package) %}
|
||||
|
||||
{% resource 'privatedatasets/custom.css' %}
|
||||
|
||||
|
@ -25,7 +26,7 @@ Example:
|
|||
{% block package_item_content %}
|
||||
<div class="dataset-content">
|
||||
<h3 class="dataset-heading">
|
||||
{% if package.private and not adquired %}
|
||||
{% if package.private and not adquired and not owner %}
|
||||
<span class="dataset-private label label-inverse">
|
||||
<i class="icon-lock"></i>
|
||||
{{ _('Private') }}
|
||||
|
@ -37,6 +38,12 @@ Example:
|
|||
{{ _('Adquired') }}
|
||||
</span>
|
||||
{% endif %}
|
||||
{% if owner and package.private %}
|
||||
<span class="dataset-private label label-owner">
|
||||
<i class="icon-user"></i>
|
||||
{{ _('Owner') }}
|
||||
</span>
|
||||
{% endif %}
|
||||
{{ h.link_to(h.truncate(title, truncate_title), h.url_for(controller='package', action='read', id=package.name)) }}
|
||||
{% if package.get('state', '').startswith('draft') %}
|
||||
<span class="label label-info">{{ _('Draft') }}</span>
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
{% block dashboard_activity_stream_context %}{% endblock %}
|
||||
|
||||
{% block page_primary_action %}
|
||||
{% link_for _('Adquire Dataset'), controller='package', action='new', class_="btn btn-primary", icon="shopping-cart" %}
|
||||
{% link_for _('Adquire Dataset'), controller='package', action='search', class_="btn btn-primary", icon="shopping-cart" %}
|
||||
{% endblock %}
|
||||
|
||||
{% block primary_content_inner %}
|
||||
|
|
|
@ -0,0 +1,177 @@
|
|||
import ckanext.privatedatasets.actions as actions
|
||||
import unittest
|
||||
|
||||
from mock import MagicMock
|
||||
from nose_parameterized import parameterized
|
||||
|
||||
PARSER_CONFIG_PROP = 'ckan.privatedatasets.parser'
|
||||
IMPORT_ERROR_MSG = 'Unable to load the module'
|
||||
CLASS_NAME = 'parser_class'
|
||||
ADD_USERS_ERROR = 'Error updating the dataset'
|
||||
|
||||
|
||||
class ActionsTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
||||
# Load the mocks
|
||||
self._config = actions.config
|
||||
actions.config = MagicMock()
|
||||
|
||||
self._importlib = actions.importlib
|
||||
actions.importlib = MagicMock()
|
||||
|
||||
self._plugins = actions.plugins
|
||||
actions.plugins = MagicMock()
|
||||
|
||||
def tearDown(self):
|
||||
# Unmock
|
||||
actions.config = self._config
|
||||
actions.importlib = self._importlib
|
||||
actions.plugins = self._plugins
|
||||
|
||||
@parameterized.expand([
|
||||
('', None, False, False, '%s not configured' % PARSER_CONFIG_PROP),
|
||||
('INVALID_CLASS', None, False, False, 'IndexError: list index out of range'),
|
||||
('INVALID.CLASS', None, False, False, 'IndexError: list index out of range'),
|
||||
('valid.path', CLASS_NAME, False, False, 'ImportError: %s' % IMPORT_ERROR_MSG),
|
||||
('valid.path', CLASS_NAME, False, True, 'ImportError: %s' % IMPORT_ERROR_MSG),
|
||||
('valid.path', CLASS_NAME, True, False, 'AttributeError: %s' % CLASS_NAME),
|
||||
('valid.path', CLASS_NAME, True, True, None)
|
||||
|
||||
])
|
||||
def test_class_cannot_be_loaded(self, class_path, class_name, path_exist, class_exist, expected_error):
|
||||
|
||||
class_package = class_path
|
||||
class_package += ':' + class_name if class_name else ''
|
||||
actions.config = {PARSER_CONFIG_PROP: class_package}
|
||||
|
||||
# Recover exception
|
||||
actions.plugins.toolkit.ValidationError = self._plugins.toolkit.ValidationError
|
||||
|
||||
# Configure the mock
|
||||
package = MagicMock()
|
||||
if class_name and not class_exist:
|
||||
delattr(package, class_name)
|
||||
|
||||
actions.importlib.import_module = MagicMock(side_effect=ImportError(IMPORT_ERROR_MSG) if not path_exist else None,
|
||||
return_value=package if path_exist else None)
|
||||
|
||||
# Call the function
|
||||
if expected_error:
|
||||
with self.assertRaises(actions.plugins.toolkit.ValidationError) as cm:
|
||||
actions.package_adquired({}, {})
|
||||
self.assertEqual(cm.exception.error_dict['message'], expected_error)
|
||||
else:
|
||||
# Exception is not raised
|
||||
self.assertEquals(None, actions.package_adquired({}, {}))
|
||||
|
||||
# Checks
|
||||
self.assertEquals(0, actions.plugins.toolkit.get_action.call_count)
|
||||
|
||||
def configure_mocks(self, parse_result, datasets_not_found=[], not_updatable_datasets=[], allowed_users=None):
|
||||
|
||||
actions.config = {PARSER_CONFIG_PROP: 'valid.path:%s' % CLASS_NAME}
|
||||
|
||||
# Configure mocks
|
||||
parser_instance = MagicMock()
|
||||
parser_instance.parse_notification = MagicMock(return_value=parse_result)
|
||||
package = MagicMock()
|
||||
package.parser_class = MagicMock(return_value=parser_instance)
|
||||
|
||||
actions.importlib.import_module = MagicMock(return_value=package)
|
||||
|
||||
# We should use the real exceptions
|
||||
actions.plugins.toolkit.ObjectNotFound = self._plugins.toolkit.ObjectNotFound
|
||||
actions.plugins.toolkit.ValidationError = self._plugins.toolkit.ValidationError
|
||||
|
||||
def _package_show(context, data_dict):
|
||||
if data_dict['id'] in datasets_not_found:
|
||||
raise actions.plugins.toolkit.ObjectNotFound()
|
||||
else:
|
||||
dataset = {'id': data_dict['id']}
|
||||
if allowed_users is not None:
|
||||
dataset['allowed_users'] = list(allowed_users)
|
||||
return dataset
|
||||
|
||||
def _package_update(context, data_dict):
|
||||
if data_dict['id'] in not_updatable_datasets:
|
||||
raise actions.plugins.toolkit.ValidationError({'allowed_users': [ADD_USERS_ERROR]})
|
||||
|
||||
package_show = MagicMock(side_effect=_package_show)
|
||||
package_update = MagicMock(side_effect=_package_update)
|
||||
|
||||
def _get_action(action):
|
||||
if action == 'package_update':
|
||||
return package_update
|
||||
elif action == 'package_show':
|
||||
return package_show
|
||||
|
||||
actions.plugins.toolkit.get_action = _get_action
|
||||
|
||||
return parser_instance.parse_notification, package_show, package_update
|
||||
|
||||
@parameterized.expand([
|
||||
# Simple Test: one user and one dataset
|
||||
({'user1': ['ds1']}, [], [], None),
|
||||
({'user2': ['ds1']}, [], [], []),
|
||||
({'user3': ['ds1']}, [], [], ['another_user']),
|
||||
({'user4': ['ds1']}, [], [], ['another_user', 'another_one']),
|
||||
({'user5': ['ds1']}, [], [], ['another_user', 'user_name']),
|
||||
({'user6': ['ds1']}, ['ds1'], [], None),
|
||||
({'user7': ['ds1']}, [], ['ds1'], []),
|
||||
({'user8': ['ds1']}, [], ['ds1'], ['another_user']),
|
||||
({'user9': ['ds1']}, [], ['ds1'], ['another_user', 'another_one']),
|
||||
({'user1': ['ds1']}, [], ['ds1'], ['another_user', 'user_name']),
|
||||
|
||||
# Complex test: some users and some datasets
|
||||
({'user1': ['ds1', 'ds2', 'ds3', 'ds4'], 'user2': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], []),
|
||||
({'user3': ['ds1', 'ds2', 'ds3', 'ds4'], 'user4': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], ['another_user']),
|
||||
({'user5': ['ds1', 'ds2', 'ds3', 'ds4'], 'user6': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], ['another_user', 'another_one']),
|
||||
({'user7': ['ds1', 'ds2', 'ds3', 'ds4'], 'user8': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], ['another_user', 'another_one', 'user7'])
|
||||
])
|
||||
def test_add_users(self, users_info, datasets_not_found, not_updatable_datasets, allowed_users=[]):
|
||||
|
||||
parse_result = {'users_datasets': []}
|
||||
|
||||
# Transform user_info
|
||||
for user in users_info:
|
||||
parse_result['users_datasets'].append({'user': user, 'datasets': users_info[user]})
|
||||
|
||||
parse_notification, package_show, package_update = self.configure_mocks(parse_result, datasets_not_found, not_updatable_datasets, allowed_users)
|
||||
|
||||
# Call the function
|
||||
context = {'user': 'user1', 'model': 'model', 'auth_obj': {'id': 1}}
|
||||
result = actions.package_adquired(context, users_info)
|
||||
|
||||
# Calculate the list of warns
|
||||
warns = []
|
||||
for user_datasets in parse_result['users_datasets']:
|
||||
for dataset_id in user_datasets['datasets']:
|
||||
if dataset_id in datasets_not_found:
|
||||
warns.append('Dataset %s was not found in this instance' % dataset_id)
|
||||
elif dataset_id in not_updatable_datasets and allowed_users is not None and user_datasets['user'] not in allowed_users:
|
||||
warns.append('%s(%s): %s' % (dataset_id, 'allowed_users', ADD_USERS_ERROR))
|
||||
|
||||
expected_result = {'warns': warns} if len(warns) > 0 else None
|
||||
|
||||
# Check that the returned result is as expected
|
||||
self.assertEquals(expected_result, result)
|
||||
|
||||
# Check that the initial functions (check_access and parse_notification) have been called properly
|
||||
parse_notification.assert_called_once_with(users_info)
|
||||
actions.plugins.toolkit.check_access.assert_called_once_with('package_adquired', context, users_info)
|
||||
|
||||
for user_datasets in parse_result['users_datasets']:
|
||||
for dataset_id in user_datasets['datasets']:
|
||||
# The show function is always called
|
||||
package_show.assert_any_call({'ignore_auth': True, 'updating_via_cb': True}, {'id': dataset_id})
|
||||
|
||||
# The update function is called only when the show function does not throw an exception and
|
||||
# when the user is not in the list of allowed users.
|
||||
if dataset_id not in datasets_not_found and allowed_users is not None and user_datasets['user'] not in allowed_users:
|
||||
# Calculate the list of allowed_users
|
||||
expected_allowed_users = list(allowed_users)
|
||||
expected_allowed_users.append(user_datasets['user'])
|
||||
|
||||
package_update.assert_any_call({'ignore_auth': True}, {'id': dataset_id, 'allowed_users': expected_allowed_users})
|
|
@ -0,0 +1,233 @@
|
|||
import unittest
|
||||
import ckanext.privatedatasets.auth as auth
|
||||
|
||||
from mock import MagicMock
|
||||
from nose_parameterized import parameterized
|
||||
|
||||
|
||||
class AuthTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# Create mocks
|
||||
self._logic_auth = auth.logic_auth
|
||||
auth.logic_auth = MagicMock()
|
||||
|
||||
self._request = auth.request
|
||||
auth.request = MagicMock()
|
||||
|
||||
self._helpers = auth.helpers
|
||||
auth.helpers = MagicMock()
|
||||
|
||||
self._new_authz = auth.new_authz
|
||||
auth.new_authz = MagicMock()
|
||||
|
||||
self._tk = auth.tk
|
||||
auth.tk = MagicMock()
|
||||
|
||||
self._db = auth.db
|
||||
auth.db = MagicMock()
|
||||
|
||||
def tearDown(self):
|
||||
auth.logic_auth = self._logic_auth
|
||||
auth.request = self._request
|
||||
auth.helpers = self._helpers
|
||||
auth.new_authz = self._new_authz
|
||||
auth.tk = self._tk
|
||||
auth.db = self._db
|
||||
|
||||
if hasattr(self, '_package_show'):
|
||||
auth.package_show = self._package_show
|
||||
|
||||
def test_decordators(self):
|
||||
self.assertEquals(True, getattr(auth.package_show, 'auth_allow_anonymous_access', False))
|
||||
self.assertEquals(True, getattr(auth.resource_show, 'auth_allow_anonymous_access', False))
|
||||
self.assertEquals(True, getattr(auth.package_adquired, 'auth_allow_anonymous_access', False))
|
||||
|
||||
@parameterized.expand([
|
||||
# Anonymous user (public)
|
||||
(None, None, None, False, 'active', None, None, None, None, None, True),
|
||||
# Anonymous user (private)
|
||||
(None, None, None, True, 'active', None, None, None, None, '/', False),
|
||||
(None, None, '', True, 'active', None, None, '', None, '/', False),
|
||||
# Anonymous user (private). Buy URL not shown
|
||||
(None, None, None, True, 'active', None, None, None, 'google.es', '/', False),
|
||||
# Anonymous user (private). Buy URL shown
|
||||
(None, None, None, True, 'active', None, None, None, 'google.es', '/dataset/testds', False),
|
||||
# The creator can always see the dataset
|
||||
(1, 1, None, False, 'active', None, None, None, None, None, True),
|
||||
(1, 1, None, True, 'active', 'conwet', None, None, None, None, True),
|
||||
(1, 1, None, True, 'active', None, None, None, None, None, True),
|
||||
(1, 1, None, False, 'draft', None, None, None, None, None, True),
|
||||
# Other user (no organizations)
|
||||
(1, 2, 'test', False, 'active', None, None, None, None, None, True),
|
||||
(1, 2, 'test', True, 'active', None, None, None, 'google.es', '/', False), # Buy MSG not shown
|
||||
(1, 2, 'test', True, 'active', None, None, None, None, '/dataset/testds', False), # Buy MSG not shown
|
||||
(1, 2, 'test', True, 'active', None, None, None, 'google.es', '/dataset/testds', False), # Buy MSG shown
|
||||
(1, 2, 'test', False, 'draft', None, None, None, None, None, False),
|
||||
# Other user but authorized in the list of authorized users
|
||||
(1, 2, 'test', True, 'active', None, None, True, None, None, True),
|
||||
# Other user and not authorized in the list of authorized users
|
||||
(1, 2, 'test', True, 'active', None, None, False, 'google.es', '/', False),
|
||||
(1, 2, 'test', True, 'active', None, None, False, 'google.es', '/dataset/testds', False),
|
||||
# Other user with organizations
|
||||
(1, 2, 'test', False, 'active', 'conwet', False, None, None, None, True),
|
||||
(1, 2, 'test', True, 'active', 'conwet', False, None, None, None, False),
|
||||
(1, 2, 'test', True, 'active', 'conwet', True, None, None, None, True),
|
||||
(1, 2, 'test', True, 'draft', 'conwet', True, None, None, None, False),
|
||||
# Other user with organizations (user is not in the organization)
|
||||
(1, 2, 'test', True, 'active', 'conwet', False, True, None, None, True),
|
||||
(1, 2, 'test', True, 'active', 'conwet', False, False, None, None, False),
|
||||
(1, 2, 'test', True, 'active', 'conwet', False, False, 'google.es', '/dataset/testds', False),
|
||||
(1, 2, 'test', True, 'active', 'conwet', False, False, 'google.es', '/', False) ])
|
||||
def test_auth_package_show(self, creator_user_id, user_obj_id, user, private, state, owner_org,
|
||||
owner_member, db_auth, adquire_url, request_path, authorized):
|
||||
|
||||
# Configure the mocks
|
||||
returned_package = MagicMock()
|
||||
returned_package.creator_user_id = creator_user_id
|
||||
returned_package.private = private
|
||||
returned_package.state = state
|
||||
returned_package.owner_org = owner_org
|
||||
returned_package.extras = {}
|
||||
|
||||
# Configure the database
|
||||
db_response = []
|
||||
if db_auth is True:
|
||||
out = auth.db.AllowedUser()
|
||||
out.package_id = 'package_id'
|
||||
out.user_name = user
|
||||
db_response.append(out)
|
||||
|
||||
auth.db.AllowedUser.get = MagicMock(return_value=db_response)
|
||||
|
||||
if adquire_url:
|
||||
returned_package.extras['adquire_url'] = adquire_url
|
||||
|
||||
auth.logic_auth.get_package_object = MagicMock(return_value=returned_package)
|
||||
auth.new_authz.has_user_permission_for_group_or_org = MagicMock(return_value=owner_member)
|
||||
auth.request.path = MagicMock(return_value=request_path)
|
||||
|
||||
# Prepare the context
|
||||
context = {'model': MagicMock()}
|
||||
if user is not None:
|
||||
context['user'] = user
|
||||
if user_obj_id is not None:
|
||||
context['auth_user_obj'] = MagicMock()
|
||||
context['auth_user_obj'].id = user_obj_id
|
||||
|
||||
# Function to be tested
|
||||
result = auth.package_show(context, {})
|
||||
|
||||
# Check the result
|
||||
self.assertEquals(authorized, result['success'])
|
||||
|
||||
# Permissions for the organization are checked when the dataset is private, it belongs to an organization
|
||||
# and when the dataset has not been created by the user who is asking for it
|
||||
if private and owner_org and state == 'active' and creator_user_id != user_obj_id:
|
||||
auth.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'read')
|
||||
else:
|
||||
self.assertEquals(0, auth.new_authz.has_user_permission_for_group_or_org.call_count)
|
||||
|
||||
# The database is only initialized when:
|
||||
# * the dataset is private AND
|
||||
# * the dataset is active AND
|
||||
# * the dataset has no organization OR the user does not belong to that organization AND
|
||||
# * the dataset has not been created by the user who is asking for it OR the user is not specified
|
||||
if private and state == 'active' and (not owner_org or not owner_member) and (creator_user_id != user_obj_id or user_obj_id is None):
|
||||
# Check that the database has been initialized properly
|
||||
auth.db.init_db.assert_called_once_with(context['model'])
|
||||
else:
|
||||
self.assertEquals(0, auth.db.init_db.call_count)
|
||||
|
||||
# Conditions to buy a dataset: it should be private, active and should not belong to any organization
|
||||
if not authorized and state == 'active' and not owner_org and request_path.startswith('/dataset/'):
|
||||
auth.helpers.flash_error.assert_called_once()
|
||||
else:
|
||||
self.assertEquals(0, auth.helpers.flash_error.call_count)
|
||||
|
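The branches asserted above boil down to one access-control decision. Below is a minimal, hypothetical sketch of that decision flow, written only to summarise the parameterized cases; it is not the extension's actual `auth.package_show` implementation, and `is_org_member` / `is_allowed_user` are invented stand-ins for `new_authz.has_user_permission_for_group_or_org` and the `db.AllowedUser` lookup.

```
def package_show_decision(pkg, user_name, user_id, is_org_member, is_allowed_user):
    """Summary of the authorization cases exercised by the test above."""
    # Public, active datasets are readable by everyone.
    if not pkg.private and pkg.state == 'active':
        return True
    # The creator can always read the dataset, even while it is a draft.
    if user_id is not None and pkg.creator_user_id == user_id:
        return True
    # Nobody else can read drafts.
    if pkg.state != 'active':
        return False
    # Members of the owning organization can read its private datasets.
    if pkg.owner_org and is_org_member(pkg.owner_org, user_name, 'read'):
        return True
    # Otherwise access depends on the per-dataset list of allowed users.
    return is_allowed_user(pkg.id, user_name)
```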
||||
@parameterized.expand([
|
||||
(None, None, None, None, None, False), # Anonymous user
|
||||
(1, 1, None, None, None, True), # A user can edit its dataset
|
||||
(1, 2, None, None, None, False), # A user cannot edit a dataset belonging to another user
|
||||
(1, 2, 'test', 'conwet', False, False), # User without rights to update a dataset
|
||||
(1, 2, 'test', 'conwet', True, True), # User with rights to update a dataset
|
||||
])
|
||||
def test_auth_package_update(self, creator_user_id, user_obj_id, user, owner_org, owner_member, authorized):
|
||||
|
||||
# Configure the mocks
|
||||
returned_package = MagicMock()
|
||||
returned_package.creator_user_id = creator_user_id
|
||||
returned_package.owner_org = owner_org
|
||||
|
||||
auth.logic_auth.get_package_object = MagicMock(return_value=returned_package)
|
||||
auth.new_authz.has_user_permission_for_group_or_org = MagicMock(return_value=owner_member)
|
||||
|
||||
# Prepare the context
|
||||
context = {}
|
||||
if user is not None:
|
||||
context['user'] = user
|
||||
if user_obj_id is not None:
|
||||
context['auth_user_obj'] = MagicMock()
|
||||
context['auth_user_obj'].id = user_obj_id
|
||||
|
||||
# Function to be tested
|
||||
result = auth.package_update(context, {})
|
||||
|
||||
# Check the result
|
||||
self.assertEquals(authorized, result['success'])
|
||||
|
||||
# Permissions for organization are checked when the user asking to update the dataset is not the creator
|
||||
# and when the dataset has organization
|
||||
if creator_user_id != user_obj_id and owner_org:
|
||||
auth.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'update_dataset')
|
||||
else:
|
||||
self.assertEquals(0, auth.new_authz.has_user_permission_for_group_or_org.call_count)
|
||||
|
||||
@parameterized.expand([
|
||||
(True, True),
|
||||
(True, False),
|
||||
(False, False),
|
||||
(False, False)
|
||||
])
|
||||
def test_auth_resource_show(self, exist_pkg=True, authorized_pkg=True):
|
||||
# Recover the exception
|
||||
auth.tk.ObjectNotFound = self._tk.ObjectNotFound
|
||||
|
||||
# Mock the calls
|
||||
package = MagicMock()
|
||||
package.id = '1'
|
||||
|
||||
final_query = MagicMock()
|
||||
final_query.first = MagicMock(return_value=package if exist_pkg else None)
|
||||
|
||||
second_join = MagicMock()
|
||||
second_join.filter = MagicMock(return_value=final_query)
|
||||
|
||||
first_join = MagicMock()
|
||||
first_join.join = MagicMock(return_value=second_join)
|
||||
|
||||
query = MagicMock()
|
||||
query.join = MagicMock(return_value=first_join)
|
||||
|
||||
model = MagicMock()
|
||||
session = MagicMock()
|
||||
session.query = MagicMock(return_value=query)
|
||||
model.Session = session
|
||||
|
||||
# Create the context
|
||||
context = {}
|
||||
context['model'] = model
|
||||
|
||||
# Mock the package_show function
|
||||
self._package_show = auth.package_show
|
||||
success = True if authorized_pkg else False
|
||||
auth.package_show = MagicMock(return_value={'success': success})
|
||||
|
||||
if not exist_pkg:
|
||||
self.assertRaises(self._tk.ObjectNotFound, auth.resource_show, context, {})
|
||||
else:
|
||||
result = auth.resource_show(context, {})
|
||||
self.assertEquals(authorized_pkg, result['success'])
|
||||
|
||||
def test_package_adquired(self):
|
||||
self.assertTrue(auth.package_adquired({}, {}))
|
|
@ -1,194 +0,0 @@
|
|||
import ckanext.privatedatasets.controllers.api_controller as controller
|
||||
import json
|
||||
import unittest
|
||||
|
||||
from mock import MagicMock
|
||||
from nose_parameterized import parameterized
|
||||
|
||||
PARSER_CONFIG_PROP = 'ckan.privatedatasets.parser'
|
||||
IMPORT_ERROR_MSG = 'Unable to load the module'
|
||||
CLASS_NAME = 'parser_class'
|
||||
ADD_USERS_ERROR = 'Default Message'
|
||||
|
||||
|
||||
class APIControllerTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
||||
# Get the instance
|
||||
self.instanceAPI = controller.AdquiredDatasetsControllerAPI()
|
||||
|
||||
# Load the mocks
|
||||
self._config = controller.config
|
||||
controller.config = MagicMock()
|
||||
|
||||
self._importlib = controller.importlib
|
||||
controller.importlib = MagicMock()
|
||||
|
||||
self._plugins = controller.plugins
|
||||
controller.plugins = MagicMock()
|
||||
|
||||
self._response = controller.response
|
||||
controller.response = MagicMock()
|
||||
|
||||
def tearDown(self):
|
||||
# Unmock
|
||||
controller.config = self._config
|
||||
controller.importlib = self._importlib
|
||||
controller.plugins = self._plugins
|
||||
controller.response = self._response
|
||||
|
||||
@parameterized.expand([
|
||||
('', None, False, False, '{"errors": ["%s not configured"]}' % PARSER_CONFIG_PROP),
|
||||
('INVALID_CLASS', None, False, False, '{"errors": ["IndexError: list index out of range"]}'),
|
||||
('INVALID.CLASS', None, False, False, '{"errors": ["IndexError: list index out of range"]}'),
|
||||
('valid.path', CLASS_NAME, False, False, '{"errors": ["ImportError: %s"]}' % IMPORT_ERROR_MSG),
|
||||
('valid.path', CLASS_NAME, False, True, '{"errors": ["ImportError: %s"]}' % IMPORT_ERROR_MSG),
|
||||
('valid.path', CLASS_NAME, True, False, '{"errors": ["AttributeError: %s"]}' % CLASS_NAME),
|
||||
('valid.path', CLASS_NAME, True, True, None)
|
||||
|
||||
])
|
||||
def test_class_cannot_be_loaded(self, class_path, class_name, path_exist, class_exist, expected_error):
|
||||
|
||||
class_package = class_path
|
||||
class_package += ':' + class_name if class_name else ''
|
||||
controller.config = {PARSER_CONFIG_PROP: class_package}
|
||||
|
||||
# Configure the mock
|
||||
package = MagicMock()
|
||||
if class_name and not class_exist:
|
||||
delattr(package, class_name)
|
||||
|
||||
controller.importlib.import_module = MagicMock(side_effect=ImportError(IMPORT_ERROR_MSG) if not path_exist else None,
|
||||
return_value=package if path_exist else None)
|
||||
|
||||
# Call the function
|
||||
result = self.instanceAPI.add_users()
|
||||
|
||||
# Checks
|
||||
self.assertEquals(expected_error, result)
|
||||
self.assertEquals(0, controller.plugins.toolkit.get_action.call_count)
|
||||
|
||||
if expected_error:
|
||||
self.assertEquals(400, controller.response.status_int)
|
||||
|
||||
def configure_mocks(self, parse_result, datasets_not_found=[], not_updatable_datasets=[], allowed_users=None):
|
||||
|
||||
controller.config = {PARSER_CONFIG_PROP: 'valid.path:%s' % CLASS_NAME}
|
||||
|
||||
# Configure mocks
|
||||
parser_instance = MagicMock()
|
||||
parser_instance.parse_notification = MagicMock(return_value=parse_result)
|
||||
package = MagicMock()
|
||||
package.parser_class = MagicMock(return_value=parser_instance)
|
||||
|
||||
controller.importlib.import_module = MagicMock(return_value=package)
|
||||
|
||||
# We should use the real exceptions
|
||||
controller.plugins.toolkit.ObjectNotFound = self._plugins.toolkit.ObjectNotFound
|
||||
controller.plugins.toolkit.ValidationError = self._plugins.toolkit.ValidationError
|
||||
|
||||
def _package_show(context, data_dict):
|
||||
if data_dict['id'] in datasets_not_found:
|
||||
raise controller.plugins.toolkit.ObjectNotFound()
|
||||
else:
|
||||
return {'id': data_dict['id'], 'allowed_users': allowed_users}
|
||||
|
||||
def _package_update(context, data_dict):
|
||||
if data_dict['id'] in not_updatable_datasets:
|
||||
raise controller.plugins.toolkit.ValidationError({'allowed_users': [ADD_USERS_ERROR]})
|
||||
|
||||
package_show = MagicMock(side_effect=_package_show)
|
||||
package_update = MagicMock(side_effect=_package_update)
|
||||
|
||||
def _get_action(action):
|
||||
if action == 'package_update':
|
||||
return package_update
|
||||
elif action == 'package_show':
|
||||
return package_show
|
||||
|
||||
controller.plugins.toolkit.get_action = _get_action
|
||||
|
||||
return package_show, package_update
|
||||
|
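The `_get_action` stub above mirrors how CKAN code reaches core actions: look them up by name with `toolkit.get_action` and call the result with `(context, data_dict)`. The sketch below shows, under those assumptions, the kind of controller round trip these mocks stand in for; `grant_access` and the comma-separated `allowed_users` handling are inferred from the expectations in these tests, not copied from the real module.

```
import ckan.plugins.toolkit as toolkit  # only importable inside a CKAN environment


def grant_access(dataset_id, user_name):
    """Hypothetical round trip through package_show / package_update."""
    context = {'ignore_auth': True}
    dataset = toolkit.get_action('package_show')(context, {'id': dataset_id})

    allowed_users = dataset.get('allowed_users') or ''
    if user_name not in allowed_users.split(','):
        # Append the buyer, keeping the comma-separated format used by these tests.
        allowed_users = user_name if allowed_users == '' else allowed_users + ',' + user_name
        toolkit.get_action('package_update')(context, {'id': dataset_id,
                                                       'allowed_users': allowed_users})
```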
||||
@parameterized.expand([
|
||||
({'errors': ['Error1', 'Error2']}, '{"errors": ["Error1", "Error2"]}'),
|
||||
# Even when the users_datasets field is included, the users should not be introduced
|
||||
({'errors': ['Error1', 'Error2'], 'users_datasets': [{'user': 'user_name', 'datasets': ['ds1']}]}, '{"errors": ["Error1", "Error2"]}'),
|
||||
])
|
||||
def test_errors_in_parse(self, parse_result, expected_result):
|
||||
|
||||
package_search, package_update = self.configure_mocks(parse_result)
|
||||
|
||||
# Call the function
|
||||
result = self.instanceAPI.add_users()
|
||||
|
||||
# Checks
|
||||
self.assertEquals(0, package_search.call_count)
|
||||
self.assertEquals(0, package_update.call_count)
|
||||
self.assertEquals(expected_result, result)
|
||||
self.assertEquals(400, controller.response.status_int)
|
||||
|
||||
@parameterized.expand([
|
||||
# Simple Test: one user and one dataset
|
||||
({'user_name': ['ds1']}, [], [], None),
|
||||
({'user_name': ['ds1']}, [], [], ''),
|
||||
({'user_name': ['ds1']}, [], [], 'another_user'),
|
||||
({'user_name': ['ds1']}, [], [], 'another_user,another_one'),
|
||||
({'user_name': ['ds1']}, [], [], 'another_user,user_name'),
|
||||
({'user_name': ['ds1']}, ['ds1'], [], None),
|
||||
({'user_name': ['ds1']}, [], ['ds1'], ''),
|
||||
({'user_name': ['ds1']}, [], ['ds1'], 'another_user'),
|
||||
({'user_name': ['ds1']}, [], ['ds1'], 'another_user,another_one'),
|
||||
({'user_name': ['ds1']}, [], ['ds1'], 'another_user,user_name'),
|
||||
|
||||
# Complex test: some users and some datasets
|
||||
({'user1': ['ds1', 'ds2', 'ds3', 'ds4'], 'user2': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], ''),
|
||||
({'user1': ['ds1', 'ds2', 'ds3', 'ds4'], 'user2': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], 'another_user'),
|
||||
({'user1': ['ds1', 'ds2', 'ds3', 'ds4'], 'user2': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], 'another_user,another_one'),
|
||||
({'user1': ['ds1', 'ds2', 'ds3', 'ds4'], 'user2': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], 'another_user,another_one,user1')
|
||||
])
|
||||
def test_add_users(self, users_info, datasets_not_found, not_updatable_datasets, allowed_users=''):
|
||||
|
||||
parse_result = {'users_datasets': []}
|
||||
datasets_ids = []
|
||||
|
||||
for user in users_info:
|
||||
for dataset_id in users_info[user]:
|
||||
if dataset_id not in datasets_ids:
|
||||
datasets_ids.append(dataset_id)
|
||||
|
||||
parse_result['users_datasets'].append({'user': user, 'datasets': users_info[user]})
|
||||
|
||||
package_show, package_update = self.configure_mocks(parse_result, datasets_not_found, not_updatable_datasets, allowed_users)
|
||||
|
||||
# Call the function
|
||||
result = self.instanceAPI.add_users()
|
||||
|
||||
# Calculate the list of warns
|
||||
warns = []
|
||||
for user_datasets in parse_result['users_datasets']:
|
||||
for dataset_id in user_datasets['datasets']:
|
||||
if dataset_id in datasets_not_found:
|
||||
warns.append('Dataset %s was not found in this instance' % dataset_id)
|
||||
elif dataset_id in not_updatable_datasets and allowed_users is not None and user_datasets['user'] not in allowed_users:
|
||||
warns.append('Dataset %s: %s' % (dataset_id, ADD_USERS_ERROR))
|
||||
|
||||
expected_result = json.dumps({'warns': warns}) if len(warns) > 0 else None
|
||||
|
||||
# Check that the returned result is as expected
|
||||
self.assertEquals(expected_result, result)
|
||||
|
||||
for user_datasets in parse_result['users_datasets']:
|
||||
for dataset_id in user_datasets['datasets']:
|
||||
# The show function is always called
|
||||
package_show.assert_any_call({'ignore_auth': True}, {'id': dataset_id})
|
||||
|
||||
# The update function is called only when the show function does not throw an exception and
|
||||
# when the user is not in the list of allowed users.
|
||||
if dataset_id not in datasets_not_found and allowed_users is not None and user_datasets['user'] not in allowed_users:
|
||||
# Calculate the list of allowed_users
|
||||
expected_allowed_users = allowed_users
|
||||
expected_allowed_users += ',' + user_datasets['user'] if expected_allowed_users != '' else user_datasets['user']
|
||||
|
||||
package_update.assert_any_call({'ignore_auth': True}, {'id': dataset_id, 'allowed_users': expected_allowed_users})
|
|
@ -19,6 +19,9 @@ class UIControllerTest(unittest.TestCase):
|
|||
self._model = controller.model
|
||||
controller.model = MagicMock()
|
||||
|
||||
self._db = controller.db
|
||||
controller.db = MagicMock()
|
||||
|
||||
# Set exceptions
|
||||
controller.plugins.toolkit.ObjectNotFound = self._plugins.toolkit.ObjectNotFound
|
||||
controller.plugins.toolkit.NotAuthorized = self._plugins.toolkit.NotAuthorized
|
||||
|
@ -27,6 +30,7 @@ class UIControllerTest(unittest.TestCase):
|
|||
# Unmock
|
||||
controller.plugins = self._plugins
|
||||
controller.model = self._model
|
||||
controller.db = self._db
|
||||
|
||||
@parameterized.expand([
|
||||
(controller.plugins.toolkit.ObjectNotFound, 404),
|
||||
|
@ -90,15 +94,17 @@ class UIControllerTest(unittest.TestCase):
|
|||
for i in pkgs_ids:
|
||||
pkg = MagicMock()
|
||||
pkg.package_id = i
|
||||
pkg.user_name = user
|
||||
query_res.append(pkg)
|
||||
|
||||
filter_f = MagicMock()
|
||||
filter_f.filter = MagicMock(return_value=query_res)
|
||||
controller.model.Session.query = MagicMock(return_value=filter_f)
|
||||
controller.db.AllowedUser.get = MagicMock(return_value=query_res)
|
||||
|
||||
# Call the function
|
||||
returned = self.instanceUI.user_adquired_datasets()
|
||||
|
||||
# Check that the database has been initialized properly
|
||||
controller.db.init_db.assert_called_once_with(controller.model)
|
||||
|
||||
# User_show called correctly
|
||||
expected_context = {
|
||||
'model': controller.model,
|
||||
|
@ -109,12 +115,7 @@ class UIControllerTest(unittest.TestCase):
|
|||
user_show.assert_called_once_with(expected_context, {'user_obj': controller.plugins.toolkit.c.userobj})
|
||||
|
||||
# Query called correctly
|
||||
controller.model.Session.query.assert_called_once_with(controller.model.PackageExtra)
|
||||
|
||||
# Filter called correctly
|
||||
filter_f.filter.assert_called_once_with('package_extra.key=\'allowed_users\' AND package_extra.value!=\'\' ' +
|
||||
'AND package_extra.state=\'active\' AND ' +
|
||||
'regexp_split_to_array(package_extra.value,\',\') @> ARRAY[\'%s\']' % user)
|
||||
controller.db.AllowedUser.get.assert_called_once_with(user_name=user)
|
||||
|
||||
# Assert that the package_show has been called properly
|
||||
self.assertEquals(len(pkgs_ids), package_show.call_count)
|
||||
|
|
|
@ -0,0 +1,168 @@
|
|||
import unittest
|
||||
import ckanext.privatedatasets.converters_validators as conv_val
|
||||
|
||||
from mock import MagicMock
|
||||
from nose_parameterized import parameterized
|
||||
|
||||
|
||||
class ConvertersValidatorsTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# Create mocks
|
||||
self._toolkit = conv_val.toolkit
|
||||
conv_val.toolkit = MagicMock()
|
||||
|
||||
self._db = conv_val.db
|
||||
conv_val.db = MagicMock()
|
||||
|
||||
def tearDown(self):
|
||||
conv_val.db = self._db
|
||||
conv_val.toolkit = self._toolkit
|
||||
|
||||
@parameterized.expand([
|
||||
# When no data is present, no errors should be returned
|
||||
(True, True, 'conwet', '', False),
|
||||
('True', True, 'conwet', '', False),
|
||||
(False, True, 'conwet', '', False),
|
||||
('False', True, 'conwet', '', False),
|
||||
(None, True, 'conwet', '', False),
|
||||
(None, False, 'conwet', '', False),
|
||||
(True, True, None, '', False),
|
||||
('True', True, None, '', False),
|
||||
(False, True, None, '', False),
|
||||
('False', True, None, '', False),
|
||||
(None, True, None, '', False),
|
||||
(None, False, None, '', False),
|
||||
(True, True, 'conwet', [], False),
|
||||
('True', True, 'conwet', [], False),
|
||||
(False, True, 'conwet', [], False),
|
||||
('False', True, 'conwet', [], False),
|
||||
(None, True, 'conwet', [], False),
|
||||
(None, False, 'conwet', [], False),
|
||||
(True, True, None, [], False),
|
||||
('True', True, None, [], False),
|
||||
(False, True, None, [], False),
|
||||
('False', True, None, [], False),
|
||||
(None, True, None, [], False),
|
||||
(None, False, None, [], False),
|
||||
# When data is present, the field is only valid when the
|
||||
# private field is set to true (organization should
|
||||
# not be taken into account anymore)
|
||||
(True, True, 'conwet', 'test', False),
|
||||
('True', True, 'conwet', 'test', False),
|
||||
(True, False, 'conwet', 'test', False),
|
||||
('True', False, 'conwet', 'test', False),
|
||||
(False, True, 'conwet', 'test', True),
|
||||
('False', True, 'conwet', 'test', True),
|
||||
(False, False, 'conwet', 'test', True),
|
||||
('False', False, 'conwet', 'test', True),
|
||||
(None, True, 'conwet', 'test', False),
|
||||
(None, False, 'conwet', 'test', True),
|
||||
(True, True, None, 'test', False),
|
||||
('True', True, None, 'test', False),
|
||||
(True, False, None, 'test', False),
|
||||
('True', False, None, 'test', False),
|
||||
(False, True, None, 'test', True),
|
||||
('False', True, None, 'test', True),
|
||||
(False, False, None, 'test', True),
|
||||
('False', False, None, 'test', True),
|
||||
(None, True, None, 'test', False),
|
||||
(None, False, None, 'test', True),
|
||||
])
|
||||
def test_metadata_checker(self, received_private, package_private, owner_org, metada_val, error_set):
|
||||
|
||||
# Configure the mocks
|
||||
package_show = MagicMock(return_value={'private': package_private, 'id': 'package_id'})
|
||||
conv_val.toolkit.get_action = MagicMock(return_value=package_show)
|
||||
|
||||
KEY = ('test',)
|
||||
errors = {}
|
||||
errors[KEY] = []
|
||||
|
||||
data = {}
|
||||
data[('id',)] = 'package_id'
|
||||
data[('owner_org',)] = owner_org
|
||||
if received_private is not None:
|
||||
data[('private',)] = received_private
|
||||
data[KEY] = metada_val
|
||||
|
||||
conv_val.private_datasets_metadata_checker(KEY, data, errors, {})
|
||||
|
||||
if error_set:
|
||||
self.assertEquals(1, len(errors[KEY]))
|
||||
else:
|
||||
self.assertEquals(0, len(errors[KEY]))
|
||||
|
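Taken together, these cases pin down one validation rule: a guarded metadata field may only carry a value while the dataset is, or is being made, private. A condensed, hypothetical version of that rule is sketched below; the signature is simplified (the real checker resolves the package itself via `package_show`) and the error message is illustrative.

```
def metadata_checker_sketch(key, data, errors, package_is_private):
    """Flag a guarded field that carries a value on a non-private dataset."""
    value = data.get(key, '')

    # The value submitted in the request wins; otherwise fall back to the
    # stored state of the package (package_is_private).
    received_private = data.get(('private',), None)
    if received_private is not None:
        private = str(received_private).strip().lower() == 'true'
    else:
        private = package_is_private

    if value and not private:
        errors[key].append('This field is only valid when the dataset is private')
```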
||||
@parameterized.expand([
|
||||
('', 0, []),
|
||||
('', 2, []),
|
||||
('a', 0, ['a']),
|
||||
('a', 2, ['a']),
|
||||
(',,, , , ', 0, []),
|
||||
(',,, , , ', 2, []),
|
||||
('a,z, d', 0, ['a', 'z', 'd']),
|
||||
('a,z, d', 2, ['a', 'z', 'd']),
|
||||
(['a','z', 'd'], 0, ['a', 'z', 'd']),
|
||||
(['a','z', 'd'], 2, ['a', 'z', 'd']),
|
||||
])
|
||||
def test_allowed_user_convert(self, users, previous_users, expected_users):
|
||||
key_str = 'allowed_users_str'
|
||||
key = 'allowed_users'
|
||||
|
||||
# Configure mock
|
||||
name_validator = MagicMock()
|
||||
conv_val.toolkit.get_validator = MagicMock(return_value=name_validator)
|
||||
|
||||
# Fill in the data dictionary
|
||||
# * lists should be included in the allowed_users field
|
||||
# * strings should be included in the allowed_users_str field
|
||||
if isinstance(users, basestring):
|
||||
data_key = key_str
|
||||
else:
|
||||
data_key = key
|
||||
|
||||
data = {(data_key,): users}
|
||||
|
||||
for i in range(0, previous_users):
|
||||
data[(key, i)] = i
|
||||
|
||||
# Call the function
|
||||
context = {'user': 'test', 'auth_obj_id': {'id': 1}}
|
||||
conv_val.allowed_users_convert((key,), data, {}, context)
|
||||
|
||||
# Check that the users are set properly
|
||||
for i in range(previous_users, previous_users + len(expected_users)):
|
||||
name_validator.assert_any_call(expected_users[i - previous_users], context)
|
||||
self.assertEquals(expected_users[i - previous_users], data[(key, i)])
|
||||
|
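The expectations above suggest the converter accepts either a comma-separated string (under `allowed_users_str`) or a list (under `allowed_users`), trims whitespace, drops empty entries, validates each name, and appends the results to the flattened data dict with indexed keys. A minimal sketch of that behaviour, inferred from these cases only and written in Python 3 style; the validator is passed in explicitly here, whereas the real code presumably fetches it through `toolkit.get_validator`.

```
def allowed_users_convert_sketch(key, data, errors, context, name_validator):
    """Normalise allowed users into indexed data[('allowed_users', i)] entries."""
    raw = data.get(('allowed_users_str',), data.get(key, []))
    users = raw.split(',') if isinstance(raw, str) else list(raw)
    users = [user.strip() for user in users if user.strip()]

    # Continue numbering after entries that are already present in the dict.
    start = len([k for k in data if len(k) == 2 and k[0] == key[0]])

    for offset, user in enumerate(users):
        name_validator(user, context)
        data[(key[0], start + offset)] = user
```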
||||
@parameterized.expand([
|
||||
([],),
|
||||
(['a'],),
|
||||
(['a', 'b'],),
|
||||
(['a', 'b', 'c'],),
|
||||
(['a', 'b', 'c', 'd', 'e'],)
|
||||
])
|
||||
def test_get_allowed_users(self, users):
|
||||
key = 'allowed_users'
|
||||
data = {('id',): 'package_id'}
|
||||
|
||||
# Create the users
|
||||
db_res = []
|
||||
for user in users:
|
||||
db_row = MagicMock()
|
||||
db_row.package_id = 'package_id'
|
||||
db_row.user_name = user
|
||||
db_res.append(db_row)
|
||||
|
||||
conv_val.db.AllowedUser.get = MagicMock(return_value=db_res)
|
||||
|
||||
# Call the function
|
||||
context = {'model': MagicMock()}
|
||||
conv_val.get_allowed_users((key,), data, {}, context)
|
||||
|
||||
# Check that the users are set properly
|
||||
for i, user in enumerate(users):
|
||||
self.assertEquals(user, data[(key, i)])
|
||||
|
||||
# Check that the table has been initialized properly
|
||||
conv_val.db.init_db.assert_called_once_with(context['model'])
|
|
@ -0,0 +1,39 @@
|
|||
import unittest
|
||||
import ckanext.privatedatasets.db as db
|
||||
|
||||
from mock import MagicMock
|
||||
|
||||
|
||||
class DBTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# Reset database to its initial status
|
||||
db.AllowedUser = None
|
||||
|
||||
# Create mocks
|
||||
self._sa = db.sa
|
||||
db.sa = MagicMock()
|
||||
|
||||
def tearDown(self):
|
||||
db.sa = self._sa
|
||||
|
||||
def test_initdb_not_initialized(self):
|
||||
|
||||
# Call the function
|
||||
model = MagicMock()
|
||||
db.init_db(model)
|
||||
|
||||
# Assert that table method has been called
|
||||
db.sa.Table.assert_called_once()
|
||||
model.meta.mapper.assert_called_once()
|
||||
|
||||
def test_initdb_initialized(self):
|
||||
db.AllowedUser = MagicMock()
|
||||
|
||||
# Call the function
|
||||
model = MagicMock()
|
||||
db.init_db(model)
|
||||
|
||||
# Assert that table method has been called
|
||||
self.assertEquals(0, db.sa.Table.call_count)
|
||||
self.assertEquals(0, model.meta.mapper.call_count)
|
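Both cases rely on the same lazy-initialisation pattern: `AllowedUser` is a module-level global that starts as `None`, and `init_db` only builds the SQLAlchemy table and mapping on its first call. The sketch below illustrates that pattern under stated assumptions; the table name and columns are guesses for illustration, not the extension's actual schema.

```
import sqlalchemy as sa

AllowedUser = None  # module-level cache, mirrored by db.AllowedUser in the tests


def init_db(model):
    global AllowedUser

    # A second call is a no-op, which is what test_initdb_initialized asserts.
    if AllowedUser is not None:
        return

    class _AllowedUser(object):
        pass

    allowed_users_table = sa.Table(
        'package_allowed_users', model.meta.metadata,  # assumed table name
        sa.Column('package_id', sa.types.UnicodeText, primary_key=True),
        sa.Column('user_name', sa.types.UnicodeText, primary_key=True),
    )
    model.meta.mapper(_AllowedUser, allowed_users_table)
    AllowedUser = _AllowedUser
```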
|
@ -8,45 +8,71 @@ from nose_parameterized import parameterized
|
|||
TEST_CASES = {
|
||||
'one_ds': {
|
||||
'host': 'localhost',
|
||||
'json': '{"customer_name":"test", "resources": [{"url": "http://localhost/dataset/ds1"}]}',
|
||||
'result': {'errors': [], 'users_datasets': [{'user': 'test', 'datasets': ['ds1']}]}
|
||||
'json': {"customer_name": "test", "resources": [{"url": "http://localhost/dataset/ds1"}]},
|
||||
'result': {'users_datasets': [{'user': 'test', 'datasets': ['ds1']}]}
|
||||
},
|
||||
'two_ds': {
|
||||
'host': 'localhost',
|
||||
'json': '{"customer_name":"test", "resources": [{"url": "http://localhost/dataset/ds1"},' +
|
||||
'{"url": "http://localhost/dataset/ds2"}]}',
|
||||
'result': {'errors': [], 'users_datasets': [{'user': 'test', 'datasets': ['ds1', 'ds2']}]}
|
||||
'json': {"customer_name": "test", "resources": [{"url": "http://localhost/dataset/ds1"},
|
||||
{"url": "http://localhost/dataset/ds2"}]},
|
||||
'result': {'users_datasets': [{'user': 'test', 'datasets': ['ds1', 'ds2']}]}
|
||||
},
|
||||
'error': {
|
||||
'host': 'localhost',
|
||||
'json': '{"customer_name":"test", "resources": [{"url": "http://localhosta/dataset/ds1"}]}',
|
||||
'result': {'errors': ['Dataset ds1 is associated with the CKAN instance located at localhosta'],
|
||||
'users_datasets': [{'user': 'test', 'datasets': []}]}
|
||||
'json': {"customer_name": "test", "resources": [{"url": "http://localhosta/dataset/ds1"}]},
|
||||
'error': 'Dataset ds1 is associated with the CKAN instance located at localhosta',
|
||||
},
|
||||
'error_one_ds': {
|
||||
'host': 'localhost',
|
||||
'json': '{"customer_name":"test", "resources": [{"url": "http://localhosta/dataset/ds1"},' +
|
||||
'{"url": "http://localhost/dataset/ds2"}]}',
|
||||
'result': {'errors': ['Dataset ds1 is associated with the CKAN instance located at localhosta'],
|
||||
'users_datasets': [{'user': 'test', 'datasets': ['ds2']}]}
|
||||
'json': {"customer_name": "test", "resources": [{"url": "http://localhosta/dataset/ds1"},
|
||||
{"url": "http://localhost/dataset/ds2"}]},
|
||||
'error': 'Dataset ds1 is associated with the CKAN instance located at localhosta',
|
||||
},
|
||||
'two_errors': {
|
||||
'host': 'localhost',
|
||||
'json': '{"customer_name":"test", "resources": [{"url": "http://localhosta/dataset/ds1"},' +
|
||||
'{"url": "http://localhostb/dataset/ds2"}]}',
|
||||
'result': {'errors': ['Dataset ds1 is associated with the CKAN instance located at localhosta',
|
||||
'Dataset ds2 is associated with the CKAN instance located at localhostb'],
|
||||
'users_datasets': [{'user': 'test', 'datasets': []}]}
|
||||
'json': {"customer_name": "test", "resources": [{"url": "http://localhosta/dataset/ds1"},
|
||||
{"url": "http://localhostb/dataset/ds2"}]},
|
||||
'error': 'Dataset ds1 is associated with the CKAN instance located at localhosta',
|
||||
},
|
||||
'two_errors_two_ds': {
|
||||
'host': 'example.com',
|
||||
'json': '{"customer_name":"test", "resources": [{"url": "http://localhosta/dataset/ds1"},' +
|
||||
'{"url": "http://example.es/dataset/ds2"}, {"url": "http://example.com/dataset/ds3"},' +
|
||||
'{"url": "http://example.com/dataset/ds4"}]}',
|
||||
'result': {'errors': ['Dataset ds1 is associated with the CKAN instance located at localhosta',
|
||||
'Dataset ds2 is associated with the CKAN instance located at example.es'],
|
||||
'users_datasets': [{'user': 'test', 'datasets': ['ds3', 'ds4']}]}
|
||||
'json': {"customer_name": "test", "resources": [{"url": "http://localhosta/dataset/ds1"},
|
||||
{"url": "http://example.es/dataset/ds2"}, {"url": "http://example.com/dataset/ds3"},
|
||||
{"url": "http://example.com/dataset/ds4"}]},
|
||||
'error': 'Dataset ds1 is associated with the CKAN instance located at localhosta',
|
||||
},
|
||||
'no_customer_name': {
|
||||
'host': 'localhost',
|
||||
'json': {"resources": [{"url": "http://localhost/dataset/ds1"}]},
|
||||
'error': 'customer_name not found in the request'
|
||||
},
|
||||
'no_resources': {
|
||||
'host': 'localhost',
|
||||
'json': {"customer_name": "test"},
|
||||
'error': 'resources not found in the request'
|
||||
},
|
||||
'no_customer_name_and_resources': {
|
||||
'host': 'localhost',
|
||||
'json': {"customer": "test"},
|
||||
'error': 'customer_name not found in the request'
|
||||
},
|
||||
'invalid_customer_name': {
|
||||
'host': 'localhost',
|
||||
'json': {"customer_name": 974, "resources": [{"url": "http://localhost/dataset/ds1"}]},
|
||||
'error': 'Invalid customer_name format'
|
||||
},
|
||||
'invalid_resources': {
|
||||
'host': 'localhost',
|
||||
'json': {"customer_name": "test", "resources": "http://localhost/dataset/ds1"},
|
||||
'error': 'Invalid resources format'
|
||||
},
|
||||
'missing_url_resource': {
|
||||
'host': 'localhost',
|
||||
'json': {"customer_name": "test", "resources": [{"urla": "http://localhost/dataset/ds1"}]},
|
||||
'error': 'Invalid resource format'
|
||||
},
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -60,12 +86,8 @@ class FiWareParserTest(unittest.TestCase):
|
|||
self._request = fiware.request
|
||||
fiware.request = MagicMock()
|
||||
|
||||
#self._json_loads = fiware.helpers.json.loads
|
||||
#fiware.helpers.json.loads = MagicMock()
|
||||
|
||||
def tearDown(self):
|
||||
# Unmock functions
|
||||
#fiware.helpers.json.loads = self._json_loads
|
||||
fiware.request = self._request
|
||||
|
||||
@parameterized.expand([
|
||||
|
@ -75,15 +97,24 @@ class FiWareParserTest(unittest.TestCase):
|
|||
('error_one_ds',),
|
||||
('two_errors',),
|
||||
('two_errors_two_ds',),
|
||||
('no_customer_name',),
|
||||
('no_resources',),
|
||||
('no_customer_name_and_resources',),
|
||||
('invalid_customer_name',),
|
||||
('invalid_resources',),
|
||||
('missing_url_resource',)
|
||||
])
|
||||
def test_parse_notification(self, case):
|
||||
|
||||
# Configure
|
||||
fiware.request.host = TEST_CASES[case]['host']
|
||||
fiware.request.body = TEST_CASES[case]['json']
|
||||
|
||||
# Call the function
|
||||
result = self.parser.parse_notification()
|
||||
|
||||
# Assert that the result is what we expected to be
|
||||
self.assertEquals(TEST_CASES[case]['result'], result)
|
||||
if 'error' in TEST_CASES[case]:
|
||||
with self.assertRaises(fiware.tk.ValidationError) as cm:
|
||||
self.parser.parse_notification(TEST_CASES[case]['json'])
|
||||
self.assertEqual(cm.exception.error_dict['message'], TEST_CASES[case]['error'])
|
||||
else:
|
||||
result = self.parser.parse_notification(TEST_CASES[case]['json'])
|
||||
# Assert that the result is what we expected to be
|
||||
self.assertEquals(TEST_CASES[case]['result'], result)
|
||||
|
|
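Read as a whole, `TEST_CASES` fixes the parser's contract: validate `customer_name` and `resources`, collect only the datasets whose resource URL points at this CKAN instance (`request.host`), and fail otherwise. A condensed sketch of that contract, inferred from the cases alone, using Python 3's `urllib` and a plain `ValueError` as a stand-in for the toolkit's `ValidationError`:

```
from urllib.parse import urlparse


def parse_notification_sketch(host, body):
    """Mirror the expectations encoded in TEST_CASES above."""
    if 'customer_name' not in body:
        raise ValueError('customer_name not found in the request')
    if not isinstance(body['customer_name'], str):
        raise ValueError('Invalid customer_name format')
    if 'resources' not in body:
        raise ValueError('resources not found in the request')
    if not isinstance(body['resources'], list):
        raise ValueError('Invalid resources format')

    datasets = []
    for resource in body['resources']:
        if 'url' not in resource:
            raise ValueError('Invalid resource format')
        parsed = urlparse(resource['url'])
        dataset_id = parsed.path.rstrip('/').split('/')[-1]
        if parsed.netloc != host:
            raise ValueError('Dataset %s is associated with the CKAN instance located at %s'
                             % (dataset_id, parsed.netloc))
        datasets.append(dataset_id)

    return {'users_datasets': [{'user': body['customer_name'], 'datasets': datasets}]}
```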
|
@ -0,0 +1,78 @@
|
|||
import unittest
|
||||
import ckanext.privatedatasets.helpers as helpers
|
||||
|
||||
from mock import MagicMock
|
||||
from nose_parameterized import parameterized
|
||||
|
||||
|
||||
class HelpersTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# Create mocks
|
||||
self._model = helpers.model
|
||||
helpers.model = MagicMock()
|
||||
|
||||
self._tk = helpers.tk
|
||||
helpers.tk = MagicMock()
|
||||
|
||||
self._db = helpers.db
|
||||
helpers.db = MagicMock()
|
||||
|
||||
def tearDown(self):
|
||||
helpers.model = self._model
|
||||
helpers.tk = self._tk
|
||||
helpers.db = self._db
|
||||
|
||||
@parameterized.expand([
|
||||
(False, 'user', False),
|
||||
(True, 'user', True),
|
||||
(False, None, False),
|
||||
(True, None, False),
|
||||
])
|
||||
def test_is_adquired(self, db_adquired, user, adquired):
|
||||
# Configure test
|
||||
helpers.tk.c.user = user
|
||||
pkg_dict = {'id': 'package_id'}
|
||||
|
||||
db_response = []
|
||||
if db_adquired is True:
|
||||
out = helpers.db.AllowedUser()
|
||||
out.package_id = 'package_id'
|
||||
out.user_name = user
|
||||
db_response.append(out)
|
||||
|
||||
helpers.db.AllowedUser.get = MagicMock(return_value=db_response)
|
||||
|
||||
# Check the function returns the expected result
|
||||
self.assertEquals(adquired, helpers.is_adquired(pkg_dict))
|
||||
|
||||
# Check that the database has been initialized properly
|
||||
helpers.db.init_db.assert_called_once_with(helpers.model)
|
||||
|
||||
@parameterized.expand([
|
||||
(1, 1, True),
|
||||
(1, 2, False),
|
||||
(1, None, False)
|
||||
])
|
||||
def test_is_owner(self, creator_user_id, user_id, owner):
|
||||
# Configure test
|
||||
if user_id:
|
||||
user = MagicMock()
|
||||
user.id = user_id
|
||||
helpers.tk.c.userobj = user
|
||||
else:
|
||||
helpers.tk.c.userobj = None
|
||||
|
||||
pkg_dict = {'creator_user_id': creator_user_id}
|
||||
|
||||
# Check that the functions return the expected result
|
||||
self.assertEquals(owner, helpers.is_owner(pkg_dict))
|
||||
|
||||
@parameterized.expand([
|
||||
([], ''),
|
||||
(['a'], 'a'),
|
||||
(['a', 'b'], 'a,b'),
|
||||
(['a', 'b', 'c', 'd'], 'a,b,c,d'),
|
||||
])
|
||||
def test_get_allowed_users_str(self, allowed_users, expected_result):
|
||||
self.assertEquals(expected_result, helpers.get_allowed_users_str(allowed_users))
|
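The behaviour expected from the three helpers can be read straight off the parameterized cases: `is_adquired` asks the `AllowedUser` table whether the logged-in user was granted the package, `is_owner` compares the creator id with the logged-in user object, and `get_allowed_users_str` joins the list with commas. A hypothetical condensed view, not the module's real code, with the row lookup passed in as a plain list:

```
def is_adquired_sketch(pkg_dict, user_name, allowed_rows):
    """allowed_rows stands in for db.AllowedUser.get(package_id=..., user_name=...)."""
    if not user_name:
        return False
    return len(allowed_rows) > 0


def is_owner_sketch(pkg_dict, userobj):
    return userobj is not None and userobj.id == pkg_dict['creator_user_id']


def get_allowed_users_str_sketch(users):
    return ','.join(users)
```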
|
@ -8,216 +8,45 @@ from nose_parameterized import parameterized
|
|||
class PluginTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# Create the plugin
|
||||
self.privateDatasets = plugin.PrivateDatasets()
|
||||
|
||||
# Create mocks
|
||||
self._logic_auth = plugin.logic_auth
|
||||
plugin.logic_auth = MagicMock()
|
||||
|
||||
self._request = plugin.request
|
||||
plugin.request = MagicMock()
|
||||
|
||||
self._helpers = plugin.helpers
|
||||
plugin.helpers = MagicMock()
|
||||
|
||||
self._new_authz = plugin.new_authz
|
||||
plugin.new_authz = MagicMock()
|
||||
|
||||
self._tk = plugin.tk
|
||||
plugin.tk = MagicMock()
|
||||
|
||||
def tearDown(self):
|
||||
plugin.logic_auth = self._logic_auth
|
||||
plugin.request = self._request
|
||||
plugin.helpers = self._helpers
|
||||
plugin.new_authz = self._new_authz
|
||||
plugin.tk = self._tk
|
||||
self._db = plugin.db
|
||||
plugin.db = MagicMock()
|
||||
|
||||
if hasattr(self, '_package_show'):
|
||||
plugin.package_show = self._package_show
|
||||
self._search = plugin.search
|
||||
plugin.search = MagicMock()
|
||||
|
||||
# Create the plugin
|
||||
self.privateDatasets = plugin.PrivateDatasets()
|
||||
|
||||
def tearDown(self):
|
||||
plugin.tk = self._tk
|
||||
plugin.db = self._db
|
||||
plugin.search = self._search
|
||||
|
||||
@parameterized.expand([
|
||||
(plugin.p.IDatasetForm,),
|
||||
(plugin.p.IAuthFunctions,),
|
||||
(plugin.p.IConfigurer,),
|
||||
(plugin.p.IRoutes,),
|
||||
(plugin.p.IActions,),
|
||||
(plugin.p.IPackageController,),
|
||||
(plugin.p.ITemplateHelpers,)
|
||||
])
|
||||
def test_implementations(self, interface):
|
||||
def test_implementation(self, interface):
|
||||
self.assertTrue(interface.implemented_by(plugin.PrivateDatasets))
|
||||
|
||||
def test_decordators(self):
|
||||
self.assertEquals(True, getattr(plugin.package_show, 'auth_allow_anonymous_access', False))
|
||||
self.assertEquals(True, getattr(plugin.resource_show, 'auth_allow_anonymous_access', False))
|
||||
|
||||
@parameterized.expand([
|
||||
# Anonymous user (public)
|
||||
(None, None, None, False, 'active', None, None, None, None, None, True),
|
||||
# Anonymous user (private)
|
||||
(None, None, None, True, 'active', None, None, None, None, '/', False),
|
||||
(None, None, '', True, 'active', None, None, '', None, '/', False),
|
||||
# Anonymous user (private). Buy URL not shown
|
||||
(None, None, None, True, 'active', None, None, None, 'google.es', '/', False),
|
||||
# Anonymous user (private). Buy URL shown
|
||||
(None, None, None, True, 'active', None, None, None, 'google.es', '/dataset/testds', False),
|
||||
# The creator can always see the dataset
|
||||
(1, 1, None, False, 'active', None, None, None, None, None, True),
|
||||
(1, 1, None, True, 'active', None, None, None, None, None, True),
|
||||
(1, 1, None, False, 'draft', None, None, None, None, None, True),
|
||||
# Other user (no organizations)
|
||||
(1, 2, 'test', False, 'active', None, None, None, None, None, True),
|
||||
(1, 2, 'test', True, 'active', None, None, None, 'google.es', '/', False), # Buy MSG not shown
|
||||
(1, 2, 'test', True, 'active', None, None, None, None, '/dataset/testds', False), # Buy MSG not shown
|
||||
(1, 2, 'test', True, 'active', None, None, None, 'google.es', '/dataset/testds', False), # Buy MSG shown
|
||||
(1, 2, 'test', False, 'draft', None, None, None, None, None, False),
|
||||
# Other user but authorized in the list of authorized users
|
||||
(1, 2, 'test', True, 'active', None, None, 'some,another,test,other', None, None, True),
|
||||
(1, 2, 'test', True, 'active', None, None, 'test', None, None, True),
|
||||
# Other user and not authorized in the list of authorized users
|
||||
(1, 2, 'test', True, 'active', None, None, 'some,another,other', 'google.es', '/', False),
|
||||
(1, 2, 'test', True, 'active', None, None, 'some,another,other', 'google.es', '/dataset/testds', False),
|
||||
# Other user with organizations
|
||||
(1, 2, 'test', False, 'active', 'conwet', False, None, None, None, True),
|
||||
(1, 2, 'test', True, 'active', 'conwet', False, None, None, None, False),
|
||||
(1, 2, 'test', True, 'active', 'conwet', True, None, None, None, True),
|
||||
(1, 2, 'test', True, 'draft', 'conwet', True, None, None, None, False),
|
||||
# Other user with organizations (user is not in the organization)
|
||||
(1, 2, 'test', True, 'active', 'conwet', False, 'test', None, None, True),
|
||||
(1, 2, 'test', True, 'active', 'conwet', False, 'some,another,other', None, None, False),
|
||||
(1, 2, 'test', True, 'active', 'conwet', False, 'some,another,other', 'google.es', '/dataset/testds', False),
|
||||
(1, 2, 'test', True, 'active', 'conwet', False, 'some,another,other', 'google.es', '/', False)
|
||||
('package_show', plugin.auth.package_show),
|
||||
('package_update', plugin.auth.package_update),
|
||||
('package_show', plugin.auth.package_show),
|
||||
('package_adquired', plugin.auth.package_adquired)
|
||||
])
|
||||
def test_auth_package_show(self, creator_user_id, user_obj_id, user, private, state, owner_org,
|
||||
owner_member, allowed_users, adquire_url, request_path, authorized):
|
||||
|
||||
# Configure the mocks
|
||||
returned_package = MagicMock()
|
||||
returned_package.creator_user_id = creator_user_id
|
||||
returned_package.private = private
|
||||
returned_package.state = state
|
||||
returned_package.owner_org = owner_org
|
||||
returned_package.extras = {}
|
||||
|
||||
if allowed_users is not None:
|
||||
returned_package.extras['allowed_users'] = allowed_users
|
||||
|
||||
if adquire_url:
|
||||
returned_package.extras['adquire_url'] = adquire_url
|
||||
|
||||
plugin.logic_auth.get_package_object = MagicMock(return_value=returned_package)
|
||||
plugin.new_authz.has_user_permission_for_group_or_org = MagicMock(return_value=owner_member)
|
||||
plugin.request.path = MagicMock(return_value=request_path)
|
||||
|
||||
# Prepare the context
|
||||
context = {}
|
||||
if user is not None:
|
||||
context['user'] = user
|
||||
if user_obj_id is not None:
|
||||
context['auth_user_obj'] = MagicMock()
|
||||
context['auth_user_obj'].id = user_obj_id
|
||||
|
||||
# Function to be tested
|
||||
result = plugin.package_show(context, {})
|
||||
|
||||
# Check the result
|
||||
self.assertEquals(authorized, result['success'])
|
||||
|
||||
# Check that the mocks has been called properly
|
||||
if private and owner_org and state == 'active':
|
||||
plugin.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'read')
|
||||
|
||||
# Conditions to buy a dataset: it should be private, active and should not belong to any organization
|
||||
if not authorized and state == 'active' and not owner_org and request_path.startswith('/dataset/'):
|
||||
plugin.helpers.flash_error.assert_called_once()
|
||||
|
||||
@parameterized.expand([
|
||||
(None, None, None, None, None, False), # Anonymous user
|
||||
(1, 1, None, None, None, True), # A user can edit its dataset
|
||||
(1, 2, None, None, None, False), # A user cannot edit a dataset belonging to another user
|
||||
(1, 2, 'test', 'conwet', False, False), # User without rights to update a dataset
|
||||
(1, 2, 'test', 'conwet', True, True), # User with rights to update a dataset
|
||||
])
|
||||
def test_auth_package_update(self, creator_user_id, user_obj_id, user, owner_org, owner_member, authorized):
|
||||
|
||||
# Configure the mocks
|
||||
returned_package = MagicMock()
|
||||
returned_package.creator_user_id = creator_user_id
|
||||
returned_package.owner_org = owner_org
|
||||
|
||||
plugin.logic_auth.get_package_object = MagicMock(return_value=returned_package)
|
||||
plugin.new_authz.has_user_permission_for_group_or_org = MagicMock(return_value=owner_member)
|
||||
|
||||
# Prepare the context
|
||||
context = {}
|
||||
if user is not None:
|
||||
context['user'] = user
|
||||
if user_obj_id is not None:
|
||||
context['auth_user_obj'] = MagicMock()
|
||||
context['auth_user_obj'].id = user_obj_id
|
||||
|
||||
# Function to be tested
|
||||
result = plugin.package_update(context, {})
|
||||
|
||||
# Check the result
|
||||
self.assertEquals(authorized, result['success'])
|
||||
|
||||
# Check that the mock has been called properly
|
||||
if creator_user_id != user_obj_id and owner_org:
|
||||
plugin.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'update_dataset')
|
||||
|
||||
@parameterized.expand([
|
||||
(True, True),
|
||||
(True, False),
|
||||
(False, False),
|
||||
(False, False)
|
||||
])
|
||||
def test_auth_resource_show(self, exist_pkg=True, authorized_pkg=True):
|
||||
# Recover the exception
|
||||
plugin.tk.ObjectNotFound = self._tk.ObjectNotFound
|
||||
|
||||
# Mock the calls
|
||||
package = MagicMock()
|
||||
package.id = '1'
|
||||
|
||||
final_query = MagicMock()
|
||||
final_query.first = MagicMock(return_value=package if exist_pkg else None)
|
||||
|
||||
second_join = MagicMock()
|
||||
second_join.filter = MagicMock(return_value=final_query)
|
||||
|
||||
first_join = MagicMock()
|
||||
first_join.join = MagicMock(return_value=second_join)
|
||||
|
||||
query = MagicMock()
|
||||
query.join = MagicMock(return_value=first_join)
|
||||
|
||||
model = MagicMock()
|
||||
session = MagicMock()
|
||||
session.query = MagicMock(return_value=query)
|
||||
model.Session = session
|
||||
|
||||
# Create the context
|
||||
context = {}
|
||||
context['model'] = model
|
||||
|
||||
# Mock the package_show function
|
||||
self._package_show = plugin.package_show
|
||||
success = True if authorized_pkg else False
|
||||
plugin.package_show = MagicMock(return_value={'success': success})
|
||||
|
||||
if not exist_pkg:
|
||||
self.assertRaises(self._tk.ObjectNotFound, plugin.resource_show, context, {})
|
||||
else:
|
||||
result = plugin.resource_show(context, {})
|
||||
self.assertEquals(authorized_pkg, result['success'])
|
||||
|
||||
def test_auth_functions(self):
|
||||
def test_auth_function(self, function_name, expected_function):
|
||||
auth_functions = self.privateDatasets.get_auth_functions()
|
||||
self.assertEquals(auth_functions['package_show'], plugin.package_show)
|
||||
self.assertEquals(auth_functions['package_update'], plugin.package_update)
|
||||
self.assertEquals(auth_functions['resource_show'], plugin.resource_show)
|
||||
self.assertEquals(auth_functions[function_name], expected_function)
|
||||
|
||||
def test_update_config(self):
|
||||
# Call the method
|
||||
|
@ -234,85 +63,16 @@ class PluginTest(unittest.TestCase):
|
|||
self.privateDatasets.after_map(m)
|
||||
|
||||
# Test that the connect method has been called
|
||||
m.connect.assert_any_call('/dataset_adquired',
|
||||
controller='ckanext.privatedatasets.controllers.api_controller:AdquiredDatasetsControllerAPI',
|
||||
action='add_users', conditions=dict(method=['POST']))
|
||||
m.connect.assert_any_call('user_adquired_datasets', '/dashboad/adquired', ckan_icon='shopping-cart',
|
||||
controller='ckanext.privatedatasets.controllers.ui_controller:AdquiredDatasetsControllerUI',
|
||||
action='user_adquired_datasets', conditions=dict(method=['GET']))
|
||||
|
||||
@parameterized.expand([
|
||||
('create_package_schema'),
|
||||
('update_package_schema'),
|
||||
('package_adquired', plugin.actions.package_adquired)
|
||||
])
|
||||
def test_schema_create_update(self, function_name):
|
||||
|
||||
function = getattr(self.privateDatasets, function_name)
|
||||
returned_schema = function()
|
||||
|
||||
self.assertTrue(plugin.tk.get_validator('ignore_missing') in returned_schema['private'])
|
||||
self.assertTrue(plugin.tk.get_validator('boolean_validator') in returned_schema['private'])
|
||||
self.assertEquals(2, len(returned_schema['private']))
|
||||
|
||||
fields = ['allowed_users', 'adquire_url']
|
||||
|
||||
for field in fields:
|
||||
self.assertTrue(plugin.tk.get_validator('ignore_missing') in returned_schema[field])
|
||||
self.assertTrue(plugin.tk.get_converter('convert_to_extras') in returned_schema[field])
|
||||
self.assertTrue(plugin.private_datasets_metadata_checker in returned_schema[field])
|
||||
self.assertEquals(3, len(returned_schema[field]))
|
||||
|
||||
def test_schema_show(self):
|
||||
|
||||
returned_schema = self.privateDatasets.show_package_schema()
|
||||
|
||||
fields = ['allowed_users', 'adquire_url']
|
||||
|
||||
for field in fields:
|
||||
self.assertTrue(plugin.tk.get_validator('ignore_missing') in returned_schema[field])
|
||||
self.assertTrue(plugin.tk.get_converter('convert_from_extras') in returned_schema[field])
|
||||
self.assertEquals(2, len(returned_schema[field]))
|
||||
|
||||
@parameterized.expand([
|
||||
# When no data is present, no errors should be returned
|
||||
(True, 'conwet', '', False),
|
||||
('True', 'conwet', '', False),
|
||||
(False, 'conwet', '', False),
|
||||
('False', 'conwet', '', False),
|
||||
(True, None, '', False),
|
||||
('True', None, '', False),
|
||||
(False, None, '', False),
|
||||
('False', None, '', False),
|
||||
# When data is present, the field is only valid when the
|
||||
# organization is not set and the private field is set to true
|
||||
(True, 'conwet', 'test', False),
|
||||
('True', 'conwet', 'test', False),
|
||||
(False, 'conwet', 'test', True),
|
||||
('False', 'conwet', 'test', True),
|
||||
(True, None, 'test', False),
|
||||
('True', None, 'test', False),
|
||||
(False, None, 'test', True),
|
||||
('False', None, 'test', True),
|
||||
])
|
||||
def test_metadata_checker(self, private, owner_org, metada_val, error_set):
|
||||
|
||||
# TODO: Maybe this test should be refactored since the function should be refactored
|
||||
|
||||
KEY = ('test',)
|
||||
errors = {}
|
||||
errors[KEY] = []
|
||||
|
||||
data = {}
|
||||
data[('private',)] = private
|
||||
data[('owner_org',)] = owner_org
|
||||
data[KEY] = metada_val
|
||||
|
||||
plugin.private_datasets_metadata_checker(KEY, data, errors, {})
|
||||
|
||||
if error_set:
|
||||
self.assertEquals(1, len(errors[KEY]))
|
||||
else:
|
||||
self.assertEquals(0, len(errors[KEY]))
|
||||
def test_actions_function(self, function_name, expected_function):
|
||||
actions = self.privateDatasets.get_actions()
|
||||
self.assertEquals(actions[function_name], expected_function)
|
||||
|
||||
def test_fallback(self):
|
||||
self.assertEquals(True, self.privateDatasets.is_fallback())
|
||||
|
@ -321,27 +81,121 @@ class PluginTest(unittest.TestCase):
|
|||
self.assertEquals([], self.privateDatasets.package_types())
|
||||
|
||||
@parameterized.expand([
|
||||
('after_create',),
|
||||
('after_update',),
|
||||
('after_show',),
|
||||
('after_delete',),
|
||||
('after_create', 'False'),
|
||||
('after_update', 'False'),
|
||||
('after_show', 'False'),
|
||||
('after_delete', 'False')
|
||||
('privatedatasets_adquired', plugin.helpers.is_adquired),
|
||||
('get_allowed_users_str', plugin.helpers.get_allowed_users_str),
|
||||
('is_owner', plugin.helpers.is_owner)
|
||||
])
|
||||
def test_packagecontroller_after(self, function, private='True'):
|
||||
pkg_dict = {'test': 'a', 'private': private, 'allowed_users': 'a,b,c'}
|
||||
def test_helpers_functions(self, function_name, expected_function):
|
||||
helpers_functions = self.privateDatasets.get_helpers()
|
||||
self.assertEquals(helpers_functions[function_name], expected_function)
|
||||
|
||||
######################################################################
|
||||
############################## SCHEMAS ###############################
|
||||
######################################################################
|
||||
|
||||
def _check_fields(self, schema, fields):
|
||||
for field in fields:
|
||||
for checker_validator in fields[field]:
|
||||
self.assertTrue(checker_validator in schema[field])
|
||||
self.assertEquals(len(fields[field]), len(schema[field]))
|
||||
|
||||
@parameterized.expand([
|
||||
('create_package_schema'),
|
||||
('update_package_schema'),
|
||||
])
|
||||
def test_schema_create_update(self, function_name):
|
||||
|
||||
function = getattr(self.privateDatasets, function_name)
|
||||
returned_schema = function()
|
||||
|
||||
fields = {
|
||||
'private': [plugin.tk.get_validator('ignore_missing'), plugin.tk.get_validator('boolean_validator')],
|
||||
'adquire_url': [plugin.tk.get_validator('ignore_missing'), plugin.tk.get_converter('convert_to_extras'),
|
||||
plugin.conv_val.private_datasets_metadata_checker],
|
||||
'searchable': [plugin.tk.get_validator('ignore_missing'), plugin.tk.get_validator('boolean_validator'),
|
||||
plugin.tk.get_converter('convert_to_extras'), plugin.conv_val.private_datasets_metadata_checker],
|
||||
'allowed_users_str': [plugin.tk.get_validator('ignore_missing'), plugin.conv_val.private_datasets_metadata_checker],
|
||||
'allowed_users': [plugin.conv_val.allowed_users_convert, plugin.tk.get_validator('ignore_missing'),
|
||||
plugin.conv_val.private_datasets_metadata_checker]
|
||||
}
|
||||
|
||||
self._check_fields(returned_schema, fields)
|
||||
|
||||
def test_schema_show(self):
|
||||
|
||||
returned_schema = self.privateDatasets.show_package_schema()
|
||||
|
||||
fields = ['searchable', 'adquire_url']
|
||||
|
||||
fields = {
|
||||
'adquire_url': [plugin.tk.get_validator('ignore_missing'), plugin.tk.get_converter('convert_from_extras')],
|
||||
'searchable': [plugin.tk.get_validator('ignore_missing'), plugin.tk.get_converter('convert_from_extras')],
|
||||
'allowed_users': [plugin.tk.get_validator('ignore_missing'), plugin.conv_val.get_allowed_users]
|
||||
}
|
||||
|
||||
self._check_fields(returned_schema, fields)
|
||||
|
||||
######################################################################
|
||||
############################## PACKAGE ###############################
|
||||
######################################################################
|
||||
|
||||
@parameterized.expand([
|
||||
('True'),
|
||||
('False')
|
||||
])
|
||||
def test_packagecontroller_after_delete(self, private):
|
||||
pkg_dict = {'test': 'a', 'private': private, 'allowed_users': ['a', 'b', 'c']}
|
||||
expected_pkg_dict = pkg_dict.copy()
|
||||
result = getattr(self.privateDatasets, function)({}, pkg_dict) # Call the function
|
||||
self.assertEquals(expected_pkg_dict, result) # Check the result
|
||||
result = self.privateDatasets.after_delete({}, pkg_dict) # Call the function
|
||||
self.assertEquals(expected_pkg_dict, result) # Check the result
|
||||
|
||||
def test_packagecontroller_after_search(self):
|
||||
search_res = {'test': 'a', 'private': 'a', 'allowed_users': 'a,b,c'}
|
||||
search_res = {'test': 'a', 'private': 'a', 'allowed_users': ['a', 'b', 'c']}
|
||||
expected_search_res = search_res.copy()
|
||||
result = getattr(self.privateDatasets, 'after_search')(search_res, {}) # Call the function
|
||||
self.assertEquals(expected_search_res, result) # Check the result
|
||||
|
||||
@parameterized.expand([
|
||||
(True, 1, 1, False, True),
|
||||
(True, 1, 2, False, True),
|
||||
(True, 1, 1, True, True),
|
||||
(True, 1, 2, True, True),
|
||||
(True, 1, None, None, True),
|
||||
(True, 1, 1, None, True),
|
||||
(True, 1, None, True, True),
|
||||
(True, 1, None, False, True),
|
||||
(False, 1, 1, False, True),
|
||||
(False, 1, 2, False, False),
|
||||
(False, 1, 1, True, True),
|
||||
(False, 1, 2, True, True),
|
||||
(False, 1, None, None, False),
|
||||
(False, 1, 1, None, True),
|
||||
(False, 1, None, True, True),
|
||||
(False, 1, None, False, False),
|
||||
])
|
||||
def test_packagecontroller_after_show(self, update_via_api, creator_id, user_id, sysadmin, fields_expected):
|
||||
|
||||
context = {'updating_via_cb': update_via_api}
|
||||
|
||||
if creator_id is not None or sysadmin is not None:
|
||||
user = MagicMock()
|
||||
user.id = user_id
|
||||
user.sysadmin = sysadmin
|
||||
context['auth_user_obj'] = user
|
||||
|
||||
pkg_dict = {'creator_user_id': creator_id, 'allowed_users': ['a', 'b', 'c'], 'searchable': True, 'adquire_url': 'http://google.es'}
|
||||
|
||||
# Call the function
|
||||
result = self.privateDatasets.after_show(context, pkg_dict) # Call the function
|
||||
|
||||
# Check the final result
|
||||
fields = ['allowed_users', 'searchable', 'adquire_url']
|
||||
for field in fields:
|
||||
if fields_expected:
|
||||
self.assertTrue(field in result)
|
||||
else:
|
||||
self.assertFalse(field in result)
|
||||
|
||||
@parameterized.expand([
|
||||
('before_search',),
|
||||
('before_view',),
|
||||
|
@ -357,7 +211,7 @@ class PluginTest(unittest.TestCase):
|
|||
('delete', 'False')
|
||||
])
|
||||
def test_before_and_CRUD(self, function, private='True'):
|
||||
pkg_dict = {'test': 'a', 'private': private, 'allowed_users': 'a,b,c'}
|
||||
pkg_dict = {'test': 'a', 'private': private, 'allowed_users': ['a', 'b', 'c']}
|
||||
expected_pkg_dict = pkg_dict.copy()
|
||||
result = getattr(self.privateDatasets, function)(pkg_dict) # Call the function
|
||||
self.assertEquals(expected_pkg_dict, result) # Check the result
|
||||
|
@ -380,24 +234,102 @@ class PluginTest(unittest.TestCase):
|
|||
|
||||
self.assertEquals(expected_result, self.privateDatasets.before_index(pkg_dict))
|
||||
|
||||
def test_helpers_functions(self):
|
||||
helpers_functions = self.privateDatasets.get_helpers()
|
||||
self.assertEquals(helpers_functions['privatedatasets_adquired'], plugin.adquired)
|
||||
def _aux_test_after_create_update(self, function, new_users, current_users, users_to_add, users_to_delete):
|
||||
package_id = 'package_id'
|
||||
|
||||
# Configure mocks
|
||||
default_dict = {'a': '0', 'b': 1, 'm': True}
|
||||
package_show = MagicMock(return_value=default_dict)
|
||||
plugin.tk.get_action = MagicMock(return_value=package_show)
|
||||
|
||||
# Each time 'AllowedUser' is called, we must get a new instance
|
||||
# and this is the way to get this behaviour
|
||||
def constructor():
|
||||
return MagicMock()
|
||||
|
||||
plugin.db.AllowedUser = MagicMock(side_effect=constructor)
|
||||
|
||||
# Configure the database mock
|
||||
db_current_users = []
|
||||
for user in current_users:
|
||||
db_user = MagicMock()
|
||||
db_user.package_id = package_id
|
||||
db_user.user_name = user
|
||||
db_current_users.append(db_user)
|
||||
|
||||
plugin.db.AllowedUser.get = MagicMock(return_value=db_current_users)
|
||||
|
||||
# Call the method
|
||||
context = {'user': 'test', 'auth_user_obj': {'id': 1}, 'session': MagicMock(), 'model': MagicMock()}
|
||||
pkg_dict = {'id': 'package_id', 'allowed_users': new_users}
|
||||
function(context, pkg_dict)
|
||||
|
||||
def _test_calls(user_list, function):
|
||||
self.assertEquals(len(user_list), function.call_count)
|
||||
for user in user_list:
|
||||
found = False
|
||||
for call in function.call_args_list:
|
||||
call_user = call[0][0]
|
||||
|
||||
if call_user.package_id == package_id and call_user.user_name == user:
|
||||
found = True
|
||||
break
|
||||
|
||||
self.assertTrue(found)
|
||||
|
||||
# Check that the method has deleted the appropriate users
|
||||
_test_calls(users_to_delete, context['session'].delete)
|
||||
|
||||
# Check that the method has added the appropriate users
|
||||
_test_calls(users_to_add, context['session'].add)
|
||||
|
||||
if len(users_to_add) == 0 and len(users_to_delete) == 0:
|
||||
# Check that the cache has not been updated
|
||||
self.assertEquals(0, self.privateDatasets.indexer.update_dict.call_count)
|
||||
else:
|
||||
# Check that the cache has been updated
|
||||
self.privateDatasets.indexer.update_dict.assert_called_once_with(default_dict)
|
||||
|
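The `constructor` side effect used in the helper above leans on documented `mock` behaviour: when `side_effect` is a callable and it returns something other than `mock.DEFAULT`, that return value becomes the result of the call, so every `plugin.db.AllowedUser()` yields a fresh `MagicMock` that `session.add` can be checked against independently. A standalone illustration:

```
from mock import MagicMock  # unittest.mock behaves the same on Python 3


def demo_fresh_instance_per_call():
    allowed_user_cls = MagicMock(side_effect=lambda: MagicMock())

    first = allowed_user_cls()
    second = allowed_user_cls()

    # Each call produced a distinct object, so the package_id / user_name
    # attributes set by the plugin never leak between rows.
    assert first is not second
```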
||||
@parameterized.expand([
|
||||
(False, None, 'user', False),
|
||||
(True, '', 'user', False),
|
||||
(True, None, 'user', False),
|
||||
(True, 'user', 'user', True),
|
||||
(True, 'another_user,user', 'user', True),
|
||||
(True, 'another_user,user2', 'user', False),
|
||||
# One element
|
||||
(['a'], [], ['a'], []),
|
||||
(['a'], ['a'], [], []),
|
||||
([], ['a'], [], ['a']),
|
||||
# Two elements
|
||||
(['a', 'b'], [], ['a', 'b'], []),
|
||||
(['a', 'b'], ['b'], ['a'], []),
|
||||
(['a'], ['a', 'b'], [], ['b']),
|
||||
([], ['a', 'b'], [], ['a', 'b']),
|
||||
(['a', 'b'], ['a', 'b'], [], []),
|
||||
# Three or more elements
|
||||
(['c'], ['a', 'b'], ['c'], ['a', 'b']),
|
||||
(['a', 'b', 'c'], ['a', 'b'], ['c'], []),
|
||||
(['a', 'b', 'c'], ['a'], ['b', 'c'], []),
|
||||
(['a', 'b', 'c'], ['a', 'b', 'c'], [], []),
|
||||
(['a', 'b', 'c'], [], ['a', 'b', 'c'], []),
|
||||
(['a', 'b'], ['a', 'b', 'c'], [], ['c'])
|
||||
])
|
||||
def test_adquired(self, include_allowed_users, allowed_users, user, adquired):
|
||||
# Configure test
|
||||
plugin.tk.c.user = user
|
||||
pkg_dict = {}
|
||||
if include_allowed_users:
|
||||
pkg_dict['allowed_users'] = allowed_users
|
||||
def test_after_create(self, new_users, current_users, users_to_add, users_to_delete):
|
||||
self._aux_test_after_create_update(self.privateDatasets.after_create, new_users, current_users, users_to_add, users_to_delete)
|
||||
|
||||
# Check the function returns the expected result
|
||||
self.assertEquals(adquired, plugin.adquired(pkg_dict))
|
||||
@parameterized.expand([
|
||||
# One element
|
||||
(['a'], [], ['a'], []),
|
||||
(['a'], ['a'], [], []),
|
||||
([], ['a'], [], ['a']),
|
||||
# Two elements
|
||||
(['a', 'b'], [], ['a', 'b'], []),
|
||||
(['a', 'b'], ['b'], ['a'], []),
|
||||
(['a'], ['a', 'b'], [], ['b']),
|
||||
([], ['a', 'b'], [], ['a', 'b']),
|
||||
(['a', 'b'], ['a', 'b'], [], []),
|
||||
# Three or more elements
|
||||
(['c'], ['a', 'b'], ['c'], ['a', 'b']),
|
||||
(['a', 'b', 'c'], ['a', 'b'], ['c'], []),
|
||||
(['a', 'b', 'c'], ['a'], ['b', 'c'], []),
|
||||
(['a', 'b', 'c'], ['a', 'b', 'c'], [], []),
|
||||
(['a', 'b', 'c'], [], ['a', 'b', 'c'], []),
|
||||
(['a', 'b'], ['a', 'b', 'c'], [], ['c'])
|
||||
])
|
||||
def test_after_update(self, new_users, current_users, users_to_add, users_to_delete):
|
||||
self._aux_test_after_create_update(self.privateDatasets.after_update, new_users, current_users, users_to_add, users_to_delete)
|
||||
|
|