Improve coverage. Add tests for DB. init_db is now idempotent

This commit is contained in:
Aitor Magán 2014-07-15 11:52:09 +02:00
parent bd2cc7c7c1
commit c7de2e3c91
13 changed files with 105 additions and 73 deletions

View File

@ -34,8 +34,7 @@ def package_show(context, data_dict):
# user is in the allowed_users object
if not authorized:
# Init the model
if db.package_allowed_users_table is None:
db.init_db(context['model'])
db.init_db(context['model'])
if db.AllowedUser.get(package_id=package.id, user_name=user):
authorized = True

View File

@ -13,8 +13,7 @@ class AdquiredDatasetsControllerUI(base.BaseController):
def user_adquired_datasets(self):
if db.package_allowed_users_table is None:
db.init_db(model)
db.init_db(model)
c = plugins.toolkit.c
context = {

View File

@ -39,8 +39,7 @@ def allowed_users_convert(key, data, errors, context):
def get_allowed_users(key, data, errors, context):
pkg_id = data[('id',)]
if db.package_allowed_users_table is None:
db.init_db(context['model'])
db.init_db(context['model'])
users = db.AllowedUser.get(package_id=pkg_id)
counter = 0

View File

@ -1,31 +1,29 @@
import sqlalchemy as sa
package_allowed_users_table = None
AllowedUser = None
def init_db(model):
class _AllowedUser(model.DomainObject):
@classmethod
def get(cls, **kw):
'''Finds all the instances required.'''
query = model.Session.query(cls).autoflush(False)
return query.filter_by(**kw).all()
global AllowedUser
AllowedUser = _AllowedUser
if AllowedUser is None:
global package_allowed_users_table
package_allowed_users_table = sa.Table('package_allowed_users', model.meta.metadata,
sa.Column('package_id', sa.types.UnicodeText, primary_key=True, default=u''),
sa.Column('user_name', sa.types.UnicodeText, primary_key=True, default=u''),
)
class _AllowedUser(model.DomainObject):
# Create the table only if it does not exist
package_allowed_users_table.create(checkfirst=True)
@classmethod
def get(cls, **kw):
'''Finds all the instances required.'''
query = model.Session.query(cls).autoflush(False)
return query.filter_by(**kw).all()
model.meta.mapper(
AllowedUser,
package_allowed_users_table,
)
AllowedUser = _AllowedUser
package_allowed_users_table = sa.Table('package_allowed_users', model.meta.metadata,
sa.Column('package_id', sa.types.UnicodeText, primary_key=True, default=u''),
sa.Column('user_name', sa.types.UnicodeText, primary_key=True, default=u''),
)
# Create the table only if it does not exist
package_allowed_users_table.create(checkfirst=True)
model.meta.mapper(AllowedUser, package_allowed_users_table,)

View File

@ -5,8 +5,7 @@ import db
def is_adquired(pkg_dict):
if db.package_allowed_users_table is None:
db.init_db(model)
db.init_db(model)
if tk.c.user:
return len(db.AllowedUser.get(package_id=pkg_dict['id'], user_name=tk.c.user)) > 0
@ -15,7 +14,7 @@ def is_adquired(pkg_dict):
def is_owner(pkg_dict):
if tk.c.userobj:
if tk.c.userobj is not None:
return tk.c.userobj.id == pkg_dict['creator_user_id']
else:
return False

View File

@ -151,8 +151,7 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm):
def after_create(self, context, pkg_dict):
session = context['session']
if db.package_allowed_users_table is None:
db.init_db(context['model'])
db.init_db(context['model'])
# Get the users and the package ID
if constants.ALLOWED_USERS in pkg_dict:

View File

@ -3,7 +3,7 @@
{% block dashboard_activity_stream_context %}{% endblock %}
{% block page_primary_action %}
{% link_for _('Adquire Dataset'), controller='package', action='new', class_="btn btn-primary", icon="shopping-cart" %}
{% link_for _('Adquire Dataset'), controller='package', action='search', class_="btn btn-primary", icon="shopping-cart" %}
{% endblock %}
{% block primary_content_inner %}

View File

@ -128,7 +128,7 @@ class ActionsTest(unittest.TestCase):
({'user1': ['ds1', 'ds2', 'ds3', 'ds4'], 'user2': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], []),
({'user3': ['ds1', 'ds2', 'ds3', 'ds4'], 'user4': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], ['another_user']),
({'user5': ['ds1', 'ds2', 'ds3', 'ds4'], 'user6': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], ['another_user', 'another_one']),
({'user7': ['ds1', 'ds2', 'ds3', 'ds4'], 'user8': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], ['another_user', 'another_one', 'user1'])
({'user7': ['ds1', 'ds2', 'ds3', 'ds4'], 'user8': ['ds5', 'ds6', 'ds7']}, ['ds3', 'ds6'], ['ds4', 'ds7'], ['another_user', 'another_one', 'user7'])
])
def test_add_users(self, users_info, datasets_not_found, not_updatable_datasets, allowed_users=[]):

View File

@ -41,6 +41,7 @@ class AuthTest(unittest.TestCase):
def test_decordators(self):
self.assertEquals(True, getattr(auth.package_show, 'auth_allow_anonymous_access', False))
self.assertEquals(True, getattr(auth.resource_show, 'auth_allow_anonymous_access', False))
self.assertEquals(True, getattr(auth.package_adquired, 'auth_allow_anonymous_access', False))
@parameterized.expand([
# Anonymous user (public)
@ -54,6 +55,7 @@ class AuthTest(unittest.TestCase):
(None, None, None, True, 'active', None, None, None, 'google.es', '/dataset/testds', False),
# The creator can always see the dataset
(1, 1, None, False, 'active', None, None, None, None, None, True),
(1, 1, None, True, 'active', 'conwet', None, None, None, None, True),
(1, 1, None, True, 'active', None, None, None, None, None, True),
(1, 1, None, False, 'draft', None, None, None, None, None, True),
# Other user (no organizations)
@ -120,10 +122,20 @@ class AuthTest(unittest.TestCase):
# Check the result
self.assertEquals(authorized, result['success'])
# Check that the mocks has been called properly
if private and owner_org and state == 'active':
# Premissions for organization are checked when the dataset is private, it belongs to an organization
# and when the dataset has not been created by the user who is asking for it
if private and owner_org and state == 'active' and creator_user_id != user_obj_id:
auth.new_authz.has_user_permission_for_group_or_org.assert_called_once_with(owner_org, user, 'read')
# The databse is only initialized when:
# * the dataset is private AND
# * the dataset is active AND
# * the dataset has no organization OR the user does not belong to that organization AND
# * the dataset has not been created by the user who is asking for it
if private and state == 'active' and ((owner_org and not owner_org) or not owner_org) and creator_user_id != user_obj_id:
# Check that the database has been initialized properly
auth.db.init_db.assert_called_once_with(context['model'])
# Conditions to buy a dataset; It should be private, active and should not belong to any organization
if not authorized and state == 'active' and not owner_org and request_path.startswith('/dataset/'):
auth.helpers.flash_error.assert_called_once()
@ -208,3 +220,6 @@ class AuthTest(unittest.TestCase):
else:
result = auth.resource_show(context, {})
self.assertEquals(authorized_pkg, result['success'])
def test_package_adquired(self):
self.assertTrue(auth.package_adquired({}, {}))

View File

@ -102,6 +102,9 @@ class UIControllerTest(unittest.TestCase):
# Call the function
returned = self.instanceUI.user_adquired_datasets()
# Check that the database has been initialized properly
controller.db.init_db.assert_called_once_with(controller.model)
# User_show called correctly
expected_context = {
'model': controller.model,

View File

@ -74,12 +74,14 @@ class ConvertersValidatorsTest(unittest.TestCase):
self.assertEquals(0, len(errors[KEY]))
@parameterized.expand([
('', 0, []),
('', 2, []),
('a', 0, ['a']),
('a', 2, ['a']),
('a,z, d', 0, ['a', 'z', 'd']),
('a,z, d', 2, ['a', 'z', 'd'])
('', 0, []),
('', 2, []),
('a', 0, ['a']),
('a', 2, ['a']),
('a,z, d', 0, ['a', 'z', 'd']),
('a,z, d', 2, ['a', 'z', 'd']),
(['a','z', 'd'], 0, ['a', 'z', 'd']),
(['a','z', 'd'], 2, ['a', 'z', 'd'])
])
def test_allowed_user_convert(self, users_str, previous_users, expected_users):
key_str = 'allowed_users_str'
@ -97,24 +99,16 @@ class ConvertersValidatorsTest(unittest.TestCase):
self.assertEquals(expected_users[i - previous_users], data[(key, i)])
@parameterized.expand([
([], True),
(['a'], True),
(['a', 'b'], True),
(['a', 'b', 'c'], True),
(['a', 'b', 'c', 'd', 'e'], True),
([], False),
(['a'], False),
(['a', 'b'], False),
(['a', 'b', 'c'], False),
(['a', 'b', 'c', 'd', 'e'], False)
([],),
(['a'],),
(['a', 'b'],),
(['a', 'b', 'c'],),
(['a', 'b', 'c', 'd', 'e'],)
])
def test_get_allowed_users(self, users, table_initialized):
def test_get_allowed_users(self, users):
key = 'allowed_users'
data = {('id',): 'package_id'}
# Table init
conv_val.db.package_allowed_users_table = None if not table_initialized else MagicMock()
# Each time 'AllowedUser' is called, we must get a new instance
# and this is the way to get this behaviour
def constructor():
@ -141,7 +135,4 @@ class ConvertersValidatorsTest(unittest.TestCase):
self.assertEquals(user, data[(key, i)])
# Check that the table has been initialized properly
if not table_initialized:
conv_val.db.init_db.assert_called_once_with(context['model'])
else:
self.assertEquals(0, conv_val.db.init_db.call_count)
conv_val.db.init_db.assert_called_once_with(context['model'])

View File

@ -0,0 +1,37 @@
import unittest
import ckanext.privatedatasets.db as db
from mock import MagicMock
class DBTest(unittest.TestCase):
def setUp(self):
# Create mocks
self._sa = db.sa
db.sa = MagicMock()
def tearDown(self):
db.sa = self._sa
db.AllowedUser = None
def test_initdb_not_initialized(self):
# Call the function
model = MagicMock()
db.init_db(model)
# Assert that table method has been called
db.sa.Table.assert_called_once()
model.meta.mapper.assert_called_once()
def test_initdb_initialized(self):
db.AllowedUser = MagicMock()
# Call the function
model = MagicMock()
db.init_db(model)
# Assert that table method has been called
self.assertEquals(0, db.sa.Table.call_count)
self.assertEquals(0, model.meta.mapper.call_count)

View File

@ -24,20 +24,15 @@ class HelpersTest(unittest.TestCase):
helpers.db = self._db
@parameterized.expand([
(False, False, 'user', False),
(True, False, 'user', True),
(False, True, 'user', False),
(True, True, 'user', True),
(False, False, None, False),
(True, False, None, False),
(False, True, None, False),
(True, True, None, False),
(False, 'user', False),
(True, 'user', True),
(False, None, False),
(True, None, False),
])
def test_is_adquired(self, db_auth, table_initialized, user, adquired):
def test_is_adquired(self, db_auth, user, adquired):
# Configure test
helpers.tk.c.user = user
pkg_dict = {'id': 'package_id'}
helpers.db.package_allowed_users_table = None if not table_initialized else MagicMock()
db_response = []
if db_auth is True:
@ -51,10 +46,8 @@ class HelpersTest(unittest.TestCase):
# Check the function returns the expected result
self.assertEquals(adquired, helpers.is_adquired(pkg_dict))
if not table_initialized:
helpers.db.init_db.assert_called_once_with(helpers.model)
else:
self.assertEquals(0, helpers.db.init_db.call_count)
# Check that the database has been initialized properly
helpers.db.init_db.assert_called_once_with(helpers.model)
@parameterized.expand([
(1, 1, True),