Merge branch 'feature-1731-publisher-support'

This commit is contained in:
amercader 2012-03-08 17:40:01 +00:00
commit fd52ec9f7d
31 changed files with 1781 additions and 481 deletions

View File

@ -100,6 +100,34 @@ the config explicitly though::
paster harvester sources --config=../ckan/development.ini
Authorization Profiles
======================
Starting from CKAN 1.6.1, the harvester extension offers the ability to use
different authorization profiles. These can be defined in your ini file as::
ckan.harvest.auth.profile = <profile_name>
The two available profiles right now are:
* `default`: This is the default profile, the same one that this extension has
used historically. Basically, only sysadmins can manage anything related to
harvesting, including creating and editing harvest sources or running harvest
jobs.
* `publisher`: When using this profile, sysadmins can still perform any
harvesting related action, but in addition, users belonging to a publisher
(with role `admin`) can manage and run their own harvest sources and jobs.
Note that this requires CKAN core to also use the `publisher` authorization
profile, i.e. you will also need to add::
ckan.auth.profile = publisher
To know more about the CKAN publisher auth profile, visit
http://wiki.ckan.org/Working_with_the_publisher_auth_profile
The CKAN harvester
===================

View File

@ -2,8 +2,10 @@ import sys
import re
from pprint import pprint
from ckan import model
from ckan.logic import get_action, ValidationError
from ckan.lib.cli import CkanCommand
from ckanext.harvest.lib import *
from ckanext.harvest.queue import get_gather_consumer, get_fetch_consumer
class Harvester(CkanCommand):
@ -62,6 +64,13 @@ class Harvester(CkanCommand):
def command(self):
self._load_config()
# We'll need a sysadmin user to perform most of the actions
# We will use the sysadmin site user (named as the site_id)
context = {'model':model,'session':model.Session,'ignore_auth':True}
self.admin_user = get_action('get_site_user')(context,{})
print ''
if len(self.args) == 0:
@ -97,6 +106,9 @@ class Harvester(CkanCommand):
self.import_stage()
elif cmd == 'job-all':
self.create_harvest_job_all()
elif cmd == 'harvesters-info':
harvesters_info = get_action('harvesters_info_show')()
pprint(harvesters_info)
else:
print 'Command %s not recognized' % cmd
@ -139,48 +151,50 @@ class Harvester(CkanCommand):
else:
publisher_id = u''
try:
source = create_harvest_source({
data_dict = {
'url':url,
'type':type,
'config':config,
'active':active,
'user_id':user_id,
'publisher_id':publisher_id})
'publisher_id':publisher_id}
context = {'model':model, 'session':model.Session, 'user': self.admin_user['name']}
source = get_action('harvest_source_create')(context,data_dict)
print 'Created new harvest source:'
self.print_harvest_source(source)
sources = get_harvest_sources()
sources = get_action('harvest_source_list')(context,{})
self.print_there_are('harvest source', sources)
# Create a Harvest Job for the new Source
create_harvest_job(source['id'])
# Create a harvest job for the new source
get_action('harvest_job_create')(context,{'source_id':source['id']})
print 'A new Harvest Job for this source has also been created'
except ValidationError,e:
print 'An error occurred:'
print str(e.error_dict)
raise e
def remove_harvest_source(self):
if len(self.args) >= 2:
source_id = unicode(self.args[1])
else:
print 'Please provide a source id'
sys.exit(1)
remove_harvest_source(source_id)
context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
get_action('harvest_source_delete')(context,{'id':source_id})
print 'Removed harvest source: %s' % source_id
def list_harvest_sources(self):
if len(self.args) >= 2 and self.args[1] == 'all':
sources = get_harvest_sources()
data_dict = {}
what = 'harvest source'
else:
sources = get_harvest_sources(active=True)
data_dict = {'only_active':True}
what = 'active harvest source'
context = {'model': model,'session':model.Session, 'user': self.admin_user['name']}
sources = get_action('harvest_source_list')(context,data_dict)
self.print_harvest_sources(sources)
self.print_there_are(what=what, sequence=sources)
@ -194,21 +208,21 @@ class Harvester(CkanCommand):
job = create_harvest_job(source_id)
self.print_harvest_job(job)
status = u'New'
jobs = get_harvest_jobs(status=status)
context = {'model': model,'session':model.Session, 'user': self.admin_user['name']}
jobs = get_action('harvest_job_list')(context,{'status':u'New'})
self.print_there_are('harvest jobs', jobs, condition=status)
def list_harvest_jobs(self):
jobs = get_harvest_jobs()
context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
jobs = get_action('harvest_job_list')(context,{})
self.print_harvest_jobs(jobs)
self.print_there_are(what='harvest job', sequence=jobs)
def run_harvester(self):
try:
jobs = run_harvest_jobs()
except:
pass
sys.exit(0)
context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
jobs = get_action('harvest_jobs_run')(context,{})
#print 'Sent %s jobs to the gather queue' % len(jobs)
def import_stage(self):
@ -216,12 +230,15 @@ class Harvester(CkanCommand):
source_id = unicode(self.args[1])
else:
source_id = None
objs = import_last_objects(source_id)
context = {'model': model, 'session':model.Session, 'user': self.admin_user['name']}
objs = get_action('harvest_objects_import')(context,{'source_id':source_id})
print '%s objects reimported' % len(objs)
def create_harvest_job_all(self):
jobs = create_harvest_job_all()
print "Created %s new harvest jobs" % len(jobs)
context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
jobs = get_action('harvest_job_create_all')(context,{})
print 'Created %s new harvest jobs' % len(jobs)
def print_harvest_sources(self, sources):
if sources:
@ -236,7 +253,7 @@ class Harvester(CkanCommand):
print ' active: %s' % source['active']
print ' user: %s' % source['user_id']
print 'publisher: %s' % source['publisher_id']
print ' jobs: %s' % len(source['jobs'])
print ' jobs: %s' % source['status']['job_count']
print ''
def print_harvest_jobs(self, jobs):
@ -248,8 +265,7 @@ class Harvester(CkanCommand):
def print_harvest_job(self, job):
print ' Job id: %s' % job['id']
print ' status: %s' % job['status']
print ' source: %s' % job['source']['id']
print ' url: %s' % job['source']['url']
print ' source: %s' % job['source']
print ' objects: %s' % len(job['objects'])
print 'gather_errors: %s' % len(job['gather_errors'])

View File

@ -2,34 +2,64 @@ from lxml import etree
from lxml.etree import XMLSyntaxError
from pylons.i18n import _
from ckan.authz import Authorizer
from ckan import model
from ckan.model.group import Group
import ckan.lib.helpers as h, json
from ckan.lib.base import BaseController, c, g, request, \
response, session, render, config, abort, redirect
from ckan.lib.navl.dictization_functions import DataError
from ckan.logic import NotFound, ValidationError
from ckan.logic import NotFound, ValidationError, get_action, NotAuthorized
from ckanext.harvest.logic.schema import harvest_source_form_schema
from ckanext.harvest.lib import create_harvest_source, edit_harvest_source, \
get_harvest_source, get_harvest_sources, \
create_harvest_job, get_registered_harvesters_info, \
get_harvest_object
from ckan.lib.helpers import Page,pager_url
import logging
log = logging.getLogger(__name__)
class ViewController(BaseController):
def __before__(self, action, **env):
super(ViewController, self).__before__(action, **env)
# All calls to this controller must be with a sysadmin key
if not self.authorizer.is_sysadmin(c.user):
response_msg = _('Not authorized to see this page')
status = 401
abort(status, response_msg)
not_auth_message = _('Not authorized to see this page')
def __before__(self, action, **params):
    '''Run the base controller hook, then record the harvest auth profile.

    Sets ``c.publisher_auth`` to True only when the
    ``ckan.harvest.auth.profile`` config option equals ``'publisher'``.
    '''
    super(ViewController, self).__before__(action, **params)

    auth_profile = config.get('ckan.harvest.auth.profile', None)
    c.publisher_auth = (auth_profile == 'publisher')
def _get_publishers(self):
    '''Return the publishers the current user may pick from, or None.

    Returns None when the publisher auth profile is not in use. Otherwise
    returns a list of dicts with the keys ``name``, ``id`` and ``title``:
    every publisher group for sysadmins, the user's own publisher groups
    for logged-in users, and an empty list for anonymous users.
    '''
    if not c.publisher_auth:
        return None

    if Authorizer().is_sysadmin(c.user):
        publishers = Group.all(group_type='publisher')
    elif c.userobj:
        publishers = c.userobj.get_groups('publisher')
    else:
        # Anonymous users shouldn't have access to this page anyway.
        publishers = []

    # Be explicit about which fields are made available to the template
    summaries = []
    for publisher in publishers:
        summaries.append({
            'name': publisher.name,
            'id': publisher.id,
            'title': publisher.title,
        })
    return summaries
def index(self):
context = {'model':model, 'user':c.user,'session':model.Session}
try:
# Request all harvest sources
c.sources = get_harvest_sources()
c.sources = get_action('harvest_source_list')(context,{})
except NotAuthorized,e:
abort(401,self.not_auth_message)
if c.publisher_auth:
c.sources = sorted(c.sources,key=lambda source : source['publisher_title'])
return render('index.html')
@ -41,8 +71,16 @@ class ViewController(BaseController):
data = data or {}
errors = errors or {}
error_summary = error_summary or {}
vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': get_registered_harvesters_info()}
try:
context = {'model':model, 'user':c.user}
harvesters_info = get_action('harvesters_info_show')(context,{})
except NotAuthorized,e:
abort(401,self.not_auth_message)
vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': harvesters_info}
c.groups = self._get_publishers()
c.form = render('source/new_source_form.html', extra_vars=vars)
return render('source/new.html')
@ -50,15 +88,19 @@ class ViewController(BaseController):
try:
data_dict = dict(request.params)
self._check_data_dict(data_dict)
context = {'model':model, 'user':c.user, 'session':model.Session,
'schema':harvest_source_form_schema()}
source = create_harvest_source(data_dict)
source = get_action('harvest_source_create')(context,data_dict)
# Create a harvest job for the new source
create_harvest_job(source['id'])
get_action('harvest_job_create')(context,{'source_id':source['id']})
h.flash_success(_('New harvest source added successfully.'
'A new harvest job for the source has also been created.'))
redirect(h.url_for('harvest'))
redirect('/harvest/%s' % source['id'])
except NotAuthorized,e:
abort(401,self.not_auth_message)
except DataError,e:
abort(400, 'Integrity Error')
except ValidationError,e:
@ -71,30 +113,46 @@ class ViewController(BaseController):
if ('save' in request.params) and not data:
return self._save_edit(id)
if not data:
try:
old_data = get_harvest_source(id)
context = {'model':model, 'user':c.user}
old_data = get_action('harvest_source_show')(context, {'id':id})
except NotFound:
abort(404, _('Harvest Source not found'))
except NotAuthorized,e:
abort(401,self.not_auth_message)
data = data or old_data
errors = errors or {}
error_summary = error_summary or {}
try:
context = {'model':model, 'user':c.user}
harvesters_info = get_action('harvesters_info_show')(context,{})
except NotAuthorized,e:
abort(401,self.not_auth_message)
vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': get_registered_harvesters_info()}
vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': harvesters_info}
c.groups = self._get_publishers()
c.form = render('source/new_source_form.html', extra_vars=vars)
return render('source/edit.html')
def _save_edit(self,id):
try:
data_dict = dict(request.params)
data_dict['id'] = id
self._check_data_dict(data_dict)
context = {'model':model, 'user':c.user, 'session':model.Session,
'schema':harvest_source_form_schema()}
source = edit_harvest_source(id,data_dict)
source = get_action('harvest_source_update')(context,data_dict)
h.flash_success(_('Harvest source edited successfully.'))
redirect(h.url_for('harvest'))
redirect('/harvest/%s' %id)
except NotAuthorized,e:
abort(401,self.not_auth_message)
except DataError,e:
abort(400, _('Integrity Error'))
except NotFound, e:
@ -106,18 +164,23 @@ class ViewController(BaseController):
def _check_data_dict(self, data_dict):
'''Check if the return data is correct'''
surplus_keys_schema = ['id','publisher_id','user_id','active','save','config']
surplus_keys_schema = ['id','publisher_id','user_id','config','save']
schema_keys = harvest_source_form_schema().keys()
keys_in_schema = set(schema_keys) - set(surplus_keys_schema)
# user_id is not yet used, we'll set the logged user one for the time being
if not data_dict.get('user_id',None):
if c.userobj:
data_dict['user_id'] = c.userobj.id
if keys_in_schema - set(data_dict.keys()):
log.info(_('Incorrect form fields posted'))
raise DataError(data_dict)
def read(self,id):
try:
c.source = get_harvest_source(id)
context = {'model':model, 'user':c.user}
c.source = get_action('harvest_source_show')(context, {'id':id})
c.page = Page(
collection=c.source['status']['packages'],
page=request.params.get('page', 1),
@ -128,24 +191,33 @@ class ViewController(BaseController):
return render('source/read.html')
except NotFound:
abort(404,_('Harvest source not found'))
except NotAuthorized,e:
abort(401,self.not_auth_message)
def delete(self,id):
try:
delete_harvest_source(id)
context = {'model':model, 'user':c.user}
get_action('harvest_source_delete')(context, {'id':id})
h.flash_success(_('Harvesting source deleted successfully'))
h.flash_success(_('Harvesting source successfully inactivated'))
redirect(h.url_for('harvest'))
except NotFound:
abort(404,_('Harvest source not found'))
except NotAuthorized,e:
abort(401,self.not_auth_message)
def create_harvesting_job(self,id):
try:
create_harvest_job(id)
context = {'model':model, 'user':c.user, 'session':model.Session}
get_action('harvest_job_create')(context,{'source_id':id})
h.flash_success(_('Refresh requested, harvesting will take place within 15 minutes.'))
except NotFound:
abort(404,_('Harvest source not found'))
except NotAuthorized,e:
abort(401,self.not_auth_message)
except Exception, e:
msg = 'An error occurred: [%s]' % e.message
h.flash_error(msg)
@ -153,23 +225,28 @@ class ViewController(BaseController):
redirect(h.url_for('harvest'))
def show_object(self,id):
try:
object = get_harvest_object(id)
context = {'model':model, 'user':c.user}
obj = get_action('harvest_object_show')(context, {'id':id})
# Check content type. It will probably be either XML or JSON
try:
etree.fromstring(object['content'])
etree.fromstring(obj['content'])
response.content_type = 'application/xml'
except XMLSyntaxError:
try:
json.loads(object['content'])
json.loads(obj['content'])
response.content_type = 'application/json'
except ValueError:
pass
response.headers["Content-Length"] = len(object['content'])
return object['content']
response.headers['Content-Length'] = len(obj['content'])
return obj['content']
except NotFound:
abort(404,_('Harvest object not found'))
except NotAuthorized,e:
abort(401,self.not_auth_message)
except Exception, e:
msg = 'An error occurred: [%s]' % e.message
h.flash_error(msg)

View File

@ -1,379 +0,0 @@
import urlparse
import re
from sqlalchemy import distinct,func
from ckan.model import Session, repo
from ckan.model import Package
from ckan.lib.navl.dictization_functions import validate
from ckan.logic import NotFound, ValidationError
from ckanext.harvest.logic.schema import harvest_source_form_schema
from ckan.plugins import PluginImplementations
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject, \
HarvestGatherError, HarvestObjectError
from ckanext.harvest.queue import get_gather_publisher
from ckanext.harvest.interfaces import IHarvester
import logging
log = logging.getLogger('ckanext')
# Build a status summary dict for a harvest source.
#
# source:   the HarvestSource whose jobs, objects and errors are inspected.
# detailed: when true, per-package and per-error details are collected in
#           addition to the counters (NOTE(review): the exact extent of each
#           `if detailed:` branch cannot be read from this de-indented view).
#
# Returns {'msg': 'No jobs yet'} when the source has no jobs at all, otherwise
# a dict with the keys 'next_harvest', 'last_harvest_request',
# 'last_harvest_statistics', 'last_harvest_errors', 'overall_statistics'
# and 'packages'.
def _get_source_status(source, detailed=True):
out = dict()
job_count = HarvestJob.filter(source=source).count()
# A source without any job only gets a short message
if not job_count:
out['msg'] = 'No jobs yet'
return out
# Default values, overwritten below as information becomes available
out = {'next_harvest':'',
'last_harvest_request':'',
'last_harvest_statistics':{'added':0,'updated':0,'errors':0},
'last_harvest_errors':{'gather':[],'object':[]},
'overall_statistics':{'added':0, 'errors':0},
'packages':[]}
# Get next scheduled job (a job still in the 'New' status)
next_job = HarvestJob.filter(source=source,status=u'New').first()
if next_job:
out['next_harvest'] = 'Scheduled'
else:
out['next_harvest'] = 'Not yet scheduled'
# Get the last finished job (most recently created one with status 'Finished')
last_job = HarvestJob.filter(source=source,status=u'Finished') \
.order_by(HarvestJob.created.desc()).first()
if last_job:
#TODO: Should we encode the dates as strings?
out['last_harvest_request'] = str(last_job.gather_finished)
# Get the HarvestObjects from the last job that are linked to packages
if detailed:
last_objects = [obj for obj in last_job.objects if obj.package is not None]
if len(last_objects) == 0:
# No packages added or updated
out['last_harvest_statistics']['added'] = 0
out['last_harvest_statistics']['updated'] = 0
else:
# Check whether packages were added or updated
for last_object in last_objects:
# Check if the same package had been linked before
previous_objects = Session.query(HarvestObject) \
.filter(HarvestObject.package==last_object.package) \
.count()
if previous_objects == 1:
# It didn't previously exist, it has been added
out['last_harvest_statistics']['added'] += 1
else:
# Package already existed, but it has been updated
out['last_harvest_statistics']['updated'] += 1
# Last harvest errors
# We have the gathering errors in last_job.gather_errors, so let's
# also get the object errors.
object_errors = Session.query(HarvestObjectError).join(HarvestObject) \
.filter(HarvestObject.job==last_job)
out['last_harvest_statistics']['errors'] = len(last_job.gather_errors) \
+ object_errors.count()
if detailed:
for gather_error in last_job.gather_errors:
out['last_harvest_errors']['gather'].append(gather_error.message)
for object_error in object_errors:
err = {'object_id':object_error.object.id,'object_guid':object_error.object.guid,'message': object_error.message}
out['last_harvest_errors']['object'].append(err)
# Overall statistics: distinct active packages currently linked to the source
packages = Session.query(distinct(HarvestObject.package_id),Package.name) \
.join(Package).join(HarvestSource) \
.filter(HarvestObject.source==source) \
.filter(HarvestObject.current==True) \
.filter(Package.state==u'active')
out['overall_statistics']['added'] = packages.count()
if detailed:
for package in packages:
out['packages'].append(package.name)
# Error totals across all the jobs of this source (gather + object errors)
gather_errors = Session.query(HarvestGatherError) \
.join(HarvestJob).join(HarvestSource) \
.filter(HarvestJob.source==source).count()
object_errors = Session.query(HarvestObjectError) \
.join(HarvestObject).join(HarvestJob).join(HarvestSource) \
.filter(HarvestJob.source==source).count()
out['overall_statistics']['errors'] = gather_errors + object_errors
else:
out['last_harvest_request'] = 'Not yet harvested'
return out
def _source_as_dict(source, detailed=True):
    '''Return a dict for `source` including its jobs and a status summary.

    The `detailed` flag is passed through to `_get_source_status`.
    '''
    source_dict = source.as_dict()
    source_dict['jobs'] = [job.as_dict() for job in source.jobs]
    source_dict['status'] = _get_source_status(source, detailed=detailed)
    return source_dict
def _job_as_dict(job):
out = job.as_dict()
out['source'] = job.source.as_dict()
out['objects'] = []
out['gather_errors'] = []
for obj in job.objects:
out['objects'].append(obj.as_dict())
for error in job.gather_errors:
out['gather_errors'].append(error.as_dict())
return out
def _object_as_dict(obj):
out = obj.as_dict()
out['source'] = obj.source.as_dict()
out['job'] = obj.job.as_dict()
if obj.package:
out['package'] = obj.package.as_dict()
out['errors'] = []
for error in obj.errors:
out['errors'].append(error.as_dict())
return out
def _prettify(field_name):
field_name = re.sub('(?<!\w)[Uu]rl(?!\w)', 'URL', field_name.replace('_', ' ').capitalize())
return field_name.replace('_', ' ')
def _error_summary(error_dict):
    '''Map each prettified field name to the first error reported for it.'''
    return dict((_prettify(field), messages[0])
                for field, messages in error_dict.iteritems())
def get_harvest_source(id,attr=None):
    '''Return the harvest source found by `HarvestSource.get` as a dict.

    Raises NotFound when no source is found.
    '''
    found = HarvestSource.get(id, attr=attr)
    if found:
        return _source_as_dict(found)
    raise NotFound
def get_harvest_sources(**kwds):
    '''Return the harvest sources matching `kwds`, newest first.

    Each source is converted with the non-detailed status summary.
    '''
    query = HarvestSource.filter(**kwds)
    newest_first = query.order_by(HarvestSource.created.desc())
    return [_source_as_dict(harvest_source, detailed=False)
            for harvest_source in newest_first.all()]
def create_harvest_source(data_dict):
    '''Validate `data_dict` and store a new harvest source.

    Validation uses the harvest source form schema. On validation errors
    the session is rolled back and ValidationError is raised. Returns the
    new source as a dict.
    '''
    validated, problems = validate(data_dict, harvest_source_form_schema())
    if problems:
        Session.rollback()
        raise ValidationError(problems, _error_summary(problems))

    new_source = HarvestSource()
    new_source.url = validated['url']
    new_source.type = validated['type']

    # Optional fields are only copied when validation produced a value
    optional_fields = ('active', 'title', 'description', 'user_id',
                       'publisher_id', 'config')
    for field in optional_fields:
        if validated.get(field) is not None:
            setattr(new_source, field, validated[field])

    if 'active' in data_dict:
        new_source.active = validated['active']

    new_source.save()
    return _source_as_dict(new_source)
def edit_harvest_source(source_id,data_dict):
    '''Validate `data_dict` and apply it to an existing harvest source.

    Raises NotFound when the source does not exist and ValidationError
    (after rolling back the session) when validation fails. If the source
    ends up inactive, its jobs with status 'New' are marked 'Aborted'.
    Returns the updated source as a dict.
    '''
    schema = harvest_source_form_schema()

    source = HarvestSource.get(source_id)
    if not source:
        raise NotFound('Harvest source %s does not exist' % source_id)

    # Some validators need the source id, so add it to the incoming dict
    data_dict['id'] = source.id

    validated, problems = validate(data_dict, schema)
    if problems:
        Session.rollback()
        raise ValidationError(problems, _error_summary(problems))

    for field in ('url', 'title', 'type', 'description', 'user_id',
                  'publisher_id'):
        if validated.get(field) is not None:
            setattr(source, field, validated[field])

    if 'active' in data_dict:
        source.active = validated['active']
    if 'config' in data_dict:
        source.config = validated['config']

    source.save()

    # Abort any pending jobs
    if not source.active:
        pending_jobs = HarvestJob.filter(source=source, status=u'New')
        for pending_job in (pending_jobs or ()):
            pending_job.status = u'Aborted'
            pending_job.save()

    return _source_as_dict(source)
def remove_harvest_source(source_id):
    '''Flag a harvest source as inactive and abort its pending jobs.

    The record is not deleted. Raises NotFound when the source does not
    exist; returns True otherwise.
    '''
    source = HarvestSource.get(source_id)
    if not source:
        raise NotFound('Harvest source %s does not exist' % source_id)

    # Don't actually delete the record, just flag it as inactive
    source.active = False
    source.save()

    # Abort any pending jobs
    pending_jobs = HarvestJob.filter(source=source, status=u'New')
    for pending_job in (pending_jobs or ()):
        pending_job.status = u'Aborted'
        pending_job.save()

    return True
def get_harvest_job(id,attr=None):
    '''Return the harvest job found by `HarvestJob.get` as a dict.

    Raises NotFound when no job is found.
    '''
    found = HarvestJob.get(id, attr=attr)
    if found:
        return _job_as_dict(found)
    raise NotFound
def get_harvest_jobs(**kwds):
    '''Return the harvest jobs matching `kwds` as a list of dicts.'''
    matching_jobs = HarvestJob.filter(**kwds).all()
    return list(map(_job_as_dict, matching_jobs))
def create_harvest_job(source_id):
    '''Create a harvest job for the given source and return it as a dict.

    Raises NotFound when the source does not exist, and a plain Exception
    when the source is inactive or already has a job with status 'New'.
    '''
    source = HarvestSource.get(source_id)
    if not source:
        raise NotFound('Harvest source %s does not exist' % source_id)

    if not source.active:
        raise Exception('Can not create jobs on inactive sources')

    # Only one unrun job is allowed per source
    unrun_jobs = get_harvest_jobs(source=source, status=u'New')
    if len(unrun_jobs):
        raise Exception('There already is an unrun job for this source')

    new_job = HarvestJob()
    new_job.source = source
    new_job.save()

    return _job_as_dict(new_job)
def run_harvest_jobs():
    '''Send every job with status 'New' to the gather queue.

    Jobs whose source is not active are skipped. Raises a plain Exception
    when there are no new jobs. Returns the list of job dicts that were
    sent.
    '''
    # Check if there are pending harvest jobs
    jobs = get_harvest_jobs(status=u'New')
    if len(jobs) == 0:
        raise Exception('There are no new harvesting jobs')

    # Send each job to the gather queue
    publisher = get_gather_publisher()
    sent_jobs = []
    try:
        for job in jobs:
            if job['source']['active']:
                publisher.send({'harvest_job_id': job['id']})
                log.info('Sent job %s to the gather queue' % job['id'])
                sent_jobs.append(job)
    finally:
        # Close the publisher even if sending one of the jobs fails
        publisher.close()

    return sent_jobs
def get_harvest_object(id,attr=None):
    '''Return the harvest object found by `HarvestObject.get` as a dict.

    Raises NotFound when no object is found.
    '''
    found = HarvestObject.get(id, attr=attr)
    if found:
        return _object_as_dict(found)
    raise NotFound
def get_harvest_objects(**kwds):
    '''Return the harvest objects matching `kwds` as a list of dicts.'''
    matching_objects = HarvestObject.filter(**kwds).all()
    return list(map(_object_as_dict, matching_objects))
def import_last_objects(source_id=None):
    '''Re-run the import stage on the current harvest objects.

    Only objects flagged as current and linked to an active package are
    considered, optionally restricted to one source. Raises NotFound when
    `source_id` is given but the source does not exist, and a plain
    Exception when that source is not active. Returns the list of
    HarvestObject instances that were processed.
    '''
    id_query = Session.query(HarvestObject.id)
    if source_id:
        source = HarvestSource.get(source_id)
        if not source:
            raise NotFound('Harvest source %s does not exist' % source_id)
        if not source.active:
            raise Exception('This harvest source is not active')
        id_query = id_query.join(HarvestSource).join(Package) \
                           .filter(HarvestObject.source==source)
    else:
        id_query = id_query.join(Package)

    last_objects_ids = id_query.filter(HarvestObject.current==True) \
                               .filter(Package.state==u'active') \
                               .all()

    reimported = []
    for obj_id in last_objects_ids:
        harvest_object = Session.query(HarvestObject).get(obj_id)
        # Hand the object to the first harvester matching its source type
        for harvester in PluginImplementations(IHarvester):
            if harvester.info()['name'] != harvest_object.source.type:
                continue
            if hasattr(harvester, 'force_import'):
                harvester.force_import = True
            harvester.import_stage(harvest_object)
            break
        reimported.append(harvest_object)
    return reimported
def create_harvest_job_all():
    '''Create a harvest job for every active source.

    Returns the list of created job dicts. Any exception raised by
    `create_harvest_job` propagates to the caller.
    '''
    active_sources = get_harvest_sources(active=True)
    return [create_harvest_job(active_source['id'])
            for active_source in active_sources]
def get_registered_harvesters_info():
    '''Return the info dicts of all the registered IHarvester plugins.

    Harvesters whose info is empty or lacks a 'name' are logged as errors
    and left out. Each returned dict gets a 'show_config' flag, true when
    its 'form_config_interface' value is 'Text'.
    '''
    harvesters_info = []
    for harvester in PluginImplementations(IHarvester):
        info = harvester.info()
        if info and 'name' in info:
            info['show_config'] = (info.get('form_config_interface','') == 'Text')
            harvesters_info.append(info)
        else:
            log.error('Harvester %r does not provide the harvester name in the info response' % str(harvester))
    return harvesters_info

View File

@ -0,0 +1,7 @@
# Declare this package as a namespace package.
# pkg_resources (setuptools) is tried first; if it cannot be imported,
# fall back to the standard library's pkgutil.
try:
import pkg_resources
pkg_resources.declare_namespace(__name__)
except ImportError:
import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__)

View File

@ -0,0 +1,106 @@
import re
from ckan.logic import NotFound, ValidationError, check_access
from ckan.lib.navl.dictization_functions import validate
from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject)
from ckanext.harvest.logic.schema import default_harvest_source_schema
from ckanext.harvest.logic.dictization import (harvest_source_dictize,
harvest_job_dictize)
from ckanext.harvest.logic.action.get import harvest_source_list,harvest_job_list
def harvest_source_create(context,data_dict):
    '''Validate `data_dict` and store a new harvest source.

    Access is checked first with `check_access`. The schema comes from
    `context['schema']` when present, otherwise the default harvest source
    schema is used. On validation errors the session is rolled back and
    ValidationError is raised. Returns the dictized source.
    '''
    check_access('harvest_source_create', context, data_dict)

    model = context['model']
    session = context['session']
    schema = context.get('schema') or default_harvest_source_schema()

    validated, problems = validate(data_dict, schema)
    if problems:
        session.rollback()
        raise ValidationError(problems, _error_summary(problems))

    new_source = HarvestSource()
    new_source.url = validated['url']
    new_source.type = validated['type']

    # Optional fields are only copied when validation produced a value
    optional_fields = ('active', 'title', 'description', 'user_id',
                       'publisher_id', 'config')
    for field in optional_fields:
        if validated.get(field) is not None:
            setattr(new_source, field, validated[field])

    if 'active' in data_dict:
        new_source.active = validated['active']

    new_source.save()
    return harvest_source_dictize(new_source, context)
def harvest_job_create(context,data_dict):
    '''Create a harvest job for the source given in `data_dict['source_id']`.

    Access is checked first with `check_access`. Raises NotFound when the
    source does not exist, and a plain Exception when the source is
    inactive or already has a job with status 'New'. Returns the dictized
    job.
    '''
    check_access('harvest_job_create', context, data_dict)

    source_id = data_dict['source_id']

    source = HarvestSource.get(source_id)
    if not source:
        raise NotFound('Harvest source %s does not exist' % source_id)

    if not source.active:
        raise Exception('Can not create jobs on inactive sources')

    # Only one unrun job is allowed per source
    unrun_jobs = harvest_job_list(context, {'source_id': source_id,
                                            'status': u'New'})
    if len(unrun_jobs):
        raise Exception('There already is an unrun job for this source')

    new_job = HarvestJob()
    new_job.source = source
    new_job.save()

    return harvest_job_dictize(new_job, context)
def harvest_job_create_all(context,data_dict):
    '''Create a harvest job for every active source that has no unrun job.

    Access is checked first with `check_access`. Sources that already have
    a job with status 'New' are skipped. Returns the list of dictized jobs
    that were created.

    The caller's `data_dict` is left untouched: the `only_active` filter
    is applied to a copy.
    '''
    check_access('harvest_job_create_all', context, data_dict)

    # Get all active sources (work on a copy, don't mutate the caller's dict)
    source_filter = dict(data_dict or {}, only_active=True)
    sources = harvest_source_list(context, source_filter)

    # Create a new job for each, if there isn't already one
    jobs = []
    for source in sources:
        unrun_jobs = harvest_job_list(context, {'source_id': source['id'],
                                                'status': u'New'})
        if unrun_jobs:
            continue
        jobs.append(harvest_job_create(context, {'source_id': source['id']}))

    return jobs
def _error_summary(error_dict):
    '''Map each prettified field name to the first error reported for it.'''
    return dict((_prettify(field), messages[0])
                for field, messages in error_dict.iteritems())
def _prettify(field_name):
field_name = re.sub('(?<!\w)[Uu]rl(?!\w)', 'URL', field_name.replace('_', ' ').capitalize())
return field_name.replace('_', ' ')

View File

@ -0,0 +1,26 @@
from ckan.logic import NotFound, check_access
from ckanext.harvest.model import (HarvestSource, HarvestJob)
def harvest_source_delete(context,data_dict):
    '''Flag the harvest source `data_dict['id']` as inactive.

    Access is checked first with `check_access`. The record is not
    deleted; its jobs with status 'New' are marked 'Aborted'. Raises
    NotFound when the source does not exist; returns True otherwise.
    '''
    check_access('harvest_source_delete', context, data_dict)

    source_id = data_dict.get('id')
    source = HarvestSource.get(source_id)
    if not source:
        raise NotFound('Harvest source %s does not exist' % source_id)

    # Don't actually delete the record, just flag it as inactive
    source.active = False
    source.save()

    # Abort any pending jobs
    pending_jobs = HarvestJob.filter(source=source, status=u'New')
    for pending_job in (pending_jobs or ()):
        pending_job.status = u'Aborted'
        pending_job.save()

    return True

View File

@ -0,0 +1,162 @@
from sqlalchemy import or_
from ckan.authz import Authorizer
from ckan.model import User
from ckan.plugins import PluginImplementations
from ckanext.harvest.interfaces import IHarvester
from ckan.logic import NotFound, check_access
from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject)
from ckanext.harvest.logic.dictization import (harvest_source_dictize,
harvest_job_dictize,
harvest_object_dictize)
def harvest_source_show(context,data_dict):
    '''Return the dictized harvest source identified by `data_dict['id']`.

    An optional `data_dict['attr']` is passed on to `HarvestSource.get`.
    Access is checked first; raises NotFound when no source is found.
    '''
    check_access('harvest_source_show', context, data_dict)

    source = HarvestSource.get(data_dict.get('id'),
                               attr=data_dict.get('attr', None))
    if source:
        return harvest_source_dictize(source, context)
    raise NotFound
def harvest_source_list(context, data_dict):
    '''Return the dictized harvest sources available to the current user.

    Access is checked first. The sources come from `_get_sources_for_user`
    and `context['detailed']` is set to False before they are dictized.
    '''
    check_access('harvest_source_list', context, data_dict)

    # These lookups are unused below; they are kept so that a context
    # missing 'model' or 'session' fails here as before.
    model = context['model']
    session = context['session']
    user = context.get('user', '')

    visible_sources = _get_sources_for_user(context, data_dict)

    context['detailed'] = False
    return [harvest_source_dictize(visible_source, context)
            for visible_source in visible_sources]
def harvest_job_show(context,data_dict):
    '''Return the dictized harvest job identified by `data_dict['id']`.

    An optional `data_dict['attr']` is passed on to `HarvestJob.get`.
    Access is checked first; raises NotFound when no job is found.
    '''
    check_access('harvest_job_show', context, data_dict)

    job = HarvestJob.get(data_dict.get('id'),
                         attr=data_dict.get('attr', None))
    if job:
        return harvest_job_dictize(job, context)
    raise NotFound
def harvest_job_list(context,data_dict):
    '''Return the dictized harvest jobs, optionally filtered.

    `data_dict` may carry `source_id` and/or `status`; each one that is
    truthy narrows the query. Access is checked first.
    '''
    check_access('harvest_job_list', context, data_dict)

    model = context['model']
    session = context['session']

    job_query = session.query(HarvestJob)

    source_id = data_dict.get('source_id', False)
    if source_id:
        job_query = job_query.filter(HarvestJob.source_id==source_id)

    status = data_dict.get('status', False)
    if status:
        job_query = job_query.filter(HarvestJob.status==status)

    return [harvest_job_dictize(found_job, context)
            for found_job in job_query.all()]
def harvest_object_show(context,data_dict):
    '''Return the dictized harvest object identified by `data_dict['id']`.

    An optional `data_dict['attr']` is passed on to `HarvestObject.get`.
    Access is checked first; raises NotFound when no object is found.
    '''
    check_access('harvest_object_show', context, data_dict)

    harvest_object = HarvestObject.get(data_dict.get('id'),
                                       attr=data_dict.get('attr', None))
    if harvest_object:
        return harvest_object_dictize(harvest_object, context)
    raise NotFound
def harvest_object_list(context,data_dict):
    '''Return the ids of the harvest objects, optionally filtered.

    `data_dict['source_id']` restricts the result to one source and
    `data_dict['only_current']` (default True) to objects flagged as
    current. Access is checked first. Note that only the ids are
    returned, not dictized objects.
    '''
    check_access('harvest_object_list', context, data_dict)

    model = context['model']
    session = context['session']

    only_current = data_dict.get('only_current', True)
    source_id = data_dict.get('source_id', False)

    object_query = session.query(HarvestObject)
    if source_id:
        object_query = object_query.filter(HarvestObject.source_id==source_id)
    if only_current:
        object_query = object_query.filter(HarvestObject.current==True)

    return [harvest_object.id for harvest_object in object_query.all()]
def harvesters_info_show(context,data_dict):
    '''Return the info dicts of all the registered IHarvester plugins.

    Access is checked first with `check_access`. Harvesters whose info is
    empty or lacks a 'name' are logged as errors and left out. Each
    returned dict gets a 'show_config' flag, true when its
    'form_config_interface' value is 'Text'.
    '''
    # No logger is set up among this module's imports, so get one locally;
    # otherwise the error branch below would fail with a NameError.
    import logging
    log = logging.getLogger(__name__)

    check_access('harvesters_info_show', context, data_dict)

    available_harvesters = []
    for harvester in PluginImplementations(IHarvester):
        info = harvester.info()
        if not info or 'name' not in info:
            log.error('Harvester %r does not provide the harvester name in the info response' % str(harvester))
            continue
        info['show_config'] = (info.get('form_config_interface','') == 'Text')
        available_harvesters.append(info)

    return available_harvesters
def _get_sources_for_user(context,data_dict):
    '''Return the HarvestSource objects the user in `context` may see.

    Sources are ordered newest first and limited to active ones when
    `data_dict['only_active']` is true. Sysadmins get every source. Other
    users only get the sources of the publishers they belong to with the
    `admin` role; users that cannot be looked up, or that belong to no
    publisher, get an empty list.
    '''
    session = context['session']
    user = context.get('user', '')

    only_active = data_dict.get('only_active', False)

    query = session.query(HarvestSource) \
                   .order_by(HarvestSource.created.desc())

    if only_active:
        query = query.filter(HarvestSource.active==True)

    # Sysadmins will get all sources
    if Authorizer().is_sysadmin(user):
        return query.all()

    # This only applies to a non sysadmin user when using the
    # publisher auth profile. When using the default profile,
    # normal users will never arrive at this point, but even if they
    # do, they will get an empty list.
    user_obj = User.get(user)
    if not user_obj:
        # No user record found (e.g. anonymous request): no sources
        return []

    publisher_filters = [HarvestSource.publisher_id==group.id
                         for group in user_obj.get_groups(u'publisher', u'admin')]
    if not publisher_filters:
        # This user does not belong to a publisher yet, no sources for him/her
        return []

    return query.filter(or_(*publisher_filters)).all()

View File

@ -0,0 +1,136 @@
import logging
from ckan.plugins import PluginImplementations
from ckanext.harvest.interfaces import IHarvester
from ckan.model import Package
from ckan.logic import NotFound, ValidationError, check_access
from ckan.lib.navl.dictization_functions import validate
from ckanext.harvest.queue import get_gather_publisher
from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject)
from ckanext.harvest.logic.schema import default_harvest_source_schema
from ckanext.harvest.logic.dictization import (harvest_source_dictize,harvest_object_dictize)
from ckanext.harvest.logic.action.create import _error_summary
from ckanext.harvest.logic.action.get import harvest_source_show,harvest_job_list
log = logging.getLogger(__name__)
def harvest_source_update(context, data_dict):
    '''
    Updates an existing harvest source and returns it dictized.

    `data_dict['id']` identifies the source; the remaining keys are validated
    against `context['schema']` (or the default harvest source schema).
    If the source ends up inactive, its jobs with status 'New' are marked as
    'Aborted'.

    Raises NotFound if the source does not exist and ValidationError if the
    data does not validate (the session is rolled back in that case).
    '''
    check_access('harvest_source_update', context, data_dict)

    session = context['session']

    source_id = data_dict.get('id')
    schema = context.get('schema') or default_harvest_source_schema()

    source = HarvestSource.get(source_id)
    if not source:
        raise NotFound('Harvest source %s does not exist' % source_id)

    data, errors = validate(data_dict, schema)

    if errors:
        session.rollback()
        raise ValidationError(errors, _error_summary(errors))

    fields = ['url', 'title', 'type', 'description', 'user_id', 'publisher_id']
    for f in fields:
        if f in data and data[f] is not None:
            setattr(source, f, data[f])

    # 'active' and 'config' are only touched when the caller sent them, but
    # the value is read from the validated data. Validation may have dropped
    # the key (presumably what `ignore_missing` does for empty values -
    # TODO confirm), so check both dicts to avoid a KeyError.
    if 'active' in data_dict and 'active' in data:
        source.active = data['active']

    if 'config' in data_dict and 'config' in data:
        source.config = data['config']

    source.save()

    # Abort any pending jobs
    if not source.active:
        for job in HarvestJob.filter(source=source, status=u'New'):
            job.status = u'Aborted'
            job.save()

    return harvest_source_dictize(source, context)
def harvest_objects_import(context, data_dict):
    '''
    Reimports the current harvest objects

    It performs the import stage with the last fetched objects, optionally
    belonging to a certain source (`data_dict['source_id']`).

    Please note that no objects will be fetched from the remote server.
    It will only affect the last fetched objects already present in the
    database.

    Returns the list of dictized harvest objects that were processed.
    Raises NotFound if the given source does not exist and a plain Exception
    if it is not active.
    '''
    check_access('harvest_objects_import', context, data_dict)

    model = context['model']
    session = context['session']
    source_id = data_dict.get('source_id', None)

    if source_id:
        source = HarvestSource.get(source_id)
        if not source:
            raise NotFound('Harvest source %s does not exist' % source_id)
        if not source.active:
            raise Exception('This harvest source is not active')

        base_query = session.query(HarvestObject.id) \
                            .join(HarvestSource).join(Package) \
                            .filter(HarvestObject.source == source)
    else:
        base_query = session.query(HarvestObject.id).join(Package)

    # In both cases only current objects linked to active packages are used
    current_ids = base_query.filter(HarvestObject.current == True) \
                            .filter(Package.state == u'active') \
                            .all()

    reimported = []
    for object_id in current_ids:
        harvest_object = session.query(HarvestObject).get(object_id)

        # Hand the object to the first harvester handling its source type
        for plugin in PluginImplementations(IHarvester):
            if plugin.info()['name'] != harvest_object.source.type:
                continue
            if hasattr(plugin, 'force_import'):
                plugin.force_import = True
            plugin.import_stage(harvest_object)
            break

        reimported.append(harvest_object_dictize(harvest_object, context))

    return reimported
def harvest_jobs_run(context, data_dict):
    '''
    Sends the pending ('New') harvest jobs to the gather queue.

    `data_dict` may contain `source_id` to restrict the run to the jobs of
    one source. Only jobs whose source is active are sent.

    Returns the list of job dicts that were sent. Raises a plain Exception
    if there are no pending jobs.
    '''
    check_access('harvest_jobs_run', context, data_dict)

    source_id = data_dict.get('source_id', None)

    # Check if there are pending harvest jobs
    jobs = harvest_job_list(context, {'source_id': source_id, 'status': u'New'})
    if len(jobs) == 0:
        raise Exception('There are no new harvesting jobs')

    # Send each job to the gather queue
    publisher = get_gather_publisher()
    sent_jobs = []
    try:
        for job in jobs:
            source = harvest_source_show(context, {'id': job['source']})
            if source['active']:
                publisher.send({'harvest_job_id': job['id']})
                log.info('Sent job %s to the gather queue' % job['id'])
                sent_jobs.append(job)
    finally:
        # Release the queue connection even if reading a source or sending
        # a job fails half way through
        publisher.close()

    return sent_jobs

View File

@ -0,0 +1,27 @@
from ckan.logic import NotFound
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
def get_source_object(context, data_dict=None):
    '''
    Returns the harvest source an auth function should check against.

    If the context already carries a `source` it is returned as is.
    Otherwise the source is looked up by `data_dict['id']`.

    Raises NotFound if the lookup does not return a source.
    '''
    if 'source' in context:
        return context['source']

    # `data_dict` defaults to None rather than a shared mutable dict
    source_id = (data_dict or {}).get('id', None)
    source = HarvestSource.get(source_id)
    if not source:
        raise NotFound
    return source
def get_job_object(context, data_dict=None):
    '''
    Returns the harvest job an auth function should check against.

    If the context already carries a `job` it is returned as is.
    Otherwise the job is looked up by `data_dict['id']`.

    Raises NotFound if the lookup does not return a job.
    '''
    if 'job' in context:
        return context['job']

    # `data_dict` defaults to None rather than a shared mutable dict
    job_id = (data_dict or {}).get('id', None)
    job = HarvestJob.get(job_id)
    if not job:
        raise NotFound
    return job

View File

@ -0,0 +1,30 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
def harvest_source_create(context, data_dict):
    '''
    Default profile: only sysadmins can create harvest sources.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    if Authorizer().is_sysadmin(username):
        return {'success': True}

    denial = _('User %s not authorized to create harvest sources') % str(username)
    return {'success': False, 'msg': denial}
def harvest_job_create(context, data_dict):
    '''
    Default profile: only sysadmins can create harvest jobs.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    if Authorizer().is_sysadmin(username):
        return {'success': True}

    denial = _('User %s not authorized to create harvest jobs') % str(username)
    return {'success': False, 'msg': denial}
def harvest_job_create_all(context, data_dict):
    '''
    Default profile: only sysadmins can create harvest jobs for all sources.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    if Authorizer().is_sysadmin(username):
        return {'success': True}

    denial = _('User %s not authorized to create harvest jobs for all sources') % str(username)
    return {'success': False, 'msg': denial}

View File

@ -0,0 +1,13 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
def harvest_source_delete(context, data_dict):
    '''
    Default profile: only sysadmins can delete harvest sources.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    if Authorizer().is_sysadmin(username):
        return {'success': True}

    denial = _('User %s not authorized to delete harvest sources') % str(username)
    return {'success': False, 'msg': denial}

View File

@ -0,0 +1,67 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
def harvest_source_show(context, data_dict):
    '''
    Default profile: only sysadmins can read a harvest source.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    if Authorizer().is_sysadmin(username):
        return {'success': True}

    denial = _('User %s not authorized to read this harvest source') % str(username)
    return {'success': False, 'msg': denial}
def harvest_source_list(context, data_dict):
    '''
    Default profile: only sysadmins can list the harvest sources.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    if Authorizer().is_sysadmin(username):
        return {'success': True}

    denial = _('User %s not authorized to see the harvest sources') % str(username)
    return {'success': False, 'msg': denial}
def harvest_job_show(context, data_dict):
    '''
    Default profile: only sysadmins can read a harvest job.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    if Authorizer().is_sysadmin(username):
        return {'success': True}

    denial = _('User %s not authorized to read this harvest job') % str(username)
    return {'success': False, 'msg': denial}
def harvest_job_list(context, data_dict):
    '''
    Default profile: only sysadmins can list the harvest jobs.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    if Authorizer().is_sysadmin(username):
        return {'success': True}

    denial = _('User %s not authorized to see the harvest jobs') % str(username)
    return {'success': False, 'msg': denial}
def harvest_object_show(context, data_dict):
    '''
    Default profile: only sysadmins can read a harvest object.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    if Authorizer().is_sysadmin(username):
        return {'success': True}

    denial = _('User %s not authorized to read this harvest object') % str(username)
    return {'success': False, 'msg': denial}
def harvest_object_list(context, data_dict):
    '''
    Default profile: only sysadmins can list the harvest objects.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    if Authorizer().is_sysadmin(username):
        return {'success': True}

    denial = _('User %s not authorized to see the harvest objects') % str(username)
    return {'success': False, 'msg': denial}
def harvesters_info_show(context, data_dict):
    '''
    Default profile: only sysadmins can see the harvesters information.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    if Authorizer().is_sysadmin(username):
        return {'success': True}

    denial = _('User %s not authorized to see the harvesters information') % str(username)
    return {'success': False, 'msg': denial}

View File

@ -0,0 +1,7 @@
# Declare this package as a namespace package: use pkg_resources when it can
# be imported, otherwise fall back to the standard library's pkgutil.
try:
import pkg_resources
pkg_resources.declare_namespace(__name__)
except ImportError:
# pkg_resources is not importable, extend the package path with pkgutil
import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__)

View File

@ -0,0 +1,53 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
from ckan.model import User
from ckanext.harvest.model import HarvestSource
def harvest_source_create(context, data_dict):
    '''
    Publisher profile: logged in users can create harvest sources if they
    are sysadmins or administer at least one publisher.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user', '')

    # Non-logged users can not create sources
    if not username:
        return {'success': False, 'msg': _('Non-logged in users are not authorized to create harvest sources')}

    # Sysadmins and the rest of logged users can create sources,
    # as long as they belong to a publisher
    user_obj = User.get(username)
    if user_obj:
        if Authorizer().is_sysadmin(username) \
                or len(user_obj.get_groups(u'publisher', u'admin')) != 0:
            return {'success': True}

    return {'success': False, 'msg': _('User %s must belong to a publisher to create harvest sources') % str(username)}
def harvest_job_create(context, data_dict):
    '''
    Publisher profile: sysadmins can create jobs for any source; other
    logged in users only for sources owned by a publisher they administer.

    `data_dict['source_id']` is required.

    Returns an auth dict with `success` and, when denied, `msg`.
    Raises NotFound if the source does not exist.
    '''
    # NotFound is not imported at the top of this module, so the original
    # `raise NotFound` failed with a NameError
    from ckan.logic import NotFound

    model = context['model']
    user = context.get('user')

    source_id = data_dict['source_id']

    if not user:
        return {'success': False, 'msg': _('Non-logged in users are not authorized to create harvest jobs')}

    if Authorizer().is_sysadmin(user):
        return {'success': True}

    user_obj = User.get(user)
    source = HarvestSource.get(source_id)
    if not source:
        raise NotFound

    if user_obj:
        publisher_ids = [g.id for g in user_obj.get_groups(u'publisher', u'admin')]
        if source.publisher_id in publisher_ids:
            return {'success': True}

    return {'success': False, 'msg': _('User %s not authorized to create a job for source %s') % (str(user), source.id)}
def harvest_job_create_all(context, data_dict):
    '''
    Publisher profile: only sysadmins can create harvest jobs for all
    sources.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    user = context.get('user')

    if Authorizer().is_sysadmin(user):
        return {'success': True}

    # The message has no placeholder: applying `% str(user)` to it raised a
    # TypeError instead of returning the denial
    return {'success': False, 'msg': _('Only sysadmins can create harvest jobs for all sources')}

View File

@ -0,0 +1,27 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
from ckan.model import User
from ckanext.harvest.logic.auth import get_source_object
def harvest_source_delete(context, data_dict):
    '''
    Publisher profile: sysadmins can delete any harvest source; other
    logged in users only the sources owned by a publisher they administer.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user', '')

    # The source is resolved before any check is made
    source = get_source_object(context, data_dict)

    # Non-logged users can not delete this source
    if not username:
        return {'success': False, 'msg': _('Non-logged in users are not authorized to delete harvest sources')}

    # Sysadmins can delete the source
    if Authorizer().is_sysadmin(username):
        return {'success': True}

    # Check if the source publisher id exists on the user's groups
    user_obj = User.get(username)
    if user_obj:
        admin_publisher_ids = [g.id for g in user_obj.get_groups(u'publisher', u'admin')]
        if source.publisher_id in admin_publisher_ids:
            return {'success': True}

    return {'success': False, 'msg': _('User %s not authorized to delete harvest source %s') % (str(username), source.id)}

View File

@ -0,0 +1,156 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
from ckan.model import User
from ckanext.harvest.model import HarvestSource
from ckanext.harvest.logic.auth import get_source_object, get_job_object
def harvest_source_show(context, data_dict):
    '''
    Publisher profile: sysadmins can read any harvest source; other logged
    in users only the sources owned by a publisher they administer.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user', '')

    # The source is resolved before any check is made
    source = get_source_object(context, data_dict)

    # Non-logged users can not read the source
    if not username:
        return {'success': False, 'msg': _('Non-logged in users are not authorized to see harvest sources')}

    # Sysadmins can read the source
    if Authorizer().is_sysadmin(username):
        return {'success': True}

    # Check if the source publisher id exists on the user's groups
    user_obj = User.get(username)
    if user_obj:
        admin_publisher_ids = [g.id for g in user_obj.get_groups(u'publisher', u'admin')]
        if source.publisher_id in admin_publisher_ids:
            return {'success': True}

    return {'success': False, 'msg': _('User %s not authorized to read harvest source %s') % (str(username), source.id)}
def harvest_source_list(context, data_dict):
    '''
    Publisher profile: logged in users can list harvest sources if they are
    sysadmins or administer at least one publisher.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    # Here we will just check that the user is logged in.
    # The logic action will return an empty list if the user does not
    # have permissions on any source.
    if not username:
        return {'success': False, 'msg': _('Only logged users are authorized to see their sources')}

    # Only users belonging to a publisher can list sources,
    # unless they are sysadmins
    user_obj = User.get(username)
    if user_obj:
        if Authorizer().is_sysadmin(username) \
                or len(user_obj.get_groups(u'publisher', u'admin')) != 0:
            return {'success': True}

    return {'success': False, 'msg': _('User %s must belong to a publisher to list harvest sources') % str(username)}
def harvest_job_show(context, data_dict):
    '''
    Publisher profile: sysadmins can read any harvest job; other logged in
    users only the jobs whose source is owned by a publisher they
    administer.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    # The job is resolved before any check is made
    job = get_job_object(context, data_dict)

    if not username:
        return {'success': False, 'msg': _('Non-logged in users are not authorized to see harvest jobs')}

    if Authorizer().is_sysadmin(username):
        return {'success': True}

    user_obj = User.get(username)
    if user_obj:
        admin_publisher_ids = [g.id for g in user_obj.get_groups(u'publisher', u'admin')]
        if job.source.publisher_id in admin_publisher_ids:
            return {'success': True}

    return {'success': False, 'msg': _('User %s not authorized to read harvest job %s') % (str(username), job.id)}
def harvest_job_list(context, data_dict):
    '''
    Publisher profile: sysadmins can list any harvest jobs; other logged in
    users must administer a publisher, must ask for a single source
    (`data_dict['source_id']`) and that source must be owned by one of
    their publishers.

    Returns an auth dict with `success` and, when denied, `msg`.
    Raises NotFound if the requested source does not exist.
    '''
    # NotFound is not imported at the top of this module, so the original
    # `raise NotFound` failed with a NameError
    from ckan.logic import NotFound

    model = context['model']
    user = context.get('user')

    # Check user is logged in
    if not user:
        return {'success': False, 'msg': _('Only logged users are authorized to see their sources')}

    user_obj = User.get(user)

    # Sysadmins are not subject to any of the checks below
    if Authorizer().is_sysadmin(user):
        return {'success': True}

    publisher_ids = []
    if user_obj:
        publisher_ids = [g.id for g in user_obj.get_groups(u'publisher', u'admin')]
    if not publisher_ids:
        return {'success': False, 'msg': _('User %s must belong to a publisher to list harvest jobs') % str(user)}

    source_id = data_dict.get('source_id', False)
    if not source_id:
        # No placeholder in this message: the original `% str(user)` raised
        # a TypeError instead of returning the denial
        return {'success': False, 'msg': _('Only sysadmins can list all harvest jobs')}

    source = HarvestSource.get(source_id)
    if not source:
        raise NotFound

    if source.publisher_id not in publisher_ids:
        return {'success': False, 'msg': _('User %s not authorized to list jobs from source %s') % (str(user), source.id)}

    return {'success': True}
def harvest_object_show(context, data_dict):
    '''
    Publisher profile: sysadmins can read any harvest object; other logged
    in users only the objects whose source is owned by a publisher they
    administer.

    The object is taken from `context['obj']` when present, otherwise it is
    looked up by `data_dict['id']`.

    Returns an auth dict with `success` and, when denied, `msg`.
    Raises NotFound if the object can not be found.
    '''
    # The original called `get_obj_object`, which is neither defined nor
    # imported in this module (NameError). The lookup is done inline here,
    # following the pattern of get_source_object / get_job_object.
    from ckan.logic import NotFound
    from ckanext.harvest.model import HarvestObject

    model = context['model']
    user = context.get('user')

    if 'obj' in context:
        obj = context['obj']
    else:
        # NOTE(review): assumes HarvestObject offers the same `get`
        # classmethod as HarvestSource and HarvestJob - confirm
        obj = HarvestObject.get(data_dict.get('id', None))
        if not obj:
            raise NotFound

    if not user:
        return {'success': False, 'msg': _('Non-logged in users are not authorized to see harvest objects')}

    if Authorizer().is_sysadmin(user):
        return {'success': True}

    user_obj = User.get(user)
    if user_obj:
        publisher_ids = [g.id for g in user_obj.get_groups(u'publisher', u'admin')]
        if obj.source.publisher_id in publisher_ids:
            return {'success': True}

    return {'success': False, 'msg': _('User %s not authorized to read harvest object %s') % (str(user), obj.id)}
def harvest_object_list(context, data_dict):
    '''
    Publisher profile: sysadmins can list any harvest objects; other logged
    in users must administer a publisher, must ask for a single source
    (`data_dict['source_id']`) and that source must be owned by one of
    their publishers.

    Returns an auth dict with `success` and, when denied, `msg`.
    Raises NotFound if the requested source does not exist.
    '''
    # NotFound is not imported at the top of this module, so the original
    # `raise NotFound` failed with a NameError
    from ckan.logic import NotFound

    model = context['model']
    user = context.get('user')

    # Check user is logged in
    if not user:
        return {'success': False, 'msg': _('Only logged users are authorized to see their sources')}

    user_obj = User.get(user)

    # Sysadmins are not subject to any of the checks below
    if Authorizer().is_sysadmin(user):
        return {'success': True}

    publisher_ids = []
    if user_obj:
        publisher_ids = [g.id for g in user_obj.get_groups(u'publisher', u'admin')]
    if not publisher_ids:
        return {'success': False, 'msg': _('User %s must belong to a publisher to list harvest objects') % str(user)}

    source_id = data_dict.get('source_id', False)
    if not source_id:
        # No placeholder in this message: the original `% str(user)` raised
        # a TypeError instead of returning the denial
        return {'success': False, 'msg': _('Only sysadmins can list all harvest objects')}

    source = HarvestSource.get(source_id)
    if not source:
        raise NotFound

    if source.publisher_id not in publisher_ids:
        return {'success': False, 'msg': _('User %s not authorized to list objects from source %s') % (str(user), source.id)}

    return {'success': True}
def harvesters_info_show(context, data_dict):
    '''
    Publisher profile: logged in users can see the harvesters info if they
    are sysadmins or administer at least one publisher.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user', '')

    # Non-logged users can not see the harvesters info
    if not username:
        return {'success': False, 'msg': _('Non-logged in users can not see the harvesters info')}

    # Sysadmins and the rest of logged users can see the harvesters info,
    # as long as they belong to a publisher
    user_obj = User.get(username)
    if user_obj:
        if Authorizer().is_sysadmin(username) \
                or len(user_obj.get_groups(u'publisher', u'admin')) != 0:
            return {'success': True}

    return {'success': False, 'msg': _('User %s must belong to a publisher to see the harvesters info') % str(username)}

View File

@ -0,0 +1,83 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
from ckan.model import User
from ckanext.harvest.logic.auth import get_source_object
def harvest_source_update(context, data_dict):
    '''
    Publisher profile: sysadmins can update any harvest source; other
    logged in users only the sources owned by a publisher they administer.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user', '')

    # The source is resolved before any check is made
    source = get_source_object(context, data_dict)

    # Non-logged users can not update this source
    if not username:
        return {'success': False, 'msg': _('Non-logged in users are not authorized to update harvest sources')}

    # Sysadmins can update the source
    if Authorizer().is_sysadmin(username):
        return {'success': True}

    # Check if the source publisher id exists on the user's groups
    user_obj = User.get(username)
    if user_obj:
        admin_publisher_ids = [g.id for g in user_obj.get_groups(u'publisher', u'admin')]
        if source.publisher_id in admin_publisher_ids:
            return {'success': True}

    return {'success': False, 'msg': _('User %s not authorized to update harvest source %s') % (str(username), source.id)}
def harvest_objects_import(context, data_dict):
    '''
    Publisher profile: sysadmins can reimport any harvest objects; other
    logged in users must administer a publisher, must ask for a single
    source (`data_dict['source_id']`) and that source must be owned by one
    of their publishers.

    Returns an auth dict with `success` and, when denied, `msg`.
    Raises NotFound if the requested source does not exist.
    '''
    # Neither name is imported at the top of this module, so the original
    # failed with a NameError as soon as a non sysadmin passed a source_id
    from ckan.logic import NotFound
    from ckanext.harvest.model import HarvestSource

    model = context['model']
    user = context.get('user')

    # Check user is logged in
    if not user:
        return {'success': False, 'msg': _('Only logged users are authorized to reimport harvest objects')}

    user_obj = User.get(user)

    # Sysadmins are not subject to any of the checks below
    if Authorizer().is_sysadmin(user):
        return {'success': True}

    publisher_ids = []
    if user_obj:
        publisher_ids = [g.id for g in user_obj.get_groups(u'publisher', u'admin')]
    if not publisher_ids:
        return {'success': False, 'msg': _('User %s must belong to a publisher to reimport harvest objects') % str(user)}

    source_id = data_dict.get('source_id', False)
    if not source_id:
        # No placeholder in this message: the original `% str(user)` raised
        # a TypeError instead of returning the denial
        return {'success': False, 'msg': _('Only sysadmins can reimport all harvest objects')}

    source = HarvestSource.get(source_id)
    if not source:
        raise NotFound

    if source.publisher_id not in publisher_ids:
        return {'success': False, 'msg': _('User %s not authorized to reimport objects from source %s') % (str(user), source.id)}

    return {'success': True}
def harvest_jobs_run(context, data_dict):
    '''
    Publisher profile: sysadmins can run any pending harvest jobs; other
    logged in users must administer a publisher, must ask for a single
    source (`data_dict['source_id']`) and that source must be owned by one
    of their publishers.

    Returns an auth dict with `success` and, when denied, `msg`.
    Raises NotFound if the requested source does not exist.
    '''
    # Neither name is imported at the top of this module, so the original
    # failed with a NameError as soon as a non sysadmin passed a source_id
    from ckan.logic import NotFound
    from ckanext.harvest.model import HarvestSource

    model = context['model']
    user = context.get('user')

    # Check user is logged in
    if not user:
        return {'success': False, 'msg': _('Only logged users are authorized to run harvest jobs')}

    user_obj = User.get(user)

    # Sysadmins are not subject to any of the checks below
    if Authorizer().is_sysadmin(user):
        return {'success': True}

    publisher_ids = []
    if user_obj:
        publisher_ids = [g.id for g in user_obj.get_groups(u'publisher', u'admin')]
    if not publisher_ids:
        return {'success': False, 'msg': _('User %s must belong to a publisher to run harvest jobs') % str(user)}

    source_id = data_dict.get('source_id', False)
    if not source_id:
        # No placeholder in this message: the original `% str(user)` raised
        # a TypeError instead of returning the denial
        return {'success': False, 'msg': _('Only sysadmins can run all harvest jobs')}

    source = HarvestSource.get(source_id)
    if not source:
        raise NotFound

    if source.publisher_id not in publisher_ids:
        return {'success': False, 'msg': _('User %s not authorized to run jobs from source %s') % (str(user), source.id)}

    return {'success': True}

View File

@ -0,0 +1,30 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
def harvest_source_update(context, data_dict):
    '''
    Default profile: only sysadmins can update harvest sources.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    if Authorizer().is_sysadmin(username):
        return {'success': True}

    denial = _('User %s not authorized to update harvest sources') % str(username)
    return {'success': False, 'msg': denial}
def harvest_objects_import(context, data_dict):
    '''
    Default profile: only sysadmins can reimport harvest objects.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    if Authorizer().is_sysadmin(username):
        return {'success': True}

    denial = _('User %s not authorized to reimport harvest objects') % str(username)
    return {'success': False, 'msg': denial}
def harvest_jobs_run(context, data_dict):
    '''
    Default profile: only sysadmins can run the pending harvest jobs.

    Returns an auth dict with `success` and, when denied, `msg`.
    '''
    model = context['model']
    username = context.get('user')

    if Authorizer().is_sysadmin(username):
        return {'success': True}

    denial = _('User %s not authorized to run the pending harvest jobs') % str(username)
    return {'success': False, 'msg': denial}

View File

@ -0,0 +1,153 @@
from sqlalchemy import distinct
from ckan.model import Package,Group
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject, \
HarvestGatherError, HarvestObjectError
def harvest_source_dictize(source, context):
    '''
    Returns the dict representation of a harvest source.

    On top of `source.as_dict()` it adds `publisher_title` (the title of the
    group referenced by `publisher_id`, or an empty string) and `status`
    (as computed by `_get_source_status`).
    '''
    result = source.as_dict()

    result['publisher_title'] = u''
    publisher_id = result.get('publisher_id')
    publisher = Group.get(publisher_id) if publisher_id else None
    if publisher:
        result['publisher_title'] = publisher.title

    result['status'] = _get_source_status(source, context)
    return result
def harvest_job_dictize(job, context):
    '''
    Returns the dict representation of a harvest job.

    On top of `job.as_dict()` it sets `source` to the source id and adds the
    dictized `objects` and `gather_errors` of the job.
    '''
    result = job.as_dict()
    result['source'] = job.source_id
    result['objects'] = [harvest_object.as_dict()
                         for harvest_object in job.objects]
    result['gather_errors'] = [gather_error.as_dict()
                               for gather_error in job.gather_errors]
    return result
def harvest_object_dictize(obj, context):
    '''
    Returns the dict representation of a harvest object.

    On top of `obj.as_dict()` it sets `source` and `job` to the related ids,
    `package` to the package id (only when the object has a package) and
    `errors` to the dictized object errors.
    '''
    result = obj.as_dict()
    result['source'] = obj.harvest_source_id
    result['job'] = obj.harvest_job_id

    if obj.package:
        result['package'] = obj.package.id

    result['errors'] = [object_error.as_dict() for object_error in obj.errors]
    return result
def _get_source_status(source, context):
# Builds the status dict shown for a harvest source: number of jobs,
# whether a new job is scheduled, and statistics and errors of the last
# finished job plus overall figures for the source.
# If the source has no jobs at all, the returned dict only holds a 'msg'.
# `context['detailed']` (True by default) controls whether the added/updated
# counts, the error messages and the package names are collected.
# NOTE(review): the leading indentation of this block is missing in this
# view, so the nesting of the sections below must be checked against the
# real file before relying on these comments.
model = context.get('model')
detailed = context.get('detailed',True)
out = dict()
job_count = HarvestJob.filter(source=source).count()
if not job_count:
out['msg'] = 'No jobs yet'
return out
# Default values, overwritten below as the information becomes available
out = {
'job_count': job_count,
'next_harvest':'',
'last_harvest_request':'',
'last_harvest_statistics':{'added':0,'updated':0,'errors':0},
'last_harvest_errors':{'gather':[],'object':[]},
'overall_statistics':{'added':0, 'errors':0},
'packages':[]}
# Get next scheduled job (any job for this source still in 'New' status)
next_job = HarvestJob.filter(source=source,status=u'New').first()
if next_job:
out['next_harvest'] = 'Scheduled'
else:
out['next_harvest'] = 'Not yet scheduled'
# Get the last finished job (most recently created one)
last_job = HarvestJob.filter(source=source,status=u'Finished') \
.order_by(HarvestJob.created.desc()).first()
if last_job:
#TODO: Should we encode the dates as strings?
out['last_harvest_request'] = str(last_job.gather_finished)
# Get HarvestObjects from last job with links to packages
if detailed:
last_objects = [obj for obj in last_job.objects if obj.package is not None]
if len(last_objects) == 0:
# No packages added or updated
out['last_harvest_statistics']['added'] = 0
out['last_harvest_statistics']['updated'] = 0
else:
# Check whether packages were added or updated
for last_object in last_objects:
# Check if the same package had been linked before
previous_objects = model.Session.query(HarvestObject) \
.filter(HarvestObject.package==last_object.package) \
.count()
if previous_objects == 1:
# It didn't previously exist, it has been added
out['last_harvest_statistics']['added'] += 1
else:
# Package already existed, but it has been updated
out['last_harvest_statistics']['updated'] += 1
# Last harvest errors
# We have the gathering errors in last_job.gather_errors, so let's
# also get the object errors.
object_errors = model.Session.query(HarvestObjectError).join(HarvestObject) \
.filter(HarvestObject.job==last_job)
out['last_harvest_statistics']['errors'] = len(last_job.gather_errors) \
+ object_errors.count()
if detailed:
for gather_error in last_job.gather_errors:
out['last_harvest_errors']['gather'].append(gather_error.message)
for object_error in object_errors:
err = {'object_id':object_error.object.id,'object_guid':object_error.object.guid,'message': object_error.message}
out['last_harvest_errors']['object'].append(err)
# Overall statistics: distinct active packages linked to the current
# objects of this source
packages = model.Session.query(distinct(HarvestObject.package_id),Package.name) \
.join(Package).join(HarvestSource) \
.filter(HarvestObject.source==source) \
.filter(HarvestObject.current==True) \
.filter(Package.state==u'active')
out['overall_statistics']['added'] = packages.count()
if detailed:
for package in packages:
out['packages'].append(package.name)
# Gather and object errors across all the jobs of this source.
# Note that `object_errors` is rebound here from a query to a count.
gather_errors = model.Session.query(HarvestGatherError) \
.join(HarvestJob).join(HarvestSource) \
.filter(HarvestJob.source==source).count()
object_errors = model.Session.query(HarvestObjectError) \
.join(HarvestObject).join(HarvestJob).join(HarvestSource) \
.filter(HarvestJob.source==source).count()
out['overall_statistics']['errors'] = gather_errors + object_errors
else:
out['last_harvest_request'] = 'Not yet harvested'
return out

View File

@ -1,3 +1,5 @@
from ckan.lib.base import config
from ckan.lib.navl.validators import (ignore_missing,
not_empty,
empty,
@ -20,11 +22,15 @@ def default_harvest_source_schema():
'title': [ignore_missing,unicode],
'description': [ignore_missing,unicode],
'active': [ignore_missing,harvest_source_active_validator],
'user_id': [ignore_missing],
'publisher_id': [ignore_missing],
'user_id': [ignore_missing,unicode],
'config': [ignore_missing,harvest_source_config_validator]
}
if config.get('ckan.harvest.auth.profile',None) == 'publisher':
schema['publisher_id'] = [not_empty,unicode]
else:
schema['publisher_id'] = [ignore_missing,unicode]
return schema

View File

@ -1,6 +1,7 @@
import os
from logging import getLogger
from pylons import config
from genshi.input import HTML
from genshi.filters import Transformer
@ -8,8 +9,8 @@ import ckan.lib.helpers as h
from ckan.plugins import implements, SingletonPlugin
from ckan.plugins import IRoutes, IConfigurer
from ckan.plugins import IConfigurable, IGenshiStreamFilter
from ckanext.harvest.model import setup
from ckan.plugins import IConfigurable, IActions, IAuthFunctions
from ckanext.harvest.model import setup as model_setup
log = getLogger(__name__)
@ -18,9 +19,32 @@ class Harvest(SingletonPlugin):
implements(IConfigurable)
implements(IRoutes, inherit=True)
implements(IConfigurer, inherit=True)
implements(IActions)
implements(IAuthFunctions)
def configure(self, config):
    '''
    Validates the harvest auth profile setting and sets up the harvest model.

    If `ckan.harvest.auth.profile` is set, the matching module under
    `ckanext.harvest.logic.auth` must be importable, and the `publisher`
    profile additionally requires `ckan.auth.profile = publisher`.

    Raises ImportError for an unknown profile and a plain Exception if the
    publisher profile is requested but not enabled in CKAN core.
    '''
    auth_profile = config.get('ckan.harvest.auth.profile', None)

    if auth_profile:
        # Check if auth profile exists
        module_root = 'ckanext.harvest.logic.auth'
        module_path = '%s.%s' % (module_root, auth_profile)
        try:
            __import__(module_path)
        except ImportError:
            raise ImportError('Unknown auth profile: %s' % auth_profile)

        # If we are using the publisher auth profile, make sure CKAN core
        # also uses it.
        if auth_profile == 'publisher' and \
                not config.get('ckan.auth.profile', '') == 'publisher':
            raise Exception('You must enable the "publisher" auth profile'
                  + ' in CKAN in order to use it on the harvest extension'
                  + ' (adding "ckan.auth.profile=publisher" to your ini file)')

    # Setup harvest model. This is the only setup call: the model is set up
    # once, and only after the auth profile has been validated.
    model_setup()
def before_map(self, map):
@ -51,3 +75,71 @@ class Harvest(SingletonPlugin):
config['extra_public_paths'] += ',' + public_dir
else:
config['extra_public_paths'] = public_dir
def get_actions(self):
    '''
    Returns the harvest logic actions, keyed by action name.

    The action modules are imported when the method is called rather than
    at module import time.
    '''
    from ckanext.harvest.logic.action.get import (harvest_source_show,
                                                  harvest_source_list,
                                                  harvest_job_show,
                                                  harvest_job_list,
                                                  harvest_object_show,
                                                  harvest_object_list,
                                                  harvesters_info_show,)
    from ckanext.harvest.logic.action.create import (harvest_source_create,
                                                     harvest_job_create,
                                                     harvest_job_create_all,)
    from ckanext.harvest.logic.action.update import (harvest_source_update,
                                                     harvest_objects_import,
                                                     harvest_jobs_run)
    from ckanext.harvest.logic.action.delete import (harvest_source_delete,)

    return dict(
        harvest_source_show=harvest_source_show,
        harvest_source_list=harvest_source_list,
        harvest_job_show=harvest_job_show,
        harvest_job_list=harvest_job_list,
        harvest_object_show=harvest_object_show,
        harvest_object_list=harvest_object_list,
        harvesters_info_show=harvesters_info_show,
        harvest_source_create=harvest_source_create,
        harvest_job_create=harvest_job_create,
        harvest_job_create_all=harvest_job_create_all,
        harvest_source_update=harvest_source_update,
        harvest_source_delete=harvest_source_delete,
        harvest_objects_import=harvest_objects_import,
        harvest_jobs_run=harvest_jobs_run,
    )
def get_auth_functions(self):
    '''
    Returns the auth functions of the harvest extension.

    The functions found under `ckanext.harvest.logic.auth` are loaded first;
    if `ckan.harvest.auth.profile` is set, the ones from that profile are
    loaded on top of them, replacing any function with the same name.
    '''
    base_root = 'ckanext.harvest.logic.auth'
    profile_name = config.get('ckan.harvest.auth.profile', '')

    loaded = _get_auth_functions(base_root)
    active_root = base_root
    if profile_name:
        active_root = '%s.%s' % (base_root, profile_name)
        loaded = _get_auth_functions(active_root, loaded)

    log.info('Using auth profile at %s' % active_root)

    return loaded
def _get_auth_functions(module_root, auth_functions=None):
    '''
    Collects the public names of the `get`, `create`, `update` and `delete`
    modules found under `module_root`.

    `auth_functions` is an optional dict to add the names to (existing keys
    are overwritten); a new dict is created when it is not given. The dict
    is returned. Modules that can not be imported are skipped.
    '''
    # A fresh dict per call: the original `{}` default was shared between
    # calls and accumulated entries across them
    if auth_functions is None:
        auth_functions = {}

    for auth_module_name in ['get', 'create', 'update', 'delete']:
        module_path = '%s.%s' % (module_root, auth_module_name,)
        try:
            module = __import__(module_path)
        except ImportError:
            log.debug('No auth module for action "%s"' % auth_module_name)
            continue

        # __import__ returns the top level package, walk down to the module
        for part in module_path.split('.')[1:]:
            module = getattr(module, part)

        # NOTE(review): every public name of the module is registered, which
        # includes the names the module itself imports
        for key, value in module.__dict__.items():
            if not key.startswith('_'):
                auth_functions[key] = value

    return auth_functions

View File

@ -46,6 +46,11 @@ body.index.ViewController #content {
color: red;
}
#harvest-sources td{
background-color: white !important;
border-bottom: 1px solid #E3E3E3;
}
.harvester-title{
font-weight: bold;
}
@ -67,3 +72,10 @@ body.index.ViewController #content {
font-weight:bold;
color: red;
}
#harvest-sources .publisher > td{
background-color: #E3E3E3 !important;
padding: 3px;
font-weight: bold;
}

View File

@ -26,7 +26,7 @@
<label for="show-inactive-sources"> Show inactive sources</label>
</div>
<table id="harvest-sources">
<table id="harvest-sources" class="${'publishers' if c.publisher_auth else ''}" >
<tr>
<th class="action">View</th>
<th class="action">Edit</th>
@ -38,8 +38,21 @@
<th>Next Harvest</th>
<th>Created</th>
</tr>
<?python old_publisher = None ?>
<py:for each="source in c.sources">
<tr class="publisher" py:if="c.publisher_auth and old_publisher != source['publisher_id']">
<py:choose>
<py:when test="source.get('publisher_title')">
<td colspan="9">${source['publisher_title']}</td>
</py:when>
<py:otherwise>
<td colspan="9">${source['publisher_id']}</td>
</py:otherwise>
</py:choose>
<tr py:for="source in c.sources" class="${'active' if source.active else 'inactive'}">
</tr>
<?python old_publisher = source['publisher_id'] ?>
<tr class="${'active' if source.active else 'inactive'}">
<td><a href="harvest/${source.id}"><img src="ckanext/harvest/images/icons/source_view.png" alt="View" title="View" /></a></td>
<td><a href="harvest/edit/${source.id}"><img src="ckanext/harvest/images/icons/source_edit.png" alt="Edit" title="Edit" /></a></td>
<td><a href="harvest/refresh/${source.id}"><img src="ckanext/harvest/images/icons/source_refresh.png" alt="Refresh" title="Refresh" /></a></td>
@ -67,6 +80,7 @@
<td>${h.render_datetime(source.created)}</td>
</tr>
</py:for>
</table>
</py:when>
<py:otherwise>

View File

@ -44,8 +44,20 @@
<dt><label class="field_opt" for="description">Description</label></dt>
<dd><textarea id="description" name="description" cols="30" rows="2" style="height:75px">${data.get('description', '')}</textarea></dd>
<dd class="instructions basic">You can add your own notes here about what the URL above represents to remind you later.</dd>
<dt><label class="field_opt" for="config">Configuration</label></dt>
<dd><textarea id="config" name="config" cols="30" rows="2" style="height:75px">${data.get('config', '')}</textarea></dd>
<dt py:if="c.publisher_auth"><label class="field_opt" for="groups__${len(data.get('groups', []))}__id">Publisher</label></dt>
<dd py:if="c.publisher_auth and c.groups">
<select id="publisher_id" name="publisher_id">
<py:for each="group in c.groups">
<option value="${group['id']}" py:attrs="{'selected': 'selected' if group['id'] == data.get('publisher_id',None) else None}">${group['title']}</option>
</py:for>
</select>
</dd>
<dd py:if="c.publisher_auth and not c.groups"><em>Cannot add any publishers.</em></dd>
<dt class="harvest-source-config"><label class="field_opt" for="config">Configuration</label></dt>
<dd class="harvest-source-config"><textarea id="config" name="config" cols="30" rows="2" style="height:75px">${data.get('config', '')}</textarea></dd>
<dt><label class="field_opt" for="active">State</label></dt>
<dd>

View File

@ -15,7 +15,10 @@
<h1>Harvest Source Details</h1>
<div id="harvest-source-actions">
<img src="/ckanext/harvest/images/icons/source_edit.png" alt="Edit" /><a href="/harvest/edit/${c.source.id}">Edit source</a> |
<img src="/ckanext/harvest/images/icons/source_refresh.png" alt="Refresh" /><a href="/harvest/refresh/${c.source.id}">Refresh source</a></div>
<img src="/ckanext/harvest/images/icons/source_refresh.png" alt="Refresh" /><a href="/harvest/refresh/${c.source.id}">Refresh source</a> |
<a href="/harvest">Sources list</a>
</div>
<table id="harvest-source-details">
<tr>
<th>ID</th>
@ -51,13 +54,18 @@
<td>-</td>
</py:if>
</tr>
<tr>
<tr py:if="c.publisher_auth">
<th>User</th>
<td>${c.source.user_id}</td>
</tr>
<tr>
<tr py:if="c.publisher_auth">
<th>Publisher</th>
<py:if test="c.source.publisher_title">
<td>${c.source.publisher_title}</td>
</py:if>
<py:if test="not c.source.publisher_title">
<td>${c.source.publisher_id}</td>
</py:if>
</tr>
<tr>
<th>Created</th>
@ -65,7 +73,7 @@
</tr>
<tr>
<th>Total jobs</th>
<td>${len(c.source.jobs)}</td>
<td>${c.source.status.job_count}</td>
</tr>
<tr>
<th>Status</th>

View File

@ -0,0 +1,223 @@
import logging
from pprint import pprint
from nose.plugins.skip import SkipTest;
from ckan import model
from ckan.model import Package, Session
from ckan.lib.helpers import url_for,json
from ckan.lib.base import config
from ckan.tests import CreateTestData
from ckan.tests.functional.base import FunctionalTestCase
from ckanext.harvest.plugin import Harvest
from ckanext.harvest.model import HarvestSource, HarvestJob, setup as harvest_model_setup
log = logging.getLogger(__name__)
class HarvestAuthBaseCase():
    """Reusable authorization checks for the harvest web interface.

    Meant to be combined with a functional test case that provides
    ``self.app``. The helpers request the harvest pages on behalf of a
    user and check that access is either refused or granted.
    """

    @classmethod
    def setup_class(cls):
        # Run the harvest model setup before the tests of the class
        harvest_model_setup()

    @classmethod
    def teardown_class(cls):
        pass

    def _test_auth_not_allowed(self,user_name = None, source = None, status = 401):
        """Request every harvest page and expect `status` from all of them.

        :param user_name: name sent as ``REMOTE_USER``; when empty the
            requests are made without any user
        :param source: harvest source used to build the read, edit and
            refresh URLs; a new one is stored when none is provided
        :param status: HTTP status expected from each request
        """
        if not source:
            # No source was handed in, so store one to build the URLs from
            source = HarvestSource(url=u'http://test-source.com',type='ckan')
            Session.add(source)
            Session.commit()

        extra_environ = {}
        if user_name:
            extra_environ['REMOTE_USER'] = user_name.encode('utf8')

        # List, create, read, edit and refresh (requested in this order)
        offsets = [
            '/harvest',
            '/harvest/new',
            '/harvest/%s' % source.id,
            '/harvest/edit/%s' % source.id,
            '/harvest/refresh/%s' % source.id,
        ]
        for offset in offsets:
            self.app.get(offset, status=status, extra_environ=extra_environ)

    def _test_auth_allowed(self,user_name,auth_profile=None):
        """List, create, read, edit and refresh a harvest source as
        `user_name`, asserting that every step succeeds.

        :param user_name: name sent as ``REMOTE_USER`` with each request
        :param auth_profile: when equal to ``'publisher'`` the source forms
            must contain ``publisher_id`` and the new source is assigned to
            ``self.publisher1``; otherwise the forms must not contain it
        """
        extra_environ = {'REMOTE_USER': user_name.encode('utf8')}
        publisher_profile = (auth_profile == 'publisher')

        # List
        response = self.app.get('/harvest', extra_environ=extra_environ)
        assert 'Harvesting Sources' in response

        # Create
        response = self.app.get('/harvest/new', extra_environ=extra_environ)
        assert 'New harvest source' in response
        if publisher_profile:
            assert 'publisher_id' in response
        else:
            assert 'publisher_id' not in response

        form = response.forms['source-new']
        new_values = (
            ('url', u'http://test-source.com'),
            ('type', u'ckan'),
            ('title', u'Test harvest source'),
            ('description', u'Test harvest source'),
            ('config', u'{"a":1,"b":2}'),
        )
        for field, value in new_values:
            form[field] = value
        if publisher_profile:
            form['publisher_id'] = self.publisher1.id

        response = form.submit('save', extra_environ=extra_environ)
        assert 'Error' not in response, response

        source = Session.query(HarvestSource).first()
        assert source.url == u'http://test-source.com'
        assert source.type == u'ckan'

        # Read
        read_offset = '/harvest/%s' % source.id
        response = self.app.get(read_offset, extra_environ=extra_environ)
        assert 'Harvest Source Details' in response
        assert source.id in response
        assert source.title in response

        # Edit
        edit_offset = '/harvest/edit/%s' % source.id
        response = self.app.get(edit_offset, extra_environ=extra_environ)
        assert 'Edit harvest source' in response
        if publisher_profile:
            assert 'publisher_id' in response
        else:
            assert 'publisher_id' not in response

        form = response.forms['source-new']
        form['title'] = u'Test harvest source Updated'
        response = form.submit('save', extra_environ=extra_environ)
        assert 'Error' not in response, response

        source = Session.query(HarvestSource).first()
        assert source.title == u'Test harvest source Updated'

        # Refresh
        refresh_offset = '/harvest/refresh/%s' % source.id
        response = self.app.get(refresh_offset, extra_environ=extra_environ)

        job = Session.query(HarvestJob).first()
        assert job.source_id == source.id
class TestAuthDefaultProfile(FunctionalTestCase,HarvestAuthBaseCase):
    """Authorization tests for the default harvest auth profile.

    The whole class is skipped unless ``ckan.harvest.auth.profile`` is
    unset or empty in the test configuration.
    """

    @classmethod
    def setup_class(cls):
        profile = config.get('ckan.harvest.auth.profile','')
        if profile != '':
            raise SkipTest("Skipping default auth profile tests. Set ckan.harvest.auth.profile = '' to run them")

        super(TestAuthDefaultProfile,cls).setup_class()

    def setup(self):
        # Load the CKAN test data and pick the users the tests act as
        CreateTestData.create()

        self.sysadmin_user = model.User.get('testsysadmin')
        self.normal_user = model.User.get('annafan')

    def teardown(self):
        model.repo.rebuild_db()

    def test_auth_default_profile_sysadmin(self):
        user_name = self.sysadmin_user.name
        self._test_auth_allowed(user_name)

    def test_auth_default_profile_normal(self):
        user_name = self.normal_user.name
        self._test_auth_not_allowed(user_name)

    def test_auth_default_profile_notloggedin(self):
        # Without a user, a 302 status is expected instead of a 401
        self._test_auth_not_allowed(status=302)
class TestAuthPublisherProfile(FunctionalTestCase,HarvestAuthBaseCase):
    """Authorization tests for the ``publisher`` harvest auth profile.

    The whole class is skipped unless ``ckan.harvest.auth.profile`` is set
    to ``publisher`` in the test configuration.
    """

    @classmethod
    def setup_class(cls):
        profile = config.get('ckan.harvest.auth.profile')
        if profile != 'publisher':
            raise SkipTest("Skipping publisher auth profile tests. Set ckan.harvest.auth.profile = 'publisher' to run them")

        super(TestAuthPublisherProfile,cls).setup_class()

    def setup(self):
        model.Session.remove()
        CreateTestData.create(auth_profile='publisher')

        self.sysadmin_user = model.User.get('testsysadmin')
        # This user does not belong to a publisher
        self.normal_user = model.User.get('annafan')
        self.publisher1_user = model.User.by_name('russianfan')
        self.publisher2_user = model.User.by_name('tester')

        # Open a new revision and create the two publishers
        model.repo.new_revision()
        self.publisher1 = model.Group(name=u'test-publisher1',
                                      title=u'Test Publihser 1',
                                      type=u'publisher')
        Session.add(self.publisher1)
        self.publisher2 = model.Group(name=u'test-publisher2',
                                      title=u'Test Publihser 2',
                                      type=u'publisher')
        Session.add(self.publisher2)

        # Each publisher user is made an admin member of its own publisher
        memberships = (
            (self.publisher1_user, self.publisher1),
            (self.publisher2_user, self.publisher2),
        )
        for user, publisher in memberships:
            member = model.Member(table_name = 'user',
                                  table_id = user.id,
                                  group=publisher,
                                  capacity='admin')
            Session.add(member)

        Session.commit()

    def teardown(self):
        model.repo.rebuild_db()

    def test_auth_publisher_profile_normal(self):
        user_name = self.normal_user.name
        self._test_auth_not_allowed(user_name)

    def test_auth_publisher_profile_notloggedin(self):
        # Without a user, a 302 status is expected instead of a 401
        self._test_auth_not_allowed(status=302)

    def test_auth_publisher_profile_sysadmin(self):
        user_name = self.sysadmin_user.name
        self._test_auth_allowed(user_name,auth_profile='publisher')

    def test_auth_publisher_profile_publisher(self):
        user_name = self.publisher1_user.name
        self._test_auth_allowed(user_name,auth_profile='publisher')

    def test_auth_publisher_profile_different_publisher(self):
        # The source belongs to publisher 1, the requests come from the
        # admin user of publisher 2
        source = HarvestSource(url=u'http://test-source.com',type='ckan',
                               publisher_id=self.publisher1.id)
        Session.add(source)
        Session.commit()

        remote_user = self.publisher2_user.name.encode('utf8')
        extra_environ = {'REMOTE_USER': remote_user}

        # List (publishers can see the sources list)
        response = self.app.get('/harvest', extra_environ=extra_environ)
        assert 'Harvesting Sources' in response

        # Create
        response = self.app.get('/harvest/new', extra_environ=extra_environ)
        assert 'New harvest source' in response
        assert 'publisher_id' in response

        # This publisher must not be allowed to manage sources from other
        # publishers: read, edit and refresh are expected to return 401
        forbidden_offsets = [
            '/harvest/%s' % source.id,
            '/harvest/edit/%s' % source.id,
            '/harvest/refresh/%s' % source.id,
        ]
        for offset in forbidden_offsets:
            self.app.get(offset, status=401, extra_environ=extra_environ)

54
test-core.ini Normal file
View File

@ -0,0 +1,54 @@
[DEFAULT]
debug = true
# Uncomment and replace with the address which should receive any error reports
#email_to = you@yourdomain.com
smtp_server = localhost
error_email_from = paste@localhost
[server:main]
use = egg:Paste#http
host = 0.0.0.0
port = 5000
[app:main]
use = config:../ckan/test-core.ini
# Enable the harvest plugins on top of the CKAN test settings
# inherited from the file referenced above.
ckan.plugins = harvest ckan_harvester
# NB: other test configuration should go in test-core.ini, which is
# what the postgres tests use.
# Logging configuration
[loggers]
keys = root, ckan, sqlalchemy
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
[logger_ckan]
qualname = ckan
handlers =
level = INFO
[logger_sqlalchemy]
handlers =
qualname = sqlalchemy.engine
level = WARN
[handler_console]
class = StreamHandler
args = (sys.stdout,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s

54
test.ini Normal file
View File

@ -0,0 +1,54 @@
[DEFAULT]
debug = true
# Uncomment and replace with the address which should receive any error reports
#email_to = you@yourdomain.com
smtp_server = localhost
error_email_from = paste@localhost
[server:main]
use = egg:Paste#http
host = 0.0.0.0
port = 5000
[app:main]
use = config:../ckan/test.ini
# Enable the harvest plugins on top of the CKAN test settings
# inherited from the file referenced above.
ckan.plugins = harvest ckan_harvester
# NB: other test configuration should go in test-core.ini, which is
# what the postgres tests use.
# Logging configuration
[loggers]
keys = root, ckan, sqlalchemy
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
[logger_ckan]
qualname = ckan
handlers =
level = INFO
[logger_sqlalchemy]
handlers =
qualname = sqlalchemy.engine
level = WARN
[handler_console]
class = StreamHandler
args = (sys.stdout,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s