Merge branch 'py3-support' of github.com:ckan/ckanext-harvest into py3-support
This commit is contained in:
commit
8899f67577
|
@ -59,5 +59,6 @@ paster harvester initdb -c ckan/test-core.ini
|
|||
echo "Moving test.ini into a subdir..."
|
||||
mkdir subdir
|
||||
mv test-core.ini subdir
|
||||
mv test-core-nose.ini subdir
|
||||
|
||||
echo "travis-build.bash is done."
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
#!/bin/sh -e
|
||||
|
||||
nosetests --ckan --nologcapture --with-pylons=subdir/test-core.ini -v ckanext/harvest
|
||||
nosetests --ckan --nologcapture --with-pylons=subdir/test-core-nose.ini -v ckanext/harvest
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
{% resource 'ckanext-harvest/styles/harvest.css' %}
|
|
@ -1,94 +0,0 @@
|
|||
import factory
|
||||
import ckanext.harvest.model as harvest_model
|
||||
from ckantoolkit.tests.factories import _get_action_user_name
|
||||
from ckan.plugins import toolkit
|
||||
|
||||
|
||||
class HarvestSource(factory.Factory):
    """Factory that creates harvest sources through the CKAN action API.

    ``_create`` hands back the action's source dict when ``_return_type`` is
    ``'dict'``; otherwise it returns ``FACTORY_FOR.get()`` for that dict's id.
    """
    FACTORY_FOR = harvest_model.HarvestSource
    _return_type = 'dict'

    name = factory.Sequence(lambda seq: 'test_source_{n}'.format(n=seq))
    title = factory.Sequence(lambda seq: 'test title {n}'.format(n=seq))
    url = factory.Sequence(lambda seq: 'http://{n}.test.com'.format(n=seq))
    source_type = 'test'  # defined in test_queue.py
    # NOTE(review): ``name`` is the Sequence declaration at this point, so this
    # formats that object rather than a generated name -- confirm intended.
    id = '{0}_id'.format(name).lower()

    @classmethod
    def _create(cls, target_class, *args, **kwargs):
        """Return the source already registered for ``kwargs['url']``, or
        create a new one from ``kwargs`` when none is found."""
        assert not args, "Positional args aren't supported, use keyword args."
        action_context = {'user': _get_action_user_name(kwargs)}
        # If there is an existing source for this URL, and we can't create
        # another source with that URL, just return the original one.
        try:
            show = toolkit.get_action('harvest_source_show')
            result = show(action_context, {'url': kwargs['url']})
        except toolkit.ObjectNotFound:
            create = toolkit.get_action('harvest_source_create')
            result = create(action_context, kwargs)
        if cls._return_type == 'dict':
            return result
        return cls.FACTORY_FOR.get(result['id'])
|
||||
|
||||
|
||||
class HarvestSourceObj(HarvestSource):
    """Variant of HarvestSource whose ``_return_type`` is ``'obj'``, so
    ``_create`` returns ``FACTORY_FOR.get(...)`` instead of the dict."""
    _return_type = 'obj'
|
||||
|
||||
|
||||
class HarvestJob(factory.Factory):
    """Factory that creates harvest jobs via the ``harvest_job_create`` action.

    Unless a ``source`` is supplied, one is built with ``HarvestSourceObj``.
    """
    FACTORY_FOR = harvest_model.HarvestJob
    _return_type = 'dict'

    source = factory.SubFactory(HarvestSourceObj)

    @classmethod
    def _create(cls, target_class, *args, **kwargs):
        """Create the job, defaulting ``source_id`` to the id of
        ``kwargs['source']`` and ``run`` to ``False``."""
        assert not args, "Positional args aren't supported, use keyword args."
        action_context = {'user': _get_action_user_name(kwargs)}
        if 'source_id' not in kwargs:
            kwargs['source_id'] = kwargs['source'].id
        kwargs.setdefault('run', False)
        create = toolkit.get_action('harvest_job_create')
        result = create(action_context, kwargs)
        if cls._return_type == 'dict':
            return result
        return cls.FACTORY_FOR.get(result['id'])
|
||||
|
||||
|
||||
class HarvestJobObj(HarvestJob):
    """Variant of HarvestJob whose ``_return_type`` is ``'obj'``, so
    ``_create`` returns ``FACTORY_FOR.get(...)`` instead of the dict."""
    _return_type = 'obj'
|
||||
|
||||
|
||||
class HarvestObject(factory.Factory):
    """Factory that creates harvest objects via ``harvest_object_create``.

    Unless a ``job`` is supplied, one is built with ``HarvestJobObj``.
    """
    FACTORY_FOR = harvest_model.HarvestObject
    _return_type = 'dict'

    # source = factory.SubFactory(HarvestSourceObj)
    job = factory.SubFactory(HarvestJobObj)

    @classmethod
    def _create(cls, target_class, *args, **kwargs):
        """Create the object; when ``job_id`` is absent, both ``job_id`` and
        ``source_id`` are taken from ``kwargs['job']``."""
        assert not args, "Positional args aren't supported, use keyword args."
        action_context = {'user': _get_action_user_name(kwargs)}
        if 'job_id' not in kwargs:
            parent_job = kwargs['job']
            kwargs['job_id'] = parent_job.id
            kwargs['source_id'] = parent_job.source.id
        # Remove 'job' to avoid it getting added as a HarvestObjectExtra
        kwargs.pop('job', None)
        create = toolkit.get_action('harvest_object_create')
        result = create(action_context, kwargs)
        if cls._return_type == 'dict':
            return result
        return cls.FACTORY_FOR.get(result['id'])
|
||||
|
||||
|
||||
class HarvestObjectObj(HarvestObject):
    """Variant of HarvestObject whose ``_return_type`` is ``'obj'``, so
    ``_create`` returns ``FACTORY_FOR.get(...)`` instead of the dict."""
    _return_type = 'obj'
|
|
@ -12,7 +12,7 @@ from ckan import model
|
|||
from ckan.plugins import toolkit
|
||||
|
||||
from ckanext.harvest.harvesters.ckanharvester import ContentFetchError
|
||||
from ckanext.harvest.tests.factories import (HarvestSourceObj, HarvestJobObj,
|
||||
from ckanext.harvest.tests.nose.factories import (HarvestSourceObj, HarvestJobObj,
|
||||
HarvestObjectObj)
|
||||
from ckanext.harvest.tests.lib import run_harvest
|
||||
import ckanext.harvest.model as harvest_model
|
||||
|
@ -113,7 +113,7 @@ class TestCkanHarvester(object):
|
|||
# change the modified date
|
||||
datasets = copy.deepcopy(mock_ckan.DATASETS)
|
||||
datasets[1]['metadata_modified'] = '2050-05-09T22:00:01.486366'
|
||||
with patch('ckanext.harvest.tests.harvesters.mock_ckan.DATASETS',
|
||||
with patch('ckanext.harvest.tests.nose.harvesters.mock_ckan.DATASETS',
|
||||
datasets):
|
||||
results_by_guid = run_harvest(
|
||||
url='http://localhost:%s/' % mock_ckan.PORT,
|
||||
|
|
|
@ -1,711 +0,0 @@
|
|||
from __future__ import absolute_import
|
||||
import json
|
||||
from . import factories
|
||||
import unittest
|
||||
from mock import patch
|
||||
from nose.tools import assert_equal, assert_raises, assert_in
|
||||
from nose.plugins.skip import SkipTest
|
||||
|
||||
from ckantoolkit.tests import factories as ckan_factories
|
||||
from ckantoolkit.tests.helpers import _get_test_app, reset_db, FunctionalTestBase
|
||||
|
||||
from ckan import plugins as p
|
||||
from ckan.plugins import toolkit
|
||||
from ckan import model
|
||||
|
||||
from ckanext.harvest.interfaces import IHarvester
|
||||
import ckanext.harvest.model as harvest_model
|
||||
from ckanext.harvest.model import HarvestGatherError, HarvestObjectError, HarvestObject, HarvestJob
|
||||
from ckanext.harvest.logic import HarvestJobExists
|
||||
from ckanext.harvest.logic.action.update import send_error_mail
|
||||
|
||||
|
||||
class MockHarvesterForActionTests(p.SingletonPlugin):
    """Minimal IHarvester implementation used by the action tests.

    Gathering yields nothing and the fetch/import stages always succeed; only
    ``validate_config`` carries logic.
    """
    p.implements(IHarvester)

    def info(self):
        """Describe this harvester (name, title, description)."""
        return dict(name='test-for-action',
                    title='Test for action',
                    description='test')

    def validate_config(self, config):
        """Return ``config`` unchanged if it is acceptable.

        A falsy config is accepted as-is. Otherwise it must parse as JSON,
        and a ``custom_option`` key, when present, must hold a list.

        Raises ValueError when the JSON is invalid or ``custom_option`` is
        not a list.
        """
        if not config:
            return config

        parsed = json.loads(config)
        if ('custom_option' in parsed
                and not isinstance(parsed['custom_option'], list)):
            raise ValueError('custom_option must be a list')

        return config

    def gather_stage(self, harvest_job):
        """Gather nothing: always an empty list."""
        return []

    def fetch_stage(self, harvest_object):
        """Report success without fetching anything."""
        return True

    def import_stage(self, harvest_object):
        """Report success without importing anything."""
        return True
|
||||
|
||||
|
||||
# Valid harvest source payload for the 'test-for-action' harvester type.
# Tests copy it before modifying it.
SOURCE_DICT = dict(
    url="http://test.action.com",
    name="test-source-action",
    title="Test source action",
    notes="Test source action desc",
    source_type="test-for-action",
    frequency="MANUAL",
    config=json.dumps({"custom_option": ["a", "b"]}),
)
|
||||
|
||||
|
||||
class ActionBase(object):
    """Base class for action tests.

    Loads the 'test_action_harvester' plugin for the lifetime of the class
    and resets the database before every test.
    """

    @classmethod
    def setup_class(cls):
        """Load the test harvester plugin unless it is already loaded."""
        if p.plugin_loaded('test_action_harvester'):
            return
        p.load('test_action_harvester')

    def setup(self):
        """Reset the database and set up the harvest model again."""
        reset_db()
        harvest_model.setup()

    @classmethod
    def teardown_class(cls):
        """Unload the test harvester plugin."""
        p.unload('test_action_harvester')
|
||||
|
||||
|
||||
class HarvestSourceActionBase(FunctionalTestBase):
    """Shared validation tests for the harvest source actions.

    Subclasses set ``self.action`` to the action under test; each test here
    posts an invalid source and expects a 409 response.

    NOTE(review): ``call_action_api`` is neither defined nor imported in the
    visible part of this module -- confirm where it is meant to come from.
    """

    @classmethod
    def setup_class(cls):
        """Set up the harvest model and make sure the test plugin is loaded."""
        super(HarvestSourceActionBase, cls).setup_class()
        harvest_model.setup()

        if p.plugin_loaded('test_action_harvester'):
            return
        p.load('test_action_harvester')

    @classmethod
    def teardown_class(cls):
        """Tear down the base fixture, then unload the test plugin."""
        super(HarvestSourceActionBase, cls).teardown_class()

        p.unload('test_action_harvester')

    def _get_source_dict(self):
        """Return a fresh, valid source dict for the test harvester."""
        return dict(
            url="http://test.action.com",
            name="test-source-action",
            title="Test source action",
            notes="Test source action desc",
            source_type="test-for-action",
            frequency="MANUAL",
            config=json.dumps({"custom_option": ["a", "b"]}),
        )

    def test_invalid_missing_values(self):
        # Send at most the id, so every required field is missing.
        template = self._get_source_dict()
        source_dict = {'id': template['id']} if 'id' in template else {}

        sysadmin = ckan_factories.Sysadmin()
        result = call_action_api(self.action,
                                 apikey=sysadmin['apikey'], status=409,
                                 **source_dict)

        for required in ('name', 'title', 'url', 'source_type'):
            assert_equal(result[required], [u'Missing value'])

    def test_invalid_unknown_type(self):
        source_dict = self._get_source_dict()
        source_dict['source_type'] = 'unknown'

        sysadmin = ckan_factories.Sysadmin()
        result = call_action_api(self.action,
                                 apikey=sysadmin['apikey'], status=409,
                                 **source_dict)

        assert 'source_type' in result
        assert u'Unknown harvester type' in result['source_type'][0]

    def test_invalid_unknown_frequency(self):
        wrong_frequency = 'ANNUALLY'
        source_dict = self._get_source_dict()
        source_dict['frequency'] = wrong_frequency

        sysadmin = ckan_factories.Sysadmin()
        result = call_action_api(self.action,
                                 apikey=sysadmin['apikey'], status=409,
                                 **source_dict)

        expected = u'Frequency {0} not recognised'.format(wrong_frequency)
        assert 'frequency' in result
        assert expected in result['frequency'][0]

    def test_invalid_wrong_configuration(self):
        source_dict = self._get_source_dict()
        sysadmin = ckan_factories.Sysadmin()

        # First case: the config is not JSON at all.
        source_dict['config'] = 'not_json'

        result = call_action_api(self.action,
                                 apikey=sysadmin['apikey'], status=409,
                                 **source_dict)

        assert 'config' in result
        assert u'Error parsing the configuration options: No JSON object could be decoded' in result['config'][0]

        # Second case: valid JSON, rejected by the harvester's validate_config.
        source_dict['config'] = json.dumps({'custom_option': 'not_a_list'})

        result = call_action_api(self.action,
                                 apikey=sysadmin['apikey'], status=409,
                                 **source_dict)

        assert 'config' in result
        assert u'Error parsing the configuration options: custom_option must be a list' in result['config'][0]
|
||||
|
||||
|
||||
class TestHarvestSourceActionCreate(HarvestSourceActionBase):
    """Runs the shared validation tests against 'harvest_source_create' and
    adds a happy-path creation test."""

    def __init__(self):
        self.action = 'harvest_source_create'

    def test_create(self):
        source_dict = self._get_source_dict()

        sysadmin = ckan_factories.Sysadmin()
        result = call_action_api('harvest_source_create',
                                 apikey=sysadmin['apikey'], **source_dict)

        # Every submitted field comes back unchanged.
        for field, submitted in source_dict.items():
            assert_equal(submitted, result[field])

        # Check that source was actually created
        stored = harvest_model.HarvestSource.get(result['id'])
        assert_equal(stored.url, source_dict['url'])
        assert_equal(stored.type, source_dict['source_type'])

        # Trying to create a source with the same URL fails
        duplicate = self._get_source_dict()
        duplicate['name'] = 'test-source-action-new'

        result = call_action_api('harvest_source_create',
                                 apikey=sysadmin['apikey'], status=409,
                                 **duplicate)

        assert 'url' in result
        assert u'There already is a Harvest Source for this URL' in result['url'][0]
|
||||
|
||||
|
||||
class HarvestSourceFixtureMixin(object):
    """Mixin that makes ``_get_source_dict`` return an already-created source."""

    def _get_source_dict(self):
        '''Build the base source dict, create the HarvestSource from it and
        return the created source's dict - suitable for testing update
        actions.
        '''
        template = HarvestSourceActionBase._get_source_dict(self)
        created = factories.HarvestSource(**template)
        # delete status because it gets in the way of the status supplied to
        # call_action_api later on. It is only a generated value, not affecting
        # the update/patch anyway.
        created.pop('status')
        return created
|
||||
|
||||
|
||||
class TestHarvestSourceActionUpdate(HarvestSourceFixtureMixin,
                                    HarvestSourceActionBase):
    """Runs the shared validation tests against 'harvest_source_update' and
    adds a full-update test on an existing source."""

    def __init__(self):
        self.action = 'harvest_source_update'

    def test_update(self):
        changes = {
            "url": "http://test.action.updated.com",
            "name": "test-source-action-updated",
            "title": "Test source action updated",
            "notes": "Test source action desc updated",
            "source_type": "test",
            "frequency": "MONTHLY",
            "config": json.dumps({"custom_option": ["c", "d"]})
        }
        source_dict = self._get_source_dict()
        source_dict.update(changes)

        sysadmin = ckan_factories.Sysadmin()
        result = call_action_api('harvest_source_update',
                                 apikey=sysadmin['apikey'], **source_dict)

        checked = set(('url', 'name', 'title', 'notes', 'source_type',
                       'frequency', 'config'))
        for key in checked:
            assert_equal(source_dict[key], result[key], "Key: %s" % key)

        # Check that source was actually updated
        stored = harvest_model.HarvestSource.get(result['id'])
        assert_equal(stored.url, source_dict['url'])
        assert_equal(stored.type, source_dict['source_type'])
|
||||
|
||||
|
||||
class TestHarvestSourceActionPatch(HarvestSourceFixtureMixin,
                                   HarvestSourceActionBase):
    """Runs the shared validation tests against 'harvest_source_patch' and
    adds a partial-update test on an existing source."""

    def __init__(self):
        self.action = 'harvest_source_patch'
        # harvest_source_patch only came in with ckan 2.3
        if toolkit.check_ckan_version(max_version='2.2.99'):
            raise SkipTest()

    def test_invalid_missing_values(self):
        # Overrides the inherited test with a no-op for the patch action.
        pass

    def test_patch(self):
        source_dict = self._get_source_dict()

        patch_dict = dict(
            id=source_dict['id'],
            name="test-source-action-patched",
            url="http://test.action.patched.com",
            config=json.dumps({"custom_option": ["pat", "ched"]}),
        )

        sysadmin = ckan_factories.Sysadmin()
        result = call_action_api('harvest_source_patch',
                                 apikey=sysadmin['apikey'], **patch_dict)

        # The expected result is the original source overlaid with the patch.
        source_dict.update(patch_dict)
        checked = set(('url', 'name', 'title', 'notes', 'source_type',
                       'frequency', 'config'))
        for key in checked:
            assert_equal(source_dict[key], result[key], "Key: %s" % key)

        # Check that source was actually updated
        stored = harvest_model.HarvestSource.get(result['id'])
        assert_equal(stored.url, source_dict['url'])
        assert_equal(stored.type, source_dict['source_type'])
|
||||
|
||||
|
||||
class TestActions(ActionBase):
    """Tests for the harvest source clearing actions and for creating
    harvest sources and jobs directly through ``toolkit.get_action``."""

    def test_harvest_source_clear(self):
        """Clearing a source keeps the source but removes its job, its
        harvest object and the harvested dataset."""
        source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
        job = factories.HarvestJobObj(source=source)
        dataset = ckan_factories.Dataset()
        object_ = factories.HarvestObjectObj(job=job, source=source,
                                             package_id=dataset['id'])

        context = {'model': model, 'session': model.Session,
                   'ignore_auth': True, 'user': ''}
        result = toolkit.get_action('harvest_source_clear')(
            context, {'id': source.id})

        assert_equal(result, {'id': source.id})
        source = harvest_model.HarvestSource.get(source.id)
        assert source
        assert_equal(harvest_model.HarvestJob.get(job.id), None)
        assert_equal(harvest_model.HarvestObject.get(object_.id), None)
        assert_equal(model.Package.get(dataset['id']), None)

    def test_harvest_source_job_history_clear(self):
        """Clearing the job history of one source removes its job and
        harvest object but keeps the source and the dataset."""
        # prepare
        source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
        job = factories.HarvestJobObj(source=source)
        dataset = ckan_factories.Dataset()
        object_ = factories.HarvestObjectObj(job=job, source=source,
                                             package_id=dataset['id'])

        # execute
        context = {'model': model, 'session': model.Session,
                   'ignore_auth': True, 'user': ''}
        result = toolkit.get_action('harvest_source_job_history_clear')(
            context, {'id': source.id})

        # verify
        assert_equal(result, {'id': source.id})
        source = harvest_model.HarvestSource.get(source.id)
        assert source
        assert_equal(harvest_model.HarvestJob.get(job.id), None)
        assert_equal(harvest_model.HarvestObject.get(object_.id), None)
        dataset_from_db = model.Package.get(dataset['id'])
        assert dataset_from_db, 'is None'
        assert_equal(dataset_from_db.id, dataset['id'])

    def test_harvest_sources_job_history_clear(self):
        """Clearing the job history of all sources removes every job and
        harvest object but keeps both sources and both datasets."""
        # prepare
        data_dict = SOURCE_DICT.copy()
        source_1 = factories.HarvestSourceObj(**data_dict)
        data_dict['name'] = 'another-source'
        data_dict['url'] = 'http://another-url'
        source_2 = factories.HarvestSourceObj(**data_dict)

        job_1 = factories.HarvestJobObj(source=source_1)
        dataset_1 = ckan_factories.Dataset()
        object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1,
                                               package_id=dataset_1['id'])
        job_2 = factories.HarvestJobObj(source=source_2)
        dataset_2 = ckan_factories.Dataset()
        object_2_ = factories.HarvestObjectObj(job=job_2, source=source_2,
                                               package_id=dataset_2['id'])

        # execute
        context = {'model': model, 'session': model.Session,
                   'ignore_auth': True, 'user': ''}
        result = toolkit.get_action('harvest_sources_job_history_clear')(
            context, {})

        # verify
        # Dicts cannot be ordered against each other on Python 3, so sort
        # both lists explicitly by their 'id' value.
        def by_id(entry):
            return entry['id']

        assert_equal(
            sorted(result, key=by_id),
            sorted([{'id': source_1.id}, {'id': source_2.id}], key=by_id))
        source_1 = harvest_model.HarvestSource.get(source_1.id)
        assert source_1
        assert_equal(harvest_model.HarvestJob.get(job_1.id), None)
        assert_equal(harvest_model.HarvestObject.get(object_1_.id), None)
        dataset_from_db_1 = model.Package.get(dataset_1['id'])
        assert dataset_from_db_1, 'is None'
        assert_equal(dataset_from_db_1.id, dataset_1['id'])
        # Re-fetch the second source by its own id (the lookup previously
        # reused source_1.id, so source_2 was never actually checked).
        source_2 = harvest_model.HarvestSource.get(source_2.id)
        assert source_2
        assert_equal(harvest_model.HarvestJob.get(job_2.id), None)
        assert_equal(harvest_model.HarvestObject.get(object_2_.id), None)
        dataset_from_db_2 = model.Package.get(dataset_2['id'])
        assert dataset_from_db_2, 'is None'
        assert_equal(dataset_from_db_2.id, dataset_2['id'])

    def test_harvest_source_create_twice_with_unique_url(self):
        """A second source with a different name and URL can be created."""
        data_dict = SOURCE_DICT.copy()
        factories.HarvestSourceObj(**data_dict)
        site_user = toolkit.get_action('get_site_user')(
            {'model': model, 'ignore_auth': True}, {})['name']
        data_dict['name'] = 'another-source'
        data_dict['url'] = 'http://another-url'
        toolkit.get_action('harvest_source_create')(
            {'user': site_user}, data_dict)

    def test_harvest_source_create_twice_with_same_url(self):
        """A second source with the same URL and config is rejected with a
        ValidationError."""
        data_dict = SOURCE_DICT.copy()
        factories.HarvestSourceObj(**data_dict)

        site_user = toolkit.get_action('get_site_user')(
            {'model': model, 'ignore_auth': True}, {})['name']
        data_dict['name'] = 'another-source'
        assert_raises(toolkit.ValidationError,
                      toolkit.get_action('harvest_source_create'),
                      {'user': site_user}, data_dict)

    def test_harvest_source_create_twice_with_unique_url_and_config(self):
        """A second source with the same URL but a different config can be
        created."""
        data_dict = SOURCE_DICT.copy()
        factories.HarvestSourceObj(**data_dict)

        site_user = toolkit.get_action('get_site_user')(
            {'model': model, 'ignore_auth': True}, {})['name']
        data_dict['name'] = 'another-source'
        data_dict['config'] = '{"something": "new"}'
        toolkit.get_action('harvest_source_create')(
            {'user': site_user}, data_dict)

    def test_harvest_job_create_as_sysadmin(self):
        """The site user can create a job with ``run=True``; it comes back
        in 'Running' status with no gather start time yet."""
        source = factories.HarvestSource(**SOURCE_DICT.copy())

        site_user = toolkit.get_action('get_site_user')(
            {'model': model, 'ignore_auth': True}, {})['name']
        data_dict = {
            'source_id': source['id'],
            'run': True
        }
        job = toolkit.get_action('harvest_job_create')(
            {'user': site_user}, data_dict)

        assert_equal(job['source_id'], source['id'])
        assert_equal(job['status'], 'Running')
        assert_equal(job['gather_started'], None)
        assert_in('stats', job.keys())

    def test_harvest_job_create_as_admin(self):
        """An organization admin can create a job for a source published by
        that organization."""
        # as if an admin user presses 'refresh'
        user = ckan_factories.User()
        user['capacity'] = 'admin'
        org = ckan_factories.Organization(users=[user])
        # dict(mapping, **extra) works on Python 2 and 3 alike, whereas
        # concatenating ``.items()`` with a list only works on Python 2.
        source_dict = dict(SOURCE_DICT, publisher_id=org['id'])
        source = factories.HarvestSource(**source_dict)

        data_dict = {
            'source_id': source['id'],
            'run': True
        }
        job = toolkit.get_action('harvest_job_create')(
            {'user': user['name']}, data_dict)

        assert_equal(job['source_id'], source['id'])
        assert_equal(job['status'], 'Running')
        assert_equal(job['gather_started'], None)
        assert_in('stats', job.keys())
|
||||
|
||||
|
||||
class TestHarvestObject(unittest.TestCase):
    """Tests for the 'harvest_object_create' action."""

    @classmethod
    def setup_class(cls):
        """Reset the database and set up the harvest model once per class."""
        reset_db()
        harvest_model.setup()

    def test_create(self):
        job = factories.HarvestJobObj()

        context = dict(model=model, session=model.Session, ignore_auth=True)
        data_dict = dict(
            guid='guid',
            content='content',
            job_id=job.id,
            extras={'a key': 'a value'},
        )
        create = toolkit.get_action('harvest_object_create')
        harvest_object = create(context, data_dict)

        # fetch the object from database to check it was created
        created_object = harvest_model.HarvestObject.get(harvest_object['id'])
        assert created_object.guid == harvest_object['guid']
        assert harvest_object['guid'] == data_dict['guid']

    def test_create_bad_parameters(self):
        source_a = factories.HarvestSourceObj()
        job = factories.HarvestJobObj()

        context = dict(model=model, session=model.Session, ignore_auth=True)
        data_dict = dict(job_id=job.id, source_id=source_a.id, extras=1)
        harvest_object_create = toolkit.get_action('harvest_object_create')

        # First with ``extras`` as a bare int ...
        self.assertRaises(toolkit.ValidationError, harvest_object_create,
                          context, data_dict)

        # ... then with ``extras`` as a dict holding an int value.
        data_dict['extras'] = {'test': 1}

        self.assertRaises(toolkit.ValidationError, harvest_object_create,
                          context, data_dict)
|
||||
|
||||
|
||||
class TestHarvestErrorMail(FunctionalTestBase):
    """Tests for ``send_error_mail`` with ``ckan.lib.mailer.mail_recipient``
    patched out."""

    @classmethod
    def setup_class(cls):
        """Start the class from a freshly reset database."""
        super(TestHarvestErrorMail, cls).setup_class()
        reset_db()
        harvest_model.setup()

    @classmethod
    def teardown_class(cls):
        """Tear down the base fixture and reset the database again."""
        super(TestHarvestErrorMail, cls).teardown_class()
        reset_db()

    def _create_harvest_source_and_job_if_not_existing(self):
        """Create (or look up) the 'test-source' source and a job for it, run
        the jobs and reindex the source.

        Returns a ``(context, harvest_source, job)`` tuple.
        """
        site_user = toolkit.get_action('get_site_user')(
            {'model': model, 'ignore_auth': True}, {})['name']

        context = dict(user=site_user, model=model, session=model.Session,
                       ignore_auth=True)
        source_dict = dict(title='Test Source', name='test-source',
                           url='basic_test', source_type='test')

        # Fall back to the existing source when creation fails validation.
        try:
            harvest_source = toolkit.get_action('harvest_source_create')(
                context, source_dict)
        except toolkit.ValidationError:
            harvest_source = toolkit.get_action('harvest_source_show')(
                context, {'id': source_dict['name']})

        # Fall back to the source's last job when one already exists.
        try:
            job = toolkit.get_action('harvest_job_create')(
                context, {'source_id': harvest_source['id'], 'run': True})
        except HarvestJobExists:
            last_job_id = harvest_source['status']['last_job']['id']
            job = toolkit.get_action('harvest_job_show')(
                context, {'id': last_job_id})

        toolkit.get_action('harvest_jobs_run')(context, {})
        toolkit.get_action('harvest_source_reindex')(
            context, {'id': harvest_source['id']})
        return context, harvest_source, job

    def _create_harvest_source_with_owner_org_and_job_if_not_existing(self):
        """Same as the helper above, but the source is owned by a new
        organization that has an admin and a member; a second organization
        with its own admin is created as well.

        Returns a ``(context, harvest_source, job)`` tuple.
        """
        site_user = toolkit.get_action('get_site_user')(
            {'model': model, 'ignore_auth': True}, {})['name']

        context = dict(user=site_user, model=model, session=model.Session,
                       ignore_auth=True)

        test_org = ckan_factories.Organization()
        test_other_org = ckan_factories.Organization()
        org_admin_user = ckan_factories.User()
        org_member_user = ckan_factories.User()
        other_org_admin_user = ckan_factories.User()

        # (organization, user, role) triples, added in this order.
        memberships = (
            (test_org, org_admin_user, 'admin'),
            (test_org, org_member_user, 'member'),
            (test_other_org, other_org_admin_user, 'admin'),
        )
        for organization, member, role in memberships:
            toolkit.get_action('organization_member_create')(
                context.copy(),
                {
                    'id': organization['id'],
                    'username': member['name'],
                    'role': role
                }
            )

        source_dict = dict(title='Test Source', name='test-source',
                           url='basic_test', source_type='test',
                           owner_org=test_org['id'], run=True)

        # Fall back to the existing source when creation fails validation.
        try:
            harvest_source = toolkit.get_action('harvest_source_create')(
                context.copy(), source_dict)
        except toolkit.ValidationError:
            harvest_source = toolkit.get_action('harvest_source_show')(
                context.copy(), {'id': source_dict['name']})

        # Fall back to the source's last job when one already exists.
        try:
            job = toolkit.get_action('harvest_job_create')(
                context.copy(),
                {'source_id': harvest_source['id'], 'run': True})
        except HarvestJobExists:
            job = toolkit.get_action('harvest_job_show')(
                context.copy(),
                {'id': harvest_source['status']['last_job']['id']})

        toolkit.get_action('harvest_jobs_run')(context.copy(), {})
        toolkit.get_action('harvest_source_reindex')(
            context.copy(), {'id': harvest_source['id']})
        return context, harvest_source, job

    @patch('ckan.lib.mailer.mail_recipient')
    def test_error_mail_not_sent(self, mock_mailer_mail_recipient):
        context, harvest_source, job = self._create_harvest_source_and_job_if_not_existing()

        status = toolkit.get_action('harvest_source_show_status')(
            context, {'id': harvest_source['id']})

        send_error_mail(context, harvest_source['id'], status)
        assert_equal(0, status['last_job']['stats']['errored'])
        # NOTE(review): ``not_called`` is not part of the Mock API; the
        # attribute is auto-created and truthy, so this assertion cannot
        # fail. Confirm whether send_error_mail is really expected to send
        # nothing here before replacing it with a check on ``.called``.
        assert mock_mailer_mail_recipient.not_called

    @patch('ckan.lib.mailer.mail_recipient')
    def test_error_mail_sent(self, mock_mailer_mail_recipient):
        context, harvest_source, job = self._create_harvest_source_and_job_if_not_existing()

        # create a HarvestGatherError
        job_model = HarvestJob.get(job['id'])
        msg = 'System error - No harvester could be found for source type %s' % job_model.source.type
        gather_error = HarvestGatherError(message=msg, job=job_model)
        gather_error.save()

        status = toolkit.get_action('harvest_source_show_status')(
            context, {'id': harvest_source['id']})

        send_error_mail(context, harvest_source['id'], status)

        assert_equal(1, status['last_job']['stats']['errored'])
        assert mock_mailer_mail_recipient.called

    @patch('ckan.lib.mailer.mail_recipient')
    def test_error_mail_sent_with_object_error(self, mock_mailer_mail_recipient):

        context, harvest_source, harvest_job = self._create_harvest_source_and_job_if_not_existing()

        data_dict = dict(
            guid='guid',
            content='content',
            job_id=harvest_job['id'],
            extras={'a key': 'a value'},
            source_id=harvest_source['id'],
        )
        harvest_object = toolkit.get_action('harvest_object_create')(
            context, data_dict)

        harvest_object_model = HarvestObject.get(harvest_object['id'])

        # create a HarvestObjectError
        msg = 'HarvestObjectError occured: %s' % harvest_job['id']
        harvest_object_error = HarvestObjectError(message=msg, object=harvest_object_model)
        harvest_object_error.save()

        status = toolkit.get_action('harvest_source_show_status')(
            context, {'id': harvest_source['id']})

        send_error_mail(context, harvest_source['id'], status)

        assert_equal(1, status['last_job']['stats']['errored'])
        assert mock_mailer_mail_recipient.called

    @patch('ckan.lib.mailer.mail_recipient')
    def test_error_mail_sent_with_org(self, mock_mailer_mail_recipient):
        context, harvest_source, job = self._create_harvest_source_with_owner_org_and_job_if_not_existing()

        # create a HarvestGatherError
        job_model = HarvestJob.get(job['id'])
        msg = 'System error - No harvester could be found for source type %s' % job_model.source.type
        gather_error = HarvestGatherError(message=msg, job=job_model)
        gather_error.save()

        status = toolkit.get_action('harvest_source_show_status')(
            context, {'id': harvest_source['id']})

        send_error_mail(context, harvest_source['id'], status)

        assert_equal(1, status['last_job']['stats']['errored'])
        assert mock_mailer_mail_recipient.called
        assert_equal(2, mock_mailer_mail_recipient.call_count)
|
|
@ -1,7 +1,7 @@
|
|||
from ckan.lib.helpers import url_for
|
||||
|
||||
from ckantoolkit.tests import helpers, factories
|
||||
from ckanext.harvest.tests import factories as harvest_factories
|
||||
from ckanext.harvest.tests.nose import factories as harvest_factories
|
||||
from nose.tools import assert_in
|
||||
import ckanext.harvest.model as harvest_model
|
||||
|
||||
|
|
|
@ -1,348 +0,0 @@
|
|||
from mock import patch
|
||||
|
||||
from ckantoolkit.tests.helpers import reset_db
|
||||
import ckanext.harvest.model as harvest_model
|
||||
from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
|
||||
from ckanext.harvest.interfaces import IHarvester
|
||||
import ckanext.harvest.queue as queue
|
||||
from ckan.plugins.core import SingletonPlugin, implements
|
||||
import json
|
||||
import ckan.logic as logic
|
||||
from ckan import model
|
||||
from nose.tools import assert_equal, ok_
|
||||
from ckan.lib.base import config
|
||||
from nose.plugins.skip import SkipTest
|
||||
import uuid
|
||||
|
||||
|
||||
class MockHarvester(SingletonPlugin):
    """IHarvester implementation for the 'test' source type.

    Sources whose URL starts with 'basic_test' yield three harvest objects;
    each is fetched as a minimal package dict and imported through the
    package_create / package_update actions.
    """
    implements(IHarvester)

    def info(self):
        """Describe this harvester (name, title, description)."""
        return dict(name='test', title='test', description='test')

    def gather_stage(self, harvest_job):
        """Return the ids of three new harvest objects for 'basic_test'
        sources, and an empty list for any other source."""
        if not harvest_job.source.url.startswith('basic_test'):
            return []

        first = HarvestObject(guid='test1', job=harvest_job)
        first.extras.append(HarvestObjectExtra(key='key', value='value'))
        second = HarvestObject(guid='test2', job=harvest_job)
        third = HarvestObject(guid='test_to_delete', job=harvest_job)
        first.add()
        second.add()
        third.save()  # this will commit both
        return [first.id, second.id, third.id]

    def fetch_stage(self, harvest_object):
        """Store ``{'name': <guid>}`` as the object's JSON content."""
        assert_equal(harvest_object.state, "FETCH")
        assert harvest_object.fetch_started is not None
        harvest_object.content = json.dumps({'name': harvest_object.guid})
        harvest_object.save()
        return True

    def import_stage(self, harvest_object):
        """Create or update the package described by the object's content
        and mark this harvest object as the current one for its guid."""
        assert_equal(harvest_object.state, "IMPORT")
        assert harvest_object.fetch_finished is not None
        assert harvest_object.import_started is not None

        site_user = logic.get_action('get_site_user')(
            {'model': model, 'ignore_auth': True}, {})
        user = site_user['name']

        package = json.loads(harvest_object.content)
        name = package['name']

        # Update when a package of that name already exists, else create.
        package_object = model.Package.get(name)
        logic_function = (
            'package_update' if package_object else 'package_create')

        action_context = {'model': model, 'session': model.Session,
                          'user': user, 'api_version': 3,
                          'ignore_auth': True}
        package_dict = logic.get_action(logic_function)(
            action_context, json.loads(harvest_object.content))

        # set previous objects to not current
        current_query = model.Session.query(HarvestObject)
        current_query = current_query.filter(
            HarvestObject.guid == harvest_object.guid)
        current_query = current_query.filter(
            HarvestObject.current == True)  # noqa: E712
        previous_object = current_query.first()
        if previous_object:
            previous_object.current = False
            previous_object.save()

        # delete test_to_delete package on second run
        harvest_object.package_id = package_dict['id']
        harvest_object.current = True
        if package_object and package_dict['name'] == 'test_to_delete':
            harvest_object.current = False
            package_object.state = 'deleted'
            package_object.save()

        harvest_object.save()
        return True
|
||||
|
||||
|
||||
class TestHarvestQueue(object):
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
reset_db()
|
||||
harvest_model.setup()
|
||||
|
||||
def test_01_basic_harvester(self):
|
||||
|
||||
# make sure queues/exchanges are created first and are empty
|
||||
consumer = queue.get_gather_consumer()
|
||||
consumer_fetch = queue.get_fetch_consumer()
|
||||
consumer.queue_purge(queue=queue.get_gather_queue_name())
|
||||
consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())
|
||||
|
||||
user = logic.get_action('get_site_user')(
|
||||
{'model': model, 'ignore_auth': True}, {}
|
||||
)['name']
|
||||
|
||||
context = {'model': model, 'session': model.Session,
|
||||
'user': user, 'api_version': 3, 'ignore_auth': True}
|
||||
|
||||
source_dict = {
|
||||
'title': 'Test Source',
|
||||
'name': 'test-source',
|
||||
'url': 'basic_test',
|
||||
'source_type': 'test',
|
||||
}
|
||||
|
||||
harvest_source = logic.get_action('harvest_source_create')(
|
||||
context,
|
||||
source_dict
|
||||
)
|
||||
|
||||
assert harvest_source['source_type'] == 'test', harvest_source
|
||||
assert harvest_source['url'] == 'basic_test', harvest_source
|
||||
|
||||
harvest_job = logic.get_action('harvest_job_create')(
|
||||
context,
|
||||
{'source_id': harvest_source['id'], 'run': True}
|
||||
)
|
||||
|
||||
job_id = harvest_job['id']
|
||||
|
||||
assert harvest_job['source_id'] == harvest_source['id'], harvest_job
|
||||
|
||||
assert harvest_job['status'] == u'Running'
|
||||
|
||||
assert logic.get_action('harvest_job_show')(
|
||||
context,
|
||||
{'id': job_id}
|
||||
)['status'] == u'Running'
|
||||
|
||||
# pop on item off the queue and run the callback
|
||||
reply = consumer.basic_get(queue='ckan.harvest.gather')
|
||||
|
||||
queue.gather_callback(consumer, *reply)
|
||||
|
||||
all_objects = model.Session.query(HarvestObject).all()
|
||||
|
||||
assert len(all_objects) == 3
|
||||
assert all_objects[0].state == 'WAITING'
|
||||
assert all_objects[1].state == 'WAITING'
|
||||
assert all_objects[2].state == 'WAITING'
|
||||
|
||||
assert len(model.Session.query(HarvestObject).all()) == 3
|
||||
assert len(model.Session.query(HarvestObjectExtra).all()) == 1
|
||||
|
||||
# do three times as three harvest objects
|
||||
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer_fetch, *reply)
|
||||
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer_fetch, *reply)
|
||||
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer_fetch, *reply)
|
||||
|
||||
count = model.Session.query(model.Package) \
|
||||
.filter(model.Package.type == 'dataset') \
|
||||
.count()
|
||||
assert count == 3
|
||||
all_objects = model.Session.query(HarvestObject).filter_by(current=True).all()
|
||||
|
||||
assert_equal(len(all_objects), 3)
|
||||
assert_equal(all_objects[0].state, 'COMPLETE')
|
||||
assert_equal(all_objects[0].report_status, 'added')
|
||||
assert_equal(all_objects[1].state, 'COMPLETE')
|
||||
assert_equal(all_objects[1].report_status, 'added')
|
||||
assert_equal(all_objects[2].state, 'COMPLETE')
|
||||
assert_equal(all_objects[2].report_status, 'added')
|
||||
|
||||
# fire run again to check if job is set to Finished
|
||||
logic.get_action('harvest_jobs_run')(
|
||||
context,
|
||||
{'source_id': harvest_source['id']}
|
||||
)
|
||||
|
||||
harvest_job = logic.get_action('harvest_job_show')(
|
||||
context,
|
||||
{'id': job_id}
|
||||
)
|
||||
|
||||
assert_equal(harvest_job['status'], u'Finished')
|
||||
assert_equal(harvest_job['stats'], {'added': 3, 'updated': 0, 'not modified': 0, 'errored': 0, 'deleted': 0})
|
||||
|
||||
harvest_source_dict = logic.get_action('harvest_source_show')(
|
||||
context,
|
||||
{'id': harvest_source['id']}
|
||||
)
|
||||
|
||||
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 3, 'updated': 0,
|
||||
'not modified': 0, 'errored': 0, 'deleted': 0})
|
||||
assert_equal(harvest_source_dict['status']['total_datasets'], 3)
|
||||
assert_equal(harvest_source_dict['status']['job_count'], 1)
|
||||
|
||||
# Second run
|
||||
harvest_job = logic.get_action('harvest_job_create')(
|
||||
context,
|
||||
{'source_id': harvest_source['id'], 'run': True}
|
||||
)
|
||||
|
||||
job_id = harvest_job['id']
|
||||
assert logic.get_action('harvest_job_show')(
|
||||
context,
|
||||
{'id': job_id}
|
||||
)['status'] == u'Running'
|
||||
|
||||
# pop on item off the queue and run the callback
|
||||
reply = consumer.basic_get(queue='ckan.harvest.gather')
|
||||
queue.gather_callback(consumer, *reply)
|
||||
|
||||
all_objects = model.Session.query(HarvestObject).all()
|
||||
|
||||
assert len(all_objects) == 6
|
||||
|
||||
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer_fetch, *reply)
|
||||
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer_fetch, *reply)
|
||||
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer_fetch, *reply)
|
||||
|
||||
count = model.Session.query(model.Package) \
|
||||
.filter(model.Package.type == 'dataset') \
|
||||
.count()
|
||||
assert_equal(count, 3)
|
||||
|
||||
all_objects = model.Session.query(HarvestObject).filter_by(report_status='added').all()
|
||||
assert_equal(len(all_objects), 3)
|
||||
|
||||
all_objects = model.Session.query(HarvestObject).filter_by(report_status='updated').all()
|
||||
assert_equal(len(all_objects), 2)
|
||||
|
||||
all_objects = model.Session.query(HarvestObject).filter_by(report_status='deleted').all()
|
||||
assert_equal(len(all_objects), 1)
|
||||
|
||||
# run to make sure job is marked as finshed
|
||||
logic.get_action('harvest_jobs_run')(
|
||||
context,
|
||||
{'source_id': harvest_source['id']}
|
||||
)
|
||||
|
||||
harvest_job = logic.get_action('harvest_job_show')(
|
||||
context,
|
||||
{'id': job_id}
|
||||
)
|
||||
assert_equal(harvest_job['stats'], {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1})
|
||||
|
||||
harvest_source_dict = logic.get_action('harvest_source_show')(
|
||||
context,
|
||||
{'id': harvest_source['id']}
|
||||
)
|
||||
|
||||
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 2,
|
||||
'not modified': 0, 'errored': 0, 'deleted': 1})
|
||||
assert_equal(harvest_source_dict['status']['total_datasets'], 2)
|
||||
assert_equal(harvest_source_dict['status']['job_count'], 2)
|
||||
|
||||
def test_redis_queue_purging(self):
|
||||
'''
|
||||
Test that Redis queue purging doesn't purge the wrong keys.
|
||||
'''
|
||||
if config.get('ckan.harvest.mq.type') != 'redis':
|
||||
raise SkipTest()
|
||||
redis = queue.get_connection()
|
||||
try:
|
||||
redis.set('ckanext-harvest:some-random-key', 'foobar')
|
||||
|
||||
# Create some fake jobs
|
||||
gather_publisher = queue.get_gather_publisher()
|
||||
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
|
||||
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
|
||||
fetch_publisher = queue.get_fetch_publisher()
|
||||
fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
|
||||
fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
|
||||
num_keys = redis.dbsize()
|
||||
|
||||
# Create some fake objects
|
||||
gather_consumer = queue.get_gather_consumer()
|
||||
next(gather_consumer.consume(queue.get_gather_queue_name()))
|
||||
fetch_consumer = queue.get_fetch_consumer()
|
||||
next(fetch_consumer.consume(queue.get_fetch_queue_name()))
|
||||
|
||||
ok_(redis.dbsize() > num_keys)
|
||||
|
||||
queue.purge_queues()
|
||||
|
||||
assert_equal(redis.get('ckanext-harvest:some-random-key'),
|
||||
'foobar')
|
||||
assert_equal(redis.dbsize(), num_keys)
|
||||
assert_equal(redis.llen(queue.get_gather_routing_key()), 0)
|
||||
assert_equal(redis.llen(queue.get_fetch_routing_key()), 0)
|
||||
finally:
|
||||
redis.delete('ckanext-harvest:some-random-key')
|
||||
|
||||
|
||||
class TestHarvestCorruptRedis(object):
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
reset_db()
|
||||
harvest_model.setup()
|
||||
|
||||
@patch('ckanext.harvest.queue.log.error')
|
||||
def test_redis_corrupt(self, mock_log_error):
|
||||
'''
|
||||
Test that corrupt Redis doesn't stop harvest process and still processes other jobs.
|
||||
'''
|
||||
if config.get('ckan.harvest.mq.type') != 'redis':
|
||||
raise SkipTest()
|
||||
redis = queue.get_connection()
|
||||
try:
|
||||
redis.set('ckanext-harvest:some-random-key-2', 'foobar')
|
||||
|
||||
# make sure queues/exchanges are created first and are empty
|
||||
gather_consumer = queue.get_gather_consumer()
|
||||
fetch_consumer = queue.get_fetch_consumer()
|
||||
gather_consumer.queue_purge(queue=queue.get_gather_queue_name())
|
||||
fetch_consumer.queue_purge(queue=queue.get_fetch_queue_name())
|
||||
|
||||
# Create some fake jobs and objects with no harvest_job_id
|
||||
gather_publisher = queue.get_gather_publisher()
|
||||
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
|
||||
fetch_publisher = queue.get_fetch_publisher()
|
||||
fetch_publisher.send({'harvest_object_id': None})
|
||||
h_obj_id = str(uuid.uuid4())
|
||||
fetch_publisher.send({'harvest_object_id': h_obj_id})
|
||||
|
||||
# Create some fake objects
|
||||
next(gather_consumer.consume(queue.get_gather_queue_name()))
|
||||
_, _, body = next(fetch_consumer.consume(queue.get_fetch_queue_name()))
|
||||
|
||||
json_obj = json.loads(body)
|
||||
assert json_obj['harvest_object_id'] == h_obj_id
|
||||
|
||||
assert mock_log_error.call_count == 1
|
||||
args, _ = mock_log_error.call_args_list[0]
|
||||
assert "cannot concatenate 'str' and 'NoneType' objects" in args[1]
|
||||
|
||||
finally:
|
||||
redis.delete('ckanext-harvest:some-random-key-2')
|
|
@ -1,180 +0,0 @@
|
|||
'''Tests elements of queue.py, but doesn't use the queue subsystem
|
||||
(redis/rabbitmq)
|
||||
'''
|
||||
import json
|
||||
|
||||
from nose.tools import assert_equal
|
||||
from ckantoolkit.tests.helpers import reset_db
|
||||
|
||||
from ckan import model
|
||||
from ckan import plugins as p
|
||||
from ckan.plugins import toolkit
|
||||
|
||||
from ckanext.harvest.tests.factories import (HarvestObjectObj)
|
||||
from ckanext.harvest.interfaces import IHarvester
|
||||
import ckanext.harvest.model as harvest_model
|
||||
from ckanext.harvest.tests.lib import run_harvest
|
||||
|
||||
|
||||
class MockHarvester(p.SingletonPlugin):
|
||||
p.implements(IHarvester)
|
||||
|
||||
@classmethod
|
||||
def _set_test_params(cls, guid, **test_params):
|
||||
cls._guid = guid
|
||||
cls._test_params = test_params
|
||||
|
||||
def info(self):
|
||||
return {'name': 'test2', 'title': 'test', 'description': 'test'}
|
||||
|
||||
def gather_stage(self, harvest_job):
|
||||
obj = HarvestObjectObj(guid=self._guid, job=harvest_job)
|
||||
return [obj.id]
|
||||
|
||||
def fetch_stage(self, harvest_object):
|
||||
if self._test_params.get('fetch_object_unchanged'):
|
||||
return 'unchanged'
|
||||
harvest_object.content = json.dumps({'name': harvest_object.guid})
|
||||
harvest_object.save()
|
||||
return True
|
||||
|
||||
def import_stage(self, harvest_object):
|
||||
user = toolkit.get_action('get_site_user')(
|
||||
{'model': model, 'ignore_auth': True}, {}
|
||||
)['name']
|
||||
|
||||
package = json.loads(harvest_object.content)
|
||||
name = package['name']
|
||||
|
||||
package_object = model.Package.get(name)
|
||||
if package_object:
|
||||
logic_function = 'package_update'
|
||||
else:
|
||||
logic_function = 'package_create'
|
||||
|
||||
package_dict = toolkit.get_action(logic_function)(
|
||||
{'model': model, 'session': model.Session,
|
||||
'user': user},
|
||||
json.loads(harvest_object.content)
|
||||
)
|
||||
|
||||
if self._test_params.get('object_error'):
|
||||
return False
|
||||
|
||||
# successful, so move 'current' to this object
|
||||
previous_object = model.Session.query(harvest_model.HarvestObject) \
|
||||
.filter_by(guid=harvest_object.guid) \
|
||||
.filter_by(current=True) \
|
||||
.first()
|
||||
if previous_object:
|
||||
previous_object.current = False
|
||||
previous_object.save()
|
||||
harvest_object.package_id = package_dict['id']
|
||||
harvest_object.current = True
|
||||
|
||||
if self._test_params.get('delete'):
|
||||
# 'current=False' is the key step in getting report_status to be
|
||||
# set as 'deleted'
|
||||
harvest_object.current = False
|
||||
package_object.save()
|
||||
|
||||
harvest_object.save()
|
||||
|
||||
if self._test_params.get('import_object_unchanged'):
|
||||
return 'unchanged'
|
||||
return True
|
||||
|
||||
|
||||
class TestEndStates(object):
|
||||
def setup(self):
|
||||
reset_db()
|
||||
harvest_model.setup()
|
||||
|
||||
def test_create_dataset(self):
|
||||
guid = 'obj-create'
|
||||
MockHarvester._set_test_params(guid=guid)
|
||||
|
||||
results_by_guid = run_harvest(
|
||||
url='http://some-url.com',
|
||||
harvester=MockHarvester())
|
||||
|
||||
result = results_by_guid[guid]
|
||||
assert_equal(result['state'], 'COMPLETE')
|
||||
assert_equal(result['report_status'], 'added')
|
||||
assert_equal(result['errors'], [])
|
||||
|
||||
def test_update_dataset(self):
|
||||
guid = 'obj-update'
|
||||
MockHarvester._set_test_params(guid=guid)
|
||||
|
||||
# create the original harvest_object and dataset
|
||||
run_harvest(
|
||||
url='http://some-url.com',
|
||||
harvester=MockHarvester())
|
||||
# update it
|
||||
results_by_guid = run_harvest(
|
||||
url='http://some-url.com',
|
||||
harvester=MockHarvester())
|
||||
|
||||
result = results_by_guid[guid]
|
||||
assert_equal(result['state'], 'COMPLETE')
|
||||
assert_equal(result['report_status'], 'updated')
|
||||
assert_equal(result['errors'], [])
|
||||
|
||||
def test_delete_dataset(self):
|
||||
guid = 'obj-delete'
|
||||
MockHarvester._set_test_params(guid=guid)
|
||||
# create the original harvest_object and dataset
|
||||
run_harvest(
|
||||
url='http://some-url.com',
|
||||
harvester=MockHarvester())
|
||||
MockHarvester._set_test_params(guid=guid, delete=True)
|
||||
|
||||
# delete it
|
||||
results_by_guid = run_harvest(
|
||||
url='http://some-url.com',
|
||||
harvester=MockHarvester())
|
||||
|
||||
result = results_by_guid[guid]
|
||||
assert_equal(result['state'], 'COMPLETE')
|
||||
assert_equal(result['report_status'], 'deleted')
|
||||
assert_equal(result['errors'], [])
|
||||
|
||||
def test_obj_error(self):
|
||||
guid = 'obj-error'
|
||||
MockHarvester._set_test_params(guid=guid, object_error=True)
|
||||
|
||||
results_by_guid = run_harvest(
|
||||
url='http://some-url.com',
|
||||
harvester=MockHarvester())
|
||||
|
||||
result = results_by_guid[guid]
|
||||
assert_equal(result['state'], 'ERROR')
|
||||
assert_equal(result['report_status'], 'errored')
|
||||
assert_equal(result['errors'], [])
|
||||
|
||||
def test_fetch_unchanged(self):
|
||||
guid = 'obj-error'
|
||||
MockHarvester._set_test_params(guid=guid, fetch_object_unchanged=True)
|
||||
|
||||
results_by_guid = run_harvest(
|
||||
url='http://some-url.com',
|
||||
harvester=MockHarvester())
|
||||
|
||||
result = results_by_guid[guid]
|
||||
assert_equal(result['state'], 'COMPLETE')
|
||||
assert_equal(result['report_status'], 'not modified')
|
||||
assert_equal(result['errors'], [])
|
||||
|
||||
def test_import_unchanged(self):
|
||||
guid = 'obj-error'
|
||||
MockHarvester._set_test_params(guid=guid, import_object_unchanged=True)
|
||||
|
||||
results_by_guid = run_harvest(
|
||||
url='http://some-url.com',
|
||||
harvester=MockHarvester())
|
||||
|
||||
result = results_by_guid[guid]
|
||||
assert_equal(result['state'], 'COMPLETE')
|
||||
assert_equal(result['report_status'], 'not modified')
|
||||
assert_equal(result['errors'], [])
|
|
@ -3,7 +3,13 @@ import json
|
|||
import pytest
|
||||
|
||||
from ckan import plugins as p
|
||||
<<<<<<< HEAD
|
||||
from ckan import model
|
||||
=======
|
||||
|
||||
from ckantoolkit.tests import factories as ckan_factories, helpers
|
||||
from ckanext.harvest.tests import factories
|
||||
>>>>>>> 4ee8fa2a5df10b8ea583618e2e89076ef7f7c1b0
|
||||
|
||||
from ckantoolkit import ValidationError, get_action
|
||||
from ckantoolkit.tests import factories as ckan_factories, helpers
|
||||
|
@ -60,7 +66,11 @@ SOURCE_DICT = {
|
|||
|
||||
@pytest.mark.usefixtures('with_plugins', 'clean_db', 'harvest_setup', 'clean_queues')
|
||||
@pytest.mark.ckan_config('ckan.plugins', 'harvest test_action_harvester')
|
||||
<<<<<<< HEAD
|
||||
class HarvestSourceActionBase(object):
|
||||
=======
|
||||
class HarvestSourceActionBase():
|
||||
>>>>>>> 4ee8fa2a5df10b8ea583618e2e89076ef7f7c1b0
|
||||
|
||||
def _get_source_dict(self):
|
||||
return {
|
||||
|
@ -79,18 +89,33 @@ class HarvestSourceActionBase(object):
|
|||
if 'id' in test_data:
|
||||
source_dict['id'] = test_data['id']
|
||||
|
||||
<<<<<<< HEAD
|
||||
with pytest.raises(ValidationError) as e:
|
||||
helpers.call_action(self.action, **source_dict)
|
||||
|
||||
for key in ('name', 'title', 'url', 'source_type'):
|
||||
assert e.value.error_dict[key] == [u'Missing value']
|
||||
=======
|
||||
|
||||
result = helpers.call_action(self.action,
|
||||
**source_dict)
|
||||
|
||||
for key in ('name', 'title', 'url', 'source_type'):
|
||||
assert result[key] == [u'Missing value']
|
||||
>>>>>>> 4ee8fa2a5df10b8ea583618e2e89076ef7f7c1b0
|
||||
|
||||
def test_invalid_unknown_type(self):
|
||||
source_dict = self._get_source_dict()
|
||||
source_dict['source_type'] = 'unknown'
|
||||
|
||||
<<<<<<< HEAD
|
||||
with pytest.raises(ValidationError) as e:
|
||||
helpers.call_action(self.action, **source_dict)
|
||||
=======
|
||||
|
||||
result = helpers.call_action(self.action,
|
||||
**source_dict)
|
||||
>>>>>>> 4ee8fa2a5df10b8ea583618e2e89076ef7f7c1b0
|
||||
|
||||
assert u'Unknown harvester type' in e.value.error_dict['source_type'][0]
|
||||
|
||||
|
@ -99,8 +124,14 @@ class HarvestSourceActionBase(object):
|
|||
source_dict = self._get_source_dict()
|
||||
source_dict['frequency'] = wrong_frequency
|
||||
|
||||
<<<<<<< HEAD
|
||||
with pytest.raises(ValidationError) as e:
|
||||
helpers.call_action(self.action, **source_dict)
|
||||
=======
|
||||
|
||||
result = helpers.call_action(self.action,
|
||||
**source_dict)
|
||||
>>>>>>> 4ee8fa2a5df10b8ea583618e2e89076ef7f7c1b0
|
||||
|
||||
assert u'Frequency {0} not recognised'.format(wrong_frequency) in e.value.error_dict['frequency'][0]
|
||||
|
||||
|
@ -108,15 +139,26 @@ class HarvestSourceActionBase(object):
|
|||
source_dict = self._get_source_dict()
|
||||
source_dict['config'] = 'not_json'
|
||||
|
||||
<<<<<<< HEAD
|
||||
with pytest.raises(ValidationError) as e:
|
||||
helpers.call_action(self.action, **source_dict)
|
||||
=======
|
||||
|
||||
result = helpers.call_action(self.action,
|
||||
**source_dict)
|
||||
>>>>>>> 4ee8fa2a5df10b8ea583618e2e89076ef7f7c1b0
|
||||
|
||||
assert u'Error parsing the configuration options: No JSON object could be decoded' in e.value.error_dict['config'][0]
|
||||
|
||||
source_dict['config'] = json.dumps({'custom_option': 'not_a_list'})
|
||||
|
||||
<<<<<<< HEAD
|
||||
with pytest.raises(ValidationError) as e:
|
||||
helpers.call_action(self.action, **source_dict)
|
||||
=======
|
||||
result = helpers.call_action(self.action,
|
||||
**source_dict)
|
||||
>>>>>>> 4ee8fa2a5df10b8ea583618e2e89076ef7f7c1b0
|
||||
|
||||
assert u'Error parsing the configuration options: custom_option must be a list' in e.value.error_dict['config'][0]
|
||||
|
||||
|
@ -129,8 +171,13 @@ class TestHarvestSourceActionCreate(HarvestSourceActionBase):
|
|||
|
||||
source_dict = self._get_source_dict()
|
||||
|
||||
<<<<<<< HEAD
|
||||
result = helpers.call_action(
|
||||
'harvest_source_create', **source_dict)
|
||||
=======
|
||||
result = helpers.call_action('harvest_source_create',
|
||||
**source_dict)
|
||||
>>>>>>> 4ee8fa2a5df10b8ea583618e2e89076ef7f7c1b0
|
||||
|
||||
for key in source_dict.keys():
|
||||
assert source_dict[key] == result[key]
|
||||
|
@ -144,13 +191,19 @@ class TestHarvestSourceActionCreate(HarvestSourceActionBase):
|
|||
source_dict = self._get_source_dict()
|
||||
source_dict['name'] = 'test-source-action-new'
|
||||
|
||||
<<<<<<< HEAD
|
||||
with pytest.raises(ValidationError) as e:
|
||||
result = helpers.call_action(
|
||||
'harvest_source_create', **source_dict)
|
||||
=======
|
||||
result = helpers.call_action('harvest_source_create',
|
||||
**source_dict)
|
||||
>>>>>>> 4ee8fa2a5df10b8ea583618e2e89076ef7f7c1b0
|
||||
|
||||
assert u'There already is a Harvest Source for this URL' in e.value.error_dict['url'][0]
|
||||
|
||||
|
||||
<<<<<<< HEAD
|
||||
class HarvestSourceFixtureMixin(object):
|
||||
def _get_source_dict(self):
|
||||
'''Not only returns a source_dict, but creates the HarvestSource object
|
||||
|
@ -385,3 +438,5 @@ class TestActions():
|
|||
assert job['status'] == 'Running'
|
||||
assert job['gather_started'] is None
|
||||
assert 'stats' in job.keys()
|
||||
=======
|
||||
>>>>>>> 4ee8fa2a5df10b8ea583618e2e89076ef7f7c1b0
|
||||
|
|
|
@ -84,6 +84,7 @@ class MockHarvester(SingletonPlugin):
|
|||
harvest_object.save()
|
||||
return True
|
||||
|
||||
<<<<<<< HEAD
|
||||
|
||||
@pytest.mark.usefixtures('with_plugins', 'clean_db', 'harvest_setup', 'clean_queues')
|
||||
@pytest.mark.ckan_config('ckan.plugins', 'harvest test_harvester')
|
||||
|
@ -336,3 +337,7 @@ class TestHarvestCorruptRedis(object):
|
|||
|
||||
finally:
|
||||
redis.delete('ckanext-harvest:some-random-key-2')
|
||||
=======
|
||||
def test_a(self):
|
||||
assert 1
|
||||
>>>>>>> 4ee8fa2a5df10b8ea583618e2e89076ef7f7c1b0
|
||||
|
|
|
@ -3,7 +3,11 @@
|
|||
'''
|
||||
import json
|
||||
|
||||
<<<<<<< HEAD
|
||||
import pytest
|
||||
=======
|
||||
from ckantoolkit.tests.helpers import reset_db
|
||||
>>>>>>> 4ee8fa2a5df10b8ea583618e2e89076ef7f7c1b0
|
||||
|
||||
from ckan import model
|
||||
from ckan import plugins as p
|
||||
|
@ -84,6 +88,7 @@ class MockHarvester(p.SingletonPlugin):
|
|||
return True
|
||||
|
||||
|
||||
<<<<<<< HEAD
|
||||
@pytest.mark.usefixtures('with_plugins', 'clean_db', 'harvest_setup', 'clean_queues')
|
||||
@pytest.mark.ckan_config('ckan.plugins', 'harvest test_harvester2')
|
||||
class TestEndStates(object):
|
||||
|
@ -176,3 +181,7 @@ class TestEndStates(object):
|
|||
assert result['state'] == 'COMPLETE'
|
||||
assert result['report_status'] == 'not modified'
|
||||
assert result['errors'] == []
|
||||
=======
|
||||
def test_a(self):
|
||||
assert 1
|
||||
>>>>>>> 4ee8fa2a5df10b8ea583618e2e89076ef7f7c1b0
|
||||
|
|
10
setup.py
10
setup.py
|
@ -32,10 +32,18 @@ setup(
|
|||
# Add plugins here, eg
|
||||
harvest=ckanext.harvest.plugin:Harvest
|
||||
ckan_harvester=ckanext.harvest.harvesters:CKANHarvester
|
||||
[ckan.test_plugins]
|
||||
|
||||
# Test plugins
|
||||
|
||||
test_harvester=ckanext.harvest.tests.test_queue:MockHarvester
|
||||
test_harvester2=ckanext.harvest.tests.test_queue2:MockHarvester
|
||||
test_action_harvester=ckanext.harvest.tests.test_action:MockHarvesterForActionTests
|
||||
|
||||
test_nose_harvester=ckanext.harvest.tests.nose.test_queue:MockHarvester
|
||||
test_nose_harvester2=ckanext.harvest.tests.nose.test_queue2:MockHarvester
|
||||
test_nose_action_harvester=ckanext.harvest.tests.nose.test_action:MockHarvesterForActionTests
|
||||
|
||||
|
||||
[paste.paster_command]
|
||||
harvester = ckanext.harvest.commands.harvester:Harvester
|
||||
[babel.extractors]
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
[DEFAULT]
|
||||
debug = false
|
||||
# Uncomment and replace with the address which should receive any error reports
|
||||
#email_to = you@yourdomain.com
|
||||
smtp_server = localhost
|
||||
error_email_from = paste@localhost
|
||||
|
||||
[server:main]
|
||||
use = egg:Paste#http
|
||||
host = 0.0.0.0
|
||||
port = 5000
|
||||
|
||||
|
||||
[app:main]
|
||||
use = config:../ckan/test-core.ini
|
||||
# Here we hard-code the database and a flag to make default tests
|
||||
# run fast.
|
||||
ckan.plugins = harvest ckan_harvester test_nose_harvester test_nose_harvester2 test_nose_action_harvester
|
||||
ckan.harvest.mq.type = redis
|
||||
ckan.legacy_templates = false
|
||||
# NB: other test configuration should go in test-core.ini, which is
|
||||
# what the postgres tests use.
|
||||
|
||||
|
||||
# Logging configuration
|
||||
[loggers]
|
||||
keys = root, ckan, sqlalchemy
|
||||
|
||||
[handlers]
|
||||
keys = console, dblog
|
||||
|
||||
[formatters]
|
||||
keys = generic, dblog
|
||||
|
||||
[logger_root]
|
||||
level = WARN
|
||||
handlers = console
|
||||
|
||||
[logger_ckan]
|
||||
qualname = ckan
|
||||
handlers =
|
||||
level = INFO
|
||||
|
||||
[logger_ckan_harvester]
|
||||
qualname = ckanext.harvest
|
||||
handlers = dblog
|
||||
level = DEBUG
|
||||
|
||||
[logger_sqlalchemy]
|
||||
handlers =
|
||||
qualname = sqlalchemy.engine
|
||||
level = WARN
|
||||
|
||||
[handler_console]
|
||||
class = StreamHandler
|
||||
args = (sys.stdout,)
|
||||
level = NOTSET
|
||||
formatter = generic
|
||||
|
||||
[handler_dblog]
|
||||
class = ckanext.harvest.log.DBLogHandler
|
||||
args = ()
|
||||
level = DEBUG
|
||||
formatter = dblog
|
||||
|
||||
[formatter_dblog]
|
||||
format = %(message)s
|
||||
|
||||
[formatter_generic]
|
||||
format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s
|
Loading…
Reference in New Issue