Merge branch 'model_upgrade' into release-v2.0

This commit is contained in:
amercader 2012-10-30 18:07:24 +00:00
commit d7f8c9165c
14 changed files with 290 additions and 429 deletions

View File

@ -15,7 +15,7 @@ class Harvester(CkanCommand):
harvester initdb
- Creates the necessary tables in the database
harvester source {url} {type} [{active}] [{user-id}] [{publisher-id}]
harvester source {url} {type} [{config}] [{active}] [{user-id}] [{publisher-id}] [{frequency}]
- create new harvest source
harvester rmsource {id}
@ -64,7 +64,7 @@ class Harvester(CkanCommand):
summary = __doc__.split('\n')[0]
usage = __doc__
max_args = 6
max_args = 8
min_args = 0
def __init__(self,name):
@ -169,11 +169,18 @@ class Harvester(CkanCommand):
publisher_id = unicode(self.args[6])
else:
publisher_id = u''
if len(self.args) >= 8:
frequency = unicode(self.args[7])
if not frequency:
frequency = None
else:
frequency = None
try:
data_dict = {
'url':url,
'type':type,
'config':config,
'frequency':frequency,
'active':active,
'user_id':user_id,
'publisher_id':publisher_id}
@ -186,9 +193,11 @@ class Harvester(CkanCommand):
sources = get_action('harvest_source_list')(context,{})
self.print_there_are('harvest source', sources)
# Create a harvest job for the new source
get_action('harvest_job_create')(context,{'source_id':source['id']})
print 'A new Harvest Job for this source has also been created'
# Create a harvest job for the new source if not regular job.
if not data_dict['frequency']:
get_action('harvest_job_create')(context,{'source_id':source['id']})
print 'A new Harvest Job for this source has also been created'
except ValidationError,e:
print 'An error occurred:'
print str(e.error_dict)
@ -278,6 +287,7 @@ class Harvester(CkanCommand):
print ' active: %s' % source['active']
print ' user: %s' % source['user_id']
print 'publisher: %s' % source['publisher_id']
print 'frequency: %s' % source['frequency']
print ' jobs: %s' % source['status']['job_count']
print ''

View File

@ -5,3 +5,5 @@ except ImportError:
import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__)
class HarvestJobExists(Exception):
pass

View File

@ -2,6 +2,7 @@ import re
import logging
from ckan.logic import NotFound, ValidationError, check_access
from ckanext.harvest.logic import HarvestJobExists
from ckan.lib.navl.dictization_functions import validate
from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject)
@ -32,7 +33,8 @@ def harvest_source_create(context,data_dict):
source.url = data['url'].strip()
source.type = data['type']
opt = ['active','title','description','user_id','publisher_id','config']
opt = ['active','title','description','user_id',
'publisher_id','config', 'frequency']
for o in opt:
if o in data and data[o] is not None:
source.__setattr__(o,data[o])
@ -45,6 +47,7 @@ def harvest_source_create(context,data_dict):
return harvest_source_dictize(source,context)
def harvest_job_create(context,data_dict):
log.info('Harvest job create: %r', data_dict)
check_access('harvest_job_create',context,data_dict)
@ -70,7 +73,7 @@ def harvest_job_create(context,data_dict):
exists = harvest_job_list(context,data_dict)
if len(exists):
log.warn('There is already an unrun job %r for this source %s', exists, source_id)
raise Exception('There already is an unrun job for this source')
raise HarvestJobExists('There already is an unrun job for this source')
job = HarvestJob()
job.source = source

View File

@ -2,6 +2,7 @@ import logging
from sqlalchemy import or_
from ckan.authz import Authorizer
from ckan.model import User
import datetime
from ckan.plugins import PluginImplementations
from ckanext.harvest.interfaces import IHarvester
@ -153,6 +154,7 @@ def _get_sources_for_user(context,data_dict):
user = context.get('user','')
only_active = data_dict.get('only_active',False)
only_to_run = data_dict.get('only_to_run',False)
query = session.query(HarvestSource) \
.order_by(HarvestSource.created.desc())
@ -160,6 +162,14 @@ def _get_sources_for_user(context,data_dict):
if only_active:
query = query.filter(HarvestSource.active==True) \
if only_to_run:
query = query.filter(or_(HarvestSource.frequency!=None,
HarvestSource.frequency!='')
)
query = query.filter(or_(HarvestSource.next_run<=datetime.datetime.utcnow(),
HarvestSource.next_run==None)
)
# Sysadmins will get all sources
if not Authorizer().is_sysadmin(user):
# This only applies to a non sysadmin user when using the

View File

@ -1,8 +1,10 @@
import hashlib
import logging
import datetime
from ckan.plugins import PluginImplementations
from ckan.logic import get_action
from ckanext.harvest.interfaces import IHarvester
from ckan.model import Package
@ -14,10 +16,11 @@ from ckanext.harvest.queue import get_gather_publisher
from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject)
from ckanext.harvest.logic.schema import default_harvest_source_schema
from ckanext.harvest.logic import HarvestJobExists
from ckanext.harvest.logic.dictization import (harvest_source_dictize,harvest_object_dictize)
from ckanext.harvest.logic.action.create import _error_summary
from ckanext.harvest.logic.action.get import harvest_source_show,harvest_job_list
from ckanext.harvest.logic.action.get import harvest_source_show, harvest_job_list, _get_sources_for_user
log = logging.getLogger(__name__)
@ -132,12 +135,56 @@ def harvest_objects_import(context,data_dict):
log.info('Harvest objects imported: %s', last_objects_count)
return last_objects_count
def _caluclate_next_run(frequency):
now = datetime.datetime.utcnow()
if frequency == 'ALWAYS':
return now
if frequency == 'WEEKLY':
return now + datetime.timedelta(weeks=1)
if frequency == 'BIWEEKLY':
return now + datetime.timedelta(weeks=2)
if frequency == 'DAILY':
return now + datetime.timedelta(days=1)
if frequency == 'MONTHLY':
if now.month in (4,6,9,11):
days = 30
elif now.month == 2:
if now.year % 4 == 0:
days = 29
else:
days = 28
else:
days = 31
return now + datetime.timedelta(days=days)
raise Exception('Frequency {freq} not recognised'.format(freq=frequency))
def _make_scheduled_jobs(context, data_dict):
data_dict = {'only_to_run': True,
'only_active': True}
sources = _get_sources_for_user(context, data_dict)
for source in sources:
data_dict = {'source_id': source.id}
try:
get_action('harvest_job_create')(context, data_dict)
except HarvestJobExists, e:
log.info('Trying to rerun job for %s skipping' % source.id)
source.next_run = _caluclate_next_run(source.frequency)
source.save()
def harvest_jobs_run(context,data_dict):
log.info('Harvest job run: %r', data_dict)
check_access('harvest_jobs_run',context,data_dict)
source_id = data_dict.get('source_id',None)
if not source_id:
_make_scheduled_jobs(context, data_dict)
# Check if there are pending harvest jobs
jobs = harvest_job_list(context,{'source_id':source_id,'status':u'New'})
if len(jobs) == 0:

View File

@ -11,7 +11,8 @@ from ckanext.harvest.logic.validators import (harvest_source_id_exists,
harvest_source_url_validator,
harvest_source_type_exists,
harvest_source_config_validator,
harvest_source_active_validator,)
harvest_source_active_validator,
harvest_source_frequency_exists)
def default_harvest_source_schema():
@ -21,6 +22,7 @@ def default_harvest_source_schema():
'type': [not_empty, unicode, harvest_source_type_exists],
'title': [ignore_missing,unicode],
'description': [ignore_missing,unicode],
'frequency': [ignore_missing,unicode, harvest_source_frequency_exists],
'active': [ignore_missing,harvest_source_active_validator],
'user_id': [ignore_missing,unicode],
'config': [ignore_missing,harvest_source_config_validator]

View File

@ -99,3 +99,7 @@ def harvest_source_active_validator(value,context):
return False
return bool(value)
def harvest_source_frequency_exists(value):
if value.upper() not in ['MONTHLY','ALWAYS','WEEKLY','BIWEEKLY','DAILY']:
raise Invalid('Frequency %s not recognised' % value)
return value.upper()

View File

@ -36,6 +36,7 @@ harvest_job_table = None
harvest_object_table = None
harvest_gather_error_table = None
harvest_object_error_table = None
harvest_object_extra_table = None
def setup():
@ -53,6 +54,7 @@ def setup():
harvest_object_table.create()
harvest_gather_error_table.create()
harvest_object_error_table.create()
harvest_object_extra_table.create()
log.debug('Harvest tables created')
else:
@ -64,6 +66,9 @@ def setup():
if not 'title' in [column['name'] for column in columns]:
log.debug('Harvest tables need to be updated')
migrate_v2()
if not 'frequency' in [column['name'] for column in columns]:
log.debug('Harvest tables need to be updated')
migrate_v3()
else:
log.debug('Harvest table creation deferred')
@ -123,6 +128,9 @@ class HarvestObject(HarvestDomainObject):
'''
class HarvestObjectExtra(HarvestDomainObject):
'''Extra key value data for Harvest objects'''
class HarvestGatherError(HarvestDomainObject):
'''Gather errors are raised during the **gather** stage of a harvesting
job.
@ -152,6 +160,7 @@ def define_harvester_tables():
global harvest_source_table
global harvest_job_table
global harvest_object_table
global harvest_object_extra_table
global harvest_gather_error_table
global harvest_object_error_table
@ -166,6 +175,8 @@ def define_harvester_tables():
Column('active',types.Boolean,default=True),
Column('user_id', types.UnicodeText, default=u''),
Column('publisher_id', types.UnicodeText, default=u''),
Column('frequency', types.UnicodeText, default=u''),
Column('next_run', types.DateTime),
)
# Was harvesting_job
harvest_job_table = Table('harvest_job', metadata,
@ -185,12 +196,24 @@ def define_harvester_tables():
Column('fetch_started', types.DateTime),
Column('content', types.UnicodeText, nullable=True),
Column('fetch_finished', types.DateTime),
Column('import_started', types.DateTime),
Column('import_finished', types.DateTime),
Column('state', types.UnicodeText, default=u'WAITING'),
Column('metadata_modified_date', types.DateTime),
Column('retry_times',types.Integer),
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
Column('harvest_source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
Column('package_id', types.UnicodeText, ForeignKey('package.id'), nullable=True),
)
# New table
harvest_object_extra_table = Table('harvest_object_extra', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('harvest_object_id', types.UnicodeText, ForeignKey('harvest_object.id')),
Column('key',types.UnicodeText),
Column('value', types.UnicodeText),
)
# New table
harvest_gather_error_table = Table('harvest_gather_error',metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
@ -270,6 +293,17 @@ def define_harvester_tables():
},
)
mapper(
HarvestObjectExtra,
harvest_object_extra_table,
properties={
'object':relation(
HarvestObject,
backref='extras'
),
},
)
event.listen(HarvestObject, 'before_insert', harvest_object_before_insert_listener)
def migrate_v2():
@ -312,3 +346,38 @@ def migrate_v2():
Session.commit()
log.info('Harvest tables migrated to v2')
def migrate_v3():
log.debug('Migrating harvest tables to v3. This may take a while...')
conn = Session.connection()
statement = """CREATE TABLE harvest_object_extra (
id text NOT NULL,
harvest_object_id text,
"key" text,
"value" text
);
ALTER TABLE harvest_object
ADD COLUMN import_started timestamp without time zone,
ADD COLUMN import_finished timestamp without time zone,
ADD COLUMN "state" text;
ALTER TABLE harvest_source
ADD COLUMN frequency text,
ADD COLUMN next_run timestamp without time zone;
ALTER TABLE harvest_object_extra
ADD CONSTRAINT harvest_object_extra_pkey PRIMARY KEY (id);
ALTER TABLE harvest_object_extra
ADD CONSTRAINT harvest_object_extra_harvest_object_id_fkey FOREIGN KEY (harvest_object_id) REFERENCES harvest_object(id);
UPDATE harvest_object set state = 'COMPLETE';
"""
conn.execute(statement)
Session.commit()
log.info('Harvest tables migrated to v3')

View File

@ -155,13 +155,22 @@ def fetch_callback(channel, method, header, body):
# See if the plugin can fetch the harvest object
obj.fetch_started = datetime.datetime.now()
obj.state = "FETCH"
obj.save()
success = harvester.fetch_stage(obj)
obj.fetch_finished = datetime.datetime.now()
obj.save()
#TODO: retry times?
if success:
# If no errors where found, call the import method
obj.import_started = datetime.datetime.now()
obj.state = "IMPORT"
obj.save()
harvester.import_stage(obj)
obj.import_finished = datetime.datetime.now()
if obj.state != "ERROR":
obj.state = "COMPLETE"
obj.save()

View File

@ -1,308 +0,0 @@
# NB These tests have been copied as bleeding chunks from
# dgu/ckanext/dgu/tests/forms/test_form_api.py and will need fixing up
# drastically.
from ckan.tests import *
from ckan.tests import search_related
from ckan.tests.functional.api.base import (ApiTestCase,
Api1TestCase,
Api2TestCase,
ApiUnversionedTestCase)
from ckanext.dgu.tests import WsgiAppCase, MockDrupalCase
from ckanext.harvest.lib import get_harvest_source, create_harvest_source
import ckan.model as model
import ckan.authz as authz
class BaseFormsApiCase(ModelMethods, ApiTestCase, WsgiAppCase, CommonFixtureMethods, CheckMethods, MockDrupalCase):
'''Utilities and pythonic wrapper for the Forms API for testing it.'''
@staticmethod
def get_harvest_source_by_url(source_url, default=Exception):
return get_harvest_source(source_url,attr='url',default=default)
def create_harvest_source(self, **kwds):
source = create_harvest_source(kwds)
return source
def delete_harvest_source(self, url):
source = self.get_harvest_source_by_url(url, None)
if source:
self.delete_commit(source)
def offset_harvest_source_create_form(self):
return self.offset('/form/harvestsource/create')
def offset_harvest_source_edit_form(self, ref):
return self.offset('/form/harvestsource/edit/%s' % ref)
def get_harvest_source_create_form(self, status=[200]):
offset = self.offset_harvest_source_create_form()
res = self.get(offset, status=status)
return self.form_from_res(res)
def get_harvest_source_edit_form(self, harvest_source_id, status=[200]):
offset = self.offset_harvest_source_edit_form(harvest_source_id)
res = self.get(offset, status=status)
return self.form_from_res(res)
def post_harvest_source_create_form(self, form=None, status=[201], **field_args):
if form == None:
form = self.get_harvest_source_create_form()
for key,field_value in field_args.items():
field_name = 'HarvestSource--%s' % key
form[field_name] = field_value
form_data = form.submit_fields()
data = {
'form_data': form_data,
'user_id': 'example publisher user',
'publisher_id': 'example publisher',
}
offset = self.offset_harvest_source_create_form()
return self.post(offset, data, status=status)
def post_harvest_source_edit_form(self, harvest_source_id, form=None, status=[200], **field_args):
if form == None:
form = self.get_harvest_source_edit_form(harvest_source_id)
for key,field_value in field_args.items():
field_name = 'HarvestSource-%s-%s' % (harvest_source_id, key)
self.set_formfield(form, field_name, field_value)
form_data = form.submit_fields()
data = {
'form_data': form_data,
'user_id': 'example publisher user',
'publisher_id': 'example publisher',
}
offset = self.offset_harvest_source_edit_form(harvest_source_id)
return self.post(offset, data, status=status)
class FormsApiTestCase(BaseFormsApiCase):
@classmethod
def setup_class(cls):
super(FormsApiTestCase, cls).setup_class()
from ckanext.harvest.model import setup as harvest_setup
harvest_setup()
def setup(self):
model.repo.init_db()
CreateTestData.create()
self.package_name = u'formsapi'
self.package_name_alt = u'formsapialt'
self.package_name_alt2 = u'formsapialt2'
self.apikey_header_name = config.get('apikey_header_name', 'X-CKAN-API-Key')
self.user = self.get_user_by_name(u'tester')
if not self.user:
self.user = self.create_user(name=u'tester')
self.user = self.get_user_by_name(u'tester')
model.add_user_to_role(self.user, model.Role.ADMIN, model.System())
model.repo.commit_and_remove()
self.extra_environ = {
self.apikey_header_name : str(self.user.apikey)
}
self.create_package(name=self.package_name)
self.harvest_source = None
def teardown(self):
model.repo.rebuild_db()
model.Session.connection().invalidate()
@classmethod
def teardown_class(cls):
super(FormsApiTestCase, cls).teardown_class()
def test_get_harvest_source_create_form(self):
raise SkipTest('These tests should be moved to ckanext-harvest.')
form = self.get_harvest_source_create_form()
self.assert_formfield(form, 'HarvestSource--url', '')
self.assert_formfield(form, 'HarvestSource--type', 'CSW Server')
self.assert_formfield(form, 'HarvestSource--description', '')
def test_submit_harvest_source_create_form_valid(self):
raise SkipTest('These tests should be moved to ckanext-harvest.')
source_url = u'http://localhost/'
source_type= u'CSW Server'
source_description = u'My harvest source.'
assert not self.get_harvest_source_by_url(source_url, None)
res = self.post_harvest_source_create_form(url=source_url,type=source_type,description=source_description)
self.assert_header(res, 'Location')
# Todo: Check the Location looks promising (extract and check given ID).
self.assert_blank_response(res)
source = self.get_harvest_source_by_url(source_url) # Todo: Use extracted ID.
assert_equal(source['user_id'], 'example publisher user')
assert_equal(source['publisher_id'], 'example publisher')
def test_submit_harvest_source_create_form_invalid(self):
raise SkipTest('These tests should be moved to ckanext-harvest.')
source_url = u'' # Blank URL.
source_type= u'CSW Server'
assert not self.get_harvest_source_by_url(source_url, None)
res = self.post_harvest_source_create_form(url=source_url, status=[400])
self.assert_not_header(res, 'Location')
assert "URL for source of metadata: Please enter a value" in res.body, res.body
assert not self.get_harvest_source_by_url(source_url, None)
source_url = u'something' # Not '^http://'
source_type= u'CSW Server'
assert not self.get_harvest_source_by_url(source_url, None)
res = self.post_harvest_source_create_form(url=source_url, status=[400])
self.assert_not_header(res, 'Location')
assert "URL for source of metadata: Harvest source URL is invalid" in res.body, res.body
assert not self.get_harvest_source_by_url(source_url, None)
def test_get_harvest_source_edit_form(self):
raise SkipTest('These tests should be moved to ckanext-harvest.')
source_url = u'http://'
source_type = u'CSW Server'
source_description = u'An example harvest source.'
self.harvest_source = self.create_harvest_source(url=source_url,type=source_type,description=source_description)
form = self.get_harvest_source_edit_form(self.harvest_source['id'])
self.assert_formfield(form, 'HarvestSource-%s-url' % self.harvest_source['id'], source_url)
self.assert_formfield(form, 'HarvestSource-%s-type' % self.harvest_source['id'], source_type)
self.assert_formfield(form, 'HarvestSource-%s-description' % self.harvest_source['id'], source_description)
def test_submit_harvest_source_edit_form_valid(self):
raise SkipTest('These tests should be moved to ckanext-harvest.')
source_url = u'http://'
source_type = u'CSW Server'
source_description = u'An example harvest source.'
alt_source_url = u'http://a'
alt_source_type = u'Web Accessible Folder (WAF)'
alt_source_description = u'An old example harvest source.'
self.harvest_source = self.create_harvest_source(url=source_url, type=source_type,description=source_description)
assert self.get_harvest_source_by_url(source_url, None)
assert not self.get_harvest_source_by_url(alt_source_url, None)
res = self.post_harvest_source_edit_form(self.harvest_source['id'], url=alt_source_url, type=alt_source_type,description=alt_source_description)
self.assert_not_header(res, 'Location')
# Todo: Check the Location looks promising (extract and check given ID).
self.assert_blank_response(res)
assert not self.get_harvest_source_by_url(source_url, None)
source = self.get_harvest_source_by_url(alt_source_url) # Todo: Use extracted ID.
assert source
assert_equal(source['user_id'], 'example publisher user')
assert_equal(source['publisher_id'], 'example publisher')
def test_submit_harvest_source_edit_form_invalid(self):
raise SkipTest('These tests should be moved to ckanext-harvest.')
source_url = u'http://'
source_type = u'CSW Server'
source_description = u'An example harvest source.'
alt_source_url = u''
self.harvest_source = self.create_harvest_source(url=source_url, type=source_type,description=source_description)
assert self.get_harvest_source_by_url(source_url, None)
res = self.post_harvest_source_edit_form(self.harvest_source['id'], url=alt_source_url, status=[400])
assert self.get_harvest_source_by_url(source_url, None)
self.assert_not_header(res, 'Location')
assert "URL for source of metadata: Please enter a value" in res.body, res.body
class FormsApiAuthzTestCase(BaseFormsApiCase):
def setup(self):
# need to do this for every test since we mess with System rights
CreateTestData.create()
model.repo.new_revision()
model.Session.add(model.User(name=u'testadmin'))
model.Session.add(model.User(name=u'testsysadmin'))
model.Session.add(model.User(name=u'notadmin'))
model.repo.commit_and_remove()
pkg = model.Package.by_name(u'annakarenina')
admin = model.User.by_name(u'testadmin')
sysadmin = model.User.by_name(u'testsysadmin')
model.add_user_to_role(admin, model.Role.ADMIN, pkg)
model.add_user_to_role(sysadmin, model.Role.ADMIN, model.System())
model.repo.commit_and_remove()
self.pkg = model.Package.by_name(u'annakarenina')
self.admin = model.User.by_name(u'testadmin')
self.sysadmin = model.User.by_name(u'testsysadmin')
self.notadmin = model.User.by_name(u'notadmin')
def teardown(self):
model.Session.remove()
model.repo.rebuild_db()
model.Session.remove()
def check_create_harvest_source(self, username, expect_success=True):
user = model.User.by_name(username)
self.extra_environ={'Authorization' : str(user.apikey)}
expect_status = 200 if expect_success else 403
form = self.get_harvest_source_create_form(status=expect_status)
def check_edit_harvest_source(self, username, expect_success=True):
# create a harvest source
source_url = u'http://localhost/'
source_type = u'CSW Server'
source_description = u'My harvest source.'
sysadmin = model.User.by_name(u'testsysadmin')
self.extra_environ={'Authorization' : str(sysadmin.apikey)}
if not self.get_harvest_source_by_url(source_url, None):
res = self.post_harvest_source_create_form(url=source_url, type=source_type,description=source_description)
harvest_source = self.get_harvest_source_by_url(source_url, None)
assert harvest_source
user = model.User.by_name(username)
self.extra_environ={'Authorization' : str(user.apikey)}
expect_status = 200 if expect_success else 403
form = self.get_harvest_source_edit_form(harvest_source['id'], status=expect_status)
def remove_default_rights(self):
roles = []
system_role_query = model.Session.query(model.SystemRole)
package_role_query = model.Session.query(model.PackageRole)
for pseudo_user in (u'logged_in', u'visitor'):
roles.extend(system_role_query.join('user').\
filter_by(name=pseudo_user).all())
roles.extend(package_role_query.join('package').\
filter_by(name='annakarenina').\
join('user').filter_by(name=pseudo_user).all())
for role in roles:
role.delete()
model.repo.commit_and_remove()
def test_harvest_source_create(self):
raise SkipTest('These tests should be moved to ckanext-harvest.')
self.check_create_harvest_source('testsysadmin', expect_success=True)
self.check_create_harvest_source('testadmin', expect_success=False)
self.check_create_harvest_source('notadmin', expect_success=False)
self.remove_default_rights()
self.check_create_harvest_source('testsysadmin', expect_success=True)
self.check_create_harvest_source('testadmin', expect_success=False)
self.check_create_harvest_source('notadmin', expect_success=False)
def test_harvest_source_edit(self):
raise SkipTest('These tests should be moved to ckanext-harvest.')
self.check_edit_harvest_source('testsysadmin', expect_success=True)
self.check_edit_harvest_source('testadmin', expect_success=False)
self.check_edit_harvest_source('notadmin', expect_success=False)
self.remove_default_rights()
self.check_edit_harvest_source('testsysadmin', expect_success=True)
self.check_edit_harvest_source('testadmin', expect_success=False)
self.check_edit_harvest_source('notadmin', expect_success=False)
class TestFormsApi1(Api1TestCase, FormsApiTestCase): pass
class TestFormsApi2(Api2TestCase, FormsApiTestCase): pass
class TestFormsApiUnversioned(ApiUnversionedTestCase, FormsApiTestCase): pass
class WithOrigKeyHeader(FormsApiTestCase):
apikey_header_name = 'Authorization'
class TestFormsApiAuthz1(Api1TestCase, FormsApiAuthzTestCase): pass
class TestFormsApiAuthz2(Api2TestCase, FormsApiAuthzTestCase): pass
class TestFormsApiAuthzUnversioned(ApiUnversionedTestCase, FormsApiAuthzTestCase): pass

View File

@ -1,111 +0,0 @@
from ckan.tests import *
from ckanext.harvest.model import HarvestSource
import ckan.model as model
from ckan.tests.pylons_controller import PylonsTestCase
import ckanext.dgu.forms.harvest_source as form
from nose.plugins.skip import SkipTest;
raise SkipTest('These tests should be moved to ckanext-harvest.')
class TestHarvestSource(PylonsTestCase):
@classmethod
def setup_class(cls):
super(TestHarvestSource, cls).setup_class()
def setup(self):
model.repo.init_db()
def teardown(self):
model.repo.rebuild_db()
def test_form_raw(self):
fs = form.get_harvest_source_fieldset()
text = fs.render()
assert 'url' in text
assert 'type' in text
assert 'description' in text
def test_form_bound_to_existing_object(self):
source = HarvestSource(url=u'http://localhost/', description=u'My source', type=u'Gemini')
model.Session.add(source)
model.Session.commit()
model.Session.remove()
fs = form.get_harvest_source_fieldset()
fs = fs.bind(source)
text = fs.render()
assert 'url' in text
assert 'http://localhost/' in text
assert 'description' in text
assert 'My source' in text
def test_form_bound_to_new_object(self):
source = HarvestSource(url=u'http://localhost/', description=u'My source', type=u'Gemini')
fs = form.get_harvest_source_fieldset()
fs = fs.bind(source)
text = fs.render()
assert 'url' in text
assert 'http://localhost/' in text
assert 'description' in text
assert 'My source' in text
def test_form_validate_new_object_and_sync(self):
assert not HarvestSource.get(u'http://localhost/', None, 'url')
fs = form.get_harvest_source_fieldset()
register = HarvestSource
data = {
'HarvestSource--url': u'http://localhost/',
'HarvestSource--type': u'Gemini',
'HarvestSource--description': u'My source'
}
fs = fs.bind(register, data=data, session=model.Session)
# Test bound_fields.validate().
fs.validate()
assert not fs.errors
# Test bound_fields.sync().
fs.sync()
model.Session.commit()
source = HarvestSource.get(u'http://localhost/', None, 'url')
assert source.id
def test_form_invalidate_new_object_null(self):
fs = form.get_harvest_source_fieldset()
register = HarvestSource
data = {
'HarvestSource--url': u'',
'HarvestSource--type': u'Gemini',
'HarvestSource--description': u'My source'
}
fs = fs.bind(register, data=data)
# Test bound_fields.validate().
fs.validate()
assert fs.errors
def test_form_invalidate_new_object_not_http(self):
fs = form.get_harvest_source_fieldset()
register = HarvestSource
data = {
'HarvestSource--url': u'htp:',
'HarvestSource--type': u'Gemini',
'HarvestSource--description': u'My source'
}
fs = fs.bind(register, data=data)
# Test bound_fields.validate().
fs.validate()
assert fs.errors
def test_form_invalidate_new_object_no_type(self):
fs = form.get_harvest_source_fieldset()
register = HarvestSource
data = {
'HarvestSource--url': u'htp:',
'HarvestSource--type': u'',
'HarvestSource--description': u'My source'
}
fs = fs.bind(register, data=data)
# Test bound_fields.validate().
fs.validate()
assert fs.errors

View File

@ -0,0 +1,123 @@
import ckanext.harvest.model as harvest_model
from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.queue as queue
from ckan.plugins.core import SingletonPlugin, implements
import json
import ckan.logic as logic
from ckan import model
class TestHarvester(SingletonPlugin):
implements(IHarvester)
def info(self):
return {'name': 'test', 'title': 'test', 'description': 'test'}
def gather_stage(self, harvest_job):
if harvest_job.source.url == 'basic_test':
obj = HarvestObject(guid = 'test1', job = harvest_job)
obj.extras.append(HarvestObjectExtra(key='key', value='value'))
obj2 = HarvestObject(guid = 'test2', job = harvest_job)
obj.add()
obj2.save() # this will commit both
return [obj.id, obj2.id]
return []
def fetch_stage(self, harvest_object):
assert harvest_object.state == "FETCH"
assert harvest_object.fetch_started != None
harvest_object.content = json.dumps({'name': harvest_object.guid})
harvest_object.save()
return True
def import_stage(self, harvest_object):
assert harvest_object.state == "IMPORT"
assert harvest_object.fetch_finished != None
assert harvest_object.import_started != None
user = logic.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
logic.get_action('package_create')(
{'model': model, 'session': model.Session,
'user': user, 'api_version': 3},
json.loads(harvest_object.content)
)
return True
class TestHarvestQueue(object):
@classmethod
def setup_class(cls):
harvest_model.setup()
@classmethod
def teardown_class(cls):
model.repo.rebuild_db()
def test_01_basic_harvester(cls):
### make sure queues/exchanges are created first and are empty
consumer = queue.get_consumer('ckan.harvest.gather','harvest_job_id')
consumer_fetch = queue.get_consumer('ckan.harvest.fetch','harvest_object_id')
consumer.queue_purge(queue='ckan.harvest.gather')
consumer.queue_purge(queue='ckan.harvest.fetch')
user = logic.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
context = {'model': model, 'session': model.Session,
'user': user, 'api_version': 3}
harvest_source = logic.get_action('harvest_source_create')(
context,
{'type':'test', 'url': 'basic_test'}
)
assert harvest_source['type'] == 'test', harvest_source
assert harvest_source['url'] == 'basic_test', harvest_source
harvest_job = logic.get_action('harvest_job_create')(
context,
{'source_id':harvest_source['id']}
)
assert harvest_job['source_id'] == harvest_source['id'], harvest_job
harvest_job = logic.get_action('harvest_jobs_run')(
context,
{'source_id':harvest_source['id']}
)
## pop on item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
all_objects = model.Session.query(HarvestObject).all()
assert len(all_objects) == 2
assert all_objects[0].state == 'WAITING'
assert all_objects[1].state == 'WAITING'
assert len(model.Session.query(HarvestObject).all()) == 2
assert len(model.Session.query(HarvestObjectExtra).all()) == 1
## do twice as two harvest objects
reply = consumer.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer, *reply)
reply = consumer.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer, *reply)
assert len(model.Session.query(model.Package).all()) == 2
all_objects = model.Session.query(HarvestObject).all()
assert len(all_objects) == 2
assert all_objects[0].state == 'COMPLETE'
assert all_objects[1].state == 'COMPLETE'

View File

@ -34,6 +34,7 @@ setup(
# Add plugins here, eg
harvest=ckanext.harvest.plugin:Harvest
ckan_harvester=ckanext.harvest.harvesters:CKANHarvester
test_harvester=ckanext.harvest.tests.test_queue:TestHarvester
[paste.paster_command]
harvester = ckanext.harvest.commands.harvester:Harvester
""",

View File

@ -15,7 +15,7 @@ port = 5000
use = config:../ckan/test.ini
# Here we hard-code the database and a flag to make default tests
# run fast.
ckan.plugins = harvest ckan_harvester
ckan.plugins = harvest ckan_harvester test_harvester
# NB: other test configuration should go in test-core.ini, which is
# what the postgres tests use.