Merge branch 'tests'

This commit is contained in:
amercader 2015-10-23 13:18:15 +01:00
commit 2f4adfb338
9 changed files with 906 additions and 159 deletions

View File

@ -30,10 +30,22 @@ def harvest_source_show(context,data_dict):
:param id: the id or name of the harvest source :param id: the id or name of the harvest source
:type id: string :type id: string
:param url: url of the harvest source (as an alternative to the id)
:type url: string
:returns: harvest source metadata :returns: harvest source metadata
:rtype: dictionary :rtype: dictionary
''' '''
model = context.get('model')
# Find the source by URL
if data_dict.get('url') and not data_dict.get('id'):
source = model.Session.query(harvest_model.HarvestSource) \
.filter_by(url=data_dict.get('url')) \
.first()
if not source:
raise NotFound
data_dict['id'] = source.id
source_dict = logic.get_action('package_show')(context, data_dict) source_dict = logic.get_action('package_show')(context, data_dict)

View File

@ -39,13 +39,17 @@ harvest_gather_error_table = None
harvest_object_error_table = None harvest_object_error_table = None
harvest_object_extra_table = None harvest_object_extra_table = None
def setup(): def setup():
if harvest_source_table is None: if harvest_source_table is None:
define_harvester_tables() define_harvester_tables()
log.debug('Harvest tables defined in memory') log.debug('Harvest tables defined in memory')
if model.package_table.exists(): if not model.package_table.exists():
log.debug('Harvest table creation deferred')
return
if not harvest_source_table.exists(): if not harvest_source_table.exists():
# Create each table individually rather than # Create each table individually rather than
@ -64,10 +68,11 @@ def setup():
# Check if existing tables need to be updated # Check if existing tables need to be updated
inspector = Inspector.from_engine(engine) inspector = Inspector.from_engine(engine)
columns = inspector.get_columns('harvest_source') columns = inspector.get_columns('harvest_source')
if not 'title' in [column['name'] for column in columns]: column_names = [column['name'] for column in columns]
if not 'title' in column_names:
log.debug('Harvest tables need to be updated') log.debug('Harvest tables need to be updated')
migrate_v2() migrate_v2()
if not 'frequency' in [column['name'] for column in columns]: if not 'frequency' in column_names:
log.debug('Harvest tables need to be updated') log.debug('Harvest tables need to be updated')
migrate_v3() migrate_v3()
@ -80,9 +85,6 @@ def setup():
sources_to_migrate = [s[0] for s in sources_to_migrate] sources_to_migrate = [s[0] for s in sources_to_migrate]
migrate_v3_create_datasets(sources_to_migrate) migrate_v3_create_datasets(sources_to_migrate)
else:
log.debug('Harvest table creation deferred')
class HarvestError(Exception): class HarvestError(Exception):
pass pass

View File

@ -226,23 +226,12 @@ def gather_callback(channel, method, header, body):
for harvester in PluginImplementations(IHarvester): for harvester in PluginImplementations(IHarvester):
if harvester.info()['name'] == job.source.type: if harvester.info()['name'] == job.source.type:
harvester_found = True harvester_found = True
# Get a list of harvest object ids from the plugin
job.gather_started = datetime.datetime.utcnow()
try: try:
harvest_object_ids = harvester.gather_stage(job) harvest_object_ids = gather_stage(harvester, job)
except (Exception, KeyboardInterrupt): except (Exception, KeyboardInterrupt):
channel.basic_ack(method.delivery_tag) channel.basic_ack(method.delivery_tag)
harvest_objects = model.Session.query(HarvestObject).filter_by(
harvest_job_id=job.id
)
for harvest_object in harvest_objects:
model.Session.delete(harvest_object)
model.Session.commit()
raise raise
finally:
job.gather_finished = datetime.datetime.utcnow()
job.save()
if not isinstance(harvest_object_ids, list): if not isinstance(harvest_object_ids, list):
log.error('Gather stage failed') log.error('Gather stage failed')
@ -279,6 +268,31 @@ def gather_callback(channel, method, header, body):
channel.basic_ack(method.delivery_tag) channel.basic_ack(method.delivery_tag)
def gather_stage(harvester, job):
'''Calls the harvester's gather_stage, returning harvest object ids, with
some error handling.
This is split off from gather_callback so that tests can call it without
dealing with queue stuff.
'''
job.gather_started = datetime.datetime.utcnow()
try:
harvest_object_ids = harvester.gather_stage(job)
except (Exception, KeyboardInterrupt):
harvest_objects = model.Session.query(HarvestObject).filter_by(
harvest_job_id=job.id
)
for harvest_object in harvest_objects:
model.Session.delete(harvest_object)
model.Session.commit()
raise
finally:
job.gather_finished = datetime.datetime.utcnow()
job.save()
return harvest_object_ids
def fetch_callback(channel, method, header, body): def fetch_callback(channel, method, header, body):
try: try:
id = json.loads(body)['harvest_object_id'] id = json.loads(body)['harvest_object_id']
@ -288,7 +302,6 @@ def fetch_callback(channel, method, header, body):
channel.basic_ack(method.delivery_tag) channel.basic_ack(method.delivery_tag)
return False return False
obj = HarvestObject.get(id) obj = HarvestObject.get(id)
if not obj: if not obj:
log.error('Harvest object does not exist: %s' % id) log.error('Harvest object does not exist: %s' % id)

View File

@ -1,14 +1,92 @@
import factory import factory
from ckanext.harvest.model import HarvestSource, HarvestJob import ckanext.harvest.model as harvest_model
try:
from ckan.tests.factories import _get_action_user_name
except ImportError:
from ckan.new_tests.factories import _get_action_user_name
from ckan.plugins import toolkit
class HarvestSourceFactory(factory.Factory):
FACTORY_FOR = HarvestSource
url = "http://harvest.test.com" class HarvestSource(factory.Factory):
type = "test-harvest-source" FACTORY_FOR = harvest_model.HarvestSource
_return_type = 'dict'
class HarvestJobFactory(factory.Factory): name = factory.Sequence(lambda n: 'test_source_{n}'.format(n=n))
FACTORY_FOR = HarvestJob title = factory.Sequence(lambda n: 'test title {n}'.format(n=n))
url = factory.Sequence(lambda n: 'http://{n}.test.com'.format(n=n))
source_type = 'test' # defined in test_queue.py
id = '{0}_id'.format(name).lower()
status = "New" @classmethod
source = factory.SubFactory(HarvestSourceFactory) def _create(cls, target_class, *args, **kwargs):
if args:
assert False, "Positional args aren't supported, use keyword args."
context = {'user': _get_action_user_name(kwargs)}
# If there is an existing source for this URL, and we can't create
# another source with that URL, just return the original one.
try:
source_dict = toolkit.get_action('harvest_source_show')(
context, dict(url=kwargs['url']))
except toolkit.ObjectNotFound:
source_dict = toolkit.get_action('harvest_source_create')(
context, kwargs)
if cls._return_type == 'dict':
return source_dict
else:
return cls.FACTORY_FOR.get(source_dict['id'])
class HarvestSourceObj(HarvestSource):
_return_type = 'obj'
class HarvestJob(factory.Factory):
FACTORY_FOR = harvest_model.HarvestJob
_return_type = 'dict'
source = factory.SubFactory(HarvestSourceObj)
@classmethod
def _create(cls, target_class, *args, **kwargs):
if args:
assert False, "Positional args aren't supported, use keyword args."
context = {'user': _get_action_user_name(kwargs)}
if 'source_id' not in kwargs:
kwargs['source_id'] = kwargs['source'].id
job_dict = toolkit.get_action('harvest_job_create')(
context, kwargs)
if cls._return_type == 'dict':
return job_dict
else:
return cls.FACTORY_FOR.get(job_dict['id'])
class HarvestJobObj(HarvestJob):
_return_type = 'obj'
class HarvestObject(factory.Factory):
FACTORY_FOR = harvest_model.HarvestObject
_return_type = 'dict'
#source = factory.SubFactory(HarvestSourceObj)
job = factory.SubFactory(HarvestJobObj)
@classmethod
def _create(cls, target_class, *args, **kwargs):
if args:
assert False, "Positional args aren't supported, use keyword args."
context = {'user': _get_action_user_name(kwargs)}
if 'job_id' not in kwargs:
kwargs['job_id'] = kwargs['job'].id
kwargs['source_id'] = kwargs['job'].source.id
job_dict = toolkit.get_action('harvest_object_create')(
context, kwargs)
if cls._return_type == 'dict':
return job_dict
else:
return cls.FACTORY_FOR.get(job_dict['id'])
class HarvestObjectObj(HarvestObject):
_return_type = 'obj'

View File

@ -0,0 +1,458 @@
import json
import re
import copy
import SimpleHTTPServer
import SocketServer
from threading import Thread
PORT = 8998
class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def do_GET(self):
# test name is the first bit of the URL and makes CKAN behave
# differently in some way.
# Its value is recorded and then removed from the path
self.test_name = None
test_name_match = re.match('^/([^/]+)/', self.path)
if test_name_match:
self.test_name = test_name_match.groups()[0]
if self.test_name == 'api':
self.test_name = None
else:
self.path = re.sub('^/([^/]+)/', '/', self.path)
# The API version is recorded and then removed from the path
api_version = None
version_match = re.match('^/api/(\d)', self.path)
if version_match:
api_version = int(version_match.groups()[0])
self.path = re.sub('^/api/(\d)/', '/api/', self.path)
if self.path == '/api/rest/package':
if api_version == 2:
dataset_refs = [d['id'] for d in DATASETS]
else:
dataset_refs = [d['name'] for d in DATASETS]
return self.respond_json(dataset_refs)
if self.path.startswith('/api/rest/package/'):
dataset_ref = self.path.split('/')[-1]
dataset = self.get_dataset(dataset_ref)
if dataset:
return self.respond_json(
convert_dataset_to_restful_form(dataset))
if self.path.startswith('/api/action/package_show'):
params = self.get_url_params()
dataset_ref = params['id']
dataset = self.get_dataset(dataset_ref)
if dataset:
return self.respond_json(dataset)
if self.path.startswith('/api/search/dataset'):
params = self.get_url_params()
if params.keys() == ['organization']:
org = self.get_org(params['organization'])
dataset_ids = [d['id'] for d in DATASETS
if d['owner_org'] == org['id']]
return self.respond_json({'count': len(dataset_ids),
'results': dataset_ids})
else:
return self.respond(
'Not implemented search params %s' % params, status=400)
if self.path.startswith('/api/search/revision'):
revision_ids = [r['id'] for r in REVISIONS]
return self.respond_json(revision_ids)
if self.path.startswith('/api/rest/revision/'):
revision_ref = self.path.split('/')[-1]
for rev in REVISIONS:
if rev['id'] == revision_ref:
return self.respond_json(rev)
self.respond('Cannot find revision', status=404)
# if we wanted to server a file from disk, then we'd call this:
#return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
self.respond('Mock CKAN doesnt recognize that call', status=400)
def get_dataset(self, dataset_ref):
for dataset in DATASETS:
if dataset['name'] == dataset_ref or \
dataset['id'] == dataset_ref:
if self.test_name == 'invalid_tag':
dataset['tags'] = INVALID_TAGS
return dataset
def get_org(self, org_ref):
for org in ORGS:
if org['name'] == org_ref or \
org['id'] == org_ref:
return org
def get_url_params(self):
params = self.path.split('?')[-1].split('&')
return dict([param.split('=') for param in params])
def respond_action(self, result_dict, status=200):
response_dict = {'result': result_dict}
return self.respond_json(response_dict, status=status)
def respond_json(self, content_dict, status=200):
return self.respond(json.dumps(content_dict), status=status,
content_type='application/json')
def respond(self, content, status=200, content_type='application/json'):
self.send_response(status)
self.send_header('Content-Type', content_type)
self.end_headers()
self.wfile.write(content)
self.wfile.close()
def serve(port=PORT):
'''Runs a CKAN-alike app (over HTTP) that is used for harvesting tests'''
# Choose the directory to serve files from
#os.chdir(os.path.join(os.path.dirname(os.path.abspath(__file__)),
# 'mock_ckan_files'))
class TestServer(SocketServer.TCPServer):
allow_reuse_address = True
httpd = TestServer(("", PORT), MockCkanHandler)
print 'Serving test HTTP server at port', PORT
httpd_thread = Thread(target=httpd.serve_forever)
httpd_thread.setDaemon(True)
httpd_thread.start()
def convert_dataset_to_restful_form(dataset):
dataset = copy.deepcopy(dataset)
dataset['extras'] = dict([(e['key'], e['value']) for e in dataset['extras']])
dataset['tags'] = [t['name'] for t in dataset.get('tags', [])]
return dataset
# Datasets are in the package_show form, rather than the RESTful form
DATASETS = [
{'id': 'dataset1-id',
'name': 'dataset1',
'title': 'Test Dataset1',
'owner_org': 'org1-id',
'extras': []},
{
"id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"name": "cabinet-office-energy-use",
"private": False,
"maintainer_email": None,
"revision_timestamp": "2010-11-23T22:34:55.089925",
"organization":
{
"description": "The Cabinet Office supports the Prime Minister and Deputy Prime Minister, and ensure the effective running of government. We are also the corporate headquarters for government, in partnership with HM Treasury, and we take the lead in certain critical policy areas.\r\nCO is a ministerial department, supported by 18 agencies and public bodies\r\n\r\nYou can find out more at https://www.gov.uk/government/organisations/cabinet-office",
"created": "2012-06-27T14:48:40.244951",
"title": "Cabinet Office",
"name": "cabinet-office",
"revision_timestamp": "2013-04-02T14:27:23.086886",
"is_organization": True,
"state": "active",
"image_url": "",
"revision_id": "4be8825d-d3f4-4fb2-b80b-43e36f574c05",
"type": "organization",
"id": "aa1e068a-23da-4563-b9c2-2cad272b663e",
"approval_status": "pending"
},
"update_frequency": "other",
"metadata_created": "2010-08-02T09:19:47.600853",
"last_major_modification": "2010-08-02T09:19:47.600853",
"metadata_modified": "2014-05-09T22:00:01.486366",
"temporal_granularity": "",
"author_email": None,
"geographic_granularity": "point",
"geographic_coverage": [ ],
"state": "active",
"version": None,
"temporal_coverage-to": "",
"license_id": "uk-ogl",
"type": "dataset",
"published_via": "",
"resources":
[
{
"content_length": "69837",
"cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3",
"description": "70 Whitehall energy data",
"cache_last_updated": "2013-06-19T00:59:42.762642",
"url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv",
"openness_score_failure_count": "0",
"format": "CSV",
"cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"last_modified": "2014-05-09T23:00:01.435211",
"mimetype": "text/csv",
"content_type": "text/csv",
"openness_score": "3",
"openness_score_reason": "open and standardized format",
"position": 0,
"revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4",
"id": "f156019d-ea88-46a6-8fa3-3d12582e2161",
"size": 299107
}
],
"num_resources": 1,
"tags":
[
{
"vocabulary_id": None,
"display_name": "consumption",
"name": "consumption",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "84ce26de-6711-4e85-9609-f7d8a87b0fc8"
},
{
"vocabulary_id": None,
"display_name": "energy",
"name": "energy",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "9f2ae723-602f-4290-80c4-6637ad617a45"
}
],
"precision": "",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"taxonomy_url": "",
"groups": [],
"creator_user_id": None,
"national_statistic": "no",
"relationships_as_subject": [],
"num_tags": 8,
"update_frequency-other": "Real-time",
"isopen": True,
"url": "http://www.carbonculture.net/orgs/cabinet-office/70-whitehall/",
"notes": "Cabinet Office head office energy use updated from on-site meters showing use, cost and carbon impact.",
"owner_org": "aa1e068a-23da-4563-b9c2-2cad272b663e",
"theme-secondary":
[
"Environment"
],
"extras":
[
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "categories",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "6813d71b-785b-4f56-b296-1b2acb34eed6"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "2010-07-30",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "date_released",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "515f638b-e2cf-40a6-a8a7-cbc8001269e3"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "date_updated",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "bff63465-4f96-44e7-bb87-6e66fff5e596"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "000000: ",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "geographic_coverage",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "414bcd35-b628-4218-99e2-639615183df8"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "point",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "geographic_granularity",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "c7b460dd-c61f-4cd2-90c2-eceb6c91fe9b"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "no",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "national_statistic",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "9f04b202-3646-49be-b69e-7fa997399ff3"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "{\"status\": \"final\", \"source\": \"Automatically awarded by ODI\", \"certification_type\": \"automatically awarded\", \"level\": \"raw\", \"title\": \"Cabinet Office 70 Whitehall energy use\", \"created_at\": \"2014-10-28T12:25:57Z\", \"jurisdiction\": \"GB\", \"certificate_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922\", \"badge_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922/badge.png\", \"cert_title\": \"Basic Level Certificate\"}",
"revision_timestamp": "2014-11-12T02:52:35.048060",
"state": "active",
"key": "odi-certificate",
"revision_id": "eae9763b-e258-4d76-9ec2-7f5baf655394",
"id": "373a3cbb-d9c0-45a6-9a78-b95c86398766"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_coverage-from",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "39f72eed-6f76-4733-b636-7541cee3404f"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_coverage-to",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "818e2c8f-fee0-49da-8bea-ea3c9401ece5"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_granularity",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "f868b950-d3ce-4fbe-88ca-5cbc4b672320"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "Towns & Cities",
"revision_timestamp": "2015-03-16T18:10:08.802815",
"state": "active",
"key": "theme-primary",
"revision_id": "fc2b6630-84f8-4c88-8ac7-0ca275b2bc97",
"id": "bdcf00fd-3248-4c2f-9cf8-b90706c88e8d"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "[\"Environment\"]",
"revision_timestamp": "2015-04-08T20:57:04.895214",
"state": "active",
"key": "theme-secondary",
"revision_id": "c2c48530-ff82-4af1-9373-cdc64d5bc83c",
"id": "417482c5-a9c0-4430-8c4e-0c76e59fe44f"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "Real-time",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "update_frequency",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "e8ad4837-514e-4446-81a2-ffacfa7cf683"
}
],
"license_url": "http://www.nationalarchives.gov.uk/doc/open-government-licence/version/3/",
"individual_resources":
[
{
"content_length": "69837",
"cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3",
"description": "70 Whitehall energy data",
"cache_last_updated": "2013-06-19T00:59:42.762642",
"url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv",
"openness_score_failure_count": "0",
"format": "CSV",
"cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"last_modified": "2014-05-09T23:00:01.435211",
"mimetype": "text/csv",
"content_type": "text/csv",
"openness_score": "3",
"openness_score_reason": "open and standardized format",
"position": 0,
"revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4",
"id": "f156019d-ea88-46a6-8fa3-3d12582e2161",
"size": 299107
}
],
"title": "Cabinet Office 70 Whitehall energy use",
"revision_id": "3bd6ced3-35b2-4b20-94e2-c596e24bc375",
"date_released": "30/7/2010",
"theme-primary": "Towns & Cities"
}
]
INVALID_TAGS = [
{
"vocabulary_id": None,
"display_name": "consumption%^&",
"name": "consumption%^&",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "84ce26de-6711-4e85-9609-f7d8a87b0fc8"
},
]
ORGS = [
{'id': 'org1-id',
'name': 'org1'},
{'id': 'aa1e068a-23da-4563-b9c2-2cad272b663e',
'name': 'cabinet-office'}
]
REVISIONS = [
{
"id": "23daf2eb-d7ec-4d86-a844-3924acd311ea",
"timestamp": "2015-10-21T09:50:08.160045",
"message": "REST API: Update object dataset1",
"author": "ross",
"approved_timestamp": None,
"packages":
[
"dataset1"
],
"groups": [ ]
},
{
"id": "8254a293-10db-4af2-9dfa-6a1f06ee899c",
"timestamp": "2015-10-21T09:46:21.198021",
"message": "REST API: Update object dataset1",
"author": "ross",
"approved_timestamp": None,
"packages":
[
"dataset1"
],
"groups": [ ]
}]

View File

@ -0,0 +1,118 @@
from nose.tools import assert_equal
import json
try:
from ckan.tests.helpers import reset_db
from ckan.tests.factories import Organization
except ImportError:
from ckan.new_tests.helpers import reset_db
from ckan.new_tests.factories import Organization
from ckan import model
from ckanext.harvest.tests.factories import (HarvestSourceObj, HarvestJobObj,
HarvestObjectObj)
from ckanext.harvest.tests.lib import run_harvest, run_harvest_job
import ckanext.harvest.model as harvest_model
from ckanext.harvest.harvesters.ckanharvester import CKANHarvester
import mock_ckan
# Start CKAN-alike server we can test harvesting against it
mock_ckan.serve()
class TestCkanHarvester(object):
@classmethod
def setup(cls):
reset_db()
harvest_model.setup()
def test_gather_normal(self):
source = HarvestSourceObj(url='http://localhost:%s/' % mock_ckan.PORT)
job = HarvestJobObj(source=source)
harvester = CKANHarvester()
obj_ids = harvester.gather_stage(job)
assert_equal(type(obj_ids), list)
assert_equal(len(obj_ids), len(mock_ckan.DATASETS))
harvest_object = harvest_model.HarvestObject.get(obj_ids[0])
assert_equal(harvest_object.guid, mock_ckan.DATASETS[0]['id'])
def test_fetch_normal(self):
source = HarvestSourceObj(url='http://localhost:%s/' % mock_ckan.PORT)
job = HarvestJobObj(source=source)
harvest_object = HarvestObjectObj(guid=mock_ckan.DATASETS[0]['id'],
job=job)
harvester = CKANHarvester()
result = harvester.fetch_stage(harvest_object)
assert_equal(result, True)
assert_equal(
harvest_object.content,
json.dumps(
mock_ckan.convert_dataset_to_restful_form(
mock_ckan.DATASETS[0])))
def test_import_normal(self):
org = Organization()
harvest_object = HarvestObjectObj(
guid=mock_ckan.DATASETS[0]['id'],
content=json.dumps(mock_ckan.convert_dataset_to_restful_form(
mock_ckan.DATASETS[0])),
job__source__owner_org=org['id'])
harvester = CKANHarvester()
result = harvester.import_stage(harvest_object)
assert_equal(result, True)
assert harvest_object.package_id
dataset = model.Package.get(harvest_object.package_id)
assert_equal(dataset.name, mock_ckan.DATASETS[0]['name'])
def test_harvest(self):
results_by_guid = run_harvest(
url='http://localhost:%s/' % mock_ckan.PORT,
harvester=CKANHarvester())
result = results_by_guid['dataset1-id']
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'added')
assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name'])
assert_equal(result['errors'], [])
result = results_by_guid[mock_ckan.DATASETS[1]['id']]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'added')
assert_equal(result['dataset']['name'], mock_ckan.DATASETS[1]['name'])
assert_equal(result['errors'], [])
def test_harvest_twice(self):
run_harvest(
url='http://localhost:%s/' % mock_ckan.PORT,
harvester=CKANHarvester())
results_by_guid = run_harvest(
url='http://localhost:%s/' % mock_ckan.PORT,
harvester=CKANHarvester())
# updated the dataset which has revisions
result = results_by_guid['dataset1']
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'updated')
assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name'])
assert_equal(result['errors'], [])
# the other dataset is unchanged and not harvested
assert mock_ckan.DATASETS[1]['name'] not in result
def test_harvest_invalid_tag(self):
from nose.plugins.skip import SkipTest; raise SkipTest()
results_by_guid = run_harvest(
url='http://localhost:%s/invalid_tag' % mock_ckan.PORT,
harvester=CKANHarvester())
result = results_by_guid['dataset1-id']
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'added')
assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name'])

View File

@ -0,0 +1,56 @@
from ckanext.harvest.tests.factories import HarvestSourceObj, HarvestJobObj
import ckanext.harvest.model as harvest_model
from ckanext.harvest.logic import NoNewHarvestJobError
from ckanext.harvest import queue
from ckan.plugins import toolkit
def run_harvest(url, harvester, config=''):
'''Runs a harvest and returns the results.
This allows you to test a harvester.
Queues are avoided as they are a pain in tests.
'''
# User creates a harvest source
source = HarvestSourceObj(url=url, config=config)
# User triggers a harvest, which is the creation of a harvest job
job = HarvestJobObj(source=source)
return run_harvest_job(job, harvester)
def run_harvest_job(job, harvester):
# When 'paster harvest run' is called by the regular cron it does 2 things:
# 1. change the job status to Running
job.status = 'Running'
job.save()
# 2. put the job on the gather queue which is consumed by
# queue.gather_callback, which determines the harvester and then calls
# gather_stage. We simply call the gather_stage.
obj_ids = queue.gather_stage(harvester, job)
# The object ids are put onto the fetch queue, consumed by
# queue.fetch_callback which calls queue.fetch_and_import_stages
results_by_guid = {}
for obj_id in obj_ids:
harvest_object = harvest_model.HarvestObject.get(obj_id)
guid = harvest_object.guid
results_by_guid[guid] = {'obj_id': obj_id}
queue.fetch_and_import_stages(harvester, harvest_object)
results_by_guid[guid]['state'] = harvest_object.state
results_by_guid[guid]['report_status'] = harvest_object.report_status
if harvest_object.state == 'COMPLETE' and harvest_object.package_id:
results_by_guid[guid]['dataset'] = \
toolkit.get_action('package_show')(
{}, dict(id=harvest_object.package_id))
results_by_guid[guid]['errors'] = harvest_object.errors
# Do 'harvest_jobs_run' to change the job status to 'finished'
try:
toolkit.get_action('harvest_jobs_run')({'ignore_auth': True}, {})
except NoNewHarvestJobError:
# This is expected
pass
return results_by_guid

View File

@ -1,19 +1,23 @@
import json import json
import copy import copy
import ckan
import paste
import pylons.test
import factories import factories
import unittest import unittest
from ckan.lib.create_test_data import CreateTestData try:
import ckan.new_tests.helpers as helpers from ckan.tests import factories as ckan_factories
from ckan.tests.helpers import _get_test_app, reset_db
except ImportError:
from ckan.new_tests import factories as ckan_factories
from ckan.new_tests.helpers import _get_test_app, reset_db
from ckan import plugins as p from ckan import plugins as p
from ckan.plugins import toolkit from ckan.plugins import toolkit
from ckan import model
from ckanext.harvest.interfaces import IHarvester from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.model as harvest_model import ckanext.harvest.model as harvest_model
def call_action_api(app, action, apikey=None, status=200, **kwargs):
def call_action_api(action, apikey=None, status=200, **kwargs):
'''POST an HTTP request to the CKAN API and return the result. '''POST an HTTP request to the CKAN API and return the result.
Any additional keyword arguments that you pass to this function as **kwargs Any additional keyword arguments that you pass to this function as **kwargs
@ -21,7 +25,7 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs):
Usage: Usage:
package_dict = post(app, 'package_create', apikey=apikey, package_dict = call_action_api('package_create', apikey=apikey,
name='my_package') name='my_package')
assert package_dict['name'] == 'my_package' assert package_dict['name'] == 'my_package'
@ -31,13 +35,10 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs):
of the error dict, you have to use the status param otherwise an exception of the error dict, you have to use the status param otherwise an exception
will be raised: will be raised:
error_dict = post(app, 'group_activity_list', status=403, error_dict = call_action_api('group_activity_list', status=403,
id='invalid_id') id='invalid_id')
assert error_dict['message'] == 'Access Denied' assert error_dict['message'] == 'Access Denied'
:param app: the test app to post to
:type app: paste.fixture.TestApp
:param action: the action to post to, e.g. 'package_create' :param action: the action to post to, e.g. 'package_create'
:type action: string :type action: string
@ -62,8 +63,10 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs):
''' '''
params = json.dumps(kwargs) params = json.dumps(kwargs)
app = _get_test_app()
response = app.post('/api/action/{0}'.format(action), params=params, response = app.post('/api/action/{0}'.format(action), params=params,
extra_environ={'Authorization': str(apikey)}, status=status) extra_environ={'Authorization': str(apikey)},
status=status)
if status in (200,): if status in (200,):
assert response.json['success'] is True assert response.json['success'] is True
@ -75,8 +78,11 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs):
class MockHarvesterForActionTests(p.SingletonPlugin): class MockHarvesterForActionTests(p.SingletonPlugin):
p.implements(IHarvester) p.implements(IHarvester)
def info(self): def info(self):
return {'name': 'test-for-action', 'title': 'Test for action', 'description': 'test'} return {'name': 'test-for-action',
'title': 'Test for action',
'description': 'test'}
def validate_config(self, config): def validate_config(self, config):
if not config: if not config:
@ -103,22 +109,32 @@ class MockHarvesterForActionTests(p.SingletonPlugin):
def import_stage(self, harvest_object): def import_stage(self, harvest_object):
return True return True
class HarvestSourceActionBase(object):
class FunctionalTestBaseWithoutClearBetweenTests(object):
''' Functional tests should normally derive from
ckan.lib.helpers.FunctionalTestBase, but these are legacy tests so this
class is a compromise. This version doesn't call reset_db before every
test, because these tests are designed with fixtures created in
setup_class.'''
@classmethod @classmethod
def setup_class(cls): def setup_class(cls):
reset_db()
harvest_model.setup() harvest_model.setup()
CreateTestData.create()
sysadmin_user = ckan.model.User.get('testsysadmin') @classmethod
cls.sysadmin = { def teardown_class(cls):
'id': sysadmin_user.id, pass
'apikey': sysadmin_user.apikey,
'name': sysadmin_user.name,
}
cls.app = paste.fixture.TestApp(pylons.test.pylonsapp) class HarvestSourceActionBase(FunctionalTestBaseWithoutClearBetweenTests):
@classmethod
def setup_class(cls):
super(HarvestSourceActionBase, cls).setup_class()
harvest_model.setup()
cls.sysadmin = ckan_factories.Sysadmin()
cls.default_source_dict = { cls.default_source_dict = {
"url": "http://test.action.com", "url": "http://test.action.com",
@ -133,24 +149,21 @@ class HarvestSourceActionBase(object):
if not p.plugin_loaded('test_action_harvester'): if not p.plugin_loaded('test_action_harvester'):
p.load('test_action_harvester') p.load('test_action_harvester')
@classmethod @classmethod
def teardown_class(cls): def teardown_class(cls):
ckan.model.repo.rebuild_db() super(HarvestSourceActionBase, cls).teardown_class()
p.unload('test_action_harvester') p.unload('test_action_harvester')
def teardown(self):
pass
def test_invalid_missing_values(self): def test_invalid_missing_values(self):
source_dict = {} source_dict = {}
if 'id' in self.default_source_dict: if 'id' in self.default_source_dict:
source_dict['id'] = self.default_source_dict['id'] source_dict['id'] = self.default_source_dict['id']
result = call_action_api(self.app, self.action, result = call_action_api(self.action,
apikey=self.sysadmin['apikey'], status=409, **source_dict) apikey=self.sysadmin['apikey'], status=409,
**source_dict)
for key in ('name', 'title', 'url', 'source_type'): for key in ('name', 'title', 'url', 'source_type'):
assert result[key] == [u'Missing value'] assert result[key] == [u'Missing value']
@ -160,8 +173,9 @@ class HarvestSourceActionBase(object):
source_dict = copy.deepcopy(self.default_source_dict) source_dict = copy.deepcopy(self.default_source_dict)
source_dict['source_type'] = 'unknown' source_dict['source_type'] = 'unknown'
result = call_action_api(self.app, self.action, result = call_action_api(self.action,
apikey=self.sysadmin['apikey'], status=409, **source_dict) apikey=self.sysadmin['apikey'], status=409,
**source_dict)
assert 'source_type' in result assert 'source_type' in result
assert u'Unknown harvester type' in result['source_type'][0] assert u'Unknown harvester type' in result['source_type'][0]
@ -171,8 +185,9 @@ class HarvestSourceActionBase(object):
source_dict = copy.deepcopy(self.default_source_dict) source_dict = copy.deepcopy(self.default_source_dict)
source_dict['frequency'] = wrong_frequency source_dict['frequency'] = wrong_frequency
result = call_action_api(self.app, self.action, result = call_action_api(self.action,
apikey=self.sysadmin['apikey'], status=409, **source_dict) apikey=self.sysadmin['apikey'], status=409,
**source_dict)
assert 'frequency' in result assert 'frequency' in result
assert u'Frequency {0} not recognised'.format(wrong_frequency) in result['frequency'][0] assert u'Frequency {0} not recognised'.format(wrong_frequency) in result['frequency'][0]
@ -182,16 +197,18 @@ class HarvestSourceActionBase(object):
source_dict = copy.deepcopy(self.default_source_dict) source_dict = copy.deepcopy(self.default_source_dict)
source_dict['config'] = 'not_json' source_dict['config'] = 'not_json'
result = call_action_api(self.app, self.action, result = call_action_api(self.action,
apikey=self.sysadmin['apikey'], status=409, **source_dict) apikey=self.sysadmin['apikey'], status=409,
**source_dict)
assert 'config' in result assert 'config' in result
assert u'Error parsing the configuration options: No JSON object could be decoded' in result['config'][0] assert u'Error parsing the configuration options: No JSON object could be decoded' in result['config'][0]
source_dict['config'] = json.dumps({'custom_option': 'not_a_list'}) source_dict['config'] = json.dumps({'custom_option': 'not_a_list'})
result = call_action_api(self.app, self.action, result = call_action_api(self.action,
apikey=self.sysadmin['apikey'], status=409, **source_dict) apikey=self.sysadmin['apikey'], status=409,
**source_dict)
assert 'config' in result assert 'config' in result
assert u'Error parsing the configuration options: custom_option must be a list' in result['config'][0] assert u'Error parsing the configuration options: custom_option must be a list' in result['config'][0]
@ -202,13 +219,11 @@ class TestHarvestSourceActionCreate(HarvestSourceActionBase):
def __init__(self): def __init__(self):
self.action = 'harvest_source_create' self.action = 'harvest_source_create'
def test_create(self): def test_create(self):
source_dict = self.default_source_dict source_dict = self.default_source_dict
result = call_action_api(self.app, 'harvest_source_create', result = call_action_api('harvest_source_create',
apikey=self.sysadmin['apikey'], **source_dict) apikey=self.sysadmin['apikey'], **source_dict)
for key in source_dict.keys(): for key in source_dict.keys():
@ -219,18 +234,18 @@ class TestHarvestSourceActionCreate(HarvestSourceActionBase):
assert source.url == source_dict['url'] assert source.url == source_dict['url']
assert source.type == source_dict['source_type'] assert source.type == source_dict['source_type']
# Trying to create a source with the same URL fails # Trying to create a source with the same URL fails
source_dict = copy.deepcopy(self.default_source_dict) source_dict = copy.deepcopy(self.default_source_dict)
source_dict['name'] = 'test-source-action-new' source_dict['name'] = 'test-source-action-new'
result = call_action_api(self.app, 'harvest_source_create', result = call_action_api('harvest_source_create',
apikey=self.sysadmin['apikey'], status=409, **source_dict) apikey=self.sysadmin['apikey'], status=409,
**source_dict)
assert 'url' in result assert 'url' in result
assert u'There already is a Harvest Source for this URL' in result['url'][0] assert u'There already is a Harvest Source for this URL' in result['url'][0]
class TestHarvestSourceActionUpdate(HarvestSourceActionBase): class TestHarvestSourceActionUpdate(HarvestSourceActionBase):
@classmethod @classmethod
@ -242,7 +257,7 @@ class TestHarvestSourceActionUpdate(HarvestSourceActionBase):
# Create a source to udpate # Create a source to udpate
source_dict = cls.default_source_dict source_dict = cls.default_source_dict
result = call_action_api(cls.app, 'harvest_source_create', result = call_action_api('harvest_source_create',
apikey=cls.sysadmin['apikey'], **source_dict) apikey=cls.sysadmin['apikey'], **source_dict)
cls.default_source_dict['id'] = result['id'] cls.default_source_dict['id'] = result['id']
@ -260,7 +275,7 @@ class TestHarvestSourceActionUpdate(HarvestSourceActionBase):
"config": json.dumps({"custom_option": ["c", "d"]}) "config": json.dumps({"custom_option": ["c", "d"]})
}) })
result = call_action_api(self.app, 'harvest_source_update', result = call_action_api('harvest_source_update',
apikey=self.sysadmin['apikey'], **source_dict) apikey=self.sysadmin['apikey'], **source_dict)
for key in source_dict.keys(): for key in source_dict.keys():
@ -271,22 +286,19 @@ class TestHarvestSourceActionUpdate(HarvestSourceActionBase):
assert source.url == source_dict['url'] assert source.url == source_dict['url']
assert source.type == source_dict['source_type'] assert source.type == source_dict['source_type']
class TestHarvestObject(unittest.TestCase): class TestHarvestObject(unittest.TestCase):
@classmethod @classmethod
def setup_class(cls): def setup_class(cls):
reset_db()
harvest_model.setup() harvest_model.setup()
@classmethod
def teardown_class(cls):
ckan.model.repo.rebuild_db()
def test_create(self): def test_create(self):
job = factories.HarvestJobFactory() job = factories.HarvestJobObj()
job.save()
context = { context = {
'model' : ckan.model, 'model': model,
'session': ckan.model.Session, 'session': model.Session,
'ignore_auth': True, 'ignore_auth': True,
} }
data_dict = { data_dict = {
@ -303,14 +315,12 @@ class TestHarvestObject(unittest.TestCase):
assert created_object.guid == harvest_object['guid'] == data_dict['guid'] assert created_object.guid == harvest_object['guid'] == data_dict['guid']
def test_create_bad_parameters(self): def test_create_bad_parameters(self):
source_a = factories.HarvestSourceFactory() source_a = factories.HarvestSourceObj()
source_a.save() job = factories.HarvestJobObj()
job = factories.HarvestJobFactory()
job.save()
context = { context = {
'model' : ckan.model, 'model': model,
'session': ckan.model.Session, 'session': model.Session,
'ignore_auth': True, 'ignore_auth': True,
} }
data_dict = { data_dict = {
@ -319,10 +329,10 @@ class TestHarvestObject(unittest.TestCase):
'extras': 1 'extras': 1
} }
harvest_object_create = toolkit.get_action('harvest_object_create') harvest_object_create = toolkit.get_action('harvest_object_create')
self.assertRaises(ckan.logic.ValidationError, harvest_object_create, self.assertRaises(toolkit.ValidationError, harvest_object_create,
context, data_dict) context, data_dict)
data_dict['extras'] = {'test': 1} data_dict['extras'] = {'test': 1}
self.assertRaises(ckan.logic.ValidationError, harvest_object_create, self.assertRaises(toolkit.ValidationError, harvest_object_create,
context, data_dict) context, data_dict)

View File

@ -1,3 +1,7 @@
try:
from ckan.tests.helpers import reset_db
except ImportError:
from ckan.new_tests.helpers import reset_db
import ckanext.harvest.model as harvest_model import ckanext.harvest.model as harvest_model
from ckanext.harvest.model import HarvestObject, HarvestObjectExtra from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
from ckanext.harvest.interfaces import IHarvester from ckanext.harvest.interfaces import IHarvester
@ -82,13 +86,9 @@ class TestHarvester(SingletonPlugin):
class TestHarvestQueue(object): class TestHarvestQueue(object):
@classmethod @classmethod
def setup_class(cls): def setup_class(cls):
reset_db()
harvest_model.setup() harvest_model.setup()
@classmethod
def teardown_class(cls):
model.repo.rebuild_db()
def test_01_basic_harvester(self): def test_01_basic_harvester(self):
### make sure queues/exchanges are created first and are empty ### make sure queues/exchanges are created first and are empty