From 2da918c2e4666fdc18991762e3eb1fd7c40766ac Mon Sep 17 00:00:00 2001 From: David Read Date: Wed, 22 Jul 2015 10:13:02 +0100 Subject: [PATCH 1/9] Fix migration for old harvests so that ones that errored are correctly marked. Added helpful comments in model. --- ckanext/harvest/model/__init__.py | 18 +++++++++++++++--- ckanext/harvest/queue.py | 7 ++++++- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py index 1d3a05a..6f8b992 100644 --- a/ckanext/harvest/model/__init__.py +++ b/ckanext/harvest/model/__init__.py @@ -196,12 +196,21 @@ def define_harvester_tables(): Column('gather_finished', types.DateTime), Column('finished', types.DateTime), Column('source_id', types.UnicodeText, ForeignKey('harvest_source.id')), + # status: New, Running, Finished Column('status', types.UnicodeText, default=u'New', nullable=False), ) - # Was harvested_document + # A harvest_object contains a representation of one dataset during a + # particular harvest harvest_object_table = Table('harvest_object', metadata, Column('id', types.UnicodeText, primary_key=True, default=make_uuid), + # The guid is the 'identity' of the dataset, according to the source. + # So if you reharvest it, then the harvester knows which dataset to + # update because of this identity. The identity needs to be unique + # within this CKAN. Column('guid', types.UnicodeText, default=u''), + # When you harvest a dataset multiple times, only the latest + # successfully imported harvest_object should be flagged 'current'. + # The import_stage reads and writes it. Column('current',types.Boolean,default=False), Column('gathered', types.DateTime, default=datetime.datetime.utcnow), Column('fetch_started', types.DateTime), @@ -209,6 +218,7 @@ def define_harvester_tables(): Column('fetch_finished', types.DateTime), Column('import_started', types.DateTime), Column('import_finished', types.DateTime), + # state: WAITING, FETCH, IMPORT, COMPLETE, ERROR Column('state', types.UnicodeText, default=u'WAITING'), Column('metadata_modified_date', types.DateTime), Column('retry_times',types.Integer, default=0), @@ -391,9 +401,11 @@ ALTER TABLE harvest_object_extra ALTER TABLE harvest_object_extra ADD CONSTRAINT harvest_object_extra_harvest_object_id_fkey FOREIGN KEY (harvest_object_id) REFERENCES harvest_object(id); -UPDATE harvest_object set state = 'COMPLETE'; +UPDATE harvest_object set state = 'COMPLETE' where package_id is not null; +UPDATE harvest_object set state = 'ERROR' where package_id is null; UPDATE harvest_object set retry_times = 0; -UPDATE harvest_object set report_status = 'new'; +UPDATE harvest_object set report_status = 'updated' where package_id is not null; +UPDATE harvest_object set report_status = 'errored' where package_id is null; UPDATE harvest_source set frequency = 'MANUAL'; ALTER TABLE harvest_object DROP CONSTRAINT harvest_object_package_id_fkey; diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index b8c4131..52d942f 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -250,7 +250,12 @@ def gather_callback(channel, method, header, body): log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids))) if not harvester_found: - msg = 'No harvester could be found for source type %s' % job.source.type + # This can occur if you: + # * remove a harvester and it still has sources that are then + # refreshed + # * add a new harvester and restart CKAN but not the gather + # queue. + msg = 'System error - No harvester could be found for source type %s' % job.source.type err = HarvestGatherError(message=msg,job=job) err.save() log.error(msg) From be3e88086a04fe1075944810b781876d8e997eea Mon Sep 17 00:00:00 2001 From: David Read Date: Thu, 1 Oct 2015 17:53:03 +0100 Subject: [PATCH 2/9] Generating unique names improved * Harvesters that change the name when the title changes have had a problem when the change is small and a number was unnecessarily appended. e.g. "Trees "->"Trees" meant _gen_new_name("Trees") returned "trees1". Now you can specify the existing value and it will return that if it still holds. * Maximum dataset name length is now adhered to. * To make a name unique, a sequential number is now added, since for users that is more understandable and pleasant. However hex digits are still an option, for those that want to harvest concurrently. --- ckanext/harvest/harvesters/base.py | 101 +++++++++++++++++++++++++---- 1 file changed, 90 insertions(+), 11 deletions(-) diff --git a/ckanext/harvest/harvesters/base.py b/ckanext/harvest/harvesters/base.py index 41a5061..1fd0ebc 100644 --- a/ckanext/harvest/harvesters/base.py +++ b/ckanext/harvest/harvesters/base.py @@ -8,7 +8,7 @@ from pylons import config from ckan import plugins as p from ckan import model -from ckan.model import Session, Package +from ckan.model import Session, Package, PACKAGE_NAME_MAX_LENGTH from ckan.logic import ValidationError, NotFound, get_action from ckan.logic.schema import default_create_package_schema @@ -41,21 +41,100 @@ class HarvesterBase(SingletonPlugin): _user_name = None - def _gen_new_name(self, title): + @classmethod + def _gen_new_name(cls, title, existing_name=None, + append_type='number-sequence'): ''' - Creates a URL friendly name from a title + Returns a 'name' for the dataset (URL friendly), based on the title. - If the name already exists, it will add some random characters at the end + If the ideal name is already used, it will append a number to it to + ensure it is unique. + + If generating a new name because the title of the dataset has changed, + specify the existing name, in case the name doesn't need to change + after all. + + :param existing_name: the current name of the dataset - only specify + this if the dataset exists + :type existing_name: string + :param append_type: the type of characters to add to make it unique - + either 'number-sequence' or 'random-hex'. + :type append_type: string ''' - name = munge_title_to_name(title).replace('_', '-') - while '--' in name: - name = name.replace('--', '-') - pkg_obj = Session.query(Package).filter(Package.name == name).first() - if pkg_obj: - return name + str(uuid.uuid4())[:5] + ideal_name = munge_title_to_name(title) + ideal_name = re.sub('-+', '-', ideal_name) # collapse multiple dashes + return cls._ensure_name_is_unique(ideal_name, + existing_name=existing_name, + append_type=append_type) + + @staticmethod + def _ensure_name_is_unique(ideal_name, existing_name=None, + append_type='number-sequence'): + ''' + Returns a dataset name based on the ideal_name, only it will be + guaranteed to be different than all the other datasets, by adding a + number on the end if necessary. + + If generating a new name because the title of the dataset has changed, + specify the existing name, in case the name doesn't need to change + after all. + + The maximum dataset name length is taken account of. + + :param ideal_name: the desired name for the dataset, if its not already + been taken (usually derived by munging the dataset + title) + :type ideal_name: string + :param existing_name: the current name of the dataset - only specify + this if the dataset exists + :type existing_name: string + :param append_type: the type of characters to add to make it unique - + either 'number-sequence' or 'random-hex'. + :type append_type: string + ''' + ideal_name = ideal_name[:PACKAGE_NAME_MAX_LENGTH] + if existing_name == ideal_name: + return ideal_name + if append_type == 'number-sequence': + MAX_NUMBER_APPENDED = 999 + APPEND_MAX_CHARS = len(str(MAX_NUMBER_APPENDED)) + elif append_type == 'random-hex': + APPEND_MAX_CHARS = 5 # 16^5 = 1 million combinations else: - return name + raise NotImplementedError('append_type cannot be %s' % append_type) + # Find out which package names have been taken. Restrict it to names + # derived from the ideal name plus and numbers added + like_q = u'%s%%' % \ + ideal_name[:PACKAGE_NAME_MAX_LENGTH-APPEND_MAX_CHARS] + name_results = Session.query(Package.name)\ + .filter(Package.name.ilike(like_q))\ + .all() + taken = set([name_result[0] for name_result in name_results]) + if existing_name and existing_name in taken: + taken.remove(existing_name) + if ideal_name not in taken: + # great, the ideal name is available + return ideal_name + elif existing_name and existing_name.startswith(ideal_name): + # the ideal name is not available, but its an existing dataset with + # a name based on the ideal one, so there's no point changing it to + # a different number + return existing_name + elif append_type == 'number-sequence': + # find the next available number + counter = 1 + while counter <= MAX_NUMBER_APPENDED: + candidate_name = \ + ideal_name[:PACKAGE_NAME_MAX_LENGTH-len(str(counter))] + \ + str(counter) + if candidate_name not in taken: + return candidate_name + counter = counter + 1 + return None + elif append_type == 'random-hex': + return ideal_name[:PACKAGE_NAME_MAX_LENGTH-APPEND_MAX_CHARS] + \ + str(uuid.uuid4())[:APPEND_MAX_CHARS] def _save_gather_error(self, message, job): From 82bdff2f342c43f14fab241b7c39a0468de9ea18 Mon Sep 17 00:00:00 2001 From: David Read Date: Thu, 1 Oct 2015 17:59:17 +0100 Subject: [PATCH 3/9] Add tests --- .gitignore | 2 +- ckanext/harvest/tests/harvesters/__init__.py | 0 ckanext/harvest/tests/harvesters/test_base.py | 96 +++++++++++++++++++ 3 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 ckanext/harvest/tests/harvesters/__init__.py create mode 100644 ckanext/harvest/tests/harvesters/test_base.py diff --git a/.gitignore b/.gitignore index b678f55..62ff3fd 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,6 @@ build .DS_Store dist development.ini -*.swp +*.sw? *~ node_modules diff --git a/ckanext/harvest/tests/harvesters/__init__.py b/ckanext/harvest/tests/harvesters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ckanext/harvest/tests/harvesters/test_base.py b/ckanext/harvest/tests/harvesters/test_base.py new file mode 100644 index 0000000..6e1da21 --- /dev/null +++ b/ckanext/harvest/tests/harvesters/test_base.py @@ -0,0 +1,96 @@ +import re + +from nose.tools import assert_equal + +from ckanext.harvest.harvesters.base import HarvesterBase +try: + from ckan.tests import helpers + from ckan.tests import factories +except ImportError: + from ckan.new_tests import helpers + from ckan.new_tests import factories + + +_ensure_name_is_unique = HarvesterBase._ensure_name_is_unique + + +class TestGenNewName(object): + @classmethod + def setup_class(cls): + helpers.reset_db() + + def test_basic(self): + assert_equal(HarvesterBase._gen_new_name('Trees'), 'trees') + + def test_munge(self): + assert_equal( + HarvesterBase._gen_new_name('Trees and branches - survey.'), + 'trees-and-branches-survey') + + +class TestEnsureNameIsUnique(object): + def setup(self): + helpers.reset_db() + + def test_no_existing_datasets(self): + factories.Dataset(name='unrelated') + assert_equal(_ensure_name_is_unique('trees'), 'trees') + + def test_existing_dataset(self): + factories.Dataset(name='trees') + assert_equal(_ensure_name_is_unique('trees'), 'trees1') + + def test_two_existing_datasets(self): + factories.Dataset(name='trees') + factories.Dataset(name='trees1') + assert_equal(_ensure_name_is_unique('trees'), 'trees2') + + def test_no_existing_datasets_and_long_name(self): + assert_equal(_ensure_name_is_unique('x'*101), 'x'*100) + + def test_existing_dataset_and_long_name(self): + # because PACKAGE_NAME_MAX_LENGTH = 100 + factories.Dataset(name='x'*100) + assert_equal(_ensure_name_is_unique('x'*101), 'x'*99 + '1') + + def test_update_dataset_with_new_name(self): + factories.Dataset(name='trees1') + assert_equal(_ensure_name_is_unique('tree', existing_name='trees1'), + 'tree') + + def test_update_dataset_but_with_same_name(self): + # this can happen if you remove a trailing space from the title - the + # harvester sees the title changed and thinks it should have a new + # name, but clearly it can reuse its existing one + factories.Dataset(name='trees') + factories.Dataset(name='trees1') + assert_equal(_ensure_name_is_unique('trees', existing_name='trees'), + 'trees') + + def test_update_dataset_to_available_shorter_name(self): + # this can be handy when if reharvesting, you got duplicates and + # managed to purge one set and through a minor title change you can now + # lose the appended number. users don't like unnecessary numbers. + factories.Dataset(name='trees1') + assert_equal(_ensure_name_is_unique('trees', existing_name='trees1'), + 'trees') + + def test_update_dataset_but_doesnt_change_to_other_number(self): + # there's no point changing one number for another though + factories.Dataset(name='trees') + factories.Dataset(name='trees2') + assert_equal(_ensure_name_is_unique('trees', existing_name='trees2'), + 'trees2') + + def test_update_dataset_with_new_name_with_numbers(self): + factories.Dataset(name='trees') + factories.Dataset(name='trees2') + factories.Dataset(name='frogs') + assert_equal(_ensure_name_is_unique('frogs', existing_name='trees2'), + 'frogs1') + + def test_existing_dataset_appending_hex(self): + factories.Dataset(name='trees') + name = _ensure_name_is_unique('trees', append_type='random-hex') + # e.g. 'trees0b53f' + assert re.match('trees[\da-f]{5}', name) From 1f81fefcbb8fba983e67d4b65fa620eccb1de0d4 Mon Sep 17 00:00:00 2001 From: David Read Date: Wed, 21 Oct 2015 16:11:11 +0000 Subject: [PATCH 4/9] Correct doc error for configuring amqp. --- .gitignore | 2 +- README.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index b678f55..62ff3fd 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,6 @@ build .DS_Store dist development.ini -*.swp +*.sw? *~ node_modules diff --git a/README.rst b/README.rst index f3dadf8..3b1f2d3 100644 --- a/README.rst +++ b/README.rst @@ -29,7 +29,7 @@ Installation On your CKAN configuration file, add:: - ckan.harvest.mq.type = rabbitmq + ckan.harvest.mq.type = amqp 2. Install the extension into your python environment:: From d1f84295f8726d6a83f28585ca76ea0a1487f9a1 Mon Sep 17 00:00:00 2001 From: David Read Date: Wed, 21 Oct 2015 16:12:40 +0000 Subject: [PATCH 5/9] purge_queues command now has warning about impact of Redis flushall, plus add some (log) output when you run a purge. --- ckanext/harvest/commands/harvester.py | 2 ++ ckanext/harvest/queue.py | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 7158c8c..03a6bb8 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -45,6 +45,8 @@ class Harvester(CkanCommand): harvester purge_queues - removes all jobs from fetch and gather queue + WARNING: if using Redis, this command purges any other data you have + in Redis too! harvester [-j] [-o] [--segments={segments}] import [{source-id}] - perform the import stage with the last fetched objects, for a certain diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index bb1d63c..7e56a31 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -82,10 +82,13 @@ def purge_queues(): if backend in ('amqp', 'ampq'): channel = connection.channel() channel.queue_purge(queue=get_gather_queue_name()) + log.info('AMQP queue purged: %s', get_gather_queue_name()) channel.queue_purge(queue=get_fetch_queue_name()) + log.info('AMQP queue purged: %s', get_fetch_queue_name()) return if backend == 'redis': connection.flushall() + log.info('Redis flushed') def resubmit_jobs(): if config.get('ckan.harvest.mq.type') != 'redis': @@ -95,7 +98,7 @@ def resubmit_jobs(): for key in harvest_object_pending: date_of_key = datetime.datetime.strptime(redis.get(key), "%Y-%m-%d %H:%M:%S.%f") - if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minuites for fetch and import max + if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minutes for fetch and import max redis.rpush('harvest_object_id', json.dumps({'harvest_object_id': key.split(':')[-1]}) ) From dc7af5d1502d4f8f46667b293050050e743e2edb Mon Sep 17 00:00:00 2001 From: David Read Date: Wed, 21 Oct 2015 16:38:03 +0000 Subject: [PATCH 6/9] Remove prints. --- ckanext/harvest/model/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py index 39317fc..92b1af2 100644 --- a/ckanext/harvest/model/__init__.py +++ b/ckanext/harvest/model/__init__.py @@ -39,6 +39,7 @@ harvest_gather_error_table = None harvest_object_error_table = None harvest_object_extra_table = None + def setup(): if harvest_source_table is None: @@ -46,13 +47,11 @@ def setup(): log.debug('Harvest tables defined in memory') if not model.package_table.exists(): - print 'harvest model setup: DEFER' log.debug('Harvest table creation deferred') return if not harvest_source_table.exists(): - print 'harvest model setup: CREATE' # Create each table individually rather than # using metadata.create_all() harvest_source_table.create() @@ -65,7 +64,6 @@ def setup(): log.debug('Harvest tables created') else: from ckan.model.meta import engine - print 'harvest model setup: NOTHING TO DO' log.debug('Harvest tables already exist') # Check if existing tables need to be updated inspector = Inspector.from_engine(engine) From 6fb5728d02d6dc46098dd35f767f9a674cd80d15 Mon Sep 17 00:00:00 2001 From: Mark Winterbottom Date: Wed, 21 Oct 2015 17:48:07 +0100 Subject: [PATCH 7/9] Fixed Typos. --- README.rst | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/README.rst b/README.rst index f3dadf8..a433009 100644 --- a/README.rst +++ b/README.rst @@ -149,12 +149,12 @@ Authorization ============= Starting from CKAN 2.0, harvest sources behave exactly the same as datasets -(they are actually internally implemented as a dataset type). That means that +(they are actually internally implemented as a dataset type). That means they can be searched and faceted, and that the same authorization rules can be applied to them. The default authorization settings are based on organizations (equivalent to the `publisher profile` found in old versions). -Have a look at the `Authorization `_ +Have a look at the `Authorization `_ documentation on CKAN core to see how to configure your instance depending on your needs. @@ -429,7 +429,7 @@ The ``run`` command not only starts any pending harvesting jobs, but also flags those that are finished, allowing new jobs to be created on that particular source and refreshing the source statistics. That means that you will need to run this command before being able to create a new job on a source that was being -harvested (On a production site you will tipically have a cron job that runs the +harvested (On a production site you will typically have a cron job that runs the command regularly, see next section). @@ -598,4 +598,3 @@ http://www.fsf.org/licensing/licenses/agpl-3.0.html .. _Supervisor: http://supervisord.org - From 3c6cc55be078431b43681c30e24b097ce8485a90 Mon Sep 17 00:00:00 2001 From: amercader Date: Fri, 23 Oct 2015 11:52:22 +0100 Subject: [PATCH 8/9] Only flush keys on the current Redis database --- ckanext/harvest/commands/harvester.py | 4 ++-- ckanext/harvest/queue.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 03a6bb8..c0592e4 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -45,8 +45,8 @@ class Harvester(CkanCommand): harvester purge_queues - removes all jobs from fetch and gather queue - WARNING: if using Redis, this command purges any other data you have - in Redis too! + WARNING: if using Redis, this command purges all data in the current + Redis database harvester [-j] [-o] [--segments={segments}] import [{source-id}] - perform the import stage with the last fetched objects, for a certain diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index e1a1180..76b0a35 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -87,8 +87,8 @@ def purge_queues(): log.info('AMQP queue purged: %s', get_fetch_queue_name()) return if backend == 'redis': - connection.flushall() - log.info('Redis flushed') + connection.flushdb() + log.info('Redis database flushed') def resubmit_jobs(): if config.get('ckan.harvest.mq.type') != 'redis': @@ -180,7 +180,7 @@ class RedisConsumer(object): def basic_ack(self, message): self.redis.delete(self.persistance_key(message)) def queue_purge(self, queue): - self.redis.flushall() + self.redis.flushdb() def basic_get(self, queue): body = self.redis.lpop(self.routing_key) return (FakeMethod(body), self, body) From 992cdc57d20ae18181e5698407f27a9e729add86 Mon Sep 17 00:00:00 2001 From: amercader Date: Fri, 23 Oct 2015 11:56:12 +0100 Subject: [PATCH 9/9] Test 2.4 and latest branches for all releases --- .travis.yml | 1 + bin/travis-build.bash | 10 +++++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index cabad8b..110f770 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,6 +5,7 @@ env: - CKANVERSION=master - CKANVERSION=2.2 - CKANVERSION=2.3 + - CKANVERSION=2.4 services: - redis-server install: diff --git a/bin/travis-build.bash b/bin/travis-build.bash index 8ef11de..aae36f3 100644 --- a/bin/travis-build.bash +++ b/bin/travis-build.bash @@ -10,12 +10,16 @@ sudo apt-get install postgresql-9.1 solr-jetty libcommons-fileupload-java:amd64= echo "Installing CKAN and its Python dependencies..." git clone https://github.com/ckan/ckan cd ckan -if [ $CKANVERSION == '2.3' ] +if [ $CKANVERSION == '2.4' ] then - git checkout release-v2.3 + git checkout release-v2.4-latest +elif [ $CKANVERSION == '2.3' ] +then + git checkout release-v2.3-latest + elif [ $CKANVERSION == '2.2' ] then - git checkout release-v2.2.3 + git checkout release-v2.2-latest fi python setup.py develop pip install -r requirements.txt --allow-all-external