Merge branch 'master' into 157-version-three-apify

This commit is contained in:
David Read 2015-10-23 14:39:48 +01:00
commit caeeace8dc
18 changed files with 1144 additions and 188 deletions

2
.gitignore vendored
View File

@ -8,6 +8,6 @@ build
.DS_Store
dist
development.ini
*.swp
*.sw?
*~
node_modules

View File

@ -5,6 +5,7 @@ env:
- CKANVERSION=master
- CKANVERSION=2.2
- CKANVERSION=2.3
- CKANVERSION=2.4
services:
- redis-server
install:

View File

@ -29,7 +29,7 @@ Installation
On your CKAN configuration file, add::
ckan.harvest.mq.type = rabbitmq
ckan.harvest.mq.type = amqp
2. Install the extension into your python environment::
@ -149,12 +149,12 @@ Authorization
=============
Starting from CKAN 2.0, harvest sources behave exactly the same as datasets
(they are actually internally implemented as a dataset type). That means that
(they are actually internally implemented as a dataset type). That means they
can be searched and faceted, and that the same authorization rules can be
applied to them. The default authorization settings are based on organizations
(equivalent to the `publisher profile` found in old versions).
Have a look at the `Authorization <http://docs.ckan.org/en/latest/authorization.html>`_
Have a look at the `Authorization <http://docs.ckan.org/en/latest/authorization.html>`_
documentation on CKAN core to see how to configure your instance depending on
your needs.
@ -429,7 +429,7 @@ The ``run`` command not only starts any pending harvesting jobs, but also
flags those that are finished, allowing new jobs to be created on that particular
source and refreshing the source statistics. That means that you will need to run
this command before being able to create a new job on a source that was being
harvested (On a production site you will tipically have a cron job that runs the
harvested (On a production site you will typically have a cron job that runs the
command regularly, see next section).
@ -598,4 +598,3 @@ http://www.fsf.org/licensing/licenses/agpl-3.0.html
.. _Supervisor: http://supervisord.org

View File

@ -10,12 +10,16 @@ sudo apt-get install postgresql-9.1 solr-jetty libcommons-fileupload-java:amd64=
echo "Installing CKAN and its Python dependencies..."
git clone https://github.com/ckan/ckan
cd ckan
if [ $CKANVERSION == '2.3' ]
if [ $CKANVERSION == '2.4' ]
then
git checkout release-v2.3
git checkout release-v2.4-latest
elif [ $CKANVERSION == '2.3' ]
then
git checkout release-v2.3-latest
elif [ $CKANVERSION == '2.2' ]
then
git checkout release-v2.2.3
git checkout release-v2.2-latest
fi
python setup.py develop
pip install -r requirements.txt --allow-all-external

View File

@ -45,6 +45,8 @@ class Harvester(CkanCommand):
harvester purge_queues
- removes all jobs from fetch and gather queue
WARNING: if using Redis, this command purges all data in the current
Redis database
harvester [-j] [-o] [--segments={segments}] import [{source-id}]
- perform the import stage with the last fetched objects, for a certain

View File

@ -8,7 +8,7 @@ from pylons import config
from ckan import plugins as p
from ckan import model
from ckan.model import Session, Package
from ckan.model import Session, Package, PACKAGE_NAME_MAX_LENGTH
from ckan.logic import ValidationError, NotFound, get_action
from ckan.logic.schema import default_create_package_schema
@ -41,21 +41,100 @@ class HarvesterBase(SingletonPlugin):
_user_name = None
def _gen_new_name(self, title):
@classmethod
def _gen_new_name(cls, title, existing_name=None,
append_type='number-sequence'):
'''
Creates a URL friendly name from a title
Returns a 'name' for the dataset (URL friendly), based on the title.
If the name already exists, it will add some random characters at the end
If the ideal name is already used, it will append a number to it to
ensure it is unique.
If generating a new name because the title of the dataset has changed,
specify the existing name, in case the name doesn't need to change
after all.
:param existing_name: the current name of the dataset - only specify
this if the dataset exists
:type existing_name: string
:param append_type: the type of characters to add to make it unique -
either 'number-sequence' or 'random-hex'.
:type append_type: string
'''
name = munge_title_to_name(title).replace('_', '-')
while '--' in name:
name = name.replace('--', '-')
pkg_obj = Session.query(Package).filter(Package.name == name).first()
if pkg_obj:
return name + str(uuid.uuid4())[:5]
ideal_name = munge_title_to_name(title)
ideal_name = re.sub('-+', '-', ideal_name) # collapse multiple dashes
return cls._ensure_name_is_unique(ideal_name,
existing_name=existing_name,
append_type=append_type)
@staticmethod
def _ensure_name_is_unique(ideal_name, existing_name=None,
append_type='number-sequence'):
'''
Returns a dataset name based on the ideal_name, only it will be
guaranteed to be different than all the other datasets, by adding a
number on the end if necessary.
If generating a new name because the title of the dataset has changed,
specify the existing name, in case the name doesn't need to change
after all.
The maximum dataset name length is taken account of.
:param ideal_name: the desired name for the dataset, if it's not already
been taken (usually derived by munging the dataset
title)
:type ideal_name: string
:param existing_name: the current name of the dataset - only specify
this if the dataset exists
:type existing_name: string
:param append_type: the type of characters to add to make it unique -
either 'number-sequence' or 'random-hex'.
:type append_type: string
'''
ideal_name = ideal_name[:PACKAGE_NAME_MAX_LENGTH]
if existing_name == ideal_name:
return ideal_name
if append_type == 'number-sequence':
MAX_NUMBER_APPENDED = 999
APPEND_MAX_CHARS = len(str(MAX_NUMBER_APPENDED))
elif append_type == 'random-hex':
APPEND_MAX_CHARS = 5 # 16^5 = 1 million combinations
else:
return name
raise NotImplementedError('append_type cannot be %s' % append_type)
# Find out which package names have been taken. Restrict it to names
# derived from the ideal name plus numbers appended
like_q = u'%s%%' % \
ideal_name[:PACKAGE_NAME_MAX_LENGTH-APPEND_MAX_CHARS]
name_results = Session.query(Package.name)\
.filter(Package.name.ilike(like_q))\
.all()
taken = set([name_result[0] for name_result in name_results])
if existing_name and existing_name in taken:
taken.remove(existing_name)
if ideal_name not in taken:
# great, the ideal name is available
return ideal_name
elif existing_name and existing_name.startswith(ideal_name):
# the ideal name is not available, but it's an existing dataset with
# a name based on the ideal one, so there's no point changing it to
# a different number
return existing_name
elif append_type == 'number-sequence':
# find the next available number
counter = 1
while counter <= MAX_NUMBER_APPENDED:
candidate_name = \
ideal_name[:PACKAGE_NAME_MAX_LENGTH-len(str(counter))] + \
str(counter)
if candidate_name not in taken:
return candidate_name
counter = counter + 1
return None
elif append_type == 'random-hex':
return ideal_name[:PACKAGE_NAME_MAX_LENGTH-APPEND_MAX_CHARS] + \
str(uuid.uuid4())[:APPEND_MAX_CHARS]
def _save_gather_error(self, message, job):

View File

@ -30,10 +30,22 @@ def harvest_source_show(context,data_dict):
:param id: the id or name of the harvest source
:type id: string
:param url: url of the harvest source (as an alternative to the id)
:type url: string
:returns: harvest source metadata
:rtype: dictionary
'''
model = context.get('model')
# Find the source by URL
if data_dict.get('url') and not data_dict.get('id'):
source = model.Session.query(harvest_model.HarvestSource) \
.filter_by(url=data_dict.get('url')) \
.first()
if not source:
raise NotFound
data_dict['id'] = source.id
source_dict = logic.get_action('package_show')(context, data_dict)

View File

@ -140,14 +140,22 @@ def harvest_source_clear(context,data_dict):
delete from resource_group where package_id in
(select id from package where state = 'to_delete');
'''
# CKAN pre-2.5: authz models were removed in migration 078
if toolkit.check_ckan_version(max_version='2.4.99'):
sql += '''
delete from package_role where package_id in
(select id from package where state = 'to_delete');
delete from user_object_role where id not in
(select user_object_role_id from package_role) and context = 'Package';
'''
sql += '''
delete from harvest_object_error where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object_extra where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object where harvest_source_id = '{harvest_source_id}';
delete from harvest_gather_error where harvest_job_id in (select id from harvest_job where source_id = '{harvest_source_id}');
delete from harvest_job where source_id = '{harvest_source_id}';
delete from package_role where package_id in (select id from package where state = 'to_delete' );
delete from user_object_role where id not in (select user_object_role_id from package_role) and context = 'Package';
delete from package_tag_revision where package_id in (select id from package where state = 'to_delete');
delete from member_revision where table_id in (select id from package where state = 'to_delete');
delete from package_extra_revision where package_id in (select id from package where state = 'to_delete');

View File

@ -39,49 +39,51 @@ harvest_gather_error_table = None
harvest_object_error_table = None
harvest_object_extra_table = None
def setup():
if harvest_source_table is None:
define_harvester_tables()
log.debug('Harvest tables defined in memory')
if model.package_table.exists():
if not harvest_source_table.exists():
# Create each table individually rather than
# using metadata.create_all()
harvest_source_table.create()
harvest_job_table.create()
harvest_object_table.create()
harvest_gather_error_table.create()
harvest_object_error_table.create()
harvest_object_extra_table.create()
log.debug('Harvest tables created')
else:
from ckan.model.meta import engine
log.debug('Harvest tables already exist')
# Check if existing tables need to be updated
inspector = Inspector.from_engine(engine)
columns = inspector.get_columns('harvest_source')
if not 'title' in [column['name'] for column in columns]:
log.debug('Harvest tables need to be updated')
migrate_v2()
if not 'frequency' in [column['name'] for column in columns]:
log.debug('Harvest tables need to be updated')
migrate_v3()
# Check if this instance has harvest source datasets
source_ids = Session.query(HarvestSource.id).filter_by(active=True).all()
source_package_ids = Session.query(model.Package.id).filter_by(type=u'harvest', state='active').all()
sources_to_migrate = set(source_ids) - set(source_package_ids)
if sources_to_migrate:
log.debug('Creating harvest source datasets for %i existing sources', len(sources_to_migrate))
sources_to_migrate = [s[0] for s in sources_to_migrate]
migrate_v3_create_datasets(sources_to_migrate)
else:
if not model.package_table.exists():
log.debug('Harvest table creation deferred')
return
if not harvest_source_table.exists():
# Create each table individually rather than
# using metadata.create_all()
harvest_source_table.create()
harvest_job_table.create()
harvest_object_table.create()
harvest_gather_error_table.create()
harvest_object_error_table.create()
harvest_object_extra_table.create()
log.debug('Harvest tables created')
else:
from ckan.model.meta import engine
log.debug('Harvest tables already exist')
# Check if existing tables need to be updated
inspector = Inspector.from_engine(engine)
columns = inspector.get_columns('harvest_source')
column_names = [column['name'] for column in columns]
if not 'title' in column_names:
log.debug('Harvest tables need to be updated')
migrate_v2()
if not 'frequency' in column_names:
log.debug('Harvest tables need to be updated')
migrate_v3()
# Check if this instance has harvest source datasets
source_ids = Session.query(HarvestSource.id).filter_by(active=True).all()
source_package_ids = Session.query(model.Package.id).filter_by(type=u'harvest', state='active').all()
sources_to_migrate = set(source_ids) - set(source_package_ids)
if sources_to_migrate:
log.debug('Creating harvest source datasets for %i existing sources', len(sources_to_migrate))
sources_to_migrate = [s[0] for s in sources_to_migrate]
migrate_v3_create_datasets(sources_to_migrate)
class HarvestError(Exception):
@ -196,12 +198,21 @@ def define_harvester_tables():
Column('gather_finished', types.DateTime),
Column('finished', types.DateTime),
Column('source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
# status: New, Running, Finished
Column('status', types.UnicodeText, default=u'New', nullable=False),
)
# Was harvested_document
# A harvest_object contains a representation of one dataset during a
# particular harvest
harvest_object_table = Table('harvest_object', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
# The guid is the 'identity' of the dataset, according to the source.
# So if you reharvest it, then the harvester knows which dataset to
# update because of this identity. The identity needs to be unique
# within this CKAN.
Column('guid', types.UnicodeText, default=u''),
# When you harvest a dataset multiple times, only the latest
# successfully imported harvest_object should be flagged 'current'.
# The import_stage reads and writes it.
Column('current',types.Boolean,default=False),
Column('gathered', types.DateTime, default=datetime.datetime.utcnow),
Column('fetch_started', types.DateTime),
@ -209,6 +220,7 @@ def define_harvester_tables():
Column('fetch_finished', types.DateTime),
Column('import_started', types.DateTime),
Column('import_finished', types.DateTime),
# state: WAITING, FETCH, IMPORT, COMPLETE, ERROR
Column('state', types.UnicodeText, default=u'WAITING'),
Column('metadata_modified_date', types.DateTime),
Column('retry_times',types.Integer, default=0),
@ -391,9 +403,11 @@ ALTER TABLE harvest_object_extra
ALTER TABLE harvest_object_extra
ADD CONSTRAINT harvest_object_extra_harvest_object_id_fkey FOREIGN KEY (harvest_object_id) REFERENCES harvest_object(id);
UPDATE harvest_object set state = 'COMPLETE';
UPDATE harvest_object set state = 'COMPLETE' where package_id is not null;
UPDATE harvest_object set state = 'ERROR' where package_id is null;
UPDATE harvest_object set retry_times = 0;
UPDATE harvest_object set report_status = 'new';
UPDATE harvest_object set report_status = 'updated' where package_id is not null;
UPDATE harvest_object set report_status = 'errored' where package_id is null;
UPDATE harvest_source set frequency = 'MANUAL';
ALTER TABLE harvest_object DROP CONSTRAINT harvest_object_package_id_fkey;

View File

@ -82,10 +82,13 @@ def purge_queues():
if backend in ('amqp', 'ampq'):
channel = connection.channel()
channel.queue_purge(queue=get_gather_queue_name())
log.info('AMQP queue purged: %s', get_gather_queue_name())
channel.queue_purge(queue=get_fetch_queue_name())
log.info('AMQP queue purged: %s', get_fetch_queue_name())
return
if backend == 'redis':
connection.flushall()
connection.flushdb()
log.info('Redis database flushed')
def resubmit_jobs():
if config.get('ckan.harvest.mq.type') != 'redis':
@ -95,7 +98,7 @@ def resubmit_jobs():
for key in harvest_object_pending:
date_of_key = datetime.datetime.strptime(redis.get(key),
"%Y-%m-%d %H:%M:%S.%f")
if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minuites for fetch and import max
if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minutes for fetch and import max
redis.rpush('harvest_object_id',
json.dumps({'harvest_object_id': key.split(':')[-1]})
)
@ -177,7 +180,7 @@ class RedisConsumer(object):
def basic_ack(self, message):
self.redis.delete(self.persistance_key(message))
def queue_purge(self, queue):
self.redis.flushall()
self.redis.flushdb()
def basic_get(self, queue):
body = self.redis.lpop(self.routing_key)
return (FakeMethod(body), self, body)
@ -223,23 +226,12 @@ def gather_callback(channel, method, header, body):
for harvester in PluginImplementations(IHarvester):
if harvester.info()['name'] == job.source.type:
harvester_found = True
# Get a list of harvest object ids from the plugin
job.gather_started = datetime.datetime.utcnow()
try:
harvest_object_ids = harvester.gather_stage(job)
harvest_object_ids = gather_stage(harvester, job)
except (Exception, KeyboardInterrupt):
channel.basic_ack(method.delivery_tag)
harvest_objects = model.Session.query(HarvestObject).filter_by(
harvest_job_id=job.id
)
for harvest_object in harvest_objects:
model.Session.delete(harvest_object)
model.Session.commit()
raise
finally:
job.gather_finished = datetime.datetime.utcnow()
job.save()
if not isinstance(harvest_object_ids, list):
log.error('Gather stage failed')
@ -261,7 +253,12 @@ def gather_callback(channel, method, header, body):
log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids)))
if not harvester_found:
msg = 'No harvester could be found for source type %s' % job.source.type
# This can occur if you:
# * remove a harvester and it still has sources that are then
# refreshed
# * add a new harvester and restart CKAN but not the gather
# queue.
msg = 'System error - No harvester could be found for source type %s' % job.source.type
err = HarvestGatherError(message=msg,job=job)
err.save()
log.error(msg)
@ -271,6 +268,31 @@ def gather_callback(channel, method, header, body):
channel.basic_ack(method.delivery_tag)
def gather_stage(harvester, job):
'''Calls the harvester's gather_stage, returning harvest object ids, with
some error handling.
This is split off from gather_callback so that tests can call it without
dealing with queue stuff.
'''
job.gather_started = datetime.datetime.utcnow()
try:
harvest_object_ids = harvester.gather_stage(job)
except (Exception, KeyboardInterrupt):
harvest_objects = model.Session.query(HarvestObject).filter_by(
harvest_job_id=job.id
)
for harvest_object in harvest_objects:
model.Session.delete(harvest_object)
model.Session.commit()
raise
finally:
job.gather_finished = datetime.datetime.utcnow()
job.save()
return harvest_object_ids
def fetch_callback(channel, method, header, body):
try:
id = json.loads(body)['harvest_object_id']
@ -280,7 +302,6 @@ def fetch_callback(channel, method, header, body):
channel.basic_ack(method.delivery_tag)
return False
obj = HarvestObject.get(id)
if not obj:
log.error('Harvest object does not exist: %s' % id)

View File

@ -1,14 +1,92 @@
import factory
from ckanext.harvest.model import HarvestSource, HarvestJob
import ckanext.harvest.model as harvest_model
try:
from ckan.tests.factories import _get_action_user_name
except ImportError:
from ckan.new_tests.factories import _get_action_user_name
from ckan.plugins import toolkit
class HarvestSourceFactory(factory.Factory):
FACTORY_FOR = HarvestSource
url = "http://harvest.test.com"
type = "test-harvest-source"
class HarvestSource(factory.Factory):
FACTORY_FOR = harvest_model.HarvestSource
_return_type = 'dict'
class HarvestJobFactory(factory.Factory):
FACTORY_FOR = HarvestJob
name = factory.Sequence(lambda n: 'test_source_{n}'.format(n=n))
title = factory.Sequence(lambda n: 'test title {n}'.format(n=n))
url = factory.Sequence(lambda n: 'http://{n}.test.com'.format(n=n))
source_type = 'test' # defined in test_queue.py
id = '{0}_id'.format(name).lower()
status = "New"
source = factory.SubFactory(HarvestSourceFactory)
@classmethod
def _create(cls, target_class, *args, **kwargs):
if args:
assert False, "Positional args aren't supported, use keyword args."
context = {'user': _get_action_user_name(kwargs)}
# If there is an existing source for this URL, and we can't create
# another source with that URL, just return the original one.
try:
source_dict = toolkit.get_action('harvest_source_show')(
context, dict(url=kwargs['url']))
except toolkit.ObjectNotFound:
source_dict = toolkit.get_action('harvest_source_create')(
context, kwargs)
if cls._return_type == 'dict':
return source_dict
else:
return cls.FACTORY_FOR.get(source_dict['id'])
class HarvestSourceObj(HarvestSource):
_return_type = 'obj'
class HarvestJob(factory.Factory):
FACTORY_FOR = harvest_model.HarvestJob
_return_type = 'dict'
source = factory.SubFactory(HarvestSourceObj)
@classmethod
def _create(cls, target_class, *args, **kwargs):
if args:
assert False, "Positional args aren't supported, use keyword args."
context = {'user': _get_action_user_name(kwargs)}
if 'source_id' not in kwargs:
kwargs['source_id'] = kwargs['source'].id
job_dict = toolkit.get_action('harvest_job_create')(
context, kwargs)
if cls._return_type == 'dict':
return job_dict
else:
return cls.FACTORY_FOR.get(job_dict['id'])
class HarvestJobObj(HarvestJob):
_return_type = 'obj'
class HarvestObject(factory.Factory):
FACTORY_FOR = harvest_model.HarvestObject
_return_type = 'dict'
#source = factory.SubFactory(HarvestSourceObj)
job = factory.SubFactory(HarvestJobObj)
@classmethod
def _create(cls, target_class, *args, **kwargs):
if args:
assert False, "Positional args aren't supported, use keyword args."
context = {'user': _get_action_user_name(kwargs)}
if 'job_id' not in kwargs:
kwargs['job_id'] = kwargs['job'].id
kwargs['source_id'] = kwargs['job'].source.id
job_dict = toolkit.get_action('harvest_object_create')(
context, kwargs)
if cls._return_type == 'dict':
return job_dict
else:
return cls.FACTORY_FOR.get(job_dict['id'])
class HarvestObjectObj(HarvestObject):
_return_type = 'obj'

View File

@ -0,0 +1,458 @@
import json
import re
import copy
import SimpleHTTPServer
import SocketServer
from threading import Thread
PORT = 8998
class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def do_GET(self):
# test name is the first bit of the URL and makes CKAN behave
# differently in some way.
# Its value is recorded and then removed from the path
self.test_name = None
test_name_match = re.match('^/([^/]+)/', self.path)
if test_name_match:
self.test_name = test_name_match.groups()[0]
if self.test_name == 'api':
self.test_name = None
else:
self.path = re.sub('^/([^/]+)/', '/', self.path)
# The API version is recorded and then removed from the path
api_version = None
version_match = re.match('^/api/(\d)', self.path)
if version_match:
api_version = int(version_match.groups()[0])
self.path = re.sub('^/api/(\d)/', '/api/', self.path)
if self.path == '/api/rest/package':
if api_version == 2:
dataset_refs = [d['id'] for d in DATASETS]
else:
dataset_refs = [d['name'] for d in DATASETS]
return self.respond_json(dataset_refs)
if self.path.startswith('/api/rest/package/'):
dataset_ref = self.path.split('/')[-1]
dataset = self.get_dataset(dataset_ref)
if dataset:
return self.respond_json(
convert_dataset_to_restful_form(dataset))
if self.path.startswith('/api/action/package_show'):
params = self.get_url_params()
dataset_ref = params['id']
dataset = self.get_dataset(dataset_ref)
if dataset:
return self.respond_json(dataset)
if self.path.startswith('/api/search/dataset'):
params = self.get_url_params()
if params.keys() == ['organization']:
org = self.get_org(params['organization'])
dataset_ids = [d['id'] for d in DATASETS
if d['owner_org'] == org['id']]
return self.respond_json({'count': len(dataset_ids),
'results': dataset_ids})
else:
return self.respond(
'Not implemented search params %s' % params, status=400)
if self.path.startswith('/api/search/revision'):
revision_ids = [r['id'] for r in REVISIONS]
return self.respond_json(revision_ids)
if self.path.startswith('/api/rest/revision/'):
revision_ref = self.path.split('/')[-1]
for rev in REVISIONS:
if rev['id'] == revision_ref:
return self.respond_json(rev)
self.respond('Cannot find revision', status=404)
# if we wanted to serve a file from disk, then we'd call this:
#return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
self.respond('Mock CKAN doesnt recognize that call', status=400)
def get_dataset(self, dataset_ref):
for dataset in DATASETS:
if dataset['name'] == dataset_ref or \
dataset['id'] == dataset_ref:
if self.test_name == 'invalid_tag':
dataset['tags'] = INVALID_TAGS
return dataset
def get_org(self, org_ref):
for org in ORGS:
if org['name'] == org_ref or \
org['id'] == org_ref:
return org
def get_url_params(self):
params = self.path.split('?')[-1].split('&')
return dict([param.split('=') for param in params])
def respond_action(self, result_dict, status=200):
response_dict = {'result': result_dict}
return self.respond_json(response_dict, status=status)
def respond_json(self, content_dict, status=200):
return self.respond(json.dumps(content_dict), status=status,
content_type='application/json')
def respond(self, content, status=200, content_type='application/json'):
self.send_response(status)
self.send_header('Content-Type', content_type)
self.end_headers()
self.wfile.write(content)
self.wfile.close()
def serve(port=PORT):
'''Runs a CKAN-alike app (over HTTP) that is used for harvesting tests'''
# Choose the directory to serve files from
#os.chdir(os.path.join(os.path.dirname(os.path.abspath(__file__)),
# 'mock_ckan_files'))
class TestServer(SocketServer.TCPServer):
allow_reuse_address = True
httpd = TestServer(("", PORT), MockCkanHandler)
print 'Serving test HTTP server at port', PORT
httpd_thread = Thread(target=httpd.serve_forever)
httpd_thread.setDaemon(True)
httpd_thread.start()
def convert_dataset_to_restful_form(dataset):
dataset = copy.deepcopy(dataset)
dataset['extras'] = dict([(e['key'], e['value']) for e in dataset['extras']])
dataset['tags'] = [t['name'] for t in dataset.get('tags', [])]
return dataset
# Datasets are in the package_show form, rather than the RESTful form
DATASETS = [
{'id': 'dataset1-id',
'name': 'dataset1',
'title': 'Test Dataset1',
'owner_org': 'org1-id',
'extras': []},
{
"id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"name": "cabinet-office-energy-use",
"private": False,
"maintainer_email": None,
"revision_timestamp": "2010-11-23T22:34:55.089925",
"organization":
{
"description": "The Cabinet Office supports the Prime Minister and Deputy Prime Minister, and ensure the effective running of government. We are also the corporate headquarters for government, in partnership with HM Treasury, and we take the lead in certain critical policy areas.\r\nCO is a ministerial department, supported by 18 agencies and public bodies\r\n\r\nYou can find out more at https://www.gov.uk/government/organisations/cabinet-office",
"created": "2012-06-27T14:48:40.244951",
"title": "Cabinet Office",
"name": "cabinet-office",
"revision_timestamp": "2013-04-02T14:27:23.086886",
"is_organization": True,
"state": "active",
"image_url": "",
"revision_id": "4be8825d-d3f4-4fb2-b80b-43e36f574c05",
"type": "organization",
"id": "aa1e068a-23da-4563-b9c2-2cad272b663e",
"approval_status": "pending"
},
"update_frequency": "other",
"metadata_created": "2010-08-02T09:19:47.600853",
"last_major_modification": "2010-08-02T09:19:47.600853",
"metadata_modified": "2014-05-09T22:00:01.486366",
"temporal_granularity": "",
"author_email": None,
"geographic_granularity": "point",
"geographic_coverage": [ ],
"state": "active",
"version": None,
"temporal_coverage-to": "",
"license_id": "uk-ogl",
"type": "dataset",
"published_via": "",
"resources":
[
{
"content_length": "69837",
"cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3",
"description": "70 Whitehall energy data",
"cache_last_updated": "2013-06-19T00:59:42.762642",
"url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv",
"openness_score_failure_count": "0",
"format": "CSV",
"cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"last_modified": "2014-05-09T23:00:01.435211",
"mimetype": "text/csv",
"content_type": "text/csv",
"openness_score": "3",
"openness_score_reason": "open and standardized format",
"position": 0,
"revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4",
"id": "f156019d-ea88-46a6-8fa3-3d12582e2161",
"size": 299107
}
],
"num_resources": 1,
"tags":
[
{
"vocabulary_id": None,
"display_name": "consumption",
"name": "consumption",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "84ce26de-6711-4e85-9609-f7d8a87b0fc8"
},
{
"vocabulary_id": None,
"display_name": "energy",
"name": "energy",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "9f2ae723-602f-4290-80c4-6637ad617a45"
}
],
"precision": "",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"taxonomy_url": "",
"groups": [],
"creator_user_id": None,
"national_statistic": "no",
"relationships_as_subject": [],
"num_tags": 8,
"update_frequency-other": "Real-time",
"isopen": True,
"url": "http://www.carbonculture.net/orgs/cabinet-office/70-whitehall/",
"notes": "Cabinet Office head office energy use updated from on-site meters showing use, cost and carbon impact.",
"owner_org": "aa1e068a-23da-4563-b9c2-2cad272b663e",
"theme-secondary":
[
"Environment"
],
"extras":
[
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "categories",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "6813d71b-785b-4f56-b296-1b2acb34eed6"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "2010-07-30",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "date_released",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "515f638b-e2cf-40a6-a8a7-cbc8001269e3"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "date_updated",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "bff63465-4f96-44e7-bb87-6e66fff5e596"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "000000: ",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "geographic_coverage",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "414bcd35-b628-4218-99e2-639615183df8"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "point",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "geographic_granularity",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "c7b460dd-c61f-4cd2-90c2-eceb6c91fe9b"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "no",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "national_statistic",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "9f04b202-3646-49be-b69e-7fa997399ff3"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "{\"status\": \"final\", \"source\": \"Automatically awarded by ODI\", \"certification_type\": \"automatically awarded\", \"level\": \"raw\", \"title\": \"Cabinet Office 70 Whitehall energy use\", \"created_at\": \"2014-10-28T12:25:57Z\", \"jurisdiction\": \"GB\", \"certificate_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922\", \"badge_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922/badge.png\", \"cert_title\": \"Basic Level Certificate\"}",
"revision_timestamp": "2014-11-12T02:52:35.048060",
"state": "active",
"key": "odi-certificate",
"revision_id": "eae9763b-e258-4d76-9ec2-7f5baf655394",
"id": "373a3cbb-d9c0-45a6-9a78-b95c86398766"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_coverage-from",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "39f72eed-6f76-4733-b636-7541cee3404f"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_coverage-to",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "818e2c8f-fee0-49da-8bea-ea3c9401ece5"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_granularity",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "f868b950-d3ce-4fbe-88ca-5cbc4b672320"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "Towns & Cities",
"revision_timestamp": "2015-03-16T18:10:08.802815",
"state": "active",
"key": "theme-primary",
"revision_id": "fc2b6630-84f8-4c88-8ac7-0ca275b2bc97",
"id": "bdcf00fd-3248-4c2f-9cf8-b90706c88e8d"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "[\"Environment\"]",
"revision_timestamp": "2015-04-08T20:57:04.895214",
"state": "active",
"key": "theme-secondary",
"revision_id": "c2c48530-ff82-4af1-9373-cdc64d5bc83c",
"id": "417482c5-a9c0-4430-8c4e-0c76e59fe44f"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "Real-time",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "update_frequency",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "e8ad4837-514e-4446-81a2-ffacfa7cf683"
}
],
"license_url": "http://www.nationalarchives.gov.uk/doc/open-government-licence/version/3/",
"individual_resources":
[
{
"content_length": "69837",
"cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3",
"description": "70 Whitehall energy data",
"cache_last_updated": "2013-06-19T00:59:42.762642",
"url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv",
"openness_score_failure_count": "0",
"format": "CSV",
"cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"last_modified": "2014-05-09T23:00:01.435211",
"mimetype": "text/csv",
"content_type": "text/csv",
"openness_score": "3",
"openness_score_reason": "open and standardized format",
"position": 0,
"revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4",
"id": "f156019d-ea88-46a6-8fa3-3d12582e2161",
"size": 299107
}
],
"title": "Cabinet Office 70 Whitehall energy use",
"revision_id": "3bd6ced3-35b2-4b20-94e2-c596e24bc375",
"date_released": "30/7/2010",
"theme-primary": "Towns & Cities"
}
]
INVALID_TAGS = [
{
"vocabulary_id": None,
"display_name": "consumption%^&",
"name": "consumption%^&",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "84ce26de-6711-4e85-9609-f7d8a87b0fc8"
},
]
ORGS = [
{'id': 'org1-id',
'name': 'org1'},
{'id': 'aa1e068a-23da-4563-b9c2-2cad272b663e',
'name': 'cabinet-office'}
]
REVISIONS = [
{
"id": "23daf2eb-d7ec-4d86-a844-3924acd311ea",
"timestamp": "2015-10-21T09:50:08.160045",
"message": "REST API: Update object dataset1",
"author": "ross",
"approved_timestamp": None,
"packages":
[
"dataset1"
],
"groups": [ ]
},
{
"id": "8254a293-10db-4af2-9dfa-6a1f06ee899c",
"timestamp": "2015-10-21T09:46:21.198021",
"message": "REST API: Update object dataset1",
"author": "ross",
"approved_timestamp": None,
"packages":
[
"dataset1"
],
"groups": [ ]
}]

View File

@ -0,0 +1,96 @@
import re
from nose.tools import assert_equal
from ckanext.harvest.harvesters.base import HarvesterBase
try:
from ckan.tests import helpers
from ckan.tests import factories
except ImportError:
from ckan.new_tests import helpers
from ckan.new_tests import factories
_ensure_name_is_unique = HarvesterBase._ensure_name_is_unique
class TestGenNewName(object):
    '''Tests for HarvesterBase._gen_new_name.'''

    @classmethod
    def setup_class(cls):
        # Start this test class from a clean database.
        helpers.reset_db()

    def test_basic(self):
        generated = HarvesterBase._gen_new_name('Trees')
        assert_equal(generated, 'trees')

    def test_munge(self):
        # Punctuation and spaces are munged into a dataset-name slug.
        generated = HarvesterBase._gen_new_name('Trees and branches - survey.')
        assert_equal(generated, 'trees-and-branches-survey')
class TestEnsureNameIsUnique(object):
    '''Tests for HarvesterBase._ensure_name_is_unique, which picks a dataset
    name that does not clash with any existing dataset, appending a number
    (or random hex) when necessary.'''

    def setup(self):
        # Reset before every test - each test creates datasets whose names
        # must not leak into the next test.
        helpers.reset_db()

    def test_no_existing_datasets(self):
        factories.Dataset(name='unrelated')
        assert_equal(_ensure_name_is_unique('trees'), 'trees')

    def test_existing_dataset(self):
        factories.Dataset(name='trees')
        assert_equal(_ensure_name_is_unique('trees'), 'trees1')

    def test_two_existing_datasets(self):
        factories.Dataset(name='trees')
        factories.Dataset(name='trees1')
        assert_equal(_ensure_name_is_unique('trees'), 'trees2')

    def test_no_existing_datasets_and_long_name(self):
        # Names are truncated to the maximum dataset-name length.
        assert_equal(_ensure_name_is_unique('x'*101), 'x'*100)

    def test_existing_dataset_and_long_name(self):
        # because PACKAGE_NAME_MAX_LENGTH = 100
        factories.Dataset(name='x'*100)
        assert_equal(_ensure_name_is_unique('x'*101), 'x'*99 + '1')

    def test_update_dataset_with_new_name(self):
        factories.Dataset(name='trees1')
        assert_equal(_ensure_name_is_unique('tree', existing_name='trees1'),
                     'tree')

    def test_update_dataset_but_with_same_name(self):
        # this can happen if you remove a trailing space from the title - the
        # harvester sees the title changed and thinks it should have a new
        # name, but clearly it can reuse its existing one
        factories.Dataset(name='trees')
        factories.Dataset(name='trees1')
        assert_equal(_ensure_name_is_unique('trees', existing_name='trees'),
                     'trees')

    def test_update_dataset_to_available_shorter_name(self):
        # this can be handy when reharvesting: you got duplicates and
        # managed to purge one set and through a minor title change you can
        # now lose the appended number. users don't like unnecessary numbers.
        factories.Dataset(name='trees1')
        assert_equal(_ensure_name_is_unique('trees', existing_name='trees1'),
                     'trees')

    def test_update_dataset_but_doesnt_change_to_other_number(self):
        # there's no point changing one number for another though
        factories.Dataset(name='trees')
        factories.Dataset(name='trees2')
        assert_equal(_ensure_name_is_unique('trees', existing_name='trees2'),
                     'trees2')

    def test_update_dataset_with_new_name_with_numbers(self):
        factories.Dataset(name='trees')
        factories.Dataset(name='trees2')
        factories.Dataset(name='frogs')
        assert_equal(_ensure_name_is_unique('frogs', existing_name='trees2'),
                     'frogs1')

    def test_existing_dataset_appending_hex(self):
        factories.Dataset(name='trees')
        name = _ensure_name_is_unique('trees', append_type='random-hex')
        # e.g. 'trees0b53f'
        # Raw string so that \d is a regex digit class, not a (deprecated)
        # string escape.
        assert re.match(r'trees[\da-f]{5}', name)

View File

@ -0,0 +1,118 @@
from nose.tools import assert_equal
import json
try:
from ckan.tests.helpers import reset_db
from ckan.tests.factories import Organization
except ImportError:
from ckan.new_tests.helpers import reset_db
from ckan.new_tests.factories import Organization
from ckan import model
from ckanext.harvest.tests.factories import (HarvestSourceObj, HarvestJobObj,
HarvestObjectObj)
from ckanext.harvest.tests.lib import run_harvest, run_harvest_job
import ckanext.harvest.model as harvest_model
from ckanext.harvest.harvesters.ckanharvester import CKANHarvester
import mock_ckan
# Start a CKAN-alike server that harvesting can be tested against.
mock_ckan.serve()
class TestCkanHarvester(object):
    '''Functional tests for CKANHarvester, run against the mock CKAN
    server started above.'''

    @classmethod
    def setup(cls):
        # Runs before every test (nose) - start each test with a fresh
        # database and the harvest tables recreated.
        reset_db()
        harvest_model.setup()

    def test_gather_normal(self):
        source = HarvestSourceObj(url='http://localhost:%s/' % mock_ckan.PORT)
        job = HarvestJobObj(source=source)

        harvester = CKANHarvester()
        obj_ids = harvester.gather_stage(job)

        # One harvest object per dataset on the remote, keyed by dataset id.
        assert_equal(type(obj_ids), list)
        assert_equal(len(obj_ids), len(mock_ckan.DATASETS))
        harvest_object = harvest_model.HarvestObject.get(obj_ids[0])
        assert_equal(harvest_object.guid, mock_ckan.DATASETS[0]['id'])

    def test_fetch_normal(self):
        source = HarvestSourceObj(url='http://localhost:%s/' % mock_ckan.PORT)
        job = HarvestJobObj(source=source)
        harvest_object = HarvestObjectObj(guid=mock_ckan.DATASETS[0]['id'],
                                          job=job)

        harvester = CKANHarvester()
        result = harvester.fetch_stage(harvest_object)

        # Fetch stores the dataset (in RESTful form) on the harvest object.
        assert_equal(result, True)
        assert_equal(
            harvest_object.content,
            json.dumps(
                mock_ckan.convert_dataset_to_restful_form(
                    mock_ckan.DATASETS[0])))

    def test_import_normal(self):
        org = Organization()
        harvest_object = HarvestObjectObj(
            guid=mock_ckan.DATASETS[0]['id'],
            content=json.dumps(mock_ckan.convert_dataset_to_restful_form(
                mock_ckan.DATASETS[0])),
            job__source__owner_org=org['id'])

        harvester = CKANHarvester()
        result = harvester.import_stage(harvest_object)

        # Import creates a local package linked to the harvest object.
        assert_equal(result, True)
        assert harvest_object.package_id
        dataset = model.Package.get(harvest_object.package_id)
        assert_equal(dataset.name, mock_ckan.DATASETS[0]['name'])

    def test_harvest(self):
        results_by_guid = run_harvest(
            url='http://localhost:%s/' % mock_ckan.PORT,
            harvester=CKANHarvester())

        result = results_by_guid['dataset1-id']
        assert_equal(result['state'], 'COMPLETE')
        assert_equal(result['report_status'], 'added')
        assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name'])
        assert_equal(result['errors'], [])

        result = results_by_guid[mock_ckan.DATASETS[1]['id']]
        assert_equal(result['state'], 'COMPLETE')
        assert_equal(result['report_status'], 'added')
        assert_equal(result['dataset']['name'], mock_ckan.DATASETS[1]['name'])
        assert_equal(result['errors'], [])

    def test_harvest_twice(self):
        run_harvest(
            url='http://localhost:%s/' % mock_ckan.PORT,
            harvester=CKANHarvester())

        results_by_guid = run_harvest(
            url='http://localhost:%s/' % mock_ckan.PORT,
            harvester=CKANHarvester())

        # updated the dataset which has revisions
        result = results_by_guid['dataset1']
        assert_equal(result['state'], 'COMPLETE')
        assert_equal(result['report_status'], 'updated')
        assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name'])
        assert_equal(result['errors'], [])

        # the other dataset is unchanged and not harvested at all on the
        # second run, so it must not appear among the harvested guids.
        # (Previously this checked `result`, a per-object result dict, which
        # could never contain a dataset name - a vacuous assertion.)
        assert mock_ckan.DATASETS[1]['name'] not in results_by_guid

    def test_harvest_invalid_tag(self):
        from nose.plugins.skip import SkipTest
        raise SkipTest()
        results_by_guid = run_harvest(
            url='http://localhost:%s/invalid_tag' % mock_ckan.PORT,
            harvester=CKANHarvester())

        result = results_by_guid['dataset1-id']
        assert_equal(result['state'], 'COMPLETE')
        assert_equal(result['report_status'], 'added')
        assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name'])

View File

@ -0,0 +1,56 @@
from ckanext.harvest.tests.factories import HarvestSourceObj, HarvestJobObj
import ckanext.harvest.model as harvest_model
from ckanext.harvest.logic import NoNewHarvestJobError
from ckanext.harvest import queue
from ckan.plugins import toolkit
def run_harvest(url, harvester, config=''):
    '''Create a harvest source and job for the given URL and run the job.

    Returns the results keyed by harvest object guid. This lets a harvester
    be tested end-to-end; the queues are bypassed as they are a pain in
    tests.
    '''
    # A user would create a harvest source...
    harvest_source = HarvestSourceObj(url=url, config=config)
    # ...and trigger a harvest, i.e. create a harvest job for it.
    harvest_job = HarvestJobObj(source=harvest_source)
    return run_harvest_job(harvest_job, harvester)
def run_harvest_job(job, harvester):
    '''Run a single harvest job synchronously, bypassing the queues, and
    return a dict of per-guid results (state, report_status, dataset,
    errors).'''
    # When 'paster harvest run' is called by the regular cron it does two
    # things. First, it changes the job status to Running:
    job.status = 'Running'
    job.save()

    # Second, it puts the job on the gather queue, which is consumed by
    # queue.gather_callback; that determines the harvester and calls its
    # gather_stage. Here we simply call the gather stage directly.
    object_ids = queue.gather_stage(harvester, job)

    # The object ids would normally go onto the fetch queue, consumed by
    # queue.fetch_callback, which calls queue.fetch_and_import_stages.
    results_by_guid = {}
    for object_id in object_ids:
        harvest_object = harvest_model.HarvestObject.get(object_id)
        entry = {'obj_id': object_id}
        queue.fetch_and_import_stages(harvester, harvest_object)
        entry['state'] = harvest_object.state
        entry['report_status'] = harvest_object.report_status
        if harvest_object.state == 'COMPLETE' and harvest_object.package_id:
            entry['dataset'] = toolkit.get_action('package_show')(
                {}, dict(id=harvest_object.package_id))
        entry['errors'] = harvest_object.errors
        results_by_guid[harvest_object.guid] = entry

    # Do 'harvest_jobs_run' to change the job status to 'finished'.
    try:
        toolkit.get_action('harvest_jobs_run')({'ignore_auth': True}, {})
    except NoNewHarvestJobError:
        # This is expected
        pass

    return results_by_guid

View File

@ -1,19 +1,23 @@
import json
import copy
import ckan
import paste
import pylons.test
import factories
import unittest
from ckan.lib.create_test_data import CreateTestData
import ckan.new_tests.helpers as helpers
try:
from ckan.tests import factories as ckan_factories
from ckan.tests.helpers import _get_test_app, reset_db
except ImportError:
from ckan.new_tests import factories as ckan_factories
from ckan.new_tests.helpers import _get_test_app, reset_db
from ckan import plugins as p
from ckan.plugins import toolkit
from ckan import model
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.model as harvest_model
def call_action_api(app, action, apikey=None, status=200, **kwargs):
def call_action_api(action, apikey=None, status=200, **kwargs):
'''POST an HTTP request to the CKAN API and return the result.
Any additional keyword arguments that you pass to this function as **kwargs
@ -21,7 +25,7 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs):
Usage:
package_dict = post(app, 'package_create', apikey=apikey,
package_dict = call_action_api('package_create', apikey=apikey,
name='my_package')
assert package_dict['name'] == 'my_package'
@ -31,13 +35,10 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs):
of the error dict, you have to use the status param otherwise an exception
will be raised:
error_dict = post(app, 'group_activity_list', status=403,
error_dict = call_action_api('group_activity_list', status=403,
id='invalid_id')
assert error_dict['message'] == 'Access Denied'
:param app: the test app to post to
:type app: paste.fixture.TestApp
:param action: the action to post to, e.g. 'package_create'
:type action: string
@ -62,8 +63,10 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs):
'''
params = json.dumps(kwargs)
app = _get_test_app()
response = app.post('/api/action/{0}'.format(action), params=params,
extra_environ={'Authorization': str(apikey)}, status=status)
extra_environ={'Authorization': str(apikey)},
status=status)
if status in (200,):
assert response.json['success'] is True
@ -75,10 +78,13 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs):
class MockHarvesterForActionTests(p.SingletonPlugin):
p.implements(IHarvester)
def info(self):
return {'name': 'test-for-action', 'title': 'Test for action', 'description': 'test'}
def validate_config(self,config):
def info(self):
return {'name': 'test-for-action',
'title': 'Test for action',
'description': 'test'}
def validate_config(self, config):
if not config:
return config
@ -86,10 +92,10 @@ class MockHarvesterForActionTests(p.SingletonPlugin):
config_obj = json.loads(config)
if 'custom_option' in config_obj:
if not isinstance(config_obj['custom_option'],list):
if not isinstance(config_obj['custom_option'], list):
raise ValueError('custom_option must be a list')
except ValueError,e:
except ValueError, e:
raise e
return config
@ -103,56 +109,63 @@ class MockHarvesterForActionTests(p.SingletonPlugin):
def import_stage(self, harvest_object):
return True
class HarvestSourceActionBase(object):
class FunctionalTestBaseWithoutClearBetweenTests(object):
''' Functional tests should normally derive from
ckan.lib.helpers.FunctionalTestBase, but these are legacy tests so this
class is a compromise. This version doesn't call reset_db before every
test, because these tests are designed with fixtures created in
setup_class.'''
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
CreateTestData.create()
sysadmin_user = ckan.model.User.get('testsysadmin')
cls.sysadmin = {
'id': sysadmin_user.id,
'apikey': sysadmin_user.apikey,
'name': sysadmin_user.name,
}
@classmethod
def teardown_class(cls):
pass
cls.app = paste.fixture.TestApp(pylons.test.pylonsapp)
class HarvestSourceActionBase(FunctionalTestBaseWithoutClearBetweenTests):
cls.default_source_dict = {
"url": "http://test.action.com",
"name": "test-source-action",
"title": "Test source action",
"notes": "Test source action desc",
"source_type": "test-for-action",
"frequency": "MANUAL",
"config": json.dumps({"custom_option":["a","b"]})
@classmethod
def setup_class(cls):
super(HarvestSourceActionBase, cls).setup_class()
harvest_model.setup()
cls.sysadmin = ckan_factories.Sysadmin()
cls.default_source_dict = {
"url": "http://test.action.com",
"name": "test-source-action",
"title": "Test source action",
"notes": "Test source action desc",
"source_type": "test-for-action",
"frequency": "MANUAL",
"config": json.dumps({"custom_option": ["a", "b"]})
}
if not p.plugin_loaded('test_action_harvester'):
p.load('test_action_harvester')
@classmethod
def teardown_class(cls):
ckan.model.repo.rebuild_db()
super(HarvestSourceActionBase, cls).teardown_class()
p.unload('test_action_harvester')
def teardown(self):
pass
def test_invalid_missing_values(self):
source_dict = {}
if 'id' in self.default_source_dict:
source_dict['id'] = self.default_source_dict['id']
result = call_action_api(self.app, self.action,
apikey=self.sysadmin['apikey'], status=409, **source_dict)
result = call_action_api(self.action,
apikey=self.sysadmin['apikey'], status=409,
**source_dict)
for key in ('name','title','url','source_type'):
for key in ('name', 'title', 'url', 'source_type'):
assert result[key] == [u'Missing value']
def test_invalid_unknown_type(self):
@ -160,8 +173,9 @@ class HarvestSourceActionBase(object):
source_dict = copy.deepcopy(self.default_source_dict)
source_dict['source_type'] = 'unknown'
result = call_action_api(self.app, self.action,
apikey=self.sysadmin['apikey'], status=409, **source_dict)
result = call_action_api(self.action,
apikey=self.sysadmin['apikey'], status=409,
**source_dict)
assert 'source_type' in result
assert u'Unknown harvester type' in result['source_type'][0]
@ -171,8 +185,9 @@ class HarvestSourceActionBase(object):
source_dict = copy.deepcopy(self.default_source_dict)
source_dict['frequency'] = wrong_frequency
result = call_action_api(self.app, self.action,
apikey=self.sysadmin['apikey'], status=409, **source_dict)
result = call_action_api(self.action,
apikey=self.sysadmin['apikey'], status=409,
**source_dict)
assert 'frequency' in result
assert u'Frequency {0} not recognised'.format(wrong_frequency) in result['frequency'][0]
@ -182,16 +197,18 @@ class HarvestSourceActionBase(object):
source_dict = copy.deepcopy(self.default_source_dict)
source_dict['config'] = 'not_json'
result = call_action_api(self.app, self.action,
apikey=self.sysadmin['apikey'], status=409, **source_dict)
result = call_action_api(self.action,
apikey=self.sysadmin['apikey'], status=409,
**source_dict)
assert 'config' in result
assert u'Error parsing the configuration options: No JSON object could be decoded' in result['config'][0]
source_dict['config'] = json.dumps({'custom_option': 'not_a_list'})
result = call_action_api(self.app, self.action,
apikey=self.sysadmin['apikey'], status=409, **source_dict)
result = call_action_api(self.action,
apikey=self.sysadmin['apikey'], status=409,
**source_dict)
assert 'config' in result
assert u'Error parsing the configuration options: custom_option must be a list' in result['config'][0]
@ -202,14 +219,12 @@ class TestHarvestSourceActionCreate(HarvestSourceActionBase):
def __init__(self):
self.action = 'harvest_source_create'
def test_create(self):
source_dict = self.default_source_dict
result = call_action_api(self.app, 'harvest_source_create',
apikey=self.sysadmin['apikey'], **source_dict)
result = call_action_api('harvest_source_create',
apikey=self.sysadmin['apikey'], **source_dict)
for key in source_dict.keys():
assert source_dict[key] == result[key]
@ -219,18 +234,18 @@ class TestHarvestSourceActionCreate(HarvestSourceActionBase):
assert source.url == source_dict['url']
assert source.type == source_dict['source_type']
# Trying to create a source with the same URL fails
source_dict = copy.deepcopy(self.default_source_dict)
source_dict['name'] = 'test-source-action-new'
result = call_action_api(self.app, 'harvest_source_create',
apikey=self.sysadmin['apikey'], status=409, **source_dict)
result = call_action_api('harvest_source_create',
apikey=self.sysadmin['apikey'], status=409,
**source_dict)
assert 'url' in result
assert u'There already is a Harvest Source for this URL' in result['url'][0]
class TestHarvestSourceActionUpdate(HarvestSourceActionBase):
@classmethod
@ -242,8 +257,8 @@ class TestHarvestSourceActionUpdate(HarvestSourceActionBase):
# Create a source to update
source_dict = cls.default_source_dict
result = call_action_api(cls.app, 'harvest_source_create',
apikey=cls.sysadmin['apikey'], **source_dict)
result = call_action_api('harvest_source_create',
apikey=cls.sysadmin['apikey'], **source_dict)
cls.default_source_dict['id'] = result['id']
@ -251,17 +266,17 @@ class TestHarvestSourceActionUpdate(HarvestSourceActionBase):
source_dict = self.default_source_dict
source_dict.update({
"url": "http://test.action.updated.com",
"name": "test-source-action-updated",
"title": "Test source action updated",
"notes": "Test source action desc updated",
"source_type": "test",
"frequency": "MONTHLY",
"config": json.dumps({"custom_option":["c","d"]})
})
"url": "http://test.action.updated.com",
"name": "test-source-action-updated",
"title": "Test source action updated",
"notes": "Test source action desc updated",
"source_type": "test",
"frequency": "MONTHLY",
"config": json.dumps({"custom_option": ["c", "d"]})
})
result = call_action_api(self.app, 'harvest_source_update',
apikey=self.sysadmin['apikey'], **source_dict)
result = call_action_api('harvest_source_update',
apikey=self.sysadmin['apikey'], **source_dict)
for key in source_dict.keys():
assert source_dict[key] == result[key]
@ -271,29 +286,26 @@ class TestHarvestSourceActionUpdate(HarvestSourceActionBase):
assert source.url == source_dict['url']
assert source.type == source_dict['source_type']
class TestHarvestObject(unittest.TestCase):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
@classmethod
def teardown_class(cls):
ckan.model.repo.rebuild_db()
def test_create(self):
job = factories.HarvestJobFactory()
job.save()
job = factories.HarvestJobObj()
context = {
'model' : ckan.model,
'session': ckan.model.Session,
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data_dict = {
'guid' : 'guid',
'content' : 'content',
'job_id' : job.id,
'extras' : { 'a key' : 'a value' },
'guid': 'guid',
'content': 'content',
'job_id': job.id,
'extras': {'a key': 'a value'},
}
harvest_object = toolkit.get_action('harvest_object_create')(
context, data_dict)
@ -303,26 +315,24 @@ class TestHarvestObject(unittest.TestCase):
assert created_object.guid == harvest_object['guid'] == data_dict['guid']
def test_create_bad_parameters(self):
source_a = factories.HarvestSourceFactory()
source_a.save()
job = factories.HarvestJobFactory()
job.save()
source_a = factories.HarvestSourceObj()
job = factories.HarvestJobObj()
context = {
'model' : ckan.model,
'session': ckan.model.Session,
'model': model,
'session': model.Session,
'ignore_auth': True,
}
data_dict = {
'job_id' : job.id,
'source_id' : source_a.id,
'extras' : 1
'job_id': job.id,
'source_id': source_a.id,
'extras': 1
}
harvest_object_create = toolkit.get_action('harvest_object_create')
self.assertRaises(ckan.logic.ValidationError, harvest_object_create,
context, data_dict)
self.assertRaises(toolkit.ValidationError, harvest_object_create,
context, data_dict)
data_dict['extras'] = {'test': 1 }
data_dict['extras'] = {'test': 1}
self.assertRaises(ckan.logic.ValidationError, harvest_object_create,
context, data_dict)
self.assertRaises(toolkit.ValidationError, harvest_object_create,
context, data_dict)

View File

@ -1,3 +1,7 @@
try:
from ckan.tests.helpers import reset_db
except ImportError:
from ckan.new_tests.helpers import reset_db
import ckanext.harvest.model as harvest_model
from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
from ckanext.harvest.interfaces import IHarvester
@ -82,13 +86,9 @@ class TestHarvester(SingletonPlugin):
class TestHarvestQueue(object):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
@classmethod
def teardown_class(cls):
model.repo.rebuild_db()
def test_01_basic_harvester(self):
### make sure queues/exchanges are created first and are empty