Merge branch 'master' of github.com:ckan/ckanext-harvest into 214-remove-genshi

This commit is contained in:
David Read 2015-12-10 16:02:50 +00:00
commit 260cd1f2b7
25 changed files with 328 additions and 209 deletions

View File

@ -341,11 +341,10 @@ following methods::
'''
implements(IHarvester)
def info(self):
'''
Harvesting implementations must provide this method, which will return a
dictionary containing different descriptors of the harvester. The
Harvesting implementations must provide this method, which will return
a dictionary containing different descriptors of the harvester. The
returned dictionary should contain:
* name: machine-readable name. This will be the value stored in the
@ -353,8 +352,8 @@ following methods::
harvester.
* title: human-readable name. This will appear in the form's select box
in the WUI.
* description: a small description of what the harvester does. This will
appear on the form as a guidance to the user.
* description: a small description of what the harvester does. This
will appear on the form as a guidance to the user.
A complete example may be::
@ -373,9 +372,10 @@ following methods::
[optional]
Harvesters can provide this method to validate the configuration entered in the
form. It should return a single string, which will be stored in the database.
Exceptions raised will be shown in the form's error messages.
Harvesters can provide this method to validate the configuration
entered in the form. It should return a single string, which will be
stored in the database. Exceptions raised will be shown in the form's
error messages.
:param harvest_object_id: Config string coming from the form
:returns: A string with the validated configuration options
@ -406,7 +406,7 @@ following methods::
def gather_stage(self, harvest_job):
'''
The gather stage will recieve a HarvestJob object and will be
The gather stage will receive a HarvestJob object and will be
responsible for:
- gathering all the necessary objects to fetch on a later.
stage (e.g. for a CSW server, perform a GetRecords request)
@ -418,6 +418,8 @@ following methods::
- creating and storing any suitable HarvestGatherErrors that may
occur.
- returning a list with all the ids of the created HarvestObjects.
- to abort the harvest, create a HarvestGatherError and raise an
exception. Any created HarvestObjects will be deleted.
:param harvest_job: HarvestJob object
:returns: A list of HarvestObject ids
@ -432,10 +434,14 @@ following methods::
- saving the content in the provided HarvestObject.
- creating and storing any suitable HarvestObjectErrors that may
occur.
- returning True if everything went as expected, False otherwise.
- returning True if everything is ok (ie the object should now be
imported), "unchanged" if the object didn't need harvesting after
all (ie no error, but don't continue to import stage) or False if
there were errors.
:param harvest_object: HarvestObject object
:returns: True if everything went right, False if errors were found
:returns: True if successful, 'unchanged' if nothing to import after
all, False if not successful
'''
def import_stage(self, harvest_object):
@ -454,12 +460,15 @@ following methods::
objects of this harvest source if the action was successful.
- creating and storing any suitable HarvestObjectErrors that may
occur.
- returning True if the action was done, "unchanged" if nothing
was needed doing after all or False if there were errors.
- creating the HarvestObject - Package relation (if necessary)
- returning True if the action was done, "unchanged" if the object
didn't need harvesting after all or False if there were errors.
NB You can run this stage repeatedly using 'paster harvest import'.
:param harvest_object: HarvestObject object
:returns: True if the action was done, "unchanged" if nothing was
needed doing after all and False if something went wrong.
:returns: True if the action was done, "unchanged" if the object didn't
need harvesting after all or False if there were errors.
'''

View File

@ -38,7 +38,7 @@ class Harvester(CkanCommand):
harvester jobs
- lists harvest jobs
harvester job_abort {source-id/name}
harvester job_abort {source-id/source-name/obj-id}
- marks a job as "Aborted" so that the source can be restarted afresh.
It ensures that the job's harvest objects status are also marked
finished. You should ensure that neither the job nor its objects are
@ -67,7 +67,7 @@ class Harvester(CkanCommand):
WARNING: if using Redis, this command purges all data in the current
Redis database
harvester [-j] [-o] [--segments={segments}] import [{source-id}]
harvester [-j] [-o|-g|-p {id/guid}] [--segments={segments}] import [{source-id}]
- perform the import stage with the last fetched objects, for a certain
source or a single harvest object. Please note that no objects will
be fetched from the remote server. It will only affect the objects
@ -75,6 +75,7 @@ class Harvester(CkanCommand):
To import a particular harvest source, specify its id as an argument.
To import a particular harvest object use the -o option.
To import a particular guid use the -g option.
To import a particular package use the -p option.
You will need to specify the -j flag in cases where the datasets are
@ -111,10 +112,13 @@ class Harvester(CkanCommand):
action='store_true', default=False, help='Do not join harvest objects to existing datasets')
self.parser.add_option('-o', '--harvest-object-id', dest='harvest_object_id',
default=False, help='Id of the harvest object to which perfom the import stage')
default=False, help='Id of the harvest object to which perform the import stage')
self.parser.add_option('-p', '--package-id', dest='package_id',
default=False, help='Id of the package whose harvest object to perfom the import stage for')
default=False, help='Id of the package whose harvest object to perform the import stage for')
self.parser.add_option('-g', '--guid', dest='guid',
default=False, help='Guid of the harvest object to which perform the import stage for')
self.parser.add_option('--segments', dest='segments',
default=False, help=
@ -358,19 +362,15 @@ class Harvester(CkanCommand):
def job_abort(self):
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
job_or_source_id_or_name = unicode(self.args[1])
else:
print 'Please provide a source id'
print 'Please provide a job id or source name/id'
sys.exit(1)
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
context = {'model': model, 'user': self.admin_user['name'],
'session': model.Session}
job = get_action('harvest_job_abort')(context,
{'source_id': source['id']})
job = get_action('harvest_job_abort')(
context, {'id': job_or_source_id_or_name})
print 'Job status: {0}'.format(job['status'])
def run_harvester(self):
@ -445,6 +445,7 @@ class Harvester(CkanCommand):
'source_id': source_id,
'harvest_object_id': self.options.harvest_object_id,
'package_id': self.options.package_id,
'guid': self.options.guid,
})
print '%s objects reimported' % objs_count

View File

@ -17,6 +17,7 @@ import ckan.lib.helpers as h, json
from ckan.lib.base import BaseController, c, \
request, response, render, abort, redirect
from ckanext.harvest.logic import HarvestJobExists, HarvestSourceInactiveError
from ckanext.harvest.plugin import DATASET_TYPE_NAME
import logging
@ -62,13 +63,14 @@ class ViewController(BaseController):
abort(404,_('Harvest source not found'))
except p.toolkit.NotAuthorized:
abort(401,self.not_auth_message)
except HarvestSourceInactiveError, e:
h.flash_error(_('Cannot create new harvest jobs on inactive '
'sources. First, please change the source status '
'to \'active\'.'))
except HarvestJobExists, e:
h.flash_notice(_('A harvest job has already been scheduled for '
'this source'))
except Exception, e:
if 'Can not create jobs on inactive sources' in str(e):
h.flash_error(_('Cannot create new harvest jobs on inactive sources.'
+ ' First, please change the source status to \'active\'.'))
elif 'There already is an unrun job for this source' in str(e):
h.flash_notice(_('A harvest job has already been scheduled for this source'))
else:
msg = 'An error occurred: [%s]' % str(e)
h.flash_error(msg)

View File

@ -2,8 +2,7 @@ import logging
import re
import uuid
from sqlalchemy.sql import update,and_, bindparam
from sqlalchemy.exc import InvalidRequestError
from sqlalchemy.sql import update, bindparam
from pylons import config
from ckan import plugins as p
@ -12,28 +11,45 @@ from ckan.model import Session, Package, PACKAGE_NAME_MAX_LENGTH
from ckan.logic import ValidationError, NotFound, get_action
from ckan.logic.schema import default_create_package_schema
from ckan.lib.navl.validators import ignore_missing,ignore
from ckan.lib.munge import munge_title_to_name,substitute_ascii_equivalents
from ckan.lib.navl.validators import ignore_missing, ignore
from ckan.lib.munge import munge_title_to_name, substitute_ascii_equivalents
from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \
HarvestObjectError
from ckanext.harvest.model import (HarvestObject, HarvestGatherError,
HarvestObjectError)
from ckan.plugins.core import SingletonPlugin, implements
from ckanext.harvest.interfaces import IHarvester
if p.toolkit.check_ckan_version(min_version='2.3'):
from ckan.lib.munge import munge_tag
else:
# Fallback munge_tag for older ckan versions which don't have a decent
# munger
def _munge_to_length(string, min_length, max_length):
'''Pad/truncates a string'''
if len(string) < min_length:
string += '_' * (min_length - len(string))
if len(string) > max_length:
string = string[:max_length]
return string
def munge_tag(tag):
tag = substitute_ascii_equivalents(tag)
tag = tag.lower().strip()
tag = re.sub(r'[^a-zA-Z0-9\- ]', '', tag).replace(' ', '-')
tag = _munge_to_length(tag, model.MIN_TAG_LENGTH, model.MAX_TAG_LENGTH)
return tag
log = logging.getLogger(__name__)
def munge_tag(tag):
tag = substitute_ascii_equivalents(tag)
tag = tag.lower().strip()
return re.sub(r'[^a-zA-Z0-9 -]', '', tag).replace(' ', '-')
class HarvesterBase(SingletonPlugin):
'''
Generic class for harvesters with helper functions
Generic base class for harvesters, providing a number of useful functions.
A harvester doesn't have to derive from this - it could just have:
implements(IHarvester)
'''
implements(IHarvester)
@ -136,31 +152,8 @@ class HarvesterBase(SingletonPlugin):
return ideal_name[:PACKAGE_NAME_MAX_LENGTH-APPEND_MAX_CHARS] + \
str(uuid.uuid4())[:APPEND_MAX_CHARS]
def _save_gather_error(self, message, job):
err = HarvestGatherError(message=message, job=job)
try:
err.save()
except InvalidRequestError:
Session.rollback()
err.save()
finally:
log.error(message)
def _save_object_error(self, message, obj, stage=u'Fetch', line=None):
err = HarvestObjectError(message=message,
object=obj,
stage=stage,
line=line)
try:
err.save()
except InvalidRequestError, e:
Session.rollback()
err.save()
finally:
log_message = '{0}, line {1}'.format(message,line) if line else message
log.debug(log_message)
_save_gather_error = HarvestGatherError.create
_save_object_error = HarvestObjectError.create
def _get_user_name(self):
'''

View File

@ -1,5 +1,6 @@
from ckan.plugins.interfaces import Interface
class IHarvester(Interface):
'''
Common harvesting interface
@ -8,8 +9,8 @@ class IHarvester(Interface):
def info(self):
'''
Harvesting implementations must provide this method, which will return a
dictionary containing different descriptors of the harvester. The
Harvesting implementations must provide this method, which will return
a dictionary containing different descriptors of the harvester. The
returned dictionary should contain:
* name: machine-readable name. This will be the value stored in the
@ -17,8 +18,8 @@ class IHarvester(Interface):
harvester.
* title: human-readable name. This will appear in the form's select box
in the WUI.
* description: a small description of what the harvester does. This will
appear on the form as a guidance to the user.
* description: a small description of what the harvester does. This
will appear on the form as a guidance to the user.
A complete example may be::
@ -37,9 +38,10 @@ class IHarvester(Interface):
[optional]
Harvesters can provide this method to validate the configuration entered in the
form. It should return a single string, which will be stored in the database.
Exceptions raised will be shown in the form's error messages.
Harvesters can provide this method to validate the configuration
entered in the form. It should return a single string, which will be
stored in the database. Exceptions raised will be shown in the form's
error messages.
:param harvest_object_id: Config string coming from the form
:returns: A string with the validated configuration options
@ -70,7 +72,7 @@ class IHarvester(Interface):
def gather_stage(self, harvest_job):
'''
The gather stage will recieve a HarvestJob object and will be
The gather stage will receive a HarvestJob object and will be
responsible for:
- gathering all the necessary objects to fetch on a later.
stage (e.g. for a CSW server, perform a GetRecords request)
@ -82,6 +84,8 @@ class IHarvester(Interface):
- creating and storing any suitable HarvestGatherErrors that may
occur.
- returning a list with all the ids of the created HarvestObjects.
- to abort the harvest, create a HarvestGatherError and raise an
exception. Any created HarvestObjects will be deleted.
:param harvest_job: HarvestJob object
:returns: A list of HarvestObject ids
@ -96,10 +100,14 @@ class IHarvester(Interface):
- saving the content in the provided HarvestObject.
- creating and storing any suitable HarvestObjectErrors that may
occur.
- returning True if everything went as expected, False otherwise.
- returning True if everything is ok (ie the object should now be
imported), "unchanged" if the object didn't need harvesting after
all (ie no error, but don't continue to import stage) or False if
there were errors.
:param harvest_object: HarvestObject object
:returns: True if everything went right, False if errors were found
:returns: True if successful, 'unchanged' if nothing to import after
all, False if not successful
'''
def import_stage(self, harvest_object):
@ -118,10 +126,13 @@ class IHarvester(Interface):
objects of this harvest source if the action was successful.
- creating and storing any suitable HarvestObjectErrors that may
occur.
- returning True if the action was done, "unchanged" if nothing
was needed doing after all or False if there were errors.
- creating the HarvestObject - Package relation (if necessary)
- returning True if the action was done, "unchanged" if the object
didn't need harvesting after all or False if there were errors.
NB You can run this stage repeatedly using 'paster harvest import'.
:param harvest_object: HarvestObject object
:returns: True if the action was done, "unchanged" if nothing was
needed doing after all and False if something went wrong.
:returns: True if the action was done, "unchanged" if the object didn't
need harvesting after all or False if there were errors.
'''

View File

@ -8,3 +8,7 @@ except ImportError:
class HarvestJobExists(Exception):
pass
class HarvestSourceInactiveError(Exception):
pass

View File

@ -4,15 +4,15 @@ import ckan
from ckan.plugins import toolkit
from ckanext.harvest.logic import HarvestJobExists
from ckanext.harvest.logic import HarvestJobExists, HarvestSourceInactiveError
from ckanext.harvest.plugin import DATASET_TYPE_NAME
from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject,
HarvestObjectExtra)
HarvestObjectExtra)
from ckanext.harvest.logic.dictization import (harvest_job_dictize,
harvest_object_dictize)
from ckanext.harvest.logic.schema import (harvest_source_show_package_schema,
harvest_object_create_schema)
from ckanext.harvest.logic.action.get import harvest_source_list,harvest_job_list
harvest_object_dictize)
from ckanext.harvest.logic.schema import harvest_object_create_schema
from ckanext.harvest.logic.action.get import (harvest_source_list,
harvest_job_list)
log = logging.getLogger(__name__)
@ -23,7 +23,8 @@ check_access = toolkit.check_access
class InactiveSource(Exception):
pass
def harvest_source_create(context,data_dict):
def harvest_source_create(context, data_dict):
'''
Creates a new harvest source
@ -76,8 +77,8 @@ def harvest_job_create(context, data_dict):
Creates a Harvest Job for a Harvest Source and runs it (by putting it on
the gather queue)
:param source_id:
:type param: string
:param source_id: id of the harvest source to create a job for
:type source_id: string
:param run: whether to also run it or not (default: True)
:type run: bool
'''
@ -97,7 +98,7 @@ def harvest_job_create(context, data_dict):
if not source.active:
log.warn('Harvest job cannot be created for inactive source %s',
source_id)
raise Exception('Can not create jobs on inactive sources')
raise HarvestSourceInactiveError('Can not create jobs on inactive sources')
# Check if there already is an unrun or currently running job for this
# source
@ -131,20 +132,21 @@ def harvest_job_create_all(context, data_dict):
'''
log.info('Harvest job create all: %r', data_dict)
check_access('harvest_job_create_all',context,data_dict)
check_access('harvest_job_create_all', context, data_dict)
run = data_dict.get('run', True)
data_dict.update({'only_active':True})
data_dict.update({'only_active': True})
# Get all active sources
sources = harvest_source_list(context,data_dict)
sources = harvest_source_list(context, data_dict)
jobs = []
# Create a new job for each, if there isn't already one
for source in sources:
exists = _check_for_existing_jobs(context, source['id'])
if exists:
log.info('Skipping source %s as it already has a pending job', source['id'])
log.info('Skipping source %s as it already has a pending job',
source['id'])
continue
job = harvest_job_create(
@ -155,6 +157,7 @@ def harvest_job_create_all(context, data_dict):
'and run ' if run else '', len(jobs))
return jobs
def _check_for_existing_jobs(context, source_id):
'''
Given a source id, checks if there are jobs for this source
@ -162,20 +165,21 @@ def _check_for_existing_jobs(context, source_id):
rtype: boolean
'''
data_dict ={
'source_id':source_id,
'status':u'New'
data_dict = {
'source_id': source_id,
'status': u'New'
}
exist_new = harvest_job_list(context,data_dict)
data_dict ={
'source_id':source_id,
'status':u'Running'
exist_new = harvest_job_list(context, data_dict)
data_dict = {
'source_id': source_id,
'status': u'Running'
}
exist_running = harvest_job_list(context,data_dict)
exist_running = harvest_job_list(context, data_dict)
exist = len(exist_new + exist_running) > 0
return exist
def harvest_object_create(context, data_dict):
''' Create a new harvest object
@ -187,7 +191,8 @@ def harvest_object_create(context, data_dict):
:type extras: dict (optional)
'''
check_access('harvest_object_create', context, data_dict)
data, errors = _validate(data_dict, harvest_object_create_schema(), context)
data, errors = _validate(data_dict, harvest_object_create_schema(),
context)
if errors:
raise toolkit.ValidationError(errors)
@ -195,11 +200,11 @@ def harvest_object_create(context, data_dict):
obj = HarvestObject(
guid=data.get('guid'),
content=data.get('content'),
job=data['job_id'],
job=data['job_id'], # which was validated into a HarvestJob object
harvest_source_id=data.get('source_id'),
package_id=data.get('package_id'),
extras=[ HarvestObjectExtra(key=k, value=v)
for k, v in data.get('extras', {}).items() ]
extras=[HarvestObjectExtra(key=k, value=v)
for k, v in data.get('extras', {}).items()]
)
obj.save()

View File

@ -128,33 +128,8 @@ def harvest_source_list(context, data_dict):
sources = _get_sources_for_user(context, data_dict)
context.update({'detailed':False})
return [harvest_source_dictize(source, context) for source in sources]
@side_effect_free
def harvest_source_for_a_dataset(context, data_dict):
'''
TODO: Deprecated, harvest source id is added as an extra to each dataset
automatically
'''
'''For a given dataset, return the harvest source that
created or last updated it, otherwise NotFound.'''
model = context['model']
session = context['session']
dataset_id = data_dict.get('id')
query = session.query(HarvestSource)\
.join(HarvestObject)\
.filter_by(package_id=dataset_id)\
.order_by(HarvestObject.gathered.desc())
source = query.first() # newest
if not source:
raise NotFound
return harvest_source_dictize(source,context)
@side_effect_free
def harvest_job_show(context,data_dict):

View File

@ -262,6 +262,8 @@ def harvest_objects_import(context, data_dict):
:param source_id: the id of the harvest source to import
:type source_id: string
:param guid: the guid of the harvest object to import
:type guid: string
:param harvest_object_id: the id of the harvest object to import
:type harvest_object_id: string
:param package_id: the id or name of the package to import
@ -273,6 +275,7 @@ def harvest_objects_import(context, data_dict):
model = context['model']
session = context['session']
source_id = data_dict.get('source_id')
guid = data_dict.get('guid')
harvest_object_id = data_dict.get('harvest_object_id')
package_id_or_name = data_dict.get('package_id')
@ -280,7 +283,13 @@ def harvest_objects_import(context, data_dict):
join_datasets = context.get('join_datasets', True)
if source_id:
if guid:
last_objects_ids = \
session.query(HarvestObject.id) \
.filter(HarvestObject.guid == guid) \
.filter(HarvestObject.current == True)
elif source_id:
source = HarvestSource.get(source_id)
if not source:
log.error('Harvest source %s does not exist', source_id)
@ -462,18 +471,17 @@ def harvest_send_job_to_gather_queue(context, data_dict):
:type id: string
'''
log.info('Send job to gather queue: %r', data_dict)
check_access('harvest_send_job_to_gather_queue', context, data_dict)
job_id = logic.get_or_bust(data_dict, 'id')
job = toolkit.get_action('harvest_job_show')(
context, {'id': job_id})
check_access('harvest_send_job_to_gather_queue', context, job)
# gather queue
publisher = get_gather_publisher()
job = logic.get_action('harvest_job_show')(
context, {'id': job_id})
# Check the source is active
context['detailed'] = False
source = harvest_source_show(context, {'id': job['source_id']})
if not source['active']:
raise toolkit.ValidationError('Source is not active')
@ -495,6 +503,11 @@ def harvest_job_abort(context, data_dict):
marks them "ERROR", so any left in limbo are cleaned up. Does not actually
stop running any queued harvest fetchs/objects.
Specify either id or source_id.
:param id: the job id to abort, or the id or name of the harvest source
with a job to abort
:type id: string
:param source_id: the name or id of the harvest source with a job to abort
:type source_id: string
'''
@ -503,18 +516,25 @@ def harvest_job_abort(context, data_dict):
model = context['model']
source_id = data_dict.get('source_id')
source = harvest_source_show(context, {'id': source_id})
# HarvestJob set status to 'Finished'
# Don not use harvest_job_list since it can use a lot of memory
last_job = model.Session.query(HarvestJob) \
.filter_by(source_id=source['id']) \
.order_by(HarvestJob.created.desc()).first()
if not last_job:
raise NotFound('Error: source has no jobs')
job = get_action('harvest_job_show')(context,
{'id': last_job.id})
source_or_job_id = data_dict.get('source_id') or data_dict.get('id')
if source_or_job_id:
try:
source = harvest_source_show(context, {'id': source_or_job_id})
except NotFound:
job = get_action('harvest_job_show')(
context, {'id': source_or_job_id})
else:
# HarvestJob set status to 'Aborted'
# Do not use harvest_job_list since it can use a lot of memory
# Get the most recent job for the source
job = model.Session.query(HarvestJob) \
.filter_by(source_id=source['id']) \
.order_by(HarvestJob.created.desc()).first()
if not job:
raise NotFound('Error: source has no jobs')
job_id = job.id
job = get_action('harvest_job_show')(
context, {'id': job_id})
if job['status'] != 'Finished':
# i.e. New or Running

View File

@ -66,7 +66,7 @@ def harvest_send_job_to_gather_queue(context, data_dict):
It forwards the checks to harvest_job_create, ie the user can only run
the job if she is allowed to create the job.
'''
from ckanext.harvest.auth.create import harvest_job_create
from ckanext.harvest.logic.auth.create import harvest_job_create
return harvest_job_create(context, data_dict)

View File

@ -104,7 +104,6 @@ def _get_source_status(source, context):
'''
model = context.get('model')
detailed = context.get('detailed', True)
out = dict()
@ -114,11 +113,8 @@ def _get_source_status(source, context):
'job_count': 0,
'next_harvest': '',
'last_harvest_request': '',
'last_harvest_statistics':
{'added': 0, 'updated': 0, 'errors': 0, 'deleted': 0},
'last_harvest_errors': {'gather': [], 'object': []},
'overall_statistics': {'added': 0, 'errors': 0},
'packages': []}
}
if not job_count:
out['msg'] = 'No jobs yet'
@ -141,32 +137,6 @@ def _get_source_status(source, context):
#TODO: Should we encode the dates as strings?
out['last_harvest_request'] = str(last_job.gather_finished)
if detailed:
harvest_job_dict = harvest_job_dictize(last_job, context)
# No packages added or updated
statistics = out['last_harvest_statistics']
statistics['added'] = harvest_job_dict['stats'].get('new', 0)
statistics['updated'] = harvest_job_dict['stats'].get('updated', 0)
statistics['deleted'] = harvest_job_dict['stats'].get('deleted', 0)
statistics['errors'] = (
harvest_job_dict['stats'].get('errored', 0) +
len(last_job.gather_errors))
if detailed:
# We have the gathering errors in last_job.gather_errors, so let's
# also get also the object errors.
object_errors = model.Session.query(HarvestObjectError)\
.join(HarvestObject) \
.filter(HarvestObject.job == last_job)
for gather_error in last_job.gather_errors:
out['last_harvest_errors']['gather'].append(gather_error.message)
for object_error in object_errors:
err = {'object_id': object_error.object.id,
'object_guid': object_error.object.guid,
'message': object_error.message}
out['last_harvest_errors']['object'].append(err)
# Overall statistics
packages = model.Session.query(distinct(HarvestObject.package_id),
Package.name) \
@ -176,9 +146,6 @@ def _get_source_status(source, context):
.filter(Package.state == u'active')
out['overall_statistics']['added'] = packages.count()
if detailed:
for package in packages:
out['packages'].append(package.name)
gather_errors = model.Session.query(HarvestGatherError) \
.join(HarvestJob).join(HarvestSource) \

View File

@ -34,6 +34,9 @@ def harvest_job_exists(value, context):
def _normalize_url(url):
'''Strips off parameters off a URL, and an unnecessary port number, so that
simple variations on a URL are ignored, to used to help avoid getting two
harvesters for the same URL.'''
o = urlparse.urlparse(url)
# Normalize port
@ -118,8 +121,8 @@ def harvest_source_type_exists(value, context):
available_types.append(info['name'])
if not value in available_types:
raise Invalid('Unknown harvester type: %s. Have you registered a '
'harvester for this type?' % value)
raise Invalid('Unknown harvester type: %s. Registered types: %r' %
(value, available_types))
return value

View File

@ -10,6 +10,7 @@ from sqlalchemy import ForeignKey
from sqlalchemy import types
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.orm import backref, relation
from sqlalchemy.exc import InvalidRequestError
from ckan import model
from ckan import logic
@ -153,13 +154,43 @@ class HarvestGatherError(HarvestDomainObject):
'''Gather errors are raised during the **gather** stage of a harvesting
job.
'''
pass
@classmethod
def create(cls, message, job):
'''
Helper function to create an error object and save it.
'''
err = cls(message=message, job=job)
try:
err.save()
except InvalidRequestError:
Session.rollback()
err.save()
finally:
# No need to alert administrator so don't log as an error
log.info(message)
class HarvestObjectError(HarvestDomainObject):
'''Object errors are raised during the **fetch** or **import** stage of a
harvesting job, and are referenced to a specific harvest object.
'''
pass
@classmethod
def create(cls, message, object, stage=u'Fetch', line=None):
'''
Helper function to create an error object and save it.
'''
err = cls(message=message, object=object,
stage=stage, line=line)
try:
err.save()
except InvalidRequestError:
Session.rollback()
err.save()
finally:
log_message = '{0}, line {1}'.format(message, line) \
if line else message
log.debug(log_message)
def harvest_object_before_insert_listener(mapper,connection,target):
'''

View File

@ -7,6 +7,12 @@ from ckan import logic
from ckan import model
import ckan.plugins as p
from ckan.lib.plugins import DefaultDatasetForm
try:
from ckan.lib.plugins import DefaultTranslation
except ImportError:
class DefaultTranslation():
pass
from ckan.lib.navl import dictization_functions
from ckanext.harvest import logic as harvest_logic
@ -15,12 +21,14 @@ from ckanext.harvest.model import setup as model_setup
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
log = getLogger(__name__)
assert not log.disabled
DATASET_TYPE_NAME = 'harvest'
class Harvest(p.SingletonPlugin, DefaultDatasetForm):
class Harvest(p.SingletonPlugin, DefaultDatasetForm, DefaultTranslation):
p.implements(p.IConfigurable)
p.implements(p.IRoutes, inherit=True)
@ -31,6 +39,9 @@ class Harvest(p.SingletonPlugin, DefaultDatasetForm):
p.implements(p.IPackageController, inherit=True)
p.implements(p.ITemplateHelpers)
p.implements(p.IFacets, inherit=True)
if p.toolkit.check_ckan_version(min_version='2.5.0'):
p.implements(p.ITranslation, inherit=True)
startup = False

View File

@ -395,7 +395,7 @@ def fetch_and_import_stages(harvester, obj):
success_fetch = harvester.fetch_stage(obj)
obj.fetch_finished = datetime.datetime.utcnow()
obj.save()
if success_fetch:
if success_fetch is True:
# If no errors where found, call the import method
obj.import_started = datetime.datetime.utcnow()
obj.state = "IMPORT"
@ -411,6 +411,11 @@ def fetch_and_import_stages(harvester, obj):
else:
obj.state = "ERROR"
obj.save()
elif success_fetch == 'unchanged':
obj.state = 'COMPLETE'
obj.report_status = 'not modified'
obj.save()
return
else:
obj.state = "ERROR"
obj.save()

View File

@ -82,6 +82,9 @@ class HarvestObject(factory.Factory):
if 'job_id' not in kwargs:
kwargs['job_id'] = kwargs['job'].id
kwargs['source_id'] = kwargs['job'].source.id
# Remove 'job' to avoid it getting added as a HarvestObjectExtra
if 'job' in kwargs:
kwargs.pop('job')
job_dict = toolkit.get_action('harvest_object_create')(
context, kwargs)
if cls._return_type == 'dict':

View File

@ -64,6 +64,7 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
return self.respond_json(revision_ids)
if self.path.startswith('/api/rest/revision/'):
revision_ref = self.path.split('/')[-1]
assert api_version == 2
for rev in REVISIONS:
if rev['id'] == revision_ref:
return self.respond_json(rev)
@ -440,7 +441,7 @@ REVISIONS = [
"approved_timestamp": None,
"packages":
[
DATASETS[1]['name']
DATASETS[1]['id']
],
"groups": [ ]
},
@ -452,7 +453,7 @@ REVISIONS = [
"approved_timestamp": None,
"packages":
[
DATASETS[1]['name']
DATASETS[1]['id']
],
"groups": [ ]
}]

View File

@ -2,7 +2,7 @@ import re
from nose.tools import assert_equal
from ckanext.harvest import model as harvest_model
from ckanext.harvest.harvesters.base import HarvesterBase
from ckanext.harvest.harvesters.base import HarvesterBase, munge_tag
try:
from ckan.tests import helpers
from ckan.tests import factories
@ -96,3 +96,30 @@ class TestEnsureNameIsUnique(object):
name = _ensure_name_is_unique('trees', append_type='random-hex')
# e.g. 'trees0b53f'
assert re.match('trees[\da-f]{5}', name)
# taken from ckan/tests/lib/test_munge.py
class TestMungeTag:
# (original, expected)
munge_list = [
('unchanged', 'unchanged'),
('s', 's_'), # too short
('some spaces here', 'some-spaces--here'),
('random:other%characters&_.here', 'randomothercharactershere'),
('river-water-dashes', 'river-water-dashes'),
]
def test_munge_tag(self):
'''Munge a list of tags gives expected results.'''
for org, exp in self.munge_list:
munge = munge_tag(org)
assert_equal(munge, exp)
def test_munge_tag_multiple_pass(self):
'''Munge a list of tags muliple times gives expected results.'''
for org, exp in self.munge_list:
first_munge = munge_tag(org)
assert_equal(first_munge, exp)
second_munge = munge_tag(first_munge)
assert_equal(second_munge, exp)

View File

@ -106,7 +106,7 @@ class TestCkanHarvester(object):
harvester=CKANHarvester())
# updated the dataset which has revisions
result = results_by_guid[mock_ckan.DATASETS[1]['name']]
result = results_by_guid[mock_ckan.DATASETS[1]['id']]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'updated')
assert_equal(result['dataset']['name'], mock_ckan.DATASETS[1]['name'])
@ -155,7 +155,7 @@ class TestCkanHarvester(object):
# The metadata_modified was the same for this dataset so the import
# would have returned 'unchanged'
result = results_by_guid[mock_ckan.DATASETS[1]['name']]
result = results_by_guid[mock_ckan.DATASETS[1]['id']]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'not modified')
assert 'dataset' not in result

View File

@ -1,5 +1,4 @@
import json
import uuid
import factories
import unittest
from nose.tools import assert_equal, assert_raises
@ -7,11 +6,12 @@ from nose.plugins.skip import SkipTest
try:
from ckan.tests import factories as ckan_factories
from ckan.tests.helpers import _get_test_app, reset_db, FunctionalTestBase
from ckan.tests.helpers import (_get_test_app, reset_db,
FunctionalTestBase, assert_in)
except ImportError:
from ckan.new_tests import factories as ckan_factories
from ckan.new_tests.helpers import (_get_test_app, reset_db,
FunctionalTestBase)
FunctionalTestBase, assert_in)
from ckan import plugins as p
from ckan.plugins import toolkit
from ckan import model
@ -403,6 +403,44 @@ class TestActions(ActionBase):
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
def test_harvest_job_create_as_sysadmin(self):
source = factories.HarvestSource(**SOURCE_DICT)
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
data_dict = {
'source_id': source['id'],
'run': True
}
job = toolkit.get_action('harvest_job_create')(
{'user': site_user}, data_dict)
assert_equal(job['source_id'], source['id'])
assert_equal(job['status'], 'Running')
assert_equal(job['gather_started'], None)
assert_in('stats', job.keys())
def test_harvest_job_create_as_admin(self):
# as if an admin user presses 'refresh'
user = ckan_factories.User()
user['capacity'] = 'admin'
org = ckan_factories.Organization(users=[user])
source_dict = dict(SOURCE_DICT.items() +
[('publisher_id', org['id'])])
source = factories.HarvestSource(**source_dict)
data_dict = {
'source_id': source['id'],
'run': True
}
job = toolkit.get_action('harvest_job_create')(
{'user': user['name']}, data_dict)
assert_equal(job['source_id'], source['id'])
assert_equal(job['status'], 'Running')
assert_equal(job['gather_started'], None)
assert_in('stats', job.keys())
class TestHarvestObject(unittest.TestCase):
@classmethod

View File

@ -254,7 +254,6 @@ class TestHarvestQueue(object):
)
assert_equal(harvest_job['stats'], {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1})
context['detailed'] = True
harvest_source_dict = logic.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}

View File

@ -35,6 +35,8 @@ class MockHarvester(p.SingletonPlugin):
return [obj.id]
def fetch_stage(self, harvest_object):
if self._test_params.get('fetch_object_unchanged'):
return 'unchanged'
harvest_object.content = json.dumps({'name': harvest_object.guid})
harvest_object.save()
return True
@ -81,14 +83,13 @@ class MockHarvester(p.SingletonPlugin):
harvest_object.save()
if self._test_params.get('object_unchanged'):
if self._test_params.get('import_object_unchanged'):
return 'unchanged'
return True
class TestEndStates(object):
@classmethod
def setup_class(cls):
def setup(self):
reset_db()
harvest_model.setup()
@ -155,9 +156,22 @@ class TestEndStates(object):
assert_equal(result['report_status'], 'errored')
assert_equal(result['errors'], [])
def test_unchanged(self):
def test_fetch_unchanged(self):
guid = 'obj-error'
MockHarvester._set_test_params(guid=guid, object_unchanged=True)
MockHarvester._set_test_params(guid=guid, fetch_object_unchanged=True)
results_by_guid = run_harvest(
url='http://some-url.com',
harvester=MockHarvester())
result = results_by_guid[guid]
assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'not modified')
assert_equal(result['errors'], [])
def test_import_unchanged(self):
guid = 'obj-error'
MockHarvester._set_test_params(guid=guid, import_object_unchanged=True)
results_by_guid = run_harvest(
url='http://some-url.com',