diff --git a/.gitignore b/.gitignore index b678f55..62ff3fd 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,6 @@ build .DS_Store dist development.ini -*.swp +*.sw? *~ node_modules diff --git a/.travis.yml b/.travis.yml index cabad8b..110f770 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,6 +5,7 @@ env: - CKANVERSION=master - CKANVERSION=2.2 - CKANVERSION=2.3 + - CKANVERSION=2.4 services: - redis-server install: diff --git a/README.rst b/README.rst index 6226de4..3754d01 100644 --- a/README.rst +++ b/README.rst @@ -29,7 +29,7 @@ Installation On your CKAN configuration file, add:: - ckan.harvest.mq.type = rabbitmq + ckan.harvest.mq.type = amqp 2. Install the extension into your python environment:: @@ -149,12 +149,12 @@ Authorization ============= Starting from CKAN 2.0, harvest sources behave exactly the same as datasets -(they are actually internally implemented as a dataset type). That means that +(they are actually internally implemented as a dataset type). That means they can be searched and faceted, and that the same authorization rules can be applied to them. The default authorization settings are based on organizations (equivalent to the `publisher profile` found in old versions). -Have a look at the `Authorization `_ +Have a look at the `Authorization `_ documentation on CKAN core to see how to configure your instance depending on your needs. @@ -447,7 +447,7 @@ The ``run`` command not only starts any pending harvesting jobs, but also flags those that are finished, allowing new jobs to be created on that particular source and refreshing the source statistics. That means that you will need to run this command before being able to create a new job on a source that was being -harvested (On a production site you will tipically have a cron job that runs the +harvested (On a production site you will typically have a cron job that runs the command regularly, see next section). @@ -616,4 +616,3 @@ http://www.fsf.org/licensing/licenses/agpl-3.0.html .. _Supervisor: http://supervisord.org - diff --git a/bin/travis-build.bash b/bin/travis-build.bash index 8ef11de..aae36f3 100644 --- a/bin/travis-build.bash +++ b/bin/travis-build.bash @@ -10,12 +10,16 @@ sudo apt-get install postgresql-9.1 solr-jetty libcommons-fileupload-java:amd64= echo "Installing CKAN and its Python dependencies..." git clone https://github.com/ckan/ckan cd ckan -if [ $CKANVERSION == '2.3' ] +if [ $CKANVERSION == '2.4' ] then - git checkout release-v2.3 + git checkout release-v2.4-latest +elif [ $CKANVERSION == '2.3' ] +then + git checkout release-v2.3-latest + elif [ $CKANVERSION == '2.2' ] then - git checkout release-v2.2.3 + git checkout release-v2.2-latest fi python setup.py develop pip install -r requirements.txt --allow-all-external diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 7158c8c..c0592e4 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -45,6 +45,8 @@ class Harvester(CkanCommand): harvester purge_queues - removes all jobs from fetch and gather queue + WARNING: if using Redis, this command purges all data in the current + Redis database harvester [-j] [-o] [--segments={segments}] import [{source-id}] - perform the import stage with the last fetched objects, for a certain diff --git a/ckanext/harvest/harvesters/base.py b/ckanext/harvest/harvesters/base.py index 41a5061..1fd0ebc 100644 --- a/ckanext/harvest/harvesters/base.py +++ b/ckanext/harvest/harvesters/base.py @@ -8,7 +8,7 @@ from pylons import config from ckan import plugins as p from ckan import model -from ckan.model import Session, Package +from ckan.model import Session, Package, PACKAGE_NAME_MAX_LENGTH from ckan.logic import ValidationError, NotFound, get_action from ckan.logic.schema import default_create_package_schema @@ -41,21 +41,100 @@ class HarvesterBase(SingletonPlugin): _user_name = None - def _gen_new_name(self, title): + @classmethod + def _gen_new_name(cls, title, existing_name=None, + append_type='number-sequence'): ''' - Creates a URL friendly name from a title + Returns a 'name' for the dataset (URL friendly), based on the title. - If the name already exists, it will add some random characters at the end + If the ideal name is already used, it will append a number to it to + ensure it is unique. + + If generating a new name because the title of the dataset has changed, + specify the existing name, in case the name doesn't need to change + after all. + + :param existing_name: the current name of the dataset - only specify + this if the dataset exists + :type existing_name: string + :param append_type: the type of characters to add to make it unique - + either 'number-sequence' or 'random-hex'. + :type append_type: string ''' - name = munge_title_to_name(title).replace('_', '-') - while '--' in name: - name = name.replace('--', '-') - pkg_obj = Session.query(Package).filter(Package.name == name).first() - if pkg_obj: - return name + str(uuid.uuid4())[:5] + ideal_name = munge_title_to_name(title) + ideal_name = re.sub('-+', '-', ideal_name) # collapse multiple dashes + return cls._ensure_name_is_unique(ideal_name, + existing_name=existing_name, + append_type=append_type) + + @staticmethod + def _ensure_name_is_unique(ideal_name, existing_name=None, + append_type='number-sequence'): + ''' + Returns a dataset name based on the ideal_name, only it will be + guaranteed to be different than all the other datasets, by adding a + number on the end if necessary. + + If generating a new name because the title of the dataset has changed, + specify the existing name, in case the name doesn't need to change + after all. + + The maximum dataset name length is taken account of. + + :param ideal_name: the desired name for the dataset, if its not already + been taken (usually derived by munging the dataset + title) + :type ideal_name: string + :param existing_name: the current name of the dataset - only specify + this if the dataset exists + :type existing_name: string + :param append_type: the type of characters to add to make it unique - + either 'number-sequence' or 'random-hex'. + :type append_type: string + ''' + ideal_name = ideal_name[:PACKAGE_NAME_MAX_LENGTH] + if existing_name == ideal_name: + return ideal_name + if append_type == 'number-sequence': + MAX_NUMBER_APPENDED = 999 + APPEND_MAX_CHARS = len(str(MAX_NUMBER_APPENDED)) + elif append_type == 'random-hex': + APPEND_MAX_CHARS = 5 # 16^5 = 1 million combinations else: - return name + raise NotImplementedError('append_type cannot be %s' % append_type) + # Find out which package names have been taken. Restrict it to names + # derived from the ideal name plus and numbers added + like_q = u'%s%%' % \ + ideal_name[:PACKAGE_NAME_MAX_LENGTH-APPEND_MAX_CHARS] + name_results = Session.query(Package.name)\ + .filter(Package.name.ilike(like_q))\ + .all() + taken = set([name_result[0] for name_result in name_results]) + if existing_name and existing_name in taken: + taken.remove(existing_name) + if ideal_name not in taken: + # great, the ideal name is available + return ideal_name + elif existing_name and existing_name.startswith(ideal_name): + # the ideal name is not available, but its an existing dataset with + # a name based on the ideal one, so there's no point changing it to + # a different number + return existing_name + elif append_type == 'number-sequence': + # find the next available number + counter = 1 + while counter <= MAX_NUMBER_APPENDED: + candidate_name = \ + ideal_name[:PACKAGE_NAME_MAX_LENGTH-len(str(counter))] + \ + str(counter) + if candidate_name not in taken: + return candidate_name + counter = counter + 1 + return None + elif append_type == 'random-hex': + return ideal_name[:PACKAGE_NAME_MAX_LENGTH-APPEND_MAX_CHARS] + \ + str(uuid.uuid4())[:APPEND_MAX_CHARS] def _save_gather_error(self, message, job): diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py index 92b1af2..077e892 100644 --- a/ckanext/harvest/model/__init__.py +++ b/ckanext/harvest/model/__init__.py @@ -198,12 +198,21 @@ def define_harvester_tables(): Column('gather_finished', types.DateTime), Column('finished', types.DateTime), Column('source_id', types.UnicodeText, ForeignKey('harvest_source.id')), + # status: New, Running, Finished Column('status', types.UnicodeText, default=u'New', nullable=False), ) - # Was harvested_document + # A harvest_object contains a representation of one dataset during a + # particular harvest harvest_object_table = Table('harvest_object', metadata, Column('id', types.UnicodeText, primary_key=True, default=make_uuid), + # The guid is the 'identity' of the dataset, according to the source. + # So if you reharvest it, then the harvester knows which dataset to + # update because of this identity. The identity needs to be unique + # within this CKAN. Column('guid', types.UnicodeText, default=u''), + # When you harvest a dataset multiple times, only the latest + # successfully imported harvest_object should be flagged 'current'. + # The import_stage reads and writes it. Column('current',types.Boolean,default=False), Column('gathered', types.DateTime, default=datetime.datetime.utcnow), Column('fetch_started', types.DateTime), @@ -211,6 +220,7 @@ def define_harvester_tables(): Column('fetch_finished', types.DateTime), Column('import_started', types.DateTime), Column('import_finished', types.DateTime), + # state: WAITING, FETCH, IMPORT, COMPLETE, ERROR Column('state', types.UnicodeText, default=u'WAITING'), Column('metadata_modified_date', types.DateTime), Column('retry_times',types.Integer, default=0), @@ -393,9 +403,11 @@ ALTER TABLE harvest_object_extra ALTER TABLE harvest_object_extra ADD CONSTRAINT harvest_object_extra_harvest_object_id_fkey FOREIGN KEY (harvest_object_id) REFERENCES harvest_object(id); -UPDATE harvest_object set state = 'COMPLETE'; +UPDATE harvest_object set state = 'COMPLETE' where package_id is not null; +UPDATE harvest_object set state = 'ERROR' where package_id is null; UPDATE harvest_object set retry_times = 0; -UPDATE harvest_object set report_status = 'new'; +UPDATE harvest_object set report_status = 'updated' where package_id is not null; +UPDATE harvest_object set report_status = 'errored' where package_id is null; UPDATE harvest_source set frequency = 'MANUAL'; ALTER TABLE harvest_object DROP CONSTRAINT harvest_object_package_id_fkey; diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index 8296efb..cde4eb6 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -82,10 +82,13 @@ def purge_queues(): if backend in ('amqp', 'ampq'): channel = connection.channel() channel.queue_purge(queue=get_gather_queue_name()) + log.info('AMQP queue purged: %s', get_gather_queue_name()) channel.queue_purge(queue=get_fetch_queue_name()) + log.info('AMQP queue purged: %s', get_fetch_queue_name()) return if backend == 'redis': - connection.flushall() + connection.flushdb() + log.info('Redis database flushed') def resubmit_jobs(): if config.get('ckan.harvest.mq.type') != 'redis': @@ -95,7 +98,7 @@ def resubmit_jobs(): for key in harvest_object_pending: date_of_key = datetime.datetime.strptime(redis.get(key), "%Y-%m-%d %H:%M:%S.%f") - if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minuites for fetch and import max + if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minutes for fetch and import max redis.rpush('harvest_object_id', json.dumps({'harvest_object_id': key.split(':')[-1]}) ) @@ -177,7 +180,7 @@ class RedisConsumer(object): def basic_ack(self, message): self.redis.delete(self.persistance_key(message)) def queue_purge(self, queue): - self.redis.flushall() + self.redis.flushdb() def basic_get(self, queue): body = self.redis.lpop(self.routing_key) return (FakeMethod(body), self, body) @@ -250,7 +253,12 @@ def gather_callback(channel, method, header, body): log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids))) if not harvester_found: - msg = 'No harvester could be found for source type %s' % job.source.type + # This can occur if you: + # * remove a harvester and it still has sources that are then + # refreshed + # * add a new harvester and restart CKAN but not the gather + # queue. + msg = 'System error - No harvester could be found for source type %s' % job.source.type err = HarvestGatherError(message=msg,job=job) err.save() log.error(msg) diff --git a/ckanext/harvest/tests/harvesters/test_base.py b/ckanext/harvest/tests/harvesters/test_base.py new file mode 100644 index 0000000..6e1da21 --- /dev/null +++ b/ckanext/harvest/tests/harvesters/test_base.py @@ -0,0 +1,96 @@ +import re + +from nose.tools import assert_equal + +from ckanext.harvest.harvesters.base import HarvesterBase +try: + from ckan.tests import helpers + from ckan.tests import factories +except ImportError: + from ckan.new_tests import helpers + from ckan.new_tests import factories + + +_ensure_name_is_unique = HarvesterBase._ensure_name_is_unique + + +class TestGenNewName(object): + @classmethod + def setup_class(cls): + helpers.reset_db() + + def test_basic(self): + assert_equal(HarvesterBase._gen_new_name('Trees'), 'trees') + + def test_munge(self): + assert_equal( + HarvesterBase._gen_new_name('Trees and branches - survey.'), + 'trees-and-branches-survey') + + +class TestEnsureNameIsUnique(object): + def setup(self): + helpers.reset_db() + + def test_no_existing_datasets(self): + factories.Dataset(name='unrelated') + assert_equal(_ensure_name_is_unique('trees'), 'trees') + + def test_existing_dataset(self): + factories.Dataset(name='trees') + assert_equal(_ensure_name_is_unique('trees'), 'trees1') + + def test_two_existing_datasets(self): + factories.Dataset(name='trees') + factories.Dataset(name='trees1') + assert_equal(_ensure_name_is_unique('trees'), 'trees2') + + def test_no_existing_datasets_and_long_name(self): + assert_equal(_ensure_name_is_unique('x'*101), 'x'*100) + + def test_existing_dataset_and_long_name(self): + # because PACKAGE_NAME_MAX_LENGTH = 100 + factories.Dataset(name='x'*100) + assert_equal(_ensure_name_is_unique('x'*101), 'x'*99 + '1') + + def test_update_dataset_with_new_name(self): + factories.Dataset(name='trees1') + assert_equal(_ensure_name_is_unique('tree', existing_name='trees1'), + 'tree') + + def test_update_dataset_but_with_same_name(self): + # this can happen if you remove a trailing space from the title - the + # harvester sees the title changed and thinks it should have a new + # name, but clearly it can reuse its existing one + factories.Dataset(name='trees') + factories.Dataset(name='trees1') + assert_equal(_ensure_name_is_unique('trees', existing_name='trees'), + 'trees') + + def test_update_dataset_to_available_shorter_name(self): + # this can be handy when if reharvesting, you got duplicates and + # managed to purge one set and through a minor title change you can now + # lose the appended number. users don't like unnecessary numbers. + factories.Dataset(name='trees1') + assert_equal(_ensure_name_is_unique('trees', existing_name='trees1'), + 'trees') + + def test_update_dataset_but_doesnt_change_to_other_number(self): + # there's no point changing one number for another though + factories.Dataset(name='trees') + factories.Dataset(name='trees2') + assert_equal(_ensure_name_is_unique('trees', existing_name='trees2'), + 'trees2') + + def test_update_dataset_with_new_name_with_numbers(self): + factories.Dataset(name='trees') + factories.Dataset(name='trees2') + factories.Dataset(name='frogs') + assert_equal(_ensure_name_is_unique('frogs', existing_name='trees2'), + 'frogs1') + + def test_existing_dataset_appending_hex(self): + factories.Dataset(name='trees') + name = _ensure_name_is_unique('trees', append_type='random-hex') + # e.g. 'trees0b53f' + assert re.match('trees[\da-f]{5}', name)