Start splitting tests, fix nose ones

This commit is contained in:
amercader 2020-02-08 01:18:20 +01:00
parent 4c2af58307
commit b670aa64a8
20 changed files with 2144 additions and 1030 deletions

View File

@ -0,0 +1,14 @@
import pytest
import ckanext.harvest.model as harvest_model
from ckanext.harvest import queue
@pytest.fixture
def harvest_setup():
harvest_model.setup()
@pytest.fixture
def clean_queues():
queue.purge_queues()

View File

@ -0,0 +1,94 @@
import factory
import ckanext.harvest.model as harvest_model
from ckantoolkit.tests.factories import _get_action_user_name
from ckan.plugins import toolkit
class HarvestSource(factory.Factory):
FACTORY_FOR = harvest_model.HarvestSource
_return_type = 'dict'
name = factory.Sequence(lambda n: 'test_source_{n}'.format(n=n))
title = factory.Sequence(lambda n: 'test title {n}'.format(n=n))
url = factory.Sequence(lambda n: 'http://{n}.test.com'.format(n=n))
source_type = 'test-nose' # defined in test_queue.py
id = '{0}_id'.format(name).lower()
@classmethod
def _create(cls, target_class, *args, **kwargs):
if args:
assert False, "Positional args aren't supported, use keyword args."
context = {'user': _get_action_user_name(kwargs)}
# If there is an existing source for this URL, and we can't create
# another source with that URL, just return the original one.
try:
source_dict = toolkit.get_action('harvest_source_show')(
context, dict(url=kwargs['url']))
except toolkit.ObjectNotFound:
source_dict = toolkit.get_action('harvest_source_create')(
context, kwargs)
if cls._return_type == 'dict':
return source_dict
else:
return cls.FACTORY_FOR.get(source_dict['id'])
class HarvestSourceObj(HarvestSource):
_return_type = 'obj'
class HarvestJob(factory.Factory):
FACTORY_FOR = harvest_model.HarvestJob
_return_type = 'dict'
source = factory.SubFactory(HarvestSourceObj)
@classmethod
def _create(cls, target_class, *args, **kwargs):
if args:
assert False, "Positional args aren't supported, use keyword args."
context = {'user': _get_action_user_name(kwargs)}
if 'source_id' not in kwargs:
kwargs['source_id'] = kwargs['source'].id
if 'run' not in kwargs:
kwargs['run'] = False
job_dict = toolkit.get_action('harvest_job_create')(
context, kwargs)
if cls._return_type == 'dict':
return job_dict
else:
return cls.FACTORY_FOR.get(job_dict['id'])
class HarvestJobObj(HarvestJob):
_return_type = 'obj'
class HarvestObject(factory.Factory):
FACTORY_FOR = harvest_model.HarvestObject
_return_type = 'dict'
# source = factory.SubFactory(HarvestSourceObj)
job = factory.SubFactory(HarvestJobObj)
@classmethod
def _create(cls, target_class, *args, **kwargs):
if args:
assert False, "Positional args aren't supported, use keyword args."
context = {'user': _get_action_user_name(kwargs)}
if 'job_id' not in kwargs:
kwargs['job_id'] = kwargs['job'].id
kwargs['source_id'] = kwargs['job'].source.id
# Remove 'job' to avoid it getting added as a HarvestObjectExtra
if 'job' in kwargs:
kwargs.pop('job')
job_dict = toolkit.get_action('harvest_object_create')(
context, kwargs)
if cls._return_type == 'dict':
return job_dict
else:
return cls.FACTORY_FOR.get(job_dict['id'])
class HarvestObjectObj(HarvestObject):
_return_type = 'obj'

View File

@ -0,0 +1,543 @@
from __future__ import print_function
import json
import re
import copy
import urllib
import SimpleHTTPServer
import SocketServer
from threading import Thread
PORT = 8998
class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def do_GET(self):
# test name is the first bit of the URL and makes CKAN behave
# differently in some way.
# Its value is recorded and then removed from the path
self.test_name = None
test_name_match = re.match('^/([^/]+)/', self.path)
if test_name_match:
self.test_name = test_name_match.groups()[0]
if self.test_name == 'api':
self.test_name = None
else:
self.path = re.sub('^/([^/]+)/', '/', self.path)
if self.test_name == 'site_down':
return self.respond('Site is down', status=500)
# The API version is recorded and then removed from the path
api_version = None
version_match = re.match(r'^/api/(\d)', self.path)
if version_match:
api_version = int(version_match.groups()[0])
self.path = re.sub(r'^/api/(\d)/', '/api/', self.path)
if self.path == '/api/rest/package':
if api_version == 2:
dataset_refs = [d['id'] for d in DATASETS]
else:
dataset_refs = [d['name'] for d in DATASETS]
return self.respond_json(dataset_refs)
if self.path == '/api/action/package_list':
dataset_names = [d['name'] for d in DATASETS]
return self.respond_action(dataset_names)
if self.path.startswith('/api/rest/package/'):
dataset_ref = self.path.split('/')[-1]
dataset = self.get_dataset(dataset_ref)
if dataset:
return self.respond_json(
convert_dataset_to_restful_form(dataset))
if self.path.startswith('/api/action/package_show'):
params = self.get_url_params()
dataset_ref = params['id']
dataset = self.get_dataset(dataset_ref)
if dataset:
return self.respond_action(dataset)
if self.path.startswith('/api/action/group_show'):
params = self.get_url_params()
group_ref = params['id']
group = self.get_group(group_ref)
if group:
return self.respond_action(group)
if self.path.startswith('/api/search/dataset'):
params = self.get_url_params()
if params.keys() == ['organization']:
org = self.get_org(params['organization'])
dataset_ids = [d['id'] for d in DATASETS
if d['owner_org'] == org['id']]
return self.respond_json({'count': len(dataset_ids),
'results': dataset_ids})
else:
return self.respond(
'Not implemented search params %s' % params, status=400)
if self.path.startswith('/api/search/revision'):
revision_ids = [r['id'] for r in REVISIONS]
return self.respond_json(revision_ids)
if self.path.startswith('/api/rest/revision/'):
revision_ref = self.path.split('/')[-1]
assert api_version == 2
for rev in REVISIONS:
if rev['id'] == revision_ref:
return self.respond_json(rev)
self.respond('Cannot find revision', status=404)
# /api/3/action/package_search?fq=metadata_modified:[2015-10-23T14:51:13.282361Z TO *]&rows=1000
if self.path.startswith('/api/action/package_search'):
params = self.get_url_params()
if self.test_name == 'datasets_added':
if params['start'] == '0':
# when page 1 is retrieved, the site only has 1 dataset
datasets = [DATASETS[0]['name']]
elif params['start'] == '100':
# when page 2 is retrieved, the site now has new datasets,
# and so the second page has the original dataset, pushed
# onto this page now, plus a new one
datasets = [DATASETS[0]['name'],
DATASETS[1]['name']]
else:
datasets = []
else:
# ignore sort param for now
if 'sort' in params:
del params['sort']
if params['start'] != '0':
datasets = []
elif set(params.keys()) == set(['rows', 'start']):
datasets = ['dataset1', DATASETS[1]['name']]
elif set(params.keys()) == set(['fq', 'rows', 'start']) and \
params['fq'] == '-organization:org1':
datasets = [DATASETS[1]['name']]
elif set(params.keys()) == set(['fq', 'rows', 'start']) and \
params['fq'] == 'organization:org1':
datasets = ['dataset1']
elif set(params.keys()) == set(['fq', 'rows', 'start']) and \
params['fq'] == '-groups:group1':
datasets = [DATASETS[1]['name']]
elif set(params.keys()) == set(['fq', 'rows', 'start']) and \
params['fq'] == 'groups:group1':
datasets = ['dataset1']
elif set(params.keys()) == set(['fq', 'rows', 'start']) and \
'metadata_modified' in params['fq']:
assert '+TO+' not in params['fq'], \
'Spaces should not be decoded by now - seeing + '\
'means they were double encoded and SOLR doesnt like '\
'that'
datasets = [DATASETS[1]['name']]
else:
return self.respond(
'Not implemented search params %s' % params,
status=400)
out = {'count': len(datasets),
'results': [self.get_dataset(dataset_ref_)
for dataset_ref_ in datasets]}
return self.respond_action(out)
# if we wanted to server a file from disk, then we'd call this:
# return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
self.respond('Mock CKAN doesnt recognize that call', status=400)
def get_dataset(self, dataset_ref):
for dataset in DATASETS:
if dataset['name'] == dataset_ref or \
dataset['id'] == dataset_ref:
if self.test_name == 'invalid_tag':
dataset['tags'] = INVALID_TAGS
return dataset
def get_group(self, group_ref):
for group in GROUPS:
if group['name'] == group_ref or \
group['id'] == group_ref:
return group
def get_org(self, org_ref):
for org in ORGS:
if org['name'] == org_ref or \
org['id'] == org_ref:
return org
def get_url_params(self):
params_str = self.path.split('?')[-1]
params_unicode = urllib.unquote_plus(params_str).decode('utf8')
params = params_unicode.split('&')
return dict([param.split('=') for param in params])
def respond_action(self, result_dict, status=200):
response_dict = {'result': result_dict, 'success': True}
return self.respond_json(response_dict, status=status)
def respond_json(self, content_dict, status=200):
return self.respond(json.dumps(content_dict), status=status,
content_type='application/json')
def respond(self, content, status=200, content_type='application/json'):
self.send_response(status)
self.send_header('Content-Type', content_type)
self.end_headers()
self.wfile.write(content)
self.wfile.close()
def serve(port=PORT):
'''Runs a CKAN-alike app (over HTTP) that is used for harvesting tests'''
# Choose the directory to serve files from
# os.chdir(os.path.join(os.path.dirname(os.path.abspath(__file__)),
# 'mock_ckan_files'))
class TestServer(SocketServer.TCPServer):
allow_reuse_address = True
httpd = TestServer(("", PORT), MockCkanHandler)
print('Serving test HTTP server at port {}'.format(PORT))
httpd_thread = Thread(target=httpd.serve_forever)
httpd_thread.setDaemon(True)
httpd_thread.start()
def convert_dataset_to_restful_form(dataset):
dataset = copy.deepcopy(dataset)
dataset['extras'] = dict([(e['key'], e['value']) for e in dataset['extras']])
dataset['tags'] = [t['name'] for t in dataset.get('tags', [])]
return dataset
# Datasets are in the package_show form, rather than the RESTful form
DATASETS = [
{'id': 'dataset1-id',
'name': 'dataset1',
'title': 'Test Dataset1',
'owner_org': 'org1-id',
'tags': [{'name': 'test-tag'}],
'groups': [{'id': 'group1-id', 'name': 'group1'}],
'extras': []},
{
"id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"name": "cabinet-office-energy-use",
"private": False,
"maintainer_email": None,
"revision_timestamp": "2010-11-23T22:34:55.089925",
"organization":
{
"description": "The Cabinet Office supports the Prime Minister and Deputy Prime Minister,"
" and ensure the effective running of government. We are also the corporate"
" headquarters for government, in partnership with HM Treasury, and we take"
" the lead in certain critical policy areas.\r\nCO is a ministerial department,"
" supported by 18 agencies and public bodies\r\n\r\nYou can find out more at"
" https://www.gov.uk/government/organisations/cabinet-office",
"created": "2012-06-27T14:48:40.244951",
"title": "Cabinet Office",
"name": "cabinet-office",
"revision_timestamp": "2013-04-02T14:27:23.086886",
"is_organization": True,
"state": "active",
"image_url": "",
"revision_id": "4be8825d-d3f4-4fb2-b80b-43e36f574c05",
"type": "organization",
"id": "aa1e068a-23da-4563-b9c2-2cad272b663e",
"approval_status": "pending"
},
"update_frequency": "other",
"metadata_created": "2010-08-02T09:19:47.600853",
"last_major_modification": "2010-08-02T09:19:47.600853",
"metadata_modified": "2014-05-09T22:00:01.486366",
"temporal_granularity": "",
"author_email": None,
"geographic_granularity": "point",
"geographic_coverage": [],
"state": "active",
"version": None,
"temporal_coverage-to": "",
"license_id": "uk-ogl",
"type": "dataset",
"published_via": "",
"resources":
[
{
"content_length": "69837",
"cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3",
"description": "70 Whitehall energy data",
"cache_last_updated": "2013-06-19T00:59:42.762642",
"url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv",
"openness_score_failure_count": "0",
"format": "CSV",
"cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"last_modified": "2014-05-09T23:00:01.435211",
"mimetype": "text/csv",
"content_type": "text/csv",
"openness_score": "3",
"openness_score_reason": "open and standardized format",
"position": 0,
"revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4",
"id": "f156019d-ea88-46a6-8fa3-3d12582e2161",
"size": 299107
}
],
"num_resources": 1,
"tags":
[
{
"vocabulary_id": None,
"display_name": "consumption",
"name": "consumption",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "84ce26de-6711-4e85-9609-f7d8a87b0fc8"
},
{
"vocabulary_id": None,
"display_name": "energy",
"name": "energy",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "9f2ae723-602f-4290-80c4-6637ad617a45"
}
],
"precision": "",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"taxonomy_url": "",
"groups": [{"id": "remote-group-id", "name": "remote-group"}],
"creator_user_id": None,
"national_statistic": "no",
"relationships_as_subject": [],
"num_tags": 8,
"update_frequency-other": "Real-time",
"isopen": True,
"url": "http://www.carbonculture.net/orgs/cabinet-office/70-whitehall/",
"notes": "Cabinet Office head office energy use updated from on-site meters showing use, cost and carbon impact.",
"owner_org": "aa1e068a-23da-4563-b9c2-2cad272b663e",
"theme-secondary":
[
"Environment"
],
"extras":
[
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "categories",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "6813d71b-785b-4f56-b296-1b2acb34eed6"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "2010-07-30",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "date_released",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "515f638b-e2cf-40a6-a8a7-cbc8001269e3"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "date_updated",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "bff63465-4f96-44e7-bb87-6e66fff5e596"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "000000: ",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "geographic_coverage",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "414bcd35-b628-4218-99e2-639615183df8"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "point",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "geographic_granularity",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "c7b460dd-c61f-4cd2-90c2-eceb6c91fe9b"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "no",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "national_statistic",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "9f04b202-3646-49be-b69e-7fa997399ff3"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "{\"status\": \"final\", \"source\": \"Automatically awarded by ODI\","
" \"certification_type\": \"automatically awarded\", \"level\": \"raw\","
" \"title\": \"Cabinet Office 70 Whitehall energy use\","
" \"created_at\": \"2014-10-28T12:25:57Z\", \"jurisdiction\": \"GB\","
" \"certificate_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922\","
" \"badge_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922/badge.png\","
" \"cert_title\": \"Basic Level Certificate\"}",
"revision_timestamp": "2014-11-12T02:52:35.048060",
"state": "active",
"key": "odi-certificate",
"revision_id": "eae9763b-e258-4d76-9ec2-7f5baf655394",
"id": "373a3cbb-d9c0-45a6-9a78-b95c86398766"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_coverage-from",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "39f72eed-6f76-4733-b636-7541cee3404f"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_coverage-to",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "818e2c8f-fee0-49da-8bea-ea3c9401ece5"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_granularity",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "f868b950-d3ce-4fbe-88ca-5cbc4b672320"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "Towns & Cities",
"revision_timestamp": "2015-03-16T18:10:08.802815",
"state": "active",
"key": "theme-primary",
"revision_id": "fc2b6630-84f8-4c88-8ac7-0ca275b2bc97",
"id": "bdcf00fd-3248-4c2f-9cf8-b90706c88e8d"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "[\"Environment\"]",
"revision_timestamp": "2015-04-08T20:57:04.895214",
"state": "active",
"key": "theme-secondary",
"revision_id": "c2c48530-ff82-4af1-9373-cdc64d5bc83c",
"id": "417482c5-a9c0-4430-8c4e-0c76e59fe44f"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "Real-time",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "update_frequency",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "e8ad4837-514e-4446-81a2-ffacfa7cf683"
}
],
"license_url": "http://www.nationalarchives.gov.uk/doc/open-government-licence/version/3/",
"individual_resources":
[
{
"content_length": "69837",
"cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3",
"description": "70 Whitehall energy data",
"cache_last_updated": "2013-06-19T00:59:42.762642",
"url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv",
"openness_score_failure_count": "0",
"format": "CSV",
"cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"last_modified": "2014-05-09T23:00:01.435211",
"mimetype": "text/csv",
"content_type": "text/csv",
"openness_score": "3",
"openness_score_reason": "open and standardized format",
"position": 0,
"revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4",
"id": "f156019d-ea88-46a6-8fa3-3d12582e2161",
"size": 299107
}
],
"title": "Cabinet Office 70 Whitehall energy use",
"revision_id": "3bd6ced3-35b2-4b20-94e2-c596e24bc375",
"date_released": "30/7/2010",
"theme-primary": "Towns & Cities"
}
]
INVALID_TAGS = [
{
"vocabulary_id": None,
"display_name": "consumption%^&",
"name": "consumption%^&",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "84ce26de-6711-4e85-9609-f7d8a87b0fc8"
},
]
ORGS = [
{'id': 'org1-id',
'name': 'org1'},
{'id': 'aa1e068a-23da-4563-b9c2-2cad272b663e',
'name': 'cabinet-office'}
]
GROUPS = [
{'id': 'group1-id',
'name': 'group1'},
{'id': '9853c3e1-eebb-4e8c-9ae7-1668a01bf2ca',
'name': 'finances'}
]
REVISIONS = [
{
"id": "23daf2eb-d7ec-4d86-a844-3924acd311ea",
"timestamp": "2015-10-21T09:50:08.160045",
"message": "REST API: Update object dataset1",
"author": "ross",
"approved_timestamp": None,
"packages":
[
DATASETS[1]['id']
],
"groups": []
},
{
"id": "8254a293-10db-4af2-9dfa-6a1f06ee899c",
"timestamp": "2015-10-21T09:46:21.198021",
"message": "REST API: Update object dataset1",
"author": "ross",
"approved_timestamp": None,
"packages":
[
DATASETS[1]['id']
],
"groups": []
}]

View File

@ -12,7 +12,7 @@ from ckan import model
from ckan.plugins import toolkit
from ckanext.harvest.harvesters.ckanharvester import ContentFetchError
from ckanext.harvest.tests.factories import (HarvestSourceObj, HarvestJobObj,
from ckanext.harvest.tests.nose.factories import (HarvestSourceObj, HarvestJobObj,
HarvestObjectObj)
from ckanext.harvest.tests.lib import run_harvest
import ckanext.harvest.model as harvest_model
@ -113,7 +113,7 @@ class TestCkanHarvester(object):
# change the modified date
datasets = copy.deepcopy(mock_ckan.DATASETS)
datasets[1]['metadata_modified'] = '2050-05-09T22:00:01.486366'
with patch('ckanext.harvest.tests.harvesters.mock_ckan.DATASETS',
with patch('ckanext.harvest.tests.nose.harvesters.mock_ckan.DATASETS',
datasets):
results_by_guid = run_harvest(
url='http://localhost:%s/' % mock_ckan.PORT,

View File

@ -0,0 +1,72 @@
import logging
from ckanext.harvest.tests.factories import HarvestSourceObj, HarvestJobObj
import ckanext.harvest.model as harvest_model
from ckanext.harvest import queue
from ckan.plugins import toolkit
log = logging.getLogger(__name__)
def run_harvest(url, harvester, config=''):
'''Runs a harvest and returns the results.
This allows you to test a harvester.
Queues are avoided as they are a pain in tests.
'''
# User creates a harvest source
source = HarvestSourceObj(url=url, config=config,
source_type=harvester.info()['name'])
# User triggers a harvest, which is the creation of a harvest job.
# We set run=False so that it doesn't put it on the gather queue.
job = HarvestJobObj(source=source, run=False)
return run_harvest_job(job, harvester)
def run_harvest_job(job, harvester):
# In 'harvest_job_create' it would call 'harvest_send_job_to_gather_queue'
# which would do 2 things to 'run' the job:
# 1. change the job status to Running
job.status = 'Running'
job.save()
# 2. put the job on the gather queue which is consumed by
# queue.gather_callback, which determines the harvester and then calls
# gather_stage. We simply call the gather_stage.
obj_ids = queue.gather_stage(harvester, job)
if not isinstance(obj_ids, list):
# gather had nothing to do or errored. Carry on to ensure the job is
# closed properly
obj_ids = []
# The object ids are put onto the fetch queue, consumed by
# queue.fetch_callback which calls queue.fetch_and_import_stages
results_by_guid = {}
for obj_id in obj_ids:
harvest_object = harvest_model.HarvestObject.get(obj_id)
guid = harvest_object.guid
# force reimport of datasets
if hasattr(job, 'force_import'):
if guid in job.force_import:
harvest_object.force_import = True
else:
log.info('Skipping: %s', guid)
continue
results_by_guid[guid] = {'obj_id': obj_id}
queue.fetch_and_import_stages(harvester, harvest_object)
results_by_guid[guid]['state'] = harvest_object.state
results_by_guid[guid]['report_status'] = harvest_object.report_status
if harvest_object.state == 'COMPLETE' and harvest_object.package_id:
results_by_guid[guid]['dataset'] = \
toolkit.get_action('package_show')(
{'ignore_auth': True},
dict(id=harvest_object.package_id))
results_by_guid[guid]['errors'] = harvest_object.errors
# Do 'harvest_jobs_run' to change the job status to 'finished'
toolkit.get_action('harvest_jobs_run')({'ignore_auth': True}, {})
return results_by_guid

View File

@ -0,0 +1,773 @@
from __future__ import absolute_import
import json
from . import factories
import unittest
from mock import patch
from nose.tools import assert_equal, assert_raises, assert_in
from nose.plugins.skip import SkipTest
from ckantoolkit.tests import factories as ckan_factories
from ckantoolkit.tests.helpers import _get_test_app, reset_db, FunctionalTestBase
from ckan import plugins as p
from ckan.plugins import toolkit
from ckan import model
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.model as harvest_model
from ckanext.harvest.model import HarvestGatherError, HarvestObjectError, HarvestObject, HarvestJob
from ckanext.harvest.logic import HarvestJobExists
from ckanext.harvest.logic.action.update import send_error_mail
class MockHarvesterForActionTests(p.SingletonPlugin):
p.implements(IHarvester)
def info(self):
return {'name': 'test-for-action-nose',
'title': 'Test for action',
'description': 'test'}
def validate_config(self, config):
if not config:
return config
try:
config_obj = json.loads(config)
if 'custom_option' in config_obj:
if not isinstance(config_obj['custom_option'], list):
raise ValueError('custom_option must be a list')
except ValueError as e:
raise e
return config
def gather_stage(self, harvest_job):
return []
def fetch_stage(self, harvest_object):
return True
def import_stage(self, harvest_object):
return True
SOURCE_DICT = {
"url": "http://test.action.com",
"name": "test-source-action",
"title": "Test source action",
"notes": "Test source action desc",
"source_type": "test-for-action-nose",
"frequency": "MANUAL",
"config": json.dumps({"custom_option": ["a", "b"]})
}
def call_action_api(action, apikey=None, status=200, **kwargs):
'''POST an HTTP request to the CKAN API and return the result.
Any additional keyword arguments that you pass to this function as **kwargs
are posted as params to the API.
Usage:
package_dict = call_action_api('package_create', apikey=apikey,
name='my_package')
assert package_dict['name'] == 'my_package'
num_followers = post(app, 'user_follower_count', id='annafan')
If you are expecting an error from the API and want to check the contents
of the error dict, you have to use the status param otherwise an exception
will be raised:
error_dict = call_action_api('group_activity_list', status=403,
id='invalid_id')
assert error_dict['message'] == 'Access Denied'
:param action: the action to post to, e.g. 'package_create'
:type action: string
:param apikey: the API key to put in the Authorization header of the post
(optional, default: None)
:type apikey: string
:param status: the HTTP status code expected in the response from the CKAN
API, e.g. 403, if a different status code is received an exception will
be raised (optional, default: 200)
:type status: int
:param **kwargs: any other keyword arguments passed to this function will
be posted to the API as params
:raises paste.fixture.AppError: if the HTTP status code of the response
from the CKAN API is different from the status param passed to this
function
:returns: the 'result' or 'error' dictionary from the CKAN API response
:rtype: dictionary
'''
params = json.dumps(kwargs)
app = _get_test_app()
response = app.post('/api/action/{0}'.format(action), params=params,
extra_environ={'Authorization': str(apikey)},
status=status)
if status in (200,):
assert response.json['success'] is True
return response.json['result']
else:
assert response.json['success'] is False
return response.json['error']
class ActionBase(object):
@classmethod
def setup_class(cls):
if not p.plugin_loaded('test_nose_action_harvester'):
p.load('test_nose_action_harvester')
def setup(self):
reset_db()
harvest_model.setup()
@classmethod
def teardown_class(cls):
p.unload('test_nose_action_harvester')
class HarvestSourceActionBase(FunctionalTestBase):
@classmethod
def setup_class(cls):
super(HarvestSourceActionBase, cls).setup_class()
harvest_model.setup()
if not p.plugin_loaded('test_nose_action_harvester'):
p.load('test_nose_action_harvester')
@classmethod
def teardown_class(cls):
super(HarvestSourceActionBase, cls).teardown_class()
p.unload('test_nose_action_harvester')
def _get_source_dict(self):
return {
"url": "http://test.action.com",
"name": "test-source-action",
"title": "Test source action",
"notes": "Test source action desc",
"source_type": "test-for-action-nose",
"frequency": "MANUAL",
"config": json.dumps({"custom_option": ["a", "b"]})
}
def test_invalid_missing_values(self):
source_dict = {}
test_data = self._get_source_dict()
if 'id' in test_data:
source_dict['id'] = test_data['id']
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
for key in ('name', 'title', 'url', 'source_type'):
assert_equal(result[key], [u'Missing value'])
def test_invalid_unknown_type(self):
source_dict = self._get_source_dict()
source_dict['source_type'] = 'unknown'
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'source_type' in result
assert u'Unknown harvester type' in result['source_type'][0]
def test_invalid_unknown_frequency(self):
wrong_frequency = 'ANNUALLY'
source_dict = self._get_source_dict()
source_dict['frequency'] = wrong_frequency
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'frequency' in result
assert u'Frequency {0} not recognised'.format(wrong_frequency) in result['frequency'][0]
def test_invalid_wrong_configuration(self):
source_dict = self._get_source_dict()
source_dict['config'] = 'not_json'
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'config' in result
assert u'Error parsing the configuration options: No JSON object could be decoded' in result['config'][0]
source_dict['config'] = json.dumps({'custom_option': 'not_a_list'})
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'config' in result
assert u'Error parsing the configuration options: custom_option must be a list' in result['config'][0]
class TestHarvestSourceActionCreate(HarvestSourceActionBase):
def __init__(self):
self.action = 'harvest_source_create'
def test_create(self):
source_dict = self._get_source_dict()
sysadmin = ckan_factories.Sysadmin()
result = call_action_api('harvest_source_create',
apikey=sysadmin['apikey'], **source_dict)
for key in source_dict.keys():
assert_equal(source_dict[key], result[key])
# Check that source was actually created
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
assert_equal(source.type, source_dict['source_type'])
# Trying to create a source with the same URL fails
source_dict = self._get_source_dict()
source_dict['name'] = 'test-source-action-new'
result = call_action_api('harvest_source_create',
apikey=sysadmin['apikey'], status=409,
**source_dict)
assert 'url' in result
assert u'There already is a Harvest Source for this URL' in result['url'][0]
class HarvestSourceFixtureMixin(object):
def _get_source_dict(self):
'''Not only returns a source_dict, but creates the HarvestSource object
as well - suitable for testing update actions.
'''
source = HarvestSourceActionBase._get_source_dict(self)
source = factories.HarvestSource(**source)
# delete status because it gets in the way of the status supplied to
# call_action_api later on. It is only a generated value, not affecting
# the update/patch anyway.
del source['status']
return source
class TestHarvestSourceActionUpdate(HarvestSourceFixtureMixin,
HarvestSourceActionBase):
def __init__(self):
self.action = 'harvest_source_update'
def test_update(self):
source_dict = self._get_source_dict()
source_dict.update({
"url": "http://test.action.updated.com",
"name": "test-source-action-updated",
"title": "Test source action updated",
"notes": "Test source action desc updated",
"source_type": "test-nose",
"frequency": "MONTHLY",
"config": json.dumps({"custom_option": ["c", "d"]})
})
sysadmin = ckan_factories.Sysadmin()
result = call_action_api('harvest_source_update',
apikey=sysadmin['apikey'], **source_dict)
for key in set(('url', 'name', 'title', 'notes', 'source_type',
'frequency', 'config')):
assert_equal(source_dict[key], result[key], "Key: %s" % key)
# Check that source was actually updated
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
assert_equal(source.type, source_dict['source_type'])
class TestHarvestSourceActionPatch(HarvestSourceFixtureMixin,
HarvestSourceActionBase):
def __init__(self):
self.action = 'harvest_source_patch'
if toolkit.check_ckan_version(max_version='2.2.99'):
# harvest_source_patch only came in with ckan 2.3
raise SkipTest()
def test_invalid_missing_values(self):
pass
def test_patch(self):
source_dict = self._get_source_dict()
patch_dict = {
"id": source_dict['id'],
"name": "test-source-action-patched",
"url": "http://test.action.patched.com",
"config": json.dumps({"custom_option": ["pat", "ched"]})
}
sysadmin = ckan_factories.Sysadmin()
result = call_action_api('harvest_source_patch',
apikey=sysadmin['apikey'], **patch_dict)
source_dict.update(patch_dict)
for key in set(('url', 'name', 'title', 'notes', 'source_type',
'frequency', 'config')):
assert_equal(source_dict[key], result[key], "Key: %s" % key)
# Check that source was actually updated
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
assert_equal(source.type, source_dict['source_type'])
class TestActions(ActionBase):
def test_harvest_source_clear(self):
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_source_clear')(
context, {'id': source.id})
assert_equal(result, {'id': source.id})
source = harvest_model.HarvestSource.get(source.id)
assert source
assert_equal(harvest_model.HarvestJob.get(job.id), None)
assert_equal(harvest_model.HarvestObject.get(object_.id), None)
assert_equal(model.Package.get(dataset['id']), None)
def test_harvest_source_job_history_clear(self):
# prepare
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_source_job_history_clear')(
context, {'id': source.id})
# verify
assert_equal(result, {'id': source.id})
source = harvest_model.HarvestSource.get(source.id)
assert source
assert_equal(harvest_model.HarvestJob.get(job.id), None)
assert_equal(harvest_model.HarvestObject.get(object_.id), None)
dataset_from_db = model.Package.get(dataset['id'])
assert dataset_from_db, 'is None'
assert_equal(dataset_from_db.id, dataset['id'])
def test_harvest_sources_job_history_clear(self):
# prepare
data_dict = SOURCE_DICT.copy()
source_1 = factories.HarvestSourceObj(**data_dict)
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
source_2 = factories.HarvestSourceObj(**data_dict)
job_1 = factories.HarvestJobObj(source=source_1)
dataset_1 = ckan_factories.Dataset()
object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1,
package_id=dataset_1['id'])
job_2 = factories.HarvestJobObj(source=source_2)
dataset_2 = ckan_factories.Dataset()
object_2_ = factories.HarvestObjectObj(job=job_2, source=source_2,
package_id=dataset_2['id'])
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_sources_job_history_clear')(
context, {})
# verify
assert_equal(
sorted(result),
sorted([{'id': source_1.id}, {'id': source_2.id}]))
source_1 = harvest_model.HarvestSource.get(source_1.id)
assert source_1
assert_equal(harvest_model.HarvestJob.get(job_1.id), None)
assert_equal(harvest_model.HarvestObject.get(object_1_.id), None)
dataset_from_db_1 = model.Package.get(dataset_1['id'])
assert dataset_from_db_1, 'is None'
assert_equal(dataset_from_db_1.id, dataset_1['id'])
source_2 = harvest_model.HarvestSource.get(source_1.id)
assert source_2
assert_equal(harvest_model.HarvestJob.get(job_2.id), None)
assert_equal(harvest_model.HarvestObject.get(object_2_.id), None)
dataset_from_db_2 = model.Package.get(dataset_2['id'])
assert dataset_from_db_2, 'is None'
assert_equal(dataset_from_db_2.id, dataset_2['id'])
def test_harvest_source_create_twice_with_unique_url(self):
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
def test_harvest_source_create_twice_with_same_url(self):
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict['name'] = 'another-source'
assert_raises(toolkit.ValidationError,
toolkit.get_action('harvest_source_create'),
{'user': site_user}, data_dict)
def test_harvest_source_create_twice_with_unique_url_and_config(self):
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict['name'] = 'another-source'
data_dict['config'] = '{"something": "new"}'
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
def test_harvest_job_create_as_sysadmin(self):
source = factories.HarvestSource(**SOURCE_DICT.copy())
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict = {
'source_id': source['id'],
'run': True
}
job = toolkit.get_action('harvest_job_create')(
{'user': site_user}, data_dict)
assert_equal(job['source_id'], source['id'])
assert_equal(job['status'], 'Running')
assert_equal(job['gather_started'], None)
assert_in('stats', job.keys())
def test_harvest_job_create_as_admin(self):
# as if an admin user presses 'refresh'
user = ckan_factories.User()
user['capacity'] = 'admin'
org = ckan_factories.Organization(users=[user])
source_dict = dict(SOURCE_DICT.items() +
[('publisher_id', org['id'])])
source = factories.HarvestSource(**source_dict)
data_dict = {
'source_id': source['id'],
'run': True
}
job = toolkit.get_action('harvest_job_create')(
{'user': user['name']}, data_dict)
assert_equal(job['source_id'], source['id'])
assert_equal(job['status'], 'Running')
assert_equal(job['gather_started'], None)
assert_in('stats', job.keys())
class TestHarvestObject(unittest.TestCase):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
def test_create(self):
job = factories.HarvestJobObj()
context = {
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': job.id,
'extras': {'a key': 'a value'},
}
harvest_object = toolkit.get_action('harvest_object_create')(
context, data_dict)
# fetch the object from database to check it was created
created_object = harvest_model.HarvestObject.get(harvest_object['id'])
assert created_object.guid == harvest_object['guid'] == data_dict['guid']
def test_create_bad_parameters(self):
source_a = factories.HarvestSourceObj()
job = factories.HarvestJobObj()
context = {
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data_dict = {
'job_id': job.id,
'source_id': source_a.id,
'extras': 1
}
harvest_object_create = toolkit.get_action('harvest_object_create')
self.assertRaises(toolkit.ValidationError, harvest_object_create,
context, data_dict)
data_dict['extras'] = {'test': 1}
self.assertRaises(toolkit.ValidationError, harvest_object_create,
context, data_dict)
class TestHarvestErrorMail(FunctionalTestBase):
@classmethod
def setup_class(cls):
super(TestHarvestErrorMail, cls).setup_class()
reset_db()
harvest_model.setup()
@classmethod
def teardown_class(cls):
super(TestHarvestErrorMail, cls).teardown_class()
reset_db()
def _create_harvest_source_and_job_if_not_existing(self):
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
context = {
'user': site_user,
'model': model,
'session': model.Session,
'ignore_auth': True,
}
source_dict = {
'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test-nose',
}
try:
harvest_source = toolkit.get_action('harvest_source_create')(
context,
source_dict
)
except toolkit.ValidationError:
harvest_source = toolkit.get_action('harvest_source_show')(
context,
{'id': source_dict['name']}
)
pass
try:
job = toolkit.get_action('harvest_job_create')(context, {
'source_id': harvest_source['id'], 'run': True})
except HarvestJobExists:
job = toolkit.get_action('harvest_job_show')(context, {
'id': harvest_source['status']['last_job']['id']})
pass
toolkit.get_action('harvest_jobs_run')(context, {})
toolkit.get_action('harvest_source_reindex')(context, {'id': harvest_source['id']})
return context, harvest_source, job
def _create_harvest_source_with_owner_org_and_job_if_not_existing(self):
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
context = {
'user': site_user,
'model': model,
'session': model.Session,
'ignore_auth': True,
}
test_org = ckan_factories.Organization()
test_other_org = ckan_factories.Organization()
org_admin_user = ckan_factories.User()
org_member_user = ckan_factories.User()
other_org_admin_user = ckan_factories.User()
toolkit.get_action('organization_member_create')(
context.copy(),
{
'id': test_org['id'],
'username': org_admin_user['name'],
'role': 'admin'
}
)
toolkit.get_action('organization_member_create')(
context.copy(),
{
'id': test_org['id'],
'username': org_member_user['name'],
'role': 'member'
}
)
toolkit.get_action('organization_member_create')(
context.copy(),
{
'id': test_other_org['id'],
'username': other_org_admin_user['name'],
'role': 'admin'
}
)
source_dict = {
'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test-nose',
'owner_org': test_org['id'],
'run': True
}
try:
harvest_source = toolkit.get_action('harvest_source_create')(
context.copy(),
source_dict
)
except toolkit.ValidationError:
harvest_source = toolkit.get_action('harvest_source_show')(
context.copy(),
{'id': source_dict['name']}
)
pass
try:
job = toolkit.get_action('harvest_job_create')(context.copy(), {
'source_id': harvest_source['id'], 'run': True})
except HarvestJobExists:
job = toolkit.get_action('harvest_job_show')(context.copy(), {
'id': harvest_source['status']['last_job']['id']})
pass
toolkit.get_action('harvest_jobs_run')(context.copy(), {})
toolkit.get_action('harvest_source_reindex')(context.copy(), {'id': harvest_source['id']})
return context, harvest_source, job
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_not_sent(self, mock_mailer_mail_recipient):
context, harvest_source, job = self._create_harvest_source_and_job_if_not_existing()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(0, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.not_called
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_sent(self, mock_mailer_mail_recipient):
context, harvest_source, job = self._create_harvest_source_and_job_if_not_existing()
# create a HarvestGatherError
job_model = HarvestJob.get(job['id'])
msg = 'System error - No harvester could be found for source type %s' % job_model.source.type
err = HarvestGatherError(message=msg, job=job_model)
err.save()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(1, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.called
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_sent_with_object_error(self, mock_mailer_mail_recipient):
context, harvest_source, harvest_job = self._create_harvest_source_and_job_if_not_existing()
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': harvest_job['id'],
'extras': {'a key': 'a value'},
'source_id': harvest_source['id']
}
harvest_object = toolkit.get_action('harvest_object_create')(
context, data_dict)
harvest_object_model = HarvestObject.get(harvest_object['id'])
# create a HarvestObjectError
msg = 'HarvestObjectError occured: %s' % harvest_job['id']
harvest_object_error = HarvestObjectError(message=msg, object=harvest_object_model)
harvest_object_error.save()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(1, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.called
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_sent_with_org(self, mock_mailer_mail_recipient):
context, harvest_source, job = self._create_harvest_source_with_owner_org_and_job_if_not_existing()
# create a HarvestGatherError
job_model = HarvestJob.get(job['id'])
msg = 'System error - No harvester could be found for source type %s' % job_model.source.type
err = HarvestGatherError(message=msg, job=job_model)
err.save()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(1, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.called
assert_equal(2, mock_mailer_mail_recipient.call_count)

View File

@ -1,7 +1,7 @@
from ckan.lib.helpers import url_for
from ckantoolkit.tests import helpers, factories
from ckanext.harvest.tests import factories as harvest_factories
from ckanext.harvest.tests.nose import factories as harvest_factories
from nose.tools import assert_in
import ckanext.harvest.model as harvest_model

View File

@ -0,0 +1,348 @@
from mock import patch
from ckantoolkit.tests.helpers import reset_db
import ckanext.harvest.model as harvest_model
from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.queue as queue
from ckan.plugins.core import SingletonPlugin, implements
import json
import ckan.logic as logic
from ckan import model
from nose.tools import assert_equal, ok_
from ckan.lib.base import config
from nose.plugins.skip import SkipTest
import uuid
class MockHarvester(SingletonPlugin):
implements(IHarvester)
def info(self):
return {'name': 'test-nose', 'title': 'test', 'description': 'test'}
def gather_stage(self, harvest_job):
if harvest_job.source.url.startswith('basic_test'):
obj = HarvestObject(guid='test1', job=harvest_job)
obj.extras.append(HarvestObjectExtra(key='key', value='value'))
obj2 = HarvestObject(guid='test2', job=harvest_job)
obj3 = HarvestObject(guid='test_to_delete', job=harvest_job)
obj.add()
obj2.add()
obj3.save() # this will commit both
return [obj.id, obj2.id, obj3.id]
return []
def fetch_stage(self, harvest_object):
assert harvest_object.state == "FETCH"
assert harvest_object.fetch_started is not None
harvest_object.content = json.dumps({'name': harvest_object.guid})
harvest_object.save()
return True
def import_stage(self, harvest_object):
assert harvest_object.state == "IMPORT"
assert harvest_object.fetch_finished is not None
assert harvest_object.import_started is not None
user = logic.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
package = json.loads(harvest_object.content)
name = package['name']
package_object = model.Package.get(name)
if package_object:
logic_function = 'package_update'
else:
logic_function = 'package_create'
package_dict = logic.get_action(logic_function)(
{'model': model, 'session': model.Session,
'user': user, 'api_version': 3, 'ignore_auth': True},
json.loads(harvest_object.content)
)
# set previous objects to not current
previous_object = model.Session.query(HarvestObject) \
.filter(HarvestObject.guid == harvest_object.guid) \
.filter(
HarvestObject.current == True # noqa: E712
).first()
if previous_object:
previous_object.current = False
previous_object.save()
# delete test_to_delete package on second run
harvest_object.package_id = package_dict['id']
harvest_object.current = True
if package_dict['name'] == 'test_to_delete' and package_object:
harvest_object.current = False
package_object.state = 'deleted'
package_object.save()
harvest_object.save()
return True
class TestHarvestQueue(object):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
def test_01_basic_harvester(self):
# make sure queues/exchanges are created first and are empty
consumer = queue.get_gather_consumer()
consumer_fetch = queue.get_fetch_consumer()
consumer.queue_purge(queue=queue.get_gather_queue_name())
consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())
user = logic.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
context = {'model': model, 'session': model.Session,
'user': user, 'api_version': 3, 'ignore_auth': True}
source_dict = {
'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test-nose',
}
harvest_source = logic.get_action('harvest_source_create')(
context,
source_dict
)
assert harvest_source['source_type'] == 'test-nose', harvest_source
assert harvest_source['url'] == 'basic_test', harvest_source
harvest_job = logic.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True}
)
job_id = harvest_job['id']
assert harvest_job['source_id'] == harvest_source['id'], harvest_job
assert harvest_job['status'] == u'Running'
assert logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'
# pop on item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
all_objects = model.Session.query(HarvestObject).all()
assert len(all_objects) == 3
assert all_objects[0].state == 'WAITING'
assert all_objects[1].state == 'WAITING'
assert all_objects[2].state == 'WAITING'
assert len(model.Session.query(HarvestObject).all()) == 3
assert len(model.Session.query(HarvestObjectExtra).all()) == 1
# do three times as three harvest objects
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
count = model.Session.query(model.Package) \
.filter(model.Package.type == 'dataset') \
.count()
assert count == 3
all_objects = model.Session.query(HarvestObject).filter_by(current=True).all()
assert_equal(len(all_objects), 3)
assert_equal(all_objects[0].state, 'COMPLETE')
assert_equal(all_objects[0].report_status, 'added')
assert_equal(all_objects[1].state, 'COMPLETE')
assert_equal(all_objects[1].report_status, 'added')
assert_equal(all_objects[2].state, 'COMPLETE')
assert_equal(all_objects[2].report_status, 'added')
# fire run again to check if job is set to Finished
logic.get_action('harvest_jobs_run')(
context,
{'source_id': harvest_source['id']}
)
harvest_job = logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)
assert_equal(harvest_job['status'], u'Finished')
assert_equal(harvest_job['stats'], {'added': 3, 'updated': 0, 'not modified': 0, 'errored': 0, 'deleted': 0})
harvest_source_dict = logic.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 3, 'updated': 0,
'not modified': 0, 'errored': 0, 'deleted': 0})
assert_equal(harvest_source_dict['status']['total_datasets'], 3)
assert_equal(harvest_source_dict['status']['job_count'], 1)
# Second run
harvest_job = logic.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True}
)
job_id = harvest_job['id']
assert logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'
# pop on item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
all_objects = model.Session.query(HarvestObject).all()
assert len(all_objects) == 6
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
count = model.Session.query(model.Package) \
.filter(model.Package.type == 'dataset') \
.count()
assert_equal(count, 3)
all_objects = model.Session.query(HarvestObject).filter_by(report_status='added').all()
assert_equal(len(all_objects), 3)
all_objects = model.Session.query(HarvestObject).filter_by(report_status='updated').all()
assert_equal(len(all_objects), 2)
all_objects = model.Session.query(HarvestObject).filter_by(report_status='deleted').all()
assert_equal(len(all_objects), 1)
# run to make sure job is marked as finshed
logic.get_action('harvest_jobs_run')(
context,
{'source_id': harvest_source['id']}
)
harvest_job = logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)
assert_equal(harvest_job['stats'], {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1})
harvest_source_dict = logic.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 2,
'not modified': 0, 'errored': 0, 'deleted': 1})
assert_equal(harvest_source_dict['status']['total_datasets'], 2)
assert_equal(harvest_source_dict['status']['job_count'], 2)
def test_redis_queue_purging(self):
'''
Test that Redis queue purging doesn't purge the wrong keys.
'''
if config.get('ckan.harvest.mq.type') != 'redis':
raise SkipTest()
redis = queue.get_connection()
try:
redis.set('ckanext-harvest:some-random-key', 'foobar')
# Create some fake jobs
gather_publisher = queue.get_gather_publisher()
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
fetch_publisher = queue.get_fetch_publisher()
fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
num_keys = redis.dbsize()
# Create some fake objects
gather_consumer = queue.get_gather_consumer()
next(gather_consumer.consume(queue.get_gather_queue_name()))
fetch_consumer = queue.get_fetch_consumer()
next(fetch_consumer.consume(queue.get_fetch_queue_name()))
ok_(redis.dbsize() > num_keys)
queue.purge_queues()
assert_equal(redis.get('ckanext-harvest:some-random-key'),
'foobar')
assert_equal(redis.dbsize(), num_keys)
assert_equal(redis.llen(queue.get_gather_routing_key()), 0)
assert_equal(redis.llen(queue.get_fetch_routing_key()), 0)
finally:
redis.delete('ckanext-harvest:some-random-key')
class TestHarvestCorruptRedis(object):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
@patch('ckanext.harvest.queue.log.error')
def test_redis_corrupt(self, mock_log_error):
'''
Test that corrupt Redis doesn't stop harvest process and still processes other jobs.
'''
if config.get('ckan.harvest.mq.type') != 'redis':
raise SkipTest()
redis = queue.get_connection()
try:
redis.set('ckanext-harvest:some-random-key-2', 'foobar')
# make sure queues/exchanges are created first and are empty
gather_consumer = queue.get_gather_consumer()
fetch_consumer = queue.get_fetch_consumer()
gather_consumer.queue_purge(queue=queue.get_gather_queue_name())
fetch_consumer.queue_purge(queue=queue.get_fetch_queue_name())
# Create some fake jobs and objects with no harvest_job_id
gather_publisher = queue.get_gather_publisher()
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
fetch_publisher = queue.get_fetch_publisher()
fetch_publisher.send({'harvest_object_id': None})
h_obj_id = str(uuid.uuid4())
fetch_publisher.send({'harvest_object_id': h_obj_id})
# Create some fake objects
next(gather_consumer.consume(queue.get_gather_queue_name()))
_, _, body = next(fetch_consumer.consume(queue.get_fetch_queue_name()))
json_obj = json.loads(body)
assert json_obj['harvest_object_id'] == h_obj_id
assert mock_log_error.call_count == 1
args, _ = mock_log_error.call_args_list[0]
assert "cannot concatenate 'str' and 'NoneType' objects" in args[1]
finally:
redis.delete('ckanext-harvest:some-random-key-2')

View File

@ -0,0 +1,181 @@
'''Tests elements of queue.py, but doesn't use the queue subsystem
(redis/rabbitmq)
'''
import json
from nose.tools import assert_equal
from ckantoolkit.tests.helpers import reset_db
from ckan import model
from ckan import plugins as p
from ckan.plugins import toolkit
from ckanext.harvest.tests.factories import (HarvestObjectObj)
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.model as harvest_model
from ckanext.harvest.tests.lib import run_harvest
class MockHarvester(p.SingletonPlugin):
p.implements(IHarvester)
@classmethod
def _set_test_params(cls, guid, **test_params):
cls._guid = guid
cls._test_params = test_params
def info(self):
return {'name': 'test2-nose', 'title': 'test', 'description': 'test'}
def gather_stage(self, harvest_job):
obj = HarvestObjectObj(guid=self._guid, job=harvest_job)
return [obj.id]
def fetch_stage(self, harvest_object):
if self._test_params.get('fetch_object_unchanged'):
return 'unchanged'
harvest_object.content = json.dumps({'name': harvest_object.guid})
harvest_object.save()
return True
def import_stage(self, harvest_object):
user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
package = json.loads(harvest_object.content)
name = package['name']
package_object = model.Package.get(name)
if package_object:
logic_function = 'package_update'
else:
logic_function = 'package_create'
package_dict = toolkit.get_action(logic_function)(
{'model': model, 'session': model.Session,
'user': user},
json.loads(harvest_object.content)
)
if self._test_params.get('object_error'):
return False
# successful, so move 'current' to this object
previous_object = model.Session.query(harvest_model.HarvestObject) \
.filter_by(guid=harvest_object.guid) \
.filter_by(current=True) \
.first()
if previous_object:
previous_object.current = False
previous_object.save()
harvest_object.package_id = package_dict['id']
harvest_object.current = True
if self._test_params.get('delete'):
# 'current=False' is the key step in getting report_status to be
# set as 'deleted'
harvest_object.current = False
package_object.save()
harvest_object.save()
if self._test_params.get('import_object_unchanged'):
return 'unchanged'
return True
class TestEndStates(object):
def setup(self):
reset_db()
harvest_model.setup()
def test_create_dataset(self):
guid = 'obj-create'
MockHarvester._set_test_params(guid=guid)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'added')
assert_equal(result['errors'], [])
def test_update_dataset(self):
guid = 'obj-update'
MockHarvester._set_test_params(guid=guid)
# create the original harvest_object and dataset
run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
# update it
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'updated')
assert_equal(result['errors'], [])
def test_delete_dataset(self):
guid = 'obj-delete'
MockHarvester._set_test_params(guid=guid)
# create the original harvest_object and dataset
run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
MockHarvester._set_test_params(guid=guid, delete=True)
# delete it
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'deleted')
assert_equal(result['errors'], [])
def test_obj_error(self):
guid = 'obj-error'
MockHarvester._set_test_params(guid=guid, object_error=True)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'ERROR')
assert_equal(result['report_status'], 'errored')
assert_equal(result['errors'], [])
def test_fetch_unchanged(self):
guid = 'obj-error'
MockHarvester._set_test_params(guid=guid, fetch_object_unchanged=True)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'not modified')
assert_equal(result['errors'], [])
def test_import_unchanged(self):
guid = 'obj-error'
MockHarvester._set_test_params(guid=guid, import_object_unchanged=True)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'not modified')
assert_equal(result['errors'], [])

View File

@ -1,82 +1,14 @@
from __future__ import absolute_import
import json
from . import factories
import unittest
from mock import patch
from nose.tools import assert_equal, assert_raises, assert_in
from nose.plugins.skip import SkipTest
from ckantoolkit.tests import factories as ckan_factories
from ckantoolkit.tests.helpers import _get_test_app, reset_db, FunctionalTestBase
import pytest
from ckan import plugins as p
from ckan.plugins import toolkit
from ckan import model
from ckantoolkit.tests import factories as ckan_factories, helpers
from ckanext.harvest.tests import factories
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.model as harvest_model
from ckanext.harvest.model import HarvestGatherError, HarvestObjectError, HarvestObject, HarvestJob
from ckanext.harvest.logic import HarvestJobExists
from ckanext.harvest.logic.action.update import send_error_mail
def call_action_api(action, apikey=None, status=200, **kwargs):
'''POST an HTTP request to the CKAN API and return the result.
Any additional keyword arguments that you pass to this function as **kwargs
are posted as params to the API.
Usage:
package_dict = call_action_api('package_create', apikey=apikey,
name='my_package')
assert package_dict['name'] == 'my_package'
num_followers = post(app, 'user_follower_count', id='annafan')
If you are expecting an error from the API and want to check the contents
of the error dict, you have to use the status param otherwise an exception
will be raised:
error_dict = call_action_api('group_activity_list', status=403,
id='invalid_id')
assert error_dict['message'] == 'Access Denied'
:param action: the action to post to, e.g. 'package_create'
:type action: string
:param apikey: the API key to put in the Authorization header of the post
(optional, default: None)
:type apikey: string
:param status: the HTTP status code expected in the response from the CKAN
API, e.g. 403, if a different status code is received an exception will
be raised (optional, default: 200)
:type status: int
:param **kwargs: any other keyword arguments passed to this function will
be posted to the API as params
:raises paste.fixture.AppError: if the HTTP status code of the response
from the CKAN API is different from the status param passed to this
function
:returns: the 'result' or 'error' dictionary from the CKAN API response
:rtype: dictionary
'''
params = json.dumps(kwargs)
app = _get_test_app()
response = app.post('/api/action/{0}'.format(action), params=params,
extra_environ={'Authorization': str(apikey)},
status=status)
if status in (200,):
assert response.json['success'] is True
return response.json['result']
else:
assert response.json['success'] is False
return response.json['error']
class MockHarvesterForActionTests(p.SingletonPlugin):
@ -124,36 +56,9 @@ SOURCE_DICT = {
}
class ActionBase(object):
@classmethod
def setup_class(cls):
if not p.plugin_loaded('test_action_harvester'):
p.load('test_action_harvester')
def setup(self):
reset_db()
harvest_model.setup()
@classmethod
def teardown_class(cls):
p.unload('test_action_harvester')
class HarvestSourceActionBase(FunctionalTestBase):
@classmethod
def setup_class(cls):
super(HarvestSourceActionBase, cls).setup_class()
harvest_model.setup()
if not p.plugin_loaded('test_action_harvester'):
p.load('test_action_harvester')
@classmethod
def teardown_class(cls):
super(HarvestSourceActionBase, cls).teardown_class()
p.unload('test_action_harvester')
@pytest.mark.usefixtures('with_plugins', 'clean_db', 'harvest_setup', 'clean_queues')
@pytest.mark.ckan_config('ckan.plugins', 'harvest test_action_harvester')
class HarvestSourceActionBase():
def _get_source_dict(self):
return {
@ -172,21 +77,19 @@ class HarvestSourceActionBase(FunctionalTestBase):
if 'id' in test_data:
source_dict['id'] = test_data['id']
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
result = helpers.call_action(self.action,
**source_dict)
for key in ('name', 'title', 'url', 'source_type'):
assert_equal(result[key], [u'Missing value'])
assert result[key] == [u'Missing value']
def test_invalid_unknown_type(self):
source_dict = self._get_source_dict()
source_dict['source_type'] = 'unknown'
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
result = helpers.call_action(self.action,
**source_dict)
assert 'source_type' in result
@ -197,9 +100,8 @@ class HarvestSourceActionBase(FunctionalTestBase):
source_dict = self._get_source_dict()
source_dict['frequency'] = wrong_frequency
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
result = helpers.call_action(self.action,
**source_dict)
assert 'frequency' in result
@ -209,9 +111,8 @@ class HarvestSourceActionBase(FunctionalTestBase):
source_dict = self._get_source_dict()
source_dict['config'] = 'not_json'
sysadmin = ckan_factories.Sysadmin()
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
result = helpers.call_action(self.action,
**source_dict)
assert 'config' in result
@ -219,8 +120,7 @@ class HarvestSourceActionBase(FunctionalTestBase):
source_dict['config'] = json.dumps({'custom_option': 'not_a_list'})
result = call_action_api(self.action,
apikey=sysadmin['apikey'], status=409,
result = helpers.call_action(self.action,
**source_dict)
assert 'config' in result
@ -236,573 +136,25 @@ class TestHarvestSourceActionCreate(HarvestSourceActionBase):
source_dict = self._get_source_dict()
sysadmin = ckan_factories.Sysadmin()
result = call_action_api('harvest_source_create',
apikey=sysadmin['apikey'], **source_dict)
result = helpers.call_action('harvest_source_create',
**source_dict)
for key in source_dict.keys():
assert_equal(source_dict[key], result[key])
assert source_dict[key] == result[key]
# Check that source was actually created
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
assert_equal(source.type, source_dict['source_type'])
assert source.url == source_dict['url']
assert source.type == source_dict['source_type']
# Trying to create a source with the same URL fails
source_dict = self._get_source_dict()
source_dict['name'] = 'test-source-action-new'
result = call_action_api('harvest_source_create',
apikey=sysadmin['apikey'], status=409,
result = helpers.call_action('harvest_source_create',
**source_dict)
assert 'url' in result
assert u'There already is a Harvest Source for this URL' in result['url'][0]
class HarvestSourceFixtureMixin(object):
def _get_source_dict(self):
'''Not only returns a source_dict, but creates the HarvestSource object
as well - suitable for testing update actions.
'''
source = HarvestSourceActionBase._get_source_dict(self)
source = factories.HarvestSource(**source)
# delete status because it gets in the way of the status supplied to
# call_action_api later on. It is only a generated value, not affecting
# the update/patch anyway.
del source['status']
return source
class TestHarvestSourceActionUpdate(HarvestSourceFixtureMixin,
HarvestSourceActionBase):
def __init__(self):
self.action = 'harvest_source_update'
def test_update(self):
source_dict = self._get_source_dict()
source_dict.update({
"url": "http://test.action.updated.com",
"name": "test-source-action-updated",
"title": "Test source action updated",
"notes": "Test source action desc updated",
"source_type": "test",
"frequency": "MONTHLY",
"config": json.dumps({"custom_option": ["c", "d"]})
})
sysadmin = ckan_factories.Sysadmin()
result = call_action_api('harvest_source_update',
apikey=sysadmin['apikey'], **source_dict)
for key in set(('url', 'name', 'title', 'notes', 'source_type',
'frequency', 'config')):
assert_equal(source_dict[key], result[key], "Key: %s" % key)
# Check that source was actually updated
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
assert_equal(source.type, source_dict['source_type'])
class TestHarvestSourceActionPatch(HarvestSourceFixtureMixin,
HarvestSourceActionBase):
def __init__(self):
self.action = 'harvest_source_patch'
if toolkit.check_ckan_version(max_version='2.2.99'):
# harvest_source_patch only came in with ckan 2.3
raise SkipTest()
def test_invalid_missing_values(self):
pass
def test_patch(self):
source_dict = self._get_source_dict()
patch_dict = {
"id": source_dict['id'],
"name": "test-source-action-patched",
"url": "http://test.action.patched.com",
"config": json.dumps({"custom_option": ["pat", "ched"]})
}
sysadmin = ckan_factories.Sysadmin()
result = call_action_api('harvest_source_patch',
apikey=sysadmin['apikey'], **patch_dict)
source_dict.update(patch_dict)
for key in set(('url', 'name', 'title', 'notes', 'source_type',
'frequency', 'config')):
assert_equal(source_dict[key], result[key], "Key: %s" % key)
# Check that source was actually updated
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
assert_equal(source.type, source_dict['source_type'])
class TestActions(ActionBase):
def test_harvest_source_clear(self):
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_source_clear')(
context, {'id': source.id})
assert_equal(result, {'id': source.id})
source = harvest_model.HarvestSource.get(source.id)
assert source
assert_equal(harvest_model.HarvestJob.get(job.id), None)
assert_equal(harvest_model.HarvestObject.get(object_.id), None)
assert_equal(model.Package.get(dataset['id']), None)
def test_harvest_source_job_history_clear(self):
# prepare
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_source_job_history_clear')(
context, {'id': source.id})
# verify
assert_equal(result, {'id': source.id})
source = harvest_model.HarvestSource.get(source.id)
assert source
assert_equal(harvest_model.HarvestJob.get(job.id), None)
assert_equal(harvest_model.HarvestObject.get(object_.id), None)
dataset_from_db = model.Package.get(dataset['id'])
assert dataset_from_db, 'is None'
assert_equal(dataset_from_db.id, dataset['id'])
def test_harvest_sources_job_history_clear(self):
# prepare
data_dict = SOURCE_DICT.copy()
source_1 = factories.HarvestSourceObj(**data_dict)
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
source_2 = factories.HarvestSourceObj(**data_dict)
job_1 = factories.HarvestJobObj(source=source_1)
dataset_1 = ckan_factories.Dataset()
object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1,
package_id=dataset_1['id'])
job_2 = factories.HarvestJobObj(source=source_2)
dataset_2 = ckan_factories.Dataset()
object_2_ = factories.HarvestObjectObj(job=job_2, source=source_2,
package_id=dataset_2['id'])
# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_sources_job_history_clear')(
context, {})
# verify
assert_equal(
sorted(result),
sorted([{'id': source_1.id}, {'id': source_2.id}]))
source_1 = harvest_model.HarvestSource.get(source_1.id)
assert source_1
assert_equal(harvest_model.HarvestJob.get(job_1.id), None)
assert_equal(harvest_model.HarvestObject.get(object_1_.id), None)
dataset_from_db_1 = model.Package.get(dataset_1['id'])
assert dataset_from_db_1, 'is None'
assert_equal(dataset_from_db_1.id, dataset_1['id'])
source_2 = harvest_model.HarvestSource.get(source_1.id)
assert source_2
assert_equal(harvest_model.HarvestJob.get(job_2.id), None)
assert_equal(harvest_model.HarvestObject.get(object_2_.id), None)
dataset_from_db_2 = model.Package.get(dataset_2['id'])
assert dataset_from_db_2, 'is None'
assert_equal(dataset_from_db_2.id, dataset_2['id'])
def test_harvest_source_create_twice_with_unique_url(self):
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
def test_harvest_source_create_twice_with_same_url(self):
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict['name'] = 'another-source'
assert_raises(toolkit.ValidationError,
toolkit.get_action('harvest_source_create'),
{'user': site_user}, data_dict)
def test_harvest_source_create_twice_with_unique_url_and_config(self):
data_dict = SOURCE_DICT.copy()
factories.HarvestSourceObj(**data_dict)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict['name'] = 'another-source'
data_dict['config'] = '{"something": "new"}'
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
def test_harvest_job_create_as_sysadmin(self):
source = factories.HarvestSource(**SOURCE_DICT.copy())
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict = {
'source_id': source['id'],
'run': True
}
job = toolkit.get_action('harvest_job_create')(
{'user': site_user}, data_dict)
assert_equal(job['source_id'], source['id'])
assert_equal(job['status'], 'Running')
assert_equal(job['gather_started'], None)
assert_in('stats', job.keys())
def test_harvest_job_create_as_admin(self):
# as if an admin user presses 'refresh'
user = ckan_factories.User()
user['capacity'] = 'admin'
org = ckan_factories.Organization(users=[user])
source_dict = dict(SOURCE_DICT.items() +
[('publisher_id', org['id'])])
source = factories.HarvestSource(**source_dict)
data_dict = {
'source_id': source['id'],
'run': True
}
job = toolkit.get_action('harvest_job_create')(
{'user': user['name']}, data_dict)
assert_equal(job['source_id'], source['id'])
assert_equal(job['status'], 'Running')
assert_equal(job['gather_started'], None)
assert_in('stats', job.keys())
class TestHarvestObject(unittest.TestCase):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
def test_create(self):
job = factories.HarvestJobObj()
context = {
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': job.id,
'extras': {'a key': 'a value'},
}
harvest_object = toolkit.get_action('harvest_object_create')(
context, data_dict)
# fetch the object from database to check it was created
created_object = harvest_model.HarvestObject.get(harvest_object['id'])
assert created_object.guid == harvest_object['guid'] == data_dict['guid']
def test_create_bad_parameters(self):
source_a = factories.HarvestSourceObj()
job = factories.HarvestJobObj()
context = {
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data_dict = {
'job_id': job.id,
'source_id': source_a.id,
'extras': 1
}
harvest_object_create = toolkit.get_action('harvest_object_create')
self.assertRaises(toolkit.ValidationError, harvest_object_create,
context, data_dict)
data_dict['extras'] = {'test': 1}
self.assertRaises(toolkit.ValidationError, harvest_object_create,
context, data_dict)
class TestHarvestErrorMail(FunctionalTestBase):
@classmethod
def setup_class(cls):
super(TestHarvestErrorMail, cls).setup_class()
reset_db()
harvest_model.setup()
@classmethod
def teardown_class(cls):
super(TestHarvestErrorMail, cls).teardown_class()
reset_db()
def _create_harvest_source_and_job_if_not_existing(self):
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
context = {
'user': site_user,
'model': model,
'session': model.Session,
'ignore_auth': True,
}
source_dict = {
'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test',
}
try:
harvest_source = toolkit.get_action('harvest_source_create')(
context,
source_dict
)
except toolkit.ValidationError:
harvest_source = toolkit.get_action('harvest_source_show')(
context,
{'id': source_dict['name']}
)
pass
try:
job = toolkit.get_action('harvest_job_create')(context, {
'source_id': harvest_source['id'], 'run': True})
except HarvestJobExists:
job = toolkit.get_action('harvest_job_show')(context, {
'id': harvest_source['status']['last_job']['id']})
pass
toolkit.get_action('harvest_jobs_run')(context, {})
toolkit.get_action('harvest_source_reindex')(context, {'id': harvest_source['id']})
return context, harvest_source, job
def _create_harvest_source_with_owner_org_and_job_if_not_existing(self):
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
context = {
'user': site_user,
'model': model,
'session': model.Session,
'ignore_auth': True,
}
test_org = ckan_factories.Organization()
test_other_org = ckan_factories.Organization()
org_admin_user = ckan_factories.User()
org_member_user = ckan_factories.User()
other_org_admin_user = ckan_factories.User()
toolkit.get_action('organization_member_create')(
context.copy(),
{
'id': test_org['id'],
'username': org_admin_user['name'],
'role': 'admin'
}
)
toolkit.get_action('organization_member_create')(
context.copy(),
{
'id': test_org['id'],
'username': org_member_user['name'],
'role': 'member'
}
)
toolkit.get_action('organization_member_create')(
context.copy(),
{
'id': test_other_org['id'],
'username': other_org_admin_user['name'],
'role': 'admin'
}
)
source_dict = {
'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test',
'owner_org': test_org['id'],
'run': True
}
try:
harvest_source = toolkit.get_action('harvest_source_create')(
context.copy(),
source_dict
)
except toolkit.ValidationError:
harvest_source = toolkit.get_action('harvest_source_show')(
context.copy(),
{'id': source_dict['name']}
)
pass
try:
job = toolkit.get_action('harvest_job_create')(context.copy(), {
'source_id': harvest_source['id'], 'run': True})
except HarvestJobExists:
job = toolkit.get_action('harvest_job_show')(context.copy(), {
'id': harvest_source['status']['last_job']['id']})
pass
toolkit.get_action('harvest_jobs_run')(context.copy(), {})
toolkit.get_action('harvest_source_reindex')(context.copy(), {'id': harvest_source['id']})
return context, harvest_source, job
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_not_sent(self, mock_mailer_mail_recipient):
context, harvest_source, job = self._create_harvest_source_and_job_if_not_existing()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(0, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.not_called
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_sent(self, mock_mailer_mail_recipient):
context, harvest_source, job = self._create_harvest_source_and_job_if_not_existing()
# create a HarvestGatherError
job_model = HarvestJob.get(job['id'])
msg = 'System error - No harvester could be found for source type %s' % job_model.source.type
err = HarvestGatherError(message=msg, job=job_model)
err.save()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(1, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.called
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_sent_with_object_error(self, mock_mailer_mail_recipient):
context, harvest_source, harvest_job = self._create_harvest_source_and_job_if_not_existing()
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': harvest_job['id'],
'extras': {'a key': 'a value'},
'source_id': harvest_source['id']
}
harvest_object = toolkit.get_action('harvest_object_create')(
context, data_dict)
harvest_object_model = HarvestObject.get(harvest_object['id'])
# create a HarvestObjectError
msg = 'HarvestObjectError occured: %s' % harvest_job['id']
harvest_object_error = HarvestObjectError(message=msg, object=harvest_object_model)
harvest_object_error.save()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(1, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.called
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_sent_with_org(self, mock_mailer_mail_recipient):
context, harvest_source, job = self._create_harvest_source_with_owner_org_and_job_if_not_existing()
# create a HarvestGatherError
job_model = HarvestJob.get(job['id'])
msg = 'System error - No harvester could be found for source type %s' % job_model.source.type
err = HarvestGatherError(message=msg, job=job_model)
err.save()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(1, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.called
assert_equal(2, mock_mailer_mail_recipient.call_count)
# Skip for now as the Harvest DB log doesn't work on CKAN 2.9
class XXTestHarvestDBLog(unittest.TestCase):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
def xxtest_harvest_db_logger(self):
# Create source and check if harvest_log table is populated
data_dict = SOURCE_DICT.copy()
data_dict['source_type'] = 'test'
source = factories.HarvestSourceObj(**data_dict)
content = 'Harvest source created: %s' % source.id
log = harvest_model.Session.query(harvest_model.HarvestLog).\
filter(harvest_model.HarvestLog.content == content).first()
self.assertIsNotNone(log)
self.assertEqual(log.level, 'INFO')
context = {
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data = toolkit.get_action('harvest_log_list')(context, {})
self.assertTrue(len(data) > 0)
self.assertIn('level', data[0])
self.assertIn('content', data[0])
self.assertIn('created', data[0])
self.assertTrue(data[0]['created'] > data[1]['created'])
per_page = 1
data = toolkit.get_action('harvest_log_list')(context, {'level': 'info', 'per_page': per_page})
self.assertEqual(len(data), per_page)
self.assertEqual(data[0]['level'], 'INFO')

View File

@ -9,9 +9,7 @@ from ckan.plugins.core import SingletonPlugin, implements
import json
import ckan.logic as logic
from ckan import model
from nose.tools import assert_equal, ok_
from ckan.lib.base import config
from nose.plugins.skip import SkipTest
import uuid
@ -87,262 +85,5 @@ class MockHarvester(SingletonPlugin):
harvest_object.save()
return True
class TestHarvestQueue(object):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
def test_01_basic_harvester(self):
# make sure queues/exchanges are created first and are empty
consumer = queue.get_gather_consumer()
consumer_fetch = queue.get_fetch_consumer()
consumer.queue_purge(queue=queue.get_gather_queue_name())
consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())
user = logic.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
context = {'model': model, 'session': model.Session,
'user': user, 'api_version': 3, 'ignore_auth': True}
source_dict = {
'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test',
}
harvest_source = logic.get_action('harvest_source_create')(
context,
source_dict
)
assert harvest_source['source_type'] == 'test', harvest_source
assert harvest_source['url'] == 'basic_test', harvest_source
harvest_job = logic.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True}
)
job_id = harvest_job['id']
assert harvest_job['source_id'] == harvest_source['id'], harvest_job
assert harvest_job['status'] == u'Running'
assert logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'
# pop on item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
all_objects = model.Session.query(HarvestObject).all()
assert len(all_objects) == 3
assert all_objects[0].state == 'WAITING'
assert all_objects[1].state == 'WAITING'
assert all_objects[2].state == 'WAITING'
assert len(model.Session.query(HarvestObject).all()) == 3
assert len(model.Session.query(HarvestObjectExtra).all()) == 1
# do three times as three harvest objects
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
count = model.Session.query(model.Package) \
.filter(model.Package.type == 'dataset') \
.count()
assert count == 3
all_objects = model.Session.query(HarvestObject).filter_by(current=True).all()
assert_equal(len(all_objects), 3)
assert_equal(all_objects[0].state, 'COMPLETE')
assert_equal(all_objects[0].report_status, 'added')
assert_equal(all_objects[1].state, 'COMPLETE')
assert_equal(all_objects[1].report_status, 'added')
assert_equal(all_objects[2].state, 'COMPLETE')
assert_equal(all_objects[2].report_status, 'added')
# fire run again to check if job is set to Finished
logic.get_action('harvest_jobs_run')(
context,
{'source_id': harvest_source['id']}
)
harvest_job = logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)
assert_equal(harvest_job['status'], u'Finished')
assert_equal(harvest_job['stats'], {'added': 3, 'updated': 0, 'not modified': 0, 'errored': 0, 'deleted': 0})
harvest_source_dict = logic.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 3, 'updated': 0,
'not modified': 0, 'errored': 0, 'deleted': 0})
assert_equal(harvest_source_dict['status']['total_datasets'], 3)
assert_equal(harvest_source_dict['status']['job_count'], 1)
# Second run
harvest_job = logic.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True}
)
job_id = harvest_job['id']
assert logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'
# pop on item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
all_objects = model.Session.query(HarvestObject).all()
assert len(all_objects) == 6
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
count = model.Session.query(model.Package) \
.filter(model.Package.type == 'dataset') \
.count()
assert_equal(count, 3)
all_objects = model.Session.query(HarvestObject).filter_by(report_status='added').all()
assert_equal(len(all_objects), 3)
all_objects = model.Session.query(HarvestObject).filter_by(report_status='updated').all()
assert_equal(len(all_objects), 2)
all_objects = model.Session.query(HarvestObject).filter_by(report_status='deleted').all()
assert_equal(len(all_objects), 1)
# run to make sure job is marked as finshed
logic.get_action('harvest_jobs_run')(
context,
{'source_id': harvest_source['id']}
)
harvest_job = logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)
assert_equal(harvest_job['stats'], {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1})
harvest_source_dict = logic.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 2,
'not modified': 0, 'errored': 0, 'deleted': 1})
assert_equal(harvest_source_dict['status']['total_datasets'], 2)
assert_equal(harvest_source_dict['status']['job_count'], 2)
def test_redis_queue_purging(self):
'''
Test that Redis queue purging doesn't purge the wrong keys.
'''
if config.get('ckan.harvest.mq.type') != 'redis':
raise SkipTest()
redis = queue.get_connection()
try:
redis.set('ckanext-harvest:some-random-key', 'foobar')
# Create some fake jobs
gather_publisher = queue.get_gather_publisher()
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
fetch_publisher = queue.get_fetch_publisher()
fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
num_keys = redis.dbsize()
# Create some fake objects
gather_consumer = queue.get_gather_consumer()
next(gather_consumer.consume(queue.get_gather_queue_name()))
fetch_consumer = queue.get_fetch_consumer()
next(fetch_consumer.consume(queue.get_fetch_queue_name()))
ok_(redis.dbsize() > num_keys)
queue.purge_queues()
assert_equal(redis.get('ckanext-harvest:some-random-key'),
'foobar')
assert_equal(redis.dbsize(), num_keys)
assert_equal(redis.llen(queue.get_gather_routing_key()), 0)
assert_equal(redis.llen(queue.get_fetch_routing_key()), 0)
finally:
redis.delete('ckanext-harvest:some-random-key')
class TestHarvestCorruptRedis(object):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
@patch('ckanext.harvest.queue.log.error')
def test_redis_corrupt(self, mock_log_error):
'''
Test that corrupt Redis doesn't stop harvest process and still processes other jobs.
'''
if config.get('ckan.harvest.mq.type') != 'redis':
raise SkipTest()
redis = queue.get_connection()
try:
redis.set('ckanext-harvest:some-random-key-2', 'foobar')
# make sure queues/exchanges are created first and are empty
gather_consumer = queue.get_gather_consumer()
fetch_consumer = queue.get_fetch_consumer()
gather_consumer.queue_purge(queue=queue.get_gather_queue_name())
fetch_consumer.queue_purge(queue=queue.get_fetch_queue_name())
# Create some fake jobs and objects with no harvest_job_id
gather_publisher = queue.get_gather_publisher()
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
fetch_publisher = queue.get_fetch_publisher()
fetch_publisher.send({'harvest_object_id': None})
h_obj_id = str(uuid.uuid4())
fetch_publisher.send({'harvest_object_id': h_obj_id})
# Create some fake objects
next(gather_consumer.consume(queue.get_gather_queue_name()))
_, _, body = next(fetch_consumer.consume(queue.get_fetch_queue_name()))
json_obj = json.loads(body)
assert json_obj['harvest_object_id'] == h_obj_id
assert mock_log_error.call_count == 1
args, _ = mock_log_error.call_args_list[0]
assert "cannot concatenate 'str' and 'NoneType' objects" in args[1]
finally:
redis.delete('ckanext-harvest:some-random-key-2')
def test_a(self):
assert 1

View File

@ -3,7 +3,6 @@
'''
import json
from nose.tools import assert_equal
from ckantoolkit.tests.helpers import reset_db
from ckan import model
@ -85,96 +84,5 @@ class MockHarvester(p.SingletonPlugin):
return True
class TestEndStates(object):
def setup(self):
reset_db()
harvest_model.setup()
def test_create_dataset(self):
guid = 'obj-create'
MockHarvester._set_test_params(guid=guid)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'added')
assert_equal(result['errors'], [])
def test_update_dataset(self):
guid = 'obj-update'
MockHarvester._set_test_params(guid=guid)
# create the original harvest_object and dataset
run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
# update it
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'updated')
assert_equal(result['errors'], [])
def test_delete_dataset(self):
guid = 'obj-delete'
MockHarvester._set_test_params(guid=guid)
# create the original harvest_object and dataset
run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
MockHarvester._set_test_params(guid=guid, delete=True)
# delete it
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'deleted')
assert_equal(result['errors'], [])
def test_obj_error(self):
guid = 'obj-error'
MockHarvester._set_test_params(guid=guid, object_error=True)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'ERROR')
assert_equal(result['report_status'], 'errored')
assert_equal(result['errors'], [])
def test_fetch_unchanged(self):
guid = 'obj-error'
MockHarvester._set_test_params(guid=guid, fetch_object_unchanged=True)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'not modified')
assert_equal(result['errors'], [])
def test_import_unchanged(self):
guid = 'obj-error'
MockHarvester._set_test_params(guid=guid, import_object_unchanged=True)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'not modified')
assert_equal(result['errors'], [])
def test_a(self):
assert 1

7
conftest.py Normal file
View File

@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
pytest_plugins = [
u'ckan.tests.pytest_ckan.ckan_setup',
u'ckan.tests.pytest_ckan.fixtures',
u'ckanext.harvest.tests.fixtures',
]

View File

@ -25,3 +25,6 @@ statistics = true
[flake8]
max-line-length = 127
[tool:pytest]
norecursedirs=ckanext/harvest/tests/nose

View File

@ -32,10 +32,18 @@ setup(
# Add plugins here, eg
harvest=ckanext.harvest.plugin:Harvest
ckan_harvester=ckanext.harvest.harvesters:CKANHarvester
[ckan.test_plugins]
# Test plugins
test_harvester=ckanext.harvest.tests.test_queue:MockHarvester
test_harvester2=ckanext.harvest.tests.test_queue2:MockHarvester
test_action_harvester=ckanext.harvest.tests.test_action:MockHarvesterForActionTests
test_nose_harvester=ckanext.harvest.tests.nose.test_queue:MockHarvester
test_nose_harvester2=ckanext.harvest.tests.nose.test_queue2:MockHarvester
test_nose_action_harvester=ckanext.harvest.tests.nose.test_action:MockHarvesterForActionTests
[paste.paster_command]
harvester = ckanext.harvest.commands.harvester:Harvester
[babel.extractors]

70
test-core-nose.ini Normal file
View File

@ -0,0 +1,70 @@
[DEFAULT]
debug = false
# Uncomment and replace with the address which should receive any error reports
#email_to = you@yourdomain.com
smtp_server = localhost
error_email_from = paste@localhost
[server:main]
use = egg:Paste#http
host = 0.0.0.0
port = 5000
[app:main]
use = config:../ckan/test-core.ini
# Here we hard-code the database and a flag to make default tests
# run fast.
ckan.plugins = harvest ckan_harvester test_nose_harvester test_nose_harvester2 test_nose_action_harvester
ckan.harvest.mq.type = redis
ckan.legacy_templates = false
# NB: other test configuration should go in test-core.ini, which is
# what the postgres tests use.
# Logging configuration
[loggers]
keys = root, ckan, sqlalchemy
[handlers]
keys = console, dblog
[formatters]
keys = generic, dblog
[logger_root]
level = WARN
handlers = console
[logger_ckan]
qualname = ckan
handlers =
level = INFO
[logger_ckan_harvester]
qualname = ckanext.harvest
handlers = dblog
level = DEBUG
[logger_sqlalchemy]
handlers =
qualname = sqlalchemy.engine
level = WARN
[handler_console]
class = StreamHandler
args = (sys.stdout,)
level = NOTSET
formatter = generic
[handler_dblog]
class = ckanext.harvest.log.DBLogHandler
args = ()
level = DEBUG
formatter = dblog
[formatter_dblog]
format = %(message)s
[formatter_generic]
format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s