From 2da918c2e4666fdc18991762e3eb1fd7c40766ac Mon Sep 17 00:00:00 2001 From: David Read Date: Wed, 22 Jul 2015 10:13:02 +0100 Subject: [PATCH 01/15] Fix migration for old harvests so that ones that errored are correctly marked. Added helpful comments in model. --- ckanext/harvest/model/__init__.py | 18 +++++++++++++++--- ckanext/harvest/queue.py | 7 ++++++- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py index 1d3a05a..6f8b992 100644 --- a/ckanext/harvest/model/__init__.py +++ b/ckanext/harvest/model/__init__.py @@ -196,12 +196,21 @@ def define_harvester_tables(): Column('gather_finished', types.DateTime), Column('finished', types.DateTime), Column('source_id', types.UnicodeText, ForeignKey('harvest_source.id')), + # status: New, Running, Finished Column('status', types.UnicodeText, default=u'New', nullable=False), ) - # Was harvested_document + # A harvest_object contains a representation of one dataset during a + # particular harvest harvest_object_table = Table('harvest_object', metadata, Column('id', types.UnicodeText, primary_key=True, default=make_uuid), + # The guid is the 'identity' of the dataset, according to the source. + # So if you reharvest it, then the harvester knows which dataset to + # update because of this identity. The identity needs to be unique + # within this CKAN. Column('guid', types.UnicodeText, default=u''), + # When you harvest a dataset multiple times, only the latest + # successfully imported harvest_object should be flagged 'current'. + # The import_stage reads and writes it. Column('current',types.Boolean,default=False), Column('gathered', types.DateTime, default=datetime.datetime.utcnow), Column('fetch_started', types.DateTime), @@ -209,6 +218,7 @@ def define_harvester_tables(): Column('fetch_finished', types.DateTime), Column('import_started', types.DateTime), Column('import_finished', types.DateTime), + # state: WAITING, FETCH, IMPORT, COMPLETE, ERROR Column('state', types.UnicodeText, default=u'WAITING'), Column('metadata_modified_date', types.DateTime), Column('retry_times',types.Integer, default=0), @@ -391,9 +401,11 @@ ALTER TABLE harvest_object_extra ALTER TABLE harvest_object_extra ADD CONSTRAINT harvest_object_extra_harvest_object_id_fkey FOREIGN KEY (harvest_object_id) REFERENCES harvest_object(id); -UPDATE harvest_object set state = 'COMPLETE'; +UPDATE harvest_object set state = 'COMPLETE' where package_id is not null; +UPDATE harvest_object set state = 'ERROR' where package_id is null; UPDATE harvest_object set retry_times = 0; -UPDATE harvest_object set report_status = 'new'; +UPDATE harvest_object set report_status = 'updated' where package_id is not null; +UPDATE harvest_object set report_status = 'errored' where package_id is null; UPDATE harvest_source set frequency = 'MANUAL'; ALTER TABLE harvest_object DROP CONSTRAINT harvest_object_package_id_fkey; diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index b8c4131..52d942f 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -250,7 +250,12 @@ def gather_callback(channel, method, header, body): log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids))) if not harvester_found: - msg = 'No harvester could be found for source type %s' % job.source.type + # This can occur if you: + # * remove a harvester and it still has sources that are then + # refreshed + # * add a new harvester and restart CKAN but not the gather + # queue. + msg = 'System error - No harvester could be found for source type %s' % job.source.type err = HarvestGatherError(message=msg,job=job) err.save() log.error(msg) From a77b62c25a55f3810da608cd75f0b299b18d5275 Mon Sep 17 00:00:00 2001 From: Raphael Stolt Date: Mon, 17 Aug 2015 12:11:45 +0200 Subject: [PATCH 02/15] [#103] Include templates_new directory --- MANIFEST.in | 1 + 1 file changed, 1 insertion(+) diff --git a/MANIFEST.in b/MANIFEST.in index 885c4c4..990b70d 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,2 +1,3 @@ recursive-include ckanext/harvest/templates * +recursive-include ckanext/harvest/templates_new * recursive-include ckanext/harvest/public * From 1905caa961b9b90f4dfd03301b62609e607f3fbb Mon Sep 17 00:00:00 2001 From: florianm Date: Wed, 19 Aug 2015 10:25:20 +0800 Subject: [PATCH 03/15] upgrade harvest_source_clear to not delete from authz models removed in migration 078 --- ckanext/harvest/logic/action/update.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 62e4c08..ae6f431 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -140,14 +140,22 @@ def harvest_source_clear(context,data_dict): delete from resource_group where package_id in (select id from package where state = 'to_delete'); ''' + # CKAN pre-2.5: authz models were removed in migration 078 + if toolkit.check_ckan_version(max_version='2.4'): + sql += ''' + delete from user_object_role where id not in + (select user_object_role_id from package_role) and context = 'Package'; + delete from package_role where package_id in + (select id from package where state = 'to_delete'); + ''' + + sql += ''' delete from harvest_object_error where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}'); delete from harvest_object_extra where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}'); delete from harvest_object where harvest_source_id = '{harvest_source_id}'; delete from harvest_gather_error where harvest_job_id in (select id from harvest_job where source_id = '{harvest_source_id}'); delete from harvest_job where source_id = '{harvest_source_id}'; - delete from package_role where package_id in (select id from package where state = 'to_delete' ); - delete from user_object_role where id not in (select user_object_role_id from package_role) and context = 'Package'; delete from package_tag_revision where package_id in (select id from package where state = 'to_delete'); delete from member_revision where table_id in (select id from package where state = 'to_delete'); delete from package_extra_revision where package_id in (select id from package where state = 'to_delete'); From a6cdda0a1480c79e42a8ba27a51350e9f4e36e4c Mon Sep 17 00:00:00 2001 From: Florian Mayer Date: Wed, 19 Aug 2015 08:41:42 +0000 Subject: [PATCH 04/15] set max version to 2.4.99 --- ckanext/harvest/logic/action/update.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index ae6f431..7044128 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -141,7 +141,7 @@ def harvest_source_clear(context,data_dict): (select id from package where state = 'to_delete'); ''' # CKAN pre-2.5: authz models were removed in migration 078 - if toolkit.check_ckan_version(max_version='2.4'): + if toolkit.check_ckan_version(max_version='2.4.99'): sql += ''' delete from user_object_role where id not in (select user_object_role_id from package_role) and context = 'Package'; From d9fc9bdfc6ed21f8b90c9b94772749fe08cc1797 Mon Sep 17 00:00:00 2001 From: Raphael Stolt Date: Wed, 19 Aug 2015 14:58:16 +0200 Subject: [PATCH 05/15] [#103] Include fanstatic styles directory --- MANIFEST.in | 1 + 1 file changed, 1 insertion(+) diff --git a/MANIFEST.in b/MANIFEST.in index 990b70d..25e3d8f 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,4 @@ recursive-include ckanext/harvest/templates * recursive-include ckanext/harvest/templates_new * +recursive-include ckanext/harvest/fanstatic_library/styles * recursive-include ckanext/harvest/public * From be3e88086a04fe1075944810b781876d8e997eea Mon Sep 17 00:00:00 2001 From: David Read Date: Thu, 1 Oct 2015 17:53:03 +0100 Subject: [PATCH 06/15] Generating unique names improved * Harvesters that change the name when the title changes have had a problem when the change is small and a number was unnecessarily appended. e.g. "Trees "->"Trees" meant _gen_new_name("Trees") returned "trees1". Now you can specify the existing value and it will return that if it still holds. * Maximum dataset name length is now adhered to. * To make a name unique, a sequential number is now added, since for users that is more understandable and pleasant. However hex digits are still an option, for those that want to harvest concurrently. --- ckanext/harvest/harvesters/base.py | 101 +++++++++++++++++++++++++---- 1 file changed, 90 insertions(+), 11 deletions(-) diff --git a/ckanext/harvest/harvesters/base.py b/ckanext/harvest/harvesters/base.py index 41a5061..1fd0ebc 100644 --- a/ckanext/harvest/harvesters/base.py +++ b/ckanext/harvest/harvesters/base.py @@ -8,7 +8,7 @@ from pylons import config from ckan import plugins as p from ckan import model -from ckan.model import Session, Package +from ckan.model import Session, Package, PACKAGE_NAME_MAX_LENGTH from ckan.logic import ValidationError, NotFound, get_action from ckan.logic.schema import default_create_package_schema @@ -41,21 +41,100 @@ class HarvesterBase(SingletonPlugin): _user_name = None - def _gen_new_name(self, title): + @classmethod + def _gen_new_name(cls, title, existing_name=None, + append_type='number-sequence'): ''' - Creates a URL friendly name from a title + Returns a 'name' for the dataset (URL friendly), based on the title. - If the name already exists, it will add some random characters at the end + If the ideal name is already used, it will append a number to it to + ensure it is unique. + + If generating a new name because the title of the dataset has changed, + specify the existing name, in case the name doesn't need to change + after all. + + :param existing_name: the current name of the dataset - only specify + this if the dataset exists + :type existing_name: string + :param append_type: the type of characters to add to make it unique - + either 'number-sequence' or 'random-hex'. + :type append_type: string ''' - name = munge_title_to_name(title).replace('_', '-') - while '--' in name: - name = name.replace('--', '-') - pkg_obj = Session.query(Package).filter(Package.name == name).first() - if pkg_obj: - return name + str(uuid.uuid4())[:5] + ideal_name = munge_title_to_name(title) + ideal_name = re.sub('-+', '-', ideal_name) # collapse multiple dashes + return cls._ensure_name_is_unique(ideal_name, + existing_name=existing_name, + append_type=append_type) + + @staticmethod + def _ensure_name_is_unique(ideal_name, existing_name=None, + append_type='number-sequence'): + ''' + Returns a dataset name based on the ideal_name, only it will be + guaranteed to be different than all the other datasets, by adding a + number on the end if necessary. + + If generating a new name because the title of the dataset has changed, + specify the existing name, in case the name doesn't need to change + after all. + + The maximum dataset name length is taken account of. + + :param ideal_name: the desired name for the dataset, if its not already + been taken (usually derived by munging the dataset + title) + :type ideal_name: string + :param existing_name: the current name of the dataset - only specify + this if the dataset exists + :type existing_name: string + :param append_type: the type of characters to add to make it unique - + either 'number-sequence' or 'random-hex'. + :type append_type: string + ''' + ideal_name = ideal_name[:PACKAGE_NAME_MAX_LENGTH] + if existing_name == ideal_name: + return ideal_name + if append_type == 'number-sequence': + MAX_NUMBER_APPENDED = 999 + APPEND_MAX_CHARS = len(str(MAX_NUMBER_APPENDED)) + elif append_type == 'random-hex': + APPEND_MAX_CHARS = 5 # 16^5 = 1 million combinations else: - return name + raise NotImplementedError('append_type cannot be %s' % append_type) + # Find out which package names have been taken. Restrict it to names + # derived from the ideal name plus and numbers added + like_q = u'%s%%' % \ + ideal_name[:PACKAGE_NAME_MAX_LENGTH-APPEND_MAX_CHARS] + name_results = Session.query(Package.name)\ + .filter(Package.name.ilike(like_q))\ + .all() + taken = set([name_result[0] for name_result in name_results]) + if existing_name and existing_name in taken: + taken.remove(existing_name) + if ideal_name not in taken: + # great, the ideal name is available + return ideal_name + elif existing_name and existing_name.startswith(ideal_name): + # the ideal name is not available, but its an existing dataset with + # a name based on the ideal one, so there's no point changing it to + # a different number + return existing_name + elif append_type == 'number-sequence': + # find the next available number + counter = 1 + while counter <= MAX_NUMBER_APPENDED: + candidate_name = \ + ideal_name[:PACKAGE_NAME_MAX_LENGTH-len(str(counter))] + \ + str(counter) + if candidate_name not in taken: + return candidate_name + counter = counter + 1 + return None + elif append_type == 'random-hex': + return ideal_name[:PACKAGE_NAME_MAX_LENGTH-APPEND_MAX_CHARS] + \ + str(uuid.uuid4())[:APPEND_MAX_CHARS] def _save_gather_error(self, message, job): From 82bdff2f342c43f14fab241b7c39a0468de9ea18 Mon Sep 17 00:00:00 2001 From: David Read Date: Thu, 1 Oct 2015 17:59:17 +0100 Subject: [PATCH 07/15] Add tests --- .gitignore | 2 +- ckanext/harvest/tests/harvesters/__init__.py | 0 ckanext/harvest/tests/harvesters/test_base.py | 96 +++++++++++++++++++ 3 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 ckanext/harvest/tests/harvesters/__init__.py create mode 100644 ckanext/harvest/tests/harvesters/test_base.py diff --git a/.gitignore b/.gitignore index b678f55..62ff3fd 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,6 @@ build .DS_Store dist development.ini -*.swp +*.sw? *~ node_modules diff --git a/ckanext/harvest/tests/harvesters/__init__.py b/ckanext/harvest/tests/harvesters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ckanext/harvest/tests/harvesters/test_base.py b/ckanext/harvest/tests/harvesters/test_base.py new file mode 100644 index 0000000..6e1da21 --- /dev/null +++ b/ckanext/harvest/tests/harvesters/test_base.py @@ -0,0 +1,96 @@ +import re + +from nose.tools import assert_equal + +from ckanext.harvest.harvesters.base import HarvesterBase +try: + from ckan.tests import helpers + from ckan.tests import factories +except ImportError: + from ckan.new_tests import helpers + from ckan.new_tests import factories + + +_ensure_name_is_unique = HarvesterBase._ensure_name_is_unique + + +class TestGenNewName(object): + @classmethod + def setup_class(cls): + helpers.reset_db() + + def test_basic(self): + assert_equal(HarvesterBase._gen_new_name('Trees'), 'trees') + + def test_munge(self): + assert_equal( + HarvesterBase._gen_new_name('Trees and branches - survey.'), + 'trees-and-branches-survey') + + +class TestEnsureNameIsUnique(object): + def setup(self): + helpers.reset_db() + + def test_no_existing_datasets(self): + factories.Dataset(name='unrelated') + assert_equal(_ensure_name_is_unique('trees'), 'trees') + + def test_existing_dataset(self): + factories.Dataset(name='trees') + assert_equal(_ensure_name_is_unique('trees'), 'trees1') + + def test_two_existing_datasets(self): + factories.Dataset(name='trees') + factories.Dataset(name='trees1') + assert_equal(_ensure_name_is_unique('trees'), 'trees2') + + def test_no_existing_datasets_and_long_name(self): + assert_equal(_ensure_name_is_unique('x'*101), 'x'*100) + + def test_existing_dataset_and_long_name(self): + # because PACKAGE_NAME_MAX_LENGTH = 100 + factories.Dataset(name='x'*100) + assert_equal(_ensure_name_is_unique('x'*101), 'x'*99 + '1') + + def test_update_dataset_with_new_name(self): + factories.Dataset(name='trees1') + assert_equal(_ensure_name_is_unique('tree', existing_name='trees1'), + 'tree') + + def test_update_dataset_but_with_same_name(self): + # this can happen if you remove a trailing space from the title - the + # harvester sees the title changed and thinks it should have a new + # name, but clearly it can reuse its existing one + factories.Dataset(name='trees') + factories.Dataset(name='trees1') + assert_equal(_ensure_name_is_unique('trees', existing_name='trees'), + 'trees') + + def test_update_dataset_to_available_shorter_name(self): + # this can be handy when if reharvesting, you got duplicates and + # managed to purge one set and through a minor title change you can now + # lose the appended number. users don't like unnecessary numbers. + factories.Dataset(name='trees1') + assert_equal(_ensure_name_is_unique('trees', existing_name='trees1'), + 'trees') + + def test_update_dataset_but_doesnt_change_to_other_number(self): + # there's no point changing one number for another though + factories.Dataset(name='trees') + factories.Dataset(name='trees2') + assert_equal(_ensure_name_is_unique('trees', existing_name='trees2'), + 'trees2') + + def test_update_dataset_with_new_name_with_numbers(self): + factories.Dataset(name='trees') + factories.Dataset(name='trees2') + factories.Dataset(name='frogs') + assert_equal(_ensure_name_is_unique('frogs', existing_name='trees2'), + 'frogs1') + + def test_existing_dataset_appending_hex(self): + factories.Dataset(name='trees') + name = _ensure_name_is_unique('trees', append_type='random-hex') + # e.g. 'trees0b53f' + assert re.match('trees[\da-f]{5}', name) From 6360681a8f98ea5ba52a70bc589754c7db52f032 Mon Sep 17 00:00:00 2001 From: David Read Date: Mon, 12 Oct 2015 15:57:27 +0100 Subject: [PATCH 08/15] [#105] Fix order of deletes, as agreed with @florianm. --- ckanext/harvest/logic/action/update.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 7044128..9e871d9 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -143,10 +143,10 @@ def harvest_source_clear(context,data_dict): # CKAN pre-2.5: authz models were removed in migration 078 if toolkit.check_ckan_version(max_version='2.4.99'): sql += ''' - delete from user_object_role where id not in - (select user_object_role_id from package_role) and context = 'Package'; - delete from package_role where package_id in + delete from package_role where package_id in (select id from package where state = 'to_delete'); + delete from user_object_role where id not in + (select user_object_role_id from package_role) and context = 'Package'; ''' From 1f81fefcbb8fba983e67d4b65fa620eccb1de0d4 Mon Sep 17 00:00:00 2001 From: David Read Date: Wed, 21 Oct 2015 16:11:11 +0000 Subject: [PATCH 09/15] Correct doc error for configuring amqp. --- .gitignore | 2 +- README.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index b678f55..62ff3fd 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,6 @@ build .DS_Store dist development.ini -*.swp +*.sw? *~ node_modules diff --git a/README.rst b/README.rst index f3dadf8..3b1f2d3 100644 --- a/README.rst +++ b/README.rst @@ -29,7 +29,7 @@ Installation On your CKAN configuration file, add:: - ckan.harvest.mq.type = rabbitmq + ckan.harvest.mq.type = amqp 2. Install the extension into your python environment:: From d1f84295f8726d6a83f28585ca76ea0a1487f9a1 Mon Sep 17 00:00:00 2001 From: David Read Date: Wed, 21 Oct 2015 16:12:40 +0000 Subject: [PATCH 10/15] purge_queues command now has warning about impact of Redis flushall, plus add some (log) output when you run a purge. --- ckanext/harvest/commands/harvester.py | 2 ++ ckanext/harvest/queue.py | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 7158c8c..03a6bb8 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -45,6 +45,8 @@ class Harvester(CkanCommand): harvester purge_queues - removes all jobs from fetch and gather queue + WARNING: if using Redis, this command purges any other data you have + in Redis too! harvester [-j] [-o] [--segments={segments}] import [{source-id}] - perform the import stage with the last fetched objects, for a certain diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index bb1d63c..7e56a31 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -82,10 +82,13 @@ def purge_queues(): if backend in ('amqp', 'ampq'): channel = connection.channel() channel.queue_purge(queue=get_gather_queue_name()) + log.info('AMQP queue purged: %s', get_gather_queue_name()) channel.queue_purge(queue=get_fetch_queue_name()) + log.info('AMQP queue purged: %s', get_fetch_queue_name()) return if backend == 'redis': connection.flushall() + log.info('Redis flushed') def resubmit_jobs(): if config.get('ckan.harvest.mq.type') != 'redis': @@ -95,7 +98,7 @@ def resubmit_jobs(): for key in harvest_object_pending: date_of_key = datetime.datetime.strptime(redis.get(key), "%Y-%m-%d %H:%M:%S.%f") - if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minuites for fetch and import max + if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minutes for fetch and import max redis.rpush('harvest_object_id', json.dumps({'harvest_object_id': key.split(':')[-1]}) ) From f70c16bce7e1be21d488e5d4dbc9550a28c542e2 Mon Sep 17 00:00:00 2001 From: David Read Date: Wed, 21 Oct 2015 16:26:57 +0000 Subject: [PATCH 11/15] Add framework for testing harvesters. Modernize existing tests. --- ckanext/harvest/logic/action/get.py | 12 + ckanext/harvest/model/__init__.py | 76 +-- ckanext/harvest/queue.py | 39 +- ckanext/harvest/tests/factories.py | 96 +++- ckanext/harvest/tests/harvesters/__init__.py | 0 ckanext/harvest/tests/harvesters/mock_ckan.py | 458 ++++++++++++++++++ .../tests/harvesters/test_ckanharvester.py | 118 +++++ ckanext/harvest/tests/lib.py | 56 +++ ckanext/harvest/tests/test_action.py | 202 ++++---- ckanext/harvest/tests/test_queue.py | 10 +- 10 files changed, 908 insertions(+), 159 deletions(-) create mode 100644 ckanext/harvest/tests/harvesters/__init__.py create mode 100644 ckanext/harvest/tests/harvesters/mock_ckan.py create mode 100644 ckanext/harvest/tests/harvesters/test_ckanharvester.py create mode 100644 ckanext/harvest/tests/lib.py diff --git a/ckanext/harvest/logic/action/get.py b/ckanext/harvest/logic/action/get.py index 8282ff5..2ddcdd9 100644 --- a/ckanext/harvest/logic/action/get.py +++ b/ckanext/harvest/logic/action/get.py @@ -30,10 +30,22 @@ def harvest_source_show(context,data_dict): :param id: the id or name of the harvest source :type id: string + :param url: url of the harvest source (as an alternative to the id) + :type url: string + :returns: harvest source metadata :rtype: dictionary ''' + model = context.get('model') + # Find the source by URL + if data_dict.get('url') and not data_dict.get('id'): + source = model.Session.query(harvest_model.HarvestSource) \ + .filter_by(url=data_dict.get('url')) \ + .first() + if not source: + raise NotFound + data_dict['id'] = source.id source_dict = logic.get_action('package_show')(context, data_dict) diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py index 1d3a05a..39317fc 100644 --- a/ckanext/harvest/model/__init__.py +++ b/ckanext/harvest/model/__init__.py @@ -45,43 +45,47 @@ def setup(): define_harvester_tables() log.debug('Harvest tables defined in memory') - if model.package_table.exists(): - if not harvest_source_table.exists(): - - # Create each table individually rather than - # using metadata.create_all() - harvest_source_table.create() - harvest_job_table.create() - harvest_object_table.create() - harvest_gather_error_table.create() - harvest_object_error_table.create() - harvest_object_extra_table.create() - - log.debug('Harvest tables created') - else: - from ckan.model.meta import engine - log.debug('Harvest tables already exist') - # Check if existing tables need to be updated - inspector = Inspector.from_engine(engine) - columns = inspector.get_columns('harvest_source') - if not 'title' in [column['name'] for column in columns]: - log.debug('Harvest tables need to be updated') - migrate_v2() - if not 'frequency' in [column['name'] for column in columns]: - log.debug('Harvest tables need to be updated') - migrate_v3() - - # Check if this instance has harvest source datasets - source_ids = Session.query(HarvestSource.id).filter_by(active=True).all() - source_package_ids = Session.query(model.Package.id).filter_by(type=u'harvest', state='active').all() - sources_to_migrate = set(source_ids) - set(source_package_ids) - if sources_to_migrate: - log.debug('Creating harvest source datasets for %i existing sources', len(sources_to_migrate)) - sources_to_migrate = [s[0] for s in sources_to_migrate] - migrate_v3_create_datasets(sources_to_migrate) - - else: + if not model.package_table.exists(): + print 'harvest model setup: DEFER' log.debug('Harvest table creation deferred') + return + + if not harvest_source_table.exists(): + + print 'harvest model setup: CREATE' + # Create each table individually rather than + # using metadata.create_all() + harvest_source_table.create() + harvest_job_table.create() + harvest_object_table.create() + harvest_gather_error_table.create() + harvest_object_error_table.create() + harvest_object_extra_table.create() + + log.debug('Harvest tables created') + else: + from ckan.model.meta import engine + print 'harvest model setup: NOTHING TO DO' + log.debug('Harvest tables already exist') + # Check if existing tables need to be updated + inspector = Inspector.from_engine(engine) + columns = inspector.get_columns('harvest_source') + column_names = [column['name'] for column in columns] + if not 'title' in column_names: + log.debug('Harvest tables need to be updated') + migrate_v2() + if not 'frequency' in column_names: + log.debug('Harvest tables need to be updated') + migrate_v3() + + # Check if this instance has harvest source datasets + source_ids = Session.query(HarvestSource.id).filter_by(active=True).all() + source_package_ids = Session.query(model.Package.id).filter_by(type=u'harvest', state='active').all() + sources_to_migrate = set(source_ids) - set(source_package_ids) + if sources_to_migrate: + log.debug('Creating harvest source datasets for %i existing sources', len(sources_to_migrate)) + sources_to_migrate = [s[0] for s in sources_to_migrate] + migrate_v3_create_datasets(sources_to_migrate) class HarvestError(Exception): diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index bb1d63c..8296efb 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -223,23 +223,12 @@ def gather_callback(channel, method, header, body): for harvester in PluginImplementations(IHarvester): if harvester.info()['name'] == job.source.type: harvester_found = True - # Get a list of harvest object ids from the plugin - job.gather_started = datetime.datetime.utcnow() try: - harvest_object_ids = harvester.gather_stage(job) + harvest_object_ids = gather_stage(harvester, job) except (Exception, KeyboardInterrupt): channel.basic_ack(method.delivery_tag) - harvest_objects = model.Session.query(HarvestObject).filter_by( - harvest_job_id=job.id - ) - for harvest_object in harvest_objects: - model.Session.delete(harvest_object) - model.Session.commit() raise - finally: - job.gather_finished = datetime.datetime.utcnow() - job.save() if not isinstance(harvest_object_ids, list): log.error('Gather stage failed') @@ -271,6 +260,31 @@ def gather_callback(channel, method, header, body): channel.basic_ack(method.delivery_tag) +def gather_stage(harvester, job): + '''Calls the harvester's gather_stage, returning harvest object ids, with + some error handling. + + This is split off from gather_callback so that tests can call it without + dealing with queue stuff. + ''' + job.gather_started = datetime.datetime.utcnow() + + try: + harvest_object_ids = harvester.gather_stage(job) + except (Exception, KeyboardInterrupt): + harvest_objects = model.Session.query(HarvestObject).filter_by( + harvest_job_id=job.id + ) + for harvest_object in harvest_objects: + model.Session.delete(harvest_object) + model.Session.commit() + raise + finally: + job.gather_finished = datetime.datetime.utcnow() + job.save() + return harvest_object_ids + + def fetch_callback(channel, method, header, body): try: id = json.loads(body)['harvest_object_id'] @@ -280,7 +294,6 @@ def fetch_callback(channel, method, header, body): channel.basic_ack(method.delivery_tag) return False - obj = HarvestObject.get(id) if not obj: log.error('Harvest object does not exist: %s' % id) diff --git a/ckanext/harvest/tests/factories.py b/ckanext/harvest/tests/factories.py index 8910519..37e0b7f 100644 --- a/ckanext/harvest/tests/factories.py +++ b/ckanext/harvest/tests/factories.py @@ -1,14 +1,92 @@ import factory -from ckanext.harvest.model import HarvestSource, HarvestJob +import ckanext.harvest.model as harvest_model +try: + from ckan.tests.factories import _get_action_user_name +except ImportError: + from ckan.new_tests.factories import _get_action_user_name +from ckan.plugins import toolkit -class HarvestSourceFactory(factory.Factory): - FACTORY_FOR = HarvestSource - url = "http://harvest.test.com" - type = "test-harvest-source" +class HarvestSource(factory.Factory): + FACTORY_FOR = harvest_model.HarvestSource + _return_type = 'dict' -class HarvestJobFactory(factory.Factory): - FACTORY_FOR = HarvestJob + name = factory.Sequence(lambda n: 'test_source_{n}'.format(n=n)) + title = factory.Sequence(lambda n: 'test title {n}'.format(n=n)) + url = factory.Sequence(lambda n: 'http://{n}.test.com'.format(n=n)) + source_type = 'test' # defined in test_queue.py + id = '{0}_id'.format(name).lower() - status = "New" - source = factory.SubFactory(HarvestSourceFactory) + @classmethod + def _create(cls, target_class, *args, **kwargs): + if args: + assert False, "Positional args aren't supported, use keyword args." + context = {'user': _get_action_user_name(kwargs)} + # If there is an existing source for this URL, and we can't create + # another source with that URL, just return the original one. + try: + source_dict = toolkit.get_action('harvest_source_show')( + context, dict(url=kwargs['url'])) + except toolkit.ObjectNotFound: + source_dict = toolkit.get_action('harvest_source_create')( + context, kwargs) + if cls._return_type == 'dict': + return source_dict + else: + return cls.FACTORY_FOR.get(source_dict['id']) + + +class HarvestSourceObj(HarvestSource): + _return_type = 'obj' + + +class HarvestJob(factory.Factory): + FACTORY_FOR = harvest_model.HarvestJob + _return_type = 'dict' + + source = factory.SubFactory(HarvestSourceObj) + + @classmethod + def _create(cls, target_class, *args, **kwargs): + if args: + assert False, "Positional args aren't supported, use keyword args." + context = {'user': _get_action_user_name(kwargs)} + if 'source_id' not in kwargs: + kwargs['source_id'] = kwargs['source'].id + job_dict = toolkit.get_action('harvest_job_create')( + context, kwargs) + if cls._return_type == 'dict': + return job_dict + else: + return cls.FACTORY_FOR.get(job_dict['id']) + + +class HarvestJobObj(HarvestJob): + _return_type = 'obj' + + +class HarvestObject(factory.Factory): + FACTORY_FOR = harvest_model.HarvestObject + _return_type = 'dict' + + #source = factory.SubFactory(HarvestSourceObj) + job = factory.SubFactory(HarvestJobObj) + + @classmethod + def _create(cls, target_class, *args, **kwargs): + if args: + assert False, "Positional args aren't supported, use keyword args." + context = {'user': _get_action_user_name(kwargs)} + if 'job_id' not in kwargs: + kwargs['job_id'] = kwargs['job'].id + kwargs['source_id'] = kwargs['job'].source.id + job_dict = toolkit.get_action('harvest_object_create')( + context, kwargs) + if cls._return_type == 'dict': + return job_dict + else: + return cls.FACTORY_FOR.get(job_dict['id']) + + +class HarvestObjectObj(HarvestObject): + _return_type = 'obj' diff --git a/ckanext/harvest/tests/harvesters/__init__.py b/ckanext/harvest/tests/harvesters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ckanext/harvest/tests/harvesters/mock_ckan.py b/ckanext/harvest/tests/harvesters/mock_ckan.py new file mode 100644 index 0000000..b384eb8 --- /dev/null +++ b/ckanext/harvest/tests/harvesters/mock_ckan.py @@ -0,0 +1,458 @@ +import json +import re +import copy + +import SimpleHTTPServer +import SocketServer +from threading import Thread + +PORT = 8998 + + +class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): + def do_GET(self): + # test name is the first bit of the URL and makes CKAN behave + # differently in some way. + # Its value is recorded and then removed from the path + self.test_name = None + test_name_match = re.match('^/([^/]+)/', self.path) + if test_name_match: + self.test_name = test_name_match.groups()[0] + if self.test_name == 'api': + self.test_name = None + else: + self.path = re.sub('^/([^/]+)/', '/', self.path) + + # The API version is recorded and then removed from the path + api_version = None + version_match = re.match('^/api/(\d)', self.path) + if version_match: + api_version = int(version_match.groups()[0]) + self.path = re.sub('^/api/(\d)/', '/api/', self.path) + + if self.path == '/api/rest/package': + if api_version == 2: + dataset_refs = [d['id'] for d in DATASETS] + else: + dataset_refs = [d['name'] for d in DATASETS] + return self.respond_json(dataset_refs) + if self.path.startswith('/api/rest/package/'): + dataset_ref = self.path.split('/')[-1] + dataset = self.get_dataset(dataset_ref) + if dataset: + return self.respond_json( + convert_dataset_to_restful_form(dataset)) + if self.path.startswith('/api/action/package_show'): + params = self.get_url_params() + dataset_ref = params['id'] + dataset = self.get_dataset(dataset_ref) + if dataset: + return self.respond_json(dataset) + if self.path.startswith('/api/search/dataset'): + params = self.get_url_params() + if params.keys() == ['organization']: + org = self.get_org(params['organization']) + dataset_ids = [d['id'] for d in DATASETS + if d['owner_org'] == org['id']] + return self.respond_json({'count': len(dataset_ids), + 'results': dataset_ids}) + else: + return self.respond( + 'Not implemented search params %s' % params, status=400) + if self.path.startswith('/api/search/revision'): + revision_ids = [r['id'] for r in REVISIONS] + return self.respond_json(revision_ids) + if self.path.startswith('/api/rest/revision/'): + revision_ref = self.path.split('/')[-1] + for rev in REVISIONS: + if rev['id'] == revision_ref: + return self.respond_json(rev) + self.respond('Cannot find revision', status=404) + + # if we wanted to server a file from disk, then we'd call this: + #return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self) + + self.respond('Mock CKAN doesnt recognize that call', status=400) + + def get_dataset(self, dataset_ref): + for dataset in DATASETS: + if dataset['name'] == dataset_ref or \ + dataset['id'] == dataset_ref: + if self.test_name == 'invalid_tag': + dataset['tags'] = INVALID_TAGS + return dataset + + def get_org(self, org_ref): + for org in ORGS: + if org['name'] == org_ref or \ + org['id'] == org_ref: + return org + + def get_url_params(self): + params = self.path.split('?')[-1].split('&') + return dict([param.split('=') for param in params]) + + def respond_action(self, result_dict, status=200): + response_dict = {'result': result_dict} + return self.respond_json(response_dict, status=status) + + def respond_json(self, content_dict, status=200): + return self.respond(json.dumps(content_dict), status=status, + content_type='application/json') + + def respond(self, content, status=200, content_type='application/json'): + self.send_response(status) + self.send_header('Content-Type', content_type) + self.end_headers() + self.wfile.write(content) + self.wfile.close() + + +def serve(port=PORT): + '''Runs a CKAN-alike app (over HTTP) that is used for harvesting tests''' + + # Choose the directory to serve files from + #os.chdir(os.path.join(os.path.dirname(os.path.abspath(__file__)), + # 'mock_ckan_files')) + + class TestServer(SocketServer.TCPServer): + allow_reuse_address = True + + httpd = TestServer(("", PORT), MockCkanHandler) + + print 'Serving test HTTP server at port', PORT + + httpd_thread = Thread(target=httpd.serve_forever) + httpd_thread.setDaemon(True) + httpd_thread.start() + + +def convert_dataset_to_restful_form(dataset): + dataset = copy.deepcopy(dataset) + dataset['extras'] = dict([(e['key'], e['value']) for e in dataset['extras']]) + dataset['tags'] = [t['name'] for t in dataset.get('tags', [])] + return dataset + + +# Datasets are in the package_show form, rather than the RESTful form +DATASETS = [ + {'id': 'dataset1-id', + 'name': 'dataset1', + 'title': 'Test Dataset1', + 'owner_org': 'org1-id', + 'extras': []}, + { + "id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "name": "cabinet-office-energy-use", + "private": False, + "maintainer_email": None, + "revision_timestamp": "2010-11-23T22:34:55.089925", + "organization": + { + + "description": "The Cabinet Office supports the Prime Minister and Deputy Prime Minister, and ensure the effective running of government. We are also the corporate headquarters for government, in partnership with HM Treasury, and we take the lead in certain critical policy areas.\r\nCO is a ministerial department, supported by 18 agencies and public bodies\r\n\r\nYou can find out more at https://www.gov.uk/government/organisations/cabinet-office", + "created": "2012-06-27T14:48:40.244951", + "title": "Cabinet Office", + "name": "cabinet-office", + "revision_timestamp": "2013-04-02T14:27:23.086886", + "is_organization": True, + "state": "active", + "image_url": "", + "revision_id": "4be8825d-d3f4-4fb2-b80b-43e36f574c05", + "type": "organization", + "id": "aa1e068a-23da-4563-b9c2-2cad272b663e", + "approval_status": "pending" + + }, + "update_frequency": "other", + "metadata_created": "2010-08-02T09:19:47.600853", + "last_major_modification": "2010-08-02T09:19:47.600853", + "metadata_modified": "2014-05-09T22:00:01.486366", + "temporal_granularity": "", + "author_email": None, + "geographic_granularity": "point", + "geographic_coverage": [ ], + "state": "active", + "version": None, + "temporal_coverage-to": "", + "license_id": "uk-ogl", + "type": "dataset", + "published_via": "", + "resources": + [ + { + "content_length": "69837", + "cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv", + "hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3", + "description": "70 Whitehall energy data", + "cache_last_updated": "2013-06-19T00:59:42.762642", + "url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv", + "openness_score_failure_count": "0", + "format": "CSV", + "cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv", + "tracking_summary": + { + "total": 0, + "recent": 0 + }, + "last_modified": "2014-05-09T23:00:01.435211", + "mimetype": "text/csv", + "content_type": "text/csv", + "openness_score": "3", + "openness_score_reason": "open and standardized format", + "position": 0, + "revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4", + "id": "f156019d-ea88-46a6-8fa3-3d12582e2161", + "size": 299107 + } + ], + "num_resources": 1, + "tags": + [ + { + + "vocabulary_id": None, + "display_name": "consumption", + "name": "consumption", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "id": "84ce26de-6711-4e85-9609-f7d8a87b0fc8" + + }, + { + + "vocabulary_id": None, + "display_name": "energy", + "name": "energy", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "id": "9f2ae723-602f-4290-80c4-6637ad617a45" + + } + ], + "precision": "", + "tracking_summary": + { + "total": 0, + "recent": 0 + }, + "taxonomy_url": "", + "groups": [], + "creator_user_id": None, + "national_statistic": "no", + "relationships_as_subject": [], + "num_tags": 8, + "update_frequency-other": "Real-time", + "isopen": True, + "url": "http://www.carbonculture.net/orgs/cabinet-office/70-whitehall/", + "notes": "Cabinet Office head office energy use updated from on-site meters showing use, cost and carbon impact.", + "owner_org": "aa1e068a-23da-4563-b9c2-2cad272b663e", + "theme-secondary": + [ + "Environment" + ], + "extras": + [ + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "categories", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "6813d71b-785b-4f56-b296-1b2acb34eed6" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "2010-07-30", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "date_released", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "515f638b-e2cf-40a6-a8a7-cbc8001269e3" + }, + { + + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "date_updated", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "bff63465-4f96-44e7-bb87-6e66fff5e596" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "000000: ", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "geographic_coverage", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "414bcd35-b628-4218-99e2-639615183df8" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "point", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "geographic_granularity", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "c7b460dd-c61f-4cd2-90c2-eceb6c91fe9b" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "no", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "national_statistic", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "9f04b202-3646-49be-b69e-7fa997399ff3" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "{\"status\": \"final\", \"source\": \"Automatically awarded by ODI\", \"certification_type\": \"automatically awarded\", \"level\": \"raw\", \"title\": \"Cabinet Office 70 Whitehall energy use\", \"created_at\": \"2014-10-28T12:25:57Z\", \"jurisdiction\": \"GB\", \"certificate_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922\", \"badge_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922/badge.png\", \"cert_title\": \"Basic Level Certificate\"}", + "revision_timestamp": "2014-11-12T02:52:35.048060", + "state": "active", + "key": "odi-certificate", + "revision_id": "eae9763b-e258-4d76-9ec2-7f5baf655394", + "id": "373a3cbb-d9c0-45a6-9a78-b95c86398766" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "temporal_coverage-from", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "39f72eed-6f76-4733-b636-7541cee3404f" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "temporal_coverage-to", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "818e2c8f-fee0-49da-8bea-ea3c9401ece5" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "temporal_granularity", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "f868b950-d3ce-4fbe-88ca-5cbc4b672320" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "Towns & Cities", + "revision_timestamp": "2015-03-16T18:10:08.802815", + "state": "active", + "key": "theme-primary", + "revision_id": "fc2b6630-84f8-4c88-8ac7-0ca275b2bc97", + "id": "bdcf00fd-3248-4c2f-9cf8-b90706c88e8d" + }, + { + + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "[\"Environment\"]", + "revision_timestamp": "2015-04-08T20:57:04.895214", + "state": "active", + "key": "theme-secondary", + "revision_id": "c2c48530-ff82-4af1-9373-cdc64d5bc83c", + "id": "417482c5-a9c0-4430-8c4e-0c76e59fe44f" + }, + { + "package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", + "value": "Real-time", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "key": "update_frequency", + "revision_id": "08bac459-1d44-44fb-b388-20f4d8394364", + "id": "e8ad4837-514e-4446-81a2-ffacfa7cf683" + } + ], + "license_url": "http://www.nationalarchives.gov.uk/doc/open-government-licence/version/3/", + "individual_resources": + [ + + { + + "content_length": "69837", + "cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv", + "hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3", + "description": "70 Whitehall energy data", + "cache_last_updated": "2013-06-19T00:59:42.762642", + "url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv", + "openness_score_failure_count": "0", + "format": "CSV", + "cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv", + "tracking_summary": + + { + "total": 0, + "recent": 0 + }, + "last_modified": "2014-05-09T23:00:01.435211", + "mimetype": "text/csv", + "content_type": "text/csv", + "openness_score": "3", + "openness_score_reason": "open and standardized format", + "position": 0, + "revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4", + "id": "f156019d-ea88-46a6-8fa3-3d12582e2161", + "size": 299107 + } + + ], + "title": "Cabinet Office 70 Whitehall energy use", + "revision_id": "3bd6ced3-35b2-4b20-94e2-c596e24bc375", + "date_released": "30/7/2010", + "theme-primary": "Towns & Cities" +} +] + +INVALID_TAGS = [ + { + "vocabulary_id": None, + "display_name": "consumption%^&", + "name": "consumption%^&", + "revision_timestamp": "2010-08-02T09:19:47.600853", + "state": "active", + "id": "84ce26de-6711-4e85-9609-f7d8a87b0fc8" + }, + ] + +ORGS = [ + {'id': 'org1-id', + 'name': 'org1'}, + {'id': 'aa1e068a-23da-4563-b9c2-2cad272b663e', + 'name': 'cabinet-office'} +] + +REVISIONS = [ + { + "id": "23daf2eb-d7ec-4d86-a844-3924acd311ea", + "timestamp": "2015-10-21T09:50:08.160045", + "message": "REST API: Update object dataset1", + "author": "ross", + "approved_timestamp": None, + "packages": + [ + "dataset1" + ], + "groups": [ ] + }, + { + "id": "8254a293-10db-4af2-9dfa-6a1f06ee899c", + "timestamp": "2015-10-21T09:46:21.198021", + "message": "REST API: Update object dataset1", + "author": "ross", + "approved_timestamp": None, + "packages": + [ + "dataset1" + ], + "groups": [ ] + }] diff --git a/ckanext/harvest/tests/harvesters/test_ckanharvester.py b/ckanext/harvest/tests/harvesters/test_ckanharvester.py new file mode 100644 index 0000000..eb88505 --- /dev/null +++ b/ckanext/harvest/tests/harvesters/test_ckanharvester.py @@ -0,0 +1,118 @@ +from nose.tools import assert_equal +import json + +try: + from ckan.tests.helpers import reset_db + from ckan.tests.factories import Organization +except ImportError: + from ckan.new_tests.helpers import reset_db + from ckan.new_tests.factories import Organization +from ckan import model + +from ckanext.harvest.tests.factories import (HarvestSourceObj, HarvestJobObj, + HarvestObjectObj) +from ckanext.harvest.tests.lib import run_harvest, run_harvest_job +import ckanext.harvest.model as harvest_model +from ckanext.harvest.harvesters.ckanharvester import CKANHarvester + +import mock_ckan + +# Start CKAN-alike server we can test harvesting against it +mock_ckan.serve() + + +class TestCkanHarvester(object): + @classmethod + def setup(cls): + reset_db() + harvest_model.setup() + + def test_gather_normal(self): + source = HarvestSourceObj(url='http://localhost:%s/' % mock_ckan.PORT) + job = HarvestJobObj(source=source) + + harvester = CKANHarvester() + obj_ids = harvester.gather_stage(job) + + assert_equal(type(obj_ids), list) + assert_equal(len(obj_ids), len(mock_ckan.DATASETS)) + harvest_object = harvest_model.HarvestObject.get(obj_ids[0]) + assert_equal(harvest_object.guid, mock_ckan.DATASETS[0]['id']) + + def test_fetch_normal(self): + source = HarvestSourceObj(url='http://localhost:%s/' % mock_ckan.PORT) + job = HarvestJobObj(source=source) + harvest_object = HarvestObjectObj(guid=mock_ckan.DATASETS[0]['id'], + job=job) + + harvester = CKANHarvester() + result = harvester.fetch_stage(harvest_object) + + assert_equal(result, True) + assert_equal( + harvest_object.content, + json.dumps( + mock_ckan.convert_dataset_to_restful_form( + mock_ckan.DATASETS[0]))) + + def test_import_normal(self): + org = Organization() + harvest_object = HarvestObjectObj( + guid=mock_ckan.DATASETS[0]['id'], + content=json.dumps(mock_ckan.convert_dataset_to_restful_form( + mock_ckan.DATASETS[0])), + job__source__owner_org=org['id']) + + harvester = CKANHarvester() + result = harvester.import_stage(harvest_object) + + assert_equal(result, True) + assert harvest_object.package_id + dataset = model.Package.get(harvest_object.package_id) + assert_equal(dataset.name, mock_ckan.DATASETS[0]['name']) + + def test_harvest(self): + results_by_guid = run_harvest( + url='http://localhost:%s/' % mock_ckan.PORT, + harvester=CKANHarvester()) + + result = results_by_guid['dataset1-id'] + assert_equal(result['state'], 'COMPLETE') + assert_equal(result['report_status'], 'added') + assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name']) + assert_equal(result['errors'], []) + + result = results_by_guid[mock_ckan.DATASETS[1]['id']] + assert_equal(result['state'], 'COMPLETE') + assert_equal(result['report_status'], 'added') + assert_equal(result['dataset']['name'], mock_ckan.DATASETS[1]['name']) + assert_equal(result['errors'], []) + + def test_harvest_twice(self): + run_harvest( + url='http://localhost:%s/' % mock_ckan.PORT, + harvester=CKANHarvester()) + results_by_guid = run_harvest( + url='http://localhost:%s/' % mock_ckan.PORT, + harvester=CKANHarvester()) + + # updated the dataset which has revisions + result = results_by_guid['dataset1'] + assert_equal(result['state'], 'COMPLETE') + assert_equal(result['report_status'], 'updated') + assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name']) + assert_equal(result['errors'], []) + + # the other dataset is unchanged and not harvested + assert mock_ckan.DATASETS[1]['name'] not in result + + def test_harvest_invalid_tag(self): + from nose.plugins.skip import SkipTest; raise SkipTest() + results_by_guid = run_harvest( + url='http://localhost:%s/invalid_tag' % mock_ckan.PORT, + harvester=CKANHarvester()) + + result = results_by_guid['dataset1-id'] + assert_equal(result['state'], 'COMPLETE') + assert_equal(result['report_status'], 'added') + assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name']) diff --git a/ckanext/harvest/tests/lib.py b/ckanext/harvest/tests/lib.py new file mode 100644 index 0000000..ff04f26 --- /dev/null +++ b/ckanext/harvest/tests/lib.py @@ -0,0 +1,56 @@ +from ckanext.harvest.tests.factories import HarvestSourceObj, HarvestJobObj +import ckanext.harvest.model as harvest_model +from ckanext.harvest.logic import NoNewHarvestJobError +from ckanext.harvest import queue +from ckan.plugins import toolkit + + +def run_harvest(url, harvester, config=''): + '''Runs a harvest and returns the results. + This allows you to test a harvester. + Queues are avoided as they are a pain in tests. + ''' + # User creates a harvest source + source = HarvestSourceObj(url=url, config=config) + + # User triggers a harvest, which is the creation of a harvest job + job = HarvestJobObj(source=source) + + return run_harvest_job(job, harvester) + + +def run_harvest_job(job, harvester): + # When 'paster harvest run' is called by the regular cron it does 2 things: + # 1. change the job status to Running + job.status = 'Running' + job.save() + # 2. put the job on the gather queue which is consumed by + # queue.gather_callback, which determines the harvester and then calls + # gather_stage. We simply call the gather_stage. + obj_ids = queue.gather_stage(harvester, job) + + # The object ids are put onto the fetch queue, consumed by + # queue.fetch_callback which calls queue.fetch_and_import_stages + results_by_guid = {} + for obj_id in obj_ids: + harvest_object = harvest_model.HarvestObject.get(obj_id) + guid = harvest_object.guid + results_by_guid[guid] = {'obj_id': obj_id} + + queue.fetch_and_import_stages(harvester, harvest_object) + results_by_guid[guid]['state'] = harvest_object.state + results_by_guid[guid]['report_status'] = harvest_object.report_status + if harvest_object.state == 'COMPLETE' and harvest_object.package_id: + results_by_guid[guid]['dataset'] = \ + toolkit.get_action('package_show')( + {}, dict(id=harvest_object.package_id)) + results_by_guid[guid]['errors'] = harvest_object.errors + + # Do 'harvest_jobs_run' to change the job status to 'finished' + try: + toolkit.get_action('harvest_jobs_run')({'ignore_auth': True}, {}) + except NoNewHarvestJobError: + # This is expected + pass + + return results_by_guid diff --git a/ckanext/harvest/tests/test_action.py b/ckanext/harvest/tests/test_action.py index 8ad8f51..7aed649 100644 --- a/ckanext/harvest/tests/test_action.py +++ b/ckanext/harvest/tests/test_action.py @@ -1,19 +1,23 @@ import json import copy -import ckan -import paste -import pylons.test import factories import unittest -from ckan.lib.create_test_data import CreateTestData -import ckan.new_tests.helpers as helpers +try: + from ckan.tests import factories as ckan_factories + from ckan.tests.helpers import _get_test_app, reset_db +except ImportError: + from ckan.new_tests import factories as ckan_factories + from ckan.new_tests.helpers import _get_test_app, reset_db from ckan import plugins as p from ckan.plugins import toolkit +from ckan import model + from ckanext.harvest.interfaces import IHarvester import ckanext.harvest.model as harvest_model -def call_action_api(app, action, apikey=None, status=200, **kwargs): + +def call_action_api(action, apikey=None, status=200, **kwargs): '''POST an HTTP request to the CKAN API and return the result. Any additional keyword arguments that you pass to this function as **kwargs @@ -21,7 +25,7 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs): Usage: - package_dict = post(app, 'package_create', apikey=apikey, + package_dict = call_action_api('package_create', apikey=apikey, name='my_package') assert package_dict['name'] == 'my_package' @@ -31,13 +35,10 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs): of the error dict, you have to use the status param otherwise an exception will be raised: - error_dict = post(app, 'group_activity_list', status=403, + error_dict = call_action_api('group_activity_list', status=403, id='invalid_id') assert error_dict['message'] == 'Access Denied' - :param app: the test app to post to - :type app: paste.fixture.TestApp - :param action: the action to post to, e.g. 'package_create' :type action: string @@ -62,8 +63,10 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs): ''' params = json.dumps(kwargs) + app = _get_test_app() response = app.post('/api/action/{0}'.format(action), params=params, - extra_environ={'Authorization': str(apikey)}, status=status) + extra_environ={'Authorization': str(apikey)}, + status=status) if status in (200,): assert response.json['success'] is True @@ -75,10 +78,13 @@ def call_action_api(app, action, apikey=None, status=200, **kwargs): class MockHarvesterForActionTests(p.SingletonPlugin): p.implements(IHarvester) - def info(self): - return {'name': 'test-for-action', 'title': 'Test for action', 'description': 'test'} - def validate_config(self,config): + def info(self): + return {'name': 'test-for-action', + 'title': 'Test for action', + 'description': 'test'} + + def validate_config(self, config): if not config: return config @@ -86,10 +92,10 @@ class MockHarvesterForActionTests(p.SingletonPlugin): config_obj = json.loads(config) if 'custom_option' in config_obj: - if not isinstance(config_obj['custom_option'],list): + if not isinstance(config_obj['custom_option'], list): raise ValueError('custom_option must be a list') - except ValueError,e: + except ValueError, e: raise e return config @@ -103,56 +109,63 @@ class MockHarvesterForActionTests(p.SingletonPlugin): def import_stage(self, harvest_object): return True -class HarvestSourceActionBase(object): + +class FunctionalTestBaseWithoutClearBetweenTests(object): + ''' Functional tests should normally derive from + ckan.lib.helpers.FunctionalTestBase, but these are legacy tests so this + class is a compromise. This version doesn't call reset_db before every + test, because these tests are designed with fixtures created in + setup_class.''' @classmethod def setup_class(cls): + reset_db() harvest_model.setup() - CreateTestData.create() - sysadmin_user = ckan.model.User.get('testsysadmin') - cls.sysadmin = { - 'id': sysadmin_user.id, - 'apikey': sysadmin_user.apikey, - 'name': sysadmin_user.name, - } + @classmethod + def teardown_class(cls): + pass - cls.app = paste.fixture.TestApp(pylons.test.pylonsapp) +class HarvestSourceActionBase(FunctionalTestBaseWithoutClearBetweenTests): - cls.default_source_dict = { - "url": "http://test.action.com", - "name": "test-source-action", - "title": "Test source action", - "notes": "Test source action desc", - "source_type": "test-for-action", - "frequency": "MANUAL", - "config": json.dumps({"custom_option":["a","b"]}) + @classmethod + def setup_class(cls): + super(HarvestSourceActionBase, cls).setup_class() + harvest_model.setup() + + cls.sysadmin = ckan_factories.Sysadmin() + + cls.default_source_dict = { + "url": "http://test.action.com", + "name": "test-source-action", + "title": "Test source action", + "notes": "Test source action desc", + "source_type": "test-for-action", + "frequency": "MANUAL", + "config": json.dumps({"custom_option": ["a", "b"]}) } if not p.plugin_loaded('test_action_harvester'): p.load('test_action_harvester') - @classmethod def teardown_class(cls): - ckan.model.repo.rebuild_db() + super(HarvestSourceActionBase, cls).teardown_class() p.unload('test_action_harvester') - def teardown(self): - pass - def test_invalid_missing_values(self): source_dict = {} if 'id' in self.default_source_dict: source_dict['id'] = self.default_source_dict['id'] - result = call_action_api(self.app, self.action, - apikey=self.sysadmin['apikey'], status=409, **source_dict) + result = call_action_api(self.action, + apikey=self.sysadmin['apikey'], status=409, + **source_dict) - for key in ('name','title','url','source_type'): + for key in ('name', 'title', 'url', 'source_type'): assert result[key] == [u'Missing value'] def test_invalid_unknown_type(self): @@ -160,8 +173,9 @@ class HarvestSourceActionBase(object): source_dict = copy.deepcopy(self.default_source_dict) source_dict['source_type'] = 'unknown' - result = call_action_api(self.app, self.action, - apikey=self.sysadmin['apikey'], status=409, **source_dict) + result = call_action_api(self.action, + apikey=self.sysadmin['apikey'], status=409, + **source_dict) assert 'source_type' in result assert u'Unknown harvester type' in result['source_type'][0] @@ -171,8 +185,9 @@ class HarvestSourceActionBase(object): source_dict = copy.deepcopy(self.default_source_dict) source_dict['frequency'] = wrong_frequency - result = call_action_api(self.app, self.action, - apikey=self.sysadmin['apikey'], status=409, **source_dict) + result = call_action_api(self.action, + apikey=self.sysadmin['apikey'], status=409, + **source_dict) assert 'frequency' in result assert u'Frequency {0} not recognised'.format(wrong_frequency) in result['frequency'][0] @@ -182,16 +197,18 @@ class HarvestSourceActionBase(object): source_dict = copy.deepcopy(self.default_source_dict) source_dict['config'] = 'not_json' - result = call_action_api(self.app, self.action, - apikey=self.sysadmin['apikey'], status=409, **source_dict) + result = call_action_api(self.action, + apikey=self.sysadmin['apikey'], status=409, + **source_dict) assert 'config' in result assert u'Error parsing the configuration options: No JSON object could be decoded' in result['config'][0] source_dict['config'] = json.dumps({'custom_option': 'not_a_list'}) - result = call_action_api(self.app, self.action, - apikey=self.sysadmin['apikey'], status=409, **source_dict) + result = call_action_api(self.action, + apikey=self.sysadmin['apikey'], status=409, + **source_dict) assert 'config' in result assert u'Error parsing the configuration options: custom_option must be a list' in result['config'][0] @@ -202,14 +219,12 @@ class TestHarvestSourceActionCreate(HarvestSourceActionBase): def __init__(self): self.action = 'harvest_source_create' - - def test_create(self): source_dict = self.default_source_dict - result = call_action_api(self.app, 'harvest_source_create', - apikey=self.sysadmin['apikey'], **source_dict) + result = call_action_api('harvest_source_create', + apikey=self.sysadmin['apikey'], **source_dict) for key in source_dict.keys(): assert source_dict[key] == result[key] @@ -219,18 +234,18 @@ class TestHarvestSourceActionCreate(HarvestSourceActionBase): assert source.url == source_dict['url'] assert source.type == source_dict['source_type'] - # Trying to create a source with the same URL fails - source_dict = copy.deepcopy(self.default_source_dict) source_dict['name'] = 'test-source-action-new' - result = call_action_api(self.app, 'harvest_source_create', - apikey=self.sysadmin['apikey'], status=409, **source_dict) + result = call_action_api('harvest_source_create', + apikey=self.sysadmin['apikey'], status=409, + **source_dict) assert 'url' in result assert u'There already is a Harvest Source for this URL' in result['url'][0] + class TestHarvestSourceActionUpdate(HarvestSourceActionBase): @classmethod @@ -242,8 +257,8 @@ class TestHarvestSourceActionUpdate(HarvestSourceActionBase): # Create a source to udpate source_dict = cls.default_source_dict - result = call_action_api(cls.app, 'harvest_source_create', - apikey=cls.sysadmin['apikey'], **source_dict) + result = call_action_api('harvest_source_create', + apikey=cls.sysadmin['apikey'], **source_dict) cls.default_source_dict['id'] = result['id'] @@ -251,17 +266,17 @@ class TestHarvestSourceActionUpdate(HarvestSourceActionBase): source_dict = self.default_source_dict source_dict.update({ - "url": "http://test.action.updated.com", - "name": "test-source-action-updated", - "title": "Test source action updated", - "notes": "Test source action desc updated", - "source_type": "test", - "frequency": "MONTHLY", - "config": json.dumps({"custom_option":["c","d"]}) - }) + "url": "http://test.action.updated.com", + "name": "test-source-action-updated", + "title": "Test source action updated", + "notes": "Test source action desc updated", + "source_type": "test", + "frequency": "MONTHLY", + "config": json.dumps({"custom_option": ["c", "d"]}) + }) - result = call_action_api(self.app, 'harvest_source_update', - apikey=self.sysadmin['apikey'], **source_dict) + result = call_action_api('harvest_source_update', + apikey=self.sysadmin['apikey'], **source_dict) for key in source_dict.keys(): assert source_dict[key] == result[key] @@ -271,29 +286,26 @@ class TestHarvestSourceActionUpdate(HarvestSourceActionBase): assert source.url == source_dict['url'] assert source.type == source_dict['source_type'] + class TestHarvestObject(unittest.TestCase): @classmethod def setup_class(cls): + reset_db() harvest_model.setup() - @classmethod - def teardown_class(cls): - ckan.model.repo.rebuild_db() - def test_create(self): - job = factories.HarvestJobFactory() - job.save() + job = factories.HarvestJobObj() context = { - 'model' : ckan.model, - 'session': ckan.model.Session, + 'model': model, + 'session': model.Session, 'ignore_auth': True, } data_dict = { - 'guid' : 'guid', - 'content' : 'content', - 'job_id' : job.id, - 'extras' : { 'a key' : 'a value' }, + 'guid': 'guid', + 'content': 'content', + 'job_id': job.id, + 'extras': {'a key': 'a value'}, } harvest_object = toolkit.get_action('harvest_object_create')( context, data_dict) @@ -303,26 +315,24 @@ class TestHarvestObject(unittest.TestCase): assert created_object.guid == harvest_object['guid'] == data_dict['guid'] def test_create_bad_parameters(self): - source_a = factories.HarvestSourceFactory() - source_a.save() - job = factories.HarvestJobFactory() - job.save() + source_a = factories.HarvestSourceObj() + job = factories.HarvestJobObj() context = { - 'model' : ckan.model, - 'session': ckan.model.Session, + 'model': model, + 'session': model.Session, 'ignore_auth': True, } data_dict = { - 'job_id' : job.id, - 'source_id' : source_a.id, - 'extras' : 1 + 'job_id': job.id, + 'source_id': source_a.id, + 'extras': 1 } harvest_object_create = toolkit.get_action('harvest_object_create') - self.assertRaises(ckan.logic.ValidationError, harvest_object_create, - context, data_dict) + self.assertRaises(toolkit.ValidationError, harvest_object_create, + context, data_dict) - data_dict['extras'] = {'test': 1 } + data_dict['extras'] = {'test': 1} - self.assertRaises(ckan.logic.ValidationError, harvest_object_create, - context, data_dict) + self.assertRaises(toolkit.ValidationError, harvest_object_create, + context, data_dict) diff --git a/ckanext/harvest/tests/test_queue.py b/ckanext/harvest/tests/test_queue.py index 2b06926..d5b3888 100644 --- a/ckanext/harvest/tests/test_queue.py +++ b/ckanext/harvest/tests/test_queue.py @@ -1,3 +1,7 @@ +try: + from ckan.tests.helpers import reset_db +except ImportError: + from ckan.new_tests.helpers import reset_db import ckanext.harvest.model as harvest_model from ckanext.harvest.model import HarvestObject, HarvestObjectExtra from ckanext.harvest.interfaces import IHarvester @@ -82,13 +86,9 @@ class TestHarvester(SingletonPlugin): class TestHarvestQueue(object): @classmethod def setup_class(cls): + reset_db() harvest_model.setup() - @classmethod - def teardown_class(cls): - model.repo.rebuild_db() - - def test_01_basic_harvester(self): ### make sure queues/exchanges are created first and are empty From dc7af5d1502d4f8f46667b293050050e743e2edb Mon Sep 17 00:00:00 2001 From: David Read Date: Wed, 21 Oct 2015 16:38:03 +0000 Subject: [PATCH 12/15] Remove prints. --- ckanext/harvest/model/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py index 39317fc..92b1af2 100644 --- a/ckanext/harvest/model/__init__.py +++ b/ckanext/harvest/model/__init__.py @@ -39,6 +39,7 @@ harvest_gather_error_table = None harvest_object_error_table = None harvest_object_extra_table = None + def setup(): if harvest_source_table is None: @@ -46,13 +47,11 @@ def setup(): log.debug('Harvest tables defined in memory') if not model.package_table.exists(): - print 'harvest model setup: DEFER' log.debug('Harvest table creation deferred') return if not harvest_source_table.exists(): - print 'harvest model setup: CREATE' # Create each table individually rather than # using metadata.create_all() harvest_source_table.create() @@ -65,7 +64,6 @@ def setup(): log.debug('Harvest tables created') else: from ckan.model.meta import engine - print 'harvest model setup: NOTHING TO DO' log.debug('Harvest tables already exist') # Check if existing tables need to be updated inspector = Inspector.from_engine(engine) From 6fb5728d02d6dc46098dd35f767f9a674cd80d15 Mon Sep 17 00:00:00 2001 From: Mark Winterbottom Date: Wed, 21 Oct 2015 17:48:07 +0100 Subject: [PATCH 13/15] Fixed Typos. --- README.rst | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/README.rst b/README.rst index f3dadf8..a433009 100644 --- a/README.rst +++ b/README.rst @@ -149,12 +149,12 @@ Authorization ============= Starting from CKAN 2.0, harvest sources behave exactly the same as datasets -(they are actually internally implemented as a dataset type). That means that +(they are actually internally implemented as a dataset type). That means they can be searched and faceted, and that the same authorization rules can be applied to them. The default authorization settings are based on organizations (equivalent to the `publisher profile` found in old versions). -Have a look at the `Authorization `_ +Have a look at the `Authorization `_ documentation on CKAN core to see how to configure your instance depending on your needs. @@ -429,7 +429,7 @@ The ``run`` command not only starts any pending harvesting jobs, but also flags those that are finished, allowing new jobs to be created on that particular source and refreshing the source statistics. That means that you will need to run this command before being able to create a new job on a source that was being -harvested (On a production site you will tipically have a cron job that runs the +harvested (On a production site you will typically have a cron job that runs the command regularly, see next section). @@ -598,4 +598,3 @@ http://www.fsf.org/licensing/licenses/agpl-3.0.html .. _Supervisor: http://supervisord.org - From 3c6cc55be078431b43681c30e24b097ce8485a90 Mon Sep 17 00:00:00 2001 From: amercader Date: Fri, 23 Oct 2015 11:52:22 +0100 Subject: [PATCH 14/15] Only flush keys on the current Redis database --- ckanext/harvest/commands/harvester.py | 4 ++-- ckanext/harvest/queue.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 03a6bb8..c0592e4 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -45,8 +45,8 @@ class Harvester(CkanCommand): harvester purge_queues - removes all jobs from fetch and gather queue - WARNING: if using Redis, this command purges any other data you have - in Redis too! + WARNING: if using Redis, this command purges all data in the current + Redis database harvester [-j] [-o] [--segments={segments}] import [{source-id}] - perform the import stage with the last fetched objects, for a certain diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index e1a1180..76b0a35 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -87,8 +87,8 @@ def purge_queues(): log.info('AMQP queue purged: %s', get_fetch_queue_name()) return if backend == 'redis': - connection.flushall() - log.info('Redis flushed') + connection.flushdb() + log.info('Redis database flushed') def resubmit_jobs(): if config.get('ckan.harvest.mq.type') != 'redis': @@ -180,7 +180,7 @@ class RedisConsumer(object): def basic_ack(self, message): self.redis.delete(self.persistance_key(message)) def queue_purge(self, queue): - self.redis.flushall() + self.redis.flushdb() def basic_get(self, queue): body = self.redis.lpop(self.routing_key) return (FakeMethod(body), self, body) From 992cdc57d20ae18181e5698407f27a9e729add86 Mon Sep 17 00:00:00 2001 From: amercader Date: Fri, 23 Oct 2015 11:56:12 +0100 Subject: [PATCH 15/15] Test 2.4 and latest branches for all releases --- .travis.yml | 1 + bin/travis-build.bash | 10 +++++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index cabad8b..110f770 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,6 +5,7 @@ env: - CKANVERSION=master - CKANVERSION=2.2 - CKANVERSION=2.3 + - CKANVERSION=2.4 services: - redis-server install: diff --git a/bin/travis-build.bash b/bin/travis-build.bash index 8ef11de..aae36f3 100644 --- a/bin/travis-build.bash +++ b/bin/travis-build.bash @@ -10,12 +10,16 @@ sudo apt-get install postgresql-9.1 solr-jetty libcommons-fileupload-java:amd64= echo "Installing CKAN and its Python dependencies..." git clone https://github.com/ckan/ckan cd ckan -if [ $CKANVERSION == '2.3' ] +if [ $CKANVERSION == '2.4' ] then - git checkout release-v2.3 + git checkout release-v2.4-latest +elif [ $CKANVERSION == '2.3' ] +then + git checkout release-v2.3-latest + elif [ $CKANVERSION == '2.2' ] then - git checkout release-v2.2.3 + git checkout release-v2.2-latest fi python setup.py develop pip install -r requirements.txt --allow-all-external