Fix tests, improve Travis setup

* Run pytest tests only
* Fix py2/py3 compatibility issues and factory_boy definitions
* Run tests on Ubuntu Focal
* Use the latest supported PostgreSQL version for each CKAN version
* Use dockerized Solr
amercader 2020-08-31 21:02:35 +02:00
parent e49452457c
commit 6c78efcb6b
23 changed files with 110 additions and 3412 deletions
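The py2/py3 compatibility fixes below (in ckanext/harvest/commands/harvester.py) all follow the same pattern: Python 2's built-in unicode() is replaced with six.text_type(), which resolves to unicode on Python 2 and str on Python 3. A minimal sketch of that pattern, for reference only:

import six

value = 42
# six.text_type is unicode on Python 2 and str on Python 3, so this single
# call replaces the bare unicode(...) usages removed in the diff below.
text = six.text_type(value)
assert isinstance(text, six.text_type)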

View File

@ -1,14 +1,14 @@
os: linux
dist: focal
language: python
services:
- docker
- redis
- postgresql
install: bash bin/travis-build.bash
script: bash bin/travis-run.bash
# the new trusty images of Travis cause build errors with psycopg2, see https://github.com/travis-ci/travis-ci/issues/8897
dist: trusty
group: deprecated-2017Q4
stages:
- Flake8
- Tests
@ -16,7 +16,7 @@ stages:
jobs:
include:
- stage: Flake8
python: 2.7
python: 3.6
env: FLAKE8=True
install:
- pip install flake8==3.5.0
@ -27,17 +27,60 @@ jobs:
- flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics --exclude ckan
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
- flake8 . --count --max-line-length=127 --statistics --exclude ckan --exit-zero
- stage: Tests
python: "2.7"
python: "3.6"
env: CKANVERSION=master
services:
- postgresql
- redis
- docker
- python: "2.7"
env: CKANVERSION=2.9
services:
- postgresql
- redis
- docker
- python: "3.6"
env: CKANVERSION=master
env: CKANVERSION=2.9
services:
- postgresql
- redis
- docker
- python: "2.7"
env: CKANVERSION=2.8
addons:
postgresql: '11'
apt:
sources:
- sourceline: 'deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main'
packages:
- postgresql-11
- python: "2.7"
env: CKANVERSION=2.7
addons:
postgresql: '9.6'
apt:
sources:
- sourceline: 'deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main'
packages:
- postgresql-9.6
- python: "2.7"
env: CKANVERSION=2.6
addons:
postgresql: '9.6'
apt:
sources:
- sourceline: 'deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main'
packages:
- postgresql-9.6
cache:
directories:

View File

@ -12,11 +12,6 @@ fi
export PYTHON_MAJOR_VERSION=${TRAVIS_PYTHON_VERSION%.*}
echo "Installing the packages that CKAN requires..."
sudo apt-get update -qq
sudo apt-get install solr-jetty
echo "Installing CKAN and its Python dependencies..."
git clone https://github.com/ckan/ckan
cd ckan
@ -29,6 +24,7 @@ else
echo "CKAN version: ${CKAN_TAG#ckan-}"
fi
echo "Installing the recommended setuptools requirement"
if [ -f requirement-setuptools.txt ]
then
pip install -r requirement-setuptools.txt
@ -47,19 +43,26 @@ pip install -r dev-requirements.txt
cd -
echo "Setting up Solr..."
# solr is multicore for tests on ckan master now, but it's easier to run tests
# on Travis single-core still.
# see https://github.com/ckan/ckan/issues/2972
sed -i -e 's/solr_url.*/solr_url = http:\/\/127.0.0.1:8983\/solr/' ckan/test-core.ini
printf "NO_START=0\nJETTY_HOST=127.0.0.1\nJETTY_PORT=8983\nJAVA_HOME=$JAVA_HOME" | sudo tee /etc/default/jetty
sudo cp ckan/ckan/config/solr/schema.xml /etc/solr/conf/schema.xml
sudo service jetty restart
docker run --name ckan-solr -p 8983:8983 -d openknowledge/ckan-solr-dev:$CKANVERSION
echo "Setting up Postgres..."
export PG_VERSION="$(pg_lsclusters | grep online | awk '{print $1}')"
export PG_PORT="$(pg_lsclusters | grep online | awk '{print $3}')"
echo "Using Postgres $PG_VERSION on port $PG_PORT"
if [ "$PG_PORT" != "5432" ]
then
echo "Using non-standard Postgres port, updating configuration..."
sed -i -e "s/postgresql:\/\/ckan_default:pass@localhost\/ckan_test/postgresql:\/\/ckan_default:pass@localhost:$PG_PORT\/ckan_test/" ckan/test-core.ini
sed -i -e "s/postgresql:\/\/ckan_default:pass@localhost\/datastore_test/postgresql:\/\/ckan_default:pass@localhost:$PG_PORT\/datastore_test/" ckan/test-core.ini
sed -i -e "s/postgresql:\/\/datastore_default:pass@localhost\/datastore_test/postgresql:\/\/datastore_default:pass@localhost:$PG_PORT\/datastore_test/" ckan/test-core.ini
fi
echo "Creating the PostgreSQL user and database..."
sudo -u postgres psql -c "CREATE USER ckan_default WITH PASSWORD 'pass';"
sudo -u postgres psql -c "CREATE USER datastore_default WITH PASSWORD 'pass';"
sudo -u postgres psql -c 'CREATE DATABASE ckan_test WITH OWNER ckan_default;'
sudo -u postgres psql -c 'CREATE DATABASE datastore_test WITH OWNER ckan_default;'
sudo -u postgres psql -p $PG_PORT -c "CREATE USER ckan_default WITH PASSWORD 'pass';"
sudo -u postgres psql -p $PG_PORT -c "CREATE USER datastore_default WITH PASSWORD 'pass';"
sudo -u postgres psql -p $PG_PORT -c 'CREATE DATABASE ckan_test WITH OWNER ckan_default;'
sudo -u postgres psql -p $PG_PORT -c 'CREATE DATABASE datastore_test WITH OWNER ckan_default;'
echo "Initialising the database..."
cd ckan
@ -81,9 +84,8 @@ python setup.py develop
echo "Moving test.ini into a subdir... (because the core ini file is referenced as ../ckan/test-core.ini)"
mkdir subdir
mv test.ini subdir
mv test-nose.ini subdir
echo "Setting up additional requirements..."
if (( $CKAN_MINOR_VERSION >= 9 ))
then
ckan -c subdir/test.ini harvester initdb

View File

@ -1,17 +1,4 @@
#!/bin/bash
set -e
if [ $CKANVERSION == 'master' ]
then
export CKAN_MINOR_VERSION=100
else
export CKAN_MINOR_VERSION=${CKANVERSION##*.}
fi
if (( $CKAN_MINOR_VERSION >= 9 ))
then
pytest --ckan-ini=subdir/test.ini --cov=ckanext.harvest ckanext/harvest/tests
else
nosetests --ckan --nologcapture --with-pylons=subdir/test-nose.ini --with-coverage --cover-package=ckanext.harvest --cover-inclusive --cover-erase --cover-tests ckanext/harvest/tests/nose
fi
pytest --ckan-ini=subdir/test.ini --cov=ckanext.harvest --disable-warnings ckanext/harvest/tests

View File

@ -2,6 +2,8 @@ from __future__ import print_function
import sys
import six
from ckan import model
from ckan.logic import get_action, ValidationError
@ -235,23 +237,23 @@ class Harvester(CkanCommand):
def create_harvest_source(self):
if len(self.args) >= 2:
name = unicode(self.args[1])
name = six.text_type(self.args[1])
else:
print("Please provide a source name")
sys.exit(1)
if len(self.args) >= 3:
url = unicode(self.args[2])
url = six.text_type(self.args[2])
else:
print("Please provide a source URL")
sys.exit(1)
if len(self.args) >= 4:
type = unicode(self.args[3])
type = six.text_type(self.args[3])
else:
print("Please provide a source type")
sys.exit(1)
if len(self.args) >= 5:
title = unicode(self.args[4])
title = six.text_type(self.args[4])
else:
title = None
if len(self.args) >= 6:
@ -261,17 +263,17 @@ class Harvester(CkanCommand):
else:
active = True
if len(self.args) >= 7:
owner_org = unicode(self.args[6])
owner_org = six.text_type(self.args[6])
else:
owner_org = None
if len(self.args) >= 8:
frequency = unicode(self.args[7])
frequency = six.text_type(self.args[7])
if not frequency:
frequency = "MANUAL"
else:
frequency = "MANUAL"
if len(self.args) >= 9:
source_config = unicode(self.args[8])
source_config = six.text_type(self.args[8])
else:
source_config = None
try:
@ -288,14 +290,14 @@ class Harvester(CkanCommand):
def clear_harvest_source_history(self):
source_id = None
if len(self.args) >= 2:
source_id = unicode(self.args[1])
source_id = six.text_type(self.args[1])
print(utils.clear_harvest_source_history(source_id))
def show_harvest_source(self):
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
source_id_or_name = six.text_type(self.args[1])
else:
print("Please provide a source name")
sys.exit(1)
@ -303,7 +305,7 @@ class Harvester(CkanCommand):
def remove_harvest_source(self):
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
source_id_or_name = six.text_type(self.args[1])
else:
print("Please provide a source id")
sys.exit(1)
@ -311,7 +313,7 @@ class Harvester(CkanCommand):
def clear_harvest_source(self):
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
source_id_or_name = six.text_type(self.args[1])
else:
print("Please provide a source id")
sys.exit(1)
@ -327,7 +329,7 @@ class Harvester(CkanCommand):
def create_harvest_job(self):
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
source_id_or_name = six.text_type(self.args[1])
else:
print("Please provide a source id")
sys.exit(1)
@ -338,7 +340,7 @@ class Harvester(CkanCommand):
def job_abort(self):
if len(self.args) >= 2:
job_or_source_id_or_name = unicode(self.args[1])
job_or_source_id_or_name = six.text_type(self.args[1])
else:
print("Please provide a job id or source name/id")
sys.exit(1)
@ -353,7 +355,7 @@ class Harvester(CkanCommand):
if len(self.args) >= 2:
if len(self.args) >= 3 and self.args[2].startswith('force-import='):
force_import = self.args[2].split('=')[-1]
source_id_or_name = unicode(self.args[1])
source_id_or_name = six.text_type(self.args[1])
else:
print("Please provide a source id")
sys.exit(1)
@ -363,7 +365,7 @@ class Harvester(CkanCommand):
def import_stage(self):
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
source_id_or_name = six.text_type(self.args[1])
context = {
"model": model,
"session": model.Session,

View File

@ -5,7 +5,12 @@ from ckan.plugins import toolkit
class HarvestSource(factory.Factory):
FACTORY_FOR = harvest_model.HarvestSource
class Meta:
model = harvest_model.HarvestSource
_return_type = 'dict'
name = factory.Sequence(lambda n: 'test_source_{n}'.format(n=n))
@ -30,7 +35,7 @@ class HarvestSource(factory.Factory):
if cls._return_type == 'dict':
return source_dict
else:
return cls.FACTORY_FOR.get(source_dict['id'])
return harvest_model.HarvestSource.get(source_dict['id'])
class HarvestSourceObj(HarvestSource):
@ -38,7 +43,12 @@ class HarvestSourceObj(HarvestSource):
class HarvestJob(factory.Factory):
FACTORY_FOR = harvest_model.HarvestJob
class Meta:
model = harvest_model.HarvestJob
_return_type = 'dict'
source = factory.SubFactory(HarvestSourceObj)
@ -57,7 +67,7 @@ class HarvestJob(factory.Factory):
if cls._return_type == 'dict':
return job_dict
else:
return cls.FACTORY_FOR.get(job_dict['id'])
return harvest_model.HarvestJob.get(job_dict['id'])
class HarvestJobObj(HarvestJob):
@ -65,7 +75,12 @@ class HarvestJobObj(HarvestJob):
class HarvestObject(factory.Factory):
FACTORY_FOR = harvest_model.HarvestObject
class Meta:
model = harvest_model.HarvestObject
_return_type = 'dict'
# source = factory.SubFactory(HarvestSourceObj)
@ -87,7 +102,7 @@ class HarvestObject(factory.Factory):
if cls._return_type == 'dict':
return job_dict
else:
return cls.FACTORY_FOR.get(job_dict['id'])
return harvest_model.HarvestObject.get(job_dict['id'])
class HarvestObjectObj(HarvestObject):
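For context, the factories above now declare their target model with factory_boy's class Meta block, replacing the deprecated FACTORY_FOR class attribute. A minimal standalone sketch of the new-style declaration (the class name is hypothetical; the field definitions mirror the diff above):

import factory
import ckanext.harvest.model as harvest_model

class HarvestSourceSketch(factory.Factory):
    # New-style factory_boy declaration: the target model lives on Meta
    # instead of the removed FACTORY_FOR attribute.
    class Meta:
        model = harvest_model.HarvestSource

    name = factory.Sequence(lambda n: 'test_source_{n}'.format(n=n))
    url = factory.Sequence(lambda n: 'http://{n}.test.com'.format(n=n))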

View File

@ -1,94 +0,0 @@
import factory
import ckanext.harvest.model as harvest_model
from ckantoolkit.tests.factories import _get_action_user_name
from ckan.plugins import toolkit
class HarvestSource(factory.Factory):
FACTORY_FOR = harvest_model.HarvestSource
_return_type = 'dict'
name = factory.Sequence(lambda n: 'test_source_{n}'.format(n=n))
title = factory.Sequence(lambda n: 'test title {n}'.format(n=n))
url = factory.Sequence(lambda n: 'http://{n}.test.com'.format(n=n))
source_type = 'test-nose' # defined in test_queue.py
id = '{0}_id'.format(name).lower()
@classmethod
def _create(cls, target_class, *args, **kwargs):
if args:
assert False, "Positional args aren't supported, use keyword args."
context = {'user': _get_action_user_name(kwargs)}
# If there is an existing source for this URL, and we can't create
# another source with that URL, just return the original one.
try:
source_dict = toolkit.get_action('harvest_source_show')(
context, dict(url=kwargs['url']))
except toolkit.ObjectNotFound:
source_dict = toolkit.get_action('harvest_source_create')(
context, kwargs)
if cls._return_type == 'dict':
return source_dict
else:
return cls.FACTORY_FOR.get(source_dict['id'])
class HarvestSourceObj(HarvestSource):
_return_type = 'obj'
class HarvestJob(factory.Factory):
FACTORY_FOR = harvest_model.HarvestJob
_return_type = 'dict'
source = factory.SubFactory(HarvestSourceObj)
@classmethod
def _create(cls, target_class, *args, **kwargs):
if args:
assert False, "Positional args aren't supported, use keyword args."
context = {'user': _get_action_user_name(kwargs)}
if 'source_id' not in kwargs:
kwargs['source_id'] = kwargs['source'].id
if 'run' not in kwargs:
kwargs['run'] = False
job_dict = toolkit.get_action('harvest_job_create')(
context, kwargs)
if cls._return_type == 'dict':
return job_dict
else:
return cls.FACTORY_FOR.get(job_dict['id'])
class HarvestJobObj(HarvestJob):
_return_type = 'obj'
class HarvestObject(factory.Factory):
FACTORY_FOR = harvest_model.HarvestObject
_return_type = 'dict'
# source = factory.SubFactory(HarvestSourceObj)
job = factory.SubFactory(HarvestJobObj)
@classmethod
def _create(cls, target_class, *args, **kwargs):
if args:
assert False, "Positional args aren't supported, use keyword args."
context = {'user': _get_action_user_name(kwargs)}
if 'job_id' not in kwargs:
kwargs['job_id'] = kwargs['job'].id
kwargs['source_id'] = kwargs['job'].source.id
# Remove 'job' to avoid it getting added as a HarvestObjectExtra
if 'job' in kwargs:
kwargs.pop('job')
job_dict = toolkit.get_action('harvest_object_create')(
context, kwargs)
if cls._return_type == 'dict':
return job_dict
else:
return cls.FACTORY_FOR.get(job_dict['id'])
class HarvestObjectObj(HarvestObject):
_return_type = 'obj'

View File

@ -1,543 +0,0 @@
from __future__ import print_function
import json
import re
import copy
import urllib
import SimpleHTTPServer
import SocketServer
from threading import Thread
PORT = 8998
class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def do_GET(self):
# test name is the first bit of the URL and makes CKAN behave
# differently in some way.
# Its value is recorded and then removed from the path
self.test_name = None
test_name_match = re.match('^/([^/]+)/', self.path)
if test_name_match:
self.test_name = test_name_match.groups()[0]
if self.test_name == 'api':
self.test_name = None
else:
self.path = re.sub('^/([^/]+)/', '/', self.path)
if self.test_name == 'site_down':
return self.respond('Site is down', status=500)
# The API version is recorded and then removed from the path
api_version = None
version_match = re.match(r'^/api/(\d)', self.path)
if version_match:
api_version = int(version_match.groups()[0])
self.path = re.sub(r'^/api/(\d)/', '/api/', self.path)
if self.path == '/api/rest/package':
if api_version == 2:
dataset_refs = [d['id'] for d in DATASETS]
else:
dataset_refs = [d['name'] for d in DATASETS]
return self.respond_json(dataset_refs)
if self.path == '/api/action/package_list':
dataset_names = [d['name'] for d in DATASETS]
return self.respond_action(dataset_names)
if self.path.startswith('/api/rest/package/'):
dataset_ref = self.path.split('/')[-1]
dataset = self.get_dataset(dataset_ref)
if dataset:
return self.respond_json(
convert_dataset_to_restful_form(dataset))
if self.path.startswith('/api/action/package_show'):
params = self.get_url_params()
dataset_ref = params['id']
dataset = self.get_dataset(dataset_ref)
if dataset:
return self.respond_action(dataset)
if self.path.startswith('/api/action/group_show'):
params = self.get_url_params()
group_ref = params['id']
group = self.get_group(group_ref)
if group:
return self.respond_action(group)
if self.path.startswith('/api/search/dataset'):
params = self.get_url_params()
if params.keys() == ['organization']:
org = self.get_org(params['organization'])
dataset_ids = [d['id'] for d in DATASETS
if d['owner_org'] == org['id']]
return self.respond_json({'count': len(dataset_ids),
'results': dataset_ids})
else:
return self.respond(
'Not implemented search params %s' % params, status=400)
if self.path.startswith('/api/search/revision'):
revision_ids = [r['id'] for r in REVISIONS]
return self.respond_json(revision_ids)
if self.path.startswith('/api/rest/revision/'):
revision_ref = self.path.split('/')[-1]
assert api_version == 2
for rev in REVISIONS:
if rev['id'] == revision_ref:
return self.respond_json(rev)
self.respond('Cannot find revision', status=404)
# /api/3/action/package_search?fq=metadata_modified:[2015-10-23T14:51:13.282361Z TO *]&rows=1000
if self.path.startswith('/api/action/package_search'):
params = self.get_url_params()
if self.test_name == 'datasets_added':
if params['start'] == '0':
# when page 1 is retrieved, the site only has 1 dataset
datasets = [DATASETS[0]['name']]
elif params['start'] == '100':
# when page 2 is retrieved, the site now has new datasets,
# and so the second page has the original dataset, pushed
# onto this page now, plus a new one
datasets = [DATASETS[0]['name'],
DATASETS[1]['name']]
else:
datasets = []
else:
# ignore sort param for now
if 'sort' in params:
del params['sort']
if params['start'] != '0':
datasets = []
elif set(params.keys()) == set(['rows', 'start']):
datasets = ['dataset1', DATASETS[1]['name']]
elif set(params.keys()) == set(['fq', 'rows', 'start']) and \
params['fq'] == '-organization:org1':
datasets = [DATASETS[1]['name']]
elif set(params.keys()) == set(['fq', 'rows', 'start']) and \
params['fq'] == 'organization:org1':
datasets = ['dataset1']
elif set(params.keys()) == set(['fq', 'rows', 'start']) and \
params['fq'] == '-groups:group1':
datasets = [DATASETS[1]['name']]
elif set(params.keys()) == set(['fq', 'rows', 'start']) and \
params['fq'] == 'groups:group1':
datasets = ['dataset1']
elif set(params.keys()) == set(['fq', 'rows', 'start']) and \
'metadata_modified' in params['fq']:
assert '+TO+' not in params['fq'], \
'Spaces should not be decoded by now - seeing + '\
'means they were double encoded and SOLR doesnt like '\
'that'
datasets = [DATASETS[1]['name']]
else:
return self.respond(
'Not implemented search params %s' % params,
status=400)
out = {'count': len(datasets),
'results': [self.get_dataset(dataset_ref_)
for dataset_ref_ in datasets]}
return self.respond_action(out)
# if we wanted to server a file from disk, then we'd call this:
# return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
self.respond('Mock CKAN doesnt recognize that call', status=400)
def get_dataset(self, dataset_ref):
for dataset in DATASETS:
if dataset['name'] == dataset_ref or \
dataset['id'] == dataset_ref:
if self.test_name == 'invalid_tag':
dataset['tags'] = INVALID_TAGS
return dataset
def get_group(self, group_ref):
for group in GROUPS:
if group['name'] == group_ref or \
group['id'] == group_ref:
return group
def get_org(self, org_ref):
for org in ORGS:
if org['name'] == org_ref or \
org['id'] == org_ref:
return org
def get_url_params(self):
params_str = self.path.split('?')[-1]
params_unicode = urllib.unquote_plus(params_str).decode('utf8')
params = params_unicode.split('&')
return dict([param.split('=') for param in params])
def respond_action(self, result_dict, status=200):
response_dict = {'result': result_dict, 'success': True}
return self.respond_json(response_dict, status=status)
def respond_json(self, content_dict, status=200):
return self.respond(json.dumps(content_dict), status=status,
content_type='application/json')
def respond(self, content, status=200, content_type='application/json'):
self.send_response(status)
self.send_header('Content-Type', content_type)
self.end_headers()
self.wfile.write(content)
self.wfile.close()
def serve(port=PORT):
'''Runs a CKAN-alike app (over HTTP) that is used for harvesting tests'''
# Choose the directory to serve files from
# os.chdir(os.path.join(os.path.dirname(os.path.abspath(__file__)),
# 'mock_ckan_files'))
class TestServer(SocketServer.TCPServer):
allow_reuse_address = True
httpd = TestServer(("", PORT), MockCkanHandler)
print('Serving test HTTP server at port {}'.format(PORT))
httpd_thread = Thread(target=httpd.serve_forever)
httpd_thread.setDaemon(True)
httpd_thread.start()
def convert_dataset_to_restful_form(dataset):
dataset = copy.deepcopy(dataset)
dataset['extras'] = dict([(e['key'], e['value']) for e in dataset['extras']])
dataset['tags'] = [t['name'] for t in dataset.get('tags', [])]
return dataset
# Datasets are in the package_show form, rather than the RESTful form
DATASETS = [
{'id': 'dataset1-id',
'name': 'dataset1',
'title': 'Test Dataset1',
'owner_org': 'org1-id',
'tags': [{'name': 'test-tag'}],
'groups': [{'id': 'group1-id', 'name': 'group1'}],
'extras': []},
{
"id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"name": "cabinet-office-energy-use",
"private": False,
"maintainer_email": None,
"revision_timestamp": "2010-11-23T22:34:55.089925",
"organization":
{
"description": "The Cabinet Office supports the Prime Minister and Deputy Prime Minister,"
" and ensure the effective running of government. We are also the corporate"
" headquarters for government, in partnership with HM Treasury, and we take"
" the lead in certain critical policy areas.\r\nCO is a ministerial department,"
" supported by 18 agencies and public bodies\r\n\r\nYou can find out more at"
" https://www.gov.uk/government/organisations/cabinet-office",
"created": "2012-06-27T14:48:40.244951",
"title": "Cabinet Office",
"name": "cabinet-office",
"revision_timestamp": "2013-04-02T14:27:23.086886",
"is_organization": True,
"state": "active",
"image_url": "",
"revision_id": "4be8825d-d3f4-4fb2-b80b-43e36f574c05",
"type": "organization",
"id": "aa1e068a-23da-4563-b9c2-2cad272b663e",
"approval_status": "pending"
},
"update_frequency": "other",
"metadata_created": "2010-08-02T09:19:47.600853",
"last_major_modification": "2010-08-02T09:19:47.600853",
"metadata_modified": "2014-05-09T22:00:01.486366",
"temporal_granularity": "",
"author_email": None,
"geographic_granularity": "point",
"geographic_coverage": [],
"state": "active",
"version": None,
"temporal_coverage-to": "",
"license_id": "uk-ogl",
"type": "dataset",
"published_via": "",
"resources":
[
{
"content_length": "69837",
"cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3",
"description": "70 Whitehall energy data",
"cache_last_updated": "2013-06-19T00:59:42.762642",
"url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv",
"openness_score_failure_count": "0",
"format": "CSV",
"cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"last_modified": "2014-05-09T23:00:01.435211",
"mimetype": "text/csv",
"content_type": "text/csv",
"openness_score": "3",
"openness_score_reason": "open and standardized format",
"position": 0,
"revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4",
"id": "f156019d-ea88-46a6-8fa3-3d12582e2161",
"size": 299107
}
],
"num_resources": 1,
"tags":
[
{
"vocabulary_id": None,
"display_name": "consumption",
"name": "consumption",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "84ce26de-6711-4e85-9609-f7d8a87b0fc8"
},
{
"vocabulary_id": None,
"display_name": "energy",
"name": "energy",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "9f2ae723-602f-4290-80c4-6637ad617a45"
}
],
"precision": "",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"taxonomy_url": "",
"groups": [{"id": "remote-group-id", "name": "remote-group"}],
"creator_user_id": None,
"national_statistic": "no",
"relationships_as_subject": [],
"num_tags": 8,
"update_frequency-other": "Real-time",
"isopen": True,
"url": "http://www.carbonculture.net/orgs/cabinet-office/70-whitehall/",
"notes": "Cabinet Office head office energy use updated from on-site meters showing use, cost and carbon impact.",
"owner_org": "aa1e068a-23da-4563-b9c2-2cad272b663e",
"theme-secondary":
[
"Environment"
],
"extras":
[
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "categories",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "6813d71b-785b-4f56-b296-1b2acb34eed6"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "2010-07-30",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "date_released",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "515f638b-e2cf-40a6-a8a7-cbc8001269e3"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "date_updated",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "bff63465-4f96-44e7-bb87-6e66fff5e596"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "000000: ",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "geographic_coverage",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "414bcd35-b628-4218-99e2-639615183df8"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "point",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "geographic_granularity",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "c7b460dd-c61f-4cd2-90c2-eceb6c91fe9b"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "no",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "national_statistic",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "9f04b202-3646-49be-b69e-7fa997399ff3"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "{\"status\": \"final\", \"source\": \"Automatically awarded by ODI\","
" \"certification_type\": \"automatically awarded\", \"level\": \"raw\","
" \"title\": \"Cabinet Office 70 Whitehall energy use\","
" \"created_at\": \"2014-10-28T12:25:57Z\", \"jurisdiction\": \"GB\","
" \"certificate_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922\","
" \"badge_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922/badge.png\","
" \"cert_title\": \"Basic Level Certificate\"}",
"revision_timestamp": "2014-11-12T02:52:35.048060",
"state": "active",
"key": "odi-certificate",
"revision_id": "eae9763b-e258-4d76-9ec2-7f5baf655394",
"id": "373a3cbb-d9c0-45a6-9a78-b95c86398766"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_coverage-from",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "39f72eed-6f76-4733-b636-7541cee3404f"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_coverage-to",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "818e2c8f-fee0-49da-8bea-ea3c9401ece5"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_granularity",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "f868b950-d3ce-4fbe-88ca-5cbc4b672320"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "Towns & Cities",
"revision_timestamp": "2015-03-16T18:10:08.802815",
"state": "active",
"key": "theme-primary",
"revision_id": "fc2b6630-84f8-4c88-8ac7-0ca275b2bc97",
"id": "bdcf00fd-3248-4c2f-9cf8-b90706c88e8d"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "[\"Environment\"]",
"revision_timestamp": "2015-04-08T20:57:04.895214",
"state": "active",
"key": "theme-secondary",
"revision_id": "c2c48530-ff82-4af1-9373-cdc64d5bc83c",
"id": "417482c5-a9c0-4430-8c4e-0c76e59fe44f"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "Real-time",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "update_frequency",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "e8ad4837-514e-4446-81a2-ffacfa7cf683"
}
],
"license_url": "http://www.nationalarchives.gov.uk/doc/open-government-licence/version/3/",
"individual_resources":
[
{
"content_length": "69837",
"cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3",
"description": "70 Whitehall energy data",
"cache_last_updated": "2013-06-19T00:59:42.762642",
"url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv",
"openness_score_failure_count": "0",
"format": "CSV",
"cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"last_modified": "2014-05-09T23:00:01.435211",
"mimetype": "text/csv",
"content_type": "text/csv",
"openness_score": "3",
"openness_score_reason": "open and standardized format",
"position": 0,
"revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4",
"id": "f156019d-ea88-46a6-8fa3-3d12582e2161",
"size": 299107
}
],
"title": "Cabinet Office 70 Whitehall energy use",
"revision_id": "3bd6ced3-35b2-4b20-94e2-c596e24bc375",
"date_released": "30/7/2010",
"theme-primary": "Towns & Cities"
}
]
INVALID_TAGS = [
{
"vocabulary_id": None,
"display_name": "consumption%^&",
"name": "consumption%^&",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "84ce26de-6711-4e85-9609-f7d8a87b0fc8"
},
]
ORGS = [
{'id': 'org1-id',
'name': 'org1'},
{'id': 'aa1e068a-23da-4563-b9c2-2cad272b663e',
'name': 'cabinet-office'}
]
GROUPS = [
{'id': 'group1-id',
'name': 'group1'},
{'id': '9853c3e1-eebb-4e8c-9ae7-1668a01bf2ca',
'name': 'finances'}
]
REVISIONS = [
{
"id": "23daf2eb-d7ec-4d86-a844-3924acd311ea",
"timestamp": "2015-10-21T09:50:08.160045",
"message": "REST API: Update object dataset1",
"author": "ross",
"approved_timestamp": None,
"packages":
[
DATASETS[1]['id']
],
"groups": []
},
{
"id": "8254a293-10db-4af2-9dfa-6a1f06ee899c",
"timestamp": "2015-10-21T09:46:21.198021",
"message": "REST API: Update object dataset1",
"author": "ross",
"approved_timestamp": None,
"packages":
[
DATASETS[1]['id']
],
"groups": []
}]
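Incidentally, the deleted mock server relies on the Python 2-only SimpleHTTPServer and SocketServer modules; a version-agnostic import shim (a sketch only, not what the new pytest suite actually does) would look like this:

try:
    # Python 3 module names
    from http.server import SimpleHTTPRequestHandler
    from socketserver import TCPServer
except ImportError:
    # Python 2 module names, as used in the deleted file above
    from SimpleHTTPServer import SimpleHTTPRequestHandler
    from SocketServer import TCPServer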

View File

@ -1,180 +0,0 @@
import re
from nose.tools import assert_equal, assert_in
from ckanext.harvest import model as harvest_model
from ckanext.harvest.harvesters.base import HarvesterBase, munge_tag
from mock import patch
from ckantoolkit.tests import helpers, factories
_ensure_name_is_unique = HarvesterBase._ensure_name_is_unique
class TestGenNewName(object):
def setup(self):
helpers.reset_db()
harvest_model.setup()
def test_basic(self):
assert_equal(HarvesterBase._gen_new_name('Trees'), 'trees')
def test_munge(self):
assert_equal(
HarvesterBase._gen_new_name('Trees and branches - survey.'),
'trees-and-branches-survey')
@patch.dict('ckanext.harvest.harvesters.base.config',
{'ckanext.harvest.some_other_config': 'value'})
def test_without_config(self):
'''Tests if the number suffix is used when no config is set.'''
factories.Dataset(name='trees')
assert_equal(
HarvesterBase._gen_new_name('Trees'),
'trees1')
@patch.dict('ckanext.harvest.harvesters.base.config',
{'ckanext.harvest.default_dataset_name_append': 'number-sequence'})
def test_number_config(self):
factories.Dataset(name='trees')
assert_equal(
HarvesterBase._gen_new_name('Trees'),
'trees1')
@patch.dict('ckanext.harvest.harvesters.base.config',
{'ckanext.harvest.default_dataset_name_append': 'random-hex'})
def test_random_config(self):
factories.Dataset(name='trees')
new_name = HarvesterBase._gen_new_name('Trees')
assert re.match(r'trees[\da-f]{5}', new_name)
@patch.dict('ckanext.harvest.harvesters.base.config',
{'ckanext.harvest.default_dataset_name_append': 'random-hex'})
def test_config_override(self):
'''Tests if a parameter has precedence over a config value.'''
factories.Dataset(name='trees')
assert_equal(
HarvesterBase._gen_new_name('Trees', append_type='number-sequence'),
'trees1')
class TestEnsureNameIsUnique(object):
def setup(self):
helpers.reset_db()
harvest_model.setup()
def test_no_existing_datasets(self):
factories.Dataset(name='unrelated')
assert_equal(_ensure_name_is_unique('trees'), 'trees')
def test_existing_dataset(self):
factories.Dataset(name='trees')
assert_equal(_ensure_name_is_unique('trees'), 'trees1')
def test_two_existing_datasets(self):
factories.Dataset(name='trees')
factories.Dataset(name='trees1')
assert_equal(_ensure_name_is_unique('trees'), 'trees2')
def test_no_existing_datasets_and_long_name(self):
assert_equal(_ensure_name_is_unique('x'*101), 'x'*100)
def test_existing_dataset_and_long_name(self):
# because PACKAGE_NAME_MAX_LENGTH = 100
factories.Dataset(name='x'*100)
assert_equal(_ensure_name_is_unique('x'*101), 'x'*99 + '1')
def test_update_dataset_with_new_name(self):
factories.Dataset(name='trees1')
assert_equal(_ensure_name_is_unique('tree', existing_name='trees1'),
'tree')
def test_update_dataset_but_with_same_name(self):
# this can happen if you remove a trailing space from the title - the
# harvester sees the title changed and thinks it should have a new
# name, but clearly it can reuse its existing one
factories.Dataset(name='trees')
factories.Dataset(name='trees1')
assert_equal(_ensure_name_is_unique('trees', existing_name='trees'),
'trees')
def test_update_dataset_to_available_shorter_name(self):
# this can be handy when if reharvesting, you got duplicates and
# managed to purge one set and through a minor title change you can now
# lose the appended number. users don't like unnecessary numbers.
factories.Dataset(name='trees1')
assert_equal(_ensure_name_is_unique('trees', existing_name='trees1'),
'trees')
def test_update_dataset_but_doesnt_change_to_other_number(self):
# there's no point changing one number for another though
factories.Dataset(name='trees')
factories.Dataset(name='trees2')
assert_equal(_ensure_name_is_unique('trees', existing_name='trees2'),
'trees2')
def test_update_dataset_with_new_name_with_numbers(self):
factories.Dataset(name='trees')
factories.Dataset(name='trees2')
factories.Dataset(name='frogs')
assert_equal(_ensure_name_is_unique('frogs', existing_name='trees2'),
'frogs1')
def test_existing_dataset_appending_hex(self):
factories.Dataset(name='trees')
name = _ensure_name_is_unique('trees', append_type='random-hex')
# e.g. 'trees0b53f'
assert re.match(r'trees[\da-f]{5}', name)
# taken from ckan/tests/lib/test_munge.py
class TestMungeTag:
# (original, expected)
munge_list = [
('unchanged', 'unchanged'),
# ('s', 's_'), # too short
('some spaces here', 'some-spaces--here'),
('random:other%characters&_.here', 'randomothercharactershere'),
('river-water-dashes', 'river-water-dashes'),
]
def test_munge_tag(self):
'''Munge a list of tags gives expected results.'''
for org, exp in self.munge_list:
munge = munge_tag(org)
assert_equal(munge, exp)
def test_munge_tag_multiple_pass(self):
'''Munge a list of tags multiple times gives expected results.'''
for org, exp in self.munge_list:
first_munge = munge_tag(org)
assert_equal(first_munge, exp)
second_munge = munge_tag(first_munge)
assert_equal(second_munge, exp)
def test_clean_tags_package_show(self):
instance = HarvesterBase()
tags_as_dict = [{u'vocabulary_id': None,
u'state': u'active',
u'display_name': name,
u'id': u'073080c8-fef2-4743-9c9e-6216019f8b3d',
u'name': name} for name, exp in self.munge_list]
clean_tags = HarvesterBase._clean_tags(instance, tags_as_dict)
idx = 0
for _, exp in self.munge_list:
tag = clean_tags[idx]
assert_equal(tag['name'], exp)
idx += 1
def test_clean_tags_rest(self):
instance = HarvesterBase()
tags_as_str = [name for name, exp in self.munge_list]
clean_tags = HarvesterBase._clean_tags(instance, tags_as_str)
assert_equal(len(clean_tags), len(tags_as_str))
for _, exp in self.munge_list:
assert_in(exp, clean_tags)
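These nose-based tests are dropped in favour of the pytest suite under ckanext/harvest/tests, which travis-run.bash now invokes unconditionally. As an illustration only, a test such as TestGenNewName.test_basic translates to plain asserts under pytest; the clean_db fixture name is an assumption (CKAN's standard pytest fixtures) and is not taken from this diff:

import pytest

from ckanext.harvest.harvesters.base import HarvesterBase

@pytest.mark.usefixtures('clean_db')  # assumed fixture; resets the database between tests
class TestGenNewName(object):

    def test_basic(self):
        # A plain assert replaces nose.tools.assert_equal
        assert HarvesterBase._gen_new_name('Trees') == 'trees'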

View File

@ -1,367 +0,0 @@
from __future__ import absolute_import
import copy
from nose.tools import assert_equal, assert_raises, assert_in
import json
from mock import patch, MagicMock, Mock
from requests.exceptions import HTTPError, RequestException
from ckantoolkit.tests.helpers import reset_db, call_action
from ckantoolkit.tests.factories import Organization, Group
from ckan import model
from ckan.plugins import toolkit
from ckanext.harvest.harvesters.ckanharvester import ContentFetchError
from ckanext.harvest.tests.nose.factories import (HarvestSourceObj, HarvestJobObj,
HarvestObjectObj)
from ckanext.harvest.tests.lib import run_harvest
import ckanext.harvest.model as harvest_model
from ckanext.harvest.harvesters.base import HarvesterBase
from ckanext.harvest.harvesters.ckanharvester import CKANHarvester
from . import mock_ckan
# Start CKAN-alike server we can test harvesting against it
mock_ckan.serve()
def was_last_job_considered_error_free():
last_job = model.Session.query(harvest_model.HarvestJob) \
.order_by(harvest_model.HarvestJob.created.desc()) \
.first()
job = MagicMock()
job.source = last_job.source
job.id = ''
return bool(HarvesterBase.last_error_free_job(job))
class TestCkanHarvester(object):
@classmethod
def setup(cls):
reset_db()
harvest_model.setup()
def test_gather_normal(self):
source = HarvestSourceObj(url='http://localhost:%s/' % mock_ckan.PORT)
job = HarvestJobObj(source=source)
harvester = CKANHarvester()
obj_ids = harvester.gather_stage(job)
assert_equal(job.gather_errors, [])
assert_equal(type(obj_ids), list)
assert_equal(len(obj_ids), len(mock_ckan.DATASETS))
harvest_object = harvest_model.HarvestObject.get(obj_ids[0])
assert_equal(harvest_object.guid, mock_ckan.DATASETS[0]['id'])
assert_equal(
json.loads(harvest_object.content),
mock_ckan.DATASETS[0])
def test_fetch_normal(self):
source = HarvestSourceObj(url='http://localhost:%s/' % mock_ckan.PORT)
job = HarvestJobObj(source=source)
harvest_object = HarvestObjectObj(
guid=mock_ckan.DATASETS[0]['id'],
job=job,
content=json.dumps(mock_ckan.DATASETS[0]))
harvester = CKANHarvester()
result = harvester.fetch_stage(harvest_object)
assert_equal(harvest_object.errors, [])
assert_equal(result, True)
def test_import_normal(self):
org = Organization()
harvest_object = HarvestObjectObj(
guid=mock_ckan.DATASETS[0]['id'],
content=json.dumps(mock_ckan.DATASETS[0]),
job__source__owner_org=org['id'])
harvester = CKANHarvester()
result = harvester.import_stage(harvest_object)
assert_equal(harvest_object.errors, [])
assert_equal(result, True)
assert harvest_object.package_id
dataset = model.Package.get(harvest_object.package_id)
assert_equal(dataset.name, mock_ckan.DATASETS[0]['name'])
def test_harvest(self):
results_by_guid = run_harvest(
url='http://localhost:%s/' % mock_ckan.PORT,
harvester=CKANHarvester())
result = results_by_guid['dataset1-id']
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'added')
assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name'])
assert_equal(result['errors'], [])
result = results_by_guid[mock_ckan.DATASETS[1]['id']]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'added')
assert_equal(result['dataset']['name'], mock_ckan.DATASETS[1]['name'])
assert_equal(result['errors'], [])
assert was_last_job_considered_error_free()
def test_harvest_twice(self):
run_harvest(
url='http://localhost:%s/' % mock_ckan.PORT,
harvester=CKANHarvester())
# change the modified date
datasets = copy.deepcopy(mock_ckan.DATASETS)
datasets[1]['metadata_modified'] = '2050-05-09T22:00:01.486366'
with patch('ckanext.harvest.tests.nose.harvesters.mock_ckan.DATASETS',
datasets):
results_by_guid = run_harvest(
url='http://localhost:%s/' % mock_ckan.PORT,
harvester=CKANHarvester())
# updated the dataset which has revisions
result = results_by_guid[mock_ckan.DATASETS[1]['id']]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'updated')
assert_equal(result['dataset']['name'], mock_ckan.DATASETS[1]['name'])
assert_equal(result['errors'], [])
# the other dataset is unchanged and not harvested
assert mock_ckan.DATASETS[0]['id'] not in result
assert was_last_job_considered_error_free()
def test_harvest_invalid_tag(self):
from nose.plugins.skip import SkipTest
raise SkipTest()
results_by_guid = run_harvest(
url='http://localhost:%s/invalid_tag' % mock_ckan.PORT,
harvester=CKANHarvester())
result = results_by_guid['dataset1-id']
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'added')
assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name'])
def test_exclude_organizations(self):
config = {'organizations_filter_exclude': ['org1']}
results_by_guid = run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
config=json.dumps(config))
assert 'dataset1-id' not in results_by_guid
assert mock_ckan.DATASETS[1]['id'] in results_by_guid
def test_include_organizations(self):
config = {'organizations_filter_include': ['org1']}
results_by_guid = run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
config=json.dumps(config))
assert 'dataset1-id' in results_by_guid
assert mock_ckan.DATASETS[1]['id'] not in results_by_guid
def test_exclude_groups(self):
config = {'groups_filter_exclude': ['group1']}
results_by_guid = run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
config=json.dumps(config))
assert 'dataset1-id' not in results_by_guid
assert mock_ckan.DATASETS[1]['id'] in results_by_guid
def test_include_groups(self):
config = {'groups_filter_include': ['group1']}
results_by_guid = run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
config=json.dumps(config))
assert 'dataset1-id' in results_by_guid
assert mock_ckan.DATASETS[1]['id'] not in results_by_guid
def test_remote_groups_create(self):
config = {'remote_groups': 'create'}
results_by_guid = run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
config=json.dumps(config))
assert 'dataset1-id' in results_by_guid
# Check that the remote group was created locally
call_action('group_show', {}, id=mock_ckan.GROUPS[0]['id'])
def test_remote_groups_only_local(self):
# Create an existing group
Group(id='group1-id', name='group1')
config = {'remote_groups': 'only_local'}
results_by_guid = run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
config=json.dumps(config))
assert 'dataset1-id' in results_by_guid
# Check that the dataset was added to the existing local group
dataset = call_action('package_show', {}, id=mock_ckan.DATASETS[0]['id'])
assert_equal(dataset['groups'][0]['id'], mock_ckan.DATASETS[0]['groups'][0]['id'])
# Check that the other remote group was not created locally
assert_raises(toolkit.ObjectNotFound, call_action, 'group_show', {},
id='remote-group')
def test_harvest_not_modified(self):
run_harvest(
url='http://localhost:%s/' % mock_ckan.PORT,
harvester=CKANHarvester())
results_by_guid = run_harvest(
url='http://localhost:%s/' % mock_ckan.PORT,
harvester=CKANHarvester())
# The metadata_modified was the same for this dataset so the import
# would have returned 'unchanged'
result = results_by_guid[mock_ckan.DATASETS[1]['id']]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'not modified')
assert 'dataset' not in result
assert_equal(result['errors'], [])
assert was_last_job_considered_error_free()
def test_harvest_whilst_datasets_added(self):
results_by_guid = run_harvest(
url='http://localhost:%s/datasets_added' % mock_ckan.PORT,
harvester=CKANHarvester())
assert_equal(sorted(results_by_guid.keys()),
[mock_ckan.DATASETS[1]['id'],
mock_ckan.DATASETS[0]['id']])
def test_harvest_site_down(self):
results_by_guid = run_harvest(
url='http://localhost:%s/site_down' % mock_ckan.PORT,
harvester=CKANHarvester())
assert not results_by_guid
assert not was_last_job_considered_error_free()
def test_default_tags(self):
config = {'default_tags': [{'name': 'geo'}]}
results_by_guid = run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
config=json.dumps(config))
tags = results_by_guid['dataset1-id']['dataset']['tags']
tag_names = [tag['name'] for tag in tags]
assert 'geo' in tag_names
def test_default_tags_invalid(self):
config = {'default_tags': ['geo']} # should be list of dicts
with assert_raises(toolkit.ValidationError) as harvest_context:
run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
config=json.dumps(config))
assert_in('default_tags must be a list of dictionaries',
str(harvest_context.exception))
def test_default_groups(self):
Group(id='group1-id', name='group1')
Group(id='group2-id', name='group2')
Group(id='group3-id', name='group3')
config = {'default_groups': ['group2-id', 'group3'],
'remote_groups': 'only_local'}
tmp_c = toolkit.c
try:
# c.user is used by the validation (annoying),
# however patch doesn't work because it's a weird
# StackedObjectProxy, so we swap it manually
toolkit.c = MagicMock(user='')
results_by_guid = run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
config=json.dumps(config))
finally:
toolkit.c = tmp_c
assert_equal(results_by_guid['dataset1-id']['errors'], [])
groups = results_by_guid['dataset1-id']['dataset']['groups']
group_names = set(group['name'] for group in groups)
# group1 comes from the harvested dataset
# group2 & 3 come from the default_groups
assert_equal(group_names, set(('group1', 'group2', 'group3')))
def test_default_groups_invalid(self):
Group(id='group2-id', name='group2')
# should be list of strings
config = {'default_groups': [{'name': 'group2'}]}
with assert_raises(toolkit.ValidationError) as harvest_context:
run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
config=json.dumps(config))
assert_in('default_groups must be a list of group names/ids',
str(harvest_context.exception))
def test_default_extras(self):
config = {
'default_extras': {
'encoding': 'utf8',
'harvest_url': '{harvest_source_url}/dataset/{dataset_id}'
}}
results_by_guid = run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
config=json.dumps(config))
assert_equal(results_by_guid['dataset1-id']['errors'], [])
extras = results_by_guid['dataset1-id']['dataset']['extras']
extras_dict = dict((e['key'], e['value']) for e in extras)
assert_equal(extras_dict['encoding'], 'utf8')
assert_equal(extras_dict['harvest_url'],
'http://localhost:8998/dataset/dataset1-id')
def test_default_extras_invalid(self):
config = {
'default_extras': 'utf8', # value should be a dict
}
with assert_raises(toolkit.ValidationError) as harvest_context:
run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
config=json.dumps(config))
assert_in('default_extras must be a dictionary',
str(harvest_context.exception))
@patch('ckanext.harvest.harvesters.ckanharvester.pyopenssl.inject_into_urllib3')
@patch('ckanext.harvest.harvesters.ckanharvester.CKANHarvester.config')
@patch('ckanext.harvest.harvesters.ckanharvester.requests.get', side_effect=RequestException('Test exception'))
def test_get_content_handles_request_exception(
self, mock_requests_get, mock_config, mock_pyopenssl_inject
):
mock_config.return_value = {}
harvester = CKANHarvester()
with assert_raises(ContentFetchError) as context:
harvester._get_content("http://test.example.gov.uk")
assert str(context.exception) == 'Request error: Test exception'
class MockHTTPError(HTTPError):
def __init__(self):
self.response = Mock()
self.response.status_code = 404
self.request = Mock()
self.request.url = "http://test.example.gov.uk"
@patch('ckanext.harvest.harvesters.ckanharvester.pyopenssl.inject_into_urllib3')
@patch('ckanext.harvest.harvesters.ckanharvester.CKANHarvester.config')
@patch('ckanext.harvest.harvesters.ckanharvester.requests.get', side_effect=MockHTTPError())
def test_get_content_handles_http_error(
self, mock_requests_get, mock_config, mock_pyopenssl_inject
):
mock_config.return_value = {}
harvester = CKANHarvester()
with assert_raises(ContentFetchError) as context:
harvester._get_content("http://test.example.gov.uk")
assert str(context.exception) == 'HTTP error: 404 http://test.example.gov.uk'

View File

@ -1,72 +0,0 @@
import logging
from ckanext.harvest.tests.factories import HarvestSourceObj, HarvestJobObj
import ckanext.harvest.model as harvest_model
from ckanext.harvest import queue
from ckan.plugins import toolkit
log = logging.getLogger(__name__)
def run_harvest(url, harvester, config=''):
'''Runs a harvest and returns the results.
This allows you to test a harvester.
Queues are avoided as they are a pain in tests.
'''
# User creates a harvest source
source = HarvestSourceObj(url=url, config=config,
source_type=harvester.info()['name'])
# User triggers a harvest, which is the creation of a harvest job.
# We set run=False so that it doesn't put it on the gather queue.
job = HarvestJobObj(source=source, run=False)
return run_harvest_job(job, harvester)
def run_harvest_job(job, harvester):
# In 'harvest_job_create' it would call 'harvest_send_job_to_gather_queue'
# which would do 2 things to 'run' the job:
# 1. change the job status to Running
job.status = 'Running'
job.save()
# 2. put the job on the gather queue which is consumed by
# queue.gather_callback, which determines the harvester and then calls
# gather_stage. We simply call the gather_stage.
obj_ids = queue.gather_stage(harvester, job)
if not isinstance(obj_ids, list):
# gather had nothing to do or errored. Carry on to ensure the job is
# closed properly
obj_ids = []
# The object ids are put onto the fetch queue, consumed by
# queue.fetch_callback which calls queue.fetch_and_import_stages
results_by_guid = {}
for obj_id in obj_ids:
harvest_object = harvest_model.HarvestObject.get(obj_id)
guid = harvest_object.guid
# force reimport of datasets
if hasattr(job, 'force_import'):
if guid in job.force_import:
harvest_object.force_import = True
else:
log.info('Skipping: %s', guid)
continue
results_by_guid[guid] = {'obj_id': obj_id}
queue.fetch_and_import_stages(harvester, harvest_object)
results_by_guid[guid]['state'] = harvest_object.state
results_by_guid[guid]['report_status'] = harvest_object.report_status
if harvest_object.state == 'COMPLETE' and harvest_object.package_id:
results_by_guid[guid]['dataset'] = \
toolkit.get_action('package_show')(
{'ignore_auth': True},
dict(id=harvest_object.package_id))
results_by_guid[guid]['errors'] = harvest_object.errors
# Do 'harvest_jobs_run' to change the job status to 'finished'
toolkit.get_action('harvest_jobs_run')({'ignore_auth': True}, {})
return results_by_guid

View File

@ -1,993 +0,0 @@
import json
import factories
import unittest
from mock import patch
from nose.tools import assert_equal, assert_raises, assert_in
from nose.plugins.skip import SkipTest
from ckantoolkit.tests import factories as ckan_factories
from ckantoolkit.tests.helpers import _get_test_app, reset_db, FunctionalTestBase
from ckan import plugins as p
from ckan.plugins import toolkit
from ckan import model
from ckan.lib.base import config
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.model as harvest_model
from ckanext.harvest.model import HarvestGatherError, HarvestObjectError, HarvestObject, HarvestJob
from ckanext.harvest.logic import HarvestJobExists
from ckanext.harvest.logic.action.update import send_error_mail
def call_action_api(action, apikey=None, status=200, **kwargs):
'''POST an HTTP request to the CKAN API and return the result.
Any additional keyword arguments that you pass to this function as **kwargs
are posted as params to the API.
Usage:
package_dict = call_action_api('package_create', apikey=apikey,
name='my_package')
assert package_dict['name'] == 'my_package'
num_followers = post(app, 'user_follower_count', id='annafan')
If you are expecting an error from the API and want to check the contents
of the error dict, you have to use the status param otherwise an exception
will be raised:
error_dict = call_action_api('group_activity_list', status=403,
id='invalid_id')
assert error_dict['message'] == 'Access Denied'
:param action: the action to post to, e.g. 'package_create'
:type action: string
:param apikey: the API key to put in the Authorization header of the post
(optional, default: None)
:type apikey: string
:param status: the HTTP status code expected in the response from the CKAN
API, e.g. 403, if a different status code is received an exception will
be raised (optional, default: 200)
:type status: int
:param **kwargs: any other keyword arguments passed to this function will
be posted to the API as params
:raises paste.fixture.AppError: if the HTTP status code of the response
from the CKAN API is different from the status param passed to this
function
:returns: the 'result' or 'error' dictionary from the CKAN API response
:rtype: dictionary
'''
params = json.dumps(kwargs)
app = _get_test_app()
response = app.post('/api/action/{0}'.format(action), params=params,
extra_environ={'Authorization': str(apikey)},
status=status)
if status in (200,):
assert response.json['success'] is True
return response.json['result']
else:
assert response.json['success'] is False
return response.json['error']
class MockHarvesterForActionTests(p.SingletonPlugin):
p.implements(IHarvester)
def info(self):
return {'name': 'test-for-action-nose',
'title': 'Test for action',
'description': 'test'}
def validate_config(self, config):
if not config:
return config
try:
config_obj = json.loads(config)
if 'custom_option' in config_obj:
if not isinstance(config_obj['custom_option'], list):
raise ValueError('custom_option must be a list')
except ValueError, e:
raise e
return config
def gather_stage(self, harvest_job):
return []
def fetch_stage(self, harvest_object):
return True
def import_stage(self, harvest_object):
return True
SOURCE_DICT = {
"url": "http://test.action.com",
"name": "test-source-action",
"title": "Test source action",
"notes": "Test source action desc",
"source_type": "test-for-action-nose",
"frequency": "MANUAL",
"config": json.dumps({"custom_option": ["a", "b"]})
}
class ActionBase(object):
@classmethod
def setup_class(cls):
if not p.plugin_loaded('test_nose_action_harvester'):
p.load('test_nose_action_harvester')
def setup(self):
reset_db()
harvest_model.setup()
@classmethod
def teardown_class(cls):
p.unload('test_nose_action_harvester')
class HarvestSourceActionBase(FunctionalTestBase):
@classmethod
def setup_class(cls):
super(HarvestSourceActionBase, cls).setup_class()
harvest_model.setup()
if not p.plugin_loaded('test_nose_action_harvester'):
p.load('test_nose_action_harvester')
@classmethod
def teardown_class(cls):
super(HarvestSourceActionBase, cls).teardown_class()
p.unload('test_nose_action_harvester')
def _get_source_dict(self):
return {
"url": "http://test.action.com",
"name": "test-source-action",
"title": "Test source action",
"notes": "Test source action desc",
"source_type": "test-for-action-nose",
"frequency": "MANUAL",
"config": json.dumps({"custom_option": ["a", "b"]})
}
def test_invalid_missing_values(self):
source_dict = {}
test_data = self._get_source_dict()
if 'id' in test_data:
source_dict['id'] = test_data['id']
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
for key in ('name', 'title', 'url', 'source_type'):
assert_equal(result[key], [u'Missing value'])
def test_invalid_unknown_type(self):
source_dict = self._get_source_dict()
source_dict['source_type'] = 'unknown'
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'source_type' in result
assert u'Unknown harvester type' in result['source_type'][0]
def test_invalid_unknown_frequency(self):
wrong_frequency = 'ANNUALLY'
source_dict = self._get_source_dict()
source_dict['frequency'] = wrong_frequency
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'frequency' in result
assert u'Frequency {0} not recognised'.format(wrong_frequency) in result['frequency'][0]
def test_invalid_wrong_configuration(self):
source_dict = self._get_source_dict()
source_dict['config'] = 'not_json'
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'config' in result
assert u'Error parsing the configuration options: No JSON object could be decoded' in result['config'][0]
source_dict['config'] = json.dumps({'custom_option': 'not_a_list'})
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'config' in result
assert u'Error parsing the configuration options: custom_option must be a list' in result['config'][0]
class TestHarvestSourceActionCreate(HarvestSourceActionBase):
def __init__(self):
self.action = 'harvest_source_create'
def test_create(self):
source_dict = self._get_source_dict()
sysadmin = ckan_factories.Sysadmin()
result = call_action_api('harvest_source_create',
apikey=sysadmin['apikey'], **source_dict)
for key in source_dict.keys():
assert_equal(source_dict[key], result[key])
# Check that source was actually created
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
assert_equal(source.type, source_dict['source_type'])
# Trying to create a source with the same URL fails
source_dict = self._get_source_dict()
source_dict['name'] = 'test-source-action-new'
result = call_action_api('harvest_source_create',
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'url' in result
assert u'There already is a Harvest Source for this URL' in result['url'][0]
class TestHarvestSourceActionList(FunctionalTestBase):
def test_list_with_organization(self):
organization = ckan_factories.Organization.create()
harvest_data = {
"owner_org": organization["id"],
"type": "harvest",
"url": "https://www.gov.uk/random",
"source_type": "test-nose"
}
other_harvest_data = {
"type": "harvest",
"url": "https://www.gov.uk/other-path",
"source_type": "test-nose"
}
harvest = ckan_factories.Dataset.create(**harvest_data)
harvest_source = factories.HarvestSource.create(id=harvest["id"])
other_harvest = ckan_factories.Dataset.create(**other_harvest_data)
other_harvest_source = factories.HarvestSource.create(id=other_harvest["id"])
app = _get_test_app()
response = app.get('/api/action/{0}'.format('harvest_source_list'),
params={"organization_id":organization["id"]},
status=200)
results = response.json['result']
result_harvest = model.Session.query(model.Package).get(results[0]["id"])
result_organization_id = result_harvest.owner_org
assert response.json['success'] is True
assert len(results) == 1
assert_equal(organization["id"], result_organization_id)
def test_list_without_organization(self):
harvest_data = {
"type": "harvest",
"url": "https://www.gov.uk/random",
"source_type": "test-nose"
}
other_harvest_data = {
"type": "harvest",
"url": "https://www.gov.uk/other-path",
"source_type": "test-nose"
}
harvest_source = factories.HarvestSource.create()
other_harvest_source = factories.HarvestSource.create()
app = _get_test_app()
response = app.get('/api/action/{0}'.format('harvest_source_list'), status=200)
results = response.json['result']
assert response.json['success'] is True
assert len(results) == 2
@patch.dict('ckanext.harvest.logic.action.get.config',
{'ckan.harvest.harvest_source_limit': 1})
def test_list_with_limit(self):
harvest_data = {
"type": "harvest",
"url": "https://www.gov.uk/random",
"source_type": "test-nose"
}
other_harvest_data = {
"type": "harvest",
"url": "https://www.gov.uk/other-path",
"source_type": "test-nose"
}
harvest_source = factories.HarvestSource.create()
other_harvest_source = factories.HarvestSource.create()
app = _get_test_app()
response = app.get('/api/action/{0}'.format('harvest_source_list'), status=200)
results = response.json['result']
assert response.json['success'] is True
assert len(results) == 1
class HarvestSourceFixtureMixin(object):
def _get_source_dict(self):
'''Not only returns a source_dict, but creates the HarvestSource object
as well - suitable for testing update actions.
'''
source = HarvestSourceActionBase._get_source_dict(self)
source = factories.HarvestSource(**source)
# delete status because it gets in the way of the status supplied to
# call_action_api later on. It is only a generated value, not affecting
# the update/patch anyway.
del source['status']
return source
class TestHarvestSourceActionUpdate(HarvestSourceFixtureMixin,
HarvestSourceActionBase):
def __init__(self):
self.action = 'harvest_source_update'
def test_update(self):
source_dict = self._get_source_dict()
source_dict.update({
"url": "http://test.action.updated.com",
"name": "test-source-action-updated",
"title": "Test source action updated",
"notes": "Test source action desc updated",
"source_type": "test-nose",
"frequency": "MONTHLY",
"config": json.dumps({"custom_option": ["c", "d"]})
})
sysadmin = ckan_factories.Sysadmin()
result = call_action_api('harvest_source_update',
apikey=sysadmin['apikey'], **source_dict)
for key in set(('url', 'name', 'title', 'notes', 'source_type',
'frequency', 'config')):
assert_equal(source_dict[key], result[key], "Key: %s" % key)
# Check that source was actually updated
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
assert_equal(source.type, source_dict['source_type'])
class TestHarvestSourceActionPatch(HarvestSourceFixtureMixin,
HarvestSourceActionBase):
def __init__(self):
self.action = 'harvest_source_patch'
if toolkit.check_ckan_version(max_version='2.2.99'):
# harvest_source_patch only came in with ckan 2.3
raise SkipTest()
def test_invalid_missing_values(self):
pass
def test_patch(self):
source_dict = self._get_source_dict()
patch_dict = {
"id": source_dict['id'],
"name": "test-source-action-patched",
"url": "http://test.action.patched.com",
"config": json.dumps({"custom_option": ["pat", "ched"]})
}
sysadmin = ckan_factories.Sysadmin()
result = call_action_api('harvest_source_patch',
apikey=sysadmin['apikey'], **patch_dict)
source_dict.update(patch_dict)
for key in set(('url', 'name', 'title', 'notes', 'source_type',
'frequency', 'config')):
assert_equal(source_dict[key], result[key], "Key: %s" % key)
# Check that source was actually updated
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
assert_equal(source.type, source_dict['source_type'])
class TestActions(ActionBase):
def test_harvest_source_clear(self):
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_source_clear')(
context, {'id': source.id})
assert_equal(result, {'id': source.id})
source = harvest_model.HarvestSource.get(source.id)
assert source
assert_equal(harvest_model.HarvestJob.get(job.id), None)
assert_equal(harvest_model.HarvestObject.get(object_.id), None)
assert_equal(model.Package.get(dataset['id']), None)
def test_harvest_source_job_history_clear(self):
# prepare
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_source_job_history_clear')(
context, {'id': source.id})
# verify
assert_equal(result, {'id': source.id})
source = harvest_model.HarvestSource.get(source.id)
assert source
assert_equal(harvest_model.HarvestJob.get(job.id), None)
assert_equal(harvest_model.HarvestObject.get(object_.id), None)
dataset_from_db = model.Package.get(dataset['id'])
assert dataset_from_db, 'is None'
assert_equal(dataset_from_db.id, dataset['id'])
def test_harvest_sources_job_history_clear(self):
# prepare
data_dict = SOURCE_DICT.copy()
source_1 = factories.HarvestSourceObj(**data_dict)
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
source_2 = factories.HarvestSourceObj(**data_dict)
job_1 = factories.HarvestJobObj(source=source_1)
dataset_1 = ckan_factories.Dataset()
object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1,
package_id=dataset_1['id'])
job_2 = factories.HarvestJobObj(source=source_2)
dataset_2 = ckan_factories.Dataset()
object_2_ = factories.HarvestObjectObj(job=job_2, source=source_2,
package_id=dataset_2['id'])
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_sources_job_history_clear')(
context, {})
# verify
assert_equal(
sorted(result),
sorted([{'id': source_1.id}, {'id': source_2.id}]))
source_1 = harvest_model.HarvestSource.get(source_1.id)
assert source_1
assert_equal(harvest_model.HarvestJob.get(job_1.id), None)
assert_equal(harvest_model.HarvestObject.get(object_1_.id), None)
dataset_from_db_1 = model.Package.get(dataset_1['id'])
assert dataset_from_db_1, 'is None'
assert_equal(dataset_from_db_1.id, dataset_1['id'])
source_2 = harvest_model.HarvestSource.get(source_2.id)
assert source_2
assert_equal(harvest_model.HarvestJob.get(job_2.id), None)
assert_equal(harvest_model.HarvestObject.get(object_2_.id), None)
dataset_from_db_2 = model.Package.get(dataset_2['id'])
assert dataset_from_db_2, 'is None'
assert_equal(dataset_from_db_2.id, dataset_2['id'])
def test_harvest_source_create_twice_with_unique_url(self):
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
def test_harvest_source_create_twice_with_same_url(self):
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict['name'] = 'another-source'
assert_raises(toolkit.ValidationError,
toolkit.get_action('harvest_source_create'),
{'user': site_user}, data_dict)
def test_harvest_source_create_twice_with_unique_url_and_config(self):
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict['name'] = 'another-source'
data_dict['config'] = '{"something": "new"}'
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
def test_harvest_job_create_as_sysadmin(self):
source = factories.HarvestSource(**SOURCE_DICT.copy())
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict = {
'source_id': source['id'],
'run': True
}
job = toolkit.get_action('harvest_job_create')(
{'user': site_user}, data_dict)
assert_equal(job['source_id'], source['id'])
assert_equal(job['status'], 'Running')
assert_equal(job['gather_started'], None)
assert_in('stats', job.keys())
def test_harvest_job_create_as_admin(self):
# as if an admin user presses 'refresh'
user = ckan_factories.User()
user['capacity'] = 'admin'
org = ckan_factories.Organization(users=[user])
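# NB: dict.items() + list concatenation is Python 2 only syntax; on Python 3 items() returns a view object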
source_dict = dict(SOURCE_DICT.items() +
[('publisher_id', org['id'])])
source = factories.HarvestSource(**source_dict)
data_dict = {
'source_id': source['id'],
'run': True
}
job = toolkit.get_action('harvest_job_create')(
{'user': user['name']}, data_dict)
assert_equal(job['source_id'], source['id'])
assert_equal(job['status'], 'Running')
assert_equal(job['gather_started'], None)
assert_in('stats', job.keys())
@patch('ckanext.harvest.logic.action.update.log.error')
def test_harvest_jobs_run_times_out(self, mock_error_log):
harvest_source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
harvest_job = factories.HarvestJobObj(
source=harvest_source,
run=True
)
# date in the past, ckan.harvest.timeout has been set to 5 minutes in test-nose.ini
harvest_job.created = '2020-05-29 10:00:00.0'
harvest_job.save()
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': harvest_job.id,
'source_id': harvest_source.id
}
job = toolkit.get_action('harvest_jobs_run')(
context, data_dict)
msg, = mock_error_log.call_args[0]
assert mock_error_log.called
assert msg == 'Job timeout: {} is taking longer than 5 minutes'.format(harvest_job.id)
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source.id})
assert status['last_job']['status'] == 'Finished'
assert status['last_job']['stats']['errored'] == 1
@patch('ckanext.harvest.logic.action.update.log.error')
def test_harvest_jobs_run_does_not_timeout_if_within_time(self, mock_error_log):
harvest_source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
harvest_job = factories.HarvestJobObj(
source=harvest_source,
run=True
)
# job has just been created, so no timeout expected
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': harvest_job.id,
'source_id': harvest_source.id
}
job_obj = HarvestJob.get(harvest_job.id)
job = toolkit.get_action('harvest_jobs_run')(
context, data_dict)
assert not mock_error_log.called
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source.id})
assert status['last_job']['status'] == 'Running'
assert status['last_job']['stats']['errored'] == 0
@patch.dict('ckanext.harvest.logic.action.update.config',
{'ckan.harvest.timeout': None})
@patch('ckanext.harvest.logic.action.update.log.error')
def test_harvest_jobs_run_does_not_timeout_if_timeout_not_set(self, mock_error_log):
harvest_source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
harvest_job = factories.HarvestJobObj(
source=harvest_source,
run=True
)
# date in the past, assumes ckan.harvest.timeout has been set to 5 minutes
harvest_job.created = '2020-05-29 10:00:00.0'
harvest_job.save()
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': harvest_job.id,
'source_id': harvest_source.id
}
job_obj = HarvestJob.get(harvest_job.id)
job = toolkit.get_action('harvest_jobs_run')(
context, data_dict)
assert not mock_error_log.called
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source.id})
assert status['last_job']['status'] == 'Running'
assert status['last_job']['stats']['errored'] == 0
class TestHarvestObject(unittest.TestCase):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
def test_create(self):
job = factories.HarvestJobObj()
context = {
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': job.id,
'extras': {'a key': 'a value'},
}
harvest_object = toolkit.get_action('harvest_object_create')(
context, data_dict)
# fetch the object from database to check it was created
created_object = harvest_model.HarvestObject.get(harvest_object['id'])
assert created_object.guid == harvest_object['guid'] == data_dict['guid']
def test_create_bad_parameters(self):
source_a = factories.HarvestSourceObj()
job = factories.HarvestJobObj()
context = {
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data_dict = {
'job_id': job.id,
'source_id': source_a.id,
'extras': 1
}
harvest_object_create = toolkit.get_action('harvest_object_create')
self.assertRaises(toolkit.ValidationError, harvest_object_create,
context, data_dict)
data_dict['extras'] = {'test': 1}
self.assertRaises(toolkit.ValidationError, harvest_object_create,
context, data_dict)
class TestHarvestErrorMail(FunctionalTestBase):
@classmethod
def setup_class(cls):
super(TestHarvestErrorMail, cls).setup_class()
reset_db()
harvest_model.setup()
@classmethod
def teardown_class(cls):
super(TestHarvestErrorMail, cls).teardown_class()
reset_db()
def _create_harvest_source_and_job_if_not_existing(self):
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
context = {
'user': site_user,
'model': model,
'session': model.Session,
'ignore_auth': True,
}
source_dict = {
'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test-nose',
}
try:
harvest_source = toolkit.get_action('harvest_source_create')(
context,
source_dict
)
except toolkit.ValidationError:
harvest_source = toolkit.get_action('harvest_source_show')(
context,
{'id': source_dict['name']}
)
pass
try:
job = toolkit.get_action('harvest_job_create')(context, {
'source_id': harvest_source['id'], 'run': True})
except HarvestJobExists:
job = toolkit.get_action('harvest_job_show')(context, {
'id': harvest_source['status']['last_job']['id']})
pass
toolkit.get_action('harvest_jobs_run')(context, {})
toolkit.get_action('harvest_source_reindex')(context, {'id': harvest_source['id']})
return context, harvest_source, job
def _create_harvest_source_with_owner_org_and_job_if_not_existing(self):
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
context = {
'user': site_user,
'model': model,
'session': model.Session,
'ignore_auth': True,
}
test_org = ckan_factories.Organization()
test_other_org = ckan_factories.Organization()
org_admin_user = ckan_factories.User()
org_member_user = ckan_factories.User()
other_org_admin_user = ckan_factories.User()
toolkit.get_action('organization_member_create')(
context.copy(),
{
'id': test_org['id'],
'username': org_admin_user['name'],
'role': 'admin'
}
)
toolkit.get_action('organization_member_create')(
context.copy(),
{
'id': test_org['id'],
'username': org_member_user['name'],
'role': 'member'
}
)
toolkit.get_action('organization_member_create')(
context.copy(),
{
'id': test_other_org['id'],
'username': other_org_admin_user['name'],
'role': 'admin'
}
)
source_dict = {
'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test-nose',
'owner_org': test_org['id'],
'run': True
}
try:
harvest_source = toolkit.get_action('harvest_source_create')(
context.copy(),
source_dict
)
except toolkit.ValidationError:
harvest_source = toolkit.get_action('harvest_source_show')(
context.copy(),
{'id': source_dict['name']}
)
pass
try:
job = toolkit.get_action('harvest_job_create')(context.copy(), {
'source_id': harvest_source['id'], 'run': True})
except HarvestJobExists:
job = toolkit.get_action('harvest_job_show')(context.copy(), {
'id': harvest_source['status']['last_job']['id']})
pass
toolkit.get_action('harvest_jobs_run')(context.copy(), {})
toolkit.get_action('harvest_source_reindex')(context.copy(), {'id': harvest_source['id']})
return context, harvest_source, job
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_not_sent(self, mock_mailer_mail_recipient):
context, harvest_source, job = self._create_harvest_source_and_job_if_not_existing()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(0, status['last_job']['stats']['errored'])
assert not mock_mailer_mail_recipient.called
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_sent(self, mock_mailer_mail_recipient):
context, harvest_source, job = self._create_harvest_source_and_job_if_not_existing()
# create a HarvestGatherError
job_model = HarvestJob.get(job['id'])
msg = 'System error - No harvester could be found for source type %s' % job_model.source.type
err = HarvestGatherError(message=msg, job=job_model)
err.save()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(1, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.called
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_sent_with_object_error(self, mock_mailer_mail_recipient):
context, harvest_source, harvest_job = self._create_harvest_source_and_job_if_not_existing()
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': harvest_job['id'],
'extras': {'a key': 'a value'},
'source_id': harvest_source['id']
}
harvest_object = toolkit.get_action('harvest_object_create')(
context, data_dict)
harvest_object_model = HarvestObject.get(harvest_object['id'])
# create a HarvestObjectError
msg = 'HarvestObjectError occurred: %s' % harvest_job['id']
harvest_object_error = HarvestObjectError(message=msg, object=harvest_object_model)
harvest_object_error.save()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(1, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.called
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_sent_with_org(self, mock_mailer_mail_recipient):
context, harvest_source, job = self._create_harvest_source_with_owner_org_and_job_if_not_existing()
# create a HarvestGatherError
job_model = HarvestJob.get(job['id'])
msg = 'System error - No harvester could be found for source type %s' % job_model.source.type
err = HarvestGatherError(message=msg, job=job_model)
err.save()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(1, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.called
assert_equal(2, mock_mailer_mail_recipient.call_count)
# Skip for now as the Harvest DB log doesn't work on CKAN 2.9
class XXTestHarvestDBLog(unittest.TestCase):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
def xxtest_harvest_db_logger(self):
# Create source and check if harvest_log table is populated
data_dict = SOURCE_DICT.copy()
data_dict['source_type'] = 'test-nose'
source = factories.HarvestSourceObj(**data_dict)
content = 'Harvest source created: %s' % source.id
log = harvest_model.Session.query(harvest_model.HarvestLog).\
filter(harvest_model.HarvestLog.content == content).first()
self.assertIsNotNone(log)
self.assertEqual(log.level, 'INFO')
context = {
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data = toolkit.get_action('harvest_log_list')(context, {})
self.assertTrue(len(data) > 0)
self.assertIn('level', data[0])
self.assertIn('content', data[0])
self.assertIn('created', data[0])
self.assertTrue(data[0]['created'] > data[1]['created'])
per_page = 1
data = toolkit.get_action('harvest_log_list')(context, {'level': 'info', 'per_page': per_page})
self.assertEqual(len(data), per_page)
self.assertEqual(data[0]['level'], 'INFO')

View File

@ -1,225 +0,0 @@
import logging
from nose.plugins.skip import SkipTest
from ckan import model
from ckan.model import Session
from ckan.lib.base import config
# TODO: remove references to old tests
try:
from ckan.tests import CreateTestData
except ImportError:
from ckan.tests.legacy import CreateTestData
try:
from ckan.tests.functional.base import FunctionalTestCase
except ImportError:
from ckan.tests.legacy.functional.base import FunctionalTestCase
from ckanext.harvest.model import HarvestSource, HarvestJob, setup as harvest_model_setup
log = logging.getLogger(__name__)
class HarvestAuthBaseCase():
@classmethod
def setup_class(cls):
raise SkipTest()
harvest_model_setup()
@classmethod
def teardown_class(cls):
pass
def _test_auth_not_allowed(self, user_name=None, source=None, status=401):
if not source:
# Create harvest source
source = HarvestSource(url=u'http://test-source.com', type='ckan')
Session.add(source)
Session.commit()
if user_name:
extra_environ = {'REMOTE_USER': user_name.encode('utf8')}
else:
extra_environ = {}
# List
self.app.get('/harvest', status=status, extra_environ=extra_environ)
# Create
self.app.get('/harvest/new', status=status, extra_environ=extra_environ)
# Read
self.app.get('/harvest/%s' % source.id, status=status, extra_environ=extra_environ)
# Edit
self.app.get('/harvest/edit/%s' % source.id, status=status, extra_environ=extra_environ)
# Refresh
self.app.get('/harvest/refresh/%s' % source.id, status=status, extra_environ=extra_environ)
def _test_auth_allowed(self, user_name, auth_profile=None):
extra_environ = {'REMOTE_USER': user_name.encode('utf8')}
# List
res = self.app.get('/harvest', extra_environ=extra_environ)
assert 'Harvesting Sources' in res
# Create
res = self.app.get('/harvest/new', extra_environ=extra_environ)
assert 'New harvest source' in res
if auth_profile == 'publisher':
assert 'publisher_id' in res
else:
assert 'publisher_id' not in res
fv = res.forms['source-new']
fv['url'] = u'http://test-source.com'
fv['type'] = u'ckan'
fv['title'] = u'Test harvest source'
fv['description'] = u'Test harvest source'
fv['config'] = u'{"a":1,"b":2}'
if auth_profile == 'publisher':
fv['publisher_id'] = self.publisher1.id
res = fv.submit('save', extra_environ=extra_environ)
assert 'Error' not in res, res
source = Session.query(HarvestSource).first()
assert source.url == u'http://test-source.com'
assert source.type == u'ckan'
# Read
res = self.app.get('/harvest/%s' % source.id, extra_environ=extra_environ)
assert 'Harvest Source Details' in res
assert source.id in res
assert source.title in res
# Edit
res = self.app.get('/harvest/edit/%s' % source.id, extra_environ=extra_environ)
assert 'Edit harvest source' in res
if auth_profile == 'publisher':
assert 'publisher_id' in res
else:
assert 'publisher_id' not in res
fv = res.forms['source-new']
fv['title'] = u'Test harvest source Updated'
res = fv.submit('save', extra_environ=extra_environ)
assert 'Error' not in res, res
source = Session.query(HarvestSource).first()
assert source.title == u'Test harvest source Updated'
# Refresh
res = self.app.get('/harvest/refresh/%s' % source.id, extra_environ=extra_environ)
job = Session.query(HarvestJob).first()
assert job.source_id == source.id
class TestAuthDefaultProfile(FunctionalTestCase, HarvestAuthBaseCase):
@classmethod
def setup_class(cls):
if (config.get('ckan.harvest.auth.profile', '') != ''):
raise SkipTest('Skipping default auth profile tests. Set ckan.harvest.auth.profile = \'\' to run them')
super(TestAuthDefaultProfile, cls).setup_class()
def setup(self):
CreateTestData.create()
self.sysadmin_user = model.User.get('testsysadmin')
self.normal_user = model.User.get('annafan')
def teardown(self):
model.repo.rebuild_db()
def test_auth_default_profile_sysadmin(self):
self._test_auth_allowed(self.sysadmin_user.name)
def test_auth_default_profile_normal(self):
self._test_auth_not_allowed(self.normal_user.name)
def test_auth_default_profile_notloggedin(self):
self._test_auth_not_allowed(status=302)
class TestAuthPublisherProfile(FunctionalTestCase, HarvestAuthBaseCase):
@classmethod
def setup_class(cls):
if (config.get('ckan.harvest.auth.profile') != 'publisher'):
raise SkipTest('Skipping publisher auth profile tests. Set ckan.harvest.auth.profile = \'publisher\' to run them')
super(TestAuthPublisherProfile, cls).setup_class()
def setup(self):
model.Session.remove()
CreateTestData.create(auth_profile='publisher')
self.sysadmin_user = model.User.get('testsysadmin')
self.normal_user = model.User.get('annafan') # Does not belong to a publisher
self.publisher1_user = model.User.by_name('russianfan')
self.publisher2_user = model.User.by_name('tester')
# Create two Publishers
model.repo.new_revision()
self.publisher1 = model.Group(name=u'test-publisher1', title=u'Test Publisher 1', type=u'publisher')
Session.add(self.publisher1)
self.publisher2 = model.Group(name=u'test-publisher2', title=u'Test Publisher 2', type=u'publisher')
Session.add(self.publisher2)
member1 = model.Member(table_name='user',
table_id=self.publisher1_user.id,
group=self.publisher1,
capacity='admin')
Session.add(member1)
member2 = model.Member(table_name='user',
table_id=self.publisher2_user.id,
group=self.publisher2,
capacity='admin')
Session.add(member2)
Session.commit()
def teardown(self):
model.repo.rebuild_db()
def test_auth_publisher_profile_normal(self):
self._test_auth_not_allowed(self.normal_user.name)
def test_auth_publisher_profile_notloggedin(self):
self._test_auth_not_allowed(status=302)
def test_auth_publisher_profile_sysadmin(self):
self._test_auth_allowed(self.sysadmin_user.name, auth_profile='publisher')
def test_auth_publisher_profile_publisher(self):
self._test_auth_allowed(self.publisher1_user.name, auth_profile='publisher')
def test_auth_publisher_profile_different_publisher(self):
# Create a source for publisher 1
source = HarvestSource(url=u'http://test-source.com', type='ckan',
publisher_id=self.publisher1.id)
Session.add(source)
Session.commit()
extra_environ = {'REMOTE_USER': self.publisher2_user.name.encode('utf8')}
# List (Publishers can see the sources list)
res = self.app.get('/harvest', extra_environ=extra_environ)
assert 'Harvesting Sources' in res
# Create
res = self.app.get('/harvest/new', extra_environ=extra_environ)
assert 'New harvest source' in res
assert 'publisher_id' in res
# Check that this publisher is not allowed to manage sources from other publishers
status = 401
# Read
res = self.app.get('/harvest/%s' % source.id, status=status, extra_environ=extra_environ)
# Edit
res = self.app.get('/harvest/edit/%s' % source.id, status=status, extra_environ=extra_environ)
# Refresh
res = self.app.get('/harvest/refresh/%s' % source.id, status=status, extra_environ=extra_environ)

View File

@ -1,154 +0,0 @@
from ckan.lib.helpers import url_for
from ckantoolkit.tests import helpers, factories
from ckanext.harvest.tests.nose import factories as harvest_factories
from nose.tools import assert_in
import ckanext.harvest.model as harvest_model
from ckan.plugins import toolkit
from ckan import model
class TestController(helpers.FunctionalTestBase):
@classmethod
def setup_class(cls):
helpers.reset_db()
super(TestController, cls).setup_class()
harvest_model.setup()
sysadmin = factories.Sysadmin()
cls.extra_environ = {'REMOTE_USER': sysadmin['name'].encode('ascii')}
@classmethod
def teardown_class(cls):
super(TestController, cls).teardown_class()
helpers.reset_db()
def setup(self):
super(TestController, self).setup()
sysadmin = factories.Sysadmin()
self.extra_environ = {'REMOTE_USER': sysadmin['name'].encode('ascii')}
def test_index_page_is_rendered(self):
source1 = harvest_factories.HarvestSource()
source2 = harvest_factories.HarvestSource()
app = self._get_test_app()
response = app.get(u'/harvest')
assert_in(source1['title'], response.unicode_body)
assert_in(source2['title'], response.unicode_body)
def test_new_form_is_rendered(self):
app = self._get_test_app()
url = url_for('harvest_new')
response = app.get(url, extra_environ=self.extra_environ)
assert_in('<form id="source-new"', response.unicode_body)
def test_edit_form_is_rendered(self):
source = harvest_factories.HarvestSource()
app = self._get_test_app()
url = url_for('harvest_edit', id=source['id'])
response = app.get(url, extra_environ=self.extra_environ)
assert_in('<form id="source-new"', response.unicode_body)
def test_source_page_rendered(self):
source = harvest_factories.HarvestSource()
app = self._get_test_app()
url = url_for('harvest_read', id=source['name'])
response = app.get(url, extra_environ=self.extra_environ)
assert_in(source['name'], response.unicode_body)
def test_admin_page_rendered(self):
source_obj = harvest_factories.HarvestSourceObj()
job = harvest_factories.HarvestJob(source=source_obj)
app = self._get_test_app()
url = url_for('harvest_admin', id=source_obj.id)
response = app.get(url, extra_environ=self.extra_environ)
assert_in(source_obj.title, response.unicode_body)
assert_in(job['id'], response.unicode_body)
def test_about_page_rendered(self):
source = harvest_factories.HarvestSource()
app = self._get_test_app()
url = url_for('harvest_about', id=source['name'])
response = app.get(url, extra_environ=self.extra_environ)
assert_in(source['name'], response.unicode_body)
def test_job_page_rendered(self):
job = harvest_factories.HarvestJob()
app = self._get_test_app()
url = url_for('harvest_job_list', source=job['source_id'])
response = app.get(url, extra_environ=self.extra_environ)
assert_in(job['id'], response.unicode_body)
def test_job_show_last_page_rendered(self):
job = harvest_factories.HarvestJob()
app = self._get_test_app()
url = url_for('harvest_job_show_last', source=job['source_id'])
response = app.get(url, extra_environ=self.extra_environ)
assert_in(job['id'], response.unicode_body)
def test_job_show_page_rendered(self):
job = harvest_factories.HarvestJob()
app = self._get_test_app()
url = url_for(
'harvest_job_show', source=job['source_id'], id=job['id'])
response = app.get(url, extra_environ=self.extra_environ)
assert_in(job['id'], response.unicode_body)
def test_job_show_object(self):
source_obj = harvest_factories.HarvestSourceObj()
job = harvest_factories.HarvestJob(source=source_obj)
context = {
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data_dict = {
'guid': 'guid',
'content': 'test content',
'job_id': job['id'],
'source_id': source_obj.id,
'extras': {'a key': 'a value'},
}
harvest_object = toolkit.get_action('harvest_object_create')(
context, data_dict)
app = self._get_test_app()
url = url_for('harvest_object_show', id=harvest_object['id'])
response = app.get(url)
assert_in(data_dict['content'], response.unicode_body)

View File

@ -1,463 +0,0 @@
from mock import patch
from ckantoolkit.tests.helpers import reset_db
import ckanext.harvest.model as harvest_model
from ckanext.harvest.model import HarvestObject, HarvestObjectExtra, HarvestJob
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.queue as queue
from ckan.plugins.core import SingletonPlugin, implements
import json
import ckan.logic as logic
from ckan import model
from nose.tools import assert_equal, ok_
from ckan.lib.base import config
from nose.plugins.skip import SkipTest
import uuid
class MockHarvester(SingletonPlugin):
implements(IHarvester)
def info(self):
return {'name': 'test-nose', 'title': 'test', 'description': 'test'}
def gather_stage(self, harvest_job):
if harvest_job.source.url.startswith('basic_test'):
obj = HarvestObject(guid='test1', job=harvest_job)
obj.extras.append(HarvestObjectExtra(key='key', value='value'))
obj2 = HarvestObject(guid='test2', job=harvest_job)
obj3 = HarvestObject(guid='test_to_delete', job=harvest_job)
obj.add()
obj2.add()
obj3.save() # this will commit both
return [obj.id, obj2.id, obj3.id]
return []
def fetch_stage(self, harvest_object):
assert_equal(harvest_object.state, "FETCH")
assert harvest_object.fetch_started is not None
harvest_object.content = json.dumps({'name': harvest_object.guid})
harvest_object.save()
return True
def import_stage(self, harvest_object):
assert_equal(harvest_object.state, "IMPORT")
assert harvest_object.fetch_finished is not None
assert harvest_object.import_started is not None
user = logic.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
package = json.loads(harvest_object.content)
name = package['name']
package_object = model.Package.get(name)
if package_object:
logic_function = 'package_update'
else:
logic_function = 'package_create'
package_dict = logic.get_action(logic_function)(
{'model': model, 'session': model.Session,
'user': user, 'api_version': 3, 'ignore_auth': True},
json.loads(harvest_object.content)
)
# set previous objects to not current
previous_object = model.Session.query(HarvestObject) \
.filter(HarvestObject.guid == harvest_object.guid) \
.filter(
HarvestObject.current == True # noqa: E712
).first()
if previous_object:
previous_object.current = False
previous_object.save()
# delete test_to_delete package on second run
harvest_object.package_id = package_dict['id']
harvest_object.current = True
if package_dict['name'] == 'test_to_delete' and package_object:
harvest_object.current = False
package_object.state = 'deleted'
package_object.save()
harvest_object.save()
return True
class TestHarvestQueue(object):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
def test_01_basic_harvester(self):
# make sure queues/exchanges are created first and are empty
consumer = queue.get_gather_consumer()
consumer_fetch = queue.get_fetch_consumer()
consumer.queue_purge(queue=queue.get_gather_queue_name())
consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())
user = logic.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
context = {'model': model, 'session': model.Session,
'user': user, 'api_version': 3, 'ignore_auth': True}
source_dict = {
'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test-nose',
}
harvest_source = logic.get_action('harvest_source_create')(
context,
source_dict
)
assert harvest_source['source_type'] == 'test-nose', harvest_source
assert harvest_source['url'] == 'basic_test', harvest_source
harvest_job = logic.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True}
)
job_id = harvest_job['id']
assert harvest_job['source_id'] == harvest_source['id'], harvest_job
assert harvest_job['status'] == u'Running'
assert logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'
# pop one item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
all_objects = model.Session.query(HarvestObject).all()
assert len(all_objects) == 3
assert all_objects[0].state == 'WAITING'
assert all_objects[1].state == 'WAITING'
assert all_objects[2].state == 'WAITING'
assert len(model.Session.query(HarvestObject).all()) == 3
assert len(model.Session.query(HarvestObjectExtra).all()) == 1
# do three times as three harvest objects
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
count = model.Session.query(model.Package) \
.filter(model.Package.type == 'dataset') \
.count()
assert count == 3
all_objects = model.Session.query(HarvestObject).filter_by(current=True).all()
assert_equal(len(all_objects), 3)
assert_equal(all_objects[0].state, 'COMPLETE')
assert_equal(all_objects[0].report_status, 'added')
assert_equal(all_objects[1].state, 'COMPLETE')
assert_equal(all_objects[1].report_status, 'added')
assert_equal(all_objects[2].state, 'COMPLETE')
assert_equal(all_objects[2].report_status, 'added')
# fire run again to check if job is set to Finished
logic.get_action('harvest_jobs_run')(
context,
{'source_id': harvest_source['id']}
)
harvest_job = logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)
assert_equal(harvest_job['status'], u'Finished')
assert_equal(harvest_job['stats'], {'added': 3, 'updated': 0, 'not modified': 0, 'errored': 0, 'deleted': 0})
harvest_source_dict = logic.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 3, 'updated': 0,
'not modified': 0, 'errored': 0, 'deleted': 0})
assert_equal(harvest_source_dict['status']['total_datasets'], 3)
assert_equal(harvest_source_dict['status']['job_count'], 1)
# Second run
harvest_job = logic.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True}
)
job_id = harvest_job['id']
assert logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'
# pop one item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
all_objects = model.Session.query(HarvestObject).all()
assert len(all_objects) == 6
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
count = model.Session.query(model.Package) \
.filter(model.Package.type == 'dataset') \
.count()
assert_equal(count, 3)
all_objects = model.Session.query(HarvestObject).filter_by(report_status='added').all()
assert_equal(len(all_objects), 3)
all_objects = model.Session.query(HarvestObject).filter_by(report_status='updated').all()
assert_equal(len(all_objects), 2)
all_objects = model.Session.query(HarvestObject).filter_by(report_status='deleted').all()
assert_equal(len(all_objects), 1)
# run to make sure job is marked as finished
logic.get_action('harvest_jobs_run')(
context,
{'source_id': harvest_source['id']}
)
harvest_job = logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)
assert_equal(harvest_job['stats'], {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1})
harvest_source_dict = logic.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 2,
'not modified': 0, 'errored': 0, 'deleted': 1})
assert_equal(harvest_source_dict['status']['total_datasets'], 2)
assert_equal(harvest_source_dict['status']['job_count'], 2)
def test_fetch_doesnt_process_remaining_objects_if_job_status_finished(self):
# make sure queues/exchanges are created first and are empty
consumer = queue.get_gather_consumer()
consumer_fetch = queue.get_fetch_consumer()
consumer.queue_purge(queue=queue.get_gather_queue_name())
consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())
user = logic.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
context = {'model': model, 'session': model.Session,
'user': user, 'api_version': 3, 'ignore_auth': True}
source_dict = {
'title': 'Test Job Finished',
'name': 'test-job-finished',
'url': 'basic_test_1',
'source_type': 'test-nose',
}
harvest_source = logic.get_action('harvest_source_create')(
context,
source_dict
)
assert harvest_source['source_type'] == 'test-nose', harvest_source
assert harvest_source['url'] == 'basic_test_1', harvest_source
harvest_job = logic.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True}
)
job_id = harvest_job['id']
assert harvest_job['source_id'] == harvest_source['id'], harvest_job
assert harvest_job['status'] == u'Running'
assert logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'
# pop one item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
all_objects = model.Session.query(HarvestObject).filter(
HarvestObject.harvest_job_id == harvest_job['id']
).all()
assert len(all_objects) == 3
assert all_objects[0].state == 'WAITING'
assert all_objects[1].state == 'WAITING'
assert all_objects[2].state == 'WAITING'
# artificially set the job to finished to simulate a job abort or timeout
job_obj = HarvestJob.get(harvest_job['id'])
job_obj.status = 'Finished'
job_obj.save()
original_dataset_count = model.Session.query(model.Package) \
.filter(model.Package.type == 'dataset') \
.count()
# do three times as three harvest objects
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
all_objects = model.Session.query(HarvestObject).filter(
HarvestObject.harvest_job_id == harvest_job['id']
).all()
assert len(all_objects) == 3
assert all_objects[0].state == 'ERROR'
assert all_objects[1].state == 'ERROR'
assert all_objects[2].state == 'ERROR'
count = model.Session.query(model.Package) \
.filter(model.Package.type == 'dataset') \
.count()
assert count == original_dataset_count
# fire run again to check if job is set to Finished
logic.get_action('harvest_jobs_run')(
context,
{'source_id': harvest_source['id']}
)
harvest_job = logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)
assert_equal(harvest_job['status'], u'Finished')
assert_equal(harvest_job['stats'], {'added': 0, 'updated': 0, 'not modified': 0, 'errored': 3, 'deleted': 0})
harvest_source_dict = logic.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 0,
'not modified': 0, 'errored': 3, 'deleted': 0})
assert_equal(harvest_source_dict['status']['total_datasets'], 0)
assert_equal(harvest_source_dict['status']['job_count'], 1)
def test_redis_queue_purging(self):
'''
Test that Redis queue purging doesn't purge the wrong keys.
'''
if config.get('ckan.harvest.mq.type') != 'redis':
raise SkipTest()
redis = queue.get_connection()
try:
redis.set('ckanext-harvest:some-random-key', 'foobar')
# Create some fake jobs
gather_publisher = queue.get_gather_publisher()
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
fetch_publisher = queue.get_fetch_publisher()
fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
num_keys = redis.dbsize()
# Create some fake objects
gather_consumer = queue.get_gather_consumer()
next(gather_consumer.consume(queue.get_gather_queue_name()))
fetch_consumer = queue.get_fetch_consumer()
next(fetch_consumer.consume(queue.get_fetch_queue_name()))
ok_(redis.dbsize() > num_keys)
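# consuming one message from each queue adds Redis keys; purge_queues() below should remove them without touching unrelated keys like the one set above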
queue.purge_queues()
assert_equal(redis.get('ckanext-harvest:some-random-key'),
'foobar')
assert_equal(redis.dbsize(), num_keys)
assert_equal(redis.llen(queue.get_gather_routing_key()), 0)
assert_equal(redis.llen(queue.get_fetch_routing_key()), 0)
finally:
redis.delete('ckanext-harvest:some-random-key')
class TestHarvestCorruptRedis(object):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
@patch('ckanext.harvest.queue.log.error')
def test_redis_corrupt(self, mock_log_error):
'''
Test that a corrupt message in Redis doesn't stop the harvest process and that other jobs are still processed.
'''
if config.get('ckan.harvest.mq.type') != 'redis':
raise SkipTest()
redis = queue.get_connection()
try:
redis.set('ckanext-harvest:some-random-key-2', 'foobar')
# make sure queues/exchanges are created first and are empty
gather_consumer = queue.get_gather_consumer()
fetch_consumer = queue.get_fetch_consumer()
gather_consumer.queue_purge(queue=queue.get_gather_queue_name())
fetch_consumer.queue_purge(queue=queue.get_fetch_queue_name())
# Create some fake jobs and objects with no harvest_job_id
gather_publisher = queue.get_gather_publisher()
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
fetch_publisher = queue.get_fetch_publisher()
fetch_publisher.send({'harvest_object_id': None})
h_obj_id = str(uuid.uuid4())
fetch_publisher.send({'harvest_object_id': h_obj_id})
# Create some fake objects
next(gather_consumer.consume(queue.get_gather_queue_name()))
_, _, body = next(fetch_consumer.consume(queue.get_fetch_queue_name()))
json_obj = json.loads(body)
assert json_obj['harvest_object_id'] == h_obj_id
assert mock_log_error.call_count == 1
args, _ = mock_log_error.call_args_list[0]
assert "cannot concatenate 'str' and 'NoneType' objects" in args[1]
finally:
redis.delete('ckanext-harvest:some-random-key-2')

View File

@ -1,180 +0,0 @@
'''Tests elements of queue.py, but doesn't use the queue subsystem
(redis/rabbitmq)
'''
import json
from nose.tools import assert_equal
from ckantoolkit.tests.helpers import reset_db
from ckan import model
from ckan import plugins as p
from ckan.plugins import toolkit
from ckanext.harvest.tests.factories import (HarvestObjectObj)
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.model as harvest_model
from ckanext.harvest.tests.lib import run_harvest
class MockHarvester(p.SingletonPlugin):
p.implements(IHarvester)
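# each test configures this mock's behaviour via _set_test_params (e.g. object_error, delete, fetch_object_unchanged, import_object_unchanged)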
@classmethod
def _set_test_params(cls, guid, **test_params):
cls._guid = guid
cls._test_params = test_params
def info(self):
return {'name': 'test2-nose', 'title': 'test', 'description': 'test'}
def gather_stage(self, harvest_job):
obj = HarvestObjectObj(guid=self._guid, job=harvest_job)
return [obj.id]
def fetch_stage(self, harvest_object):
if self._test_params.get('fetch_object_unchanged'):
return 'unchanged'
harvest_object.content = json.dumps({'name': harvest_object.guid})
harvest_object.save()
return True
def import_stage(self, harvest_object):
user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
package = json.loads(harvest_object.content)
name = package['name']
package_object = model.Package.get(name)
if package_object:
logic_function = 'package_update'
else:
logic_function = 'package_create'
package_dict = toolkit.get_action(logic_function)(
{'model': model, 'session': model.Session,
'user': user},
json.loads(harvest_object.content)
)
if self._test_params.get('object_error'):
return False
# successful, so move 'current' to this object
previous_object = model.Session.query(harvest_model.HarvestObject) \
.filter_by(guid=harvest_object.guid) \
.filter_by(current=True) \
.first()
if previous_object:
previous_object.current = False
previous_object.save()
harvest_object.package_id = package_dict['id']
harvest_object.current = True
if self._test_params.get('delete'):
# 'current=False' is the key step in getting report_status to be
# set as 'deleted'
harvest_object.current = False
package_object.save()
harvest_object.save()
if self._test_params.get('import_object_unchanged'):
return 'unchanged'
return True
class TestEndStates(object):
def setup(self):
reset_db()
harvest_model.setup()
def test_create_dataset(self):
guid = 'obj-create'
MockHarvester._set_test_params(guid=guid)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'added')
assert_equal(result['errors'], [])
def test_update_dataset(self):
guid = 'obj-update'
MockHarvester._set_test_params(guid=guid)
# create the original harvest_object and dataset
run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
# update it
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'updated')
assert_equal(result['errors'], [])
def test_delete_dataset(self):
guid = 'obj-delete'
MockHarvester._set_test_params(guid=guid)
# create the original harvest_object and dataset
run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
MockHarvester._set_test_params(guid=guid, delete=True)
# delete it
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'deleted')
assert_equal(result['errors'], [])
def test_obj_error(self):
guid = 'obj-error'
MockHarvester._set_test_params(guid=guid, object_error=True)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'ERROR')
assert_equal(result['report_status'], 'errored')
assert_equal(result['errors'], [])
def test_fetch_unchanged(self):
guid = 'obj-error'
MockHarvester._set_test_params(guid=guid, fetch_object_unchanged=True)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'not modified')
assert_equal(result['errors'], [])
def test_import_unchanged(self):
guid = 'obj-error'
MockHarvester._set_test_params(guid=guid, import_object_unchanged=True)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'not modified')
assert_equal(result['errors'], [])

View File

@ -377,10 +377,7 @@ def run_test_harvester(source_id_or_name, force_import):
print('\nSource "{0}" apparently has a "Running" job:\n{1}'.format(
source.get("name") or source["id"], running_jobs))
if six.PY2:
resp = raw_input("Abort it? (y/n)")
else:
resp = input("Abort it? (y/n)")
resp = six.moves.input("Abort it? (y/n)")
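# six.moves.input maps to raw_input on Python 2 and input on Python 3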
if not resp.lower().startswith("y"):
sys.exit(1)
job_dict = tk.get_action("harvest_job_abort")(

View File

@ -1,7 +1,5 @@
# -*- coding: utf-8 -*-
pytest_plugins = [
u'ckan.tests.pytest_ckan.ckan_setup',
u'ckan.tests.pytest_ckan.fixtures',
u'ckanext.harvest.tests.fixtures',
]

View File

@ -1,2 +1,4 @@
pytest-ckan
pytest-cov
factory-boy>=2
mock

View File

@ -39,11 +39,6 @@ setup(
test_harvester2=ckanext.harvest.tests.test_queue2:MockHarvester
test_action_harvester=ckanext.harvest.tests.test_action:MockHarvesterForActionTests
test_nose_harvester=ckanext.harvest.tests.nose.test_queue:MockHarvester
test_nose_harvester2=ckanext.harvest.tests.nose.test_queue2:MockHarvester
test_nose_action_harvester=ckanext.harvest.tests.nose.test_action:MockHarvesterForActionTests
[paste.paster_command]
harvester = ckanext.harvest.commands.harvester:Harvester
[babel.extractors]

View File

@ -1,71 +0,0 @@
[DEFAULT]
debug = false
# Uncomment and replace with the address which should receive any error reports
#email_to = you@yourdomain.com
smtp_server = localhost
error_email_from = paste@localhost
[server:main]
use = egg:Paste#http
host = 0.0.0.0
port = 5000
[app:main]
use = config:../ckan/test-core.ini
# Here we hard-code the database and a flag to make default tests
# run fast.
ckan.plugins = harvest ckan_harvester test_nose_harvester test_nose_harvester2 test_nose_action_harvester
ckan.harvest.mq.type = redis
ckan.harvest.timeout = 5
ckan.legacy_templates = false
# NB: other test configuration should go in test-core.ini, which is
# what the postgres tests use.
# Logging configuration
[loggers]
keys = root, ckan, sqlalchemy
[handlers]
keys = console, dblog
[formatters]
keys = generic, dblog
[logger_root]
level = WARN
handlers = console
[logger_ckan]
qualname = ckan
handlers =
level = INFO
[logger_ckan_harvester]
qualname = ckanext.harvest
handlers = dblog
level = DEBUG
[logger_sqlalchemy]
handlers =
qualname = sqlalchemy.engine
level = WARN
[handler_console]
class = StreamHandler
args = (sys.stdout,)
level = NOTSET
formatter = generic
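# the dblog handler routes ckanext.harvest log records to the database via DBLogHandler (used by the harvest_log_list tests)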
[handler_dblog]
class = ckanext.harvest.log.DBLogHandler
args = ()
level = DEBUG
formatter = dblog
[formatter_dblog]
format = %(message)s
[formatter_generic]
format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s

View File

@ -17,7 +17,6 @@ use = config:../ckan/test-core.ini
# run fast.
ckan.plugins = harvest ckan_harvester test_harvester test_harvester2 test_action_harvester
ckan.harvest.mq.type = redis
ckan.harvest.timeout = 5
ckan.legacy_templates = false
# NB: other test configuration should go in test-core.ini, which is
# what the postgres tests use.