Merge pull request #47 from aarranz/feature/support-ckan-2.8

Add support for CKAN 2.8
This commit is contained in:
Álvaro Arranz 2018-07-16 11:39:06 +02:00 committed by GitHub
commit 1ab4484f66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 588 additions and 239 deletions

View File

@ -9,7 +9,12 @@ services:
- redis-server
- postgresql
addons:
firefox: "46.0"
firefox: "60.1.0esr"
before_install:
- wget https://github.com/mozilla/geckodriver/releases/download/v0.21.0/geckodriver-v0.21.0-linux64.tar.gz
- mkdir geckodriver
- tar -xzf geckodriver-v0.21.0-linux64.tar.gz -C geckodriver
- export PATH=$PATH:$PWD/geckodriver
install:
- bash bin/travis-build.bash
before_script:

View File

@ -1,2 +1,3 @@
recursive-include ckanext/privatedatasets/templates *
recursive-include ckanext/privatedatasets/fanstatic *
recursive-include ckanext/privatedatasets/templates_2.8 *
recursive-include ckanext/privatedatasets/fanstatic *

View File

@ -1,51 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2014 CoNWeT Lab., Universidad Politécnica de Madrid
# This file is part of CKAN Private Dataset Extension.
# CKAN Private Dataset Extension is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# CKAN Private Dataset Extension is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with CKAN Private Dataset Extension. If not, see <http://www.gnu.org/licenses/>.
import ckanext.privatedatasets.constants as constants
import ckan.lib.base as base
import ckan.model as model
import ckan.plugins as plugins
import logging
from ckan.common import _
log = logging.getLogger(__name__)
class AcquiredDatasetsControllerUI(base.BaseController):
def user_acquired_datasets(self):
c = plugins.toolkit.c
context = {
'model': model,
'session': model.Session,
'user': plugins.toolkit.c.user,
}
# Get user information
try:
c.user_dict = plugins.toolkit.get_action('user_show')(context.copy(), {'user_obj': c.userobj})
c.user_dict['acquired_datasets'] = plugins.toolkit.get_action(constants.ACQUISITIONS_LIST)(context.copy(), None)
except plugins.toolkit.ObjectNotFound:
plugins.toolkit.abort(404, _('User not found'))
except plugins.toolkit.NotAuthorized:
plugins.toolkit.abort(401, _('Not authorized to see this page'))
return plugins.toolkit.render('user/dashboard_acquired.html')

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2014 - 2017 CoNWeT Lab., Universidad Politécnica de Madrid
# Copyright (c) 2018 Future Internet Consulting and Development Solutions S.L.
# This file is part of CKAN Private Dataset Extension.
@ -17,15 +18,16 @@
# You should have received a copy of the GNU Affero General Public License
# along with CKAN Private Dataset Extension. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
from __future__ import absolute_import, unicode_literals
from ckan import model, plugins as p
from ckan.lib import search
from ckan.lib.plugins import DefaultPermissionLabels
from ckan.plugins import toolkit as tk
from flask import Blueprint
from ckanext.privatedatasets import auth, actions, constants, converters_validators as conv_val, db, helpers
from ckanext.privatedatasets.views import acquired_datasets
HIDDEN_FIELDS = [constants.ALLOWED_USERS, constants.SEARCHABLE]
@ -35,6 +37,7 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm, DefaultPermissio
p.implements(p.IDatasetForm)
p.implements(p.IAuthFunctions)
p.implements(p.IConfigurer)
p.implements(p.IBlueprint)
p.implements(p.IRoutes, inherit=True)
p.implements(p.IActions)
p.implements(p.IPackageController, inherit=True)
@ -128,23 +131,32 @@ class PrivateDatasets(p.SingletonPlugin, tk.DefaultDatasetForm, DefaultPermissio
def update_config(self, config):
# Add this plugin's templates dir to CKAN's extra_template_paths, so
# that CKAN will use this plugin's custom templates.
tk.add_template_directory(config, 'templates')
if p.toolkit.check_ckan_version(min_version='2.8'):
tk.add_template_directory(config, 'templates_2.8')
else:
tk.add_template_directory(config, 'templates')
# Register this plugin's fanstatic directory with CKAN.
tk.add_resource('fanstatic', 'privatedatasets')
tk.add_resource(b'fanstatic', b'privatedatasets')
######################################################################
############################## IROUTES ###############################
############################# IBLUEPRINT #############################
######################################################################
# Deprecated but Required for CKAN 2.7
def before_map(self, m):
# DataSet acquired notification
m.connect('user_acquired_datasets', '/dashboard/acquired', ckan_icon='shopping-cart',
controller='ckanext.privatedatasets.controllers.ui_controller:AcquiredDatasetsControllerUI',
action='user_acquired_datasets', conditions=dict(method=['GET']))
if p.toolkit.check_ckan_version(max_version='2.7.99'):
m.connect('user_acquired_datasets', '/dashboard/acquired', ckan_icon='shopping-cart',
controller='ckanext.privatedatasets.views:AcquiredDatasetsControllerUI',
action='acquired_datasets', conditions=dict(method=['GET']))
return m
def get_blueprint(self):
blueprint = Blueprint('privatedatasets', self.__module__)
if p.toolkit.check_ckan_version(min_version='2.8'):
blueprint.add_url_rule('/dashboard/acquired', 'acquired_datasets', acquired_datasets)
return blueprint
######################################################################
############################## IACTIONS ##############################
######################################################################

View File

@ -46,4 +46,4 @@
{% endblock %}
</div>
</article>
{% endblock %}
{% endblock %}

View File

@ -8,12 +8,12 @@
{% block primary_content_inner %}
<h2 class="hide-heading">{{ _('Acquired Datasets') }}</h2>
{% if c.user_dict.acquired_datasets %}
{% snippet 'snippets/package_list.html', packages=c.user_dict.acquired_datasets %}
{% if acquired_datasets %}
{% snippet 'snippets/package_list.html', packages=acquired_datasets %}
{% else %}
<p class="empty">
{{ _('You haven\'t acquired any datasets.') }}
{% link_for _('Acquire one now?'), controller='package', action='search' %}
</p>
{% endif %}
{% endblock %}
{% endblock %}

View File

@ -0,0 +1,107 @@
{% ckan_extends %}
{% block package_basic_fields_org %}
{% resource 'privatedatasets/allowed_users.js' %}
{# if we have a default group then this wants remembering #}
{% if data.group_id %}
<input type="hidden" name="groups__0__id" value="{{ data.group_id }}" />
{% endif %}
{% set dataset_is_draft = data.get('state', 'draft').startswith('draft') or data.get('state', 'none') == 'none' %}
{% set dataset_has_organization = data.owner_org or data.group_id %}
{% set organizations_available = h.organizations_available('create_dataset') %}
{% set user_is_sysadmin = h.check_access('sysadmin') %}
{% set show_organizations_selector = organizations_available and (user_is_sysadmin or dataset_is_draft) %}
{% set editing = 'id' in data %}
{% if show_organizations_selector and show_visibility_selector %}
<div>
{% endif %}
{% if show_organizations_selector %}
{% set existing_org = data.owner_org or data.group_id %}
<div class="control-group">
<label for="field-organizations" class="control-label">{{ _('Organization') }}</label>
<div class="controls">
<select id="field-organizations" name="owner_org" data-module="autocomplete">
{% if h.check_config_permission('create_unowned_dataset') %}
<option value="" {% if not selected_org and data.id %} selected="selected" {% endif %}>{{ _('No organization') }}</option>
{% endif %}
{% for organization in organizations_available %}
{# get out first org from users list only if there is not an existing org #}
{% set selected_org = (existing_org and existing_org == organization.id) or (not existing_org and not data.id and organization.id == organizations_available[0].id) %}
<option value="{{ organization.id }}" {% if selected_org %} selected="selected" {% endif %}>{{ organization.name }}</option>
{% endfor %}
</select>
</div>
</div>
{% endif %}
{% block package_metadata_fields_visibility %}
<div class="control-group">
<label for="field-private" class="control-label">{{ _('Visibility') }}</label>
<div class="controls">
<select id="field-private" name="private" data-module="allowed-users">
{% for option in [('True', _('Private')), ('False', _('Public'))] %}
<option value="{{ option[0] }}" {% if option[0] == data.private|trim %}selected="selected"{% endif %}>{{ option[1] }}</option>
{% endfor %}
</select>
<span class="info-block info-inline">
<i class="icon-info-sign fa fa-info-circle"></i>
{% trans %}
Private datasets can only be accessed by certain users, while public datasets can be accessed by anyone.
{% endtrans %}
</span>
</div>
</div>
{% endblock %}
{% block package_metadata_fields_protected %}
<div class="control-group">
<label for="field-searchable" class="control-label">{{ _('Searchable') }}</label>
<div class="controls">
<select id="field-searchable" name="searchable">
{% for option in [('True', _('True')), ('False', _('False'))] %}
<option value="{{ option[0] }}" {% if option[0] == data.searchable|trim %}selected="selected"{% endif %}>{{ option[1] }}</option>
{% endfor %}
</select>
<span class="info-block info-inline">
<i class="icon-info-sign fa fa-info-circle"></i>
{% trans %}
Searchable datasets can be searched by anyone, while not-searchable datasets can only be accessed by entering directly its URL.
{% endtrans %}
</span>
</div>
</div>
{% endblock %}
{% if show_organizations_selector and show_visibility_selector %}
</div>
{% endif %}
{% set users_attrs = {'data-module': 'autocomplete', 'data-module-tags': '', 'data-module-source': '/api/2/util/user/autocomplete?q=?'} %}
{{ form.input('allowed_users_str', label=_('Allowed Users'), id='field-allowed_users_str', placeholder=_('Allowed Users'), value=h.get_allowed_users_str(data.allowed_users), error=errors.custom_text, classes=['control-full'], attrs=users_attrs) }}
{% if editing and h.show_acquire_url_on_edit() or not editing and h.show_acquire_url_on_create() %}
{{ form.input('acquire_url', label=_('Acquire URL'), id='field-acquire_url', placeholder=_('http://example.com/acquire/'), value=data.acquire_url, error=errors.custom_text, classes=['control-medium']) }}
{% else %}
<input type="hidden" name="acquire_url" id="acquire_url" value="{{ data.acquire_url }}" />
{% endif %}
{% if data.id and h.check_access('package_delete', {'id': data.id}) and data.state != 'active' %}
<div class="control-group">
<label for="field-state" class="control-label">{{ _('State') }}</label>
<div class="controls">
<select id="field-state" name="state">
<option value="active" {% if data.get('state', 'none') == 'active' %} selected="selected" {% endif %}>{{ _('Active') }}</option>
<option value="deleted" {% if data.get('state', 'none') == 'deleted' %} selected="selected" {% endif %}>{{ _('Deleted') }}</option>
</select>
</div>
</div>
{% endif %}
{% endblock %}

View File

@ -0,0 +1,15 @@
{#
Displays a Get Access button to request access to a private dataset.
ulr_dest - target url
Example:
{% snippet 'snippets/acquire_button.html', url_dest=url %}
#}
<a href={{ url_dest }} class="btn btn-mini" target="_blank">
<i class="icon-shopping-cart fa fa-shopping-cart"></i>
{{ _('Acquire') }}
</a>

View File

@ -0,0 +1,82 @@
{#
Displays a single of dataset.
package - A package to display.
item_class - The class name to use on the list item.
hide_resources - If true hides the resources (default: false).
banner - If true displays a popular banner (default: false).
truncate - The length to trucate the description to (default: 180)
truncate_title - The length to truncate the title to (default: 80).
Example:
{% snippet 'snippets/package_item.html', package=c.datasets[0] %}
#}
{% set truncate = truncate or 180 %}
{% set truncate_title = truncate_title or 80 %}
{% set title = package.title or package.name %}
{% set notes = h.markdown_extract(package.notes, extract_length=truncate) %}
{% set acquired = h.is_dataset_acquired(package) %}
{% set owner = h.is_owner(package) %}
{% resource 'privatedatasets/custom.css' %}
<li class="{{ item_class or "dataset-item" }}">
{% block package_item_content %}
<div class="dataset-content">
<h3 class="dataset-heading">
{% if package.private and not h.can_read(package) %}
<span class="dataset-private label label-inverse">
<i class="icon-lock fa fa-lock"></i>
{{ _('Private') }}
</span>
{% endif %}
{% if acquired and not owner %}
<span class="dataset-private label label-acquired">
<i class="icon-shopping-cart fa fa-shopping-cart"></i>
{{ _('Acquired') }}
</span>
{% endif %}
{% if owner %}
<span class="dataset-private label label-owner">
<i class="icon-user fa fa-user"></i>
{{ _('Owner') }}
</span>
{% endif %}
<!-- Customizations Acquire Button -->
{% if package.private and not h.can_read(package) %}
{{ _(h.truncate(title, truncate_title)) }}
<div class="divider"/>
{{ h.acquire_button(package) }}
{% else %}
{{ h.link_to(h.truncate(title, truncate_title), h.url_for(controller='package', action='read', id=package.name)) }}
{% endif %}
<!-- End of customizations Acquire Button -->
{% if package.get('state', '').startswith('draft') %}
<span class="label label-info">{{ _('Draft') }}</span>
{% elif package.get('state', '').startswith('deleted') %}
<span class="label label-important">{{ _('Deleted') }}</span>
{% endif %}
{{ h.popular('recent views', package.tracking_summary.recent, min=10) if package.tracking_summary }}
</h3>
{% if banner %}
<span class="banner">{{ _('Popular') }}</span>
{% endif %}
{% if notes %}
<div>{{ notes|urlize }}</div>
{% endif %}
</div>
{% if package.resources and not hide_resources %}
<ul class="dataset-resources unstyled">
{% for resource in h.dict_list_reduce(package.resources, 'format') %}
<li>
<a href="{{ h.url_for(controller='package', action='read', id=package.name) }}" class="label" data-format="{{ resource.lower() }}">{{ resource }}</a>
</li>
{% endfor %}
</ul>
{% endif %}
{% endblock %}
</li>

View File

@ -0,0 +1,49 @@
{% extends "user/edit_base.html" %}
{% set user = g.userobj %}
{% block breadcrumb_content %}
<li class="active"><a href="{{ h.url_for('dashboard.index') }}">{{ _('Dashboard') }}</a></li>
{% endblock %}
{% block secondary %}{% endblock %}
{% block primary %}
<article class="module">
{% block page_header %}
<header class="module-content page-header hug">
<div class="content_action">
{% link_for _('Edit settings'), named_route='user.edit', id=user.name, class_='btn btn-default', icon='cog' %}
</div>
<ul class="nav nav-tabs">
{{ h.build_nav_icon('dashboard.index', _('News feed')) }}
{{ h.build_nav_icon('dashboard.datasets', _('My Datasets')) }}
{{ h.build_nav_icon('privatedatasets.acquired_datasets', _('Acquired Datasets')) }}
{{ h.build_nav_icon('dashboard.organizations', _('My Organizations')) }}
{{ h.build_nav_icon('dashboard.groups', _('My Groups')) }}
</ul>
</header>
{% endblock %}
<div class="module-content">
{% if self.page_primary_action() | trim %}
<div class="page_primary_action">
{% block page_primary_action %}{% endblock %}
</div>
{% endif %}
{% block primary_content_inner %}
<div data-module="dashboard">
{% snippet 'user/snippets/followee_dropdown.html', context=dashboard_activity_stream_context, followees=followee_list %}
<h2 class="page-heading">
{% block page_heading %}
{{ _('News feed') }}
{% endblock %}
<small>{{ _("Activity from items that I'm following") }}</small>
</h2>
{% block activity_stream %}
{{ dashboard_activity_stream|safe }}
{% endblock %}
</div>
{% endblock %}
</div>
</article>
{% endblock %}

View File

@ -0,0 +1,19 @@
{% extends "user/dashboard.html" %}
{% block dashboard_activity_stream_context %}{% endblock %}
{% block page_primary_action %}
{% link_for _('Acquire Dataset'), controller='package', action='search', class_="btn btn-primary", icon="shopping-cart" %}
{% endblock %}
{% block primary_content_inner %}
<h2 class="hide-heading">{{ _('Acquired Datasets') }}</h2>
{% if acquired_datasets %}
{% snippet 'snippets/package_list.html', packages=acquired_datasets %}
{% else %}
<p class="empty">
{{ _('You haven\'t acquired any datasets.') }}
{% link_for _('Acquire one now?'), controller='package', action='search' %}
</p>
{% endif %}
{% endblock %}

View File

@ -1,110 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2014 CoNWeT Lab., Universidad Politécnica de Madrid
# This file is part of CKAN Private Dataset Extension.
# CKAN Private Dataset Extension is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# CKAN Private Dataset Extension is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with CKAN Private Dataset Extension. If not, see <http://www.gnu.org/licenses/>.
import ckanext.privatedatasets.controllers.ui_controller as controller
import unittest
from mock import MagicMock, ANY
from parameterized import parameterized
class UIControllerTest(unittest.TestCase):
def setUp(self):
# Get the instance
self.instanceUI = controller.AcquiredDatasetsControllerUI()
# Load the mocks
self._plugins = controller.plugins
controller.plugins = MagicMock()
self._model = controller.model
controller.model = MagicMock()
# Set exceptions
controller.plugins.toolkit.ObjectNotFound = self._plugins.toolkit.ObjectNotFound
controller.plugins.toolkit.NotAuthorized = self._plugins.toolkit.NotAuthorized
def tearDown(self):
# Unmock
controller.plugins = self._plugins
controller.model = self._model
@parameterized.expand([
(controller.plugins.toolkit.ObjectNotFound, 404),
(controller.plugins.toolkit.NotAuthorized, 401)
])
def test_exceptions_loading_users(self, exception, expected_status):
# Configure the mock
user_show = MagicMock(side_effect=exception)
controller.plugins.toolkit.get_action = MagicMock(return_value=user_show)
# Call the function
self.instanceUI.user_acquired_datasets()
# Assertations
expected_context = {
'model': controller.model,
'session': controller.model.Session,
'user': controller.plugins.toolkit.c.user
}
user_show.assert_called_once_with(expected_context, {'user_obj': controller.plugins.toolkit.c.userobj})
controller.plugins.toolkit.abort.assert_called_once_with(expected_status, ANY)
def test_no_error_loading_users(self):
user = 'example_user_test'
controller.plugins.toolkit.c.user = user
# actions
default_user = {'user_name': 'test', 'another_val': 'example value'}
user_show = MagicMock(return_value=default_user)
acquisitions_list = MagicMock()
def _get_action(action):
if action == 'user_show':
return user_show
else:
return acquisitions_list
controller.plugins.toolkit.get_action = MagicMock(side_effect=_get_action)
# Call the function
returned = self.instanceUI.user_acquired_datasets()
# User_show called correctly
expected_context = {
'model': controller.model,
'session': controller.model.Session,
'user': controller.plugins.toolkit.c.user
}
user_show.assert_called_once_with(expected_context, {'user_obj': controller.plugins.toolkit.c.userobj})
# Query called correctry
expected_user = default_user.copy()
expected_user['acquired_datasets'] = acquisitions_list.return_value
acquisitions_list.assert_called_with(expected_context, None)
self.assertEquals(expected_user, controller.plugins.toolkit.c.user_dict)
# Check that the render method has been called and that its result has been returned
self.assertEquals(controller.plugins.toolkit.render.return_value, returned)
controller.plugins.toolkit.render.assert_called_once_with('user/dashboard_acquired.html')

View File

@ -19,11 +19,13 @@
import unittest
import copy
import ckanext.privatedatasets.plugin as plugin
from flask import Blueprint
from mock import MagicMock
from parameterized import parameterized
import ckanext.privatedatasets.plugin as plugin
class PluginTest(unittest.TestCase):
@ -51,7 +53,7 @@ class PluginTest(unittest.TestCase):
(plugin.p.IDatasetForm,),
(plugin.p.IAuthFunctions,),
(plugin.p.IConfigurer,),
(plugin.p.IRoutes,),
(plugin.p.IBlueprint,),
(plugin.p.IActions,),
(plugin.p.IPackageController,),
(plugin.p.ITemplateHelpers,)
@ -83,18 +85,15 @@ class PluginTest(unittest.TestCase):
self.privateDatasets.update_config(config)
# Test that functions are called as expected
plugin.tk.add_template_directory.assert_called_once_with(config, 'templates')
if self._tk.check_ckan_version(min_version='2.8'):
plugin.tk.add_template_directory.assert_called_once_with(config, 'templates_2.8')
else:
plugin.tk.add_template_directory.assert_called_once_with(config, 'templates')
plugin.tk.add_resource('fanstatic', 'privatedatasets')
def test_map(self):
def test_get_blueprint(self):
# Call the method
m = MagicMock()
self.privateDatasets.before_map(m)
# Test that the connect method has been called
m.connect.assert_any_call('user_acquired_datasets', '/dashboard/acquired', ckan_icon='shopping-cart',
controller='ckanext.privatedatasets.controllers.ui_controller:AcquiredDatasetsControllerUI',
action='user_acquired_datasets', conditions=dict(method=['GET']))
self.assertIsInstance(self.privateDatasets.get_blueprint(), Blueprint)
@parameterized.expand([
('package_acquired', plugin.actions.package_acquired),

View File

@ -25,15 +25,18 @@ import os
import unittest
import re
from subprocess import Popen
import time
import ckan.lib.search.index as search_index
import ckan.model as model
from parameterized import parameterized
import requests
from selenium import webdriver
from selenium.common.exceptions import NoAlertPresentException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import Select, WebDriverWait
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import Select, WebDriverWait
import ckanext.privatedatasets.db as db
@ -46,13 +49,20 @@ class TestSelenium(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Run CKAN
env = os.environ.copy()
env['DEBUG'] = 'True'
env['DEBUG'] = 'False'
cls._process = Popen(['paster', 'serve', 'test.ini'], env=env)
# Init Selenium
cls.driver = webdriver.Firefox()
cls.base_url = 'http://localhost:5000/'
cls.driver.set_window_size(1024, 768)
@classmethod
def tearDownClass(cls):
cls._process.terminate()
cls.driver.quit()
def clearBBDD(self):
# Clean Solr
@ -71,19 +81,17 @@ class TestSelenium(unittest.TestCase):
def setUp(self):
self.clearBBDD()
if 'WEB_DRIVER_URL' in os.environ and 'CKAN_SERVER_URL' in os.environ:
self.driver = webdriver.Remote(os.environ['WEB_DRIVER_URL'], webdriver.DesiredCapabilities.FIREFOX.copy())
self.base_url = os.environ['CKAN_SERVER_URL']
else:
self.driver = webdriver.Firefox()
self.base_url = 'http://localhost:5000/'
self.driver.set_window_size(1024, 768)
def tearDown(self):
self.driver.get(self.base_url)
try: # pragma: no cover
# Accept any "Are you sure to leave?" alert
self.driver.switch_to.alert.accept()
self.driver.switch_to.default_content()
except NoAlertPresentException:
pass
WebDriverWait(self.driver, 10).until(lambda driver: self.base_url == driver.current_url)
self.driver.delete_all_cookies()
self.clearBBDD()
self.driver.quit()
def assert_fields_disabled(self, fields):
for field in fields:
@ -93,7 +101,7 @@ class TestSelenium(unittest.TestCase):
self.driver.delete_all_cookies()
self.driver.get(self.base_url)
def register(self, username, fullname, mail, password):
def register(self, username, fullname, mail):
driver = self.driver
driver.get(self.base_url)
driver.find_element_by_link_text('Register').click()
@ -104,13 +112,13 @@ class TestSelenium(unittest.TestCase):
driver.find_element_by_id('field-email').clear()
driver.find_element_by_id('field-email').send_keys(mail)
driver.find_element_by_id('field-password').clear()
driver.find_element_by_id('field-password').send_keys(password)
driver.find_element_by_id('field-password').send_keys("1234" + username)
driver.find_element_by_id('field-confirm-password').clear()
driver.find_element_by_id('field-confirm-password').send_keys(password)
driver.find_element_by_id('field-confirm-password').send_keys("1234" + username)
driver.find_element_by_name('save').click()
self.logout()
def login(self, username, password):
def login(self, username):
driver = self.driver
driver.get(self.base_url)
login_btn = WebDriverWait(driver, 15).until(
@ -121,7 +129,7 @@ class TestSelenium(unittest.TestCase):
WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.ID, "field-login"))).clear()
driver.find_element_by_id('field-login').send_keys(username)
driver.find_element_by_id('field-password').clear()
driver.find_element_by_id('field-password').send_keys(password)
driver.find_element_by_id('field-password').send_keys("1234" + username)
driver.find_element_by_id('field-remember').click()
driver.find_element_by_css_selector('button.btn.btn-primary').click()
@ -130,7 +138,12 @@ class TestSelenium(unittest.TestCase):
driver.get(self.base_url)
WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.LINK_TEXT, 'Organizations'))).click()
WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.LINK_TEXT, 'Add Organization'))).click()
WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.ID, 'field-name'))).clear()
WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.ID, 'field-name')))
# Wait a bit to let ckan add javascript hooks
time.sleep(0.2)
driver.find_element_by_id('field-name').clear()
driver.find_element_by_id('field-name').send_keys(name)
driver.find_element_by_id('field-description').clear()
driver.find_element_by_id('field-description').send_keys(description)
@ -140,19 +153,25 @@ class TestSelenium(unittest.TestCase):
WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.LINK_TEXT, 'Manage'))).click()
WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.LINK_TEXT, 'Members'))).click()
for user in users:
driver.find_element_by_link_text('Add Member').click()
driver.find_element_by_id('username').send_keys(user)
WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.LINK_TEXT, 'Add Member'))).click()
WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.ID, "s2id_autogen1"))).send_keys(user + Keys.RETURN)
driver.find_element_by_name('submit').click()
def fill_ds_general_info(self, name, description, tags, private, searchable, allowed_users, acquire_url):
# FIRST PAGE: Dataset properties
driver = self.driver
WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.ID, "field-title"))).clear()
WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.ID, "field-title")))
# Wait a bit to let ckan add javascript hooks
time.sleep(0.2)
driver.find_element_by_id('field-title').clear()
driver.find_element_by_id('field-title').send_keys(name)
driver.find_element_by_id('field-notes').clear()
driver.find_element_by_id('field-notes').send_keys(description)
driver.find_element_by_id('field-tags').clear()
driver.find_element_by_id('field-tags').send_keys(','.join(tags))
# field-tags
for tag in tags:
driver.find_element_by_id('s2id_autogen1').send_keys(tag + Keys.RETURN)
Select(driver.find_element_by_id('field-private')).select_by_visible_text('Private' if private else 'Public')
# WARN: The organization is set by default
@ -160,8 +179,9 @@ class TestSelenium(unittest.TestCase):
# If the dataset is public, these fields will be disabled (we'll check it)
if private:
Select(driver.find_element_by_id('field-searchable')).select_by_visible_text('True' if searchable else 'False')
driver.find_element_by_id('field-allowed_users_str').clear()
driver.find_element_by_id('field-allowed_users_str').send_keys(','.join(allowed_users))
# field-allowed_users
for user in allowed_users:
driver.find_element_by_css_selector('#s2id_field-allowed_users_str .select2-input').send_keys(user + Keys.RETURN)
driver.find_element_by_id('field-acquire_url').clear()
if acquire_url:
driver.find_element_by_id('field-acquire_url').send_keys(acquire_url)
@ -180,10 +200,13 @@ class TestSelenium(unittest.TestCase):
# SECOND PAGE: Add Resources
WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.ID, "field-name")))
# Wait a bit to let ckan add javascript hooks
time.sleep(0.2)
try:
# The link button is only clicked if it's present
driver.find_element_by_link_text('Link').click()
except Exception:
except Exception: # pragma: no cover
pass
driver.find_element_by_id('field-image-url').clear()
@ -192,8 +215,7 @@ class TestSelenium(unittest.TestCase):
driver.find_element_by_id('field-name').send_keys(resource_name)
driver.find_element_by_id('field-description').clear()
driver.find_element_by_id('field-description').send_keys(resource_description)
driver.find_element_by_id('s2id_autogen1').clear()
driver.find_element_by_id('s2id_autogen1').send_keys(resource_format + '\n')
driver.find_element_by_id('s2id_autogen1').send_keys(resource_format + Keys.RETURN)
driver.find_element_by_css_selector('button.btn.btn-primary').click()
def modify_ds(self, url, name, description, tags, private, searchable, allowed_users, acquire_url):
@ -218,7 +240,7 @@ class TestSelenium(unittest.TestCase):
# if len(current_users) == 1 and current_users[0] == '':
# current_users = []
# Check the array
self.assertEquals(len(allowed_users), len(current_users))
self.assertEqual(len(allowed_users), len(current_users))
for user in current_users:
self.assertIn(user, allowed_users)
else:
@ -242,7 +264,7 @@ class TestSelenium(unittest.TestCase):
# When a user cannot access a dataset, the link is no longer provided
else:
# If the dataset is not searchable and the user is not the owner, a link to it could not be found in the dataset search page
self.assertEquals(None, re.search(dataset_url, driver.page_source))
self.assertEqual(None, re.search(dataset_url, driver.page_source))
# Access the dataset
driver.get(self.base_url + 'dataset/' + dataset_url)
@ -252,30 +274,30 @@ class TestSelenium(unittest.TestCase):
xpath = '//*[@id="content"]/div[2]/article/div/h1'
msg = '404 Not Found'
self.assertEquals(driver.find_element_by_xpath(xpath).text, msg)
self.assertEqual(driver.find_element_by_xpath(xpath).text, msg)
else:
self.assertEquals(self.base_url + 'dataset/%s' % dataset_url, driver.current_url)
self.assertEqual(self.base_url + 'dataset/%s' % dataset_url, driver.current_url)
def check_acquired(self, dataset, dataset_url, acquired, private):
driver = self.driver
driver.get(self.base_url + 'dashboard')
driver.find_element_by_link_text('Acquired Datasets').click()
WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.LINK_TEXT, 'Acquired Datasets'))).click()
if acquired and private:
# This message could not be shown when the user has acquired at least one dataset
self.assertEquals(None, re.search('You haven\'t acquired any datasets.', driver.page_source))
self.assertEqual(None, re.search('You haven\'t acquired any datasets.', driver.page_source))
# Access the dataset
driver.find_element_by_link_text(dataset).click()
self.assertEquals(self.base_url + 'dataset/%s' % dataset_url, driver.current_url)
self.assertEqual(self.base_url + 'dataset/%s' % dataset_url, driver.current_url)
else:
# If the user has not acquired the dataset, a link to this dataset could not be in the acquired dataset list
self.assertEquals(None, re.search(dataset_url, driver.page_source))
self.assertEqual(None, re.search(dataset_url, driver.page_source))
# When a user has not acquired any dataset, a message will be shown to inform the user
self.assertNotEquals(None, re.search('You haven\'t acquired any datasets.', driver.page_source))
def default_register(self, user):
self.register(user, user, '%s@conwet.com' % user, user)
self.register(user, user, '%s@conwet.com' % user)
@parameterized.expand([
(['user1', 'user2', 'user3'], True, True, [], 'http://store.conwet.com/'),
@ -293,7 +315,7 @@ class TestSelenium(unittest.TestCase):
self.default_register(user)
# The first user creates a dataset
self.login(users[0], users[0])
self.login(users[0])
pkg_name = 'Dataset 1'
url = get_dataset_url(pkg_name)
self.create_ds(pkg_name, 'Example description', ['tag1', 'tag2', 'tag3'], private, searchable,
@ -308,7 +330,7 @@ class TestSelenium(unittest.TestCase):
rest_users = users[1:]
for user in rest_users:
self.logout()
self.login(user, user)
self.login(user)
acquired = user in allowed_users
self.check_user_access(pkg_name, url, False, acquired, False, private, searchable, acquire_url)
@ -336,7 +358,7 @@ class TestSelenium(unittest.TestCase):
self.default_register(user)
# Create the dataset
self.login(user, user)
self.login(user)
pkg_name = 'Dataset 2'
# Go the page to create the dataset
@ -350,7 +372,7 @@ class TestSelenium(unittest.TestCase):
# Check the error message
msg_error = WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.XPATH, '//div[@id=\'content\']/div[3]/div/section/div/form/div/ul/li'))).text
self.assertEquals(expected_msg, msg_error)
self.assertEqual(expected_msg, msg_error)
@parameterized.expand([
('Acquire Dataset', 'dataset'),
@ -360,13 +382,13 @@ class TestSelenium(unittest.TestCase):
# Create a default user
user = 'user1'
self.default_register(user)
self.login(user, user)
self.login(user)
# Enter the acquired dataset tab
driver = self.driver
driver.get(self.base_url + 'dashboard/acquired')
WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.LINK_TEXT, link))).click()
self.assertEquals(self.base_url + 'dataset', self.base_url + expected_url)
self.assertEqual(self.base_url + 'dataset', self.base_url + expected_url)
@parameterized.expand([
@ -402,7 +424,7 @@ class TestSelenium(unittest.TestCase):
# Create a default user
user = 'user1'
self.default_register(user)
self.login(user, user)
self.login(user)
acquire_url = 'http://upm.es'
dataset_default_name = 'Dataset %d'
@ -458,7 +480,7 @@ class TestSelenium(unittest.TestCase):
for user in users:
self.default_register(user)
self.login(users[0], users[0])
self.login(users[0])
# Create the organizations
for org in orgs:
@ -477,7 +499,7 @@ class TestSelenium(unittest.TestCase):
rest_users = users[1:]
for user in rest_users:
self.logout()
self.login(user, user)
self.login(user)
acquired = user in adquiring_users
in_org = user in orgs[0]['users']
self.check_user_access(pkg_name, url, False, acquired, in_org, private, searchable, acquire_url)
@ -492,12 +514,12 @@ class TestSelenium(unittest.TestCase):
self.default_register(user)
# The user creates a dataset
self.login(user, user)
self.login(user)
pkg_name = 'Dataset 1'
description = 'Example Description'
tags = ['tag1', 'tag2', 'tag3']
url = get_dataset_url(pkg_name)
self.create_ds(pkg_name, 'Example description', ['tag1', 'tag2', 'tag3'], True, True,
self.create_ds(pkg_name, 'Example description', [], True, True,
[], 'http://example.com', 'http://upm.es', 'UPM Main', 'Example Description', 'CSV')
self.modify_ds(url, pkg_name, description, tags, False, None, None, None)

View File

@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2014 CoNWeT Lab., Universidad Politécnica de Madrid
# Copyright (c) 2018 Future Internet Consulting and Development Solutions S.L.
# This file is part of CKAN Private Dataset Extension.
# CKAN Private Dataset Extension is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# CKAN Private Dataset Extension is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with CKAN Private Dataset Extension. If not, see <http://www.gnu.org/licenses/>.
import unittest
from mock import ANY, DEFAULT, MagicMock, patch
from parameterized import parameterized
from ckanext.privatedatasets import views
class ViewsTest(unittest.TestCase):
@parameterized.expand([
('NotFound', 404),
('NotAuthorized', 403),
])
@patch.multiple("ckanext.privatedatasets.views", base=DEFAULT, toolkit=DEFAULT, model=DEFAULT, g=DEFAULT, logic=DEFAULT, _=DEFAULT)
def test_exceptions_loading_users(self, exception, expected_status, base, toolkit, model, g, logic, _):
# Configure the mocks
setattr(logic, exception, ValueError)
toolkit.get_action().side_effect = getattr(logic, exception)
base.abort.side_effect = TypeError
# Call the function
with self.assertRaises(TypeError):
views.acquired_datasets()
# Assertations
expected_context = {
'auth_user_obj': g.userobj,
'for_view': True,
'model': model,
'session': model.Session,
'user': g.user,
}
toolkit.get_action().assert_called_once_with(expected_context, {'user_obj': g.userobj})
base.abort.assert_called_once_with(expected_status, ANY)
@patch.multiple("ckanext.privatedatasets.views", base=DEFAULT, toolkit=DEFAULT, model=DEFAULT, g=DEFAULT, logic=DEFAULT)
def test_no_error_loading_users(self, base, toolkit, model, g, logic):
# actions
default_user = {'user_name': 'test', 'another_val': 'example value'}
user_show = MagicMock(return_value=default_user)
acquisitions_list = MagicMock()
toolkit.get_action = MagicMock(side_effect=lambda action: user_show if action == 'user_show' else acquisitions_list)
# Call the function
returned = views.acquired_datasets()
# User_show called correctly
expected_context = {
'auth_user_obj': g.userobj,
'for_view': True,
'model': model,
'session': model.Session,
'user': g.user,
}
user_show.assert_called_once_with(expected_context, {'user_obj': g.userobj})
acquisitions_list.assert_called_with(expected_context, None)
# Check that the render method has been called
base.render.assert_called_once_with('user/dashboard_acquired.html', {'user_dict': default_user, 'acquired_datasets': acquisitions_list()})
self.assertEqual(returned, base.render())
@patch("ckanext.privatedatasets.views.acquired_datasets")
def test_there_is_a_controller_for_ckan_27(self, acquired_datasets):
controller = views.AcquiredDatasetsControllerUI()
response = controller.acquired_datasets()
acquired_datasets.assert_called_once_with()
self.assertEqual(response, acquired_datasets())

View File

@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2018 Future Internet Consulting and Development Solutions S.L.
# This file is part of CKAN Private Dataset Extension.
# CKAN Private Dataset Extension is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# CKAN Private Dataset Extension is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with CKAN Private Dataset Extension. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import, unicode_literals
from ckan import logic, model
from ckan.common import _, g
from ckan.lib import base
from ckan.plugins import toolkit
from ckanext.privatedatasets import constants
def acquired_datasets():
context = {'auth_user_obj': g.userobj, 'for_view': True, 'model': model, 'session': model.Session, 'user': g.user}
data_dict = {'user_obj': g.userobj}
try:
user_dict = toolkit.get_action('user_show')(context, data_dict)
acquired_datasets = toolkit.get_action(constants.ACQUISITIONS_LIST)(context, None)
except logic.NotFound:
base.abort(404, _('User not found'))
except logic.NotAuthorized:
base.abort(403, _('Not authorized to see this page'))
extra_vars = {
'user_dict': user_dict,
'acquired_datasets': acquired_datasets,
}
return base.render('user/dashboard_acquired.html', extra_vars)
class AcquiredDatasetsControllerUI(base.BaseController):
def acquired_datasets(self):
return acquired_datasets()

View File

@ -48,7 +48,7 @@ setup(
],
tests_require=[
'parameterized',
'selenium==2.52.0'
'selenium==3.13.0'
],
test_suite='nosetests',
entry_points='''

View File

@ -6,14 +6,27 @@ port = 5000
[app:main]
use = config:./ckan/test-core.ini
ckan.site_id = ckanext.privatedatasets.test
ckan.site_url = http://localhost:5000
package_new_return_url = http://localhost:5000/dataset/<NAME>?test=new
package_edit_return_url = http://localhost:5000/dataset/<NAME>?test=edit
ckan.cache_validation_enabled = True
ckan.cache_enabled = False
ckan.tests.functional.test_cache.expires = 1800
ckan.tests.functional.test_cache.TestCacheBasics.test_get_cache_expires.expires = 3600
# Disable test css as we aren't using it and creates requests to URLs
# raising ValidationErrors. See #46
ckan.template_head_end = <-- template_head_end -->
ckan.template_head_end = <!-- template_head_end -->
ckan.legacy_templates = no
ckan.plugins = privatedatasets
ckan.auth.create_unowned_dataset = true
ckan.auth.create_dataset_if_not_in_organization = true
ckan.auth.user_create_groups = true
ckan.auth.user_create_organizations = true
ckan.privatedatasets.parser = ckanext.privatedatasets.parsers.fiware:FiWareNotificationParser
ckan.privatedatasets.show_acquire_url_on_create = True
ckan.privatedatasets.show_acquire_url_on_edit = True
@ -23,3 +36,43 @@ ckan.datastore.write_url = postgresql://ckan_default:pass@127.0.0.1:5432/datasto
ckan.datastore.read_url = postgresql://datastore_default:pass@127.0.0.1:5432/datastore_test
ckan.storage_path=data/storage
# Logging configuration
[loggers]
keys = root, ckan, ckanext, sqlalchemy
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
[logger_ckan]
qualname = ckan
handlers = console
level = INFO
propagate = 0
[logger_ckanext]
qualname = ckanext
handlers = console
level = DEBUG
propagate = 0
[logger_sqlalchemy]
handlers = console
qualname = sqlalchemy.engine
level = WARN
[handler_console]
class = StreamHandler
args = (sys.stdout,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s