From f70c16bce7e1be21d488e5d4dbc9550a28c542e2 Mon Sep 17 00:00:00 2001 From: David Read Date: Wed, 21 Oct 2015 16:26:57 +0000 Subject: [PATCH] Add framework for testing harvesters. Modernize existing tests. --- ckanext/harvest/logic/action/get.py | 12 + ckanext/harvest/model/__init__.py | 76 +-- ckanext/harvest/queue.py | 39 +- ckanext/harvest/tests/factories.py | 96 +++- ckanext/harvest/tests/harvesters/__init__.py | 0 ckanext/harvest/tests/harvesters/mock_ckan.py | 458 ++++++++++++++++++ .../tests/harvesters/test_ckanharvester.py | 118 +++++ ckanext/harvest/tests/lib.py | 56 +++ ckanext/harvest/tests/test_action.py | 202 ++++---- ckanext/harvest/tests/test_queue.py | 10 +- 10 files changed, 908 insertions(+), 159 deletions(-) create mode 100644 ckanext/harvest/tests/harvesters/__init__.py create mode 100644 ckanext/harvest/tests/harvesters/mock_ckan.py create mode 100644 ckanext/harvest/tests/harvesters/test_ckanharvester.py create mode 100644 ckanext/harvest/tests/lib.py diff --git a/ckanext/harvest/logic/action/get.py b/ckanext/harvest/logic/action/get.py index 8282ff5..2ddcdd9 100644 --- a/ckanext/harvest/logic/action/get.py +++ b/ckanext/harvest/logic/action/get.py @@ -30,10 +30,22 @@ def harvest_source_show(context,data_dict): :param id: the id or name of the harvest source :type id: string + :param url: url of the harvest source (as an alternative to the id) + :type url: string + :returns: harvest source metadata :rtype: dictionary ''' + model = context.get('model') + # Find the source by URL + if data_dict.get('url') and not data_dict.get('id'): + source = model.Session.query(harvest_model.HarvestSource) \ + .filter_by(url=data_dict.get('url')) \ + .first() + if not source: + raise NotFound + data_dict['id'] = source.id source_dict = logic.get_action('package_show')(context, data_dict) diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py index 1d3a05a..39317fc 100644 --- a/ckanext/harvest/model/__init__.py +++ b/ckanext/harvest/model/__init__.py @@ -45,43 +45,47 @@ def setup(): define_harvester_tables() log.debug('Harvest tables defined in memory') - if model.package_table.exists(): - if not harvest_source_table.exists(): - - # Create each table individually rather than - # using metadata.create_all() - harvest_source_table.create() - harvest_job_table.create() - harvest_object_table.create() - harvest_gather_error_table.create() - harvest_object_error_table.create() - harvest_object_extra_table.create() - - log.debug('Harvest tables created') - else: - from ckan.model.meta import engine - log.debug('Harvest tables already exist') - # Check if existing tables need to be updated - inspector = Inspector.from_engine(engine) - columns = inspector.get_columns('harvest_source') - if not 'title' in [column['name'] for column in columns]: - log.debug('Harvest tables need to be updated') - migrate_v2() - if not 'frequency' in [column['name'] for column in columns]: - log.debug('Harvest tables need to be updated') - migrate_v3() - - # Check if this instance has harvest source datasets - source_ids = Session.query(HarvestSource.id).filter_by(active=True).all() - source_package_ids = Session.query(model.Package.id).filter_by(type=u'harvest', state='active').all() - sources_to_migrate = set(source_ids) - set(source_package_ids) - if sources_to_migrate: - log.debug('Creating harvest source datasets for %i existing sources', len(sources_to_migrate)) - sources_to_migrate = [s[0] for s in sources_to_migrate] - migrate_v3_create_datasets(sources_to_migrate) - - else: + if not model.package_table.exists(): + print 'harvest model setup: DEFER' log.debug('Harvest table creation deferred') + return + + if not harvest_source_table.exists(): + + print 'harvest model setup: CREATE' + # Create each table individually rather than + # using metadata.create_all() + harvest_source_table.create() + harvest_job_table.create() + harvest_object_table.create() + harvest_gather_error_table.create() + harvest_object_error_table.create() + harvest_object_extra_table.create() + + log.debug('Harvest tables created') + else: + from ckan.model.meta import engine + print 'harvest model setup: NOTHING TO DO' + log.debug('Harvest tables already exist') + # Check if existing tables need to be updated + inspector = Inspector.from_engine(engine) + columns = inspector.get_columns('harvest_source') + column_names = [column['name'] for column in columns] + if not 'title' in column_names: + log.debug('Harvest tables need to be updated') + migrate_v2() + if not 'frequency' in column_names: + log.debug('Harvest tables need to be updated') + migrate_v3() + + # Check if this instance has harvest source datasets + source_ids = Session.query(HarvestSource.id).filter_by(active=True).all() + source_package_ids = Session.query(model.Package.id).filter_by(type=u'harvest', state='active').all() + sources_to_migrate = set(source_ids) - set(source_package_ids) + if sources_to_migrate: + log.debug('Creating harvest source datasets for %i existing sources', len(sources_to_migrate)) + sources_to_migrate = [s[0] for s in sources_to_migrate] + migrate_v3_create_datasets(sources_to_migrate) class HarvestError(Exception): diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index bb1d63c..8296efb 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -223,23 +223,12 @@ def gather_callback(channel, method, header, body): for harvester in PluginImplementations(IHarvester): if harvester.info()['name'] == job.source.type: harvester_found = True - # Get a list of harvest object ids from the plugin - job.gather_started = datetime.datetime.utcnow() try: - harvest_object_ids = harvester.gather_stage(job) + harvest_object_ids = gather_stage(harvester, job) except (Exception, KeyboardInterrupt): channel.basic_ack(method.delivery_tag) - harvest_objects = model.Session.query(HarvestObject).filter_by( - harvest_job_id=job.id - ) - for harvest_object in harvest_objects: - model.Session.delete(harvest_object) - model.Session.commit() raise - finally: - job.gather_finished = datetime.datetime.utcnow() - job.save() if not isinstance(harvest_object_ids, list): log.error('Gather stage failed') @@ -271,6 +260,31 @@ def gather_callback(channel, method, header, body): channel.basic_ack(method.delivery_tag) +def gather_stage(harvester, job): + '''Calls the harvester's gather_stage, returning harvest object ids, with + some error handling. + + This is split off from gather_callback so that tests can call it without + dealing with queue stuff. + ''' + job.gather_started = datetime.datetime.utcnow() + + try: + harvest_object_ids = harvester.gather_stage(job) + except (Exception, KeyboardInterrupt): + harvest_objects = model.Session.query(HarvestObject).filter_by( + harvest_job_id=job.id + ) + for harvest_object in harvest_objects: + model.Session.delete(harvest_object) + model.Session.commit() + raise + finally: + job.gather_finished = datetime.datetime.utcnow() + job.save() + return harvest_object_ids + + def fetch_callback(channel, method, header, body): try: id = json.loads(body)['harvest_object_id'] @@ -280,7 +294,6 @@ def fetch_callback(channel, method, header, body): channel.basic_ack(method.delivery_tag) return False - obj = HarvestObject.get(id) if not obj: log.error('Harvest object does not exist: %s' % id) diff --git a/ckanext/harvest/tests/factories.py b/ckanext/harvest/tests/factories.py index 8910519..37e0b7f 100644 --- a/ckanext/harvest/tests/factories.py +++ b/ckanext/harvest/tests/factories.py @@ -1,14 +1,92 @@ import factory -from ckanext.harvest.model import HarvestSource, HarvestJob +import ckanext.harvest.model as harvest_model +try: + from ckan.tests.factories import _get_action_user_name +except ImportError: + from ckan.new_tests.factories import _get_action_user_name +from ckan.plugins import toolkit -class HarvestSourceFactory(factory.Factory): - FACTORY_FOR = HarvestSource - url = "http://harvest.test.com" - type = "test-harvest-source" +class HarvestSource(factory.Factory): + FACTORY_FOR = harvest_model.HarvestSource + _return_type = 'dict' -class HarvestJobFactory(factory.Factory): - FACTORY_FOR = HarvestJob + name = factory.Sequence(lambda n: 'test_source_{n}'.format(n=n)) + title = factory.Sequence(lambda n: 'test title {n}'.format(n=n)) + url = factory.Sequence(lambda n: 'http://{n}.test.com'.format(n=n)) + source_type = 'test' # defined in test_queue.py + id = '{0}_id'.format(name).lower() - status = "New" - source = factory.SubFactory(HarvestSourceFactory) + @classmethod + def _create(cls, target_class, *args, **kwargs): + if args: + assert False, "Positional args aren't supported, use keyword args." + context = {'user': _get_action_user_name(kwargs)} + # If there is an existing source for this URL, and we can't create + # another source with that URL, just return the original one. + try: + source_dict = toolkit.get_action('harvest_source_show')( + context, dict(url=kwargs['url'])) + except toolkit.ObjectNotFound: + source_dict = toolkit.get_action('harvest_source_create')( + context, kwargs) + if cls._return_type == 'dict': + return source_dict + else: + return cls.FACTORY_FOR.get(source_dict['id']) + + +class HarvestSourceObj(HarvestSource): + _return_type = 'obj' + + +class HarvestJob(factory.Factory): + FACTORY_FOR = harvest_model.HarvestJob + _return_type = 'dict' + + source = factory.SubFactory(HarvestSourceObj) + + @classmethod + def _create(cls, target_class, *args, **kwargs): + if args: + assert False, "Positional args aren't supported, use keyword args." + context = {'user': _get_action_user_name(kwargs)} + if 'source_id' not in kwargs: + kwargs['source_id'] = kwargs['source'].id + job_dict = toolkit.get_action('harvest_job_create')( + context, kwargs) + if cls._return_type == 'dict': + return job_dict + else: + return cls.FACTORY_FOR.get(job_dict['id']) + + +class HarvestJobObj(HarvestJob): + _return_type = 'obj' + + +class HarvestObject(factory.Factory): + FACTORY_FOR = harvest_model.HarvestObject + _return_type = 'dict' + + #source = factory.SubFactory(HarvestSourceObj) + job = factory.SubFactory(HarvestJobObj) + + @classmethod + def _create(cls, target_class, *args, **kwargs): + if args: + assert False, "Positional args aren't supported, use keyword args." + context = {'user': _get_action_user_name(kwargs)} + if 'job_id' not in kwargs: + kwargs['job_id'] = kwargs['job'].id + kwargs['source_id'] = kwargs['job'].source.id + job_dict = toolkit.get_action('harvest_object_create')( + context, kwargs) + if cls._return_type == 'dict': + return job_dict + else: + return cls.FACTORY_FOR.get(job_dict['id']) + + +class HarvestObjectObj(HarvestObject): + _return_type = 'obj' diff --git a/ckanext/harvest/tests/harvesters/__init__.py b/ckanext/harvest/tests/harvesters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ckanext/harvest/tests/harvesters/mock_ckan.py b/ckanext/harvest/tests/harvesters/mock_ckan.py new file mode 100644 index 0000000..b384eb8 --- /dev/null +++ b/ckanext/harvest/tests/harvesters/mock_ckan.py @@ -0,0 +1,458 @@ +import json +import re +import copy + +import SimpleHTTPServer +import SocketServer +from threading import Thread + +PORT = 8998 + + +class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): + def do_GET(self): + # test name is the first bit of the URL and makes CKAN behave + # differently in some way. + # Its value is recorded and then removed from the path + self.test_name = None + test_name_match = re.match('^/([^/]+)/', self.path) + if test_name_match: + self.test_name = test_name_match.groups()[0] + if self.test_name == 'api': + self.test_name = None + else: + self.path = re.sub('^/([^/]+)/', '/', self.path) + + # The API version is recorded and then removed from the path + api_version = None + version_match = re.match('^/api/(\d)', self.path) + if version_match: + api_version = int(version_match.groups()[0]) + self.path = re.sub('^/api/(\d)/', '/api/', self.path) + + if self.path == '/api/rest/package': + if api_version == 2: + dataset_refs = [d['id'] for d in DATASETS] + else: + dataset_refs = [d['name'] for d in DATASETS] + return self.respond_json(dataset_refs) + if self.path.startswith('/api/rest/package/'): + dataset_ref = self.path.split('/')[-1] + dataset = self.get_dataset(dataset_ref) + if dataset: + return self.respond_json( + convert_dataset_to_restful_form(dataset)) + if self.path.startswith('/api/action/package_show'): + params = self.get_url_params() + dataset_ref = params['id'] + dataset = self.get_dataset(dataset_ref) + if dataset: + return self.respond_json(dataset) + if self.path.startswith('/api/search/dataset'): + params = self.get_url_params() + if params.keys() == ['organization']: + org = self.get_org(params['organization']) + dataset_ids = [d['id'] for d in DATASETS + if d['owner_org'] == org['id']] + return self.respond_json({'count': len(dataset_ids), + 'results': dataset_ids}) + else: + return self.respond( + 'Not implemented search params %s' % params, status=400) + if self.path.startswith('/api/search/revision'): + revision_ids = [r['id'] for r in REVISIONS] + return self.respond_json(revision_ids) + if self.path.startswith('/api/rest/revision/'): + revision_ref = self.path.split('/')[-1] + for rev in REVISIONS: + if rev['id'] == revision_ref: + return self.respond_json(rev) + self.respond('Cannot find revision', status=404) + + # if we wanted to server a file from disk, then we'd call this: + #return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self) + + self.respond('Mock CKAN doesnt recognize that call', status=400) + + def get_dataset(self, dataset_ref): + for dataset in DATASETS: + if dataset['name'] == dataset_ref or \ + dataset['id'] == dataset_ref: + if self.test_name == 'invalid_tag': + dataset['tags'] = INVALID_TAGS + return dataset + + def get_org(self, org_ref): + for org in ORGS: + if org['name'] == org_ref or \ + org['id'] == org_ref: + return org + + def get_url_params(self): + params = self.path.split('?')[-1].split('&') + return dict([param.split('=') for param in params]) + + def respond_action(self, result_dict, status=200): + response_dict = {'result': result_dict} + return self.respond_json(response_dict, status=status) + + def respond_json(self, content_dict, status=200): + return self.respond(json.dumps(content_dict), status=status, + content_type='application/json') + + def respond(self, content, status=200, content_type='application/json'): + self.send_response(status) + self.send_header('Content-Type', content_type) + self.end_headers() + self.wfile.write(content) + self.wfile.close() + + +def serve(port=PORT): + '''Runs a CKAN-alike app (over HTTP) that is used for harvesting tests''' + + # Choose the directory to serve files from + #os.chdir(os.path.join(os.path.dirname(os.path.abspath(__file__)), + # 'mock_ckan_files')) + + class TestServer(SocketServer.TCPServer): + allow_reuse_address = True + + httpd = TestServer(("", PORT), MockCkanHandler) + + print 'Serving test HTTP server at port', PORT + + httpd_thread = Thread(target=httpd.serve_forever) + httpd_thread.setDaemon(True) + httpd_thread.start() + + +def convert_dataset_to_restful_form(dataset): + dataset = copy.deepcopy(dataset) + dataset['extras'] = dict([(e['key'], e['value']) for e in dataset['extras']]) + dataset['tags'] = [t['name'] for t in dataset.get('tags', [])] + return dataset + + +# Datasets are in the package_show form, rather than the RESTful form +DATASETS = [ + {'id': 'dataset1-id', + 'name': 'dataset1', + 'title': 'Test Dataset1', + 'owner_org': 'org1-id', + 'extras': []}, + { + "id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "name": "cabinet-office-energy-use", + "private": False, + "maintainer_email": None, + "revision_timestamp": "2010-11-23T22:34:55.089925", + "organization": + { + + "description": "The Cabinet Office supports the Prime Minister and Deputy Prime Minister, and ensure the effective running of government. We are also the corporate headquarters for government, in partnership with HM Treasury, and we take the lead in certain critical policy areas.\r\nCO is a ministerial department, supported by 18 agencies and public bodies\r\n\r\nYou can find out more at https://www.gov.uk/government/organisations/cabinet-office", + "created": "2012-06-27T14:48:40.244951", + "title": "Cabinet Office", + "name": "cabinet-office", + "revision_timestamp": "2013-04-02T14:27:23.086886", + "is_organization": True, + "state": "active", + "image_url": "", + "revision_id": "4be8825d-d3f4-4fb2-b80b-43e36f574c05", + "type": "organization", + "id": "aa1e068a-23da-4563-b9c2-2cad272b663e", + "approval_status": "pending" + + }, + "update_frequency": "other", + "metadata_created": "2010-08-02T09:19:47.600853", + "last_major_modification": "2010-08-02T09:19:47.600853", + "metadata_modified": "2014-05-09T22:00:01.486366", + "temporal_granularity": "", + "author_email": None, + "geographic_granularity": "point", + "geographic_coverage": [ ], + "state": "active", + "version": None, + "temporal_coverage-to": "", + "license_id": "uk-ogl", + "type": "dataset", + "published_via": "", + "resources": + [ + { + "content_length": "69837", + "cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv", + "hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3", + "description": "70 Whitehall energy data", + "cache_last_updated": "2013-06-19T00:59:42.762642", + "url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv", + "openness_score_failure_count": "0", + "format": "CSV", + "cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv", + "tracking_summary": + { + "total": 0, + "recent": 0 + }, + "last_modified": "2014-05-09T23:00:01.435211", + "mimetype": "text/csv", + "content_type": "text/csv", + "openness_score": "3", + "openness_score_reason": "open and standardized format", + "position": 0, + "revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4", + "id": "f156019d-ea88-46a6-8fa3-3d12582e2161", + "size": 299107 + } + ], + "num_resources": 1, + "tags": + [ + { + + "vocabulary_id": None, + "display_name": "consumption", + "name": "consumption", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "id": "84ce26de-6711-4e85-9609-f7d8a87b0fc8" + + }, + { + + "vocabulary_id": None, + "display_name": "energy", + "name": "energy", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "id": "9f2ae723-602f-4290-80c4-6637ad617a45" + + } + ], + "precision": "", + "tracking_summary": + { + "total": 0, + "recent": 0 + }, + "taxonomy_url": "", + "groups": [], + "creator_user_id": None, + "national_statistic": "no", + "relationships_as_subject": [], + "num_tags": 8, + "update_frequency-other": "Real-time", + "isopen": True, + "url": "http://www.carbonculture.net/orgs/cabinet-office/70-whitehall/", + "notes": "Cabinet Office head office energy use updated from on-site meters showing use, cost and carbon impact.", + "owner_org": "aa1e068a-23da-4563-b9c2-2cad272b663e", + "theme-secondary": + [ + "Environment" + ], + "extras": + [ + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "categories", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "6813d71b-785b-4f56-b296-1b2acb34eed6" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "2010-07-30", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "date_released", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "515f638b-e2cf-40a6-a8a7-cbc8001269e3" + }, + { + + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "date_updated", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "bff63465-4f96-44e7-bb87-6e66fff5e596" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "000000: ", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "geographic_coverage", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "414bcd35-b628-4218-99e2-639615183df8" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "point", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "geographic_granularity", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "c7b460dd-c61f-4cd2-90c2-eceb6c91fe9b" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "no", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "national_statistic", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "9f04b202-3646-49be-b69e-7fa997399ff3" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "{\"status\": \"final\", \"source\": \"Automatically awarded by ODI\", \"certification_type\": \"automatically awarded\", \"level\": \"raw\", \"title\": \"Cabinet Office 70 Whitehall energy use\", \"created_at\": \"2014-10-28T12:25:57Z\", \"jurisdiction\": \"GB\", \"certificate_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922\", \"badge_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922/badge.png\", \"cert_title\": \"Basic Level Certificate\"}", + "revision_timestamp": "2014-11-12T02:52:35.048060", + "state": "active", + "key": "odi-certificate", + "revision_id": "eae9763b-e258-4d76-9ec2-7f5baf655394", + "id": "373a3cbb-d9c0-45a6-9a78-b95c86398766" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "temporal_coverage-from", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "39f72eed-6f76-4733-b636-7541cee3404f" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "temporal_coverage-to", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "818e2c8f-fee0-49da-8bea-ea3c9401ece5" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "temporal_granularity", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "f868b950-d3ce-4fbe-88ca-5cbc4b672320" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "Towns & Cities", + "revision_timestamp": "2015-03-16T18:10:08.802815", + "state": "active", + "key": "theme-primary", + "revision_id": "fc2b6630-84f8-4c88-8ac7-0ca275b2bc97", + "id": "bdcf00fd-3248-4c2f-9cf8-b90706c88e8d" + }, + { + + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "[\"Environment\"]", + "revision_timestamp": "2015-04-08T20:57:04.895214", + "state": "active", + "key": "theme-secondary", + "revision_id": "c2c48530-ff82-4af1-9373-cdc64d5bc83c", + "id": "417482c5-a9c0-4430-8c4e-0c76e59fe44f" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "Real-time", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "update_frequency", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "e8ad4837-514e-4446-81a2-ffacfa7cf683" + } + ], + "license_url": "http://www.nationalarchives.gov.uk/doc/open-government-licence/version/3/", + "individual_resources": + [ + + { + + "content_length": "69837", + "cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv", + "hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3", + "description": "70 Whitehall energy data", + "cache_last_updated": "2013-06-19T00:59:42.762642", + "url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv", + "openness_score_failure_count": "0", + "format": "CSV", + "cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv", + "tracking_summary": + + { + "total": 0, + "recent": 0 + }, + "last_modified": "2014-05-09T23:00:01.435211", + "mimetype": "text/csv", + "content_type": "text/csv", + "openness_score": "3", + "openness_score_reason": "open and standardized format", + "position": 0, + "revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4", + "id": "f156019d-ea88-46a6-8fa3-3d12582e2161", + "size": 299107 + } + + ], + "title": "Cabinet Office 70 Whitehall energy use", + "revision_id": "3bd6ced3-35b2-4b20-94e2-c596e24bc375", + "date_released": "30/7/2010", + "theme-primary": "Towns & Cities" +} +] + +INVALID_TAGS = [ + { + "vocabulary_id": None, + "display_name": "consumption%^&", + "name": "consumption%^&", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "id": "84ce26de-6711-4e85-9609-f7d8a87b0fc8" + }, + ] + +ORGS = [ + {'id': 'org1-id', + 'name': 'org1'}, + {'id': 'aa1e068a-23da-4563-b9c2-2cad272b663e', + 'name': 'cabinet-office'} +] + +REVISIONS = [ + { + "id": "23daf2eb-d7ec-4d86-a844-3924acd311ea", + "timestamp": "2015-10-21T09:50:08.160045", + "message": "REST API: Update object dataset1", + "author": "ross", + "approved_timestamp": None, + "packages": + [ + "dataset1" + ], + "groups": [ ] + }, + { + "id": "8254a293-10db-4af2-9dfa-6a1f06ee899c", + "timestamp": "2015-10-21T09:46:21.198021", + "message": "REST API: Update object dataset1", + "author": "ross", + "approved_timestamp": None, + "packages": + [ + "dataset1" + ], + "groups": [ ] + }] diff --git a/ckanext/harvest/tests/harvesters/test_ckanharvester.py b/ckanext/harvest/tests/harvesters/test_ckanharvester.py new file mode 100644 index 0000000..eb88505 --- /dev/null +++ b/ckanext/harvest/tests/harvesters/test_ckanharvester.py @@ -0,0 +1,118 @@ +from nose.tools import assert_equal +import json + +try: + from ckan.tests.helpers import reset_db + from ckan.tests.factories import Organization +except ImportError: + from ckan.new_tests.helpers import reset_db + from ckan.new_tests.factories import Organization +from ckan import model + +from ckanext.harvest.tests.factories import (HarvestSourceObj, HarvestJobObj, + HarvestObjectObj) +from ckanext.harvest.tests.lib import run_harvest, run_harvest_job +import ckanext.harvest.model as harvest_model +from ckanext.harvest.harvesters.ckanharvester import CKANHarvester + +import mock_ckan + +# Start CKAN-alike server we can test harvesting against it +mock_ckan.serve() + + +class TestCkanHarvester(object): + @classmethod + def setup(cls): + reset_db() + harvest_model.setup() + + def test_gather_normal(self): + source = HarvestSourceObj(url='http://localhost:%s/' % mock_ckan.PORT) + job = HarvestJobObj(source=source) + + harvester = CKANHarvester() + obj_ids = harvester.gather_stage(job) + + assert_equal(type(obj_ids), list) + assert_equal(len(obj_ids), len(mock_ckan.DATASETS)) + harvest_object = harvest_model.HarvestObject.get(obj_ids[0]) + assert_equal(harvest_object.guid, mock_ckan.DATASETS[0]['id']) + + def test_fetch_normal(self): + source = HarvestSourceObj(url='http://localhost:%s/' % mock_ckan.PORT) + job = HarvestJobObj(source=source) + harvest_object = HarvestObjectObj(guid=mock_ckan.DATASETS[0]['id'], + job=job) + + harvester = CKANHarvester() + result = harvester.fetch_stage(harvest_object) + + assert_equal(result, True) + assert_equal( + harvest_object.content, + json.dumps( + mock_ckan.convert_dataset_to_restful_form( + mock_ckan.DATASETS[0]))) + + def test_import_normal(self): + org = Organization() + harvest_object = HarvestObjectObj( + guid=mock_ckan.DATASETS[0]['id'], + content=json.dumps(mock_ckan.convert_dataset_to_restful_form( + mock_ckan.DATASETS[0])), + job__source__owner_org=org['id']) + + harvester = CKANHarvester() + result = harvester.import_stage(harvest_object) + + assert_equal(result, True) + assert harvest_object.package_id + dataset = model.Package.get(harvest_object.package_id) + assert_equal(dataset.name, mock_ckan.DATASETS[0]['name']) + + def test_harvest(self): + results_by_guid = run_harvest( + url='http://localhost:%s/' % mock_ckan.PORT, + harvester=CKANHarvester()) + + result = results_by_guid['dataset1-id'] + assert_equal(result['state'], 'COMPLETE') + assert_equal(result['report_status'], 'added') + assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name']) + assert_equal(result['errors'], []) + + result = results_by_guid[mock_ckan.DATASETS[1]['id']] + assert_equal(result['state'], 'COMPLETE') + assert_equal(result['report_status'], 'added') + assert_equal(result['dataset']['name'], mock_ckan.DATASETS[1]['name']) + assert_equal(result['errors'], []) + + def test_harvest_twice(self): + run_harvest( + url='http://localhost:%s/' % mock_ckan.PORT, + harvester=CKANHarvester()) + results_by_guid = run_harvest( + url='http://localhost:%s/' % mock_ckan.PORT, + harvester=CKANHarvester()) + + # updated the dataset which has revisions + result = results_by_guid['dataset1'] + assert_equal(result['state'], 'COMPLETE') + assert_equal(result['report_status'], 'updated') + assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name']) + assert_equal(result['errors'], []) + + # the other dataset is unchanged and not harvested + assert mock_ckan.DATASETS[1]['name'] not in result + + def test_harvest_invalid_tag(self): + from nose.plugins.skip import SkipTest; raise SkipTest() + results_by_guid = run_harvest( + url='http://localhost:%s/invalid_tag' % mock_ckan.PORT, + harvester=CKANHarvester()) + + result = results_by_guid['dataset1-id'] + assert_equal(result['state'], 'COMPLETE') + assert_equal(result['report_status'], 'added') + assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name']) diff --git a/ckanext/harvest/tests/lib.py b/ckanext/harvest/tests/lib.py new file mode 100644 index 0000000..ff04f26 --- /dev/null +++ b/ckanext/harvest/tests/lib.py @@ -0,0 +1,56 @@ +from ckanext.harvest.tests.factories import HarvestSourceObj, HarvestJobObj +import ckanext.harvest.model as harvest_model +from ckanext.harvest.logic import NoNewHarvestJobError +from ckanext.harvest import queue +from ckan.plugins import toolkit + + +def run_harvest(url, harvester, config=''): + '''Runs a harvest and returns the results. + This allows you to test a harvester. + Queues are avoided as they are a pain in tests. + ''' + # User creates a harvest source + source = HarvestSourceObj(url=url, config=config) + + # User triggers a harvest, which is the creation of a harvest job + job = HarvestJobObj(source=source) + + return run_harvest_job(job, harvester) + + +def run_harvest_job(job, harvester): + # When 'paster harvest run' is called by the regular cron it does 2 things: + # 1. change the job status to Running + job.status = 'Running' + job.save() + # 2. put the job on the gather queue which is consumed by + # queue.gather_callback, which determines the harvester and then calls + # gather_stage. We simply call the gather_stage. + obj_ids = queue.gather_stage(harvester, job) + + # The object ids are put onto the fetch queue, consumed by + # queue.fetch_callback which calls queue.fetch_and_import_stages + results_by_guid = {} + for obj_id in obj_ids: + harvest_object = harvest_model.HarvestObject.get(obj_id) + guid = harvest_object.guid + results_by_guid[guid] = {'obj_id': obj_id} + + queue.fetch_and_import_stages(harvester, harvest_object) + results_by_guid[guid]['state'] = harvest_object.state + results_by_guid[guid]['report_status'] = harvest_object.report_status + if harvest_object.state == 'COMPLETE' and harvest_object.package_id: + results_by_guid[guid]['dataset'] = \ + toolkit.get_action('package_show')( + {}, dict(id=harvest_object.package_id)) + results_by_guid[guid]['errors'] = harvest_object.errors + + # Do 'harvest_jobs_run' to change the job status to 'finished' + try: + toolkit.get_action('harvest_jobs_run')({'ignore_auth': True}, {}) + except NoNewHarvestJobError: + # This is expected + pass + + return results_by_guid diff --git a/ckanext/harvest/tests/test_action.py b/ckanext/harvest/tests/test_action.py index 8ad8f51..7aed649 100644 --- a/ckanext/harvest/tests/test_action.py +++ b/ckanext/harvest/tests/test_action.py @@ -1,19 +1,23 @@ import json import copy -import ckan -import paste -import pylons.test import factories import unittest -from ckan.lib.create_test_data import CreateTestData -import ckan.new_tests.helpers as helpers +try: + from ckan.tests import factories as ckan_factories + from ckan.tests.helpers import _get_test_app, reset_db +except ImportError: + from ckan.new_tests import factories as ckan_factories + from ckan.new_tests.helpers import _get_test_app, reset_db from ckan import plugins as p from ckan.plugins import toolkit +from ckan import model + from ckanext.harvest.interfaces import IHarvester import ckanext.harvest.model as harvest_model -def call_action_api(app, action, apikey=None, status=200, **kwargs): + +def call_action_api(action, apikey=None, status=200, **kwargs): '''POST an HTTP request to the CKAN API and return the result. Any additional keyword arguments that you pass to this function as **kwargs @@ -21,7 +25,7 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs): Usage: - package_dict = post(app, 'package_create', apikey=apikey, + package_dict = call_action_api('package_create', apikey=apikey, name='my_package') assert package_dict['name'] == 'my_package' @@ -31,13 +35,10 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs): of the error dict, you have to use the status param otherwise an exception will be raised: - error_dict = post(app, 'group_activity_list', status=403, + error_dict = call_action_api('group_activity_list', status=403, id='invalid_id') assert error_dict['message'] == 'Access Denied' - :param app: the test app to post to - :type app: paste.fixture.TestApp - :param action: the action to post to, e.g. 'package_create' :type action: string @@ -62,8 +63,10 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs): ''' params = json.dumps(kwargs) + app = _get_test_app() response = app.post('/api/action/{0}'.format(action), params=params, - extra_environ={'Authorization': str(apikey)}, status=status) + extra_environ={'Authorization': str(apikey)}, + status=status) if status in (200,): assert response.json['success'] is True @@ -75,10 +78,13 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs): class MockHarvesterForActionTests(p.SingletonPlugin): p.implements(IHarvester) - def info(self): - return {'name': 'test-for-action', 'title': 'Test for action', 'description': 'test'} - def validate_config(self,config): + def info(self): + return {'name': 'test-for-action', + 'title': 'Test for action', + 'description': 'test'} + + def validate_config(self, config): if not config: return config @@ -86,10 +92,10 @@ class MockHarvesterForActionTests(p.SingletonPlugin): config_obj = json.loads(config) if 'custom_option' in config_obj: - if not isinstance(config_obj['custom_option'],list): + if not isinstance(config_obj['custom_option'], list): raise ValueError('custom_option must be a list') - except ValueError,e: + except ValueError, e: raise e return config @@ -103,56 +109,63 @@ class MockHarvesterForActionTests(p.SingletonPlugin): def import_stage(self, harvest_object): return True -class HarvestSourceActionBase(object): + +class FunctionalTestBaseWithoutClearBetweenTests(object): + ''' Functional tests should normally derive from + ckan.lib.helpers.FunctionalTestBase, but these are legacy tests so this + class is a compromise. This version doesn't call reset_db before every + test, because these tests are designed with fixtures created in + setup_class.''' @classmethod def setup_class(cls): + reset_db() harvest_model.setup() - CreateTestData.create() - sysadmin_user = ckan.model.User.get('testsysadmin') - cls.sysadmin = { - 'id': sysadmin_user.id, - 'apikey': sysadmin_user.apikey, - 'name': sysadmin_user.name, - } + @classmethod + def teardown_class(cls): + pass - cls.app = paste.fixture.TestApp(pylons.test.pylonsapp) +class HarvestSourceActionBase(FunctionalTestBaseWithoutClearBetweenTests): - cls.default_source_dict = { - "url": "http://test.action.com", - "name": "test-source-action", - "title": "Test source action", - "notes": "Test source action desc", - "source_type": "test-for-action", - "frequency": "MANUAL", - "config": json.dumps({"custom_option":["a","b"]}) + @classmethod + def setup_class(cls): + super(HarvestSourceActionBase, cls).setup_class() + harvest_model.setup() + + cls.sysadmin = ckan_factories.Sysadmin() + + cls.default_source_dict = { + "url": "http://test.action.com", + "name": "test-source-action", + "title": "Test source action", + "notes": "Test source action desc", + "source_type": "test-for-action", + "frequency": "MANUAL", + "config": json.dumps({"custom_option": ["a", "b"]}) } if not p.plugin_loaded('test_action_harvester'): p.load('test_action_harvester') - @classmethod def teardown_class(cls): - ckan.model.repo.rebuild_db() + super(HarvestSourceActionBase, cls).teardown_class() p.unload('test_action_harvester') - def teardown(self): - pass - def test_invalid_missing_values(self): source_dict = {} if 'id' in self.default_source_dict: source_dict['id'] = self.default_source_dict['id'] - result = call_action_api(self.app, self.action, - apikey=self.sysadmin['apikey'], status=409, **source_dict) + result = call_action_api(self.action, + apikey=self.sysadmin['apikey'], status=409, + **source_dict) - for key in ('name','title','url','source_type'): + for key in ('name', 'title', 'url', 'source_type'): assert result[key] == [u'Missing value'] def test_invalid_unknown_type(self): @@ -160,8 +173,9 @@ class HarvestSourceActionBase(object): source_dict = copy.deepcopy(self.default_source_dict) source_dict['source_type'] = 'unknown' - result = call_action_api(self.app, self.action, - apikey=self.sysadmin['apikey'], status=409, **source_dict) + result = call_action_api(self.action, + apikey=self.sysadmin['apikey'], status=409, + **source_dict) assert 'source_type' in result assert u'Unknown harvester type' in result['source_type'][0] @@ -171,8 +185,9 @@ class HarvestSourceActionBase(object): source_dict = copy.deepcopy(self.default_source_dict) source_dict['frequency'] = wrong_frequency - result = call_action_api(self.app, self.action, - apikey=self.sysadmin['apikey'], status=409, **source_dict) + result = call_action_api(self.action, + apikey=self.sysadmin['apikey'], status=409, + **source_dict) assert 'frequency' in result assert u'Frequency {0} not recognised'.format(wrong_frequency) in result['frequency'][0] @@ -182,16 +197,18 @@ class HarvestSourceActionBase(object): source_dict = copy.deepcopy(self.default_source_dict) source_dict['config'] = 'not_json' - result = call_action_api(self.app, self.action, - apikey=self.sysadmin['apikey'], status=409, **source_dict) + result = call_action_api(self.action, + apikey=self.sysadmin['apikey'], status=409, + **source_dict) assert 'config' in result assert u'Error parsing the configuration options: No JSON object could be decoded' in result['config'][0] source_dict['config'] = json.dumps({'custom_option': 'not_a_list'}) - result = call_action_api(self.app, self.action, - apikey=self.sysadmin['apikey'], status=409, **source_dict) + result = call_action_api(self.action, + apikey=self.sysadmin['apikey'], status=409, + **source_dict) assert 'config' in result assert u'Error parsing the configuration options: custom_option must be a list' in result['config'][0] @@ -202,14 +219,12 @@ class TestHarvestSourceActionCreate(HarvestSourceActionBase): def __init__(self): self.action = 'harvest_source_create' - - def test_create(self): source_dict = self.default_source_dict - result = call_action_api(self.app, 'harvest_source_create', - apikey=self.sysadmin['apikey'], **source_dict) + result = call_action_api('harvest_source_create', + apikey=self.sysadmin['apikey'], **source_dict) for key in source_dict.keys(): assert source_dict[key] == result[key] @@ -219,18 +234,18 @@ class TestHarvestSourceActionCreate(HarvestSourceActionBase): assert source.url == source_dict['url'] assert source.type == source_dict['source_type'] - # Trying to create a source with the same URL fails - source_dict = copy.deepcopy(self.default_source_dict) source_dict['name'] = 'test-source-action-new' - result = call_action_api(self.app, 'harvest_source_create', - apikey=self.sysadmin['apikey'], status=409, **source_dict) + result = call_action_api('harvest_source_create', + apikey=self.sysadmin['apikey'], status=409, + **source_dict) assert 'url' in result assert u'There already is a Harvest Source for this URL' in result['url'][0] + class TestHarvestSourceActionUpdate(HarvestSourceActionBase): @classmethod @@ -242,8 +257,8 @@ class TestHarvestSourceActionUpdate(HarvestSourceActionBase): # Create a source to udpate source_dict = cls.default_source_dict - result = call_action_api(cls.app, 'harvest_source_create', - apikey=cls.sysadmin['apikey'], **source_dict) + result = call_action_api('harvest_source_create', + apikey=cls.sysadmin['apikey'], **source_dict) cls.default_source_dict['id'] = result['id'] @@ -251,17 +266,17 @@ class TestHarvestSourceActionUpdate(HarvestSourceActionBase): source_dict = self.default_source_dict source_dict.update({ - "url": "http://test.action.updated.com", - "name": "test-source-action-updated", - "title": "Test source action updated", - "notes": "Test source action desc updated", - "source_type": "test", - "frequency": "MONTHLY", - "config": json.dumps({"custom_option":["c","d"]}) - }) + "url": "http://test.action.updated.com", + "name": "test-source-action-updated", + "title": "Test source action updated", + "notes": "Test source action desc updated", + "source_type": "test", + "frequency": "MONTHLY", + "config": json.dumps({"custom_option": ["c", "d"]}) + }) - result = call_action_api(self.app, 'harvest_source_update', - apikey=self.sysadmin['apikey'], **source_dict) + result = call_action_api('harvest_source_update', + apikey=self.sysadmin['apikey'], **source_dict) for key in source_dict.keys(): assert source_dict[key] == result[key] @@ -271,29 +286,26 @@ class TestHarvestSourceActionUpdate(HarvestSourceActionBase): assert source.url == source_dict['url'] assert source.type == source_dict['source_type'] + class TestHarvestObject(unittest.TestCase): @classmethod def setup_class(cls): + reset_db() harvest_model.setup() - @classmethod - def teardown_class(cls): - ckan.model.repo.rebuild_db() - def test_create(self): - job = factories.HarvestJobFactory() - job.save() + job = factories.HarvestJobObj() context = { - 'model' : ckan.model, - 'session': ckan.model.Session, + 'model': model, + 'session': model.Session, 'ignore_auth': True, } data_dict = { - 'guid' : 'guid', - 'content' : 'content', - 'job_id' : job.id, - 'extras' : { 'a key' : 'a value' }, + 'guid': 'guid', + 'content': 'content', + 'job_id': job.id, + 'extras': {'a key': 'a value'}, } harvest_object = toolkit.get_action('harvest_object_create')( context, data_dict) @@ -303,26 +315,24 @@ class TestHarvestObject(unittest.TestCase): assert created_object.guid == harvest_object['guid'] == data_dict['guid'] def test_create_bad_parameters(self): - source_a = factories.HarvestSourceFactory() - source_a.save() - job = factories.HarvestJobFactory() - job.save() + source_a = factories.HarvestSourceObj() + job = factories.HarvestJobObj() context = { - 'model' : ckan.model, - 'session': ckan.model.Session, + 'model': model, + 'session': model.Session, 'ignore_auth': True, } data_dict = { - 'job_id' : job.id, - 'source_id' : source_a.id, - 'extras' : 1 + 'job_id': job.id, + 'source_id': source_a.id, + 'extras': 1 } harvest_object_create = toolkit.get_action('harvest_object_create') - self.assertRaises(ckan.logic.ValidationError, harvest_object_create, - context, data_dict) + self.assertRaises(toolkit.ValidationError, harvest_object_create, + context, data_dict) - data_dict['extras'] = {'test': 1 } + data_dict['extras'] = {'test': 1} - self.assertRaises(ckan.logic.ValidationError, harvest_object_create, - context, data_dict) + self.assertRaises(toolkit.ValidationError, harvest_object_create, + context, data_dict) diff --git a/ckanext/harvest/tests/test_queue.py b/ckanext/harvest/tests/test_queue.py index 2b06926..d5b3888 100644 --- a/ckanext/harvest/tests/test_queue.py +++ b/ckanext/harvest/tests/test_queue.py @@ -1,3 +1,7 @@ +try: + from ckan.tests.helpers import reset_db +except ImportError: + from ckan.new_tests.helpers import reset_db import ckanext.harvest.model as harvest_model from ckanext.harvest.model import HarvestObject, HarvestObjectExtra from ckanext.harvest.interfaces import IHarvester @@ -82,13 +86,9 @@ class TestHarvester(SingletonPlugin): class TestHarvestQueue(object): @classmethod def setup_class(cls): + reset_db() harvest_model.setup() - @classmethod - def teardown_class(cls): - model.repo.rebuild_db() - - def test_01_basic_harvester(self): ### make sure queues/exchanges are created first and are empty