Merge pull request #3 from conwetlab/develop

Merge develop into master
This commit is contained in:
Aitor Magán García 2014-07-08 15:47:24 +02:00
commit c5776b1d96
15 changed files with 565 additions and 99 deletions

View File

@ -12,7 +12,11 @@ log = logging.getLogger(__name__)
PARSER_CONFIG_PROP = 'ckan.privatedatasets.parser'
class AdquiredDatasetsController(base.BaseController):
######################################################################
############################ API CONTROLLER ##########################
######################################################################
class AdquiredDatasetsControllerAPI(base.BaseController):
def __call__(self, environ, start_response):
# avoid status_code_redirect intercepting error responses

View File

@ -0,0 +1,48 @@
import ckan.lib.base as base
import ckan.model as model
import ckan.plugins as plugins
import logging
from ckan.common import _
log = logging.getLogger(__name__)
class AdquiredDatasetsControllerUI(base.BaseController):
def user_adquired_datasets(self):
c = plugins.toolkit.c
context = {
'model': model,
'session': model.Session,
'user': plugins.toolkit.c.user
}
# Get user information
try:
c.user_dict = plugins.toolkit.get_action('user_show')(context, {'user_obj': c.userobj})
c.user_dict['adquired_datasets'] = []
except plugins.toolkit.ObjectNotFound:
plugins.toolkit.abort(404, _('User not found'))
except plugins.toolkit.NotAuthorized:
plugins.toolkit.abort(401, _('Not authorized to see this page'))
# Get the datasets adquired by the user
query = model.Session.query(model.PackageExtra).filter(
# Select only the allowed_users key
'package_extra.key=\'%s\' AND package_extra.value!=\'\' ' % 'allowed_users' +
# Select only when the state is 'active'
'AND package_extra.state=\'%s\' ' % 'active' +
# The user name should be contained in the list
'AND regexp_split_to_array(package_extra.value,\',\') @> ARRAY[\'%s\']' % context['user'])
# Get the datasets
for dataset in query:
try:
dataset_dict = plugins.toolkit.get_action('package_show')(context, {'id': dataset.package_id})
c.user_dict['adquired_datasets'].append(dataset_dict)
except Exception:
continue
return plugins.toolkit.render('user/dashboard_adquired.html')

View File

@ -1,6 +1,6 @@
/* Dataset allowed_users and adquire_url toggler
* allowd_users and adquire_url can only be active when a user attempts to create
* a private dataset outside an organization
* allowed_users, adquire_url and searchable can only be active when a
* user attempts to create a private dataset outside an organization
*/
this.ckan.module('allowed-users', function ($, _) {
@ -12,19 +12,21 @@ this.ckan.module('allowed-users', function ($, _) {
},
_onChange: function() {
var ds_private = $('#field-private').val();
var organization = $('#field-organizations').val();
if (ds_private == "True" && !organization) {
if (ds_private == "True") {
$('#field-allowed_users').prop('disabled', false); //Enable
$('#field-adquire_url').prop('disabled', false); //Enable
$('#field-searchable').prop('disabled', false); //Enable
} else {
$('#field-allowed_users').prop('disabled', true); //Disable
$('#field-adquire_url').prop('disabled', true); //Disable
$('#field-searchable').prop('disabled', true); //Disable
//Remove previous values
$('#s2id_field-allowed_users .select2-search-choice').remove();
$('#field-allowed_users').val('');
$('#field-adquire_url').val('');
$('#field-searchable').val('True');
}
}
};

View File

@ -0,0 +1,3 @@
.label-adquired {
background-color: #55a1ce;
}

View File

@ -39,7 +39,7 @@ def package_show(context, data_dict):
if not authorized:
if hasattr(package, 'extras') and 'allowed_users' in package.extras:
allowed_users = package.extras['allowed_users']
if allowed_users != '': # ''.split(',') ==> ['']
if allowed_users != '': # ''.split(',') ==> ['']
allowed_users_list = allowed_users.split(',')
if user in allowed_users_list:
authorized = True
@ -50,7 +50,8 @@ def package_show(context, data_dict):
# The message cannot be displayed in other pages that uses the package_show function such as
# the user profile page
if hasattr(package, 'extras') and 'adquire_url' in package.extras and request.path.startswith('/dataset/'):
if hasattr(package, 'extras') and 'adquire_url' in package.extras and request.path.startswith('/dataset/')\
and package.extras['adquire_url'] != '':
helpers.flash_notice(_('This private dataset can be adquired. To do so, please click ' +
'<a target="_blank" href="%s">here</a>') % package.extras['adquire_url'],
allow_html=True)
@ -121,22 +122,37 @@ def private_datasets_metadata_checker(key, data, errors, context):
# TODO: In some cases, we will need to retireve all the dataset information if it isn't present...
private_val = data.get(('private',))
owner_org = data.get(('owner_org',))
private = private_val is True if isinstance(private_val, bool) else private_val == "True"
metadata_value = data[key]
# If allowed users are included and the dataset is not private outside and organization, an error will be raised.
if metadata_value != '' and (not private or owner_org):
if metadata_value != '' and not private:
errors[key].append(_('This field is only valid when you create a private dataset outside an organization'))
######################################################################
############################### ADQUIRED #############################
######################################################################
def adquired(pkg_dict):
adquired = False
if 'allowed_users' in pkg_dict and pkg_dict['allowed_users'] != '' and pkg_dict['allowed_users'] is not None:
allowed_users = pkg_dict['allowed_users'].split(',')
if tk.c.user in allowed_users:
adquired = True
return adquired
class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
p.implements(p.IDatasetForm)
p.implements(p.IAuthFunctions)
p.implements(p.IConfigurer)
p.implements(p.IRoutes, inherit=True)
p.implements(p.IActions)
p.implements(p.IPackageController)
p.implements(p.ITemplateHelpers)
######################################################################
############################ DATASET FORM ############################
@ -152,7 +168,11 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
tk.get_converter('convert_to_extras')],
'adquire_url': [tk.get_validator('ignore_missing'),
private_datasets_metadata_checker,
tk.get_converter('convert_to_extras')]
tk.get_converter('convert_to_extras')],
'searchable': [tk.get_validator('ignore_missing'),
private_datasets_metadata_checker,
tk.get_converter('convert_to_extras'),
tk.get_validator('boolean_validator')]
}
def create_package_schema(self):
@ -173,7 +193,9 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
'allowed_users': [tk.get_converter('convert_from_extras'),
tk.get_validator('ignore_missing')],
'adquire_url': [tk.get_converter('convert_from_extras'),
tk.get_validator('ignore_missing')]
tk.get_validator('ignore_missing')],
'searchable': [tk.get_converter('convert_from_extras'),
tk.get_validator('ignore_missing')]
})
return schema
@ -215,32 +237,64 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
def after_map(self, m):
# DataSet adquired notification
m.connect('/dataset_adquired',
controller='ckanext.privatedatasets.controller:AdquiredDatasetsController',
controller='ckanext.privatedatasets.controllers.api_controller:AdquiredDatasetsControllerAPI',
action='add_users', conditions=dict(method=['POST']))
m.connect('user_adquired_datasets', '/dashboad/adquired', ckan_icon='shopping-cart',
controller='ckanext.privatedatasets.controllers.ui_controller:AdquiredDatasetsControllerUI',
action='user_adquired_datasets', conditions=dict(method=['GET']))
return m
######################################################################
############################## IACTIONS ##############################
######################### IPACKAGECONTROLLER #########################
######################################################################
def get_actions(self):
# Update package_show function. When the URL is the URL used to
# check the datasets, the context parameter will me modified and
# the field 'ignore_capacity_check' will be added in order to
# get both the private and the public datasets.
def before_index(self, pkg_dict):
_old_package_search = tk.get_action('package_search')
if 'extras_searchable' in pkg_dict:
if pkg_dict['extras_searchable'] == 'False':
pkg_dict['capacity'] = 'private'
else:
pkg_dict['capacity'] = 'public'
@tk.side_effect_free
def _new_package_search(context, data_dict):
valid_urls = ['/dataset', '/api/3/action/package_search',
'/api/3/action/dataset_search']
if request.path in valid_urls:
context.update({'ignore_capacity_check': True})
return _old_package_search(context, data_dict)
return pkg_dict
_new_package_search.__doc__ = _old_package_search.__doc__
def before_view(self, pkg_dict):
return pkg_dict
# Modify the package_show function used across the system
return {'package_search': _new_package_search}
def before_search(self, search_params):
return search_params
def create(self, pkg_dict):
return pkg_dict
def edit(self, pkg_dict):
return pkg_dict
def read(self, pkg_dict):
return pkg_dict
def delete(self, pkg_dict):
return pkg_dict
def after_create(self, context, pkg_dict):
return pkg_dict
def after_update(self, context, pkg_dict):
return pkg_dict
def after_show(self, context, pkg_dict):
return pkg_dict
def after_search(self, search_results, search_params):
return search_results
def after_delete(self, context, pkg_dict):
return pkg_dict
######################################################################
########################## ITEMPLATESHELER ###########################
######################################################################
def get_helpers(self):
return {'privatedatasets_adquired': adquired}

View File

@ -47,10 +47,37 @@
<option value="{{ option[0] }}" {% if option[0] == data.private|trim %}selected="selected"{% endif %}>{{ option[1] }}</option>
{% endfor %}
</select>
<span class="info-block info-inline">
<i class="icon-info-sign"></i>
{% trans %}
If private, the dataset will be only accesible to certain users. Otherwise, everyone will be able to access the dataset.
{% endtrans %}
</span>
</div>
</div>
{% endblock %}
{% block package_metadata_fields_protected %}
<div class="control-group">
<label for="field-searchable" class="control-label">{{ _('Searchable') }}</label>
<div class="controls">
<select id="field-searchable" name="searchable">
{% for option in [('True', _('True')), ('False', _('False'))] %}
<option value="{{ option[0] }}" {% if option[0] == data.searchable|trim %}selected="selected"{% endif %}>{{ option[1] }}</option>
{% endfor %}
</select>
<span class="info-block info-inline">
<i class="icon-info-sign"></i>
{% trans %}
When true, the dataset will be shown in search. Otherwise, it will only be accesible entering its URL directly.
{% endtrans %}
</span>
</div>
</div>
{% endblock %}
{% if show_organizations_selector and show_visibility_selector %}
</div>
{% endif %}

View File

@ -0,0 +1,65 @@
{#
Displays a single of dataset.
package - A package to display.
item_class - The class name to use on the list item.
hide_resources - If true hides the resources (default: false).
banner - If true displays a popular banner (default: false).
truncate - The length to trucate the description to (default: 180)
truncate_title - The length to truncate the title to (default: 80).
Example:
{% snippet 'snippets/package_item.html', package=c.datasets[0] %}
#}
{% set truncate = truncate or 180 %}
{% set truncate_title = truncate_title or 80 %}
{% set title = package.title or package.name %}
{% set notes = h.markdown_extract(package.notes, extract_length=truncate) %}
{% set adquired = h.privatedatasets_adquired(package) %}
{% resource 'privatedatasets/custom.css' %}
<li class="{{ item_class or "dataset-item" }}">
{% block package_item_content %}
<div class="dataset-content">
<h3 class="dataset-heading">
{% if package.private and not adquired %}
<span class="dataset-private label label-inverse">
<i class="icon-lock"></i>
{{ _('Private') }}
</span>
{% endif %}
{% if adquired %}
<span class="dataset-private label label-adquired">
<i class="icon-shopping-cart"></i>
{{ _('Adquired') }}
</span>
{% endif %}
{{ h.link_to(h.truncate(title, truncate_title), h.url_for(controller='package', action='read', id=package.name)) }}
{% if package.get('state', '').startswith('draft') %}
<span class="label label-info">{{ _('Draft') }}</span>
{% elif package.get('state', '').startswith('deleted') %}
<span class="label label-important">{{ _('Deleted') }}</span>
{% endif %}
{{ h.popular('recent views', package.tracking_summary.recent, min=10) if package.tracking_summary }}
</h3>
{% if banner %}
<span class="banner">{{ _('Popular') }}</span>
{% endif %}
{% if notes %}
<div>{{ notes|urlize }}</div>
{% endif %}
</div>
{% if package.resources and not hide_resources %}
<ul class="dataset-resources unstyled">
{% for resource in h.dict_list_reduce(package.resources, 'format') %}
<li>
<a href="{{ h.url_for(controller='package', action='read', id=package.name) }}" class="label" data-format="{{ resource.lower() }}">{{ resource }}</a>
</li>
{% endfor %}
</ul>
{% endif %}
{% endblock %}
</li>

View File

@ -0,0 +1,49 @@
{% extends "user/edit_base.html" %}
{% set user = c.userobj %}
{% block breadcrumb_content %}
<li class="active"><a href="{{ h.url_for(controller='user', action='dashboard') }}">{{ _('Dashboard') }}</a></li>
{% endblock %}
{% block secondary %}{% endblock %}
{% block primary %}
<article class="module">
{% block page_header %}
<header class="module-content page-header hug">
<div class="content_action">
{% link_for _('Edit settings'), controller='user', action='edit', id=user.name, class_='btn', icon='cog' %}
</div>
<ul class="nav nav-tabs">
{{ h.build_nav_icon('user_dashboard', _('News feed')) }}
{{ h.build_nav_icon('user_dashboard_datasets', _('My Datasets')) }}
{{ h.build_nav_icon('user_adquired_datasets', _('Adquired Datasets')) }}
{{ h.build_nav_icon('user_dashboard_organizations', _('My Organizations')) }}
{{ h.build_nav_icon('user_dashboard_groups', _('My Groups')) }}
</ul>
</header>
{% endblock %}
<div class="module-content">
{% if self.page_primary_action() | trim %}
<div class="page_primary_action">
{% block page_primary_action %}{% endblock %}
</div>
{% endif %}
{% block primary_content_inner %}
<div data-module="dashboard">
{% snippet 'user/snippets/followee_dropdown.html', context=c.dashboard_activity_stream_context, followees=c.followee_list %}
<h2 class="page-heading">
{% block page_heading %}
{{ _('News feed') }}
{% endblock %}
<small>{{ _("Activity from items that I'm following") }}</small>
</h2>
{% block activity_stream %}
{{ c.dashboard_activity_stream }}
{% endblock %}
</div>
{% endblock %}
</div>
</article>
{% endblock %}

View File

@ -0,0 +1,19 @@
{% extends "user/dashboard.html" %}
{% block dashboard_activity_stream_context %}{% endblock %}
{% block page_primary_action %}
{% link_for _('Adquire Dataset'), controller='package', action='new', class_="btn btn-primary", icon="shopping-cart" %}
{% endblock %}
{% block primary_content_inner %}
<h2 class="hide-heading">{{ _('Adquired Datasets') }}</h2>
{% if c.user_dict.adquired_datasets %}
{% snippet 'snippets/package_list.html', packages=c.user_dict.adquired_datasets %}
{% else %}
<p class="empty">
{{ _('You haven\'t adquired any datasets.') }}
{% link_for _('Adquire one now?'), controller='package', action='search' %}
</p>
{% endif %}
{% endblock %}

View File

@ -1,4 +1,4 @@
import ckanext.privatedatasets.controller as controller
import ckanext.privatedatasets.controllers.api_controller as controller
import json
import unittest
@ -11,12 +11,12 @@ CLASS_NAME = 'parser_class'
ADD_USERS_ERROR = 'Default Message'
class ControllerTest(unittest.TestCase):
class APIControllerTest(unittest.TestCase):
def setUp(self):
# Get the instance
self.instance = controller.AdquiredDatasetsController()
self.instanceAPI = controller.AdquiredDatasetsControllerAPI()
# Load the mocks
self._config = controller.config
@ -63,7 +63,7 @@ class ControllerTest(unittest.TestCase):
return_value=package if path_exist else None)
# Call the function
result = self.instance.add_users()
result = self.instanceAPI.add_users()
# Checks
self.assertEquals(expected_error, result)
@ -121,7 +121,7 @@ class ControllerTest(unittest.TestCase):
package_search, package_update = self.configure_mocks(parse_result)
# Call the function
result = self.instance.add_users()
result = self.instanceAPI.add_users()
# Checks
self.assertEquals(0, package_search.call_count)
@ -163,7 +163,7 @@ class ControllerTest(unittest.TestCase):
package_show, package_update = self.configure_mocks(parse_result, datasets_not_found, not_updatable_datasets, allowed_users)
# Call the function
result = self.instance.add_users()
result = self.instanceAPI.add_users()
# Calculate the list of warns
warns = []

View File

@ -0,0 +1,137 @@
import ckanext.privatedatasets.controllers.ui_controller as controller
import unittest
from mock import MagicMock, ANY
from nose_parameterized import parameterized
class UIControllerTest(unittest.TestCase):
def setUp(self):
# Get the instance
self.instanceUI = controller.AdquiredDatasetsControllerUI()
# Load the mocks
self._plugins = controller.plugins
controller.plugins = MagicMock()
self._model = controller.model
controller.model = MagicMock()
# Set exceptions
controller.plugins.toolkit.ObjectNotFound = self._plugins.toolkit.ObjectNotFound
controller.plugins.toolkit.NotAuthorized = self._plugins.toolkit.NotAuthorized
def tearDown(self):
# Unmock
controller.plugins = self._plugins
controller.model = self._model
@parameterized.expand([
(controller.plugins.toolkit.ObjectNotFound, 404),
(controller.plugins.toolkit.NotAuthorized, 401)
])
def test_exceptions_loading_users(self, exception, expected_status):
# Configure the mock
user_show = MagicMock(side_effect=exception)
controller.plugins.toolkit.get_action = MagicMock(return_value=user_show)
# Call the function
self.instanceUI.user_adquired_datasets()
# Assertations
expected_context = {
'model': controller.model,
'session': controller.model.Session,
'user': controller.plugins.toolkit.c.user
}
user_show.assert_called_once_with(expected_context, {'user_obj': controller.plugins.toolkit.c.userobj})
controller.plugins.toolkit.abort.assert_called_once_with(expected_status, ANY)
@parameterized.expand([
({},),
({2: controller.plugins.toolkit.ObjectNotFound},),
({1: controller.plugins.toolkit.NotAuthorized},)
])
def test_no_error_loading_users(self, package_errors={}):
pkgs_ids = [0, 1, 2, 3]
user = 'example_user_test'
controller.plugins.toolkit.c.user = user
# get_action mock
default_package = {'pkg_id': 0, 'test': 'ok', 'res': 'ta'}
def _package_show(context, data_dict):
if data_dict['id'] in package_errors:
raise package_errors[data_dict['id']]('ERROR')
else:
pkg = default_package.copy()
pkg['pkg_id'] = data_dict['id']
return pkg
user_dict = {'user_name': 'test', 'another_val': 'example value'}
package_show = MagicMock(side_effect=_package_show)
user_show = MagicMock(return_value=user_dict.copy())
def _get_action(action):
if action == 'package_show':
return package_show
elif action == 'user_show':
return user_show
controller.plugins.toolkit.get_action = MagicMock(side_effect=_get_action)
# query mock
query_res = []
for i in pkgs_ids:
pkg = MagicMock()
pkg.package_id = i
query_res.append(pkg)
filter_f = MagicMock()
filter_f.filter = MagicMock(return_value=query_res)
controller.model.Session.query = MagicMock(return_value=filter_f)
# Call the function
returned = self.instanceUI.user_adquired_datasets()
# User_show called correctly
expected_context = {
'model': controller.model,
'session': controller.model.Session,
'user': controller.plugins.toolkit.c.user
}
user_show.assert_called_once_with(expected_context, {'user_obj': controller.plugins.toolkit.c.userobj})
# Query called correctry
controller.model.Session.query.assert_called_once_with(controller.model.PackageExtra)
# Filter called correctly
filter_f.filter.assert_called_once_with('package_extra.key=\'allowed_users\' AND package_extra.value!=\'\' ' +
'AND package_extra.state=\'active\' AND ' +
'regexp_split_to_array(package_extra.value,\',\') @> ARRAY[\'%s\']' % user)
# Assert that the package_show has been called properly
self.assertEquals(len(pkgs_ids), package_show.call_count)
for i in pkgs_ids:
package_show.assert_any_call(expected_context, {'id': i})
# Check that the template receives the correct datasets
expected_user_dict = user_dict.copy()
expected_user_dict['adquired_datasets'] = []
for i in pkgs_ids:
if i not in package_errors:
pkg = default_package.copy()
pkg['pkg_id'] = i
expected_user_dict['adquired_datasets'].append(pkg)
self.assertEquals(expected_user_dict, controller.plugins.toolkit.c.user_dict)
# Check that the render method has been called and that its result has been returned
self.assertEquals(controller.plugins.toolkit.render.return_value, returned)
controller.plugins.toolkit.render.assert_called_once_with('user/dashboard_adquired.html')

View File

@ -69,21 +69,21 @@ class FiWareParserTest(unittest.TestCase):
fiware.request = self._request
@parameterized.expand([
(TEST_CASES['one_ds']['host'], TEST_CASES['one_ds']['json'], TEST_CASES['one_ds']['result']),
(TEST_CASES['two_ds']['host'], TEST_CASES['two_ds']['json'], TEST_CASES['two_ds']['result']),
(TEST_CASES['error']['host'], TEST_CASES['error']['json'], TEST_CASES['error']['result']),
(TEST_CASES['error_one_ds']['host'], TEST_CASES['error_one_ds']['json'], TEST_CASES['error_one_ds']['result']),
(TEST_CASES['two_errors']['host'], TEST_CASES['two_errors']['json'], TEST_CASES['two_errors']['result']),
(TEST_CASES['two_errors_two_ds']['host'], TEST_CASES['two_errors_two_ds']['json'], TEST_CASES['two_errors_two_ds']['result']),
('one_ds',),
('two_ds',),
('error',),
('error_one_ds',),
('two_errors',),
('two_errors_two_ds',),
])
def test_parse_notification(self, my_host, request_body, expected_result):
def test_parse_notification(self, case):
# Configure
fiware.request.host = my_host
fiware.request.body = request_body
fiware.request.host = TEST_CASES[case]['host']
fiware.request.body = TEST_CASES[case]['json']
# Call the function
result = self.parser.parse_notification()
# Assert that the result is what we expected to be
self.assertEquals(expected_result, result)
self.assertEquals(TEST_CASES[case]['result'], result)

View File

@ -1,7 +1,7 @@
import unittest
import ckanext.privatedatasets.plugin as plugin
from mock import MagicMock, ANY
from mock import MagicMock
from nose_parameterized import parameterized
@ -37,12 +37,16 @@ class PluginTest(unittest.TestCase):
if hasattr(self, '_package_show'):
plugin.package_show = self._package_show
def test_implementations(self):
self.assertTrue(plugin.p.IDatasetForm.implemented_by(plugin.PrivateDatasets))
self.assertTrue(plugin.p.IAuthFunctions.implemented_by(plugin.PrivateDatasets))
self.assertTrue(plugin.p.IConfigurer.implemented_by(plugin.PrivateDatasets))
self.assertTrue(plugin.p.IRoutes.implemented_by(plugin.PrivateDatasets))
self.assertTrue(plugin.p.IActions.implemented_by(plugin.PrivateDatasets))
@parameterized.expand([
(plugin.p.IDatasetForm,),
(plugin.p.IAuthFunctions,),
(plugin.p.IConfigurer,),
(plugin.p.IRoutes,),
(plugin.p.IPackageController,),
(plugin.p.ITemplateHelpers,)
])
def test_implementations(self, interface):
self.assertTrue(interface.implemented_by(plugin.PrivateDatasets))
def test_decordators(self):
self.assertEquals(True, getattr(plugin.package_show, 'auth_allow_anonymous_access', False))
@ -78,7 +82,12 @@ class PluginTest(unittest.TestCase):
(1, 2, 'test', False, 'active', 'conwet', False, None, None, None, True),
(1, 2, 'test', True, 'active', 'conwet', False, None, None, None, False),
(1, 2, 'test', True, 'active', 'conwet', True, None, None, None, True),
(1, 2, 'test', True, 'draft', 'conwet', True, None, None, None, False)
(1, 2, 'test', True, 'draft', 'conwet', True, None, None, None, False),
# Other user with organizations (user is not in the organization)
(1, 2, 'test', True, 'active', 'conwet', False, 'test', None, None, True),
(1, 2, 'test', True, 'active', 'conwet', False, 'some,another,other', None, None, False),
(1, 2, 'test', True, 'active', 'conwet', False, 'some,another,other', 'google.es', '/dataset/testds', False),
(1, 2, 'test', True, 'active', 'conwet', False, 'some,another,other', 'google.es', '/', False)
])
def test_auth_package_show(self, creator_user_id, user_obj_id, user, private, state, owner_org,
owner_member, allowed_users, adquire_url, request_path, authorized):
@ -159,8 +168,8 @@ class PluginTest(unittest.TestCase):
plugin.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'update_dataset')
@parameterized.expand([
(True, True),
(True, False),
(True, True),
(True, False),
(False, False),
(False, False)
])
@ -210,42 +219,6 @@ class PluginTest(unittest.TestCase):
self.assertEquals(auth_functions['package_update'], plugin.package_update)
self.assertEquals(auth_functions['resource_show'], plugin.resource_show)
@parameterized.expand([
('/dataset', True), # Include ignore_capacity_check
('/', False), # Not include ignore_capacity_check
('/datasets', False), # Not include ignore_capacity_check
('/api/3/action/package_search', True), # Include ignore_capacity_check
('/api/3/action/dataset_search', True) # Include ignore_capacity_check
])
def test_package_seach_modified(self, request_path, include_ignore_capacity):
# Mock the default actions
package_search_old = MagicMock()
plugin.tk.get_action = MagicMock(return_value=package_search_old)
# Mock request
plugin.request.path = request_path
# Unmock the decorator
plugin.tk.side_effect_free = self._tk.side_effect_free
# Get the actions returned by the plugin
actions = self.privateDatasets.get_actions()
# Call the function
context = {'id': 'test', 'another_test': 'test_value'}
expected_context = context.copy()
data_dict = {'example': 'test', 'key': 'value'}
actions['package_search'](context, data_dict)
# Test if the default function has been called properly
package_search_old.assert_called_once_with(ANY, data_dict)
context_called = package_search_old.call_args_list[0][0][0] # First call, first argument
if include_ignore_capacity:
expected_context.update({'ignore_capacity_check': True})
self.assertEquals(expected_context, context_called)
def test_update_config(self):
# Call the method
config = {'test': 1234, 'another': 'value'}
@ -261,9 +234,12 @@ class PluginTest(unittest.TestCase):
self.privateDatasets.after_map(m)
# Test that the connect method has been called
m.connect.assert_called_once_with('/dataset_adquired',
controller='ckanext.privatedatasets.controller:AdquiredDatasetsController',
action='add_users', conditions=dict(method=['POST']))
m.connect.assert_any_call('/dataset_adquired',
controller='ckanext.privatedatasets.controllers.api_controller:AdquiredDatasetsControllerAPI',
action='add_users', conditions=dict(method=['POST']))
m.connect.assert_any_call('user_adquired_datasets', '/dashboad/adquired', ckan_icon='shopping-cart',
controller='ckanext.privatedatasets.controllers.ui_controller:AdquiredDatasetsControllerUI',
action='user_adquired_datasets', conditions=dict(method=['GET']))
@parameterized.expand([
('create_package_schema'),
@ -309,8 +285,8 @@ class PluginTest(unittest.TestCase):
('False', None, '', False),
# When data is present, the field is only valid when the
# organization is not set and the private field is set to true
(True, 'conwet', 'test', True),
('True', 'conwet', 'test', True),
(True, 'conwet', 'test', False),
('True', 'conwet', 'test', False),
(False, 'conwet', 'test', True),
('False', 'conwet', 'test', True),
(True, None, 'test', False),
@ -322,7 +298,7 @@ class PluginTest(unittest.TestCase):
# TODO: Maybe this test should be refactored since the function should be refactored
KEY = ('test')
KEY = ('test',)
errors = {}
errors[KEY] = []
@ -343,3 +319,85 @@ class PluginTest(unittest.TestCase):
def test_package_types(self):
self.assertEquals([], self.privateDatasets.package_types())
@parameterized.expand([
('after_create',),
('after_update',),
('after_show',),
('after_delete',),
('after_create', 'False'),
('after_update', 'False'),
('after_show', 'False'),
('after_delete', 'False')
])
def test_packagecontroller_after(self, function, private='True'):
pkg_dict = {'test': 'a', 'private': private, 'allowed_users': 'a,b,c'}
expected_pkg_dict = pkg_dict.copy()
result = getattr(self.privateDatasets, function)({}, pkg_dict) # Call the function
self.assertEquals(expected_pkg_dict, result) # Check the result
def test_packagecontroller_after_search(self):
search_res = {'test': 'a', 'private': 'a', 'allowed_users': 'a,b,c'}
expected_search_res = search_res.copy()
result = getattr(self.privateDatasets, 'after_search')(search_res, {}) # Call the function
self.assertEquals(expected_search_res, result) # Check the result
@parameterized.expand([
('before_search',),
('before_view',),
('create',),
('edit',),
('read',),
('delete',),
('before_search', 'False'),
('before_view', 'False'),
('create', 'False'),
('edit', 'False'),
('read', 'False'),
('delete', 'False')
])
def test_before_and_CRUD(self, function, private='True'):
pkg_dict = {'test': 'a', 'private': private, 'allowed_users': 'a,b,c'}
expected_pkg_dict = pkg_dict.copy()
result = getattr(self.privateDatasets, function)(pkg_dict) # Call the function
self.assertEquals(expected_pkg_dict, result) # Check the result
@parameterized.expand([
('public', None, 'public'),
('public', 'False', 'private'),
('public', 'True', 'public'),
('private', None, 'private'),
('private', 'False', 'private'),
('public', 'True', 'public')
])
def test_before_index(self, initialCapacity, searchable, finalCapacity):
pkg_dict = {'capacity': initialCapacity, 'name': 'a', 'description': 'This is a test'}
if searchable is not None:
pkg_dict['extras_searchable'] = searchable
expected_result = pkg_dict.copy()
expected_result['capacity'] = finalCapacity
self.assertEquals(expected_result, self.privateDatasets.before_index(pkg_dict))
def test_helpers_functions(self):
helpers_functions = self.privateDatasets.get_helpers()
self.assertEquals(helpers_functions['privatedatasets_adquired'], plugin.adquired)
@parameterized.expand([
(False, None, 'user', False),
(True, '', 'user', False),
(True, None, 'user', False),
(True, 'user', 'user', True),
(True, 'another_user,user', 'user', True),
(True, 'another_user,user2', 'user', False),
])
def test_adquired(self, include_allowed_users, allowed_users, user, adquired):
# Configure test
plugin.tk.c.user = user
pkg_dict = {}
if include_allowed_users:
pkg_dict['allowed_users'] = allowed_users
# Check the function returns the expected result
self.assertEquals(adquired, plugin.adquired(pkg_dict))

View File

@ -6,7 +6,7 @@ version = '0.1'
setup(
name='ckanext-privatedatasets',
version=version,
description="This extensions allows a user to create private datasets only visible to certain users. The extension provides also an API to specify which users can access private datasets",
description="This extensions allows users to create private datasets only visible to certain users. The extension provides also an API to specify programatically which users can access private datasets",
long_description='''
''',
classifiers=[], # Get strings from http://pypi.python.org/pypi?%3Aaction=list_classifiers