Merge branch 'master' into include-exclude-org

This commit is contained in:
David Read 2015-10-23 14:36:53 +01:00
commit bc49149d5e
9 changed files with 228 additions and 27 deletions

2
.gitignore vendored
View File

@ -8,6 +8,6 @@ build
.DS_Store .DS_Store
dist dist
development.ini development.ini
*.swp *.sw?
*~ *~
node_modules node_modules

View File

@ -5,6 +5,7 @@ env:
- CKANVERSION=master - CKANVERSION=master
- CKANVERSION=2.2 - CKANVERSION=2.2
- CKANVERSION=2.3 - CKANVERSION=2.3
- CKANVERSION=2.4
services: services:
- redis-server - redis-server
install: install:

View File

@ -29,7 +29,7 @@ Installation
On your CKAN configuration file, add:: On your CKAN configuration file, add::
ckan.harvest.mq.type = rabbitmq ckan.harvest.mq.type = amqp
2. Install the extension into your python environment:: 2. Install the extension into your python environment::
@ -149,12 +149,12 @@ Authorization
============= =============
Starting from CKAN 2.0, harvest sources behave exactly the same as datasets Starting from CKAN 2.0, harvest sources behave exactly the same as datasets
(they are actually internally implemented as a dataset type). That means that (they are actually internally implemented as a dataset type). That means they
can be searched and faceted, and that the same authorization rules can be can be searched and faceted, and that the same authorization rules can be
applied to them. The default authorization settings are based on organizations applied to them. The default authorization settings are based on organizations
(equivalent to the `publisher profile` found in old versions). (equivalent to the `publisher profile` found in old versions).
Have a look at the `Authorization <http://docs.ckan.org/en/latest/authorization.html>`_ Have a look at the `Authorization <http://docs.ckan.org/en/latest/authorization.html>`_
documentation on CKAN core to see how to configure your instance depending on documentation on CKAN core to see how to configure your instance depending on
your needs. your needs.
@ -447,7 +447,7 @@ The ``run`` command not only starts any pending harvesting jobs, but also
flags those that are finished, allowing new jobs to be created on that particular flags those that are finished, allowing new jobs to be created on that particular
source and refreshing the source statistics. That means that you will need to run source and refreshing the source statistics. That means that you will need to run
this command before being able to create a new job on a source that was being this command before being able to create a new job on a source that was being
harvested (On a production site you will tipically have a cron job that runs the harvested (On a production site you will typically have a cron job that runs the
command regularly, see next section). command regularly, see next section).
@ -616,4 +616,3 @@ http://www.fsf.org/licensing/licenses/agpl-3.0.html
.. _Supervisor: http://supervisord.org .. _Supervisor: http://supervisord.org

View File

@ -10,12 +10,16 @@ sudo apt-get install postgresql-9.1 solr-jetty libcommons-fileupload-java:amd64=
echo "Installing CKAN and its Python dependencies..." echo "Installing CKAN and its Python dependencies..."
git clone https://github.com/ckan/ckan git clone https://github.com/ckan/ckan
cd ckan cd ckan
if [ $CKANVERSION == '2.3' ] if [ $CKANVERSION == '2.4' ]
then then
git checkout release-v2.3 git checkout release-v2.4-latest
elif [ $CKANVERSION == '2.3' ]
then
git checkout release-v2.3-latest
elif [ $CKANVERSION == '2.2' ] elif [ $CKANVERSION == '2.2' ]
then then
git checkout release-v2.2.3 git checkout release-v2.2-latest
fi fi
python setup.py develop python setup.py develop
pip install -r requirements.txt --allow-all-external pip install -r requirements.txt --allow-all-external

View File

@ -45,6 +45,8 @@ class Harvester(CkanCommand):
harvester purge_queues harvester purge_queues
- removes all jobs from fetch and gather queue - removes all jobs from fetch and gather queue
WARNING: if using Redis, this command purges all data in the current
Redis database
harvester [-j] [-o] [--segments={segments}] import [{source-id}] harvester [-j] [-o] [--segments={segments}] import [{source-id}]
- perform the import stage with the last fetched objects, for a certain - perform the import stage with the last fetched objects, for a certain

View File

@ -8,7 +8,7 @@ from pylons import config
from ckan import plugins as p from ckan import plugins as p
from ckan import model from ckan import model
from ckan.model import Session, Package from ckan.model import Session, Package, PACKAGE_NAME_MAX_LENGTH
from ckan.logic import ValidationError, NotFound, get_action from ckan.logic import ValidationError, NotFound, get_action
from ckan.logic.schema import default_create_package_schema from ckan.logic.schema import default_create_package_schema
@ -41,21 +41,100 @@ class HarvesterBase(SingletonPlugin):
_user_name = None _user_name = None
def _gen_new_name(self, title): @classmethod
def _gen_new_name(cls, title, existing_name=None,
append_type='number-sequence'):
''' '''
Creates a URL friendly name from a title Returns a 'name' for the dataset (URL friendly), based on the title.
If the name already exists, it will add some random characters at the end If the ideal name is already used, it will append a number to it to
ensure it is unique.
If generating a new name because the title of the dataset has changed,
specify the existing name, in case the name doesn't need to change
after all.
:param existing_name: the current name of the dataset - only specify
this if the dataset exists
:type existing_name: string
:param append_type: the type of characters to add to make it unique -
either 'number-sequence' or 'random-hex'.
:type append_type: string
''' '''
name = munge_title_to_name(title).replace('_', '-') ideal_name = munge_title_to_name(title)
while '--' in name: ideal_name = re.sub('-+', '-', ideal_name) # collapse multiple dashes
name = name.replace('--', '-') return cls._ensure_name_is_unique(ideal_name,
pkg_obj = Session.query(Package).filter(Package.name == name).first() existing_name=existing_name,
if pkg_obj: append_type=append_type)
return name + str(uuid.uuid4())[:5]
@staticmethod
def _ensure_name_is_unique(ideal_name, existing_name=None,
append_type='number-sequence'):
'''
Returns a dataset name based on the ideal_name, only it will be
guaranteed to be different than all the other datasets, by adding a
number on the end if necessary.
If generating a new name because the title of the dataset has changed,
specify the existing name, in case the name doesn't need to change
after all.
The maximum dataset name length is taken account of.
:param ideal_name: the desired name for the dataset, if its not already
been taken (usually derived by munging the dataset
title)
:type ideal_name: string
:param existing_name: the current name of the dataset - only specify
this if the dataset exists
:type existing_name: string
:param append_type: the type of characters to add to make it unique -
either 'number-sequence' or 'random-hex'.
:type append_type: string
'''
ideal_name = ideal_name[:PACKAGE_NAME_MAX_LENGTH]
if existing_name == ideal_name:
return ideal_name
if append_type == 'number-sequence':
MAX_NUMBER_APPENDED = 999
APPEND_MAX_CHARS = len(str(MAX_NUMBER_APPENDED))
elif append_type == 'random-hex':
APPEND_MAX_CHARS = 5 # 16^5 = 1 million combinations
else: else:
return name raise NotImplementedError('append_type cannot be %s' % append_type)
# Find out which package names have been taken. Restrict it to names
# derived from the ideal name plus and numbers added
like_q = u'%s%%' % \
ideal_name[:PACKAGE_NAME_MAX_LENGTH-APPEND_MAX_CHARS]
name_results = Session.query(Package.name)\
.filter(Package.name.ilike(like_q))\
.all()
taken = set([name_result[0] for name_result in name_results])
if existing_name and existing_name in taken:
taken.remove(existing_name)
if ideal_name not in taken:
# great, the ideal name is available
return ideal_name
elif existing_name and existing_name.startswith(ideal_name):
# the ideal name is not available, but its an existing dataset with
# a name based on the ideal one, so there's no point changing it to
# a different number
return existing_name
elif append_type == 'number-sequence':
# find the next available number
counter = 1
while counter <= MAX_NUMBER_APPENDED:
candidate_name = \
ideal_name[:PACKAGE_NAME_MAX_LENGTH-len(str(counter))] + \
str(counter)
if candidate_name not in taken:
return candidate_name
counter = counter + 1
return None
elif append_type == 'random-hex':
return ideal_name[:PACKAGE_NAME_MAX_LENGTH-APPEND_MAX_CHARS] + \
str(uuid.uuid4())[:APPEND_MAX_CHARS]
def _save_gather_error(self, message, job): def _save_gather_error(self, message, job):

View File

@ -198,12 +198,21 @@ def define_harvester_tables():
Column('gather_finished', types.DateTime), Column('gather_finished', types.DateTime),
Column('finished', types.DateTime), Column('finished', types.DateTime),
Column('source_id', types.UnicodeText, ForeignKey('harvest_source.id')), Column('source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
# status: New, Running, Finished
Column('status', types.UnicodeText, default=u'New', nullable=False), Column('status', types.UnicodeText, default=u'New', nullable=False),
) )
# Was harvested_document # A harvest_object contains a representation of one dataset during a
# particular harvest
harvest_object_table = Table('harvest_object', metadata, harvest_object_table = Table('harvest_object', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid), Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
# The guid is the 'identity' of the dataset, according to the source.
# So if you reharvest it, then the harvester knows which dataset to
# update because of this identity. The identity needs to be unique
# within this CKAN.
Column('guid', types.UnicodeText, default=u''), Column('guid', types.UnicodeText, default=u''),
# When you harvest a dataset multiple times, only the latest
# successfully imported harvest_object should be flagged 'current'.
# The import_stage reads and writes it.
Column('current',types.Boolean,default=False), Column('current',types.Boolean,default=False),
Column('gathered', types.DateTime, default=datetime.datetime.utcnow), Column('gathered', types.DateTime, default=datetime.datetime.utcnow),
Column('fetch_started', types.DateTime), Column('fetch_started', types.DateTime),
@ -211,6 +220,7 @@ def define_harvester_tables():
Column('fetch_finished', types.DateTime), Column('fetch_finished', types.DateTime),
Column('import_started', types.DateTime), Column('import_started', types.DateTime),
Column('import_finished', types.DateTime), Column('import_finished', types.DateTime),
# state: WAITING, FETCH, IMPORT, COMPLETE, ERROR
Column('state', types.UnicodeText, default=u'WAITING'), Column('state', types.UnicodeText, default=u'WAITING'),
Column('metadata_modified_date', types.DateTime), Column('metadata_modified_date', types.DateTime),
Column('retry_times',types.Integer, default=0), Column('retry_times',types.Integer, default=0),
@ -393,9 +403,11 @@ ALTER TABLE harvest_object_extra
ALTER TABLE harvest_object_extra ALTER TABLE harvest_object_extra
ADD CONSTRAINT harvest_object_extra_harvest_object_id_fkey FOREIGN KEY (harvest_object_id) REFERENCES harvest_object(id); ADD CONSTRAINT harvest_object_extra_harvest_object_id_fkey FOREIGN KEY (harvest_object_id) REFERENCES harvest_object(id);
UPDATE harvest_object set state = 'COMPLETE'; UPDATE harvest_object set state = 'COMPLETE' where package_id is not null;
UPDATE harvest_object set state = 'ERROR' where package_id is null;
UPDATE harvest_object set retry_times = 0; UPDATE harvest_object set retry_times = 0;
UPDATE harvest_object set report_status = 'new'; UPDATE harvest_object set report_status = 'updated' where package_id is not null;
UPDATE harvest_object set report_status = 'errored' where package_id is null;
UPDATE harvest_source set frequency = 'MANUAL'; UPDATE harvest_source set frequency = 'MANUAL';
ALTER TABLE harvest_object DROP CONSTRAINT harvest_object_package_id_fkey; ALTER TABLE harvest_object DROP CONSTRAINT harvest_object_package_id_fkey;

View File

@ -82,10 +82,13 @@ def purge_queues():
if backend in ('amqp', 'ampq'): if backend in ('amqp', 'ampq'):
channel = connection.channel() channel = connection.channel()
channel.queue_purge(queue=get_gather_queue_name()) channel.queue_purge(queue=get_gather_queue_name())
log.info('AMQP queue purged: %s', get_gather_queue_name())
channel.queue_purge(queue=get_fetch_queue_name()) channel.queue_purge(queue=get_fetch_queue_name())
log.info('AMQP queue purged: %s', get_fetch_queue_name())
return return
if backend == 'redis': if backend == 'redis':
connection.flushall() connection.flushdb()
log.info('Redis database flushed')
def resubmit_jobs(): def resubmit_jobs():
if config.get('ckan.harvest.mq.type') != 'redis': if config.get('ckan.harvest.mq.type') != 'redis':
@ -95,7 +98,7 @@ def resubmit_jobs():
for key in harvest_object_pending: for key in harvest_object_pending:
date_of_key = datetime.datetime.strptime(redis.get(key), date_of_key = datetime.datetime.strptime(redis.get(key),
"%Y-%m-%d %H:%M:%S.%f") "%Y-%m-%d %H:%M:%S.%f")
if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minuites for fetch and import max if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minutes for fetch and import max
redis.rpush('harvest_object_id', redis.rpush('harvest_object_id',
json.dumps({'harvest_object_id': key.split(':')[-1]}) json.dumps({'harvest_object_id': key.split(':')[-1]})
) )
@ -177,7 +180,7 @@ class RedisConsumer(object):
def basic_ack(self, message): def basic_ack(self, message):
self.redis.delete(self.persistance_key(message)) self.redis.delete(self.persistance_key(message))
def queue_purge(self, queue): def queue_purge(self, queue):
self.redis.flushall() self.redis.flushdb()
def basic_get(self, queue): def basic_get(self, queue):
body = self.redis.lpop(self.routing_key) body = self.redis.lpop(self.routing_key)
return (FakeMethod(body), self, body) return (FakeMethod(body), self, body)
@ -250,7 +253,12 @@ def gather_callback(channel, method, header, body):
log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids))) log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids)))
if not harvester_found: if not harvester_found:
msg = 'No harvester could be found for source type %s' % job.source.type # This can occur if you:
# * remove a harvester and it still has sources that are then
# refreshed
# * add a new harvester and restart CKAN but not the gather
# queue.
msg = 'System error - No harvester could be found for source type %s' % job.source.type
err = HarvestGatherError(message=msg,job=job) err = HarvestGatherError(message=msg,job=job)
err.save() err.save()
log.error(msg) log.error(msg)

View File

@ -0,0 +1,96 @@
import re
from nose.tools import assert_equal
from ckanext.harvest.harvesters.base import HarvesterBase
try:
from ckan.tests import helpers
from ckan.tests import factories
except ImportError:
from ckan.new_tests import helpers
from ckan.new_tests import factories
_ensure_name_is_unique = HarvesterBase._ensure_name_is_unique
class TestGenNewName(object):
@classmethod
def setup_class(cls):
helpers.reset_db()
def test_basic(self):
assert_equal(HarvesterBase._gen_new_name('Trees'), 'trees')
def test_munge(self):
assert_equal(
HarvesterBase._gen_new_name('Trees and branches - survey.'),
'trees-and-branches-survey')
class TestEnsureNameIsUnique(object):
def setup(self):
helpers.reset_db()
def test_no_existing_datasets(self):
factories.Dataset(name='unrelated')
assert_equal(_ensure_name_is_unique('trees'), 'trees')
def test_existing_dataset(self):
factories.Dataset(name='trees')
assert_equal(_ensure_name_is_unique('trees'), 'trees1')
def test_two_existing_datasets(self):
factories.Dataset(name='trees')
factories.Dataset(name='trees1')
assert_equal(_ensure_name_is_unique('trees'), 'trees2')
def test_no_existing_datasets_and_long_name(self):
assert_equal(_ensure_name_is_unique('x'*101), 'x'*100)
def test_existing_dataset_and_long_name(self):
# because PACKAGE_NAME_MAX_LENGTH = 100
factories.Dataset(name='x'*100)
assert_equal(_ensure_name_is_unique('x'*101), 'x'*99 + '1')
def test_update_dataset_with_new_name(self):
factories.Dataset(name='trees1')
assert_equal(_ensure_name_is_unique('tree', existing_name='trees1'),
'tree')
def test_update_dataset_but_with_same_name(self):
# this can happen if you remove a trailing space from the title - the
# harvester sees the title changed and thinks it should have a new
# name, but clearly it can reuse its existing one
factories.Dataset(name='trees')
factories.Dataset(name='trees1')
assert_equal(_ensure_name_is_unique('trees', existing_name='trees'),
'trees')
def test_update_dataset_to_available_shorter_name(self):
# this can be handy when if reharvesting, you got duplicates and
# managed to purge one set and through a minor title change you can now
# lose the appended number. users don't like unnecessary numbers.
factories.Dataset(name='trees1')
assert_equal(_ensure_name_is_unique('trees', existing_name='trees1'),
'trees')
def test_update_dataset_but_doesnt_change_to_other_number(self):
# there's no point changing one number for another though
factories.Dataset(name='trees')
factories.Dataset(name='trees2')
assert_equal(_ensure_name_is_unique('trees', existing_name='trees2'),
'trees2')
def test_update_dataset_with_new_name_with_numbers(self):
factories.Dataset(name='trees')
factories.Dataset(name='trees2')
factories.Dataset(name='frogs')
assert_equal(_ensure_name_is_unique('frogs', existing_name='trees2'),
'frogs1')
def test_existing_dataset_appending_hex(self):
factories.Dataset(name='trees')
name = _ensure_name_is_unique('trees', append_type='random-hex')
# e.g. 'trees0b53f'
assert re.match('trees[\da-f]{5}', name)